国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink實(shí)時(shí)數(shù)倉(cāng)同步:拉鏈表實(shí)戰(zhàn)詳解

這篇具有很好參考價(jià)值的文章主要介紹了Flink實(shí)時(shí)數(shù)倉(cāng)同步:拉鏈表實(shí)戰(zhàn)詳解。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一、背景

在大數(shù)據(jù)領(lǐng)域,業(yè)務(wù)數(shù)據(jù)通常最初存儲(chǔ)在關(guān)系型數(shù)據(jù)庫(kù),例如MySQL。然而,為了滿足日常分析和報(bào)表等需求,大數(shù)據(jù)平臺(tái)會(huì)采用多種不同的存儲(chǔ)方式來容納這些業(yè)務(wù)數(shù)據(jù)。這些存儲(chǔ)方式包括離線倉(cāng)庫(kù)、實(shí)時(shí)倉(cāng)庫(kù)等,根據(jù)不同的業(yè)務(wù)需求和數(shù)據(jù)特性進(jìn)行選擇。

舉例來說,假設(shè)業(yè)務(wù)部門需要在大數(shù)據(jù)平臺(tái)中查看歷史某一天的表數(shù)據(jù),如下:

  1. [Mysql] 業(yè)務(wù)數(shù)據(jù) - 用戶表全量數(shù)據(jù):
id name phone gender create_time update_time
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00
  1. [Mysql] 2023-06-02 業(yè)務(wù)數(shù)據(jù)新增了一名用戶,且更改了tom的手機(jī)號(hào),此時(shí)表數(shù)據(jù)如下:
id name phone gender create_time update_time
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00

加粗為更新/新增數(shù)據(jù)

  1. [大數(shù)據(jù)平臺(tái)] 2023-06-02日業(yè)務(wù)人員在大數(shù)據(jù)平臺(tái)中查看用戶表實(shí)時(shí)數(shù)據(jù),期望數(shù)據(jù)和Mysql業(yè)務(wù)數(shù)據(jù)一致,如下:
id name phone gender create_time update_time
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00
  1. [大數(shù)據(jù)平臺(tái)] 2023-06-03 日業(yè)務(wù)人員在大數(shù)據(jù)平臺(tái)中查看2023-06-02日用戶表的歷史數(shù)據(jù),期望數(shù)據(jù)如下:
id name phone gender create_time update_time
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00

根據(jù)以上需求,業(yè)務(wù)人員希望既能夠查看當(dāng)天的實(shí)時(shí)數(shù)據(jù),又希望查看以天為粒度的歷史數(shù)據(jù)。這類需求比較常見,通??梢圆捎脙煞N解決方式:

  1. Lambda架構(gòu)
  2. 實(shí)時(shí)同步 + 拉鏈表架構(gòu)

二、Lambda架構(gòu)

實(shí)時(shí)領(lǐng)域的Lambda架構(gòu)是一種大數(shù)據(jù)架構(gòu)模式,旨在處理實(shí)時(shí)數(shù)據(jù)流和歷史數(shù)據(jù)批處理,以滿足同時(shí)滿足實(shí)時(shí)查詢和歷史數(shù)據(jù)分析的需求。Lambda架構(gòu)的核心思想是將數(shù)據(jù)分成兩個(gè)獨(dú)立的流程:實(shí)時(shí)流程和批處理流程,并在最終層將它們合并,以提供一致的查詢結(jié)果,如下:

Flink實(shí)時(shí)數(shù)倉(cāng)同步:拉鏈表實(shí)戰(zhàn)詳解,大數(shù)據(jù),實(shí)時(shí)數(shù)倉(cāng),flink,大數(shù)據(jù),數(shù)據(jù)倉(cāng)庫(kù),拉鏈表,doris

  1. 實(shí)時(shí)流程(Real-time Layer):實(shí)時(shí)流程負(fù)責(zé)處理實(shí)時(shí)產(chǎn)生的數(shù)據(jù)流。它通常包括以下關(guān)鍵組件:

    • 數(shù)據(jù)源:實(shí)時(shí)數(shù)據(jù)源,如binlog日志等。
    • 實(shí)時(shí)引擎:用于實(shí)時(shí)數(shù)據(jù)的處理和轉(zhuǎn)換,例如Apache Kafka、Apache Flink等。
    • 存儲(chǔ)層:用于存儲(chǔ)實(shí)時(shí)數(shù)據(jù),特點(diǎn)是插入快,支持OLAP查詢。
  2. 離線處理流程(Batch Layer):離線處理流程用于處理歷史數(shù)據(jù),通常以 T+1 凌晨跑批方式運(yùn)行,主要包括以下組件:

    • 數(shù)據(jù)倉(cāng)庫(kù):批處理數(shù)據(jù)存儲(chǔ),通常使用分布式數(shù)據(jù)倉(cāng)庫(kù),如Apache Hadoop HDFS、Apache Hive等。
    • 批處理作業(yè):用于處理歷史數(shù)據(jù)的定期批處理作業(yè),例如數(shù)據(jù)清洗、轉(zhuǎn)換和聚合。
  3. 合并層(Serving Layer):合并層負(fù)責(zé)將實(shí)時(shí)和歷史數(shù)據(jù)合并以提供一致的查詢接口:

    • 數(shù)據(jù)服務(wù):根據(jù)用戶查詢內(nèi)容選擇性調(diào)用不同存儲(chǔ)服務(wù),用于將實(shí)時(shí)數(shù)據(jù)和批處理數(shù)據(jù)合并以生成一致的視圖。
  4. Lambda架構(gòu)的主要優(yōu)點(diǎn)包括:

    • 實(shí)時(shí)性:能夠提供近實(shí)時(shí)的數(shù)據(jù)處理和反饋,適用于需要快速?zèng)Q策和實(shí)時(shí)監(jiān)控的場(chǎng)景。

    • 容錯(cuò)性:通過將數(shù)據(jù)存儲(chǔ)在持久性存儲(chǔ)中,保證了數(shù)據(jù)的可靠性和可恢復(fù)性。

    • 靈活性:可以應(yīng)對(duì)多種不同的數(shù)據(jù)類型和查詢需求,適用于各種大數(shù)據(jù)應(yīng)用。

注意:盡管Lambda架構(gòu)可以滿足業(yè)務(wù)人員查看用戶的實(shí)時(shí)或歷史數(shù)據(jù)的需求,但離線數(shù)據(jù)倉(cāng)庫(kù)通常采用T+1批處理方式運(yùn)行,因此在需要高度一致性的場(chǎng)景下會(huì)出現(xiàn)數(shù)據(jù)不一致問題。故本文未采用Lambda架構(gòu);

若想詳細(xì)了解一致性問題的情況,請(qǐng)參考筆者另一篇文章:深入數(shù)倉(cāng)離線數(shù)據(jù)同步:?jiǎn)栴}分析與優(yōu)化措施

三、實(shí)時(shí)同步+拉鏈表架構(gòu)

為了滿足業(yè)務(wù)人員對(duì)實(shí)時(shí)或歷史數(shù)據(jù)的高度一致性需求,并且為了簡(jiǎn)化架構(gòu),這里采用了實(shí)時(shí)+拉鏈表的技術(shù)方案。在這個(gè)架構(gòu)中,只使用了一種計(jì)算引擎,具體的技術(shù)組件為 Flink-cdc-2.x + Doris。以下是我們架構(gòu)的設(shè)計(jì)概述:

Flink實(shí)時(shí)數(shù)倉(cāng)同步:拉鏈表實(shí)戰(zhàn)詳解,大數(shù)據(jù),實(shí)時(shí)數(shù)倉(cāng),flink,大數(shù)據(jù),數(shù)據(jù)倉(cāng)庫(kù),拉鏈表,doris

此架構(gòu)的關(guān)鍵在于實(shí)時(shí)同步邏輯及拉鏈表設(shè)計(jì)這兩塊的實(shí)現(xiàn)。

3.1、拉鏈表設(shè)計(jì)

拉鏈表是一種維護(hù)歷史狀態(tài)以及最新狀態(tài)數(shù)據(jù)的表,與快照表類似,算是在快照表的基礎(chǔ)上去除了重復(fù)狀態(tài)的數(shù)據(jù);使用拉鏈表在更新頻率和比例不是很大的情況下會(huì)十分節(jié)省存儲(chǔ)。

3.1.1、示例

  1. 我們以背景需求為例,[Mysql]業(yè)務(wù)數(shù)據(jù)用戶表如下:
id name phone gender create_time update_time
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00
  1. [Doris]此時(shí)實(shí)時(shí)同步到Doris的拉鏈表數(shù)據(jù)為:
id name phone gender create_time update_time expire start date end date
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 9999-12-31
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 9999-12-31
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 9999-12-31

可以看到拉鏈表多了expire,start date,end date 三個(gè)字段,用于表示該條數(shù)據(jù)是否過期、開始時(shí)間及有效時(shí)間,下面會(huì)有說明

  1. [Mysql] 2023-06-02 業(yè)務(wù)數(shù)據(jù)新增了一名用戶,更新了tom的手機(jī)號(hào),此時(shí)表數(shù)據(jù)如下:
id name phone gender create_time update_time
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00

加粗為更改/新增數(shù)據(jù)

  1. [Doris]此時(shí)實(shí)時(shí)同步到Doris的拉鏈表數(shù)據(jù)為:
id name phone gender create_time update_time expire start_date end_date 備注
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 9999-12-31
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 9999-12-31
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-01 (由9999-12-31改為2023-06-01)
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00 0 2023-06-02 9999-12-31 (新增一條拉鏈數(shù)據(jù))
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00 0 2023-06-02 9999-12-31 (新增一條最新用戶數(shù)據(jù))

由于tom的手機(jī)號(hào)被修改,根據(jù)拉鏈表特性此時(shí)會(huì)新增一條最新的tom數(shù)據(jù),且過期時(shí)間為9999-12-31,舊數(shù)據(jù)不會(huì)刪除而是將有效時(shí)間end date改為2023-06-01

  1. [Mysql] 2023-06-03 當(dāng)天多次更新業(yè)務(wù)數(shù)據(jù)jason用戶的手機(jī)號(hào),sql及表數(shù)據(jù)如下:
UPDATE `user` SET `phone`='333', `update_time`='2023-06-03 10:00:00' WHERE `id`=2;
UPDATE `user` SET `phone`='444', `update_time`='2023-06-03 12:00:00' WHERE `id`=2;
UPDATE `user` SET `phone`='555', `update_time`='2023-06-03 14:00:00' WHERE `id`=2;
id name phone gender create_time update_time 備注
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00
2 jason 555 2023-06-01 13:00:00 2023-06-03 14:00:00 (jason手機(jī)號(hào)從222 -> 333 -> 444 -> 555更改了三次)
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00
  1. [Doris]此時(shí)實(shí)時(shí)同步到Doris的拉鏈表數(shù)據(jù)為:
id name phone gender create_time update_time expire start_date end_date 備注
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 9999-12-31
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-02 (由9999-12-31改為2023-06-02)
2 jason 333 2023-06-01 13:00:00 2023-06-03 10:00:00 1 2023-06-03 2023-06-03 (當(dāng)天更新多次的過期數(shù)據(jù))
2 jason 444 2023-06-01 13:00:00 2023-06-03 12:00:00 1 2023-06-03 2023-06-03 (當(dāng)天更新多次的過期數(shù)據(jù))
2 jason 555 2023-06-01 13:00:00 2023-06-03 14:00:00 0 2023-06-03 9999-12-31 (新增一條拉鏈數(shù)據(jù))
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-01
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00 0 2023-06-02 9999-12-31
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00 0 2023-06-02 9999-12-31

使用 expire 字段來表示記錄是否過期,下面會(huì)說明

  1. 說明
  • start_date表示該條記錄的生命周期開始時(shí)間【第一次全量同步時(shí)為系統(tǒng)時(shí)間,增量同步時(shí)為update_time時(shí)間】,end_date表示該條記錄的生命周期結(jié)束時(shí)間

  • end_date = '9999-12-31’表示該條記錄為最新數(shù)據(jù)

  • end_date = '2023-06-02’表示該條記錄僅在2023-06-02當(dāng)日有效

  • expire字段用于標(biāo)識(shí)記錄的狀態(tài),1表示記錄已過期,0表示記錄有效。該字段目的是用于過濾那些在一天之內(nèi)多次更新的數(shù)據(jù)

  • 如果查詢當(dāng)前的最新記錄,sql為:select * from user where end_date = ‘9999-12-31’

  • 如果查詢2023-06-02的歷史快照,sql為:select * from user where start_date <= ‘2023-06-02’ and end_date >= ‘2023-06-02’ and expire = 0(此處是拉鏈表比較重要的一塊)

  • 解釋上一條sql:需求是要查2023-06-02的歷史快照,故start_date <= ‘2023-06-02’;而end_date = '2023-06-02’表示該條記錄在2023-06-02當(dāng)日是有效的,又因?yàn)閑nd_date = '9999-12-31’表示目前一直處于有效狀態(tài)【有可能從2023-06-02到目前一直有效的數(shù)據(jù)】,所以end_date >= ‘2023-06-02’

  • 示例:查詢2023-06-01日歷史數(shù)據(jù):select * from user where start_date <= ‘2023-06-01’ and end_date >= ‘2023-06-01’ and expore = 0

id name phone gender create_time update_time start date end date
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00 2023-06-01 9999-12-31
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 2023-06-01 2023-06-02
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00 2023-06-01 2023-06-01
  • 示例:查詢2023-06-02日歷史數(shù)據(jù):select * from user where start_date <= ‘2023-06-02’ and end_date >= ‘2023-06-02’ and expore = 0
id name phone gender create_time update_time start date end date
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00 2023-06-01 9999-12-31
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 2023-06-01 2023-06-02
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00 2023-06-02 9999-12-31
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00 2023-06-02 9999-12-31
  • 示例:查詢最新實(shí)時(shí)數(shù)據(jù):select * from user where end_date = ‘9999-12-31’
id name phone gender create_time update_time start date end date
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00 2023-06-01 9999-12-31
2 jason 555 2023-06-01 13:00:00 2023-06-03 14:00:00 2023-06-03 9999-12-31
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00 2023-06-02 9999-12-31
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00 2023-06-02 9999-12-31

3.1.2、建表設(shè)計(jì)

在Doris中的表設(shè)計(jì)中,采用了Unique數(shù)據(jù)模型,這個(gè)決策的背后有一個(gè)關(guān)鍵因素,即利用唯一Key來處理Flink作業(yè)崩潰和重新啟動(dòng)時(shí)的數(shù)據(jù)覆蓋操作,以及通過下游的冪等性來確保端到端的數(shù)據(jù)一致性。

唯一Key的選擇在這里起到了至關(guān)重要的作用。在拉鏈表中,由于用戶ID可能重復(fù)出現(xiàn)的情況【例如2023-06-02號(hào)tom就有兩條數(shù)據(jù)】,故選擇了一個(gè)組合Key: UNIQUE KEY(id, update_time) 來確保數(shù)據(jù)的唯一性。這種設(shè)計(jì)使得無論在什么情況下,我們都能夠通過這個(gè)唯一Key來維護(hù)數(shù)據(jù)的一致性,即使在處理實(shí)時(shí)數(shù)據(jù)時(shí)發(fā)生了異常情況或重新啟動(dòng)作業(yè)時(shí)也不會(huì)出現(xiàn)問題。

  • 以上述為例建表語句如下:
CREATE TABLE IF NOT EXISTS example_user_zip
(
    `id` LARGEINT NOT NULL COMMENT "用戶id",
    `update_time` DATETIME COMMENT "用戶更新時(shí)間",
    `create_time` DATETIME COMMENT "用戶注冊(cè)時(shí)間",
    `name` VARCHAR(50) NOT NULL COMMENT "用戶昵稱",
    `phone` LARGEINT COMMENT "手機(jī)號(hào)",
    `gender` VARCHAR(5) COMMENT "用戶性別",
    `expire` TINYINT DEFAULT '0' COMMENT "數(shù)據(jù)是否過期:0為有效,1為過期",
    `start_date` DATE COMMENT "開始時(shí)間",
    `end_date` DATE COMMENT "有效時(shí)間"
)
UNIQUE KEY(`id`, `update_time`) -- UNIQUE模型
COMMENT "用戶拉鏈表"
DISTRIBUTED BY HASH(`id`) BUCKETS 32
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

關(guān)于doris更多數(shù)據(jù)模型可參考官網(wǎng)

3.2、實(shí)時(shí)同步邏輯【重要】

為了更清晰地解釋拉鏈表的同步邏輯,我將以場(chǎng)景的方式逐步說明,如下:

  1. 全量更新

  2. 增量更新

    1. 新增數(shù)據(jù)
    2. 跨天更新數(shù)據(jù)
    3. 某條數(shù)據(jù)當(dāng)天多次更新
    4. 刪除數(shù)據(jù)
  3. 并發(fā)更新

3.2.1、全量更新

  1. 需先明確一點(diǎn):拉鏈表的歷史數(shù)據(jù)查詢范圍是從實(shí)時(shí)任務(wù)同步的那天開始,因?yàn)橹挥性趯?shí)時(shí)任務(wù)開始同步的那一天之后,拉鏈表才正式形成,之前的歷史數(shù)據(jù)是不可查詢的。因此,當(dāng)進(jìn)行第一次全量同步時(shí),我們會(huì)將 start_date 設(shè)置為當(dāng)前系統(tǒng)日期。

  2. 另外,由于實(shí)時(shí)拉鏈表同步需要明確區(qū)分全量和增量更新,以及后續(xù)對(duì) binlog 數(shù)據(jù)進(jìn)行解析及判斷增量更新操作類型,因此,F(xiàn)link CDC SQL 方式的表建立不再滿足我們的要求。為了更好地實(shí)現(xiàn)這一功能,我們需要采用 API 方式來構(gòu)建解決方案,代碼如下:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // 設(shè)置捕獲的數(shù)據(jù)庫(kù), 如果需要同步整個(gè)數(shù)據(jù)庫(kù),請(qǐng)將 tableList 設(shè)置為 ".*".
        .tableList("yourDatabaseName.yourTableName") // 設(shè)置捕獲的表
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // 將 SourceRecord 轉(zhuǎn)換為 JSON 字符串
        .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 設(shè)置 3s 的 checkpoint 間隔
    env.enableCheckpointing(3000);

    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // 設(shè)置 source 節(jié)點(diǎn)的并行度為 4
      .setParallelism(4)
      .print().setParallelism(1); // 設(shè)置 sink 節(jié)點(diǎn)并行度為 1 

    env.execute("Print MySQL Snapshot + Binlog");
  }
}

代碼摘自mysql-cdc-connector官網(wǎng)示例

  1. 這里我們?nèi)砸?023-06-01的[Mysql]業(yè)務(wù)數(shù)據(jù)為例:
id name phone gender create_time update_time
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00
  1. 此時(shí)Flink應(yīng)用獲取到的數(shù)據(jù)如下:僅展示一條
{
	"before": null,
	"after": {		 # 實(shí)際數(shù)據(jù)
		"id": 1,
		"name": "jack",
		"phone": "111",
		"gender": "男",
		"create_time": "2023-06-01T05:00:00Z",  # 該日期是UTC時(shí)間,只需增加8小時(shí)即可轉(zhuǎn)化為北京時(shí)間
		"update_time": "2023-06-01T05:00:00Z"	# 該日期是UTC時(shí)間,只需增加8小時(shí)即可轉(zhuǎn)化為北京時(shí)間
	},
	"source": {		 # 元數(shù)據(jù)
		"version": "1.6.4.Final",
		"connector": "mysql",
		"name": "mysql_binlog_source",
		"ts_ms": 0,
		"snapshot": "false",
		"db": "yushu_dds",
		"sequence": null,
		"table": "user",
		"server_id": 0,
		"gtid": null,
		"file": "",
		"pos": 0,
		"row": 0,
		"thread": null,
		"query": null
	},
	"op": "r",  	 # 記錄每條數(shù)據(jù)的操作類型[重要]
	"ts_ms": 1705471382867,
	"transaction": null
}
  1. 在我們使用 Flink CDC MySQL 同步數(shù)據(jù)時(shí),默認(rèn)采用 initial 模式,這意味著首先進(jìn)行全量同步,然后再進(jìn)行增量同步。因此,在區(qū)分全量和增量同步時(shí),關(guān)鍵在于觀察獲取到的數(shù)據(jù)中的 op 字段。op 字段是用來記錄每條數(shù)據(jù)的操作類型的標(biāo)志。具體的操作類型如下:
    • op=d 代表刪除操作
    • op=u 代表更新操作
    • op=c 代表新增操作
    • op=r 代表全量讀取,而不是來自 binlog 的增量讀取
  2. 在 Flink 程序中,只需要通過 op=r 即可篩選出全量數(shù)據(jù)。在全量數(shù)據(jù)同步階段,Doris 拉鏈表的 start_date 字段設(shè)置為系統(tǒng)當(dāng)前日期,而 end_date 字段則設(shè)置為 ‘9999-12-31’。導(dǎo)入語句如下:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES 
(1, '2023-06-01 13:00:00', '2023-06-01 13:00:00', 'jack', 111, '男', 0, '2023-06-01', '9999-12-31'),
(2, '2023-06-01 13:00:00', '2023-06-01 13:00:00', 'jason', 222, '男', 0, '2023-06-01', '9999-12-31'),
(3, '2023-06-01 13:00:00', '2023-06-01 13:00:00', 'tom', 333, '男', 0, '2023-06-01', '9999-12-31');
  1. 此時(shí)doris拉鏈表數(shù)據(jù)如下所示:
id name phone gender create_time update_time expire start_date end date
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 9999-12-31
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 9999-12-31
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 9999-12-31

3.2.2、增量更新

當(dāng)全量更新結(jié)束后即為增量更新,請(qǐng)注意以下內(nèi)容:

  1. 在增量更新時(shí),Doris 拉鏈表中的 start_date 字段【即開始時(shí)間】不再使用系統(tǒng)時(shí)間,而是業(yè)務(wù)數(shù)據(jù)的 update_time 截取后的日期。例如,update_time 為 “2023-06-02 13:00:00”,則對(duì)應(yīng)的 start_date 為 “2023-06-02”。這么做的目的是為了確保使用事件時(shí)間來劃分?jǐn)?shù)據(jù)的開始時(shí)間,而不是系統(tǒng)時(shí)間。
  2. 舉例來說,如果采用系統(tǒng)時(shí)間,假設(shè)實(shí)時(shí)同步任務(wù)某一天宕機(jī)并且沒有重啟,等到隔天再重啟,那么 start_date 就會(huì)變成隔天日期,從而導(dǎo)致昨天的數(shù)據(jù)丟失。
  3. 為什么不使用業(yè)務(wù)數(shù)據(jù)的 create_time 作為拉鏈表的 start_date 呢?這是因?yàn)樵跇I(yè)務(wù)數(shù)據(jù)更改時(shí),通常只會(huì)更新 update_time。例如,2023-06-02 日更新了 “Tom” 的手機(jī)號(hào)碼,此時(shí)同步到 Doris 新增的拉鏈數(shù)據(jù)如果使用 create_time,那么 start_date 仍然會(huì)是 “2023-06-01”,而實(shí)際上該條數(shù)據(jù)應(yīng)該從 “2023-06-02” 日開始生效。因此,使用 update_time 更加合理,確保拉鏈表中的數(shù)據(jù)始終按照業(yè)務(wù)數(shù)據(jù)的更新時(shí)間來進(jìn)行正確的版本管理。

接下來,我們將逐一講解以下四個(gè)場(chǎng)景:新增更新、跨天更新、某條數(shù)據(jù)當(dāng)天多次更新以及刪除更新。

3.2.2.1、新增更新
  1. 我們?nèi)砸宰畛醯氖纠秊槔篬Mysql] 2023-06-02 業(yè)務(wù)數(shù)據(jù)新增了一名用戶,此時(shí)表數(shù)據(jù)如下:
id name phone gender create_time update_time
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00

加粗為更新/新增數(shù)據(jù)

  1. 此時(shí)Flink應(yīng)用獲取到的數(shù)據(jù)如下所示:
{
	"before": null,
	"after": {
		"id": 4,
		"name": "tony",
		"phone": "555",
		"gender": "男",
		"create_time": "2023-06-02T02:00:00Z",
		"update_time": "2023-06-02T02:00:00Z"
	},
	"source": {
		# 此處元數(shù)據(jù)省略
	},
	"op": "c",
	"ts_ms": 1705477497504,
	"transaction": null
}
  1. 可以看到op=c 代表新增操作,對(duì)于新增操作doris拉鏈表的start_end為業(yè)務(wù)數(shù)據(jù)的update_time,而end_date均設(shè)置為9999-12-31,導(dǎo)入語句如下:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES 
(4, '2023-06-02 10:00:00', '2023-06-02 10:00:00', 'tony', 555, '男', 0, '2023-06-02', '9999-12-31');
  1. 此時(shí)doris拉鏈表內(nèi)容如下所示:
id name phone gender create_time update_time expire start_date end_date
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 9999-12-31
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 9999-12-31
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 9999-12-31
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00 0 2023-06-02 9999-12-31

加粗表示更新/新增數(shù)據(jù)

3.2.2.2、跨天更新

首先,解釋一下為何要需要區(qū)分兩種不同的更新場(chǎng)景:跨天更新和當(dāng)天多次更新。這涉及到拉鏈表的歷史數(shù)據(jù)粒度,拉鏈表通常以天為單位。如果一條數(shù)據(jù)在同一天內(nèi)多次更新,那么每次更新后的數(shù)據(jù)的生存時(shí)間將只有幾小時(shí)甚至幾分鐘。在這種情況下,我們希望在拉鏈表中將這種多次更新的臨時(shí)數(shù)據(jù)設(shè)為過期數(shù)據(jù);細(xì)節(jié)在后續(xù)會(huì)有講解,先來解釋跨天更新場(chǎng)景。

  1. 我們?nèi)砸宰畛醯氖纠秊槔篬Mysql] 2023-06-02 業(yè)務(wù)數(shù)據(jù)更新了tom的手機(jī)號(hào),此時(shí)表數(shù)據(jù)如下:
id name phone gender create_time update_time
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00

加粗為更新/新增數(shù)據(jù)

  1. 此時(shí)Flink應(yīng)用獲取到的數(shù)據(jù)如下所示:
{
	"before": {	 # 更新前的數(shù)據(jù)
		"id": 3,
		"name": "tom",
		"phone": "333",
		"gender": "男",
		"create_time": "2023-06-01T05:00:00Z",
		"update_time": "2023-06-01T05:00:00Z"
	},
	"after": {	# 更新后的數(shù)據(jù)
		"id": 3,
		"name": "tom",
		"phone": "444",  # 手機(jī)號(hào)更新
		"gender": "男",
		"create_time": "2023-06-01T05:00:00Z",
		"update_time": "2023-06-02T01:00:00Z"  # 更新時(shí)間更新
	},
	"source": {
		# 此處元數(shù)據(jù)省略
	},
	"op": "u",
	"ts_ms": 1705479637926,
	"transaction": null
}
  1. 當(dāng)我們?cè)贔link應(yīng)用中遇到op=u(代表更新操作)時(shí),首先需要檢查beforeafter字段中的update_time是否跨越了天粒度??赡芸缭揭惶?,也可能跨越多天,我們將在Doris拉鏈表中執(zhí)行兩條SQL語句:一條更新語句和一條插入語句。
    1. 對(duì)于更新語句,我們將更新拉鏈表中舊數(shù)據(jù)id的end_date字段,將其設(shè)置為after字段中update_time的前一天2023-06-01。
    2. 對(duì)于插入語句,我們將插入after字段中的新數(shù)據(jù),將start_date設(shè)置為update_time的日期,end_date設(shè)置9999-12-31),以確保該數(shù)據(jù)在拉鏈表中一直有效。
    3. sql如下所示:
-- 更新語句:
UPDATE example_user_zip SET end_date = '2023-06-01' WHERE `id`=3 AND `update_time`='2023-06-01 13:00:00';

-- 插入語句:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES 
(3, '2023-06-02 09:00:00', '2023-06-01 13:00:00', 'tom', 444, '男', 0, '2023-06-02', '9999-12-31');
  1. 此時(shí)doris拉鏈表內(nèi)容如下所示:
id name phone gender create_time update_time expire start_date end_date
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 9999-12-31
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 9999-12-31
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-01
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00 0 2023-06-02 9999-12-31
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00 0 2023-06-02 9999-12-31

加粗表示更新/新增數(shù)據(jù)

  1. 此時(shí)若要查看2023-06-01歷史數(shù)據(jù)只需執(zhí)行:
SELECT * FROM example_user_zip WHERE start_date <= '2023-06-01' AND end_date >= '2023-06-01' AND expire = 0;
id name phone gender create_time update_time expire start date end date
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 9999-12-31
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 9999-12-31
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-01
3.2.2.3、某條數(shù)據(jù)當(dāng)天多次更新

在我們的拉鏈表中,數(shù)據(jù)的粒度是以天為單位。如果一條數(shù)據(jù)在同一天內(nèi)多次更新,我們的處理策略是取最后一次更新為有效數(shù)據(jù),而將之前的更新標(biāo)記為過期數(shù)據(jù)。為了標(biāo)記數(shù)據(jù)是否過期,我們會(huì)將過期數(shù)據(jù)的expire字段設(shè)置為1。

  1. 我們?nèi)砸宰畛醯氖纠秊槔篬Mysql] 2023-06-03 當(dāng)天多次更新業(yè)務(wù)數(shù)據(jù)jason用戶的手機(jī)號(hào),sql及表數(shù)據(jù)如下:
UPDATE `user` SET `phone`='333', `update_time`='2023-06-03 10:00:00' WHERE `id`=2;
UPDATE `user` SET `phone`='444', `update_time`='2023-06-03 12:00:00' WHERE `id`=2;
UPDATE `user` SET `phone`='555', `update_time`='2023-06-03 14:00:00' WHERE `id`=2;
id name phone gender create_time update_time 備注
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00
2 jason 555 2023-06-01 13:00:00 2023-06-03 14:00:00 (jason手機(jī)號(hào)從222 -> 333 -> 444 -> 555更改了三次)
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00
  1. 此時(shí)Flink應(yīng)用獲取到的數(shù)據(jù)如下所示:
{
	"before": {
		"id": 2,
		"name": "jason",
		"phone": "222",
		"gender": "男",
		"create_time": "2023-06-01T05:00:00Z",
		"update_time": "2023-06-01T05:00:00Z"
	},
	"after": {
		"id": 2,
		"name": "jason",
		"phone": "333",
		"gender": "男",
		"create_time": "2023-06-01T05:00:00Z",
		"update_time": "2023-06-03T02:00:00Z"
	},
	"source": {
		# 元數(shù)據(jù)忽略		
	},
	"op": "u",
	"ts_ms": 1705548298335,
	"transaction": null
},
{
	"before": {
		"id": 2,
		"name": "jason",
		"phone": "333",
		"gender": "男",
		"create_time": "2023-06-01T05:00:00Z",
		"update_time": "2023-06-03T02:00:00Z"
	},
	"after": {
		"id": 2,
		"name": "jason",
		"phone": "444",
		"gender": "男",
		"create_time": "2023-06-01T05:00:00Z",
		"update_time": "2023-06-03T04:00:00Z"
	},
	"source": {
		# 元數(shù)據(jù)忽略		
	},
	"op": "u",
	"ts_ms": 1705548298392,
	"transaction": null
},
{
	"before": {
		"id": 2,
		"name": "jason",
		"phone": "444",
		"gender": "男",
		"create_time": "2023-06-01T05:00:00Z",
		"update_time": "2023-06-03T04:00:00Z"
	},
	"after": {
		"id": 2,
		"name": "jason",
		"phone": "555",
		"gender": "男",
		"create_time": "2023-06-01T05:00:00Z",
		"update_time": "2023-06-03T06:00:00Z"
	},
	"source": {
		# 元數(shù)據(jù)忽略
	},
	"op": "u",
	"ts_ms": 1705548298484,
	"transaction": null
}
  1. 當(dāng)我們?cè)贔link應(yīng)用中遇到op=u(代表更新操作),且檢查beforeafter字段中的update_time屬于同一天,我們將在Doris拉鏈表中執(zhí)行兩條SQL語句:一條更新語句和一條插入語句。

    1. 對(duì)于更新語句,我們將更新拉鏈表中舊數(shù)據(jù)id的expire字段設(shè)置為1,將其設(shè)置為end_date字段值設(shè)置為update_time的當(dāng)天日期2023-06-03。
    2. 對(duì)于插入語句,我們將插入after字段中的新數(shù)據(jù),將start_date設(shè)置為update_time的當(dāng)天日期,end_date設(shè)置9999-12-31),以確保該數(shù)據(jù)在拉鏈表中一直有效。
    3. sql如下所示:
    -- 222 -> 333 跨天更新語句:
    UPDATE example_user_zip SET end_date = '2023-06-02' WHERE `id`=2 AND `update_time`='2023-06-01 13:00:00';
    
    -- 222 -> 333 跨天插入語句:
    INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
    VALUES 
    (2, '2023-06-03 10:00:00', '2023-06-01 13:00:00', 'jason', 333, '男', 0, '2023-06-03', '9999-12-31');
    
    -- 333 -> 444 同一天更新語句:
    UPDATE example_user_zip SET expire = 1, end_date = '2023-06-03' WHERE `id`=2 AND `update_time`='2023-06-03 10:00:00';
    
    -- 333 -> 444 同一天插入語句:
    INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
    VALUES 
    (2, '2023-06-03 12:00:00', '2023-06-01 13:00:00', 'jason', 444, '男', 0, '2023-06-03', '9999-12-31');
    
    -- 444 -> 555 同一天更新語句:
    UPDATE example_user_zip SET expire = 1, end_date = '2023-06-03' WHERE `id`=2 AND `update_time`='2023-06-03 12:00:00';
    
    -- 444 -> 555 同一天插入語句:
    INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
    VALUES 
    (2, '2023-06-03 14:00:00', '2023-06-01 13:00:00', 'jason', 555, '男', 0, '2023-06-03', '9999-12-31');
    
  2. 此時(shí)doris拉鏈表內(nèi)容如下所示:

id name phone gender create_time update_time expire start_date end_date 備注
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 9999-12-31
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-02 (由9999-12-31改為2023-06-02)
2 jason 333 2023-06-01 13:00:00 2023-06-03 10:00:00 1 2023-06-03 2023-06-03 (當(dāng)天更新多次的過期數(shù)據(jù))
2 jason 444 2023-06-01 13:00:00 2023-06-03 12:00:00 1 2023-06-03 2023-06-03 (當(dāng)天更新多次的過期數(shù)據(jù))
2 jason 555 2023-06-01 13:00:00 2023-06-03 14:00:00 0 2023-06-03 9999-12-31 (新增一條最新數(shù)據(jù))
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-01
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00 0 2023-06-02 9999-12-31
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00 0 2023-06-02 9999-12-31
  1. 此時(shí)若要查看2023-06-03歷史數(shù)據(jù)只需執(zhí)行:
SELECT * FROM example_user_zip WHERE start_date <= '2023-06-03' AND end_date >= '2023-06-03' AND expire = 0;
id name phone gender create_time update_time start_date end_date
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00 2023-06-01 9999-12-31
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 2023-06-01 2023-06-02
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00 2023-06-02 9999-12-31
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00 2023-06-02 9999-12-31
3.2.2.4、刪除更新

由于[Mysql]業(yè)務(wù)數(shù)據(jù)都具備唯一鍵,故業(yè)務(wù)數(shù)據(jù)的刪除同步至拉鏈表無需判斷是否跨天,只需更新刪除數(shù)據(jù)的end_date日期為前一天即可。

  1. [Mysql] 2023-06-04 當(dāng)天刪除業(yè)務(wù)數(shù)據(jù)jack,表數(shù)據(jù)如下:
id name phone gender create_time update_time 備注
2 jason 555 2023-06-01 13:00:00 2023-06-03 14:00:00 (jason手機(jī)號(hào)從222 -> 333 -> 444 -> 555更改了三次)
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00
  1. 此時(shí)Flink應(yīng)用獲取到的數(shù)據(jù)如下所示:
{
	"before": {
		"id": 1,
		"name": "jack",
		"phone": "111",
		"gender": "男",
		"create_time": "2023-06-01T05:00:00Z",
		"update_time": "2023-06-01T05:00:00Z"
	},
	"after": null,
	"source": {
		# 忽略元數(shù)據(jù)
	},
	"op": "d", 	# 操作類型
	"ts_ms": 1705561813650,
	"transaction": null
}
  1. 可以看到op=d 代表刪除操作,對(duì)于刪除操作doris拉鏈表只需將before數(shù)據(jù)的date_date日期更新為前一日2023-06-03,導(dǎo)入語句如下:
-- 更新語句
UPDATE example_user_zip SET end_date = '2023-06-03' WHERE `id`=1 AND `update_time`='2023-06-01 13:00:00';
  1. 此時(shí)doris拉鏈表內(nèi)容如下所示:
id name phone gender create_time update_time expire start_date end_date 備注
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-03 (由9999-12-31改為2023-06-03)
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-02
2 jason 333 2023-06-01 13:00:00 2023-06-03 10:00:00 1 2023-06-03 2023-06-03
2 jason 444 2023-06-01 13:00:00 2023-06-03 12:00:00 1 2023-06-03 2023-06-03
2 jason 555 2023-06-01 13:00:00 2023-06-03 14:00:00 0 2023-06-03 9999-12-31
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-01
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00 0 2023-06-02 9999-12-31
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00 0 2023-06-02 9999-12-31
  1. 此時(shí)若要查看2023-06-04數(shù)據(jù)只需執(zhí)行:
SELECT * FROM example_user_zip WHERE start_date <= '2023-06-04' AND end_date >= '2023-06-04' AND expire = 0;
id name phone gender create_time update_time start_date end_date
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 2023-06-01 2023-06-02
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00 2023-06-02 9999-12-31
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00 2023-06-02 9999-12-31

3.2.3、并發(fā)更新

這里單獨(dú)強(qiáng)調(diào)并發(fā)更新場(chǎng)景是因?yàn)樵陉P(guān)系型數(shù)據(jù)庫(kù)中,例如MySQL,通常使用timestamp類型來表示update_time,而該數(shù)據(jù)類型的最細(xì)粒度是秒。因此,當(dāng)多個(gè)并發(fā)操作同時(shí)更新同一條數(shù)據(jù)時(shí),update_time的值只會(huì)發(fā)生一次變化,但會(huì)產(chǎn)生多條binlog日志。由于Doris的拉鏈表以id + update_time作為唯一鍵,這種情況下會(huì)導(dǎo)致同一條數(shù)據(jù)多次更新。因此,這里單獨(dú)講解并發(fā)更新的情況。

需要注意的是,并發(fā)問題只存在于更新操作,刪除和創(chuàng)建操作不會(huì)出現(xiàn)上述問題。

  1. [Mysql] 2023-06-05 當(dāng)天 15:00:00 并發(fā)更新業(yè)務(wù)數(shù)據(jù)tony的手機(jī)號(hào),表數(shù)據(jù)如下:
id name phone gender create_time update_time 備注
2 jason 555 2023-06-01 13:00:00 2023-06-03 14:00:00
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00
4 tony 777 2023-06-02 10:00:00 2023-06-05 15:00:00 (tony手機(jī)號(hào)從555-> 666-> 777 并發(fā)更改兩次)
  1. 此時(shí)Flink應(yīng)用獲取到的數(shù)據(jù)如下所示:
{
	"before": {
		"id": 4,
		"name": "tony",
		"phone": "555",
		"gender": "男",
		"create_time": "2023-06-02T02:00:00Z",
		"update_time": "2023-06-02T02:00:00Z"
	},
	"after": {
		"id": 4,
		"name": "tony",
		"phone": "666",
		"gender": "男",
		"create_time": "2023-06-02T02:00:00Z",
		"update_time": "2023-06-05T07:00:00Z"
	},
	"source": {
		# 元數(shù)據(jù)忽略
	},
	"op": "u",
	"ts_ms": 1705564093414,
	"transaction": null
},
{
	"before": {
		"id": 4,
		"name": "tony",
		"phone": "666",
		"gender": "男",
		"create_time": "2023-06-02T02:00:00Z",
		"update_time": "2023-06-05T07:00:00Z"
	},
	"after": {
		"id": 4,
		"name": "tony",
		"phone": "777",
		"gender": "男",
		"create_time": "2023-06-02T02:00:00Z",
		"update_time": "2023-06-05T07:00:00Z"
	},
	"source": {
		# 元數(shù)據(jù)忽略
	},
	"op": "u",
	"ts_ms": 1705564093478,
	"transaction": null
}
  1. 可以看到op=u 代表更新操作,這里我們?nèi)匝赜迷隽扛碌倪壿?,第一條日志中業(yè)務(wù)數(shù)據(jù)555->666屬于跨天更新,第二條日志中業(yè)務(wù)數(shù)據(jù)666->777屬于一條數(shù)據(jù)當(dāng)天多次更新,DorisSql如下所示:
-- 555 -> 666 跨天更新語句:
UPDATE example_user_zip SET end_date = '2023-06-04' WHERE `id`=4 AND `update_time`='2023-06-02 10:00:00';

-- 555 -> 666 跨天插入語句:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES 
(4, '2023-06-05 15:00:00', '2023-06-02 10:00:00', 'tony', 666, '男', 0, '2023-06-05', '9999-12-31');

-- 666 -> 777 同一天更新語句:
UPDATE example_user_zip SET expire = 1, end_date = '2023-06-05' WHERE `id`=4 AND `update_time`='2023-06-05 15:00:00';

-- 666 -> 777 同一天插入語句:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES 
(4, '2023-06-05 15:00:00', '2023-06-02 10:00:00', 'tony', 777, '男', 0, '2023-06-05', '9999-12-31');
  1. 此時(shí)doris拉鏈表內(nèi)容如下所示:
id name phone gender create_time update_time expire start_date end_date 備注
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-03
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-02
2 jason 333 2023-06-01 13:00:00 2023-06-03 10:00:00 1 2023-06-03 2023-06-03
2 jason 444 2023-06-01 13:00:00 2023-06-03 12:00:00 1 2023-06-03 2023-06-03
2 jason 555 2023-06-01 13:00:00 2023-06-03 14:00:00 0 2023-06-03 9999-12-31
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-01
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00 0 2023-06-02 9999-12-31
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00 0 2023-06-02 2023-06-04 (由9999-12-31改為2023-06-04)
4 tony 777 2023-06-02 10:00:00 2023-06-05 15:00:00 0 2023-06-05 9999-12-31 (新增一條最新數(shù)據(jù))

此時(shí)可以看到新增了一條Tony的數(shù)據(jù),有些人可能注意到少了一條姓名為Tony、手機(jī)號(hào)為666、expire字段為1的數(shù)據(jù)。這是因?yàn)樽詈蟮母潞筒迦胝Z句中的id + update_time完全一致,觸發(fā)了Doris的replace替換操作。因此,最后一條插入語句覆蓋了前一條更新語句,即"Tony, 666, expire=1"的數(shù)據(jù)被覆蓋掉了。而這種替換操作反而變相解決了并發(fā)更新的問題。

  1. 此時(shí)若要查看2023-06-04數(shù)據(jù)只需執(zhí)行:
SELECT * FROM example_user_zip WHERE start_date <= '2023-06-05' AND end_date >= '2023-06-05' AND expire = 0;
id name phone gender create_time update_time start_date end_date
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 2023-06-01 2023-06-02
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00 2023-06-02 9999-12-31
4 tony 777 2023-06-02 10:00:00 2023-06-05 15:00:00 2023-06-05 9999-12-31
  1. 此時(shí)有些同學(xué)會(huì)提出問題,即這個(gè)情況和上文中的跨天更新以及當(dāng)天多次更新的邏輯有何不同? 似乎沒有特殊的操作邏輯。確實(shí),從邏輯上看,這兩種情況是一致的。這是因?yàn)槲覀兿葓?zhí)行更新操作,然后再執(zhí)行新增操作。如果我們反過來,先執(zhí)行新增操作,然后再執(zhí)行更新操作,就會(huì)導(dǎo)致數(shù)據(jù)丟失。接下來,讓我們看一下如果先執(zhí)行新增操作再執(zhí)行更新操作會(huì)發(fā)生什么情況。首先,我們將Doris拉鏈表恢復(fù)到前一天,如下所示:
id name phone gender create_time update_time expire start_date end_date 備注
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-03 (由9999-12-31改為2023-06-03)
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-02
2 jason 333 2023-06-01 13:00:00 2023-06-03 10:00:00 1 2023-06-03 2023-06-03
2 jason 444 2023-06-01 13:00:00 2023-06-03 12:00:00 1 2023-06-03 2023-06-03
2 jason 555 2023-06-01 13:00:00 2023-06-03 14:00:00 0 2023-06-03 9999-12-31
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-01
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00 0 2023-06-02 9999-12-31
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00 0 2023-06-02 9999-12-31
  1. 接下來我們將更新插入操作調(diào)換順序,sql如下所示:
-- 555 -> 666 跨天插入語句:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES 
(4, '2023-06-05 15:00:00', '2023-06-02 10:00:00', 'tony', 666, '男', 0, '2023-06-05', '9999-12-31');

-- 555 -> 666 跨天更新語句:
UPDATE example_user_zip SET end_date = '2023-06-04' WHERE `id`=4 AND `update_time`='2023-06-02 10:00:00';

-- 666 -> 777 同一天插入語句:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES 
(4, '2023-06-05 15:00:00', '2023-06-02 10:00:00', 'tony', 777, '男', 0, '2023-06-05', '9999-12-31');

-- 666 -> 777 同一天更新語句:
UPDATE example_user_zip SET expire = 1, end_date = '2023-06-05' WHERE `id`=4 AND `update_time`='2023-06-05 15:00:00';
  1. 此時(shí)doris拉鏈表內(nèi)容如下所示:
id name phone gender create_time update_time expire start_date end_date 備注
1 jack 111 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-03
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-02
2 jason 333 2023-06-01 13:00:00 2023-06-03 10:00:00 1 2023-06-03 2023-06-03
2 jason 444 2023-06-01 13:00:00 2023-06-03 12:00:00 1 2023-06-03 2023-06-03
2 jason 555 2023-06-01 13:00:00 2023-06-03 14:00:00 0 2023-06-03 9999-12-31
3 tom 333 2023-06-01 13:00:00 2023-06-01 13:00:00 0 2023-06-01 2023-06-01
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00 0 2023-06-02 9999-12-31
4 tony 555 2023-06-02 10:00:00 2023-06-02 10:00:00 0 2023-06-02 2023-06-04 (由9999-12-31改為2023-06-04)
4 tony 777 2023-06-02 10:00:00 2023-06-05 15:00:00 1 2023-06-05 2023-06-05 (更新拉鏈數(shù)據(jù))

可以看到已經(jīng)沒有tony的最新數(shù)據(jù)了。

  1. 此時(shí)查看2023-06-05數(shù)據(jù)執(zhí)行:只有兩條數(shù)據(jù)
SELECT * FROM example_user_zip WHERE start_date <= '2023-06-05' AND end_date >= '2023-06-05' AND expire = 0;
id name phone gender create_time update_time start_date end_date
2 jason 222 2023-06-01 13:00:00 2023-06-01 13:00:00 2023-06-01 2023-06-02
3 tom 444 2023-06-01 13:00:00 2023-06-02 09:00:00 2023-06-02 9999-12-31

根據(jù)以上的測(cè)試結(jié)果,我們可以得出以下結(jié)論:當(dāng)涉及到更新操作時(shí),最好的做法是先執(zhí)行更新,然后再執(zhí)行插入操作。這種順序可以有效避免并發(fā)更新問題。

此外,在實(shí)時(shí)引擎中處理數(shù)據(jù)通常涉及到分布式計(jì)算,因此需要特別注意確保相同ID的數(shù)據(jù)只在一個(gè)線程中按順序執(zhí)行,而不是讓執(zhí)行器01執(zhí)行Tony的更新操作,而執(zhí)行器02執(zhí)行Tony的插入操作。相反,應(yīng)該將具有相同ID的數(shù)據(jù)放置在同一個(gè)執(zhí)行器中執(zhí)行,以確保順序性和一致性。這對(duì)于處理并發(fā)更新場(chǎng)景非常重要。

四、總結(jié)

本文我們深入探討了如何使用Apache Flink實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)中拉鏈表的同步。拉鏈表是一種重要的數(shù)據(jù)模型,用于跟蹤數(shù)據(jù)的歷史變化,以便在分析和報(bào)告中提供準(zhǔn)確的歷史視圖。我們介紹了如何借助Flink以及其他相關(guān)技術(shù)構(gòu)建一個(gè)強(qiáng)大的實(shí)時(shí)同步引擎,以應(yīng)對(duì)多種數(shù)據(jù)同步場(chǎng)景。

我們首先介紹了傳統(tǒng)Lambda架構(gòu)到實(shí)時(shí)同步+拉鏈表單引擎架構(gòu)它們之間的區(qū)別。隨后,我們深入討論了Flink CDC(Change Data Capture)和Doris數(shù)據(jù)庫(kù)的結(jié)合使用,以實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)同步的基礎(chǔ)架構(gòu)。我們?cè)敿?xì)討論了全量同步和增量同步兩種關(guān)鍵同步模式,以及如何應(yīng)對(duì)不同的更新場(chǎng)景。

在全文中,強(qiáng)調(diào)了以下關(guān)鍵點(diǎn):

  • 實(shí)時(shí)同步+拉鏈表單引擎架構(gòu)的設(shè)計(jì)和實(shí)施。
  • 全量同步和增量同步是實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)同步的兩種關(guān)鍵模式,詳細(xì)介紹了它們的實(shí)現(xiàn)邏輯。
  • 跨天更新和當(dāng)天多次更新是需要特別注意的場(chǎng)景,提供了解決方案以確保數(shù)據(jù)的完整性。
  • 并發(fā)更新可能導(dǎo)致數(shù)據(jù)重復(fù),需要采取適當(dāng)?shù)拇胧﹣響?yīng)對(duì)。

通過深入了解實(shí)時(shí)同步和拉鏈表的實(shí)現(xiàn)細(xì)節(jié),讀者可以更好地理解如何構(gòu)建強(qiáng)大的實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù),并滿足不斷變化的業(yè)務(wù)需求。文章來源地址http://www.zghlxwxcb.cn/news/detail-809211.html

五、相關(guān)資料

  • Doris 數(shù)據(jù)模型
  • MySQL CDC Connector
  • 深入數(shù)倉(cāng)離線數(shù)據(jù)同步:?jiǎn)栴}分析與優(yōu)化措施

到了這里,關(guān)于Flink實(shí)時(shí)數(shù)倉(cāng)同步:拉鏈表實(shí)戰(zhàn)詳解的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink實(shí)時(shí)電商數(shù)倉(cāng)(八)

    主要任務(wù):從kafka頁面日志主題讀取數(shù)據(jù),統(tǒng)計(jì) 七日回流用戶:之前活躍的用戶,有一段時(shí)間不活躍了,之后又開始活躍,稱為回流用戶 當(dāng)日獨(dú)立用戶數(shù):同一個(gè)用戶當(dāng)天重復(fù)登錄,只算作一個(gè)獨(dú)立用戶。 讀取kafka頁面主題數(shù)據(jù) 轉(zhuǎn)換數(shù)據(jù)結(jié)構(gòu): String - JSONObject 過濾數(shù)據(jù),u

    2024年02月03日
    瀏覽(22)
  • Flink+Doris 實(shí)時(shí)數(shù)倉(cāng)

    Flink+Doris 實(shí)時(shí)數(shù)倉(cāng)

    Doris基本原理 Doris基本架構(gòu)非常簡(jiǎn)單,只有FE(Frontend)、BE(Backend)兩種角色,不依賴任何外部組件,對(duì)部署和運(yùn)維非常友好。架構(gòu)圖如下 可以 看到Doris 的數(shù)倉(cāng)架構(gòu)十分簡(jiǎn)潔,不依賴 Hadoop 生態(tài)組件,構(gòu)建及運(yùn)維成本較低。 FE(Frontend)以 Java 語言為主,主要功能職責(zé): 接收用戶

    2024年02月07日
    瀏覽(20)
  • Flink電商實(shí)時(shí)數(shù)倉(cāng)(四)

    業(yè)務(wù)數(shù)據(jù):數(shù)據(jù)都是MySQL中的表格數(shù)據(jù), 使用Flink SQL 處理 日志數(shù)據(jù):分為page頁面日志(頁面信息,曝光信息,動(dòng)作信息,報(bào)錯(cuò)信息)和啟動(dòng)日志(啟動(dòng)信息,報(bào)錯(cuò)信息),使用Flink Stream API處理 五種日志數(shù)據(jù): “start”; 啟動(dòng)信息 “err”; 錯(cuò)誤信息 “display”; 曝光信息 “ac

    2024年01月17日
    瀏覽(20)
  • Flink電商實(shí)時(shí)數(shù)倉(cāng)(三)

    Flink電商實(shí)時(shí)數(shù)倉(cāng)(三)

    維度層的重點(diǎn)和難點(diǎn)在于實(shí)時(shí)電商數(shù)倉(cāng)需要的維度信息一般是動(dòng)態(tài)的變化的,并且由于實(shí)時(shí)數(shù)倉(cāng)一般需要一直運(yùn)行,無法使用常規(guī)的配置文件重啟加載方式來修改需要讀取的ODS層數(shù)據(jù),因此需要通過Flink-cdc實(shí)時(shí)監(jiān)控MySql中的維度數(shù)據(jù)配置信息表,實(shí)時(shí)動(dòng)態(tài)的發(fā)布廣播信息。主

    2024年02月03日
    瀏覽(19)
  • Flink實(shí)時(shí)電商數(shù)倉(cāng)(十)

    Flink實(shí)時(shí)電商數(shù)倉(cāng)(十)

    app BaseApp: 作為其他子模塊中使用Flink - StreamAPI的父類,實(shí)現(xiàn)了StreamAPI中的通用邏輯,在其他子模塊中只需編寫關(guān)于數(shù)據(jù)處理的核心邏輯。 BaseSQLApp: 作為其他子模塊中使用Flink- SQLAPI的父類。在里面設(shè)置了使用SQL API的環(huán)境、并行度、檢查點(diǎn)等固定邏輯。 bean:存放其他子模塊中

    2024年02月03日
    瀏覽(22)
  • Flink CDC實(shí)時(shí)同步PG數(shù)據(jù)庫(kù)

    JDK:1.8 Flink:1.16.2 Scala:2.11 Hadoop:3.1.3 github地址:https://github.com/rockets0421/FlinkCDC-PG.git? 1、更改配置文件postgresql.conf # 更改wal日志方式為logical wal_level = logical # minimal, replica, or logical # 更改solts最大數(shù)量(默認(rèn)值為10),flink-cdc默認(rèn)一張表占用一個(gè)slots max_replication_slots = 20 # m

    2024年02月13日
    瀏覽(35)
  • Flink CDC和Flink SQL構(gòu)建實(shí)時(shí)數(shù)倉(cāng)Flink寫入Doris

    Flink CDC和Flink SQL構(gòu)建實(shí)時(shí)數(shù)倉(cāng)Flink寫入Doris

    軟件環(huán)境 Flink1.13.3 Scala 2.12 doris 0.14 一、MySQL 開啟binlog日志、創(chuàng)建用戶 1.開啟bin log MySQL 8.0默認(rèn)開啟了binlog,可以通過代碼show variables like \\\"%log_bin%\\\";查詢是否開啟了,show variables like \\\"%server_id%\\\";查詢服務(wù)器ID。 上圖分別顯示了bin long是否開啟以及bin log所在的位置。 2.創(chuàng)建用戶 C

    2024年02月02日
    瀏覽(17)
  • 基于Canal與Flink實(shí)現(xiàn)數(shù)據(jù)實(shí)時(shí)增量同步(一)

    基于Canal與Flink實(shí)現(xiàn)數(shù)據(jù)實(shí)時(shí)增量同步(一)

    vi conf/application.yml server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: kms-1:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql:// s p r i n g . d a t a s o u r c e . a d d r e s s / {spring.datasource.address}/ s p r in g . d

    2024年04月13日
    瀏覽(98)
  • [大數(shù)據(jù) Flink,Java實(shí)現(xiàn)不同數(shù)據(jù)庫(kù)實(shí)時(shí)數(shù)據(jù)同步過程]

    目錄 ??前言: ??實(shí)現(xiàn)Mysql同步Es的過程包括以下步驟: ??配置Mysql數(shù)據(jù)庫(kù)連接 ??在Flink的配置文件中,添加Mysql數(shù)據(jù)庫(kù)的連接信息。可以在flink-conf.yaml文件中添加如下配置: ??在Flink程序中,使用JDBCInputFormat來連接Mysql數(shù)據(jù)庫(kù),并定義查詢語句,獲取需要同步的數(shù)據(jù)。具體代

    2024年02月10日
    瀏覽(22)
  • 基于Flink CDC實(shí)時(shí)同步數(shù)據(jù)(MySQL到MySQL)

    基于Flink CDC實(shí)時(shí)同步數(shù)據(jù)(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在遠(yuǎn)程服務(wù)器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安裝在本地:192.168.3.31) (安裝部署過程略) 準(zhǔn)備三個(gè)數(shù)據(jù)庫(kù):flink_source、flink_sink、flink_sink_second。 將flink_source.source_test表實(shí)時(shí)同步到flink_sink和flink_sink_second的sink_test表。 (建庫(kù)建表過程略) 開發(fā)過程

    2024年02月06日
    瀏覽(28)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包