国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

SpringBoot+WebSocket+Session共享

這篇具有很好參考價(jià)值的文章主要介紹了SpringBoot+WebSocket+Session共享。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

前言

WebSocket協(xié)議是基于TCP的一種新的網(wǎng)絡(luò)協(xié)議。它實(shí)現(xiàn)了瀏覽器與服務(wù)器全雙工(full-duplex)通信——允許服務(wù)器主動(dòng)發(fā)送信息給客戶端

一、為什么需要WebSocket

HTTP 是基于請求響應(yīng)式的,即通信只能由客戶端發(fā)起,服務(wù)端做出響應(yīng),無狀態(tài),無連接。

無狀態(tài): 每次連接只處理一個(gè)請求,請求結(jié)束后斷開連接。

無連接: 對(duì)于事務(wù)處理沒有記憶能力,服務(wù)器不知道客戶端是什么狀態(tài)。

通過HTTP實(shí)現(xiàn)即時(shí)通訊,只能是頁面輪詢向服務(wù)器發(fā)出請求,服務(wù)器返回查詢結(jié)果。輪詢的效率低,非常浪費(fèi)資源,因?yàn)楸仨毑煌_B接,或者 HTTP 連接始終打開。

WebSocket的最大特點(diǎn)就是,服務(wù)器可以主動(dòng)向客戶端推送信息,客戶端也可以主動(dòng)向服務(wù)器發(fā)送信息,是真正的雙向平等對(duì)話。

WebSocket特點(diǎn)

  • (1)建立在 TCP 協(xié)議之上,服務(wù)器端的實(shí)現(xiàn)比較容易。

  • (2)與 HTTP 協(xié)議有著良好的兼容性。默認(rèn)端口也是80和443,并且握手階段采用 HTTP 協(xié)議,因此握手時(shí)不容易屏蔽,能通過各種
    HTTP 代理服務(wù)器。

  • (3)數(shù)據(jù)格式比較輕量,性能開銷小,通信高效。

  • (4)可以發(fā)送文本,也可以發(fā)送二進(jìn)制數(shù)據(jù)。

  • (5)沒有同源限制,客戶端可以與任意服務(wù)器通信。

  • (6)協(xié)議標(biāo)識(shí)符是ws(如果加密,則為wss),服務(wù)器網(wǎng)址就是 URL。

二、SpringBoot整合WebSocket

1.maven依賴

		<dependency>  
           <groupId>org.springframework.boot</groupId>  
           <artifactId>spring-boot-starter-websocket</artifactId>  
       </dependency> 

2.WebSocketConfig

啟用WebSocket的支持

@Configuration
public class WebSocketConfig {
    /**
     * 自動(dòng)注冊使用@ServerEndpoint注解聲明的websocket endpoint
     * 2022/2/14
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3.WebSocketServer

因?yàn)閃ebSocket是類似客戶端服務(wù)端的形式(采用ws協(xié)議),那么這里的WebSocketServer其實(shí)就相當(dāng)于一個(gè)ws協(xié)議的Controller

直接@ServerEndpoint(“/websocket/{userId}”) 、@Component啟用即可,然后在里面實(shí)現(xiàn)@OnOpen開啟連接,@onClose關(guān)閉連接,@onMessage接收消息等方法。

集群版(多個(gè)ws節(jié)點(diǎn))還需要借助mysql或者redis等進(jìn)行處理,改造對(duì)應(yīng)的sendMessage方法即可。

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
 
 
@ServerEndpoint("/imserver/{userId}")
@Component
public class WebSocketServer {
 
    static Log log=LogFactory.get(WebSocketServer.class);
    /**靜態(tài)變量,用來記錄當(dāng)前在線連接數(shù)。應(yīng)該把它設(shè)計(jì)成線程安全的。*/
    private static int onlineCount = 0;
    /**concurrent包的線程安全Set,用來存放每個(gè)客戶端對(duì)應(yīng)的MyWebSocket對(duì)象。*/
    private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**與某個(gè)客戶端的連接會(huì)話,需要通過它來給客戶端發(fā)送數(shù)據(jù)*/
    private Session session;
    /**接收userId*/
    private String userId="";
 
    /**
     * 連接建立成功調(diào)用的方法*/
    @OnOpen
    public void onOpen(Session session,@PathParam("userId") String userId) {
        this.session = session;
        this.userId=userId;
        if(webSocketMap.containsKey(userId)){
            webSocketMap.remove(userId);
            webSocketMap.put(userId,this);
            //加入set中
        }else{
            webSocketMap.put(userId,this);
            //加入set中
            addOnlineCount();
            //在線數(shù)加1
        }
 
        log.info("用戶連接:"+userId+",當(dāng)前在線人數(shù)為:" + getOnlineCount());
 
        try {
            sendMessage("連接成功");
        } catch (IOException e) {
            log.error("用戶:"+userId+",網(wǎng)絡(luò)異常!!!!!!");
        }
    }
 
    /**
     * 連接關(guān)閉調(diào)用的方法
     */
    @OnClose
    public void onClose() {
        if(webSocketMap.containsKey(userId)){
            webSocketMap.remove(userId);
            //從set中刪除
            subOnlineCount();
        }
        log.info("用戶退出:"+userId+",當(dāng)前在線人數(shù)為:" + getOnlineCount());
    }
 
    /**
     * 收到客戶端消息后調(diào)用的方法
     *
     * @param message 客戶端發(fā)送過來的消息*/
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("用戶消息:"+userId+",報(bào)文:"+message);
        //可以群發(fā)消息
        //消息保存到數(shù)據(jù)庫、redis
        if(StringUtils.isNotBlank(message)){
            try {
                //解析發(fā)送的報(bào)文
                JSONObject jsonObject = JSON.parseObject(message);
                //追加發(fā)送人(防止串改)
                jsonObject.put("fromUserId",this.userId);
                String toUserId=jsonObject.getString("toUserId");
                //傳送給對(duì)應(yīng)toUserId用戶的websocket
                if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
                    webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
                }else{
                    log.error("請求的userId:"+toUserId+"不在該服務(wù)器上");
                    //否則不在這個(gè)服務(wù)器上,發(fā)送到mysql或者redis
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
 
    /**
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用戶錯(cuò)誤:"+this.userId+",原因:"+error.getMessage());
        error.printStackTrace();
    }
    /**
     * 實(shí)現(xiàn)服務(wù)器主動(dòng)推送
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }
 
 
    /**
     * 發(fā)送自定義消息
     * */
    public static void sendInfo(String message,@PathParam("userId") String userId) throws IOException {
        log.info("發(fā)送消息到:"+userId+",報(bào)文:"+message);
        if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
            webSocketMap.get(userId).sendMessage(message);
        }else{
            log.error("用戶"+userId+",不在線!");
        }
    }
 
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }
 
    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }
 
    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}

4.集群版

在使用websocket的時(shí)候會(huì)面對(duì)session共享的問題

在這里我們通過Redis的監(jiān)聽訂閱模式來實(shí)現(xiàn)session共享,當(dāng)有連接加入時(shí),將連接發(fā)送到隊(duì)列中然后向訂閱該隊(duì)列的服務(wù)下發(fā),由存儲(chǔ)該連接的服務(wù)進(jìn)行邏輯處理

session共享的業(yè)務(wù)邏輯如下圖所示
SpringBoot+WebSocket+Session共享,SpringBoot,spring boot,websocket,后端

4.1 Redis發(fā)布訂閱模式實(shí)現(xiàn)

前言

Redis可以通過發(fā)布訂閱模式、輪詢機(jī)制實(shí)現(xiàn)消息隊(duì)列。

由于沒有消息持久化與 ACK 的保證,所以,Redis 的發(fā)布訂閱功能并不可靠。這也就導(dǎo)致了它的應(yīng)用場景很有限,建議用于實(shí)時(shí)與可靠性要求不高的場景。

一、Redis發(fā)布訂閱
??????1. Redis發(fā)布訂閱模式實(shí)現(xiàn)原理

服務(wù)器中維護(hù)著一個(gè)pubsub_channels字典,所有的頻道和訂閱關(guān)系都存儲(chǔ)在這里。字典的鍵為頻道的名稱,而值為訂閱頻道的客戶端鏈表。

1> 當(dāng)有新的客戶端訂閱某個(gè)頻道時(shí),會(huì)發(fā)生兩種情況中的一種:

(1)如果頻道已經(jīng)存在,則新的客戶端會(huì)添加到pubsub_channels對(duì)應(yīng)頻道的鏈表末尾

(2)如果頻道原本不存在,則會(huì)為頻道創(chuàng)建一個(gè)鍵,該客戶端成為鏈表的第一個(gè)元素

2> 當(dāng)一個(gè)客戶端退訂一個(gè)頻道的時(shí)候:

pubsub_channels對(duì)應(yīng)鍵的鏈表會(huì)刪除該客戶端

3> 發(fā)送信息

服務(wù)器會(huì)遍歷pubsub_channels中對(duì)應(yīng)鍵的鏈表,向每一個(gè)客戶端發(fā)送信息

服務(wù)器還維護(hù)著一個(gè)pubsub_patterns鏈表,鏈表的pattern屬性記錄了被訂閱的模式,而client屬性記錄了訂閱模式的客戶端

  1. 當(dāng)有新的客戶端訂閱某個(gè)模式的時(shí),會(huì)進(jìn)行如下步驟:

(1)創(chuàng)建一個(gè)鏈表節(jié)點(diǎn),pattern屬性記錄訂閱的模式,client記錄訂閱模式的客戶端

(2)將這個(gè)鏈表節(jié)點(diǎn)添加到pubsub_patterns鏈表中

  1. 當(dāng)一個(gè)客戶端退訂某一個(gè)模式的時(shí)候:

服務(wù)器遍歷pubsob_patterns找到對(duì)應(yīng)的pattern同時(shí)也是對(duì)應(yīng)該client客戶端的節(jié)點(diǎn),將改節(jié)點(diǎn)刪除

  1. 發(fā)送信息

服務(wù)器遍歷pubsub_channels,查找與channels頻道相匹配的模式麻將消息發(fā)送給訂閱了這些模式的客戶端。

2.引入依賴
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
3.redis配置
public class RedisConfig {
    //Key的過期時(shí)間
    private Duration timeToLive = Duration.ofDays(1);
 
    /**
     * redis模板,存儲(chǔ)關(guān)鍵字是字符串,值jackson2JsonRedisSerializer是序列化后的值
     *
     * @param
     * @return org.springframework.data.redis.core.RedisTemplate
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        //使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值(默認(rèn)使用JDK的序列化方式)
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
 
        //使用StringRedisSerializer來序列化和反序列化redis的key值
        RedisSerializer redisSerializer = new StringRedisSerializer();
        //key
        redisTemplate.setKeySerializer(redisSerializer);
        redisTemplate.setHashKeySerializer(redisSerializer);
        //value
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
 
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
 
    @Bean
    public RedisCacheConfiguration redisCacheConfiguration() {
        return RedisCacheConfiguration.
                defaultCacheConfig().
                entryTtl(this.timeToLive).			//Key過期時(shí)間 1天
                serializeKeysWith(RedisSerializationContext.SerializationPair.
                fromSerializer(new StringRedisSerializer())).
                serializeValuesWith(RedisSerializationContext.SerializationPair.
                        fromSerializer(new GenericJackson2JsonRedisSerializer()));
    }
}

4.redis監(jiān)聽
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
 
import java.util.concurrent.CountDownLatch;
 
 
@Configuration
public class RedisMessageListener {
    /**
     * 創(chuàng)建連接工廠
     *
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter listenerAdapter,
                                                   MessageListenerAdapter listenerAdapterWang,
                                                   MessageListenerAdapter listenerAdapterTest2) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //(不同的監(jiān)聽器可以收到同一個(gè)頻道的信息)接受消息的頻道
        container.addMessageListener(listenerAdapter, new PatternTopic("phone"));
 
        container.addMessageListener(listenerAdapterWang, new PatternTopic("phone"));
 
        container.addMessageListener(listenerAdapterTest2, new PatternTopic("phoneTest2"));
        return container;
    }
 
 
    /**
     * 綁定消息監(jiān)聽者和接收監(jiān)聽的方法
     *
     * @param receiver
     * @return
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(ReceiverRedisMessage receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
 
    @Bean
    public MessageListenerAdapter listenerAdapterWang(ReceiverRedisMessage receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessageWang");
    }
   /**
     * 綁定消息監(jiān)聽者和接收監(jiān)聽的方法
     *
     * @param receiver
     * @return
     */
    @Bean
    public MessageListenerAdapter listenerAdapterTest2(ReceiverRedisMessage receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage2");
    }
 
    /**
     * 注冊訂閱者
     *
     * @param latch
     * @return
     */
    @Bean
    ReceiverRedisMessage receiver(CountDownLatch latch) {
        return new ReceiverRedisMessage(latch);
    }
 
 
    /**
     * 計(jì)數(shù)器,用來控制線程
     *
     * @return
     */
    @Bean
    public CountDownLatch latch() {
        return new CountDownLatch(1);//指定了計(jì)數(shù)的次數(shù) 1
    }
}
5.redis消息接收類
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
 
import java.util.concurrent.CountDownLatch;
 
 
public class ReceiverRedisMessage {
    private CountDownLatch latch;
 
    @Autowired
    public ReceiverRedisMessage(CountDownLatch latch) {
        this.latch = latch;
    }
 
 
    /**
     * 隊(duì)列消息接收方法
     *
     * @param jsonMsg
     */
    public void receiveMessage(String jsonMsg) {
        log.info("[開始消費(fèi)REDIS消息隊(duì)列phone數(shù)據(jù)...]");
        try {
            System.out.println(jsonMsg);
            log.info("[消費(fèi)REDIS消息隊(duì)列phone數(shù)據(jù)成功.]");
        } catch (Exception e) {
            log.error("[消費(fèi)REDIS消息隊(duì)列phone數(shù)據(jù)失敗,失敗信息:{}]", e.getMessage());
        }
        latch.countDown();
    }
 
 
    public void receiveMessageWang(String jsonMsg) {
        log.info("[開始消費(fèi)REDIS消息隊(duì)列phone數(shù)據(jù)...]");
        try {
            System.out.println(jsonMsg);
            log.info("[消費(fèi)REDIS消息隊(duì)列phone數(shù)據(jù)成功.]");
        } catch (Exception e) {
            log.error("[消費(fèi)REDIS消息隊(duì)列phone數(shù)據(jù)失敗,失敗信息:{}]", e.getMessage());
        }
        latch.countDown();
    }
 
    /**
     * 隊(duì)列消息接收方法
     *
     * @param jsonMsg
     */
    public void receiveMessage2(String jsonMsg) {
        log.info("[開始消費(fèi)REDIS消息隊(duì)列phoneTest2數(shù)據(jù)...]");
        try {
            System.out.println(jsonMsg);
            /**
             *  此處執(zhí)行自己代碼邏輯 操作數(shù)據(jù)庫等
             */
 
            log.info("[消費(fèi)REDIS消息隊(duì)列phoneTest2數(shù)據(jù)成功.]");
        } catch (Exception e) {
            log.error("[消費(fèi)REDIS消息隊(duì)列phoneTest2數(shù)據(jù)失敗,失敗信息:{}]", e.getMessage());
        }
        latch.countDown();
    }
 
}
6.代碼測試
  public void redisTest() {
        //這個(gè)是測試同一個(gè)頻道,不同的訂閱者收到相同的信息,“phone”也就是topic也可以理解為頻道
        redisTemplate.convertAndSend("phone", "223333");
        //這個(gè)phoneTest2是另外的一個(gè)頻道
      //  redisTemplate.convertAndSend("phoneTest2", "34555665");
    } 
二、Redis輪詢機(jī)制
1.原理

Redis的列表類型鍵可以用來實(shí)現(xiàn)隊(duì)列,并且支持阻塞式讀取,可以實(shí)現(xiàn)一個(gè)高性能的優(yōu)先隊(duì)列, 在Redis中,List類型是按照插入順序排序的字符串鏈表。和數(shù)據(jù)結(jié)構(gòu)中的普通鏈表一樣,我們可以在其頭部(left)和尾部(right)添加新的元素。在插入時(shí),如果該鍵并不存在,Redis將為該鍵創(chuàng)建一個(gè)新的鏈表。與此相反,如果鏈表中所有的元素均被移除,那么該鍵也將會(huì)被從數(shù)據(jù)庫中刪除。List中可以包含的最大元素?cái)?shù)量是4294967295。從元素插入和刪除的效率視角來看,如果我們是在鏈表的兩頭插入或刪除元素,這將會(huì)是非常高效的操作,即使鏈表中已經(jīng)存儲(chǔ)了百萬條記錄,該操作也可以在常量時(shí)間內(nèi)完成。

2.生產(chǎn)者
  public R saveUserTicket(String phoneNum) {
        redisTemplate.opsForList().leftPush("ticket:Data", phoneNum);
        return R.ok();
    }
3.消費(fèi)者
 @Scheduled(fixedRate = 1)
    public synchronized void consumer() {
            String message = redisTemplate.opsForList().rightPop("ticket:Data", 5, TimeUnit.SECONDS);
            if (!StringUtils.isEmpty(message)){
           //數(shù)據(jù)庫操作
            }
    }
4.優(yōu)化

如上述代碼,如果此時(shí)隊(duì)列為空,消費(fèi)者依然會(huì)頻繁拉取數(shù)據(jù),造成CPU空轉(zhuǎn),不僅占用CPU資源還對(duì)Redis造成壓力。因此當(dāng)隊(duì)列為空時(shí)我們可以休眠一段時(shí)間,再進(jìn)行拉取。

實(shí)現(xiàn)如下

     @Scheduled(fixedRate = 1)
    public synchronized void consumer() throws InterruptedException {
        long a = redisTemplate.opsForList().size("ticket:Data");
        if (a == 0) {
            TimeUnit.SECONDS.sleep(1);//等待時(shí)間
        }
            String message = redisTemplate.opsForList().rightPop("ticket:Data", 5, TimeUnit.SECONDS);
            if (!StringUtils.isEmpty(message)){
           //數(shù)據(jù)庫操作
            }

Redis發(fā)布訂閱模式實(shí)現(xiàn)原理

前言

發(fā)布訂閱系統(tǒng)在我們?nèi)粘5墓ぷ髦薪?jīng)常會(huì)使用到,這種場景大部分情況我們都是使用消息隊(duì)列,常用的消息隊(duì)列有 Kafka,RocketMQ,RabbitMQ,每一種消息隊(duì)列都有其特性,很多時(shí)候我們可能不需要獨(dú)立部署相應(yīng)的消息隊(duì)列,只是簡單的使用,而且數(shù)據(jù)量也不會(huì)太大,這種情況下,我們就可以使用 Redis 的 Pub/Sub 模型。

一、Redis發(fā)布訂閱

Redis 的發(fā)布訂閱功能主要由 PUBLISH,SUBSCRIBE,PSUBSCRIBE 命令組成,一個(gè)或者多個(gè)客戶端訂閱某個(gè)或者多個(gè)頻道,當(dāng)其他客戶端向該頻道發(fā)送消息的時(shí)候,訂閱了該頻道的客戶端都會(huì)收到對(duì)應(yīng)的消息。

二、發(fā)布訂閱模式的基本命令

Psubscribe 命令訂閱一個(gè)或多個(gè)符合給定模式的頻道。每個(gè)模式以 * 作為匹配符,比如 it* 匹配所有以 it 開頭的頻道( it.news 、 it.blog 、 it.tweets )

Psubscribe 命令基本語法如下:

PSUBSCRIBE pattern [pattern …]

Redis Pubsub 命令用于查看訂閱與發(fā)布系統(tǒng)狀態(tài),它由數(shù)個(gè)不同格式的子命令組成。

PUBSUB [argument [argument …]]

Publish 命令用于將信息發(fā)送到指定的頻道。

Publish 命令基本語法如下:

PUBLISH channel message

Punsubscribe 命令用于退訂所有給定模式的頻道。

Punsubscribe 命令基本語法如下:

PUNSUBSCRIBE [pattern [pattern …]]

Subscribe 命令用于訂閱給定的一個(gè)或多個(gè)頻道的信息。

Subscribe 命令基本語法如下:

SUBSCRIBE channel [channel …]

Unsubscribe 命令用于退訂給定的一個(gè)或多個(gè)頻道的信息。

Unsubscribe 命令基本語法如下:

UNSUBSCRIBE channel [channel …]

三、發(fā)布訂閱模式的實(shí)現(xiàn)原理

Redis通過PUBLISH、SUBSCRIBE和PSUBSCRIBE等命令實(shí)現(xiàn)了發(fā)布和訂閱功能。

  • 通過SUBSCRIBE命令訂閱某個(gè)頻道后,redis-server里面維護(hù)了一個(gè)字典,字典的鍵就是一個(gè)個(gè)channel,而字典的值則是一個(gè)鏈表,鏈表中保存了所有訂閱這個(gè)channel的客戶端。SUBSCRIBE命令的關(guān)鍵,就是將客戶端添加到指定channel的訂閱鏈表中。
  • 通過PUBLISH命令向訂閱者發(fā)送信息,redis-server會(huì)使用給定的頻道作為鍵,在他所維護(hù)的channel字典中查找記錄訂閱該頻道的所有客戶端的鏈表,通過遍歷這個(gè)鏈表,來將信息發(fā)布給所有的訂閱者
Pub/Sub 底層存儲(chǔ)結(jié)構(gòu)

訂閱 Channel

在 Redis 的底層結(jié)構(gòu)中,客戶端和頻道的訂閱關(guān)系是通過一個(gè)字典加鏈表的結(jié)構(gòu)保存的,形式如下:

SpringBoot+WebSocket+Session共享,SpringBoot,spring boot,websocket,后端

在 Redis 的底層結(jié)構(gòu)中,Redis 服務(wù)器結(jié)構(gòu)體中定義了一個(gè) pubsub_channels 字典

struct redisServer {
 
//用于保存所有頻道的訂閱關(guān)系
 
dict *pubsub_channels
 
}

在這個(gè)字典中,key 代表的是頻道名稱,value 是一個(gè)鏈表,這個(gè)鏈表里面存放的是所有訂閱這個(gè)頻道的客戶端。

所以當(dāng)有客戶端執(zhí)行訂閱頻道的動(dòng)作的時(shí)候,服務(wù)器就會(huì)將客戶端與被訂閱的頻道在 pubsub_channels 字典中進(jìn)行關(guān)聯(lián)。

這個(gè)時(shí)候有兩種情況:

  • 該渠道是首次被訂閱:首次被訂閱說明在字典中并不存在該渠道的信息,那么程序首先要?jiǎng)?chuàng)建一個(gè)對(duì)應(yīng)的 key,并且要賦值一個(gè)空鏈表,然后將對(duì)應(yīng)的客戶端加入到鏈表中。此時(shí)鏈表只有一個(gè)元素。

  • 該渠道已經(jīng)被其他客戶端訂閱過:這個(gè)時(shí)候就直接將對(duì)應(yīng)的客戶端信息添加到鏈表的末尾就好了。

比如,如果有一個(gè)新的客戶端 Client 08 要訂閱 run 渠道,那么上圖就會(huì)變成
SpringBoot+WebSocket+Session共享,SpringBoot,spring boot,websocket,后端

如果 Client 08 要訂閱一個(gè)新的渠道 new_sport ,那么就會(huì)變成

SpringBoot+WebSocket+Session共享,SpringBoot,spring boot,websocket,后端

取消訂閱

上面介紹的是單個(gè) Channel 的訂閱,相反的如果一個(gè)客戶端要取消訂閱相關(guān) Channel,則無非是找到對(duì)應(yīng)的 Channel 的鏈表,從中刪除對(duì)應(yīng)的客戶端,如果該客戶端已經(jīng)是最后一個(gè)了,則將對(duì)應(yīng) Channel 也刪除。

模式訂閱結(jié)構(gòu)

模式渠道的訂閱與單個(gè)渠道的訂閱類似,不過服務(wù)器是將所有模式的訂閱關(guān)系都保存在服務(wù)器狀態(tài)的pubsub_patterns 屬性里面。


struct redisServer{
 
//保存所有模式訂閱關(guān)系
 
list *pubsub_patterns;
 
}

與訂閱單個(gè) Channel 不同的是,pubsub_patterns 屬性是一個(gè)鏈表,不是字典。節(jié)點(diǎn)的結(jié)構(gòu)如下:


struct pubsubPattern{
 
//訂閱模式的客戶端
 
redisClient *client;
 
//被訂閱的模式
 
robj *pattern;
 
} pubsubPattern;

其實(shí) client 屬性是用來存放對(duì)應(yīng)客戶端信息,pattern 是用來存放客戶端對(duì)應(yīng)的匹配模式。

所以對(duì)應(yīng)上面的 Client-06 模式匹配的結(jié)構(gòu)存儲(chǔ)如下
SpringBoot+WebSocket+Session共享,SpringBoot,spring boot,websocket,后端

在pubsub_patterns鏈表中有一個(gè)節(jié)點(diǎn),對(duì)應(yīng)的客戶端是 Client-06,對(duì)應(yīng)的匹配模式是run*。

訂閱模式

當(dāng)某個(gè)客戶端通過命令psubscribe 訂閱對(duì)應(yīng)模式的 Channel 時(shí)候,服務(wù)器會(huì)創(chuàng)建一個(gè)節(jié)點(diǎn),并將 Client 屬性設(shè)置為對(duì)應(yīng)的客戶端,pattern 屬性設(shè)置成對(duì)應(yīng)的模式規(guī)則,然后添加到鏈表尾部。

創(chuàng)建新節(jié)點(diǎn);

給節(jié)點(diǎn)的屬性賦值;

將節(jié)點(diǎn)添加到鏈表的尾部;

退訂模式

退訂模式的命令是punsubscribe,客戶端使用這個(gè)命令來退訂一個(gè)或者多個(gè)模式 Channel。服務(wù)器接收到該命令后,會(huì)遍歷pubsub_patterns鏈表,將匹配到的 client 和 pattern 屬性的節(jié)點(diǎn)給刪掉。這里需要判斷 client 屬性和 pattern 屬性都合法的時(shí)候再進(jìn)行刪除。

遍歷所有的節(jié)點(diǎn),當(dāng)匹配到相同 client 屬性和 pattern 屬性的時(shí)候就進(jìn)行節(jié)點(diǎn)刪除。

發(fā)布消息

當(dāng)一個(gè)客戶端執(zhí)行了publish channelName message 命令的時(shí)候,服務(wù)器會(huì)從pubsub_channels和pubsub_patterns 兩個(gè)結(jié)構(gòu)中找到符合channelName 的所有 Channel,進(jìn)行消息的發(fā)送。在 pubsub_channels 中只要找到對(duì)應(yīng)的 Channel 的 key 然后向?qū)?yīng)的 value 鏈表中的客戶端發(fā)送消息。文章來源地址http://www.zghlxwxcb.cn/news/detail-515841.html

4.2 隊(duì)列消息推送

    @OnMessage
    public void onMessage(Session session, String message) throws IOException, ParseException {
        //業(yè)務(wù)邏輯處理
        //向所有用戶廣播
        redisUtil.convertSend("topic", message);
    } 

4.3 隊(duì)列消息接收

/**
     * 隊(duì)列消息接收方法
     * 2022/2/21
     */
    public void receiveMessage(String message) {
        log.info("[開始消費(fèi)REDIS消息隊(duì)列topic數(shù)據(jù)...]");
        try {
            //業(yè)務(wù)邏輯處理
            webSocketServer.sendMessageForProject(message);
            log.info("[消費(fèi)REDIS消息隊(duì)列topic數(shù)據(jù)成功.]");
        } catch (Exception e) {
            log.error("[消費(fèi)REDIS消息隊(duì)列topic數(shù)據(jù)失敗,失敗信息:{}]", e.getMessage());
        }
        latch.countDown();
    }
參考來源:「人苼若只茹初見」
原文鏈接: https://blog.csdn.net/printf88/article/details/123685995

到了這里,關(guān)于SpringBoot+WebSocket+Session共享的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Spring Boot進(jìn)階(49):SpringBoot之集成WebSocket實(shí)現(xiàn)前后端通信 | 超級(jí)詳細(xì),建議收藏

    Spring Boot進(jìn)階(49):SpringBoot之集成WebSocket實(shí)現(xiàn)前后端通信 | 超級(jí)詳細(xì),建議收藏

    ????????在上一期,我對(duì)WebSocket進(jìn)行了基礎(chǔ)及理論知識(shí)普及學(xué)習(xí),WebSocket是一種基于TCP協(xié)議實(shí)現(xiàn)的全雙工通信協(xié)議,使用它可以實(shí)現(xiàn)實(shí)時(shí)通信,不必?fù)?dān)心HTTP協(xié)議的短連接問題。Spring Boot作為一款微服務(wù)框架,也提供了輕量級(jí)的WebSocket集成支持,本文將介紹如何在Spring Boot項(xiàng)

    2024年02月14日
    瀏覽(27)
  • Spring Boot進(jìn)階(49):實(shí)時(shí)通信不再是夢想,SpringBoot+WebSocket助你輕松實(shí)現(xiàn)前后端即時(shí)通訊!

    Spring Boot進(jìn)階(49):實(shí)時(shí)通信不再是夢想,SpringBoot+WebSocket助你輕松實(shí)現(xiàn)前后端即時(shí)通訊!

    ????????在上一期,我對(duì)WebSocket進(jìn)行了基礎(chǔ)及理論知識(shí)普及學(xué)習(xí),WebSocket是一種基于TCP協(xié)議實(shí)現(xiàn)的全雙工通信協(xié)議,使用它可以實(shí)現(xiàn)實(shí)時(shí)通信,不必?fù)?dān)心HTTP協(xié)議的短連接問題。Spring Boot作為一款微服務(wù)框架,也提供了輕量級(jí)的WebSocket集成支持,本文將介紹如何在Spring Boot項(xiàng)

    2024年02月11日
    瀏覽(21)
  • springboot websocket 屏幕共享

    實(shí)現(xiàn)springboot websocket同屏瀏覽功能 1,服務(wù)端:websocket screen sharejersey-server,推送給其他客戶端。 2,運(yùn)行websocketTestclient.bat,java websocket client截屏發(fā)送到服務(wù)端,客戶端代碼websocketTestWebSocketClient.java。 3,通過瀏覽器拉取數(shù)據(jù),地址為http://ip:8080/hello 運(yùn)行順序,先啟動(dòng)服務(wù)端,

    2024年02月11日
    瀏覽(15)
  • 服務(wù)端(后端)主動(dòng)通知前端的實(shí)現(xiàn):WebSocket(springboot中使用WebSocket案例)

    我們都知道 http 協(xié)議只能瀏覽器單方面向服務(wù)器發(fā)起請求獲得響應(yīng),服務(wù)器不能主動(dòng)向?yàn)g覽器推送消息。想要實(shí)現(xiàn)瀏覽器的主動(dòng)推送有兩種主流實(shí)現(xiàn)方式: 輪詢:缺點(diǎn)很多,但是實(shí)現(xiàn)簡單 websocket:在瀏覽器和服務(wù)器之間建立 tcp 連接,實(shí)現(xiàn)全雙工通信 springboot 使用 websocket 有

    2023年04月14日
    瀏覽(36)
  • Spring Boot整合WebSocket

    Spring Boot整合WebSocket

    在HTTP協(xié)議中,所有的請求都是由客戶端發(fā)起的,由服務(wù)端進(jìn)行響應(yīng),服務(wù)端無法向客戶端推送消息,但是在一些需要即時(shí)通信的應(yīng)用中,又不可避免地需要服務(wù)端向客戶端推送消息,傳統(tǒng)的解決方案主要有如下幾種。 輪詢 輪詢是最簡單的一種解決方案,所謂輪詢,就是客戶

    2024年02月05日
    瀏覽(23)
  • spring boot 項(xiàng)目整合 websocket

    spring boot 項(xiàng)目整合 websocket

    ? ? ? ? 負(fù)責(zé)的項(xiàng)目有一個(gè)搜索功能,搜索的范圍幾乎是全表掃,且數(shù)據(jù)源類型賊多。目前對(duì)搜索的數(shù)據(jù)量量級(jí)未知,但肯定不會(huì)太少,不僅需要搜索還得點(diǎn)擊下載文件。 ? ? ? ? ? 關(guān)于搜索這塊類型 眾多,未了避免有個(gè)別極大數(shù)據(jù)源影響整個(gè)搜索效率,我采用多線程異步

    2024年02月11日
    瀏覽(23)
  • Spring Boot 實(shí)現(xiàn) WebSocket 示例

    Spring Boot 實(shí)現(xiàn) WebSocket 示例

    WebSocket協(xié)議提供了一種標(biāo)準(zhǔn)化的方法,通過單個(gè)TCP連接在客戶機(jī)和服務(wù)器之間建立全雙工、雙向的通信通道。它是一種不同于HTTP的TCP協(xié)議,但被設(shè)計(jì)為在HTTP上工作,使用端口80和443,并允許重用現(xiàn)有的防火墻規(guī)則。 WebSocket 協(xié)議是獨(dú)立的基于 TCP 協(xié)議。它與 HTTP 的唯一關(guān)系是

    2024年02月14日
    瀏覽(22)
  • 4 Spring Boot與WebSocket實(shí)戰(zhàn)

    作者:禪與計(jì)算機(jī)程序設(shè)計(jì)藝術(shù) WebSocket(Web Socket)是一種雙向通訊協(xié)議,使得客戶端和服務(wù)器之間可以進(jìn)行實(shí)時(shí)通信。在WebSocket出現(xiàn)之前,開發(fā)者通常采用輪詢或Comet的方式來實(shí)現(xiàn)Web應(yīng)用中的實(shí)時(shí)更新功能。輪詢方式是通過瀏覽器定時(shí)向服務(wù)器發(fā)送請求,來檢查是否有新的消

    2024年02月06日
    瀏覽(22)
  • Spring Boot集成WebSocket實(shí)現(xiàn)消息推送

    Spring Boot集成WebSocket實(shí)現(xiàn)消息推送

    項(xiàng)目中經(jīng)常會(huì)用到消息推送功能,關(guān)于推送技術(shù)的實(shí)現(xiàn),我們通常會(huì)聯(lián)想到輪詢、comet長連接技術(shù),雖然這些技術(shù)能夠?qū)崿F(xiàn),但是需要反復(fù)連接,對(duì)于服務(wù)資源消耗過大,隨著技術(shù)的發(fā)展,HtML5定義了WebSocket協(xié)議,能更好的節(jié)省服務(wù)器資源和帶寬,并且能夠更實(shí)時(shí)地進(jìn)行通訊。

    2023年04月08日
    瀏覽(21)
  • 在 Spring Boot 中整合、使用 WebSocket

    WebSocket 是一種基于 TCP 協(xié)議的全雙工通信協(xié)議,它允許客戶端和服務(wù)器之間建立持久的、雙向的通信連接。相比傳統(tǒng)的 HTTP 請求 - 響應(yīng)模式,WebSocket 提供了實(shí)時(shí)、低延遲的數(shù)據(jù)傳輸能力。通過 WebSocket,客戶端和服務(wù)器可以在任意時(shí)間點(diǎn)互相發(fā)送消息,實(shí)現(xiàn)實(shí)時(shí)更新和即時(shí)通

    2024年04月13日
    瀏覽(17)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包