常見的都是springboot應用做服務,前端頁面做客戶端,進行websocket通信進行數據傳輸交互。但其實springboot服務也能做客戶端去連接別的webSocket服務提供者。
剛好最近在項目中就使用到了,需求背景大概就是我們作為一個java端應用需要和一個C語言應用進行通信。在項目需求及環境等多方面的考量之下,最后放棄了使用http協議和C程序進行通信,轉而使用webSocket,然后在C側開發人員的要求下,由他們做服務端,我們做客戶端。
引入pom依賴
<!-- Java-WebSocket: supplies the org.java_websocket WebSocketClient base class that the client code extends. -->
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.2</version>
</dependency>
<!-- Spring Boot configuration processor: build-time annotation processor that generates metadata for
     @ConfigurationProperties classes; marked optional so it is not passed on to dependent modules. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
配置類
ws連接配置
# WebSocket servers this application connects to as a client.
# Bound by the properties class declared with prefix "cancan.websocket.client".
cancan:
  websocket:
    client:
      config:
        # The URL path ends with this entry's own wsName, resolved through the placeholder.
        - wsUrl: ws://127.0.0.1:8080/websocket/${cancan.websocket.client.config[0].wsName}
          wsName: ws-01
          enableHeartbeat: true
          # Milliseconds between two heartbeat messages.
          heartbeatInterval: 20000
          enableReconnection: true
        # Example of a second server; uncomment to enable.
        # - wsUrl: ws://localhost:8083
        #   wsName: ws-02
        #   enableHeartbeat: true
        #   heartbeatInterval: 20000
        #   enableReconnection: true
server:
  port: 8099
WebsocketClientConfiguration讀取配置文件配置
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.List;
/**
 * Binds the {@code cancan.websocket.client} configuration section, which lists the
 * WebSocket servers this application connects to as a client.
 *
 * <p>Created 2022/10/13.
 *
 * @author tanghaizhi
 */
@Configuration
@ConfigurationProperties(prefix = "cancan.websocket.client")
public class WebsocketClientConfiguration {

    /** One entry per remote WebSocket server; {@code null} until a value is bound or set. */
    private List<ServerProperties> config;

    /** Connection settings for a single remote WebSocket server. */
    public static class ServerProperties {

        /** Server address, e.g. {@code ws://ip:port}. */
        private String wsUrl;

        /** Logical server name, used to tell the different servers apart. */
        private String wsName;

        /** Whether heartbeat messages are sent; enabled unless configured otherwise. */
        private Boolean enableHeartbeat = Boolean.TRUE;

        /** Heartbeat period in milliseconds; 20000 unless configured otherwise. */
        private Integer heartbeatInterval = 20000;

        /** Whether a failed connection is re-established; enabled unless configured otherwise. */
        private Boolean enableReconnection = Boolean.TRUE;

        public String getWsUrl() {
            return wsUrl;
        }

        public void setWsUrl(String wsUrl) {
            this.wsUrl = wsUrl;
        }

        public Boolean getEnableHeartbeat() {
            return enableHeartbeat;
        }

        public void setEnableHeartbeat(Boolean enableHeartbeat) {
            this.enableHeartbeat = enableHeartbeat;
        }

        public Integer getHeartbeatInterval() {
            return heartbeatInterval;
        }

        public void setHeartbeatInterval(Integer heartbeatInterval) {
            this.heartbeatInterval = heartbeatInterval;
        }

        public Boolean getEnableReconnection() {
            return enableReconnection;
        }

        public void setEnableReconnection(Boolean enableReconnection) {
            this.enableReconnection = enableReconnection;
        }

        public String getWsName() {
            return wsName;
        }

        public void setWsName(String wsName) {
            this.wsName = wsName;
        }
    }

    public List<ServerProperties> getConfig() {
        return config;
    }

    public void setConfig(List<ServerProperties> config) {
        this.config = config;
    }
}
WebsocketMultipleBeanConfig連接服務端并保存到bean中
import com.example.demo.service.WebsocketRunClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author: tanghaizhi
* @CreateTime: 2022/10/13 14:14
* @Description: Websocket多客戶端配置
*/
@Slf4j
@Configuration
public class WebsocketMultipleBeanConfig {
@Bean
public Map<String, WebsocketRunClient> websocketRunClientMap(WebsocketClientConfiguration websocketClientConfiguration){
Map<String, WebsocketRunClient> retMap = new HashMap<>(5);
List<WebsocketClientConfiguration.ServerProperties> config = websocketClientConfiguration.getConfig();
for (WebsocketClientConfiguration.ServerProperties serverProperties : config) {
String wsUrl = serverProperties.getWsUrl();
String wsName = serverProperties.getWsName();
Boolean enableReconnection = serverProperties.getEnableReconnection();
Boolean enableHeartbeat = serverProperties.getEnableHeartbeat();
Integer heartbeatInterval = serverProperties.getHeartbeatInterval();
try {
WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(wsUrl),wsName);
websocketRunClient.connect();
websocketRunClient.setConnectionLostTimeout(0);
new Thread(()->{
while (true){
try {
Thread.sleep(heartbeatInterval);
if(enableHeartbeat){
websocketRunClient.send("[websocket "+wsName+"] 心跳檢測(cè)");
log.info("[websocket {}] 心跳檢測(cè)",wsName);
}
} catch (Exception e) {
log.error("[websocket {}] 發(fā)生異常{}",wsName,e.getMessage());
try {
if(enableReconnection){
log.info("[websocket {}] 重新連接",wsName);
websocketRunClient.reconnect();
websocketRunClient.setConnectionLostTimeout(0);
}
}catch (Exception ex){
log.error("[websocket {}] 重連異常,{}",wsName,ex.getMessage());
}
}
}
}).start();
retMap.put(wsName,websocketRunClient);
} catch (URISyntaxException ex) {
log.error("[websocket {}] 連接異常,{}",wsName,ex.getMessage());
}
}
return retMap;
}
}
客戶端
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
import java.nio.ByteBuffer;
/**
* @author: tanghaizhi
* @CreateTime: 2022/10/13 14:13
* @Description:
*/
@Slf4j
public class WebsocketRunClient extends WebSocketClient {
/**
* websocket連接名稱
*/
private String wsName;
public WebsocketRunClient(URI serverUri,String wsName) {
super(serverUri);
this.wsName = wsName;
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
log.info("[websocket {}] Websocket客戶端連接成功",wsName);
}
@Override
public void onMessage(String msg) {
log.info("[websocket {}] 收到String消息:{}",wsName,msg);
}
@Override
public void onMessage(ByteBuffer bytes) {
log.info("[websocket {}] 收到ByteBuffer消息:{}",wsName);
}
@Override
public void onClose(int code, String reason, boolean remote) {
log.info("[websocket {}] Websocket客戶端關(guān)閉",wsName);
System.out.println("Connection closed by " + (remote ? "remote peer" : "us") + " Code: " + code + " Reason: " + reason);
}
@Override
public void onError(Exception e) {
log.info("[websocket {}] Websocket客戶端出現(xiàn)異常, 異常原因?yàn)椋簕}",wsName,e.getMessage());
}
}
發送消息
/**
 * Demo endpoints that send a text message through, or close, the WebSocket client named by the
 * first configured server entry.
 *
 * <p>Created 2022/10/13.
 *
 * @author tanghaizhi
 */
@RestController
public class ctrl {

    /** Name of the first configured server; key into {@link #websocketRunClientMap}. */
    @Value("${cancan.websocket.client.config[0].wsName}")
    private String ws1Name;

    @Autowired
    private Map<String, WebsocketRunClient> websocketRunClientMap;

    /**
     * Sends {@code text} over the configured client.
     *
     * @param text message to send
     * @return the success message, or a description of why nothing was sent
     */
    @RequestMapping("/send")
    public String send(String text) {
        if (text == null) {
            return "send failed: parameter 'text' is required";
        }
        WebsocketRunClient websocketRunClient = websocketRunClientMap.get(ws1Name);
        if (websocketRunClient == null) {
            return "send failed: no websocket client named " + ws1Name;
        }
        // Sending on a socket that is not open makes the library throw; report it instead.
        if (!websocketRunClient.isOpen()) {
            return "send failed: websocket client " + ws1Name + " is not connected";
        }
        websocketRunClient.send(text);
        return "發(fā)送成功";
    }

    /**
     * Closes the configured client.
     *
     * @return the success message, or a description of why nothing was closed
     */
    @RequestMapping("/close")
    public String close() {
        // Same lookup key as send(), so both endpoints always address the same client.
        WebsocketRunClient websocketRunClient = websocketRunClientMap.get(ws1Name);
        if (websocketRunClient == null) {
            return "close failed: no websocket client named " + ws1Name;
        }
        websocketRunClient.close();
        return "關(guān)閉成功";
    }
}
改造
上面是一個客戶端的例子,但是這次的需求有些特殊。ws的連接需要在數據庫表中讀取,而且這個表的數據用戶在系統頁面上可以隨時進行增刪改查的操作。這個時候上面讀配置文件,在項目啟動時就發起ws連接,然后保存連接的方法就行不通了。
根據需求改造之后的設計是這樣的,ws連接依舊保存在config注冊bean的map中。但是注冊bean那里我們只是將這個map初始化,并不初始化ws連接。ws連接使用定時任務初始化,定時讀取表中的數據,如果表中的ws連接在map中沒有,則初始化后放入map,如果存在表中沒有而map中有的ws連接,則移除map中的這條ws連接。
WebsocketMultipleBeanConfig配置類
/**
 * Registers the shared, initially empty registry of WebSocket clients. Connections are not
 * opened here; they are added to and removed from the map later.
 *
 * <p>Created 2022/10/18.
 *
 * @author tanghaizhi
 */
@Configuration
@Slf4j
public class WebsocketMultipleBeanConfig {

    /**
     * @return an empty, thread-safe map of clients keyed by connection name. A concurrent map is
     *         used because the registry is structurally modified at runtime while other
     *         components read from it.
     */
    @Bean
    public Map<String, WebsocketRunClient> websocketRunClientMap() {
        return new java.util.concurrent.ConcurrentHashMap<>(8);
    }
}
ws初始化定時任務
/**
* @author: tanghaizhi
* @CreateTime: 2022/10/18 11:26
* @Description: 讀取配置,保持ws連接
*/
@Component
@Slf4j
public class KeepConnectTask {
@Value("${signaling.tracking.webSocket.uri:/websocket/}")
private String uri;
@Autowired
private ServerConfigService serverConfigService;
@Autowired
private Map<String, WebsocketRunClient> websocketRunClientMap;
// @Scheduled(cron = "0/20 * * * * ? ")
public void execute() throws Exception {
InetAddress adds = InetAddress.getLocalHost();
String hostIp = adds.getHostAddress();
//查詢服務(wù)器配置
//這里就是查詢表的方法 測(cè)試方便先用固定數(shù)據(jù)代替了
// List<ServerConfigModel> serverConfigs = serverConfigService.selectAll();
List<ServerConfigModel> serverConfigs = new ArrayList<>();
ServerConfigModel serverConfigModel = new ServerConfigModel();
serverConfigModel.setServerIp("127.0.0.1");
serverConfigModel.setServerPort("8080");
serverConfigs.add(serverConfigModel);
Map<String,String> configs = new HashMap<>();
for(ServerConfigModel serverConfig:serverConfigs){
String key = "ws://" + serverConfig.getServerIp() + ":" + serverConfig.getServerPort();
String wsName = "ws-" + serverConfig.getServerIp() + ":" + serverConfig.getServerPort();
String value = key + uri + wsName;
configs.put(wsName,value);
}
//刪除、關(guān)閉已不在配置中的連接
//注意使用keySet邊遍歷邊刪除會(huì)報(bào)錯(cuò),建議使用迭代器遍歷刪除
Iterator<Map.Entry<String, WebsocketRunClient>> it = websocketRunClientMap.entrySet().iterator();
while(it.hasNext()){
Map.Entry<String, WebsocketRunClient> enter = it.next();
if(!configs.containsKey(enter.getKey())){
enter.getValue().close();
it.remove();
}
}
//已有連接心跳發(fā)送
for(String key:websocketRunClientMap.keySet()){
WebsocketRunClient client = websocketRunClientMap.get(key);
try{
client.send("websocket[" + hostIp + "]心跳檢測(cè)");
log.info("websocket[" + hostIp + "]心跳檢測(cè)");
} catch (Exception e){
log.error("websocket[{}] 發(fā)生異常{}",hostIp,e.getLocalizedMessage());
e.printStackTrace();
try {
client.reconnect();
client.setConnectionLostTimeout(0);
} catch (Exception ex){
log.error("websocket[{}] 重連異常,{}",hostIp,ex.getMessage());
}
}
}
//新配置增加連接
for(String key:configs.keySet()){
if(!websocketRunClientMap.containsKey(key)){
String url = configs.get(key);
try{
WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(url),key);
websocketRunClient.connect();
websocketRunClient.setConnectionLostTimeout(0);
websocketRunClientMap.put(key,websocketRunClient);
} catch (Exception e){
log.error("websocket[{}] 新增連接異常,{}",key,e.getMessage());
}
}
}
}
}
發送消息
發送消息仍然和之前一樣先注入(文章來源:http://www.zghlxwxcb.cn/news/detail-512516.html)
@Autowired
private Map<String, WebsocketRunClient> websocketRunClientMap;
然后map.get拿到對應想發送消息的WebsocketRunClient,調用其中的send方法即可(文章來源地址:http://www.zghlxwxcb.cn/news/detail-512516.html)
到了這里,關于SpringBoot WebSocket做客戶端的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!