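Implementing Long-Lived Connections with WebSocket and Obtaining the Client IP over WebSocket
WebSocket is a network communication protocol that supports bidirectional (full-duplex) communication.
Implementation:
1 Add the ServerEndpointExporter configuration bean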
@Configuration
public class WebSocketConfig {
// Automatically registers the WebSocket endpoints declared with the @ServerEndpoint annotation
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
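2 Implementation process
The requirement is to establish a long-lived connection over WebSocket and to obtain the number of devices currently online. The client keeps sending messages over the WebSocket, which keeps the connection alive and renews the Session. I distinguish devices by MAC address, because in my case one account must be able to log in on several machines, so the MAC address serves as the device identifier. (If an account may only be logged in once, use the user ID instead.)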
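The choice of key described above can be illustrated without a WebSocket container. The following is a minimal sketch, not the article's endpoint: `FakeSession` and `OnlineRegistrySketch` are hypothetical names introduced only for this illustration. It shows that a registry keyed by user ID collapses two devices of the same account into one entry, while a registry keyed by MAC address counts both.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a WebSocket session; only the id matters here.
final class FakeSession {
    final String id;

    FakeSession(String id) {
        this.id = id;
    }
}

// Hypothetical registry: maps a key (user ID or MAC address) to its session.
final class OnlineRegistrySketch {
    private final Map<String, FakeSession> byKey = new ConcurrentHashMap<>();

    // A later connection with the same key replaces the earlier one.
    void connect(String key, FakeSession session) {
        byKey.put(key, session);
    }

    void disconnect(String key) {
        byKey.remove(key);
    }

    int onlineCount() {
        return byKey.size();
    }

    public static void main(String[] args) {
        // Keyed by user ID: the second device overwrites the first.
        OnlineRegistrySketch byUserId = new OnlineRegistrySketch();
        byUserId.connect("user-1", new FakeSession("s1"));
        byUserId.connect("user-1", new FakeSession("s2"));
        System.out.println(byUserId.onlineCount()); // prints 1

        // Keyed by MAC address: both devices of the same account are counted.
        OnlineRegistrySketch byMac = new OnlineRegistrySketch();
        byMac.connect("AA:BB:CC:00:00:01", new FakeSession("s1"));
        byMac.connect("AA:BB:CC:00:00:02", new FakeSession("s2"));
        System.out.println(byMac.onlineCount()); // prints 2
    }
}
```

The MAC addresses are made-up sample values; any identifier that is unique per device would behave the same way.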
1. Add the configuration
@ServerEndpoint(value = "/websocket/onlineAme/{Mac}")
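2. Main methods
- @OnOpen
  - The method under this annotation runs when the connection is first established.
  - The WebSocket Session can be obtained in this method.
  - The device's Mac and its Session are then stored in sessionMap: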
private static ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>();

@OnOpen
public void onOpen(Session session, @PathParam(value = "Mac") String mac) throws IOException {
    log.info("[Ame websocket connected], Ame mac: {}", mac);
    session.setMaxIdleTimeout(sessionTimeout);
    // A blank MAC cannot be used as a map key, so stop here
    if(StringUtils.isBlank(mac)){
        log.error("No device information was uploaded");
        return;
    }
    setMap(session,mac);
}

private void setMap(Session session,String mac){
    sessionMap.put(mac,session);
    log.warn("Ame MAC address:{}, devices currently online:{}",mac,sessionMap.size());
}
- @OnMessage
  - This method handles the communication between the client and the server.
  - Every time the client sends a message to the server, the Session is renewed, which extends its lifetime.
@OnMessage
public void onMessage(Session session, String msg) {
    session.setMaxIdleTimeout(sessionTimeout);
    if(StringUtils.isBlank(msg)){
        return;
    }
    // Check whether the device with this MAC address is currently online
    String mac = getMACBySession(session);
    if(StringUtils.isBlank(mac)){
        return;
    }
    // Convert the uploaded msg into an AmeServicePack
    AmeServicePack ame = JSONObject.toJavaObject(JSONObject.parseObject(msg), AmeServicePack.class);
    handleAmeMsg(mac,ame);
}

private String getMACBySession(Session session){
    // Returns null when the session is not registered
    return getUserIdBySession(session);
}

private String getUserIdBySession(Session session){
    /* Every Session has its own id: compare the id of each stored Session with the id of the Session passed in */
    for (Map.Entry<String, Session> entry : sessionMap.entrySet()) {
        if(entry.getValue().getId().equals(session.getId())){
            return entry.getKey();
        }
    }
    return null;
}
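The renewal idea in onMessage can be modeled without a container. The sketch below is only an illustration under stated assumptions: `IdleTimeoutSketch`, `touch`, `isAlive` and `evictExpired` are hypothetical names, and the real idle timer of a Session is managed by the WebSocket container, not by code like this. It records when each device was last heard from and treats a device as expired once the timeout has elapsed without a new message.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of "each message renews the session".
final class IdleTimeoutSketch {
    private final long timeoutMillis;
    // device key (MAC) -> time of the last message, in milliseconds
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

    IdleTimeoutSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called for every incoming message: restarts the idle window.
    void touch(String mac, long nowMillis) {
        lastSeen.put(mac, nowMillis);
    }

    // A device is alive if it sent something less than timeoutMillis ago.
    boolean isAlive(String mac, long nowMillis) {
        Long seen = lastSeen.get(mac);
        return seen != null && nowMillis - seen < timeoutMillis;
    }

    // Drops every device whose idle window has elapsed; returns how many were dropped.
    int evictExpired(long nowMillis) {
        int before = lastSeen.size();
        lastSeen.entrySet().removeIf(e -> nowMillis - e.getValue() >= timeoutMillis);
        return before - lastSeen.size();
    }

    public static void main(String[] args) {
        IdleTimeoutSketch tracker = new IdleTimeoutSketch(600_000L); // same value as sessionTimeout
        tracker.touch("AA:BB:CC:00:00:01", 0L);
        tracker.touch("AA:BB:CC:00:00:01", 599_999L); // a message just before expiry renews the window
        System.out.println(tracker.isAlive("AA:BB:CC:00:00:01", 1_000_000L)); // prints true
        System.out.println(tracker.isAlive("AA:BB:CC:00:00:01", 1_199_999L)); // prints false
    }
}
```

Timestamps are passed in explicitly rather than read from the system clock so that the behavior is easy to follow and to test.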
- @OnClose
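@OnClose
public void onClose(Session session,@PathParam(value = "Mac") String mac) {
    removeMap(session);
    log.info("[websocket closed] device disconnected: {}", mac);
}

private void removeMap(Session session){
    String mac = getUserIdBySession(session);
    if(ObjectUtil.isNull(mac)){
        return;
    }
    sessionMap.remove(mac);
    //userMap.remove(userId);
    removeAme(mac);
}

private void removeAme(String mac){
    // remove() returns the entry that was stored for this device, or null if there was none
    AmeServicePack ameInfo = ameHashMap.remove(mac);
    if(ObjectUtil.isNull(ameInfo)){
        return;
    }
    // sessionMap no longer contains mac at this point, so sendInfo only logs that the device is offline
    sendInfo("Ame:"+ameInfo.getAmeip()+" went offline",mac);
}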
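Both onMessage and removeMap resolve the MAC address by scanning every entry of sessionMap. One possible alternative, sketched here under assumptions, is to keep a second map from session id to MAC so the lookup is a single get. `SessionIndexSketch` and its method names are hypothetical, sessions are represented by their id strings only, and the two maps are not updated atomically as a pair, so this is an illustration of the idea rather than a drop-in replacement.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical two-way index between device keys and session ids.
final class SessionIndexSketch {
    // device key (MAC) -> session id
    private final Map<String, String> sessionIdByMac = new ConcurrentHashMap<>();
    // session id -> device key (MAC)
    private final Map<String, String> macBySessionId = new ConcurrentHashMap<>();

    // Registers a session for a device; an older session of the same device is forgotten.
    void register(String mac, String sessionId) {
        String previous = sessionIdByMac.put(mac, sessionId);
        if (previous != null) {
            macBySessionId.remove(previous);
        }
        macBySessionId.put(sessionId, mac);
    }

    // Constant-time replacement for the loop in getUserIdBySession.
    String macOf(String sessionId) {
        return macBySessionId.get(sessionId);
    }

    // Removes the session and returns its MAC, or null if the session was unknown.
    String unregister(String sessionId) {
        String mac = macBySessionId.remove(sessionId);
        if (mac != null) {
            // Only drop the device entry if it still points at this session.
            sessionIdByMac.remove(mac, sessionId);
        }
        return mac;
    }

    int onlineCount() {
        return sessionIdByMac.size();
    }

    public static void main(String[] args) {
        SessionIndexSketch index = new SessionIndexSketch();
        index.register("AA:BB:CC:00:00:01", "s1");
        System.out.println(index.macOf("s1"));    // prints AA:BB:CC:00:00:01
        System.out.println(index.unregister("s1")); // prints AA:BB:CC:00:00:01
        System.out.println(index.onlineCount());  // prints 0
    }
}
```

With a real endpoint, another option is to store the MAC in the Session's user properties when the connection opens and read it back later, which avoids the second map entirely.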
- @OnError

@OnError
public void onError(Session session,Throwable throwable) {
    // Log the cause together with the message instead of printing the stack trace to stderr
    log.error("websocket: an error occurred", throwable);
    removeMap(session);
}
- Send a message to a specific user

/* Send a custom message to one specific device */
public static void sendInfo(String message,String toMac){
    log.info("Sending message: {}, target device: {}",message,toMac);
    if(ObjectUtil.isNull(toMac) || StringUtils.isBlank(message)){
        log.error("The message is incomplete");
        return;
    }
    // Send only if the device is registered
    //System.out.println(sessionMap.containsKey(toUserId));
    if(sessionMap.containsKey(toMac)){
        try {
            sendMessage(sessionMap.get(toMac),message);
        }catch (Exception e){
            log.error("Failed to send the message to device {}",toMac,e);
        }
    }
    // The device is not online
    else {
        log.error("Device {} is not online",toMac);
    }
}

public static void sendMessage(Session session,String message) throws IOException {
    session.getBasicRemote().sendText(message);
}
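3 Obtaining the request IP address through WebSocket
The headers of a WebSocket handshake request do not contain the client's IP address. In Spring Cloud, however, requests are routed and forwarded through the gateway, and the request seen by the gateway does carry the client's address. A gateway filter can therefore read the IP there and add it to the request that is forwarded to the WebSocket endpoint.
3.1 Gateway filter (interceptor)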
package com.mam.gateway.filter;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.net.InetSocketAddress;
import java.util.Objects;
/**
* Obtains the IP information for the Session opened over WebSocket
*/
@Component
public class SessionFilter extends AbstractGatewayFilterFactory<SessionFilter.Config> {
public SessionFilter()
{
super(SessionFilter.Config.class);
}
@Override
public String name()
{
return "SessionFilter";
}
@Override
public GatewayFilter apply(SessionFilter.Config config) {
return new GatewayFilter() {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
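// Read the caller's address as seen by the gateway; pass the request on unchanged when it is unavailable
InetSocketAddress remoteAddress = exchange.getRequest().getRemoteAddress();
if (Objects.isNull(remoteAddress) || Objects.isNull(remoteAddress.getAddress())) {
return chain.filter(exchange);
}
// Forward the address in the X-Real-IP header so the WebSocket handshake can read it
ServerHttpRequest mutatedRequest = exchange.getRequest().mutate()
.header("X-Real-IP", remoteAddress.getAddress().getHostAddress())
.build();
ServerWebExchange mutatedServerWebExchange = exchange.mutate().request(mutatedRequest).build();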
return chain.filter(mutatedServerWebExchange);
}
};
}
static class Config
{
private Integer order;
public Integer getOrder()
{
return order;
}
public void setOrder(Integer order)
{
this.order = order;
}
}
}
3.2 ServerEndpointConfig configuration
public class WebSocketConfigurator extends ServerEndpointConfig.Configurator{
private static final Logger log = LoggerFactory.getLogger(WebSocketConfigurator.class);
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
Map<String, Object> attributes = sec.getUserProperties();
try{
String clientIp = IpUtils.getIpAddrByHandshakeRequest(request.getHeaders());
attributes.put("clientIp",clientIp);
log.info("websocket configurator X-Real-IP {} headers {}",request.getHeaders().get("X-Real-IP"),request.getHeaders());
}catch (Exception e){
log.error("Failed to resolve the client IP during the handshake", e);
}
super.modifyHandshake(sec,request,response);
}
}
// Helper in IpUtils, called by the configurator above
public static String getIpAddrByHandshakeRequest(Map<String, List<String>> map)
{
if (map == null)
{
return null;
}
// Headers checked in order: X-Forwarded-For (Squid proxy), Proxy-Client-IP (Apache proxy),
// WL-Proxy-Client-IP (WebLogic proxy), HTTP_CLIENT_IP (some proxy servers), X-Real-IP (nginx proxy)
String[] headerNames = {"X-Forwarded-For", "Proxy-Client-IP", "WL-Proxy-Client-IP", "HTTP_CLIENT_IP", "X-Real-IP"};
String ipAddresses = null;
for (String headerName : headerNames)
{
// Each header maps to a list of values; read the first value directly instead of parsing the list's string form
List<String> values = map.get(headerName);
if (values == null || values.isEmpty())
{
continue;
}
String value = values.get(0);
if (value != null && value.length() != 0 && !"unknown".equalsIgnoreCase(value))
{
ipAddresses = value;
break;
}
}
if (ipAddresses == null)
{
return null;
}
// Requests that pass through several proxies carry several IPs, usually separated by commas (,); the first one is the client's real IP
String ip = ipAddresses.split(",")[0].trim();
return "0:0:0:0:0:0:0:1".equals(ip) ? "127.0.0.1" : ip;
}
3.3 Obtaining the IP in the WebSocket endpoint
AmeServicePack ame = JSONObject.toJavaObject(JSONObject.parseObject(msg), AmeServicePack.class);
Map<String, Object> userProperties = session.getUserProperties();
String clientip = (String) userProperties.get("clientIp");
4 Complete code
@Component
@ServerEndpoint(value = "/websocket/onlineAme/{Mac}",configurator = WebSocketConfigurator.class)
public class AmeLoginWebSocket {
static Logger log = LoggerFactory.getLogger(AmeLoginWebSocket.class);
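/* Stores the sessions, keyed by MAC address */
private static ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>();
/* Stores the online ame service information; static because the container normally creates one endpoint instance per connection */
private static ConcurrentHashMap<String,AmeServicePack> ameHashMap = new ConcurrentHashMap<>();
/* Session idle timeout in milliseconds (10 minutes) */
private static final long sessionTimeout = 600000;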
/** Called when a message arrives after the connection has been established **/
@OnMessage
public void onMessage(Session session, String msg) {
session.setMaxIdleTimeout(sessionTimeout);
log.info("[websocket message received] content: {}", msg);
if(StringUtils.isBlank(msg)){
return;
}
// Check whether the device with this MAC address is currently online
String mac = getMACBySession(session);
if(StringUtils.isBlank(mac)){
return;
}
// Convert the uploaded msg into an AmeServicePack
AmeServicePack ame = JSONObject.toJavaObject(JSONObject.parseObject(msg), AmeServicePack.class);
// The client IP stored by WebSocketConfigurator during the handshake
Map<String, Object> userProperties = session.getUserProperties();
String clientip = (String) userProperties.get("clientIp");
handleAmeMsg(mac,ame);
}
private void handleAmeMsg(String mac, AmeServicePack ameInfo) {
log.info("Ame: MAC {} Ip {}",mac,ameInfo.getAmeip());
if(ameHashMap.containsKey(mac)){
log.info("Device {} with Ip {} is already online",mac,ameInfo.getAmeip());
sendInfo("The device with Ip "+ameInfo.getAmeip()+" already exists",mac);
}else {
ameHashMap.put(mac,ameInfo);
}
}
private boolean updateOnline(AmeServicePack ameInfo){
AjaxResult isonline = SpringUtils.getBean(IAmePackService.class).update(ameInfo,SecurityConstants.INNER);
if((Integer)isonline.get("code")== 200){
return true;
}else {
return false;
}
}
/**
* The AME at ameIp is not online, but the task table still contains tasks marked as running; set their status to -2
* @param ameIp
*/
private void updateErrorTaskStatus(String ameIp){
SpringUtils.getBean(IAmePackService.class).updateErrorAMEStatus(ameIp,SecurityConstants.INNER);
}
private void handlePCMsg(LoginUser loginUser, String msg) {
log.info("System user: {}, message: {}",loginUser.getUsername(),msg);
}
private String getMACBySession(Session session){
// Returns null when the session is not registered
return getUserIdBySession(session);
}
/*
* Called after the connection has been established successfully
* @param [session, mac]
* @return void
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "Mac") String mac) throws IOException {
log.info("[Ame websocket connected], Ame mac: {}", mac);
session.setMaxIdleTimeout(sessionTimeout);
// A blank MAC cannot be used as a map key, so stop here
if(StringUtils.isBlank(mac)){
log.error("No device information was uploaded");
return;
}
setMap(session,mac);
}
private void setMap(Session session,String mac){
sessionMap.put(mac,session);
log.warn("Ame MAC address:{}, devices currently online:{}",mac,sessionMap.size());
}
/*
* Called when the connection is closed
* @param [session, mac]
* @return void
*/
@OnClose
public void onClose(Session session,@PathParam(value = "Mac") String mac) {
removeMap(session);
log.info("[websocket closed] device disconnected: {}", mac);
}
private void removeMap(Session session){
String mac = getUserIdBySession(session);
if(ObjectUtil.isNull(mac)){
return;
}
sessionMap.remove(mac);
//userMap.remove(userId);
removeAme(mac);
}
private String getUserIdBySession(Session session){
/* Every Session has its own id: compare the id of each stored Session with the id of the Session passed in */
for (Map.Entry<String, Session> entry : sessionMap.entrySet()) {
if(entry.getValue().getId().equals(session.getId())){
return entry.getKey();
}
}
return null;
}
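private void removeAme(String mac){
// remove() returns the entry that was stored for this device, or null if there was none
AmeServicePack ameInfo = ameHashMap.remove(mac);
if(ObjectUtil.isNull(ameInfo)){
return;
}
log.info("{}: went offline",ameInfo.getAmeip());
// sessionMap no longer contains mac at this point, so sendInfo only logs that the device is offline
sendInfo("Ame:"+ameInfo.getAmeip()+" went offline",mac);
}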
/*
* Called when an error occurs
* @param [session, throwable]
* @return void
*/
@OnError
public void onError(Session session,Throwable throwable) {
// Log the cause together with the message instead of printing the stack trace to stderr
log.error("websocket: an error occurred", throwable);
removeMap(session);
}
/*
Send a custom message
to one specific device
*/
public static void sendInfo(String message,String toMac){
log.info("Sending message: {}, target device: {}",message,toMac);
if(ObjectUtil.isNull(toMac) || StringUtils.isBlank(message)){
log.error("The message is incomplete");
return;
}
// Send only if the device is registered
//System.out.println(sessionMap.containsKey(toUserId));
if(sessionMap.containsKey(toMac)){
try {
sendMessage(sessionMap.get(toMac),message);
}catch (Exception e){
log.error("Failed to send the message to device {}",toMac,e);
}
}
// The device is not online
else {
log.error("Device {} is not online",toMac);
}
}
public static void sendMessage(Session session,String message) throws IOException {
session.getBasicRemote().sendText(message);
}
}