摘要:本文整理自字節(jié)跳動基礎(chǔ)架構(gòu)工程師李明,在 Apache Paimon Meetup 的分享。本篇內(nèi)容主要分為四個部分:
背景
方案設(shè)計
當前進展
未來規(guī)劃
點擊查看原文視頻 & 演講PPT文章來源地址http://www.zghlxwxcb.cn/news/detail-626808.html
一、背景
?
早期的數(shù)倉生產(chǎn)體系主要以離線數(shù)倉為主,業(yè)務按照自己的業(yè)務需求將數(shù)倉分為不同的層次,例如 DWD、DWS、ADS 等。在離線數(shù)倉中,業(yè)務數(shù)據(jù)會經(jīng)過離線 ETL 加工進入數(shù)倉,層與層之間的數(shù)據(jù)轉(zhuǎn)換也會使用離線 ETL 來進行處理。ADS 層可以直接對外提供 Serving 能力,中間層通常會使用 Hive 來存儲中間數(shù)據(jù)?;?Hive 也可以提供一些 OLAP QUERY 的能力。
在離線數(shù)倉生產(chǎn)體系下,優(yōu)勢是離線數(shù)倉的生產(chǎn)體系非常完整,工具鏈也比較成熟,存儲和維護的成本比較低,對于用戶的開發(fā)門檻相對也比較低。但劣勢也非常明顯,首先數(shù)據(jù)新鮮度非常低,通常是 T+1 級別,一般是小時級,甚至是天級。其次 changelog 支持不完善,雖然是面向Table開發(fā),但中間存儲 Hive 主要支持 append 類型的數(shù)據(jù),同時離線 ETL 更適合處理全量數(shù)據(jù),而不是增量更新。
隨著數(shù)據(jù)量的增多,離線 ETL 的執(zhí)行時間越來越長,同時業(yè)務對數(shù)據(jù)新鮮度的要求也越來越高。業(yè)務迫切的需要一種新的低延遲數(shù)倉生產(chǎn)體系。因此基于離線數(shù)倉進一步演進出了實時數(shù)倉生產(chǎn)體系。
比較典型的是 Lambda 架構(gòu)的實時數(shù)倉生產(chǎn)體系。在 Lambda 架構(gòu)的實時數(shù)倉生產(chǎn)體系中,業(yè)務需要維護兩條鏈路,將生產(chǎn)鏈路分為了流處理層和批處理層。流處理層主要用于實時處理增量數(shù)據(jù),作為批處理層的加速層,這層通常會選用 Storm、Flink 等實時計算引擎來進行數(shù)據(jù)處理。而中間結(jié)果則采用 Kafka 進行存儲,以提供低延遲的流式消費能力。
批處理層和離線數(shù)倉相同,完成 T+1 的數(shù)據(jù)結(jié)果產(chǎn)出。服務層則會綜合流處理層和批處理層的結(jié)果對外提供服務。
隨著流式計算引擎的不斷發(fā)展,以 Flink 為例,已經(jīng)實現(xiàn)了計算層的流批統(tǒng)一,在一些場景中可以完全移除掉批處理層,由流處理層來完成全量+增量的計算。為了提供中間關(guān)鍵數(shù)據(jù)的 OLAP 查詢能力,仍然需要將 Kafka 的數(shù)據(jù)再 Dump 到 Hive 中一份。
在實時數(shù)倉生產(chǎn)體系中,優(yōu)勢是數(shù)據(jù)新鮮度非常高,同時基于流處理層也可以做很多的預計算,來降低查詢的延遲。
劣勢也比較明顯:
- 第一,數(shù)倉的維護人員需要維護從計算到存儲的兩條技術(shù)棧完全不同的鏈路,開發(fā)和維護的成本都比較高。
- 第二,存儲成本高。Kafka 為了提供低延遲的流式消費能力,相比于離線常用的 HDFS,S3 等離線存儲,存儲的成本會更高。同時,為了讓中間數(shù)據(jù)能夠提供離線查詢的能力,還需要額外存儲一份離線的全量數(shù)據(jù)。
- 第三,離線和實時鏈路的數(shù)據(jù)口徑比較難對齊。這是因為采用了完全不同的兩套技術(shù)棧在構(gòu)建流處理層和批處理層。雖然邏輯抽象是相同的,但在具體實現(xiàn)上仍然有差別。并且流處理層的數(shù)據(jù)在不斷地進行增量處理,和離線處理層很難基于固定的時間點進行結(jié)果對齊。
- 最后在流處理鏈路中的中間結(jié)果,它是不可以被查詢的,因為 Kafka 只支持流式順序消費,沒有點查、batch 查詢的能力。雖然可以通過將 Kafka 數(shù)據(jù) Dump 到 Hive 中一份,但實時性比較差。
盡管計算引擎已經(jīng)實現(xiàn)了流批統(tǒng)一,但實時數(shù)倉其他的痛點很大程度是由于存儲功能存在一定的限制而導致的。隨著數(shù)據(jù)湖技術(shù)的興起,一種新的存儲技術(shù)產(chǎn)生了,它能支持高效的數(shù)據(jù)流批讀寫、數(shù)據(jù)回溯以及數(shù)據(jù)更新。基于數(shù)據(jù)湖可以構(gòu)建出新的數(shù)倉生產(chǎn)體系——Streaming Warehouse。
在 Streaming Warehouse 中,每個中間表都被抽象為 Dynamic Table,能夠同時支持流式和批式訪問,為用戶提供了和離線數(shù)倉相同的生產(chǎn)體驗?;?Streaming Warehouse 可以帶來以下收益。
首先,為用戶提供了統(tǒng)一的Table抽象,用戶只需要維護一套 Schema。同時也統(tǒng)一了技術(shù)棧,大幅降低了業(yè)務的開發(fā)和運維成本。
其次,它采用了流批一體的存儲,支持流式消費和 OLAP 查詢,可以隨時查詢實時計算的中間結(jié)果。
最后,在保證數(shù)據(jù)新鮮度的情況下,存儲成本相比實時數(shù)倉會更低一些。中間存儲可以選用相對廉價的 HDFS 和 S3 這樣的存儲。
接下來我們對這三種數(shù)倉生產(chǎn)體系做一個整體的對比。
-
在數(shù)據(jù)新鮮度方面,實時數(shù)倉和 Streaming Warehouse 的數(shù)據(jù)新鮮度是比較接近的,都是近似于實時的生產(chǎn)體驗。
-
在查詢延遲方面,三種數(shù)倉生產(chǎn)體系的查詢延遲都相對較低,但實時數(shù)倉的中間結(jié)果查詢需要付出更多的成本,比如將中間結(jié)果需要導出到Hive等。
-
在開發(fā)成本方面,Streaming Warehouse 和離線數(shù)倉的開發(fā)成本比較接近,它們的開發(fā)模式類似,可以很容易的進行開發(fā)和數(shù)據(jù)驗證,門檻較低。實時數(shù)倉由于中間結(jié)果不可查,想要做 debug 和數(shù)據(jù)驗證的成本開銷會比較高。
-
在運維成本方面,Streaming Warehouse 和離線數(shù)倉的運維成本也是比較接近的,因為它們的生產(chǎn)體系類似。對于運維人員,只需要維護一條鏈路,使用同一套技術(shù)棧。同時 Streaming Warehouse 和離線數(shù)倉都可以選擇更廉價的離線存儲,存儲成本會更低一些。
那么思考一下 Streaming Warehouse 是否真的完全覆蓋了我們的需求?
先來看一個業(yè)務場景,這是一個比較典型的商品訂單關(guān)聯(lián)計算的業(yè)務場景。在這個場景中,訂單數(shù)據(jù)和商品數(shù)據(jù)會經(jīng)過一些簡單的加工,導入到 Streaming Warehouse 中的 ODS 層的表,也就是訂單表和商品表。
然后訂單表和商品表會進一步拼接為 DWD 層的商品訂單明細表。最后對 DWD 層的表做一些聚合計算,產(chǎn)生 DWS 層的數(shù)據(jù)結(jié)果表。例如統(tǒng)計今天所有商品的營收,統(tǒng)計今天銷售量 Top 10 的商品信息等。
在這樣一個業(yè)務場景中,業(yè)務在數(shù)倉中可能也會進行一些常見的操作,比如業(yè)務可能會去修改訂單表的字段。那么如果修改了訂單表的字段,怎么去判斷這次修改可能會影響到下游的哪些表呢?這反映出目前 Streaming Warehouse 中缺乏一個血緣管理的業(yè)務能力。
另外如果訂單表數(shù)據(jù)出錯了,如何去做生產(chǎn)鏈路的數(shù)據(jù)訂正呢?在離線數(shù)倉中,可以很方便的進行任務重跑、Overwrite 等操作。在 Streaming Warehouse 中目前也可以很方便的去做這樣的操作嗎?
由于 Streaming Warehouse 是基于實時生產(chǎn)鏈路,所以不僅需要對這個表進行訂正,還需要對它下游的表同時進行處理。在整個訂正的過程中,數(shù)據(jù)的中間變化不應該被服務層可見。比如聚合結(jié)果已經(jīng)到了 10,在訂正的過程中,這個結(jié)果可能會回退到1,然后再逐漸累加到 10。
除了上述兩個問題外,在進行 OLAP 查詢時,如果想要分析 Top 10 商品在整個營收中所占的比重如何進行呢?如果是離線數(shù)倉,我們可以在兩個表就緒之后進行 batch 查詢。而在 Streaming Warehouse 中并沒有就緒的概念,這兩張表又來源于兩個不同的任務,任務之間并沒有任何的數(shù)據(jù)對齊的操作。當我們進行多表關(guān)聯(lián)查詢的時候,它的計算結(jié)果并不是完全一致的,缺少一個一致性的保證。
下面我們來總結(jié)一下在 Streaming Warehouse 中存在的問題。
-
缺少血緣管理功能,包括表的血緣關(guān)系以及數(shù)據(jù)的血緣關(guān)系。表血緣關(guān)系是指這個表的上下游依賴,而數(shù)據(jù)血緣關(guān)系則是指這份數(shù)據(jù)來源于上游的哪些數(shù)據(jù),同時下游基于這份數(shù)據(jù)生產(chǎn)出了哪些數(shù)據(jù)。
-
缺少統(tǒng)一的版本管理能力。在離線數(shù)倉中,我們可以按照小時、天來對數(shù)據(jù)進行對齊。而在 Streaming Warehouse 中,由于我們都是流式進行處理,沒有數(shù)據(jù)對齊、版本劃分的概念,就會導致進行多表關(guān)聯(lián)查詢的時候缺少一致性的保證。
-
數(shù)據(jù)訂正困難。在進行訂正的過程中,需要進行鏈路雙跑、業(yè)務邏輯修正等大量的人工操作,運維成本較高。
基于以上的問題,我們提出了一個基于 Flink 和 Paimon 構(gòu)建 Streaming Warehouse,并對外提供數(shù)據(jù)一致性管理的能力。
二、方案設(shè)計
下面我們介紹一下基于 Flink 和 Paimon 實現(xiàn)數(shù)據(jù)一致性管理方案的詳細設(shè)計。
在一致性管理方案的整體設(shè)計中,主要包含兩個部分。
-
第一部分,建立上下游的血緣關(guān)系,我們會引入 System Database 來記錄 Streaming Warehouse 中所有表和數(shù)據(jù)的血緣關(guān)系。同時,在任務提交以及數(shù)據(jù)生產(chǎn)的過程中,會自動的把表以及數(shù)據(jù)之間的血緣關(guān)系寫入到血緣關(guān)系表中。
-
第二部分,我們會在 Streaming Warehouse 中引入數(shù)據(jù)版本控制的能力,數(shù)據(jù)會按照版本來保持可見性,并且協(xié)調(diào)多表數(shù)據(jù)版本處理的一致性。
下面我們詳細介紹一下這兩部分的方案設(shè)計。
首先是血緣關(guān)系中的Table血緣關(guān)系管理。我們在 Streaming Warehouse 中引入了 System Database,并在這個 System Database 中創(chuàng)建了 Source 和 Sink 的血緣關(guān)系表。在任務的提交階段,會解析這個任務使用到的 Table 表,并將這些信息記錄到 Paimon 的血緣關(guān)系表中。
上圖是我們的一個表結(jié)構(gòu),主要用來記錄表和任務之間的關(guān)聯(lián)關(guān)系?;谶@個關(guān)聯(lián)關(guān)系,我們可以構(gòu)建出表與表之間的數(shù)據(jù)血緣關(guān)系。
在數(shù)據(jù)血緣關(guān)系中會為數(shù)據(jù)劃分一個版本,并將版本信息記錄到數(shù)據(jù)血緣關(guān)系的表中。目前我們以 Flink 的 Checkpoint 作為數(shù)據(jù)版本的一個劃分標志,這是因為在 Flink 中目前 Paimon 表是依賴 Checkpoint 來實現(xiàn)數(shù)據(jù)提交的。
在 Flink 的 Checkpoint 制作成功之后,這意味著一個新的版本的數(shù)據(jù)產(chǎn)生了,我們會自動記錄消費與生產(chǎn)之間的 Snapshot 的關(guān)系。
接下來介紹數(shù)據(jù)版本控制的設(shè)計,首先介紹一下基本概念。
-
第一個概念是 Flink Checkpoint。這個是 Flink 定期用來持久化狀態(tài),制作快照的一個功能,主要用于容錯、兩階段提交等。
-
第二個概念是 Paimon Snapshot。在 Flink 制作 Checkpoint 的時候 Paimon 會產(chǎn)生 1 個或 2 個Snapshot,這取決于 Paimon 在這個過程中是否有進行過 Compaction,但至少會產(chǎn)生一個 Snapshot 來作為新的數(shù)據(jù)版本。
-
第三個概念是 Data Version,也就是數(shù)據(jù)版本。計算引擎在計算的時候會按照數(shù)據(jù)的版本進行數(shù)據(jù)的對齊,然后進行處理,從而實現(xiàn)一個微批模式的處理。
目前,短期內(nèi)我們是將 Paimon Snapshot 和 Data Version 兩個概念進行了對齊,也就是說一個 Paimon Snapshot 就對應數(shù)據(jù)的一個版本。
先簡單看一個數(shù)據(jù)對齊的示例。假設(shè)我們有 Job-A 和 Job-B,他們分別基于 Table-A 產(chǎn)出了自己的下游表 Table-B 和 Table-C。當 Job-C 想要對 Table-B 和 Table-C 進行關(guān)聯(lián)查詢的時候,它就可以基于一致性的版本去做自己的 QUERY。
比如 Job-A 基于 Table-A 的 Snapshot-20 產(chǎn)出了 Table-B 的 Snapshot-11。Job-B 基于 Table-A 的Snapshot-20產(chǎn)出了 Table-C 的 Snapshot-15。那么 Job-C 的查詢就應該基于 Table-B 的 Snapshot-11 和 Table-C 的 Snapshot-15 進行計算,從而實現(xiàn)計算的一致性。
接下來介紹一下數(shù)據(jù)對齊的實現(xiàn),它的實現(xiàn)分為兩個部分。
- 在提交階段,需要去血緣關(guān)系表中查詢上下游表的一致性版本,并且基于查詢結(jié)果給對應的上游表設(shè)置起始的消費位置。
-
在運行階段,按照消費的 Snapshot 來協(xié)調(diào) Checkpoint,在 Flink 的 Checkpoint Coordinator 向 Source 發(fā)出 Checkpoint 的請求時,會強制要求將 Checkpoint 插入到兩個 Snapshot 的數(shù)據(jù)之間。如果當前的 Snapshot 還沒有完全被消費,這個 Checkpoint 的觸發(fā)會被推遲,從而實現(xiàn)按照 Snapshot 對數(shù)據(jù)進行劃分和處理。
在 Flink 的 Checkpoint 成功之后,它會通知Sink的算子來進行 Table 的 commit。在 commit 完成之后,這份 Snapshot 的數(shù)據(jù)就可以被下游可見了。此時會由 Commit Listener 將數(shù)據(jù)的血緣關(guān)系寫入到 System Table 中,用來記錄這份血緣關(guān)系。
當我們實現(xiàn)上面兩個功能之后,具體有哪些應用場景呢?
- 第一,數(shù)據(jù)血緣的自動化管理。數(shù)據(jù)血緣關(guān)系在整個數(shù)倉中是非常重要的一個部分?;谘夑P(guān)系我們可以快速的進行數(shù)據(jù)溯源,風險評估等。同時也可以基于血緣關(guān)系分析這些表的使用方、使用數(shù)量、數(shù)據(jù)走向,從而進行實際應用價值的評估。
- 第二,查詢一致性的能力,我們可以為 OLAP 查詢自動按照數(shù)據(jù)版本來做數(shù)據(jù)對齊,并且保證查詢結(jié)果的一致性。同時基于一致性數(shù)據(jù)進行開發(fā)和 debug,可以降低開發(fā)和運維成本,不再需要業(yè)務方手動進行多表對齊的操作。
- 第三,數(shù)據(jù)訂正?;跀?shù)據(jù)一致性管理以及數(shù)據(jù)血緣關(guān)系,可以簡化數(shù)據(jù)訂正的過程。首先按照血緣關(guān)系我們可以自動的創(chuàng)建下游需要訂正的表的鏡像表,然后再進行訂正??梢蕴峁﹥煞N訂正方式,全量訂正和增量訂正。
- 全量訂正,可以基于一致性版本的數(shù)據(jù)從上游進行全量消費,產(chǎn)生一個全鏈路的新數(shù)據(jù)。在整個數(shù)據(jù)生產(chǎn)追上延遲之后,可以對表進行一個自動切換。
- 增量訂正,可以考慮和 Flink 的 Savepoint 機制相結(jié)合,從而不用再從零開始去初始化狀態(tài),減少需要回溯的數(shù)據(jù)量。
三、當前進展
下面我們介紹一下目前數(shù)據(jù)一致性管理的階段性進展。
在社區(qū)里,目前我們發(fā)起了相關(guān)的 issue、PIP 以及郵件進行討論,大家感興趣的話可以關(guān)注一下相應的進展。如果有新的需求和想法的話,也歡迎大家一起來交流。
在字節(jié)內(nèi)部,目前我們完成了一個 POC 版本的開發(fā)和測試。在這個版本中,我們提供了一個第三方的外部服務,用來管理血緣關(guān)系,協(xié)調(diào)數(shù)據(jù)版本等。
四、未來規(guī)劃
最后介紹一下在 Streaming Warehouse 上的未來規(guī)劃。
- 第一,端到端延遲優(yōu)化。在 POC 的過程中,我們發(fā)現(xiàn)端到端的延遲很大程度上取決于 Flink Checkpoint 的間隔,同時在內(nèi)部收集一些業(yè)務需求的時候,業(yè)務對端到端延遲要求比較高。這樣會帶來一個問題,當我們降低 Checkpoint 的頻率時,會導致比較多的小文件,這需要做一些權(quán)衡。下一階段我們會著重解決端到端延遲的問題。
- 第二,數(shù)據(jù)訂正能力增強。目前這個是業(yè)務在實時數(shù)倉生產(chǎn)中反饋比較多的痛點,業(yè)務希望數(shù)據(jù)訂正的成本可以足夠低,同時訂正過程產(chǎn)生的中間結(jié)果對外不可見。
- 第三,狀態(tài)復用。在數(shù)倉生產(chǎn)中有很多場景是多表關(guān)聯(lián)。目前在 Flink 中,Join 算子會存儲左右兩條流的數(shù)據(jù)明細,在多表級聯(lián) Join 的場景下,每個 Join 算子都會存儲之前的 Join 結(jié)果,相當于多存儲了一次前面表的明細,會產(chǎn)生非常嚴重的狀態(tài)膨脹的問題。業(yè)務希望這些狀態(tài)可以被復用,也就是說相同表的數(shù)據(jù)只用被存儲一份,這樣的話可以大幅度的減少狀態(tài)存儲的開銷。同時業(yè)務也希望這個中間狀態(tài)是可以被查詢的。假設(shè)這些狀態(tài)可以被存儲到 Paimon 的表中,采用 Lookup Join 的方式去訪問。那么我們就可以使用 Flink 的 SQL 直接查詢中間狀態(tài)。
Q&A
問:血緣關(guān)系解析是基于 Flink 的 calcite 嗎?
答:不是,是基于 FlinkTableFactory 進行實現(xiàn),在創(chuàng)建 DynamicTableSource 和 DynamicTableSink 時,提取相關(guān)的 Table 信息和任務信息,然后寫入到 Paimon 的血緣關(guān)系表中。
問:針對于任務出錯,數(shù)據(jù)訂正,具體是怎么操作的呢?也就是恢復正常的一個處理流程是怎么樣的,大概需要多長時間能夠恢復正常呢?
答:我們的目標是希望數(shù)據(jù)訂正的流程可以在系統(tǒng)內(nèi)自動完成,初期設(shè)想是在訂正時,基于表的血緣關(guān)系對下游表產(chǎn)生相應的鏡像表,然后將任務雙跑在這條鏡像鏈路上,基于數(shù)據(jù)血緣關(guān)系可以實現(xiàn)數(shù)據(jù)仍然按照相同的版本進行處理。在兩條鏈路的延遲基本對齊時,進行任務以及表的切換。處理時間依賴處理的數(shù)據(jù)量,鏈路的復雜度等。
問:大佬有考慮在此基礎(chǔ)上做一個統(tǒng)一的 Paimon 管理服務嗎?例如 Paimon 的元數(shù)據(jù)管理,Compaction 管理,血緣管理等等
答:目前只考慮了實現(xiàn)元數(shù)據(jù)管理、血緣管理等,對于 Compaction 管理,可能更適合在 Table Service 這樣的服務中進行。
問:業(yè)務周期跨度比較大,F(xiàn)link Join 緩存全量的數(shù)據(jù)?
答:Flink 全量 Join 數(shù)據(jù)會在狀態(tài)中存儲 Table 的所有數(shù)據(jù),同時對于級聯(lián) Join 會產(chǎn)生非常嚴重的狀態(tài)膨脹問題。根據(jù) Join 的原理,可以考慮將 Join 實現(xiàn)為 Lookup Join + Delta Join,對于歷史數(shù)據(jù),采用 Lookup join 去查歷史表數(shù)據(jù),而對于最近的增量數(shù)據(jù),將其存儲在狀態(tài)中,通過狀態(tài)查詢進行 Join,這樣可以將大量的全量數(shù)據(jù)存儲在 Paimon 表中,狀態(tài)里只緩存少部分數(shù)據(jù)。這依賴版本管理的能力來區(qū)分數(shù)據(jù)是 Join 歷史數(shù)據(jù)還是增量數(shù)據(jù)。
問:字段血緣關(guān)系會做嗎?要根據(jù) SQL 語法解析的吧
答:暫時不考慮字段血緣關(guān)系的實現(xiàn)。文章來源:http://www.zghlxwxcb.cn/news/detail-626808.html
點擊查看原文視頻 & 演講PPT
到了這里,關(guān)于基于 Flink & Paimon 實現(xiàn) Streaming Warehouse 數(shù)據(jù)一致性管理的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!