大家好,這篇文章給大家介紹MQTT協(xié)議以及如何在OpenWrt系統(tǒng)中使用MQTT客戶端和開發(fā),并給出相關(guān)實(shí)例代碼。
MQTT簡介
MQTT(Message Queuing Telemetry Transport)是一種輕量級的通信協(xié)議,設(shè)計(jì)用于在低帶寬和不穩(wěn)定的網(wǎng)絡(luò)環(huán)境中傳輸消息。它最初由IBM開發(fā),用于連接遠(yuǎn)程設(shè)備和傳感器到網(wǎng)絡(luò),并支持發(fā)布/訂閱模型的消息通信。
MQTT被廣泛用于物聯(lián)網(wǎng)(IoT)領(lǐng)域,其中大量的設(shè)備需要進(jìn)行實(shí)時通信和數(shù)據(jù)交換。它采用了一種發(fā)布/訂閱(publish/subscribe)模型,其中消息的發(fā)送者(發(fā)布者)將消息發(fā)布到特定的主題(topic),而訂閱者可以選擇性地訂閱感興趣的主題,以接收相應(yīng)的消息。
MQTT特點(diǎn)
以下是MQTT的一些關(guān)鍵特點(diǎn):
輕量級:MQTT的設(shè)計(jì)非常輕量,協(xié)議頭部非常小,傳輸?shù)臄?shù)據(jù)量很小,適用于帶寬有限的網(wǎng)絡(luò)環(huán)境,如低速、高延遲或不穩(wěn)定的網(wǎng)絡(luò)。
簡單:MQTT的協(xié)議規(guī)范相對簡單,易于實(shí)現(xiàn)和部署。它定義了少量的消息類型和協(xié)議操作,使得開發(fā)人員可以快速上手。
異步通信:MQTT使用異步通信模式,發(fā)布者發(fā)送消息后,不需要等待接收者的響應(yīng),可以繼續(xù)執(zhí)行其他操作。這種異步通信模式適合在資源有限的設(shè)備和網(wǎng)絡(luò)中工作。
可靠性:MQTT支持三種不同的消息傳遞質(zhì)量(QoS)級別:QoS 0(至多一次),QoS 1(至少一次)和QoS 2(只有一次)。這使得可以根據(jù)應(yīng)用程序的要求選擇適當(dāng)?shù)南⒔桓侗WC級別。
網(wǎng)絡(luò)狀況適應(yīng)性:MQTT可以適應(yīng)不穩(wěn)定的網(wǎng)絡(luò)狀況,如網(wǎng)絡(luò)中斷、重連等。它具有斷開連接后自動重連的機(jī)制,可以確保消息的可靠傳輸。
訂閱和發(fā)布模型
Publisher(發(fā)布者):發(fā)布者是消息的發(fā)送者,它將消息發(fā)布到特定的主題(topic)上。可以有一個或多個發(fā)布者。
Subscriber(訂閱者):訂閱者是對消息感興趣的實(shí)體,它選擇性地訂閱一個或多個主題。一旦訂閱了主題,它就會接收到相應(yīng)的消息。
MQTT Broker(MQTT代理):MQTT代理是中間件,負(fù)責(zé)接收發(fā)布者發(fā)送的消息,并將其路由到對應(yīng)的訂閱者。它維護(hù)著主題和訂閱關(guān)系的注冊表,并確保消息的可靠傳遞。
工作流程如下:
發(fā)布者將消息發(fā)布到特定的主題上。
MQTT代理接收到消息后,根據(jù)訂閱者的注冊信息,將消息路由到對應(yīng)的訂閱者。
訂閱者接收到發(fā)布者發(fā)布的消息,并進(jìn)行相應(yīng)的處理。
通過發(fā)布/訂閱模型,MQTT允許實(shí)現(xiàn)解耦和靈活性,發(fā)布者和訂閱者之間不需要直接的點(diǎn)對點(diǎn)連接,而是通過MQTT代理進(jìn)行中轉(zhuǎn)和路由。這種模型非常適合在物聯(lián)網(wǎng)中進(jìn)行大規(guī)模設(shè)備間的通信和數(shù)據(jù)交換。
MQTT QoS
MQTT(Message Queuing Telemetry Transport)協(xié)議支持三種不同的QoS(Quality of Service)級別,用于控制消息的可靠性和傳輸保證。以下是MQTT的三個QoS級別:
QoS 0(至多一次):
在QoS 0級別下,消息以“至多一次”傳輸,沒有確認(rèn)機(jī)制。消息被發(fā)布后,發(fā)布者不會接收到關(guān)于消息是否成功傳輸或交付的確認(rèn)。MQTT代理會盡最大努力將消息傳輸給訂閱者,但可能會出現(xiàn)消息丟失或重復(fù)的情況。此級別適用于對消息傳輸?shù)目煽啃砸蟛桓叩膱鼍?,如傳感器?shù)據(jù)的臨時更新等。
QoS 1(至少一次):
在QoS 1級別下,消息以“至少一次”傳輸,確保至少傳輸一次。發(fā)布者發(fā)送消息后,會等待MQTT代理發(fā)送確認(rèn)消息(PUBACK)來確認(rèn)消息的接收。
如果發(fā)布者沒有收到確認(rèn)消息,它會再次發(fā)送相同的消息,直到收到確認(rèn)為止。MQTT代理會確保消息至少傳輸一次給訂閱者,但可能會出現(xiàn)重復(fù)傳輸?shù)那闆r。此級別適用于對消息傳輸?shù)目煽啃砸筝^高的場景,如控制指令的傳遞。
QoS 2(只有一次):
在QoS 2級別下,消息以“只有一次”傳輸,確保僅傳輸一次。發(fā)布者發(fā)送消息后,會等待MQTT代理發(fā)送兩個確認(rèn)消息(PUBREC和PUBCOMP)來確認(rèn)消息的接收和完成。MQTT代理會確保消息僅傳輸一次給訂閱者,沒有重復(fù)傳輸?shù)那闆r。
此級別提供了最高的消息傳輸可靠性,但也伴隨著更高的網(wǎng)絡(luò)開銷。此級別適用于對消息傳輸?shù)目煽啃砸蠓浅8叩膱鼍埃缃鹑诮灰谆驀?yán)格的數(shù)據(jù)同步。選擇合適的QoS級別取決于應(yīng)用程序?qū)ο鬏斂煽啃院途W(wǎng)絡(luò)開銷的要求。更高的QoS級別提供了更可靠的傳輸,但同時也增加了網(wǎng)絡(luò)開銷。因此,需要根據(jù)具體場景的需求來選擇適當(dāng)?shù)募墑e。
OpenWrt中使用mosquitto
插件安裝
默認(rèn)是沒有包含mosquitto客戶端和broker的,我們可以手動安裝相關(guān)插件,為了測試我們需要安裝broker和client
首先更新openwrt軟件源
opkg?update
然后調(diào)用以下命令分別安裝mosquitto broker和client,這里我們選用nossl版本,也就是不需要ssl加密,方便測試
opkg?install?mosquitto-nossl
opkg?install?mosquitto-client-nossl
mosquitto服務(wù)
安裝完成后就可以使用broker和client了,首先我們需要啟動mosquitto broker服務(wù),
mosquitto broker服務(wù)配置文件在/etc/mosquitto/目錄中,我們可以修改服務(wù)器相關(guān)信息,比如監(jiān)聽端口號、接口、ip地址等。
root@OpenWrt:~#?ls?/etc/mosquitto/mosquitto.conf?
/etc/mosquitto/mosquitto.conf
root@OpenWrt:~#?
mosquitto客戶端
mosquitto客戶端包含sub和pub兩部分,分別用于訂閱和發(fā)布
訂閱主題:
mosquitto_sub?-h?<MQTT?Broker?IP>?-p?<MQTT?Broker?Port>?-t?<Topic>
其中,是MQTT Broker的IP地址,是MQTT Broker的端口號,是要訂閱的主題名稱。
示例:
mosquitto_sub?-h?192.168.1.1?-p?1883?-t?test/topic
發(fā)布主題:
mosquitto_pub?-h?<MQTT?Broker?IP>?-p?<MQTT?Broker?Port>?-t?<Topic>?-m?<Message>
其中,是MQTT Broker的IP地址,是MQTT Broker的端口號,是要發(fā)布的主題名稱,是要發(fā)布的消息內(nèi)容。
示例:
mosquitto_pub?-h?192.168.1.1?-p?1883?-t?test/topic?-m?"Hello,?MQTT!"
運(yùn)行結(jié)果:
由于訂閱和發(fā)布客戶端都在本地,ip使用localhost地址127.0.0.1
root@OpenWrt:~#?mosquitto_sub?-h?127.0.0.1?-p?1883?-t?test/topic?&
root@OpenWrt:~#?mosquitto_pub?-h?127.0.0.1?-p?1883?-t?test/topic?-m?"hello?MQTT."
root@OpenWrt:~#?hello?MQTT.
在訂閱主題時,我們還可以使用通配符,最常用就是通配符"#",通過"#"可以匹配多級topic
比如訂閱了主題"test/#",則可以收到"test/"開頭的所有topic,比如"test/topic1"、"test/hello"等
以下實(shí)例中分別訂閱了"test/#"和"test/topic1",當(dāng)發(fā)布"test/topic1"消息時,二者都可以收到,而發(fā)布"test/topic2"時只有一個可以收到。
除了通配符"#"之外,還有"$"、"+"等通配符,不在這里詳解。
使用云端公共broker測試
emqx提供了公共免費(fèi)的broker供開發(fā)者測試,注意不要在生產(chǎn)環(huán)境使用,僅供測試
我們可以準(zhǔn)備兩臺不同的設(shè)備,都連接broker.emqx.io,這兩臺設(shè)備可以在不同區(qū)域,通過公網(wǎng)broker可以輕松實(shí)現(xiàn)兩臺設(shè)備通信。
-
客戶端1
客戶端1訂閱openwrt/topic消息
mosquitto_sub?-h?broker.emqx.io?-p?1883?-i?client_001??-t?openwrt/topic
-
客戶端2
發(fā)送一條消息到主題openwrt/topic,這樣客戶端1就可以收到該消息
mosquitto_pub?-h?broker.emqx.io?-p?1883?-t?openwrt/topic?-i?client_002?-m?"hello?client1,?i?am?froms?client2"??
OpenWrt中基于libmosquitto開發(fā)
前面給大家演示了mosquitto客戶端的使用,但命令行客戶端僅供測試使用,我們在開發(fā)過程中需要自定義消息并且能夠?qū)崟r解析消息,而通過命令行就沒那么方便消息的處理了,需要調(diào)用mosquitto底層api接口實(shí)現(xiàn)想要的功能。
libmosquitto庫
在openwrt系統(tǒng)中默認(rèn)集成了mosquitto庫,可以直接依賴調(diào)用。
對應(yīng)依賴的庫為:libmosquitto-nossl
?不支持ssl加密libmosquitto
?支持ssl加密
api接口詳解
mosquitto_lib_init:初始化libmosquitto庫。在使用其他libmosquitto函數(shù)之前,應(yīng)該首先調(diào)用此函數(shù)。
mosquitto_lib_version:獲取libmosquitto庫的版本號信息。
mosquitto_new:創(chuàng)建一個新的mosquitto對象(MQTT客戶端)。
mosquitto_connect:與MQTT代理服務(wù)器建立連接。
mosquitto_disconnect:斷開與MQTT代理服務(wù)器的連接。
mosquitto_publish:向指定主題發(fā)布消息。
mosquitto_subscribe:訂閱一個或多個主題。
mosquitto_unsubscribe:取消訂閱一個或多個主題。
mosquitto_loop_start:啟動一個線程來處理MQTT消息循環(huán)。
mosquitto_loop_forever:開始一個阻塞的循環(huán),處理MQTT消息。
mosquitto_loop:在非阻塞模式下處理MQTT消息。
mosquitto_message_callback_set:設(shè)置用于接收訂閱消息的回調(diào)函數(shù)。
mosquitto_username_pw_set:設(shè)置連接時使用的用戶名和密碼。
mosquitto_tls_set:為MQTT連接啟用SSL/TLS加密。
mosquitto_tls_opts_set:設(shè)置SSL/TLS選項(xiàng),如CA證書、客戶端證書和私鑰等。
mosquitto_tls_insecure_set:設(shè)置是否允許SSL/TLS連接中的不安全選項(xiàng)。
mosquitto_will_set:設(shè)置遺囑消息,即在客戶端異常斷開時發(fā)布的消息。
基于libmosquitto實(shí)現(xiàn)一個消息訂閱程序
源碼?
#include?<unistd.h>
#include?<stdio.h>
#include?<stdlib.h>
#include?<string.h>
#include?<mosquitto.h>
#include?<sys/time.h>
#include?<sys/sysinfo.h>
struct?mosquitto?*g_test_mosq?=?NULL;
void?mqtt_connect_callback(struct?mosquitto?*mosq,?void?*userdata,?int?result)
{
?printf("connect?to?mqtt?server?ok\n");
?if?(MOSQ_ERR_SUCCESS?!=?mosquitto_subscribe(mosq,?NULL,?"openwrt/#",?0))
?{
??printf("sub?topic?openwrt/#?failed...\n");
?}
?else{
??printf("sub?topic?openwrt/#?failed...\n");
?}
}
void?mqtt_disconnect_callback(struct?mosquitto?*mosq,?void?*userdata,?int?result)
{
?if?(result)
??printf("disconnect?%s\n",?mosquitto_connack_string(result));
?else
??printf("disconnect?from?mqtt?server.\n");
}
void?mqtt_sub_callback(struct?mosquitto?*mosq,
????????void?*userdata,
????????int?mid,
????????int?qos_count,
????????const?int?*granted_qos)
{
?printf("sub?callback\n");
}
void?mqtt_msg_callback(struct?mosquitto?*mosq,
????????void?*userdata,
????????struct?mosquitto_message?*message)
{
?printf("callback?recv?mqtt?msg,?topic?=?%s,?payload?=?%s\n",?message->topic,?message->payload);
}
struct?mosquitto?*connect_to_mqtt_server(char?*server_ip)
{
?struct?mosquitto?*mosq?=?NULL;
?int?rc;
?char?mqtt_user[128]?=?{0};
?char?mqtt_pwd[128]?=?{0};
?char?client_id[128]?=?{0};
????struct?timeval?tv;
????gettimeofday(&tv,?NULL);
?mosquitto_lib_init();
?snprintf(client_id,?sizeof(client_id),?"test_%d",?tv.tv_sec);
?printf("connect?to?mqtt?server..client_id=%s\n",?client_id);
?mosq?=?mosquitto_new(client_id,?true,?NULL);
?if?(!mosq)
?{
??return?NULL;
?}
#if?0
?rc?=?mosquitto_username_pw_set(mosq,?"test",?"test");
?if?(rc)
?{
??mosquitto_destroy(mosq);
??return?NULL;
?}
#endif
?mosquitto_connect_callback_set(mosq,?mqtt_connect_callback);
?mosquitto_message_callback_set(mosq,?mqtt_msg_callback);
?mosquitto_subscribe_callback_set(mosq,?mqtt_sub_callback);
?mosquitto_disconnect_callback_set(mosq,?mqtt_disconnect_callback);
?rc?=?mosquitto_connect(mosq,?server_ip,?1883,?30);
?if?(rc)
?{
??printf("Unable?to?connect?mqtt?server?rc=%d\n",?rc);
??mosquitto_destroy(mosq);
??return?NULL;
?}
?return?mosq;
}
int?mqtt_bcast_msg(char?*api,?char?*data,?int?len)
{
?char?topic[128]?=?{0};
?int?mid;
?if?(!api?||?!data?||?len?==?0)
??return?-1;
?if?(!g_test_mosq)
??return?-1;
?sprintf(topic,?"openwrt/%s",?api);
?return?mosquitto_publish(g_test_mosq,?&mid,?topic,?len,?data,?0,?0);
}
int?main(int?argc,?char?*argv[]){
?char?*host?=?NULL;
?if?(argc?<?2){
??host?=?"127.0.0.1";
??printf("use?default?ip:?127.0.0.1\n");
?}
?else{
??host?=?argv[1];
??printf("use?ip:?%s\n",?host);
?}
?g_test_mosq?=?connect_to_mqtt_server(host);
?if?(!g_test_mosq){
??printf("connect?to?server?%s?failed\n",?host);
??exit(0);
?}
?mosquitto_loop_forever(g_test_mosq,?-1,?1);
?mosquitto_destroy(g_test_mosq);
?mosquitto_lib_cleanup();
?return?0;
}
實(shí)例源碼編譯
將源碼包拷貝到openwrt源碼package目錄
完整的OpenWrt源碼包和pdf教程可以在我的知識星球中下載
點(diǎn)擊查看知識星球簡介
開啟mqtt_test宏并生成默認(rèn)依賴配置
echo?"CONFIG_PACKAGE_mqtt_test=y"?>>.config
make?defconfig
編譯
make?package/mqtt_test/compile?V=s
插件安裝:
mqtt_test依賴了libmosquitto庫,而libmosquitto依賴了libcares,所以需要安裝三個插件
-
libcares
-
libmosquitto-nossl
-
mqtt_test
將插件通過winscp或其他工具上傳到openwrt系統(tǒng)中,執(zhí)行以下命令安裝(以X86為例)
opkg?install?libcares_1.18.1-1_x86_64.ipk
opkg?install?libmosquitto-nossl_2.0.15-1_x86_64.ipk
opkg?install?mqtt_test_1.0-1_x86_64.ipk
運(yùn)行:
mqtt_test默認(rèn)連接本地broker,也可以指定ip
運(yùn)行如果出現(xiàn)錯誤,表示服務(wù)器沒有啟動或者參數(shù)異常,請先確認(rèn)mosquitto服務(wù)已經(jīng)啟動。
use?default?ip:?127.0.0.1
connect?to?mqtt?server..client_id=test_1686993389
Unable?to?connect?mqtt?server?rc=14
connect?to?server?127.0.0.1?failed
運(yùn)行成功
root@OpenWrt:~#?mqtt_test?
use?default?ip:?127.0.0.1
connect?to?mqtt?server..client_id=test_1686993598
connect?to?mqtt?server?ok
sub?callback
現(xiàn)在就啟動了一個mqtt客戶端,訂閱了openwrt/#
通過mosquitto_pub工具可以發(fā)送指令到該客戶端,客戶端當(dāng)前處理方式是輸出收到的消息,當(dāng)然實(shí)際開發(fā)是解析指令并執(zhí)行對應(yīng)的命令,比如接收到reboot命令后執(zhí)行重啟。
pub命令如下:
root@OpenWrt:~#?mosquitto_pub?-h?127.0.0.1?-p?1883?-t?openwrt/send_msg?-m?"hello?openwrt"
root@OpenWrt:~#?mosquitto_pub?-h?127.0.0.1?-p?1883?-t?openwrt/send_msg?-m?"你好"
root@OpenWrt:~#?mosquitto_pub?-h?127.0.0.1?-p?1883?-t?openwrt/send_msg?-m?"你好"
root@OpenWrt:~#?mosquitto_pub?-h?127.0.0.1?-p?1883?-t?openwrt/send_msg?-m?"reboot"
root@OpenWrt:~#?mosquitto_pub?-h?127.0.0.1?-p?1883?-t?openwrt/send_msg?-m?"reboot"
客戶端輸出如下:
callback?recv?mqtt?msg,?topic?=?openwrt/send_msg,?payload?=?hello?openwrt
callback?recv?mqtt?msg,?topic?=?openwrt/send_msg,?payload?=?你好
callback?recv?mqtt?msg,?topic?=?openwrt/send_msg,?payload?=?你好
callback?recv?mqtt?msg,?topic?=?openwrt/send_msg,?payload?=?reboot
callback?recv?mqtt?msg,?topic?=?openwrt/send_msg,?payload?=?reboot
如果客戶端連接云端的broker,就可以實(shí)現(xiàn)遠(yuǎn)程操作設(shè)備,比如遠(yuǎn)程重啟設(shè)備、配置下發(fā)等。文章來源:http://www.zghlxwxcb.cn/news/detail-786914.html
總結(jié)
在物聯(lián)網(wǎng)開發(fā)中我們會經(jīng)常用的MQTT協(xié)議,常見的就是邊緣設(shè)備和云端通信,上報實(shí)時狀態(tài)、遠(yuǎn)程管理等,當(dāng)然也可以局域網(wǎng)間通信,實(shí)現(xiàn)節(jié)點(diǎn)間通信,比如可以通過MQTT協(xié)議實(shí)現(xiàn)mesh數(shù)據(jù)同步、AC集中管理等,有了MQTT協(xié)議我們不需要自己通過底層socket實(shí)現(xiàn)私有協(xié)議,可以只關(guān)注業(yè)務(wù)處理,可大大提高程序穩(wěn)定性。當(dāng)然MQTT也有一些缺陷,就是訂閱發(fā)布模型實(shí)現(xiàn)實(shí)時回復(fù)消息比較麻煩,不適合做一些狀態(tài)較復(fù)雜的交互。文章來源地址http://www.zghlxwxcb.cn/news/detail-786914.html
到了這里,關(guān)于基于OpenWrt使用MQTT物聯(lián)網(wǎng)協(xié)議詳解和應(yīng)用的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!