国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

java集成mqtt、rabbitmq服務(wù)遠程連接數(shù)dtu實現(xiàn)寄存器rtu數(shù)據(jù)讀寫

這篇具有很好參考價值的文章主要介紹了java集成mqtt、rabbitmq服務(wù)遠程連接數(shù)dtu實現(xiàn)寄存器rtu數(shù)據(jù)讀寫。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

xxx智慧管控一體化平臺mqtt穿透數(shù)據(jù)采集寫入方案

數(shù)據(jù)采集及寫入流程設(shè)計圖

連接dtu 服務(wù)端java,網(wǎng)絡(luò),物聯(lián)網(wǎng),linux


一、硬件設(shè)備

硬件設(shè)備與原有設(shè)備保持不變通過配置dtu設(shè)備進行mqtt穿透功能進行數(shù)據(jù)交互

1、dtu配置詳解:
1.1 dtu工具

本項目使用塔石TAS-LTE-364支持4G無線dtu模塊,下載安裝塔石物聯(lián)網(wǎng)廠家提供的串口測試程序Tool V2.7.1 D20220616.exe

連接dtu 服務(wù)端java,網(wǎng)絡(luò),物聯(lián)網(wǎng),linux

1.2打開程序選擇對應(yīng)dtu型號

連接dtu 服務(wù)端java,網(wǎng)絡(luò),物聯(lián)網(wǎng),linux

1.3 配置串口

點擊右上角三角符號選擇端口(為你插入電腦的串口),波特率(dtu出廠默認9600),校驗參數(shù)選擇8,N,1;點擊打開串口

連接dtu 服務(wù)端java,網(wǎng)絡(luò),物聯(lián)網(wǎng),linux

1.4 COM 口查看

電腦右鍵進入屬性界面,再進入設(shè)備管理界面,最后點擊“端口”查看

連接dtu 服務(wù)端java,網(wǎng)絡(luò),物聯(lián)網(wǎng),linux

1.5 連接成功狀態(tài)

如果 COM 口及波特率等參數(shù)選擇正確,設(shè)備上電開機后會上報 AT Ready,證明設(shè)備啟動成功; 若沒有出現(xiàn),則先斷電再重新上電即可出現(xiàn);如下圖:

連接dtu 服務(wù)端java,網(wǎng)絡(luò),物聯(lián)網(wǎng),linux

1.6 進入配置狀態(tài)

顯示框出現(xiàn) AT Ready 后,點擊“進入配置狀態(tài)”;如果 DTU 配置工具的右邊顯示框返回 OK,則代表 進入配置模式成功了;如下圖:

連接dtu 服務(wù)端java,網(wǎng)絡(luò),物聯(lián)網(wǎng),linux

1.7 配置mqtt穿透通道

點擊通道1,工作模式選擇MQTT透傳,MQTT連接參數(shù),目標地址填寫搭建mqtt服務(wù)地址,目標端口1883(默認),設(shè)備賬戶和密碼為搭建MQTT服務(wù)設(shè)置的賬號密碼,clientId自己設(shè)置不能有中文,如下圖:

連接dtu 服務(wù)端java,網(wǎng)絡(luò),物聯(lián)網(wǎng),linux

1.8 添加訂閱和推送

訂閱參數(shù)訂閱開關(guān)選擇訂閱輸入主題,訂閱指令選擇0、1或2,訂閱主題按照井廠編號/in或out(訂閱為in,推送為out)規(guī)則進行設(shè)置,推送參數(shù)設(shè)置與訂閱方法一致注意區(qū)分主題,如圖:

連接dtu 服務(wù)端java,網(wǎng)絡(luò),物聯(lián)網(wǎng),linux

1.9 配置完成重啟

配置完成后先點擊一鍵配置參數(shù)等待右邊顯示框返回ok,在點擊重啟重啟后可在mqtt服務(wù)后臺查看設(shè)備是否上線;

二、搭建MQTT,rabbitmq以及開發(fā)應(yīng)用程序代碼;

1、MQQT服務(wù)搭建
1.1、下載并安裝

在官網(wǎng)https://www.emqx.io/zh/downloads進行emqx開源版安裝包下載并解壓;

1.2、命令行下進入解壓路徑,啟動 EMQX
./emqx/bin/emqx start
注:也可通過以下命令操作 EMQX,通過 命令行cmd進入emqx安裝目錄bin
emqx start  #啟動服務(wù)
emqx ping  #返回pong 連接正常
emqx stop  #停止服務(wù)
1.3、在安裝環(huán)境下訪問http://localhost:18083 進入后臺管理頁面

默認賬號密碼

admin
public

連接dtu 服務(wù)端java,網(wǎng)絡(luò),物聯(lián)網(wǎng),linux

2、rabbitMQ服務(wù)搭建

(對mqtt消息進行緩存避免直接寫入數(shù)據(jù)庫并發(fā)量大擊垮mysql造成服務(wù)宕機,起到流量削峰作用)

2.1 下載并安裝

進入rabbitmq官網(wǎng) https://www.rabbitmq.com 下載Erlang和與其對應(yīng)rabbitmq-server包

2.2 安裝Erlang

設(shè)置系統(tǒng)環(huán)境變量,然后在命令窗口輸入Erl返回版本號說明安裝成功

連接dtu 服務(wù)端java,網(wǎng)絡(luò),物聯(lián)網(wǎng),linux
連接dtu 服務(wù)端java,網(wǎng)絡(luò),物聯(lián)網(wǎng),linux

連接dtu 服務(wù)端java,網(wǎng)絡(luò),物聯(lián)網(wǎng),linux

2.3 安裝rabbitmq

進入rabbitMQ安裝目錄的sbin目錄點擊上方的路徑框輸入cmd,按下回車鍵

連接dtu 服務(wù)端java,網(wǎng)絡(luò),物聯(lián)網(wǎng),linux

連接dtu 服務(wù)端java,網(wǎng)絡(luò),物聯(lián)網(wǎng),linux

輸入命令點擊回車

 rabbitmq-plugins enable rabbitmq_management

重啟服務(wù),雙擊rabbitmq-server.bat(雙擊后可能需要等待一會)連接dtu 服務(wù)端java,網(wǎng)絡(luò),物聯(lián)網(wǎng),linux

打開瀏覽器,地址欄輸入http://127.0.0.1:15672 ,即可看到管理界面的登陸頁(賬號密碼guest)

連接dtu 服務(wù)端java,網(wǎng)絡(luò),物聯(lián)網(wǎng),linux

3、 Java集成mqtt和rabbitmq
3.1 mqtt
3.11 添加依賴mqtt依賴
<!-- mqtt數(shù)據(jù)對接 -->
    <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.2.5</version>
    </dependency>
3.12 mqtt配置類示例
/**
 * <b>mqtt配置類</b>
 *
 * @date 2022/11/15
 */
public class MqttConfig {
	/** 訂閱消息主題 - 可為多個 現(xiàn)指一個作為示例 */
	public static final String TOPIC = "dtu/out";
	/** QOS */
	public static final Integer QOS = 0;
	/** 鏈接地址 */
	public static final String IP_ADDRESS = "tcp://localhost:1883"; //服務(wù)器地址
	/** 用戶名 */
	public static final String USERNAME = "admin";
	/** 密碼 */
	public static final String PASSWD = "admin@123";
    /**輪詢查詢語句,該語句為modbus指令 */
	public static final String []  datas={"0103012B007C35DF",
										"010301B7000A7417",
										"01030D20002AC773",
										"010303D4001405B9",
										"010303E80078C598",
										"0103046000730501",
										"010304D3000FF507",
										"010304E2006E6520",
										"0103055000698539",
										"010305B90023D53A",
										"010305DC00648517",
										"01030640005F04AE",
										"0103069F003734BA",
										"010306D6005A2481",
										"010307300055848E",
										"01030785004B1560"};
	/*QoS0 代表,Sender 發(fā)送的一條消息,Receiver 最多能收到一次,也就是說 Sender 盡力向 Receiver 發(fā)送消息,如果發(fā)送失敗,也就算了;
    QoS1 代表,Sender 發(fā)送的一條消息,Receiver 至少能收到一次,也就是說 Sender 向 Receiver 發(fā)送消息,如果發(fā)送失敗,會繼續(xù)重試,直到 Receiver 收到消息為止,但是因為重傳的原因,Receiver 有可能會收到重復(fù)的消息;
    QoS2 代表,Sender 發(fā)送的一條消息,Receiver 確保能收到而且只收到一次,也就是說 Sender 盡力向 Receiver 發(fā)送消息,如果發(fā)送失敗,會繼續(xù)重試,直到 Receiver 收到消息為止,同時保證 Receiver 不會因為消息重傳而收到重復(fù)的消息。*/
}

3.13 啟動mqtt客戶端代碼

包含連接狀態(tài)獲取,斷線重連,啟動后可在mqtt服務(wù)后臺查看連接

public class MqttInitialized{
	/** MQTT客戶端 */
	private static MqttClient client = null;
	/** 連接選項 */
	private static MqttConnectOptions connOpts = null;
	/** 連接狀態(tài) */
	private static Boolean connectStatus = false;

	/**
	 * 設(shè)置連接信息
	 */
	static {
		try {
			// MQTT 連接選項
			connOpts = new MqttConnectOptions();
			// 設(shè)置認證信息
			connOpts.setUserName(MqttConfig.USERNAME);
			connOpts.setPassword(MqttConfig.PASSWD.toCharArray());
			connOpts.setAutomaticReconnect(true);//啟用自動重新連接
			//  持久化
			MemoryPersistence persistence = new MemoryPersistence();
			// MQ客戶端建立
			client = new MqttClient(MqttConfig.IP_ADDRESS, "lxb-lunxun", persistence);
			// 設(shè)置回調(diào)
			client.setCallback(new MqttHandle());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
			/**
			 * MQTT客戶端啟動
			 */
			@PostConstruct
			public static void connect() {
				try {
					// 建立連接
					client.connect(connOpts);
					connectStatus = client.isConnected();
					log.info("MQTT服務(wù)器連接成功~~~");
				} catch (Exception e) {
					connectStatus = client.isConnected();
					log.error("MQTT服務(wù)器連接失敗!!");
					e.printStackTrace();
					reconnection();
				}
			}
			
			/**
             * 獲取MQTT客戶端連接狀態(tài)
             * @return
             */
            public static Boolean getConnectStatus() {
                return connectStatus;
            }

			/**
			 * 斷線重連
			 */
			public static void reconnection() {
				// 嘗試進行重新連接
				while (true) {
					if (MqttInitialized.getConnectStatus()) {
						// 查詢連接狀態(tài) 連接成功則停止重連
						break;
					}
					try {
						log.info("開始進行MQTT服務(wù)器連接.......");
						// 進行連接
						connect();
						Thread.sleep(10000);
					} catch (Exception e) {
						log.error("重新連接出現(xiàn)異常");
						e.printStackTrace();
						break;
					}
				}
			}
		}
3.14 使用mqtt消息發(fā)布獲取寄存器數(shù)據(jù)

此處只需要將modbus查詢指令發(fā)送給dtu,dtu得到指令會自動取獲取寄存器數(shù)據(jù),查詢語句為配置類中的datas數(shù)據(jù),發(fā)布的主題必須要與dtu設(shè)置的訂閱主題,消息質(zhì)量一致;使用循環(huán)每10秒發(fā)送一條進行查詢實現(xiàn)輪詢效果也可使用dtu自定義輪詢配置與此效果一致;

注意:在此還需要把modus指令16進制字符串轉(zhuǎn)換為Byte型數(shù)組16進制源字符串才能進行發(fā)送(mqtt發(fā)送消息只能是byte);

			/**
			 * 消息發(fā)布
			 * @throws MqttException
			 */
			public  void publish() throws MqttException, InterruptedException {
				while (true){
					for (int i=0; i<MqttConfig.datas.length;i++) {
						byte[] bytes =HexStringToByte(MqttConfig.datas[i]);
						MqttMessage mqttMessage = new MqttMessage(bytes);
						mqttMessage.setQos(MqttConfig.QOS);
						client.publish("dtu/in", mqttMessage);
						System.out.println(MqttConfig.datas[i]);
						Thread.sleep(10000);
					}
				}
			}
3.15 16進制字符串轉(zhuǎn)換為Byte型數(shù)組16進制源字符串方法
	/**
	 * 16進制字符串轉(zhuǎn)換為Byte型數(shù)組16進制源字符串
	 *
	 * @param
	 * @return Byte類型數(shù)組
	 */
	public static byte[] HexStringToByte(String hexString) {
		hexString = hexString.replace(" ", "");
		int len = hexString.length();
		if (len % 2 != 0)
			return null;
		byte[] bufD = new byte[len / 2];
		byte[] tmpBuf = hexString.getBytes();
		int i = 0, j = 0;
		for (i = 0; i < len; i++) {
			if (tmpBuf[i] >= 0x30 && tmpBuf[i] <= 0x39)
				tmpBuf[i] -= 0x30;
			else if (tmpBuf[i] >= 0x41 && tmpBuf[i] <= 0x46)
				tmpBuf[i] -= 0x37;
			else if (tmpBuf[i] >= 0x61 && tmpBuf[i] <= 0x66)
				tmpBuf[i] -= 0x57;
			else
				tmpBuf[i] = 0xF;
		}
		for (i = 0, j = 0; i < len; i += 2, j++) {
			bufD[j] = (byte) ((tmpBuf[i] << 4) | tmpBuf[i + 1]);
		}
		return bufD;
	}
3.16 mqtt消息訂閱主題

需要主題名稱以及消息質(zhì)量兩個參數(shù)(此處訂閱主題則為dtu設(shè)置的發(fā)布主題)

			/**
			 * 消息訂閱
			 * @throws MqttException
			 */
	public static void subscribe(String topic, Integer qos) throws MqttException {
				client.subscribe(topic,qos);
			}
3.17 mqtt回調(diào)消息

處理查看訂閱主題的消息和發(fā)送消息成功的回調(diào)(也就是dtu獲取的寄存器數(shù)據(jù))需要實現(xiàn)MqttCallback接口;

注意:mqtt收到消息默認為byte要將byte轉(zhuǎn)為16進制字符串

public class MqttHandle implements MqttCallback {

	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Autowired
	private RedisTemplate redisTemplate;
	private static MqttHandle mqttHandle;
	@PostConstruct
	private void init(){
		mqttHandle = this;
		rabbitTemplate=mqttHandle.rabbitTemplate;
		redisTemplate=mqttHandle.redisTemplate;
	}
	/**
	 * 連接丟失
	 * @param cause
	 */
	@Override
	public void connectionLost(Throwable cause) {
		log.info("connection lost:" + cause.getMessage());
		MqttInitialized.reconnection();
	}

	/**
	 * 收到消息
	 * @param topic
	 * @param message
	 */
	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception {
		String string = bytesToHexString(message.getPayload());
	}

	/**
	 * 消息傳遞成功
	 * @param token
	 */
	@Override
	public void deliveryComplete(IMqttDeliveryToken token) {
		System.out.println("發(fā)送消息---------" + token.isComplete());
	}

	/**
	 * byte[] ->string
	 * @param src
	 * @return
	 */
	public static String bytesToHexString(byte[] src){
	   StringBuilder stringBuilder = new StringBuilder("");
	    if (src == null || src.length <= 0) {
				return null;
		     }
	     for (int i = 0; i < src.length; i++) {
		        int v = src[i] & 0xFF;
		        String hv = Integer.toHexString(v);
		        if (hv.length() < 2) {
			             stringBuilder.append(0);
			         }
		         stringBuilder.append(hv);
		     }
	    return stringBuilder.toString();
	}

}
3.2 rabbitmq**
3.21 添加pom依賴
  <!-- AMQP-rabbitmq 依賴 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
3.22 添加yml配置

需要在rabbitmq管理頁面添加新賬號,guest賬號只能本地環(huán)境使用,開啟手動應(yīng)答避免重復(fù)消費

spring:
  rabbitmq:
    port: 5672
    username: user
    password: password
    virtual-host: /
    host: localhost
    listener:
      simple:
        acknowledge-mode: manual #開啟手動應(yīng)答
        prefetch: 1 #消費者每次消費一個消息
3.23 添加配置類

? 設(shè)置交換機、隊列、綁定關(guān)系、路由key(并開啟交換機及隊列持久消息默認持久避免rabbitmq服務(wù)重啟導(dǎo)致消息丟失)

//代碼示例
@Configuration
public class RabbitMqConfiguration {
	//1.聲明direct模式交換機
	@Bean
	public DirectExchange directExchange(){
		return new DirectExchange("direct_mqqt_exchage",true,false );
	}
	//direct模式隊列
	@Bean
	public Queue directMqttQueue(){
		return new Queue("mqtt.direct.queue",true);
	}
	//3.聲明direct模式綁定關(guān)系
	@Bean
	public Binding directMqttBindings(){
		return BindingBuilder.bind(directMqttQueue()).to(directExchange()).with("mqtt");
	}
}

3.24 將mqtt查詢回調(diào)消息放入rabbitmq隊列

修改mqtt回調(diào)消息代碼,判斷消息如果已0103開頭則為查詢數(shù)據(jù),交換機和路由key必須和設(shè)置的一致;


@Component
@Slf4j
public class MqttHandle implements MqttCallback {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	private static MqttHandle mqttHandle;
	@PostConstruct
	private void init(){
		mqttHandle = this;
		rabbitTemplate=mqttHandle.rabbitTemplate;
	}

	/**
	 * 收到消息
	 * @param topic
	 * @param message
	 */
	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception {
		String string = bytesToHexString(message.getPayload());
		if (ObjectUtil.isNotEmpty(string)&&string.startsWith("0103")){
				String exchageName="direct_mqqt_exchage";
				String routingKey="mqtt";
				mqttHandle.rabbitTemplate.convertAndSend(exchageName,routingKey,string);//存儲消息
		}
	}
3.25 對rabbitmq中的消息持久到數(shù)據(jù)庫

在@RabbitListener注解中 填寫配置的隊列即可消費消息,開啟手動應(yīng)當(dāng)需手動確認消息

public class MessagesConsumer {
	@Autowired
	private MessageMapper mapper;
	private static MessagesConsumer messagesConsumer;
	@PostConstruct
	private void init(){
		messagesConsumer = this;
		mapper=messagesConsumer.mapper;
	}
	@RabbitListener(queues = "mqtt.direct.queue")
	public void processMessage(String msg,Message message, Channel channel) throws IOException {
//		獲得消息內(nèi)容
		String messageId = message.getMessageProperties().getMessageId();
		String s = new String(message.getBody(), StandardCharsets.UTF_8);
		try {
			/*
			 *   業(yè)務(wù)代碼,出現(xiàn) 異常則放回丟列不進行消費
			 * */
			int count=0;
				count=messagesConsumer.mapper.insert(s);
			if (count >0){
				System.out.println("插入成功");
				//確認消息
				channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
			}else{
				System.out.println("插入失敗");
				//退回消息,將消息重新放回隊列
				channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
			}
		} catch (Exception e) {
			e.printStackTrace();
			//退回消息,將消息重新放回隊列
			channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
		}
	}
}
3.3 使用mqtt寫入單個保持寄存器
3.31 寫入單個寄存器指令組成如下,如果寫入成功則返回發(fā)送的指令
從機地址 功能碼 寄存器地址高位 寄存器地址低位 數(shù)據(jù)高位 數(shù)據(jù)低位 CRC高位 CRC低位
01 06 00 01 00 03 98 0B

1、從機地址和功能碼不表為0106,寄存器高低位地址和數(shù)據(jù)高低位地址需要前臺傳參,crc校驗通過計算獲得從而可以獲得完整的寫入指令;

2、使用mqtt發(fā)送寫入指令消息給對應(yīng)的dtu,并使用當(dāng)前用戶userId作為key,當(dāng)前時間設(shè)為value存入redis;

3、發(fā)送成功會返回發(fā)送的指令,在mqtt消息回調(diào)方法處將由0106開頭的消息+topic主題作為key,當(dāng)前時間設(shè)為value存入redis;

4、發(fā)送完消息后每隔1.5秒執(zhí)行一次判斷key為0106開頭的消息+topic是否大于key為userId的時間,執(zhí)行三次來判斷是否寫入成功;

5、如果為寫入成功則查詢一次修改保持寄存器的數(shù)據(jù)進行數(shù)據(jù)更新 ;

3.32 代碼實現(xiàn)

注意發(fā)送消息的topic與回調(diào)時放入到redis作為 key的topic不相同

@RestController
@RequestMapping("/mqtt")
@Slf4j
public class PublishController {
	@Autowired
	private MqttInitialized mqttInitialized;

	@Autowired
	private RedisTemplate redisTemplate;
	/**
	 * 寫入保持寄存器
	 * @param addr
	 * @param data
	 * @param topic
	 * @return
	 * @throws InterruptedException
	 * @throws MqttException
	 */
	@PostMapping("/writeSingle")
	public boolean writeSingle(@RequestParam  String addr, @RequestParam String data, @RequestParam String topic) throws InterruptedException, MqttException {
		if (ObjectUtil.isEmpty(addr)||ObjectUtil.isEmpty(data)||ObjectUtil.isEmpty(topic))return false;
		ValueOperations<String, Long> valueOperations = redisTemplate.opsForValue();
		String  message="0106"+addr+data+ModbusCRCUtils.getCRC("0106"+addr+data);
		valueOperations.set("userId",System.currentTimeMillis());
		mqttInitialized.publish(topic,message);
		for (int i = 0; i < 3; i++) {
			if (ObjectUtil.isNotEmpty(valueOperations.get(message+"dtu/in"))&&valueOperations.get(message+"dtu/in")>(valueOperations.get("userId"))){
				mqttInitialized.publish(topic,"0103012B007C35DF");//修改成功后查詢
				return true;
			}
			Thread.sleep(1500);
		}
		return false;
	}

}
3.33 CRC校驗碼計算原理
ModBus 通信協(xié)議的 CRC (冗余循環(huán)校驗碼含2個字節(jié), 即 16 位二進制數(shù))。
CRC 碼由發(fā)送設(shè)備計算, 放置于所發(fā)送信息幀的尾部。
接收信息設(shè)備再重新計算所接收信息 (除 CRC 之外的部分)的 CRC,
比較計算得到的 CRC 是否與接收到CRC相符, 如果兩者不相符, 則認為數(shù)據(jù)出錯。
1) 預(yù)置 1 個 16 位的寄存器為十六進制FFFF(即全為 1) , 稱此寄存器為 CRC寄存器。
2) 把第一個 8 位二進制數(shù)據(jù) (通信信息幀的第一個字節(jié)) 與 16 位的 CRC寄存器的低 8 位相異或, 把結(jié)果放于 CRC寄存器。
3) 把 CRC 寄存器的內(nèi)容右移一位(朝低位)用 0 填補最高位, 并檢查右移后的移出位。
4) 如果移出位為 0, 重復(fù)第 3 步 (再次右移一位); 如果移出位為 1, CRC 寄存器與多項式A001 ( 1010 0000 0000 0001) 進行異或。
5) 重復(fù)步驟 3 和步驟 4, 直到右移 8 次,這樣整個8位數(shù)據(jù)全部進行了處理。
6) 重復(fù)步驟 2 到步驟 5, 進行通信信息幀下一個字節(jié)的處理。
7) 將該通信信息幀所有字節(jié)按上述步驟計算完成后,得到的16位CRC寄存器的高、低字節(jié)進行交換。
8) 最后得到的 CRC寄存器內(nèi)容即為 CRC碼。
/**
 * <b>計算CRC16校驗碼工具類</b>
 *
 * @author Lixubo
 * @date 2023/6/2
 */
public class ModbusCRCUtils {
   /**
    * 計算CRC16校驗碼
    * @param str16
    * @return
    */
   public static String getCRC(String str16) {
      byte[] bytes = HexStringToByte(str16);
      int CRC = 0x0000ffff;
      int POLYNOMIAL = 0x0000a001;
      int i, j;
      for (i = 0; i < bytes.length; i++) {
         CRC ^= (int) bytes[i];
         for (j = 0; j < 8; j++) {
            if ((CRC & 0x00000001) == 1) {
               CRC >>= 1;
               CRC ^= POLYNOMIAL;
            } else {
               CRC >>= 1;
            }
         }
      }
      //高低位轉(zhuǎn)換,看情況使用
      CRC = ( (CRC & 0x0000FF00) >> 8) | ( (CRC & 0x000000FF ) << 8);
      return Integer.toHexString(CRC);
   }
   /**
    * 16進制字符串轉(zhuǎn)換為Byte型數(shù)組16進制源字符串
    *
    * @param
    * @return Byte類型數(shù)組
    */
   public static byte[] HexStringToByte(String hexString) {
      hexString = hexString.replace(" ", "");
      int len = hexString.length();
      if (len % 2 != 0)
         return null;
      byte[] bufD = new byte[len / 2];
      byte[] tmpBuf = hexString.getBytes();
      int i = 0, j = 0;
      for (i = 0; i < len; i++) {
         if (tmpBuf[i] >= 0x30 && tmpBuf[i] <= 0x39)
            tmpBuf[i] -= 0x30;
         else if (tmpBuf[i] >= 0x41 && tmpBuf[i] <= 0x46)
            tmpBuf[i] -= 0x37;
         else if (tmpBuf[i] >= 0x61 && tmpBuf[i] <= 0x66)
            tmpBuf[i] -= 0x57;
         else
            tmpBuf[i] = 0xF;
      }
      for (i = 0, j = 0; i < len; i += 2, j++) {
         bufD[j] = (byte) ((tmpBuf[i] << 4) | tmpBuf[i + 1]);
      }
      return bufD;
   }

}
3.34 調(diào)整mqtt消息回調(diào)代碼加入redis
/**
 * <b></b>
 *
 * @author Lixubo
 * @date 2022/11/15
 */
@Component
@Slf4j
public class MqttHandle implements MqttCallback {

	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Autowired
	private RedisTemplate redisTemplate;
	private static MqttHandle mqttHandle;
	@PostConstruct
	private void init(){
		mqttHandle = this;
		rabbitTemplate=mqttHandle.rabbitTemplate;
		redisTemplate=mqttHandle.redisTemplate;

	/**
	 * 收到消息
	 * @param topic
	 * @param message
	 */
	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception {
		ValueOperations<String, Long> valueOperations = mqttHandle.redisTemplate.opsForValue();
		String string = bytesToHexString(message.getPayload());
		if (ObjectUtil.isNotEmpty(string)&&string.startsWith("0106")){
			valueOperations.set(string.concat(topic),System.currentTimeMillis());
			System.out.println("=========="+string+topic+"==========");
		}
	}

	/**
	 * byte[] ->string
	 * @param src
	 * @return
	 */
	public static String bytesToHexString(byte[] src){
	   StringBuilder stringBuilder = new StringBuilder("");
	    if (src == null || src.length <= 0) {
				return null;
		     }
	     for (int i = 0; i < src.length; i++) {
		        int v = src[i] & 0xFF;
		        String hv = Integer.toHexString(v);
		        if (hv.length() < 2) {
			             stringBuilder.append(0);
			         }
		         stringBuilder.append(hv);
		     }
	    return stringBuilder.toString();
	}
}

3.4 根據(jù)clientId獲取客戶端連接狀態(tài)(emqx官方提供的api)
3.41 在emqx管理后臺添加api密鑰進行訪問授權(quán)

[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-2sBqFGXe-1686193465745)(C:\Users\Lixubo\AppData\Roaming\Typora\typora-user-images\image-20230606173903317.png)]

3.42 添加pom依賴
    <dependency>
      <groupId>com.squareup.okhttp3</groupId>
      <artifactId>okhttp</artifactId>
      <version>3.8.1</version>
    </dependency>
public static boolean getClientStstus(String clientId) {
//     clientId="175-4";
      try {
         /*
         服務(wù)端生成
          */
         String username = "1f408e6ac4af426a";
         String password = "qEEdNrgr4FI6q289B5wJy9BXOir25eoKdG2ygxsEu5onA";

         OkHttpClient client = new OkHttpClient();

         Request request = new Request.Builder()
               .url("http://localhost:18083/api/v5/clients/"+clientId)
               .header("Content-Type", "application/json")
               .header("Authorization", Credentials.basic(username, password))
               .build();

         Response response = client.newCall(request).execute();
         JSONObject jsonObject=JSONObject.parseObject(response.body().string());
         System.out.println(jsonObject.toString());
         if (ObjectUtil.isNotEmpty(jsonObject.getBoolean("connected"))&&jsonObject.getBoolean("connected")){
            return true;
         }
      } catch (IOException e) {
         e.printStackTrace();
      }
      return false;
   }

三、數(shù)據(jù)層使用mysql數(shù)據(jù)持久,redis作為緩存;

四、 其它注意事項

1、mqtt發(fā)布消息查詢數(shù)據(jù)如果為同一個寄存器時注意消息間隔時間必須大于Modbus 超時時間
2、Modbus 超時時間計算:

如果通訊速率為9600時,按照常規(guī)的Modbus RTU,8個數(shù)據(jù)位、1個停止位、偶校驗方式,每傳輸1個字節(jié)數(shù)據(jù)需要的時間為:(8+1+1)/9600=1.04ms/Byte

因此,主站發(fā)出響應(yīng)到從站返回數(shù)據(jù)的時間周期為:(8+5+2*n)*1.04+T1+T2,其中n為寄存器個數(shù),T1為從站的響應(yīng)時間(如果是PLC,則為PLC的掃描時間),T2為通訊余量,一般為20~50ms。如果讀取10個字的數(shù)據(jù),從站響應(yīng)時間為50ms,則整個周期為:(8+5+2x10)x1.04+50+50=134.32ms。因此,超時時間必須大于134.32ms,可以設(shè)置為150ms以上。

如果超時時間太短,響應(yīng)不能完全返回,通訊會報錯。

如果超時時間設(shè)置太長,按照上述例子,一共10個從站,每個從站讀取10個字的數(shù)據(jù),

如果超時時間設(shè)置為1s,如果有1個從站出現(xiàn)故障,則整個輪詢周期為:1000+9*134.32=2208.88 ms。

同理,如果超時時間設(shè)為150ms,則整個輪詢周期為:150+9*134.32=1358.88 ms。

因此,可以看出正確的超時時間設(shè)置可縮短整個輪詢周期,不正確的設(shè)置,將導(dǎo)致通訊出錯或整個通訊周期過長。


3、mqtt多主題訂閱通配符規(guī)則
3.1 主題層級分隔符/

/ 被用來分割主題樹的每一層,并給主題空間提供分等級的結(jié)構(gòu)。當(dāng)兩個通配符在一個主題中出現(xiàn)的時候,主題層次分隔符的使用是很重要的。

3.2 多層通配符#

#是一個匹配主題中任意層次數(shù)的通配符。比如說,如果你訂閱了finance/stock/ibm/#,你就可以接收到以下這些主題的消息。
finance/stock/ibm
finance/stock/ibm/closingprice
finance/stock/ibm/currentprice
多層通配符有可以表示大于等于0的層次。因此,finance/#也可以匹配到單獨的finance,在這種情況下#代表0層。在這種語境下主題層次分隔符/就沒有意義了。因為沒有可以分的層次。

多層通配符只可以確定當(dāng)前層或者下一層。因此,#和finance/#都是有效的,但是finance#不是有效的。多層通配符一定要是主題樹的最后一個字符。比如說,finance/#是有效的,但是finance/#/closingprice是無效的。

3.3 單層通配符+

+只匹配主題的一層。比如說,finance/stock/+匹配finance/stock/ibm和finance/stock/xyz,但是不匹配finance/stock/ibm/closingprice。另外,因為單層通配符只匹配1層,finance/+不匹配finance。
單層通配符可以被用于主題樹的任意層級,連帶多層通配符。它必須被用在主題層級分隔符/的右邊,除非它是指定自己。因此,+和finance/+都是有效的,但是finance+無效。單層通配符可以用在主題樹的末端,也可以用在中間。比如說,finance/+和finance/+/ibm都是有效的。

3.4 主題語法和用法

當(dāng)你建立一個應(yīng)用,設(shè)計主題樹的時候應(yīng)該考慮以下的主題名字的語法和語義:

主題至少有一個字符長。
主題名字是大小寫敏感的。比如說,ACCOUNTS和Accounts是兩個不同的主題。
主題名字可以包含空格。比如,Accounts payable是一個有效的主題。
以/開頭會產(chǎn)生一個不同的主題。比如說,/finnace與finance不同。/finance匹配"+/+"和/+,但不匹配+
不要在任何主題中包含null(Unicode \x0000)字符。
以下的原則應(yīng)用于主題樹的建造和內(nèi)容

在主題樹中,長度被限制于64k內(nèi)但是在這以內(nèi)沒有限制層級的數(shù)目 。
可以有任意數(shù)目的根節(jié)點;也就是說,可以有任意數(shù)目的主題樹。文章來源地址http://www.zghlxwxcb.cn/news/detail-776095.html


層。在這種語境下主題層次分隔符/就沒有意義了。因為沒有可以分的層次。

多層通配符只可以確定當(dāng)前層或者下一層。因此,#和finance/#都是有效的,但是finance#不是有效的。多層通配符一定要是主題樹的最后一個字符。比如說,finance/#是有效的,但是finance/#/closingprice是無效的。

3.3 單層通配符+

+只匹配主題的一層。比如說,finance/stock/+匹配finance/stock/ibm和finance/stock/xyz,但是不匹配finance/stock/ibm/closingprice。另外,因為單層通配符只匹配1層,finance/+不匹配finance。
單層通配符可以被用于主題樹的任意層級,連帶多層通配符。它必須被用在主題層級分隔符/的右邊,除非它是指定自己。因此,+和finance/+都是有效的,但是finance+無效。單層通配符可以用在主題樹的末端,也可以用在中間。比如說,finance/+和finance/+/ibm都是有效的。

3.4 主題語法和用法

當(dāng)你建立一個應(yīng)用,設(shè)計主題樹的時候應(yīng)該考慮以下的主題名字的語法和語義:

主題至少有一個字符長。
主題名字是大小寫敏感的。比如說,ACCOUNTS和Accounts是兩個不同的主題。
主題名字可以包含空格。比如,Accounts payable是一個有效的主題。
以/開頭會產(chǎn)生一個不同的主題。比如說,/finnace與finance不同。/finance匹配"+/+"和/+,但不匹配+
不要在任何主題中包含null(Unicode \x0000)字符。
以下的原則應(yīng)用于主題樹的建造和內(nèi)容

在主題樹中,長度被限制于64k內(nèi)但是在這以內(nèi)沒有限制層級的數(shù)目 。
可以有任意數(shù)目的根節(jié)點;也就是說,可以有任意數(shù)目的主題樹。


到了這里,關(guān)于java集成mqtt、rabbitmq服務(wù)遠程連接數(shù)dtu實現(xiàn)寄存器rtu數(shù)據(jù)讀寫的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 手機、電腦mqtt客戶端通過騰訊云服務(wù)器遠程連接ESP32

    手機、電腦mqtt客戶端通過騰訊云服務(wù)器遠程連接ESP32

    ????????本文將實現(xiàn): ????????1、esp32與騰訊云物聯(lián)網(wǎng)服務(wù)器通過mqtt協(xié)議通信 ????????2、電腦和手機客戶端通過mqtt與騰訊云相通信 ????????3、騰訊云服務(wù)器內(nèi)部消息轉(zhuǎn)發(fā),將手機、電腦發(fā)布的主題轉(zhuǎn)發(fā)給esp32訂閱,實現(xiàn)手機、電腦與esp32的遠程通信。 ?????

    2024年02月11日
    瀏覽(25)
  • Java基于RabbitMQ實現(xiàn)MQTT

    Java基于RabbitMQ實現(xiàn)MQTT

    要想使用MQ的MQTT服務(wù)需要先開啟MQTT服務(wù),因為RabbitMQ的MQTT默認是關(guān)閉的,具體啟動方法可以參考:rabbitmq使用mqtt協(xié)議_panda_225400的博客-CSDN博客_rabbitmq mqtt 下面具體實現(xiàn)我就直接貼代碼吧,一切說明都在代碼里面,方便直接 POM依賴 配置類 HTTP處理相關(guān)類 網(wǎng)關(guān)類 控制層測試類

    2024年02月12日
    瀏覽(16)
  • java實現(xiàn)連接遠程服務(wù)器,并可以執(zhí)行shell命令

    你可以使用Java中的SSH庫來連接遠程服務(wù)器并執(zhí)行shell命令。下面是一個簡單的示例代碼: 請注意替換 your_host , your_username , your_password 和 your_shell_command 為實際的遠程服務(wù)器信息和要執(zhí)行的shell命令。該示例代碼使用JSch庫來建立SSH連接并執(zhí)行命令。

    2024年01月20日
    瀏覽(33)
  • 內(nèi)網(wǎng)穿透-外遠程連接中的RabbitMQ服務(wù)

    內(nèi)網(wǎng)穿透-外遠程連接中的RabbitMQ服務(wù)

    RabbitMQ是一個在 AMQP(高級消息隊列協(xié)議)基礎(chǔ)上完成的,可復(fù)用的企業(yè)消息系統(tǒng),是當(dāng)前最主流的消息中間件之一。 由erlang開發(fā)的AMQP(Advanced Message Queue 高級消息隊列協(xié)議 )的開源實現(xiàn),由于erlang 語言的高并發(fā)特性,性能較好,本質(zhì)是個隊列,F(xiàn)IFO 先入先出,里面存放的內(nèi)容

    2024年02月12日
    瀏覽(19)
  • 5G DTU實現(xiàn)燃氣管道數(shù)據(jù)采集遠程管理

    5G DTU實現(xiàn)燃氣管道數(shù)據(jù)采集遠程管理

    隨著物聯(lián)網(wǎng)技術(shù)與智慧城市的不斷發(fā)展,燃氣管道戶外組網(wǎng)的需求逐漸浮現(xiàn)。在戶外組網(wǎng)應(yīng)用中5G DTU(Data Terminal Unit)發(fā)揮著至關(guān)重要的作用。5G DTU可用于數(shù)據(jù)采集、傳輸與遠程管理,能夠?qū)崿F(xiàn)燃氣數(shù)據(jù)的單點或多點采集和傳輸,為燃氣管道系統(tǒng)的遠程管理提供了數(shù)據(jù)傳輸和

    2024年02月21日
    瀏覽(23)
  • DTU和MQTT網(wǎng)關(guān)優(yōu)缺點

    DTU和MQTT網(wǎng)關(guān)優(yōu)缺點

    目前市面上有兩種設(shè)備實現(xiàn)Modbus轉(zhuǎn)MQTT網(wǎng)關(guān)。網(wǎng)關(guān)式、DTU式。 鋇錸技術(shù)網(wǎng)關(guān)內(nèi)部進行轉(zhuǎn)換 網(wǎng)關(guān)式 優(yōu)點: 1、通訊模塊和MCU分開,通訊模塊只做通訊功能,協(xié)議轉(zhuǎn)換有單獨主控MCU,“硬轉(zhuǎn)換”; 2、數(shù)據(jù)點是通過映射到主控ARM芯片,配置更方便,點表多更加無壓力; 3、網(wǎng)關(guān)式,

    2024年02月07日
    瀏覽(22)
  • 【物聯(lián)網(wǎng)開發(fā)】-微信小程序之MQTT連接,基于MQTT實現(xiàn)設(shè)備-服務(wù)器-小程序的消息傳輸

    【物聯(lián)網(wǎng)開發(fā)】-微信小程序之MQTT連接,基于MQTT實現(xiàn)設(shè)備-服務(wù)器-小程序的消息傳輸

    想要開發(fā)微信小程序,首先要有一些基礎(chǔ)知識:html、cs、js、json等,小程序中要用到的知識框架大體相同,一個頁面包括js、json、wxml、wxss格式的文件。 由于本人此前從未接觸過小程序開發(fā),本篇文章將會以新手小白的角度一步步剖析如何使用微信小程序通過MQTT服務(wù)器連接設(shè)

    2023年04月24日
    瀏覽(97)
  • Linux本地部署Mosquitto MQTT協(xié)議消息服務(wù)端并實現(xiàn)遠程訪問【內(nèi)網(wǎng)穿透】

    Linux本地部署Mosquitto MQTT協(xié)議消息服務(wù)端并實現(xiàn)遠程訪問【內(nèi)網(wǎng)穿透】

    Mosquitto是一個開源的消息代理,它實現(xiàn)了MQTT協(xié)議版本3.1和3.1.1。它可以在不同的平臺上運行,包括Windows、Linux、macOS等。mosquitto可以用于物聯(lián)網(wǎng)、傳感器、移動應(yīng)用程序等場景,提供了一種輕量級的、可靠的、基于發(fā)布/訂閱模式的消息傳遞機制。 MQTT協(xié)議遠程訪問的好處在于

    2024年02月05日
    瀏覽(23)
  • esp8266模塊--MQTT協(xié)議連接服務(wù)器實現(xiàn)數(shù)據(jù)接收和發(fā)送+源碼

    esp8266模塊--MQTT協(xié)議連接服務(wù)器實現(xiàn)數(shù)據(jù)接收和發(fā)送+源碼

    首先推薦中國移動的代碼,我覺得中國移動的代碼更為合理:(但是有一些其他的模塊在里面) OneNET開發(fā)板代碼、資料--2020-09-27--標準板、Mini板bug修復(fù) - 開發(fā)板專區(qū) - OneNET設(shè)備云論壇 (10086.cn) 以及這位b站up做的視頻:(wifi模塊在p9節(jié)) 【挽救小白第一季】STM32+8266+小程序智能

    2024年02月08日
    瀏覽(34)
  • ESP32的MQTT AT固件燒錄+STM32以ESP32的MQTT AT固件的AT指令連接EMQX下mqtt服務(wù)器實現(xiàn)消息訂閱和發(fā)布

    ESP32的MQTT AT固件燒錄+STM32以ESP32的MQTT AT固件的AT指令連接EMQX下mqtt服務(wù)器實現(xiàn)消息訂閱和發(fā)布

    目錄 寫在前面 三種方案(利用ESP32連接EMQX下的MQTT) 步驟 ESP32燒錄固件并AT指令進行測試。 下載固件 ?燒錄工具下載 燒錄固件(選擇ESP32) ?關(guān)于AT 指令與MQTT服務(wù)器斷開后自動重連MQTT服務(wù)器 關(guān)于AT指令設(shè)置上電自動連接WIFI 關(guān)于AT指令設(shè)置斷開后自動重新連接WIFI STM32對接E

    2023年04月12日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包