国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

手寫RPC框架--5.Netty業(yè)務(wù)邏輯

這篇具有很好參考價值的文章主要介紹了手寫RPC框架--5.Netty業(yè)務(wù)邏輯。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

RPC框架-Gitee代碼(麻煩點個Starred, 支持一下吧)
RPC框架-GitHub代碼(麻煩點個Starred, 支持一下吧)

5.Netty業(yè)務(wù)邏輯

a.加入基礎(chǔ)的Netty代碼

1.在DcyRpcBootstrap類的start()方法中加入netty代碼 (待完善)

/**
 * 啟動netty服務(wù)
 */
public void start() {
    // 1.創(chuàng)建EventLoopGroup,老板只負(fù)責(zé)處理請求,之后會將請求分發(fā)給worker,1比2的比例
    NioEventLoopGroup boss = new NioEventLoopGroup(2);
    NioEventLoopGroup worker = new NioEventLoopGroup(10);

    try{
        // 2.服務(wù)器端啟動輔助對象
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        // 3.配置服務(wù)器
        serverBootstrap = serverBootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // TODO 核心內(nèi)容,需要添加很多入棧和出棧的handler
                        socketChannel.pipeline().addLast(null);
                    }
                });

        // 4.綁定端口
        ChannelFuture channelFuture = serverBootstrap.bind(port).sync();

        // 5.阻塞操作
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        try {
            boss.shutdownGracefully().sync();
            worker.shutdownGracefully().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2.在ReferenceConfig類的get()方法中加入netty代碼 (待完善)

/**
 * 代理設(shè)計模式,生成一個API接口的代理對象
 * @return 代理對象
 */
public T get() {
    // 使用動態(tài)代理完成工作
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    Class[] classes = new Class[]{interfaceRef};

    // 使用動態(tài)代理生成代理對象
    Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // 調(diào)用sayHi()方法,事實上會走進這個代碼段當(dāng)中
            // 已經(jīng)知道m(xù)ethod(具體的方法),args(參數(shù)列表)
            log.info("method-->{}", method.getName());
            log.info("args-->{}", args);

            // 1.發(fā)現(xiàn)服務(wù),從注冊中心,尋找一個可用的服務(wù)
            // 傳入服務(wù)的名字,返回ip+端口 (InetSocketAddress可以封裝端口/ip/host name)
            InetSocketAddress address = registry.lookup(interfaceRef.getName());
            if (log.isInfoEnabled()){
                log.info("服務(wù)調(diào)用方,發(fā)現(xiàn)了服務(wù){(diào)}的可用主機{}", interfaceRef.getName(), address);
            }
            // 2.使用netty連接服務(wù)器,發(fā)送 調(diào)用的 服務(wù)名字+方法名字+參數(shù)列表,得到結(jié)果
            // 定義線程池 EventLoopGroup
            NioEventLoopGroup group = new NioEventLoopGroup();
            // 啟動一個客戶端需要一個輔助類 bootstrap
            Bootstrap bootstrap = new Bootstrap();

            try {
                bootstrap = bootstrap.group(group)
                        .remoteAddress(address)
                        // 選擇初始化一個什么樣的channel
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(null);
                            }
                        });


                // 3.連接到遠(yuǎn)程節(jié)點;等待連接完成
                ChannelFuture channelFuture = bootstrap.connect().sync();
                // 4.獲取channel并且寫數(shù)據(jù),發(fā)送消息到服務(wù)器端
                channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty".getBytes(StandardCharsets.UTF_8)));
                // 5.阻塞程序,等待接收消息
                channelFuture.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                try {
                    group.shutdownGracefully().sync();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            return null;
        }
    });

    return (T) helloProxy;
}

b.對通道channel進行緩存

每次啟動程序都會建立一個新的Netty連接,顯示是對不合適的
解決方案:緩存channel,嘗試從緩存中獲取channel。如果為空,則創(chuàng)建新的連接并進行緩存

1.在DcyRpcBootstrap類的中添加一個全局的緩存:對通道進行緩存

// Netty的連接緩存
public static final Map<InetSocketAddress, Channel> CHANNEL_CACHE = new ConcurrentHashMap<>();

2.在ReferenceConfig類的get()方法中進行修改:查詢緩存是否存在通道(address),若未命中,則建立新的channel并進行緩存

/**
 * 代理設(shè)計模式,生成一個API接口的代理對象
 * @return 代理對象
 */
public T get() {
    // 使用動態(tài)代理完成工作
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    Class[] classes = new Class[]{interfaceRef};

    // 使用動態(tài)代理生成代理對象
    Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // 調(diào)用sayHi()方法,事實上會走進這個代碼段當(dāng)中
            // 已經(jīng)知道m(xù)ethod(具體的方法),args(參數(shù)列表)
            log.info("method-->{}", method.getName());
            log.info("args-->{}", args);

            // 1.發(fā)現(xiàn)服務(wù),從注冊中心,尋找一個可用的服務(wù)
            // 傳入服務(wù)的名字,返回ip+端口 (InetSocketAddress可以封裝端口/ip/host name)
            InetSocketAddress address = registry.lookup(interfaceRef.getName());
            if (log.isInfoEnabled()){
                log.info("服務(wù)調(diào)用方,發(fā)現(xiàn)了服務(wù){(diào)}的可用主機{}", interfaceRef.getName(), address);
            }
            // 2.使用netty連接服務(wù)器,發(fā)送 調(diào)用的 服務(wù)名字+方法名字+參數(shù)列表,得到結(jié)果

            // 每次在這都會建立一個新的連接,對程序不合適
            // 解決方案:緩存channel,嘗試從緩存中獲取channel。如果為空,則創(chuàng)建新的連接并進行緩存
            // 1.從全局緩存中獲取一個通道
            Channel channel = DcyRpcBootstrap.CHANNEL_CACHE.get(address);

            if (channel == null) {
                // 建立新的channel
                // 定義線程池 EventLoopGroup
                NioEventLoopGroup group = new NioEventLoopGroup();
                // 啟動一個客戶端需要一個輔助類 bootstrap
                Bootstrap bootstrap = new Bootstrap();

                try {
                    bootstrap = bootstrap.group(group)
                            .remoteAddress(address)
                            // 選擇初始化一個什么樣的channel
                            .channel(NioSocketChannel.class)
                            .handler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketChannel socketChannel) throws Exception {
                                    socketChannel.pipeline().addLast(null);
                                }
                            });


                    // 3.嘗試連接服務(wù)器
                    channel = bootstrap.connect().sync().channel();
                    // 緩存
                    DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);

                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            if (channel == null){
                throw new NetworkException("獲取通道channel發(fā)生了異常。");
            }
            ChannelFuture channelFuture = channel.writeAndFlush(new Object());

            return null;
        }
    });

    return (T) helloProxy;
}

c.對代碼進行重構(gòu)優(yōu)化

1.在com.dcyrpc.discovery下創(chuàng)建NettyBootstrapInitializer類:提供Bootstrap的單例

/**
 * 提供Bootstrap的單例
 */
public class NettyBootstrapInitializer {

    private static final Bootstrap bootstrap = new Bootstrap();
    
    static {
        NioEventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group)
                // 選擇初始化一個什么樣的channel
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(null);
                    }
                });
    }

    private NettyBootstrapInitializer() {
    }

    public static Bootstrap getBootstrap() {
        return bootstrap;
    }
}

2.在ReferenceConfig類的get()方法中進行代碼的優(yōu)化

/**
 * 代理設(shè)計模式,生成一個API接口的代理對象
 * @return 代理對象
 */
public T get() {
    // 使用動態(tài)代理完成工作
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    Class[] classes = new Class[]{interfaceRef};

    // 使用動態(tài)代理生成代理對象
    Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // 調(diào)用sayHi()方法,事實上會走進這個代碼段當(dāng)中
            // 已經(jīng)知道m(xù)ethod(具體的方法),args(參數(shù)列表)
            log.info("method-->{}", method.getName());
            log.info("args-->{}", args);

            // 1.發(fā)現(xiàn)服務(wù),從注冊中心,尋找一個可用的服務(wù)
            // 傳入服務(wù)的名字,返回ip+端口 (InetSocketAddress可以封裝端口/ip/host name)
            InetSocketAddress address = registry.lookup(interfaceRef.getName());
            if (log.isInfoEnabled()){
                log.info("服務(wù)調(diào)用方,發(fā)現(xiàn)了服務(wù){(diào)}的可用主機{}", interfaceRef.getName(), address);
            }
            // 2.使用netty連接服務(wù)器,發(fā)送 調(diào)用的 服務(wù)名字+方法名字+參數(shù)列表,得到結(jié)果

            // 每次在這都會建立一個新的連接,對程序不合適
            // 解決方案:緩存channel,嘗試從緩存中獲取channel。如果為空,則創(chuàng)建新的連接并進行緩存
            // 1.從全局緩存中獲取一個通道
            Channel channel = DcyRpcBootstrap.CHANNEL_CACHE.get(address);

            if (channel == null) {
                // await()方法會阻塞,會等待連接成功再返回
                // sync和await都是阻塞當(dāng)前線程,獲取返回值。因為連接過程和發(fā)送數(shù)據(jù)過程是異步的
                // 如果發(fā)生了異常,sync會主動在主線程拋出異常,await不會,異常在子線程中處理,需要使用future處理
//                    channel = NettyBootstrapInitializer.getBootstrap().connect(address).await().channel();

                // 使用addListener執(zhí)行異步操作
                CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
                NettyBootstrapInitializer.getBootstrap().connect(address).addListener((ChannelFutureListener) promise -> {
                    if (promise.isDone()) {
                        // 異步的,已經(jīng)完成
                        log.info("已經(jīng)和【{}】成功建立連接。", address);
                        channelFuture.complete(promise.channel());
                    } else if (!promise.isSuccess()) {
                        channelFuture.completeExceptionally(promise.cause());
                    }
                });

                // 阻塞獲取channel
                channel = channelFuture.get(3, TimeUnit.SECONDS);

                // 緩存channel
                DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
            }
            if (channel == null){
                throw new NetworkException("獲取通道channel發(fā)生了異常。");
            }

            /**
             * ---------------------------同步策略---------------------------
             */
//                ChannelFuture channelFuture = channel.writeAndFlush(new Object()).await();
//                // get()阻塞獲取結(jié)果
//                // getNow()獲取當(dāng)前的結(jié)果,如果未處理完成,返回null
//                if (channelFuture.isDone()) {
//                    Object object = channelFuture.getNow();
//                } else if (!channelFuture.isSuccess()) {
//                    // 發(fā)生問題,需要捕獲異常。
//                    // 子線程可以捕獲異步任務(wù)的異常
//                    Throwable cause = channelFuture.cause();
//                    throw new RuntimeException(cause);
//                }

            /**
             * ---------------------------異步策略---------------------------
             */
            CompletableFuture<Object> completableFuture = new CompletableFuture<>();
            // TODO 需要將completableFuture暴露出去
            channel.writeAndFlush(Unpooled.copiedBuffer("hello".getBytes())).addListener((ChannelFutureListener) promise -> {
                // 當(dāng)前的promise返回的結(jié)果是,writeAndFlush的返回結(jié)果
                // 一旦數(shù)據(jù)被寫出去,這個promise也就結(jié)束了
//                    if (promise.isDone()) {
//                        completableFuture.complete(promise.getNow());
//                    }

                // 只需要處理異常
                if (!promise.isSuccess()) {
                    completableFuture.completeExceptionally(promise.cause());
                }
            });

            return completableFuture.get(3, TimeUnit.SECONDS);
        }
    });

    return (T) helloProxy;
}

d.完成基礎(chǔ)通信

1.在DcyRpcBootstrap類的start()方法中添加 handler:SimpleChannelInboundHandler

/**
 * 啟動netty服務(wù)
 */
public void start() {
    // 1.創(chuàng)建EventLoopGroup,老板只負(fù)責(zé)處理請求,之后會將請求分發(fā)給worker,1比2的比例
    NioEventLoopGroup boss = new NioEventLoopGroup(2);
    NioEventLoopGroup worker = new NioEventLoopGroup(10);

    try{
        // 2.服務(wù)器端啟動輔助對象
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        // 3.配置服務(wù)器
        serverBootstrap = serverBootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // TODO 核心內(nèi)容,需要添加很多入棧和出棧的handler
                        socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<Object>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                log.info("byteBuf --> {}", byteBuf.toString(Charset.defaultCharset()));

                                channelHandlerContext.channel().writeAndFlush(Unpooled.copiedBuffer("dcyrpc--hello".getBytes()));
                            }
                        });
                    }
                });

        // 4.綁定端口
        ChannelFuture channelFuture = serverBootstrap.bind(port).sync();

        // 5.阻塞操作
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        try {
            boss.shutdownGracefully().sync();
            worker.shutdownGracefully().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2.在NettyBootstrapInitializer類的初始化Netty的靜態(tài)代碼塊中添加 handler:SimpleChannelInboundHandler

static {
    NioEventLoopGroup group = new NioEventLoopGroup();
    bootstrap.group(group)
            // 選擇初始化一個什么樣的channel
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                            log.info("msg --> {}", msg.toString(Charset.defaultCharset()));
                        }
                    });
                }
            });
}

e.異步獲取服務(wù)器的返回結(jié)果

1.在DcyRpcBootstrap類的中添加一個全局的對外掛起的 completableFuture

// 定義全局的對外掛起的 completableFuture
public static final Map<Long, CompletableFuture<Object>> PENDING_REQUEST = new HashMap<>(128);

2.在ReferenceConfig類中的get()方法完成對,completableFuture暴露出去

/**
 * 代理設(shè)計模式,生成一個API接口的代理對象
 * @return 代理對象
 */
public T get() {
    // 使用動態(tài)代理完成工作
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    Class[] classes = new Class[]{interfaceRef};

    // 使用動態(tài)代理生成代理對象
    Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // 調(diào)用sayHi()方法,事實上會走進這個代碼段當(dāng)中
            // 已經(jīng)知道m(xù)ethod(具體的方法),args(參數(shù)列表)
            log.info("method-->{}", method.getName());
            log.info("args-->{}", args);

            // 1.發(fā)現(xiàn)服務(wù),從注冊中心,尋找一個可用的服務(wù)
            // 傳入服務(wù)的名字,返回ip+端口 (InetSocketAddress可以封裝端口/ip/host name)
            InetSocketAddress address = registry.lookup(interfaceRef.getName());
            if (log.isInfoEnabled()){
                log.info("服務(wù)調(diào)用方,發(fā)現(xiàn)了服務(wù){(diào)}的可用主機{}", interfaceRef.getName(), address);
            }
            
            // 1.從全局緩存中獲取一個通道
            Channel channel = DcyRpcBootstrap.CHANNEL_CACHE.get(address);

            if (channel == null) {
                // 使用addListener執(zhí)行異步操作
                CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
                NettyBootstrapInitializer.getBootstrap().connect(address).addListener((ChannelFutureListener) promise -> {
                    if (promise.isDone()) {
                        // 異步的,已經(jīng)完成
                        log.info("已經(jīng)和【{}】成功建立連接。", address);
                        channelFuture.complete(promise.channel());
                    } else if (!promise.isSuccess()) {
                        channelFuture.completeExceptionally(promise.cause());
                    }
                });

                // 阻塞獲取channel
                channel = channelFuture.get(3, TimeUnit.SECONDS);

                // 緩存channel
                DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
            }
            if (channel == null){
                log.error("獲取或建立與【{}】通道時發(fā)生了異常。", address);
                throw new NetworkException("獲取通道時發(fā)生了異常。");
            }

            CompletableFuture<Object> completableFuture = new CompletableFuture<>();
            // TODO 需要將completableFuture暴露出去
            DcyRpcBootstrap.PENDING_REQUEST.put(1L, completableFuture);

            channel.writeAndFlush(Unpooled.copiedBuffer("hello".getBytes())).addListener((ChannelFutureListener) promise -> {

                // 只需要處理異常
                if (!promise.isSuccess()) {
                    completableFuture.completeExceptionally(promise.cause());
                }
            });

            // 如果沒有地方處理這個completableFuture,這里會阻塞等待 complete 方法的執(zhí)行
            // 在Netty的pipeline中最終的handler的處理結(jié)果 調(diào)用complete
            return completableFuture.get(10, TimeUnit.SECONDS);
        }
    });

    return (T) helloProxy;
}

3.在NettyBootstrapInitializer類的初始化Netty的靜態(tài)代碼塊中:尋找與之匹配的待處理 completeFuture

tatic {
    NioEventLoopGroup group = new NioEventLoopGroup();
    bootstrap.group(group)
            // 選擇初始化一個什么樣的channel
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                            // 異步
                            // 服務(wù)提供方,給予的結(jié)果
                            String result = msg.toString(Charset.defaultCharset());
                            // 從全局的掛起的請求中,尋找與之匹配的待處理 completeFuture
                            CompletableFuture<Object> completableFuture = DcyRpcBootstrap.PENDING_REQUEST.get(1L);
                            completableFuture.complete(result);
                        }
                    });
                }
            });
}

f.調(diào)整代碼

在core模塊com.dcyrpc下創(chuàng)建proxy.handler

在handler包下創(chuàng)建RpcConsumerInvocationHandler類,實現(xiàn)InvocationHandler接口

  • ReferenceConfig類下的InvocationHandler匿名內(nèi)部類拷貝到該RpcConsumerInvocationHandler類中
/**
 * 該類封裝了客戶端通信的基礎(chǔ)邏輯,每一個代理對象的遠(yuǎn)程調(diào)用過程都封裝在invoke方法中
 * 1.發(fā)現(xiàn)可用服務(wù)
 * 2.建立連接
 * 3.發(fā)送請求
 * 4.得到結(jié)果
 */
@Slf4j
public class RpcConsumerInvocationHandler implements InvocationHandler {

    // 接口
    private Class<?> interfaceRef;

    // 注冊中心
    private Registry registry;

    public RpcConsumerInvocationHandler(Class<?> interfaceRef, Registry registry) {
        this.interfaceRef = interfaceRef;
        this.registry = registry;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        // 1.發(fā)現(xiàn)服務(wù),從注冊中心,尋找一個可用的服務(wù)
        //  - 傳入服務(wù)的名字,返回ip+端口 (InetSocketAddress可以封裝端口/ip/host name)
        InetSocketAddress address = registry.lookup(interfaceRef.getName());
        if (log.isInfoEnabled()){
            log.info("服務(wù)調(diào)用方,發(fā)現(xiàn)了服務(wù){(diào)}的可用主機{}", interfaceRef.getName(), address);
        }

        // 2.嘗試獲取一個可用的通道
        Channel channel = getAvailableChannel(address);
        if (log.isInfoEnabled()){
            log.info("獲取了和【{}】建立的連接通道,準(zhǔn)備發(fā)送數(shù)據(jù)", address);
        }

        /**
         * ---------------------------封裝報文---------------------------
         */
        // 3.封裝報文

        /**
         * ---------------------------同步策略---------------------------
         */
//                ChannelFuture channelFuture = channel.writeAndFlush(new Object()).await();
//                // get()阻塞獲取結(jié)果
//                // getNow()獲取當(dāng)前的結(jié)果,如果未處理完成,返回null
//                if (channelFuture.isDone()) {
//                    Object object = channelFuture.getNow();
//                } else if (!channelFuture.isSuccess()) {
//                    // 發(fā)生問題,需要捕獲異常。
//                    // 子線程可以捕獲異步任務(wù)的異常
//                    Throwable cause = channelFuture.cause();
//                    throw new RuntimeException(cause);
//                }

        /**
         * ---------------------------異步策略---------------------------
         */

        // 4.寫出報文
        CompletableFuture<Object> completableFuture = new CompletableFuture<>();
        // 將completableFuture暴露出去
        DcyRpcBootstrap.PENDING_REQUEST.put(1L, completableFuture);

        channel.writeAndFlush(Unpooled.copiedBuffer("hello".getBytes())).addListener((ChannelFutureListener) promise -> {
            // 需要處理異常
            if (!promise.isSuccess()) {
                completableFuture.completeExceptionally(promise.cause());
            }
        });

        // 如果沒有地方處理這個completableFuture,這里會阻塞等待 complete 方法的執(zhí)行
        // 在Netty的pipeline中最終的handler的處理結(jié)果 調(diào)用complete
        // 5.獲得響應(yīng)的結(jié)果
        return completableFuture.get(10, TimeUnit.SECONDS);
    }

    /**
     * 根據(jù)地址獲取一個可用的通道
     * @param address
     * @return
     */
    private Channel getAvailableChannel(InetSocketAddress address) {
        // 1.嘗試從緩存中獲取通道
        Channel channel = DcyRpcBootstrap.CHANNEL_CACHE.get(address);

        // 2.拿不到就建立新連接
        if (channel == null) {

            // 使用addListener執(zhí)行異步操作
            CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
            NettyBootstrapInitializer.getBootstrap().connect(address).addListener((ChannelFutureListener) promise -> {
                if (promise.isDone()) {
                    // 異步的,已經(jīng)完成
                    log.info("已經(jīng)和【{}】成功建立連接。", address);
                    channelFuture.complete(promise.channel());
                } else if (!promise.isSuccess()) {
                    channelFuture.completeExceptionally(promise.cause());
                }
            });

            // 阻塞獲取channel
            try {
                channel = channelFuture.get(3, TimeUnit.SECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                log.error("獲取通道時發(fā)生異常。{}", e);
                throw new DiscoveryException(e);
            }

            // 緩存channel
            DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
        }

        // 3.建立連接失敗
        if (channel == null){
            log.error("獲取或建立與【{}】通道時發(fā)生了異常。", address);
            throw new NetworkException("獲取通道時發(fā)生了異常。");
        }

        // 4.返回通道
        return channel;
    }
}

ReferenceConfig類的get()方法被修改為:讓整個代碼可讀性更高,更簡潔

/**
 * 代理設(shè)計模式,生成一個API接口的代理對象
 * @return 代理對象
 */
public T get() {
    // 使用動態(tài)代理完成工作
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    Class<T>[] classes = new Class[]{interfaceRef};
    InvocationHandler handler = new RpcConsumerInvocationHandler(interfaceRef, registry);

    // 使用動態(tài)代理生成代理對象
    Object helloProxy = Proxy.newProxyInstance(classLoader, classes, handler);

    return (T) helloProxy;
}

g.處理handler (優(yōu)化)

在core模塊com.dcyrpc下創(chuàng)建channelhandler.handler

channelhandler.handler包下創(chuàng)建MySimpleChannelInboundHandler類:處理響應(yīng)結(jié)果

繼承 SimpleChannelInboundHandler<ByteBuf>,重寫read0方法

拷貝NettyBootstrapInitializer靜態(tài)代碼塊中的匿名內(nèi)部類SimpleChannelInboundHandler的代碼

public class MySimpleChannelInboundHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        // 異步
        // 服務(wù)提供方,給予的結(jié)果
        String result = msg.toString(Charset.defaultCharset());
        // 從全局的掛起的請求中,尋找與之匹配的待處理 completeFuture
        CompletableFuture<Object> completableFuture = DcyRpcBootstrap.PENDING_REQUEST.get(1L);
        completableFuture.complete(result);
    }
}

channelhandler包下創(chuàng)建ConsumerChannelInitializer,繼承 ChannelInitializer<SocketChannel>,重寫initChannel方法

拷貝NettyBootstrapInitializer靜態(tài)代碼塊中的匿名內(nèi)部類ChannelInitializer的代碼

public class ConsumerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new MySimpleChannelInboundHandler());
    }
}

NettyBootstrapInitializer類的初始化Netty的靜態(tài)代碼塊中:優(yōu)化handler的匿名內(nèi)部類文章來源地址http://www.zghlxwxcb.cn/news/detail-703330.html

static {
    NioEventLoopGroup group = new NioEventLoopGroup();
    bootstrap.group(group)
            // 選擇初始化一個什么樣的channel
            .channel(NioSocketChannel.class)
            .handler(new ConsumerChannelInitializer());
}

到了這里,關(guān)于手寫RPC框架--5.Netty業(yè)務(wù)邏輯的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 【Java框架】RPC遠(yuǎn)程調(diào)用

    RPC(Remote Procedure Call)叫作遠(yuǎn)程過程調(diào)用,它是利用網(wǎng)絡(luò)從遠(yuǎn)程計算機上請求服務(wù),可以理解為把程序的一部分放在其他遠(yuǎn)程計算機上執(zhí)行。通過網(wǎng)絡(luò)通信將調(diào)用請求發(fā)送至遠(yuǎn)程計算機后,利用遠(yuǎn)程計算機的系統(tǒng)資源執(zhí)行這部分程序,最終返回遠(yuǎn)程計算機上的執(zhí)行結(jié)果。 RP

    2024年02月15日
    瀏覽(30)
  • 10 - 網(wǎng)絡(luò)通信優(yōu)化之通信協(xié)議:如何優(yōu)化RPC網(wǎng)絡(luò)通信?

    10 - 網(wǎng)絡(luò)通信優(yōu)化之通信協(xié)議:如何優(yōu)化RPC網(wǎng)絡(luò)通信?

    微服務(wù)框架中 SpringCloud 和 Dubbo 的使用最為廣泛,行業(yè)內(nèi)也一直存在著對兩者的比較,很多技術(shù)人會為這兩個框架哪個更好而爭辯。 我記得我們部門在搭建微服務(wù)框架時,也在技術(shù)選型上糾結(jié)良久,還曾一度有過激烈的討論。當(dāng)前 SpringCloud 炙手可熱,具備完整的微服務(wù)生態(tài),

    2024年02月11日
    瀏覽(24)
  • Netty優(yōu)化-rpc

    1.3 RPC 框架 1)準(zhǔn)備工作 這些代碼可以認(rèn)為是現(xiàn)成的,無需從頭編寫練習(xí) 為了簡化起見,在原來聊天項目的基礎(chǔ)上新增 Rpc 請求和響應(yīng)消息 請求消息 響應(yīng)消息 服務(wù)器架子 服務(wù)器 handler 客戶端架子 客戶端handler 服務(wù)器端的 service 獲取 相關(guān)配置 application.properties 業(yè)務(wù)類 計數(shù)器

    2024年02月08日
    瀏覽(17)
  • RPC分布式網(wǎng)絡(luò)通信框架(二)—— moduo網(wǎng)絡(luò)解析

    RPC分布式網(wǎng)絡(luò)通信框架(二)—— moduo網(wǎng)絡(luò)解析

    網(wǎng)絡(luò)部分,包括尋找rpc服務(wù)主機,發(fā)起rpc調(diào)用請求和響應(yīng)rpc調(diào)用結(jié)果,使用muduo網(wǎng)絡(luò)和zookeeper 服務(wù)配置中心 (專門做服務(wù)發(fā)現(xiàn)) 其中MprpcApplication類負(fù)責(zé)框架的一些初始化操作,注意去除類拷貝構(gòu)造和移動構(gòu)造函數(shù)(實現(xiàn)單例模式)。其中項目還構(gòu)建了MprpcConfig類負(fù)責(zé)讀取服

    2024年02月17日
    瀏覽(27)
  • 基于netty的rpc遠(yuǎn)程調(diào)用

    ??????這是一個手寫RPC項目,用于實現(xiàn)遠(yuǎn)程過程調(diào)用(RPC)通信?????? 歡迎star串門 : https://github.com/red-velet/ ??Q-PRC 簡單的RPC框架的實現(xiàn) :該RPC框架實現(xiàn)了基本的遠(yuǎn)程過程調(diào)用功能,允許客戶端通過網(wǎng)絡(luò)調(diào)用遠(yuǎn)程服務(wù)的方法,實現(xiàn)分布式系統(tǒng)之間的通信和協(xié)作。 基于

    2024年02月14日
    瀏覽(19)
  • RPC分布式網(wǎng)絡(luò)通信框架(一)—— protobuf的使用

    RPC分布式網(wǎng)絡(luò)通信框架(一)—— protobuf的使用

    常見序列化和反序列化協(xié)議有XML、JSON、protobuf,相比于其他protobuf更有優(yōu)勢: 1、protobuf是二進制存儲的,xml和json都是文本存儲的。故protobuf占用帶寬較低 2、protobuf不需要存儲額外的信息。 json如何存儲數(shù)據(jù)?鍵值對。例:Name:”zhang san”, pwd: “12345”。 protobuf存儲數(shù)據(jù)的方式

    2024年02月16日
    瀏覽(27)
  • 【Flink網(wǎng)絡(luò)通訊(一)】Flink RPC框架的整體設(shè)計

    【Flink網(wǎng)絡(luò)通訊(一)】Flink RPC框架的整體設(shè)計

    我們從整體的角度看一下Flink RPC通信框架的設(shè)計與實現(xiàn),了解其底層Akka通信框架的基礎(chǔ)概念及二者之間的關(guān)系。 ? Akka是使用Scala語言編寫的庫,用于在JVM上簡化編寫具有可容錯、高可伸縮性的Java或Scala的Actor模型。Akka基于Actor模型,提供了一個用于構(gòu)建可擴展、彈性、快速響

    2024年02月21日
    瀏覽(21)
  • 手寫簡單的RPC

    手寫簡單的RPC

    RPC(Remote Procedure Call,遠(yuǎn)程過程調(diào)用)是一種通過網(wǎng)絡(luò)從遠(yuǎn)程計算機程序上請求服務(wù),而不需要了解底層網(wǎng)絡(luò)技術(shù)的協(xié)議。RPC協(xié)議假定某些傳輸協(xié)議的存在,如TCP或UDP,為通信程序之間攜帶信息數(shù)據(jù)。在OSI網(wǎng)絡(luò)通信模型中,RPC跨越了傳輸層和應(yīng)用層。RPC使得開發(fā)包括網(wǎng)絡(luò)分布

    2024年04月22日
    瀏覽(19)
  • 如何手寫一個RPC?

    如何手寫一個RPC?

    在學(xué)習(xí) RPC 框架之前,我們先來手寫一個RPC。 我們在學(xué)習(xí)的過程中,一定要做到知其然,還要知其所以然。 單體架構(gòu) 要知道,在以前單體架構(gòu)的時候,會將所有的應(yīng)用功能都集中在一個服務(wù)當(dāng)中。 單體架構(gòu)初始開發(fā)簡單,所有的功能都在一個項目中,容易理解整個應(yīng)用的業(yè)

    2024年01月17日
    瀏覽(19)
  • 手寫rpc和redis

    rpc框架搭建 consumer 消費者應(yīng)用 provider 提供的服務(wù) Provider-common 公共類模塊 rpc 架構(gòu) service-Registration 服務(wù)發(fā)現(xiàn) nacos nacos配置中心 load-balancing 負(fù)載均衡 redis-trench 手寫redis實現(xiàn)和鏈接 rpc框架核心代碼 相關(guān)的gitub倉庫地址:(https://github.com/zhaoyiwen-wuxian/RpcTrench.git) master分支,進行切

    2024年01月24日
    瀏覽(17)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包