Netty是一個(gè)高性能、異步事件驅(qū)動(dòng)的NIO框架,它提供了對(duì)TCP、UDP和文件傳輸?shù)闹С帧?/p>
Netty是對(duì)JDK自帶的NIO的API進(jìn)行封裝,具有高并發(fā),高性能等優(yōu)點(diǎn)。
項(xiàng)目中經(jīng)常用到netty實(shí)現(xiàn)服務(wù)器與設(shè)備的通信,先寫(xiě)服務(wù)端代碼:
@Slf4j
@Component
public class NettyServerBootstrap {
private Channel serverChannel;
private static final int DEFAULT_PORT = 60782;
//bossGroup只是處理連接請(qǐng)求
private static EventLoopGroup bossGroup = null;
//workGroup處理非連接請(qǐng)求,如果牽扯到數(shù)據(jù)量處理業(yè)務(wù)非常耗時(shí)的可以再單獨(dú)新建一個(gè)eventLoopGroup,并在childHandler初始化的時(shí)候添加到pipeline綁定
private static EventLoopGroup workGroup = null;
/**
* 啟動(dòng)Netty服務(wù)
*
* @return 啟動(dòng)結(jié)果
*/
@PostConstruct
public boolean start() {
bossGroup = new NioEventLoopGroup();
workGroup = new NioEventLoopGroup();
//創(chuàng)建服務(wù)端啟動(dòng)對(duì)象
ServerBootstrap bootstrap = new ServerBootstrap();
try {
//使用鏈?zhǔn)骄幊虂?lái)設(shè)置
bootstrap.group(bossGroup, workGroup)//設(shè)置兩個(gè)線程組
//使用NioSocketChannel作為服務(wù)器的通道實(shí)現(xiàn)
.channel(NioServerSocketChannel.class)
//設(shè)置線程隊(duì)列得到的連接數(shù)
.option(ChannelOption.SO_BACKLOG, 1024)
//設(shè)置保持活動(dòng)連接狀態(tài)
.childOption(ChannelOption.SO_KEEPALIVE, true)
//設(shè)置處理器 WorkerGroup 的 EvenLoop 對(duì)應(yīng)的管道設(shè)置處理器
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch){
log.info("--------------有客戶端連接");
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new NettyServerHandler());
}
});
//綁定端口, 同步等待成功;
ChannelFuture future = bootstrap.bind(DEFAULT_PORT).sync();
log.info("netty服務(wù)啟動(dòng)成功,ip:{},端口:{}", InetAddress.getLocalHost().getHostAddress(), DEFAULT_PORT);
serverChannel = future.channel();
ThreadUtil.execute(() -> {
//等待服務(wù)端監(jiān)聽(tīng)端口關(guān)閉
try {
future.channel().closeFuture().sync();
log.info("netty服務(wù)正常關(guān)閉成功,ip:{},端口:{}", InetAddress.getLocalHost().getHostAddress(), DEFAULT_PORT);
} catch (InterruptedException | UnknownHostException e) {
e.printStackTrace();
} finally {
shutdown();
}
});
} catch (Exception e) {
e.printStackTrace();
log.error("netty服務(wù)異常,異常原因:{}", e.getMessage());
return false;
}
return true;
}
/**
* 關(guān)閉當(dāng)前server
*/
public boolean close() {
if (serverChannel != null) {
serverChannel.close();//關(guān)閉服務(wù)
try {
//保險(xiǎn)起見(jiàn)
serverChannel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
return false;
} finally {
shutdown();
serverChannel = null;
}
}
return true;
}
/**
* 優(yōu)雅關(guān)閉
*/
private void shutdown() {
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
服務(wù)端處理類代碼:
@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 處理讀取到的msg
*
* @param ctx 上下文
* @param msg 數(shù)據(jù)
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx,String msg) throws Exception {
System.out.println("服務(wù)端收到的消息--------"+msg);
ctx.channel().writeAndFlush("ok");
}
/**
* 斷開(kāi)連接
*
* @param ctx 傻瓜下文
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
ChannelId channelId = ctx.channel().id();
log.info("客戶端id:{},斷開(kāi)連接,ip:{}", channelId, ctx.channel().remoteAddress());
super.handlerRemoved(ctx);
}
}
接下來(lái)模擬客戶端:
@Configuration
@Component
public class TianmiaoClient {
private static String ip;
private static int port ;
@Value("${tianmiao.nettyServer.ip}")
public void setIp(String ip) {
this.ip = ip;
}
@Value("${tianmiao.nettyServer.port}")
public void setPort(int port) {
this.port = port;
}
/**
* 服務(wù)類
*/
private static Bootstrap bootstrap=null;
/**
* 初始化 項(xiàng)目啟動(dòng)后自動(dòng)初始化
*/
@PostConstruct
public void init() {
//worker
EventLoopGroup worker = new NioEventLoopGroup();
bootstrap = new Bootstrap();
//設(shè)置線程池
bootstrap.group(worker);
//設(shè)置socket工廠
bootstrap.channel(NioSocketChannel.class);
//設(shè)置管道
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new TianmiaoClientHandler());
}
});
}
/**
* 獲取會(huì)話 (獲取或者創(chuàng)建一個(gè)會(huì)話)
*/
public Channel createChannel() {
try {
Channel channel = bootstrap.connect( ip, port).sync().channel();
return channel;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
客戶端處理類代碼
public class TianmiaoClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println("服務(wù)端發(fā)過(guò)來(lái)的消息:"+s);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(".......................tcp斷開(kāi)連接.........................");
//移除
Channel channel = ctx.channel();
channel.close().sync();
super.channelInactive(ctx);
}
}
管理客戶端channel的一個(gè)工具類:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-660890.html
public class TianmiaoChannelManager {
/**
* 在線會(huì)話(存儲(chǔ)注冊(cè)成功的會(huì)話)
*/
private static final ConcurrentHashMap<String, Channel> onlineChannels = new ConcurrentHashMap<>();
/**
* 加入
*
* @param mn
* @param channel
* @return
*/
public static boolean putChannel(String mn, Channel channel) {
if (!onlineChannels.containsKey(mn)) {
boolean success = onlineChannels.putIfAbsent(mn, channel) == null ? true : false;
return success;
}
return false;
}
/**
* 移除
*
* @param mn
*/
public static Channel removeChannel(String mn) {
return onlineChannels.remove(mn);
}
/**
* 獲取Channel
*
* @param mn
* @return
*/
public static Channel getChannel(String mn) {
// 獲取一個(gè)可用的會(huì)話
Channel channel = onlineChannels.get(mn);
if (channel != null) {
// 連接有可能是斷開(kāi),加入已經(jīng)斷開(kāi)連接了,我們需要進(jìn)行嘗試重連
if (!channel.isActive()) {
//先移除之前的連接
removeChannel(mn);
return null;
}
}
return channel;
}
/**
* 發(fā)送消息[自定義協(xié)議]
*
* @param <T>
* @param mn
* @param msg
*/
public static <T> void sendMessage(String mn, String msg) {
Channel channel = onlineChannels.get(mn);
if (channel != null && channel.isActive()) {
channel.writeAndFlush(msg);
}
}
/**
* 發(fā)送消息[自定義協(xié)議]
*
* @param <T>
* @param msg
*/
public static <T> void sendChannelMessage(Channel channel, String msg) {
if (channel != null && channel.isActive()) {
channel.writeAndFlush(msg);
}
}
/**
* 關(guān)閉連接
*
* @return
*/
public static void closeChannel(String mn) {
onlineChannels.get(mn).close();
}
}
最后是客戶端使用方法:文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-660890.html
/**
* 發(fā)送數(shù)據(jù)包
* @param key
*/
public static void tianmiaoData(String key, String data) {
Channel channel = TianmiaoChannelManager.getChannel(key);
//將通道存入
if(channel==null){
TianmiaoClient client = new TianmiaoClient();
channel = client.createChannel();
TianmiaoChannelManager.putChannel(key, channel);
}
if (channel != null && channel.isActive()) {
//發(fā)送數(shù)據(jù)
channel.writeAndFlush(data);
System.out.println("-------------天苗轉(zhuǎn)發(fā)數(shù)據(jù)成功-------------");
}
}
到了這里,關(guān)于netty-發(fā)起tcp長(zhǎng)連接(包含客戶端和服務(wù)端)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!