系列文章目錄
手把手教你,本地RabbitMQ服務搭建(windows)
消息隊列選型——為什么選擇RabbitMQ
RabbitMQ靈活運用,怎么理解五種消息模型
推或拉? RabbitMQ 消費模式該如何選擇
死信是什么,如何運用RabbitMQ的死信機制?
前言
前面我們在做MQ組件選型時,提到了rabbitMQ的消息可靠性,那么它到底可靠到什么程度?又是如何保證消息可靠性的呢?今天我們就一起來看一下
一、消息可靠性的定義
消息可靠性是指在消息傳遞過程中,確保消息能夠被完整、準確、可靠地傳遞到目的地。更具體的說分為兩個角度:
- 不會意外丟失
- 不會重復傳遞
因此,我們必須保證消息不會因為網(wǎng)絡故障、系統(tǒng)故障或其他異常原因而丟失或重復傳遞,否則可能導致業(yè)務邏輯錯誤、數(shù)據(jù)損壞或系統(tǒng)崩潰等問題
二、幾種不可靠的場景
- 消息漏發(fā)送:生產(chǎn)者在發(fā)送消息時,如果不觀察RabbitMQ服務器的確認消息,可能導致有些消息在網(wǎng)絡中丟失而不自知
- 消息重復發(fā)送:如果生產(chǎn)者在發(fā)送消息時,由于網(wǎng)絡抖動或者其他原因,生產(chǎn)者無法從RabbitMQ收到消息確認,此時生產(chǎn)者會重發(fā)同樣一條消息,從而導致消息重復
- 消息未儲存:rabbitMQ服務器宕機,導致已經(jīng)在rabbit服務器內(nèi)的消息直接丟失
- 消費者重復消費:如果消費者和MQ都不記得曾經(jīng)消費過的消息,主動拉取或推送了舊的消息,導致重復消費,
三、防意外丟失
在這里,必須提前聲明一點:即消息意外丟失,因為rabbitMQ經(jīng)由轉換機,如果匹配不到任何隊列,是會主動丟棄該消息的,這種丟失屬于業(yè)務配置上的主動丟棄,不記在意外丟失中
1. 消息持久化
消息持久化需要在消息生產(chǎn)者修改代碼
String MESSAGE = "Hello, RabbitMQ!";
// 設置消息持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2) // deliveryMode=1代表不持久化,deliveryMode=2代表持久化
.build();
channel.basicPublish("", MESSAGE_QUEUE, properties, MESSAGE.getBytes("UTF-8"));
也可以直接使用內(nèi)置的properties
channel.basicPublish("", MESSAGE_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes("UTF-8"));
2. 隊列持久化
盡管我們上面已經(jīng)使用了消息持久化,但是這是不夠的,消息本身不會作為一個實體存在硬盤上,真正落在硬盤上的是隊列,及隊列中的消息。所以,要想保存消息,還得把消息所在的隊列持久化,因此需要在聲明隊列時,將其 durable 屬性設置為true
// 設置隊列持久化
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
注意,該屬性不可修改,如果要把一個隊列改成持久化,得先刪除,再創(chuàng)建才行
3. 發(fā)布確認
我們上面已經(jīng)成功把消息做了持久化,不過這并不能徹底避免消息丟失,比如在消息發(fā)布者發(fā)布消息的過程中,在消息成功持久化之前,rabbitMQ就崩潰了,此時消息仍然會丟失。因此,有必要執(zhí)行發(fā)布確認的操作
即消息發(fā)送后,MQ要對生產(chǎn)者發(fā)送消息確認,確認已經(jīng)持久化后,再進行發(fā)布確認
發(fā)布確認默認不開啟,如果要開啟,需要在channel上設置
Channel channel = connection.createChannel();
// 將信道設置為發(fā)布確認
channel.confirmSelect();
進行完該項設置后,還需要針對確認消息的類型,適當?shù)男薷陌l(fā)送方代碼。一般來說,發(fā)布確認有以下類型
3.1 簡單發(fā)布確認
即發(fā)送后,單條單條的消息是否被rabbitMQ服務器接受
String message = "Hello, RabbitMQ!";
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 設置簡單發(fā)布確認
channel.confirmSelect();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
if (channel.waitForConfirms()) {
System.out.println("Message published successfully.");
} else {
System.err.println("Failed to publish message.");
}
可以看到,這種方式其實采用的是發(fā)一條消息,確認一次,效率并不高。
3.2 批量發(fā)布確認
批量發(fā)布和簡單發(fā)布,在調用方法上并沒有區(qū)別,只是發(fā)送的消息,從發(fā)一條就等待確認一次,變成了發(fā)一批,才確認一次。
int MESSAGE_COUNT = 100;
String message = "Hello, RabbitMQ!";
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 設置批量發(fā)布確認
channel.confirmSelect();
for (int i = 0; i < MESSAGE_COUNT; i++) {
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
int outstandingConfirms = MESSAGE_COUNT;
while (outstandingConfirms > 0) {
outstandingConfirms -= channel.waitForConfirms();
}
System.out.println("All messages published successfully.");
此種方式,雖然仍然會同步阻塞,但從每條確認一次進化到批量確認一次,大大節(jié)約了網(wǎng)絡耗時。但是可能會出現(xiàn)一些消息發(fā)布成功,但是一些消息未成功的情況,不易進行排查和處理。
3.3 異步發(fā)布確認
異步確認則采用的另一種方案,通過給channel設置一個確認監(jiān)聽器,來異步的做確認,即將發(fā)布消息和確認處理放在不同的線程中處理
int MESSAGE_COUNT = 100;
String message = "Hello, RabbitMQ!";
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
Set<Long> failConfirmMessages = new HashSet<>();
// 異步發(fā)布確認
channel.confirmSelect();
// 需設置兩個監(jiān)聽器,前者為肯定確認,后者為否定確認
channel.addConfirmListener(new ConfirmCallback() {
@Override
// deliveryTag 代表 投遞消息的序號;multiple為true,則代表確認所有小于或等于當前消息deliveryTag的狀態(tài),為false,代表僅確認該條消息
public void handle(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
// 清除所有小于該序號的消息
confirmed.clear();
} else {
// 僅清除本條消息
outstandingConfirms.remove(deliveryTag);
}
}
}, new ConfirmCallback() {
@Override
public void handle(long deliveryTag, boolean multiple) throws IOException {
System.err.println("Failed to publish message.");
failConfirmMessages.add(deliveryTag);
}
});
for (int i = 0; i < MESSAGE_COUNT; i++) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
outstandingConfirms.put(nextSeqNo, message);
}
// 一段時間過后
......
// 看最后是否還有消息被確認丟失,此時可選擇是否要重新發(fā)送
if (failConfirmMessages .size() == 0 && outstandingConfirms.size() == 0) {
System.out.println("All messages published successfully.");
} else {
System.out.println("Some messages need republish.");
}
通過異步方式做確認,能提升性能,缺點是需要一些多線程的知識,實現(xiàn)難度較高。
4. 手動接收確認
如果第三點,是保證消息發(fā)送者到MQ服務器之間,消息不會丟失。那么同理,還需要保證MQ服務器到消費者間,消息不會丟失。
這時候,就需要手動接收確認了,即消費者得到消息后,先進行業(yè)務處理(或消息存儲),直到業(yè)務處理完成后。再告知rabbitMQ服務器,消息我收到了。從而避免了自動ack后,消費者宕機導致的消息未處理完就丟失的問題,其示例代碼如下
// 創(chuàng)建消費者對象
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
try {
// 處理消息
System.out.println("Received message: " + message);
// 顯式 ack 消息
channel.basicAck(envelope.getDeliveryTag(), false); // 第二個參數(shù)表示是否批量處理
} catch (Exception ex) {
// 處理消息時發(fā)生異常,拒絕消息并重新將其放回隊列中
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
};
// 開始消費消息,使用手動ack
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
PS:需要注意的是,手動ack可能帶來重復消費的問題,比如消息處理成功后,在執(zhí)行channel.basicAck時宕機,導致RabbitMQ服務器沒收到消息接收確認的信號,超時后會認為該消息未被接收
5. 死信隊列
在某些情況下(如手動ACK),如消費者在暫時無法處理該消息,RabbitMQ 可能會將消息重新放回隊列,但大量的重新放回會導致消息堆積,也是不可取的。
// 如下,消費者可以向rabbitMQ發(fā)送nack的消息,且設置requeue參數(shù)為false
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
為了避免這種情況,RabbitMQ 提供了死信隊列的功能。當消息因為某些原因不能被消費時,RabbitMQ 將消息放入死信隊列而不是重新放回隊列,防止消息丟失
四、防重復傳遞
上面一節(jié),我們?yōu)閞abbitMQ在消息傳遞過程中,各個節(jié)點都有防消息丟失的配置。這一節(jié),我們來說rabbitMQ為了防止一條消息重復傳遞而做的努力
1. 消息確認機制
上面,我們說了發(fā)布確認和接收確認。其實,不管是發(fā)布和接收,這都屬于消息確認機制的一種,而消息確認機制是AMQP協(xié)議所規(guī)定的。發(fā)布確認是為了防止丟失消息,接收確認則是為了防止重復消費,當消費者成功接收到消息并完成處理后,發(fā)送確認通知給 RabbitMQ,RabbitMQ 才會將該消息標記為已消費,防止重復傳遞
2. 冪等性校驗(需代碼實現(xiàn))
在消息生產(chǎn)者發(fā)送消息之前,消息可以被設置上全局唯一uuid,而消費者在消費前,則會判斷該uuid是否已經(jīng)消費過。
// 生產(chǎn)者發(fā)送消息之前,將消息標記為idempotent
// 通過設置 messageId 屬性為一個唯一值,即可標記該消息為冪等消息
String messageId = UUID.randomUUID().toString();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.messageId(messageId)
.build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());
// 消費者在處理消息之前,檢查該消息是否已經(jīng)被消費過
// 如果該消息已經(jīng)被消費過,則直接確認消息
String messageId = properties.getMessageId();
if (processedIds.contains(messageId)) {
channel.basicAck(envelope.getDeliveryTag(), false);
return;
}
// 處理消息,并將 messageId 加入已處理集合
// ...
processedIds.add(messageId);
以上代碼僅展示原理,實際上分布式高并發(fā)的情況下,uuid應該交由專門的服務器用雪花算法等方式去產(chǎn)生全局唯一的uuid。同樣消費者處的processedIds也會進行遠端存儲
五、不可靠場景的對策
現(xiàn)在,讓我們回頭來看看不可靠場景下,rabbitMQ和我們開發(fā)者能用什么對策解決文章來源:http://www.zghlxwxcb.cn/news/detail-577543.html
- 消息漏發(fā)送:生產(chǎn)者在發(fā)送消息時,如果不觀察RabbitMQ服務器的確認消息,可能導致有些消息在網(wǎng)絡中丟失而不自知
- 消息重復發(fā)送:如果生產(chǎn)者在發(fā)送消息時,由于網(wǎng)絡抖動或者其他原因,生產(chǎn)者無法從RabbitMQ收到消息確認,此時生產(chǎn)者會重發(fā)同樣一條消息,從而導致消息重復
- 消息未儲存:rabbitMQ服務器宕機,導致已經(jīng)在rabbit服務器內(nèi)的消息直接丟失
- 消費者重復消費:如果消費者不記得曾經(jīng)消費過的消息,主動拉取或被推送了舊的消息,導致重復消費,
場景 | 場景解釋 | 解決對策 |
---|---|---|
消息漏發(fā)送 | 生產(chǎn)者在發(fā)送消息時,如果不觀察RabbitMQ服務器的確認消息,可能導致有些消息在網(wǎng)絡中丟失而不自知 | 發(fā)布確認 |
消息重復發(fā)送 | 如果生產(chǎn)者在發(fā)送消息時,由于網(wǎng)絡抖動或者其他原因,生產(chǎn)者無法從RabbitMQ收到消息確認,此時生產(chǎn)者會重發(fā)同樣一條消息,從而導致消息重復 | 無策略 |
消息未儲存 | rabbitMQ服務器宕機,導致已經(jīng)在rabbit服務器內(nèi)的消息直接丟失 | 隊列、消息持久化 |
消費者重復消費 | 如果消費者和MQ都不記得曾經(jīng)消費過的消息,主動拉取或推送了舊的消息,導致重復消費 | 接受確認、冪等性校驗(代碼實現(xiàn)) |
六、總結
RabbitMQ 能保證消息可靠性嗎?答案是絕大部分情況可靠,但僅靠其自身機制無法做到100%。比如對于沒有收到發(fā)布確認信息,導致消息生產(chǎn)者重復傳遞這種場景就并沒有好的辦法,只能通過開發(fā)者額外代碼去解決,比如發(fā)消息帶全局唯一id,然后由消費者去做冪等性校驗。而針對更極端的場景,如RabbitMQ硬盤故障導致消息丟失,就得依托鏡像部署等手段去處理了
文章來源地址http://www.zghlxwxcb.cn/news/detail-577543.html
到了這里,關于RabbitMQ 能保證消息可靠性嗎的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!