Canal介紹
canal [k?'n?l],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費
canal可以用來監(jiān)控數據庫數據的變化,從而獲得新增數據,或者修改的數據。
canal是應阿里巴巴存在杭州和美國的雙機房部署,存在跨機房同步的業(yè)務需求而提出的。
阿里系公司開始逐步的嘗試基于數據庫的日志解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業(yè)務。
canal主要用途是基于 MySQL 數據庫增量日志解析,并能提供增量數據訂閱和消費,應用場景十分豐富。
目前canal主要支持mysql數據庫。
github地址:https://github.com/alibaba/canal
版本下載地址:https://github.com/alibaba/canal/releases
文檔地址:https://github.com/alibaba/canal/wiki/Docker-QuickStart
Canal應用場景
1)、電商場景下商品、用戶實時更新同步到至Elasticsearch、solr等搜索引擎;
2)、價格、庫存發(fā)生變更實時同步到redis;
3)、數據庫異地備份、數據同步;
4)、代替使用輪詢數據庫方式來監(jiān)控數據庫變更,有效改善輪詢耗費數據庫資源。
MySQL主從復制原理
1)、MySQL master
?將數據變更寫入二進制日志(?binary log
, 其中記錄叫做二進制日志事件binary log events
,可以通過?show binlog events
?進行查看)
2)、MySQL slave
?將 master 的?binary log events
?拷貝到它的中繼日志(relay log
)
3)、MySQL slave
?重放?relay log
?中事件,將數據變更反映它自己的數據
Canal工作原理
-
canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送 dump 協議
-
MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
-
canal 解析 binary log 對象(原始為 byte 流)
Canal安裝
參考文檔:https://github.com/alibaba/canal/wiki/QuickStart
Canal配置
mq相關參數說明 (>=1.1.5版本)
在1.1.5版本開始,引入了MQ Connector設計,參數配置做了部分調整
參數名 |
參數說明 |
默認值 |
---|---|---|
canal.aliyun.accessKey |
阿里云ak |
無 |
canal.aliyun.secretKey |
阿里云sk |
無 |
canal.aliyun.uid |
阿里云uid |
無 |
canal.mq.flatMessage |
是否為json格式 如果設置為false,對應MQ收到的消息為protobuf格式 需要通過CanalMessageDeserializer進行解碼 |
false |
canal.mq.canalBatchSize |
獲取canal數據的批次大小 |
50 |
canal.mq.canalGetTimeout |
獲取canal數據的超時時間 |
100 |
canal.mq.accessChannel = local |
是否為阿里云模式,可選值local/cloud |
local |
canal.mq.database.hash |
是否開啟database混淆hash,確保不同庫的數據可以均勻分散,如果關閉可以確保只按照業(yè)務字段做MQ分區(qū)計算 |
true |
canal.mq.send.thread.size |
MQ消息發(fā)送并行度 |
30 |
canal.mq.build.thread.size |
MQ消息構建并行度 |
8 |
---|---|---|
kafka.bootstrap.servers |
kafka服務端地址 |
127.0.0.1:9092 |
kafka.acks |
kafka為 |
all |
kafka.compression.type |
壓縮類型 |
none |
kafka.batch.size |
kafka為 |
16384 |
kafka.linger.ms |
kafka為 |
1 |
kafka.max.request.size |
kafka為 |
1048576 |
kafka.buffer.memory |
kafka為 |
33554432 |
kafka.max.in.flight.requests.per.connection |
kafka為 |
1 |
kafka.retries |
發(fā)送失敗重試次數 |
0 |
kafka.kerberos.enable |
kerberos認證 |
false |
kafka.kerberos.krb5.file |
kerberos認證 |
../conf/kerberos/krb5.conf |
kafka.kerberos.jaas.file |
kerberos認證 |
../conf/kerberos/jaas.conf |
---|---|---|
rocketmq.producer.group |
rocketMQ為ProducerGroup名 |
test |
rocketmq.enable.message.trace |
是否開啟message trace |
false |
rocketmq.customized.trace.topic |
message trace的topic |
無 |
rocketmq.namespace |
rocketmq的namespace |
無 |
rocketmq.namesrv.addr |
rocketmq的namesrv地址 |
127.0.0.1:9876 |
rocketmq.retry.times.when.send.failed |
重試次數 |
0 |
rocketmq.vip.channel.enabled |
rocketmq是否開啟vip channel |
false |
rocketmq.tag |
rocketmq的tag配置 |
空值 |
---|---|---|
rabbitmq.host |
rabbitMQ配置 |
無 |
rabbitmq.virtual.host |
rabbitMQ配置 |
無 |
rabbitmq.exchange |
rabbitMQ配置 |
無 |
rabbitmq.username |
rabbitMQ配置 |
無 |
rabbitmq.password |
rabbitMQ配置 |
無 |
rabbitmq.deliveryMode |
rabbitMQ配置 |
無 |
---|---|---|
pulsarmq.serverUrl |
pulsarmq配置 |
無 |
pulsarmq.roleToken |
pulsarmq配置 |
無 |
pulsarmq.topicTenantPrefix |
pulsarmq配置 |
無 |
---|---|---|
canal.mq.topic |
mq里的topic名 |
無 |
canal.mq.dynamicTopic |
mq里的動態(tài)topic規(guī)則, 1.1.3版本支持 |
無 |
canal.mq.partition |
單隊列模式的分區(qū)下標, |
1 |
canal.mq.enableDynamicQueuePartition |
動態(tài)獲取MQ服務端的分區(qū)數,如果設置為true之后會自動根據topic獲取分區(qū)數替換canal.mq.partitionsNum的定義,目前主要適用于RocketMQ |
false |
canal.mq.partitionsNum |
散列模式的分區(qū)數 |
無 |
canal.mq.dynamicTopicPartitionNum |
mq里的動態(tài)隊列分區(qū)數,比如針對不同topic配置不同partitionsNum |
無 |
canal.mq.partitionHash |
散列規(guī)則定義 庫名.表名 : 唯一主鍵,比如mytest.person: id 1.1.3版本支持新語法,見下文 |
無 |
canal.mq.dynamicTopic 表達式說明
canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多個配置之間使用逗號或分號分隔
-
例子1:test\\.test 指定匹配的單表,發(fā)送到以test_test為名字的topic上
-
例子2:.*\\..* 匹配所有表,則每個表都會發(fā)送到各自表名的topic上
-
例子3:test 指定匹配對應的庫,一個庫的所有表都會發(fā)送到庫名的topic上
-
例子4:test\\..* 指定匹配的表達式,針對匹配的表會發(fā)送到各自表名的topic上
-
例子5:test,test1\\.test1,指定多個表達式,會將test庫的表都發(fā)送到test的topic上,test1\\.test1的表發(fā)送到對應的test1_test1 topic上,其余的表發(fā)送到默認的canal.mq.topic值
為滿足更大的靈活性,允許對匹配條件的規(guī)則指定發(fā)送的topic名字,配置格式:topicName:schema 或 topicName:schema.table
-
例子1: test:test\\.test 指定匹配的單表,發(fā)送到以test為名字的topic上
-
例子2: test:.*\\..* 匹配所有表,因為有指定topic,則每個表都會發(fā)送到test的topic下
-
例子3: test:test 指定匹配對應的庫,一個庫的所有表都會發(fā)送到test的topic下
-
例子4:testA:test\\..* 指定匹配的表達式,針對匹配的表會發(fā)送到testA的topic下
-
例子5:test0:test,test1:test1\\.test1,指定多個表達式,會將test庫的表都發(fā)送到test0的topic下,test1\\.test1的表發(fā)送到對應的test1的topic下,其余的表發(fā)送到默認的canal.mq.topic值
大家可以結合自己的業(yè)務需求,設置匹配規(guī)則,建議MQ開啟自動創(chuàng)建topic的能力
canal.mq.partitionHash 表達式說明
canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多個配置之間使用逗號分隔
-
例子1:test\\.test:pk1^pk2 指定匹配的單表,對應的hash字段為pk1 + pk2
-
例子2:.*\\..*:id 正則匹配,指定所有正則匹配的表對應的hash字段為id
-
例子3:.*\\..*:$pk$ 正則匹配,指定所有正則匹配的表對應的hash字段為表主鍵(自動查找)
-
例子4: 匹配規(guī)則啥都不寫,則默認發(fā)到0這個partition上
-
例子5:.*\\..* ,不指定pk信息的正則匹配,將所有正則匹配的表,對應的hash字段為表名
-
?
按表hash: 一張表的所有數據可以發(fā)到同一個分區(qū),不同表之間會做散列 (會有熱點表分區(qū)過大問題)
-
-
例子6: test\\.test:id,.\\..* , 針對test的表按照id散列,其余的表按照table散列
注意:大家可以結合自己的業(yè)務需求,設置匹配規(guī)則,多條匹配規(guī)則之間是按照順序進行匹配(命中一條規(guī)則就返回)
其他詳細參數可參考Canal AdminGuide
mq順序性問題
binlog本身是有序的,寫入到mq之后如何保障順序是很多人會比較關注,在issue里也有非常多人咨詢了類似的問題,這里做一個統(tǒng)一的解答
-
1.
canal目前選擇支持的kafka/rocketmq,本質上都是基于本地文件的方式來支持了分區(qū)級的順序消息的能力,也就是binlog寫入mq是可以有一些順序性保障,這個取決于用戶的一些參數選擇
-
2.
canal支持MQ數據的幾種路由方式:單topic單分區(qū),單topic多分區(qū)、多topic單分區(qū)、多topic多分區(qū)
-
canal.mq.dynamicTopic,主要控制是否是單topic還是多topic,針對命中條件的表可以發(fā)到表名對應的topic、庫名對應的topic、默認topic name
-
canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分區(qū)以及分區(qū)的partition的路由計算,針對命中條件的可以做到按表級做分區(qū)、pk級做分區(qū)等
-
1.
canal的消費順序性,主要取決于描述2中的路由選擇,舉例說明:
-
單topic單分區(qū),可以嚴格保證和binlog一樣的順序性,缺點就是性能比較慢,單分區(qū)的性能寫入大概在2~3k的TPS
-
多topic單分區(qū),可以保證表級別的順序性,一張表或者一個庫的所有數據都寫入到一個topic的單分區(qū)中,可以保證有序性,針對熱點表也存在寫入分區(qū)的性能問題
-
單topic、多topic的多分區(qū),如果用戶選擇的是指定table的方式,那和第二部分一樣,保障的是表級別的順序性(存在熱點表寫入分區(qū)的性能問題),如果用戶選擇的是指定pk hash的方式,那只能保障的是一個pk的多次binlog順序性 ** pk hash的方式需要業(yè)務權衡,這里性能會最好,但如果業(yè)務上有pk變更或者對多pk數據有順序性依賴,就會產生業(yè)務處理錯亂的情況. 如果有pk變更,pk變更前和變更后的值會落在不同的分區(qū)里,業(yè)務消費就會有先后順序的問題,需要注意
性能表現
Kafka + 混合DML場景測試
場景 |
1個topic + 單分區(qū) |
1個topic+3分區(qū) |
2個topic+1分區(qū) |
2個topic+3分區(qū) |
---|---|---|---|---|
不開啟flatMessage |
29.6k rps (9.71k tps) |
17.54k rps (6.53k tps) |
21.6k rps (7.9k tps) |
16.8k rps (5.71k tps) |
開啟flatMessage |
11.79k rps (4.36k tps) |
15.97 rps (5.94k tps) |
11.91k rps (4.45k tps) |
16.96k rps (6.26k tps) |
Kafka + 單表的batch insert場景測試
場景 |
1個topic + 單分區(qū) |
1個topic+3分區(qū) |
---|---|---|
不開啟flatMessage |
59.6k rps |
45.1k rps |
開啟flatMessage |
51.3k rps |
49.6k rps |
RocketMQ + 混合DML場景測試
場景 |
1個topic + 單分區(qū) |
1個topic+3分區(qū) |
2個topic+1分區(qū) |
2個topic+3分區(qū) |
---|---|---|---|---|
不開啟flatMessage |
29.6k rps (10.71k tps) |
23.3k rps (8.59k tps) |
26.7k rps (9.46k tps) |
21.7k rps (7.66k tps) |
開啟flatMessage |
16.75k rps (6.17k tps) |
14.96k rps (5.55k tps) |
17.83k rps (6.63k tps) |
16.93k rps (6.26k tps) |
RocketMQ + 單表的batch insert場景測試
場景 |
1個topic + 單分區(qū) |
1個topic+3分區(qū) |
---|---|---|
不開啟flatMessage |
81.2k rps |
51.3k rps |
開啟flatMessage |
62.6k rps |
57.9k rps |
附錄:
canal官方文檔:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
Canal+MQ性能表現:https://github.com/alibaba/canal/wiki/Canal-MQ-Performance文章來源:http://www.zghlxwxcb.cn/news/detail-651060.html
參考文檔:https://www.cnblogs.com/zwh0910/p/17043265.html文章來源地址http://www.zghlxwxcb.cn/news/detail-651060.html
到了這里,關于Canal+Kafka實現Mysql數據同步的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!