国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)

這篇具有很好參考價值的文章主要介紹了canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

簡介

canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)
canal基于MySQL數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費,是阿里開源CDC工具,它可以獲取MySQL binlog數(shù)據(jù)并解析,然后將數(shù)據(jù)變動傳輸給下游?;赾anal,可以實現(xiàn)從MySQL到其他數(shù)據(jù)庫的實時同步

工作原理

MySQL主備復(fù)制原理

MySQL master 將數(shù)據(jù)變更寫入二進(jìn)制日志( binary log, 其中記錄叫做二進(jìn)制日志事件binary log events,可以通過 show binlog events 進(jìn)行查看)
MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)
MySQL slave 重放 relay log 中事件,將數(shù)據(jù)變更反映它自己的數(shù)據(jù)

canal 工作原理

canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議
MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
canal 解析 binary log 對象(原始為 byte 流)

以上來自canal的github介紹,鏈接:https://github.com/alibaba/canal

canal 使用流程

canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)

  1. 部署Deployer服務(wù),該服務(wù)負(fù)責(zé)從上游拉取binlog數(shù)據(jù)、記錄位點等
  2. 部署Client-Adapter服務(wù),該服務(wù)負(fù)責(zé)對接Deployer解析過的數(shù)據(jù),并將數(shù)據(jù)傳輸?shù)侥繕?biāo)庫中。
  3. 部署完成后,canal默認(rèn)會自動同步MySQL增量數(shù)據(jù)。
  4. 如果需要同步MySQL全量數(shù)據(jù),請手動調(diào)用Client-Adapter服務(wù)的方法觸發(fā)同步任務(wù)。
    待全量數(shù)據(jù)同步完成后,canal會自動開始增量同步。

環(huán)境使用版本

需要注意版本對應(yīng),canal1.1.6版本需要jdk11,canal1.1.5版本支持jdk8

應(yīng)用 版本
mysql 8.0.28
elasticsearch 7.9.2
canal 1.1.5
jdk 8

MySQL環(huán)境搭建

1.修改mysql配置文件

配置數(shù)據(jù)庫my.cnf文件,如果是windows則配置my.ini文件

#開啟二進(jìn)制日志功能
log-bin=mall-mysql-bin
#設(shè)置使用的二進(jìn)制日志格式(mixed,statement,row)
binlog_format=row

canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)配置完成重啟mysql實例

# 重啟mysql命令
systemctl restart mysqld

登錄mysql后校驗是否開啟

# 登錄mysql,需要密碼輸入
mysql -uroot -p
# 查看日志開啟的sql
show variables like '%log_bin%'; 
mysql> show variables like '%log_bin%';
+---------------------------------+-------------------------------------+
| Variable_name                   | Value                               |
+---------------------------------+-------------------------------------+
| log_bin                         | ON                                  |
| log_bin_basename                | /var/lib/mysql/mall-mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mall-mysql-bin.index |
| log_bin_trust_function_creators | OFF                                 |
| log_bin_use_v1_row_events       | OFF                                 |
| sql_log_bin                     | ON                                  |
+---------------------------------+-------------------------------------+
6 rows in set (0.12 sec)

查看是否為row模式

mysql> show variables like '%binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.02 sec)

2.創(chuàng)建并賦權(quán)從庫賬號

創(chuàng)建從庫權(quán)限賬號canal,用于訂閱binlog

#創(chuàng)建用戶,密碼自己填寫,由于創(chuàng)建用戶時默認(rèn)的密碼加密方式為caching_sha2_password,所以修改為mysql_native_password,否則服務(wù)端啟動時可能會報錯
create user 'canal'@'%' identified with mysql_native_password by 'Password@123';

# 給新創(chuàng)建賬戶賦予從庫權(quán)限
 grant select, replication slave, replication client on *.* to 'canal'@'%';
 
# 刷新權(quán)限
flush privileges;

3.創(chuàng)建測試數(shù)據(jù)庫

 CREATE DATABASE IF NOT EXISTS canal default charset utf8 COLLATE utf8_general_ci;

創(chuàng)建測試數(shù)據(jù)表

CREATE TABLE `test_book` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  `title` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '題名',
  `isbn` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'isbn',
  `author` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '作者',
  `publisher_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '出版社名',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB  DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC

ES環(huán)境搭建

1.創(chuàng)建索引

canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)

2.建立映射

{
  "properties": {
    "id": {
      "type": "long"
    },
    "title": {
      "type": "text"
    },
    "isbn": {
      "type": "text"
    },
    "author": {
      "type": "text"
    },
    "publisherName": {
      "type": "text"
    }
  }
}

canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)

canal的下載部署

下載canal

下載地址:https://github.com/alibaba/canal/releases
canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)
下載解壓到服務(wù)器指定目錄,并給文件夾及其子文件賦權(quán)
canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)

chmod -R 755 /home/canal-1.1.5/

配置服務(wù)端 canal-deployer

canal-deployer偽裝成mysql的從庫,監(jiān)聽binlog接收數(shù)據(jù),目錄結(jié)構(gòu)如下:
canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)

1.修改配置/conf/canal.properties

#canal的server地址:127.0.0.1,除了ip和port外,其他配置可不改動
canal.ip =127.0.0.1
#canal端口,用于客戶端監(jiān)聽
canal.port = 11111

2.修改配置/conf/example/instance.properties

#被同步的mysql地址
canal.instance.master.address=127.0.0.1:3306
#數(shù)據(jù)庫從庫權(quán)限賬號
canal.instance.dbUsername=canal
#數(shù)據(jù)庫從庫權(quán)限賬號的密碼
canal.instance.dbPassword=Password@123
#數(shù)據(jù)庫連接編碼 
canal.instance.connectionCharset = UTF-8 
#需要訂閱binlog的表過濾正則表達(dá)式
#canal.instance.filter.regex=.*\\..*
#我們只監(jiān)聽數(shù)據(jù)同步表
canal.instance.filter.regex=cxstar_oa.data_sync_es
#這里與文件夾名保持一致,后面會用到
canal.mq.topic=example

3.啟動canal-deployer

進(jìn)入bin目錄,執(zhí)行啟動命令:

./startup.sh

查看日志:/logs/canal/canal.log

2023-02-02 15:28:16.016 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2023-02-02 15:28:16.043 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2023-02-02 15:28:16.054 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2023-02-02 15:28:16.112 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[127.0.0.1(127.0.0.1):11111]
2023-02-02 15:28:17.824 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

查看日志:/logs/canal/canal.log

2023-02-02 15:28:17.590 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2023-02-02 15:28:17.619 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2023-02-02 15:28:17.619 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2023-02-02 15:28:17.757 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2023-02-02 15:28:17.776 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2023-02-02 15:28:17.776 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2023-02-02 15:28:18.382 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mall-mysql-bin.000008,position=12380,serverId=101,gtid=,timestamp=1675309792000] cost : 610ms , the next step is binlog dump

日志如上就已經(jīng)成功啟動

可能的問題: caching_sha2_password Auth failed
原因:
使用mysql版本為8.0,而創(chuàng)建用戶時默認(rèn)的密碼加密方式為caching_sha2_password,所以修改為mysql_native_password

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY '密碼'; #更新一下用戶密碼
FLUSH PRIVILEGES; #刷新權(quán)限

配置客戶端canal-adapter

canal-adapter:作為canal的客戶端,會從canal-server中獲取數(shù)據(jù),然后同步數(shù)據(jù)到MySQL、Elasticsearch等存儲中去。目錄結(jié)構(gòu)如下:
canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)

1.替換client-adapter.es7的jar文件

下載v1.1.5-alpha-2,解壓后找到plugin目錄下的
client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar
canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)
canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)
canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)
增加執(zhí)行權(quán)限

chmod -R 755 /home/canal-1.1.5/canal.adapter-1.1.5/plugin

關(guān)于為何這么做的解釋,如果不替換,在啟動的時候會報錯:
java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource

2.修改配置/conf/application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
    
canal.conf:
  mode: tcp # 客戶端的模式,可選tcp kafka rocketMQ
  flatMessage: true # 扁平message開關(guān), 是否以json字符串形式投遞數(shù)據(jù), 僅在kafka/rocketMQ模式下有效
  zookeeperHosts:    # 對應(yīng)集群模式下的zk地址
  syncBatchSize: 1000 # 每次同步的批數(shù)量
  retries: 0 # 重試次數(shù), -1為無限重試
  timeout: # 同步超時時間, 單位毫秒
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 #設(shè)置canal-server的地址
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
 
  srcDataSources: # 源數(shù)據(jù)庫配置
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/canal?useUnicode=true&useSSL=true #測試數(shù)據(jù)庫連接
      username: root #數(shù)據(jù)庫賬號
      password: Passwd@2014 #數(shù)據(jù)庫密碼
  canalAdapters: # 適配器列表
  - instance: example # canal實例名或者M(jìn)Q topic名
    groups: # 分組列表
    - groupId: g1 # 分組id, 如果是MQ模式將用到該值
      outerAdapters:
      - name: logger # 日志打印適配器
      - name: es7 # ES同步適配器
        hosts: 192.168.0.182:9200 # ES連接地址
        properties:
          mode: rest # 模式可選transport(9300) 或者 rest(9200)
          #security.auth: elastic:123456 #  連接es的用戶和密碼,僅rest模式使用
          cluster.name: elasticsearch # ES集群名稱, 與es目錄下 elasticsearch.yml文件cluster.name對應(yīng)

3.增加mysql同步es的映射文件

進(jìn)入/conf/es7目錄下,復(fù)制mytest_user.yml命名為test_book.yml,同時修改:

dataSourceKey: defaultDS # 源數(shù)據(jù)源的key, 對應(yīng)上面配置的srcDataSources中的值
destination: example  # canal的instance或者M(jìn)Q的topic
groupId: g1 # 對應(yīng)MQ模式下的groupId, 只會同步對應(yīng)groupId的數(shù)據(jù)
esMapping:
  _index: test_book # es 的索引名稱
  _id: _id  # es 的_id, 如果不配置該項必須配置下面的pk項_id則會由es自動分配
  sql: "SELECT
         tb.id AS _id,
         tb.title,
         tb.isbn,
         tb.author,
         tb.publisher_name as publisherName
        FROM
         test_book tb"        # sql映射
  etlCondition: "where p.id>={}"   #etl的條件參數(shù)
  commitBatch: 3000   # 提交批大小

4.啟動canal-adapter

啟動canal-adapter,進(jìn)入bin目錄,執(zhí)行啟動命令:

./startup.sh

日志如下即表示啟動成功

2023-04-02 13:22:52.337 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## syncSwitch refreshed.
2023-04-02 13:22:52.337 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## start the canal client adapters.
2023-04-02 13:22:52.338 [main] INFO  c.a.otter.canal.client.adapter.support.ExtensionLoader - extension classpath dir: /home/canal-1.1.5/canal.adapter-1.1.5/plugin
2023-04-02 13:22:52.372 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2023-04-02 13:22:52.679 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ... 
2023-04-02 13:22:52.751 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2023-04-02 13:22:52.951 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 succeed
2023-04-02 13:22:52.960 [main] INFO  c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /home/canal-1.1.5/canal.adapter-1.1.5/plugin
2023-04-02 13:22:52.982 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: cxstar_oa-g1 succeed
2023-04-02 13:22:52.983 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2023-04-02 13:22:52.983 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: cxstar_oa <=============
2023-04-02 13:22:52.990 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2023-04-02 13:22:52.991 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2023-04-02 13:22:53.013 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2023-04-02 13:22:53.028 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 5.259 seconds (JVM running for 6.378)
2023-04-02 13:22:53.081 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: cxstar_oa succeed <=============


5.canal-adapter啟動可能的報錯問題

1.com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

[main] ERROR com.alibaba.druid.pool.DruidDataSource - init datasource error, url: jdbc:mysql://127.0.0.1:3306/canal?useUnicode=true&useSSL=true
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet successfully received from the server was 216 milliseconds ago.  The last packet sent successfully to the server was 210 milliseconds ago.

解決方法:/conf/application.yml 中的mysql連接去除&useSSL=true

2.com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource

ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
	at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]

原因:druid 包沖突
解決方法:
方法1.下載源碼包 ,修改client-adapter/escore/pom.xml

<dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <scope>provided</scope>
 </dependency>

打包后將client-adapter/es7x/target/client-adapter.es7x-1.1.5-jar-with-dependencies.jar上傳到服務(wù)器,替換adataper/plugin下的同名jar文件

方法2.下載v1.1.5-alpha-2,
canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)
找到plugin目錄下的client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jarcanal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)
上傳到服務(wù)器 canal.adapter-1.1.5/plugin目錄下,同時刪除client-adapter.es7x-1.1.5-jar-with-dependencies.jar
canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)
3.Load canal adapter: es7 failed,Name or service not known

 ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.net.UnknownHostException: http: Name or service not known
	at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]

解決方案:/conf/application.yml配置中 ,hosts不要帶http://
4.java.lang.NullPointerException: esMapping._type

ERROR c.a.o.c.client.adapter.es.core.monitor.ESConfigMonitor - esMapping._type
java.lang.NullPointerException: esMapping._type
        at com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig.validate(ESSyncConfig.java:35) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
        at com.alibaba.otter.canal.client.adapter.es.core.monitor.ESConfigMonitor$FileListener.onFileChange(ESConfigMonitor.java:102) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
        at org.apache.commons.io.monitor.FileAlterationObserver.doMatch(FileAlterationObserver.java:400) [commons-io-2.4.jar:2.4]
        at org.apache.commons.io.monitor.FileAlterationObserver.checkAndNotify(FileAlterationObserver.java:334) [commons-io-2.4.jar:2.4]
        at org.apache.commons.io.monitor.FileAlterationObserver.checkAndNotify(FileAlterationObserver.java:304) [commons-io-2.4.jar:2.4]
        at org.apache.commons.io.monitor.FileAlterationMonitor.run(FileAlterationMonitor.java:182) [commons-io-2.4.jar:2.4]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_221]

解決方案:canal.adapter-1.1.5/conf/es7目錄下的yml中增加一個官方配置的屬性
canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)

hosts: 192.168.0.182:9200 # ES連接地址

驗證canal-adapter是否啟動成功
查看日志 canal.adapter-1.1.5/logs/adapter/adapter.log

[org.springframework.cloud.context.properties:name=configurationPropertiesRebinder,context=2b76ff4e,type=ConfigurationPropertiesRebinder]
2023-02-03 09:34:13.373 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## syncSwitch refreshed.
2023-02-03 09:34:13.374 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## start the canal client adapters.
2023-02-03 09:34:13.375 [main] INFO  c.a.otter.canal.client.adapter.support.ExtensionLoader - extension classpath dir: /home/canal/canal-test/canal.adapter-1.1.5/plugin
2023-02-03 09:34:13.418 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2023-02-03 09:34:13.643 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ... 
2023-02-03 09:34:13.726 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2023-02-03 09:34:13.995 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 succeed
2023-02-03 09:34:14.005 [main] INFO  c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /home/canal/canal-test/canal.adapter-1.1.5/plugin
2023-02-03 09:34:14.029 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed
2023-02-03 09:34:14.029 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2023-02-03 09:34:14.029 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2023-02-03 09:34:14.037 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2023-02-03 09:34:14.039 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2023-02-03 09:34:14.067 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2023-02-03 09:34:14.080 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 5.221 seconds (JVM running for 5.807)
2023-02-03 09:34:14.169 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============

同步測試

建立es索引和mysql表的映射

在客戶端目錄canal.adapter-1.1.5/conf/es7下配置字段的映射,adapter默認(rèn)會加載es路徑下的所有yml文件。一個配置文件表示一張表的mapping。
建立es和mysql的映射文件test_book.yml

dataSourceKey: defaultDS # 源數(shù)據(jù)源的key, 對應(yīng)上面配置的srcDataSources中的值
destination: example  # canal的instance或者M(jìn)Q的topic
groupId: g1 # 對應(yīng)MQ模式下的groupId, 只會同步對應(yīng)groupId的數(shù)據(jù)
esVersion: es7
esMapping:
  _index: test_book # es 的索引名稱
  _id: _id  # es 的_id, 如果不配置該項必須配置下面的pk項_id則會由es自動分配
  sql: "SELECT
         b.id AS _id, 
         b.title,
         b.author,
         b.isbn,
         b.publisher_name as publisherName
        FROM
         test_book b"        # sql映射
  etlCondition: "where p.id>={}"   #etl的條件參數(shù)
  commitBatch: 5000   # 提交批大小

插入mysql數(shù)據(jù)驗證同步

INSERT INTO `canal`.`test_book`( `title`, `isbn`, `author`, `publisher_name`) VALUES (  '三體', '98741254125', '劉慈欣', '工業(yè)出版社');

查看日志 canal.adapter-1.1.5/logs/adapter/adapter.log

2023-02-03 10:18:21.988 [pool-2-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":3,"title":"三體","isbn":"98741254125","author":"劉慈欣","publisher_name":"工業(yè)出版社"}],"database":"canal","destination":"example","es":1675390701000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test_book","ts":1675390701977,"type":"INSERT"}
2023-02-03 10:18:22.225 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":3,"title":"三體","isbn":"98741254125","author":"劉慈欣","publisher_name":"工業(yè)出版社"}],"database":"canal","destination":"example","es":1675390701000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test_book","ts":1675390701977,"type":"INSERT"} 
Affected indexes: test_book 

查看es數(shù)據(jù),已經(jīng)成功同步
canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)

數(shù)據(jù)同步

canal開啟前的數(shù)據(jù)如何同步

canal-adapter提供一個REST接口可全量同步數(shù)據(jù)到ES,調(diào)用Client-Adapter服務(wù)的方法觸發(fā)同步任務(wù)。此時,canal會先中止增量數(shù)據(jù)傳輸,然后同步全量數(shù)據(jù)。待全量數(shù)據(jù)同步完成后,canal會自動進(jìn)行增量數(shù)據(jù)同步。
注意:如果數(shù)據(jù)是binlog開啟前存在,則不可以使用此種方式

curl http://127.0.0.1:8081/etl/es7/test_book.yml -X POST

同步日志:

2023-02-03 10:41:35.043 [http-nio-8081-exec-1] INFO  c.a.otter.canal.client.adapter.es7x.etl.ESEtlService - start etl to import data to index: test_book
2023-02-03 10:41:35.130 [http-nio-8081-exec-1] INFO  c.a.otter.canal.client.adapter.es7x.etl.ESEtlService - 數(shù)據(jù)全量導(dǎo)入完成, 一共導(dǎo)入 3 條數(shù)據(jù), 耗時: 85

binlog未開啟前的歷史數(shù)據(jù)如何同步?

因為canal是基于binlog實現(xiàn)全量同步的,那么未開啟binlog之前的歷史數(shù)據(jù)就無法被同步,將數(shù)據(jù)庫中的數(shù)據(jù)導(dǎo)出再重新導(dǎo)入一遍,這樣就可以生成binlog

es數(shù)組類型同步

adapter配置文件中添加配置

  objFields:
    author: array:, #代表字段以,分割

配置更新后會監(jiān)聽到配置改變,無需重啟

2023-02-03 11:33:24.098 [Thread-3] INFO  c.a.o.c.client.adapter.es.core.monitor.ESConfigMonitor - Change a es mapping config: test_book.yml of canal adapter

更新數(shù)據(jù),author字段

UPDATE `canal`.`test_book` SET `title` = '三體', `isbn` = '98741254125', `author` = '劉慈欣,劉電工', `publisher_name` = '工業(yè)出版社' WHERE `id` = 1;

es中的數(shù)據(jù)已改變
canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)

多張表數(shù)據(jù)同步到一個索引中

yml映射文件中,主表一定要在最左側(cè),從表的數(shù)據(jù)改變也會自動同步到es中!
示例:journal_volume 表中的數(shù)據(jù)改變,也會自動同步到j(luò)ournal_paper 表對應(yīng)的es索引中

SELECT  
    jp.id AS _id, 
    jp.sid AS sid, 
    jp.import_id AS importId, 
    jp.journal_id AS journalId, 
    jp.journal_volume_id AS journalVolumeId, 
    jv.`year` as year,
    jv.volume as volume,
    jv.issue as issue,
    j.publisher_name as publisherName
FROM journal_paper jp 
left join journal_volume jv on jp.journal_volume_id=jv.id
left join journal j on j.id=jp.journal_id

可參考,待親自實現(xiàn)
https://blog.csdn.net/qq_24950043/article/details/122643889文章來源地址http://www.zghlxwxcb.cn/news/detail-401140.html

到了這里,關(guān)于canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Canal實時同步MySQL數(shù)據(jù)到ES

    Canal實時同步MySQL數(shù)據(jù)到ES

    canal主要用途是對MySQL數(shù)據(jù)庫增量日志進(jìn)行解析,提供增量數(shù)據(jù)的訂閱和消費,簡單說就是可以對MySQL的增量數(shù)據(jù)進(jìn)行實時同步,支持同步到MySQL、Elasticsearch、HBase等數(shù)據(jù)存儲中去。 早期阿里巴巴因為杭州和美國雙機(jī)房部署,存在跨機(jī)房同步的業(yè)務(wù)需求,實現(xiàn)方式主要是基于業(yè)

    2024年02月04日
    瀏覽(23)
  • canal實現(xiàn)mysql數(shù)據(jù)實時同步到es

    最近有一個需求:原有一些mysql數(shù)據(jù),這些數(shù)據(jù)量很大,且包含文本信息,需要對其進(jìn)行搜索,這時如果使用mysql的like來匹配,效率會很低,且很可能影響整個系統(tǒng)的運行,經(jīng)過和同事的討論,最終決定使用es來做搜索。 但是源數(shù)據(jù)有很多關(guān)聯(lián)關(guān)系,搜索的時候也會帶上這些

    2024年02月16日
    瀏覽(87)
  • docker安裝canal1.1.5監(jiān)控mysql的binlog日志并配置rocketmq進(jìn)行數(shù)據(jù)同步到elasticsearch(超級大干貨)

    docker安裝canal1.1.5監(jiān)控mysql的binlog日志并配置rocketmq進(jìn)行數(shù)據(jù)同步到elasticsearch(超級大干貨)

    1、直接拉取canal鏡像 2、創(chuàng)建canal文件夾,用來存在容器掛載到宿主機(jī)的目錄或文件(注:本實例在/home下操作) 3、先啟動canal容器,把需要掛載的目錄都copy出來,本例子只掛載了conf和logs目錄(自己還想掛載啥東西就進(jìn)去容器里面看看唄,docker exec -it canal /bin/bash) ??4、第

    2024年02月07日
    瀏覽(21)
  • MySQL如何實時同步數(shù)據(jù)到ES?試試阿里開源的Canal

    MySQL如何實時同步數(shù)據(jù)到ES?試試阿里開源的Canal

    前幾天在網(wǎng)上沖浪的時候發(fā)現(xiàn)了一個比較成熟的開源中間件——? Canal? 。在了解了它的工作原理和使用場景后,頓時產(chǎn)生了濃厚的興趣。今天,就讓我們跟隨我的腳步,一起來揭開它神秘的面紗吧。 目錄 前言 簡介? 工作原理? MySQL主備復(fù)制原理 canal 工作原理 Canal架構(gòu)? C

    2024年02月20日
    瀏覽(25)
  • 基于Canal同步MySQL數(shù)據(jù)到Elasticsearch

    基于Canal同步MySQL數(shù)據(jù)到Elasticsearch

    基于 canal 同步 mysql 的數(shù)據(jù)到 elasticsearch 中。 相關(guān)軟件的安裝請參考:《Canal實現(xiàn)數(shù)據(jù)同步》 1.1 pom依賴 1.2 SimpleCanalClientExample編寫 注意當(dāng)后面 canal-adapter 也連接上 canal-server 后,程序就監(jiān)聽不到數(shù)據(jù)變化了。 這個類只是測試,下面不使用。 由于目前 canal-adapter 沒有官方dock

    2024年02月07日
    瀏覽(87)
  • docker環(huán)境安裝mysql、canal、elasticsearch,基于binlog利用canal實現(xiàn)mysql的數(shù)據(jù)同步到elasticsearch中

    docker環(huán)境安裝mysql、canal、elasticsearch,基于binlog利用canal實現(xiàn)mysql的數(shù)據(jù)同步到elasticsearch中

    ?? 本文提供的指令完全可以按順序逐一執(zhí)行,已進(jìn)行了多次測試。因此如果你是直接按照我本文寫的指令一條條執(zhí)行的,而非自定義修改過,執(zhí)行應(yīng)當(dāng)是沒有任何問題的。 ?? 本文講述:使用docker環(huán)境安裝mysql、canal、elasticsearch,基于binlog利用canal實現(xiàn)mysql的數(shù)據(jù)同步到elas

    2024年02月02日
    瀏覽(94)
  • docker安裝canal入門實戰(zhàn),同步mysql數(shù)據(jù)到elasticsearch

    docker安裝canal入門實戰(zhàn),同步mysql數(shù)據(jù)到elasticsearch

    官方docker安裝說明文檔:https://github.com/alibaba/canal/wiki/Docker-QuickStart canal.adapter canal 1.1.1版本之后, 增加客戶端數(shù)據(jù)落地的適配及啟動功能, 目前支持功能: 客戶端啟動器 同步管理REST接口 日志適配器, 作為DEMO 關(guān)系型數(shù)據(jù)庫的數(shù)據(jù)同步(表對表同步), ETL功能 HBase的數(shù)據(jù)同步(表對表

    2024年02月04日
    瀏覽(19)
  • Springcloud Alibaba使用Canal將Mysql數(shù)據(jù)實時同步到Redis保證緩存的一致性

    Springcloud Alibaba使用Canal將Mysql數(shù)據(jù)實時同步到Redis保證緩存的一致性

    目錄 ? 1. 背景 2. Windows系統(tǒng)安裝canal 3.Mysql準(zhǔn)備工作 4. 公共依賴包 5. Redis緩存設(shè)計 6. mall-canal-service ? canal [k?\\\'n?l] ,譯意為水道/管道/溝渠,主要用途是 基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費 。其誕生的背景是早期阿里巴巴因為杭州和美國雙機(jī)房部署,存

    2024年02月03日
    瀏覽(20)
  • Canal —— 一款 MySql 實時同步到 ES 的阿里開源神器

    Canal —— 一款 MySql 實時同步到 ES 的阿里開源神器

    目錄 一. 前言 二. Canal 簡介和使用場景 2.1. Canal 簡介 2.2. Canal 使用場景 三. Canal Server 設(shè)計 3.1. 整體設(shè)計 3.2. EventParser 設(shè)計 3.3.?CanalLogPositionManager 設(shè)計 3.4.?CanalHAController 類圖設(shè)計 3.5.?EventSink 類圖設(shè)計和擴(kuò)展 3.6.?EventStore 類圖設(shè)計和擴(kuò)展 3.7.?MetaManager 類圖設(shè)計和擴(kuò)展 四. Can

    2024年01月25日
    瀏覽(26)
  • 實時同步ES技術(shù)選型:Mysql+Canal+Adapter+ES+Kibana

    實時同步ES技術(shù)選型:Mysql+Canal+Adapter+ES+Kibana

    基于之前的文章,精簡操作而來 讓ELK在同一個docker網(wǎng)絡(luò)下通過名字直接訪問 Ubuntu服務(wù)器ELK部署與實踐 使用 Docker 部署 canal 服務(wù)實現(xiàn)MySQL和ES實時同步 Docker部署ES服務(wù),canal全量同步的時候內(nèi)存爆炸,ES/Canal Adapter自動關(guān)閉,CPU100% 2.1 新建mysql docker 首先新建數(shù)據(jù)庫的docker鏡像

    2024年02月11日
    瀏覽(28)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包