一、序言
業(yè)務(wù)開發(fā)中有很多延時(shí)操作的場(chǎng)景,比如最常見的超時(shí)訂單自動(dòng)關(guān)閉
、延時(shí)異步處理
,我們常用的實(shí)現(xiàn)方式有:
- 定時(shí)任務(wù)輪詢(有延時(shí))。
- 借助Redission的延時(shí)隊(duì)列。
- Redis的key過期事件通知機(jī)制(需開啟key過期事件通知,對(duì)Redis有性能損耗)。
- RocketMQ中定時(shí)消息推送(支持的時(shí)間間隔固定,不支持自定義)。
- RabbitMQ中的死信隊(duì)列和延遲消息交換機(jī)。
其中用的最多的也是借助Redisson實(shí)現(xiàn)的數(shù)據(jù)結(jié)構(gòu)延遲隊(duì)列
和RabbitMQ中的死信隊(duì)列來實(shí)現(xiàn)
,今天我們通過RabbitMQ死信隊(duì)列和延遲消息交換機(jī)(新特性)來實(shí)現(xiàn)延時(shí)消息推送。
二、死信交換機(jī)和消息TTL實(shí)現(xiàn)延遲消息
1、死信隊(duì)列介紹
這種方式主要通過結(jié)合消息過期和私信交換機(jī)來實(shí)現(xiàn)延遲消息推送,首先先了解下哪些消息會(huì)進(jìn)入死信隊(duì)列:
- 被消費(fèi)者
nack
(negatively acknowleged)的消息。 -
TTL
過期后未被消費(fèi)的消息。 - 超過隊(duì)列長(zhǎng)度限制后被丟棄的消息。
備注:更多信息請(qǐng)參考RabbitMQ中的 Dead Letter Exchange。
2、代碼示例
(1) 死信交換機(jī)配置
@Configuration
protected static class DeadLetterExchangeConfig {
@Bean
public Queue deadLetterQueue(){
return QueueBuilder.durable("dead-letter-queue").build();
}
@Bean
public DirectExchange deadLetterExchange() {
return ExchangeBuilder.directExchange("dead-letter-exchange").build();
}
@Bean
public Binding bindQueueToDeadLetterExchange(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");
}
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("normal-queue")
.deadLetterExchange("dead-letter-exchange")
.deadLetterRoutingKey("dead-letter-routing-key")
.build();
}
}
(2) 消息生產(chǎn)者
@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {
private final RabbitTemplate rabbitTemplate;
public void sendMsgToDeadLetterExchange(String body, int timeoutInMillSeconds) {
log.info("開始發(fā)送消息到dead letter exchange 消息體:{}, 消息延遲:{}ms, 當(dāng)前時(shí)間:{}", body, timeoutInMillSeconds, LocalDateTime.now());
MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setExpiration(String.valueOf(timeoutInMillSeconds)).build();
Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();
rabbitTemplate.send("normal-queue", message);
}
}
(3) 消息消費(fèi)者
@Slf4j
@Component
public class RabbitMqConsumer {
@RabbitListener(queues = "dead-letter-queue")
public void handleMsgFromDeadLetterQueue(String msg) {
log.info("Message received from dead-letter-queue, message body: {}, current time:{}", msg, LocalDateTime.now());
}
}
3、測(cè)試用例
@RestController
@RequiredArgsConstructor
public class RabbitMsgController {
private final RabbitMqProducer rabbitMqProducer;
@RequestMapping("/exchange/dead-letter")
public ResponseEntity<String> sendMsgToDeadLetterExchange(String body, int timeout) {
rabbitMqProducer.sendMsgToDeadLetterExchange(body, timeout);
return ResponseEntity.ok("消息發(fā)送到死信交換機(jī)成功");
}
}
瀏覽器訪問http://localhost:8080/exchange/dead-letter?body=hello&timeout=5000
,可以看到消息被延遲5s處理。
2023-11-26 11:50:33.041 INFO 19152 --- [nio-8080-exec-7] c.u.r.i.producer.RabbitMqProducer : 開始發(fā)送消息到dead letter exchange 消息體:hello, 消息延遲:5000ms, 當(dāng)前時(shí)間:2023-11-26T11:50:33.041
2023-11-26 11:50:38.054 INFO 19152 --- [ntContainer#4-4] c.u.r.i.consumer.RabbitMqConsumer : Message received from dead-letter-queue, message body: hello, current time:2023-11-26T11:50:38.054
三、延遲消息交換機(jī)實(shí)現(xiàn)延遲消息
上面通過消息TTL和死信交換機(jī)實(shí)現(xiàn)延遲消息的解決方案是由一個(gè)叫James Carr
的人提出來的,后來RabbitMQ提供了一個(gè)開箱即用的解決方案,通過延時(shí)消息插件來實(shí)現(xiàn)。
該插件以前被當(dāng)做是試驗(yàn)性產(chǎn)品,但是現(xiàn)在已經(jīng)可以投產(chǎn)使用了。(PS:2015年4月16號(hào)就已經(jīng)有該插件文檔)
在Spring AMQP中,同樣提供了對(duì)該延時(shí)消息插件的支持,并且在RabbitMQ 3.6.0版本就已經(jīng)測(cè)試通過。
1、安裝延時(shí)消息插件
該延時(shí)消息插件為社區(qū)插件,因此需要自己手動(dòng)下載安裝的RabbMQ版本對(duì)應(yīng)的插件,下載地址:RabbitMQ延時(shí)消息插件releases。
我安裝的RabbitMQ版本為3.9.9
,3.9.0版本的插件對(duì)所有3.9.x
版本的RabbitMQ都支持。
下載完后把.ez
結(jié)尾的插件復(fù)制RabbitMQ的插件目錄下,插件目錄為/usr/lib/rabbitmq/plugins
。
通過命令rabbitmq-plugins enable rabbitmq_delayed_message_exchange
安裝該插件,通過命令rabbitmq-plugins list
查看插件列表,可以看到該延時(shí)消息插件已經(jīng)成功安裝。
2、代碼示例
(1) 延時(shí)消息交換機(jī)配置
@Configuration
protected static class DelayedMsgExchangePluginConfig {
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable("delayed-queue").build();
}
@Bean
public DirectExchange delayedExchange() {
return ExchangeBuilder.directExchange("delayed-exchange").delayed().build();
}
@Bean
public Binding bindDelayedQueueToDelayedChange(Queue delayedQueue, DirectExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed-routing-key");
}
}
備注:延時(shí)交換機(jī)的類型可以為DirectExchage、TopicExcahge和FanoutExchange,這些都支持。
(2) 消息生產(chǎn)者
@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {
private final RabbitTemplate rabbitTemplate;
public void sendDelayedMsg(String body, int timeoutInMillSeconds) {
log.info("開始發(fā)送消息到delayed-exchange 消息體:{}, 消息延遲:{}ms, 當(dāng)前時(shí)間:{}", body, timeoutInMillSeconds, LocalDateTime.now());
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDelay(timeoutInMillSeconds);
Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();
rabbitTemplate.send("delayed-exchange", "delayed-routing-key", message);
}
}
(3) 消息消費(fèi)者
@Slf4j
@Component
public class RabbitMqConsumer {
@RabbitListener(queues = "delayed-queue")
public void handleMsgFromDelayedQueue(String msg) {
log.info("Message received from delayed-queue, message body: {}, current time:{}", msg, LocalDateTime.now());
}
}
3、測(cè)試用例
@RestController
@RequiredArgsConstructor
public class RabbitMsgController {
private final RabbitMqProducer rabbitMqProducer;
@RequestMapping("/exchange/delayed")
public ResponseEntity<String> sendMsgToHeadersExchange(String body, int timeout) {
rabbitMqProducer.sendDelayedMsg(body, timeout);
return ResponseEntity.ok("消息發(fā)送到延遲交換機(jī)成功");
}
}
瀏覽器訪問http://localhost:8080/exchange/dead-letter?body=hello&timeout=5000
,可以看到消息被延遲5s處理。文章來源:http://www.zghlxwxcb.cn/news/detail-765301.html
2023-11-26 13:02:07.816 INFO 26524 --- [nio-8080-exec-3] c.u.r.i.producer.RabbitMqProducer : 開始發(fā)送消息到delayed-exchange 消息體:Hello, 消息延遲:5000ms, 當(dāng)前時(shí)間:2023-11-26T13:02:07.816
2023-11-26 13:02:12.830 INFO 26524 --- [ntContainer#5-5] c.u.r.i.consumer.RabbitMqConsumer : Message received from delayed-queue, message body: Hello, current time:2023-11-26T13:02:12.829
四、兩種實(shí)現(xiàn)方式優(yōu)缺點(diǎn)
1、延時(shí)消息插件
- 優(yōu)點(diǎn):配置更加簡(jiǎn)單,少配置1個(gè)過期消息接收隊(duì)列,且語(yǔ)義更明確,容易定位消息出入口。
- 缺點(diǎn):延時(shí)消息插件對(duì)RabbitMQ版本有要求,只有RabbitMQ
3.8.x
及以上版本支持。
2、TLL&死信交換機(jī)
- 優(yōu)點(diǎn):基本適用于所有RabbitMQ版本。
- 缺點(diǎn):配置相對(duì)來說復(fù)雜一些,還有就是我們最開始提到的,不只是TTL過期的消息才會(huì)進(jìn)入死信隊(duì)列,還有
超出隊(duì)列限制
和nack
的消息也會(huì)進(jìn)入死信隊(duì)列,觸發(fā)的條件沒那么純粹。
文章來源地址http://www.zghlxwxcb.cn/news/detail-765301.html
到了這里,關(guān)于Spring RabbitMQ那些事(2-兩種方式實(shí)現(xiàn)延時(shí)消息訂閱)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!