??本篇主要探討MySQL數(shù)據(jù)同步的各類常見技術(shù)方案及優(yōu)劣勢(shì)對(duì)比分析,從而更加深層次的理解方案,進(jìn)而在后續(xù)的實(shí)際業(yè)務(wù)中,更好的選擇方案。
1 CDC概念
??CDC即Change Data Capture
,變更數(shù)據(jù)捕獲,即當(dāng)數(shù)據(jù)發(fā)生變更時(shí),能夠?qū)崟r(shí)或準(zhǔn)實(shí)時(shí)的捕獲到數(shù)據(jù)的變化,以MySQL為例,產(chǎn)生數(shù)據(jù)變更的操作有insert
,update
,delete
。CDC技術(shù)就時(shí)在數(shù)據(jù)變更時(shí),能夠以安全、可靠的方式同步給其他服務(wù)、存儲(chǔ),如mongodb、es、kafka、redis、clickhouse等。
2 CDC原理分類
??目前一些常用的組件有alibaba canal,apache flink,go-mysql-transfer等。CDC 的技術(shù)方案非常多,目前業(yè)界主流的實(shí)現(xiàn)機(jī)制可以分為兩種:
2.1 基于查詢的 CDC
- 離線調(diào)度查詢作業(yè),批處理。把一張表同步到其他系統(tǒng),每次通過查詢?nèi)カ@取表中最新的數(shù)據(jù);
- 無法保障數(shù)據(jù)一致性,查的過程中有可能數(shù)據(jù)已經(jīng)發(fā)生了多次變更;
- 不保障實(shí)時(shí)性,基于離線調(diào)度存在天然的延遲。
2.2 基于日志的 CDC
- 實(shí)時(shí)消費(fèi)日志,流處理,例如 MySQL 的 binlog 日志完整記錄了數(shù)據(jù)庫(kù)中的變更,可以把 binlog 文件當(dāng)作流的數(shù)據(jù)源;
- 保障數(shù)據(jù)一致性,因?yàn)?binlog 文件包含了所有歷史變更明細(xì);
- 保障實(shí)時(shí)性,因?yàn)轭愃?binlog 的日志文件是可以流式消費(fèi)的,提供的是實(shí)時(shí)數(shù)據(jù)。
3 開源方案對(duì)比
flink cdc | Debezium | Canal | Sqoop | Kettle | Oracle Goldengate | Go-mysql-transfer |
|
---|---|---|---|---|---|---|---|
CDC機(jī)制 | 日志 | 日志 | 日志 | 查詢 | 查詢 | 日志 | 日志 |
增量同步 | ? | ? | ? | ? | ? | ? | ? |
全量同步 | ? | ? | ? | ? | ? | ? | ? |
斷點(diǎn)續(xù)傳 | ? | ? | ? | ? | ? | ? | ? |
全量 + 增量 | ? | ? | ? | ? | ? | ? | ? |
架構(gòu) | 分布式 | 單機(jī) | 單機(jī) | 分布式 | 分布式 | 分布式 | 單機(jī) |
Transformation | ?????????? | ???? | ???? | ???? | ?? | ?? | ???????? |
生態(tài) | ?????????? | ?????? | ?????? | ???? | ???? | ?????? | ???? |
如上圖所示,需要根據(jù)實(shí)際業(yè)務(wù)場(chǎng)景,決定使用哪一種開源方案。
4 使用場(chǎng)景
cdc,顧名思義,就是數(shù)據(jù)變更捕獲,其本質(zhì)是實(shí)時(shí)獲取MySQL數(shù)據(jù)變更(增刪改),進(jìn)而同步其他服務(wù)或者業(yè)務(wù)方。因此其使用場(chǎng)景主要分為:
- 數(shù)據(jù)分發(fā):將一個(gè)數(shù)據(jù)源的數(shù)據(jù)分發(fā)給多個(gè)下游業(yè)務(wù)系統(tǒng),常用于業(yè)務(wù)解耦、微服務(wù)系統(tǒng)。
- 數(shù)據(jù)采集:面向數(shù)據(jù)倉(cāng)庫(kù)、數(shù)據(jù)湖的ETL數(shù)據(jù)集成,消除數(shù)據(jù)孤島,便于后續(xù)的分析。
- 數(shù)據(jù)同步:常用于數(shù)據(jù)備份、容災(zāi)等。
5 MySQL配置
5.1 開啟MySQL的binlog
[mysqld]
default-storage-engine=INNODB
server-id = 100 (`唯一`)
port = 3306
log-bin=mysql-bin (`開啟`)
binlog_format = ROW (`注意要設(shè)置為行模式`)
開啟之后,在MySQL的數(shù)據(jù)目錄(/usr/local/mysql-8.0.32-macos13-arm64/data
),就會(huì)生成相應(yīng)的binlog文件
-rw-r----- 1 _mysql _mysql 1867 6 12 00:03 mysql-bin.000001
-rw-r----- 1 _mysql _mysql 5740 6 18 20:55 mysql-bin.000002
-rw-r----- 1 _mysql _mysql 38 6 12 00:03 mysql-bin.index
5.2 創(chuàng)建canal同步賬戶及權(quán)限設(shè)置
mysql> CREATE USER canal IDENTIFIED BY 'canal';
mysql> GRANT SELECT, SHOW VIEW, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
mysql> FLUSH PRIVILEGES;
6 Canal配置
6.1 canal同步kafka原理

?原理等同于MySQL的主從復(fù)制,具體流程:
- canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議
- MySQL master 收到 dump 請(qǐng)求,開始推送 binary log 給 slave (即 canal )
- canal 解析 binary log 對(duì)象(原始為 byte 流)
6.2 canal安裝與配置
具體配置請(qǐng)參考文章 https://www.cnblogs.com/Clera-tea/p/16517424.html
6.2.1 配置文件
/canal/conf/canal.properties
6.2.2 同步kafka配置
canal.serverMode = kafka
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092 (本機(jī)kafka服務(wù))
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
6.2.3 binlog過濾設(shè)置
# binlog filter config
canal.instance.filter.druid.ddl = false(注意這里true 改成 false)
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
6.2.4 同步destinations設(shè)置
canal.destinations = example,mytopic(多個(gè)逗號(hào)分隔)
6.2.5 每個(gè)topic都有各自的實(shí)例配置
路徑/conf/topicname/instance.properties
設(shè)置監(jiān)聽mysql地址
canal.instance.master.address=127.0.0.1:3306
配置mysql賬戶
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
配置canal同步到kafka topic信息
canal.mq.topic=mytopic
6.2.6 kafka數(shù)據(jù)接收
1 mysql
2 zkServer start
3 kafka-server-start /opt/homebrew/etc/kafka/server.properties
4 canal/bin/startup.sh
kafka 消費(fèi)者收到的消息如下
{
"data":[
{
"id":"22",
"url":"1",
"source":"d",
"status":"1",
"created_at":"2023-06-29 00:10:31",
"updated_at":"2023-06-29 00:10:31"
}
],
"database":"finance",
"es":1687968631000,
"id":2,
"isDdl":false,
"mysqlType":{
"id":"int unsigned",
"url":"varchar(2048)",
"source":"varchar(32)",
"status":"tinyint",
"created_at":"datetime",
"updated_at":"datetime"
},
"old":null,
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":4,
"url":12,
"source":12,
"status":-6,
"created_at":93,
"updated_at":93
},
"table":"f_collect",
"ts":1687968631537,
"type":"INSERT"
}
{
"data":[
{
"id":"22",
"url":"1",
"source":"d",
"status":"100",
"created_at":"2023-06-29 00:10:31",
"updated_at":"2023-06-29 00:31:39"
}
],
"database":"finance",
"es":1687969899000,
"id":3,
"isDdl":false,
"mysqlType":{
"id":"int unsigned",
"url":"varchar(2048)",
"source":"varchar(32)",
"status":"tinyint",
"created_at":"datetime",
"updated_at":"datetime"
},
"old":[
{
"status":"1",
"updated_at":"2023-06-29 00:10:31"
}
],
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":4,
"url":12,
"source":12,
"status":-6,
"created_at":93,
"updated_at":93
},
"table":"f_collect",
"ts":1687969899293,
"type":"UPDATE"
}
{
"data":[
{
"id":"22",
"url":"1",
"source":"d",
"status":"100",
"created_at":"2023-06-29 00:10:31",
"updated_at":"2023-06-29 00:31:39"
}
],
"database":"finance",
"es":1687969946000,
"id":4,
"isDdl":false,
"mysqlType":{
"id":"int unsigned",
"url":"varchar(2048)",
"source":"varchar(32)",
"status":"tinyint",
"created_at":"datetime",
"updated_at":"datetime"
},
"old":null,
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":4,
"url":12,
"source":12,
"status":-6,
"created_at":93,
"updated_at":93
},
"table":"f_collect",
"ts":1687969946443,
"type":"DELETE"
}
7 go-mysql-transfer配置
7.1 基本說明
項(xiàng)目github地址:go-mysql-transfer
- 簡(jiǎn)單,不依賴其它組件,一鍵部署
- 集成多種接收端,如:Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ、HTTP API等,無需編寫客戶端,開箱即用
- 內(nèi)置豐富的數(shù)據(jù)解析、消息生成規(guī)則、模板語法
- 支持Lua腳本擴(kuò)展,可處理復(fù)雜邏輯
- 集成Prometheus客戶端,支持監(jiān)控告警
- 集成Web Admin監(jiān)控頁面
- 支持高可用集群部署
- 數(shù)據(jù)同步失敗重試
- 支持全量數(shù)據(jù)初始化
7.2 原理
- 將自己偽裝為MySQL的
Slave
監(jiān)聽binlog
,獲取binlog的變更數(shù)據(jù) - 根據(jù)規(guī)則或者
lua腳本
解析數(shù)據(jù),生成指定格式的消息 - 將生成的消息批量發(fā)送給接收端
7.3 安裝
1、依賴Golang 1.14 及以上版本
2、設(shè)置' GO111MODULE=on '
3、拉取源碼 ' git clone https://github.com/wj596/go-mysql-transfer.git '
4、進(jìn)入目錄,執(zhí)行 ' go build ' 編譯
7.4 全量數(shù)據(jù)同步
./go-mysql-transfer -stock
7.5 配置文件app.yaml
都能看懂,不做詳細(xì)說明,主要配置項(xiàng)
1. mysql
2. target (kafka)
3. kafka配置
4. rule
4.1 數(shù)據(jù)庫(kù),表,字段
4.2 lua_file_path: lua/sync.lua 可以只配置基本的數(shù)據(jù)格式,也可以配置lua腳本來調(diào)整數(shù)據(jù)格式
4.3 kafka topic
# mysql配置
addr: 127.0.0.1:3306
user: #mysql用戶名
pass: #mysql密碼
charset : utf8
slave_id: 1001 #slave ID
flavor: mysql #mysql or mariadb,默認(rèn)mysql
#系統(tǒng)相關(guān)配置
#data_dir: D:\\transfer #應(yīng)用產(chǎn)生的數(shù)據(jù)存放地址,包括日志、緩存數(shù)據(jù)等,默認(rèn)當(dāng)前運(yùn)行目錄下store文件夾
#logger:
# level: info #日志級(jí)別;支持:debug|info|warn|error,默認(rèn)info
#maxprocs: 50 #并發(fā)協(xié)(線)程數(shù)量,默認(rèn)為: CPU核數(shù)*2;一般情況下不需要設(shè)置此項(xiàng)
#bulk_size: 1000 #每批處理數(shù)量,不寫默認(rèn)100,可以根據(jù)帶寬、機(jī)器性能等調(diào)整;如果是全量數(shù)據(jù)初始化時(shí)redis建議設(shè)為1000,其他接收端酌情調(diào)大
#prometheus相關(guān)配置
#enable_exporter: true #是否啟用prometheus exporter,默認(rèn)false
#exporter_addr: 9595 #prometheus exporter端口,默認(rèn)9595
#web admin相關(guān)配置
enable_web_admin: true #是否啟用web admin,默認(rèn)false
web_admin_port: 8060 #web監(jiān)控端口,默認(rèn)8060
#cluster: # 集群相關(guān)配置
#name: myTransfer #集群名稱,具有相同name的節(jié)點(diǎn)放入同一個(gè)集群
#bind_ip: 127.0.0.1 # 綁定的IP,如果機(jī)器有多張網(wǎng)卡(包含虛擬網(wǎng)卡)會(huì)有多個(gè)IP,使用這個(gè)屬性綁定一個(gè)
#ZooKeeper地址,多個(gè)用逗號(hào)風(fēng)格
#zk_addrs: 192.168.1.10:2181,192.168.1.11:2182,192.168.1.12:2183
#zk_authentication: 123456 #digest類型的訪問秘鑰,如:user:password,默認(rèn)為空
#etcd_addrs: 127.0.0.1:2379 #etcd連接地址,多個(gè)用逗號(hào)分隔
#etcd_user: test #etcd用戶名
#etcd_password: 123456 #etcd密碼
#目標(biāo)類型
target: kafka # 支持redis、mongodb、elasticsearch、rocketmq、kafka、rabbitmq
#redis連接配置
#redis_addrs: 127.0.0.1:6379 #redis地址,多個(gè)用逗號(hào)分隔
#redis_group_type: cluster # 集群類型 sentinel或者cluster
#redis_master_name: mymaster # Master節(jié)點(diǎn)名稱,如果group_type為sentinel則此項(xiàng)不能為空,為cluster此項(xiàng)無效
#redis_pass: 123456 #redis密碼
#redis_database: 0 #redis數(shù)據(jù)庫(kù) 0-16,默認(rèn)0。如果group_type為cluster此項(xiàng)無效
#mongodb連接配置
#mongodb_addrs: 127.0.0.1:27017 #mongodb連接地址,多個(gè)用逗號(hào)分隔
#mongodb_username: #mongodb用戶名,默認(rèn)為空
#mongodb_password: #mongodb密碼,默認(rèn)為空
#elasticsearch連接配置
#es_addrs: 127.0.0.1:9200 #連接地址,多個(gè)用逗號(hào)分隔
#es_version: 7 # Elasticsearch版本,支持6和7、默認(rèn)為7
#es_password: # 用戶名
#es_version: # 密碼
#rocketmq連接配置
#rocketmq_name_servers: 127.0.0.1:9876 #rocketmq命名服務(wù)地址,多個(gè)用逗號(hào)分隔
#rocketmq_group_name: transfer_test_group #rocketmq group name,默認(rèn)為空
#rocketmq_instance_name: transfer_test_group_ins #rocketmq instance name,默認(rèn)為空
#rocketmq_access_key: RocketMQ #訪問控制 accessKey,默認(rèn)為空
#rocketmq_secret_key: 12345678 #訪問控制 secretKey,默認(rèn)為空
#kafka連接配置
kafka_addrs: 127.0.0.1:9092 #kafka連接地址,多個(gè)用逗號(hào)分隔
#kafka_sasl_user: #kafka SASL_PLAINTEXT認(rèn)證模式 用戶名
#kafka_sasl_password: #kafka SASL_PLAINTEXT認(rèn)證模式 密碼
#rabbitmq連接配置
#rabbitmq_addr: amqp://guest:guest@127.0.0.1:5672/ #連接字符串,如: amqp://guest:guest@localhost:5672/
#規(guī)則配置
rule:
-
schema: test #數(shù)據(jù)庫(kù)名稱
table: score #表名稱
#order_by_column: id #排序字段,存量數(shù)據(jù)同步時(shí)不能為空
#column_lower_case:false #列名稱轉(zhuǎn)為小寫,默認(rèn)為false
#column_upper_case:false#列名稱轉(zhuǎn)為大寫,默認(rèn)為false
column_underscore_to_camel: false #列名稱下劃線轉(zhuǎn)駝峰,默認(rèn)為false
# 包含的列,多值逗號(hào)分隔,如:id,name,age,area_id 為空時(shí)表示包含全部列
include_columns: ID,name,age,sex
#exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號(hào)分隔,如:id,name,age,area_id 默認(rèn)為空
#column_mappings: USER_NAME=account #列名稱映射,多個(gè)映射關(guān)系用逗號(hào)分隔,如:USER_NAME=account 表示將字段名USER_NAME映射為account
#default_column_values: area_name=合肥 #默認(rèn)的列-值,多個(gè)用逗號(hào)分隔,如:source=binlog,area_name=合肥
#date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認(rèn)yyyy-MM-dd
#datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認(rèn)yyyy-MM-dd HH:mm:ss
lua_file_path: lua/sync.lua #lua腳本文件,項(xiàng)目目錄創(chuàng)建lua目錄
#lua_script: #lua 腳本
value_encoder: json #值編碼,支持json、kv-commas、v-commas;默認(rèn)為json
#value_formatter: '{{.ID}}|{{.USER_NAME}}' # 值格式化表達(dá)式,如:{{.ID}}|{{.USER_NAME}},{{.ID}}表示ID字段的值、{{.USER_NAME}}表示USER_NAME字段的值
#redis相關(guān)
redis_structure: string # 數(shù)據(jù)類型。 支持string、hash、list、set、sortedset類型(與redis的數(shù)據(jù)類型一致)
#redis_key_prefix: USER_ #key的前綴
#redis_key_column: USER_NAME #使用哪個(gè)列的值作為key,不填寫默認(rèn)使用主鍵
#redis_key_formatter: '{{.ID}}|{{.USER_NAME}}'
#redis_key_value: user #KEY的值(固定值);當(dāng)redis_structure為hash、list、set、sortedset此值不能為空
#redis_hash_field_prefix: _CARD_ #hash的field前綴,僅redis_structure為hash時(shí)起作用
#redis_hash_field_column: Cert_No #使用哪個(gè)列的值作為hash的field,僅redis_structure為hash時(shí)起作用,不填寫默認(rèn)使用主鍵
#redis_sorted_set_score_column: id #sortedset的score,當(dāng)數(shù)據(jù)類型為sortedset時(shí),此項(xiàng)不能為空,此項(xiàng)的值應(yīng)為數(shù)字類型
#mongodb相關(guān)
#mongodb_database: transfer #mongodb database不能為空
#mongodb_collection: transfer_test_topic #mongodb collection,可以為空,默認(rèn)使用表名稱
#elasticsearch相關(guān)
#es_index: user_index #Index名稱,可以為空,默認(rèn)使用表(Table)名稱
#es_mappings: #索引映射,可以為空,為空時(shí)根據(jù)數(shù)據(jù)類型自行推導(dǎo)ES推導(dǎo)
# -
# column: REMARK #數(shù)據(jù)庫(kù)列名稱
# field: remark #映射后的ES字段名稱
# type: text #ES字段類型
# analyzer: ik_smart #ES分詞器,type為text此項(xiàng)有意義
# #format: #日期格式,type為date此項(xiàng)有意義
# -
# column: USER_NAME #數(shù)據(jù)庫(kù)列名稱
# field: account #映射后的ES字段名稱
# type: keyword #ES字段類型
#rocketmq相關(guān)
#rocketmq_topic: transfer_test_topic #rocketmq topic,可以為空,默認(rèn)使用表名稱
#kafka相關(guān)
kafka_topic: test #rocketmq topic,可以為空,默認(rèn)使用表名稱
#rabbitmq相關(guān)
#rabbitmq_queue: user_topic #queue名稱,可以為空,默認(rèn)使用表(Table)名稱
#reserve_raw_data: true #保留update之前的數(shù)據(jù),針對(duì)rocketmq、kafka、rabbitmq有用;默認(rèn)為false
7.6 項(xiàng)目啟動(dòng)
1. 啟動(dòng)zk(zkServer.sh)
2. 啟動(dòng)kafka (kafka-server-start.sh server.properties)
3. 啟動(dòng)go-mysql-transfer (./go-mysql-transfer)
4. 啟動(dòng)kafka消費(fèi)者(kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test)
5. 編寫簡(jiǎn)單的lua腳本,實(shí)現(xiàn)數(shù)據(jù)同步
6. 驗(yàn)證數(shù)據(jù)同步
go-mysql-transfer/lua/sync.lua腳本內(nèi)容
local json = require("json") -- 加載json模塊
local ops = require("mqOps") --加載mq操作模塊
local os = require("os") --加載os模塊
local row = ops.rawRow() --當(dāng)前數(shù)據(jù)庫(kù)的一行數(shù)據(jù),
local action = ops.rawAction() --當(dāng)前數(shù)據(jù)庫(kù)事件,包括:insert、updare、delete
local id = row["id"] --獲取ID列的值
local name = row["name"]
local age = row["age"]
local sex = row["sex"]
local result = {}
local data = {}
result["timestamp"] = os.time()
result["action"] = action
data['id'] = id
data['name'] = name
data['age'] = age
data['sex'] = sex
result["object"] = data
local val = json.encode(result) -- 將result轉(zhuǎn)為json
ops.SEND("test", val) -- 發(fā)送消息,參數(shù)1:topic(string類型),參數(shù)2:消息內(nèi)容
啟動(dòng)go-mysql-transfer
mysql更新數(shù)據(jù)
kafka收到的消息文章來源:http://www.zghlxwxcb.cn/news/detail-538233.html
常見問題匯總
- The Cluster ID i0yMUA_eRHuBS60eM1ph9w doesn’t match stored clusterId Some(aH
https://blog.csdn.net/m0_59252007/article/details/119533700
參考文檔
1 https://www.kancloud.cn/wj596/go-mysql-transfer/2116628
2 https://www.cnblogs.com/Clera-tea/p/16517424.html文章來源地址http://www.zghlxwxcb.cn/news/detail-538233.html
到了這里,關(guān)于MySQL CDC技術(shù)方案梳理的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!