深入理解 Flink 系列文章已完結(jié),總共八篇文章,直達鏈接:
深入理解 Flink (一)Flink 架構(gòu)設(shè)計原理
深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容錯深入分析
深入理解 Flink (三)Flink 內(nèi)核基礎(chǔ)設(shè)施源碼級原理詳解
深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析
深入理解 Flink (五)Flink Standalone 集群啟動源碼剖析
深入理解 Flink (六)Flink Job 提交和 Flink Graph 詳解
深入理解 Flink (七)Flink Slot 管理詳解
深入理解 Flink (八)Flink Task 部署初始化和啟動詳解
前言
Flink 集群的邏輯概念:
JobManager(StandaloneSessionClusterEntrypoint) + TaskManager(TaskManagerRunner)
Flink 集群的物理概念:
ResourceManager(管理集群所有資源,管理集群所有從節(jié)點) + TaskExecutor(管理從節(jié)點資源,接收 Task 部署執(zhí)行)
在 Flink 不同的部署模式下(Standalone、YARN、K8S 等)只是最外層的封裝略有區(qū)別,實際運行的內(nèi)核并無差異。因此本文以 Standalone 集群為例,剖析 Flink 集群的啟動源碼。
Flink 集群啟動腳本分析
Flink 集群的啟動腳本位于 flink-dist 子項目中,flink-bin 下的 bin 目錄:
start-cluster.sh
根據(jù)具體組件的不同,腳本會按照以下流程執(zhí)行:
Flink 主節(jié)點 StandaloneSessionClusterEntrypoint 啟動源碼分析
JobManager 是 Flink 集群的主節(jié)點,它包含三大重要的組件:
1、ResourceManager
Flink 的集群資源管理器,只有一個,關(guān)于 slot 的管理和申請等工作,都由它負責
2、DispatcherRunner
負責接收用戶提交的 JobGragh, 然后啟動一個 JobMaster, JobMaster 類似于 YARN 集群中的 AppMaster 角色,類似于 Spark Job 中的 Driver 角色。內(nèi)部有一個持久服務(wù):JobGraghStore,用來存儲提交到 JobManager 中的 Job 的信息,也可以用作主節(jié)點宕機之后做 job 恢復之用。
3、WebMonitorEndpoint
里面維護了很多很多的 Handler,也還會啟動一個 Netty 服務(wù)端,用來接收外部的 rest 請求。如果客戶端通過 flink run 的方式來提交一個 job 到 flink 集群,最終是由 WebMonitorEndpoint 來接收處理,經(jīng)過路由解析處理之后決定使用哪一個 Handler 來執(zhí)行處理。Router 路由器 綁定了一大堆 Handler,例如:submitJob ===> JobSubmitHandler。
這里簡單說明一下 Flink 的資源管理架構(gòu),后續(xù)章節(jié)會展開詳述:
ResourceManager: 全局資源管理者 => SlotManager
JobMaster: 資源使用者 => SlotPool
TaskExecutor:資源提供者 => TaskSlotTable
以上三者的內(nèi)部,都有一個專門用來做 slot 管理的一個組件。對應(yīng)的要啟動這三個組件,都有一個對應(yīng)的 Factory,也就說,如果需要創(chuàng)建這些組件實例,那么都是通過這些 Factory 來創(chuàng)建。而這三個 Facotry 最終都會被封裝在一個 ComponentFactory 中。
StandaloneSessionClusterEntrypoint main 方法
// 入口,解析命令行參數(shù) 和 配置文件 flink-conf.yaml
StandaloneSessionClusterEntrypoint.main(){
ClusterEntrypoint.runClusterEntrypoint(entrypoint){
// 啟動插件組件,配置文件系統(tǒng)實例等
clusterEntrypoint.startCluster(){
runCluster(configuration, pluginManager){
// 第一步:初始化各種服務(wù)(8個基礎(chǔ)服務(wù))
// 比較重要的:HAService,BlobServer, RpcServices, HeatbeatServices,....
initializeServices(configuration, pluginManager);
// 第二步:創(chuàng)建 DispatcherResourceManagerComponentFactory, 初始化各種組件的工廠實例
// 其實內(nèi)部包含了三個重要的成員變量:
// 創(chuàng)建 ResourceManager 的工廠實例
// 創(chuàng)建 DispatcherRunner 的工廠實例
// 創(chuàng)建 WebMonitorEndpoint 的工廠實例
createDispatcherResourceManagerComponentFactory(configuration);
// 第三步:創(chuàng)建 集群運行需要的一些組件:WebMonitorEndpoint,DispatcherRunner, ResourceManager 等
// 創(chuàng)建和啟動 ResourceManager
// 創(chuàng)建和啟動 DispatcherRunner
// 創(chuàng)建和啟動 WebMonitorEndpoint
clusterComponent = dispatcherResourceManagerComponentFactory.create(...);
}
}
}
}
基礎(chǔ)服務(wù)組件初始化
initializeServices(){
// 初始化和啟動 AkkaRpcService,內(nèi)部其實包裝了一個 ActorSystem
commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService(...);
// 啟動一個 JMXService,用于客戶端鏈接 JobManager JVM 進行監(jiān)控
JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));
// 初始化一個負責 IO 的線程池, Flink 大量使用了 異步編程。
// 這個線程池的線程的數(shù)量,默認是:cpu core 個數(shù) * 4
ioExecutor = Executors.newFixedThreadPool(...);
// 初始化 HA 服務(wù)組件,負責 HA 服務(wù)的是:ZooKeeperHaServices
haServices = createHaServices(configuration, ioExecutor);
// 初始化 BlobServer 服務(wù)端
blobServer = new BlobServer(configuration, haServices.createBlobStore());
blobServer.start();
// 初始化心跳服務(wù)組件, heartbeatServices = HeartbeatServices
heartbeatServices = createHeartbeatServices(configuration);
// 啟動 metrics(性能監(jiān)控) 相關(guān)的服務(wù),內(nèi)部也是啟動一個 ActorSystem
MetricUtils.startRemoteMetricsRpcService(configuration, commonRpcService.getAddress());
// 初始化一個用來存儲 ExecutionGraph 的 Store, 實現(xiàn)是:FileArchivedExecutionGraphStore
archivedExecutionGraphStore = createSerializableExecutionGraphStore(...);
}
重要組件工廠實例初始化
DispatcherRunnerFactory,默認實現(xiàn):DefaultDispatcherRunnerFactory,生產(chǎn) DefaultDispatcherRunner
ResourceManagerFactory,默認實現(xiàn):StandaloneResourceManagerFactory,生產(chǎn) StandaloneResourceManager
RestEndpointFactory,默認實現(xiàn):SessionRestEndpointFactory,生產(chǎn) DispatcherRestEndpoint
三大重要組件初始化
Flink 源碼中,三大重要組件初始化按照一下流程進行:
三大重要組件初始化源碼解析
WebMonitorEndpoint 啟動和初始化源碼剖析
核心入口:
DispatcherResourceManagerComponentFactory.create(...)
啟動流程:
- 初始化一大堆 Handler 和 一個 Router,并且進行排序去重,之后,再把每個 Handler 注冊 到 Router 當中。
- 啟動一個 Netty 的服務(wù)端。
- 啟動內(nèi)部服務(wù):執(zhí)行競選。WebMonitorEndpoint 本身就是一個 LeaderContender 角色。如果競選成功,則回調(diào) isLeader() 方法。
- 競選成功,其實就只是把 WebMontiroEndpoint 的 address 以及跟 zookeeper 的 sessionID 寫入到 znode 中。
- 啟動一個關(guān)于 ExecutionGraph 的 Cache 的定時清理任務(wù)。
ResourceManager 啟動和初始化源碼剖析
核心入口:
DispatcherResourceManagerComponentFactory.create(...)
啟動流程:
1、ResourceManager 是 RpcEndpoint 的子類,所以在構(gòu)建 ResourceManager 對象完成之后,肯定會調(diào)用 start() 方法來啟動這個 RpcEndpoint,然后就跳轉(zhuǎn)到它的 onStart() 方法執(zhí)行。
2、ResourceManager 是 LeaderContender 的子類,會通過 LeaderElectionService 參加競選,如果競選成功,則會回調(diào) isLeader() 方法。
3、啟動 ResourceManager 需要的一些服務(wù):
兩個心跳服務(wù)
ResourceManager 和 TaskExecutor 之間的心跳
ResourceManager 和 JobMaster 之間的心跳
兩個定時服務(wù)
checkTaskManagerTimeoutsAndRedundancy() 檢查 TaskExecutor 的超時
checkSlotRequestTimeouts() 檢查 SlotRequest 超時
DispatcherRunner 啟動和初始化源碼剖析
核心入口:
DispatcherResourceManagerComponentFactory.create(...)
啟動流程:
1、啟動 JobGraphStore 服務(wù)
2、從 JobGraphStrore 恢復執(zhí)行 Job, 要啟動 Dispatcher
從節(jié)點 TaskManagerRunner 啟動源碼分析
TaskManager 是 Flink 的 worker 節(jié)點,負責 Flink 中本機 slot 資源的管理以及具體 task 的執(zhí)行。
TaskManager 上的基本資源單位是 slot,一個作業(yè)的 task 最終會部署在一個 TaskManager 的 slot 上運行,TaskManager 會負責維護本地的 slot 資源列表,并與 Flink Master 和 JobMaster 通信。
// 核心啟動入口
TaskManagerRunner.main(args){
runTaskManagerSecurely(args, ResourceID.generate()){
// 加載配置:解析 args 和 flink-conf.yaml 得到配置信息
Configuration configuration = loadConfiguration(args);
// 啟動 TaskManager
// 在Flink 當中,所有的組件(跟資源有關(guān))都有一個 ResourceID
// 后續(xù)還會見到很多的類似的ID的概念:AllocationID
runTaskManagerSecurely(configuration, resourceID){
// 啟動 TaskManager
// 這個具體實現(xiàn)是:首先初始化 TaskManagerRunner, TaskManager 啟動中,要初始化的一些服務(wù),都是在這個構(gòu)造方法里面!
// 最后,再調(diào)用 TaskManagerRunner.start() 來啟動,然后跳轉(zhuǎn)到 TaskExecutor 的 onStart() 開啟注冊。
runTaskManager(configuration, resourceID, pluginManager){
// 第一步:構(gòu)建 TaskManagerRunner 實例
// 具體實現(xiàn)中也做了兩件事:
// 第一件事: 初始化了一個 TaskManagerServices 對象! 其實這個動作就類似于 JobManager 啟動的時候的第一件大事(啟動8個服務(wù))
// 第二件是: 初始化 TaskExecutor(Standalone 集群中提供資源的角色,ResourceManager 其實就是管理集群中的從節(jié)點的管理角色)
// TaskExecutor 它是一個 RpcEndpoint,意味著,當 TaskExecutor 實例構(gòu)造完畢之后,啟動 RPC 服務(wù)就會跳轉(zhuǎn)到 onStart() 方法
taskManagerRunner = new TaskManagerRunner(...){
// 初始化一個線程池 ScheduledThreadPoolExecutor 用于處理回調(diào)
this.executor = Executors.newScheduledThreadPool(....)
// 獲取高可用模式:ZooKeeperHaServices
highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(...)
// 初始化 JMXServer 服務(wù)
JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));
// 創(chuàng)建 RPC 服務(wù)
rpcService = createRpcService(configuration, highAvailabilityServices);
// 創(chuàng)建心跳服務(wù)
heartbeatServices = HeartbeatServices.fromConfiguration(conf);
// 創(chuàng)建 BlobCacheService,內(nèi)部會啟動兩個定時任務(wù):PermanentBlobCleanupTask 和 TransientBlobCleanupTask
blobCacheService = new BlobCacheService(....);
// 創(chuàng)建 TaskExecutorService,內(nèi)部其實就是創(chuàng)建 TaskExecutor 并且啟動,詳細內(nèi)容如下一部分闡述。
taskExecutorService = taskExecutorServiceFactory.createTaskExecutor(....){
// 創(chuàng)建 TaskExecutorToServiceAdapter,內(nèi)部封裝 TaskExecutor,它是 TaskManagerRunner 的成員變量
TaskManagerRunner::createTaskExecutorService;
}
}
// 第二步:啟動 TaskManagerRunner,然后跳轉(zhuǎn)到 TaskExecutor 中的 onStart() 方法
taskManagerRunner.start(){
taskExecutor.start();
}
}
}
}
}
TaskManager/TaskExecutor 注冊
TaskManager 是一個邏輯抽象,代表一臺服務(wù)器,這臺服務(wù)器的啟動,必然會包含一些服務(wù),另外再包含一個 TaskExecutor,存在于 TaskManager 的內(nèi)部,真實的幫助 TaskManager 完成各種核心操作,比如:
1、部署和執(zhí)行 StreamTask
2、管理和分配 slot
監(jiān)聽和獲取 ResourceManager 的地址
核心入口為:resourceManagerLeaderRetriever 的 start() 方法,具體實現(xiàn)方式見前面章節(jié):
https://blog.csdn.net/weixin_44512041/article/details/135493920
在注冊監(jiān)聽之后,如果發(fā)生了對應(yīng)的事件,則會收到一個響應(yīng),然后回調(diào):
ResourceManagerLeaderListener.notifyLeaderAddress();
內(nèi)部詳細實現(xiàn):
// 關(guān)閉原有的 ResouceManager 的鏈接
closeResourceManagerConnection(cause);
// 開啟注冊超時的延時調(diào)度任務(wù)
startRegistrationTimeout();
// 當前 TaskExecutor 完成和 ResourceManager 的鏈接
tryConnectToResourceManager();
最重要的是第三步,TaskExecutor 和 ResourceManager 建立連接,會進行注冊,心跳,Slot 匯報 三件大事。
TaskExecutor 開始注冊
核心入口:
TaskExecutorToResourceManagerConnection.start();
TaskExecutor 注冊失敗
核心入口:
TaskExecutorToResourceManagerConnection.onRegistrationFailure(failure);
TaskExecutor 注冊成功
核心入口:
TaskExecutorToResourceManagerConnection.onRegistrationSuccess(result.f1);
TaskExecutor 進行 Slot 匯報
當注冊成功,ResourceManager 會返回 TaskExecutorRegistrationSuccess 對象。然后回調(diào)下面的方法,進入到 slot 匯報的過程。
TaskExecutorToResourceManagerConnection.onRegistrationSuccess(TaskExecutorRegistrationSuccess success);
// 繼續(xù)回調(diào)
ResourceManagerRegistrationListener.onRegistrationSuccess(this, success);
// 封裝鏈接對象
establishResourceManagerConnection(resourceManagerGateway, resourceManagerId, taskExecutorRegistrationId, ....);
// 內(nèi)部實現(xiàn)
resourceManagerGateway.sendSlotReport(
getResourceID(),
taskExecutorRegistrationId,
taskSlotTable.createSlotReport(getResourceID()), taskManagerConfiguration.getTimeout()
);
TaskExecutor 和 ResourceManager 心跳
Flink 中 ResourceManager、JobMaster、TaskExecutor 三者之間存在相互檢測的心跳機制,ResourceManager 會主動發(fā)送請求探測 JobMaster、TaskExecutor 是否存活,JobMaster 也會主動發(fā)送請求探測 TaskExecutor 是否存活,以便進行任務(wù)重啟或者失敗處理。
假定心跳系統(tǒng)中有兩種節(jié)點:sender 和 receiver。心跳機制是 sender 和 receivers 彼此相互檢測。但是檢測動作是 Sender 主動發(fā)起,即 Sender 主動發(fā)送請求探測 receiver 是否存活,因為 Sender 已經(jīng)發(fā)送過來了探測心跳請求,所以這樣 receiver 同時也知道 Sender 是存活的,然后 Reciver 給 Sender 回應(yīng)一個心跳表示自己也是活著的。具體表現(xiàn):
- Flink Sender 主動發(fā)送 Request 請求給 Receiver,要求 Receiver 回應(yīng)一個心跳;
- Flink Receiver 收到 Request 之后,通過 Receive 函數(shù)回應(yīng)一個心跳請求給 Sender;
ResourceManager 端心跳服務(wù)啟動
ResourceManager 在初始化的最后,執(zhí)行了:
ResourceManager.startHeartbeatServices();
啟動了兩個心跳服務(wù):
// 維持 TaskExecutor 和 ResourceManager 之間的心跳
taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(resourceId, new TaskManagerHeartbeatListener(),
getMainThreadExecutor(), log);
// 維持 JobMaster 和 ResourceManager 之間的心跳
jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(resourceId, new JobManagerHeartbeatListener(),
getMainThreadExecutor(), log);
具體是構(gòu)造了一個 HeartbeatManagerSenderImpl 實例對象,并且調(diào)用了:
mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
heartbeatMonitor 內(nèi)部封裝了一個 heartbeatTarget,對于 ResourceManager 來說,每個注冊成功的 TaskExecutor 都會被構(gòu)建成一個 HeartbeatTarget ,然后構(gòu)建成一個 heartbeatMonitor。這個可以在 ResourceManager 端完成 TaskExecutor 注冊的時候進行驗證。
當 ResourceManager 端完成一個 TaskExecutor 的注冊的時候,馬上調(diào)用:
// 維持心跳
taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
@Override
public void receiveHeartbeat(ResourceID resourceID, Void payload) {
}
@Override
public void requestHeartbeat(ResourceID resourceID, Void payload) {
// 給 TaskExecutor 發(fā)送心跳請求
taskExecutorGateway.heartbeatFromResourceManager(resourceID);
}
});
這樣子,剛才注冊的 TaskExecutor 就先被封裝成一個 HeartbeatTarget, 然后被加入到 taskManagerHeartbeatManager 進行管理的時候,變成了 HeartbeatMonitor。當這句代碼完成執(zhí)行的時候,當前 ResourceManager 的心跳目標對象,就多了一個 TaskExecutor,然后當執(zhí)行:
taskExecutorGateway.heartbeatFromResourceManager(resourceID);
就給 TaskExecutor 發(fā)送了一個心跳請求。
TaskExecutor 端心跳處理
當 TaskExecutor 接收到 ResourceManager 的心跳請求之后,進入內(nèi)部實現(xiàn):
TaskExecutor.heartbeatFromResourceManager(ResourceID resourceID);
// 內(nèi)部實現(xiàn)
resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
// 內(nèi)部實現(xiàn)
reportHeartbeat(requestOrigin);
// 第一件事:進行心跳報告
heartbeatMonitor.reportHeartbeat();
// 記錄最后一次的心跳時間
lastHeartbeat = System.currentTimeMillis();
// 重設(shè)心跳超時相關(guān)的 時間 和 延遲調(diào)度任務(wù)
resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
// 先取消
cancelTimeout();
// 再重新調(diào)度
futureTimeout = scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);
// TaskExecutor 進行負載匯報
heartbeatTarget.receiveHeartbeat(.....);
// 給 ResourceManager 回復 TaskExecutor 的負載。
resourceManagerGateway.heartbeatFromTaskManager(resourceID, heartbeatPayload);
如果連續(xù) 5 次心跳請求沒有收到,也就是說,如果 50s 內(nèi)都沒有收到心跳請求,則執(zhí)行心跳超時處理。
heartbeatListener.notifyHeartbeatTimeout(resourceID);
超時處理也非常的暴力有效,F(xiàn)link 認為: 如果 TaskExecutor 收不到 ResourceManager 的心跳請求了,則認為當前 ResourceManager 死掉了。但是 Flink 集群肯定會有一個 active 的 ResourceManager 節(jié)點的。而且之前也注冊過監(jiān)聽,如果 Flink HA 集群的 Active 節(jié)點發(fā)生遷移,則 TaskExecutor 也一定已經(jīng)收到過通知了,然后現(xiàn)在需要做的,只是重新鏈接到新的 active ResourceManager 即可。文章來源:http://www.zghlxwxcb.cn/news/detail-788460.html
reconnectToResourceManager(
new TaskManagerException(String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId))
);
TaskExecutor 向 ResourceManager 匯報負載
核心入口:HeartBeatManagerImpl 的 requestHeartbeat() 方法的最后一句代碼:文章來源地址http://www.zghlxwxcb.cn/news/detail-788460.html
heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
到了這里,關(guān)于深入理解 Flink(五)Flink Standalone 集群啟動源碼剖析的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!