問題概述
canal
https://github.com/alibaba/canal
功能
- 數(shù)據(jù)庫鏡像
- 數(shù)據(jù)庫實時備份
- 索引構建和實時維護(拆分異構索引、倒排索引等)
- 業(yè)務 cache 刷新
- 帶業(yè)務邏輯的增量數(shù)據(jù)處理
傳統(tǒng)mysql主從復制原理
MySQL的主從復制將經(jīng)過如下步驟: - 當 master 主服務器上的數(shù)據(jù)發(fā)生改變時,則將其改變寫入二進制事件日志文件中;
- salve 從服務器會在一定時間間隔內對 master 主服務器上的二進制日志進行探測,探測其是否發(fā)生過改變,
如果探測到 master 主服務器的二進制事件日志發(fā)生了改變,則開始一個 I/O Thread 請求 master 二進制事件日志; - 同時 master 主服務器為每個 I/O Thread 啟動一個dump Thread,用于向其發(fā)送二進制事件日志;
- slave 從服務器將接收到的二進制事件日志保存至自己本地的中繼日志文件中;
Canal原理’
安裝部署
https://github.com/alibaba/canal/releases/tag/canal-1.1.6
mysql配置
# 查看mysql版本
SELECT VERSION();
# 查看是否開啟bin_log
SHOW VARIABLES LIKE 'log_bin';
mysql中my.ini配置
linux為my.cnf
# Linux 尋找my.cnf命令
find / -name my.cnf
# my.ini
log-bin=mysql-bin #開啟 binlog
binlog-format=ROW #選擇 ROW 模式
server_id=1 #配置MySQL replaction需要定義,不要和canal的 slaveId重復
- ROW模式 除了記錄sql語句之外,還會記錄每個字段的變化情況,能夠清楚的記錄每行數(shù)據(jù)的變化歷史,但會占用較多的空間。
- STATEMENT模式只記錄了sql語句,但是沒有記錄上下文信息,在進行數(shù)據(jù)恢復的時候可能會導致數(shù)據(jù)的丟失情況;
- MIX模式比較靈活的記錄,理論上說當遇到了表結構變更的時候,就會記錄為statement模式。當遇到了數(shù)據(jù)更新或者刪除情況下就會變?yōu)閞ow模式;
重啟mysql
授權canal連接mysql賬號
# 查詢現(xiàn)有賬戶
SELECT * FROM mysql.`user`
新建canal用戶
# 如果存在名為 'canal' 的用戶,且允許從任何主機 '%' 登錄,則刪除該用戶。
DROP USER IF EXISTS 'canal'@'%';
# 創(chuàng)建一個名為 'canal' 的用戶,允許從任何主機 '%' 登錄,密碼為 'canal'。
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
# 賦予 'canal' 用戶在所有數(shù)據(jù)庫中的所有表的全部權限,并使用密碼 'canal'。
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
# 刷新 MySQL 的權限表,使修改后的權限立即生效。
FLUSH PRIVILEGES;
SELECT * FROM mysql.user;
canal服務端
下載
https://github.com/alibaba/canal/releases/tag/canal-1.1.6
解壓.tar.gz配置
修改conf/example/instance.properties
配置mysql主機ip
配置mysql賬號密碼
啟動
啟動腳本bin/startup.sh
前提:安裝好java8環(huán)境
./start.sh
查看日志
server日志:logs/canal.log
樣例日志:logs/example.log
canal客戶端(Java程序)
# 選個數(shù)據(jù)庫,以你自己為主,本例bigdata,按照下面建表
CREATE TABLE `t_user` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`userName` varchar(100) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4
pom引入canal
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu.canal</groupId>
<artifactId>canal_demo02</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.14</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
<log4j.version>1.2.17</log4j.version>
<lombok.version>1.16.18</lombok.version>
<mysql.version>5.1.47</mysql.version>
<druid.version>1.1.16</druid.version>
<mapper.version>4.1.5</mapper.version>
<mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version>
</properties>
<dependencies>
<!--canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
<!--SpringBoot通用依賴模塊-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--swagger2-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--SpringBoot與Redis整合依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!--SpringBoot與AOP-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
<!--Mysql數(shù)據(jù)庫驅動-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!--SpringBoot集成druid連接池-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<!--mybatis和springboot整合-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis.spring.boot.version}</version>
</dependency>
<!--通用基礎配置junit/devtools/test/log4j/lombok/hutool-->
<!--hutool-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.2.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<optional>true</optional>
</dependency>
<!--persistence-->
<dependency>
<groupId>javax.persistence</groupId>
<artifactId>persistence-api</artifactId>
<version>1.0.2</version>
</dependency>
<!--通用Mapper-->
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper</artifactId>
<version>${mapper.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.8.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
YML
# ========================alibaba.druid=====================
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/上面的數(shù)據(jù)庫?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.druid.test-while-idle=false
主啟動
業(yè)務類
public class RedisUtils
{
public static final String REDIS_IP_ADDR = "192.168.1.1";
public static final String REDIS_pwd = "";
public static JedisPool jedisPool;
static {
JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(5);
jedisPoolConfig.setMaxIdle(5);
jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,30000,REDIS_pwd);
}
public static Jedis getJedis() throws Exception {
if(null!=jedisPool){
return jedisPool.getResource();
}
throw new Exception("Jedispool fail");
}
}
https://github.com/alibaba/canal/wiki/ClientExample
CANAL_IP_ADDR:CANAL服務端ip地址
Canal默認端口11111文章來源:http://www.zghlxwxcb.cn/news/detail-832833.html
public class RedisCanalClientExample
{
public static final Integer _60SECONDS = 60;
public static final String CANAL_IP_ADDR = "192.168.111.185";
private static void redisInsert(List<Column> columns)
{
JSONObject jsonObject = new JSONObject();
for (Column column : columns)
{
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
jsonObject.put(column.getName(),column.getValue());
}
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
}catch (Exception e){
e.printStackTrace();
}
}
}
private static void redisDelete(List<Column> columns)
{
JSONObject jsonObject = new JSONObject();
for (Column column : columns)
{
jsonObject.put(column.getName(),column.getValue());
}
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
jedis.del(columns.get(0).getValue());
}catch (Exception e){
e.printStackTrace();
}
}
}
private static void redisUpdate(List<Column> columns)
{
JSONObject jsonObject = new JSONObject();
for (Column column : columns)
{
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
jsonObject.put(column.getName(),column.getValue());
}
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
}catch (Exception e){
e.printStackTrace();
}
}
}
public static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
//獲取變更的row數(shù)據(jù)
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
}
//獲取變動類型
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.INSERT) {
redisInsert(rowData.getAfterColumnsList());
} else if (eventType == EventType.DELETE) {
redisDelete(rowData.getBeforeColumnsList());
} else {//EventType.UPDATE
redisUpdate(rowData.getAfterColumnsList());
}
}
}
}
public static void main(String[] args)
{
System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
//=================================
// 創(chuàng)建鏈接canal服務端
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CANAL_IP_ADDR,
11111), "example", "", "");
int batchSize = 1000;
//空閑空轉計數(shù)器
int emptyCount = 0;
System.out.println("---------------------canal init OK,開始監(jiān)聽mysql變化------");
try {
connector.connect();
//connector.subscribe(".*\\..*");
connector.subscribe("bigdata.t_user");
connector.rollback();
int totalEmptyCount = 10 * _60SECONDS;
while (emptyCount < totalEmptyCount) {
System.out.println("我是canal,每秒一次正在監(jiān)聽:"+ UUID.randomUUID().toString());
Message message = connector.getWithoutAck(batchSize); // 獲取指定數(shù)量的數(shù)據(jù)
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
} else {
//計數(shù)器重新置零
emptyCount = 0;
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交確認
// connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù)
}
System.out.println("已經(jīng)監(jiān)聽了"+totalEmptyCount+"秒,無任何消息,請重啟重試......");
} finally {
connector.disconnect();
}
}
}
connector.subscribe配置過濾
try-with-resources文章來源地址http://www.zghlxwxcb.cn/news/detail-832833.html
到了這里,關于Redis(十四)雙寫一致性工程案例的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!