The download link for the flink-cdc-connector code that adds Highgo database support to Flink CDC is given at the end of this article.
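1. Business Requirements
The project currently has one master data system and N business systems. To guarantee "one data item, one source", every field in a business system table that originates from the master data system must be overwritten in real time with the value held in the master data system table. Take one table of one business system as an example: field col_b3 of business table TableB is inconsistent with col_a3 of master data table TableA, so col_a3 must overwrite col_b3 in real time, by way of an intermediate table, to produce the target table TableB_new. The business systems store their data in the Chinese-made Highgo database; the intermediate database is TiDB.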
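To make the override rule concrete, here is a small self-contained query over inline sample rows. It is only an illustration: the key columns id and fk and the sample values are hypothetical and do not come from the real tables, and COALESCE is used as a shorthand for "take the master value when one exists, otherwise keep the business value".

```sql
-- Hypothetical sample rows; id/fk stand in for whatever key links the two tables.
SELECT
  b.col_b1,
  COALESCE(a.col_a3, b.col_b3) AS col_b3   -- master value wins when a match exists
FROM (VALUES ('r1', 1, 'old-value'),
             ('r2', 2, 'kept-value')) AS b(col_b1, fk, col_b3)
LEFT JOIN (VALUES (1, 'master-value')) AS a(id, col_a3)
  ON b.fk = a.id;
-- r1 is matched and gets 'master-value'; r2 has no master row and keeps 'kept-value'
```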
2. Requirements Analysis
The business system has been in production for years, so it holds both historical data and newly arriving data. These are handled in two phases.
Phase 1: historical data. TableA and TableB are joined to produce the intermediate table TableC, in which the master data fields already carry the master data values. TableC is then synchronized in real time to the Highgo database as a new business table, TableB_new (TableC and TableB_new have the same structure).
Phase 2: once the historical data has been processed, the business system is cut over to the new table TableB_new. From then on, new business data in TableB_new is joined with master data table TableA to produce TableC in real time, and Flink CDC synchronizes TableC back in real time to overwrite the master data fields of TableB_new.
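3. Implementation
Phase 1 flow (historical data): TableA and TableB are joined in real time to produce the intermediate table TableC, and TableC is then synchronized in real time to the new business table TableB_new, which completes the overwrite of the master data fields in the historical data.
Phase 2 flow (after cutover to TableB_new): TableA and TableB_new are joined directly to produce TableC, and a CDC job synchronizes TableC in real time into the new business table TableB_new, which completes the master data overwrite.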
4. Flink SQL Scripts
4.1 Phase 1 Scripts
4.1.1 Join TableA with TableB in real time to produce intermediate table TableC
-- Set the job name
SET 'pipeline.name' = 'task_TableA_TableB_TableC';
-- Master data source table TableA
DROP TABLE IF EXISTS TableA;
CREATE TABLE TableA(
col_a1 varchar(255),
col_a2 varchar(255),
col_a3 varchar(255)
) WITH (
'connector' = 'highgo-cdc',
'hostname' = '10.*.*.*',
'port' = '5866',
'username' = 'cdcuser',
'password' = '123456a?',
'database-name' = 'databaseA',
'schema-name' = 'public',
'table-name' = 'TableA',
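'slot.name' = 'tablea_slot', -- lowercase: PostgreSQL allows only a-z, 0-9 and _ in slot names (assumed to hold for Highgo)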
'decoding.plugin.name' = 'pgoutput',
'scan.incremental.snapshot.enabled' = 'false'
);
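The highgo-cdc source above uses the pgoutput decoding plugin, which in PostgreSQL relies on logical replication. Highgo is PostgreSQL-based, so the server-side preparation probably resembles the sketch below. This is an assumption rather than something taken from the connector's documentation, and the publication name dbz_publication (Debezium's default for its PostgreSQL connector) and the quoted mixed-case table name are illustrative. It would be run as an administrator on the source database.

```sql
-- Check that logical decoding is enabled; changing wal_level requires a server restart.
SHOW wal_level;
ALTER SYSTEM SET wal_level = 'logical';

-- The CDC user needs replication rights and read access to the captured table.
ALTER ROLE cdcuser WITH REPLICATION LOGIN;
GRANT SELECT ON TABLE public."TableA" TO cdcuser;

-- Emit full before-images so updates and deletes carry every column.
ALTER TABLE public."TableA" REPLICA IDENTITY FULL;

-- pgoutput streams changes through a publication.
CREATE PUBLICATION dbz_publication FOR TABLE public."TableA";

-- After the Flink job starts, the slot it created should be listed here.
SELECT slot_name, plugin, active FROM pg_replication_slots;
```

The same preparation would apply to TableB in databaseB.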
-- Business data source table TableB
DROP TABLE IF EXISTS TableB;
CREATE TABLE TableB(
col_b1 varchar(255),
col_b2 varchar(255),
col_b3 varchar(255)
) WITH (
'connector' = 'highgo-cdc',
'hostname' = '10.*.*.*',
'port' = '5866',
'username' = 'cdcuser',
'password' = '123456a?',
'database-name' = 'databaseB',
'schema-name' = 'public',
'table-name' = 'TableB',
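'slot.name' = 'tableb_slot', -- lowercase: PostgreSQL allows only a-z, 0-9 and _ in slot names (assumed to hold for Highgo)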
'decoding.plugin.name' = 'pgoutput',
'scan.incremental.snapshot.enabled' = 'false'
);
-- Intermediate table TableC
DROP TABLE IF EXISTS TableC;
CREATE TABLE TableC(
col_c1 varchar(255),
col_c2 varchar(255),
col_c3 varchar(255)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.8.8.8:4000/databaseC',
'username' = 'root',
'password' = '*****',
'table-name' = 'TableC',
'driver' = 'com.mysql.jdbc.Driver'
);
-- Note: the join keys fk and id must also be declared in the TableB and TableA definitions above
insert into TableC
select
b.col_b1 as col_c1,
b.col_b2 as col_c2,
CASE
WHEN a.col_a3 IS NOT NULL THEN a.col_a3
ELSE b.col_b3
END as col_c3
from TableB b left join TableA a on b.fk = a.id;
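One thing to check in the script above: a join over CDC sources emits updates and deletes, and Flink's JDBC connector only writes in upsert mode when the sink table declares a primary key; without one it works in append mode. A minimal sketch of the TableC sink with a key follows, assuming (this is not stated in the original) that col_c1 uniquely identifies a row and that the physical TiDB table has a matching primary or unique key.

```sql
-- Sketch only: col_c1 as the key is an assumption; use the real business key.
CREATE TABLE TableC (
  col_c1 varchar(255),
  col_c2 varchar(255),
  col_c3 varchar(255),
  PRIMARY KEY (col_c1) NOT ENFORCED   -- lets the JDBC sink write in upsert mode
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.8.8.8:4000/databaseC',
  'username' = 'root',
  'password' = '*****',
  'table-name' = 'TableC',
  'driver' = 'com.mysql.jdbc.Driver'
);
```

The same consideration applies to the TableB_new sink in 4.1.2.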
4.1.2 Synchronize TableC to TableB_new in real time
-- Set the job name
SET 'pipeline.name' = 'task_TableC_TableB_new';
-- Intermediate table TableC (TiDB)
DROP TABLE IF EXISTS TableC;
CREATE TABLE TableC(
col_c1 varchar(255),
col_c2 varchar(255),
col_c3 varchar(255)
) WITH (
'connector' = 'tidb-cdc',
'tikv.grpc.timeout_in_ms' = '20000',
'pd-addresses' = '10.*.*.*:4000', -- must point at PD; PD's default client port is 2379, while 4000 is TiDB's default SQL port
'database-name' = 'databaseC',
'table-name' = 'TableC'
);
-- Business result table, written to the Highgo database
DROP TABLE IF EXISTS TableB_new;
CREATE TABLE TableB_new(
col_b1 varchar(255),
col_b2 varchar(255),
col_b3 varchar(255)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:highgo://10.*.*.*:5866/databaseB?currentSchema=public',
'username' = 'sysdba',
'password' = '****',
'table-name' = 'TableB_new',
'driver' = 'com.highgo.jdbc.Driver'
);
insert into TableB_new
select
col_c1 as col_b1,
col_c2 as col_b2,
col_c3 as col_b3
from TableC;
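Before cutting the business system over to TableB_new, it can be useful to spot-check the result on the Highgo side. The queries below are a sketch in PostgreSQL-style SQL, assuming col_b1 identifies a row in both tables (an assumption; substitute the real key) and that the tables were created with mixed-case names.

```sql
-- Rows that exist in TableB but have not arrived in TableB_new yet.
SELECT count(*) AS missing_rows
FROM public."TableB" b
LEFT JOIN public."TableB_new" n ON n.col_b1 = b.col_b1
WHERE n.col_b1 IS NULL;

-- Rows whose col_b3 was overwritten with a master data value.
SELECT count(*) AS overridden_rows
FROM public."TableB" b
JOIN public."TableB_new" n ON n.col_b1 = b.col_b1
WHERE n.col_b3 IS DISTINCT FROM b.col_b3;
```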
4.2 Phase 2 Scripts
4.2.1 Join TableA with TableB_new in real time to produce intermediate table TableC
-- Set the job name
SET 'pipeline.name' = 'task_TableA_TableB_new_TableC';
-- Master data source table TableA
DROP TABLE IF EXISTS TableA;
CREATE TABLE TableA(
col_a1 varchar(255),
col_a2 varchar(255),
col_a3 varchar(255)
) WITH (
'connector' = 'highgo-cdc',
'hostname' = '10.*.*.*',
'port' = '5866',
'username' = 'cdcuser',
'password' = '123456a?',
'database-name' = 'databaseA',
'schema-name' = 'public',
'table-name' = 'TableA',
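'slot.name' = 'tablea_slot', -- lowercase: PostgreSQL allows only a-z, 0-9 and _ in slot names (assumed to hold for Highgo)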
'decoding.plugin.name' = 'pgoutput',
'scan.incremental.snapshot.enabled' = 'false'
);
-- Business data source table, which is also the target table TableB_new
DROP TABLE IF EXISTS TableB_new;
CREATE TABLE TableB_new(
col_b1 varchar(255),
col_b2 varchar(255),
col_b3 varchar(255)
) WITH (
'connector' = 'highgo-cdc',
'hostname' = '10.*.*.*',
'port' = '5866',
'username' = 'cdcuser',
'password' = '123456a?',
'database-name' = 'databaseB',
'schema-name' = 'public',
'table-name' = 'TableB_new',
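'slot.name' = 'tableb_new_slot', -- lowercase: PostgreSQL allows only a-z, 0-9 and _ in slot names (assumed to hold for Highgo)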
'decoding.plugin.name' = 'pgoutput',
'scan.incremental.snapshot.enabled' = 'false'
);
-- Intermediate table TableC
DROP TABLE IF EXISTS TableC;
CREATE TABLE TableC(
col_c1 varchar(255),
col_c2 varchar(255),
col_c3 varchar(255)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.8.8.8:4000/databaseC',
'username' = 'root',
'password' = '*****',
'table-name' = 'TableC',
'driver' = 'com.mysql.jdbc.Driver'
);
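-- Note: the join keys fk and id must also be declared in the TableB_new and TableA definitions above
insert into TableC
select
b.col_b1 as col_c1,
b.col_b2 as col_c2,
CASE
WHEN a.col_a3 IS NOT NULL THEN a.col_a3
ELSE b.col_b3
END as col_c3
from TableB_new b left join TableA a on b.fk = a.id;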
4.2.2 Synchronize TableC to TableB_new in real time
Same as the script in 4.1.2; omitted here.
Note: the flink-cdc-connector code with Highgo database support can be downloaded from:
https://github.com/lujisen/flink-cdc-connectors.git