国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot

這篇具有很好參考價值的文章主要介紹了消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

十六、延遲隊列

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

1、延遲隊列概念

延時隊列內(nèi)部是有序的,最重要的特性就體現(xiàn)在它的延時屬性上,延時隊列中的元素是希望在指定時間到了以后或之前取出和處理,簡單來說,延時隊列就是用來存放需要在指定時間被處理的元素的隊列。

延遲隊列使用場景:

  • 訂單在十分鐘之內(nèi)未支付則自動取消;
  • 新創(chuàng)建的店鋪,如果在十天內(nèi)都沒有上傳過商品,則自動發(fā)送消息提醒;
  • 用戶注冊成功后,如果三天內(nèi)沒有登陸則進(jìn)行短信提醒;
  • 用戶發(fā)起退款,如果三天內(nèi)沒有得到處理則通知相關(guān)運營人員;
  • 預(yù)定會議后,需要在預(yù)定的時間點前十分鐘通知各個與會人員參加會議。

這些場景都有一個特點,需要在某個事件發(fā)生之后或者之前的指定時間點完成某一項任務(wù),如:發(fā)生訂單生成事件,在十分鐘之后檢查該訂單支付狀態(tài),然后將未支付的訂單進(jìn)行關(guān)閉。那我們一直輪詢數(shù)據(jù),每秒查一次,取出需要被處理的數(shù)據(jù),然后處理不就完事了嗎?

如果數(shù)據(jù)量比較少,確實可以這樣做,比如:對于 “如果賬單一周內(nèi)未支付則進(jìn)行自動結(jié)算” 這樣的需求, 如果對于時間不是嚴(yán)格限制,而是寬松意義上的一周,那么每天晚上跑個定時任務(wù)檢查一下所有未支付的賬單,確實也是一個可行的方案。

但對于數(shù)據(jù)量比較大,并且時效性較強(qiáng)的場景,如:“訂單十分鐘內(nèi)未支付則關(guān)閉 “,短期內(nèi)未支付的訂單數(shù)據(jù)可能會有很多,活動期間甚至?xí)_(dá)到百萬甚至千萬級別,對這么龐大的數(shù)據(jù)量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內(nèi)無法完成所有訂單的檢查,同時會給數(shù)據(jù)庫帶來很大壓力,無法滿足業(yè)務(wù)要求而且性能低下。

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

2、整合SpringBoot

(1)創(chuàng)建模塊項目

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

(2)添加依賴

<dependencies>
   <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!--RabbitMQ 依賴-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!--swagger-->
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger2</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger-ui</artifactId>
        <version>3.0.0</version>
    </dependency>
    <!--RabbitMQ 測試依賴-->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

(3)修改配置文件

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

spring.rabbitmq.host=42.192.149.71
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

(4)添加Swagger配置類

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;


@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket webApiConfig() {
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
    }

    private ApiInfo webApiInfo() {
        return new ApiInfoBuilder()
                .title("rabbitmq 接口文檔")
                .description("本文檔描述了 rabbitmq 微服務(wù)接口定義")
                .version("1.0")
                .contact(new Contact("zhiyuan", "http://oddfar.com", "test@qq.com"))
                .build();
    }

}

3、隊列 TTL

代碼架構(gòu)圖

創(chuàng)建兩個隊列 QA 和 QB,兩者隊列 TTL 分別設(shè)置為 10S 和 40S,然后在創(chuàng)建一個交換機(jī) X 和死信交換機(jī) Y,它們的類型都是 direct,創(chuàng)建一個死信隊列 QD,它們的綁定關(guān)系如下:

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

(1)配置類

聲明:普通交換機(jī)、死信交換機(jī)、普通隊列X2、死信隊列

綁定:XA、XB、DY

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
@Configuration
public class TtlQueueConfig {
    public static final String X_EXCHANGE = "X";
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信交換機(jī)
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //死信隊列
    public static final String DEAD_LETTER_QUEUE = "QD";

    // 聲明 xExchange
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    // 聲明 死信隊列交換機(jī)
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //聲明隊列 A ttl 為 10s 并綁定到對應(yīng)的死信交換機(jī)
    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> args = new HashMap<>(3);
        //聲明當(dāng)前隊列綁定的死信交換機(jī)
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //聲明當(dāng)前隊列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //聲明隊列的 TTL
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }

    // 聲明隊列 A 綁定 X 交換機(jī)
    @Bean
    public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    //聲明隊列 B ttl 為 40s 并綁定到對應(yīng)的死信交換機(jī)
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> args = new HashMap<>(3);
        //聲明當(dāng)前隊列綁定的死信交換機(jī)
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //聲明當(dāng)前隊列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //聲明隊列的 TTL
        args.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }

    //聲明隊列 B 綁定 X 交換機(jī)
    @Bean
    public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
    }

    //聲明死信隊列 QD
    @Bean("queueD")
    public Queue queueD() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    //聲明死信隊列 QD 綁定關(guān)系
    @Bean
    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                        @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }

}

(2)消息生產(chǎn)者

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

使用RabbitTemplate發(fā)消息

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("當(dāng)前時間:{},發(fā)送一條信息給兩個 TTL 隊列:{}", new Date(), message);
        rabbitTemplate.convertAndSend("X", "XA", "消息來自 ttl 為 10S 的隊列: " + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息來自 ttl 為 40S 的隊列: " + message);
    }
    
}

(3)消息消費者

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

package com.kdz.rabbitmq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("當(dāng)前時間:{},收到死信隊列信息{}", new Date().toString(), msg);
    }
}

測試效果:

訪問:localhost:8080/ttl/sendMsg/LBJ

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

延遲消息發(fā)送成功

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

第一條消息在 10S 后變成了死信消息,然后被消費者消費掉,第二條消息在 40S 之后變成了死信消息, 然后被消費掉,這樣一個延時隊列就打造完成了。

不過,如果這樣使用的話,豈不是每增加一個新的時間需求,就要新增一個隊列,這里只有 10S 和 40S 兩個時間選項,如果需要一個小時后處理,那么就需要增加 TTL 為一個小時的隊列,如果是預(yù)定會議室然后提前通知這樣的場景,豈不是要增加無數(shù)個隊列才能滿足需求?

4、延時隊列 TTL 優(yōu)化

在這里新增了一個隊列 QC,綁定關(guān)系如下,該隊列不設(shè)置 TTL 時間

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

(1)配置文件類

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

@Configuration
public class MsgTtlQueueConfig {
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    public static final String QUEUE_C = "QC";

    //聲明隊列 C 死信交換機(jī)
    @Bean("queueC")
    public Queue queueC() {
        Map<String, Object> args = new HashMap<>(3);
        //聲明當(dāng)前隊列綁定的死信交換機(jī)
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //聲明當(dāng)前隊列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //沒有聲明 TTL 屬性
        return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
    }

    //聲明隊列 B 綁定 X 交換機(jī)
    @Bean
    public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }
}

(2)生產(chǎn)者

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

查看源碼,配置參數(shù)

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

/**
 * 延時隊列優(yōu)化
 * @param message 消息
 * @param ttlTime 延時的毫秒
 */
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
    rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
        correlationData.getMessageProperties().setExpiration(ttlTime);
        return correlationData;
    });
    log.info("當(dāng)前時間:{},發(fā)送一條時長{}毫秒 TTL 信息給隊列 C:{}", new Date(), ttlTime, message);
}

測試效果:

發(fā)起請求:

http://localhost:8080/ttl/sendExpirationMsg/ 你好 1/20000

http://localhost:8080/ttl/sendExpirationMsg/ 你好 2/2000

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

看起來似乎沒什么問題,但是在最開始的時候,就介紹過如果使用在消息屬性上設(shè)置 TTL 的方式,消息可能并不會按時 “死亡 “。

因為 RabbitMQ 只會檢查第一個消息是否過期,如果過期則丟到死信隊列, 如果第一個消息的延時時長很長,而第二個消息的延時時長很短,第二個消息并不會優(yōu)先得到執(zhí)行。這也就是為什么第二個延遲時間短,卻后執(zhí)行。

此外,我們還可以通過 Rabbitmq 插件實現(xiàn)延遲隊列。

17、 Rabbitmq 插件實現(xiàn)延遲隊列

上文中提到的問題,確實是一個問題,如果不能實現(xiàn)在消息粒度上的 TTL,并使其在設(shè)置的 TTL 時間 及時死亡,就無法設(shè)計成一個通用的延時隊列。那如何解決呢,接下來我們就去解決該問題

1、安裝延時隊列插件

在官網(wǎng)上下載 https://www.rabbitmq.com/community-plugins.html,下載 rabbitmq_delayed_message_exchange 插件,然后解壓放置到 RabbitMQ 的插件目錄。 進(jìn)入 RabbitMQ 的安裝目錄下的 plugins 目錄,執(zhí)行下面命令讓該插件生效,然后重啟 RabbitMQ

/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins rabbitmq-plugins enable rabbitmq_delayed_message_exchange

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式
消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

2、基于死信情況與基于插件的對比

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

3、代碼架構(gòu)圖

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

4、代碼實現(xiàn)

(1)配置文件

在我們自定義的交換機(jī)中,這是一種新的交換類型,該類型消息支持延遲投遞機(jī)制 消息傳遞后并 不會立即投遞到目標(biāo)隊列中,而是存儲在 mnesia(一個分布式數(shù)據(jù)系統(tǒng))表中,當(dāng)達(dá)到投遞時間時,才 投遞到目標(biāo)隊列中。

①定義隊列、交換機(jī)、RoutingKey

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

②聲明交換機(jī)

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

查看源碼

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

③聲明延遲隊列

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

④綁定交換機(jī)和隊列

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

@Configuration
public class DelayedQueueConfig {
 public static final String DELAYED_QUEUE_NAME = "delayed.queue";
 public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
 public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
 @Bean
 public Queue delayedQueue() {
 return new Queue(DELAYED_QUEUE_NAME);
 }
 //自定義交換機(jī) 我們在這里定義的是一個延遲交換機(jī)
 @Bean
 public CustomExchange delayedExchange() {
 Map<String, Object> args = new HashMap<>();
 //自定義交換機(jī)的類型
 args.put("x-delayed-type", "direct");
 return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, 
args);
 }
 @Bean
 public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
 @Qualifier("delayedExchange") CustomExchange 
delayedExchange) {
 return 
BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
 }
}

(2)生產(chǎn)者

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {
 rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, 
correlationData ->{
 correlationData.getMessageProperties().setDelay(delayTime);
 return correlationData;
 });
 log.info(" 當(dāng) 前 時 間 : {}, 發(fā)送一條延遲 {} 毫秒的信息給隊列 delayed.queue:{}", new 
Date(),delayTime, message);
}

(3) 消息消費者:

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message){
 String msg = new String(message.getBody());
 log.info("當(dāng)前時間:{},收到延時隊列的消息:{}", new Date().toString(), msg);
}

測試效果:

發(fā)起請求:

http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000

http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot,RabbitMQ,rabbitmq,spring boot,分布式

第二個消息被先消費掉了,符合預(yù)期

5、延遲隊列總結(jié)

延時隊列在需要延時處理的場景下非常有用,使用 RabbitMQ 來實現(xiàn)延時隊列可以很好的利用 RabbitMQ 的特性,如:消息可靠發(fā)送、消息可靠投遞、死信隊列來保障消息至少被消費一次以及未被正確處理的消息不會被丟棄。另外,通過 RabbitMQ 集群的特性,可以很好的解決單點故障問題,不會因為單個節(jié)點掛掉導(dǎo)致延時隊列不可用或者消息丟失。

當(dāng)然,延時隊列還有很多其它選擇,比如利用 Java 的 DelayQueue,利用 Redis 的 zset利用 Quartz 或者利用 kafka 的時間輪,這些方式各有特點,看需要適用的場景。

消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot 到此完結(jié),筆者歸納、創(chuàng)作不易,大佬們給個3連再起飛吧文章來源地址http://www.zghlxwxcb.cn/news/detail-836361.html

到了這里,關(guān)于消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 消息隊列中間件,RabbitMQ的使用,死信隊列,延遲隊列,利用枚舉實現(xiàn)隊列,交換機(jī),RountKey的聲明

    消息隊列中間件,RabbitMQ的使用,死信隊列,延遲隊列,利用枚舉實現(xiàn)隊列,交換機(jī),RountKey的聲明

    目錄 0.交換機(jī)種類和區(qū)別 1.聲明隊列和交換機(jī)以及RountKey 2.初始化循環(huán)綁定 3.聲明交換機(jī) 4.監(jiān)聽隊列 4.1 監(jiān)聽普通隊列 4.2監(jiān)聽死信隊列 ?5.削峰填谷的實現(xiàn) Direct Exchange(直連交換機(jī)) : 直連交換機(jī)將消息發(fā)送到與消息的路由鍵完全匹配的隊列。它是最簡單的交換機(jī)類型之一。

    2024年04月23日
    瀏覽(587)
  • 【RabbitMQ】 RabbitMQ 消息的延遲 —— 深入探索 RabbitMQ 的死信交換機(jī),消息的 TTL 以及延遲隊列

    【RabbitMQ】 RabbitMQ 消息的延遲 —— 深入探索 RabbitMQ 的死信交換機(jī),消息的 TTL 以及延遲隊列

    消息隊列是現(xiàn)代分布式應(yīng)用中的關(guān)鍵組件,用于實現(xiàn)異步通信、解耦系統(tǒng)組件以及處理高并發(fā)請求。消息隊列可以用于各種應(yīng)用場景,包括任務(wù)調(diào)度、事件通知、日志處理等。在消息隊列的應(yīng)用中,有時需要實現(xiàn)消息的延遲處理、處理未能成功消費的消息等功能。 本文將介紹

    2024年02月05日
    瀏覽(96)
  • Rabbitmq 延遲隊列---插件

    ? ? ? ? 解決沒法優(yōu)先發(fā)送延時時間短的消息。 插件安裝 配置類 生產(chǎn)者 消費者

    2024年02月12日
    瀏覽(21)
  • 學(xué)會RabbitMQ的延遲隊列,提高消息處理效率

    學(xué)會RabbitMQ的延遲隊列,提高消息處理效率

    手把手教你,本地RabbitMQ服務(wù)搭建(windows) 消息隊列選型——為什么選擇RabbitMQ RabbitMQ靈活運用,怎么理解五種消息模型 RabbitMQ 能保證消息可靠性嗎 推或拉? RabbitMQ 消費模式該如何選擇 死信是什么,如何運用RabbitMQ的死信機(jī)制? 真的好用嗎?鮮有人提的 RabbitMQ-RPC模式 前面

    2024年02月14日
    瀏覽(21)
  • Docker版RabbitMQ安裝延遲隊列插件及延遲隊列項目應(yīng)用實戰(zhàn)

    Docker版RabbitMQ安裝延遲隊列插件及延遲隊列項目應(yīng)用實戰(zhàn)

    在項目中經(jīng)常有延遲業(yè)務(wù)處理的背景,此時可以借助于Rabbitmq的延遲隊列進(jìn)行實現(xiàn),但Rabbitmq本身并不支持延遲隊列,但可以通過安裝插件的方式實現(xiàn)延遲隊列 首先確認(rèn)目前項目使用的Rabbitmq的版本,這里博主的版本是3.9.15的。 訪問 Rabbitmq的github網(wǎng)址,檢索 delay 找到插件 rabb

    2024年02月02日
    瀏覽(27)
  • [超詳細(xì)]RabbitMQ安裝延遲消息插件

    [超詳細(xì)]RabbitMQ安裝延遲消息插件

    Community Plugins — RabbitMQ https://www.rabbitmq.com/community-plugins.html 進(jìn)入以上地址以后,找到Routing里邊的rabbitmq_delayed_message_exchange然后點擊Releases ? 下載完成以后 ?然后解壓到plugins文件中 ?然后再sbin目錄下運行?rabbitmq-plugins enable rabbitmq_delayed_message_exchange ?查看交換機(jī)類型中是否有

    2024年02月07日
    瀏覽(25)
  • liunx+docker+rabbitmq安裝延遲隊列插件

    liunx+docker+rabbitmq安裝延遲隊列插件

    前言 在這篇文章中,我們將討論如何在 Linux 系統(tǒng)上安裝 Docker 和 RabbitMQ,并設(shè)置延遲隊列。 Docker 是一個開放源代碼的軟件,它可以使應(yīng)用程序的部署更加簡單,而 RabbitMQ 是一個開放源代碼的消息代理軟件,它接受和轉(zhuǎn)發(fā)消息。 延遲隊列是一種在特定的延遲之后才開始處理

    2024年02月11日
    瀏覽(19)
  • Docker中為RabbitMQ安裝rabbitmq_delayed_message_exchange延遲隊列插件

    Docker中為RabbitMQ安裝rabbitmq_delayed_message_exchange延遲隊列插件

    1、前言 rabbitmq_delayed_message_exchange是一款向RabbitMQ添加延遲消息傳遞(或計劃消息傳遞)的插件。 插件下載地址:https://www.rabbitmq.com/community-plugins.html 1、下載插件 首先需要確定我們當(dāng)前使用的RabbitMQ的版本,我們可以直接登錄Web端的管理界面查看版本 ? 也可以在RabbitMQ容器中

    2024年02月12日
    瀏覽(27)
  • rabbitmq基礎(chǔ)7——隊列和消息過期時間設(shè)置、死信隊列、延遲隊列、優(yōu)先級隊列、回調(diào)隊列、惰性隊列

    rabbitmq基礎(chǔ)7——隊列和消息過期時間設(shè)置、死信隊列、延遲隊列、優(yōu)先級隊列、回調(diào)隊列、惰性隊列

    這里過一個知識點——過期時間,即對消息或隊列設(shè)置過期時間(TTL)。一旦消息過期,消費就無法接收到這條消息,這種情況是絕不允許存在的,所以官方就出了一個對策——死信隊列,死信隊列最初出現(xiàn)的意義就是為了應(yīng)對消息過期丟失情況的手段之一。 那么過期時間具

    2024年02月03日
    瀏覽(100)
  • .NetCore 使用 RabbitMQ (交換機(jī)/隊列/消息持久化+mq高級特性+死信隊列+延遲隊列)

    .NetCore 使用 RabbitMQ (交換機(jī)/隊列/消息持久化+mq高級特性+死信隊列+延遲隊列)

    目錄 一、安裝mq 二、實操 1、簡單模式 2、工作模式 3、fanout扇形模式(發(fā)布訂閱) 4、direct路由模式也叫定向模式 5、topic主題模式也叫通配符模式(路由模式的一種) 6、header 參數(shù)匹配模式 7、延時隊列(插件方式實現(xiàn)) 參考資料: 1、我的環(huán)境是使用VMware安裝的Centos7系統(tǒng)。MQ部署

    2023年04月09日
    瀏覽(111)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包