国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMQ(四):RabbitMQ高級(jí)特性

這篇具有很好參考價(jià)值的文章主要介紹了RabbitMQ(四):RabbitMQ高級(jí)特性。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

消息隊(duì)列在使用過程中,面臨著很多實(shí)際問題需要思考:

  • 消息可靠性問題:如何確保發(fā)送的消息至少被消費(fèi)—次
  • 延遲消息問題:如何實(shí)現(xiàn)消息的延遲投遞
  • 消息堆積問題:如何解決數(shù)百萬(wàn)消息堆積,無(wú)法及時(shí)消費(fèi)的問題
  • 高可用問題:如何避免單點(diǎn)的MQ故障而導(dǎo)致的不可用問題

一、消息可靠性

背景/需求:消息從發(fā)送,到消費(fèi)者接收,會(huì)經(jīng)歷多個(gè)過程:
其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括:

  • 發(fā)送時(shí)丟失:

    • 生產(chǎn)者發(fā)送的消息【未送達(dá)exchange】——返回nack(消息確認(rèn)模式)
    • 消息【到達(dá)exchange】——返回ack(消息確認(rèn)模式)
  • 到達(dá)queue后,MQ宕機(jī),queue將消息丟失
    ——返回ACK,及路由失敗原因(回退模式)

  • consumer接收到消息后還未消費(fèi)就宕機(jī)——消息持久化

1、【生產(chǎn)者】消息確認(rèn)

RabbitMQ提供了publisher confirm機(jī)制來(lái)避免消息發(fā)送到MQ過程中丟失。這種機(jī)制必須給每個(gè)消息指定一個(gè)唯一ID。消息發(fā)送到MQ以后,會(huì)返回一個(gè)結(jié)果給發(fā)送者,表示消息是否處理成功。

返回結(jié)果有兩種方式:

  • publisher-confirm,發(fā)送者確認(rèn)
    • 消息成功投遞到交換機(jī),返回ack
    • 消息未投遞到交換機(jī),返回nack
  • publisher-return,發(fā)送者回執(zhí)
    • 消息投遞到交換機(jī)了,但是沒有路由到隊(duì)列。返回ACK,及路由失敗原因。
      注意:確認(rèn)機(jī)制發(fā)送消息時(shí),需要給每個(gè)消息設(shè)置一個(gè)全局唯一id,以區(qū)分不同消息,避免ack沖突
      rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

1.1 修改application.yml配置文件,添加下面的內(nèi)容:

位置:生產(chǎn)者/publisher服務(wù)
目的:
1、開啟消息確認(rèn)模式
2、開啟消息回退(并設(shè)置消息路由到隊(duì)列失敗時(shí),回退消息給回調(diào)接口)

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    # 開啟publisher-confirm,且選擇correlated:【異步】回調(diào),定義ConfirmCallback,MQ返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallback
    publisher-returns: true # 開啟publish-return功能
    template:
      mandatory: true # 定義當(dāng)消息從交換機(jī)路由到隊(duì)列失敗時(shí)的策略?!総rue,則調(diào)用ReturnCallback;false:則直接丟棄消息】

rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

說(shuō)明:

  • publish-confirm-type:開啟publisher-confirm,這里支持兩種類型:
    • simple:【同步】等待confirm結(jié)果,直到超時(shí)(可能引起代碼阻塞)
    • correlated:【異步】回調(diào),定義ConfirmCallback,MQ返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallback
  • publish-returns:開啟publish-return功能,同樣是基于callback機(jī)制,不過是定義ReturnCallback
  • template.mandatory:定義當(dāng)消息從交換機(jī)路由到隊(duì)列失敗時(shí)的策略?!総rue,則調(diào)用ReturnCallback;false:則直接丟棄消息】

1.2定義Return回退:

說(shuō)明:因?yàn)樵趛ml配置文件中定義消息路由失敗時(shí)的策略為true,所以當(dāng)消息從交換機(jī)路由到隊(duì)列失敗時(shí),會(huì)調(diào)用ReturnCallback

每個(gè)RabbitTemplate只能配置一個(gè)ReturnCallback,因此需要在項(xiàng)目加載時(shí)添加配置
修改publisher服務(wù),添加一個(gè)【配置類】:
位置:config/commic配置類

如何保證在項(xiàng)目加載時(shí)添加配置?
1、實(shí)現(xiàn)ApplicationContextAware(實(shí)現(xiàn)了ApplicationContextAware接口的實(shí)現(xiàn)類,在Spring容器的Bean工廠創(chuàng)建完畢后會(huì)通知該實(shí)現(xiàn)類)
2、此時(shí),該實(shí)現(xiàn)/配置類有了Spring容器的Bean工廠類;就可以獲取并設(shè)置ReturnCallback(Spring容器的Bean對(duì)象)
3、開始配置ReturnCallback;

ReturnCallback的回調(diào)函數(shù):當(dāng)消息成功發(fā)送到交換機(jī),但是沒有成功發(fā)送到消息隊(duì)列時(shí),回退到回調(diào)函數(shù),應(yīng)該如何處理?就是回調(diào)函數(shù)里面的內(nèi)容

package cn.itcast.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    //實(shí)現(xiàn)了ApplicationContextAware接口的實(shí)現(xiàn)類,在Spring容器的Bean工廠創(chuàng)建完畢后會(huì)通知該實(shí)現(xiàn)類
    //有了Bean工廠類,然后就可以獲取并設(shè)置ReturnCallback(Spring容器的Bean對(duì)象)
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // (從Spring容器中)獲取RabbitTemplate對(duì)象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 記錄日志
            log.error("消息發(fā)送到隊(duì)列失敗,響應(yīng)碼:{}, 失敗原因:{}, 交換機(jī): {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的話,重發(fā)消息
        });
    }
}

1、重寫方法setApplicationContext(ApplicationContext applicationContext)
參數(shù)為接口,且該【接口只有一個(gè)方法】可以用lambda表達(dá)式代替
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

代替后如下rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

2、編寫回調(diào)函數(shù)
(ReturnCallback回調(diào)函數(shù):當(dāng)消息成功發(fā)送到交換機(jī),但是沒有成功發(fā)送到消息隊(duì)列時(shí),回退到回調(diào)函數(shù),應(yīng)該如何處理?就是回調(diào)函數(shù)里面的內(nèi)容)rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

1.3 定義ConfirmCallback(消息確認(rèn))

ConfirmCallback【可以在發(fā)送消息時(shí)指定】因?yàn)槊總€(gè)業(yè)務(wù)處理confirm成功或失敗的邏輯不一定相同

消息發(fā)送代碼如下:
位置:在publisher服務(wù)的cn.itcast.mq.spring.SpringAmqpTest類中,定義一個(gè)單元測(cè)試方法:

注意:確認(rèn)機(jī)制發(fā)送消息時(shí),需要給每個(gè)消息設(shè)置一個(gè)全局唯一id,以區(qū)分不同消息,避免ack沖突

CorrelationData的作用:
1、消息ID需要封裝到CorrelationData
2、correlationData.getFuture().addCallback(…)是一個(gè)回調(diào)函數(shù):決定了每個(gè)業(yè)務(wù)處理confirm成功或失敗的邏輯

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        // 1.準(zhǔn)備消息
        String message = "hello, spring amqp!";
        // 2.準(zhǔn)備CorrelationData(消息ID需要封裝到CorrelationData)
        // 2.1.消息ID,確認(rèn)機(jī)制發(fā)送消息時(shí),需要給每個(gè)消息設(shè)置一個(gè)全局唯一id,以區(qū)分不同消息,避免ack沖突
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 2.2.準(zhǔn)備ConfirmCallback(Future是對(duì)將來(lái)的一種處理的封裝)(Future.addCallback)
        correlationData.getFuture().addCallback(
                result -> {
                    // 判斷結(jié)果
                    if (result.isAck()) {
                        // ACK
                        log.debug("消息成功投遞到交換機(jī)!消息ID: {}", correlationData.getId());
                    } else {
                        // NACK
                        log.error("消息投遞到交換機(jī)失敗!消息ID:{}", correlationData.getId());
                        // 重發(fā)消息
                    }
                },
                ex -> {
                    // 記錄日志
                    log.error("消息發(fā)送失?。?, ex);
                    // 重發(fā)消息
                });
        // 3.發(fā)送消息(這里如果沒有綁定交換機(jī)和隊(duì)列關(guān)系等,可以去管控臺(tái)綁定,也可以在消費(fèi)者的配置類中聲明)
        rabbitTemplate.convertAndSend("amq.topic", "simple.test", message, correlationData);
    }
}  

附1:有/無(wú)消息確認(rèn)機(jī)制的publish消息發(fā)送的對(duì)比

沒有confirm確認(rèn)機(jī)制:
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java
消息確認(rèn)機(jī)制:

RabbitMQ提供了publisher confirm機(jī)制來(lái)避免消息發(fā)送到MQ過程中丟失。這種機(jī)制必須給每個(gè)消息指定一個(gè)唯一ID。消息發(fā)送到MQ以后,會(huì)返回一個(gè)結(jié)果給發(fā)送者,表示消息是否處理成功。

rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java
附2:lambda表達(dá)式如何變換
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java
變化后:
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

測(cè)試

(這里如果沒有綁定交換機(jī)和隊(duì)列關(guān)系等,可以去管控臺(tái)綁定,也可以在消費(fèi)者的配置類中聲明)

測(cè)試1、confirm:消息成功到達(dá)交換機(jī)——返回ack

rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

測(cè)試2:confirm:消息未成功到達(dá)交換機(jī)——返回nack

rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

測(cè)試3:消息發(fā)送到了交換機(jī)但沒有發(fā)送到隊(duì)列——返回ack,但是return回退

故意將隊(duì)列名字寫錯(cuò)(交換機(jī)不存在綁定該隊(duì)列)
返回ACK,及路由失敗原因.
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java


2、消息持久化(了解)

背景/需求:生產(chǎn)者確認(rèn)可以確保消息投遞到RabbitMQ的隊(duì)列中,但是消息發(fā)送到RabbitMQ以后,如果消息隊(duì)列突然宕機(jī),也可能導(dǎo)致消息丟失。
(因?yàn)橄㈥?duì)列默認(rèn)是內(nèi)存存儲(chǔ))
(發(fā)送到消息隊(duì)列成功+消息隊(duì)列突然宕機(jī)=消息丟失)
要想確保消息在RabbitMQ中安全保存,必須開啟消息持久化機(jī)制(寫入到磁盤中)

注:SpringAMQP默認(rèn)是進(jìn)行持久化(包括聲明隊(duì)列、交換機(jī)、發(fā)送消息)(備注:通過管控臺(tái)創(chuàng)建的默認(rèn)是非持久化的)

那么,下面學(xué)的消息持久化有什么用呢?持久化畢竟是寫磁盤,會(huì)有一定的性能損耗,不是所有的數(shù)據(jù)都需要持久化,學(xué)了下面的持久化后可以手動(dòng)將不需要持久化的數(shù)據(jù)取消持久化

RabbitMQ Management 控制臺(tái)設(shè)置:

說(shuō)明:
Durable:持久的
Transient:轉(zhuǎn)瞬即逝的

交換機(jī)持久化
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java
消息隊(duì)列持久化
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

代碼(配置類聲明)

說(shuō)明:由SpringAMQP聲明的交換機(jī)和隊(duì)列都是持久化的(所以持久化隊(duì)列和交換機(jī)的代碼和我們之前配置類聲明隊(duì)列、交換機(jī)一樣)
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

@Configuration
public class CommonConfig {
    // 三個(gè)參數(shù):交換機(jī)名稱、是否持久化、當(dāng)沒有queue與其綁定時(shí)是否自動(dòng)刪除(事實(shí)上,默認(rèn)情況下,由SpringAMQP聲明的交換機(jī)和隊(duì)列都是持久化的)
    @Bean
    public DirectExchange simpleDirect(){
        //默認(rèn)為return new DirectExchange("simple.direct",true,false);
        return new DirectExchange("simple.direct");
    }
    // 使用QueueBuilder構(gòu)建隊(duì)列,durable就是持久化的
    @Bean
    public Queue simpleQueue(){
        //new Queue("");默認(rèn)代碼為public Queue(String name) { this(name, true, false, false);}
        return QueueBuilder.durable("simple.queue").build();
    }
}

聲明完隊(duì)列和交換機(jī),可以在RabbitMQ控制臺(tái)看到持久化的交換機(jī)和隊(duì)列都會(huì)帶上D的標(biāo)示:

此時(shí),消息和隊(duì)列都持久化了,但是,如果消息還是沒有持久化(重啟rabbitmq,交換機(jī)和隊(duì)列都在,但是消息會(huì)消失)

消息持久化

默認(rèn)情況下,SpringAMQP發(fā)出的任何消息都是持久化的,不用特意指定。
下面是手動(dòng)設(shè)置消息的屬性(MessageProperties),指定delivery-mode:
NON_PERSISTENT,非持久化
PERSISTENT;持久化

@Test
public void testDurableMessage() {
    // 1.準(zhǔn)備消息
    Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .build();
    // 2.發(fā)送消息
    rabbitTemplate.convertAndSend("simple.queue", message);
}

3、【消費(fèi)者】消息確認(rèn)

RabbitMQ是閱后即焚機(jī)制,RabbitMQ確認(rèn)消息被消費(fèi)者消費(fèi)后會(huì)立刻刪除。此時(shí),如果消費(fèi)者還沒有處理消息,然后消費(fèi)者掛掉了,就會(huì)導(dǎo)致消息丟失。

場(chǎng)景如下:

  • 1)RabbitMQ投遞消息給消費(fèi)者
  • 2)消費(fèi)者獲取消息后,【返回ACK給RabbitMQ】
  • 3)RabbitMQ刪除消息
  • 4)消費(fèi)者宕機(jī),消息尚未處理

(成功發(fā)送到消費(fèi)者+消費(fèi)者還沒處理消息就宕機(jī)了)消息就丟失了。因此消費(fèi)者返回ACK的時(shí)機(jī)非常重要。

而SpringAMQP則允許配置三種確認(rèn)模式:

  • manual:手動(dòng)ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。
  • auto:自動(dòng)ack,由spring監(jiān)測(cè)listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack
  • none:關(guān)閉ack,MQ假定消費(fèi)者獲取消息后會(huì)成功處理,因此消息投遞后立即被刪除(此時(shí),消息投遞是不可靠的,可能丟失)

一般,我們都是使用默認(rèn)的auto即可。

【yml配置文件中配置消息確認(rèn)模式】:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto #由spring監(jiān)測(cè)listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack

消費(fèi)者模擬處理異常

@Slf4j
@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        //none模式下,消費(fèi)者在這里接收到消息后,消息就從隊(duì)列中被刪除了
        log.debug("消費(fèi)者接收到simple.queue的消息:【" + msg + "】");
        System.out.println(1 / 0);//拋出異常、后面就不會(huì)執(zhí)行業(yè)務(wù)代碼
        log.info("消費(fèi)者處理消息成功!");//模擬業(yè)務(wù)代碼
    }
}

發(fā)送消息測(cè)試

管控臺(tái)發(fā)送消息
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

auto模式下:由spring監(jiān)測(cè)listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack

在異常位置打斷點(diǎn),再次發(fā)送消息,程序卡在斷點(diǎn)時(shí),可以發(fā)現(xiàn)此時(shí)消息狀態(tài)為unack(未確定狀態(tài)):
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java
拋出異常后,因?yàn)镾pring會(huì)自動(dòng)返回nack,所以消息恢復(fù)至Ready狀態(tài),并且沒有被RabbitMQ刪除:rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java
【問題】:當(dāng)消費(fèi)者出現(xiàn)異常后,消息會(huì)不斷requeue(重入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者,然后再次異常,再次requeue,無(wú)限循環(huán),導(dǎo)致mq的消息處理飆升,帶來(lái)不必要的壓力:(這里測(cè)試一條數(shù)據(jù)就已經(jīng)達(dá)到 3000條/s 了)
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

4、消費(fèi)失敗重試機(jī)制

我們可以利用Spring的retry機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無(wú)限制的requeue到mq隊(duì)列(不返回ack,也不返回nack),而是可以自己設(shè)置重試的次數(shù)(如果在重試n次后仍然失敗,那么后面在繼續(xù)重入隊(duì)大概率也會(huì)失敗,那么就直接扔掉,不再重入隊(duì),此時(shí),Spring會(huì)返回ack)
總結(jié):

  • 開啟本地重試時(shí),消息處理過程中拋出異常,不會(huì)requeue到隊(duì)列,而是在消費(fèi)者本地重試
  • 【重試達(dá)到最大次數(shù)后,Spring會(huì)返回ack,消息會(huì)被丟棄】

1、本地重試

修改consumer服務(wù)的application.yml文件,添加內(nèi)容:

spring:
  rabbitmq:
    listener:
      simple:
        retry: # Spring消費(fèi)者失敗重試
          enabled: true # 【開關(guān)】開啟消費(fèi)者失敗重試
          initial-interval: 1000 # 初識(shí)的失敗等待時(shí)長(zhǎng)為1秒(第一次失敗后1s重試)
          multiplier: 1 # 失敗的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = multiplier * last-interval(舉例:倍數(shù)*第一次等待時(shí)長(zhǎng)1s,這樣子永遠(yuǎn)都是1s)
          #但是如果設(shè)置為2,下次等待時(shí)長(zhǎng)為上次的2倍,因此等待時(shí)長(zhǎng)依次為1、2、4、8、16....
          max-attempts: 3 # 最大重試次數(shù)
          stateless: true # (默認(rèn)為true)true無(wú)狀態(tài);false有狀態(tài)【如果業(yè)務(wù)中包含事務(wù),這里改為false】
          #(備注:如果設(shè)置為false,那么Spring在重試的時(shí)候保留事務(wù)——消耗性能,所以沒有事務(wù)時(shí)設(shè)置為true提升性能)
          max-interval: 10000 # 最大等待時(shí)長(zhǎng),大于此時(shí)長(zhǎng)的一律按最大時(shí)長(zhǎng)來(lái)計(jì)算

重啟consumer服務(wù),重復(fù)之前的測(cè)試??梢园l(fā)現(xiàn):

  • 在重試4次后,SpringAMQP會(huì)拋出異常AmqpRejectAndDontRequeueException,說(shuō)明本地重試觸發(fā)了
  • 查看RabbitMQ控制臺(tái),發(fā)現(xiàn)消息被刪除了,說(shuō)明最后SpringAMQP返回的是ack,mq刪除消息了
  • rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

2、失敗策略

問題:在上面的測(cè)試中,達(dá)到最大重試次數(shù)后,消息會(huì)被丟棄,這是由Spring內(nèi)部機(jī)制決定的。但是,有些數(shù)據(jù)特別重要,我們不希望任何消息被丟棄,此時(shí),我們應(yīng)該如何實(shí)現(xiàn)?

在開啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有MessageRecovery接口來(lái)處理,它包含三種不同的實(shí)現(xiàn):

  • RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,【丟棄消息】【默認(rèn)】就是這種方式
  • ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)(Immediate立刻重入隊(duì))(但是頻率比沒有配置消費(fèi)失敗重載機(jī)制低一些)
  • RepublishMessageRecoverer(推薦):重試耗盡后,將失敗消息投遞到指定的交換機(jī)

RepublishMessageRecoverer:失敗后將消息投遞到一個(gè)指定的,專門存放異常消息的隊(duì)列,后續(xù)由人工集中處理,這樣所有的消息都不會(huì)丟失。
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

【RepublishMessageRecoverer處理模式的代碼實(shí)現(xiàn)】 :
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

1)【定義】處理失敗消息的【交換機(jī)和隊(duì)列】

位置:在consumer服務(wù)中的配置類config/ErrorMessageConfig.java
作用:聲明交換機(jī)、隊(duì)列、綁定關(guān)系

綁定關(guān)系:交換機(jī)error.direct–routingkey(error)–》error.queue

@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}
2)定義一個(gè)RepublishMessageRecoverer,指定/關(guān)聯(lián)隊(duì)列和交換機(jī)

失敗策略:
1、參數(shù)rabbitTemplate(Spring容器自動(dòng)注入)
2、通過rabbitTemplate將消息發(fā)送到(處理失敗消息的)交換機(jī)(routingKey為error)

這里的RepublishMessageRecoverer的作用:當(dāng)消費(fèi)者的消息失敗重試次數(shù)用盡后,將失敗的消息【丟棄給指定的error交換機(jī)的error隊(duì)列】

//參數(shù) Spring自動(dòng)注入的rabbitTemplate
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
測(cè)試結(jié)果:

rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java
我們可以在error隊(duì)列中查看具體的錯(cuò)誤信息,然后進(jìn)行修改rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

5、總結(jié):

如何確保RabbitMQ消息的可靠性?

  • 開啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊(duì)列
  • 開啟持久化功能,確保消息未消費(fèi)前在隊(duì)列中不會(huì)丟失
  • 開啟消費(fèi)者確認(rèn)機(jī)制為auto,由spring確認(rèn)消息處理成功后完成ack
  • 開啟消費(fèi)者失敗重試機(jī)制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理

二、死信交換機(jī)

1、初識(shí)死信交換機(jī)

什么是死信?
當(dāng)一個(gè)隊(duì)列中的消息滿足下列情況之一時(shí),可以成為死信(dead letter):

  • 消費(fèi)者使用basic.reject或 basic.nack聲明消費(fèi)失敗,并且消息的requeue參數(shù)設(shè)置為false
  • 消息是一個(gè)過期消息,超時(shí)無(wú)人消費(fèi)
  • 要投遞的隊(duì)列消息滿了,無(wú)法投遞

如果這個(gè)包含死信的隊(duì)列配置了dead-letter-exchange屬性,指定了一個(gè)交換機(jī),那么隊(duì)列中的死信就會(huì)投遞到這個(gè)交換機(jī)中,而這個(gè)交換機(jī)稱為死信交換機(jī)(Dead Letter Exchange,簡(jiǎn)稱DLX)

死信交換機(jī)過程大致如下:
1、(重試次數(shù)耗盡)一個(gè)消息被消費(fèi)者拒絕了,變成了死信
2、因?yàn)閟imple.queue綁定了死信交換機(jī) dl.direct,因此死信會(huì)投遞給這個(gè)交換機(jī)
3、如果這個(gè)死信交換機(jī)也綁定了一個(gè)隊(duì)列,則消息最終會(huì)進(jìn)入這個(gè)存放死信的隊(duì)列
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

附:死信交換機(jī)對(duì)比:消費(fèi)失敗策略

發(fā)送消息的對(duì)象不同?
republish是由consumer發(fā)送,死信是由隊(duì)列去發(fā)送
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java
從上圖可以看出,發(fā)送消息的對(duì)象不同,因此,死信交換機(jī)的其中一個(gè)功能和消費(fèi)失敗策略功能類似:作為一個(gè)兜底方案,當(dāng)消費(fèi)者宕機(jī),導(dǎo)致隊(duì)列滿了放不下 隊(duì)列還可以將溢出的消息轉(zhuǎn)發(fā)到死信隊(duì)列。

【代碼實(shí)現(xiàn):利用死信交換機(jī)接收死信】

那么如何實(shí)現(xiàn)呢?
隊(duì)列將死信投遞給死信交換機(jī)時(shí),必須知道兩個(gè)信息:

  • 死信交換機(jī)名稱
  • 死信交換機(jī)與死信隊(duì)列綁定的RoutingKey

這樣才能確保投遞的消息能到達(dá)死信交換機(jī),并且正確的路由到死信隊(duì)列。

代碼實(shí)現(xiàn):
聲明交換機(jī)、隊(duì)列、綁定關(guān)系
+指定死信交換機(jī)

// 【聲明普通的 simple.queue隊(duì)列,并且為其指定死信交換機(jī):dl.direct】
@Bean
public Queue simpleQueue2(){
    return QueueBuilder.durable("simple.queue") // 指定隊(duì)列名稱,并持久化
        .deadLetterExchange("dl.direct") // 【指定死信交換機(jī)】
        .build();
}
// 聲明死信交換機(jī) dl.direct
@Bean
public DirectExchange dlExchange(){
    return new DirectExchange("dl.direct", true, false);
}
// 聲明存儲(chǔ)死信的隊(duì)列 dl.queue
@Bean
public Queue dlQueue(){
    return new Queue("dl.queue", true);
}
// 將死信隊(duì)列 與 死信交換機(jī)綁定
@Bean
public Binding dlBinding(){
    return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}

2、TTL

什么是TTL?
TTL,也就是Time-To-Live。如果一個(gè)隊(duì)列中的消息TTL結(jié)束仍未消費(fèi),則會(huì)變?yōu)樗佬?,ttl超時(shí)分為兩種情況:

  • 消息所在的隊(duì)列設(shè)置了超時(shí)時(shí)間
  • 消息本身設(shè)置了超時(shí)時(shí)間

從上面的死信交換機(jī)中指定,消息是一個(gè)過期消息,超時(shí)無(wú)人消費(fèi),這條消息就會(huì)被投遞到死信交換機(jī)中。

其流程大致如下:
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

TTL的延申功能:延遲消息
給一個(gè)消息/隊(duì)列設(shè)置超時(shí)時(shí)間,將消息發(fā)送到ttl.queue(該隊(duì)列沒有消費(fèi)者,消息一定會(huì)超時(shí))消息超時(shí)后變成了死信。交給死信交換機(jī)-隊(duì)列-消費(fèi)者 ,這樣就完成了延遲消息的功能。
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

代碼實(shí)現(xiàn):延遲消息功能

1、監(jiān)聽器:【定義一個(gè)新的消費(fèi)者(方法)】【并且聲明死信交換機(jī)、死信隊(duì)列、綁定關(guān)系】

位置:在consumer服務(wù)的SpringRabbitListener中
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dl.ttl.queue", durable = "true"),
    exchange = @Exchange(name = "dl.ttl.direct"),
    key = "dl"
))
public void listenDlQueue(String msg){
    log.info("接收到 dl.ttl.queue的延遲消息:{}", msg);
}
2、聲明交換機(jī)、隊(duì)列,綁定關(guān)系、為隊(duì)列指定TTL超時(shí)時(shí)間

新建一個(gè)配置類(便于管理),配置類記得添加@Configuration
位置:consumer/config/TTLMessageConfig類
要給隊(duì)列設(shè)置超時(shí)時(shí)間,需要在聲明隊(duì)列時(shí)配置ttl屬性

rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

@Configuration
public class TTLMessageConfig {
//聲明隊(duì)列ttl.queue,設(shè)置超時(shí)時(shí)間
@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable("ttl.queue") // 指定隊(duì)列名稱,并持久化
        .ttl(10000) // 設(shè)置隊(duì)列的超時(shí)時(shí)間,10秒
        .deadLetterExchange("dl.direct") // 隊(duì)列指定死信交換機(jī),即消息超時(shí)就投到這個(gè)交換機(jī)
        .deadLetterRoutingKey("dl")//消息到死信交換機(jī)的RoutingKey
        .build();
}
//正常的聲明交換機(jī)ttl.direct
@Bean
public DirectExchange ttlExchange(){
    return new DirectExchange("ttl.direct");
}
//正常的綁定交換機(jī)和隊(duì)列,routingkey為ttl
@Bean
public Binding ttlBinding(){
    return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
}
3、發(fā)送消息時(shí),設(shè)定TTL

在發(fā)送消息時(shí),也可以指定TTL:
位置:publisher

@Test
public void testTTLMsg() {
    // 創(chuàng)建消息
    Message message = MessageBuilder
        .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
        .setExpiration("5000")
        .build();
    // 消息ID,需要封裝到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 發(fā)送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    log.debug("發(fā)送消息成功");
}

測(cè)試結(jié)果:
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java
當(dāng)隊(duì)列、消息都設(shè)置了TTL時(shí),任意一個(gè)到期就會(huì)成為死信

三、延時(shí)隊(duì)列

因?yàn)檠舆t隊(duì)列的需求非常多,所以RabbitMQ的官方也推出了一個(gè)插件,原生支持延遲隊(duì)列效果。

這個(gè)延時(shí)插件需要自己安裝,下面文章有基于Linux系統(tǒng)的docekr方式安裝
備注:
RabbitMQ(二):RabbitMQ的安裝(Linux、基于docker安裝)及其插件安裝


DelayExchange原理:

DelayExchange插件的原理是對(duì)官方原生的Exchange做了功能的升級(jí):

  • 將DelayExchange接受到的消息暫存在內(nèi)存中(官方的Exchange是無(wú)法存儲(chǔ)消息的)

  • 在DelayExchange中計(jì)時(shí),超時(shí)后才投遞消息到隊(duì)列中

使用DelayExchange-控制臺(tái)方式

1、控制臺(tái)聲明延遲交換機(jī)rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java
2、發(fā)送消息rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

使用DelayExchange-代碼方式

DelayExchange需要將一個(gè)交換機(jī)聲明為delayed類型。當(dāng)我們發(fā)送消息到delayExchange時(shí),流程如下:

  • 接收消息
  • 判斷消息是否具備x-delay屬性
  • 如果有x-delay屬性,說(shuō)明是延遲消息,持久化到硬盤,讀取x-delay值,作為延遲時(shí)間
  • 返回routing not found結(jié)果給消息發(fā)送者
  • x-delay時(shí)間到期后,重新投遞消息到指定隊(duì)列

1)聲明DelayExchange交換機(jī)

聲明交換機(jī)為delayed類型
法一:基于@RabbitListener(推薦)
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayExchange(String msg) {
    log.info("消費(fèi)者接收到了delay.queue的延遲消息");
}

法二:基于@Bean的方式:

rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

2)publisher發(fā)送消息

向這個(gè)delay為true的交換機(jī)中發(fā)送消息時(shí),一定要給消息添加一個(gè)header:x-delay屬性,指定延遲的時(shí)間,單位為毫秒:
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

@Test
    public void testSendDelayMessage() throws InterruptedException {
        // 1.準(zhǔn)備消息
        Message message = MessageBuilder
                .withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setHeader("x-delay", 5000)
                .build();
        // 2.準(zhǔn)備CorrelationData
        //消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 3.發(fā)送消息
        rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);

        log.info("發(fā)送消息成功");
    }

測(cè)試:

運(yùn)行會(huì)出現(xiàn)如下錯(cuò)誤
錯(cuò)誤原因:因?yàn)闆]有給延遲交換機(jī)指定routingKey,所以路由失?。ㄒ矝]有消費(fèi)者)rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

解決方案:

因?yàn)槭窍⒊晒Πl(fā)送到交換機(jī),交換機(jī)發(fā)送到隊(duì)列失敗——此時(shí)會(huì)進(jìn)行return消息回退;那么,我們可以回退模式中添加判斷
位置:publisher服務(wù)的config/CommonConfig配置類下(ReturnCallback)

添加如下判斷

// 判斷是否是延遲消息
Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
if (receivedDelay != null && receivedDelay > 0) {
    //判斷延遲值非空且大于0==》是一個(gè)延遲消息,忽略這個(gè)錯(cuò)誤提示
    return;
}

rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java

延遲隊(duì)列插件的使用步驟包括哪些?

?聲明一個(gè)交換機(jī),添加delayed屬性為true

?發(fā)送消息時(shí),添加x-delay頭,值為超時(shí)時(shí)間

四、惰性隊(duì)列

消息堆積問題

【當(dāng)生產(chǎn)者發(fā)送消息的速度】超過了【消費(fèi)者處理消息的速度】,就會(huì)導(dǎo)致隊(duì)列中的消息堆積,直到隊(duì)列存儲(chǔ)消息達(dá)到上限。之后發(fā)送的消息就會(huì)成為死信,可能會(huì)被丟棄,這就是消息堆積問題。

解決消息堆積有三種思路:

  • 增加更多消費(fèi)者,提高消費(fèi)速度。也就是我們之前說(shuō)的work queue模式
  • 在消費(fèi)者內(nèi)開啟線程池加快消息處理速度(限制:當(dāng)消息很多時(shí),需要開啟很多線程,線程越多,CPU需要進(jìn)行上下文切換——消耗性能;適用于消息處理時(shí)間較長(zhǎng)的情況,開多個(gè)線程并行處理多個(gè)業(yè)務(wù))
  • 擴(kuò)大隊(duì)列容積,提高堆積上限

其中,要提升隊(duì)列容積,把消息保存在內(nèi)存中顯然是不行的。從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的概念,也就是惰性隊(duì)列。

惰性隊(duì)列的特征如下:

  • 接收到消息后直接【存入磁盤】而非內(nèi)存

    mq消息一般都是儲(chǔ)存在內(nèi)存——響應(yīng)速度快(優(yōu)點(diǎn)),但是,mq在內(nèi)存儲(chǔ)存設(shè)置了一個(gè)上限,mq設(shè)置內(nèi)存預(yù)警值,當(dāng)消息占了內(nèi)存的40%時(shí),mq會(huì)處于暫停的狀態(tài),阻止生產(chǎn)者投遞消息,將這部分消息刷出到磁盤,清理出一部分內(nèi)存空間出來(lái),導(dǎo)致mq會(huì)間歇性的出現(xiàn)暫停,導(dǎo)致mq的并發(fā)能力出現(xiàn)忽高忽低的性能不穩(wěn)定的情況
    將消息存入磁盤就不會(huì)出現(xiàn)這個(gè)問題,但是磁盤的速度肯定沒有內(nèi)存的快——性能損耗

  • 消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤中讀取并加載到內(nèi)存——同上,性能損耗
  • 支持?jǐn)?shù)百萬(wàn)條的消息存儲(chǔ)

惰性隊(duì)列的如何創(chuàng)建

方式1、基于@Bean聲明lazy-queue
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java
方式2、基于@RabbitListener聲明LazyQueue
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java
方式3、通過命令行可以將一個(gè)運(yùn)行中的隊(duì)列修改為惰性隊(duì)列:
使用Xshell,進(jìn)入mq容器中,執(zhí)行該指令
1、進(jìn)入容器

docker exec -it mq1

2、執(zhí)行命令

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

命令解讀:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一個(gè)策略
  • Lazy :策略名稱,可以自定義
  • "^lazy-queue$" :用正則表達(dá)式,匹配隊(duì)列的名字,(凡是符合該正則表達(dá)式規(guī)則的隊(duì)列,全部按照該策略設(shè)置)
  • '{"queue-mode":"lazy"}' :設(shè)置隊(duì)列模式為lazy模式
  • --apply-to queues :策略的作用對(duì)象,是所有的隊(duì)列

執(zhí)行完該指令后,可以在Rabbitmq的管控臺(tái)出處查看策略
rabbitmq correlationdata,消息隊(duì)列,java-rabbitmq,rabbitmq,java


消息隊(duì)列專欄文章:

RabbitMQ(一)初識(shí)消息隊(duì)列(MQ)

RabbitMQ(二):RabbitMQ的安裝(Linux、基于docker安裝)及其插件安裝

RabbitMQ(三):RabbitMQ快速入門(SpringBoot)

RabbitMQ(四):RabbitMQ高級(jí)特性

文章整理自:黑馬教學(xué)視頻文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-804347.html

到了這里,關(guān)于RabbitMQ(四):RabbitMQ高級(jí)特性的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 4.RabbitMQ高級(jí)特性 冪等 可靠消息 等等

    4.RabbitMQ高級(jí)特性 冪等 可靠消息 等等

    保障消息的成功發(fā)出 保障MQ節(jié)點(diǎn)的成功接收 發(fā)送端收到MQ節(jié)點(diǎn)(Broker)確認(rèn)應(yīng)答 完善的消息進(jìn)行補(bǔ)償機(jī)制 消息的確認(rèn),是指生產(chǎn)者投遞消息后,如果Broker收到消息,則會(huì)給我們生產(chǎn)者一個(gè)應(yīng)答。 生產(chǎn)者進(jìn)行接收應(yīng)答,用來(lái)確定這條消息是否正常的發(fā)送到了Broker,這種方式也是

    2024年02月11日
    瀏覽(25)
  • RabbitMQ高級(jí)特性解析:消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制探究

    RabbitMQ高級(jí)特性解析:消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制探究

    學(xué)習(xí)RabbitMQ高級(jí)特性,涵蓋消息的持久化、確認(rèn)模式、退回模式以及消費(fèi)者ACK機(jī)制等方面,助您構(gòu)建高可靠性的消息隊(duì)列系統(tǒng)。

    2024年01月16日
    瀏覽(18)
  • RabbitMQ(四):RabbitMQ高級(jí)特性

    RabbitMQ(四):RabbitMQ高級(jí)特性

    消息隊(duì)列在使用過程中,面臨著很多實(shí)際問題需要思考: 消息可靠性問題:如何確保發(fā)送的消息至少被消費(fèi)—次 延遲消息問題:如何實(shí)現(xiàn)消息的延遲投遞 消息堆積問題:如何解決數(shù)百萬(wàn)消息堆積,無(wú)法及時(shí)消費(fèi)的問題 高可用問題:如何避免單點(diǎn)的MQ故障而導(dǎo)致的不可用問題

    2024年01月19日
    瀏覽(14)
  • RabbitMQ實(shí)現(xiàn)延遲消息,RabbitMQ使用死信隊(duì)列實(shí)現(xiàn)延遲消息,RabbitMQ延時(shí)隊(duì)列插件

    RabbitMQ實(shí)現(xiàn)延遲消息,RabbitMQ使用死信隊(duì)列實(shí)現(xiàn)延遲消息,RabbitMQ延時(shí)隊(duì)列插件

    假設(shè)有一個(gè)業(yè)務(wù)場(chǎng)景:超過30分鐘未付款的訂單自動(dòng)關(guān)閉,這個(gè)功能應(yīng)該怎么實(shí)現(xiàn)? RabbitMQ使用死信隊(duì)列,可以實(shí)現(xiàn)消息的延遲接收。 隊(duì)列有一個(gè)消息過期屬性。就像豐巢超過24小時(shí)就收費(fèi)一樣,通過設(shè)置這個(gè)屬性,超過了指定事件的消息將會(huì)被丟棄。 這個(gè)屬性交:x-message

    2024年02月13日
    瀏覽(104)
  • 【RabbitMQ】RabbitMQ高級(jí):死信隊(duì)列和延遲隊(duì)列

    【RabbitMQ】RabbitMQ高級(jí):死信隊(duì)列和延遲隊(duì)列

    在電商平臺(tái)下單,訂單創(chuàng)建成功,等待支付,一般會(huì)給30分鐘的時(shí)間,開始倒計(jì)時(shí)。如果在這段時(shí)間內(nèi)用戶沒有支付,則默認(rèn)訂單取消。 該如何實(shí)現(xiàn)? 定期輪詢(數(shù)據(jù)庫(kù)等) 用戶下單成功,將訂單信息放入數(shù)據(jù)庫(kù),同時(shí)將支付狀態(tài)放入數(shù)據(jù)庫(kù),用戶付款更改數(shù)據(jù)庫(kù)狀態(tài)。定

    2024年01月17日
    瀏覽(19)
  • 高級(jí)篇-rabbitmq的高級(jí)特性

    高級(jí)篇-rabbitmq的高級(jí)特性

    ? ? ?啟動(dòng)MQ 創(chuàng)建Queues:? 兩種Callback: 1.ReturnCallback:全局callback? ?2.ComfirmCallback:?發(fā)送信息時(shí)候設(shè)置 ? ?執(zhí)行成功: ?監(jiān)控頁(yè)面: 模擬失?。??1.投遞到交互機(jī)失敗 2.投遞到交換機(jī)了,但是沒有進(jìn)入隊(duì)列? ? ?注意: ? 演示數(shù)據(jù)是否默認(rèn)持久化:? ? ? ?重啟mq: ?1. 交互機(jī)、

    2024年02月09日
    瀏覽(26)
  • rabbitmq筆記-rabbitmq進(jìn)階-數(shù)據(jù)可靠性,rabbitmq高級(jí)特性

    消息何去何從 mandatory和immediate是channel.basicPublish方法的兩個(gè)參數(shù),都有消息傳遞過程中不可達(dá)目的地時(shí)將消息返回給生產(chǎn)者的功能。 mandatory參數(shù) true:交換器無(wú)法根據(jù)自身的類型 和路由鍵找到符合條件的隊(duì)列,rabbitmq調(diào)用Basic.Return命令將消息返回給生產(chǎn)者 生產(chǎn)者調(diào)用channel.

    2024年02月10日
    瀏覽(45)
  • 【RabbitMQ】消息隊(duì)列-RabbitMQ篇章

    【RabbitMQ】消息隊(duì)列-RabbitMQ篇章

    RabbitMQ是一個(gè)開源的 遵循AMQP協(xié)議 實(shí)現(xiàn)的基于Erlang語(yǔ)言編寫,支持多種客戶端(語(yǔ)言)。用于在分布式系統(tǒng)中 存儲(chǔ)消息,轉(zhuǎn)發(fā)消息 ,具有 高可用 , 高可擴(kuò)性 , 易用性 等特征。 1.1、RabbitMQ—使用場(chǎng)景 一般場(chǎng)景 像一般的下訂單業(yè)務(wù)如下圖: 將訂單信息寫入數(shù)據(jù)庫(kù)成功后,發(fā)

    2024年02月12日
    瀏覽(20)
  • 3.精通RabbitMQ—消息隊(duì)列、RabbitMQ

    3.精通RabbitMQ—消息隊(duì)列、RabbitMQ

    RabbitMQ面試題 (總結(jié)最全面的面試題) 入門RabbitMQ消息隊(duì)列,看這篇文章就夠了 消息隊(duì)列 是一種基于 隊(duì)列 ,用于解決 不同進(jìn)程或應(yīng)用 之間 通訊 的 消息中間件 。 支持多種 消息傳遞模式 ,如 隊(duì)列模型 、 發(fā)布/訂閱模型 等。 業(yè)務(wù)解耦 :通過 發(fā)布/訂閱 模式,減少系統(tǒng)的 耦

    2024年02月15日
    瀏覽(19)
  • 【RabbitMQ筆記10】消息隊(duì)列RabbitMQ之死信隊(duì)列的介紹

    【RabbitMQ筆記10】消息隊(duì)列RabbitMQ之死信隊(duì)列的介紹

    這篇文章,主要介紹消息隊(duì)列RabbitMQ之死信隊(duì)列。 目錄 一、RabbitMQ死信隊(duì)列 1.1、什么是死信隊(duì)列 1.2、設(shè)置過期時(shí)間TTL 1.3、配置死信交換機(jī)和死信隊(duì)列(代碼配置) (1)設(shè)置隊(duì)列過期時(shí)間 (2)設(shè)置單條消息過期時(shí)間 (3)隊(duì)列設(shè)置死信交換機(jī) (4)配置的基本思路 1.4、配置

    2024年02月16日
    瀏覽(95)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包