国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMQ - 死信隊(duì)列,延時(shí)隊(duì)列

這篇具有很好參考價(jià)值的文章主要介紹了RabbitMQ - 死信隊(duì)列,延時(shí)隊(duì)列。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

Time-To-Live and Expiration — RabbitMQ

一、死信隊(duì)列

Dead Letter Exchanges — RabbitMQ

死信隊(duì)列:

DLX 全稱(Dead-Letter-Exchange),稱之為死信交換器,當(dāng)消息變成一個(gè)死信之后,如果這個(gè)消息所在的隊(duì)列存在x-dead-letter-exchange參數(shù),那么它會(huì)被發(fā)送到x-dead-letter-exchange對(duì)應(yīng)值的交換器上,這個(gè)交換器就稱之為死信交換器,與這個(gè)死信交換器綁定的隊(duì)列就是死信隊(duì)列

RabbitMQ - 死信隊(duì)列,延時(shí)隊(duì)列

死信消息:

  • 消息被拒絕(Basic.Reject或Basic.Nack)并且設(shè)置 requeue 參數(shù)的值為 false
  • 消息過(guò)期(消息TTL過(guò)期。TTL:Time To Live的簡(jiǎn)稱,即過(guò)期時(shí)間)
  • 隊(duì)列達(dá)到最大的長(zhǎng)度

過(guò)期消息:

在 rabbitmq 中存在2種方法可設(shè)置消息的過(guò)期時(shí)間:

  • 第一種通過(guò)對(duì)隊(duì)列進(jìn)行設(shè)置,這種設(shè)置后,該隊(duì)列中所有的消息都存在相同的過(guò)期時(shí)間
  • 第二種通過(guò)對(duì)消息本身進(jìn)行設(shè)置,那么每條消息的過(guò)期時(shí)間都不一樣

如果同時(shí)使用這2種方法,那么以過(guò)期時(shí)間小的那個(gè)數(shù)值為準(zhǔn)。當(dāng)消息達(dá)到過(guò)期時(shí)間還沒(méi)有被消費(fèi),那么那個(gè)消息就成為了一個(gè) 死信 消息

隊(duì)列設(shè)置:在隊(duì)列申明的時(shí)候使用** x-message-ttl **參數(shù),單位為 毫秒;

  • 隊(duì)列中這個(gè)屬性的設(shè)置要在第一次聲明隊(duì)列的時(shí)候設(shè)置才有效,如果隊(duì)列一開始已存在且沒(méi)有這個(gè)屬性,則要?jiǎng)h掉隊(duì)列再重新聲明才可以。
  • 隊(duì)列的 TTL 只能被設(shè)置為某個(gè)固定的值,一旦設(shè)置后則不能更改,否則會(huì)拋出異常

單個(gè)消息設(shè)置:是設(shè)置消息屬性的 expiration 參數(shù)的值,單位為 毫秒。

說(shuō)明:

對(duì)于第一種設(shè)置隊(duì)列屬性的方法,一旦消息過(guò)期,就會(huì)從隊(duì)列中抹去;而在第二種方法中,即使消息過(guò)期,也不會(huì)馬上從隊(duì)列中抹去,因?yàn)槊織l消息是否過(guò)期是在即將投遞到消費(fèi)者之前判定的

?1. 生產(chǎn)者:
  聲明隊(duì)列的時(shí)候用屬性指定其死信隊(duì)列交換機(jī)名稱。

測(cè)試:

package rabbitmq;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

? ? public static ConnectionFactory getConnectionFactory() {
? ? ? ? // 創(chuàng)建連接工程,下面給出的是默認(rèn)的case
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("192.168.99.100");
? ? ? ? factory.setPort(5672);
? ? ? ? factory.setUsername("guest");
? ? ? ? factory.setPassword("guest");
? ? ? ? factory.setVirtualHost("/");
? ? ? ? return factory;
? ? }

? ? public static void main(String[] args) throws IOException, TimeoutException ?{
? ? ? ? ConnectionFactory connectionFactory = getConnectionFactory();
? ? ? ? Connection newConnection = null;
? ? ? ? Channel createChannel = null;
? ? ? ? try {
? ? ? ? ? ? newConnection = connectionFactory.newConnection();
? ? ? ? ? ? createChannel = newConnection.createChannel();
? ? ? ? ? ??
? ? ? ? ? ? // 聲明一個(gè)正常的direct類型的交換機(jī)
? ? ? ? ? ? createChannel.exchangeDeclare("order.exchange", BuiltinExchangeType.DIRECT);
? ? ? ? ? ? // 聲明死信交換機(jī)為===order.dead.exchange
? ? ? ? ? ? String dlxName = "order.dead.exchange";
? ? ? ? ? ? createChannel.exchangeDeclare(dlxName, BuiltinExchangeType.DIRECT);
? ? ? ? ? ? // 聲明隊(duì)列并指定死信交換機(jī)為上面死信交換機(jī)
? ? ? ? ? ? Map<String, Object> arg = new HashMap<String, Object>();
? ? ? ? ? ? arg.put("x-dead-letter-exchange", dlxName);
? ? ? ? ? ? createChannel.queueDeclare("myQueue", true, false, false, arg);
? ? ? ? ? ??
? ? ? ? ? ? String message = "測(cè)試消息";
? ? ? ? ? ? createChannel.basicPublish("order.exchange", "routing_key_myQueue", null, message.getBytes());
? ? ? ? ? ? System.out.println("消息發(fā)送成功");
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? } finally {
? ? ? ? ? ? if (createChannel != null) {
? ? ? ? ? ? ? ? createChannel.close();
? ? ? ? ? ? }
? ? ? ? ? ? if (newConnection != null) {
? ? ? ? ? ? ? ? newConnection.close();
? ? ? ? ? ? }
? ? ? ? }
? ? ? ??
? ? }
}

結(jié)果:

(1)生成兩個(gè)Exchange

?(2)隊(duì)列myQueue的死信隊(duì)列有屬性

2. 消費(fèi)者:?
  一個(gè)消費(fèi)者監(jiān)聽(tīng)正常隊(duì)列,一個(gè)消費(fèi)者監(jiān)聽(tīng)死信隊(duì)列。(只是綁定的交換機(jī)不同)

消費(fèi)者一:監(jiān)聽(tīng)正常隊(duì)列

package rabbitmq;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer {

? ? public static ConnectionFactory getConnectionFactory() {
? ? ? ? // 創(chuàng)建連接工程,下面給出的是默認(rèn)的case
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("192.168.99.100");
? ? ? ? factory.setPort(5672);
? ? ? ? factory.setUsername("guest");
? ? ? ? factory.setPassword("guest");
? ? ? ? factory.setVirtualHost("/");
? ? ? ? return factory;
? ? }

? ? public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ? ConnectionFactory connectionFactory = getConnectionFactory();
? ? ? ? Connection newConnection = null;
? ? ? ? Channel createChannel = null;
? ? ? ? try {
? ? ? ? ? ? newConnection = connectionFactory.newConnection();
? ? ? ? ? ? createChannel = newConnection.createChannel();

? ? ? ? ? ? // 隊(duì)列綁定交換機(jī)-channel.queueBind(隊(duì)列名, 交換機(jī)名, 路由key[廣播消息設(shè)置為空串])
? ? ? ? ? ? createChannel.queueBind("myQueue", "order.exchange", "routing_key_myQueue");

? ? ? ? ? ? createChannel.basicConsume("myQueue", false, "", new DefaultConsumer(createChannel) {
? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
? ? ? ? ? ? ? ? ? ? ? ? byte[] body) throws IOException {

? ? ? ? ? ? ? ? ? ? System.out.println("consumerTag: " + consumerTag);
? ? ? ? ? ? ? ? ? ? System.out.println("envelope: " + envelope);
? ? ? ? ? ? ? ? ? ? System.out.println("properties: " + properties);
? ? ? ? ? ? ? ? ? ? String string = new String(body, "UTF-8");
? ? ? ? ? ? ? ? ? ? System.out.println("接收到消息: -》 " + string);

? ? ? ? ? ? ? ? ? ? long deliveryTag = envelope.getDeliveryTag();
? ? ? ? ? ? ? ? ? ? Channel channel = this.getChannel();
? ? ? ? ? ? ? ? ? ? System.out.println("拒絕消息, 使之進(jìn)入死信隊(duì)列");
? ? ? ? ? ? ? ? ? ? System.out.println("時(shí)間: " + new Date());
? ? ? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? ? ? TimeUnit.SECONDS.sleep(3);
? ? ? ? ? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ??
? ? ? ? ? ? ? ? ? ? // basicReject第二個(gè)參數(shù)為false進(jìn)入死信隊(duì)列或丟棄
? ? ? ? ? ? ? ? ? ? channel.basicReject(deliveryTag, false);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? });

? ? ? ? } catch (Exception e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? } finally {
? ? ? ? }

? ? }
}

消費(fèi)者二:監(jiān)聽(tīng)死信隊(duì)列

package rabbitmq;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer2 {

? ? public static ConnectionFactory getConnectionFactory() {
? ? ? ? // 創(chuàng)建連接工程,下面給出的是默認(rèn)的case
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("192.168.99.100");
? ? ? ? factory.setPort(5672);
? ? ? ? factory.setUsername("guest");
? ? ? ? factory.setPassword("guest");
? ? ? ? factory.setVirtualHost("/");
? ? ? ? return factory;
? ? }

? ? public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ? ConnectionFactory connectionFactory = getConnectionFactory();
? ? ? ? Connection newConnection = null;
? ? ? ? Channel createChannel = null;
? ? ? ? try {
? ? ? ? ? ? newConnection = connectionFactory.newConnection();
? ? ? ? ? ? createChannel = newConnection.createChannel();

? ? ? ? ? ? // 隊(duì)列綁定交換機(jī)-channel.queueBind(隊(duì)列名, 交換機(jī)名, 路由key[廣播消息設(shè)置為空串])
? ? ? ? ? ? createChannel.queueBind("myQueue", "order.dead.exchange", "routing_key_myQueue");

? ? ? ? ? ? createChannel.basicConsume("myQueue", false, "", new DefaultConsumer(createChannel) {
? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
? ? ? ? ? ? ? ? ? ? ? ? byte[] body) throws IOException {

? ? ? ? ? ? ? ? ? ? System.out.println("時(shí)間: " + new Date());
? ? ? ? ? ? ? ? ? ??
? ? ? ? ? ? ? ? ? ? System.out.println("consumerTag: " + consumerTag);
? ? ? ? ? ? ? ? ? ? System.out.println("envelope: " + envelope);
? ? ? ? ? ? ? ? ? ? System.out.println("properties: " + properties);
? ? ? ? ? ? ? ? ? ? String string = new String(body, "UTF-8");
? ? ? ? ? ? ? ? ? ? System.out.println("接收到消息: -》 " + string);

? ? ? ? ? ? ? ? ? ? long deliveryTag = envelope.getDeliveryTag();
? ? ? ? ? ? ? ? ? ? Channel channel = this.getChannel();
? ? ? ? ? ? ? ? ? ? channel.basicAck(deliveryTag, true);
? ? ? ? ? ? ? ? ? ? System.out.println("死信隊(duì)列中處理完消息息");
? ? ? ? ? ? ? ? }
? ? ? ? ? ? });

? ? ? ? } catch (Exception e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? } finally {
? ? ? ? }

? ? }
}

結(jié)果: 消費(fèi)者一先正常監(jiān)聽(tīng)到,basicReject為false拒絕后進(jìn)入死信隊(duì)列;消費(fèi)者二監(jiān)聽(tīng)的死信隊(duì)列收到消息。

消費(fèi)者一打出的日志如下:

consumerTag: amq.ctag-0noHs24F0FsGe-dfwwqWNw
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=order.exchange, routingKey=routing_key_myQueue)
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
接收到消息: -》 測(cè)試消息
拒絕消息, 使之進(jìn)入死信隊(duì)列
時(shí)間: Sat Nov 07 12:18:44 CST 2020

消費(fèi)者二打出的日志如下:

時(shí)間: Sat Nov 07 12:18:47 CST 2020
consumerTag: amq.ctag-ajYMpMFkXHDiYWkD3XFJ7Q
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=order.dead.exchange, routingKey=routing_key_myQueue)
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{reason=rejected, count=1, exchange=order.exchange, time=Sat Nov 07 01:52:19 CST 2020, routing-keys=[routing_key_myQueue], queue=myQueue}]}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
接收到消息: -》 測(cè)試消息
死信隊(duì)列中處理完消息息

注意:

  進(jìn)入死信隊(duì)列之后,headers 加了一些死信相關(guān)的信息,包括原隊(duì)列以及進(jìn)入死信的原因。

補(bǔ)充:在隊(duì)列進(jìn)入死信隊(duì)列之前也可以修改其routingKey,而且只有在指定x-dead-letter-exchange的前提下才能修改下面屬性,否則會(huì)報(bào)錯(cuò)

(1)修改生產(chǎn)者聲明隊(duì)列的方式,如下:

// 聲明一個(gè)正常的direct類型的交換機(jī)
? ? ? ? ? ? createChannel.exchangeDeclare("order.exchange", BuiltinExchangeType.DIRECT);
? ? ? ? ? ? // 聲明死信交換機(jī)為===order.dead.exchange
? ? ? ? ? ? String dlxName = "order.dead.exchange";
? ? ? ? ? ? createChannel.exchangeDeclare(dlxName, BuiltinExchangeType.DIRECT);
? ? ? ? ? ? // 聲明隊(duì)列并指定死信交換機(jī)為上面死信交換機(jī)
? ? ? ? ? ? Map<String, Object> arg = new HashMap<String, Object>();
? ? ? ? ? ? arg.put("x-dead-letter-exchange", dlxName);
? ? ? ? ? ? // 修改進(jìn)入死信隊(duì)列的routingkey,如果不修改會(huì)使用默認(rèn)的routingKey
? ? ? ? ? ? arg.put("x-dead-letter-routing-key", "routing_key_myQueue_dead");
? ? ? ? ? ? createChannel.queueDeclare("myQueue", true, false, false, arg);

(2)修改監(jiān)聽(tīng)死信隊(duì)列的消費(fèi)者二:

// 隊(duì)列綁定交換機(jī)-channel.queueBind(隊(duì)列名, 交換機(jī)名, 路由key[廣播消息設(shè)置為空串])
? ? ? ? ? ? createChannel.queueBind("myQueue", "order.dead.exchange", "routing_key_myQueue_dead");

結(jié)果,收到消費(fèi)者二收到的信息如下:

時(shí)間: Sat Nov 07 12:27:08 CST 2020
consumerTag: amq.ctag-THqpEdYH_-iNeCIccgpuaw
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=order.dead.exchange, routingKey=routing_key_myQueue_dead)
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{reason=rejected, count=1, exchange=order.exchange, time=Sat Nov 07 02:00:41 CST 2020, routing-keys=[routing_key_myQueue], queue=myQueue}]}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
接收到消息: -》 測(cè)試消息
死信隊(duì)列中處理完消息

二、延時(shí)隊(duì)列

延遲隊(duì)列,即消息進(jìn)入隊(duì)列后不會(huì)立即被消費(fèi),只有到達(dá)指定時(shí)間后,才會(huì)被消費(fèi)

RabbitMQ本身沒(méi)提供延時(shí)隊(duì)列,我們可以利用消息的生存時(shí)間和死信隊(duì)列實(shí)現(xiàn)延時(shí)

典型的應(yīng)用場(chǎng)景就是訂單30分鐘內(nèi)未支付就關(guān)閉訂單,還有一種場(chǎng)景,賬單24小時(shí)未確認(rèn),就發(fā)送提醒消息

RabbitMQ - 死信隊(duì)列,延時(shí)隊(duì)列

延時(shí)隊(duì)列插件安裝

2.1.1、yml配置

spring:
    rabbitmq:
        host: 192.168.99.12
        port: 5672
        username: guest
        password: guest
        # 發(fā)送確認(rèn)
        publisher-confirms: true
        # 路由失敗回調(diào)
        publisher-returns: true
        template:
            # 必須設(shè)置成true 消息路由失敗通知監(jiān)聽(tīng)者,false 將消息丟棄
            mandatory: true
        #消費(fèi)端
        listener:
            simple:
                # 每次從RabbitMQ獲取的消息數(shù)量
                prefetch: 1
                default-requeue-rejected: false
                # 每個(gè)隊(duì)列啟動(dòng)的消費(fèi)者數(shù)量
                concurrency: 1
                # 每個(gè)隊(duì)列最大的消費(fèi)者數(shù)量
                max-concurrency: 1
                # 簽收模式為手動(dòng)簽收-那么需要在代碼中手動(dòng)ACK
                acknowledge-mode: manual
#郵件隊(duì)列
email:
    queue:
        name: demo.email

#郵件交換器名稱
exchange:
    name: demoTopicExchange

#死信隊(duì)列
dead:
    letter:
        queue:
            name: demo.dead.letter
        exchange:
            name: demoDeadLetterTopicExchange

#延時(shí)隊(duì)列
delay:
    queue:
        name: demo.delay
    exchange:
        name: demoDelayTopicExchange

2.1.2、延時(shí)隊(duì)列配置

/**
 * rabbitmq 配置
 *
 * @author DUCHONG
 * @since 2020-08-23 14:05
 **/
@Configuration
@Slf4j
public class RabbitmqConfig {


    @Value("${email.queue.name}")
    private String emailQueue;
    @Value("${exchange.name}")
    private String topicExchange;
    @Value("${dead.letter.queue.name}")
    private String deadLetterQueue;
    @Value("${dead.letter.exchange.name}")
    private String deadLetterExchange;
    @Value("${delay.queue.name}")
    private String delayQueue;
    @Value("${delay.exchange.name}")
    private String delayExchange;

    @Bean
    public Queue emailQueue() {

        Map<String, Object> arguments = new HashMap<>(2);
        // 綁定死信交換機(jī)
        arguments.put("x-dead-letter-exchange", deadLetterExchange);
        // 綁定死信的路由key
        arguments.put("x-dead-letter-routing-key", deadLetterQueue+".#");

        return new Queue(emailQueue,true,false,false,arguments);
    }


    @Bean
    TopicExchange emailExchange() {
        return new TopicExchange(topicExchange);
    }


    @Bean
    Binding bindingEmailQueue() {
        return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#");
    }


    //私信隊(duì)列和交換器
    @Bean
    public Queue deadLetterQueue() {
        return new Queue(deadLetterQueue);
    }

    @Bean
    TopicExchange deadLetterExchange() {
        return new TopicExchange(deadLetterExchange);
    }

    @Bean
    Binding bindingDeadLetterQueue() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(deadLetterQueue+".#");
    }
    //延時(shí)隊(duì)列
    @Bean
    public Queue delayQueue() {
        return new Queue(delayQueue);
    }

    @Bean
    CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "topic");
        //參數(shù)二為類型:必須是x-delayed-message
        return new CustomExchange(delayExchange, "x-delayed-message", true, false, args);

    }

    @Bean
    Binding bindingDelayQueue() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(delayQueue+".#").noargs();
    }
}

2.2、消息發(fā)送方

30分鐘時(shí)間太久了,這里延時(shí)2分鐘來(lái)看效果

@Configuration
@EnableScheduling
@Slf4j
public class ScheduleController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Value("${exchange.name}")
    private String topicExchange;

    @Value("${delay.exchange.name}")
    private String delayTopicExchange;

    @Scheduled(cron = "0 0/1 * * * ?")
    public void sendEmailMessage() {

        String msg = RandomStringUtils.randomAlphanumeric(8);
        JSONObject email=new JSONObject();
        email.put("content",msg);
        email.put("to","duchong@qq.com");
        CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(topicExchange,"demo.email.x",email.toJSONString(),correlationData);
        log.info("---發(fā)送 email 消息---{}---messageId---{}",email,correlationData.getId());
    }


    @Scheduled(cron = "0 0/1 * * * ?")
    public void sendDelayOrderMessage() throws Exception{

        //訂單號(hào) id實(shí)際是保存訂單后返回的,這里用uuid代替
        String orderId = UUID.randomUUID().toString();
        // 模擬訂單信息
        JSONObject order=new JSONObject();
        order.put("orderId",orderId);
        order.put("goodsName","vip充值");
        order.put("orderAmount","99.00");
        CorrelationData correlationData=new CorrelationData(orderId);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(orderId);
        //30分鐘時(shí)間太長(zhǎng),這里延時(shí)120s消費(fèi)
        messageProperties.setHeader("x-delay", 120000);
        Message message = new Message(order.toJSONString().getBytes(CharEncoding.UTF_8), messageProperties);

        rabbitTemplate.convertAndSend(delayTopicExchange,"demo.delay.x",message,correlationData);

        log.info("---發(fā)送 order 消息---{}---orderId---{}",order,correlationData.getId());
        //睡一會(huì),為了看延遲效果
        TimeUnit.MINUTES.sleep(10);
    }
}

2.3、消息消費(fèi)方

@Component
@Slf4j
public class MessageHandler {


    /**
     * 郵件發(fā)送
     * @param message
     * @param channel
     * @param headers
     * @throws IOException
     */
    @RabbitListener(queues ="demo.email")
    @RabbitHandler
    public void handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {

        try {

            String msg=new String(message.getBody(), CharEncoding.UTF_8);
            JSONObject jsonObject = JSON.parseObject(msg);
            jsonObject.put("messageId",headers.get("spring_returned_message_correlation"));
            log.info("---接受到消息---{}",jsonObject);
			//主動(dòng)異常
			int m=1/0;
            //手動(dòng)簽收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
        catch (Exception e) {
            log.info("handleEmailMessage捕獲到異常,拒絕重新入隊(duì)---消息ID---{}", headers.get("spring_returned_message_correlation"));
            //異常,ture 重新入隊(duì),或者false,進(jìn)入死信隊(duì)列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

        }
    }

    /**
     * 死信消費(fèi)者,自動(dòng)簽收開啟狀態(tài)下,超過(guò)重試次數(shù),或者手動(dòng)簽收,reject或者Nack
     * @param message
     */
    @RabbitListener(queues = "demo.dead.letter")
    public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException {

        //可以考慮數(shù)據(jù)庫(kù)記錄,每次進(jìn)來(lái)查數(shù)量,達(dá)到一定的數(shù)量,進(jìn)行預(yù)警,人工介入處理
        log.info("接收到死信消息:---{}---消息ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation"));
		//回復(fù)ack
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

    /**
     * 延時(shí)隊(duì)列消費(fèi)
     * @param message
     * @param channel
     * @param headers
     * @throws IOException
     */
    @RabbitListener(queues ="demo.delay")
    @RabbitHandler
    public void handleOrderDelayMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {

        try {

            String msg=new String(message.getBody(), CharEncoding.UTF_8);
            JSONObject jsonObject = JSON.parseObject(msg);
            log.info("---接受到訂單消息---orderId---{}",message.getMessageProperties().getMessageId());
            log.info("---訂單信息---order---{}",jsonObject);
            //業(yè)務(wù)邏輯,根據(jù)訂單id獲取訂單信息,如果還未支付,設(shè)置關(guān)閉狀態(tài),如果已支付,不做任何處理
            //手動(dòng)簽收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
        catch (Exception e) {
            log.info("handleOrderDelayMessage捕獲到異常,重新入隊(duì)---orderId---{}", headers.get("spring_returned_message_correlation"));
            //異常,ture 重新入隊(duì),或者false,進(jìn)入死信隊(duì)列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);

        }
    }

}

2.4、結(jié)果

RabbitMQ - 死信隊(duì)列,延時(shí)隊(duì)列

運(yùn)行結(jié)果顯示,同一個(gè)訂單號(hào)的消息,發(fā)送過(guò)后2分鐘,消費(fèi)者才接受到,符合預(yù)期

https://www.cnblogs.com/geekdc/p/13550620.html

消息隊(duì)列RabbitMQ(五):死信隊(duì)列與延遲隊(duì)列

rabbitmq的延遲隊(duì)列和死信隊(duì)列_死信隊(duì)列和延時(shí)隊(duì)列的區(qū)別_zhuwenaptx的博客-CSDN博客

RabbitMQ的死信隊(duì)列和延時(shí)隊(duì)列 - 簡(jiǎn)書

RabbitMQ死信隊(duì)列與延遲隊(duì)列_51CTO博客_rabbitmq延遲隊(duì)列文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-485004.html

到了這里,關(guān)于RabbitMQ - 死信隊(duì)列,延時(shí)隊(duì)列的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Time to live exceeded

    Time to live exceeded的原因:數(shù)據(jù)包未上傳成功,形成路由環(huán)路。 當(dāng)對(duì)網(wǎng)絡(luò)上的主機(jī)進(jìn)行ping操作的時(shí)候,本地機(jī)器會(huì)發(fā)出一個(gè)數(shù)據(jù)包,數(shù)據(jù)包經(jīng)過(guò)一定數(shù)量的路由器傳送到目的主機(jī),但是由于很多的原因,一些數(shù)據(jù)包不能正常傳送到目的主機(jī),如果不給這些數(shù)據(jù)包一個(gè)生存時(shí)間

    2024年02月14日
    瀏覽(20)
  • 【RabbitMQ】RabbitMQ高級(jí):死信隊(duì)列和延遲隊(duì)列

    【RabbitMQ】RabbitMQ高級(jí):死信隊(duì)列和延遲隊(duì)列

    在電商平臺(tái)下單,訂單創(chuàng)建成功,等待支付,一般會(huì)給30分鐘的時(shí)間,開始倒計(jì)時(shí)。如果在這段時(shí)間內(nèi)用戶沒(méi)有支付,則默認(rèn)訂單取消。 該如何實(shí)現(xiàn)? 定期輪詢(數(shù)據(jù)庫(kù)等) 用戶下單成功,將訂單信息放入數(shù)據(jù)庫(kù),同時(shí)將支付狀態(tài)放入數(shù)據(jù)庫(kù),用戶付款更改數(shù)據(jù)庫(kù)狀態(tài)。定

    2024年01月17日
    瀏覽(18)
  • RabbitMQ-死信交換機(jī)和死信隊(duì)列

    RabbitMQ-死信交換機(jī)和死信隊(duì)列

    DLX: Dead-Letter-Exchange 死信交換器,死信郵箱 當(dāng)消息成為Dead message后,可以被重新發(fā)送到另一個(gè)交換機(jī),這個(gè)交換機(jī)就是DLX。 如下圖所示: 其實(shí)死信隊(duì)列就是一個(gè)普通的交換機(jī),有些隊(duì)列的消息成為死信后,(比如過(guò)期了或者隊(duì)列滿了)這些死信一般情況下是會(huì)被 RabbitMQ 清理

    2024年02月08日
    瀏覽(26)
  • RabbitMQ延遲隊(duì)列,死信隊(duì)列配置

    延遲和死信隊(duì)列的配置 延遲隊(duì)列有效期一分鐘,后進(jìn)入死信隊(duì)列,如果異常就進(jìn)入異常隊(duì)列 異常隊(duì)列配置類

    2024年02月14日
    瀏覽(28)
  • 【RabbitMQ筆記10】消息隊(duì)列RabbitMQ之死信隊(duì)列的介紹

    【RabbitMQ筆記10】消息隊(duì)列RabbitMQ之死信隊(duì)列的介紹

    這篇文章,主要介紹消息隊(duì)列RabbitMQ之死信隊(duì)列。 目錄 一、RabbitMQ死信隊(duì)列 1.1、什么是死信隊(duì)列 1.2、設(shè)置過(guò)期時(shí)間TTL 1.3、配置死信交換機(jī)和死信隊(duì)列(代碼配置) (1)設(shè)置隊(duì)列過(guò)期時(shí)間 (2)設(shè)置單條消息過(guò)期時(shí)間 (3)隊(duì)列設(shè)置死信交換機(jī) (4)配置的基本思路 1.4、配置

    2024年02月16日
    瀏覽(95)
  • 【RabbitMQ學(xué)習(xí)日記】——死信隊(duì)列與延遲隊(duì)列

    【RabbitMQ學(xué)習(xí)日記】——死信隊(duì)列與延遲隊(duì)列

    死信,顧名思義就是無(wú)法被消費(fèi)的消息,字面意思可以這樣理解,一般來(lái)說(shuō), producer 將消息投遞到 broker 或者直接到 queue 里了, consumer 從 queue 取出消息進(jìn)行消費(fèi),但某些時(shí)候 由于特定的原因?qū)е?queue 中的某些消息無(wú)法被消費(fèi) ,這樣的消息如果沒(méi)有后續(xù)的處理,就變成了死

    2024年02月06日
    瀏覽(22)
  • 【RabbitMQ教程】第五章 —— RabbitMQ - 死信隊(duì)列

    【RabbitMQ教程】第五章 —— RabbitMQ - 死信隊(duì)列

    ?????????????????????????????????????????????????????????????????? ?? 【 R a b b i t M Q 教 程 】 第 五 章 — — R a b b i t M Q ? 死 信 隊(duì) 列 color{#FF1493}{【RabbitMQ教程】第五章 —— RabbitMQ - 死信隊(duì)列} 【 R a b b i t M Q 教 程 】 第 五 章 — — R a

    2024年02月09日
    瀏覽(18)
  • RabbitMQ——死信隊(duì)列

    死信隊(duì)列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一種重要特性,用于處理無(wú)法被消費(fèi)的消息,防止消息丟失。 死信的來(lái)源 在消息隊(duì)列中,當(dāng)消息滿足一定條件而無(wú)法被正常消費(fèi)時(shí),這些消息會(huì)被發(fā)送到死信隊(duì)列。滿足條件的情況包括但不限于: 消息被拒絕( basic.reject 或 bas

    2024年03月14日
    瀏覽(22)
  • RabbitMQ進(jìn)階——死信隊(duì)列

    RabbitMQ進(jìn)階——死信隊(duì)列

    在消息隊(duì)列中,執(zhí)行異步任務(wù)時(shí),通常是將消息生產(chǎn)者發(fā)布的消息存儲(chǔ)在隊(duì)列中,由消費(fèi)者從隊(duì)列中獲取并處理這些消息。但是,在某些情況下,消息可能無(wú)法正常地被處理和消耗,例如:格式錯(cuò)誤、設(shè)備故障等,這些未成功處理的消息就被稱為“死信”。 為了避免這些未成

    2024年04月13日
    瀏覽(25)
  • RabbitMQ: 死信隊(duì)列

    RabbitMQ: 死信隊(duì)列

    其實(shí)就是一個(gè)普通的隊(duì)列,綁定號(hào)私信交換機(jī),不給ttl,給上匹配的路由,等待交換機(jī)發(fā)送消息。 1.在消費(fèi)者里的RabbitMQConfig配置類里,創(chuàng)建隊(duì)列,給它加參數(shù) 第四個(gè)參數(shù),就是放入這個(gè)隊(duì)列,的一些屬性參數(shù) 也就是這兩個(gè)位置 對(duì)應(yīng)Java代碼里好像少個(gè)參數(shù),排他性,是指,

    2024年02月09日
    瀏覽(23)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包