国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMQ 能保證消息可靠性嗎

這篇具有很好參考價值的文章主要介紹了RabbitMQ 能保證消息可靠性嗎。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

系列文章目錄

手把手教你,本地RabbitMQ服務搭建(windows)
消息隊列選型——為什么選擇RabbitMQ
RabbitMQ靈活運用,怎么理解五種消息模型
推或拉? RabbitMQ 消費模式該如何選擇
死信是什么,如何運用RabbitMQ的死信機制?



前言

前面我們在做MQ組件選型時,提到了rabbitMQ的消息可靠性,那么它到底可靠到什么程度?又是如何保證消息可靠性的呢?今天我們就一起來看一下


一、消息可靠性的定義

消息可靠性是指在消息傳遞過程中,確保消息能夠被完整、準確、可靠地傳遞到目的地。更具體的說分為兩個角度:

  1. 不會意外丟失
  2. 不會重復傳遞

因此,我們必須保證消息不會因為網(wǎng)絡故障、系統(tǒng)故障或其他異常原因而丟失或重復傳遞,否則可能導致業(yè)務邏輯錯誤、數(shù)據(jù)損壞或系統(tǒng)崩潰等問題

二、幾種不可靠的場景

  1. 消息漏發(fā)送:生產(chǎn)者在發(fā)送消息時,如果不觀察RabbitMQ服務器的確認消息,可能導致有些消息在網(wǎng)絡中丟失而不自知
  2. 消息重復發(fā)送:如果生產(chǎn)者在發(fā)送消息時,由于網(wǎng)絡抖動或者其他原因,生產(chǎn)者無法從RabbitMQ收到消息確認,此時生產(chǎn)者會重發(fā)同樣一條消息,從而導致消息重復
  3. 消息未儲存:rabbitMQ服務器宕機,導致已經(jīng)在rabbit服務器內(nèi)的消息直接丟失
  4. 消費者重復消費:如果消費者和MQ都不記得曾經(jīng)消費過的消息,主動拉取或推送了舊的消息,導致重復消費,

三、防意外丟失

在這里,必須提前聲明一點:即消息意外丟失因為rabbitMQ經(jīng)由轉換機,如果匹配不到任何隊列,是會主動丟棄該消息的,這種丟失屬于業(yè)務配置上的主動丟棄,不記在意外丟失中

1. 消息持久化

消息持久化需要在消息生產(chǎn)者修改代碼

   String MESSAGE = "Hello, RabbitMQ!";
   // 設置消息持久化
   AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
      .contentType("text/plain")
      .deliveryMode(2) // deliveryMode=1代表不持久化,deliveryMode=2代表持久化
      .build();
   channel.basicPublish("", MESSAGE_QUEUE, properties, MESSAGE.getBytes("UTF-8"));

也可以直接使用內(nèi)置的properties

   channel.basicPublish("", MESSAGE_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes("UTF-8"));

2. 隊列持久化

盡管我們上面已經(jīng)使用了消息持久化,但是這是不夠的,消息本身不會作為一個實體存在硬盤上,真正落在硬盤上的是隊列,及隊列中的消息。所以,要想保存消息,還得把消息所在的隊列持久化,因此需要在聲明隊列時,將其 durable 屬性設置為true

    // 設置隊列持久化
    boolean durable = true;
    channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

注意,該屬性不可修改,如果要把一個隊列改成持久化,得先刪除,再創(chuàng)建才行


3. 發(fā)布確認

我們上面已經(jīng)成功把消息做了持久化,不過這并不能徹底避免消息丟失,比如在消息發(fā)布者發(fā)布消息的過程中,在消息成功持久化之前,rabbitMQ就崩潰了,此時消息仍然會丟失。因此,有必要執(zhí)行發(fā)布確認的操作

即消息發(fā)送后,MQ要對生產(chǎn)者發(fā)送消息確認,確認已經(jīng)持久化后,再進行發(fā)布確認
RabbitMQ 能保證消息可靠性嗎,RabbitMQ,java架構,面試熱點,java-rabbitmq,rabbitmq,分布式
發(fā)布確認默認不開啟,如果要開啟,需要在channel上設置

    Channel channel = connection.createChannel();
    // 將信道設置為發(fā)布確認
    channel.confirmSelect();

進行完該項設置后,還需要針對確認消息的類型,適當?shù)男薷陌l(fā)送方代碼。一般來說,發(fā)布確認有以下類型

3.1 簡單發(fā)布確認

即發(fā)送后,單條單條的消息是否被rabbitMQ服務器接受

	String message = "Hello, RabbitMQ!";
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
     // 設置簡單發(fā)布確認
    channel.confirmSelect();
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        
    if (channel.waitForConfirms()) {
         System.out.println("Message published successfully.");
    } else {
         System.err.println("Failed to publish message.");
    }

可以看到,這種方式其實采用的是發(fā)一條消息,確認一次,效率并不高。

3.2 批量發(fā)布確認

批量發(fā)布和簡單發(fā)布,在調用方法上并沒有區(qū)別,只是發(fā)送的消息,從發(fā)一條就等待確認一次,變成了發(fā)一批,才確認一次。

	int MESSAGE_COUNT = 100;
    String message = "Hello, RabbitMQ!";
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 設置批量發(fā)布確認
    channel.confirmSelect();
    for (int i = 0; i < MESSAGE_COUNT; i++) {
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    }

    int outstandingConfirms = MESSAGE_COUNT;
    while (outstandingConfirms > 0) {
        outstandingConfirms -= channel.waitForConfirms();
    }
    System.out.println("All messages published successfully.");

此種方式,雖然仍然會同步阻塞,但從每條確認一次進化到批量確認一次,大大節(jié)約了網(wǎng)絡耗時。但是可能會出現(xiàn)一些消息發(fā)布成功,但是一些消息未成功的情況,不易進行排查和處理。

3.3 異步發(fā)布確認

異步確認則采用的另一種方案,通過給channel設置一個確認監(jiān)聽器,來異步的做確認,即將發(fā)布消息和確認處理放在不同的線程中處理

   int MESSAGE_COUNT = 100;
   String message = "Hello, RabbitMQ!";
   ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
   Set<Long> failConfirmMessages = new HashSet<>();
   // 異步發(fā)布確認
   channel.confirmSelect();
   // 需設置兩個監(jiān)聽器,前者為肯定確認,后者為否定確認
   channel.addConfirmListener(new ConfirmCallback() {
       @Override
       // deliveryTag 代表 投遞消息的序號;multiple為true,則代表確認所有小于或等于當前消息deliveryTag的狀態(tài),為false,代表僅確認該條消息
       public void handle(long deliveryTag, boolean multiple) throws IOException {
           if (multiple) {
               ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
               // 清除所有小于該序號的消息
               confirmed.clear();
           } else {
           	   // 僅清除本條消息
               outstandingConfirms.remove(deliveryTag);
           }
       }
   }, new ConfirmCallback() {
       @Override
       public void handle(long deliveryTag, boolean multiple) throws IOException {
           System.err.println("Failed to publish message.");
           failConfirmMessages.add(deliveryTag);
       }
   });
   for (int i = 0; i < MESSAGE_COUNT; i++) {
       long nextSeqNo = channel.getNextPublishSeqNo();
       channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
       outstandingConfirms.put(nextSeqNo, message);
   }
   // 一段時間過后
   ......
   // 看最后是否還有消息被確認丟失,此時可選擇是否要重新發(fā)送
   if (failConfirmMessages .size() == 0 && outstandingConfirms.size() == 0) {
   		System.out.println("All messages published successfully.");
   } else {
		System.out.println("Some messages need republish.");
   }
   

RabbitMQ 能保證消息可靠性嗎,RabbitMQ,java架構,面試熱點,java-rabbitmq,rabbitmq,分布式

通過異步方式做確認,能提升性能,缺點是需要一些多線程的知識,實現(xiàn)難度較高。

4. 手動接收確認

如果第三點,是保證消息發(fā)送者到MQ服務器之間,消息不會丟失。那么同理,還需要保證MQ服務器到消費者間,消息不會丟失。

這時候,就需要手動接收確認了,即消費者得到消息后,先進行業(yè)務處理(或消息存儲),直到業(yè)務處理完成后。再告知rabbitMQ服務器,消息我收到了。從而避免了自動ack后,消費者宕機導致的消息未處理完就丟失的問題,其示例代碼如下

 // 創(chuàng)建消費者對象
 final Consumer consumer = new DefaultConsumer(channel) {
     @Override
     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
         String message = new String(body, "UTF-8");

         try {
             // 處理消息
             System.out.println("Received message: " + message);

             // 顯式 ack 消息
             channel.basicAck(envelope.getDeliveryTag(), false); // 第二個參數(shù)表示是否批量處理

         } catch (Exception ex) {
             // 處理消息時發(fā)生異常,拒絕消息并重新將其放回隊列中
             channel.basicNack(envelope.getDeliveryTag(), false, true);
         }
     }
 };

 // 開始消費消息,使用手動ack
 boolean autoAck = false;
 channel.basicConsume(QUEUE_NAME, autoAck, consumer);

PS:需要注意的是,手動ack可能帶來重復消費的問題,比如消息處理成功后,在執(zhí)行channel.basicAck時宕機,導致RabbitMQ服務器沒收到消息接收確認的信號,超時后會認為該消息未被接收

5. 死信隊列

在某些情況下(如手動ACK),如消費者在暫時無法處理該消息,RabbitMQ 可能會將消息重新放回隊列,但大量的重新放回會導致消息堆積,也是不可取的。

// 如下,消費者可以向rabbitMQ發(fā)送nack的消息,且設置requeue參數(shù)為false
 void basicNack(long deliveryTag, boolean multiple, boolean requeue)
            throws IOException;

為了避免這種情況,RabbitMQ 提供了死信隊列的功能。當消息因為某些原因不能被消費時,RabbitMQ 將消息放入死信隊列而不是重新放回隊列,防止消息丟失
RabbitMQ 能保證消息可靠性嗎,RabbitMQ,java架構,面試熱點,java-rabbitmq,rabbitmq,分布式

四、防重復傳遞

上面一節(jié),我們?yōu)閞abbitMQ在消息傳遞過程中,各個節(jié)點都有防消息丟失的配置。這一節(jié),我們來說rabbitMQ為了防止一條消息重復傳遞而做的努力

1. 消息確認機制

上面,我們說了發(fā)布確認和接收確認。其實,不管是發(fā)布和接收,這都屬于消息確認機制的一種,而消息確認機制是AMQP協(xié)議所規(guī)定的。發(fā)布確認是為了防止丟失消息,接收確認則是為了防止重復消費,當消費者成功接收到消息并完成處理后,發(fā)送確認通知給 RabbitMQ,RabbitMQ 才會將該消息標記為已消費,防止重復傳遞

2. 冪等性校驗(需代碼實現(xiàn))

在消息生產(chǎn)者發(fā)送消息之前,消息可以被設置上全局唯一uuid,而消費者在消費前,則會判斷該uuid是否已經(jīng)消費過。

// 生產(chǎn)者發(fā)送消息之前,將消息標記為idempotent
// 通過設置 messageId 屬性為一個唯一值,即可標記該消息為冪等消息
String messageId = UUID.randomUUID().toString();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .messageId(messageId)
        .build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());

// 消費者在處理消息之前,檢查該消息是否已經(jīng)被消費過
// 如果該消息已經(jīng)被消費過,則直接確認消息
String messageId = properties.getMessageId();
if (processedIds.contains(messageId)) {
    channel.basicAck(envelope.getDeliveryTag(), false);
    return;
}
// 處理消息,并將 messageId 加入已處理集合
// ...
processedIds.add(messageId);

以上代碼僅展示原理,實際上分布式高并發(fā)的情況下,uuid應該交由專門的服務器用雪花算法等方式去產(chǎn)生全局唯一的uuid。同樣消費者處的processedIds也會進行遠端存儲

五、不可靠場景的對策

現(xiàn)在,讓我們回頭來看看不可靠場景下,rabbitMQ和我們開發(fā)者能用什么對策解決

  1. 消息漏發(fā)送:生產(chǎn)者在發(fā)送消息時,如果不觀察RabbitMQ服務器的確認消息,可能導致有些消息在網(wǎng)絡中丟失而不自知
  2. 消息重復發(fā)送:如果生產(chǎn)者在發(fā)送消息時,由于網(wǎng)絡抖動或者其他原因,生產(chǎn)者無法從RabbitMQ收到消息確認,此時生產(chǎn)者會重發(fā)同樣一條消息,從而導致消息重復
  3. 消息未儲存:rabbitMQ服務器宕機,導致已經(jīng)在rabbit服務器內(nèi)的消息直接丟失
  4. 消費者重復消費:如果消費者不記得曾經(jīng)消費過的消息,主動拉取或被推送了舊的消息,導致重復消費,
場景 場景解釋 解決對策
消息漏發(fā)送 生產(chǎn)者在發(fā)送消息時,如果不觀察RabbitMQ服務器的確認消息,可能導致有些消息在網(wǎng)絡中丟失而不自知 發(fā)布確認
消息重復發(fā)送 如果生產(chǎn)者在發(fā)送消息時,由于網(wǎng)絡抖動或者其他原因,生產(chǎn)者無法從RabbitMQ收到消息確認,此時生產(chǎn)者會重發(fā)同樣一條消息,從而導致消息重復 無策略
消息未儲存 rabbitMQ服務器宕機,導致已經(jīng)在rabbit服務器內(nèi)的消息直接丟失 隊列、消息持久化
消費者重復消費 如果消費者和MQ都不記得曾經(jīng)消費過的消息,主動拉取或推送了舊的消息,導致重復消費 接受確認、冪等性校驗(代碼實現(xiàn))

六、總結

RabbitMQ 能保證消息可靠性嗎?答案是絕大部分情況可靠,但僅靠其自身機制無法做到100%。比如對于沒有收到發(fā)布確認信息,導致消息生產(chǎn)者重復傳遞這種場景就并沒有好的辦法,只能通過開發(fā)者額外代碼去解決,比如發(fā)消息帶全局唯一id,然后由消費者去做冪等性校驗。而針對更極端的場景,如RabbitMQ硬盤故障導致消息丟失,就得依托鏡像部署等手段去處理了文章來源地址http://www.zghlxwxcb.cn/news/detail-577543.html

到了這里,關于RabbitMQ 能保證消息可靠性嗎的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 如何保證 RabbitMQ 的消息可靠性?

    如何保證 RabbitMQ 的消息可靠性?

    項目開發(fā)中經(jīng)常會使用消息隊列來 完成異步處理、應用解耦、流量控制等功能 。雖然消息隊列的出現(xiàn)解決了一些場景下的問題,但是同時也引出了一些問題,其中使用消息隊列時如何保證消息的可靠性就是一個常見的問題。 如果在項目中遇到需要保證消息一定被消費的場景

    2024年02月07日
    瀏覽(27)
  • Rabbitmq怎么保證消息的可靠性?

    Rabbitmq怎么保證消息的可靠性?

    一、消費端消息可靠性保證 : 消息確認(Acknowledgements) : 消費者在接收到消息后,默認情況下RabbitMQ會自動確認消息(autoAck=true)。為保證消息可靠性,可以設置autoAck=false,使得消費者在處理完消息后手動發(fā)送確認(basicAck)。如果消費者在處理過程中發(fā)生異?;蛘呶赐瓿?/p>

    2024年04月14日
    瀏覽(25)
  • 【SpringBoot】 整合RabbitMQ 保證消息可靠性傳遞

    【SpringBoot】 整合RabbitMQ 保證消息可靠性傳遞

    生產(chǎn)者端 目錄結構 導入依賴 修改yml 業(yè)務邏輯 測試結果 ??????? 在publisher-confirm-type中有三個確認消息接受類型:none、correlated、simple。 ??????? publisher-confirm-type: none 表示 禁用發(fā)布確認模式 。是 默認值 。使用此模式之后,不管消息有沒有發(fā)送到Broker(RabbitMQ)都不會

    2024年02月10日
    瀏覽(24)
  • 如何保證消息的可靠性(面試題)

    如何保證消息的可靠性(面試題)

    面試題 :Rebbitmq怎么保證消息的可靠性 消費者在接收到消息后,默認情況下RabbitMQ會自動確認消息(autoAck=true)。為保證消息可靠性,可以設置autoAck=false,使得消費者在處理完消息后手動發(fā)送確認(basicAck)。如果消費者在處理過程中發(fā)生異?;蛘呶赐瓿商幚砭徒K止運行,那

    2024年04月14日
    瀏覽(26)
  • rabbitmq如何保證消息的可靠性傳輸(簡述版本)?

    我需要從三點去考慮, 生產(chǎn)者弄丟了數(shù)據(jù),生產(chǎn)者將消息發(fā)送的Exchange并且路由到隊列 隊列需要將消息給它持久化 消費者要成功消費隊列中的消息 RabbitMQ提供了confirm機制,保證了消息消息發(fā)送的Exchange交換機,那么還提供了return機制,可以保證消息從exchange路由到隊列中,如

    2024年02月13日
    瀏覽(23)
  • RabbitMQ如何保證消息的可靠性6000字詳解

    RabbitMQ如何保證消息的可靠性6000字詳解

    RabbitMQ通過生產(chǎn)者、消費者以及MQ Broker達到了解耦的特點,實現(xiàn)了異步通訊等一些優(yōu)點,但是在消息的傳遞中引入了MQ Broker必然會帶來一些其他問題,比如如何保證消息在傳輸過程中可靠性(即不讓數(shù)據(jù)丟失,發(fā)送一次消息就會被消費一次)?這篇博客將詳細從生產(chǎn)者,MQ B

    2024年02月16日
    瀏覽(26)
  • RabbitMQ高級特性解析:消息投遞的可靠性保證與消費者ACK機制探究

    RabbitMQ高級特性解析:消息投遞的可靠性保證與消費者ACK機制探究

    學習RabbitMQ高級特性,涵蓋消息的持久化、確認模式、退回模式以及消費者ACK機制等方面,助您構建高可靠性的消息隊列系統(tǒng)。

    2024年01月16日
    瀏覽(18)
  • RabbitMQ如何保證消息可靠性,看完這篇文章佬會有新的理解

    RabbitMQ如何保證消息可靠性,看完這篇文章佬會有新的理解

    前言:大家好,我是小威,24屆畢業(yè)生,在一家滿意的公司實習。本篇文章將詳細介紹RabbitMQ的消息可靠性機制,如消息丟失,消息重復性消費,消息積壓等問題。 如果文章有什么需要改進的地方還請大佬不吝賜教 ????。 小威在此先感謝各位大佬啦~~???? ??個人主頁:小

    2024年02月03日
    瀏覽(29)
  • 【云原生進階之PaaS中間件】第四章RabbitMQ-4.3-如何保證消息的可靠性投遞與消費

    【云原生進階之PaaS中間件】第四章RabbitMQ-4.3-如何保證消息的可靠性投遞與消費

    ????????根據(jù)RabbitMQ的工作模式,一條消息從生產(chǎn)者發(fā)出,到消費者消費,需要經(jīng)歷以下4個步驟: 生產(chǎn)者將消息發(fā)送給RabbitMQ的Exchange交換機; Exchange交換機根據(jù)Routing key將消息路由到指定的Queue隊列; 消息在Queue中暫存,等待消費者消費消息; 消費者從Queue中取出消息消費

    2024年03月11日
    瀏覽(28)
  • RabbitMQ可靠性消息發(fā)送(java實現(xiàn))

    RabbitMQ可靠性消息發(fā)送(java實現(xiàn))

    本博客屬于 《RabbitMQ基礎組件封裝—整體結構》的子博客 step1:消息落庫,業(yè)務數(shù)據(jù)存庫的同時,也要將消息記錄存入數(shù)據(jù)庫,二者要保證原子性; step2:Producer發(fā)送消息到MQ Broker; step3:Producer收到 broker 返回的確認消息; step4:更改消息記錄庫的狀態(tài)(定義三種狀態(tài):0待確

    2024年02月04日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包