一、第一種方式-原生注解(tomcat內嵌)
1.1、引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
1.2、配置文件
package cn.jt.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @author GXM
* @version 1.0.0
* @Description TODO
* @createTime 2023年07月06日
*/
@Configuration
public class WebSocketConfig {
/**
* 初始化Bean,它會自動注冊使用了 @ServerEndpoint 注解聲明的 WebSocket endpoint
*
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
1.3、構建安全的WebSocket抽象層
1、該類可以作為一個基礎的安全抽象層,后續(xù)項目中如果需要做認證的操作,都可以繼承該抽象類
ClientUserInfoService 大家可以看作一個 UserService 就是一張用戶表的service類
這里認證采用的是 jwt的方式,大家可以換成自己的
2、大家這里注意,我們使用的是 javax.websocket.Session;
這個是tomcat下的
package cn.jt.websocket;
import cn.jt.client.entity.ClientUserInfo;
import cn.jt.client.service.ClientUserInfoService;
import cn.jt.jwt.JwtUtils;
import cn.jt.utils.SpringContextUtils;
import lombok.extern.slf4j.Slf4j;
import javax.websocket.Session;
import java.io.IOException;
import java.util.Date;
/**
* @author GXM
* @version 1.0.0
* @Description TODO
* @createTime 2023年07月06日
*/
@Slf4j
public abstract class SecureWebSocket {
private static final ClientUserInfoService clientUserInfoService;
static {
clientUserInfoService = SpringContextUtils.getBean(ClientUserInfoService.class);
}
protected Session session;
protected String token;
protected Long tokenExpiresAt;
protected ClientUserInfo clientUserInfo;
/**
* 驗證token是否有效(包含有效期)
*
* @param token token
* @param isInit 是否對token和userInfo進行初始化賦值
* @return boolean
*/
protected boolean isTokenValid(String token, boolean isInit) {
ClientUserInfo clientUserInfo;
try {
clientUserInfo = JwtUtils.getClientUserInfo(token);
} catch (Exception e) {
log.error("ws 認證失敗", e);
return false;
}
if (isInit) {
this.clientUserInfo = clientUserInfo;
this.tokenExpiresAt = JwtUtils.getDecodedJWT(token).getExpiresAt().getTime();
this.token = token;
}
return true;
}
/**
* 認證失敗,斷開連接
*
* @param session session
*/
protected void sendAuthFailed(Session session) {
try {
session.getBasicRemote().sendText("認證失敗");
session.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
1.4、構建基礎的WebSocket
1、代碼很簡單,大家一看就知道邏輯了,這里就解釋一下各個注解的含義
- @ServerEndpoint:將目前的類定義成一個websocket服務器端,注解的值將被用于監(jiān)聽用戶連接的終端訪問URL地址,客戶端可以通過這個URL來連接到WebSocket服務器端
- @OnOpen:當WebSocket建立連接成功后會觸發(fā)這個注解修飾的方法。
- @OnClose:當WebSocket建立的連接斷開后會觸發(fā)這個注解修飾的方法。
- @OnMessage:當客戶端發(fā)送消息到服務端時,會觸發(fā)這個注解修改的方法。
- @OnError:當WebSocket建立連接時出現異常會觸發(fā)這個注解修飾的方法。
2、大家這里注意,我們使用的是 javax.websocket.Session;
這個是tomcat下的
package cn.jt.websocket;
import com.alibaba.fastjson.JSON;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.*;
/**
* @author GXM
* @version 1.0.0
* @Description
* @createTime 2023年07月06日
*/
@Slf4j
@ServerEndpoint("/globalWs/{token}")
@Component
public class GlobalWebsocket extends SecureWebSocket {
/**
* key: userKye
* value: GlobalWebsocket 這里你直接存儲 session 也是可以的
*/
private static final Map<String, GlobalWebsocket> CLIENTS = new ConcurrentHashMap<>();
/**
* // 如果允許 一個賬號 多人登錄的話 就 加上 "-" + tokenTime,因為每次登錄的token過期時間都是不一樣的
* clientUserInfo.getId() + "-" + clientUserInfo.getAccount() ;
*/
private String userKye;
@OnOpen
public void onOpen(Session session, @PathParam("token") String token) {
if (!isTokenValid(token, true)) {
sendAuthFailed(session);
return;
}
this.session = session;
this.userKye = clientUserInfo.getId() + "-" + clientUserInfo.getAccount() + "-" + super.tokenExpiresAt;
CLIENTS.put(userKye, this);
log.info("當前在線用戶:{}", CLIENTS.keySet());
try {
session.getBasicRemote().sendText("連接成功!");
} catch (IOException e) {
e.printStackTrace();
}
}
@OnMessage
public String onMessage(Session session, String message) {
// 先判斷當前token 是否已經到期了
if (!isTokenValid(token, false)) {
sendAuthFailed(session);
return null;
}
try {
session.getBasicRemote().sendText("received");
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
@OnError
public void onError(Session session, Throwable throwable) {
// log.error("ws session 發(fā)生錯誤,session key is {}",throwable);
log.error("ws session 發(fā)生錯誤:{}", throwable.getMessage());
}
@OnClose
public void onClose(Session session) {
CLIENTS.remove(userKye);
log.info("ws 用戶 userKey {} 已下線,當前在線用戶:{}", userKye, CLIENTS.keySet());
}
/**
* 發(fā)送消息
*
* @param messageVo
*/
public void sendMessage(MessageVo messageVo) {
try {
this.session.getBasicRemote().sendText(JSON.toJSONString(messageVo));
} catch (IOException e) {
log.error("發(fā)送消息異常", e);
}
}
/**
* 向user精確用戶發(fā)送消息
*
* @param userKey 由 account + "-" + refreshToken的簽發(fā)時間組成,例:"admin-1635830649000"
* @param messageVo 消息內容
*/
public static void sendToUser(String userKey, MessageVo messageVo) {
GlobalWebsocket globalWebsocket = CLIENTS.get(userKey);
if (null != globalWebsocket) {
globalWebsocket.sendMessage(messageVo);
return;
}
log.error("發(fā)送消息到指定用戶,但是用戶不存在,userKey is {},message is {}", userKey, JSON.toJSONString(messageVo));
}
/**
* 全體組播消息
*
* @param
*/
public static void broadcast(MessageVo messageVo) {
CLIENTS.values().forEach(c -> {
Session curSession = c.session;
if (curSession.isOpen()) {
try {
curSession.getBasicRemote().sendText(JSON.toJSONString(messageVo));
} catch (IOException e) {
log.error("發(fā)送ws數據錯誤:{}", e.getMessage());
}
}
}
);
}
}
1.5、SpringBoot 開啟 WebSocket
@EnableWebSocket
1.6、高并發(fā)時候的問題
1、這里要說明一下在高并發(fā)下的問題,如果你同時向在線的 3 個webSocket 在線客戶端發(fā)送消息,即廣播所有在線用戶(目前是3個),每個用戶每秒10條,那就是說,你每秒要發(fā)送 30 條數據,我們調用上述的廣播函數 broadcast()
,有時候會出現
java.lang.IllegalStateException: 遠程 endpoint 處于 [xxxxxx] 狀態(tài),如:
The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for calle
這是因為在高并發(fā)的情況下,出現了session搶占的問題,導致session,的狀態(tài)不一致,所以,這里可以去嘗試加鎖操作,如下
public static final ExecutorService WEBSOCKET_POOL_EXECUTOR = new ThreadPoolExecutor(
20, 20,
Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder()
.setNameFormat("GlobalWebsocket-executor-" + "%d")
.setUncaughtExceptionHandler((thread, throwable) -> log.error("ThreadPool {} got exception", thread, throwable)).build(),
new ThreadPoolExecutor.AbortPolicy());
/**
* 全體組播消息
*
* @param
*/
public static void broadcast(MessageVo messageVo) {
CLIENTS.values().forEach(c -> {
Session curSession = c.session;
if (curSession.isOpen()) {
// 建議單個session 一個線程,避免 一個session會話網絡不好,會出現超時異常,當前線程會因此中斷。
// 導致后面的session沒有進行發(fā)送操作。使用單個線程,單個session情況下避免session之間的相互影響。
WEBSOCKET_POOL_EXECUTOR.execute(() -> {
synchronized (curSession) {
// 雙重鎖檢查,外邊的 isOpen 第一遍過濾,里面枷加鎖之后,第二遍過濾
if (curSession.isOpen()) {
try {
curSession.getBasicRemote().sendText(JSON.toJSONString(messageVo));
} catch (IOException e) {
log.error("發(fā)送ws數據錯誤:{}", e.getMessage());
}
}
}
});
}
}
);
}
其中增加了,雙重鎖檢查,以及線程池操作,當然加上鎖之后,性能是肯定會有所下降的
建議單個session 一個線程,避免 一個session會話網絡不好,會出現超時異常,當前線程會因此中斷
2、按照上述的代碼,我這邊測試12個webSocket 鏈接,每秒每個客戶端都發(fā)送10條數據,相當于每秒發(fā)送120條數據,目前看來,速度還是不錯的,但是當客戶端重連后,偶爾會出現錯誤信息 遠程主機已經關閉了一個鏈接
,類似于這種錯誤,這條錯誤日志是在廣播代碼的如下位置打印的,這是因為當準備發(fā)送消息的時候,遠程客戶端還是關閉了。
try {
curSession.getBasicRemote().sendText(JSON.toJSONString(messageVo));
} catch (IOException e) {
log.error("發(fā)送ws數據錯誤:{}", e.getMessage());
}
二、第二種方式-Spring封裝
2.1、引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.2、自己的webSocket處理service
1、WebSocketService 處理器類如下
類似于 UserService 等等,主要是抽出一部分的業(yè)務邏輯
package cn.jt.websocket.spring;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
/**
* @author GXM
* @version 1.0.0
* @Description TODO
* @createTime 2023年07月19日
*/
public interface WebSocketService {
/**
* 會話開始回調
*
* @param session 會話
*/
void handleOpen(WebSocketSession session);
/**
* 會話結束回調
*
* @param session 會話
*/
void handleClose(WebSocketSession session);
/**
* 處理消息
*
* @param session 會話
* @param message 接收的消息
*/
void handleMessage(WebSocketSession session, String message);
/**
* 發(fā)送消息
*
* @param session 當前會話
* @param message 要發(fā)送的消息
* @throws IOException 發(fā)送io異常
*/
void sendMessage(WebSocketSession session, String message) throws IOException;
/**
* 發(fā)送消息
*
* @param userId 用戶id
* @param message 要發(fā)送的消息
* @throws IOException 發(fā)送io異常
*/
void sendMessage(Integer userId, TextMessage message) throws IOException;
/**
* 發(fā)送消息
*
* @param userId 用戶id
* @param message 要發(fā)送的消息
* @throws IOException 發(fā)送io異常
*/
void sendMessage(Integer userId, String message) throws IOException;
/**
* 發(fā)送消息
*
* @param session 當前會話
* @param message 要發(fā)送的消息
* @throws IOException 發(fā)送io異常
*/
void sendMessage(WebSocketSession session, TextMessage message) throws IOException;
/**
* 廣播
*
* @param message 字符串消息
* @throws IOException 異常
*/
void broadCast(String message) throws IOException;
/**
* 廣播
*
* @param message 文本消息
* @throws IOException 異常
*/
void broadCast(TextMessage message) throws IOException;
/**
* 處理會話異常
*
* @param session 會話
* @param error 異常
*/
void handleError(WebSocketSession session, Throwable error);
}
2、WebSocketServiceImpl 實現類如下
類似于 UserServiceImpl 等等,主要是抽出一部分的業(yè)務邏輯
package cn.jt.websocket.spring;
import cn.jt.client.entity.ClientUserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author GXM
* @version 1.0.0
* @Description TODO
* @createTime 2023年07月19日
*/
@Slf4j
public class WebSocketServiceImpl implements WebSocketService {
private final Map<Integer, WebSocketSession> clients = new ConcurrentHashMap<>();
@Override
public void handleOpen(WebSocketSession session) {
// 這個時候就需要在建立 webSocket 時存儲的 用戶信息了
Map<String, Object> attributes = session.getAttributes();
ClientUserInfo clientUserInfo = (ClientUserInfo) attributes.get("clientUserInfo");
clients.put(clientUserInfo.getId(), session);
log.info("a new connection opened,current online count:{}", clients.size());
}
@Override
public void handleClose(WebSocketSession session) {
// 這個時候就需要在建立 webSocket 時存儲的 用戶信息了
Map<String, Object> attributes = session.getAttributes();
ClientUserInfo clientUserInfo = (ClientUserInfo) attributes.get("clientUserInfo");
clients.remove(clientUserInfo.getId());
log.info("a new connection closed,current online count:{}", clients.size());
}
@Override
public void handleMessage(WebSocketSession session, String message) {
// 只處理前端傳來的文本消息,并且直接丟棄了客戶端傳來的消息
log.info("received a message:{}", message);
}
@Override
public void sendMessage(WebSocketSession session, String message) throws IOException {
this.sendMessage(session, new TextMessage(message));
}
@Override
public void sendMessage(Integer userId, TextMessage message) throws IOException {
WebSocketSession webSocketSession = clients.get(userId);
if (webSocketSession.isOpen()) {
webSocketSession.sendMessage(message);
}
}
@Override
public void sendMessage(Integer userId, String message) throws IOException {
this.sendMessage(userId, new TextMessage(message));
}
@Override
public void sendMessage(WebSocketSession session, TextMessage message) throws IOException {
session.sendMessage(message);
}
@Override
public void broadCast(String message) throws IOException {
clients.values().forEach(session -> {
if (session.isOpen()) {
try {
session.sendMessage(new TextMessage(message));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}
@Override
public void broadCast(TextMessage message) throws IOException {
clients.values().forEach(session -> {
if (session.isOpen()) {
try {
session.sendMessage(message);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}
@Override
public void handleError(WebSocketSession session, Throwable error) {
log.error("websocket error:{},session id:{}", error.getMessage(), session.getId());
log.error("", error);
}
}
2.3、實現spring框架的WebSocket處理器
1、注意這里的 webSocketSession
就是 spring 包下的了,不再是 tomcat
包下的了
這里其實就和我們之前使用原生注解(tomcat)的那個一樣了,都是幾個特定的函數
我們在特定的方法下,調用我們自己的 service去單獨處理,解耦合
package cn.jt.websocket.spring;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.*;
/**
* @author GXM
* @version 1.0.0
* @Description TODO
* @createTime 2023年07月19日
*/
public class DefaultWebSocketHandler implements WebSocketHandler {
@Autowired
private WebSocketService webSocketService;
/**
* 建立連接
*
* @param session Session
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) {
webSocketService.handleOpen(session);
}
/**
* 接收消息
*
* @param session Session
* @param message 消息
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
webSocketService.handleMessage(session, textMessage.getPayload());
}
}
/**
* 發(fā)生錯誤
*
* @param session Session
* @param exception 異常
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
webSocketService.handleError(session, exception);
}
/**
* 關閉連接
*
* @param session Session
* @param closeStatus 關閉狀態(tài)
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {
webSocketService.handleClose(session);
}
/**
* 是否支持發(fā)送部分消息
*
* @return false
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
}
2.4、自定義攔截器
這里,我們可以設置攔截器,在做請求參數,或者權限認證的時候,不用在建立鏈接的函數
afterConnectionEstablished
里面去處理
可以理解為 springMvc 每次請求前的攔截器
package cn.jt.websocket.spring;
import cn.jt.client.entity.ClientUserInfo;
import cn.jt.jwt.JwtUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
/**
* @author GXM
* @version 1.0.0
* @Description TODO
* @createTime 2023年07月19日
*/
@Slf4j
public class WebSocketInterceptor implements HandshakeInterceptor {
/**
* 建立請求之前,可以用來做權限判斷
*
* @param request the current request
* @param response the current response
* @param wsHandler the target WebSocket handler
* @param attributes the attributes from the HTTP handshake to associate with the WebSocket
* session; the provided attributes are copied, the original map is not used.
* @return
* @throws Exception
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request,
ServerHttpResponse response, WebSocketHandler wsHandler,
Map<String, Object> attributes) throws Exception {
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest) request;
// 模擬用戶(通常利用JWT令牌解析用戶信息)
String token = servletServerHttpRequest.getServletRequest().getParameter("token");
try {
ClientUserInfo clientUserInfo = JwtUtils.getClientUserInfo(token);
// 設置當前這個session的屬性,后續(xù)我們在發(fā)送消息時,可以通過 session.getAttributes().get("clientUserInfo")可以取出 clientUserInfo參數
attributes.put("clientUserInfo", clientUserInfo);
} catch (Exception e) {
log.error("webSocket 認證失敗 ", e);
return false;
}
return true;
}
return false;
}
/**
* 建立請求之后
*
* @param request the current request
* @param response the current response
* @param wsHandler the target WebSocket handler
* @param exception an exception raised during the handshake, or {@code null} if none
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception exception) {
}
}
2.5、WebSocket配置
將自定義處理器、攔截器以及WebSocket操作類依次注入到IOC容器中。
- @EnableWebSocket:開啟WebSocket功能
- addHandler:添加處理器
- addInterceptors:添加攔截器
- setAllowedOrigins:設置允許跨域(允許所有請求來源)
package cn.jt.websocket.spring;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
/**
* @author GXM
* @version 1.0.0
* @Description TODO
* @createTime 2023年07月19日
*/
@Configuration
public class WebSocketConfiguration implements WebSocketConfigurer {
@Bean
public DefaultWebSocketHandler defaultWebSocketHandler() {
return new DefaultWebSocketHandler();
}
@Bean
public WebSocketService webSocket() {
return new WebSocketServiceImpl();
}
@Bean
public WebSocketInterceptor webSocketInterceptor() {
return new WebSocketInterceptor();
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 鏈接方式如下 ws://127.0.0.1:9085/globalWs/message?token=qwncjncwqqdnjncz.adqwascsvcgrgb.cbrtbfvb
// 如果你設置了springboot的 contentPath 那就需要在:9085端口后面 加上contentPath 的值,在拼接上 globalWs/message?token=qwncjncwqqdnjncz.adqwascsvcgrgb.cbrtbfvb
registry.addHandler(defaultWebSocketHandler(), "/globalWs/message")
.addInterceptors(webSocketInterceptor())
.setAllowedOrigins("*");
}
}
2.6、SpringBoot 開啟 WebSocket
@EnableWebSocket
2.7、鏈接
1、其中 thermal-api
是我的項目名稱
2、鏈接路徑如下
ws://127.0.0.1:9085/thermal-api/globalWs/message?token=qwncjncwqqdnjncz.adqwascsvcgrgb.cbrtbfvb
2.8、高并發(fā)時候的問題
1、如果在廣播的時候,客戶端很多,發(fā)送的消息也是很多,還是會出現和之前 第一種方式-原生注解(tomcat內嵌)
相同的問題,出現類似如下報錯
The remote endpoint was in state [xxxx] which is an invalid state for calle
2、錯誤分析可以看 踩坑筆記 Spring websocket并發(fā)發(fā)送消息異常,寫的很清楚。
2.8.1、解決方案一
1、和之前一樣,加鎖
@Override
public void broadCast(String message) throws IOException {
clients.values().forEach(session -> {
if (session.isOpen()) {
synchronized (session){
try {
session.sendMessage(new TextMessage(message));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
});
}
2.8.2、解決方案二
1、使用 spring 的,Spring 的解決方案是把原來的 WebSocketSession 封了一層,即 org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator
3、代碼稍微改一下,如下
@Override
public void handleOpen(WebSocketSession session) {
// 這個時候就需要在建立 webSocket 時存儲的 用戶信息了
Map<String, Object> attributes = session.getAttributes();
ClientUserInfo clientUserInfo = (ClientUserInfo) attributes.get("clientUserInfo");
clients.put(clientUserInfo.getId(), new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64000));
log.info("a new connection opened,current online count:{}", clients.size());
}
第三種方式-TIO
1、請上網了解,用的比較少,不做過多說明文章來源地址http://www.zghlxwxcb.cn/news/detail-702422.html文章來源:http://www.zghlxwxcb.cn/news/detail-702422.html
第四種方式-STOMP
1、請上網了解,用的比較少,不做過多說明
到了這里,關于springBoot使用webSocket的幾種方式以及在高并發(fā)出現的問題及解決的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!