注:本文源碼為flink 1.18.0版本。
其他相關(guān)文章:
Flink window 源碼分析1:窗口整體執(zhí)行流程
Flink window 源碼分析2:Window 的主要組件
Flink window 源碼分析3:WindowOperator
Flink window 源碼分析4:WindowState
1 window 的重要組件
Window 本質(zhì)上就是借助狀態(tài)后端緩存著一定時間段內(nèi)的數(shù)據(jù),然后在達到某些條件時觸發(fā)對這些緩存數(shù)據(jù)的聚合計算,輸出外部系統(tǒng)。
其主要組件有:Window Assigners、Triggers、Evictors。這三個組件的詳細講解請看筆記:Flink window 源碼分析2:Window 的主要組件。
- Window Assigners
Window assigner 定義了 stream 中的元素如何被分發(fā)到各個窗口。
Time Window 會創(chuàng)建一個 EventTimeTrigger 用來制定窗口觸發(fā)時間。Count Window 和 GlobalWindow需要指定窗口觸發(fā)器。
可以通過繼承 WindowAssigner 抽象類實現(xiàn)自定義。 - Triggers
決定窗口是否觸發(fā)。
Trigger 接口中有些主要的方法:onElement、onEventTime、onProcessingTime。 - Evictors(可選擇是否指定)
在 trigger 觸發(fā)后、調(diào)用窗口函數(shù)之前或之后從窗口中刪除元素。
指定 evictors 可以避免預聚合(pre-aggregation),因為窗口內(nèi)所有元素必須在應用計算之前傳遞給 evictors。
Flink 不保證窗口內(nèi)元素的順序。這意味著雖然 evictors 可以從窗口的開頭移除元素,但這些元素不一定是先到的還是后到的。
2 window 的觸發(fā)過程
KeyedStream 調(diào)用 window 函數(shù)會生成 WindowStream。WindowedStream 可以調(diào)用 reduce、aggregate、apply、process 等函數(shù)。 以下是 window 函數(shù)使用示例。
source.keyBy((KeySelector<Tuple2<Long, String>, String>) value -> value.f2)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.apply(...);
window 函數(shù)源碼如下。返回了一個 WindowedStream。
@PublicEvolving
public <W extends Window> WindowedStream<T, KEY, W> window(
WindowAssigner<? super T, W> assigner) {
return new WindowedStream<>(this, assigner);
}
觀察 reduce、aggregate、apply、process 等處理函數(shù),會看到會進一步調(diào)用屬性 builder 的對應的 reduce、aggregate、apply、process 方法。以 reduce 源碼為例,可以看到倒數(shù)第2行 builder.reduce。
@Internal
public <R> SingleOutputStreamOperator<R> reduce(
ReduceFunction<T> reduceFunction,
ProcessWindowFunction<T, R, K, W> function,
TypeInformation<R> resultType) {
// clean the closures
function = input.getExecutionEnvironment().clean(function);
reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
final String opName = builder.generateOperatorName();
final String opDescription = builder.generateOperatorDescription(reduceFunction, function);
OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function);
return input.transform(opName, resultType, operator).setDescription(opDescription);
}
builder 是 WindowOperatorBuilder,在 WindowedStream 的構(gòu)造函數(shù)中有其初始化。builder 定義為:文章來源:http://www.zghlxwxcb.cn/news/detail-793899.html
private final WindowOperatorBuilder<T, K, W> builder;
進一步觀察 builder.reduce(),看到其最終返回是 WindowOperator。在函數(shù)返回時會判斷evictor是否為空,走不同的構(gòu)造 WindowOperator 的邏輯,如果 evictor 不為空就構(gòu)造 EvictingWindowOperator 對象,否則就構(gòu)造 WindowOperator 對象,其實 EvictingWindowOperator 是 WindowOperator 的一個子類,只是多了一個刪除數(shù)據(jù)的邏輯。WindowOperator 在創(chuàng)建時會傳入一個 StateDescriptor 用于創(chuàng)建狀態(tài),存儲中間結(jié)果或元素。
WindowOperator 的具體分析見:WindowOperator 的分析筆記。文章來源地址http://www.zghlxwxcb.cn/news/detail-793899.html
public <R> WindowOperator<K, T, ?, R, W> reduce(
ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
Preconditions.checkNotNull(reduceFunction, "ReduceFunction cannot be null");
Preconditions.checkNotNull(function, "WindowFunction cannot be null");
if (reduceFunction instanceof RichFunction) {
throw new UnsupportedOperationException(
"ReduceFunction of apply can not be a RichFunction.");
}
if (evictor != null) {
return buildEvictingWindowOperator(
new InternalIterableWindowFunction<>(
new ReduceApplyWindowFunction<>(reduceFunction, function)));
} else {
ReducingStateDescriptor<T> stateDesc =
new ReducingStateDescriptor<>(
WINDOW_STATE_NAME, reduceFunction, inputType.createSerializer(config));
return buildWindowOperator(
stateDesc, new InternalSingleValueWindowFunction<>(function));
}
}
到了這里,關(guān)于Flink window 源碼分析1:窗口整體執(zhí)行流程的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!