国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Sprint Cloud Stream整合RocketMq和websocket實(shí)現(xiàn)消息發(fā)布訂閱

這篇具有很好參考價(jià)值的文章主要介紹了Sprint Cloud Stream整合RocketMq和websocket實(shí)現(xiàn)消息發(fā)布訂閱。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

1.引入RocketMQ依賴:首先,在pom.xml文件中添加RocketMQ的依賴:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version> <!-- 版本號(hào)根據(jù)實(shí)際情況調(diào)整 -->
</dependency>

2.配置RocketMQ連接信息:在application.propertiesapplication.yml中配置RocketMQ的連接信息,包括Name Server地址等:

spring:
  application:
    name: ${sn.publish}
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: ${rocket-mq.name-server}
        bindings:
          output:
            producer:
              group: testSocket
              sync: true
      bindings:
        output:
          destination: test-topic
          content-type: application/json

3.消息發(fā)布組件

@Component
public class MqSourceComponent {
    @Resource
    Source source;

    public void publishNotify(SampleNotifyDTO notify) {
        source.output().send(MessageBuilder.withPayload(notify).build());
    }
}

4.消息發(fā)布控制器

@RestController
@Api(tags = "rocketmq")
public class MqController {
    @Resource
    MqSourceComponent mq;

    @ApiOperation(value = "測(cè)試發(fā)布消息")
    @PostMapping("test-publish")
    public JsonVO<String> testSend(SampleNotifyDTO notify) {
        mq.publishNotify(notify);
        return JsonVO.success("消息已發(fā)送");
    }
}

項(xiàng)目結(jié)構(gòu):

Sprint Cloud Stream整合RocketMq和websocket實(shí)現(xiàn)消息發(fā)布訂閱,中間件,微服務(wù),rocketmq

接下來(lái)是websocket模塊的搭建

1. 依賴添加

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version> <!-- 版本號(hào)根據(jù)實(shí)際情況調(diào)整 -->
</dependency>

2.application.yml配置文件

server:
  port: ${sp.ws}
spring:
  application:
    name: ${sn.ws}
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: ${rocket-mq.name-server}
      bindings:
        input:
          destination: test-topic
          content-type: application/json
          group: testSocket

3.將應(yīng)用程序綁定到消息代理

@EnableBinding(Sink.class): 這是Spring Cloud Stream的注解,它用于將應(yīng)用程序綁定到消息代理(如Kafka、RabbitMQ等)。Sink.class是Spring Cloud Stream提供的預(yù)定義輸入通道,允許你接收消息。通過(guò)這個(gè)注解,你的應(yīng)用程序可以監(jiān)聽消息通道,并定義消息處理邏輯。

@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Sink.class)
public class WsApplication {

    public static void main(String[] args) {
        SpringApplication.run(WsApplication.class, args);
    }

}

4.消息訂閱組件

監(jiān)聽消息通道中的消息,一旦有消息到達(dá),就會(huì)觸發(fā)listenNotify方法,該方法負(fù)責(zé)處理消息并通過(guò)chat服務(wù)發(fā)送響應(yīng)。

@Component
@Slf4j
public class MqListenComponent {
    @Resource
    ChatService chat;

    @StreamListener(Sink.INPUT)
    public void listenNotify(SampleNotifyDTO notify) {
        log.info(notify.toString());
        chat.sendMessage(notify.getClientId(), notify);
    }
}

5.消息通知服務(wù)

package com.zeroone.star.ws.service;

import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;


@Component
@ServerEndpoint("/chat")
public class ChatService {
    /**
     * 連接會(huì)話池
     */
    private static ConcurrentHashMap<String, Session> SESSION_POOL = new ConcurrentHashMap<>();

    @OnOpen
    public void onOpen(Session session) throws IOException {
        // 判斷客戶端對(duì)象是否存在
        if (SESSION_POOL.containsKey(session.getQueryString())) {
            CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "ID沖突,連接拒絕");
            session.getUserProperties().put("reason", closeReason);
            session.close();
            return;
        }
        // 將客戶端對(duì)象存儲(chǔ)到會(huì)話池
        SESSION_POOL.put(session.getQueryString(), session);
        System.out.println("客戶端(" + session.getQueryString() + "):開啟了連接");
    }

    @OnMessage
    public String onMessage(String msg, Session session) throws IOException {
        // 解析消息 ==> ID::消息內(nèi)容
        String[] msgArr = msg.split("::", 2);
        // 處理群發(fā)消息,ID==all表示群發(fā)
        if ("all".equalsIgnoreCase(msgArr[0])) {
            for (Session one : SESSION_POOL.values()) {
                // 排除自己
                if (one == session) {
                    continue;
                }
                // 發(fā)送消息
                one.getBasicRemote().sendText(msgArr[1]);
            }
        }
        // 指定發(fā)送
        else {
            // 獲取接收方
            Session target = SESSION_POOL.get(msgArr[0]);
            if (target != null) {
                target.getBasicRemote().sendText(msgArr[1]);
            }
        }
        return session.getQueryString() + ":消息發(fā)送成功";
    }

    @OnClose
    public void onClose(Session session) {
        // 連接拒絕關(guān)閉會(huì)話
        Object reason = session.getUserProperties().get("reason");
        if (reason instanceof CloseReason) {
            CloseReason creason = (CloseReason) reason;
            if (creason.getCloseCode() == CloseReason.CloseCodes.CANNOT_ACCEPT) {
                System.out.println("拒絕客戶(" + session.getQueryString() + "):關(guān)閉連接");
                return;
            }
        }
        // 從會(huì)話池中移除會(huì)話
        SESSION_POOL.remove(session.getQueryString());
        System.out.println("客戶端(" + session.getQueryString() + "):關(guān)閉連接");
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        System.out.println("客戶端(" + session.getQueryString() + ")錯(cuò)誤信息:" + throwable.getMessage());
    }

    @SneakyThrows
    public void sendMessage(String id, Object message) {
        // 群發(fā)
        if ("all".equalsIgnoreCase(id)) {
            for (Session one : SESSION_POOL.values()) {
                // 發(fā)送消息
                one.getBasicRemote().sendText(JSONUtil.toJsonStr(message));
            }
        }
        // 指定發(fā)送
        else {
            // 獲取接收方
            Session target = SESSION_POOL.get(id);
            if (target != null) {
                target.getBasicRemote().sendText(JSONUtil.toJsonStr(message));
            }
        }
    }
}

項(xiàng)目結(jié)構(gòu):

Sprint Cloud Stream整合RocketMq和websocket實(shí)現(xiàn)消息發(fā)布訂閱,中間件,微服務(wù),rocketmq文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-715115.html

到了這里,關(guān)于Sprint Cloud Stream整合RocketMq和websocket實(shí)現(xiàn)消息發(fā)布訂閱的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【Spring云原生系列】SpringBoot+Spring Cloud Stream:消息驅(qū)動(dòng)架構(gòu)(MDA)解析,實(shí)現(xiàn)異步處理與解耦合!

    【Spring云原生系列】SpringBoot+Spring Cloud Stream:消息驅(qū)動(dòng)架構(gòu)(MDA)解析,實(shí)現(xiàn)異步處理與解耦合!

    ???? 歡迎光臨,終于等到你啦 ???? ??我是 蘇澤 ,一位對(duì)技術(shù)充滿熱情的探索者和分享者。???? ??持續(xù)更新的專欄 《Spring 狂野之旅:從入門到入魔》 ?? 本專欄帶你從Spring入門到入魔 ? 這是蘇澤的個(gè)人主頁(yè)可以看到我其他的內(nèi)容哦???? 努力的蘇澤 http://suzee.blog.

    2024年03月10日
    瀏覽(29)
  • Java:SpringBoot整合WebSocket實(shí)現(xiàn)服務(wù)端向客戶端推送消息

    Java:SpringBoot整合WebSocket實(shí)現(xiàn)服務(wù)端向客戶端推送消息

    思路: 后端通過(guò)websocket向前端推送消息,前端統(tǒng)一使用http協(xié)議接口向后端發(fā)送數(shù)據(jù) 本文僅放一部分重要的代碼,完整代碼可參看github倉(cāng)庫(kù) websocket 前端測(cè)試 :http://www.easyswoole.com/wstool.html 依賴 項(xiàng)目目錄 完整依賴 配置 WebSocketServer.java 前端頁(yè)面 websocket.html 前端邏輯 index.js 參

    2024年02月04日
    瀏覽(29)
  • Spring Cloud【消息驅(qū)動(dòng)(什么是Spring Cloud Stream、SpringCloud Stream核心概念、入門案例之消息消費(fèi)者 )】(十一)

    Spring Cloud【消息驅(qū)動(dòng)(什么是Spring Cloud Stream、SpringCloud Stream核心概念、入門案例之消息消費(fèi)者 )】(十一)

    ? 目錄 消息驅(qū)動(dòng)_什么是Spring Cloud Stream 消息驅(qū)動(dòng)_SpringCloud Stream核心概念

    2024年02月15日
    瀏覽(24)
  • 【微服務(wù)學(xué)習(xí)】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)

    【微服務(wù)學(xué)習(xí)】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)

    @[TOC](【微服務(wù)學(xué)習(xí)】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)) 2.1 消息發(fā)送者 2.1.1 使用 StreamBridge streamBridge; 往指定信道發(fā)送消息 2.1.2 通過(guò)隱式綁定信道, 注冊(cè) Bean 發(fā)送消息 2.2 消息接收者 注意: 多個(gè)方法之間可以使用 “|” 間隔, 但是綁定時(shí) 多個(gè)需要按順序?qū)? 其中

    2024年02月03日
    瀏覽(25)
  • Spring Cloud Alibaba整合RocketMQ架構(gòu)原理分析

    Spring Cloud Alibaba整合RocketMQ架構(gòu)原理分析

    關(guān)于RocketMQ的原理,本文就不做詳細(xì)分析了,這里就重點(diǎn)關(guān)注Spring Cloud Alibaba是如何整合RocketrMQ的。 RocketMQ提供了RocketMQ Client SDK,開發(fā)者可以直接依賴這個(gè)SDK,就可以完成消息的生產(chǎn)和消費(fèi)。 1.生產(chǎn)消息 RocketMQ Client SDK提供了生產(chǎn)消息的API接口DefaultMQProducer,開發(fā)者可以直接使

    2024年01月22日
    瀏覽(94)
  • rocketMq消息隊(duì)列原生api使用以及rocketMq整合springboot

    rocketMq消息隊(duì)列原生api使用以及rocketMq整合springboot

    使用RocketMQ的原生API開發(fā)是最簡(jiǎn)單也是目前看來(lái)最牢靠的方式。這里用SpringBoot來(lái)搭建一系列消息生產(chǎn)者和消息消費(fèi)者,來(lái)訪問之前搭建的RocketMQ集群。 首先創(chuàng)建一個(gè)基于Maven的SpringBoot工程,引入如下依賴: RocketMQ的官網(wǎng)上有很多經(jīng)典的測(cè)試代碼,這些代碼雖然依賴的版本比較

    2024年02月12日
    瀏覽(22)
  • Spring Boot 3 + Vue 3 整合 WebSocket (STOMP協(xié)議) 實(shí)現(xiàn)廣播和點(diǎn)對(duì)點(diǎn)實(shí)時(shí)消息

    Spring Boot 3 + Vue 3 整合 WebSocket (STOMP協(xié)議) 實(shí)現(xiàn)廣播和點(diǎn)對(duì)點(diǎn)實(shí)時(shí)消息

    ?? 作者主頁(yè): 有來(lái)技術(shù) ?? 開源項(xiàng)目: youlai-mall ?? vue3-element-admin ?? youlai-boot ?? 倉(cāng)庫(kù)主頁(yè): Gitee ?? Github ?? GitCode ?? 歡迎點(diǎn)贊 ?? 收藏 ?留言 ?? 如有錯(cuò)誤敬請(qǐng)糾正! WebSocket是一種在Web瀏覽器與Web服務(wù)器之間建立雙向通信的協(xié)議,而Spring Boot提供了便捷的WebSocket支持

    2024年02月02日
    瀏覽(18)
  • rocketMq消息隊(duì)列詳細(xì)使用與實(shí)踐整合spring

    rocketMq消息隊(duì)列詳細(xì)使用與實(shí)踐整合spring

    使用RocketMQ的原生API開發(fā)是最簡(jiǎn)單也是目前看來(lái)最牢靠的方式。這里用SpringBoot來(lái)搭建一系列消息生產(chǎn)者和消息消費(fèi)者,來(lái)訪問之前搭建的RocketMQ集群。 首先創(chuàng)建一個(gè)基于Maven的SpringBoot工程,引入如下依賴: RocketMQ的官網(wǎng)上有很多經(jīng)典的測(cè)試代碼,這些代碼雖然依賴的版本比較

    2024年02月13日
    瀏覽(22)
  • Spring Cloud Stream 4.0.4 rabbitmq 發(fā)送消息多function

    Spring Cloud Stream 4.0.4 rabbitmq 發(fā)送消息多function

    注意當(dāng)多個(gè)消費(fèi)者時(shí),需要添加配置項(xiàng):spring.cloud.function.definition 啟動(dòng)日志 交換機(jī)名稱對(duì)應(yīng): spring.cloud.stream.bindings.demo-in-0.destination配置項(xiàng)的值 隊(duì)列名稱是交換機(jī)名稱+分組名 http://localhost:8080/sendMsg?delay=10000name=zhangsan 問題總結(jié) 問題一 解決辦法: 查看配置是否正確: spring

    2024年02月19日
    瀏覽(22)
  • Springbootg整合RocketMQ ——使用 rocketmq-spring-boot-starter 來(lái)配置發(fā)送和消費(fèi) RocketMQ 消息

    ? ? ? ?本文解析將 RocketMQ Client 端集成為 spring-boot-starter 框架的開發(fā)細(xì)節(jié),然后通過(guò)一個(gè)簡(jiǎn)單的示例來(lái)一步一步的講解如何使用這個(gè) spring-boot-starter 工具包來(lái)配置,發(fā)送和消費(fèi) RocketMQ 消息。 添加maven依賴: 修改application.properties 注意: 請(qǐng)將上述示例配置中的 127.0.0.1:9876 替換

    2024年03月22日
    瀏覽(29)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包