
37. Flink CDC formats: Debezium deployment and a MySQL example (1) - Debezium deployment and example


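Flink article series

I. Flink column

The Flink column introduces one topic at a time in a systematic way, backed by concrete examples.

  • 1. Flink deployment series
    Covers the basics of deploying and configuring Flink.

  • 2. Flink fundamentals series
    Covers Flink fundamentals such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.

  • 3. Flink Table API and SQL fundamentals series
    Covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, and catalogs.

  • 4. Flink Table API and SQL advanced topics and applications series
    Covers applied Table API and SQL topics that are closer to real production use and are somewhat harder to develop.

  • 5. Flink monitoring series
    Covers topics related to day-to-day operations and monitoring.

II. Flink examples column

The Flink examples column supplements the Flink column. It generally does not explain concepts; instead it provides concrete, ready-to-use examples. This column has no sub-sections; the link titles indicate what each article covers.

Entry point for all articles in both columns: Flink series article index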



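This article describes in detail how to deploy and verify Debezium, and walks through an example of using it.

For more, see my Flink column, which covers the material more systematically.

Besides the Maven dependencies, this article depends on Kafka, Flink, and Debezium.

This topic is split into the following articles:

37. Flink CDC formats: Debezium deployment and a MySQL example (1) - Debezium deployment and example
37. Flink CDC formats: Debezium deployment and a MySQL example (2) - Flink and Debezium in practice
37. Flink CDC formats: Debezium deployment and a MySQL example (complete version)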

I. Debezium Format

1. Introduction to Debezium

Debezium is a CDC (Change Data Capture) tool that streams changes from MySQL, PostgreSQL, Oracle, Microsoft SQL Server, and many other databases into Kafka in real time. Debezium provides a unified format schema for changelogs and supports serializing messages as JSON or Apache Avro.

Flink can interpret Debezium JSON and Avro messages as INSERT / UPDATE / DELETE messages in the Flink SQL system. This is useful in many cases, for example:

  • synchronizing incremental data from a database to other systems
  • audit logging
  • real-time materialized views on a database
  • temporal joins against the change history of a dimension table, and so on.

Flink can also encode the INSERT / UPDATE / DELETE messages in Flink SQL as Debezium-format JSON or Avro messages and write them to storage such as Kafka. Note, however, that Flink currently cannot merge UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Flink therefore encodes UPDATE_BEFORE and UPDATE_AFTER as Debezium messages of type DELETE and INSERT respectively.
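The encoding rule described above can be sketched in a few lines of Python. This is only an illustration of the mapping, not Flink's serializer: the function name `encode_debezium` and the reduced message layout (only `before`, `after`, and `op`) are assumptions made for this example.

```python
import json

# Flink changelog row kinds, written here as plain strings for illustration.
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = (
    "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE")


def encode_debezium(row_kind: str, row: dict) -> str:
    """Encode one changelog row as a simplified Debezium-style JSON message.

    UPDATE_BEFORE is written as a delete ("d") and UPDATE_AFTER as a
    create ("c"), because the two halves of an update are not merged.
    """
    if row_kind in (INSERT, UPDATE_AFTER):
        message = {"before": None, "after": row, "op": "c"}
    elif row_kind in (UPDATE_BEFORE, DELETE):
        message = {"before": row, "after": None, "op": "d"}
    else:
        raise ValueError(f"unknown row kind: {row_kind}")
    return json.dumps(message)


if __name__ == "__main__":
    # One logical update becomes two messages: a delete, then a create.
    changelog = [
        (UPDATE_BEFORE, {"name": "alan_test", "scores": 666.0}),
        (UPDATE_AFTER, {"name": "alan_test", "scores": 888.0}),
    ]
    for kind, row in changelog:
        print(encode_debezium(kind, row))
```

A consumer of such a topic therefore never sees `"op":"u"`; it has to treat a delete followed by a create for the same row as an update.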

2. Binlog configuration and verification

Enable the binlog for the database to be monitored. This example uses MySQL 5.7.

1) Configuration

The settings used in this example are shown below.

[root@server4 ~]# cat /etc/my.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html

[mysqld]
......

log-bin=mysql-bin  # base name of the binlog files; any name works
binlog-format=row  # row is recommended; mixed or statement can be used where appropriate
server_id=1 # must be unique within a MySQL cluster
binlog_do_db=test # test is the MySQL database name; to log several databases, add one binlog_do_db line per database, as in the examples below
# binlog_do_db=test2
# binlog_do_db=test3
.....

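  • STATEMENT mode (SBR)
    Every SQL statement that modifies data is recorded in the binlog. The advantage is that individual row changes do not need to be recorded, which reduces binlog volume, saves I/O, and improves performance. The drawback is that in some cases the master and slave can end up with inconsistent data (for example with sleep(), last_insert_id(), or user-defined functions (UDFs)).

  • ROW mode (RBR)
    Does not record the context of each SQL statement; it only records which rows were modified and what they were changed to. It also avoids the cases in which calls to stored procedures, functions, or triggers cannot be replicated correctly. The drawback is that it can produce a large volume of log data, especially for statements that modify many rows.

  • MIXED mode (MBR)
    A mix of the two modes above. Ordinary statements are logged in STATEMENT mode, and operations that STATEMENT mode cannot replicate safely are logged in ROW mode; MySQL chooses the logging format based on the SQL statement being executed.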
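Before restarting MySQL, the settings above can be sanity-checked with a small script. The sketch below is a minimal example, assuming the file follows the simple `key=value` layout shown earlier; the function name `check_binlog_settings` and the wording of the messages are made up for this illustration, and the standard-library `configparser` is only an approximation of MySQL's own option-file parser.

```python
import configparser


def check_binlog_settings(my_cnf_text: str) -> list:
    """Return a list of problems found in the [mysqld] binlog settings."""
    parser = configparser.ConfigParser(
        strict=False,                    # my.cnf may repeat keys such as binlog_do_db
        allow_no_value=True,             # some MySQL options are bare flags
        inline_comment_prefixes=("#",),  # allow "key=value  # comment"
        interpolation=None,
    )
    parser.read_string(my_cnf_text)
    if not parser.has_section("mysqld"):
        return ["no [mysqld] section found"]

    # MySQL accepts '-' and '_' interchangeably in option names.
    options = {key.replace("-", "_"): (value or "").strip()
               for key, value in parser.items("mysqld")}

    problems = []
    if "log_bin" not in options:
        problems.append("log-bin is not set, so no binlog is written")
    if options.get("binlog_format", "").lower() != "row":
        problems.append("binlog-format is not row")
    if not options.get("server_id"):
        problems.append("server_id is not set")
    return problems


if __name__ == "__main__":
    sample = """
[mysqld]
log-bin=mysql-bin  # base name of the binlog files
binlog-format=row
server_id=1
binlog_do_db=test
"""
    print(check_binlog_settings(sample) or "binlog settings look fine")
```

To check a real server, pass the contents of `/etc/my.cnf` to the function; an empty list means the three settings used in this article are present.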

2) Restart MySQL

Save the configuration, then restart MySQL:

service mysqld restart

3) Verification

After the restart, two simple checks confirm that the setting took effect.

MySQL's default data directory is /var/lib/mysql:

[root@server4 ~]# cd /var/lib/mysql
[root@server4 mysql]# ll
......
-rw-r----- 1 mysql mysql    154 Jan 10 2022 mysql-bin.000001
-rw-r----- 1 mysql mysql   1197 Jan 16 12:21 mysql-bin.index
.....

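  • Check that mysql-bin.000001 has been created and is 154 bytes in size. The numeric suffix is a sequence number that increases each time MySQL restarts, so after a second restart the current file is mysql-bin.000002.
  • Create a table or insert data in the test database and check that the binlog file grows.

If both checks pass, the binlog is configured correctly.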
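The two checks can also be scripted. The sketch below is a minimal example, assuming the script runs as a user that may read the data directory; the function name `inspect_binlog` is made up for this illustration. It relies on the fact that every MySQL binary log file starts with the four magic bytes `0xFE 'b' 'i' 'n'`.

```python
from pathlib import Path

# Every MySQL binary log file starts with these four magic bytes.
BINLOG_MAGIC = b"\xfebin"


def inspect_binlog(path) -> dict:
    """Report whether a file looks like a binlog, and its current size."""
    file = Path(path)
    with file.open("rb") as handle:
        header = handle.read(4)
    return {"is_binlog": header == BINLOG_MAGIC, "size": file.stat().st_size}


if __name__ == "__main__":
    # Path taken from this article's setup; adjust to your data directory.
    print(inspect_binlog("/var/lib/mysql/mysql-bin.000001"))
```

Calling it once before and once after inserting a row into the test database should show `size` increasing.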

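3. Debezium deployment and verification

1) Download the MySQL connector

Download the required version from the official site: https://debezium.io/releases/
This example uses debezium-connector-mysql-1.7.2.Final-plugin.tar.gz

2) Extract

Create the target directory /usr/local/bigdata/debezium/connector
Then extract the archive into it: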


tar zxvf /usr/local/bigdata/debezium-connector-mysql-1.7.2.Final-plugin.tar.gz -C /usr/local/bigdata/debezium/connector
## after extraction
[alanchan@server3 connector]$ ll
total 4
drwxr-xr-x 2 alanchan root 4096 Jan 16 07:20 debezium-connector-mysql
[alanchan@server3 connector]$ cd debezium-connector-mysql/
[alanchan@server3 debezium-connector-mysql]$ ll
total 10312
-rw-rw-r-- 1 alanchan root  337864 Dec 14 2021 antlr4-runtime-4.8.jar
-rw-rw-r-- 1 alanchan root  308966 Dec 14 2021 CHANGELOG.md
-rw-rw-r-- 1 alanchan root   19228 Dec 14 2021 CONTRIBUTE.md
-rw-rw-r-- 1 alanchan root    4981 Dec 14 2021 COPYRIGHT.txt
-rw-rw-r-- 1 alanchan root   20682 Dec 14 2021 debezium-api-1.7.2.Final.jar
-rw-rw-r-- 1 alanchan root  400546 Dec 14 2021 debezium-connector-mysql-1.7.2.Final.jar
-rw-rw-r-- 1 alanchan root  886363 Dec 14 2021 debezium-core-1.7.2.Final.jar
-rw-rw-r-- 1 alanchan root 2825430 Dec 14 2021 debezium-ddl-parser-1.7.2.Final.jar
-rw-rw-r-- 1 alanchan root    4617 Dec 14 2021 failureaccess-1.0.1.jar
-rw-rw-r-- 1 alanchan root 2858426 Dec 14 2021 guava-30.0-jre.jar
-rw-rw-r-- 1 alanchan root  129157 Dec 14 2021 LICENSE-3rd-PARTIES.txt
-rw-rw-r-- 1 alanchan root   11357 Dec 14 2021 LICENSE.txt
-rw-rw-r-- 1 alanchan root  193386 Dec 14 2021 mysql-binlog-connector-java-0.25.3.jar
-rw-rw-r-- 1 alanchan root 2475087 Dec 14 2021 mysql-connector-java-8.0.27.jar
-rw-rw-r-- 1 alanchan root   19520 Dec 14 2021 README_JA.md
-rw-rw-r-- 1 alanchan root   15286 Dec 14 2021 README.md
-rw-rw-r-- 1 alanchan root   13114 Dec 14 2021 README_ZH.md

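3) Kafka configuration

Debezium runs as a Kafka Connect plugin, so the Kafka Connect configuration has to be changed. Note also that Kafka must be able to find the directory where Debezium is installed.
In this example Kafka is installed in /usr/local/bigdata/kafka_2.12-3.0.0

  • Edit the Kafka Connect configuration file connect-distributed.properties
    Change the following settings; configure the others as needed, otherwise the defaults apply.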
bootstrap.servers=server1:9092,server2:9092,server3:9092
group.id=connect-cluster
status.storage.replication.factor=2

plugin.path=/usr/local/bigdata/debezium/connector

Whether this configuration file needs to be distributed to other nodes depends on your environment.

  • Restart the Kafka cluster
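A wrong plugin.path is easy to miss until the connector fails to register, so it can be worth checking that the configured directory really contains the connector JAR. The sketch below is a minimal example, assuming a plain `key=value` properties file without line continuations; the function names `parse_properties` and `find_connector_jars` are made up for this illustration.

```python
from pathlib import Path


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blanks and comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def find_connector_jars(props: dict,
                        pattern: str = "debezium-connector-*.jar") -> list:
    """List connector JARs found under each plugin.path entry."""
    jars = []
    for entry in props.get("plugin.path", "").split(","):
        root = Path(entry.strip())
        if entry.strip() and root.is_dir():
            jars.extend(sorted(root.rglob(pattern)))
    return jars


if __name__ == "__main__":
    # Path taken from this article's setup.
    config = Path(
        "/usr/local/bigdata/kafka_2.12-3.0.0/config/connect-distributed.properties")
    found = find_connector_jars(parse_properties(config.read_text()))
    print(found or "no Debezium connector JAR found under plugin.path")
```

With the layout used in this article, the result should include debezium-connector-mysql-1.7.2.Final.jar.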

4) Start Kafka Connect

Do this on the machine where Debezium is deployed.

# Start it from Kafka's /usr/local/bigdata/kafka_2.12-3.0.0/bin directory
# Command:
connect-distributed.sh -daemon /usr/local/bigdata/kafka_2.12-3.0.0/config/connect-distributed.properties

[alanchan@server3 config]$ cd /usr/local/bigdata/kafka_2.12-3.0.0/bin
[alanchan@server3 bin]$ connect-distributed.sh -daemon /usr/local/bigdata/kafka_2.12-3.0.0/config/connect-distributed.properties
[alanchan@server3 bin]$ jps
8980 ConnectDistributed
9271 Jps
826 Kafka
# The ConnectDistributed process is the Kafka Connect worker
# It can also be verified as follows
[alanchan@server3 bin]$ curl server3:8083
{"version":"3.0.0","commit":"8cb0a5e9d3441962","kafka_cluster_id":"dVRZjBtQQnum1bb7pu_ljg"}
# You can also list the registered connectors; none has been registered yet, so the list is empty
[alanchan@server3 bin]$ curl 192.168.10.43:8083/connectors/
[]
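The same two checks can be done from Python with only the standard library. This is a minimal sketch: the helper name `connect_get` and its injectable `opener` parameter are made up for this illustration, while the two paths (`/` and `/connectors`) are the ones used by the curl commands above.

```python
import json
import urllib.request


def connect_get(base_url: str, path: str = "/", opener=urllib.request.urlopen):
    """GET a Kafka Connect REST resource and decode its JSON body.

    `opener` is injectable so the function can be exercised without a
    running Kafka Connect worker.
    """
    url = base_url.rstrip("/") + path
    with opener(url, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))


if __name__ == "__main__":
    base = "http://server3:8083"  # host and port from this article's setup
    print(connect_get(base))                  # worker version information
    print(connect_get(base, "/connectors"))   # names of registered connectors
```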

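5) Register the MySQL connector

  • Parameter reference (the // comments are explanatory only; JSON does not allow comments, so remove them before sending the request)

{
    "name": "alan-debezium-mysql-connector", // connector name used when registering with the Kafka Connect service
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", // connector class name; must not be changed
        "database.hostname": "192.168.10.44", // MySQL server address
        "database.port": "3306", // MySQL server port
        "database.user": "root", // MySQL user with the required privileges
        "database.password": "123456", // password of that MySQL user
        "database.server.id": "184054", // ID the connector uses as a replication client; any number works as long as it does not clash with another server or replication client in the MySQL cluster
        "database.server.name": "ALAN", // logical name of the MySQL server or cluster; used as the Kafka topic prefix
        "database.include.list": "cdctest", // databases hosted by the server to capture; separate several with commas
        "database.history.kafka.bootstrap.servers": "server1:9092,server2:9092,server3:9092", // Kafka brokers the connector uses to write DDL statements to, and recover them from, the database history topic
        "database.history.kafka.topic": "alan.historydb", // name of the database history topic; for internal use only, consumers must not use it
        "include.schema.changes": "true" // whether the connector generates events for DDL changes and sends them to the schema change topic for consumers
    }
}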

  • From the command line (the JSON body must be wrapped in single quotes so the shell passes it through as one argument)
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.10.43:8083/connectors/ -d '{"name": "alan-debezium-mysql-connector","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","database.hostname": "192.168.10.44","database.port": "3306","database.user": "root","database.password": "123456","database.server.id": "184054", "database.server.name": "ALAN", "database.include.list": "cdctest", "database.history.kafka.bootstrap.servers": "server1:9092,server2:9092,server3:9092", "database.history.kafka.topic": "alan.historydb","include.schema.changes": "true"}}'
  • With Postman or a similar tool
    [Screenshot: the POST request in Postman]
    After the request is sent, the response is returned.
    [Screenshot: the response]
    The result can also be viewed in a Kafka GUI tool such as Offset Explorer.
    [Screenshot: Offset Explorer]
    You can also check from the command line whether the registration succeeded.
[alanchan@server3 bin]$ curl 192.168.10.43:8083/connectors/
["alan-debezium-mysql-connector"]
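For scripted deployments the registration can also be sent from Python. The sketch below builds the same POST request as the curl command, using only the standard library; the helper names `build_register_request` and `register` are made up for this illustration, and the configuration values are the ones from this article.

```python
import json
import urllib.request


def build_register_request(connect_url: str, name: str,
                           config: dict) -> urllib.request.Request:
    """Build the POST /connectors request that registers a connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        connect_url.rstrip("/") + "/connectors",
        data=body,
        method="POST",
        headers={"Accept": "application/json",
                 "Content-Type": "application/json"},
    )


def register(request: urllib.request.Request) -> dict:
    """Send the request and return Kafka Connect's JSON response."""
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))


if __name__ == "__main__":
    config = {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "192.168.10.44",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "123456",
        "database.server.id": "184054",
        "database.server.name": "ALAN",
        "database.include.list": "cdctest",
        "database.history.kafka.bootstrap.servers":
            "server1:9092,server2:9092,server3:9092",
        "database.history.kafka.topic": "alan.historydb",
        "include.schema.changes": "true",
    }
    request = build_register_request(
        "http://192.168.10.43:8083", "alan-debezium-mysql-connector", config)
    print(register(request))
```

Building the body with `json.dumps` avoids the shell-quoting pitfalls of a long inline JSON string.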

6) Data verification

Once the connector has started, Debezium synchronizes the data in the monitored database tables to Kafka topics.
In this example, the original data in MySQL is as follows.
[Screenshot: original rows in the MySQL table]

After the connector has started successfully, the data in the corresponding Kafka topic is as follows.
[Screenshot: messages in the Kafka topic]

This completes the initial verification of Debezium.

4. Example: streaming MySQL data changes to Kafka with Debezium CDC

The output below shows how the Kafka topic ALAN.cdctest.userscoressink changes after rows are inserted, updated, and deleted in the table userscoressink.

[alanchan@server1 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic ALAN.cdctest.userscoressink --from-beginning
......
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"ALAN.cdctest.userscoressink.Envelope"},"payload":{"before":null,"after":{"name":"alan_test","scores":666.0},"source":{"version":"1.7.2.Final","connector":"mysql","name":"ALAN","ts_ms":1705717276000,"snapshot":"false","db":"cdctest","sequence":null,"table":"userscoressink","server_id":1,"gtid":null,"file":"alan_master_logbin.000004","pos":4645,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1705717750512,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"ALAN.cdctest.userscoressink.Envelope"},"payload":{"before":{"name":"alan_test","scores":666.0},"after":{"name":"alan_test","scores":888.0},"source":{"version":"1.7.2.Final","connector":"mysql","name":"ALAN","ts_ms":1705717298000,"snapshot":"false","db":"cdctest","sequence":null,"table":"userscoressink","server_id":1,"gtid":null,"file":"alan_master_logbin.000004","pos":4931,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1705717772785,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"ALAN.cdctest.userscoressink.Envelope"},"payload":{"before":{"name":"alan_test","scores":888.0},"after":null,"source":{"version":"1.7.2.Final","connector":"mysql","name":"ALAN","ts_ms":1705717322000,"snapshot":"false","db":"cdctest","sequence":null,"table":"userscoressink","server_id":1,"gtid":null,"file":"alan_master_logbin.000004","pos":5234,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1705717796886,"transaction":null}}

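Each message is long because it carries the full schema, but the part that matters for most consumers is the `payload` object with its `before`, `after`, and `op` fields. The sketch below shows one way to pull those out; the function name `summarize_event` and the label table `OPERATIONS` are made up for this illustration, and the sample input is a shortened copy of the payload of the update message above.

```python
import json

# Debezium operation codes and a readable label for each.
OPERATIONS = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "r": "SNAPSHOT READ"}


def summarize_event(raw: str) -> dict:
    """Extract the operation, table, and row images from one change event."""
    event = json.loads(raw)
    # With schemas enabled the row data sits under "payload"; without
    # them the message itself is the payload.
    payload = event.get("payload", event)
    source = payload.get("source") or {}
    return {
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "table": f'{source.get("db")}.{source.get("table")}',
        "before": payload.get("before"),
        "after": payload.get("after"),
    }


if __name__ == "__main__":
    sample = json.dumps({"payload": {
        "before": {"name": "alan_test", "scores": 666.0},
        "after": {"name": "alan_test", "scores": 888.0},
        "source": {"db": "cdctest", "table": "userscoressink"},
        "op": "u",
    }})
    print(summarize_event(sample))
```

For an insert `before` is null, for a delete `after` is null, and for an update both row images are present, which matches the three messages shown above.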

This article has described in detail how to deploy and verify Debezium, and has walked through an example of using it.
