xxx智慧管控一體化平臺mqtt穿透數(shù)據(jù)采集寫入方案
數(shù)據(jù)采集及寫入流程設(shè)計圖
一、硬件設(shè)備
硬件設(shè)備與原有設(shè)備保持不變通過配置dtu設(shè)備進行mqtt穿透功能進行數(shù)據(jù)交互
1、dtu配置詳解:
1.1 dtu工具
本項目使用塔石TAS-LTE-364支持4G無線dtu模塊,下載安裝塔石物聯(lián)網(wǎng)廠家提供的串口測試程序Tool V2.7.1 D20220616.exe
1.2打開程序選擇對應(yīng)dtu型號
1.3 配置串口
點擊右上角三角符號選擇端口(為你插入電腦的串口),波特率(dtu出廠默認9600),校驗參數(shù)選擇8,N,1;點擊打開串口
1.4 COM 口查看
電腦右鍵進入屬性界面,再進入設(shè)備管理界面,最后點擊“端口”查看
1.5 連接成功狀態(tài)
如果 COM 口及波特率等參數(shù)選擇正確,設(shè)備上電開機后會上報 AT Ready,證明設(shè)備啟動成功; 若沒有出現(xiàn),則先斷電再重新上電即可出現(xiàn);如下圖:
1.6 進入配置狀態(tài)
顯示框出現(xiàn) AT Ready 后,點擊“進入配置狀態(tài)”;如果 DTU 配置工具的右邊顯示框返回 OK,則代表 進入配置模式成功了;如下圖:
1.7 配置mqtt穿透通道
點擊通道1,工作模式選擇MQTT透傳,MQTT連接參數(shù),目標地址填寫搭建mqtt服務(wù)地址,目標端口1883(默認),設(shè)備賬戶和密碼為搭建MQTT服務(wù)設(shè)置的賬號密碼,clientId自己設(shè)置不能有中文,如下圖:
1.8 添加訂閱和推送
訂閱參數(shù)訂閱開關(guān)選擇訂閱輸入主題,訂閱指令選擇0、1或2,訂閱主題按照井廠編號/in或out(訂閱為in,推送為out)規(guī)則進行設(shè)置,推送參數(shù)設(shè)置與訂閱方法一致注意區(qū)分主題,如圖:
1.9 配置完成重啟
配置完成后先點擊一鍵配置參數(shù)等待右邊顯示框返回ok,在點擊重啟重啟后可在mqtt服務(wù)后臺查看設(shè)備是否上線;
二、搭建MQTT,rabbitmq以及開發(fā)應(yīng)用程序代碼;
1、MQQT服務(wù)搭建
1.1、下載并安裝
在官網(wǎng)https://www.emqx.io/zh/downloads進行emqx開源版安裝包下載并解壓;
1.2、命令行下進入解壓路徑,啟動 EMQX
./emqx/bin/emqx start
注:也可通過以下命令操作 EMQX,通過 命令行cmd進入emqx安裝目錄bin
emqx start #啟動服務(wù)
emqx ping #返回pong 連接正常
emqx stop #停止服務(wù)
1.3、在安裝環(huán)境下訪問http://localhost:18083 進入后臺管理頁面
默認賬號密碼
admin
public
2、rabbitMQ服務(wù)搭建
(對mqtt消息進行緩存避免直接寫入數(shù)據(jù)庫并發(fā)量大擊垮mysql造成服務(wù)宕機,起到流量削峰作用)
2.1 下載并安裝
進入rabbitmq官網(wǎng) https://www.rabbitmq.com 下載Erlang和與其對應(yīng)rabbitmq-server包
2.2 安裝Erlang
設(shè)置系統(tǒng)環(huán)境變量,然后在命令窗口輸入Erl返回版本號說明安裝成功
2.3 安裝rabbitmq
進入rabbitMQ安裝目錄的sbin目錄點擊上方的路徑框輸入cmd,按下回車鍵
輸入命令點擊回車
rabbitmq-plugins enable rabbitmq_management
重啟服務(wù),雙擊rabbitmq-server.bat(雙擊后可能需要等待一會)
打開瀏覽器,地址欄輸入http://127.0.0.1:15672 ,即可看到管理界面的登陸頁(賬號密碼guest)
3、 Java集成mqtt和rabbitmq
3.1 mqtt
3.11 添加依賴mqtt依賴
<!-- mqtt數(shù)據(jù)對接 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
3.12 mqtt配置類示例
/**
* <b>mqtt配置類</b>
*
* @date 2022/11/15
*/
public class MqttConfig {
/** 訂閱消息主題 - 可為多個 現(xiàn)指一個作為示例 */
public static final String TOPIC = "dtu/out";
/** QOS */
public static final Integer QOS = 0;
/** 鏈接地址 */
public static final String IP_ADDRESS = "tcp://localhost:1883"; //服務(wù)器地址
/** 用戶名 */
public static final String USERNAME = "admin";
/** 密碼 */
public static final String PASSWD = "admin@123";
/**輪詢查詢語句,該語句為modbus指令 */
public static final String [] datas={"0103012B007C35DF",
"010301B7000A7417",
"01030D20002AC773",
"010303D4001405B9",
"010303E80078C598",
"0103046000730501",
"010304D3000FF507",
"010304E2006E6520",
"0103055000698539",
"010305B90023D53A",
"010305DC00648517",
"01030640005F04AE",
"0103069F003734BA",
"010306D6005A2481",
"010307300055848E",
"01030785004B1560"};
/*QoS0 代表,Sender 發(fā)送的一條消息,Receiver 最多能收到一次,也就是說 Sender 盡力向 Receiver 發(fā)送消息,如果發(fā)送失敗,也就算了;
QoS1 代表,Sender 發(fā)送的一條消息,Receiver 至少能收到一次,也就是說 Sender 向 Receiver 發(fā)送消息,如果發(fā)送失敗,會繼續(xù)重試,直到 Receiver 收到消息為止,但是因為重傳的原因,Receiver 有可能會收到重復(fù)的消息;
QoS2 代表,Sender 發(fā)送的一條消息,Receiver 確保能收到而且只收到一次,也就是說 Sender 盡力向 Receiver 發(fā)送消息,如果發(fā)送失敗,會繼續(xù)重試,直到 Receiver 收到消息為止,同時保證 Receiver 不會因為消息重傳而收到重復(fù)的消息。*/
}
3.13 啟動mqtt客戶端代碼
包含連接狀態(tài)獲取,斷線重連,啟動后可在mqtt服務(wù)后臺查看連接
public class MqttInitialized{
/** MQTT客戶端 */
private static MqttClient client = null;
/** 連接選項 */
private static MqttConnectOptions connOpts = null;
/** 連接狀態(tài) */
private static Boolean connectStatus = false;
/**
* 設(shè)置連接信息
*/
static {
try {
// MQTT 連接選項
connOpts = new MqttConnectOptions();
// 設(shè)置認證信息
connOpts.setUserName(MqttConfig.USERNAME);
connOpts.setPassword(MqttConfig.PASSWD.toCharArray());
connOpts.setAutomaticReconnect(true);//啟用自動重新連接
// 持久化
MemoryPersistence persistence = new MemoryPersistence();
// MQ客戶端建立
client = new MqttClient(MqttConfig.IP_ADDRESS, "lxb-lunxun", persistence);
// 設(shè)置回調(diào)
client.setCallback(new MqttHandle());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* MQTT客戶端啟動
*/
@PostConstruct
public static void connect() {
try {
// 建立連接
client.connect(connOpts);
connectStatus = client.isConnected();
log.info("MQTT服務(wù)器連接成功~~~");
} catch (Exception e) {
connectStatus = client.isConnected();
log.error("MQTT服務(wù)器連接失敗!!");
e.printStackTrace();
reconnection();
}
}
/**
* 獲取MQTT客戶端連接狀態(tài)
* @return
*/
public static Boolean getConnectStatus() {
return connectStatus;
}
/**
* 斷線重連
*/
public static void reconnection() {
// 嘗試進行重新連接
while (true) {
if (MqttInitialized.getConnectStatus()) {
// 查詢連接狀態(tài) 連接成功則停止重連
break;
}
try {
log.info("開始進行MQTT服務(wù)器連接.......");
// 進行連接
connect();
Thread.sleep(10000);
} catch (Exception e) {
log.error("重新連接出現(xiàn)異常");
e.printStackTrace();
break;
}
}
}
}
3.14 使用mqtt消息發(fā)布獲取寄存器數(shù)據(jù)
此處只需要將modbus查詢指令發(fā)送給dtu,dtu得到指令會自動取獲取寄存器數(shù)據(jù),查詢語句為配置類中的datas數(shù)據(jù),發(fā)布的主題必須要與dtu設(shè)置的訂閱主題,消息質(zhì)量一致;使用循環(huán)每10秒發(fā)送一條進行查詢實現(xiàn)輪詢效果也可使用dtu自定義輪詢配置與此效果一致;
注意:在此還需要把modus指令16進制字符串轉(zhuǎn)換為Byte型數(shù)組16進制源字符串才能進行發(fā)送(mqtt發(fā)送消息只能是byte);
/**
* 消息發(fā)布
* @throws MqttException
*/
public void publish() throws MqttException, InterruptedException {
while (true){
for (int i=0; i<MqttConfig.datas.length;i++) {
byte[] bytes =HexStringToByte(MqttConfig.datas[i]);
MqttMessage mqttMessage = new MqttMessage(bytes);
mqttMessage.setQos(MqttConfig.QOS);
client.publish("dtu/in", mqttMessage);
System.out.println(MqttConfig.datas[i]);
Thread.sleep(10000);
}
}
}
3.15 16進制字符串轉(zhuǎn)換為Byte型數(shù)組16進制源字符串方法
/**
* 16進制字符串轉(zhuǎn)換為Byte型數(shù)組16進制源字符串
*
* @param
* @return Byte類型數(shù)組
*/
public static byte[] HexStringToByte(String hexString) {
hexString = hexString.replace(" ", "");
int len = hexString.length();
if (len % 2 != 0)
return null;
byte[] bufD = new byte[len / 2];
byte[] tmpBuf = hexString.getBytes();
int i = 0, j = 0;
for (i = 0; i < len; i++) {
if (tmpBuf[i] >= 0x30 && tmpBuf[i] <= 0x39)
tmpBuf[i] -= 0x30;
else if (tmpBuf[i] >= 0x41 && tmpBuf[i] <= 0x46)
tmpBuf[i] -= 0x37;
else if (tmpBuf[i] >= 0x61 && tmpBuf[i] <= 0x66)
tmpBuf[i] -= 0x57;
else
tmpBuf[i] = 0xF;
}
for (i = 0, j = 0; i < len; i += 2, j++) {
bufD[j] = (byte) ((tmpBuf[i] << 4) | tmpBuf[i + 1]);
}
return bufD;
}
3.16 mqtt消息訂閱主題
需要主題名稱以及消息質(zhì)量兩個參數(shù)(此處訂閱主題則為dtu設(shè)置的發(fā)布主題)
/**
* 消息訂閱
* @throws MqttException
*/
public static void subscribe(String topic, Integer qos) throws MqttException {
client.subscribe(topic,qos);
}
3.17 mqtt回調(diào)消息
處理查看訂閱主題的消息和發(fā)送消息成功的回調(diào)(也就是dtu獲取的寄存器數(shù)據(jù))需要實現(xiàn)MqttCallback接口;
注意:mqtt收到消息默認為byte要將byte轉(zhuǎn)為16進制字符串
public class MqttHandle implements MqttCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RedisTemplate redisTemplate;
private static MqttHandle mqttHandle;
@PostConstruct
private void init(){
mqttHandle = this;
rabbitTemplate=mqttHandle.rabbitTemplate;
redisTemplate=mqttHandle.redisTemplate;
}
/**
* 連接丟失
* @param cause
*/
@Override
public void connectionLost(Throwable cause) {
log.info("connection lost:" + cause.getMessage());
MqttInitialized.reconnection();
}
/**
* 收到消息
* @param topic
* @param message
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String string = bytesToHexString(message.getPayload());
}
/**
* 消息傳遞成功
* @param token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("發(fā)送消息---------" + token.isComplete());
}
/**
* byte[] ->string
* @param src
* @return
*/
public static String bytesToHexString(byte[] src){
StringBuilder stringBuilder = new StringBuilder("");
if (src == null || src.length <= 0) {
return null;
}
for (int i = 0; i < src.length; i++) {
int v = src[i] & 0xFF;
String hv = Integer.toHexString(v);
if (hv.length() < 2) {
stringBuilder.append(0);
}
stringBuilder.append(hv);
}
return stringBuilder.toString();
}
}
3.2 rabbitmq**
3.21 添加pom依賴
<!-- AMQP-rabbitmq 依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.22 添加yml配置
需要在rabbitmq管理頁面添加新賬號,guest賬號只能本地環(huán)境使用,開啟手動應(yīng)答避免重復(fù)消費
spring:
rabbitmq:
port: 5672
username: user
password: password
virtual-host: /
host: localhost
listener:
simple:
acknowledge-mode: manual #開啟手動應(yīng)答
prefetch: 1 #消費者每次消費一個消息
3.23 添加配置類
? 設(shè)置交換機、隊列、綁定關(guān)系、路由key(并開啟交換機及隊列持久消息默認持久避免rabbitmq服務(wù)重啟導(dǎo)致消息丟失)
//代碼示例
@Configuration
public class RabbitMqConfiguration {
//1.聲明direct模式交換機
@Bean
public DirectExchange directExchange(){
return new DirectExchange("direct_mqqt_exchage",true,false );
}
//direct模式隊列
@Bean
public Queue directMqttQueue(){
return new Queue("mqtt.direct.queue",true);
}
//3.聲明direct模式綁定關(guān)系
@Bean
public Binding directMqttBindings(){
return BindingBuilder.bind(directMqttQueue()).to(directExchange()).with("mqtt");
}
}
3.24 將mqtt查詢回調(diào)消息放入rabbitmq隊列
修改mqtt回調(diào)消息代碼,判斷消息如果已0103開頭則為查詢數(shù)據(jù),交換機和路由key必須和設(shè)置的一致;
@Component
@Slf4j
public class MqttHandle implements MqttCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
private static MqttHandle mqttHandle;
@PostConstruct
private void init(){
mqttHandle = this;
rabbitTemplate=mqttHandle.rabbitTemplate;
}
/**
* 收到消息
* @param topic
* @param message
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String string = bytesToHexString(message.getPayload());
if (ObjectUtil.isNotEmpty(string)&&string.startsWith("0103")){
String exchageName="direct_mqqt_exchage";
String routingKey="mqtt";
mqttHandle.rabbitTemplate.convertAndSend(exchageName,routingKey,string);//存儲消息
}
}
3.25 對rabbitmq中的消息持久到數(shù)據(jù)庫
在@RabbitListener注解中 填寫配置的隊列即可消費消息,開啟手動應(yīng)當(dāng)需手動確認消息
public class MessagesConsumer {
@Autowired
private MessageMapper mapper;
private static MessagesConsumer messagesConsumer;
@PostConstruct
private void init(){
messagesConsumer = this;
mapper=messagesConsumer.mapper;
}
@RabbitListener(queues = "mqtt.direct.queue")
public void processMessage(String msg,Message message, Channel channel) throws IOException {
// 獲得消息內(nèi)容
String messageId = message.getMessageProperties().getMessageId();
String s = new String(message.getBody(), StandardCharsets.UTF_8);
try {
/*
* 業(yè)務(wù)代碼,出現(xiàn) 異常則放回丟列不進行消費
* */
int count=0;
count=messagesConsumer.mapper.insert(s);
if (count >0){
System.out.println("插入成功");
//確認消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}else{
System.out.println("插入失敗");
//退回消息,將消息重新放回隊列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
} catch (Exception e) {
e.printStackTrace();
//退回消息,將消息重新放回隊列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
3.3 使用mqtt寫入單個保持寄存器
3.31 寫入單個寄存器指令組成如下,如果寫入成功則返回發(fā)送的指令
從機地址 | 功能碼 | 寄存器地址高位 | 寄存器地址低位 | 數(shù)據(jù)高位 | 數(shù)據(jù)低位 | CRC高位 | CRC低位 |
---|---|---|---|---|---|---|---|
01 | 06 | 00 | 01 | 00 | 03 | 98 | 0B |
1、從機地址和功能碼不表為0106,寄存器高低位地址和數(shù)據(jù)高低位地址需要前臺傳參,crc校驗通過計算獲得從而可以獲得完整的寫入指令;
2、使用mqtt發(fā)送寫入指令消息給對應(yīng)的dtu,并使用當(dāng)前用戶userId作為key,當(dāng)前時間設(shè)為value存入redis;
3、發(fā)送成功會返回發(fā)送的指令,在mqtt消息回調(diào)方法處將由0106開頭的消息+topic主題作為key,當(dāng)前時間設(shè)為value存入redis;
4、發(fā)送完消息后每隔1.5秒執(zhí)行一次判斷key為0106開頭的消息+topic是否大于key為userId的時間,執(zhí)行三次來判斷是否寫入成功;
5、如果為寫入成功則查詢一次修改保持寄存器的數(shù)據(jù)進行數(shù)據(jù)更新 ;
3.32 代碼實現(xiàn)
注意發(fā)送消息的topic與回調(diào)時放入到redis作為 key的topic不相同
@RestController
@RequestMapping("/mqtt")
@Slf4j
public class PublishController {
@Autowired
private MqttInitialized mqttInitialized;
@Autowired
private RedisTemplate redisTemplate;
/**
* 寫入保持寄存器
* @param addr
* @param data
* @param topic
* @return
* @throws InterruptedException
* @throws MqttException
*/
@PostMapping("/writeSingle")
public boolean writeSingle(@RequestParam String addr, @RequestParam String data, @RequestParam String topic) throws InterruptedException, MqttException {
if (ObjectUtil.isEmpty(addr)||ObjectUtil.isEmpty(data)||ObjectUtil.isEmpty(topic))return false;
ValueOperations<String, Long> valueOperations = redisTemplate.opsForValue();
String message="0106"+addr+data+ModbusCRCUtils.getCRC("0106"+addr+data);
valueOperations.set("userId",System.currentTimeMillis());
mqttInitialized.publish(topic,message);
for (int i = 0; i < 3; i++) {
if (ObjectUtil.isNotEmpty(valueOperations.get(message+"dtu/in"))&&valueOperations.get(message+"dtu/in")>(valueOperations.get("userId"))){
mqttInitialized.publish(topic,"0103012B007C35DF");//修改成功后查詢
return true;
}
Thread.sleep(1500);
}
return false;
}
}
3.33 CRC校驗碼計算原理
ModBus 通信協(xié)議的 CRC (冗余循環(huán)校驗碼含2個字節(jié), 即 16 位二進制數(shù))。
CRC 碼由發(fā)送設(shè)備計算, 放置于所發(fā)送信息幀的尾部。
接收信息設(shè)備再重新計算所接收信息 (除 CRC 之外的部分)的 CRC,
比較計算得到的 CRC 是否與接收到CRC相符, 如果兩者不相符, 則認為數(shù)據(jù)出錯。
1) 預(yù)置 1 個 16 位的寄存器為十六進制FFFF(即全為 1) , 稱此寄存器為 CRC寄存器。
2) 把第一個 8 位二進制數(shù)據(jù) (通信信息幀的第一個字節(jié)) 與 16 位的 CRC寄存器的低 8 位相異或, 把結(jié)果放于 CRC寄存器。
3) 把 CRC 寄存器的內(nèi)容右移一位(朝低位)用 0 填補最高位, 并檢查右移后的移出位。
4) 如果移出位為 0, 重復(fù)第 3 步 (再次右移一位); 如果移出位為 1, CRC 寄存器與多項式A001 ( 1010 0000 0000 0001) 進行異或。
5) 重復(fù)步驟 3 和步驟 4, 直到右移 8 次,這樣整個8位數(shù)據(jù)全部進行了處理。
6) 重復(fù)步驟 2 到步驟 5, 進行通信信息幀下一個字節(jié)的處理。
7) 將該通信信息幀所有字節(jié)按上述步驟計算完成后,得到的16位CRC寄存器的高、低字節(jié)進行交換。
8) 最后得到的 CRC寄存器內(nèi)容即為 CRC碼。
/**
* <b>計算CRC16校驗碼工具類</b>
*
* @author Lixubo
* @date 2023/6/2
*/
public class ModbusCRCUtils {
/**
* 計算CRC16校驗碼
* @param str16
* @return
*/
public static String getCRC(String str16) {
byte[] bytes = HexStringToByte(str16);
int CRC = 0x0000ffff;
int POLYNOMIAL = 0x0000a001;
int i, j;
for (i = 0; i < bytes.length; i++) {
CRC ^= (int) bytes[i];
for (j = 0; j < 8; j++) {
if ((CRC & 0x00000001) == 1) {
CRC >>= 1;
CRC ^= POLYNOMIAL;
} else {
CRC >>= 1;
}
}
}
//高低位轉(zhuǎn)換,看情況使用
CRC = ( (CRC & 0x0000FF00) >> 8) | ( (CRC & 0x000000FF ) << 8);
return Integer.toHexString(CRC);
}
/**
* 16進制字符串轉(zhuǎn)換為Byte型數(shù)組16進制源字符串
*
* @param
* @return Byte類型數(shù)組
*/
public static byte[] HexStringToByte(String hexString) {
hexString = hexString.replace(" ", "");
int len = hexString.length();
if (len % 2 != 0)
return null;
byte[] bufD = new byte[len / 2];
byte[] tmpBuf = hexString.getBytes();
int i = 0, j = 0;
for (i = 0; i < len; i++) {
if (tmpBuf[i] >= 0x30 && tmpBuf[i] <= 0x39)
tmpBuf[i] -= 0x30;
else if (tmpBuf[i] >= 0x41 && tmpBuf[i] <= 0x46)
tmpBuf[i] -= 0x37;
else if (tmpBuf[i] >= 0x61 && tmpBuf[i] <= 0x66)
tmpBuf[i] -= 0x57;
else
tmpBuf[i] = 0xF;
}
for (i = 0, j = 0; i < len; i += 2, j++) {
bufD[j] = (byte) ((tmpBuf[i] << 4) | tmpBuf[i + 1]);
}
return bufD;
}
}
3.34 調(diào)整mqtt消息回調(diào)代碼加入redis
/**
* <b></b>
*
* @author Lixubo
* @date 2022/11/15
*/
@Component
@Slf4j
public class MqttHandle implements MqttCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RedisTemplate redisTemplate;
private static MqttHandle mqttHandle;
@PostConstruct
private void init(){
mqttHandle = this;
rabbitTemplate=mqttHandle.rabbitTemplate;
redisTemplate=mqttHandle.redisTemplate;
/**
* 收到消息
* @param topic
* @param message
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
ValueOperations<String, Long> valueOperations = mqttHandle.redisTemplate.opsForValue();
String string = bytesToHexString(message.getPayload());
if (ObjectUtil.isNotEmpty(string)&&string.startsWith("0106")){
valueOperations.set(string.concat(topic),System.currentTimeMillis());
System.out.println("=========="+string+topic+"==========");
}
}
/**
* byte[] ->string
* @param src
* @return
*/
public static String bytesToHexString(byte[] src){
StringBuilder stringBuilder = new StringBuilder("");
if (src == null || src.length <= 0) {
return null;
}
for (int i = 0; i < src.length; i++) {
int v = src[i] & 0xFF;
String hv = Integer.toHexString(v);
if (hv.length() < 2) {
stringBuilder.append(0);
}
stringBuilder.append(hv);
}
return stringBuilder.toString();
}
}
3.4 根據(jù)clientId獲取客戶端連接狀態(tài)(emqx官方提供的api)
3.41 在emqx管理后臺添加api密鑰進行訪問授權(quán)
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-2sBqFGXe-1686193465745)(C:\Users\Lixubo\AppData\Roaming\Typora\typora-user-images\image-20230606173903317.png)]
3.42 添加pom依賴
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.8.1</version>
</dependency>
public static boolean getClientStstus(String clientId) {
// clientId="175-4";
try {
/*
服務(wù)端生成
*/
String username = "1f408e6ac4af426a";
String password = "qEEdNrgr4FI6q289B5wJy9BXOir25eoKdG2ygxsEu5onA";
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("http://localhost:18083/api/v5/clients/"+clientId)
.header("Content-Type", "application/json")
.header("Authorization", Credentials.basic(username, password))
.build();
Response response = client.newCall(request).execute();
JSONObject jsonObject=JSONObject.parseObject(response.body().string());
System.out.println(jsonObject.toString());
if (ObjectUtil.isNotEmpty(jsonObject.getBoolean("connected"))&&jsonObject.getBoolean("connected")){
return true;
}
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
三、數(shù)據(jù)層使用mysql數(shù)據(jù)持久,redis作為緩存;
四、 其它注意事項
1、mqtt發(fā)布消息查詢數(shù)據(jù)如果為同一個寄存器時注意消息間隔時間必須大于Modbus 超時時間
2、Modbus 超時時間計算:
如果通訊速率為9600時,按照常規(guī)的Modbus RTU,8個數(shù)據(jù)位、1個停止位、偶校驗方式,每傳輸1個字節(jié)數(shù)據(jù)需要的時間為:(8+1+1)/9600=1.04ms/Byte
因此,主站發(fā)出響應(yīng)到從站返回數(shù)據(jù)的時間周期為:(8+5+2*n)*1.04+T1+T2,其中n為寄存器個數(shù),T1為從站的響應(yīng)時間(如果是PLC,則為PLC的掃描時間),T2為通訊余量,一般為20~50ms。如果讀取10個字的數(shù)據(jù),從站響應(yīng)時間為50ms,則整個周期為:(8+5+2x10)x1.04+50+50=134.32ms。因此,超時時間必須大于134.32ms,可以設(shè)置為150ms以上。
如果超時時間太短,響應(yīng)不能完全返回,通訊會報錯。
如果超時時間設(shè)置太長,按照上述例子,一共10個從站,每個從站讀取10個字的數(shù)據(jù),
如果超時時間設(shè)置為1s,如果有1個從站出現(xiàn)故障,則整個輪詢周期為:1000+9*134.32=2208.88 ms。
同理,如果超時時間設(shè)為150ms,則整個輪詢周期為:150+9*134.32=1358.88 ms。
因此,可以看出正確的超時時間設(shè)置可縮短整個輪詢周期,不正確的設(shè)置,將導(dǎo)致通訊出錯或整個通訊周期過長。
3、mqtt多主題訂閱通配符規(guī)則
3.1 主題層級分隔符/
/ 被用來分割主題樹的每一層,并給主題空間提供分等級的結(jié)構(gòu)。當(dāng)兩個通配符在一個主題中出現(xiàn)的時候,主題層次分隔符的使用是很重要的。
3.2 多層通配符#
#是一個匹配主題中任意層次數(shù)的通配符。比如說,如果你訂閱了finance/stock/ibm/#,你就可以接收到以下這些主題的消息。
finance/stock/ibm
finance/stock/ibm/closingprice
finance/stock/ibm/currentprice
多層通配符有可以表示大于等于0的層次。因此,finance/#也可以匹配到單獨的finance,在這種情況下#代表0層。在這種語境下主題層次分隔符/就沒有意義了。因為沒有可以分的層次。
多層通配符只可以確定當(dāng)前層或者下一層。因此,#和finance/#都是有效的,但是finance#不是有效的。多層通配符一定要是主題樹的最后一個字符。比如說,finance/#是有效的,但是finance/#/closingprice是無效的。
3.3 單層通配符+
+只匹配主題的一層。比如說,finance/stock/+匹配finance/stock/ibm和finance/stock/xyz,但是不匹配finance/stock/ibm/closingprice。另外,因為單層通配符只匹配1層,finance/+不匹配finance。
單層通配符可以被用于主題樹的任意層級,連帶多層通配符。它必須被用在主題層級分隔符/的右邊,除非它是指定自己。因此,+和finance/+都是有效的,但是finance+無效。單層通配符可以用在主題樹的末端,也可以用在中間。比如說,finance/+和finance/+/ibm都是有效的。
3.4 主題語法和用法
當(dāng)你建立一個應(yīng)用,設(shè)計主題樹的時候應(yīng)該考慮以下的主題名字的語法和語義:
主題至少有一個字符長。
主題名字是大小寫敏感的。比如說,ACCOUNTS和Accounts是兩個不同的主題。
主題名字可以包含空格。比如,Accounts payable是一個有效的主題。
以/開頭會產(chǎn)生一個不同的主題。比如說,/finnace與finance不同。/finance匹配"+/+"和/+,但不匹配+
不要在任何主題中包含null(Unicode \x0000)字符。
以下的原則應(yīng)用于主題樹的建造和內(nèi)容文章來源:http://www.zghlxwxcb.cn/news/detail-776095.html
在主題樹中,長度被限制于64k內(nèi)但是在這以內(nèi)沒有限制層級的數(shù)目 。
可以有任意數(shù)目的根節(jié)點;也就是說,可以有任意數(shù)目的主題樹。文章來源地址http://www.zghlxwxcb.cn/news/detail-776095.html
層。在這種語境下主題層次分隔符/就沒有意義了。因為沒有可以分的層次。
多層通配符只可以確定當(dāng)前層或者下一層。因此,#和finance/#都是有效的,但是finance#不是有效的。多層通配符一定要是主題樹的最后一個字符。比如說,finance/#是有效的,但是finance/#/closingprice是無效的。
3.3 單層通配符+
+只匹配主題的一層。比如說,finance/stock/+匹配finance/stock/ibm和finance/stock/xyz,但是不匹配finance/stock/ibm/closingprice。另外,因為單層通配符只匹配1層,finance/+不匹配finance。
單層通配符可以被用于主題樹的任意層級,連帶多層通配符。它必須被用在主題層級分隔符/的右邊,除非它是指定自己。因此,+和finance/+都是有效的,但是finance+無效。單層通配符可以用在主題樹的末端,也可以用在中間。比如說,finance/+和finance/+/ibm都是有效的。
3.4 主題語法和用法
當(dāng)你建立一個應(yīng)用,設(shè)計主題樹的時候應(yīng)該考慮以下的主題名字的語法和語義:
主題至少有一個字符長。
主題名字是大小寫敏感的。比如說,ACCOUNTS和Accounts是兩個不同的主題。
主題名字可以包含空格。比如,Accounts payable是一個有效的主題。
以/開頭會產(chǎn)生一個不同的主題。比如說,/finnace與finance不同。/finance匹配"+/+"和/+,但不匹配+
不要在任何主題中包含null(Unicode \x0000)字符。
以下的原則應(yīng)用于主題樹的建造和內(nèi)容
在主題樹中,長度被限制于64k內(nèi)但是在這以內(nèi)沒有限制層級的數(shù)目 。
可以有任意數(shù)目的根節(jié)點;也就是說,可以有任意數(shù)目的主題樹。
到了這里,關(guān)于java集成mqtt、rabbitmq服務(wù)遠程連接數(shù)dtu實現(xiàn)寄存器rtu數(shù)據(jù)讀寫的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!