国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

《Flink學(xué)習(xí)筆記》——第六章 Flink的時(shí)間和窗口

這篇具有很好參考價(jià)值的文章主要介紹了《Flink學(xué)習(xí)筆記》——第六章 Flink的時(shí)間和窗口。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

6.1 時(shí)間語(yǔ)義

6.1.1 Flink中的時(shí)間語(yǔ)義

對(duì)于一臺(tái)機(jī)器而言,時(shí)間就是系統(tǒng)時(shí)間。但是Flink是一個(gè)分布式處理系統(tǒng),多臺(tái)機(jī)器“各自為政”,沒(méi)有統(tǒng)一的時(shí)鐘,各自有各自的系統(tǒng)時(shí)間。而對(duì)于并行的子任務(wù)來(lái)說(shuō),在不同的節(jié)點(diǎn),系統(tǒng)時(shí)間就會(huì)有所差異。

我們知道一個(gè)集群有JobManager,作為管理者,是不是讓它統(tǒng)一向所有 TaskManager 發(fā)送同步時(shí)鐘信號(hào)就行了呢?這也是不行的。因?yàn)榫W(wǎng)絡(luò)傳輸會(huì)有延遲,而且這延遲是不確定的,所以 JobManager 發(fā)出的同步信號(hào)無(wú)法同時(shí)到達(dá)所有節(jié)點(diǎn);想要擁有一個(gè)全局統(tǒng)一的時(shí)鐘,在分布式系統(tǒng)里是做不到的。

另一個(gè)麻煩的問(wèn)題是,在流式處理的過(guò)程中,數(shù)據(jù)是在不同的節(jié)點(diǎn)間不停流動(dòng)的,這同樣也會(huì)有網(wǎng)絡(luò)傳輸?shù)难舆t。例如,上游任務(wù)在 8 點(diǎn) 59 分 59 秒發(fā)出一條數(shù)據(jù),到下游要做窗口計(jì)算時(shí)已經(jīng)是 9 點(diǎn)01 秒了,那這條數(shù)據(jù)到底該不該被收到 8 點(diǎn)~9 點(diǎn)的窗口呢?

流式數(shù)據(jù)處理過(guò)程:事件發(fā)生->生成數(shù)據(jù)->進(jìn)入分布式消息隊(duì)列->源算子讀取->轉(zhuǎn)換算子(窗口算子)做處理->輸出算子輸出

兩個(gè)重要的時(shí)間點(diǎn):數(shù)據(jù)的產(chǎn)生時(shí)間——事件時(shí)間;轉(zhuǎn)換算子(窗口算子)處理的事件——處理時(shí)間

我們?cè)诙x窗口操作時(shí),到底以哪種時(shí)間作為衡量標(biāo)準(zhǔn),就是所謂的時(shí)間語(yǔ)義。

1、處理時(shí)間

執(zhí)行處理操作的機(jī)器的系統(tǒng)時(shí)間

2、事件時(shí)間

事件在對(duì)應(yīng)設(shè)備上發(fā)生的時(shí)間,也就是數(shù)據(jù)生成的時(shí)間

舉例:用戶在手機(jī)上點(diǎn)擊某個(gè)按鈕生成點(diǎn)擊事件,點(diǎn)擊時(shí)手機(jī)上的時(shí)間是8:59:59,數(shù)據(jù)傳送到某個(gè)節(jié)點(diǎn)進(jìn)行計(jì)算處理時(shí)的時(shí)間為9:00:01。

8:59:59——事件時(shí)間
9:00:01——處理時(shí)間

如果我們以事件時(shí)間為準(zhǔn),則這條數(shù)據(jù)屬于8——9點(diǎn),如果我們以處理時(shí)間為準(zhǔn),則這條數(shù)據(jù)屬于9——10點(diǎn)

在實(shí)際應(yīng)用中,由于分布式系統(tǒng)中網(wǎng)絡(luò)傳輸延遲的不確定性,數(shù)據(jù)達(dá)到的順序往往是亂序的。例如:我現(xiàn)在以事件時(shí)間為準(zhǔn),進(jìn)行統(tǒng)計(jì)每一個(gè)小時(shí)的點(diǎn)擊量?,F(xiàn)在有三個(gè)點(diǎn)擊事件,事件時(shí)間分別為 a——8:50:00、b——8:59:59、 c——9:00:01,但是到達(dá)時(shí)間分別為9:02:00、09:01:00、08:58:00??梢钥吹絚事件最先到達(dá)。那么當(dāng)窗口接收到c事件時(shí),c事件的事件時(shí)間是9:00:01,這時(shí)窗口認(rèn)為現(xiàn)在已經(jīng)過(guò)了9點(diǎn)了。應(yīng)該馬上統(tǒng)計(jì)8——9點(diǎn)的點(diǎn)擊量。但是a、b事件由于延遲,在c事件到達(dá)之后才到達(dá)。導(dǎo)致沒(méi)有被統(tǒng)計(jì)到8——9點(diǎn)的點(diǎn)擊量中。

所以還不能簡(jiǎn)單的使用事件時(shí)間來(lái)當(dāng)作時(shí)鐘,還需要用另外的標(biāo)志來(lái)表示事件時(shí)間的進(jìn)展。這個(gè)標(biāo)志我們稱之為“水位線”。

6.1.2 哪種語(yǔ)義更重要

兩種語(yǔ)義都有各自適用的場(chǎng)景。通常來(lái)說(shuō)處理時(shí)間是計(jì)算效率的衡量標(biāo)準(zhǔn),而事件時(shí)間更符合業(yè)務(wù)的計(jì)算邏輯。

處理時(shí)間一般用在實(shí)時(shí)性極高,但結(jié)果準(zhǔn)確性要求不高的的場(chǎng)景。事件時(shí)間語(yǔ)義是以一定延遲為代價(jià),換來(lái)了處理結(jié)果的正確性。

除了事件時(shí)間和處理時(shí)間,F(xiàn)link 還有一個(gè)“攝入時(shí)間”(Ingestion Time)的概念,它是指數(shù)據(jù)進(jìn)入 Flink 數(shù)據(jù)流的時(shí)間,也就是 Source 算子讀入數(shù)據(jù)的時(shí)間。攝入時(shí)間相當(dāng)于是事件時(shí)間和處理時(shí)間的一個(gè)中和,它是把 Source 任務(wù)的處理時(shí)間,當(dāng)作了數(shù)據(jù)的產(chǎn)生時(shí)間添加到數(shù)據(jù)里。這種時(shí)間語(yǔ)義可以保證比較好的正確性,同時(shí)又不會(huì)引入太大的延遲。它的具體行為跟事件時(shí)間非常像,可以當(dāng)作特殊的事件時(shí)間來(lái)處理。

6.2 水位線

6.2.1 事件時(shí)間和窗口

前面已經(jīng)講過(guò),一個(gè)數(shù)據(jù)產(chǎn)生的時(shí)刻,就是流處理中事件觸發(fā)的時(shí)間點(diǎn),這就是“事件時(shí)間”。有時(shí)候我們不是來(lái)一個(gè)數(shù)據(jù)就處理輸出,而是要計(jì)算一段時(shí)間內(nèi)的數(shù)。比如實(shí)時(shí)統(tǒng)計(jì)每個(gè)小時(shí)的點(diǎn)擊量。例如:8——9點(diǎn)的點(diǎn)擊量,需要等數(shù)據(jù)到齊了才能統(tǒng)計(jì)輸出。那么這個(gè)8——9點(diǎn)就是一個(gè)窗口。而這里1個(gè)小時(shí)就是窗口的大小。

6.2.2 什么是水位線

只通過(guò)事件時(shí)間來(lái)判斷是否一個(gè)窗口的數(shù)據(jù)已經(jīng)到齊是不行的。我們可以基于事件時(shí)間去自定義一個(gè)時(shí)鐘,用來(lái)表示當(dāng)前時(shí)間的進(jìn)展。例如:我們定義一個(gè)時(shí)鐘,這個(gè)時(shí)鐘的時(shí)間邏輯是比事件時(shí)間晚5分鐘。當(dāng)一個(gè)數(shù)據(jù)過(guò)來(lái),它的事件時(shí)間是9:00:00,這時(shí)窗口會(huì)認(rèn)為是8:55:00。這時(shí),窗口認(rèn)為還沒(méi)有到9點(diǎn),所以8——9點(diǎn)的窗口統(tǒng)計(jì)還不到時(shí)間。會(huì)再等等,等收到大于或等于9:05:00的數(shù)據(jù)時(shí)才會(huì)進(jìn)行統(tǒng)計(jì)。這樣如果有事件時(shí)間為8:58:00的數(shù)據(jù)在9:04:00才到來(lái)時(shí),也能夠被統(tǒng)計(jì)到8——9點(diǎn)的窗口中。因?yàn)?:04:00的窗口時(shí)間是8:59:00并沒(méi)有到9點(diǎn)。

我們定義的這個(gè)時(shí)鐘,是用來(lái)衡量事件時(shí)間進(jìn)展的,是一個(gè)邏輯時(shí)鐘。

但僅僅通過(guò)定義一個(gè)邏輯時(shí)鐘,還不夠。還存在以下問(wèn)題:

  • 當(dāng)窗口聚合時(shí),要攢一批數(shù)據(jù)才會(huì)輸出結(jié)果,那么給下游的數(shù)據(jù)就會(huì)變少,時(shí)間進(jìn)度的控制就不夠精細(xì)了
  • 數(shù)據(jù)向下游任務(wù)傳遞時(shí),一般只能傳輸給一個(gè)子任務(wù)(除廣播外),這樣其他的并行子任務(wù)的時(shí)鐘就無(wú)法推進(jìn)了,不能進(jìn)行窗口計(jì)算。

解決辦法:

? 在數(shù)據(jù)流中加入一個(gè)時(shí)鐘標(biāo)記,記錄當(dāng)前的事件時(shí)間;這個(gè)標(biāo)記可以直接廣播到下游,當(dāng)下游任務(wù)收到這個(gè)標(biāo)記,就可以更新自己的時(shí)鐘了。由于類似于水流中用來(lái)做標(biāo)志的記號(hào),在 Flink 中,這種用來(lái)衡量事件時(shí)間(Event Time)進(jìn)展的標(biāo)記,就被稱作**“水位線”(Watermark)**。

1、有序流中的水位線

在理想狀態(tài)下,數(shù)據(jù)應(yīng)該按照它們生成的先后順序、排好隊(duì)進(jìn)入流中;這樣的話我們從每個(gè)數(shù)據(jù)中提取時(shí)間戳,就可以保證總是從小到大增長(zhǎng)的,從而插入的水位線也會(huì)不斷增長(zhǎng)、事件時(shí)鐘不斷向前推進(jìn)。

實(shí)際應(yīng)用中,如果當(dāng)前數(shù)據(jù)量非常大,可能會(huì)有很多數(shù)據(jù)的時(shí)間戳是相同的,這時(shí)每來(lái)一條數(shù)據(jù)就提取時(shí)間戳、插入水位線就做了大量的無(wú)用功。而且即使時(shí)間戳不同,同時(shí)涌來(lái)的數(shù)據(jù)時(shí)間差會(huì)非常?。ū热鐜缀撩耄?,往往對(duì)處理計(jì)算也沒(méi)什么影響。所以為了提高效率,一般會(huì)每隔一段時(shí)間生成一個(gè)水位線,這個(gè)水位線的時(shí)間戳,就是當(dāng)前最新數(shù)據(jù)的時(shí)間戳。這里周期時(shí)間是指處理時(shí)間(系統(tǒng)時(shí)間),而不是事件時(shí)間

2、亂序流中的水位線

我們知道在分布式系統(tǒng)中,數(shù)據(jù)在節(jié)點(diǎn)間傳輸,會(huì)因?yàn)榫W(wǎng)絡(luò)傳輸延遲的不確定性, 導(dǎo)致順序發(fā)生改變,這就是所謂的“亂序數(shù)據(jù)”。這里所說(shuō)的“亂序”(out-of-order),是指數(shù)據(jù)的先后順序不一致,主要就是基于數(shù)據(jù)的產(chǎn)生時(shí)間而言的。

最直觀的想法自然是跟之前一樣,我們還是靠數(shù)據(jù)來(lái)驅(qū)動(dòng),每來(lái)一個(gè)數(shù)據(jù)就提取它的時(shí)間戳、插入一個(gè)水位線。不過(guò)現(xiàn)在的情況是數(shù)據(jù)亂序,所以有可能新的時(shí)間戳比之前的還小,如果直接將這個(gè)時(shí)間的水位線再插入,我們的“時(shí)鐘”就回退了——水位線就代表了時(shí)鐘,時(shí)光不能倒流,所以水位線的時(shí)間戳也不能減小。解決思路也很簡(jiǎn)單:我們插入新的水位線時(shí),要先判斷一下時(shí)間戳是否比之前的大,否則就不再生成新的水位線。

如果考慮到大量數(shù)據(jù)同時(shí)到來(lái)的處理效率,我們同樣可以周期性地生成水位線。這時(shí)只需要保存一下之前所有數(shù)據(jù)中的最大時(shí)間戳,需要插入水位線時(shí),就直接以它作為時(shí)間戳生成新的水位線。這樣做盡管可以定義出一個(gè)事件時(shí)鐘,卻也會(huì)帶來(lái)一個(gè)非常大的問(wèn)題:我們無(wú)法正確處理“遲到”的數(shù)據(jù)。為了了讓窗口能夠正確收集到遲到的數(shù)據(jù),我們也可以等上 2 秒;也就是用當(dāng)前已有數(shù)據(jù)的最大時(shí)間戳減去 2 秒,就是要插入的水位線的時(shí)間戳

如果仔細(xì)觀察,我們可以知道,這種“等 2 秒”的策略其實(shí)并不能處理所有的亂序數(shù)據(jù)。因?yàn)橛袝r(shí)候不知道最大延遲是多少。當(dāng)你設(shè)置了等10秒,但是這時(shí)有條數(shù)據(jù)晚了20秒,就會(huì)被遺漏丟棄。所以這個(gè)時(shí)候我們需要去單獨(dú)處理“遲到”的數(shù)據(jù)。后面會(huì)講解這種情況的處理。

需要注意的地方:
1.由于水位線是周期性生成的,所以插入的位置不一定是在時(shí)間戳最大的數(shù)據(jù)后面。
2.這里一個(gè)窗口所收集的數(shù)據(jù),并不是之前所有已經(jīng)到達(dá)的數(shù)據(jù)。因?yàn)閿?shù)據(jù)屬于哪個(gè)窗口,是由數(shù)據(jù)本身的時(shí)間戳決定的,一個(gè)窗口只會(huì)收集真正屬于它的那些數(shù)據(jù)。

《Flink學(xué)習(xí)筆記》——第六章 Flink的時(shí)間和窗口,# Flink,flink,學(xué)習(xí),筆記

例如上圖中盡管水位線 W(20)之前有時(shí)間戳為 22 的數(shù)據(jù)到來(lái),但是10~20 秒的窗口中也不會(huì)收集這個(gè)數(shù)據(jù),進(jìn)行計(jì)算依然可以得到正確的結(jié)果。

3、水位線的特性
  • 水位線是插入到數(shù)據(jù)流中的一個(gè)標(biāo)記,可以認(rèn)為是一個(gè)特殊的數(shù)據(jù)

  • 水位線主要的內(nèi)容是一個(gè)時(shí)間戳,用來(lái)表示當(dāng)前事件時(shí)間的進(jìn)展

  • 水位線是基于數(shù)據(jù)的時(shí)間戳生成的

  • 水位線的時(shí)間戳必須單調(diào)遞增,以確保任務(wù)的事件時(shí)間時(shí)鐘一直向前推進(jìn)

  • 水位線可以通過(guò)設(shè)置延遲,來(lái)保證正確處理亂序數(shù)據(jù)

  • 一個(gè)水位線 Watermark(t),表示在當(dāng)前流中事件時(shí)間已經(jīng)達(dá)到了時(shí)間戳 t, 這代表 t 之前的所有數(shù)據(jù)都到齊了,之后流中不會(huì)出現(xiàn)時(shí)間戳 t’ ≤ t 的數(shù)據(jù)

6.2.3 如何生成水位線

生成水位線其實(shí)就是定義我們的時(shí)鐘的邏輯。

1、水位線生成的總體原則

我們知道,完美的水位線是“絕對(duì)正確”的,也就是一個(gè)水位線一旦出現(xiàn),就表示這個(gè)時(shí)間之前的數(shù)據(jù)已經(jīng)全部到齊、之后再也不會(huì)出現(xiàn)了。而完美的東西總是可望不可及,我們只能盡量去保證水位線的正確。如果對(duì)結(jié)果正確性要求很高、想要讓窗口收集到所有數(shù)據(jù),我們?cè)撛趺醋瞿??一個(gè)字,等。由于網(wǎng)絡(luò)傳輸?shù)难舆t不確定,為了獲取所有遲到數(shù)據(jù),我們只能等待更長(zhǎng)的時(shí)間。作為籌劃全局的程序員,我們當(dāng)然不會(huì)傻傻地一直等下去。那到底等多久呢?如果等太久,那很多遲到的數(shù)據(jù)基本不會(huì)遺漏,但是程序輸出延遲會(huì)增加。如果等待時(shí)間短,那遲到的數(shù)據(jù)會(huì)被遺漏,結(jié)果的準(zhǔn)確性難以保證。

所以水位線生成的總體原則:權(quán)衡低延遲和結(jié)果正確性

常用解決方案

a.需要對(duì)相關(guān)領(lǐng)域有一定的了解了,根據(jù)業(yè)務(wù)來(lái)定奪。
b.可以單獨(dú)創(chuàng)建一個(gè) Flink 作業(yè)來(lái)監(jiān)控事件流,建立概率分布或者機(jī)器學(xué)習(xí)模型,學(xué)習(xí)事件的遲到規(guī)律。得到分布規(guī)律之后,就可以選擇置信區(qū)間來(lái)確定延遲,作為水位線的生成策略了。例如,如果得到數(shù)據(jù)的遲到時(shí)間服從μ=1,σ=1 的正態(tài)分布,那么設(shè)置水位線延遲為 3 秒,就可以保證至少 97.7%的數(shù)據(jù)可以正確處理。
c.對(duì)遲到數(shù)據(jù)單獨(dú)處理

2、水位線生成策略

在Flink的DataStream API中,有一個(gè)單獨(dú)用于生成水位線的方法。為流中的數(shù)據(jù)分配時(shí)間戳,并生成水位線。

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)

數(shù)據(jù)本身不是有時(shí)間戳嗎,為什么還要為數(shù)據(jù)分配時(shí)間戳呢?

這是因?yàn)樵嫉臅r(shí)間戳只是寫(xiě)入日志數(shù)據(jù)的一個(gè)字段,如果不提取出來(lái)并明確把它分配給數(shù)據(jù), Flink 是無(wú)法知道數(shù)據(jù)真正產(chǎn)生的時(shí)間的。

參數(shù):

watermarkStrategy: 水位線策略

WatermarkStrategy繼承了TimestampAssignerSupplier——時(shí)間分配器和WatermarkGeneratorSupplier——水位線生成器

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>

TimestampAssigner:主要負(fù)責(zé)從流中數(shù)據(jù)元素的某個(gè)字段中提取時(shí)間戳,并分配給元素。時(shí)間戳的分配是生成水位線的基礎(chǔ)。

WatermarkGenerator:主要負(fù)責(zé)按照既定的方式, 基于時(shí)間戳生成水位線。在WatermarkGenerator 接口中,主要又有兩個(gè)方法:onEvent()和 onPeriodicEmit()

onEvent:每個(gè)事件(數(shù)據(jù))到來(lái)都會(huì)調(diào)用的方法,它的參數(shù)有當(dāng)前事件、時(shí)間戳, 以及允許發(fā)出水位線的一個(gè) WatermarkOutput,可以基于事件做各種操作

onPeriodicEmit:onPeriodicEmit:周期性調(diào)用的方法,可以由 WatermarkOutput 發(fā)出水位線。周期時(shí)間為處理時(shí)間,可以調(diào)用環(huán)境配置的.setAutoWatermarkInterval()方法來(lái)設(shè)置,默認(rèn)為

200ms

3、Flink內(nèi)置水位線生成器

atermarkStrategy 這個(gè)接口是一個(gè)生成水位線策略的抽象,讓我們可以靈活地實(shí)現(xiàn)自己的需求;

Flink提供了內(nèi)置的水位線生成器(WatermarkGenerator),不僅開(kāi)箱即用簡(jiǎn)化了編程,而且也為我們自定義水位線策略提供了模板

(1)有序流

對(duì)于有序流,主要特點(diǎn)就是時(shí)間戳單調(diào)增長(zhǎng)(Monotonously Increasing Timestamps),所以永遠(yuǎn)不會(huì)出現(xiàn)遲到數(shù)據(jù)的問(wèn)題。這是周期性生成水位線的最簡(jiǎn)單的場(chǎng)景。簡(jiǎn)單來(lái)說(shuō),就是直接拿當(dāng)前最大的時(shí)間戳作為水位線就可以了

WatermarkStrategy.forMonotonousTimestamps()

(2)亂序流

由于亂序流中需要等待遲到數(shù)據(jù)到齊,所以必須設(shè)置一個(gè)固定量的延遲時(shí)間

WatermarkStrategy.forBoundedOutOfOrderness(Duration maxOutOfOrderness)

這個(gè)方法需要傳入一個(gè) maxOutOfOrderness 參數(shù),表示“最大亂序程度”,它表示數(shù)據(jù)流中亂序數(shù)據(jù)時(shí)間戳的最大差值;

4、自定義水位線

兩種:

  • 周期性水位線生成器:周期性調(diào)用的方法onPeriodicEmit()中發(fā)出水位線
  • 斷點(diǎn)式水位線生成器:在事件觸發(fā)的方法onEvent()中發(fā)出水位線

(1)周期性水位線生成器(Periodic Generator)

周期性生成器一般是通過(guò) onEvent()觀察判斷輸入的事件,而在 onPeriodicEmit()里發(fā)出水位線。

public class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
    @Override
    public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        SerializableTimestampAssigner<Event> timestampAssigner = new SerializableTimestampAssigner<Event>(){

            @Override
            public long extractTimestamp(Event event, long recordTimestamp) {
                return event.getTimestamp();
            }
        };
        return timestampAssigner;
    }

    @Override
    public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        WatermarkGenerator<Event> watermarkGenerator = new WatermarkGenerator<Event>(){
            // 延遲
            private Long delayTime = 5000L;
            // 觀察到的最大時(shí)間戳,這里+ delayTime + 1L是為了防止溢出
            private Long maxTs = Long.MIN_VALUE + delayTime + 1L;

            @Override
            public void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {
                // 更新最大時(shí)間戳
                maxTs = Math.max(event.getTimestamp(), maxTs);
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                // 發(fā)射水位線,默認(rèn) 200ms 調(diào)用一次
                watermarkOutput.emitWatermark(new Watermark(maxTs - delayTime - 1L));
            }
        };
        return watermarkGenerator;
    }
}

(2)斷點(diǎn)式水位生成器

斷點(diǎn)式生成器會(huì)不停地檢測(cè) onEvent()中的事件,當(dāng)發(fā)現(xiàn)帶有水位線信息的特殊事件時(shí), 就立即發(fā)出水位線。

public class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
    @Override
    public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        SerializableTimestampAssigner<Event> timestampAssigner = new SerializableTimestampAssigner<Event>(){

            @Override
            public long extractTimestamp(Event event, long recordTimestamp) {
                return event.getTimestamp();
            }
        };
        return timestampAssigner;
    }

    @Override
    public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        WatermarkGenerator<Event> watermarkGenerator = new WatermarkGenerator<Event>(){


            @Override
            public void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {
                // 只有在遇到特定的 itemId 時(shí),才發(fā)出水位線
                if (event.user.equals("Mary")) {
                    watermarkOutput.emitWatermark(new Watermark(event.timestamp - 1));
                }

            }

            @Override
            public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                // 不需要做任何事情
            }
        };
        return watermarkGenerator;
    }
}
5、在自定義數(shù)據(jù)源中發(fā)送水位線

我們也可以在自定義的數(shù)據(jù)源中抽取事件時(shí)間,然后發(fā)送水位線。這里要注意的是,在自定義數(shù)據(jù)源中發(fā)送了水位線以后,就不能再在程序中使用assignTimestampsAndWatermarks 方法來(lái)生成水位線了。在自定義數(shù)據(jù)源中生成水位線和在程序中使用 assignTimestampsAndWatermarks 方法生成水位線二者只能取其一。

public class ClickSource extends RichParallelSourceFunction<Event> {
    // 聲明一個(gè)布爾變量,作為控制數(shù)據(jù)生成的標(biāo)識(shí)位
    private Boolean running = true;

    Random random = new Random();
    @Override
    public void run(SourceContext ctx) throws Exception {
        // 在指定的數(shù)據(jù)集中隨機(jī)選取數(shù)據(jù)

        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home",	"./cart",	"./fav",	"./prod?id=1", "./prod?id=2"};

        while (Boolean.TRUE.equals(running)){
            Event event = new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis());

            ctx.collectWithTimestamp(event, event.getTimestamp());
            ctx.emitWatermark(new Watermark(event.timestamp - 1L));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        System.out.println("open" + getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public void close() throws Exception {
        super.close();
        System.out.println("close" + getRuntimeContext().getIndexOfThisSubtask());
    }
}
6.2.4 水位線的傳遞

我們知道水位線是數(shù)據(jù)流中插入的一個(gè)標(biāo)記,用來(lái)表示事件時(shí)間的進(jìn)展,它會(huì)隨著數(shù)據(jù)一起在任務(wù)間傳遞。如果只是直通式(forward)的傳輸,那很簡(jiǎn)單,數(shù)據(jù)和水位線都是按照本身的順序依次傳遞、依次處理的;一旦水位線到達(dá)了算子任務(wù), 那么這個(gè)任務(wù)就會(huì)將它內(nèi)部的時(shí)鐘設(shè)為這個(gè)水位線的時(shí)間戳。

在這里,“任務(wù)的時(shí)鐘”其實(shí)仍然是各自為政的,并沒(méi)有統(tǒng)一的時(shí)鐘。實(shí)際應(yīng)用中往往上下游都有多個(gè)并行子任務(wù),為了統(tǒng)一推進(jìn)事件時(shí)間的進(jìn)展,我們要求上游任務(wù)處理完水位線、時(shí)鐘改變之后,要把當(dāng)前的水位線再次發(fā)出,廣播給所有的下游子任務(wù)。這樣,后續(xù)任務(wù)就不需要依賴原始數(shù)據(jù)中的時(shí)間戳(經(jīng)過(guò)轉(zhuǎn)化處理后,數(shù)據(jù)可能已經(jīng)改變了),也可以知道當(dāng)前事件時(shí)間了

可是還有另外一個(gè)問(wèn)題,那就是在“重分區(qū)”(redistributing)的傳輸模式下,一個(gè)任務(wù)有可能會(huì)收到來(lái)自不同分區(qū)上游子任務(wù)的數(shù)據(jù)。而不同分區(qū)的子任務(wù)時(shí)鐘并不同步,所以同一時(shí)刻發(fā)給下游任務(wù)的水位線可能并不相同。這時(shí)下游任務(wù)又該聽(tīng)誰(shuí)的呢?

這就要回到水位線定義的本質(zhì)了:它表示的是“當(dāng)前時(shí)間之前的數(shù)據(jù),都已經(jīng)到齊了”。這是一種保證,告訴下游任務(wù)“只要你接到這個(gè)水位線,就代表之后我不會(huì)再給你發(fā)更早的數(shù)據(jù)了,你可以放心做統(tǒng)計(jì)計(jì)算而不會(huì)遺漏數(shù)據(jù)”。所以如果一個(gè)任務(wù)收到了來(lái)自上游并行任務(wù)的不同的水位線,說(shuō)明上游各個(gè)分區(qū)處理得有快有慢,進(jìn)度各不相同比如上游有兩個(gè)并行子任務(wù)都發(fā)來(lái)了水位線,一個(gè)是 5 秒,一個(gè)是 7 秒;這代表第一個(gè)并行任務(wù)已經(jīng)處理完 5 秒之前的

所有數(shù)據(jù),而第二個(gè)并行任務(wù)處理到了 7 秒。那這時(shí)自己的時(shí)鐘怎么確定呢?當(dāng)然也要以“這之前的數(shù)據(jù)全部到齊”為標(biāo)準(zhǔn)。如果我們以較大的水位線 7 秒作為當(dāng)前時(shí)間,那就表示“7 秒前的數(shù)據(jù)都已經(jīng)處理完”,這顯然不是事實(shí)——第一個(gè)上游分區(qū)才處理到 5 秒,5~7 秒的數(shù)據(jù)還會(huì)不停地發(fā)來(lái);而如果以最小的水位線 5 秒作為當(dāng)前時(shí)鐘就不會(huì)有這個(gè)問(wèn)題了,因?yàn)榇_實(shí)所

有上游分區(qū)都已經(jīng)處理完,不會(huì)再發(fā) 5 秒前的數(shù)據(jù)了。這讓我們想到“木桶原理”:所有的上游并行任務(wù)就像圍成木桶的一塊塊木板,它們中最短的那一塊,決定了我們桶中的水位。

《Flink學(xué)習(xí)筆記》——第六章 Flink的時(shí)間和窗口,# Flink,flink,學(xué)習(xí),筆記

我們可以用一個(gè)具體的例子,將水位線在任務(wù)間傳遞的過(guò)程完整梳理一遍。如圖 6-12 所示,當(dāng)前任務(wù)的上游,有四個(gè)并行子任務(wù),所以會(huì)接收到來(lái)自四個(gè)分區(qū)的水位線;而下游有三個(gè)并行子任務(wù),所以會(huì)向三個(gè)分區(qū)發(fā)出水位線。具體過(guò)程如下:

(1)上游并行子任務(wù)發(fā)來(lái)不同的水位線,當(dāng)前任務(wù)會(huì)為每一個(gè)分區(qū)設(shè)置一個(gè)“分區(qū)水位線”(Partition Watermark),這是一個(gè)分區(qū)時(shí)鐘;而當(dāng)前任務(wù)自己的時(shí)鐘,就是所有分區(qū)時(shí)鐘里最小的那個(gè)。

(2)當(dāng)有一個(gè)新的水位線(第一分區(qū)的 4)從上游傳來(lái)時(shí),當(dāng)前任務(wù)會(huì)首先更新對(duì)應(yīng)的分區(qū)時(shí)鐘;然后再次判斷所有分區(qū)時(shí)鐘中的最小值,如果比之前大,說(shuō)明事件時(shí)間有了進(jìn)展,當(dāng)前任務(wù)的時(shí)鐘也就可以更新了。這里要注意,更新后的任務(wù)時(shí)鐘,并不一定是新來(lái)的那個(gè)分區(qū)水位線,比如這里改變的是第一分區(qū)的時(shí)鐘,但最小的分區(qū)時(shí)鐘是第三分區(qū)的 3,于是當(dāng)前任務(wù)時(shí)鐘就推進(jìn)到了 3。當(dāng)時(shí)鐘有進(jìn)展時(shí),當(dāng)前任務(wù)就會(huì)將自己的時(shí)鐘以水位線的形式,廣播給下游所有子任務(wù)。

(3)再次收到新的水位線(第二分區(qū)的 7)后,執(zhí)行同樣的處理流程。首先將第二個(gè)分區(qū)時(shí)鐘更新為 7,然后比較所有分區(qū)時(shí)鐘;發(fā)現(xiàn)最小值沒(méi)有變化,那么當(dāng)前任務(wù)的時(shí)鐘也不變,也不會(huì)向下游任務(wù)發(fā)出水位線。

(4)同樣道理,當(dāng)又一次收到新的水位線(第三分區(qū)的 6)之后,第三個(gè)分區(qū)時(shí)鐘更新為6,同時(shí)所有分區(qū)時(shí)鐘最小值變成了第一分區(qū)的 4,所以當(dāng)前任務(wù)的時(shí)鐘推進(jìn)到 4,并發(fā)出時(shí)間戳為 4 的水位線,廣播到下游各個(gè)分區(qū)任務(wù)。水位線在上下游任務(wù)之間的傳遞,非常巧妙地避免了分布式系統(tǒng)中沒(méi)有統(tǒng)一時(shí)鐘的問(wèn)題, 每個(gè)任務(wù)都以“處理完之前所有數(shù)據(jù)”為標(biāo)準(zhǔn)來(lái)確定自己的時(shí)鐘,就可以保證窗口處理的結(jié)果總是正確的。對(duì)于有多條流合并之后進(jìn)行處理的場(chǎng)景,水位線傳遞的規(guī)則是類似的。

備注:

水位線的默認(rèn)計(jì)算公式:水位線 = 觀察到的最大事件時(shí)間 – 最大延遲時(shí)間 – 1 毫秒

在數(shù)據(jù)流開(kāi)始之前,F(xiàn)link 會(huì)插入一個(gè)大小是負(fù)無(wú)窮大(在 Java 中是-Long.MAX_VALUE) 的水位線,而在數(shù)據(jù)流結(jié)束時(shí),F(xiàn)link 會(huì)插入一個(gè)正無(wú)窮大(Long.MAX_VALUE)的水位線,保證所有的窗口閉合以及所有的定時(shí)器都被觸發(fā)。對(duì)于離線數(shù)據(jù)集,F(xiàn)link 也會(huì)將其作為流讀入,也就是一條數(shù)據(jù)一條數(shù)據(jù)的讀取。在這種情況下,F(xiàn)link 對(duì)于離線數(shù)據(jù)集,只會(huì)插入兩次水位線,也就是在最開(kāi)始處插入負(fù)無(wú)窮大的水位線,在結(jié)束位置插入一個(gè)正無(wú)窮大的水位線。因?yàn)橹恍枰迦雰纱嗡痪€,就可以保證計(jì)算的正確,無(wú)需在數(shù)據(jù)流的中間插入水位線了

6.3 窗口

我們已經(jīng)了解了 Flink 中事件時(shí)間和水位線的概念,那它們有什么具體應(yīng)用呢?當(dāng)然是做基于時(shí)間的處理計(jì)算了。其中最常見(jiàn)的場(chǎng)景,就是窗口聚合計(jì)算。

之前我們已經(jīng)了解了 Flink 中基本的聚合操作。在流處理中,我們往往需要面對(duì)的是連續(xù)不斷、無(wú)休無(wú)止的無(wú)界流,不可能等到所有所有數(shù)據(jù)都到齊了才開(kāi)始處理。所以聚合計(jì)算其實(shí)只能針對(duì)當(dāng)前已有的數(shù)據(jù)——之后再有數(shù)據(jù)到來(lái),就需要繼續(xù)疊加、再次輸出結(jié)果。這樣似乎很“實(shí)時(shí)”,但現(xiàn)實(shí)中大量數(shù)據(jù)一般會(huì)同時(shí)到來(lái),需要并行處理,這樣頻繁地更新結(jié)果就會(huì)給系統(tǒng)帶來(lái)很大負(fù)擔(dān)了。

更加高效的做法是,把無(wú)界流進(jìn)行切分,每一段數(shù)據(jù)分別進(jìn)行聚合,結(jié)果只輸出一次。這就相當(dāng)于將無(wú)界流的聚合轉(zhuǎn)化為了有界數(shù)據(jù)集的聚合,這就是所謂的“窗口”(Window)聚合操作。窗口聚合其實(shí)是對(duì)實(shí)時(shí)性和處理效率的一個(gè)權(quán)衡。在實(shí)際應(yīng)用中,我們往往更關(guān)心一段時(shí)間內(nèi)數(shù)據(jù)的統(tǒng)計(jì)結(jié)果,比如在過(guò)去的 1 分鐘內(nèi)有多少用戶點(diǎn)擊了網(wǎng)頁(yè)。在這種情況下,我們就可以定義一個(gè)窗口,收集最近一分鐘內(nèi)的所有用戶點(diǎn)擊數(shù)據(jù),然后進(jìn)行聚合統(tǒng)計(jì),最終輸出一個(gè)結(jié)果就可以了。

6.3.1 窗口的概念

所以在 Flink 中,窗口其實(shí)并不是一個(gè)“框”,流進(jìn)來(lái)的數(shù)據(jù)被框住了就只能進(jìn)這一個(gè)窗口。相比之下,我們應(yīng)該把窗口理解成一個(gè)“桶”,如圖 6-15 所示。在 Flink 中,窗口可以把流切割成有限大小的多個(gè)“存儲(chǔ)桶”(bucket);每個(gè)數(shù)據(jù)都會(huì)分發(fā)到對(duì)應(yīng)的桶中,當(dāng)?shù)竭_(dá)窗口結(jié)束時(shí)間時(shí),就對(duì)每個(gè)桶中收集的數(shù)據(jù)進(jìn)行計(jì)算處理。

《Flink學(xué)習(xí)筆記》——第六章 Flink的時(shí)間和窗口,# Flink,flink,學(xué)習(xí),筆記

我們可以梳理一下事件時(shí)間語(yǔ)義下,之前例子中窗口的處理過(guò)程:

(1)第一個(gè)數(shù)據(jù)時(shí)間戳為 2,判斷之后創(chuàng)建第一個(gè)窗口[0, 10),并將 2 秒數(shù)據(jù)保存進(jìn)去;

(2)后續(xù)數(shù)據(jù)依次到來(lái),時(shí)間戳均在 [0, 10)范圍內(nèi),所以全部保存進(jìn)第一個(gè)窗口;

(3)11 秒數(shù)據(jù)到來(lái),判斷它不屬于[0, 10)窗口,所以創(chuàng)建第二個(gè)窗口[10, 20),并將 11秒的數(shù)據(jù)保存進(jìn)去。由于水位線設(shè)置延遲時(shí)間為 2 秒,所以現(xiàn)在的時(shí)鐘是 9 秒,第一個(gè)窗口也沒(méi)有到關(guān)閉時(shí)間;

(4)之后又有 9 秒數(shù)據(jù)到來(lái),同樣進(jìn)入[0, 10)窗口中;

(5)12 秒數(shù)據(jù)到來(lái),判斷屬于[10, 20)窗口,保存進(jìn)去。這時(shí)產(chǎn)生的水位線推進(jìn)到了 10 秒,所以 [0, 10)窗口應(yīng)該關(guān)閉了。第一個(gè)窗口收集到了所有的 7 個(gè)數(shù)據(jù),進(jìn)行處理計(jì)算后輸出結(jié)果,并將窗口關(guān)閉銷(xiāo)毀;

(6)同樣的,之后的數(shù)據(jù)依次進(jìn)入第二個(gè)窗口,遇到 20 秒的數(shù)據(jù)時(shí)會(huì)創(chuàng)建第三個(gè)窗口[20,

30)并將數(shù)據(jù)保存進(jìn)去;遇到 22 秒數(shù)據(jù)時(shí),水位線達(dá)到了 20 秒,第二個(gè)窗口觸發(fā)計(jì)算,輸出結(jié)果并關(guān)閉。

這里需要注意的是,F(xiàn)link 中窗口并不是靜態(tài)準(zhǔn)備好的,而是動(dòng)態(tài)創(chuàng)建——當(dāng)有落在這個(gè)窗口區(qū)間范圍的數(shù)據(jù)達(dá)到時(shí),才創(chuàng)建對(duì)應(yīng)的窗口。另外,這里我們認(rèn)為到達(dá)窗口結(jié)束時(shí)間時(shí), 窗口就觸發(fā)計(jì)算并關(guān)閉,事實(shí)上“觸發(fā)計(jì)算”和“窗口關(guān)閉”兩個(gè)行為也可以分開(kāi)

其實(shí)就是水位線只是告知不會(huì)再收到某個(gè)時(shí)間點(diǎn)后面的數(shù)據(jù)了。比如來(lái)了個(gè)水位線W(10),說(shuō)明不會(huì)再收到小于10的數(shù)據(jù)了,盡管已經(jīng)收到了事件時(shí)間大于10的數(shù)據(jù)了,比如上圖收到12、11。但是統(tǒng)計(jì)的時(shí)候是按照事件時(shí)間去統(tǒng)計(jì)的。w(10)來(lái)的時(shí)候會(huì)把事件時(shí)間落在[0,10)的數(shù)據(jù)進(jìn)行統(tǒng)計(jì)。

6.3.2 窗口的分類
1、按照驅(qū)動(dòng)類型分類
(1)時(shí)間窗口

就是按照時(shí)間段去截取數(shù)據(jù)

時(shí)間窗口類:TimeWindow

(2)計(jì)數(shù)窗口

計(jì)數(shù)窗口基于元素的個(gè)數(shù)來(lái)截取數(shù)據(jù)

為什么不把窗口區(qū)間定義成左開(kāi)右閉、包含上結(jié)束時(shí)間呢?這樣maxTimestamp 跟 end 一致,不就可以省去一個(gè)方法的定義嗎?

答:這主要是為了方便判斷窗口什么時(shí)候關(guān)閉。對(duì)于事件時(shí)間語(yǔ)義,窗口的關(guān)閉需要水位線推進(jìn)到窗口的結(jié)束時(shí)間;而我們知道,水位線 Watermark(t)代表的含義是“時(shí)間戳小于等于 t 的數(shù)據(jù)都已到齊,不會(huì)再來(lái)了”。為了簡(jiǎn)化分析,我們先不考慮亂序流設(shè)置的延遲時(shí)間。那么當(dāng)新到一個(gè)時(shí)間戳為 t 的數(shù)據(jù)時(shí),當(dāng)前水位線的時(shí)間推進(jìn)到了 t – 1(還記得亂序流里生成水位線的減一操作嗎?)。所以當(dāng)時(shí)間戳為 end 的數(shù)據(jù)到來(lái)時(shí),水位線推進(jìn)到了 end - 1;如果我們把窗口定義為不包含 end,那么當(dāng)前的水位線剛好就是 maxTimestamp,表示窗口能夠包含的數(shù)據(jù)都已經(jīng)到齊,我們就可以直接關(guān)閉窗口了。所以有了這樣的定義,我們就不需要再去考慮那煩人的“減一”了,直接看到時(shí)間戳為 end 的數(shù)據(jù),就關(guān)閉對(duì)應(yīng)的窗口。如果為亂序流設(shè)置了水位線延遲時(shí)間 delay,也只需要等到時(shí)間戳為 end + delay 的數(shù)據(jù),就可以關(guān)窗了。

2、按照窗口分配數(shù)據(jù)的規(guī)則分類
(1)滾動(dòng)窗口

按照固定大?。梢允枪潭ǖ臅r(shí)間間隔或者是固定的數(shù)據(jù)數(shù)量),對(duì)數(shù)據(jù)進(jìn)行劃分,窗口間沒(méi)有重疊。

(2)滑動(dòng)窗口

滑動(dòng)窗口的大小也是固定的。區(qū)別在于,窗口之間并不是首尾相接的, 而是可以“錯(cuò)開(kāi)”一定的位置。如果看作一個(gè)窗口的運(yùn)動(dòng),那么就像是向前小步“滑動(dòng)”一樣。同樣可以基于時(shí)間和計(jì)數(shù)。

(3)會(huì)話窗口

會(huì)話窗口顧名思義,是基于“會(huì)話”(session)來(lái)來(lái)對(duì)數(shù)據(jù)進(jìn)行分組的。這里的會(huì)話類似 Web 應(yīng)用中 session 的概念,不過(guò)并不表示兩端的通訊過(guò)程,而是借用會(huì)話超時(shí)失效的機(jī)制來(lái)描述窗口。簡(jiǎn)單來(lái)說(shuō),就是數(shù)據(jù)來(lái)了之后就開(kāi)啟一個(gè)會(huì)話窗口,如果接下來(lái)還有數(shù)據(jù)陸續(xù)到來(lái), 那么就一直保持會(huì)話;如果一段時(shí)間一直沒(méi)收到數(shù)據(jù),那就認(rèn)為會(huì)話超時(shí)失效,窗口自動(dòng)關(guān)閉。這就好像我們打電話一樣,如果時(shí)不時(shí)總能說(shuō)點(diǎn)什么,那說(shuō)明還沒(méi)聊完;如果陷入了尷尬的沉默,半天都沒(méi)話說(shuō),那自然就可以掛電話了。

與滑動(dòng)窗口和滾動(dòng)窗口不同,會(huì)話窗口只能基于時(shí)間來(lái)定義,而沒(méi)有“會(huì)話計(jì)數(shù)窗口”的概念。這很好理解,“會(huì)話”終止的標(biāo)志就是“隔一段時(shí)間沒(méi)有數(shù)據(jù)來(lái)”,如果不依賴時(shí)間而改成個(gè)數(shù),就成了“隔幾個(gè)數(shù)據(jù)沒(méi)有數(shù)據(jù)來(lái)”,這完全是自相矛盾的說(shuō)法。

而同樣是基于這個(gè)判斷標(biāo)準(zhǔn),這“一段時(shí)間”到底是多少就很重要了,必須明確指定。對(duì)于會(huì)話窗口而言,最重要的參數(shù)就是這段時(shí)間的長(zhǎng)度(size),它表示會(huì)話的超時(shí)時(shí)間,也就是兩個(gè)會(huì)話窗口之間的最小距離。如果相鄰兩個(gè)數(shù)據(jù)到來(lái)的時(shí)間間隔(Gap)小于指定的大小

(size),那說(shuō)明還在保持會(huì)話,它們就屬于同一個(gè)窗口;如果 gap 大于 size,那么新來(lái)的數(shù)據(jù)就應(yīng)該屬于新的會(huì)話窗口,而前一個(gè)窗口就應(yīng)該關(guān)閉了。在具體實(shí)現(xiàn)上,我們可以設(shè)置靜態(tài)固定的大小(size),也可以通過(guò)一個(gè)自定義的提取器(gap extractor)動(dòng)態(tài)提取最小間隔 gap 的值。

考慮到事件時(shí)間語(yǔ)義下的亂序流,這里又會(huì)有一些麻煩。相鄰兩個(gè)數(shù)據(jù)的時(shí)間間隔 gap 大于指定的 size,我們認(rèn)為它們屬于兩個(gè)會(huì)話窗口,前一個(gè)窗口就關(guān)閉;可在數(shù)據(jù)亂序的情況下,可能會(huì)有遲到數(shù)據(jù),它的時(shí)間戳剛好是在之前的兩個(gè)數(shù)據(jù)之間的。這樣一來(lái),之前我們判斷的間隔中就不是“一直沒(méi)有數(shù)據(jù)”,而縮小后的間隔有可能會(huì)比 size 還要小——這代表三個(gè)數(shù)據(jù)本來(lái)應(yīng)該屬于同一個(gè)會(huì)話窗口。

所以在 Flink 底層,對(duì)會(huì)話窗口的處理會(huì)比較特殊:每來(lái)一個(gè)新的數(shù)據(jù),都會(huì)創(chuàng)建一個(gè)新的會(huì)話窗口;然后判斷已有窗口之間的距離,如果小于給定的 size,就對(duì)它們進(jìn)行合并(merge) 操作。在 Window 算子中,對(duì)會(huì)話窗口會(huì)有單獨(dú)的處理邏輯。

我們可以看到,與前兩種窗口不同,會(huì)話窗口的長(zhǎng)度不固定,起始和結(jié)束時(shí)間也是不確定的,各個(gè)分區(qū)之間窗口沒(méi)有任何關(guān)聯(lián)。如圖,會(huì)話窗口之間一定是不會(huì)重疊的,而且會(huì)留有至少為 size 的間隔(session gap)。

(4)全局窗口

? 這種窗口全局有效,會(huì)把相同 key 的所有數(shù)據(jù)都分配到同一個(gè)窗口中;說(shuō)直白一點(diǎn),就跟沒(méi)分窗口一樣。無(wú)界流的數(shù)據(jù)永無(wú)止盡,所以這種窗口也沒(méi)有結(jié)束的時(shí)候,默認(rèn)是不會(huì)做觸發(fā)計(jì)算的。如果希望它能對(duì)數(shù)據(jù)進(jìn)行計(jì)算處理, 還需要自定義“觸發(fā)器”(Trigger)。

6.3.3 窗口API概覽
1、按鍵分區(qū)和非按鍵分區(qū)

在定義窗口操作之前,需要確定是基于按鍵分區(qū)還是非按鍵分區(qū)數(shù)據(jù)流上開(kāi)窗。

(1)按鍵分區(qū)

stream.keyBy(...).window(...);

(2)非按鍵分區(qū)

stream.windowAll(...)
2、代碼中窗口API的調(diào)用

窗口操作主要有兩個(gè)部分:窗口分配器(Window Assigners)和窗口函數(shù)(Window Functions)

stream.keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(<window function>)

.window()方法需要傳入一個(gè)窗口分配器,它指明了窗口的類型;而后面的.aggregate() 方法傳入一個(gè)窗口函數(shù)作為參數(shù),它用來(lái)定義窗口具體的處理邏輯。

**窗口分配器:**指定用哪一種窗口,時(shí)間 or 計(jì)數(shù)?滑動(dòng)、滾動(dòng)、會(huì)話?…

**窗口函數(shù):**對(duì)窗口的數(shù)據(jù)的計(jì)算邏輯

6.3.4 窗口分配器

定義窗口分配器(Window Assigners)是構(gòu)建窗口算子的第一步,它的作用就是定義數(shù)據(jù)應(yīng)該被“分配”到哪個(gè)窗口。

1、時(shí)間窗口
(1)滾動(dòng)處理時(shí)間窗口
stream.keyBy(...)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .aggregate(...)

這里.of()方法需要傳入一個(gè) Time 類型的參數(shù) size,表示滾動(dòng)窗口的大小,我們這里創(chuàng)建了一個(gè)長(zhǎng)度為 5 秒的滾動(dòng)窗口。

另外,.of()還有一個(gè)重載方法,可以傳入兩個(gè) Time 類型的參數(shù):size 和 offset。第一個(gè)參數(shù)當(dāng)然還是窗口大小,第二個(gè)參數(shù)則表示窗口起始點(diǎn)的偏移量。這里需要多做一些解釋:對(duì)于我們之前的定義,滾動(dòng)窗口其實(shí)只有一個(gè) size 是不能唯一確定的。比如我們定義 1 天的滾動(dòng)窗口,從每天的 0 點(diǎn)開(kāi)始計(jì)時(shí)是可以的,統(tǒng)計(jì)的就是一個(gè)自然日的所有數(shù)據(jù);而如果從每天的

凌晨 2 點(diǎn)開(kāi)始計(jì)時(shí)其實(shí)也完全沒(méi)問(wèn)題,只不過(guò)統(tǒng)計(jì)的數(shù)據(jù)變成了每天 2 點(diǎn)到第二天 2 點(diǎn)。這個(gè)起始點(diǎn)的選取,其實(shí)對(duì)窗口本身的類型沒(méi)有影響;而為了方便應(yīng)用,默認(rèn)的起始點(diǎn)時(shí)間戳是窗口大小的整倍數(shù)。也就是說(shuō),如果我們定義 1 天的窗口,默認(rèn)就從 0 點(diǎn)開(kāi)始;如果定義 1 小時(shí)的窗口,默認(rèn)就從整點(diǎn)開(kāi)始。而如果我們非要不從這個(gè)默認(rèn)值開(kāi)始,那就可以通過(guò)設(shè)置偏移量offset 來(lái)調(diào)整。

這里讀者可能會(huì)覺(jué)得奇怪:這個(gè)功能好像沒(méi)什么用,非要弄個(gè)偏移量不是給自己找別扭嗎?這其實(shí)是有實(shí)際用途的。我們知道,不同國(guó)家分布在不同的時(shí)區(qū)。標(biāo)準(zhǔn)時(shí)間戳其實(shí)就是

1970 年 1 月 1 日 0 時(shí) 0 分 0 秒 0 毫秒開(kāi)始計(jì)算的一個(gè)毫秒數(shù),而這個(gè)時(shí)間是以 UTC 時(shí)間,也就是 0 時(shí)區(qū)(倫敦時(shí)間)為標(biāo)準(zhǔn)的。我們所在的時(shí)區(qū)是東八區(qū),也就是 UTC+8,跟 UTC 有 8 小時(shí)的時(shí)差。我們定義 1 天滾動(dòng)窗口時(shí),如果用默認(rèn)的起始點(diǎn),那么得到就是倫敦時(shí)間每天 0

點(diǎn)開(kāi)啟窗口,這時(shí)是北京時(shí)間早上 8 點(diǎn)。那怎樣得到北京時(shí)間每天 0 點(diǎn)開(kāi)啟的滾動(dòng)窗口呢?只要設(shè)置-8 小時(shí)的偏移量就可以了:

.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
(2)滑動(dòng)處理時(shí)間窗口
stream.keyBy(...)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .aggregate(...)
(3)處理時(shí)間會(huì)話窗口

靜態(tài)會(huì)話超時(shí)時(shí)間會(huì)話窗口

stream.keyBy(...)
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
    .aggregate(...)

動(dòng)態(tài)會(huì)話超時(shí)時(shí)間的會(huì)話窗口

.window(ProcessingTimeSessionWindows.withDynamicGap(
    new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
        @Override
        public long extract(Tuple2<String, Long> element) {
        // 提取 session gap 值返回, 單位毫秒
        return element.f0.length() * 1000;
        }
    }
))
(4)滾動(dòng)事件時(shí)間窗口
stream.keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .aggregate(...)
(5)滑動(dòng)事件時(shí)間窗口
stream.keyBy(...)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .aggregate(...)
(6)事件時(shí)間會(huì)話窗口
stream.keyBy(...)
    .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
    .aggregate(...)
2、計(jì)數(shù)窗口
(1)滾動(dòng)計(jì)數(shù)窗口
stream.keyBy(...).countWindow(10)
(2)滑動(dòng)計(jì)數(shù)窗口
stream.keyBy(...).countWindow(10)
3、全局窗口

全局窗口是計(jì)數(shù)窗口的底層實(shí)現(xiàn),一般在需要自定義窗口時(shí)使用。需要注意使用全局窗口,必須自行定義觸發(fā)器才能實(shí)現(xiàn)窗口計(jì)算,否則起不到任何作用。

stream.keyBy(...).window(GlobalWindows.create());
6.3.5 窗口函數(shù)

定義了窗口分配器,我們只是知道了數(shù)據(jù)屬于哪個(gè)窗口,可以將數(shù)據(jù)收集起來(lái)了;至于收集起來(lái)到底要做什么,其實(shí)還完全沒(méi)有頭緒。所以在窗口分配器之后,必須再接上一個(gè)定義窗口如何進(jìn)行計(jì)算的操作,這就是所謂的“窗口函數(shù)”(window functions)。

經(jīng)窗口分配器處理之后,數(shù)據(jù)可以分配到對(duì)應(yīng)的窗口中,而數(shù)據(jù)流經(jīng)過(guò)轉(zhuǎn)換得到的數(shù)據(jù)類型是 WindowedStream。這個(gè)類型并不是 DataStream,所以并不能直接進(jìn)行其他轉(zhuǎn)換,而必須進(jìn)一步調(diào)用窗口函數(shù),對(duì)收集到的數(shù)據(jù)進(jìn)行處理計(jì)算之后,才能最終再次得到 DataStream

窗口函數(shù)根據(jù)處理的方式可以分為兩類:增量聚合函數(shù)和全窗口函數(shù)

1、增量聚合函數(shù)

窗口將數(shù)據(jù)收集起來(lái),最基本的處理操作當(dāng)然就是進(jìn)行聚合。窗口對(duì)無(wú)限流的切分,可以看作得到了一個(gè)有界數(shù)據(jù)集。如果我們等到所有數(shù)據(jù)都收集齊,在窗口到了結(jié)束時(shí)間要輸出結(jié)果的一瞬間再去進(jìn)行聚合,顯然就不夠高效了——這相當(dāng)于真的在用批處理的思路來(lái)做實(shí)時(shí)流處理。

為了提高實(shí)時(shí)性,我們可以再次將流處理的思路發(fā)揚(yáng)光大:就像 DataStream 的簡(jiǎn)單聚合一樣,每來(lái)一條數(shù)據(jù)就立即進(jìn)行計(jì)算,中間只要保持一個(gè)簡(jiǎn)單的聚合狀態(tài)就可以了;區(qū)別只是在于不立即輸出結(jié)果,而是要等到窗口結(jié)束時(shí)間。等到窗口到了結(jié)束時(shí)間需要輸出計(jì)算結(jié)果的時(shí)候,我們只需要拿出之前聚合的狀態(tài)直接輸出,這無(wú)疑就大大提高了程序運(yùn)行的效率和實(shí)時(shí)性。

典型的增量聚合函數(shù)有兩個(gè):ReduceFunction 和 AggregateFunction。

(1)歸約函數(shù)

? 就是將中間結(jié)果和新來(lái)的數(shù)據(jù)兩兩歸約

? 窗口函數(shù)中也提供了 ReduceFunction:只要基于 WindowedStream 調(diào)用.reduce()方法,然后傳入 ReduceFunction 作為參數(shù),就可以指定以歸約兩個(gè)元素的方式去對(duì)窗口中數(shù)據(jù)進(jìn)行聚合了。

 public static void main(String[] args) throws Exception {
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     env.setParallelism(1);

     SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(
         WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
             @Override
             public long extractTimestamp(Event event, long l) {
                 return event.getTimestamp();
             }
         }));

     stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
         @Override
         public Tuple2<String, Long> map(Event event) throws Exception {
             return Tuple2.of(event.url, 1L);
         }
     })
         .keyBy(r->r.f0)
         .window(TumblingEventTimeWindows.of(Time.seconds(5)))
         .reduce(new MyReduceFunction()).print();

     env.execute();
 }


// 自定義歸約函數(shù)
public class MyReduceFunction implements ReduceFunction<Tuple2<String, Long>> {
    @Override
    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {

        // 定義累加規(guī)則
        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
    }
}

ReduceFunction 可以解決大多數(shù)歸約聚合的問(wèn)題,但是這個(gè)接口有一個(gè)限制,就是聚合狀態(tài)的類型、輸出結(jié)果的類型都必須和輸入數(shù)據(jù)類型一樣。

(2)聚合函數(shù)

在有些情況下,還需要對(duì)狀態(tài)進(jìn)行進(jìn)一步處理才能得到輸出結(jié)果,這時(shí)它們的類型可能不同,使用 ReduceFunction 就會(huì)非常麻煩

例如,如果我們希望計(jì)算一組數(shù)據(jù)的平均值,應(yīng)該怎樣做聚合呢?很明顯,這時(shí)我們需要計(jì)算兩個(gè)狀態(tài)量:數(shù)據(jù)的總和(sum),以及數(shù)據(jù)的個(gè)數(shù)(count),而最終輸出結(jié)果是兩者的商

(sum/count)。如果用 ReduceFunction,那么我們應(yīng)該先把數(shù)據(jù)轉(zhuǎn)換成二元組(sum, count)的形式,然后進(jìn)行歸約聚合,最后再將元組的兩個(gè)元素相除轉(zhuǎn)換得到最后的平均值。本來(lái)應(yīng)該只是一個(gè)任務(wù),可我們卻需要 map-reduce-map 三步操作,這顯然不夠高效。

于是自然可以想到,如果取消類型一致的限制,讓輸入數(shù)據(jù)、中間狀態(tài)、輸出結(jié)果三者類型都可以不同,不就可以一步直接搞定了嗎?

Flink 的 Window API 中的 aggregate 就提供了這樣的操作。直接基于 WindowedStream 調(diào)用.aggregate() 方法, 就可以定義更加靈活的窗口聚合操作。這個(gè)方法需要傳入一個(gè)

AggregateFunction 的實(shí)現(xiàn)類作為參數(shù)。AggregateFunction 在源碼中的定義如下:

public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
{
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

AggregateFunction 可以看作是 ReduceFunction 的通用版本,這里有三種類型:輸入類型

(IN)、累加器類型(ACC)和輸出類型(OUT)。輸入類型 IN 就是輸入流中元素的數(shù)據(jù)類型;累加器類型 ACC 則是我們進(jìn)行聚合的中間狀態(tài)類型;而輸出類型當(dāng)然就是最終計(jì)算結(jié)果的類型了。

接口中有四個(gè)方法:

  • createAccumulator():創(chuàng)建一個(gè)累加器,這就是為聚合創(chuàng)建了一個(gè)初始狀態(tài),每個(gè)聚合任務(wù)只會(huì)調(diào)用一次

  • add():將輸入的元素添加到累加器中。這就是基于聚合狀態(tài),對(duì)新來(lái)的數(shù)據(jù)進(jìn)行進(jìn)一步聚合的過(guò)程。方法傳入兩個(gè)參數(shù):當(dāng)前新到的數(shù)據(jù) value,和當(dāng)前的累加器accumulator;返回一個(gè)新的累加器值,也就是對(duì)聚合狀態(tài)進(jìn)行更新。每條數(shù)據(jù)到來(lái)之后都會(huì)調(diào)用這個(gè)方法

  • getResult():從累加器中提取聚合的輸出結(jié)果。也就是說(shuō),我們可以定義多個(gè)狀態(tài), 然后再基于這些聚合的狀態(tài)計(jì)算出一個(gè)結(jié)果進(jìn)行輸出。比如之前我們提到的計(jì)算平均值,就可以把 sum 和 count 作為狀態(tài)放入累加器,而在調(diào)用這個(gè)方法時(shí)相除得到最終結(jié)果。這個(gè)方法只在窗口要輸出結(jié)果時(shí)調(diào)用

  • merge():合并兩個(gè)累加器,并將合并后的狀態(tài)作為一個(gè)累加器返回。這個(gè)方法只在需要合并窗口的場(chǎng)景下才會(huì)被調(diào)用;最常見(jiàn)的合并窗口(Merging Window)的場(chǎng)景就是會(huì)話窗口(Session Windows)

所以可以看到,AggregateFunction 的工作原理是:首先調(diào)用 createAccumulator()為任務(wù)初始化一個(gè)狀態(tài)(累加器);而后每來(lái)一個(gè)數(shù)據(jù)就調(diào)用一次 add()方法,對(duì)數(shù)據(jù)進(jìn)行聚合,得到的結(jié)果保存在狀態(tài)中;等到了窗口需要輸出時(shí),再調(diào)用 getResult()方法得到計(jì)算結(jié)果。很明顯, 與 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于輸入、中間狀態(tài)、輸出的類型可以不同,使得應(yīng)用更加靈活方便。

下面來(lái)看一個(gè)具體例子。我們知道,在電商網(wǎng)站中,PV(頁(yè)面瀏覽量)和 UV(獨(dú)立訪客數(shù))是非常重要的兩個(gè)流量指標(biāo)。一般來(lái)說(shuō),PV 統(tǒng)計(jì)的是所有的點(diǎn)擊量;而對(duì)用戶 id 進(jìn)行去重之后,得到的就是 UV。所以有時(shí)我們會(huì)用 PV/UV 這個(gè)比值,來(lái)表示“人均重復(fù)訪問(wèn)量”,也就是平均每個(gè)用戶會(huì)訪問(wèn)多少次頁(yè)面,這在一定程度上代表了用戶的粘度

代碼略

代碼中我們創(chuàng)建了事件時(shí)間滑動(dòng)窗口,統(tǒng)計(jì) 10 秒鐘的“人均 PV”,每 2 秒統(tǒng)計(jì)一次。由于聚合的狀態(tài)還需要做處理計(jì)算,因此窗口聚合時(shí)使用了更加靈活的 AggregateFunction。為了統(tǒng)計(jì) UV,我們用一個(gè) HashSet 保存所有出現(xiàn)過(guò)的用戶 id,實(shí)現(xiàn)自動(dòng)去重;而 PV 的統(tǒng)計(jì)則類似一個(gè)計(jì)數(shù)器,每來(lái)一個(gè)數(shù)據(jù)加一就可以了。所以這里的狀態(tài),定義為包含一個(gè) HashSet 和一個(gè) count 值的二元組(Tuple2<HashSet, Long>),每來(lái)一條數(shù)據(jù),就將 user 存入 HashSet,同時(shí) count 加 1。這里的 count 就是 PV,而 HashSet 中元素的個(gè)數(shù)(size)就是 UV;所以最終窗口的輸出結(jié)果,就是它們的比值。

這里沒(méi)有涉及會(huì)話窗口,所以 merge()方法可以不做任何操作。

另外,F(xiàn)link 也為窗口的聚合提供了一系列預(yù)定義的簡(jiǎn)單聚合方法, 可以直接基于

WindowedStream 調(diào)用。主要包括.sum()/max()/maxBy()/min()/minBy(),與 KeyedStream 的簡(jiǎn)單聚合非常相似。它們的底層,其實(shí)都是通過(guò) AggregateFunction 來(lái)實(shí)現(xiàn)的。

通過(guò) ReduceFunction 和 AggregateFunction 我們可以發(fā)現(xiàn),增量聚合函數(shù)其實(shí)就是在用流處理的思路來(lái)處理有界數(shù)據(jù)集,核心是保持一個(gè)聚合狀態(tài),當(dāng)數(shù)據(jù)到來(lái)時(shí)不停地更新?tīng)顟B(tài)。這就是 Flink 所謂的“有狀態(tài)的流處理”,通過(guò)這種方式可以極大地提高程序運(yùn)行的效率,所以在實(shí)際應(yīng)用中最為常見(jiàn)。

2、全窗口函數(shù)

窗口操作中的另一大類就是全窗口函數(shù)。與增量聚合函數(shù)不同,全窗口函數(shù)需要先收集窗口中的數(shù)據(jù),并在內(nèi)部緩存起來(lái),等到窗口要輸出結(jié)果的時(shí)候再取出數(shù)據(jù)進(jìn)行計(jì)算。

很明顯,這就是典型的批處理思路了——先攢數(shù)據(jù),等一批都到齊了再正式啟動(dòng)處理流程。這樣做毫無(wú)疑問(wèn)是低效的:因?yàn)榇翱谌康挠?jì)算任務(wù)都積壓在了要輸出結(jié)果的那一瞬間,而在之前收集數(shù)據(jù)的漫長(zhǎng)過(guò)程中卻無(wú)所事事。這就好比平時(shí)不用功,到考試之前通宵抱佛腳,肯定不如把工夫花在日常積累上。

那為什么還需要有全窗口函數(shù)呢?這是因?yàn)橛行﹫?chǎng)景下,我們要做的計(jì)算必須基于全部的數(shù)據(jù)才有效,這時(shí)做增量聚合就沒(méi)什么意義了;另外,輸出的結(jié)果有可能要包含上下文中的一些信息(比如窗口的起始時(shí)間),這是增量聚合函數(shù)做不到的。所以,我們還需要有更豐富的窗口計(jì)算方式,這就可以用全窗口函數(shù)來(lái)實(shí)現(xiàn)。

在 Flink 中,全窗口函數(shù)也有兩種:WindowFunction 和 ProcessWindowFunction。

(1)窗口函數(shù)

我們可以基于 WindowedStream 調(diào)用.apply()方法,傳入一個(gè) WindowFunction 的實(shí)現(xiàn)類。

stream
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());

這個(gè)類中可以獲取到包含窗口所有數(shù)據(jù)的可迭代集合( Iterable),還可以拿到窗口(Window)本身的信息。

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
	void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

當(dāng)窗口到達(dá)結(jié)束時(shí)間需要觸發(fā)計(jì)算時(shí),就會(huì)調(diào)用這里的 apply 方法。我們可以從 input 集合中取出窗口收集的數(shù)據(jù),結(jié)合 key 和 window 信息,通過(guò)收集器(Collector)輸出結(jié)果。這里 Collector 的用法,與 FlatMapFunction 中相同。

不過(guò)我們也看到了,WindowFunction 能提供的上下文信息較少,也沒(méi)有更高級(jí)的功能。事實(shí)上,它的作用可以被 ProcessWindowFunction 全覆蓋,所以之后可能會(huì)逐漸棄用。一般在實(shí)際應(yīng)用,直接使用 ProcessWindowFunction 就可以了。

(2)處理窗口函數(shù)

ProcessWindowFunction 是 Window API 中最底層的通用窗口函數(shù)接口。之所以說(shuō)它“最底層”,是因?yàn)槌丝梢阅玫酱翱谥械乃袛?shù)據(jù)之外,ProcessWindowFunction 還可以獲取到一個(gè)“上下文對(duì)象”(Context)。這個(gè)上下文對(duì)象非常強(qiáng)大,不僅能夠獲取窗口信息,還可以訪問(wèn)當(dāng)前的時(shí)間和狀態(tài)信息。這里的時(shí)間就包括了處理時(shí)間(processing time)和事件時(shí)間水位線(event time watermark)。這就使得 ProcessWindowFunction 更加靈活、功能更加豐富。事實(shí)上,ProcessWindowFunction 是 Flink 底層 API——處理函數(shù)(process function)中的一員,關(guān)于處理函數(shù)我們會(huì)在后續(xù)章節(jié)展開(kāi)講解。

當(dāng)然,這些好處是以犧牲性能和資源為代價(jià)的。作為一個(gè)全窗口函數(shù),ProcessWindowFunction 同樣需要將所有數(shù)據(jù)緩存下來(lái)、等到窗口觸發(fā)計(jì)算時(shí)才使用。它其實(shí)就是一個(gè)增強(qiáng)版的 WindowFunction。

具體使用跟 WindowFunction 非常類似,我們可以基于 WindowedStream 調(diào)用.process()方法,傳入一個(gè) ProcessWindowFunction 的實(shí)現(xiàn)類。下面是一個(gè)電商網(wǎng)站統(tǒng)計(jì)每小時(shí) UV 的例子

代碼略
3、增量聚合和全窗口函數(shù)的結(jié)合使用

這樣調(diào)用的處理機(jī)制是:基于第一個(gè)參數(shù)(增量聚合函數(shù))來(lái)處理窗口數(shù)據(jù),每來(lái)一個(gè)數(shù)據(jù)就做一次聚合;等到窗口需要觸發(fā)計(jì)算時(shí),則調(diào)用第二個(gè)參數(shù)(全窗口函數(shù))的處理邏輯輸出結(jié)果。需要注意的是,這里的全窗口函數(shù)就不再緩存所有數(shù)據(jù)了,而是直接將增量聚合函數(shù)的結(jié)果拿來(lái)當(dāng)作了 Iterable 類型的輸入。一般情況下,這時(shí)的可迭代集合中就只有一個(gè)元素了。

窗口處理的主體還是增量聚合,而引入全窗口函數(shù)又可以獲取到更多的信息包裝輸出

// ReduceFunction 與 WindowFunction 結(jié)合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function)
    
// ReduceFunction 與 ProcessWindowFunction 結(jié)合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T>	reduceFunction,	ProcessWindowFunction<T,	R,	K,	W> function)
    
// AggregateFunction 與 WindowFunction 結(jié)合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction)
    
// AggregateFunction 與 ProcessWindowFunction 結(jié)合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)

代碼略
6.3.6 測(cè)試水位線和窗口的使用
代碼略
6.3.7 其它API
1、觸發(fā)器

用來(lái)控制窗口什么時(shí)候觸發(fā)計(jì)算

2、移除器

主要用來(lái)定義移除某些數(shù)據(jù)的邏輯

3、允許延遲

可以為窗口算子設(shè)置一個(gè)“允許的最大延遲”(Allowed Lateness)。也就是說(shuō),我們可以設(shè)定允許延遲一段時(shí)間,在這段時(shí)間內(nèi),窗口不會(huì)銷(xiāo)毀,繼續(xù)到來(lái)的數(shù)據(jù)依然可以進(jìn)入窗口中并觸發(fā)計(jì)算。直到水位線推進(jìn)到了 窗口結(jié)束時(shí)間 + 延遲時(shí)間,才真正將窗口的內(nèi)容清空,正式關(guān)閉窗口。

4、將遲到的數(shù)據(jù)放到側(cè)輸出流

我們自然會(huì)想到,即使可以設(shè)置窗口的延遲時(shí)間,終歸還是有限的,后續(xù)的數(shù)據(jù)還是會(huì)被丟棄。如果不想丟棄任何一個(gè)數(shù)據(jù),又該怎么做呢?

Flink 還提供了另外一種方式處理遲到數(shù)據(jù)。我們可以將未收入窗口的遲到數(shù)據(jù),放入“側(cè)輸出流”(side output)進(jìn)行另外的處理。所謂的側(cè)輸出流,相當(dāng)于是數(shù)據(jù)流的一個(gè)“分支”,這個(gè)流中單獨(dú)放置那些錯(cuò)過(guò)了該上的車(chē)、本該被丟棄的數(shù)據(jù)。

6.3.8 窗口的生命周期
1、窗口的創(chuàng)建

窗口的類型和基本信息由窗口分配器(window assigners)指定,但窗口不會(huì)預(yù)先創(chuàng)建好, 而是由數(shù)據(jù)驅(qū)動(dòng)創(chuàng)建。當(dāng)?shù)谝粋€(gè)應(yīng)該屬于這個(gè)窗口的數(shù)據(jù)元素到達(dá)時(shí),就會(huì)創(chuàng)建對(duì)應(yīng)的窗口。

2、窗口計(jì)算的觸發(fā)

除了窗口分配器,每個(gè)窗口還會(huì)有自己的窗口函數(shù)(window functions)和觸發(fā)器(trigger)。窗口函數(shù)可以分為增量聚合函數(shù)和全窗口函數(shù),主要定義了窗口中計(jì)算的邏輯;而觸發(fā)器則是指定調(diào)用窗口函數(shù)的條件。

對(duì)于不同的窗口類型,觸發(fā)計(jì)算的條件也會(huì)不同。例如,一個(gè)滾動(dòng)事件時(shí)間窗口,應(yīng)該在水位線到達(dá)窗口結(jié)束時(shí)間的時(shí)候觸發(fā)計(jì)算,屬于“定點(diǎn)發(fā)車(chē)”;而一個(gè)計(jì)數(shù)窗口,會(huì)在窗口中元素?cái)?shù)量達(dá)到定義大小時(shí)觸發(fā)計(jì)算,屬于“人滿就發(fā)車(chē)”。所以 Flink 預(yù)定義的窗口類型都有對(duì)應(yīng)內(nèi)置的觸發(fā)器。

對(duì)于事件時(shí)間窗口而言,除去到達(dá)結(jié)束時(shí)間的“定點(diǎn)發(fā)車(chē)”,還有另一種情形。當(dāng)我們?cè)O(shè)置了允許延遲,那么如果水位線超過(guò)了窗口結(jié)束時(shí)間、但還沒(méi)有到達(dá)設(shè)定的最大延遲時(shí)間,這期間內(nèi)到達(dá)的遲到數(shù)據(jù)也會(huì)觸發(fā)窗口計(jì)算。這類似于沒(méi)有準(zhǔn)時(shí)趕上班車(chē)的人又追上了車(chē),這時(shí)車(chē)要再次???、開(kāi)門(mén),將新的數(shù)據(jù)整合統(tǒng)計(jì)進(jìn)來(lái)。

3、窗口的銷(xiāo)毀

一般情況下,當(dāng)時(shí)間達(dá)到了結(jié)束點(diǎn),就會(huì)直接觸發(fā)計(jì)算輸出結(jié)果、進(jìn)而清除狀態(tài)銷(xiāo)毀窗口。這時(shí)窗口的銷(xiāo)毀可以認(rèn)為和觸發(fā)計(jì)算是同一時(shí)刻。這里需要注意, Flink 中只對(duì)時(shí)間窗口(TimeWindow)有銷(xiāo)毀機(jī)制;由于計(jì)數(shù)窗口(CountWindow)是基于全局窗口(GlobalWindw) 實(shí)現(xiàn)的,而全局窗口不會(huì)清除狀態(tài),所以就不會(huì)被銷(xiāo)毀。

在特殊的場(chǎng)景下,窗口的銷(xiāo)毀和觸發(fā)計(jì)算會(huì)有所不同。事件時(shí)間語(yǔ)義下,如果設(shè)置了允許延遲,那么在水位線到達(dá)窗口結(jié)束時(shí)間時(shí),仍然不會(huì)銷(xiāo)毀窗口;窗口真正被完全刪除的時(shí)間點(diǎn), 是窗口的結(jié)束時(shí)間加上用戶指定的允許延遲時(shí)間。

4、窗口API調(diào)用總結(jié)

Window API 首先按照時(shí)候按鍵分區(qū)分成兩類。keyBy 之后的 KeyedStream,可以調(diào)用.window()方法聲明按鍵分區(qū)窗口(Keyed Windows);而如果不做 keyBy,DataStream 也可以直接調(diào)用.windowAll()聲明非按鍵分區(qū)窗口。之后的方法調(diào)用就完全一樣了。

接下來(lái)首先是通過(guò).window()/.windowAll()方法定義窗口分配器,得到 WindowedStream; 然 后 通 過(guò) 各 種 轉(zhuǎn) 換 方 法 ( reduce/aggregate/apply/process ) 給 出 窗 口 函 數(shù)

(ReduceFunction/AggregateFunction/ProcessWindowFunction),定義窗口的具體計(jì)算處理邏輯, 轉(zhuǎn)換之后重新得到 DataStream。這兩者必不可少,是窗口算子(WindowOperator)最重要的組成部分。

此外,在這兩者之間,還可以基于 WindowedStream 調(diào)用.trigger()自定義觸發(fā)器、調(diào)用.evictor()定義移除器、調(diào)用.allowedLateness()指定允許延遲時(shí)間、調(diào)用.sideOutputLateData() 將遲到數(shù)據(jù)寫(xiě)入側(cè)輸出流,這些都是可選的 API,一般不需要實(shí)現(xiàn)。而如果定義了側(cè)輸出流, 可以基于窗口聚合之后的 DataStream 調(diào)用.getSideOutput()獲取側(cè)輸出流。

6.4 遲到的數(shù)據(jù)處理

對(duì)于亂序流,水位線本身就可以設(shè)置一個(gè)延遲時(shí)間;而做窗口計(jì)算時(shí),我們又可以設(shè)置窗口的允許延遲時(shí)間;另外窗口還有將遲到數(shù)據(jù)輸出到測(cè)輸出流的用法。所有的這些方法,它們之間有什么關(guān)系,我們又該怎樣合理利用呢?

6.4.1 設(shè)置水位線延遲時(shí)間

水位線是事件時(shí)間的進(jìn)展,它是我們整個(gè)應(yīng)用的全局邏輯時(shí)鐘。水位線生成之后,會(huì)隨著數(shù)據(jù)在任務(wù)間流動(dòng),從而給每個(gè)任務(wù)指明當(dāng)前的事件時(shí)間。所以從這個(gè)意義上講,水位線是一個(gè)覆蓋萬(wàn)物的存在,它并不只針對(duì)事件時(shí)間窗口有效。

之前我們講到觸發(fā)器時(shí)曾提到過(guò)“定時(shí)器”,時(shí)間窗口的操作底層就是靠定時(shí)器來(lái)控制觸發(fā)的。既然是底層機(jī)制,定時(shí)器自然就不可能是窗口的專利了;事實(shí)上它是 Flink 底層 API——處理函數(shù)(process function)的重要部分。

所以水位線其實(shí)是所有事件時(shí)間定時(shí)器觸發(fā)的判斷標(biāo)準(zhǔn)。那么水位線的延遲,當(dāng)然也就是全局時(shí)鐘的滯后,相當(dāng)于是上帝撥動(dòng)了琴弦,所有人的表都變慢了。

既然水位線這么重要,那一般情況就不應(yīng)該把它的延遲設(shè)置得太大,否則流處理的實(shí)時(shí)性就會(huì)大大降低。因?yàn)樗痪€的延遲主要是用來(lái)對(duì)付分布式網(wǎng)絡(luò)傳輸導(dǎo)致的數(shù)據(jù)亂序,而網(wǎng)絡(luò)傳輸?shù)膩y序程度一般并不會(huì)很大,大多集中在幾毫秒至幾百毫秒。所以實(shí)際應(yīng)用中,我們往往會(huì)給水位線設(shè)置一個(gè)“能夠處理大多數(shù)亂序數(shù)據(jù)的小延遲”,視需求一般設(shè)在毫秒~秒級(jí)。

當(dāng)我們?cè)O(shè)置了水位線延遲時(shí)間后,所有定時(shí)器就都會(huì)按照延遲后的水位線來(lái)觸發(fā)。如果一個(gè)數(shù)據(jù)所包含的時(shí)間戳,小于當(dāng)前的水位線,那么它就是所謂的“遲到數(shù)據(jù)”

6.4.2 允許窗口處理遲到數(shù)據(jù)

水位線延遲設(shè)置的比較小,那之后如果仍有數(shù)據(jù)遲到該怎么辦?對(duì)于窗口計(jì)算而言,如果水位線已經(jīng)到了窗口結(jié)束時(shí)間,默認(rèn)窗口就會(huì)關(guān)閉,那么之后再來(lái)的數(shù)據(jù)就要被丟棄了。

自然想到,F(xiàn)link 的窗口也是可以設(shè)置延遲時(shí)間,允許繼續(xù)處理遲到數(shù)據(jù)的。

這種情況下,由于大部分亂序數(shù)據(jù)已經(jīng)被水位線的延遲等到了,所以往往遲到的數(shù)據(jù)不會(huì)太多。這樣,我們會(huì)在水位線到達(dá)窗口結(jié)束時(shí)間時(shí),先快速地輸出一個(gè)近似正確的計(jì)算結(jié)果; 然后保持窗口繼續(xù)等到延遲數(shù)據(jù),每來(lái)一條數(shù)據(jù),窗口就會(huì)再次計(jì)算,并將更新后的結(jié)果輸出。這樣就可以逐步修正計(jì)算結(jié)果,最終得到準(zhǔn)確的統(tǒng)計(jì)值了。

類比班車(chē)的例子,我們可以這樣理解:大多數(shù)人是在發(fā)車(chē)時(shí)刻前后到達(dá)的,所以我們只要把表調(diào)慢,稍微等一會(huì)兒,絕大部分人就都上車(chē)了,這個(gè)把表調(diào)慢的時(shí)間就是水位線的延遲; 到點(diǎn)之后,班車(chē)就準(zhǔn)時(shí)出發(fā)了,不過(guò)可能還有該來(lái)的人沒(méi)趕上。于是我們就先慢慢往前開(kāi),這段時(shí)間內(nèi),如果遲到的人抓點(diǎn)緊還是可以追上的;如果有人追上來(lái)了,就停車(chē)開(kāi)門(mén)讓他上來(lái), 然后車(chē)?yán)^續(xù)向前開(kāi)。當(dāng)然我們的車(chē)不能一直慢慢開(kāi),需要有一個(gè)時(shí)間限制,這就是窗口的允許延遲時(shí)間。一旦超過(guò)了這個(gè)時(shí)間,班車(chē)就不再停留,開(kāi)上高速疾馳而去了。

所以我們將水位線的延遲和窗口的允許延遲數(shù)據(jù)結(jié)合起來(lái),最后的效果就是先快速實(shí)時(shí)地輸出一個(gè)近似的結(jié)果,而后再不斷調(diào)整,最終得到正確的計(jì)算結(jié)果?;叵肓魈幚淼陌l(fā)展過(guò)程, 這不就是著名的 Lambda 架構(gòu)嗎?原先需要兩套獨(dú)立的系統(tǒng)來(lái)同時(shí)保證實(shí)時(shí)性和結(jié)果的最終正確性,如今 Flink 一套系統(tǒng)就全部搞定了

6.4.3 將遲到數(shù)據(jù)放到窗口側(cè)輸出流

即使我們有了前面的雙重保證,可窗口不能一直等下去,最后總要真正關(guān)閉。窗口一旦關(guān)閉,后續(xù)的數(shù)據(jù)就都要被丟棄了。那如果真的還有漏網(wǎng)之魚(yú)又該怎么辦呢?

那就要用到最后一招了:用窗口的側(cè)輸出流來(lái)收集關(guān)窗以后的遲到數(shù)據(jù)。這種方式是最后

“兜底”的方法,只能保證數(shù)據(jù)不丟失;因?yàn)榇翱谝呀?jīng)真正關(guān)閉,所以是無(wú)法基于之前窗口的結(jié)果直接做更新的。我們只能將之前的窗口計(jì)算結(jié)果保存下來(lái),然后獲取側(cè)輸出流中的遲到數(shù)據(jù),判斷數(shù)據(jù)所屬的窗口,手動(dòng)對(duì)結(jié)果進(jìn)行合并更新。盡管有些煩瑣,實(shí)時(shí)性也不夠強(qiáng),但能夠保證最終結(jié)果一定是正確的。

如果還用趕班車(chē)來(lái)類比,那就是車(chē)已經(jīng)上高速開(kāi)走了,這班車(chē)是肯定趕不上了。不過(guò)我們還留下了行進(jìn)路線和聯(lián)系方式,遲到的人如果想辦法輾轉(zhuǎn)到了目的地,還是可以和大部隊(duì)會(huì)合的。最終,所有該到的人都會(huì)在目的地出現(xiàn)。

所以總結(jié)起來(lái),F(xiàn)link 處理遲到數(shù)據(jù),對(duì)于結(jié)果的正確性有三重保障:水位線的延遲,窗口允許遲到數(shù)據(jù),以及將遲到數(shù)據(jù)放入窗口側(cè)輸出流。我們可以回憶一下之前 6.3.5 小節(jié)統(tǒng)計(jì)每個(gè) url 瀏覽次數(shù)的代碼 UrlViewCountExample,稍作改進(jìn),增加處理遲到數(shù)據(jù)的功能。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-679569.html

代碼略

到了這里,關(guān)于《Flink學(xué)習(xí)筆記》——第六章 Flink的時(shí)間和窗口的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink中的時(shí)間和窗口操作

    Flink中的時(shí)間和窗口操作

    本專欄案例代碼和數(shù)據(jù)集鏈接: https://download.csdn.net/download/shangjg03/88477960 在大多數(shù)場(chǎng)景下,我們需要統(tǒng)計(jì)的數(shù)據(jù)流都是無(wú)界的,因此我們無(wú)法等待整個(gè)數(shù)據(jù)流終止后才進(jìn)行統(tǒng)計(jì)。通常情況下,我們只需要對(duì)某個(gè)時(shí)間范圍或者數(shù)量范圍內(nèi)的數(shù)據(jù)進(jìn)行統(tǒng)計(jì)分析:如每隔五分鐘統(tǒng)計(jì)

    2024年02月08日
    瀏覽(32)
  • Flink-【時(shí)間語(yǔ)義、窗口、水位線】

    Flink-【時(shí)間語(yǔ)義、窗口、水位線】

    ??:可樂(lè) 可樂(lè)的生產(chǎn)日期?= 事件時(shí)間(可樂(lè)產(chǎn)生的時(shí)間); 可樂(lè)被喝的時(shí)間 = 處理時(shí)間(可樂(lè)被處理【喝掉=處理】的時(shí)間)。 機(jī)器時(shí)間:可能不準(zhǔn)確(例如:A可樂(lè)廠的時(shí)鐘比較慢,B可樂(lè)廠的時(shí)鐘比較快,但實(shí)際上B產(chǎn)生可樂(lè)的時(shí)間比A產(chǎn)生可樂(lè)的時(shí)間慢,卻被先處理了)

    2024年02月01日
    瀏覽(20)
  • 【Apache Flink】基于時(shí)間和窗口的算子-配置時(shí)間特性

    【Apache Flink】基于時(shí)間和窗口的算子-配置時(shí)間特性

    Apache Flink 它提供了多種類型的時(shí)間和窗口概念,使得用戶能夠進(jìn)行準(zhǔn)確的時(shí)間計(jì)算。在數(shù)據(jù)處理任務(wù)中,時(shí)間的概念是非常重要的,對(duì)于一些復(fù)雜的實(shí)時(shí)流處理任務(wù),如事件按時(shí)間順序的聚合、分割和窗口計(jì)算,時(shí)間更是關(guān)鍵所在。而在這類任務(wù)中,選擇使用何種時(shí)間特性是

    2024年02月08日
    瀏覽(16)
  • flink時(shí)間窗口無(wú)新的數(shù)據(jù)進(jìn)來(lái)最后一個(gè)窗口不關(guān)閉

    測(cè)試反饋, 配置的flink任務(wù)提交上去后, 輸入數(shù)據(jù)源符合條件,到時(shí)間窗口的size。最后一個(gè)窗口沒(méi)有閉窗計(jì)算,數(shù)據(jù)并沒(méi)及時(shí)輸出告警 經(jīng)過(guò)調(diào)試發(fā)現(xiàn),watermark沒(méi)有向后繼續(xù)推進(jìn),導(dǎo)致無(wú)法閉窗, watermark的時(shí)間取的是數(shù)據(jù)中的業(yè)務(wù)時(shí)間,create_time。 因?yàn)闆](méi)有后續(xù)數(shù)據(jù)進(jìn)來(lái),

    2024年02月13日
    瀏覽(23)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink時(shí)間滾動(dòng)動(dòng)窗口

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink時(shí)間滾動(dòng)動(dòng)窗口

    在流處理應(yīng)用中,數(shù)據(jù)是連續(xù)不斷的,因此我們不可能等到所有數(shù)據(jù)都到了才開(kāi)始處理。當(dāng)然我們可以每來(lái)一個(gè)消息就處理一次,但是有時(shí)我們需要做一些聚合類的處理,例如:在過(guò)去的1分鐘內(nèi)有多少用戶點(diǎn)擊了我們的網(wǎng)頁(yè)。在這種情況下,我們必須定義一個(gè)窗口,用來(lái)收集

    2024年02月11日
    瀏覽(22)
  • Flink 學(xué)習(xí)六 Flink 窗口計(jì)算API

    Flink 學(xué)習(xí)六 Flink 窗口計(jì)算API

    窗口 window 是處理無(wú)限流的核心就是把無(wú)界的數(shù)據(jù)流,按照一定的規(guī)則劃分成一段一段的有界的數(shù)據(jù)流(桶),然后再這個(gè)有界的數(shù)據(jù)流里面去做計(jì)算; 2.1 滾動(dòng)窗口 相鄰窗口之間是沒(méi)有數(shù)據(jù)重合 window 大小可以是時(shí)間,可以是數(shù)據(jù)長(zhǎng)度 按照數(shù)據(jù)流是否可以是 keyed , 在分類,nonkey windo

    2024年02月09日
    瀏覽(26)
  • Qt第六章 多窗口編程

    Qt第六章 多窗口編程

    QMessageBox繼承自QDialog ,是一個(gè)Qt內(nèi)置的用來(lái)展示 信息或詢問(wèn)用戶一個(gè)問(wèn)題的模態(tài)對(duì)話框。 預(yù)設(shè)了四種類型: 像那些已經(jīng)寫(xiě)好的窗口,這些現(xiàn)成的東西都會(huì)有一些特性,就是他們的對(duì)象都不需要new或者說(shuō)他們就不需要拿到對(duì)象,他們?yōu)榱朔奖阄覀兪褂脮?huì)用一個(gè)靜態(tài)成員函數(shù)就

    2024年02月07日
    瀏覽(20)
  • 《計(jì)算機(jī)網(wǎng)絡(luò):自頂向下方法》學(xué)習(xí)筆記——第六章:鏈路層

    《計(jì)算機(jī)網(wǎng)絡(luò):自頂向下方法》學(xué)習(xí)筆記——第六章:鏈路層

    兩種截然不同類型的鏈路層信道 廣播信道 :這種信道用于連接有線局域網(wǎng)、衛(wèi)星網(wǎng)和混合光纖同軸電纜接入網(wǎng)中的多臺(tái)主機(jī)。 點(diǎn)對(duì)點(diǎn)通信鏈路 :這在諸如長(zhǎng)距離鏈路連接的兩臺(tái)路由器之間,或用戶辦公室計(jì)算機(jī)與它們所連接的鄰近以太網(wǎng)交換機(jī)之間等場(chǎng)合經(jīng)常能夠發(fā)現(xiàn)。

    2024年02月03日
    瀏覽(27)
  • 【UnityShader入門(mén)精要學(xué)習(xí)筆記】第六章(1)Unity中的基礎(chǔ)光照

    【UnityShader入門(mén)精要學(xué)習(xí)筆記】第六章(1)Unity中的基礎(chǔ)光照

    本系列為作者學(xué)習(xí)UnityShader入門(mén)精要而作的筆記,內(nèi)容將包括: 書(shū)本中句子照抄 + 個(gè)人批注 項(xiàng)目源碼 一堆新手會(huì)犯的錯(cuò)誤 潛在的太監(jiān)斷更,有始無(wú)終 總之適用于同樣開(kāi)始學(xué)習(xí)Shader的同學(xué)們進(jìn)行有取舍的參考。 一個(gè)物體為什么看起來(lái)是紅色的?從物理上解釋是因?yàn)檫@個(gè)物體

    2024年03月22日
    瀏覽(34)
  • 8 分鐘看完這 7000+ 字,F(xiàn)link 時(shí)間窗口和時(shí)間語(yǔ)義這對(duì)好朋友你一定搞得懂!外送窗口計(jì)算和水印一并搞懂?。?!

    目錄 一、時(shí)間語(yǔ)義 時(shí)間窗口 1. 前摘: 1.1 Flink的時(shí)間和窗口 1.2 什么是時(shí)間窗口和時(shí)間語(yǔ)義呢? 2. 時(shí)間窗口 2.1 舉個(gè)例子: 2.2 3個(gè)實(shí)時(shí)數(shù)據(jù)計(jì)算場(chǎng)景 3. 時(shí)間語(yǔ)義 二、Flink上進(jìn)行窗口計(jì)算: 1. 一個(gè)Flink窗口應(yīng)用的大致骨架結(jié)構(gòu) 2. Flink窗口的骨架結(jié)構(gòu)中有兩個(gè)必須的兩個(gè)操作:

    2024年01月23日
    瀏覽(26)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包