什么是flink cdc?
cdc github源碼地址
cdc官方文檔
對(duì)很多初入門(mén)的人來(lái)說(shuō)是無(wú)法理解cdc到底是什么個(gè)東西。 有這樣一個(gè)需求,比如在mysql數(shù)據(jù)庫(kù)中存在很多數(shù)據(jù),但是公司要把mysql中的數(shù)據(jù)同步到數(shù)據(jù)倉(cāng)庫(kù)(starrocks), 數(shù)據(jù)倉(cāng)庫(kù)你可以理解為存儲(chǔ)了各種各樣來(lái)自不同數(shù)據(jù)庫(kù)中表。
數(shù)據(jù)的同步目前對(duì)mysql來(lái)說(shuō)比較常見(jiàn)是方式是使用:datax 和 canal配合, 為什么需要這兩個(gè)框架配合呢?
因?yàn)閐atax不支持實(shí)時(shí)的同步, datax只能定義一個(gè)范圍去同步,而且同步結(jié)束后程序就結(jié)束了。但是我想要的是數(shù)據(jù)倉(cāng)庫(kù)中的數(shù)據(jù)近乎實(shí)時(shí)的和mysql中的數(shù)據(jù)保持一致又該怎么辦? 答案是再加上canal, canal和datax相反,它只支持指定一個(gè)binlog同步,然后會(huì)一直同步到現(xiàn)在,并且程序不會(huì)結(jié)束,會(huì)一直同步。 這樣datax+canal就可以達(dá)到實(shí)時(shí)同步的功能。
這是業(yè)界比較常用的同步方式,datax同步歷史數(shù)據(jù),canal+kafka同步最新的數(shù)據(jù),而且還要有一個(gè)程序去讀取kafka中的binlog json數(shù)據(jù)(可以用flink或者spark又或者是flume)??梢钥吹竭@個(gè)鏈路比較長(zhǎng),不是很好。
下面是目前常見(jiàn)的cdc同步方案以及對(duì)比:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-401109.html
-
DataX 不支持增量同步,Canal 不支持全量同步。雖然兩者都是非常流行的數(shù)據(jù)同步工具,但
在場(chǎng)景支持上仍不完善。 - 在全量+增量一體化同步方面,只有 Flink CDC、Debezium、Oracle Goldengate 支持較好。
- 在架構(gòu)方面,Apache Flink 是一個(gè)非常優(yōu)秀的分布式流處理框架,因此 Flink CDC 作為
Apache Flink 的一個(gè)組件具有非常靈活的水平擴(kuò)展能力。而 DataX 和 Canal 是個(gè)單機(jī)架構(gòu),
在大數(shù)據(jù)場(chǎng)景下容易面臨性能瓶頸的問(wèn)題。 - 在數(shù)據(jù)加工的能力上,CDC 工具是否能夠方便地對(duì)數(shù)據(jù)做一些清洗、過(guò)濾、聚合,甚至關(guān)聯(lián)打
寬? Flink CDC 依托強(qiáng)大的 Flink SQL 流式計(jì)算能力,可以非常方便地對(duì)數(shù)據(jù)進(jìn)行加工。而
Debezium 等則需要通過(guò)復(fù)雜的 Java 代碼才能完成,使用門(mén)檻比較高。 - 另外,在生態(tài)方面,這里指的是上下游存儲(chǔ)的支持。Flink CDC 上下游非常豐富,支持對(duì)接
MySQL、PostgreSQL 等數(shù)據(jù)源,還支持寫(xiě)入到 TiDB、HBase、Kafka、Hudi 等各種存儲(chǔ)系統(tǒng)
中,也支持靈活的自定義 connector。 - 我們看到flink cdc 是比較友好的方案, 其內(nèi)部實(shí)現(xiàn)上用的是Debezium去采集binlong, 而且可通過(guò)參數(shù)scan.startup.mode 來(lái)控制同步行為:
- initial (默認(rèn)):在第一次啟動(dòng)時(shí)對(duì)受監(jiān)視的數(shù)據(jù)庫(kù)表執(zhí)行全量同步,并繼續(xù)讀取最新的 binlog。
- earliest-offset:跳過(guò)快照階段,從可讀取的最早 binlog 位點(diǎn)開(kāi)始讀取
- latest-offset:首次啟動(dòng)時(shí),從不對(duì)受監(jiān)視的數(shù)據(jù)庫(kù)表執(zhí)行快照, 連接器僅從 binlog 的結(jié)尾處開(kāi)始讀取,這意味著連接器只能讀取在連接器啟動(dòng)之后的數(shù)據(jù)更改。
- specific-offset:跳過(guò)快照階段,從指定的 binlog 位點(diǎn)開(kāi)始讀取。位點(diǎn)可通過(guò) binlog 文件名和位置指定,或者在 GTID 在集群上啟用時(shí)通過(guò) GTID 集合指定。
- timestamp:跳過(guò)快照階段,從指定的時(shí)間戳開(kāi)始讀取 binlog 事件。
一個(gè)demo
對(duì)flink_01 和flink_02 進(jìn)行兩個(gè)分表進(jìn)行同步合并到:flink_merge文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-401109.html
CREATE TABLE `flink_01` (
`indicator_name` varchar(255) DEFAULT NULL COMMENT '指標(biāo)名稱(chēng)',
`indicator_value` varchar(255) DEFAULT NULL COMMENT '指標(biāo)值',
`indicator_code` int NOT NULL COMMENT '指標(biāo)編碼',
`table_name` varchar(255) NOT NULL COMMENT '指標(biāo)計(jì)算上游表名',
`window_start` datetime NOT NULL COMMENT '窗口開(kāi)始時(shí)間',
`window_end` datetime DEFAULT NULL COMMENT '窗口截止時(shí)間',
`create_time` datetime DEFAULT NULL COMMENT '創(chuàng)建更新時(shí)間',
`indicator_description` varchar(255) DEFAULT NULL COMMENT '指標(biāo)描述',
PRIMARY KEY (`indicator_code`,`table_name`,`window_start`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '52', 0, 'app_login_log', '2022-12-14 00:00:00', '2022-12-15 00:00:00', '2022-12-19 18:09:24', '登錄用戶(hù)數(shù)');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '49', 0, 'app_login_log', '2022-12-15 00:00:00', '2022-12-16 00:00:00', '2022-12-19 18:09:24', '登錄用戶(hù)數(shù)');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '62
到了這里,關(guān)于【實(shí)戰(zhàn)-01】flink cdc 實(shí)時(shí)數(shù)據(jù)同步利器的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!