一、死信隊(duì)列
1.1 相關(guān)概念
死信,顧名思義就是無(wú)法被消費(fèi)的消息,字面意思可以這樣理解,一般來(lái)說(shuō),producer 將消息投遞到 broker 或者直接到 queue 里了,consumer 從 queue 取出消息進(jìn)行消費(fèi),但某些時(shí)候由于特定的原因?qū)е?queue 中的某些消息無(wú)法被消費(fèi),這樣的消息如果沒(méi)有后續(xù)的處理,就變成了死信,有死信自然就有了死信隊(duì)列
應(yīng)用場(chǎng)景:
- 為了保證訂單業(yè)務(wù)的消息數(shù)據(jù)不丟失,需要使用到 RabbitMQ 的死信隊(duì)列機(jī)制,當(dāng)消息消費(fèi)發(fā)生異常時(shí),將消息投入死信隊(duì)列中
- 用戶在商城下單成功并點(diǎn)擊去支付后在指定時(shí)間未支付時(shí)自動(dòng)失效
1.2 死信的來(lái)源
- 消息TTL過(guò)期 【Time to live 存活時(shí)間】
- 隊(duì)列達(dá)到最大長(zhǎng)度(隊(duì)列滿了,無(wú)法再添加數(shù)據(jù)到 mq 中)
- 消息被拒絕(basic.reject 或 basic.nack)并且 requeue=false 【消息應(yīng)答被拒絕并且不能重新返回隊(duì)列】
1.3 死信實(shí)戰(zhàn)
?? 1、我們先看一下案例的整體架構(gòu)圖
- 一個(gè)生產(chǎn)者、兩個(gè)消費(fèi)者
- 一個(gè)消費(fèi)者從正常隊(duì)列接收消息、另一個(gè)從死信隊(duì)列接收消息
- 一個(gè)消費(fèi)者從正常隊(duì)列接收消息、另一個(gè)從死信隊(duì)列接收消息
?? 2、演示第一種情況:消息TTL過(guò)期
- 我們需要思考如何將普通隊(duì)列與死信交換機(jī)關(guān)聯(lián)起來(lái)?
- 此時(shí)就用到了我們聲明隊(duì)列的第四個(gè)參數(shù),準(zhǔn)備一個(gè) map 集合
- 以添加鍵值對(duì)的方式設(shè)置 死信交換機(jī) 和 routingKey
//正常隊(duì)列綁定死信隊(duì)列信息 Map<String, Object> params = new HashMap<>(); //正常隊(duì)列設(shè)置死信交換機(jī) params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常隊(duì)列設(shè)置死信 routingKey params.put("x-dead-letter-routing-key", "lisi");
- 此時(shí)就用到了我們聲明隊(duì)列的第四個(gè)參數(shù),準(zhǔn)備一個(gè) map 集合
- 第二個(gè)我們需要思考如何設(shè)置過(guò)期時(shí)間
-
首先,過(guò)期時(shí)間肯定是設(shè)置給消息的,所以應(yīng)該在生產(chǎn)者發(fā)出消息前進(jìn)行設(shè)置
-
這就用到了我們發(fā)送消息的第三個(gè)參數(shù),通過(guò) AMQP.BasicProperties 指明過(guò)期時(shí)間
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
-
- 接下來(lái)是代碼部分和效果演示:
1?? 生產(chǎn)者
package com.atguigu.rabbitmq.eight;
import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
/**
* @author Bonbons
* @version 1.0
* 死信隊(duì)列的生產(chǎn)者
*/
public class Producer {
public static final String NORMAL_QUEUE = "normal_queue";
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//為了實(shí)現(xiàn)消息的存活時(shí)間為10s
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder().expiration("10000").build();
for (int i = 1; i < 11; i++) {
String message = "info" + i;
//演示過(guò)期成為死信消息
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes("UTF-8"));
}
}
}
2?? 消費(fèi)者1
package com.atguigu.rabbitmq.eight;
import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;
/**
* @author Bonbons
* @version 1.0
* 演示我們死信隊(duì)列的消費(fèi)者1號(hào)
*/
public class Consumer01 {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String NORMAL_QUEUE = "normal_queue";
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
//獲取信道
Channel channel = RabbitMqUtils.getChannel();
//聲明交換機(jī)
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//正常隊(duì)列綁定死信隊(duì)列信息
Map<String, Object> params = new HashMap<>();
//正常隊(duì)列設(shè)置死信交換機(jī)
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常隊(duì)列設(shè)置死信 routingKey
params.put("x-dead-letter-routing-key", "lisi");
//聲明隊(duì)列 [此處用到了第四個(gè)參數(shù),實(shí)現(xiàn)將普通隊(duì)列與死信交換機(jī)關(guān)聯(lián)起來(lái)]
channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
//將我們的隊(duì)列和交換機(jī)綁定到一起
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息......");
channel.basicConsume(NORMAL_QUEUE, true, (consumersTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("Consumer01接收到的消息: " + msg);
}, consumersTag -> {});
}
}
3?? 消費(fèi)者2
package com.atguigu.rabbitmq.eight;
import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;
/**
* @author Bonbons
* @version 1.0
* 演示我們死信隊(duì)列的消費(fèi)者2號(hào),就負(fù)責(zé)接收死信隊(duì)列的消息
*/
public class Consumer02 {
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
//獲取信道
Channel channel = RabbitMqUtils.getChannel();
//最普通的消費(fèi)者
System.out.println("等待接收消息......");
channel.basicConsume(DEAD_QUEUE, true, (consumersTag, message) -> {
System.out.println("Consumer02接收到的消息: " + new String(message.getBody(), "UTF-8"));
}, consumersTag -> {});
}
}
- 我們通過(guò)不啟動(dòng)消費(fèi)者1(C1)來(lái)演示消息過(guò)期的效果
- 因?yàn)闆](méi)有消費(fèi)者1,當(dāng)生產(chǎn)者的消息發(fā)過(guò)來(lái)就沒(méi)有消費(fèi)者進(jìn)行處理,此處10s后就會(huì)成為死信消息
- 成為死信消息后就會(huì)從當(dāng)前的這個(gè)隊(duì)列發(fā)往死信交換機(jī)
- 死信交換機(jī)將死信消息發(fā)往死信隊(duì)列,我們的第二個(gè)消費(fèi)者從死信隊(duì)列獲取消息并消費(fèi)
?? 3、演示第二種情況:隊(duì)列達(dá)到最大長(zhǎng)度
- 只需要在上面的代碼中進(jìn)行部分修改
-
第一步,去掉生產(chǎn)者中消息的過(guò)期時(shí)間參數(shù)設(shè)置
-
第二步 ,在消費(fèi)者C1的聲明普通隊(duì)列的map集合參數(shù)加入一條關(guān)于隊(duì)列長(zhǎng)度的限制
-
params.put("x-max-length", 6);
-
-
重點(diǎn): 當(dāng)我們想要修改已經(jīng)存在隊(duì)列、交換機(jī)的屬性時(shí),需要將已經(jīng)存在的刪除,然后運(yùn)行代碼重新創(chuàng)建
- 通過(guò)RabbitMQ的Web管理工具我們可以看出達(dá)到了預(yù)期的測(cè)試效果:
- 發(fā)送十條消息,正常隊(duì)列里有六條消息,死信隊(duì)列里有四條消息
?? 4、第三種情況:消息被拒并不能返回隊(duì)列
- 對(duì)于生產(chǎn)者與第二種情況一致,消費(fèi)者C2與第一種情況一致
- 與前兩種情況最大的不同在于消費(fèi)者C1的設(shè)定,接下來(lái)我們看一下這部分的代碼
- 要關(guān)閉自動(dòng)應(yīng)答
package com.atguigu.rabbitmq.eight;
import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;
/**
* @author Bonbons
* @version 1.0
* 演示我們死信隊(duì)列的消費(fèi)者1號(hào)
*/
public class Consumer01 {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String NORMAL_QUEUE = "normal_queue";
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
//獲取信道
Channel channel = RabbitMqUtils.getChannel();
//聲明交換機(jī)
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//正常隊(duì)列綁定死信隊(duì)列信息
Map<String, Object> params = new HashMap<>();
//正常隊(duì)列設(shè)置死信交換機(jī)
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常隊(duì)列設(shè)置死信 routingKey
params.put("x-dead-letter-routing-key", "lisi");
//設(shè)置正常隊(duì)列的長(zhǎng)度限制
// params.put("x-max-length", 6);
//聲明隊(duì)列 [此處用到了第四個(gè)參數(shù),實(shí)現(xiàn)將普通隊(duì)列與死信交換機(jī)關(guān)聯(lián)起來(lái)]
channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
//將我們的隊(duì)列和交換機(jī)綁定到一起
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息......");
//此處要開(kāi)啟手動(dòng)應(yīng)答,因?yàn)槿绻O(shè)置為true(自動(dòng)應(yīng)答)就不存在拒絕消息這一說(shuō)了
channel.basicConsume(NORMAL_QUEUE, false, (consumersTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
if(msg.equals("info5")){
System.out.println("Consumer01接收到的消息: " + msg + ": 此消息是被C1拒絕的");
//拒絕消息,并設(shè)置不重新添加到隊(duì)列 >> 變成死信消息
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
}else{
System.out.println("Consumer01接收到的消息: " + msg);
}
}, consumersTag -> {});
}
}
二、延遲隊(duì)列
- 此部分開(kāi)始,我們不再使用簡(jiǎn)單的Maven項(xiàng)目,而是創(chuàng)建一個(gè)SpringBoot項(xiàng)目來(lái)演示
2.1 延遲隊(duì)列的概念
- 延時(shí)隊(duì)列,隊(duì)列內(nèi)部是有序的,最重要的特性就體現(xiàn)在它的延時(shí)屬性上,延時(shí)隊(duì)列中的元素是希望在指定時(shí)間到了以后或之前取出和處理
- 簡(jiǎn)單來(lái)說(shuō),延時(shí)隊(duì)列就是用來(lái)存放需要在指定時(shí)間被處理的元素的隊(duì)列
- 對(duì)于我們死信隊(duì)列的C2消費(fèi)者來(lái)說(shuō),它就是一個(gè)延遲隊(duì)列
2.2 延遲隊(duì)列使用場(chǎng)景
- 1.訂單在十分鐘之內(nèi)未支付則自動(dòng)取消
- 2.新創(chuàng)建的店鋪,如果在十天內(nèi)都沒(méi)有上傳過(guò)商品,則自動(dòng)發(fā)送消息提醒。
- 3.用戶注冊(cè)成功后,如果三天內(nèi)沒(méi)有登陸則進(jìn)行短信提醒。
- 4.用戶發(fā)起退款,如果三天內(nèi)沒(méi)有得到處理則通知相關(guān)運(yùn)營(yíng)人員。
- 5.預(yù)定會(huì)議后,需要在預(yù)定的時(shí)間點(diǎn)前十分鐘通知各個(gè)與會(huì)人員參加會(huì)議
這些場(chǎng)景都有一個(gè)特點(diǎn),需要在某個(gè)事件發(fā)生之后或者之前的指定時(shí)間點(diǎn)完成某一項(xiàng)任務(wù),如:發(fā)生訂單生成事件,在十分鐘之后檢查該訂單支付狀態(tài),然后將未支付的訂單進(jìn)行關(guān)閉;看起來(lái)似乎使用定時(shí)任務(wù),一直輪詢數(shù)據(jù),每秒查一次,取出需要被處理的數(shù)據(jù),然后處理不就完事了嗎?如果數(shù)據(jù)量比較少,確實(shí)可以這樣做,比如:對(duì)于“如果賬單一周內(nèi)未支付則進(jìn)行自動(dòng)結(jié)算”這樣的需求,如果對(duì)于時(shí)間不是嚴(yán)格限制,而是寬松意義上的一周,那么每天晚上跑個(gè)定時(shí)任務(wù)檢查一下所有未支付的賬單,確實(shí)也是一個(gè)可行的方案。但對(duì)于數(shù)據(jù)量比較大,并且時(shí)效性較強(qiáng)的場(chǎng)景,如:“訂單十分鐘內(nèi)未支付則關(guān)閉“,短期內(nèi)未支付的訂單數(shù)據(jù)可能會(huì)有很多,活動(dòng)期間甚至?xí)_(dá)到百萬(wàn)甚至千萬(wàn)級(jí)別,對(duì)這么龐大的數(shù)據(jù)量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內(nèi)無(wú)法完成所有訂單的檢查,同時(shí)會(huì)給數(shù)據(jù)庫(kù)帶來(lái)很大壓力,無(wú)法滿足業(yè)務(wù)要求而且性能低下?!疽鑫覀兊难舆t隊(duì)列】
2.3 RabbitMQ 中的 TTL
-
TTL 是什么呢?
- TTL 是 RabbitMQ 中一個(gè)消息或者隊(duì)列的屬性,表明一條消息或者該隊(duì)列中的所有消息的最大存活時(shí)間,單位是毫秒。
- 換句話說(shuō),如果一條消息設(shè)置了 TTL 屬性或者進(jìn)入了設(shè)置 TTL 屬性的隊(duì)列,那么這條消息如果在 TTL 設(shè)置的時(shí)間內(nèi)沒(méi)有被消費(fèi),則會(huì)成為"死信"。
- 如果同時(shí)配置了隊(duì)列的 TTL 和消息的TTL,那么較小的那個(gè)值將會(huì)被使用,有兩種方式設(shè)置 TTL。
?? 1、如何為消息設(shè)置 TTL 呢?
- 在我們使用 RabbitTemplate 的 convertAndSend 方法發(fā)送消息時(shí),通過(guò)參數(shù)設(shè)置消息的過(guò)期時(shí)間 【SpringBoot】
?? 2、如何為隊(duì)列設(shè)置 TTL 呢?
- 在我們創(chuàng)建隊(duì)列之間,設(shè)置隊(duì)列 map 集合參數(shù)時(shí),設(shè)置我們隊(duì)列的過(guò)期時(shí)間 【SpringBoot】
?? 3、兩者有什么區(qū)別呢?
- 區(qū)別:
- 第一種方式:如果設(shè)置了隊(duì)列的 TTL 屬性,那么一旦消息過(guò)期,就會(huì)被隊(duì)列丟棄(如果配置了死信隊(duì)列被丟到死信隊(duì)列中),
- 第二種方式,消息即使過(guò)期,也不一定會(huì)被馬上丟棄,因?yàn)?mark>消息是否過(guò)期是在即將投遞到消費(fèi)者之前判定的,如果當(dāng)前隊(duì)列有嚴(yán)重的消息積壓情況,則已過(guò)期的消息也許還能存活較長(zhǎng)時(shí)間
- 另外,還需要注意的一點(diǎn)是,如果不設(shè)置 TTL,表示消息永遠(yuǎn)不會(huì)過(guò)期
- 如果將 TTL 設(shè)置為 0,則表示除非此時(shí)可以直接投遞該消息到消費(fèi)者,否則該消息將會(huì)被丟棄。
2.4 整合 SpringBoot
?? 1、創(chuàng)建項(xiàng)目
?? 2、添加依賴 【此處直接貼上我們的pom.xml】
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.14</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.atguigu.rabbitmq</groupId>
<artifactId>springboot-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq</name>
<description>springboot-rabbitmq</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--RabbitMQ 依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--RabbitMQ 測(cè)試依賴-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
?? 3、修改配置文件 【連接到我們的RabbitMQ】
spring.rabbitmq.host=8.130.95.101
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
?? 4、添加 Swagger 配置類
- 根據(jù)在代碼中使用自定義的注解來(lái)生成接口文檔,這個(gè)在前后端分離的項(xiàng)目中很重要
- 這樣做的好處是 在開(kāi)發(fā)接口時(shí)可以通過(guò)swagger 將接口文檔定義好,同時(shí)也方便以后的維護(hù)
package com.atguigu.rabbitmq.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket webApiConfig() {
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}
private ApiInfo webApiInfo() {
return new ApiInfoBuilder()
.title("rabbitmq 接口文檔")
.description("本文檔描述了 rabbitmq 微服務(wù)接口定義")
.version("1.0")
.contact(new Contact("enjoy6288", "http://atguigu.com",
"1551388580@qq.com"))
.build();
}
}
2.5 隊(duì)列 TTL
- 就是通過(guò)死信隊(duì)列的方式實(shí)現(xiàn)延遲隊(duì)列,接下來(lái)直接通過(guò)案例演示
- 首先,我們要知道:
- 聲明工作通過(guò)配置類完成 【就是定義聲明隊(duì)列、交換機(jī)】
- 生產(chǎn)者通過(guò)controller完成 【以瀏覽器發(fā)送請(qǐng)求的方式讓生產(chǎn)者發(fā)送消息】
- 消費(fèi)者通過(guò)監(jiān)聽(tīng)器完成 【通過(guò)監(jiān)聽(tīng)器監(jiān)聽(tīng)對(duì)應(yīng)的隊(duì)列,來(lái)完成消費(fèi)者的功能】
1?? 代碼架構(gòu)圖
- 創(chuàng)建兩個(gè)隊(duì)列 QA 和 QB,兩者隊(duì)列 TTL 分別設(shè)置為 10S 和 40S
- 然后在創(chuàng)建一個(gè)交換機(jī) X 和死信交換機(jī) Y,它們的類型都是 direct
- 創(chuàng)建一個(gè)死信隊(duì)列 QD,它們的綁定關(guān)系如下
2?? 配置類代碼 【定義隊(duì)列、交換機(jī)部分】
package com.atguigu.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author Bonbons
* @version 1.0
* 延遲隊(duì)列演示配置類
*/
@Configuration
public class TtlQueueConfig {
//普通交換機(jī)
public static final String X_EXCHANGE = "X";
//死信交換機(jī)
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
//普通隊(duì)列
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
//死信隊(duì)列
public static final String DEAD_LETTER_QUEUE = "QD";
//聲明我們的兩個(gè)交換機(jī)
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
//聲明我們的兩個(gè)普通隊(duì)列
@Bean("queueA")
public Queue queueA(){
Map<String, Object> arguments = new HashMap<>(3);//3代表初始map的長(zhǎng)度
//設(shè)置死信交換機(jī)
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//設(shè)置routingKey
arguments.put("x-dead-letter-routing-key", "YD");
//設(shè)置過(guò)期時(shí)間
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}
@Bean("queueB")
public Queue queueB(){
Map<String, Object> arguments = new HashMap<>(3);//3代表初始map的長(zhǎng)度
//設(shè)置死信交換機(jī)
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//設(shè)置routingKey
arguments.put("x-dead-letter-routing-key", "YD");
//設(shè)置過(guò)期時(shí)間
arguments.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}
//聲明我們的死信隊(duì)列
@Bean("queueD")
public Queue queueD(){
// return new Queue(DEAD_LETTER_QUEUE); //兩種創(chuàng)建隊(duì)列的方式均可
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
//將我們的兩個(gè)普通隊(duì)列綁定到普通交換機(jī)X上
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
//將我們的死信隊(duì)列綁定到死信交換機(jī)上
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
3?? 消息生產(chǎn)者 【編寫一個(gè)處理請(qǐng)求的控制器】
package com.atguigu.rabbitmq.controller;
import com.atguigu.rabbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
* @author Bonbons
* @version 1.0
*/
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
@Autowired
RabbitTemplate rabbitTemplate;
//發(fā)送我們基礎(chǔ)死信消息
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條消息給兩個(gè)TTL隊(duì)列: {}", new Date().toString(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息來(lái)自ttl為10s的隊(duì)列" + message);
rabbitTemplate.convertAndSend("X", "XB", "消息來(lái)自ttl為40s的隊(duì)列" + message);
}
}
4?? 消息消費(fèi)者 【一個(gè)組件類里定義Rabbit監(jiān)聽(tīng)器】
package com.atguigu.rabbitmq.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @author Bonbons
* @version 1.0
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
//接收消息
@RabbitListener(queues = "QD")
public void receiveMsg(Message message, Channel channel) throws Exception{
String msg = new String(message.getBody());
log.info("當(dāng)前時(shí)間: {}, 收到死信隊(duì)列的消息: {}", new Date().toString(), msg);
}
}
- 接下來(lái)我們通過(guò)瀏覽器發(fā)送請(qǐng)求進(jìn)行測(cè)試:
- 第一條消息在 10S 后變成了死信消息,然后被消費(fèi)者消費(fèi)掉,第二條消息在 40S 之后變成了死信消息,然后被消費(fèi)掉,這樣一個(gè)延時(shí)隊(duì)列就打造完成了
- 但是基于上述這種情況,每增加一個(gè)新的時(shí)間需求,就要新增一個(gè)隊(duì)列,如果是預(yù)定會(huì)議室然后提前通知這樣的場(chǎng)景,豈不是要增加無(wú)數(shù)個(gè)隊(duì)列才能滿足需求?
2.6 延遲隊(duì)列優(yōu)化
- 就是我們先不為隊(duì)列設(shè)置 TTL,而是通過(guò)瀏覽器發(fā)起請(qǐng)求的占位符來(lái)攜帶我們的過(guò)期時(shí)間
- 注意此處的過(guò)期時(shí)間,我們是設(shè)置給消息的,并沒(méi)有設(shè)置給隊(duì)列
- 映射URL綁定的占位符 帶占位符的URL是 Spring3.0 新增的功能
1?? 代碼架構(gòu)圖
- 基于上面通過(guò)TTL設(shè)置的延遲隊(duì)列,我們新增了一個(gè)隊(duì)列 QC,綁定關(guān)系如下,該隊(duì)列不設(shè)置 TTL 時(shí)間
2?? 配置類代碼
- 只需要在我上面的配置類 TtlQueueConfig.java 代碼里增加下面的內(nèi)容
//優(yōu)化我們的延遲隊(duì)列,再設(shè)置一個(gè)普通隊(duì)列,不為其設(shè)置過(guò)期時(shí)間,讓它做公共隊(duì)列
public static final String QUEUE_C = "QC";
//聲明這個(gè)隊(duì)列
@Bean
public Queue queueC(){
Map<String, Object> arguments = new HashMap<>(3);
//設(shè)置死信交換機(jī)
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//設(shè)置routingKey
arguments.put("x-dead-letter-routing-key", "YD");
return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}
//綁定到我們的交換機(jī)X
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
3?? 消息生產(chǎn)者代碼 【增加了一個(gè)新的請(qǐng)求映射】
- 在我們上面的生產(chǎn)者代碼文件中進(jìn)行添加即可
//發(fā)送我們優(yōu)化后的延遲隊(duì)列的消息
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime){
log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條時(shí)長(zhǎng)為{}毫秒的消息給TTL隊(duì)列QC: {}", new Date().toString(), ttlTime, message);
//使用springboot為我們提供的rabbitTemplate來(lái)發(fā)送延遲消息
rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
//設(shè)置過(guò)期時(shí)間
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
}
- 對(duì)于消費(fèi)者代碼不變,接下來(lái)我們發(fā)起請(qǐng)求進(jìn)行測(cè)試:
- 我們先發(fā)起一個(gè)過(guò)期時(shí)間長(zhǎng)的請(qǐng)求,再發(fā)起一個(gè)過(guò)期時(shí)間短的請(qǐng)求,這樣便于說(shuō)明優(yōu)化后延遲隊(duì)列的問(wèn)題
- 看起來(lái)似乎沒(méi)什么問(wèn)題,但是在最開(kāi)始的時(shí)候,就介紹過(guò)如果使用在消息屬性上設(shè)置 TTL 的方式,消息可能并不會(huì)按時(shí)“死亡“
- 因?yàn)?RabbitMQ 只會(huì)檢查第一個(gè)消息是否過(guò)期,如果過(guò)期則丟到死信隊(duì)列,如果第一個(gè)消息的延時(shí)時(shí)長(zhǎng)很長(zhǎng),而第二個(gè)消息的延時(shí)時(shí)長(zhǎng)很短,第二個(gè)消息并不會(huì)優(yōu)先得到執(zhí)行
2.7 Rabbitmq 插件實(shí)現(xiàn)延遲隊(duì)列
- 對(duì)于上文優(yōu)化后延遲隊(duì)列存在的問(wèn)題,我們可以通過(guò)使用 RabbitMQ 為我們提供的插件解決
- 我們需要知道,這種基于插件的==實(shí)現(xiàn)延遲的效果是在交換機(jī)處完成的 ==
1?? 安裝延遲隊(duì)列插件 【在我們的第一篇配置文章資源包里有】
-
在官網(wǎng)上下載 https://www.rabbitmq.com/community-plugins.html,下載 rabbitmq_delayed_message_exchange 插件,然后解壓放置到 RabbitMQ 的插件目錄
-
進(jìn)入 RabbitMQ 的安裝目錄下的 plgins 目錄,執(zhí)行下面命令讓該插件生效,然后重啟 RabbitMQ
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
-
rabbitmqctl start_app
-
通過(guò) RabbitMQ 的 web 控制臺(tái)可以看到添加插件前后交換機(jī)處的變換
- 插件安裝好了之后,就可以演示如何使用我們基于插件的方式實(shí)現(xiàn)延遲隊(duì)列
2?? 代碼架構(gòu)圖
- 在這里新增了一個(gè)隊(duì)列 delayed.queue,一個(gè)自定義交換機(jī) delayed.exchange,綁定關(guān)系如下:
3?? 配置類代碼
在我們自定義的交換機(jī)中,這是一種新的交換類型,該類型消息支持延遲投遞機(jī)制 消息傳遞后并不會(huì)立即投遞到目標(biāo)隊(duì)列中,而是存儲(chǔ)在 mnesia(一個(gè)分布式數(shù)據(jù)系統(tǒng))表中,當(dāng)達(dá)到投遞時(shí)間時(shí),才投遞到目標(biāo)隊(duì)列中
新建的配置類
package com.atguigu.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author Bonbons
* @version 1.0
*/
@Configuration
public class DelayedQueueConfig {
//定義我們的交換機(jī)、隊(duì)列、RoutingKey
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
//聲明交換機(jī)
@Bean("delayedExchange")
public CustomExchange delayedExchange(){
Map<String, Object> arguments = new HashMap<>();
//延遲類型
arguments.put("x-delayed-type", "direct");
/**
* 1、交換機(jī)的名字
* 2、交換機(jī)的類型 [指明是延遲消息]
* 3、是否需要持久化
* 4、是否開(kāi)啟自動(dòng)刪除
* 5、其他參數(shù)
*/
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message",
true, false, arguments);
}
//聲明隊(duì)列
@Bean
public Queue delayedQueue(){
return new Queue(DELAYED_QUEUE_NAME);
}
//將交換機(jī)與隊(duì)列進(jìn)行綁定
@Bean
public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") CustomExchange delayedExchange){
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
4?? 消息生產(chǎn)者
- 在原來(lái)的生產(chǎn)者代碼里添加
//發(fā)送我們使用插件的延遲隊(duì)列消息
@GetMapping("/sendDelayedMsg/{message}/{delayedTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayedTime){
log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條時(shí)長(zhǎng)為{}毫秒的消息給延遲隊(duì)列delayed.queue: {}", new Date().toString(), delayedTime, message);
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
//設(shè)置延遲消息
msg.getMessageProperties().setDelay(delayedTime);
return msg;
});
}
5?? 消息消費(fèi)者
新的監(jiān)聽(tīng)器
package com.atguigu.rabbitmq.consumer;
import com.atguigu.rabbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @author Bonbons
* @version 1.0
*/
@Slf4j
@Component
public class DelayQueueConsumer {
//設(shè)置監(jiān)聽(tīng)器
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayQueue(Message message){
String msg = new String(message.getBody());
log.info("當(dāng)前時(shí)間: {}, 收到延遲隊(duì)列的消息: {}", new Date().toString(), msg);
}
}
- 發(fā)起請(qǐng)求,測(cè)試效果
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-736805.html
6?? 總結(jié)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-736805.html
- 延時(shí)隊(duì)列在需要延時(shí)處理的場(chǎng)景下非常有用,使用 RabbitMQ 來(lái)實(shí)現(xiàn)延時(shí)隊(duì)列可以很好的利用RabbitMQ 的特性,如:消息可靠發(fā)送、消息可靠投遞、死信隊(duì)列來(lái)保障消息至少被消費(fèi)一次以及未被正確處理的消息不會(huì)被丟棄。
- 另外,通過(guò) RabbitMQ 集群的特性,可以很好的解決單點(diǎn)故障問(wèn)題,不會(huì)因?yàn)閱蝹€(gè)節(jié)點(diǎn)掛掉導(dǎo)致延時(shí)隊(duì)列不可用或者消息丟失。
- 當(dāng)然,延時(shí)隊(duì)列還有很多其它選擇,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz或者利用 kafka 的時(shí)間輪,這些方式各有特點(diǎn),看需要適用的場(chǎng)景
到了這里,關(guān)于【RabbitMQ學(xué)習(xí)日記】——死信隊(duì)列與延遲隊(duì)列的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!