Once every ExecutionVertex has been assigned a slot, the JobMaster sends an RPC request to the TaskExecutor that owns the slot, asking it to run the task. The interface is TaskExecutorGateway::submitTask:
CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, @RpcTimeout Time timeout);
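The TaskDeploymentDescriptor carries the task's execution logic, job information, and input/output information.
At its core, submitTask constructs an instance of org.apache.flink.runtime.taskmanager.Task. The class implements the Runnable interface and has a Thread member; once the instance is constructed, that thread is started to execute the task logic: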
TaskExecutor::submitTask
Task.startTaskThread
Task.run
Task.doRun
Task::setupPartitionsAndGates // initialize the task's inputs and outputs
RuntimeEnvironment::new // wrap the task's execution context
Task::loadAndInstantiateInvokable // instantiate the TaskInvokable
StreamTask::new
StreamTask::createRecordWriterDelegate // create the writers, one per StreamEdge
StreamTask::createStateBackend // create the StateBackend; one StateBackend instance per task
StreamTask::createCheckpointStorage
SubtaskCheckpointCoordinatorImpl::new
Task::restoreAndInvoke
TaskInvokable::restore
TaskInvokable::invoke // process input elements
TaskInvokable::cleanUp
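The path above boils down to a Runnable that owns its own thread and, once that thread starts, runs restore, then invoke, then cleanUp on the invokable. The sketch below models only that shape. `MiniTask` and the `Invokable` interface are hypothetical, heavily simplified stand-ins for Flink's Task and TaskInvokable; environment setup, cancellation, and error propagation are omitted (Flink rethrows the failure after cleanUp, whereas this sketch only passes it to cleanUp).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TaskThreadSketch {

    /** Hypothetical, simplified stand-in for Flink's TaskInvokable. */
    interface Invokable {
        void restore() throws Exception;
        void invoke() throws Exception;
        void cleanUp(Throwable failure) throws Exception;
    }

    /** Simplified Task: implements Runnable and owns the thread that runs it. */
    static final class MiniTask implements Runnable {
        private final Invokable invokable;
        private final Thread executingThread;

        MiniTask(String name, Invokable invokable) {
            this.invokable = invokable;
            // The thread is created in the constructor but not started yet.
            this.executingThread = new Thread(this, name);
        }

        void startTaskThread() {
            executingThread.start();
        }

        Thread getExecutingThread() {
            return executingThread;
        }

        @Override
        public void run() {
            doRun();
        }

        private void doRun() {
            Throwable failure = null;
            try {
                // In Flink, partitions/gates and the RuntimeEnvironment are set up before this.
                invokable.restore();
                invokable.invoke();
            } catch (Throwable t) {
                failure = t;
            } finally {
                try {
                    invokable.cleanUp(failure); // null on success
                } catch (Exception ignored) {
                    // a real implementation would log this
                }
            }
        }
    }

    /** Runs a task whose invokable just records the lifecycle calls. */
    static List<String> runDemo() {
        List<String> calls = Collections.synchronizedList(new ArrayList<>());
        Invokable recorder = new Invokable() {
            @Override public void restore() { calls.add("restore"); }
            @Override public void invoke() { calls.add("invoke"); }
            @Override public void cleanUp(Throwable failure) {
                calls.add(failure == null ? "cleanUp" : "cleanUp(failed)");
            }
        };
        MiniTask task = new MiniTask("demo-task", recorder);
        task.startTaskThread(); // roughly what TaskExecutor::submitTask triggers
        try {
            task.getExecutingThread().join(); // wait for the task thread to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(calls);
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [restore, invoke, cleanUp]
    }
}
```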
A task's invokable class is fixed when the operator is added to the StreamGraph as a StreamNode, and it differs by operator type:
- SourceStreamTask.class // legacy source operators
- SourceOperatorStreamTask.class // source operators
- OneInputStreamTask.class // operators with one input
- TwoInputStreamTask.class // operators with two inputs
- MultipleInputStreamTask.class // operators with multiple inputs
All of these classes extend org.apache.flink.streaming.runtime.tasks.StreamTask.
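The mapping in the list above can be written as a simple lookup. This is a hypothetical sketch: the `OperatorKind` enum and the `kindForInputs` helper are illustrative and not Flink APIs. In Flink the choice is made by the StreamGraph method that adds the node (for example, a multiple-input transformation always gets MultipleInputStreamTask regardless of its input count), so selecting by input count is a simplification.

```java
public class InvokableSelectionSketch {

    /** Hypothetical classification of an operator at StreamNode creation time. */
    enum OperatorKind { LEGACY_SOURCE, SOURCE, ONE_INPUT, TWO_INPUT, MULTI_INPUT }

    /** Returns the simple name of the StreamTask subclass used for the operator kind. */
    static String invokableClassFor(OperatorKind kind) {
        switch (kind) {
            case LEGACY_SOURCE: return "SourceStreamTask";
            case SOURCE:        return "SourceOperatorStreamTask";
            case ONE_INPUT:     return "OneInputStreamTask";
            case TWO_INPUT:     return "TwoInputStreamTask";
            case MULTI_INPUT:   return "MultipleInputStreamTask";
            default: throw new IllegalArgumentException("unknown kind: " + kind);
        }
    }

    /** Hypothetical helper: classifies a non-source operator by its number of inputs. */
    static OperatorKind kindForInputs(int inputCount) {
        if (inputCount == 1) return OperatorKind.ONE_INPUT;
        if (inputCount == 2) return OperatorKind.TWO_INPUT;
        if (inputCount > 2) return OperatorKind.MULTI_INPUT;
        throw new IllegalArgumentException("non-source operators need at least one input");
    }

    public static void main(String[] args) {
        System.out.println(invokableClassFor(kindForInputs(2))); // TwoInputStreamTask
    }
}
```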
Calling TaskInvokable::restore executes the following:
StreamTask::restore
StreamTask::restoreInternal // create the OperatorChain
RegularOperatorChain::new
OperatorChain::new
OperatorChain::createOutputCollector
OperatorChain::createOperatorChain
OperatorChain::createOperator
StreamOperatorFactoryUtil.createOperator // create the operator; each operator's StreamConfig defines its concrete type, e.g. StreamMap or StreamFlatMap
SimpleOperatorFactory::createStreamOperator // create the StreamOperator, which wraps the user function and calls its lifecycle methods such as open/close
AbstractUdfStreamOperator::setup
AbstractStreamOperator::setup // set the RuntimeContext member of the user-defined function
StreamingRuntimeContext::new
StreamTask::init // subclass-specific initialization: create the InputGate, StreamTaskInput, DataOutput and InputProcessor
StreamTask::restoreGates
StreamTask::createStreamTaskStateInitializer
StreamTaskStateInitializerImpl::new
OperatorChain::initializeStateAndOpenOperators // call initializeState and open on each operator
AbstractStreamOperator::initializeState
StreamTaskStateInitializerImpl::streamOperatorStateContext // the keyedStatedBackend and operatorStateBackend are created here
StreamOperatorStateHandler::new // initialize the StreamOperator's stateHandler member, which manages its state
StreamOperatorStateHandler::initializeOperatorState
StateInitializationContextImpl::new
AbstractUdfStreamOperator::initializeState // call the user function's initializeState method; Operator State can be obtained here
StreamingFunctionUtils::restoreFunctionState
StreamingRuntimeContext::setKeyedStateStore
StreamOperator::open // getRuntimeContext().getState can be called here to obtain keyed state
StreamTask::invoke
StreamTask::runMailboxLoop
MailboxProcessor::runMailboxLoop
StreamTask::processInput
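The last steps above run StreamTask::processInput from inside a mailbox loop. The sketch below shows the idea under simplifying assumptions: a single thread first drains pending "mails" (in Flink these are actions such as checkpoint triggers), then runs one step of the default action, and repeats until suspended. `MiniMailboxProcessor` is hypothetical; Flink's MailboxProcessor additionally handles priorities, suspension of the default action, and thread-safe mail submission, none of which is modelled here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class MailboxLoopSketch {

    /** Hypothetical single-threaded mailbox: mails run between default-action steps. */
    static final class MiniMailboxProcessor {
        private final Queue<Runnable> mailbox = new ArrayDeque<>();
        private boolean running = true;

        void putMail(Runnable mail) { mailbox.add(mail); }

        void suspend() { running = false; }

        /** defaultAction plays the role of StreamTask::processInput. */
        void runMailboxLoop(Runnable defaultAction) {
            while (running) {
                Runnable mail;
                while ((mail = mailbox.poll()) != null) {
                    mail.run(); // drain pending mails before processing more input
                }
                if (running) {
                    defaultAction.run(); // one step of input processing
                }
            }
        }
    }

    static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        MiniMailboxProcessor processor = new MiniMailboxProcessor();
        int[] processed = {0}; // pretend three records are available
        processor.putMail(() -> log.add("mail"));
        processor.runMailboxLoop(() -> {
            if (processed[0] < 3) {
                processed[0]++;
                log.add("record" + processed[0]);
                if (processed[0] == 1) {
                    // e.g. a checkpoint request arriving while records are processed
                    processor.putMail(() -> log.add("checkpoint"));
                }
            } else {
                processor.suspend(); // end of input: leave the loop
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [mail, record1, checkpoint, record2, record3]
    }
}
```

The mail posted while handling record1 runs before record2, which is the point of the design: control actions are interleaved with record processing on the same thread, so no lock is needed between them.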
The overall process is documented in a comment in StreamTask.java:
* -- invoke()
* |
* +----> Create basic utils (config, etc) and load the chain of operators
* +----> operators.setup()
* +----> task specific init()
* +----> initialize-operator-states()
* +----> open-operators()
* +----> run()
* +----> finish-operators()
* +----> close-operators()
* +----> common cleanup
* +----> task specific cleanup()
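- First, the OperatorChain is created, instantiating each StreamOperator in turn.
- Each operator's setup method is called, which initializes its StreamingRuntimeContext.
- The subclass's init method is called.
- initializeState is called to initialize each operator's state. At this point a keyedStatedBackend (for keyed operators) and an operatorStateBackend are created for each StreamOperator, after which the user function's initializeState method is called so it can create Operator State.
- Each operator's open method is called, so users can perform initialization in their function's open, such as creating keyed state.
- processInput is called to process the data in the stream.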
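The ordering in the comment and the list above can be checked with a small recording operator. This is a hypothetical sketch: the `Operator` interface and `runTask` driver only mirror the method names from the StreamTask comment; they are not Flink's StreamOperator API, and a single operator stands in for a whole chain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OperatorLifecycleSketch {

    /** Hypothetical operator interface mirroring the lifecycle steps in the StreamTask comment. */
    interface Operator {
        void setup();
        void initializeState();
        void open();
        void processElement(String value);
        void finish();
        void close();
    }

    /** Records every lifecycle call into a shared log. */
    static final class RecordingOperator implements Operator {
        private final List<String> log;
        RecordingOperator(List<String> log) { this.log = log; }
        @Override public void setup() { log.add("setup"); }
        @Override public void initializeState() { log.add("initializeState"); }
        @Override public void open() { log.add("open"); }
        @Override public void processElement(String value) { log.add("process:" + value); }
        @Override public void finish() { log.add("finish"); }
        @Override public void close() { log.add("close"); }
    }

    /** Hypothetical driver following the order described in the StreamTask comment. */
    static void runTask(Operator op, List<String> input, List<String> log) {
        op.setup();                   // operators.setup(): runtime context created
        log.add("init");              // task-specific init(): gates, inputs, processor
        op.initializeState();         // initialize-operator-states(): state backends, user initializeState
        op.open();                    // open-operators(): user open(), e.g. obtain keyed state
        for (String value : input) {  // run(): processInput feeds each record
            op.processElement(value);
        }
        op.finish();                  // finish-operators()
        op.close();                   // close-operators()
        log.add("cleanup");           // common and task-specific cleanup
    }

    static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        runTask(new RecordingOperator(log), Arrays.asList("a", "b"), log);
        return log;
    }

    public static void main(String[] args) {
        // [setup, init, initializeState, open, process:a, process:b, finish, close, cleanup]
        System.out.println(runDemo());
    }
}
```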
SourceStreamTask overrides StreamTask::processInput; its implementation simply starts a thread that calls SourceFunction::run.
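The legacy-source approach, running the user's source loop on a dedicated thread, can be sketched as follows. `MiniSourceFunction` and `CountingSource` are hypothetical simplifications of SourceFunction (the real `run` receives a SourceContext, not a list). For simplicity this sketch joins the source thread; in Flink the task thread is not blocked this way but keeps processing mailbox actions while the source thread emits records.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class LegacySourceThreadSketch {

    /** Hypothetical simplified SourceFunction: run() emits until finished or cancelled. */
    interface MiniSourceFunction {
        void run(List<Integer> out) throws Exception;
        void cancel();
    }

    /** Emits 0..limit-1 unless cancelled earlier. */
    static final class CountingSource implements MiniSourceFunction {
        private final int limit;
        private volatile boolean running = true;

        CountingSource(int limit) { this.limit = limit; }

        @Override
        public void run(List<Integer> out) {
            for (int i = 0; i < limit && running; i++) {
                out.add(i);
            }
        }

        @Override
        public void cancel() { running = false; }
    }

    /** Mirrors the idea of SourceStreamTask::processInput: run the source on its own thread. */
    static List<Integer> runSource(MiniSourceFunction source) {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>());
        Thread sourceThread = new Thread(() -> {
            try {
                source.run(out);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, "legacy-source-thread");
        sourceThread.start();
        try {
            sourceThread.join(); // simplification: wait for the source to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            source.cancel();
        }
        return new ArrayList<>(out);
    }

    static List<Integer> runDemo() {
        return runSource(new CountingSource(5));
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [0, 1, 2, 3, 4]
    }
}
```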
OneInputStreamTask works differently: it overrides StreamTask's init method and creates a StreamOneInputProcessor there:
OneInputStreamTask::init
OneInputStreamTask::createCheckpointedInputGate
OneInputStreamTask::createDataOutput // create the StreamTaskNetworkOutput
OneInputStreamTask::createTaskInput // create the StreamTaskNetworkInput
StreamOneInputProcessor::new
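The init step above wires one input and one output into a processor. A minimal sketch of that composition, assuming an in-memory queue in place of the network input: `QueueInput`, `DataOutput`, and `OneInputProcessor` are hypothetical stand-ins for StreamTaskNetworkInput, StreamTaskNetworkOutput, and StreamOneInputProcessor, and `Status` borrows two value names from Flink's DataInputStatus. Each `processInput` call hands at most one element to the output and reports whether more input remains.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class InputProcessorSketch {

    /** Two of the values of Flink's DataInputStatus, used loosely here. */
    enum Status { MORE_AVAILABLE, END_OF_INPUT }

    /** Hypothetical stand-in for StreamTaskNetworkOutput: forwards records to the operator. */
    interface DataOutput {
        void emitRecord(String record);
    }

    /** Hypothetical stand-in for StreamTaskNetworkInput, backed by an in-memory queue. */
    static final class QueueInput {
        private final Queue<String> buffer;

        QueueInput(List<String> records) { this.buffer = new ArrayDeque<>(records); }

        /** Emits at most one element to the output and reports what remains. */
        Status emitNext(DataOutput output) {
            String next = buffer.poll();
            if (next != null) {
                output.emitRecord(next);
            }
            return buffer.isEmpty() ? Status.END_OF_INPUT : Status.MORE_AVAILABLE;
        }
    }

    /** Hypothetical stand-in for StreamOneInputProcessor: connects one input to one output. */
    static final class OneInputProcessor {
        private final QueueInput input;
        private final DataOutput output;

        OneInputProcessor(QueueInput input, DataOutput output) {
            this.input = input;
            this.output = output;
        }

        Status processInput() {
            return input.emitNext(output);
        }
    }

    static List<String> runDemo() {
        List<String> received = new ArrayList<>();
        OneInputProcessor processor =
                new OneInputProcessor(new QueueInput(Arrays.asList("a", "b", "c")), received::add);
        // In Flink, the mailbox loop calls processInput repeatedly; a plain loop stands in here.
        while (processor.processInput() == Status.MORE_AVAILABLE) {
            // keep polling until the input is exhausted
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [a, b, c]
    }
}
```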
StreamTask::processInput, in turn, calls InputProcessor::processInput to keep reading and processing data:
StreamOneInputProcessor::processInput
StreamTaskNetworkInput::emitNext(StreamTaskNetworkOutput)
AbstractStreamTaskNetworkInput::emitNext // keep reading StreamElements from the buffer and process them
AbstractStreamTaskNetworkInput::processElement
StreamTaskNetworkOutput::emitRecord // call the operator's setKeyContextElement and processElement
OneInputStreamOperator::setKeyContextElement
AbstractStreamOperator::setKeyContextElement1
AbstractStreamOperator::setCurrentKey
StreamOperatorStateHandler::setCurrentKey // set the current key for state access
Input::processElement // call the StreamOperator's processElement method
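The emitRecord path above sets the key context before calling processElement, so that keyed state reads and writes inside processElement are implicitly scoped to the current record's key. The sketch below illustrates that idea with a hypothetical `MiniKeyedState` (a map keyed by the current key) and a hypothetical per-key counting operator; it is not Flink's state API, and the key selector (first character of the record) is an arbitrary choice for the demo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class KeyContextSketch {

    /** Hypothetical keyed state: every access goes to the value of the current key. */
    static final class MiniKeyedState<K, V> {
        private final Map<K, V> values = new HashMap<>();
        private K currentKey;

        void setCurrentKey(K key) { this.currentKey = key; }

        V value() { return values.get(currentKey); }

        void update(V value) { values.put(currentKey, value); }
    }

    /** Hypothetical keyed operator that counts records per key. */
    static final class CountPerKeyOperator {
        private final Function<String, String> keySelector;
        private final MiniKeyedState<String, Integer> countState = new MiniKeyedState<>();
        final List<String> output = new ArrayList<>();

        CountPerKeyOperator(Function<String, String> keySelector) { this.keySelector = keySelector; }

        /** Like setKeyContextElement: extract the record's key and make it current. */
        void setKeyContextElement(String record) {
            countState.setCurrentKey(keySelector.apply(record));
        }

        /** Like processElement: state access implicitly uses the current key. */
        void processElement(String record) {
            Integer current = countState.value();
            int updated = (current == null ? 0 : current) + 1;
            countState.update(updated);
            output.add(record + "=" + updated);
        }
    }

    /** Mirrors StreamTaskNetworkOutput::emitRecord: set the key context, then process. */
    static List<String> runDemo(List<String> records) {
        CountPerKeyOperator op = new CountPerKeyOperator(r -> r.substring(0, 1));
        for (String r : records) {
            op.setKeyContextElement(r);
            op.processElement(r);
        }
        return op.output;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(Arrays.asList("a1", "b1", "a2"))); // [a1=1, b1=1, a2=2]
    }
}
```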
That covers the whole path from task submission to a running task thread. The most important part of initialization is setting up the state backends for each StreamOperator: later, while the stream is being processed, these backends are used to store and manage state.