国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

7、Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn))

這篇具有很好參考價值的文章主要介紹了7、Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn))。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

Flink 系列文章

一、Flink 專欄

Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。

  • 1、Flink 部署系列
    本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。

  • 2、Flink基礎(chǔ)系列
    本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。

  • 3、Flik Table API和SQL基礎(chǔ)系列
    本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫、表用法、查詢、窗口函數(shù)、catalog等等內(nèi)容。

  • 4、Flik Table API和SQL提高與應(yīng)用系列
    本部分是table api 和sql的應(yīng)用部分,和實(shí)際的生產(chǎn)應(yīng)用聯(lián)系更為密切,以及有一定開發(fā)難度的內(nèi)容。

  • 5、Flink 監(jiān)控系列
    本部分和實(shí)際的運(yùn)維、監(jiān)控工作相關(guān)。

二、Flink 示例專欄

Flink 示例專欄是 Flink 專欄的輔助說明,一般不會介紹知識點(diǎn)的信息,更多的是提供一個一個可以具體使用的示例。本專欄不再分目錄,通過鏈接即可看出介紹的內(nèi)容。

兩專欄的所有文章入口點(diǎn)擊:Flink 系列文章匯總索引



本文詳細(xì)介紹了eventtime和watermark,包括watermark的Flink自帶的api實(shí)現(xiàn)與自定義的實(shí)現(xiàn),同時以三個示例來展示watermark的實(shí)際可能的應(yīng)用場景。
本文依賴kafka環(huán)境可用。
本文分為四個部分,即EventTime與watermark介紹、watermark基本使用、kafka作為數(shù)據(jù)源的watermark使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn)。

一、事件時間與watermark

1、EventTime介紹

處理時間是指執(zhí)行相應(yīng)操作的機(jī)器的系統(tǒng)時間。
事件時間是每個事件在其生產(chǎn)設(shè)備上發(fā)生的時間。

下圖形象的展示了event time 和 processing time的所處階段。一般將Flink data source下的箭頭表示為到達(dá)Flink的時間,即injestion time攝入時間。
7、Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn)),# Flink專欄,flink,kafka,watermaker,flink 水印,flink kafka 水印,flink 水印策略,flink 水印自定義

2、Eventtime與watermark

支持事件時間的流處理器需要一種方法來測量事件時間的進(jìn)度。例如,當(dāng)事件時間超過一小時結(jié)束時,需要通知構(gòu)建每小時窗口的窗口運(yùn)算符,以便操作員可以關(guān)閉正在進(jìn)行的窗口。

事件時間可以獨(dú)立于處理時間進(jìn)行。例如,在一個程序中,操作員的當(dāng)前事件時間可能略微落后于處理時間(考慮接收事件的延遲),而兩者以相同的速度進(jìn)行。另一方面,另一個流程序可能會通過快進(jìn)已經(jīng)在 Kafka 主題(或其他消息隊(duì)列)中緩沖的一些歷史數(shù)據(jù),在幾秒鐘的處理中完成數(shù)周的事件時間。

Flink 中衡量事件時間進(jìn)度的機(jī)制是水印。水印作為數(shù)據(jù)流的一部分流動,并帶有時間戳 t。水?。╰) 聲明事件時間已到達(dá)該流中的時間 t,這意味著流中不應(yīng)再有時間戳為 t’ <= t 的元素(即時間戳早于或等于水印的事件)。

下圖顯示了具有(邏輯)時間戳和內(nèi)聯(lián)流動水印的事件流。在此示例中,事件是按順序排列的(相對于其時間戳),這意味著水印只是流中的周期性標(biāo)記。

7、Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn)),# Flink專欄,flink,kafka,watermaker,flink 水印,flink kafka 水印,flink 水印策略,flink 水印自定義

水印對于無序流至關(guān)重要,如下圖所示,事件不按其時間戳排序。通常,水印是一種聲明,即到流中的該點(diǎn),直到某個時間戳的所有事件都應(yīng)該已到達(dá)。水印到達(dá)運(yùn)算符后,運(yùn)算符可以將其內(nèi)部事件時鐘提前到水印的值。
7、Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn)),# Flink專欄,flink,kafka,watermaker,flink 水印,flink kafka 水印,flink 水印策略,flink 水印自定義

3、Watermarks in Parallel Streams

水印在source Function處或緊隨source Function之后生成。source Function的每個并行子任務(wù)通常獨(dú)立生成其水印。這些水印定義該特定并行源的事件時間。

當(dāng)水印流經(jīng)流媒體程序時,當(dāng)水印到達(dá)的時候會觸發(fā)事件時間的計(jì)算(they advance the event time at the operators where they arrive)。每當(dāng)operator觸發(fā)(advance)其事件時間時,它都會為其后續(xù)運(yùn)算符在下游生成新的水印。

一些運(yùn)算符使用多個輸入流;例如,union,或 keyBy(…) 或 partition(…) 函數(shù)的運(yùn)算符。此類運(yùn)算符的當(dāng)前事件時間是其輸入流的事件時間的最小值。當(dāng)其輸入流更新其事件時間時,運(yùn)算符也會更新。

		this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
		// start so that our lowest watermark would be Long.MIN_VALUE.
		this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;

下圖顯示了流經(jīng)并行流的事件和水印以及跟蹤事件時間的運(yùn)算符的示例。

7、Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn)),# Flink專欄,flink,kafka,watermaker,flink 水印,flink kafka 水印,flink 水印策略,flink 水印自定義

4、Lateness延遲

某些元素可能會違反水印條件,這意味著即使在 Watermark(t) 發(fā)生之后,也會出現(xiàn)更多時間戳為 t’ <= t 的元素。事實(shí)上,在許多現(xiàn)實(shí)世界的設(shè)置中,某些元素可以任意延遲,因此無法指定某個事件時間戳的所有元素發(fā)生的時間。此外,即使延遲是有限制的,通常也不希望將水印延遲太多,因?yàn)檫@會導(dǎo)致事件時間窗口的評估延遲太多。

出于這個原因,流應(yīng)用可能會明確地期望一些后期元素。延遲元素是在系統(tǒng)的事件時鐘(由水印指示)已經(jīng)過了延遲元素時間戳的時間之后到達(dá)的元素。有關(guān)如何在事件時間窗口中使用延遲元素的搜集(一般用OutputTag)與使用視情況而定。代碼示例如下:

DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>);

5、watermark介紹

watermark就是給數(shù)據(jù)再額外的加的一個時間列,watermark是個時間戳。

watermark= 數(shù)據(jù)的事件時間 - 最大允許的延遲時間或亂序時間

watermark= 當(dāng)前窗口的最大的事件時間 - 最大允許的延遲時間或亂序時間
這樣可以保證watermark水位線會一直上升(變大),不會下降

  • 窗口計(jì)算的觸發(fā)條件為
    1、窗口中有數(shù)據(jù)
    2、watermark>= 窗口的結(jié)束時間

6、Watermark 策略簡介

使用 Flink API 時需要設(shè)置一個同時包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。WatermarkStrategy 工具類中也提供了許多常用的 watermark 策略,并且用戶也可以在某些必要場景下構(gòu)建自己的 watermark 策略。WatermarkStrategy 接口如下:

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{

    /**
     * 根據(jù)策略實(shí)例化一個可分配時間戳的 {@link TimestampAssigner}。
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    /**
     * 根據(jù)策略實(shí)例化一個 watermark 生成器。
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

通常情況下,不用實(shí)現(xiàn)此接口,而是可以使用 WatermarkStrategy 工具類中通用的 watermark 策略,或者可以使用這個工具類將自定義的 TimestampAssigner 與 WatermarkGenerator 進(jìn)行綁定。例如,想要使用有界無序(bounded-out-of-orderness)watermark 生成器和一個 lambda 表達(dá)式作為時間戳分配器,那么可以按照如下方式實(shí)現(xiàn):

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0);
//其中 TimestampAssigner 的設(shè)置與否是可選的,大多數(shù)情況下,可以不用去特別指定。例如,當(dāng)使用 Kafka 或 Kinesis 數(shù)據(jù)源時,你可以直接從 Kafka/Kinesis 數(shù)據(jù)源記錄中獲取到時間戳。        

7、使用 Watermark 策略

WatermarkStrategy 可以在 Flink 應(yīng)用程序中的兩處使用

  • 第一種是直接在數(shù)據(jù)源上使用,相比第二種會更好。因?yàn)閿?shù)據(jù)源可以利用 watermark 生成邏輯中有關(guān)分片/分區(qū)(shards/partitions/splits)的信息。使用這種方式,數(shù)據(jù)源通常可以更精準(zhǔn)地跟蹤 watermark,整體 watermark 生成將更精確。直接在源上指定 WatermarkStrategy 意味著必須使用特定數(shù)據(jù)源接口,參考下文的kafka部分,以及有關(guān)每個分區(qū)的 watermark 是如何生成以及工作的。
  • 第二種是直接在非數(shù)據(jù)源的操作之后使用,僅當(dāng)無法直接在數(shù)據(jù)源上設(shè)置策略時,才應(yīng)該使用第二種方式(在任意轉(zhuǎn)換操作之后設(shè)置 WatermarkStrategy)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(<watermark strategy>);

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);

使用 WatermarkStrategy 去獲取流并生成帶有時間戳的元素和 watermark 的新流時,如果原始流已經(jīng)具有時間戳或 watermark,則新指定的時間戳分配器將覆蓋原有的時間戳和 watermark。

8、處理空閑數(shù)據(jù)源

如果數(shù)據(jù)源中的某一個分區(qū)/分片在一段時間內(nèi)未發(fā)送事件數(shù)據(jù),則意味著 WatermarkGenerator 也不會獲得任何新數(shù)據(jù)去生成 watermark。
稱這類數(shù)據(jù)源為空閑輸入或空閑源。在這種情況下,當(dāng)某些其他分區(qū)仍然發(fā)送事件數(shù)據(jù)的時候就會出現(xiàn)問題。由于下游算子 watermark 的計(jì)算方式是取所有不同的上游并行數(shù)據(jù)源 watermark 的最小值,則其 watermark 將不會發(fā)生變化。

為了解決這個問題,可以使用 WatermarkStrategy 來檢測空閑輸入并將其標(biāo)記為空閑狀態(tài)。WatermarkStrategy 為此提供了一個工具接口:

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));

9、自定義 WatermarkGenerator

可以針對每個事件去生成 watermark。但是由于每個 watermark 都會在下游做一些計(jì)算,因此過多的 watermark 會降低程序性能。

TimestampAssigner 是一個可以從事件數(shù)據(jù)中提取時間戳字段的簡單函數(shù),但是 WatermarkGenerator 的編寫相對就要復(fù)雜一些了,將在接下來的兩小節(jié)中介紹如何實(shí)現(xiàn)此接口。WatermarkGenerator 接口代碼如下:

/**
 * {@code WatermarkGenerator} 可以基于事件或者周期性的生成 watermark。
 *
 * <p><b>注意:</b>  WatermarkGenerator 將以前互相獨(dú)立的 {@code AssignerWithPunctuatedWatermarks} 
 * 和 {@code AssignerWithPeriodicWatermarks} 一同包含了進(jìn)來。
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * 每來一條事件數(shù)據(jù)調(diào)用一次,可以檢查或者記錄事件的時間戳,或者也可以基于事件數(shù)據(jù)本身去生成 watermark。
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * 周期性的調(diào)用,也許會生成新的 watermark,也許不會。
     *
     * <p>調(diào)用此方法生成 watermark 的間隔時間由 {@link ExecutionConfig#getAutoWatermarkInterval()} 決定。
     */
    void onPeriodicEmit(WatermarkOutput output);
}

watermark 的生成方式本質(zhì)上是有兩種:周期性生成和標(biāo)記生成。

  • 周期性生成器通常通過 onEvent() 觀察傳入的事件數(shù)據(jù),然后在框架調(diào)用 onPeriodicEmit() 時發(fā)出 watermark。
  • 標(biāo)記生成器將查看 onEvent() 中的事件數(shù)據(jù),并等待檢查在流中攜帶 watermark 的特殊標(biāo)記事件或打點(diǎn)數(shù)據(jù)。當(dāng)獲取到這些事件數(shù)據(jù)時,它將立即發(fā)出 watermark。通常情況下,標(biāo)記生成器不會通過 onPeriodicEmit() 發(fā)出 watermark。

1)、自定義周期性 Watermark 生成器

周期性生成器會觀察流事件數(shù)據(jù)并定期生成 watermark(其生成可能取決于流數(shù)據(jù),或者完全基于處理時間)。

生成 watermark 的時間間隔(每 n 毫秒)可以通過 ExecutionConfig.setAutoWatermarkInterval(…) 指定。每次都會調(diào)用生成器的 onPeriodicEmit() 方法,如果返回的 watermark 非空且值大于前一個 watermark,則將發(fā)出新的 watermark。

如下是兩個使用周期性 watermark 生成器的簡單示例。
Flink 已經(jīng)附帶了 BoundedOutOfOrdernessWatermarks,它實(shí)現(xiàn)了 WatermarkGenerator,其工作原理與下面的 BoundedOutOfOrdernessGenerator 相似。可以在這里參閱如何使用它的內(nèi)容。

/**
 * 該 watermark 生成器可以覆蓋的場景是:數(shù)據(jù)源在一定程度上亂序。
 * 即某個最新到達(dá)的時間戳為 t 的元素將在最早到達(dá)的時間戳為 t 的元素之后最多 n 毫秒到達(dá)。
 */
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {
    private final long maxOutOfOrderness = 3500; // 3.5 秒
    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 發(fā)出的 watermark = 當(dāng)前最大時間戳 - 最大亂序時間
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }

}

/**
 * 該生成器生成的 watermark 滯后于處理時間固定量。它假定元素會在有限延遲后到達(dá) Flink。
 */
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {
    private final long maxTimeLag = 5000; // 5 秒

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // 處理時間場景下不需要實(shí)現(xiàn)
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
    }
}

2)、自定義標(biāo)記 Watermark 生成器

標(biāo)記 watermark 生成器觀察流事件數(shù)據(jù)并在獲取到帶有 watermark 信息的特殊事件元素時發(fā)出 watermark。

如下是實(shí)現(xiàn)標(biāo)記生成器的方法,當(dāng)事件帶有某個指定標(biāo)記時,該生成器就會發(fā)出 watermark:

public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // onEvent 中已經(jīng)實(shí)現(xiàn)
    }
}

10、Watermark 策略與 Kafka 連接器

當(dāng)使用 Apache Kafka 連接器作為數(shù)據(jù)源時,每個 Kafka 分區(qū)可能有一個簡單的事件時間模式(遞增的時間戳或有界無序)。然而,當(dāng)使用 Kafka 數(shù)據(jù)源時,多個分區(qū)常常并行使用,因此交錯來自各個分區(qū)的事件數(shù)據(jù)就會破壞每個分區(qū)的事件時間模式(這是 Kafka 消費(fèi)客戶端所固有的)。

在這種情況下,你可以使用 Flink 中可識別 Kafka 分區(qū)的 watermark 生成機(jī)制。使用此特性,將在 Kafka 消費(fèi)端內(nèi)部針對每個 Kafka 分區(qū)生成 watermark,并且不同分區(qū) watermark 的合并方式與在數(shù)據(jù)流 shuffle 時的合并方式相同。

例如,如果每個 Kafka 分區(qū)中的事件時間戳嚴(yán)格遞增,則使用時間戳單調(diào)遞增按分區(qū)生成的 watermark 將生成完美的全局 watermark。注意,在示例中未使用 TimestampAssigner,而是使用了 Kafka 記錄自身的時間戳。

下圖展示了如何使用單 kafka 分區(qū) watermark 生成機(jī)制,以及在這種情況下 watermark 如何通過 dataflow 傳播。
7、Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn)),# Flink專欄,flink,kafka,watermaker,flink 水印,flink kafka 水印,flink 水印策略,flink 水印自定義

//kafka數(shù)據(jù)源示例,沒有使用withTimestampAssigner
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));

DataStream<MyType> stream = env.addSource(kafkaSource);

//非kafka數(shù)據(jù)源示例,使用了withTimestampAssigner
WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0);

11、算子處理 Watermark 的方式

一般情況下,在將 watermark 轉(zhuǎn)發(fā)到下游之前,需要算子對其進(jìn)行觸發(fā)的事件完全進(jìn)行處理。例如,WindowOperator 將首先計(jì)算該 watermark 觸發(fā)的所有窗口數(shù)據(jù),當(dāng)且僅當(dāng)由此 watermark 觸發(fā)計(jì)算進(jìn)而生成的所有數(shù)據(jù)被轉(zhuǎn)發(fā)到下游之后,其才會被發(fā)送到下游。換句話說,由于此 watermark 的出現(xiàn)而產(chǎn)生的所有數(shù)據(jù)元素都將在此 watermark 之前發(fā)出。

相同的規(guī)則也適用于 TwoInputStreamOperator。但是,在這種情況下,算子當(dāng)前的 watermark 會取其兩個輸入的最小值。

二、示例1:每5s統(tǒng)計(jì)一次地鐵進(jìn)站每個入口人數(shù)

每5s統(tǒng)計(jì)一次地鐵進(jìn)站每個入口人數(shù),最多接受延遲3s的數(shù)據(jù)
數(shù)據(jù)結(jié)構(gòu):進(jìn)站口、人數(shù)和進(jìn)入時間

1、實(shí)現(xiàn)

  • bean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
	private String sNo;
	private Integer userCount;
	private Long enterTime;
}

  • 實(shí)現(xiàn)
import java.time.Duration;
import java.util.Random;
import java.util.UUID;

import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan 
 * 每10s統(tǒng)計(jì)一次地鐵進(jìn)站每個入口人數(shù)
 */
public class WatermarkDemo {

	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStreamSource<Subway> subwayDS = env.addSource(new SourceFunction<Subway>() {
			private boolean flag = true;

			@Override
			public void run(SourceContext<Subway> ctx) throws Exception {
				Random random = new Random();
				while (flag) {
					String sNo = "No"+random.nextInt(3);
					int userCount = random.nextInt(100);
					// 模擬延遲數(shù)據(jù)
					long eventTime = System.currentTimeMillis() - random.nextInt(10) * 1000;
					Subway subway = new Subway(sNo, userCount, eventTime);
					System.err.println(subway + " ,格式化后時間 " + df.format(subway.getEnterTime()));
					
					ctx.collect(subway);
					Thread.sleep(1000);
				}
			}

			@Override
			public void cancel() {
				flag = false;
			}
		});

		// transformation
		// 設(shè)置watermark = 當(dāng)前最大的事件時間 - 最大允許的延遲時間或亂序時間
		SingleOutputStreamOperator<Subway> subwayWithWatermark = subwayDS
				.assignTimestampsAndWatermarks(WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定最大允許的延遲時間
				.withTimestampAssigner((subway, timestamp) -> subway.getEnterTime()));// 指定eventtime事件時間列

		// 計(jì)算窗口
		SingleOutputStreamOperator<Subway> result = subwayWithWatermark
				.keyBy(Subway::getSNo)
				.window(TumblingEventTimeWindows.of(Time.seconds(5)))
				.sum("userCount");

		// sink
		result.print();

		// execute
		env.execute();
	}

}

2、驗(yàn)證

輸出結(jié)果,按照預(yù)期計(jì)算出了結(jié)果

Subway(sNo=No0, userCount=7, enterTime=1689148937760) ,格式化后時間 16:02:17
Subway(sNo=No1, userCount=93, enterTime=1689148933782) ,格式化后時間 16:02:13
Subway(sNo=No2, userCount=21, enterTime=1689148940783) ,格式化后時間 16:02:20
10> Subway(sNo=No1, userCount=93, enterTime=1689148933782)
Subway(sNo=No0, userCount=7, enterTime=1689148936784) ,格式化后時間 16:02:16
Subway(sNo=No0, userCount=53, enterTime=1689148944788) ,格式化后時間 16:02:24
10> Subway(sNo=No0, userCount=14, enterTime=1689148937760)
Subway(sNo=No2, userCount=66, enterTime=1689148944790) ,格式化后時間 16:02:24
Subway(sNo=No2, userCount=97, enterTime=1689148944803) ,格式化后時間 16:02:24
Subway(sNo=No2, userCount=79, enterTime=1689148946807) ,格式化后時間 16:02:26
Subway(sNo=No2, userCount=83, enterTime=1689148945821) ,格式化后時間 16:02:25
Subway(sNo=No0, userCount=84, enterTime=1689148941836) ,格式化后時間 16:02:21
Subway(sNo=No2, userCount=50, enterTime=1689148947852) ,格式化后時間 16:02:27
Subway(sNo=No1, userCount=10, enterTime=1689148942864) ,格式化后時間 16:02:22
Subway(sNo=No0, userCount=20, enterTime=1689148944866) ,格式化后時間 16:02:24
Subway(sNo=No2, userCount=1, enterTime=1689148945877) ,格式化后時間 16:02:25
Subway(sNo=No0, userCount=62, enterTime=1689148953888) ,格式化后時間 16:02:33
3> Subway(sNo=No2, userCount=184, enterTime=1689148940783)
10> Subway(sNo=No0, userCount=157, enterTime=1689148944788)
3> Subway(sNo=No2, userCount=213, enterTime=1689148946807)
10> Subway(sNo=No1, userCount=10, enterTime=1689148942864)
。。。。。。

三、示例2:kafka數(shù)據(jù)源,每10s統(tǒng)計(jì)一次地鐵進(jìn)站每個入口人數(shù)

注意該示例中,發(fā)送數(shù)據(jù)時不需要eventtime,flink會以kafka發(fā)送數(shù)據(jù)的時間戳為eventtime,也不需要withTimestampAssigner指定eventtime,詳見實(shí)現(xiàn)源碼。
每10s統(tǒng)計(jì)一次地鐵進(jìn)站每個入口人數(shù),最多接受延遲3s的數(shù)據(jù)
數(shù)據(jù)結(jié)構(gòu):進(jìn)站口、人數(shù)

1、maven依賴

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_2.12</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-connector-kafka_2.12</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc_2.12</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
		</dependency>

2、實(shí)現(xiàn)

  • bean

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
	private String sNo;
	private Integer userCount;
}

  • 實(shí)現(xiàn)
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author alanchan
 *
 */
public class KafkaWatermarkDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
		// source
		// 準(zhǔn)備kafka連接參數(shù)
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "server1:9092");
		props.setProperty("group.id", "flink");
		props.setProperty("auto.offset.reset", "latest");
		props.setProperty("flink.partition-discovery.interval-millis", "5000");
		props.setProperty("enable.auto.commit", "true");
		props.setProperty("auto.commit.interval.ms", "2000");
		// 使用連接參數(shù)創(chuàng)建FlinkKafkaConsumer/kafkaSource
		FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("t_kafkasource", new SimpleStringSchema(), props);
		// 使用kafkaSource
		DataStream<Subway> subwayDS = env.addSource(kafkaSource).map(new MapFunction<String, Subway>() {

			@Override
			public Subway map(String value) throws Exception {
				String[] arr = value.split(",");

				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});

		// transformation
		// 設(shè)置watermark = 當(dāng)前最大的事件時間 - 最大允許的延遲時間或亂序時間
		SingleOutputStreamOperator<Subway> subwayWithWatermark = subwayDS
		.assignTimestampsAndWatermarks(WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定最大允許的延遲時間
		);

		// 計(jì)算窗口
		SingleOutputStreamOperator<Subway> result = subwayWithWatermark.keyBy(Subway::getSNo).window(TumblingEventTimeWindows.of(Time.seconds(10))).sum("userCount");

		// sink
		result.print();

		// execute
		env.execute();

	}

}

3、驗(yàn)證

1)、驗(yàn)證步驟

1、啟動kafka,創(chuàng)建topic
2、啟動應(yīng)用程序
3、在kafka命令行中輸入符合格式要求的數(shù)據(jù)
4、觀察應(yīng)用程序的控制臺輸出

2)、驗(yàn)證

啟動kafka、創(chuàng)建topic、啟動應(yīng)用程序不再贅述,如果不清楚的參考本人kafka專欄。
1、在kafka命令控制臺輸入數(shù)據(jù)

kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource

[alanchan@server1 onekeystart]$ kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource
>1,2
>1,3
>1,4
>1,5
>1,6
>1,7
>2,4
>2,6
>2,8
>3,6


2、觀察應(yīng)用程序中的控制臺輸出

7> Subway(sNo=1, userCount=20)
4> Subway(sNo=2, userCount=18)
7> Subway(sNo=1, userCount=7)

四、示例3:處理延遲數(shù)據(jù)超過能接受的時間,每10s統(tǒng)計(jì)一次地鐵進(jìn)站每個入口人數(shù)

每10s統(tǒng)計(jì)一次地鐵進(jìn)站每個入口人數(shù),最多接受延遲3s的數(shù)據(jù),超過可接受范圍則另外接收。
一般而言,針對延遲超過計(jì)算窗口的數(shù)據(jù)處理方式不同,視具體的情況而定。
本示例僅僅是打印出來。
數(shù)據(jù)結(jié)構(gòu):進(jìn)站口、人數(shù)和進(jìn)入時間

1、實(shí)現(xiàn)

  • bean

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
	private String sNo;
	private Integer userCount;
	private Long enterTime;

}

  • 實(shí)現(xiàn)

import java.time.Duration;
import java.util.Random;
import java.util.UUID;

import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.scala.OutputTag;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan
 *
 */
public class WatermarkLatenessDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStreamSource<Subway> subwayDS = env.addSource(new SourceFunction<Subway>() {
			private boolean flag = true;

			@Override
			public void run(SourceContext<Subway> ctx) throws Exception {
				Random random = new Random();
				while (flag) {
					String sNo = "No" + random.nextInt(3);
					int userCount = random.nextInt(100);
					// 模擬延遲數(shù)據(jù)
					long eventTime = System.currentTimeMillis() - random.nextInt(20) * 1000;
					Subway subway = new Subway(sNo, userCount, eventTime);
					System.err.println(subway + " ,格式化后時間 " + df.format(subway.getEnterTime()));

					ctx.collect(subway);
					Thread.sleep(1000);
				}
			}

			@Override
			public void cancel() {
				flag = false;
			}
		});

		// transformation
		// 設(shè)置最大允許延遲時間3s
		SingleOutputStreamOperator<Subway> orderDSWithWatermark = subwayDS.assignTimestampsAndWatermarks(
				WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((subway, timestamp) -> subway.getEnterTime())// 指定事件時間列
		);

		// 接收延遲超過允許范圍內(nèi)的數(shù)據(jù),供其他方式處理
		OutputTag<Subway> latenessData = new OutputTag<Subway>("seriousLateData", TypeInformation.of(Subway.class));

		SingleOutputStreamOperator<Subway> result1 = orderDSWithWatermark.keyBy(Subway::getSNo).window(TumblingEventTimeWindows.of(Time.seconds(10)))
				.allowedLateness(Time.seconds(3)).sideOutputLateData(latenessData).sum("userCount");

		DataStream<Subway> result2 = result1.getSideOutput(latenessData);

		// sink
		result1.print("延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù)");
		result2.print("延遲不在計(jì)算窗口內(nèi)數(shù)據(jù)");

		// execute
		env.execute();

	}

}

2、驗(yàn)證

驗(yàn)證比較簡單,就是由系統(tǒng)自己生成數(shù)據(jù),然后觀察應(yīng)用程序的控制臺的輸出是否與預(yù)期一致,經(jīng)驗(yàn)證,本示例與預(yù)期一致。

Subway(sNo=No1, userCount=44, enterTime=1689152768384) ,格式化后時間 17:06:08
Subway(sNo=No1, userCount=24, enterTime=1689152764407) ,格式化后時間 17:06:04
Subway(sNo=No1, userCount=29, enterTime=1689152774418) ,格式化后時間 17:06:14
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=68, enterTime=1689152768384)
Subway(sNo=No1, userCount=83, enterTime=1689152767430) ,格式化后時間 17:06:07
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=151, enterTime=1689152768384)
Subway(sNo=No0, userCount=19, enterTime=1689152759435) ,格式化后時間 17:05:59
延遲不在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No0, userCount=19, enterTime=1689152759435)
Subway(sNo=No1, userCount=32, enterTime=1689152760443) ,格式化后時間 17:06:00
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=183, enterTime=1689152768384)
Subway(sNo=No0, userCount=56, enterTime=1689152775459) ,格式化后時間 17:06:15
Subway(sNo=No1, userCount=12, enterTime=1689152778472) ,格式化后時間 17:06:18
Subway(sNo=No1, userCount=26, enterTime=1689152776475) ,格式化后時間 17:06:16
Subway(sNo=No0, userCount=49, enterTime=1689152772488) ,格式化后時間 17:06:12
Subway(sNo=No2, userCount=93, enterTime=1689152767500) ,格式化后時間 17:06:07
延遲不在計(jì)算窗口內(nèi)數(shù)據(jù):3> Subway(sNo=No2, userCount=93, enterTime=1689152767500)
Subway(sNo=No1, userCount=26, enterTime=1689152779502) ,格式化后時間 17:06:19
Subway(sNo=No2, userCount=63, enterTime=1689152782512) ,格式化后時間 17:06:22
Subway(sNo=No1, userCount=73, enterTime=1689152775526) ,格式化后時間 17:06:15
Subway(sNo=No0, userCount=46, enterTime=1689152783539) ,格式化后時間 17:06:23
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No0, userCount=105, enterTime=1689152775459)
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=166, enterTime=1689152774418)
Subway(sNo=No2, userCount=72, enterTime=1689152779552) ,格式化后時間 17:06:19
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):3> Subway(sNo=No2, userCount=72, enterTime=1689152779552)
Subway(sNo=No0, userCount=43, enterTime=1689152774553) ,格式化后時間 17:06:14
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No0, userCount=148, enterTime=1689152775459)
Subway(sNo=No2, userCount=1, enterTime=1689152781567) ,格式化后時間 17:06:21
Subway(sNo=No2, userCount=49, enterTime=1689152775582) ,格式化后時間 17:06:15
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):3> Subway(sNo=No2, userCount=121, enterTime=1689152779552)
Subway(sNo=No1, userCount=4, enterTime=1689152782591) ,格式化后時間 17:06:22
Subway(sNo=No2, userCount=79, enterTime=1689152778604) ,格式化后時間 17:06:18
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):3> Subway(sNo=No2, userCount=200, enterTime=1689152779552)
Subway(sNo=No2, userCount=11, enterTime=1689152794608) ,格式化后時間 17:06:34
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):3> Subway(sNo=No2, userCount=64, enterTime=1689152782512)
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=4, enterTime=1689152782591)
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No0, userCount=46, enterTime=1689152783539)
Subway(sNo=No1, userCount=47, enterTime=1689152784620) ,格式化后時間 17:06:24
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=51, enterTime=1689152782591)
Subway(sNo=No0, userCount=12, enterTime=1689152788634) ,格式化后時間 17:06:28
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No0, userCount=58, enterTime=1689152783539)
Subway(sNo=No0, userCount=71, enterTime=1689152790634) ,格式化后時間 17:06:30
Subway(sNo=No0, userCount=83, enterTime=1689152799635) ,格式化后時間 17:06:39
Subway(sNo=No0, userCount=11, enterTime=1689152799649) ,格式化后時間 17:06:39
Subway(sNo=No1, userCount=77, enterTime=1689152786650) ,格式化后時間 17:06:26
延遲不在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=77, enterTime=1689152786650)
Subway(sNo=No0, userCount=6, enterTime=1689152802662) ,格式化后時間 17:06:42
Subway(sNo=No1, userCount=87, enterTime=1689152791668) ,格式化后時間 17:06:31

以上,詳細(xì)介紹了eventtime和watermark,包括watermark的Flink自帶的api實(shí)現(xiàn)與自定義的實(shí)現(xiàn),同時以三個示例來展示watermark的實(shí)際可能的應(yīng)用場景。文章來源地址http://www.zghlxwxcb.cn/news/detail-631217.html

到了這里,關(guān)于7、Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn))的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 41、Flink之Hive 方言介紹及詳細(xì)示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月10日
    瀏覽(21)
  • 【flink番外篇】1、flink的23種常用算子介紹及詳細(xì)示例(完整版)

    【flink番外篇】1、flink的23種常用算子介紹及詳細(xì)示例(完整版)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月05日
    瀏覽(36)
  • 10、Flink的source、transformations、sink的詳細(xì)示例(二)-source和transformation示例【補(bǔ)充示例】

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月10日
    瀏覽(16)
  • 12、Flink source和sink 的 clickhouse 詳細(xì)示例

    12、Flink source和sink 的 clickhouse 詳細(xì)示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月13日
    瀏覽(18)
  • Flink(五)source、transformations、sink的詳細(xì)示例(一)

    Flink(五)source、transformations、sink的詳細(xì)示例(一)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月15日
    瀏覽(17)
  • 5、Flink 的 source、transformations、sink的詳細(xì)示例(一)

    5、Flink 的 source、transformations、sink的詳細(xì)示例(一)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月14日
    瀏覽(20)
  • 【flink番外篇】2、flink的23種算子window join 和interval join 數(shù)據(jù)傾斜、分區(qū)介紹及詳細(xì)示例(3)- 數(shù)據(jù)傾斜處理、分區(qū)示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月03日
    瀏覽(34)
  • 【flink番外篇】1、flink的23種常用算子介紹及詳細(xì)示例(2)- keyby、reduce和Aggregations

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月03日
    瀏覽(23)
  • 【flink番外篇】1、flink的23種常用算子介紹及詳細(xì)示例(1)- map、flatmap和filter

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月04日
    瀏覽(20)
  • 【flink番外篇】1、flink的23種常用算子介紹及詳細(xì)示例(3)-window、distinct、join等

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月04日
    瀏覽(26)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包