前言
EMQX 實現(xiàn)物聯(lián)網(wǎng) MQTT 通信。物聯(lián)網(wǎng)的 MQ 消息通信方式。
一、介紹
1、MQTT
MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的"輕量級"通訊協(xié)議,該協(xié)議構建于TCP/IP協(xié)議上,由IBM在1999年發(fā)布。MQTT最大優(yōu)點在于,可以以極少的代碼和有限的帶寬,為遠程連接設備提過實時可靠的消息服務,作為一種低開銷、低帶寬占用的即時通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設備、移動應用等方面有較廣泛的應用。
MQTT是一個基于客戶端-服務器的消息發(fā)布/訂閱傳輸協(xié)議。MQTT協(xié)議是輕量、簡單、開放和易于實現(xiàn)的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環(huán)境中,如:機器與機器(M2M)通信和物聯(lián)網(wǎng)(loT)。其在,通過衛(wèi)星鏈路通信傳感器、偶爾撥號的醫(yī)療設備、智能家居、及一些小型化設備中已廣泛使用。
特點:
使用發(fā)布/訂閱消息模式,提供一對多的消息發(fā)布,解除應用程序耦合;
對負載內(nèi)容屏蔽的消息傳輸;
使用 TCP/IP 提供網(wǎng)絡連接;
有三種消息發(fā)布服務質(zhì)量:
小型傳輸,開銷很小(固定長度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡流量;
使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制。
2、EMQX
EMQX 是一個「無限連接,任意集成,隨處運行」大規(guī)模分布式物聯(lián)網(wǎng)接入平臺。
EMQX 企業(yè)版提供一體化的分布式 MQTT 消息服務和強大的 IoT 規(guī)則引擎,為高可靠、高性能的物聯(lián)網(wǎng)實時數(shù)據(jù)移動、處理和集成提供動力,助力企業(yè)快速構建關鍵業(yè)務的 IoT 平臺與應用。附下載地址: https://www.emqx.com/zh/try?product=enterprise 可以自行下載對應版本運行
優(yōu)勢:
海量連接:單節(jié)點支持 500 萬 MQTT 設備連接,集群可水平擴展至支持 1 億并發(fā)的 MQTT 連接。
高可靠:彈性伸縮,無單點故障。內(nèi)置 RocksDB 可靠地持久化 MQTT 消息,確保無數(shù)據(jù)損失。
數(shù)據(jù)安全:端到端數(shù)據(jù)加密(支持國密),細粒度訪問控制,保障數(shù)據(jù)安全,滿足企業(yè)合規(guī)需求。
多協(xié)議:支持 MQTT、HTTP、QUIC、WebSocket、LwM2M/CoAP 或?qū)S袇f(xié)議連接任何設備。
高性能:單節(jié)點支持每秒實時接收、處理與分發(fā)數(shù)百萬條的 MQTT 消息。毫秒級消息交付時延。
易運維:圖形化配置、操作與管理,實時監(jiān)測運行狀態(tài)。支持 MQTT 跟蹤進行端到端問題分析。
3、Mria 集群架構?
支持全新的 Mria 集群架構,在此架構下 EMQX 水平擴展性得到指數(shù)級提升,單個集群可以輕松支持 1 億 MQTT 連接,這使得 EMQX 5.0 成為目前全球最具擴展性的 MQTT Broker。
在構建滿足用戶業(yè)務需求的更大規(guī)模集群的同時,Mria 架構還能夠降低大規(guī)模部署下的腦裂風險以及腦裂后的影響,以提供更加穩(wěn)定可靠的物聯(lián)網(wǎng)數(shù)據(jù)接入服務。
具體可以查看官方文檔: https://docs.emqx.com/zh/enterprise/v5.1/deploy/cluster/create-cluster.html
4、MQTTX
MQTTX 是由 EMQ 開發(fā)的一款開源跨平臺 MQTT 5.0 桌面客戶端,它兼容 macOS,Linux 以及 Windows 系統(tǒng)。MQTTX 的用戶界面 UI 采用聊天式設計,使得操作邏輯更加簡明直觀。它支持用戶快速創(chuàng)建和保存多個 MQTT 連接,便于測試 MQTT/MQTTS 連接,以及 MQTT 消息的訂閱和發(fā)布。
主要功能
采用聊天界面設計,使得操作更加簡單明了
跨平臺兼容,支持在 Windows,macOS,Linux 系統(tǒng)上運行
100% 兼容 MQTT v5.0,v3.1.1 和 v3.1 協(xié)議
訂閱的 MQTT 主題支持自定義顏色標簽
支持單向和雙向 SSL 認證,同時支持 CA 和自簽名證書
支持通過 WebSocket 連接 MQTT 服務器
支持 Hex, Base64, JSON, Plaintext 等 Payload 格式轉換
自定義腳本支持模擬 MQTT 發(fā)布/訂閱測試
提供完整的日志記錄功能
多語言支持:簡體中文、英語、日語、土耳其語及匈牙利語 ??? ??? ??? ??? ???
自由切換 Light、Dark、Night 三種主題模式
二、SpringBoot 集成 EMQX
1、yaml 配置
# EMQX配置
emqx:
# EMQX服務地址,端口號默認18083
url: http://127.0.0.1:18083
# 認證用戶名
username: admin
# 密碼
password: admin123456
2、Properties 配置類
/**
* EMQX配置
*/
@Data
@Configuration
@ConfigurationProperties(prefix = "emqx")
public class EmqxConfig {
private String url;
private String username;
private String password;
}
3、客戶端連接實體 model
客戶端連接實現(xiàn)模型類。
/**
* EMQX客戶端連接model
*/
@Data
public class EmqxClientModel {
Integer awaiting_rel_cnt;
Integer awaiting_rel_max;
Boolean clean_start;
/**
* 客戶端id
*/
String clientid;
/**
* 連接狀態(tài)
*/
Boolean connected;
Long connected_at;
Long created_at;
Long disconnected_at;
Integer expiry_interval;
Integer heap_size;
Integer inflight_cnt;
Integer inflight_max;
/**
* ip地址
*/
String ip_address;
Boolean is_bridge;
/**
* 心跳檢測時間s
*/
Integer keepalive;
Integer mailbox_len;
Integer mqueue_dropped;
Integer mqueue_len;
/**
* 消息隊列最大長度
*/
Integer mqueue_max;
String node;
Integer port;
String proto_name;
Integer proto_ver;
Integer recv_cnt;
Integer recv_msg;
}
4、token 服務類
提供獲取 token 的方法。
@Component
@RequiredArgsConstructor
public class EmqxTokenService {
private final EmqxConfig emqxConfig;
public String getToken(){
String authentication = emqxConfig.getUsername() + ":" + emqxConfig.getPassword();
return "Basic " + Base64.getEncoder().encodeToString(authentication.getBytes());
}
}
5、客戶端 api
提供對外調(diào)用的 api 服務。
- 查詢客戶端連接狀態(tài)。
- 查詢客戶端連接信息。
- 刪除客戶端連接。
@Slf4j
@Component
@RequiredArgsConstructor
public class EmqxClientsApi {
private final EmqxConfig emqxConfig;
private final EmqxTokenService emqxTokenService;
/**
* 根據(jù)客戶端id查詢連接狀態(tài)
*
* @param clientId 客戶端id
* @return 連接狀態(tài)
*/
public boolean getConnectedStatus(String clientId) {
EmqxClientModel client = getByClientId(clientId);
if (client == null) {
return false;
}
return client.getConnected();
}
/**
* 根據(jù)客戶端id查詢客戶端信息
*
* @param clientId 客戶端id
* @return 客戶端信息
*/
public EmqxClientModel getByClientId(String clientId) {
String url = String.format(emqxConfig.getUrl() + "/clients/%s", clientId);
HttpResponse httpResponse;
try {
httpResponse = HttpRequest.get(url)
.header("Authorization", emqxTokenService.getToken())
.execute();
} catch (Exception e) {
log.info("未查到emqx客戶端:clientId=" + clientId + "[msg]:" + e.getMessage());
return null;
}
if (httpResponse != null && httpResponse.getStatus() == 200) {
return JSON.parseObject(httpResponse.body(), EmqxClientModel.class);
}
return null;
}
/**
* 根據(jù)客戶端id刪除客戶端
*
* @param clientId 客戶端id
* @return 客戶端信息
*/
public void delete(String clientId) {
String url = String.format(emqxConfig.getUrl() + "/clients/%s", clientId);
HttpResponse httpResponse;
try {
httpResponse = HttpRequest.delete(url)
.header("Authorization", emqxTokenService.getToken())
.execute();
} catch (IORuntimeException e) {
throw new ServiceException("刪除emqx客戶端請求超時");
} catch (Exception e) {
throw new ServiceException("刪除emqx客戶端請求異常:clientId=" + clientId + "[msg]:" + e.getMessage());
}
if (httpResponse == null || httpResponse.getStatus() != 204) {
throw new ServiceException("刪除emqx客戶端失?。篶lientId=" + clientId);
}
}
}
三、SpringBoot 集成 MQTT
1、pom 依賴
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
2、yaml 配置
spring:
# MQTT配置
mqtt:
# MQTT服務地址,端口號默認1883,如果有多個,用逗號隔開
host-url: tcp://127.0.0.1:1883
# 用戶名
username: admin
# 密碼
password: admin123456
# 客戶端id(不能重復)
client-id: real-mqtt-client
# MQTT默認的消息推送主題,實際可在調(diào)用接口時指定
default-topic: topic
3、Properties 配置類
@Configuration
@ConfigurationProperties(prefix = "spring.mqtt")
@Data
public class MqttConfig {
private String username;
private String password;
private String hostUrl;
private String clientId;
private String defaultTopic;
}
4、連接工廠類
執(zhí)行 mq 初始化配置、連接、消息主題訂閱。
@Slf4j
@Component
public class MqttFactory {
public static ConcurrentHashMap<String, MqttClient> clientMap = new ConcurrentHashMap<>();
@Autowired
private MqttConfig mqttConfig;
@Autowired
private RealPersonAccessDeviceMapper realPersonAccessDeviceMapper;
/**
* 在bean初始化后連接到服務器
*/
@PostConstruct
public void init() {
String mqttStartFlag = ParamResolver.getStr(RealCommonConstants.MQTT_START_FLAG);
if (StrUtil.equals(mqttStartFlag, CommonConstants.SYS_YES_NO_Y)) {
// 初始化訂閱主題
initSubscribeTopic(getInstance());
}
}
/**
* 初始化客戶端
*/
public MqttClient getInstance() {
MqttClient client = null;
if (clientMap.get(mqttConfig.getClientId()) == null) {
try {
client = new MqttClient(mqttConfig.getHostUrl(), mqttConfig.getClientId());
// MQTT配置對象
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
// 設置自動重連, 其它具體參數(shù)可以查看MqttConnectOptions
mqttConnectOptions.setAutomaticReconnect(true);
// 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接
// mqttConnectOptions.setCleanSession(true);
// 設置超時時間 單位為秒
mqttConnectOptions.setConnectionTimeout(10);
mqttConnectOptions.setUserName(mqttConfig.getUsername());
mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());
// mqttConnectOptions.setServerURIs(new String[]{url});
// 設置會話心跳時間 單位為秒
mqttConnectOptions.setKeepAliveInterval(10);
// 設置“遺囑”消息的話題,若客戶端與服務器之間的連接意外中斷,服務器將發(fā)布客戶端的“遺囑”消息。
// mqttConnectOptions.setWill("willTopic", "offline".getBytes(), 2, false);
if (!client.isConnected()) {
client.connect(mqttConnectOptions);
}
client.setCallback(new MqttCallBack());
log.info("MQTT創(chuàng)建client成功={}", JSONObject.toJSONString(client));
clientMap.put(mqttConfig.getClientId(), client);
} catch (MqttException e) {
log.error("MQTT連接消息服務器[{}]失敗", mqttConfig.getClientId() + "-" + mqttConfig.getHostUrl());
}
} else {
client = clientMap.get(mqttConfig.getClientId());
log.info("MQTT從map里獲取到client,clientId=" + mqttConfig.getClientId());
// TODO 已采用自動重連策略
// log.info("MQTT從map里獲取到client={}", JSONObject.toJSONString(client));
// if (!client.isConnected()) {
// initSubscribeTopic(client);
// 如果緩存里的client已經(jīng)斷開,則清除該緩存,再重新創(chuàng)建客戶端連接
// clientMap.remove(mqttConfig.getClientId());
// this.getInstance();
// }
}
return client;
}
/**
* 初始化訂閱主題
* <p>
* 消息等級,和主題數(shù)組一一對應,服務端將按照指定等級給訂閱了主題的客戶端推送消息
*/
public void initSubscribeTopic(MqttClient client) {
// 訂閱設備發(fā)布消息主題
List<String> upstreamTopics = new ArrayList<>();
List<Integer> upstreamQos = new ArrayList<>();
upstreamTopics.add("topic_1");
upstreamQos.add(1);
upstreamTopics.add("topic_2");
upstreamQos.add(0);
upstreamTopics.add("topic_3");
upstreamQos.add(1);
try {
client.subscribe(upstreamTopics.toArray(new String[upstreamTopics.size()]), upstreamQos.stream().mapToInt(Integer::intValue).toArray());
} catch (MqttException e) {
e.printStackTrace();
}
}
}
}
5、MQTT 回調(diào)類
連接斷開回調(diào),以及消息到達回調(diào)。根據(jù)消息主題進行消息分發(fā)。
@Slf4j
@Component
public class MqttCallBack implements MqttCallback, MqttCallbackExtended {
/**
* 客戶端斷開連接的回調(diào)
*/
@Override
public void connectionLost(Throwable throwable) {
log.info("客戶端斷開連接回調(diào)");
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("客戶端斷開連接重連");
// 重新訂閱
MqttFactory client = SpringContextHolder.getBeanFactory().getBean(MqttFactory.class);
client.initSubscribeTopic(client.getInstance());
log.info("重連成功");
}
/**
* 消息到達的回調(diào)
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
// 日志輸出
MqttFactory client = SpringContextHolder.getBeanFactory().getBean(MqttFactory.class);
log.info("mqtt客戶端ID : {}", client.getInstance().getClientId());
log.info("mqtt接收消息主題 : {}", topic);
log.info("mqtt接收消息Qos : {}", mqttMessage.getQos());
log.info("mqtt接收消息retained : {}", mqttMessage.isRetained());
log.info("mqtt接收消息內(nèi)容 : {}", new String(mqttMessage.getPayload()));
String message = new String(mqttMessage.getPayload()); // 消息內(nèi)容
// TODO 根據(jù)消息主題進行消息分發(fā)
}
/**
* 消息發(fā)布成功的回調(diào)
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
IMqttAsyncClient client = token.getClient();
log.info(client.getClientId() + " 發(fā)布消息成功!");
}
}
6、MQ 服務類
提供消息發(fā)布等方法。
這里對發(fā)送消息方法進行了自定義封裝,增加了 redis 對發(fā)送的消息進行存儲,在異步的響應消息返回時,利用 redis 中發(fā)送消息的消息id實現(xiàn)響應數(shù)據(jù)的異步綁定。響應消息存在丟失的情況。需要合理設置發(fā)送消息的過期時間,防止時間過短導致返回的響應丟失,防止時間過長占用 redis 資源。
@Slf4j
@Data
@Configuration
public class UMqttClientService {
private final MqttFactory mqttFactory;
private final StringRedisTemplate redisTemplate;
/**
* 訂閱主題
*/
public void subscribeTopic(String deviceNo) {
try {
// 訂閱設備發(fā)布消息主題
List<String> upstreamTopics = new ArrayList<>();
upstreamTopics.add(String.format(UMqttCommonConstants.EVENT, deviceNo));
upstreamTopics.add(UMqttCommonConstants.ONLINE);
upstreamTopics.add(String.format(UMqttCommonConstants.RESPONSE, deviceNo));
upstreamTopics.add(String.format(UMqttCommonConstants.UPLOAD, deviceNo));
int[] upstreamQos = {1, 1, 2, 0};
mqttFactory.getInstance().subscribe(upstreamTopics.toArray(new String[0]), upstreamQos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 取消訂閱主題
*/
public void stopSubscribeTopic(String deviceNo) {
try {
// 取消訂閱設備發(fā)布消息主題
List<String> upstreamTopics = new ArrayList<>();
upstreamTopics.add(String.format(UMqttCommonConstants.EVENT, deviceNo));
upstreamTopics.add(UMqttCommonConstants.ONLINE);
upstreamTopics.add(String.format(UMqttCommonConstants.RESPONSE, deviceNo));
upstreamTopics.add(String.format(UMqttCommonConstants.UPLOAD, deviceNo));
mqttFactory.getInstance().unsubscribe(upstreamTopics.toArray(new String[0]));
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 斷開連接
*/
public void disConnect() {
try {
mqttFactory.getInstance().disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 訂閱主題
*/
public void subscribe(String topic, int qos) {
try {
mqttFactory.getInstance().subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 發(fā)布請求設備消息
*
* @param deviceNo 設備編號
* @param message 消息
*/
public void publish(String deviceNo, String message) {
publish(1, false, String.format(UMqttCommonConstants.REQUEST, deviceNo), message);
}
/**
* 發(fā)布請求設備消息
*/
public void publish(UMqttPublishDate publishDate) {
// 將消息id和方法名存到redis中:緩存3分鐘
redisTemplate.opsForValue().set(UMqttCommonConstants.UMQTT_MSG_REDIS_KEY + publishDate.getId(),
JSON.toJSONString(publishDate), 24, TimeUnit.HOURS);
publish(1, false, String.format(UMqttCommonConstants.REQUEST, publishDate.getDeviceNo()), publishDate.getMessage());
}
/**
* 發(fā)布響應設備消息
*/
public void publishResponse(UMqttPublishDate publishDate) {
// 將消息id和方法名存到redis中:緩存3分鐘
redisTemplate.opsForValue().set(UMqttCommonConstants.UMQTT_MSG_REDIS_KEY + publishDate.getId(),
JSON.toJSONString(publishDate), 24, TimeUnit.HOURS);
publish(1, false, String.format(UMqttCommonConstants.RESPONSE, publishDate.getDeviceNo()), publishDate.getMessage());
}
/**
* 發(fā)布消息
*
* @param qos qos
* @param retained retained
* @param topic 主題
* @param message 消息
*/
public void publish(int qos, boolean retained, String topic, String message) {
log.info("發(fā)布消息topic:" + topic);
log.info("發(fā)布消息message:" + message);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(message.getBytes());
//主題的目的地,用于發(fā)布/訂閱信息
MqttTopic mqttTopic = mqttFactory.getInstance().getTopic(topic);
//提供一種機制來跟蹤消息的傳遞進度
//用于在以非阻塞方式(在后臺運行)執(zhí)行發(fā)布是跟蹤消息的傳遞進度
MqttDeliveryToken token;
try {
//將指定消息發(fā)布到主題,但不等待消息傳遞完成,返回的token可用于跟蹤消息的傳遞狀態(tài)
//一旦此方法干凈地返回,消息就已被客戶端接受發(fā)布,當連接可用,將在后臺完成消息傳遞。
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
四、MQTT 的重連策略
- 在 mqtt 工廠類中進行初始化連接時,設置自動重連狀態(tài)為開啟。
// 設置自動重連
mqttConnectOptions.setAutomaticReconnect(true);
- 在 mqtt 回調(diào)類中,設置重連處理業(yè)務。
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("客戶端斷開連接重連");
// 重新訂閱
MqttFactory client = SpringContextHolder.getBeanFactory().getBean(MqttFactory.class);
client.initSubscribeTopic(client.getInstance());
log.info("重連成功");
}
MQTT 連接失效時,會自動進行重連,執(zhí)行自定義的重連策略,重連過程中 MQTT 消息服務停止,消息處理等業(yè)務會拋出異常。
五、EMQX 的 Windows 部署啟動方式
EMQX 服務需要延時啟動,因為部署服務器開機時 EMQX 服務需要等待完成一些初始化操作。保證 EMQX 服務在業(yè)務服務啟動前啟動。
bat 腳本部署方案:
- 方式一:(每次停止服務后需重啟電腦)
@echo off
start cmd /k "cd /d D:\Program Files\emqx-5.0.11-windows-amd64\bin && emqx start"
echo start magic-demo-biz
java -jar -Dfile.encoding=utf-8 magic-demo-biz.jar
- 方式二:(管理員權限cmd進入D:\Program Files\emqx-5.0.11-windows-amd64\bin,執(zhí)行emqx install安裝成服務,將emqx服務設置為手動啟動)
@echo off
sc start emqx
echo start magic-demo-biz
java -jar -Dfile.encoding=utf-8 magic-demo-biz.jar
六、疑難解答
1、避免消息發(fā)送速率過快
當需要批量發(fā)送大量消息時,如果消息發(fā)送頻率過快,會導致 EMQX 服務器會主動將當前發(fā)送消息的客戶端連接斷開,因此在發(fā)送消息時需要控制 MQTT 消息的發(fā)送頻率。
for (int i = 0; i < recordSize; i++) {
// 發(fā)送MQTT消息
mqttClientService.sendMessage("topic", "發(fā)送消息");
// 執(zhí)行延時程序,控制消息發(fā)送速率。(速率過快會導致MQTT客戶端連接掉線)
try {
TimeUnit.SECONDS.sleep(second);
} catch (Exception e) {
log.error("延時程序執(zhí)行異常:" + e.getMessage());
}
}
2、判斷 MQTT 客戶端連接狀態(tài)
會有 MQTT 客戶端連接存在但連接狀態(tài)為已斷開的情況。因此判斷 MQTT 客戶端連接狀態(tài)時,需要獲取 MQTT 客戶端連接的實際連接狀態(tài),而不是僅判斷 MQTT 客戶端連接是否存在。文章來源:http://www.zghlxwxcb.cn/news/detail-777250.html
/**
* 根據(jù)客戶端id查詢連接狀態(tài)
*
* @param clientId 客戶端id
* @return 連接狀態(tài)
*/
public boolean getConnectedStatus(String clientId) {
EmqxClientModel client = getByClientId(clientId);
if (client == null) {
return false;
}
return client.getConnected(); // 查詢實際連接狀態(tài)
}
總結
MQTT 原理同 MQ 消息發(fā)送與接收,EMQX 的 MQTT 客服端連接即消息隊列。文章來源地址http://www.zghlxwxcb.cn/news/detail-777250.html
到了這里,關于Java 使用 EMQX 實現(xiàn)物聯(lián)網(wǎng) MQTT 通信的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!