從零開始用Nodejs搭建一個MQTT服務器,并且用stm32通過esp8266進行消息訂閱和發(fā)布
一、項目背景
最近在做一個物聯(lián)網(wǎng)項目,需要用到服務器進行數(shù)據(jù)的存儲和數(shù)據(jù)的請求和發(fā)送,之前我用過onenet平臺上的http服務,雖然能通過get和post請求進行數(shù)據(jù)的提交和發(fā)送,但是平臺上的數(shù)據(jù)發(fā)生改變卻不能主動推送給esp8266,與我此次的項目不符合,所以pass。然后我了解了下mqtt協(xié)議,它可以消息的發(fā)布和訂閱實現(xiàn)服務器和esp8266的雙向通信,而我之前又學過一些js,所以我就想能不能自己搭建一個mqtt服務器,最終還真的實現(xiàn)了。
二、搭建mqtt服務器
1.搭建開發(fā)環(huán)境
- 安裝nodejs,可以前往nodejs官網(wǎng)下載,網(wǎng)上有許多的安裝教程,這里就不介紹了
- 安裝vscode,同樣網(wǎng)上也有許多的教程,就不介紹了
2.創(chuàng)建一個項目
- 創(chuàng)建一個文件夾并用vscode打開
- 在終端輸入
npm init -y
,初始化項目 - 下載aedes包,aedes是一個基于Node.js的MQTT(Message Queuing Telemetry Transport)服務器實現(xiàn),它可以讓你輕松地搭建MQTT服務器,用于處理MQTT協(xié)議的消息傳遞。通過在終端輸入命令
npm i aedes --save
進行下載。 - 創(chuàng)建一個app.js文件
3.代碼編寫
//app.js
//引入mqtt包
import aedes from "aedes";
//網(wǎng)絡服務包,nodejs自帶
import net from "net";
//創(chuàng)建aedes實例
/*
配置項: mq: 消息隊列,用于存儲和處理消息。默認情況下,aedes 使用內存消息隊列,但你可以使用其他消息隊列實現(xiàn),例如 Redis。
id: 服務器的唯一標識符。如果未指定,則將自動生成一個唯一標識符。
persistence: 持久化存儲,用于將連接和會話狀態(tài)存儲到磁盤或數(shù)據(jù)庫中。默認情況下,aedes 使用內存持久化存儲,但你可以使用其他持久化存儲實現(xiàn),例如 LevelDB 或 MongoDB。
concurrency: 最大并發(fā)連接數(shù)。默認情況下,aedes 允許無限制的并發(fā)連接。
heartbeatInterval: 心跳間隔時間,用于檢測連接是否處于活動狀態(tài)。默認情況下,aedes 每 5 分鐘發(fā)送一次心跳包。
connectTimeout: 連接超時時間。默認情況下,aedes 允許無限制的連接超時時間。
queueLimit: 消息隊列長度限制,用于限制消息隊列的最大長度。默認情況下,aedes 不限制消息隊列長度。
maxClientsIdLength: 最大客戶端 ID 長度。默認情況下,aedes 允許客戶端 ID 的最大長度為 23 個字符。
preConnect: 在連接建立之前執(zhí)行的處理程序??梢杂糜隍炞C客戶端的身份或執(zhí)行其他預處理操作。
authenticate: 身份驗證處理程序,用于驗證客戶端的身份??梢允褂糜脩裘?密碼、證書等進行身份驗證。
authorizePublish: 發(fā)布授權處理程序,用于授權客戶端發(fā)布消息。
authorizeSubscribe: 訂閱授權處理程序,用于授權客戶端訂閱主題。
authorizeForward: 轉發(fā)授權處理程序,用于授權客戶端轉發(fā)消息。
published: 發(fā)布處理程序,用于在消息發(fā)布后執(zhí)行自定義操作,例如記錄日志或觸發(fā)事件。
*/
//我只用到三個配置項,其他配置項有需要可以自行配置
export const aedesApp = new aedes({
heartbeatInterval: 60000, //60s發(fā)送一次心跳包
connectTimeout: 120000, //如果與服務器120s沒有收到連接客戶端發(fā)過來的心跳包,則視為連接斷開
});
//驗證賬號密碼
aedesApp.authenticate = async function (client, username, password, callback) {
//client.id是客戶端的id,是唯一的,username是客戶端的用戶名(密碼為buffer,需要轉化為string),password是客戶端的密碼
//我們可以在這里進行用戶的身份驗證,是否允許客戶端的這次連接請求
const newPassword = password.toString();
if(username === 'admin' && newPassword === '123456'){
callback(null, true); //callback函數(shù)需要傳遞兩個參數(shù),第一個是錯誤實例,第二個是是否同意連接
}else{
callback(null, false)
}
}
//監(jiān)聽MQTT服務器端口,當有客戶端連接上時,觸發(fā)該回調
aedesApp.on('client', async (client) => {
console.log('ClientConnect:', client.id)
})
//監(jiān)聽MQTT服務器端口,當有客戶端主動斷開連接或者服務器120s內沒收到某個客戶端的心跳包就會觸發(fā)該回調
aedesApp.on('clientDisconnect', async (client) => {
console.log('clientDisconnect:', client.id)
})
//訂閱主題。該函數(shù)第一個參數(shù)是要訂閱的主題; 第二個是用于處理收到的消息的函,它接受兩個參數(shù):packet 和 callback。packet 是一個 AedesPublishPacket 對象,表示收到的消息;callback 是一個函數(shù),用于在消息處理完成后通知 aedes 服務器;第三個參數(shù)是訂閱成功的回調函數(shù)
aedesApp.subscribe("myMsg", async function (packet, callback) {
callback();
}, () => {
console.log("訂閱myMsg成功");
});
//處理收到的消息,我們訂閱所有主題收到的消息都可以通過這個事件獲取(我們可以把訂閱收到消息的處理函數(shù)寫在上面訂閱主題函數(shù)的第二個參數(shù)里面,或者統(tǒng)一寫在下面)
aedesApp.on("publish", async function (packet, client){
//packet.topic表示哪個主題,packet.payload是收到的數(shù)據(jù),是一串二進制數(shù)據(jù),我們需要用.toString()將它轉化為字符串
if(packet.topic === 'myMsg'){
console.log("Received message:", packet.payload.toString());
}
})
//發(fā)布主題
setInterval(()=>{ //兩秒發(fā)布一次
aedesApp.publish({
topic: "success", //發(fā)布主題
payload: "yes", //消息內容
qos: 1, //MQTT消息的服務質量(quality of service)。服務質量是1,這意味著這個消息需要至少一次確認(ACK)才能被認為是傳輸成功
retain: false, // MQTT消息的保留標志(retain flag),它用于控制消息是否應該被保留在MQTT服務器上,以便新的訂閱者可以接收到它。保留標志是false,這意味著這個消息不應該被保留
cmd: "publish", // MQTT消息的命令(command),它用于控制消息的類型。命令是"publish",這意味著這個消息是一個發(fā)布消息
dup: false //判斷消息是否是重復的
}, (err) => { //發(fā)布失敗的回調
if (err) {
console.log('發(fā)布失敗')
}
});
},2000)
//創(chuàng)建服務器
const server = net.createServer(aedesApp.handle);
// 運行服務器,運行在1883端口
server.listen(1883, function () {
console.log('server started and listening on port 1883')
})
4.啟動服務
-
在終端輸入
node app.js
啟動服務 -
終端輸出
server started and listening on port 1883
,服務啟動成功
5.驗證服務是否正常
-
下載mqtt包,在終端輸入
npm i mqtt --save
-
創(chuàng)建一個mqtt.js文件
-
代碼編寫
import mqtt from "mqtt"; //連接到 MQTT 服務器 const client = mqtt.connect("mqtt://localhost",{ username:'admin', //用戶名 password:'123456', //密碼 clientId: '1', //客戶端id }); //訂閱主題 client.on("connect", function () { client.subscribe("success", function (err) { if (!err) { console.log("Subscribed to success"); } }); }); //處理收到的消息 client.on("message", function (topic, message) { if (topic === "success") { console.log("Received message:", message.toString()); } }); //發(fā)布消息 setInterval(() => { client.publish("myMsg", '123123'); }, 2000);
-
打開另個一個終端,在終端輸入
node mqtt.js
啟動連接 -
啟動app.js的終端打印出
訂閱myMsg成功 ClientConnect: 1 Received message: 123123
運行截圖
-
當前終端打印出
Subscribed to success Received message : yes
運行截圖
-
到此,mqtt服務啟動成功,且能正常的發(fā)布和訂閱消息
-
三、stm32f103 代碼編寫
mqtt指令可以到安可信的官網(wǎng)查看:安可信mqtt固件
1.esp8266.h
#ifndef _ESP8266_H_
#define _ESP8266_H_
#include "stm32f10x.h"
#define REV_OK 0
#define REV_WAIT 1
void USART3_Init(u32 baud);
void ESP8266_Clear(void);
void ESP8266_MQTT_Init(void);
void publishMYMSG();
#endif
2.esp8266.c
#include "esp8266.h"
#include "delay.h"
#include "string.h"
//復位
#define ESP8266_RST "AT+RST\r\n"
//檢測esp8266是否處于正常工作狀態(tài)
#define ESP8266_AT "AT\r\n"
//設置為客戶端模式
#define ESP8266_CWMODE "AT+CWMODE=1"
//設置WiFi和密碼
#define ESP8266_WIFI_INFO "AT+CWJAP=\"Robotlab-2.4G\",\"8b107lab\"\r\n"
//設置客戶端信息,第三個參數(shù)為客戶端id,第四個為用戶名,第五個為密碼
#define ESP8266_MQTT_MQTTUSERCFG "AT+MQTTUSERCFG=0,1,\"4\",\"admin\",\"123456\",0,0,\"\"\r\n"
//開啟心跳檢測,60s內沒收到心跳包視為斷開
#define ESP8266_MQTT_MQTTCONNCFG "AT+MQTTCONNCFG=0,60,1,\"\",\"\",0,0"
//連接mqtt服務器(注意:esp8266連接的WiFi需要跟啟動mqtt服務器的電腦連接的是同一個,連接的ip地址我們可以打開cmd,輸入ipconfig指令進行查看)
#define ESP8266_MQTT_MQTTCONN "AT+MQTTCONN=0,\"192.168.1.104\",1883,1\r\n"
//訂閱success主題
#define ESP8266_MQTT_MQTTSUB "AT+MQTTSUB=0,\"success\",1\r\n"
//發(fā)布myMsg主題
#define ESP8266_MQTT_MQTTPUB_MYMSG "AT+MQTTPUB=0,\"myMsg\",\"%s\",0,0\r\n"
unsigned char esp8266_buf[500];
unsigned int esp8266_cnt = 0, esp8266_cntPre = 0;
//==========================================================
// 函數(shù)名稱: ESP8266_Clear
//
// 函數(shù)功能: 清空緩存
//
// 入口參數(shù): 無
//
// 返回參數(shù): 無
//
// 說明:
//==========================================================
void ESP8266_Clear(void)
{
memset(esp8266_buf, 0, sizeof(esp8266_buf));
esp8266_cnt = 0;
}
//==========================================================
// 函數(shù)名稱: ESP8266_WaitRecive
//
// 函數(shù)功能: 等待接收完成
//
// 入口參數(shù): 無
//
// 返回參數(shù): REV_OK-接收完成 REV_WAIT-接收超時未完成
//
// 說明: 循環(huán)調用檢測是否接收完成
//==========================================================
_Bool ESP8266_WaitRecive(void)
{
if(esp8266_cnt == 0) //如果接收計數(shù)為0 則說明沒有處于接收數(shù)據(jù)中,所以直接跳出,結束函數(shù)
return REV_WAIT;
if(esp8266_cnt == esp8266_cntPre) //如果上一次的值和這次相同,則說明接收完畢
{
esp8266_cnt = 0; //清0接收計數(shù)
return REV_OK; //返回接收完成標志
}
esp8266_cntPre = esp8266_cnt; //置為相同
return REV_WAIT; //返回接收未完成標志
}
//==========================================================
// 函數(shù)名稱: ESP8266_SendCmd
//
// 函數(shù)功能: 發(fā)送命令
//
// 入口參數(shù): cmd:命令
// res:需要檢查的返回指令
//
// 返回參數(shù): 0-成功 1-失敗
//
// 說明:
//==========================================================
_Bool ESP8266_SendCmd(char *cmd, char *res, u16 time)
{
char *str;
Usart3_SendString((unsigned char *)cmd, strlen((const char *)cmd));
while(time--)
{
if(ESP8266_WaitRecive() == REV_OK) //如果收到數(shù)據(jù)
{
str=strstr((const char *)esp8266_buf, res) ; //接收返回的數(shù)據(jù)
if(str!= NULL) //如果檢索到關鍵詞
{
ESP8266_Clear(); //清空緩存
return 0;
}
}
delay_ms(10);
}
return 0;
}
//==========================================================
// 函數(shù)名稱: ESP8266_MQTT_Init
//
// 函數(shù)功能: mqtt初始化
//
// 入口參數(shù): 無
//
// 返回參數(shù): 無
//
// 說明:
//==========================================================
void ESP8266_MQTT_Init(void)
{
printf("START Init\r\n");
ESP8266_Clear();
while(ESP8266_SendCmd("AT+RST\r\n", "ready", 200));
printf("AT\r\n");
while(ESP8266_SendCmd("AT\r\n", "OK", 200));
printf("CWMODE\r\n");
while(ESP8266_SendCmd(ESP8266_CWMODE, "OK", 200));
printf("CWJAP\r\n");
while(ESP8266_SendCmd(ESP8266_WIFI_INFO, "OK", 200));
printf(ESP8266_MQTT_MQTTUSERCFG);
while(ESP8266_SendCmd(ESP8266_MQTT_MQTTUSERCFG, "OK", 200));
printf(ESP8266_MQTT_MQTTCONN);
while(ESP8266_SendCmd(ESP8266_MQTT_MQTTCONN, "OK", 200));
printf(ESP8266_MQTT_MQTTSUB);
while(ESP8266_SendCmd(ESP8266_MQTT_MQTTSUB, "OK", 200));
printf("ESP8266 Init OK\r\n");
}
//發(fā)布myMsg主題
void publishMYMSG()
{
char data[150];
sprintf(data, ESP8266_MQTT_MQTTPUB_MYMSG, "123");
ESP8266_SendCmd(data, "OK", 100);
memset(data, 0, sizeof(data));
}
//串口初始化
void USART3_Init(u32 baud)
{
USART_InitTypeDef USART_InitStructure;
NVIC_InitTypeDef NVIC_InitStructure;
GPIO_InitTypeDef GPIO_InitStructure; // 聲明一個結構體變量,用來初始化GPIO
// 使能串口的RCC時鐘
RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOB, ENABLE); // 使能UART3所在GPIOB的時鐘
RCC_APB1PeriphClockCmd(RCC_APB1Periph_USART3, ENABLE);
// 串口使用的GPIO口配置
// Configure USART3 Tx (PB.10) as alternate function push-pull
GPIO_InitStructure.GPIO_Pin = GPIO_Pin_10;
GPIO_InitStructure.GPIO_Speed = GPIO_Speed_50MHz;
GPIO_InitStructure.GPIO_Mode = GPIO_Mode_AF_PP;
GPIO_Init(GPIOB, &GPIO_InitStructure);
// Configure USART3 Rx (PB.11) as input floating
GPIO_InitStructure.GPIO_Pin = GPIO_Pin_11;
GPIO_InitStructure.GPIO_Mode = GPIO_Mode_IN_FLOATING;
GPIO_Init(GPIOB, &GPIO_InitStructure);
// 串口中斷配置
// Enable the USART3 Interrupt
NVIC_InitStructure.NVIC_IRQChannel = USART3_IRQn;
NVIC_InitStructure.NVIC_IRQChannelPreemptionPriority = 6; // 搶占優(yōu)先級3
NVIC_InitStructure.NVIC_IRQChannelSubPriority = 0; // 子優(yōu)先級2
NVIC_InitStructure.NVIC_IRQChannelCmd = ENABLE;
NVIC_Init(&NVIC_InitStructure);
// Enable USART3 Receive interrupts 使能串口接收中斷
USART_ITConfig(USART3, USART_IT_RXNE, ENABLE);
// 配置串口
USART_InitStructure.USART_BaudRate = baud;
USART_InitStructure.USART_WordLength = USART_WordLength_8b;
USART_InitStructure.USART_StopBits = USART_StopBits_1;
USART_InitStructure.USART_Parity = USART_Parity_No;
USART_InitStructure.USART_HardwareFlowControl = USART_HardwareFlowControl_None;
USART_InitStructure.USART_Mode = USART_Mode_Rx | USART_Mode_Tx;
// Configure USART3
USART_Init(USART3, &USART_InitStructure); // 配置串口3
// Enable the USART3
USART_Cmd(USART3, ENABLE); // 使能串口3
}
//串口發(fā)送字符串
void Usart3_SendString(unsigned char *str, unsigned short len)
{
unsigned short count = 0;
for (; count < len; count++)
{
USART_SendData(USART3, *str++);
while (USART_GetFlagStatus(USART3, USART_FLAG_TXE) == RESET)
;
while ((USART_GetFlagStatus(USART3, USART_FLAG_TC) == RESET))
; // 等待串口發(fā)送完畢
}
}
//==========================================================
// 函數(shù)名稱: USART3_IRQHandler
//
// 函數(shù)功能: 串口3收發(fā)中斷
//
// 入口參數(shù): 無
//
// 返回參數(shù): 無
//
// 說明:
//==========================================================
void USART3_IRQHandler(void)
{
if(USART_GetFlagStatus(USART3, USART_IT_RXNE))
{
if(esp8266_cnt >= sizeof(esp8266_buf)) esp8266_cnt = 0;
esp8266_buf[esp8266_cnt++] = USART_ReceiveData(USART3);
//1.處理函數(shù)最好不要寫在中斷函數(shù)里面,如果處理函數(shù)時間過長,會造成串口中斷觸發(fā)不及時,數(shù)據(jù)丟失
//2.建議最好使用操作系統(tǒng),將數(shù)據(jù)處理函數(shù)單獨寫成一個任務,或者使用一個標志位,收到就將標志位置1,然后在main函數(shù)中判斷處理,就能避免上面的情況
//3.我們在判斷是否收到mqtt消息時需要有一些特殊字段來進行判斷,比如我們可以在mqtt服務器發(fā)布消息時給每個消息都添加一個 ok 字段,便于我們識別
if(strstr((const char *)esp8266_buf, "success"))
{
printf("收到success主題\r\n");
ESP8266_Clear();
}
}
}
3.main.c
#include "esp8266.h"
int main()
{
USART3_Init(115200);
ESP8266_MQTT_Init();
while(1)
{
publishMYMSG();
delay_ms(2000);
}
}
4.運行結果
串口打印
mqtt服務器
四、結語
如果你有一些nodejs后端知識,你就可以將上述的mqtt服務進行優(yōu)化擴展,比如可以使用數(shù)據(jù)庫進行數(shù)據(jù)持久化,可以使用http服務進行前后端交互,在通過mqtt發(fā)布主題給esp8266進行響應,等等。
有什么問題歡迎大家提問,有什么錯誤也歡迎大家指出來。文章來源:http://www.zghlxwxcb.cn/news/detail-761223.html
創(chuàng)作不易,留下你寶貴的贊吧?。。?span toymoban-style="hidden">文章來源地址http://www.zghlxwxcb.cn/news/detail-761223.html
到了這里,關于從零開始用Nodejs搭建一個MQTT服務器,并且用stm32通過esp8266進行消息訂閱和發(fā)布的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!