国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink-水位線的設(shè)置以及傳遞

這篇具有很好參考價(jià)值的文章主要介紹了Flink-水位線的設(shè)置以及傳遞。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

6.2 水位線

6.2.1 概述

  1. 分類
  • 有序流

Flink-水位線的設(shè)置以及傳遞

  • 無(wú)序流
    Flink-水位線的設(shè)置以及傳遞
    判斷的時(shí)間延遲
  1. 延遲時(shí)間判定

6.2.2 水位線的設(shè)置

  1. 分析

Flink-水位線的設(shè)置以及傳遞
DataStream下的assignTimstampsAndWatermarks方法,返回SingleOutputStreamOperator本質(zhì)還是個(gè)算子,傳入的參數(shù)是WatermarkStrategy的生成策略

Flink-水位線的設(shè)置以及傳遞
但是WatermarkStrategy是一個(gè)接口

  • 有序流

Flink-水位線的設(shè)置以及傳遞

因此調(diào)用靜態(tài)方法forMonotonousTimeStamps后new AscendingTimestampsWatermarks返回WatermarkGernerator
Flink-水位線的設(shè)置以及傳遞

AscendingTimestampsWatermarks這個(gè)繼承自BoundOutOfOrdernessWatermarks

Flink-水位線的設(shè)置以及傳遞
Flink-水位線的設(shè)置以及傳遞
Flink-水位線的設(shè)置以及傳遞

BoundOutOfOrdernessWatermarks這個(gè)類有onEvent和onPeriodicEmit這兩方法,因?yàn)閷?shí)現(xiàn)了WatermarkGenerator這個(gè)接口

Flink-水位線的設(shè)置以及傳遞

然后在調(diào)用接口中的默認(rèn)方法withTimestampAssigner得到返回WatermarkStrategy,參數(shù)是new SerializableTimestampAssigner的對(duì)象,重寫(xiě)extractTimestamp方法,這個(gè)方法作用是怎么樣從數(shù)據(jù)里面提取時(shí)間戳

Flink-水位線的設(shè)置以及傳遞

  • 亂序流

Flink-水位線的設(shè)置以及傳遞
因此調(diào)用靜態(tài)方法forBoundedOutOfOrderness(參數(shù)為最大亂序程度,也就是延遲時(shí)間)后new BoundOutOfOrdernessWatermarks返回 WatermarkGernerator

Flink-水位線的設(shè)置以及傳遞

BoundOutOfOrdernessWatermarks這個(gè)類有onEvent和onPeriodicEmit這兩方法,因?yàn)閷?shí)現(xiàn)了WatermarkGenerator這個(gè)接口(跟上面一樣了)

Flink-水位線的設(shè)置以及傳遞

后面也跟有序一樣,然后在調(diào)用接口中的默認(rèn)方法withTimestampAssigner得到返回WatermarkStrategy

  • 關(guān)系圖
    Flink-水位線的設(shè)置以及傳遞
  1. 完整代碼
public class WatermarkTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.輸入
        SingleOutputStreamOperator<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L))
            
//                //有序流的watermark生成
//                //forMonotonousTimestamps前指定泛型
//                .assignTimestampsAndWatermarks(WatermarkStrategy
//                        .<Event>forMonotonousTimestamps()//得到WatermarkGenerator
//                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {//返回WatermarkStrategy
//                            @Override
//                            //參數(shù)是當(dāng)前傳過(guò)來(lái)的數(shù)據(jù)element,另一個(gè)傳出的recordTimestamp是時(shí)間戳
//                            public long extractTimestamp(Event element, long recordTimestamp) {
//                                return element.timestamp;
//                            }
//                        })
//                )
            .assignTimestampsAndWatermarks(WatermarkStrategy
                    //forMonotonousTimestamps前指定泛型
                    //forMonotonousTimestamps參數(shù)是最大亂序時(shí)間
                    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))//得到WatermarkGenerator
                    .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                        @Override
                        public long extractTimestamp(Event element, long recordTimestamp) {
                            return element.timestamp;
                        }
                    })
            );
        env.execute();
    }
}

6.2.3 自定義水位線

  1. 分析

Flink-水位線的設(shè)置以及傳遞

或者直接new 一個(gè)接口WatermarkStrategy重寫(xiě)createWatermarkGenerator的watermark生成器的方法(生成WatermarkGenerator)以及createTimeStampAssigner提取時(shí)間戳分配器的方法(生成TimeStampAssigner)創(chuàng)建watermark

Flink-水位線的設(shè)置以及傳遞

Flink-水位線的設(shè)置以及傳遞

Flink-水位線的設(shè)置以及傳遞

Flink-水位線的設(shè)置以及傳遞

WatermarkGenerator是個(gè)接口,有兩個(gè)方法分別是onEvent方法,主要目標(biāo)是要發(fā)出一個(gè)WatermarkOutput,另一個(gè)是onperiodicEmit方法,表示周期性的生成,周期性生成時(shí)間默認(rèn)是2秒,env調(diào)用getConfig后調(diào)用setAutoWatermarkInterval后可以更改周期性生成時(shí)間

Flink-水位線的設(shè)置以及傳遞
Flink-水位線的設(shè)置以及傳遞

WatermarkOutput也是一個(gè)接口,調(diào)用emitWatermark就能發(fā)出一個(gè)watermark,

Flink-水位線的設(shè)置以及傳遞

Flink-水位線的設(shè)置以及傳遞

除了WatermarkGenerator接口還有TimeStampAssigner也是個(gè)接口,里面只有一個(gè)方法叫做extractTimestamp,目的是從當(dāng)前數(shù)據(jù)提取時(shí)間戳,同時(shí)也會(huì)作為WatermarkGenerator這個(gè)接口中onEvent方法中傳入的參數(shù)eventTimestamp時(shí)間戳

  • 關(guān)系圖
    Flink-水位線的設(shè)置以及傳遞
    這圖估計(jì)也就我自己能看的懂了。。。
  1. 代碼
  • 正常水位線
// 自定義水位線的產(chǎn)生
public class CustomWatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
                .print();
        env.execute();
    }
    //內(nèi)部靜態(tài)類
    public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
        @Override
        //createTimestampAssigner方法生成TimeStampAssigner
        public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new SerializableTimestampAssigner<Event>() {
                @Override
                //extractTimestamp,目的是從當(dāng)前數(shù)據(jù)提取時(shí)間戳
                public long extractTimestamp(Event element, long recordTimestamp)
                {
                    return element.timestamp; // 告訴程序數(shù)據(jù)源里的時(shí)間戳是哪一個(gè)字段
                }
            };
        }
        @Override
        //createWatermarkGenerator生成WatermarkGenerator
        public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new CustomPeriodicGenerator();
        }
    }
    //CustomPeriodicGenerator實(shí)現(xiàn)WatermarkGenerator接口,并重寫(xiě)方法
    public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
        private Long delayTime = 5000L; // 延遲時(shí)間
        private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 觀察到的最大時(shí)間戳
        @Override
        //更新當(dāng)前時(shí)間戳,這邊不發(fā)送水位線,目的是保存時(shí)間戳
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput
                output) {
            // 每來(lái)一條數(shù)據(jù)就調(diào)用一次
            maxTs = Math.max(event.timestamp, maxTs); // 更新最大時(shí)間戳
        }
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // 發(fā)射水位線,默認(rèn) 200ms 調(diào)用一次
            //-1毫秒都是為了貼切窗口閉合的時(shí)候左閉右開(kāi)設(shè)計(jì)
            output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
        }
    }
}

  • 斷點(diǎn)水位線

在onevent根據(jù)條件觸發(fā),onPeriodicEmit這個(gè)方法中就不用做了

    public class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {
        @Override
        public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {
        // 只有在遇到特定的 itemId 時(shí),才發(fā)出水位線
            if (r.user.equals("Mary")) {
                output.emitWatermark(new Watermark(r.timestamp - 1));
            }
        }
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // 不需要做任何事情,因?yàn)槲覀冊(cè)?onEvent 方法中發(fā)射了水位線
        }
    }

  • 在自定義數(shù)據(jù)源中發(fā)送水位線

使用 collectWithTimestamp 方法將數(shù)據(jù)發(fā)送出去,原來(lái)直接out.collect()的

Flink-水位線的設(shè)置以及傳遞

參數(shù)是當(dāng)前數(shù)據(jù)還有當(dāng)前數(shù)據(jù)的時(shí)間戳,跟水位線生成中extractTimestamp(Event element, long recordTimestamp)這個(gè)類似,也是一個(gè)數(shù)據(jù)是什么,一個(gè)時(shí)間戳是啥

然后發(fā)送水位線,用emitWatermark方法生成

public class CustomWatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(new ClickSourceWithWatermark()).print();
        env.execute();
    }
    // 泛型是數(shù)據(jù)源中的類型
    public static class ClickSourceWithWatermark implements SourceFunction<Event>
    {
        private boolean running = true;
        @Override
        public void run(SourceFunction.SourceContext<Event> sourceContext) throws Exception {
            Random random = new Random();
            String[] userArr = {"Mary", "Bob", "Alice"};
            String[] urlArr = {"./home", "./cart", "./prod?id=1"};
            while (running) {
                long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒時(shí)間戳
                String username = userArr[random.nextInt(userArr.length)];
                String url = urlArr[random.nextInt(urlArr.length)];
                Event event = new Event(username, url, currTs);
                // 使用 collectWithTimestamp 方法將數(shù)據(jù)發(fā)送出去,并指明數(shù)據(jù)中的時(shí)間戳的字段
                sourceContext.collectWithTimestamp(event, event.timestamp);
                // 發(fā)送水位線
                sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
                Thread.sleep(1000L);
            }
        }
        @Override
        public void cancel() {
            running = false;
        }
    }
}

6.2.4 水位線的傳遞

針對(duì)多個(gè)分區(qū),上游需要告訴下游水位線情況,采用的是廣播的方式給所有下游子任務(wù)

但是上游如果也是并行的,向下傳輸?shù)乃痪€可能有多個(gè),以上游發(fā)過(guò)來(lái)最小的時(shí)鐘為準(zhǔn),并且下游會(huì)有一個(gè)分區(qū)專門(mén)保存上游發(fā)過(guò)來(lái)的水位線最小的數(shù)據(jù)

Flink-水位線的設(shè)置以及傳遞

Flink-水位線的設(shè)置以及傳遞文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-413700.html

到了這里,關(guān)于Flink-水位線的設(shè)置以及傳遞的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • flink水位線

    flink水位線

    目錄 一、什么是水位線 1》有序流中的水位線 2》亂序流中的水位線 3》水位線特性 二、水位線和窗口的工作原理 1》窗口 三、 生成水位線 1》生成水位線的總體原則 2》水位線生成策略 3》?Flink內(nèi)置水位線 四、自定義水位線生成器 1》周期性水位線生成器(Periodic Generator)

    2024年04月23日
    瀏覽(20)
  • 【FLink】水位線(Watermark)

    【FLink】水位線(Watermark)

    目錄 1、關(guān)于時(shí)間語(yǔ)義 1.1事件時(shí)間 1.2處理時(shí)間?編輯 2、什么是水位線 2.1 順序流和亂序流 2.2亂序數(shù)據(jù)的處理 2.3 水位線的特性 3 、水位線的生成 3.1 生成水位線的總體原則 3.2 水位線生成策略 3.3 Flink內(nèi)置水位線 3.3.1?有序流中內(nèi)置水位線設(shè)置 3.4.2?斷點(diǎn)式水位線生成器(Punc

    2024年02月21日
    瀏覽(19)
  • Flink-水位線和時(shí)間語(yǔ)義

    Flink-水位線和時(shí)間語(yǔ)義

    在實(shí)際應(yīng)用中,事件時(shí)間語(yǔ)義會(huì)更為常見(jiàn)。一般情況下,業(yè)務(wù)日志數(shù)據(jù)中都會(huì)記錄數(shù)據(jù)生成的時(shí)間戳(timestamp),它就可以作為事件時(shí)間的判斷基礎(chǔ)。 在Flink中,由于處理時(shí)間比較簡(jiǎn)單,早期版本默認(rèn)的時(shí)間語(yǔ)義是處理時(shí)間;而考慮到事件時(shí)間在實(shí)際應(yīng)用中更為廣泛,從Fli

    2024年02月04日
    瀏覽(34)
  • Flink-【時(shí)間語(yǔ)義、窗口、水位線】

    Flink-【時(shí)間語(yǔ)義、窗口、水位線】

    ??:可樂(lè) 可樂(lè)的生產(chǎn)日期?= 事件時(shí)間(可樂(lè)產(chǎn)生的時(shí)間); 可樂(lè)被喝的時(shí)間 = 處理時(shí)間(可樂(lè)被處理【喝掉=處理】的時(shí)間)。 機(jī)器時(shí)間:可能不準(zhǔn)確(例如:A可樂(lè)廠的時(shí)鐘比較慢,B可樂(lè)廠的時(shí)鐘比較快,但實(shí)際上B產(chǎn)生可樂(lè)的時(shí)間比A產(chǎn)生可樂(lè)的時(shí)間慢,卻被先處理了)

    2024年02月01日
    瀏覽(20)
  • 【入門(mén)Flink】- 09Flink水位線Watermark

    【入門(mén)Flink】- 09Flink水位線Watermark

    在 窗口的處理過(guò)程 中,基于數(shù)據(jù)的時(shí)間戳,自定義一個(gè) “邏輯時(shí)鐘” 。這個(gè)時(shí)鐘的時(shí)間不會(huì)自動(dòng)流逝;它的時(shí)間進(jìn)展,就是靠著新到數(shù)據(jù)的時(shí)間戳來(lái)推動(dòng)的。 用來(lái)衡量 事件時(shí)間 進(jìn)展的標(biāo)記,就被稱作 “水位線”(Watermark) 。 具體實(shí)現(xiàn)上,水位線可以看作一條 特殊的數(shù)

    2024年01月17日
    瀏覽(23)
  • Flink之Watermark水印、水位線

    在Apache Flink中,Watermark(水?。┦且环N用于處理事件時(shí)間(eventtime)的時(shí)間指示器。它模擬了事件流中事件時(shí)間進(jìn)展的概念。 事件時(shí)間是指事件實(shí)際發(fā)生的時(shí)間,在分布式流處理中經(jīng)常用于處理無(wú)序事件流。然而,由于網(wǎng)絡(luò)延遲、亂序事件的到達(dá)以及分布式處理的特點(diǎn),事件

    2024年02月08日
    瀏覽(22)
  • flink水位線傳播及任務(wù)事件時(shí)間

    flink水位線傳播及任務(wù)事件時(shí)間

    本文來(lái)講解一下flink的水位線傳播及對(duì)其對(duì)任務(wù)事件時(shí)間的影響 首先f(wàn)link是通過(guò)從源頭生成水位線記錄的方式來(lái)實(shí)現(xiàn)水位線傳播的,也就是說(shuō)水位線是嵌入在正常的記錄流中的特殊記錄,攜帶者水位線的時(shí)間戳,以下我們就通過(guò)圖片的方式來(lái)講解下水位線是如何傳播以及更新

    2024年02月16日
    瀏覽(21)
  • Flink詳解系列之五--水位線(watermark)

    Flink詳解系列之五--水位線(watermark)

    1、概念 在Flink中,水位線是一種衡量Event Time進(jìn)展的機(jī)制,用來(lái)處理實(shí)時(shí)數(shù)據(jù)中的亂序問(wèn)題的,通常是水位線和窗口結(jié)合使用來(lái)實(shí)現(xiàn)。 從設(shè)備生成實(shí)時(shí)流事件,到Flink的source,再到多個(gè)oparator處理數(shù)據(jù),過(guò)程中會(huì)受到網(wǎng)絡(luò)延遲、背壓等多種因素影響造成數(shù)據(jù)亂序。在進(jìn)行窗口處

    2024年02月13日
    瀏覽(20)
  • 【Flink】Flink 中的時(shí)間和窗口之水位線(Watermark)

    【Flink】Flink 中的時(shí)間和窗口之水位線(Watermark)

    這里先介紹一下什么是 時(shí)間語(yǔ)義 , 時(shí)間語(yǔ)義 在Flink中是一種很重要的概念,下面介紹的 水位線 就是基于 時(shí)間語(yǔ)義 來(lái)講的。 在Flink中我們提到的時(shí)間語(yǔ)義一般指的是 事件時(shí)間 和 處理時(shí)間 : 處理時(shí)間(Processing Time) ,一般指執(zhí)行處理操作的系統(tǒng)時(shí)間,也就是Flink的窗口算子

    2024年02月07日
    瀏覽(21)
  • 7.2、如何理解Flink中的水位線(Watermark)

    7.2、如何理解Flink中的水位線(Watermark)

    目錄 0、版本說(shuō)明 1、什么是水位線? 2、水位線使用場(chǎng)景? 3、設(shè)計(jì)水位線主要為了解決什么問(wèn)題? 4、怎樣在flink中生成水位線? 4.1、自定義標(biāo)記 Watermark 生成器 4.2、自定義周期性 Watermark 生成器 4.3、內(nèi)置Watermark生成器 - 有序流水位線生成器 4.4、內(nèi)置Watermark生成器 - 亂序流

    2024年02月08日
    瀏覽(19)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包