問題:什么是延遲隊(duì)列
我們常說的延遲隊(duì)列是指消息進(jìn)入隊(duì)列后不會(huì)被立即消費(fèi),只有達(dá)到指定時(shí)間后才能被消費(fèi)。
但RabbitMq中并沒有提供延遲隊(duì)列功能。那么RabbitMQ如何實(shí)現(xiàn)延遲隊(duì)列
通過:死信隊(duì)列 + RabbitMQ的TTL特性實(shí)現(xiàn)。
實(shí)現(xiàn)原理
給一個(gè)普通帶有過期功能的隊(duì)列綁定一個(gè)死信隊(duì)列,消息先進(jìn)延時(shí)隊(duì)列,過期了后消息進(jìn)入死信隊(duì)列,死信隊(duì)列的消息會(huì)轉(zhuǎn)發(fā)到對(duì)應(yīng)的queue里面,我們只需要消費(fèi)死信的queue里面的消息就可以了。
一、TTL特性說明
TTL就是消息或者隊(duì)列的過期功能。當(dāng)消息過期就會(huì)進(jìn)到死信隊(duì)列,死信隊(duì)列和普通隊(duì)列沒啥區(qū)別,然后我們只需要配置一個(gè)消費(fèi)者來消費(fèi)死信隊(duì)列里面的消息就可以了。
- 如果不設(shè)置x-message-ttl,則表示消息不會(huì)過期
- 如果x-message-ttl設(shè)置為0,則表示除非此時(shí)可以直接投遞該消息到消費(fèi)者,否則該消息將會(huì)被丟棄。
- 如果x-message-ttl設(shè)置不為0,表明消息或隊(duì)列中所有消息的最大存活時(shí)間(過期時(shí)間),單位是毫秒。
需要注意: RabbitMQ只會(huì)對(duì)隊(duì)列頭部的消息進(jìn)行過期淘汰,消息是否過期是在即將投遞消息到消費(fèi)者之前判定的,如果隊(duì)列出現(xiàn)消息堆積情況,則已過期的消息還是會(huì)繼續(xù)存活的
比如過期時(shí)間設(shè)置在消息上,由于消息進(jìn)隊(duì)列是先進(jìn)先出,假設(shè)先進(jìn)去的消息過期時(shí)間長(zhǎng),后進(jìn)的消息過期時(shí)間短,先進(jìn)來的消息還沒過期,后面進(jìn)來的消息就算到了過期時(shí)間,消息也不會(huì)過期也就不會(huì)從隊(duì)列中剔除,從而導(dǎo)致隊(duì)列里面的消息積壓。
二、死信隊(duì)列
簡(jiǎn)單理解就是要被丟棄的消息才會(huì)放到死信隊(duì)列。
1、消息什么時(shí)候變?yōu)樗佬?/p>
1、消息被否定接收,消費(fèi)者使用basic.reject 或者 basic.nack并且requeue重回隊(duì)列屬性設(shè)為false
2、消息在隊(duì)列里的時(shí)間超過了該消息設(shè)置的過期時(shí)間(TTL)
3、消息隊(duì)列到達(dá)了它的最大長(zhǎng)度,之后再收到的消息。
2、死信隊(duì)列的原理
當(dāng)一個(gè)消息在隊(duì)列里變?yōu)樗佬牛鼤?huì)被重新publish到綁定的死信隊(duì)列所對(duì)應(yīng)的exchange交換機(jī)上,這個(gè)exchange就為DLX。因此我們只需要在聲明正常的業(yè)務(wù)隊(duì)列時(shí)添加一個(gè)可選的"x-dead-letter-exchange"參數(shù),值為死信交換機(jī),死信就會(huì)被rabbitmq重新publish到配置的這個(gè)交換機(jī)上,我們接著監(jiān)聽這個(gè)死信交換機(jī)所綁定隊(duì)列就可以了。
下面看個(gè)案例:
正常情況,我們配置交換機(jī)-A,會(huì)為其綁定路由鍵-A,和路由鍵-A所映射的隊(duì)列-A,并設(shè)置這個(gè)隊(duì)列-A的死信交換機(jī)-B、死信路由鍵-B。 消息消費(fèi)者-A,監(jiān)聽隊(duì)列-A,基本上就沒死信交換機(jī)什么事了,特殊時(shí)候隊(duì)列滿了,或者接收后給的回執(zhí)是false才會(huì)進(jìn)到死信交換機(jī)-B。現(xiàn)在我們把消費(fèi)者-A刪了,并且設(shè)置隊(duì)列-A里面消息過期時(shí)間,那么所有進(jìn)到隊(duì)列A里面消息就自然過期,然后被推給死信交換機(jī),那么監(jiān)聽死信交換機(jī)-B的消息者-B,就可以得到過期后的消息了,就實(shí)現(xiàn)了延時(shí)功能。
3、死信交換機(jī)
死信交換機(jī)DLX實(shí)際上也是普通的交換機(jī),說白的就是將死信(過期消息)路由到死信隊(duì)列的交換機(jī)。
4、死信隊(duì)列
死信隊(duì)列DLQ(Dead Letter Queue)實(shí)際上也是普通的隊(duì)列,只不過它存儲(chǔ)的是死信交換機(jī)路由過來的死信(過期消息)
三、代碼
1、給隊(duì)列綁定死信隊(duì)列
Map<String, Object> args = new HashMap<>(2);
args.put(x-dead-letter-exchange,死信隊(duì)列交換機(jī))
args.put(x-dead-letter-routing-key,死信routeKey)
args.put(x-message-ttl,隊(duì)列里面的消息最大存活時(shí)間)
//申明一個(gè)普通隊(duì)列,并且給這個(gè)普通隊(duì)里綁定一個(gè)死信隊(duì)列。當(dāng)消息過期就會(huì)轉(zhuǎn)到死信隊(duì)列里面去。
Queue queue = QueueBuilder.durable(queue).withArguments(args).build();
2、設(shè)置過期時(shí)間
目前有兩種方式可以設(shè)置消息的TTL。第一種是通過隊(duì)列的屬性設(shè)置,隊(duì)列中的所有消息都有相同的過期時(shí)間。第二種方法是對(duì)消息本身進(jìn)行單獨(dú)設(shè)置,每條消息的TTL可以不同。如果兩種方法同時(shí)設(shè)置,則TTL以兩者之間較小的那個(gè)數(shù)值為準(zhǔn)。消息在隊(duì)列中的生存時(shí)間一旦超過設(shè)置的TTL值時(shí)就會(huì)進(jìn)到死信。文章來源:http://www.zghlxwxcb.cn/news/detail-611951.html
1、通過隊(duì)列屬性設(shè)置消息過期時(shí)間
Map<String, Object> arguments = Maps.newHashMap();
//設(shè)置消息發(fā)送到隊(duì)列中在被丟棄之前可以存活的時(shí)間,單位:毫秒
arguments.put("x-message-ttl", 5000);
//聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
2、發(fā)送消息的時(shí)候,對(duì)消息本身設(shè)定過期時(shí)間
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");//設(shè)置消息多久沒有被消費(fèi)在過期。
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, messagePostProcessor);
上面底層代碼是:文章來源地址http://www.zghlxwxcb.cn/news/detail-611951.html
AMQP.BasicProperties.Builder properties = MessageProperties.PERSISTENT_TEXT_PLAIN.builder();
properties.expiration("5000");//設(shè)置消息多久沒有被消費(fèi)在過期。
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, properties.build(), message.getBytes());
到了這里,關(guān)于Rabbitmq死信隊(duì)列及延時(shí)隊(duì)列實(shí)現(xiàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!