項目需求:原本做的項目是單進程單線程模式訂閱mqtt,發(fā)現在消息回調處理消息時耗時較久,我們業(yè)務對消息處理是一次性的,只要求處理一次,所以需要提升并發(fā)處理能力??戳司W上建議改為多線程模式,然而本人實踐過程,采用多進程or多線程模式方式運行,發(fā)現并沒達到預期效果。下面時本人的一下實踐記錄,僅供參考學習。
環(huán)境:python3.7
本地mqtt服務使用的emqx
操作工具用的MQTTX客戶端
?1、下面是mqtt多線程模式運行代碼實現,只實現消息訂閱端。
import random, string
from paho.mqtt.client import Client
from threading import Thread
broker = '192.168.8.205'
port = 1883
topic = "python-mqtt"
def connect_mqtt():
def on_connect(_, __, ___, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client_id = f"test-client_{''.join(random.choice(string.ascii_lowercase) for _ in range(4))}"
client = Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client):
def on_message(_, __, mesgage):
print(f"Received `{mesgage.payload.decode()}` from `{mesgage.topic}` topic\n")
client.subscribe(topic)
client.on_message = on_message
def main():
c = connect_mqtt()
subscribe(c)
c.loop_forever()
# c.loop_start()
if __name__ == '__main__':
lt = []
for i in range(10):
t = Thread(target=main, args=(), name=f'thread-{i}')
lt.append(t)
for t in lt:
t.start()
print(t.name)
for t in lt:
t.join()
?2、MQTTX連接發(fā)送消息:
?3、運行效果,發(fā)現發(fā)布一條消息可以被接收10次(訂閱客戶端10個,分別被處理了10次),而我的需求是想要發(fā)布一條消息,被這個10個客戶端之一消費掉,且只處理一次。
代碼參考前文,修改如下
?4、后來又換個思路,嘗試了下還是10個線程,客戶端唯一(client_id唯一),這種模式由于mqtt協議要求客戶端唯一,導致10個線程并發(fā)啟動,出現搶占式連接mqqt服務,出現不停的斷開連接,重新連接。這種模式下運行客戶端實際也只有一個,訂閱處理等能力同于一個線程模式下的客戶端方式,也無法達到預期。
-----------------------------------------------分割線-------------------------------?
后面無意間找到一篇文章,了解到mqtt服務有一種叫共享訂閱模式。以emqx為例,emqx支持兩種格式的共享訂閱前綴:$share/topic 和$queue/topic,然后通過修改emqx服務的配置etc/emqx.conf
如圖:
消息發(fā)布時,topic配置不變,訂閱時,沿用$share/topic 和$queue/topic即可。
代碼參考前文修改如下:
?運行效果:
?可以看到程序運行只收到一條消息了
?參考文章:
? ?1.(mqtt集群訂閱如何只消費一個(一次)消息? - 程序新視界)文章來源:http://www.zghlxwxcb.cn/news/detail-432557.html
? ?2.(paho-mqtt 實現通信_nuc_baixu的博客-CSDN博客_paho mqtt)文章來源地址http://www.zghlxwxcb.cn/news/detail-432557.html
到了這里,關于paho-mqtt實現多客戶端訂閱一個主題,并保證消息只被接收一次的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!