MQTT協(xié)議
什么是物聯(lián)網(wǎng)
物聯(lián)網(wǎng)一詞在2009年由Kevin Ashton首次使用。物聯(lián)網(wǎng)指的是通過互聯(lián)網(wǎng)來連接物理設(shè)備。物理設(shè)備可以相互之間通過互聯(lián)網(wǎng)交換數(shù)據(jù)或由其他設(shè)備控制。物聯(lián)網(wǎng)設(shè)備可以是冰箱、交通信號燈、工業(yè)控制系統(tǒng)等電子裝置或電氣設(shè)備。最常見的用例之一是傳感器數(shù)據(jù)的收集,傳輸,分析和顯示。工作人員可利用物聯(lián)網(wǎng)數(shù)據(jù)實現(xiàn)遠(yuǎn)程監(jiān)控臺,或者對超過閾值的數(shù)據(jù)設(shè)置警報。
什么是MQTT
MQTT是一種輕量級消息傳輸協(xié)議,它為物聯(lián)網(wǎng)設(shè)備提供了一種簡單的方法來傳輸數(shù)據(jù)信息。由于MQTT占用網(wǎng)絡(luò)資源小,且適用于遠(yuǎn)程信息傳輸,MQTT在物聯(lián)網(wǎng)(IoT)領(lǐng)域起著重要作用。
MQTT協(xié)議的第一版由Andy Stanford-Clark(IBM)和Arlen Nipper(Cirrus Link)于1999年建立的。該協(xié)議最早應(yīng)用于監(jiān)控穿越沙漠的石油管道。因為設(shè)備是通過衛(wèi)星鏈路連接的,所以當(dāng)時MQTT所運(yùn)行的網(wǎng)絡(luò)帶寬很小,且十分不穩(wěn)定。而MQTT協(xié)議的設(shè)計目標(biāo)也正是為了適用于這類傳輸距離遠(yuǎn),帶寬小,不穩(wěn)定的網(wǎng)絡(luò)環(huán)境。
MQTT運(yùn)行機(jī)制
MQTT協(xié)議的中央通信中樞是MQTT服務(wù)器,它負(fù)責(zé)發(fā)送方和接收方間的信息通訊。每個向MQTT服務(wù)器發(fā)布消息的客戶端都會在發(fā)布消息中包含一個主題。每個想要接收該消息的客戶端都會訂閱該主題。MQTT服務(wù)器在收到客戶端向主題發(fā)布的信息后,會將信息發(fā)送給所有訂閱該主題的客戶端。這種體系結(jié)構(gòu)可實現(xiàn)高度可擴(kuò)展的解決方案,而數(shù)據(jù)生產(chǎn)者和數(shù)據(jù)使用者之間沒有依賴關(guān)系。
關(guān)于如何連接OneNET
OneNEThttps://open.iot.10086.cn/console/
平臺迭代?
現(xiàn)在多協(xié)議只對老用戶開發(fā),新注冊的賬號已經(jīng)沒有多協(xié)議選項了
若需要測試OneNet,可以私聊博主,博主可以提供一個公共賬號進(jìn)行測試?。。?/strong>
阿里云還是可以直接放心使用的 ~ ~ ~
微信公眾號--星之援工作室 發(fā)送關(guān)鍵字(OneNet公共賬號)
????????????????????
例程文件
鏈接:MQTT-Onenet例程文件https://pan.baidu.com/s/11KlzByF5sEBxxFpcOCyp7w?pwd=xzy0%C2%A0
提取碼:xzy0?
1.搭建云平臺設(shè)備
1.1.協(xié)議選擇
?1.2.添加產(chǎn)品
?1.3.添加設(shè)備
?
?到這里我們就基本快完成云平臺的搭建了
?
?
?到此云平臺已搭建完成
2.查看設(shè)備參數(shù)(MQTT連接使用)
產(chǎn)品id
設(shè)備id
設(shè)備鑒權(quán)信息
?
?
3.下位機(jī)代碼編寫
3.1.ESP8266.h
#ifndef _ESP8266_H_
#define _ESP8266_H_
#include "sys.h"
#define REV_OK 0 //接收完成標(biāo)志
#define REV_WAIT 1 //接收未完成標(biāo)志
void ESP8266_Init(void);
void Usart2_Init(unsigned int baud);
void ESP8266_Clear(void);
void ESP8266_SendData(unsigned char *data, unsigned short len);
unsigned char *ESP8266_GetIPD(unsigned short timeOut);
void Usart2_SendString(unsigned char *str, unsigned short len);
#endif
3.2.ESP8266.c
//單片機(jī)頭文件
#include "stm32f10x.h"
//網(wǎng)絡(luò)設(shè)備驅(qū)動
#include "esp8266.h"
/* FreeRTOS頭文件 */
#include "bsp_SysTick.h"
#include "usart.h"
//C庫
#include <string.h>
#include <stdio.h>
#define ESP8266_WIFI_INFO "AT+CWJAP=\"hhh\",\"12345678\"\r\n"
#define ESP8266_ONENET_INFO "AT+CIPSTART=\"TCP\",\"183.230.40.39\",6002\r\n"
unsigned char esp8266_buf[128];
unsigned short esp8266_cnt = 0, esp8266_cntPre = 0;
//==========================================================
// 函數(shù)名稱: ESP8266_Clear
//
// 函數(shù)功能: 清空緩存
//
// 入口參數(shù): 無
//
// 返回參數(shù): 無
//
// 說明:
//==========================================================
void ESP8266_Clear(void)
{
memset(esp8266_buf, 0, sizeof(esp8266_buf));
esp8266_cnt = 0;
}
//==========================================================
// 函數(shù)名稱: ESP8266_WaitRecive
//
// 函數(shù)功能: 等待接收完成
//
// 入口參數(shù): 無
//
// 返回參數(shù): REV_OK-接收完成 REV_WAIT-接收超時未完成
//
// 說明: 循環(huán)調(diào)用檢測是否接收完成
//==========================================================
_Bool ESP8266_WaitRecive(void)
{
if(esp8266_cnt == 0) //如果接收計數(shù)為0 則說明沒有處于接收數(shù)據(jù)中,所以直接跳出,結(jié)束函數(shù)
return REV_WAIT;
if(esp8266_cnt == esp8266_cntPre) //如果上一次的值和這次相同,則說明接收完畢
{
esp8266_cnt = 0; //清0接收計數(shù)
return REV_OK; //返回接收完成標(biāo)志
}
esp8266_cntPre = esp8266_cnt; //置為相同
return REV_WAIT; //返回接收未完成標(biāo)志
}
//==========================================================
// 函數(shù)名稱: ESP8266_SendCmd
//
// 函數(shù)功能: 發(fā)送命令
//
// 入口參數(shù): cmd:命令
// res:需要檢查的返回指令
//
// 返回參數(shù): 0-成功 1-失敗
//
// 說明:
//==========================================================
_Bool ESP8266_SendCmd(char *cmd, char *res, u16 time)
{
Usart2_SendString((unsigned char *)cmd, strlen((const char *)cmd));
while(time--)
{
if(ESP8266_WaitRecive() == REV_OK) //如果收到數(shù)據(jù)
{
if(strstr((const char *)esp8266_buf, res) != NULL) //如果檢索到關(guān)鍵詞
{
ESP8266_Clear(); //清空緩存
return 0;
}
}
Delay_ms(10);
}
return 1;
}
//==========================================================
// 函數(shù)名稱: ESP8266_SendData
//
// 函數(shù)功能: 發(fā)送數(shù)據(jù)
//
// 入口參數(shù): data:數(shù)據(jù)
// len:長度
//
// 返回參數(shù): 無
//
// 說明:
//==========================================================
void ESP8266_SendData(unsigned char *data, unsigned short len)
{
char cmdBuf[32];
ESP8266_Clear(); //清空接收緩存
sprintf(cmdBuf, "AT+CIPSEND=%d\r\n", len); //發(fā)送命令
if(!ESP8266_SendCmd(cmdBuf, ">", 200)) //收到‘>’時可以發(fā)送數(shù)據(jù)
{
Usart2_SendString(data, len); //發(fā)送設(shè)備連接請求數(shù)據(jù)
}
}
//==========================================================
// 函數(shù)名稱: ESP8266_GetIPD
//
// 函數(shù)功能: 獲取平臺返回的數(shù)據(jù)
//
// 入口參數(shù): 等待的時間(乘以10ms)
//
// 返回參數(shù): 平臺返回的原始數(shù)據(jù)
//
// 說明: 不同網(wǎng)絡(luò)設(shè)備返回的格式不同,需要去調(diào)試
// 如ESP8266的返回格式為 "+IPD,x:yyy" x代表數(shù)據(jù)長度,yyy是數(shù)據(jù)內(nèi)容
//==========================================================
unsigned char *ESP8266_GetIPD(unsigned short timeOut)
{
char *ptrIPD = NULL;
do
{
if(ESP8266_WaitRecive() == REV_OK) //如果接收完成
{
ptrIPD = strstr((char *)esp8266_buf, "IPD,"); //搜索“IPD”頭
if(ptrIPD == NULL) //如果沒找到,可能是IPD頭的延遲,還是需要等待一會,但不會超過設(shè)定的時間
{
//printf("\"IPD\" not found\r\n");
}
else
{
ptrIPD = strchr(ptrIPD, ':'); //找到':'
if(ptrIPD != NULL)
{
ptrIPD++;
return (unsigned char *)(ptrIPD);
}
else
return NULL;
}
}
Delay_ms(5); //延時等待
} while(timeOut--);
return NULL; //超時還未找到,返回空指針
}
//==========================================================
// 函數(shù)名稱: USART2_IRQHandler
//
// 函數(shù)功能: 串口2收發(fā)中斷
//
// 入口參數(shù): 無
//
// 返回參數(shù): 無
//
// 說明:
//==========================================================
void USART2_IRQHandler(void)
{
if(USART_GetITStatus(USART2, USART_IT_RXNE) != RESET) //接收中斷
{
if(esp8266_cnt >= sizeof(esp8266_buf)) esp8266_cnt = 0; //防止串口被刷爆
esp8266_buf[esp8266_cnt++] = USART2->DR;
USART_ClearFlag(USART2, USART_FLAG_RXNE);
}
}
//==========================================================
// 函數(shù)名稱: ESP8266_Init
//
// 函數(shù)功能: 初始化ESP8266
//
// 入口參數(shù): 無
//
// 返回參數(shù): 無
//
// 說明:
//==========================================================
void ESP8266_Init(void)
{
GPIO_InitTypeDef GPIO_Initure;
RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOB, ENABLE);
//ESP8266復(fù)位引腳
GPIO_Initure.GPIO_Mode = GPIO_Mode_Out_PP;
GPIO_Initure.GPIO_Pin = GPIO_Pin_9; //GPIOB1-復(fù)位
GPIO_Initure.GPIO_Speed = GPIO_Speed_50MHz;
GPIO_Init(GPIOB, &GPIO_Initure);
GPIO_WriteBit(GPIOB, GPIO_Pin_9, Bit_RESET);
Delay_ms(100);
GPIO_WriteBit(GPIOB, GPIO_Pin_9, Bit_SET);
Delay_ms(100);
ESP8266_Clear();
printf("AT\r\n");
while(ESP8266_SendCmd("AT\r\n\r", "OK", 200))
Delay_ms(500);
printf("CWMODE\r\n");
while(ESP8266_SendCmd("AT+CWMODE=1\r\n", "OK", 200))
Delay_ms(500);
printf("AT+CWDHCP\r\n");
while(ESP8266_SendCmd("AT+CWDHCP=1,1\r\n", "OK", 200))
Delay_ms(500);
printf("CWJAP\r\n");
while(ESP8266_SendCmd(ESP8266_WIFI_INFO, "GOT IP", 200))
Delay_ms(500);
printf("CIPSTART\r\n");
while(ESP8266_SendCmd(ESP8266_ONENET_INFO, "CONNECT", 200))
Delay_ms(500);
printf("ESP8266 Init OK\r\n");
}
/*
************************************************************
* 函數(shù)名稱: Usart2_Init
*
* 函數(shù)功能: 串口2初始化
*
* 入口參數(shù): baud:設(shè)定的波特率
*
* 返回參數(shù): 無
*
* 說明: TX-PA2 RX-PA3
************************************************************
*/
void Usart2_Init(unsigned int baud)
{
GPIO_InitTypeDef gpio_initstruct;
USART_InitTypeDef usart_initstruct;
NVIC_InitTypeDef nvic_initstruct;
RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOA, ENABLE);
RCC_APB1PeriphClockCmd(RCC_APB1Periph_USART2, ENABLE);
//PA2 TXD
gpio_initstruct.GPIO_Mode = GPIO_Mode_AF_PP;
gpio_initstruct.GPIO_Pin = GPIO_Pin_2;
gpio_initstruct.GPIO_Speed = GPIO_Speed_50MHz;
GPIO_Init(GPIOA, &gpio_initstruct);
//PA3 RXD
gpio_initstruct.GPIO_Mode = GPIO_Mode_IN_FLOATING;
gpio_initstruct.GPIO_Pin = GPIO_Pin_3;
gpio_initstruct.GPIO_Speed = GPIO_Speed_50MHz;
GPIO_Init(GPIOA, &gpio_initstruct);
usart_initstruct.USART_BaudRate = baud;
usart_initstruct.USART_HardwareFlowControl = USART_HardwareFlowControl_None; //無硬件流控
usart_initstruct.USART_Mode = USART_Mode_Rx | USART_Mode_Tx; //接收和發(fā)送
usart_initstruct.USART_Parity = USART_Parity_No; //無校驗
usart_initstruct.USART_StopBits = USART_StopBits_1; //1位停止位
usart_initstruct.USART_WordLength = USART_WordLength_8b; //8位數(shù)據(jù)位
USART_Init(USART2, &usart_initstruct);
USART_Cmd(USART2, ENABLE); //使能串口
USART_ITConfig(USART2, USART_IT_RXNE, ENABLE); //使能接收中斷
nvic_initstruct.NVIC_IRQChannel = USART2_IRQn;
nvic_initstruct.NVIC_IRQChannelCmd = ENABLE;
nvic_initstruct.NVIC_IRQChannelPreemptionPriority = 0;
nvic_initstruct.NVIC_IRQChannelSubPriority = 0;
NVIC_Init(&nvic_initstruct);
}
/*
************************************************************
* 函數(shù)名稱: Usart_SendString
*
* 函數(shù)功能: 串口數(shù)據(jù)發(fā)送
*
* 入口參數(shù): USARTx:串口組
* str:要發(fā)送的數(shù)據(jù)
* len:數(shù)據(jù)長度
*
* 返回參數(shù): 無
*
* 說明:
************************************************************
*/
void Usart2_SendString(unsigned char *str, unsigned short len)
{
unsigned short count = 0;
for(; count < len; count++)
{
USART_SendData(USART2, *str++); //發(fā)送數(shù)據(jù)
while(USART_GetFlagStatus(USART2, USART_FLAG_TC) == RESET); //等待發(fā)送完成
}
}
3.3.onenet.h
#ifndef _ONENET_H_
#define _ONENET_H_
_Bool OneNet_DevLink(char *Tip);
void OneNet_SendData(char *Tip);
void OneNet_SendCmd(void);
void OneNet_RevPro(unsigned char *cmd);
_Bool OneNet_Subscribe(const char *topics[], unsigned char topic_cnt);
void OneNet_PacketPing(void);
_Bool OneNet_Publish(const char *topic, const char *msg);
void OneNet_HeartBeat(void);
#endif
3.4.onenet.c
//單片機(jī)頭文件
#include "stm32f10x.h"
//網(wǎng)絡(luò)設(shè)備
#include "esp8266.h"
//協(xié)議文件
#include "onenet.h"
#include "mqttkit.h"
#include "./SysTick/bsp_SysTick.h"
//硬件驅(qū)動
#include "usart.h"
//C庫
#include <string.h>
#include <stdio.h>
//全局變量
extern char *Tips ; //主題
#define PROID "515377" //產(chǎn)品ID
#define AUTH_INFO "123" //鑒權(quán)信息
#define DEVID "943294381" //設(shè)備ID
extern unsigned char esp8266_buf[128];
extern unsigned char esp8266_buf[128];
//==========================================================
// 函數(shù)名稱: OneNet_DevLink
//
// 函數(shù)功能: 與onenet創(chuàng)建連接
//
// 入口參數(shù): 無
//
// 返回參數(shù): 1-成功 0-失敗
//
// 說明: 與onenet平臺建立連接
//==========================================================
_Bool OneNet_DevLink(char *Tip)
{
MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0}; //協(xié)議包
unsigned char *dataPtr;
_Bool status = 1;
printf("OneNet_DevLink\r\nPROID: %s, AUIF: %s, DEVID:%s\r\n", PROID, AUTH_INFO, DEVID);
if(MQTT_PacketConnect(PROID, AUTH_INFO, DEVID, 256, 0, MQTT_QOS_LEVEL0, NULL, NULL, 0, &mqttPacket) == 0)
{
ESP8266_SendData(mqttPacket._data, mqttPacket._len); //上傳平臺
dataPtr = ESP8266_GetIPD(250); //等待平臺響應(yīng)
if(dataPtr != NULL)
{
if(MQTT_UnPacketRecv(dataPtr) == MQTT_PKT_CONNACK)
{
switch(MQTT_UnPacketConnectAck(dataPtr))
{
case 0:printf("Tips:%s\r\n",Tip);
status = 0;
//LED2_ON; //入網(wǎng)成功
;break;
case 1:printf("WARN: 連接失?。簠f(xié)議錯誤\r\n");break;
case 2:printf("WARN: 連接失敗:非法的clientid\r\n");break;
case 3:printf("WARN: 連接失?。悍?wù)器失敗\r\n");break;
case 4:printf("WARN: 連接失敗:用戶名或密碼錯誤\r\n");break;
case 5:printf("WARN: 連接失?。悍欠ㄦ溄?比如token非法)\r\n");break;
default:printf("ERR: 連接失?。何粗e誤\r\n");break;
}
}
}
MQTT_DeleteBuffer(&mqttPacket); //刪包
}
else
printf("WARN: MQTT_PacketConnect Failed\r\n");
return status;
}
u8 velue0 = 0;
u8 velue1 = 0;
unsigned char OneNet_FillBuf(char *buf)
{
char text[32];
memset(text, 0, sizeof(text));
strcpy(buf, ",;");
memset(text, 0, sizeof(text));
sprintf(text, "Blue_Led,%d;", velue0);
strcat(buf, text);
memset(text, 0, sizeof(text));
sprintf(text, "Yellow_Led,%d;", velue1);
strcat(buf, text);
return strlen(buf);
}
//==========================================================
// 函數(shù)名稱: OneNet_SendData
//
// 函數(shù)功能: 上傳數(shù)據(jù)到平臺
//
// 入口參數(shù): type:發(fā)送數(shù)據(jù)的格式
//
// 返回參數(shù): 無
//
// 說明:
//==========================================================
void OneNet_SendData(char *Tip)
{
MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0}; //協(xié)議包
char buf[128];
short body_len = 0, i = 0;
//printf("Tips:%s\r\n",Tip);
memset(buf, 0, sizeof(buf));
body_len = OneNet_FillBuf(buf); //獲取當(dāng)前需要發(fā)送的數(shù)據(jù)流的總長度
if(body_len)
{
if(MQTT_PacketSaveData(DEVID, body_len, NULL, 5, &mqttPacket) == 0) //封包
{
for(; i < body_len; i++)
mqttPacket._data[mqttPacket._len++] = buf[i];
ESP8266_SendData(mqttPacket._data, mqttPacket._len); //上傳數(shù)據(jù)到平臺
//printf("Send %d Bytes\r\n", mqttPacket._data);
MQTT_DeleteBuffer(&mqttPacket); //刪包
}
else
printf("WARN: EDP_NewBuffer Failed\r\n");
}
}
//==========================================================
// 函數(shù)名稱: OneNet_Publish
//
// 函數(shù)功能: 發(fā)布消息
//
// 入口參數(shù): topic:發(fā)布的主題
// msg:消息內(nèi)容
//
// 返回參數(shù): 0-成功 1-需要重送
//
// 說明:
//==========================================================
_Bool OneNet_Publish(const char *topic, const char *msg)
{
//Delay_ms(100);
MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0}; //協(xié)議包
//printf( "Publish Topic: %s, Msg: %s\r\n", topic, msg);
if(MQTT_PacketPublish(MQTT_PUBLISH_ID, topic, msg, strlen(msg), MQTT_QOS_LEVEL2, 0, 1, &mqttPacket) != 1)
{
ESP8266_SendData(mqttPacket._data, mqttPacket._len); //向平臺發(fā)送訂閱請求
MQTT_DeleteBuffer(&mqttPacket); //刪包
}
return 0;
}
//==========================================================
// 函數(shù)名稱: OneNet_Subscribe
//
// 函數(shù)功能: 訂閱
//
// 入口參數(shù): topics:訂閱的topic
// topic_cnt:topic個數(shù)
//
// 返回參數(shù): SEND_TYPE_OK-成功 SEND_TYPE_SUBSCRIBE-需要重發(fā)
//
// 說明:
//==========================================================
_Bool OneNet_Subscribe(const char *topics[], unsigned char topic_cnt)
{
unsigned char i = 0;
MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0}; //協(xié)議包
for(; i < topic_cnt; i++)
printf( "Subscribe Topic: %s\r\n", topics[i]);
if(MQTT_PacketSubscribe(MQTT_SUBSCRIBE_ID, MQTT_QOS_LEVEL2, topics, topic_cnt, &mqttPacket) == 0)
{
ESP8266_SendData(mqttPacket._data, mqttPacket._len); //向平臺發(fā)送訂閱請求
MQTT_DeleteBuffer(&mqttPacket); //刪包
}
return 0;
}
//==========================================================
// 函數(shù)名稱: OneNet_HeartBeat
//
// 函數(shù)功能: 心跳檢測
//
// 入口參數(shù): 無
//
// 返回參數(shù): 無
//
// 說明:
//==========================================================
void OneNet_HeartBeat(void)
{
MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0};
unsigned char sCount = 3;
//---------------------------------------------步驟一:組包---------------------------------------------
if(MQTT_PacketPing(&mqttPacket))
return;
while(sCount--)
{
//---------------------------------------------步驟二:發(fā)送數(shù)據(jù)-----------------------------------------
ESP8266_SendData(mqttPacket._data, mqttPacket._len);
//while(OneNet_DevLink(Tips))
---------------------------------------------步驟三:解析返回數(shù)據(jù)-------------------------------------
// if(MQTT_UnPacketRecv(cmd) == MQTT_PKT_PINGRESP)
// {
// printf( "Tips: HeartBeat OK\r\n");
//
// break;
// }
// else
// {
// //ESP8266_Init(); //初始化ESP8266
// printf("Check Device\r\n");
// }
Delay_ms(10);
}
//---------------------------------------------步驟四:刪包---------------------------------------------
MQTT_DeleteBuffer(&mqttPacket);
}
/**
* @brief 檢測字符長度并提取透傳
* @param 無
* @retval 無
*/
char my_strlen(int8 **payload)//?const?????
{
static char cmd[200];
int i=0;
while(**payload)
{
cmd[i] = **payload;
i++;
payload++;
printf("%d\n",cmd[i]);
}
printf("%c\n",cmd[100]);
printf("%c%c%c%c\n",cmd[111],cmd[112],cmd[113],cmd[114]);
return cmd[11];
}
//==========================================================
// 函數(shù)名稱: OneNet_RevPro
//
// 函數(shù)功能: 平臺返回數(shù)據(jù)檢測
//
// 入口參數(shù): dataPtr:平臺返回的數(shù)據(jù)
//
// 返回參數(shù): 無
//
// 說明:
//==========================================================
void OneNet_RevPro(unsigned char *cmd)
{
MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0}; //協(xié)議包
char *req_payload = NULL;
char *cmdid_topic = NULL;
unsigned short topic_len = 0;
unsigned short req_len = 0;
unsigned char type = 0;
unsigned char qos = 0;
static unsigned short pkt_id = 0;
short result = 0;
char *dataPtr = NULL;
char numBuf[10];
int num = 0;
uint16_t ID = 0;
type = MQTT_UnPacketRecv(cmd);
switch(type)
{
case MQTT_PKT_CMD: //命令下發(fā)
result = MQTT_UnPacketCmd(cmd, &cmdid_topic, &req_payload, &req_len); //解出topic和消息體
if(result == 0)
{
printf( "cmdid: %s, req: %s, req_len: %d\r\n", cmdid_topic, req_payload, req_len);
if(MQTT_PacketCmdResp(cmdid_topic, req_payload, &mqttPacket) == 0) //命令回復(fù)組包
{
printf( "Tips: Send CmdResp\r\n");
ESP8266_SendData(mqttPacket._data, mqttPacket._len); //回復(fù)命令
MQTT_DeleteBuffer(&mqttPacket); //刪包
}
}
break;
case MQTT_PKT_PUBLISH: //接收的Publish消息
result = MQTT_UnPacketPublish(cmd, &cmdid_topic, &topic_len, &req_payload, &req_len, &qos, &pkt_id);
if(result == 0)
{
printf( "topic: %s, topic_len: %d, payload: %s, payload_len: %d\r\n",
cmdid_topic, topic_len, req_payload, req_len);
switch(qos)
{
case 1: //收到publish的qos為1,設(shè)備需要回復(fù)Ack
if(MQTT_PacketPublishAck(pkt_id, &mqttPacket) == 0)
{
printf( "Tips: Send PublishAck\r\n");
ESP8266_SendData(mqttPacket._data, mqttPacket._len);
MQTT_DeleteBuffer(&mqttPacket);
}
break;
case 2: //收到publish的qos為2,設(shè)備先回復(fù)Rec
//平臺回復(fù)Rel,設(shè)備再回復(fù)Comp
if(MQTT_PacketPublishRec(pkt_id, &mqttPacket) == 0)
{
printf( "Tips: Send PublishRec\r\n");
ESP8266_SendData(mqttPacket._data, mqttPacket._len);
MQTT_DeleteBuffer(&mqttPacket);
}
break;
default:
break;
}
}
break;
case MQTT_PKT_PUBACK: //發(fā)送Publish消息,平臺回復(fù)的Ack
if(MQTT_UnPacketPublishAck(cmd) == 0)
printf( "Tips: MQTT Publish Send OK\r\n");
break;
case MQTT_PKT_PUBREC: //發(fā)送Publish消息,平臺回復(fù)的Rec,設(shè)備需回復(fù)Rel消息
if(MQTT_UnPacketPublishRec(cmd) == 0)
{
printf( "Tips: Rev PublishRec\r\n");
if(MQTT_PacketPublishRel(MQTT_PUBLISH_ID, &mqttPacket) == 0)
{
printf( "Tips: Send PublishRel\r\n");
ESP8266_SendData(mqttPacket._data, mqttPacket._len);
MQTT_DeleteBuffer(&mqttPacket);
}
}
break;
case MQTT_PKT_PUBREL: //收到Publish消息,設(shè)備回復(fù)Rec后,平臺回復(fù)的Rel,設(shè)備需再回復(fù)Comp
if(MQTT_UnPacketPublishRel(cmd, pkt_id) == 0)
{
printf( "Tips: Rev PublishRel\r\n");
if(MQTT_PacketPublishComp(MQTT_PUBLISH_ID, &mqttPacket) == 0)
{
printf( "Tips: Send PublishComp\r\n");
ESP8266_SendData(mqttPacket._data, mqttPacket._len);
MQTT_DeleteBuffer(&mqttPacket);
}
}
break;
case MQTT_PKT_PUBCOMP: //發(fā)送Publish消息,平臺返回Rec,設(shè)備回復(fù)Rel,平臺再返回的Comp
if(MQTT_UnPacketPublishComp(cmd) == 0)
{
printf( "Tips: Rev PublishComp\r\n");
}
break;
case MQTT_PKT_SUBACK: //發(fā)送Subscribe消息的Ack
if(MQTT_UnPacketSubscribe(cmd) == 0)
printf( "Tips: MQTT Subscribe OK\r\n");
else
printf( "Tips: MQTT Subscribe Err\r\n");
break;
case MQTT_PKT_UNSUBACK: //發(fā)送UnSubscribe消息的Ack
if(MQTT_UnPacketUnSubscribe(cmd) == 0)
printf( "Tips: MQTT UnSubscribe OK\r\n");
else
printf( "Tips: MQTT UnSubscribe Err\r\n");
break;
default:
result = -1;
break;
}
ESP8266_Clear(); //清空緩存
if(result == -1)
return;
dataPtr = strchr(req_payload, ':'); //搜索':'
if(dataPtr != NULL && result != -1) //如果找到了
{
dataPtr++;
printf("%s",Tips);
while(*dataPtr >= '0' && *dataPtr <= '9') //判斷是否是下發(fā)的命令控制數(shù)據(jù)
{
numBuf[num++] = *dataPtr++;
}
numBuf[num] = 0;
num = atoi((const char *)numBuf); //轉(zhuǎn)為數(shù)值形式
printf( "num:%d\r\n",num);
if(strstr((char *)req_payload, "LED")) //搜索"redled"
{
//LED
//Mqtt_LED(Tips,num);
}
else if(strstr((char *)req_payload, "TOUCH"))
{
//指紋
dataPtr = strchr(req_payload, '-'); //搜索'-'
if(dataPtr != NULL && result != -1) //如果找到了
{
dataPtr++;
while(*dataPtr >= '0' && *dataPtr <= '9') //判斷是否是下發(fā)的命令控制數(shù)據(jù)
{
numBuf[ID++] = *dataPtr++;
}
numBuf[ID] = 0;
ID = atoi((const char *)numBuf); //轉(zhuǎn)為數(shù)值形式
printf("ID:%d\r\n",ID);
//Mqtt_Task( Tips, num, ID);
}else{
//Mqtt_Task(Tips, num, ID);
}
}
else if(strstr((char *)req_payload, "DOOR"))
{
//步進(jìn)電機(jī)
//Door(Tips,num);
}
}
if(type == MQTT_PKT_CMD || type == MQTT_PKT_PUBLISH)
{
MQTT_FreeBuffer(cmdid_topic);
MQTT_FreeBuffer(req_payload);
}
}
最后MQTT協(xié)議使用引用他人
3.5.MqttKit.h
#ifndef _MQTTKIT_H_
#define _MQTTKIT_H_
#include "Common.h"
//=============================配置==============================
//===========可以提供RTOS的內(nèi)存管理方案,也可以使用C庫的=========
//RTOS
#include <stdlib.h>
#define MQTT_MallocBuffer malloc
#define MQTT_FreeBuffer free
//==========================================================
#define MOSQ_MSB(A) (uint8)((A & 0xFF00) >> 8)
#define MOSQ_LSB(A) (uint8)(A & 0x00FF)
/*--------------------------------內(nèi)存分配方案標(biāo)志--------------------------------*/
#define MEM_FLAG_NULL 0
#define MEM_FLAG_ALLOC 1
#define MEM_FLAG_STATIC 2
typedef struct Buffer
{
uint8 *_data; //協(xié)議數(shù)據(jù)
uint32 _len; //寫入的數(shù)據(jù)長度
uint32 _size; //緩存總大小
uint8 _memFlag; //內(nèi)存使用的方案:0-未分配 1-使用的動態(tài)分配 2-使用的固定內(nèi)存
} MQTT_PACKET_STRUCTURE;
/*--------------------------------固定頭部消息類型--------------------------------*/
enum MqttPacketType
{
MQTT_PKT_CONNECT = 1, /**< 連接請求數(shù)據(jù)包 */
MQTT_PKT_CONNACK, /**< 連接確認(rèn)數(shù)據(jù)包 */
MQTT_PKT_PUBLISH, /**< 發(fā)布數(shù)據(jù)數(shù)據(jù)包 */
MQTT_PKT_PUBACK, /**< 發(fā)布確認(rèn)數(shù)據(jù)包 */
MQTT_PKT_PUBREC, /**< 發(fā)布數(shù)據(jù)已接收數(shù)據(jù)包,Qos 2時,回復(fù)MQTT_PKT_PUBLISH */
MQTT_PKT_PUBREL, /**< 發(fā)布數(shù)據(jù)釋放數(shù)據(jù)包, Qos 2時,回復(fù)MQTT_PKT_PUBREC */
MQTT_PKT_PUBCOMP, /**< 發(fā)布完成數(shù)據(jù)包, Qos 2時,回復(fù)MQTT_PKT_PUBREL */
MQTT_PKT_SUBSCRIBE, /**< 訂閱數(shù)據(jù)包 */
MQTT_PKT_SUBACK, /**< 訂閱確認(rèn)數(shù)據(jù)包 */
MQTT_PKT_UNSUBSCRIBE, /**< 取消訂閱數(shù)據(jù)包 */
MQTT_PKT_UNSUBACK, /**< 取消訂閱確認(rèn)數(shù)據(jù)包 */
MQTT_PKT_PINGREQ, /**< ping 數(shù)據(jù)包 */
MQTT_PKT_PINGRESP, /**< ping 響應(yīng)數(shù)據(jù)包 */
MQTT_PKT_DISCONNECT, /**< 斷開連接數(shù)據(jù)包 */
//新增
MQTT_PKT_CMD /**< 命令下發(fā)數(shù)據(jù)包 */
};
/*--------------------------------MQTT QOS等級--------------------------------*/
enum MqttQosLevel
{
MQTT_QOS_LEVEL0, /**< 最多發(fā)送一次 */
MQTT_QOS_LEVEL1, /**< 最少發(fā)送一次 */
MQTT_QOS_LEVEL2 /**< 只發(fā)送一次 */
};
/*--------------------------------MQTT 連接請求標(biāo)志位,內(nèi)部使用--------------------------------*/
enum MqttConnectFlag
{
MQTT_CONNECT_CLEAN_SESSION = 0x02,
MQTT_CONNECT_WILL_FLAG = 0x04,
MQTT_CONNECT_WILL_QOS0 = 0x00,
MQTT_CONNECT_WILL_QOS1 = 0x08,
MQTT_CONNECT_WILL_QOS2 = 0x10,
MQTT_CONNECT_WILL_RETAIN = 0x20,
MQTT_CONNECT_PASSORD = 0x40,
MQTT_CONNECT_USER_NAME = 0x80
};
/*--------------------------------消息的packet ID,可自定義--------------------------------*/
#define MQTT_PUBLISH_ID 10
#define MQTT_SUBSCRIBE_ID 20
#define MQTT_UNSUBSCRIBE_ID 30
/*--------------------------------刪包--------------------------------*/
void MQTT_DeleteBuffer(MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------解包--------------------------------*/
uint8 MQTT_UnPacketRecv(uint8 *dataPtr);
/*--------------------------------登錄組包--------------------------------*/
uint8 MQTT_PacketConnect(const int8 *user, const int8 *password, const int8 *devid,
uint16 cTime, uint1 clean_session, uint1 qos,
const int8 *will_topic, const int8 *will_msg, int32 will_retain,
MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------斷開連接組包--------------------------------*/
uint1 MQTT_PacketDisConnect(MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------連接響應(yīng)解包--------------------------------*/
uint8 MQTT_UnPacketConnectAck(uint8 *rev_data);
/*--------------------------------數(shù)據(jù)點上傳組包--------------------------------*/
uint1 MQTT_PacketSaveData(const int8 *devid, int16 send_len, int8 *type_bin_head, uint8 type, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------二進(jìn)制文件上傳組包--------------------------------*/
uint1 MQTT_PacketSaveBinData(const int8 *name, int16 file_len, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------命令下發(fā)解包--------------------------------*/
uint8 MQTT_UnPacketCmd(uint8 *rev_data, int8 **cmdid, int8 **req, uint16 *req_len);
/*--------------------------------命令回復(fù)組包--------------------------------*/
uint1 MQTT_PacketCmdResp(const int8 *cmdid, const int8 *req, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------訂閱主題組包--------------------------------*/
uint8 MQTT_PacketSubscribe(uint16 pkt_id, enum MqttQosLevel qos, const int8 *topics[], uint8 topics_cnt, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------訂閱主題回復(fù)解包--------------------------------*/
uint8 MQTT_UnPacketSubscribe(uint8 *rev_data);
/*--------------------------------取消訂閱組包--------------------------------*/
uint8 MQTT_PacketUnSubscribe(uint16 pkt_id, const int8 *topics[], uint8 topics_cnt, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------取消訂閱回復(fù)解包--------------------------------*/
uint1 MQTT_UnPacketUnSubscribe(uint8 *rev_data);
/*--------------------------------發(fā)布主題組包--------------------------------*/
uint8 MQTT_PacketPublish(uint16 pkt_id, const int8 *topic,
const int8 *payload, uint32 payload_len,
enum MqttQosLevel qos, int32 retain, int32 own,
MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------發(fā)布消息回復(fù)解包--------------------------------*/
uint8 MQTT_UnPacketPublish(uint8 *rev_data, int8 **topic, uint16 *topic_len, int8 **payload, uint16 *payload_len, uint8 *qos, uint16 *pkt_id);
/*--------------------------------發(fā)布消息的Ack組包--------------------------------*/
uint1 MQTT_PacketPublishAck(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------發(fā)布消息的Ack解包--------------------------------*/
uint1 MQTT_UnPacketPublishAck(uint8 *rev_data);
/*--------------------------------發(fā)布消息的Rec組包--------------------------------*/
uint1 MQTT_PacketPublishRec(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------發(fā)布消息的Rec解包--------------------------------*/
uint1 MQTT_UnPacketPublishRec(uint8 *rev_data);
/*--------------------------------發(fā)布消息的Rel組包--------------------------------*/
uint1 MQTT_PacketPublishRel(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------發(fā)布消息的Rel解包--------------------------------*/
uint1 MQTT_UnPacketPublishRel(uint8 *rev_data, uint16 pkt_id);
/*--------------------------------發(fā)布消息的Comp組包--------------------------------*/
uint1 MQTT_PacketPublishComp(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------發(fā)布消息的Comp解包--------------------------------*/
uint1 MQTT_UnPacketPublishComp(uint8 *rev_data);
/*--------------------------------心跳請求組包--------------------------------*/
uint1 MQTT_PacketPing(MQTT_PACKET_STRUCTURE *mqttPacket);
#endif
3.6.MqttKit.c
/**
************************************************************
************************************************************
************************************************************
* 文件名: MqttKit.c
*
* 作者: 張繼瑞
*
* 日期: 2018-04-27
*
* 版本: V1.6
*
* 說明: MQTT協(xié)議
*
* 修改記錄: V1.1:解決MQTT_PacketSubscribe訂閱不為2個topic
* 個數(shù)時協(xié)議錯誤的bug
* V1.2:修復(fù)MQTT_PacketCmdResp的bug
* V1.3:將strncpy替換為memcpy,解決潛在bug
* V1.4:修復(fù) MQTT_PacketPublishAck
* MQTT_PacketPublishRel
* 函數(shù)封包錯誤的bug
* V1.5:增加 MQTT_UnPacketCmd
* MQTT_UnPacketPublish
* 接口對消息內(nèi)容長度的提取參數(shù)
* V1.6:增加二進(jìn)制文件上傳接口
************************************************************
************************************************************
************************************************************
**/
//協(xié)議頭文件
#include "MqttKit.h"
//C庫
#include <string.h>
#include <stdio.h>
#define CMD_TOPIC_PREFIX "$creq"
//==========================================================
// 函數(shù)名稱: EDP_NewBuffer
//
// 函數(shù)功能: 申請內(nèi)存
//
// 入口參數(shù): edpPacket:包結(jié)構(gòu)體
// size:大小
//
// 返回參數(shù): 無
//
// 說明: 1.可使用動態(tài)分配來分配內(nèi)存
// 2.可使用局部或全局?jǐn)?shù)組來指定內(nèi)存
//==========================================================
void MQTT_NewBuffer(MQTT_PACKET_STRUCTURE *mqttPacket, uint32 size)
{
uint32 i = 0;
if(mqttPacket->_data == NULL)
{
mqttPacket->_memFlag = MEM_FLAG_ALLOC;
mqttPacket->_data = (uint8 *)MQTT_MallocBuffer(size);
if(mqttPacket->_data != NULL)
{
mqttPacket->_len = 0;
mqttPacket->_size = size;
for(; i < mqttPacket->_size; i++)
mqttPacket->_data[i] = 0;
}
}
else
{
mqttPacket->_memFlag = MEM_FLAG_STATIC;
for(; i < mqttPacket->_size; i++)
mqttPacket->_data[i] = 0;
mqttPacket->_len = 0;
if(mqttPacket->_size < size)
mqttPacket->_data = NULL;
}
}
//==========================================================
// 函數(shù)名稱: MQTT_DeleteBuffer
//
// 函數(shù)功能: 釋放數(shù)據(jù)內(nèi)存
//
// 入口參數(shù): edpPacket:包結(jié)構(gòu)體
//
// 返回參數(shù): 無
//
// 說明:
//==========================================================
void MQTT_DeleteBuffer(MQTT_PACKET_STRUCTURE *mqttPacket)
{
if(mqttPacket->_memFlag == MEM_FLAG_ALLOC)
MQTT_FreeBuffer(mqttPacket->_data);
mqttPacket->_data = NULL;
mqttPacket->_len = 0;
mqttPacket->_size = 0;
mqttPacket->_memFlag = MEM_FLAG_NULL;
}
int32 MQTT_DumpLength(size_t len, uint8 *buf)
{
int32 i = 0;
for(i = 1; i <= 4; ++i)
{
*buf = len % 128;
len >>= 7;
if(len > 0)
{
*buf |= 128;
++buf;
}
else
{
return i;
}
}
return -1;
}
int32 MQTT_ReadLength(const uint8 *stream, int32 size, uint32 *len)
{
int32 i;
const uint8 *in = stream;
uint32 multiplier = 1;
*len = 0;
for(i = 0; i < size; ++i)
{
*len += (in[i] & 0x7f) * multiplier;
if(!(in[i] & 0x80))
{
return i + 1;
}
multiplier <<= 7;
if(multiplier >= 2097152) //128 * *128 * *128
{
return -2; // error, out of range
}
}
return -1; // not complete
}
//==========================================================
// 函數(shù)名稱: MQTT_UnPacketRecv
//
// 函數(shù)功能: MQTT數(shù)據(jù)接收類型判斷
//
// 入口參數(shù): dataPtr:接收的數(shù)據(jù)指針
//
// 返回參數(shù): 0-成功 其他-失敗原因
//
// 說明:
//==========================================================
uint8 MQTT_UnPacketRecv(uint8 *dataPtr)
{
uint8 status = 255;
uint8 type = dataPtr[0] >> 4; //類型檢查
if(type < 1 || type > 14)
return status;
if(type == MQTT_PKT_PUBLISH)
{
uint8 *msgPtr;
uint32 remain_len = 0;
msgPtr = dataPtr + MQTT_ReadLength(dataPtr + 1, 4, &remain_len) + 1;
if(remain_len < 2 || dataPtr[0] & 0x01) //retain
return 255;
if(remain_len < ((uint16)msgPtr[0] << 8 | msgPtr[1]) + 2)
return 255;
if(strstr((int8 *)msgPtr + 2, CMD_TOPIC_PREFIX) != NULL) //如果是命令下發(fā)
status = MQTT_PKT_CMD;
else
status = MQTT_PKT_PUBLISH;
}
else
status = type;
return status;
}
//==========================================================
// 函數(shù)名稱: MQTT_PacketConnect
//
// 函數(shù)功能: 連接消息組包
//
// 入口參數(shù): user:用戶名:產(chǎn)品ID
// password:密碼:鑒權(quán)信息或apikey
// devid:設(shè)備ID
// cTime:連接保持時間
// clean_session:離線消息清除標(biāo)志
// qos:重發(fā)標(biāo)志
// will_topic:異常離線topic
// will_msg:異常離線消息
// will_retain:消息推送標(biāo)志
// mqttPacket:包指針
//
// 返回參數(shù): 0-成功 其他-失敗
//
// 說明:
//==========================================================
uint8 MQTT_PacketConnect(const int8 *user, const int8 *password, const int8 *devid,
uint16 cTime, uint1 clean_session, uint1 qos,
const int8 *will_topic, const int8 *will_msg, int32 will_retain,
MQTT_PACKET_STRUCTURE *mqttPacket)
{
uint8 flags = 0;
uint8 will_topic_len = 0;
uint16 total_len = 15;
int16 len = 0, devid_len = strlen(devid);
if(!devid)
return 1;
total_len += devid_len + 2;
//斷線后,是否清理離線消息:1-清理 0-不清理--------------------------------------------
if(clean_session)
{
flags |= MQTT_CONNECT_CLEAN_SESSION;
}
//異常掉線情況下,服務(wù)器發(fā)布的topic------------------------------------------------------
if(will_topic)
{
flags |= MQTT_CONNECT_WILL_FLAG;
will_topic_len = strlen(will_topic);
total_len += 4 + will_topic_len + strlen(will_msg);
}
//qos級別--主要用于PUBLISH(發(fā)布態(tài))消息的,保證消息傳遞的次數(shù)-----------------------------
switch((unsigned char)qos)
{
case MQTT_QOS_LEVEL0:
flags |= MQTT_CONNECT_WILL_QOS0; //最多一次
break;
case MQTT_QOS_LEVEL1:
flags |= (MQTT_CONNECT_WILL_FLAG | MQTT_CONNECT_WILL_QOS1); //最少一次
break;
case MQTT_QOS_LEVEL2:
flags |= (MQTT_CONNECT_WILL_FLAG | MQTT_CONNECT_WILL_QOS2); //只有一次
break;
default:
return 2;
}
//主要用于PUBLISH(發(fā)布態(tài))的消息,表示服務(wù)器要保留這次推送的信息,如果有新的訂閱者出現(xiàn),就把這消息推送給它。如果不設(shè)那么推送至當(dāng)前訂閱的就釋放了
if(will_retain)
{
flags |= (MQTT_CONNECT_WILL_FLAG | MQTT_CONNECT_WILL_RETAIN);
}
//賬號為空 密碼為空---------------------------------------------------------------------
if(!user || !password)
{
return 3;
}
flags |= MQTT_CONNECT_USER_NAME | MQTT_CONNECT_PASSORD;
total_len += strlen(user) + strlen(password) + 4;
//分配內(nèi)存-----------------------------------------------------------------------------
MQTT_NewBuffer(mqttPacket, total_len);
if(mqttPacket->_data == NULL)
return 4;
memset(mqttPacket->_data, 0, total_len);
/*************************************固定頭部***********************************************/
//固定頭部----------------------連接請求類型---------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_CONNECT << 4;
//固定頭部----------------------剩余長度值-----------------------------------------------
len = MQTT_DumpLength(total_len - 5, mqttPacket->_data + mqttPacket->_len);
if(len < 0)
{
MQTT_DeleteBuffer(mqttPacket);
return 5;
}
else
mqttPacket->_len += len;
/*************************************可變頭部***********************************************/
//可變頭部----------------------協(xié)議名長度 和 協(xié)議名--------------------------------------
mqttPacket->_data[mqttPacket->_len++] = 0;
mqttPacket->_data[mqttPacket->_len++] = 4;
mqttPacket->_data[mqttPacket->_len++] = 'M';
mqttPacket->_data[mqttPacket->_len++] = 'Q';
mqttPacket->_data[mqttPacket->_len++] = 'T';
mqttPacket->_data[mqttPacket->_len++] = 'T';
//可變頭部----------------------protocol level 4-----------------------------------------
mqttPacket->_data[mqttPacket->_len++] = 4;
//可變頭部----------------------連接標(biāo)志(該函數(shù)開頭處理的數(shù)據(jù))-----------------------------
mqttPacket->_data[mqttPacket->_len++] = flags;
//可變頭部----------------------保持連接的時間(秒)----------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(cTime);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(cTime);
/*************************************消息體************************************************/
//消息體----------------------------devid長度、devid-------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(devid_len);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(devid_len);
strncat((int8 *)mqttPacket->_data + mqttPacket->_len, devid, devid_len);
mqttPacket->_len += devid_len;
//消息體----------------------------will_flag 和 will_msg---------------------------------
if(flags & MQTT_CONNECT_WILL_FLAG)
{
unsigned short mLen = 0;
if(!will_msg)
will_msg = "";
mLen = strlen(will_topic);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(mLen);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(mLen);
strncat((int8 *)mqttPacket->_data + mqttPacket->_len, will_topic, mLen);
mqttPacket->_len += mLen;
mLen = strlen(will_msg);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(mLen);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(mLen);
strncat((int8 *)mqttPacket->_data + mqttPacket->_len, will_msg, mLen);
mqttPacket->_len += mLen;
}
//消息體----------------------------use---------------------------------------------------
if(flags & MQTT_CONNECT_USER_NAME)
{
unsigned short user_len = strlen(user);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(user_len);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(user_len);
strncat((int8 *)mqttPacket->_data + mqttPacket->_len, user, user_len);
mqttPacket->_len += user_len;
}
//消息體----------------------------password----------------------------------------------
if(flags & MQTT_CONNECT_PASSORD)
{
unsigned short psw_len = strlen(password);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(psw_len);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(psw_len);
strncat((int8 *)mqttPacket->_data + mqttPacket->_len, password, psw_len);
mqttPacket->_len += psw_len;
}
return 0;
}
//==========================================================
// 函數(shù)名稱: MQTT_PacketDisConnect
//
// 函數(shù)功能: 斷開連接消息組包
//
// 入口參數(shù): mqttPacket:包指針
//
// 返回參數(shù): 0-成功 1-失敗
//
// 說明:
//==========================================================
uint1 MQTT_PacketDisConnect(MQTT_PACKET_STRUCTURE *mqttPacket)
{
MQTT_NewBuffer(mqttPacket, 2);
if(mqttPacket->_data == NULL)
return 1;
/*************************************固定頭部***********************************************/
//固定頭部----------------------頭部消息-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_DISCONNECT << 4;
//固定頭部----------------------剩余長度值-----------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = 0;
return 0;
}
//==========================================================
// 函數(shù)名稱: MQTT_UnPacketConnectAck
//
// 函數(shù)功能: 連接消息解包
//
// 入口參數(shù): rev_data:接收的數(shù)據(jù)
//
// 返回參數(shù): 1、255-失敗 其他-平臺的返回碼
//
// 說明:
//==========================================================
uint8 MQTT_UnPacketConnectAck(uint8 *rev_data)
{
if(rev_data[1] != 2)
return 1;
if(rev_data[2] == 0 || rev_data[2] == 1)
return rev_data[3];
else
return 255;
}
//==========================================================
// 函數(shù)名稱: MQTT_PacketSaveData
//
// 函數(shù)功能: 數(shù)據(jù)點上傳組包
//
// 入口參數(shù): devid:設(shè)備ID(可為空)
// send_buf:json緩存buf
// send_len:json總長
// type_bin_head:bin文件的消息頭
// type:類型
//
// 返回參數(shù): 0-成功 1-失敗
//
// 說明:
//==========================================================
uint1 MQTT_PacketSaveData(const int8 *devid, int16 send_len, int8 *type_bin_head, uint8 type, MQTT_PACKET_STRUCTURE *mqttPacket)
{
if(MQTT_PacketPublish(MQTT_PUBLISH_ID, "$dp", NULL, send_len + 3, MQTT_QOS_LEVEL1, 0, 1, mqttPacket) == 0)
{
mqttPacket->_data[mqttPacket->_len++] = type; //類型
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(send_len);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(send_len);
}
else
return 1;
return 0;
}
//==========================================================
// 函數(shù)名稱: MQTT_PacketSaveBinData
//
// 函數(shù)功能: 為禁止文件上傳組包
//
// 入口參數(shù): name:數(shù)據(jù)流名字
// file_len:文件長度
// mqttPacket:包指針
//
// 返回參數(shù): 0-成功 1-失敗
//
// 說明:
//==========================================================
uint1 MQTT_PacketSaveBinData(const int8 *name, int16 file_len, MQTT_PACKET_STRUCTURE *mqttPacket)
{
uint1 result = 1;
int8 *bin_head = NULL;
uint8 bin_head_len = 0;
int8 *payload = NULL;
int32 payload_size = 0;
bin_head = (int8 *)MQTT_MallocBuffer(13 + strlen(name));
if(bin_head == NULL)
return result;
sprintf(bin_head, "{\"ds_id\":\"%s\"}", name);
bin_head_len = strlen(bin_head);
payload_size = 7 + bin_head_len + file_len;
payload = (int8 *)MQTT_MallocBuffer(payload_size - file_len);
if(payload == NULL)
{
MQTT_FreeBuffer(bin_head);
return result;
}
payload[0] = 2; //類型
payload[1] = MOSQ_MSB(bin_head_len);
payload[2] = MOSQ_LSB(bin_head_len);
memcpy(payload + 3, bin_head, bin_head_len);
payload[bin_head_len + 3] = (file_len >> 24) & 0xFF;
payload[bin_head_len + 4] = (file_len >> 16) & 0xFF;
payload[bin_head_len + 5] = (file_len >> 8) & 0xFF;
payload[bin_head_len + 6] = file_len & 0xFF;
if(MQTT_PacketPublish(MQTT_PUBLISH_ID, "$dp", payload, payload_size, MQTT_QOS_LEVEL1, 0, 1, mqttPacket) == 0)
result = 0;
MQTT_FreeBuffer(bin_head);
MQTT_FreeBuffer(payload);
return result;
}
//==========================================================
// 函數(shù)名稱: MQTT_UnPacketCmd
//
// 函數(shù)功能: 命令下發(fā)解包
//
// 入口參數(shù): rev_data:接收的數(shù)據(jù)指針
// cmdid:cmdid-uuid
// req:命令
//
// 返回參數(shù): 0-成功 其他-失敗原因
//
// 說明:
//==========================================================
uint8 MQTT_UnPacketCmd(uint8 *rev_data, int8 **cmdid, int8 **req, uint16 *req_len)
{
int8 *dataPtr = strchr((int8 *)rev_data + 6, '/'); //加6是跳過頭信息
uint32 remain_len = 0;
if(dataPtr == NULL) //未找到'/'
return 1;
dataPtr++; //跳過'/'
MQTT_ReadLength(rev_data + 1, 4, &remain_len); //讀取剩余字節(jié)
*cmdid = (int8 *)MQTT_MallocBuffer(37); //cmdid固定36字節(jié),多分配一個結(jié)束符的位置
if(*cmdid == NULL)
return 2;
memset(*cmdid, 0, 37); //全部清零
memcpy(*cmdid, (const int8 *)dataPtr, 36); //復(fù)制cmdid
dataPtr += 36;
*req_len = remain_len - 44; //命令長度 = 剩余長度(remain_len) - 2 - 5($creq) - 1(\) - cmdid長度
*req = (int8 *)MQTT_MallocBuffer(*req_len + 1); //分配命令長度+1
if(*req == NULL)
{
MQTT_FreeBuffer(*cmdid);
return 3;
}
memset(*req, 0, *req_len + 1); //清零
memcpy(*req, (const int8 *)dataPtr, *req_len); //復(fù)制命令
return 0;
}
//==========================================================
// 函數(shù)名稱: MQTT_PacketCmdResp
//
// 函數(shù)功能: 命令回復(fù)組包
//
// 入口參數(shù): cmdid:cmdid
// req:命令
// mqttPacket:包指針
//
// 返回參數(shù): 0-成功 1-失敗
//
// 說明:
//==========================================================
uint1 MQTT_PacketCmdResp(const int8 *cmdid, const int8 *req, MQTT_PACKET_STRUCTURE *mqttPacket)
{
uint16 cmdid_len = strlen(cmdid);
uint16 req_len = strlen(req);
_Bool status = 0;
int8 *payload = MQTT_MallocBuffer(cmdid_len + 6);
if(payload == NULL)
return 1;
memset(payload, 0, cmdid_len + 6);
memcpy(payload, "$crsp/", 6);
strncat(payload, cmdid, cmdid_len);
if(MQTT_PacketPublish(MQTT_PUBLISH_ID, payload, req, strlen(req), MQTT_QOS_LEVEL0, 0, 1, mqttPacket) == 0)
status = 0;
else
status = 1;
MQTT_FreeBuffer(payload);
return status;
}
//==========================================================
// 函數(shù)名稱: MQTT_PacketSubscribe
//
// 函數(shù)功能: Subscribe消息組包
//
// 入口參數(shù): pkt_id:pkt_id
// qos:消息重發(fā)次數(shù)
// topics:訂閱的消息
// topics_cnt:訂閱的消息個數(shù)
// mqttPacket:包指針
//
// 返回參數(shù): 0-成功 其他-失敗
//
// 說明:
//==========================================================
uint8 MQTT_PacketSubscribe(uint16 pkt_id, enum MqttQosLevel qos, const int8 *topics[], uint8 topics_cnt, MQTT_PACKET_STRUCTURE *mqttPacket)
{
uint32 topic_len = 0, remain_len = 0;
int16 len = 0;
uint8 i = 0;
if(pkt_id == 0)
return 1;
//計算topic長度-------------------------------------------------------------------------
for(; i < topics_cnt; i++)
{
if(topics[i] == NULL)
return 2;
topic_len += strlen(topics[i]);
}
//2 bytes packet id + topic filter(2 bytes topic + topic length + 1 byte reserve)------
remain_len = 2 + 3 * topics_cnt + topic_len;
//分配內(nèi)存------------------------------------------------------------------------------
MQTT_NewBuffer(mqttPacket, remain_len + 5);
if(mqttPacket->_data == NULL)
return 3;
/*************************************固定頭部***********************************************/
//固定頭部----------------------頭部消息-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_SUBSCRIBE << 4 | 0x02;
//固定頭部----------------------剩余長度值-----------------------------------------------
len = MQTT_DumpLength(remain_len, mqttPacket->_data + mqttPacket->_len);
if(len < 0)
{
MQTT_DeleteBuffer(mqttPacket);
return 4;
}
else
mqttPacket->_len += len;
/*************************************payload***********************************************/
//payload----------------------pkt_id---------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(pkt_id);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(pkt_id);
//payload----------------------topic_name-----------------------------------------------
for(i = 0; i < topics_cnt; i++)
{
topic_len = strlen(topics[i]);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(topic_len);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(topic_len);
strncat((int8 *)mqttPacket->_data + mqttPacket->_len, topics[i], topic_len);
mqttPacket->_len += topic_len;
mqttPacket->_data[mqttPacket->_len++] = qos & 0xFF;
}
return 0;
}
//==========================================================
// 函數(shù)名稱: MQTT_UnPacketSubscrebe
//
// 函數(shù)功能: Subscribe的回復(fù)消息解包
//
// 入口參數(shù): rev_data:接收到的信息
//
// 返回參數(shù): 0-成功 其他-失敗
//
// 說明:
//==========================================================
uint8 MQTT_UnPacketSubscribe(uint8 *rev_data)
{
uint8 result = 255;
if(rev_data[2] == MOSQ_MSB(MQTT_SUBSCRIBE_ID) && rev_data[3] == MOSQ_LSB(MQTT_SUBSCRIBE_ID))
{
switch(rev_data[4])
{
case 0x00:
case 0x01:
case 0x02:
//MQTT Subscribe OK
result = 0;
break;
case 0x80:
//MQTT Subscribe Failed
result = 1;
break;
default:
//MQTT Subscribe UnKnown Err
result = 2;
break;
}
}
return result;
}
//==========================================================
// 函數(shù)名稱: MQTT_PacketUnSubscribe
//
// 函數(shù)功能: UnSubscribe消息組包
//
// 入口參數(shù): pkt_id:pkt_id
// qos:消息重發(fā)次數(shù)
// topics:訂閱的消息
// topics_cnt:訂閱的消息個數(shù)
// mqttPacket:包指針
//
// 返回參數(shù): 0-成功 其他-失敗
//
// 說明:
//==========================================================
uint8 MQTT_PacketUnSubscribe(uint16 pkt_id, const int8 *topics[], uint8 topics_cnt, MQTT_PACKET_STRUCTURE *mqttPacket)
{
uint32 topic_len = 0, remain_len = 0;
int16 len = 0;
uint8 i = 0;
if(pkt_id == 0)
return 1;
//計算topic長度-------------------------------------------------------------------------
for(; i < topics_cnt; i++)
{
if(topics[i] == NULL)
return 2;
topic_len += strlen(topics[i]);
}
//2 bytes packet id, 2 bytes topic length + topic + 1 byte reserve---------------------
remain_len = 2 + (topics_cnt << 1) + topic_len;
//分配內(nèi)存------------------------------------------------------------------------------
MQTT_NewBuffer(mqttPacket, remain_len + 5);
if(mqttPacket->_data == NULL)
return 3;
/*************************************固定頭部***********************************************/
//固定頭部----------------------頭部消息-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_UNSUBSCRIBE << 4 | 0x02;
//固定頭部----------------------剩余長度值-----------------------------------------------
len = MQTT_DumpLength(remain_len, mqttPacket->_data + mqttPacket->_len);
if(len < 0)
{
MQTT_DeleteBuffer(mqttPacket);
return 4;
}
else
mqttPacket->_len += len;
/*************************************payload***********************************************/
//payload----------------------pkt_id---------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(pkt_id);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(pkt_id);
//payload----------------------topic_name-----------------------------------------------
for(i = 0; i < topics_cnt; i++)
{
topic_len = strlen(topics[i]);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(topic_len);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(topic_len);
strncat((int8 *)mqttPacket->_data + mqttPacket->_len, topics[i], topic_len);
mqttPacket->_len += topic_len;
}
return 0;
}
//==========================================================
// 函數(shù)名稱: MQTT_UnPacketUnSubscribe
//
// 函數(shù)功能: UnSubscribe的回復(fù)消息解包
//
// 入口參數(shù): rev_data:接收到的信息
//
// 返回參數(shù): 0-成功 其他-失敗
//
// 說明:
//==========================================================
uint1 MQTT_UnPacketUnSubscribe(uint8 *rev_data)
{
uint1 result = 1;
if(rev_data[2] == MOSQ_MSB(MQTT_UNSUBSCRIBE_ID) && rev_data[3] == MOSQ_LSB(MQTT_UNSUBSCRIBE_ID))
{
result = 0;
}
return result;
}
//==========================================================
// 函數(shù)名稱: MQTT_PacketPublish
//
// 函數(shù)功能: Pulish消息組包
//
// 入口參數(shù): pkt_id:pkt_id
// topic:發(fā)布的topic
// payload:消息體
// payload_len:消息體長度
// qos:重發(fā)次數(shù)
// retain:離線消息推送
// own:
// mqttPacket:包指針
//
// 返回參數(shù): 0-成功 其他-失敗
//
// 說明:
//==========================================================
uint8 MQTT_PacketPublish(uint16 pkt_id, const int8 *topic,
const int8 *payload, uint32 payload_len,
enum MqttQosLevel qos, int32 retain, int32 own,
MQTT_PACKET_STRUCTURE *mqttPacket)
{
uint32 total_len = 0, topic_len = 0;
uint32 data_len = 0;
int32 len = 0;
uint8 flags = 0;
//pkt_id檢查----------------------------------------------------------------------------
if(pkt_id == 0)
return 1;
//$dp為系統(tǒng)上傳數(shù)據(jù)點的指令--------------------------------------------------------------
for(topic_len = 0; topic[topic_len] != '\0'; ++topic_len)
{
if((topic[topic_len] == '#') || (topic[topic_len] == '+'))
return 2;
}
//Publish消息---------------------------------------------------------------------------
flags |= MQTT_PKT_PUBLISH << 4;
//retain標(biāo)志----------------------------------------------------------------------------
if(retain)
flags |= 0x01;
//總長度--------------------------------------------------------------------------------
total_len = topic_len + payload_len + 2;
//qos級別--主要用于PUBLISH(發(fā)布態(tài))消息的,保證消息傳遞的次數(shù)-----------------------------
switch(qos)
{
case MQTT_QOS_LEVEL0:
flags |= MQTT_CONNECT_WILL_QOS0; //最多一次
break;
case MQTT_QOS_LEVEL1:
flags |= 0x02; //最少一次
total_len += 2;
break;
case MQTT_QOS_LEVEL2:
flags |= 0x04; //只有一次
total_len += 2;
break;
default:
return 3;
}
//分配內(nèi)存------------------------------------------------------------------------------
if(payload[0] == 2)
{
uint32 data_len_t = 0;
while(payload[data_len_t++] != '}');
data_len_t -= 3;
data_len = data_len_t + 7;
data_len_t = payload_len - data_len;
MQTT_NewBuffer(mqttPacket, total_len + 3 - data_len_t);
if(mqttPacket->_data == NULL)
return 4;
memset(mqttPacket->_data, 0, total_len + 3 - data_len_t);
}
else
{
MQTT_NewBuffer(mqttPacket, total_len + 3);
if(mqttPacket->_data == NULL)
return 4;
memset(mqttPacket->_data, 0, total_len + 3);
}
/*************************************固定頭部***********************************************/
//固定頭部----------------------頭部消息-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = flags;
//固定頭部----------------------剩余長度值-----------------------------------------------
len = MQTT_DumpLength(total_len, mqttPacket->_data + mqttPacket->_len);
if(len < 0)
{
MQTT_DeleteBuffer(mqttPacket);
return 5;
}
else
mqttPacket->_len += len;
/*************************************可變頭部***********************************************/
//可變頭部----------------------寫入topic長度、topic-------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(topic_len);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(topic_len);
strncat((int8 *)mqttPacket->_data + mqttPacket->_len, topic, topic_len);
mqttPacket->_len += topic_len;
if(qos != MQTT_QOS_LEVEL0)
{
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(pkt_id);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(pkt_id);
}
//可變頭部----------------------寫入payload----------------------------------------------
if(payload != NULL)
{
if(payload[0] == 2)
{
memcpy((int8 *)mqttPacket->_data + mqttPacket->_len, payload, data_len);
mqttPacket->_len += data_len;
}
else
{
memcpy((int8 *)mqttPacket->_data + mqttPacket->_len, payload, payload_len);
mqttPacket->_len += payload_len;
}
}
return 0;
}
//==========================================================
// 函數(shù)名稱: MQTT_UnPacketPublish
//
// 函數(shù)功能: Publish消息解包
//
// 入口參數(shù): flags:MQTT相關(guān)標(biāo)志信息
// pkt:指向可變頭部
// size:固定頭部中的剩余長度信息
//
// 返回參數(shù): 0-成功 其他-失敗原因
//
// 說明:
//==========================================================
uint8 MQTT_UnPacketPublish(uint8 *rev_data, int8 **topic, uint16 *topic_len, int8 **payload, uint16 *payload_len, uint8 *qos, uint16 *pkt_id)
{
const int8 flags = rev_data[0] & 0x0F;
uint8 *msgPtr;
uint32 remain_len = 0;
const int8 dup = flags & 0x08;
*qos = (flags & 0x06) >> 1;
msgPtr = rev_data + MQTT_ReadLength(rev_data + 1, 4, &remain_len) + 1;
if(remain_len < 2 || flags & 0x01) //retain
return 255;
*topic_len = (uint16)msgPtr[0] << 8 | msgPtr[1];
if(remain_len < *topic_len + 2)
return 255;
if(strstr((int8 *)msgPtr + 2, CMD_TOPIC_PREFIX) != NULL) //如果是命令下發(fā)
return MQTT_PKT_CMD;
switch(*qos)
{
case MQTT_QOS_LEVEL0: // qos0 have no packet identifier
if(0 != dup)
return 255;
*topic = MQTT_MallocBuffer(*topic_len + 1); //為topic分配內(nèi)存
if(*topic == NULL)
return 255;
memset(*topic, 0, *topic_len + 1);
memcpy(*topic, (int8 *)msgPtr + 2, *topic_len); //復(fù)制數(shù)據(jù)
*payload_len = remain_len - 2 - *topic_len; //為payload分配內(nèi)存
*payload = MQTT_MallocBuffer(*payload_len + 1);
if(*payload == NULL) //如果失敗
{
MQTT_FreeBuffer(*topic); //則需要把topic的內(nèi)存釋放掉
return 255;
}
memset(*payload, 0, *payload_len + 1);
memcpy(*payload, (int8 *)msgPtr + 2 + *topic_len, *payload_len);
break;
case MQTT_QOS_LEVEL1:
case MQTT_QOS_LEVEL2:
if(*topic_len + 2 > remain_len)
return 255;
*pkt_id = (uint16)msgPtr[*topic_len + 2] << 8 | msgPtr[*topic_len + 3];
if(pkt_id == 0)
return 255;
*topic = MQTT_MallocBuffer(*topic_len + 1); //為topic分配內(nèi)存
if(*topic == NULL)
return 255;
memset(*topic, 0, *topic_len + 1);
memcpy(*topic, (int8 *)msgPtr + 2, *topic_len); //復(fù)制數(shù)據(jù)
*payload_len = remain_len - 4 - *topic_len;
*payload = MQTT_MallocBuffer(*payload_len + 1); //為payload分配內(nèi)存
if(*payload == NULL) //如果失敗
{
MQTT_FreeBuffer(*topic); //則需要把topic的內(nèi)存釋放掉
return 255;
}
memset(*payload, 0, *payload_len + 1);
memcpy(*payload, (int8 *)msgPtr + 4 + *topic_len, *payload_len);
break;
default:
return 255;
}
if(strchr((int8 *)topic, '+') || strchr((int8 *)topic, '#'))
return 255;
return 0;
}
//==========================================================
// 函數(shù)名稱: MQTT_PacketPublishAck
//
// 函數(shù)功能: Publish Ack消息組包
//
// 入口參數(shù): pkt_id:packet id
// mqttPacket:包指針
//
// 返回參數(shù): 0-成功 1-失敗原因
//
// 說明: 當(dāng)收到的Publish消息的QoS等級為1時,需要Ack回復(fù)
//==========================================================
uint1 MQTT_PacketPublishAck(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket)
{
MQTT_NewBuffer(mqttPacket, 4);
if(mqttPacket->_data == NULL)
return 1;
/*************************************固定頭部***********************************************/
//固定頭部----------------------頭部消息-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_PUBACK << 4;
//固定頭部----------------------剩余長度-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = 2;
/*************************************可變頭部***********************************************/
//可變頭部----------------------pkt_id長度-----------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = pkt_id >> 8;
mqttPacket->_data[mqttPacket->_len++] = pkt_id & 0xff;
return 0;
}
//==========================================================
// 函數(shù)名稱: MQTT_UnPacketPublishAck
//
// 函數(shù)功能: Publish Ack消息解包
//
// 入口參數(shù): rev_data:收到的數(shù)據(jù)
//
// 返回參數(shù): 0-成功 1-失敗原因
//
// 說明:
//==========================================================
uint1 MQTT_UnPacketPublishAck(uint8 *rev_data)
{
if(rev_data[1] != 2)
return 1;
if(rev_data[2] == MOSQ_MSB(MQTT_PUBLISH_ID) && rev_data[3] == MOSQ_LSB(MQTT_PUBLISH_ID))
return 0;
else
return 1;
}
//==========================================================
// 函數(shù)名稱: MQTT_PacketPublishRec
//
// 函數(shù)功能: Publish Rec消息組包
//
// 入口參數(shù): pkt_id:packet id
// mqttPacket:包指針
//
// 返回參數(shù): 0-成功 1-失敗原因
//
// 說明: 當(dāng)收到的Publish消息的QoS等級為2時,先收到rec
//==========================================================
uint1 MQTT_PacketPublishRec(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket)
{
MQTT_NewBuffer(mqttPacket, 4);
if(mqttPacket->_data == NULL)
return 1;
/*************************************固定頭部***********************************************/
//固定頭部----------------------頭部消息-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_PUBREC << 4;
//固定頭部----------------------剩余長度-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = 2;
/*************************************可變頭部***********************************************/
//可變頭部----------------------pkt_id長度-----------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = pkt_id >> 8;
mqttPacket->_data[mqttPacket->_len++] = pkt_id & 0xff;
return 0;
}
//==========================================================
// 函數(shù)名稱: MQTT_UnPacketPublishRec
//
// 函數(shù)功能: Publish Rec消息解包
//
// 入口參數(shù): rev_data:接收到的數(shù)據(jù)
//
// 返回參數(shù): 0-成功 1-失敗
//
// 說明:
//==========================================================
uint1 MQTT_UnPacketPublishRec(uint8 *rev_data)
{
if(rev_data[1] != 2)
return 1;
if(rev_data[2] == MOSQ_MSB(MQTT_PUBLISH_ID) && rev_data[3] == MOSQ_LSB(MQTT_PUBLISH_ID))
return 0;
else
return 1;
}
//==========================================================
// 函數(shù)名稱: MQTT_PacketPublishRel
//
// 函數(shù)功能: Publish Rel消息組包
//
// 入口參數(shù): pkt_id:packet id
// mqttPacket:包指針
//
// 返回參數(shù): 0-成功 1-失敗原因
//
// 說明: 當(dāng)收到的Publish消息的QoS等級為2時,先收到rec,再回復(fù)rel
//==========================================================
uint1 MQTT_PacketPublishRel(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket)
{
MQTT_NewBuffer(mqttPacket, 4);
if(mqttPacket->_data == NULL)
return 1;
/*************************************固定頭部***********************************************/
//固定頭部----------------------頭部消息-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_PUBREL << 4 | 0x02;
//固定頭部----------------------剩余長度-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = 2;
/*************************************可變頭部***********************************************/
//可變頭部----------------------pkt_id長度-----------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = pkt_id >> 8;
mqttPacket->_data[mqttPacket->_len++] = pkt_id & 0xff;
return 0;
}
//==========================================================
// 函數(shù)名稱: MQTT_UnPacketPublishRel
//
// 函數(shù)功能: Publish Rel消息解包
//
// 入口參數(shù): rev_data:接收到的數(shù)據(jù)
//
// 返回參數(shù): 0-成功 1-失敗
//
// 說明:
//==========================================================
uint1 MQTT_UnPacketPublishRel(uint8 *rev_data, uint16 pkt_id)
{
if(rev_data[1] != 2)
return 1;
if(rev_data[2] == MOSQ_MSB(pkt_id) && rev_data[3] == MOSQ_LSB(pkt_id))
return 0;
else
return 1;
}
//==========================================================
// 函數(shù)名稱: MQTT_PacketPublishComp
//
// 函數(shù)功能: Publish Comp消息組包
//
// 入口參數(shù): pkt_id:packet id
// mqttPacket:包指針
//
// 返回參數(shù): 0-成功 1-失敗原因
//
// 說明: 當(dāng)收到的Publish消息的QoS等級為2時,先收到rec,再回復(fù)rel
//==========================================================
uint1 MQTT_PacketPublishComp(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket)
{
MQTT_NewBuffer(mqttPacket, 4);
if(mqttPacket->_data == NULL)
return 1;
/*************************************固定頭部***********************************************/
//固定頭部----------------------頭部消息-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_PUBCOMP << 4;
//固定頭部----------------------剩余長度-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = 2;
/*************************************可變頭部***********************************************/
//可變頭部----------------------pkt_id長度-----------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = pkt_id >> 8;
mqttPacket->_data[mqttPacket->_len++] = pkt_id & 0xff;
return 0;
}
//==========================================================
// 函數(shù)名稱: MQTT_UnPacketPublishComp
//
// 函數(shù)功能: Publish Comp消息解包
//
// 入口參數(shù): rev_data:接收到的數(shù)據(jù)
//
// 返回參數(shù): 0-成功 1-失敗
//
// 說明:
//==========================================================
uint1 MQTT_UnPacketPublishComp(uint8 *rev_data)
{
if(rev_data[1] != 2)
return 1;
if(rev_data[2] == MOSQ_MSB(MQTT_PUBLISH_ID) && rev_data[3] == MOSQ_LSB(MQTT_PUBLISH_ID))
return 0;
else
return 1;
}
//==========================================================
// 函數(shù)名稱: MQTT_PacketPing
//
// 函數(shù)功能: 心跳請求組包
//
// 入口參數(shù): mqttPacket:包指針
//
// 返回參數(shù): 0-成功 1-失敗
//
// 說明:
//==========================================================
uint1 MQTT_PacketPing(MQTT_PACKET_STRUCTURE *mqttPacket)
{
MQTT_NewBuffer(mqttPacket, 2);
if(mqttPacket->_data == NULL)
return 1;
/*************************************固定頭部***********************************************/
//固定頭部----------------------頭部消息-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_PINGREQ << 4;
//固定頭部----------------------剩余長度-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = 0;
return 0;
}
4.連接測試
4.1.準(zhǔn)備物件
1.stm32F103系列開發(fā)板
2.ESP8266模塊
4.2.連線圖與修改參數(shù)
ESP8266連接引腳
TX -- PA3? ? ? ? VCC -- 3.3V
RX -- PA2? ? ? ?GND -- GND
RST -- PB9
修改OenNET.c的連接參數(shù),把之前保存的參數(shù)填寫上去
使用的熱點名稱為 hhh? 密碼 12345678 (大家自行打開手機(jī)熱點或者修改一下WiFi參數(shù)哦)
WIFi參數(shù)修改在esp8266.c
#define ESP8266_WIFI_INFO?? ??? ?"AT+CWJAP=\"hhh\",\"12345678\"\r\n"#define ESP8266_ONENET_INFO?? ??? ?"AT+CIPSTART=\"TCP\",\"183.230.40.39\",6002\r\n"
4.3.連接成功
顯示平臺設(shè)備在線,只需要在添加另一臺設(shè)備訂閱相同主題即可實現(xiàn)消息的收發(fā)
?到此OneNET連接教程完成??
關(guān)于如何連接阿里云
1.搭建云平臺設(shè)備
阿里云https://www.aliyun.com/?accounttraceid=5df262f1b2db40f2b26ca21fd1f026bdgwje
1.1.平臺選擇?
?
1.2.創(chuàng)造產(chǎn)品
?
?
1.3.創(chuàng)造設(shè)備
?
2.查看設(shè)備參數(shù)(MQTT連接使用)
2.1.設(shè)備連接參數(shù)保存
2.2.主題的發(fā)布訂閱
3.下位機(jī)代碼編寫
3.1.復(fù)用onenet的底層配置
4.連接測試
4.1.準(zhǔn)備物件
1.stm32F103系列開發(fā)板
2.ESP8266模塊
4.2.連線圖與修改參數(shù)
ESP8266連接引腳
TX -- PA3? ? ? ? VCC -- 3.3V
RX -- PA2? ? ? ?GND -- GND
RST -- PB9
修改OenNET.c的連接參數(shù),把之前保存的參數(shù)填寫上去
使用的熱點名稱為 hhh? 密碼 12345678 (大家自行打開手機(jī)熱點或者修改一下WiFi參數(shù)哦)
WIFi參數(shù)修改在esp8266.c
#define ESP8266_WIFI_INFO?? ??? ?"AT+CWJAP=\"hhh\",\"12345678\"\r\n"#define ESP8266_ONENET_INFO?? ???
"AT+CIPSTART=\"TCP\",\"iot-06z00e3auli0497.mqtt.iothub.aliyuncs.com\",1883\r\n"
4.3.連接成功
顯示平臺設(shè)備在線,只需要在添加另一臺設(shè)備訂閱相同主題即可實現(xiàn)消息的收發(fā)
?到此阿里云連接教程完成??
補(bǔ)充信息
?關(guān)于main函數(shù)的使用
//硬件初始化
static void Hardware_Init(void)
{
NVIC_PriorityGroupConfig(NVIC_PriorityGroup_4);//設(shè)置中斷優(yōu)先級分組為組2:2位搶占優(yōu)先級,2位響應(yīng)優(yōu)先級
SysTick_Init(); //延時函數(shù)初始化
USART_Config(); //串口1初始化為115200
Usart2_Init(115200); //串口2,驅(qū)動ESP8266用
// BEEP_Config();
// DHT11_Init();
// ADCx_Init();
}
static void Net_Init()
{
ESP8266_Init(); //初始化ESP8266
Delay_ms(10);
while(OneNet_DevLink(Tips)) //接入OneNET
Delay_ms(100);
OneNet_Subscribe(topics, 1);
}
/**
* @brief 主函數(shù)
* @param 無
* @retval 無
*/
int main(void)
{
unsigned char *dataPtr = NULL;
unsigned short timeCount = 0; //發(fā)送間隔變量
unsigned short timeDoor = 0; //發(fā)送間隔變量
Hardware_Init(); //初始化外圍硬件
Delay_ms(500); /* 延時500個tick */
Net_Init(); //網(wǎng)絡(luò)初始化
while(1)
{
timeDoor ++;
dataPtr = ESP8266_GetIPD(0);
if(dataPtr != NULL)
{
//從收到信息開始算起
timeCount = 0;
OneNet_RevPro(dataPtr);
}else if(timeCount >= 100) //發(fā)送間隔
{
//每兩秒詢問指紋狀態(tài),自動關(guān)門
sprintf((char*)string,"temp:%s humi:%s MQ_2:%s MQ_4:%s MQ_135:%s",temp,humi,MQ_2,MQ_4,MQ_135);
printf("%s\n",string);
OneNet_Subscribe(topics, 1);
//心跳包
while(OneNet_Publish( Tips , string));
timeCount = 0;
}
}
}
關(guān)于如何實現(xiàn)Web端訂閱可以查看這篇文章
WebSocket連接MQTT方案之阿里云及其它平臺通解文章來源:http://www.zghlxwxcb.cn/news/detail-460437.html
?注:轉(zhuǎn)載請標(biāo)明出處!文章來源地址http://www.zghlxwxcb.cn/news/detail-460437.html
到了這里,關(guān)于STM32連接--OneNET,阿里云(MQTT協(xié)議)詳細(xì)教程的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!