一、MQTT介紹
1.1 什么是MQTT?
MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測(cè)傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的“輕量級(jí)”通訊協(xié)議,該協(xié)議構(gòu)建于TCP/IP協(xié)議上,由IBM在1999年發(fā)布。
MQTT最大優(yōu)點(diǎn)在于用極少的代碼和有限的帶寬,為連接遠(yuǎn)程設(shè)備提供實(shí)時(shí)可靠的消息服務(wù)。
MQTT具有協(xié)議簡(jiǎn)潔、輕巧、可擴(kuò)展性強(qiáng)、低開銷、低帶寬占用等優(yōu)點(diǎn),已經(jīng)有PHP,JAVA,Python,C,C#,Go等多個(gè)語言版本,基本可以使用在任何平臺(tái)上。在物聯(lián)網(wǎng)、小型設(shè)備、移動(dòng)應(yīng)用等方面有較廣泛的應(yīng)用,特別適合用來當(dāng)做物聯(lián)網(wǎng)的通信協(xié)議。
1.2 MQTT特點(diǎn)
MQTT是一個(gè)基于客戶端-服務(wù)器的消息發(fā)布/訂閱傳輸協(xié)議。MQTT協(xié)議是輕量、簡(jiǎn)單、開放和易于實(shí)現(xiàn)的,這些特點(diǎn)使它適用范圍非常廣泛。在很多情況下,包括受限的環(huán)境中,如:機(jī)器與機(jī)器(M2M)通信和物聯(lián)網(wǎng)(IoT)。
MQTT協(xié)議是為硬件性能有限,且工作在低帶寬、不可靠的網(wǎng)絡(luò)的遠(yuǎn)程傳感器和控制設(shè)備通訊而設(shè)計(jì)的協(xié)議,它具有以下主要的幾項(xiàng)特性:
- 1.使用發(fā)布/訂閱消息模式,提供多對(duì)多的消息發(fā)布,解除應(yīng)用程序耦合;
- 2.對(duì)負(fù)載內(nèi)容屏蔽的消息傳輸;
- 3.使用TCP/IP 提供網(wǎng)絡(luò)連接;
- 4.支持三種消息發(fā)布服務(wù)質(zhì)量(QoS):
-
- QoS 0(最多一次):消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)。會(huì)發(fā)生消息丟失或重復(fù)。這個(gè)級(jí)別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次數(shù)據(jù)無所謂,因?yàn)椴痪煤筮€會(huì)有第二次發(fā)送。
- QoS 1(至少一次):確保消息到達(dá),但消息重復(fù)可能會(huì)發(fā)生。
- QoS 2(只有一次):確保消息到達(dá)一次。這個(gè)級(jí)別可用于如下情況,在計(jì)費(fèi)系統(tǒng)中,消息重復(fù)或丟失會(huì)導(dǎo)致不正確的結(jié)果。
- 5.傳輸數(shù)據(jù)小,開銷很?。ü潭ㄩL(zhǎng)度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡(luò)流量;(用極少的代碼和有限的帶寬,為連接遠(yuǎn)程設(shè)備提供實(shí)時(shí)可靠的消息服務(wù)。)
1.3 MQTT應(yīng)用場(chǎng)景
MQTT作為一種低開銷、低帶寬占用的即時(shí)通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設(shè)備、移動(dòng)應(yīng)用等方面有著廣泛的應(yīng)用。MQTT服務(wù)只負(fù)責(zé)消息的接收和傳遞,應(yīng)用系統(tǒng)連接到MQTT服務(wù)器后,可以實(shí)現(xiàn)采集數(shù)據(jù)接收、解析、業(yè)務(wù)處理、存儲(chǔ)入庫(kù)、數(shù)據(jù)展示等功能。常見的應(yīng)用場(chǎng)景主要有以下幾個(gè)方面:
(1)消息推送: 如PC端的推送公告,比如安卓的推送服務(wù),還有一些即時(shí)通信軟件如微信、易信等也是采用的推送技術(shù)。
(2)智能點(diǎn)餐: 通過MQTT消息隊(duì)列產(chǎn)品,消費(fèi)者可在餐桌上掃碼點(diǎn)餐,并與商家后端系統(tǒng)連接實(shí)現(xiàn)自助下單、支付。
(3)信息更新: 實(shí)現(xiàn)商場(chǎng)超市等場(chǎng)所的電子標(biāo)簽、公共場(chǎng)所的多媒體屏幕的顯示更新管理。
(4)掃碼出站: 最常見的停車場(chǎng)掃碼繳費(fèi),自動(dòng)起竿;地鐵閘口掃碼進(jìn)出站。
二、MQTT的角色組成
2.1 MQTT的客戶端和服務(wù)端
2.1.1 服務(wù)端(Broker)
EMQX就是一個(gè)MQTT的Broker,emqx只是基于erlang語言開發(fā)的軟件而已,其它的MQ還有ActiveMQ、RabbitMQ、HiveMQ等等。
EMQX服務(wù)端:下載 EMQX
2.1.2 客戶端(發(fā)布/訂閱)
EMQX客戶端:MQTTX:全功能 MQTT 客戶端工具
這個(gè)是用來測(cè)試驗(yàn)證的客戶端,實(shí)際項(xiàng)目是通過代碼來實(shí)現(xiàn)我們消息的生產(chǎn)者和消費(fèi)者。
2.2 MQTT中的幾個(gè)概念
相比RabbitMQ等消息隊(duì)列,MQTT要相對(duì)簡(jiǎn)單一些,只有Broker、Topic、發(fā)布者、訂閱者等幾部分構(gòu)成。接下來我們先簡(jiǎn)單整理下MQTT日常使用中最常見的幾個(gè)概念:
- 1.Topic主題:MQTT消息的主要傳播途徑, 我們向主題發(fā)布消息, 訂閱主題, 從主題中讀取消息并進(jìn)行.業(yè)務(wù)邏輯處理, 主題是消息的通道
- 2.生產(chǎn)者:MQTT消息的發(fā)送者, 他們向主題發(fā)送消息
- 3.消費(fèi)者:MQTT消息的接收者, 他們訂閱自己需要的主題, 并從中獲取消息
- 4.broker服務(wù):消息轉(zhuǎn)發(fā)器, 消息是通過它來承載的, EMQX就是我們的broker, 在使用中我們不用關(guān)心它的具體實(shí)現(xiàn)
其實(shí), MQTT的使用流程就是: 生產(chǎn)者給broker的某個(gè)topic發(fā)消息->broker通過topic進(jìn)行消息的傳遞->訂閱該主題的消費(fèi)者拿到消息并進(jìn)行相應(yīng)的業(yè)務(wù)邏輯
三、EMQX的安裝和使用
下面以Windows為例,演示W(wǎng)indows下如何安裝和使用EXQX。
step 1:下載EMQ安裝包,配置EMQ環(huán)境
EMQX服務(wù)端:下載 EMQX
step 2:下載壓縮包解壓,cmd進(jìn)入bin文件夾
step 3:?jiǎn)?dòng)EMQX服務(wù)
在命令行輸入:emqx start
啟動(dòng)服務(wù),打卡瀏覽器輸入:http://localhost:18083/ 進(jìn)入登錄頁(yè)面。默認(rèn)用戶名密碼 admin/public 。登錄成功后,會(huì)進(jìn)入emqx的后臺(tái)管理頁(yè)面,如下圖所示:
四、使用SpringBoot整合MQTT協(xié)議
前面介紹了MQTT協(xié)議以及如何安裝和啟動(dòng)MQTT服務(wù)。接下來演示如何在SpringBoot項(xiàng)目中整合MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布。
4.1 創(chuàng)建工程
首先,創(chuàng)建spring-boot-starter-mqtt
父工程,在父工程下分別創(chuàng)建消息的提供者spring-boot-starter-mqtt-provider
模塊和消息的消費(fèi)者spring-boot-starter-mqtt-consumer
模塊。
4.2 實(shí)現(xiàn)生產(chǎn)者
接下來,修改生產(chǎn)者模塊spring-boot-starter-mqtt-provider
相關(guān)的代碼,實(shí)現(xiàn)消息發(fā)布的功能模塊。
4.2.1 導(dǎo)入依賴包
修改pom.xml 文件,添加MQTT相關(guān)依賴,具體示例代碼如下所示:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.3.2.RELEASE</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.4</version> </dependency> </dependencies>
4.2.2 修改配置文件
修改application.yml配置文件,增加MQTT相關(guān)配置。示例代碼如下所示:
spring: application: name: provider #MQTT配置信息 mqtt: #MQTT服務(wù)地址,端口號(hào)默認(rèn)11883,如果有多個(gè),用逗號(hào)隔開 url: tcp://127.0.0.1:11883 #用戶名 username: admin #密碼 password: public #客戶端id(不能重復(fù)) client: id: provider-id #MQTT默認(rèn)的消息推送主題,實(shí)際可在調(diào)用接口是指定 default: topic: topic server: port: 8080
4.2.3 消息生產(chǎn)者客戶端配置
創(chuàng)建MqttProviderConfig配置類,讀取application.yml中的相關(guān)配置,并初始化創(chuàng)建MQTT的連接。示例代碼如下所示:
import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Configuration @Slf4j public class MqttProviderConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; @Value("${spring.mqtt.default.topic}") private String defaultTopic; /** * 客戶端對(duì)象 */ private MqttClient client; /** * 在bean初始化后連接到服務(wù)器 */ @PostConstruct public void init(){ connect(); } /** * 客戶端連接服務(wù)端 */ public void connect(){ try{ //創(chuàng)建MQTT客戶端對(duì)象 client = new MqttClient(hostUrl,clientId,new MemoryPersistence()); //連接設(shè)置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,設(shè)置false表示服務(wù)器會(huì)保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之后能獲取到服務(wù)器在客戶端斷開連接期間推送的消息 //設(shè)置為true表示每次連接服務(wù)器都是以新的身份 options.setCleanSession(true); //設(shè)置連接用戶名 options.setUserName(username); //設(shè)置連接密碼 options.setPassword(password.toCharArray()); //設(shè)置超時(shí)時(shí)間,單位為秒 options.setConnectionTimeout(100); //設(shè)置心跳時(shí)間 單位為秒,表示服務(wù)器每隔 1.5*20秒的時(shí)間向客戶端發(fā)送心跳判斷客戶端是否在線 options.setKeepAliveInterval(20); //設(shè)置遺囑消息的話題,若客戶端和服務(wù)器之間的連接意外斷開,服務(wù)器將發(fā)布客戶端的遺囑信息 options.setWill("willTopic",(clientId + "與服務(wù)器斷開連接").getBytes(),0,false); //設(shè)置回調(diào) client.setCallback(new MqttProviderCallBack()); client.connect(options); } catch(MqttException e){ e.printStackTrace(); } } public void publish(int qos,boolean retained,String topic,String message){ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setRetained(retained); mqttMessage.setPayload(message.getBytes()); //主題的目的地,用于發(fā)布/訂閱信息 MqttTopic mqttTopic = client.getTopic(topic); //提供一種機(jī)制來跟蹤消息的傳遞進(jìn)度 //用于在以非阻塞方式(在后臺(tái)運(yùn)行)執(zhí)行發(fā)布是跟蹤消息的傳遞進(jìn)度 MqttDeliveryToken token; try { //將指定消息發(fā)布到主題,但不等待消息傳遞完成,返回的token可用于跟蹤消息的傳遞狀態(tài) //一旦此方法干凈地返回,消息就已被客戶端接受發(fā)布,當(dāng)連接可用,將在后臺(tái)完成消息傳遞。 token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); } } }
4.2.4 生產(chǎn)者客戶端消息回調(diào)
創(chuàng)建MqttProviderCallBack類并繼承MqttCallback,實(shí)現(xiàn)相關(guān)消息回調(diào)事件,示例代碼如下圖所示:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class MqttConsumerCallBack implements MqttCallback{ /** * 客戶端斷開連接的回調(diào) */ @Override public void connectionLost(Throwable throwable) { System.out.println("與服務(wù)器斷開連接,可重連"); } /** * 消息到達(dá)的回調(diào) */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(String.format("接收消息主題 : %s",topic)); System.out.println(String.format("接收消息Qos : %d",message.getQos())); System.out.println(String.format("接收消息內(nèi)容 : %s",new String(message.getPayload()))); System.out.println(String.format("接收消息retained : %b",message.isRetained())); } /** * 消息發(fā)布成功的回調(diào) */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println(String.format("接收消息成功")); } }
4.2.5 創(chuàng)建Controller控制器實(shí)現(xiàn)消息發(fā)布功能
創(chuàng)建SendController控制器類,實(shí)現(xiàn)消息的發(fā)送功能,示例代碼如下所示:
@Controller public class SendController { @Autowired private MqttProviderConfig providerClient; @RequestMapping("/sendMessage") @ResponseBody public String sendMessage(int qos,boolean retained,String topic,String message){ try { providerClient.publish(qos, retained, topic, message); return "發(fā)送成功"; } catch (Exception e) { e.printStackTrace(); return "發(fā)送失敗"; } } }
4.3 實(shí)現(xiàn)消費(fèi)者
前面完成了生成者消息發(fā)布的模塊,接下來修改消費(fèi)者模塊spring-boot-starter-mqtt-consumer
實(shí)現(xiàn)消息訂閱、處理的功能。
4.3.1 導(dǎo)入依賴包
修改pom.xml 文件,添加MQTT相關(guān)依賴,具體示例代碼如下所示:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.3.2.RELEASE</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.4</version> </dependency> </dependencies>
4.3.2 修改配置文件
修改application.yml配置文件,增加MQTT相關(guān)配置。示例代碼如下所示:
spring: application: name: consumer #MQTT配置信息 mqtt: #MQTT服務(wù)端地址,端口默認(rèn)為11883,如果有多個(gè),用逗號(hào)隔開 url: tcp://127.0.0.1:11883 #用戶名 username: admin #密碼 password: public #客戶端id(不能重復(fù)) client: id: consumer-id #MQTT默認(rèn)的消息推送主題,實(shí)際可在調(diào)用接口時(shí)指定 default: topic: topic server: port: 8085
4.3.3 消費(fèi)者客戶端配置
創(chuàng)建消費(fèi)者客戶端配置類MqttConsumerConfig,讀取application.yml中的相關(guān)配置,并初始化創(chuàng)建MQTT的連接。示例代碼如下所示:
import javax.annotation.PostConstruct; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; @Configuration public class MqttConsumerConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; @Value("${spring.mqtt.default.topic}") private String defaultTopic; /** * 客戶端對(duì)象 */ private MqttClient client; /** * 在bean初始化后連接到服務(wù)器 */ @PostConstruct public void init(){ connect(); } /** * 客戶端連接服務(wù)端 */ public void connect(){ try { //創(chuàng)建MQTT客戶端對(duì)象 client = new MqttClient(hostUrl,clientId,new MemoryPersistence()); //連接設(shè)置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連接記錄,客戶端重連之后能獲取到服務(wù)器在客戶端斷開連接期間推送的消息 //設(shè)置為true表示每次連接到服務(wù)端都是以新的身份 options.setCleanSession(true); //設(shè)置連接用戶名 options.setUserName(username); //設(shè)置連接密碼 options.setPassword(password.toCharArray()); //設(shè)置超時(shí)時(shí)間,單位為秒 options.setConnectionTimeout(100); //設(shè)置心跳時(shí)間 單位為秒,表示服務(wù)器每隔1.5*20秒的時(shí)間向客戶端發(fā)送心跳判斷客戶端是否在線 options.setKeepAliveInterval(20); //設(shè)置遺囑消息的話題,若客戶端和服務(wù)器之間的連接意外斷開,服務(wù)器將發(fā)布客戶端的遺囑信息 options.setWill("willTopic",(clientId + "與服務(wù)器斷開連接").getBytes(),0,false); //設(shè)置回調(diào) client.setCallback(new MqttConsumerCallBack()); client.connect(options); //訂閱主題 //消息等級(jí),和主題數(shù)組一一對(duì)應(yīng),服務(wù)端將按照指定等級(jí)給訂閱了主題的客戶端推送消息 int[] qos = {1,1}; //主題 String[] topics = {"topic1","topic2"}; //訂閱主題 client.subscribe(topics,qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 斷開連接 */ public void disConnect(){ try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } /** * 訂閱主題 */ public void subscribe(String topic,int qos){ try { client.subscribe(topic,qos); } catch (MqttException e) { e.printStackTrace(); } } }
4.3.4 消費(fèi)者客戶端消息回調(diào)
創(chuàng)建MqttConsumerCallBack類并繼承MqttCallback,實(shí)現(xiàn)相關(guān)消息回調(diào)事件,示例代碼如下圖所示:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class MqttConsumerCallBack implements MqttCallback{ /** * 客戶端斷開連接的回調(diào) */ @Override public void connectionLost(Throwable throwable) { System.out.println("與服務(wù)器斷開連接,可重連"); } /** * 消息到達(dá)的回調(diào) */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(String.format("接收消息主題 : %s",topic)); System.out.println(String.format("接收消息Qos : %d",message.getQos())); System.out.println(String.format("接收消息內(nèi)容 : %s",new String(message.getPayload()))); System.out.println(String.format("接收消息retained : %b",message.isRetained())); } /** * 消息發(fā)布成功的回調(diào) */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println(String.format("接收消息成功")); } }
4.3.5 創(chuàng)建Controller控制器,實(shí)現(xiàn)MQTT連接的建立和斷開
接下來,創(chuàng)建Controller控制器MqttController,并實(shí)現(xiàn)MQTT連接的建立和斷開等方法。示例代碼如下所示:
import com.weiz.mqtt.config.MqttConsumerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; @Controller public class MqttController { @Autowired private MqttConsumerConfig client; @Value("${spring.mqtt.client.id}") private String clientId; @RequestMapping("/connect") @ResponseBody public String connect(){ client.connect(); return clientId + "連接到服務(wù)器"; } @RequestMapping("/disConnect") @ResponseBody public String disConnect(){ client.disConnect(); return clientId + "與服務(wù)器斷開連接"; } }
4.4 測(cè)試驗(yàn)證
首先,分別啟動(dòng)生產(chǎn)者spring-boot-starter-mqtt-provider
和消費(fèi)者spring-boot-starter-mqtt-consume
r兩個(gè)項(xiàng)目,打開瀏覽器,輸入地址http://localhost:18083/,在EMQX管理界面可以看到連接上來的兩個(gè)客戶端。如下圖所示:
接下來,調(diào)用生產(chǎn)者的消息發(fā)布接口驗(yàn)證消息發(fā)布是否成功。使用Pomstman調(diào)用消息發(fā)送接口:http://localhost:8080/sendMessage ,如下圖所示:
通過上圖可以發(fā)現(xiàn),生產(chǎn)者模塊已經(jīng)把消息發(fā)送成功。接下來查看消費(fèi)者模塊,驗(yàn)證消息是否處理成功。如下圖所示:
文章來源:http://www.zghlxwxcb.cn/news/detail-693343.html
通過日志輸出可以發(fā)現(xiàn),消費(fèi)者已經(jīng)成功接收到生產(chǎn)者發(fā)送的消息,說明我們成功實(shí)現(xiàn)在Spring Boot項(xiàng)目中整合MQTT實(shí)現(xiàn)了消息的發(fā)布和訂閱的功能。文章來源地址http://www.zghlxwxcb.cn/news/detail-693343.html
到了這里,關(guān)于MQTT,如何在SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!