国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

7.2、如何理解Flink中的水位線(Watermark)

這篇具有很好參考價(jià)值的文章主要介紹了7.2、如何理解Flink中的水位線(Watermark)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

目錄

0、版本說(shuō)明

1、什么是水位線?

2、水位線使用場(chǎng)景?

3、設(shè)計(jì)水位線主要為了解決什么問(wèn)題?

4、怎樣在flink中生成水位線?

4.1、自定義標(biāo)記 Watermark 生成器

4.2、自定義周期性 Watermark 生成器

4.3、內(nèi)置Watermark生成器 - 有序流水位線生成器

4.4、內(nèi)置Watermark生成器 - 亂序流水位線生成器

4.5、在 讀取數(shù)據(jù)源時(shí) 添加水位線

5、水位線和窗口的關(guān)系?

6、水位線在各個(gè)算子間的傳遞

6.1、測(cè)試用例 - 不設(shè)置?withIdleness 超時(shí)時(shí)間

6.2、測(cè)試用例 - 設(shè)置?withIdleness 超時(shí)時(shí)間


0、版本說(shuō)明

? ? ? ? 開發(fā)語(yǔ)言:java1.8

? ? ? ? Flink版本:1.17

? ? ? ? 官網(wǎng)鏈接:官網(wǎng)鏈接

1、什么是水位線?

? ? ? ? Flink中水位線是一條特殊的數(shù)據(jù)(long timestamp)

????????它會(huì)以時(shí)間戳的形式作為一條標(biāo)識(shí)數(shù)據(jù)插入到數(shù)據(jù)流中

7.2、如何理解Flink中的水位線(Watermark),# Flink API 使用技巧,flink,大數(shù)據(jù)


2、水位線使用場(chǎng)景?

????????使用事件時(shí)間(EventTime)做流式計(jì)算任務(wù)時(shí),需要根據(jù)事件時(shí)間生成水位線(Watermark)

????????通過(guò)水位線來(lái)觸發(fā)窗口計(jì)算,水位線作為衡量事件時(shí)間(EventTime)進(jìn)展的標(biāo)識(shí)


3、設(shè)計(jì)水位線主要為了解決什么問(wèn)題?

????????設(shè)計(jì)水位線主要是為了解決實(shí)時(shí)流中數(shù)據(jù)亂序和遲到的問(wèn)題

????????思考:什么原因造成了數(shù)據(jù)流的亂序呢?

? ? ? ? ? ? ? ? 如今數(shù)據(jù)采集、數(shù)據(jù)傳輸大多都在分布式系統(tǒng)中完成

????????????????各個(gè)機(jī)器節(jié)點(diǎn)因?yàn)榫W(wǎng)絡(luò)和自身性能的原因 導(dǎo)致了數(shù)據(jù)的亂序和遲到


4、怎樣在flink中生成水位線?

????????Flink中支持在 數(shù)據(jù)源和普通DataStream上添加水位線生成策略(WatermarkStrategy)

7.2、如何理解Flink中的水位線(Watermark),# Flink API 使用技巧,flink,大數(shù)據(jù)

4.1、自定義標(biāo)記 Watermark 生成器

標(biāo)記 Watermark 生成器特點(diǎn):

????????每條數(shù)據(jù)到來(lái)后,都會(huì)為其生成一條?Watermark

適用場(chǎng)景:

????????數(shù)據(jù)量小且數(shù)據(jù)有序

代碼示例:????????

Step1:自定義 標(biāo)記水位線生成器 實(shí)現(xiàn)類

// 自定義 標(biāo)記水位線生成器 實(shí)現(xiàn)類
public class PeriodWatermarkGenerator<T> implements WatermarkGenerator<T> {

    // 每進(jìn)入一條數(shù)據(jù),都會(huì)調(diào)用一次 onEvent 方法
    @Override
    /*
     * 參數(shù)說(shuō)明:
     *   @event : 進(jìn)入到該方法的事件數(shù)據(jù)
     *   @eventTimestamp : 時(shí)間戳提取器提取的時(shí)間戳
     * */
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        //發(fā)射水位線
        output.emitWatermark(new Watermark(eventTimestamp));
    }

    // 不需要實(shí)現(xiàn)
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
    }
}

Step2:自定義 標(biāo)記性水位線生成策略 實(shí)現(xiàn)類

// TODO 自定義 標(biāo)記性水位線生成策略
public class PeriodWatermarkStrategy implements WatermarkStrategy<Tuple2<String, Long>> {
    // TODO 實(shí)例化一個(gè) 事件時(shí)間提取器
    @Override
    public TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        TimestampAssigner<Tuple2<String, Long>> timestampAssigner = new TimestampAssigner<Tuple2<String, Long>>() {

            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        };
        return timestampAssigner;
    }

    // TODO 實(shí)例化一個(gè) watermark 生成器
    @Override
    public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new PeriodWatermarkGenerator<>();
    }
}

Step3:使用?標(biāo)記性水位線生成策略

// TODO 使用 自定義標(biāo)記 Watermark 生成器
public class UserPeriodWatermarkStrategy {
    public static void main(String[] args) throws Exception {
        // 1.獲取執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2.將socket作為數(shù)據(jù)源(開啟socket端口: nc -lk 9999)
        SingleOutputStreamOperator<Tuple2<String, Long>> sourceDataStream = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                );

        // 3.為 DataStream 添加水位線生成策略 (使用 自定義WatermarkStrategy 實(shí)現(xiàn)類)
        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(new PeriodWatermarkStrategy());

        // 4.通過(guò) processFunction實(shí)例 查看生成的水位線
        SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
        process.print();

        // 5.觸發(fā)程序執(zhí)行
        env.execute();
    }
}

查看運(yùn)行結(jié)果:

7.2、如何理解Flink中的水位線(Watermark),# Flink API 使用技巧,flink,大數(shù)據(jù)


4.2、自定義周期性 Watermark 生成器

標(biāo)記 Watermark 生成器特點(diǎn):

? ? ? ? 基于處理時(shí)間,周期性生成?Watermark

適用場(chǎng)景:

????????數(shù)據(jù)量大且可能存在一定程度數(shù)據(jù)延遲(亂序)

代碼示例:????????

Step1:自定義 周期性水位線生成器 實(shí)現(xiàn)類

// 自定義 周期性水位線生成器
public class PunctuatedWatermarkGenerator<T> implements WatermarkGenerator<T> {
    // 設(shè)置變量,用來(lái)保存 當(dāng)前最大的事件時(shí)間
    private long currentMaxTimestamp;
    // 設(shè)置變量,指定最大的亂序時(shí)間(等待時(shí)間)
    private final long maxOutOfOrderness = 0000; // 3 秒

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // 只更新當(dāng)前最大時(shí)間戳,不再發(fā)生水位線
        if (currentMaxTimestamp < eventTimestamp) currentMaxTimestamp = eventTimestamp;
    }

    // 周期性 生成水位線
    // 每個(gè) setAutoWatermarkInterval 時(shí)間,調(diào)用一次該方法
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 發(fā)出的 watermark = 當(dāng)前最大時(shí)間戳 - 最大亂序時(shí)間
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
    }
}

Setp2:自定義 周期性水位線生成策略 實(shí)現(xiàn)類

// 自定義 周期性水位線生成策略
public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<String, Long>> {
    // TODO 實(shí)例化一個(gè) 事件時(shí)間提取器
    @Override
    public TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        TimestampAssigner<Tuple2<String, Long>> timestampAssigner = new TimestampAssigner<Tuple2<String, Long>>() {

            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        };

        return timestampAssigner;
    }

    // TODO 實(shí)例化一個(gè) watermark 生成器
    @Override
    public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new PunctuatedWatermarkGenerator<>();
    }

}

Step3:周期性水位線生成策略

// TODO 使用 自定義周期性 Watermark 生成器
public class UserPunctuatedWatermarkStrategy {
    public static void main(String[] args) throws Exception {
        // 1.獲取執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO 設(shè)置周期性生成水位線的時(shí)間間隔(默認(rèn)為200毫秒)
        env.getConfig().setAutoWatermarkInterval(3 * 1000L);

        // 2.將socket作為數(shù)據(jù)源(開啟socket端口: nc -lk 9999)
        SingleOutputStreamOperator<Tuple2<String, Long>> ds = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                );

        // TODO 獲取 WatermarkStrategy實(shí)例 (方式1:通過(guò) WatermarkStrategy實(shí)現(xiàn)類獲取)
        PunctuatedWatermarkStrategy punctuatedWatermarkStrategy = new PunctuatedWatermarkStrategy();

        // TODO 獲取 WatermarkStrategy實(shí)例 (方式2:通過(guò) WatermarkStrategy工具類獲取) 推薦
        WatermarkStrategy<Tuple2<String, Long>> punctuatedWatermarkStrategyByUtil = WatermarkStrategy.<Tuple2<String, Long>>forGenerator(context -> new PunctuatedWatermarkGenerator<>())
                .withTimestampAssigner((event, timestamp) -> event.f1);

        // 3.使用 自定義水位線策略實(shí)例 來(lái)提取時(shí)間戳&生成水位線
        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(punctuatedWatermarkStrategy);

        // 4.通過(guò) processFunction實(shí)例 查看生成的水位線
        SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
        process.print();

        // 3.觸發(fā)程序執(zhí)行
        env.execute();
    }
}

查看運(yùn)行結(jié)果:

7.2、如何理解Flink中的水位線(Watermark),# Flink API 使用技巧,flink,大數(shù)據(jù)


4.3、內(nèi)置Watermark生成器 - 有序流水位線生成器

有序流水位線生成器特點(diǎn):

? ? ? ? 基于處理時(shí)間,周期性生成?Watermark,最大亂序時(shí)間為0

適用場(chǎng)景:

? ? ? ? 大數(shù)量有序流

代碼示例:

// TODO 內(nèi)置Watermark生成器 - 有序流水位線生成器
public class UserForMonotonousTimestamps {
    public static void main(String[] args) throws Exception {
        // 1.獲取執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO 設(shè)置周期性生成水位線的時(shí)間間隔(默認(rèn)為200毫秒)
        env.getConfig().setAutoWatermarkInterval(3 * 1000L);

        // 2.將socket作為數(shù)據(jù)源(開啟socket端口: nc -lk 9999)
        SingleOutputStreamOperator<Tuple2<String, Long>> sourceDataStream = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                );

        // TODO 創(chuàng)建 內(nèi)置水位線生成策略
        WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner((element,recordTimestamp) -> element.f1);

        // 3.使用 內(nèi)置水位線生成策略
        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(watermarkStrategy);

        // 4.通過(guò) processFunction實(shí)例 查看生成的水位線
        SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
        process.print();

        // 3.觸發(fā)程序執(zhí)行
        env.execute();
    }
}

查看運(yùn)行結(jié)果:

7.2、如何理解Flink中的水位線(Watermark),# Flink API 使用技巧,flink,大數(shù)據(jù)


4.4、內(nèi)置Watermark生成器 - 亂序流水位線生成器

亂序流水位線生成器特點(diǎn):

? ? ? ? 基于處理時(shí)間,周期性生成?Watermark,可以這是最大亂序時(shí)間

適用場(chǎng)景:

? ? ? ? 大數(shù)量亂序流

代碼示例:

// TODO 內(nèi)置Watermark生成器 - 亂序流水位線生成器
public class UserForBoundedOutOfOrderness {
    public static void main(String[] args) throws Exception {
        // 1.獲取執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO 設(shè)置周期性生成水位線的時(shí)間間隔(默認(rèn)為200毫秒)
        env.getConfig().setAutoWatermarkInterval(3 * 1000L);

        // 2.將socket作為數(shù)據(jù)源(開啟socket端口: nc -lk 9999)
        SingleOutputStreamOperator<Tuple2<String, Long>> ds = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                );

        // TODO 獲取 WatermarkStrategy實(shí)例
        WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)) // 設(shè)置最大亂序時(shí)間為1s
                .withTimestampAssigner((element,recordTimestamp) -> element.f1);

        // 3.使用 內(nèi)置水位線生成策略
        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(watermarkStrategy);

        // 4.通過(guò) processFunction實(shí)例 查看生成的水位線
        SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
        process.print();

        // 3.觸發(fā)程序執(zhí)行
        env.execute();
    }
}

查看運(yùn)行結(jié)果:

7.2、如何理解Flink中的水位線(Watermark),# Flink API 使用技巧,flink,大數(shù)據(jù)


4.5、在 讀取數(shù)據(jù)源時(shí) 添加水位線

// 1.獲取執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2.創(chuàng)建 Source 對(duì)象
Source source = DataGeneratorSource、KafkaSource...

// 3.讀取 source時(shí)添加水位線
env
        .fromSource(source, WatermarkStrategy實(shí)例, "source name")   
        .print()
;

// 4.觸發(fā)程序執(zhí)行
env.execute();

5、水位線和窗口的關(guān)系?

窗口什么時(shí)候創(chuàng)建?

????????當(dāng)窗口內(nèi)的第一條數(shù)據(jù)到達(dá)時(shí)

窗口什么時(shí)候觸發(fā)計(jì)算?

????????當(dāng)閾值水位線到達(dá)窗口時(shí)

7.2、如何理解Flink中的水位線(Watermark),# Flink API 使用技巧,flink,大數(shù)據(jù)


6、水位線在各個(gè)算子間的傳遞

????????下游算子 watermark 的計(jì)算方式是取所有不同的上游并行數(shù)據(jù)源 watermark 的最小值

測(cè)試代碼:

// TODO 測(cè)試水位線的傳遞
public class TransmitWaterMark {
    public static void main(String[] args) throws Exception {
        // 1.獲取執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3); 

        // 2.將socket作為數(shù)據(jù)源(開啟socket端口: nc -lk 9999)
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

        source
                .partitionCustom(
                        new Partitioner<String>() {
                            @Override
                            public int partition(String key, int numPartitions) {
                                if (key.equals("a")) {
                                    return 0;
                                } else if (key.equals("b")) {
                                    return 1;
                                } else {
                                    return 2;
                                }
                            }
                        }, value -> value.split(",")[0]
                )
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2 map(String value) throws Exception {
                        return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                //.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .<Tuple2<String, Long>>forGenerator(new PeriodWatermarkStrategy())
                                .withTimestampAssigner((element,recordTimestamp) -> element.f1)
                                .withIdleness(Duration.ofSeconds(5))  //空閑等待5s
                )
                .process(new ShowProcessFunction()).setParallelism(1)
                .print();
        
        env.execute();
    }
}

6.1、測(cè)試用例 - 不設(shè)置?withIdleness 超時(shí)時(shí)間

現(xiàn)象:如果上游某一個(gè)子任務(wù)一直沒(méi)有數(shù)據(jù)更新,下游算子的水位線一直不會(huì)變化

7.2、如何理解Flink中的水位線(Watermark),# Flink API 使用技巧,flink,大數(shù)據(jù)

7.2、如何理解Flink中的水位線(Watermark),# Flink API 使用技巧,flink,大數(shù)據(jù)


6.2、測(cè)試用例 - 設(shè)置?withIdleness 超時(shí)時(shí)間

現(xiàn)象:如果上游某一個(gè)子任務(wù)`在指定時(shí)間內(nèi)`數(shù)據(jù)更新,下游算子的水位線將不受該子任務(wù)最小值的影響

7.2、如何理解Flink中的水位線(Watermark),# Flink API 使用技巧,flink,大數(shù)據(jù)

7.2、如何理解Flink中的水位線(Watermark),# Flink API 使用技巧,flink,大數(shù)據(jù)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-720219.html

到了這里,關(guān)于7.2、如何理解Flink中的水位線(Watermark)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink詳解系列之五--水位線(watermark)

    Flink詳解系列之五--水位線(watermark)

    1、概念 在Flink中,水位線是一種衡量Event Time進(jìn)展的機(jī)制,用來(lái)處理實(shí)時(shí)數(shù)據(jù)中的亂序問(wèn)題的,通常是水位線和窗口結(jié)合使用來(lái)實(shí)現(xiàn)。 從設(shè)備生成實(shí)時(shí)流事件,到Flink的source,再到多個(gè)oparator處理數(shù)據(jù),過(guò)程中會(huì)受到網(wǎng)絡(luò)延遲、背壓等多種因素影響造成數(shù)據(jù)亂序。在進(jìn)行窗口處

    2024年02月13日
    瀏覽(20)
  • [AIGC] 深入理解Flink中的窗口、水位線和定時(shí)器

    Apache Flink是一種流處理和批處理的混合引擎,它提供了一套豐富的APIs,以滿足不同的數(shù)據(jù)處理需求。在本文中,我們主要討論Flink中的三個(gè)核心機(jī)制:窗口(Windows)、水位線(Watermarks)和定時(shí)器(Timers)。 在流處理應(yīng)用中,一種常見(jiàn)的需求是計(jì)算某個(gè)時(shí)間范圍內(nèi)的數(shù)據(jù),這

    2024年03月27日
    瀏覽(27)
  • ES節(jié)點(diǎn)磁盤水位線cluster.routing.allocation.disk.watermark

    為了控制es節(jié)點(diǎn)磁盤寫入大小,es設(shè)置了水位線這一參數(shù),具體有兩個(gè): cluster.routing.allocation.disk.watermark.low ? (Dynamic) Controls the low watermark for disk usage. It defaults to? 85% , meaning that Elasticsearch will not allocate shards to nodes that have more than 85% disk used. It can alternatively be set to a ratio value

    2024年02月09日
    瀏覽(19)
  • Flink--8、時(shí)間語(yǔ)義、水位線(事件和窗口、水位線和窗口的工作原理、生產(chǎn)水位線、水位線的傳遞、遲到數(shù)據(jù)的處理)

    Flink--8、時(shí)間語(yǔ)義、水位線(事件和窗口、水位線和窗口的工作原理、生產(chǎn)水位線、水位線的傳遞、遲到數(shù)據(jù)的處理)

    ?????????????????????? 星光下的趕路人star的個(gè)人主頁(yè) ?????????????????????? 將自己生命力展開的人,他的存在,對(duì)別人就是愈療 1、從《星球大戰(zhàn)》說(shuō)起 為了更加清晰地說(shuō)明兩種語(yǔ)義的區(qū)別,我們來(lái)舉一個(gè)非常經(jīng)典的例

    2024年02月07日
    瀏覽(23)
  • flink生成水位線記錄方式--周期性水位線生成器

    在flink基于事件的時(shí)間處理中,水位線記錄的生成是一個(gè)很重要的環(huán)節(jié),本文就來(lái)記錄下幾種水位線記錄的生成方式的其中一種:周期性水位線生成器 1.1 BoundedOutOfOrdernessTimeStampExtractor 他會(huì)接收一個(gè)表示最大延遲的參數(shù),比如1分鐘,意味著如果到達(dá)的元素的事件時(shí)間和之前到

    2024年02月07日
    瀏覽(21)
  • flink生成水位線記錄方式--基于特殊記錄的水位線生成器

    在flink基于事件的時(shí)間處理中,水位線記錄的生成是一個(gè)很重要的環(huán)節(jié),本文就來(lái)記錄下幾種水位線記錄的生成方式的其中一種:基于特殊記錄的水位線生成器 我們發(fā)送的事件中,如果帶有某條特殊記錄的元素代表了某種進(jìn)度的標(biāo)識(shí)的話,我們可以基于這條特殊的記錄生成水

    2024年02月07日
    瀏覽(21)
  • flink水位線

    flink水位線

    目錄 一、什么是水位線 1》有序流中的水位線 2》亂序流中的水位線 3》水位線特性 二、水位線和窗口的工作原理 1》窗口 三、 生成水位線 1》生成水位線的總體原則 2》水位線生成策略 3》?Flink內(nèi)置水位線 四、自定義水位線生成器 1》周期性水位線生成器(Periodic Generator)

    2024年04月23日
    瀏覽(20)
  • Flink-水位線和時(shí)間語(yǔ)義

    Flink-水位線和時(shí)間語(yǔ)義

    在實(shí)際應(yīng)用中,事件時(shí)間語(yǔ)義會(huì)更為常見(jiàn)。一般情況下,業(yè)務(wù)日志數(shù)據(jù)中都會(huì)記錄數(shù)據(jù)生成的時(shí)間戳(timestamp),它就可以作為事件時(shí)間的判斷基礎(chǔ)。 在Flink中,由于處理時(shí)間比較簡(jiǎn)單,早期版本默認(rèn)的時(shí)間語(yǔ)義是處理時(shí)間;而考慮到事件時(shí)間在實(shí)際應(yīng)用中更為廣泛,從Fli

    2024年02月04日
    瀏覽(34)
  • Flink-【時(shí)間語(yǔ)義、窗口、水位線】

    Flink-【時(shí)間語(yǔ)義、窗口、水位線】

    ??:可樂(lè) 可樂(lè)的生產(chǎn)日期?= 事件時(shí)間(可樂(lè)產(chǎn)生的時(shí)間); 可樂(lè)被喝的時(shí)間 = 處理時(shí)間(可樂(lè)被處理【喝掉=處理】的時(shí)間)。 機(jī)器時(shí)間:可能不準(zhǔn)確(例如:A可樂(lè)廠的時(shí)鐘比較慢,B可樂(lè)廠的時(shí)鐘比較快,但實(shí)際上B產(chǎn)生可樂(lè)的時(shí)間比A產(chǎn)生可樂(lè)的時(shí)間慢,卻被先處理了)

    2024年02月01日
    瀏覽(20)
  • Flink-水位線的設(shè)置以及傳遞

    Flink-水位線的設(shè)置以及傳遞

    6.2.1 概述 分類 有序流 無(wú)序流 判斷的時(shí)間延遲 延遲時(shí)間判定 6.2.2 水位線的設(shè)置 分析 DataStream下的assignTimstampsAndWatermarks方法,返回SingleOutputStreamOperator本質(zhì)還是個(gè)算子,傳入的參數(shù)是WatermarkStrategy的生成策略 但是WatermarkStrategy是一個(gè)接口 有序流 因此調(diào)用靜態(tài)方法forMonotonousT

    2023年04月15日
    瀏覽(26)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包