場景
在SpringBoot項目中需要對接三方系統,對接協議是TCP,需實現一個TCP客戶端接收
服務端發(fā)送的數據并按照16進制進行解析數據,然后對數據進行過濾,將指定類型的數據
通過mybatis存儲進mysql數據庫中。并且當tcp服務端斷連時,tcp客戶端能定時檢測并發(fā)起重連。
全流程效果
?
注:
博客:
霸道流氓氣質的博客_CSDN博客-C#,架構之路,SpringBoot領域博主
實現
1、SpringBoot+Netty實現TCP客戶端
本篇參考如下博客,在如下博客基礎上進行修改
Springboot+Netty搭建基于TCP協議的客戶端(二):
https://www.cnblogs.com/haolb123/p/16553005.html
上面博客提供的示例代碼
https://download.csdn.net/download/myyhtw/12369531
引入Netty的依賴
??????? <!--? netty依賴 springboot2.x自動導入版本-->
??????? <dependency>
??????????? <groupId>io.netty</groupId>
??????????? <artifactId>netty-all</artifactId>
??????? </dependency>
2、新建Netty的client類
package com.badao.demo.netty;
import com.badao.demo.global.Global;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
?*
?* netty 客戶端
?*
?*/
public class BootNettyClient {
?public void connect(int port, String host) throws Exception{
??/**
?? * 客戶端的NIO線程組
?? *
?? */
??????? EventLoopGroup group = new NioEventLoopGroup();
??????? try {
??????? ?/**
??????? ? * Bootstrap 是一個啟動NIO服務的輔助啟動類 客戶端的
??????? ? */
??????? ?Bootstrap bootstrap = new Bootstrap();
??????? ?bootstrap = bootstrap.group(group);
??????? ?bootstrap = bootstrap.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true);
??????? ?/**
??????? ? * 設置 I/O處理類,主要用于網絡I/O事件,記錄日志,編碼、解碼消息
??????? ? */
??????? ?bootstrap = bootstrap.handler(new BootNettyChannelInitializer<SocketChannel>());
??????? ?/**
??????? ? * 連接服務端
??????? ? */
???ChannelFuture future = bootstrap.connect(host, port).sync();
???if(future.isSuccess()) {
????//是否連接tcp成功
????Global.getInstance().canTcpConnected = true;
????Channel channel = future.channel();
????String id = future.channel().id().toString();
????BootNettyClientChannel bootNettyClientChannel = new BootNettyClientChannel();
????bootNettyClientChannel.setChannel(channel);
????bootNettyClientChannel.setCode("clientId:"+id);
????BootNettyClientChannelCache.save("clientId:"+id, bootNettyClientChannel);
????System.out.println("netty client start success="+id);
????/**
???? * 等待連接端口關閉
???? */
????future.channel().closeFuture().sync();
???}else{
???}
??} finally {
???/**
??? * 退出,釋放資源
??? */
???group.shutdownGracefully().sync();
??}
?}
}
注意這里的在連接成功之后的修改
?
新增了一個全局的單例變量類Global,用來作為斷線重連的判斷,后面后具體代碼實現。
接著將clientId保存的實現,可以根據自己需要決定是否保留,不需要可刪除,并且下面
第4條BootNettyClientChannel以及第5條BootNettyClientChannelCache也可做相應的刪除或修改。
3、新建通道初始化
package com.badao.demo.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
/**
?* 通道初始化
?*
?*/
@ChannelHandler.Sharable
public class BootNettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {
?@Override
?protected void initChannel(Channel ch) {
??????? /**
???????? * 自定義ChannelInboundHandlerAdapter
???????? */
??????? ch.pipeline().addLast(new BootNettyChannelInboundHandlerAdapter());
?}
}
注意與原有不一樣的是這里刪掉了自定義解碼器的實現
?文章來源地址http://www.zghlxwxcb.cn/news/detail-778485.html
這里根據自己實際情況決定是否保留以及格式,否則會提示
String cannot be cast to io.netty.buffer.ByteBuf
4、新建通道對象
package com.badao.demo.netty;
import io.netty.channel.Channel;
public class BootNettyClientChannel {
?//?連接客戶端唯一的code
?private String code;
?//?客戶端最新發(fā)送的消息內容
?private String last_data;
?private transient volatile Channel channel;
?public String getCode() {
??return code;
?}
?public void setCode(String code) {
??this.code = code;
?}
?public Channel getChannel() {
??return channel;
?}
?public void setChannel(Channel channel) {
??this.channel = channel;
?}
?public String getLast_data() {
??return last_data;
?}
?public void setLast_data(String last_data) {
??this.last_data = last_data;
?}
}
5、新建保存ClientChannel的Cache類
package com.badao.demo.netty;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class BootNettyClientChannelCache {
??? public static volatile Map<String, BootNettyClientChannel> channelMapCache = new ConcurrentHashMap<String, BootNettyClientChannel>();
??? public static void add(String code, BootNettyClientChannel channel){
??? ?channelMapCache.put(code,channel);
??? }
??? public static BootNettyClientChannel get(String code){
??????? return channelMapCache.get(code);
??? }
??? public static void remove(String code){
??? ?channelMapCache.remove(code);
??? }
??? public static void save(String code, BootNettyClientChannel channel) {
??????? if(channelMapCache.get(code) == null) {
??????????? add(code,channel);
??????? }
??? }
}
6、最重要的是新建客戶端I/O數據讀寫處理類
package com.badao.demo.netty;
import com.badao.demo.entity.BusStallProptection;
import com.badao.demo.entity.StallVo;
import com.badao.demo.global.Global;
import com.badao.demo.mapper.BusStallProptectionMapper;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Date;
/**
?*
?* I/O數據讀寫處理類
?*
?*/
@ChannelHandler.Sharable
@Component
public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter{
??? public static BootNettyChannelInboundHandlerAdapter bootNettyChannelInboundHandlerAdapter;
??? //1.正常注入[記得主類也需要使用@Component注解]
??? @Autowired
??? BusStallProptectionMapper busStallProptectionMapper;
??? //2.初始化構造方法一定要有
??? public BootNettyChannelInboundHandlerAdapter(){
??? }
??? //3.容器初始化的時候進行執(zhí)行-這里是重點
??? @PostConstruct
??? public void init() {
??????? bootNettyChannelInboundHandlerAdapter = this;
??????? bootNettyChannelInboundHandlerAdapter.busStallProptectionMapper = this.busStallProptectionMapper;
??? }
??? /**
???? * 從服務端收到新的數據時,這個方法會在收到消息時被調用
???? */
??? @Override
??? public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
??????? if(msg == null){
??????????? return;
??????? }
??????? ByteBuf buf = (ByteBuf) msg;
??????? byte[] bytes = new byte[buf.readableBytes()];
??????? // 復制內容到字節(jié)數組bytes
??????? buf.readBytes(bytes);
??????? // 將接收到的數據轉為字符串,此字符串就是客戶端發(fā)送的字符串
??????? String receiveStr = NettyConnectHelper.receiveHexToString(bytes);
??????? StallVo stallVo = NettyConnectHelper.receiveHexToObj(bytes);
??????? BootNettyClientChannel bootNettyClientChannel = BootNettyClientChannelCache.get("clientId:"+ctx.channel().id().toString());
??????? if(bootNettyClientChannel != null){
??????????? //判斷指定狀態(tài)的數據進行處理
??????????? if(Global.getInstance().abnormalCarStatusList.contains(stallVo.getCarStatus())){
??????????????? BusStallProptection busStallProptection = BusStallProptection.builder()
??????????????????????? .carNumber(stallVo.getCarNumber())
??????????????????????? .carState(stallVo.getCarStatus())
??????????????????????? .stallScope(stallVo.getAreaNumber())
??????????????????????? .rawData(receiveStr)
??????????????????????? .uploadTime(new Date())
??????????????????????? .build();
??????????????? //插入數據庫
??????????????? bootNettyChannelInboundHandlerAdapter.busStallProptectionMapper.insert(busStallProptection);
??????????? }
??????????? bootNettyClientChannel.setLast_data(msg.toString());
??????? }
??? }
??? /**
???? * 從服務端收到新的數據、讀取完成時調用
???? */
??? @Override
??? public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
??? ?ctx.flush();
??? }
??? /**
???? * 當出現 Throwable 對象才會被調用,即當 Netty 由于 IO 錯誤或者處理器在處理事件時拋出的異常時
???? */
??? @Override
??? public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException {
??? ?System.out.println("exceptionCaught");
??????? cause.printStackTrace();
??????? ctx.close();//拋出異常,斷開與客戶端的連接
??? }
??? /**
???? * 客戶端與服務端第一次建立連接時 執(zhí)行
???? */
??? @Override
??? public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {
??????? super.channelActive(ctx);
??????? InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
??????? String clientIp = inSocket.getAddress().getHostAddress();
??????? System.out.println(clientIp);
??? }
??? /**
???? * 客戶端與服務端 斷連時 執(zhí)行
???? */
??? @Override
??? public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
??????? super.channelInactive(ctx);
??????? InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
??????? String clientIp = inSocket.getAddress().getHostAddress();
??????? ctx.close(); //斷開連接時,必須關閉,否則造成資源浪費
??????? System.out.println("channelInactive:"+clientIp);
??????? Global.getInstance().canTcpConnected = false;
??? }
}
這里做的修改較多主要是修改channelRead從服務端收到新的數據時的回調方法
會將數據進行按照16進制讀取和解析為字符串,并作為對接的原始數據進行存儲。
還會將數據按照16進制解析并獲取對應位的字符并賦值到對象保存到數據庫。
其中用到的相關工具類方法,又封裝了一個NettyConnectHelper
其中包含用來發(fā)起連接以及16進制解析和轉換對象的相關方法
package com.badao.demo.netty;
import com.badao.demo.entity.StallVo;
public? class NettyConnectHelper {
??? /**
???? * 發(fā)起連接
???? */
??? public static void doConnect(){
??????? try {
??????????? /**
???????????? * 使用異步注解方式啟動netty客戶端服務
???????????? */
??????????? new BootNettyClient().connect(8600, "127.0.0.1");
??????? }catch (Exception exception){
??????????? System.out.println("tcp連接異常");
??????? }
??? }
??? /**
???? * 接收字節(jié)數據并轉換為16進制字符串
???? * @param by
???? * @return
???? */
??? public? static String receiveHexToString(byte[] by) {
??????? try {
??????????? String str = bytes2Str(by);
??????????? str = str.toUpperCase();
??????????? return str;
??????? } catch (Exception ex) {
??????????? ex.printStackTrace();
??????????? System.out.println("接收字節(jié)數據并轉為16進制字符串異常");
??????? }
??????? return null;
??? }
??? /**
???? * 字節(jié)數組轉換為16進制字符串
???? * @param src
???? * @return
???? */
??? public static String bytes2Str(byte[] src){
??????? StringBuilder stringBuilder = new StringBuilder("");
??????? if (src == null || src.length <= 0) {
??????????? return null;
??????? }
??????? for (int i = 0; i < src.length; i++) {
??????????? if(i>0){
??????????????? stringBuilder.append(" ");
??????????? }
??????????? int v = src[i] & 0xFF;
??????????? String hv = Integer.toHexString(v);
??????????? if (hv.length() < 2) {
??????????????? stringBuilder.append(0);
??????????? }
??????????? stringBuilder.append(hv);
??????? }
??????? return stringBuilder.toString();
??? }
??? /**
???? *? 字節(jié)轉換為16進制字符
???? * @param src
???? * @return
???? */
??? public static String byte2Str(byte src){
??????? StringBuilder stringBuilder = new StringBuilder("");
??????? int v = src & 0xFF;
??????? String hv = Integer.toHexString(v);
??????? if (hv.length() < 2) {
??????????? stringBuilder.append(0);
??????? }
??????? stringBuilder.append(hv.toUpperCase());
??????? return stringBuilder.toString();
??? }
??? /**
???? * 接收字節(jié)數據并轉換為對象
???? * @param by
???? * @return
???? */
??? public static StallVo receiveHexToObj(byte[] by) {
??????? try {
??????????? StallVo stallVo = bytes2Obj(by);
??????????? return stallVo;
??????? } catch (Exception ex) {
??????????? ex.printStackTrace();
??????????? System.out.println("接收字節(jié)數據并轉為對象異常");
??????? }
??????? return null;
??? }
??? /**
???? * 對象屬性賦值
???? * @param src
???? * @return
???? */
??? public static StallVo bytes2Obj(byte[] src){
??????? if (src == null || src.length <= 0) {
??????????? return null;
??????? }
??????? //依據約定,第一位為區(qū)域編號;第四位為車輛狀態(tài);第五位為車輛編號
??????? StallVo stallVo = StallVo.builder()
??????????????? .areaNumber(byte2Str(src[0]))
??????????????? .carStatus(byte2Str(src[3]))
??????????????? .carNumber(byte2Str(src[4]))
??????????????? .build();
??????? return stallVo;
??? }
}
這里跟業(yè)務相關挺多,對象屬性映射的都是依據對接時的約定。
這里的對象StallVo
package com.badao.demo.entity;
import lombok.Builder;
import lombok.Data;
/**
?* 失速保護VO
?*/
@Data
@Builder
public class StallVo {
??? //區(qū)域編號
??? private String areaNumber;
??? //車輛狀態(tài)
??? private String carStatus;
??? //車輛編號
??? private String carNumber;
}
繼續(xù)上面的讀的處理類
if(Global.getInstance().abnormalCarStatusList.contains(stallVo.getCarStatus()))
這里是業(yè)務需要,根據傳輸的數據進行判斷,指定位的數據是否為需要的類型數據,只對需要的數據進行存儲。
下面附全局單例類Global
package com.badao.demo.global;
import com.badao.demo.enums.CarStatus;
import java.util.ArrayList;
import java.util.List;
public class Global {
??? //標識當前是否已經連接TCP
??? public? boolean canTcpConnected = false;
??? //過濾tcp數據,需要的數據類型的枚舉變量的list
??? public List<String> abnormalCarStatusList = new ArrayList<String>()
??? {
??????? {
??????????? add(CarStatus.OverSpeed.getCode());
??????????? add(CarStatus.EmergStop.getCode());
??????????? add(CarStatus.StallProtected.getCode());
??????? }
??? };
??? private static final Global _global = new Global();
??? private Global(){};
??? public static Global getInstance(){
??????? return _global;
??? }
}
關于單例模式的實現可參考
設計模式-單例模式-餓漢式單例模式、懶漢式單例模式、靜態(tài)內部類在Java中的使用示例:
設計模式-單例模式-餓漢式單例模式、懶漢式單例模式、靜態(tài)內部類在Java中的使用示例_霸道流氓氣質的博客-CSDN博客
其中Global中保存的list是枚舉類的相關字段屬性
package com.badao.demo.enums;
/**
?* 車輛狀態(tài)
?*
?*/
public enum CarStatus
{
??? NormalCar("00", "沒有車輛通過或車輛速度正常"), OverSpeed("01", "車輛超速行駛"),EmergStop("02", "車輛急停"), StallProtected("03", "車輛失速保護");
??? private final String code;
??? private final String info;
??? CarStatus(String code, String info)
??? {
??????? this.code = code;
??????? this.info = info;
??? }
??? public String getCode()
??? {
??????? return code;
??? }
??? public String getInfo()
??? {
??????? return info;
??? }
}
繼續(xù)上面在解析數據并判斷是需要的類型之后,就是封裝到存儲數據庫的相關實體并插入到mysql。
附BusStallProptection
package com.badao.demo.entity;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
@Data
@Builder
public class BusStallProptection implements Serializable {
??? private Integer id;
??? private String carNumber;
??? private String carState;
??? private String stallScope;
??? private String rawData;
??? private Date uploadTime;
}
封裝完之后實現調用mapper的方法插入到數據庫。
7、Netty的I/O數據讀寫處理類BootNettyChannelInboundHandlerAdapter中注入Mapper的方式
I/O數據讀寫處理類BootNettyChannelInboundHandlerAdapter添加注解@Component
@ChannelHandler.Sharable
@Component
public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter{
然后正常注入mapper或service
??? //1.正常注入[記得主類也需要使用@Component注解]
??? @Autowired
??? BusStallProptectionMapper busStallProptectionMapper;
然后新增初始化構造方法
??? //2.初始化構造方法一定要有
??? public BootNettyChannelInboundHandlerAdapter(){
??? }
然后容器初始化時執(zhí)行如下
??? //3.容器初始化的時候進行執(zhí)行-這里是重點
??? @PostConstruct
??? public void init() {
??????? bootNettyChannelInboundHandlerAdapter = this;
??????? bootNettyChannelInboundHandlerAdapter.busStallProptectionMapper = this.busStallProptectionMapper;
??? }
前面要聲明static變量
public static BootNettyChannelInboundHandlerAdapter bootNettyChannelInboundHandlerAdapter;
然后在使用時就可以
bootNettyChannelInboundHandlerAdapter.busStallProptectionMapper.insert(busStallProptection);
?
8、修改SpringBoot的啟動類,使Netty項目啟動后進行TCP連接
package com.badao.demo;
import com.badao.demo.netty.NettyConnectHelper;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@MapperScan("com.badao.demo.mapper")
@SpringBootApplication
public class DemoApplication implements CommandLineRunner {
??? public static void main(String[] args) {
??????? SpringApplication application = new SpringApplication(DemoApplication.class);
??????? application.run(args);
??? }
??? @Override
??? public void run(String... args) {
??????? //如果需要項目一啟動就連接則執(zhí)行,否則通過定時任務執(zhí)行
??????? NettyConnectHelper.doConnect();
??? }
}
這里將發(fā)起連接的操作封裝到工具類方法中,并在方法中添加try-catch,避免連接不上tcp導致無法啟動。
9、SpringBoot中進行TCP客戶端斷線檢測與自動重連。
這里需要TCP的客戶端在斷線之后能自動發(fā)起重連,且不需重啟SpringBoot,所以這里需要借助定時任務的
實現。
新建Task類并進行定時任務實現
package com.badao.demo.task;
import com.badao.demo.global.Global;
import com.badao.demo.netty.NettyConnectHelper;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
@EnableScheduling
public class CheckTcpConnectTask {
??? @Scheduled(cron = "0/10 * * * * ? ")
??? public void? checkReconnectTcpServer(){
??????? System.out.println("發(fā)起重連檢測");
??????? if(!Global.getInstance().canTcpConnected){
??????????? //進行連接
??????????? System.out.println("執(zhí)行連接");
??????????? NettyConnectHelper.doConnect();
??????? }
??? }
}
這里是10秒檢測一次前面定義的全局變量,如果未連接則調用發(fā)起連接的方法。
該變量默認為false,在建立連接的回調方法BootNettyClient中連接服務端之后將其賦值
為true。
???ChannelFuture future = bootstrap.connect(host, port).sync();
???if(future.isSuccess()) {
????//是否連接tcp成功
????Global.getInstance().canTcpConnected = true;
?
并在斷連的回調BootNettyChannelInboundHandlerAdapter的channelInactive賦值為false
??? @Override
??? public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
??????? super.channelInactive(ctx);
??????? InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
??????? String clientIp = inSocket.getAddress().getHostAddress();
??????? ctx.close(); //斷開連接時,必須關閉,否則造成資源浪費
??????? System.out.println("channelInactive:"+clientIp);
??????? Global.getInstance().canTcpConnected = false;
??? }
?
10、斷連檢測效果
?
11、TCP服務端、客戶端模擬測試工具
第一個是sokit
http://sqdownd.onlinedown.net/down/sokit-1.3-win32-chs.zip
下載之后解壓即用
?
可模擬TCP服務器、客戶端、轉發(fā)器等,F1打開幫助,如果需要發(fā)送16進制數據,需要用方括號
包圍。
除此之外還要諸多其他模擬和測試工具,比如客戶端工具serial,可自行搜索友善串口調試助手。
文章來源:http://www.zghlxwxcb.cn/news/detail-778485.html
?
到了這里,關于SpringBoot+Netty實現TCP客戶端實現接收數據按照16進制解析并存儲到Mysql以及Netty斷線重連檢測與自動重連的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!