目錄
1.MQ概述
1.1 RocketMQ簡介
1.2 MQ用途
1.3 常見MQ產(chǎn)品
2.RocketMQ 基本概念
2.1 消息
2.2 主題
2.3 標(biāo)簽
2.4 隊列
?2.5 Producer
2.6 Consumer
2.7 NameServer
2.8 Broker
2.9 RocketMQ 工作流程
?文章來源地址http://www.zghlxwxcb.cn/news/detail-439116.html
1.MQ概述
1.1 RocketMQ簡介
RocketMQ 是阿里開源的分布式消息中間件,跟其它中間件相比,RocketMQ 的特點是純JAVA實現(xiàn),是一套提供了消息生產(chǎn),存儲,消費全過程API的軟件系統(tǒng)。
1.2 MQ用途
限流削峰
MQ可以將系統(tǒng)的超量請求暫存其中,以便系統(tǒng)后期可以慢慢進行處理,從而避免了請求的丟失或系統(tǒng)被壓垮。
?異步解耦
上游系統(tǒng)對下游系統(tǒng)的調(diào)用若為同步調(diào)用,則會大大降低系統(tǒng)的吞吐量與并發(fā)度,且系統(tǒng)耦合度太高、而異步調(diào)用則會解決這些問題。所以兩層之間若要實現(xiàn)由同步到異步的轉(zhuǎn)化,一般性做法就是,在這兩層間添加一個MQ層。
?數(shù)據(jù)收集
分布式系統(tǒng)會產(chǎn)生海量級數(shù)據(jù)流,如:業(yè)務(wù)日志、監(jiān)控數(shù)據(jù)、用戶行為等。針對這些數(shù)據(jù)流進行實時或批量采集匯總,然后對這些數(shù)據(jù)流進行大數(shù)據(jù)分析,這是當(dāng)前互聯(lián)網(wǎng)平臺的必備技術(shù)。通過MQ完成此類數(shù)據(jù)收集是最好的選擇。
?
1.3 常見MQ產(chǎn)品
RabbitMQ
RabbitMQ是使用ErLang語言開發(fā)的一款MQ產(chǎn)品。其吞吐量較Kafka與RocketMQ要低,且由于其不是Java語言開發(fā),所以公司內(nèi)部對其實現(xiàn)定制化開發(fā)難度較大。
Kafka
Kafka是使用Scala/Java語言開發(fā)的一款MQ產(chǎn)品。其最大的特點就是高吞吐量,常用于大數(shù)據(jù)領(lǐng)域的實時計算、日志采集等場景。其沒有遵循任何常見的MQ協(xié)議,而是使用自研協(xié)議。
RocketMQ
RocketMQ是使用Java語言開發(fā)的一款MQ產(chǎn)品。經(jīng)過數(shù)年阿里雙11的考驗,性能與穩(wěn)定性非常高。其沒有遵循任何常見的MQ協(xié)議,而是使用自研協(xié)議。
對比
?
2.RocketMQ 基本概念
2.1 消息
消息是指,消息系統(tǒng)所傳輸信息的物理載體,生產(chǎn)和消費數(shù)據(jù)的最小單位,每條消息必須屬于一個主題。單個消息所占空間不會很大。
RocketMQ中每個消息擁有唯一的MessageId,且可以攜帶具有業(yè)務(wù)標(biāo)識的Key,以方便對消息的查詢。不過需要注意的是,MessageId有兩個:在生產(chǎn)者send()消息時會自動生成一個MessageId(msgId),當(dāng)消息到達Broker后,Broker也會自動生成一個MessageId(offsetMsgId)。msgId、offsetMsgId與key都稱為消息標(biāo)識。?
msgId:由producer端生成,其生成規(guī)則為: producerIp + 進程pid + MessageClientIDSetter類的ClassLoader的hashCode + 當(dāng)前時間 + AutomicInteger自增計數(shù)器?
offsetMsgId:由broker端生成,其生成規(guī)則為:brokerIp + 物理分區(qū)的offset(Queue中的偏移量)?
key:由用戶指定的業(yè)務(wù)相關(guān)的唯一標(biāo)識
?
2.2 主題
Topic表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬于一個主題,是RocketMQ進行消息訂閱的基本單位。 一個生產(chǎn)者可以同時發(fā)送多種Topic的消息;而一個消費者只對某種特定的Topic感興趣,即只可以訂閱和消費一種Topic的消息。?
2.3 標(biāo)簽
標(biāo)簽為消息設(shè)置的標(biāo)簽,用于同一主題下區(qū)分不同類型的消息。來自同一業(yè)務(wù)單元的消息,可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標(biāo)簽。 標(biāo)簽?zāi)軌蛴行У乇3执a的清晰度和連貫性,并優(yōu)化RocketMQ提供的查詢系統(tǒng)。消費者可以根據(jù)Tag實現(xiàn)對不同子主題的不同消費邏輯,實現(xiàn)更好的擴展性。 Topic是消息的一級分類,Tag是消息的二級分類。Topic相當(dāng)于貨物,Tag相當(dāng)于上海山東等地區(qū)。
2.4 隊列
存儲消息的物理實體。 一個Topic中可以包含多個Queue,每個Queue中存放的就是該Topic的消息。 一個Topic的Queue也被稱為一個Topic中消息的分區(qū)(Partition)。 一個Topic的Queue中的消息只能被一個消費者組中的一個消費者消費。 一個Queue中的消息不允許同一個消費者組中的多個消費者同時消費。
?
分片不同于分區(qū)。在RocketMQ中,分片指的是存放相應(yīng)Topic的Broker。每個分片中會創(chuàng)建出相應(yīng)數(shù)量的分區(qū),即Queue,每個Queue的大小都是相同的。
?
?
?2.5 Producer
消息生產(chǎn)者,負(fù)責(zé)生產(chǎn)消息。Producer通過MQ的負(fù)載均衡模塊選擇相應(yīng)的Broker集群隊列進行消息投遞,投遞的過程支持快速失敗并且低延遲。??例如:用戶提交的請求寫入到MQ的過程,就是消息生產(chǎn)的過程,在這里用戶就是生產(chǎn)者 。
?RocketMQ中的消息生產(chǎn)者都是以生產(chǎn)者組(Producer Group)的形式出現(xiàn)的。生產(chǎn)者組是同一類生產(chǎn)者的集合,這類Producer發(fā)送相同Topic類型的消息。一個生產(chǎn)者組可以同時發(fā)送多個主題的消息。如果主題中有多個隊列,生產(chǎn)者組只有一個生產(chǎn)者,生產(chǎn)者會采取輪詢的方式進行發(fā)送消息。
生產(chǎn)者代碼如下:
導(dǎo)入依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
?生產(chǎn)者代碼
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer order = new DefaultMQProducer("order");
order.setNamesrvAddr("localhost:9876");
order.start();
Message message = new Message("myTopic", "myTag", ("test").getBytes());
SendResult result = order.send(message);
System.out.println(result);
order.shutdown();
}
2.6 Consumer
消息消費者,負(fù)責(zé)消費消息。一個消息消費者會從Broker服務(wù)器中獲取到消息,并對消息進行相關(guān)業(yè)務(wù)處理。??例如:系統(tǒng)從MQ中讀取到請求,并對請求進行處理的過程就是消息消費的過程,在這里系統(tǒng)就是消費者。?
?
RocketMQ中的消息消費者都是以消費者組(Consumer Group)的形式出現(xiàn)的。消費者組是同一類消費者的集合,這類Consumer消費的是同一個Topic類型的消息。 消費者組使得在消息消費方面,實現(xiàn)負(fù)載均衡(將一個Topic中的不同的Queue平均分配給同一個Consumer Group的不同的Consumer,注意,并不是將消息負(fù)載均衡)和容錯(一個Consmer掛了,該Consumer Group中的其它Consumer可以接著消費原Consumer消費的Queue)的目標(biāo)變得非常容易。
消費者代碼
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("myTopic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println("收到的消息"+list);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
?
負(fù)載均衡策略
queue 個數(shù)大于 Consumer個數(shù), 那么 Consumer 會平均分配 queue,不夠平均,會根據(jù)clientId排序來拿取余數(shù)
queue個數(shù)小于Consumer個數(shù),那么會有Consumer閑置,就是浪費掉了,其余Consumer平均分配到queue
消費者組中Consumer的數(shù)量應(yīng)該小于等于訂閱Topic的Queue數(shù)量。如果超出Queue數(shù)量,則多出的Consumer將不能消費消息。
2.7 NameServer
?NameServer是一個Broker與Topic路由的注冊中心,支持Broker的動態(tài)注冊與發(fā)現(xiàn)。?
主要包括兩個功能:?
Broker管理:接受Broker集群的注冊信息并且保存下來作為路由信息的基本數(shù)據(jù);提供心跳檢測機制,檢查Broker是否還存活。
路由信息管理:每個NameServer中都保存著Broker集群的整個路由信息和用于客戶端查詢的隊列信息。Producer和Conumser通過NameServer可以獲取整個Broker集群的路由信息,從而進行消息的投遞和消費。NameServer可以獲取整個Broker集群的路由信息,從而進行消息的投遞和消費。?
路由注冊?
Name Server既然是注冊中心,那么是如何完成注冊的呢? NameServer通常也是以集群的方式部署,不過,NameServer是無狀態(tài)的,即NameServer集群中的各個節(jié)點間是無差異的,各節(jié)點間相互不進行信息通訊。 那各節(jié)點中的數(shù)據(jù)是如何進行數(shù)據(jù)同步的呢?在Broker節(jié)點啟動時,輪詢NameServer列表,與每個NameServer節(jié)點建立長連接,發(fā)起注冊請求。在NameServer內(nèi)部維護著?個Broker列表,用來動態(tài)存儲Broker的信息。?
?
Broker節(jié)點為了證明自己是活著的,為了維護與NameServer間的長連接,會將最新的信息以心跳包的方式上報給NameServer,每30秒發(fā)送一次心跳。心跳包中包含 BrokerId、Broker地址(IP+Port)、Broker名稱、Broker所屬集群名稱等等。NameServer在接收到心跳包后,會更新心跳時間戳,記錄這個Broker的最新存活時間。?
路由剔除?
由于Broker關(guān)機、宕機或網(wǎng)絡(luò)抖動等原因,NameServer沒有收到Broker的心跳,NameServer可能會將其從Broker列表中剔除。 NameServer中有?個定時任務(wù),每隔10秒就會掃描?次Broker表,查看每一個Broker的最新心跳時間戳距離當(dāng)前時間是否超過120秒,如果超過,則會判定Broker失效,然后將其從Broker列表中剔除。?
路由發(fā)現(xiàn)?
RocketMQ的路由發(fā)現(xiàn)采用的是Pull模型。當(dāng)Topic路由信息出現(xiàn)變化時,NameServer不會主動推送給客戶端,而是客戶端定時拉取Topic最新的路由。 默認(rèn)客戶端每30秒會拉取一次最新的路由。
?
2.8 Broker
Broker充當(dāng)著消息中轉(zhuǎn)角色,負(fù)責(zé)存儲消息、轉(zhuǎn)發(fā)消息。
Broker在RocketMQ系統(tǒng)中負(fù)責(zé)接收并存儲從生產(chǎn)者發(fā)送來的消息,同時為消費者的拉取請求作準(zhǔn)備。Broker同時也存儲著消息相關(guān)的元數(shù)據(jù),包括消費者組消費進度偏移offset、主題、隊列等。
模塊如下圖:
Remoting Module:整個Broker的實體,負(fù)責(zé)處理來自clients端的請求。而這個Broker實體則由以下模塊構(gòu)成。
Client Manager:客戶端管理器。負(fù)責(zé)接收、解析客戶端(Producer/Consumer)請求,管理客戶端。例如,維護Consumer的Topic訂閱信息
Store Service:存儲服務(wù)。提供方便簡單的API接口,處理消息存儲到物理硬盤和消息查詢功能。
HA Service:高可用服務(wù),提供Master Broker 和 Slave Broker之間的數(shù)據(jù)同步功能。
Index Service:索引服務(wù)。根據(jù)特定的Message key,對投遞到Broker的消息進行索引服務(wù),同時也提供根據(jù)Message Key對消息進行快速查詢的功能。
?
2.9 RocketMQ 工作流程
工作流程如下圖:
1)啟動NameServer,NameServer啟動后開始監(jiān)聽端口,等待Broker、Producer、Consumer連接。
2)啟動Broker時,Broker會與所有的NameServer建立并保持長連接,然后每50秒向NameServer定時發(fā)送心跳包。
3)發(fā)送消息前,可以先創(chuàng)建Topic,創(chuàng)建Topic時需要指定該Topic要存儲在哪些Broker上,當(dāng)然,在創(chuàng)建Topic時也會將Topic與Broker的關(guān)系寫入到NameServer中。不過,這步是可選的,也可以在發(fā)送消息時自動創(chuàng)建Topic。
4) Producer發(fā)送消息,啟動時先跟NameServer集群中的其中一臺建立長連接,并從NameServer中獲取路由信息,即當(dāng)前發(fā)送的Topic消息的Queue與Broker的地址(IP+Port)的映射關(guān)系。然后根據(jù)算法策略從隊選擇一個Queue,與隊列所在的Broker建立長連接從而向Broker發(fā)消息。當(dāng)然,在獲取到路由信息后,Producer會首先將路由信息緩存到本地,再每30秒從NameServer更新一次路由信息。
5)Consumer跟Producer類似,跟其中一臺NameServer建立長連接,獲取其所訂閱Topic的路由信息,然后根據(jù)算法策略從路由信息中獲取到其所要消費的Queue,然后直接跟Broker建立長連接,開始消費其中的消息。Consumer在獲取到路由信息后,同樣也會每30秒從NameServer更新一次路由信息。不過不同于Producer的是,Consumer還會向Broker發(fā)送心跳,以確保Broker的存活狀態(tài)。文章來源:http://www.zghlxwxcb.cn/news/detail-439116.html
?
到了這里,關(guān)于分布式消息隊列RocketMQ概念詳解的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!