環(huán)境說明:
flink 1.15.2
mysql 版本5.7 ? ?注意:需要開啟binlog,因為增量同步是基于binlog捕獲數(shù)據(jù)
windows11 IDEA 本地運行
具體前提設(shè)置,請看這篇,包含 binlog 設(shè)置、Maven......
Flink CDC 基于mysql binlog 實時同步mysql表_彩虹豆的博客-CSDN博客
經(jīng)過不懈努力,終于從阿里help頁面找到了支持無主鍵同步的參數(shù):
MySQL_實時計算 Flink版-阿里云幫助中心
?然后就開始一頓模式,各種參數(shù)調(diào)試,終于達到了目的,無主鍵表實時同步,只不過在sink表關(guān)聯(lián)目標表時,要指定幾個字段為主鍵,這樣就不會有重復(fù)的覆蓋情況了,多給幾個字段作為主鍵,不就避免重復(fù)沖突了嘛。比如id+date+local等,具體看表字段。
demo如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class MysqlToMysqlNonePrimaryKey {
public static void main(String[] args) {
//1.獲取stream的執(zhí)行環(huán)境
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
senv.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
//2.創(chuàng)建表執(zhí)行環(huán)境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv, settings);
String sourceTable = "CREATE TABLE mysql_cdc_source (" +
" id INT,\n" +
" username STRING,\n" +
" password STRING\n" +
") WITH (\n" +
"'connector' = 'mysql-cdc',\n" +
"'hostname' = 'localhost',\n" +
"'port' = '3306',\n" +
"'username' = 'root',\n" +
"'password' = 'root',\n" +
"'database-name' = 'test_cdc',\n" +
"'debezium.snapshot.mode' = 'initial',\n" +
"'scan.incremental.snapshot.enabled' = 'false',\n" +
//如果開啟增量快照,必須設(shè)置主鍵。
//默認開啟增量快照。增量快照是一種讀取全量數(shù)據(jù)快照的新機制。與舊的快照讀取相比,增量快照有很多優(yōu)點,包括:
//讀取全量數(shù)據(jù)時,Source可以是并行讀取。
//讀取全量數(shù)據(jù)時,Source支持chunk粒度的檢查點。
//讀取全量數(shù)據(jù)時,Source不需要獲取全局讀鎖(FLUSH TABLES WITH read lock)。
//如果您希望Source支持并發(fā)讀取,每個并發(fā)的Reader需要有一個唯一的服務(wù)器ID,因此server-id必須是5400-6400這樣的范圍,并且范圍必須大于等于并發(fā)數(shù)。
"'scan.incremental.snapshot.chunk.key-column' = 'id' ,\n" +
//可以指定某一列作為快照階段切分分片的切分列。無主鍵表必填,選擇的列必須是非空類型(NOT NULL)。
//有主鍵的表為選填,僅支持從主鍵中選擇一列。
" 'table-name' = 'user'\n" +
")";
tEnv.executeSql(sourceTable);
// tEnv.executeSql("select * from mysql_cdc_source").print();
String sinkTable = "CREATE TABLE mysql_cdc_sink (" +
" id INT,\n" +
" username STRING,\n" +
" password STRING\n" +
" ,PRIMARY KEY (id,username,password) NOT ENFORCED\n" +
") WITH (\n" +
"'connector' = 'jdbc',\n" +
"'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
"'url' = 'jdbc:mysql://localhost:3306/" + "test_cdc" + "?rewriteBatchedStatements=true',\n" +
"'username' = 'root',\n" +
"'password' = 'root',\n" +
"'table-name' = 'user_new'\n" +
")";
tEnv.executeSql(sinkTable);
tEnv.executeSql("insert into mysql_cdc_sink select id,username,password from mysql_cdc_source");
}
}
由于無主鍵,?debezium.snapshot.mode' = 'initial',這個參數(shù)會導(dǎo)致,程序運行幾次,源表數(shù)據(jù)就會同步幾次到目標表,并不會去重,如果想一直這個參數(shù)運行,需要在插入前先清空表,但是如果是數(shù)據(jù)量大的,推薦還是先用這個參數(shù)同步歷史數(shù)據(jù),完成后,再改為?schema_only,啟動程序,然后把上面一個程序干掉。
上面設(shè)置的主鍵是三個字段,id、username、password,這三個字段不能為null,如果有數(shù)據(jù)為null,程序在啟動的時候,就會報錯,雖然沒有打印到控制臺上,但是可以看到控制臺程序結(jié)束了,不是一直在運行,并且數(shù)據(jù)也是同步不過去的。所以挑選主鍵字段時一定要確定此字段一定不為null,如果為null的話,就需要能接受轉(zhuǎn)換處理,比如:varchar 類型 將null值轉(zhuǎn)換為空字符串
insert into mysql_cdc_sink select case when id is null then 0 else id end,case when username is null then '' else username? end,case when password is null then '' else password end from mysql_cdc_source
具體如何處理,還看業(yè)務(wù)需求。不過,在數(shù)據(jù)同步時,盡量要做到不對數(shù)據(jù)做任何變動。如果是可以加入清洗,那就隨便玩。文章來源:http://www.zghlxwxcb.cn/news/detail-718830.html
具體數(shù)據(jù)變化時同步的情況還需自行探索。文章來源地址http://www.zghlxwxcb.cn/news/detail-718830.html
到了這里,關(guān)于Flink CDC 基于mysql binlog 實時同步mysql表(無主鍵)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!