RocketMq系列整體欄目
內(nèi)容 | 鏈接地址 |
---|---|
【一】RocketMq安裝和基本概念 | https://zhenghuisheng.blog.csdn.net/article/details/134486709 |
【二】RocketMq的架構(gòu)解析和高性能設(shè)計(jì)/font> | https://zhenghuisheng.blog.csdn.net/article/details/134559514 |
一,RocketMq的架構(gòu)解析和高性能設(shè)計(jì)
在rocketMq中,其整體架構(gòu)如下,在RocketMqServer中,主要有NameServer,Broker,MessageQueue,Message等組件,并且存在Topic這種邏輯組件,表示一種主題
NameServer是topic的注冊中心,NameServer會和topic建立長連接,將broker的信息通過topic注冊到NameServer中,然后生產(chǎn)者和消費(fèi)者都會先通過這個NameServer獲取相關(guān)信息,再和對應(yīng)的broker建立長連接。
在微服務(wù)中,有Nacos,zookeeper等作為注冊中心:
但是zk很明顯不適合作為這種高可用的注冊的這中心,因?yàn)閮?nèi)部可能會因?yàn)檫x舉出現(xiàn)腦裂問題,并且因?yàn)檫@個問題可能會導(dǎo)致整個服務(wù)出現(xiàn)一定時(shí)間的不可用的問題,而rocketmq主要就是高吞吐量,低延遲的特性,因此不可能去選擇zk作為注冊中心的;
而nacos和eureka也不適合作為rocketmq的注冊中心,如nacos中會記錄很多信息,如心跳信息,端口,host等信息,而Nameserver中只需要記錄這個Broker的信息,如果使用nacos來做的話,有點(diǎn)大材小用了。并且如果引用nacos,還要考慮版本沖突這些,做一些適配器等,相對來說是更加復(fù)雜的
在topic中的Consumer配置中,每個topic都會對應(yīng)一個或者多個消費(fèi)者組,topic主題和消費(fèi)者組是多對多的關(guān)系,一個consumer消費(fèi)者組,代表的是一組邏輯相同的消費(fèi)者,一個message消息,只能被消費(fèi)者組中的一個消費(fèi)者消費(fèi),這個和kafka中的消息消費(fèi)是一樣的
上面提到了消費(fèi)者組的概念,在生產(chǎn)者中,也有生產(chǎn)者組。在事務(wù)機(jī)制中,當(dāng)生產(chǎn)者給broker發(fā)送數(shù)據(jù)之后,broker需要給生產(chǎn)者一個數(shù)據(jù)回調(diào),那么就需要指定生產(chǎn)者名字,那么此時(shí)生產(chǎn)者組就能發(fā)揮其作用
生產(chǎn)者producer在本地會有一個緩存存儲Nameserver中存儲的broker,在往broker投遞之前,會向注冊中心中發(fā)起一個請求判斷是否需要拉取最新的配置,然后再往對應(yīng)的broker發(fā)送數(shù)據(jù)
2,rocketmq底層原理
2.1,事務(wù)的底層實(shí)現(xiàn)
rocketmq的事務(wù)實(shí)現(xiàn),相當(dāng)于一個簡單的分布式事務(wù),主要是保證生產(chǎn)者本地事務(wù)和發(fā)送到broker事務(wù)的原子性。而broker到consumer端是一定可以保證消息消費(fèi)成功的,如果一個消費(fèi)者失敗,那么可以往別的消費(fèi)者里面推送,如果最終依舊失敗,那么可以先重試,最后加入到死信隊(duì)列里面
事務(wù)消息的底層實(shí)現(xiàn)如下圖,首先生產(chǎn)者會發(fā)送一個half消息給Broker,Broker在接收到這個half消息之后,就會向broker返回一個確認(rèn)的標(biāo)志,然后事務(wù)的發(fā)送者就會執(zhí)行本地事務(wù),通過這個execute去執(zhí)行本地事務(wù)。如果本地事務(wù)執(zhí)行成功,那么生產(chǎn)者會返回一個提在交的狀態(tài)給Broker,隨后Broker將消息投遞到消費(fèi)者中;如果是回滾狀態(tài),那么消息會直接丟掉;如果是在4的時(shí)候,本地事務(wù)需要的時(shí)間過長,那么本地會先返回一個unknow的未知狀態(tài),然后broker會等一段時(shí)間,隨后再回生產(chǎn)者中定時(shí)回查,消息生產(chǎn)者會去檢查事務(wù),默認(rèn)是回查15次,如果是15次之后檢查還是沒有完成,那么消息就會直接丟棄掉
half消息有點(diǎn)類似于建立tcp連接,主要是做為一種嗅探機(jī)制,判斷當(dāng)前broker服務(wù)是否正常,如果broker服務(wù)掛了,那么連本地事務(wù),也可以直接不執(zhí)行了。
如一個訂單場景,30s檢查一次是否支付,那么就可以直接通過這種事務(wù)去實(shí)現(xiàn),通過execute方法去執(zhí)行本地事務(wù),然后通過這個check的方式去銀行進(jìn)行對賬。如果最終超時(shí),那么最終將消息放入到死信隊(duì)列中,在私信隊(duì)列中寫對應(yīng)的邏輯,如將庫存加回等。
2.2,如何保證消息不丟失
在mq中,消息丟失主要有四個地方,分別是生產(chǎn)者到broker、broker到消費(fèi)者,broker的master到slave以及操作系統(tǒng)自身的緩存。
- 生產(chǎn)者到broker的解決方案可以如下:可以選擇最簡單的同步+多次試錯的方式,或者可以直接選擇事務(wù)消息
- broker到消費(fèi)者之間:消費(fèi)者本身具有重試功能,消費(fèi)者不應(yīng)答就會往別的消費(fèi)者投遞
- 操作系統(tǒng)主要是因?yàn)閿?shù)據(jù)在緩存,如果出現(xiàn)斷電而未來得及刷盤導(dǎo)致,因此應(yīng)該采用同步刷盤解決
- broker到的master到slave之間:也可以采用同步的方式,來一條消息就往slave寫入,或者通過Dledger集群
操作系統(tǒng)和主從之間保證消息不丟失,主要是通過同步的方式解決,但是在保證安全的情況下,會在一定的程度上影響吞吐量和性能
2.3,rocketmq積壓問題
在rocketmq中,其處理數(shù)據(jù)積壓問題時(shí)比其他mq的能力強(qiáng)的,如果出現(xiàn)積壓,那么可以直接通過控制臺上面的topic,通過內(nèi)部的代理者位點(diǎn)和消費(fèi)者位點(diǎn)所產(chǎn)生的差值查看,如果差值為0,則表示有消息積壓未處理。
在rocketmq內(nèi)部,一個MessageQueue隊(duì)列的消息只能由一個消費(fèi)者組中的一個消費(fèi)者去消費(fèi),其底層實(shí)現(xiàn)和kafka是一樣的,因此如果出現(xiàn)消息積壓,那么首先可以查看消費(fèi)者組中的消費(fèi)者個數(shù)和隊(duì)列的個數(shù)是否相同,如果消費(fèi)者個數(shù)小于隊(duì)列的個數(shù),那么可以增加消費(fèi)者個數(shù),直到和隊(duì)列的個數(shù)一致,如默認(rèn)隊(duì)列的個數(shù)為4,那么將消費(fèi)者組中的消費(fèi)者個數(shù)設(shè)置成4
當(dāng)然,消費(fèi)者個數(shù)調(diào)大是沒有用的,因?yàn)樽畲笾荒芎蛅opic中的隊(duì)列一致,那么就可以通過重寫一個topic,調(diào)大topic中隊(duì)列的數(shù)量,如原來的隊(duì)列個數(shù)只有4,那么可以創(chuàng)建一個新的topic,設(shè)置隊(duì)列的個數(shù)為8,并且原來的消費(fèi)者對消息不消費(fèi),而是做一個轉(zhuǎn)發(fā)功能,將4個隊(duì)列的topic的數(shù)據(jù)轉(zhuǎn)發(fā)到8個隊(duì)列的topic中,那么在消費(fèi)者組中,其個數(shù)就可以設(shè)置成8,那么這樣子就很好的處理消息積壓的問題了。
數(shù)據(jù)的搬運(yùn)可以在具體的消費(fèi)者代碼里面去編寫,主要功能有接收四個topic隊(duì)列的數(shù)據(jù),然后轉(zhuǎn)發(fā)到八個topic的隊(duì)列中,最后再寫一個消費(fèi)者去消費(fèi)八個隊(duì)列topic的消息
2.4,如何保證順序消費(fèi)
這里的順序消息只能保證局部有序,而不是全局有序。在rocketmq內(nèi)部,在生產(chǎn)者端,消息會根據(jù)id做一個取模運(yùn)算,會將同一個區(qū)取模運(yùn)算的值放入一個隊(duì)列里面,在消費(fèi)者端,會鎖定隊(duì)列消費(fèi),就是會先消費(fèi)完一個隊(duì)列再消費(fèi)下一個隊(duì)列,從而保證單個隊(duì)列消費(fèi)的有序性
2.5,rocketmq的持久化
rocketmq為了保證消息的安全性,在broker內(nèi)部都會做一個持久化的操作,首先當(dāng)生產(chǎn)者將消息發(fā)送到broker之后,會現(xiàn)將消息存儲到 coimmit 文件中,每個topic都會有對應(yīng)的commit文件,每個文件大小為1g,如果消息滿了則會創(chuàng)建新的文件,文件的格式為二進(jìn)制格式。
在消費(fèi)者中,會有一個 comsumeQueue 文件,改文件不存數(shù)據(jù),只存索引信息,如存一些偏移量等,在消費(fèi)時(shí)可以更快的定位到commit文件中的數(shù)據(jù),隨后去消費(fèi)里面的數(shù)據(jù),并且可以通過Tag標(biāo)簽去過濾消息
除了上面兩個文件之外,還有維護(hù)一個index文件,內(nèi)部會記錄Commit日志的偏移量等
2.6,死信隊(duì)列
當(dāng)broker和consumer之間重試16次之后,消息依舊沒能被消費(fèi),那么消息就會加入到死信隊(duì)列中。一個私信隊(duì)列會對應(yīng)一個消費(fèi)者組,其perm對應(yīng)的權(quán)限值為2。死信隊(duì)列的消息默認(rèn)不會被消費(fèi),而是需要開發(fā)者自身去處理該隊(duì)列中的數(shù)據(jù)。
并且私信隊(duì)列中消息的有效期也是三天,可以在broker.conf配置文件設(shè)置,當(dāng)超過這個時(shí)間,消息都會被刪除。
2.7,消息的冪等性
在rocketmq中,消息的冪等性為 at least once 至少被消費(fèi)一次。官方建議使用里面的key去做冪等性,key是一個唯一值,就是一個唯一id。除了這些方式之外,在分布式場景下,也可以開率分布式鎖這些做冪等。
3,rocketmq高性能的設(shè)計(jì)
3.1,零拷貝技術(shù)
零拷貝是操作系統(tǒng)層面的一種加速文件讀寫的操作機(jī)制,可以通過這種零拷貝的形式提升IO操作的性能。在java中,主要是通過這種 fileChannel 的方式實(shí)現(xiàn)零拷貝,其具體實(shí)現(xiàn)由 mmap和sendFile 兩種形式
以一個文件的拷貝為例,正常來說,需要從用戶態(tài)切換到內(nèi)核態(tài),然后再去執(zhí)行io操作,并且需要通過cpu的調(diào)度,從磁盤中將文件加載到內(nèi)存,再加載到網(wǎng)卡。而在引入零拷貝技術(shù)之后,可以讓channel代替cpu去做io操作,cpu只需要給channel對應(yīng)的權(quán)限即可。在操作系統(tǒng)層面,就是利用這種DMA技術(shù),將原來四次的cpu拷貝,變成了兩次,從而提高整體性能。
3.2,順序?qū)懠夹g(shù)
本人在寫過一個順序io和隨機(jī)io的文章:https://zhenghuisheng.blog.csdn.net/article/details/129080088 ,順序?qū)懣梢詼p少磁頭的移動去尋址,不管是插入數(shù)據(jù)還是查詢數(shù)據(jù),都可以提升其性能,并且可以減少磁盤的碎片。文章來源:http://www.zghlxwxcb.cn/news/detail-752031.html
3.3,刷盤機(jī)制
rocketmq為了保證數(shù)據(jù)的安全性,在broker中會持久化到commitlog中,在刷盤時(shí)有兩種方式,分別是:同步刷盤和異步刷盤 ,默認(rèn)采用的刷盤機(jī)制時(shí)異步刷盤文章來源地址http://www.zghlxwxcb.cn/news/detail-752031.html
flushDiskType=ASYNC_FLUSH
到了這里,關(guān)于【RocketMq系列-02】RocketMq的架構(gòu)解析和高性能設(shè)計(jì)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!