目錄
?
前言
技術(shù)棧
功能展示
一、springboot項(xiàng)目添加netty依賴
二、netty服務(wù)端
三、netty客戶端
四、測試
五、代碼倉庫地址
??專屬小彩蛋:前些天發(fā)現(xiàn)了一個(gè)巨牛的人工智能學(xué)習(xí)網(wǎng)站,通俗易懂,風(fēng)趣幽默,忍不住分享一下給大家。點(diǎn)擊跳轉(zhuǎn)到網(wǎng)站(前言 - 床長人工智能教程)?
前言
? ? ? ? 最近做了一個(gè)硬件設(shè)備通信項(xiàng)目,需求是這樣,前端使用webSocket向后端進(jìn)行tcp協(xié)議的通信,后端netty服務(wù)端收到數(shù)據(jù)后,將數(shù)據(jù)發(fā)往socket客戶端,客戶端收到數(shù)據(jù)之后需要進(jìn)行響應(yīng)數(shù)據(jù)顯示到前端頁面供用戶進(jìn)行實(shí)時(shí)監(jiān)控。
技術(shù)棧
????????后端
- springboot?
- netty
????????前端
- 前端websocket
功能展示
前端頁面輸入webSocket地址,點(diǎn)擊連接,輸入待發(fā)送的數(shù)據(jù),點(diǎn)擊發(fā)送
?后端我們可以使用網(wǎng)絡(luò)測試工具NetAssist 進(jìn)行響應(yīng)測試
?在工具中連接netty服務(wù)端,并點(diǎn)擊發(fā)送按鈕,可以看到,前端頁面右側(cè)對(duì)話框成功顯示出了NetAssist測試工具響應(yīng)的數(shù)據(jù)內(nèi)容。接下來我們來看一看代碼如何進(jìn)行實(shí)現(xiàn),關(guān)鍵的點(diǎn)在于需要同時(shí)支持前端websocket和后端socket的連接,需要自定義一個(gè)協(xié)議選擇處理器。
一、springboot項(xiàng)目添加netty依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.12</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example.dzx.netty</groupId>
<artifactId>qiyan-project</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>qiyan-project</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
<version>2.6.7</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.52.Final</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
二、netty服務(wù)端
?(1)netty服務(wù)啟動(dòng)類
package com.example.dzx.netty.qiyanproject.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
/**
* @author dzx
* @ClassName:
* @Description: netty服務(wù)啟動(dòng)類
* @date 2023年06月30日 21:27:16
*/
@Slf4j
@Component
public class NettyServer {
public void start(InetSocketAddress address) {
//配置服務(wù)端的NIO線程組
/*
* 在Netty中,事件循環(huán)組是一組線程池,用于處理網(wǎng)絡(luò)事件,例如接收客戶端連接、讀寫數(shù)據(jù)等操作。
* 它由兩個(gè)部分組成:bossGroup和workerGroup。
* bossGroup 是負(fù)責(zé)接收客戶端連接請(qǐng)求的線程池。
* workerGroup 是負(fù)責(zé)處理客戶端連接的線程池。
* */
EventLoopGroup bossGroup = new NioEventLoopGroup(10);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//創(chuàng)建ServerBootstrap實(shí)例,boss組用于接收客戶端連接請(qǐng)求,worker組用于處理客戶端連接。
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup) // 綁定線程池
.channel(NioServerSocketChannel.class)//通過TCP/IP方式進(jìn)行傳輸
.childOption(ChannelOption.SO_REUSEADDR, true) //快速復(fù)用端口
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)
.localAddress(address)//監(jiān)聽服務(wù)器地址
.childHandler(new NettyServerChannelInitializer())
// .childHandler(new com.ccp.dev.system.netty.NettyServerChannelInitializer())
.childOption(ChannelOption.TCP_NODELAY, true)//子處理器處理客戶端連接的請(qǐng)求和數(shù)據(jù)
.option(ChannelOption.SO_BACKLOG, 1024) //服務(wù)端接受連接的隊(duì)列長度,如果隊(duì)列已滿,客戶端連接將被拒絕
.childOption(ChannelOption.SO_KEEPALIVE, true); //保持長連接,2小時(shí)無數(shù)據(jù)激活心跳機(jī)制
// 綁定端口,開始接收進(jìn)來的連接
ChannelFuture future = bootstrap.bind(address).sync();
future.addListener(l -> {
if (future.isSuccess()) {
System.out.println("Netty服務(wù)啟動(dòng)成功");
} else {
System.out.println("Netty服務(wù)啟動(dòng)失敗");
}
});
log.info("Netty服務(wù)開始監(jiān)聽端口: " + address.getPort());
//關(guān)閉channel和塊,直到它被關(guān)閉
future.channel().closeFuture().sync();
} catch (Exception e) {
log.error("啟動(dòng)Netty服務(wù)器時(shí)出錯(cuò)", e);
} finally {
//釋放資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
?(2)服務(wù)端初始化類編寫,客戶端與服務(wù)器端連接一旦創(chuàng)建,這個(gè)類中方法就會(huì)被回調(diào),設(shè)置出站編碼器和入站解碼器以及各項(xiàng)處理器
package com.example.dzx.netty.qiyanproject.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component;
/**
* @author dzx
* @ClassName:
* @Description: 服務(wù)端初始化,客戶端與服務(wù)器端連接一旦創(chuàng)建,這個(gè)類中方法就會(huì)被回調(diào),設(shè)置出站編碼器和入站解碼器以及各項(xiàng)處理器
* @date 2023年06月30日 21:27:16
*/
@Component
public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
// private FullHttpResponse createCorsResponseHeaders() {
// FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
//
// // 設(shè)置允許跨域訪問的響應(yīng)頭
// response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
// response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, POST, PUT, DELETE");
// response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "Content-Type, Authorization");
// response.headers().set(HttpHeaderNames.ACCESS_CONTROL_MAX_AGE, "3600");
//
// return response;
// }
@Override
protected void initChannel(SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("active", new ChannelActiveHandler());
//Socket 連接心跳檢測
pipeline.addLast("idleStateHandler", new IdleStateHandler(60, 0, 0));
pipeline.addLast("socketChoose", new SocketChooseHandler());
pipeline.addLast("commonhandler",new NettyServerHandler());
}
}
(3) 編寫新建連接處理器
package com.example.dzx.netty.qiyanproject.server;
import com.example.dzx.netty.qiyanproject.constants.General;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
/**
* @author dzx
* @ClassName:
* @Description: 客戶端新建連接處理器
* @date 2023年06月30日 21:27:16
*/
@ChannelHandler.Sharable
@Slf4j
public class ChannelActiveHandler extends ChannelInboundHandlerAdapter {
/**
* 有客戶端連接服務(wù)器會(huì)觸發(fā)此函數(shù)
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
//獲取客戶端連接的遠(yuǎn)程地址
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
//獲取客戶端的IP地址
String clientIp = insocket.getAddress().getHostAddress();
//獲取客戶端的端口號(hào)
int clientPort = insocket.getPort();
//獲取連接通道唯一標(biāo)識(shí)
ChannelId channelId = ctx.channel().id();
//如果map中不包含此連接,就保存連接
if (General.CHANNEL_MAP.containsKey(channelId)) {
log.info("Socket------客戶端【" + channelId + "】是連接狀態(tài),連接通道數(shù)量: " + General.CHANNEL_MAP.size());
} else {
//保存連接
General.CHANNEL_MAP.put(channelId, ctx);
log.info("Socket------客戶端【" + channelId + "】連接netty服務(wù)器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
log.info("Socket------連接通道數(shù)量: " + General.CHANNEL_MAP.size());
}
}
}
(4)編寫協(xié)議初始化解碼器,用來判定實(shí)際使用什么協(xié)議(實(shí)現(xiàn)websocket和socket同時(shí)支持的關(guān)鍵點(diǎn)就在這里)
package com.example.dzx.netty.qiyanproject.server;
/**
* @author 500007
* @ClassName:
* @Description:
* @date 2023年06月30日 21:29:17
*/
import com.example.dzx.netty.qiyanproject.constants.General;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
* @author dzx
* @ClassName:
* @Description: 協(xié)議初始化解碼器.用來判定實(shí)際使用什么協(xié)議,以用來處理前端websocket或者后端netty客戶端的連接或通信
* @date 2023年06月30日 21:31:24
*/
@Component
@Slf4j
public class SocketChooseHandler extends ByteToMessageDecoder {
/** 默認(rèn)暗號(hào)長度為23 */
private static final int MAX_LENGTH = 23;
/** WebSocket握手的協(xié)議前綴 */
private static final String WEBSOCKET_PREFIX = "GET /";
@Resource
private SpringContextUtil springContextUtil;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
String protocol = getBufStart(in);
if (protocol.startsWith(WEBSOCKET_PREFIX)) {
springContextUtil.getBean(PipelineAdd.class).websocketAdd(ctx);
//對(duì)于 webSocket ,不設(shè)置超時(shí)斷開
ctx.pipeline().remove(IdleStateHandler.class);
// ctx.pipeline().remove(LengthFieldBasedFrameDecoder.class);
this.putChannelType(ctx.channel().id(), true);
}else{
this.putChannelType(ctx.channel().id(), false);
}
in.resetReaderIndex();
ctx.pipeline().remove(this.getClass());
}
private String getBufStart(ByteBuf in){
int length = in.readableBytes();
if (length > MAX_LENGTH) {
length = MAX_LENGTH;
}
// 標(biāo)記讀位置
in.markReaderIndex();
byte[] content = new byte[length];
in.readBytes(content);
return new String(content);
}
/**
*
* @param channelId
* @param type
*/
public void putChannelType(ChannelId channelId,Boolean type){
if (General.CHANNEL_TYPE_MAP.containsKey(channelId)) {
log.info("Socket------客戶端【" + channelId + "】是否websocket協(xié)議:"+type);
} else {
//保存連接
General.CHANNEL_TYPE_MAP.put(channelId, type);
log.info("Socket------客戶端【" + channelId + "】是否websocket協(xié)議:"+type);
}
}
}
(5)給NettyServerChannelInitializer初始化類中的commonhandler添加前置處理器
package com.example.dzx.netty.qiyanproject.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.stereotype.Component;
/**
* @author dzx
* @ClassName:
* @Description: 給NettyServerChannelInitializer初始化類中的commonhandler添加前置處理器
* @date 2023年06月30日 21:31:24
*/
@Component
public class PipelineAdd {
public void websocketAdd(ChannelHandlerContext ctx) {
System.out.println("PipelineAdd");
ctx.pipeline().addBefore("commonhandler", "http-codec", new HttpServerCodec());
ctx.pipeline().addBefore("commonhandler", "aggregator", new HttpObjectAggregator(999999999));
ctx.pipeline().addBefore("commonhandler", "http-chunked", new ChunkedWriteHandler());
// ctx.pipeline().addBefore("commonhandler","WebSocketServerCompression",new WebSocketServerCompressionHandler());
ctx.pipeline().addBefore("commonhandler", "ProtocolHandler", new WebSocketServerProtocolHandler("/ws"));
// ctx.pipeline().addBefore("commonhandler","StringDecoder",new StringDecoder(CharsetUtil.UTF_8)); // 解碼器,將字節(jié)轉(zhuǎn)換為字符串
// ctx.pipeline().addBefore("commonhandler","StringEncoder",new StringEncoder(CharsetUtil.UTF_8));
// HttpServerCodec:將請(qǐng)求和應(yīng)答消息解碼為HTTP消息
// ctx.pipeline().addBefore("commonhandler","http-codec",new HttpServerCodec());
//
// // HttpObjectAggregator:將HTTP消息的多個(gè)部分合成一條完整的HTTP消息
// ctx.pipeline().addBefore("commonhandler","aggregator",new HttpObjectAggregator(999999999));
//
// // ChunkedWriteHandler:向客戶端發(fā)送HTML5文件,文件過大會(huì)將內(nèi)存撐爆
// ctx.pipeline().addBefore("commonhandler","http-chunked",new ChunkedWriteHandler());
//
// ctx.pipeline().addBefore("commonhandler","WebSocketAggregator",new WebSocketFrameAggregator(999999999));
//
// //用于處理websocket, /ws為訪問websocket時(shí)的uri
// ctx.pipeline().addBefore("commonhandler","ProtocolHandler", new WebSocketServerProtocolHandler("/ws"));
}
}
(6)編寫業(yè)務(wù)處理器
package com.example.dzx.netty.qiyanproject.server;
import com.example.dzx.netty.qiyanproject.constants.General;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author dzx
* @ClassName:
* @Description: netty服務(wù)端處理類
* @date 2023年06月30日 21:27:16
*/
@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {
//由于繼承了SimpleChannelInboundHandler,這個(gè)方法必須實(shí)現(xiàn),否則報(bào)錯(cuò)
//但實(shí)際應(yīng)用中,這個(gè)方法沒被調(diào)用
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buff = (ByteBuf) msg;
String info = buff.toString(CharsetUtil.UTF_8);
log.info("收到消息內(nèi)容:" + info);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// WebSocket消息處理
String strMsg = "";
if (msg instanceof WebSocketFrame) {
log.info("WebSocket消息處理************************************************************");
strMsg = ((TextWebSocketFrame) msg).text().trim();
log.info("收到webSocket消息:" + strMsg);
}
// Socket消息處理
else if (msg instanceof ByteBuf) {
log.info("Socket消息處理=================================");
ByteBuf buff = (ByteBuf) msg;
strMsg = buff.toString(CharsetUtil.UTF_8).trim();
log.info("收到socket消息:" + strMsg);
}
// else {
// strMsg = msg.toString();
// }
this.channelWrite(ctx.channel().id(), strMsg);
}
/**
* 有客戶端終止連接服務(wù)器會(huì)觸發(fā)此函數(shù)
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();
ChannelId channelId = ctx.channel().id();
//包含此客戶端才去刪除
if (General.CHANNEL_MAP.containsKey(channelId)) {
//刪除連接
General.CHANNEL_MAP.remove(channelId);
System.out.println();
log.info("Socket------客戶端【" + channelId + "】退出netty服務(wù)器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]");
log.info("Socket------連接通道數(shù)量: " + General.CHANNEL_MAP.size());
General.CHANNEL_TYPE_MAP.remove(channelId);
}
}
/**
* 服務(wù)端給客戶端發(fā)送消息
*/
public void channelWrite(ChannelId channelId, Object msg) throws Exception {
ChannelHandlerContext ctx = General.CHANNEL_MAP.get(channelId);
if (ctx == null) {
log.info("Socket------通道【" + channelId + "】不存在");
return;
}
if (msg == null || msg == "") {
log.info("Socket------服務(wù)端響應(yīng)空的消息");
return;
}
//將客戶端的信息直接返回寫入ctx
log.info("Socket------服務(wù)端端返回報(bào)文......【" + channelId + "】" + " :" + (String) msg);
// ctx.channel().writeAndFlush(msg);
// ctx.writeAndFlush(msg);
//刷新緩存區(qū)
// ctx.flush();
//過濾掉當(dāng)前通道id
Set<ChannelId> channelIdSet = General.CHANNEL_MAP.keySet().stream().filter(id -> !id.asLongText().equalsIgnoreCase(channelId.asLongText())).collect(Collectors.toSet());
//廣播消息到客戶端
for (ChannelId id : channelIdSet) {
//是websocket協(xié)議
Boolean aBoolean = General.CHANNEL_TYPE_MAP.get(id);
if(aBoolean!=null && aBoolean){
General.CHANNEL_MAP.get(id).channel().writeAndFlush(new TextWebSocketFrame((String) msg));
}else {
ByteBuf byteBuf = Unpooled.copiedBuffer(((String) msg).getBytes());
General.CHANNEL_MAP.get(id).channel().writeAndFlush(byteBuf);
}
}
}
/**
* 處理空閑狀態(tài)事件
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
String socketString = ctx.channel().remoteAddress().toString();
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
log.info("Socket------Client: " + socketString + " READER_IDLE 讀超時(shí)");
ctx.disconnect();
} else if (event.state() == IdleState.WRITER_IDLE) {
log.info("Socket------Client: " + socketString + " WRITER_IDLE 寫超時(shí)");
ctx.disconnect();
} else if (event.state() == IdleState.ALL_IDLE) {
log.info("Socket------Client: " + socketString + " ALL_IDLE 總超時(shí)");
ctx.disconnect();
}
}
}
/**
* @DESCRIPTION: 發(fā)生異常會(huì)觸發(fā)此函數(shù)
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
log.error("Socket------" + ctx.channel().id() + " 發(fā)生了錯(cuò)誤,此連接被關(guān)閉" + "此時(shí)連通數(shù)量: " + General.CHANNEL_MAP.size(),cause);
}
}
?(7)spring上下文工具類
package com.example.dzx.netty.qiyanproject.netty1;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
* @author dzx
* @ClassName:
* @Description: spring容器上下文工具類
* @date 2023年06月30日 21:30:02
*/
@Component
public class SpringContextUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringContextUtil.applicationContext = applicationContext;
}
/**
* @Description: 獲取spring容器中的bean, 通過bean類型獲取
*/
public static <T> T getBean(Class<T> beanClass) {
return applicationContext.getBean(beanClass);
}
}
(8)編寫全局map常量類
package com.example.dzx.netty.qiyanproject.constants;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author 500007
* @ClassName:
* @Description:
* @date 2023年07月02日 19:12:42
*/
public class General {
/**
* 管理一個(gè)全局map,保存連接進(jìn)服務(wù)端的通道數(shù)量
*/
public static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();
/**
* 管理一個(gè)全局mao, 報(bào)存連接進(jìn)服務(wù)器的各個(gè)通道類型
*/
public static final ConcurrentHashMap<ChannelId, Boolean> CHANNEL_TYPE_MAP = new ConcurrentHashMap<>();
}
三、netty客戶端
(1)編寫netty客戶端,用于測試向服務(wù)端的消息發(fā)送
package com.example.dzx.netty.qiyanproject.Socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author dzx
* @ClassName:
* @Description: netty客戶端
* @date 2023年06月30日 21:30:02
*/
public class SocketClient {
// 服務(wù)端IP
static final String HOST = System.getProperty("host", "127.0.0.1");
// 服務(wù)端開放端口
static final int PORT = Integer.parseInt(System.getProperty("port", "7777"));
// 日志打印
private static final Logger LOGGER = LoggerFactory.getLogger(SocketClient.class);
// 主函數(shù)啟動(dòng)
public static void main(String[] args) throws InterruptedException {
sendMessage("我是客戶端,我發(fā)送了一條數(shù)據(jù)給netty服務(wù)端。。");
}
/**
* 核心方法(處理:服務(wù)端向客戶端發(fā)送的數(shù)據(jù)、客戶端向服務(wù)端發(fā)送的數(shù)據(jù))
*/
public static void sendMessage(String content) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new SocketChannelInitializer());
}
});
ChannelFuture future = b.connect(HOST, PORT).sync();
for (int i = 0; i < 3; i++) {
future.channel().writeAndFlush(content);
Thread.sleep(2000);
}
// 程序阻塞
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
(2)編寫netty客戶端初始化處理器
package com.example.dzx.netty.qiyanproject.Socket;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
* @author dzx
* @ClassName:
* @Description: netty客戶端初始化時(shí)設(shè)置出站和入站的編碼器和解碼器
* @date 2023年06月30日 21:30:02
*/
public class SocketChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
p.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
p.addLast(new SocketHandler());
}
}
(3)netty客戶端業(yè)務(wù)處理器,用于接收并處理服務(wù)端發(fā)送的消息數(shù)據(jù)
package com.example.dzx.netty.qiyanproject.Socket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
/**
* @author dzx
* @ClassName:
* @Description: netty客戶端處理器
* @date 2023年06月30日 21:30:02
*/
@Slf4j
public class SocketHandler extends ChannelInboundHandlerAdapter {
// 日志打印
private static final Logger LOGGER = LoggerFactory.getLogger(SocketHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) {
LOGGER.debug("SocketHandler Active(客戶端)");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
LOGGER.debug("####接收服務(wù)端發(fā)送過來的消息####");
LOGGER.debug("SocketHandler read Message:" + msg);
//獲取服務(wù)端連接的遠(yuǎn)程地址
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
//獲取服務(wù)端的IP地址
String clientIp = insocket.getAddress().getHostAddress();
//獲取服務(wù)端的端口號(hào)
int clientPort = insocket.getPort();
log.info("netty服務(wù)端[IP:" + clientIp + "--->PORT:" + clientPort + "]");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOGGER.debug("####客戶端斷開連接####");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
至此,netty服務(wù)端和netty客戶端都編寫完畢,我們可以來進(jìn)行測試了。
四、測試
(1)前端 websocket 向 后端NetAssist測試工具發(fā)送消息
?
?在前端窗口向后端 發(fā)送了一個(gè) 22222的 字符串,后端測試工具成功接收到消息并展示在對(duì)話框中。
(2)后端NetAssist向 前端 websocket 發(fā)送消息
?
??在后端窗口向前端 發(fā)送了一個(gè){"deviceId":"11111","deviceName":"qz-01","deviceStatus":"2"}的 字符串,前端測試工具成功接收到消息并展示在對(duì)話框中。
五、代碼倉庫地址
完整項(xiàng)目已上傳至gitee倉庫,請(qǐng)點(diǎn)擊下方傳送門自行獲取,一鍵三連!!
https://gitee.com/dzxmy/netty-web-socketd-dnamic
無法訪問就點(diǎn)擊下方傳送門去我的資源下載即可文章來源:http://www.zghlxwxcb.cn/news/detail-532767.html
https://download.csdn.net/download/qq_31905135/88044942?spm=1001.2014.3001.5503文章來源地址http://www.zghlxwxcb.cn/news/detail-532767.html
到了這里,關(guān)于SpringBoot項(xiàng)目整合WebSocket+netty實(shí)現(xiàn)前后端雙向通信(同時(shí)支持前端webSocket和socket協(xié)議哦)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!