国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

springcloud/springboot集成NACOS 做注冊和配置中心以及nacos源碼分析

這篇具有很好參考價(jià)值的文章主要介紹了springcloud/springboot集成NACOS 做注冊和配置中心以及nacos源碼分析。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一、SpringCloud 簡介

Spring Cloud 是一系列框架的有序集合如服務(wù)發(fā)現(xiàn)注冊、配置中心、消息總線、負(fù)載均衡、熔斷器、數(shù)據(jù)監(jiān)控等。

SpringCloud 將多個(gè)服務(wù)框架組合起來,通過Spring Boot進(jìn)行再封裝,屏蔽掉了復(fù)雜的配置和實(shí)現(xiàn)原理,最終給開發(fā)者提供了一套簡單易懂、易部署和易維護(hù)的分布式系統(tǒng)開發(fā)工具包。

springcloud/springboot集成NACOS 做注冊和配置中心以及nacos源碼分析

?Spring Cloud是一個(gè)基于SpringBoot實(shí)現(xiàn)的微服務(wù)開發(fā)方案,Spring boot 是 Spring 的一套快速配置框架??梢曰趕pring boot 快速開發(fā)單個(gè)微服務(wù)。

二、NACOS簡介

一個(gè)更易于構(gòu)建云原生應(yīng)用的動(dòng)態(tài)服務(wù)發(fā)現(xiàn)、配置管理和服務(wù)管理平臺(tái)。

Nacos 致力于幫助您發(fā)現(xiàn)、配置和管理微服務(wù)。Nacos 提供了一組簡單易用的特性集,幫助您快速實(shí)現(xiàn)動(dòng)態(tài)服務(wù)發(fā)現(xiàn)、服務(wù)配置、服務(wù)元數(shù)據(jù)及流量管理。

Nacos 幫助您更敏捷和容易地構(gòu)建、交付和管理微服務(wù)平臺(tái)。 Nacos 是構(gòu)建以“服務(wù)”為中心的現(xiàn)代應(yīng)用架構(gòu) (例如微服務(wù)范式、云原生范式) 的服務(wù)基礎(chǔ)設(shè)施。

1、Nacos中的概念

地域

物理的數(shù)據(jù)中心,資源創(chuàng)建成功后不能更換。

可用區(qū)

同一地域內(nèi),電力和網(wǎng)絡(luò)互相獨(dú)立的物理區(qū)域。同一可用區(qū)內(nèi),實(shí)例的網(wǎng)絡(luò)延遲較低。

接入點(diǎn)

地域的某個(gè)服務(wù)的入口域名。

命名空間

用于進(jìn)行租戶粒度的配置隔離。不同的命名空間下,可以存在相同的 Group 或 Data ID 的配置。Namespace 的常用場景之一是不同環(huán)境的配置的區(qū)分隔離,例如開發(fā)測試環(huán)境和生產(chǎn)環(huán)境的資源(如配置、服務(wù))隔離等。

配置

在系統(tǒng)開發(fā)過程中,開發(fā)者通常會(huì)將一些需要變更的參數(shù)、變量等從代碼中分離出來獨(dú)立管理,以獨(dú)立的配置文件的形式存在。目的是讓靜態(tài)的系統(tǒng)工件或者交付物(如 WAR,JAR 包等)更好地和實(shí)際的物理運(yùn)行環(huán)境進(jìn)行適配。配置管理一般包含在系統(tǒng)部署的過程中,由系統(tǒng)管理員或者運(yùn)維人員完成。配置變更是調(diào)整系統(tǒng)運(yùn)行時(shí)的行為的有效手段。

配置管理

系統(tǒng)配置的編輯、存儲(chǔ)、分發(fā)、變更管理、歷史版本管理、變更審計(jì)等所有與配置相關(guān)的活動(dòng)。

配置項(xiàng)

一個(gè)具體的可配置的參數(shù)與其值域,通常以 param-key=param-value 的形式存在。例如我們常配置系統(tǒng)的日志輸出級別(logLevel=INFO|WARN|ERROR) 就是一個(gè)配置項(xiàng)。

配置集

一組相關(guān)或者不相關(guān)的配置項(xiàng)的集合稱為配置集。在系統(tǒng)中,一個(gè)配置文件通常就是一個(gè)配置集,包含了系統(tǒng)各個(gè)方面的配置。例如,一個(gè)配置集可能包含了數(shù)據(jù)源、線程池、日志級別等配置項(xiàng)。

配置集 ID

Nacos 中的某個(gè)配置集的 ID。配置集 ID 是組織劃分配置的維度之一。Data ID 通常用于組織劃分系統(tǒng)的配置集。一個(gè)系統(tǒng)或者應(yīng)用可以包含多個(gè)配置集,每個(gè)配置集都可以被一個(gè)有意義的名稱標(biāo)識(shí)。Data ID 通常采用類 Java 包(如 com.taobao.tc.refund.log.level)的命名規(guī)則保證全局唯一性。此命名規(guī)則非強(qiáng)制。

配置分組

Nacos 中的一組配置集,是組織配置的維度之一。通過一個(gè)有意義的字符串(如 Buy 或 Trade )對配置集進(jìn)行分組,從而區(qū)分 Data ID 相同的配置集。當(dāng)您在 Nacos 上創(chuàng)建一個(gè)配置時(shí),如果未填寫配置分組的名稱,則配置分組的名稱默認(rèn)采用 DEFAULT_GROUP 。配置分組的常見場景:不同的應(yīng)用或組件使用了相同的配置類型,如 database_url 配置和 MQ_topic 配置。

配置快照

Nacos 的客戶端 SDK 會(huì)在本地生成配置的快照。當(dāng)客戶端無法連接到 Nacos Server 時(shí),可以使用配置快照顯示系統(tǒng)的整體容災(zāi)能力。配置快照類似于 Git 中的本地 commit,也類似于緩存,會(huì)在適當(dāng)?shù)臅r(shí)機(jī)更新,但是并沒有緩存過期(expiration)的概念。

服務(wù)

通過預(yù)定義接口網(wǎng)絡(luò)訪問的提供給客戶端的軟件功能。

服務(wù)名

服務(wù)提供的標(biāo)識(shí),通過該標(biāo)識(shí)可以唯一確定其指代的服務(wù)。

服務(wù)注冊中心

存儲(chǔ)服務(wù)實(shí)例和服務(wù)負(fù)載均衡策略的數(shù)據(jù)庫。

服務(wù)發(fā)現(xiàn)

在計(jì)算機(jī)網(wǎng)絡(luò)上,(通常使用服務(wù)名)對服務(wù)下的實(shí)例的地址和元數(shù)據(jù)進(jìn)行探測,并以預(yù)先定義的接口提供給客戶端進(jìn)行查詢。

元信息

Nacos數(shù)據(jù)(如配置和服務(wù))描述信息,如服務(wù)版本、權(quán)重、容災(zāi)策略、負(fù)載均衡策略、鑒權(quán)配置、各種自定義標(biāo)簽 (label),從作用范圍來看,分為服務(wù)級別的元信息、集群的元信息及實(shí)例的元信息。

應(yīng)用

用于標(biāo)識(shí)服務(wù)提供方的服務(wù)的屬性。

服務(wù)分組

不同的服務(wù)可以歸類到同一分組。

虛擬集群

同一個(gè)服務(wù)下的所有服務(wù)實(shí)例組成一個(gè)默認(rèn)集群, 集群可以被進(jìn)一步按需求劃分,劃分的單位可以是虛擬集群。

實(shí)例

提供一個(gè)或多個(gè)服務(wù)的具有可訪問網(wǎng)絡(luò)地址(IP:Port)的進(jìn)程。

權(quán)重

實(shí)例級別的配置。權(quán)重為浮點(diǎn)數(shù)。權(quán)重越大,分配給該實(shí)例的流量越大。

健康檢查

以指定方式檢查服務(wù)下掛載的實(shí)例 (Instance) 的健康度,從而確認(rèn)該實(shí)例 (Instance) 是否能提供服務(wù)。根據(jù)檢查結(jié)果,實(shí)例 (Instance) 會(huì)被判斷為健康或不健康。對服務(wù)發(fā)起解析請求時(shí),不健康的實(shí)例 (Instance) 不會(huì)返回給客戶端。

健康保護(hù)閾值

為了防止因過多實(shí)例 (Instance) 不健康導(dǎo)致流量全部流向健康實(shí)例 (Instance) ,繼而造成流量壓力把健康實(shí)例 (Instance) 壓垮并形成雪崩效應(yīng),應(yīng)將健康保護(hù)閾值定義為一個(gè) 0 到 1 之間的浮點(diǎn)數(shù)。當(dāng)域名健康實(shí)例數(shù) (Instance) 占總服務(wù)實(shí)例數(shù) (Instance) 的比例小于該值時(shí),無論實(shí)例 (Instance) 是否健康,都會(huì)將這個(gè)實(shí)例 (Instance) 返回給客戶端。這樣做雖然損失了一部分流量,但是保證了集群中剩余健康實(shí)例 (Instance) 能正常工作。

2、Nacos 架構(gòu)

基礎(chǔ)架構(gòu)如下:

springcloud/springboot集成NACOS 做注冊和配置中心以及nacos源碼分析

?邏輯架構(gòu)及組件如下:

springcloud/springboot集成NACOS 做注冊和配置中心以及nacos源碼分析

?

  • 服務(wù)管理:實(shí)現(xiàn)服務(wù)CRUD,域名CRUD,服務(wù)健康狀態(tài)檢查,服務(wù)權(quán)重管理等功能
  • 配置管理:實(shí)現(xiàn)配置管CRUD,版本管理,灰度管理,監(jiān)聽管理,推送軌跡,聚合數(shù)據(jù)等功能
  • 元數(shù)據(jù)管理:提供元數(shù)據(jù)CURD 和打標(biāo)能力
  • 插件機(jī)制:實(shí)現(xiàn)三個(gè)模塊可分可合能力,實(shí)現(xiàn)擴(kuò)展點(diǎn)SPI機(jī)制
  • 事件機(jī)制:實(shí)現(xiàn)異步化事件通知,sdk數(shù)據(jù)變化異步通知等邏輯
  • 日志模塊:管理日志分類,日志級別,日志可移植性(尤其避免沖突),日志格式,異常碼+幫助文檔
  • 回調(diào)機(jī)制:sdk通知數(shù)據(jù),通過統(tǒng)一的模式回調(diào)用戶處理。接口和數(shù)據(jù)結(jié)構(gòu)需要具備可擴(kuò)展性
  • 尋址模式:解決ip,域名,nameserver、廣播等多種尋址模式,需要可擴(kuò)展
  • 推送通道:解決server與存儲(chǔ)、server間、server與sdk間推送性能問題
  • 容量管理:管理每個(gè)租戶,分組下的容量,防止存儲(chǔ)被寫爆,影響服務(wù)可用性
  • 流量管理:按照租戶,分組等多個(gè)維度對請求頻率,長鏈接個(gè)數(shù),報(bào)文大小,請求流控進(jìn)行控制
  • 緩存機(jī)制:容災(zāi)目錄,本地緩存,server緩存機(jī)制。容災(zāi)目錄使用需要工具
  • 啟動(dòng)模式:按照單機(jī)模式,配置模式,服務(wù)模式,dns模式,或者all模式,啟動(dòng)不同的程序+UI
  • 一致性協(xié)議:解決不同數(shù)據(jù),不同一致性要求情況下,不同一致性機(jī)制
  • 存儲(chǔ)模塊:解決數(shù)據(jù)持久化、非持久化存儲(chǔ),解決數(shù)據(jù)分片問題
  • Nameserver:解決namespace到clusterid的路由問題,解決用戶環(huán)境與nacos物理環(huán)境映射問題
  • CMDB:解決元數(shù)據(jù)存儲(chǔ),與三方cmdb系統(tǒng)對接問題,解決應(yīng)用,人,資源關(guān)系
  • Metrics:暴露標(biāo)準(zhǔn)metrics數(shù)據(jù),方便與三方監(jiān)控系統(tǒng)打通
  • Trace:暴露標(biāo)準(zhǔn)trace,方便與SLA系統(tǒng)打通,日志白平化,推送軌跡等能力,并且可以和計(jì)量計(jì)費(fèi)系統(tǒng)打通
  • 接入管理:相當(dāng)于阿里云開通服務(wù),分配身份、容量、權(quán)限過程
  • 用戶管理:解決用戶管理,登錄,sso等問題
  • 權(quán)限管理:解決身份識(shí)別,訪問控制,角色管理等問題
  • 審計(jì)系統(tǒng):擴(kuò)展接口方便與不同公司審計(jì)系統(tǒng)打通
  • 通知系統(tǒng):核心數(shù)據(jù)變更,或者操作,方便通過SMS系統(tǒng)打通,通知到對應(yīng)人數(shù)據(jù)變更
  • OpenAPI:暴露標(biāo)準(zhǔn)Rest風(fēng)格HTTP接口,簡單易用,方便多語言集成
  • Console:易用控制臺(tái),做服務(wù)管理、配置管理等操作
  • SDK:多語言sdk
  • Agent:dns-f類似模式,或者與mesh等方案集成
  • CLI:命令行對產(chǎn)品進(jìn)行輕量化管理,像git一樣好用

?部署架構(gòu)如下:

springcloud/springboot集成NACOS 做注冊和配置中心以及nacos源碼分析

?nacos 官網(wǎng)以及幫助文檔和部署手冊:https://nacos.io/zh-cn/index.html

nacos github:? ?https://github.com/alibaba/nacos

三、NACOS源碼分析

1、Nacos注冊源碼分析-Clinet端

cosumer啟動(dòng)的時(shí)候,從nacos server上讀取指定服務(wù)名稱的實(shí)例列表,緩存到本地內(nèi)存中。

開啟一個(gè)定時(shí)任務(wù),每隔10s去nacos server上拉取服務(wù)列表

nacos的push機(jī)制:

通過心跳檢測發(fā)現(xiàn)服務(wù)提供者出現(xiàn)心態(tài)超時(shí)的時(shí)候,推送一個(gè)push消息到consumer,更新本地的緩存數(shù)據(jù)。

客戶端Client

我們自己的項(xiàng)目在配置了nacos作為注冊中心后,至少要配置這么一個(gè)屬性

spring.cloud.nacos.discovery.server-addr=ip地址:8848
# 從邏輯上看,這個(gè)是通過grpc去注冊還是通過http去注冊。false-http1.x注冊  true-gRPC注冊,默認(rèn)是true,也就是通過gRPC去注冊,畢竟gRPC的性能上要比http1.x高很多
spring.cloud.nacos.discovery.ephemeral=false

這個(gè)屬性會(huì)讓應(yīng)用找到nacos的server地址去注冊。如果不配置的話,會(huì)一直報(bào)錯(cuò)

springboot的@EnableAutoConfiguration這里就不再講解了。都到nacos的源碼了,springboot默認(rèn)是熟悉的。

我們再去打開NacosServiceRegistryAutoConfiguration這個(gè)類。

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
        matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
        AutoServiceRegistrationAutoConfiguration.class,
        NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

    @Bean
    public NacosServiceRegistry nacosServiceRegistry(
            NacosServiceManager nacosServiceManager,
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosServiceManager, nacosDiscoveryProperties);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(
            ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
            NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        return new NacosRegistration(registrationCustomizers.getIfAvailable(),
                nacosDiscoveryProperties, context);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(
            NacosServiceRegistry registry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry,
                autoServiceRegistrationProperties, registration);
    }

}

其中第三個(gè)類NacosAutoServiceRegistration實(shí)現(xiàn)了一個(gè)抽象類AbstractAutoServiceRegistration.

public abstract class AbstractAutoServiceRegistration<R extends Registration>
        implements AutoServiceRegistration, ApplicationContextAware,
        ApplicationListener<WebServerInitializedEvent> {
    
    @Override
    @SuppressWarnings("deprecation")
    public void onApplicationEvent(WebServerInitializedEvent event) {
        bind(event);
    }

    @Deprecated
    public void bind(WebServerInitializedEvent event) {
        ApplicationContext context = event.getApplicationContext();
        if (context instanceof ConfigurableWebServerApplicationContext) {
            if ("management".equals(((ConfigurableWebServerApplicationContext) context)
                    .getServerNamespace())) {
                return;
            }
        }
        this.port.compareAndSet(0, event.getWebServer().getPort());
        this.start();
    }
    public void start() {
        if (!isEnabled()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Discovery Lifecycle disabled. Not starting");
            }
            return;
        }

        // only initialize if nonSecurePort is greater than 0 and it isn't already running
        // because of containerPortInitializer below
        if (!this.running.get()) {
            this.context.publishEvent(
                    new InstancePreRegisteredEvent(this, getRegistration()));
            register();
            if (shouldRegisterManagement()) {
                registerManagement();
            }
            this.context.publishEvent(
                    new InstanceRegisteredEvent<>(this, getConfiguration()));
            this.running.compareAndSet(false, true);
        }

    }
}

這里有實(shí)現(xiàn)一個(gè)ApplicationListener<WebServerInitializedEvent>的類,這個(gè)類是spring的一個(gè)監(jiān)聽事件(觀察者模式),而這個(gè)事件就是webserver初始化的時(shí)候去觸發(fā)的。onApplicationEvent方法調(diào)用了bind()方法。而bind()中又調(diào)用了start().

start()中有一個(gè)register()。而這個(gè)register就是NacosServiceRegistry中的register()。

public class NacosServiceRegistry implements ServiceRegistry<Registration> {

    @Override
    public void register(Registration registration) {

        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
            return;
        }

        NamingService namingService = namingService();
        String serviceId = registration.getServiceId();
        String group = nacosDiscoveryProperties.getGroup();

        Instance instance = getNacosInstanceFromRegistration(registration);

        try {
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                    instance.getIp(), instance.getPort());
        }
        catch (Exception e) {
            if (nacosDiscoveryProperties.isFailFast()) {
                log.error("nacos registry, {} register failed...{},", serviceId,
                        registration.toString(), e);
                rethrowRuntimeException(e);
            }
            else {
                log.warn("Failfast is false. {} register failed...{},", serviceId,
                        registration.toString(), e);
            }
        }
    }
}
  • getNacosInstanceFromRegistration?獲取注冊的實(shí)例信息。
private Instance getNacosInstanceFromRegistration(Registration registration) {
    Instance instance = new Instance();
    instance.setIp(registration.getHost());
    instance.setPort(registration.getPort());
    instance.setWeight(nacosDiscoveryProperties.getWeight());
    instance.setClusterName(nacosDiscoveryProperties.getClusterName());
    instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
    instance.setMetadata(registration.getMetadata());
    instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
    return instance;
}
  • namingService.registerInstance(serviceId, group, instance);

clientProxy有3個(gè)實(shí)現(xiàn)類,NamingClientProxyDelegate、NamingGrpcClientProxy、NamingHttpClientProxy。

這個(gè)類的構(gòu)造方法中有個(gè)init(properties)方法,這個(gè)方法中給clientProxy賦值了。走的是NamingClientProxyDelegate方法。一般情況下,帶有delegate的方法都是委派模式。

public NacosNamingService(String serverList) throws NacosException {
    Properties properties = new Properties();
    properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
    init(properties);
}

public NacosNamingService(Properties properties) throws NacosException {
    init(properties);
}

private void init(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    this.namespace = InitUtils.initNamespaceForNaming(properties);
    InitUtils.initSerialization();
    InitUtils.initWebRootContext(properties);
    initLogName(properties);

    this.changeNotifier = new InstancesChangeNotifier();
    NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
    NotifyCenter.registerSubscriber(changeNotifier);
    this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
    this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
}
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    clientProxy.registerService(serviceName, groupName, instance);
}
基于http1.x協(xié)議注冊
  • NamingClientProxyDelegate.registerService

    委派這里做了一個(gè)可執(zhí)行的判斷

@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}

NamingClientProxyDelegate.getExecuteClientProxy

做了一個(gè)判斷,配置ephemeral=false就走h(yuǎn)ttp,否則grpc。這里請注意,如果nacos-server還是用的1.x.x版本的話,會(huì)報(bào)錯(cuò)的。因?yàn)?.x.x增加一個(gè)grpc的支持,會(huì)額外的多增加一個(gè)端口,默認(rèn)對外提供端口為8848和9848

private NamingClientProxy getExecuteClientProxy(Instance instance) {
    return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}
  • NamingHttpClientProxy.registerService

    這里的clientProxy=NamingHttpClientProxy

@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
                       instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    final Map<String, String> params = new HashMap<String, String>(32);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, groupedServiceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put(IP_PARAM, instance.getIp());
    params.put(PORT_PARAM, String.valueOf(instance.getPort()));
    params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight()));
    params.put(REGISTER_ENABLE_PARAM, String.valueOf(instance.isEnabled()));
    params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy()));
    params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral()));
    params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata()));

    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);

}

NamingHttpClientProxy.reqApi

public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
    return reqApi(api, params, Collections.EMPTY_MAP, method);
}

public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)
    throws NacosException {
    return reqApi(api, params, body, serverListManager.getServerList(), method);
}

public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
                     String method) throws NacosException {

    params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

    if (CollectionUtils.isEmpty(servers) && !serverListManager.isDomain()) {
        throw new NacosException(NacosException.INVALID_PARAM, "no server available");
    }

    NacosException exception = new NacosException();

    if (serverListManager.isDomain()) {
        String nacosDomain = serverListManager.getNacosDomain();
        for (int i = 0; i < maxRetry; i++) {
            try {
                return callServer(api, params, body, nacosDomain, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                }
            }
        }
    } else {
        Random random = new Random(System.currentTimeMillis());
        int index = random.nextInt(servers.size());

        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return callServer(api, params, body, server, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", server, e);
                }
            }
            index = (index + 1) % servers.size();
        }
    }

    NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(),
                        exception.getErrMsg());

    throw new NacosException(exception.getErrCode(),
                             "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());

}

serverListManager.isDomain()這個(gè)判斷是配置了幾個(gè)nacos server的值,如果是一個(gè)的話,走if邏輯,如果多余1個(gè)的話,走else邏輯。

else中的servers就是nacos server服務(wù)列表,通過Ramdom拿到一個(gè)隨機(jī)數(shù),然后去callServer(),如果發(fā)現(xiàn)其中的一個(gè)失敗,那么index+1 獲取下一個(gè)服務(wù)節(jié)點(diǎn)再去callServer。如果所有的都失敗的話,則拋出錯(cuò)誤。

NamingHttpClientProxy.callServer

前邊的判斷支線省略,拼接url,拼好了后,進(jìn)入try邏輯塊中,這里封裝了一個(gè)nacosRestTemplate類。請求完成后,返回一個(gè)restResult,拿到了請求結(jié)果后,把請求結(jié)果code放入了一個(gè)交MetricsMonitor的類中了,從代碼上看很明顯是監(jiān)控相關(guān)的類,點(diǎn)擊進(jìn)去果然發(fā)現(xiàn)是prometheus相關(guān)的。這里我們不擴(kuò)展了,繼續(xù)回到主線。

如果返回結(jié)果是200的話,把result.content返回去。

public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
            String method) throws NacosException {
    long start = System.currentTimeMillis();
    long end = 0;
    String namespace = params.get(CommonParams.NAMESPACE_ID);
    String group = params.get(CommonParams.GROUP_NAME);
    String serviceName = params.get(CommonParams.SERVICE_NAME);
    params.putAll(getSecurityHeaders(namespace, group, serviceName));
    Header header = NamingHttpUtil.builderHeader();

    String url;
    if (curServer.startsWith(HTTPS_PREFIX) || curServer.startsWith(HTTP_PREFIX)) {
        url = curServer + api;
    } else {
        if (!InternetAddressUtil.containsPort(curServer)) {
            curServer = curServer + InternetAddressUtil.IP_PORT_SPLITER + serverPort;
        }
        url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
    }

    try {
        HttpRestResult<String> restResult = nacosRestTemplate
            .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
        end = System.currentTimeMillis();

        MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
            .observe(end - start);

        if (restResult.ok()) {
            return restResult.getData();
        }
        if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
            return StringUtils.EMPTY;
        }
        throw new NacosException(restResult.getCode(), restResult.getMessage());
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to request", e);
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }
}
  • NacosRestTemplate.exchangeForm

    關(guān)鍵方法:this.requestClient().execute()

  • public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues,
                                              String httpMethod, Type responseType) throws Exception {
        RequestHttpEntity requestHttpEntity = new RequestHttpEntity(
            header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);
        return execute(url, httpMethod, requestHttpEntity, responseType);
    }
    
    private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity,
                                          Type responseType) throws Exception {
        URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
        if (logger.isDebugEnabled()) {
            logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, uri, requestEntity.getBody());
        }
    
        ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
        HttpClientResponse response = null;
        try {
            response = this.requestClient().execute(uri, httpMethod, requestEntity);
            return responseHandler.handle(response);
        } finally {
            if (response != null) {
                response.close();
            }
        }
    }
    private final HttpClientRequest requestClient;
        
    private final List<HttpClientRequestInterceptor> interceptors = new ArrayList<HttpClientRequestInterceptor>();
    
    public NacosRestTemplate(Logger logger, HttpClientRequest requestClient) {
        super(logger);
        this.requestClient = requestClient;
    }
    private HttpClientRequest requestClient() {
        if (CollectionUtils.isNotEmpty(interceptors)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Execute via interceptors :{}", interceptors);
            }
            return new InterceptingHttpClientRequest(requestClient, interceptors.iterator());
        }
        return requestClient;
    }

    HttpClientBeanHolder.getNacosRestTemplate

    典型的雙重檢查鎖。

  • public static NacosRestTemplate getNacosRestTemplate(HttpClientFactory httpClientFactory) {
        if (httpClientFactory == null) {
            throw new NullPointerException("httpClientFactory is null");
        }
        String factoryName = httpClientFactory.getClass().getName();
        NacosRestTemplate nacosRestTemplate = SINGLETON_REST.get(factoryName);
        if (nacosRestTemplate == null) {
            synchronized (SINGLETON_REST) {
                nacosRestTemplate = SINGLETON_REST.get(factoryName);
                if (nacosRestTemplate != null) {
                    return nacosRestTemplate;
                }
                nacosRestTemplate = httpClientFactory.createNacosRestTemplate();
                SINGLETON_REST.put(factoryName, nacosRestTemplate);
            }
        }
        return nacosRestTemplate;
    }

    而NamingHttpClientFactory是一個(gè)AbstractHttpClientFactory的實(shí)現(xiàn)類,由于NamingHttpClientProxy沒有重寫createNacosRestTemplate方法。所以最終引用的也就是AbstractHttpClientFactory的createNacosRestTemplate方法。

    private static final HttpClientFactory HTTP_CLIENT_FACTORY = new NamingHttpClientFactory();
    public NacosRestTemplate getNacosRestTemplate() {
        return HttpClientBeanHolder.getNacosRestTemplate(HTTP_CLIENT_FACTORY);
    }
    
    private static class NamingHttpClientFactory extends AbstractHttpClientFactory {
            
        @Override
        protected HttpClientConfig buildHttpClientConfig() {
            return HttpClientConfig.builder().setConTimeOutMillis(CON_TIME_OUT_MILLIS)
                .setReadTimeOutMillis(READ_TIME_OUT_MILLIS).setMaxRedirects(MAX_REDIRECTS).build();
        }
    
        @Override
        protected Logger assignLogger() {
            return NAMING_LOGGER;
        }
    }

    AbstractHttpClientFactory.createNacosRestTemplate

    @Override
    public NacosRestTemplate createNacosRestTemplate() {
        HttpClientConfig httpClientConfig = buildHttpClientConfig();
        final JdkHttpClientRequest clientRequest = new JdkHttpClientRequest(httpClientConfig);
    
        // enable ssl
        initTls(new BiConsumer<SSLContext, HostnameVerifier>() {
            @Override
            public void accept(SSLContext sslContext, HostnameVerifier hostnameVerifier) {
                clientRequest.setSSLContext(loadSSLContext());
                clientRequest.replaceSSLHostnameVerifier(hostnameVerifier);
            }
        }, new TlsFileWatcher.FileChangeListener() {
            @Override
            public void onChanged(String filePath) {
                clientRequest.setSSLContext(loadSSLContext());
            }
        });
    
        return new NacosRestTemplate(assignLogger(), clientRequest);
    }

    JdkHttpClientRequest clientRequest = new JdkHttpClientRequest(httpClientConfig);

    可以看到這里定義了一個(gè)JdkHttpClientRequest 。

    再往下跟就到j(luò)ava.net.HttpURLConnection的調(diào)用,去請求nacos-server的地址,再往下的就不做分析了,進(jìn)入了http的通訊層了。

    最終返回了一個(gè)結(jié)果,如果是200的話,就注冊成功了。失敗了就會(huì)拋出異常。

    基于gRPC http2.0的注冊

    這里同樣的從gRPC和http的委派來進(jìn)行分析

    NamingClientProxyDelegate.registerService

    代碼上邊已經(jīng)分析過,我們直接進(jìn)入gRPC的實(shí)現(xiàn)。

    @Override
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
    }
    • NamingGrpcClientProxy.registerService

      redoService.cacheInstanceForRedo 這個(gè)從名稱上看應(yīng)該是重試機(jī)制,

    • @Override
      public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
          NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
                             instance);
          redoService.cacheInstanceForRedo(serviceName, groupName, instance);
          doRegisterService(serviceName, groupName, instance);
      }
      • NamingGrpcRedoService.cacheInstanceForRedo

        這里看起來只是給ConcurrentMap中存放一個(gè)redoData,并沒有其他的邏輯,后續(xù)可能會(huì)用到這個(gè)?;氐街骶€,繼續(xù)往下走。

      • private final ConcurrentMap<String, InstanceRedoData> registeredInstances = new ConcurrentHashMap<>();
        public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
            String key = NamingUtils.getGroupedName(serviceName, groupName);
            InstanceRedoData redoData = InstanceRedoData.build(serviceName, groupName, instance);
            synchronized (registeredInstances) {
                registeredInstances.put(key, redoData);
            }
        }
        • NamingGrpcClientProxy.doRegisterService

          request是根據(jù)構(gòu)造函數(shù)封裝的一個(gè)實(shí)例,requestToServer去進(jìn)行注冊。

        • public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
              InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
                                                            NamingRemoteConstants.REGISTER_INSTANCE, instance);
              requestToServer(request, Response.class);
              redoService.instanceRegistered(serviceName, groupName);
          }

          NamingGrpcClientProxy.requestToServer

          request.putAllHeader推測是跟權(quán)限校驗(yàn)相關(guān)的,我搭建的沒有設(shè)置鑒權(quán),所以都是空的。

          然后根據(jù)rpcClient去調(diào)用request方法。根據(jù)超時(shí)時(shí)間判斷的,這2個(gè)分支最終都會(huì)進(jìn)入一個(gè)方法,默認(rèn)是3s的超時(shí)時(shí)間。

          最終返回一個(gè)response結(jié)果。

          private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
              throws NacosException {
              try {
                  request.putAllHeader(
                      getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
                  Response response =
                      requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
                  if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
                      throw new NacosException(response.getErrorCode(), response.getMessage());
                  }
                  if (responseClass.isAssignableFrom(response.getClass())) {
                      return (T) response;
                  }
                  NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
                                      response.getClass().getName(), responseClass.getName());
              } catch (Exception e) {
                  throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
              }
              throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
          }
          • RpcClient.request

          這里的校驗(yàn)暫且不看,直切主線, response = this.currentConnection.request(request, timeoutMills);

          再進(jìn)入到request方法。

        • public Response request(Request request, long timeoutMills) throws NacosException {
              int retryTimes = 0;
              Response response;
              Exception exceptionThrow = null;
              long start = System.currentTimeMillis();
              while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
                  boolean waitReconnect = false;
                  try {
                      if (this.currentConnection == null || !isRunning()) {
                          waitReconnect = true;
                          throw new NacosException(NacosException.CLIENT_DISCONNECT,
                                                   "Client not connected, current status:" + rpcClientStatus.get());
                      }
                      response = this.currentConnection.request(request, timeoutMills);
                      if (response == null) {
                          throw new NacosException(SERVER_ERROR, "Unknown Exception.");
                      }
                      if (response instanceof ErrorResponse) {
                          if (response.getErrorCode() == NacosException.UN_REGISTER) {
                              synchronized (this) {
                                  waitReconnect = true;
                                  if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                                      LoggerUtils.printIfErrorEnabled(LOGGER,
                                                                      "Connection is unregistered, switch server, connectionId = {}, request = {}",
                                                                      currentConnection.getConnectionId(), request.getClass().getSimpleName());
                                      switchServerAsync();
                                  }
                              }
          
                          }
                          throw new NacosException(response.getErrorCode(), response.getMessage());
                      }
                      // return response.
                      lastActiveTimeStamp = System.currentTimeMillis();
                      return response;
          
                  } catch (Exception e) {
                      if (waitReconnect) {
                          try {
                              // wait client to reconnect.
                              Thread.sleep(Math.min(100, timeoutMills / 3));
                          } catch (Exception exception) {
                              // Do nothing.
                          }
                      }
          
                      LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
                                                      request, retryTimes, e.getMessage());
          
                      exceptionThrow = e;
          
                  }
                  retryTimes++;
          
              }
          
              if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                  switchServerAsyncOnRequestFail();
              }
          
              if (exceptionThrow != null) {
                  throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow
                      : new NacosException(SERVER_ERROR, exceptionThrow);
              } else {
                  throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
              }
          }
          • GrpcConnection.request

          這里的就是封裝的rpc請求,和服務(wù)端進(jìn)行交互的邏輯。在這里封裝了一個(gè)PayLoad類

        • @Override
          public Response request(Request request, long timeouts) throws NacosException {
              Payload grpcRequest = GrpcUtils.convert(request);
              ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
              Payload grpcResponse;
              try {
                  grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
              } catch (Exception e) {
                  throw new NacosException(NacosException.SERVER_ERROR, e);
              }
          
              return (Response) GrpcUtils.parse(grpcResponse);
          }

          2、Nacos注冊源碼分析-Server端

        • 接收注冊

          客戶端和服務(wù)端之間進(jìn)行交互的話,一定需要建立一個(gè)網(wǎng)絡(luò)連接。這里的grpc的源碼相對來說比較復(fù)雜,就簡單分析nacos相關(guān)的。

          工程名稱是nacos-console。

          BaseGrpcServer在啟動(dòng)的時(shí)候會(huì)綁定很多的Handler。

        • springcloud/springboot集成NACOS 做注冊和配置中心以及nacos源碼分析

          ?而基于grpc的通信,會(huì)進(jìn)入server端的InstanceRequestHandler

          InstanceRequestHandler.handle

          從handle方法中可以根據(jù)type走到registerInstance中。

          最終進(jìn)入到EphemeralClientOperationServiceImpl.registerInstance

          public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
              
              private final EphemeralClientOperationServiceImpl clientOperationService;
              
              public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
                  this.clientOperationService = clientOperationService;
              }
              
              @Override
              @Secured(action = ActionTypes.WRITE)
              public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
                  Service service = Service
                          .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
                  switch (request.getType()) {
                      case NamingRemoteConstants.REGISTER_INSTANCE:
                          // 注冊
                          return registerInstance(service, request, meta);
                      case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                          // 取消注冊
                          return deregisterInstance(service, request, meta);
                      default:
                          throw new NacosException(NacosException.INVALID_PARAM,
                                  String.format("Unsupported request type %s", request.getType()));
                  }
              }
              
              private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
                      throws NacosException {
                  // 注冊實(shí)例
                  clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
                  NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
                          meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
                          request.getInstance().getIp(), request.getInstance().getPort()));
                  return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
              }
              
              private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
                  clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
                  NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(System.currentTimeMillis(),
                          meta.getClientIp(), true, DeregisterInstanceReason.REQUEST, service.getNamespace(),
                          service.getGroup(), service.getName(), request.getInstance().getIp(), request.getInstance().getPort()));
                  return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
              }
          }

          EphemeralClientOperationServiceImpl.registerInstance

          這里的clientManager.getClient(client)說明跳轉(zhuǎn)到下邊的建立長連接

        • @Override
          public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
              NamingUtils.checkInstanceIsLegal(instance);
              // 獲取一個(gè)單例的Service,也就是注冊的實(shí)例
              Service singleton = ServiceManager.getInstance().getSingleton(service);
              if (!singleton.isEphemeral()) {
                  throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                                                  String.format("Current service %s is persistent service, can't register ephemeral instance.",
                                                                singleton.getGroupedServiceName()));
              }
              // 這里的Client是客戶端的長連接,會(huì)進(jìn)入到ClientManagerDelegate的一個(gè)委托,最終進(jìn)入到connectionBasedClientManager中
              Client client = clientManager.getClient(clientId);
              if (!clientIsLegal(client, clientId)) {
                  return;
              }
              InstancePublishInfo instanceInfo = getPublishInfo(instance);
              // 對這個(gè)實(shí)例進(jìn)行注冊
              client.addServiceInstance(singleton, instanceInfo);
              client.setLastUpdatedTime();
              client.recalculateRevision();
              NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
              NotifyCenter
                  .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
          }

          AbstractClient.addServiceInstance

        • // 這個(gè)ConcurrentHashMap就是保存實(shí)例和發(fā)布信息關(guān)系的
          protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
          @Override
          public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
              if (null == publishers.put(service, instancePublishInfo)) {
                  if (instancePublishInfo instanceof BatchInstancePublishInfo) {
                      MetricsMonitor.incrementIpCountWithBatchRegister(instancePublishInfo);
                  } else {
                      MetricsMonitor.incrementInstanceCount();
                  }
              }
              // 這里有一個(gè)事件,ClientChangeEvent
              NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
              Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
              return true;
          }

          ClientServiceIndexesManager

        • // 應(yīng)用Service和clientId的映射,一個(gè)應(yīng)用Service有多個(gè)服務(wù),所以會(huì)建立多個(gè)長連接,用Set來保存clientId
          private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
          // 應(yīng)用Service和訂閱者clientId的關(guān)系
          private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
          @Override
          public void onEvent(Event event) {
              if (event instanceof ClientEvent.ClientDisconnectEvent) {
                  handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
              } else if (event instanceof ClientOperationEvent) {
                  handleClientOperation((ClientOperationEvent) event);
              }
          }
          
          private void handleClientOperation(ClientOperationEvent event) {
              Service service = event.getService();
              String clientId = event.getClientId();
              if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
                  // 注冊
                  addPublisherIndexes(service, clientId);
              } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
                  // 取消注冊
                  removePublisherIndexes(service, clientId);
              } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
                  // 訂閱
                  addSubscriberIndexes(service, clientId);
              } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
                  // 取消訂閱
                  removeSubscriberIndexes(service, clientId);
              }
          }

          建立長連接(這里的過程比較難一些,還在持續(xù)學(xué)習(xí)中)

          GrpcBiStreamRequestAcceptor這個(gè)類是建立連接的。

          每一個(gè)grpc請求過來后,都會(huì)進(jìn)入到GrpcBiStreamRequestAcceptor.requestBiStream的方法中。

          而會(huì)話的長連接id就是這里的ConnectionId。

          @Service
          public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestStreamImplBase {
               @Autowired
              ConnectionManager connectionManager;
              
              private void traceDetailIfNecessary(Payload grpcRequest) {
                  String clientIp = grpcRequest.getMetadata().getClientIp();
                  String connectionId = CONTEXT_KEY_CONN_ID.get();
                  try {
                      if (connectionManager.traced(clientIp)) {
                          Loggers.REMOTE_DIGEST.info("[{}]Bi stream request receive, meta={},body={}", connectionId,
                                  grpcRequest.getMetadata().toByteString().toStringUtf8(),
                                  grpcRequest.getBody().toByteString().toStringUtf8());
                      }
                  } catch (Throwable throwable) {
                      Loggers.REMOTE_DIGEST.error("[{}]Bi stream request error,payload={},error={}", connectionId,
                              grpcRequest.toByteString().toStringUtf8(), throwable);
                  }
                  
              }
              @Override
              public StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) {
                  
                  StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() {
                      
                      final String connectionId = CONTEXT_KEY_CONN_ID.get();
                      
                      final Integer localPort = CONTEXT_KEY_CONN_LOCAL_PORT.get();
                      
                      final int remotePort = CONTEXT_KEY_CONN_REMOTE_PORT.get();
                      
                      String remoteIp = CONTEXT_KEY_CONN_REMOTE_IP.get();
                      
                      String clientIp = "";
                      
                      @Override
                      public void onNext(Payload payload) {
                          
                          clientIp = payload.getMetadata().getClientIp();
                          traceDetailIfNecessary(payload);
                          
                          Object parseObj;
                          try {
                              parseObj = GrpcUtils.parse(payload);
                          } catch (Throwable throwable) {
                              Loggers.REMOTE_DIGEST
                                      .warn("[{}]Grpc request bi stream,payload parse error={}", connectionId, throwable);
                              return;
                          }
                          
                          if (parseObj == null) {
                              Loggers.REMOTE_DIGEST
                                      .warn("[{}]Grpc request bi stream,payload parse null ,body={},meta={}", connectionId,
                                              payload.getBody().getValue().toStringUtf8(), payload.getMetadata());
                              return;
                          }
                          if (parseObj instanceof ConnectionSetupRequest) {
                              ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
                              Map<String, String> labels = setUpRequest.getLabels();
                              String appName = "-";
                              if (labels != null && labels.containsKey(Constants.APPNAME)) {
                                  appName = labels.get(Constants.APPNAME);
                              }
                              
                              ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
                                      remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
                                      setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
                              metaInfo.setTenant(setUpRequest.getTenant());
                              Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
                              connection.setAbilities(setUpRequest.getAbilities());
                              boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
                              // 這里會(huì)有一個(gè)connectionManager.register
                              if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
                                  //Not register to the connection manager if current server is over limit or server is starting.
                                  try {
                                      Loggers.REMOTE_DIGEST.warn("[{}]Connection register fail,reason:{}", connectionId,
                                              rejectSdkOnStarting ? " server is not started" : " server is over limited.");
                                      connection.request(new ConnectResetRequest(), 3000L);
                                      connection.close();
                                  } catch (Exception e) {
                                      //Do nothing.
                                      if (connectionManager.traced(clientIp)) {
                                          Loggers.REMOTE_DIGEST
                                                  .warn("[{}]Send connect reset request error,error={}", connectionId, e);
                                      }
                                  }
                              }
                              
                          } else if (parseObj instanceof Response) {
                              Response response = (Response) parseObj;
                              if (connectionManager.traced(clientIp)) {
                                  Loggers.REMOTE_DIGEST
                                          .warn("[{}]Receive response of server request  ,response={}", connectionId, response);
                              }
                              RpcAckCallbackSynchronizer.ackNotify(connectionId, response);
                              connectionManager.refreshActiveTime(connectionId);
                          } else {
                              Loggers.REMOTE_DIGEST
                                      .warn("[{}]Grpc request bi stream,unknown payload receive ,parseObj={}", connectionId,
                                              parseObj);
                          }
                          
                      }
                      
                      @Override
                      public void onError(Throwable t) {
                          if (connectionManager.traced(clientIp)) {
                              Loggers.REMOTE_DIGEST.warn("[{}]Bi stream on error,error={}", connectionId, t);
                          }
                          
                          if (responseObserver instanceof ServerCallStreamObserver) {
                              ServerCallStreamObserver serverCallStreamObserver = ((ServerCallStreamObserver) responseObserver);
                              if (serverCallStreamObserver.isCancelled()) {
                                  //client close the stream.
                              } else {
                                  try {
                                      serverCallStreamObserver.onCompleted();
                                  } catch (Throwable throwable) {
                                      //ignore
                                  }
                              }
                          }
                          
                      }
                      
                      @Override
                      public void onCompleted() {
                          if (connectionManager.traced(clientIp)) {
                              Loggers.REMOTE_DIGEST.warn("[{}]Bi stream on completed", connectionId);
                          }
                          if (responseObserver instanceof ServerCallStreamObserver) {
                              ServerCallStreamObserver serverCallStreamObserver = ((ServerCallStreamObserver) responseObserver);
                              if (serverCallStreamObserver.isCancelled()) {
                                  //client close the stream.
                              } else {
                                  try {
                                      serverCallStreamObserver.onCompleted();
                                  } catch (Throwable throwable) {
                                      //ignore
                                  }
                                  
                              }
                          }
                      }
                  };
                  
                  return streamObserver;
              }
          }
          • ConnectionManager.register

            這里的connections是用來管理所有的長連接的文章來源地址http://www.zghlxwxcb.cn/news/detail-712805.html

          • Map<String, Connection> connections = new ConcurrentHashMap<>();
            
            public synchronized boolean register(String connectionId, Connection connection) {
            
                if (connection.isConnected()) {
                    String clientIp = connection.getMetaInfo().clientIp;
                    if (connections.containsKey(connectionId)) {
                        return true;
                    }
                    if (checkLimit(connection)) {
                        return false;
                    }
                    if (traced(clientIp)) {
                        connection.setTraced(true);
                    }
                    connections.put(connectionId, connection);
                    if (!connectionForClientIp.containsKey(clientIp)) {
                        connectionForClientIp.put(clientIp, new AtomicInteger(0));
                    }
                    connectionForClientIp.get(clientIp).getAndIncrement();
            
                    clientConnectionEventListenerRegistry.notifyClientConnected(connection);
            
                    LOGGER.info("new connection registered successfully, connectionId = {},connection={} ", connectionId,
                                connection);
                    return true;
            
                }
                return false;
            }

到了這里,關(guān)于springcloud/springboot集成NACOS 做注冊和配置中心以及nacos源碼分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Spring Cloud Gateway集成Nacos作為注冊中心和配置中心

    本篇文章將介紹Spring Cloud Alibaba體系下Spring Cloud Gateway的搭建,服務(wù)注冊中心和分布式配置中心使用Nacos,后續(xù)將會(huì)持續(xù)更新,介紹集成Sentinel,如何做日志鏈路追蹤,如何做全鏈路灰度發(fā)布設(shè)計(jì),以及Spring Cloud Gateway的擴(kuò)展等。 ? Spring Boot,Spring Cloud,Discovery,Config等基礎(chǔ)依

    2024年02月11日
    瀏覽(506)
  • .net core 6 集成nacos的服務(wù)注冊和配置中心

    .net core 6 集成nacos的服務(wù)注冊和配置中心

    1、安裝nuget包 2、加上配置文件 ????????注意: ????????\\\"ConfigUseRpc\\\": false ????????\\\"NamingUseRpc\\\": false ????????http連接選false否則配置中心可能會(huì)獲取不到內(nèi)容 3、注冊 啟動(dòng)后 4、使用服務(wù)發(fā)現(xiàn) 5、注冊配置中心 隨意添加配置 6、使用配置中心 7、配置變化的監(jiān)聽方法

    2024年01月16日
    瀏覽(24)
  • springcloud使用nacos搭建注冊中心

    springcloud使用nacos搭建注冊中心

    nacos安裝這里就不細(xì)說了,(Nacos下載以及搭建環(huán)境_你非檸檬為何心酸142的博客-CSDN博客) 大家也可以去網(wǎng)上安裝好,這里主要講搭建 ,我們需要手動(dòng)啟動(dòng)nacos, 輸入(.startup.cmd?-m?standalone),出現(xiàn)一下圖標(biāo)就代表ok ?下面是我的pom.xml文件 ?首先是父工程所需要的依賴,需要注意的

    2024年02月08日
    瀏覽(90)
  • SpringCloud 注冊中心(Nacos)快速入門

    SpringCloud 注冊中心(Nacos)快速入門

    作者:大三的土狗 專欄:SpringCloud ??Nacos一個(gè)更易于構(gòu)建云原生應(yīng)用的動(dòng)態(tài)服務(wù)發(fā)現(xiàn)、配置管理和服務(wù)管理平臺(tái),目前來看還是大多數(shù)公司使用Nacos多于Eureka。 ??Nacos 致力于幫助您發(fā)現(xiàn)、配置和管理微服務(wù)。Nacos 提供了一組簡單易用的特性集,幫助您快速實(shí)現(xiàn)動(dòng)態(tài)服務(wù)發(fā)

    2024年02月03日
    瀏覽(89)
  • SpringCloud Alibaba(一)微服務(wù)簡介+Nacos的安裝部署與使用+Nacos集成springboot實(shí)現(xiàn)服務(wù)注冊+Feign實(shí)現(xiàn)服務(wù)之間的遠(yuǎn)程調(diào)用+負(fù)載均衡+領(lǐng)域劃分

    SpringCloud Alibaba(一)微服務(wù)簡介+Nacos的安裝部署與使用+Nacos集成springboot實(shí)現(xiàn)服務(wù)注冊+Feign實(shí)現(xiàn)服務(wù)之間的遠(yuǎn)程調(diào)用+負(fù)載均衡+領(lǐng)域劃分

    目錄 一.認(rèn)識(shí)微服務(wù) 1.0.學(xué)習(xí)目標(biāo) 1.1.單體架構(gòu) 單體架構(gòu)的優(yōu)缺點(diǎn)如下: 1.2.分布式架構(gòu) 分布式架構(gòu)的優(yōu)缺點(diǎn): 1.3.微服務(wù) 微服務(wù)的架構(gòu)特征: 1.4.SpringCloud 1.5Nacos注冊中心 1.6.總結(jié) 二、Nacos基本使用安裝部署+服務(wù)注冊 (一)linux安裝包方式單節(jié)點(diǎn)安裝部署 1. jdk安裝配置 2. na

    2024年02月09日
    瀏覽(29)
  • SpringCloud(17~21章):Alibaba入門簡介、Nacos服務(wù)注冊和配置中心、Sentinel實(shí)現(xiàn)熔斷與限流、Seata處理分布式事務(wù)

    SpringCloud(17~21章):Alibaba入門簡介、Nacos服務(wù)注冊和配置中心、Sentinel實(shí)現(xiàn)熔斷與限流、Seata處理分布式事務(wù)

    Spring Cloud Netflix項(xiàng)目進(jìn)入維護(hù)模式 https://spring.io/blog/2018/12/12/spring-cloud-greenwich-rc1-available-now 說明 Spring Cloud Netflix Projects Entering Maintenance Mode 什么是維護(hù)模式 將模塊置于維護(hù)模式,意味著 Spring Cloud 團(tuán)隊(duì)將不會(huì)再向模塊添加新功能。我們將修復(fù) block 級別的 bug 以及安全問題,我

    2024年01月19日
    瀏覽(42)
  • SpringCloud(2) 注冊中心Eureka、Nacos

    SpringCloud(2) 注冊中心Eureka、Nacos

    注冊中心是微服務(wù)中必須要使用的組件,考察我們使用微服務(wù)的程度。 注冊中心的核心作用是:服務(wù)注冊和發(fā)現(xiàn)。 常見的注冊中心: Eureka、Nacos 、Zookeeper 下面我們以 Eureka 注冊中心為例,說一下注冊中心的作用: 假如我們有一個(gè)訂單服務(wù) order-service ,需要消費(fèi)用戶服務(wù) u

    2024年02月11日
    瀏覽(16)
  • springcloud整合nacos實(shí)現(xiàn)注冊發(fā)現(xiàn)中心

    springcloud整合nacos實(shí)現(xiàn)注冊發(fā)現(xiàn)中心

    高可用性:Nacos是一個(gè)高可用的注冊中心,它支持多節(jié)點(diǎn)部署和集群模式,保證了服務(wù)的穩(wěn)定性和可用性。當(dāng)某個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),其他節(jié)點(diǎn)可以接管服務(wù)注冊和發(fā)現(xiàn)的功能,確保系統(tǒng)的正常運(yùn)行。 動(dòng)態(tài)配置:Nacos不僅可以作為注冊中心,還提供了配置中心的功能。Spring Clo

    2024年02月16日
    瀏覽(23)
  • 【微服務(wù)】SpringCloud-Nacos注冊中心

    【微服務(wù)】SpringCloud-Nacos注冊中心

    ?? 博客主頁: ??@不會(huì)壓彎的小飛俠 ? 歡迎關(guān)注: ?? 點(diǎn)贊 ?? 收藏 ? 留言 ? ? 系列專欄: ??SpringCloud專欄 ? 知足上進(jìn),不負(fù)野心。 ?? 歡迎大佬指正,一起學(xué)習(xí)!一起加油! Nacos是SpringCloudAlibaba的組件,而SpringCloudAlibaba也遵循SpringCloud中定義的服務(wù)注冊、服務(wù)發(fā)現(xiàn)

    2024年02月02日
    瀏覽(27)
  • SpringCloud實(shí)用篇1——eureka注冊中心 Ribbon負(fù)載均衡原理 nacos注冊中心

    SpringCloud實(shí)用篇1——eureka注冊中心 Ribbon負(fù)載均衡原理 nacos注冊中心

    單體架構(gòu): 將業(yè)務(wù)的所有功能集中在一個(gè)項(xiàng)目中開發(fā),打成一個(gè)包部署。 優(yōu)點(diǎn):架構(gòu)簡單;部署成本低(打jar包、部署、負(fù)載均衡就完成了) 缺點(diǎn):耦合度高(維護(hù)困難、升級困難,不利于大項(xiàng)目開發(fā)) 分布式架構(gòu) 根據(jù)業(yè)務(wù)功能對系統(tǒng)做拆分,每個(gè)業(yè)務(wù)功能模塊作為獨(dú)立

    2024年02月13日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包