首先先了解一下底層的協(xié)議:
1. MQTT
MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測傳輸協(xié)議),是一種基于發(fā)布/訂閱 (publish/subscribe)模式的"輕量級"通訊協(xié)議,該協(xié)議構(gòu)建于TCP/IP協(xié)議上,由IBM在1999年發(fā)布。 MQTT最大優(yōu)點(diǎn)在于,可以以極少的代碼和有限的帶寬,為連接遠(yuǎn)程設(shè)備提供實(shí)時(shí)可靠的消息服務(wù)。作 為一種低開銷、低帶寬占用的即時(shí)通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設(shè)備、移動應(yīng)用等方面有較廣泛的應(yīng) 用。
由于物聯(lián)網(wǎng)的環(huán)境是非常特別的,所以MQTT遵循以下設(shè)計(jì)原則:
(1)精簡,不添加可有可無的功能;
(2)發(fā)布/訂閱(Pub/Sub)模式,方便消息在傳感器之間傳遞,解耦Client/Server模式,帶來的 好處在于不必預(yù)先知道對方的存在(ip/port),不必同時(shí)運(yùn)行;
(3)允許用戶動態(tài)創(chuàng)建主題(不需要預(yù)先創(chuàng)建主題),零運(yùn)維成本;
(4)把傳輸量降到最低以提高傳輸效率;
(5)把低帶寬、高延遲、不穩(wěn)定的網(wǎng)絡(luò)等因素考慮在內(nèi);
(6)支持連續(xù)的會話保持和控制(心跳);
(7)理解客戶端計(jì)算能力可能很低;
(8)提供服務(wù)質(zhì)量( quality of service level:QoS)管理
(9)不強(qiáng)求傳輸數(shù)據(jù)的類型與格式,保持靈活性(指的是應(yīng)用層業(yè)務(wù)數(shù)據(jù))
2. MQTT協(xié)議實(shí)現(xiàn)方式:
MQTT傳輸?shù)南⒎譃椋褐黝}(Topic)和負(fù)載(payload)兩部分:
(1)Topic, 可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內(nèi)容 (payload);
(2)payload, 可以理解為消息的內(nèi)容,是指訂閱者具體要使用的內(nèi)容。
3. Qos: 消息服務(wù)質(zhì)量(Quality of Service)
MQTT 設(shè)計(jì)了 3 個(gè) QoS 等級。
理解: Qos: 規(guī)定自己想要發(fā)出,或者接收到的消息的規(guī)則
- QoS 0:消息最多傳遞一次,如果當(dāng)時(shí)客戶端不可用,則會丟失該消息。
“至多一次”,消息發(fā)布完全依賴底層TCP/IP網(wǎng)絡(luò)。會發(fā)生消息丟失或重復(fù)。
這一級別可用于如下情況
,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無所謂,因?yàn)椴痪煤筮€會有第二次發(fā)送。這一種方 式主要普通APP的推送,倘若你的智能設(shè)備在消息推送時(shí)未聯(lián)網(wǎng),推送過去沒收到,再次聯(lián)網(wǎng)也就 收不到了。
- QoS 1:消息傳遞至少 1 次。
“至少一次”,確保消息到達(dá),但消息重復(fù)可能會發(fā)生
- QoS 2:消息僅傳送一次。
“只有一次”,確保消息到達(dá)一次。在一些要求比較嚴(yán)格的計(jì)費(fèi)系統(tǒng)中,可以使用此級別。在計(jì)費(fèi) 系統(tǒng)中,消息重復(fù)或丟失會導(dǎo)致不正確的結(jié)果。這種最高質(zhì)量的消息發(fā)布服務(wù)還可以用于即時(shí)通訊類的 APP的推送,確保用戶收到且只會收到一次
MQTT 發(fā)布與訂閱操作中的 QoS 代表了不同的含義:
發(fā)布時(shí)的 QoS 表示消息發(fā)送到服務(wù)端時(shí)使用的 QoS
訂閱時(shí)的 QoS 表示服務(wù)端向自己轉(zhuǎn)發(fā)消息時(shí)可以使用的最大 QoS。
基本都是用QoS2
MQTT 發(fā)布與訂閱操作中的 QoS 代表了不同的含義,發(fā)布時(shí)的 QoS 表示消息發(fā)送到服務(wù)端時(shí)使用的 QoS,訂閱時(shí)的 QoS 表示服務(wù)端向自己轉(zhuǎn)發(fā)消息時(shí)可以使用的最大 QoS。
- 當(dāng)客戶端 A 的發(fā)布 QoS 大于客戶端 B 的訂閱 QoS 時(shí),服務(wù)端向客戶端 B 轉(zhuǎn)發(fā)消息時(shí)使用的 QoS 為客戶端 B 的訂閱 QoS。
- 當(dāng)客戶端 A 的發(fā)布 QoS 小于客戶端 B 的訂閱 QoS 時(shí),服務(wù)端向客戶端 B 轉(zhuǎn)發(fā)消息時(shí)使用的 QoS 為客戶端 A 的發(fā)布 QoS。
總結(jié): 也就是說QoS這個(gè)東西的設(shè)置是對消息的接收來做保證的,即如果是
QoS 1 我只負(fù)責(zé)發(fā)一次,收得到收不到我不管. 當(dāng)網(wǎng)絡(luò)狀態(tài)不穩(wěn)定的時(shí)候就會出現(xiàn)丟失現(xiàn)象
QoS 2 是能夠保證至少收到一次,但是存在重復(fù)消費(fèi)的問題.
QoS 3 保證只有一條信息到達(dá)
* 如果發(fā)布和訂閱的客戶端服務(wù)質(zhì)量等級不相同時(shí),誰的低按誰的為準(zhǔn)
4. Topic通配符匹配規(guī)則
- 層級分隔符 /
/ 用來分割主題樹的每一層,并給主題空間提供分等級的結(jié)構(gòu)。當(dāng)兩個(gè)通配符在一個(gè)主題中出現(xiàn)的 時(shí)候,主題層次分隔符的使用是很重要的。
示例:
love/you/with/all/my/heart
- 多層通配符 #
多層通配符有可以表示大于等于0的層次。因此,love/#也可匹配到單獨(dú)的love,此時(shí)#代表0 層。
多層通配符一定要是主題樹的最后一個(gè)字符。比如說,love/#是有效的,但是love/#/with是無效 的。
- 單層通配符 +
只匹配主題的一層
1. love/you/+ :匹配love/you/with和love/you/and,但是不匹配
love/you/with/all/my/heart。
2. 單層通配符只匹配1層,love/+不匹配love。
3. 單層通配符可以被用于主題樹的任意層級,連帶多層通配符。它必須被用在主題層級分隔符/的右邊,除非它是指定自己。因此,+和love/+都是有效的,但是love+無效。單層通配符可以用在主題樹的末端,也可以用在中間。比如說,love/+和love/+/with都是有效。
通配符注意事項(xiàng):
1.主題層次分隔符被用來在主題中引入層次。多層的通配符和單層通配符可以被使用,但他們不能被使用來做發(fā)布者的消息。
2.Topic命名盡量見名知意,符合規(guī)范,主題名字是大小寫敏感的。比如說,love和LOVE是兩個(gè)不同的主題。
3.以/開頭會產(chǎn)生一個(gè)不同的主題。比如說,/love與love不同。/love匹配"+/+"和/+,但不匹配+
4.不要在任何主題中包含null(Unicode \x0000)字符。
5.在主題樹中,長度被限制于64k內(nèi)但是在這以內(nèi)沒有限制層級的數(shù)目 。
6.可以有任意數(shù)目的根節(jié)點(diǎn);也就是說,可以有任意數(shù)目的主題樹。
1.EMQ X
EMQ X 是開源社區(qū)中最流行的 MQTT 消息服務(wù)器。
Windows安裝emqx_windows emqx安裝_羅小爬EX的博客-CSDN博客
優(yōu)點(diǎn):
單機(jī)能支持百萬的 MQTT 連接;集群能支持千 萬級別的 MQTT 連接;
支持豐富的物聯(lián)網(wǎng)協(xié)議,包括 MQTT、MQTT-SN、CoAP、 LwM2M、LoRaWAN 和 WebSocket 等;
2. Dashboard(可視化界面)
EMQ X 提供了 Dashboard 以方便用戶管理設(shè)備與監(jiān)控相關(guān)指標(biāo)。通過 Dashboard可以查看服務(wù)器基本 信息、負(fù)載情況和統(tǒng)計(jì)數(shù)據(jù),可以查看某個(gè)客戶端的連接狀態(tài)等信息甚至斷開其連接,也可以動態(tài)加載 和卸載指定插件。除此之外,EMQ X Dashboard 還提供了規(guī)則引擎的可視化操作界面,同時(shí)集成了一 個(gè)簡易的 MQTT 客戶端工具供用戶測試使用。
3. MQTTX
模擬客戶端
MQTTX:跨平臺 MQTT 5.0 桌面客戶端工具
MQTT X 是 EMQ 開源的一款優(yōu)雅的跨平臺 MQTT 5.0 桌面客戶端,它支持 macOS, Linux, Windows。 MQTT X 的 UI 采用了聊天界面形式,簡化了頁面操作邏輯,用戶可以快速創(chuàng)建連接,允許保存多個(gè)客 戶端,方便用戶快速測試 MQTT/MQTTS 連接,及 MQTT 消息的訂閱和發(fā)布。
發(fā)送消息
這里一定要記住是在哪方加前綴
1. 消息延遲發(fā)布
此功能由 emqx_mod_delayed 模塊提供,需要開啟模塊后才能使用此功能。
$delayed/{DelayInteval}/{TopicName} 單位: S
當(dāng)客戶端使 用特殊主題前綴 $delayed/{DelayInteval} 發(fā)布消息到 EMQ X 時(shí),將觸發(fā)延遲發(fā)布功能
示例:
$delayed/15/x/y : 15 秒后將 MQTT 消息發(fā)布到主題 x/y 。
$delayed/60/a/b : 1 分鐘后將 MQTT 消息發(fā)布到 a/b
示例: 在MQTT X上演示

現(xiàn)在在MQTTX
上模擬四個(gè)客戶端: 洗衣機(jī)
,空調(diào)
,電視機(jī)
,手機(jī)
現(xiàn)在讓洗衣機(jī)
,空調(diào)
,電視機(jī)
訂閱phoneMessage
這個(gè)主題, 消息服務(wù)質(zhì)量設(shè)置為2
,以后都設(shè)置為2
各個(gè)實(shí)例訂閱的主題和發(fā)送的主題如下:
洗衣機(jī)客戶端訂閱主題: phoneMessage
空調(diào)客戶端訂閱主題: phoneMessage
電視機(jī)客戶端訂閱主題: phoneMessage
手機(jī)發(fā)送的主題: $delayed/2/phoneMessage // 2 秒后發(fā)送到訂閱phoneMessage的客戶端

在手機(jī)客戶端上發(fā)送消息: 注意此時(shí)特殊主題的前綴在發(fā)布者上

兩秒后,其他訂閱這個(gè)主題的客戶端都收到了消息.

2. 共享訂閱
注意注意注意: ??
共享訂閱的主題格式是針對訂閱端來指定的,例如: $share/group1/cookie ;而消息的發(fā)布方是向主 題: cookie 發(fā)布消息。這樣在訂閱方才能達(dá)到負(fù)載均衡的效果。
共享訂閱是在多個(gè)訂閱者之間實(shí)現(xiàn)負(fù)載均衡的訂閱方式:
EMQ X 支持兩種格式的共享訂閱前綴:
示例 | 前綴 | 真實(shí)主題名 | |
---|---|---|---|
方式一(不帶群組的共享訂閱) | $queue/name/cookie | $queue/ | name/cookie |
方式二(帶群組的共享訂閱) | $share/group1/cookie | $share/group1/ | cookie |
應(yīng)用場景:
1?? . 對于方式一不帶群組的共享訂閱:
在我們?nèi)″X完成時(shí),我們要求需要向當(dāng)前用戶的手機(jī)發(fā)送一條短信, 為了提高服務(wù)的容錯性,我們準(zhǔn)備了多臺發(fā)短信的服務(wù), 但是我們要求只發(fā)送一條短信,此時(shí)我們就可以使用 方式一, 使只有一臺機(jī)器收到發(fā)送短信的消息如下, 只會有一臺機(jī)器收到
各個(gè)實(shí)例訂閱的主題和發(fā)送的主題如下:
消息服務(wù)實(shí)例1訂閱的主題: $queue/cookie
消息服務(wù)實(shí)例2訂閱的主題: $queue/cookie
消息服務(wù)實(shí)例3訂閱的主題: $queue/cookie
發(fā)布者發(fā)布的主題: cookie
2?? 對于方式二帶群組的共享訂閱:
$share/group1/cookie
帶分組的使用的特殊前綴是: $share/{group}/{TopicName}
使用場景如下:
在方式一的上面做一點(diǎn)點(diǎn)更改, 就是在發(fā)布者消息發(fā)送成功后, 不僅僅需要有一臺消息服務(wù)實(shí)例去發(fā)送消息, 也需要有一臺郵件服務(wù)實(shí)例去發(fā)送郵件. 所以這里就可以進(jìn)行分組
message 組 : 消息服務(wù)實(shí)例1 消息服務(wù)實(shí)例2 消息服務(wù)實(shí)例2
email組: 郵件服務(wù)實(shí)例1 郵件服務(wù)實(shí)例2
各個(gè)實(shí)例訂閱的主題和發(fā)送的主題如下:
(message組)
消息服務(wù)實(shí)例1 $share/message/cookie
消息服務(wù)實(shí)例2 $share/message/cookie
消息服務(wù)實(shí)例2 $share/message/cookie
(email組)
郵件服務(wù)實(shí)例1 $share/email/cookie
郵件服務(wù)實(shí)例2 $share/email/cookie
發(fā)布者實(shí)例 cookie
此時(shí)就可以保證兩個(gè)組中都會有一個(gè)實(shí)例服務(wù)收到消息.
負(fù)載均衡策略修改默認(rèn) 隨機(jī)
在EMQ X 服務(wù)的 etc/emqx.conf中修改
broker.shared_subscription_strategy = random
負(fù)載均衡策略 | 描述 |
---|---|
random | 在所有訂閱中隨機(jī)選擇 |
round_robin | 按照訂閱順序輪詢 |
sticky | 一直發(fā)往上次選取的訂閱者 |
hash | 按照發(fā)布者ClientID的哈希值 |
原生Mqtt代碼的使用??
1.依賴導(dǎo)入
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
2. 編寫Controller類
說明下面的一些配置也可以配置在yml文件中,這里只是演示(消息的發(fā)送和接受)所以就直接配置通過Set方法配置了
可以通過
@ConfigurationProperties(prefix = "")
寫一個(gè)配置類這里的server.port= 8888
@RestController
public class TestController {
/**
* 發(fā)布消息
* @throws MqttException
*/
@GetMapping("/publish")
public void publish() throws MqttException {
MqttClientPersistence persistence = new MemoryPersistence(); //內(nèi)存持久化
// 服務(wù)器地址以及本機(jī)的clientid
MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "abc", persistence);
//連接選項(xiàng)中定義用戶名密碼和其它配置
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);//參數(shù)為true表示清除緩存,也就是非持久化訂閱者,這個(gè)時(shí)候只要參數(shù)設(shè)為true,一定是非持久化訂閱者。而參數(shù)設(shè)為false時(shí),表示服務(wù)器保留客戶端的連接記錄
options.setAutomaticReconnect(true);//是否自動重連
options.setConnectionTimeout(30);//連接超時(shí)時(shí)間 秒
options.setKeepAliveInterval(10);//連接保持檢查周期 秒
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本
client.connect(options);//連接
client.publish("testTopic", "發(fā)送內(nèi)容".getBytes(), 2, false);
}
/**
* 訂閱消息
* @throws MqttException
*/
@GetMapping("/subscribe")
public void subscribe() throws MqttException {
MqttClientPersistence persistence = new MemoryPersistence();;//內(nèi)存持久化
// 服務(wù)器地址以及本機(jī)的clientid
MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "xyz", persistence);
//連接選項(xiàng)中定義用戶名密碼和其它配置
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);//參數(shù)為true表示清除緩存,也就是非持久化訂閱者,這個(gè)時(shí)候只要參數(shù)設(shè)為true,一定是非持久化訂閱者。而參數(shù)設(shè)為false時(shí),表示服務(wù)器保留客戶端的連接記錄
options.setAutomaticReconnect(true);//是否自動重連
options.setConnectionTimeout(30);//連接超時(shí)時(shí)間 秒
options.setKeepAliveInterval(10);//連接保持檢查周期 秒
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本
// 設(shè)置回調(diào)函數(shù) 匿名內(nèi)部類
client.setCallback(new MqttCallbackExtended() {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("連接丟失!");
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println( "接收到消息 topic:" +s+" id:"+mqttMessage.getId() +" message:"+ mqttMessage.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
@Override
public void connectComplete(boolean b, String s) {
System.out.println("連接成功!");
}
});
client.connect(options);//連接
client.subscribe("testTopic"); //訂閱主題
}
}
訪問: http://localhost:8888/publish
訪問: http://localhost:8888/subscribe
第一次訪問兩個(gè)地址,首先會先EMQ注冊自己,和EMQ建立連接(但是一定要快,如果到了10秒任務(wù)你就掛了)
等到第二次訪問,已經(jīng)建立過,所以就可以發(fā)送消息,和接收消息,此時(shí)再看控制臺.

OK,原始的方法建立完畢
現(xiàn)在可以設(shè)想,我們在一個(gè)項(xiàng)目中一般不會只發(fā)一次請求, 通過上面的代碼我們可以發(fā)現(xiàn)每一次的發(fā)送其實(shí)有絕大部分內(nèi)容是相同的.對于發(fā)送者來說我們要變的只有主題和消息
而對于接收者來說需要根據(jù)不同的主題來設(shè)計(jì)不同的處理方案, 即根據(jù)不同的主題調(diào)用不同的方案來解決(是不是有一點(diǎn)多態(tài)的意思了), 那么有沒有一種解決方案來解決這樣的問題. bingo~ 必然.
首先先認(rèn)識一個(gè)設(shè)計(jì)模式: 策略模式 (解決多個(gè) if-else 問題)
策略模式詳解 - 知乎 (zhihu.com)
在策略模式(Strategy Pattern)中,一個(gè)類的行為或其算法可以在運(yùn)行時(shí)更改。我們要處理的就是上面高亮的部分
在策略模式中,我們創(chuàng)建表示各種策略的對象和一個(gè)行為隨著策略對象改變而改變的 context 對象。策略對象改變 context 對象的執(zhí)行算法。
大家可以看上面知乎中的鏈接的內(nèi)容; 理解一下在支付時(shí)的策略模式的使用,然后在看我們自己的實(shí)現(xiàn).
使用策略模式改造
注:
一般的策略模式大概是這樣:
- 定義策略接口
- 定義不同策略實(shí)現(xiàn)類
- 提供策略工廠,便于根據(jù)策略枚舉獲取不同策略實(shí)現(xiàn)
而在策略比較簡單的情況下,我們完全可以用枚舉代替策略工廠,簡化策略模式。
1. 配置文件
server:
port: 8082
mqtt:
client:
username: admin
password: public
serverURI: tcp://192.168.200.128:1883
clientId: monitor.task.${random.int[1000,9999]} # 注意: emq的客戶端id 不能重復(fù)
keepAliveInterval: 10 #連接保持檢查周期 秒
connectionTimeout: 30 #連接超時(shí)時(shí)間 秒
producer:
defaultQos: 2
defaultRetained: false
defaultTopic: topic/test1
consumer:
consumerTopics: $queue/cookie/#, $share/group1/yfs1024 #不帶群組的共享訂閱 多個(gè)主題逗號隔開
# $queue/cookie/#
# 以$queue開頭,不帶群組的共享訂閱 多個(gè)客戶端只能有一個(gè)消費(fèi)者消費(fèi)
# $share/group1/yfs1024
# 以$share開頭,群組的共享訂閱 多個(gè)客戶端訂閱
# 如果在一個(gè)組 只能有一個(gè)消費(fèi)者消費(fèi)
# 如果不在一個(gè)組 都可以消費(fèi)
對應(yīng)配置類
@Data
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "mqtt.client")
public class MqttProperties {
private int defaultProducerQos;
private boolean defaultRetained;
private String defaultTopic;
private String username;
private String password;
private String serverURI;
private String clientId;
private int keepAliveInterval;
private int connectionTimeout;
}
2. 封裝消息發(fā)送者
下面通過方法重載的方式, 接收不同個(gè)數(shù)的參數(shù)
@Component
@Slf4j
public class MqttProducer {
// @Value() 讀取配置 當(dāng)然也可以批量讀取配置,這里就一個(gè)一個(gè)了
@Value("${mqtt.producer.defaultQos}")
private int defaultProducerQos;
@Value("${mqtt.producer.defaultRetained}")
private boolean defaultRetained;
@Value("${mqtt.producer.defaultTopic}")
private String defaultTopic;
@Autowired
private MqttClient mqttClient;
public void send(String payload) {
this.send(defaultTopic, payload);
}
public void send(String topic, String payload) {
this.send(topic, defaultProducerQos, payload);
}
public void send(String topic, int qos, String payload) {
this.send(topic, qos, defaultRetained, payload);
}
public void send(String topic, int qos, boolean retained, String payload) {
try {
mqttClient.publish(topic, payload.getBytes(), qos, retained);
} catch (MqttException e) {
log.error("publish msg error.",e);
}
}
public <T> void send(String topic, int qos, T msg) throws JsonProcessingException {
String payload = JsonUtil.serialize(msg);
this.send(topic,qos,payload);
}
}
3. 定義配置類
說明一下,一般對于一臺服務(wù)來說發(fā)送者和接受者使用一個(gè)mqtt連接,所以這里配置了一個(gè)mqttClient
@Configuration
@Data
@Slf4j
public class MqttConfig {
// 注入配置
@Autowired
private MqttProperties mqtt;
// 注入回調(diào)函數(shù)
@Autowired
private MqttCallback mqttCallback;
@Bean
public MqttClient mqttClient() {
try {
MqttClient client = new MqttClient(mqtt.getServerURI(), mqtt.getClientId(), new MemoryPersistence());
client.setManualAcks(true); //設(shè)置手動消息接收確認(rèn)
mqttCallback.setMqttClient(client);
client.setCallback(mqttCallback);
client.connect(mqttConnectOptions());
return client;
} catch (MqttException e) {
log.error("emq connect error",e);
return null;
}
}
@Bean
public MqttConnectOptions mqttConnectOptions() {
// 下面有一些配置是寫死的, 如果項(xiàng)目需要最好還是寫配置文件中, 這樣后面可以通過注冊中心熱更新配置文件
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqtt.getUsername());
options.setPassword(mqtt.getPassword().toCharArray());
options.setAutomaticReconnect(true);//是否自動重新連接
options.setCleanSession(true);//是否清除之前的連接信息
options.setConnectionTimeout(mqtt.getConnectionTimeout());//連接超時(shí)時(shí)間
options.setKeepAliveInterval(mqtt.getKeepAliveInterval());//心跳
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);//設(shè)置mqtt版本
return options;
}
}
簡單測試一下:
這里發(fā)送一個(gè)對象
@Data
public class User {
private String id;
private String name;
private Integer age;
}
@RestController
public class MyPublish {
@Autowired
private MqttProducer mqttProducer;
@GetMapping("/testPublish")
public void testSend(){
User user = new User();
user.setId("123");
user.setName("張三");
user.setAge(18);
mqttProducer.send("cookie",2,user);
}
}
訪問:
localhost:9999/testPublish
OK,現(xiàn)在消息的發(fā)送是已經(jīng)完成了
我們在Dashboard中訂閱這個(gè)主題并查看結(jié)果:
發(fā)送消息已經(jīng)沒有問題了, 注入MqttProducer
調(diào)用send方法就可以了, 下面就來解決消息接收的問題.
還記得上面說的嘛, 對于接收來說,主要的瓶頸在于根據(jù)不同的主題來處理不同的消息, 比如訂閱了cookie
,和yfs1024
這兩個(gè)主題可以通過如下代碼實(shí)現(xiàn):
在接收到的參數(shù)中通過判斷來確定處理的方法:
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
if(topic.equals("cookie")){
// cookie 主題的處理邏輯
}else if(topic.equals("yfs1024")){
// yfs1024 主題的處理邏輯
}else if(xxx){ // 其他的主題
.........
}
}
這種方法可以沒有問題, 但是每次都需要通過修改這里的代碼來處理業(yè)務(wù)邏輯, 就像上面知乎文章所說的雖然寫起來簡單, 但是違反了面向?qū)ο蟮膬蓚€(gè)設(shè)計(jì)原則, 單一職責(zé)原則
,開閉原則
。
好那么下面就按照策略模式來對接收消息的代碼進(jìn)行改造。
4. 改造接收代碼
1. 定義消息處理接口
大致的邏輯: 因?yàn)槭遣煌闹黝}擁有不同的處理邏輯, 即
一個(gè)主題對應(yīng)一個(gè)處理類
, 我們要做的就是通過主題拿到這個(gè)主題的處理類
/**
* 消息處理接口
*/
public interface MsgHandler{
void process(String jsonMsg) throws IOException;
}
后面讓每個(gè)主題的處理類來實(shí)現(xiàn)這個(gè)接口就可以
@Component
public class CookieHandler implements MsgHandler {
@Override
public void process(String jsonMsg) throws IOException {
// 解析 這里使用了自己封裝JSON 工具類, 我放最下面了
User byJson = JsonUtil.getByJson(jsonMsg, User.class);
System.out.println("CookieHandler 接受到消息:" + byJson);
}
}
我們還需要根據(jù)主題使用不同的處理器, 怎么辦呢? 可以通過注解中的參數(shù)的值區(qū)分(通過反射獲取)
2. 定義主題注解
用來表示當(dāng)前類所處理的注解.
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Topic {
String value();
}
對上面進(jìn)行改造如下:
@Component
@Topic("cookie")
public class CookieHandler implements MsgHandler {
@Override
public void process(String jsonMsg) throws IOException {
// 解析
User byJson = JsonUtil.getByJson(jsonMsg, User.class);
System.out.println("CookieHandler 接受到消息:" + byJson);
}
}
3. 將主題與對應(yīng)處理類建立映射
現(xiàn)在處理主題的方法類有了, 對每個(gè)處理類也做了對應(yīng)主題的標(biāo)記
, 那么如果把他們對應(yīng)起來呢? 建立映射, 代碼實(shí)現(xiàn)如下:
定義接口
/**
* 消息處理上下文, 通過主題拿到topic
*/
public interface MsgHandlerContext{
MsgHandler getMsgHandler(String topic);
}
/**
* 消息處理類加載器
* 作用:
* 1. 因?yàn)閷?shí)現(xiàn)了Spring 的 ApplicationContextAware 接口, 項(xiàng)目啟動后就會運(yùn)行實(shí)現(xiàn)的方法
* 2. 獲取MsgHandler接口的所有的實(shí)現(xiàn)類
* 3. 將實(shí)現(xiàn)類上的Topic注解的值,作為handlerMap的鍵,實(shí)現(xiàn)類(處理器)作為對應(yīng)的值
*/
@Component
public class MsgHandlerContextImp implements ApplicationContextAware, MsgHandlerContext {
private Map<String, MsgHandler> handlerMap = Maps.newHashMap();
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 從spring容器中獲取 <所有> 實(shí)現(xiàn)了MsgHandler接口的對象
// key 默認(rèn)類名首字母小寫 value 當(dāng)前對象
Map<String, MsgHandler> map = applicationContext.getBeansOfType(MsgHandler.class);
map.values().forEach(obj > {
// 通過反射拿到注解中的值 即 當(dāng)前類處理的 topic
String topic = obj.getClass().getAnnotation(Topic.class).value();
// 將主題和當(dāng)前主題的處理類建立映射
handlerMap.put(topic,obj);
});
}
@Override
public MsgHandler getMsgHandler(String topic) {
return handlerMap.get(topic);
}
}
OK, 現(xiàn)在主題和對應(yīng)處理類的問題已經(jīng)完成了. 那么下一步就是怎么在接收消息的回調(diào)函數(shù)中根據(jù)接收到消息的主題,調(diào)用處理類消費(fèi)消息.
其實(shí)我們在上面定義配置的時(shí)候已經(jīng)導(dǎo)入了方法回電函數(shù)的實(shí)現(xiàn)
// 注入回調(diào)函數(shù)
@Autowired
private MqttCallback mqttCallback;
在沒定義之前考慮一下我們要做什么呢?
- 在連接成功 之后訂閱所有主題
- 處理回調(diào)函數(shù),根據(jù)主題獲取處理器,處理消息內(nèi)容
// 回調(diào)函數(shù)接口的實(shí)現(xiàn)類, 重寫連接丟失,建立連接,
@Component
@Slf4j
public class MqttCallback implements MqttCallbackExtended {
// 需要訂閱的topic配置
@Value("${mqtt.consumer.consumerTopics}")
private List<String> consumerTopics;
@Autowired
private MsgHandlerContext msgHandlerContext;
@Override
public void connectionLost(Throwable throwable) {
log.error("emq error.", throwable);
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("topic:" + topic + " message:" + new String(message.getPayload()));
//處理消息
String msgContent = new String(message.getPayload());
log.info("接收到消息:" + msgContent);
try {
// 根據(jù)主題名稱 獲取 該主題對應(yīng)的處理器對象
// 多態(tài) 父類的引用指向子類的對象
MsgHandler msgHandler = msgHandlerContext.getMsgHandler(topic);
if (msgHandler == null) {
return;
}
msgHandler.process(msgContent); //執(zhí)行
} catch (IOException e) {
log.error("process msg error,msg is: " + msgContent, e);
}
// mqttService.processMessage(topic, message);
//處理成功后確認(rèn)消息
mqttClient.messageArrivedComplete(message.getId(), message.getQos());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}
@Override
public void connectComplete(boolean b, String s) {
log.info("連接成功");
//和EMQ連接成功后根據(jù)配置自動訂閱topic
if (consumerTopics != null && consumerTopics.size() > 0) {
// 循環(huán)遍歷當(dāng)前項(xiàng)目中配置的所有的主題.
consumerTopics.forEach(t -> {
try {
log.info(">>>>>>>>>>>>>>subscribe topic:" + t);
// 訂閱當(dāng)前集群中所有的主題 消息服務(wù)質(zhì)量 2 -> 至少收到一個(gè)
mqttClient.subscribe(t, 2);
} catch (MqttException e) {
log.error("emq connect error", e);
}
});
}
}
private MqttClient mqttClient;
// 在配置類中調(diào)用傳入連接
public void setMqttClient(MqttClient mqttClient) {
this.mqttClient = mqttClient;
}
}
5. 測試:
使用步驟:
自定義處理類 通過實(shí)現(xiàn)MsgHandler
接口
@Component
@Topic("cookie")
public class CookieHandler implements MsgHandler {
@Override
public void process(String jsonMsg) throws IOException {
// 解析 這里使用了自己封裝JSON 工具類, 我放最下面了
User byJson = JsonUtil.getByJson(jsonMsg, User.class);
System.out.println("CookieHandler 接受到消息:" + byJson);
}
}
此時(shí)要注意配置文件中有沒有對應(yīng)的主題, 我的這里有,采用不帶群組共享訂閱
consumerTopics: $queue/cookie/#, $share/group1/yfs1024
控制臺有
可以接收到數(shù)據(jù),并且在啟動兩個(gè)示例的時(shí)候也能實(shí)現(xiàn)共享訂閱, 隨機(jī)選擇
這里注意哈, 如果是測試的話, 訂閱者一定要帶上前綴$queue
或$share/groupName
JsonUtil
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.*;
import java.io.IOException;
import java.util.Map;
public class JsonUtil {
/**
* 從json字符串中根據(jù)nodeName獲取值
* @param nodeName
* @param json
* @return
* @throws IOException
*/
public static String getValueByNodeName(String nodeName, String json) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(json);
JsonNode node = jsonNode.findPath(nodeName);
if(node == null) return null;
return node.asText();
}
/**
* 根據(jù)nodeName獲取節(jié)點(diǎn)內(nèi)容
* @param nodeName
* @param json
* @return
* @throws IOException
*/
public static JsonNode getNodeByName(String nodeName, String json) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readTree(json).findPath(nodeName);
}
/**
* 反序列化
* @param json
* @param clazz
* @param <T>
* @return
* @throws IOException
*/
public static <T> T getByJson(String json, Class<T> clazz) throws IOException {
ObjectMapper mapper = new ObjectMapper();
// 在反序列化時(shí)忽略在 json 中存在但 Java 對象不存在的屬性
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
// 在序列化時(shí)日期格式默認(rèn)為 yyyy-MM-dd'T'HH:mm:ss.SSSZ
mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
return mapper.readValue(json, clazz);
}
/**
* 反序列化(駝峰轉(zhuǎn)換)
* @param json
* @param clazz
* @param <T>
* @return
* @throws IOException
*/
public static <T> T getByJsonSNAKE(String json, Class<T> clazz) throws IOException {
ObjectMapper mapper = new ObjectMapper();
// 在反序列化時(shí)忽略在 json 中存在但 Java 對象不存在的屬性
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
// 在序列化時(shí)日期格式默認(rèn)為 yyyy-MM-dd'T'HH:mm:ss.SSSZ
mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
// 設(shè)置駝峰和下劃線之間的映射
mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
return mapper.readValue(json, clazz);
}
/**
* 序列化
* @param object
* @return
* @throws JsonProcessingException
*/
public static String serialize(Object object) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(object);
}
/**
* 序列化(駝峰轉(zhuǎn)換)
* @param object
* @return
* @throws JsonProcessingException
*/
public static String serializeSNAKE(Object object) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
// 設(shè)置駝峰和下劃線之間的映射
mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
return mapper.writeValueAsString(object);
}
public static JsonNode getTreeNode(String json) throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readTree(json);
}
/**
* 將對象轉(zhuǎn)map
* @param obj
* @return
* @throws IOException
*/
public static Map<String,Object> convertToMap(Object obj) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(serialize(obj),Map.class);
}
}
如果您發(fā)現(xiàn)錯誤,還望及時(shí)提醒,共同進(jìn)步文章來源:http://www.zghlxwxcb.cn/news/detail-672748.html
后面還有代理訂閱,保留消息,認(rèn)證,ACL,這里暫時(shí)還沒有用到所以就不在記錄, 用到的話再補(bǔ)充文章來源地址http://www.zghlxwxcb.cn/news/detail-672748.html
到了這里,關(guān)于EMQ的介紹及整合SpringBoot的使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!