国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

MQTT,如何在SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布

這篇具有很好參考價(jià)值的文章主要介紹了MQTT,如何在SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一、MQTT介紹

1.1 什么是MQTT?

MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測(cè)傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的“輕量級(jí)”通訊協(xié)議,該協(xié)議構(gòu)建于TCP/IP協(xié)議上,由IBM在1999年發(fā)布。

MQTT最大優(yōu)點(diǎn)在于用極少的代碼和有限的帶寬,為連接遠(yuǎn)程設(shè)備提供實(shí)時(shí)可靠的消息服務(wù)。

MQTT,如何在SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布,Java,SpringBoot,spring boot,java

MQTT具有協(xié)議簡(jiǎn)潔、輕巧、可擴(kuò)展性強(qiáng)、低開銷、低帶寬占用等優(yōu)點(diǎn),已經(jīng)有PHP,JAVA,Python,C,C#,Go等多個(gè)語言版本,基本可以使用在任何平臺(tái)上。在物聯(lián)網(wǎng)、小型設(shè)備、移動(dòng)應(yīng)用等方面有較廣泛的應(yīng)用,特別適合用來當(dāng)做物聯(lián)網(wǎng)的通信協(xié)議。

1.2 MQTT特點(diǎn)

MQTT是一個(gè)基于客戶端-服務(wù)器的消息發(fā)布/訂閱傳輸協(xié)議。MQTT協(xié)議是輕量、簡(jiǎn)單、開放和易于實(shí)現(xiàn)的,這些特點(diǎn)使它適用范圍非常廣泛。在很多情況下,包括受限的環(huán)境中,如:機(jī)器與機(jī)器(M2M)通信和物聯(lián)網(wǎng)(IoT)。

MQTT協(xié)議是為硬件性能有限,且工作在低帶寬、不可靠的網(wǎng)絡(luò)的遠(yuǎn)程傳感器和控制設(shè)備通訊而設(shè)計(jì)的協(xié)議,它具有以下主要的幾項(xiàng)特性:

  • 1.使用發(fā)布/訂閱消息模式,提供多對(duì)多的消息發(fā)布,解除應(yīng)用程序耦合;
  • 2.對(duì)負(fù)載內(nèi)容屏蔽的消息傳輸;
  • 3.使用TCP/IP 提供網(wǎng)絡(luò)連接;
  • 4.支持三種消息發(fā)布服務(wù)質(zhì)量(QoS):
    • QoS 0(最多一次):消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)。會(huì)發(fā)生消息丟失或重復(fù)。這個(gè)級(jí)別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次數(shù)據(jù)無所謂,因?yàn)椴痪煤筮€會(huì)有第二次發(fā)送。
    • QoS 1(至少一次):確保消息到達(dá),但消息重復(fù)可能會(huì)發(fā)生。
    • QoS 2(只有一次):確保消息到達(dá)一次。這個(gè)級(jí)別可用于如下情況,在計(jì)費(fèi)系統(tǒng)中,消息重復(fù)或丟失會(huì)導(dǎo)致不正確的結(jié)果。
  • 5.傳輸數(shù)據(jù)小,開銷很?。ü潭ㄩL(zhǎng)度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡(luò)流量;(用極少的代碼和有限的帶寬,為連接遠(yuǎn)程設(shè)備提供實(shí)時(shí)可靠的消息服務(wù)。)

1.3 MQTT應(yīng)用場(chǎng)景

MQTT作為一種低開銷、低帶寬占用的即時(shí)通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設(shè)備、移動(dòng)應(yīng)用等方面有著廣泛的應(yīng)用。MQTT服務(wù)只負(fù)責(zé)消息的接收和傳遞,應(yīng)用系統(tǒng)連接到MQTT服務(wù)器后,可以實(shí)現(xiàn)采集數(shù)據(jù)接收、解析、業(yè)務(wù)處理、存儲(chǔ)入庫(kù)、數(shù)據(jù)展示等功能。常見的應(yīng)用場(chǎng)景主要有以下幾個(gè)方面:

(1)消息推送: 如PC端的推送公告,比如安卓的推送服務(wù),還有一些即時(shí)通信軟件如微信、易信等也是采用的推送技術(shù)。

(2)智能點(diǎn)餐: 通過MQTT消息隊(duì)列產(chǎn)品,消費(fèi)者可在餐桌上掃碼點(diǎn)餐,并與商家后端系統(tǒng)連接實(shí)現(xiàn)自助下單、支付。

(3)信息更新: 實(shí)現(xiàn)商場(chǎng)超市等場(chǎng)所的電子標(biāo)簽、公共場(chǎng)所的多媒體屏幕的顯示更新管理。

(4)掃碼出站: 最常見的停車場(chǎng)掃碼繳費(fèi),自動(dòng)起竿;地鐵閘口掃碼進(jìn)出站。

二、MQTT的角色組成

2.1 MQTT的客戶端和服務(wù)端

2.1.1 服務(wù)端(Broker)

EMQX就是一個(gè)MQTT的Broker,emqx只是基于erlang語言開發(fā)的軟件而已,其它的MQ還有ActiveMQ、RabbitMQ、HiveMQ等等。

EMQX服務(wù)端:下載 EMQX

2.1.2 客戶端(發(fā)布/訂閱)

EMQX客戶端:MQTTX:全功能 MQTT 客戶端工具

這個(gè)是用來測(cè)試驗(yàn)證的客戶端,實(shí)際項(xiàng)目是通過代碼來實(shí)現(xiàn)我們消息的生產(chǎn)者和消費(fèi)者。

2.2 MQTT中的幾個(gè)概念

相比RabbitMQ等消息隊(duì)列,MQTT要相對(duì)簡(jiǎn)單一些,只有Broker、Topic、發(fā)布者、訂閱者等幾部分構(gòu)成。接下來我們先簡(jiǎn)單整理下MQTT日常使用中最常見的幾個(gè)概念:

  • 1.Topic主題:MQTT消息的主要傳播途徑, 我們向主題發(fā)布消息, 訂閱主題, 從主題中讀取消息并進(jìn)行.業(yè)務(wù)邏輯處理, 主題是消息的通道
  • 2.生產(chǎn)者:MQTT消息的發(fā)送者, 他們向主題發(fā)送消息
  • 3.消費(fèi)者:MQTT消息的接收者, 他們訂閱自己需要的主題, 并從中獲取消息
  • 4.broker服務(wù):消息轉(zhuǎn)發(fā)器, 消息是通過它來承載的, EMQX就是我們的broker, 在使用中我們不用關(guān)心它的具體實(shí)現(xiàn)

其實(shí), MQTT的使用流程就是: 生產(chǎn)者給broker的某個(gè)topic發(fā)消息->broker通過topic進(jìn)行消息的傳遞->訂閱該主題的消費(fèi)者拿到消息并進(jìn)行相應(yīng)的業(yè)務(wù)邏輯

三、EMQX的安裝和使用

下面以Windows為例,演示W(wǎng)indows下如何安裝和使用EXQX。

step 1:下載EMQ安裝包,配置EMQ環(huán)境

EMQX服務(wù)端:下載 EMQX

step 2:下載壓縮包解壓,cmd進(jìn)入bin文件夾

MQTT,如何在SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布,Java,SpringBoot,spring boot,java

step 3:?jiǎn)?dòng)EMQX服務(wù)

在命令行輸入:emqx start 啟動(dòng)服務(wù),打卡瀏覽器輸入:http://localhost:18083/ 進(jìn)入登錄頁(yè)面。默認(rèn)用戶名密碼 admin/public 。登錄成功后,會(huì)進(jìn)入emqx的后臺(tái)管理頁(yè)面,如下圖所示:

MQTT,如何在SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布,Java,SpringBoot,spring boot,java

四、使用SpringBoot整合MQTT協(xié)議

前面介紹了MQTT協(xié)議以及如何安裝和啟動(dòng)MQTT服務(wù)。接下來演示如何在SpringBoot項(xiàng)目中整合MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布。

4.1 創(chuàng)建工程

首先,創(chuàng)建spring-boot-starter-mqtt父工程,在父工程下分別創(chuàng)建消息的提供者spring-boot-starter-mqtt-provider 模塊和消息的消費(fèi)者spring-boot-starter-mqtt-consumer模塊。

MQTT,如何在SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布,Java,SpringBoot,spring boot,java

4.2 實(shí)現(xiàn)生產(chǎn)者

接下來,修改生產(chǎn)者模塊spring-boot-starter-mqtt-provider 相關(guān)的代碼,實(shí)現(xiàn)消息發(fā)布的功能模塊。

4.2.1 導(dǎo)入依賴包

修改pom.xml 文件,添加MQTT相關(guān)依賴,具體示例代碼如下所示:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.3.2.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
        </dependency>

    </dependencies>
4.2.2 修改配置文件

修改application.yml配置文件,增加MQTT相關(guān)配置。示例代碼如下所示:

spring:
  application:
    name: provider
    #MQTT配置信息
  mqtt:
    #MQTT服務(wù)地址,端口號(hào)默認(rèn)11883,如果有多個(gè),用逗號(hào)隔開
    url: tcp://127.0.0.1:11883
    #用戶名
    username: admin
    #密碼
    password: public
    #客戶端id(不能重復(fù))
    client:
      id: provider-id
    #MQTT默認(rèn)的消息推送主題,實(shí)際可在調(diào)用接口是指定
    default:
      topic: topic
server:
  port: 8080
4.2.3 消息生產(chǎn)者客戶端配置

創(chuàng)建MqttProviderConfig配置類,讀取application.yml中的相關(guān)配置,并初始化創(chuàng)建MQTT的連接。示例代碼如下所示:

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Configuration
@Slf4j
public class MqttProviderConfig {
    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    /**
     * 客戶端對(duì)象
     */
    private MqttClient client;

    /**
     * 在bean初始化后連接到服務(wù)器
     */
    @PostConstruct
    public void init(){
        connect();
    }

    /**
     * 客戶端連接服務(wù)端
     */
    public void connect(){
        try{
            //創(chuàng)建MQTT客戶端對(duì)象
            client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
            //連接設(shè)置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,設(shè)置false表示服務(wù)器會(huì)保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之后能獲取到服務(wù)器在客戶端斷開連接期間推送的消息
            //設(shè)置為true表示每次連接服務(wù)器都是以新的身份
            options.setCleanSession(true);
            //設(shè)置連接用戶名
            options.setUserName(username);
            //設(shè)置連接密碼
            options.setPassword(password.toCharArray());
            //設(shè)置超時(shí)時(shí)間,單位為秒
            options.setConnectionTimeout(100);
            //設(shè)置心跳時(shí)間 單位為秒,表示服務(wù)器每隔 1.5*20秒的時(shí)間向客戶端發(fā)送心跳判斷客戶端是否在線
            options.setKeepAliveInterval(20);
            //設(shè)置遺囑消息的話題,若客戶端和服務(wù)器之間的連接意外斷開,服務(wù)器將發(fā)布客戶端的遺囑信息
            options.setWill("willTopic",(clientId + "與服務(wù)器斷開連接").getBytes(),0,false);
            //設(shè)置回調(diào)
            client.setCallback(new MqttProviderCallBack());
            client.connect(options);
        } catch(MqttException e){
            e.printStackTrace();
        }
    }
    public void publish(int qos,boolean retained,String topic,String message){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        mqttMessage.setPayload(message.getBytes());
        //主題的目的地,用于發(fā)布/訂閱信息
        MqttTopic mqttTopic = client.getTopic(topic);
        //提供一種機(jī)制來跟蹤消息的傳遞進(jìn)度
        //用于在以非阻塞方式(在后臺(tái)運(yùn)行)執(zhí)行發(fā)布是跟蹤消息的傳遞進(jìn)度
        MqttDeliveryToken token;
        try {
            //將指定消息發(fā)布到主題,但不等待消息傳遞完成,返回的token可用于跟蹤消息的傳遞狀態(tài)
            //一旦此方法干凈地返回,消息就已被客戶端接受發(fā)布,當(dāng)連接可用,將在后臺(tái)完成消息傳遞。
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
4.2.4 生產(chǎn)者客戶端消息回調(diào)

創(chuàng)建MqttProviderCallBack類并繼承MqttCallback,實(shí)現(xiàn)相關(guān)消息回調(diào)事件,示例代碼如下圖所示:

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttConsumerCallBack implements MqttCallback{

    /**
     * 客戶端斷開連接的回調(diào)
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("與服務(wù)器斷開連接,可重連");
    }

    /**
     * 消息到達(dá)的回調(diào)
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println(String.format("接收消息主題 : %s",topic));
        System.out.println(String.format("接收消息Qos : %d",message.getQos()));
        System.out.println(String.format("接收消息內(nèi)容 : %s",new String(message.getPayload())));
        System.out.println(String.format("接收消息retained : %b",message.isRetained()));
    }

    /**
     * 消息發(fā)布成功的回調(diào)
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println(String.format("接收消息成功"));
    }
}
4.2.5 創(chuàng)建Controller控制器實(shí)現(xiàn)消息發(fā)布功能

創(chuàng)建SendController控制器類,實(shí)現(xiàn)消息的發(fā)送功能,示例代碼如下所示:

@Controller
public class SendController {
    @Autowired
    private MqttProviderConfig providerClient;

    @RequestMapping("/sendMessage")
    @ResponseBody
    public String sendMessage(int qos,boolean retained,String topic,String message){
        try {
            providerClient.publish(qos, retained, topic, message);
            return "發(fā)送成功";
        } catch (Exception e) {
            e.printStackTrace();
            return "發(fā)送失敗";
        }
    }
}

4.3 實(shí)現(xiàn)消費(fèi)者

前面完成了生成者消息發(fā)布的模塊,接下來修改消費(fèi)者模塊spring-boot-starter-mqtt-consumer實(shí)現(xiàn)消息訂閱、處理的功能。

4.3.1 導(dǎo)入依賴包

修改pom.xml 文件,添加MQTT相關(guān)依賴,具體示例代碼如下所示:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.3.2.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
        </dependency>

    </dependencies>
4.3.2 修改配置文件

修改application.yml配置文件,增加MQTT相關(guān)配置。示例代碼如下所示:

spring:
  application:
    name: consumer
  #MQTT配置信息
  mqtt:
    #MQTT服務(wù)端地址,端口默認(rèn)為11883,如果有多個(gè),用逗號(hào)隔開
    url: tcp://127.0.0.1:11883
    #用戶名
    username: admin
    #密碼
    password: public
    #客戶端id(不能重復(fù))
    client:
      id: consumer-id
    #MQTT默認(rèn)的消息推送主題,實(shí)際可在調(diào)用接口時(shí)指定
    default:
      topic: topic
server:
  port: 8085
4.3.3 消費(fèi)者客戶端配置

創(chuàng)建消費(fèi)者客戶端配置類MqttConsumerConfig,讀取application.yml中的相關(guān)配置,并初始化創(chuàng)建MQTT的連接。示例代碼如下所示:

import javax.annotation.PostConstruct;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqttConsumerConfig {
    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    /**
     * 客戶端對(duì)象
     */
    private MqttClient client;

    /**
     * 在bean初始化后連接到服務(wù)器
     */
    @PostConstruct
    public void init(){
        connect();
    }

    /**
     * 客戶端連接服務(wù)端
     */
    public void connect(){
        try {
            //創(chuàng)建MQTT客戶端對(duì)象
            client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
            //連接設(shè)置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連接記錄,客戶端重連之后能獲取到服務(wù)器在客戶端斷開連接期間推送的消息
            //設(shè)置為true表示每次連接到服務(wù)端都是以新的身份
            options.setCleanSession(true);
            //設(shè)置連接用戶名
            options.setUserName(username);
            //設(shè)置連接密碼
            options.setPassword(password.toCharArray());
            //設(shè)置超時(shí)時(shí)間,單位為秒
            options.setConnectionTimeout(100);
            //設(shè)置心跳時(shí)間 單位為秒,表示服務(wù)器每隔1.5*20秒的時(shí)間向客戶端發(fā)送心跳判斷客戶端是否在線
            options.setKeepAliveInterval(20);
            //設(shè)置遺囑消息的話題,若客戶端和服務(wù)器之間的連接意外斷開,服務(wù)器將發(fā)布客戶端的遺囑信息
            options.setWill("willTopic",(clientId + "與服務(wù)器斷開連接").getBytes(),0,false);
            //設(shè)置回調(diào)
            client.setCallback(new MqttConsumerCallBack());
            client.connect(options);
            //訂閱主題
            //消息等級(jí),和主題數(shù)組一一對(duì)應(yīng),服務(wù)端將按照指定等級(jí)給訂閱了主題的客戶端推送消息
            int[] qos = {1,1};
            //主題
            String[] topics = {"topic1","topic2"};
            //訂閱主題
            client.subscribe(topics,qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 斷開連接
     */
    public void disConnect(){
        try {
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }


    /**
     * 訂閱主題
     */
    public void subscribe(String topic,int qos){
        try {
            client.subscribe(topic,qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
4.3.4 消費(fèi)者客戶端消息回調(diào)

創(chuàng)建MqttConsumerCallBack類并繼承MqttCallback,實(shí)現(xiàn)相關(guān)消息回調(diào)事件,示例代碼如下圖所示:

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttConsumerCallBack implements MqttCallback{

    /**
     * 客戶端斷開連接的回調(diào)
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("與服務(wù)器斷開連接,可重連");
    }

    /**
     * 消息到達(dá)的回調(diào)
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println(String.format("接收消息主題 : %s",topic));
        System.out.println(String.format("接收消息Qos : %d",message.getQos()));
        System.out.println(String.format("接收消息內(nèi)容 : %s",new String(message.getPayload())));
        System.out.println(String.format("接收消息retained : %b",message.isRetained()));
    }

    /**
     * 消息發(fā)布成功的回調(diào)
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println(String.format("接收消息成功"));
    }
}
4.3.5 創(chuàng)建Controller控制器,實(shí)現(xiàn)MQTT連接的建立和斷開

接下來,創(chuàng)建Controller控制器MqttController,并實(shí)現(xiàn)MQTT連接的建立和斷開等方法。示例代碼如下所示:

import com.weiz.mqtt.config.MqttConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class MqttController {
    @Autowired
    private MqttConsumerConfig client;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @RequestMapping("/connect")
    @ResponseBody
    public String connect(){
        client.connect();
        return clientId + "連接到服務(wù)器";
    }

    @RequestMapping("/disConnect")
    @ResponseBody
    public String disConnect(){
        client.disConnect();
        return clientId + "與服務(wù)器斷開連接";
    }
}

4.4 測(cè)試驗(yàn)證

首先,分別啟動(dòng)生產(chǎn)者spring-boot-starter-mqtt-provider 和消費(fèi)者spring-boot-starter-mqtt-consumer兩個(gè)項(xiàng)目,打開瀏覽器,輸入地址http://localhost:18083/,在EMQX管理界面可以看到連接上來的兩個(gè)客戶端。如下圖所示:

MQTT,如何在SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布,Java,SpringBoot,spring boot,java

接下來,調(diào)用生產(chǎn)者的消息發(fā)布接口驗(yàn)證消息發(fā)布是否成功。使用Pomstman調(diào)用消息發(fā)送接口:http://localhost:8080/sendMessage ,如下圖所示:

MQTT,如何在SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布,Java,SpringBoot,spring boot,java

通過上圖可以發(fā)現(xiàn),生產(chǎn)者模塊已經(jīng)把消息發(fā)送成功。接下來查看消費(fèi)者模塊,驗(yàn)證消息是否處理成功。如下圖所示:

MQTT,如何在SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布,Java,SpringBoot,spring boot,java

通過日志輸出可以發(fā)現(xiàn),消費(fèi)者已經(jīng)成功接收到生產(chǎn)者發(fā)送的消息,說明我們成功實(shí)現(xiàn)在Spring Boot項(xiàng)目中整合MQTT實(shí)現(xiàn)了消息的發(fā)布和訂閱的功能。文章來源地址http://www.zghlxwxcb.cn/news/detail-693343.html

到了這里,關(guān)于MQTT,如何在SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包