国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

源碼解析FlinkKafkaConsumer支持周期性水位線發(fā)送

這篇具有很好參考價(jià)值的文章主要介紹了源碼解析FlinkKafkaConsumer支持周期性水位線發(fā)送。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

背景

當(dāng)flink消費(fèi)kafka的消息時(shí),我們經(jīng)常會(huì)用到FlinkKafkaConsumer進(jìn)行水位線的發(fā)送,本文就從源碼看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位線發(fā)送的流程

FlinkKafkaConsumer水位線發(fā)送

1.首先從Fetcher類開始,創(chuàng)建Fetcher類的時(shí)候會(huì)構(gòu)建一個(gè)周期性的水位線發(fā)送線程并啟動(dòng)

        // if we have periodic watermarks, kick off the interval scheduler
        if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {
            PeriodicWatermarkEmitter<T, KPH> periodicEmitter =
                    new PeriodicWatermarkEmitter<>(
                            checkpointLock,
                            subscribedPartitionStates,
                            watermarkOutputMultiplexer,
                            processingTimeProvider,
                            autoWatermarkInterval);

            periodicEmitter.start();
        }

2.隨后,PeriodicWatermarkEmitter中注冊處理時(shí)間定時(shí)器,周期性執(zhí)行

        public void start() {
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
        }

        @Override
        public void onProcessingTime(long timestamp) {

            synchronized (checkpointLock) {
                for (KafkaTopicPartitionState<?, ?> state : allPartitions) {
                    // 這里當(dāng)前算子任務(wù)消費(fèi)的kafka 分區(qū)分別記錄每個(gè)分區(qū)的水位值
                    state.onPeriodicEmit();
                }
				//這里當(dāng)前算子會(huì)把自己消費(fèi)的kafka分區(qū)的所有水位線取最小值后當(dāng)成當(dāng)前算子任務(wù)自身的水位線發(fā)送出去,注意這里是當(dāng)前算子任務(wù)級(jí)別的
                watermarkOutputMultiplexer.onPeriodicEmit();
            }

            // schedule the next watermark
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
        }
    }

3.對應(yīng)state.onPeriodicEmit();記錄每個(gè)kafka分區(qū)的水位線方法

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark();
        if (next != null) {
            output.emitWatermark(new Watermark(next.getTimestamp()));
        }
    }
其中 WatermarkOutput output.emitWatermark(new Watermark(next.getTimestamp()))代碼如下:
        public DeferredOutput(OutputState state) {
            this.state = state;
        }

        @Override
        public void emitWatermark(Watermark watermark) {
            state.setWatermark(watermark.getTimestamp());
        }
所以這里最終效果只是對應(yīng)state(kafka分區(qū)[注意,一個(gè)算子任務(wù)有可能消費(fèi)好幾個(gè)kafka分區(qū)])上設(shè)置了水位線
        /**
         * Returns true if the watermark was advanced, that is if the new watermark is larger than
         * the previous one.
         *
         * <p>Setting a watermark will clear the idleness flag.
         */
        public boolean setWatermark(long watermark) {
            this.idle = false;
            final boolean updated = watermark > this.watermark;
            // 這里也可以看出來,即使代碼里面發(fā)送了更小值的水位線,水位線也不會(huì)回退
            this.watermark = Math.max(watermark, this.watermark);
            return updated;
        }        

4.對應(yīng)算子任務(wù)組合當(dāng)前任務(wù)消費(fèi)的所有分區(qū)水位線的方法文章來源地址http://www.zghlxwxcb.cn/news/detail-718112.html

private void updateCombinedWatermark() {
        long minimumOverAllOutputs = Long.MAX_VALUE;

        boolean hasOutputs = false;
        boolean allIdle = true;
        for (OutputState outputState : watermarkOutputs) {
            if (!outputState.isIdle()) {
                minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());
                allIdle = false;
            }
            hasOutputs = true;
        }

        // if we don't have any outputs minimumOverAllOutputs is not valid, it's still
        // at its initial Long.MAX_VALUE state and we must not emit that
        // 如果算子任務(wù)不消費(fèi)任何分區(qū),它不會(huì)發(fā)出任何水位線,這里是不是就是kafka消費(fèi)者要小于kafka主題的原因所在???
        if (!hasOutputs) {
            return;
        }

        if (allIdle) {// 如果當(dāng)前算子任務(wù)處于空閑時(shí)間,標(biāo)識(shí)空閑,以便后續(xù)算子可以繼續(xù)推進(jìn)
            underlyingOutput.markIdle();
        } else if (minimumOverAllOutputs > combinedWatermark) {
            combinedWatermark = minimumOverAllOutputs;
            underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));
        }
    }```

    

到了這里,關(guān)于源碼解析FlinkKafkaConsumer支持周期性水位線發(fā)送的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • flink生成水位線記錄方式--周期性水位線生成器

    在flink基于事件的時(shí)間處理中,水位線記錄的生成是一個(gè)很重要的環(huán)節(jié),本文就來記錄下幾種水位線記錄的生成方式的其中一種:周期性水位線生成器 1.1 BoundedOutOfOrdernessTimeStampExtractor 他會(huì)接收一個(gè)表示最大延遲的參數(shù),比如1分鐘,意味著如果到達(dá)的元素的事件時(shí)間和之前到

    2024年02月07日
    瀏覽(21)
  • Abaqus CAE 2018插件使用詳解:基于周期性邊界條件定義3D幾何模型的實(shí)踐指南**

    注:這篇文章是為了幫助讀者更好地理解和使用Abaqus CAE 2018的插件來定義周期性邊界條件。所提供的信息是基于我個(gè)人的理解和實(shí)踐,如有不準(zhǔn)確或者有更好的建議,歡迎讀者們指正和交流。 一、 引言 在進(jìn)行無限或半無限域建模時(shí),周期性邊界條件可為我們提供了一種模擬

    2024年02月11日
    瀏覽(26)
  • 使用Dream3D和MATLAB從綜合構(gòu)建微結(jié)構(gòu)到創(chuàng)建具有周期性邊界條件的Abaqus輸入文件的一站式解決方案

    聲明 :本文中的所有內(nèi)容僅供學(xué)術(shù)研究和討論,不保證完全無誤。對于使用本文內(nèi)容可能產(chǎn)生的任何后果,作者不承擔(dān)任何責(zé)任。希望大家在使用時(shí),結(jié)合自己的實(shí)際情況進(jìn)行酌情調(diào)整。 當(dāng)我們面臨材料力學(xué)問題,包括材料的疲勞、斷裂和塑性等行為的仿真時(shí),一個(gè)常見的

    2024年02月10日
    瀏覽(44)
  • 為什么RIP使用UDP,OSPF使用IP,而BGP使用TCP?為什么RIP周期性地和鄰站交換路由信息而BGP卻不這樣做?

    RIP只和鄰站交換信息,使用UDP無可靠保障,但開銷小,可以滿足RIP要求; OSPF使用可靠的洪泛法,直接使用IP,靈活、開銷小; BGP需要交換整個(gè)路由表和更新信息,TCP提供可靠交付以減少帶寬消耗; RIP使用不保證可靠交付的UDP,因此必須不斷地(周期性地)和鄰站交換信息才

    2024年02月02日
    瀏覽(34)
  • 【正點(diǎn)原子STM32】RTC實(shí)時(shí)時(shí)鐘(RTC方案、BCD碼、時(shí)間戳、RTC相關(guān)寄存器和HAL庫驅(qū)動(dòng)、RTC基本配置步驟、RTC基本驅(qū)動(dòng)步驟、時(shí)間設(shè)置和讀取、RTC鬧鐘配置和RTC周期性自動(dòng)喚醒配置)

    【正點(diǎn)原子STM32】RTC實(shí)時(shí)時(shí)鐘(RTC方案、BCD碼、時(shí)間戳、RTC相關(guān)寄存器和HAL庫驅(qū)動(dòng)、RTC基本配置步驟、RTC基本驅(qū)動(dòng)步驟、時(shí)間設(shè)置和讀取、RTC鬧鐘配置和RTC周期性自動(dòng)喚醒配置)

    一、RTC簡介 二、STM32 RTC框圖介紹 2.1、STM32 F1 RTC結(jié)構(gòu)框圖 2.2、STM32 F4 / F7 / H7 RTC結(jié)構(gòu)框圖 三、RTC相關(guān)寄存器介紹 3.1、RTC基本配置步驟 3.2、RTC相關(guān)寄存器(F1) 3.3、RTC相關(guān)寄存器(F4 / F7 / H7) 四、RTC相關(guān)HAL庫驅(qū)動(dòng)介紹 4.1、RTC相關(guān)HAL庫驅(qū)動(dòng)(F1) 4.2、RTC相關(guān)HAL庫驅(qū)動(dòng)(F4 / F7 /

    2024年03月27日
    瀏覽(25)
  • 【框架源碼】Spring源碼解析之Bean生命周期流程

    【框架源碼】Spring源碼解析之Bean生命周期流程

    觀看本文前,我們先思考一個(gè)問題,什么是Spring的bean的生命周期?這也是我們在面試的時(shí)候,面試官常問的一個(gè)問題。 在沒有Spring之前,我們創(chuàng)建對象的時(shí)候,采用new的方式,當(dāng)對象不在被使用的時(shí)候,由Java的垃圾回收機(jī)制回收。 而 Spring 中的對象是 bean,bean 和普通的 J

    2024年02月09日
    瀏覽(21)
  • Spring之Bean生命周期源碼解析

    Spring之Bean生命周期源碼解析

    ClassPathBeanDefinitionScanner.java ClassPathScanningCandidateComponentProvider.java 通過組件索引尋找 這里的 componentsIndex 在初始化的時(shí)候會(huì)嘗試解析 META-INF/spring.components 文件中的配置信息 把斷點(diǎn)打在 ClassPathScanningCandidateComponentProvider 的 setResourceLoader 方法上調(diào)試可以看到堆棧 可以看到,的確

    2024年02月11日
    瀏覽(29)
  • 【深入Spring源碼解析:解密Bean的生命周期】

    Spring是Java企業(yè)級(jí)應(yīng)用開發(fā)領(lǐng)域的一顆明星,它提供了很多方便開發(fā)人員的工具和思想。在分布式系統(tǒng)中,Spring的分布式遠(yuǎn)程協(xié)作方案,比如REST、Web服務(wù)以及消息傳遞等,也是不可或缺的。 你知道嗎?在我們使用Spring時(shí),容器中存放的所有對象,在Spring啟動(dòng)的時(shí)候就完成了實(shí)

    2024年02月05日
    瀏覽(28)
  • 【Spring】Spring之Bean生命周期源碼解析

    什么是bean的生命周期 是指bean在spring中是如何生成,如何銷毀的; spring創(chuàng)建對象的過程,就是IOC(控制反轉(zhuǎn))的過程; JFR Java Flight Record,java飛行記錄,類似于飛機(jī)的黑匣子,是JVM內(nèi)置的基于事件的JDK監(jiān)控記錄框架,主要用于問題定位和持續(xù)監(jiān)控; 入口代碼: Spring啟動(dòng)的時(shí)

    2024年02月15日
    瀏覽(23)
  • 【Spring專題】Spring之Bean的生命周期源碼解析——上(掃描生成BeanDefinition)

    【Spring專題】Spring之Bean的生命周期源碼解析——上(掃描生成BeanDefinition)

    由于Spring源碼分析是一個(gè)前后聯(lián)系比較強(qiáng)的過程,而且這邊分析,也是按照代碼順序講解的,所以不了解前置知識(shí)的情況下,大概率沒辦法看懂當(dāng)前的內(nèi)容。所以,特別推薦看看我前面的文章(自上而下次序): Spring底層核心原理解析——引導(dǎo)篇【學(xué)習(xí)難度: ★★☆☆☆ 】

    2024年02月13日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包