目錄
1、RabbitMQ消息丟失的可能性
1.1 生產(chǎn)者消息丟失場景
1.2 MQ導(dǎo)致消息丟失
1.3 消費(fèi)者丟失
2、如何保證生產(chǎn)者消息的可靠性
2.1 生產(chǎn)者重試機(jī)制
2.2 生產(chǎn)者確認(rèn)機(jī)制
2.3 實(shí)現(xiàn)生產(chǎn)者確認(rèn)
2.3.1 配置yml開啟生產(chǎn)者確認(rèn)
2.3.2 定義ReturnCallback
2.3.3 定義ConfirmCallback
3、MQ消息可靠性
3.1 Exchange交換機(jī)持久化
3.2 Queues隊(duì)列持久化
3.3 消息的持久化
3.4 LazyQueue
4、消費(fèi)者的可靠性
4.1 消費(fèi)者確認(rèn)機(jī)制
4.1.1首先我們來測試一下acknowledge-mode: none不做處理的場景:
4.1.2 測試acknowledge-mode: auto自動處理情況下
4.1.2.1 消費(fèi)者拋出消息異常
4.1.2.2 消費(fèi)者拋出業(yè)務(wù)異常
5、消費(fèi)者失敗重試機(jī)制
5.1 消費(fèi)者失敗重試機(jī)制
5.2 失敗處理策略
1、RabbitMQ消息丟失的可能性
如下圖是消息從生產(chǎn)者到消費(fèi)者的關(guān)系圖。通過圖片我們可以分析,消息從生產(chǎn)者,MQ,消費(fèi)者這三個環(huán)節(jié)任一個都有可能丟失。那么下面我們就從這三點(diǎn)進(jìn)行分析。
1.1 生產(chǎn)者消息丟失場景
-
生產(chǎn)者發(fā)送消息時連接MQ失敗
-
生產(chǎn)發(fā)生消息到達(dá)MQ后未找到Exchange
-
生產(chǎn)者發(fā)送消息到達(dá)MQ的 Exchange 后,未找到合適的 Queue
-
消息到達(dá)MQ后,處理消息的進(jìn)程發(fā)生異常
1.2 MQ導(dǎo)致消息丟失
-
消息到達(dá)MQ,保存到隊(duì)列后,尚未消費(fèi)就突然宕機(jī)
1.3 消費(fèi)者丟失
-
消息接收后尚未處理突然宕機(jī)
-
消息接收后處理過程中拋出異常
綜上,我們要解決消息丟失問題,保證MQ的可靠性,就必須從3個方面入手:
-
確保生產(chǎn)者一定把消息發(fā)送到MQ
-
確保MQ不會將消息弄丟
-
確保消費(fèi)者一定要處理消息
2、如何保證生產(chǎn)者消息的可靠性
2.1 生產(chǎn)者重試機(jī)制
生產(chǎn)者發(fā)送消息時,出現(xiàn)了網(wǎng)絡(luò)故障,導(dǎo)致與MQ的連接中斷。為了解決這個問題,SpringAMQP提供的消息發(fā)送時的重試機(jī)制。當(dāng) RabbitTemplate與MQ連接超時后,多次重試。
我們可以在生產(chǎn)者對應(yīng)的yml配置中配置:
spring:
rabbitmq:
?connection-timeout: 1s # 設(shè)置MQ的連接超時時間
?template:
??retry:
???enabled: true # 開啟超時重試機(jī)制
???initial-interval: 1000ms # 失敗后的初始等待時間
???multiplier: 2 # 失敗后下次的等待時長倍數(shù),下次等待時長 = initial-interval *multiplier
???max-attempts: 3 # 最大重試次數(shù)
我這邊故意把URL地址寫錯:
spring:
rabbitmq:
? host: 601.204.203.40
我們可以發(fā)現(xiàn)總共重試了3次。如圖所示:
但是SpringAMQP提供的重試機(jī)制是阻塞式的重試,也就是說多次重試等待的過程中,當(dāng)前線程是被阻塞的。如果對于業(yè)務(wù)性能有要求,建議禁用重試機(jī)制。
2.2 生產(chǎn)者確認(rèn)機(jī)制
其實(shí)一般我們生產(chǎn)者與MQ網(wǎng)絡(luò)連接比較穩(wěn)定,所以基本上不用考慮第一種場景。但是還有一些到達(dá)MQ之后可能會丟失的場景,比如:
-
生產(chǎn)者發(fā)送的消息到達(dá)MQ沒有找到Exchange
-
生產(chǎn)者發(fā)送的消息到達(dá)MQ找到Exchange,但是沒有找到Queue
-
MQ內(nèi)部處理消息進(jìn)程異常
基于上面幾種情況,RabbitMQ提供了生產(chǎn)者消息確認(rèn)機(jī)制,包括 Publisher Confirm 和 Publisher Return 兩種。在開啟確認(rèn)機(jī)制的情況下,當(dāng)生產(chǎn)者發(fā)送消息給MQ后,MQ會根據(jù)消息處理的情況返回不同的回執(zhí)。具體主要有以下幾種情況:
-
當(dāng)消息發(fā)送到MQ上,但是路由失敗了,會返回會通過Publisher Return返回返回信息。同時會返回ack確認(rèn)信息,表示投遞成功。
-
當(dāng)非持久化消息發(fā)送到MQ上,并且入隊(duì)成功,會返回ACK確認(rèn)信息,表示投遞成功。
-
當(dāng)持久化消息發(fā)送到MQ上,入隊(duì)成功并且持久化到磁盤,會返回ACK確認(rèn)信息,表示投遞成功。
-
其它情況都會返回NACK,告知投遞失敗
其中 ack 和 nack 屬于Publisher Confirm機(jī)制, ack 是投遞成功; nack 是投遞失敗。而 return 則屬于Publisher Return機(jī)制。默認(rèn)情況,這兩種都是關(guān)閉的,需要通過配置開啟。
2.3 實(shí)現(xiàn)生產(chǎn)者確認(rèn)
2.3.1 配置yml開啟生產(chǎn)者確認(rèn)
我們在生產(chǎn)者對應(yīng)yml配置中加入
spring:
rabbitmq:
?publisher-confirm-type: correlated # 開啟publisher confirm機(jī)制,并設(shè)置confirm類型
?publisher-returns: true # 開啟publisher return機(jī)制
其中publisher-confirm-type一共有三種模式:
-
none :關(guān)閉confirm機(jī)制
-
simple :同步阻塞等待MQ的回執(zhí)
-
correlated :MQ異步回調(diào)返回回執(zhí)
一般我們都是開啟correlated模式。
2.3.2 定義ReturnCallback
每個 RabbitTemplate 只能配置一個 ReturnCallback,我們可以定義一個配置類統(tǒng)一配置。下面我們在生產(chǎn)者中定義配置類ReturnsCallbackConfig:
package com.chenwen.producer.config;
?
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
?
import javax.annotation.PostConstruct;
?
@Slf4j
@AllArgsConstructor
@Configuration
public class ReturnsCallbackConfig {
? ?private final RabbitTemplate rabbitTemplate;
?
? ?@PostConstruct
? ?public void init(){
? ? ? ?rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
? ? ? ? ? ?@Override
? ? ? ? ? ?public void returnedMessage(ReturnedMessage returned) {
? ? ? ? ? ? ? ?log.error("觸發(fā)return callback,");
? ? ? ? ? ? ? ?log.debug("交換機(jī)exchange: {}", returned.getExchange());
? ? ? ? ? ? ? ?log.debug("路由鍵routingKey: {}", returned.getRoutingKey());
? ? ? ? ? ? ? ?log.debug("message: {}", returned.getMessage());
? ? ? ? ? ? ? ?log.debug("replyCode: {}", returned.getReplyCode());
? ? ? ? ? ? ? ?log.debug("replyText: {}", returned.getReplyText());
? ? ? ? ? }
? ? ? });
? }
}
2.3.3 定義ConfirmCallback
因?yàn)槊總€消息處理邏輯不同,所以我們需要每個消息單獨(dú)定義ConfirmCallback。其實(shí)簡單來說,就是是在調(diào)用RabbitTemplate中的convertAndSend方法時,多傳遞一個參數(shù)CorrelationData。
CorrelationData中包含兩個核心的東西:
-
id :消息的唯一標(biāo)示,MQ對不同的消息的回執(zhí)以此做判斷,避免混淆
-
SettableListenableFuture :回執(zhí)結(jié)果的Future對象
將來MQ的回執(zhí)就會通過這個 Future 來返回,我們可以提前給 CorrelationData 中的 Future 添加回調(diào)函數(shù)來處理消息回執(zhí):
下面我們定義一個測試生產(chǎn)者ConfirmCallback方法:
@Test
? ?void testProducerConfirmCallback() throws InterruptedException {
? ? ? ?// 創(chuàng)建CorrelationData
? ? ? ?CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
? ? ? cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
? ? ? ? ? @Override
? ? ? ? ? public void onFailure(Throwable ex) {
? ? ? ? ? ? ? log.error("消息回調(diào)失敗", ex);
? ? ? ? ? }
?
? ? ? ? ? @Override
? ? ? ? ? public void onSuccess(CorrelationData.Confirm result) {
? ? ? ? ? ? ? log.info("收到confirm callback回執(zhí)");
? ? ? ? ? ? ? if (result.isAck()) {
? ? ? ? ? ? ? ? ? log.info("消息發(fā)送成功,收到ack");
? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? // 消息發(fā)送失敗
? ? ? ? ? ? ? ? ? log.error("消息發(fā)送失敗,收到nack, 原因:{}", result.getReason());
? ? ? ? ? ? ? }
? ? ? ? ? }
? ? ? });
? ? ? ?rabbitTemplate.convertAndSend("test.queue", "chenwen", "hello", cd);
? }
rabbitTemplate.convertAndSend("test.direct", "chenwen1", "hello", cd);
目前存在交換機(jī)test.direct,并且正確的路由鍵是chenwen。首先我這邊故意將路由鍵我寫錯成chenwen1。執(zhí)行測試方法,通過控制臺日志可以看到,路由失敗后,會通過Publisher Return返回異常信息,并且會返回ACK。
我們修改成功正確的路由鍵chenwen,執(zhí)行測試方法,可以看不會返回Publisher Return信息,只返回了ACK。
注意:
開啟生產(chǎn)者確認(rèn)模式比較消耗MQ性能,一般不建議開啟。我們分析一下幾種場景:
-
路由失?。哼@個其實(shí)是人為因素。由于我們編程錯誤導(dǎo)致的。
-
交換機(jī)名稱錯誤:同樣是編程錯誤導(dǎo)致
-
MQ內(nèi)部故障:這種需要處理,但概率往往較低。所以一般只有對消息可靠性極高的場景才需要開啟,這種的我們只需要開啟Publisher Confirm模式通過處理nack就可以。
3、MQ消息可靠性
MQ的可靠性,其實(shí)就是當(dāng)消息到達(dá)MQ,還沒有被消費(fèi)者消費(fèi),MQ就由于某些情況出現(xiàn)重啟,導(dǎo)致的消息丟失。主要是這幾個方面:
-
交換機(jī)Exchange持久化
-
隊(duì)列Queue持久化
-
消息本身的持久化
下面我們就以控制臺展示的為例子:
3.1 Exchange交換機(jī)持久化
Durability 就是表示設(shè)置交換機(jī)持久化的參數(shù), Durable 就是持久化模式, Transient 就是臨時模式。
3.2 Queues隊(duì)列持久化
同樣隊(duì)列持久化可以在控制臺Queues那邊設(shè)置,根據(jù)Durability設(shè)置,Durable 就是持久化模式, Transient 就是臨時模式。
3.3 消息的持久化
根據(jù)Delivery mode參數(shù)設(shè)置成2,就是持久化。
注意:如果開啟消息持久化,并且也開啟了生產(chǎn)者確認(rèn)模式。需要等消息持久化到磁盤,才會發(fā)送ACK回執(zhí),來保證消息的可靠性。
不過出于性能考慮,為了減少IO次數(shù),發(fā)送到MQ的消息并不是逐條持久化的,而是每隔一段時間批量持久化。一般間隔在100毫秒左右,這就會導(dǎo)致ACK有一定的延遲,因此建議生產(chǎn)者確認(rèn)全部采用異步方式。
3.4 LazyQueue
在默認(rèn)的情況下,生產(chǎn)者發(fā)送消息是存放在內(nèi)存中,以提高收發(fā)消息的效率。但是由于某些情況會導(dǎo)致消息堆積。比如:
-
消費(fèi)者消費(fèi)者宕機(jī)或出現(xiàn)網(wǎng)絡(luò)故障
-
生產(chǎn)者生產(chǎn)消息過快,超過了消費(fèi)者處理消息的能力
-
消費(fèi)者處理業(yè)務(wù)發(fā)生了堵塞
一旦消息堆積,會導(dǎo)致占用的內(nèi)存越來越大,直到觸發(fā)內(nèi)存預(yù)警。此時的RabbitMQ會將內(nèi)存上的消息持久化到磁盤中。這個行為成為PageOut 。 PageOut會耗費(fèi)一段時間,并且會阻塞隊(duì)列進(jìn)程。所以此時RabbitMQ不會再處理新的消息,生產(chǎn)者的所有的請求都會被阻塞。
RabbitMQ為了解決這個問題,從3.6.0版本開始,就增加了Lazy Queues的模式,也就是惰性隊(duì)列。惰性隊(duì)列的特性主要有如下:
-
接收到消息后直接存入磁盤而非內(nèi)存
-
消費(fèi)者要消費(fèi)消息時才會從磁盤中讀取并加載到內(nèi)存(也就是懶加載)
-
支持?jǐn)?shù)百萬條的消息存儲
而在3.12版本之后,LazyQueue已經(jīng)成為所有隊(duì)列的默認(rèn)格式。因此官方推薦升級MQ為3.12版本或者所有隊(duì)列都設(shè)置為LazyQueue模式。
4、消費(fèi)者的可靠性
當(dāng)RabbitMQ向消費(fèi)者投遞消息的時候,可能由于某些因素會導(dǎo)致消息丟失,比如:
-
消息的投遞過程中出現(xiàn)網(wǎng)絡(luò)故障
-
消費(fèi)者接受到消息后突然宕機(jī)
-
消息者已經(jīng)接受到消息了,但是由于消費(fèi)者處理報錯導(dǎo)致異常
-
...............
一旦發(fā)生上面幾種情況,都會導(dǎo)致消息丟失。那我RabbitMQ肯定需要知道消費(fèi)者處理消息的狀態(tài),如果失敗了,可以再次進(jìn)行投遞。下面我們就來學(xué)習(xí)一下消費(fèi)者如何進(jìn)行消息確認(rèn)機(jī)制的。
4.1 消費(fèi)者確認(rèn)機(jī)制
消費(fèi)者處理消息之后,應(yīng)該向RabbitMQ發(fā)送一個回執(zhí)。告知RabbitMQ自己的消息處理狀態(tài)。主要有三個:
-
ack:處理消息成功,RabbitMQ從隊(duì)列中把消息刪除
-
nack:處理消息失敗,RabbitMQ需要重新投遞消息
-
reject:消息處理失敗并拒絕該消息,并且RabbitMQ會從隊(duì)列中刪除該消息
一般我們可以使用try catch 成功即返回ack,失敗返回nack。但是SpringAMQP幫我們實(shí)現(xiàn)了,我們只需要通過配置對應(yīng)acknowledge-mode參數(shù)即可實(shí)現(xiàn)。主要有三種模式:
-
none:不處理。即消息投遞給消費(fèi)者后立刻ack,消息會立刻從MQ刪除,這種不建議使用
-
manual:手動模式。需要自己在業(yè)務(wù)代碼中調(diào)用api,發(fā)送 ack 或 reject ,存在業(yè)務(wù)入侵,但更靈活
-
auto:自動模式。SpringAMQP利用AOP切面對我們方法進(jìn)行環(huán)繞增強(qiáng)。正常執(zhí)行返回ack,失敗則根據(jù)異常返回nack或者reject
-
如果是業(yè)務(wù)異常,會自動返回 nack;
-
如果是消息處理或校驗(yàn)異常,自動返回 reject ;
-
spring:
rabbitmq:
?listener:
??simple:
???acknowledge-mode: none # 不做處理
4.1.1首先我們來測試一下acknowledge-mode: none不做處理的場景:
我們先向隊(duì)列test.queue里面發(fā)送一條消息,可以看到test.queue隊(duì)列現(xiàn)在有一條消息。
消費(fèi)者這邊去監(jiān)聽這個消息,對應(yīng)代碼如下:
@Slf4j
@Component
public class ConsumeMqListener {
? ?@RabbitListener(queues = "test.queue")
? ?public void listenWorkQueue2(String msg) throws InterruptedException {
? ? ? ?log.info("spring 消費(fèi)者接收到消息:【" + msg + "】");
? ? ? ?if (true) {
? ? ? ? ? ?throw new MessageConversionException("測試沒有開啟消費(fèi)者確認(rèn)");
? ? ? }
? ? ? ?log.info("消息處理完成");
? }
}
dubug斷點(diǎn),還未拋出異常之前,此時我們先去刷下一下UI控制臺頁面,可以看到test.queue的一條消息已經(jīng)不存在了。
4.1.2 測試acknowledge-mode: auto自動處理情況下
4.1.2.1 消費(fèi)者拋出消息異常
現(xiàn)在我們在異常點(diǎn)打上斷點(diǎn),然后看看UI后臺消息的狀態(tài),我們看到現(xiàn)在消息狀態(tài)變成了Unacked。
斷點(diǎn)執(zhí)行完之后,我們看到此時UI頁面的消息數(shù)量為0。說明拋出MessageConversionException異常,是將消息直接reject的。
4.1.2.2 消費(fèi)者拋出業(yè)務(wù)異常
我們將消費(fèi)者內(nèi)部邏輯拋出RuntimeException異常,在拋出異常之前打上斷點(diǎn),然后觀察看UI頁面的消息狀態(tài)也是為Unacked狀態(tài)。
異常拋出之后我們?nèi)タ碪I頁面隊(duì)列中消息的狀態(tài)又回到了Ready狀態(tài)了,這樣就可以確保消費(fèi)者業(yè)務(wù)異常之后,消息還能夠再次投遞
5、消費(fèi)者失敗重試機(jī)制
5.1 消費(fèi)者失敗重試機(jī)制
當(dāng)消費(fèi)者出現(xiàn)異常后,消息會不斷requeue(重入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者。如果消費(fèi)者再次執(zhí)行依然出錯,消息會再次requeue到隊(duì)列,再次投遞,直到消息處理成功為止。極端情況就是消費(fèi)者一直無法執(zhí)行成功,那么消息requeue就會無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力。為了解決這個問題,Spring又提供了消費(fèi)者重試機(jī)制:在消費(fèi)者出現(xiàn)異常時利用本地重試,而不是無限制的requeue到mq隊(duì)列。
在消費(fèi)者的application.yml配置下面參數(shù):
spring:
rabbitmq:
?listener:
??simple:
???retry:
????enabled: true # 開啟消費(fèi)者失敗重試
????initial-interval: 1000ms # 初識的失敗等待時長為1秒
????multiplier: 1 # 失敗的等待時長倍數(shù),下次等待時長 = multiplier * last-interval
????max-attempts: 3 # 最大重試次數(shù)
????stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
可以看到:
-
消費(fèi)者在消息失敗之后消息沒有重新回到隊(duì)列,而是在本地重試了三次
-
在本地重試三次以后,拋出了 AmqpRejectAndDontRequeueException 異常。我們查看UI頁面看到消息被刪除了,說明返回的消息回執(zhí)是reject。
5.2 失敗處理策略
我們可以看到,當(dāng)失敗重試3次,消息會被從隊(duì)列中刪除。這樣對于一些要求消息可靠性比較高的情況下,肯定是不符合的。因此Spring有提供了失敗處理的策略。這個策略是由 MessageRecovery 接口來定義的,它有3個不同實(shí)現(xiàn):
-
RejectAndDontRequeueRecoverer:重試次數(shù)耗盡之后,返回reject。直接將消息丟棄,這個是默認(rèn)模式
-
ImmediateRequeueMessageRecoverer:重試耗盡后,返回 nack ,消息重新入隊(duì)
-
RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)
最好的策略是RepublishMessageRecoverer,重試次數(shù)耗盡之后,將消息投遞到指定的交換機(jī)中,后續(xù)由人工來處理。下面我們就來演示一下這個場景,我們在消費(fèi)者服務(wù)這邊新增配置類ErrorConfiguration,聲明交換機(jī)和隊(duì)列,并綁定:
package com.chenwen.consumer.config;
?
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
?
@Slf4j
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorConfiguration {
?
?
? ?@Bean
? ?public DirectExchange errorExchange(){
? ? ? ?return new DirectExchange("error.direct");
? }
?
? ?@Bean
? ?public Queue errorQueue(){
? ? ? ?return new Queue("error.queue");
? }
?
? ?@Bean
? ?public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){
? ? ? ?return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
? }
?
? ?@Bean
? ?public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
? ? ? ?log.debug("加載RepublishMessageRecoverer");
? ? ? ?return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
? }
}
?
可以看到,當(dāng)重試3次數(shù)耗盡之后,會將MQ信息放在error.queue隊(duì)列中,此時error.queue隊(duì)列多了一條數(shù)據(jù),后續(xù)我們?nèi)藶槿ヌ幚?,或者單?dú)使用一個監(jiān)聽去處理。
文章來源:http://www.zghlxwxcb.cn/news/detail-829666.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-829666.html
到了這里,關(guān)于RabbitMQ如何保證消息可靠性的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!