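Flink Article Series
I. The Flink Column
The Flink column introduces one topic at a time in a systematic way and illustrates it with concrete examples.
- 1. Flink deployment series
  Covers the basics of deploying and configuring Flink.
- 2. Flink fundamentals series
  Covers the fundamentals of Flink: terminology, architecture, programming model, programming guide, basic DataStream API usage, the four cornerstones, and so on.
- 3. Flink Table API and SQL fundamentals series
  Covers basic usage of the Flink Table API and SQL: creating databases and tables, queries, window functions, catalogs, and so on.
- 4. Flink Table API and SQL advanced and applied series
  Covers applied use of the Table API and SQL. It is closer to real production work and includes material that is harder to develop.
- 5. Flink monitoring series
  Covers topics related to day-to-day operations and monitoring.
II. The Flink Examples Column
The Flink examples column supplements the Flink column. It generally does not explain concepts; instead it offers concrete, ready-to-use examples. This column is not divided into sections; the link titles show what each article covers.
Entry point for all articles in both columns: Flink series article index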
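This article describes in detail how to deploy and verify Debezium, and walks through an example of using it.
For more background, my Flink column covers the material more systematically.
Besides the Maven dependencies, this article depends on Kafka, Flink, and Debezium.
This topic is split into the following articles:
37. Flink CDC formats: Debezium deployment and a MySQL example (1) - Debezium deployment and example
37. Flink CDC formats: Debezium deployment and a MySQL example (2) - Flink and Debezium in practice
37. Flink CDC formats: Debezium deployment and a MySQL example (complete version)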
I. Debezium Format
1. Introduction to Debezium
Debezium is a CDC (Change Data Capture) tool that streams changes from MySQL, PostgreSQL, Oracle, Microsoft SQL Server, and many other databases into Kafka in real time. Debezium provides a unified format for the changelog and supports serializing messages as JSON or Apache Avro.
Flink can interpret Debezium JSON and Avro messages as INSERT / UPDATE / DELETE messages in the Flink SQL system. This is useful in many situations, for example:
- Synchronizing incremental data from a database to other systems
- Audit logging
- Real-time materialized views over a database
- Temporal joins against the change history of a database table, and so on.
Flink can also encode the INSERT / UPDATE / DELETE messages of Flink SQL as Debezium-format JSON or Avro messages and write them to storage such as Kafka. Note, however, that Flink currently cannot merge UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Flink therefore encodes UPDATE_BEFORE and UPDATE_AFTER as Debezium messages of type DELETE and INSERT respectively.
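The mapping described above can be sketched in a few lines of Python. This is an illustration only, not Flink code: the function names `decode` and `encode` and the short row-kind tags are assumptions made for the example, and only the `before`, `after`, and `op` fields of the Debezium payload are modelled.

```python
import json

# Flink changelog row kinds, written as short tags (assumed names for this sketch).
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = "+I", "-U", "+U", "-D"


def decode(payload):
    """Turn one Debezium payload into a list of (row_kind, row) pairs."""
    op = payload["op"]
    before, after = payload.get("before"), payload.get("after")
    if op in ("c", "r"):   # create, or a row read during the initial snapshot
        return [(INSERT, after)]
    if op == "u":          # one update becomes a before/after pair
        return [(UPDATE_BEFORE, before), (UPDATE_AFTER, after)]
    if op == "d":
        return [(DELETE, before)]
    raise ValueError(f"unsupported op: {op!r}")


def encode(row_kind, row):
    """Encode one changelog row as a Debezium-style payload.

    UPDATE_BEFORE and UPDATE_AFTER are not merged into one "u" message:
    they are written as a delete ("d") and a create ("c") respectively.
    """
    if row_kind in (INSERT, UPDATE_AFTER):
        return {"before": None, "after": row, "op": "c"}
    if row_kind in (UPDATE_BEFORE, DELETE):
        return {"before": row, "after": None, "op": "d"}
    raise ValueError(f"unsupported row kind: {row_kind!r}")


# Round trip of a single update event.
update = {
    "before": {"name": "alan_test", "scores": 666.0},
    "after": {"name": "alan_test", "scores": 888.0},
    "op": "u",
}
rows = decode(update)
messages = [encode(kind, row) for kind, row in rows]
print(json.dumps(messages, indent=2))
```

Decoding the update yields two changelog rows, and encoding them again yields two messages rather than the original single update, which is the asymmetry the paragraph above warns about.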
2. Binlog configuration and verification
Configure the binlog for the database to be monitored. This example uses MySQL 5.7.
1) Configuration
The parameters set in this example follow the configuration below.
[root@server4 ~]# cat /etc/my.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
[mysqld]
......
log-bin=mysql-bin # base name of the binlog files; any name works
binlog-format=row # recommended, and the format Debezium's MySQL connector requires; the other values are mixed and statement
server_id=1 # must be unique within a MySQL cluster
binlog_do_db=test # test is the MySQL database name; to monitor several databases, add one binlog_do_db line per database, as below
# binlog_do_db=test2
# binlog_do_db=test3
.....
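- STATEMENT mode (SBR)
  Every SQL statement that modifies data is written to the binlog. The advantage is that row-level changes do not have to be recorded, which reduces binlog volume, saves I/O, and improves performance. The drawback is that in some cases master and slave data can diverge (for example with sleep(), last_insert_id(), and user-defined functions (UDFs)).
- ROW mode (RBR)
  The context of each SQL statement is not recorded; only which rows were modified and what they were changed to. This avoids the cases in which calls to stored procedures, functions, or triggers cannot be replicated correctly. The drawback is a much larger log volume, especially for statements that modify many rows.
- MIXED mode (MBR)
  A combination of the two modes above. Ordinary statements are logged in STATEMENT mode, and operations that STATEMENT mode cannot replicate safely are logged in ROW mode. MySQL chooses the logging mode according to the SQL statement being executed.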
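2) Restart MySQL
Save the configuration, then restart MySQL.
service mysqld restart
3) Verification
After the restart, two simple checks confirm that the setting took effect.
MySQL's default data directory: /var/lib/mysql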
[root@server4 ~]# cd /var/lib/mysql
[root@server4 mysql]# ll
......
-rw-r----- 1 mysql mysql 154 1月 10 2022 mysql-bin.000001
-rw-r----- 1 mysql mysql 1197 1月 16 12:21 mysql-bin.index
.....
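- Check that mysql-bin.000001 has been created and that its size is 154 bytes. The numeric suffix increases each time MySQL restarts, so after a second restart the current file is mysql-bin.000002
- Create a table or insert data in the test database and check whether mysql-bin.000001 grows
If both checks pass, the binlog is configured correctly.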
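The first check can also be scripted. The sketch below is a minimal Python helper that lists binlog files and their sizes; the function name `list_binlogs` is an assumption for the example, and the demo runs against a temporary directory that stands in for /var/lib/mysql so that it works without a MySQL installation.

```python
import re
import tempfile
from pathlib import Path


def list_binlogs(data_dir, base_name="mysql-bin"):
    """Return (file name, size in bytes) for each binlog file, in sequence order.

    base_name must match the log-bin setting in my.cnf.
    """
    pattern = re.compile(rf"^{re.escape(base_name)}\.\d{{6}}$")
    found = [
        (path.name, path.stat().st_size)
        for path in Path(data_dir).iterdir()
        if path.is_file() and pattern.match(path.name)
    ]
    # The suffix is zero-padded, so sorting by name is sorting by sequence.
    return sorted(found)


# Demo on a stand-in directory; on the server, pass "/var/lib/mysql" instead.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "mysql-bin.000001").write_bytes(b"\0" * 154)
    (Path(tmp) / "mysql-bin.index").write_text("./mysql-bin.000001\n")
    demo = list_binlogs(tmp)

print(demo)
```

Running the helper before and after inserting a row into the test database, and comparing the reported sizes, covers the second check as well.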
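3. Debezium deployment and verification
1) Download the MySQL connector
Download the version you need from the official site: https://debezium.io/releases/
This example uses debezium-connector-mysql-1.7.2.Final-plugin.tar.gz
2) Extract
Create the target directory: /usr/local/bigdata/debezium/connector
Extract the archive: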
tar zxvf /usr/local/bigdata/debezium-connector-mysql-1.7.2.Final-plugin.tar.gz -C /usr/local/bigdata/debezium/connector
## After extraction
[alanchan@server3 connector]$ ll
總用量 4
drwxr-xr-x 2 alanchan root 4096 1月 16 07:20 debezium-connector-mysql
[alanchan@server3 connector]$ cd debezium-connector-mysql/
[alanchan@server3 debezium-connector-mysql]$ ll
總用量 10312
-rw-rw-r-- 1 alanchan root 337864 12月 14 2021 antlr4-runtime-4.8.jar
-rw-rw-r-- 1 alanchan root 308966 12月 14 2021 CHANGELOG.md
-rw-rw-r-- 1 alanchan root 19228 12月 14 2021 CONTRIBUTE.md
-rw-rw-r-- 1 alanchan root 4981 12月 14 2021 COPYRIGHT.txt
-rw-rw-r-- 1 alanchan root 20682 12月 14 2021 debezium-api-1.7.2.Final.jar
-rw-rw-r-- 1 alanchan root 400546 12月 14 2021 debezium-connector-mysql-1.7.2.Final.jar
-rw-rw-r-- 1 alanchan root 886363 12月 14 2021 debezium-core-1.7.2.Final.jar
-rw-rw-r-- 1 alanchan root 2825430 12月 14 2021 debezium-ddl-parser-1.7.2.Final.jar
-rw-rw-r-- 1 alanchan root 4617 12月 14 2021 failureaccess-1.0.1.jar
-rw-rw-r-- 1 alanchan root 2858426 12月 14 2021 guava-30.0-jre.jar
-rw-rw-r-- 1 alanchan root 129157 12月 14 2021 LICENSE-3rd-PARTIES.txt
-rw-rw-r-- 1 alanchan root 11357 12月 14 2021 LICENSE.txt
-rw-rw-r-- 1 alanchan root 193386 12月 14 2021 mysql-binlog-connector-java-0.25.3.jar
-rw-rw-r-- 1 alanchan root 2475087 12月 14 2021 mysql-connector-java-8.0.27.jar
-rw-rw-r-- 1 alanchan root 19520 12月 14 2021 README_JA.md
-rw-rw-r-- 1 alanchan root 15286 12月 14 2021 README.md
-rw-rw-r-- 1 alanchan root 13114 12月 14 2021 README_ZH.md
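3) Kafka configuration
Debezium runs as a Kafka Connect plugin, so the Kafka Connect plugin configuration has to be changed, and the Debezium directory must be in a location that Kafka can find.
In this example Kafka is installed in /usr/local/bigdata/kafka_2.12-3.0.0
- Edit the Kafka Connect configuration file connect-distributed.properties
Change the entries below; configure the others as needed, otherwise they keep their defaults.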
bootstrap.servers=server1:9092,server2:9092,server3:9092
group.id=connect-cluster
status.storage.replication.factor=2
plugin.path=/usr/local/bigdata/debezium/connector
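Whether to distribute this configuration file to the other nodes depends on your environment.
- Restart the Kafka cluster
4) Start the Kafka Connect plugin
Run this step on the machine where Debezium is deployed.
# Start it from Kafka's /usr/local/bigdata/kafka_2.12-3.0.0/bin directory
# Command: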
connect-distributed.sh -daemon /usr/local/bigdata/kafka_2.12-3.0.0/config/connect-distributed.properties
[alanchan@server3 config]$ cd /usr/local/bigdata/kafka_2.12-3.0.0/bin
[alanchan@server3 bin]$ connect-distributed.sh -daemon /usr/local/bigdata/kafka_2.12-3.0.0/config/connect-distributed.properties
[alanchan@server3 bin]$ jps
8980 ConnectDistributed
9271 Jps
826 Kafka
# The ConnectDistributed process is the Kafka Connect plugin
# It can also be verified as follows
[alanchan@server3 bin]$ curl server3:8083
{"version":"3.0.0","commit":"8cb0a5e9d3441962","kafka_cluster_id":"dVRZjBtQQnum1bb7pu_ljg"}
# You can also list the connectors that are running; none has been registered yet, so the list is empty
[alanchan@server3 bin]$ curl 192.168.10.43:8083/connectors/
[]
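5) Register the MySQL connector
- Parameter description
{
"name": "alan-debezium-mysql-connector", // Name of the connector when registered with the Kafka Connect service.
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector", // Class name of the connector; must not be changed.
"database.hostname": "192.168.10.44", // MySQL server address
"database.port": "3306", // MySQL server port
"database.user": "root", // MySQL user with the appropriate privileges
"database.password": "123456", // Password of the MySQL user
"database.server.id": "184054", // ID under which the connector joins the MySQL cluster; any number, as long as it is not already used by another server or replication client
"database.server.name": "ALAN", // Logical name of the MySQL server or cluster; used as the Kafka topic prefix
"database.include.list": "cdctest", // Databases hosted by the server to capture; separate several databases with commas
"database.history.kafka.bootstrap.servers": "server1:9092,server2:9092,server3:9092", // Kafka brokers the connector uses to write DDL statements to, and recover them from, the database history topic
"database.history.kafka.topic": "alan.historydb", // Name of the database history topic. It is for internal use only; consumers must not use it
"include.schema.changes": "true" // Whether the connector should generate events for DDL changes and send them to the schema change topic (named after database.server.name) for consumers
}
}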
- Run from the command line
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.10.43:8083/connectors/ -d '{"name": "alan-debezium-mysql-connector","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","database.hostname": "192.168.10.44","database.port": "3306","database.user": "root","database.password": "123456","database.server.id": "184054", "database.server.name": "ALAN", "database.include.list": "cdctest", "database.history.kafka.bootstrap.servers": "server1:9092,server2:9092,server3:9092", "database.history.kafka.topic": "alan.historydb","include.schema.changes": "true"}}'
- Run with Postman or a similar tool
After the request is executed, the response is returned (screenshot omitted).
You can also check in a Kafka GUI tool such as Offset Explorer.
You can also verify from the command line that the registration succeeded.
[alanchan@server3 bin]$ curl 192.168.10.43:8083/connectors/
["alan-debezium-mysql-connector"]
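The registration request can also be built with Python's standard library, which avoids shell quoting problems with the JSON body. This is a sketch under the same assumptions as the curl command above (Kafka Connect at 192.168.10.43:8083, the configuration values of this example); the helper name `build_registration` is hypothetical, and the request is only constructed here, not sent.

```python
import json
import urllib.request

# Kafka Connect REST endpoint used in this example.
CONNECT_URL = "http://192.168.10.43:8083/connectors/"


def build_registration(name, config, url=CONNECT_URL):
    """Build the POST request that registers a connector with Kafka Connect."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )


config = {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "192.168.10.44",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "123456",
    "database.server.id": "184054",
    "database.server.name": "ALAN",
    "database.include.list": "cdctest",
    "database.history.kafka.bootstrap.servers": "server1:9092,server2:9092,server3:9092",
    "database.history.kafka.topic": "alan.historydb",
    "include.schema.changes": "true",
}
request = build_registration("alan-debezium-mysql-connector", config)
print(request.get_method(), request.full_url)

# Sending needs a reachable Kafka Connect worker, so it is left commented out:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

Keeping the configuration as a dictionary also makes it easy to register several connectors that differ only in `database.server.name` or `database.include.list`.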
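6) Data verification
Once the connector has started, Debezium synchronizes the data in the monitored database tables to Kafka topics.
In this example, the original post showed the initial data in MySQL and the resulting data in the corresponding Kafka topic as screenshots, which are omitted here.
If the topic contains the table data, the initial verification of Debezium is complete.
4. Example: sending MySQL data changes to Kafka through Debezium CDC
The following shows how the Kafka topic ALAN.cdctest.userscoressink changes after rows are inserted into, updated in, and deleted from the table userscoressink.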
[alanchan@server1 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic ALAN.cdctest.userscoressink --from-beginning
......
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"ALAN.cdctest.userscoressink.Envelope"},"payload":{"before":null,"after":{"name":"alan_test","scores":666.0},"source":{"version":"1.7.2.Final","connector":"mysql","name":"ALAN","ts_ms":1705717276000,"snapshot":"false","db":"cdctest","sequence":null,"table":"userscoressink","server_id":1,"gtid":null,"file":"alan_master_logbin.000004","pos":4645,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1705717750512,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"ALAN.cdctest.userscoressink.Envelope"},"payload":{"before":{"name":"alan_test","scores":666.0},"after":{"name":"alan_test","scores":888.0},"source":{"version":"1.7.2.Final","connector":"mysql","name":"ALAN","ts_ms":1705717298000,"snapshot":"false","db":"cdctest","sequence":null,"table":"userscoressink","server_id":1,"gtid":null,"file":"alan_master_logbin.000004","pos":4931,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1705717772785,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"ALAN.cdctest.userscoressink.Envelope"},"payload":{"before":{"name":"alan_test","scores":888.0},"after":null,"source":{"version":"1.7.2.Final","connector":"mysql","name":"ALAN","ts_ms":1705717322000,"snapshot":"false","db":"cdctest","sequence":null,"table":"userscoressink","server_id":1,"gtid":null,"file":"alan_master_logbin.000004","pos":5234,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1705717796886,"transaction":null}}
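Most of each message above is the repeated "schema" block; the information that changes is in "payload". As a reading aid, the Python sketch below reduces one consumed message to its topic name, operation code, and row images. The function name `summarize` is an assumption for the example, and the sample is an abbreviated copy of the update event above with the "schema" part and most "source" fields left out.

```python
import json


def summarize(message):
    """Reduce one consumed message to its topic, operation, and row images."""
    payload = json.loads(message)["payload"]
    source = payload["source"]
    return {
        # Topic name pattern seen in this example: <server name>.<database>.<table>
        "topic": f'{source["name"]}.{source["db"]}.{source["table"]}',
        "op": payload["op"],          # "c" = create, "u" = update, "d" = delete
        "before": payload["before"],  # row image before the change, or None
        "after": payload["after"],    # row image after the change, or None
    }


# Abbreviated copy of the update event shown above.
sample = json.dumps({
    "payload": {
        "before": {"name": "alan_test", "scores": 666.0},
        "after": {"name": "alan_test", "scores": 888.0},
        "source": {"name": "ALAN", "db": "cdctest", "table": "userscoressink"},
        "op": "u",
        "ts_ms": 1705717772785,
    }
})
summary = summarize(sample)
print(summary)
```

In the three messages above, the insert has a null "before", the delete has a null "after", and only the update carries both row images.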
That concludes this article, which described in detail how to deploy and verify Debezium and walked through an example of using it.