国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink 源碼學習|使用 Watermark 策略(WatermarkStrategy)【v2 修訂版】

這篇具有很好參考價值的文章主要介紹了Flink 源碼學習|使用 Watermark 策略(WatermarkStrategy)【v2 修訂版】。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

使用事件時間時,需要通過 Flink API 的 WatermarkStrategy 接口配置 watermark 的生成策略。

我們將逐段來看這個 API 的各個部分。

繼承關(guān)系

Flink 使用 WatermarkStrategy<T> 接口來構(gòu)建 Watermark 策略,其中泛型 T 為輸入數(shù)據(jù)流類型。

WatermarkStrategy 接口繼承了 TimestampAssignerSupplierWatermarkGeneratorSupplier,即相當于包含了 TimestampAssignerWatermarkGenerator,具體地:

  • TimestampAssigner 用于從消息記錄中提取事件時間戳
  • WatermarkGenerator 用于根據(jù)事件時間戳生成 watermark

源碼flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L19【Github】

package org.apache.flink.api.common.eventtime;

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;
import java.time.Duration;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

@Public
public interface WatermarkStrategy<T>
     extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {

TimestampAssignerSupplierWatermarkGeneratorSupplier 均繼承了 Serializable,所以WatermarkStrategy 也繼承了 Serializable,是可序列化的。這是因為在分布式計算過程中,WatermarkStrategy 可能會在不同節(jié)點之間傳輸。

接口方法

需要實現(xiàn)或重寫的方法

WatermarkStrategy 接口中,有如下 3 個方法是需要被實現(xiàn)的,這 3 個方法分別對應(yīng)需要確定的 3 個問題:

  • createWatermarkGenerator():返回實現(xiàn)了 WatermarkGenerator 接口的對象,該對象用于根據(jù)輸入流記錄的事件時間生成 watermark
  • createTimestampAssigner():返回實現(xiàn)了 TimestampAssigner 接口的對象,該對象用于從輸入流中獲取每條記錄的事件時間
  • getAlignmentParameters():返回 WatermarkAlignmentParams 類的實例,該實例用于控制是否需要對齊不同輸入流的 watermark

源碼|Github|org.apache.flink.api.common.eventtime.WatermarkStrategy(部分)

// ------------------------------------------------------------------------
//  Methods that implementors need to implement.
// ------------------------------------------------------------------------

/** Instantiates a WatermarkGenerator that generates watermarks according to this strategy. */
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);

/**
 * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this strategy.
 */
@Override
default TimestampAssigner<T> createTimestampAssigner(
        TimestampAssignerSupplier.Context context) {
    // By default, this is {@link RecordTimestampAssigner},
    // for cases where records come out of a source with valid timestamps, for example from
    // Kafka.
    return new RecordTimestampAssigner<>();
}

/**
 * Provides configuration for watermark alignment of a maximum watermark of multiple
 * sources/tasks/partitions in the same watermark group. The group may contain completely
 * independent sources (e.g. File and Kafka).
 *
 * <p>Once configured Flink will "pause" consuming from a source/task/partition that is ahead of
 * the emitted watermark in the group by more than the maxAllowedWatermarkDrift.
 */
@PublicEvolving
default WatermarkAlignmentParams getAlignmentParameters() {
    return WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED;
}

其中,只有 createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) 方法是非默認的,這樣就可以通過 Java 的 lambda 表達式語法來實現(xiàn) WatermarkStrategy 接口。

指定 WatermarkGenerator 的方法

WatermarkStrategy 接口提供了如下 4 個直接指定 WatermarkGenerator 以實現(xiàn) WatermarkStrategy 的默認方法。因為 WatermarkStrategy 接口只有 createWatermarkGenerator 這 1 個方法沒有默認實現(xiàn),所以在實現(xiàn)上均使用了 Java 的 lambda 表達式。

  • forMonotonousTimestamps:使用 AscendingTimestampsWatermarks 生成 watermark,適用于事件時間單調(diào)遞增的場景
  • forBoundedOutOfOrderness:使用 BoundedOutOfOrdernessWatermarks 生成 watermark,適用于事件時間雖然不單調(diào)遞增,但延遲時間有限的場景
  • forGenerator:使用參數(shù)提供的 WatermarkGeneratorSupplier 生成的 watermark 生成器
  • noWatermarks:不使用 watermark

源碼|Github|org/apache/flink/api/common/eventtime/WatermarkStrategy.java:197

static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
 return (ctx) -> new AscendingTimestampsWatermarks<>();
}

static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
 return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
}

static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
 return generatorSupplier::createWatermarkGenerator;
}

static <T> WatermarkStrategy<T> noWatermarks() {
 return (ctx) -> new NoWatermarksGenerator<>();
}
指定 TimestampAssigner 的方法

WatermarkStrategy 接口中,提供了如下 3 個指定 TimestampAssigner 的默認方法。這些方法會創(chuàng)建一個新的 WatermarkStrategy 類,并將調(diào)用該方法的基礎(chǔ) WatermarkStrategy 類封裝起來;在調(diào)用新類的 createWatermarkGenerator 方法時,會使用新類中重寫的方法;在調(diào)用新類的其他方法時,會返回被封裝的基礎(chǔ)類的方法。

  • withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner):使用參數(shù)指定的 TimestampAssigner 的提供者類
  • withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner):使用參數(shù)指定的可序列化的 TimestampAssigner

源碼|Github|org/apache/flink/api/common/eventtime/WatermarkStrategy(部分)

default WatermarkStrategy<T> withTimestampAssigner(
     TimestampAssignerSupplier<T> timestampAssigner) {
 checkNotNull(timestampAssigner, "timestampAssigner");
 return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner);
}

default WatermarkStrategy<T> withTimestampAssigner(
     SerializableTimestampAssigner<T> timestampAssigner) {
 checkNotNull(timestampAssigner, "timestampAssigner");
 return new WatermarkStrategyWithTimestampAssigner<>(
         this, TimestampAssignerSupplier.of(timestampAssigner));
}
制定處理空閑數(shù)據(jù)源的方法

WatermarkStrategy 接口中,提供了 withIdleness(Duration idleTimeout) 方法用于處理存在空閑數(shù)據(jù)源的情況,其參數(shù)為標記空閑數(shù)據(jù)源的時間閾值,即當某個數(shù)據(jù)源超過 idleTimeout 沒有生產(chǎn)消息時則標記為空閑數(shù)據(jù)源。

源碼|Github|org.apache.flink.api.common.eventtime.WatermarkStrategy#withIdleness

/**
 * Creates a new enriched {@link WatermarkStrategy} that also does idleness detection in the
 * created {@link WatermarkGenerator}.
 *
 * <p>Add an idle timeout to the watermark strategy. If no records flow in a partition of a
 * stream for that amount of time, then that partition is considered "idle" and will not hold
 * back the progress of watermarks in downstream operators.
 *
 * <p>Idleness can be important if some partitions have little data and might not have events
 * during some periods. Without idleness, these streams can stall the overall event time
 * progress of the application.
 */
default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
    checkNotNull(idleTimeout, "idleTimeout");
    checkArgument(
            !(idleTimeout.isZero() || idleTimeout.isNegative()),
            "idleTimeout must be greater than zero");
    return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
}
指定 AlignmentParameters 的方法

WatermarkStrategy 接口中,還提供了如下 2 個方法,用于指定 watermark 的對齊方法。在實現(xiàn)上,這 2 個方法也與指定 TimestampAssigner 的方法類似,在新類中重寫 getAlignmentParameters() 方法。

  • withWatermarkAlignment(String watermarkGroup, Duration maxAllowedWatermarkDrift):指定 watermark 對齊策略;其中參數(shù) watermarkGroup 為 watermark 的組名,maxAllowedWatermarkDrift 為在暫停消費前允許超出 watermark 時間
  • withWatermarkAlignment(String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval):指定 watermark 對齊策略;其中新增的參數(shù) updateInterval 為 task 上報 watermark 以及調(diào)度器制定對齊 watermark 的時間間隔

源碼|Github|org/apache/flink/api/common/eventtime/WatermarkStrategy(部分)

@PublicEvolving
default WatermarkStrategy<T> withWatermarkAlignment(
        String watermarkGroup, Duration maxAllowedWatermarkDrift) {
    return withWatermarkAlignment(
            watermarkGroup,
            maxAllowedWatermarkDrift,
            WatermarksWithWatermarkAlignment.DEFAULT_UPDATE_INTERVAL);
}

@PublicEvolving
default WatermarkStrategy<T> withWatermarkAlignment(
        String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval) {
    return new WatermarksWithWatermarkAlignment<T>(
            this, watermarkGroup, maxAllowedWatermarkDrift, updateInterval);
}

整體說明

通過上述設(shè)計,使 WatermarkStrategy 接口允許使用實現(xiàn) WatermarkGenerator 方法的 Java Lambda 表達式實現(xiàn);對于已實現(xiàn)的 WatermarkStrategy 匿名類,允許通過調(diào)用指定 TimestampAssignerAlignmentParameters 的方法,來覆蓋掉這兩個方法的默認實現(xiàn)。文章來源地址http://www.zghlxwxcb.cn/news/detail-846325.html

到了這里,關(guān)于Flink 源碼學習|使用 Watermark 策略(WatermarkStrategy)【v2 修訂版】的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Flink之Watermark

    流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個過程和時間的,雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的,但是也不排除由于網(wǎng)絡(luò)、分布式等原因,導致亂序的產(chǎn)生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴格按照事件

    2024年02月10日
    瀏覽(29)
  • 【入門Flink】- 09Flink水位線Watermark

    【入門Flink】- 09Flink水位線Watermark

    在 窗口的處理過程 中,基于數(shù)據(jù)的時間戳,自定義一個 “邏輯時鐘” 。這個時鐘的時間不會自動流逝;它的時間進展,就是靠著新到數(shù)據(jù)的時間戳來推動的。 用來衡量 事件時間 進展的標記,就被稱作 “水位線”(Watermark) 。 具體實現(xiàn)上,水位線可以看作一條 特殊的數(shù)

    2024年01月17日
    瀏覽(23)
  • Flink Watermark和時間語義

    Flink Watermark和時間語義

    時間語義: EventTime :事件創(chuàng)建時間; Ingestion Time :數(shù)據(jù)進入 Flink 的時間; Processing Time :執(zhí)行操作算子的本地系統(tǒng)時間,與機器無關(guān)。不同的時間語義有不同的應(yīng)用場合,我們往往更關(guān)系事件時間 Event Time 。數(shù)據(jù)生成的時候就會自動注入時間戳, Event Time 可以從日志數(shù)據(jù)的

    2024年02月03日
    瀏覽(29)
  • 【FLink】水位線(Watermark)

    【FLink】水位線(Watermark)

    目錄 1、關(guān)于時間語義 1.1事件時間 1.2處理時間?編輯 2、什么是水位線 2.1 順序流和亂序流 2.2亂序數(shù)據(jù)的處理 2.3 水位線的特性 3 、水位線的生成 3.1 生成水位線的總體原則 3.2 水位線生成策略 3.3 Flink內(nèi)置水位線 3.3.1?有序流中內(nèi)置水位線設(shè)置 3.4.2?斷點式水位線生成器(Punc

    2024年02月21日
    瀏覽(19)
  • Flink之Watermark水印、水位線

    在Apache Flink中,Watermark(水?。┦且环N用于處理事件時間(eventtime)的時間指示器。它模擬了事件流中事件時間進展的概念。 事件時間是指事件實際發(fā)生的時間,在分布式流處理中經(jīng)常用于處理無序事件流。然而,由于網(wǎng)絡(luò)延遲、亂序事件的到達以及分布式處理的特點,事件

    2024年02月08日
    瀏覽(22)
  • 1分鐘理解Flink中Watermark機制

    本文隸屬于專欄《董工的1000個大數(shù)據(jù)技術(shù)體系》摘要,該專欄為筆者原創(chuàng),引用請注明來源,不足和錯誤之處請在評論區(qū)幫忙指出,謝謝! 目錄 前言 一、watermark是什么? 二、亂序數(shù)據(jù)處理

    2024年02月15日
    瀏覽(12)
  • 【Flink】Flink 中的時間和窗口之水位線(Watermark)

    【Flink】Flink 中的時間和窗口之水位線(Watermark)

    這里先介紹一下什么是 時間語義 , 時間語義 在Flink中是一種很重要的概念,下面介紹的 水位線 就是基于 時間語義 來講的。 在Flink中我們提到的時間語義一般指的是 事件時間 和 處理時間 : 處理時間(Processing Time) ,一般指執(zhí)行處理操作的系統(tǒng)時間,也就是Flink的窗口算子

    2024年02月07日
    瀏覽(21)
  • 深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析

    深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析

    深入理解 Flink 系列文章已完結(jié),總共八篇文章,直達鏈接: 深入理解 Flink (一)Flink 架構(gòu)設(shè)計原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容錯深入分析 深入理解 Flink (三)Flink 內(nèi)核基礎(chǔ)設(shè)施源碼級原理詳解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年01月24日
    瀏覽(50)
  • Flink詳解系列之五--水位線(watermark)

    Flink詳解系列之五--水位線(watermark)

    1、概念 在Flink中,水位線是一種衡量Event Time進展的機制,用來處理實時數(shù)據(jù)中的亂序問題的,通常是水位線和窗口結(jié)合使用來實現(xiàn)。 從設(shè)備生成實時流事件,到Flink的source,再到多個oparator處理數(shù)據(jù),過程中會受到網(wǎng)絡(luò)延遲、背壓等多種因素影響造成數(shù)據(jù)亂序。在進行窗口處

    2024年02月13日
    瀏覽(20)
  • 7.2、如何理解Flink中的水位線(Watermark)

    7.2、如何理解Flink中的水位線(Watermark)

    目錄 0、版本說明 1、什么是水位線? 2、水位線使用場景? 3、設(shè)計水位線主要為了解決什么問題? 4、怎樣在flink中生成水位線? 4.1、自定義標記 Watermark 生成器 4.2、自定義周期性 Watermark 生成器 4.3、內(nèi)置Watermark生成器 - 有序流水位線生成器 4.4、內(nèi)置Watermark生成器 - 亂序流

    2024年02月08日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包