一文搞懂MQTT,如何在SpringBoot中使用MQTT實現(xiàn)消息的訂閱和發(fā)布
簡介: 之前介紹了RabbitMQ以及如何在SpringBoot項目中整合使用RabbitMQ,看過的朋友都說寫的比較詳細,希望再總結一下目前比較流行的MQTT。所以接下來,就來介紹什么MQTT?它在IoT中有著怎樣的作用?如何在項目中使用MQTT?
之前介紹了RabbitMQ以及如何在SpringBoot項目中整合使用RabbitMQ,看過的朋友都說寫的比較詳細,希望再總結一下目前比較流行的MQTT。所以接下來,就來介紹什么MQTT?它在IoT中有著怎樣的作用?如何在項目中使用MQTT?
一、MQTT介紹
1.1 什么是MQTT?
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的“輕量級”通訊協(xié)議,該協(xié)議構建于TCP/IP協(xié)議上,由IBM在1999年發(fā)布。
MQTT最大優(yōu)點在于用極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務。
MQTT具有協(xié)議簡潔、輕巧、可擴展性強、低開銷、低帶寬占用等優(yōu)點,已經(jīng)有PHP,JAVA,Python,C,C#,Go等多個語言版本,基本可以使用在任何平臺上。在物聯(lián)網(wǎng)、小型設備、移動應用等方面有較廣泛的應用,特別適合用來當做物聯(lián)網(wǎng)的通信協(xié)議。
1.2 MQTT特點
MQTT是一個基于客戶端-服務器的消息發(fā)布/訂閱傳輸協(xié)議。MQTT協(xié)議是輕量、簡單、開放和易于實現(xiàn)的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環(huán)境中,如:機器與機器(M2M)通信和物聯(lián)網(wǎng)(IoT)。
MQTT協(xié)議是為硬件性能有限,且工作在低帶寬、不可靠的網(wǎng)絡的遠程傳感器和控制設備通訊而設計的協(xié)議,它具有以下主要的幾項特性:
-
1.使用發(fā)布/訂閱消息模式,提供多對多的消息發(fā)布,解除應用程序耦合;
-
2.對負載內容屏蔽的消息傳輸;
-
3.使用TCP/IP 提供網(wǎng)絡連接;
-
4.支持三種消息發(fā)布服務質量(QoS):
-
QoS 0(最多一次):消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡。會發(fā)生消息丟失或重復。這個級別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次數(shù)據(jù)無所謂,因為不久后還會有第二次發(fā)送。
-
QoS 1(至少一次):確保消息到達,但消息重復可能會發(fā)生。
-
QoS 2(只有一次):確保消息到達一次。這個級別可用于如下情況,在計費系統(tǒng)中,消息重復或丟失會導致不正確的結果。
-
5.傳輸數(shù)據(jù)小,開銷很小(固定長度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡流量;(用極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務。)
1.3 MQTT應用場景
MQTT作為一種低開銷、低帶寬占用的即時通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設備、移動應用等方面有著廣泛的應用。MQTT服務只負責消息的接收和傳遞,應用系統(tǒng)連接到MQTT服務器后,可以實現(xiàn)采集數(shù)據(jù)接收、解析、業(yè)務處理、存儲入庫、數(shù)據(jù)展示等功能。常見的應用場景主要有以下幾個方面:
(1)消息推送: 如PC端的推送公告,比如安卓的推送服務,還有一些即時通信軟件如微信、易信等也是采用的推送技術。
(2)智能點餐: 通過MQTT消息隊列產(chǎn)品,消費者可在餐桌上掃碼點餐,并與商家后端系統(tǒng)連接實現(xiàn)自助下單、支付。
(3)信息更新: 實現(xiàn)商場超市等場所的電子標簽、公共場所的多媒體屏幕的顯示更新管理。
(4)掃碼出站: 最常見的停車場掃碼繳費,自動起竿;地鐵閘口掃碼進出站。
二、MQTT的角色組成
2.1 MQTT的客戶端和服務端
2.1.1 服務端(Broker)
EMQX就是一個MQTT的Broker,emqx只是基于erlang語言開發(fā)的軟件而已,其它的MQ還有ActiveMQ、RabbitMQ、HiveMQ等等。
EMQX服務端:https://www.emqx.io/zh/downloads?os=Windows
2.1.2 客戶端(發(fā)布/訂閱)
EMQX客戶端:https://mqttx.app/zh
這個是用來測試驗證的客戶端,實際項目是通過代碼來實現(xiàn)我們消息的生產(chǎn)者和消費者。
2.2 MQTT中的幾個概念
相比RabbitMQ等消息隊列,MQTT要相對簡單一些,只有Broker、Topic、發(fā)布者、訂閱者等幾部分構成。接下來我們先簡單整理下MQTT日常使用中最常見的幾個概念:
- 1.Topic主題:MQTT消息的主要傳播途徑, 我們向主題發(fā)布消息, 訂閱主題, 從主題中讀取消息并進行.業(yè)務邏輯處理, 主題是消息的通道
- 2.生產(chǎn)者:MQTT消息的發(fā)送者, 他們向主題發(fā)送消息
- 3.消費者:MQTT消息的接收者, 他們訂閱自己需要的主題, 并從中獲取消息
- 4.broker服務:消息轉發(fā)器, 消息是通過它來承載的, EMQX就是我們的broker, 在使用中我們不用關心它的具體實現(xiàn)
其實, MQTT的使用流程就是: 生產(chǎn)者給broker的某個topic發(fā)消息->broker通過topic進行消息的傳遞->訂閱該主題的消費者拿到消息并進行相應的業(yè)務邏輯
三、EMQX的安裝和使用
下面以Windows為例,演示W(wǎng)indows下如何安裝和使用EXQX。
step 1:下載EMQ安裝包,配置EMQ環(huán)境
EMQX服務端:https://www.emqx.io/zh/downloads?os=Windows
step 2:下載壓縮包解壓,cmd進入bin文件夾
step 3:啟動EMQX服務
在命令行輸入:emqx start
啟動服務,打卡瀏覽器輸入:http://localhost:18083/ 進入登錄頁面。默認用戶名密碼 admin/public 。登錄成功后,會進入emqx的后臺管理頁面,如下圖所示:
四、使用SpringBoot整合MQTT協(xié)議
前面介紹了MQTT協(xié)議以及如何安裝和啟動MQTT服務。接下來演示如何在SpringBoot項目中整合MQTT實現(xiàn)消息的訂閱和發(fā)布。
4.1 創(chuàng)建工程
首先,創(chuàng)建spring-boot-starter-mqtt
父工程,在父工程下分別創(chuàng)建消息的提供者spring-boot-starter-mqtt-provider
模塊和消息的消費者spring-boot-starter-mqtt-consumer
模塊。
4.2 實現(xiàn)生產(chǎn)者
接下來,修改生產(chǎn)者模塊spring-boot-starter-mqtt-provider
相關的代碼,實現(xiàn)消息發(fā)布的功能模塊。
4.2.1 導入依賴包
修改pom.xml 文件,添加MQTT相關依賴,具體示例代碼如下所示:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
</dependency>
</dependencies>
4.2.2 修改配置文件
修改application.yml配置文件,增加MQTT相關配置。示例代碼如下所示:
spring:
application:
name: provider
#MQTT配置信息
mqtt:
#MQTT服務地址,端口號默認11883,如果有多個,用逗號隔開
url: tcp://127.0.0.1:11883
#用戶名
username: admin
#密碼
password: public
#客戶端id(不能重復)
client:
id: provider-id
#MQTT默認的消息推送主題,實際可在調用接口是指定
default:
topic: topic
server:
port: 8080
4.2.3 消息生產(chǎn)者客戶端配置
創(chuàng)建MqttProviderConfig配置類,讀取application.yml中的相關配置,并初始化創(chuàng)建MQTT的連接。示例代碼如下所示:
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
@Slf4j
public class MqttProviderConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
/**
* 客戶端對象
*/
private MqttClient client;
/**
* 在bean初始化后連接到服務器
*/
@PostConstruct
public void init(){
connect();
}
/**
* 客戶端連接服務端
*/
public void connect(){
try{
//創(chuàng)建MQTT客戶端對象
client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
//連接設置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,設置false表示服務器會保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之后能獲取到服務器在客戶端斷開連接期間推送的消息
//設置為true表示每次連接服務器都是以新的身份
options.setCleanSession(true);
//設置連接用戶名
options.setUserName(username);
//設置連接密碼
options.setPassword(password.toCharArray());
//設置超時時間,單位為秒
options.setConnectionTimeout(100);
//設置心跳時間 單位為秒,表示服務器每隔 1.5*20秒的時間向客戶端發(fā)送心跳判斷客戶端是否在線
options.setKeepAliveInterval(20);
//設置遺囑消息的話題,若客戶端和服務器之間的連接意外斷開,服務器將發(fā)布客戶端的遺囑信息
options.setWill("willTopic",(clientId + "與服務器斷開連接").getBytes(),0,false);
//設置回調
client.setCallback(new MqttProviderCallBack());
client.connect(options);
} catch(MqttException e){
e.printStackTrace();
}
}
public void publish(int qos,boolean retained,String topic,String message){
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(message.getBytes());
//主題的目的地,用于發(fā)布/訂閱信息
MqttTopic mqttTopic = client.getTopic(topic);
//提供一種機制來跟蹤消息的傳遞進度
//用于在以非阻塞方式(在后臺運行)執(zhí)行發(fā)布是跟蹤消息的傳遞進度
MqttDeliveryToken token;
try {
//將指定消息發(fā)布到主題,但不等待消息傳遞完成,返回的token可用于跟蹤消息的傳遞狀態(tài)
//一旦此方法干凈地返回,消息就已被客戶端接受發(fā)布,當連接可用,將在后臺完成消息傳遞。
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
4.2.4 生產(chǎn)者客戶端消息回調
創(chuàng)建MqttProviderCallBack類并繼承MqttCallback,實現(xiàn)相關消息回調事件,示例代碼如下圖所示:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttConsumerCallBack implements MqttCallback{
/**
* 客戶端斷開連接的回調
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println("與服務器斷開連接,可重連");
}
/**
* 消息到達的回調
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println(String.format("接收消息主題 : %s",topic));
System.out.println(String.format("接收消息Qos : %d",message.getQos()));
System.out.println(String.format("接收消息內容 : %s",new String(message.getPayload())));
System.out.println(String.format("接收消息retained : %b",message.isRetained()));
}
/**
* 消息發(fā)布成功的回調
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println(String.format("接收消息成功"));
}
}
4.2.5 創(chuàng)建Controller控制器實現(xiàn)消息發(fā)布功能
創(chuàng)建SendController控制器類,實現(xiàn)消息的發(fā)送功能,示例代碼如下所示:
@Controller
public class SendController {
@Autowired
private MqttProviderConfig providerClient;
@RequestMapping("/sendMessage")
@ResponseBody
public String sendMessage(int qos,boolean retained,String topic,String message){
try {
providerClient.publish(qos, retained, topic, message);
return "發(fā)送成功";
} catch (Exception e) {
e.printStackTrace();
return "發(fā)送失敗";
}
}
}
4.3 實現(xiàn)消費者
前面完成了生成者消息發(fā)布的模塊,接下來修改消費者模塊spring-boot-starter-mqtt-consumer
實現(xiàn)消息訂閱、處理的功能。
4.3.1 導入依賴包
修改pom.xml 文件,添加MQTT相關依賴,具體示例代碼如下所示:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
</dependency>
</dependencies>
4.3.2 修改配置文件
修改application.yml配置文件,增加MQTT相關配置。示例代碼如下所示:
spring:
application:
name: consumer
#MQTT配置信息
mqtt:
#MQTT服務端地址,端口默認為11883,如果有多個,用逗號隔開
url: tcp://127.0.0.1:11883
#用戶名
username: admin
#密碼
password: public
#客戶端id(不能重復)
client:
id: consumer-id
#MQTT默認的消息推送主題,實際可在調用接口時指定
default:
topic: topic
server:
port: 8085
4.3.3 消費者客戶端配置
創(chuàng)建消費者客戶端配置類MqttConsumerConfig,讀取application.yml中的相關配置,并初始化創(chuàng)建MQTT的連接。示例代碼如下所示:
import javax.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqttConsumerConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
/**
* 客戶端對象
*/
private MqttClient client;
/**
* 在bean初始化后連接到服務器
*/
@PostConstruct
public void init(){
connect();
}
/**
* 客戶端連接服務端
*/
public void connect(){
try {
//創(chuàng)建MQTT客戶端對象
client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
//連接設置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,設置為false表示服務器會保留客戶端的連接記錄,客戶端重連之后能獲取到服務器在客戶端斷開連接期間推送的消息
//設置為true表示每次連接到服務端都是以新的身份
options.setCleanSession(true);
//設置連接用戶名
options.setUserName(username);
//設置連接密碼
options.setPassword(password.toCharArray());
//設置超時時間,單位為秒
options.setConnectionTimeout(100);
//設置心跳時間 單位為秒,表示服務器每隔1.5*20秒的時間向客戶端發(fā)送心跳判斷客戶端是否在線
options.setKeepAliveInterval(20);
//設置遺囑消息的話題,若客戶端和服務器之間的連接意外斷開,服務器將發(fā)布客戶端的遺囑信息
options.setWill("willTopic",(clientId + "與服務器斷開連接").getBytes(),0,false);
//設置回調
client.setCallback(new MqttConsumerCallBack());
client.connect(options);
//訂閱主題
//消息等級,和主題數(shù)組一一對應,服務端將按照指定等級給訂閱了主題的客戶端推送消息
int[] qos = {1,1};
//主題
String[] topics = {"topic1","topic2"};
//訂閱主題
client.subscribe(topics,qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 斷開連接
*/
public void disConnect(){
try {
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 訂閱主題
*/
public void subscribe(String topic,int qos){
try {
client.subscribe(topic,qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
4.3.4 消費者客戶端消息回調
創(chuàng)建MqttConsumerCallBack類并繼承MqttCallback,實現(xiàn)相關消息回調事件,示例代碼如下圖所示:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttConsumerCallBack implements MqttCallback{
/**
* 客戶端斷開連接的回調
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println("與服務器斷開連接,可重連");
}
/**
* 消息到達的回調
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println(String.format("接收消息主題 : %s",topic));
System.out.println(String.format("接收消息Qos : %d",message.getQos()));
System.out.println(String.format("接收消息內容 : %s",new String(message.getPayload())));
System.out.println(String.format("接收消息retained : %b",message.isRetained()));
}
/**
* 消息發(fā)布成功的回調
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println(String.format("接收消息成功"));
}
}
4.3.5 創(chuàng)建Controller控制器,實現(xiàn)MQTT連接的建立和斷開
接下來,創(chuàng)建Controller控制器MqttController,并實現(xiàn)MQTT連接的建立和斷開等方法。示例代碼如下所示:
import com.weiz.mqtt.config.MqttConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class MqttController {
@Autowired
private MqttConsumerConfig client;
@Value("${spring.mqtt.client.id}")
private String clientId;
@RequestMapping("/connect")
@ResponseBody
public String connect(){
client.connect();
return clientId + "連接到服務器";
}
@RequestMapping("/disConnect")
@ResponseBody
public String disConnect(){
client.disConnect();
return clientId + "與服務器斷開連接";
}
}
4.4 測試驗證
首先,分別啟動生產(chǎn)者spring-boot-starter-mqtt-provider
和消費者spring-boot-starter-mqtt-consume
r兩個項目,打開瀏覽器,輸入地址http://localhost:18083/,在EMQX管理界面可以看到連接上來的兩個客戶端。如下圖所示:
接下來,調用生產(chǎn)者的消息發(fā)布接口驗證消息發(fā)布是否成功。使用Pomstman調用消息發(fā)送接口:http://localhost:8080/sendMessage ,如下圖所示:
通過上圖可以發(fā)現(xiàn),生產(chǎn)者模塊已經(jīng)把消息發(fā)送成功。接下來查看消費者模塊,驗證消息是否處理成功。如下圖所示:
通過日志輸出可以發(fā)現(xiàn),消費者已經(jīng)成功接收到生產(chǎn)者發(fā)送的消息,說明我們成功實現(xiàn)在Spring Boot項目中整合MQTT實現(xiàn)了消息的發(fā)布和訂閱的功能。
最后
以上就是如何在Spring Boot中使用MQTT的詳細內容,更多關于在Spring Boot中MQTT的使用大家可以去自己研究學習。比如:如何利用qos機制保證數(shù)據(jù)不會丟失?消息的隊列和排序?集群模式下的應用?等等。
MQTT介紹與使用
正文
物聯(lián)網(wǎng)是新一代信息技術的重要組成部分,也是“信息化”時代的重要發(fā)展階段。其英文名稱是:“Internet of things(IoT)”。顧名思義,物聯(lián)網(wǎng)就是物物相連的互聯(lián)網(wǎng)。這有兩層意思:其一,物聯(lián)網(wǎng)的核心和基礎仍然是互聯(lián)網(wǎng),是在互聯(lián)網(wǎng)基礎上的延伸和擴展的網(wǎng)絡;其二,其用戶端延伸和擴展到了任何物品與物品之間,進行信息交換和通信,也就是物物相息。物聯(lián)網(wǎng)通過智能感知、識別技術與普適計算等通信感知技術,廣泛應用于網(wǎng)絡的融合中,也因此被稱為繼計算機、互聯(lián)網(wǎng)之后世界信息產(chǎn)業(yè)發(fā)展的第三次浪潮。
而在物聯(lián)網(wǎng)的應用上,對于信息傳輸,MQTT是一種再合適不過的協(xié)議工具了。
一、MQTT簡介
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的輕量級協(xié)議,該協(xié)議構建于TCP/IP協(xié)議之上,MQTT最大優(yōu)點在于,可以以極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務。作為一種低開銷、低帶寬占用的即時通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設備、移動應用等方面有較廣泛的應用。
MQTT是一個基于客戶端-服務器的消息發(fā)布/訂閱傳輸協(xié)議。MQTT協(xié)議是輕量、簡單、開放和易于實現(xiàn)的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環(huán)境中,如:機器與機器(M2M)通信和物聯(lián)網(wǎng)(IoT)。其在,通過衛(wèi)星鏈路通信傳感器、偶爾撥號的醫(yī)療設備、智能家居、及一些小型化設備中已廣泛使用。
二、特性
MQTT協(xié)議工作在低帶寬、不可靠的網(wǎng)絡的遠程傳感器和控制設備通訊而設計的協(xié)議,它具有以下主要的幾項特性:
(1)使用發(fā)布/訂閱消息模式,提供一對多的消息發(fā)布,解除應用程序耦合。
(2)對負載內容屏蔽的消息傳輸。
(3)使用TCP/IP提供網(wǎng)絡連接。
主流的MQTT是基于TCP連接進行數(shù)據(jù)推送的,但是同樣有基于UDP的版本,叫做MQTT-SN。這兩種版本由于基于不同的連接方式,優(yōu)缺點自然也就各有不同了。
(4)有三種消息發(fā)布服務質量:
“至多一次”,消息發(fā)布完全依賴底層TCP/IP網(wǎng)絡。會發(fā)生消息丟失或重復。這一級別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無所謂,因為不久后還會有第二次發(fā)送。這一種方式主要普通APP的推送,倘若你的智能設備在消息推送時未聯(lián)網(wǎng),推送過去沒收到,再次聯(lián)網(wǎng)也就收不到了。
“至少一次”,確保消息到達,但消息重復可能會發(fā)生。
“只有一次”,確保消息到達一次。在一些要求比較嚴格的計費系統(tǒng)中,可以使用此級別。在計費系統(tǒng)中,消息重復或丟失會導致不正確的結果。這種最高質量的消息發(fā)布服務還可以用于即時通訊類的APP的推送,確保用戶收到且只會收到一次。
(5)小型傳輸,開銷很?。ü潭ㄩL度的頭部是2字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡流量。
這就是為什么在介紹里說它非常適合“在物聯(lián)網(wǎng)領域,傳感器與服務器的通信,信息的收集”,要知道嵌入式設備的運算能力和帶寬都相對薄弱,使用這種協(xié)議來傳遞消息再適合不過了。
三、實現(xiàn)方式
實現(xiàn)MQTT協(xié)議需要客戶端和服務器端通訊完成,在通訊過程中,MQTT協(xié)議中有三種身份:發(fā)布者(Publish)、代理(Broker)(服務器)、訂閱者(Subscribe)。其中,消息的發(fā)布者和訂閱者都是客戶端,消息代理是服務器,消息發(fā)布者可以同時是訂閱者。
MQTT傳輸?shù)南⒎譃椋褐黝}(Topic)和負載(payload)兩部分:
(1)Topic,可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內容(payload);
(2)payload,可以理解為消息的內容,是指訂閱者具體要使用的內容。
四、MQTT的搭建(ubuntu)
1、apt-get安裝mqtt相關包
2、測試mosquitto是否正確運行
3、本機終端測試mqtt
打開一個終端,訂閱主題
mosquitto_sub -h 192.168.1.102 -t "mqtt" -v
【-h】指定要連接的MQTT服務器
【-t】訂閱主題,此處為mqtt
【-v】打印更多的調試信息
再打開一個終端,發(fā)布主題
mosquitto_pub -h 192.168.1.102 -t "mqtt" -m "Hello Stonegeek"
【-h】指定要連接的MQTT服務器
【-t】向指定主題推送消息
【-m】指定消息內容
結果展示
五、MQTT權限配置
前面我們基于Mosquitto服務器已經(jīng)搭建成功了,但是默認是允許匿名用戶登錄,對于正式上線的項目則是需要進行用戶認證(當然,用戶一般都會與數(shù)據(jù)庫映射,不過在這里我們就會直接將用戶寫入配置文件中)
1、Mosquitto服務器的配置文件為/etc/mosquitto/mosquitto.conf,關于用戶認證的方式和讀取的配置都在這個文件中進行
配置文件參數(shù)說明:
ID | allow_anonymous | password_file | acl_file | result |
---|---|---|---|---|
1 | True(默認) | 允許匿名方式登錄 | ||
2 | False | password_file | 開啟用戶驗證機制 | |
3 | False | password_file | acl_file | 開啟用戶驗證機制,但訪問控制不起作用 |
4 | True | password_file | acl_file | 用戶名及密碼不為空,將自動進行用戶驗證且受到訪問控制的限制;用戶名及密碼為空,將不進行用戶驗證且受到訪問控制的限制 |
5 | False | 無法啟動服務 |
allow_anonymous允許匿名
password-file密碼文件
acl_file訪問控制列表
2、修改配置文件
命令:sudo vi /etc/mosquitto/mosquitto.conf
3、添加用戶信息
命令解釋: -c 創(chuàng)建一個用戶、/etc/mosquitto/pwfile.example 是將用戶創(chuàng)建到 pwfile.example 文件中、admin 是用戶名。
同樣連續(xù)會提示連續(xù)輸入兩次密碼。注意第二次創(chuàng)建用戶時不用加 -c 如果加 -c 會把第一次創(chuàng)建的用戶覆蓋。
至此兩個用戶創(chuàng)建成功,此時如果查看 pwfile.example 文件會發(fā)現(xiàn)其中多了兩個用戶。
4、添加Topic和用戶的關系
5、用戶認證測試
(1)重啟Mosquitto步驟
查看mosquitto的進程
命令:ps -aux|grep mosquitto
(2)殺死進程
命令:sudo kill -9 pid
(3)啟動
命令:mosquitto -c /etc/mosquitto/mosquitto.conf
(4)訂閱端啟動(不加用戶)
訂閱端啟動(加用戶)
(5)發(fā)布端啟動
六、MQTT實現(xiàn)(Java語言)
注意:由于我們在上面配置了MQTT的用戶權限控制,所以下面的用戶只能使用stonegeek登錄,否則項目會運行報錯,而且我們在上面設置的訪問控制列表中只有mtopic主題,所以我們必須使用此主題,否則,訂閱者會收不到已發(fā)布的主題內容(已經(jīng)測試過了)
下面是我們Java語言實現(xiàn)的MQTT服務的發(fā)布/訂閱
1、添加Maven依賴
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.1</version>
</dependency>
2、ServerMQTT.class
package com.stonegeek;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class ServerMQTT {
//tcp://MQTT安裝的服務器地址:MQTT定義的端口號
public static final String HOST = "tcp://192.168.1.102:1883";
//定義一個主題
public static final String TOPIC = "mtopic";
//定義MQTT的ID,可以在MQTT服務配置中指定
private static final String clientid = "server11";
private MqttClient client;
private MqttTopic topic11;
private String userName = "stonegeek";
private String passWord = "123456";
private MqttMessage message;
/**
* 構造函數(shù)
* @throws MqttException
*/
public ServerMQTT() throws MqttException {
// MemoryPersistence設置clientid的保存形式,默認為以內存保存
client = new MqttClient(HOST, clientid, new MemoryPersistence());
connect();
}
/**
* 用來連接服務器
*/
private void connect() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
// 設置超時時間
options.setConnectionTimeout(10);
// 設置會話心跳時間
options.setKeepAliveInterval(20);
try {
client.setCallback(new PushCallback());
client.connect(options);
topic11 = client.getTopic(TOPIC);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
*
* @param topic
* @param message
* @throws MqttPersistenceException
* @throws MqttException
*/
public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
MqttException {
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
System.out.println("message is published completely! "
+ token.isComplete());
}
/**
* 啟動入口
* @param args
* @throws MqttException
*/
public static void main(String[] args) throws MqttException {
ServerMQTT server = new ServerMQTT();
server.message = new MqttMessage();
server.message.setQos(1);
server.message.setRetained(true);
server.message.setPayload("hello,topic11".getBytes());
server.publish(server.topic11 , server.message);
System.out.println(server.message.isRetained() + "------ratained狀態(tài)");
}
}
3、ClientMQTT.class
package com.stonegeek;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class ClientMQTT {
public static final String HOST = "tcp://192.168.1.102:1883";
public static final String TOPIC = "mtopic";
private static final String clientid = "client11";
private MqttClient client;
private MqttConnectOptions options;
private String userName = "stonegeek";
private String passWord = "123456";
private ScheduledExecutorService scheduler;
private void start() {
try {
// host為主機名,clientid即連接MQTT的客戶端ID,一般以唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存
client = new MqttClient(HOST, clientid, new MemoryPersistence());
// MQTT的連接設置
options = new MqttConnectOptions();
// 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接
options.setCleanSession(true);
// 設置連接的用戶名
options.setUserName(userName);
// 設置連接的密碼
options.setPassword(passWord.toCharArray());
// 設置超時時間 單位為秒
options.setConnectionTimeout(10);
// 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發(fā)送個消息判斷客戶端是否在線,但這個方法并沒有重連的機制
options.setKeepAliveInterval(20);
// 設置回調
client.setCallback(new PushCallback());
MqttTopic topic = client.getTopic(TOPIC);
//setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息
options.setWill(topic, "close".getBytes(), 2, true);
client.connect(options);
//訂閱消息
int[] Qos = {1};
String[] topic1 = {TOPIC};
client.subscribe(topic1, Qos);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws MqttException {
ClientMQTT client = new ClientMQTT();
client.start();
}
}
4、PushCallback.class
package com.stonegeek;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* Created by StoneGeek on 2018/6/5.
* 博客地址:http://www.cnblogs.com/sxkgeek
* 發(fā)布消息的回調類
*
* 必須實現(xiàn)MqttCallback的接口并實現(xiàn)對應的相關接口方法CallBack 類將實現(xiàn) MqttCallBack。
* 每個客戶機標識都需要一個回調實例。在此示例中,構造函數(shù)傳遞客戶機標識以另存為實例數(shù)據(jù)。
* 在回調中,將它用來標識已經(jīng)啟動了該回調的哪個實例。
* 必須在回調類中實現(xiàn)三個方法:
*
* public void messageArrived(MqttTopic topic, MqttMessage message)接收已經(jīng)預訂的發(fā)布。
*
* public void connectionLost(Throwable cause)在斷開連接時調用。
*
* public void deliveryComplete(MqttDeliveryToken token))
* 接收到已經(jīng)發(fā)布的 QoS 1 或 QoS 2 消息的傳遞令牌時調用。
* 由 MqttClient.connect 激活此回調。
*/
public class PushCallback implements MqttCallback{
public void connectionLost(Throwable cause) {
// 連接丟失后,一般在這里面進行重連
System.out.println("連接斷開,可以做重連");
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息會執(zhí)行到這里面
System.out.println("接收消息主題 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息內容 : " + new String(message.getPayload()));
}
}
5、結果展示
MQTT 客戶端重連
MQTT客戶端重連主要有兩種方法
第一種:自動重連
設置org.eclipse.paho.client.mqttv3.MqttConnectOptions#setAutomaticReconnect
為true
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
然后callback實現(xiàn)org.eclipse.paho.client.mqttv3.MqttCallbackExtended
,這個接口提供了一個連接完成的回調方法,連接完成后做一些操作,如訂閱主題。
public MyMqttCallback implements MqttCallbackExtended {
/**
* 連接成功會進入到這里
* @param reconnect
* @param serverURI
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
// 可以做訂閱主題
}
}
這樣當斷開連接后,MqttClient內部會自動進行重連,每次連接成功后都會回調connectComplete()方法。
第二種:自定義重連
如果不指定setAutomaticReconnect為true文章來源:http://www.zghlxwxcb.cn/news/detail-753974.html
可以在public void connectionLost(Throwable throwable)
中處理重連文章來源地址http://www.zghlxwxcb.cn/news/detail-753974.html
@Override
public void connectionLost(Throwable throwable) {
System.out.println("失去連接:" + throwable.getMessage());
int times = 1;
while (!client.isConnected()) {
try {
System.out.println("重新連接, 第" + (times++) + "次");
client.reconnect();
System.out.println("重連成功");
break;
} catch (MqttException e) {
e.printStackTrace();
System.out.println("重連失敗, msg:" + e.getMessage());
}
// 每隔10秒重試一次
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 執(zhí)行到這里,說明連接成功,重新訂閱主題
xxx
}
到了這里,關于一文搞懂MQTT,如何在SpringBoot中使用MQTT實現(xiàn)消息的訂閱和發(fā)布&MQTT 客戶端重連的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!