国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

EMQ的介紹及整合SpringBoot的使用

這篇具有很好參考價(jià)值的文章主要介紹了EMQ的介紹及整合SpringBoot的使用。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

首先先了解一下底層的協(xié)議:

1. MQTT

MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測傳輸協(xié)議),是一種基于發(fā)布/訂閱 (publish/subscribe)模式的"輕量級"通訊協(xié)議,該協(xié)議構(gòu)建于TCP/IP協(xié)議上,由IBM在1999年發(fā)布。 MQTT最大優(yōu)點(diǎn)在于,可以以極少的代碼和有限的帶寬,為連接遠(yuǎn)程設(shè)備提供實(shí)時(shí)可靠的消息服務(wù)。作 為一種低開銷、低帶寬占用的即時(shí)通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設(shè)備、移動應(yīng)用等方面有較廣泛的應(yīng) 用。

由于物聯(lián)網(wǎng)的環(huán)境是非常特別的,所以MQTT遵循以下設(shè)計(jì)原則:

(1)精簡,不添加可有可無的功能;
(2)發(fā)布/訂閱(Pub/Sub)模式,方便消息在傳感器之間傳遞,解耦Client/Server模式,帶來的 好處在于不必預(yù)先知道對方的存在(ip/port),不必同時(shí)運(yùn)行;
(3)允許用戶動態(tài)創(chuàng)建主題(不需要預(yù)先創(chuàng)建主題),零運(yùn)維成本;
(4)把傳輸量降到最低以提高傳輸效率;
(5)把低帶寬、高延遲、不穩(wěn)定的網(wǎng)絡(luò)等因素考慮在內(nèi);
(6)支持連續(xù)的會話保持和控制(心跳);
(7)理解客戶端計(jì)算能力可能很低;
(8)提供服務(wù)質(zhì)量( quality of service level:QoS)管理
(9)不強(qiáng)求傳輸數(shù)據(jù)的類型與格式,保持靈活性(指的是應(yīng)用層業(yè)務(wù)數(shù)據(jù))

2. MQTT協(xié)議實(shí)現(xiàn)方式:

MQTT傳輸?shù)南⒎譃椋褐黝}(Topic)和負(fù)載(payload)兩部分:

(1)Topic, 可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內(nèi)容 (payload);

(2)payload, 可以理解為消息的內(nèi)容,是指訂閱者具體要使用的內(nèi)容。

3. Qos: 消息服務(wù)質(zhì)量(Quality of Service)

MQTT 設(shè)計(jì)了 3 個(gè) QoS 等級。

理解: Qos: 規(guī)定自己想要發(fā)出,或者接收到的消息的規(guī)則

  • QoS 0:消息最多傳遞一次,如果當(dāng)時(shí)客戶端不可用,則會丟失該消息。

“至多一次”,消息發(fā)布完全依賴底層TCP/IP網(wǎng)絡(luò)。會發(fā)生消息丟失或重復(fù)。這一級別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無所謂,因?yàn)椴痪煤筮€會有第二次發(fā)送。這一種方 式主要普通APP的推送,倘若你的智能設(shè)備在消息推送時(shí)未聯(lián)網(wǎng),推送過去沒收到,再次聯(lián)網(wǎng)也就 收不到了。

  • QoS 1:消息傳遞至少 1 次。

“至少一次”,確保消息到達(dá),但消息重復(fù)可能會發(fā)生

  • QoS 2:消息僅傳送一次。

“只有一次”,確保消息到達(dá)一次。在一些要求比較嚴(yán)格的計(jì)費(fèi)系統(tǒng)中,可以使用此級別。在計(jì)費(fèi) 系統(tǒng)中,消息重復(fù)或丟失會導(dǎo)致不正確的結(jié)果。這種最高質(zhì)量的消息發(fā)布服務(wù)還可以用于即時(shí)通訊類的 APP的推送,確保用戶收到且只會收到一次

MQTT 發(fā)布與訂閱操作中的 QoS 代表了不同的含義:
發(fā)布時(shí)的 QoS 表示消息發(fā)送到服務(wù)端時(shí)使用的 QoS
訂閱時(shí)的 QoS 表示服務(wù)端向自己轉(zhuǎn)發(fā)消息時(shí)可以使用的最大 QoS。

基本都是用QoS2

MQTT 發(fā)布與訂閱操作中的 QoS 代表了不同的含義,發(fā)布時(shí)的 QoS 表示消息發(fā)送到服務(wù)端時(shí)使用的 QoS,訂閱時(shí)的 QoS 表示服務(wù)端向自己轉(zhuǎn)發(fā)消息時(shí)可以使用的最大 QoS。

  • 當(dāng)客戶端 A 的發(fā)布 QoS 大于客戶端 B 的訂閱 QoS 時(shí),服務(wù)端向客戶端 B 轉(zhuǎn)發(fā)消息時(shí)使用的 QoS 為客戶端 B 的訂閱 QoS。
  • 當(dāng)客戶端 A 的發(fā)布 QoS 小于客戶端 B 的訂閱 QoS 時(shí),服務(wù)端向客戶端 B 轉(zhuǎn)發(fā)消息時(shí)使用的 QoS 為客戶端 A 的發(fā)布 QoS。
總結(jié): 也就是說QoS這個(gè)東西的設(shè)置是對消息的接收來做保證的,即如果是
QoS 1 我只負(fù)責(zé)發(fā)一次,收得到收不到我不管. 當(dāng)網(wǎng)絡(luò)狀態(tài)不穩(wěn)定的時(shí)候就會出現(xiàn)丟失現(xiàn)象
QoS 2 是能夠保證至少收到一次,但是存在重復(fù)消費(fèi)的問題. 
QoS 3 保證只有一條信息到達(dá)  

* 如果發(fā)布和訂閱的客戶端服務(wù)質(zhì)量等級不相同時(shí),誰的低按誰的為準(zhǔn)
4. Topic通配符匹配規(guī)則
  1. 層級分隔符 /

/ 用來分割主題樹的每一層,并給主題空間提供分等級的結(jié)構(gòu)。當(dāng)兩個(gè)通配符在一個(gè)主題中出現(xiàn)的 時(shí)候,主題層次分隔符的使用是很重要的。

示例:
love/you/with/all/my/heart
  1. 多層通配符 #

多層通配符有可以表示大于等于0的層次。因此,love/#也可匹配到單獨(dú)的love,此時(shí)#代表0 層。

多層通配符一定要是主題樹的最后一個(gè)字符。比如說,love/#是有效的,但是love/#/with是無效 的。

  1. 單層通配符 +

只匹配主題的一層

1. love/you/+ :匹配love/you/with和love/you/and,但是不匹配
love/you/with/all/my/heart。
2. 單層通配符只匹配1層,love/+不匹配love。
3. 單層通配符可以被用于主題樹的任意層級,連帶多層通配符。它必須被用在主題層級分隔符/的右邊,除非它是指定自己。因此,+和love/+都是有效的,但是love+無效。單層通配符可以用在主題樹的末端,也可以用在中間。比如說,love/+和love/+/with都是有效。

通配符注意事項(xiàng):

1.主題層次分隔符被用來在主題中引入層次。多層的通配符和單層通配符可以被使用,但他們不能被使用來做發(fā)布者的消息。
2.Topic命名盡量見名知意,符合規(guī)范,主題名字是大小寫敏感的。比如說,love和LOVE是兩個(gè)不同的主題。
3.以/開頭會產(chǎn)生一個(gè)不同的主題。比如說,/love與love不同。/love匹配"+/+"和/+,但不匹配+
4.不要在任何主題中包含null(Unicode \x0000)字符。
5.在主題樹中,長度被限制于64k內(nèi)但是在這以內(nèi)沒有限制層級的數(shù)目 。
6.可以有任意數(shù)目的根節(jié)點(diǎn);也就是說,可以有任意數(shù)目的主題樹。
1.EMQ X

EMQ X 是開源社區(qū)中最流行的 MQTT 消息服務(wù)器。

Windows安裝emqx_windows emqx安裝_羅小爬EX的博客-CSDN博客

優(yōu)點(diǎn):

單機(jī)能支持百萬的 MQTT 連接;集群能支持千 萬級別的 MQTT 連接;

支持豐富的物聯(lián)網(wǎng)協(xié)議,包括 MQTT、MQTT-SN、CoAP、 LwM2M、LoRaWAN 和 WebSocket 等;

2. Dashboard(可視化界面)

EMQ X 提供了 Dashboard 以方便用戶管理設(shè)備與監(jiān)控相關(guān)指標(biāo)。通過 Dashboard可以查看服務(wù)器基本 信息、負(fù)載情況和統(tǒng)計(jì)數(shù)據(jù),可以查看某個(gè)客戶端的連接狀態(tài)等信息甚至斷開其連接,也可以動態(tài)加載 和卸載指定插件。除此之外,EMQ X Dashboard 還提供了規(guī)則引擎的可視化操作界面,同時(shí)集成了一 個(gè)簡易的 MQTT 客戶端工具供用戶測試使用。

3. MQTTX

模擬客戶端

MQTTX:跨平臺 MQTT 5.0 桌面客戶端工具

MQTT X 是 EMQ 開源的一款優(yōu)雅的跨平臺 MQTT 5.0 桌面客戶端,它支持 macOS, Linux, Windows。 MQTT X 的 UI 采用了聊天界面形式,簡化了頁面操作邏輯,用戶可以快速創(chuàng)建連接,允許保存多個(gè)客 戶端,方便用戶快速測試 MQTT/MQTTS 連接,及 MQTT 消息的訂閱和發(fā)布。

發(fā)送消息

這里一定要記住是在哪方加前綴

1. 消息延遲發(fā)布

此功能由 emqx_mod_delayed 模塊提供,需要開啟模塊后才能使用此功能。

$delayed/{DelayInteval}/{TopicName} 單位: S

當(dāng)客戶端使 用特殊主題前綴 $delayed/{DelayInteval} 發(fā)布消息到 EMQ X 時(shí),將觸發(fā)延遲發(fā)布功能

示例:

$delayed/15/x/y : 15 秒后將 MQTT 消息發(fā)布到主題 x/y 。

$delayed/60/a/b : 1 分鐘后將 MQTT 消息發(fā)布到 a/b

示例: 在MQTT X上演示

emq連接,微服務(wù),網(wǎng)絡(luò),服務(wù)器,java,EMQ,物聯(lián)網(wǎng)

現(xiàn)在在MQTTX上模擬四個(gè)客戶端: 洗衣機(jī),空調(diào),電視機(jī),手機(jī)

現(xiàn)在讓洗衣機(jī),空調(diào),電視機(jī)訂閱phoneMessage這個(gè)主題, 消息服務(wù)質(zhì)量設(shè)置為2,以后都設(shè)置為2

各個(gè)實(shí)例訂閱的主題和發(fā)送的主題如下:

洗衣機(jī)客戶端訂閱主題: phoneMessage
空調(diào)客戶端訂閱主題:   phoneMessage
電視機(jī)客戶端訂閱主題: phoneMessage

手機(jī)發(fā)送的主題: $delayed/2/phoneMessage       // 2 秒后發(fā)送到訂閱phoneMessage的客戶端
emq連接,微服務(wù),網(wǎng)絡(luò),服務(wù)器,java,EMQ,物聯(lián)網(wǎng)

在手機(jī)客戶端上發(fā)送消息: 注意此時(shí)特殊主題的前綴在發(fā)布者上

emq連接,微服務(wù),網(wǎng)絡(luò),服務(wù)器,java,EMQ,物聯(lián)網(wǎng)

兩秒后,其他訂閱這個(gè)主題的客戶端都收到了消息.

emq連接,微服務(wù),網(wǎng)絡(luò),服務(wù)器,java,EMQ,物聯(lián)網(wǎng)
2. 共享訂閱

注意注意注意: ??

共享訂閱的主題格式是針對訂閱端來指定的,例如: $share/group1/cookie ;而消息的發(fā)布方是向主 題: cookie 發(fā)布消息。這樣在訂閱方才能達(dá)到負(fù)載均衡的效果。

共享訂閱是在多個(gè)訂閱者之間實(shí)現(xiàn)負(fù)載均衡的訂閱方式:

EMQ X 支持兩種格式的共享訂閱前綴:

示例 前綴 真實(shí)主題名
方式一(不帶群組的共享訂閱) $queue/name/cookie $queue/ name/cookie
方式二(帶群組的共享訂閱) $share/group1/cookie $share/group1/ cookie

應(yīng)用場景:

1?? . 對于方式一不帶群組的共享訂閱:

在我們?nèi)″X完成時(shí),我們要求需要向當(dāng)前用戶的手機(jī)發(fā)送一條短信, 為了提高服務(wù)的容錯性,我們準(zhǔn)備了多臺發(fā)短信的服務(wù), 但是我們要求只發(fā)送一條短信,此時(shí)我們就可以使用 方式一, 使只有一臺機(jī)器收到發(fā)送短信的消息如下, 只會有一臺機(jī)器收到

emq連接,微服務(wù),網(wǎng)絡(luò),服務(wù)器,java,EMQ,物聯(lián)網(wǎng)

各個(gè)實(shí)例訂閱的主題和發(fā)送的主題如下:

消息服務(wù)實(shí)例1訂閱的主題:  $queue/cookie
消息服務(wù)實(shí)例2訂閱的主題:  $queue/cookie
消息服務(wù)實(shí)例3訂閱的主題:  $queue/cookie

發(fā)布者發(fā)布的主題:               cookie
2?? 對于方式二帶群組的共享訂閱:

$share/group1/cookie

帶分組的使用的特殊前綴是: $share/{group}/{TopicName}

使用場景如下:

在方式一的上面做一點(diǎn)點(diǎn)更改, 就是在發(fā)布者消息發(fā)送成功后, 不僅僅需要有一臺消息服務(wù)實(shí)例去發(fā)送消息, 也需要有一臺郵件服務(wù)實(shí)例去發(fā)送郵件. 所以這里就可以進(jìn)行分組

message 組 : 消息服務(wù)實(shí)例1 消息服務(wù)實(shí)例2 消息服務(wù)實(shí)例2

email組: 郵件服務(wù)實(shí)例1 郵件服務(wù)實(shí)例2

emq連接,微服務(wù),網(wǎng)絡(luò),服務(wù)器,java,EMQ,物聯(lián)網(wǎng)

各個(gè)實(shí)例訂閱的主題和發(fā)送的主題如下:

(message組)
    消息服務(wù)實(shí)例1   $share/message/cookie
    消息服務(wù)實(shí)例2   $share/message/cookie
    消息服務(wù)實(shí)例2   $share/message/cookie

(email組)
    郵件服務(wù)實(shí)例1   $share/email/cookie
    郵件服務(wù)實(shí)例2   $share/email/cookie

    發(fā)布者實(shí)例       cookie

此時(shí)就可以保證兩個(gè)組中都會有一個(gè)實(shí)例服務(wù)收到消息.

emq連接,微服務(wù),網(wǎng)絡(luò),服務(wù)器,java,EMQ,物聯(lián)網(wǎng)

負(fù)載均衡策略修改默認(rèn) 隨機(jī)

在EMQ X 服務(wù)的 etc/emqx.conf中修改

broker.shared_subscription_strategy = random

負(fù)載均衡策略 描述
random 在所有訂閱中隨機(jī)選擇
round_robin 按照訂閱順序輪詢
sticky 一直發(fā)往上次選取的訂閱者
hash 按照發(fā)布者ClientID的哈希值
原生Mqtt代碼的使用??
1.依賴導(dǎo)入
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
2. 編寫Controller類

說明下面的一些配置也可以配置在yml文件中,這里只是演示(消息的發(fā)送和接受)所以就直接配置通過Set方法配置了

可以通過@ConfigurationProperties(prefix = "")寫一個(gè)配置類

這里的server.port= 8888

@RestController
public class TestController {
    
    /**
     * 發(fā)布消息
     * @throws MqttException
     */
    @GetMapping("/publish")
    public void publish() throws MqttException {

        MqttClientPersistence persistence = new MemoryPersistence();  //內(nèi)存持久化
        // 服務(wù)器地址以及本機(jī)的clientid
        MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "abc", persistence);
        //連接選項(xiàng)中定義用戶名密碼和其它配置
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);//參數(shù)為true表示清除緩存,也就是非持久化訂閱者,這個(gè)時(shí)候只要參數(shù)設(shè)為true,一定是非持久化訂閱者。而參數(shù)設(shè)為false時(shí),表示服務(wù)器保留客戶端的連接記錄
        options.setAutomaticReconnect(true);//是否自動重連
        options.setConnectionTimeout(30);//連接超時(shí)時(shí)間  秒
        options.setKeepAliveInterval(10);//連接保持檢查周期  秒
        options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本
        client.connect(options);//連接
        client.publish("testTopic", "發(fā)送內(nèi)容".getBytes(), 2, false);
    }


    /**
     * 訂閱消息
     * @throws MqttException
     */
    @GetMapping("/subscribe")
    public void subscribe() throws MqttException {

        MqttClientPersistence persistence = new MemoryPersistence();;//內(nèi)存持久化
        // 服務(wù)器地址以及本機(jī)的clientid
        MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "xyz", persistence);

        //連接選項(xiàng)中定義用戶名密碼和其它配置
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);//參數(shù)為true表示清除緩存,也就是非持久化訂閱者,這個(gè)時(shí)候只要參數(shù)設(shè)為true,一定是非持久化訂閱者。而參數(shù)設(shè)為false時(shí),表示服務(wù)器保留客戶端的連接記錄
        options.setAutomaticReconnect(true);//是否自動重連
        options.setConnectionTimeout(30);//連接超時(shí)時(shí)間  秒
        options.setKeepAliveInterval(10);//連接保持檢查周期  秒
        options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本
        
		// 設(shè)置回調(diào)函數(shù)    匿名內(nèi)部類
        client.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectionLost(Throwable throwable) {
                System.out.println("連接丟失!");
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                System.out.println( "接收到消息  topic:" +s+"  id:"+mqttMessage.getId() +" message:"+ mqttMessage.toString());
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }

            @Override
            public void connectComplete(boolean b, String s) {
                System.out.println("連接成功!");
            }
        });
        client.connect(options);//連接
        client.subscribe("testTopic");  //訂閱主題

    }
}

訪問: http://localhost:8888/publish

訪問: http://localhost:8888/subscribe

第一次訪問兩個(gè)地址,首先會先EMQ注冊自己,和EMQ建立連接(但是一定要快,如果到了10秒任務(wù)你就掛了)

等到第二次訪問,已經(jīng)建立過,所以就可以發(fā)送消息,和接收消息,此時(shí)再看控制臺.

emq連接,微服務(wù),網(wǎng)絡(luò),服務(wù)器,java,EMQ,物聯(lián)網(wǎng)

emq連接,微服務(wù),網(wǎng)絡(luò),服務(wù)器,java,EMQ,物聯(lián)網(wǎng)

OK,原始的方法建立完畢

現(xiàn)在可以設(shè)想,我們在一個(gè)項(xiàng)目中一般不會只發(fā)一次請求, 通過上面的代碼我們可以發(fā)現(xiàn)每一次的發(fā)送其實(shí)有絕大部分內(nèi)容是相同的.對于發(fā)送者來說我們要變的只有主題和消息 而對于接收者來說需要根據(jù)不同的主題來設(shè)計(jì)不同的處理方案, 即根據(jù)不同的主題調(diào)用不同的方案來解決(是不是有一點(diǎn)多態(tài)的意思了), 那么有沒有一種解決方案來解決這樣的問題. bingo~ 必然.

首先先認(rèn)識一個(gè)設(shè)計(jì)模式: 策略模式 (解決多個(gè) if-else 問題)

策略模式詳解 - 知乎 (zhihu.com)

在策略模式(Strategy Pattern)中,一個(gè)類的行為或其算法可以在運(yùn)行時(shí)更改。我們要處理的就是上面高亮的部分

在策略模式中,我們創(chuàng)建表示各種策略的對象和一個(gè)行為隨著策略對象改變而改變的 context 對象。策略對象改變 context 對象的執(zhí)行算法。

大家可以看上面知乎中的鏈接的內(nèi)容; 理解一下在支付時(shí)的策略模式的使用,然后在看我們自己的實(shí)現(xiàn).

使用策略模式改造

注:
一般的策略模式大概是這樣:

  • 定義策略接口
  • 定義不同策略實(shí)現(xiàn)類
  • 提供策略工廠,便于根據(jù)策略枚舉獲取不同策略實(shí)現(xiàn)

而在策略比較簡單的情況下,我們完全可以用枚舉代替策略工廠,簡化策略模式。

1. 配置文件
server:
  port: 8082
mqtt:
  client:
    username: admin
    password: public
    serverURI: tcp://192.168.200.128:1883
    clientId: monitor.task.${random.int[1000,9999]} # 注意: emq的客戶端id 不能重復(fù)
    keepAliveInterval: 10  #連接保持檢查周期  秒
    connectionTimeout: 30 #連接超時(shí)時(shí)間  秒
  producer:
    defaultQos: 2
    defaultRetained: false
    defaultTopic: topic/test1
  consumer:
    consumerTopics: $queue/cookie/#, $share/group1/yfs1024  #不帶群組的共享訂閱    多個(gè)主題逗號隔開
    # $queue/cookie/#
    # 以$queue開頭,不帶群組的共享訂閱   多個(gè)客戶端只能有一個(gè)消費(fèi)者消費(fèi)

    # $share/group1/yfs1024
    # 以$share開頭,群組的共享訂閱 多個(gè)客戶端訂閱
    # 如果在一個(gè)組 只能有一個(gè)消費(fèi)者消費(fèi)
    # 如果不在一個(gè)組 都可以消費(fèi)

對應(yīng)配置類

@Data
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "mqtt.client")
public class MqttProperties {
    private int defaultProducerQos;
    private boolean defaultRetained;
    private String defaultTopic;
    private String username;
    private String password;
    private String serverURI;
    private String clientId;
    private int keepAliveInterval;
    private int connectionTimeout;
}

2. 封裝消息發(fā)送者

下面通過方法重載的方式, 接收不同個(gè)數(shù)的參數(shù)

@Component
@Slf4j
public class MqttProducer {
	// @Value() 讀取配置 當(dāng)然也可以批量讀取配置,這里就一個(gè)一個(gè)了
    @Value("${mqtt.producer.defaultQos}")
    private int defaultProducerQos;
    @Value("${mqtt.producer.defaultRetained}")
    private boolean defaultRetained;
    @Value("${mqtt.producer.defaultTopic}")
    private String defaultTopic;

    @Autowired
    private MqttClient mqttClient;

    public void send(String payload) {
        this.send(defaultTopic, payload);
    }

    public void send(String topic, String payload) {
        this.send(topic, defaultProducerQos, payload);
    }

    public void send(String topic, int qos, String payload) {
        this.send(topic, qos, defaultRetained, payload);
    }

    public void send(String topic, int qos, boolean retained, String payload) {
        try {
            mqttClient.publish(topic, payload.getBytes(), qos, retained);
        } catch (MqttException e) {
            log.error("publish msg error.",e);
        }
    }

    public <T> void send(String topic, int qos, T msg) throws JsonProcessingException {
        String payload = JsonUtil.serialize(msg);
        this.send(topic,qos,payload);
    }
}
3. 定義配置類

說明一下,一般對于一臺服務(wù)來說發(fā)送者和接受者使用一個(gè)mqtt連接,所以這里配置了一個(gè)mqttClient

@Configuration
@Data
@Slf4j
public class MqttConfig {

    // 注入配置
    @Autowired
    private MqttProperties mqtt;
    // 注入回調(diào)函數(shù)
    @Autowired
    private MqttCallback mqttCallback;
    
    @Bean
    public MqttClient mqttClient() {
        try {
            MqttClient client = new MqttClient(mqtt.getServerURI(), mqtt.getClientId(), new MemoryPersistence());
            client.setManualAcks(true); //設(shè)置手動消息接收確認(rèn)
            mqttCallback.setMqttClient(client);
            client.setCallback(mqttCallback);
            client.connect(mqttConnectOptions());
            return client;
        } catch (MqttException e) {
            log.error("emq connect error",e);
            return null;
        }
    }

    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        // 下面有一些配置是寫死的, 如果項(xiàng)目需要最好還是寫配置文件中, 這樣后面可以通過注冊中心熱更新配置文件
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(mqtt.getUsername());
        options.setPassword(mqtt.getPassword().toCharArray());
        options.setAutomaticReconnect(true);//是否自動重新連接
        options.setCleanSession(true);//是否清除之前的連接信息
        options.setConnectionTimeout(mqtt.getConnectionTimeout());//連接超時(shí)時(shí)間
        options.setKeepAliveInterval(mqtt.getKeepAliveInterval());//心跳
        options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);//設(shè)置mqtt版本

        return options;
    }
}

簡單測試一下:

這里發(fā)送一個(gè)對象

@Data
public class User {
    private String id;
    private String name;
    private Integer age;
}
@RestController
public class MyPublish {

    @Autowired
    private MqttProducer mqttProducer;

    @GetMapping("/testPublish")
    public void testSend(){
        User user = new User();
        user.setId("123");
        user.setName("張三");
        user.setAge(18);
        mqttProducer.send("cookie",2,user);
    }

}

訪問: localhost:9999/testPublish

OK,現(xiàn)在消息的發(fā)送是已經(jīng)完成了

我們在Dashboard中訂閱這個(gè)主題并查看結(jié)果:

emq連接,微服務(wù),網(wǎng)絡(luò),服務(wù)器,java,EMQ,物聯(lián)網(wǎng)

發(fā)送消息已經(jīng)沒有問題了, 注入MqttProducer調(diào)用send方法就可以了, 下面就來解決消息接收的問題.

還記得上面說的嘛, 對于接收來說,主要的瓶頸在于根據(jù)不同的主題來處理不同的消息, 比如訂閱了cookie,和yfs1024這兩個(gè)主題可以通過如下代碼實(shí)現(xiàn):

在接收到的參數(shù)中通過判斷來確定處理的方法:

@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
	if(topic.equals("cookie")){
        // cookie 主題的處理邏輯
    }else if(topic.equals("yfs1024")){
        // yfs1024 主題的處理邏輯
    }else if(xxx){ // 其他的主題
	    .........        
    }
}

這種方法可以沒有問題, 但是每次都需要通過修改這里的代碼來處理業(yè)務(wù)邏輯, 就像上面知乎文章所說的雖然寫起來簡單, 但是違反了面向?qū)ο蟮膬蓚€(gè)設(shè)計(jì)原則, 單一職責(zé)原則,開閉原則。

好那么下面就按照策略模式來對接收消息的代碼進(jìn)行改造。

4. 改造接收代碼
1. 定義消息處理接口

大致的邏輯: 因?yàn)槭遣煌闹黝}擁有不同的處理邏輯, 即 一個(gè)主題對應(yīng)一個(gè)處理類, 我們要做的就是通過主題拿到這個(gè)主題的處理類

/**
 * 消息處理接口
 */
public interface MsgHandler{
    void process(String jsonMsg) throws IOException;
}

后面讓每個(gè)主題的處理類來實(shí)現(xiàn)這個(gè)接口就可以

@Component
public class CookieHandler implements MsgHandler {
    @Override
    public void process(String jsonMsg) throws IOException {
        // 解析       這里使用了自己封裝JSON 工具類, 我放最下面了
        User byJson = JsonUtil.getByJson(jsonMsg, User.class);
        System.out.println("CookieHandler 接受到消息:" + byJson);
    }
}

我們還需要根據(jù)主題使用不同的處理器, 怎么辦呢? 可以通過注解中的參數(shù)的值區(qū)分(通過反射獲取)

2. 定義主題注解

用來表示當(dāng)前類所處理的注解.

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Topic {
    String value();
}

對上面進(jìn)行改造如下:

@Component
@Topic("cookie")
public class CookieHandler implements MsgHandler {
    @Override
    public void process(String jsonMsg) throws IOException {
        // 解析       
        User byJson = JsonUtil.getByJson(jsonMsg, User.class);
        System.out.println("CookieHandler 接受到消息:" + byJson);
    }
}
3. 將主題與對應(yīng)處理類建立映射

現(xiàn)在處理主題的方法類有了, 對每個(gè)處理類也做了對應(yīng)主題的標(biāo)記, 那么如果把他們對應(yīng)起來呢? 建立映射, 代碼實(shí)現(xiàn)如下:

定義接口

/**
 * 消息處理上下文, 通過主題拿到topic
 */
public interface MsgHandlerContext{
    MsgHandler getMsgHandler(String topic);
}

/**
 * 消息處理類加載器
 * 作用:
 * 1. 因?yàn)閷?shí)現(xiàn)了Spring 的 ApplicationContextAware 接口, 項(xiàng)目啟動后就會運(yùn)行實(shí)現(xiàn)的方法
 * 2. 獲取MsgHandler接口的所有的實(shí)現(xiàn)類
 * 3. 將實(shí)現(xiàn)類上的Topic注解的值,作為handlerMap的鍵,實(shí)現(xiàn)類(處理器)作為對應(yīng)的值
 */
@Component
public class MsgHandlerContextImp implements ApplicationContextAware, MsgHandlerContext {
    private Map<String, MsgHandler> handlerMap = Maps.newHashMap();

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 從spring容器中獲取 <所有> 實(shí)現(xiàn)了MsgHandler接口的對象
        // key 默認(rèn)類名首字母小寫 value 當(dāng)前對象
        Map<String, MsgHandler> map = applicationContext.getBeansOfType(MsgHandler.class);
        map.values().forEach(obj > {
            // 通過反射拿到注解中的值  即 當(dāng)前類處理的 topic
            String topic =  obj.getClass().getAnnotation(Topic.class).value();
		   // 將主題和當(dāng)前主題的處理類建立映射
            handlerMap.put(topic,obj);
        });
    }

    @Override
    public MsgHandler getMsgHandler(String topic) {
        return handlerMap.get(topic);
    }
}

OK, 現(xiàn)在主題和對應(yīng)處理類的問題已經(jīng)完成了. 那么下一步就是怎么在接收消息的回調(diào)函數(shù)中根據(jù)接收到消息的主題,調(diào)用處理類消費(fèi)消息.

其實(shí)我們在上面定義配置的時(shí)候已經(jīng)導(dǎo)入了方法回電函數(shù)的實(shí)現(xiàn)

// 注入回調(diào)函數(shù)
@Autowired
private MqttCallback mqttCallback;

在沒定義之前考慮一下我們要做什么呢?

  1. 在連接成功 之后訂閱所有主題
  2. 處理回調(diào)函數(shù),根據(jù)主題獲取處理器,處理消息內(nèi)容
// 回調(diào)函數(shù)接口的實(shí)現(xiàn)類, 重寫連接丟失,建立連接,
@Component
@Slf4j
public class MqttCallback implements MqttCallbackExtended {
    // 需要訂閱的topic配置
    @Value("${mqtt.consumer.consumerTopics}")
    private List<String> consumerTopics;

    @Autowired
    private MsgHandlerContext msgHandlerContext;

    @Override
    public void connectionLost(Throwable throwable) {
        log.error("emq error.", throwable);
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("topic:" + topic + "  message:" + new String(message.getPayload()));
        //處理消息
        String msgContent = new String(message.getPayload());
        log.info("接收到消息:" + msgContent);
        try {
            // 根據(jù)主題名稱 獲取 該主題對應(yīng)的處理器對象
            // 多態(tài) 父類的引用指向子類的對象
            MsgHandler msgHandler = msgHandlerContext.getMsgHandler(topic);
            if (msgHandler == null) {
                return;
            }
            
            msgHandler.process(msgContent); //執(zhí)行
        } catch (IOException e) {
            log.error("process msg error,msg is: " + msgContent, e);
        }
//        mqttService.processMessage(topic, message);
        //處理成功后確認(rèn)消息
        mqttClient.messageArrivedComplete(message.getId(), message.getQos());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

    @Override
    public void connectComplete(boolean b, String s) {
        log.info("連接成功");
        //和EMQ連接成功后根據(jù)配置自動訂閱topic
        if (consumerTopics != null && consumerTopics.size() > 0) {
            // 循環(huán)遍歷當(dāng)前項(xiàng)目中配置的所有的主題.
            consumerTopics.forEach(t -> {
                try {
                    log.info(">>>>>>>>>>>>>>subscribe topic:" + t);
                    // 訂閱當(dāng)前集群中所有的主題 消息服務(wù)質(zhì)量 2 -> 至少收到一個(gè)
                    mqttClient.subscribe(t, 2);
                } catch (MqttException e) {
                    log.error("emq connect error", e);
                }
            });
        }
    }

    private MqttClient mqttClient;
	// 在配置類中調(diào)用傳入連接
    public void setMqttClient(MqttClient mqttClient) {
        this.mqttClient = mqttClient;
    }
}

5. 測試:

使用步驟:

自定義處理類 通過實(shí)現(xiàn)MsgHandler 接口

@Component
@Topic("cookie")
public class CookieHandler implements MsgHandler {
    @Override
    public void process(String jsonMsg) throws IOException {
        // 解析       這里使用了自己封裝JSON 工具類, 我放最下面了
        User byJson = JsonUtil.getByJson(jsonMsg, User.class);
        System.out.println("CookieHandler 接受到消息:" + byJson);
    }
}

此時(shí)要注意配置文件中有沒有對應(yīng)的主題, 我的這里有,采用不帶群組共享訂閱

consumerTopics: $queue/cookie/#, $share/group1/yfs1024  

控制臺有

可以接收到數(shù)據(jù),并且在啟動兩個(gè)示例的時(shí)候也能實(shí)現(xiàn)共享訂閱, 隨機(jī)選擇

emq連接,微服務(wù),網(wǎng)絡(luò),服務(wù)器,java,EMQ,物聯(lián)網(wǎng)

這里注意哈, 如果是測試的話, 訂閱者一定要帶上前綴$queue$share/groupName

JsonUtil
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.*;

import java.io.IOException;
import java.util.Map;

public class JsonUtil {
    /**
     * 從json字符串中根據(jù)nodeName獲取值
     * @param nodeName
     * @param json
     * @return
     * @throws IOException
     */
    public static String getValueByNodeName(String nodeName, String json) throws IOException {
        ObjectMapper objectMapper = new ObjectMapper();
        JsonNode jsonNode = objectMapper.readTree(json);
        JsonNode node = jsonNode.findPath(nodeName);
        if(node == null) return null;

        return node.asText();
    }

    /**
     * 根據(jù)nodeName獲取節(jié)點(diǎn)內(nèi)容
     * @param nodeName
     * @param json
     * @return
     * @throws IOException
     */
    public static JsonNode getNodeByName(String nodeName, String json) throws IOException {
        ObjectMapper objectMapper = new ObjectMapper();

        return objectMapper.readTree(json).findPath(nodeName);
    }

    /**
     * 反序列化
     * @param json
     * @param clazz
     * @param <T>
     * @return
     * @throws IOException
     */
    public static <T> T getByJson(String json, Class<T> clazz) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        // 在反序列化時(shí)忽略在 json 中存在但 Java 對象不存在的屬性
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        // 在序列化時(shí)日期格式默認(rèn)為 yyyy-MM-dd'T'HH:mm:ss.SSSZ
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);

        return mapper.readValue(json, clazz);
    }

    /**
     * 反序列化(駝峰轉(zhuǎn)換)
     * @param json
     * @param clazz
     * @param <T>
     * @return
     * @throws IOException
     */
    public static <T> T getByJsonSNAKE(String json, Class<T> clazz) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        // 在反序列化時(shí)忽略在 json 中存在但 Java 對象不存在的屬性
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        // 在序列化時(shí)日期格式默認(rèn)為 yyyy-MM-dd'T'HH:mm:ss.SSSZ
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        // 設(shè)置駝峰和下劃線之間的映射
        mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
        return mapper.readValue(json, clazz);
    }


    /**
     * 序列化
     * @param object
     * @return
     * @throws JsonProcessingException
     */
    public static String serialize(Object object) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();

        return mapper.writeValueAsString(object);
    }


    /**
     * 序列化(駝峰轉(zhuǎn)換)
     * @param object
     * @return
     * @throws JsonProcessingException
     */
    public static String serializeSNAKE(Object object) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        // 設(shè)置駝峰和下劃線之間的映射
        mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
        return mapper.writeValueAsString(object);
    }


    public static JsonNode getTreeNode(String json) throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();

        return objectMapper.readTree(json);
    }


    /**
     * 將對象轉(zhuǎn)map
     * @param obj
     * @return
     * @throws IOException
     */
    public static Map<String,Object> convertToMap(Object obj) throws IOException {
        ObjectMapper mapper = new ObjectMapper();

        return mapper.readValue(serialize(obj),Map.class);
    }
}


如果您發(fā)現(xiàn)錯誤,還望及時(shí)提醒,共同進(jìn)步

后面還有代理訂閱,保留消息,認(rèn)證,ACL,這里暫時(shí)還沒有用到所以就不在記錄, 用到的話再補(bǔ)充文章來源地址http://www.zghlxwxcb.cn/news/detail-672748.html

到了這里,關(guān)于EMQ的介紹及整合SpringBoot的使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • emq集群配置nginx做負(fù)載均衡

    emq集群配置nginx做負(fù)載均衡 創(chuàng)建 EMQ X 節(jié)點(diǎn)集群 emqx 集群搭建 例如: 節(jié)點(diǎn) IP 地址 emqx@192.168.1.17 192.168.1.17 emqx@192.168.1.18 192.168.1.18 emqx@192.168.1.19 192.168.1.19 配置 /etc/nginx/nginx.conf mqtt集群搭建并使用nginx做負(fù)載均衡_親測得結(jié)論 示例: 參考鏈接: https://docs.emqx.cn/enterprise/v4.3/tutorial/dep

    2024年02月05日
    瀏覽(18)
  • EMQ X(3):客戶端websocket消息收發(fā)

    EMQ X(3):客戶端websocket消息收發(fā)

    在EMQ X Broker提供的 Dashboard 中 TOOLS 導(dǎo)航下的 Websocket 頁面提供了一個(gè)簡易但有效的WebSocket 客戶端工具,它包含了連接、訂閱和發(fā)布功能,同時(shí)還能查看自己發(fā)送和接收的報(bào)文數(shù)據(jù),我們期望 它可以幫助您快速地完成某些場景或功能的測試驗(yàn)證: MQTT是為了物聯(lián)網(wǎng)場景設(shè)計(jì)的基

    2024年02月13日
    瀏覽(23)
  • EMQ X如何生成認(rèn)證信息?身份認(rèn)證流程和操作步驟

    EMQ X如何生成認(rèn)證信息?身份認(rèn)證流程和操作步驟

    身份認(rèn)證是大多數(shù)應(yīng)用的重要組成部分,EMQ X 中的認(rèn)證指的是當(dāng)一個(gè)客戶端連接到 EMQ X 的時(shí)候,通過服務(wù)器端的配置來控制客戶端連接服務(wù)器的權(quán)限。 EMQ X 的認(rèn)證支持包括兩個(gè)層面: ? MQTT 協(xié)議本身在 CONNECT 報(bào)文中指定用戶名和密碼,EMQ X 以插件形式支持基于 Username、 Cl

    2024年02月11日
    瀏覽(21)
  • EMQ & 明道云:零代碼高效構(gòu)建工業(yè)物聯(lián)網(wǎng)設(shè)備管理平臺

    EMQ & 明道云:零代碼高效構(gòu)建工業(yè)物聯(lián)網(wǎng)設(shè)備管理平臺

    智能物聯(lián)網(wǎng)設(shè)備在 IIoT 場景中有著廣泛的應(yīng)用,但如何管理和監(jiān)控這些設(shè)備是一個(gè)挑戰(zhàn)。 明道云是一家專業(yè)的 hpaPaaS 平臺服務(wù)商,其所開發(fā)的 明道云平臺 (Mingdao Cloud)是一個(gè)企業(yè)軟件設(shè)計(jì)和開發(fā)工具,讓企業(yè)可以低代碼或無代碼搭建個(gè)性化的 CRM、ERP、OA、項(xiàng)目管理、進(jìn)銷存

    2024年02月11日
    瀏覽(17)
  • 玩兒轉(zhuǎn)EMQ X:SSL篇之認(rèn)證證書及單片機(jī)實(shí)現(xiàn)

    玩兒轉(zhuǎn)EMQ X:SSL篇之認(rèn)證證書及單片機(jī)實(shí)現(xiàn) 在使用EMQ X搭建MQTT Broker時(shí),通常需要開啟SSL加密以保障通信安全。而為了確保SSL的安全性,我們需要使用認(rèn)證證書進(jìn)行雙向身份驗(yàn)證。本文將介紹如何在EMQ X中配置SSL認(rèn)證證書,并給出如何在單片機(jī)上實(shí)現(xiàn)MQTT SSL連接的示例代碼。 一

    2024年02月07日
    瀏覽(15)
  • EMQ x 阿里云:云上高效構(gòu)建,IoT 數(shù)據(jù)一站處理|直播預(yù)告

    隨著物聯(lián)網(wǎng)與云計(jì)算的發(fā)展,進(jìn)入云時(shí)代以來,各企業(yè)的數(shù)字化轉(zhuǎn)型也紛紛「云」化。在云上構(gòu)建可彈性伸縮、自動化管理、承載海量物聯(lián)網(wǎng)設(shè)備連接的數(shù)據(jù)中心,從而實(shí)現(xiàn)企業(yè)的降本增效,成為大勢所趨。 為了幫助企業(yè)應(yīng)對在云上構(gòu)建物聯(lián)網(wǎng)應(yīng)用過程中所面臨的協(xié)議選擇困

    2024年02月11日
    瀏覽(19)
  • 13、MongoDB--通過 SpringBoot 整合 Spring Data MongoDB(【連接多個(gè) MongoDB 服務(wù)器】、【高級定制 MongoDB 客戶端】的簡單介紹)

    放棄 Spring Boot 為 MongeDB 提供的自動配置,接下來同樣要干如下事情: 手動配置多組 ReactiveMongoDatabaseFactory 和 ReactiveMongoTemplate,要連幾個(gè) MongoDB 服務(wù)器就配置幾組。 同步 API 則使用 MongoDatabaseFactory 和 MongoTemplate。 針對不同 MongoDB 服務(wù)器,分別開發(fā)相應(yīng)的 DAO 組件類,建議將它

    2024年03月19日
    瀏覽(25)
  • 【微服務(wù)】springboot整合quartz使用詳解

    目錄 一、前言 二、quartz介紹 2.1 quartz概述 2.2 quartz優(yōu)缺點(diǎn) 2.3 quartz核

    2024年02月05日
    瀏覽(18)
  • 【微服務(wù)】springboot整合skywalking使用詳解

    目錄 一、前言 二、SkyWalking介紹 2.1 SkyWalking是什么 2.2 SkyWalking核心功能 2.3 SkyWalking整體架構(gòu)

    2024年02月03日
    瀏覽(21)
  • 【微服務(wù)】springboot整合mongodb使用詳解

    目錄 一、mongodb簡介 1.1 什么是mongodb 1.2 mongodb特點(diǎn) 二、mongodb中的核心術(shù)語 2.1 mogodb與數(shù)據(jù)庫對比

    2024年02月15日
    瀏覽(18)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包