前言
關于Canal的介紹及原理不在此贅述,可自行查閱。筆者在使用Canal同步Mysql實時操作記錄至RabbitMQ的過程中,也翻閱了一些大牛們的文章,可能是我使用的Canal版本與文中版本不一致,出現(xiàn)了一些問題,在此總結記錄一下可行的方案。
注:本文使用的Canal為 v1.1.7
一、Mysql數(shù)據(jù)庫開啟bin_log
- 先查看目標數(shù)據(jù)庫是否開啟
bin_log
SHOW VARIABLES LIKE 'log_bin'
如結果中,log_bin的值為OFF則未開啟,為ON則已開啟。
- 如未開啟,可編輯Mysql配置文件:
/etc/my.cnf
。
[mysqld]
log-bin=mysql-bin # 開啟binlog
binlog-format=ROW # 選擇ROW模式
server_id=1 # 配置MySQL replaction需要定義,不和Canal的slaveId重復即可
重啟MySQL ,再次通過上一步查看配置是否生效。
二、數(shù)據(jù)庫創(chuàng)建新用戶
- 創(chuàng)建專用于數(shù)據(jù)同步的新用戶
-- 創(chuàng)建一個新用戶,名稱可自行定義
create user canal@'%' IDENTIFIED by 'canal';
-- 為新用戶授權
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
-- 刷新緩存中的用戶數(shù)據(jù)
FLUSH PRIVILEGES;
三、配置RabhitMQ
以下使用的名稱均可自行定義,保證唯一即可
1. 添加交換機
2. 添加隊列
3. 綁定交換機與隊列,設置 Routing key
四、下載、配置、運行Canal(windows環(huán)境)
1. 下載服務端
-
可到以下地址下載所需版本的包:github-alibaba-canal
本文使用較新的v1.1.7
。 -
選擇下載
canal.deployer-1.1.7.tar.gz
。
2. 配置
- 解壓下載包,獲得如下文件。
- 編輯:
conf\canal.properties
(僅列出需要修改的配置項)
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rabbitMQ
##################################################
######### RabbitMQ #############
##################################################
# host 無需添加端口號
rabbitmq.host = 192.168.0.2
# 填寫 / 即可
rabbitmq.virtual.host = /
# RabbitMQ的用戶名、密碼
rabbitmq.username = admin
rabbitmq.password = 123456
# 上文配置的交換機(exchange)名稱:Name
rabbitmq.exchange = canal.exchange
# 交換機類型:Type
rabbitmq.deliveryMode = direct
# 以下兩個字段為自行添加,否則會報空指針異常
# 隊列(queue)名稱:Name
rabbitmq.queue = canal.queue
# 綁定隊列-交換機時的路由秘鑰:Routing key
rabbitmq.routingKey = canal.routing.key
- 編輯:
conf\example\instance.properties
(僅列出需要修改的配置項)
# 目標數(shù)據(jù)庫地址
canal.instance.master.address=192.168.0.1:3306
# 目標數(shù)據(jù)庫用戶名密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123
# 表過濾正則表達式(按需修改)
# 全庫全表 : .*\\..*
# 指定庫所有表: 庫名\..* 例:test\..*
# 單表: 庫名.表名 例:test.user
# 多規(guī)則組合使用: 庫名1\..*,庫名2.表名1,庫名3.表名2 (逗號分隔) 例 test\..*,test2.user1,test3.user2 (逗號分隔)
canal.instance.filter.regex=.*\\..*
# canal.instance.filter.regex=project.sys_user,project.sys_role
3. 運行
windows環(huán)境下直接運行bin\startup.bat
,linux環(huán)境下執(zhí)行bin\startup.sh
。
執(zhí)行啟動腳本后,查看日志信息logs\canal\canal.log
,出現(xiàn)如下信息,表示啟動成功。
[main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
五、測試
對監(jiān)聽的數(shù)據(jù)庫表做修改操作,至RabbitMQ控制臺的隊列中查看是否插入消息。
如下,即成功插入實時操作數(shù)據(jù)。
六、項目中監(jiān)聽處理
- 創(chuàng)建一個maven項目
- 在
pom.xml
中引入spring-boot-starter-amqp
依賴,此包集成了對RabbitMQ的支持。
<!-- RabbitMQ 集成支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- fastjson 解析數(shù)據(jù) -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.9.graal</version>
</dependency>
- 修改配置文件
application.yml
(此處已按個人偏好,文件類型改為yaml),配置RabbitMQ。
spring:
rabbitmq:
host: 192.168.0.2
port: 5672
username: admin
password: 123456
- binLog數(shù)據(jù)實體類
BinLogEntity
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Data
public class BinLogEntity {
/**
* 數(shù)據(jù)庫
*/
private String database;
/**
* 表
*/
private String table;
/**
* 操作類型
*/
private String type;
/**
* 操作數(shù)據(jù)
*/
private JSONArray data;
/**
* 變更前數(shù)據(jù)
*/
private JSONArray old;
/**
* 主鍵名稱
*/
private JSONArray pkNames;
/**
* 執(zhí)行sql語句
*/
private String sql;
private Long es;
private String gtid;
private Long id;
private Boolean isDdl;
private JSONObject mysqlType;
private JSONObject sqlType;
private Long ts;
public <T> List<T> getData(Class<T> clazz) {
if (this.data == null || this.data.size() == 0) {
return null;
}
return this.data.toJavaList(clazz);
}
public <T> List<T> getOld(Class<T> clazz) {
if (this.old == null || this.old.size() == 0) {
return null;
}
return this.old.toJavaList(clazz);
}
public List<String> getPkNames() {
if (this.pkNames == null || this.pkNames.size() == 0) {
return null;
}
List<String> pkNames = new ArrayList<>();
for (Object pkName : this.pkNames){
pkNames.add(pkName.toString());
}
return pkNames;
}
public Map<String, String> getMysqlType() {
if(this.mysqlType == null){
return null;
}
Map<String, String> mysqlTypeMap = new HashMap<>();
this.mysqlType.forEach((k, v) -> {
mysqlTypeMap.put(k, v.toString());
});
return mysqlTypeMap;
}
public Map<String, Integer> getSqlType() {
if(this.sqlType == null){
return null;
}
Map<String, Integer> sqlTypeMap = new HashMap<>();
this.sqlType.forEach((k, v) -> {
sqlTypeMap.put(k, Integer.valueOf(v.toString()));
});
return sqlTypeMap;
}
}
- 操作數(shù)據(jù)實體類
@Data
public class User implements Serializable {
private static final long serialVersionUID = 1L;
/**
* ID
*/
private Long id;
/**
* 姓名
*/
private String name;
/**
* 年齡
*/
private Integer age;
/**
* 電話
*/
private String phone;
}
- 監(jiān)聽類
CanalListener
import com.alibaba.fastjson.JSON;
import com.example.canalclient.entity.BinLogEntity;
import com.example.canalclient.entity.User;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
/**
* 監(jiān)聽數(shù)據(jù)庫數(shù)據(jù)變化時RabbitMQ發(fā)送的信息
*/
@Component
public class CanalListener {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "canal.queue", durable = "true"),
exchange = @Exchange(value = "canal.exchange"),
key = "canal.routing.key"
)
})
public void handleDataChange(@Payload Message message) {
// 獲取消息內容
String content = new String(message.getBody(), StandardCharsets.UTF_8);
// 反序列化
BinLogEntity binLog = JSON.parseObject(content, BinLogEntity.class);
// 獲取操作數(shù)據(jù)
User user = binLog.getData(User.class).get(0);
User oldUser = binLog.getOld(User.class).get(0);
System.out.println("數(shù)據(jù)庫:" + binLog.getDatabase());
System.out.println("表:" + binLog.getTable());
System.out.println("操作類型:" + binLog.getType());
System.out.println("主鍵:" + JSON.toJSONString(binLog.getPkNames()));
System.out.println("數(shù)據(jù):" + JSON.toJSONString(User));
System.out.println("原數(shù)據(jù):" + JSON.toJSONString(User));
System.out.println("MysqlType:" + JSON.toJSONString(binLog.getMysqlType()));
}
}
- 打印結果(修改操作)
數(shù)據(jù)庫:project
表:sys_user
操作類型:UPDATE
主鍵:["id"]
數(shù)據(jù):{
"id": 1,
"name": "張三",
"age": 21,
"phone": 13333333333
}
原數(shù)據(jù):{
"age": 20,
"phone": 12222222222
}
MysqlType:{
"id": "bigint unsigned",
"name": "varchar(50)",
"age": "int(3) unsigned",
"phone": "varchar(50)"
}
至此,已實現(xiàn)對目標數(shù)據(jù)庫實時操作數(shù)據(jù)進行監(jiān)聽,可根據(jù)不同的操作類型,采取相應的業(yè)務處理。
七、參考文章
Canal+Msql+RabbitMq數(shù)據(jù)庫同步配置,看這一篇就夠了
使用canal同步mysql數(shù)據(jù)庫信息到RabbitMQ文章來源:http://www.zghlxwxcb.cn/news/detail-846547.html
Canal配置connector.subscribe和canal.instance.filter.regex遇到的坑文章來源地址http://www.zghlxwxcb.cn/news/detail-846547.html
到了這里,關于Canal同步Mysql實時操作日志至RabbitMQ,并實現(xiàn)監(jiān)聽及解析處理的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!