???????????????????????????????????????????????????????????????????? 【 R a b b i t M Q 教 程 】 第 五 章 — — R a b b i t M Q ? 死 信 隊 列 \color{#FF1493}{【RabbitMQ教程】第五章 —— RabbitMQ - 死信隊列} 【RabbitMQ教程】第五章——RabbitMQ?死信隊列?? ?????????
?? 仰望天空,妳我亦是行人.?
?? 個人主頁——微風(fēng)撞見云的博客??
?? 《數(shù)據(jù)結(jié)構(gòu)與算法》專欄的文章圖文并茂??生動形象??簡單易學(xué)!歡迎大家來踩踩~??
?? 《Java學(xué)習(xí)筆記》專欄的文章是本人在Java學(xué)習(xí)中總結(jié)的一些知識點~ ??
?? 《每天一點小知識》專欄的文章可以豐富你的知識庫,滴水成河~ ??
?? 《RabbitMQ》專欄的文章是在學(xué)習(xí)尚硅谷課程時整理的筆記,方便復(fù)習(xí)鞏固~ ??
?? 希望本文能夠給讀者帶來一定的幫助~??文章粗淺,敬請批評指正!??
??RabbitMQ - 死信隊列
死信的概念
先從概念解釋上搞清楚這個定義,死信,顧名思義就是無法被消費的消息,字面意思可以這樣理 解,一般來說,producer 將消息投遞到 broker 或者直接到queue 里了,consumer 從 queue 取出消息 進(jìn)行消費,但某些時候由于特定的原因?qū)е?queue 中的某些消息無法被消費,這樣的消息如果沒有后續(xù)的處理,就變成了死信,有死信自然就有了死信隊列。
應(yīng)用場景:為了保證訂單業(yè)務(wù)的消息數(shù)據(jù)不丟失,需要使用到 RabbitMQ 的死信隊列機制,當(dāng)消息消費發(fā)生異常時,將消息投入死信隊列中。還有比如說:用戶在商城下單成功并點擊去支付后在指定時間未支付時自動失效
#死信的來源
死信的來源
-
消息 TTL 過期
TTL是Time To Live的縮寫, 也就是生存時間
-
隊列達(dá)到最大長度
隊列滿了,無法再添加數(shù)據(jù)到 mq 中
-
消息被拒絕
(basic.reject 或 basic.nack) 并且 requeue=false.
死信實戰(zhàn)
死信之 T T l
消費者 C1 代碼:
/**
* 死信隊列 - 消費者01
*/
public class Consumer01 {
//普通交換機名稱
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交換機名稱
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//聲明死信和普通交換機 類型為 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//聲明死信隊列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信隊列綁定:隊列、交換機、路由鍵(routingKey)
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常隊列綁定死信隊列信息
Map<String, Object> params = new HashMap<>();
//正常隊列設(shè)置死信交換機 參數(shù) key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常隊列設(shè)置死信 routing-key 參數(shù) key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
//正常隊列
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息" + message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
});
}
}
生產(chǎn)者代碼
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//設(shè)置消息的 TTL 時間 10s
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
//該信息是用作演示隊列個數(shù)限制
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
System.out.println("生產(chǎn)者發(fā)送消息:" + message);
}
}
}
啟動 C1 ,之后關(guān)閉消費者,模擬其接收不到消息。再啟動 Producer
消費者 C2 代碼:
以上步驟完成后,啟動 C2 消費者,它消費死信隊列里面的消息
public class Consumer02 {
//死信交換機名稱
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//聲明交換機
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//聲明隊列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收到消息" + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
}
死信之最大長度
1、消息生產(chǎn)者代碼去掉 TTL 屬性
2、C1 消費者修改以下代碼**(啟動之后關(guān)閉該消費者 模擬其接收不到消息)**
//設(shè)置正常隊列的長度限制,例如發(fā)10個,4個則為死信
params.put("x-max-length",6);
注意此時需要把原先隊列刪除 因為參數(shù)改變了
3、C2 消費者代碼不變(啟動 C2 消費者)
死信之消息被拒
1、消息生產(chǎn)者代碼同上生產(chǎn)者一致
2、C1 消費者代碼(啟動之后關(guān)閉該消費者 模擬其接收不到消息)
拒收消息 “info5”
public class Consumer01 {
//普通交換機名稱
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交換機名稱
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//聲明死信和普通交換機 類型為 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//聲明死信隊列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信隊列綁定:隊列、交換機、路由鍵(routingKey)
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常隊列綁定死信隊列信息
Map<String, Object> params = new HashMap<>();
//正常隊列設(shè)置死信交換機 參數(shù) key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常隊列設(shè)置死信 routing-key 參數(shù) key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
// //設(shè)置正常隊列的長度限制,例如發(fā)10個,4個則為死信
// params.put("x-max-length",6);
//正常隊列
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
if (message.equals("info5")) {
System.out.println("Consumer01 接收到消息" + message + "并拒絕簽收該消息");
//requeue 設(shè)置為 false 代表拒絕重新入隊 該隊列如果配置了死信交換機將發(fā)送到死信隊列中
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println("Consumer01 接收到消息" + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
//開啟手動應(yīng)答
channel.basicConsume(normalQueue, false, deliverCallback, consumerTag -> {
});
}
}
3、C2 消費者代碼不變
啟動消費者 1 然后再啟動消費者 2
??結(jié)語
??初學(xué)一門技術(shù)時,總有些許的疑惑,別怕,它們是我們學(xué)習(xí)路上的點點繁星,幫助我們不斷成長。
??文章粗淺,希望對大家有幫助!文章來源:http://www.zghlxwxcb.cn/news/detail-487167.html
??下一篇 -->【RabbitMQ教程】第四章 —— RabbitMQ - 延遲隊列文章來源地址http://www.zghlxwxcb.cn/news/detail-487167.html
到了這里,關(guān)于【RabbitMQ教程】第五章 —— RabbitMQ - 死信隊列的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!