什么是 RabbitMQ?
RabbitMQ 是一個開源的消息中間件,使用 Erlang 語言開發(fā)。這種語言天生非常適合分布式場景,RabbitMQ 也就非常適用于在分布式應(yīng)用程序之間傳遞消息。RabbitMQ 有非常多顯著的特點(diǎn):
消息傳遞模式:RabbitMQ 支持多種消息傳遞模式,包括發(fā)布/訂閱、點(diǎn)對點(diǎn)和工作隊列等,使其更靈活適用于各種消息通信場景。
消息路由和交換機(jī):RabbitMQ 引入了交換機(jī)(Exchange)的概念,用于將消息路由到一個或多個隊列。這允許根據(jù)消息的內(nèi)容、標(biāo)簽或路由鍵進(jìn)行靈活的消息路由,從而實現(xiàn)更復(fù)雜的消息傳遞邏輯。
消息確認(rèn)機(jī)制:RabbitMQ 支持消息確認(rèn)機(jī)制,消費(fèi)者可以確認(rèn)已成功處理消息。這確保了消息不會在傳遞后被重復(fù)消費(fèi),增加了消息的可靠性。
可擴(kuò)展性:RabbitMQ 是高度可擴(kuò)展的,可以通過添加更多的節(jié)點(diǎn)和集群來增加吞吐量和可用性。這使得 RabbitMQ 適用于大規(guī)模的分布式系統(tǒng)。
多種編程語言支持:RabbitMQ 提供了多種客戶端庫和插件,支持多種編程語言,包括 Java、Python、Ruby、Node.js 等,使其在不同技術(shù)棧中都能方便地集成和使用。
消息持久性:RabbitMQ 允許消息和隊列的持久性設(shè)置,確保消息在 RabbitMQ 重新啟動后不會丟失。這對于關(guān)鍵的業(yè)務(wù)消息非常重要。
靈活的插件系統(tǒng):RabbitMQ 具有豐富的插件系統(tǒng),使其可以擴(kuò)展功能,包括管理插件、數(shù)據(jù)復(fù)制插件、分布式部署插件等。
管理界面:RabbitMQ 提供了一個易于使用的 Web 管理界面,用于監(jiān)視和管理隊列、交換機(jī)、連接和用戶權(quán)限等。
總之,RabbitMQ 是一個功能豐富、高度可擴(kuò)展且靈活的消息中間件,適用于各種分布式應(yīng)用程序和消息通信需求。它的強(qiáng)大功能和廣泛的社區(qū)支持使其成為一個流行的消息中間件解決方案。
RabbitMQ 和 AMQP 是什么關(guān)系?
RabbitMQ 和 AMQP 有著非常密切的關(guān)系,但是他們是屬于完全不同的兩個概念。
-
AMQP: AMQP 不是一個具體的消息中間件產(chǎn)品,而是一個協(xié)議規(guī)范。他是一個開放的
消息產(chǎn)地協(xié)議
,是一種應(yīng)用層的標(biāo)準(zhǔn)協(xié)議,為面向消息的中間件設(shè)計。AMQP 提供了一種統(tǒng)一的消息服務(wù),使得不同程序之間可以通過消息隊列進(jìn)行通信。 SpringBoot 框架默認(rèn)就提供了對 AMQP 協(xié)議的支持。 -
RabbitMQ:RabbitMQ則是一個開源的消息中間件,是一個具體的軟件產(chǎn)品。RabbitMQ 使用 AMQP 協(xié)議來實現(xiàn)
消息傳遞
的標(biāo)準(zhǔn),但其實他也支持其他消息傳遞協(xié)議,如 STOMP 和 MQTT。RabbitMQ 基于 AMQP 協(xié)議定義的消息格式和交互流程,實現(xiàn)了消息在生產(chǎn)者、交換機(jī)、隊列之間的傳遞和處理。
總之,AMQP 本質(zhì)上是一個開放的標(biāo)準(zhǔn),他不光可以被 RabbitMQ 實現(xiàn),也可以被其他產(chǎn)品實現(xiàn)。通過這種標(biāo)準(zhǔn)的協(xié)議,實際上是可以在不同的消息中間件系統(tǒng)之間進(jìn)行靈活的消息傳遞。只不過,目前具體實現(xiàn)這種標(biāo)準(zhǔn)的產(chǎn)品目前并不多,RabbitMQ 則是最有影響力的一個產(chǎn)品。因此,RabbitMQ 成了 AMQP 協(xié)議事實上的代表。SpringBoot 框架默認(rèn)提供的 AMQP 協(xié)議支持底層也是基于 RabbitMQ 產(chǎn)品實現(xiàn)的。
RabbitMQ 的核心組件有哪些?
RabbitMQ的核心組件包括以下幾部分,他們共同構(gòu)成了 RabbitMQ 的基本架構(gòu):
- Broker:RabbitMQ服務(wù)器,負(fù)責(zé)接收和分發(fā)消息的應(yīng)用。
- Virtual Host:虛擬主機(jī),是RabbitMQ中的邏輯容器,用于隔離不同環(huán)境或不同應(yīng)用程序的信息流。每個虛擬主機(jī)都有自己的隊列、交換機(jī)等設(shè)置,可以理解為一個獨(dú)立的RabbitMQ服務(wù)。
- Connection 連接:管理和維護(hù)與RabbitMQ服務(wù)器的TCP連接,生產(chǎn)者、消費(fèi)者通過這個連接和 Broker 建立物理網(wǎng)絡(luò)連接。
- Channel 通道:是在Connection 內(nèi)創(chuàng)建的輕量級通信通道,用于進(jìn)行消息的傳輸和交互。應(yīng)用程序通過Channel進(jìn)行消息的發(fā)送和接收。通常一個 Connection 可以建立多個 Channel。
- Exchange 交換機(jī):交換機(jī)是消息的中轉(zhuǎn)站,負(fù)責(zé)接收來自生產(chǎn)者的消息,并將其路由到一個或多個隊列中。RabbitMQ 提供了多種不同類型的交換機(jī),每種類型的交換機(jī)都有不同的消息路由規(guī)則。
- Queue 隊列:隊列是消息的存儲位置。每個隊列都有一個唯一的名稱。消息從交換機(jī)路由到隊列,然后等待消費(fèi)者來獲取和處理。
- Binding 綁定關(guān)系: Binding 是 Exchange 和 Queue 之間的關(guān)聯(lián)規(guī)則,定義了消息如何從交換機(jī)路由到特定的隊列。
此外,生產(chǎn)者和消費(fèi)者也是RabbitMQ的核心組件,生產(chǎn)者負(fù)責(zé)發(fā)送消息到Exchange或者 Queue,消費(fèi)者負(fù)責(zé)從Queue中訂閱和處理消息。
這些核心組件共同構(gòu)建了 RabbitMQ 的消息傳遞系統(tǒng),他們協(xié)同工作才能實現(xiàn)消息的可靠傳遞、路由和業(yè)務(wù)處理等功能。
RabbitMQ 中有哪幾種交換機(jī)類型?
RabbitMQ 支持多種交換機(jī)(Exchange)類型,每種類型都用于不同的消息路由和分發(fā)策略:
Direct Exchange(直連交換機(jī))
這種交換機(jī)根據(jù)消息的路由鍵(Routing Key)將消息發(fā)送到與之完全匹配的隊列。只有當(dāng)消息的路由鍵與隊列綁定時指定的路由鍵完全相同時,消息才會被路由到隊列。這是一種簡單的路由策略,適用于點(diǎn)對點(diǎn)通信。
路由鍵與隊列名完全匹配交換機(jī),此種類型交換機(jī),通過RoutingKey路由鍵將交換機(jī)和隊列進(jìn)行綁定, 消息被發(fā)送到exchange時,需要根據(jù)消息的RoutingKey,來進(jìn)行匹配,只將消息發(fā)送到完全匹配到此RoutingKey的隊列。
比如:如果一個隊列綁定到交換機(jī)要求路由鍵為“key”,則只轉(zhuǎn)發(fā)RoutingKey標(biāo)記為“key”的消息,不會轉(zhuǎn)發(fā)"key1",也不會轉(zhuǎn)發(fā)“key.1”等等。它是完全匹配、單播的模式
同一個key可以綁定多個queue隊列;當(dāng)匹配到key1時,queue1和queue2都可以收到消息
Topic Exchange(主題交換機(jī))
這種交換機(jī)根據(jù)消息的路由鍵與隊列綁定時指定的路由鍵模式(通配符)匹配程度,將消息路由到一個或多個隊列。路由鍵可以使用通配符符號 *(匹配一個單詞)和 #(匹配零個或多個單詞),允許更靈活的消息路由。用于發(fā)布/訂閱模式和復(fù)雜的消息路由需求。
Topic,主題類型交換機(jī),此種交換機(jī)與Direct類似,也是需要通過routingkey路由鍵進(jìn)行匹配分發(fā),區(qū)別在于Topic可以進(jìn)行模糊匹配,Direct是完全匹配。
- Topic中,將routingkey通過"."來分為多個部分
- “*”:代表一個部分
- “#”:代表0個或多個部分(如果綁定的路由鍵為 “#” 時,則接受所有消息,因為路由鍵所有都匹配)
然后發(fā)送一條信息,routingkey為"key1.key2.key3.key4",那么根據(jù)"."將這個路由鍵分為了4個部分,此條路由鍵,將會匹配:
- key1.key2.key3.*:成功匹配,因為 * 可以代表一個部分
- key1.# :成功匹配,因為#可以代表0或多個部分
- .key2..key4: 成功匹配,因為第一和第三部分分別為key1和key3,且為4個部分,剛好匹配
- #.key3.key4:成功匹配,#可以代表多個部分,正好匹配中了我們的key1和key2
如果發(fā)送消息routingkey為"key1",那么將只能匹配中key1.#,#可以代表0個部分
Headers Exchange(頭部交換機(jī))
這種交換機(jī)根據(jù)消息的標(biāo)頭信息(Headers)來決定消息的路由,而不是使用路由鍵。隊列和交換機(jī)之間的綁定規(guī)則是根據(jù)標(biāo)頭鍵值對來定義的,只有當(dāng)消息的標(biāo)頭與綁定規(guī)則完全匹配時,消息才會被路由到隊列。適用于需要復(fù)雜消息匹配的場景。
headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了
消費(fèi)方指定的headers中必須包含一個"x-match"的鍵。鍵"x-match"的值有2個:
- x-match = all :表示所有的鍵值對都匹配才能接受到消息
-
x-match = any :表示只要有鍵值對匹配就能接受到消息
發(fā)送消息時間,如果其他參數(shù)信息是{ “name”:“xiaomingXX”, “sex”:“男” },因為queue2的x-match是any,只需要有一個鍵值對匹配所以就能接收到消息,所以queue2可以接收到消息;queue1的x-match是all,需要所有的鍵值對都匹配才能接收到消息,所以此時queue1接收不到消息
Fanout Exchange(廣播交換機(jī))
這種交換機(jī)將消息廣播到與之綁定的所有隊列,無論消息的路由鍵是什么。用于發(fā)布/訂閱模式,其中一個消息被廣播給所有訂閱者。
Fanout,廣播類型交換機(jī),此種交換機(jī),會將消息分發(fā)給所有綁定了此交換機(jī)的隊列,此時RoutingKey參數(shù)無效。
fanout類型交換機(jī)下發(fā)送消息一條,無論RoutingKey是什么,queue1,queue2,queue3,queue4都可以收到消息
Default Exchange(默認(rèn)交換機(jī))
這是 RabbitMQ 默認(rèn)實現(xiàn)的一種交換機(jī),它不需要手動創(chuàng)建。當(dāng)消息發(fā)布到默認(rèn)交換機(jī)時,路由鍵會被解釋為隊列的名稱,消息會被路由到與路由鍵名稱相同的隊列。默認(rèn)交換機(jī)通常用于點(diǎn)對點(diǎn)通信,但不支持復(fù)雜的路由策略。
這些不同類型的交換機(jī)允許你在 RabbitMQ 中實現(xiàn)各種不同的消息路由和分發(fā)策略,以滿足不同的應(yīng)用需求。選擇適當(dāng)?shù)慕粨Q機(jī)類型對于有效的消息傳遞非常重要。
RabbitMQ 如何保證消息不丟失/消息持久化?
根據(jù)上圖我們能知道整個流程中可能會出現(xiàn)三種消息丟失場景:
● 生產(chǎn)者發(fā)送消息到 RabbitMQ 服務(wù)器的過程中出現(xiàn)消息丟失。 可能是網(wǎng)絡(luò)波動未收到消息,又或者是服務(wù)器宕機(jī)。
● RabbitMQ 服務(wù)器消息持久化出現(xiàn)消息丟失。 消息發(fā)送到 RabbitMQ 之后,未能及時存儲完成持久化,RabbitMQ 服務(wù)器出現(xiàn)宕機(jī)重啟,消息出現(xiàn)丟失。
● 消費(fèi)者拉取消息過程以及拿到消息后出現(xiàn)消息丟失。 消費(fèi)者從 RabbitMQ 服務(wù)器獲取到消息過程出現(xiàn)網(wǎng)絡(luò)波動等問題可能出現(xiàn)消息丟失;消費(fèi)者拿到消息后但是消費(fèi)者未能正常消費(fèi),導(dǎo)致丟失,可能是消費(fèi)者出現(xiàn)處理異常又或者是消費(fèi)者宕機(jī)。
針對上述三種消息丟失場景,RabbitMQ 提供了相應(yīng)的解決方案,confirm
消息確認(rèn)機(jī)制(生產(chǎn)者),消息持久化
機(jī)制(RabbitMQ 服務(wù)),ACK 事務(wù)
機(jī)制(消費(fèi)者)
針對生產(chǎn)者
confirm事務(wù)處理機(jī)制
RabbitMQ 提供了confirm
事務(wù)處理機(jī)制,允許生產(chǎn)者在發(fā)送消息時將操作包裝在一個事務(wù)中,以確保消息的可靠性傳遞。在 RabbitMQ 中,事務(wù)是通過通道(Channel)來實現(xiàn)的??梢酝ㄟ^以下步驟進(jìn)行事務(wù)處理:
- 開啟事務(wù):在生產(chǎn)者端,可以通過調(diào)用 Channel 的 tx_select 方法來開啟一個事務(wù)。這將啟動一個新的事務(wù),并將所有后續(xù)的消息發(fā)布操作放在該事務(wù)內(nèi)。
- 發(fā)送消息:接下來在事務(wù)中,可以正常發(fā)送消息。如果消息發(fā)送失敗,事務(wù)會自動回滾。
- 提交事務(wù):如果事務(wù)中所有消息發(fā)送成功后,需要提交事務(wù)??梢酝ㄟ^調(diào)用 Channel 的tx_commit方法提交事務(wù)。
- 處理異常:如果在事務(wù)過程中發(fā)生異常,可以使用 try/catch 快來捕獲異常。然后在異常處理過程中,調(diào)用 Channel 的 tx_rollback 方法來回滾 RabbitMQ 相關(guān)的事務(wù)操作。
需要注意的是,RabbitMQ 的事務(wù)處理是基于存儲過程的,它可以保證在事務(wù)中的操作要么全部成功,要么全部失敗。但是,由于 RabbitMQ 是一個異步的消息隊列系統(tǒng),事務(wù)處理可能會對其性能產(chǎn)生影響。因此,需要根據(jù)具體的應(yīng)用場景和需求來權(quán)衡是否需要使用事務(wù)以及如何使用事務(wù)。
如何使用confirm機(jī)制
在生產(chǎn)者開啟了confirm模式之后,每次寫的消息都會分配一個唯一的id,然后如果寫入了rabbitmq之中,rabbitmq會給你回傳一個ack
消息,告訴你這個消息發(fā)送OK了;如果rabbitmq沒能處理這個消息,會回調(diào)你一個nack
接口,告訴你這個消息失敗了,你可以進(jìn)行重試。而且你可以結(jié)合這個機(jī)制知道自己在內(nèi)存里維護(hù)每個消息的id,如果超過一定時間還沒接收到這個消息的回調(diào),那么你可以進(jìn)行重發(fā)。
// 開啟confirm
channel.confirm();
//發(fā)送成功回調(diào)
public void ack(String messageId){
}
// 發(fā)送失敗回調(diào)
public void nack(String messageId){
// 重發(fā)該消息
}
針對RabbitMQ
消息持久化
RabbitMQ 允許消息的持久化,以確保即使在 RabbitMQ 服務(wù)器重新啟動后,消息也不會丟失。
RabbitMQ 可以通過以下方式實現(xiàn)消息的持久化:
- 消息持久化:在 RabbitMQ 中,只需要在發(fā)送消息時,將delivery_mode屬性設(shè)置為 2,就可以將消息標(biāo)記為持久化。
- 隊列持久化:在 RabbitMQ 中聲明隊列時,也可以將隊列聲明為持久化。RabbitMQ 中的隊列分為三種不同類型經(jīng)典隊列,仲裁隊列和流式隊列。其中,經(jīng)典隊列需要將durable屬性設(shè)置為true。而仲裁隊列和流式隊列默認(rèn)必須持久化保存。
- 交換機(jī)持久化:與經(jīng)典隊列類似,RabbitMQ 也可以在聲明交換機(jī)時,將交換機(jī)的 durable 屬性設(shè)置為true,這樣就可以將交換機(jī)標(biāo)記為持久化。
集群模式
先來介紹下RabbitMQ三種部署模式:
- 單節(jié)點(diǎn)模式:最簡單的情況,非集群模式,節(jié)點(diǎn)掛了,消息就不能用了。業(yè)務(wù)可能癱瘓,只能等待。
- 普通模式:消息只會存在與當(dāng)前節(jié)點(diǎn)中,并不會同步到其他節(jié)點(diǎn),當(dāng)前節(jié)點(diǎn)宕機(jī),有影響的業(yè)務(wù)會癱瘓,只能等待節(jié)點(diǎn)恢復(fù)重啟可用(必須持久化消息情況下)。
- 鏡像模式:消息會同步到其他節(jié)點(diǎn)上,可以設(shè)置同步的節(jié)點(diǎn)個數(shù),但吞吐量會下降。屬于RabbitMQ的HA方案
為什么設(shè)置鏡像模式集群,因為隊列的內(nèi)容僅僅存在某一個節(jié)點(diǎn)上面,不會存在所有節(jié)點(diǎn)上面,所有節(jié)點(diǎn)僅僅存放消息結(jié)構(gòu)和元數(shù)據(jù)。
補(bǔ)償機(jī)制
為什么還要消息補(bǔ)償機(jī)制呢?雖然以上方案,基本可以保證消息的高可用不丟失的問題。
但是作為有追求的程序員來講,要絕對保證我的系統(tǒng)的穩(wěn)定性,有一種危機(jī)意識。
比如:持久化的消息,保存到硬盤過程中,當(dāng)前隊列節(jié)點(diǎn)掛了,存儲節(jié)點(diǎn)硬盤又壞了,消息丟了,怎么辦?
1)生產(chǎn)端首先將業(yè)務(wù)數(shù)據(jù)以及消息數(shù)據(jù)入庫,需要在同一個事務(wù)中,消息數(shù)據(jù)入庫失敗,則整體回滾。
2)根據(jù)消息表中消息狀態(tài),失敗則進(jìn)行消息補(bǔ)償措施,重新發(fā)送消息處理。
針對消費(fèi)者
ACK確認(rèn)機(jī)制
多個消費(fèi)者同時收取消息,比如消息接收到一半的時候,一個消費(fèi)者死掉了(邏輯復(fù)雜時間太長,超時了或者消費(fèi)被停機(jī)或者網(wǎng)絡(luò)斷開鏈接),如何保證消息不丟?
使用rabbitmq提供的ack機(jī)制,服務(wù)端首先關(guān)閉rabbitmq的自動ack,然后每次在確保處理完這個消息之后,在代碼里手動調(diào)用ack。這樣就可以避免消息還沒有處理完就ack。才把消息從內(nèi)存刪除。
這樣就解決了,即使一個消費(fèi)者出了問題,但不會同步消息給服務(wù)端,會有其他的消費(fèi)端去消費(fèi),保證了消息不丟的case。
總結(jié)
如果需要保證消息在整條鏈路中不丟失,那就需要生產(chǎn)端、mq自身與消費(fèi)端共同去保障。
生產(chǎn)者:對生產(chǎn)的消息進(jìn)行狀態(tài)標(biāo)記,開啟confirm
機(jī)制,依據(jù)mq的響應(yīng)來更新消息狀態(tài),使用定時任務(wù)重新投遞超時的消息,多次投遞失敗進(jìn)行報警。
RabbitMQ:開啟持久化
,并在落盤后再進(jìn)行ack。如果是鏡像部署模式,需要在同步到多個副本之后再進(jìn)行ack。
消費(fèi)者:開啟手動ack
模式,在業(yè)務(wù)處理完成后再進(jìn)行ack,并且需要保證冪等。
通過以上的處理,理論上不存在消息丟失的情況,但是系統(tǒng)的吞吐量以及性能有所下降。在實際開發(fā)中,需要考慮消息丟失的影響程度,來做出對可靠性以及性能之間的權(quán)衡。
RabbitMQ中如何解決消息堆積問題?
解決方案
- 消費(fèi)者處理消息的速度太慢
○ 增加消費(fèi)者數(shù)量:通過水平擴(kuò)展,增加消費(fèi)者的數(shù)量來提高處理能力。
○ 優(yōu)化消費(fèi)者性能:提高消費(fèi)者處理消息的效率,例如優(yōu)化代碼、增加資源。
○ 消息預(yù)取限制(prefetch count):調(diào)整消費(fèi)者的預(yù)取數(shù)量以避免一次處理過多消息而導(dǎo)致處理緩慢。 - 隊列的容量太小
○ 增加隊列的容量:調(diào)整隊列設(shè)置以允許更多消息存儲。 - 網(wǎng)絡(luò)故障
○ 監(jiān)控和告警:通過監(jiān)控網(wǎng)絡(luò)狀況并設(shè)置告警,確保在網(wǎng)絡(luò)故障時快速發(fā)現(xiàn)并解決問題。
○ 持久化和高可用性:確保消息和隊列的持久化以避免消息丟失,并使用鏡像隊列提高可用性。 - 消費(fèi)者故障
○ 使用死信隊列:將無法處理的消息轉(zhuǎn)移到死信隊列,防止堵塞主隊列。
○ 容錯機(jī)制:實現(xiàn)消費(fèi)者的自動重啟和錯誤處理邏輯。 - 隊列配置不當(dāng)
○ 優(yōu)化隊列配置:檢查并優(yōu)化消息確認(rèn)模式、隊列長度限制和其他相關(guān)配置。 - 消息大小
○ 消息分片:將大型消息分割成小的消息片段,加快處理速度。 - 業(yè)務(wù)邏輯復(fù)雜或耗時
○ 優(yōu)化業(yè)務(wù)邏輯:簡化消費(fèi)者中的業(yè)務(wù)邏輯,減少處理每個消息所需的時間。 - 消息產(chǎn)生速度快于消費(fèi)速度
○ 使用消息限流:控制消息的生產(chǎn)速度,確保它不會超過消費(fèi)者的處理能力。
○ 負(fù)載均衡:確保消息在消費(fèi)者之間公平分配,避免個別消費(fèi)者過載。 - 其他配置優(yōu)化
○ 消息優(yōu)先級:使用消息優(yōu)先級確保高優(yōu)先級消息優(yōu)先處理。
○ 調(diào)整RabbitMQ配置:優(yōu)化RabbitMQ服務(wù)的配置,如文件描述符限制、內(nèi)存使用限制等。
大量消息在MQ中積壓的解決方案
- 先修復(fù)consumer的問題,確保其恢復(fù)消費(fèi)速度,然后將現(xiàn)有consumer都停掉;
- 新建?個topic,partition是原來的10倍,臨時建?好原先10倍或者20倍的queue數(shù)量;
- 然后寫?個臨時的分發(fā)數(shù)據(jù)的consumer程序,這個程序部署上去消費(fèi)積壓的數(shù)據(jù);消費(fèi)之后不做耗時的處理,直接均勻輪詢寫?臨時建?好的10倍數(shù)量的queue;
- 接著臨時征?10倍的機(jī)器來部署consumer,每?批consumer消費(fèi)?個臨時queue的數(shù)據(jù);
- 這種做法相當(dāng)于是臨時將queue資源和consumer資源擴(kuò)?10倍,以正常的10倍速度來消費(fèi)數(shù)據(jù);
- 等快速消費(fèi)完積壓數(shù)據(jù)之后,得恢復(fù)原先部署架構(gòu),重新?原先的consumer機(jī)器來消費(fèi)消息。
總結(jié)
- 修復(fù)并停掉consumer;
- 新建?個topic,partition是原來的10倍,建?臨時queue,數(shù)量是原來的10倍或20倍;
- 寫臨時consumer程序,臨時征?10倍的機(jī)器去消費(fèi)數(shù)據(jù);
- 消費(fèi)完成之后,恢復(fù)原先consumer;
RabbitMQ中如何保證消息不被重復(fù)消費(fèi)?
什么情況會導(dǎo)致消息被重復(fù)消費(fèi)呢?
- 生產(chǎn)者:生產(chǎn)者可能會重復(fù)推送一條數(shù)據(jù)到 MQ 中,比如 Controller 接口被重復(fù)調(diào)用了 2 次,沒有做接口冪等性導(dǎo)致的;
- RabbitMQ:在消費(fèi)者消費(fèi)完準(zhǔn)備響應(yīng) ack 消息消費(fèi)成功時,MQ 突然掛了,導(dǎo)致 MQ 以為消費(fèi)者還未消費(fèi)該條數(shù)據(jù),MQ 恢復(fù)后再次推送了該條消息,導(dǎo)致了重復(fù)消費(fèi)。
- 消費(fèi)者:消費(fèi)者已經(jīng)消費(fèi)完消息,正準(zhǔn)備但是還未響應(yīng)給ack消息到時,此時消費(fèi)者掛了,服務(wù)重啟后 MQ 以為消費(fèi)者還沒有消費(fèi)該消息,再次推送了該條消息。
解決方案
- 使用數(shù)據(jù)庫唯一鍵約束
缺點(diǎn):局限性很大,僅僅只能用在我們數(shù)據(jù)新增場景,并且性能也比較低 - 使用樂觀鎖
假設(shè)是更新訂單狀態(tài),在發(fā)送的消息的時候帶上修改字段的版本號
缺點(diǎn):如果說更新字段比較多,并且更新場景比較多,可能會導(dǎo)致數(shù)據(jù)庫字段增加并且還有可能出現(xiàn)多條消息同時在隊列中此時他們修改字段版本號一致,排在后續(xù)的消息無法被消費(fèi) - 簡單的消息去重,插入消費(fèi)記錄,增加數(shù)據(jù)庫判斷
優(yōu)點(diǎn):很多場景下的確能起到不錯的效果
缺點(diǎn):
- 這個消費(fèi)者的代碼執(zhí)行需要1秒,重復(fù)消息在執(zhí)行期間(假設(shè)100毫秒)內(nèi)到達(dá)(例如生產(chǎn)者快速重發(fā),Broker重啟等),增加校驗的地方是不是還是沒數(shù)據(jù)(因為上一條消息還沒消費(fèi)完,沒有記錄)
- 那么就會穿透掉檢查的擋板,最后導(dǎo)致重復(fù)的消息消費(fèi)邏輯進(jìn)入到非冪等安全的業(yè)務(wù)代碼中,從而引發(fā)重復(fù)消費(fèi)的問題
- 并發(fā)消息去重基于消息冪等表
● 缺點(diǎn):如果說第一次消息投遞異常沒有消費(fèi)成功,并且沒有將消息狀態(tài)給置為成功或者沒有刪除消息表記錄,此時延時消費(fèi)每次執(zhí)行下列都是一直處于消費(fèi)中,最后消費(fèi)就會被視為消費(fèi)失敗而被投遞到死信Topic中
● 方案:插入的消息表必須要帶一個最長消費(fèi)過期時間,例如10分鐘
● 上述方案只需要一個存儲的中心媒介,那我們可以選擇更靈活的存儲中心媒介,比如Redis。使用Redis有兩個好處:
○ 性能上損耗更低
○ 上面我們講到的超時時間可以直接利用Redis本身的ttl實現(xiàn)文章來源:http://www.zghlxwxcb.cn/news/detail-840244.html
總結(jié)
- 利用數(shù)據(jù)庫唯一鍵約束
- 可以利用我們的樂觀鎖
- 插入消費(fèi)記錄
不丟和不重是矛盾的(在分布式場景下),總的來說,開發(fā)者根據(jù)業(yè)務(wù)的實際需求來選擇相應(yīng)的方式即可。
文章來源地址http://www.zghlxwxcb.cn/news/detail-840244.html
到了這里,關(guān)于RabbitMQ詳解與常見問題解決方案的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!