
MQTT 發(fā)布/訂閱模式
發(fā)布訂閱模式(Publish-Subscribe Pattern)是一種消息傳遞模式,它將發(fā)送消息的客戶端(發(fā)布者)與接收消息的客戶端(訂閱者)解耦,使得兩者不需要建立直接的聯(lián)系也不需要知道對方的存在。
MQTT 發(fā)布/訂閱模式的精髓在于由一個被稱為代理(Broker)的中間角色負(fù)責(zé)所有消息的路由和分發(fā)工作,發(fā)布者將帶有主題的消息發(fā)送給代理,訂閱者則向代理訂閱主題來接收感興趣的消息。
在 MQTT 中,主題和訂閱無法被提前注冊或創(chuàng)建,所以代理也無法預(yù)知某一個主題之后是否會有訂閱者,以及會有多少訂閱者,所以只能將消息轉(zhuǎn)發(fā)給當(dāng)前的訂閱者,如果當(dāng)前不存在任何訂閱,那么消息將被直接丟棄。
MQTT 發(fā)布/訂閱模式有 4 個主要組成部分:發(fā)布者、訂閱者、代理和主題。
- 發(fā)布者(Publisher)
負(fù)責(zé)將消息發(fā)布到主題上,發(fā)布者一次只能向一個主題發(fā)送數(shù)據(jù),發(fā)布者發(fā)布消息時也無需關(guān)心訂閱者是否在線。 - 訂閱者(Subscriber)
訂閱者通過訂閱主題接收消息,且可一次訂閱多個主題。MQTT 還支持通過共享訂閱的方式在多個訂閱者之間實現(xiàn)訂閱的負(fù)載均衡。 - 代理(Broker)
負(fù)責(zé)接收發(fā)布者的消息,并將消息轉(zhuǎn)發(fā)至符合條件的訂閱者。另外,代理也需要負(fù)責(zé)處理客戶端發(fā)起的連接、斷開連接、訂閱、取消訂閱等請求。 - 主題(Topic)
主題是 MQTT 進(jìn)行消息路由的基礎(chǔ),它類似 URL 路徑,使用斜杠 / 進(jìn)行分層,比如sensor/1/temperature
。一個主題可以有多個訂閱者,代理會將該主題下的消息轉(zhuǎn)發(fā)給所有訂閱者;一個主題也可以有多個發(fā)布者,代理將按照消息到達(dá)的順序轉(zhuǎn)發(fā)。
MQTT 還支持訂閱者使用主題通配符一次訂閱多個主題。
MQTT 發(fā)布/訂閱中的消息路由
在 MQTT 發(fā)布/訂閱模式中,一個客戶端既可以是發(fā)布者,也可以是訂閱者,也可以同時具備這兩個身份。 當(dāng)客戶端發(fā)布一條消息時,它會被發(fā)送到代理,然后代理將消息路由到該主題的所有訂閱者。 當(dāng)客戶端訂閱一個主題時,它會收到代理轉(zhuǎn)發(fā)到該主題的所有消息。
一般來說,大多數(shù)發(fā)布/訂閱系統(tǒng)主要通過以下兩種方式過濾并路由消息。
-
根據(jù)主題
訂閱者向代理訂閱自己感興趣的主題,發(fā)布者發(fā)布的所有消息中都會包含自己的主題,代理根據(jù)消息的主題判斷需要將消息轉(zhuǎn)發(fā)給哪些訂閱者。 -
根據(jù)消息內(nèi)容
訂閱者定義其感興趣的消息的條件,只有當(dāng)消息的屬性或內(nèi)容滿足訂閱者定義的條件時,消息才會被投遞到該訂閱者。
MQTT 協(xié)議是基于主題進(jìn)行消息路由的,在這個基礎(chǔ)上,EMQX 從 3.1 版本開始通過基于 SQL 的規(guī)則引擎提供了額外的按消息內(nèi)容進(jìn)行路由的能力。
MQTT 與 HTTP 請求響應(yīng)
HTTP 是萬維網(wǎng)數(shù)據(jù)通信的基礎(chǔ),其簡單易用無客戶端依賴,被廣泛應(yīng)用于各個行業(yè)。在物聯(lián)網(wǎng)領(lǐng)域,HTTP 也可以用于連接物聯(lián)網(wǎng)設(shè)備和 Web 服務(wù)器,實現(xiàn)設(shè)備的遠(yuǎn)程監(jiān)控和控制。
雖然使用簡單、開發(fā)周期短,但是基于請求響應(yīng)的 HTTP 在物聯(lián)網(wǎng)領(lǐng)域的應(yīng)用卻有一定的局限性。首先,協(xié)議層面 HTTP 報文相較與 MQTT 需要占用更多的網(wǎng)絡(luò)開銷;其次,HTTP 是一種無狀態(tài)協(xié)議,這意味著服務(wù)器在處理請求時不會記錄客戶端的狀態(tài),也無法實現(xiàn)從連接異常斷開中恢復(fù);最后,請求響應(yīng)模式需要通過輪詢才能獲取數(shù)據(jù)更新,而 MQTT 通過訂閱即可獲取實時數(shù)據(jù)更新。
發(fā)布訂閱模式的松耦合特性,也給 MQTT 帶來了一些副作用。由于發(fā)布者并不知曉訂閱者的狀態(tài),因此發(fā)布者也無法得知訂閱者是否收到了消息,或者是否正確處理了消息。為此,MQTT 5.0 增加了請求響應(yīng)特性,以實現(xiàn)訂閱者收到消息后向某個主題發(fā)送應(yīng)答,發(fā)布者收到應(yīng)答后再進(jìn)行后續(xù)操作。
MQTT 與消息隊列
盡管 MQTT 與消息隊列的很多行為和特性非常接近,比如都采用發(fā)布/訂閱模式,但是他們面向的場景卻有著顯著的不同。消息隊列主要用于服務(wù)端應(yīng)用之間的消息存儲與轉(zhuǎn)發(fā),這類場景往往數(shù)據(jù)量大但客戶端數(shù)量少。MQTT 是一種消息傳輸協(xié)議,主要用于物聯(lián)網(wǎng)設(shè)備之間的消息傳遞,這類場景的特點是海量的設(shè)備接入、管理與消息傳輸。
在一些實際的應(yīng)用場景中,MQTT 與消息隊列往往會被結(jié)合起來使用,以使 MQTT 服務(wù)器能專注于處理設(shè)備的連接與設(shè)備間的消息路由。比如先由 MQTT 服務(wù)器接收物聯(lián)網(wǎng)設(shè)備上報的數(shù)據(jù),然后再通過消息隊列將這些數(shù)據(jù)轉(zhuǎn)發(fā)到不同的業(yè)務(wù)系統(tǒng)進(jìn)行處理。
不同于消息隊列,MQTT 主題不需要提前創(chuàng)建。MQTT 客戶端在訂閱或發(fā)布時即自動的創(chuàng)建了主題,開發(fā)者無需再關(guān)心主題的創(chuàng)建,并且也不需要手動刪除主題。
Paho Java 使用示例
通過包管理工具 Maven 可以方便地安裝 Paho Java 客戶端庫,截止目前最新版本安裝如下:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
Java 體系中 Paho Java 是比較穩(wěn)定、廣泛應(yīng)用的 MQTT 客戶端庫,本示例包含 Java 語言的 Paho Java 連接 EMQX Broker,并進(jìn)行消息收發(fā)完整代碼:
public class App {
public static void main(String[] args) {
String subTopic = "testtopic/#";
String pubTopic = "testtopic/1";
String content = "Hello World";
int qos = 2;
String broker = "tcp://broker.emqx.io:1883";
String clientId = "emqx_test";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
// MQTT 連接選項
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("emqx_test");
connOpts.setPassword("emqx_test_password".toCharArray());
// 保留會話
connOpts.setCleanSession(true);
// 設(shè)置回調(diào)
client.setCallback(new PushCallback());
// 建立連接
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected");
System.out.println("Publishing message: " + content);
// 訂閱
client.subscribe(subTopic);
// 消息發(fā)布所需參數(shù)
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(pubTopic, message);
System.out.println("Message published");
client.disconnect();
System.out.println("Disconnected");
client.close();
System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
回調(diào)消息處理類 OnMessageCallback.java
public class OnMessageCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
// 連接丟失后,一般在這里面進(jìn)行重連
System.out.println("連接斷開,可以做重連");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息會執(zhí)行到這里面
System.out.println("接收消息主題:" + topic);
System.out.println("接收消息Qos:" + message.getQos());
System.out.println("接收消息內(nèi)容:" + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}
結(jié)語
MQTT 的發(fā)布/訂閱機(jī)制可以很輕易地滿足我們一對一、一對多、多對一的通信需要。這也在很大程度上拓寬了 MQTT 在 IoT 領(lǐng)域之外的應(yīng)用,像網(wǎng)絡(luò)直播互動、手機(jī)消息推送等行業(yè)場景,都非常適合使用 MQTT。文章來源:http://www.zghlxwxcb.cn/news/detail-777281.html
鏈接來源:https://www.emqx.com/zh/blog/mqtt-5-introduction-to-publish-subscribe-model文章來源地址http://www.zghlxwxcb.cn/news/detail-777281.html
到了這里,關(guān)于開啟物聯(lián)網(wǎng)的魔法之門 - 深入探索發(fā)布/訂閱模式的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!