需要spring-boot集成spring-integration-mqtt代碼的直接跳到第5部分
1.MQTT介紹
1.1 MQTT是什么呢?
message queue telemetry translation
是一種基于發(fā)布與訂閱的輕量級(jí)消息傳輸協(xié)議.適用于低帶寬或網(wǎng)絡(luò)不穩(wěn)定的物聯(lián)網(wǎng)應(yīng)用.開發(fā)者可以使用極少的代碼來(lái)實(shí)現(xiàn)物聯(lián)網(wǎng)設(shè)備之間的消息傳輸.mqtt協(xié)議廣泛應(yīng)用于物聯(lián)網(wǎng),移動(dòng)互聯(lián)網(wǎng),智能硬件,車聯(lián)網(wǎng),遠(yuǎn)程醫(yī)療,電力石油等領(lǐng)域
1.2 mqtt必須具備一下幾點(diǎn)優(yōu)勢(shì):
簡(jiǎn)單易實(shí)現(xiàn)
消息傳遞可靠,支持QoS
輕量省帶寬
數(shù)據(jù)無(wú)關(guān)性,不關(guān)心數(shù)據(jù)格式
心跳模式(時(shí)刻感知客戶端狀態(tài))
1.3 MQTT與HTTP協(xié)議的區(qū)別
mqtt最小報(bào)文僅為2字節(jié),比http占用更少的網(wǎng)絡(luò)開銷
mqtt基于發(fā)布訂閱模型,http基于請(qǐng)求相應(yīng).mqtt支持雙工通信,http不支持
mqtt是有狀態(tài)的,http是無(wú)狀態(tài)的
mqtt具有斷開重連機(jī)制,http不支持
1.4 可靠的消息傳遞(QoS)
MQTT 協(xié)議提供了 3 種消息服務(wù)質(zhì)量等級(jí)(Quality of Service),保證了在不同的網(wǎng)絡(luò)環(huán)境下消息傳遞的可靠性
QoS 0:消息最多傳遞一次
如果當(dāng)時(shí)客戶端不可用,則會(huì)丟失該消息。發(fā)布者發(fā)送一條消息之后,就不再關(guān)心它有沒(méi)有發(fā)送到對(duì)方,也不設(shè)置任何重發(fā)機(jī)制
QoS 1:消息傳遞至少 1 次
包含了簡(jiǎn)單的重發(fā)機(jī)制,發(fā)布者發(fā)送消息之后等待接收者的 ACK,如果沒(méi)收到 ACK 則重新發(fā)送消息。這種模式能保證消息至少能到達(dá)一次,但無(wú)法保證消息重復(fù)
QoS 2:消息僅傳送一次
設(shè)計(jì)了重發(fā)和重復(fù)消息發(fā)現(xiàn)機(jī)制,保證消息到達(dá)對(duì)方并且嚴(yán)格只到達(dá)一次
2.docker部署一個(gè)emqx容器
安裝運(yùn)行emqx鏡像
docker run -itd --restart=always --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:latest
安裝完成之后,我們相當(dāng)于有了一個(gè)mqtt的服務(wù)器
看一下運(yùn)行日志:docker logs -f emqx
[root@VM-4-3-centos dockerfile]# docker logs -f emqx
WARNING: Default (insecure) Erlang cookie is in use.
WARNING: Configure node.cookie in /opt/emqx/etc/emqx.conf or override from environment variable EMQX_NODE__COOKIE
WARNING: Use the same config value for all nodes in the cluster.
EMQX_RPC__PORT_DISCOVERY [rpc.port_discovery]: manual
EMQX_LOG__FILE_HANDLERS__DEFAULT__ENABLE [log.file_handlers.default.enable]: false
EMQX_LOG__CONSOLE_HANDLER__ENABLE [log.console_handler.enable]: true
EMQX_NODE__NAME [node.name]: emqx@172.17.0.7
Listener ssl:default on 0.0.0.0:8883 started.
Listener tcp:default on 0.0.0.0:1883 started.
Listener ws:default on 0.0.0.0:8083 started.
Listener wss:default on 0.0.0.0:8084 started.
Listener http:dashboard on :18083 started.
EMQX 5.0.16 is running now!
此時(shí)mqtt安裝完畢,上述的四個(gè)端口只要防火墻允許通過(guò),我們就可以用mqtt官方提供的客戶端MQTTX進(jìn)行連接了
查詢啟動(dòng)的容器 docker ps
刪除容器 docker rm -f 容器名
3.客戶端工具M(jìn)QTTBox和MQTTX安裝(2選1)
3.1 MQTTBox安裝
安裝地址:http://workswithweb.com/mqttbox.html
如果無(wú)法訪問(wèn),可以百度云獲取
鏈接:https://pan.baidu.com/s/1HKd7qfHmezBwY6DXif9E1g
提取碼:sei6
解壓后找到MQTTBox.exe運(yùn)行即可
客戶端連接
3.2 MQTTX安裝
安裝地址:https://mqttx.app/zh
客戶端連接
4.發(fā)布訂閱測(cè)試(MQTTBOX)
mqttbox的好處是發(fā)布訂閱在一個(gè)頁(yè)面,便于觀察,發(fā)布的topic是emqx/123/test,訂閱的時(shí)候用emqx/+/test訂閱,+是通配符
5.spring-boot項(xiàng)目基于spring-integration-mqtt實(shí)現(xiàn)
代碼基于spring-integration-mqtt實(shí)現(xiàn).spring-integration-mqtt內(nèi)依賴了org.eclipse.paho.client.mqttv3(這個(gè)是Paho Java原生庫(kù))
下面的內(nèi)容足以應(yīng)對(duì)開發(fā)需求,實(shí)現(xiàn)了并發(fā)處理消息的能力,提供了回復(fù)消息的思路(往指定topic發(fā)送消息,讓另一個(gè)客戶端進(jìn)行接收)
5.1 application.yml
mqtt:
protocol: tcp
host: yourHost
port: 1883
username: admin
password: admin
client-id: admin
inbound-topic: emqx/+/test,emqx/+/test_reply,emqx/young/+
keep-alive-interval: 60
connection-timeout: 120
5.2 pom.xml
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.4.11</version>
</parent>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
//核心依賴就這個(gè)
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.5</version>
</dependency>
</dependencies>
5.3 代碼實(shí)現(xiàn)
大致的邏輯
1.通過(guò)協(xié)議ip端口等參數(shù)進(jìn)行mqtt連接
2.定義一個(gè)訂閱者客戶端進(jìn)行數(shù)據(jù)統(tǒng)一接入
3.定義一個(gè)topic路由器進(jìn)行消息分發(fā),每一個(gè)topic對(duì)應(yīng)一個(gè)channel
4.接收消息
5.定義一個(gè)發(fā)布者客戶端進(jìn)行消息發(fā)布
6.topic訂閱與取消訂閱
5.3.1 配置類(5個(gè))
MqttConnectConfiguration:mqtt連接配置
MqttInboundConfiguration:訂閱消息的客戶端
MqttOutboundConfiguration:發(fā)布消息的客戶端
MqttMessageChannel:消息通道
ThreadPoolOfInbound:統(tǒng)一入棧的線程池,處理并發(fā)消息
/**
* @Description: mqtt連接配置
* @Author: young
* @Date: 2023/2/15 16:26
*/
@Configuration
@Data
@ConfigurationProperties(prefix = "mqtt")
public class MqttConnectConfiguration {
//協(xié)議
private String protocol;
//ip
private String host;
//端口
private Integer port;
private String username;
private String password;
//客戶端id
private String clientId;
//客戶端連接時(shí)需要自動(dòng)訂閱的topic,多個(gè)用逗號(hào)分割
private String inboundTopic;
//心跳時(shí)間:默認(rèn)60s,如果該時(shí)間內(nèi)客戶端沒(méi)有收到消息,客戶端ping一次服務(wù)端,判斷服務(wù)端是否宕機(jī)
private Integer keepAliveInterval;
//定義了客戶端等待建立到MQTT服務(wù)器的網(wǎng)絡(luò)連接的最大時(shí)間間隔,默認(rèn)超時(shí)為30秒
private Integer connectionTimeout;
@Bean
//連接對(duì)象
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
serviceUrl = new StringBuilder()
.append(protocol.trim())
.append("://")
.append(host.trim())
.append(":")
.append(port)
.toString();
mqttConnectOptions.setServerURIs(new String[]{serviceUrl});
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setKeepAliveInterval(keepAliveInterval);
mqttConnectOptions.setConnectionTimeout(connectionTimeout);
//重連不清除session
mqttConnectOptions.setCleanSession(false);
return mqttConnectOptions;
}
@Bean
//客戶端工廠
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
//建立連接
factory.setConnectionOptions(mqttConnectOptions());
return factory;
}
//tcp://127.0.0.1:1883
private String serviceUrl;
}
/**
* @Description: 訂閱消息的客戶端
* @Author: young
* @Date: 2023/2/15 16:51
*/
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttInboundConfiguration {
@Resource
private MqttConnectConfiguration mqttConnectConfiguration;
@Resource
private MqttPahoClientFactory mqttClientFactory;
@Resource(name = ChannelName.INBOUND)
private MessageChannel inboundChannel;
/**
* 消費(fèi)者訂閱消息
*/
@Bean(name = "adapter")
public MessageProducerSupport mqttInbound() {
//消息適配器
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
mqttConnectConfiguration.getClientId() + "_consumer_" + System.currentTimeMillis(),
mqttClientFactory, mqttConnectConfiguration.getInboundTopic().split(","));
//消息轉(zhuǎn)換器
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
//統(tǒng)一字節(jié)傳輸
converter.setPayloadAsBytes(true);
adapter.setConverter(converter);
//只接收一次
adapter.setQos(2);
//入棧的消息統(tǒng)一交給inbound通道處理
adapter.setOutputChannel(inboundChannel);
return adapter;
}
/**
* 非法主題數(shù)據(jù)進(jìn)入
*/
@Bean
@ServiceActivator(inputChannel = ChannelName.DEFAULT)
public MessageHandler defaultInboundHandler() {
return message -> {
log.info("默認(rèn)通道接收到數(shù)據(jù)但無(wú)法處理,topic:{},payload:{}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), new String((byte[]) message.getPayload()));
};
}
}
/**
* @Description: 發(fā)布消息的客戶端
* @Author: young
* @Date: 2023/2/15 17:00
*/
@Configuration
public class MqttOutboundConfiguration {
@Resource
private MqttConnectConfiguration mqttConnectConfiguration;
@Resource
private MqttPahoClientFactory mqttClientFactory;
/**
* 生產(chǎn)者發(fā)布消息
*/
@Bean
@ServiceActivator(inputChannel = ChannelName.OUTBOUND)
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
mqttConnectConfiguration.getClientId() + "_producer_" + System.currentTimeMillis(),
mqttClientFactory);
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);
messageHandler.setAsync(true);
//只發(fā)送一次,不關(guān)心是客戶端(訂閱者)是否接收到
messageHandler.setDefaultQos(0);
messageHandler.setConverter(converter);
return messageHandler;
}
}
/**
* @Description: 消息的通道,通過(guò)ExecutorChannel引入線程池,可以并發(fā)接收數(shù)據(jù)
* @Author: young
* @Date: 2023/2/15 16:35
*/
@Configuration
public class MqttMessageChannel {
// @Bean(name = ChannelName.INBOUND)
// public MessageChannel inboundChannel() {
// return new DirectChannel();
// }
@Resource(name = "inboundThreadPool")
private Executor inboundThreadPool;
//通過(guò)ExecutorChannel引入線程池,可以并發(fā)接收數(shù)據(jù)
@Bean(name = ChannelName.INBOUND)
public MessageChannel inboundChannel() {
return new ExecutorChannel(inboundThreadPool);
}
@Bean(name = ChannelName.DEFAULT)
public MessageChannel defaultChannel() {
return new DirectChannel();
}
@Bean(name = ChannelName.OUTBOUND)
public MessageChannel outboundChannel() {
return new DirectChannel();
}
//每一個(gè)指定的通道(或者topic)都可以指定一個(gè)線程池來(lái)并發(fā)處理接收的消息,這里只在inbound中做并發(fā)處理
@Bean(name = ChannelName.TEST)
public MessageChannel testChannel() {
return new DirectChannel();
}
@Bean(name = ChannelName.TEST_REPLY)
public MessageChannel testReplyChannel() {
return new DirectChannel();
}
@Bean(name = ChannelName.YOUNG)
public MessageChannel youngChannel() {
return new DirectChannel();
}
}
/**
* @Description: 統(tǒng)一入棧的線程池
* @Author: young
* @Date: 2023/2/17 16:30
*/
@Component
public class ThreadPoolOfInbound {
@Value("${thread.pool.core-pool-size: 20}")
private int corePoolSize;
@Value("${thread.pool.maximum-pool-size: 40}")
private int maximumPoolSize;
@Value("${thread.pool.keep-alive-time: 120}")
private long keepAliveTime;
@Value("${thread.pool.queue.capacity: 2000}")
private int capacity;
@Bean("inboundThreadPool")
public Executor inboundThreadPool() {
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(capacity),
//線程名前綴
new CustomizableThreadFactory("inboundThreadPool-"),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
}
}
5.3.2 topic聲明
TopicConstant :topic常量類,可自定義正則
TopicEnum:topic枚舉,關(guān)鍵的一個(gè)類,根據(jù)topic正則匹配到對(duì)應(yīng)的channel中
/**
* @Description: topic 常量
* @Author: young
* @Date: 2023/2/16 11:02
*/
public class TopicConstant {
public static final String EMQX = "emqx/";
public static final String YOUNG = "young/";
public static final String TEST = "/test";
public static final String REPLY = "_reply";
public static final String REGEX = "[A-Za-z0-9]+";
}
/**
* @Description: 主題枚舉
* @Author: young
* @Date: 2023/2/15 17:13
*/
@Getter
public enum TopicEnum {
//^限制開始位,$限制結(jié)束位
TEST(Pattern.compile("^" + TopicConstant.EMQX + TopicConstant.REGEX + TopicConstant.TEST + "$"), ChannelName.TEST),
TEST_REPLY(Pattern.compile("^" + TopicConstant.EMQX + TopicConstant.REGEX + TopicConstant.TEST + TopicConstant.REPLY + "$"), ChannelName.TEST_REPLY),
YOUNG(Pattern.compile("^" + TopicConstant.EMQX + TopicConstant.YOUNG + TopicConstant.REGEX + "$"), ChannelName.YOUNG),
//默認(rèn)通道
UNKNOWN(Pattern.compile("^.*$"), ChannelName.DEFAULT);
//主題匹配規(guī)則
Pattern pattern;
//通道名稱
String channelName;
TopicEnum(Pattern pattern, String channelName) {
this.pattern = pattern;
this.channelName = channelName;
}
public static TopicEnum find(String topic) {
//如果無(wú)法匹配topic,則分發(fā)到默認(rèn)通道
return Arrays.stream(TopicEnum.values()).filter(topicEnum -> topicEnum.pattern.matcher(topic).matches()).findAny().orElse(UNKNOWN);
}
}
5.3.3 channel與router聲明
ChannelName:通道名稱
InboundMessageRouter:從INBOUND中獲取topic,路由到指定的channel
/**
* @Description: 消息通道
* @Author: young
* @Date: 2023/2/15 16:32
*/
public class ChannelName {
public static final String INBOUND = "inbound";
public static final String DEFAULT = "default";
public static final String OUTBOUND = "outbound";
public static final String TEST = "test";
public static final String TEST_REPLY = "testReply";
public static final String YOUNG = "young";
/**
* @Description: 入站消息路由
* @Author: young
* @Date: 2023/2/15 17:09
*/
@Component
@Slf4j
public class InboundMessageRouter extends AbstractMessageRouter {
@Resource
private ApplicationContext applicationContext;
private static final ConcurrentHashMap<String, MessageChannel> channels = new ConcurrentHashMap<>(16);
/**
* 入站數(shù)據(jù)路由到指定通道
*/
@Override
@Router(inputChannel = ChannelName.INBOUND)
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
MessageHeaders headers = message.getHeaders();
String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
byte[] payload = (byte[]) message.getPayload();
log.info("接收到消息,topic:{},payload:{}", topic, new String(payload));
//查詢topic是否定義了
TopicEnum topicEnum = TopicEnum.find(topic);
if (channels.containsKey(topicEnum.getChannelName())) {
return Collections.singleton(channels.get(topicEnum.getChannelName()));
}
MessageChannel bean = (MessageChannel) applicationContext.getBean(topicEnum.getChannelName());
channels.put(topicEnum.getChannelName(), bean);
return Collections.singleton(bean);
}
}
5.3.4 消息訂閱與發(fā)布
MessageListenService :消息監(jiān)聽類
重點(diǎn)是@ServiceActivator(inputChannel = ChannelName.TEST)指定要監(jiān)聽的通道
SubscribeTopicJob :定時(shí)器,每分鐘獲取訂閱的topic文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-664228.html
/**
* @Description: 消息監(jiān)聽(訂閱)
* @Author: young
* @Date: 2023/2/17 10:12
*/
@Service
@Slf4j
public class MessageListenService {
@Resource
private MqttMessageSenderService messageSenderService;
@ServiceActivator(inputChannel = ChannelName.TEST)
public void listenTest(Message<?> message) {
byte[] payload = (byte[]) message.getPayload();
log.info("listenTest receive message : {}", new String(payload));
//消息回復(fù),回復(fù)的topic在連接客戶端的時(shí)候我這邊已經(jīng)訂閱了,第二個(gè)方法就能及時(shí)接收到回復(fù)的消息
String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
messageSenderService.publish(topic + TopicConstant.REPLY, "test reply...");
}
@ServiceActivator(inputChannel = ChannelName.TEST_REPLY)
public void listenTestReply(Message<?> message) {
byte[] payload = (byte[]) message.getPayload();
log.info("listenTestReply receive message : {}", new String(payload));
}
@ServiceActivator(inputChannel = ChannelName.YOUNG)
public void listenYoung(Message<?> message) {
byte[] payload = (byte[]) message.getPayload();
log.info("listenYoung receive message : {}", new String(payload));
}
}
/**
* @Description: 消息發(fā)布
* @Author: young
* @Date: 2023/2/16 9:20
*/
@Service
@Slf4j
public class MqttMessageSenderServiceImpl implements MqttMessageSenderService {
@Resource
private MqttMessageGateway messageGateway;
@Resource
private ObjectMapper objectMapper;
@Override
public void publish(String topic, Object message) {
try {
messageGateway.publish(topic, objectMapper.writeValueAsBytes(message));
} catch (JsonProcessingException e) {
e.printStackTrace();
log.info("發(fā)布消息失敗:{}", e.getMessage());
}
}
@Override
public void publish(String topic, Object message, int qos) {
try {
messageGateway.publish(topic, objectMapper.writeValueAsBytes(message), qos);
} catch (JsonProcessingException e) {
e.printStackTrace();
log.info("發(fā)布消息失敗:{}", e.getMessage());
}
}
}
//topic訂閱與取消訂閱的接口實(shí)現(xiàn)
@Service
public class MqttTopicServiceImpl implements MqttTopicService {
@Resource
private MqttPahoMessageDrivenChannelAdapter adapter;
@Override
public void subscribe(String topic) {
adapter.addTopic(topic);
}
@Override
public void unsubscribe(String topic) {
adapter.removeTopic(topic);
}
@Override
public String[] getSubscribedTopic() {
return adapter.getTopic();
}
}
//消息發(fā)送網(wǎng)關(guān),最終由MessagingGateway發(fā)送消息,這個(gè)注解不能少
@Component
@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)
public interface MqttMessageGateway {
void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload, @Header(MqttHeaders.QOS) int qos);
}
//消息發(fā)送接口
public interface MqttMessageSenderService {
void publish(String topic, Object message);
void publish(String topic, Object message, int qos);
}
//topic訂閱與取消訂閱的接口
public interface MqttTopicService {
void subscribe(@Header(MqttHeaders.TOPIC) String topic);
void unsubscribe(@Header(MqttHeaders.TOPIC) String topic);
String[] getSubscribedTopic();
}
//定時(shí)器:每分鐘監(jiān)聽一次訂閱
@Component
@Slf4j
public class SubscribeTopicJob {
@Resource
private MqttTopicService mqttTopicService;
//每分鐘打印一下訂閱情況
@Scheduled(cron = "0 0/1 * * * ?")
@Async
public void subscribeTopicListen() {
log.info("訂閱了:{}", JSONObject.toJSONString(mqttTopicService.getSubscribedTopic()));
}
}
代碼已提供,大家可以結(jié)合工具進(jìn)行測(cè)試,除了可以測(cè)試消息發(fā)布和訂閱,還可以測(cè)試客戶端自動(dòng)重連(docker 刪除容器:docker rm -f emqx后觀察日志)
如有侵權(quán),請(qǐng)告知?jiǎng)h除文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-664228.html
到了這里,關(guān)于MQTT記錄(概述,docker部署,基于spring-integration-mqtt實(shí)現(xiàn)消息訂閱與發(fā)布,客戶端工具測(cè)試)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!