說明:在RabbitMQ消息傳遞過程中,有以下問題:
-
消息沒發(fā)到交換機(jī)
-
消息沒發(fā)到隊(duì)列
-
MQ宕機(jī),消息在隊(duì)列中丟失
-
消息者接收到消息后,未能正常消費(fèi)(程序報(bào)錯(cuò)),此時(shí)消息已在隊(duì)列中移除
針對(duì)以上問題,提供以下解決方案:
-
消息確認(rèn):確認(rèn)消息是否發(fā)送到交換機(jī)、隊(duì)列;
-
消息持久化:持久化消息,以防MQ宕機(jī)造成消息丟失;
-
消費(fèi)者消息確認(rèn):確認(rèn)消費(fèi)者已正確消費(fèi)消息,才把消息從隊(duì)列中刪除;
消息確認(rèn)
可以使用Rabbit MQ提供的publisher confirm機(jī)制來避免消息發(fā)送到MQ過程丟失。具體實(shí)現(xiàn)是,publisher-confirm(發(fā)送者確定)、publisher-return(發(fā)送者回執(zhí)),前者判斷消息到交換機(jī)、后者判斷交換機(jī)到隊(duì)列
publisher-confirm(發(fā)送者確定)
-
消息成功投遞到交換機(jī),返回ack;
-
消息未投遞到交換機(jī),返回nack;
publisher-return(發(fā)送者回執(zhí))
- 消息投遞到交換機(jī),但沒有到隊(duì)列,返回ack,即失敗原因;
在生產(chǎn)者端添加配置
spring:
rabbitmq:
# rabbitMQ相關(guān)配置
host: 118.178.228.175
port: 5672
username: root
password: 123456
virtual-host: /
# 開啟生產(chǎn)者確認(rèn),correlated為異步,simple為同步
publisher-confirm-type: correlated
# 開啟publish-return功能,基于callback機(jī)制
publisher-returns: true
# 開啟消息路由失敗的策略,true是調(diào)用returnCallback方法,false是丟棄消息
template:
mandatory: true
publisher-return(發(fā)送者回執(zhí))代碼
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
/**
* 發(fā)送者回執(zhí)實(shí)現(xiàn)
*/
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 獲取RabbitTemplate對(duì)象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 設(shè)置ReturnCallback
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 回執(zhí)信息
* @param message 信息對(duì)象
* @param replyCode 回執(zhí)碼
* @param replyText 回執(zhí)內(nèi)容
* @param exchange 交換機(jī)
* @param routingKey 路由鍵值
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息發(fā)送隊(duì)列失敗=====replyCode{},replyText{},exchange{},routingKey{},message{}",replyCode,replyText,exchange,routingKey,message);
}
});
}
}
publisher-confirm(發(fā)送者確定)代碼
@Test
public void sendExceptionMessage() {
// 路由鍵值
String routingKey = "exception";
// 消息
String message = "This is a exception message";
// 給消息設(shè)置一個(gè)唯一ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 編寫confirmCallBack回調(diào)函數(shù)
correlationData.getFuture().addCallback(new SuccessCallback<CorrelationData.Confirm>() {
@Override
public void onSuccess(CorrelationData.Confirm confirm) {
if (confirm.isAck()) {
// 消息發(fā)送交換機(jī)成功
log.debug("消息送達(dá)至交換機(jī)成功");
} else {
// 消息發(fā)送交換機(jī)失敗,打印消息
log.error("消息未能送達(dá)至交換機(jī),ID{},原因{}", correlationData.getId(), confirm.getReason());
}
}
}, new FailureCallback() {
// 消息發(fā)送交換機(jī)異常
@Override
public void onFailure(Throwable ex) {
log.error("消息發(fā)送交換機(jī)異常,ID:{},原因{}", correlationData.getId(), ex.getMessage());
}
});
rabbitTemplate.convertAndSend("amq.direct", routingKey, message, correlationData);
}
測(cè)試,設(shè)置一個(gè)不存在的routingKey,被發(fā)送者確認(rèn)(publisher-confirm)捕獲到;
// 路由鍵值
String routingKey = "null";
設(shè)置一個(gè)不存在的路由,被發(fā)送者回執(zhí)(publisher-return)捕獲到;
rabbitTemplate.convertAndSend("null", routingKey, message, correlationData);
消息持久化
消息持久化,是指把消息保存到磁盤中,在RabbitMQ宕機(jī)或者關(guān)機(jī)時(shí),重啟后,消息仍可以保存下來。消息依賴于交換機(jī)、隊(duì)列,因此持久化消息,同時(shí)也需要持久化交換機(jī)、隊(duì)列。
創(chuàng)建一個(gè)持久化的交換機(jī)、隊(duì)列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 消息持久化
*/
@Configuration
public class DurableConfig {
/**
* 交換機(jī)持久化
* @return
*/
@Bean
public DirectExchange directExchange(){
// 三個(gè)參數(shù)分別是:交換機(jī)名、是否持久化、沒有隊(duì)列與之綁定時(shí)是否自動(dòng)刪除
return new DirectExchange("durable.direct",true,false);
}
/**
* 隊(duì)列持久化
* @return
*/
@Bean
public Queue durableQueue(){
return QueueBuilder.durable("durable.queue").build();
}
/**
* 交換機(jī)與隊(duì)列綁定
* @return
*/
@Bean
public Binding binding(){
return BindingBuilder.bind(durableQueue()).to(directExchange()).with("durable");
}
}
發(fā)送一個(gè)持久化的消息
/**
* 發(fā)送持久化消息
*/
@Test
public void sendDurableMessage() {
String routingKey = "durable";
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
Message message = MessageBuilder.withBody("This is a durable message".getBytes(StandardCharsets.UTF_8))
// 設(shè)置該消息未持久化消息
.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
rabbitTemplate.convertAndSend("durable.direct", routingKey, message, correlationData);
}
打開RabbitMQ管理平臺(tái),可以看到"delivery_mode: 2",表示該消息是持久化消息
(源碼:MessageDeliveryMode類)
實(shí)際上,交換機(jī)、隊(duì)列默認(rèn)就是持久化的(durable: true),所以不用特意設(shè)置;
消費(fèi)者消息確認(rèn)
介紹
消費(fèi)者消息確認(rèn),是為了確保消費(fèi)者已經(jīng)消費(fèi)了消息,才讓MQ把該消息刪除;
可通過在消費(fèi)者的配置文件中增加下面這行配置實(shí)現(xiàn),備選項(xiàng)有以下三個(gè):
-
none:關(guān)閉ack,表示不做處理,消息發(fā)給消費(fèi)者之后就立即被刪除;
-
auto:自動(dòng)ack,表示由Spring檢測(cè)代碼是否出現(xiàn)異常,出現(xiàn)異常則保留消息,沒有異常則刪除消息;
-
manual:手動(dòng)ack,可根據(jù)業(yè)務(wù)手動(dòng)編寫代碼,返回ack;
spring:
rabbitmq:
listener:
simple:
# 設(shè)置消息確認(rèn)模式
acknowledge-mode: none
測(cè)試:none
可編寫代碼測(cè)試,下面是生產(chǎn)者代碼,發(fā)送消息
/**
* 發(fā)送普通消息
*/
@Test
public void sendNoneMessage() {
String directName = "none.direct";
String routingKey = "none";
String message = "This is a test message";
rabbitTemplate.convertAndSend(directName, routingKey, message);
}
消費(fèi)者代碼有問題,未能正常消費(fèi)消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "none.queue"),
exchange = @Exchange(name = "none.direct",type = ExchangeTypes.DIRECT),
key = {"none"}
))
public void getNoneMessage(String normalMessage){
System.out.println(1/0);
System.out.println("normalMessage = " + normalMessage);
}
測(cè)試結(jié)果,程序報(bào)錯(cuò),消息也沒能保留下來
測(cè)試:auto
更改設(shè)置為:auto,重試
但是消息未被刪除
這種情況,在實(shí)際開發(fā)中是不能允許,可以通過更改消費(fèi)失敗的重試機(jī)制解決。
消費(fèi)失敗重試機(jī)制
方法一:設(shè)置retry
因?yàn)橄⒈幌M(fèi)失敗,消息會(huì)一直循環(huán)重試,無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力,這種情況可以通過在消費(fèi)者端添加以下配置,限制失敗重試的條件來解決:
spring:
rabbitmq:
listener:
simple:
retry:
# 開啟消費(fèi)者失敗重試
enabled: true
# 初次失敗等待時(shí)長(zhǎng)為1秒
initial-interval: 1000
# 失敗的等待時(shí)長(zhǎng)倍數(shù),即后一次等待的時(shí)間是前一次等待時(shí)間的多少倍
multiplier: 1
# 最多重試次數(shù)
max-attempts: 3
# true 無狀態(tài) false 有狀態(tài) 如果業(yè)務(wù)中包含事務(wù) 改為false
stateless: true
開啟后,控制臺(tái)可以發(fā)現(xiàn),信息不回一直循環(huán)打印,而是打印數(shù)條后停止,日志信息中有提示“Retry Policy Exhausted”(重試策略已用盡)
這種通過配置的方式,并不會(huì)重試數(shù)次后仍保留消息,而是重試數(shù)次仍失敗,隨即丟棄消息,消息丟失,這在實(shí)際開發(fā)中也是不能被允許的。
方法二:路由存儲(chǔ)消息
因此,可以通過下面這個(gè)方法,把消費(fèi)失敗的消息,通過交換機(jī)路由到另外的隊(duì)列中存儲(chǔ)起來,等業(yè)務(wù)代碼被修復(fù),再路由回來消費(fèi)。
代碼如下
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 錯(cuò)誤消息隊(duì)列
*/
@Configuration
public class ErrorMessageQueueConfig {
/**
* 創(chuàng)建一個(gè)交換機(jī),用于路由消費(fèi)失敗的消息
* @return
*/
@Bean
public DirectExchange errorExchange(){
return new DirectExchange("error.direct");
}
/**
* 創(chuàng)建一個(gè)隊(duì)列,用于存儲(chǔ)消費(fèi)失敗的消息
* @return
*/
@Bean
public Queue errorQueue(){
return new Queue("error.queue");
}
/**
* 綁定
* @return
*/
@Bean
public Binding errorBinding(){
return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");
}
/**
* 路由,當(dāng)消費(fèi)失敗時(shí),把消費(fèi)失敗的消息路由到此隊(duì)列中,路由key為"error"
* @param rabbitTemplate
* @return
*/
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
}
}
可以看到,消息消費(fèi)失敗后并沒有被丟失,而是路由到錯(cuò)誤隊(duì)列中存儲(chǔ)了起來。因?yàn)殄e(cuò)誤隊(duì)列沒有設(shè)置RabbitListener,所以可以存儲(chǔ)消息,等帶代碼問題被排查出來后,可以再針對(duì)該隊(duì)列設(shè)置監(jiān)聽方法,消費(fèi)這部分錯(cuò)誤的消息。
另外,值得一提的是,消費(fèi)者這邊的控制臺(tái)會(huì)報(bào)一個(gè)警告,提示路由密鑰錯(cuò)誤。我們可以理解,在RabbitMQ底層,會(huì)把消費(fèi)失敗了的消息,統(tǒng)一路由到一個(gè)地方去,而我們這種手動(dòng)把消費(fèi)失敗的消息路由到自定義的隊(duì)列中的方式,打破了這種“默認(rèn)的規(guī)則”,所以報(bào)了一個(gè)這樣的警告。這種警告是在可控范圍內(nèi)的。
文章來源:http://www.zghlxwxcb.cn/news/detail-601451.html
總結(jié)
RabbitMQ發(fā)送消息,為了確保消息的可靠性,保證消息能被交換機(jī)、隊(duì)列收到,消息能被正常消費(fèi),而不會(huì)因消費(fèi)失敗而丟失,提供了對(duì)應(yīng)的一系列方法,并且最后還提供了兩種消費(fèi)失敗重試方法,優(yōu)化了消費(fèi)過程,非常Nice。文章來源地址http://www.zghlxwxcb.cn/news/detail-601451.html
到了這里,關(guān)于RabbitMQ消息可靠性問題及解決的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!