国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMQ如何保證消息的可靠性6000字詳解

這篇具有很好參考價值的文章主要介紹了RabbitMQ如何保證消息的可靠性6000字詳解。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

RabbitMQ通過生產(chǎn)者、消費者以及MQ Broker達到了解耦的特點,實現(xiàn)了異步通訊等一些優(yōu)點,但是在消息的傳遞中引入了MQ Broker必然會帶來一些其他問題,比如如何保證消息在傳輸過程中可靠性(即不讓數(shù)據(jù)丟失,發(fā)送一次消息就會被消費一次)?這篇博客將詳細從生產(chǎn)者,MQ Broker以及消費者的角度講解如何保證消息的可靠性!

1,消息丟失的情況

1.1 消息傳遞流程圖如下

RabbitMQ如何保證消息的可靠性6000字詳解,RabbitMQ,java-rabbitmq,rabbitmq,java

?Producer -> exchange ->queue -> Consumer(其中exchange和queue屬于MQ Broker的組件)

1.2 消息可能丟失的情況

  • 生產(chǎn)者給交換機exchange的過程中發(fā)生數(shù)據(jù)丟失;
  • 交換機exchange路由給隊列queue的過程中發(fā)生數(shù)據(jù)丟失;
  • 消息到達MQ的一瞬間,MQ發(fā)生了宕機的情況造成數(shù)據(jù)丟失;
  • 消費者從隊列queue中取出消息進行消費的一瞬間消費者宕機了造成數(shù)據(jù)丟失。

2,生產(chǎn)者確認機制

生產(chǎn)者確認機制主要是站在生產(chǎn)者的角度來保證消息的可靠性,針對的是生產(chǎn)者給交換機發(fā)送消息以及交換機給隊列發(fā)送消息的過程中數(shù)據(jù)丟失的情況!

2.1 書寫配置信息

# 配置日志信息
logging:
  pattern:
    dateformat: HH:mm:ss:SSS
  level:
    cn.itcast: debug

spring:
  rabbitmq:
    host: 123.207.72.43 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: admin
    password: 123
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    #消息發(fā)送失敗時執(zhí)行returnCallback回調(diào)函數(shù)
    template:
      mandatory: true
  • publisher-confirm-type表示開啟publisher-confirm;這個參數(shù)有兩種類型,分別是correlated和simple(correlated代表異步等待回調(diào),類似于js中發(fā)送的ajax請求的回調(diào)函數(shù),MQ返回結(jié)果時會執(zhí)行定義的confirmCallback函數(shù);simple代表同步等待confirm結(jié)果直到超時);
  • publisher-returns表示開啟publish-return功能,同樣是基于callback機制,不過是定義returnCallback;
  • template.mandatory定義消息路由失敗時的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息。

2.2 定義return回調(diào)機制

我們使用的是SpringBoot來整合的RabbitMQ,所以不論是return回調(diào)還是confim回調(diào)都是用rabbittemplate對象進行定義的。

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //獲取獲取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            //記錄日志
            log.error("消息發(fā)送隊列失敗,響應(yīng)碼:{},失敗原因:{},交換機:{},路由key:{},消息:{}",
                    replyCode,replyText,exchange,routingKey,message.toString());
            //如果需要的話進行消息的重發(fā)
        });
    }
}

注意:

  1. 一個RabbitTemplate只能配置一個ReturnCallback,所以需要在項目啟動的時候進行定義,這樣rabbitTemplate就是全局唯一的了(也可以采用PostConstruct注解中的init方法進行定義);
  2. ApplicationContextAware是Spring創(chuàng)建完Bean工廠之后的通知方法,當Spring創(chuàng)建完Bean工廠之后就可以在Spring容器中拿到RabbitTemplate對象了;
  3. 配置ReturnCallback時可以采用匿名內(nèi)部類的方法簡化代碼,如果消息發(fā)送失敗可以根據(jù)需要進行消息重發(fā)操作。

2.3?定義confirm回調(diào)機制

ConfirmCallback可以在發(fā)送消息時指定,因為每個業(yè)務(wù)處理confirm成功或失敗的邏輯不一定相同,可以通過測試方法進行定義。

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage() throws InterruptedException {
        //1.準備消息
        String message = "hello spring amqp";

        //2.準備CorrelationData
        //2.1 消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //2.2 準備準備ConfirmCallback
        correlationData.getFuture().addCallback(confirm -> {
            if (confirm.isAck()) {
                log.debug("消息成功投遞到交換機!消息ID:{}", correlationData.getId());
            } else {
                log.error("消息投遞到交換機上失??!消息ID:{}", correlationData.getId());
                //重發(fā)消息
            }
        }, throwable -> {
            //記錄日志
            log.error("發(fā)送消息失敗!",throwable);
            //重發(fā)消息
        });

        //3.發(fā)送消息
        rabbitTemplate.convertAndSend("amq.topic","a.simple.hello",message,correlationData);
        //加上休眠時間 避免mq連接直接關(guān)閉
        Thread.sleep(1000);
    }

注意:

  1. 生產(chǎn)者給交換機發(fā)送的消息數(shù)據(jù)很多的,為了區(qū)分每個消息的歸屬,每個消息都要附屬上一個ID信息,可以采用UUID的方式生成唯一身份標識;
  2. 在發(fā)送消息的時候需要增加一個correlation變量,這個變量記錄了兩個東西(1.每個消息的ID 2.定義的cinfirm回調(diào)機制);
  3. 加上線程休眠的操作是為了避免消息發(fā)送到交換機之后mq的連接直接關(guān)閉,這樣會導(dǎo)致返回ack的錯誤。

3,消息持久化

消息持久化是站在MQ Broker的角度來保證消息的可靠性的,將交換機、隊列以及消息設(shè)置成持久化的從而避免MQ宕機造成消息的丟失!

3.1 交換機持久化

@Bean
    public DirectExchange simpleDirect(){
        return new DirectExchange("simple.direct",true,false);
    }

第二個參數(shù)設(shè)置成true就是讓就交換機是可持久化的,第三個參數(shù)是是否自動刪除,一般設(shè)為false;

3.2 隊列持久化

@Bean
    public Queue simpleQueue(){
        return QueueBuilder.durable("simple.queue").build();
    }

durable的意思就是可持久化的,傳入隊列名稱然后進行build操作,這樣創(chuàng)建的隊列就是一個可持久化的隊列;

3.3 消息持久化

將交換機和隊列設(shè)置為持久化的之后重啟MQ服務(wù)器之后消息依然會丟失,因為發(fā)送的消息不是可持久化的,所以也需要將消息設(shè)置成可持久化的

4,消費者消息確認

消費者消息確認是站在消費者的角度來保證消息可靠性的,消息者處理完一條消息之后需要給MQ Broker返回一條ACK表示消息處理完成!

4.1 三種確認模式

RabbitMQ支持消費者確認機制,即:消費者處理消息后可以向MQ發(fā)送ack回執(zhí),MQ收到ack回執(zhí)后才會刪除該消息。而SpringAMQP則允許配置三種確認模式:

  • manual:手動ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack;
  • auto:自動ack,由spring監(jiān)測listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack;
  • none:關(guān)閉ack,MQ假定消費者獲取消息后會成功處理,因此消息投遞后立即被刪除。

4.2 none模式的演示

1.修改消費者工程中的配置文件
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 關(guān)閉ack
2.監(jiān)聽一個隊列,在監(jiān)聽的方法中模擬一個異常情況,觀察消息是否會被刪除
@RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        log.debug("消費者接收到simple.queue的消息:【" + msg + "】");
        //這里模擬一個異常
        System.out.println(1 / 0);
        log.info("消費者處理消息成功!");
    }
3.在rabbitmq控制臺模擬發(fā)送一條消息,觀察拋出異常之后消息是否會重發(fā)

RabbitMQ如何保證消息的可靠性6000字詳解,RabbitMQ,java-rabbitmq,rabbitmq,java

?拋出異常消費者并沒有處理消息成功,再觀察控制臺是否將消息刪除:

RabbitMQ如何保證消息的可靠性6000字詳解,RabbitMQ,java-rabbitmq,rabbitmq,java

?隊列中已經(jīng)沒有消息了,說明消息被刪除了!

消費者確認機制為none的時候,只要消費者拿到消息之后MQ就會把消息刪除,不關(guān)心消費者是否將消息成功處理!

4.3 auto模式的演示

1.修改消費者工程中的配置文件
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 關(guān)閉ack
?2.監(jiān)聽一個隊列,在監(jiān)聽的方法中模擬一個異常情況,觀察消息是否會被刪除
@RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        log.debug("消費者接收到simple.queue的消息:【" + msg + "】");
        //這里模擬一個異常
        System.out.println(1 / 0);
        log.info("消費者處理消息成功!");
    }
3.在rabbitmq控制臺模擬發(fā)送一條消息,觀察拋出異常之后消息是否會重發(fā)

RabbitMQ如何保證消息的可靠性6000字詳解,RabbitMQ,java-rabbitmq,rabbitmq,java

RabbitMQ如何保證消息的可靠性6000字詳解,RabbitMQ,java-rabbitmq,rabbitmq,java

消費者確認機制為auto的時候,消費者拿到消息之后MQ并不會立刻刪除隊列中的消息,只有消費者成功處理完消息之后給隊列返回一個ack的時候隊列才會刪除消息!

5, 消費者失敗重試機制

我們發(fā)現(xiàn)當消費者確認機制為auto時,如果代碼中出現(xiàn)了異常,消息會進行重復(fù)入隊列(requeue)的操作,重復(fù)入隊的操作對于MQ來說開銷會非常大,消息處理飆升,所以引入了失敗重試機制:當代碼中出現(xiàn)了異常的時候,消費者內(nèi)部會進行重發(fā)的操作(可以控制重發(fā)的時間和次數(shù)),如果超過設(shè)置的重發(fā)次數(shù)消費者還未成功處理消息默認將消息丟棄!

5.1 本地重試

Spring的retry機制,在消費者出現(xiàn)異常時利用本地重試,而不是無限制的requeue到mq隊列,可以在消費者工程的yml文件中添加如下配置:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 開啟消費者失敗重試
          initial-interval: 1000 # 初識的失敗等待時長為1秒
          multiplier: 3 # 失敗的等待時長倍數(shù),下次等待時長 = multiplier * last-interval
          max-attempts: 4 # 最大重試次數(shù)
          stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false

RabbitMQ如何保證消息的可靠性6000字詳解,RabbitMQ,java-rabbitmq,rabbitmq,java

?4次重發(fā)之后消息還未成功處理spring拋出了AmqpRejectAndDontRequeueException異常,這是失敗之后的默認處理方式,默認消費者給隊列返回了ack,此時隊列會將消息從隊列中刪除!

5.2 失敗策略

失敗達到最大重試次數(shù)后,消息會被丟棄,這是由Spring內(nèi)部機制決定的。在開啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有MessageRecovery接口來處理,它包含三種不同的實現(xiàn):

  • RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息,默認就是這種方式;
  • ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊;
  • RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機。

如果消息這個消息比較重要,達到最大重試次數(shù)之后這個消息不能被丟棄該怎么辦,此時就可以使用RepublishMessageRecoverer,失敗后將消息投遞到一個指定的,專門存放異常消息的隊列,后續(xù)由人工集中處理。

@Configuration
public class ErrorMessageConfig {
    //定義失敗之后處理的交換機和隊列
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }

    //將交換機和隊列進行綁定
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }


    //定義一個RepublishMessageRecoverer,替換spring默認的處理機制?
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

流程圖如下:

RabbitMQ如何保證消息的可靠性6000字詳解,RabbitMQ,java-rabbitmq,rabbitmq,java文章來源地址http://www.zghlxwxcb.cn/news/detail-577678.html

?6, 如何保證RabbitMQ消息的可靠性?

  • 開啟生產(chǎn)者確認機制,確保生產(chǎn)者的消息能到達隊列;
  • 開啟持久化功能,確保消息未消費前在隊列中不會丟失;
  • 開啟消費者確認機制為auto,由spring確認消息處理成功后完成ack;
  • 開啟消費者失敗重試機制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機,交由人工處理。

到了這里,關(guān)于RabbitMQ如何保證消息的可靠性6000字詳解的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • RabbitMQ-保證消息可靠性

    RabbitMQ-保證消息可靠性

    消息從發(fā)送,到消費者接收,會經(jīng)理多個過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時丟失: 生產(chǎn)者發(fā)送的消息未送達exchange 消息到達exchange后未到達queue MQ宕機,queue將消息丟失 consumer接收到消息后未消費就宕機 針對這些問題,RabbitMQ分別給出了

    2024年02月07日
    瀏覽(31)
  • RabbitMQ保證消息的可靠性

    RabbitMQ保證消息的可靠性

    消息從發(fā)送,到消費者接收,會經(jīng)理多個過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時丟失: 生產(chǎn)者發(fā)送的消息未送達exchange 消息到達exchange后未到達queue MQ宕機,queue將消息丟失 consumer接收到消息后未消費就宕機 針對這些問題,RabbitMQ分別給出了

    2024年02月19日
    瀏覽(23)
  • RabbitMQ如何保證消息可靠性,看完這篇文章佬會有新的理解

    RabbitMQ如何保證消息可靠性,看完這篇文章佬會有新的理解

    前言:大家好,我是小威,24屆畢業(yè)生,在一家滿意的公司實習。本篇文章將詳細介紹RabbitMQ的消息可靠性機制,如消息丟失,消息重復(fù)性消費,消息積壓等問題。 如果文章有什么需要改進的地方還請大佬不吝賜教 ????。 小威在此先感謝各位大佬啦~~???? ??個人主頁:小

    2024年02月03日
    瀏覽(29)
  • RabbitMQ 能保證消息可靠性嗎

    RabbitMQ 能保證消息可靠性嗎

    手把手教你,本地RabbitMQ服務(wù)搭建(windows) 消息隊列選型——為什么選擇RabbitMQ RabbitMQ靈活運用,怎么理解五種消息模型 推或拉? RabbitMQ 消費模式該如何選擇 死信是什么,如何運用RabbitMQ的死信機制? 前面我們在做MQ組件選型時,提到了rabbitMQ的消息可靠性,那么它到底可靠

    2024年02月16日
    瀏覽(29)
  • Rabbitmq怎么保證消息的可靠性?

    Rabbitmq怎么保證消息的可靠性?

    一、消費端消息可靠性保證 : 消息確認(Acknowledgements) : 消費者在接收到消息后,默認情況下RabbitMQ會自動確認消息(autoAck=true)。為保證消息可靠性,可以設(shè)置autoAck=false,使得消費者在處理完消息后手動發(fā)送確認(basicAck)。如果消費者在處理過程中發(fā)生異?;蛘呶赐瓿?/p>

    2024年04月14日
    瀏覽(24)
  • 【SpringBoot】 整合RabbitMQ 保證消息可靠性傳遞

    【SpringBoot】 整合RabbitMQ 保證消息可靠性傳遞

    生產(chǎn)者端 目錄結(jié)構(gòu) 導(dǎo)入依賴 修改yml 業(yè)務(wù)邏輯 測試結(jié)果 ??????? 在publisher-confirm-type中有三個確認消息接受類型:none、correlated、simple。 ??????? publisher-confirm-type: none 表示 禁用發(fā)布確認模式 。是 默認值 。使用此模式之后,不管消息有沒有發(fā)送到Broker(RabbitMQ)都不會

    2024年02月10日
    瀏覽(24)
  • 【云原生進階之PaaS中間件】第四章RabbitMQ-4.3-如何保證消息的可靠性投遞與消費

    【云原生進階之PaaS中間件】第四章RabbitMQ-4.3-如何保證消息的可靠性投遞與消費

    ????????根據(jù)RabbitMQ的工作模式,一條消息從生產(chǎn)者發(fā)出,到消費者消費,需要經(jīng)歷以下4個步驟: 生產(chǎn)者將消息發(fā)送給RabbitMQ的Exchange交換機; Exchange交換機根據(jù)Routing key將消息路由到指定的Queue隊列; 消息在Queue中暫存,等待消費者消費消息; 消費者從Queue中取出消息消費

    2024年03月11日
    瀏覽(28)
  • [rocketmq] 如何保證消息可靠性

    1、生產(chǎn)者發(fā)送消息到Broker時; 2、Broker內(nèi)部存儲消息到磁盤以及主從復(fù)制同步時; 3、Broker把消息推送給消費者或者消費者主動拉取消息時; 1.重試策略,發(fā)送消息失敗后會進行一定的重試策略 重試機制:固定重試次數(shù),同步刷盤會切換 broker 重試,異步刷盤會在同一 broker

    2024年02月11日
    瀏覽(30)
  • 如何保證消息的可靠性(面試題)

    如何保證消息的可靠性(面試題)

    面試題 :Rebbitmq怎么保證消息的可靠性 消費者在接收到消息后,默認情況下RabbitMQ會自動確認消息(autoAck=true)。為保證消息可靠性,可以設(shè)置autoAck=false,使得消費者在處理完消息后手動發(fā)送確認(basicAck)。如果消費者在處理過程中發(fā)生異?;蛘呶赐瓿商幚砭徒K止運行,那

    2024年04月14日
    瀏覽(26)
  • RabbitMQ高級特性解析:消息投遞的可靠性保證與消費者ACK機制探究

    RabbitMQ高級特性解析:消息投遞的可靠性保證與消費者ACK機制探究

    學(xué)習RabbitMQ高級特性,涵蓋消息的持久化、確認模式、退回模式以及消費者ACK機制等方面,助您構(gòu)建高可靠性的消息隊列系統(tǒng)。

    2024年01月16日
    瀏覽(18)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包