導(dǎo)讀: 首先做個(gè)自我介紹,我目前在阿里云云計(jì)算平臺(tái),從事研究 Flink 和 Hudi 結(jié)合方向的相關(guān)工作。
目前,F(xiàn)link + Hudi 的方案推廣大概已經(jīng)有了一年半的時(shí)間,在國(guó)內(nèi)流行度也已比較高,主流的公司也會(huì)嘗試去迭代他們的數(shù)倉(cāng)方案。所以,今天我介紹的主題是 Flink 和 Hudi 在數(shù)據(jù)湖 Streaming 方向的一些探索和實(shí)踐,將會(huì)圍繞以下四點(diǎn)展開(kāi):
- Apache Hudi 背景介紹
- Flink Hudi 設(shè)計(jì)
- Hudi 應(yīng)用場(chǎng)景
- Hudi RoadMap
點(diǎn)擊查看直播回放
Apache Hudi背景介紹
首先和大家分享下數(shù)據(jù)湖發(fā)展的歷史背景,以及Hudi的基本特性。
1. 數(shù)據(jù)湖發(fā)展的歷史背景
在我個(gè)人觀點(diǎn)看來(lái),傳統(tǒng)的數(shù)倉(cāng)方案(如 Hive)其實(shí)本身也是數(shù)據(jù)湖,而且我會(huì)把Hudi、Iceberg、Delta Lake 都看成是數(shù)倉(cāng)下一代新的解決方案,而不僅僅只是一種湖格式。那為什么近一年來(lái)會(huì)有數(shù)據(jù)湖這一新的數(shù)倉(cāng)形態(tài)的誕生?
伴隨著目前云存儲(chǔ)(尤其是對(duì)象存儲(chǔ))逐步成熟的大背景,數(shù)據(jù)湖的解決方案也會(huì)逐步往云原生靠近。如圖一所示,湖格式會(huì)適配云廠商的對(duì)象存儲(chǔ),做云廠商多云和云廠商用例,同時(shí)適配比較流行的大數(shù)據(jù)計(jì)算框架(如Spark、Flink),以及查詢端的 Presto、trino 以及傳統(tǒng) Hive 引擎,因此誕生了這樣一套新的數(shù)倉(cāng)解決方案。
2. Hudi 的四大核心特性
由上可知,Hudi 作為下一代的數(shù)倉(cāng)解決方案,借助上下游的計(jì)算和查詢引擎,實(shí)現(xiàn)替代傳統(tǒng) Hive 離線數(shù)倉(cāng)的一套新方案,其核心特色整體可以總結(jié)為以下四點(diǎn):
- 開(kāi)放性
開(kāi)放性體現(xiàn)在兩個(gè)方面:
第一方面,上游支持多種數(shù)據(jù)源格式。比如傳統(tǒng)數(shù)據(jù)庫(kù)的 change log 日志、消息隊(duì)列 log 等傳輸方式,都會(huì)在 source 端會(huì)有非常豐富的支持。****
第二方面,下游查詢端也同樣支持多種查詢引擎。像主流的 OLAP 引擎 Presto、國(guó)內(nèi)比較火的 Starrocks、云廠商的 amazon redshift、數(shù)據(jù)分析產(chǎn)品 impala,都會(huì)對(duì)接到這樣一套數(shù)倉(cāng)架構(gòu)里面。
所以開(kāi)放性是 Hudi 的第一個(gè)特點(diǎn)。
- 豐富的事務(wù)支持
Hudi 對(duì)事務(wù)的支持程度,會(huì)比原來(lái) Hive 數(shù)倉(cāng)的要求更高,更豐富。其中核心特點(diǎn)是支持在文件存儲(chǔ)布局上做更新。在傳統(tǒng)基于 Hive 的 T + 1 更新方案中,數(shù)據(jù)重復(fù)度會(huì)比較高,只能實(shí)現(xiàn)天級(jí)別的數(shù)據(jù)新鮮度。并且伴隨著業(yè)務(wù)需求越來(lái)越復(fù)雜,實(shí)時(shí)性要求越來(lái)越高,對(duì)數(shù)倉(cāng)存儲(chǔ)體系提出了更高的要求,對(duì)端到端的數(shù)據(jù)新鮮度要求做到分鐘級(jí)或者是秒級(jí)。
其次是更新效率要求提高,不要每次都去 overwrite 整張表或者整個(gè) partition 去更新,而是能夠精確到文件粒度的局部更新來(lái)提升存儲(chǔ)和計(jì)算效率。Hudi很好地滿足了這些需求,因此,對(duì)ACID語(yǔ)義的增強(qiáng)是這套數(shù)倉(cāng)架構(gòu)的第二大特點(diǎn)。
- 基于ACID語(yǔ)義的增量處理
在我看來(lái),第三個(gè)亮點(diǎn)是在ACID語(yǔ)義基礎(chǔ)上衍生出來(lái)的增量處理,尤其Hudi提出的TimeTravel概念,或者直接對(duì)接Flink,Spark Streaming等流式處理引擎的方式,不管是近實(shí)時(shí)還是常駐的Streaming服務(wù),本質(zhì)都是一種流式消費(fèi),都可以理解為一種增量ETL處理。相對(duì)于傳統(tǒng)batch調(diào)度,在計(jì)算上會(huì)更加高效,尤其像Flink這種有狀態(tài)的計(jì)算框架,會(huì)復(fù)用之前的計(jì)算結(jié)果,直接實(shí)現(xiàn)端到端的全鏈路增量處理。其次,在數(shù)據(jù)新鮮度上有一個(gè)數(shù)量級(jí)的提升,從“天級(jí)別”提升到“分鐘級(jí)別”。
比方說(shuō),國(guó)內(nèi)目前有些實(shí)踐用戶會(huì)嘗試使用Flink計(jì)算框架做湖表的Streaming消費(fèi),直接通過(guò)一套增量ETL鏈路去分析從源端注入過(guò)來(lái)的數(shù)據(jù),構(gòu)建傳統(tǒng)數(shù)倉(cāng)的分層。還有一點(diǎn),很多小伙伴會(huì)好奇TimeTravel這種incremental的查詢?cè)O(shè)計(jì),查詢兩個(gè)快照之間的增量數(shù)據(jù),有什么用途?如果你是批形式調(diào)度的查詢,主流場(chǎng)景是ADS端到下游的同步,比如說(shuō)將數(shù)倉(cāng)的生產(chǎn)結(jié)果同步到其他庫(kù)表(如ES,Mysql)可通過(guò)這種TimeTravel定期做這種批量同步,當(dāng)你對(duì)ADS的同步時(shí)間度要求沒(méi)這么高,就可以用這種冪等TimeTravel的查詢方式比較高效地同步到其他下游端。
以上三點(diǎn),是相對(duì)于主流Hive架構(gòu)地三個(gè)核心區(qū)別,也是目前國(guó)內(nèi)外湖倉(cāng)項(xiàng)目正在努力的方向。
- 智能化調(diào)度
再補(bǔ)充一點(diǎn),在Hudi里面,會(huì)盡量?jī)?yōu)化文件布局,將小文件管理這種數(shù)據(jù)治理的方案做到框架內(nèi)部,實(shí)現(xiàn)智能化調(diào)度。這是Hudi區(qū)別于其他數(shù)倉(cāng)方案如Delta Lake,IceBerg的核心特點(diǎn)。
Flink + Hudi設(shè)計(jì)
1. Hudi寫入pipeline(多算子組成的微服務(wù)架構(gòu))
從圖二中可以看出,Hudi寫入pipeline是一個(gè)Serverless的微服務(wù)架構(gòu),核心是在整個(gè)pipeline的服務(wù)起來(lái)之后,不管是Flink,還是Spark Streaming,整套服務(wù)可以對(duì)表本身能達(dá)到自治理的狀態(tài)。所以,不光光考慮數(shù)據(jù)高效寫入,同時(shí)還需要考慮寫入過(guò)程中的文件管理,盡量避免產(chǎn)生太多小文件從而優(yōu)化查詢端的效率。通過(guò)定期的文件合并,文件清理,避免出現(xiàn)小文件數(shù)量爆炸式增長(zhǎng)的情況出現(xiàn)。
另一方面,是ACID事務(wù)性, 尤其是完成一個(gè)待更新的ACID事務(wù),需考慮多方面因素。當(dāng)單job或者單節(jié)點(diǎn)要fail over時(shí),Hudi可以保證快速找到之前寫入的錯(cuò)誤數(shù)據(jù),并且實(shí)現(xiàn)rollback回滾。所以,Hudi的事務(wù)層支持是目前三個(gè)湖存儲(chǔ)里面做的最完善,最高效的。
以copy on write的具體實(shí)現(xiàn)為例,會(huì)將上游的SQL原生數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換成Hudi的數(shù)據(jù)結(jié)構(gòu),為了支持并發(fā)寫入,我們會(huì)對(duì)每個(gè)shuffle后的數(shù)據(jù)分bucket。
主要有兩點(diǎn):
第一個(gè)是新增數(shù)據(jù),會(huì)盡量寫入到當(dāng)前已存在的比較小的bucket里面。同時(shí),為了避免生成小文件,也會(huì)盡量保證每個(gè)bucket的大小和預(yù)期大小相同。
第二點(diǎn)是更新數(shù)據(jù),Hudi設(shè)計(jì)了key主鍵,每個(gè)key的消息都維護(hù)在一個(gè)bucket內(nèi)部,每次更新都會(huì)寫入到之前的bucket里面。而IceBerg,Delta Lake就只管寫入,不會(huì)去管文件布局,因此他們會(huì)把查詢端的一些合并和清理做得很重,所以查詢效率會(huì)比較低。相比之下,Hudi復(fù)雜的寫入過(guò)程和bucket策略就是在權(quán)衡和考慮讀寫效率。這里所說(shuō)的bucket概念,有點(diǎn)類似Snowflake里的micro partition概念,會(huì)在傳統(tǒng)的partition分區(qū)下面再細(xì)化,以文件粒度來(lái)維護(hù)某個(gè)range下消息的生命周期。以更細(xì)粒度去維護(hù)生命周期,可有效提升數(shù)據(jù)更新和查詢效率。
第二個(gè)算子之后,數(shù)據(jù)根據(jù)每個(gè)bucket做好分區(qū),我們會(huì)按照bucket ID做一遍shuffle,交給write task去寫入。為何要按照bucket ID重新shuffle?主要是為了維護(hù)兩個(gè)write task不能同時(shí)并發(fā)修改一個(gè)bucket的更新語(yǔ)義,否則容易造成更新沖突。
所以,從整體上看,這三個(gè)算子可以高效保證并發(fā)寫入、更新??梢员容^明顯看到,第二個(gè)算子的并發(fā)度其實(shí)決定了整個(gè)更新的并發(fā)度,決定當(dāng)前能夠同時(shí)更新和寫入的bucket數(shù)量,而后面的算子可以自由獨(dú)立地?cái)U(kuò)展。從實(shí)踐經(jīng)驗(yàn)推薦第二個(gè)和第三個(gè)算子的并發(fā)設(shè)置一樣,當(dāng)吞吐量不是很高的時(shí)候,一個(gè)bucket交給一個(gè)write task去寫,吞吐量比較高的時(shí)候,可能一個(gè)bucket的write task可能會(huì)分多個(gè),可以調(diào)整到1:2的比例。
后臺(tái)還會(huì)啟動(dòng)clean commits的清理任務(wù)。數(shù)據(jù)commit操作發(fā)生在coordinator組件內(nèi),保證每個(gè)write task的commit大概對(duì)齊了checkpoint之后,數(shù)據(jù)才會(huì)flush出去,并且有一部分元數(shù)據(jù)信息,會(huì)統(tǒng)一提交給coordinator,coordinator收集到統(tǒng)計(jì)信息之后,會(huì)去結(jié)合checkpoint完成的事件做一次提交,真實(shí)的提交是在coordinator內(nèi)。當(dāng)coordinator完成提交之后,Hudi表會(huì)發(fā)起一個(gè)新的事務(wù),只有當(dāng)write task看到這個(gè)新的事務(wù),才能夠發(fā)起新事務(wù)的寫入動(dòng)作。所以中間存在一個(gè)異步等待的過(guò)程,類似于一個(gè)小型的狀態(tài)機(jī)。
而Flink的快照所保證的語(yǔ)義其實(shí)是一個(gè)best effort語(yǔ)義,一旦收到某個(gè)checkpoint的成功事件,就標(biāo)志前面的狀態(tài)都是成功的,但中間可能存在checkpoint被abort情況。
因?yàn)镠udi需要保證每個(gè)寫入的完整性和Exactly once語(yǔ)義,就需要考量中間的寫入不能越界,比如說(shuō)checkpoint的事件數(shù)據(jù)不能寫入下個(gè)checkpoint,這樣Exactly once語(yǔ)義就沒(méi)辦法保證。
在0.11版本會(huì)嘗試做一些優(yōu)化,比方說(shuō)checkpoint被abort之后的狀態(tài)能否復(fù)用。里面涉及一個(gè)狀態(tài)的切換,相對(duì)會(huì)比較復(fù)雜。不像Spark Streaming每次都是微批的抽象,每次先發(fā)起一個(gè)任務(wù),天然保證了exactly once,容錯(cuò)語(yǔ)義交給框架。Flink怎樣把這個(gè)異步算法和很強(qiáng)的exactly once語(yǔ)義結(jié)合在一起,是這套架構(gòu)的一個(gè)難點(diǎn)所在。
2. 小文件策略
接下來(lái),我們仔細(xì)看看文件寫入的第二個(gè)算子bucket assign的具體決策。即新消息如何去選擇放到哪個(gè)bucket,如圖三所示,分兩種情況介紹。
首先,左側(cè)框圖中有三個(gè)bucket,藍(lán)色代表當(dāng)前已經(jīng)存儲(chǔ)文件的大小,如果是insert數(shù)據(jù),策略是每次選擇當(dāng)前剩余空間最多的bucket寫入。為何不考慮選擇剩余空間最少的bucket呢?因?yàn)樾枰紤]到COW的寫放大問(wèn)題,效率比較低。更新數(shù)據(jù)時(shí),先找到維護(hù)當(dāng)前key的bucket,然后寫入。這樣并不會(huì)造成文件大小的無(wú)限增長(zhǎng),因?yàn)槊總€(gè)record記錄更新前后的大小基本近似,文件大小不會(huì)有明顯的變化。影響文件大小的主要是insert數(shù)據(jù),文件大小會(huì)設(shè)置閾值,維持在120M左右。
圖中右側(cè)框圖是一個(gè)比較極端的情況,兩個(gè)bucket只剩下很小的寫入空間,考慮到寫放大影響,會(huì)重新創(chuàng)建一個(gè)新的bucket重新寫入。
為了提高并發(fā)寫的吞吐量,會(huì)給每個(gè)bucket assign task分配一套獨(dú)立的bucket管理策略,并利用Hash算法把bucket ID以固定的規(guī)則hash到每個(gè)bucket assign task 下面,做到了并發(fā)決策。因此,控制bucket assign task并發(fā)度就相對(duì)控制了寫入小文件數(shù)量,在寫入吞吐量和小文件之間的權(quán)衡。
3. 全量 + 增量 讀取
介紹完數(shù)據(jù)寫入過(guò)程,再看下數(shù)據(jù)讀取的流讀部分。流讀的全量讀和增量讀是如何實(shí)現(xiàn)的? 如圖四所示,Hudi中TimeLine保存每個(gè)事務(wù)提交的毫秒時(shí)間戳,每個(gè)時(shí)間戳?xí)?duì)應(yīng)一個(gè)快照版本,會(huì)記錄在元數(shù)據(jù)里面。全量讀時(shí)會(huì)掃全表的文件,會(huì)把整個(gè)全表的文件掃描出來(lái),當(dāng)你沒(méi)有配置內(nèi)置的Metadata索引表時(shí),會(huì)直接掃全表,把文件系統(tǒng)中所有的文件都找出來(lái)。如果啟用了Metadata表,就會(huì)在Metadata表(KV存儲(chǔ))里掃描這個(gè)文件信息,以相對(duì)比較高的效率掃描全表文件,然后發(fā)給下游,并且增量的部分會(huì)定期(默認(rèn)60s)監(jiān)聽(tīng)掃描TimeLine觀察有沒(méi)有新的commits,同步發(fā)給下游讀寫,每次增量的部分會(huì)基于上一次下發(fā)的時(shí)間線點(diǎn)位,然后一直查找到當(dāng)前最新的commit time。
Split mornitor算子負(fù)責(zé)維護(hù)這樣一套監(jiān)聽(tīng)增量文件信息的規(guī)則,下發(fā)給真正執(zhí)行讀取的task。
最近在master版本也支持了批模式的TimeTravel查詢(某個(gè)時(shí)間段的點(diǎn)查),以前的版本雖然支持但是會(huì)有些問(wèn)題,比如增量部分meta文件如果被archive、或者被清理,數(shù)據(jù)完整性就沒(méi)有保證。新版本在保證在讀取效率前提下,通過(guò)實(shí)現(xiàn)兩個(gè)快照、commit之間的批模式增量讀取方式應(yīng)對(duì)這兩個(gè)問(wèn)題,保證數(shù)據(jù)完整度。
Hudi應(yīng)用場(chǎng)景
目前Flink + Hudi在國(guó)內(nèi)已經(jīng)是非常流行的技術(shù)架構(gòu),這邊總結(jié)三個(gè)應(yīng)用場(chǎng)景向大家介紹一下。
1. 近實(shí)時(shí)DB數(shù)據(jù)入倉(cāng)/湖
這套架構(gòu)的DB數(shù)據(jù)入湖入倉(cāng)核心特色是把原來(lái)T + 1的數(shù)據(jù)新鮮度提升到分鐘級(jí)別。 數(shù)據(jù)新鮮度通過(guò)目前比較火的以Debezium、Maxwell為代表的CDC(change Data Capture)技術(shù)實(shí)現(xiàn)。以Streaming近實(shí)時(shí)的方式同步到數(shù)倉(cāng)里面。在傳統(tǒng)的Hive數(shù)倉(cāng)中想保證實(shí)時(shí)是非常困難的,尤其是文件更新,湖表實(shí)時(shí)寫入更新,基本不可能實(shí)現(xiàn)。CDC技術(shù)對(duì)數(shù)倉(cāng)本身存儲(chǔ)是有要求的,首先是更新效率得足夠高,能夠支持以Streaming方式寫入,并且能夠非常高效的更新。尤其是CDC log在更新過(guò)程還可能會(huì)亂序,如何保證這種亂序更新的ACID語(yǔ)義,是有很高要求的,當(dāng)前能滿足亂序更新的湖格式只有Hudi能做到,而且Hudi還考慮到了更新的效率問(wèn)題,是目前來(lái)說(shuō)比較先進(jìn)的架構(gòu)。
圖五下方的方案相比上面的方案,比較適合目前體量比較大(每天增量能達(dá)到億級(jí)別地)、數(shù)據(jù)平臺(tái)比較健全的公司,中間有一套統(tǒng)一的數(shù)據(jù)同步方案(匯總不同源表數(shù)據(jù)同步至消息隊(duì)列),消息隊(duì)列承擔(dān)了數(shù)據(jù)的容錯(cuò)、容災(zāi)、緩存功能。同時(shí),這套方案的擴(kuò)展性也更加好。通過(guò)kafka的topic subscribe方式,可以比較靈活地分發(fā)數(shù)據(jù)。
2. 近實(shí)時(shí)OLAP
第二個(gè)場(chǎng)景是近實(shí)時(shí)的OLAP場(chǎng)景,分鐘級(jí)別的端到端數(shù)據(jù)新鮮度,同時(shí)又非常開(kāi)放的OLAP查詢引擎可以適配。其實(shí)是對(duì)kappa架構(gòu)或者是原先Streaming數(shù)倉(cāng)架構(gòu)的一套新解法。在沒(méi)有這套架構(gòu)之前,實(shí)時(shí)分析會(huì)跳過(guò)Hudi直接把數(shù)據(jù)雙寫到OLAP系統(tǒng)中,比如ClickHouse、ES、MongoDB等。當(dāng)倉(cāng)存儲(chǔ)已經(jīng)可以支持高效率分級(jí)別更新,能夠?qū)覱LAP引擎,那么這套架構(gòu)就被大大簡(jiǎn)化,首先不用雙寫,一份數(shù)據(jù)就可以保證only one truth語(yǔ)義,避免雙寫帶來(lái)數(shù)據(jù)完整性的問(wèn)題。其次因?yàn)楹袷奖旧硎欠浅i_(kāi)放的,在查詢端引擎可以有更多選擇,比如Hudi就支持Presto、trino、Spark、Starrocks、以及云廠商的redshift引擎,會(huì)有非常高的靈活度。、
所以,這種近實(shí)時(shí)的OLAP架構(gòu),總結(jié)就是以下兩點(diǎn): ①統(tǒng)一上游存儲(chǔ)端;②開(kāi)放下游查詢端。
但這套架構(gòu)的數(shù)據(jù)新鮮度大概是5分鐘級(jí)別,如果要做到像kappa秒級(jí)別的架構(gòu)的話,目前Hudi還是不太適合的,因?yàn)楸旧肀容^依賴Flink的CheckPoint機(jī)制(支持端到端的exactly once語(yǔ)義),所以不能做到高頻次的提交。
3. 近實(shí)時(shí)ETL
第三個(gè)場(chǎng)景是目前比較前沿的架構(gòu),在國(guó)內(nèi)也慢慢開(kāi)始嘗試這套架構(gòu)。當(dāng)數(shù)據(jù)源數(shù)據(jù)體量本身不大的時(shí)候,比方說(shuō)源頭過(guò)來(lái)的并不是kafka,可能源頭只是一個(gè)Mysql的binlog,QPS每秒可能也就幾百。那么這套架構(gòu)是一個(gè)非常穩(wěn)定且省事的架構(gòu),不光光是實(shí)現(xiàn)了這種端到端的增量處理,同時(shí)還解決中間數(shù)據(jù)入倉(cāng)的需求。其實(shí)就是提供了兩套抽象,首先承擔(dān)了一個(gè)數(shù)倉(cāng)中間存儲(chǔ)的一個(gè)存儲(chǔ)抽象,把數(shù)據(jù)直接以湖格式入倉(cāng);第二個(gè)是提供Queue能力,類似于Kafka這種消息隊(duì)列的能力,可用Streaming方式增量消費(fèi),并且可以在其上做一些增量計(jì)算。 就這一套架構(gòu)直接統(tǒng)一原來(lái)的Lambda和kappa架構(gòu),就是kafka的存儲(chǔ)抽象加數(shù)倉(cāng)文件的存儲(chǔ)抽象合并在一個(gè)存儲(chǔ)抽象里面,同時(shí)沒(méi)有增加過(guò)多的存儲(chǔ)成本。大家可能后面用的都是對(duì)象存儲(chǔ)或者是以廉價(jià)存儲(chǔ)的形式存在的HDFS。
整套架構(gòu)解決了兩個(gè)問(wèn)題,第一是雙寫問(wèn)題。 在Lambda架構(gòu)下,數(shù)據(jù)先寫kafka,然后入倉(cāng),保證這兩份數(shù)據(jù)的一致性語(yǔ)義比較難。而且kafka開(kāi)啟exactly once寫入后吞吐量會(huì)下降很多,Kafka和HDFS之間的數(shù)據(jù)如何保證一致性呢?有人會(huì)理解去流讀kafka,把kafka數(shù)據(jù)再起一個(gè)job同步到HDFS,這樣計(jì)算資源、維護(hù)作業(yè)、同步成本都是原來(lái)的兩倍。
第二個(gè)解決的問(wèn)題是中間層查詢需求。中間層數(shù)據(jù)直接入倉(cāng),并且不是以高效率方式更新,當(dāng)需要對(duì)中間層DWD表做一些join操作時(shí),可以直接和引擎對(duì)接,而不需要去考慮說(shuō)Lambda架構(gòu)T+1更新效率的問(wèn)題。湖格式分鐘級(jí)時(shí)效性很大程度緩解了這個(gè)問(wèn)題。
除此之外,這套架構(gòu)還有個(gè)好處,可以根據(jù)不同地應(yīng)用場(chǎng)景選擇豐富的OLAP查詢引擎,直接以外表的方式接入庫(kù)存儲(chǔ),很方便地進(jìn)行OLAP分析。
4. 阿里云VVP實(shí)時(shí)入湖
接下來(lái),簡(jiǎn)單講一下目前阿里云VVP產(chǎn)品實(shí)時(shí)入湖的集成,主要還是入湖狀態(tài),阿里云內(nèi)置Flink版本會(huì)有一個(gè)內(nèi)置Hudi connector,大家可以通過(guò)FlinkSQL方式快速構(gòu)建入湖任務(wù),直接寫湖表,對(duì)接Hudi CDC connector或者對(duì)接kafka CDC format,實(shí)現(xiàn)數(shù)據(jù)快速入湖。
并且在入湖過(guò)程中,提供了商業(yè)版特性,如schema evolution。CE,CTAS語(yǔ)法支持schema evolution然后同時(shí)我們會(huì)主推DLF catalog元數(shù)據(jù)管理組件,DLF catalog會(huì)和EMR的DLF無(wú)縫集成,若是EMR通過(guò)spark寫入,這邊也可以看到,F(xiàn)link入湖任務(wù)寫完之后也可以管理,通過(guò)DLF組件,可以直接通過(guò)EMR查詢端引擎分析Hudi格式數(shù)據(jù)。
這是目前的一套推給商業(yè)化用戶的技術(shù)方案,入湖通過(guò)VVP服務(wù),分析通過(guò)EMR,后期VVP可能會(huì)集成更多能力,如流批統(tǒng)一,滿足用戶的流讀需求。
近期Hudi RoadMap
如圖九所示,我將簡(jiǎn)單介紹下近期Hudi 0.12版本以及1.0版本將會(huì)做的一些feature。
首先,我們會(huì)推出類似于Delta2.0的CDC Feed功能,因?yàn)槟壳拔覀冎С值腃DC要求輸入必須是一個(gè)CDC,Hudi會(huì)用CDC格式把它存下來(lái)。CDC Feed的區(qū)別就是不用保證整體的輸入是CDC格式,即使出現(xiàn)absert語(yǔ)義,或者CDC的中間數(shù)據(jù)有丟失,可以完整還原出CDC給主端。這個(gè)特性在讀寫吞吐量和資源上做些權(quán)衡,不會(huì)像目前這套架構(gòu)處理CDC那么高效。
第二點(diǎn)是Meta Service服務(wù)。把元數(shù)據(jù)的管理插件化,通過(guò)統(tǒng)一的Meta Service plugable形式,統(tǒng)一管理Hudi上的表和任務(wù)。
第三點(diǎn),我們目前還在規(guī)劃做Secondary Index二級(jí)索引。因?yàn)樵谀壳暗膍aster版本中,F(xiàn)link Spark都已實(shí)現(xiàn)data skipping能力(在寫入時(shí),如果用戶開(kāi)啟Meta Data表,同時(shí)開(kāi)啟data skipping,會(huì)額外記錄每個(gè)column的統(tǒng)計(jì)信息),最典型的是每個(gè)column會(huì)建一個(gè)Max, Min,開(kāi)啟一個(gè)元數(shù)據(jù)的加速,提升文件級(jí)別的查詢效率。后續(xù)還會(huì)支持類似于數(shù)據(jù)庫(kù)的二級(jí)索引,為某個(gè)專門的column實(shí)現(xiàn)類似于LSM的抽象索引,構(gòu)建適用于點(diǎn)查場(chǎng)景的高效索引方案。
最后,后續(xù)我們還會(huì)做類似于特征工程的按列更新功能開(kāi)發(fā),類似于Clickhouse的Merge Tree抽象,獨(dú)立存儲(chǔ)某個(gè)column。 因?yàn)樵跈C(jī)器學(xué)習(xí)的特征工程中大量特征需要成千上萬(wàn)個(gè)字段,每次生成一個(gè)特征都需要更新一個(gè)column,所以要求單個(gè)column要具備高效的更新能力。為了適配這樣的場(chǎng)景,Hudi也會(huì)持續(xù)去探索。
答疑
Q1:直接使用Hudi存儲(chǔ)更新,相比直接CDC到Starrocks,這兩種方案哪一個(gè)更好?感覺(jué)Starrocks的QPS應(yīng)該會(huì)高于Hudi的更新速度。
A1:的確是這樣,因?yàn)镾tarrocks service 會(huì)使用類似于LSM的高效主鍵索引,并且內(nèi)存里面做partition策略,維護(hù)比較多的二級(jí)索引,元數(shù)據(jù)信息。而且,最重要的一點(diǎn)就是在寫入更新main table時(shí)會(huì)使用攢批的操作,先把多次寫入?yún)R入buffer然后進(jìn)行一次flash,并且在數(shù)據(jù)的flush上也可以攢批,這也是為什么Starrocks更新效率上更高的原因。
但同時(shí),因?yàn)橛辛藄erver集群,會(huì)帶來(lái)兩個(gè)問(wèn)題,首先,會(huì)帶來(lái)較高的運(yùn)維成本,其次,內(nèi)存模式相比于Hudi的serverless格式的開(kāi)銷會(huì)更大。
Hudi格式在開(kāi)放性上,對(duì)于Starrocks會(huì)有一定優(yōu)勢(shì),不光可以對(duì)接Starrocks、還可以對(duì)接Presto、Spark等主流OLAP引擎。
這就是他們的區(qū)別,兩種方案的側(cè)重點(diǎn)不同,還需要根據(jù)實(shí)際應(yīng)用場(chǎng)景進(jìn)行選擇。如果只是做OLAP應(yīng)用,Starrocks更適合。但如果想構(gòu)建數(shù)倉(cāng),使用Starrocks代替Hudi的成本應(yīng)該太高了。因?yàn)镠udi面向的場(chǎng)景主要時(shí)數(shù)倉(cāng),迭代的主要時(shí)Hive傳統(tǒng)數(shù)倉(cāng),更有優(yōu)勢(shì)。
Q2:流量數(shù)據(jù)入湖場(chǎng)景下,使用MOR(Merge on Read)表,還是COW(Copy on Write)表更合適?文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-794815.html
A2:如果流量數(shù)據(jù)體量比較大,建議使用MOR表。以目前的實(shí)測(cè)方案,QPS不超過(guò)兩萬(wàn),COW表還是可以支撐的。超過(guò)兩萬(wàn)之后,比較推薦MOR online compaction方式。如果QPS更高,那可能需要把壓縮任務(wù)再獨(dú)立出來(lái),這是目前能給到的一個(gè)方案。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-794815.html
到了這里,關(guān)于實(shí)時(shí)數(shù)據(jù)湖 Flink Hudi 實(shí)踐探索的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!