推薦電子書(shū):云原生架構(gòu)白皮書(shū) 2022版-藏經(jīng)閣-阿里云開(kāi)發(fā)者社區(qū) (aliyun.com)
簡(jiǎn)介—— 消息隊(duì)列中間件 MetaQ/RocketMQ
中間件 MetaQ 是一種基于隊(duì)列模型的消息中間件,MetaQ 據(jù)說(shuō)最早是受 Kafka 的影響開(kāi)發(fā)的,第一版的名字?"metamorphosis",是奧地利作家卡夫卡的名作——《變形記》。RocketMQ 是 MetaQ 的開(kāi)源版本。?
消息隊(duì)列中間件一般用于在分布式場(chǎng)景下解決集群?jiǎn)螜C(jī)瓶頸的問(wèn)題。在傳統(tǒng)的分布式計(jì)算環(huán)境中,常常會(huì)出現(xiàn)由于某個(gè)單機(jī)節(jié)點(diǎn)的性能瓶頸,即使其他節(jié)點(diǎn)仍有余力,仍然會(huì)導(dǎo)致整個(gè)系統(tǒng)的性能無(wú)法進(jìn)一步提升的情況,這一現(xiàn)象通常是由于任務(wù)負(fù)載不均衡,網(wǎng)絡(luò)延遲等常見(jiàn)且難以解決的問(wèn)題。消息隊(duì)列本質(zhì)上是提供了一種非常合理的任務(wù)分配策略,通過(guò)將任務(wù)分給消費(fèi)者實(shí)現(xiàn)異步和分布式處理,提高整個(gè)集群的性能。
消息隊(duì)列(mq)的核心思想是將耗時(shí)的任務(wù)異步化,通過(guò)消息隊(duì)列緩存任務(wù),從而實(shí)現(xiàn)消息發(fā)送方和接收方的解耦,使得任務(wù)的處理能夠異步、并行,從而提高系統(tǒng)或集群的吞吐量和可擴(kuò)展性。在這個(gè)過(guò)程中,整個(gè)系統(tǒng)強(qiáng)依賴(lài)于消息隊(duì)列,起到類(lèi)似橋梁的作用。消息隊(duì)列有著經(jīng)典的三大應(yīng)用場(chǎng)景:解耦、異步和削峰填谷。?
解耦場(chǎng)景:消息隊(duì)列一般使用發(fā)布/訂閱的模型,如果服務(wù) B C D 依賴(lài)服務(wù) A 的消息,此時(shí)新增服務(wù) E 也需要依賴(lài) A ,而 B 服務(wù)不再需要消息,需要頻繁且復(fù)雜的業(yè)務(wù)改造,效率低,穩(wěn)定性差,此時(shí)引入消息隊(duì)列進(jìn)行解耦,服務(wù) A 只需要將產(chǎn)生的消息發(fā)布到 mq 中,就不用管了,其它服務(wù)會(huì)自己根據(jù)需要訂閱 mq 中的消息,或者說(shuō)去 mq 中消費(fèi),這就使得每個(gè)服務(wù)可以更多地關(guān)注自身業(yè)務(wù),而不需要把精力用在維護(hù)服務(wù)之間的關(guān)系上,可擴(kuò)展性提高。
異步場(chǎng)景:如用戶(hù)的業(yè)務(wù)需要一系列的服務(wù)進(jìn)行處理,按順序處理的話,用戶(hù)需要等待的時(shí)間過(guò)長(zhǎng)。例如電商平臺(tái)的用戶(hù)下單、支付、積分、郵件、短信通知等流程,長(zhǎng)時(shí)間等待用戶(hù)無(wú)法接受,就可以通過(guò) mq 進(jìn)行服務(wù)的異步處理,例如積分、郵件和短信通知服務(wù)訂閱了支付服務(wù)的消息,將支付完成作為消息發(fā)布到 mq ,這些服務(wù)就可以同時(shí)對(duì)這一訂單進(jìn)行處理,降低了請(qǐng)求等待時(shí)間(rt) 。?
削峰填谷場(chǎng)景:削峰表示的含義是,流量如果太大,就控制服務(wù)器處理的 QPS,不要讓大流量打掛數(shù)據(jù)庫(kù)等導(dǎo)致服務(wù)器宕機(jī),讓服務(wù)處理請(qǐng)求更加平緩,節(jié)省服務(wù)器資源,其本質(zhì)上是控制用戶(hù)的請(qǐng)求速率,或是延緩或是直接拒絕。填谷的含義是將階段性的大流量請(qǐng)求緩存起來(lái),在流量平緩的時(shí)候慢慢處理,防止過(guò)多的請(qǐng)求被拒絕后的重試導(dǎo)致更大的流量。mq 很適合這一場(chǎng)景,QPS 超出服務(wù)端接收請(qǐng)求的能力時(shí),服務(wù)端仍然保持在安全范圍內(nèi)地從消息隊(duì)列中獲取消息進(jìn)行處理,多余的消息會(huì)積壓在消息隊(duì)列中,或由于超時(shí)直接拒絕,到 QPS 低于這一閾值的時(shí)候,這些積壓的消息就會(huì)被逐漸消費(fèi)掉。相當(dāng)于在系統(tǒng)前修建了一個(gè)流量蓄水池。?
除此之外還可以利用消息隊(duì)列進(jìn)行消息通信,日志處理等業(yè)務(wù),但消息隊(duì)列也會(huì)引入系統(tǒng)可用性,系統(tǒng)復(fù)雜度,數(shù)據(jù)一致性等問(wèn)題(強(qiáng)依賴(lài)消息隊(duì)列的正確執(zhí)行,需要確保消息不會(huì)丟失,確保消息的順序性等)。這意味著如果系統(tǒng)中的消息隊(duì)列承擔(dān)著重要的角色,那么消息隊(duì)列的可靠性和穩(wěn)定性也至關(guān)重要,本文介紹的 MetaQ/RocketMQ 是側(cè)重于維持消息一致性和高可靠性的消息隊(duì)列中間件。
物理架構(gòu)
MetaQ 的高可用性是基于其物理部署架構(gòu)實(shí)現(xiàn)的,在生產(chǎn)者為消息定義了一個(gè) topic 之后,消費(fèi)者可以訂閱這個(gè) topic ,于是消息就有了從生產(chǎn)到消費(fèi)的路由指向。
?
NameServer?負(fù)責(zé)暴露消息的 topic ,因此可以以將 NameServer 理解成一個(gè)注冊(cè)中心,用來(lái)關(guān)聯(lián) topic 和對(duì)應(yīng)的 broker ,即消息的存儲(chǔ)位置。NameServer 的每個(gè)節(jié)點(diǎn)都維護(hù)著 topic 和 broker 的映射關(guān)系,每個(gè)節(jié)點(diǎn)彼此獨(dú)立,無(wú)同步。在每個(gè)NameServer節(jié)點(diǎn)內(nèi)部都維護(hù)著所有?Broker?的地址列表,所有?Topic?和 Topic 對(duì)應(yīng)?Queue?的信息等。消息生產(chǎn)者在發(fā)送消息之前先與任意一臺(tái) NameServer 建立連接,獲取 Broker 服務(wù)器的地址列表,然后根據(jù)負(fù)載均衡算法從列表中選擇一臺(tái)消息服務(wù)器發(fā)送消息。
Broker?主要負(fù)責(zé)消息的存儲(chǔ)和轉(zhuǎn)發(fā),分為?master?和?slave,是一寫(xiě)多讀的關(guān)系。broker 節(jié)點(diǎn)可以按照處理的數(shù)據(jù)相同劃分成副本組,同一組 master 和 slave 的關(guān)系可以通過(guò)指定相同 brokerName,不同的 brokerId 來(lái)定義,brokerId 為 0 標(biāo)識(shí) master,非 0 是 slave。每個(gè) broker 服務(wù)器會(huì)與 NameServer 集群建立長(zhǎng)連接(注意是跟所有的 NameServer 服務(wù)器,因?yàn)?NameServer 彼此之間獨(dú)立不同步),并且會(huì)注冊(cè) topic 信息到 NameServer 中。復(fù)制策略是 Broker 的 Master 與 Slave 間的數(shù)據(jù)同步方式,分為同步復(fù)制與異步復(fù)制。由于異步復(fù)制、異步刷盤(pán)可能會(huì)丟失少量消息,因此 Broker 默認(rèn)采用的是同步雙寫(xiě)的方式,消息寫(xiě)入 master 成功后,master 會(huì)等待 slave 同步數(shù)據(jù)成功后才向 Producer 返回成功 ACK ,即 Master 與 Slave 都要寫(xiě)入成功后才會(huì)返回成功 ACK 。這樣可以保證消息發(fā)送時(shí)消息不丟失。副本組中,各個(gè)節(jié)點(diǎn)處理的速度不同,也就有了日志水位的概念 (高水位對(duì)消費(fèi)者不可見(jiàn))。在 master 宕機(jī)時(shí),同步副本集中的其余節(jié)點(diǎn)會(huì)自動(dòng)選舉出新的 master 代替工作(Raft 協(xié)議)。?
?
Producer,消息生產(chǎn)者,與 NameServer 隨機(jī)一個(gè)節(jié)點(diǎn)建立長(zhǎng)連接,定時(shí)從 NameServer 獲取 topic 路由信息,與 master broker 建立長(zhǎng)連接,定時(shí)發(fā)送心跳,Producer 只與 master 建立連接產(chǎn)生通信,不與 slave 建立連接。生產(chǎn)者和消費(fèi)者都有組(Group)的概念,同一組節(jié)點(diǎn)的生產(chǎn)/消費(fèi)邏輯相同。?
Consumer,消息消費(fèi)者,與 NameServer 隨機(jī)一個(gè)節(jié)點(diǎn)建立長(zhǎng)連接,定時(shí)從 NameServer 獲取 topic 的路由信息,并獲取想要消費(fèi)的 queue ??梢院吞峁┓?wù)的 master 或 slave 建立長(zhǎng)連接,定時(shí)向 master 和 slave 發(fā)送心跳,既可以從 master 訂閱消息,也可以從 slave 訂閱消息。
?
消息的存儲(chǔ)
MetaQ 將消息存儲(chǔ)(持久化)到位于生產(chǎn)者和消費(fèi)者之間的一個(gè)消息代理(Message Broker)上。
MetaQ 消息模型:
-
Message 單位消息;
-
Topic 消息的類(lèi)型,生產(chǎn)者對(duì)應(yīng)消費(fèi)者的分區(qū)標(biāo)識(shí);
-
Tag 消息在相同 Topic 時(shí)的二級(jí)分類(lèi)標(biāo)識(shí),可用于消息的篩選;
-
Queue 物理分區(qū),一個(gè) Topic 對(duì)應(yīng)多個(gè) Queue;
-
Group 生產(chǎn)者或消費(fèi)者的邏輯分組,同一個(gè) Group 的 生產(chǎn)者/消費(fèi)者 通常 生產(chǎn)/消費(fèi) 同一類(lèi)消息,并且 生產(chǎn)/消費(fèi) 的邏輯一致;
-
Offset:偏移值, 表示消費(fèi)到的位置或待消費(fèi)的消息位置;
消息的存儲(chǔ)方式對(duì)消息隊(duì)列的性能有很大影響,如 ActiveMQ 會(huì)使用隊(duì)列表來(lái)存儲(chǔ)消息,依靠輪訓(xùn)、加鎖等方式檢查和處理消息,但對(duì)于 QPS 很高的系統(tǒng)來(lái)說(shuō),一下子積壓龐大的數(shù)據(jù)量在表中會(huì)導(dǎo)致 B+ 樹(shù)索引層級(jí)加深,影響查詢(xún)效率。KV 數(shù)據(jù)庫(kù)采用如 LSM 樹(shù)作為索引結(jié)構(gòu),對(duì)讀性能有較大的犧牲,這對(duì)于消息隊(duì)列而言很難接受,因?yàn)橄㈥?duì)列常常需要面對(duì)消費(fèi)失敗需要重試的情況。?
?RocketMQ/Kafka/RabbitMQ?等消息隊(duì)列會(huì)采用順序?qū)懙娜罩窘Y(jié)構(gòu),將消息刷盤(pán)至文件系統(tǒng)作持久化。順序?qū)懭罩疚募梢员苊忸l繁的隨機(jī)訪問(wèn)而導(dǎo)致的性能問(wèn)題,而且利于延遲寫(xiě)入等優(yōu)化手段,能夠快速保存日志。Kafka 會(huì)為每個(gè) topic (事件的組織和存儲(chǔ)單位,一個(gè) topic 可以對(duì)應(yīng)多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者) 劃分出一個(gè)分區(qū)日志,便于根據(jù) topic 順序消費(fèi),消息被讀取后不會(huì)立刻刪除,可以持久存儲(chǔ),但 topic 數(shù)量增加的時(shí)候,broker 的分區(qū)文件數(shù)量增大,會(huì)使得本來(lái)速度很快的順序?qū)懽兂呻S機(jī)寫(xiě)(不同文件之間移動(dòng)),性能大幅下降。?
MetaQ 2.0 對(duì)這部分進(jìn)行重新設(shè)計(jì),其存儲(chǔ)結(jié)構(gòu)主要包括?CommitLog?和?Consume queue?兩部分。?
CommitLog 是物理存儲(chǔ),存儲(chǔ)不定長(zhǎng)的完整消息記錄,邏輯上是完全連續(xù)的一個(gè)文件,物理上單個(gè)文件大小是 1 GB,文件名是當(dāng)前文件首地址在 CommitLog 中的偏移量。只要 CommitLog 落盤(pán),就可以認(rèn)為已經(jīng)接收到消息,即使 Cosume queue 丟失,也可以從 CommitLog 恢復(fù)。而所有 topic 的消息都會(huì)存儲(chǔ)在同一個(gè) CommitLog 中來(lái)保證順序?qū)?。這樣的結(jié)構(gòu)會(huì)導(dǎo)致 CommitLog 讀取完全變成隨機(jī)讀,所以需要 Consume queue 作為索引隊(duì)列 (offset, size, tag),每個(gè) topic-queue 的消息在寫(xiě)完 CommitLog 之后,都會(huì)寫(xiě)到獨(dú)立的 Consume queue ,隊(duì)列里的每個(gè)元素都是定長(zhǎng)的元數(shù)據(jù),內(nèi)容包含該消息在對(duì)應(yīng) CommitLog 的 offset 和 size ,還包括 tagcode 可支持消息按照指定 tag 進(jìn)行過(guò)濾。順序?qū)懯?MetaQ 實(shí)現(xiàn)高性能的基礎(chǔ)。?
?
基于這樣的存儲(chǔ)結(jié)構(gòu),MetaQ 對(duì)客戶(hù)端暴露的主要是 Consume queue 邏輯視圖,提供隊(duì)列訪問(wèn)接口。消費(fèi)者通過(guò)指定 Consume queue 的位點(diǎn)來(lái)讀取消息,通過(guò)提交 Consume queue 的位點(diǎn)來(lái)維護(hù)消費(fèi)進(jìn)度。Concume queue 每個(gè)條目長(zhǎng)度固定(8個(gè)字節(jié)CommitLog物理偏移量、4字節(jié)消息長(zhǎng)度、8字節(jié)tag哈希碼),單個(gè) ConsumeQueue 文件默認(rèn)最多包括 30 萬(wàn)個(gè)條目。這樣做的好處是隊(duì)列非常輕量級(jí),Consume Queue 非常小,且在消費(fèi)過(guò)程中都是順序讀取,其速度幾乎能與內(nèi)存讀寫(xiě)相比,而在 page cache 和良好的空間局部性作用下,CommitLog 的訪問(wèn)也非??焖?。
?
MetaQ 會(huì)啟動(dòng)一個(gè)定時(shí)服務(wù) ReputMessageService 定時(shí)調(diào)用(間隔 1ms)來(lái)生成 Consume queue 和 其它索引文件。
Consume queue 解決了順序消費(fèi)的問(wèn)題,但如果需要根據(jù)屬性進(jìn)行篩選,就必須用到?index 索引。?
index 索引支持根據(jù) key 值進(jìn)行篩選,查找時(shí),可以根據(jù)消息的 key 計(jì)算 hash 槽的位置,hash 槽中存儲(chǔ)著 Index 條目的位置,可以根據(jù)這個(gè) index 條目獲得一個(gè)鏈表(尾),每個(gè) index 條目包含在 CommitLog 上的消息主體的物理偏移量。
消息鏈路
MetaQ 的消息可以根據(jù) topic-queue 劃分出確定的從生產(chǎn)者到消費(fèi)者路由指向。
?
1.producer 指定 broker 和 queue 發(fā)送消息 msg ;
2.broker 接收消息,并完成緩存、刷盤(pán)和生成摘要(同時(shí)根據(jù) tag 和 user properties 對(duì) msg 進(jìn)行打標(biāo))等操作;
3.consumer 每隔一段時(shí)間( pullInterval )從 broker 端的(根據(jù)服務(wù)端消息過(guò)濾模式 tag 或 sql 過(guò)濾后)獲取一定量的消息到本地消息隊(duì)列中(單線程)
4.consumer 按照配置并發(fā)分配上述隊(duì)列消息并執(zhí)行消費(fèi)方法;
5.consumer 返回 broker 消費(fèi)結(jié)果并重置消費(fèi)位點(diǎn);
生產(chǎn)者
Topic 是消息的主題,每個(gè) topic 對(duì)應(yīng)多個(gè)隊(duì)列,多個(gè)隊(duì)列會(huì)均勻的分布在多個(gè) broker 上,Producer 發(fā)送的消息在 broker 上會(huì)均衡的分布在多個(gè)隊(duì)列中,Producer 發(fā)送消息時(shí)在多個(gè)隊(duì)列間輪詢(xún)確保消息的均衡。?
??
發(fā)送消息的具體操作如下:
1、查詢(xún)本地緩存是否存儲(chǔ)了 TopicPublishInfo ,否則從 NameServer 獲取
2、根據(jù)負(fù)載均衡選擇策略獲取待發(fā)送隊(duì)列并輪訓(xùn)訪問(wèn)
3、獲取消息隊(duì)列對(duì)應(yīng)的 broker 實(shí)際 IP
4、設(shè)置消息 Unique ID ,zip 壓縮消息
5、消息校驗(yàn)(長(zhǎng)度等),發(fā)送消息
Producer 發(fā)送的每條消息都包含一個(gè) Topic,表示一類(lèi)消息的集合。同時(shí)還有一個(gè) Tag,用于區(qū)分同一Topic 下不同類(lèi)型的消息。一個(gè) Topic 包括多個(gè) Queue,每個(gè) Queue 中存放該 Topic 對(duì)應(yīng)消息的位置。一個(gè) Topic 的 Queue 相當(dāng)于該 Topic 中消息的分區(qū),Queue 可以存儲(chǔ)在不同的 Broker 上。發(fā)送消息時(shí),Producer 通過(guò)負(fù)載均衡模塊選擇相應(yīng)的 Broker 集群隊(duì)列進(jìn)行消息投遞。
消息發(fā)送時(shí)如果出現(xiàn)失敗,默認(rèn)會(huì)重試 2 次,在重試時(shí)會(huì)盡量避開(kāi)剛剛接收失敗的 Broker,而是選擇其它 Broker 上的隊(duì)列進(jìn)行發(fā)送,從而提高消息發(fā)送的成功率。
消費(fèi)者
消費(fèi)方式
-
廣播消費(fèi):Producer 向一些隊(duì)列輪流發(fā)送消息,隊(duì)列集合稱(chēng)為 Topic,每一個(gè) Consumer 實(shí)例消費(fèi)這個(gè) Topic 對(duì)應(yīng)的所有隊(duì)列。
-
集群消費(fèi):多個(gè) Consumer 實(shí)例平均消費(fèi)這個(gè) Topic 對(duì)應(yīng)的隊(duì)列集合。
MetaQ 消費(fèi)者端有多套負(fù)載均衡算法的實(shí)現(xiàn),比較常見(jiàn)的是平均分配和平均循環(huán)分配,默認(rèn)使用平均分配算法,給每個(gè) Consumer 分配均等的隊(duì)列。一個(gè) Consumer 可以對(duì)應(yīng)多個(gè)隊(duì)列,而一個(gè)隊(duì)列只能給一個(gè) Consumer 進(jìn)行消費(fèi),Consumer 和隊(duì)列之間是一對(duì)多的關(guān)系。?
集群模式下有一點(diǎn)需要注意:消費(fèi)隊(duì)列負(fù)載機(jī)制遵循一個(gè)通用的思想,一個(gè)消息隊(duì)列同時(shí)只允許被一個(gè)消費(fèi)者消費(fèi),一個(gè)消費(fèi)者可以消費(fèi)多個(gè)消費(fèi)隊(duì)列。因此當(dāng) Consumer 的數(shù)量大于隊(duì)列的數(shù)量,會(huì)有部分 Consumer 分配不到隊(duì)列,這些分配不到隊(duì)列的 Consumer 機(jī)器不會(huì)有消息到達(dá)。?
平均分配算法舉例:
-
如果有 5 個(gè)隊(duì)列,2 個(gè) consumer,consumer1 會(huì)分配 3 個(gè)隊(duì)列,consumer2 分配 2 個(gè)隊(duì)列;
-
如果有 6 個(gè)隊(duì)列,2 個(gè) consumer,consumer1 會(huì)分配 3 個(gè)隊(duì)列,consumer2 也會(huì)分配 3 個(gè)隊(duì)列;
-
如果 10 個(gè)隊(duì)列,11 個(gè) consumer,consumer1~consumer10 各分配一個(gè)隊(duì)列,consumer11 無(wú)隊(duì)列分配;
如果消費(fèi)集群規(guī)模較大:例如 topic 隊(duì)列資源是 128 個(gè),而消費(fèi)機(jī)器數(shù)有 160 臺(tái),按照一個(gè)隊(duì)列只會(huì)被一個(gè)消費(fèi)集群中一臺(tái)機(jī)器處理的原則,會(huì)有 32 臺(tái)機(jī)器不會(huì)收到消息,此種情況需要聯(lián)系 MetaQ 人員進(jìn)行擴(kuò)容評(píng)估。?
消費(fèi)重試:當(dāng)出現(xiàn)消費(fèi)失敗的消息時(shí),Broker 會(huì)為每個(gè)消費(fèi)者組設(shè)置一個(gè)重試隊(duì)列。當(dāng)一條消息初次消費(fèi)失敗,消息隊(duì)列會(huì)自動(dòng)進(jìn)行消費(fèi)重試。達(dá)到最大重試次數(shù)后,若消費(fèi)仍然失敗,此時(shí)會(huì)將該消息發(fā)送到死信隊(duì)列。對(duì)于死信消息,通常需要開(kāi)發(fā)人員進(jìn)行手動(dòng)處理。?
在消費(fèi)時(shí)間過(guò)程中可能會(huì)遇到消息消費(fèi)隊(duì)列增加和減少、消息消費(fèi)者增加或減少,此時(shí)需要對(duì)消息消費(fèi)隊(duì)列進(jìn)行重新平衡,既重新分配 (rebalance),這就是所謂的重平衡機(jī)制。在 RocketMQ 中,每隔 20s 會(huì)根據(jù)當(dāng)前隊(duì)列數(shù)量、消費(fèi)者數(shù)量重新進(jìn)行隊(duì)列負(fù)載計(jì)算,如果計(jì)算出來(lái)的結(jié)果與當(dāng)前不一樣,則觸發(fā)消息消費(fèi)隊(duì)列的重分配。?
Consumer 啟動(dòng)時(shí)會(huì)啟動(dòng)定時(shí)器,還執(zhí)行一些定時(shí)同步任務(wù),包括:同步 nameServer 地址,從 nameServer 同步 topic 的路由信息,清理 offline 的 broker,并向所有 broker 發(fā)送心跳,分配給當(dāng)前 consumer 的每個(gè)隊(duì)列將最新消費(fèi)的 offset 同步給 broker。
消息消費(fèi)過(guò)程淺析
三個(gè)關(guān)鍵服務(wù):?RebalanceService、PullMessageService、MessageConsumeService ? ?
RebalanceService 負(fù)載均衡服務(wù)
定時(shí)執(zhí)行一次負(fù)載均衡(20 s)分配消息隊(duì)列給消費(fèi)者。負(fù)載均衡針對(duì)每個(gè) topic 獨(dú)立進(jìn)行,具體如下:
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);//廣播模式下每個(gè)消費(fèi)者要消費(fèi)所有 queue 的消息
????????????????????if?(changed)?{
????????????????????????this.messageQueueChanged(topic,?mqSet,?mqSet);
????????????????????????log.info("messageQueueChanged?{}?{}?{}?{}",
????????????????????????????consumerGroup,
????????????????????????????topic,
????????????????????????????mqSet,
????????????????????????????mqSet);
????????????????????}
????????????????}?else?{
????????????????????log.warn("doRebalance,?{},?but?the?topic[{}]?not?exist.",?consumerGroup,?topic);
????????????????}
????????????????break;
????????????}
????????????case?CLUSTERING:?{
????????????????Set<MessageQueue>?mqSet?=?this.topicSubscribeInfoTable.get(topic);//找到該topic下的消息隊(duì)列集合
????????????????List<String>?cidAll?=?this.mQClientFactory.findConsumerIdList(topic,?consumerGroup);//找到給消費(fèi)者組下的所有消費(fèi)者id
????????????????if?(null?==?mqSet)?{
????????????????????if?(!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))?{
????????????????????????log.warn("doRebalance,?{},?but?the?topic[{}]?not?exist.",?consumerGroup,?topic);
????????????????????}
????????????????}
????????????????if?(null?==?cidAll)?{
????????????????????log.warn("doRebalance,?{}?{},?get?consumer?id?list?failed",?consumerGroup,?topic);
????????????????}
????????????????
????????????????if?(mqSet?!=?null?&&?cidAll?!=?null)?{
????????????????????List<MessageQueue>?mqAll?=?new?ArrayList<MessageQueue>();
????????????????????mqAll.addAll(mqSet);
????????????????????Collections.sort(mqAll);
????????????????????Collections.sort(cidAll);
????????????????????
????????????????????AllocateMessageQueueStrategy?strategy?=?this.allocateMessageQueueStrategy;
????????????????????
????????????????????List<MessageQueue>?allocateResult?=?null;
????????????????????try?{
????????????????????????allocateResult?=?strategy.allocate(
????????????????????????????this.consumerGroup,
????????????????????????????this.mQClientFactory.getClientId(),
????????????????????????????mqAll,
????????????????????????????cidAll);//?根據(jù)分配策略進(jìn)行分配
????????????????????}?catch?(Throwable?e)?{
????????????????????????log.error("AllocateMessageQueueStrategy.allocate?Exception.?allocateMessageQueueStrategyName={}",?strategy.getName(),
????????????????????????????e);
????????????????????????return;
????????????????????}
????????????????????
????????????????????Set<MessageQueue>?allocateResultSet?=?new?HashSet<MessageQueue>();
????????????????????if?(allocateResult?!=?null)?{
????????????????????????allocateResultSet.addAll(allocateResult);
????????????????????}
????????????????????boolean?changed?=?this.updateProcessQueueTableInRebalance(topic,?allocateResultSet,?isOrder);//?更新處理隊(duì)列表
????????????????????
????????????????????if?(changed)?{
????????????????????????log.info(
????????????????????????????"rebalanced?result?changed.?allocateMessageQueueStrategyName={},?group={},?topic={},?clientId={},?mqAllSize={},?cidAllSize={},?rebalanceResultSize={},?rebalanceResultSet={}",
????????????????????????????strategy.getName(),?consumerGroup,?topic,?this.mQClientFactory.getClientId(),?mqSet.size(),?cidAll.size(),
????????????????????????????allocateResultSet.size(),?allocateResultSet);
????????????????????????this.messageQueueChanged(topic,?mqSet,?allocateResultSet);
????????????????????}
????????????????}
????????????????break;
????????????}
????????????default:
????????????????break;
????????}
}
?
這里主要做了幾件事:
-
判斷消費(fèi)模式
-
廣播模式
i.找到 topic 下的消息隊(duì)列(queue)集合
ii.更新處理隊(duì)列表
-
集群模式
i.找到 topic 下的消息隊(duì)列集合
ii.找到消費(fèi)者組下所有消費(fèi)者 id
iii.根據(jù)分配策略進(jìn)行分配
iv.更新處理隊(duì)列表,開(kāi)始真正拉取消息請(qǐng)求
消費(fèi)者會(huì)將消費(fèi)位點(diǎn)更新到 NameServer 上,Rebalance 發(fā)生時(shí),讀取消費(fèi)者的消費(fèi)位點(diǎn)信息,需要注意在消費(fèi)者數(shù)量大于隊(duì)列數(shù)量的情況下,如果消費(fèi)者不及時(shí)更新消費(fèi)位點(diǎn)信息,可能會(huì)導(dǎo)致消息被重復(fù)消費(fèi)。因此,消費(fèi)者需要及時(shí)更新消費(fèi)位點(diǎn)信息,確保消費(fèi)進(jìn)度正確。
Consumer 創(chuàng)建的時(shí)候 Rebalance 會(huì)被執(zhí)行。整個(gè) rebalanceService 的作用就是不斷的通過(guò)負(fù)載均衡,重新分配隊(duì)列的過(guò)程。根據(jù)分配好的隊(duì)列構(gòu)建拉取消息的請(qǐng)求,然后放到 pullRequestQueue 中。
PullMessageService 拉取消息服務(wù)
首先拉取消息時(shí)最重要的是確定偏移量?offset,這存儲(chǔ)在消費(fèi)者端的 OffsetStore 對(duì)象中。
if?(this.defaultMQPushConsumer.getOffsetStore()?!=?null)?{
??????????this.offsetStore?=?this.defaultMQPushConsumer.getOffsetStore();
????????}?else?{
??????????switch?(this.defaultMQPushConsumer.getMessageModel())?{
????????????case?BROADCASTING:
??????????????this.offsetStore?=?new?LocalFileOffsetStore(this.mQClientFactory,?this.defaultMQPushConsumer.getConsumerGroup());
??????????????break;
????????????case?CLUSTERING:
??????????????this.offsetStore?=?new?RemoteBrokerOffsetStore(this.mQClientFactory,?this.defaultMQPushConsumer.getConsumerGroup());
??????????????break;
????????????default:
??????????????break;
??????????}
}
this.offsetStore.load();
可以看到廣播模式和集群模式的對(duì)象類(lèi)型不同,這是因?yàn)閷?duì) offset 的維護(hù)的方式不一樣,在 load 的時(shí)候 LocalFileOffsetStore 會(huì)從本地文件加載這個(gè) offset,而 RemoteBrokerOffsetStore 的 load 函數(shù)是空的。
兩種對(duì)象類(lèi)型分別有 readOffset 函數(shù)支持從內(nèi)存中獲取 offset 值,以及分別從本地文件存儲(chǔ)和 broker 獲取 offset。需要注意集群模式下消費(fèi)者只需要關(guān)心 broker 上維護(hù)的消費(fèi)進(jìn)度,因?yàn)椴徽?queue 和 消費(fèi)者的映射關(guān)系如何切換, 只有 offset 之后的未消費(fèi)消息是消費(fèi)者需要關(guān)心的。
?
消息的拉取過(guò)程是一個(gè)不斷循環(huán)的生產(chǎn)者消費(fèi)者模型,一個(gè) PullRequest 就對(duì)應(yīng)一個(gè)拉取任務(wù),并和一對(duì)MessageQueue(保存 Consume queue 的信息)和 ProcessQueue 關(guān)聯(lián),消息拉取的過(guò)程中,PullMessageService 拉取線程不停的讀取 PullRequestQueue 根據(jù) PullRequest 拉取消息。拉取到消息時(shí),消息提交到 ProcessQueue 中并新建 ConsumeRequest 提交到 ConsumeService 處理, 然后生成下一批的 PullRequest 丟到 PullRequestQueue。如果沒(méi)有拉取到消息或出現(xiàn)異常,則會(huì)重新將請(qǐng)求放回拉取隊(duì)列。ProcessQueue 中以 TreeMap 形式保存待處理的消息, key 為消息對(duì)應(yīng)的 offset ,并自動(dòng)進(jìn)行排序。
?
消息拉取過(guò)程:
1.PullMessageService 不斷循環(huán)遍歷,從 PullRequestQueue 中提取 PullRequest,根據(jù) nextOffset 去 broker 拉取消息,若該隊(duì)列 已經(jīng) dropped 則更新 offset 到 broker 并丟棄此拉消息請(qǐng)求。
2.PullMessageService 異步拉取消息,同時(shí)將 PullRequest 封裝在 PullCallback 中,PullCallback 封裝在 ResponseFuture中,并以自增的請(qǐng)求 id 為鍵,ResponseFuture 為值放入 ResponseTable 中。
3.Broker 收到請(qǐng)求,如果 offset 之后有新的消息會(huì)立即發(fā)送異步響應(yīng);否則等待直到 producer 有新的消息發(fā)送后返回或者超時(shí)。如果通信異?;蛘?Broker 超時(shí)未返回響應(yīng),nettyClient 會(huì)定時(shí)清理超時(shí)的請(qǐng)求,釋放 PullRequest 回到 PullRequestQueue。
4.用最新的 offset 更新 ResponseFuture 里的 PullRequest 并推送給 PullRequestQueue 里以進(jìn)行下一次拉取。批量拉取到的消息分批提交給 consumeExecutor 線程處理。
消費(fèi)控速
MetaQ 為消費(fèi)者端拉取消息提供了消費(fèi)控速的能力:
-
主動(dòng)控速,在整個(gè)消費(fèi)過(guò)程中我們可以發(fā)現(xiàn),如果想要做到流控,一個(gè)是控制生成 PullRequest 的時(shí)間間隔,一個(gè)是控制生成新一批的請(qǐng)求數(shù)量,因此 MetaQ 提供了兩個(gè)參數(shù)給我們 pullInterval、pullBatchSize ,主動(dòng)控速的邏輯是通過(guò)控制消息的拉取速度來(lái)達(dá)到降低速率的效果。
-
被動(dòng)控速,這種流量控制的方式要復(fù)雜得多,需要用戶(hù)在消費(fèi)消息時(shí)控制流量 (sentinel),由于消費(fèi)線程池的待消費(fèi)隊(duì)列的消息達(dá)到一定閾值之后,MetaQ 會(huì)被動(dòng)降低 PullRequest 的產(chǎn)生的速率,因此當(dāng)采用流量控制手段通過(guò)埋點(diǎn)降低消費(fèi)速度時(shí),待消費(fèi)隊(duì)列會(huì)逐漸占滿(mǎn),觸發(fā)降速機(jī)制;為什么不直接用 sentinel??因?yàn)?sentinel 快速失敗等策略觸發(fā)限流后會(huì)產(chǎn)生大量重試,重試消息會(huì)進(jìn)入重試隊(duì)列,當(dāng)重試的量逐漸增大,broker 上重試隊(duì)列中消息量也越來(lái)越多,并且重試消息再次投遞時(shí)還可能再次發(fā)生重試,又重新進(jìn)入重試隊(duì)列,同一條消息反復(fù)進(jìn)出隊(duì)列,這種無(wú)意義的重復(fù)動(dòng)作會(huì)增加 broker 的壓力。
消息種類(lèi)
普通消息
可選擇同步、異步或單向發(fā)送。同步:Producer 發(fā)出一條消息后,會(huì)在收到 MQ 返回的 ACK 之后再發(fā)送下一條消息。異步:Producer 發(fā)出消息后無(wú)需等待 MQ 返回 ACK ,直接發(fā)送下一條消息。單向: Producer 僅負(fù)責(zé)發(fā)送消息,不等待,MQ 也不返回 ACK。
順序消息
消息的順序性分為兩種:
-
全局順序:對(duì)于指定的一個(gè) Topic ,所有消息按照嚴(yán)格的先入先出的順序進(jìn)行發(fā)布和消費(fèi) (同一個(gè) queue)。
-
分區(qū)順序:對(duì)于一個(gè)指定的 Topic ,所有消息根據(jù) sharding key 進(jìn)行分區(qū),同一個(gè)分區(qū)內(nèi)的消息按照嚴(yán)格的 FIFO 順序進(jìn)行發(fā)布和消費(fèi),分區(qū)之間彼此獨(dú)立。
?MetaQ 只支持同一個(gè) queue 的順序消息,且同一個(gè) queue 只能被一臺(tái)機(jī)器的一個(gè)線程消費(fèi),如果想要支持全局消息,那需要將該 topic 的 queue 的數(shù)量設(shè)置為 1,犧牲了可用性。
消息事務(wù)
?
1.發(fā)送方向 MQ 服務(wù)端發(fā)送消息。
2.MQ Server 將消息持久化成功之后,向發(fā)送方 ACK 確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息為半消息。
3.發(fā)送方開(kāi)始執(zhí)行本地事務(wù)邏輯。
4.發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向 MQ Server 提交二次確認(rèn)(Commit 或是 Rollback),MQ Server 收到 Commit 狀態(tài)則將半消息標(biāo)記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態(tài)則刪除半消息,訂閱方將不會(huì)接受該消息。
5.在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá) MQ Server,經(jīng)過(guò)固定時(shí)間后 MQ Server 將對(duì)該消息發(fā)起消息回查。
6.發(fā)送方收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
7.發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),MQ Server 仍按照步驟 4 對(duì)半消息進(jìn)行操作。?
MetaQ 3.0 以后,新的版本提供更加豐富的功能,支持消息屬性、無(wú)序消息、延遲消息、廣播消息、長(zhǎng)輪詢(xún)消費(fèi)、高可用特性,這些功能基本上覆蓋了大部分應(yīng)用對(duì)消息中間件的需求。除了功能豐富之外,MetaQ 基于順序?qū)懀蟾怕薯樞蜃x的隊(duì)列存儲(chǔ)結(jié)構(gòu)和 pull 模式的消費(fèi)方式,使得 MetaQ 具備了最快的消息寫(xiě)入速度和百億級(jí)的堆積能力,特別適合用來(lái)削峰填谷。在 MetaQ 3.0 版本的基礎(chǔ)上,衍生了開(kāi)源版本 RocketMQ。
高可用
如何做到不重復(fù)消費(fèi)也不丟失消息?
重復(fù)消費(fèi)問(wèn)題
-
發(fā)送時(shí)消息重復(fù)【消息 Message ID 不同】:MQ Producer 發(fā)送消息時(shí),消息已成功發(fā)送到服務(wù)端并完成持久化,此時(shí)網(wǎng)絡(luò)閃斷或者客戶(hù)端宕機(jī)導(dǎo)致服務(wù)端應(yīng)答給客戶(hù)端失敗。如果此時(shí) MQ Producer 意識(shí)到消息發(fā)送失敗并嘗試再次發(fā)送消息,MQ 消費(fèi)者后續(xù)會(huì)收到兩條內(nèi)容相同但是 Message ID 不同的消息。
-
投遞時(shí)消息重復(fù)【消息 Message ID 相同】:MQ Consumer 消費(fèi)消息場(chǎng)景下,消息已投遞到消費(fèi)者并完成業(yè)務(wù)處理,當(dāng)客戶(hù)端給服務(wù)端反饋應(yīng)答的時(shí)候網(wǎng)絡(luò)閃斷。為了保證消息至少被消費(fèi)一次,MQ 服務(wù)端將在網(wǎng)絡(luò)恢復(fù)后再次嘗試投遞之前已被處理過(guò)的消息,MQ 消費(fèi)者后續(xù)會(huì)收到兩條內(nèi)容相同并且 Message ID 也相同的消息。
MetaQ 不能保證消息不重復(fù),因此對(duì)于重復(fù)消費(fèi)情況,需要業(yè)務(wù)自定義唯一標(biāo)識(shí)作為冪等處理的依據(jù)。
消息丟失問(wèn)題
MetaQ 避免消息丟失的機(jī)制主要包括:重試、冗余消息存儲(chǔ)。在生產(chǎn)者的消息投遞失敗時(shí),默認(rèn)會(huì)重試兩次。消費(fèi)者消費(fèi)失敗時(shí),在廣播模式下,消費(fèi)失敗僅會(huì)返回 ConsumeConcurrentlyStatus.RECONSUME_LATER ,而不會(huì)重試。在未指定順序消息的集群模式下,消費(fèi)失敗的消息會(huì)進(jìn)入重試隊(duì)列自動(dòng)重試,默認(rèn)最大重試次數(shù)為 16 。在順序消費(fèi)的集群模式下,消費(fèi)失敗會(huì)使得當(dāng)前隊(duì)列暫停消費(fèi),并重試到成功為止。
主從同步
RocketMQ/MetaQ 為每個(gè)存儲(chǔ)數(shù)據(jù)的 Broker 節(jié)點(diǎn)配置 ClusterName,BrokerName 標(biāo)識(shí)來(lái)更好的進(jìn)行資源管理。多個(gè) BrokerName 相同的節(jié)點(diǎn)構(gòu)成一個(gè)副本組。每個(gè)副本還擁有一個(gè)從 0 開(kāi)始編號(hào),不重復(fù)也不一定連續(xù)的 BrokerId 用來(lái)表示身份,編號(hào)為 0 的節(jié)點(diǎn)是這個(gè)副本組的 Leader / Primary / Master,故障時(shí)通過(guò)選舉來(lái)重新對(duì) Broker 編號(hào)標(biāo)識(shí)新的身份。例如 BrokerId = {0, 1, 3},則 0 為主,其他兩個(gè)為備。
從模型的角度來(lái)看,RocketMQ /MetaQ 單節(jié)點(diǎn)上 Topic 數(shù)量較多,如果像 kafka 以 topic 粒度維護(hù)狀態(tài)機(jī),節(jié)點(diǎn)宕機(jī)會(huì)導(dǎo)致上萬(wàn)個(gè)狀態(tài)機(jī)切換,這種驚群效應(yīng)會(huì)帶來(lái)很多潛在風(fēng)險(xiǎn),因此新版本的 RocketMQ/MetaQ 選擇以單個(gè) Broker 作為切換的最小粒度來(lái)管理,相比于其他更細(xì)粒度的實(shí)現(xiàn),副本身份切換時(shí)只需要重分配 Broker 編號(hào),對(duì)元數(shù)據(jù)節(jié)點(diǎn)壓力最小。由于通信的數(shù)據(jù)量少,可以加快主備切換的速度,單個(gè)副本下線的影響被限制在副本組內(nèi),減少管理和運(yùn)維成本。這種實(shí)現(xiàn)也存在一些缺點(diǎn),例如存儲(chǔ)節(jié)點(diǎn)的負(fù)載無(wú)法以最佳狀態(tài)在集群上進(jìn)行負(fù)載均衡。
?
RocketMQ/MetaQ 采用物理復(fù)制的方法,存儲(chǔ)層的 CommitLog 通過(guò)鏈表和內(nèi)核的 MappedFile 機(jī)制抽象出一條 append only 的數(shù)據(jù)流。主副本將未提交的消息按序傳輸給其他副本(相當(dāng)于 redo log),并根據(jù)一定規(guī)則計(jì)算確認(rèn)位點(diǎn)(confirm offset)判斷日志流是否被提交。最終一致性通過(guò)數(shù)據(jù)水位對(duì)齊的方式來(lái)實(shí)現(xiàn)(越近期的消息價(jià)值越高):?
-
1-1 情況下滿(mǎn)足備 Max <= 主 Min,一般是備新上線或下線較久,備跳過(guò)存量日志,從主的 Min 開(kāi)始復(fù)制。
-
1-2,2-2 兩種情況下滿(mǎn)足 主 Min < 備 Max <= 主 Max,一般是由于備網(wǎng)絡(luò)閃斷導(dǎo)致日志水位落后,通過(guò) HA 連接追隨主即可。
-
1-3,2-3 兩種情況下備 Max > 主 Max,可能由于主異步寫(xiě)磁盤(pán)宕機(jī)后又成為主,或者網(wǎng)絡(luò)分區(qū)時(shí)雙主寫(xiě)入造成 CommitLog 分叉。由于新主落后于備,在確認(rèn)位點(diǎn)對(duì)齊后少量未確認(rèn)的消息丟失,這種非正常模式的選舉是應(yīng)該盡量避免的。
-
3-3 理論上不會(huì)出現(xiàn),備的數(shù)據(jù)長(zhǎng)于主,原因可能是主節(jié)點(diǎn)數(shù)據(jù)丟失又疊加了非正常選舉,因此這種情況需要人工介入處理。
副本組的消息復(fù)制也支持同步和異步的模式。
復(fù)制方式 |
優(yōu)點(diǎn) |
缺點(diǎn) |
同步復(fù)制 |
成功寫(xiě)入的消息不會(huì)丟失,可靠性高 |
寫(xiě)入延遲更高 |
異步復(fù)制 |
slave 宕機(jī)不影響 master 性能更高 |
可能丟失消息 |
slave broker 會(huì)定時(shí)(60 s)從 master 同步信息
-
public?void?syncAll()?{
????????this.syncTopicConfig();
????????this.syncConsumerOffset();
????????this.syncDelayOffset();
????????this.syncSubscriptionGroupConfig();
????????this.syncMessageRequestMode();
????????if?(brokerController.getMessageStoreConfig().isTimerWheelEnable())?{
????????????this.syncTimerMetrics();
????????}
}
主從切換
RocketMQ 衍生出了很多不同的主從切換架構(gòu)。?
無(wú)切換架構(gòu)
最早的時(shí)候,RocketMQ 基于 Master-Slave 模式提供了主備部署的架構(gòu),這種模式提供了一定的高可用能力,在 Master 節(jié)點(diǎn)負(fù)載較高情況下,讀流量可以被重定向到備機(jī)。由于沒(méi)有選主機(jī)制,在 Master 節(jié)點(diǎn)不可用時(shí),這個(gè)副本組的消息發(fā)送將會(huì)完全中斷,還會(huì)出現(xiàn)延遲消息、事務(wù)消息等無(wú)法消費(fèi)或者延遲。此外,備機(jī)在正常工作場(chǎng)景下資源使用率較低,造成一定的資源浪費(fèi)。為了解決這些問(wèn)題,社區(qū)提出了在一個(gè) Broker 進(jìn)程內(nèi)運(yùn)行多個(gè) BrokerContainer,這個(gè)設(shè)計(jì)類(lèi)似于 Flink 的 slot,讓一個(gè) Broker 進(jìn)程上可以以 Container 的形式運(yùn)行多個(gè)節(jié)點(diǎn),復(fù)用傳輸層的連接,業(yè)務(wù)線程池等資源,通過(guò)單節(jié)點(diǎn)主備交叉部署來(lái)同時(shí)承擔(dān)多份流量,無(wú)外部依賴(lài),自愈能力強(qiáng)。這種方式下隔離性弱于使用原生容器方式進(jìn)行隔離,同時(shí)由于架構(gòu)的復(fù)雜度增加導(dǎo)致了自愈流程較為復(fù)雜。
切換架構(gòu)
另一條演進(jìn)路線則是基于可切換的,RocketMQ 也嘗試過(guò)依托于 Zookeeper 的分布式鎖和通知機(jī)制進(jìn)行 HA 狀態(tài)的管理。引入外部依賴(lài)的同時(shí)給架構(gòu)帶來(lái)了復(fù)雜性,不容易做小型化部署,部署運(yùn)維和診斷的成本較高。另一種方式就是基于 Raft 在集群內(nèi)自動(dòng)選主,Raft 中的副本身份被透出和復(fù)用到 Broker Role 層面去除外部依賴(lài),然而強(qiáng)一致的 Raft 版本并未支持靈活的降級(jí)策略,無(wú)法在 C(Consistency)和 A (Availability)之間靈活調(diào)整。兩種切換方案都是 CP 設(shè)計(jì),犧牲高可用優(yōu)先保證一致性。主副本下線時(shí)選主和路由定時(shí)更新策略導(dǎo)致整個(gè)故障轉(zhuǎn)移時(shí)間依然較長(zhǎng),Raft 本身對(duì)三副本的要求也會(huì)面臨較大的成本壓力。?
RocketMQ DLedger 融合模式
RocketMQ DLedger (基于 Raft 的分布式日志存儲(chǔ))融合模式是 RocketMQ 5.0 演進(jìn)中結(jié)合上述兩條路線后的一個(gè)系統(tǒng)的解決方案。?
模式 |
優(yōu)點(diǎn) |
缺點(diǎn) |
|
無(wú)切換 |
Master-Slave 模式 |
實(shí)現(xiàn)簡(jiǎn)單,適用于中小型用戶(hù),人工管控力強(qiáng) |
故障需要人工處理,故障時(shí)寫(xiě)入消息失敗,導(dǎo)致消息消費(fèi)暫停 |
Broker Container 模式 |
無(wú)需選主,無(wú)外部依賴(lài),故障轉(zhuǎn)移非???(< 3 秒) |
增加單節(jié)點(diǎn)運(yùn)維的復(fù)雜度,機(jī)器故障的風(fēng)險(xiǎn)增加,自愈流程復(fù)雜 |
|
切換架構(gòu) |
Raft 自動(dòng)選主模式 |
自動(dòng)主備切換 |
故障轉(zhuǎn)移時(shí)間較長(zhǎng),強(qiáng)一致無(wú)法靈活降級(jí),三副本成本壓力較大 |
融合架構(gòu) |
基于 Dledger Controller 的可切換模式 |
可支持無(wú)切換和切換架構(gòu)之間的轉(zhuǎn)換,復(fù)制協(xié)議更簡(jiǎn)單,靈活降級(jí) |
提高了部署和系統(tǒng)的復(fù)雜度 |
?
總結(jié)
相比較于 RocketMQ/MetaQ,Kafka 具有更高的吞吐量。Kafka 默認(rèn)采用異步發(fā)送的機(jī)制,并且還擁有消息收集和批量發(fā)送的機(jī)制,這樣的設(shè)置可以顯著提高其吞吐量。由于 Kafka 的高吞吐量,因此通常被用于日志采集、大數(shù)據(jù)等領(lǐng)域。?
RocketMQ/MetaQ 不采用異步的方式發(fā)送消息。因?yàn)楫?dāng)采用異步的方式發(fā)送消息時(shí),Producer 發(fā)送的消息到達(dá) Broker 就會(huì)返回成功。此時(shí)如果 Producer 宕機(jī),而消息在 Broker 刷盤(pán)失敗時(shí),就會(huì)導(dǎo)致消息丟失,從而降低系統(tǒng)的可靠性。?
RocketMQ/MetaQ 單機(jī)可以支持更多的 topic 數(shù)量。因?yàn)?Kafka 在 Broker 端是將一個(gè)分區(qū)存儲(chǔ)在一個(gè)文件中的,當(dāng) topic 增加時(shí),分區(qū)的數(shù)量也會(huì)增加,就會(huì)產(chǎn)生過(guò)多的文件。當(dāng)消息刷盤(pán)時(shí),就會(huì)出現(xiàn)性能下降的情況。而 RocketMQ/MetaQ 是將所有消息順序?qū)懭胛募?,因此不?huì)出現(xiàn)這種情況。
當(dāng) Kafka 單機(jī)的 topic 數(shù)量從幾十到幾百個(gè)時(shí),就會(huì)出現(xiàn)吞吐量大幅度下降、load 增高、響應(yīng)時(shí)間變長(zhǎng)等現(xiàn)象。而 RocketMQ/MetaQ 的 topic 數(shù)量達(dá)到幾千,甚至上萬(wàn)時(shí),也只是會(huì)出現(xiàn)小幅度的性能下降。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-622226.html
綜上所述,Kafka 具有更高的吞吐量,適合應(yīng)用于日志采集、大數(shù)據(jù)等領(lǐng)域。而 RocketMQ/MetaQ 單機(jī)支持更多的 topic,且具有更高的可靠性(一致性支持),因此適用于淘寶這樣復(fù)雜的業(yè)務(wù)處理。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-622226.html
到了這里,關(guān)于消息隊(duì)列中間件 MetaQ/RocketMQ的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!