国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMQ消息可靠性問題及解決

這篇具有很好參考價(jià)值的文章主要介紹了RabbitMQ消息可靠性問題及解決。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

說明:在RabbitMQ消息傳遞過程中,有以下問題:

  • 消息沒發(fā)到交換機(jī)

  • 消息沒發(fā)到隊(duì)列

  • MQ宕機(jī),消息在隊(duì)列中丟失

  • 消息者接收到消息后,未能正常消費(fèi)(程序報(bào)錯(cuò)),此時(shí)消息已在隊(duì)列中移除

針對(duì)以上問題,提供以下解決方案:

  • 消息確認(rèn):確認(rèn)消息是否發(fā)送到交換機(jī)、隊(duì)列;

  • 消息持久化:持久化消息,以防MQ宕機(jī)造成消息丟失;

  • 消費(fèi)者消息確認(rèn):確認(rèn)消費(fèi)者已正確消費(fèi)消息,才把消息從隊(duì)列中刪除;

RabbitMQ消息可靠性問題及解決,rabbitmq,分布式,java

消息確認(rèn)

可以使用Rabbit MQ提供的publisher confirm機(jī)制來避免消息發(fā)送到MQ過程丟失。具體實(shí)現(xiàn)是,publisher-confirm(發(fā)送者確定)、publisher-return(發(fā)送者回執(zhí)),前者判斷消息到交換機(jī)、后者判斷交換機(jī)到隊(duì)列


publisher-confirm(發(fā)送者確定)

  • 消息成功投遞到交換機(jī),返回ack;

  • 消息未投遞到交換機(jī),返回nack;

publisher-return(發(fā)送者回執(zhí))

  • 消息投遞到交換機(jī),但沒有到隊(duì)列,返回ack,即失敗原因;

在生產(chǎn)者端添加配置

spring:
  rabbitmq:
    # rabbitMQ相關(guān)配置
    host: 118.178.228.175
    port: 5672
    username: root
    password: 123456
    virtual-host: /

    # 開啟生產(chǎn)者確認(rèn),correlated為異步,simple為同步
    publisher-confirm-type: correlated

    # 開啟publish-return功能,基于callback機(jī)制
    publisher-returns: true

    # 開啟消息路由失敗的策略,true是調(diào)用returnCallback方法,false是丟棄消息
    template:
      mandatory: true

publisher-return(發(fā)送者回執(zhí))代碼

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

/**
 * 發(fā)送者回執(zhí)實(shí)現(xiàn)
 */
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate對(duì)象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

        // 設(shè)置ReturnCallback
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

            /**
             * 回執(zhí)信息
             * @param message 信息對(duì)象
             * @param replyCode 回執(zhí)碼
             * @param replyText 回執(zhí)內(nèi)容
             * @param exchange 交換機(jī)
             * @param routingKey 路由鍵值
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息發(fā)送隊(duì)列失敗=====replyCode{},replyText{},exchange{},routingKey{},message{}",replyCode,replyText,exchange,routingKey,message);
            }
        });
    }
}

publisher-confirm(發(fā)送者確定)代碼

    @Test
    public void sendExceptionMessage() {
        // 路由鍵值
        String routingKey = "exception";

        // 消息
        String message = "This is a exception message";

        // 給消息設(shè)置一個(gè)唯一ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        // 編寫confirmCallBack回調(diào)函數(shù)
        correlationData.getFuture().addCallback(new SuccessCallback<CorrelationData.Confirm>() {
            @Override
            public void onSuccess(CorrelationData.Confirm confirm) {
                if (confirm.isAck()) {
                    // 消息發(fā)送交換機(jī)成功
                    log.debug("消息送達(dá)至交換機(jī)成功");
                } else {
                    // 消息發(fā)送交換機(jī)失敗,打印消息
                    log.error("消息未能送達(dá)至交換機(jī),ID{},原因{}", correlationData.getId(), confirm.getReason());
                }
            }
        }, new FailureCallback() {
            // 消息發(fā)送交換機(jī)異常
            @Override
            public void onFailure(Throwable ex) {
                log.error("消息發(fā)送交換機(jī)異常,ID:{},原因{}", correlationData.getId(), ex.getMessage());
            }
        });

        rabbitTemplate.convertAndSend("amq.direct", routingKey, message, correlationData);
    }

測(cè)試,設(shè)置一個(gè)不存在的routingKey,被發(fā)送者確認(rèn)(publisher-confirm)捕獲到;

// 路由鍵值
String routingKey = "null";

RabbitMQ消息可靠性問題及解決,rabbitmq,分布式,java

設(shè)置一個(gè)不存在的路由,被發(fā)送者回執(zhí)(publisher-return)捕獲到;

rabbitTemplate.convertAndSend("null", routingKey, message, correlationData);

RabbitMQ消息可靠性問題及解決,rabbitmq,分布式,java

消息持久化

消息持久化,是指把消息保存到磁盤中,在RabbitMQ宕機(jī)或者關(guān)機(jī)時(shí),重啟后,消息仍可以保存下來。消息依賴于交換機(jī)、隊(duì)列,因此持久化消息,同時(shí)也需要持久化交換機(jī)、隊(duì)列。

創(chuàng)建一個(gè)持久化的交換機(jī)、隊(duì)列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 消息持久化
 */
@Configuration
public class DurableConfig {

    /**
     * 交換機(jī)持久化
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        // 三個(gè)參數(shù)分別是:交換機(jī)名、是否持久化、沒有隊(duì)列與之綁定時(shí)是否自動(dòng)刪除
        return new DirectExchange("durable.direct",true,false);
    }

    /**
     * 隊(duì)列持久化
     * @return
     */
    @Bean
    public Queue durableQueue(){
        return QueueBuilder.durable("durable.queue").build();
    }

    /**
     * 交換機(jī)與隊(duì)列綁定
     * @return
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(durableQueue()).to(directExchange()).with("durable");
    }

}

發(fā)送一個(gè)持久化的消息

    /**
     * 發(fā)送持久化消息
     */
    @Test
    public void sendDurableMessage() {
        String routingKey = "durable";

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        Message message = MessageBuilder.withBody("This is a durable message".getBytes(StandardCharsets.UTF_8))
                // 設(shè)置該消息未持久化消息
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();

        rabbitTemplate.convertAndSend("durable.direct", routingKey, message, correlationData);
    }

打開RabbitMQ管理平臺(tái),可以看到"delivery_mode: 2",表示該消息是持久化消息

RabbitMQ消息可靠性問題及解決,rabbitmq,分布式,java

(源碼:MessageDeliveryMode類)
RabbitMQ消息可靠性問題及解決,rabbitmq,分布式,java

實(shí)際上,交換機(jī)、隊(duì)列默認(rèn)就是持久化的(durable: true),所以不用特意設(shè)置;

RabbitMQ消息可靠性問題及解決,rabbitmq,分布式,java

消費(fèi)者消息確認(rèn)

介紹

消費(fèi)者消息確認(rèn),是為了確保消費(fèi)者已經(jīng)消費(fèi)了消息,才讓MQ把該消息刪除;

可通過在消費(fèi)者的配置文件中增加下面這行配置實(shí)現(xiàn),備選項(xiàng)有以下三個(gè):

  • none:關(guān)閉ack,表示不做處理,消息發(fā)給消費(fèi)者之后就立即被刪除;

  • auto:自動(dòng)ack,表示由Spring檢測(cè)代碼是否出現(xiàn)異常,出現(xiàn)異常則保留消息,沒有異常則刪除消息;

  • manual:手動(dòng)ack,可根據(jù)業(yè)務(wù)手動(dòng)編寫代碼,返回ack;

spring:
  rabbitmq:
    listener:
      simple:
      	# 設(shè)置消息確認(rèn)模式
        acknowledge-mode: none

測(cè)試:none

可編寫代碼測(cè)試,下面是生產(chǎn)者代碼,發(fā)送消息

    /**
     * 發(fā)送普通消息
     */
    @Test
    public void sendNoneMessage() {
        String directName = "none.direct";

        String routingKey = "none";

        String message = "This is a test message";

        rabbitTemplate.convertAndSend(directName, routingKey, message);
    }

消費(fèi)者代碼有問題,未能正常消費(fèi)消息

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "none.queue"),
            exchange = @Exchange(name = "none.direct",type = ExchangeTypes.DIRECT),
            key = {"none"}
    ))
    public void getNoneMessage(String normalMessage){
        System.out.println(1/0);
        System.out.println("normalMessage = " + normalMessage);
    }

測(cè)試結(jié)果,程序報(bào)錯(cuò),消息也沒能保留下來

RabbitMQ消息可靠性問題及解決,rabbitmq,分布式,java
RabbitMQ消息可靠性問題及解決,rabbitmq,分布式,java

測(cè)試:auto

更改設(shè)置為:auto,重試

RabbitMQ消息可靠性問題及解決,rabbitmq,分布式,java

但是消息未被刪除

RabbitMQ消息可靠性問題及解決,rabbitmq,分布式,java

這種情況,在實(shí)際開發(fā)中是不能允許,可以通過更改消費(fèi)失敗的重試機(jī)制解決。

消費(fèi)失敗重試機(jī)制

方法一:設(shè)置retry

因?yàn)橄⒈幌M(fèi)失敗,消息會(huì)一直循環(huán)重試,無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力,這種情況可以通過在消費(fèi)者端添加以下配置,限制失敗重試的條件來解決:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          # 開啟消費(fèi)者失敗重試
          enabled: true
          # 初次失敗等待時(shí)長(zhǎng)為1秒
          initial-interval: 1000
          # 失敗的等待時(shí)長(zhǎng)倍數(shù),即后一次等待的時(shí)間是前一次等待時(shí)間的多少倍
          multiplier: 1
          # 最多重試次數(shù)
          max-attempts: 3
          # true 無狀態(tài) false 有狀態(tài) 如果業(yè)務(wù)中包含事務(wù) 改為false
          stateless: true

開啟后,控制臺(tái)可以發(fā)現(xiàn),信息不回一直循環(huán)打印,而是打印數(shù)條后停止,日志信息中有提示“Retry Policy Exhausted”(重試策略已用盡)

RabbitMQ消息可靠性問題及解決,rabbitmq,分布式,java
這種通過配置的方式,并不會(huì)重試數(shù)次后仍保留消息,而是重試數(shù)次仍失敗,隨即丟棄消息,消息丟失,這在實(shí)際開發(fā)中也是不能被允許的。

方法二:路由存儲(chǔ)消息

因此,可以通過下面這個(gè)方法,把消費(fèi)失敗的消息,通過交換機(jī)路由到另外的隊(duì)列中存儲(chǔ)起來,等業(yè)務(wù)代碼被修復(fù),再路由回來消費(fèi)。

RabbitMQ消息可靠性問題及解決,rabbitmq,分布式,java

代碼如下

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 錯(cuò)誤消息隊(duì)列
 */
@Configuration
public class ErrorMessageQueueConfig {

    /**
     * 創(chuàng)建一個(gè)交換機(jī),用于路由消費(fèi)失敗的消息
     * @return
     */
    @Bean
    public DirectExchange errorExchange(){
        return new DirectExchange("error.direct");
    }

    /**
     * 創(chuàng)建一個(gè)隊(duì)列,用于存儲(chǔ)消費(fèi)失敗的消息
     * @return
     */
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    /**
     * 綁定
     * @return
     */
    @Bean
    public Binding errorBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");
    }

    /**
     * 路由,當(dāng)消費(fèi)失敗時(shí),把消費(fèi)失敗的消息路由到此隊(duì)列中,路由key為"error"
     * @param rabbitTemplate
     * @return
     */
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
    }
}

可以看到,消息消費(fèi)失敗后并沒有被丟失,而是路由到錯(cuò)誤隊(duì)列中存儲(chǔ)了起來。因?yàn)殄e(cuò)誤隊(duì)列沒有設(shè)置RabbitListener,所以可以存儲(chǔ)消息,等帶代碼問題被排查出來后,可以再針對(duì)該隊(duì)列設(shè)置監(jiān)聽方法,消費(fèi)這部分錯(cuò)誤的消息。

RabbitMQ消息可靠性問題及解決,rabbitmq,分布式,java

另外,值得一提的是,消費(fèi)者這邊的控制臺(tái)會(huì)報(bào)一個(gè)警告,提示路由密鑰錯(cuò)誤。我們可以理解,在RabbitMQ底層,會(huì)把消費(fèi)失敗了的消息,統(tǒng)一路由到一個(gè)地方去,而我們這種手動(dòng)把消費(fèi)失敗的消息路由到自定義的隊(duì)列中的方式,打破了這種“默認(rèn)的規(guī)則”,所以報(bào)了一個(gè)這樣的警告。這種警告是在可控范圍內(nèi)的。

RabbitMQ消息可靠性問題及解決,rabbitmq,分布式,java

總結(jié)

RabbitMQ發(fā)送消息,為了確保消息的可靠性,保證消息能被交換機(jī)、隊(duì)列收到,消息能被正常消費(fèi),而不會(huì)因消費(fèi)失敗而丟失,提供了對(duì)應(yīng)的一系列方法,并且最后還提供了兩種消費(fèi)失敗重試方法,優(yōu)化了消費(fèi)過程,非常Nice。文章來源地址http://www.zghlxwxcb.cn/news/detail-601451.html

到了這里,關(guān)于RabbitMQ消息可靠性問題及解決的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • RabbitMQ-保證消息可靠性

    RabbitMQ-保證消息可靠性

    消息從發(fā)送,到消費(fèi)者接收,會(huì)經(jīng)理多個(gè)過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時(shí)丟失: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 consumer接收到消息后未消費(fèi)就宕機(jī) 針對(duì)這些問題,RabbitMQ分別給出了

    2024年02月07日
    瀏覽(31)
  • RabbitMQ如何保證消息可靠性

    RabbitMQ如何保證消息可靠性

    目錄 1、RabbitMQ消息丟失的可能性 1.1 生產(chǎn)者消息丟失場(chǎng)景 1.2 MQ導(dǎo)致消息丟失 1.3 消費(fèi)者丟失 2、如何保證生產(chǎn)者消息的可靠性 2.1 生產(chǎn)者重試機(jī)制 2.2 生產(chǎn)者確認(rèn)機(jī)制 2.3 實(shí)現(xiàn)生產(chǎn)者確認(rèn) 2.3.1 配置yml開啟生產(chǎn)者確認(rèn) 2.3.2 定義ReturnCallback 2.3.3 定義ConfirmCallback 3、MQ消息可靠性 3.1

    2024年02月20日
    瀏覽(24)
  • RabbitMQ高級(jí)篇---消息可靠性

    RabbitMQ高級(jí)篇---消息可靠性

    1、消息可靠性: 消息從發(fā)送到消費(fèi)者接受,會(huì)經(jīng)歷多個(gè)過程,每個(gè)消息傳遞的過程都可能導(dǎo)致消息的丟失: 常見的丟失原因: 發(fā)送時(shí)消息丟失原因: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 consumer接收到消息后未消費(fèi)就宕機(jī) Rab

    2024年01月20日
    瀏覽(30)
  • RabbitMQ保證消息的可靠性

    RabbitMQ保證消息的可靠性

    消息從發(fā)送,到消費(fèi)者接收,會(huì)經(jīng)理多個(gè)過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時(shí)丟失: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 consumer接收到消息后未消費(fèi)就宕機(jī) 針對(duì)這些問題,RabbitMQ分別給出了

    2024年02月19日
    瀏覽(23)
  • rabbitmq消息可靠性之消息回調(diào)機(jī)制

    rabbitmq消息可靠性之消息回調(diào)機(jī)制

    rabbitmq消息可靠性之消息回調(diào)機(jī)制 rabbitmq在消息的發(fā)送與接收中,會(huì)經(jīng)過上面的流程,這些流程中每一步都有可能導(dǎo)致消息丟失,或者消費(fèi)失敗甚至直接是服務(wù)器宕機(jī)等,這是我們服務(wù)接受不了的,為了保證消息的可靠性,rabbitmq提供了以下幾種機(jī)制 生產(chǎn)者確認(rèn)機(jī)制 消息持久

    2024年02月08日
    瀏覽(37)
  • 【RabbitMQ】之消息的可靠性方案

    【RabbitMQ】之消息的可靠性方案

    一、數(shù)據(jù)丟失場(chǎng)景 二、數(shù)據(jù)可靠性方案 1、生產(chǎn)者丟失消息解決方案 2、MQ 隊(duì)列丟失消息解決方案 3、消費(fèi)者丟失消息解決方案 MQ 消息數(shù)據(jù)完整的鏈路為 :從 Producer 發(fā)送消息到 RabbitMQ 服務(wù)器中,再由 Broker 服務(wù)的 Exchange 根據(jù) Routing_Key 路由到指定的 Queue 隊(duì)列中,最后投送到消

    2024年02月14日
    瀏覽(24)
  • RabbitMQ之消息的可靠性傳遞

    提示:這里可以添加系列文章的所有文章的目錄,目錄需要自己手動(dòng)添加 RabbitMQ之消息的可靠性傳遞 提示:寫完文章后,目錄可以自動(dòng)生成,如何生成可參考右邊的幫助文檔 提示:這里可以添加本文要記錄的大概內(nèi)容: 在當(dāng)今的信息化時(shí)代,消息傳遞在企業(yè)級(jí)應(yīng)用和分布式

    2024年01月19日
    瀏覽(21)
  • rabbitmq如何保證消息的可靠性

    RabbitMQ可以通過以下方式來保證消息的可靠性: 在發(fā)布消息時(shí),可以設(shè)置消息的delivery mode為2,這樣消息會(huì)被持久化存儲(chǔ)在磁盤上,即使RabbitMQ服務(wù)器重啟,消息也不會(huì)丟失。 可以創(chuàng)建持久化的隊(duì)列,這樣即使RabbitMQ服務(wù)器重啟,隊(duì)列也不會(huì)丟失。 在消費(fèi)者端,可以 設(shè)置手動(dòng)

    2024年01月23日
    瀏覽(26)
  • RabbitMQ 能保證消息可靠性嗎

    RabbitMQ 能保證消息可靠性嗎

    手把手教你,本地RabbitMQ服務(wù)搭建(windows) 消息隊(duì)列選型——為什么選擇RabbitMQ RabbitMQ靈活運(yùn)用,怎么理解五種消息模型 推或拉? RabbitMQ 消費(fèi)模式該如何選擇 死信是什么,如何運(yùn)用RabbitMQ的死信機(jī)制? 前面我們?cè)谧鯩Q組件選型時(shí),提到了rabbitMQ的消息可靠性,那么它到底可靠

    2024年02月16日
    瀏覽(29)
  • 如何保證 RabbitMQ 的消息可靠性?

    如何保證 RabbitMQ 的消息可靠性?

    項(xiàng)目開發(fā)中經(jīng)常會(huì)使用消息隊(duì)列來 完成異步處理、應(yīng)用解耦、流量控制等功能 。雖然消息隊(duì)列的出現(xiàn)解決了一些場(chǎng)景下的問題,但是同時(shí)也引出了一些問題,其中使用消息隊(duì)列時(shí)如何保證消息的可靠性就是一個(gè)常見的問題。 如果在項(xiàng)目中遇到需要保證消息一定被消費(fèi)的場(chǎng)景

    2024年02月07日
    瀏覽(27)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包