先自我介紹一下,小編浙江大學(xué)畢業(yè),去過(guò)華為、字節(jié)跳動(dòng)等大廠,目前阿里P7
深知大多數(shù)程序員,想要提升技能,往往是自己摸索成長(zhǎng),但自己不成體系的自學(xué)效果低效又漫長(zhǎng),而且極易碰到天花板技術(shù)停滯不前!
因此收集整理了一份《2024年最新大數(shù)據(jù)全套學(xué)習(xí)資料》,初衷也很簡(jiǎn)單,就是希望能夠幫助到想自學(xué)提升又不知道該從何學(xué)起的朋友。
既有適合小白學(xué)習(xí)的零基礎(chǔ)資料,也有適合3年以上經(jīng)驗(yàn)的小伙伴深入學(xué)習(xí)提升的進(jìn)階課程,涵蓋了95%以上大數(shù)據(jù)知識(shí)點(diǎn),真正體系化!
由于文件比較多,這里只是將部分目錄截圖出來(lái),全套包含大廠面經(jīng)、學(xué)習(xí)筆記、源碼講義、實(shí)戰(zhàn)項(xiàng)目、大綱路線、講解視頻,并且后續(xù)會(huì)持續(xù)更新
如果你需要這些資料,可以添加V獲?。簐ip204888 (備注大數(shù)據(jù))
正文
首先讓我們來(lái)了解一下,在消息隊(duì)列中,消息從生產(chǎn)者發(fā)送到交換機(jī),再到隊(duì)列,最后到消費(fèi)者,有哪些情況會(huì)導(dǎo)致消息的丟失?
-
發(fā)送時(shí)丟失:
- 生產(chǎn)者發(fā)送的消息未送達(dá)交換機(jī);
- 消息到達(dá)交換機(jī)后未到達(dá)隊(duì)列;
-
MQ 宕機(jī),隊(duì)列中的消息會(huì)丟失;
-
消費(fèi)者接收到消息后未消費(fèi)就宕機(jī)了。
確保消息隊(duì)列的可靠性是分布式系統(tǒng)中不可或缺的一部分,因此我們需要采取措施來(lái)應(yīng)對(duì)這些挑戰(zhàn)。為了解決上述消息可靠性問(wèn)題,RabbitMQ提供了一系列的機(jī)制和最佳實(shí)踐,以確保消息在整個(gè)傳遞過(guò)程中得到妥善處理和保護(hù)。
本文將深入探討如何應(yīng)對(duì)這些挑戰(zhàn),介紹消息隊(duì)列中的關(guān)鍵概念,并詳細(xì)討論 RabbitMQ 提供的解決方案,包括生產(chǎn)者消息的確認(rèn)、消息的持久化、消費(fèi)者消息的確認(rèn)以及消息消費(fèi)失敗的重試機(jī)制。這些措施將有助于確保消息隊(duì)列在應(yīng)用程序中的可靠性和穩(wěn)定性。
一、生產(chǎn)者消息的確認(rèn)
1.1 生產(chǎn)者確認(rèn)機(jī)制
RabbitMQ 提供了 publisher confirm
機(jī)制,這是一種用于解決消息發(fā)送過(guò)程中可能出現(xiàn)的丟失問(wèn)題的機(jī)制。當(dāng)消息發(fā)送到 RabbitMQ 后,系統(tǒng)會(huì)返回一個(gè)結(jié)果給消息的發(fā)送者,以指示消息的處理狀態(tài)。這個(gè)結(jié)果有兩種可能的值:
-
publisher-confirm,發(fā)送者確認(rèn):
- 消息成功投遞到交換機(jī),系統(tǒng)返回
ack
(確認(rèn))。 - 消息未能成功投遞到交換機(jī),系統(tǒng)返回
nack
(未確認(rèn))。
- 消息成功投遞到交換機(jī),系統(tǒng)返回
-
publisher-return,發(fā)送者回執(zhí):
- 消息成功投遞到交換機(jī),但是沒(méi)有成功路由到隊(duì)列,系統(tǒng)返回
ACK
,同時(shí)提供路由失敗的原因。
- 消息成功投遞到交換機(jī),但是沒(méi)有成功路由到隊(duì)列,系統(tǒng)返回
這個(gè)確認(rèn)機(jī)制的目的是確保消息在發(fā)送到消息隊(duì)列后,發(fā)送者能夠獲得有關(guān)消息處理狀態(tài)的明確反饋,從而可以采取適當(dāng)?shù)拇胧?,例如重發(fā)消息或記錄失敗信息。
需要注意的是,為了實(shí)現(xiàn)這一機(jī)制,需要為每條消息設(shè)置一個(gè)全局唯一的標(biāo)識(shí)符,以便區(qū)分不同的消息,避免在確認(rèn)過(guò)程中出現(xiàn)沖突。
例如下圖所示:
確保消息生產(chǎn)者能夠獲得有關(guān)消息狀態(tài)的反饋是確保消息可靠性的關(guān)鍵一步,因?yàn)樗兄诮鉀Q消息可能在發(fā)送期間丟失的問(wèn)題。這是構(gòu)建可靠的消息隊(duì)列系統(tǒng)中的重要組成部分。
1.2 實(shí)現(xiàn)生產(chǎn)者消息的確認(rèn)
下面將通過(guò)一個(gè) Java 的 Spring Boot 項(xiàng)目來(lái)演示如何實(shí)現(xiàn)生產(chǎn)者消息的確認(rèn)。這個(gè)項(xiàng)目的結(jié)構(gòu)如下:
這個(gè)項(xiàng)目有兩個(gè)模塊,其中 consumer
負(fù)責(zé)對(duì)消息的消費(fèi),而 publisher
負(fù)責(zé)發(fā)送消息。下面是在 publisher
模塊中實(shí)現(xiàn)消息確認(rèn)的具體步驟:
- 在
publisher
服務(wù)中的application.yml
文件中添加如下配置:
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
對(duì)這個(gè)配置的詳細(xì)說(shuō)明:
-
publish-confirm-type
:開(kāi)啟publisher-confirm
功能,這里支持兩種類型:-
simple
:同步等待confirm
結(jié)果,直到超時(shí); -
correlated
:異步回調(diào),定義ConfirmCallback
,MQ返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallback
。
-
-
publish-returns
:開(kāi)啟publish-return
功能,同樣是基于callback
機(jī)制,不過(guò)是定義ReturnCallback
; -
template.mandatory
:定義消息路由失敗時(shí)的策略。true
,則調(diào)用ReturnCallback
;false
,則直接丟棄消息。
- 給
RabbitTemplate
配置ReturnCallback
:
@Configuration
@Slf4j
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 獲取 RabbitTemplate 對(duì)象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置 ReturnCallBack
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 記錄日志
log.error("消息發(fā)送到隊(duì)列失敗,響應(yīng)碼:{},失敗原因:{},交換機(jī):{},路由 Key:{},消息:{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有需要,接下來(lái)可以重發(fā)消息,或者執(zhí)行其他通知邏輯
});
}
}
由于每個(gè) RabbitTemplate
只能配置一個(gè) ReturnCallback
,并且 RabbitTemplate
在Spring 中是一個(gè)全局對(duì)象,因此需要在項(xiàng)目啟動(dòng)過(guò)程中配置。
上述代碼就是一個(gè) Spring Boot 的配置類,通常用于在項(xiàng)目啟動(dòng)時(shí)配置一些全局的設(shè)置。在這個(gè)配置類中,實(shí)現(xiàn)了 ApplicationContextAware
接口,用于獲取 Spring 應(yīng)用上下文(ApplicationContext
)對(duì)象。主要作用是配置 RabbitMQ 的 ReturnCallback
,以處理消息發(fā)送到隊(duì)列失敗的情況。
- 發(fā)送消息,指定消息的 ID以及消息的
ConfirmCallback
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
String routingKey = "simple.test";
// 1. 準(zhǔn)備消息
String message = "hello, spring amqp!";
// 2. 準(zhǔn)備 CorrelationDate
// 2.1.消息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 2.2.準(zhǔn)備 ConfirmCallback
correlationData.getFuture().addCallback(confirm -> {
// 消息發(fā)送成功
// 判斷結(jié)果
if(confirm != null && confirm.isAck()){
// ACK
log.debug("消息投遞到交換機(jī)成功!消息 ID: {}", correlationData.getId());
} else {
// NACK
log.error("消息投遞到交換機(jī)失敗!消息 ID: {}", correlationData.getId());
}
}, throwable -> {
// 發(fā)送失敗
// 記錄日志
log.error("消息發(fā)送失??!", throwable);
// 重發(fā)消息...
});
// 3. 發(fā)送消息
rabbitTemplate.convertAndSend("amq.topic", routingKey, message, correlationData);
}
這是一個(gè) Java 測(cè)試方法,用于發(fā)送消息到 RabbitMQ 隊(duì)列,并指定消息的 ID 以及 ConfirmCallback
(確認(rèn)回調(diào))。以下是對(duì)這段代碼的詳細(xì)解釋:
-
testSendMessage2SimpleQueue
: 這是一個(gè)測(cè)試方法,用于演示如何發(fā)送消息到名為 “simple.test” 的 RabbitMQ 隊(duì)列。 -
String routingKey = "simple.test";
: 定義了消息的路由鍵,這是用于將消息路由到特定隊(duì)列的關(guān)鍵。 -
準(zhǔn)備消息:將要發(fā)送的消息內(nèi)容存儲(chǔ)在
message
變量中。 -
準(zhǔn)備
CorrelationData
:-
CorrelationData
用于關(guān)聯(lián)消息的 ID。 - 使用
UUID.randomUUID().toString()
生成一個(gè)全局唯一的消息 ID。
-
-
準(zhǔn)備
ConfirmCallback
:-
CorrelationData.getFuture().addCallback(confirm -> { ... }, throwable -> { ... })
定義了ConfirmCallback
,該回調(diào)會(huì)在消息的發(fā)送狀態(tài)發(fā)生變化時(shí)觸發(fā)。 - 在
ConfirmCallback
中,判斷了消息是否成功投遞到交換機(jī):- 如果
confirm
不為 null 且confirm.isAck()
為true
,則表示消息成功到達(dá)交換機(jī),記錄一條成功的日志。 - 否則,如果消息未成功到達(dá)交換機(jī),則記錄一條失敗的日志。
- 如果
- 在
throwable
回調(diào)中,處理了發(fā)送失敗的情況,記錄了失敗的日志,可以在這里添加重發(fā)消息或其他失敗處理邏輯。
-
-
發(fā)送消息:
- 使用
rabbitTemplate.convertAndSend("amq.topic", routingKey, message, correlationData);
發(fā)送消息到 RabbitMQ。 - 參數(shù)包括交換機(jī)名稱、路由鍵、消息內(nèi)容和關(guān)聯(lián)的
CorrelationData
。
- 使用
這段代碼演示了如何發(fā)送消息并在消息狀態(tài)變化時(shí)使用 ConfirmCallback
處理消息的確認(rèn)情況。通過(guò)關(guān)聯(lián)消息 ID 和 ConfirmCallback
,可以確保消息的可靠性,根據(jù)確認(rèn)情況采取適當(dāng)?shù)拇胧?/p>
1.3 驗(yàn)證生產(chǎn)者消息的確認(rèn)
下面通過(guò)可以運(yùn)行上述測(cè)試代碼來(lái)查看生產(chǎn)者的消息確認(rèn)情況:
- 正常發(fā)送消息
直接執(zhí)行測(cè)試方法,可以發(fā)現(xiàn)消息成功投遞到交換機(jī):
- 發(fā)送消息失敗
此時(shí),將交換機(jī)的名稱改成一個(gè)錯(cuò)誤不存在的:然后再次執(zhí)行測(cè)試方法:
發(fā)現(xiàn)此時(shí)消息投遞到交換機(jī)失敗,說(shuō)明此時(shí)返回的是 NACK
,并且提示了錯(cuò)誤的原因是找不到名為 aamq.topic
的交換機(jī)。
- 成功發(fā)送消息,但是路由失敗
此時(shí)將交換機(jī)的名稱修改回來(lái),但是將路由 Key 修改成錯(cuò)誤的:
然后執(zhí)行測(cè)試方法:
通過(guò)輸出的日志可以發(fā)現(xiàn),消息成功投遞到了交換機(jī),但是由于路由 Key 不正確,導(dǎo)致路由不到
simple,queue
,從而觸發(fā)調(diào)用了上文配置的ReturnCallback
。
二、消息的持久化
在通過(guò)上文的生產(chǎn)者消息確認(rèn)機(jī)制之后,確保了消息能夠正確的發(fā)送到隊(duì)列中,但是這并不意味著消息就安全了。因?yàn)?RabbitMQ 默認(rèn)是內(nèi)存儲(chǔ)存的,如果出現(xiàn)了 RabbitMQ 宕機(jī)的情況,那么此時(shí)隊(duì)列中的消息還是會(huì)丟失。要確保消息能夠真正的安全,我們還需要實(shí)現(xiàn)消息的持久化。
2.1 演示消息的丟失
例如,現(xiàn)在 simple.queue
中存在 3 條消息:
這些消息是通過(guò) RabbitMQ 自帶的交換機(jī) amp.topic
進(jìn)行轉(zhuǎn)發(fā)的:
然后我們重啟一下 RabbitMQ 服務(wù),看一看隊(duì)列中的消息是否還存在:
此時(shí)我們重新服務(wù) RabbitMQ 的控制臺(tái),發(fā)現(xiàn)連 simple.queue
都消失了:
但是RabbitMQ自帶的 amp.topic
交換機(jī)還存在:
說(shuō)明,這個(gè)交換機(jī)是持久化儲(chǔ)存的,如果仔細(xì)觀察可以發(fā)現(xiàn),這些所有的交換機(jī)的
Features
都帶有一個(gè) D
,即持久化 Durable。
因此要讓我們自己創(chuàng)建的隊(duì)列或者交換機(jī)也能持久存在,就可以否選上 Durable 這個(gè)選項(xiàng):
2.2 聲明持久化的交換機(jī)和隊(duì)列
通過(guò)上文我們知道了可以在 RabbitMQ 的控制臺(tái)創(chuàng)建交換機(jī)和隊(duì)列的時(shí)候可以勾選 Durable 來(lái)達(dá)到持久化的目的,但是如果使用代碼來(lái)創(chuàng)建持久化的交換機(jī)和隊(duì)列呢?下面我將使用 Java 代碼來(lái)演示這個(gè)過(guò)程:
由于消費(fèi)者comsumer
在啟動(dòng)的時(shí)候可以幫我們創(chuàng)建交換機(jī)和隊(duì)列,因此將交換機(jī)和隊(duì)列的聲明交給 consumer
來(lái)完成。
- 聲明持久化的交換機(jī)
@Configuration
public class CommonConfig {
@Bean
public DirectExchange simpleDirect(){
// DirectExchange的構(gòu)造方法有三個(gè)參數(shù):交換機(jī)名稱、是否持久化、當(dāng)沒(méi)有 queue 與其綁定時(shí)是否自動(dòng)刪除
return new DirectExchange("simple.direct", true, false);
}
}
- 聲明持久化的隊(duì)列持久化
@Configuration
public class CommonConfig {
// ...
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder構(gòu)建隊(duì)列,其中使用 durable 方法就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
}
當(dāng)完成了上面兩步之后,我們可以啟動(dòng) consumer
服務(wù):
此時(shí),我們發(fā)現(xiàn)成功創(chuàng)建了simple.direct
交換機(jī)和 simple.queue
隊(duì)列,并且它們都是持久的。然后停止consumer
服務(wù),在 RabbitMQ 的控制臺(tái)中向 simple.queue
添加一條消息:
然后再次重啟 RabbitMQ 服務(wù),發(fā)現(xiàn)剛才創(chuàng)建的交換機(jī)和隊(duì)列都還在,但是消息卻沒(méi)有了:因?yàn)槲覄偛盘砑拥氖欠浅志没南ⅲ?br>
2.3 發(fā)送持久化的消息
同樣,在控制臺(tái)添加消息的時(shí)候可以設(shè)置消息的持久化和非持久化,下面讓我來(lái)演示然后在使用 Java 代碼發(fā)送持久化的消息:
@Test
public void testDurableMessage() {
// 1. 準(zhǔn)備消息
Message message = MessageBuilder.withBody("hello, simple.queue".getBytes(StandardCharsets.UTF\_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
// 2. 發(fā)送消息
rabbitTemplate.convertAndSend("simple.queue", message);
}
在發(fā)送持久化的消息需要使用MessageBuilder
來(lái)構(gòu)建消息,其中withBody
用于指定消息體;setDeliveryMode
用來(lái)設(shè)置消息的發(fā)送類型,可以是持久化的,也可以是非持久化的;build
與構(gòu)建消息。
完成上述代碼之后,我們可以執(zhí)行這個(gè)測(cè)試方法:
查看 RabbitMQ 的控制臺(tái),發(fā)現(xiàn)成功發(fā)送了消息,并且其中的 delivery_mode
為 2,代表的就是持久化:
再次重啟 RabbitMQ 服務(wù):
此時(shí)發(fā)現(xiàn)剛才的消息并沒(méi)有丟失,至此我們就完成了持久化消息的發(fā)送,進(jìn)一步確保了消息的可靠性。另外,其實(shí)在使用 Spring AMQP 創(chuàng)建的交換機(jī),隊(duì)列和發(fā)送的消息都是持久化的。
三、消費(fèi)者消息的確認(rèn)
網(wǎng)上學(xué)習(xí)資料一大堆,但如果學(xué)到的知識(shí)不成體系,遇到問(wèn)題時(shí)只是淺嘗輒止,不再深入研究,那么很難做到真正的技術(shù)提升。
需要這份系統(tǒng)化的資料的朋友,可以添加V獲?。簐ip204888 (備注大數(shù)據(jù))
一個(gè)人可以走的很快,但一群人才能走的更遠(yuǎn)!不論你是正從事IT行業(yè)的老鳥(niǎo)或是對(duì)IT行業(yè)感興趣的新人,都?xì)g迎加入我們的的圈子(技術(shù)交流、學(xué)習(xí)資源、職場(chǎng)吐槽、大廠內(nèi)推、面試輔導(dǎo)),讓我們一起學(xué)習(xí)成長(zhǎng)!文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-858426.html
此時(shí)發(fā)現(xiàn)剛才的消息并沒(méi)有丟失,至此我們就完成了持久化消息的發(fā)送,進(jìn)一步確保了消息的可靠性。另外,其實(shí)在使用 Spring AMQP 創(chuàng)建的交換機(jī),隊(duì)列和發(fā)送的消息都是持久化的。
三、消費(fèi)者消息的確認(rèn)
網(wǎng)上學(xué)習(xí)資料一大堆,但如果學(xué)到的知識(shí)不成體系,遇到問(wèn)題時(shí)只是淺嘗輒止,不再深入研究,那么很難做到真正的技術(shù)提升。
需要這份系統(tǒng)化的資料的朋友,可以添加V獲?。簐ip204888 (備注大數(shù)據(jù))
[外鏈圖片轉(zhuǎn)存中…(img-DayOzvXk-1713350021288)]文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-858426.html
一個(gè)人可以走的很快,但一群人才能走的更遠(yuǎn)!不論你是正從事IT行業(yè)的老鳥(niǎo)或是對(duì)IT行業(yè)感興趣的新人,都?xì)g迎加入我們的的圈子(技術(shù)交流、學(xué)習(xí)資源、職場(chǎng)吐槽、大廠內(nèi)推、面試輔導(dǎo)),讓我們一起學(xué)習(xí)成長(zhǎng)!
到了這里,關(guān)于【RabbitMQ】RabbitMQ 消息的可靠性 —— 生產(chǎn)者和消費(fèi)者消息的確認(rèn),消息的持久化以及消費(fèi)失敗的重試機(jī)制_rabbitmq 生產(chǎn)者消息確認(rèn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!