用途
- 實時獲取服務(wù)端的最新數(shù)據(jù)
- 查看調(diào)度任務(wù)的進度和執(zhí)行狀態(tài)
- 用戶感知:修改數(shù)據(jù)后,相關(guān)用戶收到信息
- 提升用戶體驗:耗時業(yè)務(wù)異步處理(Excel導(dǎo)入導(dǎo)出,復(fù)雜計算)
前端輪詢
這種方式實現(xiàn)簡單,前端通過setInterval
定時去請求接口來獲取最新的數(shù)據(jù),當(dāng)實時性要求不高,更新頻率低的情況下可以使用這種方式。但是當(dāng)實時性很高的時候,我們的請求會很頻繁,服務(wù)器的消耗非常大,而且每次請求的時候服務(wù)端的數(shù)據(jù)可能還沒有改變,導(dǎo)致很多請求都是沒有意義的。
javascript
復(fù)制代碼
????setInterval(function?()?{ ????????????//?請求接口操作 ????????????//?。。。 ????????}, ????????3000 ????);
webSocket
WebSocket是基于TCP協(xié)議的,它是全雙工通信的,服務(wù)端可以向客戶端發(fā)送信息,客戶端同樣可以向服務(wù)器發(fā)送指令,常用于聊天應(yīng)用中。
pom.xml
SpringBoot提供了websocket的starter
xml
復(fù)制代碼
????????<dependency> ????????????<groupId>org.springframework.boot</groupId> ????????????<artifactId>spring-boot-starter-websocket</artifactId> ????????</dependency>
config類
注入ServerEndpointExporter
,這個bean會自動注冊使用了@ServerEndpoint
注解聲明的Websocket endpoint
typescript
復(fù)制代碼
@Configuration public?class?WebSocketConfig?{ ????@Bean ????public?ServerEndpointExporter?serverEndpointExporter()?{ ????????return?new?ServerEndpointExporter(); ????} }
server類
創(chuàng)建一個服務(wù)類:
- 加上
@ServerEndpoint
注解,設(shè)置WebSocket連接點的服務(wù)地址。 - 創(chuàng)建
AtomicInteger
用于記錄連接數(shù) - 創(chuàng)建
ConcurrentHashMap
用于存放連接信息 -
@OnOpen
注解表明該方法在建立連接后調(diào)用 -
@OnClose
注解表明該方法在斷開連接后調(diào)用 -
@OnError
注解表明該方法在連接異常調(diào)用 -
@OnMessage
注解表明該方法在收到客戶端消息后調(diào)用 - 創(chuàng)建推送信息的方法
- 創(chuàng)建移除連接的方法
typescript
復(fù)制代碼
@ServerEndpoint("/websocket/{userId}") @Component public?class?WebSocketServer?{ ????private?final?static?Logger?logger?=?LoggerFactory.getLogger(WebSocketServer.class); ????/** ?????*?當(dāng)前連接數(shù) ?????*/ ????private?static?AtomicInteger?count?=?new?AtomicInteger(0); ????/** ?????*?使用map對象,便于根據(jù)userId來獲取對應(yīng)的WebSocket,或者放redis里面 ?????*/ ????private?static?Map<String,?WebSocketServer>?websocketMap?=?new?ConcurrentHashMap<>(); ????/** ?????*?與某個客戶端的連接會話,需要通過它來給客戶端發(fā)送數(shù)據(jù) ?????*/ ????private?Session?session; ????/** ?????*?對應(yīng)的用戶ID ?????*/ ????private?String?userId?=?""; ????/** ?????*?連接建立成功調(diào)用的方法 ?????*/ ????@OnOpen ????public?void?onOpen(Session?session,?@PathParam("userId")?String?userId)?{ ????????try?{ ????????????this.session?=?session; ????????????this.userId?=?userId; ????????????websocketMap.put(userId,?this); ????????????//?數(shù)量+1 ????????????count.getAndIncrement(); ????????????logger.info("websocket?新連接:{}",?userId); ????????}?catch?(Exception?e)?{ ????????????logger.error("websocket?新建連接?IO異常"); ????????} ????} ????/** ?????*?連接關(guān)閉調(diào)用的方法 ?????*/ ????@OnClose ????public?void?onClose()?{ ????????//?刪除 ????????websocketMap.remove(this.userId); ????????//?數(shù)量-1 ????????count.getAndDecrement(); ????????logger.info("close?websocket?:?{}",?this.userId); ????} ????/** ?????*?收到客戶端消息后調(diào)用的方法 ?????* ?????*?@param?message?客戶端發(fā)送過來的消息 ?????*/ ????@OnMessage ????public?void?onMessage(String?message)?{ ????????logger.info("來自客戶端{}的消息:{}",?this.userId,?message); ????} ????@OnError ????public?void?onError(Throwable?error)?{ ????????logger.info("websocket?發(fā)生錯誤,移除當(dāng)前websocket:{},err:{}",?this.userId,?error.getMessage()); ????????websocketMap.remove(this.userId); ????????//?數(shù)量-1 ????????count.getAndDecrement(); ????} ????/** ?????*?發(fā)送消息?(異步發(fā)送) ?????* ?????*?@param?message?消息主題 ?????*/ ????private?void?sendMessage(String?message)?{ ????????this.session.getAsyncRemote().sendText(message); ????} ????/** ?????*?向指定用戶發(fā)送信息 ?????* ?????*?@param?userId?用戶id ?????*?@param?wsInfo?信息 ?????*/ ????public?static?void?sendInfo(String?userId,?String?wsInfo)?{ ????????if?(websocketMap.containsKey(userId))?{ ????????????websocketMap.get(userId).sendMessage(wsInfo); ????????} ????} ????/** ?????*?群發(fā)消息 ?????*/ ????public?static?void?batchSendInfo(String?wsInfo,?List<String>?ids)?{ ????????ids.forEach(userId?->?sendInfo(userId,?wsInfo)); ????} ????/** ?????*?群發(fā)所有人 ?????*/ ????public?static?void?batchSendInfo(String?wsInfo)?{ ????????websocketMap.forEach((k,?v)?->?v.sendMessage(wsInfo)); ????} ????/** ?????*?獲取當(dāng)前連接信息 ?????*/ ????public?static?List<String>?getIds()?{ ????????return?new?ArrayList<>(websocketMap.keySet()); ????} ????/** ?????*?獲取當(dāng)前連接數(shù)量 ?????*/ ????public?static?int?getUserCount()?{ ????????return?count.intValue(); ????} }
測試接口
less
復(fù)制代碼
@RestController @RequestMapping("/ws") public?class?WebSocketController?{ ????@GetMapping("/push/{message}") ????public?ResponseEntity<String>?push(@PathVariable(name?=?"message")?String?message)?{ ????????WebSocketServer.batchSendInfo(message); ????????return?ResponseEntity.ok("WebSocket?推送消息給所有人"); ????} }
html
在resources/static
下創(chuàng)建ws.html
,將WebSocket的地址設(shè)為服務(wù)類中@ServerEndpoint
注解所配置的地址
xml
復(fù)制代碼
<!DOCTYPE?html> <html?lang="en"> <head> ????<meta?charset="UTF-8"> ????<title>WebSocket</title> </head> <body> <div?id="message"></div> </body> <script> ????let?websocket?=?null; ????//?用時間戳模擬登錄用戶 ????const?username?=?new?Date().getTime(); ????//?alert(username) ????//判斷當(dāng)前瀏覽器是否支持WebSocket ????if?('WebSocket'?in?window)?{ ????????console.log("瀏覽器支持Websocket"); ????????websocket?=?new?WebSocket('ws://localhost:8080/websocket/'?+?username); ????}?else?{ ????????alert('當(dāng)前瀏覽器?不支持?websocket'); ????} ????//連接發(fā)生錯誤的回調(diào)方法 ????websocket.onerror?=?function?()?{ ????????setMessageInnerHTML("WebSocket連接發(fā)生錯誤"); ????}; ????//連接成功建立的回調(diào)方法 ????websocket.onopen?=?function?()?{ ????????setMessageInnerHTML("WebSocket連接成功"); ????}; ????//接收到消息的回調(diào)方法 ????websocket.onmessage?=?function?(event)?{ ????????setMessageInnerHTML(event.data); ????}; ????//連接關(guān)閉的回調(diào)方法 ????websocket.onclose?=?function?()?{ ????????setMessageInnerHTML("WebSocket連接關(guān)閉"); ????}; ????//監(jiān)聽窗口關(guān)閉事件,當(dāng)窗口關(guān)閉時,主動去關(guān)閉websocket連接,防止連接還沒斷開就關(guān)閉窗口,server端會拋異常。 ????window.onbeforeunload?=?function?()?{ ????????closeWebSocket(); ????}; ????//關(guān)閉WebSocket連接 ????function?closeWebSocket()?{ ????????websocket.close(); ????} ????//將消息顯示在網(wǎng)頁上 ????function?setMessageInnerHTML(innerHTML)?{ ????????document.getElementById('message').innerHTML?+=?innerHTML?+?'<br/>'; ????} </script> </html>
測試文章來源:http://www.zghlxwxcb.cn/news/detail-819507.html
啟動項目,訪問http://localhost:8080/ws.html
,開啟連接。調(diào)用消息推送接口http://localhost:8080/ws/push/hello
,查看網(wǎng)頁顯示信息。
SseEmitter
SseEmitter是SpringMVC(4.2+)提供的一種技術(shù),它是基于Http協(xié)議的,相比WebSocket,它更輕量,但是它只能從服務(wù)端向客戶端單向發(fā)送信息。在SpringBoot中我們無需引用其他jar就可以使用。
創(chuàng)建服務(wù)類
- 創(chuàng)建
AtomicInteger
用于記錄連接數(shù) - 創(chuàng)建
ConcurrentHashMap
用于存放連接信息 - 建立連接:創(chuàng)建并返回一個帶有超時時間的
SseEmitter
給前端。超時間設(shè)為0表示永不過期 - 設(shè)置連接結(jié)束的回調(diào)方法
completionCallBack
- 設(shè)置連接超時的回調(diào)方法
timeoutCallBack
- 設(shè)置連接異常的回調(diào)方法
errorCallBack
- 創(chuàng)建推送信息的方法
SseEmitter.send()
- 創(chuàng)建移除連接的方法
scss
復(fù)制代碼
public?class?SseEmitterServer?{ ????private?static?final?Logger?logger?=?LoggerFactory.getLogger(SseEmitterServer.class); ????/** ?????*?當(dāng)前連接數(shù) ?????*/ ????private?static?AtomicInteger?count?=?new?AtomicInteger(0); ????/** ?????*?使用map對象,便于根據(jù)userId來獲取對應(yīng)的SseEmitter,或者放redis里面 ?????*/ ????private?static?Map<String,?SseEmitter>?sseEmitterMap?=?new?ConcurrentHashMap<>(); ????/** ?????*?創(chuàng)建用戶連接并返回?SseEmitter ?????* ?????*?@param?userId?用戶ID ?????*?@return?SseEmitter ?????*/ ????public?static?SseEmitter?connect(String?userId)?{ ????????//?設(shè)置超時時間,0表示不過期。默認30秒,超過時間未完成會拋出異常:AsyncRequestTimeoutException ????????SseEmitter?sseEmitter?=?new?SseEmitter(0L); ????????//?注冊回調(diào) ????????sseEmitter.onCompletion(completionCallBack(userId)); ????????sseEmitter.onError(errorCallBack(userId)); ????????sseEmitter.onTimeout(timeoutCallBack(userId)); ????????sseEmitterMap.put(userId,?sseEmitter); ????????//?數(shù)量+1 ????????count.getAndIncrement(); ????????logger.info("創(chuàng)建新的sse連接,當(dāng)前用戶:{}",?userId); ????????return?sseEmitter; ????} ????/** ?????*?給指定用戶發(fā)送信息 ?????*/ ????public?static?void?sendMessage(String?userId,?String?message)?{ ????????if?(sseEmitterMap.containsKey(userId))?{ ????????????try?{ ????????????????//?sseEmitterMap.get(userId).send(message,?MediaType.APPLICATION_JSON); ????????????????sseEmitterMap.get(userId).send(message); ????????????}?catch?(IOException?e)?{ ????????????????logger.error("用戶[{}]推送異常:{}",?userId,?e.getMessage()); ????????????????removeUser(userId); ????????????} ????????} ????} ????/** ?????*?群發(fā)消息 ?????*/ ????public?static?void?batchSendMessage(String?wsInfo,?List<String>?ids)?{ ????????ids.forEach(userId?->?sendMessage(wsInfo,?userId)); ????} ????/** ?????*?群發(fā)所有人 ?????*/ ????public?static?void?batchSendMessage(String?wsInfo)?{ ????????sseEmitterMap.forEach((k,?v)?->?{ ????????????try?{ ????????????????v.send(wsInfo,?MediaType.APPLICATION_JSON); ????????????}?catch?(IOException?e)?{ ????????????????logger.error("用戶[{}]推送異常:{}",?k,?e.getMessage()); ????????????????removeUser(k); ????????????} ????????}); ????} ????/** ?????*?移除用戶連接 ?????*/ ????public?static?void?removeUser(String?userId)?{ ????????sseEmitterMap.remove(userId); ????????//?數(shù)量-1 ????????count.getAndDecrement(); ????????logger.info("移除用戶:{}",?userId); ????} ????/** ?????*?獲取當(dāng)前連接信息 ?????*/ ????public?static?List<String>?getIds()?{ ????????return?new?ArrayList<>(sseEmitterMap.keySet()); ????} ????/** ?????*?獲取當(dāng)前連接數(shù)量 ?????*/ ????public?static?int?getUserCount()?{ ????????return?count.intValue(); ????} ????private?static?Runnable?completionCallBack(String?userId)?{ ????????return?()?->?{ ????????????logger.info("結(jié)束連接:{}",?userId); ????????????removeUser(userId); ????????}; ????} ????private?static?Runnable?timeoutCallBack(String?userId)?{ ????????return?()?->?{ ????????????logger.info("連接超時:{}",?userId); ????????????removeUser(userId); ????????}; ????} ????private?static?Consumer<Throwable>?errorCallBack(String?userId)?{ ????????return?throwable?->?{ ????????????logger.info("連接異常:{}",?userId); ????????????removeUser(userId); ????????}; ????} }
測試接口
less
復(fù)制代碼
@RestController @RequestMapping("/sse") public?class?SseEmitterController?{ ????/** ?????*?用于創(chuàng)建連接 ?????*/ ????@GetMapping("/connect/{userId}") ????public?SseEmitter?connect(@PathVariable?String?userId)?{ ????????return?SseEmitterServer.connect(userId); ????} ????@GetMapping("/push/{message}") ????public?ResponseEntity<String>?push(@PathVariable(name?=?"message")?String?message)?{ ????????SseEmitterServer.batchSendMessage(message); ????????return?ResponseEntity.ok("WebSocket?推送消息給所有人"); ????} }
html
在resources/static
下創(chuàng)建ws.html
,將EventSource的地址設(shè)為創(chuàng)建連接的地址
xml
復(fù)制代碼
<!DOCTYPE?html> <html?lang="en"> <head> ????<meta?charset="UTF-8"> ????<title>SseEmitter</title> </head> <body> <button?onclick="closeSse()">關(guān)閉連接</button> <div?id="message"></div> </body> <script> ????let?source?=?null; ????//?用時間戳模擬登錄用戶 ????const?userId?=?new?Date().getTime(); ????if?(!!window.EventSource)?{ ????????//?建立連接 ????????source?=?new?EventSource('http://localhost:8080/sse/connect/'?+?userId); ????????/** ?????????*?連接一旦建立,就會觸發(fā)open事件 ?????????*?另一種寫法:source.onopen?=?function?(event)?{} ?????????*/ ????????source.addEventListener('open',?function?(e)?{ ????????????setMessageInnerHTML("建立連接。。。"); ????????},?false); ????????/** ?????????*?客戶端收到服務(wù)器發(fā)來的數(shù)據(jù) ?????????*?另一種寫法:source.onmessage?=?function?(event)?{} ?????????*/ ????????source.addEventListener('message',?function?(e)?{ ????????????setMessageInnerHTML(e.data); ????????}); ????????/** ?????????*?如果發(fā)生通信錯誤(比如連接中斷),就會觸發(fā)error事件 ?????????*?或者: ?????????*?另一種寫法:source.onerror?=?function?(event)?{} ?????????*/ ????????source.addEventListener('error',?function?(e)?{ ????????????if?(e.readyState?===?EventSource.CLOSED)?{ ????????????????setMessageInnerHTML("連接關(guān)閉"); ????????????}?else?{ ????????????????console.log(e); ????????????} ????????},?false); ????}?else?{ ????????setMessageInnerHTML("你的瀏覽器不支持SSE"); ????} ????//?監(jiān)聽窗口關(guān)閉事件,主動去關(guān)閉sse連接,如果服務(wù)端設(shè)置永不過期,瀏覽器關(guān)閉后手動清理服務(wù)端數(shù)據(jù) ????window.onbeforeunload?=?function?()?{ ????????closeSse(); ????}; ????//?關(guān)閉Sse連接 ????function?closeSse()?{ ????????source.close(); ????????const?httpRequest?=?new?XMLHttpRequest(); ????????httpRequest.open('GET',?'http://localhost:8080/sse/close/'?+?userId,?true); ????????httpRequest.send(); ????????console.log("close"); ????} ????//?將消息顯示在網(wǎng)頁上 ????function?setMessageInnerHTML(innerHTML)?{ ????????document.getElementById('message').innerHTML?+=?innerHTML?+?'<br/>'; ????} </script> </html>
測試
啟動項目,訪問網(wǎng)頁http://localhost:8080/sse.html
建立連接。調(diào)用發(fā)送信息接口http://localhost:8080/sse/push/hello
,查看網(wǎng)頁顯示信息。文章來源地址http://www.zghlxwxcb.cn/news/detail-819507.html
到了這里,關(guān)于javaWEB消息推送之 WebSocket和SseEmitter的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!