国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

這篇具有很好參考價(jià)值的文章主要介紹了(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

[此文檔是在心向陽(yáng)光的天域的博客加了一些有助于自己的知識(shí)體系,也歡迎大家關(guān)注這個(gè)大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html)
[是這個(gè)視頻](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5&spm_id_from=pageDriver&vd_source=9beb0a2f0cec6f01c2433a881b54152c)

= = = = = = = = = = = = = = = 微服務(wù)技術(shù)——可靠性消息服務(wù) = = = = = = = = = = = = = = =

今日目標(biāo)

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

服務(wù)異步通信-高級(jí)篇

消息隊(duì)列在使用過(guò)程中,面臨著很多實(shí)際問(wèn)題需要思考:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

1.消息可靠性

消息從發(fā)送,到消費(fèi)者接收,會(huì)經(jīng)理多個(gè)過(guò)程:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
其中的每一步都可能導(dǎo)致消息丟失,常見(jiàn)的丟失原因包括:

  • 發(fā)送時(shí)丟失:
    • 生產(chǎn)者發(fā)送的消息未送達(dá)exchange
    • 消息到達(dá)exchange后未到達(dá)queue
  • MQ宕機(jī),queue將消息丟失
  • consumer接收到消息后未消費(fèi)就宕機(jī)

針對(duì)這些問(wèn)題,RabbitMQ分別給出了解決方案:

  • 生產(chǎn)者確認(rèn)機(jī)制
  • mq持久化
  • 消費(fèi)者確認(rèn)機(jī)制
  • 失敗重試機(jī)制

下面我們就通過(guò)案例來(lái)演示每一個(gè)步驟。
首先,導(dǎo)入課前資料提供的demo工程:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
項(xiàng)目結(jié)構(gòu)如下:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
用docker啟動(dòng)即可

docker start mq

要?jiǎng)?chuàng)建一個(gè)隊(duì)列起名simple.queue
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
然后在交換機(jī)中把a(bǔ)mq.topic交換機(jī),和上面創(chuàng)建的隊(duì)列simple.queue綁定,我們手動(dòng)配置
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
進(jìn)入amq.topic交換機(jī)后,綁定隊(duì)列
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
綁定后如圖:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

1.1.生產(chǎn)者消息確認(rèn)

RabbitMQ提供了publisher confirm機(jī)制來(lái)避免消息發(fā)送到MQ過(guò)程中丟失。這種機(jī)制必須給每個(gè)消息指定一個(gè)唯一ID。消息發(fā)送到MQ以后,會(huì)返回一個(gè)結(jié)果給發(fā)送者,表示消息是否處理成功。

返回結(jié)果有兩種方式:

  • publisher-confirm,發(fā)送者確認(rèn)
    • 消息成功投遞到交換機(jī),返回ack
    • 消息未投遞到交換機(jī),返回nack
  • publisher-return,發(fā)送者回執(zhí)
    • 消息投遞到交換機(jī)了,但是沒(méi)有路由到隊(duì)列。返回ACK,及路由失敗原因。

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

注意:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

1.1.1.修改配置

首先,修改publisher服務(wù)中的application.yml文件,添加下面的內(nèi)容:

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

說(shuō)明:

  • publish-confirm-type:開(kāi)啟publisher-confirm,這里支持兩種類型:
    • simple:同步等待confirm結(jié)果,直到超時(shí)
    • correlated?:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallback
  • publish-returns:開(kāi)啟publish-return功能,同樣是基于callback機(jī)制,不過(guò)是定義ReturnCallback
  • template.mandatory:定義消息路由失敗時(shí)的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息
1.1.2.定義Return回調(diào)

每個(gè)RabbitTemplate只能配置一個(gè)ReturnCallback,因此需要在項(xiàng)目加載時(shí)配置:

修改publisher服務(wù),添加一個(gè):

package cn.itcast.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 設(shè)置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 投遞失敗,記錄日志
            log.info("消息發(fā)送失敗,應(yīng)答碼{},原因{},交換機(jī){},路由鍵{},消息{}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有業(yè)務(wù)需要,可以重發(fā)消息
        });
    }
}
1.1.3.定義ConfirmCallback

ConfirmCallback可以在發(fā)送消息時(shí)指定,因?yàn)槊總€(gè)業(yè)務(wù)處理confirm成功或失敗的邏輯不一定相同。

在publisher服務(wù)的cn.itcast.mq.spring.SpringAmqpTest類中,定義一個(gè)單元測(cè)試方法:

public void testSendMessage2SimpleQueue() throws InterruptedException {
    // 1.消息體
    String message = "hello, spring amqp!";
    // 2.全局唯一的消息ID,需要封裝到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 3.添加callback
    correlationData.getFuture().addCallback(
        result -> {
            if(result.isAck()){
                // 3.1.ack,消息成功
                log.debug("消息發(fā)送成功, ID:{}", correlationData.getId());
            }else{
                // 3.2.nack,消息失敗
                log.error("消息發(fā)送失敗, ID:{}, 原因{}",correlationData.getId(), result.getReason());
            }
        },
        ex -> log.error("消息發(fā)送異常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    // 4.發(fā)送消息
    rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);

    // 休眠一會(huì)兒,等待ack回執(zhí)
    Thread.sleep(2000);
}

全部配置完后,運(yùn)行測(cè)試類SpringAmqpTest.java,這說(shuō)明消息發(fā)送成功
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
然后呢,我們來(lái)一個(gè)消息發(fā)送失敗的情況,我們故意填錯(cuò)交換機(jī)的名字
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
調(diào)用后,后臺(tái)打印日志如下:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
然后我們嘗試填錯(cuò),routingKey看一下
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
報(bào)錯(cuò)信息如下:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
之后我們恢復(fù)代碼,都保證正確即可

總結(jié):
SpringAMQP中處理消息確認(rèn)的幾種情況:
● publisher-comfirm:

  • 消息成功發(fā)送到exchange,返回ack
  • 消息發(fā)送失敗,沒(méi)有到達(dá)交換機(jī),返回nack
  • 消息發(fā)送過(guò)程中出現(xiàn)異常,沒(méi)有收到回執(zhí)

● 消息成功發(fā)送到exchange, 但沒(méi)有路由到queue,

  • 調(diào)用ReturnCallback

1.2.消息持久化

生產(chǎn)者確認(rèn)可以確保消息投遞到RabbitMQ的隊(duì)列中,但是消息發(fā)送到RabbitMQ以后,如果突然宕機(jī),也可能導(dǎo)致消息丟失。

要想確保消息在RabbitMQ中安全保存,必須開(kāi)啟消息持久化機(jī)制。

  • 交換機(jī)持久化
  • 隊(duì)列持久化
  • 消息持久化
1.2.1.交換機(jī)持久化

RabbitMQ中交換機(jī)默認(rèn)是非持久化的,mq重啟后就丟失。

我們通過(guò)命令
重啟mq

docker restart mq

然后查看隊(duì)列、交換機(jī)的情況,比如我們創(chuàng)建的是持久化隊(duì)列
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
SpringAMQP中可以通過(guò)代碼指定交換機(jī)持久化:

@Bean
public DirectExchange simpleExchange(){
    // 三個(gè)參數(shù):交換機(jī)名稱、是否持久化、當(dāng)沒(méi)有queue與其綁定時(shí)是否自動(dòng)刪除
    return new DirectExchange("simple.direct", true, false);
}

事實(shí)上,默認(rèn)情況下,由SpringAMQP聲明的交換機(jī)都是持久化的。

可以在RabbitMQ控制臺(tái)看到持久化的交換機(jī)都會(huì)帶上D的標(biāo)示:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

1.2.2.隊(duì)列持久化

RabbitMQ中隊(duì)列默認(rèn)是非持久化的,mq重啟后就丟失。
SpringAMQP中可以通過(guò)代碼指定交換機(jī)持久化:

我們可以先去mq圖形化界面把simple.queue刪除

@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder構(gòu)建隊(duì)列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}

事實(shí)上,默認(rèn)情況下,由SpringAMQP聲明的隊(duì)列都是持久化的。
可以在RabbitMQ控制臺(tái)看到持久化的隊(duì)列都會(huì)帶上D的標(biāo)示:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

這些做完后,我們啟動(dòng)ConsumerApplication.java,然后查看mq的圖形化界面
交換機(jī)是持久的
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
隊(duì)列是持久的
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

1.2.3.消息持久化

首先把consumer服務(wù)停了,不要消費(fèi)我們的消息
我們?cè)趍q的圖形化界面,點(diǎn)擊simple.queue隊(duì)列,然后編輯消息,點(diǎn)擊發(fā)送
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
查看有1條消息
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
然后我們重啟docker中的mq

docker restart mq

然后再回來(lái)看mq的圖形化界面,發(fā)現(xiàn)隊(duì)列還在,但是消息沒(méi)了
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

利用SpringAMQP發(fā)送消息時(shí),可以設(shè)置消息的屬性(MessageProperties),指定delivery-mode:

  • 1:非持久化
  • 2:持久化

用java代碼指定:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

默認(rèn)情況下,SpringAMQP發(fā)出的任何消息都是持久化的,不用特意指定。
運(yùn)行測(cè)試類SpringAmqpTest.java之后,查看mq的圖形化界面
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
查看一下具體消息
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
然后我們重啟一下docker的mq容器

docker restart mq

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

注意:AMQP中創(chuàng)建的交換機(jī)、隊(duì)列、消息默認(rèn)都是持久的
交換機(jī):
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
隊(duì)列:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

消息:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

1.3.消費(fèi)者消息確認(rèn)

RabbitMQ是閱后即焚機(jī)制,RabbitMQ確認(rèn)消息被消費(fèi)者消費(fèi)后會(huì)立刻刪除。
而RabbitMQ是通過(guò)消費(fèi)者回執(zhí)來(lái)確認(rèn)消費(fèi)者是否成功處理消息的:消費(fèi)者獲取消息后,應(yīng)該向RabbitMQ發(fā)送ACK回執(zhí),表明自己已經(jīng)處理消息。

設(shè)想這樣的場(chǎng)景:

  • 1)RabbitMQ投遞消息給消費(fèi)者
  • 2)消費(fèi)者獲取消息后,返回ACK給RabbitMQ
  • 3)RabbitMQ刪除消息
  • 4)消費(fèi)者宕機(jī),消息尚未處理

這樣,消息就丟失了。因此消費(fèi)者返回ACK的時(shí)機(jī)非常重要。

而SpringAMQP則允許配置三種確認(rèn)模式:

  • manual:手動(dòng)ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。
  • auto?:自動(dòng)ack,由spring監(jiān)測(cè)listener代碼是否出現(xiàn)異常,沒(méi)有異常則返回ack;拋出異常則返回nack。
  • none:關(guān)閉ack,MQ假定消費(fèi)者獲取消息后會(huì)成功處理,因此消息投遞后立即被刪除

由此可知:

  • none模式下,消息投遞是不可靠的,可能丟失
  • auto模式類似事務(wù)機(jī)制,出現(xiàn)異常時(shí)返回nack,消息回滾到mq;沒(méi)有異常,返回ack
  • manual:自己根據(jù)業(yè)務(wù)情況,判斷什么時(shí)候該ack

一般,我們都是使用默認(rèn)的auto即可。

1.3.1.演示none模式

修改consumer服務(wù)的application.yml文件,添加下面內(nèi)容:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 關(guān)閉ack

修改consumer服務(wù)的SpringRabbitListener類中的方法,模擬一個(gè)消息處理異常:
修改SpringRabbitListener.java

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
    log.info("消費(fèi)者接收到simple.queue的消息:【{}】", msg);
    // 模擬異常
    System.out.println(1 / 0);
    log.debug("消息處理完成!");
}

測(cè)試可以發(fā)現(xiàn),當(dāng)消息處理拋異常時(shí),消息依然被RabbitMQ刪除了。
dubug啟動(dòng)Consumer
發(fā)現(xiàn)消息還沒(méi)接收呢,直接就沒(méi)了
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
也就是說(shuō),消費(fèi)者雖然接收到了消息,但是假如消費(fèi)者還沒(méi)有讀取,發(fā)生了報(bào)錯(cuò)或者宕機(jī),這個(gè)消息就會(huì)丟失

1.3.2.演示auto模式

再次把確認(rèn)機(jī)制修改為auto:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 關(guān)閉ack

我們?nèi)q的圖形化界面創(chuàng)建消息
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
發(fā)送后,我們看到圖形化界面中有1條消息
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
IDEA后臺(tái)因?yàn)槲覀冋J(rèn)為寫(xiě)了1/0的錯(cuò)誤算數(shù)運(yùn)算,導(dǎo)致IDEA不停重發(fā)請(qǐng)求重試消息的推送,這顯然也不符合我們的要求
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

在異常位置打斷點(diǎn),再次發(fā)送消息,程序卡在斷點(diǎn)時(shí),可以發(fā)現(xiàn)此時(shí)消息狀態(tài)為unack(未確定狀態(tài)):
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
拋出異常后,因?yàn)镾pring會(huì)自動(dòng)返回nack,所以消息恢復(fù)至Ready狀態(tài),并且沒(méi)有被RabbitMQ刪除:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

1.4.消費(fèi)失敗重試機(jī)制

當(dāng)消費(fèi)者出現(xiàn)異常后,消息會(huì)不斷requeue(重入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者,然后再次異常,再次requeue,無(wú)限循環(huán),導(dǎo)致mq的消息處理飆升,帶來(lái)不必要的壓力:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
怎么辦呢?

1.4.1.本地重試

我們可以利用Spring的retry機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無(wú)限制的requeue到mq隊(duì)列。

修改consumer服務(wù)的application.yml文件,添加內(nèi)容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 開(kāi)啟消費(fèi)者失敗重試
          initial-interval: 1000 # 初始的失敗等待時(shí)長(zhǎng)為1秒
          multiplier: 1 # 失敗的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = multiplier * last-interval
          max-attempts: 4 # 最大重試次數(shù)
          stateless: true # true無(wú)狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false

修改SpringRabbitListener.java
修改為日志打印的形式

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        log.debug("消費(fèi)者接收到simple.queue的消息:【" + msg + "】");
        System.out.println(1 / 0);
        log.info("消費(fèi)者處理消息成功!");
    }

重啟consumer服務(wù),重復(fù)之前的測(cè)試。可以發(fā)現(xiàn):
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

  • 在重試4次后,SpringAMQP會(huì)拋出異常

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

AmqpRejectAndDontRequeueException,說(shuō)明本地重試觸發(fā)了

  • 查看RabbitMQ控制臺(tái),發(fā)現(xiàn)消息被刪除了,說(shuō)明最后SpringAMQP返回的是ack,mq刪除消息了

結(jié)論:

  • 開(kāi)啟本地重試時(shí),消息處理過(guò)程中拋出異常,不會(huì)requeue到隊(duì)列,而是在消費(fèi)者本地重試
  • 重試達(dá)到最大次數(shù)后,Spring會(huì)返回ack,消息會(huì)被丟棄
1.4.2.失敗策略

在之前的測(cè)試中,達(dá)到最大重試次數(shù)后,消息會(huì)被丟棄,這是由Spring內(nèi)部機(jī)制決定的。

在開(kāi)啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有MessageRecovery接口來(lái)處理,它包含三種不同的實(shí)現(xiàn):

  • RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式

  • ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)

  • RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)?
    (黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

比較優(yōu)雅的一種處理方案是RepublishMessageRecoverer,失敗后將消息投遞到一個(gè)指定的,專門(mén)存放異常消息的隊(duì)列,后續(xù)由人工集中處理。

1)在consumer服務(wù)中定義處理失敗消息的交換機(jī)和隊(duì)列

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定義一個(gè)RepublishMessageRecoverer,關(guān)聯(lián)隊(duì)列和交換機(jī)

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代碼:

package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;

@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

以上配置完之后,我們?cè)僦貜?fù)步驟發(fā)送消息
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
發(fā)送后我們看到失敗交換機(jī)有了
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
隊(duì)列也有了
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
看一下IDEA的后臺(tái)
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
看一下error.queue中的消息,很清晰把錯(cuò)誤棧都輸出了
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

1.5.總結(jié)

如何確保RabbitMQ消息的可靠性?

  • 開(kāi)啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊(duì)列
  • 開(kāi)啟持久化功能,確保消息未消費(fèi)前在隊(duì)列中不會(huì)丟失
  • 開(kāi)啟消費(fèi)者確認(rèn)機(jī)制為auto,由spring確認(rèn)消息處理成功后完成ack
  • 開(kāi)啟消費(fèi)者失敗重試機(jī)制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理

2.死信交換機(jī)

2.1.初識(shí)死信交換機(jī)

2.1.1.什么是死信交換機(jī)

什么是死信?

當(dāng)一個(gè)隊(duì)列中的消息滿足下列情況之一時(shí),可以成為死信(dead letter):

  • 消費(fèi)者使用basic.reject或 basic.nack聲明消費(fèi)失敗,并且消息的requeue參數(shù)設(shè)置為false
  • 消息是一個(gè)過(guò)期消息,超時(shí)無(wú)人消費(fèi)
  • 要投遞的隊(duì)列消息滿了,無(wú)法投遞

如果這個(gè)包含死信的隊(duì)列配置了dead-letter-exchange屬性,指定了一個(gè)交換機(jī),那么隊(duì)列中的死信就會(huì)投遞到這個(gè)交換機(jī)中,而這個(gè)交換機(jī)稱為死信交換機(jī)(Dead Letter Exchange,檢查DLX)。

如圖,一個(gè)消息被消費(fèi)者拒絕了,變成了死信:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
因?yàn)閟imple.queue綁定了死信交換機(jī) dl.direct,因此死信會(huì)投遞給這個(gè)交換機(jī):
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

如果這個(gè)死信交換機(jī)也綁定了一個(gè)隊(duì)列,則消息最終會(huì)進(jìn)入這個(gè)存放死信的隊(duì)列:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

另外,隊(duì)列將死信投遞給死信交換機(jī)時(shí),必須知道兩個(gè)信息:

  • 死信交換機(jī)名稱
  • 死信交換機(jī)與死信隊(duì)列綁定的RoutingKey

這樣才能確保投遞的消息能到達(dá)死信交換機(jī),并且正確的路由到死信隊(duì)列。
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

2.1.2.利用死信交換機(jī)接收死信(拓展)

在失敗重試策略中,默認(rèn)的RejectAndDontRequeueRecoverer會(huì)在本地重試次數(shù)耗盡后,發(fā)送reject給RabbitMQ,消息變成死信,被丟棄。

我們可以給simple.queue添加一個(gè)死信交換機(jī),給死信交換機(jī)綁定一個(gè)隊(duì)列。這樣消息變成死信后也不會(huì)丟棄,而是最終投遞到死信交換機(jī),路由到與死信交換機(jī)綁定的隊(duì)列。
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

我們?cè)赾onsumer服務(wù)中,定義一組死信交換機(jī)、死信隊(duì)列:

// 聲明普通的 simple.queue隊(duì)列,并且為其指定死信交換機(jī):dl.direct
@Bean
public Queue simpleQueue2(){
    return QueueBuilder.durable("simple.queue") // 指定隊(duì)列名稱,并持久化
        .deadLetterExchange("dl.direct") // 指定死信交換機(jī)
        .build();
}
// 聲明死信交換機(jī) dl.direct
@Bean
public DirectExchange dlExchange(){
    return new DirectExchange("dl.direct", true, false);
}
// 聲明存儲(chǔ)死信的隊(duì)列 dl.queue
@Bean
public Queue dlQueue(){
    return new Queue("dl.queue", true);
}
// 將死信隊(duì)列 與 死信交換機(jī)綁定
@Bean
public Binding dlBinding(){
    return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}
2.1.3.總結(jié)

什么樣的消息會(huì)成為死信?

  • 消息被消費(fèi)者reject或者返回nack
  • 消息超時(shí)未消費(fèi)
  • 隊(duì)列滿了

死信交換機(jī)的使用場(chǎng)景是什么?

  • 如果隊(duì)列綁定了死信交換機(jī),死信會(huì)投遞到死信交換機(jī);
  • 可以利用死信交換機(jī)收集所有消費(fèi)者處理失敗的消息(死信),交由人工處理,進(jìn)一步提高消息隊(duì)列的可靠性。

2.2.TTL

一個(gè)隊(duì)列中的消息如果超時(shí)未消費(fèi),則會(huì)變?yōu)樗佬?,超時(shí)分為兩種情況:

  • 消息所在的隊(duì)列設(shè)置了超時(shí)時(shí)間
  • 消息本身設(shè)置了超時(shí)時(shí)間
    (黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
2.2.1.接收超時(shí)死信的死信交換機(jī)

在consumer服務(wù)的SpringRabbitListener中,定義一個(gè)新的消費(fèi)者,并且聲明 死信交換機(jī)、死信隊(duì)列:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dl.ttl.queue", durable = "true"),
    exchange = @Exchange(name = "dl.ttl.direct"),
    key = "ttl"
))
public void listenDlQueue(String msg){
    log.info("接收到 dl.ttl.queue的延遲消息:{}", msg);
}
2.2.2.聲明一個(gè)隊(duì)列,并且指定TTL

要給隊(duì)列設(shè)置超時(shí)時(shí)間,需要在聲明隊(duì)列時(shí)配置x-message-ttl屬性:

@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable("ttl.queue") // 指定隊(duì)列名稱,并持久化
        .ttl(10000) // 設(shè)置隊(duì)列的超時(shí)時(shí)間,10秒
        .deadLetterExchange("dl.ttl.direct") // 指定死信交換機(jī)
        .build();
}

注意,這個(gè)隊(duì)列設(shè)定了死信交換機(jī)為dl.ttl.direct

聲明交換機(jī),將ttl與交換機(jī)綁定:

@Bean
public DirectExchange ttlExchange(){
    return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
    return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}

發(fā)送消息,但是不要指定TTL:

@Test
public void testTTLQueue() {
    // 創(chuàng)建消息
    String message = "hello, ttl queue";
    // 消息ID,需要封裝到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 發(fā)送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    // 記錄日志
    log.debug("發(fā)送消息成功");
}

發(fā)送消息的日志:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

查看下接收消息的日志:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

因?yàn)殛?duì)列的TTL值是10000ms,也就是10秒??梢钥吹较l(fā)送與接收之間的時(shí)差剛好是10秒。

2.2.3.發(fā)送消息時(shí),設(shè)定TTL

在發(fā)送消息時(shí),也可以指定TTL:

@Test
public void testTTLMsg() {
    // 創(chuàng)建消息
    Message message = MessageBuilder
        .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
        .setExpiration("5000")
        .build();
    // 消息ID,需要封裝到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 發(fā)送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    log.debug("發(fā)送消息成功");
}

查看發(fā)送消息日志:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

接收消息日志:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

這次,發(fā)送與接收的延遲只有5秒。說(shuō)明當(dāng)隊(duì)列、消息都設(shè)置了TTL時(shí),任意一個(gè)到期就會(huì)成為死信。

2.2.4.總結(jié)

消息超時(shí)的兩種方式是?

  • 給隊(duì)列設(shè)置ttl屬性,進(jìn)入隊(duì)列后超過(guò)ttl時(shí)間的消息變?yōu)樗佬?/li>
  • 給消息設(shè)置ttl屬性,隊(duì)列接收到消息超過(guò)ttl時(shí)間后變?yōu)樗佬?/li>

如何實(shí)現(xiàn)發(fā)送一個(gè)消息20秒后消費(fèi)者才收到消息?

  • 給消息的目標(biāo)隊(duì)列指定死信交換機(jī)
  • 將消費(fèi)者監(jiān)聽(tīng)的隊(duì)列綁定到死信交換機(jī)
  • 發(fā)送消息時(shí)給消息設(shè)置超時(shí)時(shí)間為20秒

2.3.延遲隊(duì)列

利用TTL結(jié)合死信交換機(jī),我們實(shí)現(xiàn)了消息發(fā)出后,消費(fèi)者延遲收到消息的效果。這種消息模式就稱為延遲隊(duì)列(Delay Queue)模式。

延遲隊(duì)列的使用場(chǎng)景包括:

  • 延遲發(fā)送短信
  • 用戶下單,如果用戶在15 分鐘內(nèi)未支付,則自動(dòng)取消
  • 預(yù)約工作會(huì)議,20分鐘后自動(dòng)通知所有參會(huì)人員

因?yàn)檠舆t隊(duì)列的需求非常多,所以RabbitMQ的官方也推出了一個(gè)插件,原生支持延遲隊(duì)列效果。

這個(gè)插件就是DelayExchange插件。參考RabbitMQ的插件列表頁(yè)面:https://www.rabbitmq.com/community-plugins.html
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

使用方式可以參考官網(wǎng)地址:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

2.3.1.安裝DelayExchange插件

參考課前資料:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

1.安裝DelayExchange插件

官方的安裝指南地址為:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

上述文檔是基于linux原生安裝RabbitMQ,然后安裝插件。

因?yàn)槲覀冎笆腔贒ocker安裝RabbitMQ,所以下面我們會(huì)講解基于Docker來(lái)安裝RabbitMQ插件。

2.下載插件

RabbitMQ有一個(gè)官方的插件社區(qū),地址為:https://www.rabbitmq.com/community-plugins.html

其中包含各種各樣的插件,包括我們要使用的DelayExchange插件:

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

大家可以去對(duì)應(yīng)的GitHub頁(yè)面下載3.8.9版本的插件,地址為https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9這個(gè)對(duì)應(yīng)RabbitMQ的3.8.5以上版本。

課前資料也提供了下載好的插件:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

3.上傳插件

因?yàn)槲覀兪腔贒ocker安裝,所以需要先查看RabbitMQ的插件目錄對(duì)應(yīng)的數(shù)據(jù)卷。如果不是基于Docker的同學(xué),請(qǐng)參考第一章部分,重新創(chuàng)建Docker容器。

我們之前設(shè)定的RabbitMQ的數(shù)據(jù)卷名稱為mq-plugins,所以我們使用下面命令查看數(shù)據(jù)卷:

docker volume inspect mq-plugins

可以得到下面結(jié)果:

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
接下來(lái),將插件上傳到這個(gè)目錄即可:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

4.安裝插件

最后就是安裝了,需要進(jìn)入MQ容器內(nèi)部來(lái)執(zhí)行安裝。我的容器名為mq,所以執(zhí)行下面命令:

docker exec -it mq bash

執(zhí)行時(shí),請(qǐng)將其中的 -it 后面的mq替換為你自己的容器名.

進(jìn)入容器內(nèi)部后,執(zhí)行下面命令開(kāi)啟插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

結(jié)果如下:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

2.3.2.DelayExchange原理

DelayExchange需要將一個(gè)交換機(jī)聲明為delayed類型。當(dāng)我們發(fā)送消息到delayExchange時(shí),流程如下:

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

  • 接收消息
  • 判斷消息是否具備x-delay屬性
  • 如果有x-delay屬性,說(shuō)明是延遲消息,持久化到硬盤(pán),讀取x-delay值,作為延遲時(shí)間
  • 返回routing not found結(jié)果給消息發(fā)送者
  • x-delay時(shí)間到期后,重新投遞消息到指定隊(duì)列
2.3.3.使用DelayExchange

插件的使用也非常簡(jiǎn)單:聲明一個(gè)交換機(jī),交換機(jī)的類型可以是任意類型,只需要設(shè)定delayed屬性為true即可,然后聲明隊(duì)列與其綁定即可。

1)聲明DelayExchange交換機(jī)

基于注解方式(推薦):
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
SpringRabbitListener.java

/**
     * 延遲交換機(jī)和隊(duì)列
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct", delayed = "true"),
            key = "delay"
    ))
    public void listenDelayExchange(String message) {
        log.info("消費(fèi)者接收到了delay.queue的消息" + message);
    }

也可以基于@Bean的方式:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

2)發(fā)送消息

發(fā)送消息時(shí),一定要攜帶x-delay屬性,指定延遲的時(shí)間:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
SpringAmqpTest.java

    /**
     * 發(fā)送延遲消息
     *
     * @throws InterruptedException
     */
    @Test
    public void testSendDealyMessage() throws InterruptedException {
        String routingKey = "delay";

        // 創(chuàng)建消息
        Message message = MessageBuilder.withBody("hello, delay message !".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setHeader("x-delay", 5000)
                .build();
        // 準(zhǔn)備CorrelationData
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("delay.direct", routingKey, message, correlationData);
    }
    log.info("發(fā)送delay消息成功!");

看一下mq的圖形界面
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

運(yùn)行測(cè)試類SpringAmqpTest.java發(fā)送消息,雖然發(fā)送delay消息成功,但是下面報(bào)了錯(cuò)誤
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
查看consumer,5秒后接收消息成功
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

那這里為什么會(huì)報(bào)錯(cuò)呢,這是因?yàn)閐elay的交換機(jī)是將消息持有了5秒后發(fā)送的,這里其實(shí)不是報(bào)錯(cuò),而是消息暫存了5秒,過(guò)了5秒才發(fā)送到隊(duì)列,那我們能不讓它們報(bào)錯(cuò)嗎,當(dāng)然可以,我們繼續(xù)修改。
根據(jù)receivedDelay是否有值判斷是否重發(fā),有就不重發(fā)
修改publisher中的CommonConfig.java
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate對(duì)象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置eturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判斷是否是延遲消息
            if (message.getMessageProperties().getReceivedDelay() > 0) {
                // 是一個(gè)延遲消息,忽略報(bào)錯(cuò)信息
                return;
            }
            // 記錄日志
            log.error("消息發(fā)送到隊(duì)列失敗,響應(yīng)碼:{},失敗原因:{},交換機(jī):{},routingKey:{},消息:{}",
                    replyCode, replyText, exchange, routingKey, message);
            // 如果有失敗的,可以進(jìn)行消息的重發(fā)

        });
    }

再此通過(guò)測(cè)試類發(fā)送消息,報(bào)錯(cuò)就沒(méi)有了
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

2.3.4.總結(jié)

延遲隊(duì)列插件的使用步驟包括哪些?

?聲明一個(gè)交換機(jī),添加delayed屬性為true

?發(fā)送消息時(shí),添加x-delay頭,值為超時(shí)時(shí)間

3.惰性隊(duì)列

3.1.消息堆積問(wèn)題

當(dāng)生產(chǎn)者發(fā)送消息的速度超過(guò)了消費(fèi)者處理消息的速度,就會(huì)導(dǎo)致隊(duì)列中的消息堆積,直到隊(duì)列存儲(chǔ)消息達(dá)到上限。之后發(fā)送的消息就會(huì)成為死信,可能會(huì)被丟棄,這就是消息堆積問(wèn)題。

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
解決消息堆積有三種思路:

  • 增加更多消費(fèi)者,提高消費(fèi)速度。也就是我們之前說(shuō)的work queue模式
  • 在消費(fèi)者內(nèi)開(kāi)啟線程池加快消息處理速度
  • 擴(kuò)大隊(duì)列容積,提高堆積上限

要提升隊(duì)列容積,把消息保存在內(nèi)存中顯然是不行的。

3.2.惰性隊(duì)列

從RabbitMQ的3.6.0版本開(kāi)始,就增加了Lazy Queues的概念,也就是惰性隊(duì)列。惰性隊(duì)列的特征如下:

  • 接收到消息后直接存入磁盤(pán)而非內(nèi)存
  • 消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤(pán)中讀取并加載到內(nèi)存
  • 支持?jǐn)?shù)百萬(wàn)條的消息存儲(chǔ)
3.2.1.基于命令行設(shè)置lazy-queue

而要設(shè)置一個(gè)隊(duì)列為惰性隊(duì)列,只需要在聲明隊(duì)列時(shí),指定x-queue-mode屬性為lazy即可??梢酝ㄟ^(guò)命令行將一個(gè)運(yùn)行中的隊(duì)列修改為惰性隊(duì)列:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

命令解讀:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一個(gè)策略
  • Lazy :策略名稱,可以自定義
  • "^lazy-queue$" :用正則表達(dá)式匹配隊(duì)列的名字
  • '{"queue-mode":"lazy"}' :設(shè)置隊(duì)列模式為lazy模式
  • --apply-to queues :策略的作用對(duì)象,是所有的隊(duì)列
3.2.2.基于@Bean聲明lazy-queue

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
新建類LazyConfig.java

@Configuration
public class LazyConfig {
    /**
     * 惰性隊(duì)列
     *
     * @return
     */
    @Bean
    public Queue lazyQueue() {
        return QueueBuilder.durable("lazy.queue").lazy().build();
    }

    /**
     * 普通隊(duì)列
     *
     * @return
     */
    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable("normal.queue").build();
    }
}

啟動(dòng)Consumer服務(wù),查看mq圖形界面
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
修改SpringAmqpTest.java

    /**
     * 測(cè)試惰性隊(duì)列
     *
     * @throws InterruptedException
     */
    @Test
    public void testSendLazyQueue() throws InterruptedException {
        for (int i = 0; i < 1000000; i++) {
            String routingKey = "lazy.queue";

            // 創(chuàng)建消息
            Message message = MessageBuilder.withBody("hello, lazy queue".getBytes(StandardCharsets.UTF_8))
                    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                    .build();
            rabbitTemplate.convertAndSend(routingKey, message);
            //log.info("發(fā)送lazy隊(duì)列消息成功!");
        }
    }

    /**
     * 測(cè)試惰性隊(duì)列
     *
     * @throws InterruptedException
     */
    @Test
    public void testSendNormalQueue() throws InterruptedException {
        for (int i = 0; i < 1000000; i++) {
            String routingKey = "normal.queue";

            // 創(chuàng)建消息
            Message message = MessageBuilder.withBody("hello, normal queue".getBytes(StandardCharsets.UTF_8))
                    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                    .build();
            rabbitTemplate.convertAndSend(routingKey, message);
            //log.info("發(fā)送normal隊(duì)列消息成功!");
        }
    }

我們看到lazy queue中,memory中沒(méi)有,直接寫(xiě)磁盤(pán),比較穩(wěn)定
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
我們看normal queue中,往內(nèi)存中寫(xiě),而且寫(xiě)一會(huì)兒就會(huì)往磁盤(pán)刷新一部分,穩(wěn)定性不好
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

3.2.3.基于@RabbitListener聲明LazyQueue

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

3.3.總結(jié)

消息堆積問(wèn)題的解決方案?

  • 隊(duì)列上綁定多個(gè)消費(fèi)者,提高消費(fèi)速度
  • 使用惰性隊(duì)列,可以再mq中保存更多消息

惰性隊(duì)列的優(yōu)點(diǎn)有哪些?

  • 基于磁盤(pán)存儲(chǔ),消息上限高
  • 沒(méi)有間歇性的page-out,性能比較穩(wěn)定

惰性隊(duì)列的缺點(diǎn)有哪些?

  • 基于磁盤(pán)存儲(chǔ),消息時(shí)效性會(huì)降低
  • 性能受限于磁盤(pán)的IO

4.MQ集群

4.1.集群分類

RabbitMQ的是基于Erlang語(yǔ)言編寫(xiě),而Erlang又是一個(gè)面向并發(fā)的語(yǔ)言,天然支持集群模式。RabbitMQ的集群有兩種模式:

?普通集群:是一種分布式集群,將隊(duì)列分散到集群的各個(gè)節(jié)點(diǎn),從而提高整個(gè)集群的并發(fā)能力。

?鏡像集群:是一種主從集群,普通集群的基礎(chǔ)上,添加了主從備份功能,提高集群的數(shù)據(jù)可用性。

鏡像集群雖然支持主從,但主從同步并不是強(qiáng)一致的,某些情況下可能有數(shù)據(jù)丟失的風(fēng)險(xiǎn)。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁隊(duì)列來(lái)代替鏡像集群,底層采用Raft協(xié)議確保主從的數(shù)據(jù)一致性。

4.2.普通集群

4.2.1.集群結(jié)構(gòu)和特征

普通集群,或者叫標(biāo)準(zhǔn)集群(classic cluster),具備下列特征:

  • 會(huì)在集群的各個(gè)節(jié)點(diǎn)間共享部分?jǐn)?shù)據(jù),包括:交換機(jī)、隊(duì)列元信息。不包含隊(duì)列中的消息。
  • 當(dāng)訪問(wèn)集群某節(jié)點(diǎn)時(shí),如果隊(duì)列不在該節(jié)點(diǎn),會(huì)從數(shù)據(jù)所在節(jié)點(diǎn)傳遞到當(dāng)前節(jié)點(diǎn)并返回
  • 隊(duì)列所在節(jié)點(diǎn)宕機(jī),隊(duì)列中的消息就會(huì)丟失

結(jié)構(gòu)如圖:

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

4.2.2.部署

參考課前資料:《RabbitMQ部署指南.md》

集群部署
接下來(lái),我們看看如何安裝RabbitMQ的集群。

1.集群分類

在RabbitMQ的官方文檔中,講述了兩種集群的配置方式:

  • 普通模式:普通模式集群不進(jìn)行數(shù)據(jù)同步,每個(gè)MQ都有自己的隊(duì)列、數(shù)據(jù)信息(其它元數(shù)據(jù)信息如交換機(jī)等會(huì)同步)。例如我們有2個(gè)MQ:mq1,和mq2,如果你的消息在mq1,而你連接到了mq2,那么mq2會(huì)去mq1拉取消息,然后返回給你。如果mq1宕機(jī),消息就會(huì)丟失。
  • 鏡像模式:與普通模式不同,隊(duì)列會(huì)在各個(gè)mq的鏡像節(jié)點(diǎn)之間同步,因此你連接到任何一個(gè)鏡像節(jié)點(diǎn),均可獲取到消息。而且如果一個(gè)節(jié)點(diǎn)宕機(jī),并不會(huì)導(dǎo)致數(shù)據(jù)丟失。不過(guò),這種方式增加了數(shù)據(jù)同步的帶寬消耗。

我們先來(lái)看普通模式集群,我們的計(jì)劃部署3節(jié)點(diǎn)的mq集群:

主機(jī)名 控制臺(tái)端口 amqp通信端口
mq1 8081 —> 15672 8071 —> 5672
mq2 8082 —> 15672 8072 —> 5672
mq3 8083 —> 15672 8073 —> 5672

集群中的節(jié)點(diǎn)標(biāo)示默認(rèn)都是:rabbit@[hostname],因此以上三個(gè)節(jié)點(diǎn)的名稱分別為:

  • rabbit@mq1
  • rabbit@mq2
  • rabbit@mq3
2.獲取cookie

RabbitMQ底層依賴于Erlang,而Erlang虛擬機(jī)就是一個(gè)面向分布式的語(yǔ)言,默認(rèn)就支持集群模式。集群模式中的每個(gè)RabbitMQ 節(jié)點(diǎn)使用 cookie 來(lái)確定它們是否被允許相互通信。

要使兩個(gè)節(jié)點(diǎn)能夠通信,它們必須具有相同的共享秘密,稱為Erlang cookie。cookie 只是一串最多 255 個(gè)字符的字母數(shù)字字符。

每個(gè)集群節(jié)點(diǎn)必須具有相同的 cookie。實(shí)例之間也需要它來(lái)相互通信。

我們先在之前啟動(dòng)的mq容器中獲取一個(gè)cookie值,作為集群的cookie。執(zhí)行下面的命令:

docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie

可以看到cookie值如下:

FXZMCVGLBIXZCDEMMVZQ

接下來(lái),停止并刪除當(dāng)前的mq容器,我們重新搭建集群。

docker rm -f mq

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
清理數(shù)據(jù)卷

docker volume prune
3.準(zhǔn)備集群配置

在/tmp目錄新建一個(gè)配置文件 rabbitmq.conf:

cd /tmp
# 創(chuàng)建文件
touch rabbitmq.conf

文件內(nèi)容如下:

loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3

再創(chuàng)建一個(gè)文件,記錄cookie

cd /tmp
# 創(chuàng)建cookie文件
touch .erlang.cookie
# 寫(xiě)入cookie
echo "FXZMCVGLBIXZCDEMMVZQ" > .erlang.cookie
# 修改cookie文件的權(quán)限
chmod 600 .erlang.cookie

準(zhǔn)備三個(gè)目錄,mq1、mq2、mq3:

cd /tmp
# 創(chuàng)建目錄
mkdir mq1 mq2 mq3

然后拷貝rabbitmq.conf、cookie文件到mq1、mq2、mq3:

# 進(jìn)入/tmp
cd /tmp
# 拷貝
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3
4.啟動(dòng)集群

創(chuàng)建一個(gè)網(wǎng)絡(luò):

docker network create mq-net

docker volume create

運(yùn)行命令

docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:3.8-management
docker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3.8-management
docker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:3.8-management
5.測(cè)試

在mq1這個(gè)節(jié)點(diǎn)上添加一個(gè)隊(duì)列:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

如圖,在mq2和mq3兩個(gè)控制臺(tái)也都能看到:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

6.數(shù)據(jù)共享測(cè)試

點(diǎn)擊這個(gè)隊(duì)列,進(jìn)入管理頁(yè)面:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

然后利用控制臺(tái)發(fā)送一條消息到這個(gè)隊(duì)列:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

結(jié)果在mq2、mq3上都能看到這條消息:

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

7.可用性測(cè)試

我們讓其中一臺(tái)節(jié)點(diǎn)mq1宕機(jī):

docker stop mq1

然后登錄mq2或mq3的控制臺(tái),發(fā)現(xiàn)simple.queue也不可用了:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

說(shuō)明數(shù)據(jù)并沒(méi)有拷貝到mq2和mq3。

4.3.鏡像集群

4.3.1.集群結(jié)構(gòu)和特征

鏡像集群:本質(zhì)是主從模式,具備下面的特征:

  • 交換機(jī)、隊(duì)列、隊(duì)列中的消息會(huì)在各個(gè)mq的鏡像節(jié)點(diǎn)之間同步備份。
  • 創(chuàng)建隊(duì)列的節(jié)點(diǎn)被稱為該隊(duì)列的主節(jié)點(diǎn),備份到的其它節(jié)點(diǎn)叫做該隊(duì)列的鏡像節(jié)點(diǎn)。
  • 一個(gè)隊(duì)列的主節(jié)點(diǎn)可能是另一個(gè)隊(duì)列的鏡像節(jié)點(diǎn)
  • 所有操作都是主節(jié)點(diǎn)完成,然后同步給鏡像節(jié)點(diǎn)
  • 主宕機(jī)后,鏡像節(jié)點(diǎn)會(huì)替代成新的主

結(jié)構(gòu)如圖:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

4.3.2.部署

參考課前資料:《RabbitMQ部署指南.md》
鏡像模式

在剛剛的案例中,一旦創(chuàng)建隊(duì)列的主機(jī)宕機(jī),隊(duì)列就會(huì)不可用。不具備高可用能力。如果要解決這個(gè)問(wèn)題,必須使用官方提供的鏡像集群方案。

官方文檔地址:https://www.rabbitmq.com/ha.html

1.鏡像模式的特征

默認(rèn)情況下,隊(duì)列只保存在創(chuàng)建該隊(duì)列的節(jié)點(diǎn)上。而鏡像模式下,創(chuàng)建隊(duì)列的節(jié)點(diǎn)被稱為該隊(duì)列的主節(jié)點(diǎn),隊(duì)列還會(huì)拷貝到集群中的其它節(jié)點(diǎn),也叫做該隊(duì)列的鏡像節(jié)點(diǎn)。

但是,不同隊(duì)列可以在集群中的任意節(jié)點(diǎn)上創(chuàng)建,因此不同隊(duì)列的主節(jié)點(diǎn)可以不同。甚至,一個(gè)隊(duì)列的主節(jié)點(diǎn)可能是另一個(gè)隊(duì)列的鏡像節(jié)點(diǎn)。

用戶發(fā)送給隊(duì)列的一切請(qǐng)求,例如發(fā)送消息、消息回執(zhí)默認(rèn)都會(huì)在主節(jié)點(diǎn)完成,如果是從節(jié)點(diǎn)接收到請(qǐng)求,也會(huì)路由到主節(jié)點(diǎn)去完成。鏡像節(jié)點(diǎn)僅僅起到備份數(shù)據(jù)作用。

當(dāng)主節(jié)點(diǎn)接收到消費(fèi)者的ACK時(shí),所有鏡像都會(huì)刪除節(jié)點(diǎn)中的數(shù)據(jù)。

總結(jié)如下:

  • 鏡像隊(duì)列結(jié)構(gòu)是一主多從(從就是鏡像)
  • 所有操作都是主節(jié)點(diǎn)完成,然后同步給鏡像節(jié)點(diǎn)
  • 主宕機(jī)后,鏡像節(jié)點(diǎn)會(huì)替代成新的主(如果在主從同步完成前,主就已經(jīng)宕機(jī),可能出現(xiàn)數(shù)據(jù)丟失)
  • 不具備負(fù)載均衡功能,因?yàn)樗胁僮鞫紩?huì)有主節(jié)點(diǎn)完成(但是不同隊(duì)列,其主節(jié)點(diǎn)可以不同,可以利用這個(gè)提高吞吐量)
2.鏡像模式的配置

鏡像模式的配置有3種模式:

ha-mode ha-params 效果
準(zhǔn)確模式exactly 隊(duì)列的副本量count 集群中隊(duì)列副本(主服務(wù)器和鏡像服務(wù)器之和)的數(shù)量。count如果為1意味著單個(gè)副本:即隊(duì)列主節(jié)點(diǎn)。count值為2表示2個(gè)副本:1個(gè)隊(duì)列主和1個(gè)隊(duì)列鏡像。換句話說(shuō):count = 鏡像數(shù)量 + 1。如果群集中的節(jié)點(diǎn)數(shù)少于count,則該隊(duì)列將鏡像到所有節(jié)點(diǎn)。如果有集群總數(shù)大于count+1,并且包含鏡像的節(jié)點(diǎn)出現(xiàn)故障,則將在另一個(gè)節(jié)點(diǎn)上創(chuàng)建一個(gè)新的鏡像。
all (none) 隊(duì)列在群集中的所有節(jié)點(diǎn)之間進(jìn)行鏡像。隊(duì)列將鏡像到任何新加入的節(jié)點(diǎn)。鏡像到所有節(jié)點(diǎn)將對(duì)所有群集節(jié)點(diǎn)施加額外的壓力,包括網(wǎng)絡(luò)I / O,磁盤(pán)I / O和磁盤(pán)空間使用情況。推薦使用exactly,設(shè)置副本數(shù)為(N / 2 +1)。
nodes node names 指定隊(duì)列創(chuàng)建到哪些節(jié)點(diǎn),如果指定的節(jié)點(diǎn)全部不存在,則會(huì)出現(xiàn)異常。如果指定的節(jié)點(diǎn)在集群中存在,但是暫時(shí)不可用,會(huì)創(chuàng)建節(jié)點(diǎn)到當(dāng)前客戶端連接到的節(jié)點(diǎn)。

這里我們以rabbitmqctl命令作為案例來(lái)講解配置語(yǔ)法。

語(yǔ)法示例:

3.exactly模式
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
  • rabbitmqctl set_policy:固定寫(xiě)法
  • ha-two:策略名稱,自定義
  • "^two\.":匹配隊(duì)列的正則表達(dá)式,符合命名規(guī)則的隊(duì)列才生效,這里是任何以two.開(kāi)頭的隊(duì)列名稱
  • '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}': 策略內(nèi)容
    • "ha-mode":"exactly":策略模式,此處是exactly模式,指定副本數(shù)量
    • "ha-params":2:策略參數(shù),這里是2,就是副本數(shù)量為2,1主1鏡像
    • "ha-sync-mode":"automatic":同步策略,默認(rèn)是manual,即新加入的鏡像節(jié)點(diǎn)不會(huì)同步舊的消息。如果設(shè)置為automatic,則新加入的鏡像節(jié)點(diǎn)會(huì)把主節(jié)點(diǎn)中所有消息都同步,會(huì)帶來(lái)額外的網(wǎng)絡(luò)開(kāi)銷
4.模式
rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
  • ha-all:策略名稱,自定義
  • "^all\.":匹配所有以all.開(kāi)頭的隊(duì)列名
  • '{"ha-mode":"all"}':策略內(nèi)容
    • "ha-mode":"all":策略模式,此處是all模式,即所有節(jié)點(diǎn)都會(huì)稱為鏡像節(jié)點(diǎn)
5.nodes模式
rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
  • rabbitmqctl set_policy:固定寫(xiě)法
  • ha-nodes:策略名稱,自定義
  • "^nodes\.":匹配隊(duì)列的正則表達(dá)式,符合命名規(guī)則的隊(duì)列才生效,這里是任何以nodes.開(kāi)頭的隊(duì)列名稱
  • '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}': 策略內(nèi)容
    • "ha-mode":"nodes":策略模式,此處是nodes模式
    • "ha-params":["rabbit@mq1", "rabbit@mq2"]:策略參數(shù),這里指定副本所在節(jié)點(diǎn)名稱
6.測(cè)試

我們使用exactly模式的鏡像,因?yàn)榧汗?jié)點(diǎn)數(shù)量為3,因此鏡像數(shù)量就設(shè)置為2.

運(yùn)行下面的命令:

docker exec -it mq1 rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

下面,我們創(chuàng)建一個(gè)新的隊(duì)列:

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

在任意一個(gè)mq控制臺(tái)查看隊(duì)列:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

7.測(cè)試數(shù)據(jù)共享

給two.queue發(fā)送一條消息:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

然后在mq1、mq2、mq3的任意控制臺(tái)查看消息:

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

8.測(cè)試高可用

現(xiàn)在,我們讓two.queue的主節(jié)點(diǎn)mq1宕機(jī):

docker stop mq1

查看集群狀態(tài):
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

查看隊(duì)列狀態(tài):
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

發(fā)現(xiàn)依然是健康的!并且其主節(jié)點(diǎn)切換到了rabbit@mq2上

5.仲裁隊(duì)列

從RabbitMQ 3.8版本開(kāi)始,引入了新的仲裁隊(duì)列,他具備與鏡像隊(duì)里類似的功能,但使用更加方便。

4.4.仲裁隊(duì)列

4.4.1.集群特征

仲裁隊(duì)列:仲裁隊(duì)列是3.8版本以后才有的新功能,用來(lái)替代鏡像隊(duì)列,具備下列特征:

  • 與鏡像隊(duì)列一樣,都是主從模式,支持主從數(shù)據(jù)同步
  • 使用非常簡(jiǎn)單,沒(méi)有復(fù)雜的配置
  • 主從同步基于Raft協(xié)議,強(qiáng)一致
4.4.2.部署

參考課前資料:《RabbitMQ部署指南.md》

仲裁隊(duì)列
從RabbitMQ 3.8版本開(kāi)始,引入了新的仲裁隊(duì)列,他具備與鏡像隊(duì)里類似的功能,但使用更加方便。

1.添加仲裁隊(duì)列

在任意控制臺(tái)添加一個(gè)隊(duì)列,一定要選擇隊(duì)列類型為Quorum類型。
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
在任意控制臺(tái)查看隊(duì)列:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

可以看到,仲裁隊(duì)列的 + 2字樣。代表這個(gè)隊(duì)列有2個(gè)鏡像節(jié)點(diǎn)。

因?yàn)橹俨藐?duì)列默認(rèn)的鏡像數(shù)為5。如果你的集群有7個(gè)節(jié)點(diǎn),那么鏡像數(shù)肯定是5;而我們集群只有3個(gè)節(jié)點(diǎn),因此鏡像數(shù)量就是3.

2.測(cè)試

可以參考對(duì)鏡像集群的測(cè)試,效果是一樣的。

3.集群擴(kuò)容
4.加入集群

1)啟動(dòng)一個(gè)新的MQ容器:

docker run -d --net mq-net \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq4 \
--hostname mq5 \
-p 8074:15672 \
-p 8084:15672 \
rabbitmq:3.8-management

2)進(jìn)入容器控制臺(tái):

docker exec -it mq4 bash

3)停止mq進(jìn)程

rabbitmqctl stop_app

4)重置RabbitMQ中的數(shù)據(jù):

rabbitmqctl reset

5)加入mq1:

rabbitmqctl join_cluster rabbit@mq1

6)再次啟動(dòng)mq進(jìn)程

rabbitmqctl start_app

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

5.增加仲裁隊(duì)列副本

我們先查看下quorum.queue這個(gè)隊(duì)列目前的副本情況,進(jìn)入mq1容器:

docker exec -it mq1 bash

執(zhí)行命令:

rabbitmq-queues quorum_status "quorum.queue"

結(jié)果:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
現(xiàn)在,我們讓mq4也加入進(jìn)來(lái):

rabbitmq-queues add_member "quorum.queue" "rabbit@mq4"

結(jié)果:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

再次查看:

rabbitmq-queues quorum_status "quorum.queue"

(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式
查看控制臺(tái),發(fā)現(xiàn)quorum.queue的鏡像數(shù)量也從原來(lái)的 +2 變成了 +3:
(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,SpringCloud,java,spring cloud,分布式

4.4.3.Java代碼創(chuàng)建仲裁隊(duì)列
@Bean
public Queue quorumQueue() {
    return QueueBuilder
        .durable("quorum.queue") // 持久化
        .quorum() // 仲裁隊(duì)列
        .build();
}
4.4.4.SpringAMQP連接MQ集群

注意,這里用address來(lái)代替host、port方式文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-841411.html

spring:
  rabbitmq:
    addresses: 192.168.150.105:8071, 192.168.150.105:8072, 192.168.150.105:8073
    username: itcast
    password: 123321
    virtual-host: /

到了這里,關(guān)于(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 黑馬Redis視頻教程高級(jí)篇(安裝OpenResty)

    黑馬Redis視頻教程高級(jí)篇(安裝OpenResty)

    目錄 一、安裝 1.1、安裝開(kāi)發(fā)庫(kù) 1.2、安裝OpenResty倉(cāng)庫(kù) 1.3、安裝OpenResty 1.4、安裝opm工具 1.5、目錄結(jié)構(gòu) 1.6、配置nginx的環(huán)境變量 二、啟動(dòng)和運(yùn)行 三、備注 首先你的Linux虛擬機(jī)必須聯(lián)網(wǎng)。 首先要安裝OpenResty的依賴開(kāi)發(fā)庫(kù),執(zhí)行命令: 你可以在你的 CentOS 系統(tǒng)中添加 openresty 倉(cāng)庫(kù)

    2024年02月08日
    瀏覽(23)
  • 黑馬Redis視頻教程高級(jí)篇(二:多級(jí)緩存)

    黑馬Redis視頻教程高級(jí)篇(二:多級(jí)緩存)

    目錄 一、什么是多級(jí)緩存? 二、JVM進(jìn)程緩存 2.1、導(dǎo)入案例 2.2、初識(shí)Caffeine 2.3、實(shí)現(xiàn)JVM進(jìn)程緩存 2.3.1、需求 2.3.2、實(shí)現(xiàn) 三、Lua語(yǔ)法入門(mén) 3.1、初識(shí)Lua 3.2、HelloWord 3.3、變量和循環(huán) 3.3.1、Lua的數(shù)據(jù)類型 3.3.2、聲明變量 3.3.3、循環(huán) 3.4、條件控制、函數(shù) 3.4.1、函數(shù) 3.4.2、條件控制

    2024年02月09日
    瀏覽(20)
  • 黑馬Redis視頻教程高級(jí)篇(一:分布式緩存)

    黑馬Redis視頻教程高級(jí)篇(一:分布式緩存)

    目錄 分布式緩存 一、Redis持久化 1.1、RDB持久化 1.1.1、執(zhí)行時(shí)機(jī) 1.1.2、RDB原理 1.1.3、小結(jié) 1.2、AOF持久化 1.2.1、AOF原理 1.2.2、AOF配置 1.2.3、AOF文件重寫(xiě) 1.3、RDB與AOF對(duì)比 二、Redis主從 2.1、搭建主從架構(gòu) 2.1.1、集群結(jié)構(gòu) 2.1.2、準(zhǔn)備實(shí)例和配置 2.1.3、啟動(dòng) 2.1.4、開(kāi)啟主從關(guān)系 2.1.5、

    2024年02月08日
    瀏覽(19)
  • SpringCloud 整合 Canal+RabbitMQ+Redis 實(shí)現(xiàn)數(shù)據(jù)監(jiān)聽(tīng)

    SpringCloud 整合 Canal+RabbitMQ+Redis 實(shí)現(xiàn)數(shù)據(jù)監(jiān)聽(tīng)

    Canal 指的是阿里巴巴開(kāi)源的數(shù)據(jù)同步工具,用于數(shù)據(jù)庫(kù)的實(shí)時(shí)增量數(shù)據(jù)訂閱和消費(fèi)。它可以針對(duì) MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的異構(gòu)數(shù)據(jù)同步等情況進(jìn)行實(shí)時(shí)增量數(shù)據(jù)同步。 當(dāng)前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x Canal是如何同步數(shù)據(jù)庫(kù)

    2024年02月03日
    瀏覽(19)
  • 【Java筆記+踩坑匯總】Java基礎(chǔ)+進(jìn)階+JavaWeb+SSM+SpringBoot+瑞吉外賣+SpringCloud+黑馬旅游+谷粒商城+學(xué)成在線+MySQL高級(jí)篇+設(shè)計(jì)模式+常見(jiàn)面試題+源碼

    本文是“Java學(xué)習(xí)路線”專欄的導(dǎo)航文章,目標(biāo)是為Java工程師提供一套 完整的Java學(xué)習(xí)路線 。 目錄 0.摘要/資料/代碼整理 1.Java基礎(chǔ)+進(jìn)階 2.MySQL,JavaWeb,Mybatis,前端 3.Git 4.SSM(Spring,SpringMVC,Mybatis)框架 5.Maven高級(jí) 6.Springboot,MybatisPlus,JPA框架 7.瑞吉外賣、Redis、Nginx、Linux、mysql主從復(fù)制

    2024年02月06日
    瀏覽(51)
  • 【Java筆記+踩坑匯總】Java基礎(chǔ)+進(jìn)階+JavaWeb+SSM+SpringBoot+瑞吉外賣+SpringCloud+黑馬旅游+谷粒商城+學(xué)成在線+MySQL高級(jí)篇+設(shè)計(jì)模式+面試題匯總+源碼

    本文是“Java學(xué)習(xí)路線”專欄的導(dǎo)航文章,目標(biāo)是為Java工程師提供一套 完整的Java學(xué)習(xí)路線 。 目錄 0.摘要/資料/代碼整理 1.Java基礎(chǔ)+進(jìn)階 2.MySQL,JavaWeb,Mybatis,前端 3.Git 4.SSM(Spring,SpringMVC,Mybatis)框架 5.Maven高級(jí) 6.Springboot,MybatisPlus,JPA框架 7.瑞吉外賣、Redis、Nginx、Linux、mysql主從復(fù)制

    2024年02月16日
    瀏覽(1339)
  • Docker高級(jí)——Docker部署RabbitMQ(單機(jī),集群,仲裁隊(duì)列)

    Docker高級(jí)——Docker部署RabbitMQ(單機(jī),集群,仲裁隊(duì)列)

    我們?cè)贑entos7虛擬機(jī)中使用Docker來(lái)安裝。 方式一:在線拉取 方式二:從本地加載 在課前資料已經(jīng)提供了鏡像包: 上傳到虛擬機(jī)中后,使用命令加載鏡像即可: 執(zhí)行下面的命令來(lái)運(yùn)行MQ容器: 接下來(lái),我們看看如何安裝RabbitMQ的集群。 在RabbitMQ的官方文檔中,講述了兩種集群

    2024年02月16日
    瀏覽(23)
  • Ubuntu 20.04上docker安裝RabbitMQ并確保可以訪問(wèn)RabbitMQ的管理界面

    這將下載RabbitMQ 3.x版本的Docker鏡像并在后臺(tái)運(yùn)行一個(gè)容器。 -p 5672:5672 映射了RabbitMQ的AMQP端口, -p 15672:15672 映射了管理界面的端口 如果狀態(tài)是\\\"Up\\\",則容器已經(jīng)成功啟動(dòng)。 默認(rèn)的用戶名和密碼是: 用戶名: guest 密碼: guest 請(qǐng)確保您的防火墻允許通過(guò) 15672 端口訪問(wèn)。如果您的

    2024年02月09日
    瀏覽(21)
  • 15年大牛用140多個(gè)實(shí)戰(zhàn)案例深入講解Java微服務(wù)架構(gòu)實(shí)戰(zhàn):SpringBoot +SpringCloud +Docker +RabbitMQ

    15年大牛用140多個(gè)實(shí)戰(zhàn)案例深入講解Java微服務(wù)架構(gòu)實(shí)戰(zhàn):SpringBoot +SpringCloud +Docker +RabbitMQ

    第一部分,springboot篇; 第1章SpringBoot編程起步; 1.SpringBoot提倡的是一種簡(jiǎn)潔的開(kāi)發(fā)模式,可保證用戶不被大量的配置文件和依賴關(guān)系所困擾。 2.SpringBoot開(kāi)發(fā)需要Maven或 Gradle構(gòu)建工具支持。 3.SpringBoot使用一系列的注解來(lái)簡(jiǎn)化開(kāi)發(fā)過(guò)程。 第2章SpringBoot程序開(kāi)發(fā); 1. SpringBoot的依賴

    2024年04月09日
    瀏覽(16)
  • Ubuntu 20.04上docker安裝Redis

    這將從Docker Hub上下載Redis官方鏡像。 這將在后臺(tái)運(yùn)行Redis容器,并將容器內(nèi)的6379端口映射到主機(jī)的6379端口。您可以將my-redis替換為您自己的容器名稱。 您應(yīng)該能夠看到Redis容器的信息。 請(qǐng)注意,上述示例中的容器名稱是my-redis,您可以根據(jù)需要替換為其他名稱。

    2024年02月09日
    瀏覽(29)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包