[此文檔是在心向陽(yáng)光的天域的博客加了一些有助于自己的知識(shí)體系,也歡迎大家關(guān)注這個(gè)大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html) |
[是這個(gè)視頻](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5&spm_id_from=pageDriver&vd_source=9beb0a2f0cec6f01c2433a881b54152c) |
= = = = = = = = = = = = = = = 微服務(wù)技術(shù)——可靠性消息服務(wù) = = = = = = = = = = = = = = =
今日目標(biāo)
服務(wù)異步通信-高級(jí)篇
消息隊(duì)列在使用過(guò)程中,面臨著很多實(shí)際問(wèn)題需要思考:
1.消息可靠性
消息從發(fā)送,到消費(fèi)者接收,會(huì)經(jīng)理多個(gè)過(guò)程:
其中的每一步都可能導(dǎo)致消息丟失,常見(jiàn)的丟失原因包括:
- 發(fā)送時(shí)丟失:
- 生產(chǎn)者發(fā)送的消息未送達(dá)exchange
- 消息到達(dá)exchange后未到達(dá)queue
- MQ宕機(jī),queue將消息丟失
- consumer接收到消息后未消費(fèi)就宕機(jī)
針對(duì)這些問(wèn)題,RabbitMQ分別給出了解決方案:
- 生產(chǎn)者確認(rèn)機(jī)制
- mq持久化
- 消費(fèi)者確認(rèn)機(jī)制
- 失敗重試機(jī)制
下面我們就通過(guò)案例來(lái)演示每一個(gè)步驟。
首先,導(dǎo)入課前資料提供的demo工程:
項(xiàng)目結(jié)構(gòu)如下:
用docker啟動(dòng)即可
docker start mq
要?jiǎng)?chuàng)建一個(gè)隊(duì)列起名simple.queue
然后在交換機(jī)中把a(bǔ)mq.topic交換機(jī),和上面創(chuàng)建的隊(duì)列simple.queue綁定,我們手動(dòng)配置
進(jìn)入amq.topic交換機(jī)后,綁定隊(duì)列
綁定后如圖:
1.1.生產(chǎn)者消息確認(rèn)
RabbitMQ提供了publisher confirm機(jī)制來(lái)避免消息發(fā)送到MQ過(guò)程中丟失。這種機(jī)制必須給每個(gè)消息指定一個(gè)唯一ID。消息發(fā)送到MQ以后,會(huì)返回一個(gè)結(jié)果給發(fā)送者,表示消息是否處理成功。
返回結(jié)果有兩種方式:
- publisher-confirm,發(fā)送者確認(rèn)
- 消息成功投遞到交換機(jī),返回ack
- 消息未投遞到交換機(jī),返回nack
- publisher-return,發(fā)送者回執(zhí)
- 消息投遞到交換機(jī)了,但是沒(méi)有路由到隊(duì)列。返回ACK,及路由失敗原因。
注意:
1.1.1.修改配置
首先,修改publisher服務(wù)中的application.yml文件,添加下面的內(nèi)容:
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
說(shuō)明:
-
publish-confirm-type
:開(kāi)啟publisher-confirm,這里支持兩種類型:-
simple
:同步等待confirm結(jié)果,直到超時(shí) -
correlated
?:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallback
-
-
publish-returns
:開(kāi)啟publish-return功能,同樣是基于callback機(jī)制,不過(guò)是定義ReturnCallback -
template.mandatory
:定義消息路由失敗時(shí)的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息
1.1.2.定義Return回調(diào)
每個(gè)RabbitTemplate只能配置一個(gè)ReturnCallback,因此需要在項(xiàng)目加載時(shí)配置:
修改publisher服務(wù),添加一個(gè):
package cn.itcast.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 獲取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 設(shè)置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 投遞失敗,記錄日志
log.info("消息發(fā)送失敗,應(yīng)答碼{},原因{},交換機(jī){},路由鍵{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有業(yè)務(wù)需要,可以重發(fā)消息
});
}
}
1.1.3.定義ConfirmCallback
ConfirmCallback可以在發(fā)送消息時(shí)指定,因?yàn)槊總€(gè)業(yè)務(wù)處理confirm成功或失敗的邏輯不一定相同。
在publisher服務(wù)的cn.itcast.mq.spring.SpringAmqpTest類中,定義一個(gè)單元測(cè)試方法:
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.消息體
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(
result -> {
if(result.isAck()){
// 3.1.ack,消息成功
log.debug("消息發(fā)送成功, ID:{}", correlationData.getId());
}else{
// 3.2.nack,消息失敗
log.error("消息發(fā)送失敗, ID:{}, 原因{}",correlationData.getId(), result.getReason());
}
},
ex -> log.error("消息發(fā)送異常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
// 4.發(fā)送消息
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);
// 休眠一會(huì)兒,等待ack回執(zhí)
Thread.sleep(2000);
}
全部配置完后,運(yùn)行測(cè)試類SpringAmqpTest.java,這說(shuō)明消息發(fā)送成功
然后呢,我們來(lái)一個(gè)消息發(fā)送失敗的情況,我們故意填錯(cuò)交換機(jī)的名字
調(diào)用后,后臺(tái)打印日志如下:
然后我們嘗試填錯(cuò),routingKey看一下
報(bào)錯(cuò)信息如下:
之后我們恢復(fù)代碼,都保證正確即可
總結(jié):
SpringAMQP中處理消息確認(rèn)的幾種情況:
● publisher-comfirm:
- 消息成功發(fā)送到exchange,返回ack
- 消息發(fā)送失敗,沒(méi)有到達(dá)交換機(jī),返回nack
- 消息發(fā)送過(guò)程中出現(xiàn)異常,沒(méi)有收到回執(zhí)
● 消息成功發(fā)送到exchange, 但沒(méi)有路由到queue,
- 調(diào)用ReturnCallback
1.2.消息持久化
生產(chǎn)者確認(rèn)可以確保消息投遞到RabbitMQ的隊(duì)列中,但是消息發(fā)送到RabbitMQ以后,如果突然宕機(jī),也可能導(dǎo)致消息丟失。
要想確保消息在RabbitMQ中安全保存,必須開(kāi)啟消息持久化機(jī)制。
- 交換機(jī)持久化
- 隊(duì)列持久化
- 消息持久化
1.2.1.交換機(jī)持久化
RabbitMQ中交換機(jī)默認(rèn)是非持久化的,mq重啟后就丟失。
我們通過(guò)命令
重啟mq
docker restart mq
然后查看隊(duì)列、交換機(jī)的情況,比如我們創(chuàng)建的是持久化隊(duì)列
SpringAMQP中可以通過(guò)代碼指定交換機(jī)持久化:
@Bean
public DirectExchange simpleExchange(){
// 三個(gè)參數(shù):交換機(jī)名稱、是否持久化、當(dāng)沒(méi)有queue與其綁定時(shí)是否自動(dòng)刪除
return new DirectExchange("simple.direct", true, false);
}
事實(shí)上,默認(rèn)情況下,由SpringAMQP聲明的交換機(jī)都是持久化的。
可以在RabbitMQ控制臺(tái)看到持久化的交換機(jī)都會(huì)帶上D
的標(biāo)示:
1.2.2.隊(duì)列持久化
RabbitMQ中隊(duì)列默認(rèn)是非持久化的,mq重啟后就丟失。
SpringAMQP中可以通過(guò)代碼指定交換機(jī)持久化:
我們可以先去mq圖形化界面把simple.queue刪除
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder構(gòu)建隊(duì)列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
事實(shí)上,默認(rèn)情況下,由SpringAMQP聲明的隊(duì)列都是持久化的。
可以在RabbitMQ控制臺(tái)看到持久化的隊(duì)列都會(huì)帶上D
的標(biāo)示:
這些做完后,我們啟動(dòng)ConsumerApplication.java,然后查看mq的圖形化界面
交換機(jī)是持久的
隊(duì)列是持久的
1.2.3.消息持久化
首先把consumer服務(wù)停了,不要消費(fèi)我們的消息
我們?cè)趍q的圖形化界面,點(diǎn)擊simple.queue隊(duì)列,然后編輯消息,點(diǎn)擊發(fā)送
查看有1條消息
然后我們重啟docker中的mq
docker restart mq
然后再回來(lái)看mq的圖形化界面,發(fā)現(xiàn)隊(duì)列還在,但是消息沒(méi)了
利用SpringAMQP發(fā)送消息時(shí),可以設(shè)置消息的屬性(MessageProperties),指定delivery-mode:
- 1:非持久化
- 2:持久化
用java代碼指定:
默認(rèn)情況下,SpringAMQP發(fā)出的任何消息都是持久化的,不用特意指定。
運(yùn)行測(cè)試類SpringAmqpTest.java之后,查看mq的圖形化界面
查看一下具體消息
然后我們重啟一下docker的mq容器
docker restart mq
注意:AMQP中創(chuàng)建的交換機(jī)、隊(duì)列、消息默認(rèn)都是持久的
交換機(jī):
隊(duì)列:
消息:
1.3.消費(fèi)者消息確認(rèn)
RabbitMQ是閱后即焚機(jī)制,RabbitMQ確認(rèn)消息被消費(fèi)者消費(fèi)后會(huì)立刻刪除。
而RabbitMQ是通過(guò)消費(fèi)者回執(zhí)來(lái)確認(rèn)消費(fèi)者是否成功處理消息的:消費(fèi)者獲取消息后,應(yīng)該向RabbitMQ發(fā)送ACK回執(zhí),表明自己已經(jīng)處理消息。
設(shè)想這樣的場(chǎng)景:
- 1)RabbitMQ投遞消息給消費(fèi)者
- 2)消費(fèi)者獲取消息后,返回ACK給RabbitMQ
- 3)RabbitMQ刪除消息
- 4)消費(fèi)者宕機(jī),消息尚未處理
這樣,消息就丟失了。因此消費(fèi)者返回ACK的時(shí)機(jī)非常重要。
而SpringAMQP則允許配置三種確認(rèn)模式:
- manual:手動(dòng)ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。
- auto?:自動(dòng)ack,由spring監(jiān)測(cè)listener代碼是否出現(xiàn)異常,沒(méi)有異常則返回ack;拋出異常則返回nack。
- none:關(guān)閉ack,MQ假定消費(fèi)者獲取消息后會(huì)成功處理,因此消息投遞后立即被刪除
由此可知:
- none模式下,消息投遞是不可靠的,可能丟失
- auto模式類似事務(wù)機(jī)制,出現(xiàn)異常時(shí)返回nack,消息回滾到mq;沒(méi)有異常,返回ack
- manual:自己根據(jù)業(yè)務(wù)情況,判斷什么時(shí)候該ack
一般,我們都是使用默認(rèn)的auto即可。
1.3.1.演示none模式
修改consumer服務(wù)的application.yml文件,添加下面內(nèi)容:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 關(guān)閉ack
修改consumer服務(wù)的SpringRabbitListener類中的方法,模擬一個(gè)消息處理異常:
修改SpringRabbitListener.java
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
log.info("消費(fèi)者接收到simple.queue的消息:【{}】", msg);
// 模擬異常
System.out.println(1 / 0);
log.debug("消息處理完成!");
}
測(cè)試可以發(fā)現(xiàn),當(dāng)消息處理拋異常時(shí),消息依然被RabbitMQ刪除了。
dubug啟動(dòng)Consumer
發(fā)現(xiàn)消息還沒(méi)接收呢,直接就沒(méi)了
也就是說(shuō),消費(fèi)者雖然接收到了消息,但是假如消費(fèi)者還沒(méi)有讀取,發(fā)生了報(bào)錯(cuò)或者宕機(jī),這個(gè)消息就會(huì)丟失
1.3.2.演示auto模式
再次把確認(rèn)機(jī)制修改為auto:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 關(guān)閉ack
我們?nèi)q的圖形化界面創(chuàng)建消息
發(fā)送后,我們看到圖形化界面中有1條消息
IDEA后臺(tái)因?yàn)槲覀冋J(rèn)為寫(xiě)了1/0的錯(cuò)誤算數(shù)運(yùn)算,導(dǎo)致IDEA不停重發(fā)請(qǐng)求重試消息的推送,這顯然也不符合我們的要求
在異常位置打斷點(diǎn),再次發(fā)送消息,程序卡在斷點(diǎn)時(shí),可以發(fā)現(xiàn)此時(shí)消息狀態(tài)為unack(未確定狀態(tài)):
拋出異常后,因?yàn)镾pring會(huì)自動(dòng)返回nack,所以消息恢復(fù)至Ready狀態(tài),并且沒(méi)有被RabbitMQ刪除:
1.4.消費(fèi)失敗重試機(jī)制
當(dāng)消費(fèi)者出現(xiàn)異常后,消息會(huì)不斷requeue(重入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者,然后再次異常,再次requeue,無(wú)限循環(huán),導(dǎo)致mq的消息處理飆升,帶來(lái)不必要的壓力:
怎么辦呢?
1.4.1.本地重試
我們可以利用Spring的retry機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無(wú)限制的requeue到mq隊(duì)列。
修改consumer服務(wù)的application.yml文件,添加內(nèi)容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 開(kāi)啟消費(fèi)者失敗重試
initial-interval: 1000 # 初始的失敗等待時(shí)長(zhǎng)為1秒
multiplier: 1 # 失敗的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = multiplier * last-interval
max-attempts: 4 # 最大重試次數(shù)
stateless: true # true無(wú)狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
修改SpringRabbitListener.java
修改為日志打印的形式
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
log.debug("消費(fèi)者接收到simple.queue的消息:【" + msg + "】");
System.out.println(1 / 0);
log.info("消費(fèi)者處理消息成功!");
}
重啟consumer服務(wù),重復(fù)之前的測(cè)試。可以發(fā)現(xiàn):
- 在重試4次后,SpringAMQP會(huì)拋出異常
AmqpRejectAndDontRequeueException,說(shuō)明本地重試觸發(fā)了
- 查看RabbitMQ控制臺(tái),發(fā)現(xiàn)消息被刪除了,說(shuō)明最后SpringAMQP返回的是ack,mq刪除消息了
結(jié)論:
- 開(kāi)啟本地重試時(shí),消息處理過(guò)程中拋出異常,不會(huì)requeue到隊(duì)列,而是在消費(fèi)者本地重試
- 重試達(dá)到最大次數(shù)后,Spring會(huì)返回ack,消息會(huì)被丟棄
1.4.2.失敗策略
在之前的測(cè)試中,達(dá)到最大重試次數(shù)后,消息會(huì)被丟棄,這是由Spring內(nèi)部機(jī)制決定的。
在開(kāi)啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有MessageRecovery接口來(lái)處理,它包含三種不同的實(shí)現(xiàn):
-
RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式
-
ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)
-
RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)?
比較優(yōu)雅的一種處理方案是RepublishMessageRecoverer,失敗后將消息投遞到一個(gè)指定的,專門(mén)存放異常消息的隊(duì)列,后續(xù)由人工集中處理。
1)在consumer服務(wù)中定義處理失敗消息的交換機(jī)和隊(duì)列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
2)定義一個(gè)RepublishMessageRecoverer,關(guān)聯(lián)隊(duì)列和交換機(jī)
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
完整代碼:
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
以上配置完之后,我們?cè)僦貜?fù)步驟發(fā)送消息
發(fā)送后我們看到失敗交換機(jī)有了
隊(duì)列也有了
看一下IDEA的后臺(tái)
看一下error.queue中的消息,很清晰把錯(cuò)誤棧都輸出了
1.5.總結(jié)
如何確保RabbitMQ消息的可靠性?
- 開(kāi)啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊(duì)列
- 開(kāi)啟持久化功能,確保消息未消費(fèi)前在隊(duì)列中不會(huì)丟失
- 開(kāi)啟消費(fèi)者確認(rèn)機(jī)制為auto,由spring確認(rèn)消息處理成功后完成ack
- 開(kāi)啟消費(fèi)者失敗重試機(jī)制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理
2.死信交換機(jī)
2.1.初識(shí)死信交換機(jī)
2.1.1.什么是死信交換機(jī)
什么是死信?
當(dāng)一個(gè)隊(duì)列中的消息滿足下列情況之一時(shí),可以成為死信(dead letter):
- 消費(fèi)者使用basic.reject或 basic.nack聲明消費(fèi)失敗,并且消息的requeue參數(shù)設(shè)置為false
- 消息是一個(gè)過(guò)期消息,超時(shí)無(wú)人消費(fèi)
- 要投遞的隊(duì)列消息滿了,無(wú)法投遞
如果這個(gè)包含死信的隊(duì)列配置了dead-letter-exchange
屬性,指定了一個(gè)交換機(jī),那么隊(duì)列中的死信就會(huì)投遞到這個(gè)交換機(jī)中,而這個(gè)交換機(jī)稱為死信交換機(jī)(Dead Letter Exchange,檢查DLX)。
如圖,一個(gè)消息被消費(fèi)者拒絕了,變成了死信:
因?yàn)閟imple.queue綁定了死信交換機(jī) dl.direct,因此死信會(huì)投遞給這個(gè)交換機(jī):
如果這個(gè)死信交換機(jī)也綁定了一個(gè)隊(duì)列,則消息最終會(huì)進(jìn)入這個(gè)存放死信的隊(duì)列:
另外,隊(duì)列將死信投遞給死信交換機(jī)時(shí),必須知道兩個(gè)信息:
- 死信交換機(jī)名稱
- 死信交換機(jī)與死信隊(duì)列綁定的RoutingKey
這樣才能確保投遞的消息能到達(dá)死信交換機(jī),并且正確的路由到死信隊(duì)列。
2.1.2.利用死信交換機(jī)接收死信(拓展)
在失敗重試策略中,默認(rèn)的RejectAndDontRequeueRecoverer會(huì)在本地重試次數(shù)耗盡后,發(fā)送reject給RabbitMQ,消息變成死信,被丟棄。
我們可以給simple.queue添加一個(gè)死信交換機(jī),給死信交換機(jī)綁定一個(gè)隊(duì)列。這樣消息變成死信后也不會(huì)丟棄,而是最終投遞到死信交換機(jī),路由到與死信交換機(jī)綁定的隊(duì)列。
我們?cè)赾onsumer服務(wù)中,定義一組死信交換機(jī)、死信隊(duì)列:
// 聲明普通的 simple.queue隊(duì)列,并且為其指定死信交換機(jī):dl.direct
@Bean
public Queue simpleQueue2(){
return QueueBuilder.durable("simple.queue") // 指定隊(duì)列名稱,并持久化
.deadLetterExchange("dl.direct") // 指定死信交換機(jī)
.build();
}
// 聲明死信交換機(jī) dl.direct
@Bean
public DirectExchange dlExchange(){
return new DirectExchange("dl.direct", true, false);
}
// 聲明存儲(chǔ)死信的隊(duì)列 dl.queue
@Bean
public Queue dlQueue(){
return new Queue("dl.queue", true);
}
// 將死信隊(duì)列 與 死信交換機(jī)綁定
@Bean
public Binding dlBinding(){
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}
2.1.3.總結(jié)
什么樣的消息會(huì)成為死信?
- 消息被消費(fèi)者reject或者返回nack
- 消息超時(shí)未消費(fèi)
- 隊(duì)列滿了
死信交換機(jī)的使用場(chǎng)景是什么?
- 如果隊(duì)列綁定了死信交換機(jī),死信會(huì)投遞到死信交換機(jī);
- 可以利用死信交換機(jī)收集所有消費(fèi)者處理失敗的消息(死信),交由人工處理,進(jìn)一步提高消息隊(duì)列的可靠性。
2.2.TTL
一個(gè)隊(duì)列中的消息如果超時(shí)未消費(fèi),則會(huì)變?yōu)樗佬?,超時(shí)分為兩種情況:
- 消息所在的隊(duì)列設(shè)置了超時(shí)時(shí)間
- 消息本身設(shè)置了超時(shí)時(shí)間
2.2.1.接收超時(shí)死信的死信交換機(jī)
在consumer服務(wù)的SpringRabbitListener中,定義一個(gè)新的消費(fèi)者,并且聲明 死信交換機(jī)、死信隊(duì)列:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.ttl.queue", durable = "true"),
exchange = @Exchange(name = "dl.ttl.direct"),
key = "ttl"
))
public void listenDlQueue(String msg){
log.info("接收到 dl.ttl.queue的延遲消息:{}", msg);
}
2.2.2.聲明一個(gè)隊(duì)列,并且指定TTL
要給隊(duì)列設(shè)置超時(shí)時(shí)間,需要在聲明隊(duì)列時(shí)配置x-message-ttl屬性:
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue") // 指定隊(duì)列名稱,并持久化
.ttl(10000) // 設(shè)置隊(duì)列的超時(shí)時(shí)間,10秒
.deadLetterExchange("dl.ttl.direct") // 指定死信交換機(jī)
.build();
}
注意,這個(gè)隊(duì)列設(shè)定了死信交換機(jī)為dl.ttl.direct
聲明交換機(jī),將ttl與交換機(jī)綁定:
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
發(fā)送消息,但是不要指定TTL:
@Test
public void testTTLQueue() {
// 創(chuàng)建消息
String message = "hello, ttl queue";
// 消息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 發(fā)送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
// 記錄日志
log.debug("發(fā)送消息成功");
}
發(fā)送消息的日志:
查看下接收消息的日志:
因?yàn)殛?duì)列的TTL值是10000ms,也就是10秒??梢钥吹较l(fā)送與接收之間的時(shí)差剛好是10秒。
2.2.3.發(fā)送消息時(shí),設(shè)定TTL
在發(fā)送消息時(shí),也可以指定TTL:
@Test
public void testTTLMsg() {
// 創(chuàng)建消息
Message message = MessageBuilder
.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
.setExpiration("5000")
.build();
// 消息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 發(fā)送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
log.debug("發(fā)送消息成功");
}
查看發(fā)送消息日志:
接收消息日志:
這次,發(fā)送與接收的延遲只有5秒。說(shuō)明當(dāng)隊(duì)列、消息都設(shè)置了TTL時(shí),任意一個(gè)到期就會(huì)成為死信。
2.2.4.總結(jié)
消息超時(shí)的兩種方式是?
- 給隊(duì)列設(shè)置ttl屬性,進(jìn)入隊(duì)列后超過(guò)ttl時(shí)間的消息變?yōu)樗佬?/li>
- 給消息設(shè)置ttl屬性,隊(duì)列接收到消息超過(guò)ttl時(shí)間后變?yōu)樗佬?/li>
如何實(shí)現(xiàn)發(fā)送一個(gè)消息20秒后消費(fèi)者才收到消息?
- 給消息的目標(biāo)隊(duì)列指定死信交換機(jī)
- 將消費(fèi)者監(jiān)聽(tīng)的隊(duì)列綁定到死信交換機(jī)
- 發(fā)送消息時(shí)給消息設(shè)置超時(shí)時(shí)間為20秒
2.3.延遲隊(duì)列
利用TTL結(jié)合死信交換機(jī),我們實(shí)現(xiàn)了消息發(fā)出后,消費(fèi)者延遲收到消息的效果。這種消息模式就稱為延遲隊(duì)列(Delay Queue)模式。
延遲隊(duì)列的使用場(chǎng)景包括:
- 延遲發(fā)送短信
- 用戶下單,如果用戶在15 分鐘內(nèi)未支付,則自動(dòng)取消
- 預(yù)約工作會(huì)議,20分鐘后自動(dòng)通知所有參會(huì)人員
因?yàn)檠舆t隊(duì)列的需求非常多,所以RabbitMQ的官方也推出了一個(gè)插件,原生支持延遲隊(duì)列效果。
這個(gè)插件就是DelayExchange插件。參考RabbitMQ的插件列表頁(yè)面:https://www.rabbitmq.com/community-plugins.html
使用方式可以參考官網(wǎng)地址:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
2.3.1.安裝DelayExchange插件
參考課前資料:
1.安裝DelayExchange插件
官方的安裝指南地址為:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
上述文檔是基于linux原生安裝RabbitMQ,然后安裝插件。
因?yàn)槲覀冎笆腔贒ocker安裝RabbitMQ,所以下面我們會(huì)講解基于Docker來(lái)安裝RabbitMQ插件。
2.下載插件
RabbitMQ有一個(gè)官方的插件社區(qū),地址為:https://www.rabbitmq.com/community-plugins.html
其中包含各種各樣的插件,包括我們要使用的DelayExchange插件:
大家可以去對(duì)應(yīng)的GitHub頁(yè)面下載3.8.9版本的插件,地址為https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9這個(gè)對(duì)應(yīng)RabbitMQ的3.8.5以上版本。
課前資料也提供了下載好的插件:
3.上傳插件
因?yàn)槲覀兪腔贒ocker安裝,所以需要先查看RabbitMQ的插件目錄對(duì)應(yīng)的數(shù)據(jù)卷。如果不是基于Docker的同學(xué),請(qǐng)參考第一章部分,重新創(chuàng)建Docker容器。
我們之前設(shè)定的RabbitMQ的數(shù)據(jù)卷名稱為mq-plugins
,所以我們使用下面命令查看數(shù)據(jù)卷:
docker volume inspect mq-plugins
可以得到下面結(jié)果:
接下來(lái),將插件上傳到這個(gè)目錄即可:
4.安裝插件
最后就是安裝了,需要進(jìn)入MQ容器內(nèi)部來(lái)執(zhí)行安裝。我的容器名為mq
,所以執(zhí)行下面命令:
docker exec -it mq bash
執(zhí)行時(shí),請(qǐng)將其中的 -it
后面的mq
替換為你自己的容器名.
進(jìn)入容器內(nèi)部后,執(zhí)行下面命令開(kāi)啟插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
結(jié)果如下:
2.3.2.DelayExchange原理
DelayExchange需要將一個(gè)交換機(jī)聲明為delayed類型。當(dāng)我們發(fā)送消息到delayExchange時(shí),流程如下:
- 接收消息
- 判斷消息是否具備x-delay屬性
- 如果有x-delay屬性,說(shuō)明是延遲消息,持久化到硬盤(pán),讀取x-delay值,作為延遲時(shí)間
- 返回routing not found結(jié)果給消息發(fā)送者
- x-delay時(shí)間到期后,重新投遞消息到指定隊(duì)列
2.3.3.使用DelayExchange
插件的使用也非常簡(jiǎn)單:聲明一個(gè)交換機(jī),交換機(jī)的類型可以是任意類型,只需要設(shè)定delayed屬性為true即可,然后聲明隊(duì)列與其綁定即可。
1)聲明DelayExchange交換機(jī)
基于注解方式(推薦):
SpringRabbitListener.java
/**
* 延遲交換機(jī)和隊(duì)列
*
* @param message
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayExchange(String message) {
log.info("消費(fèi)者接收到了delay.queue的消息" + message);
}
也可以基于@Bean的方式:
2)發(fā)送消息
發(fā)送消息時(shí),一定要攜帶x-delay屬性,指定延遲的時(shí)間:
SpringAmqpTest.java
/**
* 發(fā)送延遲消息
*
* @throws InterruptedException
*/
@Test
public void testSendDealyMessage() throws InterruptedException {
String routingKey = "delay";
// 創(chuàng)建消息
Message message = MessageBuilder.withBody("hello, delay message !".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setHeader("x-delay", 5000)
.build();
// 準(zhǔn)備CorrelationData
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("delay.direct", routingKey, message, correlationData);
}
log.info("發(fā)送delay消息成功!");
看一下mq的圖形界面
運(yùn)行測(cè)試類SpringAmqpTest.java發(fā)送消息,雖然發(fā)送delay消息成功,但是下面報(bào)了錯(cuò)誤
查看consumer,5秒后接收消息成功
那這里為什么會(huì)報(bào)錯(cuò)呢,這是因?yàn)閐elay的交換機(jī)是將消息持有了5秒后發(fā)送的,這里其實(shí)不是報(bào)錯(cuò),而是消息暫存了5秒,過(guò)了5秒才發(fā)送到隊(duì)列,那我們能不讓它們報(bào)錯(cuò)嗎,當(dāng)然可以,我們繼續(xù)修改。
根據(jù)receivedDelay
是否有值判斷是否重發(fā),有就不重發(fā)
修改publisher中的CommonConfig.java
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 獲取RabbitTemplate對(duì)象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置eturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 判斷是否是延遲消息
if (message.getMessageProperties().getReceivedDelay() > 0) {
// 是一個(gè)延遲消息,忽略報(bào)錯(cuò)信息
return;
}
// 記錄日志
log.error("消息發(fā)送到隊(duì)列失敗,響應(yīng)碼:{},失敗原因:{},交換機(jī):{},routingKey:{},消息:{}",
replyCode, replyText, exchange, routingKey, message);
// 如果有失敗的,可以進(jìn)行消息的重發(fā)
});
}
再此通過(guò)測(cè)試類發(fā)送消息,報(bào)錯(cuò)就沒(méi)有了
2.3.4.總結(jié)
延遲隊(duì)列插件的使用步驟包括哪些?
?聲明一個(gè)交換機(jī),添加delayed屬性為true
?發(fā)送消息時(shí),添加x-delay頭,值為超時(shí)時(shí)間
3.惰性隊(duì)列
3.1.消息堆積問(wèn)題
當(dāng)生產(chǎn)者發(fā)送消息的速度超過(guò)了消費(fèi)者處理消息的速度,就會(huì)導(dǎo)致隊(duì)列中的消息堆積,直到隊(duì)列存儲(chǔ)消息達(dá)到上限。之后發(fā)送的消息就會(huì)成為死信,可能會(huì)被丟棄,這就是消息堆積問(wèn)題。
解決消息堆積有三種思路:
- 增加更多消費(fèi)者,提高消費(fèi)速度。也就是我們之前說(shuō)的work queue模式
- 在消費(fèi)者內(nèi)開(kāi)啟線程池加快消息處理速度
- 擴(kuò)大隊(duì)列容積,提高堆積上限
要提升隊(duì)列容積,把消息保存在內(nèi)存中顯然是不行的。
3.2.惰性隊(duì)列
從RabbitMQ的3.6.0版本開(kāi)始,就增加了Lazy Queues的概念,也就是惰性隊(duì)列。惰性隊(duì)列的特征如下:
- 接收到消息后直接存入磁盤(pán)而非內(nèi)存
- 消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤(pán)中讀取并加載到內(nèi)存
- 支持?jǐn)?shù)百萬(wàn)條的消息存儲(chǔ)
3.2.1.基于命令行設(shè)置lazy-queue
而要設(shè)置一個(gè)隊(duì)列為惰性隊(duì)列,只需要在聲明隊(duì)列時(shí),指定x-queue-mode屬性為lazy即可??梢酝ㄟ^(guò)命令行將一個(gè)運(yùn)行中的隊(duì)列修改為惰性隊(duì)列:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解讀:
-
rabbitmqctl
:RabbitMQ的命令行工具 -
set_policy
:添加一個(gè)策略 -
Lazy
:策略名稱,可以自定義 -
"^lazy-queue$"
:用正則表達(dá)式匹配隊(duì)列的名字 -
'{"queue-mode":"lazy"}'
:設(shè)置隊(duì)列模式為lazy模式 -
--apply-to queues
:策略的作用對(duì)象,是所有的隊(duì)列
3.2.2.基于@Bean聲明lazy-queue
新建類LazyConfig.java
@Configuration
public class LazyConfig {
/**
* 惰性隊(duì)列
*
* @return
*/
@Bean
public Queue lazyQueue() {
return QueueBuilder.durable("lazy.queue").lazy().build();
}
/**
* 普通隊(duì)列
*
* @return
*/
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("normal.queue").build();
}
}
啟動(dòng)Consumer服務(wù),查看mq圖形界面
修改SpringAmqpTest.java
/**
* 測(cè)試惰性隊(duì)列
*
* @throws InterruptedException
*/
@Test
public void testSendLazyQueue() throws InterruptedException {
for (int i = 0; i < 1000000; i++) {
String routingKey = "lazy.queue";
// 創(chuàng)建消息
Message message = MessageBuilder.withBody("hello, lazy queue".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
rabbitTemplate.convertAndSend(routingKey, message);
//log.info("發(fā)送lazy隊(duì)列消息成功!");
}
}
/**
* 測(cè)試惰性隊(duì)列
*
* @throws InterruptedException
*/
@Test
public void testSendNormalQueue() throws InterruptedException {
for (int i = 0; i < 1000000; i++) {
String routingKey = "normal.queue";
// 創(chuàng)建消息
Message message = MessageBuilder.withBody("hello, normal queue".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
rabbitTemplate.convertAndSend(routingKey, message);
//log.info("發(fā)送normal隊(duì)列消息成功!");
}
}
我們看到lazy queue中,memory中沒(méi)有,直接寫(xiě)磁盤(pán),比較穩(wěn)定
我們看normal queue中,往內(nèi)存中寫(xiě),而且寫(xiě)一會(huì)兒就會(huì)往磁盤(pán)刷新一部分,穩(wěn)定性不好
3.2.3.基于@RabbitListener聲明LazyQueue
3.3.總結(jié)
消息堆積問(wèn)題的解決方案?
- 隊(duì)列上綁定多個(gè)消費(fèi)者,提高消費(fèi)速度
- 使用惰性隊(duì)列,可以再mq中保存更多消息
惰性隊(duì)列的優(yōu)點(diǎn)有哪些?
- 基于磁盤(pán)存儲(chǔ),消息上限高
- 沒(méi)有間歇性的page-out,性能比較穩(wěn)定
惰性隊(duì)列的缺點(diǎn)有哪些?
- 基于磁盤(pán)存儲(chǔ),消息時(shí)效性會(huì)降低
- 性能受限于磁盤(pán)的IO
4.MQ集群
4.1.集群分類
RabbitMQ的是基于Erlang語(yǔ)言編寫(xiě),而Erlang又是一個(gè)面向并發(fā)的語(yǔ)言,天然支持集群模式。RabbitMQ的集群有兩種模式:
?普通集群:是一種分布式集群,將隊(duì)列分散到集群的各個(gè)節(jié)點(diǎn),從而提高整個(gè)集群的并發(fā)能力。
?鏡像集群:是一種主從集群,普通集群的基礎(chǔ)上,添加了主從備份功能,提高集群的數(shù)據(jù)可用性。
鏡像集群雖然支持主從,但主從同步并不是強(qiáng)一致的,某些情況下可能有數(shù)據(jù)丟失的風(fēng)險(xiǎn)。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁隊(duì)列來(lái)代替鏡像集群,底層采用Raft協(xié)議確保主從的數(shù)據(jù)一致性。
4.2.普通集群
4.2.1.集群結(jié)構(gòu)和特征
普通集群,或者叫標(biāo)準(zhǔn)集群(classic cluster),具備下列特征:
- 會(huì)在集群的各個(gè)節(jié)點(diǎn)間共享部分?jǐn)?shù)據(jù),包括:交換機(jī)、隊(duì)列元信息。不包含隊(duì)列中的消息。
- 當(dāng)訪問(wèn)集群某節(jié)點(diǎn)時(shí),如果隊(duì)列不在該節(jié)點(diǎn),會(huì)從數(shù)據(jù)所在節(jié)點(diǎn)傳遞到當(dāng)前節(jié)點(diǎn)并返回
- 隊(duì)列所在節(jié)點(diǎn)宕機(jī),隊(duì)列中的消息就會(huì)丟失
結(jié)構(gòu)如圖:
4.2.2.部署
參考課前資料:《RabbitMQ部署指南.md》
集群部署
接下來(lái),我們看看如何安裝RabbitMQ的集群。
1.集群分類
在RabbitMQ的官方文檔中,講述了兩種集群的配置方式:
- 普通模式:普通模式集群不進(jìn)行數(shù)據(jù)同步,每個(gè)MQ都有自己的隊(duì)列、數(shù)據(jù)信息(其它元數(shù)據(jù)信息如交換機(jī)等會(huì)同步)。例如我們有2個(gè)MQ:mq1,和mq2,如果你的消息在mq1,而你連接到了mq2,那么mq2會(huì)去mq1拉取消息,然后返回給你。如果mq1宕機(jī),消息就會(huì)丟失。
- 鏡像模式:與普通模式不同,隊(duì)列會(huì)在各個(gè)mq的鏡像節(jié)點(diǎn)之間同步,因此你連接到任何一個(gè)鏡像節(jié)點(diǎn),均可獲取到消息。而且如果一個(gè)節(jié)點(diǎn)宕機(jī),并不會(huì)導(dǎo)致數(shù)據(jù)丟失。不過(guò),這種方式增加了數(shù)據(jù)同步的帶寬消耗。
我們先來(lái)看普通模式集群,我們的計(jì)劃部署3節(jié)點(diǎn)的mq集群:
主機(jī)名 | 控制臺(tái)端口 | amqp通信端口 |
---|---|---|
mq1 | 8081 —> 15672 | 8071 —> 5672 |
mq2 | 8082 —> 15672 | 8072 —> 5672 |
mq3 | 8083 —> 15672 | 8073 —> 5672 |
集群中的節(jié)點(diǎn)標(biāo)示默認(rèn)都是:rabbit@[hostname]
,因此以上三個(gè)節(jié)點(diǎn)的名稱分別為:
- rabbit@mq1
- rabbit@mq2
- rabbit@mq3
2.獲取cookie
RabbitMQ底層依賴于Erlang,而Erlang虛擬機(jī)就是一個(gè)面向分布式的語(yǔ)言,默認(rèn)就支持集群模式。集群模式中的每個(gè)RabbitMQ 節(jié)點(diǎn)使用 cookie 來(lái)確定它們是否被允許相互通信。
要使兩個(gè)節(jié)點(diǎn)能夠通信,它們必須具有相同的共享秘密,稱為Erlang cookie。cookie 只是一串最多 255 個(gè)字符的字母數(shù)字字符。
每個(gè)集群節(jié)點(diǎn)必須具有相同的 cookie。實(shí)例之間也需要它來(lái)相互通信。
我們先在之前啟動(dòng)的mq容器中獲取一個(gè)cookie值,作為集群的cookie。執(zhí)行下面的命令:
docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie
可以看到cookie值如下:
FXZMCVGLBIXZCDEMMVZQ
接下來(lái),停止并刪除當(dāng)前的mq容器,我們重新搭建集群。
docker rm -f mq
清理數(shù)據(jù)卷
docker volume prune
3.準(zhǔn)備集群配置
在/tmp目錄新建一個(gè)配置文件 rabbitmq.conf:
cd /tmp
# 創(chuàng)建文件
touch rabbitmq.conf
文件內(nèi)容如下:
loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3
再創(chuàng)建一個(gè)文件,記錄cookie
cd /tmp
# 創(chuàng)建cookie文件
touch .erlang.cookie
# 寫(xiě)入cookie
echo "FXZMCVGLBIXZCDEMMVZQ" > .erlang.cookie
# 修改cookie文件的權(quán)限
chmod 600 .erlang.cookie
準(zhǔn)備三個(gè)目錄,mq1、mq2、mq3:
cd /tmp
# 創(chuàng)建目錄
mkdir mq1 mq2 mq3
然后拷貝rabbitmq.conf、cookie文件到mq1、mq2、mq3:
# 進(jìn)入/tmp
cd /tmp
# 拷貝
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3
4.啟動(dòng)集群
創(chuàng)建一個(gè)網(wǎng)絡(luò):
docker network create mq-net
docker volume create
運(yùn)行命令
docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:3.8-management
docker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3.8-management
docker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:3.8-management
5.測(cè)試
在mq1這個(gè)節(jié)點(diǎn)上添加一個(gè)隊(duì)列:
如圖,在mq2和mq3兩個(gè)控制臺(tái)也都能看到:
6.數(shù)據(jù)共享測(cè)試
點(diǎn)擊這個(gè)隊(duì)列,進(jìn)入管理頁(yè)面:
然后利用控制臺(tái)發(fā)送一條消息到這個(gè)隊(duì)列:
結(jié)果在mq2、mq3上都能看到這條消息:
7.可用性測(cè)試
我們讓其中一臺(tái)節(jié)點(diǎn)mq1宕機(jī):
docker stop mq1
然后登錄mq2或mq3的控制臺(tái),發(fā)現(xiàn)simple.queue也不可用了:
說(shuō)明數(shù)據(jù)并沒(méi)有拷貝到mq2和mq3。
4.3.鏡像集群
4.3.1.集群結(jié)構(gòu)和特征
鏡像集群:本質(zhì)是主從模式,具備下面的特征:
- 交換機(jī)、隊(duì)列、隊(duì)列中的消息會(huì)在各個(gè)mq的鏡像節(jié)點(diǎn)之間同步備份。
- 創(chuàng)建隊(duì)列的節(jié)點(diǎn)被稱為該隊(duì)列的主節(jié)點(diǎn),備份到的其它節(jié)點(diǎn)叫做該隊(duì)列的鏡像節(jié)點(diǎn)。
- 一個(gè)隊(duì)列的主節(jié)點(diǎn)可能是另一個(gè)隊(duì)列的鏡像節(jié)點(diǎn)
- 所有操作都是主節(jié)點(diǎn)完成,然后同步給鏡像節(jié)點(diǎn)
- 主宕機(jī)后,鏡像節(jié)點(diǎn)會(huì)替代成新的主
結(jié)構(gòu)如圖:
4.3.2.部署
參考課前資料:《RabbitMQ部署指南.md》
鏡像模式
在剛剛的案例中,一旦創(chuàng)建隊(duì)列的主機(jī)宕機(jī),隊(duì)列就會(huì)不可用。不具備高可用能力。如果要解決這個(gè)問(wèn)題,必須使用官方提供的鏡像集群方案。
官方文檔地址:https://www.rabbitmq.com/ha.html
1.鏡像模式的特征
默認(rèn)情況下,隊(duì)列只保存在創(chuàng)建該隊(duì)列的節(jié)點(diǎn)上。而鏡像模式下,創(chuàng)建隊(duì)列的節(jié)點(diǎn)被稱為該隊(duì)列的主節(jié)點(diǎn),隊(duì)列還會(huì)拷貝到集群中的其它節(jié)點(diǎn),也叫做該隊(duì)列的鏡像節(jié)點(diǎn)。
但是,不同隊(duì)列可以在集群中的任意節(jié)點(diǎn)上創(chuàng)建,因此不同隊(duì)列的主節(jié)點(diǎn)可以不同。甚至,一個(gè)隊(duì)列的主節(jié)點(diǎn)可能是另一個(gè)隊(duì)列的鏡像節(jié)點(diǎn)。
用戶發(fā)送給隊(duì)列的一切請(qǐng)求,例如發(fā)送消息、消息回執(zhí)默認(rèn)都會(huì)在主節(jié)點(diǎn)完成,如果是從節(jié)點(diǎn)接收到請(qǐng)求,也會(huì)路由到主節(jié)點(diǎn)去完成。鏡像節(jié)點(diǎn)僅僅起到備份數(shù)據(jù)作用。
當(dāng)主節(jié)點(diǎn)接收到消費(fèi)者的ACK時(shí),所有鏡像都會(huì)刪除節(jié)點(diǎn)中的數(shù)據(jù)。
總結(jié)如下:
- 鏡像隊(duì)列結(jié)構(gòu)是一主多從(從就是鏡像)
- 所有操作都是主節(jié)點(diǎn)完成,然后同步給鏡像節(jié)點(diǎn)
- 主宕機(jī)后,鏡像節(jié)點(diǎn)會(huì)替代成新的主(如果在主從同步完成前,主就已經(jīng)宕機(jī),可能出現(xiàn)數(shù)據(jù)丟失)
- 不具備負(fù)載均衡功能,因?yàn)樗胁僮鞫紩?huì)有主節(jié)點(diǎn)完成(但是不同隊(duì)列,其主節(jié)點(diǎn)可以不同,可以利用這個(gè)提高吞吐量)
2.鏡像模式的配置
鏡像模式的配置有3種模式:
ha-mode | ha-params | 效果 |
---|---|---|
準(zhǔn)確模式exactly | 隊(duì)列的副本量count | 集群中隊(duì)列副本(主服務(wù)器和鏡像服務(wù)器之和)的數(shù)量。count如果為1意味著單個(gè)副本:即隊(duì)列主節(jié)點(diǎn)。count值為2表示2個(gè)副本:1個(gè)隊(duì)列主和1個(gè)隊(duì)列鏡像。換句話說(shuō):count = 鏡像數(shù)量 + 1。如果群集中的節(jié)點(diǎn)數(shù)少于count,則該隊(duì)列將鏡像到所有節(jié)點(diǎn)。如果有集群總數(shù)大于count+1,并且包含鏡像的節(jié)點(diǎn)出現(xiàn)故障,則將在另一個(gè)節(jié)點(diǎn)上創(chuàng)建一個(gè)新的鏡像。 |
all | (none) | 隊(duì)列在群集中的所有節(jié)點(diǎn)之間進(jìn)行鏡像。隊(duì)列將鏡像到任何新加入的節(jié)點(diǎn)。鏡像到所有節(jié)點(diǎn)將對(duì)所有群集節(jié)點(diǎn)施加額外的壓力,包括網(wǎng)絡(luò)I / O,磁盤(pán)I / O和磁盤(pán)空間使用情況。推薦使用exactly,設(shè)置副本數(shù)為(N / 2 +1)。 |
nodes | node names | 指定隊(duì)列創(chuàng)建到哪些節(jié)點(diǎn),如果指定的節(jié)點(diǎn)全部不存在,則會(huì)出現(xiàn)異常。如果指定的節(jié)點(diǎn)在集群中存在,但是暫時(shí)不可用,會(huì)創(chuàng)建節(jié)點(diǎn)到當(dāng)前客戶端連接到的節(jié)點(diǎn)。 |
這里我們以rabbitmqctl命令作為案例來(lái)講解配置語(yǔ)法。
語(yǔ)法示例:
3.exactly模式
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
-
rabbitmqctl set_policy
:固定寫(xiě)法 -
ha-two
:策略名稱,自定義 -
"^two\."
:匹配隊(duì)列的正則表達(dá)式,符合命名規(guī)則的隊(duì)列才生效,這里是任何以two.
開(kāi)頭的隊(duì)列名稱 -
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
: 策略內(nèi)容-
"ha-mode":"exactly"
:策略模式,此處是exactly模式,指定副本數(shù)量 -
"ha-params":2
:策略參數(shù),這里是2,就是副本數(shù)量為2,1主1鏡像 -
"ha-sync-mode":"automatic"
:同步策略,默認(rèn)是manual,即新加入的鏡像節(jié)點(diǎn)不會(huì)同步舊的消息。如果設(shè)置為automatic,則新加入的鏡像節(jié)點(diǎn)會(huì)把主節(jié)點(diǎn)中所有消息都同步,會(huì)帶來(lái)額外的網(wǎng)絡(luò)開(kāi)銷
-
4.模式
rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
-
ha-all
:策略名稱,自定義 -
"^all\."
:匹配所有以all.
開(kāi)頭的隊(duì)列名 -
'{"ha-mode":"all"}'
:策略內(nèi)容-
"ha-mode":"all"
:策略模式,此處是all模式,即所有節(jié)點(diǎn)都會(huì)稱為鏡像節(jié)點(diǎn)
-
5.nodes模式
rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
-
rabbitmqctl set_policy
:固定寫(xiě)法 -
ha-nodes
:策略名稱,自定義 -
"^nodes\."
:匹配隊(duì)列的正則表達(dá)式,符合命名規(guī)則的隊(duì)列才生效,這里是任何以nodes.
開(kāi)頭的隊(duì)列名稱 -
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
: 策略內(nèi)容-
"ha-mode":"nodes"
:策略模式,此處是nodes模式 -
"ha-params":["rabbit@mq1", "rabbit@mq2"]
:策略參數(shù),這里指定副本所在節(jié)點(diǎn)名稱
-
6.測(cè)試
我們使用exactly模式的鏡像,因?yàn)榧汗?jié)點(diǎn)數(shù)量為3,因此鏡像數(shù)量就設(shè)置為2.
運(yùn)行下面的命令:
docker exec -it mq1 rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
下面,我們創(chuàng)建一個(gè)新的隊(duì)列:
在任意一個(gè)mq控制臺(tái)查看隊(duì)列:
7.測(cè)試數(shù)據(jù)共享
給two.queue發(fā)送一條消息:
然后在mq1、mq2、mq3的任意控制臺(tái)查看消息:
8.測(cè)試高可用
現(xiàn)在,我們讓two.queue的主節(jié)點(diǎn)mq1宕機(jī):
docker stop mq1
查看集群狀態(tài):
查看隊(duì)列狀態(tài):
發(fā)現(xiàn)依然是健康的!并且其主節(jié)點(diǎn)切換到了rabbit@mq2上
5.仲裁隊(duì)列
從RabbitMQ 3.8版本開(kāi)始,引入了新的仲裁隊(duì)列,他具備與鏡像隊(duì)里類似的功能,但使用更加方便。
4.4.仲裁隊(duì)列
4.4.1.集群特征
仲裁隊(duì)列:仲裁隊(duì)列是3.8版本以后才有的新功能,用來(lái)替代鏡像隊(duì)列,具備下列特征:
- 與鏡像隊(duì)列一樣,都是主從模式,支持主從數(shù)據(jù)同步
- 使用非常簡(jiǎn)單,沒(méi)有復(fù)雜的配置
- 主從同步基于Raft協(xié)議,強(qiáng)一致
4.4.2.部署
參考課前資料:《RabbitMQ部署指南.md》
仲裁隊(duì)列
從RabbitMQ 3.8版本開(kāi)始,引入了新的仲裁隊(duì)列,他具備與鏡像隊(duì)里類似的功能,但使用更加方便。
1.添加仲裁隊(duì)列
在任意控制臺(tái)添加一個(gè)隊(duì)列,一定要選擇隊(duì)列類型為Quorum類型。
在任意控制臺(tái)查看隊(duì)列:
可以看到,仲裁隊(duì)列的 + 2字樣。代表這個(gè)隊(duì)列有2個(gè)鏡像節(jié)點(diǎn)。
因?yàn)橹俨藐?duì)列默認(rèn)的鏡像數(shù)為5。如果你的集群有7個(gè)節(jié)點(diǎn),那么鏡像數(shù)肯定是5;而我們集群只有3個(gè)節(jié)點(diǎn),因此鏡像數(shù)量就是3.
2.測(cè)試
可以參考對(duì)鏡像集群的測(cè)試,效果是一樣的。
3.集群擴(kuò)容
4.加入集群
1)啟動(dòng)一個(gè)新的MQ容器:
docker run -d --net mq-net \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq4 \
--hostname mq5 \
-p 8074:15672 \
-p 8084:15672 \
rabbitmq:3.8-management
2)進(jìn)入容器控制臺(tái):
docker exec -it mq4 bash
3)停止mq進(jìn)程
rabbitmqctl stop_app
4)重置RabbitMQ中的數(shù)據(jù):
rabbitmqctl reset
5)加入mq1:
rabbitmqctl join_cluster rabbit@mq1
6)再次啟動(dòng)mq進(jìn)程
rabbitmqctl start_app
5.增加仲裁隊(duì)列副本
我們先查看下quorum.queue這個(gè)隊(duì)列目前的副本情況,進(jìn)入mq1容器:
docker exec -it mq1 bash
執(zhí)行命令:
rabbitmq-queues quorum_status "quorum.queue"
結(jié)果:
現(xiàn)在,我們讓mq4也加入進(jìn)來(lái):
rabbitmq-queues add_member "quorum.queue" "rabbit@mq4"
結(jié)果:
再次查看:
rabbitmq-queues quorum_status "quorum.queue"
查看控制臺(tái),發(fā)現(xiàn)quorum.queue的鏡像數(shù)量也從原來(lái)的 +2 變成了 +3:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-841411.html
4.4.3.Java代碼創(chuàng)建仲裁隊(duì)列
@Bean
public Queue quorumQueue() {
return QueueBuilder
.durable("quorum.queue") // 持久化
.quorum() // 仲裁隊(duì)列
.build();
}
4.4.4.SpringAMQP連接MQ集群
注意,這里用address來(lái)代替host、port方式文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-841411.html
spring:
rabbitmq:
addresses: 192.168.150.105:8071, 192.168.150.105:8072, 192.168.150.105:8073
username: itcast
password: 123321
virtual-host: /
到了這里,關(guān)于(黑馬出品_高級(jí)篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!