国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink(七)Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn))

這篇具有很好參考價(jià)值的文章主要介紹了Flink(七)Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn))。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

Flink 系列文章

一、Flink 專欄

Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。

  • 1、Flink 部署系列
    本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。

  • 2、Flink基礎(chǔ)系列
    本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。

  • 3、Flik Table API和SQL基礎(chǔ)系列
    本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫、表用法、查詢、窗口函數(shù)、catalog等等內(nèi)容。

  • 4、Flik Table API和SQL提高與應(yīng)用系列
    本部分是table api 和sql的應(yīng)用部分,和實(shí)際的生產(chǎn)應(yīng)用聯(lián)系更為密切,以及有一定開發(fā)難度的內(nèi)容。

  • 5、Flink 監(jiān)控系列
    本部分和實(shí)際的運(yùn)維、監(jiān)控工作相關(guān)。

二、Flink 示例專欄

Flink 示例專欄是 Flink 專欄的輔助說明,一般不會(huì)介紹知識(shí)點(diǎn)的信息,更多的是提供一個(gè)一個(gè)可以具體使用的示例。本專欄不再分目錄,通過鏈接即可看出介紹的內(nèi)容。

兩專欄的所有文章入口點(diǎn)擊:Flink 系列文章匯總索引



本文詳細(xì)介紹了eventtime和watermark,包括watermark的Flink自帶的api實(shí)現(xiàn)與自定義的實(shí)現(xiàn),同時(shí)以三個(gè)示例來展示watermark的實(shí)際可能的應(yīng)用場景。
本文依賴kafka環(huán)境可用。
本文分為四個(gè)部分,即EventTime與watermark介紹、watermark基本使用、kafka作為數(shù)據(jù)源的watermark使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn)。

一、事件時(shí)間與watermark

1、EventTime介紹

處理時(shí)間是指執(zhí)行相應(yīng)操作的機(jī)器的系統(tǒng)時(shí)間。
事件時(shí)間是每個(gè)事件在其生產(chǎn)設(shè)備上發(fā)生的時(shí)間。

下圖形象的展示了event time 和 processing time的所處階段。一般將Flink data source下的箭頭表示為到達(dá)Flink的時(shí)間,即injestion time攝入時(shí)間。
Flink(七)Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn)),# Flink專欄,flink,kafka,watermaker,flink 水印,flink kafka 水印,flink 水印策略,flink 水印自定義

2、Eventtime與watermark

支持事件時(shí)間的流處理器需要一種方法來測量事件時(shí)間的進(jìn)度。例如,當(dāng)事件時(shí)間超過一小時(shí)結(jié)束時(shí),需要通知構(gòu)建每小時(shí)窗口的窗口運(yùn)算符,以便操作員可以關(guān)閉正在進(jìn)行的窗口。

事件時(shí)間可以獨(dú)立于處理時(shí)間進(jìn)行。例如,在一個(gè)程序中,操作員的當(dāng)前事件時(shí)間可能略微落后于處理時(shí)間(考慮接收事件的延遲),而兩者以相同的速度進(jìn)行。另一方面,另一個(gè)流程序可能會(huì)通過快進(jìn)已經(jīng)在 Kafka 主題(或其他消息隊(duì)列)中緩沖的一些歷史數(shù)據(jù),在幾秒鐘的處理中完成數(shù)周的事件時(shí)間。

Flink 中衡量事件時(shí)間進(jìn)度的機(jī)制是水印。水印作為數(shù)據(jù)流的一部分流動(dòng),并帶有時(shí)間戳 t。水?。╰) 聲明事件時(shí)間已到達(dá)該流中的時(shí)間 t,這意味著流中不應(yīng)再有時(shí)間戳為 t’ <= t 的元素(即時(shí)間戳早于或等于水印的事件)。

下圖顯示了具有(邏輯)時(shí)間戳和內(nèi)聯(lián)流動(dòng)水印的事件流。在此示例中,事件是按順序排列的(相對于其時(shí)間戳),這意味著水印只是流中的周期性標(biāo)記。

Flink(七)Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn)),# Flink專欄,flink,kafka,watermaker,flink 水印,flink kafka 水印,flink 水印策略,flink 水印自定義

水印對于無序流至關(guān)重要,如下圖所示,事件不按其時(shí)間戳排序。通常,水印是一種聲明,即到流中的該點(diǎn),直到某個(gè)時(shí)間戳的所有事件都應(yīng)該已到達(dá)。水印到達(dá)運(yùn)算符后,運(yùn)算符可以將其內(nèi)部事件時(shí)鐘提前到水印的值。
Flink(七)Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn)),# Flink專欄,flink,kafka,watermaker,flink 水印,flink kafka 水印,flink 水印策略,flink 水印自定義

3、Watermarks in Parallel Streams

水印在source Function處或緊隨source Function之后生成。source Function的每個(gè)并行子任務(wù)通常獨(dú)立生成其水印。這些水印定義該特定并行源的事件時(shí)間。

當(dāng)水印流經(jīng)流媒體程序時(shí),當(dāng)水印到達(dá)的時(shí)候會(huì)觸發(fā)事件時(shí)間的計(jì)算(they advance the event time at the operators where they arrive)。每當(dāng)operator觸發(fā)(advance)其事件時(shí)間時(shí),它都會(huì)為其后續(xù)運(yùn)算符在下游生成新的水印。

一些運(yùn)算符使用多個(gè)輸入流;例如,union,或 keyBy(…) 或 partition(…) 函數(shù)的運(yùn)算符。此類運(yùn)算符的當(dāng)前事件時(shí)間是其輸入流的事件時(shí)間的最小值。當(dāng)其輸入流更新其事件時(shí)間時(shí),運(yùn)算符也會(huì)更新。

		this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
		// start so that our lowest watermark would be Long.MIN_VALUE.
		this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;

下圖顯示了流經(jīng)并行流的事件和水印以及跟蹤事件時(shí)間的運(yùn)算符的示例。

Flink(七)Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn)),# Flink專欄,flink,kafka,watermaker,flink 水印,flink kafka 水印,flink 水印策略,flink 水印自定義

4、Lateness延遲

某些元素可能會(huì)違反水印條件,這意味著即使在 Watermark(t) 發(fā)生之后,也會(huì)出現(xiàn)更多時(shí)間戳為 t’ <= t 的元素。事實(shí)上,在許多現(xiàn)實(shí)世界的設(shè)置中,某些元素可以任意延遲,因此無法指定某個(gè)事件時(shí)間戳的所有元素發(fā)生的時(shí)間。此外,即使延遲是有限制的,通常也不希望將水印延遲太多,因?yàn)檫@會(huì)導(dǎo)致事件時(shí)間窗口的評估延遲太多。

出于這個(gè)原因,流應(yīng)用可能會(huì)明確地期望一些后期元素。延遲元素是在系統(tǒng)的事件時(shí)鐘(由水印指示)已經(jīng)過了延遲元素時(shí)間戳的時(shí)間之后到達(dá)的元素。有關(guān)如何在事件時(shí)間窗口中使用延遲元素的搜集(一般用OutputTag)與使用視情況而定。代碼示例如下:

DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>);

5、watermark介紹

watermark就是給數(shù)據(jù)再額外的加的一個(gè)時(shí)間列,watermark是個(gè)時(shí)間戳。

watermark= 數(shù)據(jù)的事件時(shí)間 - 最大允許的延遲時(shí)間或亂序時(shí)間

watermark= 當(dāng)前窗口的最大的事件時(shí)間 - 最大允許的延遲時(shí)間或亂序時(shí)間
這樣可以保證watermark水位線會(huì)一直上升(變大),不會(huì)下降

  • 窗口計(jì)算的觸發(fā)條件為
    1、窗口中有數(shù)據(jù)
    2、watermark>= 窗口的結(jié)束時(shí)間

6、Watermark 策略簡介

使用 Flink API 時(shí)需要設(shè)置一個(gè)同時(shí)包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。WatermarkStrategy 工具類中也提供了許多常用的 watermark 策略,并且用戶也可以在某些必要場景下構(gòu)建自己的 watermark 策略。WatermarkStrategy 接口如下:

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{

    /**
     * 根據(jù)策略實(shí)例化一個(gè)可分配時(shí)間戳的 {@link TimestampAssigner}。
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    /**
     * 根據(jù)策略實(shí)例化一個(gè) watermark 生成器。
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

通常情況下,不用實(shí)現(xiàn)此接口,而是可以使用 WatermarkStrategy 工具類中通用的 watermark 策略,或者可以使用這個(gè)工具類將自定義的 TimestampAssigner 與 WatermarkGenerator 進(jìn)行綁定。例如,想要使用有界無序(bounded-out-of-orderness)watermark 生成器和一個(gè) lambda 表達(dá)式作為時(shí)間戳分配器,那么可以按照如下方式實(shí)現(xiàn):

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0);
//其中 TimestampAssigner 的設(shè)置與否是可選的,大多數(shù)情況下,可以不用去特別指定。例如,當(dāng)使用 Kafka 或 Kinesis 數(shù)據(jù)源時(shí),你可以直接從 Kafka/Kinesis 數(shù)據(jù)源記錄中獲取到時(shí)間戳。        

7、使用 Watermark 策略

WatermarkStrategy 可以在 Flink 應(yīng)用程序中的兩處使用

  • 第一種是直接在數(shù)據(jù)源上使用,相比第二種會(huì)更好。因?yàn)閿?shù)據(jù)源可以利用 watermark 生成邏輯中有關(guān)分片/分區(qū)(shards/partitions/splits)的信息。使用這種方式,數(shù)據(jù)源通??梢愿珳?zhǔn)地跟蹤 watermark,整體 watermark 生成將更精確。直接在源上指定 WatermarkStrategy 意味著必須使用特定數(shù)據(jù)源接口,參考下文的kafka部分,以及有關(guān)每個(gè)分區(qū)的 watermark 是如何生成以及工作的。
  • 第二種是直接在非數(shù)據(jù)源的操作之后使用,僅當(dāng)無法直接在數(shù)據(jù)源上設(shè)置策略時(shí),才應(yīng)該使用第二種方式(在任意轉(zhuǎn)換操作之后設(shè)置 WatermarkStrategy)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(<watermark strategy>);

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);

使用 WatermarkStrategy 去獲取流并生成帶有時(shí)間戳的元素和 watermark 的新流時(shí),如果原始流已經(jīng)具有時(shí)間戳或 watermark,則新指定的時(shí)間戳分配器將覆蓋原有的時(shí)間戳和 watermark。

8、處理空閑數(shù)據(jù)源

如果數(shù)據(jù)源中的某一個(gè)分區(qū)/分片在一段時(shí)間內(nèi)未發(fā)送事件數(shù)據(jù),則意味著 WatermarkGenerator 也不會(huì)獲得任何新數(shù)據(jù)去生成 watermark。
稱這類數(shù)據(jù)源為空閑輸入或空閑源。在這種情況下,當(dāng)某些其他分區(qū)仍然發(fā)送事件數(shù)據(jù)的時(shí)候就會(huì)出現(xiàn)問題。由于下游算子 watermark 的計(jì)算方式是取所有不同的上游并行數(shù)據(jù)源 watermark 的最小值,則其 watermark 將不會(huì)發(fā)生變化。

為了解決這個(gè)問題,可以使用 WatermarkStrategy 來檢測空閑輸入并將其標(biāo)記為空閑狀態(tài)。WatermarkStrategy 為此提供了一個(gè)工具接口:

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));

9、自定義 WatermarkGenerator

可以針對每個(gè)事件去生成 watermark。但是由于每個(gè) watermark 都會(huì)在下游做一些計(jì)算,因此過多的 watermark 會(huì)降低程序性能。

TimestampAssigner 是一個(gè)可以從事件數(shù)據(jù)中提取時(shí)間戳字段的簡單函數(shù),但是 WatermarkGenerator 的編寫相對就要復(fù)雜一些了,將在接下來的兩小節(jié)中介紹如何實(shí)現(xiàn)此接口。WatermarkGenerator 接口代碼如下:

/**
 * {@code WatermarkGenerator} 可以基于事件或者周期性的生成 watermark。
 *
 * <p><b>注意:</b>  WatermarkGenerator 將以前互相獨(dú)立的 {@code AssignerWithPunctuatedWatermarks} 
 * 和 {@code AssignerWithPeriodicWatermarks} 一同包含了進(jìn)來。
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * 每來一條事件數(shù)據(jù)調(diào)用一次,可以檢查或者記錄事件的時(shí)間戳,或者也可以基于事件數(shù)據(jù)本身去生成 watermark。
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * 周期性的調(diào)用,也許會(huì)生成新的 watermark,也許不會(huì)。
     *
     * <p>調(diào)用此方法生成 watermark 的間隔時(shí)間由 {@link ExecutionConfig#getAutoWatermarkInterval()} 決定。
     */
    void onPeriodicEmit(WatermarkOutput output);
}

watermark 的生成方式本質(zhì)上是有兩種:周期性生成和標(biāo)記生成。

  • 周期性生成器通常通過 onEvent() 觀察傳入的事件數(shù)據(jù),然后在框架調(diào)用 onPeriodicEmit() 時(shí)發(fā)出 watermark。
  • 標(biāo)記生成器將查看 onEvent() 中的事件數(shù)據(jù),并等待檢查在流中攜帶 watermark 的特殊標(biāo)記事件或打點(diǎn)數(shù)據(jù)。當(dāng)獲取到這些事件數(shù)據(jù)時(shí),它將立即發(fā)出 watermark。通常情況下,標(biāo)記生成器不會(huì)通過 onPeriodicEmit() 發(fā)出 watermark。

1)、自定義周期性 Watermark 生成器

周期性生成器會(huì)觀察流事件數(shù)據(jù)并定期生成 watermark(其生成可能取決于流數(shù)據(jù),或者完全基于處理時(shí)間)。

生成 watermark 的時(shí)間間隔(每 n 毫秒)可以通過 ExecutionConfig.setAutoWatermarkInterval(…) 指定。每次都會(huì)調(diào)用生成器的 onPeriodicEmit() 方法,如果返回的 watermark 非空且值大于前一個(gè) watermark,則將發(fā)出新的 watermark。

如下是兩個(gè)使用周期性 watermark 生成器的簡單示例。
Flink 已經(jīng)附帶了 BoundedOutOfOrdernessWatermarks,它實(shí)現(xiàn)了 WatermarkGenerator,其工作原理與下面的 BoundedOutOfOrdernessGenerator 相似。可以在這里參閱如何使用它的內(nèi)容。

/**
 * 該 watermark 生成器可以覆蓋的場景是:數(shù)據(jù)源在一定程度上亂序。
 * 即某個(gè)最新到達(dá)的時(shí)間戳為 t 的元素將在最早到達(dá)的時(shí)間戳為 t 的元素之后最多 n 毫秒到達(dá)。
 */
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {
    private final long maxOutOfOrderness = 3500; // 3.5 秒
    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 發(fā)出的 watermark = 當(dāng)前最大時(shí)間戳 - 最大亂序時(shí)間
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }

}

/**
 * 該生成器生成的 watermark 滯后于處理時(shí)間固定量。它假定元素會(huì)在有限延遲后到達(dá) Flink。
 */
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {
    private final long maxTimeLag = 5000; // 5 秒

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // 處理時(shí)間場景下不需要實(shí)現(xiàn)
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
    }
}

2)、自定義標(biāo)記 Watermark 生成器

標(biāo)記 watermark 生成器觀察流事件數(shù)據(jù)并在獲取到帶有 watermark 信息的特殊事件元素時(shí)發(fā)出 watermark。

如下是實(shí)現(xiàn)標(biāo)記生成器的方法,當(dāng)事件帶有某個(gè)指定標(biāo)記時(shí),該生成器就會(huì)發(fā)出 watermark:

public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // onEvent 中已經(jīng)實(shí)現(xiàn)
    }
}

10、Watermark 策略與 Kafka 連接器

當(dāng)使用 Apache Kafka 連接器作為數(shù)據(jù)源時(shí),每個(gè) Kafka 分區(qū)可能有一個(gè)簡單的事件時(shí)間模式(遞增的時(shí)間戳或有界無序)。然而,當(dāng)使用 Kafka 數(shù)據(jù)源時(shí),多個(gè)分區(qū)常常并行使用,因此交錯(cuò)來自各個(gè)分區(qū)的事件數(shù)據(jù)就會(huì)破壞每個(gè)分區(qū)的事件時(shí)間模式(這是 Kafka 消費(fèi)客戶端所固有的)。

在這種情況下,你可以使用 Flink 中可識(shí)別 Kafka 分區(qū)的 watermark 生成機(jī)制。使用此特性,將在 Kafka 消費(fèi)端內(nèi)部針對每個(gè) Kafka 分區(qū)生成 watermark,并且不同分區(qū) watermark 的合并方式與在數(shù)據(jù)流 shuffle 時(shí)的合并方式相同。

例如,如果每個(gè) Kafka 分區(qū)中的事件時(shí)間戳嚴(yán)格遞增,則使用時(shí)間戳單調(diào)遞增按分區(qū)生成的 watermark 將生成完美的全局 watermark。注意,在示例中未使用 TimestampAssigner,而是使用了 Kafka 記錄自身的時(shí)間戳。

下圖展示了如何使用單 kafka 分區(qū) watermark 生成機(jī)制,以及在這種情況下 watermark 如何通過 dataflow 傳播。
Flink(七)Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn)),# Flink專欄,flink,kafka,watermaker,flink 水印,flink kafka 水印,flink 水印策略,flink 水印自定義

//kafka數(shù)據(jù)源示例,沒有使用withTimestampAssigner
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));

DataStream<MyType> stream = env.addSource(kafkaSource);

//非kafka數(shù)據(jù)源示例,使用了withTimestampAssigner
WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0);

11、算子處理 Watermark 的方式

一般情況下,在將 watermark 轉(zhuǎn)發(fā)到下游之前,需要算子對其進(jìn)行觸發(fā)的事件完全進(jìn)行處理。例如,WindowOperator 將首先計(jì)算該 watermark 觸發(fā)的所有窗口數(shù)據(jù),當(dāng)且僅當(dāng)由此 watermark 觸發(fā)計(jì)算進(jìn)而生成的所有數(shù)據(jù)被轉(zhuǎn)發(fā)到下游之后,其才會(huì)被發(fā)送到下游。換句話說,由于此 watermark 的出現(xiàn)而產(chǎn)生的所有數(shù)據(jù)元素都將在此 watermark 之前發(fā)出。

相同的規(guī)則也適用于 TwoInputStreamOperator。但是,在這種情況下,算子當(dāng)前的 watermark 會(huì)取其兩個(gè)輸入的最小值。

二、示例1:每5s統(tǒng)計(jì)一次地鐵進(jìn)站每個(gè)入口人數(shù)

每5s統(tǒng)計(jì)一次地鐵進(jìn)站每個(gè)入口人數(shù),最多接受延遲3s的數(shù)據(jù)
數(shù)據(jù)結(jié)構(gòu):進(jìn)站口、人數(shù)和進(jìn)入時(shí)間

1、實(shí)現(xiàn)

  • bean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
	private String sNo;
	private Integer userCount;
	private Long enterTime;
}

  • 實(shí)現(xiàn)
import java.time.Duration;
import java.util.Random;
import java.util.UUID;

import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan 
 * 每10s統(tǒng)計(jì)一次地鐵進(jìn)站每個(gè)入口人數(shù)
 */
public class WatermarkDemo {

	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStreamSource<Subway> subwayDS = env.addSource(new SourceFunction<Subway>() {
			private boolean flag = true;

			@Override
			public void run(SourceContext<Subway> ctx) throws Exception {
				Random random = new Random();
				while (flag) {
					String sNo = "No"+random.nextInt(3);
					int userCount = random.nextInt(100);
					// 模擬延遲數(shù)據(jù)
					long eventTime = System.currentTimeMillis() - random.nextInt(10) * 1000;
					Subway subway = new Subway(sNo, userCount, eventTime);
					System.err.println(subway + " ,格式化后時(shí)間 " + df.format(subway.getEnterTime()));
					
					ctx.collect(subway);
					Thread.sleep(1000);
				}
			}

			@Override
			public void cancel() {
				flag = false;
			}
		});

		// transformation
		// 設(shè)置watermark = 當(dāng)前最大的事件時(shí)間 - 最大允許的延遲時(shí)間或亂序時(shí)間
		SingleOutputStreamOperator<Subway> subwayWithWatermark = subwayDS
				.assignTimestampsAndWatermarks(WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定最大允許的延遲時(shí)間
				.withTimestampAssigner((subway, timestamp) -> subway.getEnterTime()));// 指定eventtime事件時(shí)間列

		// 計(jì)算窗口
		SingleOutputStreamOperator<Subway> result = subwayWithWatermark
				.keyBy(Subway::getSNo)
				.window(TumblingEventTimeWindows.of(Time.seconds(5)))
				.sum("userCount");

		// sink
		result.print();

		// execute
		env.execute();
	}

}

2、驗(yàn)證

輸出結(jié)果,按照預(yù)期計(jì)算出了結(jié)果

Subway(sNo=No0, userCount=7, enterTime=1689148937760) ,格式化后時(shí)間 16:02:17
Subway(sNo=No1, userCount=93, enterTime=1689148933782) ,格式化后時(shí)間 16:02:13
Subway(sNo=No2, userCount=21, enterTime=1689148940783) ,格式化后時(shí)間 16:02:20
10> Subway(sNo=No1, userCount=93, enterTime=1689148933782)
Subway(sNo=No0, userCount=7, enterTime=1689148936784) ,格式化后時(shí)間 16:02:16
Subway(sNo=No0, userCount=53, enterTime=1689148944788) ,格式化后時(shí)間 16:02:24
10> Subway(sNo=No0, userCount=14, enterTime=1689148937760)
Subway(sNo=No2, userCount=66, enterTime=1689148944790) ,格式化后時(shí)間 16:02:24
Subway(sNo=No2, userCount=97, enterTime=1689148944803) ,格式化后時(shí)間 16:02:24
Subway(sNo=No2, userCount=79, enterTime=1689148946807) ,格式化后時(shí)間 16:02:26
Subway(sNo=No2, userCount=83, enterTime=1689148945821) ,格式化后時(shí)間 16:02:25
Subway(sNo=No0, userCount=84, enterTime=1689148941836) ,格式化后時(shí)間 16:02:21
Subway(sNo=No2, userCount=50, enterTime=1689148947852) ,格式化后時(shí)間 16:02:27
Subway(sNo=No1, userCount=10, enterTime=1689148942864) ,格式化后時(shí)間 16:02:22
Subway(sNo=No0, userCount=20, enterTime=1689148944866) ,格式化后時(shí)間 16:02:24
Subway(sNo=No2, userCount=1, enterTime=1689148945877) ,格式化后時(shí)間 16:02:25
Subway(sNo=No0, userCount=62, enterTime=1689148953888) ,格式化后時(shí)間 16:02:33
3> Subway(sNo=No2, userCount=184, enterTime=1689148940783)
10> Subway(sNo=No0, userCount=157, enterTime=1689148944788)
3> Subway(sNo=No2, userCount=213, enterTime=1689148946807)
10> Subway(sNo=No1, userCount=10, enterTime=1689148942864)
。。。。。。

三、示例2:kafka數(shù)據(jù)源,每10s統(tǒng)計(jì)一次地鐵進(jìn)站每個(gè)入口人數(shù)

注意該示例中,發(fā)送數(shù)據(jù)時(shí)不需要eventtime,flink會(huì)以kafka發(fā)送數(shù)據(jù)的時(shí)間戳為eventtime,也不需要withTimestampAssigner指定eventtime,詳見實(shí)現(xiàn)源碼。
每10s統(tǒng)計(jì)一次地鐵進(jìn)站每個(gè)入口人數(shù),最多接受延遲3s的數(shù)據(jù)
數(shù)據(jù)結(jié)構(gòu):進(jìn)站口、人數(shù)

1、maven依賴

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_2.12</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-connector-kafka_2.12</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc_2.12</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
		</dependency>

2、實(shí)現(xiàn)

  • bean

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
	private String sNo;
	private Integer userCount;
}

  • 實(shí)現(xiàn)
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author alanchan
 *
 */
public class KafkaWatermarkDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
		// source
		// 準(zhǔn)備kafka連接參數(shù)
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "server1:9092");
		props.setProperty("group.id", "flink");
		props.setProperty("auto.offset.reset", "latest");
		props.setProperty("flink.partition-discovery.interval-millis", "5000");
		props.setProperty("enable.auto.commit", "true");
		props.setProperty("auto.commit.interval.ms", "2000");
		// 使用連接參數(shù)創(chuàng)建FlinkKafkaConsumer/kafkaSource
		FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("t_kafkasource", new SimpleStringSchema(), props);
		// 使用kafkaSource
		DataStream<Subway> subwayDS = env.addSource(kafkaSource).map(new MapFunction<String, Subway>() {

			@Override
			public Subway map(String value) throws Exception {
				String[] arr = value.split(",");

				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});

		// transformation
		// 設(shè)置watermark = 當(dāng)前最大的事件時(shí)間 - 最大允許的延遲時(shí)間或亂序時(shí)間
		SingleOutputStreamOperator<Subway> subwayWithWatermark = subwayDS
		.assignTimestampsAndWatermarks(WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定最大允許的延遲時(shí)間
		);

		// 計(jì)算窗口
		SingleOutputStreamOperator<Subway> result = subwayWithWatermark.keyBy(Subway::getSNo).window(TumblingEventTimeWindows.of(Time.seconds(10))).sum("userCount");

		// sink
		result.print();

		// execute
		env.execute();

	}

}

3、驗(yàn)證

1)、驗(yàn)證步驟

1、啟動(dòng)kafka,創(chuàng)建topic
2、啟動(dòng)應(yīng)用程序
3、在kafka命令行中輸入符合格式要求的數(shù)據(jù)
4、觀察應(yīng)用程序的控制臺(tái)輸出

2)、驗(yàn)證

啟動(dòng)kafka、創(chuàng)建topic、啟動(dòng)應(yīng)用程序不再贅述,如果不清楚的參考本人kafka專欄。
1、在kafka命令控制臺(tái)輸入數(shù)據(jù)

kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource

[alanchan@server1 onekeystart]$ kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource
>1,2
>1,3
>1,4
>1,5
>1,6
>1,7
>2,4
>2,6
>2,8
>3,6


2、觀察應(yīng)用程序中的控制臺(tái)輸出

7> Subway(sNo=1, userCount=20)
4> Subway(sNo=2, userCount=18)
7> Subway(sNo=1, userCount=7)

四、示例3:處理延遲數(shù)據(jù)超過能接受的時(shí)間,每10s統(tǒng)計(jì)一次地鐵進(jìn)站每個(gè)入口人數(shù)

每10s統(tǒng)計(jì)一次地鐵進(jìn)站每個(gè)入口人數(shù),最多接受延遲3s的數(shù)據(jù),超過可接受范圍則另外接收。
一般而言,針對延遲超過計(jì)算窗口的數(shù)據(jù)處理方式不同,視具體的情況而定。
本示例僅僅是打印出來。
數(shù)據(jù)結(jié)構(gòu):進(jìn)站口、人數(shù)和進(jìn)入時(shí)間

1、實(shí)現(xiàn)

  • bean

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
	private String sNo;
	private Integer userCount;
	private Long enterTime;

}

  • 實(shí)現(xiàn)

import java.time.Duration;
import java.util.Random;
import java.util.UUID;

import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.scala.OutputTag;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan
 *
 */
public class WatermarkLatenessDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStreamSource<Subway> subwayDS = env.addSource(new SourceFunction<Subway>() {
			private boolean flag = true;

			@Override
			public void run(SourceContext<Subway> ctx) throws Exception {
				Random random = new Random();
				while (flag) {
					String sNo = "No" + random.nextInt(3);
					int userCount = random.nextInt(100);
					// 模擬延遲數(shù)據(jù)
					long eventTime = System.currentTimeMillis() - random.nextInt(20) * 1000;
					Subway subway = new Subway(sNo, userCount, eventTime);
					System.err.println(subway + " ,格式化后時(shí)間 " + df.format(subway.getEnterTime()));

					ctx.collect(subway);
					Thread.sleep(1000);
				}
			}

			@Override
			public void cancel() {
				flag = false;
			}
		});

		// transformation
		// 設(shè)置最大允許延遲時(shí)間3s
		SingleOutputStreamOperator<Subway> orderDSWithWatermark = subwayDS.assignTimestampsAndWatermarks(
				WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((subway, timestamp) -> subway.getEnterTime())// 指定事件時(shí)間列
		);

		// 接收延遲超過允許范圍內(nèi)的數(shù)據(jù),供其他方式處理
		OutputTag<Subway> latenessData = new OutputTag<Subway>("seriousLateData", TypeInformation.of(Subway.class));

		SingleOutputStreamOperator<Subway> result1 = orderDSWithWatermark.keyBy(Subway::getSNo).window(TumblingEventTimeWindows.of(Time.seconds(10)))
				.allowedLateness(Time.seconds(3)).sideOutputLateData(latenessData).sum("userCount");

		DataStream<Subway> result2 = result1.getSideOutput(latenessData);

		// sink
		result1.print("延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù)");
		result2.print("延遲不在計(jì)算窗口內(nèi)數(shù)據(jù)");

		// execute
		env.execute();

	}

}

2、驗(yàn)證

驗(yàn)證比較簡單,就是由系統(tǒng)自己生成數(shù)據(jù),然后觀察應(yīng)用程序的控制臺(tái)的輸出是否與預(yù)期一致,經(jīng)驗(yàn)證,本示例與預(yù)期一致。

Subway(sNo=No1, userCount=44, enterTime=1689152768384) ,格式化后時(shí)間 17:06:08
Subway(sNo=No1, userCount=24, enterTime=1689152764407) ,格式化后時(shí)間 17:06:04
Subway(sNo=No1, userCount=29, enterTime=1689152774418) ,格式化后時(shí)間 17:06:14
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=68, enterTime=1689152768384)
Subway(sNo=No1, userCount=83, enterTime=1689152767430) ,格式化后時(shí)間 17:06:07
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=151, enterTime=1689152768384)
Subway(sNo=No0, userCount=19, enterTime=1689152759435) ,格式化后時(shí)間 17:05:59
延遲不在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No0, userCount=19, enterTime=1689152759435)
Subway(sNo=No1, userCount=32, enterTime=1689152760443) ,格式化后時(shí)間 17:06:00
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=183, enterTime=1689152768384)
Subway(sNo=No0, userCount=56, enterTime=1689152775459) ,格式化后時(shí)間 17:06:15
Subway(sNo=No1, userCount=12, enterTime=1689152778472) ,格式化后時(shí)間 17:06:18
Subway(sNo=No1, userCount=26, enterTime=1689152776475) ,格式化后時(shí)間 17:06:16
Subway(sNo=No0, userCount=49, enterTime=1689152772488) ,格式化后時(shí)間 17:06:12
Subway(sNo=No2, userCount=93, enterTime=1689152767500) ,格式化后時(shí)間 17:06:07
延遲不在計(jì)算窗口內(nèi)數(shù)據(jù):3> Subway(sNo=No2, userCount=93, enterTime=1689152767500)
Subway(sNo=No1, userCount=26, enterTime=1689152779502) ,格式化后時(shí)間 17:06:19
Subway(sNo=No2, userCount=63, enterTime=1689152782512) ,格式化后時(shí)間 17:06:22
Subway(sNo=No1, userCount=73, enterTime=1689152775526) ,格式化后時(shí)間 17:06:15
Subway(sNo=No0, userCount=46, enterTime=1689152783539) ,格式化后時(shí)間 17:06:23
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No0, userCount=105, enterTime=1689152775459)
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=166, enterTime=1689152774418)
Subway(sNo=No2, userCount=72, enterTime=1689152779552) ,格式化后時(shí)間 17:06:19
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):3> Subway(sNo=No2, userCount=72, enterTime=1689152779552)
Subway(sNo=No0, userCount=43, enterTime=1689152774553) ,格式化后時(shí)間 17:06:14
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No0, userCount=148, enterTime=1689152775459)
Subway(sNo=No2, userCount=1, enterTime=1689152781567) ,格式化后時(shí)間 17:06:21
Subway(sNo=No2, userCount=49, enterTime=1689152775582) ,格式化后時(shí)間 17:06:15
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):3> Subway(sNo=No2, userCount=121, enterTime=1689152779552)
Subway(sNo=No1, userCount=4, enterTime=1689152782591) ,格式化后時(shí)間 17:06:22
Subway(sNo=No2, userCount=79, enterTime=1689152778604) ,格式化后時(shí)間 17:06:18
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):3> Subway(sNo=No2, userCount=200, enterTime=1689152779552)
Subway(sNo=No2, userCount=11, enterTime=1689152794608) ,格式化后時(shí)間 17:06:34
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):3> Subway(sNo=No2, userCount=64, enterTime=1689152782512)
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=4, enterTime=1689152782591)
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No0, userCount=46, enterTime=1689152783539)
Subway(sNo=No1, userCount=47, enterTime=1689152784620) ,格式化后時(shí)間 17:06:24
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=51, enterTime=1689152782591)
Subway(sNo=No0, userCount=12, enterTime=1689152788634) ,格式化后時(shí)間 17:06:28
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No0, userCount=58, enterTime=1689152783539)
Subway(sNo=No0, userCount=71, enterTime=1689152790634) ,格式化后時(shí)間 17:06:30
Subway(sNo=No0, userCount=83, enterTime=1689152799635) ,格式化后時(shí)間 17:06:39
Subway(sNo=No0, userCount=11, enterTime=1689152799649) ,格式化后時(shí)間 17:06:39
Subway(sNo=No1, userCount=77, enterTime=1689152786650) ,格式化后時(shí)間 17:06:26
延遲不在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=77, enterTime=1689152786650)
Subway(sNo=No0, userCount=6, enterTime=1689152802662) ,格式化后時(shí)間 17:06:42
Subway(sNo=No1, userCount=87, enterTime=1689152791668) ,格式化后時(shí)間 17:06:31

以上,詳細(xì)介紹了eventtime和watermark,包括watermark的Flink自帶的api實(shí)現(xiàn)與自定義的實(shí)現(xiàn),同時(shí)以三個(gè)示例來展示watermark的實(shí)際可能的應(yīng)用場景。文章來源地址http://www.zghlxwxcb.cn/news/detail-614106.html

到了這里,關(guān)于Flink(七)Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn))的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 41、Flink之Hive 方言介紹及詳細(xì)示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月10日
    瀏覽(21)
  • 【flink番外篇】1、flink的23種常用算子介紹及詳細(xì)示例(完整版)

    【flink番外篇】1、flink的23種常用算子介紹及詳細(xì)示例(完整版)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月05日
    瀏覽(36)
  • 10、Flink的source、transformations、sink的詳細(xì)示例(二)-source和transformation示例【補(bǔ)充示例】

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月10日
    瀏覽(16)
  • 12、Flink source和sink 的 clickhouse 詳細(xì)示例

    12、Flink source和sink 的 clickhouse 詳細(xì)示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月13日
    瀏覽(18)
  • Flink(五)source、transformations、sink的詳細(xì)示例(一)

    Flink(五)source、transformations、sink的詳細(xì)示例(一)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月15日
    瀏覽(17)
  • 5、Flink 的 source、transformations、sink的詳細(xì)示例(一)

    5、Flink 的 source、transformations、sink的詳細(xì)示例(一)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月14日
    瀏覽(20)
  • 【flink番外篇】2、flink的23種算子window join 和interval join 數(shù)據(jù)傾斜、分區(qū)介紹及詳細(xì)示例(3)- 數(shù)據(jù)傾斜處理、分區(qū)示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月03日
    瀏覽(34)
  • 【flink番外篇】1、flink的23種常用算子介紹及詳細(xì)示例(2)- keyby、reduce和Aggregations

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月03日
    瀏覽(23)
  • 【flink番外篇】1、flink的23種常用算子介紹及詳細(xì)示例(1)- map、flatmap和filter

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月04日
    瀏覽(20)
  • 【flink番外篇】1、flink的23種常用算子介紹及詳細(xì)示例(3)-window、distinct、join等

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月04日
    瀏覽(26)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包