本文會將從環(huán)境搭建到demo來全流程體驗flinkcdc 3.0
包含了如下內(nèi)容
- flink1.18 standalone搭建
- doris 1fe1be 搭建
- 整庫數(shù)據(jù)同步
- 測試各同步場景
- 從檢查點重啟同步任務(wù)
環(huán)境搭建
flink環(huán)境(Standalone模式)
下載flink 1.18.0 鏈接 : https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz
解壓 :
tar -zxvf flink-1.18.0-bin-scala_2.12.tgz
修改checkpoint 時間間隔 為3秒
vim conf/flink-conf.yaml
# 94 行(set nu 顯示行)
taskmanager.numberOfTaskSlots: 2
# 148 行
execution.checkpointing.interval: 3000
啟動
./bin/start-cluster.sh
訪問頁面 : http://127.0.0.1:8081
doris環(huán)境(1fe1be)
修改環(huán)境宿主機的內(nèi)存映射
# 因為mac內(nèi)部實現(xiàn)容器的方式不同,直接修改max_map_count值可能無法成功,所以在容器中進行修改
docker run -it --privileged --pid=host --name=change_count debian nsenter -t 1 -m -u -n -i sh
# 修改內(nèi)存映射值(這個值通常用于限制一個進程打開的文件數(shù)量,默認是65530)
sysctl -w vm.max_map_count=2000000
# 退出容器
exit
使用docker compose 搭建doris 1fe1be集群
version: '3'
services:
docker-fe-01:
image: "apache/doris:1.2.2-fe-arm"
container_name: "doris-fe-01"
hostname: "fe-01"
environment:
- FE_SERVERS=fe1:172.20.80.2:9010
- FE_ID=1
ports:
- 8031:8030
- 9031:9030
volumes:
- /Users/antg/docker/doris_1fe_1be/data/fe-01/doris-meta:/opt/apache-doris/fe/doris-meta
- /Users/antg/docker/doris_1fe_1be/data/fe-01/conf:/opt/apache-doris/fe/conf
- /Users/antg/docker/doris_1fe_1be/data/fe-01/log:/opt/apache-doris/fe/log
networks:
doris_net:
ipv4_address: 172.20.80.2
docker-be-01:
image: "apache/doris:1.2.2-be-arm"
container_name: "doris-be-01"
hostname: "be-01"
depends_on:
- docker-fe-01
environment:
- FE_SERVERS=fe1:172.20.80.2:9010
- BE_ADDR=172.20.80.5:9050
ports:
- 8041:8040
volumes:
- /Users/antg/docker/doris_1fe_1be/data/be-01/storage:/opt/apache-doris/be/storage
- /Users/antg/docker/doris_1fe_1be/data/be-01/conf:/opt/apache-doris/be/conf
- /Users/antg/docker/doris_1fe_1be/data/be-01/script:/docker-entrypoint-initdb.d
- /Users/antg/docker/doris_1fe_1be/data/be-01/log:/opt/apache-doris/be/log
networks:
doris_net:
ipv4_address: 172.20.80.5
networks:
doris_net:
ipam:
config:
- subnet: 172.20.80.0/24
啟動并驗證是否啟動成功
# 啟動
docker-compose -f 1fe_1be.yaml up -d
# 連接doris
mysql -h127.0.0.1 -P9031 -uroot -p
# 創(chuàng)建數(shù)據(jù)庫 doris_sync
> create database doris_sync;
mysql環(huán)境及測試數(shù)據(jù)準備
使用本機之前安裝的mysql
建測試庫測試表
create database doris_sync;
CREATE TABLE `a_0` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
CREATE TABLE `a_1` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
CREATE TABLE `abc` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
CREATE TABLE `table_0` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
CREATE TABLE `table_1` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=101 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
其中 a_0,a_1 是分表,table_0,table_1是另外一個分表,abc是一個單獨的表
初始化插入一些測試數(shù)據(jù)
INSERT INTO `a_0` (`id`, `name`) VALUES (1, 'a');
INSERT INTO `a_1` (`id`, `name`) VALUES (2, 'b');
BEGIN;
INSERT INTO `abc` (`id`, `name`) VALUES (1, 'Luo Rui');
INSERT INTO `abc` (`id`, `name`) VALUES (2, 'Yung Wing Kuen');
INSERT INTO `abc` (`id`, `name`) VALUES (3, 'Chiang Chun Yu');
INSERT INTO `abc` (`id`, `name`) VALUES (4, 'Tang Ming');
INSERT INTO `abc` (`id`, `name`) VALUES (5, 'Man Wai Lam');
INSERT INTO `abc` (`id`, `name`) VALUES (6, 'Tin Tsz Ching');
INSERT INTO `abc` (`id`, `name`) VALUES (7, 'Doris Moore');
INSERT INTO `abc` (`id`, `name`) VALUES (8, 'Abe Mitsuki');
INSERT INTO `abc` (`id`, `name`) VALUES (9, 'Du Shihan');
INSERT INTO `abc` (`id`, `name`) VALUES (10, 'Chiang Chi Yuen');
COMMIT;
BEGIN;
INSERT INTO `table_0` (`id`, `name`) VALUES (1, 'Luo Rui');
INSERT INTO `table_0` (`id`, `name`) VALUES (2, 'Yung Wing Kuen');
INSERT INTO `table_0` (`id`, `name`) VALUES (3, 'Chiang Chun Yu');
INSERT INTO `table_0` (`id`, `name`) VALUES (4, 'Tang Ming');
INSERT INTO `table_0` (`id`, `name`) VALUES (5, 'Man Wai Lam');
INSERT INTO `table_0` (`id`, `name`) VALUES (6, 'Tin Tsz Ching');
INSERT INTO `table_0` (`id`, `name`) VALUES (7, 'Doris Moore');
INSERT INTO `table_0` (`id`, `name`) VALUES (8, 'Abe Mitsuki');
INSERT INTO `table_0` (`id`, `name`) VALUES (9, 'Du Shihan');
INSERT INTO `table_0` (`id`, `name`) VALUES (10, 'Chiang Chi Yuen');
COMMIT;
INSERT INTO `table_1` (`id`, `name`) VALUES (100, 'tom');
配置容器路由轉(zhuǎn)發(fā)
我們在代碼中開發(fā)過程中可能會用到容器的ip地址,例如上面的172.20.80.0/24這個網(wǎng)段,但是你會發(fā)現(xiàn)你是ping不通的,這里設(shè)計到了一些docker網(wǎng)絡(luò)的一些知識,可以在網(wǎng)上看一下資料,這里只給出解決方法
安裝路由轉(zhuǎn)發(fā)鏡像
# 現(xiàn)在連接器
brew install wenjunxiao/brew/docker-connector
# 加入路由
docker network ls --filter driver=bridge --format "{{.ID}}" | xargs docker network inspect --format "route {{range .IPAM.Config}}{{.Subnet}}{{end}}" >> /opt/homebrew/etc/docker-connector.conf
# 啟動路由器
sudo /opt/homebrew/opt/docker-connector/bin/docker-connector -config /opt/homebrew/etc/docker-connector.conf
# 啟動鏡像
docker run -it -d --restart always --net host --cap-add NET_ADMIN --name connector wenjunxiao/mac-docker-connector
如果還是ping不通就重啟一下上面的轉(zhuǎn)發(fā)容器
這一步很重要,想要通過訪問容器的ip就要完成這一步
依賴包準備
下載flinkcdc 的依賴包放到flink目錄下并解壓
flinkcdc 依賴 : flink-cdc-3.0.0-bin.tar.gz
下載連接器 的依賴包放到flinkcdc的lib目錄下
connector 依賴 :
- MySQL pipeline connector 3.0.0
- Apache Doris pipeline connector 3.0.0
配置FLINK_HOME環(huán)境變量
pwd
/Users/antg/software/flink-1.18.0/
export FLINK_HOME=/Users/antg/software/flink-1.18.0/
數(shù)據(jù)同步
整庫同步
編寫yaml文件 mysql-to-doris.yaml
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 12345678
tables: doris_sync.\.*
server-id: 5400-5404
server-time-zone: Asia/Shanghai
sink:
type: doris
fenodes: 127.0.0.1:8031
username: root
password: ""
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
啟動任務(wù)
bash bin/flink-cdc.sh mysql-to-doris.yaml
查看頁面效果
這里可以看到同步的數(shù)據(jù)條數(shù)及大小
查看doris的數(shù)據(jù)及建表情況
可以看到表被自動創(chuàng)建并且數(shù)據(jù)也同步過來了
新增數(shù)據(jù)
INSERT INTO `a_0` (`id`, `name`) VALUES (3, 'jack');
更新數(shù)據(jù)
update a_0 set name='tom' where id=3;
刪除數(shù)據(jù)
delete from a_0 where id=1;
沒成功同步(已咨詢社區(qū)是1.2.2的bug,在1.2.3修復(fù)了,正常來說會同步)
新增字段
alter table a_0 add column age int;
修改字段
# 修改名稱
alter table a_0 change age age_range int;
# 修改字段類型
alter table a_0 modify column age_range varchar(100);
# 字段字段長度
alter table a_0 modify column age_range varchar(1200);
以上語句不會被同步
刪除字段
alter table a_0 drop column age_range;
以上語句不會被同步
刪除表
drop table a_0;
不會被同步
結(jié)論 :
1.新增數(shù)據(jù),新增字段,修改數(shù)據(jù)會被實時同步到doris
2.delete數(shù)據(jù)不會被同步(已咨詢社區(qū)是1.2.2的bug,在1.2.3修復(fù)了,正常來說會同步)
3.修改字段名稱,類型,長度不會被同步(可能有參數(shù)可以開啟)
4.刪除字段不會被同步
5.刪除表不會被同步
路由變更
這里將使用flinkcdc3.0 新增的路由功能來實現(xiàn)分表合一的效果,而且也可以做到同步到doris的庫名和表名換成自己想要的名稱
將之前的mysql端數(shù)據(jù)清理,表重新建立
需求 :
將mysql端doris_sync同步到doris的ods庫中
a_0,a_1 合并到ods_a表
abc 同步到 ods_abc表
table_0,table_1同步到 ods_table表
任務(wù)配置 route.yaml
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 12345678
tables: doris_sync.\.*
server-id: 5400-5404
server-time-zone: Asia/Shanghai
sink:
type: doris
fenodes: 127.0.0.1:8031
username: root
password: ""
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
route:
- source-table: doris_sync.a_\.*
sink-table: ods.ods_a
- source-table: doris_sync.abc
sink-table: ods.ods_abc
- source-table: doris_sync.table_\.*
sink-table: ods.ods_table
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
創(chuàng)建doris端ods庫(不會自動創(chuàng)建庫,必須手動創(chuàng)建)
create database ods;
將之前的任務(wù)停掉,啟動這個任務(wù)
可以看到
1.多個分表在doris只創(chuàng)建了一個目標表
2.多個分表的數(shù)據(jù)都同步到了一個表中
非常棒的功能 ??????
測試一下新增一個分表是否會自動同步到目標表
CREATE TABLE `a_2` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
INSERT INTO `a_2` (`id`, `name`) VALUES (1000, 'a');
新增分表后,分表不會被自動同步
重啟任務(wù)
重啟后數(shù)據(jù)可以被正常同步
從checkpoint恢復(fù)任務(wù)并新增分表
先修改一下flink-conf.yaml,否則任務(wù)cancel的時候ck不會被保留,還需要修改一下ck存儲的路徑
# 在flink目錄下創(chuàng)建一個路徑存儲ck
mkdir ckdata
啟動任務(wù)
bash bin/flink-cdc.sh route.yaml
看一下ck是否正常存儲
新增表,cancel任務(wù),然后從ck處重啟
CREATE TABLE `a_4` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
INSERT INTO `a_4` (`id`, `name`) VALUES (1000000, 'a');
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 12345678
tables: doris_sync.\.*
server-id: 5400-5404
server-time-zone: Asia/Shanghai
sink:
type: doris
fenodes: 127.0.0.1:8031
username: root
password: ""
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
route:
- source-table: doris_sync.a_\.*
sink-table: ods.ods_a
- source-table: doris_sync.abc
sink-table: ods.ods_abc
- source-table: doris_sync.table_\.*
sink-table: ods.ods_table
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
在flink-conf最后加上ck的重啟路徑
# 查看當前路徑
pwd
/Users/antg/software/flink-1.18.0/flink-cdc-3.0.0
# 找到最新的ck存儲路徑
ll -rth ../ckdata
drwxr-xr-x@ 5 antg staff 160B Jan 21 16:27 436dfeb839b2c877d6e49023e3e099b5
drwxr-xr-x@ 5 antg staff 160B Jan 21 17:12 d519a3f930d9f410e048f63a883e1dce
drwxr-xr-x@ 5 antg staff 160B Jan 21 18:59 b0ed22a804ad34336ab3e9b328d13257
drwxr-xr-x@ 5 antg staff 160B Jan 21 19:01 394d7a89885bbd319e8ab92043283de9
drwxr-xr-x@ 5 antg staff 160B Jan 21 19:05 1547d3cf60ed278ccd3787025bb4b5f6
drwxr-xr-x@ 5 antg staff 160B Jan 21 19:07 51ff313e98fb9882f20f57bc697a8ae6
drwxr-xr-x@ 5 antg staff 160B Jan 21 19:08 f10623b642135002499775274c078b9e
drwxr-xr-x@ 5 antg staff 160B Jan 21 19:09 73b47091ca00547a5d8121474b3dbd79
ll ../ckdata/73b47091ca00547a5d8121474b3dbd79
drwxr-xr-x@ 3 antg staff 96B Jan 21 19:09 chk-172
drwxr-xr-x@ 2 antg staff 64B Jan 21 19:09 shared
drwxr-xr-x@ 2 antg staff 64B Jan 21 19:09 taskowned
# 將ck路徑加到flink-conf的最后一行
vim ../conf/flink-conf.yaml
execution.savepoint.path: file:///Users/antg/software/flink-1.18.0/ckdata/73b47091ca00547a5d8121474b3dbd79/chk-172
# 啟動任務(wù)
bin/flink-cdc.sh route.yaml
可以看到任務(wù)從檢查點重啟了
數(shù)據(jù)也正常同步
這里從ck重啟是修改了flink-conf,但是感覺這樣很不方便,嘗試過在yaml的pipeline下加上這個屬性,但是不起作用,其他位置也沒找到加ck路徑的地方,如果各位大神有其他好的方法歡迎評論區(qū)留言,也歡迎加我的個人微信一起交流各種技術(shù).文章來源:http://www.zghlxwxcb.cn/news/detail-813733.html
參考
[基于 Flink CDC 3.0 構(gòu)建 MySQL 到 Doris 的 Streaming ELT] : https://ververica.github.io/flink-cdc-connectors/release-3.0/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-doris-pipeline-tutorial-zh.html
[vm.max_map_count參數(shù)詳解] : https://blog.csdn.net/a772304419/article/details/132585239文章來源地址http://www.zghlxwxcb.cn/news/detail-813733.html
到了這里,關(guān)于flinkcdc 3.0 嘗鮮的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!