RabbitMQ
微服務(wù)間通訊有同步和異步兩種方式:
同步通訊:就像打電話,需要實時響應(yīng)。
異步通訊:就像發(fā)郵件,不需要馬上回復(fù)。
兩種方式各有優(yōu)劣,打電話可以立即得到響應(yīng),但是一個人卻不能跟多個人同時通話。而發(fā)送郵件可以同時與多個人收發(fā)郵件,但是往往響應(yīng)會有延遲。
同步調(diào)用的優(yōu)點:
- 時效性較強(qiáng),可以立即得到結(jié)果
同步調(diào)用的問題:
-
耦合度高
:每次加入新的需求,都要修改原來的代碼。 -
性能和吞吐能力下降
:調(diào)用者需要等待服務(wù)提供者響應(yīng),如果調(diào)用鏈過長則響應(yīng)時間等于每次調(diào)用的時間之和。 -
有額外的資源消耗
:調(diào)用鏈中的每個服務(wù)在等待響應(yīng)過程中,不能釋放請求占用的資源,高并發(fā)場景下會極度浪費系統(tǒng)資源。 -
有級聯(lián)失敗問題
:如果服務(wù)提供者出現(xiàn)問題,所有調(diào)用方都會跟著出問題,如同多米諾骨牌一樣,迅速導(dǎo)致整個微服務(wù)群故障。
異步通訊
異步調(diào)用則可以避免上述問題:
以購買商品為例,用戶支付后需要調(diào)用訂單服務(wù)完成訂單狀態(tài)修改,調(diào)用物流服務(wù),從倉庫分配響應(yīng)的庫存并準(zhǔn)備發(fā)貨。
在事件模式中,支付服務(wù)是事件發(fā)布者(publisher),在支付完成后只需要發(fā)布一個支付成功的事件(event),事件中帶上訂單id。
訂單服務(wù)和物流服務(wù)是事件訂閱者(Consumer),訂閱支付成功的事件,監(jiān)聽到事件后完成自己業(yè)務(wù)即可。
為了解除事件發(fā)布者與訂閱者之間的耦合,兩者并不是直接通信,而是有一個中間人(Broker)。發(fā)布者發(fā)布事件到Broker,不關(guān)心誰來訂閱事件。訂閱者從Broker訂閱事件,不關(guān)心誰發(fā)來的消息。
Broker 是一個類似于數(shù)據(jù)總線一樣的東西,所有的服務(wù)要接收數(shù)據(jù)和發(fā)送數(shù)據(jù)都發(fā)到這個總線上,這個總線就像協(xié)議一樣,讓服務(wù)間的通訊變得標(biāo)準(zhǔn)和可控。
異步通訊的優(yōu)點:
-
吞吐量提升:無需等待訂閱者處理完成,響應(yīng)更快速
-
故障隔離:服務(wù)沒有直接調(diào)用,不存在級聯(lián)失敗問題
-
調(diào)用間沒有阻塞,不會造成無效的資源占用
-
耦合度極低,每個服務(wù)都可以靈活插拔,可替換
-
流量削峰:不管發(fā)布事件的流量波動多大,都由Broker接收,訂閱者可以按照自己的速度去處理事件
缺點:
- 架構(gòu)復(fù)雜了,業(yè)務(wù)沒有明顯的流程線,不好管理
- 需要依賴于Broker的可靠、安全、性能。
技術(shù)對比
現(xiàn)在開源軟件或云平臺上 Broker 的軟件是非常成熟的,比較常見的一種就是MQ技術(shù)。
MQ,中文是消息隊列(MessageQueue),字面來看就是存放消息的隊列。也就是事件驅(qū)動架構(gòu)中的Broker。
比較常見的MQ實現(xiàn):
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
幾種常見MQ的對比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社 區(qū) | Rabbit | Apache | 阿里 | Apache |
開發(fā)語言 | Erlang | Java | Java | Scala&Java |
協(xié)議支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定義協(xié)議 | 自定義協(xié)議 |
可用性 | 高 | 一般 | 高 | 高 |
單機(jī)吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延遲 | 微秒級 | 毫秒級 | 毫秒級 | 毫秒以內(nèi) |
消息可靠性 | 高 | 一般 | 高 | 一般 |
如果追求可用性,可以選擇Kafka、 RocketMQ 、RabbitMQ
如果追求可靠性,可以選擇RabbitMQ、RocketMQ
如果追求吞吐能力,選擇RocketMQ、Kafka
追求消息低延遲,選擇RabbitMQ、Kafka
為什么選擇RabbitMQ而不是其它的MQ?
kafka是以吞吐量高而聞名,不過其數(shù)據(jù)穩(wěn)定性一般,而且無法保證消息有序性。RabbitMQ基于面向并發(fā)的語言Erlang開發(fā),吞吐量不如Kafka,但消息可靠性較好,并且消息延遲極低,集群搭建比較方便。支持多種協(xié)議,并且有各種語言的客戶端,比較靈活。Spring對RabbitMQ的支持也比較好,使用起來比較方便。
安裝RabbitMQ
在Centos7虛擬機(jī)中使用Docker來安裝。
下載鏡像
在線拉取鏡像
docker pull rabbitmq:3-management
安裝MQ
執(zhí)行下面的命令來運行MQ容器:
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
RabbitMQ的基本結(jié)構(gòu):
RabbitMQ中的一些角色:
- publisher:生產(chǎn)者
- consumer:消費者
- exchange:交換機(jī),負(fù)責(zé)消息路由
- queue:隊列,存儲消息
- Virtual Host:虛擬主機(jī),隔離不同租戶的exchange、queue、消息的隔離
RabbitMQ消息模型
MQ的官方文檔中給出了5個MQ的Demo示例,對應(yīng)了幾種不同的用法:
-
基本消息隊列(BasicQueue)
-
工作消息隊列(WorkQueue)
發(fā)布訂閱(Publish、Subscribe),又根據(jù)交換機(jī)類型不同分為三種:
-
Fanout Exchange:廣播
-
Direct Exchange:路由
-
Topic Exchange:主題
入門案例
簡單隊列模式的模型圖:
官方的HelloWorld是基于最基礎(chǔ)的消息隊列模型來實現(xiàn)的,只包括三個角色:
- publisher:消息發(fā)布者,將消息發(fā)送到隊列queue
- queue:消息隊列,負(fù)責(zé)接受并緩存消息
- consumer:訂閱隊列,處理隊列中的消息
publisher實現(xiàn)
實現(xiàn)思路:
- 建立連接
- 創(chuàng)建Channel
- 聲明隊列
- 發(fā)送消息
- 關(guān)閉連接和channel
代碼實現(xiàn):
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立連接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設(shè)置連接參數(shù),分別是:主機(jī)名、端口號、vhost、用戶名、密碼
factory.setHost("192.168.xxx.xxx");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
// 1.2.建立連接
Connection connection = factory.newConnection();
// 2.創(chuàng)建通道Channel
Channel channel = connection.createChannel();
// 3.創(chuàng)建隊列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.發(fā)送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("發(fā)送消息成功:【" + message + "】");
// 5.關(guān)閉通道和連接
channel.close();
connection.close();
}
}
consumer實現(xiàn)
實現(xiàn)思路:
- 建立連接
- 創(chuàng)建Channel
- 聲明隊列
- 訂閱消息
代碼實現(xiàn):
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立連接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設(shè)置連接參數(shù),分別是:主機(jī)名、端口號、vhost、用戶名、密碼
factory.setHost("192.168.xxx.xxx");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
// 1.2.建立連接
Connection connection = factory.newConnection();
// 2.創(chuàng)建通道Channel
Channel channel = connection.createChannel();
// 3.創(chuàng)建隊列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.訂閱消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.處理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
總結(jié)
基本消息隊列的消息發(fā)送流程:
-
建立connection
-
創(chuàng)建channel
-
利用channel聲明隊列
-
利用channel向隊列發(fā)送消息
基本消息隊列的消息接收流程:
-
建立connection
-
創(chuàng)建channel
-
利用channel聲明隊列
-
定義consumer的消費行為handleDelivery()
-
利用channel將消費者與隊列綁定
SpringAMQP
SpringAMQP是基于RabbitMQ封裝的一套模板,并且還利用SpringBoot對其實現(xiàn)了自動裝配,使用起來非常方便。
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
SpringAMQP提供了三個功能:
- 自動聲明隊列、交換機(jī)及其綁定關(guān)系
- 基于注解的監(jiān)聽器模式,異步接收消息
- 封裝了RabbitTemplate工具,用于發(fā)送消息
Basic Queue 簡單隊列模型
在父工程mq-demo中引入依賴
<!--AMQP依賴,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
消息發(fā)送
首先配置MQ地址,在publisher服務(wù)的application.yml中添加配置:
spring:
rabbitmq:
host: 192.168.xxx.xxx # 主機(jī)名
port: 5672 # 端口
virtual-host: / # 虛擬主機(jī)
username: admin # 用戶名
password: 123456 # 密碼
然后在publisher服務(wù)中編寫測試類SpringAmqpTest,并利用RabbitTemplate實現(xiàn)消息發(fā)送:
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 隊列名稱
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 發(fā)送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
消息接收
首先配置MQ地址,在consumer服務(wù)的application.yml中添加配置:
spring:
rabbitmq:
host: 192.168.xxx.xxx # 主機(jī)名
port: 5672 # 端口
virtual-host: / # 虛擬主機(jī)
username: admin # 用戶名
password: 123456 # 密碼
然后在consumer服務(wù)的listener包中新建一個類SpringRabbitListener,代碼如下:
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消費者接收到消息:【" + msg + "】");
}
}
啟動consumer服務(wù),然后在publisher服務(wù)中運行測試代碼,發(fā)送MQ消息。
WorkQueue
Work queues,也被稱為(Task queues),任務(wù)模型。簡單來說就是讓多個消費者綁定到一個隊列,共同消費隊列中的消息。
當(dāng)消息處理比較耗時的時候,可能生產(chǎn)消息的速度會遠(yuǎn)遠(yuǎn)大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。
此時就可以使用work 模型,多個消費者共同處理消息處理,效率就能大大提高了。
消息發(fā)送
利用循環(huán)發(fā)送,來模擬大量消息堆積現(xiàn)象。
在publisher服務(wù)中的SpringAmqpTest類中添加一個測試方法:
/**
* workQueue
* 向隊列中不停發(fā)送消息,模擬消息堆積。
*/
@Test
public void testWorkQueue() throws InterruptedException {
// 隊列名稱
String queueName = "simple.queue";
// 消息
String message = "hello, message_";
for (int i = 0; i < 50; i++) {
// 發(fā)送消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
消息接收
要模擬多個消費者綁定同一個隊列,我們在consumer服務(wù)的SpringRabbitListener中添加2個新的方法:
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消費者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消費者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
注意到這個消費者sleep了1000秒,模擬任務(wù)耗時。
測試
啟動ConsumerApplication后,在執(zhí)行publisher服務(wù)中剛剛編寫的發(fā)送測試方法testWorkQueue。
可以看到消費者1很快完成了自己的25條消息。消費者2卻在緩慢的處理自己的25條消息。
也就是說消息是平均分配給每個消費者,并沒有考慮到消費者的處理能力。這樣顯然是有問題的。
在spring中有一個簡單的配置,可以解決這個問題。我們修改consumer服務(wù)的application.yml文件,添加配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息
總結(jié)
Work模型的使用過程:
- 多個消費者綁定到一個隊列,同一條消息只會被一個消費者處理
- 通過設(shè)置prefetch來控制消費者預(yù)取的消息數(shù)量
發(fā)布/訂閱
發(fā)布訂閱的模型如圖:
可以看到,在訂閱模型中,多了一個exchange角色,而且過程略有變化:
- Publisher:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊列中,而是發(fā)給X(交換機(jī))
-
Exchange:交換機(jī),圖中的X。一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有以下3種類型:
- Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊列
- Direct:定向,把消息交給符合指定routing key 的隊列
- Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
- Consumer:消費者,與以前一樣,訂閱隊列,沒有變化
- Queue:消息隊列也與以前一樣,接收消息、緩存消息。
Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規(guī)則的隊列,那么消息會丟失!
Fanout
Fanout,英文翻譯是扇出,在MQ中譯為廣播更合適。
在廣播模式下,消息發(fā)送流程是這樣的:
-
可以有多個隊列
-
每個隊列都要綁定到Exchange(交換機(jī))
-
生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī),交換機(jī)來決定要發(fā)給哪個隊列,生產(chǎn)者無法決定
-
交換機(jī)把消息發(fā)送給綁定過的所有隊列
-
訂閱隊列的消費者都能拿到消息
案例測試:
- 創(chuàng)建一個交換機(jī) itcast.fanout,類型是Fanout
- 創(chuàng)建兩個隊列fanout.queue1和fanout.queue2,綁定到交換機(jī)itcast.fanout
聲明隊列和交換機(jī)
Spring提供了一個接口Exchange,來表示所有不同類型的交換機(jī):
在consumer中創(chuàng)建一個類,聲明隊列和交換機(jī):
@Configuration
public class FanoutConfig {
/**
* 聲明交換機(jī)
* @return Fanout類型交換機(jī)
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
/**
* 第1個隊列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 綁定隊列和交換機(jī)
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2個隊列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 綁定隊列和交換機(jī)
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
消息發(fā)送
在publisher服務(wù)的SpringAmqpTest類中添加測試方法:
@Test
public void testFanoutExchange() {
// 隊列名稱
String exchangeName = "itcast.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
消息接收
在consumer服務(wù)的SpringRabbitListener中添加兩個方法,作為消費者:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消費者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消費者2接收到Fanout消息:【" + msg + "】");
}
小結(jié)
交換機(jī)的作用是什么?
- 接收publisher發(fā)送的消息。
- 將消息按照規(guī)則路由到與之綁定的隊列。
- 不能緩存消息,路由失敗,消息丟失。
- FanoutExchange的會將消息路由到每個綁定的隊列。
聲明隊列、交換機(jī)、綁定關(guān)系的Bean是什么?
- Queue
- FanoutExchange
- Binding
Direct
在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。
在Direct模型下:
- 隊列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個
RoutingKey
(路由key) - 消息的發(fā)送方在 向 Exchange發(fā)送消息時,也必須指定消息的
RoutingKey
。 - Exchange不再把消息交給每一個綁定的隊列,而是根據(jù)消息的
Routing Key
進(jìn)行判斷,只有隊列的Routingkey
與消息的Routing key
完全一致,才會接收到消息
案例需求如下:
-
利用@RabbitListener聲明Exchange、Queue、RoutingKey
-
在consumer服務(wù)中,編寫兩個消費者方法,分別監(jiān)聽direct.queue1和direct.queue2
-
在publisher中編寫測試方法,向itcast. direct發(fā)送消息。
基于注解聲明隊列和交換機(jī)
基于@Bean的方式聲明隊列和交換機(jī)比較麻煩,Spring還提供了基于注解方式來聲明。
在consumer的SpringRabbitListener中添加兩個消費者,同時基于注解來聲明隊列和交換機(jī):
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消費者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消費者接收到direct.queue2的消息:【" + msg + "】");
}
消息發(fā)送
在publisher服務(wù)的SpringAmqpTest類中添加測試方法:
@Test
public void testSendDirectExchange() {
// 交換機(jī)名稱
String exchangeName = "itcast.direct";
// 消息
String message = "紅色警報!日本亂排核廢水,導(dǎo)致海洋生物變異,驚現(xiàn)哥斯拉!";
// 發(fā)送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
總結(jié)
描述下Direct交換機(jī)與Fanout交換機(jī)的差異?
- Fanout交換機(jī)將消息路由給每一個與之綁定的隊列。
- Direct交換機(jī)根據(jù)RoutingKey判斷路由給哪個隊列。
- 如果多個隊列具有相同的RoutingKey,則與Fanout功能類似。
基于@RabbitListener注解聲明隊列和交換機(jī)有哪些常見注解?
- @Queue
- @Exchange
Topic
說明
Topic
類型的Exchange
與Direct
相比,都是可以根據(jù)RoutingKey
把消息路由到不同的隊列。只不過Topic
類型Exchange
可以讓隊列在綁定Routing key
的時候使用通配符!
Routingkey
一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert
通配符規(guī)則:
#
:匹配一個或多個詞
*
:匹配不多不少恰好1個詞
舉例:
item.#
:能夠匹配item.spu.insert
或者 item.spu
item.*
:只能匹配item.spu
圖示
解釋:
- Queue1:綁定的是
china.#
,因此凡是以china.
開頭的routing key
都會被匹配到。包括china.news和china.weather - Queue2:綁定的是
#.news
,因此凡是以.news
結(jié)尾的routing key
都會被匹配。包括china.news和japan.news
實現(xiàn)思路如下:
-
并利用@RabbitListener聲明Exchange、Queue、RoutingKey
-
在consumer服務(wù)中,編寫兩個消費者方法,分別監(jiān)聽topic.queue1和topic.queue2
-
在publisher中編寫測試方法,向item. topic發(fā)送消息。
消息發(fā)送
在publisher服務(wù)的SpringAmqpTest類中添加測試方法:
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交換機(jī)名稱
String exchangeName = "itcast.topic";
// 消息
String message = "喜報!孫悟空大戰(zhàn)哥斯拉,勝!";
// 發(fā)送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
消息接收
在consumer服務(wù)的SpringRabbitListener中添加方法:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消費者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消費者接收到topic.queue2的消息:【" + msg + "】");
}
總結(jié)
描述下Direct交換機(jī)與Topic交換機(jī)的差異?
- Topic交換機(jī)接收的消息RoutingKey必須是多個單詞,以
**.**
分割 - Topic交換機(jī)與隊列綁定時的bindingKey可以指定通配符
-
#
:代表0個或多個詞 -
*
:代表1個詞
消息轉(zhuǎn)換器
Spring會把發(fā)送的消息序列化為字節(jié)發(fā)送給MQ,接收消息的時候,還會把字節(jié)反序列化為Java對象。
只不過,默認(rèn)情況下Spring采用的序列化方式是JDK序列化。眾所周知,JDK序列化存在下列問題:
- 數(shù)據(jù)體積過大
- 有安全漏洞
- 可讀性差
默認(rèn)轉(zhuǎn)換器
修改消息發(fā)送的代碼,發(fā)送一個Map對象:
@Test
public void testSendMap() throws InterruptedException {
// 準(zhǔn)備消息
Map<String,Object> msg = new HashMap<>();
msg.put("name", "Jack");
msg.put("age", 21);
// 發(fā)送消息
rabbitTemplate.convertAndSend("simple.queue","", msg);
}
停止consumer服務(wù)
發(fā)送消息后查看控制臺:
配置JSON轉(zhuǎn)換器
顯然,JDK序列化方式并不合適。可以使用JSON方式來做序列化和反序列化,使得消息體的體積更小、可讀性更高。
在publisher和consumer兩個服務(wù)中都引入依賴:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
配置消息轉(zhuǎn)換器。
在啟動類中添加一個Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
消息隊列在使用過程中,面臨著很多實際問題需要思考:
消息可靠性
消息從發(fā)送,到消費者接收,會經(jīng)理多個過程:
其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括:
- 發(fā)送時丟失:
- 生產(chǎn)者發(fā)送的消息未送達(dá)exchange
- 消息到達(dá)exchange后未到達(dá)queue
- MQ宕機(jī),queue將消息丟失
- consumer接收到消息后未消費就宕機(jī)
針對這些問題,RabbitMQ分別給出了解決方案:
- 生產(chǎn)者確認(rèn)機(jī)制
- mq持久化
- 消費者確認(rèn)機(jī)制
- 失敗重試機(jī)制
生產(chǎn)者消息確認(rèn)
RabbitMQ提供了publisher confirm機(jī)制來避免消息發(fā)送到MQ過程中丟失。這種機(jī)制必須給每個消息指定一個唯一ID。消息發(fā)送到MQ以后,會返回一個結(jié)果給發(fā)送者,表示消息是否處理成功。
返回結(jié)果有兩種方式:
- publisher-confirm,發(fā)送者確認(rèn)
- 消息成功投遞到交換機(jī),返回ack
- 消息未投遞到交換機(jī),返回nack
- publisher-return,發(fā)送者回執(zhí)
- 消息投遞到交換機(jī)了,但是沒有路由到隊列。返回ACK,及路由失敗原因。
注意
:確認(rèn)機(jī)制發(fā)送消息時,需要給每個消息設(shè)置一個全局唯一id,以區(qū)分不同消息,避免ack沖突。
修改配置
首先,修改生產(chǎn)者服務(wù)中的application.yml文件,添加下面的內(nèi)容:
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
-
publish-confirm-type
:開啟publisher-confirm,這里支持兩種類型:-
simple
:同步等待confirm結(jié)果,直到超時 -
correlated
:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時會回調(diào)這個ConfirmCallback
-
-
publish-returns
:開啟publish-return功能,同樣是基于callback機(jī)制,不過是定義ReturnCallback -
template.mandatory
:定義消息路由失敗時的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息
定義Return回調(diào)
每個RabbitTemplate只能配置一個ReturnCallback,因此需要在項目加載時配置。
在生產(chǎn)者服務(wù)中添加一個Config配置類
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 獲取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 設(shè)置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 投遞失敗,記錄日志
log.info("消息發(fā)送失敗,應(yīng)答碼{},原因{},交換機(jī){},路由鍵{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有業(yè)務(wù)需要,可以重發(fā)消息
});
}
}
定義ConfirmCallback
ConfirmCallback可以在發(fā)送消息時指定,因為每個業(yè)務(wù)處理confirm成功或失敗的邏輯不一定相同。
在生產(chǎn)者服務(wù)的測試類中,定義一個單元測試方法
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.消息體
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(
result -> {
if(result.isAck()){
// 3.1.ack,消息成功
log.debug("消息發(fā)送成功, ID:{}", correlationData.getId());
}else{
// 3.2.nack,消息失敗
log.error("消息發(fā)送失敗, ID:{}, 原因{}",correlationData.getId(), result.getReason());
}
},
ex -> log.error("消息發(fā)送異常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
// 4.發(fā)送消息
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);
// 休眠一會兒,等待ack回執(zhí)
Thread.sleep(2000);
}
消息持久化
生產(chǎn)者確認(rèn)可以確保消息投遞到RabbitMQ的隊列中,但是消息發(fā)送到RabbitMQ以后,如果突然宕機(jī),也可能導(dǎo)致消息丟失。
要想確保消息在RabbitMQ中安全保存,必須開啟消息持久化機(jī)制。
- 交換機(jī)持久化
- 隊列持久化
- 消息持久化
必須保證上述三點都持久化,才是真正的消息持久化。
交換機(jī)持久化
RabbitMQ中交換機(jī)默認(rèn)是非持久化的,mq重啟后就丟失。
SpringAMQP中可以通過代碼指定交換機(jī)持久化:
@Bean
public DirectExchange simpleExchange(){
// 三個參數(shù):交換機(jī)名稱、是否持久化、當(dāng)沒有queue與其綁定時是否自動刪除
return new DirectExchange("simple.direct", true, false);
}
默認(rèn)情況下,由SpringAMQP聲明的交換機(jī)都是持久化的。
可以在RabbitMQ控制臺看到持久化的交換機(jī)都會帶上D
的標(biāo)示:
隊列持久化
RabbitMQ中隊列默認(rèn)是非持久化的,mq重啟后就丟失。
SpringAMQP中可以通過代碼指定交換機(jī)持久化:
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder構(gòu)建隊列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
默認(rèn)情況下,由SpringAMQP聲明的隊列都是持久化的。
可以在RabbitMQ控制臺看到持久化的隊列都會帶上D
的標(biāo)示:
消息持久化
利用SpringAMQP發(fā)送消息時,可以設(shè)置消息的屬性(MessageProperties),指定delivery-mode:
- 1:非持久化
- 2:持久化
用java代碼指定
@Test
public void testDurableMessage(){
// 創(chuàng)建消息
Message message = MessageBuilder
.withBody("hello,TTL Queue".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
// 消息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 發(fā)送消息
rabbitTemplate.convertAndSend("simple.queue",message,correlationData);
// 記錄日志
log.debug("發(fā)送消息成功!");
}
默認(rèn)情況下,SpringAMQP發(fā)出的任何消息都是持久化的,不用特意指定。
消費者消息確認(rèn)
RabbitMQ是閱后即焚機(jī)制,即RabbitMQ確認(rèn)消息被消費者消費后會立刻刪除此消息。
而RabbitMQ是通過消費者回執(zhí)來確認(rèn)消費者是否成功處理消息的:消費者獲取消息后,應(yīng)該向RabbitMQ發(fā)送ACK回執(zhí),表明自己已經(jīng)處理消息。
如果RabbitMQ投遞消息給消費者,消費者獲取消息后,并返回ACK給RabbitMQ,RabbitMQ刪除消息之后,此時消費者宕機(jī),接收到的消息還未來得及處理。這樣,消息就丟失了,因此消費者返回ACK的時機(jī)相當(dāng)重要。
而SpringAMQP則允許配置三種確認(rèn)模式:
?manual:手動ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack(根據(jù)業(yè)務(wù)情況,自行判斷什么時候該返回ack)。
?auto:自動ack,由spring監(jiān)測listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack(類似事務(wù)機(jī)制,出現(xiàn)異常時返回nack,消息回滾到mq;沒有異常,返回ack)。
?none:關(guān)閉ack,MQ假定消費者獲取消息后會成功處理,因此消息投遞后立即被刪除(消息投遞是不可靠的,可能丟失)。
配置確認(rèn)模式
修改消費者服務(wù)的application.yml文件,添加下面內(nèi)容:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 自動ACK
修改consumer服務(wù)的SpringRabbitListener類中的方法,模擬一個消息處理異常:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
log.info("消費者接收到simple.queue的消息:【{}】", msg);
// 模擬異常
System.out.println(1 / 0);
log.debug("消息處理完成!");
}
在異常位置打斷點,再次發(fā)送消息,程序卡在斷點時,可以發(fā)現(xiàn)此時消息狀態(tài)為unack(未確定狀態(tài))
拋出異常后,因為Spring會自動返回nack,所以消息恢復(fù)至Ready狀態(tài),并且沒有被RabbitMQ刪除:
消費失敗重試機(jī)制
當(dāng)消費者出現(xiàn)異常后,消息會不斷requeue(重入隊)到隊列,再重新發(fā)送給消費者,然后再次異常,再次requeue,無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力
此時有兩種解決辦法:1、本地重試。2、失敗策略。
本地重試
可以利用Spring的retry機(jī)制,在消費者出現(xiàn)異常時利用本地重試,而不是無限制的requeue到mq隊列。
修改消費者服務(wù)的application.yml文件,添加內(nèi)容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 開啟消費者失敗重試
initial-interval: 1000 # 初識的失敗等待時長為1秒
multiplier: 1 # 失敗的等待時長倍數(shù),下次等待時長 = multiplier * last-interval
max-attempts: 3 # 最大重試次數(shù)
stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
重啟消費者服務(wù),重復(fù)之前的測試??梢园l(fā)現(xiàn):
- 在重試3次后,SpringAMQP會拋出異常AmqpRejectAndDontRequeueException,說明觸發(fā)本地重試。
- 查看RabbitMQ控制臺,發(fā)現(xiàn)消息被刪除了,說明最后SpringAMQP返回的是ack,mq將消息刪除了。
由此可見當(dāng)開啟本地重試時,消息處理過程中拋出異常,不會requeue到隊列,而是在消費者本地重試。重試達(dá)到最大次數(shù)后,Spring會返回ack,消息會被丟棄。
失敗策略
在之前的測試中,達(dá)到最大重試次數(shù)后,消息會被丟棄,這是由Spring內(nèi)部機(jī)制決定的。
在開啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有MessageRecovery接口來處理,它包含三種不同的實現(xiàn):
- RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式
- ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊
- RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到一個指定的,專門存放異常消息的隊列,后續(xù)由人工集中處理。
配置失敗策略
- 在消費者服務(wù)中定義處理失敗消息的交換機(jī)和隊列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(){
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}
- 定義一個RepublishMessageRecoverer,關(guān)聯(lián)隊列和交換機(jī)
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
完整代碼
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(){
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
由此可以得出,確保RabbitMQ消息的可靠性的措施有:
- 開啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊列。
- 開啟持久化功能,確保消息未消費前在隊列中不會丟失。
- 開啟消費者確認(rèn)機(jī)制為auto,由spring確定消息處理成功之后完成ACK。
- 開啟消費者失敗重試機(jī)制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理。
死信交換機(jī)
定義
當(dāng)一個隊列中的消息滿足下列情況之一時,可以成為死信(dead letter):
- 消費者使用basic.reject或 basic.nack聲明消費失敗,并且消息的requeue參數(shù)設(shè)置為false
- 消息是一個過期消息,超時無人消費
- 要投遞的隊列消息滿了,無法投遞
如果這個包含死信的隊列配置了dead-letter-exchange
屬性,指定了一個交換機(jī),那么隊列中的死信就會投遞到這個交換機(jī)中,而這個交換機(jī)稱為死信交換機(jī)(Dead Letter Exchange,簡稱DLX)。
一個消息被消費者拒絕了,變成了死信:
因為simple.queue綁定了死信交換機(jī) dl.direct,因此死信會投遞給這個交換機(jī):
如果這個死信交換機(jī)也綁定了一個隊列,則消息最終會進(jìn)入這個存放死信的隊列:
另外,隊列將死信投遞給死信交換機(jī)時,必須知道兩個信息:
- 死信交換機(jī)名稱
- 死信交換機(jī)與死信隊列綁定的RoutingKey
這樣才能確保投遞的消息能到達(dá)死信交換機(jī),并且正確的路由到死信隊列。
利用死信交換機(jī)接收死信
在失敗重試策略中,默認(rèn)的RejectAndDontRequeueRecoverer會在本地重試次數(shù)耗盡后,發(fā)送reject給RabbitMQ,消息變成死信,被丟棄。
可以給simple.queue添加一個死信交換機(jī),給死信交換機(jī)綁定一個隊列。這樣消息變成死信后也不會丟棄,而是最終投遞到死信交換機(jī),路由到與死信交換機(jī)綁定的隊列。
在消費者服務(wù)中,定義一組死信交換機(jī)、死信隊列
// 聲明普通的 simple.queue隊列,并且為其指定死信交換機(jī):dl.direct
@Bean
public Queue simpleQueue2(){
return QueueBuilder.durable("simple.queue") // 指定隊列名稱,并持久化
.deadLetterExchange("dl.direct") // 指定死信交換機(jī)
.build();
}
// 聲明死信交換機(jī) dl.direct
@Bean
public DirectExchange dlExchange(){
return new DirectExchange("dl.direct", true, false);
}
// 聲明存儲死信的隊列 dl.queue
@Bean
public Queue dlQueue(){
return new Queue("dl.queue", true);
}
// 將死信隊列 與 死信交換機(jī)綁定
@Bean
public Binding dlBinding(){
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}
使用場景:
- 隊列綁定了死信交換機(jī),死信會投遞到死信交換機(jī)。
- 利用死信交換機(jī)收集所有消費者處理失敗的消息(死信),交由人工處理,進(jìn)一步提高消息隊列的可靠性。
TTL
一個隊列中的消息如果超時未消費,則會變?yōu)樗佬?,超時分為兩種情況:
- 消息所在的隊列設(shè)置了超時時間
- 消息本身設(shè)置了超時時間
聲明隊列時指定TTL
在消費者服務(wù)的SpringRabbitListener中,定義一個新的消費者,并且聲明死信交換機(jī)、死信隊列:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.ttl.queue", durable = "true"),
exchange = @Exchange(name = "dl.ttl.direct"),
key = "ttl"
))
public void listenDlQueue(String msg){
log.info("接收到 dl.ttl.queue的延遲消息:{}", msg);
}
要給隊列設(shè)置超時時間,需要在聲明隊列時配置x-message-ttl屬性:
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue") // 指定隊列名稱,并持久化
.ttl(10000) // 設(shè)置隊列的超時時間,10秒
.deadLetterExchange("dl.ttl.direct") // 指定死信交換機(jī)
.build();
}
注意,這個隊列設(shè)定了死信交換機(jī)為dl.ttl.direct
聲明交換機(jī),將TTL與交換機(jī)綁定:
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
發(fā)送消息,但是不要指定TTL:
@Test
public void testTTLQueue() {
// 創(chuàng)建消息
String message = "hello, ttl queue";
// 消息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 發(fā)送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
// 記錄日志
log.debug("發(fā)送消息成功");
}
發(fā)送消息時指定TTL
在發(fā)送消息時,也可以指定TTL:
@Test
public void testTTLMsg() {
// 創(chuàng)建消息
Message message = MessageBuilder
.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
.setExpiration("10000")
.build();
// 消息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 發(fā)送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
log.debug("發(fā)送消息成功");
}
延遲隊列
利用TTL結(jié)合死信交換機(jī),我們實現(xiàn)了消息發(fā)出后,消費者延遲收到消息的效果。這種消息模式就稱為延遲隊列(Delay Queue)模式。
延遲隊列的使用場景包括:
- 用戶下單,如果用戶在15 分鐘內(nèi)未支付,則自動取消
- 預(yù)約工作會議,10分鐘后自動通知所有參會人員
因為延遲隊列的需求非常多,所以RabbitMQ的官方也推出了一個插件,原生支持延遲隊列效果。
這個插件就是DelayExchange插件。
參考RabbitMQ的插件列表頁面:https://www.rabbitmq.com/community-plugins.html
使用方式可以參考官網(wǎng)地址:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
安裝DelayExchange插件
可以去對應(yīng)的GitHub頁面下載3.8.9版本的插件,地址為https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9
這個對應(yīng)RabbitMQ的3.8.5以上版本。
上傳插件
因為是基于Docker安裝,所以需要先查看RabbitMQ的插件目錄對應(yīng)的數(shù)據(jù)卷。但是前面安裝MQ時并未掛載數(shù)據(jù)卷,所以這里重新創(chuàng)建一個MQ容器。
先移除之前創(chuàng)建的容器
docker rm -f mq
創(chuàng)建MQ容器
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
使用下面命令查看數(shù)據(jù)卷
docker volume inspect mq-plugins
可以得知掛載的位置在/var/lib/docker/volumes/mq-plugins/_data
將下載好的插件上傳至該目錄即可。
安裝插件
安裝需要進(jìn)入MQ容器內(nèi)部執(zhí)行安裝,執(zhí)行以下命令
docker exec -it mq bash
進(jìn)入容器內(nèi)部后,執(zhí)行下面命令開啟插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
安裝成功結(jié)果如下:
DelayExchange原理
DelayExchange需要將一個交換機(jī)聲明為delayed類型。當(dāng)有消息發(fā)送到delayExchange時,流程如下:
-
接收消息
-
判斷消息是否具備x-delay屬性
-
如果有x-delay屬性,說明是延遲消息,持久化到硬盤,讀取x-delay值,作為延遲時間
-
返回routing not found結(jié)果給消息發(fā)送者
-
x-delay時間到期后,重新投遞消息到指定隊列
使用DelayExchange
插件的使用也非常簡單:聲明一個交換機(jī),交換機(jī)的類型可以是任意類型,只需要設(shè)定delayed屬性為true即可,然后聲明隊列與其綁定即可。
聲明DelayExchange交換機(jī)
基于注解方式(推薦)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.queue", durable = "true"),
exchange = @Exchange(name = "dl.direct"),
key = "dl" ))public void listenDlQueue(String msg){
log.info("接收到 dl.queue的延遲消息:{}", msg);
}
基于@Bean方式
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.direct");
}
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue") // 指定隊列名稱,并持久化
.ttl(10000) // 設(shè)置隊列的超時時間,10秒
.deadLetterExchange("dl.direct") // 指定死信交換機(jī)
.deadLetterRoutingKey("dl") // 指定死信RoutingKey
.build();
}
@Bean
public Binding simpleBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
發(fā)送消息時,一定要攜帶x-delay屬性,并指定延遲的時間。
@Test
public void testTTLMsg() {
// 創(chuàng)建消息
Message message = MessageBuilder
.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
.setExpiration("5000")
.build();
// 消息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 發(fā)送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
}
惰性隊列
消息堆積問題
當(dāng)生產(chǎn)者發(fā)送消息的速度超過了消費者處理消息的速度,就會導(dǎo)致隊列中的消息堆積,直到隊列存儲消息達(dá)到上限。之后發(fā)送的消息就會成為死信,可能會被丟棄,這就是消息堆積問題。
解決消息堆積有兩種思路:
- 增加更多消費者,提高消費速度。也就是我們之前說的work queue模式
- 擴(kuò)大隊列容積,提高堆積上限
惰性隊列
從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的概念,也就是惰性隊列。惰性隊列的特征如下:
- 接收到消息后直接存入磁盤而非內(nèi)存
- 消費者要消費消息時才會從磁盤中讀取并加載到內(nèi)存
- 支持?jǐn)?shù)百萬條的消息存儲
基于命令行設(shè)置lazy-queue
要設(shè)置一個隊列為惰性隊列,只需要在聲明隊列時,指定x-queue-mode屬性為lazy即可??梢酝ㄟ^命令行將一個運行中的隊列修改為惰性隊列:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
-
rabbitmqctl
:RabbitMQ的命令行工具 -
set_policy
:添加一個策略 -
Lazy
:策略名稱,可以自定義 -
"^lazy-queue$"
:用正則表達(dá)式匹配隊列的名字 -
'{"queue-mode":"lazy"}'
:設(shè)置隊列模式為lazy模式 -
--apply-to queues
:策略的作用對象,是所有的隊列
基于@Bean聲明lazy-queue
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 開啟x-queue-mode為lazy
.build();
}
基于@RabbitListener聲明LazyQueue
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode",value = "lazy")
))
public void listenLazyQueue(String message){
log.info("接收到 lazy.queue 的消息:{}",message);
}
惰性隊列的優(yōu)點:
- 基于磁盤存儲,消息上限高
- 沒有間歇性的page-out,性能比較穩(wěn)定
惰性隊列的缺點:
- 基于磁盤存儲,消息時效性會降低
- 性能受限于磁盤的IO
MQ集群
集群的分類
RabbitMQ的是基于Erlang語言編寫,而Erlang又是一個面向并發(fā)的語言,天然支持集群模式。RabbitMQ的集群有兩種模式:
-
普通集群:是一種分布式集群,將隊列分散到集群的各個節(jié)點,從而提高整個集群的并發(fā)能力。
-
鏡像集群:是一種主從集群,普通集群的基礎(chǔ)上,添加了主從備份功能,提高集群的數(shù)據(jù)可用性。
鏡像集群雖然支持主從,但主從同步并不是強(qiáng)一致的,某些情況下可能有數(shù)據(jù)丟失的風(fēng)險。因此在RabbitMQ的3.8版本以后,推出了新的功能:
- 仲裁隊列來代替鏡像集群,底層采用Raft協(xié)議確保主從的數(shù)據(jù)一致性。
普通集群
普通集群,或者叫標(biāo)準(zhǔn)集群(classic cluster),具備下列特征:
- 會在集群的各個節(jié)點間共享部分?jǐn)?shù)據(jù),包括:交換機(jī)、隊列元信息。不包含隊列中的消息。
- 當(dāng)訪問集群某節(jié)點時,如果隊列不在該節(jié)點,會從數(shù)據(jù)所在節(jié)點傳遞到當(dāng)前節(jié)點并返回
- 隊列所在節(jié)點宕機(jī),隊列中的消息就會丟失
結(jié)構(gòu)如下
部署
計劃部署三節(jié)點的mq集群:
主機(jī)名 | 控制臺端口 | amqp通信端口 |
---|---|---|
mq1 | 8081 —> 15672 | 8071 —> 5672 |
mq2 | 8082 —> 15672 | 8072 —> 5672 |
mq3 | 8083 —> 15672 | 8073 —> 5672 |
集群中的節(jié)點標(biāo)示默認(rèn)都是:rabbit@[hostname]
,因此以上三個節(jié)點的名稱分別為:
- rabbit@mq1
- rabbit@mq2
- rabbit@mq3
獲取Cookie
RabbitMQ底層依賴于Erlang,而Erlang虛擬機(jī)就是一個面向分布式的語言,默認(rèn)就支持集群模式。集群模式中的每個RabbitMQ 節(jié)點使用 cookie 來確定它們是否被允許相互通信。
要使兩個節(jié)點能夠通信,它們必須具有相同的共享秘密,稱為Erlang cookie。cookie 只是一串最多 255 個字符的字母數(shù)字字符。
每個集群節(jié)點必須具有相同的 cookie。實例之間也需要它來相互通信。
先在之前啟動的mq容器中獲取一個cookie值,作為集群的cookie。執(zhí)行下面的命令
docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie
記錄保存獲取到的Cookie值,以備后用。
接下來,停止并刪除當(dāng)前的mq容器,重新搭建集群。
docker rm -f mq
準(zhǔn)備集群配置
在/tmp目錄新建一個配置文件 rabbitmq.conf:
cd /tmp
# 創(chuàng)建文件
touch rabbitmq.conf
文件內(nèi)容如下:
loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3
再創(chuàng)建一個文件,記錄cookie
cd /tmp
# 創(chuàng)建cookie文件
touch .erlang.cookie
# 寫入cookie
echo "FXZMCVGLBIXZCDEMMVZQ" > .erlang.cookie
# 修改cookie文件的權(quán)限
chmod 600 .erlang.cookie
準(zhǔn)備三個目錄,mq1、mq2、mq3:
cd /tmp
# 創(chuàng)建目錄
mkdir mq1 mq2 mq3
然后拷貝rabbitmq.conf、cookie文件到mq1、mq2、mq3:
# 進(jìn)入/tmp
cd /tmp
# 拷貝
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3
啟動集群
創(chuàng)建網(wǎng)絡(luò)
docker network create mq-net
運行命令
# 運行mq1
docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:3.8-management
# 運行mq2
docker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3.8-management
# 運行mq3
docker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:3.8-management
到此,一個普通的集群就部署完畢。
鏡像集群
在剛剛的案例中,一旦創(chuàng)建隊列的主機(jī)宕機(jī),隊列就會不可用。不具備高可用能力。如果要解決這個問題,必須使用官方提供的鏡像集群方案。
官方文檔地址:https://www.rabbitmq.com/ha.html
鏡像集群:本質(zhì)是主從模式,具備下面的特征。
- 交換機(jī)、隊列、隊列中的消息會在各個mq的鏡像節(jié)點之間同步備份。
- 創(chuàng)建隊列的節(jié)點被稱為該隊列的主節(jié)點,備份到的其它節(jié)點叫做該隊列的鏡像節(jié)點。
- 一個隊列的主節(jié)點可能是另一個隊列的鏡像節(jié)點
- 所有操作都是主節(jié)點完成,然后同步給鏡像節(jié)點
- 主宕機(jī)后,鏡像節(jié)點會替代成新的主(如果在主從同步完成前,主就已經(jīng)宕機(jī),可能出現(xiàn)數(shù)據(jù)丟失)
- 不具備負(fù)載均衡功能,因為所有操作都會有主節(jié)點完成(但是不同隊列,其主節(jié)點可以不同,可以利用這個提高吞吐量)
結(jié)構(gòu)如下
默認(rèn)情況下,隊列只保存在創(chuàng)建該隊列的節(jié)點上。而鏡像模式下,創(chuàng)建隊列的節(jié)點被稱為該隊列的主節(jié)點,隊列還會拷貝到集群中的其它節(jié)點,也叫做該隊列的鏡像節(jié)點。
但是,不同隊列可以在集群中的任意節(jié)點上創(chuàng)建,因此不同隊列的主節(jié)點可以不同。甚至,一個隊列的主節(jié)點可能是另一個隊列的鏡像節(jié)點。
用戶發(fā)送給隊列的一切請求,例如發(fā)送消息、消息回執(zhí)默認(rèn)都會在主節(jié)點完成,如果是從節(jié)點接收到請求,也會路由到主節(jié)點去完成。鏡像節(jié)點僅僅起到備份數(shù)據(jù)作用。
當(dāng)主節(jié)點接收到消費者的ACK時,所有鏡像都會刪除節(jié)點中的數(shù)據(jù)。
部署
鏡像模式的配置有3種模式:
hamode | haparams | 效果 |
---|---|---|
準(zhǔn)確模式exactly | 隊列的副本量count | 集群中隊列副本(主服務(wù)器和鏡像服務(wù)器之和)的數(shù)量。count如果為1意味著單個副本:即隊列主節(jié)點。count值為2表示2個副本:1個隊列主和1個隊列鏡像。換句話說:count = 鏡像數(shù)量 + 1。如果群集中的節(jié)點數(shù)少于count,則該隊列將鏡像到所有節(jié)點。如果有集群總數(shù)大于count+1,并且包含鏡像的節(jié)點出現(xiàn)故障,則將在另一個節(jié)點上創(chuàng)建一個新的鏡像。 |
all | (none) | 隊列在群集中的所有節(jié)點之間進(jìn)行鏡像。隊列將鏡像到任何新加入的節(jié)點。鏡像到所有節(jié)點將對所有群集節(jié)點施加額外的壓力,包括網(wǎng)絡(luò)I / O,磁盤I / O和磁盤空間使用情況。推薦使用exactly,設(shè)置副本數(shù)為(N / 2 +1)。 |
nodes | node names | 指定隊列創(chuàng)建到哪些節(jié)點,如果指定的節(jié)點全部不存在,則會出現(xiàn)異常。如果指定的節(jié)點在集群中存在,但是暫時不可用,會創(chuàng)建節(jié)點到當(dāng)前客戶端連接到的節(jié)點。 |
exactly模式
進(jìn)入任意容器
docker exec -it mq1 bash
執(zhí)行命令
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
-
rabbitmqctl set_policy
:固定寫法 -
ha-two
:策略名稱,自定義 -
"^two\."
:匹配隊列的正則表達(dá)式,符合命名規(guī)則的隊列才生效,這里是任何以two.
開頭的隊列名稱 -
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
: 策略內(nèi)容-
"ha-mode":"exactly"
:策略模式,此處是exactly模式,指定副本數(shù)量 -
"ha-params":2
:策略參數(shù),這里是2,就是副本數(shù)量為2,1主1鏡像。 -
"ha-sync-mode":"automatic"
:同步策略,默認(rèn)是manual,即新加入的鏡像節(jié)點不會同步舊的消息。如果設(shè)置為automatic,則新加入的鏡像節(jié)點會把主節(jié)點中所有消息都同步,會帶來額外的網(wǎng)絡(luò)開銷。
-
all模式
進(jìn)入任意容器
docker exec -it mq2 bash
執(zhí)行命令
rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
-
ha-all
:策略名稱,自定義 -
"^all\."
:匹配所有以all.
開頭的隊列名 -
'{"ha-mode":"all"}'
:策略內(nèi)容-
"ha-mode":"all"
:策略模式,此處是all模式,即所有節(jié)點都會稱為鏡像節(jié)點
-
nodes模式
進(jìn)入容器
docker exec -it mq3 bash
執(zhí)行命令
rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
- rabbitmqctl set_policy`:固定寫法
-
ha-nodes
:策略名稱,自定義 -
"^nodes\."
:匹配隊列的正則表達(dá)式,符合命名規(guī)則的隊列才生效,這里是任何以nodes.
開頭的隊列名稱 -
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
: 策略內(nèi)容-
"ha-mode":"nodes"
:策略模式,此處是nodes模式 -
"ha-params":["rabbit@mq1", "rabbit@mq2"]
:策略參數(shù),這里指定副本所在節(jié)點名稱
-
仲裁隊列
從RabbitMQ 3.8版本開始,引入了新的仲裁隊列,他具備與鏡像隊里類似的功能,但使用更加方便。
添加仲裁隊列
控制臺添加
在任意控制臺添加一個隊列,一定要選擇隊列類型為Quorum類型。
在任意控制臺查看隊列:
可以看到,仲裁隊列的 + 2 字樣。代表這個隊列有2個鏡像節(jié)點。
因為仲裁隊列默認(rèn)的鏡像數(shù)為5。如果你的集群有7個節(jié)點,那么鏡像數(shù)肯定是5;而我們集群只有3個節(jié)點,因此鏡像數(shù)量就是3。
SpringAMQP創(chuàng)建
@Configuration
public class QuorumConfig{
@Bean
public Queue quorumQueue() {
return QueueBuilder
.durable("quorum.queue") // 持久化
.quorum() // 仲裁隊列
.build();
}
}
SpringAMQP連接集群,只需要在yaml中配置即可
spring:
rabbitmq:
addresses: 192.168.150.105:8071, 192.168.150.105:8072, 192.168.150.105:8073
username: admin
password: 123456
virtual-host: /
集群擴(kuò)容
啟動一個新的MQ容器
docker run -d --net mq-net \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq4 \
--hostname mq5 \
-p 8074:15672 \
-p 8084:15672 \
rabbitmq:3.8-management
進(jìn)入容器控制臺
docker exec -it mq4 bash
停止mq進(jìn)程
rabbitmqctl stop_app
重置RabbitMQ中的數(shù)據(jù):
rabbitmqctl reset
加入mq1
rabbitmqctl join_cluster rabbit@mq1
再次啟動mq進(jìn)程
rabbitmqctl start_app
增加仲裁隊列副本
先查看下quorum.queue這個隊列目前的副本情況,進(jìn)入mq1容器:
docker exec -it mq1 bash
執(zhí)行命令:
rabbitmq-queues quorum_status "quorum.queue"
現(xiàn)在讓mq4也加入進(jìn)來
rabbitmq-queues add_member "quorum.queue" "rabbit@mq4"
再次查看副本情況
rabbitmq-queues quorum_status "quorum.queue"
查看控制臺,發(fā)現(xiàn)quorum.queue的鏡像數(shù)量也從原來的 +2 變成了 +3文章來源:http://www.zghlxwxcb.cn/news/detail-799812.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-799812.html
到了這里,關(guān)于服務(wù)異步通訊——RabbitMQ的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!