1. RPC基本介紹
-
RPC(Remote Procedure Call)
:遠(yuǎn)程 過程調(diào)用,是一個計(jì)算機(jī) 通信協(xié)議。該協(xié)議允許運(yùn) 行于一臺計(jì)算機(jī)的程序調(diào) 用另一臺計(jì)算機(jī)的子程序, 而程序員無需額外地為這 個交互作用編程 -
兩個或多個應(yīng)用程序都分 布在不同的服務(wù)器上,它 們之間的調(diào)用都像是本地 方法調(diào)用一樣(如圖)
-
常見的 RPC 框架有: 比較知名的如阿里的Dubbo、google的gRPC、Go語言的rpcx、Apache的thrift, Spring 旗下的 Spring Cloud。
2. RPC調(diào)用流程圖
術(shù)語說明:在RPC中,
- Client叫服務(wù)消費(fèi)者
- Server叫服務(wù)提供者
3. PRC調(diào)用流程說明
- **服務(wù)消費(fèi)方(client)**以本地調(diào)用方式調(diào)用服務(wù)
- client stub 接收到調(diào)用后負(fù)責(zé)將方法、參數(shù)等封裝成能夠進(jìn)行網(wǎng)絡(luò)傳輸?shù)南Ⅲw
- client stub 將消息進(jìn)行編碼并發(fā)送到服務(wù)端
- server stub 收到消息后進(jìn)行解碼
- server stub 根據(jù)解碼結(jié)果調(diào)用本地的服務(wù)
- 地服務(wù)執(zhí)行并將結(jié)果返回給 server stub
- server stub 將返回導(dǎo)入結(jié)果進(jìn)行編碼并發(fā)送至消費(fèi)方
- client stub 接收到消息并進(jìn)行解碼
- 服務(wù)消費(fèi)方(client)得到結(jié)果
小結(jié):RPC 的目標(biāo)就是將 2-8 這些步驟都封裝起來,用戶無需關(guān)心這些細(xì)節(jié),可以像調(diào)用本地方法一樣即可完成遠(yuǎn)程服務(wù)調(diào)用。
4. 自己實(shí)現(xiàn) dubbo RPC(基于Netty)
需求說明
- dubbo 底層使用了 Netty 作為網(wǎng)絡(luò)通訊框架,要求用Netty 實(shí)現(xiàn)一個簡單的RPC框架
- 模仿 dubbo,消費(fèi)者和提供者約定接口和協(xié)議,消費(fèi)者遠(yuǎn)程調(diào)用提供者的服務(wù),提供者返回一個字符串,消費(fèi)者打印提供者返回的數(shù)據(jù)。底層網(wǎng)絡(luò)通信使用Netty4.1.20
設(shè)計(jì)說明
- 創(chuàng)建一個接口,定義抽象方法。用于消費(fèi)者和提供者之間的約定。
- 創(chuàng)建一個提供者,該類需要監(jiān)聽消費(fèi)者的請求,并按照約定返回?cái)?shù)據(jù)。
- 創(chuàng)建一個消費(fèi)者,該類需要透明的調(diào)用自己不存在的方法,內(nèi)部需要使用Netty請求提供者返回?cái)?shù)據(jù)
4.1 公共接口 publicinterface包
4.1.1 HelloService
package site.zhourui.nioAndNetty.netty.dubborpc.publicinterface;
//這個是接口,是服務(wù)提供方和 服務(wù)消費(fèi)方都需要
public interface HelloService {
String hello(String msg);
}
4.2 遠(yuǎn)程調(diào)用netty包
本質(zhì)上就是客戶端訪問服務(wù)端
4.2.1 NettyClientHandler
- 我們實(shí)現(xiàn)了Callable方法
- setPara(String para)方法: 設(shè)置要發(fā)給服務(wù)器端的信息
- 我們將ctx在channelActive時抽取為全局對象context,方便我們在其他方法也能使用(這里就是call方法)
- call方法:
- 開啟子線程向服務(wù)端發(fā)送消息
- 發(fā)送完成后該子線程進(jìn)行wait,等待服務(wù)提供方處理并返回?cái)?shù)據(jù)(被喚醒)
- 喚醒后打印服務(wù)端返回?cái)?shù)據(jù)全局變量result中的數(shù)據(jù)
- channelRead方法:
- 收到服務(wù)器的返回?cái)?shù)據(jù)后,將返回?cái)?shù)據(jù)放在全局變量result中
- 喚醒等待的線程
- 因?yàn)閏hannelRead和call方法是有同步關(guān)系的所有要加上
synchronized
加鎖- 小結(jié): 代碼執(zhí)行流程
- channelActive()
- setPara()設(shè)置需要發(fā)送的數(shù)據(jù)
- call(wait之前代碼)被代理對象調(diào)用, 發(fā)送數(shù)據(jù)給服務(wù)器-> wait
- 等待被喚醒(channelRead)->notify
- call(wait之后代碼)
package site.zhourui.nioAndNetty.netty.dubborpc.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.Callable;
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context;//上下文
private String result; //返回的結(jié)果
private String para; //客戶端調(diào)用方法時,傳入的參數(shù)
//與服務(wù)器的連接創(chuàng)建后,就會被調(diào)用, 這個方法是第一個被調(diào)用(1)
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(" channelActive 被調(diào)用 ");
context = ctx; //因?yàn)槲覀冊谄渌椒〞褂玫?ctx
}
//收到服務(wù)器的數(shù)據(jù)后,調(diào)用方法 (4)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(" channelRead 被調(diào)用 ");
result = msg.toString();
notify(); //喚醒等待的線程
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
//被代理對象調(diào)用, 發(fā)送數(shù)據(jù)給服務(wù)器,-> wait -> 等待被喚醒(channelRead) -> 返回結(jié)果 (3)-》5
@Override
public synchronized Object call() throws Exception {
System.out.println(" call1 被調(diào)用 ");
context.writeAndFlush(para);
//進(jìn)行wait
wait(); //等待channelRead 方法獲取到服務(wù)器的結(jié)果后,喚醒
System.out.println(" call2 被調(diào)用 ");
return result; //服務(wù)方返回的結(jié)果
}
void setPara(String para) {
System.out.println(" setPara ");
this.para = para;
}
}
4.2.2 NettyClient
說明:
創(chuàng)建線程池executor
initClient():
- 初始化NettyClientHandler 設(shè)為全局對象client
- 創(chuàng)建客戶端并連接客戶端
- StringDecoder():字符串編碼器
- StringEncoder():字符串解碼器
- pipeline.addLast(client):將加入自定義handler-client
編寫方法getBean使用代理模式,獲取一個代理對象
public Object getBean(final Class<?> serivceClass, final String providerName)
- serivceClass: 需要代理的Class對象
- providerName: 協(xié)議以及需要發(fā)送的數(shù)據(jù)
- 如果client為空就初始化initClient
- client.setPara():使用自定義handler的全局對象client設(shè)置需要發(fā)送的數(shù)據(jù)
- executor.submit(client): 將我們的自定義handler提交給異步線程池,因?yàn)镹ettyClientHandler 實(shí)現(xiàn)了Callable方法,會自動調(diào)用call方法
- .get():異步任務(wù)執(zhí)行完成后獲取返回結(jié)果
- 將返回結(jié)果return
package site.zhourui.nioAndNetty.netty.dubborpc.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NettyClient {
//創(chuàng)建線程池
private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler client;
private int count = 0;
//編寫方法使用代理模式,獲取一個代理對象
public Object getBean(final Class<?> serivceClass, final String providerName) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[]{serivceClass}, (proxy, method, args) -> {
System.out.println("(proxy, method, args) 進(jìn)入...." + (++count) + " 次");
//{} 部分的代碼,客戶端每調(diào)用一次 hello, 就會進(jìn)入到該代碼
if (client == null) {
initClient();
}
//設(shè)置要發(fā)給服務(wù)器端的信息
//providerName 協(xié)議頭 args[0] 就是客戶端調(diào)用api hello(???), 參數(shù)
client.setPara(providerName + args[0]);
//
return executor.submit(client).get();
});
}
//初始化客戶端
private static void initClient() {
client = new NettyClientHandler();
//創(chuàng)建EventLoopGroup
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(client);
}
}
);
try {
bootstrap.connect("127.0.0.1", 7000).sync();
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.2.3 NettyServerHandler
- 當(dāng)通道發(fā)生讀事件時
- 獲取客戶端發(fā)送的消息,并調(diào)用服務(wù)
- 按照協(xié)議規(guī)則取出數(shù)據(jù)(HelloService#hello#)
- HelloService# 為協(xié)議頭
- hello為數(shù)據(jù)
- 回復(fù)客戶端調(diào)用結(jié)果
package site.zhourui.nioAndNetty.netty.dubborpc.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import site.zhourui.nioAndNetty.netty.dubborpc.customer.ClientBootstrap;
import site.zhourui.nioAndNetty.netty.dubborpc.provider.HelloServiceImpl;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//獲取客戶端發(fā)送的消息,并調(diào)用服務(wù)
System.out.println("msg=" + msg);
//客戶端在調(diào)用服務(wù)器的api 時,我們需要定義一個協(xié)議
//比如我們要求 每次發(fā)消息是都必須以某個字符串開頭 "HelloService#hello#你好"
if(msg.toString().startsWith(ClientBootstrap.providerName)) {
String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
ctx.writeAndFlush(result);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
4.2.4 NettyServer
- 啟動客戶端
- StringDecoder
- StringEncoder
- NettyServerHandler
package site.zhourui.nioAndNetty.netty.dubborpc.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class NettyServer {
public static void startServer(String hostName, int port) {
startServer0(hostName,port);
}
//編寫一個方法,完成對NettyServer的初始化和啟動
private static void startServer0(String hostname, int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler()); //業(yè)務(wù)處理器
}
}
);
ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
System.out.println("服務(wù)提供方開始提供服務(wù)~~");
channelFuture.channel().closeFuture().sync();
}catch (Exception e) {
e.printStackTrace();
}
finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
4.3 customer 包
4.3.1 ClientBootstrap
- 設(shè)置providerName:我們發(fā)送的數(shù)據(jù)(協(xié)議+數(shù)據(jù))
- 創(chuàng)建一個消費(fèi)者
- 創(chuàng)建代理對象
- 通過代理對象調(diào)用服務(wù)提供者的方法(服務(wù))
package site.zhourui.nioAndNetty.netty.dubborpc.customer;
import site.zhourui.nioAndNetty.netty.dubborpc.netty.NettyClient;
import site.zhourui.nioAndNetty.netty.dubborpc.publicinterface.HelloService;
public class ClientBootstrap {
//這里定義協(xié)議頭
public static final String providerName = "HelloService#hello#";
public static void main(String[] args) throws Exception{
//創(chuàng)建一個消費(fèi)者
NettyClient customer = new NettyClient();
//創(chuàng)建代理對象
HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
for (;; ) {
Thread.sleep(2 * 1000);
//通過代理對象調(diào)用服務(wù)提供者的方法(服務(wù))
String res = service.hello("你好 dubbo~");
System.out.println("調(diào)用的結(jié)果 res= " + res);
}
}
}
4.4 provider 包
4.4.1 HelloServiceImpl
服務(wù)端提供方的實(shí)現(xiàn),遠(yuǎn)程真正被調(diào)用的方法
package site.zhourui.nioAndNetty.netty.dubborpc.provider;
import site.zhourui.nioAndNetty.netty.dubborpc.publicinterface.HelloService;
public class HelloServiceImpl implements HelloService {
private static int count = 0;
//當(dāng)有消費(fèi)方調(diào)用該方法時, 就返回一個結(jié)果
@Override
public String hello(String msg) {
System.out.println("收到客戶端消息=" + msg);
//根據(jù)mes 返回不同的結(jié)果
if(msg != null) {
return "你好客戶端, 我已經(jīng)收到你的消息 [" + msg + "] 第" + (++count) + " 次";
} else {
return "你好客戶端, 我已經(jīng)收到你的消息 ";
}
}
}
4.4.1 ServerBootstrap
ServerBootstrap 會啟動一個服務(wù)提供者,就是 NettyServer
package site.zhourui.nioAndNetty.netty.dubborpc.provider;
import site.zhourui.nioAndNetty.netty.dubborpc.netty.NettyServer;
//ServerBootstrap 會啟動一個服務(wù)提供者,就是 NettyServer
public class ServerBootstrap {
public static void main(String[] args) {
//代碼代填..
NettyServer.startServer("127.0.0.1", 7000);
}
}
4.5 測試
-
啟動ServerBootstrap
-
啟動ClientBootstrap
4.5.1 debug看一下ClientBootstrap啟動
首先還是先啟動服務(wù)端ServerBootstrap
-
debug啟動ClientBootstrap
-
NettyClient(),此時只是初始化了全局屬性
-
getBean:創(chuàng)建代理對象
-
先看看入?yún)⑹鞘裁磾?shù)據(jù)
-
如果client沒有被初始化就初始化
-
設(shè)置要發(fā)給服務(wù)器端的信息
-
executor.submit:提交異步任務(wù)就會來到NettyClientHandler的call方法
-
call方法執(zhí)行到wait()方法后,channelRead不久后就會收到服務(wù)端的調(diào)用結(jié)果然后喚醒call方法的子線程繼續(xù)執(zhí)行文章來源:http://www.zghlxwxcb.cn/news/detail-568677.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-568677.html
-
到了這里,關(guān)于Netty核心技術(shù)十一--用Netty 自己 實(shí)現(xiàn) dubbo RPC的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!