深入理解 Flink 系列文章已完結(jié),總共八篇文章,直達(dá)鏈接:
深入理解 Flink (一)Flink 架構(gòu)設(shè)計(jì)原理
深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容錯(cuò)深入分析
深入理解 Flink (三)Flink 內(nèi)核基礎(chǔ)設(shè)施源碼級原理詳解
深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析
深入理解 Flink (五)Flink Standalone 集群啟動源碼剖析
深入理解 Flink (六)Flink Job 提交和 Flink Graph 詳解
深入理解 Flink (七)Flink Slot 管理詳解
深入理解 Flink (八)Flink Task 部署初始化和啟動詳解
Flink Window 常見需求背景
需求描述
每隔 5 秒,計(jì)算最近 10 秒單詞出現(xiàn)的次數(shù) —— 滑動窗口
每隔 5 秒,計(jì)算最近 5 秒單詞出現(xiàn)的次數(shù) —— 滾動窗口
關(guān)于 Flink time 種類 TimeCharacteristic
- ProcessingTime
- IngestionTime
- EventTime
WindowAssigner 的子類
- SlidingProcessingTimeWindows
- SlidingEventTimeWindows
- TumblingEventTimeWindows
- TumblingProcessingTimeWindows
使用 EventTime + WaterMark 處理亂序數(shù)據(jù)
示意圖:
- 使用 onPeriodicEmit 方法發(fā)送 watermark,默認(rèn)每 200ms 發(fā)一次。
- 窗口起始時(shí)間默認(rèn)按各個(gè)時(shí)區(qū)的整點(diǎn)時(shí)間,支持自定義 offset。
Flink Watermark 機(jī)制定義
有序的流的 Watermarks
無序的流的 Watermarks
多并行度流的 Watermarks
深入理解 Flink Watermark
Flink Window 觸發(fā)的條件:
- watermark 時(shí)間 >= window_end_time
- 在 [window_start_time, window_end_time) 區(qū)間中有數(shù)據(jù)存在(注意是左閉右開的區(qū)間),而且是以 event time 來計(jì)算的
Flink 處理太過延遲數(shù)據(jù)
Flink 丟棄延遲太多的數(shù)據(jù)
企業(yè)生產(chǎn)中一般不用。
Flink 指定允許再次遲到的時(shí)間
治標(biāo)不治本,企業(yè)生產(chǎn)中一般不用。
Flink 收集遲到的數(shù)據(jù)單獨(dú)處理
企業(yè)生產(chǎn)中應(yīng)用較為廣泛。
Flink 多并行度 Watermark
一個(gè) window 可能會接受到多個(gè) waterMark,我們以最小的為準(zhǔn)。
Flink Window 概述
官網(wǎng)介紹
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
Flink Window 分類
Flink 的 window 分為兩種類型的 Window,分別是:Keyed Windows 和 Non-Keyed Windows,他們的使用方式不同:
// Keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
// Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
Window 的生命周期
- 當(dāng)屬于某個(gè)窗口的第一個(gè)元素到達(dá)的時(shí)候,就會創(chuàng)建一個(gè)窗口。
- 當(dāng)時(shí)間(event or processing time)超過 window 的結(jié)束時(shí)間戳加上用戶指定的允許延遲(Allowed Lateness)時(shí),窗口將被完全刪除。
- 每個(gè) Window 之上,都綁定有一個(gè) Trigger 或者一個(gè) Function(ProcessWindowFunction, ReduceFunction, or AggregateFunction)用來執(zhí)行窗口內(nèi)數(shù)據(jù)的計(jì)算。
- 可以給 Window 指定一個(gè) Evictor,它能夠在 after the trigger fires 以及 before and/or after the function is applied 從窗口中刪除元素。
Flink Window 類型
Flink 流批同一前后的 Window 分類:
tumblingwindows —— 滾動窗口
slidingwindows —— 滑動窗口
session windows —— 會話窗口
global windows —— 全局窗口
Flink Window 操作使用
高級玩法:自定義 Trigger、自定義 Evictor,讀者可自行搜索相關(guān)文章與代碼。
Flink Window 增量聚合
- reduce(ReduceFunction)
- aggregate(AggregateFunction)
- sum()
- min()
- max()
- sum()
Flink Window 全量聚合
- apply(WindowFunction)
- process(ProcessWindowFunction)
Flink Window Join
// 在 Flink 中對兩個(gè) DataStream 做 Join
// 1、指定兩張表
// 2、指定這兩張表的鏈接字段
stream.join(otherStream) // 兩個(gè)流進(jìn)行關(guān)聯(lián)
.where(<KeySelector>) // 選擇第一個(gè)流的key作為關(guān)聯(lián)字段
.equalTo(<KeySelector>) // 選擇第二個(gè)流的key作為關(guān)聯(lián)字段
.window(<WindowAssigner>) // 設(shè)置窗口的類型
.apply(<JoinFunction>) // 對結(jié)果做操作 process apply = foreach
Tumbling Window Join
Sliding Window Join
Session Window Join
文章來源:http://www.zghlxwxcb.cn/news/detail-819747.html
Interval Join
核心代碼示例:文章來源地址http://www.zghlxwxcb.cn/news/detail-819747.html
DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;
orangeStream
.keyBy(<KeySelector>)
.intervalJoin(greenStream.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){
@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(first + "," + second);
}
});
到了這里,關(guān)于深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!