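Preface
The core SPI interface of Dubbo's protocol layer is org.apache.dubbo.rpc.Protocol. By extending this interface and the interfaces around it, we can make Dubbo communicate over a custom protocol. The default protocol is dubbo; this article provides an implementation of a Grpc protocol.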
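Design approach
Google provides a Java implementation of Grpc, so we can stand on the shoulders of giants instead of reinventing the wheel.
First, we implement the Protocol interface. When a service is exported, we start our GrpcServer and bind a local port so that it can handle subsequent connections and requests.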
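How does the server handle grpc requests?
Option one is to wrap every exported service Invoker as a grpc Service and let GrpcServer handle them all, but that is too cumbersome. Option two is to provide a single DispatcherService that receives every grpc request from clients, looks up the target service from the request parameters, invokes it locally, and returns the result. This article uses option two.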
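The idea behind option two can be tried without Dubbo or grpc at all. The following is a minimal sketch using only the JDK; SimpleDispatcher and DispatchReply are hypothetical names invented for illustration, and a plain Function stands in for an exported Invoker.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical stand-in for the response message: a status code plus a value or an error text. */
final class DispatchReply {
    final int status;
    final Object value;
    final String error;

    DispatchReply(int status, Object value, String error) {
        this.status = status;
        this.value = value;
        this.error = error;
    }
}

/** Single entry point that routes every request to a locally registered service. */
final class SimpleDispatcher {
    // Service name -> local handler; plays the role of the exported Invokers.
    private final Map<String, Function<Object[], Object>> handlers = new ConcurrentHashMap<>();

    void register(String serviceName, Function<Object[], Object> handler) {
        handlers.put(serviceName, handler);
    }

    DispatchReply dispatch(String serviceName, Object... arguments) {
        Function<Object[], Object> handler = handlers.get(serviceName);
        if (handler == null) {
            return new DispatchReply(500, null, "no such service: " + serviceName);
        }
        try {
            return new DispatchReply(200, handler.apply(arguments), null);
        } catch (RuntimeException e) {
            // String.valueOf keeps the error text non-null even when getMessage() is null.
            return new DispatchReply(500, null, String.valueOf(e.getMessage()));
        }
    }
}
```

Only one endpoint has to be exposed on the wire; adding a service means adding an entry to the map, not a new grpc Service.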
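When the client refers to a service, we create a GrpcInvoker object, which connects to the server and creates a local Stub for DispatcherService. To make an RPC call, it only needs to convert the RpcInvocation into a Protobuf message and send it.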
Implementing GrpcProtocol
Project structure
First, create a new module named dubbo-extension-protocol-grpc and add the required dependencies.
<dependencies>
    <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo-rpc-api</artifactId>
        <version>${dubbo.version}</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-all</artifactId>
        <version>1.56.1</version>
    </dependency>
</dependencies>
Project layout:
main
--java
----dubbo.extension.rpc.grpc
------message
--------RequestData.java
--------ResponseData.java
------Codec.java
------DispatcherService.java
------DispatcherServiceGrpc.java
------GrpcExporter.java
------GrpcInvoker.java
------GrpcProtocol.java
------GrpcProtocolServer.java
--resources
----META-INF/dubbo
------org.apache.dubbo.rpc.Protocol
Service and message definitions
Next, define the grpc Service and the message formats.
DispatcherService.proto defines the request-dispatching service:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "dubbo.extension.rpc.grpc";
option java_outer_classname = "DispatcherServiceProto";
option objc_class_prefix = "HLW";
import "RequestData.proto";
import "ResponseData.proto";
service DispatcherService {
rpc dispatch (RequestData) returns (ResponseData) {}
}
RequestData.proto defines the request message, which mainly describes an Invocation:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "dubbo.extension.rpc.grpc.message";
option java_outer_classname = "RequestDataProto";
option objc_class_prefix = "HLW";
message RequestData {
string targetServiceUniqueName = 1;
string methodName = 2;
string serviceName = 3;
repeated bytes parameterTypes = 4;
string parameterTypesDesc = 5;
repeated bytes arguments = 6;
bytes attachments = 7;
}
ResponseData.proto defines the response message, which mainly describes an AppResponse:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "dubbo.extension.rpc.grpc.message";
option java_outer_classname = "ResponseDataProto";
option objc_class_prefix = "HLW";
message ResponseData {
int32 status = 1;
string errorMessage = 2;
bytes result = 3;
bytes attachments = 4;
}
Use the protobuf-maven-plugin plugin to generate the corresponding Java classes from the proto files.
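Protocol implementation
Create a GrpcProtocol class that extends AbstractProtocol and implements the details of the Protocol contract.
The core idea: start the Grpc server when a service is exported, and create the corresponding Invoker when a service is referred.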
public class GrpcProtocol extends AbstractProtocol {

    // Called when a consumer refers to a service: create the client-side Invoker.
    @Override
    protected <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException {
        return new GrpcInvoker<>(type, url);
    }

    @Override
    public int getDefaultPort() {
        return 18080;
    }

    // Called when a provider exports a service: register the exporter, then make sure the server is running.
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        GrpcExporter<T> exporter = new GrpcExporter<>(invoker);
        exporterMap.put(invoker.getInterface().getName(), exporter);
        openServer(invoker.getUrl());
        return exporter;
    }

    private void openServer(URL url) {
        // Key the server by address (host:port) rather than by service key, so that
        // several services exported on the same port share one server instead of
        // each trying to bind the port again.
        String key = url.getAddress();
        ProtocolServer protocolServer = serverMap.get(key);
        if (protocolServer == null) {
            synchronized (serverMap) {
                protocolServer = serverMap.get(key);
                if (protocolServer == null) {
                    serverMap.put(key, createServer(url));
                }
            }
        }
    }

    private ProtocolServer createServer(URL url) {
        return new GrpcProtocolServer(url, exporterMap);
    }
}
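The open-once-per-key pattern used by openServer can also be expressed with ConcurrentHashMap.computeIfAbsent. The following is a minimal JDK-only sketch, assuming servers are keyed by their host:port address; FakeServer and ServerRegistry are hypothetical names and not Dubbo classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical placeholder for a started server; it only remembers its address. */
final class FakeServer {
    final String address;

    FakeServer(String address) {
        this.address = address;
    }
}

/** Creates at most one server per address, however many services are exported on it. */
final class ServerRegistry {
    private final Map<String, FakeServer> servers = new ConcurrentHashMap<>();
    // Counts real creations so the "only once" behaviour can be observed.
    final AtomicInteger created = new AtomicInteger();

    FakeServer openServer(String address) {
        // computeIfAbsent runs the factory at most once per key, atomically.
        return servers.computeIfAbsent(address, key -> {
            created.incrementAndGet();
            return new FakeServer(key);
        });
    }
}
```

Exporting two services on 127.0.0.1:10880 calls openServer twice but creates the server once; a different port yields a second server.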
Create a GrpcProtocolServer class that implements the ProtocolServer interface. Its core job is to start the GrpcServer and register DispatcherService to handle requests.
public class GrpcProtocolServer implements ProtocolServer {

    private final Server server;

    public GrpcProtocolServer(URL url, Map<String, Exporter<?>> exporterMap) {
        // Bind the port from the URL and route every request through DispatcherService.
        server = ServerBuilder.forPort(url.getPort())
                .addService(new DispatcherService(exporterMap))
                .build();
        try {
            server.start();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String getAddress() {
        return null;
    }

    @Override
    public void setAddress(String address) {
    }

    @Override
    public void close() {
        server.shutdown();
    }
}
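Create a DispatcherService class that implements the Grpc Service and handles grpc requests from clients. Its core job is to decode RequestData into an RpcInvocation, look up the local Invoker, invoke it, and return the result.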
public class DispatcherService extends DispatcherServiceGrpc.DispatcherServiceImplBase {

    private final Map<String, Exporter<?>> exporterMap;

    public DispatcherService(Map<String, Exporter<?>> exporterMap) {
        this.exporterMap = exporterMap;
    }

    @Override
    public void dispatch(RequestData request, StreamObserver<ResponseData> responseObserver) {
        ResponseData responseData;
        try {
            RpcInvocation invocation = Codec.decodeInvocation(request);
            Exporter<?> exporter = exporterMap.get(invocation.getServiceName());
            if (exporter == null) {
                throw new IllegalStateException("Service not exported: " + invocation.getServiceName());
            }
            // recreate() returns the value or rethrows the exception raised by the service,
            // so a failed call is not reported as a successful null result.
            Object returnValue = exporter.getInvoker().invoke(invocation).get().recreate();
            responseData = Codec.encodeResponse(returnValue, null);
        } catch (Throwable t) {
            responseData = Codec.encodeResponse(null, t);
        }
        responseObserver.onNext(responseData);
        responseObserver.onCompleted();
    }
}
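Create a GrpcInvoker class that extends AbstractInvoker and thereby implements the Invoker interface. It is created when a service is referred; when an RPC call is made, it sends a request to DispatcherService through the Stub, which carries the call over the grpc protocol.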
public class GrpcInvoker<T> extends AbstractInvoker<T> {

    // One stub (and one underlying channel) per server address, shared by all invokers.
    private static final Map<String, DispatcherServiceGrpc.DispatcherServiceFutureStub> STUB_MAP = new ConcurrentHashMap<>();

    public GrpcInvoker(Class<T> type, URL url) {
        super(type, url);
    }

    private DispatcherServiceGrpc.DispatcherServiceFutureStub getStub() {
        // computeIfAbsent creates the stub at most once per address.
        return STUB_MAP.computeIfAbsent(getUrl().getAddress(), key -> createClient(getUrl()));
    }

    private DispatcherServiceGrpc.DispatcherServiceFutureStub createClient(URL url) {
        // Plaintext channel, that is, no TLS.
        ManagedChannel channel = ManagedChannelBuilder.forAddress(url.getHost(), url.getPort()).usePlaintext().build();
        return DispatcherServiceGrpc.newFutureStub(channel);
    }

    @Override
    protected Result doInvoke(Invocation invocation) throws Throwable {
        RequestData requestData = Codec.encodeInvocation((RpcInvocation) invocation);
        // Blocks until the server replies.
        ResponseData responseData = getStub().dispatch(requestData).get();
        return Codec.decodeResponse(responseData, invocation);
    }
}
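Finally, the Codec encodes and decodes RequestData and ResponseData objects. For a request, what gets encoded and decoded is the RpcInvocation; for a response, it is the return value and the exception information.
Method arguments are of type Object[] and attachments are a Map, neither of which can be sent through Protobuf directly, so we first serialize them to byte arrays with Serialization and then transmit them.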
public class Codec {

    // Dubbo's default Serialization extension turns objects into bytes and back.
    private static final Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getDefaultExtension();

    public static RequestData encodeInvocation(RpcInvocation invocation) {
        RequestData.Builder builder = RequestData.newBuilder()
                .setTargetServiceUniqueName(invocation.getTargetServiceUniqueName())
                .setMethodName(invocation.getMethodName())
                .setServiceName(invocation.getServiceName());
        for (Class<?> parameterType : invocation.getParameterTypes()) {
            builder.addParameterTypes(serialize(parameterType));
        }
        builder.setParameterTypesDesc(invocation.getParameterTypesDesc());
        for (Object argument : invocation.getArguments()) {
            builder.addArguments(serialize(argument));
        }
        builder.setAttachments(serialize(invocation.getAttachments()));
        return builder.build();
    }

    public static RpcInvocation decodeInvocation(RequestData requestData) {
        RpcInvocation invocation = new RpcInvocation();
        invocation.setTargetServiceUniqueName(requestData.getTargetServiceUniqueName());
        invocation.setMethodName(requestData.getMethodName());
        invocation.setServiceName(requestData.getServiceName());
        List<ByteString> parameterTypesList = requestData.getParameterTypesList();
        Class<?>[] parameterTypes = new Class[parameterTypesList.size()];
        for (int i = 0; i < parameterTypesList.size(); i++) {
            parameterTypes[i] = (Class<?>) deserialize(parameterTypesList.get(i));
        }
        invocation.setParameterTypes(parameterTypes);
        invocation.setParameterTypesDesc(requestData.getParameterTypesDesc());
        List<ByteString> argumentsList = requestData.getArgumentsList();
        Object[] arguments = new Object[argumentsList.size()];
        for (int i = 0; i < argumentsList.size(); i++) {
            arguments[i] = deserialize(argumentsList.get(i));
        }
        invocation.setArguments(arguments);
        invocation.setAttachments((Map<String, String>) deserialize(requestData.getAttachments()));
        return invocation;
    }

    public static Result decodeResponse(ResponseData responseData, Invocation invocation) {
        AppResponse appResponse = new AppResponse();
        if (responseData.getStatus() == 200) {
            appResponse.setValue(deserialize(responseData.getResult()));
            appResponse.setAttachments((Map<String, String>) deserialize(responseData.getAttachments()));
        } else {
            // Only the error message crosses the wire, so the original exception type is lost.
            appResponse.setException(new RuntimeException(responseData.getErrorMessage()));
        }
        return new AsyncRpcResult(CompletableFuture.completedFuture(appResponse), invocation);
    }

    private static Object deserialize(ByteString byteString) {
        try {
            InputStream inputStream = new ByteArrayInputStream(byteString.toByteArray());
            ObjectInput objectInput = serialization.deserialize(null, inputStream);
            return objectInput.readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static ByteString serialize(Object obj) {
        try {
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            ObjectOutput output = serialization.serialize(null, outputStream);
            output.writeObject(obj);
            output.flushBuffer();
            return ByteString.copyFrom(outputStream.toByteArray());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static ResponseData encodeResponse(Object returnValue, Throwable throwable) {
        ResponseData.Builder builder = ResponseData.newBuilder();
        if (throwable == null) {
            builder.setStatus(200);
            builder.setResult(serialize(returnValue));
            builder.setAttachments(serialize(new HashMap<>())); // attachments are ignored for now
        } else {
            builder.setStatus(500);
            // Protobuf setters reject null, and getMessage() may be null (e.g. for a NullPointerException).
            String message = throwable.getMessage();
            builder.setErrorMessage(message != null ? message : throwable.toString());
        }
        return builder.build();
    }
}
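The byte-array round trip performed by serialize and deserialize can be tried in isolation. The sketch below is JDK-only and swaps Dubbo's Serialization extension for standard Java serialization, which is an assumption made purely so the example runs without dependencies; BytesCodec is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

/** Round-trips objects through byte arrays with JDK serialization (an assumption for this sketch). */
final class BytesCodec {
    private BytesCodec() {
    }

    static byte[] serialize(Object obj) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed (and flushed) by try-with-resources before the buffer is read.
        return buffer.toByteArray();
    }

    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Each element of the argument array is serialized separately, which matches the repeated bytes fields in RequestData. Java deserialization of untrusted input is unsafe, so this stand-in is suitable for experiments only.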
That completes the implementation. The last step is to let Dubbo load our custom GrpcProtocol, which is done through SPI. Create the file META-INF/dubbo/org.apache.dubbo.rpc.Protocol with the following content:
grpc=dubbo.extension.rpc.grpc.GrpcProtocol
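The file is a plain name-to-class mapping: the key on the left is the protocol name used in configuration, and the value is the implementation class. As a rough illustration of that mapping only (this is not how Dubbo's ExtensionLoader is implemented, and ExtensionFileSketch is a hypothetical name), the same line can be read with java.util.Properties:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Reads "name=implementation class" lines, the shape used by the extension file above. */
final class ExtensionFileSketch {
    private ExtensionFileSketch() {
    }

    static String implementationFor(String fileContent, String name) {
        Properties mapping = new Properties();
        try {
            mapping.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Returns null when the name is not declared in the file.
        return mapping.getProperty(name);
    }
}
```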
The provider uses the custom protocol:
ProtocolConfig protocolConfig = new ProtocolConfig("grpc", 10880);
The consumer uses the custom protocol:
ReferenceConfig#setUrl("grpc://127.0.0.1:10880");
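The direct-connect address carries everything the protocol layer needs: the scheme selects the protocol extension by name, and host and port locate the server. A small JDK-only sketch of that split, using java.net.URI rather than Dubbo's own URL class (DirectUrlSketch is a hypothetical name):

```java
import java.net.URI;

/** Splits a direct-connect address into the parts a protocol implementation needs. */
final class DirectUrlSketch {
    private DirectUrlSketch() {
    }

    static String describe(String url) {
        URI uri = URI.create(url);
        // Scheme selects the protocol extension; host and port locate the server.
        return uri.getScheme() + " -> " + uri.getHost() + ":" + uri.getPort();
    }
}
```

The port must match the one the provider configured (10880 here), not the default returned by getDefaultPort.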
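Closing notes
The Protocol layer is concerned with how services are exported and referred, and with how the two sides communicate over a specific protocol to complete an RPC call. If the official dubbo protocol does not meet your business needs, you can extend the Protocol interface to implement your own private protocol.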