業(yè)務(wù)背景&痛點(diǎn)
- 流式處理的業(yè)務(wù)場景,經(jīng)常會遇到實(shí)時(shí)消息數(shù)據(jù)需要與歷史存量數(shù)據(jù)關(guān)聯(lián)查詢或者聚合,比如電商常見的訂單場景,訂單表做為實(shí)時(shí)事實(shí)表,是典型的流式消息數(shù)據(jù),通常會在 kafka 中,而客戶信息,商品 SKU 表是維度表,通常存在業(yè)務(wù)數(shù)據(jù)庫或者數(shù)倉中,是典型的離線數(shù)據(jù)。實(shí)時(shí)訂單數(shù)據(jù)在實(shí)時(shí)處理時(shí)通常需要事實(shí)表與維度表 join 做 reference 補(bǔ)全,以便拿到訂單詳情并實(shí)時(shí)統(tǒng)計(jì)當(dāng)天或截至當(dāng)天的所有訂單的商品分布詳情。
亞馬遜云科技開發(fā)者社區(qū)為開發(fā)者們提供全球的開發(fā)技術(shù)資源。這里有技術(shù)文檔、開發(fā)案例、技術(shù)專欄、培訓(xùn)視頻、活動與競賽等。幫助中國開發(fā)者對接世界最前沿技術(shù),觀點(diǎn),和項(xiàng)目,并將中國優(yōu)秀開發(fā)者或技術(shù)推薦給全球云社區(qū)。如果你還沒有關(guān)注/收藏,看到這里請一定不要匆匆劃過,點(diǎn)這里(https://dev.amazoncloud.cn/user/register?show=tab1&sc_channel=CSDN)讓它成為你的技術(shù)寶庫! |
- 流式計(jì)算通常采用 Flink 做為數(shù)據(jù)處理平臺,上文中提到的實(shí)時(shí)和離線數(shù)據(jù)join 的場景,F(xiàn)link 提供了 Hive/ jdbc/Hudi/ filesystem 各種 connector 實(shí)現(xiàn)與離線數(shù)據(jù)的提取和讀寫,這樣一來在 Flink 應(yīng)用程序中,即可使用 Table,Sql API 來 join 關(guān)聯(lián)流態(tài)表和離線表數(shù)據(jù),實(shí)現(xiàn)聚合計(jì)算等操作
使用 Flink Sql 離線表 Join 流態(tài)表的常規(guī) lookup join,是通過 Flink hive sql connector 或者 filesystem connector,對離線 hive 庫表或者 S3上離線數(shù)據(jù)建 Flink Table,然后對 kafka 消息流中的數(shù)據(jù)建流態(tài)表,然后直接做量表做 join 操作
該方式架構(gòu)如下圖所示:
該方式主要面臨的問題是:
- lookup 維度表數(shù)據(jù)只會在首次拉起 Flink 應(yīng)用的時(shí)候,保存在 task manager state 中,后續(xù)持續(xù)查詢或者開窗聚合等操作時(shí),是不會再次拉取維度表數(shù)據(jù),業(yè)務(wù)需要定期重啟 Flink 應(yīng)用,或者刷新維度表數(shù)據(jù)到臨時(shí)表,以便 join 聚合時(shí)和最新的維度數(shù)據(jù)關(guān)聯(lián):
- 每次需要重新全量拉取維度表數(shù)據(jù),存在冷啟動問題,且維度表數(shù)據(jù)量大的時(shí)候(如上千萬注冊用戶信息表,上萬的商品 SKU 屬性字段),造成很大 IO 開銷,存在性能瓶頸
- Flink 的 checkpoint 機(jī)制在持續(xù)查詢或者開窗聚合時(shí),需要保存 state 狀態(tài)及處理數(shù)據(jù)到檢查點(diǎn)快照中,造成 state 快照數(shù)據(jù)膨脹
解決方案思路
基于以上業(yè)務(wù)難點(diǎn),本文提出一種解決方案思路,即通過 Alluxio 緩存層,將 hive 維度表數(shù)據(jù)自動加載至 Alluxio UFS 緩存中,同時(shí)通過 Flink 時(shí)態(tài)表 join,把維度表數(shù)據(jù)做成持續(xù)變化表上某一時(shí)刻的視圖
同時(shí)使用 Flink 的 Temporal table function 表函數(shù),傳遞一個(gè)時(shí)間參數(shù),返回 Temporal table 這一指定時(shí)刻的視圖,這樣實(shí)時(shí)動態(tài)表主表與這個(gè) Temporal table 表關(guān)聯(lián)的時(shí)候,可以關(guān)聯(lián)到某一個(gè)版本(歷史上某一個(gè)時(shí)刻)的維度數(shù)據(jù)
優(yōu)化后的整體架構(gòu)如下圖所示:
方案實(shí)施落地Detail
本文以 Kafka 中用戶行為日志數(shù)據(jù)做為實(shí)時(shí)流態(tài)的事實(shí)表數(shù)據(jù),hive 上用戶信息數(shù)據(jù)做為離線維度表數(shù)據(jù),采用 Alluxio+Flink temproal 的 demo,來驗(yàn)證其 flink join 優(yōu)化的解決方案
實(shí)時(shí)事實(shí)表
本實(shí)例中我們使用 json-data-generator 開源組件模擬的用戶行為 json 數(shù)據(jù),實(shí)時(shí)寫入 kafka 中,通過 Flink kafka connector 轉(zhuǎn)換為持續(xù)查詢的 Flink 流態(tài)表,從而做為實(shí)時(shí) join 的時(shí)候的 Fact 事實(shí)表數(shù)據(jù)
用戶行為 json 模擬數(shù)據(jù)如下格式:
[{ "timestamp": "nowTimestamp()",
"system": "BADGE",
"actor": "Agnew",
"action": "EXIT",
"objects": ["Building 1"],
"location": "45.5,44.3",
"message": "Exited Building 1"
}]
包含用戶行為的業(yè)務(wù)時(shí)間,登錄系統(tǒng),用戶署名,行為 activity 動作,操作涉及對象,位置信息,及相關(guān)文本消息字段。我們在
Flink Sql 中建選擇主要字段建事實(shí)表如下
CREATE TABLE logevent_source (`timestamp` string,
`system` string,
actor STRING,
action STRING
) WITH (
'connector' = 'kafka',
'topic' = 'logevent',
'properties.bootstrap.servers' = 'b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092 (http://b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092/)',
'properties.group.id' = 'testGroup6',
'scan.startup.mode'='latest-offset',
'format' = 'json'
);
Alluxio 緩存維度表
Alluxio 是大數(shù)據(jù)技術(shù)堆棧的分布式緩存,它提供了一個(gè)統(tǒng)一的 UFS 文件系統(tǒng)可以對接底層 S3,hdfs 數(shù)據(jù),在讀寫 Alluxio UFS 的時(shí)候,可以針對 S3,HDFS 分布式存儲層實(shí)現(xiàn) warm up,顯著提升吞吐量和減少網(wǎng)絡(luò)開銷,且與上層計(jì)算引擎如 Hive,spark,Trino 都有深度的集成,很適合做為離線維度數(shù)據(jù)的緩存加速器
Amazon EMR 對 Alluxio 提供了良好的集成,可以通過 boostrap 啟動腳本方式,在 EMR 創(chuàng)建時(shí)自動部署 Alluxio 組件并啟動 Alluxio master、worker 進(jìn)程,詳細(xì) EMR 安裝和部署 Alluxio 步驟可以參考另一篇文章?Alluxio EMR 集成實(shí)踐
在集成 Alluxio 的 Amazon EMR 集群中,使用 Alluxio 中創(chuàng)建 hive 離線維表數(shù)據(jù)的緩存表方法如下:
hive-env.sh中設(shè)置設(shè)置client jar包:
$ export HIVE_AUX_JARS_PATH=/<PATH_TO_ALLUXIO>/client/alluxio-2.2.0-client.jar:${HIVE_AU
確保安裝部署alluxio的EMR集群上ufs已配置,并且表或者db路徑已創(chuàng)建
alluxio fs mkdir alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/customer
alluxio fs chown hadoop:hadoop alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/customer
在AWS EMR集群上,創(chuàng)建hive表路徑指向alluxio namespace uri:
!connect jdbc:hive2://xxx.xxx.xxx.xxx:10000/default;
hive> CREATE TABLE customer(
c_customer_sk bigint,
c_customer_id string,
c_current_cdemo_sk bigint,
c_current_hdemo_sk bigint,
c_current_addr_sk bigint,
c_first_shipto_date_sk bigint,
c_first_sales_date_sk bigint,
c_salutation string,
c_first_name string,
c_last_name string,
c_preferred_cust_flag string,
c_birth_day int,
c_birth_month int,
c_birth_year int,
c_birth_country string,
c_login string,
c_email_address string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 'alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/customer';
OK
Time taken: 3.485 seconds
如上所示,該 Alluxio 表 location 指向的路徑即為 hive 維度表所在 S3路徑,因此對 Customer 用戶維度信息表的寫入操作會自動同步到 alluxio 緩存中。
創(chuàng)建好 Alluxio hive 離線維度表后,在 flink sql中,可以通過 hive 的 catalog,連接到 hive 元數(shù)據(jù),即可以查看到 alluxio 緩存表的詳細(xì)信息:
CREATE CATALOG hiveCatalog WITH ( 'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/etc/hive/conf/',
'hive-version' = '3.1.2',
'hadoop-conf-dir'='/etc/hadoop/conf/'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG hiveCatalog;
show create table customer;
create external table customer(
c_customer_sk bigint,
c_customer_id string,
c_current_cdemo_sk bigint,
c_current_hdemo_sk bigint,
c_current_addr_sk bigint,
c_first_shipto_date_sk bigint,
c_first_sales_date_sk bigint,
c_salutation string,
c_first_name string,
c_last_name string,
c_preferred_cust_flag string,
c_birth_day int,
c_birth_month int,
c_birth_year int,
c_birth_country string,
c_login string,
c_email_address string
)
row format delimited fields terminated by '|'
location 'alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/30/customer'
TBLPROPERTIES (
'streaming-source.enable' = 'false',
'lookup.join.cache.ttl' = '12 h'
)
如上圖所示,可以看到該維度表 location 路徑是 alluxio 緩存 ufs 路徑的 uri,業(yè)務(wù)程序讀寫該維度表時(shí),alluxio 會自動更新緩存中的 customer 維度表數(shù)據(jù),并異步寫入到 alluxio的backend storage 的 S3表路徑,實(shí)現(xiàn)數(shù)據(jù)湖的表數(shù)據(jù)同步更新。
Flink Temporal 時(shí)態(tài)表 join
Flink 時(shí)態(tài)表(Temporal table)也是動態(tài)表的一種,時(shí)態(tài)表的每條記錄都會有一個(gè)或多個(gè)時(shí)間字段相關(guān)聯(lián),當(dāng)我們事實(shí)表 join 維度表的時(shí)候,通常需要獲取實(shí)時(shí)的維度表數(shù)據(jù)做 lookup,所以通常需要在事實(shí)表 create table 或者 join 時(shí),通過 proctime()函數(shù)指定事實(shí)表的時(shí)間字段,同時(shí)在 join 時(shí),通過 FOR SYSTEM_TIME AS OF 語法,指定維度表 lookup 時(shí)對應(yīng)的事實(shí)表時(shí)間版本的數(shù)據(jù)
在本 Demo 示例中,客戶信息在 hive 離線表作為一個(gè)變化的維度表的角色,客戶行為在 kafka 中作為事實(shí)表的角色,因此在 flink kafka source table 中,通過 proctime()指定時(shí)間字段,然后在 flink hive table 做 join 時(shí),使用 FOR SYSTEM_TIME AS OF 指定 lookup 的 kafka source table 的時(shí)間字段,從而實(shí)現(xiàn) Flink temporal 時(shí)態(tài)表 join 業(yè)務(wù)處理
如下所示,F(xiàn)link Sql 中通過 Kafka connector 創(chuàng)建用戶行為的事實(shí)表,其中 ts 字段即為時(shí)態(tài)表 join 時(shí)的時(shí)間戳:
CREATE TABLE logevent_source (`timestamp` string,
`system` string,
actor STRING,
action STRING,
ts as PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'logevent',
'properties.bootstrap.servers' = 'b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092 (http://b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092/)',
'properties.group.id' = 'testGroup-01',
'scan.startup.mode'='latest-offset',
'format' = 'json'
);
Flink 離線維度表與流式實(shí)時(shí)表具體 join 方法如下:
select a.`timestamp`,a.`system`,a.actor,a.action,b.c_login from
(select *, proctime() as proctime from user_logevent_source) as a
left join customer FOR SYSTEM_TIME AS OF a.proctime as b on a.actor=b.c_last_name;
如上代碼示例,在事實(shí)表 logevent_source join lookup 維度表時(shí),通過 proctime 函數(shù)獲取到維度表的瞬時(shí)最新的版本數(shù)據(jù),保障 join 時(shí)的一致性和實(shí)時(shí)性
同時(shí),該維度表數(shù)據(jù)已經(jīng)在 alluxio cache,因此讀取時(shí)性能遠(yuǎn)高于離線讀取 s3上的表數(shù)據(jù)
通過 hive 切換 S3和 alluxio 路徑的 customer 信息 維度表,對比測試 flink join 可以看出 alluxio 緩存后性能明顯優(yōu)勢
通過 alter table 方便切換本地和 cache 的 location路徑:
alter table customer set location "s3://xxxxxx/data/s3/30/customer";
alter table customer set location "alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/30/customer";
選取某一 split 數(shù)據(jù)分片的 TaskManager 日志:
- cache 前(S3路徑讀取): 5s 加載
2022-06-29 02:54:34,791 INFO com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem [] - Opening 's3://salunchbucket/data/s3/30/customer/data-m-00029' for reading
2022-06-29 02:54:39,971 INFO org.apache.flink.table.filesystem.FileSystemLookupFunction [] - Loaded 433000 row(s) into lookup join cache
- cache 后(alluxio 讀取): 2s 加載
2022-06-29 03:25:14,476 INFO com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem [] - Opening 's3://salunchbucket/data/s3/30/customer/data-m-00029' for reading
2022-06-29 03:25:16,397 INFO org.apache.flink.table.filesystem.FileSystemLookupFunction [] - Loaded 433000 row(s) into lookup join cache
在 JobManager 上查看 Timeline,對比 alluxio 和 s3路徑下 job 的執(zhí)行時(shí)間可以看到更加清楚
?
可以看到, 單個(gè) task 查詢提升1倍以上,整體 job 性能提升更加明顯
其他需要考慮的問題
持續(xù) Join 每次都需要拉取維度數(shù)據(jù)做 join,F(xiàn)link 的 checkpoint state 是否一直膨脹導(dǎo)致 TM 的 RockDB 撐爆或者內(nèi)存溢出?
state 自帶有 ttl 機(jī)制,可以設(shè)置 ttl 過期策略,觸發(fā) Flink 清理過期 state 數(shù)據(jù),F(xiàn)link Sql 可以通過 Hint 方式設(shè)置
insert into logevent_sink
select a.`timestamp`,a.`system`,a.actor,a.action,b.c_login from
(select *, proctime() as proctime from logevent_source) as a
left join
customer/*+ OPTIONS('lookup.join.cache.ttl' = '5 min')*/ FOR SYSTEM_TIME AS OF a.proctime as b
on a.actor=b.c_last_name;
Flink Table/Streaming API 類似:
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter()
.build();
ValueStateDescriptor<Long> lastUserLogin =
new ValueStateDescriptor<>("lastUserLogin", Long.class);
lastUserLogin.enableTimeToLive(ttlConfig);
StreamTableEnvironment.getConfig().setIdleStateRetentionTime(min, max);
設(shè)置后重新啟動 lookup join,從 Flink TM 日志中可以看到,ttl 到期后,會觸發(fā)清理并重新拉取 hive 維表數(shù)據(jù):
2022-06-29 04:17:09,161 INFO org.apache.flink.table.filesystem.FileSystemLookupFunction
[] - Lookup join cache has expired after 5 minute(s), reloading
此外,可以通過配置 flink state retain,減少 checkpoint 時(shí)候快照數(shù)量,從而減少快照時(shí)候 state 的占用空間
Flink job中配置:
-D state.checkpoints.num-retained=5
設(shè)置后,可以看到 s3 checkpoint 路徑上,F(xiàn)link Job 會自動清理歷史快照,只保留最近的5次快照數(shù)據(jù),從而確保 checkpoint 快照數(shù)據(jù)不會堆積
[hadoop@ip-172-31-41-131 ~]$ aws s3 ls s3://salunchbucket/data/checkpoints/7b9f2f9becbf3c879cd1e5f38c6239f8/
PRE chk-3/
PRE chk-4/
PRE chk-5/
PRE chk-6/
PRE chk-7/
附錄
Alluxio整體架構(gòu)
Alluxio on EMR 快速部署
在 Amazon EMR 中利用 Alluxio 的分層存儲架構(gòu)
EMR Alluxio集成detail
Flink Temporal Join 詳細(xì)
本篇作者
?
唐清原
Amazon 數(shù)據(jù)分析解決方案架構(gòu)師,負(fù)責(zé) Amazon Data Analytic 服務(wù)方案架構(gòu)設(shè)計(jì)以及性能優(yōu)化,遷移,治理等 Deep Dive 支持。10+數(shù)據(jù)領(lǐng)域研發(fā)及架構(gòu)設(shè)計(jì)經(jīng)驗(yàn),歷任 Oracle 高級咨詢顧問,咪咕文化數(shù)據(jù)集市高級架構(gòu)師,澳新銀行數(shù)據(jù)分析領(lǐng)域架構(gòu)師職務(wù)。在大數(shù)據(jù),數(shù)據(jù)湖,智能湖倉,及相關(guān)推薦系統(tǒng) /MLOps 平臺等項(xiàng)目有豐富實(shí)戰(zhàn)經(jīng)驗(yàn)
陳昊
Amazon 合作伙伴解決方案架構(gòu)師,有將近 20 年的 IT 從業(yè)經(jīng)驗(yàn),在企業(yè)應(yīng)用開發(fā)、架構(gòu)設(shè)計(jì)及建設(shè)方面具有豐富的實(shí)踐經(jīng)驗(yàn)。目前主要負(fù)責(zé) Amazon (中國)合作伙伴的方案架構(gòu)咨詢和設(shè)計(jì)工作,致力于 Amazon 云服務(wù)在國內(nèi)的應(yīng)用推廣以及幫助合作伙伴構(gòu)建更高效的 Amazon 云服務(wù)解決方案。
文章來源:https://dev.amazoncloud.cn/column/article/6309af45d4155422a4610a40?sc_channel=CSDN
?文章來源地址http://www.zghlxwxcb.cn/news/detail-407258.html
?文章來源:http://www.zghlxwxcb.cn/news/detail-407258.html
?
?
到了這里,關(guān)于使用 Alluxio 優(yōu)化 EMR 上 Flink Join的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!