国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Canal同步Mysql實時操作日志至RabbitMQ,并實現(xiàn)監(jiān)聽及解析處理

這篇具有很好參考價值的文章主要介紹了Canal同步Mysql實時操作日志至RabbitMQ,并實現(xiàn)監(jiān)聽及解析處理。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

前言

關于Canal的介紹及原理不在此贅述,可自行查閱。筆者在使用Canal同步Mysql實時操作記錄至RabbitMQ的過程中,也翻閱了一些大牛們的文章,可能是我使用的Canal版本與文中版本不一致,出現(xiàn)了一些問題,在此總結記錄一下可行的方案。
注:本文使用的Canal為 v1.1.7

一、Mysql數(shù)據(jù)庫開啟bin_log

  • 先查看目標數(shù)據(jù)庫是否開啟bin_log
SHOW VARIABLES LIKE 'log_bin'

如結果中,log_bin的值為OFF則未開啟,為ON則已開啟。
Canal同步Mysql實時操作日志至RabbitMQ,并實現(xiàn)監(jiān)聽及解析處理,mysql,rabbitmq,數(shù)據(jù)庫

  • 如未開啟,可編輯Mysql配置文件:/etc/my.cnf
[mysqld]
log-bin=mysql-bin # 開啟binlog
binlog-format=ROW # 選擇ROW模式
server_id=1 # 配置MySQL replaction需要定義,不和Canal的slaveId重復即可

重啟MySQL ,再次通過上一步查看配置是否生效。

二、數(shù)據(jù)庫創(chuàng)建新用戶

  • 創(chuàng)建專用于數(shù)據(jù)同步的新用戶
-- 創(chuàng)建一個新用戶,名稱可自行定義
create user canal@'%' IDENTIFIED by 'canal';
-- 為新用戶授權
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
-- 刷新緩存中的用戶數(shù)據(jù)
FLUSH PRIVILEGES;

三、配置RabhitMQ

以下使用的名稱均可自行定義,保證唯一即可

1. 添加交換機

Canal同步Mysql實時操作日志至RabbitMQ,并實現(xiàn)監(jiān)聽及解析處理,mysql,rabbitmq,數(shù)據(jù)庫

2. 添加隊列

Canal同步Mysql實時操作日志至RabbitMQ,并實現(xiàn)監(jiān)聽及解析處理,mysql,rabbitmq,數(shù)據(jù)庫

3. 綁定交換機與隊列,設置 Routing key

Canal同步Mysql實時操作日志至RabbitMQ,并實現(xiàn)監(jiān)聽及解析處理,mysql,rabbitmq,數(shù)據(jù)庫

四、下載、配置、運行Canal(windows環(huán)境)

1. 下載服務端

  • 可到以下地址下載所需版本的包:github-alibaba-canal
    本文使用較新的 v1.1.7 。
    Canal同步Mysql實時操作日志至RabbitMQ,并實現(xiàn)監(jiān)聽及解析處理,mysql,rabbitmq,數(shù)據(jù)庫

  • 選擇下載 canal.deployer-1.1.7.tar.gz。
    Canal同步Mysql實時操作日志至RabbitMQ,并實現(xiàn)監(jiān)聽及解析處理,mysql,rabbitmq,數(shù)據(jù)庫

2. 配置

  • 解壓下載包,獲得如下文件。
    Canal同步Mysql實時操作日志至RabbitMQ,并實現(xiàn)監(jiān)聽及解析處理,mysql,rabbitmq,數(shù)據(jù)庫
  • 編輯:conf\canal.properties(僅列出需要修改的配置項)
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rabbitMQ
##################################################
######### 		    RabbitMQ	     #############
##################################################
# host 無需添加端口號
rabbitmq.host = 192.168.0.2
# 填寫 / 即可
rabbitmq.virtual.host = /
# RabbitMQ的用戶名、密碼
rabbitmq.username = admin
rabbitmq.password = 123456
# 上文配置的交換機(exchange)名稱:Name
rabbitmq.exchange = canal.exchange
# 交換機類型:Type
rabbitmq.deliveryMode = direct

# 以下兩個字段為自行添加,否則會報空指針異常
# 隊列(queue)名稱:Name
rabbitmq.queue = canal.queue
# 綁定隊列-交換機時的路由秘鑰:Routing key
rabbitmq.routingKey = canal.routing.key
  • 編輯:conf\example\instance.properties(僅列出需要修改的配置項)
# 目標數(shù)據(jù)庫地址
canal.instance.master.address=192.168.0.1:3306
# 目標數(shù)據(jù)庫用戶名密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123

# 表過濾正則表達式(按需修改)
# 全庫全表 : .*\\..*
# 指定庫所有表:  庫名\..*   例:test\..*
# 單表:  庫名.表名  例:test.user
# 多規(guī)則組合使用:  庫名1\..*,庫名2.表名1,庫名3.表名2 (逗號分隔)  例 test\..*,test2.user1,test3.user2 (逗號分隔)
canal.instance.filter.regex=.*\\..*
# canal.instance.filter.regex=project.sys_user,project.sys_role

3. 運行

windows環(huán)境下直接運行bin\startup.bat,linux環(huán)境下執(zhí)行bin\startup.sh。
執(zhí)行啟動腳本后,查看日志信息logs\canal\canal.log,出現(xiàn)如下信息,表示啟動成功。

[main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

五、測試

對監(jiān)聽的數(shù)據(jù)庫表做修改操作,至RabbitMQ控制臺的隊列中查看是否插入消息。
如下,即成功插入實時操作數(shù)據(jù)。
Canal同步Mysql實時操作日志至RabbitMQ,并實現(xiàn)監(jiān)聽及解析處理,mysql,rabbitmq,數(shù)據(jù)庫

六、項目中監(jiān)聽處理

  • 創(chuàng)建一個maven項目

Canal同步Mysql實時操作日志至RabbitMQ,并實現(xiàn)監(jiān)聽及解析處理,mysql,rabbitmq,數(shù)據(jù)庫

  • pom.xml中引入spring-boot-starter-amqp依賴,此包集成了對RabbitMQ的支持。
	<!-- RabbitMQ 集成支持 -->
	<dependency>
	    <groupId>org.springframework.boot</groupId>
	    <artifactId>spring-boot-starter-amqp</artifactId>
	</dependency>
	
	<!-- fastjson 解析數(shù)據(jù) -->
	<dependency>
	    <groupId>com.alibaba</groupId>
	    <artifactId>fastjson</artifactId>
	    <version>2.0.9.graal</version>
	</dependency>
  • 修改配置文件application.yml(此處已按個人偏好,文件類型改為yaml),配置RabbitMQ。
spring:
  rabbitmq:
    host: 192.168.0.2
    port: 5672
    username: admin
    password: 123456
  • binLog數(shù)據(jù)實體類BinLogEntity
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Data
public class BinLogEntity {
    /**
     * 數(shù)據(jù)庫
     */
    private String database;

    /**
     * 表
     */
    private String table;

    /**
     * 操作類型
     */
    private String type;

    /**
     * 操作數(shù)據(jù)
     */
    private JSONArray data;

    /**
     * 變更前數(shù)據(jù)
     */
    private JSONArray old;

    /**
     * 主鍵名稱
     */
    private JSONArray pkNames;

    /**
     * 執(zhí)行sql語句
     */
    private String sql;
    
    private Long es;
    private String gtid;
    private Long id;
    private Boolean isDdl;
    private JSONObject mysqlType;
    private JSONObject sqlType;
    private Long ts;

    public <T> List<T> getData(Class<T> clazz) {
        if (this.data == null || this.data.size() == 0) {
            return null;
        }
        return this.data.toJavaList(clazz);
    }

    public <T> List<T> getOld(Class<T> clazz) {
        if (this.old == null || this.old.size() == 0) {
            return null;
        }
        return this.old.toJavaList(clazz);
    }

    public List<String> getPkNames() {
        if (this.pkNames == null || this.pkNames.size() == 0) {
            return null;
        }
        List<String> pkNames = new ArrayList<>();
        for (Object pkName : this.pkNames){
            pkNames.add(pkName.toString());
        }
        return pkNames;
    }

    public Map<String, String> getMysqlType() {
        if(this.mysqlType == null){
            return null;
        }
        Map<String, String> mysqlTypeMap = new HashMap<>();
        this.mysqlType.forEach((k, v) -> {
            mysqlTypeMap.put(k, v.toString());
        });
        return mysqlTypeMap;
    }

    public Map<String, Integer> getSqlType() {
        if(this.sqlType == null){
            return null;
        }
        Map<String, Integer> sqlTypeMap = new HashMap<>();
        this.sqlType.forEach((k, v) -> {
            sqlTypeMap.put(k, Integer.valueOf(v.toString()));
        });
        return sqlTypeMap;
    }
}
  • 操作數(shù)據(jù)實體類
@Data
public class User implements Serializable {
	private static final long serialVersionUID = 1L;

	/**
	 * ID
	 */
	private Long id;

	/**
	 * 姓名
	 */
	private String name;

	/**
	 * 年齡
	 */
	private Integer age;

	/**
	 * 電話
	 */
	private String phone;

}
  • 監(jiān)聽類CanalListener
import com.alibaba.fastjson.JSON;
import com.example.canalclient.entity.BinLogEntity;
import com.example.canalclient.entity.User;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * 監(jiān)聽數(shù)據(jù)庫數(shù)據(jù)變化時RabbitMQ發(fā)送的信息
 */
@Component
public class CanalListener {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "canal.queue", durable = "true"),
                    exchange = @Exchange(value = "canal.exchange"),
                    key = "canal.routing.key"
            )
    })
	public void handleDataChange(@Payload Message message) {
        // 獲取消息內容
        String content = new String(message.getBody(), StandardCharsets.UTF_8);
        // 反序列化
        BinLogEntity binLog = JSON.parseObject(content, BinLogEntity.class);
        // 獲取操作數(shù)據(jù)
        User user = binLog.getData(User.class).get(0);
        User oldUser = binLog.getOld(User.class).get(0);

        System.out.println("數(shù)據(jù)庫:" + binLog.getDatabase());
        System.out.println("表:" + binLog.getTable());
        System.out.println("操作類型:" + binLog.getType());
        System.out.println("主鍵:" + JSON.toJSONString(binLog.getPkNames()));
        System.out.println("數(shù)據(jù):" + JSON.toJSONString(User));
        System.out.println("原數(shù)據(jù):" + JSON.toJSONString(User));
        System.out.println("MysqlType:" + JSON.toJSONString(binLog.getMysqlType()));
    }
}
  • 打印結果(修改操作)
數(shù)據(jù)庫:project
表:sys_user
操作類型:UPDATE
主鍵:["id"]
數(shù)據(jù):{
	"id": 1,
	"name": "張三",
	"age": 21,
	"phone": 13333333333
}
原數(shù)據(jù):{
	"age": 20,
	"phone": 12222222222
}
MysqlType:{
	"id": "bigint unsigned",
	"name": "varchar(50)",
	"age": "int(3) unsigned",
	"phone": "varchar(50)"
}

至此,已實現(xiàn)對目標數(shù)據(jù)庫實時操作數(shù)據(jù)進行監(jiān)聽,可根據(jù)不同的操作類型,采取相應的業(yè)務處理。

七、參考文章

Canal+Msql+RabbitMq數(shù)據(jù)庫同步配置,看這一篇就夠了

使用canal同步mysql數(shù)據(jù)庫信息到RabbitMQ

Canal配置connector.subscribe和canal.instance.filter.regex遇到的坑文章來源地址http://www.zghlxwxcb.cn/news/detail-846547.html

到了這里,關于Canal同步Mysql實時操作日志至RabbitMQ,并實現(xiàn)監(jiān)聽及解析處理的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯(lián)網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Canal —— 一款 MySql 實時同步到 ES 的阿里開源神器

    Canal —— 一款 MySql 實時同步到 ES 的阿里開源神器

    目錄 一. 前言 二. Canal 簡介和使用場景 2.1. Canal 簡介 2.2. Canal 使用場景 三. Canal Server 設計 3.1. 整體設計 3.2. EventParser 設計 3.3.?CanalLogPositionManager 設計 3.4.?CanalHAController 類圖設計 3.5.?EventSink 類圖設計和擴展 3.6.?EventStore 類圖設計和擴展 3.7.?MetaManager 類圖設計和擴展 四. Can

    2024年01月25日
    瀏覽(26)
  • canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)

    canal實時同步mysql數(shù)據(jù)到elasticsearch(部署,配置,測試)(一)

    canal基于MySQL數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費,是阿里開源CDC工具,它可以獲取MySQL binlog數(shù)據(jù)并解析,然后將數(shù)據(jù)變動傳輸給下游?;赾anal,可以實現(xiàn)從MySQL到其他數(shù)據(jù)庫的實時同步 MySQL主備復制原理 MySQL master 將數(shù)據(jù)變更寫入二進制日志( binary log, 其中記錄叫

    2023年04月08日
    瀏覽(25)
  • MySQL如何實時同步數(shù)據(jù)到ES?試試阿里開源的Canal

    MySQL如何實時同步數(shù)據(jù)到ES?試試阿里開源的Canal

    前幾天在網上沖浪的時候發(fā)現(xiàn)了一個比較成熟的開源中間件——? Canal? 。在了解了它的工作原理和使用場景后,頓時產生了濃厚的興趣。今天,就讓我們跟隨我的腳步,一起來揭開它神秘的面紗吧。 目錄 前言 簡介? 工作原理? MySQL主備復制原理 canal 工作原理 Canal架構? C

    2024年02月20日
    瀏覽(25)
  • SpringCloud 整合 Canal+RabbitMQ+Redis 實現(xiàn)數(shù)據(jù)監(jiān)聽

    SpringCloud 整合 Canal+RabbitMQ+Redis 實現(xiàn)數(shù)據(jù)監(jiān)聽

    Canal 指的是阿里巴巴開源的數(shù)據(jù)同步工具,用于數(shù)據(jù)庫的實時增量數(shù)據(jù)訂閱和消費。它可以針對 MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的異構數(shù)據(jù)同步等情況進行實時增量數(shù)據(jù)同步。 當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x Canal是如何同步數(shù)據(jù)庫

    2024年02月03日
    瀏覽(19)
  • Springcloud Alibaba 使用Canal將MySql數(shù)據(jù)實時同步到Elasticsearch

    本篇文章在Springcloud Alibaba使用Canal將Mysql數(shù)據(jù)實時同步到Redis保證緩存的一致性-CSDN博客 基礎上使用canal將mysql數(shù)據(jù)實時同步到Elasticsearch。 公共包 實體類Sku @Column注解 用來標識實體類中屬性與數(shù)據(jù)表中字段的對應關系 name 定義了被標注字段在數(shù)據(jù)庫表中所對應字段的名稱;由

    2024年02月03日
    瀏覽(23)
  • 實時同步ES技術選型:Mysql+Canal+Adapter+ES+Kibana

    實時同步ES技術選型:Mysql+Canal+Adapter+ES+Kibana

    基于之前的文章,精簡操作而來 讓ELK在同一個docker網絡下通過名字直接訪問 Ubuntu服務器ELK部署與實踐 使用 Docker 部署 canal 服務實現(xiàn)MySQL和ES實時同步 Docker部署ES服務,canal全量同步的時候內存爆炸,ES/Canal Adapter自動關閉,CPU100% 2.1 新建mysql docker 首先新建數(shù)據(jù)庫的docker鏡像

    2024年02月11日
    瀏覽(28)
  • 基于Canal與Flink實現(xiàn)數(shù)據(jù)實時增量同步(一)

    基于Canal與Flink實現(xiàn)數(shù)據(jù)實時增量同步(一)

    vi conf/application.yml server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: kms-1:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql:// s p r i n g . d a t a s o u r c e . a d d r e s s / {spring.datasource.address}/ s p r in g . d

    2024年04月13日
    瀏覽(98)
  • Springcloud Alibaba使用Canal將Mysql數(shù)據(jù)實時同步到Redis保證緩存的一致性

    Springcloud Alibaba使用Canal將Mysql數(shù)據(jù)實時同步到Redis保證緩存的一致性

    目錄 ? 1. 背景 2. Windows系統(tǒng)安裝canal 3.Mysql準備工作 4. 公共依賴包 5. Redis緩存設計 6. mall-canal-service ? canal [k?\\\'n?l] ,譯意為水道/管道/溝渠,主要用途是 基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費 。其誕生的背景是早期阿里巴巴因為杭州和美國雙機房部署,存

    2024年02月03日
    瀏覽(20)
  • 基于Canal與Flink實現(xiàn)數(shù)據(jù)實時增量同步(一),計算機畢設源碼要提交嗎

    基于Canal與Flink實現(xiàn)數(shù)據(jù)實時增量同步(一),計算機畢設源碼要提交嗎

    配置修改 修改conf/example/instance.properties,修改內容如下: canal.instance.mysql.slaveId = 1234 #position info,需要改成自己的數(shù)據(jù)庫信息 canal.instance.master.address = kms-1.apache.com:3306 #username/password,需要改成自己的數(shù)據(jù)庫信息 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.mq.topic

    2024年04月12日
    瀏覽(38)
  • Canal+Kafka實現(xiàn)Mysql數(shù)據(jù)同步

    Canal+Kafka實現(xiàn)Mysql數(shù)據(jù)同步

    canal [k?\\\'n?l] ,譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費 canal可以用來監(jiān)控數(shù)據(jù)庫數(shù)據(jù)的變化,從而獲得新增數(shù)據(jù),或者修改的數(shù)據(jù)。 canal是應阿里巴巴存在杭州和美國的雙機房部署,存在跨機房同步的業(yè)務需求而提出的。

    2024年02月12日
    瀏覽(89)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包