目錄
一、MQTT簡介
二、MQTT使用方法
三、MQTT驅(qū)動設(shè)計
四、代碼解析
五、使用過程
六、總結(jié)
一、MQTT簡介
? ? ? ? MQTT因為其輕量、高效和穩(wěn)定的特點,特別適合作為物聯(lián)網(wǎng)系統(tǒng)的數(shù)據(jù)傳輸協(xié)議,已經(jīng)成為物聯(lián)網(wǎng)事實上的通信標(biāo)準(zhǔn)了。關(guān)于協(xié)議的具體內(nèi)容看看這篇文章和官方文檔MQTT協(xié)議詳解(完整版)-CSDN博客,在這里我們主要講解使用方法。
? ? ? ? 作為嵌入式設(shè)備,設(shè)備資源比較緊張,我們這里選用開源庫paho mqtt,開源地址在這兒GitHub - eclipse/paho.mqtt.embedded-c: Paho MQTT C client library for embedded systems. Paho is an Eclipse IoT project (https://iot.eclipse.org/)
? ? ? ? 我們項目里已經(jīng)都整理好了,直接用就行了,具體如下圖所示,從映射文件可以看出,mqtt開源庫大概占用2KB的 ROM,已經(jīng)很輕量化了。這個開源庫的核心作用就是可以幫我們根據(jù)協(xié)議要求組合要發(fā)送的數(shù)據(jù),或者拆解接收到的數(shù)據(jù),而應(yīng)用層不用去太關(guān)心協(xié)議本身的內(nèi)容。
二、MQTT使用方法
? ? ? ? MQTT是以服務(wù)器為中心,客戶端對為對象,話題為關(guān)系紐帶的一種通訊協(xié)議,在這個體系里,凈化器設(shè)備是客戶端,用戶手機(jī)也是客戶端,手機(jī)訂閱凈化器發(fā)布的話題,服務(wù)器就會把凈化器發(fā)布的消息推送給手機(jī);同樣的道理,手機(jī)根據(jù)設(shè)備訂閱的話題來發(fā)布消息,就可以對凈化器設(shè)備進(jìn)行控制了。
????????下圖是凈化器項目的話題,其中11223344是設(shè)備的序列號,對于所有凈化器的數(shù)據(jù)手機(jī)都能收的到,手機(jī)針對某個凈化器的數(shù)據(jù)也只有某個凈化器能接收,其它序列號的設(shè)備收不到。這里面的核心邏輯都是服務(wù)器根據(jù)話題來區(qū)分運(yùn)行的。
三、MQTT驅(qū)動設(shè)計
? ? ? ? MQTT的驅(qū)動應(yīng)該算是比較難的,首先要確定它的地位和作用,如下圖所示,drv_mqtt是作為設(shè)備端mqtt的核心,整合了底層的開源庫、物理層的收發(fā)接口和應(yīng)用層的參數(shù)配置功能,以及自身的連接、收發(fā)、訂閱/取消訂閱等功能。
????????下面進(jìn)入代碼進(jìn)行解析,從頭文件開始,MQTTPacket.h主要包含了mqtt開源庫的功能文件,這個應(yīng)該沒什么問題,下面的ringbuffer.h需要強(qiáng)調(diào)下,它是RT-Thread的功能,叫環(huán)形緩沖區(qū),就是數(shù)據(jù)按順序環(huán)形保存,取出的時候按照先進(jìn)先出的原則,MQTT開源庫需要按順序取出數(shù)據(jù)解析,有這個ringbuffer作為緩存媒介在操作上非常便捷,這也是使用RT-Thread的另一個重要原因了。
? ? ?接下來是宏定義的內(nèi)容,沒什么特殊情況默認(rèn)即可,有需要改變的在user_opt.h中重定義即可,具體的內(nèi)容都有注釋,就不贅述了。
???
? ? ? ? 訂閱話題是個重要組成部分,在這里定義了話題的三個狀態(tài),空閑、訂閱和取消訂閱,取消訂閱一般用不到,特殊情況下會有一些臨時話題,為了緩解資源,可以取消訂閱。結(jié)構(gòu)體里的base_msg_id主要是為了標(biāo)記 訂閱/取消訂閱 時返回的話題,這樣程序才能區(qū)分。
????????
? ? ? ? 最后是最重要的客戶端連接信息了,具體都有注釋,其中用戶名、密碼和客戶端ID都是指針,在應(yīng)用層定義這些信息需要用全局變量或者靜態(tài)變量,才能保證信息的完整性;同樣的,收發(fā)函數(shù)也是采用回調(diào)的方式,在應(yīng)用程根據(jù)不同的物理接口進(jìn)行注冊,這里我們采用的自然是esp8266的收發(fā)函數(shù)了。
四、代碼解析
? ? ? ? 先從初始化開始,主要就是對用戶名、密碼和客戶端ID進(jìn)行賦值。
/*
================================================================================
描述 : 初始化指定MQTT連接
輸入 :
輸出 :
================================================================================
*/
void drv_mqtt_init(u8 index, char *usr_name, char *passwd, char *client_id)
{
if(index<MQTT_CONN_NUM)
{
MqttClientStruct *pClient=&g_sMqttWork.client_list[index];
MQTTPacket_connectData connect_init = MQTTPacket_connectData_initializer;
if((pClient->rb=rt_ringbuffer_create(MQTT_RING_BUFF_SIZE))!=NULL )
{
memcpy(&pClient->condata, &connect_init, sizeof(connect_init));//復(fù)制連接初始化信息
pClient->condata.keepAliveInterval=MQTT_KEEP_TIME;
pClient->condata.username.cstring=usr_name;//用戶名
pClient->condata.password.cstring=passwd;//密碼
pClient->condata.clientID.cstring=client_id;//客戶ID
pClient->is_enable=true;
}
}
}
? ? ? ? 接下來就是連接和訂閱了,在這里就可以很清晰的看到mqtt開源庫的作用了,就是組合連接、訂閱和取消訂閱的報文。MQTT里也有?;罟δ?,這是協(xié)議層的,如果指定時間內(nèi)沒有沒有收到數(shù)據(jù),那么會自己發(fā)個ping請求包來保持連接。
/*
================================================================================
描述 : 連接和訂閱
輸入 :
輸出 :
================================================================================
*/
void drv_mqtt_connect(void)
{
static u32 last_sec_time=0;
static u8 make_buff[80]={0};
const int make_size=sizeof(make_buff);
int make_len;
u32 now_sec_time=drv_get_sec_counter();
if(now_sec_time-last_sec_time>=2)
{
static u8 conn_ptr=0;
if(conn_ptr>=MQTT_CONN_NUM)
conn_ptr=0;
MqttClientStruct *pClient=&g_sMqttWork.client_list[conn_ptr];
if(pClient->is_enable)
{
if(pClient->is_connected==false)
{
memset(make_buff, 0, make_size);
make_len=MQTTSerialize_connect(make_buff, make_size, &pClient->condata);//組合連接請求包
if(pClient->mqtt_send!=NULL)
{
// printf("client=%d, mqtt send connect! make_len=%d\n",conn_ptr, make_len);
pClient->mqtt_send(make_buff, make_len);//發(fā)送
}
}
else
{
//訂閱話題
for(u8 i=0; i<MQTT_SUB_NUM; i++)
{
SubPackStruct *pSub=&pClient->sub_list[i];
if(strlen(pSub->sub_topic)>0 && pSub->curr_state!=pSub->dst_state)
{
if(pSub->dst_state==TopicStateSub)//需要訂閱
{
MQTTString topicString = MQTTString_initializer;
int req_qos=1;
topicString.cstring=pSub->sub_topic;
memset(make_buff, 0, make_size);
make_len = MQTTSerialize_subscribe(make_buff, make_size, 0, pSub->base_msg_id, 1, &topicString, &req_qos);//組合訂閱報文
if(pClient->mqtt_send!=NULL)
{
printf("sub topic=%s\n", pSub->sub_topic);
pClient->mqtt_send(make_buff, make_len);//發(fā)送
}
}
else if(pSub->dst_state==TopicStateUnSub)//需要取消訂閱
{
MQTTString topicString = MQTTString_initializer;
topicString.cstring=pSub->sub_topic;
memset(make_buff, 0, make_size);
make_len = MQTTSerialize_unsubscribe(make_buff, make_size, 0, pSub->base_msg_id, 1, &topicString);//組合取消訂閱報文
if(pClient->mqtt_send!=NULL)
{
printf("unsub topic=%s\n", pSub->sub_topic);
pClient->mqtt_send(make_buff, make_len);//發(fā)送
}
}
break;//每次只訂閱一個,避免堵塞
}
}
//超時檢測
u32 det_time=now_sec_time-pClient->keep_time;
if(det_time>=MQTT_KEEP_TIME)
{
printf("mqtt sock_id=%d timeout, close!\n", conn_ptr);
drv_mqtt_close(pClient);//超時關(guān)閉
}
else if(det_time>=MQTT_KEEP_TIME-10)
{
//發(fā)送ping請求,?;? memset(make_buff, 0, make_size);
make_len=MQTTSerialize_pingreq(make_buff, make_size);//組合ping包
if(pClient->mqtt_send!=NULL)
{
// printf("sock=%d, mqtt send ping req! make_len=%d\n",conn_ptr,make_len);
pClient->mqtt_send(make_buff, make_len);//發(fā)送
}
}
}
}
conn_ptr++;
last_sec_time=drv_get_sec_counter();
}
}
? ? ? ? 接收部分的邏輯是MQTTPacket_read函數(shù)調(diào)用回調(diào)函數(shù)pClient->mqtt_recv獲取環(huán)形緩沖區(qū)內(nèi)的數(shù)據(jù)并按照協(xié)議解析,最后根據(jù)解析結(jié)果執(zhí)行相應(yīng)動作,消息類型如下圖所示,常用的是連接回復(fù)、收到發(fā)布數(shù)據(jù)、訂閱回復(fù)、取消訂閱回復(fù)、ping回復(fù)和斷開連接。
/*
================================================================================
描述 : 接收檢查
輸入 :
輸出 :
================================================================================
*/
void drv_mqtt_recv_check(void)
{
static u8 make_buff[MQTT_SUB_BUFF_SIZE];
const int make_size=sizeof(make_buff);
int rc;
u8 dup;
int qos;
u8 retained;
u16 msgid;
int payloadlen_in;
u8 *payload_in;
MQTTString receivedTopic;
for(u8 i=0; i<MQTT_CONN_NUM; i++)
{
MqttClientStruct *pClient=&g_sMqttWork.client_list[i];
if(pClient->is_enable==true)//啟用
{
rc=MQTTPacket_read(make_buff, make_size, pClient->mqtt_recv);
switch(rc)
{
case CONNACK://連接回復(fù)
{
printf("mqtt_id=%d CONNACK!\n", i);
u8 sessionPresent, connack_rc;
if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, make_buff, make_size) != 1 || connack_rc != 0)//解析收到的回復(fù)報文
{
drv_mqtt_close(pClient);
printf("mqtt sock_id=%d Unable to connect, return code %d\n",i, connack_rc);
}
else
{
pClient->is_connected=true;
pClient->keep_time=drv_get_sec_counter();//更新時間
printf("mqtt sock_id=%d connect ok!\n", i);
}
break;
}
case PUBREC:
case PUBACK: //發(fā)布回復(fù)
{
// debug("sock_id=%d PUBACK!\n", i);
break;
}
case PUBLISH://收到發(fā)布的消息
{
pClient->keep_time=drv_get_sec_counter();//更新時間
printf("sock_id=%d PUBLISH!\n", i);
rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic, &payload_in, &payloadlen_in, make_buff, make_size);
char *pTopic=receivedTopic.lenstring.data;
if(g_sMqttWork.mqtt_recv_parse!=NULL)
{
char topic[30]={0};
int len=(char*)payload_in-pTopic;//topic 長度
if(len>sizeof(topic))
{
len=sizeof(topic)-1;
}
memcpy(topic, pTopic, len);
g_sMqttWork.mqtt_recv_parse(i, topic, payload_in, payloadlen_in);//應(yīng)用層數(shù)據(jù)解析
}
break;
}
case SUBACK://訂閱回復(fù)
{
// debug("sock_id=%d SUBACK!\n", i);
// printf_hex("sub buff=", make_buff, 30);
int count, requestedQoSs[1];
MQTTDeserialize_suback(&msgid, 1, &count, requestedQoSs, make_buff, make_size);
// debug("$$$ msgid=0x%04X\n", msgid);
for(u8 k=0; k<MQTT_SUB_NUM; k++)
{
SubPackStruct *pSub=&pClient->sub_list[k];
if(pSub->base_msg_id==msgid)
{
printf("topic=%s sub ok!\n", pSub->sub_topic);
pSub->curr_state=TopicStateSub;
// pSub->subed_time=drv_get_sec_counter();
}
}
break;
}
case UNSUBACK://取消訂閱回復(fù)
{
// debug("sock_id=%d UNSUBACK!\n", i);
MQTTDeserialize_unsuback(&msgid, make_buff, make_size);
// debug("$$$ msgid=0x%04X\n", msgid);
for(u8 k=0; k<MQTT_SUB_NUM; k++)
{
SubPackStruct *pSub=&pClient->sub_list[k];
if(pSub->base_msg_id==msgid)
{
printf("topic=%s unsub ok!\n", pSub->sub_topic);
pSub->curr_state=TopicStateUnSub;
// pSub->subed_time=drv_get_sec_counter();
}
}
break;
}
case PINGRESP://ping回復(fù)
{
pClient->keep_time=drv_get_sec_counter();//更新時間
// debug("sock_id=%d PINGRESP!\n", i);
break;
}
case DISCONNECT://斷開連接
{
printf("mqtt_id=%d DISCONNECT!\n", i);
drv_mqtt_close(pClient);
break;
}
}
}
}
}
? ? ? ? 剩下的就是一些簡單的功能了,比如設(shè)置話題、發(fā)布消息,關(guān)閉連接等等,較為簡單。
/*
================================================================================
描述 : 設(shè)置話題信息
輸入 :
輸出 :
================================================================================
*/
void drv_mqtt_set_topic_info(u8 client_id, u8 sub_id, char *topic, u32 base_msg_id, u8 dst_state)
{
if(client_id<MQTT_CONN_NUM)
{
MqttClientStruct *pClient=&g_sMqttWork.client_list[client_id];
if(sub_id<MQTT_SUB_NUM)
{
SubPackStruct *pSub=&pClient->sub_list[sub_id];
if(strlen(topic)<sizeof(pSub->sub_topic))
{
pSub->curr_state=TopicStateIdel;
pSub->dst_state=dst_state;
pSub->base_msg_id=base_msg_id;
strcpy(pSub->sub_topic, topic);
}
}
}
}
/*
================================================================================
描述 : 設(shè)置話題訂閱狀態(tài)
輸入 :
輸出 :
================================================================================
*/
void drv_mqtt_set_topic_state(u8 client_id, u8 sub_id, u8 dst_state)
{
if(client_id<MQTT_CONN_NUM)
{
MqttClientStruct *pClient=&g_sMqttWork.client_list[client_id];
if(sub_id<MQTT_SUB_NUM)
{
SubPackStruct *pSub=&pClient->sub_list[sub_id];
pSub->dst_state=dst_state;
}
}
}
/*
================================================================================
描述 : MQTT發(fā)布數(shù)據(jù)
輸入 :
輸出 :
================================================================================
*/
void drv_mqtt_publish(u8 index, u8 *msg_buff, u16 msg_len, char *topic)
{
static u8 make_buff[MQTT_PUB_BUFF_SIZE]={0};
static const int make_size=sizeof(make_buff);
u16 make_len=0;
if(index<MQTT_CONN_NUM)
{
MqttClientStruct *pClient=&g_sMqttWork.client_list[index];
if(pClient->is_connected==true)//已經(jīng)連接
{
pClient->msg_id++;
MQTTString topicString = MQTTString_initializer;
topicString.cstring=topic;
make_len = MQTTSerialize_publish(make_buff, make_size, 0, 1,0, pClient->msg_id, topicString, msg_buff, msg_len);//組合發(fā)布報文
if(pClient->mqtt_send!=NULL && make_len>0)
{
int ret=pClient->mqtt_send(make_buff, make_len);//發(fā)送
}
}
}
}
/*
================================================================================
描述 : 關(guān)閉連接
輸入 :
輸出 :
================================================================================
*/
void drv_mqtt_close(MqttClientStruct *pClient)
{
pClient->is_connected=false;
for(u8 i=0; i<MQTT_SUB_NUM; i++)
{
SubPackStruct *pSub=&pClient->sub_list[i];
pSub->curr_state=TopicStateIdel;
// pSub->subed_time=0;
}
pClient->msg_id=0;
pClient->keep_time=0;
}
五、使用過程
? ? ? ? 應(yīng)用層的使用主要就是根據(jù)要求配置信息,首先物理通訊接口先設(shè)置,這里使用esp8266的連接3作為網(wǎng)絡(luò)鏈路,同時注冊接收函數(shù)把數(shù)據(jù)緩存進(jìn)ringbuffer;然后就是MQTT用戶名、密碼、客戶端ID的設(shè)置了;接下來有三個回調(diào)函數(shù)注冊,兩個是物理層的MQTT收發(fā),還有一個是應(yīng)用層的數(shù)據(jù)解析,這里已經(jīng)來到了最后的凈化器項目本身了,由此可以看出,要想代碼好維護(hù),寫代碼之前就要分層設(shè)計,這樣出問題了才好分級排查,再后期自己閱讀時邏輯也更走得通;最后一步就是話題訂閱了,這樣才能收到用戶的控制數(shù)據(jù),每個設(shè)備訂閱話題都不一樣,最后都帶上了自己序列號,這樣用戶端才能針對性控制設(shè)備。
? ? ? ? 下面代碼是凈化器應(yīng)用層的數(shù)據(jù)解析。
/*
================================================================================
描述 : 設(shè)備解析服務(wù)器下發(fā)的數(shù)據(jù)
輸入 :
輸出 :
================================================================================
*/
void app_air_recv_parse(u8 *buff, u16 len)
{
u8 head[2]={0xAA, 0x55};
u8 *pData=memstr(buff, len, head, 2);
if(pData!=NULL)
{
u16 total_len=pData[2]<<8 | pData[3];
u16 crcValue=pData[total_len]<<8 | pData[total_len+1];
if(crcValue==drv_crc16(pData, total_len))
{
pData+=4;
u32 device_sn=pData[0]<<24|pData[1]<<16|pData[2]<<8|pData[3];
pData+=4;
if(device_sn!=g_sAirWork.device_sn)//識別碼確認(rèn)
return;
u8 cmd_type=pData[0];
pData++;
switch(cmd_type)
{
case AIR_CMD_HEART://心跳包
{
break;
}
case AIR_CMD_DATA://數(shù)據(jù)包
{
break;
}
case AIR_CMD_SET_SPEED://設(shè)置風(fēng)速
{
u8 speed=pData[0];
pData+=1;
app_motor_set_speed(speed);
break;
}
case AIR_CMD_SET_SWITCH://設(shè)置開關(guān)
{
u8 state=pData[0];
pData+=1;
g_sAirWork.switch_state=state;
if(state>0)
{
app_motor_set_speed(100);//啟動風(fēng)扇
}
else
{
app_motor_set_speed(0);//停止風(fēng)扇
}
app_air_send_status();
break;
}
}
}
}
}
六、總結(jié)
? ? ? ? MQTT協(xié)議本身較為繁瑣,現(xiàn)在應(yīng)用階段暫時不用太深入,先學(xué)會使用就行,用熟了再去查閱文檔,這樣理解起來更透徹。mqtt的驅(qū)動設(shè)計相較于其他驅(qū)動文件更為復(fù)雜,因為它所牽涉的內(nèi)容更廣,有開源庫、網(wǎng)絡(luò)鏈路、應(yīng)用層參數(shù)配置等等,完整的工程在第二篇文章里有的下載,自行查閱。
本項目的交流QQ群:701889554文章來源:http://www.zghlxwxcb.cn/news/detail-844476.html
???寫于2024-4-1文章來源地址http://www.zghlxwxcb.cn/news/detail-844476.html
到了這里,關(guān)于物聯(lián)網(wǎng)實戰(zhàn)--入門篇之(七)嵌入式-MQTT的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!