1.什么是 CDC
CDC
(Change Data Capture
,數(shù)據(jù)變更抓取)是一種用于跟蹤數(shù)據(jù)庫(kù)中數(shù)據(jù)更改的技術(shù)。它用于監(jiān)視數(shù)據(jù)庫(kù)中的變化,并捕獲這些變化,以便實(shí)時(shí)或定期將變化的數(shù)據(jù)同步到其他系統(tǒng)、數(shù)據(jù)倉(cāng)庫(kù)或分析平臺(tái)。CDC 技術(shù)通常用于數(shù)據(jù)復(fù)制、數(shù)據(jù)倉(cāng)庫(kù)更新、實(shí)時(shí)報(bào)告和數(shù)據(jù)同步等場(chǎng)景。
CDC 可以捕獲數(shù)據(jù)庫(kù)中的以下類型的數(shù)據(jù)變化:
- ? 插入(
Insert
):當(dāng)新數(shù)據(jù)被插入到數(shù)據(jù)庫(kù)表中時(shí)。 - ? 更新(
Update
):當(dāng)數(shù)據(jù)庫(kù)表中的現(xiàn)有數(shù)據(jù)被修改時(shí)。 - ? 刪除(
Delete
):當(dāng)數(shù)據(jù)從數(shù)據(jù)庫(kù)表中被刪除時(shí)。
2.什么是 Flink CDC
Flink CDC
是一個(gè)開(kāi)源的數(shù)據(jù)庫(kù)變更日志捕獲和處理框架,它可以實(shí)時(shí)地從各種數(shù)據(jù)庫(kù)(如 MySQL、PostgreSQL、Oracle、MongoDB 等)中捕獲數(shù)據(jù)變更并將其轉(zhuǎn)換為流式數(shù)據(jù)。Flink CDC 可以幫助實(shí)時(shí)應(yīng)用程序?qū)崟r(shí)地處理和分析這些流數(shù)據(jù),從而實(shí)現(xiàn) 數(shù)據(jù)同步、數(shù)據(jù)管道、實(shí)時(shí)分析 和 實(shí)時(shí)應(yīng)用 等功能。
本質(zhì)上是一系列的 Flink Source Connector 集合,用于來(lái)獲取數(shù)據(jù)庫(kù)的實(shí)時(shí)變更,底層基于 Debezium 實(shí)現(xiàn)。
?? https://github.com/ververica/flink-cdc-connectors
3.Flink CDC 前生今世
3.1 Flink CDC 1.x
Flink CDC 1.x
開(kāi)啟了 Flink 在 CDC 上的實(shí)踐之路,F(xiàn)link CDC 1.x
第一次引入了 Debezium 框架,利用 Debezium 已有的能力將數(shù)據(jù)庫(kù)實(shí)時(shí)變更接入到 Flink 流計(jì)算框架中,利用 Flink 豐富的生態(tài)對(duì)數(shù)據(jù)進(jìn)行加工處理,滿足不同的業(yè)務(wù)需求,在功能層面上而言,F(xiàn)link CDC 1.x
只能說(shuō)是可以用,但不能生產(chǎn)上用,為什么:
-
1.x
版本全增量切換時(shí)會(huì)對(duì)表加鎖,在同步過(guò)程中有段時(shí)間業(yè)務(wù)會(huì)處于暫停狀態(tài)。 - 各方面功能還不夠完善,比如自動(dòng)加表、DDL 事件傳遞等。
總體而言 Flink CDC 1.x
只能說(shuō)是一個(gè)比較有趣的小玩具,還不具備大規(guī)模商業(yè)盈利的價(jià)值。
3.2 Flink CDC 2.x
在 2.x
版本中,F(xiàn)link CDC 引入了 Netfix DBLog 中的無(wú)鎖算法,徹底解決了全增量切換上業(yè)務(wù)停滯的問(wèn)題,同時(shí)得益于 FLIP-27 對(duì) Flink Source API 的重構(gòu),F(xiàn)link CDC 也基于 FLIP-27 升級(jí)到了新的框架設(shè)計(jì),至此,F(xiàn)link CDC 被大規(guī)模公司使用并投入到生產(chǎn)中。
3.3 Flink CDC 3.x
近期,F(xiàn)link CDC 發(fā)布了全新的 3.0
版本,并宣布捐贈(zèng)回 Flink 主項(xiàng)目,在新的 3.0
版本中,F(xiàn)link CDC 對(duì)于接口和架構(gòu)上做了很大的升級(jí)和調(diào)整,對(duì)于整體項(xiàng)目的定位也從之前的 Flink Source Connector 轉(zhuǎn)變?yōu)榱?Data Integration Engine,未來(lái)將與 SeaTunnel
、DataX
、Chunjun
等一系列老牌數(shù)據(jù)集成項(xiàng)目同臺(tái)競(jìng)技,讓我們拭目以待。
4.Flink CDC 使用
在本地啟動(dòng)一個(gè) MySQL 的 Docker 環(huán)境。
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw -e TZ=Asia/Shanghai quay.io/debezium/example-mysql:2.4
創(chuàng)建表:
create database cdc_test;
use cdc_test;
create table cdc_table (
id int primary key auto_increment,
name varchar(1000),
age int
);
在 IDEA 中新建一個(gè)Java 項(xiàng)目。
導(dǎo)入依賴:
<flink-cdc.version>2.4.2</flink-cdc.version>
<flink.version>1.16.3</flink.version>
<logback.version>1.2.7</logback.version>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink-cdc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
編寫(xiě)代碼:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-821495.html
public class FlinkCDCApplication {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(60000L);
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("cdc_test") // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
.tableList("cdc_test.cdc_table") // set captured table
.username("root")
.password("debezium")
.includeSchemaChanges(true)
.startupOptions(StartupOptions.latest())
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL-CDC")
.print();
env.execute();
}
}
添加日志配置:文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-821495.html
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} %p %c - %msg %n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>
5.Debezium 標(biāo)準(zhǔn) CDC Event 格式詳解
{
"before": null,
"after": {
"id": 1,
"name": "xing.yu",
"age": 26,
"new_column": "dewu"
},
"source": {
"version": "1.9.7.Final",
"connector": "mysql",
"name": "mysql_binlog_source",
"ts_ms": 1702723640000,
"snapshot": "false",
"db": "cdc_test",
"sequence": null,
"table": "cdc_table",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 2394,
"row": 0,
"thread": 39,
"query": null
},
"op": "c",
"ts_ms": 1702723640483,
"transaction": null
}
{
// 表數(shù)據(jù)更新前的值,update/delete
"before": {},
// 表數(shù)據(jù)更新后的值,create/update
"after": {},
// 元數(shù)據(jù)信息
"source": {},
// 操作類型 c/d/u
"op": "",
// 記錄解析時(shí)間
"ts_ms": "",
"transaction": ""
}
到了這里,關(guān)于【大數(shù)據(jù)】Flink CDC 的概覽和使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!