一、前言
近年來,物聯(lián)網(wǎng)的發(fā)展如火如荼,已經(jīng)滲透到我們生活的方方面面。從智能家居到工業(yè)自動化,從智慧城市到智慧農(nóng)業(yè),物聯(lián)網(wǎng)正在以前所未有的速度改變著我們的生活。 大家現(xiàn)在可能已經(jīng)習(xí)慣了通過手機(jī)控制家里的燈光、空調(diào)和電視,這就是物聯(lián)網(wǎng)在智能家居領(lǐng)域的應(yīng)用,如果在10年前看到這種設(shè)備的應(yīng)用肯定覺得很牛批,而現(xiàn)在只要是個設(shè)備都能上云,這種家電設(shè)備的遠(yuǎn)程控制已經(jīng)成了大家習(xí)以為常的配置了。而在工業(yè)領(lǐng)域,物聯(lián)網(wǎng)技術(shù)可以幫助企業(yè)實現(xiàn)自動化生產(chǎn)、設(shè)備監(jiān)控和預(yù)防性維護(hù),提高生產(chǎn)效率和產(chǎn)品質(zhì)量。在智慧城市建設(shè)中,物聯(lián)網(wǎng)技術(shù)可以用于交通管理、環(huán)境監(jiān)測和公共安全等方面,提升城市管理和居民生活的質(zhì)量。
從物聯(lián)網(wǎng)開始興起的時候,各大廠家都紛紛推出了自家的IOT物聯(lián)網(wǎng)平臺。 比如: 機(jī)智云、中國移動的onenet、阿里云的IOT、百度的天工物接入、華為云的IOT、騰訊云IOT等等。 這些大廠家的物聯(lián)網(wǎng)服務(wù)器都支持標(biāo)準(zhǔn)的MQTT協(xié)議接入,大家不用自己搭建MQTT服務(wù)器可以直接使用這些現(xiàn)成的服務(wù)器接入設(shè)備開發(fā)是非常的方便的。
我在這幾年也寫了很多物聯(lián)網(wǎng)開發(fā)的案例,不管是、中國移動的onenet、阿里云的IOT、百度的天工物接入、華為云的IOT、騰訊云IOT 這些服務(wù)器都寫了很多教程,演示設(shè)備接入平臺,完成設(shè)備上云,手機(jī)APP對接,電腦程序?qū)?,微信小程序接入,實現(xiàn)遠(yuǎn)程數(shù)據(jù)監(jiān)測控制等等。這些案例都放在了智能家居與物聯(lián)網(wǎng)項目實戰(zhàn)
專欄里。 這些案例里設(shè)備實現(xiàn)上云的方式主要是兩種方式:HTTP協(xié)議、MQTT協(xié)議方式上云。 MQTT協(xié)議是標(biāo)準(zhǔn)的物聯(lián)網(wǎng)協(xié)議,支持雙向數(shù)據(jù)傳輸,也就是可以上傳數(shù)據(jù)到服務(wù)器,也可以接收服務(wù)器下發(fā)的控制命令完成遠(yuǎn)程控制。 我寫的這些案例里硬件端聯(lián)網(wǎng)的模塊主要是用到了4G模塊、ESP8266-WIFI模塊、GSM模塊、NBIOT模塊等等,通過它們聯(lián)網(wǎng),讓單片機(jī)設(shè)備實現(xiàn)上云。
這些設(shè)備中有些是支持MQTT協(xié)議的(也就是本身的固件就支持MQTT協(xié)議),有些不支持的(可能有固件支持,需要自己燒寫)。 如果說固件不支持MQTT協(xié)議,但只要設(shè)備支持TCP協(xié)議,那么我們也可以自己封裝MQTT協(xié)議完成與MQTT服務(wù)器之間的通信。 比如:ESP8266-WIFI模塊,正常的官方默認(rèn)固件中,ESP8266-WIFI是不支持MQTT協(xié)議的,如果我們不燒寫固件的情況下,如何自己實現(xiàn)MQTT協(xié)議上云? 這篇文章就介紹,通過TCP協(xié)議自己封裝MQTT協(xié)議報文,完成數(shù)據(jù)上云。 直接從0開始手?jǐn)]MQTT協(xié)議報文,組合報文,完成與服務(wù)器之間的通信。
MQTT協(xié)議也是分為兩種,分MQTT和MQTTS,就像HTTP協(xié)議一樣也分HTTP和HTTPS,那么區(qū)別呢? 帶S就是要支持SSL協(xié)議,支持認(rèn)證,更加安全,那么復(fù)雜度自然就上來了。 MQTT協(xié)議的端口是1883,MQTTS的端口是8883。 當(dāng)前這篇文章介紹非加密的MQTT協(xié)議,也就是1883端口。MQTTS協(xié)議也手?jǐn)]不了,這玩意涉及到SSL協(xié)議,那就很復(fù)雜了,如果要用,直接使用現(xiàn)成的開源庫就行,但本篇文章不討論這個,后面文章再單獨介紹如何實現(xiàn)MQTTS協(xié)議。
本篇文章的環(huán)境是在windows下,利用VS2022開發(fā)程序,使用windows下網(wǎng)絡(luò)編程接口作為基礎(chǔ),封裝MQTT協(xié)議連接華為云MQTT服務(wù)器,完成數(shù)據(jù)上云。
所以,大家只要有一臺windows電腦,電腦上安裝了VS開發(fā)環(huán)境,任何版本都可以(VS2010、VS2013、VS2015、VS2017、VS2019、VS2022等等都可以的) 跟著這篇文章進(jìn)行學(xué)習(xí),不需要其他任何硬件設(shè)備,我們現(xiàn)在是單純的去學(xué)習(xí)MQTT協(xié)議。
前提呢,大家還是要懂得一點網(wǎng)絡(luò)編程的知識,了解TCP協(xié)議,大致知道TCP協(xié)議通信的簡單過程,如果網(wǎng)絡(luò)編程知識是完全0基礎(chǔ),建議先看另一篇文章學(xué)習(xí)下網(wǎng)絡(luò)編程(我博客有專門講解網(wǎng)絡(luò)編程相關(guān)知識的文章)。 這篇文章也會簡單介紹下TCP協(xié)議和基本網(wǎng)絡(luò)編程知識
那么接下來,我們就開始動手學(xué)習(xí)吧。
二、搭建開發(fā)環(huán)境
如果大家電腦已經(jīng)有開發(fā)環(huán)境,這章節(jié)直接忽略。 這里貼出來為了給 完全0基礎(chǔ) 的小伙伴學(xué)習(xí)
我這里介紹下我用的環(huán)境安裝過程。 所有版本的VS都可以的。
我當(dāng)前環(huán)境是在Windows下,IDE用的是地表最強(qiáng)IDE VS2022。
下載地址:https://visualstudio.microsoft.com/zh-hans/downloads/
因為我這里只需要用到C++和C語言編程,那么安裝的時候可以自己選擇需要安裝的包。
安裝好之后,創(chuàng)建項目。
三、網(wǎng)絡(luò)編程基礎(chǔ)概念科普
如果是老手了,這章節(jié)可以直接忽略。 如果對網(wǎng)絡(luò)編程是 0基礎(chǔ) 的小伙伴,那么就認(rèn)真看一下,了解下基本知識。
3.1 什么是網(wǎng)絡(luò)編程
網(wǎng)絡(luò)編程是通過使用IP地址和端口號等網(wǎng)絡(luò)信息,使兩臺以上的計算機(jī)能夠相互通信,按照規(guī)定的協(xié)議交換數(shù)據(jù)的編程方式。
在網(wǎng)絡(luò)編程中,程序員使用各種協(xié)議和技術(shù),使得不同的設(shè)備可以通過網(wǎng)絡(luò)進(jìn)行數(shù)據(jù)交換和信息共享。
要實現(xiàn)網(wǎng)絡(luò)編程,程序員需要了解并掌握各種網(wǎng)絡(luò)通信協(xié)議,比如TCP/IP協(xié)議族,包括TCP、UDP、IP等,這些協(xié)議是實現(xiàn)設(shè)備間通信的基礎(chǔ)。網(wǎng)絡(luò)編程內(nèi)部涉及到數(shù)據(jù)的打包、組裝、發(fā)送、接收、解析等一系列過程,以實現(xiàn)信息的正確傳輸。
在TCP/IP協(xié)議族中,TCP和UDP是位于IP協(xié)議之上的傳輸層協(xié)議。 在OSI模型中,傳輸層是第四層,負(fù)責(zé)總體數(shù)據(jù)傳輸和數(shù)據(jù)控制,為會話層等高三層提供可靠的傳輸服務(wù),為網(wǎng)絡(luò)層提供可靠的目的地點信息。在TCP/IP協(xié)議族中,TCP和UDP正是位于這一層的協(xié)議。
這篇文章主要介紹 TCP 和 UDP 協(xié)議 以及 使用方法。
3.2 TCP 和 UDP協(xié)議介紹
TCP協(xié)議:
TCP(傳輸控制協(xié)議)是一種面向連接的、可靠的傳輸層協(xié)議。在傳輸數(shù)據(jù)之前需要先建立連接,確保數(shù)據(jù)的順序和完整性。TCP通過三次握手建立連接,并通過確認(rèn)、超時和重傳機(jī)制確保數(shù)據(jù)的可靠傳輸。TCP采用流量控制和擁塞控制機(jī)制,以避免網(wǎng)絡(luò)擁塞,確保數(shù)據(jù)的順利傳輸。因為TCP的這些特性,通常被應(yīng)用于需要高可靠性和順序性的應(yīng)用,如網(wǎng)頁瀏覽、電子郵件等。
UDP協(xié)議:
UDP(用戶數(shù)據(jù)報協(xié)議)是一種無連接的、不可靠的傳輸層協(xié)議。與TCP不同,UDP在傳輸數(shù)據(jù)之前不需要建立連接,直接將數(shù)據(jù)打包成數(shù)據(jù)報并發(fā)送出去。因此,UDP沒有TCP的那些確認(rèn)、超時和重傳機(jī)制,也就不保證數(shù)據(jù)的可靠傳輸。UDP也沒有TCP的流量控制和擁塞控制機(jī)制。因為UDP的簡單性和高效性,通常被應(yīng)用于實時性要求較高,但對數(shù)據(jù)可靠性要求不高的應(yīng)用,如語音通話、視頻直播等。
3.3 TCP通信的實現(xiàn)過程
要實現(xiàn)TCP通信,兩端必須要知道對方的IP和端口號:
(1)IP地址:TCP協(xié)議是基于IP協(xié)議進(jìn)行通信的,因此需要知道對方的IP地址,才能建立連接。
(2)端口號:每個TCP連接都有一個唯一的端口號,用于標(biāo)識進(jìn)程和應(yīng)用程序。建立連接時,需要指定本地端口號和遠(yuǎn)端端口號。
(3)應(yīng)用層協(xié)議:TCP協(xié)議只提供數(shù)據(jù)傳輸服務(wù),應(yīng)用程序需要定義自己的應(yīng)用層協(xié)議,用于解析報文和處理數(shù)據(jù)。例如,HTTP協(xié)議就是基于TCP協(xié)議的應(yīng)用層協(xié)議。
在正常的TCP通信過程中,第一步需要建立連接,這個過程稱為“三次握手”。建立連接時,客戶端向服務(wù)器發(fā)送一個SYN包,表示請求建立連接;服務(wù)器接收到SYN包后,向客戶端發(fā)送一個ACK包,表示確認(rèn)收到了SYN包;最后客戶端再向服務(wù)器發(fā)送一個ACK包,表示確認(rèn)收到了服務(wù)器的ACK包,此時連接建立成功。建立連接后,數(shù)據(jù)傳輸就可以開始了。
四、Windows下的網(wǎng)絡(luò)編程相關(guān)API介紹
因為當(dāng)前文章是在Windows下介紹MQTT協(xié)議,要用到網(wǎng)絡(luò)編程的知識,需要使用Windows系統(tǒng)提供的API完成網(wǎng)絡(luò)編程。Windows本身就有一套原生的網(wǎng)絡(luò)編程接口可以直接使用。 在Linux系統(tǒng)下也是一樣,都有自己一套原生的網(wǎng)絡(luò)編程接口。
如果沒有接觸這些API的小伙伴不用慌~~~。 你至少用過C語言里的printf、scanf、strlen之類的函數(shù)吧? 下面介紹的這些網(wǎng)絡(luò)編程API函數(shù)其實和它們沒什么區(qū)別,都是普通的函數(shù),功能不一樣而已。 對你來說,只是多學(xué)了幾個庫函數(shù),只要了解每個函數(shù)的功能就可以調(diào)用了。
那么接下來就學(xué)習(xí)一下常用的網(wǎng)絡(luò)編程相關(guān)的函數(shù)。
微軟的官方文檔地址:https://learn.microsoft.com/zh-cn/windows/win32/api/_winsock/
4.1 常用的函數(shù)介紹
在Windows下進(jìn)行網(wǎng)絡(luò)編程,可以使用Winsock API(Windows Sockets API)來實現(xiàn)。Winsock API是Windows平臺上的標(biāo)準(zhǔn)網(wǎng)絡(luò)編程接口,提供了一系列函數(shù)和數(shù)據(jù)結(jié)構(gòu),用于創(chuàng)建、連接、發(fā)送和接收網(wǎng)絡(luò)數(shù)據(jù)等操作。
下面是常用的Winsock API接口函數(shù):
(1)WSAStartup
:初始化Winsock庫,必須在使用其他Winsock函數(shù)之前調(diào)用。
(2)socket
:創(chuàng)建一個套接字,用于網(wǎng)絡(luò)通信。
(3)bind
:將套接字與本地地址(IP地址和端口號)綁定。
(4)listen
:開始監(jiān)聽連接請求,將套接字設(shè)置為被動模式。
(5)accept
:接受客戶端的連接請求,創(chuàng)建一個新的套接字用于與客戶端通信。
(6)connect
:與遠(yuǎn)程服務(wù)器建立連接。
(7)send
:發(fā)送數(shù)據(jù)到已連接的套接字。
(8)recv
:從已連接的套接字接收數(shù)據(jù)。
(9)sendto
:發(fā)送數(shù)據(jù)到指定的目標(biāo)地址。
(10)recvfrom
:從指定的地址接收數(shù)據(jù)。
(11)closesocket
:關(guān)閉套接字。
(12)getaddrinfo
:根據(jù)主機(jī)名和服務(wù)名獲取地址信息。
(13)gethostbyname
:根據(jù)主機(jī)名獲取主機(jī)的IP地址。
(14)gethostname
:獲取本地主機(jī)名。
4.2 函數(shù)參數(shù)介紹
下面是常用的幾個Winsock API函數(shù)及其函數(shù)原型和參數(shù)含義的介紹:
(1)WSAStartup
:
int WSAStartup(WORD wVersionRequested, LPWSADATA lpWSAData);
-
wVersionRequested
:請求的Winsock版本號。 -
lpWSAData
:指向WSADATA結(jié)構(gòu)的指針,用于接收初始化結(jié)果和相關(guān)信息。
(2)socket
:
SOCKET socket(int af, int type, int protocol);
-
af
:地址族(Address Family),如AF_INET表示IPv4。 -
type
:套接字類型,如SOCK_STREAM表示面向連接的TCP套接字。 -
protocol
:指定協(xié)議。通常為0,表示根據(jù)type
自動選擇合適的協(xié)議。
(3)bind
:
int bind(SOCKET s, const struct sockaddr* name, int namelen);
-
s
:要綁定的套接字。 -
name
:指向sockaddr結(jié)構(gòu)的指針,包含要綁定的本地地址信息。 -
namelen
:name
結(jié)構(gòu)的長度。
(4)listen
:
int listen(SOCKET s, int backlog);
-
s
:要監(jiān)聽的套接字。 -
backlog
:等待連接隊列的最大長度。
(5)accept
:
SOCKET accept(SOCKET s, struct sockaddr* addr, int* addrlen);
-
s
:監(jiān)聽套接字。 -
addr
:用于存儲客戶端地址信息的sockaddr結(jié)構(gòu)。 -
addrlen
:addr
結(jié)構(gòu)的長度。
(6)connect
:
int connect(SOCKET s, const struct sockaddr* name, int namelen);
-
s
:要連接的套接字。 -
name
:指向目標(biāo)地址信息的sockaddr結(jié)構(gòu)指針。 -
namelen
:name
結(jié)構(gòu)的長度。
(7)send
:
int send(SOCKET s, const char* buf, int len, int flags);
-
s
:要發(fā)送數(shù)據(jù)的套接字。 -
buf
:要發(fā)送的數(shù)據(jù)緩沖區(qū)。 -
len
:要發(fā)送的數(shù)據(jù)長度。 -
flags
:額外選項,如MSG_DONTROUTE等。
(8)recv
:
int recv(SOCKET s, char* buf, int len, int flags);
-
s
:要接收數(shù)據(jù)的套接字。 -
buf
:用于存儲接收數(shù)據(jù)的緩沖區(qū)。 -
len
:要接收的數(shù)據(jù)長度。 -
flags
:額外選項。
(9)sendto
:
int sendto(SOCKET s, const char* buf, int len, int flags, const struct sockaddr* to, int tolen);
-
s
:要發(fā)送數(shù)據(jù)的套接字。 -
buf
:要發(fā)送的數(shù)據(jù)緩沖區(qū)。 -
len
:要發(fā)送的數(shù)據(jù)長度。 -
flags
:額外選項。 -
to
:指向目標(biāo)地址信息的sockaddr結(jié)構(gòu)指針。 -
tolen
:to
結(jié)構(gòu)的長度。
(10)recvfrom
:
int recvfrom(SOCKET s, char* buf, int len, int flags, struct sockaddr* from, int* fromlen);
-
s
:要接收數(shù)據(jù)的套接字。 -
buf
:用于存儲接收數(shù)據(jù)的緩沖區(qū)。 -
len
:要接收的數(shù)據(jù)長度。 -
flags
:額外選項。 -
from
:用于存儲發(fā)送方地址信息的sockaddr結(jié)構(gòu)指針。 -
fromlen
:from
結(jié)構(gòu)的長度。
(11)closesocket
:
int closesocket(SOCKET s);
-
s
:要關(guān)閉的套接字。
(12)getaddrinfo
:
int getaddrinfo(const char* nodename, const char* servname, const struct addrinfo* hints, struct addrinfo** res);
-
nodename
:目標(biāo)主機(jī)名或IP地址。 -
servname
:服務(wù)名或端口號。 -
hints
:指向addrinfo結(jié)構(gòu)的指針,提供關(guān)于地址查找的提示。 -
res
:指向addrinfo結(jié)構(gòu)鏈表的指針,用于接收查找結(jié)果。
(13)gethostbyname
:
struct hostent* gethostbyname(const char* name);
-
name
:要查詢的主機(jī)名。
(14)gethostname
:
int gethostname(char* name, int namelen);
-
name
:用于接收主機(jī)名的緩沖區(qū)。 -
namelen
:name
緩沖區(qū)的長度。
4.3 編寫代碼體驗網(wǎng)絡(luò)編程
上面了解了這些函數(shù),可能不知道如何使用。 這里就寫一個例子,以TCP客戶端的身份去連接TCP服務(wù)器,完成數(shù)據(jù)傳輸。
**下面代碼實現(xiàn)一個TCP客戶端,連接到指定的服務(wù)器并完成通信。 ** 可以直接將代碼貼到你的工程里,運行,體驗效果。
#include <iostream>
#include <winsock2.h>
#include <ws2tcpip.h>
#pragma comment(lib, "ws2_32.lib") //告訴編譯器鏈接Winsock庫
int main()
{
WSADATA wsaData; //創(chuàng)建一個結(jié)構(gòu)體變量,用于存儲關(guān)于Winsock庫的信息
int result = WSAStartup(MAKEWORD(2, 2), &wsaData); //初始化Winsock庫,指定版本號2.2,檢查返回值
if (result != 0)
{
std::cout << "WSAStartup failed: " << result << std::endl; //輸出錯誤信息并退出程序
return 1;
}
SOCKET connectSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); //創(chuàng)建一個TCP套接字,檢查返回值
if (connectSocket == INVALID_SOCKET)
{
std::cout << "socket failed with error: " << WSAGetLastError() << std::endl; //輸出錯誤信息并退出程序
WSACleanup(); //清除Winsock庫
return 1;
}
sockaddr_in service; //創(chuàng)建一個結(jié)構(gòu)體變量,用于存儲服務(wù)器地址信息
service.sin_family = AF_INET; //指定地址族為IPv4
inet_pton(AF_INET, "127.0.0.1", &service.sin_addr); //將字符串類型的IP地址轉(zhuǎn)換為二進(jìn)制網(wǎng)絡(luò)字節(jié)序的IP地址,并存儲在結(jié)構(gòu)體中
service.sin_port = htons(12345); //將端口號從主機(jī)字節(jié)序轉(zhuǎn)換為網(wǎng)絡(luò)字節(jié)序,并存儲在結(jié)構(gòu)體中
result = connect(connectSocket, (SOCKADDR*)&service, sizeof(service)); //連接到服務(wù)器,檢查返回值
if (result == SOCKET_ERROR)
{
std::cout << "connect failed with error: " << WSAGetLastError() << std::endl; //輸出錯誤信息并退出程序
closesocket(connectSocket); //關(guān)閉套接字
WSACleanup(); //清除Winsock庫
return 1;
}
std::cout << "Connected to server." << std::endl; //連接成功,輸出消息
char sendBuffer[1024] = "Hello, server!"; //創(chuàng)建發(fā)送緩沖區(qū),存儲待發(fā)送的數(shù)據(jù)
result = send(connectSocket, sendBuffer, sizeof(sendBuffer), 0); //向服務(wù)器發(fā)送數(shù)據(jù),檢查返回值
if (result == SOCKET_ERROR)
{
std::cout << "send failed with error: " << WSAGetLastError() << std::endl; //輸出錯誤信息并退出程序
closesocket(connectSocket); //關(guān)閉套接字
WSACleanup(); //清除Winsock庫
return 1;
}
char recvBuffer[1024]; //創(chuàng)建接收緩沖區(qū),用于存儲從服務(wù)器接收到的數(shù)據(jù)
result = recv(connectSocket, recvBuffer, sizeof(recvBuffer), 0); //從服務(wù)器接收數(shù)據(jù),檢查返回值
if (result == SOCKET_ERROR)
{
std::cout << "recv failed with error: " << WSAGetLastError() << std::endl; //輸出錯誤信息并退出程序
closesocket(connectSocket); //關(guān)閉套接字
WSACleanup(); //清除Winsock庫
return 1;
}
std::cout << "Received message from server: " << recvBuffer << std::endl; //輸出從服務(wù)器收到的數(shù)據(jù)
closesocket(connectSocket); //關(guān)閉套接字
WSACleanup(); //清除Winsock庫
return 0;
}
運行效果:
五、訪問華為云IOT服務(wù)器創(chuàng)建一個產(chǎn)品和設(shè)備
5.2 開通物聯(lián)網(wǎng)服務(wù)
地址: https://www.huaweicloud.com/product/iothub.html
現(xiàn)在可以免費創(chuàng)建的,按需付費。 只要是不超過規(guī)格,就可以免費使用,對于個人項目Demo來說,完全夠用的
點擊立即創(chuàng)建之后,會開始創(chuàng)建實例,需要等待片刻,再刷新瀏覽器就可以看到創(chuàng)建成功了。
創(chuàng)建完成,點擊實例,即可進(jìn)入實例詳情頁面。
進(jìn)入實例詳情頁面后,可以看到接入信息的描述,我們當(dāng)前設(shè)備準(zhǔn)備采用MQTT協(xié)議接入華為云平臺,這里可以看到MQTT協(xié)議的地址和端口號等信息。
總結(jié):
端口號: MQTT (1883)
接入地址: 252d4bd608.st1.iotda-device.cn-north-4.myhuaweicloud.com
**根據(jù)域名地址得到IP地址信息: ** 打開windows的CMD窗口
Microsoft Windows [版本 10.0.19045.3693]
(c) Microsoft Corporation。保留所有權(quán)利。
C:\Users\11266>ping 252d4bd608.st1.iotda-device.cn-north-4.myhuaweicloud.com
正在 Ping 252d4bd608.st1.iotda-device.cn-north-4.myhuaweicloud.com [117.78.5.125] 具有 32 字節(jié)的數(shù)據(jù):
來自 117.78.5.125 的回復(fù): 字節(jié)=32 時間=41ms TTL=94
來自 117.78.5.125 的回復(fù): 字節(jié)=32 時間=38ms TTL=94
來自 117.78.5.125 的回復(fù): 字節(jié)=32 時間=37ms TTL=94
來自 117.78.5.125 的回復(fù): 字節(jié)=32 時間=39ms TTL=94
117.78.5.125 的 Ping 統(tǒng)計信息:
數(shù)據(jù)包: 已發(fā)送 = 4,已接收 = 4,丟失 = 0 (0% 丟失),
往返行程的估計時間(以毫秒為單位):
最短 = 37ms,最長 = 41ms,平均 = 38ms
C:\Users\11266>
MQTT協(xié)議接入端口號有兩個,1883是非加密端口,8883是證書加密端口,單片機(jī)無法加載證書,所以使用1883端口比較合適。 接下來的ESP8266就采用1883端口連接華為云物聯(lián)網(wǎng)平臺。
5.3 創(chuàng)建產(chǎn)品
(1)創(chuàng)建產(chǎn)品
點擊產(chǎn)品頁,再點擊左上角創(chuàng)建產(chǎn)品。
(2)填寫產(chǎn)品信息
根據(jù)自己產(chǎn)品名字填寫,下面的設(shè)備類型選擇自定義類型。
(3)產(chǎn)品創(chuàng)建成功
創(chuàng)建成功之后,點擊產(chǎn)品的名字就可以進(jìn)入到產(chǎn)品的詳情頁。
(4)添加自定義模型
產(chǎn)品創(chuàng)建完成之后,點擊進(jìn)入產(chǎn)品詳情頁面,翻到最下面可以看到模型定義。
模型簡單來說: 就是存放設(shè)備上傳到云平臺的數(shù)據(jù)。
先點擊自定義模型。
再創(chuàng)建一個服務(wù)ID。
接著點擊新增屬性。
這里就創(chuàng)建一個溫度的屬性。我們這個設(shè)備用來測溫的。
3.4 添加設(shè)備
產(chǎn)品是屬于上層的抽象模型,接下來在產(chǎn)品模型下添加實際的設(shè)備。添加的設(shè)備最終需要與真實的設(shè)備關(guān)聯(lián)在一起,完成數(shù)據(jù)交互。
(1)注冊設(shè)備
(2)根據(jù)自己的設(shè)備填寫
(3)保存設(shè)備信息
創(chuàng)建完畢之后,點擊保存并關(guān)閉,得到創(chuàng)建的設(shè)備密匙信息。該信息在后續(xù)生成MQTT三元組的時候需要使用。
{
"device_id": "65697df3585c81787ad4da82_stm32",
"secret": "12345678"
}
點擊設(shè)備名稱可以進(jìn)入到設(shè)備詳情頁。
3.5 MQTT協(xié)議主題訂閱與發(fā)布
(1)華為云平臺MQTT協(xié)議使用限制
描述 | 限制 |
---|---|
支持的MQTT協(xié)議版本 | 3.1.1 |
與標(biāo)準(zhǔn)MQTT協(xié)議的區(qū)別 | 支持Qos 0和Qos 1支持Topic自定義不支持QoS2不支持will、retain msg |
MQTTS支持的安全等級 | 采用TCP通道基礎(chǔ) + TLS協(xié)議(最高TLSv1.3版本) |
單帳號每秒最大MQTT連接請求數(shù) | 無限制 |
單個設(shè)備每分鐘支持的最大MQTT連接數(shù) | 1 |
單個MQTT連接每秒的吞吐量,即帶寬,包含直連設(shè)備和網(wǎng)關(guān) | 3KB/s |
MQTT單個發(fā)布消息最大長度,超過此大小的發(fā)布請求將被直接拒絕 | 1MB |
MQTT連接心跳時間建議值 | 心跳時間限定為30至1200秒,推薦設(shè)置為120秒 |
產(chǎn)品是否支持自定義Topic | 支持 |
消息發(fā)布與訂閱 | 設(shè)備只能對自己的Topic進(jìn)行消息發(fā)布與訂閱 |
每個訂閱請求的最大訂閱數(shù) | 無限制 |
(2)主題訂閱格式
幫助文檔地址:https://support.huaweicloud.com/devg-iothub/iot_02_2200.html
對于設(shè)備而言,一般會訂閱平臺下發(fā)消息給設(shè)備 這個主題。
設(shè)備想接收平臺下發(fā)的消息,就需要訂閱平臺下發(fā)消息給設(shè)備 的主題,訂閱后,平臺下發(fā)消息給設(shè)備,設(shè)備就會收到消息。
如果設(shè)備想要知道平臺下發(fā)的消息,需要訂閱上面圖片里標(biāo)注的主題。
以當(dāng)前設(shè)備為例,最終訂閱主題的格式如下:
$oc/devices/{device_id}/sys/messages/down
最終的格式:
$oc/devices/65697df3585c81787ad4da82_stm32/sys/messages/down
?
(3)主題發(fā)布格式
對于設(shè)備來說,主題發(fā)布表示向云平臺上傳數(shù)據(jù),將最新的傳感器數(shù)據(jù),設(shè)備狀態(tài)上傳到云平臺。
這個操作稱為:屬性上報。
幫助文檔地址:https://support.huaweicloud.com/usermanual-iothub/iot_06_v5_3010.html
根據(jù)幫助文檔的介紹, 當(dāng)前設(shè)備發(fā)布主題,上報屬性的格式總結(jié)如下:
發(fā)布的主題格式:
$oc/devices/{device_id}/sys/properties/report
最終的格式:
$oc/devices/65697df3585c81787ad4da82_stm32/sys/properties/report
發(fā)布主題時,需要上傳數(shù)據(jù),這個數(shù)據(jù)格式是JSON格式。
上傳的JSON數(shù)據(jù)格式如下:
{
"services": [
{
"service_id": <填服務(wù)ID>,
"properties": {
"<填屬性名稱1>": <填屬性值>,
"<填屬性名稱2>": <填屬性值>,
..........
}
}
]
}
根據(jù)JSON格式,一次可以上傳多個屬性字段。 這個JSON格式里的,服務(wù)ID,屬性字段名稱,屬性值類型,在前面創(chuàng)建產(chǎn)品的時候就已經(jīng)介紹了,不記得可以翻到前面去查看。
根據(jù)這個格式,組合一次上傳的屬性數(shù)據(jù):
{"services": [{"service_id": "stm32","properties":{"TEMP":36.2}}]}
3.6 MQTT三元組
MQTT協(xié)議登錄需要填用戶ID,設(shè)備ID,設(shè)備密碼等信息,就像我們平時登錄QQ,微信一樣要輸入賬號密碼才能登錄。MQTT協(xié)議登錄的這3個參數(shù),一般稱為MQTT三元組。
接下來介紹,華為云平臺的MQTT三元組參數(shù)如何得到。
(1)MQTT服務(wù)器地址
要登錄MQTT服務(wù)器,首先記得先知道服務(wù)器的地址是多少,端口是多少。
幫助文檔地址:https://console.huaweicloud.com/iotdm/?region=cn-north-4#/dm-portal/home
MQTT協(xié)議的端口支持1883和8883,它們的區(qū)別是:8883 是加密端口更加安全。但是單片機(jī)上使用比較困難,所以當(dāng)前的設(shè)備是采用1883端口進(jìn)連接的。
根據(jù)上面的域名和端口號,得到下面的IP地址和端口號信息: 如果設(shè)備支持填寫域名可以直接填域名,不支持就直接填寫IP地址。 (IP地址就是域名解析得到的)
華為云的MQTT服務(wù)器地址:117.78.5.125
華為云的MQTT端口號:1883
(2)生成MQTT三元組
華為云提供了一個在線工具,用來生成MQTT鑒權(quán)三元組: https://iot-tool.obs-website.cn-north-4.myhuaweicloud.com/
打開這個工具,填入設(shè)備的信息(也就是剛才創(chuàng)建完設(shè)備之后保存的信息),點擊生成,就可以得到MQTT的登錄信息了。
下面是打開的頁面:
填入設(shè)備的信息: (上面兩行就是設(shè)備創(chuàng)建完成之后保存得到的)
直接得到三元組信息。
得到三元組之后,設(shè)備端通過MQTT協(xié)議登錄鑒權(quán)的時候,填入?yún)?shù)即可。
ClientId 65697df3585c81787ad4da82_stm32_0_0_2023120106
Username 65697df3585c81787ad4da82_stm32
Password 12cc9b1f637da8d755fa2cbd007bb669e6f292e3e63017538b5e6e13eef0cf58
到此,云平臺的部署已經(jīng)完成,設(shè)備已經(jīng)可以正常上傳數(shù)據(jù)了。
(3)MQTT登錄測試參數(shù)總結(jié)
IP地址:117.78.5.125
端口號:1883
ClientId 65697df3585c81787ad4da82_stm32_0_0_2023120106
Username 65697df3585c81787ad4da82_stm32
Password 12cc9b1f637da8d755fa2cbd007bb669e6f292e3e63017538b5e6e13eef0cf58
訂閱主題:$oc/devices/65697df3585c81787ad4da82_stm32/sys/messages/down
發(fā)布主題:$oc/devices/65697df3585c81787ad4da82_stm32/sys/properties/report
發(fā)布數(shù)據(jù):{"services": [{"service_id": "stm32","properties":{"TEMP":36.2}}]}
六、開始學(xué)習(xí)MQTT協(xié)議
6.1 先了解下MQTT協(xié)議
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協(xié)議),是一種基于發(fā)布/訂閱模式的“輕量級”的消息協(xié)議,可在發(fā)布者和訂閱者之間傳遞消息。MQTT協(xié)議構(gòu)建于TCP/IP協(xié)議上,由IBM在1999年發(fā)布,當(dāng)前已經(jīng)成為了一種主流的物聯(lián)網(wǎng)通信協(xié)議。
MQTT最大的優(yōu)點在于,能夠以極少的代碼和有限的帶寬,為連接遠(yuǎn)程設(shè)備提供實時可靠的消息服務(wù)。它是一種低開銷、低帶寬占用的即時通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設(shè)備、移動應(yīng)用等方面有較廣泛的應(yīng)用。由于其小巧、高效和可靠的特點,MQTT在物聯(lián)網(wǎng)領(lǐng)域得到了廣泛的應(yīng)用。在很多情況下,包括受限的環(huán)境中,如:機(jī)器與機(jī)器(M2M)通信和物聯(lián)網(wǎng)(IoT),且已經(jīng)廣泛應(yīng)用于通過衛(wèi)星鏈路通信傳感器、偶爾撥號的醫(yī)療設(shè)備、智能家居、及一些小型化設(shè)備中。
MQTT協(xié)議的工作原理是基于發(fā)布/訂閱模式。在這種模式下,發(fā)布者可以向一個或多個主題發(fā)布消息,而訂閱者可以訂閱這些主題以接收相關(guān)消息。這種模式允許多個發(fā)布者和訂閱者同時存在,實現(xiàn)了一種靈活的消息傳遞機(jī)制。此外,MQTT協(xié)議還支持三種消息傳遞質(zhì)量等級,可根據(jù)需要進(jìn)行選擇。
MQTT協(xié)議的另一個重要特點是其輕量級和簡單的設(shè)計。它的消息頭非常小,只有2個字節(jié),這意味著在網(wǎng)絡(luò)帶寬有限的環(huán)境下也能夠?qū)崿F(xiàn)高效的消息傳遞。此外,MQTT協(xié)議還支持持久化連接和消息隊列等高級功能,可進(jìn)一步提高消息的可靠性和傳遞效率。
MQTT協(xié)議的應(yīng)用范圍非常廣泛。例如,在智能家居領(lǐng)域,可以使用MQTT協(xié)議將各種智能設(shè)備連接在一起,實現(xiàn)設(shè)備的遠(yuǎn)程控制和監(jiān)測。在工業(yè)領(lǐng)域,MQTT協(xié)議可以用于實現(xiàn)設(shè)備的遠(yuǎn)程監(jiān)控和維護(hù),提高生產(chǎn)效率和產(chǎn)品質(zhì)量。在智慧城市建設(shè)中,MQTT協(xié)議可以用于交通管理、環(huán)境監(jiān)測和公共安全等方面,提升城市管理和居民生活的質(zhì)量。
6.2 MQTT協(xié)議官網(wǎng)介紹
目前MQTT協(xié)議主要是3.1.1 和 5.0 兩個版本。 本篇文章是介紹3.1.1版本的MQTT協(xié)議。 各大標(biāo)準(zhǔn)的MQTT服務(wù)器都支持3.1.1.
鏈接:https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
在文檔的下方就是介紹MQTT協(xié)議的每個包如何封裝的。照著協(xié)議寫代碼就行了。
6.3 需要實現(xiàn)的3個函數(shù)
整個MQTT協(xié)議里,主要是實現(xiàn)3個函數(shù)就行了(其他的接口看自己需求)。
下面列出的3個函數(shù),在一般的MQTT通信里是必備的。我們只要實現(xiàn)了這3個函數(shù),那么完成基本的MQTT通信就沒有問題了。
//發(fā)布主題
unsigned char MQTT_PublishData(char* topic, char* message, unsigned char qos);
//訂閱或者取消訂閱主題
unsigned char MQTT_SubscribeTopic(char* topic, unsigned char qos, unsigned char whether);
//登錄MQTT服務(wù)器
unsigned char MQTT_Connect(char* ClientID, char* Username, char* Password);
6.4 查看協(xié)議文檔,了解如何組合協(xié)議報文
【1】打開文檔,打開目錄,翻到第3章節(jié)—MQTT控制報文。
【2】在第3章,控制報文里,找到對應(yīng)的子章節(jié),也就是我們接線需要照著文檔實現(xiàn)協(xié)議組包的章節(jié)。
6.5 實現(xiàn)MQTT_Connect函數(shù)
先認(rèn)真閱讀文檔: 了解這個報文的規(guī)則,以及出現(xiàn)錯誤之后錯誤代碼的含義。
客戶端到服務(wù)端的網(wǎng)絡(luò)連接建立后,客戶端發(fā)送給服務(wù)端的第一個報文必須是 CONNECT
報文。 在一個網(wǎng)絡(luò)連接上,客戶端只能發(fā)送一次CONNECT
報文。服務(wù)端必須將客戶端發(fā)送的第二個 CONNECT
報文當(dāng)作協(xié)議違規(guī)處理并斷開客戶端的連接。
接下來就按順序查看文檔,了解協(xié)議報文里每個字節(jié)如何組成的。
接下來就開始編寫代碼,按照文檔的提示,組合報文。
首先,定義一個數(shù)組,用來存放我們按照MQTT協(xié)議封裝的數(shù)據(jù)。
unsigned char mqtt_txbuf[256];//接收數(shù)據(jù)緩存區(qū)
在定義一個變量,用來保存數(shù)組的下標(biāo),每賦值一次,就自增++;
int mqtt_txlen = 0;
【1】固定報文頭
文檔說了,數(shù)組的第一字節(jié)固定為:0x10。 (不懂為什么是0X10,仔細(xì)看下面文檔里的紅色框框,文檔已經(jīng)把二進(jìn)制位的每個位都標(biāo)注出來了,如果還是看不懂,就需要補(bǔ)習(xí)一下C語言的位運算,熟悉位運算之后,再來看應(yīng)該就很容易了)。
那么第一行代碼就是這么寫:
//固定報頭
//控制報文類型
mqtt_txbuf[mqtt_txlen++] = 0x10; //MQTT Message Type CONNECT
接下來第2個字節(jié),文檔讓看2.2.3小節(jié)的說明,那么翻到2.2.3小節(jié),了解如何填寫剩余長度值。
根據(jù)文檔說明,那么編寫代碼如下:
//剩余長度(不包括固定頭部)
do
{
unsigned char encodedByte = DataLen % 128;
DataLen = DataLen / 128;
// if there are more data to encode, set the top bit of this byte
if (DataLen > 0)
encodedByte = encodedByte | 128;
mqtt_txbuf[mqtt_txlen++] = encodedByte;
} while (DataLen > 0);
好了,現(xiàn)在第2個字節(jié)賦值已經(jīng)完畢。
【2】協(xié)議名
那么接著看剩下的字節(jié)如何填寫:
根據(jù)文檔說明,編寫代碼如下: (繼續(xù)按順序賦值就行了)
//可變報頭
//協(xié)議名
mqtt_txbuf[mqtt_txlen++] = 0; // Protocol Name Length MSB
mqtt_txbuf[mqtt_txlen++] = 4; // Protocol Name Length LSB
mqtt_txbuf[mqtt_txlen++] = 'M'; // ASCII Code for M
mqtt_txbuf[mqtt_txlen++] = 'Q'; // ASCII Code for Q
mqtt_txbuf[mqtt_txlen++] = 'T'; // ASCII Code for T
mqtt_txbuf[mqtt_txlen++] = 'T'; // ASCII Code for T
【3】協(xié)議級別
看文檔說明。
編寫代碼:
//協(xié)議級別
mqtt_txbuf[mqtt_txlen++] = 4; // MQTT Protocol version = 4
【4】連接標(biāo)志
關(guān)于每個標(biāo)志的含義,文檔向下翻,下面有詳細(xì)的介紹,每個標(biāo)志位的含義。
編寫代碼:
//連接標(biāo)志
mqtt_txbuf[mqtt_txlen++] = 0xc2; // conn flags
【5】保持連接的時間
查看文檔說明:
編寫代碼:
mqtt_txbuf[mqtt_txlen++] = 0; // Keep-alive Time Length MSB
mqtt_txbuf[mqtt_txlen++] = 100; // Keep-alive Time Length LSB 100S心跳包
【6】 可變報頭非規(guī)范示例
【7】最后部分:填寫客戶端ID、用戶名、密碼。
這里面提到的客戶端標(biāo)識符、用戶名、密碼。 就是在前面章節(jié)創(chuàng)建華為云IOT服務(wù)器,得到的MQTT三元組信息。
查看文檔:
編寫代碼:
mqtt_txbuf[mqtt_txlen++] = BYTE1(ClientIDLen);// Client ID length MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(ClientIDLen);// Client ID length LSB
memcpy(&mqtt_txbuf[mqtt_txlen], ClientID, ClientIDLen);
mqtt_txlen += ClientIDLen;
if (UsernameLen > 0)
{
mqtt_txbuf[mqtt_txlen++] = BYTE1(UsernameLen); //username length MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(UsernameLen); //username length LSB
memcpy(&mqtt_txbuf[mqtt_txlen], Username, UsernameLen);
mqtt_txlen += UsernameLen;
}
if (PasswordLen > 0)
{
mqtt_txbuf[mqtt_txlen++] = BYTE1(PasswordLen); //password length MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(PasswordLen); //password length LSB
memcpy(&mqtt_txbuf[mqtt_txlen], Password, PasswordLen);
mqtt_txlen += PasswordLen;
}
【8】響應(yīng)
上面報文封裝完畢之后,直接就可以通過網(wǎng)絡(luò)接口發(fā)送出去就行了。
前提是創(chuàng)建套接字,連接上MQTT服務(wù)器,然后再將上面封裝好的報文發(fā)送過去就行了。
關(guān)于如何Windows下如何創(chuàng)建套接字,連接服務(wù)器,前面章節(jié)專門講過了,忘記了可以回去再看看。
編寫代碼發(fā)送出去:
MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
這個函數(shù)里面的代碼: 就是直接調(diào)用的網(wǎng)絡(luò)接口函數(shù)發(fā)送的。
int result = send(connectSocket,(const char*)buff, len, 0); //向服務(wù)器發(fā)送數(shù)據(jù),檢查返回值
if (result == SOCKET_ERROR)
{
std::cout << "send failed with error: " << WSAGetLastError() << std::endl; //輸出錯誤信息并退出程序
return -1;
}
發(fā)送過去之后,服務(wù)器肯定有返回值的。 失敗?成功? 那么我們得判斷。
通過響應(yīng)
章節(jié)的文檔說明,這里提到一個CONNACK
響應(yīng)報文,是由服務(wù)器下發(fā)給客戶端的。
在3.2小節(jié),講解了CONNACK
報文的字段含義。如果客戶端的連接報文是正確的,服務(wù)器會下發(fā)0x20 0x00 的確認(rèn)連接報文給客戶端,告訴客戶端連接成功。
編寫代碼: 寫代碼接收服務(wù)器返回的數(shù)據(jù),判斷返回的數(shù)據(jù)是不是符合要求。
Client_GetData(buff);//從服務(wù)器獲取數(shù)據(jù)
const unsigned char parket_connetAck[] = { 0x20,0x02};
if (mqtt_rxbuf[0] == parket_connetAck[0] && mqtt_rxbuf[1] == parket_connetAck[1]) //連接成功
{
return 0;//連接成功
}
CONNACK 報文
除了固定報文頭之外還有可變報頭。也就是后面還有2個字節(jié)。 我們可以繼續(xù)看文檔下面的介紹。
一個叫連接確認(rèn)標(biāo)志,一個叫連接返回碼。正確的情況下,這兩個值應(yīng)該為0x00 0x00.
關(guān)于返回碼的值,我們可以對它進(jìn)行判斷。如果連接失敗,也可以知道具體的原因。
【9】完整代碼
/*
函數(shù)功能: 登錄服務(wù)器
函數(shù)返回值: 0表示成功 1表示失敗
*/
unsigned char MQTT_Connect(char* ClientID, char* Username, char* Password)
{
unsigned short i, j;
int ClientIDLen = (int)strlen(ClientID);
int UsernameLen = (int)strlen(Username);
int PasswordLen = (int)strlen(Password);
unsigned int DataLen;
mqtt_txlen = 0;
unsigned int size = 0;
unsigned char buff[256];
//可變報頭+Payload 每個字段包含兩個字節(jié)的長度標(biāo)識
DataLen = 10 + (ClientIDLen + 2) + (UsernameLen + 2) + (PasswordLen + 2);
//固定報頭
//控制報文類型
mqtt_txbuf[mqtt_txlen++] = 0x10; //MQTT Message Type CONNECT
//剩余長度(不包括固定頭部)
do
{
unsigned char encodedByte = DataLen % 128;
DataLen = DataLen / 128;
// if there are more data to encode, set the top bit of this byte
if (DataLen > 0)
encodedByte = encodedByte | 128;
mqtt_txbuf[mqtt_txlen++] = encodedByte;
} while (DataLen > 0);
//可變報頭
//協(xié)議名
mqtt_txbuf[mqtt_txlen++] = 0; // Protocol Name Length MSB
mqtt_txbuf[mqtt_txlen++] = 4; // Protocol Name Length LSB
mqtt_txbuf[mqtt_txlen++] = 'M'; // ASCII Code for M
mqtt_txbuf[mqtt_txlen++] = 'Q'; // ASCII Code for Q
mqtt_txbuf[mqtt_txlen++] = 'T'; // ASCII Code for T
mqtt_txbuf[mqtt_txlen++] = 'T'; // ASCII Code for T
//協(xié)議級別
mqtt_txbuf[mqtt_txlen++] = 4; // MQTT Protocol version = 4
//連接標(biāo)志
mqtt_txbuf[mqtt_txlen++] = 0xc2; // conn flags
mqtt_txbuf[mqtt_txlen++] = 0; // Keep-alive Time Length MSB
mqtt_txbuf[mqtt_txlen++] = 100; // Keep-alive Time Length LSB 100S心跳包
mqtt_txbuf[mqtt_txlen++] = BYTE1(ClientIDLen);// Client ID length MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(ClientIDLen);// Client ID length LSB
memcpy(&mqtt_txbuf[mqtt_txlen], ClientID, ClientIDLen);
mqtt_txlen += ClientIDLen;
if (UsernameLen > 0)
{
mqtt_txbuf[mqtt_txlen++] = BYTE1(UsernameLen); //username length MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(UsernameLen); //username length LSB
memcpy(&mqtt_txbuf[mqtt_txlen], Username, UsernameLen);
mqtt_txlen += UsernameLen;
}
if (PasswordLen > 0)
{
mqtt_txbuf[mqtt_txlen++] = BYTE1(PasswordLen); //password length MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(PasswordLen); //password length LSB
memcpy(&mqtt_txbuf[mqtt_txlen], Password, PasswordLen);
mqtt_txlen += PasswordLen;
}
for (i = 0; i < 5; i++)
{
memset(mqtt_rxbuf, 0, mqtt_rxlen);
MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
size = Client_GetData(buff);//從服務(wù)器獲取數(shù)據(jù)
if (size <= 0)continue;
memcpy(mqtt_rxbuf, buff, size);
printf("登錄應(yīng)答:\r\n");
for (j = 0; j < size; j++)
{
printf("%#X ", buff[j]);
}
printf("\r\n");
if (mqtt_rxbuf[0] == parket_connetAck[0] && mqtt_rxbuf[1] == parket_connetAck[1]) //連接成功
{
return 0;//連接成功
}
}
return 1;
}
6.6 實現(xiàn)MQTT_PublishData函數(shù)
【1】查看文檔說明
和前面一章節(jié)一樣,看文檔說明編寫代碼。
【2】固定報文頭
通過文檔了解到,發(fā)布消息的固定報文頭由2個字節(jié)組成。第一個字節(jié)每個位的組成含義可看文檔的表格介紹。
最高4位是MQTT控制報文類型,固定的值:0x3
后面4個字節(jié)分為是DUP(重發(fā)標(biāo)志)、QOS等級(服務(wù)質(zhì)量等級)、RETAIN(消息的保留標(biāo)志)。
DUP(重發(fā)標(biāo)志):
QOS等級(服務(wù)質(zhì)量等級):
RETAIN(保留標(biāo)志–固定位0):
那么經(jīng)過文檔的解釋,我們編寫代碼如下: (關(guān)于后面4個字節(jié)可以根據(jù)自己的需求設(shè)置)
//固定報頭
//控制報文類型
mqtt_txbuf[mqtt_txlen++] = 0x30; // MQTT Message Type PUBLISH
【3】剩余字段長度
固定報文頭的第2個字節(jié)是填寫剩余字段長度。
在文檔里對剩余長度字段
計算的介紹:
剩余長度字段: 等于可變報頭的長度加上有效載荷的長度。
這里先要了解: 什么是可變報頭? 什么是有效載荷的長度?
往下翻文檔,可看到可變報頭
的介紹。 **可變報頭是:按順序包含主題名和報文標(biāo)識符。 ** 而報文標(biāo)識符只有在QOS為1或者2的時候才有。
再往下翻文檔,可看到有效載荷的長度
的介紹。有效載荷包含將被發(fā)布的應(yīng)用消息。數(shù)據(jù)的內(nèi)容和格式是應(yīng)用特定的。有效載荷的長度這樣計算:用固定 報頭中的剩余長度字段的值減去可變報頭的長度。包含零長度有效載荷的 PUBLISH 報文是合法的。
經(jīng)過文檔的介紹,我們知道了剩余長度字段如何介紹。
那么這個剩余長度字段計算出來之后,如何賦值到報文數(shù)組里去? 其實在Connect
報文的固定字段第2個字節(jié)也是要天剩余長度字段,在Connect
報文的章節(jié)已經(jīng)介紹過,在文檔的2.2.3小節(jié)的有詳細(xì)說明,那么翻到2.2.3小節(jié),了解如何填寫剩余長度值。 (其實我們上一節(jié)已經(jīng)介紹了一遍)
接下來就編寫代碼:
unsigned int topicLength = (int)strlen(topic); //計算主題的長度
unsigned int messageLength = (int)strlen(message); //計算發(fā)送的消息長度
unsigned int DataLen; //保存最終長度
//有效載荷的長度這樣計算:用固定報頭中的剩余長度字段的值減去可變報頭的長度
//QOS為0時沒有標(biāo)識符
//數(shù)據(jù)長度 主題名 報文標(biāo)識符 有效載荷
if (qos) DataLen = (2 + topicLength) + 2 + messageLength;
else DataLen = (2 + topicLength) + messageLength;
//剩余長度
do
{
unsigned char encodedByte = DataLen % 128;
DataLen = DataLen / 128;
// if there are more data to encode, set the top bit of this byte
if (DataLen > 0)
encodedByte = encodedByte | 128;
mqtt_txbuf[mqtt_txlen++] = encodedByte;
} while (DataLen > 0);
【4】可變報頭
PUBLISH 報文可變報頭非規(guī)范示例:
編寫代碼:
mqtt_txbuf[mqtt_txlen++] = BYTE1(topicLength);//主題長度MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(topicLength);//主題長度LSB
memcpy(&mqtt_txbuf[mqtt_txlen], topic, topicLength);//拷貝主題
mqtt_txlen += topicLength;
//報文標(biāo)識符
if (qos)
{
mqtt_txbuf[mqtt_txlen++] = BYTE1(id);
mqtt_txbuf[mqtt_txlen++] = BYTE0(id);
id++;
}
接著就添加有效載荷,也就是實際發(fā)送的消息內(nèi)容。
編寫代碼:
memcpy(&mqtt_txbuf[mqtt_txlen], message, messageLength);
mqtt_txlen += messageLength;
最后將報文發(fā)送出去:
MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
【5】完整代碼
//MQTT發(fā)布數(shù)據(jù)打包函數(shù)
//topic 主題
//message 消息
//qos 消息等級
unsigned char MQTT_PublishData(char* topic, char* message, unsigned char qos)
{
unsigned int topicLength = (int)strlen(topic);
unsigned int messageLength = (int)strlen(message);
unsigned short id = 0;
unsigned int DataLen;
mqtt_txlen = 0;
printf("上報JSON消息長度:%d\r\n", messageLength);
printf("message=%s\r\n", message);
//有效載荷的長度這樣計算:用固定報頭中的剩余長度字段的值減去可變報頭的長度
//QOS為0時沒有標(biāo)識符
//數(shù)據(jù)長度 主題名 報文標(biāo)識符 有效載荷
if (qos) DataLen = (2 + topicLength) + 2 + messageLength;
else DataLen = (2 + topicLength) + messageLength;
//固定報頭
//控制報文類型
mqtt_txbuf[mqtt_txlen++] = 0x30; // MQTT Message Type PUBLISH
//剩余長度
do
{
unsigned char encodedByte = DataLen % 128;
DataLen = DataLen / 128;
// if there are more data to encode, set the top bit of this byte
if (DataLen > 0)
encodedByte = encodedByte | 128;
mqtt_txbuf[mqtt_txlen++] = encodedByte;
} while (DataLen > 0);
mqtt_txbuf[mqtt_txlen++] = BYTE1(topicLength);//主題長度MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(topicLength);//主題長度LSB
memcpy(&mqtt_txbuf[mqtt_txlen], topic, topicLength);//拷貝主題
mqtt_txlen += topicLength;
//報文標(biāo)識符
if (qos)
{
mqtt_txbuf[mqtt_txlen++] = BYTE1(id);
mqtt_txbuf[mqtt_txlen++] = BYTE0(id);
id++;
}
memcpy(&mqtt_txbuf[mqtt_txlen], message, messageLength);
mqtt_txlen += messageLength;
MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
return mqtt_txlen;
}
【6】發(fā)布確認(rèn)
如果消息質(zhì)量大于0,那么可以繼續(xù)看文檔下面的發(fā)布確認(rèn),對本次發(fā)送的消息進(jìn)行響應(yīng)處理。
6.7 實現(xiàn) MQTT_SubscribeTopic 函數(shù)
【1】查看文檔:訂閱主題的格式
和前面一樣,查看文檔的說明,編寫代碼。
【2】查看文檔:取消訂閱的格式
【3】固定報頭
訂閱主題和取消訂閱主題格式一樣的,只是固定報頭不一樣。
可以封裝一個函數(shù),傳入一個參數(shù)實現(xiàn)兩種功能。
編寫判斷代碼:
//固定報頭
//控制報文類型
if (whether)mqtt_txbuf[mqtt_txlen++] = 0x82; //消息類型和標(biāo)志訂閱
else mqtt_txbuf[mqtt_txlen++] = 0xA2; //取消訂閱
剩余長度字段:
編寫代碼:
unsigned int topiclen = (int)strlen(topic);
unsigned int DataLen = 2 + (topiclen + 2) + (whether ? 1 : 0);//可變報頭的長度(2字節(jié))加上有效載荷的長度
剩余長度字段的填寫規(guī)則與前面一樣。
編寫代碼:
//剩余長度
do
{
unsigned char encodedByte = DataLen % 128;
DataLen = DataLen / 128;
// if there are more data to encode, set the top bit of this byte
if (DataLen > 0)
encodedByte = encodedByte | 128;
mqtt_txbuf[mqtt_txlen++] = encodedByte;
} while (DataLen > 0);
【4】可變報頭
編寫代碼:
//可變報頭
mqtt_txbuf[mqtt_txlen++] = 0; //消息標(biāo)識符 MSB
mqtt_txbuf[mqtt_txlen++] = 0x01; //消息標(biāo)識符 LSB
【5】完整代碼
/*
函數(shù)功能: MQTT訂閱/取消訂閱數(shù)據(jù)打包函數(shù)
函數(shù)參數(shù):
topic 主題
qos 消息等級 0:最多分發(fā)一次 1: 至少分發(fā)一次 2: 僅分發(fā)一次
whether 訂閱/取消訂閱請求包 (1表示訂閱,0表示取消訂閱)
返回值: 0表示成功 1表示失敗
*/
unsigned char MQTT_SubscribeTopic(char* topic, unsigned char qos, unsigned char whether)
{
unsigned char i, j;
mqtt_txlen = 0;
unsigned int size = 0;
unsigned char buff[256];
unsigned int topiclen = (int)strlen(topic);
unsigned int DataLen = 2 + (topiclen + 2) + (whether ? 1 : 0);//可變報頭的長度(2字節(jié))加上有效載荷的長度
//固定報頭
//控制報文類型
if (whether)mqtt_txbuf[mqtt_txlen++] = 0x82; //消息類型和標(biāo)志訂閱
else mqtt_txbuf[mqtt_txlen++] = 0xA2; //取消訂閱
//剩余長度
do
{
unsigned char encodedByte = DataLen % 128;
DataLen = DataLen / 128;
// if there are more data to encode, set the top bit of this byte
if (DataLen > 0)
encodedByte = encodedByte | 128;
mqtt_txbuf[mqtt_txlen++] = encodedByte;
} while (DataLen > 0);
//可變報頭
mqtt_txbuf[mqtt_txlen++] = 0; //消息標(biāo)識符 MSB
mqtt_txbuf[mqtt_txlen++] = 0x01; //消息標(biāo)識符 LSB
//有效載荷
mqtt_txbuf[mqtt_txlen++] = BYTE1(topiclen);//主題長度 MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(topiclen);//主題長度 LSB
memcpy(&mqtt_txbuf[mqtt_txlen], topic, topiclen);
mqtt_txlen += topiclen;
if (whether)
{
mqtt_txbuf[mqtt_txlen++] = qos;//QoS級別
}
for (i = 0; i < 100; i++)
{
memset(mqtt_rxbuf, 0, mqtt_rxlen);
MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
//printf("訂閱消息發(fā)布成功\n");
size = Client_GetData(buff);//從服務(wù)器獲取數(shù)據(jù)
if (size <= 0)
{
continue;
}
memcpy(mqtt_rxbuf, buff, size);
printf("訂閱應(yīng)答:\r\n");
for (j = 0; j < size; j++)
{
printf("%#X ", buff[j]);
}
printf("\r\n");
if (mqtt_rxbuf[0] == parket_subAck[0] && mqtt_rxbuf[1] == parket_subAck[1]) //連接成功
{
return 0;//連接成功
}
Sleep(1000);
}
return 1; //失敗
}
七、運行項目、連接華為云服務(wù)器
7.1 整個項目的完整代碼
前面章節(jié)陸續(xù)已經(jīng)編寫好了重要的函數(shù),那么這里就貼出我編寫好的整體的代碼,進(jìn)行運行測試:
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#pragma warning(disable:4996)
#include <string.h>
#include <stdio.h>
#include <iostream>
#include <winsock2.h>
#include <ws2tcpip.h>
#pragma comment(lib, "ws2_32.lib") //告訴編譯器鏈接Winsock庫
//---------------------------------------MQTT協(xié)議相關(guān)的子函數(shù)聲明-------------------------------------------------------
//發(fā)布主題
unsigned char MQTT_PublishData(char* topic, char* message, unsigned char qos);
//訂閱或者取消訂閱主題
unsigned char MQTT_SubscribeTopic(char* topic, unsigned char qos, unsigned char whether);
//登錄MQTT服務(wù)器
unsigned char MQTT_Connect(char* ClientID, char* Username, char* Password);
//MQTT協(xié)議緩沖區(qū)初始化
void MQTT_Init(void);
//調(diào)用底層接口發(fā)送數(shù)據(jù)包
void MQTT_SendBuf(unsigned char* buf, unsigned short len);
//MQTT協(xié)議里最底層的接口,最底層的如果要移植協(xié)議到其他地方運行,那么改這里就行了。其他地方不用改的。
int Client_SendData(unsigned char* buff, unsigned int len);//發(fā)送數(shù)據(jù)到服務(wù)器
int Client_GetData(unsigned char* buff);//從服務(wù)器獲取數(shù)據(jù)
//---------------------------------------全局變量定義--------------------------------------------------------------------
#define BYTE0(dwTemp) (*( char *)(&dwTemp))
#define BYTE1(dwTemp) (*((char *)(&dwTemp) + 1))
#define BYTE2(dwTemp) (*((char *)(&dwTemp) + 2))
#define BYTE3(dwTemp) (*((char *)(&dwTemp) + 3))
unsigned char mqtt_rxbuf[1024 * 1024];//發(fā)送數(shù)據(jù)緩存區(qū)
unsigned char mqtt_txbuf[256];//接收數(shù)據(jù)緩存區(qū)
unsigned int mqtt_rxlen;
unsigned int mqtt_txlen;
typedef enum
{
//名字 值 報文流動方向 描述
M_RESERVED1 = 0, // 禁止 保留
M_CONNECT, // 客戶端到服務(wù)端 客戶端請求連接服務(wù)端
M_CONNACK, // 服務(wù)端到客戶端 連接報文確認(rèn)
M_PUBLISH, // 兩個方向都允許 發(fā)布消息
M_PUBACK, // 兩個方向都允許 QoS 1消息發(fā)布收到確認(rèn)
M_PUBREC, // 兩個方向都允許 發(fā)布收到(保證交付第一步)
M_PUBREL, // 兩個方向都允許 發(fā)布釋放(保證交付第二步)
M_PUBCOMP, // 兩個方向都允許 QoS 2消息發(fā)布完成(保證交互第三步)
M_SUBSCRIBE, // 客戶端到服務(wù)端 客戶端訂閱請求
M_SUBACK, // 服務(wù)端到客戶端 訂閱請求報文確認(rèn)
M_UNSUBSCRIBE, // 客戶端到服務(wù)端 客戶端取消訂閱請求
M_UNSUBACK, // 服務(wù)端到客戶端 取消訂閱報文確認(rèn)
M_PINGREQ, // 客戶端到服務(wù)端 心跳請求
M_PINGRESP, // 服務(wù)端到客戶端 心跳響應(yīng)
M_DISCONNECT, // 客戶端到服務(wù)端 客戶端斷開連接
M_RESERVED2, // 禁止 保留
}_typdef_mqtt_message;
//連接成功服務(wù)器回應(yīng) 20 02 00 00
//客戶端主動斷開連接 e0 00
const unsigned char parket_connetAck[] = { 0x20,0x02,0x00,0x00 };
const unsigned char parket_disconnet[] = { 0xe0,0x00 };
const unsigned char parket_heart[] = { 0xc0,0x00 };
const unsigned char parket_heart_reply[] = { 0xc0,0x00 };
const unsigned char parket_subAck[] = { 0x90,0x03 };
void MQTT_Init(void)
{
//緩沖區(qū)賦值
mqtt_rxlen = sizeof(mqtt_rxbuf);
mqtt_txlen = sizeof(mqtt_txbuf);
memset(mqtt_rxbuf, 0, mqtt_rxlen);
memset(mqtt_txbuf, 0, mqtt_txlen);
}
/*
函數(shù)功能: 登錄服務(wù)器
函數(shù)返回值: 0表示成功 1表示失敗
*/
unsigned char MQTT_Connect(char* ClientID, char* Username, char* Password)
{
unsigned short i, j;
int ClientIDLen = (int)strlen(ClientID);
int UsernameLen = (int)strlen(Username);
int PasswordLen = (int)strlen(Password);
unsigned int DataLen;
mqtt_txlen = 0;
unsigned int size = 0;
unsigned char buff[256];
//可變報頭+Payload 每個字段包含兩個字節(jié)的長度標(biāo)識
DataLen = 10 + (ClientIDLen + 2) + (UsernameLen + 2) + (PasswordLen + 2);
//固定報頭
//控制報文類型
mqtt_txbuf[mqtt_txlen++] = 0x10; //MQTT Message Type CONNECT
//剩余長度(不包括固定頭部)
do
{
unsigned char encodedByte = DataLen % 128;
DataLen = DataLen / 128;
// if there are more data to encode, set the top bit of this byte
if (DataLen > 0)
encodedByte = encodedByte | 128;
mqtt_txbuf[mqtt_txlen++] = encodedByte;
} while (DataLen > 0);
//可變報頭
//協(xié)議名
mqtt_txbuf[mqtt_txlen++] = 0; // Protocol Name Length MSB
mqtt_txbuf[mqtt_txlen++] = 4; // Protocol Name Length LSB
mqtt_txbuf[mqtt_txlen++] = 'M'; // ASCII Code for M
mqtt_txbuf[mqtt_txlen++] = 'Q'; // ASCII Code for Q
mqtt_txbuf[mqtt_txlen++] = 'T'; // ASCII Code for T
mqtt_txbuf[mqtt_txlen++] = 'T'; // ASCII Code for T
//協(xié)議級別
mqtt_txbuf[mqtt_txlen++] = 4; // MQTT Protocol version = 4
//連接標(biāo)志
mqtt_txbuf[mqtt_txlen++] = 0xc2; // conn flags
mqtt_txbuf[mqtt_txlen++] = 0; // Keep-alive Time Length MSB
mqtt_txbuf[mqtt_txlen++] = 100; // Keep-alive Time Length LSB 100S心跳包
mqtt_txbuf[mqtt_txlen++] = BYTE1(ClientIDLen);// Client ID length MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(ClientIDLen);// Client ID length LSB
memcpy(&mqtt_txbuf[mqtt_txlen], ClientID, ClientIDLen);
mqtt_txlen += ClientIDLen;
if (UsernameLen > 0)
{
mqtt_txbuf[mqtt_txlen++] = BYTE1(UsernameLen); //username length MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(UsernameLen); //username length LSB
memcpy(&mqtt_txbuf[mqtt_txlen], Username, UsernameLen);
mqtt_txlen += UsernameLen;
}
if (PasswordLen > 0)
{
mqtt_txbuf[mqtt_txlen++] = BYTE1(PasswordLen); //password length MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(PasswordLen); //password length LSB
memcpy(&mqtt_txbuf[mqtt_txlen], Password, PasswordLen);
mqtt_txlen += PasswordLen;
}
for (i = 0; i < 5; i++)
{
memset(mqtt_rxbuf, 0, mqtt_rxlen);
MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
size = Client_GetData(buff);//從服務(wù)器獲取數(shù)據(jù)
if (size <= 0)continue;
memcpy(mqtt_rxbuf, buff, size);
printf("登錄應(yīng)答:\r\n");
for (j = 0; j < size; j++)
{
printf("%#X ", buff[j]);
}
printf("\r\n");
if (mqtt_rxbuf[0] == parket_connetAck[0] && mqtt_rxbuf[1] == parket_connetAck[1]) //連接成功
{
return 0;//連接成功
}
}
return 1;
}
/*
函數(shù)功能: MQTT訂閱/取消訂閱數(shù)據(jù)打包函數(shù)
函數(shù)參數(shù):
topic 主題
qos 消息等級 0:最多分發(fā)一次 1: 至少分發(fā)一次 2: 僅分發(fā)一次
whether 訂閱/取消訂閱請求包 (1表示訂閱,0表示取消訂閱)
返回值: 0表示成功 1表示失敗
*/
unsigned char MQTT_SubscribeTopic(char* topic, unsigned char qos, unsigned char whether)
{
unsigned char i, j;
mqtt_txlen = 0;
unsigned int size = 0;
unsigned char buff[256];
unsigned int topiclen = (int)strlen(topic);
unsigned int DataLen = 2 + (topiclen + 2) + (whether ? 1 : 0);//可變報頭的長度(2字節(jié))加上有效載荷的長度
//固定報頭
//控制報文類型
if (whether)mqtt_txbuf[mqtt_txlen++] = 0x82; //消息類型和標(biāo)志訂閱
else mqtt_txbuf[mqtt_txlen++] = 0xA2; //取消訂閱
//剩余長度
do
{
unsigned char encodedByte = DataLen % 128;
DataLen = DataLen / 128;
// if there are more data to encode, set the top bit of this byte
if (DataLen > 0)
encodedByte = encodedByte | 128;
mqtt_txbuf[mqtt_txlen++] = encodedByte;
} while (DataLen > 0);
//可變報頭
mqtt_txbuf[mqtt_txlen++] = 0; //消息標(biāo)識符 MSB
mqtt_txbuf[mqtt_txlen++] = 0x01; //消息標(biāo)識符 LSB
//有效載荷
mqtt_txbuf[mqtt_txlen++] = BYTE1(topiclen);//主題長度 MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(topiclen);//主題長度 LSB
memcpy(&mqtt_txbuf[mqtt_txlen], topic, topiclen);
mqtt_txlen += topiclen;
if (whether)
{
mqtt_txbuf[mqtt_txlen++] = qos;//QoS級別
}
for (i = 0; i < 100; i++)
{
memset(mqtt_rxbuf, 0, mqtt_rxlen);
MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
//printf("訂閱消息發(fā)布成功\n");
size = Client_GetData(buff);//從服務(wù)器獲取數(shù)據(jù)
if (size <= 0)
{
continue;
}
memcpy(mqtt_rxbuf, buff, size);
printf("訂閱應(yīng)答:\r\n");
for (j = 0; j < size; j++)
{
printf("%#X ", buff[j]);
}
printf("\r\n");
if (mqtt_rxbuf[0] == parket_subAck[0] && mqtt_rxbuf[1] == parket_subAck[1]) //連接成功
{
return 0;//連接成功
}
Sleep(1000);
}
return 1; //失敗
}
//MQTT發(fā)布數(shù)據(jù)打包函數(shù)
//topic 主題
//message 消息
//qos 消息等級
unsigned char MQTT_PublishData(char* topic, char* message, unsigned char qos)
{
unsigned int topicLength = (int)strlen(topic);
unsigned int messageLength = (int)strlen(message);
unsigned short id = 0;
unsigned int DataLen;
mqtt_txlen = 0;
printf("上報JSON消息長度:%d\r\n", messageLength);
printf("message=%s\r\n", message);
//有效載荷的長度這樣計算:用固定報頭中的剩余長度字段的值減去可變報頭的長度
//QOS為0時沒有標(biāo)識符
//數(shù)據(jù)長度 主題名 報文標(biāo)識符 有效載荷
if (qos) DataLen = (2 + topicLength) + 2 + messageLength;
else DataLen = (2 + topicLength) + messageLength;
//固定報頭
//控制報文類型
mqtt_txbuf[mqtt_txlen++] = 0x30; // MQTT Message Type PUBLISH
//剩余長度
do
{
unsigned char encodedByte = DataLen % 128;
DataLen = DataLen / 128;
// if there are more data to encode, set the top bit of this byte
if (DataLen > 0)
encodedByte = encodedByte | 128;
mqtt_txbuf[mqtt_txlen++] = encodedByte;
} while (DataLen > 0);
mqtt_txbuf[mqtt_txlen++] = BYTE1(topicLength);//主題長度MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(topicLength);//主題長度LSB
memcpy(&mqtt_txbuf[mqtt_txlen], topic, topicLength);//拷貝主題
mqtt_txlen += topicLength;
//報文標(biāo)識符
if (qos)
{
mqtt_txbuf[mqtt_txlen++] = BYTE1(id);
mqtt_txbuf[mqtt_txlen++] = BYTE0(id);
id++;
}
memcpy(&mqtt_txbuf[mqtt_txlen], message, messageLength);
mqtt_txlen += messageLength;
MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
return mqtt_txlen;
}
void MQTT_SendBuf(unsigned char* buf, unsigned short len)
{
Client_SendData(buf, len);//發(fā)送數(shù)據(jù)到服務(wù)器
}
//-----------------------------------------MQTT服務(wù)器的參數(shù)------------------------------------------------------------
//服務(wù)器IP
#define SERVER_IP "117.78.5.125"
#define SERVER_PORT 1883 //端口號
//MQTT三元組
#define ClientID "65697df3585c81787ad4da82_stm32_0_0_2023120106"
#define Username "65697df3585c81787ad4da82_stm32"
#define Password "12cc9b1f637da8d755fa2cbd007bb669e6f292e3e63017538b5e6e13eef0cf58"//密文
//訂閱主題:
#define SET_TOPIC "$oc/devices/65697df3585c81787ad4da82_stm32/sys/messages/down"//訂閱
//發(fā)布主題:
#define POST_TOPIC "$oc/devices/65697df3585c81787ad4da82_stm32/sys/properties/report"//發(fā)布
//-----------------------------------------主函數(shù)------------------------------------------------------------
char mqtt_message[1024];//數(shù)據(jù)緩存區(qū)
SOCKET connectSocket; //網(wǎng)絡(luò)套接字
WSADATA wsaData; //創(chuàng)建一個結(jié)構(gòu)體變量,用于存儲關(guān)于Winsock庫的信息
double TEMP = 10.0;
int main()
{
int result = WSAStartup(MAKEWORD(2, 2), &wsaData); //初始化Winsock庫,指定版本號2.2,檢查返回值
if (result != 0)
{
printf("WSAStartup failed: %d\r\n", result);//輸出錯誤信息并退出程序
return 1;
}
connectSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); //創(chuàng)建一個TCP套接字,檢查返回值
if (connectSocket == INVALID_SOCKET)
{
printf("socket failed with error: %d", WSAGetLastError());//輸出錯誤信息并退出程序
WSACleanup(); //清除Winsock庫
return 1;
}
sockaddr_in service; //創(chuàng)建一個結(jié)構(gòu)體變量,用于存儲服務(wù)器地址信息
service.sin_family = AF_INET; //指定地址族為IPv4
inet_pton(AF_INET, SERVER_IP, &service.sin_addr); //將字符串類型的IP地址轉(zhuǎn)換為二進(jìn)制網(wǎng)絡(luò)字節(jié)序的IP地址,并存儲在結(jié)構(gòu)體中
service.sin_port = htons(SERVER_PORT); //將端口號從主機(jī)字節(jié)序轉(zhuǎn)換為網(wǎng)絡(luò)字節(jié)序,并存儲在結(jié)構(gòu)體中
result = connect(connectSocket, (SOCKADDR*)&service, sizeof(service)); //連接到服務(wù)器,檢查返回值
if (result == SOCKET_ERROR)
{
std::cout << "connect failed with error: " << WSAGetLastError() << std::endl; //輸出錯誤信息并退出程序
closesocket(connectSocket); //關(guān)閉套接字
WSACleanup(); //清除Winsock庫
return 1;
}
std::cout << "Connected to server." << std::endl; //連接成功,輸出消息
MQTT_Init();
while (1)
{
/*登錄服務(wù)器*/
if (MQTT_Connect((char*)ClientID, (char*)Username, (char*)Password) == 0)
{
break;
}
// 延時1000毫秒,即1秒
Sleep(1000);
printf("MQTT服務(wù)器登錄校驗中....\n");
}
printf("連接成功_666\r\n");
//訂閱物聯(lián)網(wǎng)平臺數(shù)據(jù)
int stat = MQTT_SubscribeTopic((char*)SET_TOPIC, 1, 1);
if (stat)
{
printf("訂閱失敗\r\n");
closesocket(connectSocket); //關(guān)閉套接字
WSACleanup(); //清除Winsock庫
return 1;
}
printf("訂閱成功\r\n");
/*創(chuàng)建線程*/
while (1)
{
sprintf(mqtt_message, "{\"services\": [{\"service_id\": \"stm32\",\"properties\":{\"TEMP\":%.1f}}]}", (double)(TEMP+=0.2));//溫度
//發(fā)布主題
MQTT_PublishData((char*)POST_TOPIC, mqtt_message, 0);
printf("發(fā)布消息成功\r\n");
Sleep(5000);
}
}
/*發(fā)送數(shù)據(jù)到服務(wù)器*/
int Client_SendData(unsigned char* buff, unsigned int len)
{
int result = send(connectSocket,(const char*)buff, len, 0); //向服務(wù)器發(fā)送數(shù)據(jù),檢查返回值
if (result == SOCKET_ERROR)
{
std::cout << "send failed with error: " << WSAGetLastError() << std::endl; //輸出錯誤信息并退出程序
return -1;
}
return 0;
}
/*獲取服務(wù)器下發(fā)數(shù)據(jù)*/
int Client_GetData(unsigned char* buff)
{
int result = recv(connectSocket, (char*)buff,200, 0); //從服務(wù)器接收數(shù)據(jù),檢查返回值
if (result == SOCKET_ERROR)
{
std::cout << "recv failed with error: " << WSAGetLastError() << std::endl; //輸出錯誤信息并退出程序
return -1;
}
return result;
}
7.2 代碼里核心的地方
這里填寫MQTT服務(wù)器的信息,也就是前面創(chuàng)建華為云IOT服務(wù)器得到的信息。
這里是主函數(shù),登錄服務(wù)器后訂閱主題,發(fā)布消息。
7.3 編譯運行代碼
按下Ctrl+F5
運行代碼。 彈出控制臺窗口之后,可以看到,我們已經(jīng)連接了華為云MQTT服務(wù)器,并且完成數(shù)據(jù)上傳。
7.4 登錄華為云IOT云端查看數(shù)據(jù)
可以看到設(shè)備已經(jīng)在線了。
可以看到我們的消息也在實時的上傳。
到此,說明我們的MQTT協(xié)議已經(jīng)封裝完成,可以正常的運行了。
八、下發(fā)命令的處理
一般MQTT設(shè)備端除了上傳數(shù)據(jù)以外,還需要接收MQTT服務(wù)器下發(fā)的控制命令。
那么我們接下來就完善一下代碼,接收華為云MQTT服務(wù)器下發(fā)的命令,并進(jìn)行回應(yīng)。
8.1 添加命令
要測試命令下發(fā),那么首先需要再華為云IOT平臺添加命令。
添加一個控制命令。
命令添加完成:
8.2 下發(fā)命令測試
注意:下發(fā)命令是同步的,設(shè)備端必須在線才可以下發(fā)命令。
這個下發(fā)的命令是有反饋。設(shè)備端收到之后,可以向服務(wù)器反饋狀態(tài),這樣服務(wù)器才能知道剛才的控制命令確實發(fā)送成功了。
設(shè)備收到信息之后,上傳回應(yīng)給服務(wù)器的主題和內(nèi)容格式:
Topic:$oc/devices/{device_id}/sys/commands/response/request_id={request_id}
數(shù)據(jù)格式:
{
"result_code": 0,
"response_name": "COMMAND_RESPONSE",
"paras": {
"result": "success"
}
}
云端發(fā)送控制命令之后,設(shè)備收到的消息如下:
$oc/devices/65697df3585c81787ad4da82_stm32/sys/commands/request_id=d49f0bb9-ba87-4c9b-b915-98a1f0fcf689{"paras":{"lock":true},"service_id":"lock","command_name":"鎖開關(guān)控制"}
其中request_id=d49f0bb9-ba87-4c9b-b915-98a1f0fcf689
就是本次的請求ID。回應(yīng)的時候需要加上請求ID。服務(wù)器才好對應(yīng)。
以當(dāng)前設(shè)備為例:
發(fā)布的主題這樣填: $oc/devices/65113d05a559fd7cd41435f8_lock1/sys/commands/response/request_id=ce49181e-7636-4b24-946d-c034ca083c1c
發(fā)布的內(nèi)容這樣填:
{"result_code":0,"response_name":"COMMAND_RESPONSE","paras":{"result":"success"}}
8.3 編寫代碼
為了能夠?qū)崟r接收服務(wù)器的代碼,我們單獨增加一個線程來接收服務(wù)器的消息。
在主函數(shù)里MQTT服務(wù)器連接成功之后,增加以下代碼:
/*創(chuàng)建線程*/
HANDLE hThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ReceiveData, NULL, 0, NULL);
if (hThread == NULL) {
printf("CreateThread failed.\n");
return 1;
}
編寫線程的工作函數(shù):
// 處理服務(wù)器下發(fā)的數(shù)據(jù)
void ReceiveData(void)
{
// 接收數(shù)據(jù)
char buffer[1024];
char request_id[100];
char send_cmd[500];
int recvSize;
while (1)
{
//等待服務(wù)器下發(fā)消息
recvSize = recv(connectSocket, buffer, 1024, 0);
if (recvSize == SOCKET_ERROR)
{
std::cout << "網(wǎng)絡(luò)斷開連接: " << WSAGetLastError() << std::endl; //輸出錯誤信息并退出程序
return;
}
if (recvSize > 0)
{
printf("服務(wù)器下發(fā)消息:\r\n");
//接收下發(fā)的數(shù)據(jù)
for (int i = 0; i < recvSize; i++)
{
printf("%c", buffer[i]);
}
printf("\r\n");
//下發(fā)指令請求回應(yīng)給服務(wù)器(命令下發(fā))
if (strstr((char*)&buffer[5], "sys/commands/request_id="))
{
char* p = NULL;
p = strstr((char*)&buffer[5], "request_id=");
if (p)
{
//解析數(shù)據(jù)
//$oc/devices/65697df3585c81787ad4da82_stm32/sys/commands/request_id=6e925cc1-4a8d-4eab-8d85-6c7f15d72189
strncpy(request_id, p, 47);
}
//上報數(shù)據(jù)
sprintf(mqtt_message, "{\"result_code\":0,\"response_name\":\"COMMAND_RESPONSE\",\"paras\":{\"result\":\"success\"}}");
sprintf(send_cmd, "$oc/devices/65697df3585c81787ad4da82_stm32/sys/commands/response/%s", request_id);
MQTT_PublishData(send_cmd, mqtt_message, 0);
printf("(命令)發(fā)布主題:%s\r\n", send_cmd);
printf("(命令)發(fā)布數(shù)據(jù):%s\r\n", mqtt_message);
}
}
}
}
8.4 運行代碼測試
先運行客戶端的代碼,登錄MQTT服務(wù)器。
然后,在華為云IOT平臺下發(fā)命令。 如果點擊下發(fā)之后,右上角彈出了 命令下發(fā)成功
,就表示我們客戶端代碼寫OK了。
我們看設(shè)備端收到的消息:
九、總結(jié)
到此,我們的MQTT協(xié)議已經(jīng)開發(fā)完成了。如果大家詳細(xì)閱讀了文章,并且跟著步驟操作了一次,相信你此刻對MQTT協(xié)議應(yīng)該有所認(rèn)識了。我是DS小龍哥
,歡迎關(guān)注我,后續(xù)會有更多的技術(shù)文章、項目文章發(fā)布。文章來源:http://www.zghlxwxcb.cn/news/detail-753092.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-753092.html
到了這里,關(guān)于基于C語言從0開始手?jǐn)]MQTT協(xié)議代碼連接標(biāo)準(zhǔn)的MQTT服務(wù)器,完成數(shù)據(jù)上傳和命令下發(fā)響應(yīng)(華為云IOT服務(wù)器)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!