国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink cdc3.0同步實(shí)例(動態(tài)變更表結(jié)構(gòu)、分庫分表同步)

這篇具有很好參考價值的文章主要介紹了Flink cdc3.0同步實(shí)例(動態(tài)變更表結(jié)構(gòu)、分庫分表同步)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

前言

在FLink cdc 2.x的版本,各企業(yè)做了許多類似的基礎(chǔ)功能改造工作(B站 2022年企業(yè)flink cdc實(shí)踐分享 )。
最近Flink CDC 3.0發(fā)布,schema 變更自動同步、整庫同步、分庫分表等增強(qiáng)功能使 Flink CDC 3.0 在更復(fù)雜的數(shù)據(jù)集成與用戶業(yè)務(wù)場景中發(fā)揮作用:用戶無需在數(shù)據(jù)源發(fā)生 schema 變更時手動介入,大大降低用戶的運(yùn)維成本;只需對同步任務(wù)進(jìn)行簡單配置即可將多表、多庫同步至下游,并進(jìn)行合并等邏輯,顯著降低用戶的開發(fā)難度與入門門檻。Flink CDC 3.0 正式發(fā)布。
我們今天基于 Flink CDC 3.0 同步 MySQL 到 Doris ,來體驗下新上的整庫同步、表結(jié)構(gòu)變更同步和分庫分表同步的功能。

準(zhǔn)備

flink環(huán)境

準(zhǔn)備 Flink Standalone 集群,下載最新版本 Flink 1.18.0 ,解壓后得到 flink-1.18.0 目錄。并且設(shè)置 FLINK_HOME 為 flink-1.18.0 所在目錄。
通過在 conf/flink-conf.yaml 配置文件追加下列參數(shù)開啟 checkpoint,每隔 3 秒做一次 checkpoint,方便后續(xù)觀察數(shù)據(jù)變更。

execution.checkpointing.interval: 3000

使用下面的命令啟動 Flink 集群。

./bin/start-cluster.sh

啟動成功的話,可以在 http://localhost:8081/ 訪問到 Flink Web UI,如下所示:
flinkcdc 3.0整庫同步,大數(shù)據(jù),flink,flink,大數(shù)據(jù),cdc,動態(tài)變更,分表分庫
多次執(zhí)行 start-cluster.sh 可以拉起多個 TaskManager,保證Total Task Slots >= 2, 不然提交任務(wù)會有資源不足異常,比如我這里執(zhí)行了3次。 或者是修改 conf/flink-conf.yaml 資源配置。

docker構(gòu)建mysql、doris環(huán)境

如果有安裝這兩個組件,就可以免去docker,接下來的教程將以 docker-compose 的方式準(zhǔn)備所需要的組件。
由于 Doris 的運(yùn)行需要內(nèi)存映射支持,需在宿主機(jī)執(zhí)行如下命令:

sysctl -w vm.max_map_count=2000000

docker 鏡像啟動,使用下面的內(nèi)容創(chuàng)建一個 docker-compose.yml 文件:

version: '2.1'
services:
  doris:
    image: yagagagaga/doris-standalone
    ports:
      - "8030:8030"
      - "8040:8040"
      - "9030:9030"
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw

該 Docker Compose 中包含的容器有:

MySQL: 包含商品信息的數(shù)據(jù)庫 app_db

Doris: 存儲從 MySQL 中根據(jù)規(guī)則映射過來的結(jié)果表

docker-compose.yml 所在目錄下執(zhí)行下面的命令來啟動本教程需要的組件:

docker-compose up -d

該命令將以 detached 模式自動啟動 Docker Compose 配置中定義的所有容器。你可以通過 docker ps 來觀察上述的容器是否正常啟動了,也可以通過訪問 http://localhost:8030/ 來查看 Doris 是否運(yùn)行正常。

數(shù)據(jù)準(zhǔn)備

進(jìn)入 MySQL 容器, 或者通過客戶端工具連接到mysql

docker-compose exec mysql mysql -uroot -p123456

創(chuàng)建數(shù)據(jù)庫 app_db 和表 orders,products 并插入數(shù)據(jù)

-- 創(chuàng)建數(shù)據(jù)庫
CREATE DATABASE app_db;

USE app_db;

-- 創(chuàng)建 orders 表
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);

-- 插入數(shù)據(jù)
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);

-- 創(chuàng)建 shipments 表
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

-- 插入數(shù)據(jù)
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');

-- 創(chuàng)建 products 表
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

Doris 暫時不支持自動創(chuàng)建數(shù)據(jù)庫,需要先創(chuàng)建寫入表對應(yīng)的數(shù)據(jù)庫。
進(jìn)入 Doris Web UI。http://localhost:8030/,默認(rèn)的用戶名為 root,默認(rèn)密碼為空。
通過 Web UI 創(chuàng)建 app_db 數(shù)據(jù)庫

create database if not exists app_db;

flinkcdc 3.0整庫同步,大數(shù)據(jù),flink,flink,大數(shù)據(jù),cdc,動態(tài)變更,分表分庫

通過 FlinkCDC cli 提交任務(wù)

下載下面列出的二進(jìn)制壓縮包,并解壓得到目錄 flink-cdc-3.0.0
flink-cdc-3.0.0-bin.tar.gz flink-cdc-3.0.0 下會包含 bin、lib、log、conf 四個目錄。

下載下面列出的 connector 包,并且移動到 lib 目錄下

  • MySQL pipeline connector 3.0.0
  • Apache Doris pipeline connector 3.0.0
    flinkcdc 3.0整庫同步,大數(shù)據(jù),flink,flink,大數(shù)據(jù),cdc,動態(tài)變更,分表分庫

整庫同步

編寫任務(wù)配置 yaml 文件,下面給出了一個整庫同步的示例文件 mysql-to-doris.yaml:

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app_db.\.*
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2

其中:
source 中的 tables: app_db.\.* 通過正則匹配同步 app_db 下的所有表。
sink 添加 table.create.properties.replication_num 參數(shù)是由于 Docker 鏡像中只有一個 Doris BE 節(jié)點(diǎn)。

最后,通過命令行提交任務(wù)到 Flink Standalone cluster

bash bin/flink-cdc.sh conf/mysql-to-doris.yaml

提交成功后,返回信息如:
flinkcdc 3.0整庫同步,大數(shù)據(jù),flink,flink,大數(shù)據(jù),cdc,動態(tài)變更,分表分庫
在 Flink Web UI,可以看到一個名為 Sync MySQL Database to Doris 的任務(wù)正在運(yùn)行。job id對應(yīng)上面的cb049fe4a2112510a77ee46e197054a6
flinkcdc 3.0整庫同步,大數(shù)據(jù),flink,flink,大數(shù)據(jù),cdc,動態(tài)變更,分表分庫
打開 Doris 的 Web UI,可以看到數(shù)據(jù)表已經(jīng)被創(chuàng)建出來,數(shù)據(jù)能成功寫入。
flinkcdc 3.0整庫同步,大數(shù)據(jù),flink,flink,大數(shù)據(jù),cdc,動態(tài)變更,分表分庫

同步變更

接下來,修改 MySQL 數(shù)據(jù)庫中表的數(shù)據(jù),Doris 中顯示的訂單數(shù)據(jù)也將實(shí)時更新:

INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
DELETE FROM app_db.orders WHERE id=2;
-- 區(qū)別于官方再新增一條數(shù)據(jù)
INSERT INTO app_db.orders VALUES (4, 200, 200.00);

也可以拆開每執(zhí)行一步,刷新一次 Doris Web UI,可以看到 Doris 中顯示的 orders 數(shù)據(jù)將實(shí)時更新,如下所示:
flinkcdc 3.0整庫同步,大數(shù)據(jù),flink,flink,大數(shù)據(jù),cdc,動態(tài)變更,分表分庫
同樣的,去修改 shipments, products 表,也能在 Doris 中實(shí)時看到同步變更的結(jié)果。

路由變更

Flink CDC 提供了將源表的表結(jié)構(gòu)/數(shù)據(jù)路由到其他表名的配置,借助這種能力,我們能夠?qū)崿F(xiàn)表名庫名替換,整庫同步等功能。
下面提供一個配置文件conf/mysql-to-doris-route.yaml說明:

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app_db1.\.*
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  benodes: 127.0.0.1:8040
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

route:
  - source-table: app_db1.orders\.*
    sink-table: app_db1.ods_orders

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2

通過上面的 route 配置,使用正則表達(dá)式,可以將諸如 app_db1.order_01、app_db1.order_02 的表匯總到 app_db1.ods_orders 中。從而實(shí)現(xiàn)分庫分表同步的功能。注意,目前還不支持多表中存在相同主鍵數(shù)據(jù)的場景,將在后續(xù)版本支持。
另外官方文檔里的寫法存在一個問題。
flinkcdc 3.0整庫同步,大數(shù)據(jù),flink,flink,大數(shù)據(jù),cdc,動態(tài)變更,分表分庫
正則表達(dá)式前面加上’\‘轉(zhuǎn)義,app_db1.orders\.*,否則會拋出異常:java.util.regex.PatternSyntaxException: Dangling meta character ‘*’ near index 0 *
flinkcdc 3.0整庫同步,大數(shù)據(jù),flink,flink,大數(shù)據(jù),cdc,動態(tài)變更,分表分庫
目前已在git提了issue,后面應(yīng)該會處理這里的問題。
我們在mysql和doris分別創(chuàng)建數(shù)據(jù)庫app_db1, 然后初始化mysql

-- 創(chuàng)建表orders_01
CREATE TABLE `orders_01` (
  `id` int NOT NULL,
  `price` decimal(10,2) NOT NULL,
  `amount` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- 創(chuàng)建表orders_02
CREATE TABLE `orders_02` (
  `id` int NOT NULL,
  `price` decimal(10,2) NOT NULL,
  `amount` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

啟動新的job。
flinkcdc 3.0整庫同步,大數(shù)據(jù),flink,flink,大數(shù)據(jù),cdc,動態(tài)變更,分表分庫
然后在orders_01,orders_02分別插入數(shù)據(jù)


INSERT INTO `orders_01` (`id`, `price`) VALUES (11, 4.00);
INSERT INTO `orders_02` (`id`, `price`) VALUES (12, 100.00);

在doris里驗證,數(shù)據(jù)都寫入了app_db1.ods_orders
flinkcdc 3.0整庫同步,大數(shù)據(jù),flink,flink,大數(shù)據(jù),cdc,動態(tài)變更,分表分庫

路由表結(jié)構(gòu)不一致無法同步

看Schema Evolution 設(shè)計原理,F(xiàn)link CDC 3.0 在作業(yè)拓?fù)渲幸肓?SchemaRegistry,結(jié)合 SchemaOperator 協(xié)調(diào)并控制作業(yè)拓?fù)渲械?schema 變更事件處理。當(dāng)上游數(shù)據(jù)源發(fā)生 schema 變更時,SchemaRegistry 會控制 SchemaOperator 以暫停數(shù)據(jù)流,并將流水線中的數(shù)據(jù)從 sink 全部刷出以保證 schema 一致性。當(dāng) schema 變更事件在外部系統(tǒng)處理成功后,SchemaOperator 恢復(fù)數(shù)據(jù)流,完成本次 schema 變更的處理。
flinkcdc 3.0整庫同步,大數(shù)據(jù),flink,flink,大數(shù)據(jù),cdc,動態(tài)變更,分表分庫
所以考慮只修改orders_01,再插入數(shù)據(jù)看doris同步的變化。

-- 添加sku字段
ALTER TABLE app_db1.orders_01 ADD sku varchar(32) NULL;
-- 向orders_01插入id=13
INSERT INTO `orders_01` VALUES (13, 4.00, 8.00, 'apple01');
-- 向orders_02插入id=14
INSERT INTO `orders_02` VALUES (14, 1.00, 1.00);

可以看到doris中的app_db1.orders表結(jié)構(gòu)發(fā)生了變化,但是orders_02的id=14這條數(shù)據(jù)沒有正常寫入。flink異常提示:java.lang.IllegalStateException: Column size does not match the data size
flinkcdc 3.0整庫同步,大數(shù)據(jù),flink,flink,大數(shù)據(jù),cdc,動態(tài)變更,分表分庫
而當(dāng)修改orders_02的表結(jié)構(gòu),也會有異常:Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: status of AddColumnEvent is already existed。并且之后寫入的數(shù)據(jù)無法正常同步。

結(jié)尾

flink cdc的功能越來越強(qiáng),也再嘗試解決用戶的使用痛點(diǎn)。不過放到生產(chǎn)環(huán)境使用還需要建立在更多的實(shí)踐測試之上。文章來源地址http://www.zghlxwxcb.cn/news/detail-775011.html

到了這里,關(guān)于Flink cdc3.0同步實(shí)例(動態(tài)變更表結(jié)構(gòu)、分庫分表同步)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka

    SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka

    最近做的一個項目,使用的是pg數(shù)據(jù)庫,公司沒有成熟的DCD組件,為了實(shí)現(xiàn)數(shù)據(jù)變更消息發(fā)布的功能,我使用SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka。 監(jiān)聽數(shù)據(jù)變化,進(jìn)行異步通知,做系統(tǒng)內(nèi)異步任務(wù)。 架構(gòu)方案(懶得寫了,看圖吧): -- 創(chuàng)建pg 高線數(shù)據(jù)同步用

    2024年02月02日
    瀏覽(31)
  • 實(shí)戰(zhàn)Java springboot 采用Flink CDC操作SQL Server數(shù)據(jù)庫獲取增量變更數(shù)據(jù)

    目錄 前言: 1、springboot引入依賴: 2、yml配置文件 3、創(chuàng)建SQL server CDC變更數(shù)據(jù)監(jiān)聽器 4、反序列化數(shù)據(jù),轉(zhuǎn)為變更JSON對象 5、CDC 數(shù)據(jù)實(shí)體類 6、自定義ApplicationContextUtil 7、自定義sink 交由spring管理,處理變更數(shù)據(jù) ? ? ? ? 我的場景是從SQL Server數(shù)據(jù)庫獲取指定表的增量數(shù)據(jù),查

    2024年02月10日
    瀏覽(23)
  • Flink CDC數(shù)據(jù)同步

    Flink CDC數(shù)據(jù)同步

    一、什么是FLink Apache?Flink?是一個框架和分布式處理引擎,用于在無邊界和有邊界數(shù)據(jù)流上進(jìn)行有狀態(tài)的計算。Flink?能在所有常見集群環(huán)境中運(yùn)行,并能以內(nèi)存速度和任意規(guī)模進(jìn)行計算。 接下來,我們來介紹一下?Flink?架構(gòu)中的重要方面。 任何類型的數(shù)據(jù)都可以形成一種事

    2024年02月08日
    瀏覽(19)
  • Flink CDC整庫同步

    背景 項目需要能夠捕獲外部數(shù)據(jù)源的數(shù)據(jù)變更,實(shí)時同步到目標(biāo)數(shù)據(jù)庫中,自動更新數(shù)據(jù),實(shí)現(xiàn)源數(shù)據(jù)庫和目標(biāo)數(shù)據(jù)庫所有表的數(shù)據(jù)同步更新,本文以mysql - greenplumn場景記錄實(shí)現(xiàn)方案。 實(shí)現(xiàn) 1.引入依賴 2.創(chuàng)建FlinkCDCSource 創(chuàng)建FlinkCDC連接器,設(shè)置數(shù)據(jù)源的連接信息,日志捕獲的

    2024年04月14日
    瀏覽(27)
  • Flink CDC整庫同步(多表異構(gòu)同步)

    flinkcdc單表同步比較簡單,按照官方案例基本都能成功,多表異構(gòu)同步、整庫同步這塊一直想嘗試一下,社區(qū)說使用API可以做到,但是一直沒能白嫖到可行方案(代碼),然后自己動手嘗試了下,咳咳,無奈技術(shù)太菜,java各種語法都搞的不是太明白,時間跨度蠻久,中間遇到

    2024年02月11日
    瀏覽(18)
  • 基于 Flink CDC 的實(shí)時同步系統(tǒng)

    基于 Flink CDC 的實(shí)時同步系統(tǒng)

    摘要: 本文整理自科杰科技大數(shù)據(jù)架構(gòu)師張軍,在 FFA 2022 數(shù)據(jù)集成專場的分享。本篇內(nèi)容主要分為四個部分: 功能概述 架構(gòu)設(shè)計 技術(shù)挑戰(zhàn) 生產(chǎn)實(shí)踐 Tips: 點(diǎn)擊 「閱讀原文」 查看原文視頻演講 ppt 科杰科技是專門做大數(shù)據(jù)服務(wù)的供應(yīng)商,目前的客戶包括能源、金融、證券等

    2024年02月05日
    瀏覽(29)
  • 【實(shí)戰(zhàn)-01】flink cdc 實(shí)時數(shù)據(jù)同步利器

    【實(shí)戰(zhàn)-01】flink cdc 實(shí)時數(shù)據(jù)同步利器

    cdc github源碼地址 cdc官方文檔 對很多初入門的人來說是無法理解cdc到底是什么個東西。 有這樣一個需求,比如在mysql數(shù)據(jù)庫中存在很多數(shù)據(jù),但是公司要把mysql中的數(shù)據(jù)同步到數(shù)據(jù)倉庫(starrocks), 數(shù)據(jù)倉庫你可以理解為存儲了各種各樣來自不同數(shù)據(jù)庫中表。 數(shù)據(jù)的同步目前對

    2023年04月08日
    瀏覽(91)
  • flink oracle cdc實(shí)時同步(超詳細(xì))

    flink oracle cdc實(shí)時同步(超詳細(xì))

    官方文檔:https://github.com/ververica/flink-cdc-connectors/blob/release-master/docs/content/connectors/oracle-cdc.md 本文參照官方文檔來記錄Oracle CDC 的配置。 在本文開始前,需要先安裝Oracle,有興趣的同學(xué)可以參考博主之前寫的《docker下安裝oracle11g(一次安裝成功)》。 如果要做oracle的實(shí)時同步

    2024年02月12日
    瀏覽(21)
  • Flink CDC MySQL同步MySQL錯誤記錄

    Flink CDC MySQL同步MySQL錯誤記錄

    0、相關(guān)Jar包 https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.16/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/3.0.0/ 或者從mvnrepository.com下載 https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc https://mvnrepository.com/artifact/org.apache.flink/flink-connector-

    2024年02月03日
    瀏覽(21)
  • Flink CDC實(shí)時同步PG數(shù)據(jù)庫

    JDK:1.8 Flink:1.16.2 Scala:2.11 Hadoop:3.1.3 github地址:https://github.com/rockets0421/FlinkCDC-PG.git? 1、更改配置文件postgresql.conf # 更改wal日志方式為logical wal_level = logical # minimal, replica, or logical # 更改solts最大數(shù)量(默認(rèn)值為10),flink-cdc默認(rèn)一張表占用一個slots max_replication_slots = 20 # m

    2024年02月13日
    瀏覽(31)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包