物聯(lián)網(wǎng)開發(fā)終端管理篇-java從MQTT獲取設(shè)備數(shù)據(jù),并通過Druid連接池把數(shù)據(jù)寫入MySQL數(shù)據(jù)庫(kù)(Windows系統(tǒng))
下面來給大家做個(gè)簡(jiǎn)單的數(shù)據(jù)對(duì)接,也就是通過寫JAVA代碼實(shí)現(xiàn)MQTT協(xié)議
- 首頁(yè)我們得搭建一個(gè)簡(jiǎn)單的IDEA項(xiàng)目,這個(gè)我就不做演示了
- 搭建完項(xiàng)目,我們需要準(zhǔn)備一些jar包,jar包名如下:
- org.eclipse.paho.client.mqttv3-1.1.0.jar
- mysql-connector-java-5.1.34.jar
- jackson-databind-2.10.0.jar
- jackson-core-2.10.0.jar
- jackson-annotations-2.10.0.jar
- 如果是連接sqlserver數(shù)據(jù)庫(kù) 則用???jtds-1.3.1.jar
下面就是java代碼了
package com.baidai;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 接收訂閱的消息
*/
public class ClientMQTT implements MqttCallback {
public static final String HOST = "tcp://127.0.0.1:1883";//(127.0.0.1也就是EMQX的ip地址)
private static final String clientID = "clientXX";//(這個(gè)clientXX 可以隨便寫)
private String TOPIC= "testtopic";//(這個(gè)testtopic 是EMQX的訂閱主題,如果你對(duì)接別的數(shù)據(jù),別人給了你主題,改這個(gè)就行)
private MqttClient client;
private MqttConnectOptions options;
private String user = "admin";//(連接登錄EMQX的賬號(hào))
private String password = "xiaofang";//(連接登錄EMQX的密碼)
private String driverName = "com.mysql.cj.jdbc.Driver";//(連接MySQL數(shù)據(jù)庫(kù))
//private String driverName = "net.sourceforge.jtds.jdbc.Driver";//(連接SQLServer數(shù)據(jù)庫(kù))
private String url = "";//根據(jù)不同數(shù)據(jù)庫(kù)填寫自己的數(shù)據(jù)庫(kù)地址
private String userName = "";//填寫自己的數(shù)據(jù)庫(kù)名稱
private String userPwd = "";//數(shù)據(jù)庫(kù)對(duì)應(yīng)密碼
/*連接MQtt*/
public void clientStart(){
try {
client = new MqttClient(HOST,clientID,new MemoryPersistence());
options = new MqttConnectOptions();
options.setCleanSession(true);
options.setKeepAliveInterval(10);
options.setConnectionTimeout(50);
options.setUserName(user);
options.setPassword(password.toCharArray());
client.setCallback(new ClientMQTT());
MqttTopic topic = client.getTopic(TOPIC);
//setWill方法,如果項(xiàng)目中需要知道客戶端是否掉線可以調(diào)用該方法。設(shè)置最終端口的通知消息
options.setWill(topic,"close".getBytes(),1,true);
client.connect(options);
int[] Qos = {1};
String[] topic1 = {TOPIC};
client.subscribe(topic1,Qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 連接數(shù)據(jù)庫(kù)
*/
public void connection(){
try {
Class.forName(driverName);
System.out.println("連接成功!!!");
} catch (ClassNotFoundException e) {
e.printStackTrace();
System.out.println("驅(qū)動(dòng)加載失敗");
}
try {
//在這里可以先測(cè)試數(shù)據(jù)庫(kù)能不能連接
Connection dbcon = DriverManager.getConnection(url,userName,userPwd);
System.out.println("數(shù)據(jù)庫(kù)連接成功!");
System.out.println("數(shù)據(jù)庫(kù)連接成功!");
} catch (SQLException e) {
e.printStackTrace();
System.out.println("連接失敗");
}
}
@Override
public void connectionLost(Throwable throwable) {
System.out.println(throwable);
//連接斷掉會(huì)執(zhí)行到這里
System.out.println("連接以斷,請(qǐng)重新連接?。?!");
}
//接收EMQX上訂閱主題的數(shù)據(jù)
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
try {
//獲取消息返回格式
String msg = new String(mqttMessage.getPayload());
if(msg.equals("close")){
return;
}
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(msg);
String info= jsonNode.get("info").toString();
String time= jsonNode.get("time").toString().replaceAll("\"", "").replaceAll("/","");
//格式化接收過來的時(shí)間
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddhhmmss");
Date productTime = simpleDateFormat.parse(timeStamp);
String infoTwo = jsonNode.get("infoTwo ").toString();
//按照對(duì)方返回過來的格式接收數(shù)據(jù)
JsonNode infoList = objectMapper.readTree(info);
JsonNode infoTwoList = objectMapper.readTree(infoTwo);
for (JsonNode dataJsonNode : infoList) {
//這里就省略了
}
for (JsonNode dataJsonNode : infoTwoList) {
//這里就省略了
}
//連接數(shù)據(jù)庫(kù)
Connection dbcon = DriverManager.getConnection(url,userName,userPwd);
Statement stmt = dbcon.createStatement();
//如果是SqlServe不能自動(dòng)生成id,可以用這個(gè)生成一個(gè)隨機(jī)id
ResultSet rs = stmt.executeQuery("select REPLACE(NEWID(), '-', '') as Id");
String id="";
while(rs.next()) {
id=rs.getString("Id").toString();
}
String sql="insert into 表的名稱 (id,name,date,number,totalNumber,nowDate)"+
"values(?,?,?,?,?,?)";
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
PreparedStatement preparedStatement = dbcon.prepareStatement(sql);//預(yù)編譯下SQL語(yǔ)句
preparedStatement.setString( 1,id);
preparedStatement.setString( 2,"測(cè)試");
preparedStatement.setString( 3, dateFormat.format(time));//獲取時(shí)間
preparedStatement.setInt( 4,number);//數(shù)量
preparedStatement.setDouble( 5,totalNumber);//總數(shù)量
preparedStatement.setString( 6,dateFormat.format(System.currentTimeMillis()));//獲取當(dāng)前時(shí)間
//這里是執(zhí)行上面的sql語(yǔ)句的方法
preparedStatement.executeUpdate();
//subscribe后會(huì)執(zhí)行到這里
System.out.println("訂閱消息的主題是:"+s);
System.out.println("消息的ID是:"+mqttMessage.getId());
System.out.println("添加成功:"+msg);
System.out.println("添加成功SQl語(yǔ)句:"+preparedStatement);
}catch (Exception e){
System.out.println("插入錯(cuò)誤信息:"+e);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
//publish可以執(zhí)行到這里
System.out.println("This is deliveryComplete method----->"+iMqttDeliveryToken.isComplete());
}
public static void main(String[] args) {
ClientMQTT clientMQTT = new ClientMQTT();
clientMQTT.clientStart();
//在這里可以先測(cè)試數(shù)據(jù)庫(kù)能不能連接
//clientMQTT.connection();
}
}
上面的代碼寫完,下面我們?cè)撛趺醋屔厦娴拇a可以直接在服務(wù)器上面跑,不用自己一直啟動(dòng)idea項(xiàng)目呢???
生成可執(zhí)行jar包, 并安裝運(yùn)行到服務(wù)器
1.停止運(yùn)行
2.因?yàn)槌绦蛞呀?jīng)設(shè)置過生成jar包,所以用戶直接按照下面操作即可生成可執(zhí)行jar包.
然后找到idea設(shè)置的Show Excluded Files
然后可能就會(huì)生成Out文件或者class文件。文件是橙黃色的,然后右鍵Show in Explorer
然后找到生成的jar包的位置
如果服務(wù)器沒有安裝JDK,請(qǐng)用戶先安裝JDK,然后把先前生成的jar包拷貝到服務(wù)器
下一步就是打開命令窗口 cmd
進(jìn)入到j(luò)ar包目錄
執(zhí)行 java -jar MqttDataToMySQL.jar
打印所有設(shè)備的數(shù)據(jù),說明已經(jīng)運(yùn)行起來了
讓程序在后臺(tái)運(yùn)行
1. 關(guān)閉
2. 注意呀!不要犯常識(shí)性錯(cuò)誤!把文件擴(kuò)展名選中!
3. 在jar包目錄新建一個(gè)后綴名字為.bat的文件
編輯里面的內(nèi)容如下(MqttDataToMySQL.jar 就是jar包的名字)
@echo off
start javaw -jar MqttDataToMySQL.jar
exit
4. 雙擊運(yùn)行即可
5. 如果要關(guān)閉,找到任務(wù)管理器java運(yùn)行進(jìn)程,關(guān)閉即可
6.也可以指定JDK路徑運(yùn)行
C:\java8\jdk1.8.0.131 為JDK的安裝路徑
@echo off
set JAVA_HOME=C:\java8\jdk1.8.0.131
set CLASSPATH=.;%JAVA_HOME%\lib\dt.jar;%JAVA_HOMe%\lib\tools.jar;
set Path=%JAVA_HOME%\bin;
start javaw -jar MqttDataToMySQL.jar
exit
在服務(wù)器運(yùn)行Jar包可能會(huì)出現(xiàn)的錯(cuò)誤:
“Exception in thread “main” java.lang.SecurityException: Invalid signature file digest for Manifest”
解決辦法
找到你打的jar包META-INF中后綴為.SF和.RSA文件并且刪除,然后重新運(yùn)行,就會(huì)成功?。。?!趕緊去試試吧?。?!
文章來源:http://www.zghlxwxcb.cn/news/detail-671607.html
如果還不可以的話,你可以自行查看別人解決的辦法點(diǎn)擊查看解決方法
如果你想多次運(yùn)行不同的jar包,你得改java代碼的“clientID”,然后再重新打jar包,取不同的jar包名稱,然后放到服務(wù)器上面運(yùn)行
文章來源地址http://www.zghlxwxcb.cn/news/detail-671607.html
到了這里,關(guān)于物聯(lián)網(wǎng)開發(fā)終端管理篇-java從MQTT獲取設(shè)備數(shù)據(jù),并通過Druid連接池把數(shù)據(jù)寫入MySQL數(shù)據(jù)庫(kù)(Windows系統(tǒng))的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!