Environment:
Flink 1.15.2
MySQL 5.7. Note: the binlog must be enabled, because incremental sync captures data from the binlog.
Windows 11, running locally in IDEA
Official usage guide and examples first: MySQL CDC Connector — Flink CDC documentation
1. Enable binlog in MySQL (note: the engine here is InnoDB. With ndbcluster, in my tests no binlog events were captured, so incremental sync was effectively useless; I am not sure whether the binlog configuration under ndbcluster was wrong, but InnoDB tables in the same cluster were captured fine. A friend suggested that ndbcluster is a memory-oriented engine and may not write the binlog to disk in real time, which could explain why nothing is captured.)
# Check whether binlog is already enabled; ON means it is on
SHOW VARIABLES LIKE 'log_bin';
# Check the binlog mode
show global variables like "binlog%";
# Check log-related settings
show variables like 'log_%';
# Rotate the binlog: immediately starts a new binlog file with the next number, same effect as a restart
flush logs;
# Purge all binlog files
reset master;
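If log_bin shows OFF, the binlog has to be enabled in the MySQL server configuration (my.cnf / my.ini) and the server restarted. A minimal sketch; the server-id, file name prefix, and retention value are assumptions, adjust them to your environment:
[mysqld]
server-id        = 1           # any id that is unique in the replication topology (assumed value)
log_bin          = mysql-bin   # enables the binlog; the value is the file name prefix (assumed value)
binlog_format    = ROW         # the CDC connector needs row-based logging
binlog_row_image = FULL        # keep full before/after images of changed rows
expire_logs_days = 7           # optional: binlog retention in days (assumed value)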
2. Create a user and grant privileges
CREATE USER 'flink_cdc_user'@'%' IDENTIFIED BY 'flink@cdc';
GRANT ALL PRIVILEGES ON *.* TO 'flink_cdc_user'@'%';
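ALL PRIVILEGES is convenient for a quick test; for anything beyond that, a narrower grant along the lines recommended by the Flink CDC / Debezium documentation is usually enough. A sketch, reusing the user created above:
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc_user'@'%';
FLUSH PRIVILEGES;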
3. Maven dependencies:
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.15.2</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
        <!-- provided keeps this jar out of the fat jar; enable it when packaging for cluster deployment -->
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.29</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
</dependencies>
4. When packaging to run on a cluster, un-comment <scope>provided</scope> on the relevant dependencies so they are not bundled into the jar and therefore do not conflict with the jars under Flink's lib directory.
Jars that need to go into lib (download them from the official sites and drop them in):
flink-connector-jdbc-1.15.4.jar
flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
flink-sql-connector-mysql-cdc-2.3.0.jar
mysql-connector-java-8.0.29.jar
commons-cli-1.5.0.jar
5. MySQL DDL, creating the source table and the sink table:
CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `username` varchar(255) DEFAULT NULL,
  `password` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `user_sink` (
  `id` int(11) NOT NULL,
  `username` varchar(255) DEFAULT NULL,
  `password` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
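Optionally seed the source table with a couple of rows (sample values) so the initial snapshot phase has existing data to copy:
INSERT INTO `user` (`id`, `username`, `password`) VALUES
  (1, 'zhangsan', '111111'),
  (2, 'lisi', '222222');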
6. Test demo:
package com.xgg.flink.stream.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToMysqlHavePrimaryKey {
    public static void main(String[] args) {
        // 1. Create the stream execution environment
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(1);
        // 2. Create the table environment on top of it
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
        // CDC source table reading the binlog of test_cdc.user
        String sourceTable = "CREATE TABLE mysql_cdc_source (\n" +
                "  id INT,\n" +
                "  username STRING,\n" +
                "  password STRING,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'localhost',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root',\n" +
                "  'database-name' = 'test_cdc',\n" +
                "  'debezium.snapshot.mode' = 'initial',\n" +
                "  'table-name' = 'user'\n" +
                ")";
        tEnv.executeSql(sourceTable);
        // JDBC sink table writing to test_cdc.user_sink (upserts on the primary key)
        String sinkTable = "CREATE TABLE mysql_cdc_sink (\n" +
                "  id INT,\n" +
                "  username STRING,\n" +
                "  password STRING,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/test_cdc?rewriteBatchedStatements=true',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root',\n" +
                "  'table-name' = 'user_sink'\n" +
                ")";
        tEnv.executeSql(sinkTable);
        // Continuously sync the changelog into the sink table
        tEnv.executeSql("insert into mysql_cdc_sink select id, username, password from mysql_cdc_source");
        // Also print the changelog to stdout
        tEnv.executeSql("select * from mysql_cdc_source").print();
    }
}
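The demo works as-is for local testing; for fault tolerance it is common to also enable checkpointing, since the MySQL CDC source records binlog offsets on checkpoints and the job can then resume from the last checkpoint instead of starting over. A minimal sketch; the 3-second interval is an assumption:
// add right after creating senv; the interval is an assumed value
senv.enableCheckpointing(3000);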
Operate on the source table; Flink CDC captures the change records, prints them, and writes them into the sink table. (With the MySQL CDC source, printing and writing to the table at the same time works fine. With the Oracle CDC source, if there are multiple statements only one of them runs: for example, printing first and then writing, only the print runs and the table write never triggers; writing only, without printing, works fine. It does not seem related to senv.setParallelism(1); it is probably an issue in the underlying implementation.)
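For example, changes like the following on the source table (sample values) show up both in the printed changelog and in user_sink:
INSERT INTO `user` VALUES (3, 'wangwu', '333333');
UPDATE `user` SET `password` = 'abc123' WHERE `id` = 3;
DELETE FROM `user` WHERE `id` = 3;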
After the job runs, the source table user and the sink table user_sink contain the same data.
Both the source table and the sink table have a primary key in MySQL, so even though this option performs an initial snapshot and the sync statement is a plain INSERT INTO, the job can be run any number of times without producing duplicate rows (the JDBC sink upserts on the primary key):
"'debezium.snapshot.mode' = 'initial',\n" +
The rewriteBatchedStatements=true parameter in the JDBC URL enables batched statement rewriting on the MySQL driver, which speeds up writes.
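On the Flink side, the JDBC sink buffers and batches writes as well; if needed, this can be tuned with connector options in the sink's WITH clause. The values below are assumptions chosen only to illustrate the knobs:
'sink.buffer-flush.max-rows' = '500',
'sink.buffer-flush.interval' = '1s',
'sink.max-retries' = '3'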
That wraps up this walkthrough of real-time MySQL table sync with Flink CDC based on the MySQL binlog.