国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

rabbitMQ引入死信隊列

這篇具有很好參考價值的文章主要介紹了rabbitMQ引入死信隊列。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、基本概念

1、死信定義

? ? ? ? 指的是,從隊列當(dāng)中取出來的消息,到達(dá)消費方后,因為某些原因?qū)е孪⒉]有被正常消費掉,這些沒有被后續(xù)處理的消息就是“死信”,而保存死信的隊列,就是死信隊列。

2、死信出現(xiàn)的場景舉例

? ? ? ? 為了保證訂單業(yè)務(wù)的消息數(shù)據(jù)不丟失,需要使用死信隊列機制,在消息消費發(fā)生異常的時候,將消息給投入到死信隊列當(dāng)中;

? ? ? ? ? 用戶在商城下單成功并點進(jìn)去準(zhǔn)備支付,超過指定時間未支付時,消息自動失效成為死信(消息超時情況);

3、死信的來源

? ? ? ? 消息TTL過期;

? ? ? ? 隊列已經(jīng)到達(dá)最大長度,數(shù)據(jù)無法再添加到MQ;

? ? ? ? 消息被拒絕了;

4、死信架構(gòu)圖

rabbitMQ引入死信隊列,java-rabbitmq,rabbitmq,java

分析:

對于消息生產(chǎn)者而言,只需要關(guān)注將消息發(fā)送給交換機即可;

而對于普通消費者C1而言,需要關(guān)注普通交換機、普通隊列、死信交換機的相關(guān)信息,要做兩次綁定操作(普通交換機和普通隊列綁定,普通隊列和死信隊列綁定),難點在于---普通隊列怎么與死信交換機進(jìn)行綁定?

而消費者C2也是一個普通消費者,專注于死信隊列當(dāng)中消息的處理,需要關(guān)注死信交換機、死信隊列的信息,在死信交換機和死信隊列綁定之后,從隊列當(dāng)中拿到死信進(jìn)行處理;

二、代碼部分

(0)提前準(zhǔn)備工具類

封裝獲取MQ的connection方法,以及釋放資源的方法

public class AMQPUtils {
    //用于獲取客戶端和MQ綁定的連接對象
    public static Connection getConnection() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/test");//用于隔離資源的虛擬主機
        factory.setHost("MQServer的ip地址");
        factory.setPort(5672);
        factory.setUsername("zhangsan");
        factory.setPassword("1234");
        Connection connection = factory.newConnection();
        return connection;
    }

    //釋放資源
    public static void close(Channel channel,Connection connection) throws Exception{
        channel.close();
        connection.close();
    }
}

(一)模擬消息超時情況

1、消息發(fā)布者

rabbitMQ引入死信隊列,java-rabbitmq,rabbitmq,java

public class Provider {
    public static void main(String[] args) throws Exception {
        Connection connection = AMQPUtils.getConnection();
        Channel channel = connection.createChannel();
        //聲明一個普通交換機
        String normalExchange = "normal_Exchange";
        channel.exchangeDeclare(normalExchange,"direct");
        String key = "zhangsan";
        //設(shè)置消息的ttl時間為5s
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("5000").build();
        //發(fā)布消息-發(fā)布多條消息驗證
        for (int i = 1; i <= 10 ; i++) {
            channel.basicPublish(normalExchange,key,properties,("message---->"+i).getBytes());
        }
        //釋放資源
        AMQPUtils.close(channel,connection);
    }
}

2、消費者C1

rabbitMQ引入死信隊列,java-rabbitmq,rabbitmq,java

代碼:

public class Consumer1 {
    public static void main(String[] args) throws Exception{
        Connection connection = AMQPUtils.getConnection();
        Channel channel = connection.createChannel();
        //提前準(zhǔn)備一些名字
        String normalExchange = "normal_exchange";
        String deadExchange = "dead_exchange";
        String normalQueue = "normal_queue";
        String deadQueue = "dead_queue";
        String normal_key = "zhangsan";
        String dead_key = "lisi";
        //聲明普通交換機
        channel.exchangeDeclare(normalExchange,"direct");
        //聲明死信交換機
        channel.exchangeDeclare(deadExchange,"direct");
        //設(shè)置普通隊列當(dāng)中需要攜帶的其他信息(死信交換機、死信隊列、路由key)
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange",deadExchange);
        params.put("x-dead-letter-routing-key",dead_key);
        //聲明普通隊列
        channel.queueDeclare(normalQueue,false,false,false,params);

        //聲明死信隊列
        channel.queueDeclare(deadQueue,false,false,false,null);

        //binding
        //將普通交換機和普通隊列綁定
        channel.queueBind(normalQueue,normalExchange,normal_key);
        //將將死信交換機和死信隊列綁定
        channel.queueBind(deadQueue,deadExchange,dead_key);

        //消費消息
        channel.basicConsume(normalQueue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("C1消費的消息是---->"+new String(body));
            }
        });
    }
}

注意:開啟C1接受普通交換機的消息之后,關(guān)閉C1,讓普通隊列當(dāng)中的消息超過超時時間,成為死信,后被死信交換機路由進(jìn)入dead_queue當(dāng)中,下圖所示的就是消息超時之后進(jìn)入死信隊列:

rabbitMQ引入死信隊列,java-rabbitmq,rabbitmq,java

點擊dead_queue可以查看具體的死信來源、交換機、路由key等信息;

rabbitMQ引入死信隊列,java-rabbitmq,rabbitmq,java

此時:注意此時死信消息是保存在MQ當(dāng)中的;

3、消費者c2

rabbitMQ引入死信隊列,java-rabbitmq,rabbitmq,java

消費者C2要去消費死信隊列當(dāng)中的消息:

public class Consumer2 {
    public static void main(String[] args)throws Exception {
        Connection connection = AMQPUtils.getConnection();
        Channel channel = connection.createChannel();
        //準(zhǔn)備一些名字
        String deadExchange = "dead_exchange";
        String deadQueue = "dead_queue";
        String key = "lisi";
        //聲明死信交換機
        channel.exchangeDeclare(deadExchange,"direct");
        //聲明死信隊列
        channel.queueDeclare(deadQueue,false,false,false,null);
        //交換機和隊列綁定
        channel.queueBind(deadQueue,deadExchange,key);
        //消費死信消息
        channel.basicConsume(deadQueue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("C2消費了死信----->"+new String(body));
            }
        });
    }
}

控制臺結(jié)果:

rabbitMQ引入死信隊列,java-rabbitmq,rabbitmq,java

此時存儲在死信隊列當(dāng)中的消息已經(jīng)被C2消費了!

(二)模擬隊列達(dá)到最大長度

請?zhí)崆霸贛Q的控制臺上,將情況1當(dāng)中設(shè)置的隊列給刪除;

1、消息發(fā)布者

public class Provider {
    public static void main(String[] args) throws Exception {
        Connection connection = AMQPUtils.getConnection();
        Channel channel = connection.createChannel();
        //聲明一個普通交換機
        String normalExchange = "normal_exchange";
        channel.exchangeDeclare(normalExchange,"direct");
        String key = "zhangsan";
        //發(fā)布消息-發(fā)布多條消息驗證
        for (int i = 1; i <= 10 ; i++) {
            channel.basicPublish(normalExchange,key,null,("message---->"+i).getBytes());
        }
        //釋放資源
        AMQPUtils.close(channel,connection);
    }
}

2、消費者C1

public class Consumer1 {
    public static void main(String[] args) throws Exception{
        Connection connection = AMQPUtils.getConnection();
        Channel channel = connection.createChannel();
        //提前準(zhǔn)備一些名字
        String normalExchange = "normal_exchange";
        String deadExchange = "dead_exchange";
        String normalQueue = "normal_queue";
        String deadQueue = "dead_queue";
        String normal_key = "zhangsan";
        String dead_key = "lisi";
        //聲明普通交換機
        channel.exchangeDeclare(normalExchange,"direct");
        //聲明死信交換機
        channel.exchangeDeclare(deadExchange,"direct");
        //設(shè)置普通隊列當(dāng)中需要攜帶的其他信息(死信交換機、死信隊列、路由key)
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange",deadExchange);
        params.put("x-dead-letter-routing-key",dead_key);
        //設(shè)置普通隊列的最大長度
        params.put("x-max-length",6);

        //聲明普通隊列
        channel.queueDeclare(normalQueue,false,false,false,params);

        //聲明死信隊列
        channel.queueDeclare(deadQueue,false,false,false,null);

        //binding
        //將普通交換機和普通隊列綁定
        channel.queueBind(normalQueue,normalExchange,normal_key);
        //將將死信交換機和死信隊列綁定
        channel.queueBind(deadQueue,deadExchange,dead_key);

        //消費消息
        channel.basicConsume(normalQueue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("C1消費的消息是---->"+new String(body));
            }
        });
       
    }
}

開啟C1,然后關(guān)閉,再啟動消息生產(chǎn)者,結(jié)果:

rabbitMQ引入死信隊列,java-rabbitmq,rabbitmq,java

3、消費者C2

public class Consumer2 {
    public static void main(String[] args)throws Exception {
        Connection connection = AMQPUtils.getConnection();
        Channel channel = connection.createChannel();
        //準(zhǔn)備一些名字
        String deadExchange = "dead_exchange";
        String deadQueue = "dead_queue";
        String key = "lisi";
        //聲明死信交換機
        channel.exchangeDeclare(deadExchange,"direct");
        //聲明死信隊列
        channel.queueDeclare(deadQueue,false,false,false,null);
        //交換機和隊列綁定
        channel.queueBind(deadQueue,deadExchange,key);
        //消費死信消息
        channel.basicConsume(deadQueue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("C2消費了死信----->"+new String(body));
            }
        });
    }
}

結(jié)果:

rabbitMQ引入死信隊列,java-rabbitmq,rabbitmq,java

死信隊列當(dāng)中的消息已經(jīng)被消費了;

(三)模擬消息被拒絕

1、消息發(fā)布者

public class Provider {
    public static void main(String[] args) throws Exception {
        Connection connection = AMQPUtils.getConnection();
        Channel channel = connection.createChannel();
        //聲明一個普通交換機
        String normalExchange = "normal_exchange";
        channel.exchangeDeclare(normalExchange,"direct");
        String key = "zhangsan";
        //發(fā)布消息-發(fā)布多條消息驗證
        for (int i = 1; i <= 10 ; i++) {
            channel.basicPublish(normalExchange,key,null,("message---->"+i).getBytes());
        }
        //釋放資源
        AMQPUtils.close(channel,connection);
    }
}

2、消費者C1

public class Consumer1 {
    public static void main(String[] args) throws Exception{
        Connection connection = AMQPUtils.getConnection();
        final Channel channel = connection.createChannel();
        //提前準(zhǔn)備一些名字
        String normalExchange = "normal_exchange";
        String deadExchange = "dead_exchange";
        String normalQueue = "normal_queue";
        String deadQueue = "dead_queue";
        String normal_key = "zhangsan";
        String dead_key = "lisi";
        //聲明普通交換機
        channel.exchangeDeclare(normalExchange,"direct");
        //聲明死信交換機
        channel.exchangeDeclare(deadExchange,"direct");
        //設(shè)置普通隊列當(dāng)中需要攜帶的其他信息(死信交換機、死信隊列、路由key)
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange",deadExchange);
        params.put("x-dead-letter-routing-key",dead_key);

        //聲明普通隊列
        channel.queueDeclare(normalQueue,false,false,false,params);

        //聲明死信隊列
        channel.queueDeclare(deadQueue,false,false,false,null);

        //binding
        //將普通交換機和普通隊列綁定
        channel.queueBind(normalQueue,normalExchange,normal_key);
        //將將死信交換機和死信隊列綁定
        channel.queueBind(deadQueue,deadExchange,dead_key);

        //消費消息--注意關(guān)閉自動確認(rèn)
        channel.basicConsume(normalQueue,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                /*System.out.println("C1消費的消息是---->"+new String(body));*/
                //模擬消息被拒絕--把所有消息都拒絕
                channel.basicReject(envelope.getDeliveryTag(),false);
            }
        });
      
    }
}

注意:先啟動C1,然后關(guān)閉,啟動消息發(fā)布者,結(jié)果如下:

rabbitMQ引入死信隊列,java-rabbitmq,rabbitmq,java

這10條消息目前都保存在MQ當(dāng)中,然后再啟動C1,把消息全部拒絕掉,讓它們成為死信:

rabbitMQ引入死信隊列,java-rabbitmq,rabbitmq,java

點擊dead_queue,去查看死信隊列當(dāng)中的一些信息:

rabbitMQ引入死信隊列,java-rabbitmq,rabbitmq,java

3、消費者C2

public class Consumer2 {
    public static void main(String[] args)throws Exception {
        Connection connection = AMQPUtils.getConnection();
        Channel channel = connection.createChannel();
        //準(zhǔn)備一些名字
        String deadExchange = "dead_exchange";
        String deadQueue = "dead_queue";
        String key = "lisi";
        //聲明死信交換機
        channel.exchangeDeclare(deadExchange,"direct");
        //聲明死信隊列
        channel.queueDeclare(deadQueue,false,false,false,null);
        //交換機和隊列綁定
        channel.queueBind(deadQueue,deadExchange,key);
        //消費死信消息
        channel.basicConsume(deadQueue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("C2消費了死信----->"+new String(body));
            }
        });
    }
}

啟動消費者C2,消費死信隊列當(dāng)中的消息!

三、小結(jié)

????????死信隊列的出現(xiàn),是為了保存因為特殊原因無法被消費的消息,避免消息直接失效!這些消息通過rabbitMQ的死信隊列機制,可以保存在MQ服務(wù)的死信隊列當(dāng)中,等待被其他的消費者進(jìn)行處理!

?需要注意的是:

? ? ? ?只有針對消息的設(shè)置會放在消息發(fā)布方進(jìn)行,隊列等操作,因為發(fā)布方無法自己決定消息被路由到哪個隊列,只能決定把消息交給哪個交換機,以及給定路由規(guī)則;

? ? ? 對于消息消費方而言,需要確定交換機、消息隊列,已經(jīng)完成?交換機和隊列的綁定操作,所以針對于隊列的設(shè)置都是放在消費方完成的!

????????文章來源地址http://www.zghlxwxcb.cn/news/detail-788056.html

到了這里,關(guān)于rabbitMQ引入死信隊列的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • RabbitMQ延遲隊列,死信隊列配置

    延遲和死信隊列的配置 延遲隊列有效期一分鐘,后進(jìn)入死信隊列,如果異常就進(jìn)入異常隊列 異常隊列配置類

    2024年02月14日
    瀏覽(28)
  • RabbitMQ-死信交換機和死信隊列

    RabbitMQ-死信交換機和死信隊列

    DLX: Dead-Letter-Exchange 死信交換器,死信郵箱 當(dāng)消息成為Dead message后,可以被重新發(fā)送到另一個交換機,這個交換機就是DLX。 如下圖所示: 其實死信隊列就是一個普通的交換機,有些隊列的消息成為死信后,(比如過期了或者隊列滿了)這些死信一般情況下是會被 RabbitMQ 清理

    2024年02月08日
    瀏覽(26)
  • RabbitMQ實現(xiàn)延遲消息,RabbitMQ使用死信隊列實現(xiàn)延遲消息,RabbitMQ延時隊列插件

    RabbitMQ實現(xiàn)延遲消息,RabbitMQ使用死信隊列實現(xiàn)延遲消息,RabbitMQ延時隊列插件

    假設(shè)有一個業(yè)務(wù)場景:超過30分鐘未付款的訂單自動關(guān)閉,這個功能應(yīng)該怎么實現(xiàn)? RabbitMQ使用死信隊列,可以實現(xiàn)消息的延遲接收。 隊列有一個消息過期屬性。就像豐巢超過24小時就收費一樣,通過設(shè)置這個屬性,超過了指定事件的消息將會被丟棄。 這個屬性交:x-message

    2024年02月13日
    瀏覽(104)
  • Rabbitmq死信隊列及延時隊列實現(xiàn)

    問題:什么是延遲隊列 我們常說的延遲隊列是指消息進(jìn)入隊列后不會被立即消費,只有達(dá)到指定時間后才能被消費。 但RabbitMq中并 沒有提供延遲隊列功能 。那么RabbitMQ如何實現(xiàn)延遲隊列 通過:死信隊列 + RabbitMQ的TTL特性實現(xiàn)。 實現(xiàn)原理 給一個普通帶有過期功能的隊列綁定一

    2024年02月15日
    瀏覽(20)
  • RabbitMQ——死信隊列

    死信隊列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一種重要特性,用于處理無法被消費的消息,防止消息丟失。 死信的來源 在消息隊列中,當(dāng)消息滿足一定條件而無法被正常消費時,這些消息會被發(fā)送到死信隊列。滿足條件的情況包括但不限于: 消息被拒絕( basic.reject 或 bas

    2024年03月14日
    瀏覽(22)
  • RabbitMQ進(jìn)階——死信隊列

    RabbitMQ進(jìn)階——死信隊列

    在消息隊列中,執(zhí)行異步任務(wù)時,通常是將消息生產(chǎn)者發(fā)布的消息存儲在隊列中,由消費者從隊列中獲取并處理這些消息。但是,在某些情況下,消息可能無法正常地被處理和消耗,例如:格式錯誤、設(shè)備故障等,這些未成功處理的消息就被稱為“死信”。 為了避免這些未成

    2024年04月13日
    瀏覽(25)
  • RabbitMQ: 死信隊列

    RabbitMQ: 死信隊列

    其實就是一個普通的隊列,綁定號私信交換機,不給ttl,給上匹配的路由,等待交換機發(fā)送消息。 1.在消費者里的RabbitMQConfig配置類里,創(chuàng)建隊列,給它加參數(shù) 第四個參數(shù),就是放入這個隊列,的一些屬性參數(shù) 也就是這兩個位置 對應(yīng)Java代碼里好像少個參數(shù),排他性,是指,

    2024年02月09日
    瀏覽(23)
  • RabbitMQ 死信隊列實現(xiàn)

    RabbitMQ 死信隊列實現(xiàn)

    死信隊列使用注解實現(xiàn) 報錯: 可以使用注解的方式來綁定 死信隊列,但是還是會報上面的錯誤,繼續(xù)修改 參數(shù)試試 java - How to set x-dead-letter-exchange in Rabbit? - Stack Overflow 但是使用注解綁定的話好像又不生效了,問題原因,tmd將死信參數(shù)綁到交換機上了,c 修改代碼 ?至于問題

    2024年02月02日
    瀏覽(16)
  • rabbitmq的死信隊列

    rabbitmq的死信隊列

    目錄 成為死信的條件? 消息TTL過期? ?隊列達(dá)到最大長度 ?消息被拒 延遲隊列 ?延遲隊列使用場景 ?消息設(shè)置 TTL 隊列設(shè)置 TTL ?兩者區(qū)別 ? producer 將消息投遞到 broker 或者直接到 queue 里了, consumer 從 queue 取出消息 進(jìn)行消費,但某些時候由于特定的 原因?qū)е?queue 中的某些消

    2024年02月12日
    瀏覽(18)
  • RabbitMQ:死信隊列

    RabbitMQ:死信隊列

    ??個人主頁:不斷前進(jìn)的皮卡丘 ??博客描述:夢想也許遙不可及,但重要的是追夢的過程,用博客記錄自己的成長,記錄自己一步一步向上攀登的印記 ??個人專欄:消息中間件 隊列中不能被消費的消息稱為死信隊列 有時候因為特殊原因,可能導(dǎo)致隊列中的某些信息無法被消費

    2024年02月02日
    瀏覽(17)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包