RabbitMQ
課程內(nèi)容
- 認(rèn)識(shí)RabbitMQ
- 安裝RabbitMQ
- SpringBoot使用RabbitMQ
- 其他特性
一.RabbitMQ入門
1.RabbitMQ認(rèn)識(shí)
1.1.RabbitMQ是什么
MQ全稱為Message Queue,即消息隊(duì)列. 它也是一個(gè)隊(duì)列,遵循FIFO原則 。RabbitMQ是由erlang語(yǔ)言開發(fā),基于AMQP(Advanced Message Queue Protocol高級(jí)消息隊(duì)列協(xié)議)協(xié)議實(shí)現(xiàn)的消息隊(duì)列,它是一種應(yīng)用程序之間的通信方法,消息隊(duì)列在分布式系統(tǒng)開 發(fā)中應(yīng)用非常廣泛。官方地址:http://www.rabbitmq.com/
1.2.RabbitMQ的使用場(chǎng)景
開發(fā)中消息隊(duì)列通常有如下應(yīng)用場(chǎng)景: 消峰,解耦,提速,大數(shù)據(jù)處理
-
消除峰值:用于高并發(fā)場(chǎng)景消除峰值,讓并發(fā)請(qǐng)求在mq中進(jìn)行排隊(duì)
-
大數(shù)據(jù)處理:由于數(shù)據(jù)量太大,程序一時(shí)處理不過(guò)來(lái),可以通過(guò)把數(shù)據(jù)放入MQ,多開幾個(gè)消費(fèi)者去處理消息,比如:日志收集等
-
服務(wù)異步/解耦 :服務(wù)之間通過(guò)RPC進(jìn)行通信的方式是同步方式,服務(wù)消費(fèi)方需要等到服務(wù)提供方相應(yīng)結(jié)果后才可以繼續(xù)執(zhí)行,使用MQ之后的服務(wù)通信是異步的,服務(wù)之間沒(méi)有直接的調(diào)用關(guān)系,而是通過(guò)隊(duì)列進(jìn)行服務(wù)通信, 應(yīng)用程序解耦合 MQ相當(dāng)于一個(gè)中介,生產(chǎn)方通過(guò)MQ與消費(fèi)方交互,它將應(yīng)用程序進(jìn)行解耦合。
-
排序保證 FIFO :遵循隊(duì)列先進(jìn)先出的特點(diǎn),可以保證數(shù)據(jù)按順序消費(fèi)
除此之外使用MQ還可以達(dá)到:提高系統(tǒng)響應(yīng)速度,提高系統(tǒng)穩(wěn)定性的目的。 將不需要同步處理的并且耗時(shí)長(zhǎng)的操作由消息隊(duì)列通知消息接收方進(jìn)行異步處理。 提高了應(yīng)用程序的響應(yīng)時(shí)間。另外如果系統(tǒng)掛了也沒(méi)關(guān)系,數(shù)據(jù)放到消息隊(duì)列.后續(xù)可以繼續(xù)消費(fèi)
但是需要注意的是:對(duì)數(shù)據(jù)的一致性要求較高的業(yè)務(wù)場(chǎng)景不適合使用MQ,因?yàn)镸Q具有一定的數(shù)據(jù)延遲
1.3.AMQP協(xié)議
AMQP是一套公開的消息隊(duì)列協(xié)議,最早在2003年被提出,它旨在從協(xié)議層定義消息通信數(shù)據(jù)的標(biāo)準(zhǔn)格式, 為的就是解決MQ市場(chǎng)上協(xié)議不統(tǒng)一的問(wèn)題,RabbitMQ就是遵循AMQP標(biāo)準(zhǔn)協(xié)議開發(fā)的MQ服務(wù)。 官方:http://www.amqp.org
1.4.JMS是什么 ?
JMS是Java消息服務(wù),是java提供的一套消息服務(wù)API標(biāo)準(zhǔn),其目的是為所有的java應(yīng)用程序提供統(tǒng)一的消息通信的標(biāo)準(zhǔn),類似java的 jdbc,只要遵循jms標(biāo)準(zhǔn)的應(yīng)用程序之間都可以進(jìn)行消息通信。它和AMQP有什么 不同,jms是java語(yǔ)言專屬的消 息服務(wù)標(biāo)準(zhǔn),它是在api層定義標(biāo)準(zhǔn),并且只能用于java應(yīng)用;而AMQP是在協(xié)議層定義的標(biāo)準(zhǔn),是跨語(yǔ)言的 。
2.RabbitMQ的工作流程
2.1.RabbitMQ中核心概念
-
Broker:消息隊(duì)列服務(wù)進(jìn)程,此進(jìn)程包括兩個(gè)部分:Exchange和Queue。
-
Exchange:消息隊(duì)列交換機(jī),按一定的規(guī)則將消息路由轉(zhuǎn)發(fā)到某個(gè)隊(duì)列,對(duì)消息進(jìn)行過(guò)慮。exchange有下面四種(先了解:fanout,direct,topics,header)
-
Queue:消息隊(duì)列,存儲(chǔ)消息的隊(duì)列,消息到達(dá)隊(duì)列并轉(zhuǎn)發(fā)給指定的消費(fèi)方。
-
Producer:消息生產(chǎn)者,即生產(chǎn)方客戶端,生產(chǎn)方客戶端將消息發(fā)送到MQ。
-
Consumer:消息消費(fèi)者,即消費(fèi)方客戶端,接收MQ轉(zhuǎn)發(fā)的消息。
2.2.RabbitMQ的工作流程
消息發(fā)布接收流程:
1.發(fā)送消息
1、生產(chǎn)者和Broker建立TCP連接。
2、生產(chǎn)者和Broker建立通道。
3、生產(chǎn)者通過(guò)通道消息發(fā)送給Broker,由Exchange將消息進(jìn)行轉(zhuǎn)發(fā)。
4、Exchange將消息轉(zhuǎn)發(fā)到指定的Queue(隊(duì)列)
消息接收消息
1、消費(fèi)者和Broker建立TCP連接
2、消費(fèi)者和Broker建立通道
3、消費(fèi)者監(jiān)聽指定的Queue(隊(duì)列)
4、當(dāng)有消息到達(dá)Queue時(shí)Broker默認(rèn)將消息推送給消費(fèi)者。
5、消費(fèi)者接收到消息。
3.RabbitMQ安裝
3.1.Docker安裝RabbitMQ
1.下載docker鏡像
docker pull rabbitmq:3-management
2.啟動(dòng)容器
需要開放端口:5672是程序連接的端口,15672是可視化界面接口
docker run -id --name=rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
3.訪問(wèn)管理界面
安裝好之后,訪問(wèn):15672界面如下,賬號(hào)和密碼都是 guest
二.SpringBoot中使用RabbitMQ
1.SpringBoot整合RabbitMQ
1.1.導(dǎo)入依賴
第一步需要導(dǎo)入mq的基礎(chǔ)依賴,SpringBoot使用的是2.6.13
<!--SpringBoot依賴-->
<parent>
<groupId> org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.13</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
2.2.配置RabbitMQ
第二步:配置mq,主要是配置連接信息
server:
port: 10200
spring:
application:
name: rabbitmq‐application
rabbitmq:
host: 60.204.187.34
port: 5672
username: guest
password: guest
virtualHost: /
編寫啟動(dòng)類,省略…
2.HelloWorld(直連模型)
2.1.模型認(rèn)識(shí)
rabbitMQ提供了7種消息模型。https://www.rabbitmq.com/tutorials
我們使用Hello World 案例來(lái)入門,這種模式比較簡(jiǎn)單,只需要一個(gè)生產(chǎn)者,一個(gè)隊(duì)列,一個(gè)消費(fèi)者即可
- P:生產(chǎn)者,也就是要發(fā)送消息的程序
- C:消費(fèi)者:消息的接受者,會(huì)一直等待消息到來(lái)。
- queue:消息隊(duì)列,用來(lái)存儲(chǔ)消息的,生產(chǎn)者把消息發(fā)送到隊(duì)列中,消費(fèi)者從隊(duì)列中消費(fèi)消息。類似一個(gè)郵箱,可以緩存消息;
我們需要做如下事情
- 創(chuàng)建一個(gè)隊(duì)列
- 編寫消費(fèi)者,監(jiān)聽該隊(duì)列
- 編寫生產(chǎn)者發(fā)送消息,指定該隊(duì)列名
2.2.配置隊(duì)列
在SpringBoot中交換機(jī)和隊(duì)列的創(chuàng)建都通過(guò)Bean的方式來(lái)進(jìn)行,下面定義了一個(gè)隊(duì)列,名字為hello:
@Configuration
public class RabbitmqConfig {
//定義消息隊(duì)列的名字
public static final String NAME_HELLO = "queue_hello";
@Bean
public Queue queue() {
//創(chuàng)建一個(gè)隊(duì)列隊(duì)列,并指定隊(duì)列的名字
return new Queue(NAME_HELLO,true);
}
}
2.3.編寫消費(fèi)者
rabbitmq通過(guò)@RabbitListener(queues = {隊(duì)列名}) 來(lái)監(jiān)聽隊(duì)列,從而消費(fèi)消息
@Component
public class ReceiveHandler {
//監(jiān)聽NAME_HELLO隊(duì)列
@RabbitListener(queues = {RabbitmqConfig.NAME_HELLO})
public void receiveHelloQueueMessage(String msg, Message message, Channel channel) {
System.out.println("消費(fèi)者收到消息:"+msg);
}
}
2.4.編寫MQ生產(chǎn)者
通過(guò)注入:RabbitTemplate發(fā)送消息,以及消息內(nèi)容。
@RestController
public class SenderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("/sender/hello/{message}")
public String senderHello(@PathVariable String message) {
/**
* 參數(shù)說(shuō)明
* exchnage: 交換機(jī),默認(rèn)交換機(jī)指定為“”即可
* routingKey :發(fā)送消息的路由鍵,該模式下使用隊(duì)列名即可
* message:消息的內(nèi)容
*/
rabbitTemplate.convertAndSend("", RabbitmqConfig.NAME_HELLO,message);
return "success";
}
}
注意:這個(gè)的交換機(jī)使用的是默認(rèn)的交換機(jī)"" ,路由鍵直接指定為隊(duì)列的名字。其實(shí)在MQ中是提供了幾個(gè)默認(rèn)的交換機(jī),當(dāng)我們把交換機(jī)指定為 “” , 就會(huì)使用默認(rèn)的交換機(jī)來(lái)轉(zhuǎn)發(fā)消息,而我們創(chuàng)建的隊(duì)列會(huì)和默認(rèn)的交換機(jī)進(jìn)行綁定,如下:
下面是綁定關(guān)系圖
2.5.測(cè)試
啟動(dòng)程序訪問(wèn)controller進(jìn)行測(cè)試,控制臺(tái)可以看到消費(fèi)者打印的日志,打開MQ的可視化界面可以看到創(chuàng)建的隊(duì)列,之所以里面沒(méi)有消息是因?yàn)橄⒈幌M(fèi)了。
3.WorkQueue(工作隊(duì)列)
3.1.WorkQueue模型認(rèn)識(shí)
Work queues,也被稱為(Task queues),任務(wù)模型。當(dāng)消息處理比較耗時(shí)的時(shí)候,可能生產(chǎn)消息的速度會(huì)遠(yuǎn)遠(yuǎn)大于消息的消費(fèi)速度。長(zhǎng)此以往,消息就會(huì)堆積越來(lái)越多,無(wú)法及時(shí)處理。此時(shí)就可以使用work 模型:讓多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,共同消費(fèi)隊(duì)列中的消息
。隊(duì)列中的消息一旦消費(fèi),就會(huì)消失,因此任務(wù)是不會(huì)被重復(fù)執(zhí)行的
,即:同一個(gè)消息只會(huì)被一個(gè)消費(fèi)者消費(fèi)
3.2.代碼演示
WorkQueue和HelloWorld本身無(wú)區(qū)別,只是在HelloWorld的基礎(chǔ)上多增加消費(fèi)者而已,如下:
@Component
public class ReceiveHandler {
//監(jiān)聽NAME_HELLO隊(duì)列
@RabbitListener(queues = {RabbitmqConfig.NAME_HELLO})
public void receive1(String msg, Message message, Channel channel) {
System.out.println("消費(fèi)者1收到消息:"+msg);
}
//監(jiān)聽NAME_HELLO隊(duì)列
@RabbitListener(queues = {RabbitmqConfig.NAME_HELLO})
public void receive2(String msg, Message message, Channel channel) {
System.out.println("消費(fèi)者2收到消息:"+msg);
}
}
連續(xù)多次發(fā)送消息MQ會(huì)使用輪詢方式把消息評(píng)價(jià)分配給多個(gè)消費(fèi)者
3.3.指定拉取數(shù)量
這種消費(fèi)模式有一個(gè)問(wèn)題,當(dāng)某個(gè)消費(fèi)者消費(fèi)能力偏弱會(huì)導(dǎo)致后續(xù)的消息阻塞,我們可以通過(guò) prefetch
來(lái)指定消費(fèi)者每次只能拉取一個(gè)消息,這樣的話當(dāng)某個(gè)消費(fèi)者正在忙碌,那么MQ會(huì)把消息推送給別的消費(fèi)者,防止消息在某個(gè)消費(fèi)者身上發(fā)生阻塞。
spring:
rabbitmq:
listener:
simple:
prefetch: 1
4.fanout-廣播模型
4.1.模型認(rèn)識(shí)
在上面的案例中,我們采用一個(gè)隊(duì)列來(lái)發(fā)送消息,及時(shí)同一個(gè)隊(duì)列監(jiān)聽了多個(gè)消費(fèi)者,同一個(gè)消息也只會(huì)給到其中一個(gè)消費(fèi)者,而發(fā)布訂閱模型允許一個(gè)消息向多個(gè)消費(fèi)者投遞
。而對(duì)于:fanout , direct , topics都屬于發(fā)布訂閱模型。
RabbitMQ的exchnage正好有4中類型,就對(duì)應(yīng)了上述的幾種訂閱模型,源碼如下:
public abstract class ExchangeTypes {
public static final String DIRECT = "direct";
public static final String TOPIC = "topic";
public static final String FANOUT = "fanout";
public static final String HEADERS = "headers";
public static final String SYSTEM = "system";
}
Fanout被叫做廣播模型,它的特點(diǎn)是當(dāng)生產(chǎn)者把消息投遞給交換機(jī),交換機(jī)會(huì)把消息投遞給和它綁定的所有隊(duì)列,而相應(yīng)的所有的消費(fèi)者都能收到消息,如上圖。要實(shí)現(xiàn)Fanout模型我們要做如下幾個(gè)事情
- 定義自己的fanout類型的交換機(jī)
- 定義多個(gè)隊(duì)列
- 把隊(duì)列和交換機(jī)進(jìn)行綁定
- 消費(fèi)者監(jiān)聽不同隊(duì)列
4.2.配置交換機(jī)和隊(duì)列
下面配置了一個(gè)fanout類型的交換機(jī)和2個(gè)隊(duì)列,并把隊(duì)列綁定到了交換機(jī)
@Configuration
public class RabbitmqConfigFanout {
//定義消息隊(duì)列的名字
public static final String QUEUE_1 = "queue1";
public static final String QUEUE_2 = "queue2";
public static final String EXCHANGE_FANOUT = "exchnage-fanout";
@Bean
public Exchange exchange(){
//定義一個(gè)fanout類型的交換機(jī),并指定持久化
return ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT).durable(true).build();
}
@Bean
public Queue queue1() {
//創(chuàng)建一個(gè)隊(duì)列隊(duì)列,并指定隊(duì)列的名字和持久化
return new Queue(QUEUE_1,true);
}
@Bean
public Queue queue2() {
//創(chuàng)建一個(gè)隊(duì)列隊(duì)列,并指定隊(duì)列的名字
return new Queue(QUEUE_2,true);
}
@Bean
public Binding bindingQueue1() {
//fanout模式不指定routingkey
return BindingBuilder
.bind(queue1()).to(exchange()).with("").noargs();
}
@Bean
public Binding bindingQueue2() {
return BindingBuilder
.bind(queue2()).to(exchange()).with("").noargs();
}
}
4.3.編寫消費(fèi)者
消費(fèi)者只需要監(jiān)聽不同的隊(duì)列即可
@RabbitListener(queues = {RabbitmqConfigFanout.QUEUE_1})
public void receiveFanout1(String msg, Message message, Channel channel) {
System.out.println("fanout消費(fèi)者1收到消息:"+msg);
}
@RabbitListener(queues = {RabbitmqConfigFanout.QUEUE_2})
public void receiveFanout2(String msg, Message message, Channel channel) {
System.out.println("fanout消費(fèi)者2收到消息:"+msg);
}
4.4.編寫生產(chǎn)者
生產(chǎn)者發(fā)送消息的時(shí)候需要指定exchange的名字,注意:routingkey不需要指定
@PostMapping("/sender/fanout/{message}")
public String senderFanout(@PathVariable String message) {
/**
* 參數(shù)說(shuō)明
* exchnage: 交換機(jī),使用自定義的交換機(jī)
* routingKey :發(fā)送消息的路由鍵,fanout模式指定為“”
* message:消息的內(nèi)容
*/
rabbitTemplate.convertAndSend(RabbitmqConfigFanout.EXCHANGE_FANOUT, "",message);
return "success";
}
4.5.測(cè)試
啟動(dòng)測(cè)試,發(fā)送一個(gè)消息2個(gè)消費(fèi)者都能收到
5.Routing(路由模型)
5.1.模型認(rèn)識(shí)
在Fanout模式中,一條消息,會(huì)被所有訂閱的隊(duì)列都消費(fèi)。但是,在某些場(chǎng)景下,我們希望不同的消息被不同的隊(duì)列消費(fèi),我們就要用的routing路由模式,這種模式是通過(guò)一個(gè)routingkey來(lái)收發(fā)消息。交換機(jī)的類型使用direct
如上圖:不同的隊(duì)列在綁定到交換機(jī)時(shí)指定的routingkey是不一樣的,這樣一來(lái)我們發(fā)送消息的時(shí)候,就可以通過(guò)不同的routingkey來(lái)把消息發(fā)送到不同的隊(duì)列中,從而使不同的消費(fèi)者去消費(fèi),該模型我們需要做如下幾個(gè)步驟
- 創(chuàng)建direct類型的交換機(jī)
- 創(chuàng)建多個(gè)隊(duì)列
- 把隊(duì)列綁定到交換機(jī),但是綁定時(shí)需要指定不同的routingkey
- 消費(fèi)者消費(fèi)不同隊(duì)列的消息
5.2.配置交換機(jī)和隊(duì)列
這里定義了一個(gè)direct類型的交換機(jī),以及2個(gè)隊(duì)列,隊(duì)列在綁定到交換機(jī)時(shí)采用了不同的routingkey.
@Configuration
public class RabbitmqConfigDirect {
//定義消息隊(duì)列的名字
public static final String QUEUE_DIRECT_1 = "direct_queue1";
public static final String QUEUE_DIRECT_2 = "direct_queue2";
public static final String EXCHANGE_DIRECT = "exchnage-direct";
@Bean
public Exchange exchange(){
//定義一個(gè)direct類型的交換機(jī),并指定持久化
return ExchangeBuilder.directExchange(EXCHANGE_DIRECT).durable(true).build();
}
@Bean
public Queue queue1() {
//創(chuàng)建一個(gè)隊(duì)列隊(duì)列,并指定隊(duì)列的名字
return new Queue(QUEUE_DIRECT_1,true);
}
@Bean
public Queue queue2() {
//創(chuàng)建一個(gè)隊(duì)列隊(duì)列,并指定隊(duì)列的名字
return new Queue(QUEUE_DIRECT_2,true);
}
@Bean
public Binding bindingQueue1() {
return BindingBuilder
.bind(queue1()).to(exchange()).with("pay").noargs();
}
@Bean
public Binding bindingQueue2() {
return BindingBuilder
.bind(queue2()).to(exchange()).with("order").noargs();
}
}
5.3.配置消費(fèi)者
消費(fèi)者消費(fèi)不同隊(duì)列中的消息即可
@RabbitListener(queues = {RabbitmqConfigDirect.QUEUE_DIRECT_1})
public void receiveDirect1(String msg, Message message, Channel channel) {
System.out.println("receiveDirect1消費(fèi)者1收到消息:"+msg);
}
@RabbitListener(queues = {RabbitmqConfigDirect.QUEUE_DIRECT_2})
public void receiveDirect2(String msg, Message message, Channel channel) {
System.out.println("receiveDirect2消費(fèi)者2收到消息:"+msg);
}
5.4.定義生產(chǎn)者
生產(chǎn)者需要指定自己的交換機(jī),以及routingkey,指定不同的routingkey決定了消息會(huì)發(fā)送到不同的隊(duì)列中
@PostMapping("/sender/direct/{message}")
public String senderDirect(@PathVariable String message) {
/**
* 參數(shù)說(shuō)明
* exchnage: 交換機(jī),使用自定義的交換機(jī)
* routingKey :發(fā)送消息的路由鍵,fanout模式指定為“”
* message:消息的內(nèi)容
*/
rabbitTemplate.convertAndSend(RabbitmqConfigDirect.EXCHANGE_DIRECT, "pay",message);
rabbitTemplate.convertAndSend(RabbitmqConfigDirect.EXCHANGE_DIRECT, "order",message);
return "success";
}
6.Topics(通配符)
6.1.模型認(rèn)識(shí)
Topic類型的Exchange與Direct相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列。只不過(guò)Topic類型Exchange可以讓隊(duì)列在綁定Routing key 的時(shí)候使用通配符!這種模型Routingkey 一般都是由一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如: item.insert
Routingkey 一般都是有一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如: goods.insert
通配符規(guī)則:
-
#:匹配一個(gè)或多個(gè)詞
-
*:匹配不多不少恰好1個(gè)詞
舉例:
audit.#:能夠匹配audit.irs.corporate 或者 audit.irs
audit.*:只能匹配audit.irs
topic 和 direct 沒(méi)有本質(zhì)的區(qū)別,只是在綁定隊(duì)列時(shí)可以使用通配符
6.2.定義交換機(jī)和隊(duì)列
需要定義topics類型的交換機(jī)和2個(gè)隊(duì)列,綁定隊(duì)列的時(shí)候指定routingkey,可以使用通配符來(lái)指定
@Bean
public Binding bindingQueue1() {
return BindingBuilder
.bind(queue1()).to(exchange()).with("#.pay").noargs();
}
@Bean
public Binding bindingQueue2() {
return BindingBuilder
.bind(queue2()).to(exchange()).with("#.order").noargs();
}
6.3.編寫生產(chǎn)者
發(fā)送消息的時(shí)候指定的routingkey如果能命中綁定時(shí)的routingkey消息就可以發(fā)送到相應(yīng)的隊(duì)列中,比如:
rabbitTemplate.convertAndSend(RabbitmqConfigTopics.EXCHANGE_TOPIC, "good.pay",message);
rabbitTemplate.convertAndSend(RabbitmqConfigTopics.EXCHANGE_TOPIC, "account.pay",message);
rabbitTemplate.convertAndSend(RabbitmqConfigTopics.EXCHANGE_TOPIC, "good.order",message);
前2個(gè)消息的routingkey可以命中 #.pay, 第3條消息可以命中 #.order.
三.RabbitMQ其他特性
1.簽收機(jī)制
1.1.自動(dòng)簽收的問(wèn)題
在RabbitMQ中包含手動(dòng)簽收和自動(dòng)簽收2鐘模式,上述案例都采用的是自動(dòng)簽收,也就是當(dāng)MQ吧消息投遞給消費(fèi)者后,消息默認(rèn)被簽收,MQ就會(huì)直接把消息刪除掉。這種模式可能會(huì)導(dǎo)致消息丟失分享:比如消費(fèi)者拿到消息并未成功消費(fèi),但是MQ已經(jīng)把消息刪除,從而造成了消息的丟失,所以在司機(jī)開發(fā)中盡量使用手動(dòng)簽收
1.2.手動(dòng)牽手
手動(dòng)簽收模式意味著MQ不會(huì)自動(dòng)簽收消息,而是把消息推送給消費(fèi)者后,等到消費(fèi)者自己去簽收消息后,再刪除隊(duì)列中的消息,這種模式可以防止消息丟失。我們可以通過(guò)下面配置來(lái)指定簽收模式
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual #默認(rèn)是 auto 自動(dòng)簽署
然后在消費(fèi)者成功消費(fèi)完消息后,觸發(fā)手動(dòng)簽收,代碼如下
@RabbitListener(queues = {RabbitmqConfigDirect.QUEUE_DIRECT_2})
public void receiveDirect2(String msg, Message message, Channel channel) throws IOException {
System.out.println("receiveDirect2消費(fèi)者2收到消息:"+msg);
//拿到消息的tag
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//簽收消息:指定消息的tags ,以及不做批量簽收
channel.basicAck(deliveryTag,false);
}
channel.basicAck : 簽收消息
- deliveryTag :消息的標(biāo)簽,代表了一條消息
除此之外我們還可以不簽收消息,或者拒絕消息.不簽收的消息會(huì)一直重復(fù)消費(fèi),而被拒絕的消息會(huì)丟棄掉
//不簽收
channel.basicNack(deliveryTag,false,false);
//拒絕消息
channel.basicReject(deliveryTag,false);
2.持久化
mq消息在內(nèi)存中進(jìn)行讀寫,如果MQ宕機(jī)那么消息有丟失的風(fēng)險(xiǎn),我們需要通過(guò)持久化來(lái)防止消息丟失
2.1.交換機(jī)持久化
創(chuàng)建交換機(jī)的時(shí)候,指定durable屬性為true
@Bean
public Exchange exchange(){
//定義一個(gè)direct類型的交換機(jī),并指定持久化
return ExchangeBuilder.directExchange(EXCHANGE_DIRECT).durable(true).build();
}
2.2.隊(duì)列持久化
創(chuàng)建隊(duì)列時(shí)指定durable屬性為true
@Bean
public Queue queue1() {
//創(chuàng)建一個(gè)隊(duì)列隊(duì)列,并指定隊(duì)列的名字
return new Queue(QUEUE_DIRECT_1,true);
}
2.3.消息持久化
當(dāng)我們發(fā)送一個(gè)消息內(nèi)容的時(shí)候,SpringBoot會(huì)自動(dòng)幫我們持久化
rabbitTemplate.convertAndSend(RabbitmqConfigDirect.EXCHANGE_DIRECT, "order",message);
底層會(huì)自動(dòng)構(gòu)建Message對(duì)象,Messge對(duì)象中有一個(gè)MessageProperties屬性,它包含了MessageDeliveryMode.PERSISTENT持久化和NON_PERSISTENT不持久化2中方式。
3.可靠消息投遞
3.1.回調(diào)接口介紹
在RabbitTemplate中提供了2個(gè)接口
- ConfirmCallback : 消息投遞到Brocker后觸發(fā)回調(diào),可以用來(lái)檢測(cè)消息是否到達(dá)RabbitMQ
- ReturnsCallback : 消息發(fā)送失敗回調(diào),比如隊(duì)列路由失敗
要開啟上面2中回調(diào)需要在yaml中做如下配置
spring:
rabbitmq:
publisher-returns: true #開啟returnCallback回調(diào)
template:
mandatory: true #消息會(huì)返回給發(fā)送者的回調(diào),而不是丟棄
publisher-confirm-type: correlated #開啟ConfirmCallback 回調(diào)
然后需要編寫回調(diào)接口,通過(guò)實(shí)現(xiàn) RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback
@Component
@Slf4j
public class RabbitMQCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
//ReturnedMessage 消息對(duì)象中包括:交換機(jī),路由key,消息內(nèi)容等
log.info(returnedMessage.getExchange()
+","+returnedMessage.getRoutingKey()
+","+new String(returnedMessage.getMessage().getBody()));
//把失敗的消息再次發(fā)送
rabbitTemplate.convertAndSend(returnedMessage.getExchange(),
returnedMessage.getRoutingKey(),returnedMessage.getMessage());
}
/**
* @param correlationData :消息的唯一標(biāo)識(shí)
* @param ack :消息確認(rèn)結(jié)果
* @param cause :錯(cuò)誤原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info(correlationData.getId() +","+ack +","+cause);
}
}
- ConfirmCallback : 消息投遞到Brocker的exchange后觸發(fā)回調(diào),可以用來(lái)檢測(cè)消息是否到達(dá)RabbitMQ,通過(guò)confirm來(lái)把投遞結(jié)果返回,通過(guò)ack我們可以判斷消息是否投遞到MQ中。
- ReturnsCallback : 消息發(fā)送失敗回調(diào),比如隊(duì)列路由失敗。通過(guò) returnedMessage 來(lái)把失敗的消息返回,我們可以通過(guò)該方法對(duì)失敗的消息進(jìn)行重試發(fā)送
然后自定義template,把2個(gè)回調(diào)設(shè)置給template
//以下配置RabbitMQ消息服務(wù)
@Autowired
public ConnectionFactory connectionFactory;
@Autowired
private RabbitMQCallback rabbitMQCallback;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 這里的轉(zhuǎn)換器設(shè)置實(shí)現(xiàn)了發(fā)送消息時(shí)自動(dòng)序列化消息對(duì)象為message body
template.setMandatory(true);
template.setReturnsCallback(rabbitMQCallback);
template.setConfirmCallback(rabbitMQCallback);
return template;
}
3.2.可靠消息投遞方案
根據(jù)上面2個(gè)回調(diào)接口的特性,我們可以做一個(gè)可靠消息投遞方案,方案如下:
- 設(shè)計(jì)一個(gè)消息日志表,可以基于數(shù)據(jù)庫(kù),也可以基于Redis,字段有:交換機(jī),路由key, 消息內(nèi)容,發(fā)送狀態(tài)(發(fā)送中,發(fā)送成功,發(fā)送失敗),重試次數(shù)等。
- 在使用rabbitTemplate發(fā)布消息之前,把消息的內(nèi)容持久化到 :消息日志表中,狀態(tài)為:發(fā)送中
- 通過(guò)回調(diào)來(lái)監(jiān)聽消息發(fā)送結(jié)果,如果成功,把消息日志狀態(tài)修改為:成功,如果發(fā)送失敗,把消息日志修改為:失敗
- 額外創(chuàng)建定時(shí)任務(wù),定時(shí)讀取失敗的日志進(jìn)行重試發(fā)送,并增加重試次數(shù),直到發(fā)送成功,如果發(fā)送到一定次數(shù)依然不成功就不再重試,而是通知管理員搶修。
4.延遲隊(duì)列
4.1.為什么要用
在開發(fā)項(xiàng)目的時(shí)候我們通常會(huì)遇到這么一個(gè)問(wèn)題,比如商城項(xiàng)目有一下單邏輯,下單成功數(shù)據(jù)保存在數(shù)據(jù)庫(kù)中,下單成功后需要用戶進(jìn)行支付,如果在30分鐘內(nèi)支付失敗,需要修改訂單的支付狀態(tài)為“支付超時(shí)”并關(guān)閉訂單以及回退庫(kù)存操作,那如何在下單30后準(zhǔn)時(shí)檢查支付結(jié)果處理訂單狀態(tài)呢?
你可能想到了一個(gè)最簡(jiǎn)單的方法,就是使用定時(shí)任務(wù)掃描訂單表,判斷時(shí)間是否支付超時(shí),這樣的方式無(wú)疑是一種很消耗性能的做法,你試想一下,定時(shí)掃描一張數(shù)據(jù)量很大的表去判斷時(shí)間和狀態(tài),而且99%的掃描都是無(wú)效的操作。
那么該如何優(yōu)雅的解決上述問(wèn)題呢?我們可以采用延遲隊(duì)列來(lái)實(shí)現(xiàn),Redis和MQ都可以做到,本文章采用RabbitMQ的延遲隊(duì)列來(lái)實(shí)現(xiàn)。
4.2.延遲隊(duì)列實(shí)現(xiàn)原理
說(shuō)到延遲隊(duì)列就要說(shuō)一說(shuō)消息的過(guò)期時(shí)間(存活時(shí)間)TTL,RabbitMQ可以給隊(duì)列設(shè)置過(guò)期時(shí)間,也可以單獨(dú)給每個(gè)消息設(shè)置過(guò)期時(shí)間,如果到了過(guò)期時(shí)間消息沒(méi)被消費(fèi)該消息就會(huì)標(biāo)記為死信消息。
除此之外還有那些消息會(huì)成為死信消息?
- 一是設(shè)置了TTL的消息到了TTL過(guò)期時(shí)間還沒(méi)被消費(fèi),會(huì)成為死信
- 二是消息被消費(fèi)者拒收,并且reject方法的參數(shù)里requeue是false,意味這這個(gè)消息不會(huì)重回隊(duì)列,該消息會(huì)成為死信,
- 三是由于隊(duì)列大小限制,新的消息進(jìn)來(lái)隊(duì)列可能滿了,MQ會(huì)淘汰掉最老的消息,這些消息可能會(huì)成為死信消息
成為死信的消息會(huì)進(jìn)入一個(gè)死信交換機(jī)(Dead Letter Exchange)中,死信交換機(jī)也是一個(gè)普通的交換機(jī)而已,根據(jù)這一特點(diǎn),我們可以準(zhǔn)備一個(gè)隊(duì)列來(lái)接收死信交換機(jī)中的死信消息,然后準(zhǔn)備一個(gè)消費(fèi)者來(lái)消費(fèi)該隊(duì)列中的消息,這樣一來(lái)我們的延遲隊(duì)列就有思路了,還是按照訂單為例流程如下:
- 下單成功(生產(chǎn)者),加入下單消息到隊(duì)列(order.message)
- 隊(duì)列設(shè)置TTL過(guò)期時(shí)間(10000毫秒),同時(shí)指定了死信交換機(jī)“delay-exchange”和死信交換機(jī)轉(zhuǎn)發(fā)消息的隊(duì)列“delay-message”
- 消息進(jìn)入隊(duì)列,等待一段時(shí)間,如果TTL時(shí)間到,訂單消息會(huì)被MQ扔給死信交換機(jī),死信交換機(jī)會(huì)把消息扔給指定的死信隊(duì)列delay-message
- 消費(fèi)者正好監(jiān)聽了死信隊(duì)列delay-message,就可以獲取到消息進(jìn)行消費(fèi),比如檢查該消息對(duì)應(yīng)的訂單是否支付,做出退庫(kù)存處理等。
整體效果就是,消息進(jìn)入order.message隊(duì)列 延遲 10秒后就 會(huì)進(jìn)入delay-message隊(duì)列然后被消費(fèi)者消費(fèi)處理,這就是一個(gè)延遲隊(duì)列的效果。
注意,這里的delay-exchange死信交換機(jī)其實(shí)就是一個(gè)普通的交換機(jī)而已,所以我們可以把上面的兩個(gè)交換機(jī)合并成一個(gè),如下:
4.3.延遲隊(duì)列實(shí)戰(zhàn)
第一步,定義交換機(jī)和隊(duì)列
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
//rabbitMQ的配置
@Configuration
public class MQConfig {
//交換機(jī)
public static final String EXCHNAGE_DELAY = "EXCHNAGE_DELAY";
//訂單隊(duì)列,該隊(duì)列中的消息設(shè)置過(guò)期時(shí)間
public static final String QUEUE_ORDER = "QUEUE_ORDER";
//該隊(duì)列用來(lái)接收死信交換機(jī)轉(zhuǎn)發(fā)過(guò)來(lái)的消息
public static final String QUEUE_DELAY = "QUEUE_DELAY";
//隊(duì)列的路由鍵,該路由鍵用來(lái)接收訂單消息傳出到訂單隊(duì)列
public static final String ROUTINGKEY_QUEUE_ORDER = "ROUTINGKEY_QUEUE_ORDER";
//該路由鍵用來(lái)接收死信交換機(jī)轉(zhuǎn)發(fā)過(guò)來(lái)的消息
public static final String ROUTINGKEY_QUEUE_DELAY = "ROUTINGKEY_QUEUE_DELAY";
//定義交換機(jī)
@Bean
public Exchange exchangeDelay(){
return ExchangeBuilder.topicExchange(EXCHNAGE_DELAY).durable(true).build();
}
//該隊(duì)列中的消息需要設(shè)置ttl
@Bean
public Queue queueOrder(){
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange", EXCHNAGE_DELAY); //過(guò)期的消息給哪個(gè)交換機(jī)的名字
map.put("x-dead-letter-routing-key", ROUTINGKEY_QUEUE_DELAY); //死信交換機(jī)把消息個(gè)哪個(gè)個(gè)routingkey
map.put("x-message-ttl", 10000); //隊(duì)列過(guò)期時(shí)間10s
return new Queue(QUEUE_ORDER,true,false,false,map);
}
//該隊(duì)列接收死信交換機(jī)轉(zhuǎn)發(fā)過(guò)來(lái)的消息
@Bean
public Queue queueDelay(){
return new Queue(QUEUE_DELAY,true);
}
@Bean
public Binding queueOrderBinding(){
return BindingBuilder.bind(queueOrder()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_ORDER).noargs();
}
@Bean
public Binding queueDelayBinding(){
return BindingBuilder.bind(queueDelay()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_DELAY).noargs();
}
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
第二步,寫一個(gè)消息發(fā)送者
System.out.println("發(fā)送消息:我是一個(gè)延遲消息,開始時(shí)間:"+System.currentTimeMillis());
rabbitTemplate.convertAndSend(
MQConfig.EXCHNAGE_DELAY,
MQConfig.ROUTINGKEY_QUEUE_ORDER,
"我是一個(gè)延遲消息"
);
第三步,寫一個(gè)消費(fèi)者
@Component
public class Consumer {
@RabbitListener(queues = MQConfig.QUEUE_DELAY)
public void handler(String message){
System.out.println("收到消息:"+message+",結(jié)束時(shí)間:"+System.currentTimeMillis());
}
}
第六步,測(cè)試效果
- 生產(chǎn)者執(zhí)行后,觀察MQ,QUEUE_ORDER中有消息
- 等待10s之后,消息進(jìn)入QUEUE_DELAY隊(duì)列
- 控制臺(tái)打印效果
Producer: 發(fā)送消息:我是一個(gè)延遲消息,開始時(shí)間:1606295976347
Consumer: 收到消息:我是一個(gè)延遲消息,結(jié)束時(shí)間:1606295986418
發(fā)送消息到收到消息的時(shí)間差為 10071 , 忽略網(wǎng)絡(luò)開銷,延遲時(shí)間差不多就是我們?cè)O(shè)置的TTL時(shí)間文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-837432.html
5.消息重復(fù)消費(fèi)
因?yàn)橄⒈旧硎怯兄卦嚈C(jī)制或者我們?yōu)榱吮WC消息一定能投遞成功可能會(huì)導(dǎo)致消息多次投遞,那么對(duì)于消費(fèi)者而言消息的重復(fù)消費(fèi)處理就變得非常重要。通常我們可以使用消息的唯一標(biāo)識(shí)來(lái)避免重復(fù)消費(fèi),大概思路如下文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-837432.html
- 找到消息本省的唯一標(biāo)識(shí),或者在數(shù)據(jù)中設(shè)置一個(gè)唯一標(biāo)識(shí),比如:訂單就可以把訂單號(hào)作為唯一標(biāo)識(shí)
- 消費(fèi)者每次做了消息消費(fèi)后,會(huì)把這個(gè)唯一標(biāo)識(shí)記錄下來(lái),比如記錄到數(shù)據(jù)庫(kù),或者Redis都可以。
- 消費(fèi)者每次消費(fèi)前都拿到消息的這個(gè)唯一標(biāo)識(shí)去判斷一下消息是否被消費(fèi),如果已經(jīng)被消費(fèi)國(guó)了就不要再消費(fèi)了
四.面試必備
- MQ的使用場(chǎng)景
- RabbitMQ的工作流程
- RabbitMQ如何防止消息丟失
- RabbitMQ的消息模型有哪幾種(交換機(jī)有哪幾種)
- 如何處理消息重復(fù)消費(fèi)
- 如何實(shí)現(xiàn)延遲隊(duì)列(如果通過(guò)MQ實(shí)現(xiàn)訂單超時(shí)取消)
- 什么情況下消息會(huì)變成死信消息
- 消息的簽收模式有哪幾種,有什么區(qū)別
到了這里,關(guān)于RabbitMQ入門到實(shí)戰(zhàn)一篇文章就夠了的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!