国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

《Flink學(xué)習(xí)筆記》——第七章 處理函數(shù)

這篇具有很好參考價值的文章主要介紹了《Flink學(xué)習(xí)筆記》——第七章 處理函數(shù)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

為了讓代碼有更強大的表現(xiàn)力和易用性,F(xiàn)link 本身提供了多層 API

在更底層,我們可以不定義任何具體的算子(比如 map,filter,或者 window),而只是提煉出一個統(tǒng)一的“處理”(process)操作——它是所有轉(zhuǎn)換算子的一個概括性的表達,可以自定義處理邏輯,所以這一層接口就被叫作“處理函數(shù)”(process function)。是整個DataStream API的基礎(chǔ)

7.1 基本處理函數(shù)

處理函數(shù)主要是定義數(shù)據(jù)流的轉(zhuǎn)換操作,F(xiàn)link提供的處理函數(shù)類接口ProcessFunction

7.1.1 處理函數(shù)的功能和使用

我們之前講過的MapFunction(一一處理,僅僅拿到數(shù)據(jù))、AggregateFunction(窗口聚合,除了數(shù)據(jù)還可以拿到當(dāng)前的狀態(tài))

另外,RichMapFunction提供了獲取上下文的方法——getRuntimeContext(),可以拿到狀態(tài),并行度、任務(wù)名等運行時信息

但上面這些無法拿到事件的時間戳或者當(dāng)前水位線。

而在很多應(yīng)用需求中,要求我們對時間有更精細的控制,需要能夠獲取水位線,甚至要“把控時間”、定義什么時候做什么事,這就不是基本的時間窗口能夠?qū)崿F(xiàn)的了,所以這個時候就要用到底層的API——處理函數(shù)ProcessFunction了

  • 提供“定時服務(wù)”,可以通過它訪問事件流中的事件、時間戳、水位線,甚至可以注冊“定時事件”
  • 繼承了AbstractRichFunction,擁有富函數(shù)所有特性
  • 可以直接將數(shù)據(jù)輸出到側(cè)輸出流

使用:

? 直接基于 DataStream 調(diào)用.process()方法就可以了。方法需要傳入一個 ProcessFunction 作為參數(shù),用來定義處理邏輯。

stream.process(new MyProcessFunction())
7.1.2 ProcessFunction解析
public abstract class ProcessFunction<I, O> extends AbstractRichFunction{
    public abstract void processElement(I var1, ProcessFunction<I, O>.Context var2, Collector<O> var3);
    public void onTimer(long timestamp, ProcessFunction<I, O>.OnTimerContext ctx, Collector<O> out);
}

1.抽象方法.processElement()

  • var1:正在處理的數(shù)據(jù)
  • var2:上下文
  • var3:“收集器”,用于返回數(shù)據(jù)

2.非抽象方法.onTimer()

  • 用于定義定時觸發(fā)的操作
7.1.3 處理函數(shù)的分類

Flink 中的處理函數(shù)其實是一個大家族,ProcessFunction 只是其中一員

Flink 提供了 8 個不同的處理函數(shù):

(1) ProcessFunction

? 最基本的處理函數(shù),基于 DataStream 直接調(diào)用.process()時作為參數(shù)傳入

(2) KeyedProcessFunction

? 對流按鍵分區(qū)后的處理函數(shù),基于 KeyedStream 調(diào)用.process()時作為參數(shù)傳入。要想使用定時器,比如基于 KeyedStream

(3) ProcessWindowFunction

? 開窗之后的處理函數(shù),也是全窗口函數(shù)的代表。基于 WindowedStream 調(diào)用.process()時作為參數(shù)傳入

(4)ProcessAllWindowFunction

? 同樣是開窗之后的處理函數(shù),基于 AllWindowedStream 調(diào)用.process()時作為參數(shù)傳入

(5) CoProcessFunction

? 合并(connect)兩條流之后的處理函數(shù),基于 ConnectedStreams 調(diào)用.process()時作為參數(shù)傳入

(6) ProcessJoinFunction

? 間隔連接(interval join)兩條流之后的處理函數(shù),基于 IntervalJoined 調(diào)用.process()時作為參數(shù)傳入

(7)BroadcastProcessFunction

? 廣播連接流處理函數(shù),基于 BroadcastConnectedStream 調(diào)用.process()時作為參數(shù)傳入。這里的“廣播連接流”BroadcastConnectedStream,是一個未 keyBy 的普通 DataStream 與一個廣播流(BroadcastStream)做連接(conncet)之后的產(chǎn)物

(8) KeyedBroadcastProcessFunction

? 按鍵分區(qū)的廣播連接流處理函數(shù),同樣是基于 BroadcastConnectedStream 調(diào)用.process()時作為參數(shù)傳入。與 BroadcastProcessFunction 不同的是,這時的廣播連接流,是一個 KeyedStream 與廣播流(BroadcastStream)做連接之后的產(chǎn)物

7.2 按鍵分區(qū)處理函數(shù)

public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction

只有在 KeyedStream 中才支持使用 TimerService 設(shè)置定時器的操作,所以一般情況下,我們都是先做了 keyBy 分區(qū)之后,再去定義處理操作;代碼中更加常見的處理函數(shù)是 KeyedProcessFunction,最基本的 ProcessFunction 反而出鏡率沒那么高。KeyedProcessFunction 可以說是處理函數(shù)中的“嫡系部隊”,可以認(rèn)為是 ProcessFunction 的一個擴展。

7.2.1 定時器(Timer)和定時服務(wù)(TimerService)

首先通過定時服務(wù)注冊一個定時器,ProcessFunction 的上下文(Context)中提供了.timerService()方法,可以直接返回一個 TimerService 對象。

TimerService 是 Flink 關(guān)于時間和定時器的基礎(chǔ)服務(wù)接口,包含以下六個方法:

// 獲取當(dāng)前的處理時間
long currentProcessingTime();

// 獲取當(dāng)前的水位線(事件時間)
long currentWatermark();

// 注冊處理時間定時器,當(dāng)處理時間超過 time 時觸發(fā)
void registerProcessingTimeTimer(long time);

// 注冊事件時間定時器,當(dāng)水位線超過 time 時觸發(fā)
void registerEventTimeTimer(long time);

// 刪除觸發(fā)時間為 time 的處理時間定時器
void deleteProcessingTimeTimer(long time);

// 刪除觸發(fā)時間為 time 的處理時間定時器
void deleteEventTimeTimer(long time);
7.2.2 KeyedProcessFunction的使用

與 ProcessFunction 的定義幾乎完全一樣,區(qū)別只是在于類型參數(shù)多了一個 K, 這是當(dāng)前按鍵分區(qū)的 key 的類型。在KeyedProcessFunction中可以注冊定時器,定義定時器觸發(fā)邏輯。

KeyedProcessFunction是個抽象類,繼承了AbstractRichFunction。

public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction

主要有兩個核心的方法:

// 定義處理每個元素的邏輯
public abstract void processElement(I value, Context ctx, Collector<O> out)

// 定時器觸發(fā)時處理邏輯
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)

從上面可以看到,參數(shù)里面都有Context(這里OnTimerContext繼承了Context),所以都可以通過

ctx.timerService().registerEventTimeTimer(long time);

去注冊定時器。

示例:

自定義數(shù)據(jù)源

public class CustomSource implements SourceFunction<Event> {
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        // 直接發(fā)出一條數(shù)據(jù)
        ctx.collect(new Event("Mark", "./hhhh.com", 1000L));

        // 中間停頓5秒
        Thread.sleep(5000L);

        // 發(fā)出10秒后的數(shù)據(jù)
        ctx.collect(new Event("Mark", "/home", 11000L));
        Thread.sleep(5000L);

        // 發(fā)出 10 秒+1ms 后的數(shù)據(jù)
        ctx.collect(new Event("Alice", "./cart", 11001L));
        Thread.sleep(5000L);

    }

    @Override
    public void cancel() {

    }
}

創(chuàng)建一個KeyedProcessFunction實現(xiàn)類

public class MyKeyedProcessFunction extends KeyedProcessFunction<Boolean, Event, String> {
    @Override
    public void processElement(Event value, KeyedProcessFunction<Boolean, Event, String>.Context ctx, Collector<String> out) throws Exception {
        out.collect("數(shù)據(jù)到達,時間戳為:" + ctx.timestamp());
        out.collect("數(shù)據(jù)到達,水位線為:" + ctx.timerService().currentWatermark());
        // 注冊一個 1 秒后的定時器
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 1000L);
        out.collect(String.format("注冊定時器:%d%n-------分割線-------", ctx.timestamp() + 1000L));
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<Boolean, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
        out.collect("定時器觸發(fā),觸發(fā)時間:" + timestamp);
    }
}

主函數(shù)

public class EventTimeTimerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.getTimestamp();
                            }
                        }));

        stream.keyBy(data->true).process(new MyKeyedProcessFunction()).print();

        env.execute();

    }
}

輸出結(jié)果:

數(shù)據(jù)到達,時間戳為:1000
數(shù)據(jù)到達,水位線為:-9223372036854775808
注冊定時器:2000
-------分割線-------
數(shù)據(jù)到達,時間戳為:11000
數(shù)據(jù)到達,水位線為:999
注冊定時器:12000
-------分割線-------
定時器觸發(fā),觸發(fā)時間:2000
數(shù)據(jù)到達,時間戳為:11001
數(shù)據(jù)到達,水位線為:10999
注冊定時器:12001
-------分割線-------
定時器觸發(fā),觸發(fā)時間:12000
定時器觸發(fā),觸發(fā)時間:12001

輸出結(jié)果解釋:

當(dāng)?shù)谝粭l數(shù)據(jù) Event(“Mark”, “./hhhh.com”, 1000L) 過來,由于水位線生成的周期是默認(rèn)(200ms)一次,所以第一次數(shù)據(jù)過來時,水位線沒有更新,為默認(rèn)值Long.MIN_VALUE,此時注冊一個以事件時間為準(zhǔn)加1000ms的定時器。所以輸出就是:

數(shù)據(jù)到達,時間戳為:1000
數(shù)據(jù)到達,水位線為:-9223372036854775808
注冊定時器:2000
-------分割線-------

過了200ms后,到了水位線生成時間,此時最大時間戳為1000,由于沒有設(shè)置水位線延遲,所以默認(rèn)減1ms。此時水位線為:1000-1=999.并未達到定時器觸發(fā)時間(2000)

過了5秒鐘第二條數(shù)據(jù) Event(“Mark”, “/home”, 11000L) 過來,輸出并注冊了一個12000的定時器:

數(shù)據(jù)到達,時間戳為:11000
數(shù)據(jù)到達,水位線為:999
注冊定時器:12000
-------分割線-------

達到水位線生成時間后,更新為11000-1=10999,此時達到(注冊定時器:2000)觸發(fā)時間,所以輸出:

定時器觸發(fā),觸發(fā)時間:2000

過了5秒,數(shù)據(jù) Event(“Alice”, “./cart”, 11001L) 過來,輸出并注冊了一個12001的定時器

數(shù)據(jù)到達,時間戳為:11001
數(shù)據(jù)到達,水位線為:10999
注冊定時器:12001
-------分割線-------

達到水位線生成時間后,更新為11001-1=11000

過了5秒,數(shù)據(jù)發(fā)送執(zhí)行完畢,第三條數(shù)據(jù)發(fā)出后再過 5 秒,沒有更多的數(shù)據(jù)生成了,整個程序運行結(jié)束將要退出,此時 Flink 會自動將水位線推進到長整型的最大值(Long.MAX_VALUE)。于是所有尚未觸發(fā)的定時器這時就統(tǒng)一觸發(fā)了,輸出

定時器觸發(fā),觸發(fā)時間:12000
定時器觸發(fā),觸發(fā)時間:12001

7.3 窗口處理函數(shù)

除了按鍵分區(qū)的處理,還有就是窗口數(shù)據(jù)的處理,常用的有:

  • ProcessWindowFunction
  • ProcessAllWindowFunction
7.3.1 窗口處理函數(shù)的使用

進行窗口計算,我們可以直接調(diào)用現(xiàn)成的簡單聚合方法(sum/max/min),也可以通過調(diào)用.reduce()或.aggregate()來自定義一般的增量聚合函數(shù)(ReduceFunction/AggregateFucntion);而對于更加復(fù)雜、需要窗口信息和額外狀態(tài)的一些場景,我們還可以直接使用全窗口函數(shù)、把數(shù)據(jù)全部收集保存在窗口內(nèi),等到觸發(fā)窗口計算時再統(tǒng)一處理。窗口處理函數(shù)就是一種典型的全窗口函數(shù)。

窗口處理函數(shù) ProcessWindowFunction 的使用與其他窗口函數(shù)類似,也是基于WindowedStream 直接調(diào)用方法就可以,只不過這時調(diào)用的是.process()

stream.keyBy( t -> t.f0 )
    .window( TumblingEventTimeWindows.of(Time.seconds(10)) )
    .process(new MyProcessWindowFunction())
7.3.2 ProcessWindowFunction 解析
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
        extends AbstractRichFunction

/*
IN: 輸入數(shù)據(jù)類型
OUT:輸出數(shù)據(jù)類型
KEY:數(shù)據(jù)中key的類型
W:窗口類型
*/

方法:

// 窗口數(shù)據(jù)的處理
public abstract void process(
            KEY key, Context context, Iterable<IN> elements, Collector<OUT> out);
/*
key: 鍵
context: 上下文
elements: 窗口收集到用來計算的所有數(shù)據(jù),這是一個可迭代的集合類型
out: 發(fā)送輸出結(jié)果的收集器
*/


// 這主要是方便我們進行窗口的清理工作。如果我們自定義了窗口狀態(tài),那么必須在.clear()方法中進行顯式地清除,避免內(nèi)存溢出
public void clear(Context context);

還定義了一個抽象類

public abstract class Context implements java.io.Serializable
// 我們之前可以看到,processFunction用的都是Context,但是這里ProcessWindowFunction 自己定義了一個Context,他是沒有定時器的。為什么呢?因為本身窗口操作已經(jīng)起到了一個觸發(fā)計算的時間點,一般情況下是沒有必要去做定時操作的。如果非要這么做,可以使用窗口觸發(fā)器Trigger,里面有一個TriggerContext

ProcessAllWindowFunction的用法相似

stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) )
	.process(new MyProcessAllWindowFunction())

7.4 應(yīng)用案例——TopN

窗口的計算處理,在實際應(yīng)用中非常常見。對于一些比較復(fù)雜的需求,如果增量聚合函數(shù)無法滿足,我們就需要考慮使用窗口處理函數(shù)這樣的“大招”了。

網(wǎng)站中一個非常經(jīng)典的例子,就是實時統(tǒng)計一段時間內(nèi)的熱門 url。例如,需要統(tǒng)計最近

10 秒鐘內(nèi)最熱門的兩個 url 鏈接,并且每 5 秒鐘更新一次。我們知道,這可以用一個滑動窗口來實現(xiàn),而“熱門度”一般可以直接用訪問量來表示。于是就需要開滑動窗口收集 url 的訪問數(shù)據(jù),按照不同的 url 進行統(tǒng)計,而后匯總排序并最終輸出前兩名。這其實就是著名的“Top N” 問題。

很顯然,簡單的增量聚合可以得到 url 鏈接的訪問量,但是后續(xù)的排序輸出 Top N 就很難實現(xiàn)了。所以接下來我們用窗口處理函數(shù)進行實現(xiàn)。

7.4.1 使用 ProcessAllWindowFunction

一種最簡單的想法是,我們干脆不區(qū)分 url 鏈接,而是將所有訪問數(shù)據(jù)都收集起來,統(tǒng)一進行統(tǒng)計計算。所以可以不做 keyBy,直接基于 DataStream 開窗,然后使用全窗口函數(shù)ProcessAllWindowFunction 來進行處理。

在窗口中可以用一個 HashMap 來保存每個 url 的訪問次數(shù),只要遍歷窗口中的所有數(shù)據(jù), 自然就能得到所有 url 的熱門度。最后把 HashMap 轉(zhuǎn)成一個列表 ArrayList,然后進行排序、取出前兩名輸出就可以了

public class ProcessAllWindowTopN {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event event, long l) {
                return event.getTimestamp();
            }
        }));

        SingleOutputStreamOperator<String> urlStream = stream.map(new MapFunction<Event, String>() {
            @Override
            public String map(Event event) throws Exception {
                return event.getUrl();
            }
        });

        SingleOutputStreamOperator<String> result = urlStream.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
                    @Override
                    public void process(ProcessAllWindowFunction<String, String, TimeWindow>.Context context,
                                        Iterable<String> elements, Collector<String> out) throws Exception {
                        HashMap<String, Long> urlCountMap = new HashMap<>(10);
                        for (String url : elements) {
                            if (urlCountMap.containsKey(url)) {
                                long count = urlCountMap.get(url);
                                urlCountMap.put(url, count + 1);
                            } else {
                                urlCountMap.put(url, 1L);
                            }
                        }

                        // 轉(zhuǎn)存為ArrayList
                        ArrayList<Tuple2<String, Long>> mapList = new ArrayList<Tuple2<String, Long>>();
                        for (String key : urlCountMap.keySet()) {
                            mapList.add(Tuple2.of(key, urlCountMap.get(key)));
                        }
                        mapList.sort(new Comparator<Tuple2<String, Long>>() {
                            @Override
                            public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
                                return o2.f1.intValue() - o1.f1.intValue();
                            }
                        });

                        // 取排序后的前兩名,構(gòu)建輸出結(jié)果
                        StringBuilder result = new StringBuilder();
                        result.append("========================================\n");
                        for (int i = 0; i < 2; i++) {
                            Tuple2<String, Long> temp = mapList.get(i);
                            String info = "瀏覽量 No." + (i + 1) +
                                    "  url:" + temp.f0 +
                                    "  瀏覽量:" + temp.f1 +
                                    "  窗口結(jié)束時間:" + new Timestamp(context.window().getEnd()) + "\n";
                            result.append(info);
                        }
                        result.append("========================================\n");
                        out.collect(result.toString());
                    }
                });
        result.print();
        env.execute();
    }
}

7.4.2 使用KeyedProcessFunction

直接將所有數(shù)據(jù)放在一個分區(qū)上進行開窗操作。這相當(dāng)于將并行度強行設(shè)置為 1,在實際應(yīng)用中是要盡量避免的。

思路:

(1)讀取數(shù)據(jù)源

(2)提取時間戳并生成水位線

(3)按照url進行keyBy分區(qū)

(4)開長度為10s步長為5的滑動窗口

(5)使用增量聚合函數(shù) AggregateFunction,并結(jié)合全窗口函數(shù) WindowFunction 進行窗口聚合,得到每個 url、在每個統(tǒng)計窗口內(nèi)的瀏覽量,包裝成 UrlViewCount

(6)按照窗口進行 keyBy 分區(qū)操作

(7)對同一窗口的統(tǒng)計結(jié)果數(shù)據(jù),使用 KeyedProcessFunction 進行收集并排序輸出

[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-7iv6fbOt-1693232836517)(第七章處理函數(shù).assets/image-20230406003916609.png)]

// 自定義增量聚合
public class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(Event event, Long accumulator) {
        return accumulator + 1;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    @Override
    public Long merge(Long aLong, Long acc1) {
        return null;
    }
}

便于按窗口統(tǒng)計

public class UrlViewCount {
    public String url;
    public Long count;
    public Long windowStart;
    public Long windowEnd;

    public UrlViewCount() {
    }

    public UrlViewCount(String url, Long count, Long windowStart, Long windowEnd) {
        this.url = url;
        this.count = count;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }

    @Override
    public String toString() {
        return "UrlViewCount{" +
                "url='" + url + '\'' +
                ", count=" + count +
                ", windowStart=" + windowStart +
                ", windowEnd=" + windowEnd +
                '}';
    }
}

窗口聚合函數(shù)

public class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {
    @Override
    public void process(String url, ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>.Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
        long start = context.window().getStart();
        long end = context.window().getEnd();
        System.out.println(url);
        System.out.println(elements);
        out.collect(new UrlViewCount(url, elements.iterator().next(), start, end));
    }
}

排序取TopN

public class TopN extends KeyedProcessFunction<Long, UrlViewCount, String> {
    private Integer n;
    // 定義一個列表狀態(tài)
    private ListState<UrlViewCount> urlViewCountListState;

    public TopN(Integer n) {
        this.n = n;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // 從環(huán)境中獲取列表狀態(tài)句柄
        urlViewCountListState = getRuntimeContext().getListState(
                new ListStateDescriptor<UrlViewCount>("url-view-count-list", Types.POJO(UrlViewCount.class)));
    }

    @Override
    public void processElement(UrlViewCount value, KeyedProcessFunction<Long, UrlViewCount, String>.Context ctx, Collector<String> out) throws Exception {
        // 將 count 數(shù)據(jù)添加到列表狀態(tài)中,保存起來
        urlViewCountListState.add(value);
        // 注冊 window end + 1ms 后的定時器,等待所有數(shù)據(jù)到齊開始排序
        ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<Long, UrlViewCount, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
        // 將數(shù)據(jù)從列表狀態(tài)變量中取出,放入 ArrayList,方便排序
        ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList<>();
        for (UrlViewCount urlViewCount : urlViewCountListState.get()) {
            urlViewCountArrayList.add(urlViewCount);
        }

        // 清空狀態(tài),釋放資源
        urlViewCountListState.clear();

        // 排 序
        urlViewCountArrayList.sort(new Comparator<UrlViewCount>(){
            @Override
            public int compare(UrlViewCount o1, UrlViewCount o2) {
                return o2.count.intValue() - o1.count.intValue();
            }
        });

        // 取前兩名,構(gòu)建輸出結(jié)果
        StringBuilder result = new StringBuilder(); result.append("========================================\n");
        result.append("窗口結(jié)束時間:" + new Timestamp(timestamp - 1) + "\n");
        for (int i = 0; i < this.n; i++) {
            UrlViewCount UrlViewCount = urlViewCountArrayList.get(i); String info = "No." + (i + 1) + " "
                    + "url:" + UrlViewCount.url + " "
                    + "瀏覽量:" + UrlViewCount.count + "\n"; result.append(info);
        } result.append("========================================\n");
        out.collect(result.toString());

    }
}

主方法:

public class KeyedProcessTopN {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 從自定義數(shù)據(jù)源讀取數(shù)據(jù)
        SingleOutputStreamOperator<Event> eventStream	=	env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element,	long recordTimestamp) {
                                return element.getTimestamp();
                            }
                        }));

        SingleOutputStreamOperator<UrlViewCount> urlCountStream = eventStream.keyBy(Event::getUrl)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());

        SingleOutputStreamOperator<String> result = urlCountStream.keyBy(data -> data.windowEnd).process(new TopN(2));

        result.print();
        env.execute();

    }
}

其實這里面是可以優(yōu)化的。每次其實是把所有url——count都會發(fā)過來,保存到一個列表狀態(tài)中。雖然只是一個窗口的,但是如果數(shù)據(jù)量大的話還是可以優(yōu)化的。

7.5 側(cè)輸出流

處理函數(shù)還有另外一個特有功能,就是將自定義的數(shù)據(jù)放入“側(cè)輸出流”(side output)輸出。這個概念我們并不陌生,之前在講到窗口處理遲到數(shù)據(jù)時,最后一招就是輸出到側(cè)輸出流。而這種處理方式的本質(zhì),其實就是處理函數(shù)的側(cè)輸出流功能。

我們之前講到的絕大多數(shù)轉(zhuǎn)換算子,輸出的都是單一流,流里的數(shù)據(jù)類型只能有一種。而側(cè)輸出流可以認(rèn)為是“主流”上分叉出的“支流”,所以可以由一條流產(chǎn)生出多條流,而且這些流中的數(shù)據(jù)類型還可以不一樣。利用這個功能可以很容易地實現(xiàn)“分流”操作。

具體應(yīng)用時,只要在處理函數(shù)的.processElement()或者.onTimer()方法中,調(diào)用上下文的.output()方法就可以了

DataStream<Integer> stream = env.addSource(...);
SingleOutputStreamOperator<Long> process = eventStream.process(new ProcessFunction<Integer, Long>() {
    @Override
    public void processElement(Integer value, ProcessFunction<Integer, Long>.Context ctx, Collector<Long> out) throws Exception {
        out.collect(Long.valueOf(value));
        ctx.output(outputTag, "side-output: " + value);
    }
});

這里 output()方法需要傳入兩個參數(shù),第一個是一個“輸出標(biāo)簽”O(jiān)utputTag,用來標(biāo)識側(cè)輸出流,一般會在外部統(tǒng)一聲明;第二個就是要輸出的數(shù)據(jù)。

我們可以在外部先將 OutputTag 聲明出來

OutputTag<String> outputTag = new OutputTag<String>("side-output") {};

如果想要獲取這個側(cè)輸出流,可以基于處理之后的 DataStream 直接調(diào)用.getSideOutput() 方法,傳入對應(yīng)的 OutputTag,這個方式與窗口 API 中獲取側(cè)輸出流是完全一樣的。文章來源地址http://www.zghlxwxcb.cn/news/detail-684128.html

DataStream<String> stringStream = longStream.getSideOutput(outputTag);

到了這里,關(guān)于《Flink學(xué)習(xí)筆記》——第七章 處理函數(shù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 《操作系統(tǒng)真象還原》學(xué)習(xí)筆記:第七章 中斷

    《操作系統(tǒng)真象還原》學(xué)習(xí)筆記:第七章 中斷

    由于 CPU 獲知了計算機中發(fā)生的某些事,CPU 暫停正在執(zhí)行的程序,轉(zhuǎn)而去執(zhí)行處理該事件的程序,當(dāng)這段程序執(zhí)行完畢后,CPU 繼續(xù)執(zhí)行剛才的程序。整個過程稱為中斷處理,也稱為中斷。 把中斷按事件來源分類,來自CPU外部的中斷就稱為外部中斷,來自CPU內(nèi)部的中斷就稱為

    2024年02月11日
    瀏覽(31)
  • 《Pytorch深度學(xué)習(xí)和圖神經(jīng)網(wǎng)絡(luò)(卷 1)》學(xué)習(xí)筆記——第七章

    《Pytorch深度學(xué)習(xí)和圖神經(jīng)網(wǎng)絡(luò)(卷 1)》學(xué)習(xí)筆記——第七章

    這一章內(nèi)容有點豐富,多用了一些時間,實例就有四五個。 這章內(nèi)容是真多?。。▽W(xué)完之后又回到開頭感嘆) 將圖像從基礎(chǔ)像素到局部信息再到整體信息 即將圖片由低級特征到高級特征進行逐級計算,逐級累計。 計算機中對圖片的處理可以理解為離散微積分的過程。 利用

    2024年02月12日
    瀏覽(21)
  • 第七章 函數(shù)矩陣

    第七章 函數(shù)矩陣

    和矩陣函數(shù)不同的是,函數(shù)矩陣本質(zhì)上是一個矩陣,是以函數(shù)作為元素的矩陣。 矩陣函數(shù)本質(zhì)上是一個矩陣,是以矩陣作為自變量的函數(shù)。 函數(shù)矩陣和數(shù)字矩陣的運算法則完全相同。 不過矩陣的元素 a i j ( x ) a_{ij}(x) a ij ? ( x ) 需要是閉區(qū)間 [ a , b ] [a,b] [ a , b ] 上的實函數(shù)

    2024年02月04日
    瀏覽(22)
  • Python之第七章 函數(shù) --- 基礎(chǔ)

    Python之第七章 函數(shù) --- 基礎(chǔ)

    目錄 Python之第七章 函數(shù) --- 基本 1.模塊化程序設(shè)計 1.基本思想 2.特點 2.定義函數(shù) 1.格式: 2.函數(shù)名: 3.形式參數(shù): 4.函數(shù)體 ?編輯 3.函數(shù)調(diào)用 1.作用 2.格式 3.調(diào)用方式 4.實例 4.return語句 1.作用 2.注意 3.return可以返回任意Python的對象 5.函數(shù)參數(shù) 1.位置參數(shù) ?2.參數(shù) 3.默

    2024年02月09日
    瀏覽(28)
  • 【OpenCV】第七章: 圖像平滑處理

    【OpenCV】第七章: 圖像平滑處理

    第七章: 圖像平滑處理 1、什么是圖像平滑處理 圖像平滑處理就是,將圖像中與 周圍像素點的像素值差異較大的像素點 調(diào)整成 和周圍像素點像素值 相近的值。 例如: 2、為什么要進行平滑處理? 因為圖像在采集(生成)、傳輸、處理的過程中常常會存在一定的噪聲干擾,比如

    2024年02月03日
    瀏覽(23)
  • 第七章——函數(shù)(C++的編程模塊)

    第七章——函數(shù)(C++的編程模塊)

    復(fù)習(xí)函數(shù)的基本知識 要使用C++函數(shù),必須完成如下工作: 提供函數(shù)定義 提供函數(shù)原型 調(diào)用函數(shù)? 庫函數(shù)是已經(jīng)定義和編譯好的函數(shù),同時可以使用標(biāo)準(zhǔn)庫頭文件提供其原型,因此只需要正確地調(diào)用這種函數(shù)即可。但是創(chuàng)建自己的函數(shù)時,必須自行處理上面提到的3個方面。

    2024年02月13日
    瀏覽(24)
  • 自然語言處理: 第七章GPT的搭建

    自然語言處理: 第七章GPT的搭建

    在以transformer架構(gòu)為框架的大模型遍地開花后,大模型的方向基本分成了三類分別是: decoder-only架構(gòu) , 其中以GPT系列為代表 encoder-only架構(gòu),其中以BERT系列為代表 encoder-decoder架構(gòu),標(biāo)準(zhǔn)的transformer架構(gòu)以BART和T5為代表 大模型的使用方法如下: 分解成pre-train 和fine-tuning ,其中pr

    2024年02月13日
    瀏覽(27)
  • 【MySQL新手到通關(guān)】第七章 聚合函數(shù)使用詳解

    【MySQL新手到通關(guān)】第七章 聚合函數(shù)使用詳解

    為了方便測試,我們導(dǎo)入一些數(shù)據(jù) 數(shù)據(jù)如下 什么是聚合函數(shù) 聚合函數(shù)作用于一組數(shù)據(jù),并對一組數(shù)據(jù)返回一個值。 聚合函數(shù)類型 AVG() 求平均值 SUM() 求和 MAX() 求最大值 MIN() 求最小值 COUNT() 求總行數(shù) 聚合函數(shù)語法 聚合函數(shù)不能嵌套調(diào)用。比如不能出現(xiàn)類似“AVG(SUM(字段名稱

    2024年02月08日
    瀏覽(21)
  • 【高數(shù)筆記】第七章 微分方程

    【高數(shù)筆記】第七章 微分方程

    微分方程 :含有導(dǎo)數(shù)的方程叫微分方程 階 :微分方程的導(dǎo)數(shù)最高是幾階導(dǎo)數(shù)。微分方程中所出現(xiàn)的未知函數(shù)的最高階導(dǎo)數(shù)的階數(shù),叫做微分方程的階 解 :微分方程的解是一個函數(shù),將這個函數(shù)代入,方程為恒等式。 通解 :如果微分方程的解中含有任意常數(shù),且任意常數(shù)的

    2024年02月08日
    瀏覽(30)
  • 信息技術(shù)導(dǎo)論 第七章 區(qū)塊鏈 筆記

    7.1.1?? 區(qū)塊鏈基本概念和特征 1、區(qū)塊鏈的基本概念 從科技層面來看,區(qū)塊鏈涉及數(shù)學(xué)、密碼學(xué)、互聯(lián)網(wǎng)和計算機編程等很多科學(xué)技術(shù)問題。從應(yīng)用視角來看,簡單來說,區(qū)塊鏈?zhǔn)且粋€分布式的共享賬本和數(shù)據(jù)庫,具有去中心化、不可算改、全程留痕、可以追溯,集體維護、

    2024年02月07日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包