消息丟失
消息從生產(chǎn)到消費(fèi),要經(jīng)歷三個(gè)階段,分別是生產(chǎn)、隊(duì)列轉(zhuǎn)發(fā)與消費(fèi),每個(gè)環(huán)節(jié)都可能丟失消息。
以下以RabbitMQ為例,來(lái)說(shuō)明各個(gè)階段會(huì)產(chǎn)生的問(wèn)題以及解決方式。在說(shuō)明之前,先回顧一下RabbitMQ的一個(gè)基本架構(gòu)圖
1. 生產(chǎn)者生產(chǎn)消息到RabbitMQ Server 消息丟失場(chǎng)景
1. 網(wǎng)絡(luò)問(wèn)題
外界環(huán)境問(wèn)題導(dǎo)致:發(fā)生網(wǎng)絡(luò)丟包、網(wǎng)絡(luò)故障等造成RabbitMQ Server端收不到消息,因?yàn)樯a(chǎn)環(huán)境的網(wǎng)絡(luò)是很復(fù)雜的,網(wǎng)絡(luò)抖動(dòng),丟包現(xiàn)象很常見(jiàn),下面會(huì)講到針對(duì)這個(gè)問(wèn)題是如何解決的。
2. 代碼層面,配置層面,考慮不全導(dǎo)致消息丟失
一般情況下,生產(chǎn)者使用Confirm模式投遞消息,如果方案不夠嚴(yán)謹(jǐn),比如RabbitMQ Server 接收消息失敗后會(huì)發(fā)送nack消息通知生產(chǎn)者,生產(chǎn)者監(jiān)聽(tīng)消息失敗或者沒(méi)做任何事情,消息存在丟失風(fēng)險(xiǎn);
生產(chǎn)者發(fā)送消息到exchange后,發(fā)送的路由和queue沒(méi)有綁定,消息會(huì)存在丟失情況,下面會(huì)講到具體的例子,保證意外情況的發(fā)生,即使發(fā)生,也在可控范圍內(nèi)。
解決方案:開(kāi)啟confirm模式
首先生產(chǎn)者通過(guò)調(diào)用channel.confirmSelect方法將信道設(shè)置為confirm模式,一旦信道進(jìn)入confirm模式,所有在該信道上面發(fā)布的消息都會(huì)被指派一個(gè)唯一的ID(從1開(kāi)始),一旦消息被投遞到所有匹配的隊(duì)列之后,RabbitMQ就會(huì)發(fā)送一個(gè)確認(rèn)(Basic.Ack)給生產(chǎn)者(包含消息的唯一deliveryTag和multiple參數(shù))。
其實(shí)Confirm模式有三種方式實(shí)現(xiàn):
串行confirm模式:producer每發(fā)送一條消息后,調(diào)用waitForConfirms()方法,等待broker端confirm,如果服務(wù)器端返回false或者在超時(shí)時(shí)間內(nèi)未返回,客戶端進(jìn)行消息重傳。
for(int i = 0;i<50;i++){
channel.basicPublish(
exchange, routingKey,
mandatory, immediate,
messageProperties,
message.getContent()
);
if (channel.waitForConfirms()) {
System.out.println("發(fā)送成功");
} else {
//發(fā)送失敗這里可進(jìn)行消息重新投遞的邏輯
System.out.println("發(fā)送失敗");
}
}
批量confirm模式:producer每發(fā)送一批消息后,調(diào)用waitForConfirms()方法,等待broker端confirm。
問(wèn)題:一旦出現(xiàn)confirm返回false或者超時(shí)的情況時(shí),客戶端需要將這一批次的消息全部重發(fā),這會(huì)帶來(lái)明顯的重復(fù)消息數(shù)量,并且當(dāng)消息經(jīng)常丟失時(shí),批量confirm性能應(yīng)該是不升反降的。
for(int i = 0;i<50;i++){
channel.basicPublish(
exchange, routingKey,
mandatory, immediate,
messageProperties,
message.getContent()
);
}
if (channel.waitForConfirms()) {
System.out.println("發(fā)送成功");
} else {
System.out.println("發(fā)送失敗");
}
異步confirm模式:提供一個(gè)回調(diào)方法,broker confirm了一條或者多條消息后producer端會(huì)回調(diào)這個(gè)方法。 我們分別來(lái)看看這三種confirm模式。
public void sendQueue(String appId, String handleUserId, List<String> deviceIds) {
List<Object> list = new ArrayList<>();
JSONObject jsonObject = new JSONObject();
jsonObject.put(DeviceConstant.COMMAND, DELETE);
jsonObject.put(DeviceConstant.BODY, list );
String topicExchange = RabbitMqConstant.EXCHANGE_TOPIC_DATA;
String routingKey = RabbitMqConstant.ROUTING_KEY_LOCAL_DATA;
//rabbitTemplate.convertAndSend(topicExchange, routingKey, jsonObject.toJSONString());
try {
Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);
channel.confirmSelect();
channel.basicPublish(topicExchange, routingKey, null, jsonObject.toJSONString().getBytes());
channel.addConfirmListener(new ConfirmListener() {
//消息失敗處理
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
log.info("sendQueue-ack-confirm-fail==>exchange:{}--routingkey:{}--deliveryTag:{}--multiple:{}--message:{}", topicExchange, routingKey, deliveryTag, multiple, jsonObject);
try {
Thread.sleep(3000l);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//重發(fā)
channel.basicPublish(topicExchange, routingKey, null, jsonObject.toJSONString().getBytes());
}
//消息成功處理
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
log.info("sendQueue-ack-confirm-successs==>exchange:{}--routingkey:{}--deliveryTag:{}--multiple:{}", topicExchange, routingKey, deliveryTag, multiple);
}
});
} catch (Exception e) {
log.error("sendQueue-ack-發(fā)送消息失敗:{}", ExceptionUtils.getStackTrace(e));
}
}
2. 隊(duì)列本身可能丟失消息
1. 消息未完全持久化,當(dāng)機(jī)器重啟后,消息會(huì)全部丟失,甚至Queue也不見(jiàn)了
僅僅持久化了Message,而Exchange,Queue沒(méi)有持久化,這個(gè)持久化是無(wú)效的。
解決方案:
交換機(jī)持久化:在聲明交換器時(shí)將 durable 設(shè)為 true。
// 參數(shù)1:交換機(jī)的名字
// 參數(shù)2:交換機(jī)的類(lèi)型,topic/direct/fanout/headers
// 參數(shù)3:是否持久化
channel.exchangeDeclare(exchangeName,exchangeType,true);
隊(duì)列持久化:在聲明隊(duì)列的時(shí)候把 durable 參數(shù)設(shè)置為true。
消息持久化:
想讓消息實(shí)現(xiàn)持久化需要在消息生產(chǎn)者推送消息的方法中修改參數(shù),MessageProperties.PERSISTENT_TEXT_PLAIN 添加這個(gè)屬性。
2. 單節(jié)點(diǎn)模式問(wèn)題,節(jié)點(diǎn)掛了,消息只存在當(dāng)前節(jié)點(diǎn)。硬盤(pán)壞了,那消息真的就無(wú)法恢復(fù)了
如果做了消息持久化方案,消息會(huì)持久化硬盤(pán),機(jī)器重啟后消息不會(huì)丟失;但是還有一個(gè)極端情況,這臺(tái)服務(wù)器磁盤(pán)突然壞了(公司遇到過(guò)磁盤(pán)問(wèn)題還是很多的),消息持久化不了,非高可用狀態(tài),這個(gè)模式生產(chǎn)環(huán)境慎重考慮。
3. 默認(rèn)的集群模式,消息只會(huì)存在與當(dāng)前節(jié)點(diǎn)中,并不會(huì)同步到其他節(jié)點(diǎn),其他節(jié)點(diǎn)也僅只會(huì)同步該節(jié)點(diǎn)的隊(duì)列結(jié)構(gòu)
上圖中的三個(gè)節(jié)點(diǎn)組成了一個(gè)RabbitMQ的集群。其中exchange是交換器,它的元數(shù)據(jù)信息(交換器名稱(chēng)、交換器屬性、綁定鍵等)在所有節(jié)點(diǎn)上都是一致的,而隊(duì)列中的實(shí)際消息數(shù)據(jù)則只會(huì)存在于所創(chuàng)建的那個(gè)節(jié)點(diǎn)上,其它節(jié)點(diǎn)只知道這個(gè)隊(duì)列的元數(shù)據(jù)信息和一個(gè)指向擁有這個(gè)消息的隊(duì)列的節(jié)點(diǎn)指針。
RabbitMQ集群會(huì)同步四種類(lèi)型的內(nèi)部元數(shù)據(jù):隊(duì)列元數(shù)據(jù)(隊(duì)列名和屬性)、交換器元數(shù)據(jù)(交換器名和屬性)、綁定鍵和虛擬機(jī)。在用戶訪問(wèn)其中任何一個(gè)rabbitmq節(jié)點(diǎn)時(shí)查詢(xún)到的queue、user、exchange和vhost等信息都是一致的。
那為什么普通集群只保持元數(shù)據(jù)同步,消息內(nèi)容卻沒(méi)同步呢?這里涉及到存儲(chǔ)空間和性能的問(wèn)題,如果保持每個(gè)節(jié)點(diǎn)都有一份消息,那會(huì)導(dǎo)致每個(gè)節(jié)點(diǎn)的空間都非常大,消息的積壓量會(huì)增加且無(wú)法通過(guò)擴(kuò)容節(jié)點(diǎn)解決積壓?jiǎn)栴}。另外如果要使每個(gè)節(jié)點(diǎn)存儲(chǔ)一份消息,對(duì)于持久化的消息而言,內(nèi)存和磁盤(pán)同步復(fù)制機(jī)制會(huì)導(dǎo)致性能受到很大影響。
工作原理
上圖中的三個(gè)節(jié)點(diǎn),其中節(jié)點(diǎn)1是數(shù)據(jù)節(jié)點(diǎn)(即實(shí)際存儲(chǔ)消息內(nèi)容的節(jié)點(diǎn))。
如果客戶端(生產(chǎn)者或消費(fèi)者)與節(jié)點(diǎn)1建立了連接,那么關(guān)于消息的收發(fā)就只在節(jié)點(diǎn)1上進(jìn)行(可以理解為簡(jiǎn)單的單機(jī)模式);
如果客戶端(消費(fèi)者)是與節(jié)點(diǎn)2或者節(jié)點(diǎn)3建立的連接,此時(shí)由于數(shù)據(jù)在節(jié)點(diǎn)1上,那么節(jié)點(diǎn)2或節(jié)點(diǎn)3只會(huì)起到一個(gè)消息轉(zhuǎn)發(fā)的作用,例如此客戶端是消費(fèi)者,那么消息將由節(jié)點(diǎn)2或節(jié)點(diǎn)3從節(jié)點(diǎn)1中拉取,再經(jīng)自身節(jié)點(diǎn)路由給消費(fèi)者端;
如果客戶端(生產(chǎn)者),那么消息先發(fā)給節(jié)點(diǎn)2或3,再路由到節(jié)點(diǎn)1的隊(duì)列中存儲(chǔ)。
一個(gè)節(jié)點(diǎn)可以是磁盤(pán)節(jié)點(diǎn)和內(nèi)存節(jié)點(diǎn),磁盤(pán)節(jié)點(diǎn)將元數(shù)據(jù)存儲(chǔ)在磁盤(pán),內(nèi)存節(jié)點(diǎn)將元數(shù)據(jù)存儲(chǔ)在內(nèi)存。
這里需要注意的是,內(nèi)存節(jié)點(diǎn)只是將元數(shù)據(jù)(比如隊(duì)列名和屬性、交換器名和屬性和虛擬機(jī)等)存儲(chǔ)在內(nèi)存,因此在對(duì)資源管理(創(chuàng)建和刪除隊(duì)列、交換器和虛擬機(jī)等)時(shí)的性能有所提升,但是對(duì)發(fā)布和訂閱的消息速率并沒(méi)有提升。
RabbitMQ要求集群中至少有一個(gè)磁盤(pán)節(jié)點(diǎn),當(dāng)節(jié)點(diǎn)加入和離開(kāi)集群時(shí),必須通知磁盤(pán)節(jié)點(diǎn)(如果集群中唯一的磁盤(pán)節(jié)點(diǎn)崩潰了,則不能進(jìn)行創(chuàng)建隊(duì)列、創(chuàng)建交換器、創(chuàng)建綁定、添加用戶、更改權(quán)限、添加和刪除集群節(jié)點(diǎn))。如果唯一磁盤(pán)的磁盤(pán)節(jié)點(diǎn)崩潰,集群是可以保持運(yùn)行的,但不能更改任何東西。因此建議在集群中設(shè)置兩個(gè)磁盤(pán)節(jié)點(diǎn),只要一個(gè)即可正常操作??傊跓o(wú)法得知它們?nèi)绾问褂貌拍鼙WC最佳時(shí)建議最好都用磁盤(pán)節(jié)點(diǎn)。
總結(jié):普通集群模式并不能保證服務(wù)的高可用,因?yàn)槠渌?jié)點(diǎn)只復(fù)制了隊(duì)列和交換器等元數(shù)據(jù)信息,并沒(méi)有將真實(shí)的消息內(nèi)容復(fù)制到自身節(jié)點(diǎn)。該部署模式只解決了單節(jié)點(diǎn)的壓力問(wèn)題,但是當(dāng)數(shù)據(jù)節(jié)點(diǎn)宕機(jī)之后便無(wú)法提供服務(wù)了,消息的路由線路受到了阻隔,客戶端則無(wú)法繼續(xù)與服務(wù)交互。為了解決這個(gè)問(wèn)題,就需要此消息數(shù)據(jù)也能被復(fù)制到集群的其它節(jié)點(diǎn)中,因此rabbitmq引入了鏡像部署模式。
解決方案:鏡像部署,消息會(huì)同步到其他節(jié)點(diǎn)上,可以設(shè)置同步的節(jié)點(diǎn)個(gè)數(shù),但吞吐量會(huì)下降。
Rabbitmq的鏡像集群實(shí)際上是在普通集群的基礎(chǔ)上增加了策略,它需要先按照普通集群的方式進(jìn)行部署,部署完成之后再通過(guò)創(chuàng)建鏡像隊(duì)列的策略實(shí)現(xiàn)主備節(jié)點(diǎn)消息同步。也就是說(shuō),每個(gè)備用節(jié)點(diǎn)都有和主節(jié)點(diǎn)一樣的隊(duì)列,這個(gè)隊(duì)列是由主節(jié)點(diǎn)通過(guò)創(chuàng)建鏡像隊(duì)列所產(chǎn)生的,且這些備用節(jié)點(diǎn)能及時(shí)的同步主節(jié)點(diǎn)中隊(duì)列的入隊(duì)消息。當(dāng)消息設(shè)置了持久化時(shí),每個(gè)節(jié)點(diǎn)都有屬于自己的本地消息持久化存儲(chǔ)機(jī)制。當(dāng)消息入隊(duì)和出隊(duì)時(shí),所有關(guān)于對(duì)主節(jié)點(diǎn)的操作都會(huì)同步給備用節(jié)點(diǎn)用來(lái)更新。此集群模式在主節(jié)點(diǎn)宕機(jī)之后備用節(jié)點(diǎn)所保留的消息與主節(jié)點(diǎn)完全一致,即可實(shí)現(xiàn)高可用。
工作原理
上圖就是鏡像集群模式的實(shí)現(xiàn)流程,其中有三個(gè)節(jié)點(diǎn)(主節(jié)點(diǎn)、備節(jié)點(diǎn)1、備節(jié)點(diǎn)2)和三個(gè)鏡像隊(duì)列queue(其中備節(jié)點(diǎn)上的queue是由主節(jié)點(diǎn)鏡像生成的)。要注意的是,這里的主節(jié)點(diǎn)和備節(jié)點(diǎn)是針對(duì)某個(gè)隊(duì)列而言的,并不能認(rèn)為一個(gè)節(jié)點(diǎn)作為了所有隊(duì)列的主節(jié)點(diǎn),因?yàn)樵谡麄€(gè)鏡像集群模式下,會(huì)存在多個(gè)節(jié)點(diǎn)和多個(gè)隊(duì)列,這時(shí)候任何一個(gè)節(jié)點(diǎn)都能作為某一個(gè)隊(duì)列的鏡像主節(jié)點(diǎn),其它節(jié)點(diǎn)則成了鏡像備節(jié)點(diǎn)(例如:有A、B、C三個(gè)節(jié)點(diǎn)和Q1、Q2、Q3三個(gè)隊(duì)列,如果A作為Q1的鏡像主節(jié)點(diǎn),那么B和C就作為了Q1的鏡像備節(jié)點(diǎn),在此基礎(chǔ)上,如果B作為了Q2的鏡像主節(jié)點(diǎn),那么A和C就是Q2的鏡像備節(jié)點(diǎn))。
每一個(gè)隊(duì)列都是由兩部分組成的,一個(gè)是queue,用來(lái)接收消息和發(fā)布消息,另外還有一個(gè)BackingQueue,它是用來(lái)做本地消息持久化處理。客戶端發(fā)送給主節(jié)點(diǎn)隊(duì)列的消息和ack應(yīng)答都將會(huì)同步到其它備節(jié)點(diǎn)上。
所有關(guān)于鏡像主隊(duì)列(mirror_queue_master)的操作,都會(huì)通過(guò)組播GM的方式同步到其它備用節(jié)點(diǎn)上,這里的GM負(fù)責(zé)消息的廣播,mirror_queue_slave則負(fù)責(zé)回調(diào)處理(更新本次同步內(nèi)容),因此當(dāng)消息發(fā)送給備用節(jié)點(diǎn)時(shí),則由mirror_queue_slave來(lái)做實(shí)際處理,將消息存儲(chǔ)在queue中,如果是持久化消息則同時(shí)存儲(chǔ)在BackingQueue中。master上的回調(diào)則由coordinator來(lái)處理(發(fā)布本次同步內(nèi)容)。在主節(jié)點(diǎn)中,BackingQueue的存儲(chǔ)則是由Queue進(jìn)行調(diào)用。對(duì)于生產(chǎn)者而言,消息發(fā)送給queue之后,接著調(diào)用mirror_queue_master進(jìn)行持久化處理,之后再通過(guò)GM廣播發(fā)送本次同步消息給備用節(jié)點(diǎn),備用節(jié)點(diǎn)通過(guò)回調(diào)mirror_queue_slave同步本次消息到queue和BackingQueue;對(duì)于消費(fèi)者而言,從queue中獲取消息之后,消息隊(duì)列會(huì)等待消費(fèi)者的ack應(yīng)答,ack應(yīng)答收到之后刪除queue和BackingQueue中的該條消息,并將本次ack內(nèi)容通過(guò)GM廣播發(fā)送給備用節(jié)點(diǎn)同步本次操作。如果slave宕機(jī)了,那對(duì)于客戶端的服務(wù)提供將不會(huì)有任何影響。如果master宕機(jī)了,則其它備用節(jié)點(diǎn)就提升為master繼續(xù)服務(wù)消息不會(huì)丟失。那這其中多個(gè)備用節(jié)點(diǎn)是如何選擇其中一個(gè)來(lái)作為master的呢?這里通過(guò)選取出“最年長(zhǎng)的”節(jié)點(diǎn)作為master,因?yàn)檫@個(gè)備用節(jié)點(diǎn)相對(duì)于其它節(jié)點(diǎn)而言是同步時(shí)間最長(zhǎng)、同步狀態(tài)最好的一個(gè)節(jié)點(diǎn),但如果存在沒(méi)有任何一個(gè)slave與master完全同步的情況,那么master中未同步的消息將會(huì)丟失。
GM
GM模塊實(shí)現(xiàn)的一種可靠的組播通訊協(xié)議,該協(xié)議能夠保證組播消息的原子性,即保證組中活著的節(jié)點(diǎn)要么都收到消息要么都收不到。
它的實(shí)現(xiàn)大致為:將所有的節(jié)點(diǎn)形成一個(gè)循環(huán)鏈表,每個(gè)節(jié)點(diǎn)都會(huì)監(jiān)控位于自己左右兩邊的節(jié)點(diǎn),當(dāng)有節(jié)點(diǎn)新增時(shí),相鄰的節(jié)點(diǎn)保證當(dāng)前廣播的消息會(huì)復(fù)制到新的節(jié)點(diǎn)上;當(dāng)有節(jié)點(diǎn)失效時(shí),相鄰的節(jié)點(diǎn)會(huì)接管保證本次廣播的消息會(huì)復(fù)制到下一個(gè)節(jié)點(diǎn)。在master節(jié)點(diǎn)和slave節(jié)點(diǎn)上的這些gm形成一個(gè)group,group(gm_group)的信息會(huì)記錄在mnesia中。不同的鏡像隊(duì)列形成不同的group。消息從master節(jié)點(diǎn)對(duì)應(yīng)的gm發(fā)出后,順著鏈表依次傳送到所有的節(jié)點(diǎn),由于所有節(jié)點(diǎn)組成一個(gè)循環(huán)鏈表,master節(jié)點(diǎn)對(duì)應(yīng)的gm最終會(huì)收到自己發(fā)送的消息,這個(gè)時(shí)候master節(jié)點(diǎn)就知道消息已經(jīng)復(fù)制到所有的slave節(jié)點(diǎn)了。另外需要注意的是,每一個(gè)新節(jié)點(diǎn)的加入都會(huì)先清空這個(gè)節(jié)點(diǎn)原有數(shù)據(jù),下圖是新節(jié)點(diǎn)加入集群的一個(gè)簡(jiǎn)單模型:
消息的同步:
將新節(jié)點(diǎn)加入已存在的鏡像隊(duì)列,在默認(rèn)情況下ha-sync-mode=manual,鏡像隊(duì)列中的消息不會(huì)主動(dòng)同步到新節(jié)點(diǎn),除非顯式調(diào)用同步命令。當(dāng)調(diào)用同步命令后,隊(duì)列開(kāi)始阻塞,無(wú)法對(duì)其進(jìn)行操作,直到同步完畢。
總結(jié)
鏡像集群模式通過(guò)從主節(jié)點(diǎn)拷貝消息的方式使所有節(jié)點(diǎn)都能保留一份數(shù)據(jù),一旦主節(jié)點(diǎn)崩潰,備節(jié)點(diǎn)就能完成替換從而繼續(xù)對(duì)外提供服務(wù)。這解決了節(jié)點(diǎn)宕機(jī)帶來(lái)的困擾,提高了服務(wù)穩(wěn)定性,但是它并不能實(shí)現(xiàn)負(fù)載均衡,因?yàn)槊總€(gè)操作都要在所有節(jié)點(diǎn)做一遍,這無(wú)疑降低了系統(tǒng)性能。再者當(dāng)消息大量入隊(duì)時(shí),集群內(nèi)部的網(wǎng)絡(luò)帶寬會(huì)因此時(shí)的同步通訊被大大消耗掉,因此對(duì)于可靠性要求高、性能要求不高且消息量并不多的場(chǎng)景比較適用。如果對(duì)高可用和負(fù)載均衡都有要求的場(chǎng)景則需要結(jié)合HAProxy(實(shí)現(xiàn)節(jié)點(diǎn)間負(fù)載均衡)和keepalived(實(shí)現(xiàn)HAproxy的主備模式)中間件搭配使用,下面我們將對(duì)這種場(chǎng)景的部署進(jìn)行全流程概述。
3. 消費(fèi)端可能丟失消息
消費(fèi)端采用自動(dòng)ack機(jī)制,還沒(méi)有處理完畢,消費(fèi)端宕機(jī)。
消費(fèi)者完成一個(gè)任務(wù)可能需要一段時(shí)間,如果其中一個(gè)消費(fèi)者處理一個(gè)長(zhǎng)的任務(wù)并僅只完成了部分突然它掛掉了,會(huì)導(dǎo)致消息丟失。因?yàn)镽abbitMQ 一旦向消費(fèi)者傳遞了一條消息,便立即將該消息標(biāo)記為刪除。
解決方案:改為手動(dòng)ack,當(dāng)消息正確處理完成后,再通知mq。消費(fèi)端處理消息異常后,回傳nack,這樣mq會(huì)把這條消息投遞到另外一個(gè)消費(fèi)端上。
消息應(yīng)答的方法
- Channel.basicAck(long deliveryTag, boolean multiple):用于肯定確認(rèn)。RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了
deliveryTag:該消息的index
multiple:是否批量.。true:將一次性ack所有小于deliveryTag的消息。
multiple參數(shù)解析:true
代表批量應(yīng)答
比如說(shuō) channel 上有傳送 tag 的消息 5,6,7,8 當(dāng)前 tag 是 8
那么此時(shí) 5-8 的消息都會(huì)被確認(rèn)收到消息應(yīng)答false
同上面相比
只會(huì)應(yīng)答 tag=8 的消息 5,6,7 這三個(gè)消息依然不會(huì)被確認(rèn)收到消息應(yīng)答
- Channel.void basicNack(long deliveryTag, boolean multiple, boolean requeue) :用于否定確認(rèn)
deliveryTag:該消息的index。
multiple:是否批量。true:將一次性拒絕所有小于deliveryTag的消息。
requeue:被拒絕的是否重新入隊(duì)列。
- Channel.basicReject(long deliveryTag, boolean requeue):用于否定確認(rèn) (推薦使用)
deliveryTag:該消息的index。
requeue:被拒絕的是否重新入隊(duì)列。
basicNack()和basicReject()的區(qū)別在于:basicNack()可以批量拒絕,basicReject()一次只能拒接一條消息。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-690488.html
demo
@RabbitHandler
@RabbitListener(queues = RabbitMqConstant.xxx , concurrency = "1-1")
public void receiveQueueCommonLocal(Channel channel, Message message) {
String messageBody = new String(message.getBody());
//System.out.println("messageBody===>"+messageBody);
try {
//todo 業(yè)務(wù)邏輯
/*手動(dòng)確認(rèn)成功
* 參數(shù):
* deliveryTag:該消息的index
* multiple:是否批量處理.true:將一次性ack所有小于deliveryTag的消息
* **/
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (Exception e) {
e.printStackTrace();
log.error("receiveQueueCommonLocal=====>ERROR:{}--josn:{}", ExceptionUtil.getMessage(e), messageBody);
try {
//手動(dòng)確認(rèn)回滾 拒絕deliveryTag對(duì)應(yīng)的消息,第二個(gè)參數(shù)是否requeue,true則重新入隊(duì)列,否則丟棄或者進(jìn)入死信隊(duì)列。
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
文章來(lái)源:https://blog.51cto.com/u_15840568/5784352
https://zhuanlan.zhihu.com/p/79545722
集群:https://blog.csdn.net/weixin_43498985/article/details/122185972
消費(fèi)者ack:https://blog.csdn.net/m0_64337991/article/details/122755297
https://zhuanlan.zhihu.com/p/483289106?utm_id=0文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-690488.html
到了這里,關(guān)于RabbitMQ如何避免丟失消息的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!