一、基本概念
1、死信定義
? ? ? ? 指的是,從隊列當(dāng)中取出來的消息,到達(dá)消費方后,因為某些原因?qū)е孪⒉]有被正常消費掉,這些沒有被后續(xù)處理的消息就是“死信”,而保存死信的隊列,就是死信隊列。
2、死信出現(xiàn)的場景舉例
? ? ? ? 為了保證訂單業(yè)務(wù)的消息數(shù)據(jù)不丟失,需要使用死信隊列機制,在消息消費發(fā)生異常的時候,將消息給投入到死信隊列當(dāng)中;
? ? ? ? ? 用戶在商城下單成功并點進(jìn)去準(zhǔn)備支付,超過指定時間未支付時,消息自動失效成為死信(消息超時情況);
3、死信的來源
? ? ? ? 消息TTL過期;
? ? ? ? 隊列已經(jīng)到達(dá)最大長度,數(shù)據(jù)無法再添加到MQ;
? ? ? ? 消息被拒絕了;
4、死信架構(gòu)圖
分析:
對于消息生產(chǎn)者而言,只需要關(guān)注將消息發(fā)送給交換機即可;
而對于普通消費者C1而言,需要關(guān)注普通交換機、普通隊列、死信交換機的相關(guān)信息,要做兩次綁定操作(普通交換機和普通隊列綁定,普通隊列和死信隊列綁定),難點在于---普通隊列怎么與死信交換機進(jìn)行綁定?
而消費者C2也是一個普通消費者,專注于死信隊列當(dāng)中消息的處理,需要關(guān)注死信交換機、死信隊列的信息,在死信交換機和死信隊列綁定之后,從隊列當(dāng)中拿到死信進(jìn)行處理;
二、代碼部分
(0)提前準(zhǔn)備工具類
封裝獲取MQ的connection方法,以及釋放資源的方法
public class AMQPUtils {
//用于獲取客戶端和MQ綁定的連接對象
public static Connection getConnection() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("/test");//用于隔離資源的虛擬主機
factory.setHost("MQServer的ip地址");
factory.setPort(5672);
factory.setUsername("zhangsan");
factory.setPassword("1234");
Connection connection = factory.newConnection();
return connection;
}
//釋放資源
public static void close(Channel channel,Connection connection) throws Exception{
channel.close();
connection.close();
}
}
(一)模擬消息超時情況
1、消息發(fā)布者
public class Provider {
public static void main(String[] args) throws Exception {
Connection connection = AMQPUtils.getConnection();
Channel channel = connection.createChannel();
//聲明一個普通交換機
String normalExchange = "normal_Exchange";
channel.exchangeDeclare(normalExchange,"direct");
String key = "zhangsan";
//設(shè)置消息的ttl時間為5s
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("5000").build();
//發(fā)布消息-發(fā)布多條消息驗證
for (int i = 1; i <= 10 ; i++) {
channel.basicPublish(normalExchange,key,properties,("message---->"+i).getBytes());
}
//釋放資源
AMQPUtils.close(channel,connection);
}
}
2、消費者C1
代碼:
public class Consumer1 {
public static void main(String[] args) throws Exception{
Connection connection = AMQPUtils.getConnection();
Channel channel = connection.createChannel();
//提前準(zhǔn)備一些名字
String normalExchange = "normal_exchange";
String deadExchange = "dead_exchange";
String normalQueue = "normal_queue";
String deadQueue = "dead_queue";
String normal_key = "zhangsan";
String dead_key = "lisi";
//聲明普通交換機
channel.exchangeDeclare(normalExchange,"direct");
//聲明死信交換機
channel.exchangeDeclare(deadExchange,"direct");
//設(shè)置普通隊列當(dāng)中需要攜帶的其他信息(死信交換機、死信隊列、路由key)
Map<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",deadExchange);
params.put("x-dead-letter-routing-key",dead_key);
//聲明普通隊列
channel.queueDeclare(normalQueue,false,false,false,params);
//聲明死信隊列
channel.queueDeclare(deadQueue,false,false,false,null);
//binding
//將普通交換機和普通隊列綁定
channel.queueBind(normalQueue,normalExchange,normal_key);
//將將死信交換機和死信隊列綁定
channel.queueBind(deadQueue,deadExchange,dead_key);
//消費消息
channel.basicConsume(normalQueue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("C1消費的消息是---->"+new String(body));
}
});
}
}
注意:開啟C1接受普通交換機的消息之后,關(guān)閉C1,讓普通隊列當(dāng)中的消息超過超時時間,成為死信,后被死信交換機路由進(jìn)入dead_queue當(dāng)中,下圖所示的就是消息超時之后進(jìn)入死信隊列:
點擊dead_queue可以查看具體的死信來源、交換機、路由key等信息;
此時:注意此時死信消息是保存在MQ當(dāng)中的;
3、消費者c2
消費者C2要去消費死信隊列當(dāng)中的消息:
public class Consumer2 {
public static void main(String[] args)throws Exception {
Connection connection = AMQPUtils.getConnection();
Channel channel = connection.createChannel();
//準(zhǔn)備一些名字
String deadExchange = "dead_exchange";
String deadQueue = "dead_queue";
String key = "lisi";
//聲明死信交換機
channel.exchangeDeclare(deadExchange,"direct");
//聲明死信隊列
channel.queueDeclare(deadQueue,false,false,false,null);
//交換機和隊列綁定
channel.queueBind(deadQueue,deadExchange,key);
//消費死信消息
channel.basicConsume(deadQueue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("C2消費了死信----->"+new String(body));
}
});
}
}
控制臺結(jié)果:
此時存儲在死信隊列當(dāng)中的消息已經(jīng)被C2消費了!
(二)模擬隊列達(dá)到最大長度
請?zhí)崆霸贛Q的控制臺上,將情況1當(dāng)中設(shè)置的隊列給刪除;
1、消息發(fā)布者
public class Provider {
public static void main(String[] args) throws Exception {
Connection connection = AMQPUtils.getConnection();
Channel channel = connection.createChannel();
//聲明一個普通交換機
String normalExchange = "normal_exchange";
channel.exchangeDeclare(normalExchange,"direct");
String key = "zhangsan";
//發(fā)布消息-發(fā)布多條消息驗證
for (int i = 1; i <= 10 ; i++) {
channel.basicPublish(normalExchange,key,null,("message---->"+i).getBytes());
}
//釋放資源
AMQPUtils.close(channel,connection);
}
}
2、消費者C1
public class Consumer1 {
public static void main(String[] args) throws Exception{
Connection connection = AMQPUtils.getConnection();
Channel channel = connection.createChannel();
//提前準(zhǔn)備一些名字
String normalExchange = "normal_exchange";
String deadExchange = "dead_exchange";
String normalQueue = "normal_queue";
String deadQueue = "dead_queue";
String normal_key = "zhangsan";
String dead_key = "lisi";
//聲明普通交換機
channel.exchangeDeclare(normalExchange,"direct");
//聲明死信交換機
channel.exchangeDeclare(deadExchange,"direct");
//設(shè)置普通隊列當(dāng)中需要攜帶的其他信息(死信交換機、死信隊列、路由key)
Map<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",deadExchange);
params.put("x-dead-letter-routing-key",dead_key);
//設(shè)置普通隊列的最大長度
params.put("x-max-length",6);
//聲明普通隊列
channel.queueDeclare(normalQueue,false,false,false,params);
//聲明死信隊列
channel.queueDeclare(deadQueue,false,false,false,null);
//binding
//將普通交換機和普通隊列綁定
channel.queueBind(normalQueue,normalExchange,normal_key);
//將將死信交換機和死信隊列綁定
channel.queueBind(deadQueue,deadExchange,dead_key);
//消費消息
channel.basicConsume(normalQueue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("C1消費的消息是---->"+new String(body));
}
});
}
}
開啟C1,然后關(guān)閉,再啟動消息生產(chǎn)者,結(jié)果:
3、消費者C2
public class Consumer2 {
public static void main(String[] args)throws Exception {
Connection connection = AMQPUtils.getConnection();
Channel channel = connection.createChannel();
//準(zhǔn)備一些名字
String deadExchange = "dead_exchange";
String deadQueue = "dead_queue";
String key = "lisi";
//聲明死信交換機
channel.exchangeDeclare(deadExchange,"direct");
//聲明死信隊列
channel.queueDeclare(deadQueue,false,false,false,null);
//交換機和隊列綁定
channel.queueBind(deadQueue,deadExchange,key);
//消費死信消息
channel.basicConsume(deadQueue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("C2消費了死信----->"+new String(body));
}
});
}
}
結(jié)果:
死信隊列當(dāng)中的消息已經(jīng)被消費了;
(三)模擬消息被拒絕
1、消息發(fā)布者
public class Provider {
public static void main(String[] args) throws Exception {
Connection connection = AMQPUtils.getConnection();
Channel channel = connection.createChannel();
//聲明一個普通交換機
String normalExchange = "normal_exchange";
channel.exchangeDeclare(normalExchange,"direct");
String key = "zhangsan";
//發(fā)布消息-發(fā)布多條消息驗證
for (int i = 1; i <= 10 ; i++) {
channel.basicPublish(normalExchange,key,null,("message---->"+i).getBytes());
}
//釋放資源
AMQPUtils.close(channel,connection);
}
}
2、消費者C1
public class Consumer1 {
public static void main(String[] args) throws Exception{
Connection connection = AMQPUtils.getConnection();
final Channel channel = connection.createChannel();
//提前準(zhǔn)備一些名字
String normalExchange = "normal_exchange";
String deadExchange = "dead_exchange";
String normalQueue = "normal_queue";
String deadQueue = "dead_queue";
String normal_key = "zhangsan";
String dead_key = "lisi";
//聲明普通交換機
channel.exchangeDeclare(normalExchange,"direct");
//聲明死信交換機
channel.exchangeDeclare(deadExchange,"direct");
//設(shè)置普通隊列當(dāng)中需要攜帶的其他信息(死信交換機、死信隊列、路由key)
Map<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",deadExchange);
params.put("x-dead-letter-routing-key",dead_key);
//聲明普通隊列
channel.queueDeclare(normalQueue,false,false,false,params);
//聲明死信隊列
channel.queueDeclare(deadQueue,false,false,false,null);
//binding
//將普通交換機和普通隊列綁定
channel.queueBind(normalQueue,normalExchange,normal_key);
//將將死信交換機和死信隊列綁定
channel.queueBind(deadQueue,deadExchange,dead_key);
//消費消息--注意關(guān)閉自動確認(rèn)
channel.basicConsume(normalQueue,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/*System.out.println("C1消費的消息是---->"+new String(body));*/
//模擬消息被拒絕--把所有消息都拒絕
channel.basicReject(envelope.getDeliveryTag(),false);
}
});
}
}
注意:先啟動C1,然后關(guān)閉,啟動消息發(fā)布者,結(jié)果如下:
這10條消息目前都保存在MQ當(dāng)中,然后再啟動C1,把消息全部拒絕掉,讓它們成為死信:
點擊dead_queue,去查看死信隊列當(dāng)中的一些信息:
3、消費者C2
public class Consumer2 {
public static void main(String[] args)throws Exception {
Connection connection = AMQPUtils.getConnection();
Channel channel = connection.createChannel();
//準(zhǔn)備一些名字
String deadExchange = "dead_exchange";
String deadQueue = "dead_queue";
String key = "lisi";
//聲明死信交換機
channel.exchangeDeclare(deadExchange,"direct");
//聲明死信隊列
channel.queueDeclare(deadQueue,false,false,false,null);
//交換機和隊列綁定
channel.queueBind(deadQueue,deadExchange,key);
//消費死信消息
channel.basicConsume(deadQueue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("C2消費了死信----->"+new String(body));
}
});
}
}
啟動消費者C2,消費死信隊列當(dāng)中的消息!
三、小結(jié)
????????死信隊列的出現(xiàn),是為了保存因為特殊原因無法被消費的消息,避免消息直接失效!這些消息通過rabbitMQ的死信隊列機制,可以保存在MQ服務(wù)的死信隊列當(dāng)中,等待被其他的消費者進(jìn)行處理!
?需要注意的是:
? ? ? ?只有針對消息的設(shè)置會放在消息發(fā)布方進(jìn)行,隊列等操作,因為發(fā)布方無法自己決定消息被路由到哪個隊列,只能決定把消息交給哪個交換機,以及給定路由規(guī)則;
? ? ? 對于消息消費方而言,需要確定交換機、消息隊列,已經(jīng)完成?交換機和隊列的綁定操作,所以針對于隊列的設(shè)置都是放在消費方完成的!文章來源:http://www.zghlxwxcb.cn/news/detail-788056.html
????????文章來源地址http://www.zghlxwxcb.cn/news/detail-788056.html
到了這里,關(guān)于rabbitMQ引入死信隊列的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!