国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

MQTT記錄(概述,docker部署,基于spring-integration-mqtt實(shí)現(xiàn)消息訂閱與發(fā)布,客戶端工具測(cè)試)

這篇具有很好參考價(jià)值的文章主要介紹了MQTT記錄(概述,docker部署,基于spring-integration-mqtt實(shí)現(xiàn)消息訂閱與發(fā)布,客戶端工具測(cè)試)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

需要spring-boot集成spring-integration-mqtt代碼的直接跳到第5部分

1.MQTT介紹

1.1 MQTT是什么呢?

message queue telemetry translation
是一種基于發(fā)布與訂閱的輕量級(jí)消息傳輸協(xié)議.適用于低帶寬或網(wǎng)絡(luò)不穩(wěn)定的物聯(lián)網(wǎng)應(yīng)用.開發(fā)者可以使用極少的代碼來(lái)實(shí)現(xiàn)物聯(lián)網(wǎng)設(shè)備之間的消息傳輸.mqtt協(xié)議廣泛應(yīng)用于物聯(lián)網(wǎng),移動(dòng)互聯(lián)網(wǎng),智能硬件,車聯(lián)網(wǎng),遠(yuǎn)程醫(yī)療,電力石油等領(lǐng)域

1.2 mqtt必須具備一下幾點(diǎn)優(yōu)勢(shì):

簡(jiǎn)單易實(shí)現(xiàn)
消息傳遞可靠,支持QoS
輕量省帶寬
數(shù)據(jù)無(wú)關(guān)性,不關(guān)心數(shù)據(jù)格式
心跳模式(時(shí)刻感知客戶端狀態(tài))

1.3 MQTT與HTTP協(xié)議的區(qū)別

mqtt最小報(bào)文僅為2字節(jié),比http占用更少的網(wǎng)絡(luò)開銷
mqtt基于發(fā)布訂閱模型,http基于請(qǐng)求相應(yīng).mqtt支持雙工通信,http不支持
mqtt是有狀態(tài)的,http是無(wú)狀態(tài)的
mqtt具有斷開重連機(jī)制,http不支持

1.4 可靠的消息傳遞(QoS)

MQTT 協(xié)議提供了 3 種消息服務(wù)質(zhì)量等級(jí)(Quality of Service),保證了在不同的網(wǎng)絡(luò)環(huán)境下消息傳遞的可靠性
QoS 0:消息最多傳遞一次
如果當(dāng)時(shí)客戶端不可用,則會(huì)丟失該消息。發(fā)布者發(fā)送一條消息之后,就不再關(guān)心它有沒(méi)有發(fā)送到對(duì)方,也不設(shè)置任何重發(fā)機(jī)制
QoS 1:消息傳遞至少 1 次
包含了簡(jiǎn)單的重發(fā)機(jī)制,發(fā)布者發(fā)送消息之后等待接收者的 ACK,如果沒(méi)收到 ACK 則重新發(fā)送消息。這種模式能保證消息至少能到達(dá)一次,但無(wú)法保證消息重復(fù)
QoS 2:消息僅傳送一次
設(shè)計(jì)了重發(fā)和重復(fù)消息發(fā)現(xiàn)機(jī)制,保證消息到達(dá)對(duì)方并且嚴(yán)格只到達(dá)一次

2.docker部署一個(gè)emqx容器

安裝運(yùn)行emqx鏡像
docker run -itd --restart=always --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:latest
安裝完成之后,我們相當(dāng)于有了一個(gè)mqtt的服務(wù)器
看一下運(yùn)行日志:docker logs -f emqx

[root@VM-4-3-centos dockerfile]# docker logs -f emqx
WARNING: Default (insecure) Erlang cookie is in use.
WARNING: Configure node.cookie in /opt/emqx/etc/emqx.conf or override from environment variable EMQX_NODE__COOKIE
WARNING: Use the same config value for all nodes in the cluster.
EMQX_RPC__PORT_DISCOVERY [rpc.port_discovery]: manual
EMQX_LOG__FILE_HANDLERS__DEFAULT__ENABLE [log.file_handlers.default.enable]: false
EMQX_LOG__CONSOLE_HANDLER__ENABLE [log.console_handler.enable]: true
EMQX_NODE__NAME [node.name]: emqx@172.17.0.7
Listener ssl:default on 0.0.0.0:8883 started.
Listener tcp:default on 0.0.0.0:1883 started.
Listener ws:default on 0.0.0.0:8083 started.
Listener wss:default on 0.0.0.0:8084 started.
Listener http:dashboard on :18083 started.
EMQX 5.0.16 is running now!

此時(shí)mqtt安裝完畢,上述的四個(gè)端口只要防火墻允許通過(guò),我們就可以用mqtt官方提供的客戶端MQTTX進(jìn)行連接了
查詢啟動(dòng)的容器 docker ps
刪除容器 docker rm -f 容器名

3.客戶端工具M(jìn)QTTBox和MQTTX安裝(2選1)

3.1 MQTTBox安裝

安裝地址:http://workswithweb.com/mqttbox.html
如果無(wú)法訪問(wèn),可以百度云獲取
鏈接:https://pan.baidu.com/s/1HKd7qfHmezBwY6DXif9E1g
提取碼:sei6
解壓后找到MQTTBox.exe運(yùn)行即可

客戶端連接
MQTT記錄(概述,docker部署,基于spring-integration-mqtt實(shí)現(xiàn)消息訂閱與發(fā)布,客戶端工具測(cè)試),docker,spring boot,中間件

3.2 MQTTX安裝

安裝地址:https://mqttx.app/zh
客戶端連接
MQTT記錄(概述,docker部署,基于spring-integration-mqtt實(shí)現(xiàn)消息訂閱與發(fā)布,客戶端工具測(cè)試),docker,spring boot,中間件

4.發(fā)布訂閱測(cè)試(MQTTBOX)

mqttbox的好處是發(fā)布訂閱在一個(gè)頁(yè)面,便于觀察,發(fā)布的topic是emqx/123/test,訂閱的時(shí)候用emqx/+/test訂閱,+是通配符
MQTT記錄(概述,docker部署,基于spring-integration-mqtt實(shí)現(xiàn)消息訂閱與發(fā)布,客戶端工具測(cè)試),docker,spring boot,中間件

5.spring-boot項(xiàng)目基于spring-integration-mqtt實(shí)現(xiàn)

代碼基于spring-integration-mqtt實(shí)現(xiàn).spring-integration-mqtt內(nèi)依賴了org.eclipse.paho.client.mqttv3(這個(gè)是Paho Java原生庫(kù))
下面的內(nèi)容足以應(yīng)對(duì)開發(fā)需求,實(shí)現(xiàn)了并發(fā)處理消息的能力,提供了回復(fù)消息的思路(往指定topic發(fā)送消息,讓另一個(gè)客戶端進(jìn)行接收)

5.1 application.yml

mqtt:
  protocol: tcp
  host: yourHost
  port: 1883
  username: admin
  password: admin
  client-id: admin
  inbound-topic: emqx/+/test,emqx/+/test_reply,emqx/young/+
  keep-alive-interval: 60
  connection-timeout: 120

5.2 pom.xml

	<parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.4.11</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
		//核心依賴就這個(gè)
		<dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.5.5</version>
        </dependency>
    </dependencies>
		

5.3 代碼實(shí)現(xiàn)

大致的邏輯
1.通過(guò)協(xié)議ip端口等參數(shù)進(jìn)行mqtt連接
2.定義一個(gè)訂閱者客戶端進(jìn)行數(shù)據(jù)統(tǒng)一接入
3.定義一個(gè)topic路由器進(jìn)行消息分發(fā),每一個(gè)topic對(duì)應(yīng)一個(gè)channel
4.接收消息
5.定義一個(gè)發(fā)布者客戶端進(jìn)行消息發(fā)布
6.topic訂閱與取消訂閱

5.3.1 配置類(5個(gè))

MqttConnectConfiguration:mqtt連接配置
MqttInboundConfiguration:訂閱消息的客戶端
MqttOutboundConfiguration:發(fā)布消息的客戶端
MqttMessageChannel:消息通道
ThreadPoolOfInbound:統(tǒng)一入棧的線程池,處理并發(fā)消息

/**
 * @Description: mqtt連接配置
 * @Author: young
 * @Date: 2023/2/15 16:26
 */
@Configuration
@Data
@ConfigurationProperties(prefix = "mqtt")
public class MqttConnectConfiguration {
    //協(xié)議
    private String protocol;

    //ip
    private String host;

    //端口
    private Integer port;

    private String username;

    private String password;

    //客戶端id
    private String clientId;

    //客戶端連接時(shí)需要自動(dòng)訂閱的topic,多個(gè)用逗號(hào)分割
    private String inboundTopic;

    //心跳時(shí)間:默認(rèn)60s,如果該時(shí)間內(nèi)客戶端沒(méi)有收到消息,客戶端ping一次服務(wù)端,判斷服務(wù)端是否宕機(jī)
    private Integer keepAliveInterval;

    //定義了客戶端等待建立到MQTT服務(wù)器的網(wǎng)絡(luò)連接的最大時(shí)間間隔,默認(rèn)超時(shí)為30秒
    private Integer connectionTimeout;


    @Bean
    //連接對(duì)象
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        serviceUrl = new StringBuilder()
                .append(protocol.trim())
                .append("://")
                .append(host.trim())
                .append(":")
                .append(port)
                .toString();
        mqttConnectOptions.setServerURIs(new String[]{serviceUrl});
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setKeepAliveInterval(keepAliveInterval);
        mqttConnectOptions.setConnectionTimeout(connectionTimeout);
        //重連不清除session
        mqttConnectOptions.setCleanSession(false);
        return mqttConnectOptions;
    }

    @Bean
    //客戶端工廠
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        //建立連接
        factory.setConnectionOptions(mqttConnectOptions());
        return factory;
    }

    //tcp://127.0.0.1:1883
    private String serviceUrl;
}



/**
 * @Description: 訂閱消息的客戶端
 * @Author: young
 * @Date: 2023/2/15 16:51
 */
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttInboundConfiguration {

    @Resource
    private MqttConnectConfiguration mqttConnectConfiguration;
    @Resource
    private MqttPahoClientFactory mqttClientFactory;

    @Resource(name = ChannelName.INBOUND)
    private MessageChannel inboundChannel;

    /**
     * 消費(fèi)者訂閱消息
     */
    @Bean(name = "adapter")
    public MessageProducerSupport mqttInbound() {
        //消息適配器
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                mqttConnectConfiguration.getClientId() + "_consumer_" + System.currentTimeMillis(),
                mqttClientFactory, mqttConnectConfiguration.getInboundTopic().split(","));
        //消息轉(zhuǎn)換器
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        //統(tǒng)一字節(jié)傳輸
        converter.setPayloadAsBytes(true);
        adapter.setConverter(converter);
        //只接收一次
        adapter.setQos(2);
        //入棧的消息統(tǒng)一交給inbound通道處理
        adapter.setOutputChannel(inboundChannel);
        return adapter;
    }

    /**
     * 非法主題數(shù)據(jù)進(jìn)入
     */
    @Bean
    @ServiceActivator(inputChannel = ChannelName.DEFAULT)
    public MessageHandler defaultInboundHandler() {
        return message -> {
            log.info("默認(rèn)通道接收到數(shù)據(jù)但無(wú)法處理,topic:{},payload:{}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), new String((byte[]) message.getPayload()));
        };
    }
}


/**
 * @Description: 發(fā)布消息的客戶端
 * @Author: young
 * @Date: 2023/2/15 17:00
 */
@Configuration
public class MqttOutboundConfiguration {

    @Resource
    private MqttConnectConfiguration mqttConnectConfiguration;
    @Resource
    private MqttPahoClientFactory mqttClientFactory;

    /**
     * 生產(chǎn)者發(fā)布消息
     */
    @Bean
    @ServiceActivator(inputChannel = ChannelName.OUTBOUND)
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                mqttConnectConfiguration.getClientId() + "_producer_" + System.currentTimeMillis(),
                mqttClientFactory);
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);
        messageHandler.setAsync(true);
        //只發(fā)送一次,不關(guān)心是客戶端(訂閱者)是否接收到
        messageHandler.setDefaultQos(0);
        messageHandler.setConverter(converter);
        return messageHandler;
    }
}


/**
 * @Description: 消息的通道,通過(guò)ExecutorChannel引入線程池,可以并發(fā)接收數(shù)據(jù)
 * @Author: young
 * @Date: 2023/2/15 16:35
 */
@Configuration
public class MqttMessageChannel {

//    @Bean(name = ChannelName.INBOUND)
//    public MessageChannel inboundChannel() {
//        return new DirectChannel();
//    }
    @Resource(name = "inboundThreadPool")
    private Executor inboundThreadPool;

    //通過(guò)ExecutorChannel引入線程池,可以并發(fā)接收數(shù)據(jù)
    @Bean(name = ChannelName.INBOUND)
    public MessageChannel inboundChannel() {
        return new ExecutorChannel(inboundThreadPool);
    }

    @Bean(name = ChannelName.DEFAULT)
    public MessageChannel defaultChannel() {
        return new DirectChannel();
    }

    @Bean(name = ChannelName.OUTBOUND)
    public MessageChannel outboundChannel() {
        return new DirectChannel();
    }

    //每一個(gè)指定的通道(或者topic)都可以指定一個(gè)線程池來(lái)并發(fā)處理接收的消息,這里只在inbound中做并發(fā)處理
    @Bean(name = ChannelName.TEST)
    public MessageChannel testChannel() {
        return new DirectChannel();
    }

    @Bean(name = ChannelName.TEST_REPLY)
    public MessageChannel testReplyChannel() {
        return new DirectChannel();
    }

    @Bean(name = ChannelName.YOUNG)
    public MessageChannel youngChannel() {
        return new DirectChannel();
    }

}

/**
 * @Description: 統(tǒng)一入棧的線程池
 * @Author: young
 * @Date: 2023/2/17 16:30
 */
@Component
public class ThreadPoolOfInbound {
    @Value("${thread.pool.core-pool-size: 20}")
    private int corePoolSize;

    @Value("${thread.pool.maximum-pool-size: 40}")
    private int maximumPoolSize;

    @Value("${thread.pool.keep-alive-time: 120}")
    private long keepAliveTime;

    @Value("${thread.pool.queue.capacity: 2000}")
    private int capacity;

    @Bean("inboundThreadPool")
    public Executor inboundThreadPool() {
        return new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(capacity),
                //線程名前綴
                new CustomizableThreadFactory("inboundThreadPool-"),
                new ThreadPoolExecutor.DiscardOldestPolicy()
        );
    }
}



5.3.2 topic聲明

TopicConstant :topic常量類,可自定義正則
TopicEnum:topic枚舉,關(guān)鍵的一個(gè)類,根據(jù)topic正則匹配到對(duì)應(yīng)的channel中

/**
 * @Description: topic 常量
 * @Author: young
 * @Date: 2023/2/16 11:02
 */
public class TopicConstant {

    public static final String EMQX = "emqx/";
    public static final String YOUNG = "young/";
    public static final String TEST = "/test";
    public static final String REPLY = "_reply";
    public static final String REGEX = "[A-Za-z0-9]+";

}

/**
 * @Description: 主題枚舉
 * @Author: young
 * @Date: 2023/2/15 17:13
 */
@Getter
public enum TopicEnum {
    //^限制開始位,$限制結(jié)束位
    TEST(Pattern.compile("^" + TopicConstant.EMQX + TopicConstant.REGEX + TopicConstant.TEST + "$"), ChannelName.TEST),
    TEST_REPLY(Pattern.compile("^" + TopicConstant.EMQX + TopicConstant.REGEX + TopicConstant.TEST + TopicConstant.REPLY + "$"), ChannelName.TEST_REPLY),
    YOUNG(Pattern.compile("^" + TopicConstant.EMQX + TopicConstant.YOUNG + TopicConstant.REGEX + "$"), ChannelName.YOUNG),
    //默認(rèn)通道
    UNKNOWN(Pattern.compile("^.*$"), ChannelName.DEFAULT);


    //主題匹配規(guī)則
    Pattern pattern;
    //通道名稱
    String channelName;

    TopicEnum(Pattern pattern, String channelName) {
        this.pattern = pattern;
        this.channelName = channelName;
    }

    public static TopicEnum find(String topic) {
        //如果無(wú)法匹配topic,則分發(fā)到默認(rèn)通道
        return Arrays.stream(TopicEnum.values()).filter(topicEnum -> topicEnum.pattern.matcher(topic).matches()).findAny().orElse(UNKNOWN);
    }
}

5.3.3 channel與router聲明

ChannelName:通道名稱
InboundMessageRouter:從INBOUND中獲取topic,路由到指定的channel

/**
 * @Description: 消息通道
 * @Author: young
 * @Date: 2023/2/15 16:32
 */
public class ChannelName {

    public static final String INBOUND = "inbound";

    public static final String DEFAULT = "default";

    public static final String OUTBOUND = "outbound";


    public static final String TEST = "test";
    public static final String TEST_REPLY = "testReply";
    public static final String YOUNG = "young";

/**
 * @Description: 入站消息路由
 * @Author: young
 * @Date: 2023/2/15 17:09
 */
@Component
@Slf4j
public class InboundMessageRouter extends AbstractMessageRouter {

    @Resource
    private ApplicationContext applicationContext;

    private static final ConcurrentHashMap<String, MessageChannel> channels = new ConcurrentHashMap<>(16);

    /**
     * 入站數(shù)據(jù)路由到指定通道
     */
    @Override
    @Router(inputChannel = ChannelName.INBOUND)
    protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
        MessageHeaders headers = message.getHeaders();
        String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
        byte[] payload = (byte[]) message.getPayload();

        log.info("接收到消息,topic:{},payload:{}", topic, new String(payload));

        //查詢topic是否定義了
        TopicEnum topicEnum = TopicEnum.find(topic);
        if (channels.containsKey(topicEnum.getChannelName())) {
            return Collections.singleton(channels.get(topicEnum.getChannelName()));
        }
        MessageChannel bean = (MessageChannel) applicationContext.getBean(topicEnum.getChannelName());
        channels.put(topicEnum.getChannelName(), bean);
        return Collections.singleton(bean);
    }
}


5.3.4 消息訂閱與發(fā)布

MessageListenService :消息監(jiān)聽類
重點(diǎn)是@ServiceActivator(inputChannel = ChannelName.TEST)指定要監(jiān)聽的通道
SubscribeTopicJob :定時(shí)器,每分鐘獲取訂閱的topic

/**
 * @Description: 消息監(jiān)聽(訂閱)
 * @Author: young
 * @Date: 2023/2/17 10:12
 */
@Service
@Slf4j
public class MessageListenService {
    @Resource
    private MqttMessageSenderService messageSenderService;

    @ServiceActivator(inputChannel = ChannelName.TEST)
    public void listenTest(Message<?> message) {
        byte[] payload = (byte[]) message.getPayload();
        log.info("listenTest receive message : {}", new String(payload));
        //消息回復(fù),回復(fù)的topic在連接客戶端的時(shí)候我這邊已經(jīng)訂閱了,第二個(gè)方法就能及時(shí)接收到回復(fù)的消息
        String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
        messageSenderService.publish(topic + TopicConstant.REPLY, "test reply...");
    }

    @ServiceActivator(inputChannel = ChannelName.TEST_REPLY)
    public void listenTestReply(Message<?> message) {
        byte[] payload = (byte[]) message.getPayload();
        log.info("listenTestReply receive message : {}", new String(payload));
    }

    @ServiceActivator(inputChannel = ChannelName.YOUNG)
    public void listenYoung(Message<?> message) {
        byte[] payload = (byte[]) message.getPayload();
        log.info("listenYoung receive message : {}", new String(payload));
    }

}


/**
 * @Description: 消息發(fā)布
 * @Author: young
 * @Date: 2023/2/16 9:20
 */
@Service
@Slf4j
public class MqttMessageSenderServiceImpl implements MqttMessageSenderService {
    @Resource
    private MqttMessageGateway messageGateway;
    @Resource
    private ObjectMapper objectMapper;

    @Override
    public void publish(String topic, Object message) {
        try {
            messageGateway.publish(topic, objectMapper.writeValueAsBytes(message));
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            log.info("發(fā)布消息失敗:{}", e.getMessage());
        }
    }

    @Override
    public void publish(String topic, Object message, int qos) {
        try {
            messageGateway.publish(topic, objectMapper.writeValueAsBytes(message), qos);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            log.info("發(fā)布消息失敗:{}", e.getMessage());
        }
    }
}

//topic訂閱與取消訂閱的接口實(shí)現(xiàn)
@Service
public class MqttTopicServiceImpl implements MqttTopicService {
    @Resource
    private MqttPahoMessageDrivenChannelAdapter adapter;

    @Override
    public void subscribe(String topic) {
        adapter.addTopic(topic);
    }

    @Override
    public void unsubscribe(String topic) {
        adapter.removeTopic(topic);
    }

    @Override
    public String[] getSubscribedTopic() {
        return adapter.getTopic();
    }
}

//消息發(fā)送網(wǎng)關(guān),最終由MessagingGateway發(fā)送消息,這個(gè)注解不能少
@Component
@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)
public interface MqttMessageGateway {

    void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);

    void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload, @Header(MqttHeaders.QOS) int qos);
}


//消息發(fā)送接口
public interface MqttMessageSenderService {

    void publish(String topic, Object message);

    void publish(String topic, Object message, int qos);
}

//topic訂閱與取消訂閱的接口
public interface MqttTopicService {

    void subscribe(@Header(MqttHeaders.TOPIC) String topic);

    void unsubscribe(@Header(MqttHeaders.TOPIC) String topic);

    String[] getSubscribedTopic();
}


//定時(shí)器:每分鐘監(jiān)聽一次訂閱
@Component
@Slf4j
public class SubscribeTopicJob {
    @Resource
    private MqttTopicService mqttTopicService;

    //每分鐘打印一下訂閱情況
    @Scheduled(cron = "0 0/1 * * * ?")
    @Async
    public void subscribeTopicListen() {
        log.info("訂閱了:{}", JSONObject.toJSONString(mqttTopicService.getSubscribedTopic()));
    }
}


代碼已提供,大家可以結(jié)合工具進(jìn)行測(cè)試,除了可以測(cè)試消息發(fā)布和訂閱,還可以測(cè)試客戶端自動(dòng)重連(docker 刪除容器:docker rm -f emqx后觀察日志)
如有侵權(quán),請(qǐng)告知?jiǎng)h除文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-664228.html

到了這里,關(guān)于MQTT記錄(概述,docker部署,基于spring-integration-mqtt實(shí)現(xiàn)消息訂閱與發(fā)布,客戶端工具測(cè)試)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 使用Docker部署Gitlab的記錄

    使用 docker -v 查看 映射本機(jī)的9980端口為Docker內(nèi)部的80端口 映射本機(jī)的9922端口為Docker內(nèi)部的22端口 使用root用戶啟動(dòng) 映射本機(jī)目錄/mnt/sda/gitlab/log為Docker內(nèi)部的/var/log/gitlab 映射本機(jī)目錄/mnt/sda/gitlab/opt為Docker內(nèi)部的/var/opt/gitlab 映射本機(jī)目錄/mnt/sda/gitlab/etc為Docker內(nèi)部的/etc/gitlab 使

    2024年02月09日
    瀏覽(16)
  • Docker無(wú)法部署Onlyoffice問(wèn)題記錄

    國(guó)產(chǎn)操作系統(tǒng)(麒麟v10-x86-64bit,openEuler20.03 64bit x86),安裝docker之后,通過(guò)docker run之后Onlyoffice無(wú)法成功跑起來(lái),且會(huì)消耗大量cpu資源,甚至導(dǎo)致系統(tǒng)卡死 鏡像的entrypoint中指向運(yùn)行一個(gè)sh腳本,腳本中通過(guò)service命令啟動(dòng)rabbitmq,但是一直無(wú)法成功啟動(dòng),且一直處于循環(huán)嘗試導(dǎo)

    2024年01月18日
    瀏覽(21)
  • Docker-Confluence部署記錄

    Docker-Confluence部署記錄

    參考-Confluence 破解方式(Linux) 按流程破解進(jìn)入系統(tǒng) 盡量是將備份包,放置到文件夾/var/atlassian/confluence/restore下。 1、部署完成,導(dǎo)入mysql數(shù)據(jù)后,出現(xiàn)中文亂碼-全是問(wèn)號(hào)的問(wèn)題 修改完成后,重啟docker容器。

    2024年01月21日
    瀏覽(14)
  • MQTT概述及環(huán)境搭建、python例程

    MQTT概述及環(huán)境搭建、python例程

    MQTT(英文全稱Message Queuing Telemetry Transport,消息隊(duì)列遙測(cè)傳輸協(xié)議)。 MQTT是一種輕量級(jí)的協(xié)議,適用于需要較小代碼占用空間或網(wǎng)絡(luò)帶寬非常寶貴的遠(yuǎn)程連接,是專為受限設(shè)備和低帶寬、高延遲或不可靠的網(wǎng)絡(luò)而設(shè)計(jì)。這些原則也使該協(xié)議成為新興的“機(jī)器到機(jī)器”( M2M )或物

    2024年02月04日
    瀏覽(14)
  • Docker環(huán)境下kkfileview安裝部署記錄

    Docker環(huán)境下kkfileview安裝部署記錄

    官方文檔地址:http://kkfileview.keking.cn/zh-cn/docs/production.html 源碼地址:https://gitee.com/kekingcn/file-online-preview 一般沒(méi)有改代碼的話用這種方式就可以,注意有nginx轉(zhuǎn)發(fā)的話需要配置base.url參數(shù) 由于對(duì)代碼做了一些更改,所以手動(dòng)部署 ? Dockerfile文件 ?pom 五、前端url處理

    2024年02月11日
    瀏覽(24)
  • 記錄.Net部署Docker-v指令使用

    之前我淺學(xué)了一下docker,方便部署.net項(xiàng)目(部署的是打包之后的項(xiàng)目) dockerfile文件如下: 然后下載鏡像,創(chuàng)建容器,都是使用的如下命令: 如果只是簡(jiǎn)單的使用上面的命令,是可以正確的部署項(xiàng)目,但是會(huì)出現(xiàn)一個(gè)問(wèn)題,就是我項(xiàng)目如果更新了的話。我需要先刪除容器,還要

    2023年04月20日
    瀏覽(14)
  • RocketMQ-(9-1)-MQTT-EventBridge概述

    RocketMQ-(9-1)-MQTT-EventBridge概述

    傳統(tǒng)的消息隊(duì)列MQ主要應(yīng)用于服務(wù)(端)之間的消息通信,比如電商領(lǐng)域的交易消息、支付消息、物流消息等等。然而在消息這個(gè)大類下,還有一個(gè)非常重要且常見的消息領(lǐng)域,即IoT類終端設(shè)備消息。近些年,我們看到隨著智能家居、工業(yè)互聯(lián)而興起的面向IoT設(shè)備類的消息正

    2024年02月10日
    瀏覽(15)
  • 使用docker部署 java web項(xiàng)目完整記錄

    使用docker部署 java web項(xiàng)目完整記錄

    一、docker 安裝 1、參考文檔:https://docs.docker.com/engine/install/centos/ 2、安裝步驟詳細(xì)說(shuō)明: 1)、清理或卸載舊的或已安裝的docker版本 2)、設(shè)置倉(cāng)庫(kù) 3)、安裝docker 其中需要輸入命令 一直輸 y 4)、啟動(dòng)docker 至此 docker已安裝完成 二、mysql安裝 基于docker容器安裝mysql;注意:?jiǎn)?/p>

    2024年02月09日
    瀏覽(46)
  • 記錄使用gitlab實(shí)現(xiàn)Docker自動(dòng)化部署

    記錄使用gitlab實(shí)現(xiàn)Docker自動(dòng)化部署

    目錄 前言 一、gitlab-runner docker安裝 二、gitlab-runner的注冊(cè)與使用 1.注冊(cè) 2. .gitlab-ci.yml 腳本編寫 總結(jié) 前面搭建了gitlab與harbor ,現(xiàn)在就使用它們來(lái)實(shí)現(xiàn)自動(dòng)化docker部署。所謂自動(dòng)化部署,就是提交代碼到指定分支時(shí)自動(dòng)觸發(fā)預(yù)先寫好的腳本,來(lái)實(shí)現(xiàn)打包,推送鏡像。想要實(shí)現(xiàn)這

    2023年04月11日
    瀏覽(14)
  • 內(nèi)網(wǎng)環(huán)境使用docker部署微服務(wù)系統(tǒng)記錄

    內(nèi)網(wǎng)環(huán)境使用docker部署微服務(wù)系統(tǒng)記錄

    內(nèi)網(wǎng)環(huán)境部署一套微服務(wù)應(yīng)用系統(tǒng),采用docker方式部署。包括mysql、redis、nginx、nacos、gateway以及應(yīng)用程序的jar包。下面記錄部署的過(guò)程和遇到的問(wèn)題。 內(nèi)網(wǎng)生成mysql鏡像 在一個(gè)可以連接外網(wǎng)的環(huán)境中,下載mysql鏡像: 將鏡像打包: 將打好的mysql.tar包傳到內(nèi)網(wǎng)服務(wù)器中,解壓: 此

    2024年02月16日
    瀏覽(17)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包