1.消息可靠性
消息從發(fā)送,到消費(fèi)者接收,會(huì)經(jīng)理多個(gè)過(guò)程:
其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括:
- 發(fā)送時(shí)丟失:
- 生產(chǎn)者發(fā)送的消息未送達(dá)exchange
- 消息到達(dá)exchange后未到達(dá)queue
- MQ宕機(jī),queue將消息丟失
- consumer接收到消息后未消費(fèi)就宕機(jī)
針對(duì)這些問(wèn)題,RabbitMQ分別給出了解決方案:
- 生產(chǎn)者確認(rèn)機(jī)制
- mq持久化
- 消費(fèi)者確認(rèn)機(jī)制
- 失敗重試機(jī)制
下面操作通過(guò)案例來(lái)演示每一個(gè)步驟。
項(xiàng)目結(jié)構(gòu)如下:
1.1.生產(chǎn)者消息確認(rèn)
RabbitMQ提供了publisher confirm機(jī)制來(lái)避免消息發(fā)送到MQ過(guò)程中丟失。這種機(jī)制必須給每個(gè)消息指定一個(gè)唯一ID。消息發(fā)送到MQ以后,會(huì)返回一個(gè)結(jié)果給發(fā)送者,表示消息是否處理成功。
返回結(jié)果有兩種方式:
- publisher-confirm,發(fā)送者確認(rèn)
- 消息成功投遞到交換機(jī),返回ack
- 消息未投遞到交換機(jī),返回nack
- publisher-return,發(fā)送者回執(zhí)
- 消息投遞到交換機(jī)了,但是沒有路由到隊(duì)列。返回ACK,及路由失敗原因。
注意:
1.1.1.修改配置
首先,在publisher服務(wù)中的application.yml文件,添加下面的內(nèi)容:
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
說(shuō)明:
-
publish-confirm-type
:開啟publisher-confirm,這里支持兩種類型:-
simple
:同步等待confirm結(jié)果,直到超時(shí) -
correlated
:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallback
-
-
publish-returns
:開啟publish-return功能,同樣是基于callback機(jī)制,不過(guò)是定義ReturnCallback -
template.mandatory
:定義消息路由失敗時(shí)的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息
1.1.2.定義Return回調(diào)
每個(gè)RabbitTemplate只能配置一個(gè)R
eturnCallback,因此需要在項(xiàng)目加載時(shí)配置:
修改publisher服務(wù),添加一個(gè):
package cn.zqd.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 獲取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 設(shè)置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 投遞失敗,記錄日志
log.info("消息發(fā)送失敗,應(yīng)答碼{},原因{},交換機(jī){},路由鍵{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有業(yè)務(wù)需要,可以重發(fā)消息
});
}
}
1.1.3.定義ConfirmCallback
ConfirmCallback可以在發(fā)送消息時(shí)指定,因?yàn)槊總€(gè)業(yè)務(wù)處理confirm成功或失敗的邏輯不一定相同。
在publisher服務(wù)的cn.zqd.mq.spring.SpringAmqpTest類中,定義一個(gè)單元測(cè)試方法:
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.消息體
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(
result -> {
if(result.isAck()){
// 3.1.ack,消息成功
log.debug("消息發(fā)送成功, ID:{}", correlationData.getId());
}else{
// 3.2.nack,消息失敗
log.error("消息發(fā)送失敗, ID:{}, 原因{}",correlationData.getId(), result.getReason());
}
},
ex -> log.error("消息發(fā)送異常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
// 4.發(fā)送消息
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);
// 休眠一會(huì)兒,等待ack回執(zhí)
Thread.sleep(2000);
}
1.2.消息持久化
生產(chǎn)者確認(rèn)可以確保消息投遞到RabbitMQ的隊(duì)列中,但是消息發(fā)送到RabbitMQ以后,如果突然宕機(jī),也可能導(dǎo)致消息丟失。
要想確保消息在RabbitMQ中安全保存,必須開啟消息持久化機(jī)制。
- 交換機(jī)持久化
- 隊(duì)列持久化
- 消息持久化
1.2.1.交換機(jī)持久化
RabbitMQ中交換機(jī)默認(rèn)是非持久化的,mq重啟后就丟失。
SpringAMQP中可以通過(guò)代碼指定交換機(jī)持久化:
@Bean
public DirectExchange simpleExchange(){
// 三個(gè)參數(shù):交換機(jī)名稱、是否持久化、當(dāng)沒有queue與其綁定時(shí)是否自動(dòng)刪除
return new DirectExchange("simple.direct", true, false);
}
事實(shí)上,默認(rèn)情況下,由SpringAMQP聲明的交換機(jī)都是持久化的。
可以在RabbitMQ控制臺(tái)看到持久化的交換機(jī)都會(huì)帶上D
的標(biāo)示:
1.2.2.隊(duì)列持久化
RabbitMQ中隊(duì)列默認(rèn)是非持久化的,mq重啟后就丟失。
SpringAMQP中可以通過(guò)代碼指定交換機(jī)持久化:
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder構(gòu)建隊(duì)列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
事實(shí)上,默認(rèn)情況下,由SpringAMQP聲明的隊(duì)列都是持久化的。
可以在RabbitMQ控制臺(tái)看到持久化的隊(duì)列都會(huì)帶上D
的標(biāo)示:
1.2.3.消息持久化
利用SpringAMQP發(fā)送消息時(shí),可以設(shè)置消息的屬性(MessageProperties),指定delivery-mode:
- 1:非持久化
- 2:持久化
用java代碼指定:
默認(rèn)情況下,SpringAMQP發(fā)出的任何消息都是持久化的,不用特意指定。
1.3.消費(fèi)者消息確認(rèn)
RabbitMQ是閱后即焚機(jī)制,RabbitMQ確認(rèn)消息被消費(fèi)者消費(fèi)后會(huì)立刻刪除。
而RabbitMQ是通過(guò)消費(fèi)者回執(zhí)來(lái)確認(rèn)消費(fèi)者是否成功處理消息的:消費(fèi)者獲取消息后,應(yīng)該向RabbitMQ發(fā)送ACK回執(zhí),表明自己已經(jīng)處理消息。
設(shè)想這樣的場(chǎng)景:
- 1)RabbitMQ投遞消息給消費(fèi)者
- 2)消費(fèi)者獲取消息后,返回ACK給RabbitMQ
- 3)RabbitMQ刪除消息
- 4)消費(fèi)者宕機(jī),消息尚未處理
這樣,消息就丟失了。因此消費(fèi)者返回ACK的時(shí)機(jī)非常重要。
而SpringAMQP則允許配置三種確認(rèn)模式:
?manual:手動(dòng)ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。
?auto:自動(dòng)ack,由spring監(jiān)測(cè)listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack
?none:關(guān)閉ack,MQ假定消費(fèi)者獲取消息后會(huì)成功處理,因此消息投遞后立即被刪除
由此可知:
- none模式下,消息投遞是不可靠的,可能丟失
- auto模式類似事務(wù)機(jī)制,出現(xiàn)異常時(shí)返回nack,消息回滾到mq;沒有異常,返回ack
- manual:自己根據(jù)業(yè)務(wù)情況,判斷什么時(shí)候該ack
一般,我們都是使用默認(rèn)的auto即可。
1.3.1.演示none模式
修改consumer服務(wù)的application.yml文件,添加下面內(nèi)容:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 關(guān)閉ack
修改consumer服務(wù)的SpringRabbitListener類中的方法,模擬一個(gè)消息處理異常:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
log.info("消費(fèi)者接收到simple.queue的消息:【{}】", msg);
// 模擬異常
System.out.println(1 / 0);
log.debug("消息處理完成!");
}
測(cè)試可以發(fā)現(xiàn),當(dāng)消息處理拋異常時(shí),消息依然被RabbitMQ刪除了。
1.3.2.演示auto模式
再次把確認(rèn)機(jī)制修改為auto:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 關(guān)閉ack
在異常位置打斷點(diǎn),再次發(fā)送消息,程序卡在斷點(diǎn)時(shí),可以發(fā)現(xiàn)此時(shí)消息狀態(tài)為unack(未確定狀態(tài)):
拋出異常后,因?yàn)镾pring會(huì)自動(dòng)返回nack,所以消息恢復(fù)至Ready狀態(tài),并且沒有被RabbitMQ刪除:
1.4.消費(fèi)失敗重試機(jī)制
當(dāng)消費(fèi)者出現(xiàn)異常后,消息會(huì)不斷requeue(重入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者,然后再次異常,再次requeue,無(wú)限循環(huán),導(dǎo)致mq的消息處理飆升,帶來(lái)不必要的壓力:
怎么辦呢?
1.4.1.本地重試
我們可以利用Spring的retry機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無(wú)限制的requeue到mq隊(duì)列。
修改consumer服務(wù)的application.yml文件,添加內(nèi)容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 開啟消費(fèi)者失敗重試
initial-interval: 1000 # 初識(shí)的失敗等待時(shí)長(zhǎng)為1秒
multiplier: 1 # 失敗的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = multiplier * last-interval
max-attempts: 3 # 最大重試次數(shù)
stateless: true # true無(wú)狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
重啟consumer服務(wù),重復(fù)之前的測(cè)試??梢园l(fā)現(xiàn):
- 在重試3次后,SpringAMQP會(huì)拋出異常AmqpRejectAndDontRequeueException,說(shuō)明本地重試觸發(fā)了
- 查看RabbitMQ控制臺(tái),發(fā)現(xiàn)消息被刪除了,說(shuō)明最后SpringAMQP返回的是ack,mq刪除消息了
結(jié)論:
- 開啟本地重試時(shí),消息處理過(guò)程中拋出異常,不會(huì)requeue到隊(duì)列,而是在消費(fèi)者本地重試
- 重試達(dá)到最大次數(shù)后,Spring會(huì)返回ack,消息會(huì)被丟棄
1.4.2.失敗策略
在之前的測(cè)試中,達(dá)到最大重試次數(shù)后,消息會(huì)被丟棄,這是由Spring內(nèi)部機(jī)制決定的。
在開啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有MessageRecovery接口來(lái)處理,它包含三種不同的實(shí)現(xiàn):
-
RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式
-
ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)
-
RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)
比較優(yōu)雅的一種處理方案是RepublishMessageRecoverer,失敗后將消息投遞到一個(gè)指定的,專門存放異常消息的隊(duì)列,后續(xù)由人工集中處理。
1)在consumer服務(wù)中定義處理失敗消息的交換機(jī)和隊(duì)列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
2)定義一個(gè)RepublishMessageRecoverer,關(guān)聯(lián)隊(duì)列和交換機(jī)
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
完整代碼:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-470851.html
package cn.zqd.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
1.5.總結(jié)
如何確保RabbitMQ消息的可靠性?(面試題)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-470851.html
- 開啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊(duì)列
- 開啟持久化功能,確保消息未消費(fèi)前在隊(duì)列中不會(huì)丟失
- 開啟消費(fèi)者確認(rèn)機(jī)制為auto,由spring確認(rèn)消息處理成功后完成ack
- 開啟消費(fèi)者失敗重試機(jī)制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理
到了這里,關(guān)于RabbitMQ-保證消息可靠性的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!