目錄
keyed state
Keyed DataStream
使用 Keyed State
實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的計(jì)數(shù)窗口
狀態(tài)有效期 (TTL)
過(guò)期數(shù)據(jù)的清理
全量快照時(shí)進(jìn)行清理
增量數(shù)據(jù)清理
在 RocksDB 壓縮時(shí)清理
Operator State算子狀態(tài)
Broadcast State廣播狀態(tài)
keyed state
Keyed DataStream
使用 keyed state,首先需要為DataStream指定 key(主鍵)。這個(gè)主鍵用于狀態(tài)分區(qū)(也會(huì)給數(shù)據(jù)流中的記錄本身分區(qū))。
使用 DataStream 中 Java/Scala API 的 keyBy(KeySelector) 或者是 Python API 的 key_by(KeySelector) 來(lái)指定 key。它將生成 KeyedStream,接下來(lái)允許使用 keyed state 操作。
Keyselector函數(shù)接收單條記錄作為輸入,返回這條記錄的 key。該 key 可以為任何類型,但是它的計(jì)算產(chǎn)生方式必須是具備確定性的。
Flink的數(shù)據(jù)模型不基于key-value對(duì),因此實(shí)際上將數(shù)據(jù)集在物理上封裝成 key和 value是沒(méi)有必要的。Key是“虛擬”的。它們定義為基于實(shí)際數(shù)據(jù)的函數(shù),用以操縱分組算子。
使用 Keyed State
keyed state接口提供不同類型狀態(tài)的訪問(wèn)接口,這些狀態(tài)都作用于當(dāng)前輸入數(shù)據(jù)的key。
換句話說(shuō),這些狀態(tài)僅可在KeyedStream上使用,在Java/Scala API上可以通過(guò)stream.keyBy(...)得到 KeyedStream,在Python API上可以通過(guò) stream.key_by(...) 得到 KeyedStream。
所有支持的狀態(tài)類型如下所示:
ValueState<T>: 保存一個(gè)可以更新和檢索的值
Liststate<T>: 保存一個(gè)元素的列表??梢酝@個(gè)列表中追加數(shù)據(jù),并在當(dāng)前的列表上進(jìn)行檢索。
ReducingState<T>: 保存一個(gè)單值,表示添加到狀態(tài)的所有值的聚合。但使用 add(T) 增加元素,會(huì)使用提供的 ReduceFunction 進(jìn)行聚合。
AggregatingState<IN, OUT>: 保留一個(gè)單值,表示添加到狀態(tài)的所有值的聚合。使用 add(IN) 添加的元素會(huì)用指定的 AggregateFunction 進(jìn)行聚合。
MapState<UK, UV>: 維護(hù)了一個(gè)映射列表。 你可以添加鍵值對(duì)到狀態(tài)中,也可以獲得反映當(dāng)前所有映射的迭代器。
所有類型的狀態(tài)還有一個(gè)clear() 方法,清除當(dāng)前 key 下的狀態(tài)數(shù)據(jù),也就是當(dāng)前輸入元素的 key。
實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的計(jì)數(shù)窗口
我們把元組的第一個(gè)元素當(dāng)作 key。 該函數(shù)將出現(xiàn)的次數(shù)以及總和存儲(chǔ)在 “ValueState” 中。
一旦出現(xiàn)次數(shù)達(dá)到 2,則將平均值發(fā)送到下游,并清除狀態(tài)重新開始。 請(qǐng)注意,我們會(huì)為每個(gè)不同的 key(元組中第一個(gè)元素)保存一個(gè)單獨(dú)的值。
必須創(chuàng)建一個(gè) StateDescriptor,才能得到對(duì)應(yīng)的狀態(tài)句柄。 這保存了狀態(tài)名稱,狀態(tài)所持有值的類型,并且可能包含用戶指定的函數(shù),例如ReduceFunction。
根據(jù)不同的狀態(tài)類型,可以創(chuàng)建ValueStateDescriptor,ListstateDescriptor, AggregatingStateDescriptor, ReducingStateDescriptor 或MapStateDescriptor。
狀態(tài)通過(guò) RuntimeContext 進(jìn)行訪問(wèn),因此只能在 rich functions 中使用。
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, FlatMapFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor
class CountWindowAverage(FlatMapFunction):
??? def __init__(self):
??????? self.sum = None
??? def open(self, runtime_context: RuntimeContext):
??????? descriptor = ValueStateDescriptor(
??????????? "average",? # the state name
??????????? Types.PICKLED_BYTE_ARRAY()? # type information
??????? )
??????? self.sum = runtime_context.get_state(descriptor)
??? def flat_map(self, value):
??????? # access the state value
??????? current_sum = self.sum.value()
??????? if current_sum is None:
??????????? current_sum = (0, 0)
??????? # update the count
??????? current_sum = (current_sum[0] + 1, current_sum[1] + value[1])
??????? # update the state
??????? self.sum.update(current_sum)
??????? # if the count reaches 2, emit the average and clear the state
??????? if current_sum[0] >= 2:
??????????? self.sum.clear()
??????????? yield value[0], int(current_sum[1] / current_sum[0])
env = StreamExecutionEnvironment.get_execution_environment()
env.from_collection([(1, 3), (1, 5), (1, 7), (1, 4), (1, 2)]) \
??? .key_by(lambda row: row[0]) \
??? .flat_map(CountWindowAverage()) \
??? .print()
env.execute()
# the printed output will be (1,4) and (1,5)
狀態(tài)有效期 (TTL)
任何類型的keyed state都可以有有效期(TTL)。如果配置了TTL且狀態(tài)值已過(guò)期,則會(huì)盡最大可能清除對(duì)應(yīng)的值。所有狀態(tài)類型都支持單元素的TTL。 這意味著列表元素和映射元素將獨(dú)立到期。
在使用狀態(tài) TTL 前,需要先構(gòu)建一個(gè)StateTtlConfig 配置對(duì)象。 然后把配置傳遞到state descriptor中啟用TTL功能。
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig
ttl_config = StateTtlConfig \
? .new_builder(Time.seconds(1)) \
? .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
? .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
? .build()
state_descriptor = ValueStateDescriptor("text state", Types.STRING())
state_descriptor.enable_time_to_live(ttl_config)
TTL配置有以下幾個(gè)選項(xiàng):
newBuilder 的第一個(gè)參數(shù)表示數(shù)據(jù)的有效期,是必選項(xiàng)。
TTL 的更新策略(默認(rèn)是 OnCreateAndWrite):
StateTtlConfig.UpdateType.OnCreateAndWrite - 僅在創(chuàng)建和寫入時(shí)更新
StateTtlConfig.UpdateType.OnReadAndWrite - 讀取時(shí)也更新
數(shù)據(jù)在過(guò)期但還未被清理時(shí)的可見性配置如下(默認(rèn)為 NeverReturnExpired):
??? StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回過(guò)期數(shù)據(jù)
??? (注意: 在PyFlink作業(yè)中,狀態(tài)的讀寫緩存都將失效,這將導(dǎo)致一部分的性能損失)
??? NeverReturnExpired 情況下,過(guò)期數(shù)據(jù)就像不存在一樣,不管是否被物理刪除。這對(duì)于不能訪問(wèn)過(guò)期數(shù)據(jù)的場(chǎng)景下非常有用,比如敏感數(shù)據(jù)。
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 會(huì)返回過(guò)期但未清理的數(shù)據(jù)
??? (注意: 在PyFlink作業(yè)中,狀態(tài)的讀緩存將會(huì)失效,這將導(dǎo)致一部分的性能損失)
??? ReturnExpiredIfNotCleanedUp 在數(shù)據(jù)被物理刪除前都會(huì)返回。
過(guò)期數(shù)據(jù)的清理
默認(rèn)情況下,過(guò)期數(shù)據(jù)會(huì)在讀取的時(shí)候被刪除,例如 ValueState#value,同時(shí)會(huì)有后臺(tái)線程定期清理(如果 StateBackend 支持的話)??梢酝ㄟ^(guò) StateTtlConfig 配置關(guān)閉后臺(tái)清理.
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig
ttl_config = StateTtlConfig \
? .new_builder(Time.seconds(1)) \
? .disable_cleanup_in_background() \
? .build()
可以配置更細(xì)粒度的后臺(tái)清理策略。當(dāng)前的實(shí)現(xiàn)中 HeapStateBackend 依賴增量數(shù)據(jù)清理,RocksDBStateBackend 利用壓縮過(guò)濾器進(jìn)行后臺(tái)清理。
全量快照時(shí)進(jìn)行清理
可以啟用全量快照時(shí)進(jìn)行清理的策略,這可以減少整個(gè)快照的大小。當(dāng)前實(shí)現(xiàn)中不會(huì)清理本地的狀態(tài),但從上次快照恢復(fù)時(shí),不會(huì)恢復(fù)那些已經(jīng)刪除的過(guò)期數(shù)據(jù)。
該策略可以通過(guò) StateTtlConfig 配置進(jìn)行配置,這種策略在 RocksDBStateBackend 的增量 checkpoint 模式下無(wú)效。
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig
ttl_config = StateTtlConfig \
? .new_builder(Time.seconds(1)) \
? .cleanup_full_snapshot() \
? .build()
這種清理方式可以在任何時(shí)候通過(guò) StateTtlConfig 啟用或者關(guān)閉,比如在從 savepoint 恢復(fù)時(shí)。
增量數(shù)據(jù)清理
現(xiàn)在僅 Heap state backend 支持增量清除機(jī)制。
增量式清理狀態(tài)數(shù)據(jù),在狀態(tài)訪問(wèn)或/和處理時(shí)進(jìn)行。如果沒(méi)有 state 訪問(wèn),也沒(méi)有處理數(shù)據(jù),則不會(huì)清理過(guò)期數(shù)據(jù)。增量清理會(huì)增加數(shù)據(jù)處理的耗時(shí)。
如果某個(gè)狀態(tài)開啟了該清理策略,則會(huì)在存儲(chǔ)后端保留一個(gè)所有狀態(tài)的惰性全局迭代器。
每次觸發(fā)增量清理時(shí),從迭代器中選擇已經(jīng)過(guò)期的數(shù)進(jìn)行清理。
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig
ttl_config = StateTtlConfig \
? .new_builder(Time.seconds(1)) \
? .cleanup_incrementally(10, True) \
? .build()
該策略有兩個(gè)參數(shù)。 第一個(gè)是每次清理時(shí)檢查狀態(tài)的條目數(shù),在每個(gè)狀態(tài)訪問(wèn)時(shí)觸發(fā)。
第二個(gè)參數(shù)表示是否在處理每條記錄時(shí)觸發(fā)清理。 Heap backend 默認(rèn)會(huì)檢查 5 條狀態(tài),并且關(guān)閉在每條記錄時(shí)觸發(fā)清理。
在 RocksDB 壓縮時(shí)清理
如果使用 RocksDB state backend,則會(huì)啟用 Flink 為 RocksDB 定制的壓縮過(guò)濾器。RocksDB 會(huì)周期性的對(duì)數(shù)據(jù)進(jìn)行合并壓縮從而減少存儲(chǔ)空間。 Flink 提供的 RocksDB 壓縮過(guò)濾器會(huì)在壓縮時(shí)過(guò)濾掉已經(jīng)過(guò)期的狀態(tài)數(shù)據(jù)。
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig
ttl_config = StateTtlConfig \
? .new_builder(Time.seconds(1)) \
? .cleanup_in_rocksdb_compact_filter(1000) \
? .build()
Flink 處理一定條數(shù)的狀態(tài)數(shù)據(jù)后,會(huì)使用當(dāng)前時(shí)間戳來(lái)檢測(cè) RocksDB 中的狀態(tài)是否已經(jīng)過(guò)期, 你可以通過(guò) StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法指定處理狀態(tài)的條數(shù)。
時(shí)間戳更新的越頻繁,狀態(tài)的清理越及時(shí),但由于壓縮會(huì)有調(diào)用 JNI 的開銷,因此會(huì)影響整體的壓縮性能。
RocksDB backend 的默認(rèn)后臺(tái)清理策略會(huì)每處理 1000 條數(shù)據(jù)進(jìn)行一次。
注意:
??? 壓縮時(shí)調(diào)用 TTL 過(guò)濾器會(huì)降低速度。TTL 過(guò)濾器需要解析上次訪問(wèn)的時(shí)間戳,并對(duì)每個(gè)將參與壓縮的狀態(tài)進(jìn)行是否過(guò)期檢查。 對(duì)于集合型狀態(tài)類型(比如 list 和 map),會(huì)對(duì)集合中每個(gè)元素進(jìn)行檢查。
??? 對(duì)于元素序列化后長(zhǎng)度不固定的列表狀態(tài),TTL 過(guò)濾器需要在每次 JNI 調(diào)用過(guò)程中,額外調(diào)用 Flink 的 java 序列化器, 從而確定下一個(gè)未過(guò)期數(shù)據(jù)的位置。
??? 對(duì)已有的作業(yè),這個(gè)清理方式可以在任何時(shí)候通過(guò) StateTtlConfig 啟用或禁用該特性,比如從 savepoint 重啟后。
Operator State算子狀態(tài)
Python DataStream API 仍無(wú)法支持算子狀態(tài)
算子狀態(tài)(或者非 keyed 狀態(tài))是綁定到一個(gè)并行算子實(shí)例的狀態(tài)。Kafka Connector 是 Flink 中使用算子狀態(tài)一個(gè)很具有啟發(fā)性的例子。
Kafka consumer 每個(gè)并行實(shí)例維護(hù)了 topic partitions 和偏移量的 map 作為它的算子狀態(tài)。
當(dāng)并行度改變的時(shí)候,算子狀態(tài)支持將狀態(tài)重新分發(fā)給各并行算子實(shí)例。處理重分發(fā)過(guò)程有多種不同的方案。
在典型的有狀態(tài) Flink 應(yīng)用中你無(wú)需使用算子狀態(tài)。它大都作為一種特殊類型的狀態(tài)使用。用于實(shí)現(xiàn) source/sink,以及無(wú)法對(duì) state 進(jìn)行分區(qū)而沒(méi)有主鍵的這類場(chǎng)景中。
Broadcast State廣播狀態(tài)
Python DataStream API 仍無(wú)法支持廣播狀態(tài)
廣播狀態(tài)是一種特殊的算子狀態(tài)。引入它的目的在于支持一個(gè)流中的元素需要廣播到所有下游任務(wù)的使用情形。在這些任務(wù)中廣播狀態(tài)用于保持所有子任務(wù)狀態(tài)相同。
該狀態(tài)接下來(lái)可在第二個(gè)處理記錄的數(shù)據(jù)流中訪問(wèn)??梢栽O(shè)想包含了一系列用于處理其他流中元素規(guī)則的低吞吐量數(shù)據(jù)流,這個(gè)例子自然而然地運(yùn)用了廣播狀態(tài)。
考慮到上述這類使用情形,廣播狀態(tài)和其他算子狀態(tài)的不同之處在于:
它具有 map 格式,
它僅在一些特殊的算子中可用。這些算子的輸入為一個(gè)廣播數(shù)據(jù)流和非廣播數(shù)據(jù)流,文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-686697.html
這類算子可以擁有不同命名的多個(gè)廣播狀態(tài) 。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-686697.html
到了這里,關(guān)于Flink流批一體計(jì)算(19):PyFlink DataStream API之State的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!