1、什么是死信交換機
首先我們要知道什么是死信?
當一個隊列中的消息滿足下列情況之一時,可以成為死信(dead letter):
- 消費者使用basic.reject或 basic.nack聲明消費失敗,并且消息的requeue參數設置為false。
- 消息是一個過期消息,超時無人消費。
- 要投遞的隊列消息堆積滿了,最早的消息可能成為死信。
一般呢?一旦消息變成死信是會被我們丟棄的,但是有了死信交換機就不一樣了。
如果這個包含死信的隊列配置了dead-letter-exchange屬性,指定了一個交換機,那么隊列中的死信就會投遞到這個交換機中,而這個交換機稱為死信交換機(Dead Letter Exchange,簡稱DLX)。
其實呢,所謂的死信交換機就是一個普通交換機,只不過是某個隊列用dead-letter-exchange這個屬性綁定到一起了,當這個隊列出現了死信,就會丟到我們這個死信交換機里了,就有點像垃圾桶一樣的了。
如圖,一個消息被消費者拒絕了,變成了死信:
因為simple.queue綁定了死信交換機 dl.direct,因此死信會投遞給這個交換機:
如果這個死信交換機也綁定了一個隊列,則消息最終會進入這個存放死信的隊列:
另外,隊列將死信投遞給死信交換機時,必須知道兩個信息:
- 死信交換機名稱
- 死信交換機與死信隊列綁定的RoutingKey
這樣才能確保投遞的消息能到達死信交換機,并且正確的路由到死信隊列。
小結:
什么樣的消息會成為死信?
- 消息被消費者reject或者返回nack。
- 消息超時未消費。
- 隊列滿了。
2、TTL
TTL,也就是Time-To-Live。如果一個隊列中的消息TTL結束仍未消費,則會變?yōu)樗佬?,TTL超時分為兩種情況:
- 消息所在的隊列設置了超時時間
- 消息本身設置了超時時間
2.1 Demo
1、準備接收超時死信的死信交換機
在consumer服務的SpringRabbitListener中,定義一個新的消費者,并且聲明 死信交換機、死信隊列:
/**
? * @description:注解方式聲明死信交換機、死信隊列
? * @author: jie
? * @time: 2022/3/5 10:30
?*/
@RabbitListener(bindings = @QueueBinding(
? ? ? ? ? ?//隊列,持久化為true
? ? ? ? ? ?value = @Queue(name = "dl.ttl.queue", durable = "true"),
? ? ? ? ? ?//交換機
? ? ? ? ? ?exchange = @Exchange(name = "dl.ttl.direct"),
? ? ? ? ? ?//Routing Key
? ? ? ? ? ?key = "ttl"
? ))
public void listenDlQueue(String msg){
? ? ? ?log.info("接收到 dl.ttl.queue的延遲消息:{}", msg);
}
復制代碼
2、聲明一個隊列,并且指定TTL
package com.jie.mq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TTLMessageConfig {
? ?/**
? ? * @description:交換機
? ? * @author: jie
? ? * @time: 2022/3/5 10:36
? ? */
? ?@Bean
? ?public DirectExchange ttlDirectExchange(){
? ? ? ?return new DirectExchange("ttl.direct");
? }
? ?/**
? ? * @description:隊列
? ? * @author: jie
? ? * @time: 2022/3/5 10:38
? ? */
? ?@Bean
? ?public Queue ttlQueue(){
? ? ? ?return QueueBuilder
? ? ? ? ? ? ? ?//指定隊列名稱,并持久化
? ? ? ? ? ? ? .durable("ttl.queue")
? ? ? ? ? ? ? ?//設置隊列的超時時間,10秒
? ? ? ? ? ? ? .ttl(10000)
? ? ? ? ? ? ? ?//指定死信交換機
? ? ? ? ? ? ? .deadLetterExchange("dl.ttl.direct")
? ? ? ? ? ? ? ?//設置RoutingKey
? ? ? ? ? ? ? .deadLetterRoutingKey("dl")
? ? ? ? ? ? ? .build();
? }
? ?/**
? ? * @description:將隊列和交換機綁定
? ? * @author: jie
? ? * @time: 2022/3/5 10:41
? ? */
? ?@Bean
? ?public Binding ttlBinding(){
? ? ? ?return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
? }
}
復制代碼
3、發(fā)送消息
3.1 不指定TTL
@Test
public void testTTLQueue() {
? ? ? ?// 創(chuàng)建消息
? ? ? ?String message = "hello, ttl queue";
? ? ? ?// 消息ID,需要封裝到CorrelationData中
? ? ? ?CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
? ? ? ?// 發(fā)送消息
? ? ? ?rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
? ? ? ?// 記錄日志
? ? ? ?log.error("發(fā)送消息成功");
? }
復制代碼
消息發(fā)送時間:
接受消息的時間:
因為隊列的TTL值是10000ms,也就是10秒。可以看到消息發(fā)送與接收之間的時差剛好是10秒。
我們這個是基于隊列去設置延遲時間,我們給隊列設置了10秒鐘,我們也可以給消息設置延遲。
3.2 指定TTL
public void testTTLMessage() {
? ? ? ?//準備消息
? ? ? ?Message message = MessageBuilder
? ? ? ? ? ? ? .withBody("hell,TTL".getBytes(StandardCharsets.UTF_8))
? ? ? ? ? ? ? .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
? ? ? ? ? ? ? ?//設置延遲時間
? ? ? ? ? ? ? .setExpiration("5000")
? ? ? ? ? ? ? .build();
? ? ? ?// 2.發(fā)送消息
? ? ? ?rabbitTemplate.convertAndSend("ttl.direct","ttl",message);
? ? ? ?//3、記錄日志
? ? ? ?log.info("消息已經成功發(fā)送!");
? }
復制代碼
這里呢?我們的隊列是10秒,而消息是5秒,到底是哪個優(yōu)先呢?還是15秒呢?
消息發(fā)送時間:
消息接受時間:
這里我們可以看出,當兩個都有延遲的時候,它會以較短的時間為準。
4、小結
消息超時的兩種方式是?
- 給隊列設置ttl屬性,進入隊列后超過ttl時間的消息變?yōu)樗佬?/li>
- 給消息設置ttl屬性,隊列接收到消息超過ttl時間后變?yōu)樗佬?/li>
如何實現發(fā)送一個消息20秒后消費者才收到消息?
- 給消息的目標隊列指定死信交換機
- 將消費者監(jiān)聽的隊列綁定到死信交換機
- 發(fā)送消息時給消息設置超時時間為20秒
3、延遲隊列
概念:
利用TTL結合死信交換機,我們實現了消息發(fā)出后,消費者延遲收到消息的效果。這種消息模式就稱為延遲隊列(Delay Queue)模式。
延遲隊列的使用場景包括:1、延遲發(fā)送短信。
2、用戶下單,如果用戶在15 分鐘內未支付,則自動取消。
3、預約工作會議,20分鐘后自動通知所有參會人員。
因為延遲隊列的需求非常多,所以RabbitMQ的官方也推出了一個插件,原生支持延遲隊列效果。
這個插件就是DelayExchange插件。參考RabbitMQ的插件列表頁面:Community Plugins — RabbitMQ
使用方式可以參考官網地址:Scheduling Messages with RabbitMQ | RabbitMQ - Blog
3.1 安裝DelayExchange插件
官方的安裝指南地址為:Scheduling Messages with RabbitMQ | RabbitMQ - Blog
上述文檔是基于linux原生安裝RabbitMQ,然后安裝插件。
因為我是基于Docker安裝RabbitMQ,所以下面我會講解基于Docker來安裝RabbitMQ插件。
RabbitMQ有一個官方的插件社區(qū),地址為:Community Plugins — RabbitMQ
其中包含各種各樣的插件,包括我們要使用的DelayExchange插件:
下載好后,就會獲得一個ez文件。
1、上傳插件
因為我們是基于Docker安裝,所以需要先查看RabbitMQ的插件目錄對應的數據卷。
我之前設定的RabbitMQ的數據卷名稱為mq-plugins
,所以我使用下面命令查看數據卷:
docker volume inspect mq-plugins
復制代碼
可以得到下面結果:
接下來,將插件上傳到這個目錄即可:
2、安裝插件
最后就是安裝了,需要進入MQ容器內部來執(zhí)行安裝。我的容器名為mq
,所以執(zhí)行下面命令:
docker exec -it mq bash
復制代碼
執(zhí)行時,請將其中的 -it
后面的mq
替換為你自己的容器名.
進入容器內部后,執(zhí)行下面命令開啟插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
復制代碼
3.2 DelayExchange原理
DelayExchange的本質還是官方的三種交換機,只是添加了延遲功能。因此使用時只需要聲明一個交換機,交換機的類型可以是任意類型,然后設定delayed屬性為true即可。
- 接收消息。
- 判斷消息是否具備x-delay屬性。
- 如果有x-delay屬性,說明是延遲消息,持久化到硬盤,讀取x-delay值,作為延遲時間。
- 返回routing not found結果給消息發(fā)送者。
- x-delay時間到期后,重新投遞消息到指定隊列。
3.3 使用DelayExchange
1、基于注解的方式(推薦)
2、基于Bean的方式
3.4 發(fā)送消息
發(fā)送消息時,一定要攜帶x-delay屬性,指定延遲的時間:
發(fā)送消息時間:
接受消息時間:
相差五秒,說明是有用的。
3.5 小結
延遲隊列插件的使用步驟包括哪些?
?聲明一個交換機,添加delayed屬性為true文章來源:http://www.zghlxwxcb.cn/news/detail-634846.html
?發(fā)送消息時,添加x-delay頭,值為超時時間文章來源地址http://www.zghlxwxcb.cn/news/detail-634846.html
到了這里,關于如何解決RabbitMQ中的延遲消息問題的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!