国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Java 使用 EMQX 實現(xiàn)物聯(lián)網(wǎng) MQTT 通信

這篇具有很好參考價值的文章主要介紹了Java 使用 EMQX 實現(xiàn)物聯(lián)網(wǎng) MQTT 通信。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。


前言

EMQX 實現(xiàn)物聯(lián)網(wǎng) MQTT 通信。物聯(lián)網(wǎng)的 MQ 消息通信方式。


一、介紹

1、MQTT

MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的"輕量級"通訊協(xié)議,該協(xié)議構建于TCP/IP協(xié)議上,由IBM在1999年發(fā)布。MQTT最大優(yōu)點在于,可以以極少的代碼和有限的帶寬,為遠程連接設備提過實時可靠的消息服務,作為一種低開銷、低帶寬占用的即時通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設備、移動應用等方面有較廣泛的應用。
MQTT是一個基于客戶端-服務器的消息發(fā)布/訂閱傳輸協(xié)議。MQTT協(xié)議是輕量、簡單、開放和易于實現(xiàn)的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環(huán)境中,如:機器與機器(M2M)通信和物聯(lián)網(wǎng)(loT)。其在,通過衛(wèi)星鏈路通信傳感器、偶爾撥號的醫(yī)療設備、智能家居、及一些小型化設備中已廣泛使用。

特點:
使用發(fā)布/訂閱消息模式,提供一對多的消息發(fā)布,解除應用程序耦合;
對負載內(nèi)容屏蔽的消息傳輸;
使用 TCP/IP 提供網(wǎng)絡連接;
有三種消息發(fā)布服務質(zhì)量:
小型傳輸,開銷很小(固定長度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡流量;
使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制。

2、EMQX

EMQX 是一個「無限連接,任意集成,隨處運行」大規(guī)模分布式物聯(lián)網(wǎng)接入平臺。
EMQX 企業(yè)版提供一體化的分布式 MQTT 消息服務和強大的 IoT 規(guī)則引擎,為高可靠、高性能的物聯(lián)網(wǎng)實時數(shù)據(jù)移動、處理和集成提供動力,助力企業(yè)快速構建關鍵業(yè)務的 IoT 平臺與應用。附下載地址: https://www.emqx.com/zh/try?product=enterprise 可以自行下載對應版本運行
mria 集群架構,java程序?qū)崿F(xiàn),mqtt,emqx,java,物聯(lián)網(wǎng)

優(yōu)勢:
海量連接:單節(jié)點支持 500 萬 MQTT 設備連接,集群可水平擴展至支持 1 億并發(fā)的 MQTT 連接。
高可靠:彈性伸縮,無單點故障。內(nèi)置 RocksDB 可靠地持久化 MQTT 消息,確保無數(shù)據(jù)損失。
數(shù)據(jù)安全:端到端數(shù)據(jù)加密(支持國密),細粒度訪問控制,保障數(shù)據(jù)安全,滿足企業(yè)合規(guī)需求。
多協(xié)議:支持 MQTT、HTTP、QUIC、WebSocket、LwM2M/CoAP 或?qū)S袇f(xié)議連接任何設備。
高性能:單節(jié)點支持每秒實時接收、處理與分發(fā)數(shù)百萬條的 MQTT 消息。毫秒級消息交付時延。
易運維:圖形化配置、操作與管理,實時監(jiān)測運行狀態(tài)。支持 MQTT 跟蹤進行端到端問題分析。

3、Mria 集群架構?

支持全新的 Mria 集群架構,在此架構下 EMQX 水平擴展性得到指數(shù)級提升,單個集群可以輕松支持 1 億 MQTT 連接,這使得 EMQX 5.0 成為目前全球最具擴展性的 MQTT Broker。

在構建滿足用戶業(yè)務需求的更大規(guī)模集群的同時,Mria 架構還能夠降低大規(guī)模部署下的腦裂風險以及腦裂后的影響,以提供更加穩(wěn)定可靠的物聯(lián)網(wǎng)數(shù)據(jù)接入服務。

具體可以查看官方文檔: https://docs.emqx.com/zh/enterprise/v5.1/deploy/cluster/create-cluster.html

4、MQTTX

MQTTX 是由 EMQ 開發(fā)的一款開源跨平臺 MQTT 5.0 桌面客戶端,它兼容 macOS,Linux 以及 Windows 系統(tǒng)。MQTTX 的用戶界面 UI 采用聊天式設計,使得操作邏輯更加簡明直觀。它支持用戶快速創(chuàng)建和保存多個 MQTT 連接,便于測試 MQTT/MQTTS 連接,以及 MQTT 消息的訂閱和發(fā)布。

mria 集群架構,java程序?qū)崿F(xiàn),mqtt,emqx,java,物聯(lián)網(wǎng)

主要功能
采用聊天界面設計,使得操作更加簡單明了
跨平臺兼容,支持在 Windows,macOS,Linux 系統(tǒng)上運行
100% 兼容 MQTT v5.0,v3.1.1 和 v3.1 協(xié)議
訂閱的 MQTT 主題支持自定義顏色標簽
支持單向和雙向 SSL 認證,同時支持 CA 和自簽名證書
支持通過 WebSocket 連接 MQTT 服務器
支持 Hex, Base64, JSON, Plaintext 等 Payload 格式轉換
自定義腳本支持模擬 MQTT 發(fā)布/訂閱測試
提供完整的日志記錄功能
多語言支持:簡體中文、英語、日語、土耳其語及匈牙利語 ??? ??? ??? ??? ???
自由切換 Light、Dark、Night 三種主題模式

二、SpringBoot 集成 EMQX

1、yaml 配置

# EMQX配置
emqx:
  # EMQX服務地址,端口號默認18083
  url: http://127.0.0.1:18083
  # 認證用戶名
  username: admin
  # 密碼
  password: admin123456

2、Properties 配置類

/**
 * EMQX配置
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "emqx")
public class EmqxConfig {

	private String url;

	private String username;

	private String password;
}

3、客戶端連接實體 model

客戶端連接實現(xiàn)模型類。

/**
 * EMQX客戶端連接model
 */
@Data
public class EmqxClientModel {

	Integer awaiting_rel_cnt;
	Integer awaiting_rel_max;
	Boolean clean_start;
	
	/**
	 * 客戶端id
	 */
	String clientid;
	
	/**
	 * 連接狀態(tài)
	 */
	Boolean connected;
	Long connected_at;
	Long created_at;
	Long disconnected_at;
	Integer expiry_interval;
	Integer heap_size;
	Integer inflight_cnt;
	Integer inflight_max;
	
	/**
	 * ip地址
	 */
	String ip_address;
	Boolean is_bridge;
	
	/**
	 * 心跳檢測時間s
	 */
	Integer keepalive;
	Integer mailbox_len;
	Integer mqueue_dropped;
	Integer mqueue_len;
	
	/**
	 * 消息隊列最大長度
	 */
	Integer mqueue_max;
	String node;
	Integer port;
	String proto_name;
	Integer proto_ver;
	Integer recv_cnt;
	Integer recv_msg;
}

4、token 服務類

提供獲取 token 的方法。

@Component
@RequiredArgsConstructor
public class EmqxTokenService {

	private final EmqxConfig emqxConfig;

	public String getToken(){
		String authentication = emqxConfig.getUsername() + ":" + emqxConfig.getPassword();
		return "Basic " + Base64.getEncoder().encodeToString(authentication.getBytes());
	}
}

5、客戶端 api

提供對外調(diào)用的 api 服務。

  1. 查詢客戶端連接狀態(tài)。
  2. 查詢客戶端連接信息。
  3. 刪除客戶端連接。
@Slf4j
@Component
@RequiredArgsConstructor
public class EmqxClientsApi {

	private final EmqxConfig emqxConfig;

	private final EmqxTokenService emqxTokenService;

	/**
	 * 根據(jù)客戶端id查詢連接狀態(tài)
	 *
	 * @param clientId 客戶端id
	 * @return 連接狀態(tài)
	 */
	public boolean getConnectedStatus(String clientId) {
		EmqxClientModel client = getByClientId(clientId);
		if (client == null) {
			return false;
		}
		return client.getConnected();
	}

	/**
	 * 根據(jù)客戶端id查詢客戶端信息
	 *
	 * @param clientId 客戶端id
	 * @return 客戶端信息
	 */
	public EmqxClientModel getByClientId(String clientId) {
		String url = String.format(emqxConfig.getUrl() + "/clients/%s", clientId);
		HttpResponse httpResponse;
		try {
			httpResponse = HttpRequest.get(url)
					.header("Authorization", emqxTokenService.getToken())
					.execute();
		} catch (Exception e) {
			log.info("未查到emqx客戶端:clientId=" + clientId + "[msg]:" + e.getMessage());
			return null;
		}
		if (httpResponse != null && httpResponse.getStatus() == 200) {
			return JSON.parseObject(httpResponse.body(), EmqxClientModel.class);
		}
		return null;
	}

	/**
	 * 根據(jù)客戶端id刪除客戶端
	 *
	 * @param clientId 客戶端id
	 * @return 客戶端信息
	 */
	public void delete(String clientId) {
		String url = String.format(emqxConfig.getUrl() + "/clients/%s", clientId);
		HttpResponse httpResponse;
		try {
			httpResponse = HttpRequest.delete(url)
					.header("Authorization", emqxTokenService.getToken())
					.execute();
		} catch (IORuntimeException e) {
			throw new ServiceException("刪除emqx客戶端請求超時");
		} catch (Exception e) {
			throw new ServiceException("刪除emqx客戶端請求異常:clientId=" + clientId + "[msg]:" + e.getMessage());
		}
		if (httpResponse == null || httpResponse.getStatus() != 204) {
			throw new ServiceException("刪除emqx客戶端失?。篶lientId=" + clientId);
		}
	}

}

三、SpringBoot 集成 MQTT

1、pom 依賴

<dependency>
  <groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.2.2</version>
</dependency>

2、yaml 配置

spring:
  # MQTT配置
  mqtt:
    # MQTT服務地址,端口號默認1883,如果有多個,用逗號隔開
    host-url: tcp://127.0.0.1:1883
    # 用戶名
    username: admin
    # 密碼
    password: admin123456
    # 客戶端id(不能重復)
    client-id: real-mqtt-client
    # MQTT默認的消息推送主題,實際可在調(diào)用接口時指定
    default-topic: topic

3、Properties 配置類

@Configuration
@ConfigurationProperties(prefix = "spring.mqtt")
@Data
public class MqttConfig {

	private String username;

	private String password;

	private String hostUrl;

	private String clientId;

	private String defaultTopic;
}

4、連接工廠類

執(zhí)行 mq 初始化配置、連接、消息主題訂閱。

@Slf4j
@Component
public class MqttFactory {

	public static ConcurrentHashMap<String, MqttClient> clientMap = new ConcurrentHashMap<>();

	@Autowired
	private MqttConfig mqttConfig;

	@Autowired
	private RealPersonAccessDeviceMapper realPersonAccessDeviceMapper;

	/**
	 * 在bean初始化后連接到服務器
	 */
	@PostConstruct
	public void init() {
		String mqttStartFlag = ParamResolver.getStr(RealCommonConstants.MQTT_START_FLAG);
		if (StrUtil.equals(mqttStartFlag, CommonConstants.SYS_YES_NO_Y)) {
			// 初始化訂閱主題
			initSubscribeTopic(getInstance());
		}
	}

	/**
	 * 初始化客戶端
	 */
	public MqttClient getInstance() {
		MqttClient client = null;
		if (clientMap.get(mqttConfig.getClientId()) == null) {
			try {
				client = new MqttClient(mqttConfig.getHostUrl(), mqttConfig.getClientId());
				// MQTT配置對象
				MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
				// 設置自動重連, 其它具體參數(shù)可以查看MqttConnectOptions
				mqttConnectOptions.setAutomaticReconnect(true);
				// 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接
				// mqttConnectOptions.setCleanSession(true);
				// 設置超時時間 單位為秒
				mqttConnectOptions.setConnectionTimeout(10);
				mqttConnectOptions.setUserName(mqttConfig.getUsername());
				mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());
				// mqttConnectOptions.setServerURIs(new String[]{url});
				// 設置會話心跳時間 單位為秒
				mqttConnectOptions.setKeepAliveInterval(10);
				// 設置“遺囑”消息的話題,若客戶端與服務器之間的連接意外中斷,服務器將發(fā)布客戶端的“遺囑”消息。
				// mqttConnectOptions.setWill("willTopic", "offline".getBytes(), 2, false);
				if (!client.isConnected()) {
					client.connect(mqttConnectOptions);
				}
				client.setCallback(new MqttCallBack());
				log.info("MQTT創(chuàng)建client成功={}", JSONObject.toJSONString(client));
				clientMap.put(mqttConfig.getClientId(), client);
			} catch (MqttException e) {
				log.error("MQTT連接消息服務器[{}]失敗", mqttConfig.getClientId() + "-" + mqttConfig.getHostUrl());
			}
		} else {
			client = clientMap.get(mqttConfig.getClientId());
			log.info("MQTT從map里獲取到client,clientId=" + mqttConfig.getClientId());
			// TODO 已采用自動重連策略
//			log.info("MQTT從map里獲取到client={}", JSONObject.toJSONString(client));
//			if (!client.isConnected()) {
//				initSubscribeTopic(client);
			// 如果緩存里的client已經(jīng)斷開,則清除該緩存,再重新創(chuàng)建客戶端連接
//				clientMap.remove(mqttConfig.getClientId());
//				this.getInstance();
//			}
		}
		return client;
	}


	/**
	 * 初始化訂閱主題
	 * <p>
	 * 消息等級,和主題數(shù)組一一對應,服務端將按照指定等級給訂閱了主題的客戶端推送消息
	 */
	public void initSubscribeTopic(MqttClient client) {
			// 訂閱設備發(fā)布消息主題
			List<String> upstreamTopics = new ArrayList<>();
			List<Integer> upstreamQos = new ArrayList<>();
			
			upstreamTopics.add("topic_1");
			upstreamQos.add(1);
			upstreamTopics.add("topic_2");
			upstreamQos.add(0);
			upstreamTopics.add("topic_3");
			upstreamQos.add(1);
		
			try {
				client.subscribe(upstreamTopics.toArray(new String[upstreamTopics.size()]), upstreamQos.stream().mapToInt(Integer::intValue).toArray());
			} catch (MqttException e) {
				e.printStackTrace();
			}
		}
	}

}

5、MQTT 回調(diào)類

連接斷開回調(diào),以及消息到達回調(diào)。根據(jù)消息主題進行消息分發(fā)。

@Slf4j
@Component
public class MqttCallBack implements MqttCallback, MqttCallbackExtended {

	/**
	 * 客戶端斷開連接的回調(diào)
	 */
	@Override
	public void connectionLost(Throwable throwable) {
		log.info("客戶端斷開連接回調(diào)");
	}

	@Override
	public void connectComplete(boolean reconnect, String serverURI) {
		log.info("客戶端斷開連接重連");
		// 重新訂閱
		MqttFactory client = SpringContextHolder.getBeanFactory().getBean(MqttFactory.class);
		client.initSubscribeTopic(client.getInstance());
		log.info("重連成功");
	}

	/**
	 * 消息到達的回調(diào)
	 */
	@Override
	public void messageArrived(String topic, MqttMessage mqttMessage) {
		// 日志輸出
		MqttFactory client = SpringContextHolder.getBeanFactory().getBean(MqttFactory.class);
		log.info("mqtt客戶端ID : {}", client.getInstance().getClientId());
		log.info("mqtt接收消息主題 : {}", topic);
		log.info("mqtt接收消息Qos : {}", mqttMessage.getQos());
		log.info("mqtt接收消息retained : {}", mqttMessage.isRetained());
		log.info("mqtt接收消息內(nèi)容 : {}", new String(mqttMessage.getPayload()));

		String message = new String(mqttMessage.getPayload()); // 消息內(nèi)容

		// TODO 根據(jù)消息主題進行消息分發(fā)
		
	}

	/**
	 * 消息發(fā)布成功的回調(diào)
	 */
	@Override
	public void deliveryComplete(IMqttDeliveryToken token) {
		IMqttAsyncClient client = token.getClient();
		log.info(client.getClientId() + " 發(fā)布消息成功!");
	}
}

6、MQ 服務類

提供消息發(fā)布等方法。

這里對發(fā)送消息方法進行了自定義封裝,增加了 redis 對發(fā)送的消息進行存儲,在異步的響應消息返回時,利用 redis 中發(fā)送消息的消息id實現(xiàn)響應數(shù)據(jù)的異步綁定。響應消息存在丟失的情況。需要合理設置發(fā)送消息的過期時間,防止時間過短導致返回的響應丟失,防止時間過長占用 redis 資源。

@Slf4j
@Data
@Configuration
public class UMqttClientService {

	private final MqttFactory mqttFactory;

	private final StringRedisTemplate redisTemplate;

	/**
	 * 訂閱主題
	 */
	public void subscribeTopic(String deviceNo) {
		try {
			// 訂閱設備發(fā)布消息主題
			List<String> upstreamTopics = new ArrayList<>();
			upstreamTopics.add(String.format(UMqttCommonConstants.EVENT, deviceNo));
			upstreamTopics.add(UMqttCommonConstants.ONLINE);
			upstreamTopics.add(String.format(UMqttCommonConstants.RESPONSE, deviceNo));
			upstreamTopics.add(String.format(UMqttCommonConstants.UPLOAD, deviceNo));
			int[] upstreamQos = {1, 1, 2, 0};
			mqttFactory.getInstance().subscribe(upstreamTopics.toArray(new String[0]), upstreamQos);
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 取消訂閱主題
	 */
	public void stopSubscribeTopic(String deviceNo) {
		try {
			// 取消訂閱設備發(fā)布消息主題
			List<String> upstreamTopics = new ArrayList<>();
			upstreamTopics.add(String.format(UMqttCommonConstants.EVENT, deviceNo));
			upstreamTopics.add(UMqttCommonConstants.ONLINE);
			upstreamTopics.add(String.format(UMqttCommonConstants.RESPONSE, deviceNo));
			upstreamTopics.add(String.format(UMqttCommonConstants.UPLOAD, deviceNo));
			mqttFactory.getInstance().unsubscribe(upstreamTopics.toArray(new String[0]));
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 斷開連接
	 */
	public void disConnect() {
		try {
			mqttFactory.getInstance().disconnect();
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 訂閱主題
	 */
	public void subscribe(String topic, int qos) {
		try {
			mqttFactory.getInstance().subscribe(topic, qos);
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 發(fā)布請求設備消息
	 *
	 * @param deviceNo 設備編號
	 * @param message  消息
	 */
	public void publish(String deviceNo, String message) {
		publish(1, false, String.format(UMqttCommonConstants.REQUEST, deviceNo), message);
	}

	/**
	 * 發(fā)布請求設備消息
	 */
	public void publish(UMqttPublishDate publishDate) {
		// 將消息id和方法名存到redis中:緩存3分鐘
		redisTemplate.opsForValue().set(UMqttCommonConstants.UMQTT_MSG_REDIS_KEY + publishDate.getId(),
				JSON.toJSONString(publishDate), 24, TimeUnit.HOURS);
		publish(1, false, String.format(UMqttCommonConstants.REQUEST, publishDate.getDeviceNo()), publishDate.getMessage());
	}

	/**
	 * 發(fā)布響應設備消息
	 */
	public void publishResponse(UMqttPublishDate publishDate) {
		// 將消息id和方法名存到redis中:緩存3分鐘
		redisTemplate.opsForValue().set(UMqttCommonConstants.UMQTT_MSG_REDIS_KEY + publishDate.getId(),
				JSON.toJSONString(publishDate), 24, TimeUnit.HOURS);
		publish(1, false, String.format(UMqttCommonConstants.RESPONSE, publishDate.getDeviceNo()), publishDate.getMessage());
	}

	/**
	 * 發(fā)布消息
	 *
	 * @param qos      qos
	 * @param retained retained
	 * @param topic    主題
	 * @param message  消息
	 */
	public void publish(int qos, boolean retained, String topic, String message) {
		log.info("發(fā)布消息topic:" + topic);
		log.info("發(fā)布消息message:" + message);
		MqttMessage mqttMessage = new MqttMessage();
		mqttMessage.setQos(qos);
		mqttMessage.setRetained(retained);
		mqttMessage.setPayload(message.getBytes());
		//主題的目的地,用于發(fā)布/訂閱信息
		MqttTopic mqttTopic = mqttFactory.getInstance().getTopic(topic);
		//提供一種機制來跟蹤消息的傳遞進度
		//用于在以非阻塞方式(在后臺運行)執(zhí)行發(fā)布是跟蹤消息的傳遞進度
		MqttDeliveryToken token;
		try {
			//將指定消息發(fā)布到主題,但不等待消息傳遞完成,返回的token可用于跟蹤消息的傳遞狀態(tài)
			//一旦此方法干凈地返回,消息就已被客戶端接受發(fā)布,當連接可用,將在后臺完成消息傳遞。
			token = mqttTopic.publish(mqttMessage);
			token.waitForCompletion();
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}
}

四、MQTT 的重連策略

  1. 在 mqtt 工廠類中進行初始化連接時,設置自動重連狀態(tài)為開啟。
// 設置自動重連
mqttConnectOptions.setAutomaticReconnect(true);
  1. 在 mqtt 回調(diào)類中,設置重連處理業(yè)務。
@Override
public void connectComplete(boolean reconnect, String serverURI) {
	log.info("客戶端斷開連接重連");
	// 重新訂閱
	MqttFactory client = SpringContextHolder.getBeanFactory().getBean(MqttFactory.class);
	client.initSubscribeTopic(client.getInstance());
	log.info("重連成功");
}

MQTT 連接失效時,會自動進行重連,執(zhí)行自定義的重連策略,重連過程中 MQTT 消息服務停止,消息處理等業(yè)務會拋出異常。

五、EMQX 的 Windows 部署啟動方式

EMQX 服務需要延時啟動,因為部署服務器開機時 EMQX 服務需要等待完成一些初始化操作。保證 EMQX 服務在業(yè)務服務啟動前啟動。

bat 腳本部署方案:

  1. 方式一:(每次停止服務后需重啟電腦)
@echo off
start cmd /k "cd /d D:\Program Files\emqx-5.0.11-windows-amd64\bin && emqx start"

echo start magic-demo-biz
java -jar -Dfile.encoding=utf-8 magic-demo-biz.jar
  1. 方式二:(管理員權限cmd進入D:\Program Files\emqx-5.0.11-windows-amd64\bin,執(zhí)行emqx install安裝成服務,將emqx服務設置為手動啟動)
@echo off
sc start emqx

echo start magic-demo-biz
java -jar -Dfile.encoding=utf-8 magic-demo-biz.jar

六、疑難解答

1、避免消息發(fā)送速率過快

當需要批量發(fā)送大量消息時,如果消息發(fā)送頻率過快,會導致 EMQX 服務器會主動將當前發(fā)送消息的客戶端連接斷開,因此在發(fā)送消息時需要控制 MQTT 消息的發(fā)送頻率。

for (int i = 0; i < recordSize; i++) {
	// 發(fā)送MQTT消息
	mqttClientService.sendMessage("topic", "發(fā)送消息");

	// 執(zhí)行延時程序,控制消息發(fā)送速率。(速率過快會導致MQTT客戶端連接掉線)
	try {
		TimeUnit.SECONDS.sleep(second);
	} catch (Exception e) {
		log.error("延時程序執(zhí)行異常:" + e.getMessage());
	}
}

2、判斷 MQTT 客戶端連接狀態(tài)

會有 MQTT 客戶端連接存在但連接狀態(tài)為已斷開的情況。因此判斷 MQTT 客戶端連接狀態(tài)時,需要獲取 MQTT 客戶端連接的實際連接狀態(tài),而不是僅判斷 MQTT 客戶端連接是否存在。

/**
 * 根據(jù)客戶端id查詢連接狀態(tài)
 *
 * @param clientId 客戶端id
 * @return 連接狀態(tài)
 */
public boolean getConnectedStatus(String clientId) {
	EmqxClientModel client = getByClientId(clientId);
	if (client == null) {
		return false;
	}
	return client.getConnected(); // 查詢實際連接狀態(tài)
}

總結

MQTT 原理同 MQ 消息發(fā)送與接收,EMQX 的 MQTT 客服端連接即消息隊列。文章來源地址http://www.zghlxwxcb.cn/news/detail-777250.html

到了這里,關于Java 使用 EMQX 實現(xiàn)物聯(lián)網(wǎng) MQTT 通信的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • C# 完美實現(xiàn)物聯(lián)網(wǎng) MQTT 數(shù)據(jù)通信

    C# 完美實現(xiàn)物聯(lián)網(wǎng) MQTT 數(shù)據(jù)通信

    MQTT 協(xié)議由于其用極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務,具有開銷低、占用帶寬低、即時通訊等優(yōu)點,使其在物聯(lián)網(wǎng)、小型設備、移動應用等方面有較廣泛的應用,在工業(yè)物聯(lián)網(wǎng)中,MQTT也有廣泛的應用。 搭建一個 MQTT 服務器 見本人其它文章《

    2024年02月02日
    瀏覽(18)
  • 使用4G通信模塊和MQTT協(xié)議,完成物聯(lián)網(wǎng)設備開發(fā)。

    使用4G通信模塊和MQTT協(xié)議,完成物聯(lián)網(wǎng)設備開發(fā)。

    (1)安裝并使用4G模塊通信模塊,建立microPython開發(fā)環(huán)境; (2)使用提供的Demo開發(fā)例程,使用MQTT傳輸協(xié)議連接阿里或騰訊網(wǎng)站,完成物聯(lián)網(wǎng)設備開發(fā)。 (3)將溫濕度信息上傳到網(wǎng)站; (4)手機APP查看數(shù) 這是第一步,在阿里云平臺創(chuàng)建產(chǎn)品和設備,用來將實際的設備數(shù)據(jù)

    2024年02月04日
    瀏覽(32)
  • MQTT(EMQX) - SpringBoot 整合MQTT 連接池 Demo - 附源代碼 + 在線客服聊天架構圖

    MQTT(EMQX) - SpringBoot 整合MQTT 連接池 Demo - 附源代碼 + 在線客服聊天架構圖

    MQTT(EMQX) - Linux CentOS Docker 安裝 MQTT (Message Queue Telemetry Transport) 是一個輕量級傳輸協(xié)議,它被設計用于輕量級的發(fā)布/訂閱式消息傳輸,MQTT協(xié)議針對低帶寬網(wǎng)絡,低計算能力的設備,做了特殊的優(yōu)化。是一種簡單、穩(wěn)定、開放、輕量級易于實現(xiàn)的消息協(xié)議,在物聯(lián)網(wǎng)的應用下的

    2023年04月10日
    瀏覽(20)
  • STM32+ESP-01s+EMQX實現(xiàn)單片機MQTT協(xié)議傳輸數(shù)據(jù)上云(二)STM32F103與ESP-01s的Usart通信,實現(xiàn)STM32連接上網(wǎng)上云

    STM32+ESP-01s+EMQX實現(xiàn)單片機MQTT協(xié)議傳輸數(shù)據(jù)上云(二)STM32F103與ESP-01s的Usart通信,實現(xiàn)STM32連接上網(wǎng)上云

    單片機:STM32F103c8t6 WiFi模塊:ESP8266-01s EMQX:自身服務器上搭載emq服務器或者借用emqx window 版本? USB TO TTL模塊:CH340 因為CH340不能給ESP-01s供3.3V的電,所以測試時需要外加供電 ? ? ? ? ? 本章中涉及到的技術原理主要為ESP01S wfi模塊的AT指令通信,我在上一篇文章給大家提到了

    2024年02月16日
    瀏覽(26)
  • springboot當中使用EMQX(MQTT協(xié)議)

    springboot當中使用EMQX(MQTT協(xié)議)

    本篇博客主要圍繞EMQX是什么?、能干什么?、怎么用? 三點來進行整理。 1.1、MQTT簡介 在了解EMQX前首先了解一下MQTT協(xié)議,MQTT 全稱為 Message Queuing Telemetry Transport(消息隊列遙測傳輸),是一種基于 發(fā)布/訂閱 模式的 輕量級物聯(lián)網(wǎng)消息傳輸協(xié)議。IBM 公司的安迪·斯坦福-克拉

    2024年02月21日
    瀏覽(26)
  • MQTT協(xié)議原理介紹及如何使用emqx

    MQTT協(xié)議原理介紹及如何使用emqx

    MQTT(Message Queuing Telemetry Transport)協(xié)議是一種輕量級的、基于發(fā)布/訂閱模式的通信協(xié)議。它最初由IBM開發(fā),用于在低帶寬和不穩(wěn)定的網(wǎng)絡環(huán)境中傳輸小型數(shù)據(jù)包。MQTT協(xié)議被廣泛應用于物聯(lián)網(wǎng)(IoT)領域,例如傳感器數(shù)據(jù)采集、遠程監(jiān)控和控制等。 MQTT協(xié)議使用了一種異步的、

    2024年02月12日
    瀏覽(19)
  • uniAPP開發(fā)小程序使用MQTT通訊EMQX Cloud

    uniAPP開發(fā)小程序使用MQTT通訊EMQX Cloud

    首先感謝大佬 參考案例 下載并安裝工具 1.Hbuilderx 2. nodejs 3.MQTTX 鏈接放這,自己下載安裝 MQTT服務器:EMQX 第一步:測試MQTTX通訊 1.記住這地址, 你的服務器地址 2.隨便創(chuàng)建幾個用戶 3.打開MQTTX 填入剛剛的服務器地址 注意我選的參數(shù) 用戶就是上圖的用戶和密碼 點擊連接,成功

    2024年01月15日
    瀏覽(18)
  • EMQX(MQTT)----基本用法以及使用Python程序進行模擬流程

    EMQX(MQTT)----基本用法以及使用Python程序進行模擬流程

    ????????EMQX是大規(guī)模分布式物聯(lián)網(wǎng)MQTT消息服務器,除了發(fā)送接送的流量不能太大(不能用于生產(chǎn)?。?,在學習MQTT方面上有很大的優(yōu)勢的! ? ? ? ? 在使用該協(xié)議時,主要需要弄懂的一個知識點就是“發(fā)布者”和“訂閱者”的關系,在最簡單的模型中,一般會含有以上兩

    2023年04月24日
    瀏覽(13)
  • 搭建自己的MQTT服務器,實現(xiàn)設備上云(Ubuntu+EMQX)

    搭建自己的MQTT服務器,實現(xiàn)設備上云(Ubuntu+EMQX)

    這篇文章教大家在ECS云服務器上部署EMQX,搭建自己私有的MQTT服務器,配置EMQX實現(xiàn)設備上云,設備數(shù)據(jù)轉發(fā),存儲;服務器我采用的華為云的ECS服務器,系統(tǒng)選擇Ubuntu系統(tǒng)。 Windows版本的看這里: https://blog.csdn.net/xiaolong1126626497/article/details/134280836 EMQX是一款大規(guī)??蓮椥陨炜s

    2024年02月04日
    瀏覽(21)
  • C#通過MQTT與其他物聯(lián)網(wǎng)設備通信

    MQTT(Message Queuing Telemetry Transport)是一種輕量級的消息傳輸協(xié)議,常用于物聯(lián)網(wǎng)設備之間的通信。在C#中,我們可以使用MQTT庫來實現(xiàn)與其他物聯(lián)網(wǎng)設備之間的通信,本文將介紹如何使用C#中的MQTT庫進行通信。 一、安裝MQTT庫 C#中有多個MQTT庫可供選擇,例如M2Mqtt、MQTTnet等,本文

    2024年02月16日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包