
37. Flink CDC format: Debezium deployment and MySQL example (complete edition)


Flink series articles

I. The Flink column

The Flink column introduces one topic at a time, illustrated with concrete examples.

  • 1. Flink deployment series
    Covers the basics of deploying and configuring Flink.

  • 2. Flink fundamentals series
    Covers Flink fundamentals such as terminology, architecture, the programming model, the programming guide, basic DataStream API usage, and the four cornerstones.

  • 3. Flink Table API and SQL basics series
    Covers basic Table API and SQL usage, such as creating databases and tables with the Table API and SQL, queries, window functions, catalogs, and so on.

  • 4. Flink Table API and SQL advanced series
    Covers applied Table API and SQL topics that are closer to real production use and somewhat harder to develop.

  • 5. Flink monitoring series
    Covers day-to-day operations and monitoring work.

II. The Flink examples column

The examples column supplements the main column: rather than explaining concepts, it provides ready-to-use, concrete examples. It is not further divided into sections; each link's title describes its content.

The index of all articles in both columns: Flink series article index



This article walks through deploying and verifying Debezium's MySQL connector, and then demonstrates, with concrete examples, how the Flink SQL client consumes CDC data parsed by Debezium and synchronized through Kafka.

For more on this topic, see the other articles in my Flink column.

Besides the Maven dependencies, this article requires Kafka, Flink, and Debezium.

本專(zhuān)題文章分為如下幾篇:

37、Flink 的CDC 格式:debezium部署以及mysql示例(1)-debezium的部署與示例
37、Flink 的CDC 格式:debezium部署以及mysql示例(2)-Flink 與Debezium 實(shí)踐
37、Flink 的CDC 格式:debezium部署以及mysql示例(完整版)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-836764.html

I. Debezium Format

1. Introduction to Debezium

Debezium is a CDC (Change Data Capture) tool that can stream changes from MySQL, PostgreSQL, Oracle, Microsoft SQL Server, and many other databases into Kafka in real time. Debezium defines a unified structure for change logs and supports serializing messages as JSON or Apache Avro.

Flink can parse Debezium JSON and Avro messages into INSERT / UPDATE / DELETE messages inside the Flink SQL system. This is useful in many scenarios, for example:

  • synchronizing incremental data from a database to other systems
  • log auditing
  • real-time materialized views over databases
  • joining against the change history of a dimension table, and so on.

Flink can also encode INSERT / UPDATE / DELETE messages from Flink SQL as Debezium-format JSON or Avro messages and write them to storage such as Kafka. Note, however, that Flink cannot yet combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message, so it encodes UPDATE_BEFORE and UPDATE_AFTER as Debezium messages of type DELETE and INSERT respectively.
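The decoding rule described above can be sketched as a small pure function: each Debezium operation expands to one or two changelog rows (an illustrative Python sketch, not a Flink API; the op codes are Debezium's, and the row kinds mirror Flink SQL's changelog kinds):

```python
# Illustrative sketch: how one Debezium change event maps to Flink changelog rows.
# Debezium op codes: "c"=create, "u"=update, "d"=delete, "r"=snapshot read.
# Row kinds (+I, -U, +U, -D) mirror Flink SQL's changelog semantics.

def debezium_to_changelog(event: dict) -> list[tuple[str, dict]]:
    """Expand one Debezium payload into (row_kind, row) pairs."""
    op, before, after = event["op"], event.get("before"), event.get("after")
    if op in ("c", "r"):           # insert / snapshot read -> one +I row
        return [("+I", after)]
    if op == "u":                  # update -> retract the old row, add the new one
        return [("-U", before), ("+U", after)]
    if op == "d":                  # delete -> one -D row
        return [("-D", before)]
    raise ValueError(f"unknown op: {op}")

update = {"op": "u",
          "before": {"name": "alan_test", "scores": 666.0},
          "after":  {"name": "alan_test", "scores": 888.0}}
print(debezium_to_changelog(update))
```

In the encoding direction Flink does the reverse, except that, as noted above, an update is split into a DELETE and an INSERT message rather than emitted as a single UPDATE.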

2. Configuring and verifying the binlog

Enable the binlog for the databases you want to monitor. This example uses MySQL 5.7.

1) Configuration

The parameters used in this example are shown below.

[root@server4 ~]# cat /etc/my.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html

[mysqld]
......

log-bin=mysql-bin  # base name of the binlog files; any name works
binlog-format=row  # recommended; mixed and statement are also possible, depending on your needs
server_id=1 # must be unique within a MySQL replication cluster
binlog_do_db=test # test is the database name; to monitor several databases, add one binlog_do_db line per database, as below
# binlog_do_db=test2
# binlog_do_db=test3
.....

  • STATEMENT mode (SBR)
    Every SQL statement that modifies data is written to the binlog. The advantage is that individual row changes need not be logged, which keeps the binlog small, saves I/O, and improves performance. The downside is that in some cases it causes master-slave inconsistency (for example with sleep(), last_insert_id(), and user-defined functions (UDFs)).

  • ROW mode (RBR)
    Instead of logging each SQL statement with its context, only which row changed and what it changed to is recorded. This also avoids the cases where calls to stored procedures, functions, or triggers cannot be replicated correctly. The downside is a much larger log, especially during ALTER TABLE, which can make it balloon.

  • MIXED mode (MBR)
    A combination of the two: ordinary replication uses STATEMENT mode, and operations that STATEMENT mode cannot replicate fall back to ROW mode; MySQL picks the log format per SQL statement.

2) Restart MySQL

After saving the configuration, restart MySQL:

service mysqld restart

3)、驗(yàn)證

重啟后,可以通過(guò)2個(gè)簡(jiǎn)單的方法驗(yàn)證是否設(shè)置成功。

mysql默認(rèn)的安裝目錄:cd /var/lib/mysql

[root@server4 ~]# cd /var/lib/mysql
[root@server4 mysql]# ll
......
-rw-r----- 1 mysql mysql    154 110 2022 mysql-bin.000001
-rw-r----- 1 mysql mysql       1197 116 12:21 mysql-bin.index
.....

  • 查看mysql-bin.000001文件是否生成,且其大小為154字節(jié)。mysql-bin.000001是mysql重啟的次數(shù),重啟2次則為mysql-bin.000002
  • 在test數(shù)據(jù)庫(kù)中創(chuàng)建或添加數(shù)據(jù),mysql-bin.000001的大小是否增加

以上情況滿足,則說(shuō)明binlog配置正常

3. Deploying and verifying Debezium

1) Download the MySQL connector

Download the version you need from the official site: https://debezium.io/releases/
This example uses debezium-connector-mysql-1.7.2.Final-plugin.tar.gz.

2) Unpack

Create the target directory /usr/local/bigdata/debezium/connector and unpack the archive:


tar zxvf /usr/local/bigdata/debezium-connector-mysql-1.7.2.Final-plugin.tar.gz -C /usr/local/bigdata/debezium/connector
## after unpacking
[alanchan@server3 connector]$ ll
total 4
drwxr-xr-x 2 alanchan root 4096 Jan 16 07:20 debezium-connector-mysql
[alanchan@server3 connector]$ cd debezium-connector-mysql/
[alanchan@server3 debezium-connector-mysql]$ ll
total 10312
-rw-rw-r-- 1 alanchan root  337864 Dec 14  2021 antlr4-runtime-4.8.jar
-rw-rw-r-- 1 alanchan root  308966 Dec 14  2021 CHANGELOG.md
-rw-rw-r-- 1 alanchan root   19228 Dec 14  2021 CONTRIBUTE.md
-rw-rw-r-- 1 alanchan root    4981 Dec 14  2021 COPYRIGHT.txt
-rw-rw-r-- 1 alanchan root   20682 Dec 14  2021 debezium-api-1.7.2.Final.jar
-rw-rw-r-- 1 alanchan root  400546 Dec 14  2021 debezium-connector-mysql-1.7.2.Final.jar
-rw-rw-r-- 1 alanchan root  886363 Dec 14  2021 debezium-core-1.7.2.Final.jar
-rw-rw-r-- 1 alanchan root 2825430 Dec 14  2021 debezium-ddl-parser-1.7.2.Final.jar
-rw-rw-r-- 1 alanchan root    4617 Dec 14  2021 failureaccess-1.0.1.jar
-rw-rw-r-- 1 alanchan root 2858426 Dec 14  2021 guava-30.0-jre.jar
-rw-rw-r-- 1 alanchan root  129157 Dec 14  2021 LICENSE-3rd-PARTIES.txt
-rw-rw-r-- 1 alanchan root   11357 Dec 14  2021 LICENSE.txt
-rw-rw-r-- 1 alanchan root  193386 Dec 14  2021 mysql-binlog-connector-java-0.25.3.jar
-rw-rw-r-- 1 alanchan root 2475087 Dec 14  2021 mysql-connector-java-8.0.27.jar
-rw-rw-r-- 1 alanchan root   19520 Dec 14  2021 README_JA.md
-rw-rw-r-- 1 alanchan root   15286 Dec 14  2021 README.md
-rw-rw-r-- 1 alanchan root   13114 Dec 14  2021 README_ZH.md

3) Kafka configuration

Because the connector runs as a Kafka Connect plugin, the Kafka Connect configuration must be updated, and the Debezium install directory must be reachable from Kafka.
In this example Kafka is installed at /usr/local/bigdata/kafka_2.12-3.0.0.

  • Edit the Kafka Connect configuration file connect-distributed.properties
    Change the entries below; everything else can stay at its default or be adapted to your environment:
bootstrap.servers=server1:9092,server2:9092,server3:9092
group.id=connect-cluster
status.storage.replication.factor=2

plugin.path=/usr/local/bigdata/debezium/connector

Distribute this file to the other brokers if your environment requires it.

  • Restart the Kafka cluster

4) Start the Kafka Connect worker

Run this on the machine where Debezium is deployed.

#start from Kafka's /usr/local/bigdata/kafka_2.12-3.0.0/bin directory
#command:
connect-distributed.sh -daemon /usr/local/bigdata/kafka_2.12-3.0.0/config/connect-distributed.properties

[alanchan@server3 config]$ cd /usr/local/bigdata/kafka_2.12-3.0.0/bin
[alanchan@server3 bin]$ connect-distributed.sh -daemon /usr/local/bigdata/kafka_2.12-3.0.0/config/connect-distributed.properties
[alanchan@server3 bin]$ jps
8980 ConnectDistributed
9271 Jps
826 Kafka
# the ConnectDistributed process is the Kafka Connect worker
# you can also verify it over its REST API
[alanchan@server3 bin]$ curl server3:8083
{"version":"3.0.0","commit":"8cb0a5e9d3441962","kafka_cluster_id":"dVRZjBtQQnum1bb7pu_ljg"}
# and list the registered connectors; none have been registered yet, so the list is empty
[alanchan@server3 bin]$ curl 192.168.10.43:8083/connectors/
[]

5) Register the MySQL connector

  • Parameter description (the // comments below are explanatory only; strip them before sending, since JSON does not allow comments)

{
    "name": "alan-debezium-mysql-connector", // name under which the connector registers with the Kafka Connect service
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", // connector class name; do not change
        "database.hostname": "192.168.10.44", // MySQL server address
        "database.port": "3306", // MySQL server port
        "database.user": "root", // a MySQL user with the required privileges
        "database.password": "123456", // that user's password
        "database.server.id": "184054", // unique numeric ID of the connector; any value, but it must not clash with another server id
        "database.server.name": "ALAN", // logical name of the MySQL server or cluster; becomes the Kafka topic prefix
        "database.include.list": "cdctest", // databases hosted by the server to capture; separate multiple databases with commas
        "database.history.kafka.bootstrap.servers": "server1:9092,server2:9092,server3:9092", // Kafka brokers the connector uses to write and recover DDL statements in the database history topic
        "database.history.kafka.topic": "alan.historydb", // name of the database history topic; for internal use only, do not consume it
        "include.schema.changes": "true" // whether the connector emits events for DDL changes to the schema-change topic for consumers
    }
}

  • Run from the command line (note that the JSON body must be quoted)
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.10.43:8083/connectors/ -d '{"name": "alan-debezium-mysql-connector","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","database.hostname": "192.168.10.44","database.port": "3306","database.user": "root","database.password": "123456","database.server.id": "184054", "database.server.name": "ALAN", "database.include.list": "cdctest", "database.history.kafka.bootstrap.servers": "server1:9092,server2:9092,server3:9092", "database.history.kafka.topic": "alan.historydb","include.schema.changes": "true"}}'
  • Or use a tool such as Postman
    (screenshot omitted)
    After execution it returns:
    (screenshot omitted)
    You can also check in a Kafka GUI tool such as Offset Explorer:
    (screenshot omitted)
    Or verify the registration from the command line:
[alanchan@server3 bin]$ curl 192.168.10.43:8083/connectors/
["alan-debezium-mysql-connector"]
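The same registration can be scripted. The sketch below assembles the payload from this example and POSTs it to the Connect REST API using only Python's standard library; the hosts, credentials, and connector name are the example values from above, so swap in your own, and note the actual POST is only attempted against a live worker:

```python
import json
import urllib.request

def build_mysql_connector_config(name: str, host: str, server_id: str,
                                 server_name: str, databases: str,
                                 kafka_bootstrap: str, history_topic: str,
                                 user: str = "root", password: str = "123456") -> dict:
    """Assemble a Debezium MySQL connector registration payload."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "database.hostname": host,
            "database.port": "3306",
            "database.user": user,
            "database.password": password,
            "database.server.id": server_id,
            "database.server.name": server_name,   # becomes the Kafka topic prefix
            "database.include.list": databases,
            "database.history.kafka.bootstrap.servers": kafka_bootstrap,
            "database.history.kafka.topic": history_topic,
            "include.schema.changes": "true",
        },
    }

def register(connect_url: str, payload: dict) -> None:
    """POST the payload to the Kafka Connect REST API (needs a live worker)."""
    req = urllib.request.Request(
        connect_url.rstrip("/") + "/connectors/",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        print(resp.status, resp.read().decode())

payload = build_mysql_connector_config(
    name="alan-debezium-mysql-connector", host="192.168.10.44",
    server_id="184054", server_name="ALAN", databases="cdctest",
    kafka_bootstrap="server1:9092,server2:9092,server3:9092",
    history_topic="alan.historydb")
print(json.dumps(payload, indent=2))
# register("http://192.168.10.43:8083", payload)  # uncomment against a live worker
```

A successful registration returns HTTP 201 with the connector configuration echoed back, which matches the curl output above.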

6)、數(shù)據(jù)驗(yàn)證

啟動(dòng)成功后,debezium會(huì)將監(jiān)控 的數(shù)據(jù)庫(kù)表中的數(shù)據(jù)同步到kafka的消息隊(duì)列中。
本示例中,mysql中的原始數(shù)據(jù)如下
37、Flink 的CDC 格式:debezium部署以及mysql示例(完整版),# Flink專(zhuān)欄,flink,大數(shù)據(jù),kafka,flink hive,flink sql,flink kafka,flink cdc

啟動(dòng)插件成功后,kafka對(duì)應(yīng)的topic中的數(shù)據(jù)如下
37、Flink 的CDC 格式:debezium部署以及mysql示例(完整版),# Flink專(zhuān)欄,flink,大數(shù)據(jù),kafka,flink hive,flink sql,flink kafka,flink cdc

以上,則表示完成debezium的初步驗(yàn)證成功。

4. Example: streaming MySQL changes to Kafka through Debezium CDC

Below is how the Kafka topic ALAN.cdctest.userscoressink evolves after inserting, updating, and deleting rows in the table userscoressink:

[alanchan@server1 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic ALAN.cdctest.userscoressink --from-beginning
......
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"ALAN.cdctest.userscoressink.Envelope"},"payload":{"before":null,"after":{"name":"alan_test","scores":666.0},"source":{"version":"1.7.2.Final","connector":"mysql","name":"ALAN","ts_ms":1705717276000,"snapshot":"false","db":
"cdctest","sequence":null,"table":"userscoressink","server_id":1,"gtid":null,"file":"alan_master_logbin.000004","pos":4645,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1705717750512,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"ALAN.cdctest.userscoressink.Envelope"},"payload":{"before":{"name":"alan_test","scores":666.0},"after":{"name":"alan_test","scores":888.0},"source":{"version":"1.7.2.Final","connector":"mysql","name":"ALAN","ts_ms":1705717
298000,"snapshot":"false","db":"cdctest","sequence":null,"table":"userscoressink","server_id":1,"gtid":null,"file":"alan_master_logbin.000004","pos":4931,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1705717772785,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"ALAN.cdctest.userscoressink.Envelope"},"payload":{"before":{"name":"alan_test","scores":888.0},"after":null,"source":{"version":"1.7.2.Final","connector":"mysql","name":"ALAN","ts_ms":1705717322000,"snapshot":"false","db":
"cdctest","sequence":null,"table":"userscoressink","server_id":1,"gtid":null,"file":"alan_master_logbin.000004","pos":5234,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1705717796886,"transaction":null}}
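The three events above are a create ("op": "c"), an update ("op": "u"), and a delete ("op": "d") envelope. Pulling the interesting fields out of such a message is straightforward; here is a sketch in plain Python over a trimmed-down version of the update event (the field names follow the output above):

```python
import json

def summarize_event(raw: str) -> str:
    """Summarize one Debezium envelope in a single line."""
    envelope = json.loads(raw)
    # With schemas.enable=true the change data sits under "payload";
    # without the embedded schema, the envelope itself is the payload.
    p = envelope.get("payload", envelope)
    table = p["source"]["table"]
    return f'{p["op"]} on {table}: {p["before"]} -> {p["after"]}'

# trimmed-down version of the update event above
event = json.dumps({
    "schema": {"name": "ALAN.cdctest.userscoressink.Envelope"},
    "payload": {
        "before": {"name": "alan_test", "scores": 666.0},
        "after": {"name": "alan_test", "scores": 888.0},
        "source": {"db": "cdctest", "table": "userscoressink"},
        "op": "u",
    },
})
print(summarize_event(event))
# -> u on userscoressink: {'name': 'alan_test', 'scores': 666.0} -> {'name': 'alan_test', 'scores': 888.0}
```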


II. Flink and Debezium in practice

1. Maven dependencies

To use the Debezium format, both projects built with an automation tool (such as Maven or SBT) and the SQL Client with SQL JAR bundles need the following dependencies.

1) Avro

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro-confluent-registry</artifactId>
  <version>1.17.1</version>
</dependency>

2) JSON

The JSON format ships with Flink.

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.17.1</version>
</dependency>

See the Debezium documentation for how to set up Debezium Kafka Connect to synchronize change logs to Kafka topics.

2. Table creation with the Flink SQL client

Debezium provides a unified format for change logs. Here is a simple JSON example of an update operation captured from the MySQL table userscoressink:

{
	"before": {
		"name": "alan_test",
		"scores": 666.0
	},
	"after": {
		"name": "alan_test",
		"scores": 888.0
	},
	"source": {
		"version": "1.7.2.Final",
		"connector": "mysql",
		"name": "ALAN",
		"ts_ms": 1705717298000,
		"snapshot": "false",
		"db": "cdctest",
		"sequence": null,
		"table": "userscoressink",
		"server_id": 1,
		"gtid": null,
		"file": "alan_master_logbin.000004",
		"pos": 4931,
		"row": 0,
		"thread": null,
		"query": null
	},
	"op": "u",
	"ts_ms": 1705717772785,
	"transaction": null
}

The MySQL table userscoressink has two columns (name, scores). The JSON message above is an update event on that table: for the row where name = alan_test, the scores value changed from 666 to 888.

Assuming this message has been synchronized to the Kafka topic ALAN.cdctest.userscoressink, the following DDL consumes the topic and parses the change events.

For starting Debezium, refer to the Kafka walkthrough in the first part of this article; it is not repeated here. The steps below only show the Flink SQL client side, assuming the Debezium environment is healthy.

-- the schema is exactly the same as the MySQL table "userscoressink"

CREATE TABLE userscoressink_debezium (
  name STRING,
  scores FLOAT
) WITH (
 'connector' = 'kafka',
 'topic' = 'ALAN.cdctest.userscoressink',
 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
  -- use the 'debezium-json' format to parse Debezium JSON messages
  -- if Debezium encodes messages with Avro, use 'debezium-avro-confluent' instead
 'format' = 'debezium-json'  
);

重要說(shuō)明

上面的debezium的json更新事件是基于在配置kafka插件的時(shí)候設(shè)置了以下參數(shù)的情況下,默認(rèn)該參數(shù)是true,json的事件就是下面一個(gè)示例的情況。

key.converter.schemas.enable=false 
value.converter.schemas.enable=false

在某些情況下(默認(rèn)),用戶在設(shè)置 Debezium Kafka Connect 時(shí),可能會(huì)開(kāi)啟 Kafka 的配置 ‘value.converter.schemas.enable’,用來(lái)在消息體中包含 schema 信息。
然后,Debezium JSON 消息可能如下所示:

{
	"schema": {
		"type": "struct",
		"fields": [{
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": true,
				"field": "name"
			}, {
				"type": "double",
				"optional": true,
				"field": "scores"
			}],
			"optional": true,
			"name": "ALAN.cdctest.userscoressink.Value",
			"field": "before"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": true,
				"field": "name"
			}, {
				"type": "double",
				"optional": true,
				"field": "scores"
			}],
			"optional": true,
			"name": "ALAN.cdctest.userscoressink.Value",
			"field": "after"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": false,
				"field": "version"
			}, {
				"type": "string",
				"optional": false,
				"field": "connector"
			}, {
				"type": "string",
				"optional": false,
				"field": "name"
			}, {
				"type": "int64",
				"optional": false,
				"field": "ts_ms"
			}, {
				"type": "string",
				"optional": true,
				"name": "io.debezium.data.Enum",
				"version": 1,
				"parameters": {
					"allowed": "true,last,false"
				},
				"default": "false",
				"field": "snapshot"
			}, {
				"type": "string",
				"optional": false,
				"field": "db"
			}, {
				"type": "string",
				"optional": true,
				"field": "sequence"
			}, {
				"type": "string",
				"optional": true,
				"field": "table"
			}, {
				"type": "int64",
				"optional": false,
				"field": "server_id"
			}, {
				"type": "string",
				"optional": true,
				"field": "gtid"
			}, {
				"type": "string",
				"optional": false,
				"field": "file"
			}, {
				"type": "int64",
				"optional": false,
				"field": "pos"
			}, {
				"type": "int32",
				"optional": false,
				"field": "row"
			}, {
				"type": "int64",
				"optional": true,
				"field": "thread"
			}, {
				"type": "string",
				"optional": true,
				"field": "query"
			}],
			"optional": false,
			"name": "io.debezium.connector.mysql.Source",
			"field": "source"
		}, {
			"type": "string",
			"optional": false,
			"field": "op"
		}, {
			"type": "int64",
			"optional": true,
			"field": "ts_ms"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": false,
				"field": "id"
			}, {
				"type": "int64",
				"optional": false,
				"field": "total_order"
			}, {
				"type": "int64",
				"optional": false,
				"field": "data_collection_order"
			}],
			"optional": true,
			"field": "transaction"
		}],
		"optional": false,
		"name": "ALAN.cdctest.userscoressink.Envelope"
	},
	"payload": {
		"before": {
			"name": "alan_test",
			"scores": 666.0
		},
		"after": {
			"name": "alan_test",
			"scores": 888.0
		},
		"source": {
			"version": "1.7.2.Final",
			"connector": "mysql",
			"name": "ALAN",
			"ts_ms": 1705717298000,
			"snapshot": "false",
			"db": "cdctest",
			"sequence": null,
			"table": "userscoressink",
			"server_id": 1,
			"gtid": null,
			"file": "alan_master_logbin.000004",
			"pos": 4931,
			"row": 0,
			"thread": null,
			"query": null
		},
		"op": "u",
		"ts_ms": 1705717772785,
		"transaction": null
	}
}

To parse messages of this kind, add the option 'debezium-json.schema-include' = 'true' to the DDL WITH clause above (it defaults to false). As a rule, it is better not to include the schema description: it makes the messages very verbose and slows parsing down.

This example sticks with the worker's default (schema-embedded) messages, so the DDL below sets that option.

CREATE TABLE userscoressink_debezium (
  name STRING,
  scores FLOAT
) WITH (
 'connector' = 'kafka',
 'topic' = 'ALAN.cdctest.userscoressink',
 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
 'debezium-json.schema-include' = 'true',
 'format' = 'debezium-json'  
);

Flink SQL> CREATE TABLE userscoressink_debezium (
>   name STRING,
>   scores FLOAT
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'ALAN.cdctest.userscoressink',
>  'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
>  'properties.group.id' = 'testGroup',
>  'scan.startup.mode' = 'earliest-offset',
>  'debezium-json.schema-include' = 'true',
>  'format' = 'debezium-json'  
> );
[INFO] Execute statement succeed.

Flink SQL> select * from userscoressink_debezium;
+----+--------------------------------+--------------------------------+
| op |                           name |                         scores |
+----+--------------------------------+--------------------------------+
| +I |                           alan |                           80.0 |
| +I |                       alanchan |                          100.0 |
| +I |                    alanchanchn |                          109.0 |
| +I |                      alan_test |                          666.0 |
| -U |                      alan_test |                          666.0 |
| +U |                      alan_test |                          888.0 |
| -D |                      alan_test |                          888.0 |
| +I |               test_alanchanchn |                          199.0 |
| -U |               test_alanchanchn |                          199.0 |
| +U |               test_alanchanchn |                          299.0 |
| -D |               test_alanchanchn |                          299.0 |

After registering the topic as a Flink table, the Debezium messages can be consumed as a changelog source.

-- real-time materialized view over MySQL "userscoressink_debezium"
-- sum of scores grouped by name
Flink SQL> select name ,sum(scores) from userscoressink_debezium group by name;
+----+--------------------------------+--------------------------------+
| op |                           name |                         EXPR$1 |
+----+--------------------------------+--------------------------------+
| +I |                           alan |                           80.0 |
| +I |                       alanchan |                          100.0 |
| +I |                    alanchanchn |                          109.0 |
| +I |                      alan_test |                          666.0 |
| -D |                      alan_test |                          666.0 |
| +I |                      alan_test |                          888.0 |
| -D |                      alan_test |                          888.0 |
| +I |               test_alanchanchn |                          199.0 |
| -D |               test_alanchanchn |                          199.0 |
| +I |               test_alanchanchn |                          299.0 |
| -D |               test_alanchanchn |                          299.0 |
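The -D/+I pairs in this result are how Flink retracts and re-emits a group's aggregate whenever its input changes. The bookkeeping can be mimicked in a few lines; this is a simplified model reproducing the emission pattern above, not Flink code (the real operator also manages keyed state, parallelism, TTL, and so on):

```python
# Maintain SUM(scores) GROUP BY name over a changelog stream, emitting the
# same retract-style rows (+I / -D) that the Flink query above prints.

def aggregate(changelog):
    """changelog: iterable of (row_kind, name, scores); yields emitted rows."""
    sums, counts = {}, {}
    for kind, name, scores in changelog:
        if counts.get(name, 0) > 0:
            yield ("-D", name, sums[name])      # retract the previous aggregate
        delta = scores if kind in ("+I", "+U") else -scores
        sums[name] = sums.get(name, 0.0) + delta
        counts[name] = counts.get(name, 0) + (1 if kind in ("+I", "+U") else -1)
        if counts[name] > 0:
            yield ("+I", name, sums[name])      # emit the new aggregate
        else:
            del sums[name], counts[name]        # group became empty: only the -D

# the alan_test changelog from the source table above
events = [("+I", "alan_test", 666.0),
          ("-U", "alan_test", 666.0),
          ("+U", "alan_test", 888.0),
          ("-D", "alan_test", 888.0)]
for row in aggregate(events):
    print(row)
```

Run against the alan_test events, this yields exactly the +I 666.0 / -D 666.0 / +I 888.0 / -D 888.0 sequence shown in the query output.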

3. Available metadata

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

Note that format metadata fields are only available if the corresponding connector forwards format metadata. As of Flink 1.17, only the Kafka connector can expose metadata fields for its value format.

(table of metadata keys omitted; the keys used below are value.ingestion-timestamp, value.source.timestamp, value.source.database, value.source.schema, value.source.table, and value.source.properties)

The following example shows how to access Debezium metadata fields in Kafka:

CREATE TABLE userscoressink_debezium_meta (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  name STRING,
  scores FLOAT
) WITH (
 'connector' = 'kafka',
 'topic' = 'ALAN.cdctest.userscoressink',
 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
 'debezium-json.schema-include' = 'true',
 'format' = 'debezium-json'  
);

Flink SQL> CREATE TABLE userscoressink_debezium_meta (
>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
>   event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
>   origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
>   origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
>   origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
>   origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
>   name STRING,
>   scores FLOAT
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'ALAN.cdctest.userscoressink',
>  'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
>  'properties.group.id' = 'testGroup',
>  'scan.startup.mode' = 'earliest-offset',
>  'debezium-json.schema-include' = 'true',
>  'format' = 'debezium-json'  
> );
[INFO] Execute statement succeed.

Flink SQL> select * from userscoressink_debezium_meta;
+----+-------------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |               origin_ts |              event_time |                origin_database |                  origin_schema |                   origin_table |              origin_properties |                           name |                         scores |
+----+-------------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I | 2024-01-20 02:17:48.975 | 2024-01-20 02:17:48.972 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |                           alan |                           80.0 |
| +I | 2024-01-20 02:17:48.976 | 2024-01-20 02:17:48.976 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |                       alanchan |                          100.0 |
| +I | 2024-01-20 02:17:48.977 | 2024-01-20 02:17:48.976 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |                    alanchanchn |                          109.0 |
| +I | 2024-01-20 02:29:10.512 | 2024-01-20 02:21:16.000 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |                      alan_test |                          666.0 |
| -U | 2024-01-20 02:29:32.785 | 2024-01-20 02:21:38.000 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |                      alan_test |                          666.0 |
| +U | 2024-01-20 02:29:32.785 | 2024-01-20 02:21:38.000 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |                      alan_test |                          888.0 |
| -D | 2024-01-20 02:29:56.886 | 2024-01-20 02:22:02.000 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |                      alan_test |                          888.0 |
| +I | 2024-01-20 02:53:49.248 | 2024-01-20 02:45:55.000 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |               test_alanchanchn |                          199.0 |
| -U | 2024-01-20 02:53:55.424 | 2024-01-20 02:46:01.000 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |               test_alanchanchn |                          199.0 |
| +U | 2024-01-20 02:53:55.424 | 2024-01-20 02:46:01.000 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |               test_alanchanchn |                          299.0 |
| -D | 2024-01-20 02:53:59.522 | 2024-01-20 02:46:05.000 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |               test_alanchanchn |                          299.0 |
Query terminated, received a total of 11 rows

4. Format options

Flink provides the debezium-avro-confluent and debezium-json formats to parse Debezium messages in Avro and JSON, respectively.
Use debezium-avro-confluent for Debezium's Avro messages,
and debezium-json for Debezium's JSON messages.

1) Debezium Avro

(options table omitted; see the Flink documentation for the debezium-avro-confluent options)

2) Debezium JSON

(options table omitted; see the Flink documentation for the debezium-json options, such as 'debezium-json.schema-include' and 'debezium-json.ignore-parse-errors')

5、數(shù)據(jù)類(lèi)型映射

截至Flink 1.17 版本,Debezium Format 使用 JSON Format 進(jìn)行序列化和反序列化。有關(guān)數(shù)據(jù)類(lèi)型映射的更多詳細(xì)信息,請(qǐng)參考 JSON Format 文檔 和 Confluent Avro Format 文檔。

6. Caveats

1) Duplicate change events

Under normal operating conditions, Debezium delivers every change event with exactly-once semantics, and Flink consumes Debezium-produced change events without issue. When a failure occurs, however, Debezium can only guarantee at-least-once delivery (see the official Debezium documentation on delivery semantics). In that case Debezium may deliver duplicate change events to Kafka, and Flink receives duplicates when it consumes them, which can lead to wrong results or unexpected exceptions in Flink queries. It is therefore recommended to set the job option table.exec.source.cdc-events-duplicate to true and define a PRIMARY KEY on the source. The framework then adds an extra stateful operator that uses the primary key to deduplicate change events and produce a normalized changelog stream.
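What that deduplication does can be modeled simply: keyed by primary key, drop an event when it is an exact re-delivery of the last one seen for that key. This is a much-simplified sketch, not the Flink-internal operator, which additionally normalizes the changelog:

```python
def dedupe(events, key_fields=("name",)):
    """Drop re-delivered duplicates: an event is considered a duplicate if it
    is identical to the last event seen for the same primary key."""
    last = {}
    for event in events:
        row = event["after"] or event["before"]
        key = tuple(row[f] for f in key_fields)
        if last.get(key) == event:
            continue                    # exact re-delivery: drop it
        last[key] = event
        yield event

events = [
    {"op": "c", "before": None, "after": {"name": "alan_test", "scores": 666.0}},
    {"op": "c", "before": None, "after": {"name": "alan_test", "scores": 666.0}},  # re-delivered
    {"op": "u", "before": {"name": "alan_test", "scores": 666.0},
                "after": {"name": "alan_test", "scores": 888.0}},
]
print([e["op"] for e in dedupe(events)])
# -> ['c', 'u']
```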

2) Consuming data produced by the Debezium PostgreSQL connector

If you are using the Debezium PostgreSQL connector to capture changes into Kafka, make sure the REPLICA IDENTITY of the monitored tables is set to FULL (the default is DEFAULT); otherwise Flink SQL cannot interpret the Debezium data correctly.

With FULL, update and delete events carry the complete previous values of all columns. With other settings, the "before" field of update and delete events contains only the primary key columns, or null if there is no primary key. You can change the setting by running ALTER TABLE <table> REPLICA IDENTITY FULL. See the Debezium documentation on PostgreSQL REPLICA IDENTITY for more details.

In summary, this article walked through deploying and verifying Debezium's MySQL connector and demonstrated, with concrete examples, how the Flink SQL client consumes CDC data parsed by Debezium and synchronized through Kafka.

本專(zhuān)題文章分為如下幾篇:

37、Flink 的CDC 格式:debezium部署以及mysql示例(1)-debezium的部署與示例
37、Flink 的CDC 格式:debezium部署以及mysql示例(2)-Flink 與Debezium 實(shí)踐
37、Flink 的CDC 格式:debezium部署以及mysql示例(完整版)

到了這里,關(guān)于37、Flink 的CDC 格式:debezium部署以及mysql示例(完整版)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包