国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMQ常見問題之消息可靠性

這篇具有很好參考價值的文章主要介紹了RabbitMQ常見問題之消息可靠性。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、介紹

MQ的消息可靠性,將從以下四個方面展開并實踐:

  1. 生產(chǎn)者消息確認
  2. 消息持久化
  3. 消費者消息確認
  4. 消費失敗重試機制

二、生產(chǎn)者消息確認

對于publisher,如果message到達exchange與否,rabbitmq提供publiser-comfirm機制,如果message達到exchange但是是否到達queue,rabbitmq提供publisher-return機制。這兩種機制在代碼中都可以通過配置來自定義實現(xiàn)。

以下操作都在publisher服務(wù)方完成。

1. 引入依賴

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

配置說明:
publish-confirm-type:開啟publisher-confirm,這里支持兩種類型:

  • simple:同步等待confirm結(jié)果,直到超時
  • correlated:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時會回調(diào)這個ConfirmCallback

publish-returns:開啟publish-return功能,同樣是基于callback機制,不過是定義ReturnCallback
template.mandatory:定義消息路由失敗時的策略。true,則調(diào)用ReturnCallback; false,則直接丟棄消息

2. 配置ReturnCallBack

每個RabbitTemplate只能配置一個ReturnCallBack,所以直接給IoC里面的RabbitTemplate配上,所有人都統(tǒng)一用。
新建配置類,實現(xiàn)ApplicationContextAware 接口,在接口中setReturnCallback。

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
            //check if is delay message
            if (message.getMessageProperties().getReceivedDelay() != null && message.getMessageProperties().getReceivedDelay() > 0) {
                return;
            }
            log.error("消息發(fā)送到queue失敗,replyCode={}, reason={}, exchange={}, routeKey={}, message={}",
                    replyCode, replyText, exchange, routingKey, message.toString());
        });
    }
}

3. 配置ConfirmCallBack

ConfirmCallBack在message發(fā)送時配置,每個message都可以有自己的ConfirmCallBack。

@Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        String message = "hello, spring amqp!";
        // confirm callback
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        correlationData.getFuture().addCallback(
                result -> {
                    if (result.isAck()){
                        log.debug("消息到exchange成功, id={}", correlationData.getId());
                    }else {
                        log.error("消息到exchange失敗, id={}", correlationData.getId());
                    }
                },
                throwable -> {
                    log.error("消息發(fā)送失敗", throwable);
                }
        );

        rabbitTemplate.convertAndSend("amq.topic", "simple.test", message, correlationData);
    }

4. 測試

將消息發(fā)送到一個不存在的exchange,模擬消息達到exchange失敗,觸發(fā)ConfirmCallBack,日志如下。

18:22:03:913 ERROR 23232 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'aamq.topic' in vhost '/', class-id=60, method-id=40)
18:22:03:915 ERROR 23232 --- [nectionFactory1] cn.itcast.mq.spring.SpringAmqpTest       : 消息到exchange失敗, id=0c0910a3-7937-43ea-9606-e5bbcdda0b5c

將消息發(fā)送到一個存在的exchange,但routekey異常,模擬消息到達exchange但沒有到達queue,觸發(fā)ConfirmCallBackReturnCallBack,日志如下。

18:27:22:757  INFO 20184 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#7428de63:0/SimpleConnection@6d60899e [delegate=amqp://rabbitmq@127.0.0.1:5672/, localPort= 53662]
18:27:22:797 DEBUG 20184 --- [ 127.0.0.1:5672] cn.itcast.mq.spring.SpringAmqpTest       : 消息到exchange成功, id=5fbdaaa1-5f20-4683-bdfa-bd71cd6afd11
18:27:22:796 ERROR 20184 --- [nectionFactory1] cn.itcast.mq.config.CommonConfig         : 消息發(fā)送到queue失敗,replyCode=312, reason=NO_ROUTE, exchange=amq.topic, routeKey=simplee.test, message=(Body:'hello, spring amqp!' MessageProperties [headers={spring_returned_message_correlation=5fbdaaa1-5f20-4683-bdfa-bd71cd6afd11}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])

三、消息持久化

新版本的SpringAMQP默認開啟持久化。RabbitMQ本身并不默認開啟持久化。

隊列持久化,通過QueueBuilder構(gòu)建持久化隊列,比如

	@Bean
    public Queue simpleQueue(){
        return QueueBuilder
                .durable("simple.queue")
                .build();
    }

消息持久化,在發(fā)送時可以設(shè)置,比如

@Test
public void testDurableMessage(){
    Message message = MessageBuilder.withBody("hello springcloud".getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .build();
    rabbitTemplate.convertAndSend("simple.queue", message);
}

四、消費者消息確認

消費者消息確認是指,consumer收到消息后會給rabbitmq發(fā)送回執(zhí)來確認消息接收狀況。

SpringAMQP允許配置三種確認模式:

  • manual:手動ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。
  • auto:自動ack,由spring監(jiān)測listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack
  • none:關(guān)閉ack, MQ假定消費者獲取消息后會成功處理,因此消息投遞后立即被刪除
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto # manual auto none

但是auto有個很大的缺陷,因為rabbitmq會自動不斷給有問題的listen反復(fù)投遞消息,導(dǎo)致不斷報錯,所以建議使用下一章的操作。

五、消費失敗重試機制

當(dāng)消費者出現(xiàn)異常后,消息會不斷requeue (重新入隊)到隊列,再重新發(fā)送給消費者,然后再次異常,再次requeue
,無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力。

我們可以利用Springretry機制,在消費者出現(xiàn)異常時利用本地重試,而不是無限制的requeuemq隊列。

1. 引入依賴

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        retry:
          enabled: true # 開啟消費者失敗重試
          initial-interval: 1000 #初識的失敗等待時長為1秒
          multiplier: 2 # 下次失敗的等待時長倍數(shù),下次等待時長 = multiplier * last-interval
          max-attempts: 3 # 最大重試次數(shù)
          stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false

2. 配置重試次數(shù)耗盡策略

RabbitMQ常見問題之消息可靠性,Server架構(gòu),# RabbitMQ,rabbitmq,分布式
我們采用RepublishMessageRecoverer
定義用于接收失敗消息的exchange,queue以及它們之間的bindings

然后定義MessageRecoverer,比如

@Component
public class ErrorMessageConfig {
    @Bean
    public MessageRecoverer republishMessageRecover(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
    }
}

3. 測試

定義處理異常消息的exchangequeue,比如

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "error.queue"),
            exchange = @Exchange(name = "error.exchange"),
            key = "error"
    ))
    public void listenErrorQueue(String msg){
        log.info("消費者接收到error.queue的消息:【" + msg + "】");
    }

定義如下一個listener,來模擬consumer處理消息失敗觸發(fā)消息重試。

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "simple.queue"),
            exchange = @Exchange(name = "simple.exchange"),
            key = "simple"
    ))
    public void listenSimpleQueue(String msg) {
        log.info("消費者接收到simple.queue的消息:【" + msg + "】");
        System.out.println(1/0);
        log.info("consumer handle message success");
    }

寫一個簡單的測試,往simple.exchange發(fā)送消息,比如

    @Test
    public void testSendMessageSimpleQueue() throws InterruptedException {
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend("simple.exchange", "simple", message);
    }

運行測試,consumer得到以下日志

18:51:10:164  INFO 24072 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener     : 消費者接收到simple.queue的消息:【hello, spring amqp!】
18:51:11:167  INFO 24072 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener     : 消費者接收到simple.queue的消息:【hello, spring amqp!】
18:51:13:168  INFO 24072 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener     : 消費者接收到simple.queue的消息:【hello, spring amqp!】
18:51:13:176  WARN 24072 --- [ntContainer#0-1] o.s.a.r.retry.RepublishMessageRecoverer  : Republishing failed message to exchange 'error.exchange' with routing key error
18:51:13:181  INFO 24072 --- [ntContainer#1-1] c.i.mq.listener.SpringRabbitListener     : 消費者接收到error.queue的消息:【hello, spring amqp!】

可以看到spring嘗試2次重發(fā),一共3次,第一次間隔1秒,第二次間隔2秒,重試次數(shù)耗盡,消息被consumer傳入error.exchange,注意,是consumer傳的,不是simple.queue文章來源地址http://www.zghlxwxcb.cn/news/detail-802054.html

到了這里,關(guān)于RabbitMQ常見問題之消息可靠性的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • RabbitMQ-保證消息可靠性

    RabbitMQ-保證消息可靠性

    消息從發(fā)送,到消費者接收,會經(jīng)理多個過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時丟失: 生產(chǎn)者發(fā)送的消息未送達exchange 消息到達exchange后未到達queue MQ宕機,queue將消息丟失 consumer接收到消息后未消費就宕機 針對這些問題,RabbitMQ分別給出了

    2024年02月07日
    瀏覽(31)
  • rabbitmq消息可靠性之消息回調(diào)機制

    rabbitmq消息可靠性之消息回調(diào)機制

    rabbitmq消息可靠性之消息回調(diào)機制 rabbitmq在消息的發(fā)送與接收中,會經(jīng)過上面的流程,這些流程中每一步都有可能導(dǎo)致消息丟失,或者消費失敗甚至直接是服務(wù)器宕機等,這是我們服務(wù)接受不了的,為了保證消息的可靠性,rabbitmq提供了以下幾種機制 生產(chǎn)者確認機制 消息持久

    2024年02月08日
    瀏覽(37)
  • RabbitMQ如何保證消息可靠性

    RabbitMQ如何保證消息可靠性

    目錄 1、RabbitMQ消息丟失的可能性 1.1 生產(chǎn)者消息丟失場景 1.2 MQ導(dǎo)致消息丟失 1.3 消費者丟失 2、如何保證生產(chǎn)者消息的可靠性 2.1 生產(chǎn)者重試機制 2.2 生產(chǎn)者確認機制 2.3 實現(xiàn)生產(chǎn)者確認 2.3.1 配置yml開啟生產(chǎn)者確認 2.3.2 定義ReturnCallback 2.3.3 定義ConfirmCallback 3、MQ消息可靠性 3.1

    2024年02月20日
    瀏覽(25)
  • RabbitMQ保證消息的可靠性

    RabbitMQ保證消息的可靠性

    消息從發(fā)送,到消費者接收,會經(jīng)理多個過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時丟失: 生產(chǎn)者發(fā)送的消息未送達exchange 消息到達exchange后未到達queue MQ宕機,queue將消息丟失 consumer接收到消息后未消費就宕機 針對這些問題,RabbitMQ分別給出了

    2024年02月19日
    瀏覽(23)
  • RabbitMQ高級篇---消息可靠性

    RabbitMQ高級篇---消息可靠性

    1、消息可靠性: 消息從發(fā)送到消費者接受,會經(jīng)歷多個過程,每個消息傳遞的過程都可能導(dǎo)致消息的丟失: 常見的丟失原因: 發(fā)送時消息丟失原因: 生產(chǎn)者發(fā)送的消息未送達exchange 消息到達exchange后未到達queue MQ宕機,queue將消息丟失 consumer接收到消息后未消費就宕機 Rab

    2024年01月20日
    瀏覽(30)
  • RabbitMQ之消息的可靠性傳遞

    提示:這里可以添加系列文章的所有文章的目錄,目錄需要自己手動添加 RabbitMQ之消息的可靠性傳遞 提示:寫完文章后,目錄可以自動生成,如何生成可參考右邊的幫助文檔 提示:這里可以添加本文要記錄的大概內(nèi)容: 在當(dāng)今的信息化時代,消息傳遞在企業(yè)級應(yīng)用和分布式

    2024年01月19日
    瀏覽(22)
  • rabbitmq如何保證消息的可靠性

    RabbitMQ可以通過以下方式來保證消息的可靠性: 在發(fā)布消息時,可以設(shè)置消息的delivery mode為2,這樣消息會被持久化存儲在磁盤上,即使RabbitMQ服務(wù)器重啟,消息也不會丟失。 可以創(chuàng)建持久化的隊列,這樣即使RabbitMQ服務(wù)器重啟,隊列也不會丟失。 在消費者端,可以 設(shè)置手動

    2024年01月23日
    瀏覽(26)
  • 如何保證 RabbitMQ 的消息可靠性?

    如何保證 RabbitMQ 的消息可靠性?

    項目開發(fā)中經(jīng)常會使用消息隊列來 完成異步處理、應(yīng)用解耦、流量控制等功能 。雖然消息隊列的出現(xiàn)解決了一些場景下的問題,但是同時也引出了一些問題,其中使用消息隊列時如何保證消息的可靠性就是一個常見的問題。 如果在項目中遇到需要保證消息一定被消費的場景

    2024年02月07日
    瀏覽(27)
  • 【RabbitMQ】之消息的可靠性方案

    【RabbitMQ】之消息的可靠性方案

    一、數(shù)據(jù)丟失場景 二、數(shù)據(jù)可靠性方案 1、生產(chǎn)者丟失消息解決方案 2、MQ 隊列丟失消息解決方案 3、消費者丟失消息解決方案 MQ 消息數(shù)據(jù)完整的鏈路為 :從 Producer 發(fā)送消息到 RabbitMQ 服務(wù)器中,再由 Broker 服務(wù)的 Exchange 根據(jù) Routing_Key 路由到指定的 Queue 隊列中,最后投送到消

    2024年02月14日
    瀏覽(24)
  • RabbitMQ 能保證消息可靠性嗎

    RabbitMQ 能保證消息可靠性嗎

    手把手教你,本地RabbitMQ服務(wù)搭建(windows) 消息隊列選型——為什么選擇RabbitMQ RabbitMQ靈活運用,怎么理解五種消息模型 推或拉? RabbitMQ 消費模式該如何選擇 死信是什么,如何運用RabbitMQ的死信機制? 前面我們在做MQ組件選型時,提到了rabbitMQ的消息可靠性,那么它到底可靠

    2024年02月16日
    瀏覽(30)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包