国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

如何解決RabbitMQ中的延遲消息問題

這篇具有很好參考價值的文章主要介紹了如何解決RabbitMQ中的延遲消息問題。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

1、什么是死信交換機

首先我們要知道什么是死信?

當一個隊列中的消息滿足下列情況之一時,可以成為死信(dead letter):

  1. 消費者使用basic.reject或 basic.nack聲明消費失敗,并且消息的requeue參數設置為false。
  2. 消息是一個過期消息,超時無人消費。
  3. 要投遞的隊列消息堆積滿了,最早的消息可能成為死信。

一般呢?一旦消息變成死信是會被我們丟棄的,但是有了死信交換機就不一樣了。

如果這個包含死信的隊列配置了dead-letter-exchange屬性,指定了一個交換機,那么隊列中的死信就會投遞到這個交換機中,而這個交換機稱為死信交換機(Dead Letter Exchange,簡稱DLX)。

其實呢,所謂的死信交換機就是一個普通交換機,只不過是某個隊列用dead-letter-exchange這個屬性綁定到一起了,當這個隊列出現了死信,就會丟到我們這個死信交換機里了,就有點像垃圾桶一樣的了。

如圖,一個消息被消費者拒絕了,變成了死信:

rabbitmq 消息超時,java,分布式,rabbitmq

因為simple.queue綁定了死信交換機 dl.direct,因此死信會投遞給這個交換機:

rabbitmq 消息超時,java,分布式,rabbitmq

如果這個死信交換機也綁定了一個隊列,則消息最終會進入這個存放死信的隊列:

rabbitmq 消息超時,java,分布式,rabbitmq

另外,隊列將死信投遞給死信交換機時,必須知道兩個信息:

  • 死信交換機名稱
  • 死信交換機與死信隊列綁定的RoutingKey

這樣才能確保投遞的消息能到達死信交換機,并且正確的路由到死信隊列。

小結:

什么樣的消息會成為死信?

  1. 消息被消費者reject或者返回nack。
  2. 消息超時未消費。
  3. 隊列滿了。

2、TTL

TTL,也就是Time-To-Live。如果一個隊列中的消息TTL結束仍未消費,則會變?yōu)樗佬?,TTL超時分為兩種情況:

  • 消息所在的隊列設置了超時時間
  • 消息本身設置了超時時間

rabbitmq 消息超時,java,分布式,rabbitmq

2.1 Demo

1、準備接收超時死信的死信交換機

在consumer服務的SpringRabbitListener中,定義一個新的消費者,并且聲明 死信交換機、死信隊列:

 /**
 ? * @description:注解方式聲明死信交換機、死信隊列
 ? * @author: jie 
 ? * @time: 2022/3/5 10:30
 ?*/
@RabbitListener(bindings = @QueueBinding(
 ? ? ? ? ? ?//隊列,持久化為true
 ? ? ? ? ? ?value = @Queue(name = "dl.ttl.queue", durable = "true"),
 ? ? ? ? ? ?//交換機
 ? ? ? ? ? ?exchange = @Exchange(name = "dl.ttl.direct"),
 ? ? ? ? ? ?//Routing Key
 ? ? ? ? ? ?key = "ttl"
 ?  ))
public void listenDlQueue(String msg){
 ? ? ? ?log.info("接收到 dl.ttl.queue的延遲消息:{}", msg);
}
復制代碼

2、聲明一個隊列,并且指定TTL

package com.jie.mq.config;
 
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class TTLMessageConfig  {
 ? ?/**
 ? ? * @description:交換機
 ? ? * @author: jie
 ? ? * @time: 2022/3/5 10:36
 ? ? */
 ? ?@Bean
 ? ?public DirectExchange ttlDirectExchange(){
 ? ? ? ?return new DirectExchange("ttl.direct");
 ?  }
 ? ?/**
 ? ? * @description:隊列
 ? ? * @author: jie
 ? ? * @time: 2022/3/5 10:38
 ? ? */
 ? ?@Bean
 ? ?public Queue ttlQueue(){
 ? ? ? ?return QueueBuilder
 ? ? ? ? ? ? ? ?//指定隊列名稱,并持久化
 ? ? ? ? ? ? ?  .durable("ttl.queue")
 ? ? ? ? ? ? ? ?//設置隊列的超時時間,10秒
 ? ? ? ? ? ? ?  .ttl(10000)
 ? ? ? ? ? ? ? ?//指定死信交換機
 ? ? ? ? ? ? ?  .deadLetterExchange("dl.ttl.direct")
 ? ? ? ? ? ? ? ?//設置RoutingKey
 ? ? ? ? ? ? ?  .deadLetterRoutingKey("dl")
 ? ? ? ? ? ? ?  .build();
 ?  }
 ? ?/**
 ? ? * @description:將隊列和交換機綁定
 ? ? * @author: jie
 ? ? * @time: 2022/3/5 10:41
 ? ? */
 ? ?@Bean
 ? ?public Binding ttlBinding(){
 ? ? ? ?return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
 ?  }
}
復制代碼

3、發(fā)送消息

3.1 不指定TTL

@Test
public void testTTLQueue() {
 ? ? ? ?// 創(chuàng)建消息
 ? ? ? ?String message = "hello, ttl queue";
 ? ? ? ?// 消息ID,需要封裝到CorrelationData中
 ? ? ? ?CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 ? ? ? ?// 發(fā)送消息
 ? ? ? ?rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
 ? ? ? ?// 記錄日志
 ? ? ? ?log.error("發(fā)送消息成功");
 ?  }
復制代碼

消息發(fā)送時間:

rabbitmq 消息超時,java,分布式,rabbitmq

接受消息的時間:

rabbitmq 消息超時,java,分布式,rabbitmq

因為隊列的TTL值是10000ms,也就是10秒。可以看到消息發(fā)送與接收之間的時差剛好是10秒。

我們這個是基于隊列去設置延遲時間,我們給隊列設置了10秒鐘,我們也可以給消息設置延遲。

3.2 指定TTL

public void testTTLMessage() {
 ? ? ? ?//準備消息
 ? ? ? ?Message message = MessageBuilder
 ? ? ? ? ? ? ?  .withBody("hell,TTL".getBytes(StandardCharsets.UTF_8))
 ? ? ? ? ? ? ?  .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
 ? ? ? ? ? ? ? ?//設置延遲時間
 ? ? ? ? ? ? ?  .setExpiration("5000")
 ? ? ? ? ? ? ?  .build();
 ? ? ? ?// 2.發(fā)送消息
 ? ? ? ?rabbitTemplate.convertAndSend("ttl.direct","ttl",message);
 ? ? ? ?//3、記錄日志
 ? ? ? ?log.info("消息已經成功發(fā)送!");
 ?  }
復制代碼

這里呢?我們的隊列是10秒,而消息是5秒,到底是哪個優(yōu)先呢?還是15秒呢?

消息發(fā)送時間:

rabbitmq 消息超時,java,分布式,rabbitmq

消息接受時間:

rabbitmq 消息超時,java,分布式,rabbitmq

這里我們可以看出,當兩個都有延遲的時候,它會以較短的時間為準。

4、小結

消息超時的兩種方式是?

  1. 給隊列設置ttl屬性,進入隊列后超過ttl時間的消息變?yōu)樗佬?/li>
  2. 給消息設置ttl屬性,隊列接收到消息超過ttl時間后變?yōu)樗佬?/li>

如何實現發(fā)送一個消息20秒后消費者才收到消息?

  • 給消息的目標隊列指定死信交換機
  • 將消費者監(jiān)聽的隊列綁定到死信交換機
  • 發(fā)送消息時給消息設置超時時間為20秒

3、延遲隊列

概念:

利用TTL結合死信交換機,我們實現了消息發(fā)出后,消費者延遲收到消息的效果。這種消息模式就稱為延遲隊列(Delay Queue)模式。

延遲隊列的使用場景包括:1、延遲發(fā)送短信。

2、用戶下單,如果用戶在15 分鐘內未支付,則自動取消。

3、預約工作會議,20分鐘后自動通知所有參會人員。

因為延遲隊列的需求非常多,所以RabbitMQ的官方也推出了一個插件,原生支持延遲隊列效果。

這個插件就是DelayExchange插件。參考RabbitMQ的插件列表頁面:Community Plugins — RabbitMQ

使用方式可以參考官網地址:Scheduling Messages with RabbitMQ | RabbitMQ - Blog

3.1 安裝DelayExchange插件

官方的安裝指南地址為:Scheduling Messages with RabbitMQ | RabbitMQ - Blog

上述文檔是基于linux原生安裝RabbitMQ,然后安裝插件。

因為我是基于Docker安裝RabbitMQ,所以下面我會講解基于Docker來安裝RabbitMQ插件。

RabbitMQ有一個官方的插件社區(qū),地址為:Community Plugins — RabbitMQ

其中包含各種各樣的插件,包括我們要使用的DelayExchange插件:

rabbitmq 消息超時,java,分布式,rabbitmq

下載好后,就會獲得一個ez文件。

rabbitmq 消息超時,java,分布式,rabbitmq

1、上傳插件

因為我們是基于Docker安裝,所以需要先查看RabbitMQ的插件目錄對應的數據卷。

我之前設定的RabbitMQ的數據卷名稱為mq-plugins,所以我使用下面命令查看數據卷:

docker volume inspect mq-plugins
復制代碼

可以得到下面結果:

rabbitmq 消息超時,java,分布式,rabbitmq

接下來,將插件上傳到這個目錄即可:

rabbitmq 消息超時,java,分布式,rabbitmq

2、安裝插件

最后就是安裝了,需要進入MQ容器內部來執(zhí)行安裝。我的容器名為mq,所以執(zhí)行下面命令:

docker exec -it mq bash
復制代碼

執(zhí)行時,請將其中的 -it 后面的mq替換為你自己的容器名.

進入容器內部后,執(zhí)行下面命令開啟插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
復制代碼

rabbitmq 消息超時,java,分布式,rabbitmq

3.2 DelayExchange原理

DelayExchange的本質還是官方的三種交換機,只是添加了延遲功能。因此使用時只需要聲明一個交換機,交換機的類型可以是任意類型,然后設定delayed屬性為true即可。

  1. 接收消息。
  2. 判斷消息是否具備x-delay屬性。
  3. 如果有x-delay屬性,說明是延遲消息,持久化到硬盤,讀取x-delay值,作為延遲時間。
  4. 返回routing not found結果給消息發(fā)送者。
  5. x-delay時間到期后,重新投遞消息到指定隊列。

3.3 使用DelayExchange

1、基于注解的方式(推薦)

rabbitmq 消息超時,java,分布式,rabbitmq

2、基于Bean的方式

rabbitmq 消息超時,java,分布式,rabbitmq

3.4 發(fā)送消息

發(fā)送消息時,一定要攜帶x-delay屬性,指定延遲的時間:

rabbitmq 消息超時,java,分布式,rabbitmq

發(fā)送消息時間:

rabbitmq 消息超時,java,分布式,rabbitmq

接受消息時間:

rabbitmq 消息超時,java,分布式,rabbitmq

相差五秒,說明是有用的。

3.5 小結

延遲隊列插件的使用步驟包括哪些?

?聲明一個交換機,添加delayed屬性為true

?發(fā)送消息時,添加x-delay頭,值為超時時間文章來源地址http://www.zghlxwxcb.cn/news/detail-634846.html

到了這里,關于如何解決RabbitMQ中的延遲消息問題的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 【RabbitMQ】 RabbitMQ 消息的延遲 —— 深入探索 RabbitMQ 的死信交換機,消息的 TTL 以及延遲隊列

    【RabbitMQ】 RabbitMQ 消息的延遲 —— 深入探索 RabbitMQ 的死信交換機,消息的 TTL 以及延遲隊列

    消息隊列是現代分布式應用中的關鍵組件,用于實現異步通信、解耦系統組件以及處理高并發(fā)請求。消息隊列可以用于各種應用場景,包括任務調度、事件通知、日志處理等。在消息隊列的應用中,有時需要實現消息的延遲處理、處理未能成功消費的消息等功能。 本文將介紹

    2024年02月05日
    瀏覽(96)
  • 消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現延遲隊列、整合SpringBoot

    消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現延遲隊列、整合SpringBoot

    1、延遲隊列概念 延時隊列內部是有序的 , 最重要的特性 就體現在它的 延時屬性 上,延時隊列中的元素是希望在指定時間到了以后或之前取出和處理,簡單來說, 延時隊列就是用來存放需要在指定時間被處理的元素的隊列。 延遲隊列使用場景: 訂單在十分鐘之內未支付則

    2024年02月22日
    瀏覽(20)
  • Centos安裝RabbitMQ,JavaSpring發(fā)送RabbitMQ延遲延時消息,JavaSpring消費RabbitMQ消息

    Centos安裝RabbitMQ,JavaSpring發(fā)送RabbitMQ延遲延時消息,JavaSpring消費RabbitMQ消息

    erlang 和 rabbitmq 版本說明 https://www.rabbitmq.com/which-erlang.html 確認需要安裝的mq版本以及對應的erlang版本。 RabbitMQ下載地址: https://packagecloud.io/rabbitmq/rabbitmq-server Erlang下載地址: https://packagecloud.io/rabbitmq/erlang RabbitMQ延遲消息插件下載 https://github.com/rabbitmq/rabbitmq-delayed-message-exc

    2024年02月08日
    瀏覽(24)
  • RabbitMQ系列(27)--RabbitMQ使用Federation Exchange(聯邦交換機)解決異地訪問延遲問題

    RabbitMQ系列(27)--RabbitMQ使用Federation Exchange(聯邦交換機)解決異地訪問延遲問題

    前言: (broker北京)、(broker深圳)彼此之間相距甚遠,網絡延遲是一個不得不面對的問題。有一個在北京的業(yè)務(Client北京)需要連接(broker北京),向其中的交換器exchangeA發(fā)送消息,此時的網絡延遲很小,(Client北京)可以迅速將消息發(fā)送至exchangeA 中,就算在開啟了publisherconfirm機制或

    2024年02月13日
    瀏覽(18)
  • RabbitMQ實現延遲消息

    RabbitMQ實現延遲消息

    1,基于死信隊列 2,集成延遲插件 使用RabbitMQ來實現延遲消息必須先了解RabbitMQ的兩個概念:消息的TTL和死信Exchange,通過這兩者的組合來實現延遲隊列 消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連著的保留時間,也可

    2024年02月12日
    瀏覽(22)
  • Rabbitmq延遲消息

    Rabbitmq延遲消息

    延遲消息有兩種實現方案: 1,基于死信隊列 2,集成延遲插件 使用RabbitMQ來實現延遲消息必須先了解RabbitMQ的兩個概念: 消息的TTL(存活時間)和死信交換機Exchange,通過這兩者的組合來實現延遲隊列 1.1 消息的TTL(Time To Live) 消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和

    2024年02月13日
    瀏覽(18)
  • RabbitMQ-消息延遲

    RabbitMQ-消息延遲

    ????????一個隊列接收到的消息有過期時間,消息過期之后,如果配置有死信隊列,消息就會進去死信隊列。 ????????當生產者將消息發(fā)送到exchange1,然后交換機將消息路由到隊列queue1,但是隊列queue1沒有消費者,所以當該隊列里面的值過期時,就會將消息發(fā)送到死信

    2024年01月22日
    瀏覽(20)
  • 【RabbitMQ】RabbitMQ 消息的堆積問題 —— 使用惰性隊列解決消息的堆積問題

    【RabbitMQ】RabbitMQ 消息的堆積問題 —— 使用惰性隊列解決消息的堆積問題

    消息的堆積問題是指在消息隊列系統中,當生產者以較快的速度發(fā)送消息,而消費者處理消息的速度較慢,導致消息在隊列中積累并達到隊列的存儲上限。在這種情況下,最早被發(fā)送的消息可能會在隊列中滯留較長時間,直到超過隊列的容量上限。當隊列已滿且沒有更多的可

    2024年02月05日
    瀏覽(19)
  • RabbitMQ+springboot用延遲插件實現延遲消息的發(fā)送

    RabbitMQ+springboot用延遲插件實現延遲消息的發(fā)送

    延遲隊列:其實就是死信隊列中消息過期的特殊情況 延遲隊列應用場景: 可以用死信隊列來實現,不過死信隊列要等上一個消息消費成功,才會進行下一個消息的消費,這時候就需要用到延遲插件了,不過要線在docker上裝一個插件 前置條件是在Docker中部署過RabbitMq。 1、打開

    2024年02月10日
    瀏覽(25)
  • [超詳細]RabbitMQ安裝延遲消息插件

    [超詳細]RabbitMQ安裝延遲消息插件

    Community Plugins — RabbitMQ https://www.rabbitmq.com/community-plugins.html 進入以上地址以后,找到Routing里邊的rabbitmq_delayed_message_exchange然后點擊Releases ? 下載完成以后 ?然后解壓到plugins文件中 ?然后再sbin目錄下運行?rabbitmq-plugins enable rabbitmq_delayed_message_exchange ?查看交換機類型中是否有

    2024年02月07日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包