01 基本概念
Watermark 是用于處理事件時(shí)間的一種機(jī)制,用于表示事件時(shí)間流的進(jìn)展。在流處理中,由于事件到達(dá)的順序和延遲,系統(tǒng)需要一種機(jī)制來(lái)衡量事件時(shí)間的進(jìn)展,以便正確觸發(fā)窗口操作等。Watermark 就是用來(lái)標(biāo)記事件時(shí)間的進(jìn)展情況的一種特殊數(shù)據(jù)元素。
02 工作原理
Watermark 的生成方式通常是由系統(tǒng)根據(jù)數(shù)據(jù)流中的事件來(lái)自動(dòng)推斷生成的。一般來(lái)說(shuō),系統(tǒng)會(huì)根據(jù)事件時(shí)間戳和一定的策略來(lái)生成 Watermark,以此來(lái)表示事件時(shí)間的進(jìn)展。在 Flink 中,通常會(huì)有內(nèi)置的 Watermark 生成器或者用戶(hù)自定義的生成器來(lái)實(shí)現(xiàn)這個(gè)功能。
當(dāng)一個(gè) Watermark 被生成后,它會(huì)被發(fā)送到流處理的所有并行任務(wù)中。任務(wù)會(huì)根據(jù)接收到的 Watermark,將小于或等于 Watermark 的事件時(shí)間的數(shù)據(jù)觸發(fā)相關(guān)操作(如窗口計(jì)算),以此來(lái)確保計(jì)算的正確性。
03 優(yōu)勢(shì)與劣勢(shì)
優(yōu)點(diǎn):
- Watermark 可以確保流處理系統(tǒng)正確處理事件時(shí)間,避免了由于亂序和延遲引起的計(jì)算錯(cuò)誤。
- 可以根據(jù)業(yè)務(wù)需求和數(shù)據(jù)特征靈活調(diào)整 Watermark 生成的策略,以適應(yīng)不同的場(chǎng)景。
- Watermark 的引入使得流處理系統(tǒng)更具健壯性,能夠處理各種實(shí)時(shí)數(shù)據(jù)場(chǎng)景。
缺點(diǎn):
- Watermark 的生成可能會(huì)帶來(lái)一定的開(kāi)銷(xiāo),尤其是在數(shù)據(jù)量龐大、事件頻繁的情況下,可能會(huì)對(duì)系統(tǒng)性能產(chǎn)生一定影響。
- 對(duì)于某些特殊的場(chǎng)景,例如極端亂序或者延遲過(guò)大的情況,Watermark 可能無(wú)法完全解決事件時(shí)間處理的問(wèn)題。
04 核心組件
-
Apache Flink中的水?。╓atermark)是事件時(shí)間處理的核心組件之一,它用于解決無(wú)序事件流中的事件時(shí)間問(wèn)題。水印是一種元數(shù)據(jù),用于告知系統(tǒng)事件時(shí)間流的進(jìn)度,從而使系統(tǒng)能夠在處理延遲的數(shù)據(jù)時(shí)做出正確的決策。
以下是Flink中水印的核心組件:
-
Watermark生成器(Watermark Generator):
- Watermark生成器負(fù)責(zé)生成水印,并將其插入到數(shù)據(jù)流中。
- 水印生成的策略通常與數(shù)據(jù)源有關(guān)。例如,對(duì)于有序的數(shù)據(jù)源,可以根據(jù)數(shù)據(jù)的事件時(shí)間直接生成水??;對(duì)于無(wú)序數(shù)據(jù)源,則可能需要一些啟發(fā)式方法來(lái)生成水印。
-
AssignerWithPeriodicWatermarks:
- 這是一個(gè)Flink提供的接口,用于在數(shù)據(jù)流中分配水印。
- 實(shí)現(xiàn)此接口的類(lèi)需要實(shí)現(xiàn)兩個(gè)方法:
extractTimestamp()
用于提取事件時(shí)間戳,getCurrentWatermark()
用于生成當(dāng)前水印。
-
AssignerWithPunctuatedWatermarks:
- 與上述相似,但是這個(gè)接口適用于在特定條件下(例如特定的事件)生成水印的場(chǎng)景。
-
Watermark延遲(Watermark Lag):
- 衡量系統(tǒng)中水印到達(dá)事件流的延遲程度。通常,水印到達(dá)得越快,系統(tǒng)對(duì)事件時(shí)間處理的準(zhǔn)確性就越高。
-
Watermark策略(Watermarking Strategy):
- 這是一個(gè)配置項(xiàng),用于確定水印生成的策略??梢曰诠潭ǖ臅r(shí)間間隔生成水印,也可以根據(jù)事件流的特性進(jìn)行自適應(yīng)調(diào)整。
-
Watermark傳遞和處理:
- Flink通過(guò)數(shù)據(jù)流將水印傳遞給各個(gè)操作符(operators),從而確保水印在整個(gè)流處理拓?fù)渲袀鬟f。
- 在處理過(guò)程中,水印用于確定事件時(shí)間窗口(Event Time Windows)的關(guān)閉時(shí)機(jī),以及觸發(fā)一些基于事件時(shí)間的操作,如觸發(fā)窗口計(jì)算等。
-
處理水印(Handling Watermarks):
- 在窗口計(jì)算等操作中,F(xiàn)link需要根據(jù)水印來(lái)判斷是否可以觸發(fā)計(jì)算操作,以此保證結(jié)果的正確性和完整性。
水印的核心作用在于解決事件時(shí)間處理中的亂序問(wèn)題,通過(guò)適當(dāng)?shù)乃〔呗院蜕蓹C(jī)制,可以有效地處理延遲數(shù)據(jù)和亂序數(shù)據(jù),保證數(shù)據(jù)處理的準(zhǔn)確性和時(shí)效性。
-
Watermark生成器(Watermark Generator):
05 Watermark 生成器 使用
在 Apache Flink 中,提供了一些內(nèi)置的 Watermark 生成器,這些生成器可以用于簡(jiǎn)化在流處理中的 Watermark 管理。以下是一些常用的內(nèi)置 Watermark 生成器:
-
BoundedOutOfOrdernessTimestampExtractor:
-
描述: 這是 Flink 內(nèi)置的基于有界亂序時(shí)間的 Watermark 生成器。
-
用法: 用戶(hù)可以通過(guò)指定最大允許的亂序時(shí)間來(lái)創(chuàng)建一個(gè)
BoundedOutOfOrdernessTimestampExtractor
實(shí)例。通常情況下,用戶(hù)需要實(shí)現(xiàn)extractTimestamp
方法,從事件中提取事件時(shí)間戳。 -
示例:
public class MyTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<MyEvent> { public MyTimestampExtractor(Time maxOutOfOrderness) { super(maxOutOfOrderness); } @Override public long extractTimestamp(MyEvent event) { return event.getTimestamp(); } }
-
-
AscendingTimestampExtractor:
-
描述: 這是一個(gè)簡(jiǎn)單的 Watermark 生成器,適用于按照事件時(shí)間戳升序排列的數(shù)據(jù)流。
-
用法: 用戶(hù)只需實(shí)現(xiàn)
extractAscendingTimestamp
方法,從事件中提取事件時(shí)間戳。 -
示例:
public class MyAscendingTimestampExtractor extends AscendingTimestampExtractor<MyEvent> { @Override public long extractAscendingTimestamp(MyEvent event) { return event.getTimestamp(); } }
-
-
AssignerWithPunctuatedWatermarks:
-
描述: 這是一種特殊類(lèi)型的 Watermark 生成器,它可以基于某些事件的屬性產(chǎn)生 Watermark。
-
用法: 用戶(hù)需要實(shí)現(xiàn)
checkAndGetNextWatermark
方法,根據(jù)事件的某些屬性來(lái)判斷是否生成 Watermark。 -
示例:
public class MyPunctuatedWatermarkAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> { @Override public long extractTimestamp(MyEvent element, long previousElementTimestamp) { return element.getTimestamp(); } @Override public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) { // 根據(jù) lastElement 的某些屬性判斷是否生成 Watermark if (lastElement.getProperty() > threshold) { return new Watermark(extractedTimestamp); } return null; // 如果不生成 Watermark,則返回 null } }
-
這些內(nèi)置的 Watermark 生成器提供了靈活性和方便性,使得在 Flink 中實(shí)現(xiàn)基于事件時(shí)間的處理變得更加容易。根據(jù)具體的業(yè)務(wù)需求和數(shù)據(jù)特征,可以選擇合適的 Watermark 生成器來(lái)確保準(zhǔn)確的事件時(shí)間處理。
06 應(yīng)用場(chǎng)景
在Apache Flink 1.18中,水印(Watermark)是事件時(shí)間處理的核心組件,用于解決事件時(shí)間流處理中的亂序和延遲數(shù)據(jù)的問(wèn)題。下面是一些Flink 1.18中集成Watermark水印的應(yīng)用場(chǎng)景:
-
流式窗口操作:
- 在流式處理中,經(jīng)常需要對(duì)事件進(jìn)行窗口化操作,例如按時(shí)間窗口、會(huì)話(huà)窗口等進(jìn)行聚合計(jì)算。Watermark的到達(dá)可以作為觸發(fā)窗口計(jì)算的信號(hào),確保窗口在事件時(shí)間上的正確性。這種情況下,Watermark能夠確保窗口內(nèi)的數(shù)據(jù)已經(jīng)全部到達(dá),可以進(jìn)行聚合計(jì)算,同時(shí)還能夠處理延遲的數(shù)據(jù)。
-
處理亂序數(shù)據(jù):
- 在實(shí)際的數(shù)據(jù)流中,事件通常不會(huì)按照嚴(yán)格的時(shí)間順序到達(dá),可能存在亂序的情況。Watermark可以幫助系統(tǒng)理清事件的先后順序,確保在事件時(shí)間上的正確性。通過(guò)適當(dāng)設(shè)置Watermark的生成策略,可以根據(jù)數(shù)據(jù)特性來(lái)處理亂序數(shù)據(jù),保證數(shù)據(jù)處理的正確性。
-
事件時(shí)間窗口計(jì)算:
- 在處理事件時(shí)間窗口時(shí),Watermark起到了關(guān)鍵作用。它確定了窗口的關(guān)閉時(shí)機(jī),即在Watermark達(dá)到窗口的結(jié)束時(shí)間時(shí),系統(tǒng)可以安全地關(guān)閉該窗口,并對(duì)其中的數(shù)據(jù)進(jìn)行計(jì)算。這確保了窗口計(jì)算的正確性,同時(shí)也能夠處理延遲數(shù)據(jù),使得窗口計(jì)算能夠在數(shù)據(jù)到達(dá)時(shí)即時(shí)進(jìn)行。
-
處理遲到的數(shù)據(jù):
- Watermark還可以用于處理遲到的數(shù)據(jù),即已經(jīng)超過(guò)窗口關(guān)閉時(shí)限但仍然到達(dá)的數(shù)據(jù)。通過(guò)設(shè)置適當(dāng)?shù)难舆t容忍閾值,可以容忍一定程度的遲到數(shù)據(jù),并將其納入窗口計(jì)算中。這樣可以提高數(shù)據(jù)處理的完整性和準(zhǔn)確性。
- 實(shí)時(shí)數(shù)據(jù)監(jiān)控和異常檢測(cè):
- 在實(shí)時(shí)數(shù)據(jù)流中,通常需要對(duì)數(shù)據(jù)進(jìn)行實(shí)時(shí)監(jiān)控和異常檢測(cè)。Watermark可以用于確定事件時(shí)間的進(jìn)度,從而實(shí)現(xiàn)實(shí)時(shí)監(jiān)控和異常檢測(cè)。例如,可以基于事件時(shí)間窗口對(duì)數(shù)據(jù)進(jìn)行統(tǒng)計(jì)分析,發(fā)現(xiàn)突發(fā)的異常情況,并及時(shí)采取相應(yīng)的措施。
總的來(lái)說(shuō),F(xiàn)link 1.18中集成Watermark水印的應(yīng)用場(chǎng)景涵蓋了廣泛的實(shí)時(shí)數(shù)據(jù)處理領(lǐng)域,包括流式窗口操作、處理亂序數(shù)據(jù)、事件時(shí)間窗口計(jì)算、處理遲到的數(shù)據(jù)以及實(shí)時(shí)數(shù)據(jù)監(jiān)控和異常檢測(cè)等方面。Watermark作為事件時(shí)間處理的核心組件,為Flink提供了處理實(shí)時(shí)數(shù)據(jù)流的強(qiáng)大功能,能夠確保數(shù)據(jù)處理的準(zhǔn)確性和時(shí)效性。
07 注意事項(xiàng)
Apache Flink 中水?。╓atermark)的使用是關(guān)鍵的,特別是在處理事件時(shí)間(Event Time)數(shù)據(jù)時(shí)。水印是一種機(jī)制,用于處理無(wú)序事件流,并確保在執(zhí)行窗口操作時(shí)數(shù)據(jù)的完整性和正確性。以下是在使用 Flink 1.18 中水印的一些注意事項(xiàng):
-
水印生成器(Watermark Generators)的選擇:
- Flink 提供了多種內(nèi)置的水印生成器,如 BoundedOutOfOrdernessTimestampExtractor 和 AscendingTimestampExtractor。
- BoundedOutOfOrdernessTimestampExtractor 適用于處理帶有亂序的數(shù)據(jù)流,它會(huì)為每個(gè)事件引入一定的延遲。
- AscendingTimestampExtractor 適用于處理按事件順序到達(dá)的數(shù)據(jù)流,它假定數(shù)據(jù)已經(jīng)按照事件時(shí)間排序。
-
水印延遲(Watermark Lag)的設(shè)置:
- 設(shè)置水印延遲是非常重要的,它決定了 Flink 在處理數(shù)據(jù)時(shí)能夠容忍的事件延遲時(shí)間。
- 如果設(shè)置的水印延遲過(guò)小,可能會(huì)導(dǎo)致窗口操作不正確,因?yàn)?Flink 認(rèn)為某些事件已經(jīng)到達(dá),但實(shí)際上它們還沒(méi)有到達(dá)。
- 如果設(shè)置的水印延遲過(guò)大,可能會(huì)導(dǎo)致窗口操作的延遲增加,因?yàn)?Flink 需要等待更長(zhǎng)時(shí)間以確保數(shù)據(jù)的完整性。
-
數(shù)據(jù)源的處理:
- 在讀取數(shù)據(jù)源時(shí),確保正確地分配時(shí)間戳并生成水印。這通常需要在數(shù)據(jù)源的讀取邏輯中明確指定時(shí)間戳和水印生成的邏輯。
-
水印與窗口操作的關(guān)系:
- 在執(zhí)行窗口操作(如窗口聚合、窗口計(jì)算等)時(shí),水印的生成和處理是至關(guān)重要的。
- 水印確保在觸發(fā)窗口計(jì)算時(shí),F(xiàn)link 已經(jīng)收到了窗口結(jié)束時(shí)間之前的所有數(shù)據(jù),從而確保計(jì)算結(jié)果的準(zhǔn)確性。
-
定期檢查水印生成是否正常:
- 在部署 Flink 作業(yè)時(shí),建議定期檢查水印的生成情況??梢酝ㄟ^(guò) Flink 的監(jiān)控界面或日志來(lái)查看水印的生成情況,并根據(jù)需要調(diào)整水印生成的邏輯和設(shè)置。
-
監(jiān)控和調(diào)試:
- 在使用水印時(shí),需要重點(diǎn)關(guān)注作業(yè)的監(jiān)控和調(diào)試,以確保水印的生成和處理是符合預(yù)期的。
- 如果發(fā)現(xiàn)數(shù)據(jù)延遲或窗口計(jì)算不正確,可以通過(guò)監(jiān)控?cái)?shù)據(jù)流和日志來(lái)定位和解決問(wèn)題,可能需要調(diào)整水印的生成邏輯或調(diào)整水印延遲來(lái)改善作業(yè)的性能和準(zhǔn)確性。
-
數(shù)據(jù)傾斜和性能優(yōu)化:
- 在使用水印時(shí),需要注意數(shù)據(jù)傾斜可能會(huì)影響水印的生成和處理性能??梢酝ㄟ^(guò)合理的數(shù)據(jù)分片和并行處理來(lái)減輕數(shù)據(jù)傾斜帶來(lái)的影響,從而提高作業(yè)的性能和穩(wěn)定性。
總的來(lái)說(shuō),水印在 Flink 中的使用是非常重要的,它能夠確保在處理事件時(shí)間數(shù)據(jù)時(shí)保持?jǐn)?shù)據(jù)的完整性和正確性。因此,在設(shè)計(jì)和部署 Flink 作業(yè)時(shí),需要特別注意水印的生成和處理,以確保作業(yè)能夠正確運(yùn)行并獲得良好的性能表現(xiàn)。
08 案例分析
8.1 窗口統(tǒng)計(jì)數(shù)據(jù)不準(zhǔn)
當(dāng)涉及到事件時(shí)間處理時(shí),延遲和亂序是非常常見(jiàn)的情況。下面是一個(gè)簡(jiǎn)單的案例,演示了在事件時(shí)間處理中可能遇到的延遲和亂序問(wèn)題。
假設(shè)我們有一個(gè)用于監(jiān)控網(wǎng)站用戶(hù)訪問(wèn)的實(shí)時(shí)數(shù)據(jù)流。每個(gè)事件都包含用戶(hù)ID、訪問(wèn)時(shí)間戳和訪問(wèn)的網(wǎng)頁(yè)URL。我們想要計(jì)算每個(gè)用戶(hù)在每小時(shí)內(nèi)訪問(wèn)的不同網(wǎng)頁(yè)數(shù)量。
考慮到網(wǎng)絡(luò)傳輸和數(shù)據(jù)處理可能會(huì)引入延遲和亂序,我們的數(shù)據(jù)流可能如下所示:
Event 1: {UserID: 1, Timestamp: 12:00:05, URL: "example.com/page1"}
Event 2: {UserID: 2, Timestamp: 12:00:10, URL: "example.com/page2"}
Event 3: {UserID: 1, Timestamp: 12:00:15, URL: "example.com/page2"}
Event 4: {UserID: 1, Timestamp: 11:59:58, URL: "example.com/page3"} <-- 延遲
Event 5: {UserID: 2, Timestamp: 12:00:02, URL: "example.com/page4"} <-- 亂序
在這個(gè)示例中,Event 4由于延遲而晚于其他事件到達(dá),而Event 5由于亂序而在其本應(yīng)到達(dá)的時(shí)間之前到達(dá)。
如果沒(méi)有使用水印機(jī)制,Flink 可能會(huì)錯(cuò)誤地將 Event 4 的數(shù)據(jù)統(tǒng)計(jì)到 12:00:00 ~ 12:01:00 的窗口中,這是因?yàn)?Flink 默認(rèn)情況下是根據(jù)接收到事件的時(shí)間來(lái)進(jìn)行處理的,而不是根據(jù)事件實(shí)際發(fā)生的事件時(shí)間。
8.2 水印是如何解決延遲與亂序問(wèn)題?
在上述案例中,F(xiàn)link 的水?。╓atermark)機(jī)制通過(guò)指示事件時(shí)間的上限,幫助系統(tǒng)確定事件時(shí)間窗口的邊界。水印本質(zhì)上是一種元數(shù)據(jù),它告知 Flink 在某個(gè)時(shí)間點(diǎn)之前的數(shù)據(jù)已經(jīng)全部到達(dá)。
下面簡(jiǎn)要說(shuō)明水印如何在案例中發(fā)揮作用:
-
處理延遲數(shù)據(jù):
- 當(dāng) Event 4 發(fā)生延遲到達(dá)時(shí),水印會(huì)逐漸推進(jìn),最終達(dá)到 Event 4 的事件時(shí)間戳(11:59:58)。
- Flink 知道在水印之前的所有數(shù)據(jù)都已經(jīng)到達(dá),因此即使 Event 4 晚到,也不會(huì)影響窗口的觸發(fā)。
-
處理亂序數(shù)據(jù):
- 當(dāng) Event 5 由于亂序提前到達(dá)時(shí),水印仍然在逐漸推進(jìn)。
- Flink 通過(guò)水印判斷,在當(dāng)前水印之前的所有數(shù)據(jù)都已到達(dá),因此可以觸發(fā)相應(yīng)的窗口計(jì)算。
-
窗口觸發(fā):
- Flink 會(huì)根據(jù)水印確定觸發(fā)窗口的時(shí)機(jī)。當(dāng)水印到達(dá)某個(gè)時(shí)間戳?xí)r,F(xiàn)link 知道在該水印之前的數(shù)據(jù)已經(jīng)全部到達(dá),可以安全地觸發(fā)窗口計(jì)算。
- 比如,在水印到達(dá) 12:00:05 時(shí),F(xiàn)link 可以觸發(fā) 12:00:00 - 12:01:00 的窗口計(jì)算,處理這一時(shí)段內(nèi)的數(shù)據(jù)。
綜合來(lái)說(shuō),水印幫助 Flink 在事件時(shí)間處理中正確處理延遲和亂序的數(shù)據(jù),確保窗口操作的準(zhǔn)確性和完整性。通過(guò)逐漸推進(jìn)水印,系統(tǒng)能夠在事件時(shí)間軸上有序地進(jìn)行處理,而不會(huì)受到延遲和亂序數(shù)據(jù)的影響。
8.3 詳細(xì)分析
假設(shè)我們有以下十條亂序的事件數(shù)據(jù),每條數(shù)據(jù)包含事件時(shí)間戳和相應(yīng)的值:
事件時(shí)間戳(毫秒) 值
1000 10
2000 15
3000 12
1500 8
2500 18
1200 6
1800 14
4000 20
3500 16
3200 9
我們將使用Watermark來(lái)處理這些數(shù)據(jù),并進(jìn)行窗口統(tǒng)計(jì)。假設(shè)窗口大小為2秒,最大亂序時(shí)間為1秒。
使用Watermark前的統(tǒng)計(jì):
- 當(dāng)接收到事件時(shí)間戳為1000毫秒時(shí),將值10加入窗口。
- 當(dāng)接收到事件時(shí)間戳為2000毫秒時(shí),將值15加入窗口。
- 當(dāng)接收到事件時(shí)間戳為3000毫秒時(shí),將值12加入窗口。
- 當(dāng)接收到事件時(shí)間戳為1500毫秒時(shí),將值8加入窗口。
- 當(dāng)接收到事件時(shí)間戳為2500毫秒時(shí),將值18加入窗口。
- 當(dāng)接收到事件時(shí)間戳為1200毫秒時(shí),將值6加入窗口。
- 當(dāng)接收到事件時(shí)間戳為1800毫秒時(shí),將值14加入窗口。
- 當(dāng)接收到事件時(shí)間戳為4000毫秒時(shí),將值20加入窗口。
- 當(dāng)接收到事件時(shí)間戳為3500毫秒時(shí),將值16加入窗口。
- 當(dāng)接收到事件時(shí)間戳為3200毫秒時(shí),將值9加入窗口。
使用Watermark后的統(tǒng)計(jì):
Watermark的計(jì)算過(guò)程如下: Watermark = max(當(dāng)前Watermark, 當(dāng)前事件時(shí)間 - 最大亂序時(shí)間)
在這個(gè)例子中,我們?cè)O(shè)定最大亂序時(shí)間為1秒,即1000毫秒。
- 當(dāng)收到事件時(shí)間戳為1000毫秒時(shí),Watermark = max(0, 1000 - 1000) = 0毫秒。
- 當(dāng)收到事件時(shí)間戳為2000毫秒時(shí),Watermark = max(0, 2000 - 1000) = 1000毫秒。
- 當(dāng)收到事件時(shí)間戳為3000毫秒時(shí),Watermark = max(1000, 3000 - 1000) = 2000毫秒。
- 當(dāng)收到事件時(shí)間戳為1500毫秒時(shí),Watermark = max(2000, 1500 - 1000) = 2000毫秒。
- 當(dāng)收到事件時(shí)間戳為2500毫秒時(shí),Watermark = max(2000, 2500 - 1000) = 2000毫秒。
- 當(dāng)收到事件時(shí)間戳為1200毫秒時(shí),Watermark = max(2000, 1200 - 1000) = 2000毫秒。
- 當(dāng)收到事件時(shí)間戳為1800毫秒時(shí),Watermark = max(2000, 1800 - 1000) = 2000毫秒。
- 當(dāng)收到事件時(shí)間戳為4000毫秒時(shí),Watermark = max(2000, 4000 - 1000) = 3000毫秒。
- 當(dāng)收到事件時(shí)間戳為3500毫秒時(shí),Watermark = max(3000, 3500 - 1000) = 3000毫秒。
- 當(dāng)收到事件時(shí)間戳為3200毫秒時(shí),Watermark = max(3000, 3200 - 1000) = 3000毫秒。
Watermark確定了什么時(shí)候觸發(fā)窗口統(tǒng)計(jì)。在本例中,當(dāng)Watermark超過(guò)窗口的結(jié)束時(shí)間時(shí),窗口將被關(guān)閉,并進(jìn)行統(tǒng)計(jì)。因此,Watermark確保了即使在亂序數(shù)據(jù)的情況下,窗口統(tǒng)計(jì)也能夠按照正確的事件時(shí)間順序進(jìn)行。
為了更清晰地展示W(wǎng)atermark的影響,以下是每個(gè)事件被處理時(shí)的Watermark狀態(tài)和窗口統(tǒng)計(jì)的結(jié)果:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-829518.html
事件時(shí)間戳(毫秒) 值 Watermark 窗口統(tǒng)計(jì)結(jié)果
1000 10 0 10
2000 15 1000 25
3000 12 2000 27
1500 8 2000 27
2500 18 2000 30
1200 6 2000 30
1800 14 2000 32
4000 20 3000 36
3500 16 3000 36
3200 9 3000 36
這里的窗口統(tǒng)計(jì)結(jié)果是在Watermark觸發(fā)時(shí)計(jì)算的。在Watermark超過(guò)窗口結(jié)束時(shí)間時(shí),窗口會(huì)被關(guān)閉,并進(jìn)行統(tǒng)計(jì)。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-829518.html
09 項(xiàng)目實(shí)戰(zhàn)demo
9.1 pom依賴(lài)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xsy</groupId>
<artifactId>aurora_flink_connector_file</artifactId>
<version>1.0-SNAPSHOT</version>
<!--屬性設(shè)置-->
<properties>
<!--java_JDK版本-->
<java.version>11</java.version>
<!--maven打包插件-->
<maven.plugin.version>3.8.1</maven.plugin.version>
<!--編譯編碼UTF-8-->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!--輸出報(bào)告編碼UTF-8-->
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<!--通用依賴(lài)-->
<dependencies>
<!--集成日志框架 start-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.1</version>
</dependency>
<!--集成日志框架 end-->
<!-- json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
<!-- flink讀取Text File文件依賴(lài) start-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.18.0</version>
</dependency>
<!-- flink讀取Text File文件依賴(lài) end-->
<!-- flink基礎(chǔ)依賴(lài) start -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.18.0</version>
</dependency>
<!-- flink基礎(chǔ)依賴(lài) end -->
</dependencies>
<!--編譯打包-->
<build>
<finalName>${project.name}</finalName>
<!--資源文件打包-->
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>org.google.code.flindbugs:jar305</exclude>
<exclude>org.slf4j:*</exclude>
<excluder>org.apache.logging.log4j:*</excluder>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.aurora.KafkaStreamingJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<!--插件統(tǒng)一管理-->
<pluginManagement>
<plugins>
<!--maven打包插件-->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
<configuration>
<fork>true</fork>
<finalName>${project.build.finalName}</finalName>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<!--編譯打包插件-->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.plugin.version}</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>UTF-8</encoding>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
9.2 log4j2.properties配置
rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmprootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp
9.3 Watermark水印作業(yè)
package com.aurora.demo;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Random;
/**
* 描述:Flink集成Watermark水印
*
* @author 淺夏的貓
* @version 1.0.0
* @date 2024-02-08 10:31:40
*/
public class WatermarkStreamingJob {
private static final Logger logger = LoggerFactory.getLogger(WatermarkStreamingJob.class);
public static void main(String[] args) throws Exception {
// 創(chuàng)建 執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 自定義數(shù)據(jù)源,每隔1000ms下發(fā)一條數(shù)據(jù)
SourceFunction<JSONObject> dataSource = new SourceFunction<>() {
private volatile boolean running = true;
@Override
public void run(SourceContext<JSONObject> sourceContext) throws Exception {
while (running) {
long timestamp = System.currentTimeMillis();
timestamp = timestamp - new Random().nextInt(11) + 10;
// 將時(shí)間戳轉(zhuǎn)換為 LocalDateTime 對(duì)象
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());
// 定義日期時(shí)間格式
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// 格式化日期時(shí)間對(duì)象為指定格式的字符串
String format = formatter.format(dateTime);
JSONObject dataObj = new JSONObject();
int transId = 8;
dataObj.put("userId", "user_" + transId);
dataObj.put("timestamp", timestamp);
dataObj.put("datetime", format);
dataObj.put("url", "example.com/page" + transId);
logger.info("數(shù)據(jù)源url={},用戶(hù)={},交易時(shí)間={},系統(tǒng)時(shí)間={}", "example.com/page" + transId, "user_" + transId, format);
Thread.sleep(1000);
sourceContext.collect(dataObj);
}
}
@Override
public void cancel() {
running = false;
}
};
//創(chuàng)建水印策略處理事件發(fā)生時(shí)間
TimestampAssignerSupplier<JSONObject> timestampAssignerSupplier = new TimestampAssignerSupplier<JSONObject>() {
@Override
public TimestampAssigner<JSONObject> createTimestampAssigner(Context context) {
return new TimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject element, long recordTimestamp) {
//使用自定義的事件發(fā)生時(shí)間來(lái)做水印,確保窗口統(tǒng)計(jì)的是按照我們的時(shí)間字段統(tǒng)計(jì),提高準(zhǔn)確度,否則默認(rèn)使用消費(fèi)時(shí)間
return element.getLong("timestamp");
}
};
}
};
//創(chuàng)建數(shù)據(jù)流
env.addSource(dataSource).assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner(timestampAssignerSupplier))
//按照url分組
.keyBy(new KeySelector<JSONObject, Object>() {
@Override
public Object getKey(JSONObject jsonObject) throws Exception {
return jsonObject.getString("url");
}
})
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.reduce(new ReduceFunction<JSONObject>() {
@Override
public JSONObject reduce(JSONObject reduceResult, JSONObject record) throws Exception {
logger.info("窗口統(tǒng)計(jì)url={},用戶(hù)流水={},次數(shù)={}", reduceResult.getString("url"), reduceResult.getString("userId"), reduceResult.getInteger("urlNum") == null ? 1 : reduceResult.getInteger("urlNum"));
int urlNum = reduceResult.getInteger("urlNum") == null ? 1 : reduceResult.getInteger("urlNum");
reduceResult.put("urlNum", urlNum + 1);
return reduceResult;
}
})
.print();
// 執(zhí)行任務(wù)
env.execute("WatermarkStreamingJob");
}
}
到了這里,關(guān)于【天衍系列 03】深入理解Flink的Watermark:實(shí)時(shí)流處理的時(shí)間概念與亂序處理的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!