一、介紹
MQ
的消息可靠性,將從以下四個方面展開并實踐:
- 生產(chǎn)者消息確認
- 消息持久化
- 消費者消息確認
- 消費失敗重試機制
二、生產(chǎn)者消息確認
對于publisher
,如果message
到達exchange
與否,rabbitmq
提供publiser-comfirm
機制,如果message
達到exchange
但是是否到達queue
,rabbitmq
提供publisher-return
機制。這兩種機制在代碼中都可以通過配置來自定義實現(xiàn)。
以下操作都在publisher
服務(wù)方完成。
1. 引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
配置說明:publish-confirm-type
:開啟publisher-confirm
,這里支持兩種類型:
-
simple
:同步等待confirm
結(jié)果,直到超時 -
correlated
:異步回調(diào),定義ConfirmCallback
,MQ
返回結(jié)果時會回調(diào)這個ConfirmCallback
publish-returns
:開啟publish-return
功能,同樣是基于callback
機制,不過是定義ReturnCallback
template.mandatory
:定義消息路由失敗時的策略。true,則調(diào)用ReturnCallback
; false,則直接丟棄消息
2. 配置ReturnCallBack
每個RabbitTemplate
只能配置一個ReturnCallBack
,所以直接給IoC
里面的RabbitTemplate
配上,所有人都統(tǒng)一用。
新建配置類,實現(xiàn)ApplicationContextAware
接口,在接口中setReturnCallback
。
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
//check if is delay message
if (message.getMessageProperties().getReceivedDelay() != null && message.getMessageProperties().getReceivedDelay() > 0) {
return;
}
log.error("消息發(fā)送到queue失敗,replyCode={}, reason={}, exchange={}, routeKey={}, message={}",
replyCode, replyText, exchange, routingKey, message.toString());
});
}
}
3. 配置ConfirmCallBack
ConfirmCallBack在message發(fā)送時配置,每個message都可以有自己的ConfirmCallBack。
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
String message = "hello, spring amqp!";
// confirm callback
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(
result -> {
if (result.isAck()){
log.debug("消息到exchange成功, id={}", correlationData.getId());
}else {
log.error("消息到exchange失敗, id={}", correlationData.getId());
}
},
throwable -> {
log.error("消息發(fā)送失敗", throwable);
}
);
rabbitTemplate.convertAndSend("amq.topic", "simple.test", message, correlationData);
}
4. 測試
將消息發(fā)送到一個不存在的exchange
,模擬消息達到exchange
失敗,觸發(fā)ConfirmCallBack
,日志如下。
18:22:03:913 ERROR 23232 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'aamq.topic' in vhost '/', class-id=60, method-id=40)
18:22:03:915 ERROR 23232 --- [nectionFactory1] cn.itcast.mq.spring.SpringAmqpTest : 消息到exchange失敗, id=0c0910a3-7937-43ea-9606-e5bbcdda0b5c
將消息發(fā)送到一個存在的exchange
,但routekey
異常,模擬消息到達exchange
但沒有到達queue
,觸發(fā)ConfirmCallBack
和ReturnCallBack
,日志如下。
18:27:22:757 INFO 20184 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#7428de63:0/SimpleConnection@6d60899e [delegate=amqp://rabbitmq@127.0.0.1:5672/, localPort= 53662]
18:27:22:797 DEBUG 20184 --- [ 127.0.0.1:5672] cn.itcast.mq.spring.SpringAmqpTest : 消息到exchange成功, id=5fbdaaa1-5f20-4683-bdfa-bd71cd6afd11
18:27:22:796 ERROR 20184 --- [nectionFactory1] cn.itcast.mq.config.CommonConfig : 消息發(fā)送到queue失敗,replyCode=312, reason=NO_ROUTE, exchange=amq.topic, routeKey=simplee.test, message=(Body:'hello, spring amqp!' MessageProperties [headers={spring_returned_message_correlation=5fbdaaa1-5f20-4683-bdfa-bd71cd6afd11}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
三、消息持久化
新版本的
SpringAMQP
默認開啟持久化。RabbitMQ
本身并不默認開啟持久化。
隊列持久化,通過QueueBuilder
構(gòu)建持久化隊列,比如
@Bean
public Queue simpleQueue(){
return QueueBuilder
.durable("simple.queue")
.build();
}
消息持久化,在發(fā)送時可以設(shè)置,比如
@Test
public void testDurableMessage(){
Message message = MessageBuilder.withBody("hello springcloud".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
rabbitTemplate.convertAndSend("simple.queue", message);
}
四、消費者消息確認
消費者消息確認是指,consumer
收到消息后會給rabbitmq
發(fā)送回執(zhí)來確認消息接收狀況。
SpringAMQP
允許配置三種確認模式:
-
manual
:手動ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。 -
auto
:自動ack,由spring
監(jiān)測listener
代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack -
none
:關(guān)閉ack, MQ假定消費者獲取消息后會成功處理,因此消息投遞后立即被刪除
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: auto # manual auto none
但是auto
有個很大的缺陷,因為rabbitmq
會自動不斷給有問題的listen
反復(fù)投遞消息,導(dǎo)致不斷報錯,所以建議使用下一章的操作。
五、消費失敗重試機制
當(dāng)消費者出現(xiàn)異常后,消息會不斷requeue
(重新入隊)到隊列,再重新發(fā)送給消費者,然后再次異常,再次requeue
,無限循環(huán),導(dǎo)致mq
的消息處理飆升,帶來不必要的壓力。
我們可以利用Spring
的retry
機制,在消費者出現(xiàn)異常時利用本地重試,而不是無限制的requeue
到mq
隊列。
1. 引入依賴
spring:
rabbitmq:
listener:
simple:
prefetch: 1
retry:
enabled: true # 開啟消費者失敗重試
initial-interval: 1000 #初識的失敗等待時長為1秒
multiplier: 2 # 下次失敗的等待時長倍數(shù),下次等待時長 = multiplier * last-interval
max-attempts: 3 # 最大重試次數(shù)
stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
2. 配置重試次數(shù)耗盡策略
我們采用RepublishMessageRecoverer
。
定義用于接收失敗消息的exchange
,queue
以及它們之間的bindings
。
然后定義MessageRecoverer
,比如
@Component
public class ErrorMessageConfig {
@Bean
public MessageRecoverer republishMessageRecover(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
}
}
3. 測試
定義處理異常消息的exchange
和queue
,比如
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "error.queue"),
exchange = @Exchange(name = "error.exchange"),
key = "error"
))
public void listenErrorQueue(String msg){
log.info("消費者接收到error.queue的消息:【" + msg + "】");
}
定義如下一個listener
,來模擬consumer
處理消息失敗觸發(fā)消息重試。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "simple.queue"),
exchange = @Exchange(name = "simple.exchange"),
key = "simple"
))
public void listenSimpleQueue(String msg) {
log.info("消費者接收到simple.queue的消息:【" + msg + "】");
System.out.println(1/0);
log.info("consumer handle message success");
}
寫一個簡單的測試,往simple.exchange發(fā)送消息,比如
@Test
public void testSendMessageSimpleQueue() throws InterruptedException {
String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend("simple.exchange", "simple", message);
}
運行測試,consumer得到以下日志文章來源:http://www.zghlxwxcb.cn/news/detail-802054.html
18:51:10:164 INFO 24072 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener : 消費者接收到simple.queue的消息:【hello, spring amqp!】
18:51:11:167 INFO 24072 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener : 消費者接收到simple.queue的消息:【hello, spring amqp!】
18:51:13:168 INFO 24072 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener : 消費者接收到simple.queue的消息:【hello, spring amqp!】
18:51:13:176 WARN 24072 --- [ntContainer#0-1] o.s.a.r.retry.RepublishMessageRecoverer : Republishing failed message to exchange 'error.exchange' with routing key error
18:51:13:181 INFO 24072 --- [ntContainer#1-1] c.i.mq.listener.SpringRabbitListener : 消費者接收到error.queue的消息:【hello, spring amqp!】
可以看到spring
嘗試2次重發(fā),一共3次,第一次間隔1秒,第二次間隔2秒,重試次數(shù)耗盡,消息被consumer
傳入error
.exchange
,注意,是consumer
傳的,不是simple
.queue
。文章來源地址http://www.zghlxwxcb.cn/news/detail-802054.html
到了這里,關(guān)于RabbitMQ常見問題之消息可靠性的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!