1.窗口聚合算子
在Flink中窗口聚合算子主要分類兩類
- 滾動(dòng)聚合算子(增量聚合)
- 全窗口聚合算子(全量聚合)
1.1 滾動(dòng)聚合算子
滾動(dòng)聚合算子一次只處理一條數(shù)據(jù),通過算子中的累加器對聚合結(jié)果進(jìn)行更新,當(dāng)窗口觸發(fā)時(shí)再從累加器中取結(jié)果數(shù)據(jù),一般使用算子如下:
- aggregate
- max
- maxBy
- min
- minBy
- reduce
- sum
這里以aggregate
算子作為示例
// ...
// 每10s統(tǒng)計(jì)一次每個(gè)用戶最近30s的行為條數(shù)
SingleOutputStreamOperator<Tuple2<String, Integer>> result = watermarked.keyBy(userEvent -> userEvent.getUId())
.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))) // 參數(shù)1:窗口長度 參數(shù)2:滑動(dòng)步長即計(jì)算頻率
.aggregate(new AggregateFunction<UserEvent2, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
// 這里給一個(gè)初始值
@Override
public Tuple2<String, Integer> createAccumulator() {
return Tuple2.of("", 0);
}
// 在累加器中統(tǒng)計(jì)每個(gè)用戶行為條數(shù)(來一條更新一次)
@Override
public Tuple2<String, Integer> add(UserEvent2 value, Tuple2<String, Integer> accumulator) {
Tuple2<String, Integer> result = Tuple2.of(value.getUId() + "-" + value.getName(), accumulator.f1 + 1);
return result;
}
// 將累加器中的更新結(jié)果給到getResult方法,輸出
@Override
public Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {
return accumulator;
}
// 這個(gè)方法在流式計(jì)算中可以不用實(shí)現(xiàn),在上下游數(shù)據(jù)進(jìn)行合并時(shí)需要用到,以spark為例,上有map和下游reduce的計(jì)算結(jié)果需要合并時(shí)需要實(shí)現(xiàn)這個(gè)方法
@Override
public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
Tuple2<String, Integer> merged = Tuple2.of(a.f0, a.f1 + b.f1);
return merged;
}
});
// ...
只展示部分代碼,冗余代碼已省略.
圖解如下:
1.2 全窗口聚合算子
全窗口聚合算子會(huì)將數(shù)據(jù)記錄在狀態(tài)容器中,當(dāng)窗口觸發(fā)時(shí)會(huì)將整個(gè)窗口中的數(shù)據(jù)交給聚合函數(shù),根據(jù)具體邏輯將這些數(shù)據(jù)進(jìn)行計(jì)算,常用算子如下:
- apply
- process
這里以apply
算子為例文章來源:http://www.zghlxwxcb.cn/news/detail-724206.html
// ...
// 每10s統(tǒng)計(jì)一次最近30s每個(gè)用戶行為發(fā)生事件最大兩條數(shù)據(jù)
SingleOutputStreamOperator<UserEvent2> userEventTimeTop2 = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
// 泛型1: 數(shù)據(jù)數(shù)據(jù)類型 泛型2: 輸出數(shù)據(jù)類型 泛型3: key類型 泛型4: 窗口類型
.apply(new WindowFunction<UserEvent2, UserEvent2, String, TimeWindow>() {
/**
*@Param s 本次傳入的key
*@Param window 本次傳入窗口的各種元信息
*@Param input 本次輸入的所有數(shù)據(jù)
*@Param out 輸出數(shù)據(jù)
**/
@Override
public void apply(String s, TimeWindow window, Iterable<UserEvent2> input, Collector<UserEvent2> out) throws Exception {
// 創(chuàng)建集合接收迭代器中的數(shù)據(jù)
ArrayList<UserEvent2> userEvent2List = new ArrayList<>();
// 遍歷迭代器,也就是輸入數(shù)據(jù)
for (UserEvent2 userEvent2 : input) {
// 將數(shù)據(jù)添加到集合中
userEvent2List.add(userEvent2);
}
// 將集合中的數(shù)據(jù)根據(jù)用戶行為發(fā)生事件進(jìn)行排序
Collections.sort(userEvent2List, new Comparator<UserEvent2>() {
@Override
public int compare(UserEvent2 o1, UserEvent2 o2) {
// 倒序排序
return Integer.parseInt(o2.getTime()) - Integer.parseInt(o1.getTime());
}
});
// 將每個(gè)用戶行為發(fā)生時(shí)間最大的兩條數(shù)據(jù)輸出
for (int i = 0; i < Math.min(userEvent2List.size(), 2); i++) {
out.collect(userEvent2List.get(i));
}
}
});
// ...
只展示部分代碼,冗余代碼已省略.
圖解如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-724206.html
到了這里,關(guān)于Flink之窗口聚合算子的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!