国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

說說FLINK細(xì)粒度滑動(dòng)窗口如何處理

這篇具有很好參考價(jià)值的文章主要介紹了說說FLINK細(xì)粒度滑動(dòng)窗口如何處理。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

分析&回答

Flink的窗口機(jī)制是其底層核心之一,也是高效流處理的關(guān)鍵。Flink窗口分配的基類是WindowAssigner抽象類,下面的類圖示出了Flink能夠提供的所有窗口類型。

說說FLINK細(xì)粒度滑動(dòng)窗口如何處理,大數(shù)據(jù),flink,java,大數(shù)據(jù)

Flink窗口分為滾動(dòng)(tumbling)、滑動(dòng)(sliding)和會(huì)話(session)窗口三大類,本文要說的是滑動(dòng)窗口。

下圖示出一個(gè)典型的統(tǒng)計(jì)用戶訪問的滑動(dòng)窗口,來自官方文檔。

說說FLINK細(xì)粒度滑動(dòng)窗口如何處理,大數(shù)據(jù),flink,java,大數(shù)據(jù)

假設(shè)每兩條虛線之間代表1分鐘時(shí)間差,那么窗口大?。╯ize)就是2分鐘,滑動(dòng)步長(slide)是1分鐘。若時(shí)間特征為事件時(shí)間,代碼如下。

dataStream .keyBy("userId") .window(SlidingEventTimeWindows.of(Time.minutes(2), Time.minutes(1))); 由圖可知,當(dāng)前滑動(dòng)窗口與上一個(gè)滑動(dòng)窗口會(huì)有重疊。在窗口大小size是步長slide的2倍的情況下,(幾乎)每個(gè)DataStream元素都會(huì)處于2個(gè)窗口內(nèi)。

我們簡(jiǎn)單參考一下相關(guān)的Flink源碼,以加深理解。以下是窗口算子WindowOperator的processElement()方法的部分源碼。

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);
        boolean isSkippedElement = true;
        final K key = this.<K>getKeyedStateBackend().getCurrentKey();
 
        if (windowAssigner instanceof MergingWindowAssigner) {
            // 會(huì)話窗口的處理邏輯,略去
        } else {
            for (W window : elementWindows) {
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;
                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());
 
                triggerContext.key = key;
                triggerContext.window = window;
                TriggerResult triggerResult = triggerContext.onElement(element);
 
                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }
        // 最后是側(cè)輸出遲到數(shù)據(jù)的邏輯,略去
    }
復(fù)制代碼

該方法先調(diào)用WindowAssigner.assignWindows()方法,根據(jù)輸入元素的時(shí)間戳判斷它應(yīng)該屬于哪些窗口。接著遍歷所有窗口,將該元素加入對(duì)應(yīng)的窗口狀態(tài)(即緩存)中,并根據(jù)觸發(fā)器返回的TriggerResult決定是輸出(fire)還是清除(purge)窗口的內(nèi)容,emitWindowContents()方法會(huì)調(diào)用用戶函數(shù)。最后,還要調(diào)用registerCleanupTimer()方法注冊(cè)計(jì)時(shí)器用來在窗口徹底過期時(shí)清除窗口狀態(tài)。

以下是SlidingEventTimeWindows.assignWindows()方法的源碼。

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
            long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
            for (long start = lastStart;
                 start > timestamp - size;
                 start -= slide) {
                windows.add(new TimeWindow(start, start + size));
            }
            return windows;
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }
 
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }
復(fù)制代碼

這段代碼就不難理解了,先調(diào)用getWindowStartWithOffset()方法根據(jù)元素的時(shí)間戳計(jì)算出其窗口的起點(diǎn)時(shí)間戳,再逐次循環(huán)向后滑動(dòng),產(chǎn)生size / slide個(gè)窗口。我們可以將size / slide叫做“粒度”,亦即上述代碼中返回的Collection集合的大小。粒度越大(“細(xì)”),滑動(dòng)窗口之間的重合也越大。

代碼讀完了,有一個(gè)貌似稀松平常的需求:

以3分鐘的頻率實(shí)時(shí)計(jì)算App內(nèi)各個(gè)子模塊近24小時(shí)的PV和UV。

直覺上我們需要用粒度為1440 / 3 = 480的滑動(dòng)窗口來實(shí)現(xiàn)它,但是細(xì)粒度的滑動(dòng)窗口會(huì)帶來性能問題,有兩點(diǎn):

狀態(tài) 由代碼可知,WindowOperator內(nèi)維護(hù)了窗口本身的內(nèi)部狀態(tài)windowState(類型為InternalAppendingState)。對(duì)于一個(gè)元素,會(huì)將其寫入對(duì)應(yīng)的(key, window)二元組所圈定的狀態(tài)中??梢?,如果粒度為480,那么每個(gè)元素到來,更新windowState時(shí)都要遍歷480個(gè)窗口并寫入,開銷是非常大的。在采用HDFS/RocksDB作為狀態(tài)后端時(shí),checkpoint的瓶頸也尤其明顯。

定時(shí)器 在Flink中,定時(shí)器的實(shí)際實(shí)現(xiàn)是TimerHeapInternalTimer類,并且是用Flink自己實(shí)現(xiàn)的優(yōu)先隊(duì)列維護(hù)在堆內(nèi)存中的。而在WindowOperator中,每一個(gè)(key, window)二元組都需要注冊(cè)兩個(gè)定時(shí)器:一是觸發(fā)器注冊(cè)的定時(shí)器,用于決定窗口數(shù)據(jù)何時(shí)輸出;二是registerCleanupTimer()方法注冊(cè)的清理定時(shí)器,用于在窗口徹底過期(如allowedLateness過期)之后及時(shí)清理掉窗口的內(nèi)部狀態(tài)。細(xì)粒度滑動(dòng)窗口會(huì)造成維護(hù)的定時(shí)器增多,內(nèi)存負(fù)擔(dān)加重。

在官方文檔Windows最后一節(jié)的最后,也有如下的提醒:

Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several of each element, as explained in the Window Assigners section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.

可能有看官會(huì)問:預(yù)聚合不能解決細(xì)粒度窗口的問題嗎?答案是不能。預(yù)聚合只是讓AggregateFunction/ReduceFunction之后的數(shù)據(jù)量降低,但是進(jìn)入WindowOperator的窗口狀態(tài)的數(shù)據(jù)還是沒變的。換句話說,就算觸發(fā)器實(shí)現(xiàn)為FIRE_AND_PURGE,遍歷大量窗口并寫入狀態(tài)的開銷也是無法消除的。

扯了這么多,有解決方案嗎?

當(dāng)然是有的,辦法總比困難多。我們一般使用 滾動(dòng)窗口+在線存儲(chǔ)+讀時(shí)聚合 的思路作為workaround。簡(jiǎn)單來講就是:

棄用滑動(dòng)窗口,用長度等于原滑動(dòng)窗口步長的滾動(dòng)窗口代替; 每個(gè)滾動(dòng)窗口將其周期內(nèi)的數(shù)據(jù)做聚合,打入外部在線存儲(chǔ)(內(nèi)存數(shù)據(jù)庫如Redis,LSM-based NoSQL存儲(chǔ)如HBase); 掃描在線存儲(chǔ)中對(duì)應(yīng)時(shí)間區(qū)間(可以靈活指定)的所有行,并將計(jì)算結(jié)果返回給前端展示。 針對(duì)上面的PV/UV問題,如果采用Redis作為在線存儲(chǔ),我們可以將時(shí)間戳放在key內(nèi),并設(shè)定24小時(shí)過期時(shí)間。用數(shù)字字符串存儲(chǔ)3分鐘周期內(nèi)的PV量,用HyperLogLog存儲(chǔ)3分鐘周期內(nèi)的UV量。近24小時(shí)的PV和UV就分別可以通過簡(jiǎn)單加減和HyperLogLog的pfmerge/pfcount命令得出了。當(dāng)然,實(shí)際操作起來還是要根據(jù)需求和服務(wù)器資源而定。

喵嗚面試助手:一站式解決面試問題,你可以搜索微信小程序 [喵嗚面試助手]?或關(guān)注 [喵嗚刷題] -> 面試助手?免費(fèi)刷題。如有好的面試知識(shí)或技巧期待您的共享!文章來源地址http://www.zghlxwxcb.cn/news/detail-686696.html

到了這里,關(guān)于說說FLINK細(xì)粒度滑動(dòng)窗口如何處理的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Apache Flink連載(二十八):Flink細(xì)粒度資源管理(1)-適用場(chǎng)景和原理

    Apache Flink連載(二十八):Flink細(xì)粒度資源管理(1)-適用場(chǎng)景和原理

    ???個(gè)人主頁:IT貧道-CSDN博客 ??? 私聊博主:私聊博主加WX好友,獲取更多資料哦~ ??? 博主個(gè)人B棧地址:豹哥教你學(xué)編程的個(gè)人空間-豹哥教你學(xué)編程個(gè)人主頁-嗶哩嗶哩視頻 目錄

    2024年02月19日
    瀏覽(18)
  • 【Flink實(shí)戰(zhàn)】Flink hint更靈活、更細(xì)粒度的設(shè)置Flink sql行為與簡(jiǎn)化hive連接器參數(shù)設(shè)置

    SQL 提示(SQL Hints)是和 SQL 語句一起使用來改變執(zhí)行計(jì)劃的。本章介紹如何使用 SQL 提示來實(shí)現(xiàn)各種干預(yù)。 SQL 提示一般可以用于以下: 增強(qiáng) planner:沒有完美的 planner, SQL 提示讓用戶更好地控制執(zhí)行; 增加元數(shù)據(jù)(或者統(tǒng)計(jì)信息):如\\\"已掃描的表索引\\\"和\\\"一些混洗鍵(shu

    2024年04月25日
    瀏覽(25)
  • 記錄Flink 線上碰到j(luò)ava.lang.OutOfMemoryError: GC overhead limit exceeded如何處理?

    記錄Flink 線上碰到j(luò)ava.lang.OutOfMemoryError: GC overhead limit exceeded如何處理?

    這個(gè)問題是Flink TM內(nèi)存中我們常見的,看到這個(gè)問題我們就要想到下面這句話: 程序在垃圾回收上花了很多時(shí)間,卻收集一點(diǎn)點(diǎn)內(nèi)存,伴隨著會(huì)出現(xiàn)CPU的升高。 是不是大家出現(xiàn)這個(gè)問題都會(huì)出現(xiàn)上面這種情況呢。那我的問題出現(xiàn)如下: 發(fā)現(xiàn)JVM Heap堆內(nèi)存過高。那么堆內(nèi)存包含

    2024年02月03日
    瀏覽(22)
  • Flink流數(shù)據(jù)窗口與時(shí)間

    隨著大數(shù)據(jù)時(shí)代的到來,流處理技術(shù)變得越來越重要。流處理系統(tǒng)可以實(shí)時(shí)地處理大量數(shù)據(jù),為實(shí)時(shí)應(yīng)用提供有價(jià)值的信息。Apache Flink是一個(gè)流處理框架,它可以處理大規(guī)模的流數(shù)據(jù),并提供豐富的功能,如窗口操作、時(shí)間操作等。在本文中,我們將深入探討Flink流數(shù)據(jù)窗口

    2024年02月20日
    瀏覽(19)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink窗口函數(shù)

    前面指定了窗口的分配器, 接著我們需要來指定如何計(jì)算, 這事由window function來負(fù)責(zé). 一旦窗口關(guān)閉, window function 去計(jì)算處理窗口中的每個(gè)元素. window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一種. ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以對(duì)

    2024年02月11日
    瀏覽(37)
  • flink 最后一個(gè)窗口一直沒有新數(shù)據(jù),窗口不關(guān)閉問題

    flink 最后一個(gè)窗口一直沒有新數(shù)據(jù),窗口不關(guān)閉問題

    窗口類型:滾動(dòng)窗口 代碼: 代碼部分邏輯說明 若設(shè)置了自動(dòng)生成watermark 參數(shù),根據(jù)打印日志,設(shè)置對(duì)應(yīng)的時(shí)間(多久沒新數(shù)據(jù)寫入,觸發(fā)窗口計(jì)算) env.getConfig().setAutoWatermarkInterval(5000); 使用自定義的watermark: watermark 周期生成()的疑問: 1、默認(rèn)200ms,會(huì)連續(xù)生成4次后,

    2024年01月18日
    瀏覽(22)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink時(shí)間滾動(dòng)動(dòng)窗口

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink時(shí)間滾動(dòng)動(dòng)窗口

    在流處理應(yīng)用中,數(shù)據(jù)是連續(xù)不斷的,因此我們不可能等到所有數(shù)據(jù)都到了才開始處理。當(dāng)然我們可以每來一個(gè)消息就處理一次,但是有時(shí)我們需要做一些聚合類的處理,例如:在過去的1分鐘內(nèi)有多少用戶點(diǎn)擊了我們的網(wǎng)頁。在這種情況下,我們必須定義一個(gè)窗口,用來收集

    2024年02月11日
    瀏覽(22)
  • (增加細(xì)粒度資源管理)深入理解flink的task slot相關(guān)概念

    (增加細(xì)粒度資源管理)深入理解flink的task slot相關(guān)概念

    之前對(duì)flink的task slot的理解太淺了,重新捋一下相關(guān)知識(shí)點(diǎn) 我們知道,flink中每個(gè)TaskManager都是一個(gè)?JVM?進(jìn)程,可以在單獨(dú)的線程中執(zhí)行一個(gè)或多個(gè)?subtask(線程)。 但是TaskManager?的計(jì)算資源是有限的,并不是所有任務(wù)都可以放在同一個(gè)?TaskManager?上并行執(zhí)行。并行的任務(wù)越多

    2024年03月11日
    瀏覽(31)
  • 說說Flink雙流join

    說說Flink雙流join

    Flink雙流JOIN主要分為兩大類 一類是基于原生State的Connect算子操作 另一類是基于窗口的JOIN操作。其中基于窗口的JOIN可細(xì)分為window join和interval join兩種。 基于原生State的Connect算子操作 實(shí)現(xiàn)原理:底層原理依賴Flink的State狀態(tài)存儲(chǔ),通過將數(shù)據(jù)存儲(chǔ)到State中進(jìn)行關(guān)聯(lián)join, 最終輸出

    2024年02月10日
    瀏覽(25)
  • 說說Flink運(yùn)行模式

    說說Flink運(yùn)行模式

    1.開發(fā)者模式 ? ??在idea中運(yùn)行Flink程序的方式就是開發(fā)模式。 2.local-cluster模式 ? ??Flink中的Local-cluster(本地集群)模式,單節(jié)點(diǎn)運(yùn)行,主要用于測(cè)試, 學(xué)習(xí)。 3.Standalone模式 ????????獨(dú)立集群模式,由Flink自身提供計(jì)算資源。 4.Yarn模式 把Flink應(yīng)用提交給Yarn的ResourceManager Flin

    2024年02月10日
    瀏覽(16)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包