目錄
1.前言
2.什么是canal
3.canal能做什么
4.如何搭建canal
4.1首先有一個MySQL服務器
4.2 準備canal
1.下載
2.解壓
3.修改配置文件
4.啟動canal
5.Java創(chuàng)建客戶端,監(jiān)聽canalServer(官網(wǎng)推薦方式)
1.創(chuàng)建SpringBoot項目
略過…
2.導入canal客戶端包
3.導入測試Main方法
6.Java創(chuàng)建客戶端,GitHub推薦三方工具
1.創(chuàng)建SpringBoot項目
2.導入依賴
3.導入Bean代碼
4.啟動項目
7.canal-adapter同步數(shù)據(jù)
7.1.安裝adapter客戶端
7.2 解壓到純英文 路徑,中文會報錯
7.3配置application.yml
7.4mysql-》mysql需要修改 rdb配置文件
7.5全量同步
7.6多表同步
8.報錯解決
8.1 canal服務端
?????? 8.1.1:找不到表
8.2 adapter客戶端
?8.2.1: dir not exist? (找不到目錄)
8.2.2: Did not matched any columns to update(沒有任何一列可以更新)
8.2.3: 全量更新執(zhí)行ETL命令報錯 ?Task not fund 解決方案見 (canal數(shù)據(jù)同步7.5(全量同步)
1.前言
我們都知道一個系統(tǒng)最重要的是數(shù)據(jù),數(shù)據(jù)是保存在數(shù)據(jù)庫里。但是很多時候不單止要保存在數(shù)據(jù)庫中,還要同步保存到Elastic Search、HBase、Redis等等。
這時我注意到阿里開源的框架Canal,他可以很方便地同步數(shù)據(jù)庫的增量數(shù)據(jù)到其他的存儲應用。所以在這里總結一下,分享給各位讀者參考~
2.什么是canal
我們先看官網(wǎng)的介紹:
canal,譯意為水道/管道/溝渠,主要用途是基于?MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費。
這句介紹有幾個關鍵字:增量日志,增量數(shù)據(jù)訂閱和消費。
這里我們可以簡單地把canal理解為一個用來同步增量數(shù)據(jù)的一個工具。
canal的工作原理就是把自己偽裝成MySQL slave,模擬MySQL slave的交互協(xié)議向MySQL Mater發(fā)送 dump協(xié)議,MySQL mater收到canal發(fā)送過來的dump請求,開始推送binary log給canal,然后canal解析binary log,再發(fā)送到存儲目的地,比如MySQL,Kafka,Elastic Search等等。
3.canal能做什么
與其問canal能做什么,不如說數(shù)據(jù)同步有什么作用。
但是canal的數(shù)據(jù)同步不是全量的,而是增量?;?/span>binary log增量訂閱和消費,canal可以做:
- 數(shù)據(jù)庫鏡像
- 數(shù)據(jù)庫實時備份
- 索引構建和實時維護
- 業(yè)務cache(緩存)刷新
- 帶業(yè)務邏輯的增量數(shù)據(jù)處理
4.如何搭建canal
4.1首先有一個MySQL服務器
當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
我的Linux服務器安裝的MySQL服務器是5.7版本。
MySQL的安裝這里就不演示了,比較簡單,網(wǎng)上也有很多教程。
然后在MySQL中需要創(chuàng)建一個用戶,并授權:
-- 使用命令登錄:mysql -u root -p -- 創(chuàng)建用戶 用戶名:canal 密碼:Canal@123456 create user 'canal'@'%' identified by 'Canal@123456'; -- 授權 *.*表示所有庫 grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'Canal@123456';
下一步在MySQL配置文件my.cnf設置如下信息:
[mysqld] # 打開binlog log-bin=mysql-bin # 選擇ROW(行)模式 binlog-format=ROW # 配置MySQL replaction需要定義,不要和canal的slaveId重復 server_id=1
4.2 準備canal
1.下載
去官網(wǎng)下載頁面進行下載:https://github.com/alibaba/canal/releases
2.解壓
3.修改配置文件
需要配置以下參數(shù):
#canal讀取mysql的binlog文件是把自己偽裝成一個slava ,所以需要配置一個id,這個id不能和mysql配置的id相同 canal.instance.mysql.slaveId=9 #要監(jiān)聽的mysql的地址 canal.instance.master.address=127.0.0.1:3306 #mysql 數(shù)據(jù)解析關注的表,Perl正則表達式. #多個正則之間以逗號(,)分隔,轉義符需要雙斜杠(\\) #常見例子: #1.? 所有表:.*?? or? .*\\..* #2.? canal schema下所有表:canal\\..* #3.? canal下的以canal打頭的表:canal\\.canal.* #4.? canal schema下的一張表:canal.test1 #5.? 多個規(guī)則組合使用:canal\\..*,mysql.test1,mysql.test2 (逗號分隔) #默認監(jiān)聽的數(shù)據(jù)庫的名稱,也可以不配置這個 canal.instance.defaultDatabaseName=test #黑名單 canal.instance.filter.black.regex=mysql\\.slave_.* #白名單 canal.instance.filter.regex=.*\\..* |
4.啟動canal
- 查看配置,是否有報錯
5.Java創(chuàng)建客戶端,監(jiān)聽canalServer(官網(wǎng)推薦方式)
1.創(chuàng)建SpringBoot項目
略過…
2.導入canal客戶端包
<dependency> |
3.導入測試Main方法
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import com.alibaba.otter.canal.protocol.Message; import java.net.InetSocketAddress; import java.util.List; public class ClientSample { ??? public static void main(String args[]) { ??????? // 創(chuàng)建鏈接 ??????? CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), ??????????????? 11111), "example", "", ""); ??????? int batchSize = 1000; ??????? int emptyCount = 0; ??????? try { ??????????? connector.connect(); ?????????? ?connector.subscribe(".*\\..*"); ??????????? connector.rollback(); ??????????? int totalEmtryCount = 1200; ??????????? while (emptyCount < totalEmtryCount) { ??????????????? Message message = connector.getWithoutAck(batchSize); // 獲取指定數(shù)量的數(shù)據(jù) ?????????????? ?long batchId = message.getId(); ??????????????? int size = message.getEntries().size(); ??????????????? if (batchId == -1 || size == 0) { ??????????????????? emptyCount++; ??????????????????? System.out.println("empty count : " + emptyCount); ??????????? ????????try { ??????????????????????? Thread.sleep(1000); ??????????????????? } catch (InterruptedException e) { ??????????????????????? e.printStackTrace(); ??????????????????? } ??????????????? } else { ??????????????????? emptyCount = 0; ?????????????? ?????// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); ??????????????????? printEntry(message.getEntries()); ??????????????? } ??????????????? connector.ack(batchId); // 提交確認 ??????????????? // connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù) ??????????? } ??????????? System.out.println("empty too many times, exit"); ??????? } finally { ??????????? connector.disconnect(); ??????? } ??? } ??? private static void printEntry( List<Entry> entrys) { ??????? for (Entry entry : entrys) { ????? ??????if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { ??????????????? continue; ??????????? } ??????????? RowChange rowChage = null; ??????????? try { ??????????????? rowChage = RowChange.parseFrom(entry.getStoreValue()); ??????????? } catch (Exception e) { ??????????????? throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), ??????????????????????? e); ??????????? } ??????????? EventType eventType = rowChage.getEventType(); ??????????? System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", ??????????????????? entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), ??????????????????? entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), ??????????????????? eventType)); ??????????? for (RowData rowData : rowChage.getRowDatasList()) { ??????????????? if (eventType == EventType.DELETE) { ??????????????????? printColumn(rowData.getBeforeColumnsList()); ??????????????? } else if (eventType == EventType.INSERT) { ??????????????????? printColumn(rowData.getAfterColumnsList()); ??????????????? } else { ??????????????????? System.out.println("-------> before"); ??????????????????? printColumn(rowData.getBeforeColumnsList()); ??????????????????? System.out.println("-------> after"); ??????????????????? printColumn(rowData.getAfterColumnsList()); ??????????????? } ??????????? } ??????? } ??? } ?? ?private static void printColumn( List<Column> columns) { ??????? for (Column column : columns) { ??????????? System.out.println(column.getName() + " : " + column.getValue() + "??? update=" + column.getUpdated()); ??????? } ??? } } |
6.Java創(chuàng)建客戶端,GitHub推薦三方工具
1.創(chuàng)建SpringBoot項目
省略..
2.導入依賴
<!--第三方 GitHub 開源工具 --> |
3.導入Bean代碼
import com.brs.canalclient.domain.School; |
4.啟動項目
?省略..
7.canal-adapter同步數(shù)據(jù)
詳細配置教學:https://blog.csdn.net/zcl111/article/details/119868846
7.1.安裝adapter客戶端
https://github.com/alibaba/canal/releases
|
7.2 解壓到純英文 路徑,中文會報錯
7.3配置application.yml
|
-
-
-
- 修改標記的4個地方:
-
-
#↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓? adapter適配器基礎配置 ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ server: ? port: 8081 # spring: ? jackson: ??? date-format: yyyy-MM-dd HH:mm:ss ??? time-zone: GMT+8 ??? default-property-inclusion: non_null ??? ??? ??? ??? #↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓? canal服務端地址?????? ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ canal.conf: # kafka rocketMQ ? mode: tcp ? canalServerHost: 127.0.0.1:11111????? # 2.canalService 地址 #? zookeeperHosts: slave1:2181 #? mqServers: 127.0.0.1:9092 #or rocketmq #? flatMessage: true ? batchSize: 500 ? syncBatchSize: 1000 ? retries: 0 ? timeout: ? accessKey: ? secretKey: #↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓? canal監(jiān)控的數(shù)據(jù)源 ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ ? srcDataSources:?? ??? source1:??????????????????????????????????????????????????????? # 3.監(jiān)控地址 ????? url: jdbc:mysql://localhost:3306/test?useUnicode=true ????? username: root ????? password: 123456yts ????? ????? ????? ????? ????? ????? #↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓? 數(shù)據(jù)去處時適配器配置。可以配置多個,并發(fā)執(zhí)行。每個適配器都有個對應的instance。??? ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ ????? ? canalAdapters:???????????? ???????????????????????????????????????# 同步數(shù)據(jù)的目標數(shù)據(jù)源配置 ? - instance: example # canal instance Name or mq topic name??????? #canalServer的實例名稱,或者mq交換機類型 ??? groups:???????????????????????????????????????????????????????? # 消費組其一的ID? ??? - groupId: g1??????? ???????????????????????????????????????????# 消費者 group_01 下的多個數(shù)據(jù)去處 適配器。 ????? outerAdapters:??????????????????????????????????????????????? # 組內的適配器是串行處理。如果其中一個異常,會導致下面的的適配器不能執(zhí)行。 ????? - name: rdb?????????????????????????????????????????????????? # 適配器類型。即目前支持的通過SPI加載的適配器。從 plugin文件夾中讀取的。 ??????? key: adapterTestKey1??????????????????????????????????????? # 適配器key。具體適配器yml中配置 outerAdapterKey引用。 ??????? properties:???????????????????????????????????????????????? # 4.目標地址 ????????? jdbc.driverClassName: com.mysql.jdbc.Driver ????????? jdbc.url: jdbc:mysql://localhost:3307/test?useUnicode=true ????????? jdbc.username: root ????????? jdbc.password: 123456yts ???? |
7.4mysql-》mysql需要修改 rdb配置文件
|
dataSourceKey: source1????????????????????? ##canal adapter配置的數(shù)據(jù)源 destination: example??????????????????????? #canal實例名稱,對應application.yml中的instance groupId: g1???????????????????????????????? #對應哪個outerAdapter的消費組 outerAdapterKey: adapterTestKey1??????????? #application.yml 中配置的Key concurrent: true??????????????????????????? #是否并行同步 dbMapping:????????????????????????????????? #上下游數(shù)據(jù)映射 ? #mirrorDb: true??????????????????????????? #同步數(shù)據(jù)庫DDL語句create-drop-alert,同步DDL必須配置這個? ? database: test??????????????????????????? #上游數(shù)據(jù)庫名稱 ? table: school???????????????????????????? #上游數(shù)據(jù)庫中的表名稱 ? targetTable: test.school????????????????? #下游目標數(shù)據(jù)庫+表名稱?? 用 . 號連接 ? targetPk:???????????????????????????????? #目標主鍵 primaryKey 映射 ??? id: id????????????????????????????????? #ID映射 ?#mapAll: true????????????????????????????? # 是否整表映射, 要求源表和目標表字段名一模一樣 (如果targetColumns也配置了映射,則以targetColumns配置為準) ??????????????????????????????????????????? # 注意 這段表述“以targetColumns配置為準”。這個并不是說只同步 targetColumns配置的屬性。而是說一樣要同步 源表的所有屬性。 ??????????????????????????????????????????? # 但是考慮到目標表的屬性名稱可能不完全一致,有區(qū)別的屬性名稱可以通過targetColumns來配置映射關系,沒有配置的默認屬性默認都是相同。 ??????????????????????????????????????????? # 如果只需要同步部分源表的屬性到目標表中,這里應該設置false ? targetColumns:? ??????????????????????????#2. 這也是映射,如果和上面 mapAll 同時開啟,targetColumns優(yōu)先級更高 ??????????????????????????????????????????? # 字段映射, 格式: 目標表字段: 源表字段, 如果字段名一樣源表字段名可不填 ??????????????????????????????????????????? # 注意數(shù)據(jù)源的 to: from 前面是數(shù)據(jù)要同步到的地方,后面是數(shù)據(jù)來源的 ??? id: ??? school_name: name ? etlCondition: "where c_time>={}" ? commitBatch: 3000 # 批量提交的大小 |
7.5全量同步
1.上面講解的都是增量同步的方式
???????? 增量同步其實很簡單,只用發(fā)送一條etl 請求就可以了
http://127.0.0.1:8081/etl/rdb/adapterTestKey1/mytest_user.yml
|
標記①:這里是動態(tài)配置的,如果CanalAdapter的配置文件中
??配置了①標記出的key在etl的連接中就要加上這個eky |
圖1標記②:這個是同步的類型,我們現(xiàn)在執(zhí)行的是mysql->mysql 所以是rdb
7.6多表同步
1.思路:
???????? 每一張表的同步在canal-adapter里面多需要一個適配器
2.配置適配器
在application.yml 中配置
|
3.使用適配器
適配器的使用,就是指定從哪里同步到哪里了
|
修改復制的hobby_user.yml內容
|
4.配置好了直接重啟 adapter服務
如果要全量同步,就通過每張表配置適配器時候指定的outerAdapterKey 來同步
8.報錯解決
8.1 canal服務端
?????? 8.1.1:找不到表
Caused by: java.io.IOException: ErrorPacket [errorNumber=1146, fieldCount=-1, message=Table 'xzw.BASE TABLE' doesn't exist, sqlState=42S02, sqlStateMarker=#] ?with command: show create table `xzw`.`t_1`;show create table `xzw`.`BASE TABLE`; ??????? at com.alibaba.otter.canal.parse.driver.mysql.MysqlQueryExecutor.queryMulti(MysqlQueryExecutor.java:109) ~[canal.parse.driver-1.1.6.jar:na] ??????? at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.queryMulti(MysqlConnection.java:111) ~[canal.parse-1.1.6.jar:na] ??????? at com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta.dumpTableMeta(DatabaseTableMeta.java:233) ~[canal.parse-1.1.6.jar:na] ??????? at com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta.rollback(DatabaseTableMeta.java:174) ~[canal.parse-1.1.6.jar:na] ??????? at com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.processTableMeta(AbstractMysqlEventParser.java:142) ~[canal.parse-1.1.6.jar:na] ??????? at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$1.run(AbstractEventParser.java:197) ~[canal.parse-1.1.6.jar:na] ??????? at java.lang.Thread.run(Thread.java:750) [na:1.8.0_331] |
配置 canal.instance.filter.black.regex=.*\\.BASE.* 解決
文檔:https://github.com/alibaba/canal/issues/4219
8.2 adapter客戶端
?8.2.1: dir not exist? (找不到目錄)
???????? 1.解決方案:adapter目錄不能存放在有中文路徑的文件里文章來源:http://www.zghlxwxcb.cn/news/detail-421151.html
8.2.2: Did not matched any columns to update(沒有任何一列可以更新)
???????? 1.解決方案:數(shù)據(jù)庫字段名特殊符號_ 下劃線識別不了文章來源地址http://www.zghlxwxcb.cn/news/detail-421151.html
8.2.3: 全量更新執(zhí)行ETL命令報錯 ?Task not fund 解決方案見 (canal數(shù)據(jù)同步7.5(全量同步)
到了這里,關于Alibaba Canal數(shù)據(jù)同步 mysql->mysql的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!