消息隊(duì)列在使用過程中,面臨著很多實(shí)際問題需要思考:
- 消息可靠性問題:如何確保發(fā)送的消息至少被消費(fèi)—次
- 延遲消息問題:如何實(shí)現(xiàn)消息的延遲投遞
- 消息堆積問題:如何解決數(shù)百萬(wàn)消息堆積,無(wú)法及時(shí)消費(fèi)的問題
- 高可用問題:如何避免單點(diǎn)的MQ故障而導(dǎo)致的不可用問題
一、消息可靠性
背景/需求:消息從發(fā)送,到消費(fèi)者接收,會(huì)經(jīng)歷多個(gè)過程:
其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括:
-
發(fā)送時(shí)丟失:
- 生產(chǎn)者發(fā)送的消息【未送達(dá)exchange】——返回nack(消息確認(rèn)模式)
- 消息【到達(dá)exchange】——返回ack(消息確認(rèn)模式)
-
到達(dá)queue后,MQ宕機(jī),queue將消息丟失
——返回ACK,及路由失敗原因(回退模式) -
consumer接收到消息后還未消費(fèi)就宕機(jī)——消息持久化
1、【生產(chǎn)者】消息確認(rèn)
RabbitMQ提供了publisher confirm機(jī)制來(lái)避免消息發(fā)送到MQ過程中丟失。這種機(jī)制必須給每個(gè)消息指定一個(gè)唯一ID。消息發(fā)送到MQ以后,會(huì)返回一個(gè)結(jié)果給發(fā)送者,表示消息是否處理成功。
返回結(jié)果有兩種方式:
- publisher-confirm,發(fā)送者確認(rèn)
- 消息成功投遞到交換機(jī),返回ack
- 消息未投遞到交換機(jī),返回nack
- publisher-return,發(fā)送者回執(zhí)
- 消息投遞到交換機(jī)了,但是沒有路由到隊(duì)列。返回ACK,及路由失敗原因。
注意:確認(rèn)機(jī)制發(fā)送消息時(shí),需要給每個(gè)消息設(shè)置一個(gè)全局唯一id,以區(qū)分不同消息,避免ack沖突
- 消息投遞到交換機(jī)了,但是沒有路由到隊(duì)列。返回ACK,及路由失敗原因。
1.1 修改application.yml配置文件,添加下面的內(nèi)容:
位置:生產(chǎn)者/publisher服務(wù)
目的:
1、開啟消息確認(rèn)模式
2、開啟消息回退(并設(shè)置消息路由到隊(duì)列失敗時(shí),回退消息給回調(diào)接口)
spring:
rabbitmq:
publisher-confirm-type: correlated
# 開啟publisher-confirm,且選擇correlated:【異步】回調(diào),定義ConfirmCallback,MQ返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallback
publisher-returns: true # 開啟publish-return功能
template:
mandatory: true # 定義當(dāng)消息從交換機(jī)路由到隊(duì)列失敗時(shí)的策略?!総rue,則調(diào)用ReturnCallback;false:則直接丟棄消息】
說(shuō)明:
publish-confirm-type
:開啟publisher-confirm,這里支持兩種類型:
simple
:【同步】等待confirm結(jié)果,直到超時(shí)(可能引起代碼阻塞)correlated
:【異步】回調(diào),定義ConfirmCallback,MQ返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallbackpublish-returns
:開啟publish-return功能,同樣是基于callback機(jī)制,不過是定義ReturnCallbacktemplate.mandatory
:定義當(dāng)消息從交換機(jī)路由到隊(duì)列失敗時(shí)的策略?!総rue,則調(diào)用ReturnCallback;false:則直接丟棄消息】
1.2定義Return回退:
說(shuō)明:因?yàn)樵趛ml配置文件中定義消息路由失敗時(shí)的策略為true,所以當(dāng)消息從交換機(jī)路由到隊(duì)列失敗時(shí),會(huì)調(diào)用ReturnCallback
每個(gè)RabbitTemplate只能配置一個(gè)ReturnCallback,因此需要在項(xiàng)目加載時(shí)添加配置:
修改publisher服務(wù),添加一個(gè)【配置類】:
位置:config/commic配置類
如何保證在項(xiàng)目加載時(shí)添加配置?
1、實(shí)現(xiàn)ApplicationContextAware(實(shí)現(xiàn)了ApplicationContextAware接口的實(shí)現(xiàn)類,在Spring容器的Bean工廠創(chuàng)建完畢后會(huì)通知該實(shí)現(xiàn)類)
2、此時(shí),該實(shí)現(xiàn)/配置類有了Spring容器的Bean工廠類;就可以獲取并設(shè)置ReturnCallback(Spring容器的Bean對(duì)象)
3、開始配置ReturnCallback;
ReturnCallback的回調(diào)函數(shù):當(dāng)消息成功發(fā)送到交換機(jī),但是沒有成功發(fā)送到消息隊(duì)列時(shí),回退到回調(diào)函數(shù),應(yīng)該如何處理?就是回調(diào)函數(shù)里面的內(nèi)容
package cn.itcast.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
//實(shí)現(xiàn)了ApplicationContextAware接口的實(shí)現(xiàn)類,在Spring容器的Bean工廠創(chuàng)建完畢后會(huì)通知該實(shí)現(xiàn)類
//有了Bean工廠類,然后就可以獲取并設(shè)置ReturnCallback(Spring容器的Bean對(duì)象)
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// (從Spring容器中)獲取RabbitTemplate對(duì)象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 記錄日志
log.error("消息發(fā)送到隊(duì)列失敗,響應(yīng)碼:{}, 失敗原因:{}, 交換機(jī): {}, 路由key:{}, 消息: {}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有需要的話,重發(fā)消息
});
}
}
1、重寫方法setApplicationContext(ApplicationContext applicationContext)
參數(shù)為接口,且該【接口只有一個(gè)方法】可以用lambda表達(dá)式代替
代替后如下
2、編寫回調(diào)函數(shù)
(ReturnCallback回調(diào)函數(shù):當(dāng)消息成功發(fā)送到交換機(jī),但是沒有成功發(fā)送到消息隊(duì)列時(shí),回退到回調(diào)函數(shù),應(yīng)該如何處理?就是回調(diào)函數(shù)里面的內(nèi)容)
1.3 定義ConfirmCallback(消息確認(rèn))
ConfirmCallback【可以在發(fā)送消息時(shí)指定】因?yàn)槊總€(gè)業(yè)務(wù)處理confirm成功或失敗的邏輯不一定相同
消息發(fā)送代碼如下:
位置:在publisher服務(wù)的cn.itcast.mq.spring.SpringAmqpTest類中,定義一個(gè)單元測(cè)試方法:
注意:確認(rèn)機(jī)制發(fā)送消息時(shí),需要給每個(gè)消息設(shè)置一個(gè)全局唯一id,以區(qū)分不同消息,避免ack沖突
CorrelationData的作用:
1、消息ID需要封裝到CorrelationData
2、correlationData.getFuture().addCallback(…)是一個(gè)回調(diào)函數(shù):決定了每個(gè)業(yè)務(wù)處理confirm成功或失敗的邏輯
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.準(zhǔn)備消息
String message = "hello, spring amqp!";
// 2.準(zhǔn)備CorrelationData(消息ID需要封裝到CorrelationData)
// 2.1.消息ID,確認(rèn)機(jī)制發(fā)送消息時(shí),需要給每個(gè)消息設(shè)置一個(gè)全局唯一id,以區(qū)分不同消息,避免ack沖突
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 2.2.準(zhǔn)備ConfirmCallback(Future是對(duì)將來(lái)的一種處理的封裝)(Future.addCallback)
correlationData.getFuture().addCallback(
result -> {
// 判斷結(jié)果
if (result.isAck()) {
// ACK
log.debug("消息成功投遞到交換機(jī)!消息ID: {}", correlationData.getId());
} else {
// NACK
log.error("消息投遞到交換機(jī)失敗!消息ID:{}", correlationData.getId());
// 重發(fā)消息
}
},
ex -> {
// 記錄日志
log.error("消息發(fā)送失?。?, ex);
// 重發(fā)消息
});
// 3.發(fā)送消息(這里如果沒有綁定交換機(jī)和隊(duì)列關(guān)系等,可以去管控臺(tái)綁定,也可以在消費(fèi)者的配置類中聲明)
rabbitTemplate.convertAndSend("amq.topic", "simple.test", message, correlationData);
}
}
附1:有/無(wú)消息確認(rèn)機(jī)制的publish消息發(fā)送的對(duì)比
沒有confirm確認(rèn)機(jī)制:
消息確認(rèn)機(jī)制:
RabbitMQ提供了publisher confirm機(jī)制來(lái)避免消息發(fā)送到MQ過程中丟失。這種機(jī)制必須給每個(gè)消息指定一個(gè)唯一ID。消息發(fā)送到MQ以后,會(huì)返回一個(gè)結(jié)果給發(fā)送者,表示消息是否處理成功。
附2:lambda表達(dá)式如何變換
變化后:
測(cè)試
(這里如果沒有綁定交換機(jī)和隊(duì)列關(guān)系等,可以去管控臺(tái)綁定,也可以在消費(fèi)者的配置類中聲明)
測(cè)試1、confirm:消息成功到達(dá)交換機(jī)——返回ack
測(cè)試2:confirm:消息未成功到達(dá)交換機(jī)——返回nack
測(cè)試3:消息發(fā)送到了交換機(jī)但沒有發(fā)送到隊(duì)列——返回ack,但是return回退
故意將隊(duì)列名字寫錯(cuò)(交換機(jī)不存在綁定該隊(duì)列)
返回ACK,及路由失敗原因.
2、消息持久化(了解)
背景/需求:生產(chǎn)者確認(rèn)可以確保消息投遞到RabbitMQ的隊(duì)列中,但是消息發(fā)送到RabbitMQ以后,如果消息隊(duì)列突然宕機(jī),也可能導(dǎo)致消息丟失。
(因?yàn)橄㈥?duì)列默認(rèn)是內(nèi)存存儲(chǔ))
(發(fā)送到消息隊(duì)列成功+消息隊(duì)列突然宕機(jī)=消息丟失)
要想確保消息在RabbitMQ中安全保存,必須開啟消息持久化機(jī)制(寫入到磁盤中)
注:SpringAMQP默認(rèn)是進(jìn)行持久化(包括聲明隊(duì)列、交換機(jī)、發(fā)送消息)(備注:通過管控臺(tái)創(chuàng)建的默認(rèn)是非持久化的)
那么,下面學(xué)的消息持久化有什么用呢?持久化畢竟是寫磁盤,會(huì)有一定的性能損耗,不是所有的數(shù)據(jù)都需要持久化,學(xué)了下面的持久化后可以手動(dòng)將不需要持久化的數(shù)據(jù)取消持久化
RabbitMQ Management 控制臺(tái)設(shè)置:
說(shuō)明:
Durable:持久的
Transient:轉(zhuǎn)瞬即逝的
交換機(jī)持久化
消息隊(duì)列持久化
代碼(配置類聲明)
說(shuō)明:由SpringAMQP聲明的交換機(jī)和隊(duì)列都是持久化的(所以持久化隊(duì)列和交換機(jī)的代碼和我們之前配置類聲明隊(duì)列、交換機(jī)一樣)
@Configuration
public class CommonConfig {
// 三個(gè)參數(shù):交換機(jī)名稱、是否持久化、當(dāng)沒有queue與其綁定時(shí)是否自動(dòng)刪除(事實(shí)上,默認(rèn)情況下,由SpringAMQP聲明的交換機(jī)和隊(duì)列都是持久化的)
@Bean
public DirectExchange simpleDirect(){
//默認(rèn)為return new DirectExchange("simple.direct",true,false);
return new DirectExchange("simple.direct");
}
// 使用QueueBuilder構(gòu)建隊(duì)列,durable就是持久化的
@Bean
public Queue simpleQueue(){
//new Queue("");默認(rèn)代碼為public Queue(String name) { this(name, true, false, false);}
return QueueBuilder.durable("simple.queue").build();
}
}
聲明完隊(duì)列和交換機(jī),可以在RabbitMQ控制臺(tái)看到持久化的交換機(jī)和隊(duì)列都會(huì)帶上D
的標(biāo)示:
此時(shí),消息和隊(duì)列都持久化了,但是,如果消息還是沒有持久化(重啟rabbitmq,交換機(jī)和隊(duì)列都在,但是消息會(huì)消失)
消息持久化
默認(rèn)情況下,SpringAMQP發(fā)出的任何消息都是持久化的,不用特意指定。
下面是手動(dòng)設(shè)置消息的屬性(MessageProperties),指定delivery-mode:
NON_PERSISTENT,非持久化
PERSISTENT;持久化
@Test
public void testDurableMessage() {
// 1.準(zhǔn)備消息
Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
// 2.發(fā)送消息
rabbitTemplate.convertAndSend("simple.queue", message);
}
3、【消費(fèi)者】消息確認(rèn)
RabbitMQ是閱后即焚機(jī)制,RabbitMQ確認(rèn)消息被消費(fèi)者消費(fèi)后會(huì)立刻刪除。此時(shí),如果消費(fèi)者還沒有處理消息,然后消費(fèi)者掛掉了,就會(huì)導(dǎo)致消息丟失。
場(chǎng)景如下:
- 1)RabbitMQ投遞消息給消費(fèi)者
- 2)消費(fèi)者獲取消息后,【返回ACK給RabbitMQ】
- 3)RabbitMQ刪除消息
- 4)消費(fèi)者宕機(jī),消息尚未處理
(成功發(fā)送到消費(fèi)者+消費(fèi)者還沒處理消息就宕機(jī)了)消息就丟失了。因此消費(fèi)者返回ACK的時(shí)機(jī)非常重要。
而SpringAMQP則允許配置三種確認(rèn)模式:
- manual:手動(dòng)ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。
- auto:自動(dòng)ack,由spring監(jiān)測(cè)listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack
- none:關(guān)閉ack,MQ假定消費(fèi)者獲取消息后會(huì)成功處理,因此消息投遞后立即被刪除(此時(shí),消息投遞是不可靠的,可能丟失)
一般,我們都是使用默認(rèn)的auto即可。
【yml配置文件中配置消息確認(rèn)模式】:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto #由spring監(jiān)測(cè)listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack
消費(fèi)者模擬處理異常
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
//none模式下,消費(fèi)者在這里接收到消息后,消息就從隊(duì)列中被刪除了
log.debug("消費(fèi)者接收到simple.queue的消息:【" + msg + "】");
System.out.println(1 / 0);//拋出異常、后面就不會(huì)執(zhí)行業(yè)務(wù)代碼
log.info("消費(fèi)者處理消息成功!");//模擬業(yè)務(wù)代碼
}
}
發(fā)送消息測(cè)試
管控臺(tái)發(fā)送消息
auto模式下:由spring監(jiān)測(cè)listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack
在異常位置打斷點(diǎn),再次發(fā)送消息,程序卡在斷點(diǎn)時(shí),可以發(fā)現(xiàn)此時(shí)消息狀態(tài)為unack(未確定狀態(tài)):
拋出異常后,因?yàn)镾pring會(huì)自動(dòng)返回nack,所以消息恢復(fù)至Ready狀態(tài),并且沒有被RabbitMQ刪除:
【問題】:當(dāng)消費(fèi)者出現(xiàn)異常后,消息會(huì)不斷requeue(重入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者,然后再次異常,再次requeue,無(wú)限循環(huán),導(dǎo)致mq的消息處理飆升,帶來(lái)不必要的壓力:(這里測(cè)試一條數(shù)據(jù)就已經(jīng)達(dá)到 3000條/s 了)
4、消費(fèi)失敗重試機(jī)制
我們可以利用Spring的retry機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無(wú)限制的requeue到mq隊(duì)列(不返回ack,也不返回nack),而是可以自己設(shè)置重試的次數(shù)(如果在重試n次后仍然失敗,那么后面在繼續(xù)重入隊(duì)大概率也會(huì)失敗,那么就直接扔掉,不再重入隊(duì),此時(shí),Spring會(huì)返回ack)
總結(jié):
- 開啟本地重試時(shí),消息處理過程中拋出異常,不會(huì)requeue到隊(duì)列,而是在消費(fèi)者本地重試
- 【重試達(dá)到最大次數(shù)后,Spring會(huì)返回ack,消息會(huì)被丟棄】
1、本地重試
修改consumer服務(wù)的application.yml文件,添加內(nèi)容:
spring:
rabbitmq:
listener:
simple:
retry: # Spring消費(fèi)者失敗重試
enabled: true # 【開關(guān)】開啟消費(fèi)者失敗重試
initial-interval: 1000 # 初識(shí)的失敗等待時(shí)長(zhǎng)為1秒(第一次失敗后1s重試)
multiplier: 1 # 失敗的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = multiplier * last-interval(舉例:倍數(shù)*第一次等待時(shí)長(zhǎng)1s,這樣子永遠(yuǎn)都是1s)
#但是如果設(shè)置為2,下次等待時(shí)長(zhǎng)為上次的2倍,因此等待時(shí)長(zhǎng)依次為1、2、4、8、16....
max-attempts: 3 # 最大重試次數(shù)
stateless: true # (默認(rèn)為true)true無(wú)狀態(tài);false有狀態(tài)【如果業(yè)務(wù)中包含事務(wù),這里改為false】
#(備注:如果設(shè)置為false,那么Spring在重試的時(shí)候保留事務(wù)——消耗性能,所以沒有事務(wù)時(shí)設(shè)置為true提升性能)
max-interval: 10000 # 最大等待時(shí)長(zhǎng),大于此時(shí)長(zhǎng)的一律按最大時(shí)長(zhǎng)來(lái)計(jì)算
重啟consumer服務(wù),重復(fù)之前的測(cè)試??梢园l(fā)現(xiàn):
- 在重試4次后,SpringAMQP會(huì)拋出異常AmqpRejectAndDontRequeueException,說(shuō)明本地重試觸發(fā)了
- 查看RabbitMQ控制臺(tái),發(fā)現(xiàn)消息被刪除了,說(shuō)明最后SpringAMQP返回的是ack,mq刪除消息了
2、失敗策略
問題:在上面的測(cè)試中,達(dá)到最大重試次數(shù)后,消息會(huì)被丟棄,這是由Spring內(nèi)部機(jī)制決定的。但是,有些數(shù)據(jù)特別重要,我們不希望任何消息被丟棄,此時(shí),我們應(yīng)該如何實(shí)現(xiàn)?
在開啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有MessageRecovery接口來(lái)處理,它包含三種不同的實(shí)現(xiàn):
- RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,【丟棄消息】【默認(rèn)】就是這種方式
- ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)(Immediate立刻重入隊(duì))
(但是頻率比沒有配置消費(fèi)失敗重載機(jī)制低一些)- RepublishMessageRecoverer(推薦):重試耗盡后,將失敗消息投遞到指定的交換機(jī)
RepublishMessageRecoverer:失敗后將消息投遞到一個(gè)指定的,專門存放異常消息的隊(duì)列,后續(xù)由人工集中處理,這樣所有的消息都不會(huì)丟失。
【RepublishMessageRecoverer處理模式的代碼實(shí)現(xiàn)】 :
1)【定義】處理失敗消息的【交換機(jī)和隊(duì)列】
位置:在consumer服務(wù)中的配置類config/ErrorMessageConfig.java
作用:聲明交換機(jī)、隊(duì)列、綁定關(guān)系
綁定關(guān)系:交換機(jī)error.direct–routingkey(error)–》error.queue
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
2)定義一個(gè)RepublishMessageRecoverer,指定/關(guān)聯(lián)隊(duì)列和交換機(jī)
失敗策略:
1、參數(shù)rabbitTemplate(Spring容器自動(dòng)注入)
2、通過rabbitTemplate將消息發(fā)送到(處理失敗消息的)交換機(jī)(routingKey為error)
這里的RepublishMessageRecoverer的作用:當(dāng)消費(fèi)者的消息失敗重試次數(shù)用盡后,將失敗的消息【丟棄給指定的error交換機(jī)的error隊(duì)列】
//參數(shù) Spring自動(dòng)注入的rabbitTemplate
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
測(cè)試結(jié)果:
我們可以在error隊(duì)列中查看具體的錯(cuò)誤信息,然后進(jìn)行修改
5、總結(jié):
如何確保RabbitMQ消息的可靠性?
- 開啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊(duì)列
- 開啟持久化功能,確保消息未消費(fèi)前在隊(duì)列中不會(huì)丟失
- 開啟消費(fèi)者確認(rèn)機(jī)制為auto,由spring確認(rèn)消息處理成功后完成ack
- 開啟消費(fèi)者失敗重試機(jī)制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理
二、死信交換機(jī)
1、初識(shí)死信交換機(jī)
什么是死信?
當(dāng)一個(gè)隊(duì)列中的消息滿足下列情況之一時(shí),可以成為死信(dead letter):
- 消費(fèi)者使用basic.reject或 basic.nack聲明消費(fèi)失敗,并且消息的requeue參數(shù)設(shè)置為false
- 消息是一個(gè)過期消息,超時(shí)無(wú)人消費(fèi)
- 要投遞的隊(duì)列消息滿了,無(wú)法投遞
如果這個(gè)包含死信的隊(duì)列配置了dead-letter-exchange
屬性,指定了一個(gè)交換機(jī),那么隊(duì)列中的死信就會(huì)投遞到這個(gè)交換機(jī)中,而這個(gè)交換機(jī)稱為死信交換機(jī)(Dead Letter Exchange,簡(jiǎn)稱DLX)
死信交換機(jī)過程大致如下:
1、(重試次數(shù)耗盡)一個(gè)消息被消費(fèi)者拒絕了,變成了死信
2、因?yàn)閟imple.queue綁定了死信交換機(jī) dl.direct,因此死信會(huì)投遞給這個(gè)交換機(jī)
3、如果這個(gè)死信交換機(jī)也綁定了一個(gè)隊(duì)列,則消息最終會(huì)進(jìn)入這個(gè)存放死信的隊(duì)列
附:死信交換機(jī)對(duì)比:消費(fèi)失敗策略
發(fā)送消息的對(duì)象不同?
republish是由consumer發(fā)送,死信是由隊(duì)列去發(fā)送
從上圖可以看出,發(fā)送消息的對(duì)象不同,因此,死信交換機(jī)的其中一個(gè)功能和消費(fèi)失敗策略功能類似:作為一個(gè)兜底方案,當(dāng)消費(fèi)者宕機(jī),導(dǎo)致隊(duì)列滿了放不下 隊(duì)列還可以將溢出的消息轉(zhuǎn)發(fā)到死信隊(duì)列。
【代碼實(shí)現(xiàn):利用死信交換機(jī)接收死信】
那么如何實(shí)現(xiàn)呢?
隊(duì)列將死信投遞給死信交換機(jī)時(shí),必須知道兩個(gè)信息:
- 死信交換機(jī)名稱
- 死信交換機(jī)與死信隊(duì)列綁定的RoutingKey
這樣才能確保投遞的消息能到達(dá)死信交換機(jī),并且正確的路由到死信隊(duì)列。
代碼實(shí)現(xiàn):
聲明交換機(jī)、隊(duì)列、綁定關(guān)系
+指定死信交換機(jī)
// 【聲明普通的 simple.queue隊(duì)列,并且為其指定死信交換機(jī):dl.direct】
@Bean
public Queue simpleQueue2(){
return QueueBuilder.durable("simple.queue") // 指定隊(duì)列名稱,并持久化
.deadLetterExchange("dl.direct") // 【指定死信交換機(jī)】
.build();
}
// 聲明死信交換機(jī) dl.direct
@Bean
public DirectExchange dlExchange(){
return new DirectExchange("dl.direct", true, false);
}
// 聲明存儲(chǔ)死信的隊(duì)列 dl.queue
@Bean
public Queue dlQueue(){
return new Queue("dl.queue", true);
}
// 將死信隊(duì)列 與 死信交換機(jī)綁定
@Bean
public Binding dlBinding(){
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}
2、TTL
什么是TTL?
TTL,也就是Time-To-Live。如果一個(gè)隊(duì)列中的消息TTL結(jié)束仍未消費(fèi),則會(huì)變?yōu)樗佬?,ttl超時(shí)分為兩種情況:
- 消息所在的隊(duì)列設(shè)置了超時(shí)時(shí)間
- 消息本身設(shè)置了超時(shí)時(shí)間
從上面的死信交換機(jī)中指定,消息是一個(gè)過期消息,超時(shí)無(wú)人消費(fèi),這條消息就會(huì)被投遞到死信交換機(jī)中。
其流程大致如下:
TTL的延申功能:延遲消息
給一個(gè)消息/隊(duì)列設(shè)置超時(shí)時(shí)間,將消息發(fā)送到ttl.queue(該隊(duì)列沒有消費(fèi)者,消息一定會(huì)超時(shí))消息超時(shí)后變成了死信。交給死信交換機(jī)-隊(duì)列-消費(fèi)者 ,這樣就完成了延遲消息的功能。
代碼實(shí)現(xiàn):延遲消息功能
1、監(jiān)聽器:【定義一個(gè)新的消費(fèi)者(方法)】【并且聲明死信交換機(jī)、死信隊(duì)列、綁定關(guān)系】
位置:在consumer服務(wù)的SpringRabbitListener中
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.ttl.queue", durable = "true"),
exchange = @Exchange(name = "dl.ttl.direct"),
key = "dl"
))
public void listenDlQueue(String msg){
log.info("接收到 dl.ttl.queue的延遲消息:{}", msg);
}
2、聲明交換機(jī)、隊(duì)列,綁定關(guān)系、為隊(duì)列指定TTL超時(shí)時(shí)間
新建一個(gè)配置類(便于管理),配置類記得添加@Configuration
位置:consumer/config/TTLMessageConfig類
要給隊(duì)列設(shè)置超時(shí)時(shí)間,需要在聲明隊(duì)列時(shí)配置ttl屬性
@Configuration
public class TTLMessageConfig {
//聲明隊(duì)列ttl.queue,設(shè)置超時(shí)時(shí)間
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue") // 指定隊(duì)列名稱,并持久化
.ttl(10000) // 設(shè)置隊(duì)列的超時(shí)時(shí)間,10秒
.deadLetterExchange("dl.direct") // 隊(duì)列指定死信交換機(jī),即消息超時(shí)就投到這個(gè)交換機(jī)
.deadLetterRoutingKey("dl")//消息到死信交換機(jī)的RoutingKey
.build();
}
//正常的聲明交換機(jī)ttl.direct
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.direct");
}
//正常的綁定交換機(jī)和隊(duì)列,routingkey為ttl
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
}
3、發(fā)送消息時(shí),設(shè)定TTL
在發(fā)送消息時(shí),也可以指定TTL:
位置:publisher
@Test
public void testTTLMsg() {
// 創(chuàng)建消息
Message message = MessageBuilder
.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
.setExpiration("5000")
.build();
// 消息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 發(fā)送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
log.debug("發(fā)送消息成功");
}
測(cè)試結(jié)果:
當(dāng)隊(duì)列、消息都設(shè)置了TTL時(shí),任意一個(gè)到期就會(huì)成為死信
三、延時(shí)隊(duì)列
因?yàn)檠舆t隊(duì)列的需求非常多,所以RabbitMQ的官方也推出了一個(gè)插件,原生支持延遲隊(duì)列效果。
這個(gè)延時(shí)插件需要自己安裝,下面文章有基于Linux系統(tǒng)的docekr方式安裝
備注:
RabbitMQ(二):RabbitMQ的安裝(Linux、基于docker安裝)及其插件安裝
DelayExchange原理:
DelayExchange插件的原理是對(duì)官方原生的Exchange做了功能的升級(jí):
-
將DelayExchange接受到的消息暫存在內(nèi)存中(官方的Exchange是無(wú)法存儲(chǔ)消息的)
-
在DelayExchange中計(jì)時(shí),超時(shí)后才投遞消息到隊(duì)列中
使用DelayExchange-控制臺(tái)方式
1、控制臺(tái)聲明延遲交換機(jī)
2、發(fā)送消息
使用DelayExchange-代碼方式
DelayExchange需要將一個(gè)交換機(jī)聲明為delayed類型。當(dāng)我們發(fā)送消息到delayExchange時(shí),流程如下:
- 接收消息
- 判斷消息是否具備x-delay屬性
- 如果有x-delay屬性,說(shuō)明是延遲消息,持久化到硬盤,讀取x-delay值,作為延遲時(shí)間
- 返回routing not found結(jié)果給消息發(fā)送者
- x-delay時(shí)間到期后,重新投遞消息到指定隊(duì)列
1)聲明DelayExchange交換機(jī)
聲明交換機(jī)為delayed類型
法一:基于@RabbitListener(推薦)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayExchange(String msg) {
log.info("消費(fèi)者接收到了delay.queue的延遲消息");
}
法二:基于@Bean的方式:
2)publisher發(fā)送消息
向這個(gè)delay為true的交換機(jī)中發(fā)送消息時(shí),一定要給消息添加一個(gè)header:x-delay屬性,指定延遲的時(shí)間,單位為毫秒:
@Test
public void testSendDelayMessage() throws InterruptedException {
// 1.準(zhǔn)備消息
Message message = MessageBuilder
.withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setHeader("x-delay", 5000)
.build();
// 2.準(zhǔn)備CorrelationData
//消息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.發(fā)送消息
rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
log.info("發(fā)送消息成功");
}
測(cè)試:
運(yùn)行會(huì)出現(xiàn)如下錯(cuò)誤
錯(cuò)誤原因:因?yàn)闆]有給延遲交換機(jī)指定routingKey,所以路由失?。ㄒ矝]有消費(fèi)者)
解決方案:
因?yàn)槭窍⒊晒Πl(fā)送到交換機(jī),交換機(jī)發(fā)送到隊(duì)列失敗——此時(shí)會(huì)進(jìn)行return消息回退;那么,我們可以回退模式中添加判斷
位置:publisher服務(wù)的config/CommonConfig配置類下(ReturnCallback)
添加如下判斷
// 判斷是否是延遲消息
Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
if (receivedDelay != null && receivedDelay > 0) {
//判斷延遲值非空且大于0==》是一個(gè)延遲消息,忽略這個(gè)錯(cuò)誤提示
return;
}
延遲隊(duì)列插件的使用步驟包括哪些?
?聲明一個(gè)交換機(jī),添加delayed屬性為true
?發(fā)送消息時(shí),添加x-delay頭,值為超時(shí)時(shí)間
四、惰性隊(duì)列
消息堆積問題
【當(dāng)生產(chǎn)者發(fā)送消息的速度】超過了【消費(fèi)者處理消息的速度】,就會(huì)導(dǎo)致隊(duì)列中的消息堆積,直到隊(duì)列存儲(chǔ)消息達(dá)到上限。之后發(fā)送的消息就會(huì)成為死信,可能會(huì)被丟棄,這就是消息堆積問題。
解決消息堆積有三種思路:
- 增加更多消費(fèi)者,提高消費(fèi)速度。也就是我們之前說(shuō)的work queue模式
- 在消費(fèi)者內(nèi)開啟線程池加快消息處理速度(限制:當(dāng)消息很多時(shí),需要開啟很多線程,線程越多,CPU需要進(jìn)行上下文切換——消耗性能;適用于消息處理時(shí)間較長(zhǎng)的情況,開多個(gè)線程并行處理多個(gè)業(yè)務(wù))
- 擴(kuò)大隊(duì)列容積,提高堆積上限
其中,要提升隊(duì)列容積,把消息保存在內(nèi)存中顯然是不行的。從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的概念,也就是惰性隊(duì)列。
惰性隊(duì)列的特征如下:
- 接收到消息后直接【存入磁盤】而非內(nèi)存
mq消息一般都是儲(chǔ)存在內(nèi)存——響應(yīng)速度快(優(yōu)點(diǎn)),但是,mq在內(nèi)存儲(chǔ)存設(shè)置了一個(gè)上限,mq設(shè)置內(nèi)存預(yù)警值,當(dāng)消息占了內(nèi)存的40%時(shí),mq會(huì)處于暫停的狀態(tài),阻止生產(chǎn)者投遞消息,將這部分消息刷出到磁盤,清理出一部分內(nèi)存空間出來(lái),導(dǎo)致mq會(huì)間歇性的出現(xiàn)暫停,導(dǎo)致mq的并發(fā)能力出現(xiàn)忽高忽低的性能不穩(wěn)定的情況
將消息存入磁盤就不會(huì)出現(xiàn)這個(gè)問題,但是磁盤的速度肯定沒有內(nèi)存的快——性能損耗 - 消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤中讀取并加載到內(nèi)存——同上,性能損耗
- 支持?jǐn)?shù)百萬(wàn)條的消息存儲(chǔ)
惰性隊(duì)列的如何創(chuàng)建
方式1、基于@Bean聲明lazy-queue
方式2、基于@RabbitListener聲明LazyQueue
方式3、通過命令行可以將一個(gè)運(yùn)行中的隊(duì)列修改為惰性隊(duì)列:
使用Xshell,進(jìn)入mq容器中,執(zhí)行該指令
1、進(jìn)入容器
docker exec -it mq1
2、執(zhí)行命令
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解讀:
-
rabbitmqctl
:RabbitMQ的命令行工具 -
set_policy
:添加一個(gè)策略 -
Lazy
:策略名稱,可以自定義 -
"^lazy-queue$"
:用正則表達(dá)式,匹配隊(duì)列的名字,(凡是符合該正則表達(dá)式規(guī)則的隊(duì)列,全部按照該策略設(shè)置) -
'{"queue-mode":"lazy"}'
:設(shè)置隊(duì)列模式為lazy模式 -
--apply-to queues
:策略的作用對(duì)象,是所有的隊(duì)列
執(zhí)行完該指令后,可以在Rabbitmq的管控臺(tái)出處查看策略
消息隊(duì)列專欄文章:
RabbitMQ(一)初識(shí)消息隊(duì)列(MQ)
RabbitMQ(二):RabbitMQ的安裝(Linux、基于docker安裝)及其插件安裝
RabbitMQ(三):RabbitMQ快速入門(SpringBoot)
RabbitMQ(四):RabbitMQ高級(jí)特性文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-804347.html
文章整理自:黑馬教學(xué)視頻文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-804347.html
到了這里,關(guān)于RabbitMQ(四):RabbitMQ高級(jí)特性的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!