生產(chǎn)者將信道設(shè)置成 confirm 模式,一旦信道進(jìn)入 confirm 模式, 所有在該信道上面發(fā)布的消息都將會(huì)被指派一個(gè)唯一的 ID (從 1 開(kāi)始),一旦消息被投遞到所有匹配的隊(duì)列之后,broker就會(huì)發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息的唯一 ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了,如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會(huì)在將消息寫(xiě)入磁盤(pán)之后發(fā)出,broker 回傳給生產(chǎn)者的確認(rèn)消息中 delivery-tag 域包含了確認(rèn)消息的序列號(hào),此外 broker 也可以設(shè)置basic.ack 的 multiple 域,表示到這個(gè)序列號(hào)之前的所有消息都已經(jīng)得到了處理。
單個(gè)確認(rèn)發(fā)布 ?
import cn.hutool.core.lang.UUID;
import com.rabbitmq.client.Channel;
public class publishMessageIndividually {
private static final int MESSAGE_COUNT = 5;
public static void publishMessageIndividually() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//開(kāi)啟發(fā)布確認(rèn)
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//服務(wù)端返回 false 或超時(shí)時(shí)間內(nèi)未返回,生產(chǎn)者可以消息重發(fā)
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("消息發(fā)送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("發(fā)布" + MESSAGE_COUNT + "個(gè)單獨(dú)確認(rèn)消息,耗時(shí)" + (end - begin) +
"ms");
}
}
}
耗時(shí)
?批量確認(rèn)發(fā)布
import cn.hutool.core.lang.UUID;
import com.rabbitmq.client.Channel;
public class publishMessageBatch {
private static final int MESSAGE_COUNT = 5;
public static void publishMessageBatch() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//開(kāi)啟發(fā)布確認(rèn)
channel.confirmSelect();
//批量確認(rèn)消息大小
int batchSize = 100;
//未確認(rèn)消息個(gè)數(shù)
int outstandingMessageCount = 0;
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}
//為了確保還有剩余沒(méi)有確認(rèn)消息 再次確認(rèn)
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("發(fā)布" + MESSAGE_COUNT + "個(gè)批量確認(rèn)消息,耗時(shí)" + (end - begin) +
"ms");
}
}
public static void main(String[] args) throws Exception {
publishMessageBatch.publishMessageBatch();
}
}
?耗時(shí)
?異步確認(rèn)發(fā)布
import cn.hutool.core.lang.UUID;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class publishMessageAsync {
private static final int MESSAGE_COUNT = 5;
public static void publishMessageAsync() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//開(kāi)啟發(fā)布確認(rèn)
channel.confirmSelect();
/**
* 線程安全有序的一個(gè)哈希表,適用于高并發(fā)的情況
* 1.輕松的將序號(hào)與消息進(jìn)行關(guān)聯(lián)
* 2.輕松批量刪除條目 只要給到序列號(hào)
* 3.支持并發(fā)訪問(wèn)
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new
ConcurrentSkipListMap<>();
/**
* 確認(rèn)收到消息的一個(gè)回調(diào)
* 1.消息序列號(hào)
* 2.true 可以確認(rèn)小于等于當(dāng)前序列號(hào)的消息
* false 確認(rèn)當(dāng)前序列號(hào)消息
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于當(dāng)前序列號(hào)的未確認(rèn)消息集合 是一個(gè) map
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(sequenceNumber, true);
//清除該部分未確認(rèn)消息集合
confirmed.clear();
}else{
//只清除當(dāng)前序列號(hào)的消息
outstandingConfirms.remove(sequenceNumber);
}
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("發(fā)布的消息"+message+"未被確認(rèn),序列號(hào)"+sequenceNumber);
};
/**
* 添加一個(gè)異步確認(rèn)的監(jiān)聽(tīng)器
* 1.確認(rèn)收到消息的回調(diào)
* 2.未收到消息的回調(diào)
*/
channel.addConfirmListener(ackCallback, nackCallback);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
/**
* channel.getNextPublishSeqNo()獲取下一個(gè)消息的序列號(hào)
* 通過(guò)序列號(hào)與消息體進(jìn)行一個(gè)關(guān)聯(lián)
* 全部都是未確認(rèn)的消息體
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("發(fā)布" + MESSAGE_COUNT + "個(gè)異步確認(rèn)消息,耗時(shí)" + (end - begin) +
"ms");
}
}
public static void main(String[] args) throws Exception {
publishMessageAsync.publishMessageAsync();
}
}
耗時(shí)
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-663076.html
?以上 3 種發(fā)布確認(rèn)速度對(duì)比
單獨(dú)發(fā)布消息
批量發(fā)布消息
消息出現(xiàn)了問(wèn)題。
文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-663076.html
到了這里,關(guān)于rabbitmq的發(fā)布確認(rèn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!