《Flink 架構(gòu)》系列(已完結(jié)),共包含以下 6 篇文章:
- Flink 架構(gòu)(一):系統(tǒng)架構(gòu)
- Flink 架構(gòu)(二):數(shù)據(jù)傳輸
- Flink 架構(gòu)(三):事件時(shí)間處理
- Flink 架構(gòu)(四):狀態(tài)管理
- Flink 架構(gòu)(五):檢查點(diǎn) Checkpoint(看完即懂)
- Flink 架構(gòu)(六):保存點(diǎn) Savepoint
?? 如果您覺得這篇文章有用 ?? 的話,請(qǐng)給博主一個(gè)一鍵三連 ?????? 吧 (點(diǎn)贊 ??、關(guān)注 ??、收藏 ??)?。?!您的支持 ?????? 將激勵(lì) ?? 博主輸出更多優(yōu)質(zhì)內(nèi)容?。。?/p>
在之前的博客中,我們強(qiáng)調(diào)了時(shí)間語義對(duì)于流處理應(yīng)用的重要性并解釋了 處理時(shí)間 和 事件時(shí)間 的差異。雖然處理時(shí)間是基于處理機(jī)器的本地時(shí)間,相對(duì)容易理解,但它會(huì)產(chǎn)生一些較為隨意、不一致且無法重現(xiàn)的結(jié)果。相反,事件時(shí)間語義會(huì)生成可重現(xiàn)且一致性的結(jié)果,這也是很多流處理用例的剛性需求。但和基于處理時(shí)間語義的應(yīng)用相比,基于事件時(shí)間的應(yīng)用需要一些額外的配置。此外,相比純粹使用處理時(shí)間的引擎,支持事件時(shí)間的流處理引擎內(nèi)部要更加復(fù)雜。
Flink 不僅針對(duì)常見的事件時(shí)間操作提供了直觀易用的原語,還支持一些表達(dá)能力很強(qiáng)的 API,允許使用者以自定義算子的方式實(shí)現(xiàn)更高級(jí)的事件時(shí)間處理應(yīng)用。在面對(duì)這些高級(jí)應(yīng)用時(shí),充分理解 Flink 內(nèi)部事件處理機(jī)制通常會(huì)有所幫助,有時(shí)候更是必要的。在《流處理基礎(chǔ)概念(二):時(shí)間語義(處理時(shí)間、事件時(shí)間、水位線)》一文中,我們介紹了 Flink 在提供處理時(shí)間語義時(shí)所采用的兩個(gè)概念:記錄時(shí)間戳 和 水位線。接下面我們會(huì)介紹 Flink 內(nèi)部如何實(shí)現(xiàn)和處理時(shí)間戳及水位線以支持事件時(shí)間語義的流式應(yīng)用。
1.時(shí)間戳
在事件時(shí)間模式下,F(xiàn)link 流式應(yīng)用處理的所有記錄都必須包含時(shí)間戳。時(shí)間戳將記錄和特定時(shí)間點(diǎn)進(jìn)行關(guān)聯(lián),這些時(shí)間點(diǎn)通常是記錄所對(duì)應(yīng)事件的發(fā)生時(shí)間。但實(shí)際上應(yīng)用可以自由選擇時(shí)間戳的含義,只要保證流記錄的時(shí)間戳?xí)S著數(shù)據(jù)流的前進(jìn)大致遞增即可。正如前文所述,基本上所有現(xiàn)實(shí)應(yīng)用場(chǎng)景都會(huì)出現(xiàn)一定程度的時(shí)間戳亂序。
當(dāng) Flink 以事件時(shí)間模式處理數(shù)據(jù)流時(shí),會(huì)根據(jù)記錄的時(shí)間戳觸發(fā)時(shí)間相關(guān)算子的計(jì)算。例如,時(shí)間窗口算子會(huì)根據(jù)記錄關(guān)聯(lián)的時(shí)間戳將其分配到窗口中。Flink 內(nèi)部采用 8 字節(jié)的 Long
值對(duì)時(shí)間戳進(jìn)行編碼,并將它們以元數(shù)據(jù)(metadata
)的形式附加在記錄上。內(nèi)置算子會(huì)將這個(gè) Long
值解析為毫秒精度的 Unix
時(shí)間戳(自 1970-01-01-00:00:00.000
以來的毫秒數(shù))。但自定義算子可以有自己的時(shí)間戳解析機(jī)制,如將精度調(diào)整為微秒。
2.水位線
除了記錄的時(shí)間戳,F(xiàn)link 基于事件時(shí)間的應(yīng)用還必須提供 水位線(watermark
)。水位線用于在事件時(shí)間應(yīng)用中推斷每個(gè)任務(wù)當(dāng)前的事件時(shí)間?;跁r(shí)間的算子會(huì)使用這個(gè)時(shí)間來觸發(fā)計(jì)算并推動(dòng)進(jìn)度前進(jìn)。例如:基于時(shí)間窗口的任務(wù)會(huì)在其事件時(shí)間超過窗口結(jié)束邊界時(shí)進(jìn)行最終的窗口計(jì)算并發(fā)出結(jié)果。
當(dāng)一個(gè)算子接收到時(shí)間為 T 的水位線,就可以認(rèn)為不會(huì)再收到任何時(shí)間戳小于或等于 T 的事件了。水位線無論對(duì)于事件時(shí)間窗口還是處理亂序事件的算子都很關(guān)鍵。算子一旦收到某個(gè)水位線,就相當(dāng)于接收到信號(hào):某個(gè)特定時(shí)間區(qū)間的時(shí)間戳已經(jīng)到齊,可以觸發(fā)窗口計(jì)算或?qū)邮盏臄?shù)據(jù)進(jìn)行排序了。
在 Flink 中,水位線是利用一些包含 Long
值時(shí)間戳的特殊記錄來實(shí)現(xiàn)的。如上圖所示,它們像帶有額外時(shí)間戳的常規(guī)記錄一樣在數(shù)據(jù)流中移動(dòng)。
水位線擁有兩個(gè)基本屬性:
- 必須單調(diào)遞增。這是為了確保任務(wù)中的事件時(shí)間時(shí)鐘正確前進(jìn),不會(huì)倒退。
- 和記錄的時(shí)間戳存在聯(lián)系。一個(gè)時(shí)間戳為 T 的水位線表示,接下來所有記錄的時(shí)間戳一定都大于 T。
第二個(gè)屬性可用來處理數(shù)據(jù)流中時(shí)間戳亂序的記錄,例如上圖中的時(shí)間戳為 3 和 5 的記錄。對(duì)基于時(shí)間的算子任務(wù)而言,其收集和處理的記錄可能會(huì)包含亂序的時(shí)間戳。這些算子只有當(dāng)自己的事件時(shí)間時(shí)鐘(由接收的水位線驅(qū)動(dòng))指示不必再等那些包含相關(guān)時(shí)間戳的記錄時(shí),才會(huì)最終觸發(fā)計(jì)算。當(dāng)任務(wù)收到一個(gè)違反水位線屬性,即時(shí)間戳小于或等于前一個(gè)水位線的記錄時(shí),該記錄本應(yīng)參與的計(jì)算可能已經(jīng)完成。我們稱此類記錄為 遲到記錄(late record
)。為了處理遲到記錄,F(xiàn)link 提供了不同的機(jī)制,我們將在后續(xù)討論它們。
水位線的意義之一在于它允許應(yīng)用控制結(jié)果的完整性和延遲。如果水位線和記錄的時(shí)間戳非常接近,那結(jié)果的處理延遲就會(huì)很低,因?yàn)槿蝿?wù)無須等待過多記錄就可以觸發(fā)最終計(jì)算。但同時(shí)結(jié)果的完整性可能會(huì)受影響,因?yàn)榭赡苡胁糠窒嚓P(guān)記錄被視為遲到記錄,沒能參與運(yùn)算。相反,非常 “保守” 的水位線會(huì)增加處理延遲,但同時(shí)結(jié)果的完整性也會(huì)有所提升。
3.水位線傳播和事件時(shí)間
Flink 內(nèi)部將水位線實(shí)現(xiàn)為特殊的記錄,它們可以通過算子任務(wù)進(jìn)行接收和發(fā)送。任務(wù)內(nèi)部的 時(shí)間服務(wù)(time service
)會(huì)維護(hù)一些 計(jì)時(shí)器(timer
),它們依靠接收到水位線來激活。這些計(jì)時(shí)器是由任務(wù)在時(shí)間服務(wù)內(nèi)注冊(cè),并在將來的某個(gè)時(shí)間點(diǎn)執(zhí)行計(jì)算。例如:窗口算子會(huì)為每個(gè)活動(dòng)窗口注冊(cè)一個(gè)計(jì)時(shí)器,它們會(huì)在事件時(shí)間超過窗口的結(jié)束時(shí)間時(shí)清理窗口狀態(tài)。
當(dāng)任務(wù)接收到一個(gè)水位線時(shí)會(huì)執(zhí)行以下操作:
- 基于水位線記錄的時(shí)間戳更新內(nèi)部事件時(shí)間時(shí)鐘。
- 任務(wù)的時(shí)間服務(wù)會(huì)找出所有觸發(fā)時(shí)間小于更新后事件時(shí)間的計(jì)時(shí)器。對(duì)于每個(gè)到期的計(jì)時(shí)器,調(diào)用回調(diào)函數(shù),利用它來執(zhí)行計(jì)算或發(fā)出記錄。
- 任務(wù)根據(jù)更新后的事件時(shí)間將水位線發(fā)出。
Flink 對(duì)通過 DataStream API 訪問時(shí)間戳和水位線有一定限制。普通函數(shù)無法讀寫記錄的時(shí)間戳或水位線,但一系列處理函數(shù)(
process function
)除外。它們可以讀取當(dāng)前正在處理記錄的時(shí)間戳,獲得當(dāng)前算子的事件時(shí)間,還能注冊(cè)計(jì)時(shí)器。所有函數(shù)的 API 都無法支持設(shè)置發(fā)出記錄的時(shí)間戳、調(diào)整任務(wù)的事件時(shí)間時(shí)鐘或發(fā)出水位線。為發(fā)出記錄配置時(shí)間戳的工作需要由基于時(shí)間的 DataStream 算子任務(wù)來完成,這樣才能確保時(shí)間戳和發(fā)出的水位線對(duì)齊。舉例而言,時(shí)間窗口算子任務(wù)會(huì)在發(fā)送觸發(fā)窗口計(jì)算的水位線時(shí)間戳之前,將所有經(jīng)過窗口計(jì)算所得結(jié)果的時(shí)間戳設(shè)為窗口的結(jié)束時(shí)間。
接下來我們?cè)敿?xì)解釋一下任務(wù)在收到一個(gè)新的水位線之后,將如何發(fā)送水位線和更新其內(nèi)部事件時(shí)間時(shí)鐘。Flink 會(huì)將數(shù)據(jù)流劃分為不同的分區(qū),并將它們交由不同的算子任務(wù)來并行執(zhí)行。每個(gè)分區(qū)作為一個(gè)數(shù)據(jù)流,都會(huì)包含帶有時(shí)間戳的記錄以及水位線。根據(jù)算子的上下游連接情況,其任務(wù)可能需要同時(shí)接收來自多個(gè)輸入分區(qū)的記錄和水位線,也可能需要將它們發(fā)送到多個(gè)輸出分區(qū)。下面我們將詳細(xì)介紹一個(gè)任務(wù)如何將水位線發(fā)送至多個(gè)輸出任務(wù),以及它從多個(gè)輸入任務(wù)獲取水位線后如何推動(dòng)事件時(shí)間時(shí)鐘前進(jìn)。
一個(gè)任務(wù)會(huì)為它的每個(gè)輸入分區(qū)都維護(hù)一個(gè) 分區(qū)水位線(partition watermark
)。當(dāng)收到某個(gè)分區(qū)傳來的水位線后,任務(wù)會(huì)以接收值和當(dāng)前值中較大的那個(gè)去更新對(duì)應(yīng)分區(qū)水位線的值。隨后,任務(wù)會(huì)把事件時(shí)間時(shí)鐘調(diào)整為所有分區(qū)水位線中最小的那個(gè)值。如果事件時(shí)間時(shí)鐘向前推動(dòng),任務(wù)會(huì)先處理因此而觸發(fā)的所有計(jì)時(shí)器,之后才會(huì)把對(duì)應(yīng)的水位線發(fā)往所有連接的輸出分區(qū),以實(shí)現(xiàn)事件時(shí)間到全部下游任務(wù)的廣播。
下圖展示了一個(gè)有 4 個(gè)輸入分區(qū)和 3 個(gè)輸出分區(qū)的任務(wù)在接收到水位線后,是如何更新它的分區(qū)水位線和事件時(shí)間時(shí)鐘,并將水位線發(fā)出的。
對(duì)于那些有著兩條或多條輸入數(shù)據(jù)流的算子,如 Union
或 CoFlatMap
,它們的任務(wù)同樣是利用全部分區(qū)水位線中的最小值來計(jì)算事件時(shí)間時(shí)鐘,并沒有考慮分區(qū)是否來自不同的輸入流。這就導(dǎo)致所有輸入的記錄都必須基于同一個(gè)事件時(shí)間時(shí)鐘來處理。如果不同輸入流的事件時(shí)間沒有對(duì)齊,那么該行為就會(huì)導(dǎo)致一些問題。
Flink 的水位線處理和傳播算法保證了算子任務(wù)所發(fā)出的記錄時(shí)間戳和水位線一定會(huì)對(duì)齊。然而,這依賴于一個(gè)事實(shí):所有分區(qū)都會(huì)持續(xù)提供自增的水位線。只要有一個(gè)分區(qū)的水位線沒有前進(jìn),或分區(qū)完全空閑下來不再發(fā)送任何記錄或水位線,任務(wù)的事件時(shí)間時(shí)鐘就不會(huì)前進(jìn),繼而導(dǎo)致計(jì)時(shí)器無法觸發(fā)。這種情形會(huì)給那些靠時(shí)鐘前進(jìn)來執(zhí)行計(jì)算或清除狀態(tài)的時(shí)間相關(guān)算子帶來麻煩。因此,如果一個(gè)任務(wù)沒有從全部輸入任務(wù)以常規(guī)間隔接收新的水位線,就會(huì)導(dǎo)致時(shí)間相關(guān)算子的處理延遲或狀態(tài)大小激增。
當(dāng)算子兩個(gè)輸入流的水位線差距很大時(shí),也會(huì)產(chǎn)生類似影響。對(duì)于一個(gè)有兩個(gè)輸入流的任務(wù)而言,其事件時(shí)間時(shí)鐘會(huì)受制于那個(gè)相對(duì)較慢的流,而較快流的記錄或中間結(jié)果會(huì)在狀態(tài)中緩沖,直到事件時(shí)間時(shí)鐘到達(dá)允許處理它們的那個(gè)點(diǎn)。
4.時(shí)間戳分配和水位線生成
到目前為止,我們已經(jīng)解釋了時(shí)間戳和水位線的含義以及它們?cè)?Flink 內(nèi)部的處理邏輯,但一直沒涉及它們的來源。時(shí)間戳和水位線通常都是在數(shù)據(jù)流剛剛進(jìn)入流處理應(yīng)用的時(shí)候分配和生成的。由于不同的應(yīng)用會(huì)選擇不同的時(shí)間戳,而水位線依賴于時(shí)間戳和數(shù)據(jù)流本身的特征,所以應(yīng)用必須顯式地分配時(shí)間戳和生成水位線。Flink DataStream 應(yīng)用可以通過三種方式完成該工作:文章來源:http://www.zghlxwxcb.cn/news/detail-833895.html
- 在數(shù)據(jù)源完成:我們可以利用 SourceFunction 在應(yīng)用讀入數(shù)據(jù)流的時(shí)候分配時(shí)間戳和生成水位線。源函數(shù)會(huì)發(fā)出一條記錄流。每個(gè)發(fā)出的記錄都可以附加一個(gè)時(shí)間戳,水位線可以作為特殊記錄在任何時(shí)間點(diǎn)發(fā)出。如果源函數(shù)(臨時(shí)性地)不再發(fā)出水位線,可以把自己聲明成空閑。Flink 會(huì)在后續(xù)算子計(jì)算水位線的時(shí)候把那些來自于空閑源函數(shù)的流分區(qū)排除在外。數(shù)據(jù)源空閑聲明機(jī)制 可以用來解決上面提到的水位線不向前推進(jìn)的問題。我們會(huì)在后續(xù)詳細(xì)討論數(shù)據(jù)源函數(shù)。
-
周期分配器(
periodic assigner
):DataStream API 提供了一個(gè)名為AssignerWithPeriodicWatermarks
的用戶自定義函數(shù),它可以用來從每條記錄提取時(shí)間戳,并周期性地響應(yīng)獲取當(dāng)前水位線的查詢請(qǐng)求。提取出來的時(shí)間戳?xí)郊拥礁髯缘挠涗浬?,查詢得到的水位線會(huì)注入到數(shù)據(jù)流中。這個(gè)函數(shù)會(huì)在后續(xù)介紹。 -
定點(diǎn)分配器(
punctuated assigner
):另一個(gè)支持從記錄中提取時(shí)間戳的用戶自定義函數(shù)叫作AssignerWithPunctuatedWatermarks
。它可用于需要根據(jù)特殊輸入記錄生成水位線的情況。和AssignerWithPeriodicWatermarks
函數(shù)不同,這個(gè)函數(shù)不會(huì)強(qiáng)制你從每條記錄中都提取一個(gè)時(shí)間戳(雖然這樣也行)。這個(gè)函數(shù)也會(huì)在后續(xù)介紹。
用戶自定義的時(shí)間戳分配函數(shù)通常都會(huì)盡可能地靠近數(shù)據(jù)源算子,因?yàn)樵诮?jīng)過其他算子處理后,記錄順序和它們的時(shí)間戳?xí)兊秒y以推斷。這也是為什么不建議在流式應(yīng)用中途覆蓋已有的時(shí)間戳和水位線(雖然這可以通過用戶自定義函數(shù)實(shí)現(xiàn))。文章來源地址http://www.zghlxwxcb.cn/news/detail-833895.html
到了這里,關(guān)于【大數(shù)據(jù)】Flink 架構(gòu)(三):事件時(shí)間處理的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!