為了讓代碼有更強大的表現(xiàn)力和易用性,F(xiàn)link 本身提供了多層 API
在更底層,我們可以不定義任何具體的算子(比如 map,filter,或者 window),而只是提煉出一個統(tǒng)一的“處理”(process)操作——它是所有轉(zhuǎn)換算子的一個概括性的表達,可以自定義處理邏輯,所以這一層接口就被叫作“處理函數(shù)”(process function)。是整個DataStream API的基礎(chǔ)
7.1 基本處理函數(shù)
處理函數(shù)主要是定義數(shù)據(jù)流的轉(zhuǎn)換操作,F(xiàn)link提供的處理函數(shù)類接口ProcessFunction
7.1.1 處理函數(shù)的功能和使用
我們之前講過的MapFunction(一一處理,僅僅拿到數(shù)據(jù))、AggregateFunction(窗口聚合,除了數(shù)據(jù)還可以拿到當(dāng)前的狀態(tài))
另外,RichMapFunction提供了獲取上下文的方法——getRuntimeContext(),可以拿到狀態(tài),并行度、任務(wù)名等運行時信息
但上面這些無法拿到事件的時間戳或者當(dāng)前水位線。
而在很多應(yīng)用需求中,要求我們對時間有更精細的控制,需要能夠獲取水位線,甚至要“把控時間”、定義什么時候做什么事,這就不是基本的時間窗口能夠?qū)崿F(xiàn)的了,所以這個時候就要用到底層的API——處理函數(shù)ProcessFunction了
- 提供“定時服務(wù)”,可以通過它訪問事件流中的事件、時間戳、水位線,甚至可以注冊“定時事件”
- 繼承了AbstractRichFunction,擁有富函數(shù)所有特性
- 可以直接將數(shù)據(jù)輸出到側(cè)輸出流
使用:
? 直接基于 DataStream 調(diào)用.process()方法就可以了。方法需要傳入一個 ProcessFunction 作為參數(shù),用來定義處理邏輯。
stream.process(new MyProcessFunction())
7.1.2 ProcessFunction解析
public abstract class ProcessFunction<I, O> extends AbstractRichFunction{
public abstract void processElement(I var1, ProcessFunction<I, O>.Context var2, Collector<O> var3);
public void onTimer(long timestamp, ProcessFunction<I, O>.OnTimerContext ctx, Collector<O> out);
}
1.抽象方法.processElement()
- var1:正在處理的數(shù)據(jù)
- var2:上下文
- var3:“收集器”,用于返回數(shù)據(jù)
2.非抽象方法.onTimer()
- 用于定義定時觸發(fā)的操作
7.1.3 處理函數(shù)的分類
Flink 中的處理函數(shù)其實是一個大家族,ProcessFunction 只是其中一員
Flink 提供了 8 個不同的處理函數(shù):
(1) ProcessFunction
? 最基本的處理函數(shù),基于 DataStream 直接調(diào)用.process()時作為參數(shù)傳入
(2) KeyedProcessFunction
? 對流按鍵分區(qū)后的處理函數(shù),基于 KeyedStream 調(diào)用.process()時作為參數(shù)傳入。要想使用定時器,比如基于 KeyedStream
(3) ProcessWindowFunction
? 開窗之后的處理函數(shù),也是全窗口函數(shù)的代表。基于 WindowedStream 調(diào)用.process()時作為參數(shù)傳入
(4)ProcessAllWindowFunction
? 同樣是開窗之后的處理函數(shù),基于 AllWindowedStream 調(diào)用.process()時作為參數(shù)傳入
(5) CoProcessFunction
? 合并(connect)兩條流之后的處理函數(shù),基于 ConnectedStreams 調(diào)用.process()時作為參數(shù)傳入
(6) ProcessJoinFunction
? 間隔連接(interval join)兩條流之后的處理函數(shù),基于 IntervalJoined 調(diào)用.process()時作為參數(shù)傳入
(7)BroadcastProcessFunction
? 廣播連接流處理函數(shù),基于 BroadcastConnectedStream 調(diào)用.process()時作為參數(shù)傳入。這里的“廣播連接流”BroadcastConnectedStream,是一個未 keyBy 的普通 DataStream 與一個廣播流(BroadcastStream)做連接(conncet)之后的產(chǎn)物
(8) KeyedBroadcastProcessFunction
? 按鍵分區(qū)的廣播連接流處理函數(shù),同樣是基于 BroadcastConnectedStream 調(diào)用.process()時作為參數(shù)傳入。與 BroadcastProcessFunction 不同的是,這時的廣播連接流,是一個 KeyedStream 與廣播流(BroadcastStream)做連接之后的產(chǎn)物
7.2 按鍵分區(qū)處理函數(shù)
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction
只有在 KeyedStream 中才支持使用 TimerService 設(shè)置定時器的操作,所以一般情況下,我們都是先做了 keyBy 分區(qū)之后,再去定義處理操作;代碼中更加常見的處理函數(shù)是 KeyedProcessFunction,最基本的 ProcessFunction 反而出鏡率沒那么高。KeyedProcessFunction 可以說是處理函數(shù)中的“嫡系部隊”,可以認(rèn)為是 ProcessFunction 的一個擴展。
7.2.1 定時器(Timer)和定時服務(wù)(TimerService)
首先通過定時服務(wù)注冊一個定時器,ProcessFunction 的上下文(Context)中提供了.timerService()方法,可以直接返回一個 TimerService 對象。
TimerService 是 Flink 關(guān)于時間和定時器的基礎(chǔ)服務(wù)接口,包含以下六個方法:
// 獲取當(dāng)前的處理時間
long currentProcessingTime();
// 獲取當(dāng)前的水位線(事件時間)
long currentWatermark();
// 注冊處理時間定時器,當(dāng)處理時間超過 time 時觸發(fā)
void registerProcessingTimeTimer(long time);
// 注冊事件時間定時器,當(dāng)水位線超過 time 時觸發(fā)
void registerEventTimeTimer(long time);
// 刪除觸發(fā)時間為 time 的處理時間定時器
void deleteProcessingTimeTimer(long time);
// 刪除觸發(fā)時間為 time 的處理時間定時器
void deleteEventTimeTimer(long time);
7.2.2 KeyedProcessFunction的使用
與 ProcessFunction 的定義幾乎完全一樣,區(qū)別只是在于類型參數(shù)多了一個 K, 這是當(dāng)前按鍵分區(qū)的 key 的類型。在KeyedProcessFunction中可以注冊定時器,定義定時器觸發(fā)邏輯。
KeyedProcessFunction是個抽象類,繼承了AbstractRichFunction。
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction
主要有兩個核心的方法:
// 定義處理每個元素的邏輯
public abstract void processElement(I value, Context ctx, Collector<O> out)
// 定時器觸發(fā)時處理邏輯
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
從上面可以看到,參數(shù)里面都有Context(這里OnTimerContext繼承了Context),所以都可以通過
ctx.timerService().registerEventTimeTimer(long time);
去注冊定時器。
示例:
自定義數(shù)據(jù)源
public class CustomSource implements SourceFunction<Event> {
@Override
public void run(SourceContext<Event> ctx) throws Exception {
// 直接發(fā)出一條數(shù)據(jù)
ctx.collect(new Event("Mark", "./hhhh.com", 1000L));
// 中間停頓5秒
Thread.sleep(5000L);
// 發(fā)出10秒后的數(shù)據(jù)
ctx.collect(new Event("Mark", "/home", 11000L));
Thread.sleep(5000L);
// 發(fā)出 10 秒+1ms 后的數(shù)據(jù)
ctx.collect(new Event("Alice", "./cart", 11001L));
Thread.sleep(5000L);
}
@Override
public void cancel() {
}
}
創(chuàng)建一個KeyedProcessFunction實現(xiàn)類
public class MyKeyedProcessFunction extends KeyedProcessFunction<Boolean, Event, String> {
@Override
public void processElement(Event value, KeyedProcessFunction<Boolean, Event, String>.Context ctx, Collector<String> out) throws Exception {
out.collect("數(shù)據(jù)到達,時間戳為:" + ctx.timestamp());
out.collect("數(shù)據(jù)到達,水位線為:" + ctx.timerService().currentWatermark());
// 注冊一個 1 秒后的定時器
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 1000L);
out.collect(String.format("注冊定時器:%d%n-------分割線-------", ctx.timestamp() + 1000L));
}
@Override
public void onTimer(long timestamp, KeyedProcessFunction<Boolean, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
out.collect("定時器觸發(fā),觸發(fā)時間:" + timestamp);
}
}
主函數(shù)
public class EventTimeTimerTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.getTimestamp();
}
}));
stream.keyBy(data->true).process(new MyKeyedProcessFunction()).print();
env.execute();
}
}
輸出結(jié)果:
數(shù)據(jù)到達,時間戳為:1000
數(shù)據(jù)到達,水位線為:-9223372036854775808
注冊定時器:2000
-------分割線-------
數(shù)據(jù)到達,時間戳為:11000
數(shù)據(jù)到達,水位線為:999
注冊定時器:12000
-------分割線-------
定時器觸發(fā),觸發(fā)時間:2000
數(shù)據(jù)到達,時間戳為:11001
數(shù)據(jù)到達,水位線為:10999
注冊定時器:12001
-------分割線-------
定時器觸發(fā),觸發(fā)時間:12000
定時器觸發(fā),觸發(fā)時間:12001
輸出結(jié)果解釋:
當(dāng)?shù)谝粭l數(shù)據(jù) Event(“Mark”, “./hhhh.com”, 1000L) 過來,由于水位線生成的周期是默認(rèn)(200ms)一次,所以第一次數(shù)據(jù)過來時,水位線沒有更新,為默認(rèn)值Long.MIN_VALUE,此時注冊一個以事件時間為準(zhǔn)加1000ms的定時器。所以輸出就是:
數(shù)據(jù)到達,時間戳為:1000
數(shù)據(jù)到達,水位線為:-9223372036854775808
注冊定時器:2000
-------分割線-------過了200ms后,到了水位線生成時間,此時最大時間戳為1000,由于沒有設(shè)置水位線延遲,所以默認(rèn)減1ms。此時水位線為:1000-1=999.并未達到定時器觸發(fā)時間(2000)
過了5秒鐘第二條數(shù)據(jù) Event(“Mark”, “/home”, 11000L) 過來,輸出并注冊了一個12000的定時器:
數(shù)據(jù)到達,時間戳為:11000
數(shù)據(jù)到達,水位線為:999
注冊定時器:12000
-------分割線-------達到水位線生成時間后,更新為11000-1=10999,此時達到(注冊定時器:2000)觸發(fā)時間,所以輸出:
定時器觸發(fā),觸發(fā)時間:2000
過了5秒,數(shù)據(jù) Event(“Alice”, “./cart”, 11001L) 過來,輸出并注冊了一個12001的定時器
數(shù)據(jù)到達,時間戳為:11001
數(shù)據(jù)到達,水位線為:10999
注冊定時器:12001
-------分割線-------達到水位線生成時間后,更新為11001-1=11000
過了5秒,數(shù)據(jù)發(fā)送執(zhí)行完畢,第三條數(shù)據(jù)發(fā)出后再過 5 秒,沒有更多的數(shù)據(jù)生成了,整個程序運行結(jié)束將要退出,此時 Flink 會自動將水位線推進到長整型的最大值(Long.MAX_VALUE)。于是所有尚未觸發(fā)的定時器這時就統(tǒng)一觸發(fā)了,輸出
定時器觸發(fā),觸發(fā)時間:12000
定時器觸發(fā),觸發(fā)時間:12001
7.3 窗口處理函數(shù)
除了按鍵分區(qū)的處理,還有就是窗口數(shù)據(jù)的處理,常用的有:
- ProcessWindowFunction
- ProcessAllWindowFunction
7.3.1 窗口處理函數(shù)的使用
進行窗口計算,我們可以直接調(diào)用現(xiàn)成的簡單聚合方法(sum/max/min),也可以通過調(diào)用.reduce()或.aggregate()來自定義一般的增量聚合函數(shù)(ReduceFunction/AggregateFucntion);而對于更加復(fù)雜、需要窗口信息和額外狀態(tài)的一些場景,我們還可以直接使用全窗口函數(shù)、把數(shù)據(jù)全部收集保存在窗口內(nèi),等到觸發(fā)窗口計算時再統(tǒng)一處理。窗口處理函數(shù)就是一種典型的全窗口函數(shù)。
窗口處理函數(shù) ProcessWindowFunction 的使用與其他窗口函數(shù)類似,也是基于WindowedStream 直接調(diào)用方法就可以,只不過這時調(diào)用的是.process()
stream.keyBy( t -> t.f0 )
.window( TumblingEventTimeWindows.of(Time.seconds(10)) )
.process(new MyProcessWindowFunction())
7.3.2 ProcessWindowFunction 解析
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
extends AbstractRichFunction
/*
IN: 輸入數(shù)據(jù)類型
OUT:輸出數(shù)據(jù)類型
KEY:數(shù)據(jù)中key的類型
W:窗口類型
*/
方法:
// 窗口數(shù)據(jù)的處理
public abstract void process(
KEY key, Context context, Iterable<IN> elements, Collector<OUT> out);
/*
key: 鍵
context: 上下文
elements: 窗口收集到用來計算的所有數(shù)據(jù),這是一個可迭代的集合類型
out: 發(fā)送輸出結(jié)果的收集器
*/
// 這主要是方便我們進行窗口的清理工作。如果我們自定義了窗口狀態(tài),那么必須在.clear()方法中進行顯式地清除,避免內(nèi)存溢出
public void clear(Context context);
還定義了一個抽象類
public abstract class Context implements java.io.Serializable
// 我們之前可以看到,processFunction用的都是Context,但是這里ProcessWindowFunction 自己定義了一個Context,他是沒有定時器的。為什么呢?因為本身窗口操作已經(jīng)起到了一個觸發(fā)計算的時間點,一般情況下是沒有必要去做定時操作的。如果非要這么做,可以使用窗口觸發(fā)器Trigger,里面有一個TriggerContext
ProcessAllWindowFunction的用法相似
stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) )
.process(new MyProcessAllWindowFunction())
7.4 應(yīng)用案例——TopN
窗口的計算處理,在實際應(yīng)用中非常常見。對于一些比較復(fù)雜的需求,如果增量聚合函數(shù)無法滿足,我們就需要考慮使用窗口處理函數(shù)這樣的“大招”了。
網(wǎng)站中一個非常經(jīng)典的例子,就是實時統(tǒng)計一段時間內(nèi)的熱門 url。例如,需要統(tǒng)計最近
10 秒鐘內(nèi)最熱門的兩個 url 鏈接,并且每 5 秒鐘更新一次。我們知道,這可以用一個滑動窗口來實現(xiàn),而“熱門度”一般可以直接用訪問量來表示。于是就需要開滑動窗口收集 url 的訪問數(shù)據(jù),按照不同的 url 進行統(tǒng)計,而后匯總排序并最終輸出前兩名。這其實就是著名的“Top N” 問題。
很顯然,簡單的增量聚合可以得到 url 鏈接的訪問量,但是后續(xù)的排序輸出 Top N 就很難實現(xiàn)了。所以接下來我們用窗口處理函數(shù)進行實現(xiàn)。
7.4.1 使用 ProcessAllWindowFunction
一種最簡單的想法是,我們干脆不區(qū)分 url 鏈接,而是將所有訪問數(shù)據(jù)都收集起來,統(tǒng)一進行統(tǒng)計計算。所以可以不做 keyBy,直接基于 DataStream 開窗,然后使用全窗口函數(shù)ProcessAllWindowFunction 來進行處理。
在窗口中可以用一個 HashMap 來保存每個 url 的訪問次數(shù),只要遍歷窗口中的所有數(shù)據(jù), 自然就能得到所有 url 的熱門度。最后把 HashMap 轉(zhuǎn)成一個列表 ArrayList,然后進行排序、取出前兩名輸出就可以了
public class ProcessAllWindowTopN {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.getTimestamp();
}
}));
SingleOutputStreamOperator<String> urlStream = stream.map(new MapFunction<Event, String>() {
@Override
public String map(Event event) throws Exception {
return event.getUrl();
}
});
SingleOutputStreamOperator<String> result = urlStream.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
@Override
public void process(ProcessAllWindowFunction<String, String, TimeWindow>.Context context,
Iterable<String> elements, Collector<String> out) throws Exception {
HashMap<String, Long> urlCountMap = new HashMap<>(10);
for (String url : elements) {
if (urlCountMap.containsKey(url)) {
long count = urlCountMap.get(url);
urlCountMap.put(url, count + 1);
} else {
urlCountMap.put(url, 1L);
}
}
// 轉(zhuǎn)存為ArrayList
ArrayList<Tuple2<String, Long>> mapList = new ArrayList<Tuple2<String, Long>>();
for (String key : urlCountMap.keySet()) {
mapList.add(Tuple2.of(key, urlCountMap.get(key)));
}
mapList.sort(new Comparator<Tuple2<String, Long>>() {
@Override
public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
return o2.f1.intValue() - o1.f1.intValue();
}
});
// 取排序后的前兩名,構(gòu)建輸出結(jié)果
StringBuilder result = new StringBuilder();
result.append("========================================\n");
for (int i = 0; i < 2; i++) {
Tuple2<String, Long> temp = mapList.get(i);
String info = "瀏覽量 No." + (i + 1) +
" url:" + temp.f0 +
" 瀏覽量:" + temp.f1 +
" 窗口結(jié)束時間:" + new Timestamp(context.window().getEnd()) + "\n";
result.append(info);
}
result.append("========================================\n");
out.collect(result.toString());
}
});
result.print();
env.execute();
}
}
7.4.2 使用KeyedProcessFunction
直接將所有數(shù)據(jù)放在一個分區(qū)上進行開窗操作。這相當(dāng)于將并行度強行設(shè)置為 1,在實際應(yīng)用中是要盡量避免的。
思路:
(1)讀取數(shù)據(jù)源
(2)提取時間戳并生成水位線
(3)按照url進行keyBy分區(qū)
(4)開長度為10s步長為5的滑動窗口
(5)使用增量聚合函數(shù) AggregateFunction,并結(jié)合全窗口函數(shù) WindowFunction 進行窗口聚合,得到每個 url、在每個統(tǒng)計窗口內(nèi)的瀏覽量,包裝成 UrlViewCount
(6)按照窗口進行 keyBy 分區(qū)操作
(7)對同一窗口的統(tǒng)計結(jié)果數(shù)據(jù),使用 KeyedProcessFunction 進行收集并排序輸出
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-7iv6fbOt-1693232836517)(第七章處理函數(shù).assets/image-20230406003916609.png)]
// 自定義增量聚合
public class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Event event, Long accumulator) {
return accumulator + 1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long aLong, Long acc1) {
return null;
}
}
便于按窗口統(tǒng)計
public class UrlViewCount {
public String url;
public Long count;
public Long windowStart;
public Long windowEnd;
public UrlViewCount() {
}
public UrlViewCount(String url, Long count, Long windowStart, Long windowEnd) {
this.url = url;
this.count = count;
this.windowStart = windowStart;
this.windowEnd = windowEnd;
}
@Override
public String toString() {
return "UrlViewCount{" +
"url='" + url + '\'' +
", count=" + count +
", windowStart=" + windowStart +
", windowEnd=" + windowEnd +
'}';
}
}
窗口聚合函數(shù)
public class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {
@Override
public void process(String url, ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>.Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
System.out.println(url);
System.out.println(elements);
out.collect(new UrlViewCount(url, elements.iterator().next(), start, end));
}
}
排序取TopN
public class TopN extends KeyedProcessFunction<Long, UrlViewCount, String> {
private Integer n;
// 定義一個列表狀態(tài)
private ListState<UrlViewCount> urlViewCountListState;
public TopN(Integer n) {
this.n = n;
}
@Override
public void open(Configuration parameters) throws Exception {
// 從環(huán)境中獲取列表狀態(tài)句柄
urlViewCountListState = getRuntimeContext().getListState(
new ListStateDescriptor<UrlViewCount>("url-view-count-list", Types.POJO(UrlViewCount.class)));
}
@Override
public void processElement(UrlViewCount value, KeyedProcessFunction<Long, UrlViewCount, String>.Context ctx, Collector<String> out) throws Exception {
// 將 count 數(shù)據(jù)添加到列表狀態(tài)中,保存起來
urlViewCountListState.add(value);
// 注冊 window end + 1ms 后的定時器,等待所有數(shù)據(jù)到齊開始排序
ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);
}
@Override
public void onTimer(long timestamp, KeyedProcessFunction<Long, UrlViewCount, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
// 將數(shù)據(jù)從列表狀態(tài)變量中取出,放入 ArrayList,方便排序
ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList<>();
for (UrlViewCount urlViewCount : urlViewCountListState.get()) {
urlViewCountArrayList.add(urlViewCount);
}
// 清空狀態(tài),釋放資源
urlViewCountListState.clear();
// 排 序
urlViewCountArrayList.sort(new Comparator<UrlViewCount>(){
@Override
public int compare(UrlViewCount o1, UrlViewCount o2) {
return o2.count.intValue() - o1.count.intValue();
}
});
// 取前兩名,構(gòu)建輸出結(jié)果
StringBuilder result = new StringBuilder(); result.append("========================================\n");
result.append("窗口結(jié)束時間:" + new Timestamp(timestamp - 1) + "\n");
for (int i = 0; i < this.n; i++) {
UrlViewCount UrlViewCount = urlViewCountArrayList.get(i); String info = "No." + (i + 1) + " "
+ "url:" + UrlViewCount.url + " "
+ "瀏覽量:" + UrlViewCount.count + "\n"; result.append(info);
} result.append("========================================\n");
out.collect(result.toString());
}
}
主方法:
public class KeyedProcessTopN {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
// 從自定義數(shù)據(jù)源讀取數(shù)據(jù)
SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.getTimestamp();
}
}));
SingleOutputStreamOperator<UrlViewCount> urlCountStream = eventStream.keyBy(Event::getUrl)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(new UrlViewCountAgg(), new UrlViewCountResult());
SingleOutputStreamOperator<String> result = urlCountStream.keyBy(data -> data.windowEnd).process(new TopN(2));
result.print();
env.execute();
}
}
其實這里面是可以優(yōu)化的。每次其實是把所有url——count都會發(fā)過來,保存到一個列表狀態(tài)中。雖然只是一個窗口的,但是如果數(shù)據(jù)量大的話還是可以優(yōu)化的。
7.5 側(cè)輸出流
處理函數(shù)還有另外一個特有功能,就是將自定義的數(shù)據(jù)放入“側(cè)輸出流”(side output)輸出。這個概念我們并不陌生,之前在講到窗口處理遲到數(shù)據(jù)時,最后一招就是輸出到側(cè)輸出流。而這種處理方式的本質(zhì),其實就是處理函數(shù)的側(cè)輸出流功能。
我們之前講到的絕大多數(shù)轉(zhuǎn)換算子,輸出的都是單一流,流里的數(shù)據(jù)類型只能有一種。而側(cè)輸出流可以認(rèn)為是“主流”上分叉出的“支流”,所以可以由一條流產(chǎn)生出多條流,而且這些流中的數(shù)據(jù)類型還可以不一樣。利用這個功能可以很容易地實現(xiàn)“分流”操作。
具體應(yīng)用時,只要在處理函數(shù)的.processElement()或者.onTimer()方法中,調(diào)用上下文的.output()方法就可以了
DataStream<Integer> stream = env.addSource(...);
SingleOutputStreamOperator<Long> process = eventStream.process(new ProcessFunction<Integer, Long>() {
@Override
public void processElement(Integer value, ProcessFunction<Integer, Long>.Context ctx, Collector<Long> out) throws Exception {
out.collect(Long.valueOf(value));
ctx.output(outputTag, "side-output: " + value);
}
});
這里 output()方法需要傳入兩個參數(shù),第一個是一個“輸出標(biāo)簽”O(jiān)utputTag,用來標(biāo)識側(cè)輸出流,一般會在外部統(tǒng)一聲明;第二個就是要輸出的數(shù)據(jù)。
我們可以在外部先將 OutputTag 聲明出來文章來源:http://www.zghlxwxcb.cn/news/detail-684128.html
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
如果想要獲取這個側(cè)輸出流,可以基于處理之后的 DataStream 直接調(diào)用.getSideOutput() 方法,傳入對應(yīng)的 OutputTag,這個方式與窗口 API 中獲取側(cè)輸出流是完全一樣的。文章來源地址http://www.zghlxwxcb.cn/news/detail-684128.html
DataStream<String> stringStream = longStream.getSideOutput(outputTag);
到了這里,關(guān)于《Flink學(xué)習(xí)筆記》——第七章 處理函數(shù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!