這篇文章,主要介紹消息隊(duì)列RabbitMQ之防止消息丟失的三種方式(生產(chǎn)者消息確認(rèn)、消費(fèi)者消息確認(rèn)、消息持久化)。
目錄
一、防止消息丟失
1.1、消息確認(rèn)機(jī)制(生產(chǎn)者)
(1)生產(chǎn)者丟失消息
(2)生產(chǎn)者消息確認(rèn)機(jī)制
1.2、消息確認(rèn)機(jī)制(消費(fèi)者)
(1)消費(fèi)者丟失消息
(2)消費(fèi)者消息確認(rèn)機(jī)制
1.3、消息持久化(RabbitMQ)
(1)RabbitMQ丟失消息
(2)消息持久化機(jī)制
一、防止消息丟失
RabbitMQ消息隊(duì)列,在使用的時(shí)候,可能會(huì)存在消息丟失的情況,所謂的消息丟失就是生產(chǎn)者發(fā)送的消息沒(méi)辦法被消費(fèi)者正確的消費(fèi),消息隊(duì)列中導(dǎo)致消息丟失的地方有三個(gè),分別是:
- 第一種情況:生產(chǎn)者發(fā)送的消息沒(méi)有正確的發(fā)送到RabbitMQ里面,導(dǎo)致發(fā)送的消息丟失。
- 第二種情況:消費(fèi)者從RabbitMQ消費(fèi)消息時(shí)候,消費(fèi)失敗,但是RabbitMQ認(rèn)為消費(fèi)成功,從而刪除了消息。
- 第三種情況:RabbitMQ中保存的消息還沒(méi)有被消費(fèi)者消費(fèi),此時(shí)RabbitMQ服務(wù)宕機(jī),導(dǎo)致內(nèi)存中的消息丟失。
1.1、消息確認(rèn)機(jī)制(生產(chǎn)者)
(1)生產(chǎn)者丟失消息
生產(chǎn)者丟失消息,是指:當(dāng)生產(chǎn)者發(fā)送消息給RabbitMQ的時(shí)候,此時(shí)消息發(fā)送失敗了,并且生產(chǎn)者又沒(méi)有重新發(fā)送這一條消息,所以這個(gè)時(shí)候,生產(chǎn)者這一條失敗的消息就丟失了。
既然是生產(chǎn)者發(fā)送消息失敗導(dǎo)致這一條消息丟失的,那么我們?cè)谔幚磉@個(gè)丟失消息問(wèn)題的時(shí)候,就可以這樣做:當(dāng)生產(chǎn)者消息發(fā)送失敗之后,可以讓生產(chǎn)者再次發(fā)送這一條消息,這里就有一個(gè)問(wèn)題啦,那就是生產(chǎn)者怎么知道消息有沒(méi)有發(fā)送成功???
RabbitMQ給我們提供了一個(gè)機(jī)制,即:發(fā)布確認(rèn)機(jī)制,大致思想是:當(dāng)生產(chǎn)者將消息發(fā)送到RabbitMQ之后,并且RabbitMQ正確接收到消息并將其放入Queue隊(duì)列里面時(shí),RabbitMQ會(huì)返回一個(gè)ACK標(biāo)識(shí)給生產(chǎn)者,生產(chǎn)者接收到ACK標(biāo)識(shí)就可以認(rèn)為消息發(fā)送成功啦;如果消息接收失敗,RabbitMQ會(huì)返回一個(gè)NACK標(biāo)識(shí),表示接收失敗。
(2)生產(chǎn)者消息確認(rèn)機(jī)制
生產(chǎn)者消息確認(rèn)機(jī)制,上一篇文章已經(jīng)介紹了(【RabbitMQ筆記07】消息隊(duì)列RabbitMQ七種模式之Publisher Confirms發(fā)布確認(rèn)模式),這里就不再重復(fù)。
1.2、消息確認(rèn)機(jī)制(消費(fèi)者)
(1)消費(fèi)者丟失消息
如果生產(chǎn)者已經(jīng)將消息正確的發(fā)送到RabbitMQ里面了,消費(fèi)者從Queue隊(duì)列里面獲取消息消費(fèi)時(shí)候,如果消費(fèi)失敗,那么此時(shí)就會(huì)導(dǎo)致這一條消息丟失,這是因?yàn)?,默認(rèn)情況下,RabbitMQ將消息分發(fā)給消費(fèi)者之后,消費(fèi)者接收到消息時(shí)候,就會(huì)返回一個(gè)ACK標(biāo)識(shí)給消息隊(duì)列RabbitMQ,此時(shí)RabbitMQ就會(huì)將這一條消息從Queue隊(duì)列里面刪除,但是這種情況下,消費(fèi)者是否正確將這條消息消費(fèi)了,RabbitMQ是不知道的,所以這就有可能導(dǎo)致丟失。
如何解決消費(fèi)者丟失消息???
- 既然丟失消息是因?yàn)橄M(fèi)者消費(fèi)失敗,并且RabbitMQ把消息刪除了,那么我們就可以開(kāi)啟手動(dòng)確認(rèn)的方式來(lái)告訴RabbitMQ,消費(fèi)者是否正確的消費(fèi)消息,是否可以將消息從Queue隊(duì)列里面刪除了。
(2)消費(fèi)者消息確認(rèn)機(jī)制
- 消費(fèi)者進(jìn)行消息確認(rèn),需要關(guān)閉自動(dòng)確認(rèn),將【basicConsume()】方法的第二個(gè)參數(shù)設(shè)置為【false】。
- 消息成功消費(fèi)之后,主動(dòng)調(diào)用【basicAck()】方法,返回ACK標(biāo)識(shí)給RabbitMQ。
package com.rabbitmq.demo.dropmsg;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @version 1.0.0
* @Date: 2023/2/25 16:30
* @Copyright (C) ZhuYouBin
* @Description: 消息消費(fèi)者
*/
public class Consumer {
public static void main(String[] args) {
// 1、創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
// 2、設(shè)置連接的 RabbitMQ 服務(wù)地址
factory.setHost("127.0.0.1"); // 默認(rèn)就是本機(jī)
factory.setPort(5672); // 默認(rèn)就是 5672 端口
// 3、獲取連接
Connection connection = null; // 連接
Channel channel = null; // 通道
try {
connection = factory.newConnection();
// 4、獲取通道
channel = connection.createChannel();
// 5、聲明 Exchange,如果不存在,則會(huì)創(chuàng)建
String exchangeName = "exchange_dropmsg_2023";
channel.exchangeDeclare(exchangeName, "direct");
// 6、指定需要操作的消息隊(duì)列,如果隊(duì)列不存在,則會(huì)創(chuàng)建
String queueName = "queue_dropmsg_2023";
channel.queueDeclare(queueName, false, false, false, null);
// 7、綁定 Exchange 和 Queue, 接收 routingKey = "info" 的消息
channel.queueBind(queueName, exchangeName, "key_2023");
// 8、消費(fèi)消息
Channel finalChannel = channel;
DeliverCallback callback = new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
// 接收消息
System.out.println("這是接收的消息:" + new String(delivery.getBody()));
// TODO 消費(fèi)者正確消費(fèi)消息之后,主動(dòng)返回 ACK 標(biāo)識(shí)
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
}
};
// TODO 這第二個(gè)參數(shù)修改為 false,表示消費(fèi)者需要手動(dòng)發(fā)送 ACK 標(biāo)識(shí)給 RabbitMQ(默認(rèn)是true)
channel.basicConsume(queueName, false, callback, i->{});
} catch (Exception e) {
e.printStackTrace();
}
}
}
1.3、消息持久化(RabbitMQ)
(1)RabbitMQ丟失消息
上面介紹了兩種丟失消息的情況,分別是生產(chǎn)者和消費(fèi)者丟失消息,還有一種丟失消息的情況,那就是RabbitMQ消息隊(duì)列將消息丟失了。假設(shè),現(xiàn)在存在這一種情況,生產(chǎn)者已經(jīng)正確將消息發(fā)送到RabbitMQ里面,正準(zhǔn)備將消息發(fā)送給消費(fèi)者的時(shí)候,此時(shí)RabbitMQ服務(wù)宕機(jī)了,導(dǎo)致RabbitMQ中的消息丟失了(默認(rèn)情況下,RabbitMQ是將消息保存在內(nèi)存中的),由于內(nèi)存中的數(shù)據(jù)斷電即失,所以這就導(dǎo)致消息丟失情況。
如何解決RabbitMQ出現(xiàn)的消息丟失問(wèn)題呢???
- 既然RabbitMQ是將消息保存在內(nèi)存中的,那么為了避免消息丟失,可以將內(nèi)存中的消息保存到磁盤文件里面,這樣即使RabbitMQ宕機(jī)了,重新啟動(dòng)的時(shí)候也可以從磁盤文件里面讀取消息到內(nèi)存里面。
(2)消息持久化機(jī)制
- 在調(diào)用【queueDeclare()】方法,創(chuàng)建Queue隊(duì)列的時(shí)候,設(shè)置第二個(gè)參數(shù)等于【true】,表示消息允許持久化。
- 生產(chǎn)者調(diào)用【basicPublish()】方法發(fā)送消息的時(shí)候,設(shè)置消息屬性等于【MessageProperties.PERSISTENT_TEXT_PLAIN】,表示文本持久化。
// 第二個(gè)參數(shù)設(shè)置為true,表示開(kāi)啟持久化消息
channel.queueDeclare("Queue隊(duì)列名稱", true, false, false, null);
// 生產(chǎn)者發(fā)送消息時(shí)候,設(shè)置消息屬性是文本持久化
channel.basicPublish("Exchange交換機(jī)名稱", "Queue隊(duì)列名稱", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
到此,RabbitMQ消息隊(duì)列防止消息丟失的三種方式介紹完啦。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-787896.html
綜上,這篇文章結(jié)束了,主要介紹消息隊(duì)列RabbitMQ之防止消息丟失的三種方式(生產(chǎn)者消息確認(rèn)、消費(fèi)者消息確認(rèn)、消息持久化)。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-787896.html
到了這里,關(guān)于【RabbitMQ筆記08】消息隊(duì)列RabbitMQ之防止消息丟失的三種方式(生產(chǎn)者消息確認(rèn)、消費(fèi)者消息確認(rèn)、消息持久化)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!