前言
WebSocket協(xié)議是基于TCP的一種新的網(wǎng)絡(luò)協(xié)議。它實(shí)現(xiàn)了瀏覽器與服務(wù)器全雙工(full-duplex)通信——允許服務(wù)器主動(dòng)發(fā)送信息給客戶端
一、為什么需要WebSocket
HTTP 是基于請求響應(yīng)式的,即通信只能由客戶端發(fā)起,服務(wù)端做出響應(yīng),無狀態(tài),無連接。
無狀態(tài): 每次連接只處理一個(gè)請求,請求結(jié)束后斷開連接。
無連接: 對(duì)于事務(wù)處理沒有記憶能力,服務(wù)器不知道客戶端是什么狀態(tài)。
通過HTTP實(shí)現(xiàn)即時(shí)通訊,只能是頁面輪詢向服務(wù)器發(fā)出請求,服務(wù)器返回查詢結(jié)果。輪詢的效率低,非常浪費(fèi)資源,因?yàn)楸仨毑煌_B接,或者 HTTP 連接始終打開。
WebSocket的最大特點(diǎn)就是,服務(wù)器可以主動(dòng)向客戶端推送信息,客戶端也可以主動(dòng)向服務(wù)器發(fā)送信息,是真正的雙向平等對(duì)話。
WebSocket特點(diǎn)
-
(1)建立在 TCP 協(xié)議之上,服務(wù)器端的實(shí)現(xiàn)比較容易。
-
(2)與 HTTP 協(xié)議有著良好的兼容性。默認(rèn)端口也是80和443,并且握手階段采用 HTTP 協(xié)議,因此握手時(shí)不容易屏蔽,能通過各種
HTTP 代理服務(wù)器。 -
(3)數(shù)據(jù)格式比較輕量,性能開銷小,通信高效。
-
(4)可以發(fā)送文本,也可以發(fā)送二進(jìn)制數(shù)據(jù)。
-
(5)沒有同源限制,客戶端可以與任意服務(wù)器通信。
-
(6)協(xié)議標(biāo)識(shí)符是ws(如果加密,則為wss),服務(wù)器網(wǎng)址就是 URL。
二、SpringBoot整合WebSocket
1.maven依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.WebSocketConfig
啟用WebSocket的支持
@Configuration
public class WebSocketConfig {
/**
* 自動(dòng)注冊使用@ServerEndpoint注解聲明的websocket endpoint
* 2022/2/14
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
3.WebSocketServer
因?yàn)閃ebSocket是類似客戶端服務(wù)端的形式(采用ws協(xié)議),那么這里的WebSocketServer其實(shí)就相當(dāng)于一個(gè)ws協(xié)議的Controller
直接@ServerEndpoint(“/websocket/{userId}”) 、@Component啟用即可,然后在里面實(shí)現(xiàn)@OnOpen開啟連接,@onClose關(guān)閉連接,@onMessage接收消息等方法。
集群版(多個(gè)ws節(jié)點(diǎn))還需要借助mysql或者redis等進(jìn)行處理,改造對(duì)應(yīng)的sendMessage方法即可。
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
@ServerEndpoint("/imserver/{userId}")
@Component
public class WebSocketServer {
static Log log=LogFactory.get(WebSocketServer.class);
/**靜態(tài)變量,用來記錄當(dāng)前在線連接數(shù)。應(yīng)該把它設(shè)計(jì)成線程安全的。*/
private static int onlineCount = 0;
/**concurrent包的線程安全Set,用來存放每個(gè)客戶端對(duì)應(yīng)的MyWebSocket對(duì)象。*/
private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**與某個(gè)客戶端的連接會(huì)話,需要通過它來給客戶端發(fā)送數(shù)據(jù)*/
private Session session;
/**接收userId*/
private String userId="";
/**
* 連接建立成功調(diào)用的方法*/
@OnOpen
public void onOpen(Session session,@PathParam("userId") String userId) {
this.session = session;
this.userId=userId;
if(webSocketMap.containsKey(userId)){
webSocketMap.remove(userId);
webSocketMap.put(userId,this);
//加入set中
}else{
webSocketMap.put(userId,this);
//加入set中
addOnlineCount();
//在線數(shù)加1
}
log.info("用戶連接:"+userId+",當(dāng)前在線人數(shù)為:" + getOnlineCount());
try {
sendMessage("連接成功");
} catch (IOException e) {
log.error("用戶:"+userId+",網(wǎng)絡(luò)異常!!!!!!");
}
}
/**
* 連接關(guān)閉調(diào)用的方法
*/
@OnClose
public void onClose() {
if(webSocketMap.containsKey(userId)){
webSocketMap.remove(userId);
//從set中刪除
subOnlineCount();
}
log.info("用戶退出:"+userId+",當(dāng)前在線人數(shù)為:" + getOnlineCount());
}
/**
* 收到客戶端消息后調(diào)用的方法
*
* @param message 客戶端發(fā)送過來的消息*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("用戶消息:"+userId+",報(bào)文:"+message);
//可以群發(fā)消息
//消息保存到數(shù)據(jù)庫、redis
if(StringUtils.isNotBlank(message)){
try {
//解析發(fā)送的報(bào)文
JSONObject jsonObject = JSON.parseObject(message);
//追加發(fā)送人(防止串改)
jsonObject.put("fromUserId",this.userId);
String toUserId=jsonObject.getString("toUserId");
//傳送給對(duì)應(yīng)toUserId用戶的websocket
if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
}else{
log.error("請求的userId:"+toUserId+"不在該服務(wù)器上");
//否則不在這個(gè)服務(wù)器上,發(fā)送到mysql或者redis
}
}catch (Exception e){
e.printStackTrace();
}
}
}
/**
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用戶錯(cuò)誤:"+this.userId+",原因:"+error.getMessage());
error.printStackTrace();
}
/**
* 實(shí)現(xiàn)服務(wù)器主動(dòng)推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 發(fā)送自定義消息
* */
public static void sendInfo(String message,@PathParam("userId") String userId) throws IOException {
log.info("發(fā)送消息到:"+userId+",報(bào)文:"+message);
if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
webSocketMap.get(userId).sendMessage(message);
}else{
log.error("用戶"+userId+",不在線!");
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
4.集群版
在使用websocket的時(shí)候會(huì)面對(duì)session共享的問題
在這里我們通過Redis的監(jiān)聽訂閱模式來實(shí)現(xiàn)session共享,當(dāng)有連接加入時(shí),將連接發(fā)送到隊(duì)列中然后向訂閱該隊(duì)列的服務(wù)下發(fā),由存儲(chǔ)該連接的服務(wù)進(jìn)行邏輯處理
session共享的業(yè)務(wù)邏輯如下圖所示
4.1 Redis發(fā)布訂閱模式實(shí)現(xiàn)
前言
Redis可以通過發(fā)布訂閱模式、輪詢機(jī)制實(shí)現(xiàn)消息隊(duì)列。
由于沒有消息持久化與 ACK 的保證,所以,Redis 的發(fā)布訂閱功能并不可靠。這也就導(dǎo)致了它的應(yīng)用場景很有限,建議用于實(shí)時(shí)與可靠性要求不高的場景。
一、Redis發(fā)布訂閱
??????1. Redis發(fā)布訂閱模式實(shí)現(xiàn)原理
服務(wù)器中維護(hù)著一個(gè)pubsub_channels字典,所有的頻道和訂閱關(guān)系都存儲(chǔ)在這里。字典的鍵為頻道的名稱,而值為訂閱頻道的客戶端鏈表。
1> 當(dāng)有新的客戶端訂閱某個(gè)頻道時(shí),會(huì)發(fā)生兩種情況中的一種:
(1)如果頻道已經(jīng)存在,則新的客戶端會(huì)添加到pubsub_channels對(duì)應(yīng)頻道的鏈表末尾
(2)如果頻道原本不存在,則會(huì)為頻道創(chuàng)建一個(gè)鍵,該客戶端成為鏈表的第一個(gè)元素
2> 當(dāng)一個(gè)客戶端退訂一個(gè)頻道的時(shí)候:
pubsub_channels對(duì)應(yīng)鍵的鏈表會(huì)刪除該客戶端
3> 發(fā)送信息
服務(wù)器會(huì)遍歷pubsub_channels中對(duì)應(yīng)鍵的鏈表,向每一個(gè)客戶端發(fā)送信息
服務(wù)器還維護(hù)著一個(gè)pubsub_patterns鏈表,鏈表的pattern屬性記錄了被訂閱的模式,而client屬性記錄了訂閱模式的客戶端
- 當(dāng)有新的客戶端訂閱某個(gè)模式的時(shí),會(huì)進(jìn)行如下步驟:
(1)創(chuàng)建一個(gè)鏈表節(jié)點(diǎn),pattern屬性記錄訂閱的模式,client記錄訂閱模式的客戶端
(2)將這個(gè)鏈表節(jié)點(diǎn)添加到pubsub_patterns鏈表中
- 當(dāng)一個(gè)客戶端退訂某一個(gè)模式的時(shí)候:
服務(wù)器遍歷pubsob_patterns找到對(duì)應(yīng)的pattern同時(shí)也是對(duì)應(yīng)該client客戶端的節(jié)點(diǎn),將改節(jié)點(diǎn)刪除
- 發(fā)送信息
服務(wù)器遍歷pubsub_channels,查找與channels頻道相匹配的模式麻將消息發(fā)送給訂閱了這些模式的客戶端。
2.引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
3.redis配置
public class RedisConfig {
//Key的過期時(shí)間
private Duration timeToLive = Duration.ofDays(1);
/**
* redis模板,存儲(chǔ)關(guān)鍵字是字符串,值jackson2JsonRedisSerializer是序列化后的值
*
* @param
* @return org.springframework.data.redis.core.RedisTemplate
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
//使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值(默認(rèn)使用JDK的序列化方式)
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
//使用StringRedisSerializer來序列化和反序列化redis的key值
RedisSerializer redisSerializer = new StringRedisSerializer();
//key
redisTemplate.setKeySerializer(redisSerializer);
redisTemplate.setHashKeySerializer(redisSerializer);
//value
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
public RedisCacheConfiguration redisCacheConfiguration() {
return RedisCacheConfiguration.
defaultCacheConfig().
entryTtl(this.timeToLive). //Key過期時(shí)間 1天
serializeKeysWith(RedisSerializationContext.SerializationPair.
fromSerializer(new StringRedisSerializer())).
serializeValuesWith(RedisSerializationContext.SerializationPair.
fromSerializer(new GenericJackson2JsonRedisSerializer()));
}
}
4.redis監(jiān)聽
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import java.util.concurrent.CountDownLatch;
@Configuration
public class RedisMessageListener {
/**
* 創(chuàng)建連接工廠
*
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,
MessageListenerAdapter listenerAdapterWang,
MessageListenerAdapter listenerAdapterTest2) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//(不同的監(jiān)聽器可以收到同一個(gè)頻道的信息)接受消息的頻道
container.addMessageListener(listenerAdapter, new PatternTopic("phone"));
container.addMessageListener(listenerAdapterWang, new PatternTopic("phone"));
container.addMessageListener(listenerAdapterTest2, new PatternTopic("phoneTest2"));
return container;
}
/**
* 綁定消息監(jiān)聽者和接收監(jiān)聽的方法
*
* @param receiver
* @return
*/
@Bean
public MessageListenerAdapter listenerAdapter(ReceiverRedisMessage receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
@Bean
public MessageListenerAdapter listenerAdapterWang(ReceiverRedisMessage receiver) {
return new MessageListenerAdapter(receiver, "receiveMessageWang");
}
/**
* 綁定消息監(jiān)聽者和接收監(jiān)聽的方法
*
* @param receiver
* @return
*/
@Bean
public MessageListenerAdapter listenerAdapterTest2(ReceiverRedisMessage receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage2");
}
/**
* 注冊訂閱者
*
* @param latch
* @return
*/
@Bean
ReceiverRedisMessage receiver(CountDownLatch latch) {
return new ReceiverRedisMessage(latch);
}
/**
* 計(jì)數(shù)器,用來控制線程
*
* @return
*/
@Bean
public CountDownLatch latch() {
return new CountDownLatch(1);//指定了計(jì)數(shù)的次數(shù) 1
}
}
5.redis消息接收類
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.CountDownLatch;
public class ReceiverRedisMessage {
private CountDownLatch latch;
@Autowired
public ReceiverRedisMessage(CountDownLatch latch) {
this.latch = latch;
}
/**
* 隊(duì)列消息接收方法
*
* @param jsonMsg
*/
public void receiveMessage(String jsonMsg) {
log.info("[開始消費(fèi)REDIS消息隊(duì)列phone數(shù)據(jù)...]");
try {
System.out.println(jsonMsg);
log.info("[消費(fèi)REDIS消息隊(duì)列phone數(shù)據(jù)成功.]");
} catch (Exception e) {
log.error("[消費(fèi)REDIS消息隊(duì)列phone數(shù)據(jù)失敗,失敗信息:{}]", e.getMessage());
}
latch.countDown();
}
public void receiveMessageWang(String jsonMsg) {
log.info("[開始消費(fèi)REDIS消息隊(duì)列phone數(shù)據(jù)...]");
try {
System.out.println(jsonMsg);
log.info("[消費(fèi)REDIS消息隊(duì)列phone數(shù)據(jù)成功.]");
} catch (Exception e) {
log.error("[消費(fèi)REDIS消息隊(duì)列phone數(shù)據(jù)失敗,失敗信息:{}]", e.getMessage());
}
latch.countDown();
}
/**
* 隊(duì)列消息接收方法
*
* @param jsonMsg
*/
public void receiveMessage2(String jsonMsg) {
log.info("[開始消費(fèi)REDIS消息隊(duì)列phoneTest2數(shù)據(jù)...]");
try {
System.out.println(jsonMsg);
/**
* 此處執(zhí)行自己代碼邏輯 操作數(shù)據(jù)庫等
*/
log.info("[消費(fèi)REDIS消息隊(duì)列phoneTest2數(shù)據(jù)成功.]");
} catch (Exception e) {
log.error("[消費(fèi)REDIS消息隊(duì)列phoneTest2數(shù)據(jù)失敗,失敗信息:{}]", e.getMessage());
}
latch.countDown();
}
}
6.代碼測試
public void redisTest() {
//這個(gè)是測試同一個(gè)頻道,不同的訂閱者收到相同的信息,“phone”也就是topic也可以理解為頻道
redisTemplate.convertAndSend("phone", "223333");
//這個(gè)phoneTest2是另外的一個(gè)頻道
// redisTemplate.convertAndSend("phoneTest2", "34555665");
}
二、Redis輪詢機(jī)制
1.原理
Redis的列表類型鍵可以用來實(shí)現(xiàn)隊(duì)列,并且支持阻塞式讀取,可以實(shí)現(xiàn)一個(gè)高性能的優(yōu)先隊(duì)列, 在Redis中,List類型是按照插入順序排序的字符串鏈表。和數(shù)據(jù)結(jié)構(gòu)中的普通鏈表一樣,我們可以在其頭部(left)和尾部(right)添加新的元素。在插入時(shí),如果該鍵并不存在,Redis將為該鍵創(chuàng)建一個(gè)新的鏈表。與此相反,如果鏈表中所有的元素均被移除,那么該鍵也將會(huì)被從數(shù)據(jù)庫中刪除。List中可以包含的最大元素?cái)?shù)量是4294967295。從元素插入和刪除的效率視角來看,如果我們是在鏈表的兩頭插入或刪除元素,這將會(huì)是非常高效的操作,即使鏈表中已經(jīng)存儲(chǔ)了百萬條記錄,該操作也可以在常量時(shí)間內(nèi)完成。
2.生產(chǎn)者
public R saveUserTicket(String phoneNum) {
redisTemplate.opsForList().leftPush("ticket:Data", phoneNum);
return R.ok();
}
3.消費(fèi)者
@Scheduled(fixedRate = 1)
public synchronized void consumer() {
String message = redisTemplate.opsForList().rightPop("ticket:Data", 5, TimeUnit.SECONDS);
if (!StringUtils.isEmpty(message)){
//數(shù)據(jù)庫操作
}
}
4.優(yōu)化
如上述代碼,如果此時(shí)隊(duì)列為空,消費(fèi)者依然會(huì)頻繁拉取數(shù)據(jù),造成CPU空轉(zhuǎn),不僅占用CPU資源還對(duì)Redis造成壓力。因此當(dāng)隊(duì)列為空時(shí)我們可以休眠一段時(shí)間,再進(jìn)行拉取。
實(shí)現(xiàn)如下
@Scheduled(fixedRate = 1)
public synchronized void consumer() throws InterruptedException {
long a = redisTemplate.opsForList().size("ticket:Data");
if (a == 0) {
TimeUnit.SECONDS.sleep(1);//等待時(shí)間
}
String message = redisTemplate.opsForList().rightPop("ticket:Data", 5, TimeUnit.SECONDS);
if (!StringUtils.isEmpty(message)){
//數(shù)據(jù)庫操作
}
Redis發(fā)布訂閱模式實(shí)現(xiàn)原理
前言
發(fā)布訂閱系統(tǒng)在我們?nèi)粘5墓ぷ髦薪?jīng)常會(huì)使用到,這種場景大部分情況我們都是使用消息隊(duì)列,常用的消息隊(duì)列有 Kafka,RocketMQ,RabbitMQ,每一種消息隊(duì)列都有其特性,很多時(shí)候我們可能不需要獨(dú)立部署相應(yīng)的消息隊(duì)列,只是簡單的使用,而且數(shù)據(jù)量也不會(huì)太大,這種情況下,我們就可以使用 Redis 的 Pub/Sub 模型。
一、Redis發(fā)布訂閱
Redis 的發(fā)布訂閱功能主要由 PUBLISH,SUBSCRIBE,PSUBSCRIBE 命令組成,一個(gè)或者多個(gè)客戶端訂閱某個(gè)或者多個(gè)頻道,當(dāng)其他客戶端向該頻道發(fā)送消息的時(shí)候,訂閱了該頻道的客戶端都會(huì)收到對(duì)應(yīng)的消息。
二、發(fā)布訂閱模式的基本命令
Psubscribe 命令訂閱一個(gè)或多個(gè)符合給定模式的頻道。每個(gè)模式以 * 作為匹配符,比如 it* 匹配所有以 it 開頭的頻道( it.news 、 it.blog 、 it.tweets )
Psubscribe 命令基本語法如下:
PSUBSCRIBE pattern [pattern …]
Redis Pubsub 命令用于查看訂閱與發(fā)布系統(tǒng)狀態(tài),它由數(shù)個(gè)不同格式的子命令組成。
PUBSUB [argument [argument …]]
Publish 命令用于將信息發(fā)送到指定的頻道。
Publish 命令基本語法如下:
PUBLISH channel message
Punsubscribe 命令用于退訂所有給定模式的頻道。
Punsubscribe 命令基本語法如下:
PUNSUBSCRIBE [pattern [pattern …]]
Subscribe 命令用于訂閱給定的一個(gè)或多個(gè)頻道的信息。
Subscribe 命令基本語法如下:
SUBSCRIBE channel [channel …]
Unsubscribe 命令用于退訂給定的一個(gè)或多個(gè)頻道的信息。
Unsubscribe 命令基本語法如下:
UNSUBSCRIBE channel [channel …]
三、發(fā)布訂閱模式的實(shí)現(xiàn)原理
Redis通過PUBLISH、SUBSCRIBE和PSUBSCRIBE等命令實(shí)現(xiàn)了發(fā)布和訂閱功能。
- 通過SUBSCRIBE命令訂閱某個(gè)頻道后,redis-server里面維護(hù)了一個(gè)字典,字典的鍵就是一個(gè)個(gè)channel,而字典的值則是一個(gè)鏈表,鏈表中保存了所有訂閱這個(gè)channel的客戶端。SUBSCRIBE命令的關(guān)鍵,就是將客戶端添加到指定channel的訂閱鏈表中。
- 通過PUBLISH命令向訂閱者發(fā)送信息,redis-server會(huì)使用給定的頻道作為鍵,在他所維護(hù)的channel字典中查找記錄訂閱該頻道的所有客戶端的鏈表,通過遍歷這個(gè)鏈表,來將信息發(fā)布給所有的訂閱者
Pub/Sub 底層存儲(chǔ)結(jié)構(gòu)
訂閱 Channel
在 Redis 的底層結(jié)構(gòu)中,客戶端和頻道的訂閱關(guān)系是通過一個(gè)字典加鏈表的結(jié)構(gòu)保存的,形式如下:
在 Redis 的底層結(jié)構(gòu)中,Redis 服務(wù)器結(jié)構(gòu)體中定義了一個(gè) pubsub_channels 字典
struct redisServer {
//用于保存所有頻道的訂閱關(guān)系
dict *pubsub_channels
}
在這個(gè)字典中,key 代表的是頻道名稱,value 是一個(gè)鏈表,這個(gè)鏈表里面存放的是所有訂閱這個(gè)頻道的客戶端。
所以當(dāng)有客戶端執(zhí)行訂閱頻道的動(dòng)作的時(shí)候,服務(wù)器就會(huì)將客戶端與被訂閱的頻道在 pubsub_channels 字典中進(jìn)行關(guān)聯(lián)。
這個(gè)時(shí)候有兩種情況:
-
該渠道是首次被訂閱:首次被訂閱說明在字典中并不存在該渠道的信息,那么程序首先要?jiǎng)?chuàng)建一個(gè)對(duì)應(yīng)的 key,并且要賦值一個(gè)空鏈表,然后將對(duì)應(yīng)的客戶端加入到鏈表中。此時(shí)鏈表只有一個(gè)元素。
-
該渠道已經(jīng)被其他客戶端訂閱過:這個(gè)時(shí)候就直接將對(duì)應(yīng)的客戶端信息添加到鏈表的末尾就好了。
比如,如果有一個(gè)新的客戶端 Client 08 要訂閱 run 渠道,那么上圖就會(huì)變成
如果 Client 08 要訂閱一個(gè)新的渠道 new_sport ,那么就會(huì)變成
取消訂閱
上面介紹的是單個(gè) Channel 的訂閱,相反的如果一個(gè)客戶端要取消訂閱相關(guān) Channel,則無非是找到對(duì)應(yīng)的 Channel 的鏈表,從中刪除對(duì)應(yīng)的客戶端,如果該客戶端已經(jīng)是最后一個(gè)了,則將對(duì)應(yīng) Channel 也刪除。
模式訂閱結(jié)構(gòu)
模式渠道的訂閱與單個(gè)渠道的訂閱類似,不過服務(wù)器是將所有模式的訂閱關(guān)系都保存在服務(wù)器狀態(tài)的pubsub_patterns 屬性里面。
struct redisServer{
//保存所有模式訂閱關(guān)系
list *pubsub_patterns;
}
與訂閱單個(gè) Channel 不同的是,pubsub_patterns 屬性是一個(gè)鏈表,不是字典。節(jié)點(diǎn)的結(jié)構(gòu)如下:
struct pubsubPattern{
//訂閱模式的客戶端
redisClient *client;
//被訂閱的模式
robj *pattern;
} pubsubPattern;
其實(shí) client 屬性是用來存放對(duì)應(yīng)客戶端信息,pattern 是用來存放客戶端對(duì)應(yīng)的匹配模式。
所以對(duì)應(yīng)上面的 Client-06 模式匹配的結(jié)構(gòu)存儲(chǔ)如下
在pubsub_patterns鏈表中有一個(gè)節(jié)點(diǎn),對(duì)應(yīng)的客戶端是 Client-06,對(duì)應(yīng)的匹配模式是run*。
訂閱模式
當(dāng)某個(gè)客戶端通過命令psubscribe 訂閱對(duì)應(yīng)模式的 Channel 時(shí)候,服務(wù)器會(huì)創(chuàng)建一個(gè)節(jié)點(diǎn),并將 Client 屬性設(shè)置為對(duì)應(yīng)的客戶端,pattern 屬性設(shè)置成對(duì)應(yīng)的模式規(guī)則,然后添加到鏈表尾部。
創(chuàng)建新節(jié)點(diǎn);
給節(jié)點(diǎn)的屬性賦值;
將節(jié)點(diǎn)添加到鏈表的尾部;
退訂模式
退訂模式的命令是punsubscribe,客戶端使用這個(gè)命令來退訂一個(gè)或者多個(gè)模式 Channel。服務(wù)器接收到該命令后,會(huì)遍歷pubsub_patterns鏈表,將匹配到的 client 和 pattern 屬性的節(jié)點(diǎn)給刪掉。這里需要判斷 client 屬性和 pattern 屬性都合法的時(shí)候再進(jìn)行刪除。
遍歷所有的節(jié)點(diǎn),當(dāng)匹配到相同 client 屬性和 pattern 屬性的時(shí)候就進(jìn)行節(jié)點(diǎn)刪除。文章來源:http://www.zghlxwxcb.cn/news/detail-515841.html
發(fā)布消息
當(dāng)一個(gè)客戶端執(zhí)行了publish channelName message 命令的時(shí)候,服務(wù)器會(huì)從pubsub_channels和pubsub_patterns 兩個(gè)結(jié)構(gòu)中找到符合channelName 的所有 Channel,進(jìn)行消息的發(fā)送。在 pubsub_channels 中只要找到對(duì)應(yīng)的 Channel 的 key 然后向?qū)?yīng)的 value 鏈表中的客戶端發(fā)送消息。文章來源地址http://www.zghlxwxcb.cn/news/detail-515841.html
4.2 隊(duì)列消息推送
@OnMessage
public void onMessage(Session session, String message) throws IOException, ParseException {
//業(yè)務(wù)邏輯處理
//向所有用戶廣播
redisUtil.convertSend("topic", message);
}
4.3 隊(duì)列消息接收
/**
* 隊(duì)列消息接收方法
* 2022/2/21
*/
public void receiveMessage(String message) {
log.info("[開始消費(fèi)REDIS消息隊(duì)列topic數(shù)據(jù)...]");
try {
//業(yè)務(wù)邏輯處理
webSocketServer.sendMessageForProject(message);
log.info("[消費(fèi)REDIS消息隊(duì)列topic數(shù)據(jù)成功.]");
} catch (Exception e) {
log.error("[消費(fèi)REDIS消息隊(duì)列topic數(shù)據(jù)失敗,失敗信息:{}]", e.getMessage());
}
latch.countDown();
}
參考來源:「人苼若只茹初見」
原文鏈接: https://blog.csdn.net/printf88/article/details/123685995
到了這里,關(guān)于SpringBoot+WebSocket+Session共享的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!