一、實時數(shù)倉建設(shè)背景
1. 實時需求日趨迫切
目前各大公司的產(chǎn)品需求和內(nèi)部決策對于數(shù)據(jù)實時性的要求越來越迫切,需要實時數(shù)倉的能力來賦能。傳統(tǒng)離線數(shù)倉的數(shù)據(jù)時效性是 T+1,調(diào)度頻率以天為單位,無法支撐實時場景的數(shù)據(jù)需求。即使能將調(diào)度頻率設(shè)置成小時,也只能解決部分時效性要求不高的場景,對于實效性要求很高的場景還是無法優(yōu)雅的支撐。因此實時使用數(shù)據(jù)的問題必須得到有效解決。
2. 實時技術(shù)日趨成熟
實時計算框架已經(jīng)經(jīng)歷了三代發(fā)展,分別是:Storm、SparkStreaming、Flink,計算框架越來越成熟。一方面,實時任務(wù)的開發(fā)已經(jīng)能通過編寫 SQL 的方式來完成,在技術(shù)層面能很好地繼承離線數(shù)倉的架構(gòu)設(shè)計思想;另一方面,在線數(shù)據(jù)開發(fā)平臺所提供的功能對實時任務(wù)開發(fā)、調(diào)試、運維的支持也日漸趨于成熟,開發(fā)成本逐步降低,有助于去做這件事。
二、實時數(shù)倉建設(shè)目的
1. 解決傳統(tǒng)數(shù)倉的問題
從目前數(shù)倉建設(shè)的現(xiàn)狀來看,實時數(shù)倉是一個容易讓人產(chǎn)生混淆的概念,根據(jù)傳統(tǒng)經(jīng)驗分析,數(shù)倉有一個重要的功能,即能夠記錄歷史。通常,數(shù)倉都是希望從業(yè)務(wù)上線的第一天開始有數(shù)據(jù),然后一直記錄到現(xiàn)在。但實時流處理技術(shù),又是強調(diào)當(dāng)前處理狀態(tài)的一個技術(shù),結(jié)合當(dāng)前一線大廠的建設(shè)經(jīng)驗和滴滴在該領(lǐng)域的建設(shè)現(xiàn)狀,我們嘗試把公司內(nèi)實時數(shù)倉建設(shè)的目的定位為,以數(shù)倉建設(shè)理論和實時技術(shù),解決由于當(dāng)前離線數(shù)倉數(shù)據(jù)時效性低解決不了的問題。
現(xiàn)階段我們要建設(shè)實時數(shù)倉的主要原因是:
- 公司業(yè)務(wù)對于數(shù)據(jù)的實時性越來越迫切,需要有實時數(shù)據(jù)來輔助完成決策;
- 實時數(shù)據(jù)建設(shè)沒有規(guī)范,數(shù)據(jù)可用性較差,無法形成數(shù)倉體系,資源大量浪費;
- 數(shù)據(jù)平臺工具對整體實時開發(fā)的支持也日漸趨于成熟,開發(fā)成本降低。
2. 實時數(shù)倉的應(yīng)用場景
- 實時 OLAP 分析;
- 實時數(shù)據(jù)看板;
- 實時業(yè)務(wù)監(jiān)控;
- 實時數(shù)據(jù)接口服務(wù)。
三、實時數(shù)倉建設(shè)方案
接下來我們分析下目前實時數(shù)倉建設(shè)比較好的幾個案例,希望這些案例能夠給大家?guī)硪恍﹩l(fā)。
1. 滴滴順風(fēng)車實時數(shù)倉案例
滴滴數(shù)據(jù)團隊建設(shè)的實時數(shù)倉,基本滿足了順風(fēng)車業(yè)務(wù)方在實時側(cè)的各類業(yè)務(wù)需求,初步建立起順風(fēng)車實時數(shù)倉,完成了整體數(shù)據(jù)分層,包含明細(xì)數(shù)據(jù)和匯總數(shù)據(jù),統(tǒng)一了 DWD 層,降低了大數(shù)據(jù)資源消耗,提高了數(shù)據(jù)復(fù)用性,可對外輸出豐富的數(shù)據(jù)服務(wù)。
順風(fēng)車實時數(shù)倉和對應(yīng)的離線數(shù)倉有很多類似的地方。例如分層結(jié)構(gòu);比如 ODS 層,明細(xì)層,匯總層,乃至應(yīng)用層,他們命名的模式可能都是一樣的。但仔細(xì)比較不難發(fā)現(xiàn),兩者有很多區(qū)別:
- 與離線數(shù)倉相比,實時數(shù)倉的層次更少一些:
- 從目前建設(shè)離線數(shù)倉的經(jīng)驗來看,數(shù)倉的數(shù)據(jù)明細(xì)層內(nèi)容會非常豐富,處理明細(xì)數(shù)據(jù)外一般還會包含輕度匯總層的概念,另外離線數(shù)倉中應(yīng)用層數(shù)據(jù)在數(shù)倉內(nèi)部,但實時數(shù)倉中,app 應(yīng)用層數(shù)據(jù)已經(jīng)落入應(yīng)用系統(tǒng)的存儲介質(zhì)中,可以把該層與數(shù)倉的表分離;
- 應(yīng)用層少建設(shè)的好處:實時處理數(shù)據(jù)的時候,每建一個層次,數(shù)據(jù)必然會產(chǎn)生一定的延遲;
- 匯總層少建的好處:在匯總統(tǒng)計的時候,往往為了容忍一部分?jǐn)?shù)據(jù)的延遲,可能會人為的制造一些延遲來保證數(shù)據(jù)的準(zhǔn)確。舉例,在統(tǒng)計跨天相關(guān)的訂單事件中的數(shù)據(jù)時,可能會等到 00:00:05 或者 00:00:10 再統(tǒng)計,確保 00:00 前的數(shù)據(jù)已經(jīng)全部接受到位了,再進(jìn)行統(tǒng)計。所以,匯總層的層次太多的話,就會更大的加重人為造成的數(shù)據(jù)延遲。
- 與離線數(shù)倉相比,實時數(shù)倉的數(shù)據(jù)源存儲不同:
- 在建設(shè)離線數(shù)倉的時候,目前滴滴內(nèi)部整個離線數(shù)倉都是建立在 Hive 表之上。但是,在建設(shè)實時數(shù)倉的時候,同一份表,會使用不同的方式進(jìn)行存儲。比如常見的情況下,明細(xì)數(shù)據(jù)或者匯總數(shù)據(jù)都會存在 Kafka 里面,但是像城市、渠道等維度信息需要借助 Hbase,mysql 或者其他 KV 存儲等數(shù)據(jù)庫來進(jìn)行存儲。
接下來,根據(jù)順風(fēng)車實時數(shù)倉架構(gòu)圖,對每一層建設(shè)做具體展開:
1. ODS 貼源層建設(shè)
根據(jù)順風(fēng)車具體場景,目前順風(fēng)車數(shù)據(jù)源主要包括訂單相關(guān)的 binlog 日志,冒泡和安全相關(guān)的 public 日志,流量相關(guān)的埋點日志等。這些數(shù)據(jù)部分已采集寫入 kafka 或 ddmq 等數(shù)據(jù)通道中,部分?jǐn)?shù)據(jù)需要借助內(nèi)部自研同步工具完成采集,最終基于順風(fēng)車數(shù)倉 ods 層建設(shè)規(guī)范分主題統(tǒng)一寫入 kafka 存儲介質(zhì)中。
命名規(guī)范:ODS 層實時數(shù)據(jù)源主要包括兩種。
- 一種是在離線采集時已經(jīng)自動生產(chǎn)的 DDMQ 或者是 Kafka topic,這類型的數(shù)據(jù)命名方式為采集系統(tǒng)自動生成規(guī)范為:cn-binlog-數(shù)據(jù)庫名-數(shù)據(jù)庫名 eg:
cn-binlog-ihap_fangyuan-ihap_fangyuan
- 一種是需要自己進(jìn)行采集同步到 kafka topic 中,生產(chǎn)的 topic 命名規(guī)范同離線類似:ODS 層采用:
realtime_ods_binlog_{源系統(tǒng)庫/表名}/ods_log_{日志名} eg: realtime_ods_binlog_ihap_fangyuan
2. DWD 明細(xì)層建設(shè)
根據(jù)順風(fēng)車業(yè)務(wù)過程作為建模驅(qū)動,基于每個具體的業(yè)務(wù)過程特點,構(gòu)建最細(xì)粒度的明細(xì)層事實表;結(jié)合順風(fēng)車分析師在離線側(cè)的數(shù)據(jù)使用特點,將明細(xì)事實表的某些重要維度屬性字段做適當(dāng)冗余,完成寬表化處理,之后基于當(dāng)前順風(fēng)車業(yè)務(wù)方對實時數(shù)據(jù)的需求重點,重點建設(shè)交易、財務(wù)、體驗、安全、流量等幾大模塊;該層的數(shù)據(jù)來源于 ODS 層,通過大數(shù)據(jù)架構(gòu)提供的 Stream SQL 完成 ETL 工作,對于 binlog 日志的處理主要進(jìn)行簡單的數(shù)據(jù)清洗、處理數(shù)據(jù)漂移和數(shù)據(jù)亂序,以及可能對多個 ODS 表進(jìn)行 Stream Join,對于流量日志主要是做通用的 ETL 處理和針對順風(fēng)車場景的數(shù)據(jù)過濾,完成非結(jié)構(gòu)化數(shù)據(jù)的結(jié)構(gòu)化處理和數(shù)據(jù)的分流;該層的數(shù)據(jù)除了存儲在消息隊列 Kafka 中,通常也會把數(shù)據(jù)實時寫入 Druid 數(shù)據(jù)庫中,供查詢明細(xì)數(shù)據(jù)和作為簡單匯總數(shù)據(jù)的加工數(shù)據(jù)源。
命名規(guī)范:DWD 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 40 個字符,并且應(yīng)遵循下述規(guī)則:
realtime_dwd_{業(yè)務(wù)/pub}_{數(shù)據(jù)域縮寫}_[{業(yè)務(wù)過程縮寫}]_[{自定義表命名標(biāo)簽縮寫}]
- {業(yè)務(wù)/pub}:參考業(yè)務(wù)命名
- {數(shù)據(jù)域縮寫}:參考數(shù)據(jù)域劃分部分
- {自定義表命名標(biāo)簽縮寫}:實體名稱可以根據(jù)數(shù)據(jù)倉庫轉(zhuǎn)換整合后做一定的業(yè)務(wù)抽象的名稱,該名稱應(yīng)該準(zhǔn)確表述實體所代表的業(yè)務(wù)含義
- 樣例:realtime_dwd_trip_trd_order_base
3. DIM 層
- 公共維度層,基于維度建模理念思想,建立整個業(yè)務(wù)過程的一致性維度,降低數(shù)據(jù)計算口徑和算法不統(tǒng)一風(fēng)險;
- DIM 層數(shù)據(jù)來源于兩部分:一部分是 Flink 程序?qū)崟r處理 ODS 層數(shù)據(jù)得到,另外一部分是通過離線任務(wù)出倉得到;
- DIM 層維度數(shù)據(jù)主要使用 MySQL、Hbase、fusion(滴滴自研 KV 存儲) 三種存儲引擎,對于維表數(shù)據(jù)比較少的情況可以使用 MySQL,對于單條數(shù)據(jù)大小比較小,查詢 QPS 比較高的情況,可以使用 fusion 存儲,降低機器內(nèi)存資源占用,對于數(shù)據(jù)量比較大,對維表數(shù)據(jù)變化不是特別敏感的場景,可以使用 HBase 存儲。
命名規(guī)范:DIM 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 30 個字符,并且應(yīng)遵循下述規(guī)則:
dim_{業(yè)務(wù)/pub}_{維度定義}[_{自定義命名標(biāo)簽}]
:
- {業(yè)務(wù)/pub}:參考業(yè)務(wù)命名
- {維度定義}:參考維度命名
- {自定義表命名標(biāo)簽縮寫}:實體名稱可以根據(jù)數(shù)據(jù)倉庫轉(zhuǎn)換整合后做一定的業(yè)務(wù)抽象的名稱,該名稱應(yīng)該準(zhǔn)確表述實體所代表的業(yè)務(wù)含義
- 樣例:dim_trip_dri_base
4. DWM 匯總層建設(shè)
在建設(shè)順風(fēng)車實時數(shù)倉的匯總層的時候,跟順風(fēng)車離線數(shù)倉有很多一樣的地方,但其具體技術(shù)實現(xiàn)會存在很大不同。
第一:對于一些共性指標(biāo)的加工,比如 pv,uv,訂單業(yè)務(wù)過程指標(biāo)等,我們會在匯總層進(jìn)行統(tǒng)一的運算,確保關(guān)于指標(biāo)的口徑是統(tǒng)一在一個固定的模型中完成。對于一些個性指標(biāo),從指標(biāo)復(fù)用性的角度出發(fā),確定唯一的時間字段,同時該字段盡可能與其他指標(biāo)在時間維度上完成拉齊,例如行中異常訂單數(shù)需要與交易域指標(biāo)在事件時間上做到拉齊。
第二:在順風(fēng)車匯總層建設(shè)中,需要進(jìn)行多維的主題匯總,因為實時數(shù)倉本身是面向主題的,可能每個主題會關(guān)心的維度都不一樣,所以需要在不同的主題下,按照這個主題關(guān)心的維度對數(shù)據(jù)進(jìn)行匯總,最后來算業(yè)務(wù)方需要的匯總指標(biāo)。在具體操作中,對于 pv 類指標(biāo)使用 Stream SQL 實現(xiàn) 1 分鐘匯總指標(biāo)作為最小匯總單位指標(biāo),在此基礎(chǔ)上進(jìn)行時間維度上的指標(biāo)累加;對于 uv 類指標(biāo)直接使用 druid 數(shù)據(jù)庫作為指標(biāo)匯總?cè)萜?,根?jù)業(yè)務(wù)方對匯總指標(biāo)的及時性和準(zhǔn)確性的要求,實現(xiàn)相應(yīng)的精確去重和非精確去重。
第三:匯總層建設(shè)過程中,還會涉及到衍生維度的加工。在順風(fēng)車券相關(guān)的匯總指標(biāo)加工中我們使用 Hbase 的版本機制來構(gòu)建一個衍生維度的拉鏈表,通過事件流和 Hbase 維表關(guān)聯(lián)的方式得到實時數(shù)據(jù)當(dāng)時的準(zhǔn)確維度
命名規(guī)范:DWM 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 40 個字符,并且應(yīng)遵循下述規(guī)則:
realtime_dwm_{業(yè)務(wù)/pub}_{數(shù)據(jù)域縮寫}_{數(shù)據(jù)主粒度縮寫}_[{自定義表命名標(biāo)簽縮寫}]_{統(tǒng)計時間周期范圍縮寫}
:
- {業(yè)務(wù)/pub}:參考業(yè)務(wù)命名
- {數(shù)據(jù)域縮寫}:參考數(shù)據(jù)域劃分部分
- {數(shù)據(jù)主粒度縮寫}:指數(shù)據(jù)主要粒度或數(shù)據(jù)域的縮寫,也是聯(lián)合主鍵中的主要維度
- {自定義表命名標(biāo)簽縮寫}:實體名稱可以根據(jù)數(shù)據(jù)倉庫轉(zhuǎn)換整合后做一定的業(yè)務(wù)抽象的名稱,該名稱應(yīng)該準(zhǔn)確表述實體所代表的業(yè)務(wù)含義
- {統(tǒng)計時間周期范圍縮寫}:1d:天增量;td:天累計(全量);1h:小時增量;th:小時累計(全量);1min:分鐘增量;tmin:分鐘累計(全量)
- 樣例:
realtime_dwm_trip_trd_pas_bus_accum_1min
- APP 應(yīng)用層
該層主要的工作是把實時匯總數(shù)據(jù)寫入應(yīng)用系統(tǒng)的數(shù)據(jù)庫中,包括用于大屏顯示和實時 OLAP 的 Druid 數(shù)據(jù)庫(該數(shù)據(jù)庫除了寫入應(yīng)用數(shù)據(jù),也可以寫入明細(xì)數(shù)據(jù)完成匯總指標(biāo)的計算)中,用于實時數(shù)據(jù)接口服務(wù)的 Hbase 數(shù)據(jù)庫,用于實時數(shù)據(jù)產(chǎn)品的 mysql 或者 redis 數(shù)據(jù)庫中。
命名規(guī)范:基于實時數(shù)倉的特殊性不做硬性要求。
2. 快手實時數(shù)倉場景化案例
1) 目標(biāo)及難點
- 目標(biāo)
首先由于是做數(shù)倉,因此希望所有的實時指標(biāo)都有離線指標(biāo)去對應(yīng),要求實時指標(biāo)和離線指標(biāo)整體的數(shù)據(jù)差異在 1% 以內(nèi),這是最低標(biāo)準(zhǔn)。
其次是數(shù)據(jù)延遲,其 SLA 標(biāo)準(zhǔn)是活動期間所有核心報表場景的數(shù)據(jù)延遲不能超過 5 分鐘,這 5 分鐘包括作業(yè)掛掉之后和恢復(fù)時間,如果超過則意味著 SLA 不達(dá)標(biāo)。
最后是穩(wěn)定性,針對一些場景,比如作業(yè)重啟后,我們的曲線是正常的,不會因為作業(yè)重啟導(dǎo)致指標(biāo)產(chǎn)出一些明顯的異常。
- 難點
第一個難點是數(shù)據(jù)量大。每天整體的入口流量數(shù)據(jù)量級大概在萬億級。在活動如春晚的場景,QPS 峰值能達(dá)到億 / 秒。
第二個難點是組件依賴比較復(fù)雜。可能這條鏈路里有的依賴于 Kafka,有的依賴 Flink,還有一些依賴 KV 存儲、RPC 接口、OLAP 引擎等,我們需要思考在這條鏈路里如何分布,才能讓這些組件都能正常工作。
第三個難點是鏈路復(fù)雜。目前我們有 200+ 核心業(yè)務(wù)作業(yè),50+ 核心數(shù)據(jù)源,整體作業(yè)超過 1000。
2) 實時數(shù)倉 - 分層模型
基于上面三個難點,來看一下數(shù)倉架構(gòu):
最下層有三個不同的數(shù)據(jù)源,分別是客戶端日志、服務(wù)端日志以及 Binlog 日志;在公共基礎(chǔ)層分為兩個不同的層次,一個是 DWD 層,做明細(xì)數(shù)據(jù),另一個是 DWS 層,做公共聚合數(shù)據(jù),DIM 是我們常說的維度。我們有一個基于離線數(shù)倉的主題預(yù)分層,這個主題預(yù)分層可能包括流量、用戶、設(shè)備、視頻的生產(chǎn)消費、風(fēng)控、社交等。DWD 層的核心工作是標(biāo)準(zhǔn)化的清洗;DWS 層是把維度的數(shù)據(jù)和 DWD 層進(jìn)行關(guān)聯(lián),關(guān)聯(lián)之后生成一些通用粒度的聚合層次。再往上是應(yīng)用層,包括一些大盤的數(shù)據(jù),多維分析的模型以及業(yè)務(wù)專題數(shù)據(jù);最上面是場景。整體過程可以分為三步:
第一步是做業(yè)務(wù)數(shù)據(jù)化,相當(dāng)于把業(yè)務(wù)的數(shù)據(jù)接進(jìn)來;第二步是數(shù)據(jù)資產(chǎn)化,意思是對數(shù)據(jù)做很多的清洗,然后形成一些規(guī)則有序的數(shù)據(jù);第三步是數(shù)據(jù)業(yè)務(wù)化,可以理解數(shù)據(jù)在實時數(shù)據(jù)層面可以反哺業(yè)務(wù),為業(yè)務(wù)數(shù)據(jù)價值建設(shè)提供一些賦能。
3) 實時數(shù)倉 - 保障措施
基于上面的分層模型,來看一下整體的保障措施:
保障層面分為三個不同的部分,分別是質(zhì)量保障,時效保障以及穩(wěn)定保障。
我們先看藍(lán)色部分的質(zhì)量保障。針對質(zhì)量保障,可以看到在數(shù)據(jù)源階段,做了如數(shù)據(jù)源的亂序監(jiān)控,這是我們基于自己的 SDK 的采集做的,以及數(shù)據(jù)源和離線的一致性校準(zhǔn)。研發(fā)階段的計算過程有三個階段,分別是研發(fā)階段、上線階段和服務(wù)階段。研發(fā)階段可能會提供一個標(biāo)準(zhǔn)化的模型,基于這個模型會有一些 Benchmark,并且做離線的比對驗證,保證質(zhì)量是一致的;上線階段更多的是服務(wù)監(jiān)控和指標(biāo)監(jiān)控;在服務(wù)階段,如果出現(xiàn)一些異常情況,先做 Flink 狀態(tài)拉起,如果出現(xiàn)了一些不符合預(yù)期的場景,我們會做離線的整體數(shù)據(jù)修復(fù)。
第二個是時效性保障。針對數(shù)據(jù)源,我們把數(shù)據(jù)源的延遲情況也納入監(jiān)控。在研發(fā)階段其實還有兩個事情:首先是壓測,常規(guī)的任務(wù)會拿最近 7 天或者最近 14 天的峰值流量去看它是否存在任務(wù)延遲的情況;通過壓測之后,會有一些任務(wù)上線和重啟性能評估,相當(dāng)于按照 CP 恢復(fù)之后,重啟的性能是什么樣子。
最后一個是穩(wěn)定保障,這在大型活動中會做得比較多,比如切換演練和分級保障。我們會基于之前的壓測結(jié)果做限流,目的是保障作業(yè)在超過極限的情況下,仍然是穩(wěn)定的,不會出現(xiàn)很多的不穩(wěn)定或者 CP 失敗的情況。之后我們會有兩種不同的標(biāo)準(zhǔn),一種是冷備雙機房,另外一種是熱備雙機房。冷備雙機房是:當(dāng)一個單機房掛掉,我們會從另一個機房去拉起;熱備雙機房:相當(dāng)于同樣一份邏輯在兩個機房各部署一次。以上就是我們整體的保障措施。
3) 快手場景問題及解決方案
1. PV/UV 標(biāo)準(zhǔn)化
1.1 場景
第一個問題是 PV/UV 標(biāo)準(zhǔn)化,這里有三個截圖:
第一張圖是春晚活動的預(yù)熱場景,相當(dāng)于是一種玩法,第二和第三張圖是春晚當(dāng)天的發(fā)紅包活動和直播間截圖。
在活動進(jìn)行過程中,我們發(fā)現(xiàn) 60~70% 的需求是計算頁面里的信息,如:
- 這個頁面來了多少人,或者有多少人點擊進(jìn)入這個頁面;
- 活動一共來了多少人;
- 頁面里的某一個掛件,獲得了多少點擊、產(chǎn)生了多少曝光。
1.2 方案
抽象一下這個場景
簡單來說,就是從一張表做篩選條件,然后按照維度層面做聚合,接著產(chǎn)生一些 Count 或者 Sum 操作。
基于這種場景,我們最開始的解決方案如上圖右邊所示。
我們用到了 Flink SQL 的 Early Fire 機制,從 Source 數(shù)據(jù)源取數(shù)據(jù),之后做了 DID 的分桶。比如最開始紫色的部分按這個做分桶,先做分桶的原因是防止某一個 DID 存在熱點的問題。分桶之后會有一個叫做 Local Window Agg 的東西,相當(dāng)于數(shù)據(jù)分完桶之后把相同類型的數(shù)據(jù)相加。Local Window Agg 之后再按照維度進(jìn)行 Global Window Agg 的合桶,合桶的概念相當(dāng)于按照維度計算出最終的結(jié)果。Early Fire 機制相當(dāng)于在 Local Window Agg 開一個天級的窗口,然后每分鐘去對外輸出一次。
這個過程中我們遇到了一些問題,如上圖左下角所示。
在代碼正常運行的情況下是沒有問題的,但如果整體數(shù)據(jù)存在延遲或者追溯歷史數(shù)據(jù)的情況,比如一分鐘 Early Fire 一次,因為追溯歷史的時候數(shù)據(jù)量會比較大,所以可能導(dǎo)致 14:00 追溯歷史,直接讀到了 14:02 的數(shù)據(jù),而 14:01 的那個點就被丟掉了,丟掉了以后會發(fā)生什么?
在這種場景下,圖中上方的曲線為 Early Fire 回溯歷史數(shù)據(jù)的結(jié)果。橫坐標(biāo)是分鐘,縱坐標(biāo)是截止到當(dāng)前時刻的頁面 UV,我們發(fā)現(xiàn)有些點是橫著的,意味著沒有數(shù)據(jù)結(jié)果,然后一個陡增,然后又橫著的,接著又一個陡增,而這個曲線的預(yù)期結(jié)果其實是圖中下方那種平滑的曲線。
為了解決這個問題,我們用到了 Cumulate Window 的解決方案,這個解決方案在 Flink 1.13 版本里也有涉及,其原理是一樣的。
數(shù)據(jù)開一個大的天級窗口,大窗口下又開了一個小的分鐘級窗口,數(shù)據(jù)按數(shù)據(jù)本身的 Row Time 落到分鐘級窗口。
Watermark 推進(jìn)過了窗口的 event_time,它會進(jìn)行一次下發(fā)的觸發(fā),通過這種方式可以解決回溯的問題,數(shù)據(jù)本身落在真實的窗口, Watermark 推進(jìn),在窗口結(jié)束后觸發(fā)。此外,這種方式在一定程度上能夠解決亂序的問題。比如它的亂序數(shù)據(jù)本身是一個不丟棄的狀態(tài),會記錄到最新的累計數(shù)據(jù)。最后是語義一致性,它會基于事件時間,在亂序不嚴(yán)重的情況下,和離線計算出來的結(jié)果一致性是相當(dāng)高的。以上是 PV/UV 一個標(biāo)準(zhǔn)化的解決方案。
2. DAU 計算
2.1 背景介紹
下面介紹一下 DAU 計算:
我們對于整個大盤的活躍設(shè)備、新增設(shè)備和回流設(shè)備有比較多的監(jiān)控。
活躍設(shè)備指的是當(dāng)天來過的設(shè)備;新增設(shè)備指的是當(dāng)天來過且歷史沒有來過的設(shè)備;回流設(shè)備指的是當(dāng)天來過且 N 天內(nèi)沒有來過的設(shè)備。但是我們計算過程之中可能需要 5~8 個這樣不同的 Topic 去計算這幾個指標(biāo)。
我們看一下離線過程中,邏輯應(yīng)該怎么算。
首先我們先算活躍設(shè)備,把這些合并到一起,然后做一個維度下的天級別去重,接著再去關(guān)聯(lián)維度表,這個維度表包括設(shè)備的首末次時間,就是截止到昨天設(shè)備首次訪問和末次訪問的時間。
得到這個信息之后,我們就可以進(jìn)行邏輯計算,然后我們會發(fā)現(xiàn)新增和回流的設(shè)備其實是活躍設(shè)備里打的一個子標(biāo)簽。新增設(shè)備就是做了一個邏輯處理,回流設(shè)備是做了 30 天的邏輯處理,基于這樣的解決方案,我們能否簡單地寫一個 SQL 去解決這個問題?
其實我們最開始是這么做的,但遇到了一些問題:
第一個問題是:數(shù)據(jù)源是 6~8 個,而且我們大盤的口徑經(jīng)常會做微調(diào),如果是單作業(yè)的話,每次微調(diào)的過程之中都要改,單作業(yè)的穩(wěn)定性會非常差;第二個問題是:數(shù)據(jù)量是萬億級,這會導(dǎo)致兩個情況,首先是這個量級的單作業(yè)穩(wěn)定性非常差,其次是實時關(guān)聯(lián)維表的時候用的 KV 存儲,任何一個這樣的 RPC 服務(wù)接口,都不可能在萬億級數(shù)據(jù)量的場景下保證服務(wù)穩(wěn)定性;第三個問題是:我們對于時延要求比較高,要求時延小于一分鐘。整個鏈路要避免批處理,如果出現(xiàn)了一些任務(wù)性能的單點問題,我們還要保證高性能和可擴容。
2.2 技術(shù)方案
針對以上問題,介紹一下我們是怎么做的:
第一步是對 A B C 這三個數(shù)據(jù)源,先按照維度和 DID 做分鐘級別去重,分別去重之后得到三個分鐘級別去重的數(shù)據(jù)源,接著把它們 Union 到一起,然后再進(jìn)行同樣的邏輯操作。
這相當(dāng)于我們數(shù)據(jù)源的入口從萬億變到了百億的級別,分鐘級別去重之后再進(jìn)行一個天級別的去重,產(chǎn)生的數(shù)據(jù)源就可以從百億變成了幾十億的級別。
在幾十億級別數(shù)據(jù)量的情況下,我們再去關(guān)聯(lián)數(shù)據(jù)服務(wù)化,這就是一種比較可行的狀態(tài),相當(dāng)于去關(guān)聯(lián)用戶畫像的 RPC 接口,得到 RPC 接口之后,最終寫入到了目標(biāo) Topic。這個目標(biāo) Topic 會導(dǎo)入到 OLAP 引擎,供給多個不同的服務(wù),包括移動版服務(wù),大屏服務(wù),指標(biāo)看板服務(wù)等。
這個方案有三個方面的優(yōu)勢,分別是穩(wěn)定性、時效性和準(zhǔn)確性。
首先是穩(wěn)定性。松耦合可以簡單理解為當(dāng)數(shù)據(jù)源 A 的邏輯和數(shù)據(jù)源 B 的邏輯需要修改時,可以單獨修改。第二是任務(wù)可擴容,因為我們把所有邏輯拆分得非常細(xì)粒度,當(dāng)一些地方出現(xiàn)了如流量問題,不會影響后面的部分,所以它擴容比較簡單,除此之外還有服務(wù)化后置和狀態(tài)可控。其次是時效性,我們做到毫秒延遲,并且維度豐富,整體上有 20+ 的維度做多維聚合。最后是準(zhǔn)確性,我們支持?jǐn)?shù)據(jù)驗證、實時監(jiān)控、模型出口統(tǒng)一等。此時我們遇到了另外一個問題 - 亂序。對于上方三個不同的作業(yè),每一個作業(yè)重啟至少會有兩分鐘左右的延遲,延遲會導(dǎo)致下游的數(shù)據(jù)源 Union 到一起就會有亂序。
2.3 延遲計算方案
遇到上面這種有亂序的情況下,我們要怎么處理?
我們總共有三種處理方案:
第一種解決方案是用 “did + 維度 + 分鐘” 進(jìn)行去重,Value 設(shè)為 “是否來過”。比如同一個 did,04:01 來了一條,它會進(jìn)行結(jié)果輸出。同樣的,04:02 和 04:04 也會進(jìn)行結(jié)果輸出。但如果 04:01 再來,它就會丟棄,但如果 04:00 來,依舊會進(jìn)行結(jié)果輸出。
這個解決方案存在一些問題,因為我們按分鐘存,存 20 分鐘的狀態(tài)大小是存 10 分鐘的兩倍,到后面這個狀態(tài)大小有點不太可控,因此我們又換了解決方案 2。
第二種解決方案,我們的做法會涉及到一個假設(shè)前提,就是假設(shè)不存在數(shù)據(jù)源亂序的情況。在這種情況下,key 存的是 “did + 維度”,Value 為 “時間戳”,它的更新方式如上圖所示。04:01 來了一條數(shù)據(jù),進(jìn)行結(jié)果輸出。04:02 來了一條數(shù)據(jù),如果是同一個 did,那么它會更新時間戳,然后仍然做結(jié)果輸出。04:04 也是同樣的邏輯,然后將時間戳更新到 04:04,如果后面來了一條 04:01 的數(shù)據(jù),它發(fā)現(xiàn)時間戳已經(jīng)更新到 04:04,它會丟棄這條數(shù)據(jù)。這樣的做法大幅度減少了本身所需要的一些狀態(tài),但是對亂序是零容忍,不允許發(fā)生任何亂序的情況,由于我們不好解決這個問題,因此我們又想出了解決方案 3。
方案 3 是在方案 2 時間戳的基礎(chǔ)之上,加了一個類似于環(huán)形緩沖區(qū),在緩沖區(qū)之內(nèi)允許亂序。
比如 04:01 來了一條數(shù)據(jù),進(jìn)行結(jié)果輸出;04:02 來了一條數(shù)據(jù),它會把時間戳更新到 04:02,并且會記錄同一個設(shè)備在 04:01 也來過。如果 04:04 再來了一條數(shù)據(jù),就按照相應(yīng)的時間差做一個位移,最后通過這樣的邏輯去保障它能夠容忍一定的亂序。
綜合來看這三個方案:
方案 1 在容忍 16 分鐘亂序的情況下,單作業(yè)的狀態(tài)大小在 480G 左右。這種情況雖然保證了準(zhǔn)確性,但是作業(yè)的恢復(fù)和穩(wěn)定性是完全不可控的狀態(tài),因此我們還是放棄了這個方案;
方案 2 是 30G 左右的狀態(tài)大小,對于亂序 0 容忍,但是數(shù)據(jù)不準(zhǔn)確,由于我們對準(zhǔn)確性的要求非常高,因此也放棄了這個方案;
方案 3 的狀態(tài)跟方案 1 相比,它的狀態(tài)雖然變化了但是增加的不多,而且整體能達(dá)到跟方案 1 同樣的效果。方案 3 容忍亂序的時間是 16 分鐘,我們正常更新一個作業(yè)的話,10 分鐘完全足夠重啟,因此最終選擇了方案 3。
3. 運營場景
3.1 背景介紹
運營場景可分為四個部分:
第一個是數(shù)據(jù)大屏支持,包括單直播間的分析數(shù)據(jù)和大盤的分析數(shù)據(jù),需要做到分鐘級延遲,更新要求比較高;
第二個是直播看板支持,直播看板的數(shù)據(jù)會有特定維度的分析,特定人群支持,對維度豐富性要求比較高;
第三個是數(shù)據(jù)策略榜單,這個榜單主要是預(yù)測熱門作品、爆款,要求的是小時級別的數(shù)據(jù),更新要求比較低;
第四個是 C 端實時指標(biāo)展示,查詢量比較大,但是查詢模式比較固定。
下面進(jìn)行分析這 4 種不同的狀態(tài)產(chǎn)生的一些不同的場景。
前 3 種基本沒有什么差別,只是在查詢模式上,有的是特定業(yè)務(wù)場景,有的是通用業(yè)務(wù)場景。
針對第 3 種和第 4 種,它對于更新的要求比較低,對于吞吐的要求比較高,過程之中的曲線也不要求有一致性。第 4 種查詢模式更多的是單實體的一些查詢,比如去查詢內(nèi)容,會有哪些指標(biāo),而且對 QPS 要求比較高。
3.2 技術(shù)方案
針對上方 4 種不同的場景,我們是如何去做的?
首先看一下基礎(chǔ)明細(xì)層,數(shù)據(jù)源有兩條鏈路,其中一條鏈路是消費的流,比如直播的消費信息,還有觀看 / 點贊 / 評論。經(jīng)過一輪基礎(chǔ)清洗,然后做維度管理。上游的這些維度信息來源于 Kafka,Kafka 寫入了一些內(nèi)容的維度,放到了 KV 存儲里邊,包括一些用戶的維度。
這些維度關(guān)聯(lián)了之后,最終寫入 Kafka 的 DWD 事實層,這里為了做性能的提升,我們做了二級緩存的操作。
如圖中上方,我們讀取 DWD 層的數(shù)據(jù)然后做基礎(chǔ)匯總,核心是窗口維度聚合生成 4 種不同粒度的數(shù)據(jù),分別是大盤多維匯總 topic、直播間多維匯總 topic、作者多維匯總 topic、用戶多維匯總 topic,這些都是通用維度的數(shù)據(jù)。
如圖中下方,基于這些通用維度數(shù)據(jù),我們再去加工個性化維度的數(shù)據(jù),也就是 ADS 層。拿到了這些數(shù)據(jù)之后會有維度擴展,包括內(nèi)容擴展和運營維度的拓展,然后再去做聚合,比如會有電商實時 topic,機構(gòu)服務(wù)實時 topic 和大 V 直播實時 topic。
分成這樣的兩個鏈路會有一個好處:一個地方處理的是通用維度,另一個地方處理的是個性化的維度。通用維度保障的要求會比較高一些,個性化維度則會做很多個性化的邏輯。如果這兩個耦合在一起的話,會發(fā)現(xiàn)任務(wù)經(jīng)常出問題,并且分不清楚哪個任務(wù)的職責(zé)是什么,構(gòu)建不出這樣的一個穩(wěn)定層。
如圖中右方,最終我們用到了三種不同的引擎。簡單來說就是 Redis 查詢用到了 C 端的場景,OLAP 查詢用到了大屏、業(yè)務(wù)看板的場景。
3. 騰訊看點實時數(shù)倉案例
騰訊看點業(yè)務(wù)為什么要構(gòu)建實時數(shù)倉,因為原始的上報數(shù)據(jù)量非常大,一天上報峰值就有上萬億條。而且上報格式混亂。缺乏內(nèi)容維度信息、用戶畫像信息,下游沒辦法直接使用。而我們提供的實時數(shù)倉,是根據(jù)騰訊看點信息流的業(yè)務(wù)場景,進(jìn)行了內(nèi)容維度的關(guān)聯(lián),用戶畫像的關(guān)聯(lián),各種粒度的聚合,下游可以非常方便的使用實時數(shù)據(jù)。
1) 方案選型
那就看下我們多維實時數(shù)據(jù)分析系統(tǒng)的方案選型,選型我們對比了行業(yè)內(nèi)的領(lǐng)先方案,選擇了最符合我們業(yè)務(wù)場景的方案。
- 第一塊是實時數(shù)倉的選型,我們選擇的是業(yè)界比較成熟的 Lambda 架構(gòu),他的優(yōu)點是靈活性高、容錯性高、成熟度高和遷移成本低;缺點是實時、離線數(shù)據(jù)用兩套代碼,可能會存在一個口徑修改了,另一個沒改的問題,我們每天都有做數(shù)據(jù)對賬的工作,如果有異常會進(jìn)行告警。
- 第二塊是實時計算引擎選型,因為 Flink 設(shè)計之初就是為了流處理,SparkStreaming 嚴(yán)格來說還是微批處理,Strom 用的已經(jīng)不多了。再看 Flink 具有 Exactly-once 的準(zhǔn)確性、輕量級 Checkpoint 容錯機制、低延時高吞吐和易用性高的特點,我們選擇了 Flink 作為實時計算引擎。
- 第三塊是實時存儲引擎,我們的要求就是需要有維度索引、支持高并發(fā)、預(yù)聚合、高性能實時多維 OLAP 查詢??梢钥吹剑琀base、Tdsql 和 ES 都不能滿足要求,Druid 有一個缺陷,它是按照時序劃分 Segment,無法將同一個內(nèi)容,存放在同一個 Segment 上,計算全局 TopN 只能是近似值,所以我們選擇了最近兩年大火的 MPP 數(shù)據(jù)庫引擎 ClickHouse。
2) 設(shè)計目標(biāo)與設(shè)計難點
我們多維實時數(shù)據(jù)分析系統(tǒng)分為三大模塊
- 實時計算引擎
- 實時存儲引擎
- App 層
難點主要在前兩個模塊:實時計算引擎和實時存儲引擎。
- 千萬級/s 的海量數(shù)據(jù)如何實時接入,并且進(jìn)行極低延遲維表關(guān)聯(lián)。
- 實時存儲引擎如何支持高并發(fā)寫入、高可用分布式和高性能索引查詢,是比較難的。
這幾個模塊的具體實現(xiàn),看一下我們系統(tǒng)的架構(gòu)設(shè)計。
3) 架構(gòu)設(shè)計
前端采用的是開源組件 Ant Design,利用了 Nginx 服務(wù)器,部署靜態(tài)頁面,并反向代理了瀏覽器的請求到后臺服務(wù)器上。
后臺服務(wù)是基于騰訊自研的 RPC 后臺服務(wù)框架寫的,并且會進(jìn)行一些二級緩存。
實時數(shù)倉部分,分為了接入層、實時計算層和實時數(shù)倉存儲層。
- 接入層主要是從千萬級/s 的原始消息隊列中,拆分出不同行為數(shù)據(jù)的微隊列,拿看點的視頻來說,拆分過后,數(shù)據(jù)就只有百萬級/s 了;
- 實時計算層主要負(fù)責(zé),多行行為流水?dāng)?shù)據(jù)進(jìn)行行轉(zhuǎn)列,實時關(guān)聯(lián)用戶畫像數(shù)據(jù)和內(nèi)容維度數(shù)據(jù);
- 實時數(shù)倉存儲層主要是設(shè)計出符合看點業(yè)務(wù)的,下游好用的實時消息隊列。我們暫時提供了兩個消息隊列,作為實時數(shù)倉的兩層。一層 DWM 層是內(nèi)容 ID-用戶 ID 粒度聚合的,就是一條數(shù)據(jù)包含內(nèi)容 ID-用戶 ID 還有 B 側(cè)內(nèi)容數(shù)據(jù)、C 側(cè)用戶數(shù)據(jù)和用戶畫像數(shù)據(jù);另一層是 DWS 層,是內(nèi)容 ID 粒度聚合的,一條數(shù)據(jù)包含內(nèi)容 ID,B 側(cè)數(shù)據(jù)和 C 側(cè)數(shù)據(jù)??梢钥吹絻?nèi)容 ID-用戶 ID 粒度的消息隊列流量進(jìn)一步減小到十萬級/s,內(nèi)容 ID 粒度的更是萬級/s,并且格式更加清晰,維度信息更加豐富。
實時存儲部分分為實時寫入層、OLAP 存儲層和后臺接口層。
- 實時寫入層主要是負(fù)責(zé) Hash 路由將數(shù)據(jù)寫入;
- OLAP 存儲層利用 MPP 存儲引擎,設(shè)計符合業(yè)務(wù)的索引和物化視圖,高效存儲海量數(shù)據(jù);
- 后臺接口層提供高效的多維實時查詢接口。
4) 實時計算
這個系統(tǒng)最復(fù)雜的兩塊,實時計算和實時存儲。
先介紹實時計算部分:分為實時關(guān)聯(lián)和實時數(shù)倉。
1. 實時高性能維表關(guān)聯(lián)
實時維表關(guān)聯(lián)這一塊難度在于?百萬級/s?的實時數(shù)據(jù)流,如果直接去關(guān)聯(lián) HBase,1 分鐘的數(shù)據(jù),關(guān)聯(lián)完 HBase 耗時是小時級的,會導(dǎo)致數(shù)據(jù)延遲嚴(yán)重。
我們提出了幾個解決方案:
- 第一個是,在 Flink 實時計算環(huán)節(jié),先按照 1 分鐘進(jìn)行了窗口聚合,將窗口內(nèi)多行行為數(shù)據(jù)轉(zhuǎn)一行多列的數(shù)據(jù)格式,經(jīng)過這一步操作,原本小時級的關(guān)聯(lián)耗時下降到了十幾分鐘,但是還是不夠的。
- 第二個是,在訪問 HBase 內(nèi)容之前設(shè)置一層 Redis 緩存,因為 1000 條數(shù)據(jù)訪問 HBase 是秒級的,而訪問 Redis 是毫秒級的,訪問 Redis 的速度基本是訪問 HBase 的 1000 倍。為了防止過期的數(shù)據(jù)浪費緩存,緩存過期時間設(shè)置成 24 小時,同時通過監(jiān)聽寫 HBase Proxy 來保證緩存的一致性。這樣將訪問時間從十幾分鐘變成了秒級。
- 第三個是,上報過程中會上報不少非常規(guī)內(nèi)容 ID,這些內(nèi)容 ID 在內(nèi)容 HBase 中是不存儲的,會造成緩存穿透的問題。所以在實時計算的時候,我們直接過濾掉這些內(nèi)容 ID,防止緩存穿透,又減少一些時間。
- 第四個是,因為設(shè)置了定時緩存,會引入一個緩存雪崩的問題。為了防止雪崩,我們在實時計算中,進(jìn)行了削峰填谷的操作,錯開設(shè)置緩存的時間。
可以看到,優(yōu)化前后,數(shù)據(jù)量從百億級減少到了十億級,耗時從小時級減少到了數(shù)十秒,減少 99%。
2. 下游提供服務(wù)
實時數(shù)倉的難度在于:它處于比較新的領(lǐng)域,并且各個公司各個業(yè)務(wù)差距比較大,怎么能設(shè)計出方便,好用,符合看點業(yè)務(wù)場景的實時數(shù)倉是有難度的。
先看一下實時數(shù)倉做了什么,實時數(shù)倉對外就是幾個消息隊列,不同的消息隊列里面存放的就是不同聚合粒度的實時數(shù)據(jù),包括內(nèi)容 ID、用戶 ID、C 側(cè)行為數(shù)據(jù)、B 側(cè)內(nèi)容維度數(shù)據(jù)和用戶畫像數(shù)據(jù)等。
我們是怎么搭建實時數(shù)倉的,就是上面介紹的實時計算引擎的輸出,放到消息隊列中保存,可以提供給下游多用戶復(fù)用。
我們可以看下,在我們建設(shè)實時數(shù)據(jù)倉庫前后,開發(fā)一個實時應(yīng)用的區(qū)別。沒有數(shù)倉的時候,我們需要消費千萬級/s 的原始隊列,進(jìn)行復(fù)雜的數(shù)據(jù)清洗,然后再進(jìn)行用戶畫像關(guān)聯(lián)、內(nèi)容維度關(guān)聯(lián),才能拿到符合要求格式的實時數(shù)據(jù),開發(fā)和擴展的成本都會比較高,如果想開發(fā)一個新的應(yīng)用,又要走一遍這個流程。有了數(shù)倉之后,如果想開發(fā)內(nèi)容 ID 粒度的實時應(yīng)用,就直接申請 TPS 萬級/s 的 DWS 層的消息隊列。開發(fā)成本變低很多,資源消耗小很多,可擴展性也強很多。
看個實際例子,開發(fā)我們系統(tǒng)的實時數(shù)據(jù)大屏,原本需要進(jìn)行如上所有操作,才能拿到數(shù)據(jù)?,F(xiàn)在只需要消費 DWS 層消息隊列,寫一條 Flink SQL 即可,僅消耗 2 個 CPU 核心,1G 內(nèi)存。
可以看到,以 50 個消費者為例,建立實時數(shù)倉前后,下游開發(fā)一個實時應(yīng)用,可以減少 98%的資源消耗。包括計算資源,存儲資源,人力成本和開發(fā)人員學(xué)習(xí)接入成本等等。并且消費者越多,節(jié)省越多。就拿 Redis 存儲這一部分來說,一個月就能省下上百萬人民幣。
5) 實時存儲
介紹完實時計算,再來介紹實時存儲。
這塊分為三個部分來介紹
- 第一是 分布式-高可用
- 第二是 海量數(shù)據(jù)-寫入
- 第三是 高性能-查詢
1. 分布式-高可用
我們這里聽取的是 Clickhouse 官方的建議,借助 ZK 實現(xiàn)高可用的方案。數(shù)據(jù)寫入一個分片,僅寫入一個副本,然后再寫 ZK,通過 ZK 告訴同一個分片的其他副本,其他副本再過來拉取數(shù)據(jù),保證數(shù)據(jù)一致性。
這里沒有選用消息隊列進(jìn)行數(shù)據(jù)同步,是因為 ZK 更加輕量級。而且寫的時候,任意寫一個副本,其它副本都能夠通過 ZK 獲得一致的數(shù)據(jù)。而且就算其它節(jié)點第一次來獲取數(shù)據(jù)失敗了,后面只要發(fā)現(xiàn)它跟 ZK 上記錄的數(shù)據(jù)不一致,就會再次嘗試獲取數(shù)據(jù),保證一致性。
2. 海量數(shù)據(jù)-寫入
數(shù)據(jù)寫入遇到的第一個問題是,海量數(shù)據(jù)直接寫入 Clickhouse 的話,會導(dǎo)致 ZK 的 QPS 太高,解決方案是改用 Batch 方式寫入。Batch 設(shè)置多大呢,Batch 太小的話緩解不了 ZK 的壓力,Batch 也不能太大,不然上游內(nèi)存壓力太大,通過實驗,最終我們選用了大小幾十萬的 Batch。
第二個問題是,隨著數(shù)據(jù)量的增長,單 QQ 看點的視頻內(nèi)容每天可能寫入百億級的數(shù)據(jù),默認(rèn)方案是寫一張分布式表,這就會造成單臺機器出現(xiàn)磁盤的瓶頸,尤其是 Clickhouse 底層運用的是 Mergetree,原理類似于 HBase、RocketsDB 的底層 LSM-Tree。在合并的過程中會存在寫放大的問題,加重磁盤壓力。峰值每分鐘幾千萬條數(shù)據(jù),寫完耗時幾十秒,如果正在做 Merge,就會阻塞寫入請求,查詢也會非常慢。我們做的兩個優(yōu)化方案:一是對磁盤做 Raid,提升磁盤的 IO;二是在寫入之前進(jìn)行分表,直接分開寫入到不同的分片上,磁盤壓力直接變?yōu)?1/N。
第三個問題是,雖然我們寫入按照分片進(jìn)行了劃分,但是這里引入了一個分布式系統(tǒng)常見的問題,就是局部的 Top 并非全局 Top 的問題。比如同一個內(nèi)容 ID 的數(shù)據(jù)落在了不同的分片上,計算全局 Top100 閱讀的內(nèi)容 ID,有一個內(nèi)容 ID 在分片 1 上是 Top100,但是在其它分片上不是 Top100,導(dǎo)致匯總的時候,會丟失一部分?jǐn)?shù)據(jù),影響最終結(jié)果。我們做的優(yōu)化是在寫入之前加上一層路由,將同一個內(nèi)容 ID 的記錄,全部路由到同一個分片上,解決了該問題。
介紹完寫入,下一步介紹 Clickhouse 的高性能存儲和查詢。
3. 高性能-存儲-查詢
Clickhouse 高性能查詢的一個關(guān)鍵點是稀疏索引。稀疏索引這個設(shè)計就很有講究,設(shè)計得好可以加速查詢,設(shè)計不好反而會影響查詢效率。我根據(jù)我們的業(yè)務(wù)場景,因為我們的查詢大部分都是時間和內(nèi)容 ID 相關(guān)的,比如說,某個內(nèi)容,過去 N 分鐘在各個人群表現(xiàn)如何?我按照日期,分鐘粒度時間和內(nèi)容 ID 建立了稀疏索引。針對某個內(nèi)容的查詢,建立稀疏索引之后,可以減少 99%的文件掃描。
還有一個問題就是,我們現(xiàn)在數(shù)據(jù)量太大,維度太多。拿 QQ 看點的視頻內(nèi)容來說,一天流水有上百億條,有些維度有幾百個類別。如果一次性把所有維度進(jìn)行預(yù)聚合,數(shù)據(jù)量會指數(shù)膨脹,查詢反而變慢,并且會占用大量內(nèi)存空間。我們的優(yōu)化,針對不同的維度,建立對應(yīng)的預(yù)聚合物化視圖,用空間換時間,這樣可以縮短查詢的時間。
分布式表查詢還會有一個問題,查詢單個內(nèi)容 ID 的信息,分布式表會將查詢下發(fā)到所有的分片上,然后再返回查詢結(jié)果進(jìn)行匯總。實際上,因為做過路由,一個內(nèi)容 ID 只存在于一個分片上,剩下的分片都在空跑。針對這類查詢,我們的優(yōu)化是后臺按照同樣的規(guī)則先進(jìn)行路由,直接查詢目標(biāo)分片,這樣減少了 N-1/N 的負(fù)載,可以大量縮短查詢時間。而且由于我們是提供的 OLAP 查詢,數(shù)據(jù)滿足最終一致性即可,通過主從副本讀寫分離,可以進(jìn)一步提升性能。
我們在后臺還做了一個 1 分鐘的數(shù)據(jù)緩存,針對相同條件查詢,后臺就直接返回了。
4. 擴容
這里再介紹一下我們的擴容的方案,調(diào)研了業(yè)內(nèi)的一些常見方案。
比如 HBase,原始數(shù)據(jù)都存放在 HDFS 上,擴容只是 Region Server 擴容,不涉及原始數(shù)據(jù)的遷移。但是 Clickhouse 的每個分片數(shù)據(jù)都是在本地,是一個比較底層存儲引擎,不能像 HBase 那樣方便擴容。
Redis 是哈希槽這種類似一致性哈希的方式,是比較經(jīng)典分布式緩存的方案。Redis slot 在 Rehash 的過程中雖然存在短暫的 ask 讀不可用,但是總體來說遷移是比較方便的,從原 h[0]遷移到 h[1],最后再刪除 h[0]。但是 Clickhouse 大部分都是 OLAP 批量查詢,不是點查,而且由于列式存儲,不支持刪除的特性,一致性哈希的方案不是很適合。
目前擴容的方案是,另外消費一份數(shù)據(jù),寫入新 Clickhouse 集群,兩個集群一起跑一段時間,因為實時數(shù)據(jù)就保存 3 天,等 3 天之后,后臺服務(wù)直接訪問新集群。
4. 有贊實時數(shù)倉案例
1) 分層設(shè)計
傳統(tǒng)離線數(shù)倉的分層設(shè)計大家都很熟悉,為了規(guī)范的組織和管理數(shù)據(jù),層級劃分會比較多,在一些復(fù)雜邏輯處理場景還會引入臨時層落地中間結(jié)果以方便下游加工處理。實時數(shù)倉考慮到時效性問題,分層設(shè)計需要盡量精簡,降低中間流程出錯的可能性,不過總體而言,實時數(shù)倉還是會參考離線數(shù)倉的分層思想來設(shè)計。
實時數(shù)倉分層架構(gòu)如下圖所示 :
- ODS(實時數(shù)據(jù)接入層)
ODS 層,即實時數(shù)據(jù)接入層,通過數(shù)據(jù)采集工具收集各個業(yè)務(wù)系統(tǒng)的實時數(shù)據(jù),對非結(jié)構(gòu)化的數(shù)據(jù)進(jìn)行結(jié)構(gòu)化處理,保存原始數(shù)據(jù),幾乎不過濾數(shù)據(jù);該層數(shù)據(jù)的主要來源有三個部分:第一部分是業(yè)務(wù)方創(chuàng)建的 NSQ 消息,第二部分是業(yè)務(wù)數(shù)據(jù)庫的 Binlog 日志,第三部分是埋點日志和應(yīng)用程序日志,以上三部分的實時數(shù)據(jù)最終統(tǒng)一寫入 Kafka 存儲介質(zhì)中。
ODS 層表命名規(guī)范:部門名稱.應(yīng)用名稱.數(shù)倉層級主題域前綴數(shù)據(jù)庫名/消息名
例如:接入業(yè)務(wù)庫的 Binlog
實時數(shù)倉表命名:
deptname.appname.ods_subjectname_tablename
例如:接入業(yè)務(wù)方的 NSQ 消息
實時數(shù)倉表命名:
deptname.appname.ods_subjectname_msgname
- DWS(實時明細(xì)中間層)
DWS 層,即實時明細(xì)中間層,該層以業(yè)務(wù)過程作為建模驅(qū)動,基于每個具體的業(yè)務(wù)過程事件來構(gòu)建最細(xì)粒度的明細(xì)層事實表;比如交易過程,有下單事件、支付事件、發(fā)貨事件等,我們會基于這些獨立的事件來進(jìn)行明細(xì)層的構(gòu)建。在這層,事實明細(xì)數(shù)據(jù)同樣是按照離線數(shù)倉的主題域來進(jìn)行劃分,也會采用維度建模的方式組織數(shù)據(jù),對于一些重要的維度字段,會做適當(dāng)冗余?;谟匈潓崟r需求的場景,重點建設(shè)交易、營銷、客戶、店鋪、商品等主題域的數(shù)據(jù)。該層的數(shù)據(jù)來源于 ODS 層,通過 FlinkSQL 進(jìn)行 ETL 處理,主要工作有規(guī)范命名、數(shù)據(jù)清洗、維度補全、多流關(guān)聯(lián),最終統(tǒng)一寫入 Kafka 存儲介質(zhì)中。
DWS 層表命名規(guī)范:
部門名稱.應(yīng)用名稱.數(shù)倉層級_主題域前綴_數(shù)倉表命名
例如:實時事件 A 的中間層
實時數(shù)倉表命名:
deptname.appname.dws_subjectname_tablename_eventnameA
例如:實時事件 B 的中間層
實時數(shù)倉表命名:
deptname.appname.dws_subjectname_tablename_eventnameB
- DIM(實時維表層)
DIM 層,即實時維表層,用來存放維度數(shù)據(jù),主要用于實時明細(xì)中間層寬化處理時補全維度使用,目前該層的數(shù)據(jù)主要存儲于 HBase 中,后續(xù)會基于 QPS 和數(shù)據(jù)量大小提供更多合適類型的存儲介質(zhì)。
DIM 層表命名規(guī)范:
應(yīng)用名稱_數(shù)倉層級_主題域前綴_數(shù)倉表命名
例如:HBase 存儲,實時維度表
實時數(shù)倉表命名:
appname_dim_tablename
- DWA(實時匯總層)
DWA 層,即實時匯總層,該層通過 DWS 層數(shù)據(jù)進(jìn)行多維匯總,提供給下游業(yè)務(wù)方使用,在實際應(yīng)用過程中,不同業(yè)務(wù)方使用維度匯總的方式不太一樣,根據(jù)不同的需求采用不同的技術(shù)方案去實現(xiàn)。第一種方式,采用 FlinkSQL 進(jìn)行實時匯總,將結(jié)果指標(biāo)存入 HBase、MySQL 等數(shù)據(jù)庫,該種方式是我們早期采用的方案,優(yōu)點是實現(xiàn)業(yè)務(wù)邏輯比較靈活,缺點是聚合粒度固化,不易擴展;第二種方式,采用實時 OLAP 工具進(jìn)行匯總,該種方式是我們目前常用的方案,優(yōu)點是聚合粒度易擴展,缺點是業(yè)務(wù)邏輯需要在中間層預(yù)處理。
DWA 層表命名規(guī)范:
應(yīng)用名稱_數(shù)倉層級_主題域前綴_聚合粒度_數(shù)據(jù)范圍
例如:HBase 存儲,某域當(dāng)日某粒度實時匯總表
實時數(shù)倉表命名:
appname_dwa_subjectname_aggname_daily
- APP(實時應(yīng)用層)
APP 層,即實時應(yīng)用層,該層數(shù)據(jù)已經(jīng)寫入應(yīng)用系統(tǒng)的存儲中,例如寫入 Druid 作為 BI 看板的實時數(shù)據(jù)集;寫入 HBase、MySQL 用于提供統(tǒng)一數(shù)據(jù)服務(wù)接口;寫入 ClickHouse 用于提供實時 OLAP 服務(wù)。因為該層非常貼近業(yè)務(wù),在命名規(guī)范上實時數(shù)倉不做統(tǒng)一要求。
2) 實時 ETL
實時數(shù)倉 ETL 處理過程所涉及的組件比較多,接下來盤點構(gòu)建實時數(shù)倉所需要的組件以及每個組件的應(yīng)用場景。如下圖所示:
具體實時 ETL 處理流程如下圖所示:
1. 維度補全
創(chuàng)建調(diào)用 Duboo 接口的 UDF 函數(shù)在實時流里補全維度是最便捷的使用方式,但如果請求量過大,對 Duboo 接口壓力會過大。在實際應(yīng)用場景補全維度首選還是關(guān)聯(lián)維度表,但關(guān)聯(lián)也存在一定概率的丟失問題,為了彌補這種丟失,可以采用 Duboo 接口調(diào)用兜底的方式來補全。偽代碼如下:
create?function?call_dubbo?as?'XXXXXXX';
create?function?get_json_object?as?'XXXXXXX';
case
????when?cast(?b.column?as?bigint)?is?not?null
????????then?cast(?b.column?as?bigint)
????????????else?cast(coalesce(cast(get_json_object(call_dubbo('clusterUrl'
??????????????????????????????????????????????????????????????,'serviceName'
??????????????????????????????????????????????????????????????,'methodName'
??????????????????????????????????????????????????????????????,cast(concat('[',cast(a.column?as?varchar),']')?as?varchar)
??????????????????????????????????????????????????????????????,'key'
??????????????????????????????????????????????????????????????)
?????????????????????????????????????????????,'rootId')
?????????????????????????????????????????as?bigint)
???????????????????????????????????,a.column)
????????????????????????????as?bigint)??end
2. 冪等處理
實時任務(wù)在運行過程中難免會遇到執(zhí)行異常的情況,當(dāng)任務(wù)異常重啟的時候會導(dǎo)致部分消息重新發(fā)送和消費,從而引發(fā)下游實時統(tǒng)計數(shù)據(jù)不準(zhǔn)確,為了有效避免這種情況,可以選擇對實時消息流做冪等處理,當(dāng)消費完一條消息,將這條消息的 Key 存入 KV,如果任務(wù)異常重啟導(dǎo)致消息重新發(fā)送的時候,先從 KV 判斷該消息是否已被消費,如果已消費就不再往下發(fā)送。偽代碼如下:
create?function?idempotenc?as?'XXXXXXX';
insert?into?table
select
????order_no
from
????(
????????select
????????????a.orderNo????????????????????????????????????????as??order_no
??????????,?idempotenc('XXXXXXX',?coalesce(?order_no,?'')?)??as??rid
????????from
????????????table1
????)?t
where
????t.rid?=?0;
3. 數(shù)據(jù)驗證
由于實時數(shù)倉的數(shù)據(jù)是無邊界的流,相比于離線數(shù)倉固定不變的數(shù)據(jù)更難驗收?;诓煌膱鼍埃覀兲峁┝?2 種驗證方式,分別是:抽樣驗證與全量驗證。如圖 3.3 所示
- 抽樣驗證方案
該方案主要應(yīng)用在數(shù)據(jù)準(zhǔn)確性驗證上,實時匯總結(jié)果是基于存儲在 Kafka 的實時明細(xì)中間層計算而來,但 Kafka 本身不支持按照特定條件檢索,不支持寫查詢語句,再加上消息的無邊界性,統(tǒng)計結(jié)果是在不斷變化的,很難尋找參照物進(jìn)行比對。鑒于此,我們采用了持久化消息的方法,將消息落盤到 TiDB 存儲,基于 TiDB 的能力對落盤的消息進(jìn)行檢索、查詢、匯總。編寫固定時間邊界的測試用例與相同時間邊界的業(yè)務(wù)庫數(shù)據(jù)或者離線數(shù)倉數(shù)據(jù)進(jìn)行比對。通過以上方式,抽樣核心店鋪的數(shù)據(jù)進(jìn)行指標(biāo)準(zhǔn)確性驗證,確保測試用例全部通過。
- 全量驗證方案
該方案主要應(yīng)用在數(shù)據(jù)完整性和一致性驗證上,在實時維度表驗證的場景使用最多。大體思路:將存儲實時維度表的在線 HBase 集群中的數(shù)據(jù)同步到離線 HBase 集群中,再將離線 HBase 集群中的數(shù)據(jù)導(dǎo)入到 Hive 中,在限定實時維度表的時間邊界后,通過數(shù)據(jù)平臺提供的數(shù)據(jù)校驗功能,比對實時維度表與離線維度表是否存在差異,最終確保兩張表的數(shù)據(jù)完全一致。
4. 數(shù)據(jù)恢復(fù)
實時任務(wù)一旦上線就要求持續(xù)不斷的提供準(zhǔn)確、穩(wěn)定的服務(wù)。區(qū)別于離線任務(wù)按天調(diào)度,如果離線任務(wù)出現(xiàn) Bug,會有充足的時間去修復(fù)。如果實時任務(wù)出現(xiàn) Bug,必須按照提前制定好的流程,嚴(yán)格按照步驟執(zhí)行,否則極易出現(xiàn)問題。造成 Bug 的情況有非常多,比如代碼 Bug、異常數(shù)據(jù) Bug、實時集群 Bug,如下圖展示了修復(fù)實時任務(wù) Bug 并恢復(fù)數(shù)據(jù)的流程。
5. 騰訊全場景實時數(shù)倉建設(shè)案例
在數(shù)倉體系中會有各種各樣的大數(shù)據(jù)組件,譬如 Hive/HBase/HDFS/S3,計算引擎如 MapReduce、Spark、Flink,根據(jù)不同的需求,用戶會構(gòu)建大數(shù)據(jù)存儲和處理平臺,數(shù)據(jù)在平臺經(jīng)過處理和分析,結(jié)果數(shù)據(jù)會保存到 MySQL、Elasticsearch 等支持快速查詢的關(guān)系型、非關(guān)系型數(shù)據(jù)庫中,接下來應(yīng)用層就可以基于這些數(shù)據(jù)進(jìn)行 BI 報表開發(fā)、用戶畫像,或基于 Presto 這種 OLAP 工具進(jìn)行交互式查詢等。
1) Lambda 架構(gòu)的痛點
在整個過程中我們常常會用一些離線的調(diào)度系統(tǒng),定期的(T+1 或者每隔幾小時)去執(zhí)行一些 Spark 分析任務(wù),做一些數(shù)據(jù)的輸入、輸出或是 ETL 工作。離線數(shù)據(jù)處理的整個過程中必然存在數(shù)據(jù)延遲的現(xiàn)象,不管是數(shù)據(jù)接入還是中間的分析,數(shù)據(jù)的延遲都是比較大的,可能是小時級也有可能是天級別的。另外一些場景中我們也常常會為了一些實時性的需求去構(gòu)建一個實時處理過程,比如借助 Flink+Kafka 去構(gòu)建實時的流處理系統(tǒng)。
整體上,數(shù)倉架構(gòu)中有非常多的組件,大大增加了整個架構(gòu)的復(fù)雜性和運維的成本。
如下圖,這是很多公司之前或者現(xiàn)在正在采用的 Lambda 架構(gòu),Lambda 架構(gòu)將數(shù)倉分為離線層和實時層,相應(yīng)的就有批處理和流處理兩個相互獨立的數(shù)據(jù)處理流程,同一份數(shù)據(jù)會被處理兩次以上,同一套業(yè)務(wù)邏輯代碼需要適配性的開發(fā)兩次。Lambda 架構(gòu)大家應(yīng)該已經(jīng)非常熟悉了,下面我就著重介紹一下我們采用 Lambda 架構(gòu)在數(shù)倉建設(shè)過程中遇到的一些痛點問題。
例如在實時計算一些用戶相關(guān)指標(biāo)的實時場景下,我們想看到當(dāng)前 pv、uv 時,我們會將這些數(shù)據(jù)放到實時層去做一些計算,這些指標(biāo)的值就會實時呈現(xiàn)出來,但同時想了解用戶的一個增長趨勢,需要把過去一天的數(shù)據(jù)計算出來。這樣就需要通過批處理的調(diào)度任務(wù)來實現(xiàn),比如凌晨兩三點的時候在調(diào)度系統(tǒng)上起一個 Spark 調(diào)度任務(wù)把當(dāng)天所有的數(shù)據(jù)重新跑一遍。
很顯然在這個過程中,由于兩個過程運行的時間是不一樣的,跑的數(shù)據(jù)卻相同,因此可能造成數(shù)據(jù)的不一致。因為某一條或幾條數(shù)據(jù)的更新,需要重新跑一遍整個離線分析的鏈路,數(shù)據(jù)更新成本很大,同時需要維護離線和實時分析兩套計算平臺,整個上下兩層的開發(fā)流程和運維成本其實都是非常高的。
為了解決 Lambda 架構(gòu)帶來的各種問題,就誕生了 Kappa 架構(gòu),這個架構(gòu)大家應(yīng)該也非常的熟悉。
2) Kappa 架構(gòu)的痛點
我們來講一下 Kappa 架構(gòu),如下圖,它中間其實用的是消息隊列,通過用 Flink 將整個鏈路串聯(lián)起來。Kappa 架構(gòu)解決了 Lambda 架構(gòu)中離線處理層和實時處理層之間由于引擎不一樣,導(dǎo)致的運維成本和開發(fā)成本高昂的問題,但 Kappa 架構(gòu)也有其痛點。
首先,在構(gòu)建實時業(yè)務(wù)場景時,會用到 Kappa 去構(gòu)建一個近實時的場景,但如果想對數(shù)倉中間層例如 ODS 層做一些簡單的 OLAP 分析或者進(jìn)一步的數(shù)據(jù)處理時,如將數(shù)據(jù)寫到 DWD 層的 Kafka,則需要另外接入 Flink。同時,當(dāng)需要從 DWD 層的 Kafka 把數(shù)據(jù)再導(dǎo)入到 Clickhouse,Elasticsearch,MySQL 或者是 Hive 里面做進(jìn)一步的分析時,顯然就增加了整個架構(gòu)的復(fù)雜性。
其次,Kappa 架構(gòu)是強烈依賴消息隊列的,我們知道消息隊列本身在整個鏈路上數(shù)據(jù)計算的準(zhǔn)確性是嚴(yán)格依賴它上游數(shù)據(jù)的順序,消息隊列接的越多,發(fā)生亂序的可能性就越大。ODS 層數(shù)據(jù)一般是絕對準(zhǔn)確的,把 ODS 層的數(shù)據(jù)發(fā)送到下一個 kafka 的時候就有可能發(fā)生亂序,DWD 層再發(fā)到 DWS 的時候可能又亂序了,這樣數(shù)據(jù)不一致性就會變得很嚴(yán)重。
第三,Kafka 由于它是一個順序存儲的系統(tǒng),順序存儲系統(tǒng)是沒有辦法直接在其上面利用 OLAP 分析的一些優(yōu)化策略,例如謂詞下推這類的優(yōu)化策略,在順序存儲的 Kafka 上來實現(xiàn)是比較困難的事情。
那么有沒有這樣一個架構(gòu),既能夠滿足實時性的需求,又能夠滿足離線計算的要求,而且還能夠減輕運維開發(fā)的成本,解決通過消息隊列構(gòu)建 Kappa 架構(gòu)過程中遇到的一些痛點?答案是肯定的,后面的篇幅會詳細(xì)論述。
3) 痛點總結(jié)
4) Flink+Iceberg 構(gòu)建實時數(shù)倉
1. 近實時的數(shù)據(jù)接入
前面介紹了 Iceberg 既支持讀寫分離,又支持并發(fā)讀、增量讀、小文件合并,還可以支持秒級到分鐘級的延遲,基于這些優(yōu)勢我們嘗試采用 Iceberg 這些功能來構(gòu)建基于 Flink 的實時全鏈路批流一體化的實時數(shù)倉架構(gòu)。
如下圖所示,Iceberg 每次的 commit 操作,都是對數(shù)據(jù)的可見性的改變,比如說讓數(shù)據(jù)從不可見變成可見,在這個過程中,就可以實現(xiàn)近實時的數(shù)據(jù)記錄。
2. 實時數(shù)倉 - 數(shù)據(jù)湖分析系統(tǒng)
此前需要先進(jìn)行數(shù)據(jù)接入,比如用 Spark 的離線調(diào)度任務(wù)去跑一些數(shù)據(jù),拉取,抽取最后再寫入到 Hive 表里面,這個過程的延時比較大。有了 Iceberg 的表結(jié)構(gòu),可以中間使用 Flink,或者 spark streaming,完成近實時的數(shù)據(jù)接入。
基于以上功能,我們再來回顧一下前面討論的 Kappa 架構(gòu),Kappa 架構(gòu)的痛點上面已經(jīng)描述過,Iceberg 既然能夠作為一個優(yōu)秀的表格式,既支持 Streaming reader,又可以支持 Streaming sink,是否可以考慮將 Kafka 替換成 Iceberg?
Iceberg 底層依賴的存儲是像 HDFS 或 S3 這樣的廉價存儲,而且 Iceberg 是支持 parquet、orc、Avro 這樣的列式存儲。有列式存儲的支持,就可以對 OLAP 分析進(jìn)行基本的優(yōu)化,在中間層直接進(jìn)行計算。例如謂詞下推最基本的 OLAP 優(yōu)化策略,基于 Iceberg snapshot 的 Streaming reader 功能,可以把離線任務(wù)天級別到小時級別的延遲大大的降低,改造成一個近實時的數(shù)據(jù)湖分析系統(tǒng)。
在中間處理層,可以用 presto 進(jìn)行一些簡單的查詢,因為 Iceberg 支持 Streaming read,所以在系統(tǒng)的中間層也可以直接接入 Flink,直接在中間層用 Flink 做一些批處理或者流式計算的任務(wù),把中間結(jié)果做進(jìn)一步計算后輸出到下游。
替換 Kafka 的優(yōu)劣勢:
總的來說,Iceberg 替換 Kafka 的優(yōu)勢主要包括:
- 實現(xiàn)存儲層的流批統(tǒng)一
- 中間層支持 OLAP 分析
- 完美支持高效回溯
- 存儲成本降低
當(dāng)然,也存在一定的缺陷,如:
- 數(shù)據(jù)延遲從實時變成近實時
- 對接其他數(shù)據(jù)系統(tǒng)需要額外開發(fā)工作
秒級分析 - 數(shù)據(jù)湖加速:文章來源:http://www.zghlxwxcb.cn/news/detail-480490.html
由于 Iceberg 本身是將數(shù)據(jù)文件全部存儲在 HDFS 上的,HDFS 讀寫這塊對于秒級分析的場景,還是不能夠完全滿足我們的需求,所以接下去我們會在 Iceberg 底層支持 Alluxio 這樣一個緩存,借助于緩存的能力可以實現(xiàn)數(shù)據(jù)湖的加速。這塊的架構(gòu)也在我們未來的一個規(guī)劃和建設(shè)中。文章來源地址http://www.zghlxwxcb.cn/news/detail-480490.html
到了這里,關(guān)于詳解大廠實時數(shù)倉建設(shè)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!