国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

消息隊(duì)列中間件 MetaQ/RocketMQ

這篇具有很好參考價(jià)值的文章主要介紹了消息隊(duì)列中間件 MetaQ/RocketMQ。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

推薦電子書(shū):云原生架構(gòu)白皮書(shū) 2022版-藏經(jīng)閣-阿里云開(kāi)發(fā)者社區(qū) (aliyun.com)

簡(jiǎn)介—— 消息隊(duì)列中間件 MetaQ/RocketMQ

中間件 MetaQ 是一種基于隊(duì)列模型的消息中間件,MetaQ 據(jù)說(shuō)最早是受 Kafka 的影響開(kāi)發(fā)的,第一版的名字?"metamorphosis",是奧地利作家卡夫卡的名作——《變形記》。RocketMQ 是 MetaQ 的開(kāi)源版本。?

消息隊(duì)列中間件一般用于在分布式場(chǎng)景下解決集群?jiǎn)螜C(jī)瓶頸的問(wèn)題。在傳統(tǒng)的分布式計(jì)算環(huán)境中,常常會(huì)出現(xiàn)由于某個(gè)單機(jī)節(jié)點(diǎn)的性能瓶頸,即使其他節(jié)點(diǎn)仍有余力,仍然會(huì)導(dǎo)致整個(gè)系統(tǒng)的性能無(wú)法進(jìn)一步提升的情況,這一現(xiàn)象通常是由于任務(wù)負(fù)載不均衡,網(wǎng)絡(luò)延遲等常見(jiàn)且難以解決的問(wèn)題。消息隊(duì)列本質(zhì)上是提供了一種非常合理的任務(wù)分配策略,通過(guò)將任務(wù)分給消費(fèi)者實(shí)現(xiàn)異步和分布式處理,提高整個(gè)集群的性能。

消息隊(duì)列(mq)的核心思想是將耗時(shí)的任務(wù)異步化,通過(guò)消息隊(duì)列緩存任務(wù),從而實(shí)現(xiàn)消息發(fā)送方和接收方的解耦,使得任務(wù)的處理能夠異步、并行,從而提高系統(tǒng)或集群的吞吐量和可擴(kuò)展性。在這個(gè)過(guò)程中,整個(gè)系統(tǒng)強(qiáng)依賴(lài)于消息隊(duì)列,起到類(lèi)似橋梁的作用。消息隊(duì)列有著經(jīng)典的三大應(yīng)用場(chǎng)景:解耦、異步削峰填谷。?

解耦場(chǎng)景:消息隊(duì)列一般使用發(fā)布/訂閱的模型,如果服務(wù) B C D 依賴(lài)服務(wù) A 的消息,此時(shí)新增服務(wù) E 也需要依賴(lài) A ,而 B 服務(wù)不再需要消息,需要頻繁且復(fù)雜的業(yè)務(wù)改造,效率低,穩(wěn)定性差,此時(shí)引入消息隊(duì)列進(jìn)行解耦,服務(wù) A 只需要將產(chǎn)生的消息發(fā)布到 mq 中,就不用管了,其它服務(wù)會(huì)自己根據(jù)需要訂閱 mq 中的消息,或者說(shuō)去 mq 中消費(fèi),這就使得每個(gè)服務(wù)可以更多地關(guān)注自身業(yè)務(wù),而不需要把精力用在維護(hù)服務(wù)之間的關(guān)系上,可擴(kuò)展性提高。

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

異步場(chǎng)景:如用戶(hù)的業(yè)務(wù)需要一系列的服務(wù)進(jìn)行處理,按順序處理的話,用戶(hù)需要等待的時(shí)間過(guò)長(zhǎng)。例如電商平臺(tái)的用戶(hù)下單、支付、積分、郵件、短信通知等流程,長(zhǎng)時(shí)間等待用戶(hù)無(wú)法接受,就可以通過(guò) mq 進(jìn)行服務(wù)的異步處理,例如積分、郵件和短信通知服務(wù)訂閱了支付服務(wù)的消息,將支付完成作為消息發(fā)布到 mq ,這些服務(wù)就可以同時(shí)對(duì)這一訂單進(jìn)行處理,降低了請(qǐng)求等待時(shí)間(rt) 。?

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

削峰填谷場(chǎng)景:削峰表示的含義是,流量如果太大,就控制服務(wù)器處理的 QPS,不要讓大流量打掛數(shù)據(jù)庫(kù)等導(dǎo)致服務(wù)器宕機(jī),讓服務(wù)處理請(qǐng)求更加平緩,節(jié)省服務(wù)器資源,其本質(zhì)上是控制用戶(hù)的請(qǐng)求速率,或是延緩或是直接拒絕。填谷的含義是將階段性的大流量請(qǐng)求緩存起來(lái),在流量平緩的時(shí)候慢慢處理,防止過(guò)多的請(qǐng)求被拒絕后的重試導(dǎo)致更大的流量。mq 很適合這一場(chǎng)景,QPS 超出服務(wù)端接收請(qǐng)求的能力時(shí),服務(wù)端仍然保持在安全范圍內(nèi)地從消息隊(duì)列中獲取消息進(jìn)行處理,多余的消息會(huì)積壓在消息隊(duì)列中,或由于超時(shí)直接拒絕,到 QPS 低于這一閾值的時(shí)候,這些積壓的消息就會(huì)被逐漸消費(fèi)掉。相當(dāng)于在系統(tǒng)前修建了一個(gè)流量蓄水池。?

除此之外還可以利用消息隊(duì)列進(jìn)行消息通信,日志處理等業(yè)務(wù),但消息隊(duì)列也會(huì)引入系統(tǒng)可用性,系統(tǒng)復(fù)雜度,數(shù)據(jù)一致性等問(wèn)題(強(qiáng)依賴(lài)消息隊(duì)列的正確執(zhí)行,需要確保消息不會(huì)丟失,確保消息的順序性等)。這意味著如果系統(tǒng)中的消息隊(duì)列承擔(dān)著重要的角色,那么消息隊(duì)列的可靠性和穩(wěn)定性也至關(guān)重要,本文介紹的 MetaQ/RocketMQ 是側(cè)重于維持消息一致性和高可靠性的消息隊(duì)列中間件。

物理架構(gòu)

MetaQ 的高可用性是基于其物理部署架構(gòu)實(shí)現(xiàn)的,在生產(chǎn)者為消息定義了一個(gè) topic 之后,消費(fèi)者可以訂閱這個(gè) topic ,于是消息就有了從生產(chǎn)到消費(fèi)的路由指向。

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

?

NameServer?負(fù)責(zé)暴露消息的 topic ,因此可以以將 NameServer 理解成一個(gè)注冊(cè)中心,用來(lái)關(guān)聯(lián) topic 和對(duì)應(yīng)的 broker ,即消息的存儲(chǔ)位置。NameServer 的每個(gè)節(jié)點(diǎn)都維護(hù)著 topic 和 broker 的映射關(guān)系,每個(gè)節(jié)點(diǎn)彼此獨(dú)立,無(wú)同步。在每個(gè)NameServer節(jié)點(diǎn)內(nèi)部都維護(hù)著所有?Broker?的地址列表,所有?Topic?和 Topic 對(duì)應(yīng)?Queue?的信息等。消息生產(chǎn)者在發(fā)送消息之前先與任意一臺(tái) NameServer 建立連接,獲取 Broker 服務(wù)器的地址列表,然后根據(jù)負(fù)載均衡算法從列表中選擇一臺(tái)消息服務(wù)器發(fā)送消息。

Broker?主要負(fù)責(zé)消息的存儲(chǔ)和轉(zhuǎn)發(fā),分為?master?和?slave,是一寫(xiě)多讀的關(guān)系。broker 節(jié)點(diǎn)可以按照處理的數(shù)據(jù)相同劃分成副本組,同一組 master 和 slave 的關(guān)系可以通過(guò)指定相同 brokerName,不同的 brokerId 來(lái)定義,brokerId 為 0 標(biāo)識(shí) master,非 0 是 slave。每個(gè) broker 服務(wù)器會(huì)與 NameServer 集群建立長(zhǎng)連接(注意是跟所有的 NameServer 服務(wù)器,因?yàn)?NameServer 彼此之間獨(dú)立不同步),并且會(huì)注冊(cè) topic 信息到 NameServer 中。復(fù)制策略是 Broker 的 Master 與 Slave 間的數(shù)據(jù)同步方式,分為同步復(fù)制與異步復(fù)制。由于異步復(fù)制、異步刷盤(pán)可能會(huì)丟失少量消息,因此 Broker 默認(rèn)采用的是同步雙寫(xiě)的方式,消息寫(xiě)入 master 成功后,master 會(huì)等待 slave 同步數(shù)據(jù)成功后才向 Producer 返回成功 ACK ,即 Master 與 Slave 都要寫(xiě)入成功后才會(huì)返回成功 ACK 。這樣可以保證消息發(fā)送時(shí)消息不丟失。副本組中,各個(gè)節(jié)點(diǎn)處理的速度不同,也就有了日志水位的概念 (高水位對(duì)消費(fèi)者不可見(jiàn))。在 master 宕機(jī)時(shí),同步副本集中的其余節(jié)點(diǎn)會(huì)自動(dòng)選舉出新的 master 代替工作(Raft 協(xié)議)。?

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

?

Producer,消息生產(chǎn)者,與 NameServer 隨機(jī)一個(gè)節(jié)點(diǎn)建立長(zhǎng)連接,定時(shí)從 NameServer 獲取 topic 路由信息,與 master broker 建立長(zhǎng)連接,定時(shí)發(fā)送心跳,Producer 只與 master 建立連接產(chǎn)生通信,不與 slave 建立連接。生產(chǎn)者和消費(fèi)者都有組(Group)的概念,同一組節(jié)點(diǎn)的生產(chǎn)/消費(fèi)邏輯相同。?

Consumer,消息消費(fèi)者,與 NameServer 隨機(jī)一個(gè)節(jié)點(diǎn)建立長(zhǎng)連接,定時(shí)從 NameServer 獲取 topic 的路由信息,并獲取想要消費(fèi)的 queue ??梢院吞峁┓?wù)的 master 或 slave 建立長(zhǎng)連接,定時(shí)向 master 和 slave 發(fā)送心跳,既可以從 master 訂閱消息,也可以從 slave 訂閱消息。

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

?

消息的存儲(chǔ)

MetaQ 將消息存儲(chǔ)(持久化)到位于生產(chǎn)者和消費(fèi)者之間的一個(gè)消息代理(Message Broker)上。

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

MetaQ 消息模型:

  • Message 單位消息;

  • Topic 消息的類(lèi)型,生產(chǎn)者對(duì)應(yīng)消費(fèi)者的分區(qū)標(biāo)識(shí);

  • Tag 消息在相同 Topic 時(shí)的二級(jí)分類(lèi)標(biāo)識(shí),可用于消息的篩選;

  • Queue 物理分區(qū),一個(gè) Topic 對(duì)應(yīng)多個(gè) Queue;

  • Group 生產(chǎn)者或消費(fèi)者的邏輯分組,同一個(gè) Group 的 生產(chǎn)者/消費(fèi)者 通常 生產(chǎn)/消費(fèi) 同一類(lèi)消息,并且 生產(chǎn)/消費(fèi) 的邏輯一致;

  • Offset:偏移值, 表示消費(fèi)到的位置或待消費(fèi)的消息位置;

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

消息的存儲(chǔ)方式對(duì)消息隊(duì)列的性能有很大影響,如 ActiveMQ 會(huì)使用隊(duì)列表來(lái)存儲(chǔ)消息,依靠輪訓(xùn)、加鎖等方式檢查和處理消息,但對(duì)于 QPS 很高的系統(tǒng)來(lái)說(shuō),一下子積壓龐大的數(shù)據(jù)量在表中會(huì)導(dǎo)致 B+ 樹(shù)索引層級(jí)加深,影響查詢(xún)效率。KV 數(shù)據(jù)庫(kù)采用如 LSM 樹(shù)作為索引結(jié)構(gòu),對(duì)讀性能有較大的犧牲,這對(duì)于消息隊(duì)列而言很難接受,因?yàn)橄㈥?duì)列常常需要面對(duì)消費(fèi)失敗需要重試的情況。?

?RocketMQ/Kafka/RabbitMQ?等消息隊(duì)列會(huì)采用順序?qū)懙娜罩窘Y(jié)構(gòu),將消息刷盤(pán)至文件系統(tǒng)作持久化。順序?qū)懭罩疚募梢员苊忸l繁的隨機(jī)訪問(wèn)而導(dǎo)致的性能問(wèn)題,而且利于延遲寫(xiě)入等優(yōu)化手段,能夠快速保存日志。Kafka 會(huì)為每個(gè) topic (事件的組織和存儲(chǔ)單位,一個(gè) topic 可以對(duì)應(yīng)多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者) 劃分出一個(gè)分區(qū)日志,便于根據(jù) topic 順序消費(fèi),消息被讀取后不會(huì)立刻刪除,可以持久存儲(chǔ),但 topic 數(shù)量增加的時(shí)候,broker 的分區(qū)文件數(shù)量增大,會(huì)使得本來(lái)速度很快的順序?qū)懽兂呻S機(jī)寫(xiě)(不同文件之間移動(dòng)),性能大幅下降。?

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

MetaQ 2.0 對(duì)這部分進(jìn)行重新設(shè)計(jì),其存儲(chǔ)結(jié)構(gòu)主要包括?CommitLog?和?Consume queue?兩部分。?

CommitLog 是物理存儲(chǔ),存儲(chǔ)不定長(zhǎng)的完整消息記錄,邏輯上是完全連續(xù)的一個(gè)文件,物理上單個(gè)文件大小是 1 GB,文件名是當(dāng)前文件首地址在 CommitLog 中的偏移量。只要 CommitLog 落盤(pán),就可以認(rèn)為已經(jīng)接收到消息,即使 Cosume queue 丟失,也可以從 CommitLog 恢復(fù)。而所有 topic 的消息都會(huì)存儲(chǔ)在同一個(gè) CommitLog 中來(lái)保證順序?qū)?。這樣的結(jié)構(gòu)會(huì)導(dǎo)致 CommitLog 讀取完全變成隨機(jī)讀,所以需要 Consume queue 作為索引隊(duì)列 (offset, size, tag),每個(gè) topic-queue 的消息在寫(xiě)完 CommitLog 之后,都會(huì)寫(xiě)到獨(dú)立的 Consume queue ,隊(duì)列里的每個(gè)元素都是定長(zhǎng)的元數(shù)據(jù),內(nèi)容包含該消息在對(duì)應(yīng) CommitLog 的 offset 和 size ,還包括 tagcode 可支持消息按照指定 tag 進(jìn)行過(guò)濾。順序?qū)懯?MetaQ 實(shí)現(xiàn)高性能的基礎(chǔ)。?

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

?

基于這樣的存儲(chǔ)結(jié)構(gòu),MetaQ 對(duì)客戶(hù)端暴露的主要是 Consume queue 邏輯視圖,提供隊(duì)列訪問(wèn)接口。消費(fèi)者通過(guò)指定 Consume queue 的位點(diǎn)來(lái)讀取消息,通過(guò)提交 Consume queue 的位點(diǎn)來(lái)維護(hù)消費(fèi)進(jìn)度。Concume queue 每個(gè)條目長(zhǎng)度固定(8個(gè)字節(jié)CommitLog物理偏移量、4字節(jié)消息長(zhǎng)度、8字節(jié)tag哈希碼),單個(gè) ConsumeQueue 文件默認(rèn)最多包括 30 萬(wàn)個(gè)條目。這樣做的好處是隊(duì)列非常輕量級(jí),Consume Queue 非常小,且在消費(fèi)過(guò)程中都是順序讀取,其速度幾乎能與內(nèi)存讀寫(xiě)相比,而在 page cache 和良好的空間局部性作用下,CommitLog 的訪問(wèn)也非??焖?。

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

?

MetaQ 會(huì)啟動(dòng)一個(gè)定時(shí)服務(wù) ReputMessageService 定時(shí)調(diào)用(間隔 1ms)來(lái)生成 Consume queue 和 其它索引文件。

Consume queue 解決了順序消費(fèi)的問(wèn)題,但如果需要根據(jù)屬性進(jìn)行篩選,就必須用到?index 索引。?

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

index 索引支持根據(jù) key 值進(jìn)行篩選,查找時(shí),可以根據(jù)消息的 key 計(jì)算 hash 槽的位置,hash 槽中存儲(chǔ)著 Index 條目的位置,可以根據(jù)這個(gè) index 條目獲得一個(gè)鏈表(尾),每個(gè) index 條目包含在 CommitLog 上的消息主體的物理偏移量。

消息鏈路

MetaQ 的消息可以根據(jù) topic-queue 劃分出確定的從生產(chǎn)者到消費(fèi)者路由指向。

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

?

1.producer 指定 broker 和 queue 發(fā)送消息 msg ;

2.broker 接收消息,并完成緩存、刷盤(pán)和生成摘要(同時(shí)根據(jù) tag 和 user properties 對(duì) msg 進(jìn)行打標(biāo))等操作;

3.consumer 每隔一段時(shí)間( pullInterval )從 broker 端的(根據(jù)服務(wù)端消息過(guò)濾模式 tag 或 sql 過(guò)濾后)獲取一定量的消息到本地消息隊(duì)列中(單線程)

4.consumer 按照配置并發(fā)分配上述隊(duì)列消息并執(zhí)行消費(fèi)方法;

5.consumer 返回 broker 消費(fèi)結(jié)果并重置消費(fèi)位點(diǎn);

生產(chǎn)者

Topic 是消息的主題,每個(gè) topic 對(duì)應(yīng)多個(gè)隊(duì)列,多個(gè)隊(duì)列會(huì)均勻的分布在多個(gè) broker 上,Producer 發(fā)送的消息在 broker 上會(huì)均衡的分布在多個(gè)隊(duì)列中,Producer 發(fā)送消息時(shí)在多個(gè)隊(duì)列間輪詢(xún)確保消息的均衡。?

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

??

發(fā)送消息的具體操作如下:

1、查詢(xún)本地緩存是否存儲(chǔ)了 TopicPublishInfo ,否則從 NameServer 獲取

2、根據(jù)負(fù)載均衡選擇策略獲取待發(fā)送隊(duì)列并輪訓(xùn)訪問(wèn)

3、獲取消息隊(duì)列對(duì)應(yīng)的 broker 實(shí)際 IP

4、設(shè)置消息 Unique ID ,zip 壓縮消息

5、消息校驗(yàn)(長(zhǎng)度等),發(fā)送消息

Producer 發(fā)送的每條消息都包含一個(gè) Topic,表示一類(lèi)消息的集合。同時(shí)還有一個(gè) Tag,用于區(qū)分同一Topic 下不同類(lèi)型的消息。一個(gè) Topic 包括多個(gè) Queue,每個(gè) Queue 中存放該 Topic 對(duì)應(yīng)消息的位置。一個(gè) Topic 的 Queue 相當(dāng)于該 Topic 中消息的分區(qū),Queue 可以存儲(chǔ)在不同的 Broker 上。發(fā)送消息時(shí),Producer 通過(guò)負(fù)載均衡模塊選擇相應(yīng)的 Broker 集群隊(duì)列進(jìn)行消息投遞。

消息發(fā)送時(shí)如果出現(xiàn)失敗,默認(rèn)會(huì)重試 2 次,在重試時(shí)會(huì)盡量避開(kāi)剛剛接收失敗的 Broker,而是選擇其它 Broker 上的隊(duì)列進(jìn)行發(fā)送,從而提高消息發(fā)送的成功率。

消費(fèi)者

消費(fèi)方式

  • 廣播消費(fèi):Producer 向一些隊(duì)列輪流發(fā)送消息,隊(duì)列集合稱(chēng)為 Topic,每一個(gè) Consumer 實(shí)例消費(fèi)這個(gè) Topic 對(duì)應(yīng)的所有隊(duì)列。

  • 集群消費(fèi):多個(gè) Consumer 實(shí)例平均消費(fèi)這個(gè) Topic 對(duì)應(yīng)的隊(duì)列集合。

MetaQ 消費(fèi)者端有多套負(fù)載均衡算法的實(shí)現(xiàn),比較常見(jiàn)的是平均分配和平均循環(huán)分配,默認(rèn)使用平均分配算法,給每個(gè) Consumer 分配均等的隊(duì)列。一個(gè) Consumer 可以對(duì)應(yīng)多個(gè)隊(duì)列,而一個(gè)隊(duì)列只能給一個(gè) Consumer 進(jìn)行消費(fèi),Consumer 和隊(duì)列之間是一對(duì)多的關(guān)系。?

集群模式下有一點(diǎn)需要注意:消費(fèi)隊(duì)列負(fù)載機(jī)制遵循一個(gè)通用的思想,一個(gè)消息隊(duì)列同時(shí)只允許被一個(gè)消費(fèi)者消費(fèi),一個(gè)消費(fèi)者可以消費(fèi)多個(gè)消費(fèi)隊(duì)列。因此當(dāng) Consumer 的數(shù)量大于隊(duì)列的數(shù)量,會(huì)有部分 Consumer 分配不到隊(duì)列,這些分配不到隊(duì)列的 Consumer 機(jī)器不會(huì)有消息到達(dá)。?

平均分配算法舉例:

  • 如果有 5 個(gè)隊(duì)列,2 個(gè) consumer,consumer1 會(huì)分配 3 個(gè)隊(duì)列,consumer2 分配 2 個(gè)隊(duì)列;

  • 如果有 6 個(gè)隊(duì)列,2 個(gè) consumer,consumer1 會(huì)分配 3 個(gè)隊(duì)列,consumer2 也會(huì)分配 3 個(gè)隊(duì)列;

  • 如果 10 個(gè)隊(duì)列,11 個(gè) consumer,consumer1~consumer10 各分配一個(gè)隊(duì)列,consumer11 無(wú)隊(duì)列分配;

如果消費(fèi)集群規(guī)模較大:例如 topic 隊(duì)列資源是 128 個(gè),而消費(fèi)機(jī)器數(shù)有 160 臺(tái),按照一個(gè)隊(duì)列只會(huì)被一個(gè)消費(fèi)集群中一臺(tái)機(jī)器處理的原則,會(huì)有 32 臺(tái)機(jī)器不會(huì)收到消息,此種情況需要聯(lián)系 MetaQ 人員進(jìn)行擴(kuò)容評(píng)估。?

消費(fèi)重試:當(dāng)出現(xiàn)消費(fèi)失敗的消息時(shí),Broker 會(huì)為每個(gè)消費(fèi)者組設(shè)置一個(gè)重試隊(duì)列。當(dāng)一條消息初次消費(fèi)失敗,消息隊(duì)列會(huì)自動(dòng)進(jìn)行消費(fèi)重試。達(dá)到最大重試次數(shù)后,若消費(fèi)仍然失敗,此時(shí)會(huì)將該消息發(fā)送到死信隊(duì)列。對(duì)于死信消息,通常需要開(kāi)發(fā)人員進(jìn)行手動(dòng)處理。?

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

在消費(fèi)時(shí)間過(guò)程中可能會(huì)遇到消息消費(fèi)隊(duì)列增加和減少、消息消費(fèi)者增加或減少,此時(shí)需要對(duì)消息消費(fèi)隊(duì)列進(jìn)行重新平衡,既重新分配 (rebalance),這就是所謂的重平衡機(jī)制。在 RocketMQ 中,每隔 20s 會(huì)根據(jù)當(dāng)前隊(duì)列數(shù)量、消費(fèi)者數(shù)量重新進(jìn)行隊(duì)列負(fù)載計(jì)算,如果計(jì)算出來(lái)的結(jié)果與當(dāng)前不一樣,則觸發(fā)消息消費(fèi)隊(duì)列的重分配。?

Consumer 啟動(dòng)時(shí)會(huì)啟動(dòng)定時(shí)器,還執(zhí)行一些定時(shí)同步任務(wù),包括:同步 nameServer 地址,從 nameServer 同步 topic 的路由信息,清理 offline 的 broker,并向所有 broker 發(fā)送心跳,分配給當(dāng)前 consumer 的每個(gè)隊(duì)列將最新消費(fèi)的 offset 同步給 broker。

消息消費(fèi)過(guò)程淺析

三個(gè)關(guān)鍵服務(wù):?RebalanceService、PullMessageService、MessageConsumeService ? ?

RebalanceService 負(fù)載均衡服務(wù)

定時(shí)執(zhí)行一次負(fù)載均衡(20 s)分配消息隊(duì)列給消費(fèi)者。負(fù)載均衡針對(duì)每個(gè) topic 獨(dú)立進(jìn)行,具體如下:

 
private void rebalanceByTopic(final String topic, final boolean isOrder) {        switch (messageModel) {            case BROADCASTING: {                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);                if (mqSet != null) {                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);//廣播模式下每個(gè)消費(fèi)者要消費(fèi)所有 queue 的消息????????????????????if?(changed)?{????????????????????????this.messageQueueChanged(topic,?mqSet,?mqSet);????????????????????????log.info("messageQueueChanged?{}?{}?{}?{}",????????????????????????????consumerGroup,????????????????????????????topic,????????????????????????????mqSet,????????????????????????????mqSet);????????????????????}????????????????}?else?{????????????????????log.warn("doRebalance,?{},?but?the?topic[{}]?not?exist.",?consumerGroup,?topic);????????????????}????????????????break;????????????}????????????case?CLUSTERING:?{????????????????Set<MessageQueue>?mqSet?=?this.topicSubscribeInfoTable.get(topic);//找到該topic下的消息隊(duì)列集合????????????????List<String>?cidAll?=?this.mQClientFactory.findConsumerIdList(topic,?consumerGroup);//找到給消費(fèi)者組下的所有消費(fèi)者id????????????????if?(null?==?mqSet)?{????????????????????if?(!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))?{????????????????????????log.warn("doRebalance,?{},?but?the?topic[{}]?not?exist.",?consumerGroup,?topic);????????????????????}????????????????}????????????????if?(null?==?cidAll)?{????????????????????log.warn("doRebalance,?{}?{},?get?consumer?id?list?failed",?consumerGroup,?topic);????????????????}????????????????????????????????if?(mqSet?!=?null?&&?cidAll?!=?null)?{????????????????????List<MessageQueue>?mqAll?=?new?ArrayList<MessageQueue>();????????????????????mqAll.addAll(mqSet);
????????????????????Collections.sort(mqAll);????????????????????Collections.sort(cidAll);????????????????????????????????????????AllocateMessageQueueStrategy?strategy?=?this.allocateMessageQueueStrategy;????????????????????????????????????????List<MessageQueue>?allocateResult?=?null;????????????????????try?{????????????????????????allocateResult?=?strategy.allocate(????????????????????????????this.consumerGroup,????????????????????????????this.mQClientFactory.getClientId(),????????????????????????????mqAll,????????????????????????????cidAll);//?根據(jù)分配策略進(jìn)行分配????????????????????}?catch?(Throwable?e)?{????????????????????????log.error("AllocateMessageQueueStrategy.allocate?Exception.?allocateMessageQueueStrategyName={}",?strategy.getName(),????????????????????????????e);????????????????????????return;????????????????????}????????????????????????????????????????Set<MessageQueue>?allocateResultSet?=?new?HashSet<MessageQueue>();????????????????????if?(allocateResult?!=?null)?{????????????????????????allocateResultSet.addAll(allocateResult);????????????????????}????????????????????boolean?changed?=?this.updateProcessQueueTableInRebalance(topic,?allocateResultSet,?isOrder);//?更新處理隊(duì)列表????????????????????????????????????????if?(changed)?{????????????????????????log.info(????????????????????????????"rebalanced?result?changed.?allocateMessageQueueStrategyName={},?group={},?topic={},?clientId={},?mqAllSize={},?cidAllSize={},?rebalanceResultSize={},?rebalanceResultSet={}",????????????????????????????strategy.getName(),?consumerGroup,?topic,?this.mQClientFactory.getClientId(),?mqSet.size(),?cidAll.size(),????????????????????????????allocateResultSet.size(),?allocateResultSet);????????????????????????this.messageQueueChanged(topic,?mqSet,?allocateResultSet);????????????????????}????????????????}????????????????break;????????????}????????????default:????????????????break;????????}    }
?

這里主要做了幾件事:

  • 判斷消費(fèi)模式

  • 廣播模式

i.找到 topic 下的消息隊(duì)列(queue)集合

ii.更新處理隊(duì)列表

  • 集群模式

i.找到 topic 下的消息隊(duì)列集合

ii.找到消費(fèi)者組下所有消費(fèi)者 id

iii.根據(jù)分配策略進(jìn)行分配

iv.更新處理隊(duì)列表,開(kāi)始真正拉取消息請(qǐng)求

消費(fèi)者會(huì)將消費(fèi)位點(diǎn)更新到 NameServer 上,Rebalance 發(fā)生時(shí),讀取消費(fèi)者的消費(fèi)位點(diǎn)信息,需要注意在消費(fèi)者數(shù)量大于隊(duì)列數(shù)量的情況下,如果消費(fèi)者不及時(shí)更新消費(fèi)位點(diǎn)信息,可能會(huì)導(dǎo)致消息被重復(fù)消費(fèi)。因此,消費(fèi)者需要及時(shí)更新消費(fèi)位點(diǎn)信息,確保消費(fèi)進(jìn)度正確。

Consumer 創(chuàng)建的時(shí)候 Rebalance 會(huì)被執(zhí)行。整個(gè) rebalanceService 的作用就是不斷的通過(guò)負(fù)載均衡,重新分配隊(duì)列的過(guò)程。根據(jù)分配好的隊(duì)列構(gòu)建拉取消息的請(qǐng)求,然后放到 pullRequestQueue 中。

PullMessageService 拉取消息服務(wù)

首先拉取消息時(shí)最重要的是確定偏移量?offset,這存儲(chǔ)在消費(fèi)者端的 OffsetStore 對(duì)象中。

if?(this.defaultMQPushConsumer.getOffsetStore()?!=?null)?{??????????this.offsetStore?=?this.defaultMQPushConsumer.getOffsetStore();????????}?else?{??????????switch?(this.defaultMQPushConsumer.getMessageModel())?{????????????case?BROADCASTING:??????????????this.offsetStore?=?new?LocalFileOffsetStore(this.mQClientFactory,?this.defaultMQPushConsumer.getConsumerGroup());??????????????break;????????????case?CLUSTERING:??????????????this.offsetStore?=?new?RemoteBrokerOffsetStore(this.mQClientFactory,?this.defaultMQPushConsumer.getConsumerGroup());??????????????break;????????????default:??????????????break;??????????}}this.offsetStore.load();

可以看到廣播模式和集群模式的對(duì)象類(lèi)型不同,這是因?yàn)閷?duì) offset 的維護(hù)的方式不一樣,在 load 的時(shí)候 LocalFileOffsetStore 會(huì)從本地文件加載這個(gè) offset,而 RemoteBrokerOffsetStore 的 load 函數(shù)是空的。

兩種對(duì)象類(lèi)型分別有 readOffset 函數(shù)支持從內(nèi)存中獲取 offset 值,以及分別從本地文件存儲(chǔ)和 broker 獲取 offset。需要注意集群模式下消費(fèi)者只需要關(guān)心 broker 上維護(hù)的消費(fèi)進(jìn)度,因?yàn)椴徽?queue 和 消費(fèi)者的映射關(guān)系如何切換, 只有 offset 之后的未消費(fèi)消息是消費(fèi)者需要關(guān)心的。

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

?

消息的拉取過(guò)程是一個(gè)不斷循環(huán)的生產(chǎn)者消費(fèi)者模型,一個(gè) PullRequest 就對(duì)應(yīng)一個(gè)拉取任務(wù),并和一對(duì)MessageQueue(保存 Consume queue 的信息)和 ProcessQueue 關(guān)聯(lián),消息拉取的過(guò)程中,PullMessageService 拉取線程不停的讀取 PullRequestQueue 根據(jù) PullRequest 拉取消息。拉取到消息時(shí),消息提交到 ProcessQueue 中并新建 ConsumeRequest 提交到 ConsumeService 處理, 然后生成下一批的 PullRequest 丟到 PullRequestQueue。如果沒(méi)有拉取到消息或出現(xiàn)異常,則會(huì)重新將請(qǐng)求放回拉取隊(duì)列。ProcessQueue 中以 TreeMap 形式保存待處理的消息, key 為消息對(duì)應(yīng)的 offset ,并自動(dòng)進(jìn)行排序。

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

?

消息拉取過(guò)程:

1.PullMessageService 不斷循環(huán)遍歷,從 PullRequestQueue 中提取 PullRequest,根據(jù) nextOffset 去 broker 拉取消息,若該隊(duì)列 已經(jīng) dropped 則更新 offset 到 broker 并丟棄此拉消息請(qǐng)求。

2.PullMessageService 異步拉取消息,同時(shí)將 PullRequest 封裝在 PullCallback 中,PullCallback 封裝在 ResponseFuture中,并以自增的請(qǐng)求 id 為鍵,ResponseFuture 為值放入 ResponseTable 中。

3.Broker 收到請(qǐng)求,如果 offset 之后有新的消息會(huì)立即發(fā)送異步響應(yīng);否則等待直到 producer 有新的消息發(fā)送后返回或者超時(shí)。如果通信異?;蛘?Broker 超時(shí)未返回響應(yīng),nettyClient 會(huì)定時(shí)清理超時(shí)的請(qǐng)求,釋放 PullRequest 回到 PullRequestQueue。

4.用最新的 offset 更新 ResponseFuture 里的 PullRequest 并推送給 PullRequestQueue 里以進(jìn)行下一次拉取。批量拉取到的消息分批提交給 consumeExecutor 線程處理。

消費(fèi)控速

MetaQ 為消費(fèi)者端拉取消息提供了消費(fèi)控速的能力:

  • 主動(dòng)控速,在整個(gè)消費(fèi)過(guò)程中我們可以發(fā)現(xiàn),如果想要做到流控,一個(gè)是控制生成 PullRequest 的時(shí)間間隔,一個(gè)是控制生成新一批的請(qǐng)求數(shù)量,因此 MetaQ 提供了兩個(gè)參數(shù)給我們 pullInterval、pullBatchSize ,主動(dòng)控速的邏輯是通過(guò)控制消息的拉取速度來(lái)達(dá)到降低速率的效果。

  • 被動(dòng)控速,這種流量控制的方式要復(fù)雜得多,需要用戶(hù)在消費(fèi)消息時(shí)控制流量 (sentinel),由于消費(fèi)線程池的待消費(fèi)隊(duì)列的消息達(dá)到一定閾值之后,MetaQ 會(huì)被動(dòng)降低 PullRequest 的產(chǎn)生的速率,因此當(dāng)采用流量控制手段通過(guò)埋點(diǎn)降低消費(fèi)速度時(shí),待消費(fèi)隊(duì)列會(huì)逐漸占滿(mǎn),觸發(fā)降速機(jī)制;為什么不直接用 sentinel??因?yàn)?sentinel 快速失敗等策略觸發(fā)限流后會(huì)產(chǎn)生大量重試,重試消息會(huì)進(jìn)入重試隊(duì)列,當(dāng)重試的量逐漸增大,broker 上重試隊(duì)列中消息量也越來(lái)越多,并且重試消息再次投遞時(shí)還可能再次發(fā)生重試,又重新進(jìn)入重試隊(duì)列,同一條消息反復(fù)進(jìn)出隊(duì)列,這種無(wú)意義的重復(fù)動(dòng)作會(huì)增加 broker 的壓力。

消息種類(lèi)

普通消息

可選擇同步、異步或單向發(fā)送。同步:Producer 發(fā)出一條消息后,會(huì)在收到 MQ 返回的 ACK 之后再發(fā)送下一條消息。異步:Producer 發(fā)出消息后無(wú)需等待 MQ 返回 ACK ,直接發(fā)送下一條消息。單向: Producer 僅負(fù)責(zé)發(fā)送消息,不等待,MQ 也不返回 ACK。

順序消息

消息的順序性分為兩種:

  • 全局順序:對(duì)于指定的一個(gè) Topic ,所有消息按照嚴(yán)格的先入先出的順序進(jìn)行發(fā)布和消費(fèi) (同一個(gè) queue)。

  • 分區(qū)順序:對(duì)于一個(gè)指定的 Topic ,所有消息根據(jù) sharding key 進(jìn)行分區(qū),同一個(gè)分區(qū)內(nèi)的消息按照嚴(yán)格的 FIFO 順序進(jìn)行發(fā)布和消費(fèi),分區(qū)之間彼此獨(dú)立。

?MetaQ 只支持同一個(gè) queue 的順序消息,且同一個(gè) queue 只能被一臺(tái)機(jī)器的一個(gè)線程消費(fèi),如果想要支持全局消息,那需要將該 topic 的 queue 的數(shù)量設(shè)置為 1,犧牲了可用性。

消息事務(wù)

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

?

1.發(fā)送方向 MQ 服務(wù)端發(fā)送消息。

2.MQ Server 將消息持久化成功之后,向發(fā)送方 ACK 確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息為半消息。

3.發(fā)送方開(kāi)始執(zhí)行本地事務(wù)邏輯。

4.發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向 MQ Server 提交二次確認(rèn)(Commit 或是 Rollback),MQ Server 收到 Commit 狀態(tài)則將半消息標(biāo)記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態(tài)則刪除半消息,訂閱方將不會(huì)接受該消息。

5.在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá) MQ Server,經(jīng)過(guò)固定時(shí)間后 MQ Server 將對(duì)該消息發(fā)起消息回查。

6.發(fā)送方收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。

7.發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),MQ Server 仍按照步驟 4 對(duì)半消息進(jìn)行操作。?

MetaQ 3.0 以后,新的版本提供更加豐富的功能,支持消息屬性、無(wú)序消息、延遲消息、廣播消息、長(zhǎng)輪詢(xún)消費(fèi)、高可用特性,這些功能基本上覆蓋了大部分應(yīng)用對(duì)消息中間件的需求。除了功能豐富之外,MetaQ 基于順序?qū)懀蟾怕薯樞蜃x的隊(duì)列存儲(chǔ)結(jié)構(gòu)和 pull 模式的消費(fèi)方式,使得 MetaQ 具備了最快的消息寫(xiě)入速度和百億級(jí)的堆積能力,特別適合用來(lái)削峰填谷。在 MetaQ 3.0 版本的基礎(chǔ)上,衍生了開(kāi)源版本 RocketMQ。

高可用

如何做到不重復(fù)消費(fèi)也不丟失消息?

重復(fù)消費(fèi)問(wèn)題

  • 發(fā)送時(shí)消息重復(fù)【消息 Message ID 不同】:MQ Producer 發(fā)送消息時(shí),消息已成功發(fā)送到服務(wù)端并完成持久化,此時(shí)網(wǎng)絡(luò)閃斷或者客戶(hù)端宕機(jī)導(dǎo)致服務(wù)端應(yīng)答給客戶(hù)端失敗。如果此時(shí) MQ Producer 意識(shí)到消息發(fā)送失敗并嘗試再次發(fā)送消息,MQ 消費(fèi)者后續(xù)會(huì)收到兩條內(nèi)容相同但是 Message ID 不同的消息。

  • 投遞時(shí)消息重復(fù)【消息 Message ID 相同】:MQ Consumer 消費(fèi)消息場(chǎng)景下,消息已投遞到消費(fèi)者并完成業(yè)務(wù)處理,當(dāng)客戶(hù)端給服務(wù)端反饋應(yīng)答的時(shí)候網(wǎng)絡(luò)閃斷。為了保證消息至少被消費(fèi)一次,MQ 服務(wù)端將在網(wǎng)絡(luò)恢復(fù)后再次嘗試投遞之前已被處理過(guò)的消息,MQ 消費(fèi)者后續(xù)會(huì)收到兩條內(nèi)容相同并且 Message ID 也相同的消息。

MetaQ 不能保證消息不重復(fù),因此對(duì)于重復(fù)消費(fèi)情況,需要業(yè)務(wù)自定義唯一標(biāo)識(shí)作為冪等處理的依據(jù)。

消息丟失問(wèn)題

MetaQ 避免消息丟失的機(jī)制主要包括:重試、冗余消息存儲(chǔ)。在生產(chǎn)者的消息投遞失敗時(shí),默認(rèn)會(huì)重試兩次。消費(fèi)者消費(fèi)失敗時(shí),在廣播模式下,消費(fèi)失敗僅會(huì)返回 ConsumeConcurrentlyStatus.RECONSUME_LATER ,而不會(huì)重試。在未指定順序消息的集群模式下,消費(fèi)失敗的消息會(huì)進(jìn)入重試隊(duì)列自動(dòng)重試,默認(rèn)最大重試次數(shù)為 16 。在順序消費(fèi)的集群模式下,消費(fèi)失敗會(huì)使得當(dāng)前隊(duì)列暫停消費(fèi),并重試到成功為止。

主從同步

RocketMQ/MetaQ 為每個(gè)存儲(chǔ)數(shù)據(jù)的 Broker 節(jié)點(diǎn)配置 ClusterName,BrokerName 標(biāo)識(shí)來(lái)更好的進(jìn)行資源管理。多個(gè) BrokerName 相同的節(jié)點(diǎn)構(gòu)成一個(gè)副本組。每個(gè)副本還擁有一個(gè)從 0 開(kāi)始編號(hào),不重復(fù)也不一定連續(xù)的 BrokerId 用來(lái)表示身份,編號(hào)為 0 的節(jié)點(diǎn)是這個(gè)副本組的 Leader / Primary / Master,故障時(shí)通過(guò)選舉來(lái)重新對(duì) Broker 編號(hào)標(biāo)識(shí)新的身份。例如 BrokerId = {0, 1, 3},則 0 為主,其他兩個(gè)為備。

從模型的角度來(lái)看,RocketMQ /MetaQ 單節(jié)點(diǎn)上 Topic 數(shù)量較多,如果像 kafka 以 topic 粒度維護(hù)狀態(tài)機(jī),節(jié)點(diǎn)宕機(jī)會(huì)導(dǎo)致上萬(wàn)個(gè)狀態(tài)機(jī)切換,這種驚群效應(yīng)會(huì)帶來(lái)很多潛在風(fēng)險(xiǎn),因此新版本的 RocketMQ/MetaQ 選擇以單個(gè) Broker 作為切換的最小粒度來(lái)管理,相比于其他更細(xì)粒度的實(shí)現(xiàn),副本身份切換時(shí)只需要重分配 Broker 編號(hào),對(duì)元數(shù)據(jù)節(jié)點(diǎn)壓力最小。由于通信的數(shù)據(jù)量少,可以加快主備切換的速度,單個(gè)副本下線的影響被限制在副本組內(nèi),減少管理和運(yùn)維成本。這種實(shí)現(xiàn)也存在一些缺點(diǎn),例如存儲(chǔ)節(jié)點(diǎn)的負(fù)載無(wú)法以最佳狀態(tài)在集群上進(jìn)行負(fù)載均衡。

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

?

RocketMQ/MetaQ 采用物理復(fù)制的方法,存儲(chǔ)層的 CommitLog 通過(guò)鏈表和內(nèi)核的 MappedFile 機(jī)制抽象出一條 append only 的數(shù)據(jù)流。主副本將未提交的消息按序傳輸給其他副本(相當(dāng)于 redo log),并根據(jù)一定規(guī)則計(jì)算確認(rèn)位點(diǎn)(confirm offset)判斷日志流是否被提交。最終一致性通過(guò)數(shù)據(jù)水位對(duì)齊的方式來(lái)實(shí)現(xiàn)(越近期的消息價(jià)值越高):?

消息隊(duì)列中間件 MetaQ/RocketMQ,中間件,rocketmq

  • 1-1 情況下滿(mǎn)足備 Max <= 主 Min,一般是備新上線或下線較久,備跳過(guò)存量日志,從主的 Min 開(kāi)始復(fù)制。

  • 1-2,2-2 兩種情況下滿(mǎn)足 主 Min < 備 Max <= 主 Max,一般是由于備網(wǎng)絡(luò)閃斷導(dǎo)致日志水位落后,通過(guò) HA 連接追隨主即可。

  • 1-3,2-3 兩種情況下備 Max > 主 Max,可能由于主異步寫(xiě)磁盤(pán)宕機(jī)后又成為主,或者網(wǎng)絡(luò)分區(qū)時(shí)雙主寫(xiě)入造成 CommitLog 分叉。由于新主落后于備,在確認(rèn)位點(diǎn)對(duì)齊后少量未確認(rèn)的消息丟失,這種非正常模式的選舉是應(yīng)該盡量避免的。

  • 3-3 理論上不會(huì)出現(xiàn),備的數(shù)據(jù)長(zhǎng)于主,原因可能是主節(jié)點(diǎn)數(shù)據(jù)丟失又疊加了非正常選舉,因此這種情況需要人工介入處理。

副本組的消息復(fù)制也支持同步和異步的模式。

復(fù)制方式

優(yōu)點(diǎn)

缺點(diǎn)

同步復(fù)制

成功寫(xiě)入的消息不會(huì)丟失,可靠性高

寫(xiě)入延遲更高

異步復(fù)制

slave 宕機(jī)不影響 master 性能更高

可能丟失消息


slave broker 會(huì)定時(shí)(60 s)從 master 同步信息

  • public?void?syncAll()?{????????this.syncTopicConfig();????????this.syncConsumerOffset();????????this.syncDelayOffset();????????this.syncSubscriptionGroupConfig();????????this.syncMessageRequestMode();????????if?(brokerController.getMessageStoreConfig().isTimerWheelEnable())?{????????????this.syncTimerMetrics();????????} }
主從切換

RocketMQ 衍生出了很多不同的主從切換架構(gòu)。?

無(wú)切換架構(gòu)

最早的時(shí)候,RocketMQ 基于 Master-Slave 模式提供了主備部署的架構(gòu),這種模式提供了一定的高可用能力,在 Master 節(jié)點(diǎn)負(fù)載較高情況下,讀流量可以被重定向到備機(jī)。由于沒(méi)有選主機(jī)制,在 Master 節(jié)點(diǎn)不可用時(shí),這個(gè)副本組的消息發(fā)送將會(huì)完全中斷,還會(huì)出現(xiàn)延遲消息、事務(wù)消息等無(wú)法消費(fèi)或者延遲。此外,備機(jī)在正常工作場(chǎng)景下資源使用率較低,造成一定的資源浪費(fèi)。為了解決這些問(wèn)題,社區(qū)提出了在一個(gè) Broker 進(jìn)程內(nèi)運(yùn)行多個(gè) BrokerContainer,這個(gè)設(shè)計(jì)類(lèi)似于 Flink 的 slot,讓一個(gè) Broker 進(jìn)程上可以以 Container 的形式運(yùn)行多個(gè)節(jié)點(diǎn),復(fù)用傳輸層的連接,業(yè)務(wù)線程池等資源,通過(guò)單節(jié)點(diǎn)主備交叉部署來(lái)同時(shí)承擔(dān)多份流量,無(wú)外部依賴(lài),自愈能力強(qiáng)。這種方式下隔離性弱于使用原生容器方式進(jìn)行隔離,同時(shí)由于架構(gòu)的復(fù)雜度增加導(dǎo)致了自愈流程較為復(fù)雜。

切換架構(gòu)

另一條演進(jìn)路線則是基于可切換的,RocketMQ 也嘗試過(guò)依托于 Zookeeper 的分布式鎖和通知機(jī)制進(jìn)行 HA 狀態(tài)的管理。引入外部依賴(lài)的同時(shí)給架構(gòu)帶來(lái)了復(fù)雜性,不容易做小型化部署,部署運(yùn)維和診斷的成本較高。另一種方式就是基于 Raft 在集群內(nèi)自動(dòng)選主,Raft 中的副本身份被透出和復(fù)用到 Broker Role 層面去除外部依賴(lài),然而強(qiáng)一致的 Raft 版本并未支持靈活的降級(jí)策略,無(wú)法在 C(Consistency)和 A (Availability)之間靈活調(diào)整。兩種切換方案都是 CP 設(shè)計(jì),犧牲高可用優(yōu)先保證一致性。主副本下線時(shí)選主和路由定時(shí)更新策略導(dǎo)致整個(gè)故障轉(zhuǎn)移時(shí)間依然較長(zhǎng),Raft 本身對(duì)三副本的要求也會(huì)面臨較大的成本壓力。?

RocketMQ DLedger 融合模式

RocketMQ DLedger (基于 Raft 的分布式日志存儲(chǔ))融合模式是 RocketMQ 5.0 演進(jìn)中結(jié)合上述兩條路線后的一個(gè)系統(tǒng)的解決方案。?

模式

優(yōu)點(diǎn)

缺點(diǎn)

無(wú)切換

Master-Slave 模式

實(shí)現(xiàn)簡(jiǎn)單,適用于中小型用戶(hù),人工管控力強(qiáng)

故障需要人工處理,故障時(shí)寫(xiě)入消息失敗,導(dǎo)致消息消費(fèi)暫停

Broker Container 模式

無(wú)需選主,無(wú)外部依賴(lài),故障轉(zhuǎn)移非???(< 3 秒)

增加單節(jié)點(diǎn)運(yùn)維的復(fù)雜度,機(jī)器故障的風(fēng)險(xiǎn)增加,自愈流程復(fù)雜

切換架構(gòu)

Raft 自動(dòng)選主模式

自動(dòng)主備切換

故障轉(zhuǎn)移時(shí)間較長(zhǎng),強(qiáng)一致無(wú)法靈活降級(jí),三副本成本壓力較大

融合架構(gòu)

基于 Dledger Controller 的可切換模式

可支持無(wú)切換和切換架構(gòu)之間的轉(zhuǎn)換,復(fù)制協(xié)議更簡(jiǎn)單,靈活降級(jí)

提高了部署和系統(tǒng)的復(fù)雜度

?

總結(jié)

相比較于 RocketMQ/MetaQ,Kafka 具有更高的吞吐量。Kafka 默認(rèn)采用異步發(fā)送的機(jī)制,并且還擁有消息收集和批量發(fā)送的機(jī)制,這樣的設(shè)置可以顯著提高其吞吐量。由于 Kafka 的高吞吐量,因此通常被用于日志采集、大數(shù)據(jù)等領(lǐng)域。?

RocketMQ/MetaQ 不采用異步的方式發(fā)送消息。因?yàn)楫?dāng)采用異步的方式發(fā)送消息時(shí),Producer 發(fā)送的消息到達(dá) Broker 就會(huì)返回成功。此時(shí)如果 Producer 宕機(jī),而消息在 Broker 刷盤(pán)失敗時(shí),就會(huì)導(dǎo)致消息丟失,從而降低系統(tǒng)的可靠性。?

RocketMQ/MetaQ 單機(jī)可以支持更多的 topic 數(shù)量。因?yàn)?Kafka 在 Broker 端是將一個(gè)分區(qū)存儲(chǔ)在一個(gè)文件中的,當(dāng) topic 增加時(shí),分區(qū)的數(shù)量也會(huì)增加,就會(huì)產(chǎn)生過(guò)多的文件。當(dāng)消息刷盤(pán)時(shí),就會(huì)出現(xiàn)性能下降的情況。而 RocketMQ/MetaQ 是將所有消息順序?qū)懭胛募?,因此不?huì)出現(xiàn)這種情況。

當(dāng) Kafka 單機(jī)的 topic 數(shù)量從幾十到幾百個(gè)時(shí),就會(huì)出現(xiàn)吞吐量大幅度下降、load 增高、響應(yīng)時(shí)間變長(zhǎng)等現(xiàn)象。而 RocketMQ/MetaQ 的 topic 數(shù)量達(dá)到幾千,甚至上萬(wàn)時(shí),也只是會(huì)出現(xiàn)小幅度的性能下降。

綜上所述,Kafka 具有更高的吞吐量,適合應(yīng)用于日志采集、大數(shù)據(jù)等領(lǐng)域。而 RocketMQ/MetaQ 單機(jī)支持更多的 topic,且具有更高的可靠性(一致性支持),因此適用于淘寶這樣復(fù)雜的業(yè)務(wù)處理。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-622226.html

到了這里,關(guān)于消息隊(duì)列中間件 MetaQ/RocketMQ的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【消息中間件】RocketMQ消息重復(fù)消費(fèi)場(chǎng)景及解決辦法

    【消息中間件】RocketMQ消息重復(fù)消費(fèi)場(chǎng)景及解決辦法

    消息重復(fù)消費(fèi)是各個(gè)MQ都會(huì)發(fā)生的常見(jiàn)問(wèn)題之一,在一些比較敏感的場(chǎng)景下,重復(fù)消費(fèi)會(huì)造成比較嚴(yán)重的后果,比如重復(fù)扣款等。 當(dāng)系統(tǒng)的調(diào)用鏈路比較長(zhǎng)的時(shí)候,比如系統(tǒng)A調(diào)用系統(tǒng)B,系統(tǒng)B再把消息發(fā)送到RocketMQ中,在系統(tǒng)A調(diào)用系統(tǒng)B的時(shí)候,如果系統(tǒng)B處理成功,但是遲遲

    2024年02月05日
    瀏覽(41)
  • 分布式消息中間件RocketMQ的應(yīng)用

    分布式消息中間件RocketMQ的應(yīng)用

    所有代碼同步至GitCode:https://gitcode.net/ruozhuliufeng/test-rocketmq.git 普通消息 消息發(fā)送分類(lèi) ? Producer對(duì)于消息的發(fā)送方式也有多種選擇,不同的方式會(huì)產(chǎn)生不同的系統(tǒng)效果。 同步發(fā)送消息 ? 同步發(fā)送消息是指,Producer發(fā)出一條消息后,會(huì)在收到MQ返回的ACK之后才發(fā)下一條消息。

    2024年02月05日
    瀏覽(21)
  • 【消息中間件】詳解三大MQ:RabbitMQ、RocketMQ、Kafka

    【消息中間件】詳解三大MQ:RabbitMQ、RocketMQ、Kafka

    作者簡(jiǎn)介 前言 博主之前寫(xiě)過(guò)一個(gè)完整的MQ系列,包含RabbitMQ、RocketMQ、Kafka,從安裝使用到底層機(jī)制、原理。專(zhuān)欄地址: https://blog.csdn.net/joker_zjn/category_12142400.html?spm=1001.2014.3001.5482 本文是該系列的清單綜述,會(huì)拉通來(lái)聊一下三大MQ的特點(diǎn)和各種適合的場(chǎng)景。 目錄 1.概述 1.1.M

    2024年02月09日
    瀏覽(53)
  • ActiveMQ、RabbitMQ、Kafka、RocketMQ消息中間件技術(shù)選型

    消息中間件是分布式系統(tǒng)中重要的組件之一,用于實(shí)現(xiàn)異步通信、解耦系統(tǒng)、提高系統(tǒng)可靠性和擴(kuò)展性。在做消息中間件技術(shù)選型時(shí),需要考慮多個(gè)因素,包括可靠性、性能、可擴(kuò)展性、功能豐富性、社區(qū)支持和成本等。本文將五種流行的消息中間件技術(shù):ActiveMQ、RabbitMQ、

    2024年02月11日
    瀏覽(23)
  • SpringBoot整合消息中間件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)

    SpringBoot整合消息中間件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)

    消息的發(fā)送方:生產(chǎn)者 消息的接收方:消費(fèi)者 同步消息:發(fā)送方發(fā)送消息到接收方,接收方有所回應(yīng)后才能夠進(jìn)行下一次的消息發(fā)送 異步消息:不需要接收方回應(yīng)就可以進(jìn)行下一步的發(fā)送 什么是消息隊(duì)列? 當(dāng)此時(shí)有很多個(gè)用戶(hù)同時(shí)訪問(wèn)服務(wù)器,需要服務(wù)器進(jìn)行操作,但此

    2024年04月27日
    瀏覽(53)
  • RocketMQ:一個(gè)純java的開(kāi)源消息中間件--開(kāi)發(fā)測(cè)試環(huán)境搭建

    一、簡(jiǎn)介 ? ? RocketMQ的前身是Metaq,當(dāng)?Metaq?3.0發(fā)布時(shí),產(chǎn)品名稱(chēng)改為? RocketMQ ????MetaQ2.x版本由于依賴(lài)了alibaba公司內(nèi)部其他系統(tǒng),對(duì)于公司外部用戶(hù)使用不夠友好,推薦使用3.0版本。 ? ? ?項(xiàng)目地址:? https://github.com/alibaba/RocketMQ

    2024年02月11日
    瀏覽(25)
  • 消息中間件(MQ)對(duì)比:RabbitMQ、Kafka、ActiveMQ 和 RocketMQ

    前言 在構(gòu)建分布式系統(tǒng)時(shí),選擇適合的消息中間件是至關(guān)重要的決策。RabbitMQ、Kafka、ActiveMQ 和 RocketMQ 是當(dāng)前流行的消息中間件之一,它們各自具有獨(dú)特的特點(diǎn)和適用場(chǎng)景。本文將對(duì)這四種消息中間件進(jìn)行綜合比較,幫助您在項(xiàng)目中作出明智的選擇。 1. RabbitMQ 特點(diǎn): 消息模

    2024年02月20日
    瀏覽(35)
  • 【Alibaba中間件技術(shù)系列】「RocketMQ技術(shù)專(zhuān)題」RocketMQ消息發(fā)送的全部流程和落盤(pán)原理分析

    RocketMQ目前在國(guó)內(nèi)應(yīng)該是比較流行的MQ 了,目前本人也在公司的項(xiàng)目中進(jìn)行使用和研究,借著這個(gè)機(jī)會(huì),分析一下RocketMQ 發(fā)送一條消息到存儲(chǔ)一條消息的過(guò)程,這樣會(huì)對(duì)以后大家分析和研究RocketMQ相關(guān)的問(wèn)題有一定的幫助。 分析的總體技術(shù)范圍發(fā)送到存儲(chǔ),本文的主要目的是

    2024年02月10日
    瀏覽(26)
  • Linux系統(tǒng)下消息中間件RocketMQ下載、安裝、搭建、配置、控制臺(tái)rocketmq-dashboard的安裝保姆級(jí)教程 rocketmq ui

    Linux系統(tǒng)下消息中間件RocketMQ下載、安裝、搭建、配置、控制臺(tái)rocketmq-dashboard的安裝保姆級(jí)教程 rocketmq ui

    這里給出我使用的 RocketMQ 版本(5.1.3)、RocketMQ-Dashboard 版本的百度網(wǎng)盤(pán)鏈接: 鏈接:https://pan.baidu.com/s/1HaKBBDGWZ0WKLGgVwIG9pw 提取碼:1234 1、注意:有兩種資源下載:Source表示源碼、Binary是二進(jìn)制包(我們下載這個(gè)):二進(jìn)制包是已經(jīng)編譯完成后可以直接運(yùn)行的,源碼包是需要

    2024年02月12日
    瀏覽(24)
  • 消息中間件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之間的區(qū)別

    消息中間件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之間的區(qū)別

    目錄 一、什么是消息中間件 二、消息中間件的組成 1、Broker 2、Producer 3、Consumer 4、Topic 5、Queue 6、Message 三、消息中間件通信模式 1、點(diǎn)對(duì)點(diǎn)(kafka不支持這種模式) ?2、發(fā)布/訂閱 ?四、消息中間件的作用 1、系統(tǒng)解耦 2、提高系統(tǒng)響應(yīng)時(shí)間 3、為大數(shù)據(jù)處理架構(gòu)提供服務(wù) 五、

    2024年01月25日
    瀏覽(26)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包