安裝
官方docker安裝說明文檔:https://github.com/alibaba/canal/wiki/Docker-QuickStart
組件介紹
canal.adapter
canal 1.1.1版本之后, 增加客戶端數據落地的適配及啟動功能, 目前支持功能:
客戶端啟動器
同步管理REST接口
日志適配器, 作為DEMO
關系型數據庫的數據同步(表對表同步), ETL功能
HBase的數據同步(表對表同步), ETL功能
(后續(xù)支持) ElasticSearch多表數據同步,ETL功能
canal.admin
設計上是為canal提供整體配置管理、節(jié)點運維等面向運維的功能,提供相對友好的WebUI操作界面,方便更多用戶快速和安全的操作
canal.deployer
這個就相當于canal的服務端,啟動它才可以在客戶端接收數據庫變更信息。也是本文的重點,上面兩個類似與這個的拓展
canal.example
是Canal提供的一個示例工程,用于演示如何整合使用Canal的各個組件。
配置數據庫
如何開啟mysql的binlog日志可看這個博客:biinlog是什么
canal原理是偽裝成mysql的從節(jié)點,并通過讀取主庫的binlog日志來同步數據的
主從同步原理圖:
canal工作原理圖:
創(chuàng)建canal用戶
# 新建用戶 用戶名:canal 密碼:canal
CREATE USER canal IDENTIFIED by 'canal';
# 授權
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# 刷新MySQL的系統(tǒng)權限相關表
FLUSH PRIVILEGES;
docker安裝
#安裝這個鏡像就相當于canal.deployer的功能
docker pull canal/canal-server:latest
#創(chuàng)建目錄canal掛載的目錄
mkdir -p /mydata/canal/conf
#先啟動canal的docker
docker run -p 11111:11111 --name canal -d canal/canal-server:latest
#將容器內的配置文件copy到剛建的文件夾中
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /mydata/canal/conf/
#刪除剛才啟動的容器
docker rm canal
#修改instance.properties配置文件
cd /mydata/canal/conf
vim instance.properties
#掛載配置文件并啟動
docker run -p 11111:11111 --name canal -v /mydata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties -d canal/canal-server:latest
#查看啟動情況
docker logs -fn 100 canal
表示啟動成功
程序測試
示例代碼:https://github.com/alibaba/canal/wiki/ClientExample
賬號官方教程引入依賴,編寫代碼測試,并啟動
啟動后控制臺上
數據庫修改數據后
會檢測到變動,說明前面部署一切正常,可以開始編寫后續(xù)邏輯
binlog:binlog文件名
name:庫名表名
eventType:操作類型
下面的就是數據了
canal監(jiān)聽mysql數據同步elasticsearch
上文說到canal.adapter的功能中有同步es的功能所以這一步就需要使用到這個模塊實現mysql數據同步es的功能
安裝canal-adapter
在瀏覽量docker hub后發(fā)現官方沒有發(fā)布該功能的容器,搜到的都是別人自己創(chuàng)建的容器(可能不是最新的)
所以這一步就直接使用壓縮包來安裝。
下載安裝包
下載地址:https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.adapter-1.1.7.tar.gz
官方指導文檔:https://github.com/alibaba/canal/wiki/Sync-ES
配置java環(huán)境
略
啟動canal-adapter
#解壓
tar -zxvf canal.adapter-1.1.7.tar.gz
#修改applincation.yml
vim application.yml
#未注釋的改成和下面一樣
###################################分割線###################################
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: -1
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer 這個改成你canal-server的地址,我這里是本機
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer 這里可以不用配置,原來的也不用刪,不影響
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer 同上
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer 同上
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:
#這里注意,原先是被注釋掉的,這里是配置數據源,就是mysql的地址
srcDataSources:
defaultDS:
url: jdbc:mysql://192.168.17.98:3306/es_test?useUnicode=true
username: root
password: root
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
#在conf目錄下有三個文件夾,分別是es6、es7、es8,因為我使用es版本是7,所以我這里寫es7
- name: es7
hosts: http://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
properties:
mode: rest # or rest
# security.auth: test:123456 # only used for rest mode
cluster.name: elasticsearch
###################################分割線###################################
#修改完application.yml需要修改es7目錄下的配置,如果你的版本不是7請進入對應目錄
cd es7
#修改mytest_user.yml
vim mytest_user.yml
###################################分割線###################################
# 源數據源的key, 對應上面配置的srcDataSources中的值
dataSourceKey: defaultDS
# cannal的instance或者MQ的topic
destination: example
# 對應MQ模式下的groupId, 只會同步對應groupId的數據
groupId: g1
esMapping:
# es 的索引名稱,這等下可以手動創(chuàng)建索引
_index: documentv1
# es 的_id, 如果不配置該項必須配置下面的pk項_id則會由es自動分配
_id: _id
# upsert: true
# pk: id # 如果不需要_id, 則需要指定一個屬性為主鍵屬性
# sql映射,這里就是查詢全表的sql
sql: "select a.id as _id, a.title as title , a.content as content , a.create_time as create_time, a.update_time as update_time from document a"
# objFields:
# _labels: array:;
# etl 的條件參數,adapter 的 ETL 接口為:/etl/{type}/{task} curl -X POST http://127.0.0.1:8081/etl/es7/sys_user.yml?params=更新時間
#默認web端口為 8081
#type 為類型(hbase/es7/rdb)
#task 為任務名對應配置文件名,如sys_user.yml
#etlCondition: "where a.update_time>={0}"
# 提交批大小
commitBatch: 3000
###################################分割線###################################
#配置完后進到bin目錄下,啟動
sh start.sh
#查看日志,日志在logs/adapter目錄下
啟動截圖文章來源:http://www.zghlxwxcb.cn/news/detail-764088.html
給es創(chuàng)建索引
//這個索引名稱要和上面mytest_user.yml文件中配置的一樣
PUT /documentv1
{
"mappings": {
"properties": {
"content": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"create_time": {
"type": "date"
},
"id": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"title": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"update_time": {
"type": "date"
}
}
}
}
在mysql中增、刪、改一條數據觀察日志
我修改了一條數據,日志中就會有一條信息
至此canal-server監(jiān)聽mysql然后通過canal-adapter同步到elasticsearch全鏈路已經完成。文章來源地址http://www.zghlxwxcb.cn/news/detail-764088.html
到了這里,關于docker安裝canal入門實戰(zhàn),同步mysql數據到elasticsearch的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!