前言
MQTT是一個(gè)極其輕量級(jí)的發(fā)布/訂閱
消息傳輸協(xié)議,適用于網(wǎng)絡(luò)帶寬較低的場(chǎng)合.
它通過(guò)一個(gè)代理服務(wù)器(broker),任何一個(gè)客戶端(client)都可以訂閱或者發(fā)布某個(gè)主題的消息,然后訂閱了該主題的客戶端則會(huì)收到該消息
業(yè)務(wù)場(chǎng)景
硬件采集的數(shù)據(jù)傳入EMQX平臺(tái)(采用MQTT協(xié)議),java通過(guò)代碼連接MQTT服務(wù)器,進(jìn)行采集數(shù)據(jù)接收、解析、業(yè)務(wù)處理、存儲(chǔ)入庫(kù)、數(shù)據(jù)展示。
MQTT 是基于?發(fā)布(Publish)/訂閱(Subscribe)?模式來(lái)進(jìn)行通信及數(shù)據(jù)交換的。
什么是MQTT
MQTT是基于二進(jìn)制消息的發(fā)布/訂閱編程模式的消息協(xié)議,最早由IBM提出的,如今已經(jīng)成為OASIS規(guī)范。由于規(guī)范很簡(jiǎn)單,非常適合需要低功耗和網(wǎng)絡(luò)帶寬有限的IoT場(chǎng)景,比如:
- 遙感數(shù)據(jù)
- 汽車
- 智能家居
- 智慧城市
- 醫(yī)療醫(yī)護(hù)
由于物聯(lián)網(wǎng)的環(huán)境是非常特別的,所以MQTT遵循以下設(shè)計(jì)原則:
- 精簡(jiǎn),不添加可有可無(wú)的功能。
- 發(fā)布/訂閱(Pub/Sub)模式,方便消息在傳感器之間傳遞。
- 允許用戶動(dòng)態(tài)創(chuàng)建主題,零運(yùn)維成本。
- 把傳輸量降到最低以提高傳輸效率。
- 把低帶寬、高延遲、不穩(wěn)定的網(wǎng)絡(luò)等因素考慮在內(nèi)。
- 支持連續(xù)的會(huì)話控制。
- 理解客戶端計(jì)算能力可能很低。
- 提供服務(wù)質(zhì)量管理。
- 假設(shè)數(shù)據(jù)不可知,不強(qiáng)求傳輸數(shù)據(jù)的類型與格式,保持靈活性。
MQTT與HTTP的區(qū)別
首先看一下HTTP請(qǐng)求:
- HTTP 是一種同步協(xié)議??蛻舳诵枰却?wù)器響應(yīng)。Web 瀏覽器具有這樣的要求,但它的代價(jià)是犧牲了可伸縮性。
- HTTP 是單向的??蛻舳吮仨毎l(fā)起連接。
- HTTP 是一種 1-1 協(xié)議??蛻舳税l(fā)出請(qǐng)求,服務(wù)器進(jìn)行響應(yīng)。將消息傳送到網(wǎng)絡(luò)上的所有設(shè)備上,不但很困難,而且成本很高。
- HTTP 是一種有許多標(biāo)頭和規(guī)則的重量級(jí)協(xié)議。它不適合受限的網(wǎng)絡(luò)。
再來(lái)看一下MQTT:?
????????在 IoT 領(lǐng)域,大量設(shè)備以及很可能不可靠或高延遲的網(wǎng)絡(luò)使得同步通信成為問(wèn)題。異步消息協(xié)議更適合 IoT 應(yīng)用程序。傳感器發(fā)送讀數(shù),讓網(wǎng)絡(luò)確定將其傳送到目標(biāo)設(shè)備和服務(wù)的最佳路線和時(shí)間。在 IoT 應(yīng)用程序中,設(shè)備或傳感器通常是客戶端,這意味著它們無(wú)法被動(dòng)地接收來(lái)自網(wǎng)絡(luò)的命令。
MQTT的核心: 發(fā)布和訂閱模型?
MQTT 協(xié)議在網(wǎng)絡(luò)中定義了兩種實(shí)體類型:一個(gè)消息代理和一些客戶端。
代理是一個(gè)服務(wù)器,它從客戶端接收所有消息,然后將這些消息路由到相關(guān)的目標(biāo)客戶端。
客戶端是能夠與代理交互來(lái)發(fā)送和接收消息的任何事物??蛻舳丝梢允乾F(xiàn)場(chǎng)的 IoT 傳感器,或者是數(shù)據(jù)中心內(nèi)處理 IoT 數(shù)據(jù)的應(yīng)用程序。
- 客戶端連接到代理。它可以訂閱代理中的任何消息“主題”。此連接可以是簡(jiǎn)單的 TCP/IP 連接,也可以是用于發(fā)送敏感消息的加密 TLS 連接。
- 客戶端通過(guò)將消息和主題發(fā)送給代理,發(fā)布某個(gè)主題范圍內(nèi)的消息。
- 代理然后將消息轉(zhuǎn)發(fā)給所有訂閱該主題的客戶端。
????????同時(shí),MQTT 是輕量級(jí)的。它有一個(gè)用來(lái)指定消息類型的簡(jiǎn)單標(biāo)頭,有一個(gè)基于文本的主題,還有一個(gè)任意的二進(jìn)制有效負(fù)載。應(yīng)用程序可對(duì)有效負(fù)載采用任何數(shù)據(jù)格式,比如 JSON、XML、加密二進(jìn)制或 Base64,只要目標(biāo)客戶端能夠解析該有效負(fù)載。
Java實(shí)例
1.通過(guò)包管理工具 Maven導(dǎo)入依賴
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency>
?2.編寫訂閱方的代碼,并啟動(dòng)。
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * 訂閱方 */ public class SubscribeSample { public static void main(String[] args) { //EMQ X 默認(rèn)端口 1883 String broker = "tcp://broker.emqx.io:1883"; String TOPIC = "test"; int qos = 1; String clientid = "subClient"; String userName = "admin"; String passWord = "password"; try { // host為主機(jī)名,test為clientid即連接MQTT的客戶端ID,一般以客戶端唯一標(biāo)識(shí)符表示,MemoryPersistence設(shè)置clientid的保存形式,默認(rèn)為以內(nèi)存保存 MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence()); // MQTT的連接設(shè)置 MqttConnectOptions options = new MqttConnectOptions(); // 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連接記錄,這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接 options.setCleanSession(true); // 設(shè)置連接的用戶名 options.setUserName(userName); // 設(shè)置連接的密碼 options.setPassword(passWord.toCharArray()); // 設(shè)置超時(shí)時(shí)間 單位為秒 options.setConnectionTimeout(10); // 設(shè)置會(huì)話心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔1.5*20秒的時(shí)間向客戶端發(fā)送個(gè)消息判斷客戶端是否在線,但這個(gè)方法并沒(méi)有重連的機(jī)制 options.setKeepAliveInterval(20); // 設(shè)置回調(diào)函數(shù) client.setCallback(new MqttCallback() { public void connectionLost(Throwable cause) { System.out.println("connectionLost"); } public void messageArrived(String topic, MqttMessage message) { System.out.println("======監(jiān)聽到來(lái)自[" + topic + "]的消息======"); System.out.println("message content:"+new String(message.getPayload())); System.out.println("============"); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------"+ token.isComplete()); } }); // 建立連接 System.out.println("連接到 broker: " + broker); client.connect(options); System.out.println("連接成功."); //訂閱消息 client.subscribe(TOPIC, qos); System.out.println("開始監(jiān)聽" + TOPIC); } catch (Exception e) { e.printStackTrace(); } } }
啟動(dòng)訂閱方運(yùn)行結(jié)果如下:
?
3.編寫發(fā)布方的代碼并啟動(dòng)
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * 發(fā)布方 */ public class PublishSample { public static void main(String[] args) { String topic = "test"; String content = "你好,我給你發(fā)了條消息呀?。。。。。。。。。?!"; int qos = 1; String broker = "tcp://broker.emqx.io:1883"; String userName = "admin"; String password = "password"; String clientId = "pubClient"; // 內(nèi)存存儲(chǔ) MemoryPersistence persistence = new MemoryPersistence(); try { // 創(chuàng)建客戶端 MqttClient sampleClient = new MqttClient(broker, clientId, persistence); // 創(chuàng)建鏈接參數(shù) MqttConnectOptions connOpts = new MqttConnectOptions(); // 在重新啟動(dòng)和重新連接時(shí)記住狀態(tài) connOpts.setCleanSession(false); // 設(shè)置連接的用戶名 connOpts.setUserName(userName); connOpts.setPassword(password.toCharArray()); // 建立連接 System.out.println("連接到 broker: " + broker); sampleClient.connect(connOpts); System.out.println("連接成功."); // 創(chuàng)建消息 MqttMessage message = new MqttMessage(content.getBytes()); // 設(shè)置消息的服務(wù)質(zhì)量 message.setQos(qos); // 發(fā)布消息 System.out.println("向" + topic + "發(fā)送消息:" + message); sampleClient.publish(topic, message); // 斷開連接 sampleClient.disconnect(); // 關(guān)閉客戶端 sampleClient.close(); } catch (MqttException me) { System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } } }
?
啟動(dòng)發(fā)布方運(yùn)行結(jié)果如下:
4.最后查看訂閱方的控制臺(tái),訂閱方收到消息文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-698431.html
?文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-698431.html
到了這里,關(guān)于Java實(shí)踐-物聯(lián)網(wǎng)loT入門-MQTT傳輸協(xié)議的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!