国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

MQTT概述及環(huán)境搭建、python例程

這篇具有很好參考價值的文章主要介紹了MQTT概述及環(huán)境搭建、python例程。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

  1. 什么是MQTT

簡介

MQTT(英文全稱Message Queuing Telemetry Transport,消息隊列遙測傳輸協(xié)議)。

MQTT是一種輕量級的協(xié)議,適用于需要較小代碼占用空間或網(wǎng)絡(luò)帶寬非常寶貴的遠程連接,是專為受限設(shè)備和低帶寬、高延遲或不可靠的網(wǎng)絡(luò)而設(shè)計。這些原則也使該協(xié)議成為新興的“機器到機器”(M2M)或物聯(lián)網(wǎng)(IoT)世界的連接設(shè)備,以及帶寬和電池功率非常高的移動應(yīng)用的理想選擇。

主要模式及圖示

MQTT的主要模式是發(fā)布/訂閱(PUBLISH/SUBSCRIBE)模式,簡單圖示如下:

python mqtt服務(wù)器,Python,嵌入式物聯(lián)網(wǎng),python,物聯(lián)網(wǎng),網(wǎng)絡(luò)協(xié)議,Powered by 金山文檔

服務(wù)器(server)在MQTT中被稱作消息服務(wù)器(Broker),而客戶端(client)可以是發(fā)布者(publisher)也可以是訂閱者(subscriber),client也可以同時是publisher和subscriber。

在發(fā)布者發(fā)布主題(topic)之后,訂閱者就可以訂閱該主題,之后就會收到所訂閱主題的消息(payload),消息的內(nèi)容叫做負載(payload)。一個發(fā)布者可以發(fā)布多個主題,一個訂閱者同樣可以訂閱多個主題,一個主題可以有多個訂閱者。事實上這種模式解耦了傳統(tǒng)的client跟server的關(guān)系。不必預(yù)先知道對方的存在(ip/port), 不必同時運行。

MQTT特點

  • 開放消息協(xié)議,簡單易實現(xiàn)

  • 發(fā)布訂閱模式,一對多消息發(fā)布

  • 基于TCP/IP網(wǎng)絡(luò)連接,提供有序,無損,雙向連接。(事實上也有以UDP實現(xiàn)的,比如MQTT-SN)

  • 1字節(jié)固定報頭,2字節(jié)心跳報文,負載小,最小化傳輸開銷和協(xié)議交換,有效減少網(wǎng)絡(luò)流量。

  • 消息QoS支持,可靠傳輸保證

QoS=0:“至多一次”,消息發(fā)布完全依賴底層TCP/IP網(wǎng)絡(luò)。會發(fā)生消息丟失或重復(fù)。這一級別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無所謂,因為不久后還會有第二次發(fā)送。這一種方式主要普通APP的推送,倘若你的智能設(shè)備在消息推送時未聯(lián)網(wǎng),推送過去沒收到,再次聯(lián)網(wǎng)也就收不到了。

QoS=1:“至少—次”,確保消息到達,但消息重復(fù)可能會發(fā)生。

QoS=2:“只有一次”,確保消息到達一次。在一些要求比較嚴格的計費系統(tǒng)中,可以使用此級別。在計費系統(tǒng)中,消息重復(fù)或丟失會導(dǎo)致不正確的結(jié)果。這種最高質(zhì)量的消息發(fā)布服務(wù)還可以用于即時通訊類的APP的推送,確保用戶收到且只會收到一次。

主題分級

  • 主題可以分級,以“/”隔開,比如 a/b/c/d

  • Suscriber在訂閱主題的時候,也可以包含通配符,通配符有兩種“+”和“#”。+表示一個任意層級,比如 a/+,可以同時匹配a/b或者a/c,但匹配不到a/b/c,而#表示往后多個層級,比如a/#可以匹配a/b/c/d。+可以放在中間層級,比如a/+/c,而#只能放在末尾。

  • Publisher發(fā)布主題的時候只能指定一個清晰的主題名,具體到a/b/c/d某個主題,不能以通配符的形式指定。

  1. 環(huán)境搭建

Broker搭建

選擇比較常用的emqx作為Broker環(huán)境搭建:

  • 登錄到https://www.emqx.io/zh/downloads?os=Ubuntu 選擇你對應(yīng)的服務(wù)器版本,我使用的是ubuntu22.4

  • 登錄進網(wǎng)頁中,會顯示對應(yīng)版本的指令,在服務(wù)器中依次執(zhí)行指令安裝并啟動emqx:

wget https://www.emqx.com/zh/downloads/broker/5.0.20/emqx-5.0.20-ubuntu22.04-amd64.deb
sudo apt install ./emqx-5.0.20-ubuntu22.04-amd64.deb
sudo systemctl start emqx
  • 創(chuàng)建賬戶,用戶名和密碼替換為你自己想設(shè)置的值:

sudo emqx ctl admins add 用戶名 密碼
  • 使用創(chuàng)建的用戶名密碼登錄到emqx,端口一般都是指定18083

http://服務(wù)器IP:18083

我的是本地服務(wù)器:http://192.168.200.128:18083/#/login?to=/dashboard/overview

python mqtt服務(wù)器,Python,嵌入式物聯(lián)網(wǎng),python,物聯(lián)網(wǎng),網(wǎng)絡(luò)協(xié)議,Powered by 金山文檔
  • 登陸之后可以查看各種信息,后續(xù)寫代碼的時候再細看:

python mqtt服務(wù)器,Python,嵌入式物聯(lián)網(wǎng),python,物聯(lián)網(wǎng),網(wǎng)絡(luò)協(xié)議,Powered by 金山文檔

Python下mqtt環(huán)境安裝

我們使用paho-mqtt:

pip install paho-mqtt
  1. client代碼實現(xiàn)

  • 創(chuàng)建工程目錄并新建兩個py文件:

python mqtt服務(wù)器,Python,嵌入式物聯(lián)網(wǎng),python,物聯(lián)網(wǎng),網(wǎng)絡(luò)協(xié)議,Powered by 金山文檔
  • 查看mqtt client類初始化參數(shù):

def __init__(self, client_id="", clean_session=None, userdata=None,
                 protocol=MQTTv311, transport="tcp", reconnect_on_failure=True):
? ? ? ? """client_id is the unique client id string used when connecting to the
        broker. If client_id is zero length or None, then the behaviour is
        defined by which protocol version is in use. If using MQTT v3.1.1, then
        a zero length client id will be sent to the broker and the broker will
        generate a random for the client. If using MQTT v3.1 then an id will be
        randomly generated. In both cases, clean_session must be True. If this
        is not the case a ValueError will be raised.

        clean_session is a boolean that determines the client type. If True,
        the broker will remove all information about this client when it
        disconnects. If False, the client is a persistent client and
        subscription information and queued messages will be retained when the
        client disconnects.
        Note that a client will never discard its own outgoing messages on
        disconnect. Calling connect() or reconnect() will cause the messages to
        be resent.  Use reinitialise() to reset a client to its original state.
        The clean_session argument only applies to MQTT versions v3.1.1 and v3.1.
        It is not accepted if the MQTT version is v5.0 - use the clean_start
        argument on connect() instead.

        userdata is user defined data of any type that is passed as the "userdata"
        parameter to callbacks. It may be updated at a later point with the
        user_data_set() function.

        The protocol argument allows explicit setting of the MQTT version to
        use for this client. Can be paho.mqtt.client.MQTTv311 (v3.1.1),
        paho.mqtt.client.MQTTv31 (v3.1) or paho.mqtt.client.MQTTv5 (v5.0),
        with the default being v3.1.1.

        Set transport to "websockets" to use WebSockets as the transport
        mechanism. Set to "tcp" to use raw TCP, which is the default.
        """
  • 我們可以試著自定義client_id,其他的使用默認即可。

clean_session(bool, disconnect后是否清除數(shù)據(jù),默認為True)、

user_data(str, 用戶自定義數(shù)據(jù))

protocol(enum,協(xié)議版本為MQTTv311或者MQTTv5,默認為MQTTv311)

transport(tcp或者websocket,默認為tcp)

reconnect_on_failure(bool, connect失敗后是否自動重新connect,默認為True)

  • on_connect函數(shù)是client connect之后的回調(diào)函數(shù),我們可以自定義回調(diào)函數(shù)

def on_connect(client, userdata, flags, rc):
        print("Connection returned " + str(rc))


    **IMPORTANT** the required function signature for a callback can differ
    depending on whether you are using MQTT v5 or MQTT v3.1.1/v3.1. See the
    documentation for each callback.

    All of the callbacks as described below have a "client" and an "userdata"
    argument. "client" is the Client instance that is calling the callback.
    "userdata" is user data of any type and can be set when creating a new client
    instance or with user_data_set(userdata).

    If you wish to suppress exceptions within a callback, you should set
    `client.suppress_exceptions = True`

    The callbacks are listed below, documentation for each of them can be found
    at the same function name:

    on_connect, on_connect_fail, on_disconnect, on_message, on_publish,
    on_subscribe, on_unsubscribe, on_log, on_socket_open, on_socket_close,
    on_socket_register_write, on_socket_unregister_write
    """

查看on_connect的說明,需要多個參數(shù):

client:client對象實例

userdata:自定義數(shù)據(jù),創(chuàng)建client時傳入的

flags:broker回應(yīng)的標記

rc:返回值,返回0為成功連接

基本的信息了解清楚后,嘗試寫Publisher例程代碼,主函數(shù)中簡單地死循環(huán)發(fā)布topic,這樣訂閱者就可以持續(xù)收到topic的payload message:

import random
import time
from paho.mqtt import client as mqtt_client

class Mqtt_Publisher:
    def __init__(self, broker_ip='192.168.200.128', client_prefix="pub_", port=1883, timeout=60):
        self.broker_ip = broker_ip #server ip
        self.port = port #network port
        self.timeout = timeout #connect timeout time
        self.connected = False
        self.client_id = client_prefix + str(random.randint(10000,99999)) #create an random integer as client id
        self.start()

    def start(self):
        self.client = mqtt_client.Client(self.client_id)
        self.client.on_connect = self.on_connect
        self.client.connect(self.broker_ip, self.port, self.timeout)
        self.client.loop_start() #default loop to try connection forever until you call disconnect()

    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            self.connected = True
        else:
            raise Exception("Failed to connect mqtt server")
        
    def publish(self, topic, payload, qos=0):
        if self.connected:
            return self.client.publish(topic, payload=payload, qos=qos)
        else:
            raise Exception("mqtt server not connected, cannot publish topic")
        
if __name__=='__main__':
    pub=Mqtt_Publisher()
    while not pub.connected: #waiting for client connection
        time.sleep(0.05)
    print("publisher connect successfully")
    while True:
        pub.publish('topic_test','this is a test message')
        pub.publish('topic_test1/a','this is level2 testa')
        pub.publish('topic_test1/b','this is level2 testb')
        pub.publish('topic_test1/a/c','this is level3 testc')
        time.sleep(1)

雖然publish了四次,事實上這是兩個topic,只是第二個topic分成了多個級別。

執(zhí)行代碼,查看broker:

python mqtt服務(wù)器,Python,嵌入式物聯(lián)網(wǎng),python,物聯(lián)網(wǎng),網(wǎng)絡(luò)協(xié)議,Powered by 金山文檔
python mqtt服務(wù)器,Python,嵌入式物聯(lián)網(wǎng),python,物聯(lián)網(wǎng),網(wǎng)絡(luò)協(xié)議,Powered by 金山文檔

可以看到client的連接狀態(tài),已經(jīng)有一個publisher連接了,并且每秒有4個消息。

  • 接著編寫Subscriber的代碼:

需要多一個修改publish接口為subscribe接口,另外額外需要一個回調(diào)函數(shù)on_message處理topic發(fā)出的payload, 查看on_message_callback的示例:

on_message_callback(client, userdata, message)
    client:     the client instance for this callback
    userdata:   the private user data as set in Client() or userdata_set()
    message:    an instance of MQTTMessage.
                This is a class with members topic, payload, qos, retain.

我們簡單地實現(xiàn)為打印出payload即可:

 def on_message(self, client, userdata, msg): #after subscribing topic, will get topic message, this is the callback function
        print(msg.payload.decode('utf-8')) #simply print message

上subscriber完整代碼:

import random
import time
from paho.mqtt import client as mqtt_client

class Mqtt_Subscriber:
    def __init__(self, broker_ip='192.168.200.128', client_prefix="sub_", port=1883, timeout=60, topic_name="topic_test"):
        self.broker_ip = broker_ip #server ip
        self.port = port #network port
        self.timeout = timeout #connect timeout time
        self.topic_name = topic_name
        self.connected = False
        self.client_id = client_prefix + str(random.randint(10000,99999)) #create an random integer as client id
        self.start()

    def start(self):
        self.client = mqtt_client.Client(self.client_id)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.connect(self.broker_ip, self.port, self.timeout)
        self.client.subscribe(self.topic_name)
        self.client.loop_start() #default loop to try connection forever until you call disconnect()

    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            self.connected = True
        else:
            raise Exception("Failed to connect mqtt server")
        
    def on_message(self, client, userdata, msg): #after subscribing topic, will get topic message, this is the callback function
        print(msg.payload.decode('utf-8')) #simply print message
        
    def subscribe(self, topic, qos=0):
        if self.connected:
            return self.client.subscribe(topic, qos=qos, options=None, properties=None)
        else:
            raise Exception("mqtt server not connected, cannot publish topic")
        
if __name__=='__main__':
    sub=Mqtt_Subscriber(topic_name="topic_test1")
    while not sub.connected: #waiting for client connection
        time.sleep(0.05)
    print("subsciber connect successfully")
? ? sub.subscribe(topic="topic_test")
    while True:
        time.sleep(1)

執(zhí)行subscriber,發(fā)現(xiàn)topic_test1沒有指定到下一級別,并不能成功訂閱,必須指定具體的級別:

python mqtt服務(wù)器,Python,嵌入式物聯(lián)網(wǎng),python,物聯(lián)網(wǎng),網(wǎng)絡(luò)協(xié)議,Powered by 金山文檔

修改代碼為“+”通配符:

if __name__=='__main__':
    sub=Mqtt_Subscriber(topic_name="topic_test1/+")
    while not sub.connected: #waiting for client connection
        time.sleep(0.05)
    print("subsciber connect successfully")
    sub.subscribe(topic="topic_test")
    while True:
        time.sleep(1)

可以看到訂閱到了test1/a和test1/b 還有test:

python mqtt服務(wù)器,Python,嵌入式物聯(lián)網(wǎng),python,物聯(lián)網(wǎng),網(wǎng)絡(luò)協(xié)議,Powered by 金山文檔

測試通配符“#”:

if __name__=='__main__':
    sub=Mqtt_Subscriber(topic_name="topic_test1/#")
    while not sub.connected: #waiting for client connection
        time.sleep(0.05)
    print("subsciber connect successfully")
    sub.subscribe(topic="topic_test")
    while True:
        time.sleep(1)

可以看到訂閱到所有的topic了。

python mqtt服務(wù)器,Python,嵌入式物聯(lián)網(wǎng),python,物聯(lián)網(wǎng),網(wǎng)絡(luò)協(xié)議,Powered by 金山文檔

對應(yīng)查看broker狀態(tài),此時可以看到subsciber和publisher

python mqtt服務(wù)器,Python,嵌入式物聯(lián)網(wǎng),python,物聯(lián)網(wǎng),網(wǎng)絡(luò)協(xié)議,Powered by 金山文檔

訂閱后也可以看到topic的數(shù)量,沒有subscribe的時候并不能看到:

python mqtt服務(wù)器,Python,嵌入式物聯(lián)網(wǎng),python,物聯(lián)網(wǎng),網(wǎng)絡(luò)協(xié)議,Powered by 金山文檔
  1. 思維導(dǎo)圖總結(jié)

python mqtt服務(wù)器,Python,嵌入式物聯(lián)網(wǎng),python,物聯(lián)網(wǎng),網(wǎng)絡(luò)協(xié)議,Powered by 金山文檔

代碼已添加到github:https://github.com/Sampsin/learning.git

就到這里。文章來源地址http://www.zghlxwxcb.cn/news/detail-761571.html

到了這里,關(guān)于MQTT概述及環(huán)境搭建、python例程的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • MQTT:windows環(huán)境下配置MQTT服務(wù)器(mosquitto)

    MQTT:windows環(huán)境下配置MQTT服務(wù)器(mosquitto)

    目錄 1.下載 mosquitto 2.安裝?mosquitto? 3.配置?mosquitto? 4.測試 mosquitto ????????登錄網(wǎng)址: ????????http://mosquitto.org/files/binary/ ????????這里是window環(huán)境,選擇win32/,下載mosquitto安裝包。 ? ? ? ? ?雙擊安裝 - 點擊Next - 點擊Next - 選擇安裝路徑 - 點擊install - 點擊Finish。

    2024年02月11日
    瀏覽(39)
  • 自己搭建mqtt服務(wù)器

    自己搭建mqtt服務(wù)器

    ??????? 前言:網(wǎng)上資料大部分都是使用的云服務(wù),我是采用自己搭建的服務(wù)器來進行試驗的,接下來將記錄過程。 云服務(wù)器有很多種網(wǎng)上也有很多教學(xué)在這里不進行過多的解釋了,我實驗的時候采用的阿里云國內(nèi)的服務(wù)器這里以后還會進行介紹。 ????????本實驗主要

    2024年02月03日
    瀏覽(18)
  • mqtt服務(wù)器搭建與qt下的mqtt客戶端實現(xiàn)

    mqtt服務(wù)器搭建與qt下的mqtt客戶端實現(xiàn)

    ??MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協(xié)議),是一個基于客戶端-服務(wù)器的消息發(fā)布/訂閱傳輸協(xié)議。MQTT協(xié)議是輕量、簡單、開放和易于實現(xiàn)的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環(huán)境中,如:機器與機器(M2M)通信和物聯(lián)網(wǎng)(Io

    2024年02月06日
    瀏覽(26)
  • 個人云服務(wù)器搭建MQTT服務(wù)器

    個人云服務(wù)器搭建MQTT服務(wù)器

    ??????????相關(guān)文章?????????? ESP32連接MQ Sensor實現(xiàn)氣味反應(yīng) ?? https://blog.csdn.net/ws15168689087/article/details/131365573 ESP32連接云服務(wù)器【W(wǎng)ebSocket】 ?? https://blog.csdn.net/ws15168689087/article/details/131406163 ESP32+MQTT+MySQL實現(xiàn)發(fā)布訂閱【氣味數(shù)據(jù)收集】 ?? https://blog.csdn.net/ws1516868

    2024年02月15日
    瀏覽(36)
  • Windows下搭建MQTT服務(wù)器

    Windows下搭建MQTT服務(wù)器

    MQ遙測傳輸(MQTT)是輕量級基于代理的發(fā)布/訂閱的消息傳輸協(xié)議,設(shè)計思想是開放、簡單、輕量、易于實現(xiàn)。這些特點使它適用于低帶寬受限環(huán)境。 特點包括以下: 使用發(fā)布/訂閱消息模式,提供一對多的消息發(fā)布,解除應(yīng)用程序耦合。 對負載內(nèi)容屏蔽的消息傳輸。 使用

    2024年02月03日
    瀏覽(33)
  • 本地MQTT服務(wù)器搭建(EMQX)

    本地MQTT服務(wù)器搭建(EMQX)

    下載地址:EMQ (emqx.com) 打開官網(wǎng)后,選擇右邊的免費試用按鈕 然后單擊EMQX Enterprise標簽,然后選擇下面的EMQX開源版,選擇開源版的系統(tǒng)平臺為Windows,單擊免費下載。 在新頁面下單擊立即下載 將下載的emqx-5.1.6-windows-amd64.zip解壓出來,解壓目錄不能存在中文、空格、特殊字符

    2024年02月09日
    瀏覽(28)
  • 阿里云服務(wù)器如何搭建MQTT服務(wù)器

    阿里云服務(wù)器如何搭建MQTT服務(wù)器

    入門教程:鏈接 將系統(tǒng)配置成Ubuntu18的(因為我只會用這個系統(tǒng)) 在實例處停止當前系統(tǒng)的運行,然后依次選擇2,下拉菜單找到3進行更換系統(tǒng),更換完成以后重啟就好了。 如下圖,依次點擊1-4的按鈕,第五步需要重新設(shè)置系統(tǒng)的密碼 Xshell下載鏈接 安裝完成后打開Xshell按照

    2024年02月03日
    瀏覽(28)
  • Linux搭建MQTT服務(wù)器(Mosquitto)

    Linux搭建MQTT服務(wù)器(Mosquitto)

    編譯時,若提示fatal error: cjson/cJSON.h: No such file or directory,需要安裝cJSON,然后重新安裝mosquitto。 若不添加軟連接,發(fā)布、訂閱消息時會提示\\\"error while loading shared libraries: libmosquitto.so.1: cannot open shared object file: No such file or directory\\\"。 打開兩個服務(wù)器連接,分別執(zhí)行mosquitto_sub、

    2024年02月09日
    瀏覽(32)
  • 快速搭建個人MQTT服務(wù)器(基于EMQX)

    快速搭建個人MQTT服務(wù)器(基于EMQX)

    4分鐘快速搭建個人MQTT服務(wù)器(基于EMQX) 相信看到這篇教程的人應(yīng)該對MQTT協(xié)議有了一定的了解。其實提供MQTT服務(wù)的廠商也有很多,比較知名的有EMQX等。EMQX雖然提供了免費的公共MQTT5服務(wù)器。 但是對于多個用戶利用公共服務(wù)器同時訂閱或發(fā)布同一主題內(nèi)容時,可能會接收到

    2024年02月03日
    瀏覽(27)
  • linux下MQTT服務(wù)器(EMQX)搭建及paho.mqtt.c客戶端開發(fā)

    linux下MQTT服務(wù)器(EMQX)搭建及paho.mqtt.c客戶端開發(fā)

    前言: MQTT 是一種基于客戶端服務(wù)端架構(gòu)的發(fā)布 / 訂閱模式的消息傳輸協(xié)議。它的設(shè)計思想是輕巧、開放、 簡單、規(guī)范,易于實現(xiàn)。這些特點使得它對很多場景來說都是很好的選擇,特別是對于受限的環(huán)境如機器與 機器的通信( M2M )以及物聯(lián)網(wǎng)環(huán)境( IoT )。? ? ? ? ---

    2024年02月06日
    瀏覽(38)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包