使用 flink 1.13.0 和 CDC 2.3.0 的 demo
public class TMySqlCDC {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
Properties dbProps = new Properties();
dbProps.put("database.serverTimezone", "UTC");
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("192.168.18.126")
.port(3306)
.databaseList("xn_test") // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
.tableList("xn_test.hl_t")// set captured table
.includeSchemaChanges(true)
.username("root")
.password("123456")
.debeziumProperties(dbProps)
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_cdc")
.print();
env.execute();
}
}
踩坑一
Caused by: org.apache.flink.table.api.ValidationException: The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
原因
https://github.com/ververica/flink-cdc-connectors/pull/1407
簡單講,F(xiàn)link 運行機器時區(qū)和Mysql Server 時區(qū)不匹配,database.serverTimezone
配置配置影響
具體代碼可以查看CDC com.ververica.cdc.connectors.mysql.MySqlValidator#checkTimeZone
解決辦法
手動指定下Flink 運行的時區(qū),和連接的數(shù)據(jù)庫時區(qū)信息保持一致
dbProps.put("database.serverTimezone", "UTC");
踩坑二
Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder
原因
flink 為了解決包沖突,對一些通用的工具包做了shaded,傳送門flink-shaded
flink-cdc-connectors 2.3.0 版本引用了 flink 1.16.0,這個版本的flink使用了 flink-shaded-guava:30.1.1-jre-15.0版本。
而 flink 1.13.0 使用的是 flink-shaded-guava:18.0-13.0 版本,兩個版本的 shaded package 不一樣引起的
解決
那既然是 shaded 引用,在 cdc 中再次 shaded 一下,讓 cdc 里面引用到的 guava30 變?yōu)?guava18
clone cdc,基于tag release-2.3.0 創(chuàng)建分支,修改 flink-cdc-connectors 的 pom.xml 文件,引入cdc 后排除 guava 依賴。
# 編譯基于已經(jīng) release 的tag
git branch supos/release-2.3.0 release-2.3.0
git checkout supos/release-2.3.0
修改shaded配置
<!-- 在 maven-shade-plugin 插件中添加configuration -->
<relocation>
<pattern>org.apache.flink.shaded.guava30</pattern>
<shadedPattern>org.apache.flink.shaded.guava18</shadedPattern>
</relocation>
使用 mvn version 修改版本
# 要發(fā)布到公司內(nèi)部倉庫,修改為 snapshot 版本
mvn versions:set -DnewVersion=supos-2.3.0-SNAPSHOT -DgenerateBackupPoms=false
執(zhí)行編譯
# -Drat.skip=true 發(fā)布審計插件,文件需要有 license 頭,可選 [-T 8 # 多線程編譯]
mvn clean install -Drat.skip=true -DskipTests -T 8
# 將源碼打包到j(luò)ar 包,發(fā)布到內(nèi)部私有倉庫上
mvn clean source:jar install deploy -Drat.skip=true -DskipTests -T 8
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:48 min
[INFO] Finished at: 2023-03-03T14:14:05+08:00
[INFO] ------------------------------------------------------------------------
引入cdc
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink.cdc.version}</version>
<exclusions>
<exclusion>
<artifactId>flink-shaded-guava</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
run ~
參考
maven-shaded-plugin
maven-version-plugin
maven-source-plugin
maven snapshot
maven 插件生命周期
flink-cdc 2.4.0 發(fā)布了,要適配 flink1.13,研究了一下 maven 生命周期
cdc 2.4.0 開始,有多個模塊使用了 guava30,如果要挨個模塊中配置就不太方便了,通過maven 父子模塊管理,在父模塊中統(tǒng)一處理
<!-- flink-cdc-connectors pom -->
<configuration>
<relocations>
<relocation>
<pattern>org.apache.flink.shaded.guava30</pattern>
<shadedPattern>org.apache.flink.shaded.guava18</shadedPattern>
</relocation>
</relocations>
</configuration>
注意,在子模塊中,要引用一下父工程
<parent>
<artifactId>flink-cdc-connectors</artifactId>
<groupId>com.ververica</groupId>
<version>{version}</version>
</parent>
構(gòu)建
mvn clean install -Drat.skip=true -DskipTests -T 8
通過觀察,插件是有生命周期的,如果在父模塊中配置過 shade,把 guava30 -> guava18,子模塊中配置 guava30 - guava11 是不生效的,需要配置成 guava18 - > guava11 才能生效。
父模塊的配置不會影響到子模塊,有些統(tǒng)一處理的方案可以在父模塊中直接配置為全局文章來源:http://www.zghlxwxcb.cn/news/detail-596272.html
給自己定個flag,沒事了寫點東西,記錄下工作。文章來源地址http://www.zghlxwxcb.cn/news/detail-596272.html
到了這里,關(guān)于flink 1.13.x集成 CDC 2.3.0的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!