国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink源碼之State創(chuàng)建流程

這篇具有很好參考價(jià)值的文章主要介紹了Flink源碼之State創(chuàng)建流程。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

Flink源碼之State創(chuàng)建流程,BigData,flink,大數(shù)據(jù)

StreamOperatorStateHandler

在StreamTask啟動(dòng)初始化時(shí)通過(guò)StreamTaskStateInitializerImpl::streamOperatorStateContext會(huì)為每個(gè)StreamOperator 創(chuàng)建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有個(gè)StreamOperatorStateHandler成員變量,調(diào)用AbstractStreamOperator::initializeState方法中會(huì)初始化StreamOperatorStateHandler類型的成員變量, StreamOperatorStateHandler對(duì)象變量封裝了keyedStatedBackend和operatorStateBackend,用于統(tǒng)一管理SteamOperator的狀態(tài)。

 OperatorChain::initializeStateAndOpenOperators //調(diào)用每個(gè)Operator的initializeState和Open方法
    	AbstractStreamOperator::initializeState(StreamTaskStateInitializer) 
			StreamTaskStateInitializerImpl::streamOperatorStateContext //此時(shí)會(huì)創(chuàng)建keyedStatedBackend和operatorStateBackend
			StreamOperatorStateHandler::new //初始化StreamOperator的stateHandler成員變量,用于狀態(tài)管理
			StreamOperatorStateHandler::initializeOperatorState
		    StateInitializationContextImpl::new //封裝DefaultKeyedStateStore和OperatorStateStore
			CheckpointedStreamOperator::initializeState(StateInitializationContext)//調(diào)用用戶定義函數(shù)中的initializeState方法,可獲取Operator State
		StreamingRuntimeContext::setKeyedStateStore

Flink中主要有兩種StateBackend:

  • HashMapStateBackend //內(nèi)存
  • EmbeddedRocksDBStateBackend //內(nèi)存+磁盤(pán)

每個(gè)StreamTask一個(gè)StateBackend成員變量,在構(gòu)造函數(shù)中進(jìn)行初始化,通過(guò)用戶代碼中設(shè)置或StateBackendLoader::loadStateBackendFromConfig從配置中加載,默認(rèn)為HashMapStateBackend。簡(jiǎn)單起見(jiàn),以HashMapStateBackend為例剖析創(chuàng)建KeyedStatedBackend和OperatorStateBackend以及處理數(shù)據(jù)流時(shí)是如何使用KeyedState和OperatorState的。

OperatorState

OperatorState創(chuàng)建流程:

OperatorChain::initializeStateAndOpenOperators //調(diào)用每個(gè)Operator的initializeState和Open方法
    AbstractStreamOperator::initializeState
        StreamTaskStateInitializerImpl::streamOperatorStateContext
            StreamTaskStateInitializerImpl::operatorStateBackend
            HashMapStateBackend::createOperatorStateBackend //創(chuàng)建DefaultOperatorStateBackend
        StreamOperatorStateHandler::new //創(chuàng)建StreamOperatorStateHandler
        StreamOperatorStateHandler::initializeOperatorState //調(diào)用CheckpointedFunction::initializeState
        	StateInitializationContextImpl::new //該實(shí)例可getOperatorStateStore

使用Operator State的用戶業(yè)務(wù)代碼需要實(shí)現(xiàn)CheckpointedFunction接口,該接口中有以兩個(gè)下方法:

void initializeState(FunctionInitializationContext context) throws Exception;

void snapshotState(FunctionSnapshotContext context) throws Exception;

其中initializeState方法則會(huì)被StreamOperatorStateHandler.initializeOperatorState 調(diào)用,在initializeState方法中可使用

FunctionInitializationContext.getOperatorStateStore().getListState(ListStateDescriptor)
DefaultOperatorStateBackend::getListState::new
	PartitionableListState::new  //內(nèi)部是ArrayList

因此通過(guò)OperatorStateStore獲取的ListState內(nèi)部本質(zhì)上是一個(gè)ArrayList, 業(yè)務(wù)代碼中可以調(diào)用add方法向這個(gè)內(nèi)部List添加元素,由StateBackend管理每個(gè)Operator State,這樣就實(shí)現(xiàn)了一個(gè)分布式狀態(tài)管理,借助Checkpoint可以實(shí)現(xiàn)狀態(tài)持久化及容災(zāi)恢復(fù)。

OperatorStateStore有三個(gè)獲取狀態(tài)方法:

<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
            throws Exception

KeyedState

KeyedState創(chuàng)建流程如下:

OperatorChain::initializeStateAndOpenOperators //調(diào)用每個(gè)Operator的initializeState和Open方法
    AbstractStreamOperator::initializeState
        StreamTaskStateInitializerImpl::streamOperatorStateContext
            StreamTaskStateInitializerImpl::keyedStatedBackend
            HashMapStateBackend::createKeyedStateBackend //創(chuàng)建HeapKeyedStateBackend
            	HeapKeyedStateBackendBuilder::build
            		InternalKeyContextImpl::new //用于保存當(dāng)前正在處理的key
            		
        StreamOperatorStateHandler::new //創(chuàng)建StreamOperatorStateHandler
            DefaultKeyedStateStore::new //創(chuàng)建DefaultKeyedStateStore
        StreamingRuntimeContext::setKeyedStateStore //設(shè)置keyedStateStore成員變量
    AbstractStreamUdfOperator::open
    	FunctionUtils::openFunction
    		RichFunction::open

KeyedStateStore保存在StreamingRuntimeContext中,使用KeyedState時(shí),用戶自定義函數(shù)實(shí)現(xiàn)RichFunction接口,在open方法中調(diào)用getRuntimeContext().getState方法獲取狀態(tài):

getRuntimeContext().getState() //獲取ValueState
DefaultKeyedStateStore::getState
DefaultKeyedStateStore::getPartitionedState
HeapKeyedStateBackend::getPartitionedState
AbstractKeyedStateBackend::getOrCreateKeyedState
    LatencyTrackingStateFactory::createStateAndWrapWithLatencyTrackingIfEnabled
    TtlStateFactory::createStateAndWrapWithTtlIfEnabled //包裝TTL
    HeapKeyedStateBackend::createInternalState
    HeapKeyedStateBackend::tryRegisterStateTable //這里很關(guān)鍵,對(duì)每個(gè)State創(chuàng)建一個(gè)StateTable
    	CopyOnWriteStateTable::new//異步快照,這里傳遞了當(dāng)前KeyedStateBackend的InternalKeyContext
    	StateTable::new //根據(jù)當(dāng)前Task管理的KeyGroups數(shù)量創(chuàng)建StateMap數(shù)組
    	CopyOnWriteStateTable::createStateMap //一個(gè)KeyGroup一個(gè)StateMap
    	CopyOnWriteStateMap::new //存儲(chǔ)key及其對(duì)應(yīng)的狀態(tài)
   HeapValueState::create
   		HeapValueState::new //有個(gè)成員變量指向存儲(chǔ)當(dāng)前state的CopyOnWriteStateMap
   	HeapValueState::setCurrentNamespace  //默認(rèn)為VoidNamespace

KeyedState有以下幾種類型

ValueState<T> getState(ValueStateDescriptor<T> stateProperties) 獲取HeapValueState

ListState<T> getListState(ListStateDescriptor<T> stateProperties)獲取HeapListState

MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties)獲取HeapMapState

getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties)獲取HeapAggregatingState

getReducingState(ReducingStateDescriptor<T> stateProperties)獲取HeapReducingState

RocksDBStateBackend

EmbeddedRocksDBStateBackend 管理OperatorState與HashMapStateBackend 一樣,也是通過(guò)DefaultOperatorStateBackend進(jìn)行管理的。

EmbeddedRocksDBStateBackend 管理KeyedState則是使用RocksDBKeyedStateBackend實(shí)現(xiàn),這樣可以借助磁盤(pán)加內(nèi)存進(jìn)行大狀態(tài)管理:

RocksDBValueState
RocksDBListState
RocksDBMapState
RocksDBAggregatingState
RocksDBReducingState

總結(jié)

Flink內(nèi)置狀態(tài)管理是相比其他分布式流式處理系統(tǒng)最大的優(yōu)勢(shì)之一,不用借助外部存儲(chǔ)組件,就可實(shí)現(xiàn)高效可靠的分布式狀態(tài)管理,極大降低了學(xué)習(xí)和使用成本。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-657241.html

到了這里,關(guān)于Flink源碼之State創(chuàng)建流程的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink源碼之TaskManager啟動(dòng)流程

    Flink源碼之TaskManager啟動(dòng)流程

    從啟動(dòng)命令flink-daemon.sh可以看出TaskManger入口類為org.apache.flink.runtime.taskexecutor.TaskManagerRunner 在TaskManagerRunner構(gòu)造函數(shù)中,可以看出與JobManger類似,也是先構(gòu)造出一些公共服務(wù): 這些服務(wù)在構(gòu)造TaskExecutor時(shí)作為構(gòu)造函數(shù)參數(shù)傳入 構(gòu)造TaskExecutor前會(huì)先構(gòu)造TaskManagerServices輔助Task

    2024年02月13日
    瀏覽(20)
  • Flink源碼之JobMaster啟動(dòng)流程

    Flink源碼之JobMaster啟動(dòng)流程

    Flink中Graph轉(zhuǎn)換流程如下: Flink Job提交時(shí)各種類型Graph轉(zhuǎn)換流程中,JobGraph是Client端形成StreamGraph后經(jīng)過(guò)Operator Chain優(yōu)化后形成的,然后提交給JobManager的Restserver,最終轉(zhuǎn)發(fā)給JobManager的Dispatcher處理。 本文主要解析從JobGraph轉(zhuǎn)換為ExecutionGraph過(guò)程,執(zhí)行棧如下: 在整個(gè)提交過(guò)程中

    2024年02月13日
    瀏覽(12)
  • Flink 學(xué)習(xí)七 Flink 狀態(tài)(flink state)

    Flink 學(xué)習(xí)七 Flink 狀態(tài)(flink state)

    流式計(jì)算邏輯中,比如sum,max; 需要記錄和后面計(jì)算使用到一些歷史的累計(jì)數(shù)據(jù), 狀態(tài)就是 :用戶在程序邏輯中用于記錄信息的變量 在Flink 中 ,狀態(tài)state 不僅僅是要記錄狀態(tài);在程序運(yùn)行中如果失敗,是需要重新恢復(fù),所以這個(gè)狀態(tài)也是需要持久化;一遍后續(xù)程序繼續(xù)運(yùn)行 1.1 row state 我

    2024年02月09日
    瀏覽(33)
  • 【flink番外篇】7、flink的State(Keyed State和operator state)介紹及示例(2) - operator state

    【flink番外篇】7、flink的State(Keyed State和operator state)介紹及示例(2) - operator state

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月22日
    瀏覽(29)
  • 【flink番外篇】7、flink的State(Keyed State和operator state)介紹及示例(1) - Keyed State

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月17日
    瀏覽(23)
  • Flink window 源碼分析1:窗口整體執(zhí)行流程

    Flink window 源碼分析1:窗口整體執(zhí)行流程

    注:本文源碼為flink 1.18.0版本。 其他相關(guān)文章: Flink window 源碼分析1:窗口整體執(zhí)行流程 Flink window 源碼分析2:Window 的主要組件 Flink window 源碼分析3:WindowOperator Flink window 源碼分析4:WindowState Window 本質(zhì)上就是借助狀態(tài)后端緩存著一定時(shí)間段內(nèi)的數(shù)據(jù),然后在達(dá)到某些條件

    2024年01月16日
    瀏覽(27)
  • 【flink番外篇】7、flink的State(Keyed State和operator state)介紹及示例 - 完整版

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月03日
    瀏覽(34)
  • flink學(xué)習(xí)之state

    state作用 保留當(dāng)前key的歷史狀態(tài)。 state用法 ListStateInteger vipList = getRuntimeContext().getListState(new ListStateDescriptorInteger(\\\"vipList\\\", TypeInformation.of(Integer.class))); 有valueState listState mapstate 。冒失沒(méi)有setstate state案例 比如起點(diǎn)的小說(shuō)不能被下載。別人只能通過(guò)截屏,提取文字的方式盜版小

    2024年02月09日
    瀏覽(17)
  • flink 的 State

    flink 的 State

    目錄 一、前言 二、什么是State 2.1:什么時(shí)候需要?dú)v史數(shù)據(jù) 2.2:為什么要容錯(cuò),以及checkpoint如何進(jìn)行容錯(cuò) 2.3:state basckend 又是什么 三、有哪些常見(jiàn)的是 State 四、 State的使用 五、State backend 5.1??MemoryStateBackend: 5.2? FsStatebackend: 5.3??RocksDBStateBackend: 六、Checkpoint 七、 Deep

    2023年04月18日
    瀏覽(19)
  • Flink State 狀態(tài)管理

    狀態(tài)在Flink中叫做State,用來(lái)保存中間計(jì)算結(jié)果或者緩存數(shù)據(jù)。要做到比較好的狀態(tài)管理,需要考慮以下幾點(diǎn)內(nèi)容: 狀態(tài)數(shù)據(jù)的存儲(chǔ)和訪問(wèn) 在Task內(nèi)部,如何高效地保存狀態(tài)數(shù)據(jù)和使用狀態(tài)數(shù)據(jù)。 狀態(tài)數(shù)據(jù)的備份和恢復(fù) 作業(yè)失敗是無(wú)法避免的,那么就要考慮如何高效地將狀態(tài)

    2024年01月17日
    瀏覽(24)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包