国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【RabbitMQ學(xué)習(xí)日記】——死信隊(duì)列與延遲隊(duì)列

這篇具有很好參考價(jià)值的文章主要介紹了【RabbitMQ學(xué)習(xí)日記】——死信隊(duì)列與延遲隊(duì)列。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

一、死信隊(duì)列

1.1 相關(guān)概念

死信,顧名思義就是無(wú)法被消費(fèi)的消息,字面意思可以這樣理解,一般來(lái)說(shuō),producer 將消息投遞到 broker 或者直接到 queue 里了,consumerqueue 取出消息進(jìn)行消費(fèi),但某些時(shí)候由于特定的原因?qū)е?queue 中的某些消息無(wú)法被消費(fèi),這樣的消息如果沒(méi)有后續(xù)的處理,就變成了死信,有死信自然就有了死信隊(duì)列

應(yīng)用場(chǎng)景:

  • 為了保證訂單業(yè)務(wù)的消息數(shù)據(jù)不丟失,需要使用到 RabbitMQ 的死信隊(duì)列機(jī)制,當(dāng)消息消費(fèi)發(fā)生異常時(shí),將消息投入死信隊(duì)列中
  • 用戶在商城下單成功并點(diǎn)擊去支付后在指定時(shí)間未支付時(shí)自動(dòng)失效

1.2 死信的來(lái)源

  • 消息TTL過(guò)期 【Time to live 存活時(shí)間】
  • 隊(duì)列達(dá)到最大長(zhǎng)度(隊(duì)列滿了,無(wú)法再添加數(shù)據(jù)到 mq 中)
  • 消息被拒絕(basic.rejectbasic.nack)并且 requeue=false 【消息應(yīng)答被拒絕并且不能重新返回隊(duì)列】

1.3 死信實(shí)戰(zhàn)

?? 1、我們先看一下案例的整體架構(gòu)圖

  • 一個(gè)生產(chǎn)者、兩個(gè)消費(fèi)者
    • 一個(gè)消費(fèi)者從正常隊(duì)列接收消息、另一個(gè)從死信隊(duì)列接收消息
      延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

?? 2、演示第一種情況:消息TTL過(guò)期

  • 我們需要思考如何將普通隊(duì)列與死信交換機(jī)關(guān)聯(lián)起來(lái)?
    • 此時(shí)就用到了我們聲明隊(duì)列的第四個(gè)參數(shù),準(zhǔn)備一個(gè) map 集合
      • 以添加鍵值對(duì)的方式設(shè)置 死信交換機(jī) 和 routingKey
      //正常隊(duì)列綁定死信隊(duì)列信息
      Map<String, Object> params = new HashMap<>();
      //正常隊(duì)列設(shè)置死信交換機(jī)
      params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
      //正常隊(duì)列設(shè)置死信 routingKey
      params.put("x-dead-letter-routing-key", "lisi");
      
  • 第二個(gè)我們需要思考如何設(shè)置過(guò)期時(shí)間
    • 首先,過(guò)期時(shí)間肯定是設(shè)置給消息的,所以應(yīng)該在生產(chǎn)者發(fā)出消息前進(jìn)行設(shè)置

    • 這就用到了我們發(fā)送消息的第三個(gè)參數(shù),通過(guò) AMQP.BasicProperties 指明過(guò)期時(shí)間

      AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
      
  • 接下來(lái)是代碼部分和效果演示:

1?? 生產(chǎn)者

package com.atguigu.rabbitmq.eight;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

/**
 * @author Bonbons
 * @version 1.0
 * 死信隊(duì)列的生產(chǎn)者
 */
public class Producer {
    public static final String NORMAL_QUEUE = "normal_queue";
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //為了實(shí)現(xiàn)消息的存活時(shí)間為10s
        AMQP.BasicProperties properties = new AMQP.BasicProperties()
                .builder().expiration("10000").build();
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            //演示過(guò)期成為死信消息
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes("UTF-8"));
        }
    }
}

2?? 消費(fèi)者1

package com.atguigu.rabbitmq.eight;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Bonbons
 * @version 1.0
 * 演示我們死信隊(duì)列的消費(fèi)者1號(hào)
 */
public class Consumer01 {
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static final String DEAD_EXCHANGE = "dead_exchange";
    public static final String NORMAL_QUEUE = "normal_queue";
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception{
        //獲取信道
        Channel channel = RabbitMqUtils.getChannel();

        //聲明交換機(jī)
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);


        //正常隊(duì)列綁定死信隊(duì)列信息
        Map<String, Object> params = new HashMap<>();
        //正常隊(duì)列設(shè)置死信交換機(jī)
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常隊(duì)列設(shè)置死信 routingKey
        params.put("x-dead-letter-routing-key", "lisi");

        //聲明隊(duì)列 [此處用到了第四個(gè)參數(shù),實(shí)現(xiàn)將普通隊(duì)列與死信交換機(jī)關(guān)聯(lián)起來(lái)]
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        //將我們的隊(duì)列和交換機(jī)綁定到一起
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");

        System.out.println("等待接收消息......");
        channel.basicConsume(NORMAL_QUEUE, true, (consumersTag, message) -> {
            String msg = new String(message.getBody(), "UTF-8");
           	System.out.println("Consumer01接收到的消息: " + msg);
        }, consumersTag -> {});
    }
}

3?? 消費(fèi)者2

package com.atguigu.rabbitmq.eight;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Bonbons
 * @version 1.0
 * 演示我們死信隊(duì)列的消費(fèi)者2號(hào),就負(fù)責(zé)接收死信隊(duì)列的消息
 */
public class Consumer02 {
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception{
        //獲取信道
        Channel channel = RabbitMqUtils.getChannel();
        //最普通的消費(fèi)者
        System.out.println("等待接收消息......");
        channel.basicConsume(DEAD_QUEUE, true, (consumersTag, message) -> {
            System.out.println("Consumer02接收到的消息: " + new String(message.getBody(), "UTF-8"));
        }, consumersTag -> {});

    }
}
  • 我們通過(guò)不啟動(dòng)消費(fèi)者1(C1)來(lái)演示消息過(guò)期的效果
    • 因?yàn)闆](méi)有消費(fèi)者1,當(dāng)生產(chǎn)者的消息發(fā)過(guò)來(lái)就沒(méi)有消費(fèi)者進(jìn)行處理,此處10s后就會(huì)成為死信消息
    • 成為死信消息后就會(huì)從當(dāng)前的這個(gè)隊(duì)列發(fā)往死信交換機(jī)
    • 死信交換機(jī)將死信消息發(fā)往死信隊(duì)列,我們的第二個(gè)消費(fèi)者從死信隊(duì)列獲取消息并消費(fèi)

延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列
延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列
延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

?? 3、演示第二種情況:隊(duì)列達(dá)到最大長(zhǎng)度

  • 只需要在上面的代碼中進(jìn)行部分修改
    • 第一步,去掉生產(chǎn)者中消息的過(guò)期時(shí)間參數(shù)設(shè)置
      延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

    • 第二步 ,在消費(fèi)者C1的聲明普通隊(duì)列的map集合參數(shù)加入一條關(guān)于隊(duì)列長(zhǎng)度的限制

      • params.put("x-max-length", 6);
        延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

重點(diǎn): 當(dāng)我們想要修改已經(jīng)存在隊(duì)列、交換機(jī)的屬性時(shí),需要將已經(jīng)存在的刪除,然后運(yùn)行代碼重新創(chuàng)建

  • 通過(guò)RabbitMQWeb管理工具我們可以看出達(dá)到了預(yù)期的測(cè)試效果:
    • 發(fā)送十條消息,正常隊(duì)列里有六條消息,死信隊(duì)列里有四條消息

延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

?? 4、第三種情況:消息被拒并不能返回隊(duì)列

  • 對(duì)于生產(chǎn)者與第二種情況一致,消費(fèi)者C2與第一種情況一致
  • 與前兩種情況最大的不同在于消費(fèi)者C1的設(shè)定,接下來(lái)我們看一下這部分的代碼
    • 要關(guān)閉自動(dòng)應(yīng)答
package com.atguigu.rabbitmq.eight;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Bonbons
 * @version 1.0
 * 演示我們死信隊(duì)列的消費(fèi)者1號(hào)
 */
public class Consumer01 {
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static final String DEAD_EXCHANGE = "dead_exchange";
    public static final String NORMAL_QUEUE = "normal_queue";
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception{
        //獲取信道
        Channel channel = RabbitMqUtils.getChannel();

        //聲明交換機(jī)
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);


        //正常隊(duì)列綁定死信隊(duì)列信息
        Map<String, Object> params = new HashMap<>();
        //正常隊(duì)列設(shè)置死信交換機(jī)
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常隊(duì)列設(shè)置死信 routingKey
        params.put("x-dead-letter-routing-key", "lisi");
        //設(shè)置正常隊(duì)列的長(zhǎng)度限制
//        params.put("x-max-length", 6);

        //聲明隊(duì)列 [此處用到了第四個(gè)參數(shù),實(shí)現(xiàn)將普通隊(duì)列與死信交換機(jī)關(guān)聯(lián)起來(lái)]
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        //將我們的隊(duì)列和交換機(jī)綁定到一起
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");

        System.out.println("等待接收消息......");
        //此處要開(kāi)啟手動(dòng)應(yīng)答,因?yàn)槿绻O(shè)置為true(自動(dòng)應(yīng)答)就不存在拒絕消息這一說(shuō)了
        channel.basicConsume(NORMAL_QUEUE, false, (consumersTag, message) -> {
            String msg = new String(message.getBody(), "UTF-8");
            if(msg.equals("info5")){
                System.out.println("Consumer01接收到的消息: " + msg + ": 此消息是被C1拒絕的");
                //拒絕消息,并設(shè)置不重新添加到隊(duì)列 >> 變成死信消息
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
            }else{
                System.out.println("Consumer01接收到的消息: " + msg);
            }
        }, consumersTag -> {});
    }
}

延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列
延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列
延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

二、延遲隊(duì)列

  • 此部分開(kāi)始,我們不再使用簡(jiǎn)單的Maven項(xiàng)目,而是創(chuàng)建一個(gè)SpringBoot項(xiàng)目來(lái)演示

2.1 延遲隊(duì)列的概念

  • 延時(shí)隊(duì)列,隊(duì)列內(nèi)部是有序的,最重要的特性就體現(xiàn)在它的延時(shí)屬性上,延時(shí)隊(duì)列中的元素是希望在指定時(shí)間到了以后或之前取出和處理
  • 簡(jiǎn)單來(lái)說(shuō),延時(shí)隊(duì)列就是用來(lái)存放需要在指定時(shí)間被處理的元素的隊(duì)列
  • 對(duì)于我們死信隊(duì)列的C2消費(fèi)者來(lái)說(shuō),它就是一個(gè)延遲隊(duì)列

2.2 延遲隊(duì)列使用場(chǎng)景

  • 1.訂單在十分鐘之內(nèi)未支付則自動(dòng)取消
  • 2.新創(chuàng)建的店鋪,如果在十天內(nèi)都沒(méi)有上傳過(guò)商品,則自動(dòng)發(fā)送消息提醒。
  • 3.用戶注冊(cè)成功后,如果三天內(nèi)沒(méi)有登陸則進(jìn)行短信提醒。
  • 4.用戶發(fā)起退款,如果三天內(nèi)沒(méi)有得到處理則通知相關(guān)運(yùn)營(yíng)人員。
  • 5.預(yù)定會(huì)議后,需要在預(yù)定的時(shí)間點(diǎn)前十分鐘通知各個(gè)與會(huì)人員參加會(huì)議

延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列
這些場(chǎng)景都有一個(gè)特點(diǎn),需要在某個(gè)事件發(fā)生之后或者之前的指定時(shí)間點(diǎn)完成某一項(xiàng)任務(wù),如:發(fā)生訂單生成事件,在十分鐘之后檢查該訂單支付狀態(tài),然后將未支付的訂單進(jìn)行關(guān)閉;看起來(lái)似乎使用定時(shí)任務(wù),一直輪詢數(shù)據(jù),每秒查一次,取出需要被處理的數(shù)據(jù),然后處理不就完事了嗎?如果數(shù)據(jù)量比較少,確實(shí)可以這樣做,比如:對(duì)于“如果賬單一周內(nèi)未支付則進(jìn)行自動(dòng)結(jié)算”這樣的需求,如果對(duì)于時(shí)間不是嚴(yán)格限制,而是寬松意義上的一周,那么每天晚上跑個(gè)定時(shí)任務(wù)檢查一下所有未支付的賬單,確實(shí)也是一個(gè)可行的方案。但對(duì)于數(shù)據(jù)量比較大,并且時(shí)效性較強(qiáng)的場(chǎng)景,如:“訂單十分鐘內(nèi)未支付則關(guān)閉“,短期內(nèi)未支付的訂單數(shù)據(jù)可能會(huì)有很多,活動(dòng)期間甚至?xí)_(dá)到百萬(wàn)甚至千萬(wàn)級(jí)別,對(duì)這么龐大的數(shù)據(jù)量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內(nèi)無(wú)法完成所有訂單的檢查,同時(shí)會(huì)給數(shù)據(jù)庫(kù)帶來(lái)很大壓力,無(wú)法滿足業(yè)務(wù)要求而且性能低下?!疽鑫覀兊难舆t隊(duì)列】

2.3 RabbitMQ 中的 TTL

  • TTL 是什么呢?
    • TTLRabbitMQ 中一個(gè)消息或者隊(duì)列的屬性,表明一條消息或者該隊(duì)列中的所有消息的最大存活時(shí)間,單位是毫秒。
    • 換句話說(shuō),如果一條消息設(shè)置了 TTL 屬性或者進(jìn)入了設(shè)置 TTL 屬性的隊(duì)列,那么這條消息如果在 TTL 設(shè)置的時(shí)間內(nèi)沒(méi)有被消費(fèi),則會(huì)成為"死信"。
    • 如果同時(shí)配置了隊(duì)列的 TTL 和消息的TTL,那么較小的那個(gè)值將會(huì)被使用,有兩種方式設(shè)置 TTL。

?? 1、如何為消息設(shè)置 TTL 呢?

  • 在我們使用 RabbitTemplateconvertAndSend 方法發(fā)送消息時(shí),通過(guò)參數(shù)設(shè)置消息的過(guò)期時(shí)間 【SpringBoot】
    延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

?? 2、如何為隊(duì)列設(shè)置 TTL 呢?

  • 在我們創(chuàng)建隊(duì)列之間,設(shè)置隊(duì)列 map 集合參數(shù)時(shí),設(shè)置我們隊(duì)列的過(guò)期時(shí)間 【SpringBoot】
    延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

?? 3、兩者有什么區(qū)別呢?

  • 區(qū)別:
    • 第一種方式:如果設(shè)置了隊(duì)列的 TTL 屬性,那么一旦消息過(guò)期,就會(huì)被隊(duì)列丟棄(如果配置了死信隊(duì)列被丟到死信隊(duì)列中),
    • 第二種方式,消息即使過(guò)期,也不一定會(huì)被馬上丟棄,因?yàn)?mark>消息是否過(guò)期是在即將投遞到消費(fèi)者之前判定的,如果當(dāng)前隊(duì)列有嚴(yán)重的消息積壓情況,則已過(guò)期的消息也許還能存活較長(zhǎng)時(shí)間
    • 另外,還需要注意的一點(diǎn)是,如果不設(shè)置 TTL,表示消息永遠(yuǎn)不會(huì)過(guò)期
    • 如果將 TTL 設(shè)置為 0,則表示除非此時(shí)可以直接投遞該消息到消費(fèi)者,否則該消息將會(huì)被丟棄。

2.4 整合 SpringBoot

延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

?? 1、創(chuàng)建項(xiàng)目

延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列
?? 2、添加依賴 【此處直接貼上我們的pom.xml】

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.14</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.atguigu.rabbitmq</groupId>
    <artifactId>springboot-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq</name>
    <description>springboot-rabbitmq</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--RabbitMQ 依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--RabbitMQ 測(cè)試依賴-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

?? 3、修改配置文件 【連接到我們的RabbitMQ

spring.rabbitmq.host=8.130.95.101
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123

?? 4、添加 Swagger 配置類

  • 根據(jù)在代碼中使用自定義的注解來(lái)生成接口文檔,這個(gè)在前后端分離的項(xiàng)目中很重要
  • 這樣做的好處是 在開(kāi)發(fā)接口時(shí)可以通過(guò)swagger 將接口文檔定義好,同時(shí)也方便以后的維護(hù)
package com.atguigu.rabbitmq.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerConfig {
    @Bean
    public Docket webApiConfig() {
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
    }

    private ApiInfo webApiInfo() {
        return new ApiInfoBuilder()
                .title("rabbitmq 接口文檔")
                .description("本文檔描述了 rabbitmq 微服務(wù)接口定義")
                .version("1.0")
                .contact(new Contact("enjoy6288", "http://atguigu.com",
                        "1551388580@qq.com"))
                .build();
    }
}

2.5 隊(duì)列 TTL

  • 就是通過(guò)死信隊(duì)列的方式實(shí)現(xiàn)延遲隊(duì)列,接下來(lái)直接通過(guò)案例演示
  • 首先,我們要知道:
    • 聲明工作通過(guò)配置類完成 【就是定義聲明隊(duì)列、交換機(jī)】
    • 生產(chǎn)者通過(guò)controller完成 【以瀏覽器發(fā)送請(qǐng)求的方式讓生產(chǎn)者發(fā)送消息】
    • 消費(fèi)者通過(guò)監(jiān)聽(tīng)器完成 【通過(guò)監(jiān)聽(tīng)器監(jiān)聽(tīng)對(duì)應(yīng)的隊(duì)列,來(lái)完成消費(fèi)者的功能】

1?? 代碼架構(gòu)圖

  • 創(chuàng)建兩個(gè)隊(duì)列 QAQB,兩者隊(duì)列 TTL 分別設(shè)置為 10S40S
  • 然后在創(chuàng)建一個(gè)交換機(jī) X 和死信交換機(jī) Y,它們的類型都是 direct
  • 創(chuàng)建一個(gè)死信隊(duì)列 QD,它們的綁定關(guān)系如下

延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

2?? 配置類代碼 【定義隊(duì)列、交換機(jī)部分】

package com.atguigu.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Bonbons
 * @version 1.0
 * 延遲隊(duì)列演示配置類
 */
@Configuration
public class TtlQueueConfig {
    //普通交換機(jī)
    public static final String X_EXCHANGE = "X";
    //死信交換機(jī)
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //普通隊(duì)列
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信隊(duì)列
    public static final String DEAD_LETTER_QUEUE = "QD";

    //聲明我們的兩個(gè)交換機(jī)
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //聲明我們的兩個(gè)普通隊(duì)列
    @Bean("queueA")
    public Queue queueA(){
        Map<String, Object> arguments = new HashMap<>(3);//3代表初始map的長(zhǎng)度
        //設(shè)置死信交換機(jī)
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //設(shè)置routingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        //設(shè)置過(guò)期時(shí)間
        arguments.put("x-message-ttl", 10000);

        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }

    @Bean("queueB")
    public Queue queueB(){
        Map<String, Object> arguments = new HashMap<>(3);//3代表初始map的長(zhǎng)度
        //設(shè)置死信交換機(jī)
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //設(shè)置routingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        //設(shè)置過(guò)期時(shí)間
        arguments.put("x-message-ttl", 40000);

        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }

    //聲明我們的死信隊(duì)列
    @Bean("queueD")
    public Queue queueD(){
//        return new Queue(DEAD_LETTER_QUEUE); //兩種創(chuàng)建隊(duì)列的方式均可
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    //將我們的兩個(gè)普通隊(duì)列綁定到普通交換機(jī)X上
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    //將我們的死信隊(duì)列綁定到死信交換機(jī)上
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
                                  @Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }

3?? 消息生產(chǎn)者 【編寫一個(gè)處理請(qǐng)求的控制器】

package com.atguigu.rabbitmq.controller;

import com.atguigu.rabbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * @author Bonbons
 * @version 1.0
 */
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
    @Autowired
    RabbitTemplate rabbitTemplate;
    //發(fā)送我們基礎(chǔ)死信消息
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條消息給兩個(gè)TTL隊(duì)列: {}", new Date().toString(), message);
        rabbitTemplate.convertAndSend("X", "XA", "消息來(lái)自ttl為10s的隊(duì)列" + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息來(lái)自ttl為40s的隊(duì)列" + message);
    }
}

4?? 消息消費(fèi)者 【一個(gè)組件類里定義Rabbit監(jiān)聽(tīng)器】

package com.atguigu.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


import java.util.Date;

/**
 * @author Bonbons
 * @version 1.0
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {
    //接收消息
    @RabbitListener(queues = "QD")
    public void receiveMsg(Message message, Channel channel) throws Exception{
        String msg = new String(message.getBody());
        log.info("當(dāng)前時(shí)間: {}, 收到死信隊(duì)列的消息: {}", new Date().toString(), msg);
    }
}
  • 接下來(lái)我們通過(guò)瀏覽器發(fā)送請(qǐng)求進(jìn)行測(cè)試:

延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列
延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

  • 第一條消息在 10S 后變成了死信消息,然后被消費(fèi)者消費(fèi)掉,第二條消息在 40S 之后變成了死信消息,然后被消費(fèi)掉,這樣一個(gè)延時(shí)隊(duì)列就打造完成了
  • 但是基于上述這種情況,每增加一個(gè)新的時(shí)間需求,就要新增一個(gè)隊(duì)列,如果是預(yù)定會(huì)議室然后提前通知這樣的場(chǎng)景,豈不是要增加無(wú)數(shù)個(gè)隊(duì)列才能滿足需求?

2.6 延遲隊(duì)列優(yōu)化

  • 就是我們先不為隊(duì)列設(shè)置 TTL,而是通過(guò)瀏覽器發(fā)起請(qǐng)求的占位符來(lái)攜帶我們的過(guò)期時(shí)間
    • 注意此處的過(guò)期時(shí)間,我們是設(shè)置給消息的,并沒(méi)有設(shè)置給隊(duì)列
  • 映射URL綁定的占位符 帶占位符的URL是 Spring3.0 新增的功能

1?? 代碼架構(gòu)圖

  • 基于上面通過(guò)TTL設(shè)置的延遲隊(duì)列,我們新增了一個(gè)隊(duì)列 QC,綁定關(guān)系如下,該隊(duì)列不設(shè)置 TTL 時(shí)間

延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列
2?? 配置類代碼

  • 只需要在我上面的配置類 TtlQueueConfig.java 代碼里增加下面的內(nèi)容
//優(yōu)化我們的延遲隊(duì)列,再設(shè)置一個(gè)普通隊(duì)列,不為其設(shè)置過(guò)期時(shí)間,讓它做公共隊(duì)列
    public static final String QUEUE_C = "QC";
    //聲明這個(gè)隊(duì)列
    @Bean
    public Queue queueC(){
        Map<String, Object> arguments = new HashMap<>(3);
        //設(shè)置死信交換機(jī)
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //設(shè)置routingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
    }
    //綁定到我們的交換機(jī)X
    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }

3?? 消息生產(chǎn)者代碼 【增加了一個(gè)新的請(qǐng)求映射】

  • 在我們上面的生產(chǎn)者代碼文件中進(jìn)行添加即可
//發(fā)送我們優(yōu)化后的延遲隊(duì)列的消息
    @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message, @PathVariable String ttlTime){
        log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條時(shí)長(zhǎng)為{}毫秒的消息給TTL隊(duì)列QC: {}", new Date().toString(), ttlTime, message);
        //使用springboot為我們提供的rabbitTemplate來(lái)發(fā)送延遲消息
        rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
            //設(shè)置過(guò)期時(shí)間
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }
  • 對(duì)于消費(fèi)者代碼不變,接下來(lái)我們發(fā)起請(qǐng)求進(jìn)行測(cè)試:
    • 我們先發(fā)起一個(gè)過(guò)期時(shí)間長(zhǎng)的請(qǐng)求,再發(fā)起一個(gè)過(guò)期時(shí)間短的請(qǐng)求,這樣便于說(shuō)明優(yōu)化后延遲隊(duì)列的問(wèn)題

延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

  • 看起來(lái)似乎沒(méi)什么問(wèn)題,但是在最開(kāi)始的時(shí)候,就介紹過(guò)如果使用在消息屬性上設(shè)置 TTL 的方式,消息可能并不會(huì)按時(shí)“死亡“
  • 因?yàn)?RabbitMQ 只會(huì)檢查第一個(gè)消息是否過(guò)期,如果過(guò)期則丟到死信隊(duì)列,如果第一個(gè)消息的延時(shí)時(shí)長(zhǎng)很長(zhǎng),而第二個(gè)消息的延時(shí)時(shí)長(zhǎng)很短,第二個(gè)消息并不會(huì)優(yōu)先得到執(zhí)行

2.7 Rabbitmq 插件實(shí)現(xiàn)延遲隊(duì)列

延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

  • 對(duì)于上文優(yōu)化后延遲隊(duì)列存在的問(wèn)題,我們可以通過(guò)使用 RabbitMQ 為我們提供的插件解決
    • 我們需要知道,這種基于插件的==實(shí)現(xiàn)延遲的效果是在交換機(jī)處完成的 ==

1?? 安裝延遲隊(duì)列插件 【在我們的第一篇配置文章資源包里有】

  • 在官網(wǎng)上下載 https://www.rabbitmq.com/community-plugins.html,下載 rabbitmq_delayed_message_exchange 插件,然后解壓放置到 RabbitMQ 的插件目錄
    延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

  • 進(jìn)入 RabbitMQ 的安裝目錄下的 plgins 目錄,執(zhí)行下面命令讓該插件生效,然后重啟 RabbitMQ

    • cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
    • rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    • rabbitmqctl start_app
      延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列
  • 通過(guò) RabbitMQweb 控制臺(tái)可以看到添加插件前后交換機(jī)處的變換

延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

  • 插件安裝好了之后,就可以演示如何使用我們基于插件的方式實(shí)現(xiàn)延遲隊(duì)列

2?? 代碼架構(gòu)圖

  • 在這里新增了一個(gè)隊(duì)列 delayed.queue,一個(gè)自定義交換機(jī) delayed.exchange,綁定關(guān)系如下:
    延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

3?? 配置類代碼

在我們自定義的交換機(jī)中,這是一種新的交換類型,該類型消息支持延遲投遞機(jī)制 消息傳遞后并不會(huì)立即投遞到目標(biāo)隊(duì)列中,而是存儲(chǔ)在 mnesia(一個(gè)分布式數(shù)據(jù)系統(tǒng))表中,當(dāng)達(dá)到投遞時(shí)間時(shí),才投遞到目標(biāo)隊(duì)列中

新建的配置類

package com.atguigu.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Bonbons
 * @version 1.0
 */
@Configuration
public class DelayedQueueConfig {
    //定義我們的交換機(jī)、隊(duì)列、RoutingKey
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    //聲明交換機(jī)
    @Bean("delayedExchange")
    public CustomExchange delayedExchange(){
        Map<String, Object> arguments = new HashMap<>();
        //延遲類型
        arguments.put("x-delayed-type", "direct");
        /**
         * 1、交換機(jī)的名字
         * 2、交換機(jī)的類型 [指明是延遲消息]
         * 3、是否需要持久化
         * 4、是否開(kāi)啟自動(dòng)刪除
         * 5、其他參數(shù)
         */
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message",
                true, false, arguments);
    }
    //聲明隊(duì)列
    @Bean
    public Queue delayedQueue(){
        return new Queue(DELAYED_QUEUE_NAME);
    }
    //將交換機(jī)與隊(duì)列進(jìn)行綁定
    @Bean
    public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
                                                      @Qualifier("delayedExchange") CustomExchange delayedExchange){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

4?? 消息生產(chǎn)者

  • 在原來(lái)的生產(chǎn)者代碼里添加
    //發(fā)送我們使用插件的延遲隊(duì)列消息
    @GetMapping("/sendDelayedMsg/{message}/{delayedTime}")
    public void sendMsg(@PathVariable String message, @PathVariable Integer delayedTime){
        log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條時(shí)長(zhǎng)為{}毫秒的消息給延遲隊(duì)列delayed.queue: {}", new Date().toString(), delayedTime, message);
        rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
            //設(shè)置延遲消息
            msg.getMessageProperties().setDelay(delayedTime);
            return msg;
        });
    }

5?? 消息消費(fèi)者

新的監(jiān)聽(tīng)器

package com.atguigu.rabbitmq.consumer;

import com.atguigu.rabbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * @author Bonbons
 * @version 1.0
 */
@Slf4j
@Component
public class DelayQueueConsumer {
    //設(shè)置監(jiān)聽(tīng)器
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayQueue(Message message){
        String msg = new String(message.getBody());
        log.info("當(dāng)前時(shí)間: {}, 收到延遲隊(duì)列的消息: {}", new Date().toString(), msg);
    }
}
  • 發(fā)起請(qǐng)求,測(cè)試效果

延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列
延時(shí)隊(duì)列和死信隊(duì)列,RabbitMQ,java-rabbitmq,rabbitmq,學(xué)習(xí),死信隊(duì)列,延遲隊(duì)列

6?? 總結(jié)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-736805.html

  • 延時(shí)隊(duì)列在需要延時(shí)處理的場(chǎng)景下非常有用,使用 RabbitMQ 來(lái)實(shí)現(xiàn)延時(shí)隊(duì)列可以很好的利用RabbitMQ 的特性,如:消息可靠發(fā)送、消息可靠投遞、死信隊(duì)列來(lái)保障消息至少被消費(fèi)一次以及未被正確處理的消息不會(huì)被丟棄。
  • 另外,通過(guò) RabbitMQ 集群的特性,可以很好的解決單點(diǎn)故障問(wèn)題,不會(huì)因?yàn)閱蝹€(gè)節(jié)點(diǎn)掛掉導(dǎo)致延時(shí)隊(duì)列不可用或者消息丟失。
  • 當(dāng)然,延時(shí)隊(duì)列還有很多其它選擇,比如利用 JavaDelayQueue,利用 Rediszset,利用 Quartz或者利用 kafka 的時(shí)間輪,這些方式各有特點(diǎn),看需要適用的場(chǎng)景

到了這里,關(guān)于【RabbitMQ學(xué)習(xí)日記】——死信隊(duì)列與延遲隊(duì)列的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • RabbitMQ延遲隊(duì)列,死信隊(duì)列配置

    延遲和死信隊(duì)列的配置 延遲隊(duì)列有效期一分鐘,后進(jìn)入死信隊(duì)列,如果異常就進(jìn)入異常隊(duì)列 異常隊(duì)列配置類

    2024年02月14日
    瀏覽(27)
  • 【RabbitMQ】RabbitMQ高級(jí):死信隊(duì)列和延遲隊(duì)列

    【RabbitMQ】RabbitMQ高級(jí):死信隊(duì)列和延遲隊(duì)列

    在電商平臺(tái)下單,訂單創(chuàng)建成功,等待支付,一般會(huì)給30分鐘的時(shí)間,開(kāi)始倒計(jì)時(shí)。如果在這段時(shí)間內(nèi)用戶沒(méi)有支付,則默認(rèn)訂單取消。 該如何實(shí)現(xiàn)? 定期輪詢(數(shù)據(jù)庫(kù)等) 用戶下單成功,將訂單信息放入數(shù)據(jù)庫(kù),同時(shí)將支付狀態(tài)放入數(shù)據(jù)庫(kù),用戶付款更改數(shù)據(jù)庫(kù)狀態(tài)。定

    2024年01月17日
    瀏覽(18)
  • .NET中使用RabbitMQ延時(shí)隊(duì)列和死信隊(duì)列

    .NET中使用RabbitMQ延時(shí)隊(duì)列和死信隊(duì)列

    延時(shí)隊(duì)列是RabbitMQ中的一種特殊隊(duì)列,它可以在消息到達(dá)隊(duì)列后延遲一段時(shí)間再被消費(fèi)。 延時(shí)隊(duì)列的實(shí)現(xiàn)原理是通過(guò)使用消息的過(guò)期時(shí)間和死信隊(duì)列來(lái)實(shí)現(xiàn)。當(dāng)消息被發(fā)送到延時(shí)隊(duì)列時(shí),可以為消息設(shè)置一個(gè)過(guò)期時(shí)間,這個(gè)過(guò)期時(shí)間決定了消息在延時(shí)隊(duì)列中等待的時(shí)間。如果

    2024年02月15日
    瀏覽(16)
  • RabbitMQ實(shí)現(xiàn)延遲消息的方式-死信隊(duì)列、延遲隊(duì)列和惰性隊(duì)列

    當(dāng)一條消息因?yàn)橐恍┰驘o(wú)法被成功消費(fèi),那么這這條消息就叫做死信,如果包含死信的隊(duì)列配置了dead-letter-exchange屬性指定了一個(gè)交換機(jī),隊(duì)列中的死信都會(huì)投遞到這個(gè)交換機(jī)內(nèi),這個(gè)交換機(jī)就叫死信交換機(jī),死信交換機(jī)再綁定一個(gè)隊(duì)列,死信最終會(huì)進(jìn)入到這個(gè)存放死信的

    2024年02月19日
    瀏覽(94)
  • 深入淺出RabbitMQ:順序消費(fèi)、死信隊(duì)列和延時(shí)隊(duì)列

    深入淺出RabbitMQ:順序消費(fèi)、死信隊(duì)列和延時(shí)隊(duì)列

    大家好,我是小?,一個(gè)漂泊江湖多年的 985 非科班程序員,曾混跡于國(guó)企、互聯(lián)網(wǎng)大廠和創(chuàng)業(yè)公司的后臺(tái)開(kāi)發(fā)攻城獅。 上篇文章(應(yīng)對(duì)流量高峰的利器——消息中間件)中,我們已經(jīng)介紹了消息中間件的用途,主要用作:解耦、削峰、異步通信、應(yīng)用解耦,并介紹了業(yè)界常

    2024年02月03日
    瀏覽(19)
  • RabbitMQ之TTL+死信隊(duì)列實(shí)現(xiàn)延遲隊(duì)列

    RabbitMQ是一個(gè)流行的消息隊(duì)列系統(tǒng),它提供了許多有用的功能,其中之一是TTL(Time To Live)和死信隊(duì)列。這些功能可以用來(lái)實(shí)現(xiàn)延遲隊(duì)列,讓我們來(lái)看看如何使用它們。 首先,什么是TTL?TTL是消息的存活時(shí)間,它可以設(shè)置為一個(gè)特定的時(shí)間段。如果消息在這個(gè)時(shí)間段內(nèi)沒(méi)有被

    2024年02月13日
    瀏覽(18)
  • (五)RabbitMQ-進(jìn)階 死信隊(duì)列、延遲隊(duì)列、防丟失機(jī)制

    (五)RabbitMQ-進(jìn)階 死信隊(duì)列、延遲隊(duì)列、防丟失機(jī)制

    Lison dreamlison@163.com , v1.0.0 , 2023.06.23 概念 在MQ中,當(dāng)消息成為死信(Dead message)后,消息中間件可以 將其從當(dāng)前隊(duì)列發(fā)送到另一個(gè)隊(duì)列中,這個(gè)隊(duì)列就是死信隊(duì)列。而 在RabbitMQ中,由于有交換機(jī)的概念,實(shí)際是將死信發(fā)送給了死 信交換機(jī)(Dead Letter Exchange,簡(jiǎn)稱DLX)。死信交

    2024年02月15日
    瀏覽(25)
  • RabbitMQ養(yǎng)成記 (10.高級(jí)特性:死信隊(duì)列,延遲隊(duì)列)

    RabbitMQ養(yǎng)成記 (10.高級(jí)特性:死信隊(duì)列,延遲隊(duì)列)

    這個(gè)概念 在其他MQ產(chǎn)品里面也是有的,只不過(guò)在Rabbitmq中稍微特殊一點(diǎn) 什么叫私信隊(duì)列呢? 就是當(dāng)消息成為 dead message之后,可以重新發(fā)到另外一臺(tái)交換機(jī),這個(gè)交換機(jī)就是DLX。 注意這里的有翻譯歧義, 這里的DLX 指的是 交換機(jī) ,而不是一個(gè)隊(duì)列。 隊(duì)列的消息長(zhǎng)度 到達(dá)限制

    2024年02月05日
    瀏覽(17)
  • RabbitMQ高級(jí)特性2 、TTL、死信隊(duì)列和延遲隊(duì)列

    RabbitMQ高級(jí)特性2 、TTL、死信隊(duì)列和延遲隊(duì)列

    設(shè)置 消費(fèi)者 測(cè)試 添加多條消息 拉取消息 每隔20秒拉取一次 一次拉取五條 然后在20秒內(nèi)一條一條消費(fèi) Time To Live(存活時(shí)間/過(guò)期時(shí)間)。 當(dāng)消息到達(dá)存活時(shí)間后,還沒(méi)有被消費(fèi),會(huì)被自動(dòng)清除。 RabbitMQ可以對(duì)消息設(shè)置過(guò)期時(shí)間,也可以對(duì)整個(gè)隊(duì)列(Queue)設(shè)置過(guò)期時(shí)間。 可

    2024年01月16日
    瀏覽(20)
  • 【學(xué)習(xí)日記2023.6.19】 之 RabbitMQ服務(wù)異步通信_(tái)消息可靠性_死信交換機(jī)_惰性隊(duì)列_MQ集群

    【學(xué)習(xí)日記2023.6.19】 之 RabbitMQ服務(wù)異步通信_(tái)消息可靠性_死信交換機(jī)_惰性隊(duì)列_MQ集群

    消息隊(duì)列在使用過(guò)程中,面臨著很多實(shí)際問(wèn)題需要思考: 消息從發(fā)送,到消費(fèi)者接收,會(huì)經(jīng)歷多個(gè)過(guò)程: 其中的每一步都可能導(dǎo)致消息丟失,常見(jiàn)的丟失原因包括: 發(fā)送時(shí)丟失: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 consumer接收

    2024年02月11日
    瀏覽(98)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包