RocketMQ MQTT 概覽
傳統(tǒng)的消息隊(duì)列MQ主要應(yīng)用于服務(wù)(端)之間的消息通信,比如電商領(lǐng)域的交易消息、支付消息、物流消息等等。然而在消息這個(gè)大類下,還有一個(gè)非常重要且常見(jiàn)的消息領(lǐng)域,即IoT類終端設(shè)備消息。近些年,我們看到隨著智能家居、工業(yè)互聯(lián)而興起的面向IoT設(shè)備類的消息正在呈爆炸式增長(zhǎng),而且已經(jīng)發(fā)展十余年的移動(dòng)互聯(lián)網(wǎng)的手機(jī)APP端消息仍然是數(shù)量級(jí)龐大。面向終端設(shè)備的消息數(shù)量級(jí)比傳統(tǒng)服務(wù)端的消息要大很多量級(jí)并仍然在快速增長(zhǎng)。
如果可以有一個(gè)統(tǒng)一的消息系統(tǒng)(產(chǎn)品)來(lái)提供多場(chǎng)景計(jì)算(如stream、event)、多場(chǎng)景(IoT、APP)接入,其實(shí)是非常有價(jià)值的,因?yàn)橄⒁彩且环N重要數(shù)據(jù),數(shù)據(jù)如果只存在一個(gè)系統(tǒng)內(nèi),可以最大地降低存儲(chǔ)成本,同時(shí)可以有效地避免數(shù)據(jù)因在不同系統(tǒng)間同步帶來(lái)的一致性難題和挑戰(zhàn)。
基于此,我們引入了RocketMQ-MQTT這個(gè)擴(kuò)展項(xiàng)目來(lái)實(shí)現(xiàn)RocketMQ統(tǒng)一接入IoT設(shè)備和服務(wù)端的消息,提供一體化消息存儲(chǔ)和互通能力。
MQTT協(xié)議?
在IoT終端場(chǎng)景,目前業(yè)界廣泛使用的是MQTT協(xié)議,是起源于物聯(lián)網(wǎng)IoT場(chǎng)景,OASIS聯(lián)盟定義的標(biāo)準(zhǔn)的開放式協(xié)議。因?yàn)镮oT設(shè)備種類繁多,運(yùn)行環(huán)境各異,一個(gè)標(biāo)準(zhǔn)的接入?yún)f(xié)議尤為關(guān)鍵。
MQTT協(xié)議定義的是一個(gè)Pub/Sub的通信模型,這個(gè)與RocketMQ是類似的,不過(guò)其在訂閱方式上比較靈活,可以支持多級(jí)Topic訂閱(如 “/t/t1/t2”),甚至可以支持通配符訂閱(如 “/t/t1/+”)。
模型介紹?
隊(duì)列存儲(chǔ)模型?
我們?cè)O(shè)計(jì)了一種多維度分發(fā)的Topic隊(duì)列模型,如上圖所示,消息可以來(lái)自各個(gè)接入場(chǎng)景(如服務(wù)端的MQ/AMQP、客戶端的MQTT),但只會(huì)寫一份存到commitlog里面,然后分發(fā)出多個(gè)需求場(chǎng)景的隊(duì)列索引(ConsumerQueue),如服務(wù)端場(chǎng)景(MQ/AMQP)可以按照一級(jí)Topic隊(duì)列進(jìn)行傳統(tǒng)的服務(wù)端消費(fèi),客戶端MQTT場(chǎng)景可以按照MQTT多級(jí)Topic以及通配符訂閱進(jìn)行消費(fèi)消息。
這樣的一個(gè)隊(duì)列模型就可以同時(shí)支持服務(wù)端和終端場(chǎng)景的接入和消息收發(fā),達(dá)到一體化的目標(biāo)。
推拉模型?
上圖展示的是一個(gè)推拉模型,圖中的P節(jié)點(diǎn)是一個(gè)協(xié)議網(wǎng)關(guān)或broker插件,終端設(shè)備通過(guò)MQTT協(xié)議連到這個(gè)網(wǎng)關(guān)節(jié)點(diǎn)。消息可以來(lái)自多種場(chǎng)景(MQ/AMQP/MQTT)發(fā)送過(guò)來(lái),存到Topic隊(duì)列后會(huì)有一個(gè)notify邏輯模塊來(lái)實(shí)時(shí)感知這個(gè)新消息到達(dá),然后會(huì)生成消息事件(就是消息的Topic名稱),將該事件推送至網(wǎng)關(guān)節(jié)點(diǎn),網(wǎng)關(guān)節(jié)點(diǎn)根據(jù)其連上的終端設(shè)備訂閱情況進(jìn)行內(nèi)部匹配,找到哪些終端設(shè)備能匹配上,然后會(huì)觸發(fā)pull請(qǐng)求去存儲(chǔ)層讀取消息再推送至終端設(shè)備。
架構(gòu)概覽?
?我們的目標(biāo)是期望基于RocketMQ實(shí)現(xiàn)一體化且自閉環(huán),但不希望Broker被侵入更多場(chǎng)景邏輯,我們抽象了一個(gè)協(xié)議計(jì)算層,這個(gè)計(jì)算層可以是一個(gè)網(wǎng)關(guān),也可以是一個(gè)broker插件。Broker專注解決Queue的事情以及為了滿足上面的計(jì)算需求做一些Queue存儲(chǔ)的適配或改造。協(xié)議計(jì)算層負(fù)責(zé)協(xié)議接入,并且要可插拔部署。
RocketMQ MQTT 快速開始
系統(tǒng)要求?
- 64位操作系統(tǒng),推薦 Linux/Unix/macOS
- 64位 JDK 1.8+
部署說(shuō)明?
由于RocketMQ-MQTT項(xiàng)目依賴RocketMQ底層的多隊(duì)列分發(fā),RocketMQ從4.9.3版本開始支持這一特性,因此您需要確認(rèn)RocketMQ的版本升級(jí)到4.9.3或更高版本,并且確保以下配置項(xiàng)已開啟:
enableLmq = true
enableMultiDispatch = true
RocketMQ-MQTT的部署參考項(xiàng)目說(shuō)明,下載工程release版本或直接從源碼構(gòu)建。
git clone https://github.com/apache/rocketmq-mqtt
cd rocketmq-mqtt
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/
cd bin
sh mqtt.sh start
配置說(shuō)明?
username=xxx // 權(quán)限驗(yàn)證賬戶配置
secretKey=xxx // 權(quán)限驗(yàn)證賬戶配置
NAMESRV_ADDR=xxx //namesrv接入點(diǎn)
eventNotifyRetryTopic=xx //notify重試topic,提前創(chuàng)建
clientRetryTopic=xx //客戶端消息重試topic,提前創(chuàng)建
其他啟動(dòng)配置參考項(xiàng)目README.md文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-686024.html
示例說(shuō)明?
RocketMQ-MQTT項(xiàng)目工程代碼里面提供了基本的example代碼,詳見(jiàn)代碼示例。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-686024.html
MqttConsumer.java // MQTT客戶端啟動(dòng)訂閱消息
MqttProducer.java // MQTT客戶端啟動(dòng)發(fā)布消息
RocketMQConsumer.java //RocketMQ客戶端啟動(dòng)訂閱消息
RocketMQProducer.java // RocketMQ客戶端啟動(dòng)發(fā)布消息
到了這里,關(guān)于RocketMQ-(9-1)-MQTT-EventBridge概述的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!