?
這次在 6月 Meetup 為大家?guī)淼氖荢caleph 基于 Apache SeaTunnel (Incubating) 的數(shù)據(jù)集成介紹,希望你有所收獲。
本次演講主要包括五個部分:
關(guān)于Scaleph
Scaleph架構(gòu)&功能簡介
SeaTunnel社區(qū)貢獻(xiàn)
系統(tǒng)演示
開發(fā)計劃
Apache SeaTunnel (Incubating)?
王奇
Apache SeaTunnel Contributor
搜索推薦工程師,大數(shù)據(jù) Java 開發(fā)
01 Scaleph的緣起
我最早是從事搜索推薦工作,在團(tuán)隊里面負(fù)責(zé)維護(hù)Dump系統(tǒng),主要是為我們的搜索引擎提供喂數(shù)據(jù)的功能,先給大家介紹在維護(hù)過程中主要的5個痛點問題:
及時性和穩(wěn)定性
搜索推薦是電商平臺的核心在線系統(tǒng),尤其是對數(shù)據(jù)的及時性和穩(wěn)定性要求非常高。由于搜索推薦會接收整個電商平臺 C 端的絕大部分流量,所以一旦服務(wù)出現(xiàn)波動的時候,可能就造成服務(wù)受損,導(dǎo)致用戶的體驗大打折扣。
業(yè)務(wù)復(fù)雜/大寬表設(shè)計
Dump系統(tǒng)會將電商平臺的商品、類目、品牌、店鋪、商品標(biāo)簽、數(shù)倉的實時/離線數(shù)據(jù)及模型數(shù)據(jù)會經(jīng)過一系列的預(yù)處理,最終輸出成一張大寬表,在這個過程中,業(yè)務(wù)的復(fù)雜性和多變性,會侵入到Dump系統(tǒng)中來,所以應(yīng)對的技術(shù)挑戰(zhàn)相對就更高了。
全量+實時索引
全量索引每天跑一次,主要目的是更新 T+1 頻率更新的數(shù)據(jù)。當(dāng)全量索引結(jié)束之后,我們會通過實時索引去刷新需要實時更新的數(shù)據(jù),比如說商品的價格、庫存變動相關(guān)的信息。
我們的上游數(shù)據(jù)來源非常多,有消息隊列、數(shù)據(jù)庫、大數(shù)據(jù)相關(guān)的存儲以及 dubbo 接口,由于是大寬表設(shè)計,以商品索引為例,大寬表會以商品為主,如果是店鋪索引,會以店鋪為主,根據(jù)數(shù)據(jù)的不同,上游的數(shù)據(jù)變動不一定是商品或店鋪維度的,數(shù)據(jù)也會產(chǎn)生一定的聯(lián)動更新。
搜索推薦服務(wù)當(dāng)時也承擔(dān)著C端絕大部分的流量,當(dāng)公司其他團(tuán)隊的性能跟不上的時候,他們一般會把數(shù)據(jù)通過Dump系統(tǒng)送到搜索引擎,然后我們團(tuán)隊代替他們返回給Web頁面,避免后續(xù)對他們發(fā)起二次請求調(diào)用。
同時,如果其他團(tuán)隊的業(yè)務(wù)系統(tǒng)產(chǎn)生了臟數(shù)據(jù),也需要Dump系統(tǒng)做數(shù)據(jù)保護(hù),防止數(shù)據(jù)外泄給C端用戶造成不好的影響,所以開發(fā)維護(hù)中的時候,也有很大的難度。
02 為什么引入Flink?
作為國內(nèi) Flink 的早期使用者,阿里巴巴在搜索推薦領(lǐng)域擁有悠久的歷史和成功的經(jīng)驗,在搜索推薦團(tuán)隊開發(fā)維護(hù) Dump 系統(tǒng)的職業(yè)經(jīng)歷促使我開始關(guān)注使用Flink做A/B實驗的報表、數(shù)據(jù)實時流之外的相關(guān)工作,主要也就是用Flink來實現(xiàn)Dump系統(tǒng)為搜索去提供Dump平臺的工作,使用Flink做數(shù)據(jù)集成有5個優(yōu)點:
天然的分布式支持:Flink支持多種部署和運行方式,單機、yarn、Kubernetes;
低延遲、海量吞吐:在眾多大廠中應(yīng)用廣泛;
生態(tài)支持:Flink提供了眾多開箱即用的connector,支持csv、avro數(shù)據(jù)格式,kafka、pulsar等消息系統(tǒng)以及眾多的存儲系統(tǒng),和大數(shù)據(jù)生態(tài)緊密結(jié)合;
基于分布式輕量異步快照機制實現(xiàn)exactly-once語義,為任務(wù)的失敗、重啟、遷移、升級等提供數(shù)據(jù)一致性保障;
metrics。Flink除了自身提供的 metrics 外,metrics 框架可以讓用戶為任務(wù)開發(fā)自定義的 metrics,豐富監(jiān)控指標(biāo);
03 為什么選擇SeaTunnel?
后來接觸到 SeaTunnel 的時候,很喜歡 SeaTunnel 的設(shè)計理念!SeaTunnel 是運行在 Flink 和Spark 之上,高性能和分布式海量數(shù)據(jù)的下一代集成框架。
重要的是它是開箱即用的,并且針對現(xiàn)有的生態(tài)可以實現(xiàn)無縫集成,因為運行在 Flink 和 Spark 之上,可以很方便地接入公司現(xiàn)有的 Flink 和 Spark 的基礎(chǔ)設(shè)施。另一方面 SeaTunnel 也有很多的生產(chǎn)案例,在進(jìn)入 Apache 基金會孵化之后,社區(qū)非?;钴S,未來可期。
04 關(guān)于Scaleph
項目出發(fā)點
我們最開始的想法就是為 SeaTunnel 提供 Web 頁面,能夠做一個數(shù)據(jù)集成的開源系統(tǒng)。目前我們最主要的目標(biāo)還是想為 SeaTunnel 做一個開源可視化的數(shù)據(jù)開發(fā)和管理系統(tǒng),后面期望 Scaleph 能夠最大程度的降低實時和離線數(shù)據(jù)任務(wù)的開發(fā)門檻,為開發(fā)人員提供一站式的數(shù)據(jù)開發(fā)平臺。
項目亮點
在真正的生產(chǎn)應(yīng)用中,進(jìn)行數(shù)據(jù)集成的時候,以可視化任務(wù)編排或 SQL 開發(fā)為數(shù)據(jù)集成的主要形式,我們認(rèn)為 Drag and Drop 可視化任務(wù)編排可以最大程度減輕用戶做數(shù)據(jù)集成的負(fù)擔(dān);
另外就是實現(xiàn)對作業(yè)進(jìn)行多版本管理,數(shù)據(jù)源的支持;
-
Flink集群支持多版本/多部署環(huán)境;
-
實時/周期任務(wù)也有相關(guān)的支持。
上面是我們系統(tǒng)的架構(gòu)圖,用戶主要使用 Web UI,通過作業(yè)管理功能封裝的 SeaTunnel 算子,用戶在頁面進(jìn)行拖拉拽配置,系統(tǒng)自動生成SeaTunnel的配置文件,最后通過資源管理中用戶上傳的資源 jar 包一起通過 Flinkful 庫提交到 Flink 集群中。資源管理的資源 jar 包的存在目的是支持用戶可以上傳自已研發(fā)的相關(guān) jar 包,補足SeaTunnel 相關(guān)的缺陷,或?qū)eaTunnel和Flink本身的功能進(jìn)行增強!
我們用 quartz 開發(fā)了一個調(diào)度任務(wù),當(dāng)任務(wù)提交到 Flink 后,任務(wù)會定時去 Flink 集群將任務(wù)信息拉過來,存儲到 MySQL 里面,最終用戶在 Web UI 頁面可以看到任務(wù)相關(guān)運行信息。
Scaleph功能簡介(數(shù)據(jù)開發(fā))
01?項目管理
主要是用戶創(chuàng)建數(shù)據(jù)同步任務(wù)的時候,能夠按照不同的業(yè)務(wù)維度進(jìn)行相關(guān)的管理工作。
02?作業(yè)管理
通過拖拉拽的操作可以創(chuàng)建SeaTunnel的數(shù)據(jù)任務(wù),然后進(jìn)行相應(yīng)的提交運行。
03?資源管理
SeaTunnel 是以 Apache2.0 開源證書進(jìn)行開源的,與 MySQL 的 JDBC 驅(qū)動包開源協(xié)議不兼容,SeaTunnel 的 jdbc connector 是不提供相關(guān)的 JDBC 驅(qū)動依賴的。當(dāng)用戶使用 jdbc connector 時,需要自行提供 JDBC 驅(qū)動包。我們在這里提供了資源管理的功能,用戶可以自己上傳驅(qū)動包,然后再把 SeaTunnel 任務(wù)和 MySQL 驅(qū)動一起提交到集群中以保證任務(wù)的正常運行。
04?集群管理
主要是提供Flink集群信息的錄入,目前可以支持Standalone Session 集群錄入,用戶錄入后,提交SeaTunnel作業(yè)時就可以選擇集群,任務(wù)就會在集群運行。
05?數(shù)據(jù)源管理
支持用戶提前錄入一些數(shù)據(jù)源信息,這樣就不用每個任務(wù)都把數(shù)據(jù)源信息輸入一遍。同時,還可以去實現(xiàn)數(shù)據(jù)源的共享和權(quán)限限制,防止數(shù)據(jù)源信息明文泄露。
Scaleph功能簡介(運維中心)
運維中心是一個實時任務(wù)和周期任務(wù)的運行日志,用戶提交任務(wù)的時候看到任務(wù)相關(guān)的信息,我們還提供了鏈接跳轉(zhuǎn)操作,用戶點擊可以跳轉(zhuǎn)到Flink的Web UI上面去,通過Flink官方的Web UI頁面,可以看到任務(wù)具體的執(zhí)行信息。
Scaleph功能簡介(數(shù)據(jù)標(biāo)準(zhǔn))
01?數(shù)據(jù)元
數(shù)據(jù)治理是個大的體系,大家比較關(guān)心元數(shù)據(jù)、數(shù)據(jù)血緣、數(shù)據(jù)資產(chǎn),但是數(shù)據(jù)標(biāo)準(zhǔn)也是數(shù)據(jù)治理的重要一環(huán),我們把公司自己內(nèi)部使用的標(biāo)準(zhǔn)系統(tǒng)開源出來,給大家分享數(shù)據(jù)標(biāo)準(zhǔn)的相關(guān)知識。
在很多數(shù)倉的開發(fā)過程中,由于是多人協(xié)作的,同樣一個含義的字段,在不同的模型表中,開發(fā)會定義不同的字段來表達(dá)同樣的含義和業(yè)務(wù)。數(shù)據(jù)標(biāo)準(zhǔn)希望能通過數(shù)據(jù)元,來統(tǒng)一數(shù)倉開發(fā)人員的模型字段定義。
02?參考數(shù)據(jù)
數(shù)倉中的數(shù)據(jù)是通過數(shù)據(jù)集成工具從業(yè)務(wù)系統(tǒng)中拉過來的,會不可避免地出現(xiàn)同樣含義的字段在不同業(yè)務(wù)系統(tǒng)中有不同的定義,而這些含義相同定義不同的字段就需要數(shù)倉人員去進(jìn)行維護(hù),而且維護(hù)的過程以線下文檔為主,可能存在維護(hù)過時的情況。
同時也會出現(xiàn)業(yè)務(wù)知識無法直接映射為數(shù)倉模型信息的問題,數(shù)據(jù)標(biāo)準(zhǔn)讓用戶可以在 Web 頁面中對這些業(yè)務(wù)知識進(jìn)行維護(hù)。
上圖是一個具體案例。這里是定義的兩個業(yè)務(wù)系統(tǒng),一個是系統(tǒng)A,一個是系統(tǒng)B,它們分別有不同的性別枚舉值,同時A/B系統(tǒng)的枚舉描述也都不一樣,那怎么辦?
這個時候,我們通過數(shù)倉開發(fā)人員可以定一套統(tǒng)一的標(biāo)準(zhǔn),比如把編碼統(tǒng)一定為0,1,2,相應(yīng)的描述也定義好,通過中間的一個參考數(shù)據(jù)映射,用戶就可以方便的去看。
03?后續(xù)設(shè)想
是否能在數(shù)據(jù)集成過程中,直接通過數(shù)據(jù)標(biāo)準(zhǔn)進(jìn)行Transform 操作,實現(xiàn)知識和模型自動維護(hù)和映射。
04?Scaleph功能亮點
數(shù)據(jù)的可視化開發(fā)。我們認(rèn)為在數(shù)據(jù)同步領(lǐng)域,可視化拖拉拽,可以幫助用戶快速創(chuàng)建數(shù)據(jù)集成任務(wù),用戶拖拉拽出兩個算子,填寫相應(yīng)的參數(shù)就可以創(chuàng)建數(shù)據(jù)集成任務(wù)。
Flinkful是我們?yōu)?Flink 開發(fā)的一個Java客戶端。
Flink 作為一個流行的計算引擎,提供了很多方式讓用戶使用,比如說命令行接口、HTTP 接口等,通過命令行接口用戶可以提交任務(wù)、創(chuàng)建任務(wù)及取消任務(wù);HTTP 接口主要是用于 Web UI 界面。
在對接 Flink 的過程,我們發(fā)現(xiàn) Flink 作為一個運行在 JVM 之上的一個應(yīng)用與同樣運行在 JVM 之上的 Scaleph 應(yīng)用,二者的集成卻要通過 shell 腳本,很不合理。所以我們開發(fā)了 Flinkful,打開 Flink 在 Java 生態(tài)的開放能力,讓用戶通過 Flinkful,可以直接對 Flink 集群和任務(wù)做管理。
我們認(rèn)為 Flinkful 對 Flink 基礎(chǔ)設(shè)施維護(hù)人員是比較有意義的,所以從 Scaleph 倉庫中剝離出來,單獨開源。
插件體系。我們希望通過定義插件,提供系統(tǒng)擴(kuò)展接口,用戶和 Scaleph 開發(fā)者可以通過這些接口快速增強 Scaleph 的功能和特性。目前我們定義了兩個插件,分別是數(shù)據(jù)源插件和 SeaTunnel 插件,通過數(shù)據(jù)源插件可以快速擴(kuò)展出 JDBC、ES、Kafka、Clinkhouse之類的數(shù)據(jù)源,把這些數(shù)據(jù)源集中到 Scaleph 系統(tǒng)進(jìn)行統(tǒng)一的配置和使用。
目前 SeaTunnel 里面提供了很多 connector 和 transform 插件,如果逐一去開發(fā)頁面的話,是比較耗時的一個事情,我們就想著用一種簡單、聲明式的方式,把 SeaTunnel 相關(guān)的參數(shù)定義出來,能快速的把 SeaTunnel 相關(guān)的能力完整的遷到 Scaleph 項目上來。
問題分析
Flink-jdbc-connector 功能增強
SeaTunnel 官方文檔中的很多案例,都是以 FakeSource和 ConsoleSink 實現(xiàn)的,而我們在開發(fā)中是以 jdbc-connector 為主的。在集成過程中,我們發(fā)現(xiàn) flink-jdbc-connector 插件的 JdbcSink 只支持 Stream 模式運行,后來我們就給它實現(xiàn)了 Batch 模式。
JdbcSource 需要用戶提供 sql,程序在內(nèi)部通過正則表達(dá)式獲取到 sql 的列、表信息,以生成 JdbcSource 的 RowTypeInfo。但是在定義復(fù)雜 sql 的時候會出現(xiàn)別名、子查詢之類的情況,正則表達(dá)式難以覆蓋所有場景。我們使用 Jdbc 的 Connection 獲取到 sql 的 ResultSet,從 ResultSet 直接獲取 sql 的列信息,以生成 JdbcSource 的 RowTypeInfo。
?Seatunnel-core-flink.jar 瘦身
SeaTunnel 是運行在 Flink 和 Spark 之上,二者會分別打成兩個 jar 包,seatunnel-core-flink.jar 就是 Flink 對應(yīng)的實現(xiàn)。在2.1.1版本中,Seatunnel 會把基于 flink 實現(xiàn)的 connector 都打進(jìn)這個 fat jar 包中。
而真正去使用的時候,數(shù)據(jù)同步任務(wù),可能只會使用其中的 1-2 種 connector。Seatunnel 任務(wù)提交的時候會有一定量的額外網(wǎng)絡(luò)開銷。
我們想實現(xiàn)這種效果:有一個比較 thin 的 core jar包,然后再加上相關(guān)的 connector 的 jar 包。提交的時候,以core-jar包為主,加上相關(guān)的 connector 的 jar 包。同時前面介紹過的資源 jar 包上傳,如 SeaTunnel 的 jdbc-connector 缺少的 JDBC 驅(qū)動包,攜帶資源 jar 包和 connector jar 包的任務(wù)提交都是同一種處理方式。
后來社區(qū)在開展 connector 拆分的時候,我們也積極在相關(guān) issue 下分享了相關(guān)經(jīng)驗,當(dāng) Seatunnel 2.1.2 發(fā)布時,我們的系統(tǒng)也是很輕松地就適配了 seatunnel-core-flink.jar 和 connector jar 分離的發(fā)布形式。同時用戶沒有在 Flink 集群提前準(zhǔn)備 JDBC 驅(qū)動的情況下,也可以通過資源管理的功能,上傳驅(qū)動包,在提交 SeaTunnel 任務(wù)時,帶著驅(qū)動包一起提交。
Flink jobId 獲取問題
Flink 任務(wù)提交這一塊的最核心方式是以命令行接口的形式去實現(xiàn)的,因此用戶需要通過 shell 腳本去提交 Flink 任務(wù)。Flink 任務(wù)提交后,命令行客戶端會把對應(yīng)的任務(wù) id 輸出到控制臺日志中,用戶就需要捕獲輸出到控制臺上的日志,從中提取出任務(wù) id。
因為我們這個項目和 Flink 的所有交互全是通過 Flinkful 庫實現(xiàn),F(xiàn)linkful 可以把這樣一個 jobId 直接作為接口調(diào)用的返回值給發(fā)回來。所以我們的實現(xiàn)相比捕獲控制臺日志提取 jobId 還是比較優(yōu)雅的。
SeaTunnel 調(diào)用 System.exit() 問題
SeaTunnel 任務(wù)在去執(zhí)行的時候,先會對用戶編寫的配置文件進(jìn)行檢查,如果檢查失敗,會直接調(diào)用 System.exit(),然后這個時候 JVM 也就退出了。SeaTunnel 本身的提交方式是以 shell 腳實現(xiàn)的,因此 JVM 退出是沒有問題的。
但是當(dāng) Scaleph 系統(tǒng),把它集成到我們應(yīng)用里面的時候,在調(diào)用這個方法,就會導(dǎo)致我們 Scaleph 這樣的一個應(yīng)用會直接掛掉,導(dǎo)致我們服務(wù)的一個不可用。因此,我們也是對任務(wù)提交的這一塊代碼,通過 SecurityManager,增加了相關(guān)的一個權(quán)限限制,然后規(guī)定 SeaTunnel 相關(guān)的提交任務(wù)程序,禁止調(diào)用 System.exit() 方法。
05 SeaTunnel 社區(qū)貢獻(xiàn)
和我一起開發(fā) Scaleph 一個朋友,這里是我們倆的一些提交的 pr,比如上面說的 jdbc-connector 的功能增強。還有就是 jdbc-connector 的 upsert 功能的實現(xiàn)。flink-jdbc-connector 的 JdbcSink 的一個很大的缺陷是只支持 insert 功能,無法實現(xiàn) update,這會相當(dāng)限制這個 connector 的功能。我們也是開發(fā)了 upsert 語義的支持,支持?jǐn)?shù)據(jù)的重復(fù)同步。
01?系統(tǒng)演示
這個項目時間充足的話是可以進(jìn)行 Docker 環(huán)境和 IDE 環(huán)境演示的,這里時間有限就選擇 Docker 環(huán)境給大家進(jìn)行演示,演示視頻(直接跳轉(zhuǎn)23'18s):
Scaleph 基于 Apache SeaTunnel(Incubating) 的數(shù)據(jù)集成介紹-王奇
02?后續(xù)開發(fā)計劃
目前我們還是會盡快把 SeaTunnel 相關(guān)的 connector 和 transform 插件,全搬到我們的可視化拖拉拽的頁面上去,能夠讓用戶完整的感受到 SeaTunnel 的一個強大。另外一個就是隨著 SeaTunnel-connector 的相關(guān)插件豐富,也要把 connector 對應(yīng)的數(shù)據(jù)源種類給它豐富上去。
我們也希望能為數(shù)據(jù)開發(fā)和數(shù)據(jù)集成做一些 DAG 相關(guān)的編排調(diào)度,同時也希望能夠在數(shù)據(jù)開發(fā)方面支持 SQL 的任務(wù)開發(fā)。
Apache SeaTunnel
//? 保持聯(lián)絡(luò)?//
微信號 :?Seatunnel
來,和社區(qū)一同成長!
Apache SeaTunnel(Incubating) 是一個分布式、高性能、易擴(kuò)展、用于海量數(shù)據(jù)(離線&實時)同步和轉(zhuǎn)化的數(shù)據(jù)集成平臺。
倉庫地址:?
https://github.com/apache/incubator-seatunnel
網(wǎng)址:
https://seatunnel.apache.org/
Proposal:
https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelProposal
Apache SeaTunnel(Incubating) 2.1.0 下載地址:
https://seatunnel.apache.org/download
衷心歡迎更多人加入!
能夠進(jìn)入 Apache 孵化器,SeaTunnel(原 Waterdrop) 新的路程才剛剛開始,但社區(qū)的發(fā)展壯大需要更多人的加入。我們相信,在「Community Over Code」(社區(qū)大于代碼)、「Open and Cooperation」(開放協(xié)作)、「Meritocracy」(精英管理)、以及「多樣性與共識決策」等 The Apache Way 的指引下,我們將迎來更加多元化和包容的社區(qū)生態(tài),共建開源精神帶來的技術(shù)進(jìn)步!
我們誠邀各位有志于讓本土開源立足全球的伙伴加入 SeaTunnel 貢獻(xiàn)者大家庭,一起共建開源!
提交問題和建議:
https://github.com/apache/incubator-seatunnel/issues
貢獻(xiàn)代碼:
https://github.com/apache/incubator-seatunnel/pulls
訂閱社區(qū)開發(fā)郵件列表 :?
dev-subscribe@seatunnel.apache.org
開發(fā)郵件列表:
dev@seatunnel.apache.org
加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-123jmewxe-RjB_DW3M3gV~xL91pZ0oVQ
關(guān)注 Twitter:?文章來源:http://www.zghlxwxcb.cn/news/detail-474733.html
https://twitter.com/ASFSeaTunnel文章來源地址http://www.zghlxwxcb.cn/news/detail-474733.html
到了這里,關(guān)于可視化任務(wù)編排&拖拉拽 | Scaleph 基于 Apache SeaTunnel的數(shù)據(jù)集成的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!