一、什么是消息隊(duì)列?
提到消息隊(duì)列是否喚醒了你腦海深處的記憶?回看前面的這篇文章:《Java 多線程系列Ⅳ(單例模式+阻塞式隊(duì)列+定時(shí)器+線程池)》,其中我們?cè)诮榻B阻塞隊(duì)列時(shí)說(shuō)過(guò),阻塞隊(duì)列最大的用途就是實(shí)現(xiàn) 生產(chǎn)者消費(fèi)者模型。
我們知道對(duì)于生產(chǎn)者消費(fèi)者模型來(lái)說(shuō),它具有兩個(gè)十分亮眼的特點(diǎn):
- 解耦合.
- 削峰填谷.
(1)解耦合
在引入生產(chǎn)者消費(fèi)者模型之前,兩臺(tái)服務(wù)器之間通常是直接交互,這種交互模式使得服務(wù)器之間的耦合是非常大的。而引入生產(chǎn)者消費(fèi)者模型之后,兩臺(tái)服務(wù)器之間不再進(jìn)行直接通信,而是借助阻塞隊(duì)列進(jìn)行業(yè)務(wù)處理,起到了解耦的效果。
(2)削峰填谷
在引入生產(chǎn)者消費(fèi)者模型之前,同樣是兩臺(tái)服務(wù)器進(jìn)行直接通信,如果在一個(gè)時(shí)間點(diǎn),服務(wù)器 A 突然發(fā)送一組請(qǐng)求峰值,此刻服務(wù)器 B 也會(huì)隨之感受到峰值,這種情況下很可能造成服務(wù)器故障。如果此時(shí)使用阻塞隊(duì)列,A 將收到的請(qǐng)求發(fā)給隊(duì)列,雖然隊(duì)列中有很多請(qǐng)求,但是服務(wù)器 B 仍然和以按照原有的節(jié)奏讀取請(qǐng)求。
其實(shí)正是因?yàn)樯a(chǎn)者消費(fèi)者模型具有以上諸多好處,在實(shí)際的后端開(kāi)發(fā)中,特別是分布式系統(tǒng)里,跨主機(jī)使用生產(chǎn)者消費(fèi)者模型是非常普遍的需求。因此通常會(huì)把阻塞隊(duì)列單獨(dú)分離出來(lái),賦予更加豐富的功能,封裝成一個(gè)獨(dú)立的服務(wù)器程序,這個(gè)程序就稱為 消息隊(duì)列。
二、需求整理
1、生產(chǎn)者消費(fèi)者模型核心概念
生產(chǎn)者 (Producer): 負(fù)責(zé)將消息發(fā)送到消息隊(duì)列中。
消費(fèi)者 (Consumer): 從消息隊(duì)列中接收和處理消息。。
中間人 (Broker): 它負(fù)責(zé)接收發(fā)布者發(fā)送的消息,并將這些消息存儲(chǔ)在隊(duì)列中,然后將這些消息傳遞給訂閱者。
發(fā)布 (Publish): 生產(chǎn)者將消息投遞到中間人的過(guò)程。
訂閱 (Subscribe): 消費(fèi)者在中間人這里注冊(cè)的過(guò)程。只有消費(fèi)者注冊(cè)之后,當(dāng)一個(gè)消息發(fā)布到消息隊(duì)列時(shí)消息才會(huì)被發(fā)送給相應(yīng)的訂閱者。
根據(jù)以上概念,我們可以大致畫出生產(chǎn)者消費(fèi)者模型概念圖:(PS:下面的每個(gè)模塊均表示服務(wù)器)
一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者:
N 個(gè)生產(chǎn)者,N 個(gè)消費(fèi)者:
2、Broker 設(shè)計(jì)概要
我們當(dāng)前的目的是為了實(shí)現(xiàn)一個(gè)消息隊(duì)列,其中 Broker 是最核心的部分,它主要負(fù)責(zé)消息的 存儲(chǔ) 和 轉(zhuǎn)發(fā),其中涉及到的核心概念如下:
虛擬主機(jī) (VirtualHost): 類似于 MySQL 的 “database”,是?個(gè)邏輯上的集合。在實(shí)際的開(kāi)發(fā)中一個(gè) BrokerServer 可能會(huì)同時(shí)管理多組業(yè)務(wù)線上的數(shù)據(jù),此時(shí)可以使用不同的 VirtualHost 進(jìn)行區(qū)分。
交換機(jī) (Exchange): 生產(chǎn)者把消息先發(fā)送到 Broker 的 Exchange 上,再根據(jù)不同的規(guī)則,把消息轉(zhuǎn)發(fā)給不同的 Queue。
隊(duì)列 (Queue): 真正用來(lái)存儲(chǔ)消息的部分,每個(gè)消費(fèi)者決定自己從哪個(gè) Queue 上讀取消息(根據(jù)訂閱的隊(duì)列)。?個(gè) Exchange 可以綁定多個(gè) Queue (可以向多個(gè) Queue 中轉(zhuǎn)發(fā)消息),一個(gè) Queue 也可以被多個(gè) Exchange 綁定
(一個(gè) Queue 中的消息可以來(lái)自于多個(gè) Exchange)。綁定 (Binding): Exchange 和 Queue 之間的關(guān)聯(lián)關(guān)系,Exchange 和 Queue 可以理解成 “多對(duì)多” 關(guān)系。使用一個(gè)關(guān)聯(lián)表就可以把這兩個(gè)概念聯(lián)系起來(lái)。
消息 (Message): 具體來(lái)說(shuō)是服務(wù)器之間的請(qǐng)求和響應(yīng)。一個(gè)消息,可以視為一個(gè)字符串(二進(jìn)制數(shù)據(jù)),具體由程序員自定義。
上述概念在 Broker 中的體現(xiàn)如圖所示:
補(bǔ)充說(shuō)明1:數(shù)據(jù)存儲(chǔ)
以上這些概念對(duì)應(yīng)的數(shù)據(jù),既需要在內(nèi)存中存儲(chǔ),也需要在硬盤上存儲(chǔ),以內(nèi)存為主,硬盤為輔:
- 內(nèi)存存儲(chǔ):對(duì)于 MQ 來(lái)說(shuō),能夠高效的處理轉(zhuǎn)發(fā)數(shù)據(jù)時(shí)非常關(guān)鍵的指標(biāo),因此使用內(nèi)存組織上述數(shù)據(jù),能夠得到較高的效率。
- 硬盤存儲(chǔ):主要是為了防止內(nèi)存中的數(shù)據(jù)隨著進(jìn)程/主機(jī)重啟而丟失。
補(bǔ)充說(shuō)明:2: 交換機(jī)類型與轉(zhuǎn)發(fā)規(guī)則
上面我們提到,在生產(chǎn)者發(fā)送消息時(shí),首先會(huì)將消息發(fā)送到 Broker 的交換機(jī)上,再由交換機(jī)根據(jù)不同的規(guī)則轉(zhuǎn)發(fā)到相應(yīng)的隊(duì)列中。在 MQ 中支持四種類型的交換機(jī),它們分別是: Direct(直接交換機(jī))、Fanout(扇出交換機(jī))、Topic(主題交換機(jī))、Header(頭部交換機(jī))。其中 Header 這種方式比較復(fù)雜,也比較少見(jiàn),當(dāng)前項(xiàng)目中主要實(shí)現(xiàn)了前三種,下面分別對(duì)他們進(jìn)行詳細(xì)介紹:
前要說(shuō)明:
- 以下 bindingKey(綁定鍵)是在創(chuàng)建隊(duì)列和交換機(jī)綁定關(guān)系時(shí)指定的關(guān)鍵字。
- 以下 routingKey(路由鍵)是生產(chǎn)者發(fā)送消息時(shí)指定的關(guān)鍵字。
(1)Direct
(直接交換機(jī))
- 生產(chǎn)者發(fā)送消息時(shí),會(huì)指定一個(gè)"目標(biāo)隊(duì)列"的名字(此時(shí)的 routingKey 就是 隊(duì)列的名字,bindingKey 無(wú)效)
- 交換機(jī)收到消息后,查看當(dāng)前交換機(jī)對(duì)應(yīng)的綁定里面是否存在隊(duì)列名字為routingKey的隊(duì)列
- 如果有,就轉(zhuǎn)發(fā)過(guò)去(把消息塞進(jìn)對(duì)應(yīng)的“目標(biāo)隊(duì)列”中)
- 如果沒(méi)有,消息直接丟棄
(2)Fanout
(扇出交換機(jī))
- 生產(chǎn)者無(wú)需指定routingKey,直接發(fā)送消息到指定交換機(jī)
- 交換機(jī)收到消息后,直接將消息轉(zhuǎn)發(fā)給當(dāng)前交換機(jī)已綁定的所有隊(duì)列中。(此時(shí)的 bindingKey 和 routingKey 對(duì)扇出交換機(jī)無(wú)效。)
(3)Topic
(主題交換機(jī))
- 生產(chǎn)者發(fā)送消息時(shí),指定一個(gè) routingKey
- 交換機(jī)收到消息后,查看當(dāng)前交換機(jī)對(duì)應(yīng)的綁定中是否存在一個(gè) bindingKey 通過(guò)一定的規(guī)則和 routingKey 相匹配
- 如果有,就將消息轉(zhuǎn)發(fā)到對(duì)應(yīng)的綁定隊(duì)列中。
- 如果沒(méi)有,則將消息丟棄。
PS:以上所有概念出自 AMQP 協(xié)議:一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開(kāi)放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。
3、Broker 核心 API
Broker 基于以上概念和功能,需要實(shí)現(xiàn)的核心 API 如下:
- 創(chuàng)建隊(duì)列 (queueDeclare)
- 銷毀隊(duì)列 (queueDelete)
- 創(chuàng)建交換機(jī) (exchangeDeclare)
- 銷毀交換機(jī) (exchangeDelete)
- 創(chuàng)建綁定 (queueBind)
- 解除綁定 (queueUnbind)
- 發(fā)布消息 (basicPublish)
- 訂閱消息 (basicConsume)
- 確認(rèn)消息 (basicAck)
補(bǔ)充說(shuō)明:
- 從上面可以看出來(lái),在進(jìn)行創(chuàng)建操作時(shí)并沒(méi)有使用 create,而是使用 declare,這是從語(yǔ)義上說(shuō)明,這里的創(chuàng)建起到的效果是不存在則創(chuàng)建,存在則啥也不做。
- 上述并沒(méi)有創(chuàng)建一個(gè)“消費(fèi)消息”的API,這是因?yàn)楫?dāng)前我們使用的工作模式是 push(推),Broker 會(huì)將消息主動(dòng)推送給訂閱的消費(fèi)者。當(dāng)然也有 pull(拉) 工作模式,需要消費(fèi)者主動(dòng)調(diào)用 Broker 的 API 獲取消息,當(dāng)前項(xiàng)目不涉及這種模式。
- 在MQ中有兩種應(yīng)答方式,一種是自動(dòng)應(yīng)答,這種方式下 Broker 將消息推送給訂閱的消費(fèi)者后就算應(yīng)答完畢。另一種應(yīng)答方式是手動(dòng)應(yīng)答,上述確認(rèn)消息(basicAck)起到的效果,是可以讓消費(fèi)者 顯式 的告訴 Broker,這個(gè)消息我處理完畢了,提高整個(gè)系統(tǒng)的可靠性。
4、客戶端設(shè)計(jì)概要
生產(chǎn)者、消費(fèi)者都是客戶端程序,broker 則是作為服務(wù)器,通過(guò)網(wǎng)絡(luò)進(jìn)行通信。此處設(shè)定,使用 TCP + 自定義的應(yīng)用層協(xié)議 實(shí)現(xiàn) 生產(chǎn)者/消費(fèi)者 和 BrokerServer 之間的交互工作,這里需要給客戶端提供一組 API,讓客戶端的業(yè)務(wù)代碼來(lái)調(diào)用,從而通過(guò)網(wǎng)絡(luò)通信的方式遠(yuǎn)程調(diào)用 brokerserver 上的方法。
客戶端核心API:
- 創(chuàng)建 Connection
- 關(guān)閉 Connection
- 創(chuàng)建 Channel
- 關(guān)閉 Channel
- 創(chuàng)建隊(duì)列 (queueDeclare)
- 銷毀隊(duì)列 (queueDelete)
- 創(chuàng)建交換機(jī) (exchangeDeclare)
- 銷毀交換機(jī) (exchangeDelete)
- 創(chuàng)建綁定 (queueBind)
- 解除綁定 (queueUnbind)
- 發(fā)布消息 (basicPublish)
- 訂閱消息 (basicConsume)
- 確認(rèn)消息 (basicAck)
補(bǔ)充說(shuō)明:
- 和 Broker 服務(wù)器 API 相比,客戶端程序還提供了如下 4 個(gè) API:創(chuàng)建 Connection、關(guān)閉 Connection、創(chuàng)建 Channel、關(guān)閉 Channel。
- 這里的一個(gè)Connection對(duì)象代表一個(gè)TCP連接。
- Channel 是 Connection 內(nèi)部邏輯上的鏈接,多個(gè)Channel復(fù)用同一個(gè)TCP連接,一個(gè)Connection 中可以包含多個(gè)Channel,每個(gè)Channel負(fù)責(zé)客戶端中不同的模塊,其中傳輸?shù)臄?shù)據(jù)是互不相干的。
- 這樣的設(shè)定主要是為了能夠更好的復(fù)用 TCP 連接, 達(dá)到長(zhǎng)連接的效果, 避免頻繁的創(chuàng)建關(guān)閉 TCP 連接。
- 上述客戶端提供的 API 只是給業(yè)務(wù)代碼進(jìn)行調(diào)用,真正的方法執(zhí)行是交給了BrokerServer。這個(gè)過(guò)程稱為 遠(yuǎn)程過(guò)程調(diào)用(Remote Procedure Call,簡(jiǎn)稱RPC)是一種計(jì)算機(jī)通信協(xié)議,它允許程序調(diào)用另一個(gè)地址空間(通常是不同的計(jì)算機(jī))中的過(guò)程或函數(shù),而無(wú)需程序員顯式編寫網(wǎng)絡(luò)代碼。通過(guò)使用RPC,應(yīng)用程序可以像調(diào)用本地進(jìn)程一樣調(diào)用遠(yuǎn)程服務(wù)器上的進(jìn)程。
5、小結(jié)
最后簡(jiǎn)單總結(jié)一下,我們大致需要做的工作,其中涉及到的細(xì)節(jié)問(wèn)題,我們后面在進(jìn)行補(bǔ)充:
- 實(shí)現(xiàn)生產(chǎn)者、BrokerServer、消費(fèi)者三個(gè)部分
- 針對(duì)生產(chǎn)者、消費(fèi)者來(lái)說(shuō),主要編寫客戶端和服務(wù)器的網(wǎng)絡(luò)通信部分。
- 重點(diǎn)實(shí)現(xiàn)BrokerServer,包括內(nèi)部的基本概念和核心 API。
- 數(shù)據(jù)的持久化存儲(chǔ)。
三、具體實(shí)現(xiàn)
附上連接:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-753599.html
1、消息隊(duì)列詳細(xì)設(shè)計(jì)與實(shí)現(xiàn)思維導(dǎo)圖
2、Gitee 完整代碼地址文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-753599.html
到了這里,關(guān)于手寫消息隊(duì)列(基于RabbitMQ)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!