一、WebSocket配置類
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration class for the WebSocket server side.
 *
 * @author HFL
 * @date 2022/5/16 14:49
 */
@Configuration
public class WebSocketConfiguration {

    /**
     * Publishes a {@link ServerEndpointExporter} bean so that classes annotated
     * with {@code @ServerEndpoint} are registered when the application starts.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
二、WebSocket服務端類
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
/**
* @author HFL
* @date 2022/5/16 15:17
* Websocket應(yīng)用實現(xiàn):
* 1.建立連接,連接放入連接池
* 2.關(guān)閉連接,連接移出連接池
* 3.接收客戶端發(fā)送的消息,并做出相應(yīng)處理
* 4.注入業(yè)務(wù)層的service
* [注意:Spring管理的Bean是單例模式的,而WebSocket不是單例,注入時需要處理一下]
* 5.異常處理,連接移除連接池
*/
@Slf4j
@Component
@ServerEndpoint("/endPoint/{screen}")
public class WebSocketServer {
/**
* 建立連接成功調(diào)用 (Session + 場景)
*/
@OnOpen
public void onOpen(Session session,@PathParam("screen") String screen) throws IOException {
log.info("[onOpen][session({}) 接入, [screen: {}]", session, screen);
WebSocketServerPool.addDataConnect(session, screen);
//WebSocketServerPool.sendMessage(session, configurationScreenService.queryAllJsonById(screen));
}
/**
* 關(guān)閉連接時調(diào)用
* @param session 連接
*/
@OnClose
public void onClose(Session session, CloseReason closeReason) {
log.info("[onClose][session({}) 連接關(guān)閉。關(guān)閉原因是({})}]", session, closeReason);
WebSocketServerPool.removeConnect(session);
}
/**
* 錯誤時調(diào)用
* @param session 連接
* @param throwable 異常
*/
@OnError
public void onError(Session session, Throwable throwable) {
log.info("[onClose][session({}) 發(fā)生異常]", session, throwable);
WebSocketServerPool.removeConnect(session);
}
/**
* 收到客戶端信息后,根據(jù)接收到的信息進(jìn)行處理
* @param session 連接
* @param message 數(shù)據(jù)消息
*/
@OnMessage
public void onMessage(Session session, String message) {
log.info("[onOpen][session({}) 接收到一條消息({})]", session, message);
// TODO: 2022/5/18 對于客戶端發(fā)送的指令信息,解析后進(jìn)行對應(yīng)的邏輯處理
}
}
三、WebSocket的連接池類
import javax.websocket.Session;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
 * WebSocket connection pool: static helpers to register and remove sessions,
 * expose the pool, and push a text message to a session.
 *
 * @author HFL
 * @date 2022/5/16 9:39
 */
public class WebSocketServerPool {

    /**
     * User-property key inspected when pruning sessions.
     * NOTE(review): nothing in this class writes this property; presumably it is
     * set elsewhere — confirm.
     */
    private static final String READY_STATE_KEY = "ReadyState";

    /** Value of {@link #READY_STATE_KEY} for which a session is kept in the pool. */
    private static final String READY_STATE_KEEP = "0";

    /**
     * The pool itself: session -> scene ID.
     */
    private static final ConcurrentMap<Session, String> dataConnect = new ConcurrentHashMap<>();

    /**
     * Puts a session into the pool, then prunes every pooled session whose
     * {@code "ReadyState"} user property is present and not equal to {@code "0"}.
     *
     * @param session websocket connection
     * @param screen  scene ID
     */
    public static void addDataConnect(Session session, String screen) {
        dataConnect.put(session, screen);
        // The concurrent map's key-set view supports removal directly, so no external
        // locking is needed (the previous lock on a local iterator guarded nothing).
        dataConnect.keySet().removeIf(WebSocketServerPool::isStale);
    }

    /**
     * Removes a session from the pool. A {@code null} session or one that is not
     * pooled is ignored.
     *
     * @param session websocket connection
     */
    public static void removeConnect(Session session) {
        if (session == null) {
            // ConcurrentHashMap rejects null keys; there is nothing to remove anyway.
            return;
        }
        dataConnect.remove(session);
    }

    /**
     * Returns the pool.
     *
     * @return the live backing map (not a copy); changes to it change the pool
     */
    public static ConcurrentMap<Session, String> getDataConnect() {
        return dataConnect;
    }

    /**
     * Pushes a text message to a session through its basic (blocking) remote endpoint.
     *
     * @param session target connection
     * @param message message body
     * @throws IOException if sending fails
     */
    public static void sendMessage(Session session, String message) throws IOException {
        session.getBasicRemote().sendText(message);
    }

    /**
     * Tells whether a pooled session should be pruned: its "ReadyState" user
     * property exists and its string form is not "0".
     */
    private static boolean isStale(Session session) {
        Map<String, Object> userProperties = session.getUserProperties();
        if (userProperties == null) {
            return false;
        }
        Object readyState = userProperties.get(READY_STATE_KEY);
        // equals(), not !=: the old reference comparison only held for the very same String object.
        return readyState != null && !READY_STATE_KEEP.equals(String.valueOf(readyState));
    }
}
四、啟動Spring Boot服務
import com.ljgk.ems.maitreya.user.annotation.EnableLoginArgResolver;
import com.ljgk.ems.maitreya.validator.config.EnableFormValidator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
 * Spring Boot entry point.
 *
 * <p>For a plain test only {@code @SpringBootApplication} is required; the other
 * annotations serve other features, for example {@code @EnableScheduling} enables
 * scanning for methods annotated with {@code @Scheduled}.
 *
 * @author HFL
 * @date 2022/5/13 8:40
 */
@SpringBootApplication
@EnableDiscoveryClient
@Configuration
@EnableFeignClients(value = {"com.ljgk.ems.maitreya"})
@EnableAspectJAutoProxy(proxyTargetClass = true, exposeProxy = true)
@Slf4j
@EnableLoginArgResolver
@EnableFormValidator
@EnableScheduling
public class ServerApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     * @throws UnknownHostException kept in the signature for compatibility; nothing in this body throws it
     */
    public static void main(String[] args) throws UnknownHostException {
        // Bootstrap with this class; the snippet previously named a class it does not declare.
        SpringApplication.run(ServerApplication.class, args);
    }
}
五、測試WebSocket連接
- WebSocket在線測試工具:http://www.easyswoole.com/wstool.html
- 測試連接
服務地址:ws://172.18.42.29:14785/endPoint/1
服務啟動的IP:172.18.42.29
服務端口:14785
WS的URL:/endPoint
入參:1
六、遇到的問題
-
服務啟動報錯:
2022-06-09 10:31:27.616:[ERROR] [main:18941] [org.springframework.boot.SpringApplication.reportFailure:826] --> Application run failed java.lang.IllegalStateException: Failed to register @ServerEndpoint class: class com.ljgk.ems.maitreya.configuration.websocket.WebSocketServer$$EnhancerBySpringCGLIB$$8a624780 at org.springframework.web.socket.server.standard.ServerEndpointExporter.registerEndpoint(ServerEndpointExporter.java:159) at org.springframework.web.socket.server.standard.ServerEndpointExporter.registerEndpoints(ServerEndpointExporter.java:134) at org.springframework.web.socket.server.standard.ServerEndpointExporter.afterSingletonsInstantiated(ServerEndpointExporter.java:112) at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:896) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550) at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) at com.ljgk.ems.maitreya.ConfigurationServerApplication.main(ConfigurationServerApplication.java:36) Caused by: javax.websocket.DeploymentException: UT003027: Class class com.ljgk.ems.maitreya.configuration.websocket.WebSocketServer$$EnhancerBySpringCGLIB$$8a624780 was not annotated with @ClientEndpoint or @ServerEndpoint at 
io.undertow.websockets.jsr.ServerWebSocketContainer.addEndpointInternal(ServerWebSocketContainer.java:735) at io.undertow.websockets.jsr.ServerWebSocketContainer.addEndpoint(ServerWebSocketContainer.java:628) at org.springframework.web.socket.server.standard.ServerEndpointExporter.registerEndpoint(ServerEndpointExporter.java:156) ... 12 common frames omitted
原因:
WebSocketServer類被代理了。我出現這個問題的原因是做了整個項目的接口攔截,然后做了接口的日志切面處理,導致這個類被代理了;而@ServerEndpoint注解在處理WebSocketServer時,取到的是代理類(WebSocketServer$$EnhancerBySpringCGLIB$$...),代理類上沒有@ServerEndpoint注解,無法處理,導致異常。
解決辦法:
將WebSocketServer類從日志處理的切面中排除掉即可。
-
加入業務處理時報錯:
java.lang.NullPointerException: null at com.ljgk.ems.maitreya.configuration.websocket.WebSocketServer.onOpen(WebSocketServer.java:47) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at io.undertow.websockets.jsr.annotated.BoundMethod.invoke(BoundMethod.java:87) at io.undertow.websockets.jsr.annotated.AnnotatedEndpoint$3.run(AnnotatedEndpoint.java:158) at io.undertow.websockets.jsr.ServerWebSocketContainer$1.call(ServerWebSocketContainer.java:170) at io.undertow.websockets.jsr.ServerWebSocketContainer$1.call(ServerWebSocketContainer.java:167)
原因:
通過注解@Resource自動注入的ConfigurationScreenService的Bean為空,即Spring容器中,ConfigurationScreenService沒有注入進去,因為Spring管理的Bean的作用域和WebSocket的作用域不同,Spring管理的Bean都是單例,WebSocket不是。
解決辦法:將截圖中通過@Resource注解注入ConfigurationScreenService的方式換成下面方式:
/**
 * WebSocket server endpoint (revised so that a business service can be used).
 *
 * 1. On connect, the session is put into the connection pool.
 * 2. On close, the session is removed from the connection pool.
 * 3. Messages sent by the client are received and handled.
 * 4. A business-layer service is injected.
 *    [Note from the author: Spring-managed beans are singletons while WebSocket
 *    endpoints are not, so injection needs special handling.]
 * 5. On error, the session is removed from the connection pool.
 *
 * @author HFL
 * @date 2022/5/16 15:17
 */
@Slf4j
@Component
@ServerEndpoint("/endPoint/{screen}")
public class WebSocketServer {
    // Held in a static field so every instance of this class reads the same reference.
    private static ConfigurationScreenService configurationScreenService;

    /**
     * Setter annotated with @Resource; stores the received service in the static field.
     */
    @Resource
    public void setConfigurationScreenService(ConfigurationScreenService configurationScreenService){
        WebSocketServer.configurationScreenService = configurationScreenService;
    }

    /**
     * Called when a connection has been established (Session + scene ID):
     * registers the session, then pushes the result of queryAllJsonById(screen) to it.
     */
    @OnOpen
    public void onOpen(Session session,@PathParam("screen") String screen) throws IOException {
        log.info("[onOpen][session({}) 接入, [screen: {}]", session, screen);
        WebSocketServerPool.addDataConnect(session, screen);
        WebSocketServerPool.sendMessage(session, configurationScreenService.queryAllJsonById(screen));
    }
    ...
}
七、擴展(實現實時推送數據)
-
定時任務,輪詢連接池中的連接,并取到對應的場景、綁定的設備,即可查詢最新數據,最后推送至客戶端。(最簡單實現)
import cn.hutool.json.JSONUtil; import com.ljgk.ems.maitreya.configuration.entity.Content; import com.ljgk.ems.maitreya.configuration.service.ConfigurationScreenDataService; import com.ljgk.ems.maitreya.configuration.vm.ScreenDataJsonVm; import com.ljgk.ems.maitreya.configuration.websocket.WebSocketServerPool; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.websocket.Session; import java.io.IOException; import java.util.List; import java.util.concurrent.ConcurrentMap; /** * @author HFL * @date 2022/6/88:35 */ @Slf4j @Component public class ScreenPushScheduledTask { @Resource private ConfigurationScreenDataService configurationScreenDataService; /** * 5秒一次 * 定時場景數(shù)據(jù)推送 */ @Scheduled(cron = "0/5 * * * * ? ") public void executeScreenDataPush(){ try { ConcurrentMap<Session, String> dataConnect = WebSocketServerPool.getDataConnect(); //查詢待推送場景 dataConnect.keySet().forEach(session -> { try { String screen = dataConnect.get(session); //查詢需要的場景對應(yīng)的元件需要的值,并按規(guī)則組裝成JSON List<ScreenDataJsonVm> screenDataJsonVms = configurationScreenDataService.queryDataJson(screen); WebSocketServerPool.sendMessage(session, JSONUtil.toJsonStr(screenDataJsonVms)); } catch (IOException e) { log.error("WebSocket SendMessage Error, Session:[{}], Exception : [{}]", session, e.getMessage()); } }); }catch (Exception e){ log.error("WebSocket Scheduler Executor Error : [{}]", e.getMessage()); } } }
-
監聽Binlog日志,將MySql中變化數據取出,推送至客戶端。
-
RocketMq實現,將變化數據寫入隊列,WS服務端消費隊列中數據時,推送至客戶端。
文章來源:http://www.zghlxwxcb.cn/news/detail-424830.html
八、參考
1. WebSocket原理及技術簡介
2. Java實現(xiàn)WebSocket服務(wù)端
3. Java Websocket——服務(wù)器端
4.【WebSocket】SpringBoot整合WebSocket 注入Bean的方式
文章來源地址:http://www.zghlxwxcb.cn/news/detail-424830.html
到了這里,關于【Spring Boot 實現 WebSocket實時數據推送-服務端】的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!