1.CDC概述
CDC(Change Data Capture)是一種用于捕獲和處理數(shù)據(jù)源中的變化的技術。它允許實時地監(jiān)視數(shù)據(jù)庫或數(shù)據(jù)流中發(fā)生的數(shù)據(jù)變動,并將這些變動抽取出來,以便進行進一步的處理和分析。
傳統(tǒng)上,數(shù)據(jù)源的變化通常通過周期性地輪詢整個數(shù)據(jù)集進行檢查來實現(xiàn)。但是,這種輪詢的方式效率低下且不能實時反應變化。而 CDC 技術則通過在數(shù)據(jù)源上設置一種機制,使得變化的數(shù)據(jù)可以被實時捕獲并傳遞給下游處理系統(tǒng),從而實現(xiàn)了實時的數(shù)據(jù)變動監(jiān)控。
Flink 作為一個強大的流式計算引擎,提供了內(nèi)置的 CDC 功能,能夠連接到各種數(shù)據(jù)源(如數(shù)據(jù)庫、消息隊列等),捕獲其中的數(shù)據(jù)變化,并進行靈活的實時處理和分析。
通過使用 Flink CDC,我們可以輕松地構建實時數(shù)據(jù)管道,對數(shù)據(jù)變動進行實時響應和處理,為實時分析、實時報表和實時決策等場景提供強大的支持。
?
2.CDC 的實現(xiàn)原理
通常來講,CDC 分為主動查詢和事件接收兩種技術實現(xiàn)模式。對于主動查詢而言,用戶通常會在數(shù)據(jù)源表的某個字段中,保存上次更新的時間戳或版本號等信息,然后下游通過不斷的查詢和與上次的記錄做對比,來確定數(shù)據(jù)是否有變動,是否需要同步。這種方式優(yōu)點是不涉及數(shù)據(jù)庫底層特性,實現(xiàn)比較通用;缺點是要對業(yè)務表做改造,且實時性不高,不能確保跟蹤到所有的變更記錄,且持續(xù)的頻繁查詢對數(shù)據(jù)庫的壓力較大。事件接收模式可以通過觸發(fā)器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)來實現(xiàn)。當數(shù)據(jù)源表發(fā)生變動時,會通過附加在表上的觸發(fā)器或者 binlog 等途徑,將操作記錄下來。下游可以通過數(shù)據(jù)庫底層的協(xié)議,訂閱并消費這些事件,然后對數(shù)據(jù)庫變動記錄做重放,從而實現(xiàn)同步。這種方式的優(yōu)點是實時性高,可以精確捕捉上游的各種變動;缺點是部署數(shù)據(jù)庫的事件接收和解析器(例如 Debezium、Canal 等),有一定的學習和運維成本,對一些冷門的數(shù)據(jù)庫支持不夠。綜合來看,事件接收模式整體在實時性、吞吐量方面占優(yōu),如果數(shù)據(jù)源是 MySQL、PostgreSQL、MongoDB 等常見的數(shù)據(jù)庫實現(xiàn),建議使用Debezium來實現(xiàn)變更數(shù)據(jù)的捕獲(下圖來自Debezium 官方文檔如果使用的只有 MySQL,則還可以用Canal。
?
3.為什么選 Flink
從上圖可以看到,Debezium 官方架構圖中,是通過 Kafka Streams 直接實現(xiàn)的 CDC 功能。而我們這里更建議使用 Flink CDC 模塊,因為 Flink 相對 Kafka Streams 而言,有如下優(yōu)勢:
強大的流處理引擎: Flink 是一個強大的流處理引擎,具備高吞吐量、低延遲、Exactly-Once 語義等特性。它通過基于事件時間的處理模型,支持準確和有序的數(shù)據(jù)處理,適用于實時數(shù)據(jù)處理和分析場景。這使得 Flink 成為實現(xiàn) CDC 的理想選擇。
內(nèi)置的 CDC 功能: Flink 提供了內(nèi)置的 CDC 功能,可以直接連接到各種數(shù)據(jù)源,捕獲數(shù)據(jù)變化,并將其作為數(shù)據(jù)流進行處理。這消除了我們自行開發(fā)或集成 CDC 解決方案的需要,使得實現(xiàn) CDC 變得更加簡單和高效。
多種數(shù)據(jù)源的支持: Flink CDC 支持與各種數(shù)據(jù)源進行集成,如關系型數(shù)據(jù)庫(如MySQL、PostgreSQL)、消息隊列(如Kafka、RabbitMQ)、文件系統(tǒng)等。這意味著無論你的數(shù)據(jù)存儲在哪里,F(xiàn)link 都能夠輕松地捕獲其中的數(shù)據(jù)變化,并進行進一步的實時處理和分析。
靈活的數(shù)據(jù)處理能力: Flink 提供了靈活且強大的數(shù)據(jù)處理能力,可以通過編寫自定義的轉(zhuǎn)換函數(shù)、處理函數(shù)等來對 CDC 數(shù)據(jù)進行各種實時計算和分析。同時,F(xiàn)link 還集成了 SQL 和 Table API,為用戶提供了使用 SQL 查詢語句或 Table API 進行簡單查詢和分析的方式。
完善的生態(tài)系統(tǒng): Flink 擁有活躍的社區(qū)和龐大的生態(tài)系統(tǒng),這意味著你可以輕松地獲取到豐富的文檔、教程、示例代碼和解決方案。此外,F(xiàn)link 還與其他流行的開源項目(如Apache Kafka、Elasticsearch)深度集成,提供了更多的功能和靈活性。
4.支持的連接器
?5.支持的 Flink 版本
?
6.Flink CDC特性
支持讀取數(shù)據(jù)庫快照,即使出現(xiàn)故障也能繼續(xù)讀取binlog,并進行Exactly-once處理
DataStream API 的 CDC 連接器,用戶可以在單個作業(yè)中使用多個數(shù)據(jù)庫和表的更改,而無需部署 Debezium 和 Kafka
Table/SQL API 的 CDC 連接器,用戶可以使用 SQL DDL 創(chuàng)建 CDC 源來監(jiān)視單個表上的更改
下表顯示了連接器的當前特性:
?
7.用法實例
7.1DataStream API 的用法(推薦)
請嚴格按照上面的《5.支持的 Flink 版本》搭配來使用Flink CDC
<properties>
<flink.version>1.13.0</flink.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<!-- flink核心API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
請?zhí)崆伴_啟MySQL中的binlog,配置my.cnf文件,重啟mysqld服務即可
my.cnf
[client]
default_character_set=utf8
[mysqld]
server-id=1
collation_server=utf8_general_ci
character_set_server=utf8
log-bin=mysql-bin
binlog_format=row
expire_logs_days=30
ddl&dml.sql
create table test_cdc
(
id int not null
primary key,
name varchar(100) null,
age int null
);
INSERT INTO flink.test_cdc (id, name, age) VALUES (1, 'Daniel', 25);
INSERT INTO flink.test_cdc (id, name, age) VALUES (2, 'David', 38);
INSERT INTO flink.test_cdc (id, name, age) VALUES (3, 'James', 16);
INSERT INTO flink.test_cdc (id, name, age) VALUES (4, 'Robert', 27);
FlinkDSCDC.java
package com.daniel.util;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @Author Daniel
* @Date: 2023/7/25 10:03
* @Description DataStream API CDC
**/
public class FlinkDSCDC {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.username("root")
.password("123456")
.databaseList("flink")
// 這里一定要是db.table的形式
.tableList("flink.test_cdc")
.deserializer(new StringDebeziumDeserializationSchema())
.startupOptions(StartupOptions.initial())
.build();
DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
dataStreamSource.print();
env.execute("FlinkDSCDC");
}
}
UPDATE flink.test_cdc t SET t.age = 24 WHERE t.id = 1;
UPDATE flink.test_cdc t SET t.name = 'Andy' WHERE t.id = 3;
打印出的日志
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=1,name=Daniel,age=25},after=Struct{id=1,name=Daniel,age=24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=7989,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=4}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=3,name=James,age=16},after=Struct{id=3,name=Andy,age=16},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=8113,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
可以得出的結論:
1、日志中的數(shù)據(jù)變化操作類型(op)可以表示為 ‘u’,表示更新操作。在第一條日志中,發(fā)生了一個更新操作,對應的記錄的 key 是 id=1,更新前的數(shù)據(jù)是 {id=1, name=Daniel, age=25},更新后的數(shù)據(jù)是 {id=1, name=Daniel, age=24}。在第二條日志中,也發(fā)生了一個更新操作,對應的記錄的 key 是 id=3,更新前的數(shù)據(jù)是 {id=3, name=James, age=16},更新后的數(shù)據(jù)是 {id=3, name=Andy, age=16}。
2、每條日志還提供了其他元數(shù)據(jù)信息,如數(shù)據(jù)源(source)、版本號(version)、連接器名稱(connector)、時間戳(ts_ms)等。這些信息可以幫助我們追蹤記錄的來源和處理過程。
3、日志中的 sourceOffset 包含了一些關鍵信息,如事務ID(transaction_id)、文件名(file)、偏移位置(pos)等。這些信息可以用于確保數(shù)據(jù)的準確順序和一致性。
7.2Table/SQL API的用法
FlinkSQLCDC.java文章來源:http://www.zghlxwxcb.cn/news/detail-681194.html
package com.daniel.util;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* @Author Daniel
* @Date: 2023/7/25 15:25
* @Description
**/
public class FlinkSQLCDC {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE test_cdc (" +
" id int primary key," +
" name STRING," +
" age int" +
") WITH (" +
" 'connector' = 'mysql-cdc'," +
" 'scan.startup.mode' = 'latest-offset'," +
" 'hostname' = 'localhost'," +
" 'port' = '3306'," +
" 'username' = 'root'," +
" 'password' = '123456'," +
" 'database-name' = 'flink'," +
" 'table-name' = 'test_cdc'" +
")");
Table table = tableEnv.sqlQuery("select * from test_cdc");
DataStream<Tuple2<Boolean, Row>> dataStreamSource = tableEnv.toRetractStream(table, Row.class);
dataStreamSource.print();
env.execute("FlinkSQLCDC");
}
}
UPDATE flink.test_cdc t SET t.age = 55 WHERE t.id = 2;
UPDATE flink.test_cdc t SET t.age = 22 WHERE t.id = 3;
UPDATE flink.test_cdc t SET t.name = 'Alice' WHERE t.id = 4;
UPDATE flink.test_cdc t SET t.age = 18 WHERE t.id = 1;
INSERT INTO flink.test_cdc (id, name, age) VALUES (5, 'David', 29);
打印出的日志文章來源地址http://www.zghlxwxcb.cn/news/detail-681194.html
(false,-U[2, David, 38])
(true,+U[2, David, 55])
(false,-U[3, Andy, 16])
(true,+U[3, Andy, 22])
(false,-U[4, Robert, 27])
(true,+U[4, Alice, 27])
(false,-U[1, Daniel, 24])
(true,+U[1, Daniel, 18])
(true,+I[5, David, 29])
到了這里,關于Flink CDC介紹的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!