RabbitMQ——死信隊(duì)列
死信隊(duì)列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一種重要特性,用于處理無(wú)法被消費(fèi)的消息,防止消息丟失。
死信的來(lái)源
在消息隊(duì)列中,當(dāng)消息滿足一定條件而無(wú)法被正常消費(fèi)時(shí),這些消息會(huì)被發(fā)送到死信隊(duì)列。滿足條件的情況包括但不限于:
- 消息被拒絕(
basic.reject
或basic.nack
)且不重新入隊(duì)(requeue
參數(shù)為false
)。 - 消息過(guò)期(TTL,Time-To-Live)。
- 隊(duì)列長(zhǎng)度超過(guò)限制,無(wú)法再添加數(shù)據(jù)到mq中。
生產(chǎn)者
package com.weipch.rabbitmq.dlq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import com.weipch.util.RabbitMqUtils;
/**
* @Author 方唐鏡
* @Create 2024-03-03 14:08
* @Description
*/
public class Produce {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//模擬消息過(guò)期 10s
//AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 0; i < 10; i++) {
String message = "hello world" + i;
channel.basicPublish(NORMAL_EXCHANGE, "normal-routing-key", null, message.getBytes());
}
}
}
消費(fèi)者
正常隊(duì)列:
package com.weipch.rabbitmq.dlq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.weipch.util.RabbitMqUtils;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
* @Author 方唐鏡
* @Create 2024-03-03 13:50
* @Description
*/
public class Consumer01 {
private static final String NORMAL_EXCHANGE = "normal_exchange";
private static final String DEAD_EXCHANGE = "dead_exchange";
private static final String NORMAL_QUEUE = "normal_queue";
private static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//聲明死信交換機(jī)和隊(duì)列
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
//綁定
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead-routing-key");
//聲明普通交換機(jī)和隊(duì)列
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//在正常隊(duì)列中設(shè)置死信參數(shù) 指定死信交換機(jī)和死信路由鍵
Map<String, Object> map = new HashMap<>();
map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key", "dead-routing-key");
//最大長(zhǎng)度
//map.put("x-max-length", 6);
channel.queueDeclare(NORMAL_QUEUE, false, false, false, map);
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal-routing-key");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
if (message.contains("5")){
System.out.println("Consumer01接收消息:" + message + ",此消息被拒絕");
//拒絕消息并把消息丟入死信隊(duì)列
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("Consumer01接收消息:" + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (consumerTag, e) -> {});
}
}
死信隊(duì)列:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-839926.html
package com.weipch.rabbitmq.dlq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.weipch.util.RabbitMqUtils;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
* @Author 方唐鏡
* @Create 2024-03-03 13:50
* @Description
*/
public class Consumer02 {
private static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.basicConsume(DEAD_QUEUE, true,
(consumerTag, delivery) -> System.out.println("Consumer02:" + new String(delivery.getBody(), StandardCharsets.UTF_8)),
(consumerTag, e) -> {});
}
}
生產(chǎn)者發(fā)送消息到正常隊(duì)列,而消費(fèi)者負(fù)責(zé)消費(fèi)正常隊(duì)列的消息。當(dāng)消息被消費(fèi)者拒絕并不再重新投遞時(shí),消息會(huì)被發(fā)送到死信隊(duì)列。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-839926.html
到了這里,關(guān)于RabbitMQ——死信隊(duì)列的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!