国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

rabbitmq的發(fā)布確認(rèn)

這篇具有很好參考價(jià)值的文章主要介紹了rabbitmq的發(fā)布確認(rèn)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

生產(chǎn)者將信道設(shè)置成 confirm 模式,一旦信道進(jìn)入 confirm 模式, 所有在該信道上面發(fā)布的
消息都將會(huì)被指派一個(gè)唯一的 ID (從 1 開(kāi)始),一旦消息被投遞到所有匹配的隊(duì)列之后,broker
就會(huì)發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息的唯一 ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)
列了,如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會(huì)在將消息寫(xiě)入磁盤(pán)之后發(fā)出,broker 回傳
給生產(chǎn)者的確認(rèn)消息中 delivery-tag 域包含了確認(rèn)消息的序列號(hào),此外 broker 也可以設(shè)置
basic.ack 的 multiple 域,表示到這個(gè)序列號(hào)之前的所有消息都已經(jīng)得到了處理。

單個(gè)確認(rèn)發(fā)布 ?

這是一種簡(jiǎn)單的確認(rèn)方式,它是一種 同步確認(rèn)發(fā)布 的方式,也就是發(fā)布一個(gè)消息之后只有它
被確認(rèn)發(fā)布,后續(xù)的消息才能繼續(xù)發(fā)布,waitForConfirmsOrDie(long)這個(gè)方法只有在消息被確認(rèn)
的時(shí)候才返回,如果在指定時(shí)間范圍內(nèi)這個(gè)消息沒(méi)有被確認(rèn)那么它將拋出異常。
這種確認(rèn)方式有一個(gè)最大的缺點(diǎn)就是: 發(fā)布速度特別的慢, 因?yàn)槿绻麤](méi)有確認(rèn)發(fā)布的消息就會(huì)
阻塞所有后續(xù)消息的發(fā)布,這種方式最多提供每秒不超過(guò)數(shù)百條發(fā)布消息的吞吐量。當(dāng)然對(duì)于某
些應(yīng)用程序來(lái)說(shuō)這可能已經(jīng)足夠了。
import cn.hutool.core.lang.UUID;
import com.rabbitmq.client.Channel;

public class publishMessageIndividually {
    private static final int MESSAGE_COUNT = 5;
    

    public static void publishMessageIndividually() throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, false, false, false, null);
            //開(kāi)啟發(fā)布確認(rèn)
            channel.confirmSelect();
            long begin = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                channel.basicPublish("", queueName, null, message.getBytes());
                //服務(wù)端返回 false 或超時(shí)時(shí)間內(nèi)未返回,生產(chǎn)者可以消息重發(fā)
                boolean flag = channel.waitForConfirms();
                if (flag) {
                    System.out.println("消息發(fā)送成功");
                }
            }
            long end = System.currentTimeMillis();
            System.out.println("發(fā)布" + MESSAGE_COUNT + "個(gè)單獨(dú)確認(rèn)消息,耗時(shí)" + (end - begin) +
                    "ms");
        }
    }
}

耗時(shí)

rabbitmq的發(fā)布確認(rèn),rabbitmq,java-rabbitmq,rabbitmq,java

?批量確認(rèn)發(fā)布

上面那種方式非常慢,與單個(gè)等待確認(rèn)消息相比,先發(fā)布一批消息然后一起確認(rèn)可以極大地
提高吞吐量,當(dāng)然這種方式的缺點(diǎn)就是:當(dāng)發(fā)生故障導(dǎo)致發(fā)布出現(xiàn)問(wèn)題時(shí),不知道是哪個(gè)消息出現(xiàn)
問(wèn)題了,我們必須將整個(gè)批處理保存在內(nèi)存中,以記錄重要的信息而后重新發(fā)布消息。當(dāng)然這種
方案仍然是同步的,也一樣阻塞消息的發(fā)布。


import cn.hutool.core.lang.UUID;
import com.rabbitmq.client.Channel;




public class publishMessageBatch {
    private static final int MESSAGE_COUNT = 5;

    public static void publishMessageBatch() throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, false, false, false, null);
            //開(kāi)啟發(fā)布確認(rèn)
            channel.confirmSelect();
            //批量確認(rèn)消息大小
            int batchSize = 100;
            //未確認(rèn)消息個(gè)數(shù)
            int outstandingMessageCount = 0;
            long begin = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                channel.basicPublish("", queueName, null, message.getBytes());
                outstandingMessageCount++;
                if (outstandingMessageCount == batchSize) {
                    channel.waitForConfirms();
                    outstandingMessageCount = 0;
                }
            }
            //為了確保還有剩余沒(méi)有確認(rèn)消息 再次確認(rèn)
            if (outstandingMessageCount > 0) {
                channel.waitForConfirms();
            }
            long end = System.currentTimeMillis();
            System.out.println("發(fā)布" + MESSAGE_COUNT + "個(gè)批量確認(rèn)消息,耗時(shí)" + (end - begin) +
                    "ms");
        }
    }

    public static void main(String[] args) throws Exception {
        publishMessageBatch.publishMessageBatch();
    }
}

?耗時(shí)

rabbitmq的發(fā)布確認(rèn),rabbitmq,java-rabbitmq,rabbitmq,java

?異步確認(rèn)發(fā)布

異步確認(rèn)雖然編程邏輯比上兩個(gè)要復(fù)雜,但是性價(jià)比最高,無(wú)論是可靠性還是效率都沒(méi)得說(shuō),
他是利用回調(diào)函數(shù)來(lái)達(dá)到消息可靠性傳遞的,這個(gè)中間件也是通過(guò)函數(shù)回調(diào)來(lái)保證是否投遞成功,
下面就讓我們來(lái)詳細(xì)講解異步確認(rèn)是怎么實(shí)現(xiàn)的。


import cn.hutool.core.lang.UUID;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;

import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class publishMessageAsync {
    private static final int MESSAGE_COUNT = 5;

    public static void publishMessageAsync() throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, false, false, false, null);
            //開(kāi)啟發(fā)布確認(rèn)
            channel.confirmSelect();
            /**
             * 線程安全有序的一個(gè)哈希表,適用于高并發(fā)的情況
             * 1.輕松的將序號(hào)與消息進(jìn)行關(guān)聯(lián)
             * 2.輕松批量刪除條目 只要給到序列號(hào)
             * 3.支持并發(fā)訪問(wèn)
             */
            ConcurrentSkipListMap<Long, String> outstandingConfirms = new
                    ConcurrentSkipListMap<>();
            /**
             * 確認(rèn)收到消息的一個(gè)回調(diào)
             * 1.消息序列號(hào)
             * 2.true 可以確認(rèn)小于等于當(dāng)前序列號(hào)的消息
             * false 確認(rèn)當(dāng)前序列號(hào)消息
             */
            ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
                if (multiple) {
                    //返回的是小于等于當(dāng)前序列號(hào)的未確認(rèn)消息集合 是一個(gè) map
                    ConcurrentNavigableMap<Long, String> confirmed =
                            outstandingConfirms.headMap(sequenceNumber, true);
                    //清除該部分未確認(rèn)消息集合
                    confirmed.clear();
                }else{
                    //只清除當(dāng)前序列號(hào)的消息
                    outstandingConfirms.remove(sequenceNumber);
                }
            };
            ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
                String message = outstandingConfirms.get(sequenceNumber);
                System.out.println("發(fā)布的消息"+message+"未被確認(rèn),序列號(hào)"+sequenceNumber);
            };
            /**
             * 添加一個(gè)異步確認(rèn)的監(jiān)聽(tīng)器
             * 1.確認(rèn)收到消息的回調(diào)
             * 2.未收到消息的回調(diào)
             */
            channel.addConfirmListener(ackCallback, nackCallback);
            long begin = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "消息" + i;
                /**
                 * channel.getNextPublishSeqNo()獲取下一個(gè)消息的序列號(hào)
                 * 通過(guò)序列號(hào)與消息體進(jìn)行一個(gè)關(guān)聯(lián)
                 * 全部都是未確認(rèn)的消息體
                 */
                outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
                channel.basicPublish("", queueName, null, message.getBytes());
            }
            long end = System.currentTimeMillis();
            System.out.println("發(fā)布" + MESSAGE_COUNT + "個(gè)異步確認(rèn)消息,耗時(shí)" + (end - begin) +
                    "ms");
        }
    }

    public static void main(String[] args) throws Exception {
        publishMessageAsync.publishMessageAsync();
    }
}

耗時(shí)

rabbitmq的發(fā)布確認(rèn),rabbitmq,java-rabbitmq,rabbitmq,java

?以上 3 種發(fā)布確認(rèn)速度對(duì)比

單獨(dú)發(fā)布消息

同步等待確認(rèn),簡(jiǎn)單,但吞吐量非常有限。

批量發(fā)布消息

批量同步等待確認(rèn),簡(jiǎn)單,合理的吞吐量,一旦出現(xiàn)問(wèn)題但很難推斷出是那條

消息出現(xiàn)了問(wèn)題。

異步處理: 最佳性能和資源使用,在出現(xiàn)錯(cuò)誤的情況下可以很好地控制,但是實(shí)現(xiàn)起來(lái)稍微難些

rabbitmq的發(fā)布確認(rèn),rabbitmq,java-rabbitmq,rabbitmq,java文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-663076.html

到了這里,關(guān)于rabbitmq的發(fā)布確認(rèn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【RabbitMQ教程】第七章 —— RabbitMQ - 發(fā)布確認(rèn)高級(jí)

    【RabbitMQ教程】第七章 —— RabbitMQ - 發(fā)布確認(rèn)高級(jí)

    ?????????????????????????????????????????????????????????????????? ?? 【 R a b b i t M Q 教程】第七章—— R a b b i t M Q ? 發(fā)布確認(rèn)高級(jí) color{#FF1493}{【RabbitMQ教程】第七章 —— RabbitMQ - 發(fā)布確認(rèn)高級(jí)} 【 R abbi tMQ 教程】第七章 —— R abbi tMQ ?

    2024年02月09日
    瀏覽(17)
  • springboot整合rabbitmq發(fā)布確認(rèn)高級(jí)

    springboot整合rabbitmq發(fā)布確認(rèn)高級(jí)

    在生產(chǎn)環(huán)境中由于一些不明原因,導(dǎo)致 rabbitmq 重啟,在 RabbitMQ 重啟期間生產(chǎn)者消息投遞失敗,導(dǎo)致消息丟失,需要手動(dòng)處理和恢復(fù)。于是,我們?nèi)绾尾拍苓M(jìn)行 RabbitMQ 的消息可靠投遞。 ? 在配置文件當(dāng)中添加 ? spring.rabbitmq.publisher-confirm-type=correlated ? NONE:禁用發(fā)布確認(rèn)模式

    2024年02月11日
    瀏覽(17)
  • 8. springboot + rabbitmq 消息發(fā)布確認(rèn)機(jī)制

    8. springboot + rabbitmq 消息發(fā)布確認(rèn)機(jī)制

    在 RabbitMQ之生產(chǎn)者發(fā)布確認(rèn)原理章節(jié)已經(jīng)介紹了rabbitmq生產(chǎn)者是如何對(duì)消息進(jìn)行發(fā)布確認(rèn)保證消息不丟失的。本章節(jié)繼續(xù)看下springboot整合rabbitmq后是如何保證消息不丟失的。 消息正常是通過(guò)生產(chǎn)者生產(chǎn)消息傳遞到交換機(jī),然后經(jīng)過(guò)交換機(jī)路由到消息隊(duì)列中,最后消費(fèi)者消費(fèi),

    2023年04月25日
    瀏覽(20)
  • RabbitMq-發(fā)布確認(rèn)高級(jí)(避坑指南版)

    RabbitMq-發(fā)布確認(rèn)高級(jí)(避坑指南版)

    在初學(xué)rabbitMq的時(shí)候,伙伴們肯定已經(jīng)接觸到了“發(fā)布確認(rèn)”的概念,但是到了后期學(xué)習(xí)中,會(huì)接觸到“springboot”中使用“發(fā)布確認(rèn)”高級(jí)的概念。后者主要是解決什么問(wèn)題呢?或者是什么樣的場(chǎng)景引出這樣的概念呢? 廢話不說(shuō)直接開(kāi)始擼代碼!??!在代碼中解決實(shí)際問(wèn)題

    2024年02月11日
    瀏覽(17)
  • RabbitMQ系列(10)--RabbitMQ發(fā)布確認(rèn)模式的概念及實(shí)現(xiàn)

    RabbitMQ系列(10)--RabbitMQ發(fā)布確認(rèn)模式的概念及實(shí)現(xiàn)

    概念:雖然我們可以設(shè)置隊(duì)列和隊(duì)列中的消息持久化,但任然存在消息在持久化的過(guò)程中,即在寫(xiě)入磁盤(pán)的過(guò)程中,消息未完全寫(xiě)入,然后服務(wù)器宕機(jī)導(dǎo)致消息丟失的情況,發(fā)布確認(rèn)就是為了解決這種情況的概念,在消息完全寫(xiě)入磁盤(pán)后才確認(rèn)消息完全持久化了 1、發(fā)布確認(rèn)

    2024年02月13日
    瀏覽(24)
  • RabbitMQ:概念和安裝,簡(jiǎn)單模式,工作,發(fā)布確認(rèn),交換機(jī),死信隊(duì)列,延遲隊(duì)列,發(fā)布確認(rèn)高級(jí),其它知識(shí),集群

    RabbitMQ:概念和安裝,簡(jiǎn)單模式,工作,發(fā)布確認(rèn),交換機(jī),死信隊(duì)列,延遲隊(duì)列,發(fā)布確認(rèn)高級(jí),其它知識(shí),集群

    1.1.1.什么是MQ MQ(message queue:消息隊(duì)列) ,從字面意思上看,本質(zhì)是個(gè) 隊(duì)列 , FIFO 先入先出 ,只不過(guò)隊(duì)列中存放的 內(nèi)容是message 而已 ,還是一種 跨進(jìn)程的通信機(jī)制 , 用于上下游傳遞消息 。在互聯(lián)網(wǎng)架構(gòu)中,MQ 是一種非常常見(jiàn)的上下游 “邏輯解耦+物理解耦” 的消息通信服

    2024年01月20日
    瀏覽(92)
  • RabbitMQ學(xué)習(xí)——發(fā)布訂閱/fanout模式 & topic模式 & rabbitmq回調(diào)確認(rèn) & 延遲隊(duì)列(死信)設(shè)計(jì)

    RabbitMQ學(xué)習(xí)——發(fā)布訂閱/fanout模式 & topic模式 & rabbitmq回調(diào)確認(rèn) & 延遲隊(duì)列(死信)設(shè)計(jì)

    1.rabbitmq隊(duì)列方式的梳理,點(diǎn)對(duì)點(diǎn),一對(duì)多; 2.發(fā)布訂閱模式,交換機(jī)到消費(fèi)者,以郵箱和手機(jī)驗(yàn)證碼為例; 3.topic模式,根據(jù)規(guī)則決定發(fā)送給哪個(gè)隊(duì)列; 4.rabbitmq回調(diào)確認(rèn),setConfirmCallback和setReturnsCallback; 5.死信隊(duì)列,延遲隊(duì)列,創(chuàng)建方法,正?!佬?,設(shè)置延遲時(shí)間; 點(diǎn)對(duì)

    2024年02月13日
    瀏覽(92)
  • RabbitMQ基礎(chǔ)(2)——發(fā)布訂閱/fanout模式 & topic模式 & rabbitmq回調(diào)確認(rèn) & 延遲隊(duì)列(死信)設(shè)計(jì)

    RabbitMQ基礎(chǔ)(2)——發(fā)布訂閱/fanout模式 & topic模式 & rabbitmq回調(diào)確認(rèn) & 延遲隊(duì)列(死信)設(shè)計(jì)

    1.rabbitmq隊(duì)列方式的梳理,點(diǎn)對(duì)點(diǎn),一對(duì)多; 2.發(fā)布訂閱模式,交換機(jī)到消費(fèi)者,以郵箱和手機(jī)驗(yàn)證碼為例; 3.topic模式,根據(jù)規(guī)則決定發(fā)送給哪個(gè)隊(duì)列; 4.rabbitmq回調(diào)確認(rèn),setConfirmCallback和setReturnsCallback; 5.死信隊(duì)列,延遲隊(duì)列,創(chuàng)建方法,正?!佬?,設(shè)置延遲時(shí)間; 點(diǎn)對(duì)

    2024年02月10日
    瀏覽(98)
  • (七)「消息隊(duì)列」之 RabbitMQ 發(fā)布者確認(rèn)(使用 .NET 客戶端)

    (七)「消息隊(duì)列」之 RabbitMQ 發(fā)布者確認(rèn)(使用 .NET 客戶端)

    發(fā)布者確認(rèn) 是一個(gè) RabbitMQ 擴(kuò)展,用于實(shí)現(xiàn)可靠的發(fā)布。當(dāng)在通道上啟用發(fā)布者確認(rèn)時(shí),客戶端發(fā)布的消息將由代理 異步確認(rèn) ,這意味著它們已在服務(wù)器端得到處理。 先決條件 本教程假設(shè) RabbitMQ 已安裝并且正在 本地主機(jī) 的標(biāo)準(zhǔn)端口( 5672 )上運(yùn)行。如果您使用了不同的主

    2024年02月16日
    瀏覽(19)
  • springboot整合rabbitmq的發(fā)布確認(rèn),消費(fèi)者手動(dòng)返回ack,設(shè)置備用隊(duì)列,以及面試題:rabbitmq確保消息不丟失

    springboot整合rabbitmq的發(fā)布確認(rèn),消費(fèi)者手動(dòng)返回ack,設(shè)置備用隊(duì)列,以及面試題:rabbitmq確保消息不丟失

    目錄 1.生產(chǎn)者發(fā)消息到交換機(jī)時(shí)候的消息確認(rèn) 2.交換機(jī)給隊(duì)列發(fā)消息時(shí)候的消息確認(rèn) 3.備用隊(duì)列 3.消費(fèi)者手動(dòng)ack ? rabbitmq的發(fā)布確認(rèn)方式,可以有效的保證我們的數(shù)據(jù)不丟失。 ? 消息正常發(fā)送的流程是:生產(chǎn)者發(fā)送消息到交換機(jī),然后交換機(jī)通過(guò)路由鍵把消息發(fā)送給對(duì)應(yīng)的隊(duì)

    2024年02月09日
    瀏覽(28)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包