適用于單客戶端,一個賬號登陸一個客戶端,登陸多個客戶端會報錯
The remote endpoint was in state [TEXT_FULL_WRITING]?文章來源:http://www.zghlxwxcb.cn/news/detail-689135.html
這是因為此時的session是不同的,只能鎖住一個session,解決此問題的方法把全局靜態(tài)對象鎖住,因為賬號是唯一的文章來源地址http://www.zghlxwxcb.cn/news/detail-689135.html
/**
* @Description 開啟springboot對websocket的支持
* @Author WangKun
* @Date 2023/8/14 17:21
* @Version
*/
@ConditionalOnProperty(name = "spring.profiles.active", havingValue = "dev")
@Configuration
public class WebSocketConfig{
/**
* @Description 注入一個ServerEndpointExporter, 會自動注冊使用@ServerEndpoint注解
* @param
* @Throws
* @Return org.springframework.web.socket.server.standard.ServerEndpointExporter
* @Date 2023-08-14 17:26:31
* @Author WangKun
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
/**
* @Description websocket服務(wù),不考慮分組
* @Author WangKun
* @Date 2023/8/14 17:29
* @Version
*/
@ConditionalOnClass(value = WebSocketConfig.class)
@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocket {
private static final long SESSION_TIMEOUT = 60000;
//存放每個客戶端對應(yīng)的WebSocket對象。
private static final ConcurrentHashMap<String, CopyOnWriteArraySet<WebSocket>> WEB_SOCKET_MAP = new ConcurrentHashMap<>();
//與某個客戶端的連接會話,需要通過它來給客戶端發(fā)送數(shù)據(jù)
private Session session;
private String userId;
/**
* @param o
* @Description 重寫防止session重復(fù)
* @Throws
* @Return boolean
* @Date 2023-09-01 10:02:51
* @Author WangKun
*/
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WebSocket that = (WebSocket) o;
return Objects.equals(session, that.session);
}
@Override
public int hashCode() {
return Objects.hash(session);
}
/**
* @param session
* @param userId
* @Description 建立連接
* @Throws
* @Return void
* @Date 2023-08-14 17:52:08
* @Author WangKun
*/
@SneakyThrows
@OnOpen
public void onOpen(final Session session, @PathParam("userId") String userId) {
this.session = session;
this.userId = userId;
session.setMaxIdleTimeout(SESSION_TIMEOUT);
//先查找是否有uniCode
CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(userId);
if (users == null) {
//處理多個同時連接并發(fā)
synchronized (WEB_SOCKET_MAP) {
if (!WEB_SOCKET_MAP.contains(userId)) {
users = new CopyOnWriteArraySet<>();
WEB_SOCKET_MAP.put(userId, users);
}
}
}
users.add(this);
sendMessage(String.valueOf(ResponseCode.CONNECT_SUCCESS.getCode()));
log.info("用戶--->{} 連接成功,當(dāng)前在線人數(shù)為--->{}", userId, WEB_SOCKET_MAP.size());
}
/**
* @param message
* @Description 向客戶端發(fā)送消息 session.getBasicRemote()與session.getAsyncRemote()的區(qū)別
* @Throws
* @Return void
* @Date 2023-08-14 17:51:07
* @Author WangKun
*/
@SneakyThrows
public void sendMessage(String message) {
// 加鎖避免阻塞
// 如果有多個客戶端的話,亦或者同一個用戶,或者打開了多個瀏覽器(同一個用戶打開多個客戶端或者多個界面),開了多個頁面,此時Session是不同的,只能鎖住一個session,所以鎖住全局靜態(tài)對象
// synchronized(session) {
// this.session.getBasicRemote().sendText(message);
// }
synchronized (WEB_SOCKET_MAP) {
CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(userId);
if (users != null) {
for (WebSocket user : users) {
// 判斷當(dāng)前客戶端的用戶是否打開連接
if (user.session.isOpen()) {
user.session.getBasicRemote().sendText(message);
log.info("向客戶端發(fā)送數(shù)據(jù)--->{} 數(shù)據(jù)為--->{}", userId, message);
}
}
}
}
}
/**
* @param
* @Description 關(guān)閉連接
* @Throws
* @Return void
* @Date 2023-08-14 17:52:30
* @Author WangKun
*/
@SneakyThrows
@OnClose
public void onClose(Session session) {
// 避免多人同時在線直接關(guān)閉通道。
CopyOnWriteArraySet<WebSocket> copyOnWriteArraySet = WEB_SOCKET_MAP.get(this.userId);
if (!copyOnWriteArraySet.isEmpty()) {
Object[] objects = copyOnWriteArraySet.toArray();
for (Object object : objects) {
if (((WebSocket) object).session.equals(session)) {
//刪除當(dāng)前用戶
WEB_SOCKET_MAP.get(this.userId).remove((WebSocket) object);
// 如果有一個客戶端登陸 下線清除用戶
if (WEB_SOCKET_MAP.get(this.userId).isEmpty()) {
WEB_SOCKET_MAP.remove(this.userId);
}
CloseReason close = new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "關(guān)閉客戶端,下線!");
session.close(close);
log.info("用戶--->{} 關(guān)閉連接!", userId);
}
}
}
}
/**
* @param message
* @param session
* @Description 收到客戶端消息
* @Throws
* @Return void
* @Date 2023-08-15 10:54:55
* @Author WangKun
*/
@SneakyThrows
@OnMessage
public void onMessage(String message, Session session) {
//枷鎖避免多個資源互搶
//這一塊可以操作數(shù)據(jù),比如存到數(shù)據(jù)
// 同一個用戶,多個地方登錄(多個session),循環(huán)發(fā)送消息,
// 如果有多個客戶端的話,亦或者同一個用戶,或者打開了多個瀏覽器,開了多個頁面,此時Session是不同的,只能鎖住一個session,所以鎖住全局靜態(tài)對象
synchronized (WEB_SOCKET_MAP) {
CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(userId);
if (users != null) {
for (WebSocket user : users) {
if (user.session.isOpen()) {
user.session.getBasicRemote().sendText("pong");
log.info("收到客戶端發(fā)送的心跳的數(shù)據(jù)--->{} 數(shù)據(jù)為--->{}", userId, message);
}
}
}
}
}
/**
* @param session
* @param error
* @Description 發(fā)生錯誤時
* @Throws
* @Return void
* @Date 2023-08-15 10:55:27
* @Author WangKun
*/
@SneakyThrows
@OnError
public void onError(Session session, Throwable error) {
CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(userId);
if (!users.isEmpty()) {
Object[] objects = users.toArray();
for (Object object : objects) {
if (((WebSocket) object).session.equals(session)) {
//刪除當(dāng)前用戶
WEB_SOCKET_MAP.get(this.userId).remove((WebSocket) object);
// 如果有一個客戶端登陸 下線清除用戶
if (WEB_SOCKET_MAP.get(this.userId).isEmpty()) {
WEB_SOCKET_MAP.remove(this.userId);
}
CloseReason close = new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "異常,下線!");
session.close(close);
log.error("用戶--->{} 錯誤!" + userId, "原因--->{}" + error.getMessage(), error);
}
}
}
// WEB_SOCKET_MAP.remove(userId);
// log.error("用戶--->{} 錯誤!" + userId, "原因--->{}" + error.getMessage(), error);
}
/**
* @param userId
* @param message
* @Description 通過userId向客戶端發(fā)送消息(指定用戶發(fā)送)
* @Throws
* @Return void
* @Date 2023-08-14 18:01:35
* @Author WangKun
*/
public static void sendTextMessageByUserId(String userId, String message) {
CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(userId);
if (users != null) {
for (WebSocket user : users) {
user.sendMessage(message);
log.info("服務(wù)端發(fā)送消息到用戶{},消息:{}", userId, message);
}
}
}
/**
* @param message
* @Description 群發(fā)自定義消息
* @Throws
* @Return void
* @Date 2023-08-14 18:03:38
* @Author WangKun
*/
public static void sendTextMessage(String message) {
// 如果在線一個就廣播
if (!WEB_SOCKET_MAP.isEmpty()) {
for (String item : WEB_SOCKET_MAP.keySet()) {
CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(item);
if (users != null) {
for (WebSocket user : users) {
user.sendMessage(message);
log.info("服務(wù)端發(fā)送消息到用戶{},消息:{}", item, message);
}
}
}
}
}
}
到了這里,關(guān)于SpringBoot2.0集成WebSocket,多客戶端的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!