MQ服務(wù)異步通信
MQ的一些常見問題
a.消息可靠性
消息可靠性問題
消息從發(fā)送,到消費者接收,會經(jīng)歷多個過程:
其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括:
- 發(fā)送時丟失:
- 生產(chǎn)者發(fā)送的消息未送達(dá)exchange
- 消息到達(dá)exchange后未到達(dá)queue
- MQ宕機(jī),queue將消息丟失
- consumer接收到消息后未消費就宕機(jī)
1) 生產(chǎn)者消息確認(rèn)
生產(chǎn)者確認(rèn)機(jī)制
RabbitMQ提供了publisher confirm機(jī)制來避免消息發(fā)送到MQ過程中丟失。消息發(fā)送到MQ以后,會返回一個結(jié)果給發(fā)送者,表示消息是否處理成功。結(jié)果有兩種請求:
- publisher-confirm,發(fā)送者確認(rèn)
- 消息成功投遞到交換機(jī),返回ack
- 消息未投遞到交換機(jī),返回nack
- publisher-return,發(fā)送者回執(zhí)
- 消息投遞到交換機(jī)了,但是沒有路由到隊列。返回ACK,及路由失敗原因。
注意:
SpringAMQP實現(xiàn)生產(chǎn)者確認(rèn)
1.修改publisher服務(wù)中的application.yml文件,添加下面的內(nèi)容:
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
說明:
-
publish-confirm-type
:開啟publisher-confirm,這里支持兩種類型:-
simple
:同步等待confirm結(jié)果,直到超時 -
correlated
:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時會回調(diào)這個ConfirmCallback
-
-
publish-returns
:開啟publish-return功能,同樣是基于callback機(jī)制,不過是定義ReturnCallback -
template.mandatory
:定義消息路由失敗時的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息
2.每個RabbitTemplate只能配置一個ReturnCallback,因此需要在項目啟動過程中配置ApplicationContextAware:
package cn.itcast.mq.config;
@Configuration
@Slf4j
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 獲取RabbitTemplate對象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置returnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 記錄日志
log.error("消息發(fā)送到隊列失敗,響應(yīng)碼:{},失敗原因:{},交換機(jī):{},路由key:{},消息:{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有需要,重發(fā)消息或通知管理員
});
}
}
3.發(fā)送消息,指定消息ID、消息ConfirmCallback
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.準(zhǔn)備消息
String message = "hello, spring amqp!";
// 2.準(zhǔn)備correlationData
// 2.1.消息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 2.2.準(zhǔn)備ConfirmCallback
correlationData.getFuture().addCallback(result -> {
// 判斷結(jié)果
if (result.isAck()){
// ACK
log.debug("消息成功投遞到交換機(jī)!消息ID:{}", correlationData.getId());
}else {
// NACK
log.error("消息投遞到交換機(jī)失??!消息ID:{}", correlationData.getId());
// 如果有需要,重發(fā)消息或通知管理員
}
}, ex -> {
// 記錄日志
log.error("消息發(fā)送失??!", ex);
// 如果有需要,重發(fā)消息或通知管理員
});
// 3.發(fā)送消息
rabbitTemplate.convertAndSend("amq.topic", "simple.text", message, correlationData);
}
pringAMQP中處理消息確認(rèn)的幾種情況:
- publisher-comfirm:
- 消息成功發(fā)送到exchange,返回ack
- 消息發(fā)送失敗,沒有到達(dá)交換機(jī),返回nack
- 消息發(fā)送過程中出現(xiàn)異常,沒有收到回執(zhí)
- 消息成功發(fā)送到exchange,但沒有路由到queue,調(diào)用ReturnCallback
2) 消息持久化
MQ默認(rèn)是內(nèi)存存儲消息,開啟持久化功能可以確保緩存在MQ中的消息不丟失。8
1.交換機(jī)持久化:
@Bean
public DirectExchange simpleDirect(){
// 三個參數(shù):交換機(jī)名稱、是否持久化、當(dāng)沒有queue與其綁定時是否自動刪除
return new DirectExchange("simple.direct", true, false);
}
2.隊列持久化:
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder構(gòu)建隊列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
3.消息持久化,SpringAMQP中的的消息默認(rèn)是持久的,可以通過MessageProperties中的DeliveryMode來指定的:
// 1.準(zhǔn)備消息
Message message = MessageBuilder.withBody("hello spring".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
3) 消費者消息確認(rèn)
RabbitMQ支持消費者確認(rèn)機(jī)制,即:消費者處理消息后可以向MQ發(fā)送ack回執(zhí),MQ收到ack回執(zhí)后才會刪除該消息。
SpringAMQP則允許配置三種確認(rèn)模式:
- manual:手動ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。
- auto:自動ack,由spring監(jiān)測listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack
- none:關(guān)閉ack,MQ假定消費者獲取消息后會成功處理,因此消息投遞后立即被刪除
配置方式是修改application.yml文件,添加下面配置:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 關(guān)閉ack
4) 消費者失敗重試
4.a) 本地重試
當(dāng)消費者出現(xiàn)異常后,消息會不斷requeue(重新入隊)到隊列,再重新發(fā)送給消費者,然后再次異常,再次requeue,無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力:
可以利用Spring的retry機(jī)制,在消費者出現(xiàn)異常時利用本地重試,而不是無限制的requeue到mq隊列。
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 開啟消費者失敗重試
initial-interval: 1000 # 初識的失敗等待時長為1秒
multiplier: 1 # 失敗的等待時長倍數(shù),下次等待時長 = multiplier * last-interval
max-attempts: 3 # 最大重試次數(shù)
stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
結(jié)論:
- 開啟本地重試時,消息處理過程中拋出異常,不會requeue到隊列,而是在消費者本地重試
- 重試達(dá)到最大次數(shù)后,Spring會返回ack,消息會被丟棄
4.b) 失敗策略
在開啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有MessageRecoverer接口來處理,它包含三種不同的實現(xiàn):
- RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式
- ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊
- RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)
1.首先,定義接收失敗消息的交換機(jī)、隊列及其綁定關(guān)系:
2.定義RepublishMessageRecoverer的Bean:
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue");
}
@Bean
public Binding errorMessageBinding(){
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
如何確保RabbitMQ消息的可靠性?
- 1.開啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊列
- 2.開啟持久化功能,確保消息未消費前在隊列中不會丟失
- 3.開啟消費者確認(rèn)機(jī)制為auto,由spring確認(rèn)消息處理成功后完成ack
- 4.開啟消費者失敗重試機(jī)制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理
b.死信交換機(jī)
1) 初識死信交換機(jī)
當(dāng)一個隊列中的消息滿足下列情況之一時,可以成為死信(dead letter):
- 消費者使用basic.reject或 basic.nack聲明消費失敗,并且消息的requeue參數(shù)設(shè)置為false
- 消息是一個過期消息,超時無人消費
- 要投遞的隊列消息滿了,無法投遞
如果該隊列配置了dead-letter-exchange屬性,指定了一個交換機(jī),那么隊列中的死信就會投遞到這個交換機(jī)中,而這個交換機(jī)稱為死信交換機(jī)(Dead Letter Exchange,簡稱DLX)。
如圖,一個消息被消費者拒絕了,變成了死信:
因為simple.queue綁定了死信交換機(jī) dl.direct,因此死信會投遞給這個交換機(jī):
如果這個死信交換機(jī)也綁定了一個隊列,則消息最終會進(jìn)入這個存放死信的隊列:
另外,隊列將死信投遞給死信交換機(jī)時,必須知道兩個信息:
- 死信交換機(jī)名稱
- 死信交換機(jī)與死信隊列綁定的RoutingKey
這樣才能確保投遞的消息能到達(dá)死信交換機(jī),并且正確的路由到死信隊列。
2) TTL
TTL,也就是Time-To-Live。如果一個隊列中的消息TTL結(jié)束仍未消費,則會變?yōu)樗佬牛瑃tl超時分為兩種情況:
- 消息所在的隊列設(shè)置了存活時間
- 消息本身設(shè)置了存活時間
1.在消費者Listener中,聲明一組死信交換機(jī)和隊列,基于注解方式:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.queue", durable = "true"),
exchange = @Exchange(name = "dl.direct"),
key = "dl"
))
public void listenDlQueue(String msg){
log.info("消費者接收到了dl.queue的延遲消息");
}
2.在config中,要給隊列設(shè)置超時時間,需要在聲明隊列時配置x-message-ttl屬性:
package cn.itcast.mq.config;
@Configuration
public class TTLMessageConfig {
@Bean
public DirectExchange ttlDirectExchange() {
return new DirectExchange("ttl.direct");
}
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue")
.ttl(10000) // 設(shè)置隊列的超時時間,10秒
.deadLetterExchange("dl.direct") // 指定死信交換機(jī)
.deadLetterRoutingKey("dl") // 指定死信RoutingKey
.build();
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
}
}
3.發(fā)送消息時,給消息本身設(shè)置超時時間
@Test
public void testTTLMessage(){
// 1.準(zhǔn)備消息
Message message = MessageBuilder
.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setExpiration("5000") // 設(shè)置超時時間5秒
.build();
// 2.發(fā)送消息
rabbitTemplate.convertAndSend("ttl.direct","ttl", message);
// 3.記錄日志
log.info("消息已經(jīng)成功發(fā)送");
}
消息超時的兩種方式是?
- 1.給隊列設(shè)置ttl屬性,進(jìn)入隊列后超過ttl時間的消息變?yōu)樗佬?/strong>
- 2.給消息設(shè)置ttl屬性,隊列接收到消息超過ttl時間后變?yōu)樗佬?/strong>
- 兩者共存時,以時間短的ttl為準(zhǔn)
3) 延遲隊列
利用TTL結(jié)合死信交換機(jī),我們實現(xiàn)了消息發(fā)出后,消費者延遲收到消息的效果。這種消息模式就稱為**延遲隊列(Delay Queue)**模式。
延遲隊列的使用場景包括:
- 延遲發(fā)送短信
- 用戶下單,如果用戶在15 分鐘內(nèi)未支付,則自動取消
- 預(yù)約工作會議,20分鐘后自動通知所有參會人員
a) 安裝延遲隊列插件
因為延遲隊列的需求非常多,所以RabbitMQ的官方也推出了一個插件,原生支持延遲隊列效果。
拉取MQ
docker pull rabbitmq:3.8-management
安裝MQ
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=root \
-v mq-plugins:/plugins \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
之前設(shè)定的RabbitMQ的數(shù)據(jù)卷名稱為mq-plugins
,使用下面命令查看數(shù)據(jù)卷:
docker volume inspect mq-plugins
可以得到下面結(jié)果:
可以去對應(yīng)的GitHub頁面下載3.8.9版本的插件,地址為https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9這個對應(yīng)RabbitMQ的3.8.5以上版本。
接下來,將rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
插件上傳到這個目錄即可:
最后就是安裝了,需要進(jìn)入MQ容器內(nèi)部來執(zhí)行安裝。我的容器名為mq
,所以執(zhí)行下面命令:
docker exec -it mq bash
執(zhí)行時,請將其中的 -it
后面的mq
替換為你自己的容器名.
進(jìn)入容器內(nèi)部后,執(zhí)行下面命令開啟插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
結(jié)果如下:
b) SpringAMQP使用延遲隊列插件
DelayExchange的本質(zhì)還是官方的三種交換機(jī),只是添加了延遲功能。因此使用時只需要聲明一個交換機(jī),交換機(jī)的類型可以是任意類型,然后設(shè)定delayed屬性為true即可。
基于注解方式:
基于java代碼的方式:
向這個delay為true的交換機(jī)中發(fā)送消息,一定要給消息添加一個header:x-delay,值為延遲的時間,單位為毫秒:
c.惰性隊列
1) 消息堆積問題
當(dāng)生產(chǎn)者發(fā)送消息的速度超過了消費者處理消息的速度,就會導(dǎo)致隊列中的消息堆積,直到隊列存儲消息達(dá)到上限。最早接收到的消息,可能就會成為死信,會被丟棄,這就是消息堆積問題。
解決消息堆積有三種種思路:
- 增加更多消費者,提高消費速度
- 在消費者內(nèi)開啟線程池加快消息處理速度
- 擴(kuò)大隊列容積,提高堆積上限
2) 惰性隊列
從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的概念,也就是惰性隊列。
惰性隊列的特征如下:
- 接收到消息后直接存入磁盤而非內(nèi)存
- 消費者要消費消息時才會從磁盤中讀取并加載到內(nèi)存
- 支持?jǐn)?shù)百萬條的消息存儲
要設(shè)置一個隊列為惰性隊列,只需要在聲明隊列時,指定x-queue-mode屬性為lazy即可??梢酝ㄟ^命令行將一個運行中的隊列修改為惰性隊列:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解讀:
-
rabbitmqctl
:RabbitMQ的命令行工具 -
set_policy
:添加一個策略 -
Lazy
:策略名稱,可以自定義 -
"^lazy-queue$"
:用正則表達(dá)式匹配隊列的名字 -
'{"queue-mode":"lazy"}'
:設(shè)置隊列模式為lazy模式 -
--apply-to queues
:策略的作用對象,是所有的隊列
用SpringAMQP聲明惰性隊列分兩種方式:
1.@Bean的方式
2.注解方式:
消息堆積問題的解決方案?
- 1.隊列上綁定多個消費者,提高消費速度
- 2.給消費者開啟線程池,提高消費速度
- 3.使用惰性隊列,可以再mq中保存更多消息
惰性隊列的優(yōu)點有哪些?文章來源:http://www.zghlxwxcb.cn/news/detail-671384.html
- 1.基于磁盤存儲,消息上限高
- 2.沒有間歇性的page-out,性能比較穩(wěn)定
惰性隊列的缺點有哪些?文章來源地址http://www.zghlxwxcb.cn/news/detail-671384.html
- 1.基于磁盤存儲,消息時效性會降低
- 2.性能受限于磁盤的IO
到了這里,關(guān)于微服務(wù)中間件--MQ服務(wù)異步通信的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!