一、死信交換機
當一個隊列中的消息滿足下列情況之一時,可以成為死信(dead letter
):
- 消費者使用
basic.reject
或basic.nack
聲明消費失敗,并且消息的requeue
參數(shù)設(shè)置為false
- 消息是一個過期消息,超時無人消費
- 要投遞的隊列消息堆積滿了,最早的消息可能成為死信
如果該隊列配置了dead-letter-exchange
屬性,指定了一個交換機,那么隊列中的死信就會投遞到這個交換機中,而
這個交換機稱為死信交換機(Dead Letter Exchange
,簡稱DLX
)。
二、TTL
如果
message
和queue
都有ttl
,采用更小的一方。
1. Queue指定死信交換機并設(shè)置TTL
@Configuration
public class CommonConfig {
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.direct");
}
@Bean
public Queue ttlQueue(){
return QueueBuilder
.durable("ttl.queue")
.ttl(10000)
.deadLetterExchange("dl.direct")
.deadLetterRoutingKey("dl")
.build();
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
}
2. 消息設(shè)置TTL
@Test
public void testTTLMessage(){
Message message = MessageBuilder.withBody("hello ttl".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setExpiration("5000")
.build();
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
log.info("ttl消息已發(fā)送");
}
借助TTL
機制可以用死信交換機模擬延遲隊列,但是設(shè)計上比較牽強,性能不好。
三、延遲隊列
這是官方提供的一些額外插件
https://www.rabbitmq.com/community-plugins.html
下載其中的DelayExchange
插件,把.ez
文件掛載到RabbitMQ
容器的/plugins
目錄下,然后進入容器,執(zhí)行
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
root@7c4ba266e5bc:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@7c4ba266e5bc:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_prometheus
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@7c4ba266e5bc...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins.
1. SpringAMQP創(chuàng)建延遲隊列
基于@RabbitListener
或者基于@Bean
都可以。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayExchange(String msg){
log.info("消費者接收到delay.queue的延遲消息:【" + msg + "】");
}
2. 設(shè)置消息延遲
這個插件只能在消息上設(shè)置延遲時間,沒有隊列設(shè)置延遲時間的概念,不過都是一樣的。message
要在Header
上添加一個x-delay
。
@Test
public void testDelayMessage(){
Message message = MessageBuilder.withBody("hello delay".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setHeader("x-delay", 5000)
.build();
// confirm callback
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
log.info("發(fā)送消息成功");
}
3. 測試
直接運行測試,可能會報錯,因為rabbitmq
意識到消息到了exchange
卻沒有立即到queue
,被認為錯誤,回調(diào)returnback
,所以我們在ReturnCallBack
中繞過這個限制。
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
//check if is delay message
if (message.getMessageProperties().getReceivedDelay() != null && message.getMessageProperties().getReceivedDelay() > 0) {
return;
}
log.error("消息發(fā)送到queue失敗,replyCode={}, reason={}, exchange={}, routeKey={}, message={}",
replyCode, replyText, exchange, routingKey, message.toString());
});
}
}
運行Test
測試,可以看到Test
方面,消息發(fā)送的時間為21:09:13
文章來源:http://www.zghlxwxcb.cn/news/detail-800489.html
21:09:13:516 INFO 25468 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2063c53e:0/SimpleConnection@6415f61e [delegate=amqp://rabbitmq@127.0.0.1:5672/, localPort= 62470]
21:09:13:557 INFO 25468 --- [ main] cn.itcast.mq.spring.SpringAmqpTest : 發(fā)送消息成功
listener
方面消息消費的時間為21:09:18
,剛好5s。文章來源地址http://www.zghlxwxcb.cn/news/detail-800489.html
21:08:31:952 INFO 19532 --- [ main] cn.itcast.mq.ConsumerApplication : Started ConsumerApplication in 1.735 seconds (JVM running for 2.357)
21:09:18:583 INFO 19532 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener : 消費者接收到delay.queue的延遲消息:【hello delay】
到了這里,關(guān)于RabbitMQ常見問題之延遲消息的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!