国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink源碼之JobManager啟動(dòng)流程

這篇具有很好參考價(jià)值的文章主要介紹了Flink源碼之JobManager啟動(dòng)流程。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

從啟動(dòng)命令flink-daemon.sh中可以看出StandaloneSession入口類為org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint, 從該類的main方法會(huì)進(jìn)入ClusterEntrypoint::runCluster中, 該方法中會(huì)創(chuàng)建出主要服務(wù)和組件。

StandaloneSessionClusterEntrypoint::main
ClusterEntrypoint::runClusterEntrypoint
ClusterEntrypoint::startCluster
ClusterEntrypoint::runCluster

private void runCluster(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    synchronized (lock) {
        initializeServices(configuration, pluginManager); //初始化服務(wù)

        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        final DispatcherResourceManagerComponentFactory
                dispatcherResourceManagerComponentFactory =
                        createDispatcherResourceManagerComponentFactory(configuration);
		//創(chuàng)建核心組件
        clusterComponent =
                dispatcherResourceManagerComponentFactory.create(
                        configuration,
                        ioExecutor,
                        commonRpcService,
                        haServices,
                        blobServer,
                        heartbeatServices,
                        metricRegistry,
                        executionGraphInfoStore,
                        new RpcMetricQueryServiceRetriever(
                                metricRegistry.getMetricQueryServiceRpcService()),
                        this);
	...ignore code
    }
}

可以看出關(guān)鍵代碼是調(diào)用initializeServices以及創(chuàng)建Cluster Component。

protected void initializeServices(Configuration configuration, PluginManager pluginManager)
        throws Exception {

    LOG.info("Initializing cluster services.");

    synchronized (lock) {
        rpcSystem = RpcSystem.load(configuration);

        commonRpcService =
                RpcUtils.createRemoteRpcService(
                        rpcSystem,
                        configuration,
                        configuration.getString(JobManagerOptions.ADDRESS),
                        getRPCPortRange(configuration),
                        configuration.getString(JobManagerOptions.BIND_HOST),
                        configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));

        JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));

        // update the configuration used to create the high availability services
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        ioExecutor =
                Executors.newFixedThreadPool(
                        ClusterEntrypointUtils.getPoolSize(configuration),
                        new ExecutorThreadFactory("cluster-io"));
        haServices = createHaServices(configuration, ioExecutor, rpcSystem);
        blobServer = new BlobServer(configuration, haServices.createBlobStore());
        blobServer.start();
        heartbeatServices = createHeartbeatServices(configuration);
        metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);

        final RpcService metricQueryServiceRpcService =
                MetricUtils.startRemoteMetricsRpcService(
                        configuration, commonRpcService.getAddress(), rpcSystem);
        metricRegistry.startQueryService(metricQueryServiceRpcService, null);

        final String hostname = RpcUtils.getHostname(commonRpcService);

        processMetricGroup =
                MetricUtils.instantiateProcessMetricGroup(
                        metricRegistry,
                        hostname,
                        ConfigurationUtils.getSystemResourceMetricsProbingInterval(
                                configuration));

        executionGraphInfoStore =
                createSerializableExecutionGraphStore(
                        configuration, commonRpcService.getScheduledExecutor());
    }
}

在initializeServices中首先創(chuàng)建commonRpcService,這個(gè)RPCService實(shí)例是JobManager提供RPC服務(wù)的核心,可以看出它會(huì)有個(gè)地址和監(jiān)聽端口號(hào),commonRpcService可將繼承自Gateway的服務(wù)實(shí)例包裝成AkkaActor對(duì)外提供RPC服務(wù),比如ResourceManager、Dispatcher。此外還創(chuàng)建了其他服務(wù):

haService: 可通過HAService獲取ResourceManager/Dispatcher/RestEndpoint的地址,同時(shí)也提供選主服務(wù),組件啟動(dòng)時(shí)需向HAService注冊(cè),如果被選主成功,則會(huì)調(diào)用監(jiān)聽器的grandLeadership回調(diào)函數(shù)
BlobServer: 可用來提供存儲(chǔ)大對(duì)象存儲(chǔ)服務(wù)
heartbeatServices:為組件間傳遞心跳信息
metricRegistry:提供metric上報(bào)和查詢服務(wù),監(jiān)聽端口不同,新建了一個(gè)RpcService專為Metric服務(wù)
processMetricGroup:注冊(cè)系統(tǒng)運(yùn)行狀態(tài)信息的Metric,比如GC/Memory/Network運(yùn)行時(shí)狀況,添加Metric都是通過一個(gè)MetricGroup添加
executionGraphInfoStore:緩存Job執(zhí)行時(shí)信息,比如ExecutionGrap

初始化服務(wù)創(chuàng)建完成后,通過DefaultDispatcherResourceManagerComponentFactory:create創(chuàng)建JobManager的三大核心組件:Dispacher/ResourceManager/RestEndpointServer, 都是通過工廠方法創(chuàng)建:

DefaultDispatcherRunnerFactory
StandaloneResourceManagerFactory
SessionRestEndpointFactory

這些組件是JobManager向HAService注冊(cè)獲取leadership后,被ElectionService回調(diào)grantLeadership函數(shù)中創(chuàng)建出具體組件實(shí)例。

RestServer

RestServer并不是一個(gè)RPCServer,沒有繼承RpcGateway,只提供HTTP接口服務(wù),然后將請(qǐng)求轉(zhuǎn)交給Dispatcher處理,它的生成啟動(dòng)流程如下:

SessionRestEndpointFactory::createRestEndpoint
DispatcherRestEndpoint::new
RestServerEndpoint::start //通過Netty啟動(dòng)Rest服務(wù)
DispatcherRestEndpoint::initializeHandlers //JobSubmitHeaders、JobSubmitHandler處理客戶端提交Job
WebMonitorEndpoint::initializeHandlers //關(guān)聯(lián)Rest請(qǐng)求的Header和Handler
WebMonitorEndpoint::startInternal //競(jìng)選leader

ResourceManager

RM生成啟動(dòng)過程是ResourceManagerServiceImpl先競(jìng)選leader成功后再創(chuàng)建出具體的ResourceManager

ResourceManagerServiceImpl::start
ResourceManagerServiceImpl::grantLeadership
ResourceManagerServiceImpl::startNewLeaderResourceManager
ResourceManagerServiceImpl::startResourceManagerIfIsLeader//調(diào)用start方法
StandaloneResourceManagerFactory::createResourceManager
StandaloneResourceManager::new
StandaloneResourceManager::start

Dispatcher

Dispacher生成啟動(dòng)過程是DefaultDispatcherRunner選主后再創(chuàng)建出具體實(shí)例

DefaultDispatcherRunnerFactory::createDispatcherRunner
DefaultDispatcherRunner::create
DispatcherRunnerLeaderElectionLifecycleManager.createFor
DefaultDispatcherRunner::grantLeadership //
DefaultDispatcherRunner::startNewDispatcherLeaderProcess//創(chuàng)建SessionDispatcherLeaderProcess并調(diào)用其start方法
DefaultDispatcherRunner::createNewDispatcherLeaderProcess
SessionDispatcherLeaderProcessFactoryFactory::createFactory
SessionDispatcherLeaderProcessFactory::create
SessionDispatcherLeaderProcess::create
SessionDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::startInternal
SessionDispatcherLeaderProcess:onstart
SessionDispatcherLeaderProcess::createDispatcherIfRunning
SessionDispatcherLeaderProcess::createDispatcher
DefaultDispatcherGatewayServiceFactory::create//創(chuàng)建Dispatcher并調(diào)用其start方法
SessionDispatcherFactory::createDispatcher
StandaloneDispatcher::new
StandaloneDispatcher::start
Dispatcher::onstart

總結(jié)

Flink源碼之JobManager啟動(dòng)流程,BigData,flink,大數(shù)據(jù)
JobManager的啟動(dòng)過程就是創(chuàng)建三大組件RestServer/RM/Dispacher實(shí)例初始化的過程,RestSever通過Netty啟動(dòng)HTTP服務(wù),RM/Dispacher被AkkaRpcService包裝成AkkaActor提供本地或遠(yuǎn)程RPC服務(wù),RestServer僅僅是接受請(qǐng)求解析消息后由具體Handler處理,JobGrap提交執(zhí)行會(huì)轉(zhuǎn)發(fā)給Dispatcher處理。文章來源地址http://www.zghlxwxcb.cn/news/detail-637379.html

到了這里,關(guān)于Flink源碼之JobManager啟動(dòng)流程的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink JobManager的高可用配置

    Flink JobManager的高可用配置

    在flink執(zhí)行中,jobManager是一個(gè)負(fù)責(zé)執(zhí)行流式應(yīng)用執(zhí)行和檢查點(diǎn)生成的組件,一旦發(fā)生故障,那么其負(fù)責(zé)的所有應(yīng)用都會(huì)被取消,所以我們需要對(duì)JobManager配置高可用的模式 配置JobManager的高可用需要使用到zookeeper,一方面zookeeper可以進(jìn)行領(lǐng)導(dǎo)的選舉工作,這樣備用jobmanager就可以

    2024年02月09日
    瀏覽(19)
  • flink任務(wù)內(nèi)存調(diào)優(yōu),TaskManager、JobManager內(nèi)存配置

    flink任務(wù)內(nèi)存調(diào)優(yōu),TaskManager、JobManager內(nèi)存配置

    ????????Flink是基于java的JVM運(yùn)行,擁有高效的數(shù)據(jù)處理能力,但是考慮到用戶在 Flink 上運(yùn)行的應(yīng)用的多樣性,盡管flink框架已經(jīng)為所有配置項(xiàng)提供合理的默認(rèn)值,仍無法滿足所有情況下的需求。 為了給用戶生產(chǎn)提供最大化的價(jià)值, Flink 允許用戶在整體上以及細(xì)粒度上對(duì)集

    2024年02月06日
    瀏覽(37)
  • Apache Hudi初探(二)(與flink的結(jié)合)--flink寫hudi的操作(JobManager端的提交操作)

    Apache Hudi初探(二)(與flink的結(jié)合)--flink寫hudi的操作(JobManager端的提交操作)

    在Apache Hudi初探(一)(與flink的結(jié)合)中,我們提到了 Pipelines.hoodieStreamWrite 寫hudi文件 ,這個(gè)操作真正寫hudi是在 Pipelines.hoodieStreamWrite 方法下的 transform(opName(\\\"stream_write\\\", conf), TypeInformation.of(Object.class), operatorFactory) ,具體分析一下寫入的過程。 對(duì)于 transform(opName(\\\"stream_write\\\", conf), Ty

    2024年02月12日
    瀏覽(23)
  • Docker中flink-cluster-jobmanager-1運(yùn)行失敗解決方法

    這是flink-cluster-jobmanager-1報(bào)錯(cuò)日志 2023-12-07 23:34:26 [ERROR] The execution result is empty. 2023-12-07 23:34:25 Starting Job Manager 2023-12-07 23:34:26 [ERROR] Could not get JVM parameters and dynamic configurations properly. 2023-12-07 23:34:26 [ERROR] Raw output from BashJavaUtils: 2023-12-07 23:34:26 INFO [] - Loading configuration property:

    2024年02月03日
    瀏覽(18)
  • flink源碼分析 - flink命令啟動(dòng)分析

    flink版本: flink-1.12.1 源碼位置:? flink-dist/src/main/flink-bin/bin/flink flink命令源碼: 首先講第一段: 工作中,很多人喜歡用符號(hào)鏈接(軟連接)去將原始命令鏈接到一個(gè)新的文件。 例如:? 將 /home/aaa鏈接到/opt/soft/flink-1.12.1/bin/flink,? 實(shí)際使用的時(shí)候就可以用 aaa去代替flink命令。 例如

    2024年01月18日
    瀏覽(20)
  • 說說Flink on yarn的啟動(dòng)流程

    說說Flink on yarn的啟動(dòng)流程

    核心流程 FlinkYarnSessionCli 啟動(dòng)的過程中首先會(huì)檢查Yarn上有沒有足夠的資源去啟動(dòng)所需要的container,如果有,則 上傳一些flink的jar和配置文件到HDFS ,這里主要是啟動(dòng)AM進(jìn)程和TaskManager進(jìn)程的相關(guān)依賴jar包和配置文件。 接著yarn client會(huì)首先向RM 申請(qǐng)一個(gè)container來作為ApplicationMas

    2024年02月10日
    瀏覽(26)
  • 深入理解 Flink(五)Flink Standalone 集群?jiǎn)?dòng)源碼剖析

    深入理解 Flink(五)Flink Standalone 集群?jiǎn)?dòng)源碼剖析

    深入理解 Flink 系列文章已完結(jié),總共八篇文章,直達(dá)鏈接: 深入理解 Flink (一)Flink 架構(gòu)設(shè)計(jì)原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容錯(cuò)深入分析 深入理解 Flink (三)Flink 內(nèi)核基礎(chǔ)設(shè)施源碼級(jí)原理詳解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年02月02日
    瀏覽(32)
  • Flink源碼之Checkpoint執(zhí)行流程

    Flink源碼之Checkpoint執(zhí)行流程

    Checkpoint完整流程如上圖所示: JobMaster的CheckpointCoordinator向所有SourceTask發(fā)送RPC觸發(fā)一次CheckPoint SourceTask向下游廣播CheckpointBarrier SouceTask完成狀態(tài)快照后向JobMaster發(fā)送快照結(jié)果 非SouceTask在Barrier對(duì)齊后完成狀態(tài)快照向JobMaster發(fā)送快照結(jié)果 JobMaster保存SubTask快照結(jié)果 JobMaster收到所

    2024年02月11日
    瀏覽(44)
  • Flink源碼之State創(chuàng)建流程

    Flink源碼之State創(chuàng)建流程

    StreamOperatorStateHandler 在StreamTask啟動(dòng)初始化時(shí)通過StreamTaskStateInitializerImpl::streamOperatorStateContext會(huì)為每個(gè)StreamOperator 創(chuàng)建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有個(gè)StreamOperatorStateHandler成員變量,調(diào)用AbstractStreamOperator::initializeState方法中會(huì)初始化StreamOperatorStateH

    2024年02月12日
    瀏覽(21)
  • Flink window 源碼分析1:窗口整體執(zhí)行流程

    Flink window 源碼分析1:窗口整體執(zhí)行流程

    注:本文源碼為flink 1.18.0版本。 其他相關(guān)文章: Flink window 源碼分析1:窗口整體執(zhí)行流程 Flink window 源碼分析2:Window 的主要組件 Flink window 源碼分析3:WindowOperator Flink window 源碼分析4:WindowState Window 本質(zhì)上就是借助狀態(tài)后端緩存著一定時(shí)間段內(nèi)的數(shù)據(jù),然后在達(dá)到某些條件

    2024年01月16日
    瀏覽(28)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包