1、延遲隊列的概念
延遲隊列內(nèi)部是有序的,重要的特性體現(xiàn)在它的延遲屬性上,延遲隊列中的元素希望在指定時間到了之后或之前取出處理,簡單的說延遲隊列就是用來存放需要在指定時間被處理的元素的隊列。
2、延遲隊列的應用場景
(1)訂單指定時間內(nèi)未支付則自動取消
(2)用戶發(fā)起退款,指定時間內(nèi)未處理則通知相關運營人員
3、定時任務和延遲隊列的取舍
以上場景都有一個特點,那就是都需要在某個事件發(fā)生前或發(fā)生后執(zhí)行一項任務,如生成訂單后,在十分鐘后檢查訂單狀態(tài),未支付的訂單將關閉,這種場景也可以用定時任務來處理,但數(shù)據(jù)量比價少的話確實可以用定時任務來處理,但在活動期間,訂單的數(shù)據(jù)量可能會變得很龐大,對于龐大的數(shù)據(jù),定時任務很難在1秒內(nèi)檢查完訂單,從而不能及時的關閉未支付的訂單,而且用定時任務來檢查訂單會給數(shù)據(jù)庫帶來很大的壓力,所以在數(shù)據(jù)量大的情況下,定時任務無法滿足業(yè)務需求且性能低下
4、延遲隊列架構圖?(后面我們就根據(jù)這個架構圖進行代碼的設計與實現(xiàn))
??
5、延遲隊列的實現(xiàn)
(1)新建一個名為config的包,用于裝實現(xiàn)特定配置的代碼
效果圖:
(2)在config包里新建一個名為TtlQueueConfig的類用于編寫配置隊列延遲的代碼
代碼如下:
package com.ken.springbootrqbbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 用于配置TTL隊列的延遲時間
*/
@Configuration
public class TtlQueueConfig {
//普通交換機的名稱
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交換機的名稱
public static final String DEAD_EXCHANGE = "dead_exchange";
//普通隊列的名稱
public static final String NORMAL_QUEUE01 = "normal_queue01";
public static final String NORMAL_QUEUE02 = "normal_queue02";
//死信隊列的名稱
public static final String DEAD_QUEUE = "dead_queue";
//聲明普通交換機
@Bean("normalExchange")
public DirectExchange normalExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}
//聲明交換機交換機
@Bean("deadExchange")
public DirectExchange deadExchange() {
return new DirectExchange(DEAD_EXCHANGE);
}
//聲明普通隊列,TTL為10S
@Bean("normalQueue01")
public Queue normalQueue01() {
Map<String, Object> arguments = new HashMap<>();
//設置死信交換機
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//設置死信RoutignKey
arguments.put("x-dead-letter-routing-key","dead");
//設置TTL
arguments.put("x-message-ttl",10000);
return QueueBuilder.durable(NORMAL_QUEUE01).withArguments(arguments).build();
}
//聲明普通隊列,TTL為40S
@Bean("normalQueue02")
public Queue normalQueue02() {
Map<String, Object> arguments = new HashMap<>();
//設置死信交換機
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//設置死信RoutignKey
arguments.put("x-dead-letter-routing-key","dead");
//設置TTL
arguments.put("x-message-ttl",40000);
return QueueBuilder.durable(NORMAL_QUEUE02).withArguments(arguments).build();
}
//聲明死信隊列
@Bean("deadQueue")
public Queue deadQueue() {
return QueueBuilder.durable(DEAD_QUEUE).build();
}
//綁定隊列1和普通交換機
@Bean
public Binding queue01BindNormalExchange(@Qualifier("normalQueue01") Queue normalQueue01,
@Qualifier("normalExchange") DirectExchange normalExchange) {
return BindingBuilder.bind(normalQueue01).to(normalExchange).with("normal01");
}
//綁定隊列2和普通交換機
@Bean
public Binding queue02BindNormalExchange(@Qualifier("normalQueue02") Queue normalQueue02,
@Qualifier("normalExchange") DirectExchange normalExchange) {
return BindingBuilder.bind(normalQueue02).to(normalExchange).with("normal02");
}
//綁定隊列2和普通交換機
@Bean
public Binding deadQueueBindDeadExchange(@Qualifier("deadQueue") Queue deadQueue,
@Qualifier("deadExchange") DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with("dead");
}
}
(3)新建一個名為controller的包,用于裝控制層的代碼
效果圖:
(4)新建一個名為SendMsgController的類用于充當生產(chǎn)者用于發(fā)送消息
?代碼如下:
package com.ken.springbootrqbbitmq.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
* 發(fā)送延遲消息
*/
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
@Autowired(required = false)
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("當前時間:{},發(fā)送一條消息給兩個TTL隊列:{}",new Date().toString(),message);
rabbitTemplate.convertAndSend("normal_exchange","normal01","消息來著ttl為10s的隊列:" + message);
rabbitTemplate.convertAndSend("normal_exchange","normal02","消息來著ttl為40s的隊列:" + message);
}
}
(5)新建一個名為consumer的包,用于裝消費者的代碼
效果圖:
(6)新建一個名為DeadQueueConsumer的類用于消費死信隊列里的消息
代碼如下:
package com.ken.springbootrqbbitmq.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 死信隊列消費者
*/
@Slf4j
@Component
public class DeadQueueConsumer {
//接收消息
@RabbitListener(queues = "dead_queue")
public void receiveMsg(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
log.info("當前時間:{},收到死信隊列的消息:{}",new Date().toString(),msg);
}
}
(7)進入項目的啟動類啟動項目
(8)啟動完畢后在瀏覽器地址欄輸入http://localhost:8080/ttl/sendMsg/參數(shù)往隊列發(fā)送消息
?
?(9)查看控制臺的輸出,發(fā)現(xiàn)分別在10s和40s后進行輸出,這證明我們的延遲隊列成功運行
?
6、延遲隊列的優(yōu)化
雖然上述能實現(xiàn)延遲隊列,但上述的實現(xiàn)過程是一個隊列只能延遲固定的已經(jīng)設置好的時間,若想增加一個新的時間需要,用上述的實現(xiàn)方法就只能新增一個隊列,這樣很麻煩,所以我們需要優(yōu)化延遲隊列
(1)延遲隊列優(yōu)化架構圖?(后面我們就根據(jù)這個架構圖對延遲隊列進行優(yōu)化)
(2)修改config包里TtlQueueConfig類的代碼,多加一些關于NormalQueue03隊列的配置
代碼如下:
package com.ken.springbootrqbbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 用于配置TTL隊列的延遲時間
*/
@Configuration
public class TtlQueueConfig {
//普通交換機的名稱
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交換機的名稱
public static final String DEAD_EXCHANGE = "dead_exchange";
//普通隊列的名稱
public static final String NORMAL_QUEUE01 = "normal_queue01";
public static final String NORMAL_QUEUE02 = "normal_queue02";
//自定義延遲時間隊列的名稱
public static final String NORMAL_QUEUE03 = "normal_queue03";
//死信隊列的名稱
public static final String DEAD_QUEUE = "dead_queue";
//聲明普通交換機
@Bean("normalExchange")
public DirectExchange normalExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}
//聲明交換機交換機
@Bean("deadExchange")
public DirectExchange deadExchange() {
return new DirectExchange(DEAD_EXCHANGE);
}
//聲明普通隊列,TTL為10S
@Bean("normalQueue01")
public Queue normalQueue01() {
Map<String, Object> arguments = new HashMap<>(3);
//設置死信交換機
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//設置死信RoutignKey
arguments.put("x-dead-letter-routing-key","dead");
//設置TTL
arguments.put("x-message-ttl",10000);
return QueueBuilder.durable(NORMAL_QUEUE01).withArguments(arguments).build();
}
//聲明普通隊列,TTL為40S
@Bean("normalQueue02")
public Queue normalQueue02() {
Map<String, Object> arguments = new HashMap<>(3);
//設置死信交換機
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//設置死信RoutignKey
arguments.put("x-dead-letter-routing-key","dead");
//設置TTL
arguments.put("x-message-ttl",40000);
return QueueBuilder.durable(NORMAL_QUEUE02).withArguments(arguments).build();
}
//聲明普通隊列,TTL為40S
@Bean("normalQueue03")
public Queue normalQueue03() {
Map<String, Object> arguments = new HashMap<>(3);
//設置死信交換機
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//設置死信RoutignKey
arguments.put("x-dead-letter-routing-key","dead");
//設置TTL
return QueueBuilder.durable(NORMAL_QUEUE03).withArguments(arguments).build();
}
//聲明死信隊列
@Bean("deadQueue")
public Queue deadQueue() {
return QueueBuilder.durable(DEAD_QUEUE).build();
}
//綁定隊列1和普通交換機
@Bean
public Binding queue01BindNormalExchange(@Qualifier("normalQueue01") Queue normalQueue01,
@Qualifier("normalExchange") DirectExchange normalExchange) {
return BindingBuilder.bind(normalQueue01).to(normalExchange).with("normal01");
}
//綁定隊列2和普通交換機
@Bean
public Binding queue02BindNormalExchange(@Qualifier("normalQueue02") Queue normalQueue02,
@Qualifier("normalExchange") DirectExchange normalExchange) {
return BindingBuilder.bind(normalQueue02).to(normalExchange).with("normal02");
}
//綁定隊列3和普通交換機
@Bean
public Binding queue03BindNormalExchange(@Qualifier("normalQueue03") Queue normalQueue03,
@Qualifier("normalExchange") DirectExchange normalExchange) {
return BindingBuilder.bind(normalQueue03).to(normalExchange).with("normal03");
}
//綁定死信隊列和死信交換機
@Bean
public Binding deadQueueBindDeadExchange(@Qualifier("deadQueue") Queue deadQueue,
@Qualifier("deadExchange") DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with("dead");
}
}
(3)修改controller包里SendMsgController類的代碼,多加一個調(diào)用自定義延遲時間NormalQueue03隊列的接口
代碼如下:
package com.ken.springbootrqbbitmq.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
* 發(fā)送延遲消息
*/
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
@Autowired(required = false)
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("當前時間:{},發(fā)送一條消息給兩個TTL隊列:{}",new Date().toString(),message);
rabbitTemplate.convertAndSend("normal_exchange","normal01","消息來著ttl為10s的隊列:" + message);
rabbitTemplate.convertAndSend("normal_exchange","normal02","消息來著ttl為40s的隊列:" + message);
}
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {
log.info("當前時間:{},發(fā)送一條時長{}毫秒的TTL消息給normal03隊列:{}", new Date(),ttlTime,message);
rabbitTemplate.convertAndSend("normal_exchange","normal03",message,msg -> {
//發(fā)送消息的時候延遲時長
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
}
}
?(4)進入項目的啟動類重新啟動項目
(5)啟動完畢后分別在瀏覽器地址欄輸http://localhost:8080/ttl/sendExpirationMsg/第一個參數(shù)/20000和http://localhost:8080/ttl/sendExpirationMsg/第二個參數(shù)/2000隊列發(fā)送消息
例:
?
?(6)查看控制臺的輸出,發(fā)現(xiàn)第一條消息在20s后進行了輸出,這證明我們優(yōu)化后的延遲隊列成功運行?,但當我們發(fā)送多條消息時,消息可能不會按時"死亡"從而不能按時把消息發(fā)送到死信隊列,如圖里的第二條消息,在第一條消息被消費后緊跟著被消費,而不是隔2秒后被消費,這是因為RabbitMQ只會檢查第一條消息是否過期,過期則會被扔進死信隊列,如果第一條消息延遲時間很長,第二條消息延遲時間很短,第二條消息也并不會被優(yōu)先消費,而是等到第一條消息被消費后第二條消息再被消費,這時需要我們用另一種方法去實現(xiàn)延遲隊列(另一種方法放在下一篇文章介紹)文章來源:http://www.zghlxwxcb.cn/news/detail-726587.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-726587.html
到了這里,關于RabbitMQ系列(17)--延遲隊列的簡介與實現(xiàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!