一、RabbitMQ消息可靠性概述
1、引出問題
我們先看一串代碼,并思考一下為什么要先入庫然后發(fā)MQ:
public int add(Merchant merchant) {
int k = merchantMapper.add(merchant);
System.out.println("aaa : "+merchant.getId());
JSONObject title = new JSONObject();
String jsonBody = JSONObject.toJSONString(merchant);
title.put("type","add");
title.put("desc","新增商戶");
title.put("content",jsonBody);
rabbitTemplate.convertAndSend(topicExchange,topicRoutingKey, title.toJSONString());
return k;
}
如果先發(fā)MQ的話,如果入庫失敗,就會(huì)導(dǎo)致MQ消息無法回滾了。今天我們就好好聊一聊RabbitMQ消息可靠投遞的問題。
2、RabbitMQ消息可靠性保證的四個(gè)環(huán)節(jié)
① 消息從生產(chǎn)者發(fā)送到Broker
生產(chǎn)者把消息發(fā)送到Broker之后,如何知道自己的消息有沒有被Broker成功接收?如果Broker不給應(yīng)答,生產(chǎn)者發(fā)送的消息也無法知道成功還是失敗。
② 消息從Exchange路由到Queue
Exchange交換機(jī)維護(hù)一個(gè)與Queue的綁定列表,它的職責(zé)是分發(fā)消息。如果交換機(jī)處理消息的分發(fā)出現(xiàn)問題怎么辦?
③ 消息存儲(chǔ)在Queue
RabbitMQ的隊(duì)列有自己的數(shù)據(jù)庫(Mnesia),它是真正用來存儲(chǔ)消息的。如果還沒有消費(fèi)者來消費(fèi),消息要一直存儲(chǔ)在隊(duì)列中。隊(duì)列中的消息有沒有丟失的可能?怎么保證消息在隊(duì)列中穩(wěn)定的存儲(chǔ)呢?
④ 消費(fèi)者訂閱Queue并消費(fèi)消息
隊(duì)列是先進(jìn)先出的,消息投遞給消費(fèi)者是一條一條投遞的,如何能保證消費(fèi)者正確地消費(fèi)了消息?
二、保證生產(chǎn)者消息發(fā)送到RabbitMQ服務(wù)器
生產(chǎn)者發(fā)送RabbitMQ消息時(shí),如果遇到了網(wǎng)絡(luò)問題或者Broker的問題(硬盤故障、磁盤寫滿等),就會(huì)導(dǎo)致消息發(fā)送失敗,生產(chǎn)者無法確定Broker是否正確地接收到消息。
在RabbitMQ中提供了兩種生產(chǎn)者確認(rèn)機(jī)制,也就是說生產(chǎn)者發(fā)送消息給RabbitMQ時(shí),服務(wù)端會(huì)通過某種方式返回一個(gè)應(yīng)答,只要生產(chǎn)者收到了這個(gè)應(yīng)答,就知道消息發(fā)送成功了。
1、服務(wù)端確認(rèn):Transaction模式
在事務(wù)模式里面,只有收到了服務(wù)端的Commit-OK指令,才能提交成功。所以可以解決生產(chǎn)者和服務(wù)端確認(rèn)的問題。但是事務(wù)模式有一個(gè)缺點(diǎn),它是阻塞的,一條消息沒有發(fā)送完畢,不能發(fā)送下一條消息,它會(huì)榨干RabbitMQ服務(wù)器的性能。所以不建議在生產(chǎn)環(huán)境使用。
(1)JavaAPI
import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 消息生產(chǎn)者,測(cè)試事務(wù)模式。發(fā)送消息的效率比較低,建議使用Confirm模式
* 參考文章:https://www.cnblogs.com/vipstone/p/9350075.html
*/
public class TransactionProducer {
private final static String QUEUE_NAME = "ORIGIN_QUEUE";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));
// 建立連接
Connection conn = factory.newConnection();
// 創(chuàng)建消息通道
Channel channel = conn.createChannel();
String msg = "Hello world, Rabbit MQ";
// 聲明隊(duì)列(默認(rèn)交換機(jī)AMQP default,Direct)
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
try {
// 開啟事務(wù)
channel.txSelect();
// 發(fā)送消息,發(fā)布了4條,但只確認(rèn)了3條
// String exchange, String routingKey, BasicProperties props, byte[] body
channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
channel.txCommit(); // 提交
channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
channel.txCommit();
channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
channel.txCommit();
channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
int i =1/0;
channel.txCommit();
channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
channel.txCommit();
System.out.println("消息發(fā)送成功");
} catch (Exception e) {
channel.txRollback(); // 回滾
System.out.println("消息已經(jīng)回滾");
}
channel.close();
conn.close();
}
}
(2)springbootAPI
rabbitTemplate.setChannelTransacted(true);
2、服務(wù)端確認(rèn):Confirm模式
Confirm模式有三種,單個(gè)確認(rèn)、批量確認(rèn)、異步確認(rèn)。
(1)JavaAPI
import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 普通確認(rèn)模式
*/
public class NormalConfirmProducer {
private final static String QUEUE_NAME = "ORIGIN_QUEUE";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));
// 建立連接
Connection conn = factory.newConnection();
// 創(chuàng)建消息通道
Channel channel = conn.createChannel();
String msg = "Hello world, Rabbit MQ ,Normal Confirm";
// 聲明隊(duì)列(默認(rèn)交換機(jī)AMQP default,Direct)
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 開啟發(fā)送方確認(rèn)模式
channel.confirmSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
// 普通Confirm,發(fā)送一條,確認(rèn)一條
if (channel.waitForConfirms()) {
System.out.println("消息發(fā)送成功" );
}else{
System.out.println("消息發(fā)送失敗");
}
channel.close();
conn.close();
}
}
import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 消息生產(chǎn)者,測(cè)試批量Confirm模式
*/
public class BatchConfirmProducer {
private final static String QUEUE_NAME = "ORIGIN_QUEUE";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));
// 建立連接
Connection conn = factory.newConnection();
// 創(chuàng)建消息通道
Channel channel = conn.createChannel();
String msg = "Hello world, Rabbit MQ ,Batch Confirm";
// 聲明隊(duì)列(默認(rèn)交換機(jī)AMQP default,Direct)
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
try {
// 開啟confirm模式
channel.confirmSelect();
for (int i = 0; i < 5; i++) {
// 發(fā)送消息
// String exchange, String routingKey, BasicProperties props, byte[] body
channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes());
}
// 批量確認(rèn)結(jié)果,ACK如果是Multiple=True,代表ACK里面的Delivery-Tag之前的消息都被確認(rèn)了
// 比如5條消息可能只收到1個(gè)ACK,也可能收到2個(gè)(抓包才看得到)
// 直到所有信息都發(fā)布,只要有一個(gè)未被Broker確認(rèn)就會(huì)IOException
channel.waitForConfirmsOrDie();
System.out.println("消息發(fā)送完畢,批量確認(rèn)成功");
} catch (Exception e) {
// 發(fā)生異常,可能需要對(duì)所有消息進(jìn)行重發(fā)
e.printStackTrace();
}
channel.close();
conn.close();
}
}
import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
/**
* 消息生產(chǎn)者,測(cè)試異步Confirm模式
* 參考文章:https://www.cnblogs.com/vipstone/p/9350075.html
*/
public class AsyncConfirmProducer {
private final static String QUEUE_NAME = "ORIGIN_QUEUE";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));
// 建立連接
Connection conn = factory.newConnection();
// 創(chuàng)建消息通道
Channel channel = conn.createChannel();
String msg = "Hello world, Rabbit MQ, Async Confirm";
// 聲明隊(duì)列(默認(rèn)交換機(jī)AMQP default,Direct)
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 用來維護(hù)未確認(rèn)消息的deliveryTag
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
// 這里不會(huì)打印所有響應(yīng)的ACK;ACK可能有多個(gè),有可能一次確認(rèn)多條,也有可能一次確認(rèn)一條
// 異步監(jiān)聽確認(rèn)和未確認(rèn)的消息
// 如果要重復(fù)運(yùn)行,先停掉之前的生產(chǎn)者,清空隊(duì)列
channel.addConfirmListener(new ConfirmListener() {
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Broker未確認(rèn)消息,標(biāo)識(shí):" + deliveryTag);
if (multiple) {
// headSet表示后面參數(shù)之前的所有元素,全部刪除
confirmSet.headSet(deliveryTag + 1L).clear();
} else {
confirmSet.remove(deliveryTag);
}
// 這里添加重發(fā)的方法
}
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 如果true表示批量執(zhí)行了deliveryTag這個(gè)值以前(小于deliveryTag的)的所有消息,如果為false的話表示單條確認(rèn)
System.out.println(String.format("Broker已確認(rèn)消息,標(biāo)識(shí):%d,多個(gè)消息:%b", deliveryTag, multiple));
if (multiple) {
// headSet表示后面參數(shù)之前的所有元素,全部刪除
confirmSet.headSet(deliveryTag + 1L).clear();
} else {
// 只移除一個(gè)元素
confirmSet.remove(deliveryTag);
}
System.out.println("未確認(rèn)的消息:"+confirmSet);
}
});
// 開啟發(fā)送方確認(rèn)模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
long nextSeqNo = channel.getNextPublishSeqNo();
// 發(fā)送消息
// String exchange, String routingKey, BasicProperties props, byte[] body
channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes());
confirmSet.add(nextSeqNo);
}
System.out.println("所有消息:"+confirmSet);
// 這里注釋掉的原因是如果先關(guān)閉了,可能收不到后面的ACK
//channel.close();
//conn.close();
}
}
(2)springbootAPI
Confirm模式是在Channel上開啟的,RabbitTemplate對(duì)Channel進(jìn)行了封裝。
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("--------收到服務(wù)端異步確認(rèn)--------");
System.out.println("received ack: "+ack);
System.out.println("cause: "+cause);
System.out.println("correlationId: "+correlationData.getId());
if (!ack) {
System.out.println("發(fā)送消息失?。? + cause);
throw new RuntimeException("發(fā)送異常:" + cause);
// 做一些回滾、重試、記錄處理
} else {
System.out.println("消息確認(rèn)成功");
}
}
});
三、保證消息從交換機(jī)路由到隊(duì)列
如果routingkey錯(cuò)誤,或者隊(duì)列不存在(但是生產(chǎn)環(huán)境基本不會(huì)出現(xiàn)這么低級(jí)的錯(cuò)誤),就會(huì)導(dǎo)致消息從交換機(jī)無法路由到隊(duì)列。
我們有兩種方式處理無法路由的消息,一種是服務(wù)端回發(fā)給生產(chǎn)者,一種是讓交換機(jī)路由到另一個(gè)備份的交換機(jī)。
1、消息回發(fā)
(1)JavaAPI
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
System.out.println("=========監(jiān)聽器收到了無法路由,被返回的消息============");
System.out.println("replyText:"+replyText);
System.out.println("exchange:"+exchange);
System.out.println("routingKey:"+routingKey);
System.out.println("message:"+new String(body));
}
});
(2)SpringbootAPI
Springboot消息回發(fā)是使用mandatory參數(shù)和ReturnListener(在SPringAMQP中是ReturnCallback)
// 為RabbitTemplate設(shè)置ReturnCallback
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
try {
System.out.println("--------收到無法路由回發(fā)的消息--------");
System.out.println("replyCode:" + replyCode);
System.out.println("replyText:" + replyText);
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
System.out.println("properties:" + message.getMessageProperties());
System.out.println("body:" + new String(message.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
});
2、消息路由到備份交換機(jī)
在創(chuàng)建交換機(jī)時(shí),從屬性中指定備份交換機(jī)。
(1)JavaAPI
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).
contentEncoding("UTF-8").build();
// 備份交換機(jī)
channel.exchangeDeclare("ALTERNATE_EXCHANGE","topic", false, false, false, null);
channel.queueDeclare("ALTERNATE_QUEUE", false, false, false, null);
channel.queueBind("ALTERNATE_QUEUE","ALTERNATE_EXCHANGE","#");
// 在聲明交換機(jī)的時(shí)候指定備份交換機(jī)
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.put("alternate-exchange","ALTERNATE_EXCHANGE");
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);
// 發(fā)送到了默認(rèn)的交換機(jī)上,由于沒有任何隊(duì)列使用這個(gè)關(guān)鍵字跟交換機(jī)綁定,所以會(huì)被退回
// 第三個(gè)參數(shù)是設(shè)置的mandatory,如果mandatory是false,消息也會(huì)被直接丟棄
channel.basicPublish("TEST_EXCHANGE","error.routingKey",true, properties,"只為更好的你".getBytes());
四、保證消息在隊(duì)列中可靠存儲(chǔ)
如果消息一直沒有被消費(fèi)者消費(fèi),消息會(huì)一直存儲(chǔ)在隊(duì)列中,并且隊(duì)列中的數(shù)據(jù)存儲(chǔ)在數(shù)據(jù)庫中。
如果RabbitMQ服務(wù)或者硬件發(fā)生了故障,比如系統(tǒng)宕機(jī)、重啟、關(guān)閉等等,可能會(huì)導(dǎo)致內(nèi)存中的消息丟失,所以我們要把消息本身和元數(shù)據(jù)(隊(duì)列、交換機(jī)、綁定)都保存到磁盤中。
1、隊(duì)列持久化
聲明隊(duì)列時(shí)可以設(shè)置幾個(gè)重要的參數(shù):
// 聲明隊(duì)列
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// springboot中
@Bean("reliableQueue")
public Queue queue() {
// public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
return new Queue("RELIABLE_QUEUE", true, false, false, new HashMap<>());
}
參數(shù)解析:
durable:沒有持久化的隊(duì)列,保存在內(nèi)存中,服務(wù)重啟后隊(duì)列和消息都會(huì)丟失。設(shè)置為true可以保證消息持久化
exclusive:排他性隊(duì)列的特點(diǎn)是:只對(duì)首次聲明它的連接(Connection)可見;會(huì)在其連接斷開的時(shí)候自動(dòng)刪除。
autoDelete:沒有消費(fèi)者連接的時(shí)候,自動(dòng)刪除。
2、交換機(jī)持久化
聲明交換機(jī)時(shí)可以設(shè)置這幾個(gè)重要的參數(shù),和隊(duì)列類似:
// 聲明交換機(jī)
// String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
channel.exchangeDeclare(EXCHANGE_NAME,"direct",false, false, null);
// Springboot中
@Bean("directExchange")
public DirectExchange exchange() {
// public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
return new DirectExchange("RELIABLE_EXCHANGE", true, false, new HashMap<>());
}
3、消息持久化
在生產(chǎn)者發(fā)送消息時(shí),可以指定消息的配置:
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("name", "gupao");
headers.put("level", "top");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 2代表持久化
.contentEncoding("UTF-8") // 編碼
.expiration("10000") // TTL,過期時(shí)間
.headers(headers) // 自定義屬性
.priority(5) // 優(yōu)先級(jí),默認(rèn)為5,配合隊(duì)列的 x-max-priority 屬性使用
.messageId(String.valueOf(UUID.randomUUID()))
.build();
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());
// springboot中
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
Message message = new Message("一條正常的消息".getBytes(), messageProperties);
4、搭建集群
如果只有一個(gè)RabbitMQ節(jié)點(diǎn),即使交換機(jī)、隊(duì)列、消息做了持久化,如果服務(wù)崩潰或者硬件故障,RabbitMQ的服務(wù)一樣是不可以的。
所以為了提高M(jìn)Q服務(wù)的可用性,保證消息的傳輸,我們需要有多個(gè)RabbitMQ的節(jié)點(diǎn)。
RabbitMQ集群搭建與高可用實(shí)現(xiàn)
五、保證消費(fèi)者成功消費(fèi)消息
如果消費(fèi)者收到消息后沒來得及處理或者發(fā)生了一場(chǎng),就會(huì)導(dǎo)致消費(fèi)失敗。RabbitMQ提供了消費(fèi)者的消息確認(rèn)機(jī)制(message acknowledgement),消費(fèi)者可以自動(dòng)或者手動(dòng)地發(fā)送ACK給服務(wù)端。
如果消費(fèi)者拿到消息沒有ACK會(huì)怎么樣?
沒有收到ACK的消息,消費(fèi)者斷開連接之后,RabbitMQ會(huì)把這條消息發(fā)送給其他消費(fèi)者。如果沒有其他消費(fèi)者,消費(fèi)者重啟后會(huì)重新消費(fèi)這條消息,重復(fù)執(zhí)行業(yè)務(wù)邏輯。
1、自動(dòng)ACK
默認(rèn)就是自動(dòng)ACK。消費(fèi)者收到消息的時(shí)候
就會(huì)自動(dòng)發(fā)送ACK,而不是方法執(zhí)行成功的時(shí)候發(fā)送ACK。這種情況RabbitMQ只會(huì)關(guān)心消費(fèi)者是否接收到了消息,對(duì)消息的處理結(jié)果是不關(guān)心的,所以通常來說我們會(huì)選擇手動(dòng)ACK。
// 創(chuàng)建消費(fèi)者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("Received message : '" + msg + "'");
System.out.println("consumerTag : " + consumerTag );
System.out.println("deliveryTag : " + envelope.getDeliveryTag() );
}
};
// 開始獲取消息 autoAck參數(shù)為true代表自動(dòng)ACK
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, true, consumer);
同樣的,在Springboot中如果沒有特殊配置,默認(rèn)的就是自動(dòng)ACK。
2、手動(dòng)ACK
// 創(chuàng)建消費(fèi)者,并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("Received message : '" + msg + "'");
if (msg.contains("拒收")){
// 拒絕消息
// requeue:是否重新入隊(duì)列,true:是,會(huì)再發(fā)給其他消費(fèi)者;false:直接丟棄,相當(dāng)于告訴隊(duì)列可以直接刪除掉
// TODO 如果只有這一個(gè)消費(fèi)者,requeue 為true 的時(shí)候會(huì)造成消息重復(fù)消費(fèi)
// basicReject(long deliveryTag, boolean requeue)
channel.basicReject(envelope.getDeliveryTag(), false);
} else if (msg.contains("異常")){
// 批量拒絕
// requeue:是否重新入隊(duì)列
// TODO 如果只有這一個(gè)消費(fèi)者,requeue 為true 的時(shí)候會(huì)造成消息重復(fù)消費(fèi)
// basicNack(long deliveryTag, boolean multiple, boolean requeue)
channel.basicNack(envelope.getDeliveryTag(), true, false);
} else {
// 手工應(yīng)答
// 如果不應(yīng)答,隊(duì)列中的消息會(huì)一直存在,重新連接的時(shí)候會(huì)重復(fù)消費(fèi)
// basicAck(long deliveryTag, boolean multiple)
channel.basicAck(envelope.getDeliveryTag(), true);
}
}
};
// 開始獲取消息,注意這里開啟了手工應(yīng)答
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, false, consumer);
在Springboot中,可以這樣配置應(yīng)答方式:
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
或者在SimpleMessageListenerContainer或者SimpleRabbitListenerContainerFactory中這樣設(shè)置:
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
消費(fèi)者中獲取到Channel對(duì)象,就可以進(jìn)行應(yīng)答了:
@Component
@PropertySource("classpath:mq.properties")
@RabbitListener(queues = "${com.queue}", containerFactory="rabbitListenerContainerFactory")
public class SecondConsumer {
@RabbitHandler
public void process(String msgContent,Channel channel, Message message) throws IOException {
System.out.println("Second Queue received msg : " + msgContent );
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
注意這個(gè)枚舉值:
NONE:自動(dòng)ACK
MANUAL:手動(dòng)ACK
AUTO:如果方法未拋出異常,則發(fā)送ACK。如果方法拋出異常,并且不是AmqpRejectAndDontRequeueException則發(fā)送nack,并且重新入隊(duì)。如果拋出異常是AmqpRejectAndDontRequeueException則發(fā)送nack并不會(huì)重新入隊(duì)。
注意!拒絕消息時(shí)如果requeue參數(shù)設(shè)置為true,可以把這條消息重新存入隊(duì)列,以便發(fā)給下一個(gè)消費(fèi)者處理,如果只有一個(gè)消費(fèi)者時(shí),這種方式可能會(huì)出現(xiàn)無限循環(huán)重復(fù)消費(fèi)的情況。
六、保證消息一致之消費(fèi)者回調(diào)
消費(fèi)者成功消費(fèi)了消息之后,如何告知生產(chǎn)者該消息已經(jīng)被成功消費(fèi)了?
雖然說使用MQ的目的之一是解耦,但是某些一致性要求很高的場(chǎng)景,比如說金融業(yè)務(wù),還是很有必要通知生產(chǎn)者的。
1、調(diào)用生產(chǎn)者API
例如,提單系統(tǒng)給其他系統(tǒng)發(fā)送了保險(xiǎn)消息后,其他系統(tǒng)必須在處理完消息之后調(diào)用提單系統(tǒng)提供的API,來修改提單系統(tǒng)中這筆數(shù)據(jù)的狀態(tài)。只要API沒有被調(diào)用,數(shù)據(jù)狀態(tài)沒有被修改,提單系統(tǒng)就認(rèn)為下游系統(tǒng)沒有收到這條消息。
但是這種方式又從解耦的狀態(tài)變成了耦合狀態(tài),還是需要根據(jù)實(shí)際情況來判斷是否要采用這種方式。
2、發(fā)送響應(yīng)消息給生產(chǎn)者
例如商業(yè)銀行與人民銀行二代支付系統(tǒng)(使用IBM MQ),無論是人行收到了商業(yè)銀行的消息,還是商業(yè)銀行收到了人行的消息,都必須發(fā)送一條響應(yīng)消息(叫做回執(zhí)報(bào)文)。
整個(gè)通信的流程設(shè)計(jì)非常復(fù)雜,但是對(duì)于金融場(chǎng)景下的消息可靠性保證,是很有用的。
七、保證消息一致性之補(bǔ)償機(jī)制
如果消費(fèi)者消費(fèi)了消息之后,一直遲遲沒有通知給生產(chǎn)者,我們?nèi)绾沃老M(fèi)者已經(jīng)成功消費(fèi)了消息?
就像是銀行A給銀行B發(fā)起了一筆轉(zhuǎn)賬,銀行B一直沒有給銀行A回調(diào),此時(shí)我們?nèi)绾未_定銀行B確定是已經(jīng)處理了該消息?
此時(shí)生產(chǎn)者與消費(fèi)者之間應(yīng)該約定一個(gè)超時(shí)時(shí)間,對(duì)于超出這個(gè)時(shí)間沒有得到響應(yīng)的消息,才確定為消費(fèi)失敗,比如說5分鐘。
1、消息結(jié)果查證
假如說生產(chǎn)者5分鐘內(nèi)沒有收到消費(fèi)者消費(fèi)成功的回調(diào),生產(chǎn)者可以主動(dòng)發(fā)起一次結(jié)果查證,通過業(yè)務(wù)要素或者唯一流水號(hào),查證該業(yè)務(wù)在消費(fèi)者是否正常消費(fèi)。
2、消息重發(fā)
假如說消息一直沒有結(jié)果,就需要考慮消息重發(fā)了。
可以啟動(dòng)一個(gè)定時(shí)任務(wù),比如30秒跑一次,查詢業(yè)務(wù)表業(yè)務(wù)狀態(tài)是中間狀態(tài)的記錄,查詢出來,構(gòu)建MQ消息,重新發(fā)送。也可以單獨(dú)設(shè)計(jì)一張消息表,把本系統(tǒng)所有發(fā)送出去的消息全部異步登記,狀態(tài)是未回復(fù)的消息,進(jìn)行重發(fā)(這種方式會(huì)對(duì)數(shù)據(jù)庫性能造成一定損耗)。
重發(fā)機(jī)制也不可能一直重復(fù)發(fā),如果消費(fèi)者確實(shí)是有bug或者其他問題,如果一直重復(fù)發(fā)送會(huì)導(dǎo)致死循環(huán)了。我們可以設(shè)置一個(gè)衰減機(jī)制,第一次間隔一分鐘,第二次間隔2分鐘,最終發(fā)送三次,三次過后如果還沒有收到回復(fù),就需要將消息設(shè)置為特殊狀態(tài),進(jìn)行人工干預(yù)。
3、消息冪等性
消息重發(fā)就意味著要處理消息的冪等。
什么是服務(wù)的冪等?為什么要實(shí)現(xiàn)冪等?
接口的冪等性——詳細(xì)談?wù)劷涌诘膬绲燃唇鉀Q方案
4、最終一致性
在一些一致性很強(qiáng)的接口調(diào)用中,比如說轉(zhuǎn)賬操作,通常會(huì)在一天的業(yè)務(wù)結(jié)束之后,第二天營(yíng)業(yè)之前,生產(chǎn)者和消費(fèi)者之間會(huì)進(jìn)行一次消息對(duì)賬。生成一個(gè)對(duì)賬文件,兩者分別解析對(duì)方的文件進(jìn)行對(duì)賬,如果確實(shí)有消息不一致的情況,會(huì)通過短信或者郵件的方式通知業(yè)務(wù)人員進(jìn)行手動(dòng)處理,要么把錢退回,要么把錢補(bǔ)上。
八、消息的順序性
在RabbitMQ中,一個(gè)隊(duì)列有多個(gè)消費(fèi)者時(shí),由于不同的消費(fèi)者消費(fèi)消息的速度是不一樣的,順序無法保證。只有一個(gè)隊(duì)列僅有一個(gè)消費(fèi)者的情況才能保證順序消費(fèi)(不同的業(yè)務(wù)消息發(fā)送到不同的專用隊(duì)列)。文章來源:http://www.zghlxwxcb.cn/news/detail-504697.html
除非負(fù)載的場(chǎng)景,不要用多個(gè)消費(fèi)者消費(fèi)消息,可以保證消息的順序性。文章來源地址http://www.zghlxwxcb.cn/news/detail-504697.html
到了這里,關(guān)于RabbitMQ保證消息的可靠投遞,Java實(shí)現(xiàn)RabbitMQ消息的可靠投遞,Springboot實(shí)現(xiàn)RabbitMQ消息的可靠投遞的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!