国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Canal+Kafka實現Mysql數據同步

這篇具有很好參考價值的文章主要介紹了Canal+Kafka實現Mysql數據同步。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Canal介紹

canal [k?'n?l],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費

canal可以用來監(jiān)控數據庫數據的變化,從而獲得新增數據,或者修改的數據。

canal是應阿里巴巴存在杭州和美國的雙機房部署,存在跨機房同步的業(yè)務需求而提出的。

阿里系公司開始逐步的嘗試基于數據庫的日志解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業(yè)務。

canal主要用途是基于 MySQL 數據庫增量日志解析,并能提供增量數據訂閱和消費,應用場景十分豐富。

目前canal主要支持mysql數據庫。

github地址:https://github.com/alibaba/canal

版本下載地址:https://github.com/alibaba/canal/releases

文檔地址:https://github.com/alibaba/canal/wiki/Docker-QuickStart

Canal應用場景

1)、電商場景下商品、用戶實時更新同步到至Elasticsearch、solr等搜索引擎;
2)、價格、庫存發(fā)生變更實時同步到redis;
3)、數據庫異地備份、數據同步;
4)、代替使用輪詢數據庫方式來監(jiān)控數據庫變更,有效改善輪詢耗費數據庫資源。

Canal+Kafka實現Mysql數據同步,kafka,mysql,分布式

MySQL主從復制原理

1)、MySQL master?將數據變更寫入二進制日志(?binary log, 其中記錄叫做二進制日志事件binary log events,可以通過?show binlog events?進行查看)
2)、MySQL slave?將 master 的?binary log events?拷貝到它的中繼日志(relay log)
3)、MySQL slave?重放?relay log?中事件,將數據變更反映它自己的數據

Canal工作原理

  • canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送 dump 協議
  • MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
  • canal 解析 binary log 對象(原始為 byte 流)

    Canal+Kafka實現Mysql數據同步,kafka,mysql,分布式

Canal安裝

參考文檔:https://github.com/alibaba/canal/wiki/QuickStart

Canal配置

mq相關參數說明 (>=1.1.5版本)

在1.1.5版本開始,引入了MQ Connector設計,參數配置做了部分調整

參數名

參數說明

默認值

canal.aliyun.accessKey

阿里云ak

canal.aliyun.secretKey

阿里云sk

canal.aliyun.uid

阿里云uid

canal.mq.flatMessage

是否為json格式 如果設置為false,對應MQ收到的消息為protobuf格式 需要通過CanalMessageDeserializer進行解碼

false

canal.mq.canalBatchSize

獲取canal數據的批次大小

50

canal.mq.canalGetTimeout

獲取canal數據的超時時間

100

canal.mq.accessChannel = local

是否為阿里云模式,可選值local/cloud

local

canal.mq.database.hash

是否開啟database混淆hash,確保不同庫的數據可以均勻分散,如果關閉可以確保只按照業(yè)務字段做MQ分區(qū)計算

true

canal.mq.send.thread.size

MQ消息發(fā)送并行度

30

canal.mq.build.thread.size

MQ消息構建并行度

8

kafka.bootstrap.servers

kafka服務端地址

127.0.0.1:9092

kafka.acks

kafka為ProducerConfig.ACKS_CONFIG

all

kafka.compression.type

壓縮類型

none

kafka.batch.size

kafka為ProducerConfig.BATCH_SIZE_CONFIG

16384

kafka.linger.ms

kafka為ProducerConfig.LINGER_MS_CONFIG?, 如果是flatMessage格式建議將該值調大, 如: 200

1

kafka.max.request.size

kafka為ProducerConfig.MAX_REQUEST_SIZE_CONFIG

1048576

kafka.buffer.memory

kafka為ProducerConfig.BUFFER_MEMORY_CONFIG

33554432

kafka.max.in.flight.requests.per.connection

kafka為ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION

1

kafka.retries

發(fā)送失敗重試次數

0

kafka.kerberos.enable

kerberos認證

false

kafka.kerberos.krb5.file

kerberos認證

../conf/kerberos/krb5.conf

kafka.kerberos.jaas.file

kerberos認證

../conf/kerberos/jaas.conf

rocketmq.producer.group

rocketMQ為ProducerGroup名

test

rocketmq.enable.message.trace

是否開啟message trace

false

rocketmq.customized.trace.topic

message trace的topic

rocketmq.namespace

rocketmq的namespace

rocketmq.namesrv.addr

rocketmq的namesrv地址

127.0.0.1:9876

rocketmq.retry.times.when.send.failed

重試次數

0

rocketmq.vip.channel.enabled

rocketmq是否開啟vip channel

false

rocketmq.tag

rocketmq的tag配置

空值

rabbitmq.host

rabbitMQ配置

rabbitmq.virtual.host

rabbitMQ配置

rabbitmq.exchange

rabbitMQ配置

rabbitmq.username

rabbitMQ配置

rabbitmq.password

rabbitMQ配置

rabbitmq.deliveryMode

rabbitMQ配置

pulsarmq.serverUrl

pulsarmq配置

pulsarmq.roleToken

pulsarmq配置

pulsarmq.topicTenantPrefix

pulsarmq配置

canal.mq.topic

mq里的topic名

canal.mq.dynamicTopic

mq里的動態(tài)topic規(guī)則, 1.1.3版本支持

canal.mq.partition

單隊列模式的分區(qū)下標,

1

canal.mq.enableDynamicQueuePartition

動態(tài)獲取MQ服務端的分區(qū)數,如果設置為true之后會自動根據topic獲取分區(qū)數替換canal.mq.partitionsNum的定義,目前主要適用于RocketMQ

false

canal.mq.partitionsNum

散列模式的分區(qū)數

canal.mq.dynamicTopicPartitionNum

mq里的動態(tài)隊列分區(qū)數,比如針對不同topic配置不同partitionsNum

canal.mq.partitionHash

散列規(guī)則定義 庫名.表名 : 唯一主鍵,比如mytest.person: id 1.1.3版本支持新語法,見下文

canal.mq.dynamicTopic 表達式說明

canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多個配置之間使用逗號或分號分隔

  • 例子1:test\\.test 指定匹配的單表,發(fā)送到以test_test為名字的topic上
  • 例子2:.*\\..* 匹配所有表,則每個表都會發(fā)送到各自表名的topic上
  • 例子3:test 指定匹配對應的庫,一個庫的所有表都會發(fā)送到庫名的topic上
  • 例子4:test\\..* 指定匹配的表達式,針對匹配的表會發(fā)送到各自表名的topic上
  • 例子5:test,test1\\.test1,指定多個表達式,會將test庫的表都發(fā)送到test的topic上,test1\\.test1的表發(fā)送到對應的test1_test1 topic上,其余的表發(fā)送到默認的canal.mq.topic值

為滿足更大的靈活性,允許對匹配條件的規(guī)則指定發(fā)送的topic名字,配置格式:topicName:schema 或 topicName:schema.table

  • 例子1: test:test\\.test 指定匹配的單表,發(fā)送到以test為名字的topic上
  • 例子2: test:.*\\..* 匹配所有表,因為有指定topic,則每個表都會發(fā)送到test的topic下
  • 例子3: test:test 指定匹配對應的庫,一個庫的所有表都會發(fā)送到test的topic下
  • 例子4:testA:test\\..* 指定匹配的表達式,針對匹配的表會發(fā)送到testA的topic下
  • 例子5:test0:test,test1:test1\\.test1,指定多個表達式,會將test庫的表都發(fā)送到test0的topic下,test1\\.test1的表發(fā)送到對應的test1的topic下,其余的表發(fā)送到默認的canal.mq.topic值

大家可以結合自己的業(yè)務需求,設置匹配規(guī)則,建議MQ開啟自動創(chuàng)建topic的能力

canal.mq.partitionHash 表達式說明

canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多個配置之間使用逗號分隔

  • 例子1:test\\.test:pk1^pk2 指定匹配的單表,對應的hash字段為pk1 + pk2
  • 例子2:.*\\..*:id 正則匹配,指定所有正則匹配的表對應的hash字段為id
  • 例子3:.*\\..*:$pk$ 正則匹配,指定所有正則匹配的表對應的hash字段為表主鍵(自動查找)
  • 例子4: 匹配規(guī)則啥都不寫,則默認發(fā)到0這個partition上
  • 例子5:.*\\..* ,不指定pk信息的正則匹配,將所有正則匹配的表,對應的hash字段為表名
    • ?

      按表hash: 一張表的所有數據可以發(fā)到同一個分區(qū),不同表之間會做散列 (會有熱點表分區(qū)過大問題)

  • 例子6: test\\.test:id,.\\..* , 針對test的表按照id散列,其余的表按照table散列

注意:大家可以結合自己的業(yè)務需求,設置匹配規(guī)則,多條匹配規(guī)則之間是按照順序進行匹配(命中一條規(guī)則就返回)

其他詳細參數可參考Canal AdminGuide

mq順序性問題

binlog本身是有序的,寫入到mq之后如何保障順序是很多人會比較關注,在issue里也有非常多人咨詢了類似的問題,這里做一個統(tǒng)一的解答

  1. 1.

    canal目前選擇支持的kafka/rocketmq,本質上都是基于本地文件的方式來支持了分區(qū)級的順序消息的能力,也就是binlog寫入mq是可以有一些順序性保障,這個取決于用戶的一些參數選擇

  2. 2.

    canal支持MQ數據的幾種路由方式:單topic單分區(qū),單topic多分區(qū)、多topic單分區(qū)、多topic多分區(qū)

  • canal.mq.dynamicTopic,主要控制是否是單topic還是多topic,針對命中條件的表可以發(fā)到表名對應的topic、庫名對應的topic、默認topic name
  • canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分區(qū)以及分區(qū)的partition的路由計算,針對命中條件的可以做到按表級做分區(qū)、pk級做分區(qū)等
  1. 1.

    canal的消費順序性,主要取決于描述2中的路由選擇,舉例說明:

  • 單topic單分區(qū),可以嚴格保證和binlog一樣的順序性,缺點就是性能比較慢,單分區(qū)的性能寫入大概在2~3k的TPS
  • 多topic單分區(qū),可以保證表級別的順序性,一張表或者一個庫的所有數據都寫入到一個topic的單分區(qū)中,可以保證有序性,針對熱點表也存在寫入分區(qū)的性能問題
  • 單topic、多topic的多分區(qū),如果用戶選擇的是指定table的方式,那和第二部分一樣,保障的是表級別的順序性(存在熱點表寫入分區(qū)的性能問題),如果用戶選擇的是指定pk hash的方式,那只能保障的是一個pk的多次binlog順序性 ** pk hash的方式需要業(yè)務權衡,這里性能會最好,但如果業(yè)務上有pk變更或者對多pk數據有順序性依賴,就會產生業(yè)務處理錯亂的情況. 如果有pk變更,pk變更前和變更后的值會落在不同的分區(qū)里,業(yè)務消費就會有先后順序的問題,需要注意

性能表現

Kafka + 混合DML場景測試

場景

1個topic + 單分區(qū)

1個topic+3分區(qū)

2個topic+1分區(qū)

2個topic+3分區(qū)

不開啟flatMessage

29.6k rps (9.71k tps)

17.54k rps (6.53k tps)

21.6k rps (7.9k tps)

16.8k rps (5.71k tps)

開啟flatMessage

11.79k rps (4.36k tps)

15.97 rps (5.94k tps)

11.91k rps (4.45k tps)

16.96k rps (6.26k tps)

Kafka + 單表的batch insert場景測試

場景

1個topic + 單分區(qū)

1個topic+3分區(qū)

不開啟flatMessage

59.6k rps

45.1k rps

開啟flatMessage

51.3k rps

49.6k rps


RocketMQ + 混合DML場景測試

場景

1個topic + 單分區(qū)

1個topic+3分區(qū)

2個topic+1分區(qū)

2個topic+3分區(qū)

不開啟flatMessage

29.6k rps (10.71k tps)

23.3k rps (8.59k tps)

26.7k rps (9.46k tps)

21.7k rps (7.66k tps)

開啟flatMessage

16.75k rps (6.17k tps)

14.96k rps (5.55k tps)

17.83k rps (6.63k tps)

16.93k rps (6.26k tps)

RocketMQ + 單表的batch insert場景測試

場景

1個topic + 單分區(qū)

1個topic+3分區(qū)

不開啟flatMessage

81.2k rps

51.3k rps

開啟flatMessage

62.6k rps

57.9k rps

附錄:

canal官方文檔:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

Canal+MQ性能表現:https://github.com/alibaba/canal/wiki/Canal-MQ-Performance

參考文檔:https://www.cnblogs.com/zwh0910/p/17043265.html文章來源地址http://www.zghlxwxcb.cn/news/detail-651060.html

到了這里,關于Canal+Kafka實現Mysql數據同步的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 使用canal+rocketmq實現將mysql數據同步到es

    使用canal+rocketmq實現將mysql數據同步到es

    實際開發(fā)過程中,經常遇到數據庫與緩存不一致的問題,造成這種問題的原因有很多,其中緩存數據沒有及時更新、緩存中過期的數據沒有及時更新,導致緩存中存在失效數據,導致數據庫與緩存不一致。而這種問題的出現大部分都是因為同步延遲、緩存失效、過期和錯誤使

    2024年02月11日
    瀏覽(72)
  • Debezium同步Mysql數據到Kafka

    Kafka:3.3.2 mysql-connector:1.8.1 (0)前提是安裝好mysql,開啟binlog (1)下載kafka (2)下載mysql-connector插件 (3)編輯配置文件 (4)啟動kafka自帶的zk (5)啟動kafka (6)啟動connect (7)調用api 注意:當成功調用api,創(chuàng)建此連接器后會有如下主題產生:dbhistory.inventory、mysql1、

    2024年02月10日
    瀏覽(22)
  • 基于Canal實現Mysql數據實時同步到Elasticsearch(Docker版)

    基于Canal實現Mysql數據實時同步到Elasticsearch(Docker版)

    1、Canal簡介 ??Canal主要用途是對MySQL數據庫增量日志進行解析,提供增量數據的訂閱和消費,簡單說就是可以對MySQL的增量數據進行實時同步,支持同步到MySQL、Elasticsearch、HBase等數據存儲中去。 ??Canal會模擬MySQL主庫和從庫的交互協議,從而偽裝成MySQL的從庫,然后向My

    2024年02月10日
    瀏覽(92)
  • 本地部署Canal筆記-實現MySQL與ElasticSearch7數據同步

    本地部署Canal筆記-實現MySQL與ElasticSearch7數據同步

    本地搭建canal實現mysql數據到es的簡單的數據同步,僅供學習參考 建議首先熟悉一下canal同步方式:https://github.com/alibaba/canal/wiki 本地搭建MySQL數據庫 本地搭建ElasticSearch 本地搭建canal-server 本地搭建canal-adapter 本地環(huán)境為window11,大部分組件采用docker進行部署,MySQL采用8.0.27, 推薦

    2024年02月02日
    瀏覽(96)
  • cancel框架同步mysql數據到kafka

    1、下載cancel 2、修改conf文件夾下的canal.properties配置文件 3、修改conf/example文件夾下的instance.properties配置文件 在sql查詢show binary logs語句得到binlog日志 4、啟動 在bin目錄下執(zhí)行 啟動程序 注:MySQL需要創(chuàng)建新用戶

    2024年02月15日
    瀏覽(17)
  • 從 MySQL 到 DolphinDB,Debezium + Kafka 數據同步實戰(zhàn)

    從 MySQL 到 DolphinDB,Debezium + Kafka 數據同步實戰(zhàn)

    Debezium 是一個開源的分布式平臺,用于實時捕獲和發(fā)布數據庫更改事件。它可以將關系型數據庫(如 MySQL、PostgreSQL、Oracle 等)的變更事件轉化為可觀察的流數據,以供其他應用程序實時消費和處理。 本文中我們將采用 Debezium 與 Kafka 組合的方式來實現從 MySQL 到 DolphinDB 的數

    2024年02月02日
    瀏覽(26)
  • 使用finksql方式將mysql數據同步到kafka中,每次只能同步一張表

    使用finksql方式將mysql數據同步到kafka中,每次只能同步一張表

    2024年02月11日
    瀏覽(22)
  • 大數據Canal(三):使用Canal同步MySQL數據

    大數據Canal(三):使用Canal同步MySQL數據

    文章目錄 ??????使用Canal同步MySQL數據 一、Canal架構原理

    2024年02月03日
    瀏覽(25)
  • Alibaba Canal數據同步 mysql->mysql

    Alibaba Canal數據同步 mysql->mysql

    目錄 1.前言 2.什么是canal 3.canal能做什么 4.如何搭建canal 4.1首先有一個MySQL服務器 4.2 準備canal 1.下載 2.解壓 3.修改配置文件 4.啟動canal 5.Java創(chuàng)建客戶端,監(jiān)聽canalServer(官網推薦方式) 1.創(chuàng)建SpringBoot項目 略過… 2.導入canal客戶端包 3.導入測試Main方法 6.Java創(chuàng)建客戶端,GitHub推薦

    2023年04月22日
    瀏覽(20)
  • 基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 數據到 Elasticsearch、Kafka

    基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 數據到 Elasticsearch、Kafka

    Dinky 是一個開箱即用的一站式實時計算平臺以 Apache Flink 為基礎,連接 OLAP 和數據湖等眾多框架致力于流批一體和湖倉一體的建設與實踐。本文以此為FlinkSQL可視化工具。 Flink SQL 使得使用標準 SQL 開發(fā)流式應用變得簡單,免去代碼開發(fā)。 Flink CDC 本文使用 MySQL CDC 連接器 允許從

    2024年02月16日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包