簡介
canal基于MySQL數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費,是阿里開源CDC工具,它可以獲取MySQL binlog數(shù)據(jù)并解析,然后將數(shù)據(jù)變動傳輸給下游?;赾anal,可以實現(xiàn)從MySQL到其他數(shù)據(jù)庫的實時同步
工作原理
MySQL主備復(fù)制原理
MySQL master 將數(shù)據(jù)變更寫入二進(jìn)制日志( binary log, 其中記錄叫做二進(jìn)制日志事件binary log events,可以通過 show binlog events 進(jìn)行查看)
MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)
MySQL slave 重放 relay log 中事件,將數(shù)據(jù)變更反映它自己的數(shù)據(jù)
canal 工作原理
canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議
MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
canal 解析 binary log 對象(原始為 byte 流)
以上來自canal的github介紹,鏈接:https://github.com/alibaba/canal
canal 使用流程
- 部署Deployer服務(wù),該服務(wù)負(fù)責(zé)從上游拉取binlog數(shù)據(jù)、記錄位點等
- 部署Client-Adapter服務(wù),該服務(wù)負(fù)責(zé)對接Deployer解析過的數(shù)據(jù),并將數(shù)據(jù)傳輸?shù)侥繕?biāo)庫中。
- 部署完成后,canal默認(rèn)會自動同步MySQL增量數(shù)據(jù)。
- 如果需要同步MySQL全量數(shù)據(jù),請手動調(diào)用Client-Adapter服務(wù)的方法觸發(fā)同步任務(wù)。
待全量數(shù)據(jù)同步完成后,canal會自動開始增量同步。
環(huán)境使用版本
需要注意版本對應(yīng),canal1.1.6版本需要jdk11,canal1.1.5版本支持jdk8
應(yīng)用 | 版本 |
---|---|
mysql | 8.0.28 |
elasticsearch | 7.9.2 |
canal | 1.1.5 |
jdk | 8 |
MySQL環(huán)境搭建
1.修改mysql配置文件
配置數(shù)據(jù)庫my.cnf文件,如果是windows則配置my.ini文件
#開啟二進(jìn)制日志功能
log-bin=mall-mysql-bin
#設(shè)置使用的二進(jìn)制日志格式(mixed,statement,row)
binlog_format=row
配置完成重啟mysql實例
# 重啟mysql命令
systemctl restart mysqld
登錄mysql后校驗是否開啟
# 登錄mysql,需要密碼輸入
mysql -uroot -p
# 查看日志開啟的sql
show variables like '%log_bin%';
mysql> show variables like '%log_bin%';
+---------------------------------+-------------------------------------+
| Variable_name | Value |
+---------------------------------+-------------------------------------+
| log_bin | ON |
| log_bin_basename | /var/lib/mysql/mall-mysql-bin |
| log_bin_index | /var/lib/mysql/mall-mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| sql_log_bin | ON |
+---------------------------------+-------------------------------------+
6 rows in set (0.12 sec)
查看是否為row模式
mysql> show variables like '%binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.02 sec)
2.創(chuàng)建并賦權(quán)從庫賬號
創(chuàng)建從庫權(quán)限賬號canal,用于訂閱binlog
#創(chuàng)建用戶,密碼自己填寫,由于創(chuàng)建用戶時默認(rèn)的密碼加密方式為caching_sha2_password,所以修改為mysql_native_password,否則服務(wù)端啟動時可能會報錯
create user 'canal'@'%' identified with mysql_native_password by 'Password@123';
# 給新創(chuàng)建賬戶賦予從庫權(quán)限
grant select, replication slave, replication client on *.* to 'canal'@'%';
# 刷新權(quán)限
flush privileges;
3.創(chuàng)建測試數(shù)據(jù)庫
CREATE DATABASE IF NOT EXISTS canal default charset utf8 COLLATE utf8_general_ci;
創(chuàng)建測試數(shù)據(jù)表
CREATE TABLE `test_book` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`title` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '題名',
`isbn` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'isbn',
`author` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '作者',
`publisher_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '出版社名',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC
ES環(huán)境搭建
1.創(chuàng)建索引
2.建立映射
{
"properties": {
"id": {
"type": "long"
},
"title": {
"type": "text"
},
"isbn": {
"type": "text"
},
"author": {
"type": "text"
},
"publisherName": {
"type": "text"
}
}
}
canal的下載部署
下載canal
下載地址:https://github.com/alibaba/canal/releases
下載解壓到服務(wù)器指定目錄,并給文件夾及其子文件賦權(quán)
chmod -R 755 /home/canal-1.1.5/
配置服務(wù)端 canal-deployer
canal-deployer偽裝成mysql的從庫,監(jiān)聽binlog接收數(shù)據(jù),目錄結(jié)構(gòu)如下:
1.修改配置/conf/canal.properties
#canal的server地址:127.0.0.1,除了ip和port外,其他配置可不改動
canal.ip =127.0.0.1
#canal端口,用于客戶端監(jiān)聽
canal.port = 11111
2.修改配置/conf/example/instance.properties
#被同步的mysql地址
canal.instance.master.address=127.0.0.1:3306
#數(shù)據(jù)庫從庫權(quán)限賬號
canal.instance.dbUsername=canal
#數(shù)據(jù)庫從庫權(quán)限賬號的密碼
canal.instance.dbPassword=Password@123
#數(shù)據(jù)庫連接編碼
canal.instance.connectionCharset = UTF-8
#需要訂閱binlog的表過濾正則表達(dá)式
#canal.instance.filter.regex=.*\\..*
#我們只監(jiān)聽數(shù)據(jù)同步表
canal.instance.filter.regex=cxstar_oa.data_sync_es
#這里與文件夾名保持一致,后面會用到
canal.mq.topic=example
3.啟動canal-deployer
進(jìn)入bin目錄,執(zhí)行啟動命令:
./startup.sh
查看日志:/logs/canal/canal.log
2023-02-02 15:28:16.016 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2023-02-02 15:28:16.043 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2023-02-02 15:28:16.054 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2023-02-02 15:28:16.112 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[127.0.0.1(127.0.0.1):11111]
2023-02-02 15:28:17.824 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
查看日志:/logs/canal/canal.log
2023-02-02 15:28:17.590 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2023-02-02 15:28:17.619 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2023-02-02 15:28:17.619 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2023-02-02 15:28:17.757 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2023-02-02 15:28:17.776 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2023-02-02 15:28:17.776 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2023-02-02 15:28:18.382 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mall-mysql-bin.000008,position=12380,serverId=101,gtid=,timestamp=1675309792000] cost : 610ms , the next step is binlog dump
日志如上就已經(jīng)成功啟動
可能的問題: caching_sha2_password Auth failed
原因:
使用mysql版本為8.0,而創(chuàng)建用戶時默認(rèn)的密碼加密方式為caching_sha2_password,所以修改為mysql_native_password
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY '密碼'; #更新一下用戶密碼
FLUSH PRIVILEGES; #刷新權(quán)限
配置客戶端canal-adapter
canal-adapter:作為canal的客戶端,會從canal-server中獲取數(shù)據(jù),然后同步數(shù)據(jù)到MySQL、Elasticsearch等存儲中去。目錄結(jié)構(gòu)如下:
1.替換client-adapter.es7的jar文件
下載v1.1.5-alpha-2,解壓后找到plugin目錄下的
client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar
增加執(zhí)行權(quán)限
chmod -R 755 /home/canal-1.1.5/canal.adapter-1.1.5/plugin
關(guān)于為何這么做的解釋,如果不替換,在啟動的時候會報錯:
java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
2.修改配置/conf/application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp # 客戶端的模式,可選tcp kafka rocketMQ
flatMessage: true # 扁平message開關(guān), 是否以json字符串形式投遞數(shù)據(jù), 僅在kafka/rocketMQ模式下有效
zookeeperHosts: # 對應(yīng)集群模式下的zk地址
syncBatchSize: 1000 # 每次同步的批數(shù)量
retries: 0 # 重試次數(shù), -1為無限重試
timeout: # 同步超時時間, 單位毫秒
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111 #設(shè)置canal-server的地址
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
srcDataSources: # 源數(shù)據(jù)庫配置
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/canal?useUnicode=true&useSSL=true #測試數(shù)據(jù)庫連接
username: root #數(shù)據(jù)庫賬號
password: Passwd@2014 #數(shù)據(jù)庫密碼
canalAdapters: # 適配器列表
- instance: example # canal實例名或者M(jìn)Q topic名
groups: # 分組列表
- groupId: g1 # 分組id, 如果是MQ模式將用到該值
outerAdapters:
- name: logger # 日志打印適配器
- name: es7 # ES同步適配器
hosts: 192.168.0.182:9200 # ES連接地址
properties:
mode: rest # 模式可選transport(9300) 或者 rest(9200)
#security.auth: elastic:123456 # 連接es的用戶和密碼,僅rest模式使用
cluster.name: elasticsearch # ES集群名稱, 與es目錄下 elasticsearch.yml文件cluster.name對應(yīng)
3.增加mysql同步es的映射文件
進(jìn)入/conf/es7目錄下,復(fù)制mytest_user.yml命名為test_book.yml,同時修改:
dataSourceKey: defaultDS # 源數(shù)據(jù)源的key, 對應(yīng)上面配置的srcDataSources中的值
destination: example # canal的instance或者M(jìn)Q的topic
groupId: g1 # 對應(yīng)MQ模式下的groupId, 只會同步對應(yīng)groupId的數(shù)據(jù)
esMapping:
_index: test_book # es 的索引名稱
_id: _id # es 的_id, 如果不配置該項必須配置下面的pk項_id則會由es自動分配
sql: "SELECT
tb.id AS _id,
tb.title,
tb.isbn,
tb.author,
tb.publisher_name as publisherName
FROM
test_book tb" # sql映射
etlCondition: "where p.id>={}" #etl的條件參數(shù)
commitBatch: 3000 # 提交批大小
4.啟動canal-adapter
啟動canal-adapter,進(jìn)入bin目錄,執(zhí)行啟動命令:
./startup.sh
日志如下即表示啟動成功
2023-04-02 13:22:52.337 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## syncSwitch refreshed.
2023-04-02 13:22:52.337 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## start the canal client adapters.
2023-04-02 13:22:52.338 [main] INFO c.a.otter.canal.client.adapter.support.ExtensionLoader - extension classpath dir: /home/canal-1.1.5/canal.adapter-1.1.5/plugin
2023-04-02 13:22:52.372 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2023-04-02 13:22:52.679 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ...
2023-04-02 13:22:52.751 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2023-04-02 13:22:52.951 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 succeed
2023-04-02 13:22:52.960 [main] INFO c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /home/canal-1.1.5/canal.adapter-1.1.5/plugin
2023-04-02 13:22:52.982 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: cxstar_oa-g1 succeed
2023-04-02 13:22:52.983 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2023-04-02 13:22:52.983 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: cxstar_oa <=============
2023-04-02 13:22:52.990 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2023-04-02 13:22:52.991 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2023-04-02 13:22:53.013 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2023-04-02 13:22:53.028 [main] INFO c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 5.259 seconds (JVM running for 6.378)
2023-04-02 13:22:53.081 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: cxstar_oa succeed <=============
5.canal-adapter啟動可能的報錯問題
1.com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
[main] ERROR com.alibaba.druid.pool.DruidDataSource - init datasource error, url: jdbc:mysql://127.0.0.1:3306/canal?useUnicode=true&useSSL=true
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet successfully received from the server was 216 milliseconds ago. The last packet sent successfully to the server was 210 milliseconds ago.
解決方法:/conf/application.yml 中的mysql連接去除&useSSL=true
2.com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
原因:druid 包沖突
解決方法:
方法1.下載源碼包 ,修改client-adapter/escore/pom.xml
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<scope>provided</scope>
</dependency>
打包后將client-adapter/es7x/target/client-adapter.es7x-1.1.5-jar-with-dependencies.jar上傳到服務(wù)器,替換adataper/plugin下的同名jar文件
方法2.下載v1.1.5-alpha-2,
找到plugin目錄下的client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar
上傳到服務(wù)器 canal.adapter-1.1.5/plugin目錄下,同時刪除client-adapter.es7x-1.1.5-jar-with-dependencies.jar
3.Load canal adapter: es7 failed,Name or service not known
ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.net.UnknownHostException: http: Name or service not known
at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
解決方案:/conf/application.yml配置中 ,hosts不要帶http://
4.java.lang.NullPointerException: esMapping._type
ERROR c.a.o.c.client.adapter.es.core.monitor.ESConfigMonitor - esMapping._type
java.lang.NullPointerException: esMapping._type
at com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig.validate(ESSyncConfig.java:35) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
at com.alibaba.otter.canal.client.adapter.es.core.monitor.ESConfigMonitor$FileListener.onFileChange(ESConfigMonitor.java:102) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
at org.apache.commons.io.monitor.FileAlterationObserver.doMatch(FileAlterationObserver.java:400) [commons-io-2.4.jar:2.4]
at org.apache.commons.io.monitor.FileAlterationObserver.checkAndNotify(FileAlterationObserver.java:334) [commons-io-2.4.jar:2.4]
at org.apache.commons.io.monitor.FileAlterationObserver.checkAndNotify(FileAlterationObserver.java:304) [commons-io-2.4.jar:2.4]
at org.apache.commons.io.monitor.FileAlterationMonitor.run(FileAlterationMonitor.java:182) [commons-io-2.4.jar:2.4]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_221]
解決方案:canal.adapter-1.1.5/conf/es7目錄下的yml中增加一個官方配置的屬性
hosts: 192.168.0.182:9200 # ES連接地址
驗證canal-adapter是否啟動成功
查看日志 canal.adapter-1.1.5/logs/adapter/adapter.log
[org.springframework.cloud.context.properties:name=configurationPropertiesRebinder,context=2b76ff4e,type=ConfigurationPropertiesRebinder]
2023-02-03 09:34:13.373 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## syncSwitch refreshed.
2023-02-03 09:34:13.374 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## start the canal client adapters.
2023-02-03 09:34:13.375 [main] INFO c.a.otter.canal.client.adapter.support.ExtensionLoader - extension classpath dir: /home/canal/canal-test/canal.adapter-1.1.5/plugin
2023-02-03 09:34:13.418 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2023-02-03 09:34:13.643 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ...
2023-02-03 09:34:13.726 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2023-02-03 09:34:13.995 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 succeed
2023-02-03 09:34:14.005 [main] INFO c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /home/canal/canal-test/canal.adapter-1.1.5/plugin
2023-02-03 09:34:14.029 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed
2023-02-03 09:34:14.029 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2023-02-03 09:34:14.029 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2023-02-03 09:34:14.037 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2023-02-03 09:34:14.039 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2023-02-03 09:34:14.067 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2023-02-03 09:34:14.080 [main] INFO c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 5.221 seconds (JVM running for 5.807)
2023-02-03 09:34:14.169 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
同步測試
建立es索引和mysql表的映射
在客戶端目錄canal.adapter-1.1.5/conf/es7下配置字段的映射,adapter默認(rèn)會加載es路徑下的所有yml文件。一個配置文件表示一張表的mapping。
建立es和mysql的映射文件test_book.yml
dataSourceKey: defaultDS # 源數(shù)據(jù)源的key, 對應(yīng)上面配置的srcDataSources中的值
destination: example # canal的instance或者M(jìn)Q的topic
groupId: g1 # 對應(yīng)MQ模式下的groupId, 只會同步對應(yīng)groupId的數(shù)據(jù)
esVersion: es7
esMapping:
_index: test_book # es 的索引名稱
_id: _id # es 的_id, 如果不配置該項必須配置下面的pk項_id則會由es自動分配
sql: "SELECT
b.id AS _id,
b.title,
b.author,
b.isbn,
b.publisher_name as publisherName
FROM
test_book b" # sql映射
etlCondition: "where p.id>={}" #etl的條件參數(shù)
commitBatch: 5000 # 提交批大小
插入mysql數(shù)據(jù)驗證同步
INSERT INTO `canal`.`test_book`( `title`, `isbn`, `author`, `publisher_name`) VALUES ( '三體', '98741254125', '劉慈欣', '工業(yè)出版社');
查看日志 canal.adapter-1.1.5/logs/adapter/adapter.log
2023-02-03 10:18:21.988 [pool-2-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":3,"title":"三體","isbn":"98741254125","author":"劉慈欣","publisher_name":"工業(yè)出版社"}],"database":"canal","destination":"example","es":1675390701000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test_book","ts":1675390701977,"type":"INSERT"}
2023-02-03 10:18:22.225 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":3,"title":"三體","isbn":"98741254125","author":"劉慈欣","publisher_name":"工業(yè)出版社"}],"database":"canal","destination":"example","es":1675390701000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test_book","ts":1675390701977,"type":"INSERT"}
Affected indexes: test_book
查看es數(shù)據(jù),已經(jīng)成功同步
數(shù)據(jù)同步
canal開啟前的數(shù)據(jù)如何同步
canal-adapter提供一個REST接口可全量同步數(shù)據(jù)到ES,調(diào)用Client-Adapter服務(wù)的方法觸發(fā)同步任務(wù)。此時,canal會先中止增量數(shù)據(jù)傳輸,然后同步全量數(shù)據(jù)。待全量數(shù)據(jù)同步完成后,canal會自動進(jìn)行增量數(shù)據(jù)同步。
注意:如果數(shù)據(jù)是binlog開啟前存在,則不可以使用此種方式
curl http://127.0.0.1:8081/etl/es7/test_book.yml -X POST
同步日志:
2023-02-03 10:41:35.043 [http-nio-8081-exec-1] INFO c.a.otter.canal.client.adapter.es7x.etl.ESEtlService - start etl to import data to index: test_book
2023-02-03 10:41:35.130 [http-nio-8081-exec-1] INFO c.a.otter.canal.client.adapter.es7x.etl.ESEtlService - 數(shù)據(jù)全量導(dǎo)入完成, 一共導(dǎo)入 3 條數(shù)據(jù), 耗時: 85
binlog未開啟前的歷史數(shù)據(jù)如何同步?
因為canal是基于binlog實現(xiàn)全量同步的,那么未開啟binlog之前的歷史數(shù)據(jù)就無法被同步,將數(shù)據(jù)庫中的數(shù)據(jù)導(dǎo)出再重新導(dǎo)入一遍,這樣就可以生成binlog
es數(shù)組類型同步
adapter配置文件中添加配置
objFields:
author: array:, #代表字段以,分割
配置更新后會監(jiān)聽到配置改變,無需重啟
2023-02-03 11:33:24.098 [Thread-3] INFO c.a.o.c.client.adapter.es.core.monitor.ESConfigMonitor - Change a es mapping config: test_book.yml of canal adapter
更新數(shù)據(jù),author字段
UPDATE `canal`.`test_book` SET `title` = '三體', `isbn` = '98741254125', `author` = '劉慈欣,劉電工', `publisher_name` = '工業(yè)出版社' WHERE `id` = 1;
es中的數(shù)據(jù)已改變
多張表數(shù)據(jù)同步到一個索引中
yml映射文件中,主表一定要在最左側(cè),從表的數(shù)據(jù)改變也會自動同步到es中!
示例:journal_volume 表中的數(shù)據(jù)改變,也會自動同步到j(luò)ournal_paper 表對應(yīng)的es索引中文章來源:http://www.zghlxwxcb.cn/news/detail-401140.html
SELECT
jp.id AS _id,
jp.sid AS sid,
jp.import_id AS importId,
jp.journal_id AS journalId,
jp.journal_volume_id AS journalVolumeId,
jv.`year` as year,
jv.volume as volume,
jv.issue as issue,
j.publisher_name as publisherName
FROM journal_paper jp
left join journal_volume jv on jp.journal_volume_id=jv.id
left join journal j on j.id=jp.journal_id
可參考,待親自實現(xiàn)
https://blog.csdn.net/qq_24950043/article/details/122643889文章來源地址http://www.zghlxwxcb.cn/news/detail-401140.html
到了這里,關(guān)于canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!