国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink源碼之StreamTask啟動(dòng)流程

這篇具有很好參考價(jià)值的文章主要介紹了Flink源碼之StreamTask啟動(dòng)流程。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

每個(gè)ExecutionVertex分配Slot后,JobMaster就會(huì)向Slot所在的TaskExecutor提交RPC請(qǐng)求執(zhí)行Task,接口為TaskExecutorGateway::submitTask

CompletableFuture<Acknowledge> submitTask(
        TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, @RpcTimeout Time timeout); 

TaskDeploymentDescriptor 中包含當(dāng)前Task的執(zhí)行邏輯、Job信息、輸入輸出信息
Flink源碼之StreamTask啟動(dòng)流程,BigData,flink,大數(shù)據(jù)

submitTask 方法核心就是構(gòu)造org.apache.flink.runtime.taskmanager.Task實(shí)例,該實(shí)例繼承自Runnable接口,有個(gè)Thread成員變量,構(gòu)造完成后就啟動(dòng)線程執(zhí)行Task邏輯。

TaskExecutor::submitTask
Task.startTaskThread
Task.run
Task.doRun
Task::setupPartitionsAndGates //初始化Task的輸入輸出
RuntimeEnvironment::new //封裝task執(zhí)行上下文信息
Task::loadAndInstantiateInvokable //TaskInvokables實(shí)例化
StreamTask::new
    StreamTask::createRecordWriterDelegate //創(chuàng)建Writer,為每個(gè)StreamEdge創(chuàng)建一個(gè)Writer
    StreamTask::createStateBackend //創(chuàng)建StateBackend,一個(gè)task一個(gè)StateBackend實(shí)例
    StreamTask::createCheckpointStorage
    SubtaskCheckpointCoordinatorImpl::new 
Task::restoreAndInvoke
TaskInvokable::restore 
TaskInvokable::invoke //處理輸入元素
TaskInvokable::cleanUp

Task的Invokable Class是在StreamGraph中添加Operator形成StreamNode時(shí)確定的,對(duì)不同的算子有不同的InvokableClass:

  • SourceStreamTask.class //LegacySource算子
  • SourceOperatorStreamTask //Source算子
  • OneInputStreamTask.class //輸入是一個(gè)算子
  • TwoInputStreamTask:class //輸入是兩個(gè)算子
  • MultipleInputStreamTask.class //輸入有多個(gè)算子

以上這些類都繼承自org.apache.flink.streaming.runtime.tasks.StreamTask

Flink源碼之StreamTask啟動(dòng)流程,BigData,flink,大數(shù)據(jù)

在調(diào)用TaskInvokable::restore時(shí)會(huì)執(zhí)行:

StreamTask::restore
StreamTask::restoreInternal //創(chuàng)建OperatorChain
RegularOperatorChain::new
OperatorChain::new
OperatorChain::createOutputCollector
OperatorChain::createOperatorChain
OperatorChain::createOperator
StreamOperatorFactoryUtil.createOperator  //創(chuàng)建Operator,在每個(gè)算子的StreamConfig中定義了每個(gè)Operator具體類型,比如StreamMap, StreamFlatMap
SimpleOperatorFactory::createStreamOperator //創(chuàng)建StreamOperator包裝了用戶函數(shù),, StreamOperator包裝了代碼中用戶函數(shù),會(huì)調(diào)用用戶函數(shù)中的open/close等生命周期函數(shù)
	AbstractUdfStreamOperator::setup
	AbstractStreamOperator::setup //設(shè)置用用自定義函數(shù)中的RuntimeContext成員變量
    	StreamingRuntimeContext::new  //
    
StreamTask::init //子類做初始化,創(chuàng)建InputGate、StreamTaskInput、DataOutput及InputProcessor
StreamTask::restoreGates
	StreamTask::createStreamTaskStateInitializer
		StreamTaskStateInitializerImpl::new //
    OperatorChain::initializeStateAndOpenOperators //調(diào)用每個(gè)Operator的initializeState和Open方法
    	AbstractStreamOperator::initializeState 
			StreamTaskStateInitializerImpl::streamOperatorStateContext //此時(shí)會(huì)創(chuàng)建keyedStatedBackend和operatorStateBackend
		StreamOperatorStateHandler::new //初始化StreamOperator的stateHandler成員變量,用于狀態(tài)管理
		StreamOperatorStateHandler::initializeOperatorState
		    StateInitializationContextImpl::new
			AbstractUdfStreamOperator::initializeState//調(diào)用用戶定義函數(shù)中的initializeState方法,可獲取Operator State
				StreamingFunctionUtils::restoreFunctionState
		StreamingRuntimeContext::setKeyedStateStore
	StreamOperator::open //調(diào)用getRuntimeContext().getState可獲取keySate
StreamTask::invoke
StreamTask::runMailboxLoop
MailboxProcessor::runMailboxLoop
StreamTask::processInput

整個(gè)過程在StreamTask.java的注釋中有說明:

 * -- invoke()
 *       |
 *       +----> Create basic utils (config, etc) and load the chain of operators
 *       +----> operators.setup()
 *       +----> task specific init()
 *       +----> initialize-operator-states()
 *       +----> open-operators()
 *       +----> run()
 *       +----> finish-operators()
 *       +----> close-operators()
 *       +----> common cleanup
 *       +----> task specific cleanup()
  1. 首先創(chuàng)建OperatorChain,依次創(chuàng)建出每個(gè)StreamOperator
  2. 調(diào)用Operator的setup方法,初始化StreamingRuntimeContext
  3. 調(diào)用子類init方法初始化
  4. 調(diào)用initializeState初始化每個(gè)算子的狀態(tài),此時(shí)會(huì)為每個(gè)StreamOperator創(chuàng)建keyedStatedBackend和operatorStateBackend,然后會(huì)調(diào)用用戶定義函數(shù)中的initializeState方法,用于創(chuàng)建Operator State
  5. 調(diào)用算子的open方法,便于用戶在自定義函數(shù)open中進(jìn)行初始化,比如初始化keyState
  6. 調(diào)用processInput處理流中數(shù)據(jù)

SourceStreamTask重載了StreamTask::processInput,該方法中直接起一個(gè)線程調(diào)用SourceFunction::run方法。

OneInputStreamTask則不同,它重載了StreamTask的init方法,在init方法中創(chuàng)建了StreamOneInputProcessor

OneInputStreamTask::init
OneInputStreamTask::createCheckpointedInputGate
OneInputStreamTask::createDataOutput //創(chuàng)建StreamTaskNetworkOutput
OneInputStreamTask::createTaskInput //創(chuàng)建StreamTaskNetworkInput
StreamOneInputProcessor::new

在StreamTask::processInput則是調(diào)用InputProcessor::processInput不斷讀取數(shù)據(jù)進(jìn)行處理

StreamOneInputProcessor::processInput
StreamTaskNetworkInput::emitNext(StreamTaskNetworkOutput)
AbstractStreamTaskNetworkInput::emitNext //循環(huán)不斷從buffer中讀取StreamElement
處理
AbstractStreamTaskNetworkInput::processElement
    StreamTaskNetworkOutput::emitRecord //調(diào)用operator的setKeyContextElement和processElement
        OneInputStreamOperator::setKeyContextElement
        AbstractStreamOperator::setKeyContextElement1
        AbstractStreamOperator::setCurrentKey //
            StreamOperatorStateHandler::setCurrentKey //設(shè)置狀態(tài)當(dāng)前key
        Input::processElement  //調(diào)用StreamOperator的processElement方法

以上Task從提交到起線程執(zhí)行起來的整個(gè)過程,在初始化過程中為每個(gè)StreamOperator進(jìn)行狀態(tài)后端的初始化相當(dāng)重要,后續(xù)處理流的過程中會(huì)使用這些狀態(tài)后端存儲(chǔ)管理狀態(tài)。文章來源地址http://www.zghlxwxcb.cn/news/detail-648966.html

到了這里,關(guān)于Flink源碼之StreamTask啟動(dòng)流程的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 說說Flink on yarn的啟動(dòng)流程

    說說Flink on yarn的啟動(dòng)流程

    核心流程 FlinkYarnSessionCli 啟動(dòng)的過程中首先會(huì)檢查Yarn上有沒有足夠的資源去啟動(dòng)所需要的container,如果有,則 上傳一些flink的jar和配置文件到HDFS ,這里主要是啟動(dòng)AM進(jìn)程和TaskManager進(jìn)程的相關(guān)依賴jar包和配置文件。 接著yarn client會(huì)首先向RM 申請(qǐng)一個(gè)container來作為ApplicationMas

    2024年02月10日
    瀏覽(26)
  • 深入理解 Flink(五)Flink Standalone 集群?jiǎn)?dòng)源碼剖析

    深入理解 Flink(五)Flink Standalone 集群?jiǎn)?dòng)源碼剖析

    深入理解 Flink 系列文章已完結(jié),總共八篇文章,直達(dá)鏈接: 深入理解 Flink (一)Flink 架構(gòu)設(shè)計(jì)原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容錯(cuò)深入分析 深入理解 Flink (三)Flink 內(nèi)核基礎(chǔ)設(shè)施源碼級(jí)原理詳解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年02月02日
    瀏覽(32)
  • Flink源碼之Checkpoint執(zhí)行流程

    Flink源碼之Checkpoint執(zhí)行流程

    Checkpoint完整流程如上圖所示: JobMaster的CheckpointCoordinator向所有SourceTask發(fā)送RPC觸發(fā)一次CheckPoint SourceTask向下游廣播CheckpointBarrier SouceTask完成狀態(tài)快照后向JobMaster發(fā)送快照結(jié)果 非SouceTask在Barrier對(duì)齊后完成狀態(tài)快照向JobMaster發(fā)送快照結(jié)果 JobMaster保存SubTask快照結(jié)果 JobMaster收到所

    2024年02月11日
    瀏覽(44)
  • Flink源碼之State創(chuàng)建流程

    Flink源碼之State創(chuàng)建流程

    StreamOperatorStateHandler 在StreamTask啟動(dòng)初始化時(shí)通過StreamTaskStateInitializerImpl::streamOperatorStateContext會(huì)為每個(gè)StreamOperator 創(chuàng)建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有個(gè)StreamOperatorStateHandler成員變量,調(diào)用AbstractStreamOperator::initializeState方法中會(huì)初始化StreamOperatorStateH

    2024年02月12日
    瀏覽(21)
  • Flink window 源碼分析1:窗口整體執(zhí)行流程

    Flink window 源碼分析1:窗口整體執(zhí)行流程

    注:本文源碼為flink 1.18.0版本。 其他相關(guān)文章: Flink window 源碼分析1:窗口整體執(zhí)行流程 Flink window 源碼分析2:Window 的主要組件 Flink window 源碼分析3:WindowOperator Flink window 源碼分析4:WindowState Window 本質(zhì)上就是借助狀態(tài)后端緩存著一定時(shí)間段內(nèi)的數(shù)據(jù),然后在達(dá)到某些條件

    2024年01月16日
    瀏覽(27)
  • 大數(shù)據(jù)Flink(六十一):Flink流處理程序流程和項(xiàng)目準(zhǔn)備

    大數(shù)據(jù)Flink(六十一):Flink流處理程序流程和項(xiàng)目準(zhǔn)備

    文章目錄 Flink流處理程序流程和項(xiàng)目準(zhǔn)備 一、Flink流處理程序的一般流程

    2024年02月11日
    瀏覽(28)
  • 【大數(shù)據(jù)】Flink 詳解(六):源碼篇 Ⅰ

    【大數(shù)據(jù)】Flink 詳解(六):源碼篇 Ⅰ

    《 Flink 詳解 》系列(已完結(jié)),共包含以下 10 10 10 篇文章: 【大數(shù)據(jù)】Flink 詳解(一):基礎(chǔ)篇(架構(gòu)、并行度、算子) 【大數(shù)據(jù)】Flink 詳解(二):核心篇 Ⅰ(窗口、WaterMark) 【大數(shù)據(jù)】Flink 詳解(三):核心篇 Ⅱ(狀態(tài) State) 【大數(shù)據(jù)】Flink 詳解(四):核心篇

    2024年02月10日
    瀏覽(47)
  • 【Flink】Flink提交流程

    【Flink】Flink提交流程

    我們通常在學(xué)習(xí)的時(shí)候需要掌握大數(shù)據(jù)組件的原理以便更好的掌握這個(gè)大數(shù)據(jù)組件,F(xiàn)link實(shí)際生產(chǎn)開發(fā)過程中最常見的就是提交到y(tǒng)arn上進(jìn)行調(diào)度,模式使用的 Per-Job模式,下面我們就給大家講下Flink提交Per-Job任務(wù)到y(tǒng)arn上的流程,流程圖如下 ?(1)客戶端將作業(yè)提交給 YARN 的資

    2024年02月11日
    瀏覽(14)
  • 源碼解析Flink源節(jié)點(diǎn)數(shù)據(jù)讀取是如何與checkpoint串行執(zhí)行

    源碼解析Flink源節(jié)點(diǎn)數(shù)據(jù)讀取是如何與checkpoint串行執(zhí)行 Flink版本:1.13.6 前置知識(shí):源節(jié)點(diǎn)的Checkpoint是由Checkpointcoordinate觸發(fā),具體是通過RPC調(diào)用TaskManager中對(duì)應(yīng)的Task的StreamTask類的performChecpoint方法執(zhí)行Checkpoint。 本文思路:本文先分析checkpoint階段,然后再分析數(shù)據(jù)讀取階段,

    2024年02月14日
    瀏覽(28)
  • 【Flink】詳解Flink任務(wù)提交流程

    【Flink】詳解Flink任務(wù)提交流程

    通常我們會(huì)使用 bin/flink run -t yarn-per-job -c com.xxx.xxx.WordCount/WordCount.jar 方式啟動(dòng)任務(wù);我們看一下 flink 文件中到底做了什么,以下是其部分源碼 可以看到,第一步將相對(duì)地址轉(zhuǎn)換成絕對(duì)地址;第二步獲取 Flink 配置信息,這個(gè)信息放在 bin 目錄下的 config. sh 中;第三步獲取 JV

    2024年02月14日
    瀏覽(17)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包