RabbitMQ通過生產(chǎn)者、消費者以及MQ Broker達到了解耦的特點,實現(xiàn)了異步通訊等一些優(yōu)點,但是在消息的傳遞中引入了MQ Broker必然會帶來一些其他問題,比如如何保證消息在傳輸過程中可靠性(即不讓數(shù)據(jù)丟失,發(fā)送一次消息就會被消費一次)?這篇博客將詳細從生產(chǎn)者,MQ Broker以及消費者的角度講解如何保證消息的可靠性!
1,消息丟失的情況
1.1 消息傳遞流程圖如下
?Producer -> exchange ->queue -> Consumer(其中exchange和queue屬于MQ Broker的組件)
1.2 消息可能丟失的情況
- 生產(chǎn)者給交換機exchange的過程中發(fā)生數(shù)據(jù)丟失;
- 交換機exchange路由給隊列queue的過程中發(fā)生數(shù)據(jù)丟失;
- 消息到達MQ的一瞬間,MQ發(fā)生了宕機的情況造成數(shù)據(jù)丟失;
- 消費者從隊列queue中取出消息進行消費的一瞬間消費者宕機了造成數(shù)據(jù)丟失。
2,生產(chǎn)者確認機制
生產(chǎn)者確認機制主要是站在生產(chǎn)者的角度來保證消息的可靠性,針對的是生產(chǎn)者給交換機發(fā)送消息以及交換機給隊列發(fā)送消息的過程中數(shù)據(jù)丟失的情況!
2.1 書寫配置信息
# 配置日志信息
logging:
pattern:
dateformat: HH:mm:ss:SSS
level:
cn.itcast: debug
spring:
rabbitmq:
host: 123.207.72.43 # rabbitMQ的ip地址
port: 5672 # 端口
username: admin
password: 123
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
#消息發(fā)送失敗時執(zhí)行returnCallback回調(diào)函數(shù)
template:
mandatory: true
- publisher-confirm-type:表示開啟publisher-confirm;這個參數(shù)有兩種類型,分別是correlated和simple(correlated代表異步等待回調(diào),類似于js中發(fā)送的ajax請求的回調(diào)函數(shù),MQ返回結(jié)果時會執(zhí)行定義的confirmCallback函數(shù);simple代表同步等待confirm結(jié)果直到超時);
- publisher-returns:表示開啟publish-return功能,同樣是基于callback機制,不過是定義returnCallback;
- template.mandatory:定義消息路由失敗時的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息。
2.2 定義return回調(diào)機制
我們使用的是SpringBoot來整合的RabbitMQ,所以不論是return回調(diào)還是confim回調(diào)都是用rabbittemplate對象進行定義的。
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//獲取獲取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
//配置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
//記錄日志
log.error("消息發(fā)送隊列失敗,響應(yīng)碼:{},失敗原因:{},交換機:{},路由key:{},消息:{}",
replyCode,replyText,exchange,routingKey,message.toString());
//如果需要的話進行消息的重發(fā)
});
}
}
注意:
- 一個RabbitTemplate只能配置一個ReturnCallback,所以需要在項目啟動的時候進行定義,這樣rabbitTemplate就是全局唯一的了(也可以采用PostConstruct注解中的init方法進行定義);
- ApplicationContextAware是Spring創(chuàng)建完Bean工廠之后的通知方法,當Spring創(chuàng)建完Bean工廠之后就可以在Spring容器中拿到RabbitTemplate對象了;
- 配置ReturnCallback時可以采用匿名內(nèi)部類的方法簡化代碼,如果消息發(fā)送失敗可以根據(jù)需要進行消息重發(fā)操作。
2.3?定義confirm回調(diào)機制
ConfirmCallback可以在發(fā)送消息時指定,因為每個業(yè)務(wù)處理confirm成功或失敗的邏輯不一定相同,可以通過測試方法進行定義。
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() throws InterruptedException {
//1.準備消息
String message = "hello spring amqp";
//2.準備CorrelationData
//2.1 消息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//2.2 準備準備ConfirmCallback
correlationData.getFuture().addCallback(confirm -> {
if (confirm.isAck()) {
log.debug("消息成功投遞到交換機!消息ID:{}", correlationData.getId());
} else {
log.error("消息投遞到交換機上失??!消息ID:{}", correlationData.getId());
//重發(fā)消息
}
}, throwable -> {
//記錄日志
log.error("發(fā)送消息失敗!",throwable);
//重發(fā)消息
});
//3.發(fā)送消息
rabbitTemplate.convertAndSend("amq.topic","a.simple.hello",message,correlationData);
//加上休眠時間 避免mq連接直接關(guān)閉
Thread.sleep(1000);
}
注意:
- 生產(chǎn)者給交換機發(fā)送的消息數(shù)據(jù)很多的,為了區(qū)分每個消息的歸屬,每個消息都要附屬上一個ID信息,可以采用UUID的方式生成唯一身份標識;
- 在發(fā)送消息的時候需要增加一個correlation變量,這個變量記錄了兩個東西(1.每個消息的ID 2.定義的cinfirm回調(diào)機制);
- 加上線程休眠的操作是為了避免消息發(fā)送到交換機之后mq的連接直接關(guān)閉,這樣會導(dǎo)致返回ack的錯誤。
3,消息持久化
消息持久化是站在MQ Broker的角度來保證消息的可靠性的,將交換機、隊列以及消息設(shè)置成持久化的從而避免MQ宕機造成消息的丟失!
3.1 交換機持久化
@Bean
public DirectExchange simpleDirect(){
return new DirectExchange("simple.direct",true,false);
}
第二個參數(shù)設(shè)置成true就是讓就交換機是可持久化的,第三個參數(shù)是是否自動刪除,一般設(shè)為false;
3.2 隊列持久化
@Bean
public Queue simpleQueue(){
return QueueBuilder.durable("simple.queue").build();
}
durable的意思就是可持久化的,傳入隊列名稱然后進行build操作,這樣創(chuàng)建的隊列就是一個可持久化的隊列;
3.3 消息持久化
將交換機和隊列設(shè)置為持久化的之后重啟MQ服務(wù)器之后消息依然會丟失,因為發(fā)送的消息不是可持久化的,所以也需要將消息設(shè)置成可持久化的
4,消費者消息確認
消費者消息確認是站在消費者的角度來保證消息可靠性的,消息者處理完一條消息之后需要給MQ Broker返回一條ACK表示消息處理完成!
4.1 三種確認模式
RabbitMQ支持消費者確認機制,即:消費者處理消息后可以向MQ發(fā)送ack回執(zhí),MQ收到ack回執(zhí)后才會刪除該消息。而SpringAMQP則允許配置三種確認模式:
- manual:手動ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack;
- auto:自動ack,由spring監(jiān)測listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack;
- none:關(guān)閉ack,MQ假定消費者獲取消息后會成功處理,因此消息投遞后立即被刪除。
4.2 none模式的演示
1.修改消費者工程中的配置文件
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 關(guān)閉ack
2.監(jiān)聽一個隊列,在監(jiān)聽的方法中模擬一個異常情況,觀察消息是否會被刪除
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
log.debug("消費者接收到simple.queue的消息:【" + msg + "】");
//這里模擬一個異常
System.out.println(1 / 0);
log.info("消費者處理消息成功!");
}
3.在rabbitmq控制臺模擬發(fā)送一條消息,觀察拋出異常之后消息是否會重發(fā)
?拋出異常消費者并沒有處理消息成功,再觀察控制臺是否將消息刪除:
?隊列中已經(jīng)沒有消息了,說明消息被刪除了!
消費者確認機制為none的時候,只要消費者拿到消息之后MQ就會把消息刪除,不關(guān)心消費者是否將消息成功處理!
4.3 auto模式的演示
1.修改消費者工程中的配置文件
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 關(guān)閉ack
?2.監(jiān)聽一個隊列,在監(jiān)聽的方法中模擬一個異常情況,觀察消息是否會被刪除
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
log.debug("消費者接收到simple.queue的消息:【" + msg + "】");
//這里模擬一個異常
System.out.println(1 / 0);
log.info("消費者處理消息成功!");
}
3.在rabbitmq控制臺模擬發(fā)送一條消息,觀察拋出異常之后消息是否會重發(fā)
消費者確認機制為auto的時候,消費者拿到消息之后MQ并不會立刻刪除隊列中的消息,只有消費者成功處理完消息之后給隊列返回一個ack的時候隊列才會刪除消息!
5, 消費者失敗重試機制
我們發(fā)現(xiàn)當消費者確認機制為auto時,如果代碼中出現(xiàn)了異常,消息會進行重復(fù)入隊列(requeue)的操作,重復(fù)入隊的操作對于MQ來說開銷會非常大,消息處理飆升,所以引入了失敗重試機制:當代碼中出現(xiàn)了異常的時候,消費者內(nèi)部會進行重發(fā)的操作(可以控制重發(fā)的時間和次數(shù)),如果超過設(shè)置的重發(fā)次數(shù)消費者還未成功處理消息默認將消息丟棄!
5.1 本地重試
Spring的retry機制,在消費者出現(xiàn)異常時利用本地重試,而不是無限制的requeue到mq隊列,可以在消費者工程的yml文件中添加如下配置:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 開啟消費者失敗重試
initial-interval: 1000 # 初識的失敗等待時長為1秒
multiplier: 3 # 失敗的等待時長倍數(shù),下次等待時長 = multiplier * last-interval
max-attempts: 4 # 最大重試次數(shù)
stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
?4次重發(fā)之后消息還未成功處理spring拋出了AmqpRejectAndDontRequeueException異常,這是失敗之后的默認處理方式,默認消費者給隊列返回了ack,此時隊列會將消息從隊列中刪除!
5.2 失敗策略
失敗達到最大重試次數(shù)后,消息會被丟棄,這是由Spring內(nèi)部機制決定的。在開啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有MessageRecovery接口來處理,它包含三種不同的實現(xiàn):
- RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息,默認就是這種方式;
- ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊;
- RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機。
如果消息這個消息比較重要,達到最大重試次數(shù)之后這個消息不能被丟棄該怎么辦,此時就可以使用RepublishMessageRecoverer,失敗后將消息投遞到一個指定的,專門存放異常消息的隊列,后續(xù)由人工集中處理。
@Configuration
public class ErrorMessageConfig {
//定義失敗之后處理的交換機和隊列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
//將交換機和隊列進行綁定
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
//定義一個RepublishMessageRecoverer,替換spring默認的處理機制?
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
流程圖如下:文章來源:http://www.zghlxwxcb.cn/news/detail-577678.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-577678.html
?6, 如何保證RabbitMQ消息的可靠性?
- 開啟生產(chǎn)者確認機制,確保生產(chǎn)者的消息能到達隊列;
- 開啟持久化功能,確保消息未消費前在隊列中不會丟失;
- 開啟消費者確認機制為auto,由spring確認消息處理成功后完成ack;
- 開啟消費者失敗重試機制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機,交由人工處理。
到了這里,關(guān)于RabbitMQ如何保證消息的可靠性6000字詳解的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!