前言
前幾天在網(wǎng)上沖浪的時候發(fā)現(xiàn)了一個比較成熟的開源中間件——?Canal?。在了解了它的工作原理和使用場景后,頓時產(chǎn)生了濃厚的興趣。今天,就讓我們跟隨我的腳步,一起來揭開它神秘的面紗吧。
目錄
前言
簡介?
工作原理?
MySQL主備復(fù)制原理
canal 工作原理
Canal架構(gòu)?
Canal-HA機制?
應(yīng)用場景?
同步緩存 Redis /全文搜索 ES
下發(fā)任務(wù)
數(shù)據(jù)異構(gòu)
MySQL 配置?
開啟 binlog
擴展
statement
row
mixed
配置權(quán)限
Canal 配置?
配置
啟動
報錯
解決
實戰(zhàn)?
引入依賴
代碼樣例
測試
簡介?
canal?翻譯為管道,主要用途是基于 MySQL 數(shù)據(jù)庫的增量日志 Binlog 解析,提供增量數(shù)據(jù)訂閱和消費。
早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業(yè)務(wù)需求,實現(xiàn)方式主要是基于業(yè)務(wù) trigger 獲取增量變更。從 2010 年開始,業(yè)務(wù)逐步嘗試數(shù)據(jù)庫日志解析獲取增量變更進(jìn)行同步,由此衍生出了大量的數(shù)據(jù)庫增量訂閱和消費業(yè)務(wù)。
基于日志增量訂閱和消費的業(yè)務(wù)包括
-
數(shù)據(jù)庫鏡像;
-
數(shù)據(jù)庫實時備份;
-
索引構(gòu)建和實時維護(拆分異構(gòu)索引、倒排索引等);
-
業(yè)務(wù) cache 刷新;
-
帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理;
當(dāng)前的 canal 支持源端 MySQL 的版本包括 5.1.x,5.5.x,5.6.x,5.7.x,8.0.x。
工作原理?
MySQL主備復(fù)制原理
-
MySQL master 將數(shù)據(jù)變更寫入二進(jìn)制日志( binary log, 其中記錄叫做二進(jìn)制日志事件 binary log events,可以通過 show binlog events 進(jìn)行查看);
-
MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log);
-
MySQL slave 重放 relay log 中事件,將數(shù)據(jù)變更反映它自己的數(shù)據(jù);
canal 工作原理
-
canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送 dump 協(xié)議;
-
MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal );
-
canal 解析 binary log 對象(原始為 byte 流);
github地址:https://github.com/alibaba/canal
完整wiki地址:https://github.com/alibaba/canal/wiki
Canal架構(gòu)?
一個 server 代表一個 canal 運行實例,對應(yīng)于一個 jvm,一個 instance 對應(yīng)一個數(shù)據(jù)隊列。
instance模塊:
-
eventParser :數(shù)據(jù)源接入,模擬 slave 協(xié)議和 master 進(jìn)行交互,協(xié)議解析;
-
eventSink :Parser 和 Store 鏈接器,進(jìn)行數(shù)據(jù)過濾、加工、分發(fā)的工作;
-
eventStore :數(shù)據(jù)存儲;
-
metaManager :增量訂閱&消費信息管理器;
instance 是 canal 數(shù)據(jù)同步的核心,在一個 canal 實例中只有啟動 instace 才能進(jìn)行數(shù)據(jù)的同步任務(wù)。一個 canal server 實例中可以創(chuàng)建多個 Canal Instance 實例。每一個 Canal Instance 可以看成是對應(yīng)一個 MySQL 實例。
Canal-HA機制?
所謂 HA 即高可用,是 High Available 的簡稱。通常我們一個服務(wù)要支持高可用都需要借助于第三方的分布式同步協(xié)調(diào)服務(wù),最常用的是zookeeper 。canal 實現(xiàn)高可用,也是依賴了zookeeper 的幾個特性:watcher 和 EPHEMERAL 節(jié)點。
canal 的高可用分為兩部分:canal server 和 canal client
-
canal server: 為了減少對 mysql dump 的請求,不同 server 上的 instance(不同 server 上的相同 instance)要求同一時間只能有一個處于 running,其他的處于 standby 狀態(tài),也就是說,只會有一個 canal server 的 instance 處于 active 狀態(tài),但是當(dāng)這個 instance down 掉后會重新選出一個 canal server。
-
canal client: 為了保證有序性,一份 instance 同一時間只能由一個 canal client 進(jìn)行 get/ack/rollback 操作,否則客戶端接收無法保證有序。
server ha 的架構(gòu)圖如下:
大致步驟:
-
canal server 要啟動某個 canal instance 時都先向 zookeeper 進(jìn)行一次嘗試啟動判斷(實現(xiàn):創(chuàng)建 EPHEMERAL 節(jié)點,誰創(chuàng)建成功就允許誰啟動);
-
創(chuàng)建 zookeeper 節(jié)點成功后,對應(yīng)的 canal server 就啟動對應(yīng)的 canal instance,沒有創(chuàng)建成功的 canal instance 就會處于 standby 狀態(tài)。
-
一旦 zookeeper 發(fā)現(xiàn) canal server A 創(chuàng)建的 instance 節(jié)點消失后,立即通知其他的 canal server 再次進(jìn)行步驟1的操作,重新選出一個 canal server 啟動 instance。
-
canal client 每次進(jìn)行 connect 時,會首先向 zookeeper 詢問當(dāng)前是誰啟動了canal instance,然后和其建立鏈接,一旦鏈接不可用,會重新嘗試 connect。
Canal Client 的方式和 canal server 方式類似,也是利用 zookeeper 的搶占 EPHEMERAL 節(jié)點的方式進(jìn)行控制。
應(yīng)用場景?
同步緩存 Redis /全文搜索 ES
當(dāng)數(shù)據(jù)庫變更后通過 binlog 進(jìn)行緩存/ES的增量更新。當(dāng)緩存/ES更新出現(xiàn)問題時,應(yīng)該回退 binlog 到過去某個位置進(jìn)行重新同步,并提供全量刷新緩存/ES的方法。
下發(fā)任務(wù)
當(dāng)數(shù)據(jù)變更時需要通知其他依賴系統(tǒng)。其原理是任務(wù)系統(tǒng)監(jiān)聽數(shù)據(jù)庫變更,然后將變更的數(shù)據(jù)寫入 MQ/kafka 進(jìn)行任務(wù)下發(fā),比如商品數(shù)據(jù)變更后需要通知商品詳情頁、列表頁、搜索頁等相關(guān)系統(tǒng)。
這種方式可以保證數(shù)據(jù)下發(fā)的精確性,通過 MQ 發(fā)送消息通知變更緩存是無法做到這一點的,而且業(yè)務(wù)系統(tǒng)中不會散落著各種下發(fā) MQ 的代碼,從而實現(xiàn)了下發(fā)歸集。
數(shù)據(jù)異構(gòu)
在大型網(wǎng)站架構(gòu)中,DB都會采用分庫分表來解決容量和性能問題。但分庫分表之后帶來的新問題,比如不同維度的查詢或者聚合查詢,此時就會非常棘手。一般我們會通過數(shù)據(jù)異構(gòu)機制來解決此問題。
所謂的數(shù)據(jù)異構(gòu),那就是將需要 join 查詢的多表按照某一個維度又聚合在一個 DB 中讓你去查詢,canal 就是實現(xiàn)數(shù)據(jù)異構(gòu)的手段之一。
MySQL 配置?
開啟 binlog
首先在 mysql 的配置文件目錄中查找配置文件 my.cnf(Linux環(huán)境)
[root@iZ2zebiempwqvoc2xead5lZ?mysql]#?find?/?-name?my.cnf
/etc/my.cnf
[root@iZ2zebiempwqvoc2xead5lZ?mysql]#?cd?/etc
[root@iZ2zebiempwqvoc2xead5lZ?etc]#?vim?my.cnf
在 [mysqld] 區(qū)塊下添加配置開啟 binlog
server-id=1?#master端的ID號【必須是唯一的】;
log_bin=mysql-bin?#同步的日志路徑,一定注意這個目錄要是mysql有權(quán)限寫入的
binlog-format=row?#行級,記錄每次操作后每行記錄的變化。
binlog-do-db=cheetah?#指定庫,縮小監(jiān)控的范圍。
重啟 mysql:service mysqld restart,會發(fā)現(xiàn)在 /var/lib/mysql 下會生成兩個文件 mysql-bin.000001 和 mysql-bin.index,當(dāng) mysql 重啟或到達(dá)單個文件大小的閾值時,新生一個文件,按順序編號 mysql-bin.000002,以此類推。
擴展
binlog 日志有三種格式,可以通過?binlog_format?
參數(shù)指定。
statement
記錄的內(nèi)容是?SQL語句?原文,比如執(zhí)行一條?update T set update_time=now() where id=1?
,記錄的內(nèi)容如下
同步數(shù)據(jù)時,會執(zhí)行記錄的 SQL 語句,但是有個問題,update_time=now() 這里會獲取當(dāng)前?系統(tǒng)時間?,直接執(zhí)行會導(dǎo)致與原庫的數(shù)據(jù)?不一致?。
row
為了解決上述問題,我們需要指定為 row,記錄的內(nèi)容不再是簡單的 SQL 語句了,還包含操作的具體數(shù)據(jù),記錄內(nèi)容如下。
row 格式記錄的內(nèi)容看不到詳細(xì)信息,要通過 mysql binlog 工具解析出來。
update_time=now()?
變成了具體的時間?update_time=1627112756247?
,條件后面的 @1、@2、@3 都是該行數(shù)據(jù)第1個~3個字段的原始值(假設(shè)這張表只有3個字段)。
這樣就能保證同步數(shù)據(jù)的一致性,通常情況下都是指定為 row,這樣可以為數(shù)據(jù)庫的恢復(fù)與同步帶來更好的可靠性。
缺點:占空間、恢復(fù)與同步時消耗更多的IO資源,影響執(zhí)行速度。
mixed
MySQL 會判斷這條 SQL 語句是否可能引起數(shù)據(jù)不一致,如果是,就用 row 格式,否則就用 statement 格式。
配置權(quán)限
CREATE?USER?canal?IDENTIFIED?BY?'XXXX';???#創(chuàng)建用戶名和密碼都為?canal?的用戶
GRANT?SELECT,?REPLICATION?SLAVE,?REPLICATION?CLIENT?ON?*.*?TO?'canal'@'%';?#授予該用戶對所有數(shù)據(jù)庫和表的查詢、復(fù)制主節(jié)點數(shù)據(jù)的操作權(quán)限
FLUSH?PRIVILEGES;?#重新加載權(quán)限
注意:如果密碼設(shè)置的過于簡單,會報以下錯誤
ERROR?1819?(HY000):?Your?password?does?not?satisfy?the?current?policy?requirements
MySQL 有密碼設(shè)置的規(guī)范,可以自行百度??。
Canal 配置?
官網(wǎng)下載地址,我下載的版本是?canal.deployer-1.1.6.tar.gz?
,然后通過 psftp 上傳到服務(wù)器。
解壓:?tar -zxvf canal.deployer-1.1.6.tar.gz
配置
通過查看?conf/canal.properties?
配置,發(fā)現(xiàn)需要暴漏三個端口
canal.admin.port?=?11110
canal.port?=?11111
canal.metrics.pull.port?=?11112
修改?conf/canal.properties?
配置
#?指定實例,多個實例使用逗號分隔:?canal.destinations?=?example1,example2
canal.destinations?=?example
修改?conf/example/instance.properties?
實例配置
#?配置?slaveId?自定義,不等于?mysql?的?server?Id?即可
canal.instance.mysql.slaveId=10?
#?數(shù)據(jù)庫地址:自己的數(shù)據(jù)庫ip+端口
canal.instance.master.address=127.0.0.1:3306?
?
#?數(shù)據(jù)庫用戶名和密碼?
canal.instance.dbUsername=xxx?
canal.instance.dbPassword=xxx
#代表數(shù)據(jù)庫的編碼方式對應(yīng)到?java?中的編碼類型,比如?UTF-8,GBK?,?ISO-8859-1
canal.instance.connectionCharset?=?UTF-8
?
#?指定庫和表,這里的?.*?表示?canal.instance.master.address?下面的所有數(shù)據(jù)庫
canal.instance.filter.regex=.*\\..*
如果系統(tǒng)是1個 cpu,需要將?
canal.instance.parser.parallel?
設(shè)置為 false
啟動
需要在安裝目錄?/usr/local?
下執(zhí)行:?sh bin/startup.sh?
或者?./bin/startup.sh?
。
報錯
發(fā)現(xiàn)在 logs 下沒有生成 canal.log 日志,在進(jìn)程命令中?ps -ef | grep canal?
也查不到 canal 的進(jìn)程。
解決
在目錄 logs 中存在文件 canal_stdout.log ,文件內(nèi)容如下:
報錯信息提示內(nèi)存不足,Java 運行時環(huán)境無法繼續(xù)。更詳細(xì)的錯誤日志在文件:?/usr/local/bin/hs_err_pid25186.log?
中。
既然是內(nèi)存原因,那就檢查一下自己的內(nèi)存,執(zhí)行命令?free -h?
,發(fā)現(xiàn)可用內(nèi)存僅為 96M,應(yīng)該是內(nèi)存問題,解決方法如下:
-
殺死運行的一些進(jìn)程;
-
增加虛擬機的內(nèi)存;
-
修改 canal 啟動時所需要的內(nèi)存;
我就是用的第三種方法,首先用 vim 打開 startup.sh 修改內(nèi)存參數(shù),可以對照我的進(jìn)行修改,按照自己服務(wù)器剩余內(nèi)存進(jìn)行修改,這里我將內(nèi)存調(diào)整到了 80M。
改為?-server -Xms80m -Xmx80m -Xmn80m -XX:SurvivorRatio=2 -XX:PermSize=66m -XX:MaxPermSize=80m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError
改完之后執(zhí)行命令發(fā)現(xiàn)依舊報錯:?found canal.pid , Please run stop.sh first ,then startup.sh?
意思是找到了 canal.pid,請先運行stop.sh。
這是由于 canal 服務(wù)不正常退出服務(wù)導(dǎo)致的,比如說虛擬機強制重啟。
執(zhí)行 stop.sh 命令后重新啟動,成功運行,成功運行后可以在 canal/logs 文件夾中生成 canal.log 日志。
實戰(zhàn)?
引入依賴
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
代碼樣例
代碼樣例來自官網(wǎng),僅用于測試使用
public?class?SimpleCanalClientExample?{
????public?static?void?main(String?args[])?{
????????//?創(chuàng)建鏈接:換成自己的數(shù)據(jù)庫ip地址
????????CanalConnector?connector?=?CanalConnectors.newSingleConnector(new?InetSocketAddress("127.0.0.1",
????????????????11111),?"example",?"",?"");
????????int?batchSize?=?1000;
????????int?emptyCount?=?0;
????????try?{
????????????connector.connect();
????????????connector.subscribe(".*\\..*");
????????????connector.rollback();
????????????int?totalEmptyCount?=?120;
????????????while?(emptyCount?<?totalEmptyCount)?{
????????????????Message?message?=?connector.getWithoutAck(batchSize);?//?獲取指定數(shù)量的數(shù)據(jù)
????????????????long?batchId?=?message.getId();
????????????????int?size?=?message.getEntries().size();
????????????????if?(batchId?==?-1?||?size?==?0)?{
????????????????????emptyCount++;
????????????????????System.out.println("empty?count?:?"?+?emptyCount);
????????????????????try?{
????????????????????????Thread.sleep(1000);
????????????????????}?catch?(InterruptedException?e)?{
????????????????????}
????????????????}?else?{
????????????????????emptyCount?=?0;
????????????????????printEntry(message.getEntries());
????????????????}
????????????????connector.ack(batchId);?//?提交確認(rèn)
????????????}
????????????System.out.println("empty?too?many?times,?exit");
????????}?finally?{
????????????connector.disconnect();
????????}
????}
????private?static?void?printEntry(List<CanalEntry.Entry>?entrys)?{
????????for?(CanalEntry.Entry?entry?:?entrys)?{
????????????if?(entry.getEntryType()?==?CanalEntry.EntryType.TRANSACTIONBEGIN?||?entry.getEntryType()?==?CanalEntry.EntryType.TRANSACTIONEND)?{
????????????????continue;
????????????}
????????????CanalEntry.RowChange?rowChage?=?null;
????????????try?{
????????????????rowChage?=?CanalEntry.RowChange.parseFrom(entry.getStoreValue());
????????????}?catch?(Exception?e)?{
????????????????throw?new?RuntimeException("ERROR?##?parser?of?eromanga-event?has?an?error?,?data:"?+?entry.toString(),
????????????????????????e);
????????????}
????????????CanalEntry.EventType?eventType?=?rowChage.getEventType();
????????????System.out.println(String.format("================>?binlog[%s:%s]?,?name[%s,%s]?,?eventType?:?%s",
????????????????????entry.getHeader().getLogfileName(),?entry.getHeader().getLogfileOffset(),
????????????????????entry.getHeader().getSchemaName(),?entry.getHeader().getTableName(),
????????????????????eventType));
????????????for?(CanalEntry.RowData?rowData?:?rowChage.getRowDatasList())?{
????????????????if?(eventType?==?CanalEntry.EventType.DELETE)?{
????????????????????printColumn(rowData.getBeforeColumnsList());
????????????????}?else?if?(eventType?==?CanalEntry.EventType.INSERT)?{
????????????????????printColumn(rowData.getAfterColumnsList());
????????????????}?else?{
????????????????????System.out.println("------->?before");
????????????????????printColumn(rowData.getBeforeColumnsList());
????????????????????System.out.println("------->?after");
????????????????????printColumn(rowData.getAfterColumnsList());
????????????????}
????????????}
????????}
????}
????private?static?void?printColumn(List<CanalEntry.Column>?columns)?{
????????for?(CanalEntry.Column?column?:?columns)?{
????????????System.out.println(column.getName()?+?"?:?"?+?column.getValue()?+?"????update="?+?column.getUpdated());
????????}
????}
}
測試
啟動項目,打印日志
empty?count?:?1
empty?count?:?2
empty?count?:?3
empty?count?:?4
手動修改數(shù)據(jù)庫中的字段:文章來源:http://www.zghlxwxcb.cn/news/detail-828559.html
================>?binlog[mysql-bin.000002:8377]?,?name[cheetah,product_info]?,?eventType?:?UPDATE
------->?before
id?:?3????update=false
name?:?java開發(fā)1????update=false
price?:?87.0????update=false
create_date?:?2021-03-27?22:43:31????update=false
update_date?:?2021-03-27?22:43:34????update=false
------->?after
id?:?3????update=false
name?:?java開發(fā)????update=true
price?:?87.0????update=false
create_date?:?2021-03-27?22:43:31????update=false
update_date?:?2021-03-27?22:43:34????update=false
可以看出是在?mysql-bin.000002?
文件中,數(shù)據(jù)庫名稱 cheetah ,表名 product_info,事件類型:update。文章來源地址http://www.zghlxwxcb.cn/news/detail-828559.html
到了這里,關(guān)于MySQL如何實時同步數(shù)據(jù)到ES?試試阿里開源的Canal的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!