從啟動(dòng)命令flink-daemon.sh中可以看出StandaloneSession入口類為org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint, 從該類的main方法會(huì)進(jìn)入ClusterEntrypoint::runCluster中, 該方法中會(huì)創(chuàng)建出主要服務(wù)和組件。
StandaloneSessionClusterEntrypoint::main
ClusterEntrypoint::runClusterEntrypoint
ClusterEntrypoint::startCluster
ClusterEntrypoint::runCluster
private void runCluster(Configuration configuration, PluginManager pluginManager)
throws Exception {
synchronized (lock) {
initializeServices(configuration, pluginManager); //初始化服務(wù)
// write host information into configuration
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
final DispatcherResourceManagerComponentFactory
dispatcherResourceManagerComponentFactory =
createDispatcherResourceManagerComponentFactory(configuration);
//創(chuàng)建核心組件
clusterComponent =
dispatcherResourceManagerComponentFactory.create(
configuration,
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
executionGraphInfoStore,
new RpcMetricQueryServiceRetriever(
metricRegistry.getMetricQueryServiceRpcService()),
this);
...ignore code
}
}
可以看出關(guān)鍵代碼是調(diào)用initializeServices以及創(chuàng)建Cluster Component。
protected void initializeServices(Configuration configuration, PluginManager pluginManager)
throws Exception {
LOG.info("Initializing cluster services.");
synchronized (lock) {
rpcSystem = RpcSystem.load(configuration);
commonRpcService =
RpcUtils.createRemoteRpcService(
rpcSystem,
configuration,
configuration.getString(JobManagerOptions.ADDRESS),
getRPCPortRange(configuration),
configuration.getString(JobManagerOptions.BIND_HOST),
configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));
JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));
// update the configuration used to create the high availability services
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
ioExecutor =
Executors.newFixedThreadPool(
ClusterEntrypointUtils.getPoolSize(configuration),
new ExecutorThreadFactory("cluster-io"));
haServices = createHaServices(configuration, ioExecutor, rpcSystem);
blobServer = new BlobServer(configuration, haServices.createBlobStore());
blobServer.start();
heartbeatServices = createHeartbeatServices(configuration);
metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);
final RpcService metricQueryServiceRpcService =
MetricUtils.startRemoteMetricsRpcService(
configuration, commonRpcService.getAddress(), rpcSystem);
metricRegistry.startQueryService(metricQueryServiceRpcService, null);
final String hostname = RpcUtils.getHostname(commonRpcService);
processMetricGroup =
MetricUtils.instantiateProcessMetricGroup(
metricRegistry,
hostname,
ConfigurationUtils.getSystemResourceMetricsProbingInterval(
configuration));
executionGraphInfoStore =
createSerializableExecutionGraphStore(
configuration, commonRpcService.getScheduledExecutor());
}
}
在initializeServices中首先創(chuàng)建commonRpcService,這個(gè)RPCService實(shí)例是JobManager提供RPC服務(wù)的核心,可以看出它會(huì)有個(gè)地址和監(jiān)聽端口號(hào),commonRpcService可將繼承自Gateway的服務(wù)實(shí)例包裝成AkkaActor對(duì)外提供RPC服務(wù),比如ResourceManager、Dispatcher。此外還創(chuàng)建了其他服務(wù):
haService: 可通過HAService獲取ResourceManager/Dispatcher/RestEndpoint的地址,同時(shí)也提供選主服務(wù),組件啟動(dòng)時(shí)需向HAService注冊(cè),如果被選主成功,則會(huì)調(diào)用監(jiān)聽器的grandLeadership回調(diào)函數(shù)
BlobServer: 可用來提供存儲(chǔ)大對(duì)象存儲(chǔ)服務(wù)
heartbeatServices:為組件間傳遞心跳信息
metricRegistry:提供metric上報(bào)和查詢服務(wù),監(jiān)聽端口不同,新建了一個(gè)RpcService專為Metric服務(wù)
processMetricGroup:注冊(cè)系統(tǒng)運(yùn)行狀態(tài)信息的Metric,比如GC/Memory/Network運(yùn)行時(shí)狀況,添加Metric都是通過一個(gè)MetricGroup添加
executionGraphInfoStore:緩存Job執(zhí)行時(shí)信息,比如ExecutionGrap
初始化服務(wù)創(chuàng)建完成后,通過DefaultDispatcherResourceManagerComponentFactory:create創(chuàng)建JobManager的三大核心組件:Dispacher/ResourceManager/RestEndpointServer, 都是通過工廠方法創(chuàng)建:
DefaultDispatcherRunnerFactory
StandaloneResourceManagerFactory
SessionRestEndpointFactory
這些組件是JobManager向HAService注冊(cè)獲取leadership后,被ElectionService回調(diào)grantLeadership函數(shù)中創(chuàng)建出具體組件實(shí)例。
RestServer
RestServer并不是一個(gè)RPCServer,沒有繼承RpcGateway,只提供HTTP接口服務(wù),然后將請(qǐng)求轉(zhuǎn)交給Dispatcher處理,它的生成啟動(dòng)流程如下:
SessionRestEndpointFactory::createRestEndpoint
DispatcherRestEndpoint::new
RestServerEndpoint::start //通過Netty啟動(dòng)Rest服務(wù)
DispatcherRestEndpoint::initializeHandlers //JobSubmitHeaders、JobSubmitHandler處理客戶端提交Job
WebMonitorEndpoint::initializeHandlers //關(guān)聯(lián)Rest請(qǐng)求的Header和Handler
WebMonitorEndpoint::startInternal //競(jìng)選leader
ResourceManager
RM生成啟動(dòng)過程是ResourceManagerServiceImpl先競(jìng)選leader成功后再創(chuàng)建出具體的ResourceManager
ResourceManagerServiceImpl::start
ResourceManagerServiceImpl::grantLeadership
ResourceManagerServiceImpl::startNewLeaderResourceManager
ResourceManagerServiceImpl::startResourceManagerIfIsLeader//調(diào)用start方法
StandaloneResourceManagerFactory::createResourceManager
StandaloneResourceManager::new
StandaloneResourceManager::start
Dispatcher
Dispacher生成啟動(dòng)過程是DefaultDispatcherRunner選主后再創(chuàng)建出具體實(shí)例文章來源:http://www.zghlxwxcb.cn/news/detail-637379.html
DefaultDispatcherRunnerFactory::createDispatcherRunner
DefaultDispatcherRunner::create
DispatcherRunnerLeaderElectionLifecycleManager.createFor
DefaultDispatcherRunner::grantLeadership //
DefaultDispatcherRunner::startNewDispatcherLeaderProcess//創(chuàng)建SessionDispatcherLeaderProcess并調(diào)用其start方法
DefaultDispatcherRunner::createNewDispatcherLeaderProcess
SessionDispatcherLeaderProcessFactoryFactory::createFactory
SessionDispatcherLeaderProcessFactory::create
SessionDispatcherLeaderProcess::create
SessionDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::startInternal
SessionDispatcherLeaderProcess:onstart
SessionDispatcherLeaderProcess::createDispatcherIfRunning
SessionDispatcherLeaderProcess::createDispatcher
DefaultDispatcherGatewayServiceFactory::create//創(chuàng)建Dispatcher并調(diào)用其start方法
SessionDispatcherFactory::createDispatcher
StandaloneDispatcher::new
StandaloneDispatcher::start
Dispatcher::onstart
總結(jié)
JobManager的啟動(dòng)過程就是創(chuàng)建三大組件RestServer/RM/Dispacher實(shí)例初始化的過程,RestSever通過Netty啟動(dòng)HTTP服務(wù),RM/Dispacher被AkkaRpcService包裝成AkkaActor提供本地或遠(yuǎn)程RPC服務(wù),RestServer僅僅是接受請(qǐng)求解析消息后由具體Handler處理,JobGrap提交執(zhí)行會(huì)轉(zhuǎn)發(fā)給Dispatcher處理。文章來源地址http://www.zghlxwxcb.cn/news/detail-637379.html
到了這里,關(guān)于Flink源碼之JobManager啟動(dòng)流程的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!