什么是MQTT
簡介
MQTT(英文全稱Message Queuing Telemetry Transport,消息隊列遙測傳輸協(xié)議)。
MQTT是一種輕量級的協(xié)議,適用于需要較小代碼占用空間或網(wǎng)絡(luò)帶寬非常寶貴的遠程連接,是專為受限設(shè)備和低帶寬、高延遲或不可靠的網(wǎng)絡(luò)而設(shè)計。這些原則也使該協(xié)議成為新興的“機器到機器”(M2M)或物聯(lián)網(wǎng)(IoT)世界的連接設(shè)備,以及帶寬和電池功率非常高的移動應(yīng)用的理想選擇。
主要模式及圖示
MQTT的主要模式是發(fā)布/訂閱(PUBLISH/SUBSCRIBE)模式,簡單圖示如下:

服務(wù)器(server)在MQTT中被稱作消息服務(wù)器(Broker),而客戶端(client)可以是發(fā)布者(publisher)也可以是訂閱者(subscriber),client也可以同時是publisher和subscriber。
在發(fā)布者發(fā)布主題(topic)之后,訂閱者就可以訂閱該主題,之后就會收到所訂閱主題的消息(payload),消息的內(nèi)容叫做負載(payload)。一個發(fā)布者可以發(fā)布多個主題,一個訂閱者同樣可以訂閱多個主題,一個主題可以有多個訂閱者。事實上這種模式解耦了傳統(tǒng)的client跟server的關(guān)系。不必預(yù)先知道對方的存在(ip/port), 不必同時運行。
MQTT特點
開放消息協(xié)議,簡單易實現(xiàn)
發(fā)布訂閱模式,一對多消息發(fā)布
基于TCP/IP網(wǎng)絡(luò)連接,提供有序,無損,雙向連接。(事實上也有以UDP實現(xiàn)的,比如MQTT-SN)
1字節(jié)固定報頭,2字節(jié)心跳報文,負載小,最小化傳輸開銷和協(xié)議交換,有效減少網(wǎng)絡(luò)流量。
消息QoS支持,可靠傳輸保證
QoS=0:“至多一次”,消息發(fā)布完全依賴底層TCP/IP網(wǎng)絡(luò)。會發(fā)生消息丟失或重復(fù)。這一級別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無所謂,因為不久后還會有第二次發(fā)送。這一種方式主要普通APP的推送,倘若你的智能設(shè)備在消息推送時未聯(lián)網(wǎng),推送過去沒收到,再次聯(lián)網(wǎng)也就收不到了。
QoS=1:“至少—次”,確保消息到達,但消息重復(fù)可能會發(fā)生。
QoS=2:“只有一次”,確保消息到達一次。在一些要求比較嚴格的計費系統(tǒng)中,可以使用此級別。在計費系統(tǒng)中,消息重復(fù)或丟失會導(dǎo)致不正確的結(jié)果。這種最高質(zhì)量的消息發(fā)布服務(wù)還可以用于即時通訊類的APP的推送,確保用戶收到且只會收到一次。
主題分級
主題可以分級,以“/”隔開,比如 a/b/c/d
Suscriber在訂閱主題的時候,也可以包含通配符,通配符有兩種“+”和“#”。+表示一個任意層級,比如 a/+,可以同時匹配a/b或者a/c,但匹配不到a/b/c,而#表示往后多個層級,比如a/#可以匹配a/b/c/d。+可以放在中間層級,比如a/+/c,而#只能放在末尾。
Publisher發(fā)布主題的時候只能指定一個清晰的主題名,具體到a/b/c/d某個主題,不能以通配符的形式指定。
環(huán)境搭建
Broker搭建
選擇比較常用的emqx作為Broker環(huán)境搭建:
登錄到https://www.emqx.io/zh/downloads?os=Ubuntu 選擇你對應(yīng)的服務(wù)器版本,我使用的是ubuntu22.4
登錄進網(wǎng)頁中,會顯示對應(yīng)版本的指令,在服務(wù)器中依次執(zhí)行指令安裝并啟動emqx:
wget https://www.emqx.com/zh/downloads/broker/5.0.20/emqx-5.0.20-ubuntu22.04-amd64.deb
sudo apt install ./emqx-5.0.20-ubuntu22.04-amd64.deb
sudo systemctl start emqx
創(chuàng)建賬戶,用戶名和密碼替換為你自己想設(shè)置的值:
sudo emqx ctl admins add 用戶名 密碼
使用創(chuàng)建的用戶名密碼登錄到emqx,端口一般都是指定18083
http://服務(wù)器IP:18083
我的是本地服務(wù)器:http://192.168.200.128:18083/#/login?to=/dashboard/overview

登陸之后可以查看各種信息,后續(xù)寫代碼的時候再細看:

Python下mqtt環(huán)境安裝
我們使用paho-mqtt:
pip install paho-mqtt
client代碼實現(xiàn)
創(chuàng)建工程目錄并新建兩個py文件:

查看mqtt client類初始化參數(shù):
def __init__(self, client_id="", clean_session=None, userdata=None,
protocol=MQTTv311, transport="tcp", reconnect_on_failure=True):
? ? ? ? """client_id is the unique client id string used when connecting to the
broker. If client_id is zero length or None, then the behaviour is
defined by which protocol version is in use. If using MQTT v3.1.1, then
a zero length client id will be sent to the broker and the broker will
generate a random for the client. If using MQTT v3.1 then an id will be
randomly generated. In both cases, clean_session must be True. If this
is not the case a ValueError will be raised.
clean_session is a boolean that determines the client type. If True,
the broker will remove all information about this client when it
disconnects. If False, the client is a persistent client and
subscription information and queued messages will be retained when the
client disconnects.
Note that a client will never discard its own outgoing messages on
disconnect. Calling connect() or reconnect() will cause the messages to
be resent. Use reinitialise() to reset a client to its original state.
The clean_session argument only applies to MQTT versions v3.1.1 and v3.1.
It is not accepted if the MQTT version is v5.0 - use the clean_start
argument on connect() instead.
userdata is user defined data of any type that is passed as the "userdata"
parameter to callbacks. It may be updated at a later point with the
user_data_set() function.
The protocol argument allows explicit setting of the MQTT version to
use for this client. Can be paho.mqtt.client.MQTTv311 (v3.1.1),
paho.mqtt.client.MQTTv31 (v3.1) or paho.mqtt.client.MQTTv5 (v5.0),
with the default being v3.1.1.
Set transport to "websockets" to use WebSockets as the transport
mechanism. Set to "tcp" to use raw TCP, which is the default.
"""
我們可以試著自定義client_id,其他的使用默認即可。
clean_session(bool, disconnect后是否清除數(shù)據(jù),默認為True)、
user_data(str, 用戶自定義數(shù)據(jù))
protocol(enum,協(xié)議版本為MQTTv311或者MQTTv5,默認為MQTTv311)
transport(tcp或者websocket,默認為tcp)
reconnect_on_failure(bool, connect失敗后是否自動重新connect,默認為True)
on_connect函數(shù)是client connect之后的回調(diào)函數(shù),我們可以自定義回調(diào)函數(shù)
def on_connect(client, userdata, flags, rc):
print("Connection returned " + str(rc))
**IMPORTANT** the required function signature for a callback can differ
depending on whether you are using MQTT v5 or MQTT v3.1.1/v3.1. See the
documentation for each callback.
All of the callbacks as described below have a "client" and an "userdata"
argument. "client" is the Client instance that is calling the callback.
"userdata" is user data of any type and can be set when creating a new client
instance or with user_data_set(userdata).
If you wish to suppress exceptions within a callback, you should set
`client.suppress_exceptions = True`
The callbacks are listed below, documentation for each of them can be found
at the same function name:
on_connect, on_connect_fail, on_disconnect, on_message, on_publish,
on_subscribe, on_unsubscribe, on_log, on_socket_open, on_socket_close,
on_socket_register_write, on_socket_unregister_write
"""
查看on_connect的說明,需要多個參數(shù):
client:client對象實例
userdata:自定義數(shù)據(jù),創(chuàng)建client時傳入的
flags:broker回應(yīng)的標記
rc:返回值,返回0為成功連接
基本的信息了解清楚后,嘗試寫Publisher例程代碼,主函數(shù)中簡單地死循環(huán)發(fā)布topic,這樣訂閱者就可以持續(xù)收到topic的payload message:
import random
import time
from paho.mqtt import client as mqtt_client
class Mqtt_Publisher:
def __init__(self, broker_ip='192.168.200.128', client_prefix="pub_", port=1883, timeout=60):
self.broker_ip = broker_ip #server ip
self.port = port #network port
self.timeout = timeout #connect timeout time
self.connected = False
self.client_id = client_prefix + str(random.randint(10000,99999)) #create an random integer as client id
self.start()
def start(self):
self.client = mqtt_client.Client(self.client_id)
self.client.on_connect = self.on_connect
self.client.connect(self.broker_ip, self.port, self.timeout)
self.client.loop_start() #default loop to try connection forever until you call disconnect()
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
self.connected = True
else:
raise Exception("Failed to connect mqtt server")
def publish(self, topic, payload, qos=0):
if self.connected:
return self.client.publish(topic, payload=payload, qos=qos)
else:
raise Exception("mqtt server not connected, cannot publish topic")
if __name__=='__main__':
pub=Mqtt_Publisher()
while not pub.connected: #waiting for client connection
time.sleep(0.05)
print("publisher connect successfully")
while True:
pub.publish('topic_test','this is a test message')
pub.publish('topic_test1/a','this is level2 testa')
pub.publish('topic_test1/b','this is level2 testb')
pub.publish('topic_test1/a/c','this is level3 testc')
time.sleep(1)
雖然publish了四次,事實上這是兩個topic,只是第二個topic分成了多個級別。
執(zhí)行代碼,查看broker:


可以看到client的連接狀態(tài),已經(jīng)有一個publisher連接了,并且每秒有4個消息。
接著編寫Subscriber的代碼:
需要多一個修改publish接口為subscribe接口,另外額外需要一個回調(diào)函數(shù)on_message處理topic發(fā)出的payload, 查看on_message_callback的示例:
on_message_callback(client, userdata, message)
client: the client instance for this callback
userdata: the private user data as set in Client() or userdata_set()
message: an instance of MQTTMessage.
This is a class with members topic, payload, qos, retain.
我們簡單地實現(xiàn)為打印出payload即可:
def on_message(self, client, userdata, msg): #after subscribing topic, will get topic message, this is the callback function
print(msg.payload.decode('utf-8')) #simply print message
上subscriber完整代碼:
import random
import time
from paho.mqtt import client as mqtt_client
class Mqtt_Subscriber:
def __init__(self, broker_ip='192.168.200.128', client_prefix="sub_", port=1883, timeout=60, topic_name="topic_test"):
self.broker_ip = broker_ip #server ip
self.port = port #network port
self.timeout = timeout #connect timeout time
self.topic_name = topic_name
self.connected = False
self.client_id = client_prefix + str(random.randint(10000,99999)) #create an random integer as client id
self.start()
def start(self):
self.client = mqtt_client.Client(self.client_id)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.connect(self.broker_ip, self.port, self.timeout)
self.client.subscribe(self.topic_name)
self.client.loop_start() #default loop to try connection forever until you call disconnect()
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
self.connected = True
else:
raise Exception("Failed to connect mqtt server")
def on_message(self, client, userdata, msg): #after subscribing topic, will get topic message, this is the callback function
print(msg.payload.decode('utf-8')) #simply print message
def subscribe(self, topic, qos=0):
if self.connected:
return self.client.subscribe(topic, qos=qos, options=None, properties=None)
else:
raise Exception("mqtt server not connected, cannot publish topic")
if __name__=='__main__':
sub=Mqtt_Subscriber(topic_name="topic_test1")
while not sub.connected: #waiting for client connection
time.sleep(0.05)
print("subsciber connect successfully")
? ? sub.subscribe(topic="topic_test")
while True:
time.sleep(1)
執(zhí)行subscriber,發(fā)現(xiàn)topic_test1沒有指定到下一級別,并不能成功訂閱,必須指定具體的級別:

修改代碼為“+”通配符:
if __name__=='__main__':
sub=Mqtt_Subscriber(topic_name="topic_test1/+")
while not sub.connected: #waiting for client connection
time.sleep(0.05)
print("subsciber connect successfully")
sub.subscribe(topic="topic_test")
while True:
time.sleep(1)
可以看到訂閱到了test1/a和test1/b 還有test:

測試通配符“#”:
if __name__=='__main__':
sub=Mqtt_Subscriber(topic_name="topic_test1/#")
while not sub.connected: #waiting for client connection
time.sleep(0.05)
print("subsciber connect successfully")
sub.subscribe(topic="topic_test")
while True:
time.sleep(1)
可以看到訂閱到所有的topic了。

對應(yīng)查看broker狀態(tài),此時可以看到subsciber和publisher

訂閱后也可以看到topic的數(shù)量,沒有subscribe的時候并不能看到:

思維導(dǎo)圖總結(jié)

代碼已添加到github:https://github.com/Sampsin/learning.git文章來源:http://www.zghlxwxcb.cn/news/detail-761571.html
就到這里。文章來源地址http://www.zghlxwxcb.cn/news/detail-761571.html
到了這里,關(guān)于MQTT概述及環(huán)境搭建、python例程的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!