目錄
一.MQ概述
1.簡介
2.用途
限流削峰
異步解耦?
數(shù)據(jù)收集?
3.MQ對(duì)比
二. RocketMQ概述
1.基本概念
消息(Message)
主題(Topic)
標(biāo)簽(Tag)
隊(duì)列(Queue)
消息標(biāo)識(shí)(MessageId/Key)
?2.系統(tǒng)架構(gòu)
Producer
Consumer
NameServer
Broker
工作流程
三.RocketMQ的啟動(dòng)
1.安裝JDK
2.配置RocketMQ
①修改NameServer和Broker的內(nèi)存參數(shù)
②配置 NameServer 的環(huán)境變量
③修改broker的配置文件
3.啟動(dòng)RocketMQ
①創(chuàng)建一個(gè)logs文件夾,用于存放日志(切換到該目錄)
②啟動(dòng)NameServer
③啟動(dòng)broker
4.關(guān)閉RocketMQ
①關(guān)閉broker
②關(guān)閉nameserver
5.配置rocketmq-dashboard
一.MQ概述
1.簡介
MQ,Message Queue,是一種提供消息隊(duì)列服務(wù)的中間件,也稱為消息中間件,是一套提供了消息生產(chǎn)、存儲(chǔ)、消費(fèi)全過程API的軟件系統(tǒng)。消息即數(shù)據(jù)。一般消息的體量不會(huì)很大。
2.用途
限流削峰
MQ可以將系統(tǒng)的超量請(qǐng)求暫存其中,以便系統(tǒng)后期可以慢慢進(jìn)行處理,從而避免了請(qǐng)求的丟失或系統(tǒng) 被壓垮。
異步解耦?
上游系統(tǒng)對(duì)下游系統(tǒng)的調(diào)用若為同步調(diào)用,則會(huì)大大降低系統(tǒng)的吞吐量與并發(fā)度,且系統(tǒng)耦合度太高。而異步調(diào)用則會(huì)解決這些問題。所以兩層之間若要實(shí)現(xiàn)由同步到異步的轉(zhuǎn)化,一般性做法就是,在這兩層間添加一個(gè)MQ層
數(shù)據(jù)收集?
分布式系統(tǒng)會(huì)產(chǎn)生海量級(jí)數(shù)據(jù)流,如:業(yè)務(wù)日志、監(jiān)控?cái)?shù)據(jù)、用戶行為等。針對(duì)這些數(shù)據(jù)流進(jìn)行實(shí)時(shí)或批量采集匯總,然后對(duì)這些數(shù)據(jù)流進(jìn)行大數(shù)據(jù)分析,這是當(dāng)前互聯(lián)網(wǎng)平臺(tái)的必備技術(shù)。通過MQ完成此類數(shù)據(jù)收集是最好的選擇。(如Kafka)
3.MQ對(duì)比
二. RocketMQ概述
1.基本概念
消息(Message)
消息是指,消息系統(tǒng)所傳輸信息的物理載體,生產(chǎn)和消費(fèi)數(shù)據(jù)的最小單位,每條消息必須屬于一個(gè)主題。
主題(Topic)
Topic表示一類消息的集合,每個(gè)主題包含若干條消息,每條消息只能屬于一個(gè)主題,是RocketMQ進(jìn)行消息訂閱的基本單位。
標(biāo)簽(Tag)
為消息設(shè)置的標(biāo)簽,用于同一主題下區(qū)分不同類型的消息。來自同一業(yè)務(wù)單元的消息,可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標(biāo)簽。標(biāo)簽?zāi)軌蛴行У乇3执a的清晰度和連貫性,并優(yōu)化RocketMQ提供的查詢系統(tǒng)。消費(fèi)者可以根據(jù)Tag實(shí)現(xiàn)對(duì)不同子主題的不同消費(fèi)邏輯,實(shí)現(xiàn)更好的擴(kuò)展性。
隊(duì)列(Queue)
存儲(chǔ)消息的物理實(shí)體。一個(gè)Topic中可以包含多個(gè)Queue,每個(gè)Queue中存放的就是該Topic的消息。一個(gè)Topic的Queue也被稱為一個(gè)Topic中消息的分區(qū)(Partition)。
一個(gè) Topic 的 Queue 中的消息只能被一個(gè)消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)。一個(gè)Queue中的消息不允許同一個(gè)消費(fèi)者組中的多個(gè)消費(fèi)者同時(shí)消費(fèi)。
消息標(biāo)識(shí)(MessageId/Key)
RocketMQ中每個(gè)消息擁有自己的MessageId,且可以攜帶具有業(yè)務(wù)標(biāo)識(shí)的Key,以方便對(duì)消息的查詢。MessageId有兩個(gè):
- 在生產(chǎn)者send()消息時(shí)會(huì)自動(dòng)生成一個(gè)MessageId(msgId)
- 當(dāng)消息到達(dá)Broker后,Broker也會(huì)自動(dòng)生成一個(gè)MessageId(offsetMsgId)。
msgId、offsetMsgId與key都稱為消息標(biāo)識(shí)。
- msgId:由producer端生成,其生成規(guī)則為:producerIp + 進(jìn)程pid + MessageClientIDSetter類的ClassLoader的hashCode +當(dāng)前時(shí)間 + AutomicInteger自增計(jì)數(shù)器。[重復(fù)概率較低]
- offsetMsgId:由broker端生成,其生成規(guī)則為:brokerIp + 物理分區(qū)的offset(Queue中的偏移量) [重復(fù)概率較高]
- key:由用戶指定的業(yè)務(wù)相關(guān)的唯一標(biāo)識(shí)。[用戶可控制不重復(fù)]
?2.系統(tǒng)架構(gòu)
RocketMQ架構(gòu)上主要分為四部分構(gòu)成:
Producer
消息生產(chǎn)者,負(fù)責(zé)生產(chǎn)消息。Producer通過MQ的負(fù)載均衡模塊選擇相應(yīng)的Broker集群隊(duì)列(先選擇Broker,再選擇隊(duì)列)進(jìn)行消息投遞,投遞的過程支持快速失敗并且低延遲。
RocketMQ中的消息生產(chǎn)者都是以生產(chǎn)者組(ProducerGroup)的形式出現(xiàn)的。一個(gè)生產(chǎn)者組可以同時(shí)發(fā)送多個(gè)主題的消息。
Consumer
消息消費(fèi)者,負(fù)責(zé)消費(fèi)消息。一個(gè)消息消費(fèi)者會(huì)從Broker服務(wù)器中獲取到消息,并對(duì)消息進(jìn)行相關(guān)業(yè)務(wù)處理。
RocketMQ中的消息消費(fèi)者都是以消費(fèi)者組(ConsumerGroup)的形式出現(xiàn)的。消費(fèi)者組是同一類消費(fèi)者的集合,這類Consumer消費(fèi)的是相同Topic類型(可以是一個(gè),可以是多個(gè),但是要相同)、并且是相同的tag(可以是一個(gè),可以是多個(gè),但是要相同)(保證訂閱關(guān)系的一致性)。消費(fèi)者組使得在消息消費(fèi)方面,容易實(shí)現(xiàn)
- 負(fù)載均衡(將一個(gè)Topic中的不同的Queue平均分配給同一個(gè)ConsumerGroup的不同的Consumer,注意,并不是將消息負(fù)載均衡)
- 容錯(cuò)(一個(gè)Consmer掛了,該ConsumerGroup中的其它Consumer可以接著消費(fèi)原Consumer消費(fèi)的Queue)
注意:
- 消費(fèi)者組中Consumer的數(shù)量應(yīng)該小于等于訂閱Topic的Queue數(shù)量。如果超出Queue數(shù)量,則多出的Consumer將不能消費(fèi)消息。
- 一個(gè)Topic類型的消息可以被多個(gè)消費(fèi)者組同時(shí)消費(fèi)
- 消費(fèi)者組需要保證訂閱關(guān)系的一致性。這類Consumer消費(fèi)的是相同Topic類型(可以是一個(gè),可以是多個(gè),但是要相同)、并且是相同的tag(可以是一個(gè),可以是多個(gè),但是要相同)
NameServer
NameServer是一個(gè)Broker與Topic路由的注冊(cè)中心,支持Broker的動(dòng)態(tài)注冊(cè)與發(fā)現(xiàn)。
主要包括兩個(gè)功能:
- Broker管理:接受Broker集群的注冊(cè)信息并且保存下來作為路由信息的基本數(shù)據(jù);提供心跳檢測機(jī)制,檢查Broker是否還存活。
- 路由信息管理:每個(gè)NameServer中都保存著Broker集群的整個(gè)路由信息和用于客戶端查詢的隊(duì)列信息。Producer和Conumser通過NameServer可以獲取整個(gè)Broker集群的路由信息,從而進(jìn)行消息的投遞和消費(fèi)。
①路由注冊(cè)
NameServer通常也是以集群的方式部署,不過,NameServer是無狀態(tài)的,即NameServer集群中的各個(gè)節(jié)點(diǎn)間是無差異的,各節(jié)點(diǎn)間相互不進(jìn)行信息通訊。
各節(jié)點(diǎn)的數(shù)據(jù)同步:在Broker節(jié)點(diǎn)啟動(dòng)時(shí),輪詢NameServer列表,與每個(gè)NameServer節(jié)點(diǎn)建立長連接,發(fā)起注冊(cè)請(qǐng)求。在NameServer內(nèi)部維護(hù)著?個(gè)Broker列表,用來動(dòng)態(tài)存儲(chǔ)Broker的信息。
注意:NameServer的無狀態(tài)特性使得它和ZooKeeper、Eureka、Nacos等注冊(cè)中心不同
- 優(yōu)點(diǎn):NameServer集群搭建簡單,擴(kuò)容簡單。
- 缺點(diǎn):對(duì)于Broker,必須明確指出所有NameServer地址。否則未指出的將不會(huì)去注冊(cè)。也正因 為如此,NameServer并不能隨便擴(kuò)容。因?yàn)?,若Broker不重新配置,新增的NameServer對(duì)于 Broker來說是不可見的,其不會(huì)向這個(gè)NameServer進(jìn)行注冊(cè)。
Broker節(jié)點(diǎn)為了證明自己是活著的,為了維護(hù)與NameServer間的長連接,會(huì)將最新的信息以心跳包的方式上報(bào)給NameServer,每30秒發(fā)送一次心跳。心跳包中包含BrokerId、Broker地址(IP+Port)、 Broker名稱、Broker所屬集群名稱等等。NameServer在接收到心跳包后,會(huì)更新心跳時(shí)間戳,記錄這個(gè)Broker的最新存活時(shí)間。
②路由剔除
由于Broker關(guān)機(jī)、宕機(jī)或網(wǎng)絡(luò)抖動(dòng)等原因,NameServer沒有收到Broker的心跳,NameServer可能會(huì)將其從Broker列表中剔除。
NameServer中有?個(gè)定時(shí)任務(wù),每隔10秒就會(huì)掃描?次Broker表,查看每一個(gè)Broker的最新心跳時(shí)間戳距離當(dāng)前時(shí)間是否超過120秒,如果超過,則會(huì)判定Broker失效,然后將其從Broker列表中剔除。
③路由發(fā)現(xiàn)
RocketMQ的路由發(fā)現(xiàn)采用的是Pull模型。當(dāng)Topic路由信息出現(xiàn)變化時(shí),NameServer不會(huì)主動(dòng)推送給 客戶端,而是客戶端定時(shí)拉取主題最新的路由。默認(rèn)客戶端每30秒會(huì)拉取一次最新的路由。
補(bǔ)充:
- Push模型:推送模型。是一個(gè)“發(fā)布-訂閱”模型(客戶端向服務(wù)端訂閱),需要維護(hù)一個(gè)長連接。一旦服務(wù)端數(shù)據(jù)發(fā)生變化,馬上推送到客戶端,實(shí)時(shí)性較好。但?長連接的維護(hù)是需要服務(wù)端的資源成本的,該模型適合于實(shí)時(shí)性要求較高 的場景
- Pull模型:拉取模型。存在的問題是,實(shí)時(shí)性較差。由客戶端定時(shí)向服務(wù)端拉取數(shù)據(jù)。
-
Long Polling模型:長輪詢模型。客戶端發(fā)起請(qǐng)求后,服務(wù)端不會(huì)立即返回請(qǐng)求結(jié)果,而是將請(qǐng)求掛起等待一段時(shí)間,如果此段時(shí)間內(nèi)服務(wù)端數(shù)據(jù)變更,立即響應(yīng)客戶端請(qǐng)求,若是一直無變化則等到指定的超時(shí)時(shí)間后響應(yīng)請(qǐng)求,客戶端重新發(fā)起長鏈接。
?其實(shí)這是對(duì)Push與Pull模型的整合,充分利用了這兩種模型的優(yōu) 勢,屏蔽了它們的劣勢。
④客戶端選擇NameServer的策略
這里的客戶端指的是Producer與Consumer
客戶端在配置時(shí)必須要寫上NameServer集群的地址
客戶端首先會(huì)生產(chǎn)一個(gè)隨機(jī)數(shù),然后再與NameServer節(jié)點(diǎn)數(shù)量取模,此時(shí)得到的就是所要連接的 節(jié)點(diǎn)索引,然后就會(huì)進(jìn)行連接。如果連接失敗,則會(huì)采用round-robin策略,逐個(gè)嘗試著去連接其它節(jié) 點(diǎn)。 首先采用的是隨機(jī)策略進(jìn)行的選擇,失敗后采用的是輪詢策略。
客戶端(Producer與Consumer)首先采用的是隨機(jī)策略進(jìn)行的選擇,失敗后采用的是輪詢策略。
Broker
Broker充當(dāng)著消息的中轉(zhuǎn)角色,負(fù)責(zé)存儲(chǔ)消息、轉(zhuǎn)發(fā)消息。Broker在RocketMQ系統(tǒng)中負(fù)責(zé)接收并存儲(chǔ)從 生產(chǎn)者發(fā)送來的消息,同時(shí)為消費(fèi)者的拉取請(qǐng)求作準(zhǔn)備。Broker同時(shí)也存儲(chǔ)著消息相關(guān)的元數(shù)據(jù),包括 消費(fèi)者組消費(fèi)進(jìn)度偏移offset、主題、隊(duì)列等。
集群部署(解決單點(diǎn)故障)
可以將每個(gè)Broker集群節(jié)點(diǎn)進(jìn)行橫向擴(kuò)展
Broker節(jié)點(diǎn)集群是一個(gè)主從集群,即集群中具有?Master?與?Slave?兩種角色。Master負(fù)責(zé)處理讀寫操作請(qǐng)求,Slave負(fù)責(zé)對(duì)Master中的數(shù)據(jù)進(jìn)行備份。當(dāng)Master掛掉了,Slave則會(huì)自動(dòng)切換為Master去工作。所以這個(gè)Broker集群是主備集群。
一個(gè)Master可以包含多個(gè)Slave,但一個(gè)Slave只能隸屬于一個(gè)Master。Master與Slave的對(duì)應(yīng)關(guān)系是通過指定相同的BrokerName、不同的BrokerId來確定的。BrokerId為0表示Master,非0表示Slave。每個(gè)Broker與NameServer集群中的所有節(jié)點(diǎn)建立長連接,定時(shí)注冊(cè)Topic信息到所有NameServer。
工作流程
①具體流程
1)啟動(dòng)NameServer,NameServer啟動(dòng)后開始監(jiān)聽端口,等待Broker、Producer、Consumer連接。
2)啟動(dòng)Broker時(shí),Broker會(huì)與所有的NameServer建立并保持長連接,然后每30秒向NameServer定時(shí)發(fā)送心跳包。
3)發(fā)送消息前,需要先創(chuàng)建Topic,創(chuàng)建Topic時(shí)需要指定該Topic要存儲(chǔ)在哪些Broker上以及在這些broker上創(chuàng)建幾個(gè)Queue,在創(chuàng)建Topic時(shí)也會(huì)將Topic與Broker的關(guān)系寫入到NameServer中。
補(bǔ)充:Topic的創(chuàng)建模式(手動(dòng)創(chuàng)建和自動(dòng)創(chuàng)建)
手動(dòng)創(chuàng)建Topic時(shí),有兩種模式:?
- 集群模式:該模式下創(chuàng)建的Topic在該集群中,所有Broker中的Queue數(shù)量是相同的
- Broker模式:該模式下創(chuàng)建的Topic在該集群中,每個(gè)Broker中的Queue數(shù)量可以不同
自動(dòng)創(chuàng)建Topic時(shí),默認(rèn)采用的是Broker模式,會(huì)為每個(gè)Broker默認(rèn)創(chuàng)建4個(gè)Queue
4)Producer發(fā)送消息,啟動(dòng)時(shí)與NameServer集群中的其中一臺(tái)建立長鏈接,并從NameServer中獲取路由信息,即當(dāng)前發(fā)送的Topic消息的Queue與Broker的地址(IP+Port)的映射關(guān)系。然后根據(jù)算法策略選擇一個(gè)Queue,與隊(duì)列所在的broker建立長鏈接從而向broker發(fā)送消息。在獲取到路由信息后,Producer會(huì)首先將路由信息緩存到本地,再每隔30s從NameServer更新一個(gè)路由信息。
5)Consumer跟Producer類似,跟其中一臺(tái)NameServer建立長連接,獲取其所訂閱Topic的路由信息,然后根據(jù)算法策略從路由信息中獲取到其所要消費(fèi)的Queue,然后直接跟Broker建立長連接,開始消費(fèi)其中的消息。Consumer在獲取到路由信息后,同樣也會(huì)每30秒從NameServer更新一次路由信息。不過不同于Producer的是,Consumer還會(huì)向Broker發(fā)送心跳,以確保Broker的存活狀態(tài)。
②讀寫隊(duì)列
從物理上來講,讀/寫隊(duì)列是同一個(gè)隊(duì)列。所以,不存在讀/寫隊(duì)列數(shù)據(jù)同步問題。讀/寫隊(duì)列是邏輯上進(jìn)行區(qū)分的概念。一般情況下,讀/寫隊(duì)列數(shù)量是相同的。
- 當(dāng)寫隊(duì)列多于讀隊(duì)列時(shí):例如,創(chuàng)建Topic時(shí)設(shè)置的寫隊(duì)列數(shù)量為8,讀隊(duì)列數(shù)量為4,此時(shí)系統(tǒng)會(huì)創(chuàng)建8個(gè)Queue,分別是0 1 2 3?4 5 6 7。Producer會(huì)將消息寫入到這8個(gè)隊(duì)列,但Consumer只會(huì)消費(fèi)0 1 2 3 這4個(gè)隊(duì)列中的消息,4 5 6 7 中的消息是不會(huì)被消費(fèi)到的。
- 當(dāng)讀隊(duì)列多于寫隊(duì)列時(shí):例如,創(chuàng)建Topic時(shí)設(shè)置的寫隊(duì)列數(shù)量為4,讀隊(duì)列數(shù)量為8,此時(shí)系統(tǒng)會(huì)創(chuàng)建8個(gè)Queue,分別是0 1 2 3?4 5 6 7。Producer會(huì)將消息寫入到0 1 2 3這4個(gè)隊(duì)列,但Consumer只會(huì)消費(fèi)0 1 2 3 4 5 6 7這8個(gè)隊(duì)列中的消息,但是4 5 6 7中是沒有消息的。此時(shí)假設(shè)ConsumerGroup中包含兩個(gè)Consuer,Consumer1消費(fèi)0 1 2 3,而Consumer2消費(fèi)4 5 6 7。但實(shí)際情況是,Consumer2是沒有消息可消費(fèi)的。
也就是說,當(dāng)讀/寫隊(duì)列數(shù)量設(shè)置不同時(shí),總是有問題的
那為什么還要這樣設(shè)計(jì)呢?
為了方便Topic的Queue的縮容。例如,原來創(chuàng)建的Topic中包含16個(gè)Queue,如何能夠使其Queue縮容為8個(gè),還不會(huì)丟失消息?
可以先動(dòng)態(tài)修改寫隊(duì)列數(shù)量為8,讀隊(duì)列數(shù)量不變。此時(shí)新的消息只能寫入到前8個(gè)隊(duì)列,而消費(fèi)都消費(fèi)的卻是 16個(gè)隊(duì)列中的數(shù)據(jù)。當(dāng)發(fā)現(xiàn)后8個(gè)Queue中的消息消費(fèi)完畢后,就可以再將讀隊(duì)列數(shù)量動(dòng)態(tài)設(shè)置為8。整個(gè)縮容過程,沒有丟失任何消息。
在rocketmq-dashboard 的 topic 配置中,perm 的值只能有 3種:
- 2:該topic 只讀不寫
- 4:該topic 只寫不讀
- 6:該topic 讀寫都可
三.RocketMQ的啟動(dòng)
1.安裝JDK
上傳到服務(wù)器后解壓(記得切換目錄)
tar -zxvf jdk-8u341-linux-x64.tar.gz
配置環(huán)境
vim /etc/profile
添加以下配置
# java env
# jdk安裝路徑,根據(jù)自己安裝路徑更改
export JAVA_HOME=/usr/local/java/jdk1.8.0_171
export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin
重新編譯文件生效
source /etc/profile
2.配置RocketMQ
①修改NameServer和Broker的內(nèi)存參數(shù)
修改runserver.sh
使用vim命令打開bin/runserver.sh文件
修改runbroker.sh?
使用vim命令打開bin/runbroker.sh文件
②配置 NameServer 的環(huán)境變量
?配置環(huán)境
vim /etc/profile
添加以下配置(若為虛擬機(jī)可以寫localhost)
export NAMESRV_ADDR=阿里云公網(wǎng)IP:9876
重新編譯文件生效
source /etc/profile
③修改broker的配置文件
進(jìn)入conf目錄下,修改broker.conf文件,添加以下配置
namesrvAddr=阿里云公網(wǎng)IP:9876
autoCreateTopicEnable=true
brokerIP1=阿里云公網(wǎng)IP
- namesrvAddr:nameSrv地址
- autoCreateTopicEnable:自動(dòng)創(chuàng)建主題,不然需要手動(dòng)創(chuàng)建出來
- brokerIP1:broker也需要一個(gè)公網(wǎng)ip
如果不指定,那么是阿里云的內(nèi)網(wǎng)地址,我們?cè)诒镜責(zé)o法連接使用
并且云服務(wù)器的安全組需要打開9876 (nameserver),10100~11000之間的端口 (broker),如果開了安全組本地還是連不上,可以查看防火墻開放的端口
firewall-cmd --list-all
打開防火墻端口?
firewall-cmd --zone=public --add-port=端口號(hào)/tcp --permanent
重啟一下(重啟生效):
firewall-cmd --reload
3.啟動(dòng)RocketMQ
①創(chuàng)建一個(gè)logs文件夾,用于存放日志(切換到該目錄)
②啟動(dòng)NameServer
nohup sh bin/mqnamesrv > ./logs/namesrv.log &
③啟動(dòng)broker
這里的 -c 是指定使用的配置文件
nohup sh bin/mqbroker -c conf/broker.conf > ./logs/broker.log &
4.關(guān)閉RocketMQ
切換到bin目錄下
①關(guān)閉broker
./mqshutdown broker
②關(guān)閉nameserver
./mqshutdown namesrv
5.配置rocketmq-dashboard
下載地址:
https://github.com/apache/rocketmq-dashboard/archive/refs/tags/rocketmq-dashboard-1.0.0.zip
下載后解壓出來,在跟目錄下執(zhí)行
mvn clean package -Dmaven.test.skip=true
打成jar包
將jar包上傳到服務(wù)器上去?
指定NameServer的地址和啟動(dòng)端口(8000)以及輸出日志
nohup java -jar rocketmq-dashboard-1.0.0.jar --server.port=8000 --rocketmq.config.namesrvAddr=127.0.0.1:9876 > ./rocketmq-all-4.9.0-bin-release/logs/dashboard.log &
直接訪問8000端口即可,如果是云服務(wù)器記得設(shè)置安全組文章來源:http://www.zghlxwxcb.cn/news/detail-486111.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-486111.html
到了這里,關(guān)于RocketMQ架構(gòu)和工作流程的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!