国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【天衍系列 03】深入理解Flink的Watermark:實(shí)時(shí)流處理的時(shí)間概念與亂序處理

這篇具有很好參考價(jià)值的文章主要介紹了【天衍系列 03】深入理解Flink的Watermark:實(shí)時(shí)流處理的時(shí)間概念與亂序處理。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

01 基本概念

Watermark 是用于處理事件時(shí)間的一種機(jī)制,用于表示事件時(shí)間流的進(jìn)展。在流處理中,由于事件到達(dá)的順序和延遲,系統(tǒng)需要一種機(jī)制來(lái)衡量事件時(shí)間的進(jìn)展,以便正確觸發(fā)窗口操作等。Watermark 就是用來(lái)標(biāo)記事件時(shí)間的進(jìn)展情況的一種特殊數(shù)據(jù)元素。

02 工作原理

Watermark 的生成方式通常是由系統(tǒng)根據(jù)數(shù)據(jù)流中的事件來(lái)自動(dòng)推斷生成的。一般來(lái)說(shuō),系統(tǒng)會(huì)根據(jù)事件時(shí)間戳和一定的策略來(lái)生成 Watermark,以此來(lái)表示事件時(shí)間的進(jìn)展。在 Flink 中,通常會(huì)有內(nèi)置的 Watermark 生成器或者用戶(hù)自定義的生成器來(lái)實(shí)現(xiàn)這個(gè)功能。

當(dāng)一個(gè) Watermark 被生成后,它會(huì)被發(fā)送到流處理的所有并行任務(wù)中。任務(wù)會(huì)根據(jù)接收到的 Watermark,將小于或等于 Watermark 的事件時(shí)間的數(shù)據(jù)觸發(fā)相關(guān)操作(如窗口計(jì)算),以此來(lái)確保計(jì)算的正確性。

03 優(yōu)勢(shì)與劣勢(shì)

優(yōu)點(diǎn):

  • Watermark 可以確保流處理系統(tǒng)正確處理事件時(shí)間,避免了由于亂序和延遲引起的計(jì)算錯(cuò)誤。
  • 可以根據(jù)業(yè)務(wù)需求和數(shù)據(jù)特征靈活調(diào)整 Watermark 生成的策略,以適應(yīng)不同的場(chǎng)景。
  • Watermark 的引入使得流處理系統(tǒng)更具健壯性,能夠處理各種實(shí)時(shí)數(shù)據(jù)場(chǎng)景。

缺點(diǎn):

  • Watermark 的生成可能會(huì)帶來(lái)一定的開(kāi)銷(xiāo),尤其是在數(shù)據(jù)量龐大、事件頻繁的情況下,可能會(huì)對(duì)系統(tǒng)性能產(chǎn)生一定影響。
  • 對(duì)于某些特殊的場(chǎng)景,例如極端亂序或者延遲過(guò)大的情況,Watermark 可能無(wú)法完全解決事件時(shí)間處理的問(wèn)題。

04 核心組件

  • Apache Flink中的水?。╓atermark)是事件時(shí)間處理的核心組件之一,它用于解決無(wú)序事件流中的事件時(shí)間問(wèn)題。水印是一種元數(shù)據(jù),用于告知系統(tǒng)事件時(shí)間流的進(jìn)度,從而使系統(tǒng)能夠在處理延遲的數(shù)據(jù)時(shí)做出正確的決策。

    以下是Flink中水印的核心組件:

    1. Watermark生成器(Watermark Generator)
      • Watermark生成器負(fù)責(zé)生成水印,并將其插入到數(shù)據(jù)流中。
      • 水印生成的策略通常與數(shù)據(jù)源有關(guān)。例如,對(duì)于有序的數(shù)據(jù)源,可以根據(jù)數(shù)據(jù)的事件時(shí)間直接生成水??;對(duì)于無(wú)序數(shù)據(jù)源,則可能需要一些啟發(fā)式方法來(lái)生成水印。
    2. AssignerWithPeriodicWatermarks
      • 這是一個(gè)Flink提供的接口,用于在數(shù)據(jù)流中分配水印。
      • 實(shí)現(xiàn)此接口的類(lèi)需要實(shí)現(xiàn)兩個(gè)方法:extractTimestamp()用于提取事件時(shí)間戳,getCurrentWatermark()用于生成當(dāng)前水印。
    3. AssignerWithPunctuatedWatermarks
      • 與上述相似,但是這個(gè)接口適用于在特定條件下(例如特定的事件)生成水印的場(chǎng)景。
    4. Watermark延遲(Watermark Lag)
      • 衡量系統(tǒng)中水印到達(dá)事件流的延遲程度。通常,水印到達(dá)得越快,系統(tǒng)對(duì)事件時(shí)間處理的準(zhǔn)確性就越高。
    5. Watermark策略(Watermarking Strategy)
      • 這是一個(gè)配置項(xiàng),用于確定水印生成的策略??梢曰诠潭ǖ臅r(shí)間間隔生成水印,也可以根據(jù)事件流的特性進(jìn)行自適應(yīng)調(diào)整。
    6. Watermark傳遞和處理
      • Flink通過(guò)數(shù)據(jù)流將水印傳遞給各個(gè)操作符(operators),從而確保水印在整個(gè)流處理拓?fù)渲袀鬟f。
      • 在處理過(guò)程中,水印用于確定事件時(shí)間窗口(Event Time Windows)的關(guān)閉時(shí)機(jī),以及觸發(fā)一些基于事件時(shí)間的操作,如觸發(fā)窗口計(jì)算等。
    7. 處理水印(Handling Watermarks)
      • 在窗口計(jì)算等操作中,F(xiàn)link需要根據(jù)水印來(lái)判斷是否可以觸發(fā)計(jì)算操作,以此保證結(jié)果的正確性和完整性。

    水印的核心作用在于解決事件時(shí)間處理中的亂序問(wèn)題,通過(guò)適當(dāng)?shù)乃〔呗院蜕蓹C(jī)制,可以有效地處理延遲數(shù)據(jù)和亂序數(shù)據(jù),保證數(shù)據(jù)處理的準(zhǔn)確性和時(shí)效性。

05 Watermark 生成器 使用

在 Apache Flink 中,提供了一些內(nèi)置的 Watermark 生成器,這些生成器可以用于簡(jiǎn)化在流處理中的 Watermark 管理。以下是一些常用的內(nèi)置 Watermark 生成器:

  1. BoundedOutOfOrdernessTimestampExtractor:

    • 描述: 這是 Flink 內(nèi)置的基于有界亂序時(shí)間的 Watermark 生成器。

    • 用法: 用戶(hù)可以通過(guò)指定最大允許的亂序時(shí)間來(lái)創(chuàng)建一個(gè) BoundedOutOfOrdernessTimestampExtractor 實(shí)例。通常情況下,用戶(hù)需要實(shí)現(xiàn) extractTimestamp 方法,從事件中提取事件時(shí)間戳。

    • 示例:

      public class MyTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<MyEvent> {
          public MyTimestampExtractor(Time maxOutOfOrderness) {
              super(maxOutOfOrderness);
          }
      
          @Override
          public long extractTimestamp(MyEvent event) {
              return event.getTimestamp();
          }
      }
      
  2. AscendingTimestampExtractor:

    • 描述: 這是一個(gè)簡(jiǎn)單的 Watermark 生成器,適用于按照事件時(shí)間戳升序排列的數(shù)據(jù)流。

    • 用法: 用戶(hù)只需實(shí)現(xiàn) extractAscendingTimestamp 方法,從事件中提取事件時(shí)間戳。

    • 示例:

      public class MyAscendingTimestampExtractor extends AscendingTimestampExtractor<MyEvent> {
          @Override
          public long extractAscendingTimestamp(MyEvent event) {
              return event.getTimestamp();
          }
      }
      
  3. AssignerWithPunctuatedWatermarks:

    • 描述: 這是一種特殊類(lèi)型的 Watermark 生成器,它可以基于某些事件的屬性產(chǎn)生 Watermark。

    • 用法: 用戶(hù)需要實(shí)現(xiàn) checkAndGetNextWatermark 方法,根據(jù)事件的某些屬性來(lái)判斷是否生成 Watermark。

    • 示例:

      public class MyPunctuatedWatermarkAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {
          @Override
          public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
              return element.getTimestamp();
          }
      
          @Override
          public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
              // 根據(jù) lastElement 的某些屬性判斷是否生成 Watermark
              if (lastElement.getProperty() > threshold) {
                  return new Watermark(extractedTimestamp);
              }
              return null; // 如果不生成 Watermark,則返回 null
          }
      }
      

這些內(nèi)置的 Watermark 生成器提供了靈活性和方便性,使得在 Flink 中實(shí)現(xiàn)基于事件時(shí)間的處理變得更加容易。根據(jù)具體的業(yè)務(wù)需求和數(shù)據(jù)特征,可以選擇合適的 Watermark 生成器來(lái)確保準(zhǔn)確的事件時(shí)間處理。

06 應(yīng)用場(chǎng)景

在Apache Flink 1.18中,水印(Watermark)是事件時(shí)間處理的核心組件,用于解決事件時(shí)間流處理中的亂序和延遲數(shù)據(jù)的問(wèn)題。下面是一些Flink 1.18中集成Watermark水印的應(yīng)用場(chǎng)景:

  1. 流式窗口操作
    • 在流式處理中,經(jīng)常需要對(duì)事件進(jìn)行窗口化操作,例如按時(shí)間窗口、會(huì)話(huà)窗口等進(jìn)行聚合計(jì)算。Watermark的到達(dá)可以作為觸發(fā)窗口計(jì)算的信號(hào),確保窗口在事件時(shí)間上的正確性。這種情況下,Watermark能夠確保窗口內(nèi)的數(shù)據(jù)已經(jīng)全部到達(dá),可以進(jìn)行聚合計(jì)算,同時(shí)還能夠處理延遲的數(shù)據(jù)。
  2. 處理亂序數(shù)據(jù)
    • 在實(shí)際的數(shù)據(jù)流中,事件通常不會(huì)按照嚴(yán)格的時(shí)間順序到達(dá),可能存在亂序的情況。Watermark可以幫助系統(tǒng)理清事件的先后順序,確保在事件時(shí)間上的正確性。通過(guò)適當(dāng)設(shè)置Watermark的生成策略,可以根據(jù)數(shù)據(jù)特性來(lái)處理亂序數(shù)據(jù),保證數(shù)據(jù)處理的正確性。
  3. 事件時(shí)間窗口計(jì)算
    • 在處理事件時(shí)間窗口時(shí),Watermark起到了關(guān)鍵作用。它確定了窗口的關(guān)閉時(shí)機(jī),即在Watermark達(dá)到窗口的結(jié)束時(shí)間時(shí),系統(tǒng)可以安全地關(guān)閉該窗口,并對(duì)其中的數(shù)據(jù)進(jìn)行計(jì)算。這確保了窗口計(jì)算的正確性,同時(shí)也能夠處理延遲數(shù)據(jù),使得窗口計(jì)算能夠在數(shù)據(jù)到達(dá)時(shí)即時(shí)進(jìn)行。
  4. 處理遲到的數(shù)據(jù)
    • Watermark還可以用于處理遲到的數(shù)據(jù),即已經(jīng)超過(guò)窗口關(guān)閉時(shí)限但仍然到達(dá)的數(shù)據(jù)。通過(guò)設(shè)置適當(dāng)?shù)难舆t容忍閾值,可以容忍一定程度的遲到數(shù)據(jù),并將其納入窗口計(jì)算中。這樣可以提高數(shù)據(jù)處理的完整性和準(zhǔn)確性。
    • 實(shí)時(shí)數(shù)據(jù)監(jiān)控和異常檢測(cè)
    • 在實(shí)時(shí)數(shù)據(jù)流中,通常需要對(duì)數(shù)據(jù)進(jìn)行實(shí)時(shí)監(jiān)控和異常檢測(cè)。Watermark可以用于確定事件時(shí)間的進(jìn)度,從而實(shí)現(xiàn)實(shí)時(shí)監(jiān)控和異常檢測(cè)。例如,可以基于事件時(shí)間窗口對(duì)數(shù)據(jù)進(jìn)行統(tǒng)計(jì)分析,發(fā)現(xiàn)突發(fā)的異常情況,并及時(shí)采取相應(yīng)的措施。

總的來(lái)說(shuō),F(xiàn)link 1.18中集成Watermark水印的應(yīng)用場(chǎng)景涵蓋了廣泛的實(shí)時(shí)數(shù)據(jù)處理領(lǐng)域,包括流式窗口操作、處理亂序數(shù)據(jù)、事件時(shí)間窗口計(jì)算、處理遲到的數(shù)據(jù)以及實(shí)時(shí)數(shù)據(jù)監(jiān)控和異常檢測(cè)等方面。Watermark作為事件時(shí)間處理的核心組件,為Flink提供了處理實(shí)時(shí)數(shù)據(jù)流的強(qiáng)大功能,能夠確保數(shù)據(jù)處理的準(zhǔn)確性和時(shí)效性。

07 注意事項(xiàng)

Apache Flink 中水?。╓atermark)的使用是關(guān)鍵的,特別是在處理事件時(shí)間(Event Time)數(shù)據(jù)時(shí)。水印是一種機(jī)制,用于處理無(wú)序事件流,并確保在執(zhí)行窗口操作時(shí)數(shù)據(jù)的完整性和正確性。以下是在使用 Flink 1.18 中水印的一些注意事項(xiàng):

  1. 水印生成器(Watermark Generators)的選擇
    • Flink 提供了多種內(nèi)置的水印生成器,如 BoundedOutOfOrdernessTimestampExtractor 和 AscendingTimestampExtractor。
    • BoundedOutOfOrdernessTimestampExtractor 適用于處理帶有亂序的數(shù)據(jù)流,它會(huì)為每個(gè)事件引入一定的延遲。
    • AscendingTimestampExtractor 適用于處理按事件順序到達(dá)的數(shù)據(jù)流,它假定數(shù)據(jù)已經(jīng)按照事件時(shí)間排序。
  2. 水印延遲(Watermark Lag)的設(shè)置
    • 設(shè)置水印延遲是非常重要的,它決定了 Flink 在處理數(shù)據(jù)時(shí)能夠容忍的事件延遲時(shí)間。
    • 如果設(shè)置的水印延遲過(guò)小,可能會(huì)導(dǎo)致窗口操作不正確,因?yàn)?Flink 認(rèn)為某些事件已經(jīng)到達(dá),但實(shí)際上它們還沒(méi)有到達(dá)。
    • 如果設(shè)置的水印延遲過(guò)大,可能會(huì)導(dǎo)致窗口操作的延遲增加,因?yàn)?Flink 需要等待更長(zhǎng)時(shí)間以確保數(shù)據(jù)的完整性。
  3. 數(shù)據(jù)源的處理
    • 在讀取數(shù)據(jù)源時(shí),確保正確地分配時(shí)間戳并生成水印。這通常需要在數(shù)據(jù)源的讀取邏輯中明確指定時(shí)間戳和水印生成的邏輯。
  4. 水印與窗口操作的關(guān)系
    • 在執(zhí)行窗口操作(如窗口聚合、窗口計(jì)算等)時(shí),水印的生成和處理是至關(guān)重要的。
    • 水印確保在觸發(fā)窗口計(jì)算時(shí),F(xiàn)link 已經(jīng)收到了窗口結(jié)束時(shí)間之前的所有數(shù)據(jù),從而確保計(jì)算結(jié)果的準(zhǔn)確性。
  5. 定期檢查水印生成是否正常
    • 在部署 Flink 作業(yè)時(shí),建議定期檢查水印的生成情況??梢酝ㄟ^(guò) Flink 的監(jiān)控界面或日志來(lái)查看水印的生成情況,并根據(jù)需要調(diào)整水印生成的邏輯和設(shè)置。
  6. 監(jiān)控和調(diào)試
    • 在使用水印時(shí),需要重點(diǎn)關(guān)注作業(yè)的監(jiān)控和調(diào)試,以確保水印的生成和處理是符合預(yù)期的。
    • 如果發(fā)現(xiàn)數(shù)據(jù)延遲或窗口計(jì)算不正確,可以通過(guò)監(jiān)控?cái)?shù)據(jù)流和日志來(lái)定位和解決問(wèn)題,可能需要調(diào)整水印的生成邏輯或調(diào)整水印延遲來(lái)改善作業(yè)的性能和準(zhǔn)確性。
  7. 數(shù)據(jù)傾斜和性能優(yōu)化
    • 在使用水印時(shí),需要注意數(shù)據(jù)傾斜可能會(huì)影響水印的生成和處理性能??梢酝ㄟ^(guò)合理的數(shù)據(jù)分片和并行處理來(lái)減輕數(shù)據(jù)傾斜帶來(lái)的影響,從而提高作業(yè)的性能和穩(wěn)定性。

總的來(lái)說(shuō),水印在 Flink 中的使用是非常重要的,它能夠確保在處理事件時(shí)間數(shù)據(jù)時(shí)保持?jǐn)?shù)據(jù)的完整性和正確性。因此,在設(shè)計(jì)和部署 Flink 作業(yè)時(shí),需要特別注意水印的生成和處理,以確保作業(yè)能夠正確運(yùn)行并獲得良好的性能表現(xiàn)。

08 案例分析

8.1 窗口統(tǒng)計(jì)數(shù)據(jù)不準(zhǔn)

當(dāng)涉及到事件時(shí)間處理時(shí),延遲和亂序是非常常見(jiàn)的情況。下面是一個(gè)簡(jiǎn)單的案例,演示了在事件時(shí)間處理中可能遇到的延遲和亂序問(wèn)題。

假設(shè)我們有一個(gè)用于監(jiān)控網(wǎng)站用戶(hù)訪問(wèn)的實(shí)時(shí)數(shù)據(jù)流。每個(gè)事件都包含用戶(hù)ID、訪問(wèn)時(shí)間戳和訪問(wèn)的網(wǎng)頁(yè)URL。我們想要計(jì)算每個(gè)用戶(hù)在每小時(shí)內(nèi)訪問(wèn)的不同網(wǎng)頁(yè)數(shù)量。

考慮到網(wǎng)絡(luò)傳輸和數(shù)據(jù)處理可能會(huì)引入延遲和亂序,我們的數(shù)據(jù)流可能如下所示:

Event 1: {UserID: 1, Timestamp: 12:00:05, URL: "example.com/page1"}
Event 2: {UserID: 2, Timestamp: 12:00:10, URL: "example.com/page2"}
Event 3: {UserID: 1, Timestamp: 12:00:15, URL: "example.com/page2"}
Event 4: {UserID: 1, Timestamp: 11:59:58, URL: "example.com/page3"}   <-- 延遲
Event 5: {UserID: 2, Timestamp: 12:00:02, URL: "example.com/page4"}   <-- 亂序

在這個(gè)示例中,Event 4由于延遲而晚于其他事件到達(dá),而Event 5由于亂序而在其本應(yīng)到達(dá)的時(shí)間之前到達(dá)。

如果沒(méi)有使用水印機(jī)制,Flink 可能會(huì)錯(cuò)誤地將 Event 4 的數(shù)據(jù)統(tǒng)計(jì)到 12:00:00 ~ 12:01:00 的窗口中,這是因?yàn)?Flink 默認(rèn)情況下是根據(jù)接收到事件的時(shí)間來(lái)進(jìn)行處理的,而不是根據(jù)事件實(shí)際發(fā)生的事件時(shí)間。

8.2 水印是如何解決延遲與亂序問(wèn)題?

在上述案例中,F(xiàn)link 的水?。╓atermark)機(jī)制通過(guò)指示事件時(shí)間的上限,幫助系統(tǒng)確定事件時(shí)間窗口的邊界。水印本質(zhì)上是一種元數(shù)據(jù),它告知 Flink 在某個(gè)時(shí)間點(diǎn)之前的數(shù)據(jù)已經(jīng)全部到達(dá)。

下面簡(jiǎn)要說(shuō)明水印如何在案例中發(fā)揮作用:

  1. 處理延遲數(shù)據(jù)
    • 當(dāng) Event 4 發(fā)生延遲到達(dá)時(shí),水印會(huì)逐漸推進(jìn),最終達(dá)到 Event 4 的事件時(shí)間戳(11:59:58)。
    • Flink 知道在水印之前的所有數(shù)據(jù)都已經(jīng)到達(dá),因此即使 Event 4 晚到,也不會(huì)影響窗口的觸發(fā)。
  2. 處理亂序數(shù)據(jù)
    • 當(dāng) Event 5 由于亂序提前到達(dá)時(shí),水印仍然在逐漸推進(jìn)。
    • Flink 通過(guò)水印判斷,在當(dāng)前水印之前的所有數(shù)據(jù)都已到達(dá),因此可以觸發(fā)相應(yīng)的窗口計(jì)算。
  3. 窗口觸發(fā)
    • Flink 會(huì)根據(jù)水印確定觸發(fā)窗口的時(shí)機(jī)。當(dāng)水印到達(dá)某個(gè)時(shí)間戳?xí)r,F(xiàn)link 知道在該水印之前的數(shù)據(jù)已經(jīng)全部到達(dá),可以安全地觸發(fā)窗口計(jì)算。
    • 比如,在水印到達(dá) 12:00:05 時(shí),F(xiàn)link 可以觸發(fā) 12:00:00 - 12:01:00 的窗口計(jì)算,處理這一時(shí)段內(nèi)的數(shù)據(jù)。

綜合來(lái)說(shuō),水印幫助 Flink 在事件時(shí)間處理中正確處理延遲和亂序的數(shù)據(jù),確保窗口操作的準(zhǔn)確性和完整性。通過(guò)逐漸推進(jìn)水印,系統(tǒng)能夠在事件時(shí)間軸上有序地進(jìn)行處理,而不會(huì)受到延遲和亂序數(shù)據(jù)的影響。

8.3 詳細(xì)分析

假設(shè)我們有以下十條亂序的事件數(shù)據(jù),每條數(shù)據(jù)包含事件時(shí)間戳和相應(yīng)的值:

事件時(shí)間戳(毫秒)  值
1000               10
2000               15
3000               12
1500               8
2500               18
1200               6
1800               14
4000               20
3500               16
3200               9

我們將使用Watermark來(lái)處理這些數(shù)據(jù),并進(jìn)行窗口統(tǒng)計(jì)。假設(shè)窗口大小為2秒,最大亂序時(shí)間為1秒。

使用Watermark前的統(tǒng)計(jì)

  1. 當(dāng)接收到事件時(shí)間戳為1000毫秒時(shí),將值10加入窗口。
  2. 當(dāng)接收到事件時(shí)間戳為2000毫秒時(shí),將值15加入窗口。
  3. 當(dāng)接收到事件時(shí)間戳為3000毫秒時(shí),將值12加入窗口。
  4. 當(dāng)接收到事件時(shí)間戳為1500毫秒時(shí),將值8加入窗口。
  5. 當(dāng)接收到事件時(shí)間戳為2500毫秒時(shí),將值18加入窗口。
  6. 當(dāng)接收到事件時(shí)間戳為1200毫秒時(shí),將值6加入窗口。
  7. 當(dāng)接收到事件時(shí)間戳為1800毫秒時(shí),將值14加入窗口。
  8. 當(dāng)接收到事件時(shí)間戳為4000毫秒時(shí),將值20加入窗口。
  9. 當(dāng)接收到事件時(shí)間戳為3500毫秒時(shí),將值16加入窗口。
  10. 當(dāng)接收到事件時(shí)間戳為3200毫秒時(shí),將值9加入窗口。

使用Watermark后的統(tǒng)計(jì)

Watermark的計(jì)算過(guò)程如下: Watermark = max(當(dāng)前Watermark, 當(dāng)前事件時(shí)間 - 最大亂序時(shí)間)

在這個(gè)例子中,我們?cè)O(shè)定最大亂序時(shí)間為1秒,即1000毫秒。

  1. 當(dāng)收到事件時(shí)間戳為1000毫秒時(shí),Watermark = max(0, 1000 - 1000) = 0毫秒。
  2. 當(dāng)收到事件時(shí)間戳為2000毫秒時(shí),Watermark = max(0, 2000 - 1000) = 1000毫秒。
  3. 當(dāng)收到事件時(shí)間戳為3000毫秒時(shí),Watermark = max(1000, 3000 - 1000) = 2000毫秒。
  4. 當(dāng)收到事件時(shí)間戳為1500毫秒時(shí),Watermark = max(2000, 1500 - 1000) = 2000毫秒。
  5. 當(dāng)收到事件時(shí)間戳為2500毫秒時(shí),Watermark = max(2000, 2500 - 1000) = 2000毫秒。
  6. 當(dāng)收到事件時(shí)間戳為1200毫秒時(shí),Watermark = max(2000, 1200 - 1000) = 2000毫秒。
  7. 當(dāng)收到事件時(shí)間戳為1800毫秒時(shí),Watermark = max(2000, 1800 - 1000) = 2000毫秒。
  8. 當(dāng)收到事件時(shí)間戳為4000毫秒時(shí),Watermark = max(2000, 4000 - 1000) = 3000毫秒。
  9. 當(dāng)收到事件時(shí)間戳為3500毫秒時(shí),Watermark = max(3000, 3500 - 1000) = 3000毫秒。
  10. 當(dāng)收到事件時(shí)間戳為3200毫秒時(shí),Watermark = max(3000, 3200 - 1000) = 3000毫秒。

Watermark確定了什么時(shí)候觸發(fā)窗口統(tǒng)計(jì)。在本例中,當(dāng)Watermark超過(guò)窗口的結(jié)束時(shí)間時(shí),窗口將被關(guān)閉,并進(jìn)行統(tǒng)計(jì)。因此,Watermark確保了即使在亂序數(shù)據(jù)的情況下,窗口統(tǒng)計(jì)也能夠按照正確的事件時(shí)間順序進(jìn)行。

為了更清晰地展示W(wǎng)atermark的影響,以下是每個(gè)事件被處理時(shí)的Watermark狀態(tài)和窗口統(tǒng)計(jì)的結(jié)果:

事件時(shí)間戳(毫秒)  值   Watermark    窗口統(tǒng)計(jì)結(jié)果
1000               10   0            10
2000               15   1000         25
3000               12   2000         27
1500               8    2000         27
2500               18   2000         30
1200               6    2000         30
1800               14   2000         32
4000               20   3000         36
3500               16   3000         36
3200               9    3000         36

這里的窗口統(tǒng)計(jì)結(jié)果是在Watermark觸發(fā)時(shí)計(jì)算的。在Watermark超過(guò)窗口結(jié)束時(shí)間時(shí),窗口會(huì)被關(guān)閉,并進(jìn)行統(tǒng)計(jì)。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-829518.html

09 項(xiàng)目實(shí)戰(zhàn)demo

9.1 pom依賴(lài)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsy</groupId>
    <artifactId>aurora_flink_connector_file</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--屬性設(shè)置-->
    <properties>
        <!--java_JDK版本-->
        <java.version>11</java.version>
        <!--maven打包插件-->
        <maven.plugin.version>3.8.1</maven.plugin.version>
        <!--編譯編碼UTF-8-->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--輸出報(bào)告編碼UTF-8-->
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <!--通用依賴(lài)-->
    <dependencies>

        <!--集成日志框架 start-->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.17.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.17.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.1</version>
        </dependency>

        <!--集成日志框架 end-->

        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>

        <!-- flink讀取Text File文件依賴(lài) start-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>1.18.0</version>
        </dependency>
        <!-- flink讀取Text File文件依賴(lài) end-->

        <!-- flink基礎(chǔ)依賴(lài) start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.18.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.18.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.18.0</version>
        </dependency>
        <!-- flink基礎(chǔ)依賴(lài) end -->

    </dependencies>

    <!--編譯打包-->
    <build>
        <finalName>${project.name}</finalName>
        <!--資源文件打包-->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>org.google.code.flindbugs:jar305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <excluder>org.apache.logging.log4j:*</excluder>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.aurora.KafkaStreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <!--插件統(tǒng)一管理-->
        <pluginManagement>
            <plugins>
                <!--maven打包插件-->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                    <configuration>
                        <fork>true</fork>
                        <finalName>${project.build.finalName}</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>

                <!--編譯打包插件-->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${maven.plugin.version}</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                        <compilerArgs>
                            <arg>-parameters</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

9.2 log4j2.properties配置

rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmprootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp

9.3 Watermark水印作業(yè)

package com.aurora.demo;


import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Random;

/**
 * 描述:Flink集成Watermark水印
 *
 * @author 淺夏的貓
 * @version 1.0.0
 * @date 2024-02-08 10:31:40
 */
public class WatermarkStreamingJob {

    private static final Logger logger = LoggerFactory.getLogger(WatermarkStreamingJob.class);

    public static void main(String[] args) throws Exception {
        // 創(chuàng)建 執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 自定義數(shù)據(jù)源,每隔1000ms下發(fā)一條數(shù)據(jù)
        SourceFunction<JSONObject> dataSource = new SourceFunction<>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<JSONObject> sourceContext) throws Exception {
                while (running) {

                    long timestamp = System.currentTimeMillis();
                    timestamp = timestamp - new Random().nextInt(11) + 10;
                    // 將時(shí)間戳轉(zhuǎn)換為 LocalDateTime 對(duì)象
                    LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());
                    // 定義日期時(shí)間格式
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                    // 格式化日期時(shí)間對(duì)象為指定格式的字符串
                    String format = formatter.format(dateTime);

                    JSONObject dataObj = new JSONObject();
                    int transId = 8;
                    dataObj.put("userId", "user_" + transId);
                    dataObj.put("timestamp", timestamp);
                    dataObj.put("datetime", format);
                    dataObj.put("url", "example.com/page" + transId);

                    logger.info("數(shù)據(jù)源url={},用戶(hù)={},交易時(shí)間={},系統(tǒng)時(shí)間={}", "example.com/page" + transId, "user_" + transId, format);
                    Thread.sleep(1000);

                    sourceContext.collect(dataObj);


                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        };

        //創(chuàng)建水印策略處理事件發(fā)生時(shí)間
        TimestampAssignerSupplier<JSONObject> timestampAssignerSupplier = new TimestampAssignerSupplier<JSONObject>() {
            @Override
            public TimestampAssigner<JSONObject> createTimestampAssigner(Context context) {
                return new TimestampAssigner<JSONObject>() {
                    @Override
                    public long extractTimestamp(JSONObject element, long recordTimestamp) {
                        //使用自定義的事件發(fā)生時(shí)間來(lái)做水印,確保窗口統(tǒng)計(jì)的是按照我們的時(shí)間字段統(tǒng)計(jì),提高準(zhǔn)確度,否則默認(rèn)使用消費(fèi)時(shí)間
                        return element.getLong("timestamp");
                    }
                };
            }
        };

        //創(chuàng)建數(shù)據(jù)流
        env.addSource(dataSource).assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner(timestampAssignerSupplier))
                //按照url分組
                .keyBy(new KeySelector<JSONObject, Object>() {
                    @Override
                    public Object getKey(JSONObject jsonObject) throws Exception {
                        return jsonObject.getString("url");
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .reduce(new ReduceFunction<JSONObject>() {
                    @Override
                    public JSONObject reduce(JSONObject reduceResult, JSONObject record) throws Exception {
                        logger.info("窗口統(tǒng)計(jì)url={},用戶(hù)流水={},次數(shù)={}", reduceResult.getString("url"), reduceResult.getString("userId"), reduceResult.getInteger("urlNum") == null ? 1 : reduceResult.getInteger("urlNum"));
                        int urlNum = reduceResult.getInteger("urlNum") == null ? 1 : reduceResult.getInteger("urlNum");
                        reduceResult.put("urlNum", urlNum + 1);
                        return reduceResult;
                    }
                })
                .print();

        // 執(zhí)行任務(wù)
        env.execute("WatermarkStreamingJob");
    }
}

到了這里,關(guān)于【天衍系列 03】深入理解Flink的Watermark:實(shí)時(shí)流處理的時(shí)間概念與亂序處理的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【多線(xiàn)程系列-03】深入理解java中線(xiàn)程的生命周期,任務(wù)調(diào)度

    【多線(xiàn)程系列-03】深入理解java中線(xiàn)程的生命周期,任務(wù)調(diào)度

    多線(xiàn)程系列整體欄目 內(nèi)容 鏈接地址 【一】深入理解進(jìn)程、線(xiàn)程和CPU之間的關(guān)系 https://blog.csdn.net/zhenghuishengq/article/details/131714191 【二】java創(chuàng)建線(xiàn)程的方式到底有幾種?(詳解) https://blog.csdn.net/zhenghuishengq/article/details/127968166 【三】深入理解java中線(xiàn)程的生命周期,任務(wù)調(diào)度 ht

    2024年02月17日
    瀏覽(26)
  • 【天衍系列 05】Flink集成KafkaSink組件:實(shí)現(xiàn)流式數(shù)據(jù)的可靠傳輸 & 高效協(xié)同

    【天衍系列 05】Flink集成KafkaSink組件:實(shí)現(xiàn)流式數(shù)據(jù)的可靠傳輸 & 高效協(xié)同

    Flink版本: 本文主要是基于Flink1.14.4 版本 導(dǎo)言: Apache Flink 作為流式處理領(lǐng)域的先鋒,為實(shí)時(shí)數(shù)據(jù)處理提供了強(qiáng)大而靈活的解決方案。其中,KafkaSink 是 Flink 生態(tài)系統(tǒng)中的關(guān)鍵組件之一,扮演著將 Flink 處理的數(shù)據(jù)可靠地發(fā)送到 Kafka 主題的角色。本文將深入探討 KafkaSink 的工作

    2024年02月20日
    瀏覽(50)
  • 7.2、如何理解Flink中的水位線(xiàn)(Watermark)

    7.2、如何理解Flink中的水位線(xiàn)(Watermark)

    目錄 0、版本說(shuō)明 1、什么是水位線(xiàn)? 2、水位線(xiàn)使用場(chǎng)景? 3、設(shè)計(jì)水位線(xiàn)主要為了解決什么問(wèn)題? 4、怎樣在flink中生成水位線(xiàn)? 4.1、自定義標(biāo)記 Watermark 生成器 4.2、自定義周期性 Watermark 生成器 4.3、內(nèi)置Watermark生成器 - 有序流水位線(xiàn)生成器 4.4、內(nèi)置Watermark生成器 - 亂序流

    2024年02月08日
    瀏覽(18)
  • Flink詳解系列之五--水位線(xiàn)(watermark)

    Flink詳解系列之五--水位線(xiàn)(watermark)

    1、概念 在Flink中,水位線(xiàn)是一種衡量Event Time進(jìn)展的機(jī)制,用來(lái)處理實(shí)時(shí)數(shù)據(jù)中的亂序問(wèn)題的,通常是水位線(xiàn)和窗口結(jié)合使用來(lái)實(shí)現(xiàn)。 從設(shè)備生成實(shí)時(shí)流事件,到Flink的source,再到多個(gè)oparator處理數(shù)據(jù),過(guò)程中會(huì)受到網(wǎng)絡(luò)延遲、背壓等多種因素影響造成數(shù)據(jù)亂序。在進(jìn)行窗口處

    2024年02月13日
    瀏覽(20)
  • Flink系列之:深入理解ttl和checkpoint,F(xiàn)link SQL應(yīng)用ttl案例

    Flink TTL(Time To Live)是一種機(jī)制,用于設(shè)置數(shù)據(jù)的過(guò)期時(shí)間,控制數(shù)據(jù)在內(nèi)存或狀態(tài)中的存活時(shí)間。通過(guò)設(shè)置TTL,可以自動(dòng)刪除過(guò)期的數(shù)據(jù),從而釋放資源并提高性能。 在Flink中,TTL可以應(yīng)用于不同的組件和場(chǎng)景,包括窗口、狀態(tài)和表。 窗口:對(duì)于窗口操作,可以將TTL應(yīng)用于

    2024年02月03日
    瀏覽(29)
  • FPGA信號(hào)處理系列文章——深入淺出理解多相濾波器

    提示:文章寫(xiě)完后,目錄可以自動(dòng)生成,如何生成可參考右邊的幫助文檔 多相濾波是,按照相位均勻劃分把數(shù)字濾波器的系統(tǒng)函數(shù)H(z)分解成若干個(gè)具有不同相位的組,形成多個(gè)分支,在每個(gè)分支上實(shí)現(xiàn)濾波。 采用多相濾波結(jié)構(gòu),可利用多個(gè)階數(shù)較低的濾波來(lái)實(shí)現(xiàn)原本階數(shù)較

    2024年02月05日
    瀏覽(57)
  • 【flink番外篇】6、flink的WaterMark(介紹、基本使用、kafka的水印以及超出最大允許延遲數(shù)據(jù)的處理)介紹及示例(1) - 介紹

    一、Flink 專(zhuān)欄 Flink 專(zhuān)欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月01日
    瀏覽(29)
  • 【flink番外篇】6、flink的WaterMark(介紹、基本使用、kafka的水印以及超出最大允許延遲數(shù)據(jù)的處理)介紹及示例 - 完整版

    一、Flink 專(zhuān)欄 Flink 專(zhuān)欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月02日
    瀏覽(27)
  • 【極數(shù)系列】Flink項(xiàng)目入門(mén)搭建(03)

    【極數(shù)系列】Flink項(xiàng)目入門(mén)搭建(03)

    gitee地址:https://gitee.com/shawsongyue/aurora.git 源碼直接下載可運(yùn)行,模塊:aurora_flink Flink 版本:1.18.0 Jdk 版本:11 tips:transformer處寫(xiě)主啟動(dòng)類(lèi) tips:resource目錄下增加該配置,主要用于日志打印 tips:編寫(xiě)了一個(gè)簡(jiǎn)單的有界數(shù)據(jù)流處理demo程序 step1 :創(chuàng)建flink程序運(yùn)行所需環(huán)境 ste

    2024年01月24日
    瀏覽(18)
  • 深入理解 Flink(一)Flink 架構(gòu)設(shè)計(jì)原理

    深入理解 Flink(一)Flink 架構(gòu)設(shè)計(jì)原理

    深入理解 Flink 系列文章已完結(jié),總共八篇文章,直達(dá)鏈接: 深入理解 Flink (一)Flink 架構(gòu)設(shè)計(jì)原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容錯(cuò)深入分析 深入理解 Flink (三)Flink 內(nèi)核基礎(chǔ)設(shè)施源碼級(jí)原理詳解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年02月02日
    瀏覽(19)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包