在學(xué)習(xí) RPC 框架之前,我們先來手寫一個(gè)RPC。
我們?cè)趯W(xué)習(xí)的過程中,一定要做到知其然,還要知其所以然。
架構(gòu)演進(jìn)
單體架構(gòu)
要知道,在以前單體架構(gòu)的時(shí)候,會(huì)將所有的應(yīng)用功能都集中在一個(gè)服務(wù)當(dāng)中。
單體架構(gòu)初始開發(fā)簡(jiǎn)單,所有的功能都在一個(gè)項(xiàng)目中,容易理解整個(gè)應(yīng)用的業(yè)務(wù),而且部署也比較簡(jiǎn)單,就一個(gè)服務(wù)。
還有就是方便測(cè)試和更容易實(shí)現(xiàn)跨多個(gè)業(yè)務(wù)功能的事務(wù)性操作。
但是單體服務(wù)也存在很多缺點(diǎn):可維護(hù)性差、難以擴(kuò)展、可用性低等等
拆分服務(wù)
既然單體服務(wù)有這么多缺點(diǎn),那咋辦嘛,將服務(wù)根據(jù)業(yè)務(wù)需求進(jìn)行拆分唄。
拆分服務(wù)之后,那服務(wù)之間需要互相調(diào)用啊,采用什么方式交流呢(通信方式)?數(shù)據(jù)的一致性怎么保證呢?
首先我們可能會(huì)想到,服務(wù)拆分了,但是數(shù)據(jù)庫可以不用拆啊,多個(gè)服務(wù)共享統(tǒng)一份數(shù)據(jù),
這樣數(shù)據(jù)的一致性就很容易保證了。
或者對(duì)于數(shù)據(jù)一致性要求不高的服務(wù)采用消息中間件,保證數(shù)據(jù)的最終一致性就行;
再或者,服務(wù)之間通過RPC這種通訊機(jī)制通信也行啊。
RPC只是一種泛概念,在不同時(shí)期有不同的表現(xiàn)方式。
早期基于Web的RPC。如XML-RPC:使用XML格式編碼其調(diào)用和HTTP作為傳輸機(jī)制。雖然它支持跨語言調(diào)用,但由于XML的冗余性,效率相對(duì)較低。
現(xiàn)代RPC系統(tǒng),如JSON-RPC:一個(gè)無狀態(tài)、輕量級(jí)的遠(yuǎn)程過程調(diào)用(RPC)協(xié)議,以JSON(JavaScript Object Notation)作為數(shù)據(jù)格式,可以使用各種傳輸協(xié)議。
在現(xiàn)代分布式系統(tǒng)開發(fā)中,RPC對(duì)任何開發(fā)者來說都是一項(xiàng)重要的技能,因此學(xué)習(xí)RPC就顯得很重要了。
在正式學(xué)習(xí)RPC框架之前,我們手動(dòng)實(shí)現(xiàn)一個(gè)PRC框架,方便于后面對(duì)其他RPC框架的學(xué)習(xí)。
手寫RPC
首先來看看上面這張圖,做一個(gè)詳細(xì)的介紹: 兩個(gè)角色、三個(gè)項(xiàng)目
兩個(gè)角色:一個(gè)服務(wù)提供者和一個(gè)服務(wù)消費(fèi)者
三個(gè)項(xiàng)目:rpc-api、rpc-provider和rpc-consumer
API工程是整個(gè)服務(wù)的標(biāo)準(zhǔn):
各類服務(wù)傳輸過程當(dāng)中的傳輸對(duì)象的標(biāo)準(zhǔn),包括接口的標(biāo)準(zhǔn)。
Provider工程:
將依賴于上面的標(biāo)準(zhǔn),就是API工程,并且將會(huì)去實(shí)現(xiàn)IService,即實(shí)現(xiàn)標(biāo)準(zhǔn)當(dāng)中定義的接口。還要基于網(wǎng)絡(luò)對(duì)外提供服務(wù),因此會(huì)包含Net Server這個(gè)模塊來提供網(wǎng)絡(luò)服務(wù)。主要用來接收和解析網(wǎng)絡(luò)請(qǐng)求,并去調(diào)用 Service Dispatch 來完成整個(gè)服務(wù)的調(diào)用和分發(fā)。
Consumer工程:
Net Client 主要是完成網(wǎng)絡(luò)的調(diào)用。
ProxyFactory,動(dòng)態(tài)代理模式,主要在調(diào)用的過程當(dāng)中屏蔽網(wǎng)絡(luò)通訊相關(guān)的一些細(xì)節(jié),使得我們開發(fā)人員在使用過程中不再關(guān)注網(wǎng)絡(luò)細(xì)節(jié)。
工程搭建步驟
為了簡(jiǎn)單演示,rpc-api項(xiàng)目中只有一個(gè)接口,接口中有一個(gè)方法:addUser();
接口的具體實(shí)現(xiàn)在rpc-provider中;rpc-consumer中模擬調(diào)用addUser() 方法。
建立三個(gè)maven項(xiàng)目,首先創(chuàng)建API工程
搭建 rpc-api
如何新建maven工程這里就不做介紹了,下面只包含具體代碼
1、pom文件
只包含一個(gè)lombok依賴,主要是為了方便不用手寫getter和setter
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mntalk</groupId>
<artifactId>rpc-api</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
</dependency>
</dependencies>
</project>
2、UserDto
三個(gè)屬性name、age、userId
因?yàn)樾枰诰W(wǎng)絡(luò)中傳輸,實(shí)現(xiàn)了Serializable,并添加了序列化id
package com.mntalk.dto;
import lombok.Data;
import java.io.Serializable;
@Data
public class UserDto implements Serializable {
private static final long serialVersionUID = 4266781378102409837L;
private String name;
private int age;
private String userId;
}
3、UserService
定義接口和方法
package com.mntalk.api;
import com.mntalk.dto.UserDto;
public interface UserService {
// 添加用戶
public UserDto addUser(UserDto userDto);
}
4、RPACommonReqDto
定義統(tǒng)一的傳輸標(biāo)準(zhǔn),這里先定義成這樣,后面再詳細(xì)解釋,為什么會(huì)包含者幾個(gè)屬性
package com.mntalk.dto;
import lombok.Data;
import java.io.Serializable;
/**
* 統(tǒng)一傳輸標(biāo)準(zhǔn)
*/
@Data
public class RPACommonReqDto implements Serializable {
private static final long serialVersionUID = 6212822493972023391L;
// 方法名
private String methodName;
// 類的全路徑名
private String classPath;
// 形參列表
private Object[] args;
}
5、將工程打成jar包,方便provider和consumer工程引入進(jìn)行實(shí)現(xiàn)和調(diào)用
具體怎么打包就不做介紹了,可以使用命令打包,也可以接觸idea打包
搭建 rpc-provider
提供接口實(shí)現(xiàn),并使用Socket網(wǎng)絡(luò)編程,模擬等待調(diào)用
1、pom文件
引入rpc-api工程
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mntalk</groupId>
<artifactId>rpc-privider</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.mntalk</groupId>
<artifactId>rpc-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
2、實(shí)現(xiàn)UserService
實(shí)現(xiàn)具體的addUser方法,這里就模擬已經(jīng)添加到數(shù)據(jù)庫,并生成了userId,然后將生成的信息返回。
UserServiceImpl業(yè)務(wù)實(shí)現(xiàn)
package com.mntalk.impl;
import com.mntalk.api.UserService;
import com.mntalk.dto.UserDto;
import java.util.Random;
public class UserServiceImpl implements UserService {
@Override
public UserDto addUser(UserDto userDto) {
// todo xxx
// 模擬插入數(shù)據(jù)庫并生成了userId
System.out.println("接收:" + userDto);
userDto.setUserId(new Random().nextInt(100000) + "");
System.out.println("設(shè)置了用戶id:" + userDto);
return userDto ;
}
}
3、NetServer
根據(jù)上面實(shí)現(xiàn)手寫RPC的思路圖,現(xiàn)在需要Socket網(wǎng)絡(luò)編程,模擬等待consumer來進(jìn)行調(diào)用。
這里采用線程池來進(jìn)行異步實(shí)現(xiàn),為什么不不直接在主線程中等待呢?
因?yàn)镾erverSocket的accept() 是典型的 Blocking IO,會(huì)阻塞工作線程。
package com.mntalk.net;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 對(duì)外提供服務(wù)
*/
public class NetServer {
static final ExecutorService threadPool = Executors.newFixedThreadPool(50);
public static void startUp(int port) throws IOException {
ServerSocket serverSocket = new ServerSocket(port);
// // 阻塞等待客戶端連接
// Socket socket = serverSocket.accept();
//
// // 典型的 Blocking IO,會(huì)阻塞工作線程
// socket.getOutputStream();
// socket.getInputStream();
// 多線程方式
// 線程池
while (true) {
Socket socket = serverSocket.accept();
threadPool.submit(new RPCProcessor(socket));
}
}
}
4、RPCProcessor
具體的socket處理,實(shí)現(xiàn)Runnable接口,方便放入多線程處理。
其中很重要的一點(diǎn)就是模擬服務(wù)的分發(fā) ServiceDispatch.dispatch(reqObject);
package com.mntalk.net;
import com.mntalk.dispatch.ServiceDispatch;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.Optional;
public class RPCProcessor implements Runnable {
ObjectInputStream objectInputStream = null;
ObjectOutputStream objectOutputStream = null;
private final Socket socket;
/**
* 因?yàn)樾枰幚韘ocket的流,所以需要注入socket
*/
public RPCProcessor(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
// 將輸入輸出流包裝成對(duì)象流
objectInputStream = new ObjectInputStream(socket.getInputStream());
// 讀取客戶端對(duì)象
Object reqObject = objectInputStream.readObject();
// 服務(wù)分發(fā)
Object respObj = ServiceDispatch.dispatch(reqObject);
// 將結(jié)果進(jìn)行輸出
objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(respObj);
objectOutputStream.flush();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
try {
if (Optional.ofNullable(objectInputStream).isPresent()) {
objectInputStream.close();
}
if (Optional.ofNullable(objectOutputStream).isPresent()) {
objectOutputStream.close();
}
} catch (IOException e) {
System.out.println("遠(yuǎn)程調(diào)用流關(guān)閉錯(cuò)誤:" + e.getMessage());
}
}
}
}
5、ServiceDispatch 服務(wù)分發(fā)
在這里詳細(xì)解釋一下,為什么之前要定義一個(gè) RPACommonReqDto 標(biāo)準(zhǔn)傳輸對(duì)象,并且包含三個(gè)屬性:
最主要的就是為了下面通過反射生成具體的instance,然后調(diào)用方法。
通過classPath可以生成字節(jié)碼對(duì)象;通過方法參數(shù)列表,可以得到參數(shù)類型列表,然后再通過方法名,就可以得到具體的方法;
然后就可以進(jìn)行方法調(diào)用了。
package com.mntalk.dispatch;
import com.mntalk.dto.RPACommonReqDto;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
public class ServiceDispatch {
/**
* 服務(wù)分發(fā)
*/
public static Object dispatch(Object reqObject) {
// 大膽猜測(cè)一下,reqObject 中應(yīng)該包含了分發(fā)的內(nèi)容屬性
// type == 1 --> addUser
// type == 2 --> deleteUser
// 上面的這樣太呆板了
// 基于反射機(jī)制,就很靈活 instance方法的調(diào)用
// classpath 全路徑
// 方法名 + 形參列表的類型列表
// 實(shí)例對(duì)象 Class.newInstance
// 方法在調(diào)用過程當(dāng)中參數(shù)的值 ( getType 可以獲得 形參列表的類型列表 )
RPACommonReqDto reqDto = (RPACommonReqDto)reqObject;
String classPath = reqDto.getClassPath();
String methodName = reqDto.getMethodName();
Object[] args = reqDto.getArgs();
Class[] types = null;
if (args != null && args.length > 0) {
types = new Class[args.length];
for (int i = 0; i < args.length; i++) {
types[i] = args[i].getClass();
}
}
Object respObject = null;
// 方法獲取及調(diào)用
try {
Class<?> clazz = Class.forName(classPath);
Method method = clazz.getDeclaredMethod(methodName, types);
Constructor<?> constructor = clazz.getDeclaredConstructor();
respObject = method.invoke(constructor.newInstance(), args);
} catch (Exception e) {
e.printStackTrace();
}
return respObject;
}
}
搭建 rpc-consumer
模擬調(diào)用
1、pom文件
引入rpc-api工程
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mntalk</groupId>
<artifactId>rpc-consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.mntalk</groupId>
<artifactId>rpc-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
2、TestRPC
模擬調(diào)用
package com.mntalk;
import com.mntalk.api.UserService;
import com.mntalk.dto.UserDto;
import com.mntalk.proxy.FactoryProxy;
public class TestRPC {
public static void main(String[] args) {
UserService userService = FactoryProxy.getInstanceByClassType(UserService.class);
UserDto userDto = new UserDto();
userDto.setName("feiz");
userDto.setAge(18);
System.out.println("invoke before: " + userDto);
userDto = userService.addUser(userDto); // 實(shí)際上是跑到RPCInvocationHandler的invoke方法中去執(zhí)行邏輯了
System.out.println("invoke after: " + userDto);
}
}
3、FactoryProxy 代理工廠
通過JDK的動(dòng)態(tài)代理,底層會(huì)采用ASM字節(jié)碼重組技術(shù),會(huì)生成一個(gè)新的class字節(jié)碼對(duì)象,
然后由ClassLoader將字節(jié)碼對(duì)象加載到JVM進(jìn)程當(dāng)中,經(jīng)由這個(gè)類的實(shí)例,
去創(chuàng)建出來了一個(gè)實(shí)例對(duì)象,這個(gè)class對(duì)象實(shí)現(xiàn)了interfaceClazz接口。
package com.mntalk.proxy;
import com.mntalk.api.UserService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
/**
* 代理工廠
*/
public class FactoryProxy {
// 寫死的情況(并不是我們需要的)
// public static UserService getUserServiceInstance() {
// return (UserService) null;
// }
// 你傳入什么樣的一個(gè)接口,我希望得到的是你傳入接口的實(shí)例對(duì)象
public static <T> T getInstanceByClassType(Class<T> interfaceClazz) {
/**
* 需要用到JDK的動(dòng)態(tài)代理
*
* ClassLoader loader, 類加載器Bootstrap、Application、Extension、戶自定義類加載器 這里默認(rèn)用Application
* Class<?>[] interfaces, 實(shí)現(xiàn)的接口列表
* InvocationHandler h 處理器
*/
return (T) Proxy.newProxyInstance(FactoryProxy.class.getClassLoader(),
new Class[]{interfaceClazz},
new RPCInvocationHandler()
);
}
}
4、RPCInvocationHandler
RPCInvocationHandler
package com.mntalk.proxy;
import com.mntalk.dto.UserDto;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
// 創(chuàng)建實(shí)例對(duì)象時(shí)使用
public class RPCInvocationHandler implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println("11111");
UserDto userDto = new UserDto();
userDto.setAge(99);
userDto.setName("feifei");
return userDto;
}
}
在TestRPC中點(diǎn)擊運(yùn)行
實(shí)際上userService.addUser(userDto);是跑到RPCInvocationHandler的invoke方法中去執(zhí)行邏輯了
在invoke方法中,實(shí)現(xiàn)了目標(biāo)對(duì)象的保護(hù)和增強(qiáng),
那么我們可以將網(wǎng)絡(luò)實(shí)現(xiàn)的細(xì)節(jié)放到這里面來實(shí)現(xiàn),讓調(diào)用者無感知這件事情。
5、在invoke中組裝網(wǎng)絡(luò)請(qǐng)求
這里監(jiān)聽的是9999,因?yàn)閜rovider中使用的是9999端口,自己根據(jù)自己的進(jìn)行調(diào)整就行
package com.mntalk.proxy;
import com.mntalk.anno.ServiceMapped;
import com.mntalk.dto.RPACommonReqDto;
import com.mntalk.dto.UserDto;
import com.mntalk.net.NetClient;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
// 創(chuàng)建實(shí)例對(duì)象時(shí)使用
public class RPCInvocationHandler implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 組裝 RPACommonReqDto 對(duì)象,完成網(wǎng)絡(luò)的調(diào)用。。。
RPACommonReqDto commonReqDto = new RPACommonReqDto();
commonReqDto.setArgs(args);
commonReqDto.setMethodName(method.getName());
String classPath = method.getDeclaringClass().getDeclaredAnnotation(ServiceMapped.class).value();
commonReqDto.setClassPath(classPath); // 使用注解的方式替代寫死的方式
// 完成網(wǎng)絡(luò)的調(diào)用(host暫時(shí)寫死)
return NetClient.callRemoteService("localhost", 9999, commonReqDto);
}
}
6、NetClient 擬調(diào)用
package com.mntalk.net;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.Optional;
/**
* 網(wǎng)絡(luò)調(diào)用
*/
public class NetClient {
public static Object callRemoteService(String host, int port, Object reqObject) {
ObjectOutputStream objectOutputStream = null;
ObjectInputStream objectInputStream = null;
Object respObject = null;
Socket socket = null;
try {
socket = new Socket(host, port);
objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(reqObject);
objectOutputStream.flush();
objectInputStream = new ObjectInputStream(socket.getInputStream());
respObject = objectInputStream.readObject();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (Optional.ofNullable(objectInputStream).isPresent()) {
objectInputStream.close();
}
if (Optional.ofNullable(objectOutputStream).isPresent()) {
objectOutputStream.close();
}
} catch (IOException e) {
System.out.println("遠(yuǎn)程調(diào)用流關(guān)閉錯(cuò)誤:" + e.getMessage());
}
}
return respObject;
}
}
7、使用注解替代classPath寫死的情況
在rpc-api中定義注解**@ServiceMapped**, 并且在 UserService 接口上添加注解 @ServiceMapped(“com.mntalk.impl.UserServiceImpl”)
package com.mntalk.anno;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface ServiceMapped {
String value();
}
7、在TestRPC中運(yùn)行
如下圖,consumer調(diào)用provider成功,模擬插入數(shù)據(jù),生成id并返回了。
小結(jié)
上面我們手動(dòng)實(shí)現(xiàn)了一個(gè)RPC, 能夠發(fā)起遠(yuǎn)程調(diào)用了,但是與 Dubbo 這樣的框架相比,還有什么需要優(yōu)化的點(diǎn)?
我們現(xiàn)在僅僅只是完成了跨進(jìn)程、跨網(wǎng)絡(luò)的調(diào)用。
這個(gè)機(jī)制還是有很多可以優(yōu)化和調(diào)整的地方,比如說
序列化方式
給予Java的網(wǎng)絡(luò)編程,Java的序列化還是不夠妥當(dāng),Java的序列化和反序列化在安全性方面考慮得非常多,
把一個(gè)Java序列化和反序列化,都會(huì)把整個(gè)類的層級(jí)結(jié)構(gòu)進(jìn)行序列化,包括序列化ID的檢查,所以序列化出來的二進(jìn)制占用的空間是非常大的。
這樣在網(wǎng)絡(luò)中傳輸就會(huì)占用更大的帶寬,帶來更大的數(shù)據(jù)傳輸效率的影響。
Dubbo中默認(rèn)采用Hessian2序列化
Hessian2是一種緊湊的、對(duì)各種語言友好的二進(jìn)制協(xié)議,它在性能和跨語言互操作性方面都表現(xiàn)不錯(cuò)。
網(wǎng)絡(luò)編程中流的處理
我們現(xiàn)在采用的是基于線程池的異步處理方式。
可以使用更為友好的網(wǎng)絡(luò)編程方式進(jìn)行處理,比如 MINA、Netty等等,Dubbo底層的默認(rèn)通訊框架就是Netty
Netty:
這是一個(gè)異步的、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架,可以快速開發(fā)可維護(hù)的高性能協(xié)議服務(wù)器與客戶端。
Netty廣泛用于開發(fā)網(wǎng)絡(luò)游戲、大數(shù)據(jù)傳輸應(yīng)用、實(shí)時(shí)通訊系統(tǒng)等。它提供了對(duì)多種傳輸類型的支持,例如TCP和UDP的socket服務(wù)。
最后說一句(求關(guān)注,求贊,別白嫖我)
最近無意間獲得一份阿里大佬寫的刷題筆記和面經(jīng),一下子打通了我的任督二脈,進(jìn)大廠原來沒那么難。
這是大佬寫的, 7701頁的阿里大佬寫的刷題筆記,讓我offer拿到手軟
求一鍵三連:點(diǎn)贊、分享、收藏文章來源:http://www.zghlxwxcb.cn/news/detail-798709.html
點(diǎn)贊對(duì)我真的非常重要!在線求贊,加個(gè)關(guān)注我會(huì)非常感激!@小鄭說編程文章來源地址http://www.zghlxwxcb.cn/news/detail-798709.html
到了這里,關(guān)于如何手寫一個(gè)RPC?的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!