死信隊(duì)列(DLX)
這個(gè)概念 在其他MQ產(chǎn)品里面也是有的,只不過(guò)在Rabbitmq中稍微特殊一點(diǎn)
什么叫私信隊(duì)列呢? 就是當(dāng)消息成為 dead message之后,可以重新發(fā)到另外一臺(tái)交換機(jī),這個(gè)交換機(jī)就是DLX。
注意這里的有翻譯歧義, 這里的DLX 指的是 交換機(jī) ,而不是一個(gè)隊(duì)列。
消息成為死信隊(duì)列的三種情況
-
隊(duì)列的消息長(zhǎng)度 到達(dá)限制。
-
消費(fèi)者拒收消息,
channel.basicNack(deliveryTag,true,false);
//這里拒收消息
/**
* deliveryTag 標(biāo)識(shí)
* requeue 是否打回原隊(duì)列如果為false 則進(jìn)入死信隊(duì)列
*/
channel.basicReject(deliveryTag,false);
}
- 存在消息過(guò)期設(shè)置 消息超時(shí)未消費(fèi)(就是上一篇中的TTL)
我們拿這個(gè)TTL舉例:
首先你在發(fā)送端 需要有2個(gè)隊(duì)列 一個(gè)正常隊(duì)列 一個(gè)私信隊(duì)列
然后你在創(chuàng)建延時(shí)隊(duì)列的時(shí)候 要注明 這個(gè)隊(duì)列的消息一旦超時(shí) 那么送到哪個(gè)死信交換機(jī)的隊(duì)列里面去:
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME_TTL = "boot_queue_ttl";
public static final String DEAD_EXCHANGE_NAME = "dead_topic_exchange";
public static final String DEAD_QUEUE_NAME = "dead_queue";
//正常交換機(jī)
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean("bootQueue")
public Queue bootQueue(){
Map<String, Object> map = new HashMap<>();
// 設(shè)置TTL
map.put("x-message-ttl", 20000);
// 設(shè)置死信的目的交換機(jī)
map.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
// 設(shè)置死信交給目的交換機(jī)時(shí)的路由鍵
map.put("x-dead-letter-routing-key", "boot.111");
return QueueBuilder.durable(QUEUE_NAME_TTL).withArguments(map).build();
}
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
//死信交換機(jī)
@Bean("deadExchange")
public Exchange deadExchange(){
return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).durable(true).build();
}
@Bean("deadQueue")
public Queue deadQueue(){
return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
}
@Bean
public Binding bindDeadQueueExchange(@Qualifier("deadQueue") Queue queue,@Qualifier("deadExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.*").noargs();
}
}
好了 我們發(fā)送消息 往正常隊(duì)列里面發(fā):
看超時(shí)之后 它進(jìn)來(lái)了:
·這是超時(shí)的消息進(jìn)入DLX的情況
再動(dòng)動(dòng)手 我們實(shí)踐一下 消費(fèi)者拒收之后 進(jìn)入死信的情況:
我們監(jiān)聽(tīng)正常的隊(duì)列 然后接受到消息之后
我們故意寫(xiě)一個(gè)bug 觸發(fā)exception
調(diào)用basicNack方法 打回這個(gè)消息 讓它 進(jìn)入dlx
這里千萬(wàn)要注意:basicNack第三個(gè)參數(shù)要為false 才會(huì)進(jìn)入dlx
如果為true 那它就會(huì)打回到正常隊(duì)列 然后不停的給你發(fā)
@RabbitListener(queues = "boot_queue_ttl")
public void ListenerQueue(Message message,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
Channel channel) throws IOException {
try {
System.out.println("收到消息為"+new String(message.getBody()));
System.out.println("處理業(yè)務(wù)邏輯。。");
// int i =3/0; //手動(dòng)制造一個(gè)bug 讓程序進(jìn)入異常
channel.basicAck(deliveryTag,true);
}catch (Exception e){
//拒絕簽收
//這里的第三個(gè)參數(shù)表示 如果為true 消息重回到queue broker會(huì)重新發(fā)送該消息給消費(fèi)端
System.out.println(" error 處理業(yè)務(wù)邏輯失敗 消息打回 進(jìn)入私信隊(duì)列");
channel.basicNack(deliveryTag,true,false);
}
ok我們實(shí)踐了兩種 第三種 超過(guò)隊(duì)列長(zhǎng)度的很簡(jiǎn)單 你設(shè)一個(gè)隊(duì)列長(zhǎng)度max=10 然后for循環(huán)往里面塞進(jìn)去>10 超過(guò)的就會(huì)進(jìn)入dlx 這種很簡(jiǎn)單
有興趣的好兄弟 可以自己去試一下啦
新手一定要多動(dòng)手
延遲隊(duì)列
什么是延遲隊(duì)列 就是交換機(jī)收到消息之后 它不急著發(fā) 它等。
等到你想要它發(fā)給消費(fèi)者方的時(shí)候 你再發(fā)。
假設(shè)你正在開(kāi)發(fā)一個(gè)電子商務(wù)網(wǎng)站,并且你希望為用戶提供一個(gè)訂單確認(rèn)后一段時(shí)間自動(dòng)取消的功能。當(dāng)用戶下單后,訂單將被發(fā)送到延遲隊(duì)列中,設(shè)置一個(gè)特定的延遲時(shí)間,例如30分鐘。如果在30分鐘內(nèi)用戶未支付訂單,系統(tǒng)將自動(dòng)取消該訂單。
但是問(wèn)題來(lái)了 rabbitmq 中并沒(méi)有提供現(xiàn)成的延遲隊(duì)列實(shí)現(xiàn),
需要我們自己去實(shí)現(xiàn):
怎么實(shí)現(xiàn)呢 如果你理解熟悉了 前面的TTL 和死信隊(duì)列之后
這個(gè)就非常非常簡(jiǎn)單了
看上面那個(gè)我們死信隊(duì)列里面那個(gè)例子:
發(fā)送端向正常隊(duì)列里面發(fā)數(shù)據(jù) 超時(shí)未消費(fèi)。 消息就進(jìn)入了死信隊(duì)列,
那就很簡(jiǎn)單了 你把消費(fèi)的監(jiān)聽(tīng)隊(duì)列 換成死信隊(duì)列 就ok了文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-448297.html
他就是一個(gè)延遲隊(duì)列了文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-448297.html
到了這里,關(guān)于RabbitMQ養(yǎng)成記 (10.高級(jí)特性:死信隊(duì)列,延遲隊(duì)列)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!