消息隊(duì)列是現(xiàn)代分布式應(yīng)用中的關(guān)鍵組件,用于實(shí)現(xiàn)異步通信、解耦系統(tǒng)組件以及處理高并發(fā)請求。消息隊(duì)列可以用于各種應(yīng)用場景,包括任務(wù)調(diào)度、事件通知、日志處理等。在消息隊(duì)列的應(yīng)用中,有時需要實(shí)現(xiàn)消息的延遲處理、處理未能成功消費(fèi)的消息等功能。
本文將介紹一些與消息隊(duì)列相關(guān)的關(guān)鍵概念和技術(shù),包括死信交換機(jī)(Dead Letter Exchange)、消息的 TTL(Time To Live,生存時間)、以及使用 DelayExchange 插件實(shí)現(xiàn)消息的延遲處理。通過深入理解這些概念和技術(shù),將能幫助我們更好地設(shè)計(jì)和構(gòu)建具有高可用性和可靠性的消息隊(duì)列系統(tǒng)。
首先,我將介紹死信交換機(jī)以及它的作用,然后討論如何創(chuàng)建死信交換機(jī)和死信隊(duì)列。隨后,將深入研究消息的TTL,了解它的作用和如何配置。最后,將探討如何使用 DelayExchange 插件來實(shí)現(xiàn)消息的延遲處理,以滿足各種應(yīng)用需求。
一、死信交換機(jī)
1.1 什么是死信和死信交換機(jī)
在了解什么是死信交換機(jī)之前,讓我們首先來了解一下什么是死信。 在消息隊(duì)列系統(tǒng)中,死信(Dead Letter)是指未能被成功消費(fèi)的消息。這些消息通常由于多種原因而變?yōu)樗佬?,一些主要的原因如下?/p>
-
消費(fèi)失?。?/strong> 當(dāng)消息被消費(fèi)者(consumer)拒絕(
reject
)或未能被確認(rèn)(acknowledge
),并且針對與處理失敗的消息沒有設(shè)置重新入隊(duì)(requeue
)參數(shù)時,它們可能成為死信。這可能是因?yàn)橄⒏袷藉e誤、業(yè)務(wù)處理失敗、或者其他原因?qū)е孪M(fèi)者無法處理消息。 -
消息超時: 消息在隊(duì)列中等待消費(fèi),但在一定時間內(nèi)未被消費(fèi)者處理。這個時間限制通常由消息的 TTL(Time To Live,生存時間)來定義。當(dāng)消息超過其 TTL 后,它就變?yōu)樗佬拧?/p>
-
隊(duì)列堆積滿: 當(dāng)消息隊(duì)列積累了大量消息,無法容納更多消息時,最早的消息可能成為死信,因?yàn)樗鼈儫o法被及時處理。
因此為了處理這些死信消息,消息隊(duì)列系統(tǒng)引入了 死信交換機(jī)(Dead Letter Exchange)。死信交換機(jī)是一個特殊的交換機(jī),它接收死信消息,并根據(jù)規(guī)則將這些消息路由到死信隊(duì)列。通過使用死信交換機(jī),系統(tǒng)可以將死信消息從正常隊(duì)列中分離出來,以便進(jìn)一步處理或分析。
死信交換機(jī)通常與隊(duì)列綁定,當(dāng)隊(duì)列中的消息變?yōu)樗佬艜r,它們會被發(fā)送到與之相關(guān)聯(lián)的死信交換機(jī),然后再路由到死信隊(duì)列。這種機(jī)制使得系統(tǒng)能夠更好地處理消息的異常情況,確保消息不會被永久丟失。
給隊(duì)列綁定死信交換機(jī)的方法:
- 給隊(duì)列設(shè)置
dead-letter-exchange
屬性,指定一個交換機(jī); - 給隊(duì)列設(shè)置
dead-letter-routing-key
屬性,設(shè)置死信交換機(jī)與死信隊(duì)列的RoutingKey
。
如下圖所示:
在上圖中,simple.queue
就與死信交換機(jī) dl.direct
綁定,最后路由到死信隊(duì)列dl.queue
,后續(xù)就可以編寫其他邏輯來處理死信隊(duì)列中的消息。
死信和死信交換機(jī)是構(gòu)建可靠消息處理系統(tǒng)的重要組成部分,它們能夠幫助我們跟蹤和處理未能成功消費(fèi)的消息,確保數(shù)據(jù)不會遺失,同時提供更好的可用性和可維護(hù)性。
1.2 死信交換機(jī)和死信隊(duì)列的創(chuàng)建方式
- 使用
@Bean
的方式創(chuàng)建:
// 聲明普通的 simple.queue 隊(duì)列,并且為其指定死信交換機(jī):dl.direct
@Bean
public Queue simpleQueue(){
return QueueBuilder.durable("simple.queue") // 指定隊(duì)列名稱,并持久化
.deadLetterExchange("dl.direct") // 指定死信交換機(jī)
.build();
}
// 聲明死信交換機(jī) dl.direct
@Bean
public DirectExchange dlExchange(){
return new DirectExchange("dl.direct", true, false);
}
// 聲明存儲死信的隊(duì)列 dl.queue
@Bean
public Queue dlQueue(){
return new Queue("dl.queue", true);
}
// 將死信隊(duì)列 與 死信交換機(jī)綁定
@Bean
public Binding dlBinding(){
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}
- 使用
@RabbitListener
注解的方式創(chuàng)建:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.queue", durable = "true"),
exchange = @Exchange(name = "dl.direct"),
key = "dl"
))
public void listenDLQueue(String msg) {
log.info("消費(fèi)者接收到 dl.queue 的延遲消息:" + msg);
}
在這種情況下,注意需要在創(chuàng)建 simple.queue
是時候,綁定死信交換機(jī)。
二、消息的 TTL
2.1 什么是消息的 TTL
消息的TTL,全稱為"Time To Live",是消息隊(duì)列系統(tǒng)中的一個重要概念。它定義了消息在隊(duì)列中存活的時間,也就是消息在被發(fā)送到隊(duì)列后,允許存留在隊(duì)列中的時間長度。一旦消息的TTL超過設(shè)定的時間,消息將被認(rèn)為已過期,消息隊(duì)列系統(tǒng)將會將其標(biāo)記為死信(Dead Letter)并將其路由到相關(guān)的死信隊(duì)列。
在消息隊(duì)列中,消息的超時分為兩種情況:
-
消息所在的隊(duì)列設(shè)置了儲存消息的超時時間;
-
消息本身設(shè)置了超時時間;
但是不管哪種情況,一定消息超時了,都會成為死信,如下圖所示:
對上圖的簡單解釋:
- 上圖中,設(shè)置了
ttl.queue
的超時時間為 10000 毫秒,意味著一個消息在該隊(duì)列中儲存的時間不會超過這么長的時間; - 另外,也可以在發(fā)送消息的時候給這個消息設(shè)置在隊(duì)列中的超時時間,例如 5000 毫秒。
- 無論是哪種情況,一旦消息超時了,都會發(fā)送到死信交換機(jī),然后再路由死信隊(duì)列,最后由處理死信的邏輯處理這些消息。
2.2 基于死信交換機(jī)和 TTL 實(shí)現(xiàn)消息的延遲
根據(jù)上面的死信交換機(jī)和 TTL 的特點(diǎn),我們可以實(shí)現(xiàn)延遲處理消息的功能,TTL 和 死信的交換機(jī)及其隊(duì)列的結(jié)構(gòu)圖示如下:
下面就使用 Spring AMQP 來聲明和實(shí)現(xiàn)這些交換機(jī)和隊(duì)列:
-
首先通過
@RabbitListener
注解聲明一組死信交換機(jī)和死信隊(duì)列,并指定處理死信的邏輯:@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "dl.queue", durable = "true"), exchange = @Exchange(name = "dl.direct"), key = "dl" )) public void listenDLQueue(String msg) { log.info("消費(fèi)者接收到 dl.queue 的延遲消息:" + msg); }
-
然后通過
@Bean
的方式聲明一組 TTL 的交換機(jī)和隊(duì)列/** * 聲明 TTL 交換機(jī) */ @Bean public DirectExchange ttlDirectExchange() { return new DirectExchange("ttl.direct", true, false); } /** * 聲明 TTL 隊(duì)列 * 1. 指定消息的 TTL * 2. 指定死信交換機(jī) * 3. 指定死信交換機(jī)的 RoutingKey */ @Bean public Queue ttlQueue() { return QueueBuilder .durable("ttl.queue") // 指定隊(duì)列的名稱 .ttl(10_000) // 指定 TTL 為 10 秒 .deadLetterExchange("dl.direct") // 指定死信交換機(jī) .deadLetterRoutingKey("dl") // 指定死信交換機(jī)的 RoutingKey .build(); } /** * 綁定 TTL 交換機(jī)和隊(duì)列 */ @Bean public Binding ttlBinding() { return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl"); }
-
最后,在
publisher
中編寫發(fā)送消息的邏輯@Test public void testTTLMessage() { // 1. 創(chuàng)建消息 Message message = MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); // 2. 創(chuàng)建消息ID CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 3. 發(fā)送消息 rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData); log.info("發(fā)送延遲消息成功!消息ID: {}", correlationData.getId()); }
- 驗(yàn)證延遲消息
在創(chuàng)建ttl.queue
的時候,指定了消息在隊(duì)列中的 TTL 不超過 10 秒,因此預(yù)測當(dāng)發(fā)送消息 10s 后,才會被消費(fèi)者接收:
首先啟動 consumer
,并清除控制臺日志,然后再發(fā)送消息:
通過對比控制臺日志的時間,可以發(fā)現(xiàn)成功將消息延遲了 10 秒。
另外,也可以在發(fā)送消息時設(shè)置超時時間,可以通過 MessageBuilder
中的 setExpiration
設(shè)置消息的超時時間,這里設(shè)置為 5 秒:再次發(fā)送消息,并對比觀察控制臺日志的輸出時間:
可以發(fā)現(xiàn),此時消息延遲了 5 秒,通過上面的對比演示可以得出結(jié)論:那就是在同時指定了消息的過期時間以及隊(duì)列的超時時間,將會以短的那個時間為準(zhǔn)。
三、基于 DelayExchang 插件實(shí)現(xiàn)延遲隊(duì)列
3.1 安裝 DelayExchang 插件
- 下載插件
RabbitMQ 有一個官方的插件社區(qū),地址為:https://www.rabbitmq.com/community-plugins.html。其中包含各種各樣的插件,包括我們要使用的 DelayExchange 插件:
這里我選擇的是 3.8.9 的版本:
- 上傳插件
這里我的 RabbitMQ 是基于 Docker 安裝的,因此需要先查看 RabbitMQ 的插件目錄對應(yīng)的數(shù)據(jù)卷:
然后,直接進(jìn)入數(shù)據(jù)卷掛載點(diǎn)目錄:可以發(fā)現(xiàn)這個目錄下其實(shí)以及有很多的插件的了,然后上傳剛才下載的插件到這個目錄:
- 安裝插件
最后就是安裝了,安裝時需要進(jìn)入 MQ 容器內(nèi)部來執(zhí)行安裝。我的容器名為mq
,所以執(zhí)行下面命令:
docker exec -it mq bash
然后執(zhí)行安裝的命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
最后出現(xiàn)下面的日志,就說明安裝 DelayExchang 插件成功了:
3.2 DelayExchang 實(shí)現(xiàn)消息延遲的原理
DelayExchange 是一個用于實(shí)現(xiàn)消息延遲發(fā)送的插件,可以在消息隊(duì)列系統(tǒng)中非常有用。其工作原理如下:
-
創(chuàng)建 DelayExchange:首先,需要創(chuàng)建一個 DelayExchange,這是一個特殊的交換機(jī),用于處理延遲消息。通常,可以使用消息隊(duì)列系統(tǒng)的管理工具或API(如 Spring AMQP 的API)來聲明和配置 DelayExchange。
-
發(fā)送消息到 DelayExchange:當(dāng)需要發(fā)送一個延遲消息時,將消息發(fā)送到 DelayExchange,而不是直接發(fā)送到目標(biāo)隊(duì)列。在發(fā)送消息時,需要為消息設(shè)置一個屬性,通常稱為
x-delay
,它表示消息的延遲時間。這個屬性的值通常以毫秒為單位,定義了消息應(yīng)該延遲多長時間才會被投遞。 -
DelayExchange 檢查 x-delay 屬性:當(dāng)消息到達(dá)DelayExchange時,它會檢查消息的
x-delay
屬性。如果該屬性存在,說明這是一個延遲消息。DelayExchange會將消息持久化到硬盤,并記錄x-delay
的值作為延遲時間。 -
返回 Routing Not Found:DelayExchange 會向消息的發(fā)送者返回 “Routing Not Found” 的響應(yīng),意味著消息當(dāng)前沒有目標(biāo)隊(duì)列可以接收。這是因?yàn)橄⒉粫⒓幢煌哆f,而是需要等待一定的延遲時間。因此如果設(shè)置了生產(chǎn)者消息確認(rèn)的
publisher-return
的ReturnCallback
,就需要進(jìn)行額外的處理以避免錯誤的提示。 -
延遲時間到期:經(jīng)過預(yù)定的延遲時間后,DelayExchange 會重新檢查已存儲的消息,查看是否有消息已經(jīng)到達(dá)或超過了其設(shè)定的延遲時間。
-
重新投遞消息:一旦消息的延遲時間到期,DelayExchange將重新投遞消息到指定的目標(biāo)隊(duì)列,允許消費(fèi)者最終接收和處理消息。
通過 DelayExchange 的這一機(jī)制,可以實(shí)現(xiàn)消息的延遲發(fā)送,非常適合需要進(jìn)行任務(wù)調(diào)度、處理延遲任務(wù)或者在時間敏感任務(wù)的應(yīng)用中使用。它有助于減輕系統(tǒng)負(fù)載,提高消息傳遞的可靠性,以及更好地滿足特定的應(yīng)用需求。
3.3 使用 DelayExchang 實(shí)現(xiàn)消息的延遲
-
首先,使用
@RabbitListener
注解聲明一組延遲交換機(jī)和隊(duì)列,以及延遲消息的處理邏輯。@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "delay.queue", durable = "true"), exchange = @Exchange(name = "delay.direct", delayed = "true"), key = "delay" )) public void listenDelayExchange(String msg) { log.info("消費(fèi)者接收到了 delay.queue 的消息:" + msg); }
這里使用 @RabbitListener
注解聲明交換機(jī)和隊(duì)列和前面的操作基本一致,唯一的區(qū)別在于聲明交換機(jī)的時候,額外設(shè)置了一個 delayed
參數(shù),表明聲明的是一個延遲交換機(jī)。
-
在
publisher
中發(fā)送延遲消息@Test public void testDelayMessage() { // 1. 創(chuàng)建消息 Message message = MessageBuilder.withBody("hello, delay message".getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .setHeader("x-delay", 5000) // 添加 x-delay 頭信息 .build(); // 2. 創(chuàng)建消息ID CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 3. 發(fā)送消息 rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData); log.info("通過延遲交換機(jī)發(fā)送延遲消息成功!消息ID: {}", correlationData.getId()); }
同樣,此處發(fā)送消息的邏輯也和前面基本一致,只是在 MessageBuilder
中使用 setHeader
額外設(shè)置了一個x-delay
的頭信息,表明了該消息是延遲消息,同時也指定了消息的超時時間。
- 驗(yàn)證延遲消息
同樣的,首先啟動 consumer
,清除控制臺日志,然后向延遲交換機(jī)發(fā)送消息:
通過日志可以看到,成功發(fā)送了延遲消息,但是卻出現(xiàn)了錯誤的日志信息,告訴我們是delay.direct
交換機(jī)沒有成功將消息路由到 delay.queue
中,但是通過 consumer
的控制臺在延遲 5 秒后發(fā)現(xiàn)成功接收并處理了這個消息:
出現(xiàn)上面錯誤日志的原則在上文的 DelayExchang 實(shí)現(xiàn)消息延遲的原理中的第 4 點(diǎn)已經(jīng)提到了,使用 DelayExchang 實(shí)現(xiàn)消息的延遲,是會在達(dá)到了設(shè)置延遲時間,再將消息發(fā)送給隊(duì)列的。但是,由于交換機(jī)在收到消息的時候,沒有立即路由給隊(duì)列,在返回確認(rèn)消息給生產(chǎn)者的就是“Routing Not Found”,因此就會使得生產(chǎn)者誤以為路由失敗了。
另外,在上面的錯誤日志中,可以發(fā)現(xiàn)有一個 receivedDelay
參數(shù)的值是 5000,也就是延遲的時間,我們可以根據(jù)這個參數(shù),在 RetuenCallback
中排除發(fā)送延遲消息時產(chǎn)生的的錯誤提示:
然后,再次發(fā)送延遲消息到延遲交換機(jī),就不會出現(xiàn)上面的錯誤提示了:
文章來源:http://www.zghlxwxcb.cn/news/detail-744869.html
至此,我們便成功使用 DelayExchang 實(shí)現(xiàn)了發(fā)送延遲消息的功能??梢园l(fā)現(xiàn),使用 DelayExchang 插件實(shí)現(xiàn)延遲消息比前面使用死信交換機(jī)和 TTL 來實(shí)現(xiàn)延遲消息更加的簡單。文章來源地址http://www.zghlxwxcb.cn/news/detail-744869.html
到了這里,關(guān)于【RabbitMQ】 RabbitMQ 消息的延遲 —— 深入探索 RabbitMQ 的死信交換機(jī),消息的 TTL 以及延遲隊(duì)列的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!