1.安裝mysql
默認(rèn)安裝了mysql(版本8.0.x);
新創(chuàng)建用戶
-- 創(chuàng)建用戶 用戶名:canal 密碼:Canal@123456
create user 'canal'@'%' identified by 'Canal@123456';
授權(quán)
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' with grant option;
flush privileges;
查看MySQL是否開啟binlog模式
show variables like 'log_bin';
查看當(dāng)前正在寫入的binlog日志:
show master status;
記住文件名和偏移量
2.安裝Canal
去官網(wǎng)下載頁面進(jìn)行下載;
我這里下載的是1.1.7的版本:
解壓canal.deployer-1.1.7.tar.gz,我們可以看到里面有五個(gè)文件夾:
打開配置文件conf/example/instance.properties,配置信息如下:
## mysql serverId , v1.0.26+ will autoGen
## v1.0.26版本后會(huì)自動(dòng)生成slaveId,所以可以不用配置
# canal.instance.mysql.slaveId=0
# 數(shù)據(jù)庫地址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名稱
canal.instance.master.journal.name=自己的日志名稱
# mysql主庫鏈接時(shí)起始的binlog偏移量
canal.instance.master.position=日志的偏移量
# mysql主庫鏈接時(shí)起始的binlog的時(shí)間戳
canal.instance.master.timestamp=
canal.instance.master.gtid=
# username/password
# 在MySQL服務(wù)器授權(quán)的賬號(hào)密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
# 字符集
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
# table regex .*\\..*表示監(jiān)聽所有表 也可以寫具體的表名,用,隔開
canal.instance.filter.regex=.*\\..*
# mysql 數(shù)據(jù)解析表的黑名單,多個(gè)表用,隔開
canal.instance.filter.black.regex=
開啟Canal服務(wù)端:
進(jìn)入bin目錄
.\startup.bat
3.Java客戶端操作
首先引入maven依賴:
<!--canal客戶端-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.7</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.7</version>
</dependency>
然后創(chuàng)建一個(gè)CanalClient類文章來源:http://www.zghlxwxcb.cn/news/detail-858888.html
import com.alibaba.otter.canal.client.*;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CanalClient implements InitializingBean {
private final static int BATCH_SIZE = 1000;
@Override
public void afterPropertiesSet() throws Exception {
// 創(chuàng)建鏈接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example", "", "");
try {
//打開連接
connector.connect();
//訂閱數(shù)據(jù)庫表,全部表
connector.subscribe(".*\\..*");
//回滾到未進(jìn)行ack的地方,下次fetch的時(shí)候,可以從最后一個(gè)沒有ack的地方開始拿
connector.rollback();
while (true) {
// 獲取指定數(shù)量的數(shù)據(jù)
Message message = connector.getWithoutAck(BATCH_SIZE);
//獲取批量ID
long batchId = message.getId();
//獲取批量的數(shù)量
int size = message.getEntries().size();
//如果沒有數(shù)據(jù)
if (batchId == -1 || size == 0) {
try {
//線程休眠2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//如果有數(shù)據(jù),處理數(shù)據(jù)
printEntry(message.getEntries());
}
//進(jìn)行 batch id 的確認(rèn)。確認(rèn)之后,小于等于此 batchId 的 Message 都會(huì)被確認(rèn)。
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
/**
* 打印canal server解析binlog獲得的實(shí)體類信息
*/
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
//開啟/關(guān)閉事務(wù)的實(shí)體類型,跳過
continue;
}
//RowChange對象,包含了一行數(shù)據(jù)變化的所有特征
//比如isDdl 是否是ddl變更操作 sql 具體的ddl sql beforeColumns afterColumns 變更前后的數(shù)據(jù)字段等等
RowChange rowChage;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
//獲取操作類型:insert/update/delete類型
EventType eventType = rowChage.getEventType();
//打印Header信息
System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
//判斷是否是DDL語句
if (rowChage.getIsDdl()) {
System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
}
//獲取RowChange對象里的每一行數(shù)據(jù),打印出來
for (RowData rowData : rowChage.getRowDatasList()) {
//如果是刪除語句
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
//如果是新增語句
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
//如果是更新的語句
} else {
//變更前的數(shù)據(jù)
System.out.println("------->; before");
printColumn(rowData.getBeforeColumnsList());
//變更后的數(shù)據(jù)
System.out.println("------->; after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
完成之后再對應(yīng)數(shù)據(jù)庫進(jìn)行操作,控制臺(tái)會(huì)打印對應(yīng)的操作,說明對數(shù)據(jù)的寫入操作進(jìn)行了有效的監(jiān)控;
注意只讀操作并不會(huì)寫入binlog也不會(huì)被Canal監(jiān)控到(也沒必要監(jiān)控讀取操作)。文章來源地址http://www.zghlxwxcb.cn/news/detail-858888.html
到了這里,關(guān)于Canal1--搭建Canal監(jiān)聽數(shù)據(jù)庫變化的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!