The startup script flink-daemon.sh shows that the TaskManager entry class is org.apache.flink.runtime.taskexecutor.TaskManagerRunner:
TaskManagerRunner::main
TaskManagerRunner::runTaskManagerProcessSecurely
TaskManagerRunner::runTaskManager // constructs the TaskManagerRunner and calls its start() method
TaskManagerRunner::new // core
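The TaskManagerRunner constructor works much like the JobManager's: it first constructs a set of shared services:
highAvailabilityServices // used to obtain the JobManager's address
rpcService // wraps the TaskExecutor as an Akka actor to provide RPC
heartbeatServices // heartbeat service for communicating with the JobManager
metricRegistry // metrics service for registering and querying metrics
blobCacheService // caches BLOBs
These services are passed as constructor arguments when the TaskExecutor is constructed.
Before the TaskExecutor itself is constructed, TaskManagerServices is built to provide the components the TaskExecutor relies on for its core functionality. The call chain is: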
TaskManagerRunner::createTaskExecutorService
TaskManagerRunner::startTaskManager // builds the MetricGroup and related services
TaskManagerServices.fromConfiguration // reads the TaskManager configuration and starts the TaskManager-related services
TaskExecutor::new // core
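To make the construction order concrete, here is a minimal Java sketch built on heavily simplified stand-ins. SharedServices, LocalServices, Executor and bootstrap are hypothetical names, not Flink classes, and each service is reduced to a string. The sketch only shows the ordering described above: shared services first, then the TaskManager-local services, then an executor that receives both through its constructor.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the bootstrap order: shared services -> TaskManager-local services -> executor.
// All types here are hypothetical stand-ins, not Flink classes.
public class BootstrapSketch {

    // Stand-in for the shared services created in the TaskManagerRunner constructor.
    record SharedServices(String haServices, String rpcService, String heartbeatServices,
                          String metricRegistry, String blobCacheService) {}

    // Stand-in for TaskManagerServices, reduced to a single configured value.
    record LocalServices(int numberOfSlots) {}

    // Stand-in for TaskExecutor: everything it needs arrives through the constructor.
    record Executor(SharedServices shared, LocalServices local) {}

    // Builds the executor and records the order in which components were created.
    static Executor bootstrap(int numberOfSlots, List<String> creationOrder) {
        SharedServices shared = new SharedServices("ha", "rpc", "heartbeat", "metrics", "blob");
        creationOrder.add("shared");
        LocalServices local = new LocalServices(numberOfSlots); // e.g. derived from configuration
        creationOrder.add("local");
        Executor executor = new Executor(shared, local);
        creationOrder.add("executor");
        return executor;
    }

    public static void main(String[] args) {
        List<String> order = new ArrayList<>();
        Executor executor = bootstrap(4, order);
        System.out.println(order + " slots=" + executor.local().numberOfSlots());
    }
}
```

Passing every dependency through the constructor keeps the executor free of global lookups, which is why the services must exist before the executor is built.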
After the TaskExecutor starts, it connects to the ResourceManager, registers itself with the RM, and then sends its slot report to the RM. The call chain is as follows:
TaskManagerRunner::start
TaskExecutorToServiceAdapter::start
TaskExecutor::start
TaskExecutor::onStart
TaskExecutor::startTaskExecutorServices // after obtaining the ResourceManager address, connects to the ResourceManager and sends the slot report
ResourceManagerLeaderListener::notifyLeaderAddress
TaskExecutor::notifyOfNewResourceManagerLeader
TaskExecutor::reconnectToResourceManager
TaskExecutor::tryConnectToResourceManager
TaskExecutor::connectToResourceManager
TaskExecutorToResourceManagerConnection::start
RegisteredRpcConnection::start
RegisteredRpcConnection::createNewRegistration
TaskExecutorToResourceManagerConnection::generateRegistration
RetryingRegistration::startRegistration // establishes the connection with the ResourceManager
RetryingRegistration::register
ResourceManagerRegistration::invokeRegistration // registers the TaskExecutorRegistration information with the ResourceManager
ResourceManagerGateway.registerTaskExecutor
TaskExecutorToResourceManagerConnection::onRegistrationSuccess
ResourceManagerRegistrationListener::onRegistrationSuccess
TaskExecutor::establishResourceManagerConnection
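ResourceManagerGateway.sendSlotReport // sends this TaskManager's slot information to the ResourceManager
HeartbeatManagerImpl::monitorTarget // sets up the heartbeat with the RM; whenever a heartbeat request arrives from the RM, the SlotReport is sent back as the heartbeat response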
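The registration step can be pictured as a retry loop followed by the slot report. The following is a simplified, synchronous model under stated assumptions: registerAndReport, Outcome and the timeout parameters are hypothetical; a failed attempt stands in for a timed-out one; and the doubling-with-a-cap backoff is only loosely modeled on what RetryingRegistration does asynchronously. The point it illustrates is that the slot report goes out only after registration has succeeded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Simplified model of "register with retries, then send the slot report".
// Hypothetical: names, timeouts and the backoff policy are illustrative, not Flink code.
// No real waiting happens; each failed attempt is treated as a timeout.
public class RegistrationSketch {

    record Outcome(boolean registered, List<Long> timeoutsTried, String slotReportSent) {}

    static Outcome registerAndReport(IntPredicate attemptSucceeds, long initialTimeoutMs,
                                     long maxTimeoutMs, int maxAttempts, String slotReport) {
        List<Long> timeouts = new ArrayList<>();
        long timeout = initialTimeoutMs;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            timeouts.add(timeout);
            if (attemptSucceeds.test(attempt)) {
                // Registration accepted: only now is the slot report sent to the RM.
                return new Outcome(true, timeouts, slotReport);
            }
            // Attempt timed out: double the timeout, capped at the maximum.
            timeout = Math.min(timeout * 2, maxTimeoutMs);
        }
        // Gave up: no slot report is sent.
        return new Outcome(false, timeouts, null);
    }

    public static void main(String[] args) {
        // The RM only answers the third attempt (index 2).
        Outcome o = registerAndReport(i -> i == 2, 100, 300, 5, "slots=[0:free,1:free]");
        System.out.println(o);
    }
}
```

With the RM answering only the third attempt, an initial timeout of 100 ms and a cap of 300 ms, the attempts use timeouts of 100, 200 and 300 ms before the report is sent.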
TaskExecutor provides the following two core RPC methods (declared in its RPC gateway interface, TaskExecutorGateway):
// The RM has allocated a slot to a JobMaster and asks the TM to send the concrete slot information to that JobMaster
CompletableFuture<Acknowledge> requestSlot(
        SlotID slotId,
        JobID jobId,
        AllocationID allocationId,
        ResourceProfile resourceProfile,
        String targetAddress,
        ResourceManagerId resourceManagerId,
        @RpcTimeout Time timeout);
// Executes a physical Task submitted by the JobMaster
CompletableFuture<Acknowledge> submitTask(
        TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, @RpcTimeout Time timeout);
Inside the TaskManager, slots are managed by TaskSlotTableImpl, which records the slot allocation information.
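The interplay between requestSlot, submitTask and the slot table can be illustrated with a toy model. ToySlotTable and ToyTaskExecutor are hypothetical classes. IDs are plain strings instead of SlotID, JobID and AllocationID, and the step where the TM offers the slot to the JobMaster at targetAddress is left out. The model shows one rule: a task is accepted only for a job that already holds a slot on this TaskManager.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Toy model of how requestSlot / submitTask interact with a slot table.
// Hypothetical: these are not Flink classes, and IDs are plain strings.
public class SlotTableSketch {

    static final class ToySlotTable {
        private final int numberOfSlots;
        // slot index -> ID of the job whose allocation occupies it
        private final Map<Integer, String> allocations = new HashMap<>();

        ToySlotTable(int numberOfSlots) { this.numberOfSlots = numberOfSlots; }

        // Fails if the index is out of range or the slot is already taken.
        boolean allocate(int slotIndex, String jobId) {
            if (slotIndex < 0 || slotIndex >= numberOfSlots || allocations.containsKey(slotIndex)) {
                return false;
            }
            allocations.put(slotIndex, jobId);
            return true;
        }

        Optional<Integer> slotForJob(String jobId) {
            return allocations.entrySet().stream()
                    .filter(e -> e.getValue().equals(jobId))
                    .map(Map.Entry::getKey)
                    .findFirst();
        }

        int freeSlots() { return numberOfSlots - allocations.size(); }
    }

    static final class ToyTaskExecutor {
        final ToySlotTable slotTable = new ToySlotTable(2);

        // Plays the role of requestSlot: reserve a slot for a job (offering it to the JobMaster is omitted).
        CompletableFuture<String> requestSlot(int slotIndex, String jobId) {
            return slotTable.allocate(slotIndex, jobId)
                    ? CompletableFuture.completedFuture("ack")
                    : CompletableFuture.failedFuture(new IllegalStateException("slot unavailable: " + slotIndex));
        }

        // Plays the role of submitTask: accepted only if the job holds a slot here.
        CompletableFuture<String> submitTask(String jobId, String taskName) {
            return slotTable.slotForJob(jobId)
                    .map(slot -> CompletableFuture.completedFuture(taskName + " running in slot " + slot))
                    .orElseGet(() -> CompletableFuture.failedFuture(
                            new IllegalStateException("no slot allocated for job " + jobId)));
        }
    }

    public static void main(String[] args) {
        ToyTaskExecutor tm = new ToyTaskExecutor();
        System.out.println(tm.requestSlot(0, "job-A").join());                        // prints ack
        System.out.println(tm.submitTask("job-A", "map-1").join());                   // prints map-1 running in slot 0
        System.out.println(tm.submitTask("job-B", "map-1").isCompletedExceptionally()); // prints true
    }
}
```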
HeartBeat
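The TaskExecutor constructor creates two HeartbeatManagers. Both are implemented by HeartbeatManagerImpl, a class that receives heartbeat requests and sends heartbeat responses:
ResourceManagerHeartbeatManager // answers heartbeat requests from the RM; the response carries a SlotReport
JobManagerHeartbeatManager // answers heartbeat requests from the JobMaster; the response carries an AccumulatorReport
Calling HeartbeatManagerImpl.monitorTarget(ResourceID resourceID, HeartbeatTarget heartbeatTarget) establishes a heartbeat connection with the target.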
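The receiving side of a heartbeat can be sketched as follows. ToyHeartbeatReceiver is a hypothetical class, not Flink's HeartbeatManagerImpl API. It tracks when each monitored peer was last heard from and answers each request with a payload produced on demand, which stands in for the SlotReport or AccumulatorReport. A peer that has been silent longer than a timeout is treated as lost. Times are passed in explicitly so the behaviour is deterministic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy model of the receiving side (the role HeartbeatManagerImpl plays in the TM).
// Hypothetical: class and method shapes are illustrative, not Flink's API.
public class HeartbeatReceiverSketch {

    static final class ToyHeartbeatReceiver<P> {
        // Monitored peers: resource ID -> time (ms) of the last heartbeat request seen.
        private final Map<String, Long> lastHeard = new HashMap<>();
        // Produces the payload attached to each response (e.g. a slot report).
        private final Supplier<P> payloadSupplier;

        ToyHeartbeatReceiver(Supplier<P> payloadSupplier) {
            this.payloadSupplier = payloadSupplier;
        }

        // Loosely corresponds to monitorTarget(resourceID, heartbeatTarget).
        void monitorTarget(String resourceId, long nowMs) {
            lastHeard.put(resourceId, nowMs);
        }

        // A heartbeat request from a monitored peer: record it and answer with the payload.
        P onHeartbeatRequest(String fromResourceId, long nowMs) {
            if (!lastHeard.containsKey(fromResourceId)) {
                throw new IllegalStateException("not monitoring " + fromResourceId);
            }
            lastHeard.put(fromResourceId, nowMs);
            return payloadSupplier.get();
        }

        // A peer silent for longer than timeoutMs (or never monitored) is considered lost.
        boolean isTimedOut(String resourceId, long nowMs, long timeoutMs) {
            Long last = lastHeard.get(resourceId);
            return last == null || nowMs - last > timeoutMs;
        }
    }

    public static void main(String[] args) {
        ToyHeartbeatReceiver<String> rmHeartbeats = new ToyHeartbeatReceiver<>(() -> "SlotReport[free=2]");
        rmHeartbeats.monitorTarget("rm-1", 0);
        System.out.println(rmHeartbeats.onHeartbeatRequest("rm-1", 10_000));
        System.out.println(rmHeartbeats.isTimedOut("rm-1", 70_000, 50_000));
    }
}
```

Producing the payload lazily, at response time, means each response reflects the slot state at the moment of the request rather than at registration.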
HeartbeatManager has another implementation, HeartbeatManagerSenderImpl, which actively sends heartbeat requests to the targets it monitors. The ResourceManager, for example, creates a HeartbeatManagerSenderImpl: once a TaskManager registers with the ResourceManager at startup, the RM calls HeartbeatManagerSenderImpl.monitorTarget to monitor that TM and periodically sends heartbeat requests to the TM's HeartbeatManagerImpl. Likewise, the JobMaster creates a HeartbeatManagerSenderImpl and periodically sends heartbeat requests to the TMs executing its job, and each TM responds with the information related to that job.
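The sending side can be modeled in the same spirit. ToyHeartbeatSender is a hypothetical class, not Flink's HeartbeatManagerSenderImpl. Its monitorTarget registers a target, for instance after a TaskManager has registered, and each call to sendHeartbeats performs one round, asking every monitored target for its payload. In a real deployment a scheduler would trigger a round every heartbeat interval. Here the round is triggered manually to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy model of the sending side (the role HeartbeatManagerSenderImpl plays in the RM / JobMaster).
// Hypothetical: names are illustrative. sendHeartbeats() is called manually here; a real system
// would schedule it periodically.
public class HeartbeatSenderSketch {

    static final class ToyHeartbeatSender<P> {
        // Monitored targets in registration order: resource ID -> how to request a heartbeat from it.
        private final Map<String, Function<String, P>> targets = new LinkedHashMap<>();
        private final String ownId;

        ToyHeartbeatSender(String ownId) { this.ownId = ownId; }

        // Called e.g. by the RM after a TaskManager has registered.
        void monitorTarget(String resourceId, Function<String, P> target) {
            targets.put(resourceId, target);
        }

        void unmonitorTarget(String resourceId) { targets.remove(resourceId); }

        // One heartbeat round: send a request to every monitored target and collect the payloads.
        List<P> sendHeartbeats() {
            List<P> payloads = new ArrayList<>();
            for (Function<String, P> target : targets.values()) {
                payloads.add(target.apply(ownId));
            }
            return payloads;
        }
    }

    public static void main(String[] args) {
        ToyHeartbeatSender<String> rm = new ToyHeartbeatSender<>("rm-1");
        rm.monitorTarget("tm-1", requester -> "tm-1 slots for " + requester);
        rm.monitorTarget("tm-2", requester -> "tm-2 slots for " + requester);
        System.out.println(rm.sendHeartbeats()); // prints [tm-1 slots for rm-1, tm-2 slots for rm-1]
    }
}
```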
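In summary: after starting, the TM registers with the RM and keeps its slot allocation state in sync with the RM through heartbeats. Once it has accepted a slot allocation request from the RM and offered the slot to a JobMaster, it can accept physical Tasks from that JobMaster and execute them.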