国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【RabbitMQ】RabbitMQ高級:死信隊(duì)列和延遲隊(duì)列

這篇具有很好參考價值的文章主要介紹了【RabbitMQ】RabbitMQ高級:死信隊(duì)列和延遲隊(duì)列。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

設(shè)置TTL(過期時間)

概述

在電商平臺下單,訂單創(chuàng)建成功,等待支付,一般會給30分鐘的時間,開始倒計(jì)時。如果在這段時間內(nèi)用戶沒有支付,則默認(rèn)訂單取消。

該如何實(shí)現(xiàn)?

  1. 定期輪詢(數(shù)據(jù)庫等)

用戶下單成功,將訂單信息放入數(shù)據(jù)庫,同時將支付狀態(tài)放入數(shù)據(jù)庫,用戶付款更改數(shù)據(jù)庫狀態(tài)。定期輪詢數(shù)據(jù)庫支付狀態(tài),如果超過30分鐘就將該訂單取消。

優(yōu)點(diǎn):設(shè)計(jì)實(shí)現(xiàn)簡單

缺點(diǎn):需要對數(shù)據(jù)庫進(jìn)行大量的IO操作,效率低下。

  1. Timer

Timer可以用來設(shè)置指定時間后執(zhí)行的任務(wù)。

SimpleDateFormat simpleDateFormat=new SimpleDateFormat("HH:mm:ss");
Timer timer=new Timer();
TimerTask timerTask=new TimerTask(){
    @Override public void run(){
        System.out.println("用戶沒有付款,交易取消:"+simpleDateFormat.format(new Date(System.currentTimeMillis())));
        timer.cancel();
    }
};
System.out.println("等待用戶付款:"+simpleDateFormat.format(new Date(System.currentTimeMillis())));
// 10秒后執(zhí)行timerTask 
timer.schedule(timerTask, 10 * 1000);

缺點(diǎn):

Timers沒有持久化機(jī)制;不靈活 (只可以設(shè)置開始時間和重復(fù)間隔,對等待支付貌似夠用);不能利用線程池,一個timer一個線程;沒有真正的管理計(jì)劃。

  1. ScheduledExecutorService
SimpleDateFormat format=new SimpleDateFormat("HH:mm:ss"); 
// 線程工廠
ThreadFactory factory = Executors.defaultThreadFactory(); 
// 使用線程池 
ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10, factory);
System.out.println("開始等待用戶付款10秒:" + format.format(new Date()));
service.schedule(new Runnable() { 
    @Override public void run() { 
        System.out.println("用戶未付款,交易取消:" + format.format(new Date())); 
    }
    // 等待10s 單位秒 
}, 10, TimeUnit.SECONDS);

優(yōu)點(diǎn):可以多線程執(zhí)行,一定程度上避免任務(wù)間互相影響,單個任務(wù)異常不影響其它任務(wù)。

在高并發(fā)的情況下,不建議使用定時任務(wù)去做,因?yàn)樘速M(fèi)服務(wù)器性能,不建議。

  1. RabbitMQ的TTL

  2. Quartz

  3. JCronTab

等等。

下面就重點(diǎn)介紹下,如何使用RabbitMQ的TTL

RabbitMQ使用TTL

TTL,Time to Live 的簡稱,即過期時間。

RabbitMQ 可以對消息和隊(duì)列兩個維度來設(shè)置TTL。

任何消息中間件的容量和堆積能力都是有限的,如果有一些消息總是不被消費(fèi)掉,那么需要有一種過期的機(jī)制來做兜底。

目前有兩種方法可以設(shè)置消息的TTL:

  1. 通過Queue屬性設(shè)置,隊(duì)列中所有消息都有相同的過期時間。
  2. 對消息自身進(jìn)行單獨(dú)設(shè)置,每條消息的TTL 可以不同。

如果兩種方法一起使用,則消息的TTL 以兩者之間較小數(shù)值為準(zhǔn)。通常來講,消息在隊(duì)列中的生存時間一旦超過設(shè)置的TTL 值時,就會變成“死信”(Dead Message),消費(fèi)者默認(rèn)就無法再收到該消息。當(dāng)然,“死信”也是可以被取出來消費(fèi)的,下一小節(jié)我們會講解。

原生API案例

try{
        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel())
        // 創(chuàng)建隊(duì)列(實(shí)際上使用的是AMQP default這個direct類型的交換器) 
        // 設(shè)置隊(duì)列屬性 
        Map<String, Object> arguments=new HashMap<>();
        // 設(shè)置隊(duì)列的TTL 
        arguments.put("x-message-ttl",30000);
        // 設(shè)置隊(duì)列的空閑存活時間(如該隊(duì)列根本沒有消費(fèi)者,一直沒有使用,隊(duì)列可以存活多久) 
        arguments.put("x-expires",10000);
        channel.queueDeclare(QUEUE_NAME,false,false,false,arguments);
        for(int i=0;i< 1000000;i++){
            String message="Hello World!"+i;
            channel.basicPublish("",QUEUE_NAME,new AMQP.BasicProperties().builder().expiration("30000").build(),message.getBytes());
            System.out.println(" [X] Sent '"+message+"'");
        }
}catch(TimeoutException e){
    e.printStackTrace();
}catch(IOException e){
    e.printStackTrace();
}

此外,還可以通過命令行方式設(shè)置全局TTL,執(zhí)行如下命令:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues

還可以通過restful api方式設(shè)置,這里不做過多介紹。

默認(rèn)規(guī)則:

  1. 如果不設(shè)置TTL,則表示此消息不會過期;
  2. 如果TTL設(shè)置為0,則表示除非此時可以直接將消息投遞到消費(fèi)者,否則該消息會被立即丟棄;

注意理解 message-ttl 、 x-expires 這兩個參數(shù)的區(qū)別,有不同的含義。但是這兩個參數(shù)屬性都遵循上面的默認(rèn)規(guī)則。一般TTL相關(guān)的參數(shù)單位都是毫秒(ms)。

springboot案例

在配置類里聲明隊(duì)列的時候設(shè)置TTL:

@Bean 
public Queue queueTTLWaiting() { 
    Map<String, Object> props = new HashMap<>(); 
    // 對于該隊(duì)列中的消息,設(shè)置都等待10s 
    props.put("x-message-ttl", 10000); 
    Queue queue = new Queue("q.pay.ttl-waiting", false, false, false, props); 
    return queue; 
}

在生產(chǎn)者發(fā)消息時,可以指定消息的TTL:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.UnsupportedEncodingException;

@RestController
public class PayController {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("/pay/queuettl")
    public String sendMessage() {
        rabbitTemplate.convertAndSend("ex.pay.ttl-waiting", "pay.ttl-waiting", "發(fā)送了TTL-WAITING-MESSAGE");
        return "queue-ttl-ok";
    }

    @RequestMapping("/pay/msgttl")
    public String sendTTLMessage() throws UnsupportedEncodingException {
        MessageProperties properties = new MessageProperties();
        properties.setExpiration("5000");  // 設(shè)置消息的過期時間
        Message message = new Message("發(fā)送了WAITING- MESSAGE".getBytes("utf-8"), properties);
        rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
        return "msg-ttl-ok";
    }
}

死信隊(duì)列

概述

死信隊(duì)列,英文縮寫是:DLX(Dead Letter Exchange),其實(shí)應(yīng)該稱為死信交換機(jī)更為合適。

當(dāng)消息成為死信后,可以被重新發(fā)送到另一個交換機(jī),這個交換機(jī)就是死信交換機(jī)。

【RabbitMQ】RabbitMQ高級:死信隊(duì)列和延遲隊(duì)列,# RabbitMq,rabbitmq,消息隊(duì)列,分布式

實(shí)際上,死信隊(duì)列就是普通的交換機(jī),只不過我們?nèi)藶榈慕o其賦予了特殊的含義:當(dāng)消息成為死信后,會重新發(fā)送到 DLX(死信交換機(jī))。

默認(rèn)情況下,當(dāng)消息成為死信(過期、隊(duì)列滿了、消息 TTL 過期)的時候,RabbitMQ 會將這些消息進(jìn)行清理,但是當(dāng)配置了死信隊(duì)列之后,RabbitMQ 會將死信發(fā)送到 DLX (死信交換機(jī))中,這樣就可以避免消息丟失。

死信隊(duì)列的應(yīng)用場景:

    • 為了保證訂單業(yè)務(wù)的消息數(shù)據(jù)不丟失,需要使用到 RabbitMQ 的死信隊(duì)列機(jī)制,當(dāng)消息消費(fèi)發(fā)生異常時,將消息投入到死信隊(duì)列中。
    • 用戶在商城下單成功并進(jìn)行支付活動,如果在指定的時候沒有支付,將會將訂單自動失效。

以下幾種情況導(dǎo)致消息變?yōu)樗佬牛?/p>

  1. 消息被拒絕(Basic.Reject/Basic.Nack),并且設(shè)置requeue參數(shù)為false;
  2. 消息過期;
  3. 隊(duì)列達(dá)到最大長度。

對于RabbitMQ 來說,DLX 是一個非常有用的特性。它可以處理異常情況下,消息不能夠被消費(fèi)者正確消費(fèi)(消費(fèi)者調(diào)用了Basic.Nack 或者Basic.Reject)而被置入死信隊(duì)列中的情況,后續(xù)分析程序可以通過消費(fèi)這個死信隊(duì)列中的內(nèi)容來分析當(dāng)時所遇到的異常情況,進(jìn)而可以改善和優(yōu)化系統(tǒng)。

原生API案例

try{
        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel();
        // 定義一個死信交換器(也是一個普通的交換器)

        channel.exchangeDeclare("exchange.dlx","direct",true);
        // 定義一個正常業(yè)務(wù)的交換器
        channel.exchangeDeclare("exchange.biz", "fanout", true);
        Map<String, Object> arguments = new HashMap<>();
        // 設(shè)置隊(duì)列TTL
        arguments.put("x-message-ttl", 10000);
        // 設(shè)置該隊(duì)列所關(guān)聯(lián)的死信交換器(當(dāng)隊(duì)列消息TTL到期后依然沒有消費(fèi),則加入死信隊(duì)列)
        arguments.put("x-dead-letter-exchange", "exchange.dlx");
        // 設(shè)置該隊(duì)列所關(guān)聯(lián)的死信交換器的routingKey,如果沒有特殊指定,使用原隊(duì)列的 routingKey
        arguments.put("x-dead-letter-routing-key", "routing.key.dlx.test");
        channel.queueDeclare("queue.biz", true, false, false, arguments);
        channel.queueBind("queue.biz", "exchange.biz", "");
        channel.queueDeclare("queue.dlx", true, false, false, null);
        // 死信隊(duì)列和死信交換器
        channel.queueBind("queue.dlx", "exchange.dlx", "routing.key.dlx.test");
        channel.basicPublish("exchange.biz", "", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx.test".getBytes());
} catch (Exception e) {
    e.printStackTrace();
}

springboot案例

下面通過設(shè)置TTL模擬在SpringBoot中如何使用死信隊(duì)列。

修改RabbitConfig配置類,設(shè)置普通隊(duì)列的屬性(聲明其死信隊(duì)列和交換器),聲明死信交換器,代碼如下:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    @Bean
    public Queue queue() {
        Map<String, Object> props = new HashMap<>();
        // 消息的生存時間 10s
        props.put("x-message-ttl", 10000);
        // 設(shè)置該隊(duì)列所關(guān)聯(lián)的死信交換器(當(dāng)隊(duì)列消息TTL到期后依然沒有消費(fèi),則加 入死信隊(duì)列)
        props.put("x-dead-letter-exchange", "ex.go.dlx");
        // 設(shè)置該隊(duì)列所關(guān)聯(lián)的死信交換器的routingKey,如果沒有特殊指定,使用原 隊(duì)列的routingKey
        props.put("x-dead-letter-routing-key", "go.dlx");
        Queue queue = new Queue("q.go", true, false, false, props);
        return queue;
    }

    @Bean
    public Queue queueDlx() {
        Queue queue = new Queue("q.go.dlx", true, false, false);
        return queue;
    }

    @Bean
    public Exchange exchange() {
        DirectExchange exchange = new DirectExchange("ex.go", true, false, null);
        return exchange;
    }

    /**
     * 死信交換器
     *
     * @return
     */
    @Bean
    public Exchange exchangeDlx() {
        DirectExchange exchange = new DirectExchange("ex.go.dlx", true, false, null);
        return exchange;
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("go").noargs();
    }

    /**
     * 死信交換器綁定死信隊(duì)列
     * @return
     */
    @Bean
    public Binding bindingDlx() {
        return BindingBuilder.bind(queueDlx()).to(exchangeDlx()).with("go.dlx").noargs();
    }
}

在生產(chǎn)者端代碼不用變。

如果想演示超過最大最列長度,可以設(shè)置普通對列長度:

Map<String, Object> props = MapUtil.newHashMap();
// 設(shè)置隊(duì)列的最大長度
props.put("x-max-length", 10);

延遲隊(duì)列

概述

延遲消息是指的消息發(fā)送出去后并不想立即就被消費(fèi),而是需要等(指定的)一段時間后才觸發(fā)消費(fèi)。

例如下面的業(yè)務(wù)場景:在支付寶上面買電影票,鎖定了一個座位后系統(tǒng)默認(rèn)會幫你保留15分鐘時間,如果15分鐘后還沒付款那么不好意思系統(tǒng)會自動把座位釋放掉。怎么實(shí)現(xiàn)類似的功能呢?

  1. 可以用定時任務(wù)每分鐘掃一次,發(fā)現(xiàn)有占座超過15分鐘還沒付款的就釋放掉。但是這樣做很低效,很多時候做的都是些無用功;

  2. 可以用分布式鎖、分布式緩存的被動過期時間,15分鐘過期后鎖也釋放了,緩存key也不存在了;

  3. 還可以用延遲隊(duì)列,鎖座成功后會發(fā)送1條延遲消息,這條消息15分鐘后才會被消費(fèi),消費(fèi)的過程就是檢查這個座位是否已經(jīng)是“已付款”狀態(tài);

延遲隊(duì)列的應(yīng)用場景:

    • ① 訂單在 10 分鐘之內(nèi)沒有付款就自動取消。
    • ② 新創(chuàng)建的店鋪,如果在 10 天之內(nèi)都沒有上傳過商品,則自動發(fā)送消息提醒。
    • ③ 用戶注冊成功后,如果三天沒有登錄,則發(fā)送短信進(jìn)行提醒。
    • ④ 用戶發(fā)起退款,如果三天之內(nèi)沒有得到處理,則通知相關(guān)運(yùn)營人員。
    • ⑤ 預(yù)定會議后,需要在預(yù)定的時間點(diǎn)前 10 分鐘通知各個與會人員參加會議。

遺憾的是,在AMQP協(xié)議和RabbitMQ中都沒有相關(guān)的規(guī)定和實(shí)現(xiàn)。

不過可以使用rabbitmq_delayed_message_exchange插件實(shí)現(xiàn)。

還可以我們可以借助上一小節(jié)介紹的“死信隊(duì)列”來變相的實(shí)現(xiàn)。

插件和TTL方式有個很大的不同就是TTL存放消息在死信隊(duì)列(delayqueue)里,二基于插件存放消息在延時交換機(jī)里(x-delayed-message exchange)。

插件實(shí)現(xiàn)延遲隊(duì)列

安裝插件

官網(wǎng),下載 rabbitmq_delayed_message_exchange 插件,并解壓到 RabbitMQ 的插件目錄。

進(jìn)入 RabbitMQ 的插件目錄:

cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins

啟用插件:

rabbitmq-plugins list 
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重啟rabbitmq-server:

systemctl restart rabbitmq-server

添加延遲隊(duì)列插件之后:

【RabbitMQ】RabbitMQ高級:死信隊(duì)列和延遲隊(duì)列,# RabbitMq,rabbitmq,消息隊(duì)列,分布式

代碼

實(shí)現(xiàn)流程如下:

【RabbitMQ】RabbitMQ高級:死信隊(duì)列和延遲隊(duì)列,# RabbitMq,rabbitmq,消息隊(duì)列,分布式

配置類RabbitmqConfig.java:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 配置類,用來聲明交換機(jī)和隊(duì)列,并配置之間的關(guān)系
 */
@Configuration
public class RabbitmqConfig {

    /**
     * 普通交換機(jī)
     */
    public static final String EXCHANGE = "delayed.exchange";

    /**
     * routingkey
     */
    public static final String ROUTING_KEY = "delayed.routingkey";

    /**
     * 普通隊(duì)列
     */
    public static final String QUEUE = "delayed.queue";

    @Bean
    public CustomExchange exchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(EXCHANGE, "x-delayed-message", true, false, args);
    }

    /**
     * 聲明隊(duì)列
     */
    @Bean
    public Queue queue() {
        return QueueBuilder.durable(QUEUE).build();
    }

    /**
     * 綁定關(guān)系
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY).noargs();
    }
}

生產(chǎn)者ProducerController.java:

import com.github.config.RabbitmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;

/**
 * 生產(chǎn)者
 */
@Slf4j
@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send/{msg}/{ttl}")
    public String msg(@PathVariable("msg") String msg, @PathVariable("ttl") Integer ttl) {
        log.info("當(dāng)前時間:{},發(fā)送一條時長{}毫秒 TTL 信息給隊(duì)列:{}", LocalDateTime.now(), ttl, msg);
        MessagePostProcessor messagePostProcessor = (message) -> {
            // 注意,這里不再是 setExpiration ,而是 setDelay
            message.getMessageProperties().setDelay(ttl);
            return message;
        };
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE, RabbitmqConfig.ROUTING_KEY, msg, messagePostProcessor);
        return "發(fā)送消息成功";
    }
}

消費(fèi)者RabbitmqListener.java:

import com.github.config.RabbitmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;

/**
 * 消費(fèi)者
 */
@Slf4j
@Component
public class RabbitmqListener {

    @RabbitListener(queues = RabbitmqConfig.QUEUE)
    public void receive(Message message) {
        log.info("當(dāng)前時間:{},收到死信隊(duì)列信息:{}", LocalDateTime.now(), new String(message.getBody(), StandardCharsets.UTF_8));
    }

}

演示:

curl 'http://127.0.0.1:8080/send/消息1/20000' -X GET
curl 'http://127.0.0.1:8080/send/消息2/2000' -X GET

IDEA 控制臺結(jié)果顯示:

【RabbitMQ】RabbitMQ高級:死信隊(duì)列和延遲隊(duì)列,# RabbitMq,rabbitmq,消息隊(duì)列,分布式

TTL實(shí)現(xiàn)延遲隊(duì)列

實(shí)現(xiàn)

實(shí)現(xiàn)過程如下:

【RabbitMQ】RabbitMQ高級:死信隊(duì)列和延遲隊(duì)列,# RabbitMq,rabbitmq,消息隊(duì)列,分布式

配置類RabbitmqConfig.java:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 配置類,用來聲明交換機(jī)和隊(duì)列,并配置之間的關(guān)系
 */
@Configuration
public class RabbitmqConfig {

    /**
     * 普通交換機(jī) X
     */
    public static final String EXCHANGE_X = "X";

    /**
     * 普通隊(duì)列 QA
     */
    public static final String QUEUE_A = "QA";

    /**
     * 普通 routing key
     */
    public static final String ROUTING_KEY_XA = "XA";

    /**
     * 普通隊(duì)列 QB
     */
    public static final String QUEUE_B = "QB";

    /**
     * 普通 routing key
     */
    public static final String ROUTING_KEY_XB = "XB";

    /**
     * 死信交換機(jī) Y
     */
    public static final String DEAD_EXCHANGE_Y = "Y";

    /**
     * 死信隊(duì)列 QD
     */
    public static final String DEAD_QUEUE_D = "QD";

    /**
     * 死信 routing key
     */
    public static final String DEAD_ROUTING_KEY_YD = "YD";

    /**
     * 聲明交換機(jī)
     */
    @Bean
    public DirectExchange xExchange() {
        return new DirectExchange(EXCHANGE_X);
    }

    /**
     * 聲明死信交換機(jī)
     */
    @Bean
    public DirectExchange yExchange() {
        return new DirectExchange(DEAD_EXCHANGE_Y);
    }

    /**
     * 聲明隊(duì)列
     */
    @Bean
    public Queue aQueue() {
        return QueueBuilder.durable(QUEUE_A)
                // 聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
                .deadLetterExchange(DEAD_EXCHANGE_Y)
                // 聲明當(dāng)前隊(duì)列綁定的死信隊(duì)列
                .deadLetterRoutingKey(DEAD_ROUTING_KEY_YD)
                // 設(shè)置 TTL 時間
                .ttl(10 * 1000)
                .build();
    }

    /**
     * 聲明隊(duì)列
     */
    @Bean
    public Queue bQueue() {
        return QueueBuilder.durable(QUEUE_B)
                // 聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
                .deadLetterExchange(DEAD_EXCHANGE_Y)
                // 聲明當(dāng)前隊(duì)列綁定的死信隊(duì)列
                .deadLetterRoutingKey(DEAD_ROUTING_KEY_YD)
                // 設(shè)置 TTL 時間
                .ttl(40 * 1000)
                .build();
    }

    /**
     * 聲明死信隊(duì)列
     */
    @Bean
    public Queue dQueue() {
        return QueueBuilder.durable(DEAD_QUEUE_D).build();
    }

    /**
     * 綁定關(guān)系
     */
    @Bean
    public Binding xaBinding() {
        return BindingBuilder.bind(aQueue()).to(xExchange()).with(ROUTING_KEY_XA);
    }

    /**
     * 綁定關(guān)系
     */
    @Bean
    public Binding xbBinding() {
        return BindingBuilder.bind(bQueue()).to(xExchange()).with(ROUTING_KEY_XB);
    }

    /**
     * 綁定關(guān)系
     */
    @Bean
    public Binding ydBinding() {
        return BindingBuilder.bind(dQueue()).to(yExchange()).with(DEAD_ROUTING_KEY_YD);
    }

}

生產(chǎn)者ProducerController.java:

import com.github.config.RabbitmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;

/**
 * 生產(chǎn)者
 */
@Slf4j
@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send/{msg}")
    public String msg(@PathVariable("msg") String msg) {
        log.info("當(dāng)前時間:{},發(fā)送一條信息給兩個 TTL 隊(duì)列:{}", LocalDateTime.now(), msg);
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_X, RabbitmqConfig.ROUTING_KEY_XA, "消息來自 ttl 為 10S 的隊(duì)列: " + msg);
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_X, RabbitmqConfig.ROUTING_KEY_XB, "消息來自 ttl 為 40s 的隊(duì)列: " + msg);
        return "發(fā)送消息成功";
    }

}

消費(fèi)者RabbitmqListener.java:

import com.github.config.RabbitmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;

/**
 * 消費(fèi)者
 */
@Slf4j
@Component
public class RabbitmqListener {

    @RabbitListener(queues = RabbitmqConfig.DEAD_QUEUE_D)
    public void receive(Message message) {
        log.info("當(dāng)前時間:{},收到死信隊(duì)列信息:{}", LocalDateTime.now(), new String(message.getBody(), StandardCharsets.UTF_8));
    }

}

延遲隊(duì)列優(yōu)化

上面使用TTL實(shí)現(xiàn)了延遲隊(duì)列,但是此時有些問題,如果現(xiàn)在我需要 5 min、10 min……,那么我豈不是每增加一個時間需求,就需要增加一個隊(duì)列,如果是預(yù)定會議提前通知的場景,難道要增加無數(shù)個隊(duì)列來滿足要求?

解決:在消費(fèi)者那邊設(shè)置消息的 TTL 時間。

但是注意: RabbitMQ只會檢查隊(duì)列頭部的消息是否過期,如果過期就放到死信隊(duì)列,假如第一個過期時間很長,10s,第二個消息3s,則系統(tǒng)先看第一個消息,等到第一個消息過期,放到DLX。此時才會檢查第二個消息,但實(shí)際上此時第二個消息早已經(jīng)過期了,但是并沒有先于第一個消息放到DLX。 使用插件不會出現(xiàn)這個問題,所以推薦使用插件實(shí)現(xiàn)延遲隊(duì)列。文章來源地址http://www.zghlxwxcb.cn/news/detail-796528.html

到了這里,關(guān)于【RabbitMQ】RabbitMQ高級:死信隊(duì)列和延遲隊(duì)列的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包