一、背景
在大數(shù)據(jù)領(lǐng)域,業(yè)務(wù)數(shù)據(jù)通常最初存儲(chǔ)在關(guān)系型數(shù)據(jù)庫(kù),例如MySQL。然而,為了滿足日常分析和報(bào)表等需求,大數(shù)據(jù)平臺(tái)會(huì)采用多種不同的存儲(chǔ)方式來容納這些業(yè)務(wù)數(shù)據(jù)。這些存儲(chǔ)方式包括離線倉(cāng)庫(kù)、實(shí)時(shí)倉(cāng)庫(kù)等,根據(jù)不同的業(yè)務(wù)需求和數(shù)據(jù)特性進(jìn)行選擇。
舉例來說,假設(shè)業(yè)務(wù)部門需要在大數(shù)據(jù)平臺(tái)中查看歷史某一天的表數(shù)據(jù),如下:
- [Mysql] 業(yè)務(wù)數(shù)據(jù) - 用戶表全量數(shù)據(jù):
id | name | phone | gender | create_time | update_time |
---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
- [Mysql] 2023-06-02 業(yè)務(wù)數(shù)據(jù)新增了一名用戶,且更改了tom的手機(jī)號(hào),此時(shí)表數(shù)據(jù)如下:
id | name | phone | gender | create_time | update_time |
---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 |
加粗為更新/新增數(shù)據(jù)
- [大數(shù)據(jù)平臺(tái)] 2023-06-02日業(yè)務(wù)人員在大數(shù)據(jù)平臺(tái)中查看用戶表實(shí)時(shí)數(shù)據(jù),期望數(shù)據(jù)和Mysql業(yè)務(wù)數(shù)據(jù)一致,如下:
id | name | phone | gender | create_time | update_time |
---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 |
- [大數(shù)據(jù)平臺(tái)] 2023-06-03 日業(yè)務(wù)人員在大數(shù)據(jù)平臺(tái)中查看2023-06-02日用戶表的歷史數(shù)據(jù),期望數(shù)據(jù)如下:
id | name | phone | gender | create_time | update_time |
---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
根據(jù)以上需求,業(yè)務(wù)人員希望既能夠查看當(dāng)天的實(shí)時(shí)數(shù)據(jù),又希望查看以天為粒度的歷史數(shù)據(jù)。這類需求比較常見,通??梢圆捎脙煞N解決方式:
- Lambda架構(gòu)
- 實(shí)時(shí)同步 + 拉鏈表架構(gòu)
二、Lambda架構(gòu)
實(shí)時(shí)領(lǐng)域的Lambda架構(gòu)是一種大數(shù)據(jù)架構(gòu)模式,旨在處理實(shí)時(shí)數(shù)據(jù)流和歷史數(shù)據(jù)批處理,以滿足同時(shí)滿足實(shí)時(shí)查詢和歷史數(shù)據(jù)分析的需求。Lambda架構(gòu)的核心思想是將數(shù)據(jù)分成兩個(gè)獨(dú)立的流程:實(shí)時(shí)流程和批處理流程,并在最終層將它們合并,以提供一致的查詢結(jié)果,如下:
-
實(shí)時(shí)流程(Real-time Layer):實(shí)時(shí)流程負(fù)責(zé)處理實(shí)時(shí)產(chǎn)生的數(shù)據(jù)流。它通常包括以下關(guān)鍵組件:
- 數(shù)據(jù)源:實(shí)時(shí)數(shù)據(jù)源,如binlog日志等。
- 實(shí)時(shí)引擎:用于實(shí)時(shí)數(shù)據(jù)的處理和轉(zhuǎn)換,例如Apache Kafka、Apache Flink等。
- 存儲(chǔ)層:用于存儲(chǔ)實(shí)時(shí)數(shù)據(jù),特點(diǎn)是插入快,支持OLAP查詢。
-
離線處理流程(Batch Layer):離線處理流程用于處理歷史數(shù)據(jù),通常以 T+1 凌晨跑批方式運(yùn)行,主要包括以下組件:
- 數(shù)據(jù)倉(cāng)庫(kù):批處理數(shù)據(jù)存儲(chǔ),通常使用分布式數(shù)據(jù)倉(cāng)庫(kù),如Apache Hadoop HDFS、Apache Hive等。
- 批處理作業(yè):用于處理歷史數(shù)據(jù)的定期批處理作業(yè),例如數(shù)據(jù)清洗、轉(zhuǎn)換和聚合。
-
合并層(Serving Layer):合并層負(fù)責(zé)將實(shí)時(shí)和歷史數(shù)據(jù)合并以提供一致的查詢接口:
- 數(shù)據(jù)服務(wù):根據(jù)用戶查詢內(nèi)容選擇性調(diào)用不同存儲(chǔ)服務(wù),用于將實(shí)時(shí)數(shù)據(jù)和批處理數(shù)據(jù)合并以生成一致的視圖。
-
Lambda架構(gòu)的主要優(yōu)點(diǎn)包括:
-
實(shí)時(shí)性:能夠提供近實(shí)時(shí)的數(shù)據(jù)處理和反饋,適用于需要快速?zèng)Q策和實(shí)時(shí)監(jiān)控的場(chǎng)景。
-
容錯(cuò)性:通過將數(shù)據(jù)存儲(chǔ)在持久性存儲(chǔ)中,保證了數(shù)據(jù)的可靠性和可恢復(fù)性。
-
靈活性:可以應(yīng)對(duì)多種不同的數(shù)據(jù)類型和查詢需求,適用于各種大數(shù)據(jù)應(yīng)用。
-
注意:盡管Lambda架構(gòu)可以滿足業(yè)務(wù)人員查看用戶的實(shí)時(shí)或歷史數(shù)據(jù)的需求,但離線數(shù)據(jù)倉(cāng)庫(kù)通常采用T+1批處理方式運(yùn)行,因此在需要高度一致性的場(chǎng)景下會(huì)出現(xiàn)數(shù)據(jù)不一致問題。故本文未采用Lambda架構(gòu);
若想詳細(xì)了解一致性問題的情況,請(qǐng)參考筆者另一篇文章:深入數(shù)倉(cāng)離線數(shù)據(jù)同步:?jiǎn)栴}分析與優(yōu)化措施
三、實(shí)時(shí)同步+拉鏈表架構(gòu)
為了滿足業(yè)務(wù)人員對(duì)實(shí)時(shí)或歷史數(shù)據(jù)的高度一致性需求,并且為了簡(jiǎn)化架構(gòu),這里采用了實(shí)時(shí)+拉鏈表的技術(shù)方案。在這個(gè)架構(gòu)中,只使用了一種計(jì)算引擎,具體的技術(shù)組件為 Flink-cdc-2.x + Doris。以下是我們架構(gòu)的設(shè)計(jì)概述:
此架構(gòu)的關(guān)鍵在于實(shí)時(shí)同步邏輯及拉鏈表設(shè)計(jì)這兩塊的實(shí)現(xiàn)。
3.1、拉鏈表設(shè)計(jì)
拉鏈表是一種維護(hù)歷史狀態(tài)以及最新狀態(tài)數(shù)據(jù)的表,與快照表類似,算是在快照表的基礎(chǔ)上去除了重復(fù)狀態(tài)的數(shù)據(jù);使用拉鏈表在更新頻率和比例不是很大的情況下會(huì)十分節(jié)省存儲(chǔ)。
3.1.1、示例
- 我們以背景需求為例,[Mysql]業(yè)務(wù)數(shù)據(jù)用戶表如下:
id | name | phone | gender | create_time | update_time |
---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
- [Doris]此時(shí)實(shí)時(shí)同步到Doris的拉鏈表數(shù)據(jù)為:
id | name | phone | gender | create_time | update_time | expire | start date | end date |
---|---|---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 9999-12-31 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 9999-12-31 |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 9999-12-31 |
可以看到拉鏈表多了expire,start date,end date 三個(gè)字段,用于表示該條數(shù)據(jù)是否過期、開始時(shí)間及有效時(shí)間,下面會(huì)有說明
- [Mysql] 2023-06-02 業(yè)務(wù)數(shù)據(jù)新增了一名用戶,更新了tom的手機(jī)號(hào),此時(shí)表數(shù)據(jù)如下:
id | name | phone | gender | create_time | update_time |
---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 |
加粗為更改/新增數(shù)據(jù)
- [Doris]此時(shí)實(shí)時(shí)同步到Doris的拉鏈表數(shù)據(jù)為:
id | name | phone | gender | create_time | update_time | expire | start_date | end_date | 備注 |
---|---|---|---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 9999-12-31 | |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 9999-12-31 | |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-01 | (由9999-12-31改為2023-06-01) |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | 0 | 2023-06-02 | 9999-12-31 | (新增一條拉鏈數(shù)據(jù)) |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 | 0 | 2023-06-02 | 9999-12-31 | (新增一條最新用戶數(shù)據(jù)) |
由于tom的手機(jī)號(hào)被修改,根據(jù)拉鏈表特性此時(shí)會(huì)新增一條最新的tom數(shù)據(jù),且過期時(shí)間為9999-12-31,舊數(shù)據(jù)不會(huì)刪除而是將有效時(shí)間end date改為2023-06-01
- [Mysql] 2023-06-03 當(dāng)天多次更新業(yè)務(wù)數(shù)據(jù)jason用戶的手機(jī)號(hào),sql及表數(shù)據(jù)如下:
UPDATE `user` SET `phone`='333', `update_time`='2023-06-03 10:00:00' WHERE `id`=2;
UPDATE `user` SET `phone`='444', `update_time`='2023-06-03 12:00:00' WHERE `id`=2;
UPDATE `user` SET `phone`='555', `update_time`='2023-06-03 14:00:00' WHERE `id`=2;
id | name | phone | gender | create_time | update_time | 備注 |
---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | |
2 | jason | 555 | 男 | 2023-06-01 13:00:00 | 2023-06-03 14:00:00 | (jason手機(jī)號(hào)從222 -> 333 -> 444 -> 555更改了三次) |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 |
- [Doris]此時(shí)實(shí)時(shí)同步到Doris的拉鏈表數(shù)據(jù)為:
id | name | phone | gender | create_time | update_time | expire | start_date | end_date | 備注 |
---|---|---|---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 9999-12-31 | |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-02 | (由9999-12-31改為2023-06-02) |
2 | jason | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-03 10:00:00 | 1 | 2023-06-03 | 2023-06-03 | (當(dāng)天更新多次的過期數(shù)據(jù)) |
2 | jason | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-03 12:00:00 | 1 | 2023-06-03 | 2023-06-03 | (當(dāng)天更新多次的過期數(shù)據(jù)) |
2 | jason | 555 | 男 | 2023-06-01 13:00:00 | 2023-06-03 14:00:00 | 0 | 2023-06-03 | 9999-12-31 | (新增一條拉鏈數(shù)據(jù)) |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-01 | |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | 0 | 2023-06-02 | 9999-12-31 | |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 | 0 | 2023-06-02 | 9999-12-31 |
使用
expire
字段來表示記錄是否過期,下面會(huì)說明
- 說明
-
start_date表示該條記錄的生命周期開始時(shí)間【第一次全量同步時(shí)為系統(tǒng)時(shí)間,增量同步時(shí)為update_time時(shí)間】,end_date表示該條記錄的生命周期結(jié)束時(shí)間
-
end_date = '9999-12-31’表示該條記錄為最新數(shù)據(jù)
-
end_date = '2023-06-02’表示該條記錄僅在2023-06-02當(dāng)日有效
-
expire字段用于標(biāo)識(shí)記錄的狀態(tài),1表示記錄已過期,0表示記錄有效。該字段目的是用于過濾那些在一天之內(nèi)多次更新的數(shù)據(jù)
-
如果查詢當(dāng)前的最新記錄,sql為:select * from user where end_date = ‘9999-12-31’
-
如果查詢2023-06-02的歷史快照,sql為:select * from user where start_date <= ‘2023-06-02’ and end_date >= ‘2023-06-02’ and expire = 0(此處是拉鏈表比較重要的一塊)
-
解釋上一條sql:需求是要查2023-06-02的歷史快照,故start_date <= ‘2023-06-02’;而end_date = '2023-06-02’表示該條記錄在2023-06-02當(dāng)日是有效的,又因?yàn)閑nd_date = '9999-12-31’表示目前一直處于有效狀態(tài)【有可能從2023-06-02到目前一直有效的數(shù)據(jù)】,所以end_date >= ‘2023-06-02’
-
示例:查詢2023-06-01日歷史數(shù)據(jù):select * from user where start_date <= ‘2023-06-01’ and end_date >= ‘2023-06-01’ and expore = 0
id | name | phone | gender | create_time | update_time | start date | end date |
---|---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 2023-06-01 | 9999-12-31 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 2023-06-01 | 2023-06-02 |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 2023-06-01 | 2023-06-01 |
- 示例:查詢2023-06-02日歷史數(shù)據(jù):select * from user where start_date <= ‘2023-06-02’ and end_date >= ‘2023-06-02’ and expore = 0
id | name | phone | gender | create_time | update_time | start date | end date |
---|---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 2023-06-01 | 9999-12-31 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 2023-06-01 | 2023-06-02 |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | 2023-06-02 | 9999-12-31 |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 | 2023-06-02 | 9999-12-31 |
- 示例:查詢最新實(shí)時(shí)數(shù)據(jù):select * from user where end_date = ‘9999-12-31’
id | name | phone | gender | create_time | update_time | start date | end date |
---|---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 2023-06-01 | 9999-12-31 |
2 | jason | 555 | 男 | 2023-06-01 13:00:00 | 2023-06-03 14:00:00 | 2023-06-03 | 9999-12-31 |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | 2023-06-02 | 9999-12-31 |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 | 2023-06-02 | 9999-12-31 |
3.1.2、建表設(shè)計(jì)
在Doris中的表設(shè)計(jì)中,采用了Unique數(shù)據(jù)模型,這個(gè)決策的背后有一個(gè)關(guān)鍵因素,即利用唯一Key來處理Flink作業(yè)崩潰和重新啟動(dòng)時(shí)的數(shù)據(jù)覆蓋操作,以及通過下游的冪等性來確保端到端的數(shù)據(jù)一致性。
唯一Key的選擇在這里起到了至關(guān)重要的作用。在拉鏈表中,由于用戶ID可能重復(fù)出現(xiàn)的情況【例如2023-06-02號(hào)tom就有兩條數(shù)據(jù)】,故選擇了一個(gè)組合Key: UNIQUE KEY(id
, update_time
) 來確保數(shù)據(jù)的唯一性。這種設(shè)計(jì)使得無論在什么情況下,我們都能夠通過這個(gè)唯一Key來維護(hù)數(shù)據(jù)的一致性,即使在處理實(shí)時(shí)數(shù)據(jù)時(shí)發(fā)生了異常情況或重新啟動(dòng)作業(yè)時(shí)也不會(huì)出現(xiàn)問題。
- 以上述為例建表語句如下:
CREATE TABLE IF NOT EXISTS example_user_zip
(
`id` LARGEINT NOT NULL COMMENT "用戶id",
`update_time` DATETIME COMMENT "用戶更新時(shí)間",
`create_time` DATETIME COMMENT "用戶注冊(cè)時(shí)間",
`name` VARCHAR(50) NOT NULL COMMENT "用戶昵稱",
`phone` LARGEINT COMMENT "手機(jī)號(hào)",
`gender` VARCHAR(5) COMMENT "用戶性別",
`expire` TINYINT DEFAULT '0' COMMENT "數(shù)據(jù)是否過期:0為有效,1為過期",
`start_date` DATE COMMENT "開始時(shí)間",
`end_date` DATE COMMENT "有效時(shí)間"
)
UNIQUE KEY(`id`, `update_time`) -- UNIQUE模型
COMMENT "用戶拉鏈表"
DISTRIBUTED BY HASH(`id`) BUCKETS 32
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
關(guān)于doris更多數(shù)據(jù)模型可參考官網(wǎng)
3.2、實(shí)時(shí)同步邏輯【重要】
為了更清晰地解釋拉鏈表的同步邏輯,我將以場(chǎng)景的方式逐步說明,如下:
-
全量更新
-
增量更新
- 新增數(shù)據(jù)
- 跨天更新數(shù)據(jù)
- 某條數(shù)據(jù)當(dāng)天多次更新
- 刪除數(shù)據(jù)
-
并發(fā)更新
3.2.1、全量更新
-
需先明確一點(diǎn):拉鏈表的歷史數(shù)據(jù)查詢范圍是從實(shí)時(shí)任務(wù)同步的那天開始,因?yàn)橹挥性趯?shí)時(shí)任務(wù)開始同步的那一天之后,拉鏈表才正式形成,之前的歷史數(shù)據(jù)是不可查詢的。因此,當(dāng)進(jìn)行第一次全量同步時(shí),我們會(huì)將
start_date
設(shè)置為當(dāng)前系統(tǒng)日期。 -
另外,由于實(shí)時(shí)拉鏈表同步需要明確區(qū)分全量和增量更新,以及后續(xù)對(duì) binlog 數(shù)據(jù)進(jìn)行解析及判斷增量更新操作類型,因此,F(xiàn)link CDC SQL 方式的表建立不再滿足我們的要求。為了更好地實(shí)現(xiàn)這一功能,我們需要采用 API 方式來構(gòu)建解決方案,代碼如下:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // 設(shè)置捕獲的數(shù)據(jù)庫(kù), 如果需要同步整個(gè)數(shù)據(jù)庫(kù),請(qǐng)將 tableList 設(shè)置為 ".*".
.tableList("yourDatabaseName.yourTableName") // 設(shè)置捕獲的表
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // 將 SourceRecord 轉(zhuǎn)換為 JSON 字符串
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 設(shè)置 3s 的 checkpoint 間隔
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// 設(shè)置 source 節(jié)點(diǎn)的并行度為 4
.setParallelism(4)
.print().setParallelism(1); // 設(shè)置 sink 節(jié)點(diǎn)并行度為 1
env.execute("Print MySQL Snapshot + Binlog");
}
}
代碼摘自mysql-cdc-connector官網(wǎng)示例
- 這里我們?nèi)砸?023-06-01的[Mysql]業(yè)務(wù)數(shù)據(jù)為例:
id | name | phone | gender | create_time | update_time |
---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
- 此時(shí)Flink應(yīng)用獲取到的數(shù)據(jù)如下:僅展示一條
{
"before": null,
"after": { # 實(shí)際數(shù)據(jù)
"id": 1,
"name": "jack",
"phone": "111",
"gender": "男",
"create_time": "2023-06-01T05:00:00Z", # 該日期是UTC時(shí)間,只需增加8小時(shí)即可轉(zhuǎn)化為北京時(shí)間
"update_time": "2023-06-01T05:00:00Z" # 該日期是UTC時(shí)間,只需增加8小時(shí)即可轉(zhuǎn)化為北京時(shí)間
},
"source": { # 元數(shù)據(jù)
"version": "1.6.4.Final",
"connector": "mysql",
"name": "mysql_binlog_source",
"ts_ms": 0,
"snapshot": "false",
"db": "yushu_dds",
"sequence": null,
"table": "user",
"server_id": 0,
"gtid": null,
"file": "",
"pos": 0,
"row": 0,
"thread": null,
"query": null
},
"op": "r", # 記錄每條數(shù)據(jù)的操作類型[重要]
"ts_ms": 1705471382867,
"transaction": null
}
- 在我們使用 Flink CDC MySQL 同步數(shù)據(jù)時(shí),默認(rèn)采用
initial
模式,這意味著首先進(jìn)行全量同步,然后再進(jìn)行增量同步。因此,在區(qū)分全量和增量同步時(shí),關(guān)鍵在于觀察獲取到的數(shù)據(jù)中的op
字段。op
字段是用來記錄每條數(shù)據(jù)的操作類型的標(biāo)志。具體的操作類型如下:-
op=d
代表刪除操作 -
op=u
代表更新操作 -
op=c
代表新增操作 -
op=r
代表全量讀取,而不是來自 binlog 的增量讀取
-
- 在 Flink 程序中,只需要通過
op=r
即可篩選出全量數(shù)據(jù)。在全量數(shù)據(jù)同步階段,Doris 拉鏈表的start_date
字段設(shè)置為系統(tǒng)當(dāng)前日期,而end_date
字段則設(shè)置為 ‘9999-12-31’。導(dǎo)入語句如下:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES
(1, '2023-06-01 13:00:00', '2023-06-01 13:00:00', 'jack', 111, '男', 0, '2023-06-01', '9999-12-31'),
(2, '2023-06-01 13:00:00', '2023-06-01 13:00:00', 'jason', 222, '男', 0, '2023-06-01', '9999-12-31'),
(3, '2023-06-01 13:00:00', '2023-06-01 13:00:00', 'tom', 333, '男', 0, '2023-06-01', '9999-12-31');
- 此時(shí)doris拉鏈表數(shù)據(jù)如下所示:
id | name | phone | gender | create_time | update_time | expire | start_date | end date |
---|---|---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 9999-12-31 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 9999-12-31 |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 9999-12-31 |
3.2.2、增量更新
當(dāng)全量更新結(jié)束后即為增量更新,請(qǐng)注意以下內(nèi)容:
- 在增量更新時(shí),Doris 拉鏈表中的
start_date
字段【即開始時(shí)間】不再使用系統(tǒng)時(shí)間,而是業(yè)務(wù)數(shù)據(jù)的update_time
截取后的日期。例如,update_time
為 “2023-06-02 13:00:00”,則對(duì)應(yīng)的start_date
為 “2023-06-02”。這么做的目的是為了確保使用事件時(shí)間來劃分?jǐn)?shù)據(jù)的開始時(shí)間,而不是系統(tǒng)時(shí)間。 - 舉例來說,如果采用系統(tǒng)時(shí)間,假設(shè)實(shí)時(shí)同步任務(wù)某一天宕機(jī)并且沒有重啟,等到隔天再重啟,那么
start_date
就會(huì)變成隔天日期,從而導(dǎo)致昨天的數(shù)據(jù)丟失。 - 為什么不使用業(yè)務(wù)數(shù)據(jù)的
create_time
作為拉鏈表的start_date
呢?這是因?yàn)樵跇I(yè)務(wù)數(shù)據(jù)更改時(shí),通常只會(huì)更新update_time
。例如,2023-06-02 日更新了 “Tom” 的手機(jī)號(hào)碼,此時(shí)同步到 Doris 新增的拉鏈數(shù)據(jù)如果使用create_time
,那么start_date
仍然會(huì)是 “2023-06-01”,而實(shí)際上該條數(shù)據(jù)應(yīng)該從 “2023-06-02” 日開始生效。因此,使用update_time
更加合理,確保拉鏈表中的數(shù)據(jù)始終按照業(yè)務(wù)數(shù)據(jù)的更新時(shí)間來進(jìn)行正確的版本管理。
接下來,我們將逐一講解以下四個(gè)場(chǎng)景:新增更新、跨天更新、某條數(shù)據(jù)當(dāng)天多次更新以及刪除更新。
3.2.2.1、新增更新
- 我們?nèi)砸宰畛醯氖纠秊槔篬Mysql] 2023-06-02 業(yè)務(wù)數(shù)據(jù)新增了一名用戶,此時(shí)表數(shù)據(jù)如下:
id | name | phone | gender | create_time | update_time |
---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 |
加粗為更新/新增數(shù)據(jù)
- 此時(shí)Flink應(yīng)用獲取到的數(shù)據(jù)如下所示:
{
"before": null,
"after": {
"id": 4,
"name": "tony",
"phone": "555",
"gender": "男",
"create_time": "2023-06-02T02:00:00Z",
"update_time": "2023-06-02T02:00:00Z"
},
"source": {
# 此處元數(shù)據(jù)省略
},
"op": "c",
"ts_ms": 1705477497504,
"transaction": null
}
- 可以看到
op=c
代表新增操作,對(duì)于新增操作doris拉鏈表的start_end為業(yè)務(wù)數(shù)據(jù)的update_time,而end_date均設(shè)置為9999-12-31,導(dǎo)入語句如下:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES
(4, '2023-06-02 10:00:00', '2023-06-02 10:00:00', 'tony', 555, '男', 0, '2023-06-02', '9999-12-31');
- 此時(shí)doris拉鏈表內(nèi)容如下所示:
id | name | phone | gender | create_time | update_time | expire | start_date | end_date |
---|---|---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 9999-12-31 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 9999-12-31 |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 9999-12-31 |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 | 0 | 2023-06-02 | 9999-12-31 |
加粗表示更新/新增數(shù)據(jù)
3.2.2.2、跨天更新
首先,解釋一下為何要需要區(qū)分兩種不同的更新場(chǎng)景:跨天更新和當(dāng)天多次更新。這涉及到拉鏈表的歷史數(shù)據(jù)粒度,拉鏈表通常以天為單位。如果一條數(shù)據(jù)在同一天內(nèi)多次更新,那么每次更新后的數(shù)據(jù)的生存時(shí)間將只有幾小時(shí)甚至幾分鐘。在這種情況下,我們希望在拉鏈表中將這種多次更新的臨時(shí)數(shù)據(jù)設(shè)為過期數(shù)據(jù);細(xì)節(jié)在后續(xù)會(huì)有講解,先來解釋跨天更新場(chǎng)景。
- 我們?nèi)砸宰畛醯氖纠秊槔篬Mysql] 2023-06-02 業(yè)務(wù)數(shù)據(jù)更新了tom的手機(jī)號(hào),此時(shí)表數(shù)據(jù)如下:
id | name | phone | gender | create_time | update_time |
---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 |
加粗為更新/新增數(shù)據(jù)
- 此時(shí)Flink應(yīng)用獲取到的數(shù)據(jù)如下所示:
{
"before": { # 更新前的數(shù)據(jù)
"id": 3,
"name": "tom",
"phone": "333",
"gender": "男",
"create_time": "2023-06-01T05:00:00Z",
"update_time": "2023-06-01T05:00:00Z"
},
"after": { # 更新后的數(shù)據(jù)
"id": 3,
"name": "tom",
"phone": "444", # 手機(jī)號(hào)更新
"gender": "男",
"create_time": "2023-06-01T05:00:00Z",
"update_time": "2023-06-02T01:00:00Z" # 更新時(shí)間更新
},
"source": {
# 此處元數(shù)據(jù)省略
},
"op": "u",
"ts_ms": 1705479637926,
"transaction": null
}
- 當(dāng)我們?cè)贔link應(yīng)用中遇到
op=u
(代表更新操作)時(shí),首先需要檢查before
和after
字段中的update_time
是否跨越了天粒度??赡芸缭揭惶?,也可能跨越多天,我們將在Doris拉鏈表中執(zhí)行兩條SQL語句:一條更新語句和一條插入語句。- 對(duì)于更新語句,我們將更新拉鏈表中舊數(shù)據(jù)id的
end_date
字段,將其設(shè)置為after
字段中update_time
的前一天2023-06-01
。 - 對(duì)于插入語句,我們將插入
after
字段中的新數(shù)據(jù),將start_date
設(shè)置為update_time
的日期,end_date
設(shè)置9999-12-31
),以確保該數(shù)據(jù)在拉鏈表中一直有效。 - sql如下所示:
- 對(duì)于更新語句,我們將更新拉鏈表中舊數(shù)據(jù)id的
-- 更新語句:
UPDATE example_user_zip SET end_date = '2023-06-01' WHERE `id`=3 AND `update_time`='2023-06-01 13:00:00';
-- 插入語句:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES
(3, '2023-06-02 09:00:00', '2023-06-01 13:00:00', 'tom', 444, '男', 0, '2023-06-02', '9999-12-31');
- 此時(shí)doris拉鏈表內(nèi)容如下所示:
id | name | phone | gender | create_time | update_time | expire | start_date | end_date |
---|---|---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 9999-12-31 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 9999-12-31 |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-01 |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | 0 | 2023-06-02 | 9999-12-31 |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 | 0 | 2023-06-02 | 9999-12-31 |
加粗表示更新/新增數(shù)據(jù)
- 此時(shí)若要查看2023-06-01歷史數(shù)據(jù)只需執(zhí)行:
SELECT * FROM example_user_zip WHERE start_date <= '2023-06-01' AND end_date >= '2023-06-01' AND expire = 0;
id | name | phone | gender | create_time | update_time | expire | start date | end date |
---|---|---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 9999-12-31 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 9999-12-31 |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-01 |
3.2.2.3、某條數(shù)據(jù)當(dāng)天多次更新
在我們的拉鏈表中,數(shù)據(jù)的粒度是以天為單位。如果一條數(shù)據(jù)在同一天內(nèi)多次更新,我們的處理策略是取最后一次更新為有效數(shù)據(jù),而將之前的更新標(biāo)記為過期數(shù)據(jù)。為了標(biāo)記數(shù)據(jù)是否過期,我們會(huì)將過期數(shù)據(jù)的expire
字段設(shè)置為1。
- 我們?nèi)砸宰畛醯氖纠秊槔篬Mysql] 2023-06-03 當(dāng)天多次更新業(yè)務(wù)數(shù)據(jù)jason用戶的手機(jī)號(hào),sql及表數(shù)據(jù)如下:
UPDATE `user` SET `phone`='333', `update_time`='2023-06-03 10:00:00' WHERE `id`=2;
UPDATE `user` SET `phone`='444', `update_time`='2023-06-03 12:00:00' WHERE `id`=2;
UPDATE `user` SET `phone`='555', `update_time`='2023-06-03 14:00:00' WHERE `id`=2;
id | name | phone | gender | create_time | update_time | 備注 |
---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | |
2 | jason | 555 | 男 | 2023-06-01 13:00:00 | 2023-06-03 14:00:00 | (jason手機(jī)號(hào)從222 -> 333 -> 444 -> 555更改了三次) |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 |
- 此時(shí)Flink應(yīng)用獲取到的數(shù)據(jù)如下所示:
{
"before": {
"id": 2,
"name": "jason",
"phone": "222",
"gender": "男",
"create_time": "2023-06-01T05:00:00Z",
"update_time": "2023-06-01T05:00:00Z"
},
"after": {
"id": 2,
"name": "jason",
"phone": "333",
"gender": "男",
"create_time": "2023-06-01T05:00:00Z",
"update_time": "2023-06-03T02:00:00Z"
},
"source": {
# 元數(shù)據(jù)忽略
},
"op": "u",
"ts_ms": 1705548298335,
"transaction": null
},
{
"before": {
"id": 2,
"name": "jason",
"phone": "333",
"gender": "男",
"create_time": "2023-06-01T05:00:00Z",
"update_time": "2023-06-03T02:00:00Z"
},
"after": {
"id": 2,
"name": "jason",
"phone": "444",
"gender": "男",
"create_time": "2023-06-01T05:00:00Z",
"update_time": "2023-06-03T04:00:00Z"
},
"source": {
# 元數(shù)據(jù)忽略
},
"op": "u",
"ts_ms": 1705548298392,
"transaction": null
},
{
"before": {
"id": 2,
"name": "jason",
"phone": "444",
"gender": "男",
"create_time": "2023-06-01T05:00:00Z",
"update_time": "2023-06-03T04:00:00Z"
},
"after": {
"id": 2,
"name": "jason",
"phone": "555",
"gender": "男",
"create_time": "2023-06-01T05:00:00Z",
"update_time": "2023-06-03T06:00:00Z"
},
"source": {
# 元數(shù)據(jù)忽略
},
"op": "u",
"ts_ms": 1705548298484,
"transaction": null
}
-
當(dāng)我們?cè)贔link應(yīng)用中遇到
op=u
(代表更新操作),且檢查before
和after
字段中的update_time
屬于同一天,我們將在Doris拉鏈表中執(zhí)行兩條SQL語句:一條更新語句和一條插入語句。- 對(duì)于更新語句,我們將更新拉鏈表中舊數(shù)據(jù)id的
expire
字段設(shè)置為1,將其設(shè)置為end_date
字段值設(shè)置為update_time
的當(dāng)天日期2023-06-03
。 - 對(duì)于插入語句,我們將插入
after
字段中的新數(shù)據(jù),將start_date
設(shè)置為update_time
的當(dāng)天日期,end_date
設(shè)置9999-12-31
),以確保該數(shù)據(jù)在拉鏈表中一直有效。 - sql如下所示:
-- 222 -> 333 跨天更新語句: UPDATE example_user_zip SET end_date = '2023-06-02' WHERE `id`=2 AND `update_time`='2023-06-01 13:00:00'; -- 222 -> 333 跨天插入語句: INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date) VALUES (2, '2023-06-03 10:00:00', '2023-06-01 13:00:00', 'jason', 333, '男', 0, '2023-06-03', '9999-12-31'); -- 333 -> 444 同一天更新語句: UPDATE example_user_zip SET expire = 1, end_date = '2023-06-03' WHERE `id`=2 AND `update_time`='2023-06-03 10:00:00'; -- 333 -> 444 同一天插入語句: INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date) VALUES (2, '2023-06-03 12:00:00', '2023-06-01 13:00:00', 'jason', 444, '男', 0, '2023-06-03', '9999-12-31'); -- 444 -> 555 同一天更新語句: UPDATE example_user_zip SET expire = 1, end_date = '2023-06-03' WHERE `id`=2 AND `update_time`='2023-06-03 12:00:00'; -- 444 -> 555 同一天插入語句: INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date) VALUES (2, '2023-06-03 14:00:00', '2023-06-01 13:00:00', 'jason', 555, '男', 0, '2023-06-03', '9999-12-31');
- 對(duì)于更新語句,我們將更新拉鏈表中舊數(shù)據(jù)id的
-
此時(shí)doris拉鏈表內(nèi)容如下所示:
id | name | phone | gender | create_time | update_time | expire | start_date | end_date | 備注 |
---|---|---|---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 9999-12-31 | |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-02 | (由9999-12-31改為2023-06-02) |
2 | jason | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-03 10:00:00 | 1 | 2023-06-03 | 2023-06-03 | (當(dāng)天更新多次的過期數(shù)據(jù)) |
2 | jason | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-03 12:00:00 | 1 | 2023-06-03 | 2023-06-03 | (當(dāng)天更新多次的過期數(shù)據(jù)) |
2 | jason | 555 | 男 | 2023-06-01 13:00:00 | 2023-06-03 14:00:00 | 0 | 2023-06-03 | 9999-12-31 | (新增一條最新數(shù)據(jù)) |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-01 | |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | 0 | 2023-06-02 | 9999-12-31 | |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 | 0 | 2023-06-02 | 9999-12-31 |
- 此時(shí)若要查看2023-06-03歷史數(shù)據(jù)只需執(zhí)行:
SELECT * FROM example_user_zip WHERE start_date <= '2023-06-03' AND end_date >= '2023-06-03' AND expire = 0;
id | name | phone | gender | create_time | update_time | start_date | end_date |
---|---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 2023-06-01 | 9999-12-31 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 2023-06-01 | 2023-06-02 |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | 2023-06-02 | 9999-12-31 |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 | 2023-06-02 | 9999-12-31 |
3.2.2.4、刪除更新
由于[Mysql]業(yè)務(wù)數(shù)據(jù)都具備唯一鍵,故業(yè)務(wù)數(shù)據(jù)的刪除同步至拉鏈表無需判斷是否跨天,只需更新刪除數(shù)據(jù)的end_date日期為前一天即可。
- [Mysql] 2023-06-04 當(dāng)天刪除業(yè)務(wù)數(shù)據(jù)jack,表數(shù)據(jù)如下:
id | name | phone | gender | create_time | update_time | 備注 |
---|---|---|---|---|---|---|
2 | jason | 555 | 男 | 2023-06-01 13:00:00 | 2023-06-03 14:00:00 | (jason手機(jī)號(hào)從222 -> 333 -> 444 -> 555更改了三次) |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 |
- 此時(shí)Flink應(yīng)用獲取到的數(shù)據(jù)如下所示:
{
"before": {
"id": 1,
"name": "jack",
"phone": "111",
"gender": "男",
"create_time": "2023-06-01T05:00:00Z",
"update_time": "2023-06-01T05:00:00Z"
},
"after": null,
"source": {
# 忽略元數(shù)據(jù)
},
"op": "d", # 操作類型
"ts_ms": 1705561813650,
"transaction": null
}
- 可以看到
op=d
代表刪除操作,對(duì)于刪除操作doris拉鏈表只需將before數(shù)據(jù)的date_date日期更新為前一日2023-06-03
,導(dǎo)入語句如下:
-- 更新語句
UPDATE example_user_zip SET end_date = '2023-06-03' WHERE `id`=1 AND `update_time`='2023-06-01 13:00:00';
- 此時(shí)doris拉鏈表內(nèi)容如下所示:
id | name | phone | gender | create_time | update_time | expire | start_date | end_date | 備注 |
---|---|---|---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-03 | (由9999-12-31改為2023-06-03) |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-02 | |
2 | jason | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-03 10:00:00 | 1 | 2023-06-03 | 2023-06-03 | |
2 | jason | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-03 12:00:00 | 1 | 2023-06-03 | 2023-06-03 | |
2 | jason | 555 | 男 | 2023-06-01 13:00:00 | 2023-06-03 14:00:00 | 0 | 2023-06-03 | 9999-12-31 | |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-01 | |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | 0 | 2023-06-02 | 9999-12-31 | |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 | 0 | 2023-06-02 | 9999-12-31 |
- 此時(shí)若要查看2023-06-04數(shù)據(jù)只需執(zhí)行:
SELECT * FROM example_user_zip WHERE start_date <= '2023-06-04' AND end_date >= '2023-06-04' AND expire = 0;
id | name | phone | gender | create_time | update_time | start_date | end_date |
---|---|---|---|---|---|---|---|
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 2023-06-01 | 2023-06-02 |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | 2023-06-02 | 9999-12-31 |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 | 2023-06-02 | 9999-12-31 |
3.2.3、并發(fā)更新
這里單獨(dú)強(qiáng)調(diào)并發(fā)更新場(chǎng)景是因?yàn)樵陉P(guān)系型數(shù)據(jù)庫(kù)中,例如MySQL,通常使用timestamp
類型來表示update_time
,而該數(shù)據(jù)類型的最細(xì)粒度是秒。因此,當(dāng)多個(gè)并發(fā)操作同時(shí)更新同一條數(shù)據(jù)時(shí),update_time
的值只會(huì)發(fā)生一次變化,但會(huì)產(chǎn)生多條binlog日志。由于Doris的拉鏈表以id + update_time
作為唯一鍵,這種情況下會(huì)導(dǎo)致同一條數(shù)據(jù)多次更新。因此,這里單獨(dú)講解并發(fā)更新的情況。
需要注意的是,并發(fā)問題只存在于更新操作,刪除和創(chuàng)建操作不會(huì)出現(xiàn)上述問題。
- [Mysql] 2023-06-05 當(dāng)天 15:00:00 并發(fā)更新業(yè)務(wù)數(shù)據(jù)tony的手機(jī)號(hào),表數(shù)據(jù)如下:
id | name | phone | gender | create_time | update_time | 備注 |
---|---|---|---|---|---|---|
2 | jason | 555 | 男 | 2023-06-01 13:00:00 | 2023-06-03 14:00:00 | |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | |
4 | tony | 777 | 男 | 2023-06-02 10:00:00 | 2023-06-05 15:00:00 | (tony手機(jī)號(hào)從555-> 666-> 777 并發(fā)更改兩次) |
- 此時(shí)Flink應(yīng)用獲取到的數(shù)據(jù)如下所示:
{
"before": {
"id": 4,
"name": "tony",
"phone": "555",
"gender": "男",
"create_time": "2023-06-02T02:00:00Z",
"update_time": "2023-06-02T02:00:00Z"
},
"after": {
"id": 4,
"name": "tony",
"phone": "666",
"gender": "男",
"create_time": "2023-06-02T02:00:00Z",
"update_time": "2023-06-05T07:00:00Z"
},
"source": {
# 元數(shù)據(jù)忽略
},
"op": "u",
"ts_ms": 1705564093414,
"transaction": null
},
{
"before": {
"id": 4,
"name": "tony",
"phone": "666",
"gender": "男",
"create_time": "2023-06-02T02:00:00Z",
"update_time": "2023-06-05T07:00:00Z"
},
"after": {
"id": 4,
"name": "tony",
"phone": "777",
"gender": "男",
"create_time": "2023-06-02T02:00:00Z",
"update_time": "2023-06-05T07:00:00Z"
},
"source": {
# 元數(shù)據(jù)忽略
},
"op": "u",
"ts_ms": 1705564093478,
"transaction": null
}
- 可以看到
op=u
代表更新操作,這里我們?nèi)匝赜迷隽扛碌倪壿?,第一條日志中業(yè)務(wù)數(shù)據(jù)555->666屬于跨天更新,第二條日志中業(yè)務(wù)數(shù)據(jù)666->777屬于一條數(shù)據(jù)當(dāng)天多次更新,DorisSql如下所示:
-- 555 -> 666 跨天更新語句:
UPDATE example_user_zip SET end_date = '2023-06-04' WHERE `id`=4 AND `update_time`='2023-06-02 10:00:00';
-- 555 -> 666 跨天插入語句:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES
(4, '2023-06-05 15:00:00', '2023-06-02 10:00:00', 'tony', 666, '男', 0, '2023-06-05', '9999-12-31');
-- 666 -> 777 同一天更新語句:
UPDATE example_user_zip SET expire = 1, end_date = '2023-06-05' WHERE `id`=4 AND `update_time`='2023-06-05 15:00:00';
-- 666 -> 777 同一天插入語句:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES
(4, '2023-06-05 15:00:00', '2023-06-02 10:00:00', 'tony', 777, '男', 0, '2023-06-05', '9999-12-31');
- 此時(shí)doris拉鏈表內(nèi)容如下所示:
id | name | phone | gender | create_time | update_time | expire | start_date | end_date | 備注 |
---|---|---|---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-03 | |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-02 | |
2 | jason | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-03 10:00:00 | 1 | 2023-06-03 | 2023-06-03 | |
2 | jason | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-03 12:00:00 | 1 | 2023-06-03 | 2023-06-03 | |
2 | jason | 555 | 男 | 2023-06-01 13:00:00 | 2023-06-03 14:00:00 | 0 | 2023-06-03 | 9999-12-31 | |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-01 | |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | 0 | 2023-06-02 | 9999-12-31 | |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 | 0 | 2023-06-02 | 2023-06-04 | (由9999-12-31改為2023-06-04) |
4 | tony | 777 | 男 | 2023-06-02 10:00:00 | 2023-06-05 15:00:00 | 0 | 2023-06-05 | 9999-12-31 | (新增一條最新數(shù)據(jù)) |
此時(shí)可以看到新增了一條Tony的數(shù)據(jù),有些人可能注意到少了一條姓名為Tony、手機(jī)號(hào)為666、expire字段為1的數(shù)據(jù)。這是因?yàn)樽詈蟮母潞筒迦胝Z句中的id + update_time完全一致,觸發(fā)了Doris的replace替換操作。因此,最后一條插入語句覆蓋了前一條更新語句,即"Tony, 666, expire=1"的數(shù)據(jù)被覆蓋掉了。而這種替換操作反而變相解決了并發(fā)更新的問題。
- 此時(shí)若要查看2023-06-04數(shù)據(jù)只需執(zhí)行:
SELECT * FROM example_user_zip WHERE start_date <= '2023-06-05' AND end_date >= '2023-06-05' AND expire = 0;
id | name | phone | gender | create_time | update_time | start_date | end_date |
---|---|---|---|---|---|---|---|
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 2023-06-01 | 2023-06-02 |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | 2023-06-02 | 9999-12-31 |
4 | tony | 777 | 男 | 2023-06-02 10:00:00 | 2023-06-05 15:00:00 | 2023-06-05 | 9999-12-31 |
- 此時(shí)有些同學(xué)會(huì)提出問題,即這個(gè)情況和上文中的跨天更新以及當(dāng)天多次更新的邏輯有何不同? 似乎沒有特殊的操作邏輯。確實(shí),從邏輯上看,這兩種情況是一致的。這是因?yàn)槲覀兿葓?zhí)行更新操作,然后再執(zhí)行新增操作。如果我們反過來,先執(zhí)行新增操作,然后再執(zhí)行更新操作,就會(huì)導(dǎo)致數(shù)據(jù)丟失。接下來,讓我們看一下如果先執(zhí)行新增操作再執(zhí)行更新操作會(huì)發(fā)生什么情況。首先,我們將Doris拉鏈表恢復(fù)到前一天,如下所示:
id | name | phone | gender | create_time | update_time | expire | start_date | end_date | 備注 |
---|---|---|---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-03 | (由9999-12-31改為2023-06-03) |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-02 | |
2 | jason | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-03 10:00:00 | 1 | 2023-06-03 | 2023-06-03 | |
2 | jason | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-03 12:00:00 | 1 | 2023-06-03 | 2023-06-03 | |
2 | jason | 555 | 男 | 2023-06-01 13:00:00 | 2023-06-03 14:00:00 | 0 | 2023-06-03 | 9999-12-31 | |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-01 | |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | 0 | 2023-06-02 | 9999-12-31 | |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 | 0 | 2023-06-02 | 9999-12-31 |
- 接下來我們將更新插入操作調(diào)換順序,sql如下所示:
-- 555 -> 666 跨天插入語句:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES
(4, '2023-06-05 15:00:00', '2023-06-02 10:00:00', 'tony', 666, '男', 0, '2023-06-05', '9999-12-31');
-- 555 -> 666 跨天更新語句:
UPDATE example_user_zip SET end_date = '2023-06-04' WHERE `id`=4 AND `update_time`='2023-06-02 10:00:00';
-- 666 -> 777 同一天插入語句:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES
(4, '2023-06-05 15:00:00', '2023-06-02 10:00:00', 'tony', 777, '男', 0, '2023-06-05', '9999-12-31');
-- 666 -> 777 同一天更新語句:
UPDATE example_user_zip SET expire = 1, end_date = '2023-06-05' WHERE `id`=4 AND `update_time`='2023-06-05 15:00:00';
- 此時(shí)doris拉鏈表內(nèi)容如下所示:
id | name | phone | gender | create_time | update_time | expire | start_date | end_date | 備注 |
---|---|---|---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-03 | |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-02 | |
2 | jason | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-03 10:00:00 | 1 | 2023-06-03 | 2023-06-03 | |
2 | jason | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-03 12:00:00 | 1 | 2023-06-03 | 2023-06-03 | |
2 | jason | 555 | 男 | 2023-06-01 13:00:00 | 2023-06-03 14:00:00 | 0 | 2023-06-03 | 9999-12-31 | |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 0 | 2023-06-01 | 2023-06-01 | |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | 0 | 2023-06-02 | 9999-12-31 | |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 | 0 | 2023-06-02 | 2023-06-04 | (由9999-12-31改為2023-06-04) |
4 | tony | 777 | 男 | 2023-06-02 10:00:00 | 2023-06-05 15:00:00 | 1 | 2023-06-05 | 2023-06-05 | (更新拉鏈數(shù)據(jù)) |
可以看到已經(jīng)沒有tony的最新數(shù)據(jù)了。
- 此時(shí)查看2023-06-05數(shù)據(jù)執(zhí)行:只有兩條數(shù)據(jù)
SELECT * FROM example_user_zip WHERE start_date <= '2023-06-05' AND end_date >= '2023-06-05' AND expire = 0;
id | name | phone | gender | create_time | update_time | start_date | end_date |
---|---|---|---|---|---|---|---|
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | 2023-06-01 | 2023-06-02 |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | 2023-06-02 | 9999-12-31 |
根據(jù)以上的測(cè)試結(jié)果,我們可以得出以下結(jié)論:當(dāng)涉及到更新操作時(shí),最好的做法是先執(zhí)行更新,然后再執(zhí)行插入操作。這種順序可以有效避免并發(fā)更新問題。
此外,在實(shí)時(shí)引擎中處理數(shù)據(jù)通常涉及到分布式計(jì)算,因此需要特別注意確保相同ID的數(shù)據(jù)只在一個(gè)線程中按順序執(zhí)行,而不是讓執(zhí)行器01執(zhí)行Tony的更新操作,而執(zhí)行器02執(zhí)行Tony的插入操作。相反,應(yīng)該將具有相同ID的數(shù)據(jù)放置在同一個(gè)執(zhí)行器中執(zhí)行,以確保順序性和一致性。這對(duì)于處理并發(fā)更新場(chǎng)景非常重要。
四、總結(jié)
本文我們深入探討了如何使用Apache Flink實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)中拉鏈表的同步。拉鏈表是一種重要的數(shù)據(jù)模型,用于跟蹤數(shù)據(jù)的歷史變化,以便在分析和報(bào)告中提供準(zhǔn)確的歷史視圖。我們介紹了如何借助Flink以及其他相關(guān)技術(shù)構(gòu)建一個(gè)強(qiáng)大的實(shí)時(shí)同步引擎,以應(yīng)對(duì)多種數(shù)據(jù)同步場(chǎng)景。
我們首先介紹了傳統(tǒng)Lambda架構(gòu)到實(shí)時(shí)同步+拉鏈表單引擎架構(gòu)它們之間的區(qū)別。隨后,我們深入討論了Flink CDC(Change Data Capture)和Doris數(shù)據(jù)庫(kù)的結(jié)合使用,以實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)同步的基礎(chǔ)架構(gòu)。我們?cè)敿?xì)討論了全量同步和增量同步兩種關(guān)鍵同步模式,以及如何應(yīng)對(duì)不同的更新場(chǎng)景。
在全文中,強(qiáng)調(diào)了以下關(guān)鍵點(diǎn):文章來源:http://www.zghlxwxcb.cn/news/detail-809211.html
- 實(shí)時(shí)同步+拉鏈表單引擎架構(gòu)的設(shè)計(jì)和實(shí)施。
- 全量同步和增量同步是實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)同步的兩種關(guān)鍵模式,詳細(xì)介紹了它們的實(shí)現(xiàn)邏輯。
- 跨天更新和當(dāng)天多次更新是需要特別注意的場(chǎng)景,提供了解決方案以確保數(shù)據(jù)的完整性。
- 并發(fā)更新可能導(dǎo)致數(shù)據(jù)重復(fù),需要采取適當(dāng)?shù)拇胧﹣響?yīng)對(duì)。
通過深入了解實(shí)時(shí)同步和拉鏈表的實(shí)現(xiàn)細(xì)節(jié),讀者可以更好地理解如何構(gòu)建強(qiáng)大的實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù),并滿足不斷變化的業(yè)務(wù)需求。文章來源地址http://www.zghlxwxcb.cn/news/detail-809211.html
五、相關(guān)資料
- Doris 數(shù)據(jù)模型
- MySQL CDC Connector
- 深入數(shù)倉(cāng)離線數(shù)據(jù)同步:?jiǎn)栴}分析與優(yōu)化措施
到了這里,關(guān)于Flink實(shí)時(shí)數(shù)倉(cāng)同步:拉鏈表實(shí)戰(zhàn)詳解的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!