StreamOperatorStateHandler
When a StreamTask starts up and initializes, StreamTaskStateInitializerImpl::streamOperatorStateContext creates a keyedStateBackend and an operatorStateBackend for each StreamOperator. AbstractStreamOperator has a member variable of type StreamOperatorStateHandler, which is initialized inside AbstractStreamOperator::initializeState. The StreamOperatorStateHandler object wraps the keyedStateBackend and the operatorStateBackend and manages the StreamOperator's state in one place.
OperatorChain::initializeStateAndOpenOperators // calls initializeState and open on every operator
AbstractStreamOperator::initializeState(StreamTaskStateInitializer)
StreamTaskStateInitializerImpl::streamOperatorStateContext // the keyedStateBackend and operatorStateBackend are created here
StreamOperatorStateHandler::new // initializes the StreamOperator's stateHandler member, used for state management
StreamOperatorStateHandler::initializeOperatorState
StateInitializationContextImpl::new // wraps the DefaultKeyedStateStore and the OperatorStateStore
CheckpointedStreamOperator::initializeState(StateInitializationContext) // calls initializeState on the user-defined function, where Operator State can be obtained
StreamingRuntimeContext::setKeyedStateStore
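The ordering in the call chain above can be modeled in a few lines of plain Java. This is a simplified sketch, not Flink's code: every `Toy*` class is a hypothetical stand-in named after the class whose role it plays, and the backends are reduced to empty marker classes. It only illustrates the sequence: the context creates both backends, the handler wraps them, and the handler then passes itself to the operator's initializeState. It also assumes that a non-keyed operator gets no keyed backend at all.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the two backends; the real ones hold the state.
class ToyKeyedStateBackend {}
class ToyOperatorStateBackend {}

// Plays the role of the per-operator state context: it creates both backends.
class ToyStateContext {
    final ToyKeyedStateBackend keyedBackend;
    final ToyOperatorStateBackend operatorBackend;

    ToyStateContext(boolean keyed, List<String> log) {
        // Assumption: an operator on a non-keyed stream has no keyed backend.
        this.keyedBackend = keyed ? new ToyKeyedStateBackend() : null;
        this.operatorBackend = new ToyOperatorStateBackend();
        log.add("backends created");
    }
}

// Plays the role of an operator that takes part in state initialization.
interface ToyCheckpointedOperator {
    void initializeState(ToyStateHandler handler);
}

// Plays the role of StreamOperatorStateHandler: one object wrapping both backends.
class ToyStateHandler {
    private final ToyStateContext context;
    private final List<String> log;

    ToyStateHandler(ToyStateContext context, List<String> log) {
        this.context = context;
        this.log = log;
        log.add("handler created");
    }

    boolean hasKeyedState() {
        return context.keyedBackend != null;
    }

    ToyOperatorStateBackend operatorStateBackend() {
        return context.operatorBackend;
    }

    void initializeOperatorState(ToyCheckpointedOperator operator) {
        log.add("initializeOperatorState");
        operator.initializeState(this);
    }
}

class ToyOperatorLifecycle {
    // Runs the three steps in order and returns what happened.
    static List<String> initialize(boolean keyed) {
        List<String> log = new ArrayList<>();
        ToyStateContext context = new ToyStateContext(keyed, log);
        ToyStateHandler handler = new ToyStateHandler(context, log);
        handler.initializeOperatorState(
                h -> log.add("user initializeState, keyed=" + h.hasKeyedState()));
        return log;
    }

    public static void main(String[] args) {
        initialize(true).forEach(System.out::println);
    }
}
```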
Flink has two main StateBackend implementations:
- HashMapStateBackend // in memory (JVM heap)
- EmbeddedRocksDBStateBackend // memory + disk
Each StreamTask has a StateBackend member variable that is initialized in its constructor. The backend is either set in user code or loaded from configuration through StateBackendLoader::loadStateBackendFromConfig, and defaults to HashMapStateBackend. For simplicity, the rest of this article uses HashMapStateBackend to examine how the KeyedStateBackend and OperatorStateBackend are created and how KeyedState and OperatorState are used while the data stream is processed.
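The resolution order described above (user code first, then configuration, then the HashMapStateBackend default) can be sketched as follows. This is a plain-Java model under stated assumptions, not the StateBackendLoader implementation: the `Toy*` types are hypothetical, and the configuration key `state.backend` with the values `hashmap` and `rocksdb` is assumed here for illustration (the exact key name depends on the Flink version).

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for the StateBackend interface.
interface ToyStateBackend {
    String name();
}

class ToyHashMapBackend implements ToyStateBackend {
    public String name() { return "hashmap"; }
}

class ToyRocksDbBackend implements ToyStateBackend {
    public String name() { return "rocksdb"; }
}

class ToyBackendLoader {
    // Assumed configuration key; check the documentation of your Flink version.
    static final String BACKEND_KEY = "state.backend";

    // Returns null when the configuration does not name a backend.
    static ToyStateBackend loadFromConfig(Map<String, String> config) {
        String name = config.get(BACKEND_KEY);
        if (name == null) {
            return null;
        }
        switch (name.toLowerCase(Locale.ROOT)) {
            case "hashmap":
                return new ToyHashMapBackend();
            case "rocksdb":
                return new ToyRocksDbBackend();
            default:
                throw new IllegalArgumentException("Unknown state backend: " + name);
        }
    }

    // A backend set in user code wins, then the configuration, then the default.
    static ToyStateBackend resolve(ToyStateBackend fromApplication, Map<String, String> config) {
        if (fromApplication != null) {
            return fromApplication;
        }
        ToyStateBackend fromConfig = loadFromConfig(config);
        return fromConfig != null ? fromConfig : new ToyHashMapBackend();
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        System.out.println(resolve(null, config).name());
        config.put(BACKEND_KEY, "rocksdb");
        System.out.println(resolve(null, config).name());
    }
}
```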
OperatorState
OperatorState creation flow:
OperatorChain::initializeStateAndOpenOperators // calls initializeState and open on every operator
AbstractStreamOperator::initializeState
StreamTaskStateInitializerImpl::streamOperatorStateContext
StreamTaskStateInitializerImpl::operatorStateBackend
HashMapStateBackend::createOperatorStateBackend // creates the DefaultOperatorStateBackend
StreamOperatorStateHandler::new // creates the StreamOperatorStateHandler
StreamOperatorStateHandler::initializeOperatorState // calls CheckpointedFunction::initializeState
StateInitializationContextImpl::new // this instance exposes getOperatorStateStore
User code that uses Operator State must implement the CheckpointedFunction interface, which declares the following two methods:
void initializeState(FunctionInitializationContext context) throws Exception;
void snapshotState(FunctionSnapshotContext context) throws Exception;
The initializeState method is ultimately invoked from StreamOperatorStateHandler.initializeOperatorState. Inside initializeState, operator state can be obtained through the following calls:
FunctionInitializationContext.getOperatorStateStore().getListState(ListStateDescriptor)
DefaultOperatorStateBackend::getListState::new
PartitionableListState::new // backed by an ArrayList internally
The ListState obtained through the OperatorStateStore is therefore essentially an ArrayList. User code calls add to append elements to this internal list, and the StateBackend manages the Operator State of every operator. The result is distributed state management; combined with checkpoints, the state can be persisted and restored after a failure.
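To make the pattern concrete, here is a self-contained sketch of a buffering sink that follows the initializeState / snapshotState contract. It is a toy model, not Flink code: `ToyListState` is a hypothetical stand-in for the ArrayList-backed list state, the sink's method signatures are simplified (no context objects), and the checkpoint is just an in-memory copy of the list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an ArrayList-backed operator list state.
class ToyListState<T> {
    private final List<T> internalList = new ArrayList<>();

    void add(T value) { internalList.add(value); }
    void clear() { internalList.clear(); }
    List<T> get() { return internalList; }

    // A checkpoint copies the list so that later add() calls do not leak into it.
    List<T> snapshot() { return new ArrayList<>(internalList); }

    void restore(List<T> checkpoint) {
        internalList.clear();
        internalList.addAll(checkpoint);
    }
}

// User-side logic in the style of a CheckpointedFunction (signatures simplified).
class ToyBufferingSink {
    private final List<String> buffer = new ArrayList<>();
    private ToyListState<String> checkpointedState;

    // Called once at startup; on recovery the state already holds restored elements.
    void initializeState(ToyListState<String> state) {
        this.checkpointedState = state;
        buffer.addAll(state.get());
    }

    void invoke(String value) { buffer.add(value); }

    // Called when a checkpoint is taken: write the current buffer into the state.
    void snapshotState() {
        checkpointedState.clear();
        for (String value : buffer) {
            checkpointedState.add(value);
        }
    }

    List<String> buffered() { return buffer; }
}

class ToyOperatorStateDemo {
    // Simulates: process two records, checkpoint, process one more, fail, recover.
    static List<String> runWithFailure() {
        ToyListState<String> state = new ToyListState<>();
        ToyBufferingSink sink = new ToyBufferingSink();
        sink.initializeState(state);
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshotState();
        List<String> checkpoint = state.snapshot();
        sink.invoke("c"); // arrives after the checkpoint, so it is not part of it

        ToyListState<String> restoredState = new ToyListState<>();
        restoredState.restore(checkpoint);
        ToyBufferingSink recovered = new ToyBufferingSink();
        recovered.initializeState(restoredState);
        return recovered.buffered();
    }

    public static void main(String[] args) {
        System.out.println(runWithFailure());
    }
}
```

In this toy run the recovered sink sees only the elements that were in the list when the checkpoint was taken. In a real job the record that arrived afterwards would typically be replayed from the source rather than lost.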
OperatorStateStore offers three methods for obtaining state:
<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception;
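The two list variants differ in how state is handed out again when the job is restored with a different parallelism: state from getListState is split among the new subtasks, while state from getUnionListState is given in full to every subtask. The sketch below models only that difference in plain Java. The class and method names are hypothetical, and the round-robin assignment is a simplification of how the split is actually computed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ToyRedistribution {

    private static List<List<String>> emptyLists(int count) {
        List<List<String>> lists = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            lists.add(new ArrayList<>());
        }
        return lists;
    }

    // getListState style: every element ends up on exactly one new subtask.
    static List<List<String>> splitRedistribute(List<List<String>> oldSubtasks, int newParallelism) {
        List<List<String>> result = emptyLists(newParallelism);
        int next = 0;
        for (List<String> subtaskState : oldSubtasks) {
            for (String element : subtaskState) {
                result.get(next % newParallelism).add(element);
                next++;
            }
        }
        return result;
    }

    // getUnionListState style: every new subtask receives all elements.
    static List<List<String>> unionRedistribute(List<List<String>> oldSubtasks, int newParallelism) {
        List<String> all = new ArrayList<>();
        for (List<String> subtaskState : oldSubtasks) {
            all.addAll(subtaskState);
        }
        List<List<String>> result = emptyLists(newParallelism);
        for (List<String> target : result) {
            target.addAll(all);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> old = Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        System.out.println(splitRedistribute(old, 2));
        System.out.println(unionRedistribute(old, 2));
    }
}
```

With union state, each subtask has to decide for itself which of the restored elements it keeps. Broadcast state is different again: it is a map whose contents are expected to be identical on every subtask.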
KeyedState
KeyedState creation flow:
OperatorChain::initializeStateAndOpenOperators // calls initializeState and open on every operator
AbstractStreamOperator::initializeState
StreamTaskStateInitializerImpl::streamOperatorStateContext
StreamTaskStateInitializerImpl::keyedStatedBackend
HashMapStateBackend::createKeyedStateBackend // creates the HeapKeyedStateBackend
HeapKeyedStateBackendBuilder::build
InternalKeyContextImpl::new // holds the key that is currently being processed
StreamOperatorStateHandler::new // creates the StreamOperatorStateHandler
DefaultKeyedStateStore::new // creates the DefaultKeyedStateStore
StreamingRuntimeContext::setKeyedStateStore // sets the keyedStateStore member variable
AbstractUdfStreamOperator::open
FunctionUtils::openFunction
RichFunction::open
The KeyedStateStore is kept in the StreamingRuntimeContext. To use KeyedState, a user-defined function implements the RichFunction interface and calls getRuntimeContext().getState in its open method to obtain the state:
getRuntimeContext().getState() // obtains a ValueState
DefaultKeyedStateStore::getState
DefaultKeyedStateStore::getPartitionedState
HeapKeyedStateBackend::getPartitionedState
AbstractKeyedStateBackend::getOrCreateKeyedState
LatencyTrackingStateFactory::createStateAndWrapWithLatencyTrackingIfEnabled
TtlStateFactory::createStateAndWrapWithTtlIfEnabled // wraps the state with TTL handling if enabled
HeapKeyedStateBackend::createInternalState
HeapKeyedStateBackend::tryRegisterStateTable // the key step: one StateTable is created per registered state
CopyOnWriteStateTable::new // supports asynchronous snapshots; the InternalKeyContext of the current KeyedStateBackend is passed in here
StateTable::new // creates the StateMap array, sized by the number of key groups this task manages
CopyOnWriteStateTable::createStateMap // one StateMap per key group
CopyOnWriteStateMap::new // stores the keys and their state
HeapValueState::create
HeapValueState::new // holds a member that points to the CopyOnWriteStateTable storing this state
HeapValueState::setCurrentNamespace // defaults to VoidNamespace
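The structure built by this call chain (a key context, one state table per state, one map per key group, and a value state that reads the current key from the context) can be sketched in plain Java. This is a toy model under stated assumptions: all `Toy*` names are hypothetical, the key group is computed with a plain modulo of hashCode() whereas Flink applies an additional hash first, ordinary HashMaps replace the copy-on-write maps, and namespaces are left out entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plays the role of the key context: remembers the key of the current record.
class ToyKeyContext {
    private final int maxParallelism;
    private Object currentKey;
    private int currentKeyGroup;

    ToyKeyContext(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    void setCurrentKey(Object key) {
        this.currentKey = key;
        // Simplified key-group assignment (assumption, see the lead-in).
        this.currentKeyGroup = Math.floorMod(key.hashCode(), maxParallelism);
    }

    Object currentKey() { return currentKey; }
    int currentKeyGroup() { return currentKeyGroup; }
}

// One table per registered state; one map per key group owned by this task.
class ToyStateTable<V> {
    private final ToyKeyContext keyContext;
    private final int startKeyGroup;
    private final List<Map<Object, V>> keyGroupedMaps = new ArrayList<>();

    ToyStateTable(ToyKeyContext keyContext, int startKeyGroup, int numberOfKeyGroups) {
        this.keyContext = keyContext;
        this.startKeyGroup = startKeyGroup;
        for (int i = 0; i < numberOfKeyGroups; i++) {
            keyGroupedMaps.add(new HashMap<>());
        }
    }

    private Map<Object, V> currentMap() {
        int index = keyContext.currentKeyGroup() - startKeyGroup;
        if (index < 0 || index >= keyGroupedMaps.size()) {
            throw new IllegalStateException("Key group is not owned by this task");
        }
        return keyGroupedMaps.get(index);
    }

    V get() { return currentMap().get(keyContext.currentKey()); }
    void put(V value) { currentMap().put(keyContext.currentKey(), value); }
    int sizeOfKeyGroup(int keyGroup) { return keyGroupedMaps.get(keyGroup - startKeyGroup).size(); }
}

// Plays the role of a heap value state: no key parameter, the key comes from the context.
class ToyValueState<V> {
    private final ToyStateTable<V> table;

    ToyValueState(ToyStateTable<V> table) {
        this.table = table;
    }

    V value() { return table.get(); }
    void update(V value) { table.put(value); }
}

class ToyKeyedStateDemo {
    // Counts records per key and returns the counts for keys 1, 2 and 5.
    static long[] countPerKey() {
        ToyKeyContext context = new ToyKeyContext(4);
        ToyStateTable<Long> table = new ToyStateTable<>(context, 0, 4);
        ToyValueState<Long> count = new ToyValueState<>(table);

        Integer[] keys = {1, 2, 1, 5, 1};
        for (Integer key : keys) {
            context.setCurrentKey(key); // the runtime does this before each record
            Long current = count.value();
            count.update(current == null ? 1L : current + 1);
        }

        long[] result = new long[3];
        context.setCurrentKey(1);
        result[0] = count.value();
        context.setCurrentKey(2);
        result[1] = count.value();
        context.setCurrentKey(5);
        result[2] = count.value();
        return result;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(countPerKey()));
    }
}
```

The point of the design is that user code never passes a key to value() or update(). The runtime sets the current key before each record is processed, and the state object resolves the right map and entry from it.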
KeyedState comes in the following types:
<T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) returns a HeapValueState
<T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) returns a HeapListState
<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) returns a HeapMapState
<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) returns a HeapAggregatingState
<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) returns a HeapReducingState
RocksDBStateBackend
EmbeddedRocksDBStateBackend manages OperatorState the same way HashMapStateBackend does, through DefaultOperatorStateBackend.
For KeyedState, EmbeddedRocksDBStateBackend uses RocksDBKeyedStateBackend, which combines memory and disk so that large state can be managed:
RocksDBValueState
RocksDBListState
RocksDBMapState
RocksDBAggregatingState
RocksDBReducingState
Summary
Built-in state management is one of Flink's biggest advantages over other distributed stream processing systems. Efficient and reliable distributed state management is available without any external storage component, which greatly lowers the cost of learning and using it.