學習文檔:Flink 官方文檔 - DataStream API - 狀態(tài)與容錯 - 使用狀態(tài)
相關(guān)文檔:
- 有狀態(tài)流處理背后的概念:Flink|《Flink 官方文檔 - 概念透析 - 有狀態(tài)流處理》學習筆記
- Redis 過期 key 的刪除機制:Redis|過期 key 的刪除機制
學習筆記如下:
鍵控流(Keyed DataStream)
如果要使用鍵控狀態(tài),則必須要為 DataStream 指定 key。這個主鍵將用于對數(shù)據(jù)流中的記錄分區(qū),同時也會用于狀態(tài)分區(qū)。
可以使用 DataStream 中的 keyBy(KeySelector)
(Java / Scala)或 key_by(KeySelector)
來指定 key,在指定 key 后,數(shù)據(jù)流將變成鍵控流(KeyedStream),并允許使用基于 Keyed state 的操作。
KeySelector 接受每條記錄作為輸入,并返回這條記錄的 key。該 key 可以是任何類型,但它的計算產(chǎn)生方式必須是具有確定性的(詳見 Flink|《Flink 官方文檔 - 概念透析 - 有狀態(tài)流處理》學習筆記)。例如:
// some ordinary POJO
public class WC {
public String word;
public int count;
public String getWord() { return word; }
}
DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words
.keyBy(WC::getWord);
Flink 的數(shù)據(jù)類型并不基于 key - value 對,因此實際上將數(shù)據(jù)集在物理上封裝為 key 和 value 是沒有必要的。
鍵控狀態(tài)(Keyed State)
以下 Keyed State 只能在 KeyedStream 上使用:
-
ValueState<T>
:保存一個可以更新和檢索的值;這個值可以通過update(T)
進行更新,通過T value()
進行檢索。 -
ListState<T>
:保存一個元素的列表;可以通過add(T)
或者addAll(List<T>)
添加元素,通過Iterable<T> get()
獲取整個列表,通過update(List<T>)
覆蓋當前的列表。 -
ReducingState<T>
:保存一個值,表示添加到狀態(tài)的所有值聚合后的結(jié)果;使用add(T)
添加元素,并使用提供的reduceFunction
進行聚合。 -
AggregatingState<IN, OUT>
:保存一個值,表示添加到狀態(tài)的所有值聚合后的結(jié)果;使用add(T)
添加元素,并使用提供的AggregateFunction
進行聚合。與ReducingState
不同的時,聚合類型可能與添加到狀態(tài)的元素類型不同。 -
MapState<UK, UV>
:保存一個映射列表;可以使用put(UK, UV)
或putAll(Map<UK, UV>)
添加映射,使用get(UK)
來檢索特定的 key,使用entires()
、keys()
、values()
分別檢索映射、鍵和值的可迭代視圖,使用isEmpty()
判斷是否包含任何鍵值對。
所有的類型狀態(tài)還有一個 clear()
方法,用于清除當前 key 下的狀態(tài)數(shù)據(jù),也就是當前輸入元素的 key。
需要注意的是:
- 這些狀態(tài)對象僅用于與狀態(tài)交互。狀態(tài)本身不一定存儲在內(nèi)存中,還可能在磁盤或其他位置。
- 從狀態(tài)中獲取的值取決于輸入元素所代表的 Key,在不同 key 上調(diào)用同一個接口,可能得到不同的值。
在使用中,必須創(chuàng)建一個 StateDescriptor
,才能獲得對應(yīng)的狀態(tài)句柄。在狀態(tài)句柄中,記錄了狀態(tài)名稱、狀態(tài)所持有值的類型以及用戶所指定的函數(shù)。根據(jù)不同的狀態(tài)類型,可以創(chuàng)建 ValueStateDescriptor
、ListStateDescriptor
、AggregatingStateDescriptor
、ReducingStateDescriptor
或 MapStateDescriptor
。
狀態(tài)通過 RuntimeContext
進行訪問,因此只能在 rich functions
中使用。
樣例:計數(shù)窗口,這個 UDF 會計算每兩個相鄰的元素的平均值并發(fā)送到下游。
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { /** * The ValueState handle. The first field is the count, the second field a running sum. */ private transient ValueState<Tuple2<Long, Long>> sum; @Override public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception { // access the state value Tuple2<Long, Long> currentSum = sum.value(); // update the count currentSum.f0 += 1; // add the second field of the input value currentSum.f1 += input.f1; // update the state sum.update(currentSum); // if the count reaches 2, emit the average and clear the state if (currentSum.f0 >= 2) { out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0)); sum.clear(); } } @Override public void open(Configuration config) { ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>( "average", // the state name TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information Tuple2.of(0L, 0L)); // default value of the state, if nothing was set sum = getRuntimeContext().getState(descriptor); } } // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env) env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)) .keyBy(value -> value.f0) .flatMap(new CountWindowAverage()) .print(); // the printed output will be (1,4) and (1,5)
- 定義 UDF 的
ValueState
類型的私有屬性sum
,其值為一個元組,元組中第一個元素用于存儲計數(shù)結(jié)果,第二個元素存儲求和結(jié)果。- 在
open()
方法中,定義了狀態(tài)句柄ValueStateDescriptor
,定義了狀態(tài)名稱、狀態(tài)類型和狀態(tài)的初始值,并將其狀態(tài)存儲到屬性sum
中。- 使用
sum.value()
獲取當前狀態(tài)的值- 使用
sum.updaste()
更新當前狀態(tài)的值- 使用
sum.clear()
清空當前狀態(tài)的值
狀態(tài)有效期(TTL)
任何類型的 Keyed State 都可以設(shè)置有效期(TTL)。如果配置了 TTL 且狀態(tài)已過期,則會盡最大可能清除對應(yīng)的值。
任何狀態(tài)類型都支持單元素的 TTL。這意味著列表元素和映射元素將單獨計算到期時間。
在使用 TTL 前,需要先構(gòu)建 StateTtlConfig
配置對象,然后把配置傳遞到 State Descriptor 中啟用 TTL 功能。
TTL 配置的選項
-
數(shù)據(jù)的有效期:
newBuilder()
的第一個參數(shù),必選 -
更新策略:
setUpdateType()
的第一個參數(shù),可選,默認為onCreateAndWrite
-
StateTtlConfig.UpdateType.onCreateAndWrite
:僅在創(chuàng)建和寫入時更新 -
StateTtlConfig.UpdateType.onReadAndWrite
:在讀取時也更新
-
-
數(shù)據(jù)在未被清理時的可見性配置:
setStateVisibility()
的第一個參數(shù),可選,默認為NeverReturnedExpired
-
StateTtlConfig.StateVisibility.NeverReturnExpired
- 不返回過期數(shù)據(jù) -
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp
- 會返回過期但未清理的數(shù)據(jù)
-
-
關(guān)門后臺清理:
disableCleanupInBackground()
,可選,添加則關(guān)閉過期數(shù)據(jù)的后臺清理 -
開啟全量快照時清理:
cleanupFullSnapshot()
,可選,添加則開啟在全來那個快照時進行清理 -
開啟 Heap Backend 增量數(shù)據(jù)清理:
cleanupIncrementally()
,可選,添加則在訪問和處理時進行檢查過期數(shù)據(jù)并清理 -
開啟 RcoksDB Backend 壓縮時數(shù)據(jù)清理:
cleanupInRocksdbCompactFilter()
,可選,添加在開啟壓縮時數(shù)據(jù)清理
需要注意的是:因為在開啟 TTL 特性后,狀態(tài)上次的修改時間會和數(shù)據(jù)一起保存在 state backend 中,所以開啟這個特性會增加狀態(tài)數(shù)據(jù)的存儲。
TTL 的清理策略
默認情況下,過期數(shù)據(jù)會在讀取的時候被刪除,同時也會有后臺進程定期清理。
在實現(xiàn)上,HeapStateBackend
依賴增量數(shù)據(jù)清理,RocksDBStateBackend
利用壓縮過濾器進行后臺清理。
- 全量快照時進行清理:在全量快照時進行清理的策略,可以減少整體快照的大小。當前實現(xiàn)中不會清理本地狀態(tài),但從上次快照恢復(fù)時,不會恢復(fù)那些已經(jīng)刪除的過期數(shù)據(jù)。這種清理策略可以在任何時候通過
StateTtlConfig
啟動或者關(guān)閉。 - 增量數(shù)據(jù)清理:如果開啟增量式清理狀態(tài)數(shù)據(jù),在會狀態(tài)訪問和處理時進行清理。對于開啟了增量數(shù)據(jù)清理策略的狀態(tài),會在存儲后端保留一個所有狀態(tài)的惰性全局迭代器,每次出發(fā)增量清理時,從迭代器中選擇已經(jīng)過期的數(shù)據(jù)進行清理。該策略有兩個參數(shù),第一個表示每次清理時檢查狀態(tài)的條目數(shù),第二個參數(shù)表示是否在處理每條記錄時都觸發(fā)清理。Heap backend 默認會檢查 5 條狀態(tài),并且關(guān)閉在每條記錄時觸發(fā)清理。
- 壓縮時清理:RcoksDB 會周期性地對數(shù)據(jù)進行合并壓縮從而減少存儲空間,F(xiàn)link 提供的 RocksDB 壓縮過濾器會在壓縮時過濾掉已經(jīng)過期的數(shù)據(jù)。該策略有一個參數(shù),該參數(shù)表示每處理多少條數(shù)據(jù)進行一次清理。
Flink 的狀態(tài)清理策略與 Redis 的被動清理 + 主動清理有很多相似之處,詳見 Redis|過期 key 的刪除機制。
算子狀態(tài)(Operator State)
算子狀態(tài)是綁定到一個并行算子實例的狀態(tài)。例如,Kafka Connector 是 Flink 中就使用了算子狀態(tài),Kafka consumer 的每個并行實例維護了 topic partitions 和偏移量的 map 作為它的算子狀態(tài)。當并行度改變的時候,算子狀態(tài)支持將狀態(tài)重新分發(fā)給各并行算子實例。
算子狀態(tài)通常用于實現(xiàn) source / sink,以及一些沒有 key 而無法對 state 進行分區(qū)的場景。
廣播狀態(tài)(Broadcast State)
廣播狀態(tài)時一種特殊的算子狀態(tài),用于將狀態(tài)廣播到所有下游任務(wù)。通過廣播狀態(tài),可以保持所有子任務(wù)狀態(tài)相同。
廣播狀態(tài)與其他算子狀態(tài)的差異:
- 它具有 map 格式
- 僅在輸入為一個廣播數(shù)據(jù)流和一個非廣播數(shù)據(jù)流的算子中可用
- 可以擁有多個不同名稱的廣播狀態(tài)
使用算子狀態(tài)
通過實現(xiàn) CheckpointedFunction
接口來使用算子狀態(tài)。在 CheckpointedFunction
中提供了訪問 non-keyed state 的方法,需要實現(xiàn)如下兩個方法:
-
void snapshotState(FunctionSnapshotContext context) throws Exception
:在進行 checkpoint 時調(diào)用 -
void initializeState(FunctionInitializationContext context) throws Exception
:在 UDF 初始化時調(diào)用,這里的初始化包括第一次啟動時的初始化,以及從 checkpoint 恢復(fù)的初始化。
當前算子狀態(tài)會以 list 的形式存在,這些狀態(tài)彼此獨立,方便在改變并發(fā)后進行狀態(tài)的重新分派。有如下幾種重新分配的模式:
-
Even-split redistribution
:每個算子都保存一個列表形式的狀態(tài)集合,整個狀態(tài)由所有的列表拼接而成。當作業(yè)恢復(fù)或重新分配的時候,整個狀態(tài)會按照算子的并發(fā)度進行均勻分配。 -
Union redistribution
:每個算子保存一個列表形式的狀態(tài)集合,整個狀態(tài)由所有的列表拼接而成。當作業(yè)恢復(fù)或重新分配的時候,每個算子都將得到所有的狀態(tài)數(shù)據(jù)。如果狀態(tài)的數(shù)量很大時不要使用這個特性,可能導(dǎo)致內(nèi)存溢出的問題。
樣例:
SinkFunction
在CheckpointedFunction
中進行數(shù)據(jù)緩存,然后統(tǒng)一發(fā)送到下游。public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction { private final int threshold; private transient ListState<Tuple2<String, Integer>> checkpointedState; private List<Tuple2<String, Integer>> bufferedElements; public BufferingSink(int threshold) { this.threshold = threshold; this.bufferedElements = new ArrayList<>(); } @Override public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception { bufferedElements.add(value); if (bufferedElements.size() >= threshold) { for (Tuple2<String, Integer> element: bufferedElements) { // send it to the sink } bufferedElements.clear(); } } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { checkpointedState.clear(); for (Tuple2<String, Integer> element : bufferedElements) { checkpointedState.add(element); } } @Override public void initializeState(FunctionInitializationContext context) throws Exception { ListStateDescriptor<Tuple2<String, Integer>> descriptor = new ListStateDescriptor<>( "buffered-elements", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})); checkpointedState = context.getOperatorStateStore().getListState(descriptor); if (context.isRestored()) { for (Tuple2<String, Integer> element : checkpointedState.get()) { bufferedElements.add(element); } } }
initializeState()
:接受一個FunctionInitializationContext
參數(shù),并用來初始化 non-keyed state 的容器,這個容器是一個ListState
類型的checkpointedState
嗎,用于在 checkpoint 時保存 non-keyed state 對戲那個。與 keyed state 類似,在初始化時狀態(tài)句柄descriptor
時,也會包括狀態(tài)名稱、狀態(tài)類型等信息。如果是從 checkpoint 中恢復(fù)(即context.isRestored()
),則將checkpointedState
中的元素讀取并添加到bufferedElements
中。snapshotState()
:在快照時,清空checkpointedState
并將bufferedElements
中緩存的元素全部添加到checkpointedState
中。invoke()
:將傳入的數(shù)據(jù)添加到bufferedElements
中進行緩存;當緩存數(shù)量達到閾值后統(tǒng)一寫出并將緩存清空。
在調(diào)用 getOperatorStateStore()
后,調(diào)用不同的獲取狀態(tài)對象的接口,會使用不同的狀態(tài)分配算法。例如調(diào)用 getUnionListState(descriptor)
會使用 union redistribution 算法,而調(diào)用 getListState(descriptor)
則會使用 even-split redistribution 算法。文章來源:http://www.zghlxwxcb.cn/news/detail-778736.html
使用帶狀態(tài)的 Source Function
樣例:文章來源地址http://www.zghlxwxcb.cn/news/detail-778736.html
public static class CounterSource extends RichParallelSourceFunction<Long> implements CheckpointedFunction { /** current offset for exactly once semantics */ private Long offset = 0L; /** flag for job cancellation */ private volatile boolean isRunning = true; /** 存儲 state 的變量. */ private ListState<Long> state; @Override public void run(SourceContext<Long> ctx) { final Object lock = ctx.getCheckpointLock(); while (isRunning) { // output and state update are atomic synchronized (lock) { ctx.collect(offset); offset += 1; } } } @Override public void cancel() { isRunning = false; } @Override public void initializeState(FunctionInitializationContext context) throws Exception { state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>( "state", LongSerializer.INSTANCE)); // 從我們已保存的狀態(tài)中恢復(fù) offset 到內(nèi)存中,在進行任務(wù)恢復(fù)的時候也會調(diào)用此初始化狀態(tài)的方法 for (Long l : state.get()) { offset = l; } } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { state.clear(); state.add(offset); } }
run()
:為了保證更新狀態(tài)以及輸出的原子性(用于實現(xiàn) exactly-once 語義),需要在發(fā)送數(shù)據(jù)前獲取數(shù)據(jù)源的全局鎖。snapshotState()
:在快照時,我們存儲當前偏移量即可。initializeState()
:在啟動或恢復(fù)時,我們需要恢復(fù)偏移量。
到了這里,關(guān)于Flink|《Flink 官方文檔 - DataStream API - 狀態(tài)與容錯 - 使用狀態(tài)》學習筆記的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!