国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

SpringBoot項(xiàng)目整合WebSocket+netty實(shí)現(xiàn)前后端雙向通信(同時(shí)支持前端webSocket和socket協(xié)議哦)

這篇具有很好參考價(jià)值的文章主要介紹了SpringBoot項(xiàng)目整合WebSocket+netty實(shí)現(xiàn)前后端雙向通信(同時(shí)支持前端webSocket和socket協(xié)議哦)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

目錄

?

前言

技術(shù)棧

功能展示

一、springboot項(xiàng)目添加netty依賴

二、netty服務(wù)端

三、netty客戶端

四、測試

五、代碼倉庫地址


??專屬小彩蛋:前些天發(fā)現(xiàn)了一個(gè)巨牛的人工智能學(xué)習(xí)網(wǎng)站,通俗易懂,風(fēng)趣幽默,忍不住分享一下給大家。點(diǎn)擊跳轉(zhuǎn)到網(wǎng)站(前言 - 床長人工智能教程)?

前言

? ? ? ? 最近做了一個(gè)硬件設(shè)備通信項(xiàng)目,需求是這樣,前端使用webSocket向后端進(jìn)行tcp協(xié)議的通信,后端netty服務(wù)端收到數(shù)據(jù)后,將數(shù)據(jù)發(fā)往socket客戶端,客戶端收到數(shù)據(jù)之后需要進(jìn)行響應(yīng)數(shù)據(jù)顯示到前端頁面供用戶進(jìn)行實(shí)時(shí)監(jiān)控。

技術(shù)棧

????????后端

  • springboot?
  • netty

????????前端

  • 前端websocket

功能展示

前端頁面輸入webSocket地址,點(diǎn)擊連接,輸入待發(fā)送的數(shù)據(jù),點(diǎn)擊發(fā)送

SpringBoot項(xiàng)目整合WebSocket+netty實(shí)現(xiàn)前后端雙向通信(同時(shí)支持前端webSocket和socket協(xié)議哦),netty,springboot,websocket,spring boot,websocket,netty

?后端我們可以使用網(wǎng)絡(luò)測試工具NetAssist 進(jìn)行響應(yīng)測試SpringBoot項(xiàng)目整合WebSocket+netty實(shí)現(xiàn)前后端雙向通信(同時(shí)支持前端webSocket和socket協(xié)議哦),netty,springboot,websocket,spring boot,websocket,netty

?在工具中連接netty服務(wù)端,并點(diǎn)擊發(fā)送按鈕,可以看到,前端頁面右側(cè)對(duì)話框成功顯示出了NetAssist測試工具響應(yīng)的數(shù)據(jù)內(nèi)容。接下來我們來看一看代碼如何進(jìn)行實(shí)現(xiàn),關(guān)鍵的點(diǎn)在于需要同時(shí)支持前端websocket和后端socket的連接,需要自定義一個(gè)協(xié)議選擇處理器。

一、springboot項(xiàng)目添加netty依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example.dzx.netty</groupId>
    <artifactId>qiyan-project</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>qiyan-project</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
            <version>2.6.7</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.52.Final</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

二、netty服務(wù)端

?(1)netty服務(wù)啟動(dòng)類

package com.example.dzx.netty.qiyanproject.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

/**
 * @author dzx
 * @ClassName:
 * @Description: netty服務(wù)啟動(dòng)類
 * @date 2023年06月30日 21:27:16
 */
@Slf4j
@Component
public class NettyServer {

    public void start(InetSocketAddress address) {
        //配置服務(wù)端的NIO線程組

        /*
         * 在Netty中,事件循環(huán)組是一組線程池,用于處理網(wǎng)絡(luò)事件,例如接收客戶端連接、讀寫數(shù)據(jù)等操作。
         * 它由兩個(gè)部分組成:bossGroup和workerGroup。
         * bossGroup 是負(fù)責(zé)接收客戶端連接請(qǐng)求的線程池。
         * workerGroup 是負(fù)責(zé)處理客戶端連接的線程池。
         * */

        EventLoopGroup bossGroup = new NioEventLoopGroup(10);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            //創(chuàng)建ServerBootstrap實(shí)例,boss組用于接收客戶端連接請(qǐng)求,worker組用于處理客戶端連接。
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)  // 綁定線程池
                    .channel(NioServerSocketChannel.class)//通過TCP/IP方式進(jìn)行傳輸
                    .childOption(ChannelOption.SO_REUSEADDR, true) //快速復(fù)用端口
                    .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)
                    .localAddress(address)//監(jiān)聽服務(wù)器地址
                    .childHandler(new NettyServerChannelInitializer())
//                    .childHandler(new com.ccp.dev.system.netty.NettyServerChannelInitializer())
                    .childOption(ChannelOption.TCP_NODELAY, true)//子處理器處理客戶端連接的請(qǐng)求和數(shù)據(jù)
                    .option(ChannelOption.SO_BACKLOG, 1024)  //服務(wù)端接受連接的隊(duì)列長度,如果隊(duì)列已滿,客戶端連接將被拒絕
                    .childOption(ChannelOption.SO_KEEPALIVE, true);  //保持長連接,2小時(shí)無數(shù)據(jù)激活心跳機(jī)制

            // 綁定端口,開始接收進(jìn)來的連接
            ChannelFuture future = bootstrap.bind(address).sync();
            future.addListener(l -> {
                if (future.isSuccess()) {
                    System.out.println("Netty服務(wù)啟動(dòng)成功");
                } else {
                    System.out.println("Netty服務(wù)啟動(dòng)失敗");
                }
            });
            log.info("Netty服務(wù)開始監(jiān)聽端口: " + address.getPort());
            //關(guān)閉channel和塊,直到它被關(guān)閉
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("啟動(dòng)Netty服務(wù)器時(shí)出錯(cuò)", e);
        } finally {
            //釋放資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

?(2)服務(wù)端初始化類編寫,客戶端與服務(wù)器端連接一旦創(chuàng)建,這個(gè)類中方法就會(huì)被回調(diào),設(shè)置出站編碼器和入站解碼器以及各項(xiàng)處理器

package com.example.dzx.netty.qiyanproject.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component;


/**
 * @author dzx
 * @ClassName:
 * @Description: 服務(wù)端初始化,客戶端與服務(wù)器端連接一旦創(chuàng)建,這個(gè)類中方法就會(huì)被回調(diào),設(shè)置出站編碼器和入站解碼器以及各項(xiàng)處理器
 * @date 2023年06月30日 21:27:16
 */
@Component
public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {

//    private FullHttpResponse createCorsResponseHeaders() {
//        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
//
//        // 設(shè)置允許跨域訪問的響應(yīng)頭
//        response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
//        response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, POST, PUT, DELETE");
//        response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "Content-Type, Authorization");
//        response.headers().set(HttpHeaderNames.ACCESS_CONTROL_MAX_AGE, "3600");
//
//        return response;
//    }

    @Override
    protected void initChannel(SocketChannel channel) {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast("active", new ChannelActiveHandler());
        //Socket 連接心跳檢測
        pipeline.addLast("idleStateHandler", new IdleStateHandler(60, 0, 0));
        pipeline.addLast("socketChoose", new SocketChooseHandler());
        pipeline.addLast("commonhandler",new NettyServerHandler());
    }
}

(3) 編寫新建連接處理器

package com.example.dzx.netty.qiyanproject.server;

import com.example.dzx.netty.qiyanproject.constants.General;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

/**
 * @author dzx
 * @ClassName:
 * @Description: 客戶端新建連接處理器
 * @date 2023年06月30日 21:27:16
 */

@ChannelHandler.Sharable
@Slf4j
public class ChannelActiveHandler extends ChannelInboundHandlerAdapter {

    /**
     * 有客戶端連接服務(wù)器會(huì)觸發(fā)此函數(shù)
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        //獲取客戶端連接的遠(yuǎn)程地址
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        //獲取客戶端的IP地址
        String clientIp = insocket.getAddress().getHostAddress();
        //獲取客戶端的端口號(hào)
        int clientPort = insocket.getPort();
        //獲取連接通道唯一標(biāo)識(shí)
        ChannelId channelId = ctx.channel().id();
        //如果map中不包含此連接,就保存連接
        if (General.CHANNEL_MAP.containsKey(channelId)) {
            log.info("Socket------客戶端【" + channelId + "】是連接狀態(tài),連接通道數(shù)量: " + General.CHANNEL_MAP.size());
        } else {
            //保存連接
            General.CHANNEL_MAP.put(channelId, ctx);
            log.info("Socket------客戶端【" + channelId + "】連接netty服務(wù)器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
            log.info("Socket------連接通道數(shù)量: " + General.CHANNEL_MAP.size());
        }
    }
}

(4)編寫協(xié)議初始化解碼器,用來判定實(shí)際使用什么協(xié)議(實(shí)現(xiàn)websocket和socket同時(shí)支持的關(guān)鍵點(diǎn)就在這里)

package com.example.dzx.netty.qiyanproject.server;

/**
 * @author 500007
 * @ClassName:
 * @Description:
 * @date 2023年06月30日 21:29:17
 */

import com.example.dzx.netty.qiyanproject.constants.General;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

/**
 * @author dzx
 * @ClassName:
 * @Description:  協(xié)議初始化解碼器.用來判定實(shí)際使用什么協(xié)議,以用來處理前端websocket或者后端netty客戶端的連接或通信
 * @date 2023年06月30日 21:31:24
 */
@Component
@Slf4j
public class SocketChooseHandler extends ByteToMessageDecoder {
    /** 默認(rèn)暗號(hào)長度為23 */
    private static final int MAX_LENGTH = 23;
    /** WebSocket握手的協(xié)議前綴 */
    private static final String WEBSOCKET_PREFIX = "GET /";
    @Resource
    private SpringContextUtil springContextUtil;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        String protocol = getBufStart(in);
        if (protocol.startsWith(WEBSOCKET_PREFIX)) {
            springContextUtil.getBean(PipelineAdd.class).websocketAdd(ctx);

            //對(duì)于 webSocket ,不設(shè)置超時(shí)斷開
            ctx.pipeline().remove(IdleStateHandler.class);
//            ctx.pipeline().remove(LengthFieldBasedFrameDecoder.class);
            this.putChannelType(ctx.channel().id(), true);
        }else{
            this.putChannelType(ctx.channel().id(), false);
        }
        in.resetReaderIndex();
        ctx.pipeline().remove(this.getClass());
    }

    private String getBufStart(ByteBuf in){
        int length = in.readableBytes();
        if (length > MAX_LENGTH) {
            length = MAX_LENGTH;
        }

        // 標(biāo)記讀位置
        in.markReaderIndex();
        byte[] content = new byte[length];
        in.readBytes(content);
        return new String(content);
    }

    /**
     *
     * @param channelId
     * @param type
     */
    public void putChannelType(ChannelId channelId,Boolean type){
        if (General.CHANNEL_TYPE_MAP.containsKey(channelId)) {
            log.info("Socket------客戶端【" + channelId + "】是否websocket協(xié)議:"+type);
        } else {
            //保存連接
            General.CHANNEL_TYPE_MAP.put(channelId, type);
            log.info("Socket------客戶端【" + channelId + "】是否websocket協(xié)議:"+type);
        }
    }
}

(5)給NettyServerChannelInitializer初始化類中的commonhandler添加前置處理器

package com.example.dzx.netty.qiyanproject.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.stereotype.Component;

/**
 * @author dzx
 * @ClassName:
 * @Description: 給NettyServerChannelInitializer初始化類中的commonhandler添加前置處理器
 * @date 2023年06月30日 21:31:24
 */
@Component
public class PipelineAdd {

    public void websocketAdd(ChannelHandlerContext ctx) {
        System.out.println("PipelineAdd");
        ctx.pipeline().addBefore("commonhandler", "http-codec", new HttpServerCodec());
        ctx.pipeline().addBefore("commonhandler", "aggregator", new HttpObjectAggregator(999999999));
        ctx.pipeline().addBefore("commonhandler", "http-chunked", new ChunkedWriteHandler());
//        ctx.pipeline().addBefore("commonhandler","WebSocketServerCompression",new WebSocketServerCompressionHandler());
        ctx.pipeline().addBefore("commonhandler", "ProtocolHandler", new WebSocketServerProtocolHandler("/ws"));

//        ctx.pipeline().addBefore("commonhandler","StringDecoder",new StringDecoder(CharsetUtil.UTF_8)); // 解碼器,將字節(jié)轉(zhuǎn)換為字符串
//        ctx.pipeline().addBefore("commonhandler","StringEncoder",new StringEncoder(CharsetUtil.UTF_8));
        // HttpServerCodec:將請(qǐng)求和應(yīng)答消息解碼為HTTP消息
//        ctx.pipeline().addBefore("commonhandler","http-codec",new HttpServerCodec());
//
//        // HttpObjectAggregator:將HTTP消息的多個(gè)部分合成一條完整的HTTP消息
//        ctx.pipeline().addBefore("commonhandler","aggregator",new HttpObjectAggregator(999999999));
//
//        // ChunkedWriteHandler:向客戶端發(fā)送HTML5文件,文件過大會(huì)將內(nèi)存撐爆
//        ctx.pipeline().addBefore("commonhandler","http-chunked",new ChunkedWriteHandler());
//
//        ctx.pipeline().addBefore("commonhandler","WebSocketAggregator",new WebSocketFrameAggregator(999999999));
//
//        //用于處理websocket, /ws為訪問websocket時(shí)的uri
//        ctx.pipeline().addBefore("commonhandler","ProtocolHandler", new WebSocketServerProtocolHandler("/ws"));

    }
}

(6)編寫業(yè)務(wù)處理器

package com.example.dzx.netty.qiyanproject.server;


import com.example.dzx.netty.qiyanproject.constants.General;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.Set;
import java.util.stream.Collectors;


/**
 * @author dzx
 * @ClassName:
 * @Description: netty服務(wù)端處理類
 * @date 2023年06月30日 21:27:16
 */
@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {


    //由于繼承了SimpleChannelInboundHandler,這個(gè)方法必須實(shí)現(xiàn),否則報(bào)錯(cuò)
    //但實(shí)際應(yīng)用中,這個(gè)方法沒被調(diào)用
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buff = (ByteBuf) msg;
        String info = buff.toString(CharsetUtil.UTF_8);
        log.info("收到消息內(nèi)容:" + info);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // WebSocket消息處理
        String strMsg = "";
        if (msg instanceof WebSocketFrame) {
            log.info("WebSocket消息處理************************************************************");
            strMsg = ((TextWebSocketFrame) msg).text().trim();
            log.info("收到webSocket消息:" + strMsg);
        }
        // Socket消息處理
        else if (msg instanceof ByteBuf) {
            log.info("Socket消息處理=================================");
            ByteBuf buff = (ByteBuf) msg;
            strMsg = buff.toString(CharsetUtil.UTF_8).trim();
            log.info("收到socket消息:" + strMsg);
        }
//        else {
//            strMsg = msg.toString();
//        }
        this.channelWrite(ctx.channel().id(), strMsg);
    }

    /**
     * 有客戶端終止連接服務(wù)器會(huì)觸發(fā)此函數(shù)
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {

        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();

        String clientIp = insocket.getAddress().getHostAddress();

        ChannelId channelId = ctx.channel().id();

        //包含此客戶端才去刪除
        if (General.CHANNEL_MAP.containsKey(channelId)) {
            //刪除連接
            General.CHANNEL_MAP.remove(channelId);
            System.out.println();
            log.info("Socket------客戶端【" + channelId + "】退出netty服務(wù)器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]");
            log.info("Socket------連接通道數(shù)量: " + General.CHANNEL_MAP.size());
            General.CHANNEL_TYPE_MAP.remove(channelId);
        }
    }


    /**
     * 服務(wù)端給客戶端發(fā)送消息
     */
    public void channelWrite(ChannelId channelId, Object msg) throws Exception {

        ChannelHandlerContext ctx = General.CHANNEL_MAP.get(channelId);

        if (ctx == null) {
            log.info("Socket------通道【" + channelId + "】不存在");
            return;
        }

        if (msg == null || msg == "") {
            log.info("Socket------服務(wù)端響應(yīng)空的消息");
            return;
        }

        //將客戶端的信息直接返回寫入ctx
        log.info("Socket------服務(wù)端端返回報(bào)文......【" + channelId + "】" + " :" + (String) msg);
//        ctx.channel().writeAndFlush(msg);
//        ctx.writeAndFlush(msg);
        //刷新緩存區(qū)
//        ctx.flush();
        //過濾掉當(dāng)前通道id
        Set<ChannelId> channelIdSet = General.CHANNEL_MAP.keySet().stream().filter(id -> !id.asLongText().equalsIgnoreCase(channelId.asLongText())).collect(Collectors.toSet());
        //廣播消息到客戶端
        for (ChannelId id : channelIdSet) {
            //是websocket協(xié)議
            Boolean aBoolean = General.CHANNEL_TYPE_MAP.get(id);
            if(aBoolean!=null && aBoolean){
                General.CHANNEL_MAP.get(id).channel().writeAndFlush(new TextWebSocketFrame((String) msg));
            }else {
                ByteBuf byteBuf = Unpooled.copiedBuffer(((String) msg).getBytes());
                General.CHANNEL_MAP.get(id).channel().writeAndFlush(byteBuf);
            }
        }
    }

    /**
     * 處理空閑狀態(tài)事件
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        String socketString = ctx.channel().remoteAddress().toString();

        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                log.info("Socket------Client: " + socketString + " READER_IDLE 讀超時(shí)");
                ctx.disconnect();
            } else if (event.state() == IdleState.WRITER_IDLE) {
                log.info("Socket------Client: " + socketString + " WRITER_IDLE 寫超時(shí)");
                ctx.disconnect();
            } else if (event.state() == IdleState.ALL_IDLE) {
                log.info("Socket------Client: " + socketString + " ALL_IDLE 總超時(shí)");
                ctx.disconnect();
            }
        }
    }

    /**
     * @DESCRIPTION: 發(fā)生異常會(huì)觸發(fā)此函數(shù)
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        log.error("Socket------" + ctx.channel().id() + " 發(fā)生了錯(cuò)誤,此連接被關(guān)閉" + "此時(shí)連通數(shù)量: " + General.CHANNEL_MAP.size(),cause);
    }
}

?(7)spring上下文工具類

package com.example.dzx.netty.qiyanproject.netty1;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * @author dzx
 * @ClassName:
 * @Description: spring容器上下文工具類
 * @date 2023年06月30日 21:30:02
 */
@Component
public class SpringContextUtil implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringContextUtil.applicationContext = applicationContext;
    }

    /**
     * @Description: 獲取spring容器中的bean, 通過bean類型獲取
     */
    public static <T> T getBean(Class<T> beanClass) {
        return applicationContext.getBean(beanClass);
    }

}

(8)編寫全局map常量類

package com.example.dzx.netty.qiyanproject.constants;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @author 500007
 * @ClassName:
 * @Description:
 * @date 2023年07月02日 19:12:42
 */
public class General {

    /**
     * 管理一個(gè)全局map,保存連接進(jìn)服務(wù)端的通道數(shù)量
     */
    public static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();

    /**
     * 管理一個(gè)全局mao, 報(bào)存連接進(jìn)服務(wù)器的各個(gè)通道類型
     */
    public static final ConcurrentHashMap<ChannelId, Boolean> CHANNEL_TYPE_MAP = new ConcurrentHashMap<>();

}

三、netty客戶端

(1)編寫netty客戶端,用于測試向服務(wù)端的消息發(fā)送

package com.example.dzx.netty.qiyanproject.Socket;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author dzx
 * @ClassName:
 * @Description: netty客戶端
 * @date 2023年06月30日 21:30:02
 */
public class SocketClient {
    // 服務(wù)端IP
    static final String HOST = System.getProperty("host", "127.0.0.1");

    // 服務(wù)端開放端口
    static final int PORT = Integer.parseInt(System.getProperty("port", "7777"));

    // 日志打印
    private static final Logger LOGGER = LoggerFactory.getLogger(SocketClient.class);

    // 主函數(shù)啟動(dòng)
    public static void main(String[] args) throws InterruptedException {
        sendMessage("我是客戶端,我發(fā)送了一條數(shù)據(jù)給netty服務(wù)端。。");
    }

    /**
     * 核心方法(處理:服務(wù)端向客戶端發(fā)送的數(shù)據(jù)、客戶端向服務(wù)端發(fā)送的數(shù)據(jù))
     */
    public static void sendMessage(String content) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new SocketChannelInitializer());
                        }
                    });

            ChannelFuture future = b.connect(HOST, PORT).sync();
            for (int i = 0; i < 3; i++) {
                future.channel().writeAndFlush(content);
                Thread.sleep(2000);
            }
            // 程序阻塞
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

}

(2)編寫netty客戶端初始化處理器

package com.example.dzx.netty.qiyanproject.Socket;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;


/**
 * @author dzx
 * @ClassName:
 * @Description: netty客戶端初始化時(shí)設(shè)置出站和入站的編碼器和解碼器
 * @date 2023年06月30日 21:30:02
 */
public class SocketChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        p.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        p.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
        p.addLast(new SocketHandler());
    }
}

(3)netty客戶端業(yè)務(wù)處理器,用于接收并處理服務(wù)端發(fā)送的消息數(shù)據(jù)

package com.example.dzx.netty.qiyanproject.Socket;


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;


/**
 * @author dzx
 * @ClassName:
 * @Description: netty客戶端處理器
 * @date 2023年06月30日 21:30:02
 */
@Slf4j
public class SocketHandler extends ChannelInboundHandlerAdapter {

    // 日志打印
    private static final Logger LOGGER = LoggerFactory.getLogger(SocketHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        LOGGER.debug("SocketHandler Active(客戶端)");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        LOGGER.debug("####接收服務(wù)端發(fā)送過來的消息####");
        LOGGER.debug("SocketHandler read Message:" + msg);
        //獲取服務(wù)端連接的遠(yuǎn)程地址
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        //獲取服務(wù)端的IP地址
        String clientIp = insocket.getAddress().getHostAddress();
        //獲取服務(wù)端的端口號(hào)
        int clientPort = insocket.getPort();
        log.info("netty服務(wù)端[IP:" + clientIp + "--->PORT:" + clientPort + "]");

    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.debug("####客戶端斷開連接####");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

至此,netty服務(wù)端和netty客戶端都編寫完畢,我們可以來進(jìn)行測試了。

四、測試

(1)前端 websocket 向 后端NetAssist測試工具發(fā)送消息

SpringBoot項(xiàng)目整合WebSocket+netty實(shí)現(xiàn)前后端雙向通信(同時(shí)支持前端webSocket和socket協(xié)議哦),netty,springboot,websocket,spring boot,websocket,netty

?SpringBoot項(xiàng)目整合WebSocket+netty實(shí)現(xiàn)前后端雙向通信(同時(shí)支持前端webSocket和socket協(xié)議哦),netty,springboot,websocket,spring boot,websocket,netty

?在前端窗口向后端 發(fā)送了一個(gè) 22222的 字符串,后端測試工具成功接收到消息并展示在對(duì)話框中。

(2)后端NetAssist向 前端 websocket 發(fā)送消息

SpringBoot項(xiàng)目整合WebSocket+netty實(shí)現(xiàn)前后端雙向通信(同時(shí)支持前端webSocket和socket協(xié)議哦),netty,springboot,websocket,spring boot,websocket,netty

?SpringBoot項(xiàng)目整合WebSocket+netty實(shí)現(xiàn)前后端雙向通信(同時(shí)支持前端webSocket和socket協(xié)議哦),netty,springboot,websocket,spring boot,websocket,netty

??在后端窗口向前端 發(fā)送了一個(gè){"deviceId":"11111","deviceName":"qz-01","deviceStatus":"2"}的 字符串,前端測試工具成功接收到消息并展示在對(duì)話框中。

五、代碼倉庫地址

完整項(xiàng)目已上傳至gitee倉庫,請(qǐng)點(diǎn)擊下方傳送門自行獲取,一鍵三連!!

https://gitee.com/dzxmy/netty-web-socketd-dnamic

SpringBoot項(xiàng)目整合WebSocket+netty實(shí)現(xiàn)前后端雙向通信(同時(shí)支持前端webSocket和socket協(xié)議哦),netty,springboot,websocket,spring boot,websocket,netty

無法訪問就點(diǎn)擊下方傳送門去我的資源下載即可

https://download.csdn.net/download/qq_31905135/88044942?spm=1001.2014.3001.5503文章來源地址http://www.zghlxwxcb.cn/news/detail-532767.html

到了這里,關(guān)于SpringBoot項(xiàng)目整合WebSocket+netty實(shí)現(xiàn)前后端雙向通信(同時(shí)支持前端webSocket和socket協(xié)議哦)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • SpringBoot+Netty+Websocket實(shí)現(xiàn)消息推送

    SpringBoot+Netty+Websocket實(shí)現(xiàn)消息推送

    這樣一個(gè)需求:把設(shè)備異常的狀態(tài)每10秒推送到頁面并且以彈窗彈出來,這個(gè)時(shí)候用Websocket最為合適,今天主要是后端代碼展示。 添加依賴 定義netty端口號(hào) netty服務(wù)器 Netty配置 管理全局Channel以及用戶對(duì)應(yīng)的channel(推送消息) 管道配置 自定義CustomChannelHandler 推送消息接口及

    2024年02月04日
    瀏覽(19)
  • Netty系列(一):Springboot整合Netty,自定義協(xié)議實(shí)現(xiàn)

    Netty是由JBOSS提供的一個(gè)java開源框架,現(xiàn)為 Github上的獨(dú)立項(xiàng)目。Netty提供異步的、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶端程序。 也就是說,Netty 是一個(gè)基于NIO的客戶、服務(wù)器端的編程框架,使用Netty 可以確保你快速和簡單

    2023年04月25日
    瀏覽(21)
  • Springboot中使用netty 實(shí)現(xiàn) WebSocket 服務(wù)

    依賴 創(chuàng)建啟動(dòng)類 創(chuàng)建WebSocket 服務(wù) WsServerInitialzer 初始化 創(chuàng)建信息ChatHandler 處理類

    2024年02月14日
    瀏覽(19)
  • Springboot整合Netty,自定義協(xié)議實(shí)現(xiàn)

    Springboot整合Netty,自定義協(xié)議實(shí)現(xiàn)

    新建springboot項(xiàng)目,并在項(xiàng)目以來中導(dǎo)入netty包,用fastjson包處理jsonStr。 創(chuàng)建netty相關(guān)配置信息文件 yml配置文件—— application.yml netty配置實(shí)體類—— NettyProperties 與yml配置文件綁定 通過 @ConfigurationProperties(prefix = \\\"netty\\\") 注解讀取配置文件中的netty配置,通過反射注入值,需要在

    2024年02月06日
    瀏覽(27)
  • Springboot整合Netty實(shí)現(xiàn)RPC服務(wù)器

    Springboot整合Netty實(shí)現(xiàn)RPC服務(wù)器

    try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, worker) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 60)); pipeline.addLast(new JsonDecoder()); pipeline.addLast(new JsonEnco

    2024年04月09日
    瀏覽(26)
  • 基于Springboot用Netty實(shí)現(xiàn)WebSocket及用戶身份校驗(yàn)

    說在前頭,文本主要參考: SpringBoot+WebSocket+Netty實(shí)現(xiàn)消息推送 Netty-11-channelHandler的生命周期 springboot整合netty指北 首先 需要了解下channel建立的生命周期 ChannelHandler的順序如下: 注意本次實(shí)現(xiàn)的重點(diǎn)是:在建立websocket時(shí)從請(qǐng)求標(biāo)頭header或者第一次消息對(duì)話時(shí)獲取用戶信息(如jw

    2024年02月04日
    瀏覽(23)
  • SpringBoot+Netty+Vue+Websocket實(shí)現(xiàn)在線推送/聊天系統(tǒng)

    SpringBoot+Netty+Vue+Websocket實(shí)現(xiàn)在線推送/聊天系統(tǒng)

    ok,那么今天的話也是帶來這個(gè)非常常用的一個(gè)技術(shù),那就是咱們完成nutty的一個(gè)應(yīng)用,今天的話,我會(huì)介紹地很詳細(xì),這樣的話,拿到這個(gè)博文的代碼就基本上可以按照自己的想法去構(gòu)建自己的一個(gè)在線應(yīng)用了。比如聊天,在線消息推送之類的。其實(shí)一開始我原來的想法做在

    2024年02月03日
    瀏覽(26)
  • 基于Springboot+WebSocket+Netty實(shí)現(xiàn)在線聊天、群聊系統(tǒng)

    基于Springboot+WebSocket+Netty實(shí)現(xiàn)在線聊天、群聊系統(tǒng)

    此文主要實(shí)現(xiàn)在好友添加、建群、聊天對(duì)話、群聊功能,使用Java作為后端語言進(jìn)行支持,界面友好,開發(fā)簡單。 2.1、下載安裝IntelliJ IDEA(后端語言開發(fā)工具),Mysql數(shù)據(jù)庫,微信Web開發(fā)者工具。 1.創(chuàng)建maven project 先創(chuàng)建一個(gè)名為SpringBootDemo的項(xiàng)目,選擇【New Project】 然后在彈出

    2024年02月14日
    瀏覽(41)
  • netty學(xué)習(xí)(3):SpringBoot整合netty實(shí)現(xiàn)多個(gè)客戶端與服務(wù)器通信

    netty學(xué)習(xí)(3):SpringBoot整合netty實(shí)現(xiàn)多個(gè)客戶端與服務(wù)器通信

    創(chuàng)建一個(gè)SpringBoot工程,然后創(chuàng)建三個(gè)子模塊 整體工程目錄:一個(gè)server服務(wù)(netty服務(wù)器),兩個(gè)client服務(wù)(netty客戶端) pom文件引入netty依賴,springboot依賴 NettySpringBootApplication NettyServiceHandler SocketInitializer NettyServer NettyStartListener application.yml Client1 NettyClientHandler SocketInitializ

    2024年02月11日
    瀏覽(56)
  • Spring Boot整合WebSocket實(shí)現(xiàn)實(shí)時(shí)通信,前端實(shí)時(shí)通信,前后端實(shí)時(shí)通信

    實(shí)時(shí)通信在現(xiàn)代Web應(yīng)用中扮演著越來越重要的角色,無論是在線聊天、股票價(jià)格更新還是實(shí)時(shí)通知,WebSocket都是實(shí)現(xiàn)這些功能的關(guān)鍵技術(shù)之一。Spring Boot作為一個(gè)簡化企業(yè)級(jí)應(yīng)用開發(fā)的框架,其對(duì)WebSocket的支持也非常友好。本文將詳細(xì)介紹如何在Spring Boot中整合WebSocket,實(shí)現(xiàn)一

    2024年04月27日
    瀏覽(51)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包