国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

自定義Dubbo RPC通信協(xié)議

這篇具有很好參考價值的文章主要介紹了自定義Dubbo RPC通信協(xié)議。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

前言

Dubbo 協(xié)議層的核心SPI接口是org.apache.dubbo.rpc.Protocol,通過擴展該接口和圍繞的相關接口,就可以讓 Dubbo 使用我們自定義的協(xié)議來通信。默認的協(xié)議是 dubbo,本文提供一個 Grpc 協(xié)議的實現(xiàn)。

設計思路

Google 提供了 Java 的 Grpc 實現(xiàn),所以我們站在巨人的肩膀上即可,就不用重復造輪子了。

首先,我們要實現(xiàn) Protocol 接口,服務暴露時開啟我們的 GrpcServer,綁定本地端口,用于后續(xù)處理連接和請求。
服務端如何處理grpc請求呢???
方案一,是把暴露的所有服務 Invoker 都封裝成grpc的 Service,全部統(tǒng)一讓 GrpcServer 處理,但是這么做太麻煩了。方案二,是提供一個 DispatcherService,統(tǒng)一處理客戶端發(fā)來的grpc請求,再根據(jù)參數(shù)查找要調用的服務,執(zhí)行本地調用返回結果。本文采用方案二。
客戶端引用服務時,我們創(chuàng)建 GrpcInvoker 對象,和服務端建立連接并生成 DispatcherService 的本地存根 Stub 對象,發(fā)起 RPC 調用時只需把 RpcInvocation 轉換成 Protobuf 消息發(fā)出去即可。

實現(xiàn)GrpcProtocol

項目結構

首先,我們新建一個dubbo-extension-protocol-grpc模塊,引入必要的依賴。

<dependencies>
    <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo-rpc-api</artifactId>
        <version>${dubbo.version}</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-all</artifactId>
        <version>1.56.1</version>
    </dependency>
</dependencies>

項目結構:

main
--java
----dubbo.extension.rpc.grpc
------message
--------RequestData.java
--------ResponseData.java
------Codec.java
------DispatcherService.java
------DispatcherServiceGrpc.java
------GrpcExporter.java
------GrpcInvoker.java
------GrpcProtocol.java
------GrpcProtocolServer.java
--resources
----META-INF/dubbo
------org.apache.dubbo.rpc.Protocol

服務&消息定義

然后是定義grpc的 Service 和消息格式
DispatcherService.proto 請求分發(fā)服務的定義

syntax = "proto3";

option java_multiple_files = true;
option java_package = "dubbo.extension.rpc.grpc";
option java_outer_classname = "DispatcherServiceProto";
option objc_class_prefix = "HLW";

import "RequestData.proto";
import "ResponseData.proto";

service DispatcherService {
  rpc dispatch (RequestData) returns (ResponseData) {}
}

RequestData.proto 請求消息的定義,主要是對 Invocation 的描述

syntax = "proto3";

option java_multiple_files = true;
option java_package = "dubbo.extension.rpc.grpc.message";
option java_outer_classname = "RequestDataProto";
option objc_class_prefix = "HLW";

message RequestData {
  string targetServiceUniqueName = 1;
  string methodName = 2;
  string serviceName = 3;
  repeated bytes parameterTypes = 4;
  string parameterTypesDesc = 5;
  repeated bytes arguments = 6;
  bytes attachments = 7;
}

ResponseData.proto 響應消息的定義,主要是對 AppResponse 的描述

syntax = "proto3";

option java_multiple_files = true;
option java_package = "dubbo.extension.rpc.grpc.message";
option java_outer_classname = "ResponseataProto";
option objc_class_prefix = "HLW";

message ResponseData {
  int32 status = 1;
  string errorMessage = 2;
  bytes result = 3;
  bytes attachments = 4;
}

使用protobuf-maven-plugin插件把 proto 文件生成對應的 Java 類。

協(xié)議實現(xiàn)

新建 GrpcProtocol 類,繼承 AbstractProtocol,實現(xiàn) Protocol 協(xié)議細節(jié)。
核心是:服務暴露時開啟 Grpc 服務,引用服務時生成對應的 Invoker。

public class GrpcProtocol extends AbstractProtocol {

    @Override
    protected <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException {
        return new GrpcInvoker<>(type, url);
    }

    @Override
    public int getDefaultPort() {
        return 18080;
    }

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        GrpcExporter<T> exporter = new GrpcExporter<>(invoker);
        exporterMap.put(invoker.getInterface().getName(), exporter);
        openServer(invoker.getUrl());
        return exporter;
    }

    private void openServer(URL url) {
        String key = serviceKey(url);
        ProtocolServer protocolServer = serverMap.get(key);
        if (protocolServer == null) {
            synchronized (serverMap) {
                protocolServer = serverMap.get(key);
                if (protocolServer == null) {
                    serverMap.put(key, createServer(url));
                }
            }
        }
    }

    private ProtocolServer createServer(URL url) {
        return new GrpcProtocolServer(url, exporterMap);
    }
}

新建 GrpcProtocolServer 類實現(xiàn) ProtocolServer 接口,核心是啟動 GrpcServer,并添加 DispatcherService 處理請求。

public class GrpcProtocolServer implements ProtocolServer {

    private final Server server;

    public GrpcProtocolServer(URL url, Map<String, Exporter<?>> exporterMap) {
        server = ServerBuilder.forPort(url.getPort())
                .addService(new DispatcherService(exporterMap))
                .build();
        try {
            server.start();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String getAddress() {
        return null;
    }

    @Override
    public void setAddress(String address) {

    }

    @Override
    public void close() {
        server.shutdown();
    }
}

新建 DispatcherService 類實現(xiàn) Grpc Service,用來處理客戶端的grpc請求。核心是把 RequestData 解碼成 RpcInvocation,再查找本地 Invoker 調用并返回結果。

public class DispatcherService extends DispatcherServiceGrpc.DispatcherServiceImplBase {

    private final Map<String, Exporter<?>> exporterMap;

    public DispatcherService(Map<String, Exporter<?>> exporterMap) {
        this.exporterMap = exporterMap;
    }

    @Override
    public void dispatch(RequestData request, StreamObserver<ResponseData> responseObserver) {
        RpcInvocation invocation = Codec.decodeInvocation(request);
        ResponseData responseData;
        try {
            Invoker<?> invoker = exporterMap.get(invocation.getServiceName()).getInvoker();
            Object returnValue = invoker.invoke(invocation).get().getValue();
            responseData = Codec.encodeResponse(returnValue, null);
        } catch (Exception e) {
            responseData = Codec.encodeResponse(null, e);
        }
        responseObserver.onNext(responseData);
        responseObserver.onCompleted();
    }
}

新建 GrpcInvoker 類實現(xiàn) Invoker 接口,服務引用時會創(chuàng)建它,目的是發(fā)起 RPC 調用時通過 Stub 發(fā)一個請求到 DispatcherService,實現(xiàn)grpc協(xié)議的 RPC 調用。

public class GrpcInvoker<T> extends AbstractInvoker<T> {

    private static final Map<String, DispatcherServiceGrpc.DispatcherServiceFutureStub> STUB_MAP = new ConcurrentHashMap<>();

    public GrpcInvoker(Class<T> type, URL url) {
        super(type, url);
    }

    private DispatcherServiceGrpc.DispatcherServiceFutureStub getStub() {
        String key = getUrl().getAddress();
        DispatcherServiceGrpc.DispatcherServiceFutureStub stub = STUB_MAP.get(key);
        if (stub == null) {
            synchronized (STUB_MAP) {
                stub = STUB_MAP.get(key);
                if (stub == null) {
                    STUB_MAP.put(key, stub = createClient(getUrl()));
                }
            }
        }
        return stub;
    }

    private DispatcherServiceGrpc.DispatcherServiceFutureStub createClient(URL url) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress(url.getHost(), url.getPort()).usePlaintext().build();
        return DispatcherServiceGrpc.newFutureStub(channel);
    }

    @Override
    protected Result doInvoke(Invocation invocation) throws Throwable {
        RequestData requestData = Codec.encodeInvocation((RpcInvocation) invocation);
        ResponseData responseData = getStub().dispatch(requestData).get();
        return Codec.decodeResponse(responseData, invocation);
    }
}

最后是編解碼器 Codec,它的作用是對 RequestData、ResponseData 對象的編解碼。對于請求來說,要編解碼的是 RpcInvocation;對于響應來說,要編解碼的是返回值和異常信息。
方法實參是 Object[] 類型,附帶參數(shù)是 Map 類型,本身不能直接通過 Protobuf 傳輸,我們會先利用 Serialization 序列化成字節(jié)數(shù)組后再傳輸。

public class Codec {

    private static final Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getDefaultExtension();

    public static RequestData encodeInvocation(RpcInvocation invocation) {
        RequestData.Builder builder = RequestData.newBuilder()
                .setTargetServiceUniqueName(invocation.getTargetServiceUniqueName())
                .setMethodName(invocation.getMethodName())
                .setServiceName(invocation.getServiceName());
        for (Class<?> parameterType : invocation.getParameterTypes()) {
            builder.addParameterTypes(serialize(parameterType));
        }
        builder.setParameterTypesDesc(invocation.getParameterTypesDesc());
        for (Object argument : invocation.getArguments()) {
            builder.addArguments(serialize(argument));
        }
        builder.setAttachments(serialize(invocation.getAttachments()));
        return builder.build();
    }

    public static RpcInvocation decodeInvocation(RequestData requestData) {
        RpcInvocation invocation = new RpcInvocation();
        invocation.setTargetServiceUniqueName(requestData.getTargetServiceUniqueName());
        invocation.setMethodName(requestData.getMethodName());
        invocation.setServiceName(requestData.getServiceName());
        List<ByteString> parameterTypesList = requestData.getParameterTypesList();
        Class<?>[] parameterTypes = new Class[parameterTypesList.size()];
        for (int i = 0; i < parameterTypesList.size(); i++) {
            parameterTypes[i] = (Class<?>) deserialize(parameterTypesList.get(i));
        }
        invocation.setParameterTypes(parameterTypes);
        invocation.setParameterTypesDesc(requestData.getParameterTypesDesc());
        List<ByteString> argumentsList = requestData.getArgumentsList();
        Object[] arguments = new Object[argumentsList.size()];
        for (int i = 0; i < argumentsList.size(); i++) {
            arguments[i] = deserialize(argumentsList.get(i));
        }
        invocation.setArguments(arguments);
        invocation.setAttachments((Map<String, String>) deserialize(requestData.getAttachments()));
        return invocation;
    }

    public static Result decodeResponse(ResponseData responseData, Invocation invocation) {
        AppResponse appResponse = new AppResponse();
        if (responseData.getStatus() == 200) {
            appResponse.setValue(deserialize(responseData.getResult()));
            appResponse.setAttachments((Map<String, String>) deserialize(responseData.getAttachments()));
        } else {
            appResponse.setException(new RuntimeException(responseData.getErrorMessage()));
        }
        return new AsyncRpcResult(CompletableFuture.completedFuture(appResponse), invocation);
    }

    private static Object deserialize(ByteString byteString) {
        try {
            InputStream inputStream = new ByteArrayInputStream(byteString.toByteArray());
            ObjectInput objectInput = serialization.deserialize(null, inputStream);
            return objectInput.readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static ByteString serialize(Object obj) {
        try {
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            ObjectOutput output = serialization.serialize(null, outputStream);
            output.writeObject(obj);
            output.flushBuffer();
            return ByteString.copyFrom(outputStream.toByteArray());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static ResponseData encodeResponse(Object returnValue, Throwable throwable) {
        ResponseData.Builder builder = ResponseData.newBuilder();
        if (throwable == null) {
            builder.setStatus(200);
            builder.setResult(serialize(returnValue));
            builder.setAttachments(serialize(new HashMap<>()));//先忽略
        } else {
            builder.setStatus(500);
            builder.setErrorMessage(throwable.getMessage());
        }
        return builder.build();
    }
}

實現(xiàn)完畢,最后是讓 Dubbo 可以加載到我們自定義的 GrpcProtocol,可以通過 SPI 的方式。新建META-INF/dubbo/org.apache.dubbo.rpc.Protocol文件,內容:

grpc=dubbo.extension.rpc.grpc.GrpcProtocol

服務提供方使用自定義協(xié)議:

ProtocolConfig protocolConfig = new ProtocolConfig("grpc", 10880);

消費方使用自定義協(xié)議:

ReferenceConfig#setUrl("grpc://127.0.0.1:10880");

尾巴

Protocol 層關心的是如何暴露服務和引用服務,以及如何讓雙方使用某個具體的協(xié)議來通信,以完成 RPC 調用。如果你覺得官方提供的 dubbo 協(xié)議無法滿足你的業(yè)務,就可以通過擴展 Protocol 接口來實現(xiàn)你自己的私有協(xié)議。文章來源地址http://www.zghlxwxcb.cn/news/detail-805950.html

到了這里,關于自定義Dubbo RPC通信協(xié)議的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 分布式RPC框架Dubbo詳解

    分布式RPC框架Dubbo詳解

    目錄 ? 1.架構演進 1.1 單體架構 1.2? 垂直架構 1.3 分布式架構 1.4 SOA架構 1.5 微服務架構 2.RPC框架 2.1 RPC基本概念介紹 2.1.1 RPC協(xié)議 2.1.2 RPC框架 2.1.3 RPC與HTTP、TCP/ UDP、Socket的區(qū)別 2.1.4 RPC的運行流程 ?2.1.5 為什么需要RPC 2.2 Dubbo? 2.2.1 Dubbo 概述 2.2.2 Dubbo實戰(zhàn) ? 架構演進如下圖: 這

    2024年02月07日
    瀏覽(38)
  • 用Netty自己實現(xiàn)Dubbo RPC

    用Netty自己實現(xiàn)Dubbo RPC

    1. RPC(Remote Procedure Call)— 遠程過程調用,是一個計算機通信協(xié)議. 該協(xié)議 允許運行在一臺計算機中的程序調用另一臺計算機的子程序,而程序員無需額外地為這個交互作用編程; 2. 兩個或多個應用程序都分布在不同的服務器上,它們之間的調用都像是本地方法調用一樣 (如圖): 3.常

    2024年02月10日
    瀏覽(23)
  • 深入淺出:理解 RPC 和 Dubbo 架構

    深入淺出:理解 RPC 和 Dubbo 架構

    Apache Dubbo是一款高性能的Java RPC框架.其前身是阿里巴巴公司開源的一個高性能,輕量級的開源Java RPC框架,可以和Spring框架無縫集成. Dubbo 官網(wǎng) RPC介紹 Remote Procedure Call 遠程過程調用,是分布式架構的核心,按響應方式分以下兩種: 同步調用:客戶端調用服務方方法,等待直到服務方返

    2023年04月12日
    瀏覽(23)
  • 應用架構演變過程、rpc及Dubbo簡介

    ????????單一應用架構 - 垂直應用架構 - 分布式服務架構?- 微服務架構。 單一應用架構 ????????當網(wǎng)站流量很小時,只需一個應用,將所有功能都部署在一起,以減少部署節(jié)點和成本。 此時,用于簡化增刪改查工作量的 數(shù)據(jù)訪問框架(ORM) 是關鍵。 ????????缺點:

    2024年02月02日
    瀏覽(22)
  • 【Dubbo3云原生微服務開發(fā)實戰(zhàn)】「Dubbo前奏導學」 RPC服務的底層原理和實現(xiàn)

    【Dubbo3云原生微服務開發(fā)實戰(zhàn)】「Dubbo前奏導學」 RPC服務的底層原理和實現(xiàn)

    Dubbo是一款高效而強大的RPC服務框架,它旨在解決微服務架構下的服務監(jiān)控和通信問題。該框架提供了Java、Golang等多語言的SDK,使得使用者可以輕松構建和開發(fā)微服務。Dubbo具備遠程地址發(fā)現(xiàn)和通信能力,可通過Dubbo獨有的身臨其境的服務治理特驗為主導,以提高開發(fā)人員的功

    2024年02月05日
    瀏覽(21)
  • 不滿足于RPC,詳解Dubbo的服務調用鏈路

    不滿足于RPC,詳解Dubbo的服務調用鏈路

    【收藏向】從用法到源碼,一篇文章讓你精通Dubbo的SPI機制 面試Dubbo ,卻問我和Springcloud有什么區(qū)別? 超簡單,手把手教你搭建Dubbo工程(內附源碼) Dubbo最核心功能——服務暴露的配置、使用及原理 并不簡單的代理,Dubbo是如何做服務引用的 經(jīng)過前面一系列的鋪墊,今天終

    2024年02月16日
    瀏覽(26)
  • Java 【dubbo rpc改feign調用】controller注解處理

    【框架改造問題點記錄,dubbo改為spring cloud alibaba】 【第三篇】controller注解處理 【描述】項目之前用了jboss,引入了很多ws.rs包,controller參數(shù)注解使用QueryParam。改造時批量替換成了@RequestParam(代表必傳)。但是前端并不會傳全部參數(shù),會導致400,持續(xù)更新… 不加注解,表示

    2024年02月17日
    瀏覽(19)
  • 微服務學習 | Springboot整合Dubbo+Nacos實現(xiàn)RPC調用

    微服務學習 | Springboot整合Dubbo+Nacos實現(xiàn)RPC調用

    ??? 個人主頁 :鼠鼠我捏,要死了捏的主頁? ??? 系列專欄 :Golang全棧-專欄 ??? 個人學習筆記,若有缺誤,歡迎評論區(qū)指正 ? 前些天發(fā)現(xiàn)了一個巨牛的人工智能學習網(wǎng)站,通俗易懂,風趣幽默,忍不住分享一下給大家。點擊跳轉到網(wǎng)站AI學習網(wǎng)站。 目錄 前言 快速上手

    2024年02月19日
    瀏覽(22)
  • Netty核心技術十一--用Netty 自己 實現(xiàn) dubbo RPC

    Netty核心技術十一--用Netty 自己 實現(xiàn) dubbo RPC

    RPC(Remote Procedure Call) :遠程 過程調用,是一個計算機 通信協(xié)議。該協(xié)議允許運 行于一臺計算機的程序調 用另一臺計算機的子程序, 而程序員無需額外地為這 個交互作用編程 兩個或多個應用程序都分 布在不同的服務器上,它 們之間的調用都像是本地 方法調用一樣(如圖

    2024年02月16日
    瀏覽(20)
  • Dubbo源碼解析第一期:如何使用Netty4構建RPC

    Dubbo源碼解析第一期:如何使用Netty4構建RPC

    ????????早期學習和使用Dubbo的時候(那時候Dubbo還沒成為Apache頂級項目),寫過一些源碼解讀,但隨著Dubbo發(fā)生了翻天覆地的變化,那些文章早已過時,所以現(xiàn)在計劃針對最新的Apache Dubbo源碼來進行“閱讀理解”,希望和大家一起再探Dubbo的實現(xiàn)。由于能力有限,如果文章

    2024年01月21日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包