前言
在FLink cdc 2.x的版本,各企業(yè)做了許多類似的基礎(chǔ)功能改造工作(B站 2022年企業(yè)flink cdc實(shí)踐分享 )。
最近Flink CDC 3.0發(fā)布,schema 變更自動同步、整庫同步、分庫分表等增強(qiáng)功能使 Flink CDC 3.0 在更復(fù)雜的數(shù)據(jù)集成與用戶業(yè)務(wù)場景中發(fā)揮作用:用戶無需在數(shù)據(jù)源發(fā)生 schema 變更時手動介入,大大降低用戶的運(yùn)維成本;只需對同步任務(wù)進(jìn)行簡單配置即可將多表、多庫同步至下游,并進(jìn)行合并等邏輯,顯著降低用戶的開發(fā)難度與入門門檻。Flink CDC 3.0 正式發(fā)布。
我們今天基于 Flink CDC 3.0 同步 MySQL 到 Doris ,來體驗下新上的整庫同步、表結(jié)構(gòu)變更同步和分庫分表同步的功能。
準(zhǔn)備
flink環(huán)境
準(zhǔn)備 Flink Standalone 集群,下載最新版本 Flink 1.18.0 ,解壓后得到 flink-1.18.0 目錄。并且設(shè)置 FLINK_HOME 為 flink-1.18.0 所在目錄。
通過在 conf/flink-conf.yaml
配置文件追加下列參數(shù)開啟 checkpoint,每隔 3 秒做一次 checkpoint,方便后續(xù)觀察數(shù)據(jù)變更。
execution.checkpointing.interval: 3000
使用下面的命令啟動 Flink 集群。
./bin/start-cluster.sh
啟動成功的話,可以在 http://localhost:8081/ 訪問到 Flink Web UI,如下所示:
多次執(zhí)行 start-cluster.sh 可以拉起多個 TaskManager,保證Total Task Slots >= 2, 不然提交任務(wù)會有資源不足異常,比如我這里執(zhí)行了3次。 或者是修改 conf/flink-conf.yaml
資源配置。
docker構(gòu)建mysql、doris環(huán)境
如果有安裝這兩個組件,就可以免去docker,接下來的教程將以 docker-compose 的方式準(zhǔn)備所需要的組件。
由于 Doris 的運(yùn)行需要內(nèi)存映射支持,需在宿主機(jī)執(zhí)行如下命令:
sysctl -w vm.max_map_count=2000000
docker 鏡像啟動,使用下面的內(nèi)容創(chuàng)建一個 docker-compose.yml
文件:
version: '2.1'
services:
doris:
image: yagagagaga/doris-standalone
ports:
- "8030:8030"
- "8040:8040"
- "9030:9030"
mysql:
image: debezium/example-mysql:1.1
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
該 Docker Compose 中包含的容器有:
MySQL: 包含商品信息的數(shù)據(jù)庫 app_db
Doris: 存儲從 MySQL 中根據(jù)規(guī)則映射過來的結(jié)果表
在 docker-compose.yml
所在目錄下執(zhí)行下面的命令來啟動本教程需要的組件:
docker-compose up -d
該命令將以 detached 模式自動啟動 Docker Compose 配置中定義的所有容器。你可以通過 docker ps 來觀察上述的容器是否正常啟動了,也可以通過訪問 http://localhost:8030/ 來查看 Doris 是否運(yùn)行正常。
數(shù)據(jù)準(zhǔn)備
進(jìn)入 MySQL 容器, 或者通過客戶端工具連接到mysql
docker-compose exec mysql mysql -uroot -p123456
創(chuàng)建數(shù)據(jù)庫 app_db 和表 orders,products
并插入數(shù)據(jù)
-- 創(chuàng)建數(shù)據(jù)庫
CREATE DATABASE app_db;
USE app_db;
-- 創(chuàng)建 orders 表
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);
-- 插入數(shù)據(jù)
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);
-- 創(chuàng)建 shipments 表
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);
-- 插入數(shù)據(jù)
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');
-- 創(chuàng)建 products 表
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);
Doris 暫時不支持自動創(chuàng)建數(shù)據(jù)庫,需要先創(chuàng)建寫入表對應(yīng)的數(shù)據(jù)庫。
進(jìn)入 Doris Web UI。http://localhost:8030/,默認(rèn)的用戶名為 root,默認(rèn)密碼為空。
通過 Web UI 創(chuàng)建 app_db 數(shù)據(jù)庫
create database if not exists app_db;
通過 FlinkCDC cli 提交任務(wù)
下載下面列出的二進(jìn)制壓縮包,并解壓得到目錄 flink-cdc-3.0.0
:
flink-cdc-3.0.0-bin.tar.gz flink-cdc-3.0.0 下會包含 bin、lib、log、conf 四個目錄。
下載下面列出的 connector 包,并且移動到 lib 目錄下
- MySQL pipeline connector 3.0.0
-
Apache Doris pipeline connector 3.0.0
整庫同步
編寫任務(wù)配置 yaml 文件,下面給出了一個整庫同步的示例文件 mysql-to-doris.yaml:
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: UTC
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
其中:
source 中的 tables: app_db.\.*
通過正則匹配同步 app_db
下的所有表。
sink 添加 table.create.properties.replication_num
參數(shù)是由于 Docker 鏡像中只有一個 Doris BE 節(jié)點(diǎn)。
最后,通過命令行提交任務(wù)到 Flink Standalone cluster
bash bin/flink-cdc.sh conf/mysql-to-doris.yaml
提交成功后,返回信息如:
在 Flink Web UI,可以看到一個名為 Sync MySQL Database to Doris
的任務(wù)正在運(yùn)行。job id對應(yīng)上面的cb049fe4a2112510a77ee46e197054a6
打開 Doris 的 Web UI,可以看到數(shù)據(jù)表已經(jīng)被創(chuàng)建出來,數(shù)據(jù)能成功寫入。
同步變更
接下來,修改 MySQL 數(shù)據(jù)庫中表的數(shù)據(jù),Doris 中顯示的訂單數(shù)據(jù)也將實(shí)時更新:
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
DELETE FROM app_db.orders WHERE id=2;
-- 區(qū)別于官方再新增一條數(shù)據(jù)
INSERT INTO app_db.orders VALUES (4, 200, 200.00);
也可以拆開每執(zhí)行一步,刷新一次 Doris Web UI,可以看到 Doris 中顯示的 orders 數(shù)據(jù)將實(shí)時更新,如下所示:
同樣的,去修改 shipments, products 表,也能在 Doris 中實(shí)時看到同步變更的結(jié)果。
路由變更
Flink CDC 提供了將源表的表結(jié)構(gòu)/數(shù)據(jù)路由到其他表名的配置,借助這種能力,我們能夠?qū)崿F(xiàn)表名庫名替換,整庫同步等功能。
下面提供一個配置文件conf/mysql-to-doris-route.yaml
說明:
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db1.\.*
server-id: 5400-5404
server-time-zone: UTC
sink:
type: doris
fenodes: 127.0.0.1:8030
benodes: 127.0.0.1:8040
username: root
password: ""
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
route:
- source-table: app_db1.orders\.*
sink-table: app_db1.ods_orders
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
通過上面的 route 配置,使用正則表達(dá)式,可以將諸如 app_db1.order_01、app_db1.order_02 的表匯總到 app_db1.ods_orders 中。從而實(shí)現(xiàn)分庫分表同步的功能。注意,目前還不支持多表中存在相同主鍵數(shù)據(jù)的場景,將在后續(xù)版本支持。
另外官方文檔里的寫法存在一個問題。
正則表達(dá)式前面加上’\‘轉(zhuǎn)義,app_db1.orders\.*
,否則會拋出異常:java.util.regex.PatternSyntaxException: Dangling meta character ‘*’ near index 0 *
目前已在git提了issue,后面應(yīng)該會處理這里的問題。
我們在mysql和doris分別創(chuàng)建數(shù)據(jù)庫app_db1
, 然后初始化mysql
-- 創(chuàng)建表orders_01
CREATE TABLE `orders_01` (
`id` int NOT NULL,
`price` decimal(10,2) NOT NULL,
`amount` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- 創(chuàng)建表orders_02
CREATE TABLE `orders_02` (
`id` int NOT NULL,
`price` decimal(10,2) NOT NULL,
`amount` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
啟動新的job。
然后在orders_01,orders_02分別插入數(shù)據(jù)
INSERT INTO `orders_01` (`id`, `price`) VALUES (11, 4.00);
INSERT INTO `orders_02` (`id`, `price`) VALUES (12, 100.00);
在doris里驗證,數(shù)據(jù)都寫入了app_db1.ods_orders
路由表結(jié)構(gòu)不一致無法同步
看Schema Evolution 設(shè)計原理,F(xiàn)link CDC 3.0 在作業(yè)拓?fù)渲幸肓?SchemaRegistry,結(jié)合 SchemaOperator 協(xié)調(diào)并控制作業(yè)拓?fù)渲械?schema 變更事件處理。當(dāng)上游數(shù)據(jù)源發(fā)生 schema 變更時,SchemaRegistry 會控制 SchemaOperator 以暫停數(shù)據(jù)流,并將流水線中的數(shù)據(jù)從 sink 全部刷出以保證 schema 一致性。當(dāng) schema 變更事件在外部系統(tǒng)處理成功后,SchemaOperator 恢復(fù)數(shù)據(jù)流,完成本次 schema 變更的處理。
所以考慮只修改orders_01,再插入數(shù)據(jù)看doris同步的變化。
-- 添加sku字段
ALTER TABLE app_db1.orders_01 ADD sku varchar(32) NULL;
-- 向orders_01插入id=13
INSERT INTO `orders_01` VALUES (13, 4.00, 8.00, 'apple01');
-- 向orders_02插入id=14
INSERT INTO `orders_02` VALUES (14, 1.00, 1.00);
可以看到doris中的app_db1.orders
表結(jié)構(gòu)發(fā)生了變化,但是orders_02
的id=14這條數(shù)據(jù)沒有正常寫入。flink異常提示:java.lang.IllegalStateException: Column size does not match the data size
而當(dāng)修改orders_02的表結(jié)構(gòu),也會有異常:Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: status of AddColumnEvent is already existed。并且之后寫入的數(shù)據(jù)無法正常同步。文章來源:http://www.zghlxwxcb.cn/news/detail-775011.html
結(jié)尾
flink cdc的功能越來越強(qiáng),也再嘗試解決用戶的使用痛點(diǎn)。不過放到生產(chǎn)環(huán)境使用還需要建立在更多的實(shí)踐測試之上。文章來源地址http://www.zghlxwxcb.cn/news/detail-775011.html
到了這里,關(guān)于Flink cdc3.0同步實(shí)例(動態(tài)變更表結(jié)構(gòu)、分庫分表同步)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!