RabbitMq消息丟失原因及其解決方案
一、RabbitMQ消息丟失原因
我們首先了解下一條消息從生產(chǎn)到消費(fèi)的整個(gè)流程如下:
生產(chǎn)-->MQ Broker --> 消費(fèi)。所以這三個(gè)環(huán)節(jié)都有丟失消息的可能。
1.1、生產(chǎn)者丟失消息
生產(chǎn)者將數(shù)據(jù)發(fā)送到rabbitmq的時(shí)候,可能因?yàn)榫W(wǎng)絡(luò)問題導(dǎo)致數(shù)據(jù)就在半路給搞丟了。
1.使用事務(wù)(性能差)
? RabbitMQ 客戶端中與事務(wù)機(jī)制相關(guān)的方法有三個(gè): channel.txSelect 、channel.txCommit 和 channel.txRollback。channel.txSelect 用于將當(dāng)前的信道設(shè)置成事務(wù)模式,channel.txCommit 用于提交事務(wù),channel.txRollback 用于事務(wù)回滾。在通過 channel.txSelect 方法開啟事務(wù)之后,我們便可以發(fā)布消息給 RabbitMQ 了,如果事務(wù)提交成功,則消息一定到達(dá)了 RabbitMQ 中,如果在事務(wù)提交執(zhí)行之前由于 RabbitMQ異常崩潰或者其他原因拋出異常,這個(gè)時(shí)候我們便可以將其捕獲,進(jìn)而通過執(zhí)行channel.txRollback 方法來實(shí)現(xiàn)事務(wù)回滾。注意這里的 RabbitMQ 中的事務(wù)機(jī)制與大多數(shù)數(shù)據(jù)庫中的事務(wù)概念并不相同,需要注意區(qū)分。
? 事務(wù)確實(shí)能夠解決消息發(fā)送方和 RabbitMQ 之間消息確認(rèn)的問題,只有消息成功被RabbitMQ 接收,事務(wù)才能提交成功,否則便可在捕獲異常之后進(jìn)行事務(wù)回滾,與此同時(shí)可以進(jìn)行消息重發(fā)。但是使用事務(wù)機(jī)制會(huì)“吸干”RabbitMQ 的性能。
2.發(fā)送回執(zhí)確認(rèn)(推薦)
? 生產(chǎn)者將信道設(shè)置成 confirm(確認(rèn))模式,一旦信道進(jìn)入 confirm 模式,所有在該信道上面發(fā)布的消息都會(huì)被指派一個(gè)唯一的 ID(從 1 開始),一旦消息被投遞到所有匹配的隊(duì)列之后,RabbitMQ 就會(huì)發(fā)送一個(gè)確認(rèn)(Basic.Ack)給生產(chǎn)者(包含消息的唯一 ID),這就使得生產(chǎn)者知曉消息已經(jīng)正確到達(dá)了目的地了。如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會(huì)在消息寫入磁盤之后發(fā)出。RabbitMQ 回傳給生產(chǎn)者的確認(rèn)消息中的 deliveryTag 包含了確認(rèn)消息的序號(hào),此外 RabbitMQ 也可以設(shè)置 channel.basicAck 方法中的 multiple 參數(shù),表示到這個(gè)序號(hào)之前的所有消息都已經(jīng)得到了處理,注意辨別這里的確認(rèn)和消費(fèi)時(shí)候的確認(rèn)之間的異同。
注意要點(diǎn):
- 事務(wù)機(jī)制和 publisher confirm 機(jī)制兩者是互斥的,不能共存。
- 事務(wù)機(jī)制和 publisher confirm 機(jī)制確保的是消息能夠正確地發(fā)送至 RabbitMQ,這里的“發(fā)送至 RabbitMQ”的含義是指消息被正確地發(fā)往至 RabbitMQ 的交換器,如果此交換器沒有匹配的隊(duì)列,那么消息也會(huì)丟失。
1.2、RabbitMQ弄丟了數(shù)據(jù)-開啟RabbitMQ的數(shù)據(jù)持久化
? 為了防止rabbitmq自己弄丟了數(shù)據(jù),這個(gè)你必須開啟rabbitmq的持久化,就是消息寫入之后會(huì)持久化到磁盤,哪怕是rabbitmq自己掛了,恢復(fù)之后會(huì)自動(dòng)讀取之前存儲(chǔ)的數(shù)據(jù),一般數(shù)據(jù)不會(huì)丟。除非極其罕見的是,rabbitmq還沒持久化,自己就掛了,可能導(dǎo)致少量數(shù)據(jù)會(huì)丟失的,但是這個(gè)概率較小。
設(shè)置持久化有兩個(gè)步驟,第一個(gè)是創(chuàng)建queue的時(shí)候?qū)⑵湓O(shè)置為持久化的,這樣就可以保證rabbitmq持久化queue的元數(shù)據(jù),但是不會(huì)持久化queue里的數(shù)據(jù);第二個(gè)是發(fā)送消息的時(shí)候?qū)⑾⒌膁eliveryMode設(shè)置為2,就是將消息設(shè)置為持久化的,此時(shí)rabbitmq就會(huì)將消息持久化到磁盤上去。必須要同時(shí)設(shè)置這兩個(gè)持久化才行,rabbitmq哪怕是掛了,再次重啟,也會(huì)從磁盤上重啟恢復(fù)queue,恢復(fù)這個(gè)queue里的數(shù)據(jù)。
而且持久化可以跟生產(chǎn)者那邊的confirm機(jī)制配合起來,只有消息被持久化到磁盤之后,才會(huì)通知生產(chǎn)者ack了,所以哪怕是在持久化到磁盤之前,rabbitmq掛了,數(shù)據(jù)丟了,生產(chǎn)者收不到ack,你也是可以自己重發(fā)的。
若生產(chǎn)者那邊的confirm機(jī)制未開啟的情況下,哪怕是你給rabbitmq開啟了持久化機(jī)制,也有一種可能,就是這個(gè)消息寫到了rabbitmq中,但是還沒來得及持久化到磁盤上,結(jié)果不巧,此時(shí)rabbitmq掛了,就會(huì)導(dǎo)致內(nèi)存里的一點(diǎn)點(diǎn)數(shù)據(jù)會(huì)丟失。
1.3、消費(fèi)端弄丟了數(shù)據(jù)
? 為了保證消息從隊(duì)列可靠地達(dá)到消費(fèi)者,RabbitMQ 提供了消息確認(rèn)機(jī)制(message acknowledgement)。消費(fèi)者在訂閱隊(duì)列時(shí),可以指定 autoAck 參數(shù),當(dāng) autoAck 等于 false時(shí),RabbitMQ 會(huì)等待消費(fèi)者顯式地回復(fù)確認(rèn)信號(hào)后才從內(nèi)存(或者磁盤)中移去消息(實(shí)質(zhì)上是先打上刪除標(biāo)記,之后再刪除)。當(dāng) autoAck 等于 true 時(shí),RabbitMQ 會(huì)自動(dòng)把發(fā)送出去的消息置為確認(rèn),然后從內(nèi)存(或者磁盤)中刪除,而不管消費(fèi)者是否真正地消費(fèi)到了這些消息。
? 采用消息確認(rèn)機(jī)制后,只要設(shè)置 autoAck 參數(shù)為 false,消費(fèi)者就有足夠的時(shí)間處理消息(任務(wù)),不用擔(dān)心處理消息過程中消費(fèi)者進(jìn)程掛掉后消息丟失的問題,因?yàn)?RabbitMQ 會(huì)一直等待持有消息直到消費(fèi)者顯式調(diào)用 Basic.Ack 命令為止。
? 這里我們可以通過RabbtiMQ 的 Web 管理平臺(tái)上可以看到當(dāng)前隊(duì)列中的“Ready”狀態(tài)和“Unacknowledged”狀態(tài)的消息數(shù),分別對(duì)應(yīng)上文中的等待投遞給消費(fèi)者的消息數(shù)和已經(jīng)投遞給消費(fèi)者但是未收到確認(rèn)信號(hào)的消息數(shù)。
二、模擬消費(fèi)端丟失數(shù)據(jù)
2.1、初始化工程
這里不做說了,可以看我《SpringBoot集成RabbitMQ》中的相關(guān)步驟。這里我們直接寫生產(chǎn)者和消費(fèi)者。
2.2、編寫生產(chǎn)者
我們?cè)谥暗墓こ躺线M(jìn)行相應(yīng)的修改,通過http方式生產(chǎn)消息。我們直接在IndexController
中新增simpleProducer方法;
內(nèi)容如下:
/**
* @Author julyWhj
* @Description IndexController$
* @Date 2021/10/7 10:18 上午
**/
@RestController
public class IndexController {
@Autowired
private SimpleProducer simpleProducer;
@GetMapping("/index")
public String index() {
return "Hello RabbitMQ";
}
@GetMapping("/producer")
public String simpleProducer() {
for (int i = 0; i < 10; i++) {
simpleProducer.sendOrderMessage(Simple.builder()
.createTime(new Date())
.name("JulyWhj")
.age(i)
.no("ID-000" + i)
.phone("138XXXXXXXX")
.build());
}
return "消息發(fā)送成功";
}
}
我們測試下生產(chǎn)者,啟動(dòng)服務(wù),在瀏覽器中訪問"http://127.0.0.1:8080/producer"。
注意:
??這里我們使用Debug方式啟動(dòng),在消費(fèi)者中加上斷點(diǎn),因?yàn)槲覀兊纳a(chǎn)者和消費(fèi)者在同一個(gè)服務(wù)中,生產(chǎn)的消息會(huì)被消費(fèi)者直接消費(fèi)掉。所以我們加上斷點(diǎn),阻止消費(fèi)者消費(fèi)消息。
可以看到消息瀏覽器返回消息發(fā)送成功。
我們通過RabbitMQ管理平臺(tái)看下隊(duì)列中消息數(shù)量:
通過管理平臺(tái)我們可以看到有10條消息沒消費(fèi)。
2.3、模擬消費(fèi)者斷電
我們可以看到隊(duì)列中有10條消息沒有消費(fèi),我們通過Debug讓消費(fèi)者消費(fèi)一條消息。
消費(fèi)一條后,我們首先查看下隊(duì)列中消息數(shù)量:
可以看到,隊(duì)列中還有9條消息,這里我們停止消費(fèi)者服務(wù)。
這里我們看到,隊(duì)列中消息條數(shù)為0,出現(xiàn)了消息丟失的問題。
? 因?yàn)檫@里我們使用了消費(fèi)者自動(dòng)消息確認(rèn)機(jī)制。當(dāng) autoAck 等于 true 時(shí),RabbitMQ 會(huì)自動(dòng)把發(fā)送出去的消息置為確認(rèn),然后從內(nèi)存(或者磁盤)中刪除,而不管消費(fèi)者是否真正地消費(fèi)到了這些消息。
2.4、使用手動(dòng)消息確認(rèn)機(jī)制
? 我們通過配置spring.rabbitmq.listener.simple.acknowledge-mode=manual
屬性,開啟手動(dòng)ack模式。默認(rèn)配置auto自動(dòng)確認(rèn)。
在application配置文件中新增如下配置:
spring.rabbitmq.host=host
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
# 開啟手動(dòng)ack確認(rèn)消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
這里我們?cè)跍y試下,重新啟動(dòng)服務(wù),
我們可以看到,這里控制臺(tái)已經(jīng)消費(fèi)了10條消息,但是我們沒有確認(rèn)消費(fèi)消息,我們可以看下隊(duì)列中是否依然存在10條數(shù)據(jù)。
我們可以看到隊(duì)列中依然存在10條消息。如果我們重啟服務(wù),模擬斷電,消息依然存在不會(huì)丟失。
我們將手動(dòng)ack代碼注釋去掉,我們可以看到已經(jīng)消費(fèi)了一條消息。我們停止服務(wù),模擬斷電效果。
我們查看下消息隊(duì)列中的數(shù)據(jù),是9條,而非0。這樣消費(fèi)端不會(huì)因斷電等情況導(dǎo)致消費(fèi)端數(shù)據(jù)丟失。
三、RabbitMQ消息可靠性保障策略
1、生產(chǎn)者開啟消息確認(rèn)機(jī)制
2、消息隊(duì)列數(shù)據(jù)持久化
3、消費(fèi)者手動(dòng)ack
4、生產(chǎn)者消息記錄+定期補(bǔ)償機(jī)制
5、服務(wù)冪等處理
6、消息擠壓處理等文章來源:http://www.zghlxwxcb.cn/news/detail-717981.html
這里的內(nèi)容我們?cè)诤罄m(xù)文章中單獨(dú)展開分析,這里只是給大家提供下需要注意的事項(xiàng)。包括TTL和DLX隊(duì)列的使用等。文章來源地址http://www.zghlxwxcb.cn/news/detail-717981.html
到了這里,關(guān)于RabbitMq消息丟失原因及其解決方案的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!