一、什么是狀態(tài)
無(wú)狀態(tài)計(jì)算的例子: 例如一個(gè)加法算子,第一次輸入2+3=5
那么以后我多次數(shù)據(jù)2+3
的時(shí)候得到的結(jié)果都是5
。得出的結(jié)論就是,相同的輸入都會(huì)得到相同的結(jié)果,與次數(shù)無(wú)關(guān)。
有狀態(tài)計(jì)算的例子: 訪問(wèn)量的統(tǒng)計(jì),我們都知道Nginx
的訪問(wèn)日志一個(gè)請(qǐng)求一條日志,基于此我們就可以統(tǒng)計(jì)訪問(wèn)量。如下,/api/a
這個(gè)url
第一此訪問(wèn)的時(shí)候,返回的結(jié)果就是 count1
,但當(dāng)?shù)诙卧L問(wèn)的時(shí)候,返回的結(jié)果變成了2
。為什么Flink
知道之前已經(jīng)處理過(guò)一次 hello world
,這就是state
發(fā)揮作用了,這里是被稱為keyed state
存儲(chǔ)了之前需要統(tǒng)計(jì)的數(shù)據(jù),keyby
接口的調(diào)用會(huì)創(chuàng)建keyed stream
對(duì)key
進(jìn)行劃分,這是使用keyed state
的前提。得出的結(jié)論就是,相同的輸入得到不同的結(jié)果,與次數(shù)有關(guān)。這就是有狀態(tài)的數(shù)據(jù)。
什么場(chǎng)景下會(huì)大量使用到這種狀態(tài)數(shù)據(jù)啦?簡(jiǎn)單舉幾個(gè)例子:
【1】去重的需求中,比如說(shuō)我們只想知道這100
個(gè)同事都屬于那幾個(gè)部門(mén)的等等。
【2】窗口計(jì)算,已進(jìn)入未觸發(fā)的數(shù)據(jù)。比如,我們一分鐘統(tǒng)計(jì)一次,1-2
之間的1.5
這個(gè)時(shí)候的數(shù)據(jù)對(duì)于2
來(lái)說(shuō)就是一個(gè)有狀態(tài)的數(shù)據(jù),因?yàn)?code>2的結(jié)果與1.5
有關(guān)。
【3】機(jī)器學(xué)習(xí)/深度學(xué)習(xí),訓(xùn)練的模型及參數(shù)。這對(duì)于機(jī)器學(xué)習(xí)的同學(xué)深入感觸。比如,第一次輸入hello
,機(jī)器會(huì)給我一個(gè)反饋,那么下次會(huì)基于這個(gè)反饋?zhàn)鲞M(jìn)一步的學(xué)習(xí)處理。那么上一步的結(jié)果對(duì)于我而言就是一種有狀態(tài)的輸入。
【4】訪問(wèn)歷史數(shù)據(jù),需要與昨日進(jìn)行對(duì)比。昨日的數(shù)據(jù)對(duì)于今日而言也屬于一種狀態(tài)。你品,你細(xì)品。
為什么要管理狀態(tài),用內(nèi)存不香嗎?首先流失作業(yè)是有它的標(biāo)準(zhǔn)的,不是什么東西隨隨便便就說(shuō)自己這個(gè)是流失處理。首先,7*24小時(shí)運(yùn)行,高可靠,你內(nèi)存不行吧,你的容量總有用完的時(shí)候吧。其次,數(shù)據(jù)不丟失不重,恰好計(jì)算一次,你內(nèi)存要實(shí)現(xiàn)需要備份和恢復(fù),你還總伴隨著小部分?jǐn)?shù)據(jù)的丟失吧。最后,數(shù)據(jù)實(shí)時(shí)產(chǎn)生,不延遲,你內(nèi)存不夠橫向擴(kuò)展時(shí),你需要延遲吧。
理想的狀態(tài)管理就是下面描述的樣子,Flink
也都幫我們實(shí)現(xiàn)了。
二、狀態(tài)的類型
Managed State & Raw State
Managed State | Raw State | |
---|---|---|
狀態(tài)管理方式 | Flink Runtime 管理 —自動(dòng)存儲(chǔ),自動(dòng)恢復(fù) —內(nèi)存管理上有優(yōu)化 | 用戶自己管理(Flink不知道你在State中存儲(chǔ)的數(shù)據(jù)結(jié)構(gòu)的) —要自己實(shí)例化 |
狀態(tài)數(shù)據(jù)結(jié)構(gòu) | 已知的數(shù)據(jù)結(jié)構(gòu) —value,list,map… | 字節(jié)數(shù)據(jù) —byte[] |
推薦使用場(chǎng)景 | 大多數(shù)情況下均可使用 | 自定義 Operator 時(shí)可以使用(當(dāng)Managed State 不夠時(shí)使用) |
Managed Stated 分為: Keyed Stated
和Operator State
【1】Keyed Stated: 只能用于keyBy
生成的KeyedStream
上的算子。每一個(gè)key
對(duì)應(yīng)一個(gè)State
,一個(gè)Operator
實(shí)例處理多個(gè)Key
,訪問(wèn)相應(yīng)的多個(gè)State
。相同Key
會(huì)在相同的實(shí)例中處理。整個(gè)過(guò)程如果沒(méi)有keyBy
操作,它是沒(méi)有KeyedStream
的,而Keyed Stated
只能應(yīng)用在KeyedStream
上。
并發(fā)改變: State
隨著Key
在實(shí)例間遷移。例如:實(shí)例A
中之前處理KeyA
與KeyB
,后面我擴(kuò)展了實(shí)例B
,那么 實(shí)例A
就只需要處理KeyA
,KeyB
就交給 實(shí)例B
進(jìn)行處理。安裝狀態(tài)進(jìn)行分離,可以理解為分布式。
通過(guò) RuntimeContext 訪問(wèn),說(shuō)明Operator
是一個(gè)Rich Function
,否則是拿不到RuntimeContext
。
支持的數(shù)據(jù)結(jié)構(gòu): ValueState
、ListState
、ReducingState
、AggregatingState
、MapState
【2】Operator State: 可以用于所有的算子,常用于source
上,例如FlinkKafkaConsumer
。一個(gè)Operator
實(shí)例對(duì)應(yīng)一個(gè)State
,所以一個(gè)Operator
中會(huì)處理多個(gè)key
,可以理解為集群。
并發(fā)改變: Operator State
沒(méi)有key
,并發(fā)改變的時(shí)候就需要重新分配。內(nèi)置了兩種方案:均勻分配和合并后每個(gè)得到全量。
訪問(wèn)方式: 實(shí)現(xiàn)CheckpointedFunction
或ListCheckpointed
接口。
支持的數(shù)據(jù)結(jié)構(gòu): ListState
三、Keyed State 使用示例
什么是 keyed state: 對(duì)于keyed state
,有兩個(gè)特點(diǎn):
【1】只能應(yīng)用于KeyedStream 的函數(shù)與操作中,例如Keyed UDF
, window state
;
【2】keyed state
是已經(jīng)分區(qū) / 劃分好的,每一個(gè) key 只能屬于某一個(gè) keyed state;
對(duì)于如何理解已經(jīng)分區(qū)的概念,我們需要看一下keyby
的語(yǔ)義,大家可以看到下圖左邊有三個(gè)并發(fā),右邊也是三個(gè)并發(fā),左邊的詞進(jìn)來(lái)之后,通過(guò)keyby
會(huì)進(jìn)行相應(yīng)的分發(fā)。例如對(duì)于hello word
,hello
這個(gè)詞通過(guò)hash
運(yùn)算永遠(yuǎn)只會(huì)到右下方并發(fā)的task
上面去。
什么是 operator state
【1】又稱為non-keyed state
,每一個(gè)operator state
都僅與一個(gè)operator
的實(shí)例綁定。
【2】常見(jiàn)的operator state
是source state
,例如記錄當(dāng)前source
的offset
再看一段使用operator state
的word count
代碼:
這里的fromElements
會(huì)調(diào)用FromElementsFunction
的類,其中就使用了類型為list state
的operator state
。如下幾種Keyed State
之間的依賴關(guān)系,都是state
的子類。它們的訪問(wèn)方式和數(shù)據(jù)結(jié)構(gòu)都有一定的區(qū)別。
狀態(tài)數(shù)據(jù)類型 | 訪問(wèn)接口 | 備注 | |
---|---|---|---|
ValueState | 單個(gè)值 | [update(T) 修改/T value 獲取] | 例如 WordCount 用 word 做 key,state就是單個(gè)的數(shù)值。這個(gè)單個(gè)也可以是字符串、對(duì)象等都有可能。訪問(wèn)方式只有上面兩種。 |
MapState | Map | put(UK key, UV value) putAll(Map<UK,UV> map) remove(UK key) boolean contains(UK key) UV get(UK key) Iterable<Map.Entry> entries() Iterable<Map.Entry> iterator() Iterable keys() Iterable values() | 能夠操作具體的對(duì)象的key |
ListState | List | add/ addAll(List) update(List) Iterable get() | |
ReducingState | 單個(gè)值 | add/ addAll(List) update(List) T get() | 與 List 是同一個(gè)父類,這個(gè)add是直接將數(shù)據(jù)更新進(jìn)了 Reducing的結(jié)果里面。舉個(gè)例子,例如我們統(tǒng)計(jì)1分鐘的結(jié)果,list是先將數(shù)據(jù)添加到list中,等到1分鐘的時(shí)候全來(lái)出來(lái)統(tǒng)計(jì)。而 Reducing是來(lái)一條就統(tǒng)計(jì)一條結(jié)果。好處是節(jié)省內(nèi)存。 |
AggregatingState | 單個(gè)值 | add(IN)/OUT get() | 與 List 是同一個(gè)父類,與Reducing的不同是,Reducing輸入和輸出的類型都是相同的。而Aggregating 是可以不同的。例如,我要計(jì)算一個(gè)平局值,Reducing是算好返回,而Aggregating會(huì)返回總和和個(gè)數(shù)。 |
舉個(gè)ValueState
的案例
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//獲取數(shù)據(jù)流
DataStream<Event> events = env.addSource(source);
DataStream<Alert> alerts = events
// 生成 keyedStata 通過(guò) sourceAddress
.keyBy(Event::sourceAddress)
// StateMachineMapper 狀態(tài)機(jī)
.flatMap(new StateMachineMapper());
//我么看下?tīng)顟B(tài)機(jī)怎么寫(xiě) 實(shí)現(xiàn) RichFlatMapFunction
@SuppressWarnings("serial")
static class StateMachineMapper extends RichFlatMapFunction<Event, Alert> {
private ValueState<LeaderLatch.State> currentState;
@Override
public void open(Configuration conf) {
// 獲取一個(gè) valueState
currentState = getRuntimeContext().getState(
new ValueStateDescriptor<>("state", State.class));
}
//來(lái)一條數(shù)據(jù)處理一條
@Override
public void flatMap(Event evt, Collector<Alert> out) throws Exception {
// 獲取 value
State state = currentState.value();
if (state == null) {
state = State.Initial;//State 是本地的變量
}
// 把事件對(duì)狀態(tài)的影響加上去,得到一個(gè)狀態(tài)
State nextState = state.transition(evt.type());
//判斷狀態(tài)是否合法
if (nextState == State.InvalidTransition) {
//扔出去
out.collect(new Alert(evt.sourceAddress(), state, evt.type()));
}
//是否不能繼續(xù)轉(zhuǎn)化了,例如取消的訂單
else if (nextState.isTerminal()) {
// 從 state 中清楚掉
currentState.clear();
}
else {
// 修改狀態(tài)
currentState.update(nextState);
}
}
}
四、CheckPoint 與 state 的關(guān)系
Checkpoint
是從source
觸發(fā)到下游所有節(jié)點(diǎn)完成的一次全局操作。下圖可以有一個(gè)對(duì)Checkpoint
的直觀感受,紅框里面可以看到一共觸發(fā)了 569K
次Checkpoint
,然后全部都成功完成,沒(méi)有fail
的。
**state 其實(shí)就是 Checkpoint 所做的主要持久化備份的主要數(shù)據(jù),**看下圖的具體數(shù)據(jù)統(tǒng)計(jì),其state
也就9kb
大小 。
五、狀態(tài)如何保存和恢復(fù)
Checkpoint
定時(shí)制作分布式快照,對(duì)程序的狀態(tài)進(jìn)行備份。發(fā)生故障時(shí),將整個(gè)作業(yè)的Task
都回滾到最后一次成功Checkpoint
中的狀態(tài),然后從保存的點(diǎn)繼續(xù)處理。
必要條件: 數(shù)據(jù)源支持重發(fā)(如果不重發(fā),丟失的消息就真的丟了)
一致性語(yǔ)義: 恰好一次(如果p
相同,單線程,多個(gè)線程時(shí),可能有的算子對(duì)其已經(jīng)計(jì)算了一次了,有的沒(méi)有就需要注意),至少一次。
// 獲取運(yùn)行環(huán)境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//狀態(tài)數(shù)據(jù)
//兩個(gè)checkpoint 觸發(fā)間隔設(shè)置1S,越頻繁追的數(shù)據(jù)就越少,io消耗也越大
env.enableCheckpointing(1000);
//EXACTLY_ONCE語(yǔ)義說(shuō)明 Checkpoint是要對(duì)替的,這樣消息不會(huì)重復(fù),也不會(huì)對(duì)丟。
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//兩個(gè)checkpoint 最少等待500ms 例如第一個(gè)checkpoint做了700ms按理300ms后就要做下一個(gè)checkpoint。但是它們之間的等待時(shí)間300ms<500ms 此時(shí),就會(huì)延長(zhǎng)200ms減少checkpoint過(guò)于頻繁,影響業(yè)務(wù)。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//checkpoint多久超時(shí),如果這個(gè)checkpoint在1分鐘內(nèi)還沒(méi)做完,那就失敗了
env.getCheckpointConfig().setCheckpointTimeout(60000);
//同時(shí)最多有多少個(gè)checkpoint進(jìn)行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//當(dāng)重新分配并發(fā)度,拆分task時(shí),是否保存checkpoint。如果不保存就需要使用savepoint來(lái)保存數(shù)據(jù),放到外部的介質(zhì)中。
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
Checkpoint vs Savepoint
Checkpoint | Savepoint | |
---|---|---|
觸發(fā)管理方式 | 由Flink自動(dòng)觸發(fā)并管理 | 由用戶手動(dòng)觸發(fā)并管理 |
主要用途 | 在 Task 發(fā)生異常時(shí)快速恢復(fù),例如網(wǎng)絡(luò)抖動(dòng)導(dǎo)致的超時(shí)異常 | 有計(jì)劃的進(jìn)行備份,使作業(yè)能停止后再恢復(fù),例如修改代碼、調(diào)整并發(fā)。 |
特點(diǎn) | 輕量、自動(dòng)從故障中服務(wù)、在作業(yè)停止后默認(rèn)清除 | 持久、以標(biāo)準(zhǔn)格式存儲(chǔ),允許代碼或配置發(fā)生變化、手動(dòng)觸發(fā) savepoint 恢復(fù)。 |
可選的狀態(tài)存儲(chǔ)方式:
【1】MemoryStateBackend
:構(gòu)造方法:
MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots)
存儲(chǔ)方式: State
:TaskManager
內(nèi)存。Checkpoint
:JobManager
內(nèi)存。
容量限制: 單個(gè)State maxStateSize
默認(rèn)5M
。maxStateSize <= akka.framesize
默認(rèn)10M
??偞笮〔怀^(guò)JobManager
內(nèi)存。
推薦使用場(chǎng)景: 本地測(cè)試,幾乎無(wú)狀態(tài)的作業(yè),比如ETL/JobManager
不容易掛,或影響不大的情況。不推薦在生產(chǎn)場(chǎng)景使用。
【2】FsStateBackend: 構(gòu)造方法:
FsStateBackend(URL checkpointDataUri, boolean asynchronousSnapshots)
存儲(chǔ)方式: State
:TaskManager
內(nèi)存。Checkpoint
:外部文件系統(tǒng)(本地或HDFS
)。
容量限制: 單個(gè)TaskManager
上State
總量不超過(guò)它的內(nèi)存。總大小不超過(guò)配置的文件系統(tǒng)容量(會(huì)定期清理)。
推薦使用場(chǎng)景: 常規(guī)使用狀態(tài)的作業(yè),例如分鐘級(jí)窗口聚合、join
。需要開(kāi)啟HA
的作業(yè)。可以在生產(chǎn)環(huán)境使用。
【3】RocksDBStateBackend: 構(gòu)造方法:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-760551.html
RocksDBStateBackend(URL checkpointDataUri, boolean enableIncrementalCheckpointing)
存儲(chǔ)方式: State
:TaskManager
上的KV
數(shù)據(jù)庫(kù)(實(shí)際使用內(nèi)存+磁盤(pán))。Checkpoint
:外部文件系統(tǒng)(本地或HDFS
)。
容量限制: 單個(gè)TaskManager
上State
總量不超過(guò)它的內(nèi)存+磁盤(pán),單個(gè)key
最大2G
??偞笮〔怀^(guò)配置的文件系統(tǒng)容量。
推薦使用場(chǎng)景: 超大狀態(tài)的作業(yè),例如天級(jí)窗口聚合。需要開(kāi)啟HA
的作業(yè)。對(duì)狀態(tài)讀寫(xiě)性能要求比較高的作業(yè)??梢栽谏a(chǎn)環(huán)境使用。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-760551.html
到了這里,關(guān)于Flink 狀態(tài)管理與容錯(cuò)機(jī)制(CheckPoint & SavePoint)的關(guān)系的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!