???????????????????????????????????????????????????????????????????? 【 R a b b i t M Q 教程】第三章—— R a b b i t M Q ? 發(fā)布確認(rèn) \color{#FF1493}{【RabbitMQ教程】第三章 —— RabbitMQ - 發(fā)布確認(rèn)} 【RabbitMQ教程】第三章——RabbitMQ?發(fā)布確認(rèn)?? ?????????
?? 仰望天空,妳我亦是行人.?
?? 個人主頁——微風(fēng)撞見云的博客??
?? 《數(shù)據(jù)結(jié)構(gòu)與算法》專欄的文章圖文并茂??生動形象??簡單易學(xué)!歡迎大家來踩踩~??
?? 《Java學(xué)習(xí)筆記》專欄的文章是本人在Java學(xué)習(xí)中總結(jié)的一些知識點~ ??
?? 《每天一點小知識》專欄的文章可以豐富你的知識庫,滴水成河~ ??
?? 《RabbitMQ》專欄的文章是在學(xué)習(xí)尚硅谷課程時整理的筆記,方便復(fù)習(xí)鞏固~ ??
?? 希望本文能夠給讀者帶來一定的幫助~??文章粗淺,敬請批評指正!??
??RabbitMQ - 發(fā)布確認(rèn)
發(fā)布確認(rèn)邏輯
生產(chǎn)者將信道設(shè)置成 confirm 模式,一旦信道進入 confirm 模式,所有在該信道上面發(fā)布的消息都將會被指派一個唯一的 ID(從 1 開始),一旦消息被投遞到所有匹配的隊列之后,broker 就會發(fā)送一個確認(rèn)給生產(chǎn)者(包含消息的唯一 ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達目的隊列了,如果消息和隊列是可持久化的,那么確認(rèn)消息會在將消息寫入磁盤之后發(fā)出,broker 回傳給生產(chǎn)者的確認(rèn)消息中 delivery-tag 域包含了確認(rèn)消息的序列號,此外 broker 也可以設(shè)置basic.ack 的 multiple 域,表示到這個序列號之前的所有消息都已經(jīng)得到了處理。
confirm 模式最大的好處在于他是異步的,一旦發(fā)布一條消息,生產(chǎn)者應(yīng)用程序就可以在等信道返回確認(rèn)的同時繼續(xù)發(fā)送下一條消息,當(dāng)消息最終得到確認(rèn)之后,生產(chǎn)者應(yīng)用便可以通過回調(diào)方法來處理該確認(rèn)消息,如果RabbitMQ 因為自身內(nèi)部錯誤導(dǎo)致消息丟失,就會發(fā)送一條 nack 消息, 生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理該 nack 消息。
發(fā)布確認(rèn)的策略
開啟發(fā)布確認(rèn)的方法:
發(fā)布確認(rèn)默認(rèn)是沒有開啟的,如果要開啟需要調(diào)用方法 confirmSelect,每當(dāng)你要想使用發(fā)布確認(rèn),都需要在 channel 上調(diào)用該方法
//開啟發(fā)布確認(rèn)
channel.confirmSelect();
單個確認(rèn)發(fā)布
這是一種簡單的確認(rèn)方式,它是一種同步確認(rèn)發(fā)布的方式,也就是發(fā)布一個消息之后只有它被確認(rèn)發(fā)布,后續(xù)的消息才能繼續(xù)發(fā)布,waitForConfirmsOrDie(long)
這個方法只有在消息被確認(rèn)的時候才返回,如果在指定時間范圍內(nèi)這個消息沒有被確認(rèn)那么它將拋出異常。
這種確認(rèn)方式有一個最大的缺點就是:發(fā)布速度特別的慢,因為如果沒有確認(rèn)發(fā)布的消息就會阻塞所有后續(xù)消息的發(fā)布,這種方式最多提供每秒不超過數(shù)百條發(fā)布消息的吞吐量。當(dāng)然對于某些應(yīng)用程序來說這可能已經(jīng)足夠了。
/**
* 單個發(fā)送
*/
public static void publishMessageIndividually() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//隊列聲明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
//開啟發(fā)布確認(rèn)
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//服務(wù)端返回 false 或超時時間內(nèi)未返回,生產(chǎn)者可以消息重發(fā)
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("消息發(fā)送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("發(fā)布" + MESSAGE_COUNT + "個單獨確認(rèn)消息,耗時" + (end - begin) + "ms");
}
批量確認(rèn)發(fā)布
上面那種方式非常慢,與單個等待確認(rèn)消息相比,先發(fā)布一批消息然后一起確認(rèn)可以極大地提高吞吐量,當(dāng)然這種方式的缺點就是:當(dāng)發(fā)生故障導(dǎo)致發(fā)布出現(xiàn)問題時,不知道是哪個消息出 問題了,我們必須將整個批處理保存在內(nèi)存中,以記錄重要的信息而后重新發(fā)布消息。當(dāng)然這種方案仍然是同步的,也一樣阻塞消息的發(fā)布。
/**
* 批量
*/
public static void publishMessageBatch() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//隊列聲明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
//開啟發(fā)布確認(rèn)
channel.confirmSelect();
//批量確認(rèn)消息大小
int batchSize = 100;
//未確認(rèn)消息個數(shù)
int outstandingMessageCount = 0;
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}
//為了確保還有剩余沒有確認(rèn)消息 再次確認(rèn)
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("發(fā)布" + MESSAGE_COUNT + "個批量確認(rèn)消息,耗時" + (end - begin) + "ms");
}
異步確認(rèn)發(fā)布
異步確認(rèn)雖然編程邏輯比上兩個要復(fù)雜,但是性價比最高,無論是可靠性還是效率都沒得說, 他是利用回調(diào)函數(shù)來達到消息可靠性傳遞的,這個中間件也是通過函數(shù)回調(diào)來保證是否投遞成功, 下面就讓我們來詳細(xì)講解異步確認(rèn)是怎么實現(xiàn)的。
如何處理異步未確認(rèn)消息?
最好的解決的解決方案就是把未確認(rèn)的消息放到一個基于內(nèi)存的能被發(fā)布線程訪問的隊列, 比如說用 ConcurrentLinkedQueue 這個隊列在 confirm callbacks 與發(fā)布線程之間進行消息的傳遞。
以上 3 種發(fā)布確認(rèn)速度對比 :
-
單獨發(fā)布消息
同步等待確認(rèn),簡單,但吞吐量非常有限。
-
批量發(fā)布消息
批量同步等待確認(rèn),簡單,合理的吞吐量,一旦出現(xiàn)問題但很難推斷出是那條消息出現(xiàn)了問題。
-
異步處理
最佳性能和資源使用,在出現(xiàn)錯誤的情況下可以很好地控制,但是實現(xiàn)起來稍微難些
??結(jié)語
??初學(xué)一門技術(shù)時,總有些許的疑惑,別怕,它們是我們學(xué)習(xí)路上的點點繁星,幫助我們不斷成長。
??文章粗淺,希望對大家有幫助!文章來源:http://www.zghlxwxcb.cn/news/detail-482892.html
??下一篇 -->【RabbitMQ教程】第四章 —— RabbitMQ - 交換機文章來源地址http://www.zghlxwxcb.cn/news/detail-482892.html
到了這里,關(guān)于【RabbitMQ教程】第三章 —— RabbitMQ - 發(fā)布確認(rèn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!