- mqtt服務(wù)器的選擇與安裝
emqx擁有界面,可視化比較好,但是windows下安裝有問題,后面采用虛擬機(jī)安裝沒問題
mosquitto:windows下安裝簡(jiǎn)單,使用也簡(jiǎn)單,但是功能比較單一,只能通過命令操作,無界面
2.mosquitto的安裝:
windows下搭建mqtt服務(wù)器
3.java相關(guān)坐標(biāo)
<!--mqtt連接相關(guān)-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<!--ssl相關(guān),如果不使用ssl連接可以不引入 -->
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.70</version>
</dependency>
4.創(chuàng)建客戶端:發(fā)布客戶端、訂閱客戶端
import com.hanqian.common.queue.MessageEvent;
import com.hanqian.common.queue.MessageQueue;
import com.hanqian.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.Map;
/**
* MQTT 訂閱客戶端
* 本地mqtt服務(wù)器使用 mosquitto 測(cè)試命令
* 啟動(dòng):mosquitto -c mosquitto.conf -v
* 訂閱:mosquitto_sub.exe -h 127.0.0.1 -p 7777 -v -t sensor
* 發(fā)布:mosquitto_pub.exe -h 127.0.0.1 -p 7777 -t topic_hq -m "消息測(cè)試2323"
*/
@Configuration
@Slf4j
@ConditionalOnProperty(value = "mqtt.mqttIsLoad",havingValue="true")
public class MqttSubscribeClient {
/**
* MQtt服務(wù)器地址
*/
@Value("${mqtt.host}")
private String host;
/**
* 訂閱主題
*/
@Value("${mqtt.topic}")
private String[] topic;
/**
* 連接mqtt服務(wù)器時(shí)使用的id,一般唯一。
* 不特殊指定可使用 MqttClient.generateClientId();
*/
@Value("${mqtt.clientId}")
private String clientId;
/**
* 設(shè)置是否清空session, 需要與qos配合
* 設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連接記錄,設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
*/
@Value("${mqtt.isCleanSession}")
private boolean isCleanSession;
/**
* 消息質(zhì)量Qos:有0、1、2三個(gè)級(jí)別,默認(rèn)用0,
* QoS0:至多一次;Sender 發(fā)送的一條消息,Receiver 最多能收到一次,如果發(fā)送失敗,也就算了。
* QoS1:至少一次;Sender 發(fā)送的一條消息,Receiver 至少能收到一次,如果發(fā)送失敗,會(huì)繼續(xù)重試,
* 直到 Receiver 收到消息為止,但Receiver 有可能會(huì)收到重復(fù)的消息
* QoS2:確保只有一次。Sender 盡力向 Receiver 發(fā)送消息,如果發(fā)送失敗,會(huì)繼續(xù)重試,
* 直到 Receiver 收到消息為止,同時(shí)保證 Receiver 不會(huì)因?yàn)橄⒅貍鞫盏街貜?fù)的消息。
* QoS=1通訊時(shí)的注意事項(xiàng)
* 接收端連接服務(wù)端時(shí)cleanSession設(shè)置為false
* 接收端訂閱主題時(shí)QoS=1
* 發(fā)布端發(fā)布消息時(shí)QoS=1
*
* QoS=2通訊時(shí)的注意事項(xiàng)
* 接收端連接服務(wù)端時(shí)cleanSession設(shè)置為false
* 接收端訂閱主題時(shí)QoS=2
* 發(fā)布端發(fā)布消息時(shí)QoS=2
*/
@Value("${mqtt.qos}")
private int[] qos;
/**
* 超時(shí)時(shí)間 單位為秒
*/
@Value("${mqtt.connectionTimeout}")
private int connectionTimeout;
/**
* 設(shè)置會(huì)話心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔1.5*20秒的時(shí)間向客戶端發(fā)送個(gè)消息判斷客戶端是否在線,但這個(gè)方法并沒有重連的機(jī)制
*/
@Value("${mqtt.keepAliveInterval}")
private int keepAliveInterval;
/**
* 是否設(shè)置遺囑,只能設(shè)置一個(gè)主題且topic不可以帶通配符
*/
@Value("${mqtt.isWill}")
private boolean isWill;
@Value("${mqtt.userName}")
private String userName;
@Value("${mqtt.passWord}")
private String passWord;
@Value("${mqtt.ssl.cacert}")
private String cacert;
@Value("${mqtt.ssl.clientCert}")
private String clientCert;
@Value("${mqtt.ssl.clientKey}")
private String clientKey;
@Value("${mqtt.ssl.sslPassWord}")
private String sslPassWord;
@Value("${mqtt.ssl.isTwoWay}")
private boolean isTwoWay;
private MqttConnectOptions options;
private MqttClient client;
@Autowired
private MessageQueue messageQueue;
@Bean
public MqttClient start(){
try {
// host為主機(jī)名,clientid即連接MQTT的客戶端ID,一般以唯一標(biāo)識(shí)符表示,MemoryPersistence設(shè)置clientid的保存形式,默認(rèn)為以內(nèi)存保存
client = new MqttClient(host, clientId, new MemoryPersistence());
// MQTT的連接設(shè)置
options = new MqttConnectOptions();
// 自動(dòng)重連,10分鐘內(nèi) or connectionLost 方法
//options.setAutomaticReconnect(true);
//options.setMaxReconnectDelay(600000);
options.setCleanSession(isCleanSession);
if (StringUtil.isNotEmpty(userName)){
options.setUserName(userName);
}
if ( StringUtil.isNotEmpty(passWord)){
options.setPassword(passWord.toCharArray());
}
options.setConnectionTimeout(connectionTimeout);
options.setKeepAliveInterval(keepAliveInterval);
if (host.indexOf("ssl")>=0){
if (!isTwoWay){
// 單向ssl,這里的 TrustManager 是自己實(shí)現(xiàn)的,沒有去校驗(yàn)服務(wù)端的證書
options.setHttpsHostnameVerificationEnabled(false);
TrustManager[] trustAllCerts = new TrustManager[1];
TrustManager tm = new MyTM();
trustAllCerts[0] = tm;
SSLContext sc = SSLContext.getInstance("SSL");
sc.init(null, trustAllCerts, null);
SocketFactory factory = sc.getSocketFactory();
options.setSocketFactory(factory);
//SSLSocketFactory socketFactory = SSLUtils.getSingleSocketFactory(cacert);
//options.setSocketFactory(socketFactory);
}else {
雙向
SSLSocketFactory socketFactory = SSLUtils.getSocketFactory(cacert, clientCert, clientKey,
sslPassWord);
options.setSocketFactory(socketFactory);
}
}
// 設(shè)置回調(diào)
client.setCallback(new MQTTCallback());
if (isWill){
for (String s : topic) {
MqttTopic mqttTopic = client.getTopic(s);
String message = "我要關(guān)閉拉,客戶端:"+clientId + "主題是:"+s;
options.setWill(mqttTopic, message.getBytes(), 2, true);
}
}
client.connect(options);
return client;
}catch (Exception e){
log.error("[MQTT]訂閱客戶端連接出錯(cuò):{}",e);
return null;
}
}
class MQTTCallback implements MqttCallback, MqttCallbackExtended {
/**
* 連接成功后調(diào)用
*/
@Override
public void connectComplete(boolean b, String host) {
try {
//訂閱消息: 單個(gè) or 多個(gè)(topic與qos個(gè)數(shù)需一致,否則報(bào)錯(cuò):已在進(jìn)行連接)
client.subscribe(topic, qos);
} catch (MqttException e) {
log.error("[MQtt]訂閱主題失?。簕}",e);
}
log.info("[MQTT]連接完成:{},地址:{}",b,host);
}
/**
* 連接斷開(也可在此配置重連)
*/
public void connectionLost(Throwable cause) {
log.error("[MQTT]:連接斷開:{}", cause.getMessage());
// 需要先成功連接一次,斷開連接才能進(jìn)入此方法
while (!client.isConnected()) {
try {
client.reconnect();
if (client.isConnected()){
log.info("[MQTT]重新連接成功");
break;
}
log.info("[MQTT]重新連接失敗,服務(wù)器未開啟");
} catch (MqttException e) {
log.error("[MQTT]重連失敗,錯(cuò)誤信息:{}", e);
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
log.error("[MQTT]重連失敗每隔五秒嘗試:{}", e);
}
}
}
/**
* 訂閱接收消息
*/
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("[MQTT]:訂閱主題:{},接受消息Qos:{},接受消息內(nèi)容:{}",topic,message.getQos(),new String(message.getPayload()));
MessageEvent event = new MessageEvent();
Map map = new HashMap<>();
map.put("topic",topic);
map.put("message",new String(message.getPayload()));
event.setParam(map);
event.addHandlerType(MessageEvent.HandlerType.MESSAGE);
event.setBeanName("mqttReceiveMessage");
messageQueue.add(event);
}
/**
* 消息傳遞成功
* @param token
*/
public void deliveryComplete(IMqttDeliveryToken token) {
log.info("[MQTT]:deliveryComplete:{}",token.isComplete());
}
}
/**
* MyTM 是自己實(shí)現(xiàn)的認(rèn)證管理類,里面并有校驗(yàn)服務(wù)端的證書就返回true,永久成功!
*/
static class MyTM implements TrustManager, X509TrustManager {
@Override
public X509Certificate[] getAcceptedIssuers() {
return null;
}
public boolean isServerTrusted(X509Certificate[] certs) {
return true;
}
public boolean isClientTrusted(X509Certificate[] certs) {
return true;
}
@Override
public void checkServerTrusted(X509Certificate[] certs, String authType)
throws CertificateException {
return;
}
@Override
public void checkClientTrusted(X509Certificate[] certs, String authType)
throws CertificateException {
return;
}
}
/**
* 發(fā)送消息
*/
public static void main(String[] args) throws Exception {
MqttClient client = new MqttClient("ssl://127.0.0.1:7777", "xdcfsdfsfsdfsdfs", new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName("admin");
options.setPassword("admin".toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
// 單向ssl,這里的 TrustManager 是自己實(shí)現(xiàn)的,沒有去校驗(yàn)服務(wù)端的證書
//options.setHttpsHostnameVerificationEnabled(false);
//TrustManager[] trustAllCerts = new TrustManager[1];
//TrustManager tm = new MyTM();
//trustAllCerts[0] = tm;
//SSLContext sc = SSLContext.getInstance("SSL");
//sc.init(null, trustAllCerts, null);
//SocketFactory factory = sc.getSocketFactory();
//options.setSocketFactory(factory);
SSLSocketFactory socketFactory = SSLUtils.getSocketFactory("D:\\yyb_ssl\\new\\ca.crt",
"D:\\yyb_ssl\\new\\client.crt", "D:\\yyb_ssl\\new\\client.key",
"123456");
options.setSocketFactory(socketFactory);
try {
client.setCallback(null);
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
MqttMessage message = new MqttMessage();
message = new MqttMessage();
message.setQos(1);
message.setRetained(true);
message.setPayload("hello,你好222阿".getBytes());
//MqttTopic topic11 = client.getTopic("topic_hq");
//MqttDeliveryToken token = topic11.publish(message);
//token.waitForCompletion();
client.publish("topic_hq2",message);
}
}
5.SSLUtils
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMKeyPair;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.FileReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
public class SSLUtils {
public static SSLSocketFactory getSingleSocketFactory(String caCrtFile) throws Exception {
Security.addProvider(new BouncyCastleProvider());
X509Certificate caCert = null;
BufferedInputStream bis = new BufferedInputStream(new ByteArrayInputStream(Files.readAllBytes(Paths.get(caCrtFile))));
//BufferedInputStream bis = new BufferedInputStream(caCrtFileInputStream);
CertificateFactory cf = CertificateFactory.getInstance("X.509");
while (bis.available() > 0) {
caCert = (X509Certificate) cf.generateCertificate(bis);
}
KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
caKs.load(null, null);
caKs.setCertificateEntry("cert-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(caKs);
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
sslContext.init(null, tmf.getTrustManagers(), null);
return sslContext.getSocketFactory();
}
public static SSLSocketFactory getSocketFactory(final String caCrtFile,
final String crtFile, final String keyFile, final String password)
throws Exception {
Security.addProvider(new BouncyCastleProvider());
// load CA certificate
X509Certificate caCert = null;
FileInputStream fis = new FileInputStream(caCrtFile);
BufferedInputStream bis = new BufferedInputStream(fis);
CertificateFactory cf = CertificateFactory.getInstance("X.509");
while (bis.available() > 0) {
caCert = (X509Certificate) cf.generateCertificate(bis);
}
// load client certificate
bis = new BufferedInputStream(new FileInputStream(crtFile));
X509Certificate cert = null;
while (bis.available() > 0) {
cert = (X509Certificate) cf.generateCertificate(bis);
}
// load client private key
PEMParser pemParser = new PEMParser(new FileReader(keyFile));
Object object = pemParser.readObject();
JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
PEMParser pemParser2 = new PEMParser(new FileReader("D:\\yyb_ssl\\client\\client.pem"));
Object object2 = pemParser2.readObject();
PEMKeyPair pemKeyPair = new PEMKeyPair((SubjectPublicKeyInfo) object2, (PrivateKeyInfo) object);
KeyPair key = converter.getKeyPair(pemKeyPair);
pemParser.close();
// CA certificate is used to authenticate server
KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
caKs.load(null, null);
caKs.setCertificateEntry("ca-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
tmf.init(caKs);
// client key and certificates are sent to server, so it can authenticate
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(null, null);
ks.setCertificateEntry("certificate", cert);
ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
new java.security.cert.Certificate[]{cert});
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
.getDefaultAlgorithm());
kmf.init(ks, password.toCharArray());
// finally, create SSL socket factory
SSLContext context = SSLContext.getInstance("TLSv1.2");
context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
return context.getSocketFactory();
}
}
6.配置文件相關(guān)配置文章來源:http://www.zghlxwxcb.cn/news/detail-861901.html
#MQTT相關(guān)配置
mqtt:
host: ssl://127.0.0.1:7777
#需要與qos個(gè)數(shù)一致
topic: topic_hq,topic_hq2
Qos: 0,0
isCleanSession: false
clientId: client_test
connectionTimeout: 10
keepAliveInterval: 20
isWill: true
userName: admin
passWord: public
#配置是否加載
mqttIsLoad: true
ssl:
isTwoWay: true
cacert: D:\yyb_ssl\new\ca.crt
clientCert: D:\yyb_ssl\new\client.crt
clientKey: D:\yyb_ssl\new\client.key
sslPassWord:
7.ssl證書的生成:必須具備主題備用名稱(Subject Alternative Name)文章來源地址http://www.zghlxwxcb.cn/news/detail-861901.html
1.生成CA證書
1.1創(chuàng)建CA證書私鑰
openssl genrsa -out ca.key 2048
1.2.請(qǐng)求證書 證數(shù)各參數(shù)含義如下
C—–國(guó)家(Country Name)
ST—-省份(State or Province Name)
L—-城市(Locality Name)
O—-公司(Organization Name)
OU—-部門(Organizational Unit Name)
CN—-產(chǎn)品名(Common Name)
emailAddress—-郵箱(Email Address)
openssl req -new -sha256 -key ca.key -out ca.csr -subj "/C=CN/ST=SZ/L=SZ/O=C.X.L/OU=C.X.L/CN=CA/emailAddress=123456@test.com"
1.3.自簽署證書
openssl x509 -req -days 36500 -sha256 -extensions v3_ca -signkey ca.key -in ca.csr -out ca.crt
2.生成服務(wù)端證書
2.1.創(chuàng)建服務(wù)器私鑰
openssl genrsa -out server.key 2048
2.2新建 openssl.cnf 文件,
req_distinguished_name :根據(jù)情況進(jìn)行修改,
alt_names:BROKER_ADDRESS 修改為 EMQ X 服務(wù)器實(shí)際的 IP 或 DNS 地址,例如:IP.1 = 127.0.0.1,或 DNS.1 = broker.xxx.com
注意:IP 和 DNS 二者保留其一即可,如果已購(gòu)買域名,只需保留 DNS 并修改為你所使用的域名地址。
[req]
default_bits = 2048
distinguished_name = req_distinguished_name
req_extensions = req_ext
x509_extensions = v3_req
prompt = no
[req_distinguished_name]
countryName = CN
stateOrProvinceName = SZ
organizationName = C.X.L
organizationalUnitName = C.X.L
commonName = service
[req_ext]
subjectAltName = @alt_names
[v3_req]
subjectAltName = @alt_names
[alt_names]
IP.1 = 127.0.0.1
IP.2 = 192.168.5.249
2.3請(qǐng)求證書
openssl req -new -sha256 -key server.key -config openssl.cnf -out server.csr
2.4.使用CA證書簽署服務(wù)器證書
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 3650 -sha256 -extensions v3_req -extfile openssl.cnf
2.5.驗(yàn)證服務(wù)端證書
openssl verify -CAfile ca.crt server.crt
2.6查看服務(wù)端證書
openssl x509 -noout -text -in server.crt
2.7.Netty需要支持PKCS8格式讀取私鑰
openssl pkcs8 -topk8 -nocrypt -in server.key -out pkcs8_key.pem
**注:**錯(cuò)誤日志也很明確的打印了:at sun.security.pkcs.PKCS8Key.decode(PKCS8Key.java:351),采用PKCS8無法解析證書。這是因?yàn)椴糠諱QTT broker使用的是netty,netty默認(rèn)使用PKCS8格式對(duì)證書進(jìn)行解析,然而我們使用openssl生成的服務(wù)端server.key是PKCS1格式的,所以MQTT broker采用PKCS8無法對(duì)證書進(jìn)行解析。
問題處理:對(duì)證書進(jìn)行格式轉(zhuǎn)行,將PKCS1格式轉(zhuǎn)換成PKCS8即可。
證書格式區(qū)別:
PKCS1的文件頭格式 —–BEGIN RSA PRIVATE KEY—–
PKCS8的文件頭格式 —–BEGIN PRIVATE KEY—–
生成客戶端證書
1.生成客戶端私鑰
openssl genrsa -out client.key 2048
2.請(qǐng)求證書
openssl req -new -sha256 -key client.key -out client.csr -subj "/C=CN/ST=SZ/L=SZ/O=C.X.L/OU=C.X.L/CN=CLIENT/emailAddress=123456@test.com"
3.使用CA證書簽署客戶端證書
openssl x509 -req -days 36500 -sha256 -extensions v3_req -CA ca.cer -CAkey ca.key -CAserial ca.srl -CAcreateserial -in client.csr -out client.crt
4.驗(yàn)證服務(wù)端證書
openssl verify -CAfile ca.crt client.crt
5.查看服務(wù)端證書
openssl x509 -noout -text -in client.crt
證書轉(zhuǎn)換
CRT轉(zhuǎn)為PEM
#.key 轉(zhuǎn)換成 .pem:
openssl rsa -in server.key -out server-key.pem
#.crt 轉(zhuǎn)換成 .pem:
openssl x509 -in server.crt -out server.pem -outform PEM
既然PEM與DER只是編碼格式上的不同,那么不管是證書還是密鑰,都可以隨意轉(zhuǎn)換為想要的格式:
PEM轉(zhuǎn)DER
openssl x509 -outform der -in server.pem -out server.der
DER專PEM
openssl x509 -inform der -in server.der -out server.crt
注:也可以直接生成PEM格式的證書,生成方式和CRT一樣
區(qū)別:
// 生成CA證書
openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -out ca.pem
// 生成服務(wù)端證書
openssl x509 -req -in server.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out server.pem -days 3650 -sha256 -extensions v3_req -extfile openssl.cnf
// 生成客戶端證書
openssl x509 -req -days 3650 -in client.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out client.pem
到了這里,關(guān)于java連接mqtt(tcp、ssl單雙向)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!