1、前言
??????? 工作中,我們很多時(shí)候需要根據(jù)某些狀態(tài)的變化更新另一個(gè)業(yè)務(wù)的邏輯,比如訂單的生成,成交等,需要更新或者通知其他的業(yè)務(wù)。我們通常的操作通過(guò)業(yè)務(wù)埋點(diǎn)、接口的調(diào)用或者中間件完成。
????????但是狀態(tài)變化的入口比較多的時(shí)候,就很容易漏掉某些地方。代碼維護(hù)起來(lái)也比較麻煩。今天介紹阿里出品的 【canal】中間件完成數(shù)據(jù)庫(kù)字段的監(jiān)聽(tīng)。
2、canal的簡(jiǎn)單介紹
??????? canal詳見(jiàn)介紹件官網(wǎng):https://github.com/alibaba/canal
?
2.1 家族成員:
【canal.adapter】:客戶端落地的適配以及功能
??????
?【canal.admin】:提供WebUI的管理界面
?【canal.deployer】:canal服務(wù)
?【canal.example】:客戶端提供的demo
2.2 工作原理
?3、 實(shí)踐目標(biāo)
??????? 使用canal監(jiān)控mysql數(shù)據(jù)的變化,將變化的數(shù)據(jù)推送到kafka,并使用canal-admin動(dòng)態(tài)管理需要監(jiān)控的數(shù)據(jù)庫(kù)表。
?4、工具準(zhǔn)備
其中kafka是依賴zookeeper的,所以也需要zookeeper。
5、配置并啟動(dòng)kafka
Kafka QuickStart
5.1 修改配置
vim config/server.properties
換成自己的IP
替換成自己zookeeper的地址
?5.2 啟動(dòng)server
- 啟動(dòng)zookeeper腳本
# bin/zkServer.sh start
- 啟動(dòng)kafka腳本
# bin/kafka-server-start.sh -daemon config/server.properties &
- ?查看是否啟動(dòng)成功腳本
# jps -ml
?
此時(shí)kafka啟動(dòng)成功。
5.3 注意事項(xiàng)
值得注意的是官方文檔中查看topic的命令,
# bin/kafka-topics.sh --list --zookeeper 192.168.1.110:2181
在心的kafka版本中已經(jīng)改變,可移步kafka官方文檔: Apache Kafka
新版本中使用bootstrap-server,如下
# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
6、啟動(dòng)canal-admin
6.1 修改配置
改成對(duì)應(yīng)的ip
?6.2 執(zhí)行 conf/canal.manage.sql
???????? 該腳本是canal-admin的管理腳本。
?6.3 啟動(dòng)canal-admin
sh bin/startup.sh
?6.4 查看啟動(dòng)狀態(tài)
?6.5 訪問(wèn)頁(yè)面
此時(shí)代表canal-admin已經(jīng)啟動(dòng)成功,可以通過(guò) http://127.0.0.1:8089/ 訪問(wèn),
默認(rèn)密碼:admin/123456
7、啟動(dòng)canal-server
7.1 修改配置腳本
# vim conf/canal_local.properties
換成canal-admin的IP
7.2 啟動(dòng)服務(wù) 指定local
# sh bin/startup.sh local
7.3 查詢啟動(dòng)狀態(tài)
8、管理平臺(tái)配置
8.1 查看canal服務(wù)的狀態(tài)
?8.2 配置實(shí)例
?修改監(jiān)聽(tīng)的數(shù)據(jù)庫(kù)信息:
canal.instance.master.address=192.168.88.111:3306
canal.instance.dbUsername=***
canal.instance.dbPassword=***#默認(rèn)監(jiān)聽(tīng)全庫(kù)
canal.instance.filter.regex=test.test_user
#配置不可訪問(wèn)的庫(kù)表
canal.instance.filter.black.regex=
#配置mq的主題/路由
canal.mq.topic=example
?保存即可。
8.3 啟動(dòng)實(shí)例
9、編寫(xiě)客戶端監(jiān)聽(tīng)kafka的客戶端
@Test
public void test01(){
// 修改打印日志的級(jí)別,不然會(huì)不停的打印debug日志,影響閱讀
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
Logger root = loggerContext.getLogger("root");
root.setLevel(Level.INFO);
//設(shè)置消費(fèi)者屬性
Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.88.111:9092");
//反序列化器,與生產(chǎn)者的序列化器相對(duì)應(yīng)
properties.put("key.deserializer", StringDeserializer.class);
properties.put("value.deserializer", StringDeserializer.class);
//設(shè)置消費(fèi)者的消費(fèi)者群組
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"example");
// properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默認(rèn)值是 lastest
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
try {
//消費(fèi)者訂閱主題(可以多個(gè),支持正則表達(dá)式,進(jìn)行模糊匹配)
consumer.subscribe(Collections.singletonList("example"));
System.out.println("-------------------------消費(fèi)端準(zhǔn)備就緒,等待消息接受------------------------------------");
//kafka消費(fèi)者是通過(guò)拉取的方式獲得服務(wù)端消息
while(true){
//循環(huán)調(diào)用poll方法,獲取數(shù)據(jù)。
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for(ConsumerRecord<String, String> record:records){
String topic = record.topic();
String value = record.value();
if (StringUtils.isNotEmpty(value)) {
System.out.println(String.format("topic:%s;" + "value:%s", topic,value));
}
}
}
} finally {
consumer.close();
}
}
?10、驗(yàn)證
修改數(shù)據(jù)庫(kù)字段,可以接收到修改的信息文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-403225.html
文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-403225.html
到了這里,關(guān)于基于Canal+kafka監(jiān)聽(tīng)數(shù)據(jù)庫(kù)變化的最佳實(shí)踐的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!