国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMQ實(shí)現(xiàn)延遲消息,RabbitMQ使用死信隊(duì)列實(shí)現(xiàn)延遲消息,RabbitMQ延時(shí)隊(duì)列插件

這篇具有很好參考價(jià)值的文章主要介紹了RabbitMQ實(shí)現(xiàn)延遲消息,RabbitMQ使用死信隊(duì)列實(shí)現(xiàn)延遲消息,RabbitMQ延時(shí)隊(duì)列插件。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一、什么是延遲消息

假設(shè)有一個(gè)業(yè)務(wù)場景:超過30分鐘未付款的訂單自動(dòng)關(guān)閉,這個(gè)功能應(yīng)該怎么實(shí)現(xiàn)?

RabbitMQ使用死信隊(duì)列,可以實(shí)現(xiàn)消息的延遲接收。

1、隊(duì)列的屬性

隊(duì)列有一個(gè)消息過期屬性。就像豐巢超過24小時(shí)就收費(fèi)一樣,通過設(shè)置這個(gè)屬性,超過了指定事件的消息將會(huì)被丟棄。

這個(gè)屬性交:x-message-ttl

所有隊(duì)列中的消息超過時(shí)間未被消費(fèi)時(shí),都會(huì)過期。不管是誰發(fā)送的消息都一視同仁。

@Bean("ttlQueue")
public Queue queue() {
    Map<String, Object> map = new HashMap<String, Object>();
    map.put("x-message-ttl", 11000); // 隊(duì)列中的消息未被消費(fèi)11秒后過期
    // map.put("x-expire", 30000); // 隊(duì)列30秒沒有使用以后會(huì)被刪除
    return new Queue("TTL_QUEUE", true, false, false, map);
}

但是這種方式似乎并不是那么的靈活。所以RabbitMQ的消息也有單獨(dú)的過期時(shí)間屬性。

2、消息的屬性

在生產(chǎn)者發(fā)送消息時(shí),可以通過MessageProperties指定消息屬性。

MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("4000"); // 消息的過期屬性,單位ms
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message("這條消息4秒后過期".getBytes(), messageProperties);
rabbitTemplate.send("TTL_EXCHANGE", "test.ttl", message);

那么問題來了:如果隊(duì)列的TTL是6秒過期,消息的TTL是10秒過期,這個(gè)消息會(huì)在什么時(shí)候被丟棄?
答:如果同時(shí)指定了Message TTL和Queue TTL,那么小的那個(gè)會(huì)生效。

3、什么是死信

上面我們了解到,rabbitMQ的消息可以設(shè)置過期時(shí)間,消息過期后會(huì)被直接丟棄,我們可以通過配置死信隊(duì)列,將這種消息變成死信(Dead Letter),然后將這種過期的消息丟入死信隊(duì)列。

隊(duì)列在創(chuàng)建的時(shí)候可以指定一個(gè)死信交換機(jī)DLX(Dead Letter Exchange)。死信交換機(jī)綁定的隊(duì)列被稱為死信隊(duì)列DLQ(Dead Letter Queue),DLX實(shí)際上也是普通的交換機(jī),DLQ也是普通的隊(duì)列。

RabbitMQ實(shí)現(xiàn)延遲消息,RabbitMQ使用死信隊(duì)列實(shí)現(xiàn)延遲消息,RabbitMQ延時(shí)隊(duì)列插件,中間件,java-rabbitmq,rabbitmq,分布式
也就是說,如果消息過期了,隊(duì)列指定了DLX,就會(huì)發(fā)送到DLX。如果DLX綁定了DLQ,就會(huì)路由到DLQ。路由到DLQ之后,我們就可以消費(fèi)死信隊(duì)列了。

4、使用死信隊(duì)列的缺點(diǎn)

(1)如果統(tǒng)一用隊(duì)列來設(shè)置消息的TTL,當(dāng)梯度非常多的情況下,比如1分鐘、2分鐘、5分鐘、10分鐘……需要?jiǎng)?chuàng)建很多交換機(jī)和隊(duì)列來路由消息,這時(shí)可以考慮使用消息的TTL。
(2)如果單獨(dú)設(shè)置消息的TTL,則可能會(huì)造成隊(duì)列中的消息阻塞——前一條消息沒有出隊(duì)(沒有被消費(fèi)),后面的消息無法投遞(比如第一條消息的過期時(shí)間是30分鐘,第二條消息的過期時(shí)間是10分鐘。10分鐘后,即使第二條消息應(yīng)該投遞了,但是由于第一條消息還未出隊(duì),所以無法投遞)。
(3)可能存在一定的時(shí)間誤差。

5、延時(shí)消息插件

在RabbitMQ 3.5.7及以后的版本提供了一個(gè)插件(rabbitmq-delayed-message-exchange)來實(shí)現(xiàn)延時(shí)隊(duì)列功能(Linux和Windows都可以用)。同時(shí)插件依賴Erlang/OPT 18.0及以上。

插件源碼地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
插件下載地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
找到對應(yīng)版本的插件,然后下載。


# 下載到plugins目錄
cd rabbitmq_server-3.7.7/plugins

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez

# 啟用插件
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 停用插件
./rabbitmq-plugins disable rabbitmq_delayed_message_exchange

此時(shí),在管理界面的創(chuàng)建交換機(jī)頁面,會(huì)出現(xiàn)一個(gè)x-delayed-message類型的交換機(jī):
RabbitMQ實(shí)現(xiàn)延遲消息,RabbitMQ使用死信隊(duì)列實(shí)現(xiàn)延遲消息,RabbitMQ延時(shí)隊(duì)列插件,中間件,java-rabbitmq,rabbitmq,分布式

二、JavaAPI利用死信隊(duì)列實(shí)現(xiàn)RabbitMQ延遲消息

1、代碼實(shí)現(xiàn)

引包:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>
import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.impl.AMQBasicProperties;
import java.util.HashMap;
import java.util.Map;

/**
 * 消息生產(chǎn)者,通過TTL測試死信隊(duì)列
 */
public class DlxProducer {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));// rabbitmq.uri=amqp://admin:admin@192.168.56.10:5672

        // 建立連接
        Connection conn = factory.newConnection();
        // 創(chuàng)建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ, DLX MSG";

        // 設(shè)置屬性,消息10秒鐘過期
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(2) // 持久化消息
                .contentEncoding("UTF-8")
                .expiration("5000") // TTL
                .build();

        // 發(fā)送消息,普通隊(duì)列
        channel.basicPublish("ORI_USE_EXCHANGE", "ORI_USE_QUEUE", properties, msg.getBytes());

        channel.close();
        conn.close();
    }
}

import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * 5秒鐘后,消息會(huì)從正常隊(duì)列 ORI_USE_QUEUE 到達(dá)死信交換機(jī) DEAD_LETTER_EXCHANGE ,然后路由到死信隊(duì)列 DEAD_LETTER_QUEUE
 *
 */
public class DlxConsumer {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri")); // rabbitmq.uri=amqp://admin:admin@192.168.56.10:5672
        // 建立連接
        Connection conn = factory.newConnection();
        // 創(chuàng)建消息通道
        Channel channel = conn.createChannel();

        // 指定隊(duì)列的死信交換機(jī)
        Map<String,Object> arguments = new HashMap<String,Object>();
        arguments.put("x-dead-letter-exchange","DEAD_LETTER_EXCHANGE");
        // arguments.put("x-expires",9000L); // 設(shè)置隊(duì)列的TTL
        // arguments.put("x-max-length", 4); // 如果設(shè)置了隊(duì)列的最大長度,超過長度時(shí),先入隊(duì)的消息會(huì)被發(fā)送到DLX

        // 聲明交換機(jī)
        // String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
        channel.exchangeDeclare("ORI_USE_EXCHANGE","direct",false, false, null);
        // 聲明隊(duì)列(默認(rèn)交換機(jī)AMQP Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare("ORI_USE_QUEUE", false, false, false, arguments);
        // 綁定隊(duì)列和交換機(jī),以及routingKey
        channel.queueBind("ORI_USE_QUEUE","ORI_USE_EXCHANGE","ORI_USE_QUEUE");

        // 聲明死信交換機(jī)
        channel.exchangeDeclare("DEAD_LETTER_EXCHANGE","topic", false, false, false, null);
        // 聲明死信隊(duì)列
        channel.queueDeclare("DEAD_LETTER_QUEUE", false, false, false, null);
        // 綁定,此處 Dead letter routing key 設(shè)置為 #
        channel.queueBind("DEAD_LETTER_QUEUE","DEAD_LETTER_EXCHANGE","#");
        System.out.println(" Waiting for message....");

        // 創(chuàng)建消費(fèi)者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("Received message : '" + msg + "'");
            }
        };

        // 開始獲取消息,消費(fèi)死信隊(duì)列
        // String queue, boolean autoAck, Consumer callback
        channel.basicConsume("DEAD_LETTER_QUEUE", true, consumer);
    }
}

2、基本流程

利用消息的過期時(shí)間,過期之后投遞到死信交換機(jī)(DLX),路由到死信隊(duì)列(DLQ),我們消費(fèi)者監(jiān)聽死信隊(duì)列(DLQ),實(shí)現(xiàn)延遲消息。

消息的流轉(zhuǎn)流程:生產(chǎn)者- 原交換機(jī) - 原隊(duì)列(超過TTL之后) - 死信交換機(jī) - 死信隊(duì)列 - 最終消費(fèi)者。

RabbitMQ實(shí)現(xiàn)延遲消息,RabbitMQ使用死信隊(duì)列實(shí)現(xiàn)延遲消息,RabbitMQ延時(shí)隊(duì)列插件,中間件,java-rabbitmq,rabbitmq,分布式

三、JavaAPI利用插件實(shí)現(xiàn)RabbitMQ延遲消息

1、代碼實(shí)現(xiàn)

import com.rabbitmq.client.*;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 *  使用延時(shí)插件實(shí)現(xiàn)的消息投遞-消費(fèi)者
 *  必須要在服務(wù)端安裝rabbitmq-delayed-message-exchange插件
 *  先啟動(dòng)消費(fèi)者
 */
public class DelayPluginConsumer {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://admin:admin@192.168.56.10:5672");
        // 建立連接
        Connection conn = factory.newConnection();
        // 創(chuàng)建消息通道
        Channel channel = conn.createChannel();

        // 聲明x-delayed-message類型的exchange
        Map<String, Object> argss = new HashMap<String, Object>();
        argss.put("x-delayed-type", "direct");
        channel.exchangeDeclare("DELAY_EXCHANGE", "x-delayed-message", false,
                false, argss);

        // 聲明隊(duì)列
        channel.queueDeclare("DELAY_QUEUE", false,false,false,null);

        // 綁定交換機(jī)與隊(duì)列
        channel.queueBind("DELAY_QUEUE", "DELAY_EXCHANGE", "DELAY_KEY");

        // 創(chuàng)建消費(fèi)者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                System.out.println("收到消息:[" + msg + "]\n接收時(shí)間:" +sf.format(new Date()));
            }
        };

        // 開始獲取消息
        // String queue, boolean autoAck, Consumer callback
        channel.basicConsume("DELAY_QUEUE", true, consumer);
    }
}
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 *  使用延時(shí)插件實(shí)現(xiàn)的消息投遞-生產(chǎn)者
 *  必須要在服務(wù)端安裝rabbitmq-delayed-message-exchange插件
 *  先啟動(dòng)消費(fèi)者
 */
public class DelayPluginProducer {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://admin:admin@192.168.56.10:5672");

        // 建立連接
        Connection conn = factory.newConnection();
        // 創(chuàng)建消息通道
        Channel channel = conn.createChannel();

        // 延時(shí)投遞,比如延時(shí)10秒
        Date now = new Date();
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.SECOND, +10);// 10秒
        Date delayTime = calendar.getTime();

        // 定時(shí)投遞,把這個(gè)值替換delayTime即可
        // Date exactDealyTime = new Date("2019/01/14,22:30:00");

        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        String msg = "發(fā)送時(shí)間:" + sf.format(now) + ",投遞時(shí)間:" + sf.format(delayTime);

        // 延遲的間隔時(shí)間,目標(biāo)時(shí)刻減去當(dāng)前時(shí)刻
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put("x-delay", delayTime.getTime() - now.getTime());

        AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder()
                .headers(headers);
        channel.basicPublish("DELAY_EXCHANGE", "DELAY_KEY", props.build(),
                msg.getBytes());

        channel.close();
        conn.close();
    }
}

2、基本原理

rabbitMQ的延遲消息插件,可以有效的避免消息堵塞問題。

相當(dāng)于投遞給一個(gè)延遲消息的交換機(jī),并指定延遲時(shí)間,大大簡化了開發(fā)。文章來源地址http://www.zghlxwxcb.cn/news/detail-549091.html

四、Springboot利用死信隊(duì)列實(shí)現(xiàn)延遲消息

1、配置實(shí)現(xiàn)

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 死信隊(duì)列 DLX DLQ
 */
@Configuration
public class DlxConfig {
    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setUri(ResourceUtil.getKey("rabbitmq.uri"));
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean("oriUseExchange")
    public DirectExchange exchange() {

        return new DirectExchange("ORI_USE_EXCHANGE", true, false, new HashMap<>());
    }

    @Bean("oriUseQueue")
    public Queue queue() {
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("x-message-ttl", 10000); // 10秒鐘后成為死信
        map.put("x-dead-letter-exchange", "DEAD_LETTER_EXCHANGE"); // 隊(duì)列中的消息變成死信后,進(jìn)入死信交換機(jī)
        return new Queue("ORI_USE_QUEUE", true, false, false, map);
    }

    @Bean
    public Binding binding(@Qualifier("oriUseQueue") Queue queue,@Qualifier("oriUseExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("ori.use");
    }

    /**
     * 隊(duì)列的死信交換機(jī)
     * @return
     */
    @Bean("deatLetterExchange")
    public TopicExchange deadLetterExchange() {
        return new TopicExchange("DEAD_LETTER_EXCHANGE", true, false, new HashMap<>());
    }

    @Bean("deatLetterQueue")
    public Queue deadLetterQueue() { // 消費(fèi)者只監(jiān)聽該隊(duì)列即可
        return new Queue("DEAD_LETTER_QUEUE", true, false, false, new HashMap<>());
    }

    @Bean
    public Binding bindingDead(@Qualifier("deatLetterQueue") Queue queue,@Qualifier("deatLetterExchange") TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("#"); // 無條件路由
    }

}

import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan(basePackages = "com.dlx.ttl")
public class DlxSender {

    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(DlxSender.class);
        RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        // 隨隊(duì)列的過期屬性過期,單位ms
        rabbitTemplate.convertAndSend("ORI_USE_EXCHANGE", "ori.use", "測試死信消息");

    }
}

五、Springboot利用插件實(shí)現(xiàn)延遲消息

1、配置實(shí)現(xiàn)

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

// 配置類
@Configuration
public class DelayPluginConfig {
    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setUri("amqp://admin:admin@192.168.56.10:5672");
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean("delayExchange")
    public TopicExchange exchange() {
        Map<String, Object> argss = new HashMap<String, Object>();
        argss.put("x-delayed-type", "direct");
        return new TopicExchange("DELAY_EXCHANGE", true, false, argss);
    }

    @Bean("delayQueue")
    public Queue deadLetterQueue() {
        return new Queue("DELAY_QUEUE", true, false, false, new HashMap<>());
    }

    @Bean
    public Binding bindingDead(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("#"); // 無條件路由
    }

}

import com.rabbitmq.client.*;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * 消費(fèi)者
 */
public class DelayPluginConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://admin:admin@192.168.56.10:5672");
        // 建立連接
        Connection conn = factory.newConnection();
        // 創(chuàng)建消息通道
        Channel channel = conn.createChannel();

        // 創(chuàng)建消費(fèi)者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                System.out.println("收到消息:[" + msg + "]\n接收時(shí)間:" +sf.format(new Date()));
            }
        };

        // 開始獲取消息
        // String queue, boolean autoAck, Consumer callback
        channel.basicConsume("DELAY_QUEUE", true, consumer);
    }
}
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;


/**
 * 生產(chǎn)者
 * 延時(shí)消息插件,去管控臺隊(duì)列看有無收到消息
 */
@ComponentScan(basePackages = "com.dlx.delayplugin")
public class DelayPluginProducer {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(DelayPluginProducer.class);
        RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        // 延時(shí)投遞,比如延時(shí)4秒
        Date now = new Date();
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.SECOND, +4);
        Date delayTime = calendar.getTime();

        // 定時(shí)投遞,把這個(gè)值替換delayTime即可
        // Date exactDealyTime = new Date("2019/06/24,22:30:00");
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        String msg = "延時(shí)插件測試消息,發(fā)送時(shí)間:" + sf.format(now) + ",理論路由時(shí)間:" + sf.format(delayTime);

        MessageProperties messageProperties = new MessageProperties();
        // 延遲的間隔時(shí)間,目標(biāo)時(shí)刻減去當(dāng)前時(shí)刻
        messageProperties.setHeader("x-delay", delayTime.getTime() - now.getTime());
        Message message = new Message(msg.getBytes(), messageProperties);

        // 不能在本地測試,必須發(fā)送消息到安裝了插件的服務(wù)端
        rabbitTemplate.send("DELAY_EXCHANGE", "#", message);

    }
}

到了這里,關(guān)于RabbitMQ實(shí)現(xiàn)延遲消息,RabbitMQ使用死信隊(duì)列實(shí)現(xiàn)延遲消息,RabbitMQ延時(shí)隊(duì)列插件的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【RabbitMQ】 RabbitMQ 消息的延遲 —— 深入探索 RabbitMQ 的死信交換機(jī),消息的 TTL 以及延遲隊(duì)列

    【RabbitMQ】 RabbitMQ 消息的延遲 —— 深入探索 RabbitMQ 的死信交換機(jī),消息的 TTL 以及延遲隊(duì)列

    消息隊(duì)列是現(xiàn)代分布式應(yīng)用中的關(guān)鍵組件,用于實(shí)現(xiàn)異步通信、解耦系統(tǒng)組件以及處理高并發(fā)請求。消息隊(duì)列可以用于各種應(yīng)用場景,包括任務(wù)調(diào)度、事件通知、日志處理等。在消息隊(duì)列的應(yīng)用中,有時(shí)需要實(shí)現(xiàn)消息的延遲處理、處理未能成功消費(fèi)的消息等功能。 本文將介紹

    2024年02月05日
    瀏覽(95)
  • rabbitmq基礎(chǔ)7——隊(duì)列和消息過期時(shí)間設(shè)置、死信隊(duì)列、延遲隊(duì)列、優(yōu)先級隊(duì)列、回調(diào)隊(duì)列、惰性隊(duì)列

    rabbitmq基礎(chǔ)7——隊(duì)列和消息過期時(shí)間設(shè)置、死信隊(duì)列、延遲隊(duì)列、優(yōu)先級隊(duì)列、回調(diào)隊(duì)列、惰性隊(duì)列

    這里過一個(gè)知識點(diǎn)——過期時(shí)間,即對消息或隊(duì)列設(shè)置過期時(shí)間(TTL)。一旦消息過期,消費(fèi)就無法接收到這條消息,這種情況是絕不允許存在的,所以官方就出了一個(gè)對策——死信隊(duì)列,死信隊(duì)列最初出現(xiàn)的意義就是為了應(yīng)對消息過期丟失情況的手段之一。 那么過期時(shí)間具

    2024年02月03日
    瀏覽(100)
  • Rabbitmq死信隊(duì)列及延時(shí)隊(duì)列實(shí)現(xiàn)

    問題:什么是延遲隊(duì)列 我們常說的延遲隊(duì)列是指消息進(jìn)入隊(duì)列后不會(huì)被立即消費(fèi),只有達(dá)到指定時(shí)間后才能被消費(fèi)。 但RabbitMq中并 沒有提供延遲隊(duì)列功能 。那么RabbitMQ如何實(shí)現(xiàn)延遲隊(duì)列 通過:死信隊(duì)列 + RabbitMQ的TTL特性實(shí)現(xiàn)。 實(shí)現(xiàn)原理 給一個(gè)普通帶有過期功能的隊(duì)列綁定一

    2024年02月15日
    瀏覽(19)
  • .NET中使用RabbitMQ延時(shí)隊(duì)列和死信隊(duì)列

    .NET中使用RabbitMQ延時(shí)隊(duì)列和死信隊(duì)列

    延時(shí)隊(duì)列是RabbitMQ中的一種特殊隊(duì)列,它可以在消息到達(dá)隊(duì)列后延遲一段時(shí)間再被消費(fèi)。 延時(shí)隊(duì)列的實(shí)現(xiàn)原理是通過使用消息的過期時(shí)間和死信隊(duì)列來實(shí)現(xiàn)。當(dāng)消息被發(fā)送到延時(shí)隊(duì)列時(shí),可以為消息設(shè)置一個(gè)過期時(shí)間,這個(gè)過期時(shí)間決定了消息在延時(shí)隊(duì)列中等待的時(shí)間。如果

    2024年02月15日
    瀏覽(16)
  • RabbitMQ之TTL+死信隊(duì)列實(shí)現(xiàn)延遲隊(duì)列

    RabbitMQ是一個(gè)流行的消息隊(duì)列系統(tǒng),它提供了許多有用的功能,其中之一是TTL(Time To Live)和死信隊(duì)列。這些功能可以用來實(shí)現(xiàn)延遲隊(duì)列,讓我們來看看如何使用它們。 首先,什么是TTL?TTL是消息的存活時(shí)間,它可以設(shè)置為一個(gè)特定的時(shí)間段。如果消息在這個(gè)時(shí)間段內(nèi)沒有被

    2024年02月13日
    瀏覽(18)
  • SpringCloudStream整合RabbitMQ用ttl+死信實(shí)現(xiàn)延遲隊(duì)列的實(shí)踐

    這篇是關(guān)于我使用Spring Cloud Steam操作RabbitMQ采用ttl+死信隊(duì)列的方式實(shí)現(xiàn)的延遲隊(duì)列。 在公司項(xiàng)目中遇到了需要延遲隊(duì)列的需求,為了以后可維護(hù)性和擴(kuò)展性要求必須要用Springcloud Stream組件來操作mq,而且公司的rabbit也不允許安裝延遲插件,只能用最原始的ttl+死信來實(shí)現(xiàn),在搭

    2024年02月12日
    瀏覽(26)
  • RabbitMQ - 死信隊(duì)列,延時(shí)隊(duì)列

    RabbitMQ - 死信隊(duì)列,延時(shí)隊(duì)列

    死信隊(duì)列: DLX 全稱(Dead-Letter-Exchange),稱之為死信交換器,當(dāng)消息變成一個(gè)死信之后,如果這個(gè)消息所在的隊(duì)列存在 x-dead-letter-exchange 參數(shù),那么它會(huì)被發(fā)送到x-dead-letter-exchange對應(yīng)值的交換器上,這個(gè)交換器就稱之為死信交換器,與這個(gè)死信交換器綁定的隊(duì)列就是死信隊(duì)列

    2024年02月09日
    瀏覽(24)
  • RabbitMQ延遲隊(duì)列,死信隊(duì)列配置

    延遲和死信隊(duì)列的配置 延遲隊(duì)列有效期一分鐘,后進(jìn)入死信隊(duì)列,如果異常就進(jìn)入異常隊(duì)列 異常隊(duì)列配置類

    2024年02月14日
    瀏覽(27)
  • 【RabbitMQ】RabbitMQ高級:死信隊(duì)列和延遲隊(duì)列

    【RabbitMQ】RabbitMQ高級:死信隊(duì)列和延遲隊(duì)列

    在電商平臺下單,訂單創(chuàng)建成功,等待支付,一般會(huì)給30分鐘的時(shí)間,開始倒計(jì)時(shí)。如果在這段時(shí)間內(nèi)用戶沒有支付,則默認(rèn)訂單取消。 該如何實(shí)現(xiàn)? 定期輪詢(數(shù)據(jù)庫等) 用戶下單成功,將訂單信息放入數(shù)據(jù)庫,同時(shí)將支付狀態(tài)放入數(shù)據(jù)庫,用戶付款更改數(shù)據(jù)庫狀態(tài)。定

    2024年01月17日
    瀏覽(18)
  • 【RabbitMQ學(xué)習(xí)日記】——死信隊(duì)列與延遲隊(duì)列

    【RabbitMQ學(xué)習(xí)日記】——死信隊(duì)列與延遲隊(duì)列

    死信,顧名思義就是無法被消費(fèi)的消息,字面意思可以這樣理解,一般來說, producer 將消息投遞到 broker 或者直接到 queue 里了, consumer 從 queue 取出消息進(jìn)行消費(fèi),但某些時(shí)候 由于特定的原因?qū)е?queue 中的某些消息無法被消費(fèi) ,這樣的消息如果沒有后續(xù)的處理,就變成了死

    2024年02月06日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包