CDC介紹
數(shù)據(jù)庫(kù)中的CDC(Change Data Capture,變更數(shù)據(jù)捕獲)是一種用于實(shí)時(shí)跟蹤數(shù)據(jù)庫(kù)中數(shù)據(jù)變化的技術(shù)。CDC的主要目的是在數(shù)據(jù)庫(kù)中捕獲增量數(shù)據(jù),以便在需要時(shí)可以輕松地將這些數(shù)據(jù)合并到其他系統(tǒng)或應(yīng)用程序中。CDC在數(shù)據(jù)庫(kù)管理、數(shù)據(jù)同步、數(shù)據(jù)集成和數(shù)據(jù)備份等方面具有廣泛的應(yīng)用。
CDC通常通過(guò)以下幾種方式實(shí)現(xiàn):
-
基于觸發(fā)器的CDC:在表上創(chuàng)建觸發(fā)器,當(dāng)數(shù)據(jù)發(fā)生更改時(shí),觸發(fā)器會(huì)將更改的數(shù)據(jù)記錄到其他系統(tǒng)或表中。
-
基于事務(wù)日志的CDC:通過(guò)讀取數(shù)據(jù)庫(kù)事務(wù)日志,將日志中的更改記錄解析為可操作的數(shù)據(jù)。這種方法通常用于增量備份和恢復(fù)。
-
基于游標(biāo)的CDC:在數(shù)據(jù)庫(kù)中使用游標(biāo),逐行處理數(shù)據(jù)更改,并將這些更改應(yīng)用于其他系統(tǒng)或表。
-
基于時(shí)間戳的CDC:為表中的每個(gè)數(shù)據(jù)行分配一個(gè)時(shí)間戳,當(dāng)數(shù)據(jù)發(fā)生更改時(shí),更新相應(yīng)的時(shí)間戳。然后,可以使用時(shí)間戳來(lái)識(shí)別和處理數(shù)據(jù)更改。
-
基于消息隊(duì)列的CDC:將數(shù)據(jù)更改作為事件發(fā)送到消息隊(duì)列,以便其他系統(tǒng)或應(yīng)用程序可以訂閱和處理這些事件。
Flink CDC
Flink CDC(Change Data Capture,即數(shù)據(jù)變更抓?。┦且粋€(gè)開源的數(shù)據(jù)庫(kù)變更日志捕獲和處理框架,它可以實(shí)時(shí)地從各種數(shù)據(jù)庫(kù)(如MySQL、PostgreSQL、Oracle、MongoDB等)中捕獲數(shù)據(jù)變更并將其轉(zhuǎn)換為流式數(shù)據(jù)。Flink CDC 可以幫助實(shí)時(shí)應(yīng)用程序?qū)崟r(shí)地處理和分析這些流數(shù)據(jù),從而實(shí)現(xiàn)數(shù)據(jù)同步、數(shù)據(jù)管道、實(shí)時(shí)分析和實(shí)時(shí)應(yīng)用等功能。
Flink CDC 的主要特點(diǎn)包括:
-
支持多種數(shù)據(jù)庫(kù)類型:Flink CDC 支持多種數(shù)據(jù)庫(kù),如 MySQL、PostgreSQL、Oracle、MongoDB 等。
-
實(shí)時(shí)數(shù)據(jù)捕獲:Flink CDC 能夠?qū)崟r(shí)捕獲數(shù)據(jù)庫(kù)中的數(shù)據(jù)變更,并將其轉(zhuǎn)換為流式數(shù)據(jù)。
-
高性能:Flink CDC 基于 Flink 引擎,具有高性能的數(shù)據(jù)處理能力。
-
低延遲:Flink CDC 可以在毫秒級(jí)的延遲下處理大量的數(shù)據(jù)變更。
-
易集成:Flink CDC 與 Flink 生態(tài)系統(tǒng)緊密集成,可以方便地與其他 Flink 應(yīng)用程序一起使用。
-
高可用性:Flink CDC 支持實(shí)時(shí)備份和恢復(fù),確保數(shù)據(jù)的高可用性。
適用于場(chǎng)景?
Flink CDC 可以用于各種場(chǎng)景,如:
-
實(shí)時(shí)數(shù)據(jù)同步:將數(shù)據(jù)從一個(gè)數(shù)據(jù)庫(kù)實(shí)時(shí)同步到另一個(gè)數(shù)據(jù)庫(kù)。
-
實(shí)時(shí)數(shù)據(jù)管道:構(gòu)建實(shí)時(shí)數(shù)據(jù)處理管道,處理和分析數(shù)據(jù)庫(kù)中的數(shù)據(jù)。
-
實(shí)時(shí)數(shù)據(jù)分析:實(shí)時(shí)分析數(shù)據(jù)庫(kù)中的數(shù)據(jù),提供實(shí)時(shí)的業(yè)務(wù)洞察。
-
實(shí)時(shí)應(yīng)用:將數(shù)據(jù)庫(kù)中的數(shù)據(jù)實(shí)時(shí)應(yīng)用于實(shí)時(shí)應(yīng)用程序,如實(shí)時(shí)報(bào)表、實(shí)時(shí)推薦等。
-
實(shí)時(shí)監(jiān)控:實(shí)時(shí)監(jiān)控?cái)?shù)據(jù)庫(kù)中的數(shù)據(jù),檢測(cè)異常和錯(cuò)誤。
Flink CDC 的簡(jiǎn)單用例
數(shù)據(jù)庫(kù)配置
創(chuàng)建數(shù)據(jù)庫(kù)和相應(yīng)的表
創(chuàng)建mydb數(shù)據(jù)庫(kù),并創(chuàng)建user表
create database mydb;
create table user(
id bigint primary key auto_increment,
name varchar(255)
);
INSERT INTO mydb.user (name) VALUES ('小明');
INSERT INTO mydb.user (name) VALUES ('小紅');
創(chuàng)建了一個(gè)名為 mydb
的數(shù)據(jù)庫(kù),并在其中創(chuàng)建了一個(gè)名為 user
的表。表中包含一個(gè)主鍵 id
和一個(gè)字符串類型的 name
字段。還向 user
表中插入了兩條記錄,分別是 '小明'
和 '小紅'
。
開啟mysql數(shù)據(jù)庫(kù)bin-log日志
1.如果是服務(wù)器
在my.cnf中添加binlog配置,并重啟mysql數(shù)據(jù)庫(kù)
server-id = 123
log_bin = mysql-bin
binlog_format = row
binlog_row_image = full
expire_logs_days = 10
gtid_mode = on
enforce_gtid_consistency = on
已經(jīng)為 MySQL 設(shè)置了一些配置參數(shù)。下面是對(duì)這些參數(shù)的解釋:
-
server-id = 123:指定服務(wù)器的唯一標(biāo)識(shí)符,通常用于區(qū)分不同的數(shù)據(jù)庫(kù)服務(wù)器。
-
log_bin = mysql-bin:?jiǎn)⒂枚M(jìn)制日志記錄,以便在數(shù)據(jù)庫(kù)出現(xiàn)故障時(shí)可以恢復(fù)數(shù)據(jù)。
-
binlog_format = row:指定二進(jìn)制日志的記錄格式。row 格式會(huì)記錄每個(gè)更改行的詳細(xì)信息,這對(duì)于需要事務(wù)完整性的應(yīng)用程序非常有用。
-
binlog_row_image = full:設(shè)置 row 格式的二進(jìn)制日志記錄行的完整信息,包括列值、注釋等。這有助于提高應(yīng)用程序的可恢復(fù)性。
-
expire_logs_days = 10:設(shè)置自動(dòng)清理過(guò)期二進(jìn)制日志文件的天數(shù)。在這個(gè)例子中,設(shè)置為 10 天。
-
gtid_mode = on:?jiǎn)⒂萌质聞?wù) ID 模式,這使得基于 GTID 的復(fù)制成為可能。
-
enforce_gtid_consistency = on:強(qiáng)制執(zhí)行 GTID 一致性,確保事務(wù)在不同的 MySQL 實(shí)例之間保持一致。
2.如果在Windows使用小皮
在小皮面板里設(shè)置,如圖:
打開bin日志開關(guān)
搭建Flink CDC java環(huán)境
添加maven相關(guān)pom
在pom里添加相關(guān)Flink CDC依賴
<!-- flink connector 基礎(chǔ)包-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.14.4</version>
</dependency>
<!-- CDC mysql 源-->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<!-- Flink Steam流處理-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.14.4</version>
</dependency>
<!-- flink java客戶端-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.4</version>
</dependency>
<!-- 開啟webui支持,默認(rèn)是8081,默認(rèn)沒有開啟-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>1.14.4</version>
</dependency>
<!-- Flink Table API和SQL API使得在Flink中進(jìn)行數(shù)據(jù)處理變得更加簡(jiǎn)單和高效
通過(guò)使用Table API和SQL API,可以像使用傳統(tǒng)的關(guān)系型數(shù)據(jù)庫(kù)一樣,通過(guò)編寫SQL語(yǔ)句或者使用類似于
Java的API進(jìn)行數(shù)據(jù)處理和分析-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_2.11</artifactId>
<version>1.14.4</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.6</version>
</dependency>
這是一段 Maven 依賴配置,它引入了 Flink Connector Base、CDC MySQL Source、Flink Streaming Java、Flink Java Client、Flink Runtime Web、Flink Table Runtime 和 Logback Classic。
這些依賴庫(kù)提供了以下功能:
-
Flink Connector Base:Flink 的連接器基礎(chǔ)包,用于將 Flink 與其他系統(tǒng)進(jìn)行集成。
-
CDC MySQL Source:Flink 的 MySQL CDC 源,用于從 MySQL 數(shù)據(jù)庫(kù)中讀取數(shù)據(jù)流。
-
Flink Streaming Java:Flink 的 Java 流處理 API,用于編寫并發(fā)程序以處理數(shù)據(jù)流。
-
Flink Java Client:Flink 的 Java API,用于在 Java 應(yīng)用程序中使用 Flink。
-
Flink Runtime Web:Flink 的 Web UI,用于監(jiān)控和管理 Flink 集群。
-
Flink Table Runtime:Flink 的 Table API,使在 Flink 中進(jìn)行數(shù)據(jù)處理變得更加簡(jiǎn)單和高效。
-
Logback Classic:日志記錄庫(kù),用于記錄應(yīng)用程序的日志信息。
構(gòu)建Sink
Flink CDC(Change Data Capture)中的Sink用于將CDC接收到的數(shù)據(jù)寫入外部系統(tǒng)(如數(shù)據(jù)庫(kù)或文件系統(tǒng)),以實(shí)現(xiàn)數(shù)據(jù)同步和數(shù)據(jù)備份等功能,并將其轉(zhuǎn)換為DataStream流。然后,Sink將這個(gè)DataStream流寫入到外部系統(tǒng)中,以便進(jìn)行后續(xù)的數(shù)據(jù)處理和分析。
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
public class CustomSink extends RichSinkFunction<String> {
@Override
public void invoke(String value, Context context) throws Exception {
System.out.println("json->: "+value);
}
}
這段代碼定義了一個(gè)名為CustomSink的類,它繼承自RichSinkFunction類。RichSinkFunction是Flink CDC中用于將數(shù)據(jù)寫入外部系統(tǒng)(如數(shù)據(jù)庫(kù)或文件系統(tǒng))的函數(shù)接口。CustomSink的作用是將CDC接收到的數(shù)據(jù)寫入外部系統(tǒng)中。具體實(shí)現(xiàn)方式由子類CustomSink來(lái)定義。由于這個(gè)類繼承了RichSinkFunction,因此可以使用Flink中的其他RichSink函數(shù)特性,例如設(shè)置日志級(jí)別、配置連接等,invoke則是處理函數(shù)。
main配置運(yùn)行
如下面的代碼,構(gòu)建Flink CDC連接
public static void main(String[] args) throws Exception {
MySqlSourceBuilder<String> builder = MySqlSource.builder();
MySqlSource<String> source = builder.hostname("192.168.2.6")
.port(3306)
.databaseList("mydb")
.tableList("mydb.user")
.username("root")
.password("root")
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true)
.build();
// 啟動(dòng)webui,綁定本地web-ui端口號(hào)
Configuration configuration=new Configuration();
configuration.setInteger(RestOptions.PORT,8081);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.enableCheckpointing(5000);
env.fromSource(source, WatermarkStrategy.noWatermarks(),"MYSQL Source")
.addSink(new CustomSink());
env.execute();
}
這段代碼是使用Flink構(gòu)建一個(gè)數(shù)據(jù)流處理任務(wù),從MySQL數(shù)據(jù)庫(kù)中讀取數(shù)據(jù)并進(jìn)行處理。
首先,使用MySqlSourceBuilder創(chuàng)建一個(gè)MySqlSource對(duì)象,并設(shè)置連接參數(shù)(hostname、port、databaseList、tableList、username和password)以及反序列化器(JsonDebeziumDeserializationSchema)。然后,創(chuàng)建Configuration對(duì)象并設(shè)置WebUI端口號(hào)(RestOptions.PORT),接著使用StreamExecutionEnvironment創(chuàng)建一個(gè)執(zhí)行環(huán)境,啟用檢查點(diǎn)(checkpointing)并將MySqlSource和自定義的Sink添加到執(zhí)行環(huán)境中。最后,執(zhí)行整個(gè)任務(wù)。
操作數(shù)據(jù)庫(kù)查看結(jié)果
如圖所示:
{
"before": null,
"after": {
"id": "1661935564737286146",
"qu_type": 4,
"level": 1,
"image": "",
"content": "dos查看日期、時(shí)間",
"create_time": 1685100091000,
"update_time": 1685100091000,
"remark": "",
"analysis": ""
},
"source": {
"version": "1.6.4.Final",
"connector": "mysql",
"name": "mysql_binlog_source",
"ts_ms": 0,
"snapshot": "false",
"db": "mydb",
"sequence": null,
"table": "t_user",
"server_id": 0,
"gtid": null,
"file": "",
"pos": 0,
"row": 0,
"thread": null,
"query": null
},
"op": "r",
"ts_ms": 1685521801276,
"transaction": null
}
這段代碼是Flink CDC監(jiān)聽到的MySQL數(shù)據(jù)庫(kù)binlog中的一條變更數(shù)據(jù)記錄,表示在MySQL的t_user表中發(fā)生了一次讀取操作,讀取的數(shù)據(jù)記錄的內(nèi)容為:
-
“id”: “1661935564737286146”
-
“qu_type”: 4
-
“l(fā)evel”: 1
-
“image”: “”
-
“content”: “dos查看日期、時(shí)間”
-
“create_time”: 1685100091000
-
“update_time”: 1685100091000
-
“remark”: “”
-
“analysis”: “” 其中,before字段為null,表示這是一條insert操作;after字段為變更后的數(shù)據(jù)內(nèi)容;source字段表示數(shù)據(jù)的來(lái)源信息,包括MySQL的版本、連接器類型、數(shù)據(jù)庫(kù)名稱、表名稱等;op字段表示操作類型,"r"表示讀取操作;ts_ms字段表示變更發(fā)生的時(shí)間戳;transaction字段表示事務(wù)信息,這里為null,表示這是一條非事務(wù)性的操作記錄。
操作數(shù)據(jù)JSON講解
在Flink CDC中,op字段表示MySQL數(shù)據(jù)庫(kù)binlog中的操作類型,通常情況下分為以下幾種類型:
-
“c”:表示create,表示對(duì)數(shù)據(jù)庫(kù)進(jìn)行了創(chuàng)建操作。
-
“u”:表示update,表示對(duì)數(shù)據(jù)庫(kù)進(jìn)行了更新操作。
-
“d”:表示delete,表示對(duì)數(shù)據(jù)庫(kù)進(jìn)行了刪除操作。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-515057.html
-
“r”:表示read,表示對(duì)數(shù)據(jù)庫(kù)進(jìn)行了讀取操作。 其中,前三種操作類型都是數(shù)據(jù)的變更操作,read操作則是指對(duì)數(shù)據(jù)庫(kù)進(jìn)行的查詢操作。在Flink CDC中,一般只會(huì)監(jiān)聽到前三種操作類型,因?yàn)橹挥羞@三種操作類型才會(huì)導(dǎo)致數(shù)據(jù)庫(kù)中的數(shù)據(jù)發(fā)生變化,而read操作則只是查詢數(shù)據(jù),并不會(huì)導(dǎo)致數(shù)據(jù)的變化。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-515057.html
到了這里,關(guān)于什么是Flink CDC,以及如何使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!