一. state相關(guān)
1. state種類
按照數(shù)據(jù)的劃分和擴(kuò)張方式,F(xiàn)link中大致分為2類:
- Keyed States:記錄每個(gè)Key對(duì)應(yīng)的狀態(tài)值
因?yàn)橐粋€(gè)任務(wù)的并行度有多少,就會(huì)有多少個(gè)子任務(wù),當(dāng)key的范圍大于并行度時(shí),就會(huì)出現(xiàn)一個(gè)subTask上可能包含多個(gè)Key(),但不同Task上不會(huì)出現(xiàn)相同的Key(解決了shuffle的問(wèn)題?)
?
常用的 MapState、ValueState。
- Operator States:記錄每個(gè)Task對(duì)應(yīng)的狀態(tài)值數(shù)據(jù)類型。
?
2. State的存在形式
Keyed State 和 Operator State 存在兩種形式:managed (托管狀態(tài))和 raw(原始狀態(tài))。
- 托管狀態(tài)是由Flink框架管理的狀態(tài),原始狀態(tài)是由用戶自行管理狀態(tài)的具體數(shù)據(jù)結(jié)構(gòu)。
- 通常所有的 datastream functions 都可以使用托管狀態(tài),但是原始狀態(tài)接口僅僅能夠在實(shí)現(xiàn) operators的時(shí)候使用。
- 推薦使用 managed state 而不是使用 raw state,因?yàn)槭褂猛泄軤顟B(tài)的時(shí)候 Flink 可以在 parallelism 發(fā)生改變的情況下能夠動(dòng)態(tài)重新分配狀態(tài),而且還能更好的進(jìn)行內(nèi)存管理。
?
3. state在哪產(chǎn)生
沒(méi)有狀態(tài)的操作
從概念上講, 源表從來(lái)不會(huì)在狀態(tài)中被完全保存。 形如 SELECT … FROM … WHERE
這種只包含字段映射或過(guò)濾器的查詢的查詢語(yǔ)句通常是無(wú)狀態(tài)的管道。
諸如 join、 聚合或去重操作需要在 Flink 抽象的容錯(cuò)存儲(chǔ)內(nèi)保存中間結(jié)果??聪聅um的狀態(tài)操作
@Internal
public class StreamGroupedReduceOperator<IN>
extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
implements OneInputStreamOperator<IN, IN> {
private static final long serialVersionUID = 1L;
private static final String STATE_NAME = "_op_state";
private transient ValueState<IN> values;
private final TypeSerializer<IN> serializer;
public StreamGroupedReduceOperator(ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
super(reducer);
this.serializer = serializer;
}
@Override
public void open() throws Exception {
super.open();
ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer);
//獲得value state
values = getPartitionedState(stateId);
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
IN value = element.getValue();
IN currentValue = values.value();
//如果currentValue不為null,則說(shuō)明不是第一次啟動(dòng),也就是在hdfs上已經(jīng)存儲(chǔ)了中間狀態(tài)
if (currentValue != null) {
//先做一個(gè)聚合,然后再更新,之后輸出到下游
IN reduced = userFunction.reduce(currentValue, value);
values.update(reduced);
output.collect(element.replace(reduced));
} else {
//第一次啟動(dòng)直接更新數(shù)據(jù),之后輸出到下游
values.update(value);
output.collect(element.replace(value));
}
}
}
?
4. state 內(nèi)存設(shè)置
從 Flink1.10 開(kāi)始,F(xiàn)link 默認(rèn)將 state 內(nèi)存大小配置為每個(gè) task slot 的托管內(nèi)存。
調(diào)試內(nèi)存性能的問(wèn)題主要是通過(guò)調(diào)整配置項(xiàng),來(lái)提高Flink的托管內(nèi)存:
taskmanager.memory.managed.size
//推薦使用比例計(jì)算
taskmanager.memory.managed.fraction
具體調(diào)優(yōu)案例分析可見(jiàn):Flink on yarn雙流join問(wèn)題分析+性能調(diào)優(yōu)思路
?
?
二. state backend
Flink狀態(tài)后端主要負(fù)責(zé)兩件事:本地的狀態(tài)管理、將檢查點(diǎn)(checkpoint)狀態(tài)寫(xiě)入遠(yuǎn)程存儲(chǔ)。
flink state可以存儲(chǔ)在java堆內(nèi)存內(nèi)或者內(nèi)存之外。
默認(rèn)情況下,使用MemoryStateBackend,F(xiàn)link的state會(huì)保存在taskManager的內(nèi)存中,而checkpoint會(huì)保存在jobManager的內(nèi)存中。
?
1. 三種狀態(tài)后端
flink提供三種開(kāi)箱即用的State Backend:
狀態(tài)后端 | 數(shù)據(jù)存儲(chǔ) | 容量限制 | 場(chǎng)景 |
---|---|---|---|
MemoryStateBackend |
State:TaskManager 內(nèi)存中
Checkpoint:存儲(chǔ)在jobManager 內(nèi)存
|
單個(gè)State maxStateSize默認(rèn)為5M
maxStateSize <= akka.frame.size默認(rèn)10M
Checkpoint總大小不能超過(guò)JobMananger的內(nèi)存
|
本地測(cè)試
狀態(tài)比較少的作業(yè)
不推薦生產(chǎn)環(huán)境中使用
|
FsStateBackend |
State:TaskManager 內(nèi)存
Checkpoint:外部文件系統(tǒng)(本地或HDFS)
|
單個(gè)TaskManager上State總量不能超過(guò)TM內(nèi)存
總數(shù)據(jù)大小不超過(guò)文件系統(tǒng)容量
|
窗口時(shí)間比較長(zhǎng),如分鐘級(jí)別窗口聚合,Join等
需要開(kāi)啟HA的作業(yè)
可在生產(chǎn)環(huán)境中使用
|
RocksDBStateBackend |
將所有的狀態(tài)序列化之后, 存入本地的 RocksDB 數(shù)據(jù)庫(kù)中.(一種 NoSql 數(shù) 據(jù)庫(kù), KV 形式存儲(chǔ))
State: TaskManager 中的KV數(shù)據(jù)庫(kù)(實(shí)際使用內(nèi)存+磁盤(pán))
Checkpoint:外部文件系統(tǒng)(本地或HDFS)
|
單TaskManager 上 State總量不超過(guò)其內(nèi)存+磁盤(pán)大小,單 Key最大容量2G
總大小不超過(guò)配置的文件系統(tǒng)容量
|
超大狀態(tài)作業(yè)
需要開(kāi)啟HA的
作業(yè)生產(chǎn)環(huán)境可用
|
?
2. 如何在hdfs中存儲(chǔ)?
Keyed States 和 Operator States 會(huì)存儲(chǔ)在一個(gè)帶有編號(hào)的 chk 目錄中,比如說(shuō)一個(gè) flink 任務(wù)的 Keyed States 的 subTask 個(gè)數(shù)是4,Operator States 對(duì)應(yīng)的 subTask 也是 4,那么 chk 會(huì)存一個(gè)元數(shù)據(jù)文件 _metadata ,四個(gè) Keyed States 文件,四個(gè) Operator States 的文件。
也就是說(shuō) Keyed States 和 Operator States 會(huì)分別存儲(chǔ) subTask 總數(shù)個(gè)狀態(tài)文件。
?
3. 設(shè)置checkpoint
一般需求,我們的 Checkpoint 時(shí)間間隔可以設(shè)置為分鐘級(jí)別(1-5 分鐘)。
3.1. 大狀態(tài)下設(shè)置checkpoint
對(duì)于狀態(tài)很大的任務(wù)每次 Checkpoint 訪問(wèn) HDFS 比較耗時(shí),可以設(shè)置為 5~10 分鐘一次Checkpoint,并且調(diào)大兩次 Checkpoint 之間的暫停間隔,例如設(shè)置兩次 Checkpoint 之 間至少暫停 4 或 8 分鐘。
具體案例分析可見(jiàn):Flink on yarn雙流join問(wèn)題分析+性能調(diào)優(yōu)思路
?
3.2. EXACTLY_ONCE下設(shè)置分析checkpoint
如果 Checkpoint 語(yǔ)義配置為 EXACTLY_ONCE,那么在 Checkpoint 過(guò)程中還會(huì)存在 barrier 對(duì)齊的過(guò)程,可以通過(guò) Flink Web UI 的 Checkpoint 選項(xiàng)卡來(lái)查看 Checkpoint 過(guò)程中各階段的耗時(shí)情況,從而確定到底是哪個(gè)階段導(dǎo)致 Checkpoint 時(shí)間過(guò)長(zhǎng)然后針對(duì)性的解決問(wèn)題。
?
?
三. State設(shè)置過(guò)期時(shí)間
使用 flink 進(jìn)行實(shí)時(shí)計(jì)算中,會(huì)遇到一些狀態(tài)數(shù)不斷累積,導(dǎo)致?tīng)顟B(tài)量越來(lái)越大的情形。例如,作業(yè)中定義了超長(zhǎng)的時(shí)間窗口,或者在動(dòng)態(tài)表上應(yīng)用了無(wú)限范圍的 GROUP BY 語(yǔ)句,以及執(zhí)行了沒(méi)有時(shí)間窗口限制的雙流 JOIN 等等操作。
對(duì)于這些情況,經(jīng)常導(dǎo)致堆內(nèi)存出現(xiàn) OOM,或者堆外內(nèi)存(RocksDB)用量持續(xù)增長(zhǎng)導(dǎo)致超出容器的配額上限,造成作業(yè)的頻繁崩潰。
從 Flink 1.6 版本開(kāi)始引入了 State TTL 特性,該特性可以允許對(duì)作業(yè)中定義的 Keyed 狀態(tài)進(jìn)行超時(shí)自動(dòng)清理,對(duì)于Table API 和 SQL 模塊引入了空閑狀態(tài)保留時(shí)間(Idle State Retention Time)進(jìn)行狀態(tài)管理。
?
1. datastream的TTL
要使用 State TTL 功能,首先要定義一個(gè) StateTtlConfig 對(duì)象。State TTL功能所指定的過(guò)期時(shí)間并不是全局生效的,而是和某個(gè)具體的算子狀態(tài)所綁定。
以下描述了state的構(gòu)建、配置:過(guò)期時(shí)間、狀態(tài)時(shí)間戳的更新,對(duì)過(guò)期數(shù)據(jù)的處理等內(nèi)容。
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1)) //過(guò)期時(shí)間:上次訪問(wèn)的時(shí)間 +TTL 超過(guò)了當(dāng)前時(shí)間,則表明狀態(tài)過(guò)期了。
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) //狀態(tài)時(shí)間戳更新的時(shí)間
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) //已過(guò)期但是還未處理的狀態(tài)怎么處理,NeverReturnExpired:一旦狀態(tài)過(guò)期,則永遠(yuǎn)不會(huì)被返回給調(diào)用方
//清理策略:
.cleanupFullSnapshot() //對(duì)過(guò)期狀態(tài)不主動(dòng)處理。默認(rèn)情況下,過(guò)期值只有在顯式讀出時(shí)才會(huì)被刪除,例如通過(guò)調(diào)用 ValueState.value() 方法。
.cleanupIncrementally(1024,true)//增量清理,可配置讀取若干條記錄就執(zhí)行一次清理,并可指定每次清理多少條失效記錄。
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
TTL配置不是check/savepoints的一部分,而是Flink在當(dāng)前運(yùn)行的作業(yè)中如何處理它的一種方式。
?
小結(jié):
state TTL 機(jī)制,應(yīng)對(duì)通用的狀態(tài)暴增特別有效。然而,機(jī)制不能保證一定可以及時(shí)清理掉失效的狀態(tài),以及目前僅支持 Processing Time 時(shí)間模式等等。
?
?
2.Table API和SQL的狀態(tài)管理
針對(duì) Table API 和 SQL 模塊的持續(xù)查詢/聚合語(yǔ)句,F(xiàn)link 還提供了另一項(xiàng)失效狀態(tài)清理機(jī)制,這就是 Idle State Retention Time。
2.1. 問(wèn)題描述與分析
如下,官網(wǎng)的例子一個(gè)持續(xù)查詢的分組語(yǔ)句,沒(méi)有時(shí)間窗口的定義,理論上會(huì)無(wú)限地計(jì)算下去,但這里會(huì)出現(xiàn)一個(gè)問(wèn)題:隨著時(shí)間的推移,內(nèi)存的狀態(tài)會(huì)積累很多,直到狀態(tài)達(dá)到了存儲(chǔ)系統(tǒng)的極限,作業(yè)崩潰。
SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
針對(duì)上面的問(wèn)題,F(xiàn)link 提出了空閑狀態(tài)保留時(shí)間(Idle State Retention Time)的概念,如下描述:
通過(guò)為每個(gè)狀態(tài)設(shè)置Timer,如果這個(gè)狀態(tài)中途被訪問(wèn)過(guò),則重新設(shè)置Timer;否則(如果狀態(tài)一直沒(méi)有被訪問(wèn))Timer到期時(shí)做狀態(tài)清理。
這樣就可以確保每個(gè)狀態(tài)能夠被及時(shí)的清理。
?
2.2. 狀態(tài)設(shè)置
streamTableEnvironment.getConfig().setIdleStateRetentionTime(
Time.minutes(idleStateRetentionTime),
Time.of(idleStateRetentionTime * 60 + 5, TimeUnit.MINUTES));
注意:
舊版本 Flink 允許只指定一個(gè)參數(shù),表示最早和最晚清理周期相同,但是這樣可能會(huì)導(dǎo)致同一時(shí)間段有很多狀態(tài)都到期,從而造成瞬間的處理壓力。
?
新版本(1.11)的 Flink 要求兩個(gè)參數(shù)之間的差距至少要達(dá)到 5 分鐘,從而避免大量狀態(tài)瞬間到期,對(duì)系統(tǒng)造成的沖擊。
?
2.3. 實(shí)現(xiàn)邏輯與源碼分析
使用CleanupState 來(lái)表示idle state retention time
//狀態(tài)空閑時(shí)間timer的注冊(cè)
public interface CleanupState {
default void registerProcessingCleanupTimer(
ValueState<Long> cleanupTimeState, //通過(guò)ValueState來(lái)維護(hù)狀態(tài)清理時(shí)間
long currentTime,
long minRetentionTime,
long maxRetentionTime,
TimerService timerService)
throws Exception {
//最近一次要清理狀態(tài)的時(shí)間
Long curCleanupTime = cleanupTimeState.value();
//如果curCleanupTime為空 或 維護(hù)的時(shí)間+最小的狀態(tài)空閑時(shí)間大于curCleanupTime
if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
//重新注冊(cè)一個(gè)timer,
//此時(shí)要注意:如果maxRetentionTime和minRetentionTime的間隔過(guò)小,就會(huì)頻繁的產(chǎn)生timer與更新valuestate,維護(hù)timer的成本將會(huì)變大。
long cleanupTime = currentTime + maxRetentionTime;
timerService.registerProcessingTimeTimer(cleanupTime);
//如果之前有timer則刪除
if (curCleanupTime != null) {
timerService.deleteProcessingTimeTimer(curCleanupTime);
}
//并更新清理時(shí)間,用于觸發(fā)下一次清理
cleanupTimeState.update(cleanupTime);
}
}
}
當(dāng)數(shù)據(jù)第一次出現(xiàn),或者curTime+minRetentionTime超過(guò)了最近的清理時(shí)間,就用curTime+maxRetentionTime,創(chuàng)建新的Timer,用于觸發(fā)下一次清理,如果有了過(guò)期的timer就刪除。
所以如果maxRetentionTime和minRetentionTime的間隔過(guò)小,就會(huì)頻繁的產(chǎn)生timer與更新valuestate,維護(hù)timer的成本將會(huì)變大。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-429376.html
?
?
?
參考:
Flink 狀態(tài)管理詳解(State TTL、Operator state、Keyed state)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-429376.html
到了這里,關(guān)于【狀態(tài)管理|概述】Flink的狀態(tài)管理:為什么需要state、怎么保存state、對(duì)于state過(guò)大怎么處理的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!