RPC框架-Gitee代碼(麻煩點個Starred, 支持一下吧)
RPC框架-GitHub代碼(麻煩點個Starred, 支持一下吧)
5.Netty業(yè)務(wù)邏輯
a.加入基礎(chǔ)的Netty代碼
1.在DcyRpcBootstrap
類的start()
方法中加入netty代碼 (待完善)
/**
* 啟動netty服務(wù)
*/
public void start() {
// 1.創(chuàng)建EventLoopGroup,老板只負(fù)責(zé)處理請求,之后會將請求分發(fā)給worker,1比2的比例
NioEventLoopGroup boss = new NioEventLoopGroup(2);
NioEventLoopGroup worker = new NioEventLoopGroup(10);
try{
// 2.服務(wù)器端啟動輔助對象
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 3.配置服務(wù)器
serverBootstrap = serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// TODO 核心內(nèi)容,需要添加很多入棧和出棧的handler
socketChannel.pipeline().addLast(null);
}
});
// 4.綁定端口
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
// 5.阻塞操作
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
boss.shutdownGracefully().sync();
worker.shutdownGracefully().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2.在ReferenceConfig
類的get()
方法中加入netty代碼 (待完善)
/**
* 代理設(shè)計模式,生成一個API接口的代理對象
* @return 代理對象
*/
public T get() {
// 使用動態(tài)代理完成工作
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class[] classes = new Class[]{interfaceRef};
// 使用動態(tài)代理生成代理對象
Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 調(diào)用sayHi()方法,事實上會走進這個代碼段當(dāng)中
// 已經(jīng)知道m(xù)ethod(具體的方法),args(參數(shù)列表)
log.info("method-->{}", method.getName());
log.info("args-->{}", args);
// 1.發(fā)現(xiàn)服務(wù),從注冊中心,尋找一個可用的服務(wù)
// 傳入服務(wù)的名字,返回ip+端口 (InetSocketAddress可以封裝端口/ip/host name)
InetSocketAddress address = registry.lookup(interfaceRef.getName());
if (log.isInfoEnabled()){
log.info("服務(wù)調(diào)用方,發(fā)現(xiàn)了服務(wù){(diào)}的可用主機{}", interfaceRef.getName(), address);
}
// 2.使用netty連接服務(wù)器,發(fā)送 調(diào)用的 服務(wù)名字+方法名字+參數(shù)列表,得到結(jié)果
// 定義線程池 EventLoopGroup
NioEventLoopGroup group = new NioEventLoopGroup();
// 啟動一個客戶端需要一個輔助類 bootstrap
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap = bootstrap.group(group)
.remoteAddress(address)
// 選擇初始化一個什么樣的channel
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(null);
}
});
// 3.連接到遠(yuǎn)程節(jié)點;等待連接完成
ChannelFuture channelFuture = bootstrap.connect().sync();
// 4.獲取channel并且寫數(shù)據(jù),發(fā)送消息到服務(wù)器端
channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty".getBytes(StandardCharsets.UTF_8)));
// 5.阻塞程序,等待接收消息
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
try {
group.shutdownGracefully().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return null;
}
});
return (T) helloProxy;
}
b.對通道channel進行緩存
每次啟動程序都會建立一個新的Netty連接,顯示是對不合適的
解決方案:緩存channel,嘗試從緩存中獲取channel。如果為空,則創(chuàng)建新的連接并進行緩存
1.在DcyRpcBootstrap
類的中添加一個全局的緩存:對通道進行緩存
// Netty的連接緩存
public static final Map<InetSocketAddress, Channel> CHANNEL_CACHE = new ConcurrentHashMap<>();
2.在ReferenceConfig
類的get()
方法中進行修改:查詢緩存是否存在通道(address),若未命中,則建立新的channel并進行緩存
/**
* 代理設(shè)計模式,生成一個API接口的代理對象
* @return 代理對象
*/
public T get() {
// 使用動態(tài)代理完成工作
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class[] classes = new Class[]{interfaceRef};
// 使用動態(tài)代理生成代理對象
Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 調(diào)用sayHi()方法,事實上會走進這個代碼段當(dāng)中
// 已經(jīng)知道m(xù)ethod(具體的方法),args(參數(shù)列表)
log.info("method-->{}", method.getName());
log.info("args-->{}", args);
// 1.發(fā)現(xiàn)服務(wù),從注冊中心,尋找一個可用的服務(wù)
// 傳入服務(wù)的名字,返回ip+端口 (InetSocketAddress可以封裝端口/ip/host name)
InetSocketAddress address = registry.lookup(interfaceRef.getName());
if (log.isInfoEnabled()){
log.info("服務(wù)調(diào)用方,發(fā)現(xiàn)了服務(wù){(diào)}的可用主機{}", interfaceRef.getName(), address);
}
// 2.使用netty連接服務(wù)器,發(fā)送 調(diào)用的 服務(wù)名字+方法名字+參數(shù)列表,得到結(jié)果
// 每次在這都會建立一個新的連接,對程序不合適
// 解決方案:緩存channel,嘗試從緩存中獲取channel。如果為空,則創(chuàng)建新的連接并進行緩存
// 1.從全局緩存中獲取一個通道
Channel channel = DcyRpcBootstrap.CHANNEL_CACHE.get(address);
if (channel == null) {
// 建立新的channel
// 定義線程池 EventLoopGroup
NioEventLoopGroup group = new NioEventLoopGroup();
// 啟動一個客戶端需要一個輔助類 bootstrap
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap = bootstrap.group(group)
.remoteAddress(address)
// 選擇初始化一個什么樣的channel
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(null);
}
});
// 3.嘗試連接服務(wù)器
channel = bootstrap.connect().sync().channel();
// 緩存
DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
if (channel == null){
throw new NetworkException("獲取通道channel發(fā)生了異常。");
}
ChannelFuture channelFuture = channel.writeAndFlush(new Object());
return null;
}
});
return (T) helloProxy;
}
c.對代碼進行重構(gòu)優(yōu)化
1.在com.dcyrpc.discovery
下創(chuàng)建NettyBootstrapInitializer
類:提供Bootstrap的單例
/**
* 提供Bootstrap的單例
*/
public class NettyBootstrapInitializer {
private static final Bootstrap bootstrap = new Bootstrap();
static {
NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group)
// 選擇初始化一個什么樣的channel
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(null);
}
});
}
private NettyBootstrapInitializer() {
}
public static Bootstrap getBootstrap() {
return bootstrap;
}
}
2.在ReferenceConfig
類的get()
方法中進行代碼的優(yōu)化
/**
* 代理設(shè)計模式,生成一個API接口的代理對象
* @return 代理對象
*/
public T get() {
// 使用動態(tài)代理完成工作
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class[] classes = new Class[]{interfaceRef};
// 使用動態(tài)代理生成代理對象
Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 調(diào)用sayHi()方法,事實上會走進這個代碼段當(dāng)中
// 已經(jīng)知道m(xù)ethod(具體的方法),args(參數(shù)列表)
log.info("method-->{}", method.getName());
log.info("args-->{}", args);
// 1.發(fā)現(xiàn)服務(wù),從注冊中心,尋找一個可用的服務(wù)
// 傳入服務(wù)的名字,返回ip+端口 (InetSocketAddress可以封裝端口/ip/host name)
InetSocketAddress address = registry.lookup(interfaceRef.getName());
if (log.isInfoEnabled()){
log.info("服務(wù)調(diào)用方,發(fā)現(xiàn)了服務(wù){(diào)}的可用主機{}", interfaceRef.getName(), address);
}
// 2.使用netty連接服務(wù)器,發(fā)送 調(diào)用的 服務(wù)名字+方法名字+參數(shù)列表,得到結(jié)果
// 每次在這都會建立一個新的連接,對程序不合適
// 解決方案:緩存channel,嘗試從緩存中獲取channel。如果為空,則創(chuàng)建新的連接并進行緩存
// 1.從全局緩存中獲取一個通道
Channel channel = DcyRpcBootstrap.CHANNEL_CACHE.get(address);
if (channel == null) {
// await()方法會阻塞,會等待連接成功再返回
// sync和await都是阻塞當(dāng)前線程,獲取返回值。因為連接過程和發(fā)送數(shù)據(jù)過程是異步的
// 如果發(fā)生了異常,sync會主動在主線程拋出異常,await不會,異常在子線程中處理,需要使用future處理
// channel = NettyBootstrapInitializer.getBootstrap().connect(address).await().channel();
// 使用addListener執(zhí)行異步操作
CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
NettyBootstrapInitializer.getBootstrap().connect(address).addListener((ChannelFutureListener) promise -> {
if (promise.isDone()) {
// 異步的,已經(jīng)完成
log.info("已經(jīng)和【{}】成功建立連接。", address);
channelFuture.complete(promise.channel());
} else if (!promise.isSuccess()) {
channelFuture.completeExceptionally(promise.cause());
}
});
// 阻塞獲取channel
channel = channelFuture.get(3, TimeUnit.SECONDS);
// 緩存channel
DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
}
if (channel == null){
throw new NetworkException("獲取通道channel發(fā)生了異常。");
}
/**
* ---------------------------同步策略---------------------------
*/
// ChannelFuture channelFuture = channel.writeAndFlush(new Object()).await();
// // get()阻塞獲取結(jié)果
// // getNow()獲取當(dāng)前的結(jié)果,如果未處理完成,返回null
// if (channelFuture.isDone()) {
// Object object = channelFuture.getNow();
// } else if (!channelFuture.isSuccess()) {
// // 發(fā)生問題,需要捕獲異常。
// // 子線程可以捕獲異步任務(wù)的異常
// Throwable cause = channelFuture.cause();
// throw new RuntimeException(cause);
// }
/**
* ---------------------------異步策略---------------------------
*/
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
// TODO 需要將completableFuture暴露出去
channel.writeAndFlush(Unpooled.copiedBuffer("hello".getBytes())).addListener((ChannelFutureListener) promise -> {
// 當(dāng)前的promise返回的結(jié)果是,writeAndFlush的返回結(jié)果
// 一旦數(shù)據(jù)被寫出去,這個promise也就結(jié)束了
// if (promise.isDone()) {
// completableFuture.complete(promise.getNow());
// }
// 只需要處理異常
if (!promise.isSuccess()) {
completableFuture.completeExceptionally(promise.cause());
}
});
return completableFuture.get(3, TimeUnit.SECONDS);
}
});
return (T) helloProxy;
}
d.完成基礎(chǔ)通信
1.在DcyRpcBootstrap
類的start()
方法中添加 handler:SimpleChannelInboundHandler
/**
* 啟動netty服務(wù)
*/
public void start() {
// 1.創(chuàng)建EventLoopGroup,老板只負(fù)責(zé)處理請求,之后會將請求分發(fā)給worker,1比2的比例
NioEventLoopGroup boss = new NioEventLoopGroup(2);
NioEventLoopGroup worker = new NioEventLoopGroup(10);
try{
// 2.服務(wù)器端啟動輔助對象
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 3.配置服務(wù)器
serverBootstrap = serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// TODO 核心內(nèi)容,需要添加很多入棧和出棧的handler
socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<Object>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
log.info("byteBuf --> {}", byteBuf.toString(Charset.defaultCharset()));
channelHandlerContext.channel().writeAndFlush(Unpooled.copiedBuffer("dcyrpc--hello".getBytes()));
}
});
}
});
// 4.綁定端口
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
// 5.阻塞操作
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
boss.shutdownGracefully().sync();
worker.shutdownGracefully().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2.在NettyBootstrapInitializer
類的初始化Netty的靜態(tài)代碼塊中添加 handler:SimpleChannelInboundHandler
static {
NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group)
// 選擇初始化一個什么樣的channel
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
log.info("msg --> {}", msg.toString(Charset.defaultCharset()));
}
});
}
});
}
e.異步獲取服務(wù)器的返回結(jié)果
1.在DcyRpcBootstrap
類的中添加一個全局的對外掛起的 completableFuture
// 定義全局的對外掛起的 completableFuture
public static final Map<Long, CompletableFuture<Object>> PENDING_REQUEST = new HashMap<>(128);
2.在ReferenceConfig
類中的get()
方法完成對,completableFuture暴露出去
/**
* 代理設(shè)計模式,生成一個API接口的代理對象
* @return 代理對象
*/
public T get() {
// 使用動態(tài)代理完成工作
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class[] classes = new Class[]{interfaceRef};
// 使用動態(tài)代理生成代理對象
Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 調(diào)用sayHi()方法,事實上會走進這個代碼段當(dāng)中
// 已經(jīng)知道m(xù)ethod(具體的方法),args(參數(shù)列表)
log.info("method-->{}", method.getName());
log.info("args-->{}", args);
// 1.發(fā)現(xiàn)服務(wù),從注冊中心,尋找一個可用的服務(wù)
// 傳入服務(wù)的名字,返回ip+端口 (InetSocketAddress可以封裝端口/ip/host name)
InetSocketAddress address = registry.lookup(interfaceRef.getName());
if (log.isInfoEnabled()){
log.info("服務(wù)調(diào)用方,發(fā)現(xiàn)了服務(wù){(diào)}的可用主機{}", interfaceRef.getName(), address);
}
// 1.從全局緩存中獲取一個通道
Channel channel = DcyRpcBootstrap.CHANNEL_CACHE.get(address);
if (channel == null) {
// 使用addListener執(zhí)行異步操作
CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
NettyBootstrapInitializer.getBootstrap().connect(address).addListener((ChannelFutureListener) promise -> {
if (promise.isDone()) {
// 異步的,已經(jīng)完成
log.info("已經(jīng)和【{}】成功建立連接。", address);
channelFuture.complete(promise.channel());
} else if (!promise.isSuccess()) {
channelFuture.completeExceptionally(promise.cause());
}
});
// 阻塞獲取channel
channel = channelFuture.get(3, TimeUnit.SECONDS);
// 緩存channel
DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
}
if (channel == null){
log.error("獲取或建立與【{}】通道時發(fā)生了異常。", address);
throw new NetworkException("獲取通道時發(fā)生了異常。");
}
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
// TODO 需要將completableFuture暴露出去
DcyRpcBootstrap.PENDING_REQUEST.put(1L, completableFuture);
channel.writeAndFlush(Unpooled.copiedBuffer("hello".getBytes())).addListener((ChannelFutureListener) promise -> {
// 只需要處理異常
if (!promise.isSuccess()) {
completableFuture.completeExceptionally(promise.cause());
}
});
// 如果沒有地方處理這個completableFuture,這里會阻塞等待 complete 方法的執(zhí)行
// 在Netty的pipeline中最終的handler的處理結(jié)果 調(diào)用complete
return completableFuture.get(10, TimeUnit.SECONDS);
}
});
return (T) helloProxy;
}
3.在NettyBootstrapInitializer
類的初始化Netty的靜態(tài)代碼塊中:尋找與之匹配的待處理 completeFuture
tatic {
NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group)
// 選擇初始化一個什么樣的channel
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// 異步
// 服務(wù)提供方,給予的結(jié)果
String result = msg.toString(Charset.defaultCharset());
// 從全局的掛起的請求中,尋找與之匹配的待處理 completeFuture
CompletableFuture<Object> completableFuture = DcyRpcBootstrap.PENDING_REQUEST.get(1L);
completableFuture.complete(result);
}
});
}
});
}
f.調(diào)整代碼
在core模塊com.dcyrpc
下創(chuàng)建proxy.handler
包
在handler包下創(chuàng)建RpcConsumerInvocationHandler
類,實現(xiàn)InvocationHandler
接口
- 把
ReferenceConfig
類下的InvocationHandler
匿名內(nèi)部類拷貝到該RpcConsumerInvocationHandler
類中
/**
* 該類封裝了客戶端通信的基礎(chǔ)邏輯,每一個代理對象的遠(yuǎn)程調(diào)用過程都封裝在invoke方法中
* 1.發(fā)現(xiàn)可用服務(wù)
* 2.建立連接
* 3.發(fā)送請求
* 4.得到結(jié)果
*/
@Slf4j
public class RpcConsumerInvocationHandler implements InvocationHandler {
// 接口
private Class<?> interfaceRef;
// 注冊中心
private Registry registry;
public RpcConsumerInvocationHandler(Class<?> interfaceRef, Registry registry) {
this.interfaceRef = interfaceRef;
this.registry = registry;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 1.發(fā)現(xiàn)服務(wù),從注冊中心,尋找一個可用的服務(wù)
// - 傳入服務(wù)的名字,返回ip+端口 (InetSocketAddress可以封裝端口/ip/host name)
InetSocketAddress address = registry.lookup(interfaceRef.getName());
if (log.isInfoEnabled()){
log.info("服務(wù)調(diào)用方,發(fā)現(xiàn)了服務(wù){(diào)}的可用主機{}", interfaceRef.getName(), address);
}
// 2.嘗試獲取一個可用的通道
Channel channel = getAvailableChannel(address);
if (log.isInfoEnabled()){
log.info("獲取了和【{}】建立的連接通道,準(zhǔn)備發(fā)送數(shù)據(jù)", address);
}
/**
* ---------------------------封裝報文---------------------------
*/
// 3.封裝報文
/**
* ---------------------------同步策略---------------------------
*/
// ChannelFuture channelFuture = channel.writeAndFlush(new Object()).await();
// // get()阻塞獲取結(jié)果
// // getNow()獲取當(dāng)前的結(jié)果,如果未處理完成,返回null
// if (channelFuture.isDone()) {
// Object object = channelFuture.getNow();
// } else if (!channelFuture.isSuccess()) {
// // 發(fā)生問題,需要捕獲異常。
// // 子線程可以捕獲異步任務(wù)的異常
// Throwable cause = channelFuture.cause();
// throw new RuntimeException(cause);
// }
/**
* ---------------------------異步策略---------------------------
*/
// 4.寫出報文
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
// 將completableFuture暴露出去
DcyRpcBootstrap.PENDING_REQUEST.put(1L, completableFuture);
channel.writeAndFlush(Unpooled.copiedBuffer("hello".getBytes())).addListener((ChannelFutureListener) promise -> {
// 需要處理異常
if (!promise.isSuccess()) {
completableFuture.completeExceptionally(promise.cause());
}
});
// 如果沒有地方處理這個completableFuture,這里會阻塞等待 complete 方法的執(zhí)行
// 在Netty的pipeline中最終的handler的處理結(jié)果 調(diào)用complete
// 5.獲得響應(yīng)的結(jié)果
return completableFuture.get(10, TimeUnit.SECONDS);
}
/**
* 根據(jù)地址獲取一個可用的通道
* @param address
* @return
*/
private Channel getAvailableChannel(InetSocketAddress address) {
// 1.嘗試從緩存中獲取通道
Channel channel = DcyRpcBootstrap.CHANNEL_CACHE.get(address);
// 2.拿不到就建立新連接
if (channel == null) {
// 使用addListener執(zhí)行異步操作
CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
NettyBootstrapInitializer.getBootstrap().connect(address).addListener((ChannelFutureListener) promise -> {
if (promise.isDone()) {
// 異步的,已經(jīng)完成
log.info("已經(jīng)和【{}】成功建立連接。", address);
channelFuture.complete(promise.channel());
} else if (!promise.isSuccess()) {
channelFuture.completeExceptionally(promise.cause());
}
});
// 阻塞獲取channel
try {
channel = channelFuture.get(3, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("獲取通道時發(fā)生異常。{}", e);
throw new DiscoveryException(e);
}
// 緩存channel
DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
}
// 3.建立連接失敗
if (channel == null){
log.error("獲取或建立與【{}】通道時發(fā)生了異常。", address);
throw new NetworkException("獲取通道時發(fā)生了異常。");
}
// 4.返回通道
return channel;
}
}
ReferenceConfig
類的get()
方法被修改為:讓整個代碼可讀性更高,更簡潔
/**
* 代理設(shè)計模式,生成一個API接口的代理對象
* @return 代理對象
*/
public T get() {
// 使用動態(tài)代理完成工作
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class<T>[] classes = new Class[]{interfaceRef};
InvocationHandler handler = new RpcConsumerInvocationHandler(interfaceRef, registry);
// 使用動態(tài)代理生成代理對象
Object helloProxy = Proxy.newProxyInstance(classLoader, classes, handler);
return (T) helloProxy;
}
g.處理handler (優(yōu)化)
在core模塊com.dcyrpc
下創(chuàng)建channelhandler.handler
包
在channelhandler.handler
包下創(chuàng)建MySimpleChannelInboundHandler
類:處理響應(yīng)結(jié)果
繼承 SimpleChannelInboundHandler<ByteBuf>
,重寫read0
方法
拷貝NettyBootstrapInitializer
靜態(tài)代碼塊中的匿名內(nèi)部類SimpleChannelInboundHandler
的代碼
public class MySimpleChannelInboundHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// 異步
// 服務(wù)提供方,給予的結(jié)果
String result = msg.toString(Charset.defaultCharset());
// 從全局的掛起的請求中,尋找與之匹配的待處理 completeFuture
CompletableFuture<Object> completableFuture = DcyRpcBootstrap.PENDING_REQUEST.get(1L);
completableFuture.complete(result);
}
}
在channelhandler
包下創(chuàng)建ConsumerChannelInitializer
,繼承 ChannelInitializer<SocketChannel>
,重寫initChannel
方法
拷貝NettyBootstrapInitializer
靜態(tài)代碼塊中的匿名內(nèi)部類ChannelInitializer
的代碼文章來源:http://www.zghlxwxcb.cn/news/detail-703330.html
public class ConsumerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new MySimpleChannelInboundHandler());
}
}
在NettyBootstrapInitializer
類的初始化Netty的靜態(tài)代碼塊中:優(yōu)化handler的匿名內(nèi)部類文章來源地址http://www.zghlxwxcb.cn/news/detail-703330.html
static {
NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group)
// 選擇初始化一個什么樣的channel
.channel(NioSocketChannel.class)
.handler(new ConsumerChannelInitializer());
}
到了這里,關(guān)于手寫RPC框架--5.Netty業(yè)務(wù)邏輯的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!