-
需求
-
步驟
-
環(huán)境搭建
-
整合SpringBoot Canal實(shí)現(xiàn)客戶端
-
Canal整合RabbitMQ
-
SpringBoot整合RabbitMQ
?文章來源地址http://www.zghlxwxcb.cn/news/detail-668294.html
需求
我想要在SpringBoot中采用一種與業(yè)務(wù)代碼解耦合的方式,來實(shí)現(xiàn)數(shù)據(jù)的變更記錄,記錄的內(nèi)容是新數(shù)據(jù),如果是更新操作還得有舊數(shù)據(jù)內(nèi)容。
經(jīng)過調(diào)研發(fā)現(xiàn),使用Canal來監(jiān)聽MySQL的binlog變化可以實(shí)現(xiàn)這個(gè)需求,可是在監(jiān)聽到變化后需要馬上保存變更記錄,除非再做一些邏輯處理,于是我又結(jié)合了RabbitMQ來處理保存變更記錄的操作。
基于 Spring Boot + MyBatis Plus + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
項(xiàng)目地址:https://github.com/YunaiV/ruoyi-vue-pro
視頻教程:https://doc.iocoder.cn/video/
步驟
-
啟動(dòng)MySQL環(huán)境,并開啟binlog
-
啟動(dòng)Canal環(huán)境,為其創(chuàng)建一個(gè)MySQL賬號(hào),然后以Slave的形式連接MySQL
-
Canal服務(wù)模式設(shè)為TCP,用Java編寫客戶端代碼,監(jiān)聽MySQL的binlog修改
-
Canal服務(wù)模式設(shè)為RabbitMQ,啟動(dòng)RabbitMQ環(huán)境,配置Canal和RabbitMQ的連接,用消息隊(duì)列去接收binlog修改事件
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
項(xiàng)目地址:https://github.com/YunaiV/yudao-cloud
視頻教程:https://doc.iocoder.cn/video/
環(huán)境搭建
環(huán)境搭建基于docker-compose:
version:?"3"??
services:??
????mysql:??
????????network_mode:?mynetwork??
????????container_name:?mymysql??
????????ports:??
????????????-?3306:3306??
????????restart:?always??
????????volumes:??
????????????-?/etc/localtime:/etc/localtime??
????????????-?/home/mycontainers/mymysql/data:/data??
????????????-?/home/mycontainers/mymysql/mysql:/var/lib/mysql??
????????????-?/home/mycontainers/mymysql/conf:/etc/mysql??
????????environment:??
????????????-?MYSQL_ROOT_PASSWORD=root??
????????command:???
????????????--character-set-server=utf8mb4??
????????????--collation-server=utf8mb4_unicode_ci??
????????????--log-bin=/var/lib/mysql/mysql-bin??
????????????--server-id=1??
????????????--binlog-format=ROW??
????????????--expire_logs_days=7??
????????????--max_binlog_size=500M??
????????image:?mysql:5.7.20??
????rabbitmq:?????
????????container_name:?myrabbit??
????????ports:??
????????????-?15672:15672??
????????????-?5672:5672??
????????restart:?always??
????????volumes:??
????????????-?/etc/localtime:/etc/localtime??
????????????-?/home/mycontainers/myrabbit/rabbitmq:/var/lib/rabbitmq??
????????network_mode:?mynetwork??
????????environment:??
????????????-?RABBITMQ_DEFAULT_USER=admin??
????????????-?RABBITMQ_DEFAULT_PASS=123456??
????????image:?rabbitmq:3.8-management??
????canal-server:??
????????container_name:?canal-server??
????????restart:?always??
????????ports:??
????????????-?11110:11110??
????????????-?11111:11111??
????????????-?11112:11112??
????????volumes:??
????????????-?/home/mycontainers/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties??
????????????-?/home/mycontainers/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties??
????????????-?/home/mycontainers/canal-server/logs:/home/admin/canal-server/logs??
????????network_mode:?mynetwork??
????????depends_on:??
????????????-?mysql??
????????????-?rabbitmq??
????????????#?-?canal-admin??
????????image:?canal/canal-server:v1.1.5??
我們需要修改下Canal環(huán)境的配置文件:canal.properties
和instance.properties
,映射Canal中的以下兩個(gè)路徑:
-
/home/admin/canal-server/conf/canal.properties
配置文件中,canal.destinations
意思是server上部署的instance列表,
-
/home/admin/canal-server/conf/example/instance.properties
這里的/example是指instance即實(shí)例名,要和上面canal.properties
內(nèi)instance配置對(duì)應(yīng),canal會(huì)為實(shí)例創(chuàng)建對(duì)應(yīng)的文件夾,一個(gè)Client對(duì)應(yīng)一個(gè)實(shí)例
以下是我們需要準(zhǔn)備的兩個(gè)配置文件具體內(nèi)容:
canal.properties
################################################??
########?????common?argument???############??
################################################??
#?tcp?bind?ip??
canal.ip?=??
#?register?ip?to?zookeeper??
canal.register.ip?=??
canal.port?=?11111??
canal.metrics.pull.port?=?11112??
#?canal?instance?user/passwd??
#?canal.user?=?canal??
#?canal.passwd?=?E3619321C1A937C46A0D8BD1DAC39F93B27D4458??
??
#?canal?admin?config??
#?canal.admin.manager?=?canal-admin:8089??
??
#?canal.admin.port?=?11110??
#?canal.admin.user?=?admin??
#?canal.admin.passwd?=?6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9??
??
#?admin?auto?register?自動(dòng)注冊(cè)??
#?canal.admin.register.auto?=?true??
#?集群名,單機(jī)則不寫??
#?canal.admin.register.cluster?=??
#?Canal?Server?名字??
#?canal.admin.register.name?=?canal-admin??
??
canal.zkServers?=??
#?flush?data?to?zk??
canal.zookeeper.flush.period?=?1000??
canal.withoutNetty?=?false??
#?tcp,?kafka,?rocketMQ,?rabbitMQ,?pulsarMQ??
canal.serverMode?=?tcp??
#?flush?meta?cursor/parse?position?to?file??
canal.file.data.dir?=?${canal.conf.dir}??
canal.file.flush.period?=?1000??
#?memory?store?RingBuffer?size,?should?be?Math.pow(2,n)??
canal.instance.memory.buffer.size?=?16384??
#?memory?store?RingBuffer?used?memory?unit?size?,?default?1kb??
canal.instance.memory.buffer.memunit?=?1024???
#?meory?store?gets?mode?used?MEMSIZE?or?ITEMSIZE??
canal.instance.memory.batch.mode?=?MEMSIZE??
canal.instance.memory.rawEntry?=?true??
??
#?detecing?config??
canal.instance.detecting.enable?=?false??
#canal.instance.detecting.sql?=?insert?into?retl.xdual?values(1,now())?on?duplicate?key?update?x=now()??
canal.instance.detecting.sql?=?select?1??
canal.instance.detecting.interval.time?=?3??
canal.instance.detecting.retry.threshold?=?3??
canal.instance.detecting.heartbeatHaEnable?=?false??
??
#?support?maximum?transaction?size,?more?than?the?size?of?the?transaction?will?be?cut?into?multiple?transactions?delivery??
canal.instance.transaction.size?=??1024??
#?mysql?fallback?connected?to?new?master?should?fallback?times??
canal.instance.fallbackIntervalInSeconds?=?60??
??
#?network?config??
canal.instance.network.receiveBufferSize?=?16384??
canal.instance.network.sendBufferSize?=?16384??
canal.instance.network.soTimeout?=?30??
??
#?binlog?filter?config??
canal.instance.filter.druid.ddl?=?true??
canal.instance.filter.query.dcl?=?false??
canal.instance.filter.query.dml?=?false??
canal.instance.filter.query.ddl?=?false??
canal.instance.filter.table.error?=?false??
canal.instance.filter.rows?=?false??
canal.instance.filter.transaction.entry?=?false??
canal.instance.filter.dml.insert?=?false??
canal.instance.filter.dml.update?=?false??
canal.instance.filter.dml.delete?=?false??
??
#?binlog?format/image?check??
canal.instance.binlog.format?=?ROW,STATEMENT,MIXED???
canal.instance.binlog.image?=?FULL,MINIMAL,NOBLOB??
??
#?binlog?ddl?isolation??
canal.instance.get.ddl.isolation?=?false??
??
#?parallel?parser?config??
canal.instance.parser.parallel?=?true??
#?concurrent?thread?number,?default?60%?available?processors,?suggest?not?to?exceed?Runtime.getRuntime().availableProcessors()??
canal.instance.parser.parallelThreadSize?=?16??
#?disruptor?ringbuffer?size,?must?be?power?of?2??
canal.instance.parser.parallelBufferSize?=?256??
??
#?table?meta?tsdb?info??
canal.instance.tsdb.enable?=?true??
canal.instance.tsdb.dir?=?${canal.file.data.dir:../conf}/${canal.instance.destination:}??
canal.instance.tsdb.url?=?jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;??
canal.instance.tsdb.dbUsername?=?canal??
canal.instance.tsdb.dbPassword?=?canal??
#?dump?snapshot?interval,?default?24?hour??
canal.instance.tsdb.snapshot.interval?=?24??
#?purge?snapshot?expire?,?default?360?hour(15?days)??
canal.instance.tsdb.snapshot.expire?=?360??
??
################################################??
########?????destinations????############??
################################################??
canal.destinations?=?canal-exchange??
#?conf?root?dir??
canal.conf.dir?=?../conf??
#?auto?scan?instance?dir?add/remove?and?start/stop?instance??
canal.auto.scan?=?true??
canal.auto.scan.interval?=?5??
#?set?this?value?to?'true'?means?that?when?binlog?pos?not?found,?skip?to?latest.??
#?WARN:?pls?keep?'false'?in?production?env,?or?if?you?know?what?you?want.??
canal.auto.reset.latest.pos.mode?=?false??
??
canal.instance.tsdb.spring.xml?=?classpath:spring/tsdb/h2-tsdb.xml??
#canal.instance.tsdb.spring.xml?=?classpath:spring/tsdb/mysql-tsdb.xml??
??
canal.instance.global.mode?=?spring??
canal.instance.global.lazy?=?false??
canal.instance.global.manager.address?=?${canal.admin.manager}??
#canal.instance.global.spring.xml?=?classpath:spring/memory-instance.xml??
canal.instance.global.spring.xml?=?classpath:spring/file-instance.xml??
#canal.instance.global.spring.xml?=?classpath:spring/default-instance.xml??
??
#################################################??
########?????????MQ?Properties??????############??
#################################################??
#?aliyun?ak/sk?,?support?rds/mq??
canal.aliyun.accessKey?=??
canal.aliyun.secretKey?=??
canal.aliyun.uid=??
??
canal.mq.flatMessage?=?true??
canal.mq.canalBatchSize?=?50??
canal.mq.canalGetTimeout?=?100??
#?Set?this?value?to?"cloud",?if?you?want?open?message?trace?feature?in?aliyun.??
canal.mq.accessChannel?=?local??
??
canal.mq.database.hash?=?true??
canal.mq.send.thread.size?=?30??
canal.mq.build.thread.size?=?8??
??
#################################################??
########?????????RabbitMQ???????############??
#################################################??
rabbitmq.host?=?myrabbit??
rabbitmq.virtual.host?=?/??
rabbitmq.exchange?=?canal-exchange??
rabbitmq.username?=?admin??
rabbitmq.password?=?RabbitMQ密碼??
rabbitmq.deliveryMode?=??
此時(shí)canal.serverMode = tcp
,即TCP直連,我們先開啟這個(gè)服務(wù),然后手寫Java客戶端代碼去連接它,等下再改為RabbitMQ。
通過注釋可以看到,canal支持的服務(wù)模式有:tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ,即主流的消息隊(duì)列都支持。
instance.properties
################################################??
#?mysql?serverId?,?v1.0.26+?will?autoGen??
#canal.instance.mysql.slaveId=123??
??
#?enable?gtid?use?true/false??
canal.instance.gtidon=false??
??
#?position?info??
canal.instance.master.address=mymysql:3306??
canal.instance.master.journal.name=??
canal.instance.master.position=??
canal.instance.master.timestamp=??
canal.instance.master.gtid=??
??
#?rds?oss?binlog??
canal.instance.rds.accesskey=??
canal.instance.rds.secretkey=??
canal.instance.rds.instanceId=??
??
#?table?meta?tsdb?info??
canal.instance.tsdb.enable=true??
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb??
#canal.instance.tsdb.dbUsername=canal??
#canal.instance.tsdb.dbPassword=canal??
??
#canal.instance.standby.address?=??
#canal.instance.standby.journal.name?=??
#canal.instance.standby.position?=??
#canal.instance.standby.timestamp?=??
#canal.instance.standby.gtid=??
??
#?username/password??
canal.instance.dbUsername=canal??
canal.instance.dbPassword=canal??
canal.instance.connectionCharset?=?UTF-8??
#?enable?druid?Decrypt?database?password??
canal.instance.enableDruid=false??
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==??
??
#?table?regex??
canal.instance.filter.regex=.*\..*??
#?table?black?regex??
canal.instance.filter.black.regex=mysql\.slave_.*??
#?table?field?filter(format:?schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)??
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch??
#?table?field?black?filter(format:?schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)??
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch??
??
#?mq?config??
canal.mq.topic=canal-routing-key??
#?dynamic?topic?route?by?schema?or?table?regex??
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\..*,.*\..*??
canal.mq.partition=0??
把這兩個(gè)配置文件映射好,再次提醒,注意實(shí)例的路徑名,默認(rèn)是:/example/instance.properties
修改canal配置文件
我們需要修改這個(gè)實(shí)例配置文件,去連接MySQL,確保以下的配置正確:
canal.instance.master.address=mymysql:3306??
canal.instance.dbUsername=canal??
canal.instance.dbPassword=canal??
mymysql是同為docker容器的MySQL環(huán)境,端口3306是指內(nèi)部端口。
這里多說明一下,docker端口配置時(shí)假設(shè)為:13306:3306,那么容器對(duì)外的端口就是13306,內(nèi)部是3306,在本示例中,MySQL和Canal都是容器環(huán)境,所以Canal連接MySQL需要滿足以下條件:
-
處于同一網(wǎng)段(docker-compose.yml中的mynetwork)
-
訪問內(nèi)部端口(即3306,而非13306)
dbUsername和dbPassword為MySQL賬號(hào)密碼,為了開發(fā)方便可以使用root/root,但是我仍建議自行創(chuàng)建用戶并分配訪問權(quán)限:
#?進(jìn)入docker中的mysql容器??
docker?exec?-it?mymysql?bash??
#?進(jìn)入mysql指令模式??
mysql?-uroot?-proot??
??
#?編寫MySQL語句并執(zhí)行??
>?...??
--?選擇mysql??
use?mysql;??
--?創(chuàng)建canal用戶,賬密:canal/canal??
create?user?'canal'@'%'?identified?by?'canal';??
--?分配權(quán)限,以及允許所有主機(jī)登錄該用戶??
grant?SELECT,?INSERT,?UPDATE,?DELETE,?REPLICATION?SLAVE,?REPLICATION?CLIENT?on?*.*?to?'canal'@'%';??
??
--?刷新一下使其生效??
flush?privileges;??
??
--?附帶一個(gè)刪除用戶指令??
drop?user?'canal'@'%';??
用navicat或者shell去登錄canal這個(gè)用戶,可以訪問即創(chuàng)建成功
整合SpringBoot Canal實(shí)現(xiàn)客戶端
Maven依賴:
<canal.version>1.1.5</canal.version>??
??
<!--canal-->??
<dependency>??
??<groupId>com.alibaba.otter</groupId>??
??<artifactId>canal.client</artifactId>??
??<version>${canal.version}</version>??
</dependency>??
<dependency>??
??<groupId>com.alibaba.otter</groupId>??
??<artifactId>canal.protocol</artifactId>??
??<version>${canal.version}</version>??
</dependency>???
新增組件并啟動(dòng):
import?com.alibaba.otter.canal.client.CanalConnector;??
import?com.alibaba.otter.canal.client.CanalConnectors;??
import?com.alibaba.otter.canal.protocol.CanalEntry;??
import?com.alibaba.otter.canal.protocol.Message;??
import?org.springframework.boot.CommandLineRunner;??
import?org.springframework.stereotype.Component;??
??
import?java.net.InetSocketAddress;??
import?java.util.List;??
??
@Component??
public?class?CanalClient?{??
??
????private?final?static?int?BATCH_SIZE?=?1000;??
??
????public?void?run()?{??
????????//?創(chuàng)建鏈接??
????????CanalConnector?connector?=?CanalConnectors.newSingleConnector(new?InetSocketAddress("localhost",?11111),?"canal-exchange",?"canal",?"canal");??
????????try?{??
????????????//打開連接??
????????????connector.connect();??
????????????//訂閱數(shù)據(jù)庫表,全部表??
????????????connector.subscribe(".*\..*");??
????????????//回滾到未進(jìn)行ack的地方,下次fetch的時(shí)候,可以從最后一個(gè)沒有ack的地方開始拿??
????????????connector.rollback();??
????????????while?(true)?{??
????????????????//?獲取指定數(shù)量的數(shù)據(jù)??
????????????????Message?message?=?connector.getWithoutAck(BATCH_SIZE);??
????????????????//獲取批量ID??
????????????????long?batchId?=?message.getId();??
????????????????//獲取批量的數(shù)量??
????????????????int?size?=?message.getEntries().size();??
????????????????//如果沒有數(shù)據(jù)??
????????????????if?(batchId?==?-1?||?size?==?0)?{??
????????????????????try?{??
????????????????????????//線程休眠2秒??
????????????????????????Thread.sleep(2000);??
????????????????????}?catch?(InterruptedException?e)?{??
????????????????????????e.printStackTrace();??
????????????????????}??
????????????????}?else?{??
????????????????????//如果有數(shù)據(jù),處理數(shù)據(jù)??
????????????????????printEntry(message.getEntries());??
????????????????}??
????????????????//進(jìn)行?batch?id?的確認(rèn)。確認(rèn)之后,小于等于此?batchId?的?Message?都會(huì)被確認(rèn)。??
????????????????connector.ack(batchId);??
????????????}??
????????}?catch?(Exception?e)?{??
????????????e.printStackTrace();??
????????}?finally?{??
????????????connector.disconnect();??
????????}??
????}??
??
????/**??
?????*?打印canal?server解析binlog獲得的實(shí)體類信息??
?????*/??
????private?static?void?printEntry(List<CanalEntry.Entry>?entrys)?{??
????????for?(CanalEntry.Entry?entry?:?entrys)?{??
????????????if?(entry.getEntryType()?==?CanalEntry.EntryType.TRANSACTIONBEGIN?||?entry.getEntryType()?==?CanalEntry.EntryType.TRANSACTIONEND)?{??
????????????????//開啟/關(guān)閉事務(wù)的實(shí)體類型,跳過??
????????????????continue;??
????????????}??
????????????//RowChange對(duì)象,包含了一行數(shù)據(jù)變化的所有特征??
????????????//比如isDdl?是否是ddl變更操作?sql?具體的ddl?sql?beforeColumns?afterColumns?變更前后的數(shù)據(jù)字段等等??
????????????CanalEntry.RowChange?rowChage;??
????????????try?{??
????????????????rowChage?=?CanalEntry.RowChange.parseFrom(entry.getStoreValue());??
????????????}?catch?(Exception?e)?{??
????????????????throw?new?RuntimeException("ERROR?#?parser?of?eromanga-event?has?an?error?,?data:"?+?entry.toString(),?e);??
????????????}??
????????????//獲取操作類型:insert/update/delete類型??
????????????CanalEntry.EventType?eventType?=?rowChage.getEventType();??
????????????//打印Header信息??
????????????System.out.println(String.format("================》;?binlog[%s:%s]?,?name[%s,%s]?,?eventType?:?%s",??
????????????????????entry.getHeader().getLogfileName(),?entry.getHeader().getLogfileOffset(),??
????????????????????entry.getHeader().getSchemaName(),?entry.getHeader().getTableName(),??
????????????????????eventType));??
????????????//判斷是否是DDL語句??
????????????if?(rowChage.getIsDdl())?{??
????????????????System.out.println("================》;isDdl:?true,sql:"?+?rowChage.getSql());??
????????????}??
????????????//獲取RowChange對(duì)象里的每一行數(shù)據(jù),打印出來??
????????????for?(CanalEntry.RowData?rowData?:?rowChage.getRowDatasList())?{??
????????????????//如果是刪除語句??
????????????????if?(eventType?==?CanalEntry.EventType.DELETE)?{??
????????????????????printColumn(rowData.getBeforeColumnsList());??
????????????????????//如果是新增語句??
????????????????}?else?if?(eventType?==?CanalEntry.EventType.INSERT)?{??
????????????????????printColumn(rowData.getAfterColumnsList());??
????????????????????//如果是更新的語句??
????????????????}?else?{??
????????????????????//變更前的數(shù)據(jù)??
????????????????????System.out.println("------->;?before");??
????????????????????printColumn(rowData.getBeforeColumnsList());??
????????????????????//變更后的數(shù)據(jù)??
????????????????????System.out.println("------->;?after");??
????????????????????printColumn(rowData.getAfterColumnsList());??
????????????????}??
????????????}??
????????}??
????}??
??
????private?static?void?printColumn(List<CanalEntry.Column>?columns)?{??
????????for?(CanalEntry.Column?column?:?columns)?{??
????????????System.out.println(column.getName()?+?"?:?"?+?column.getValue()?+?"????update="?+?column.getUpdated());??
????????}??
????}??
}??
啟動(dòng)類Application:
@SpringBootApplication??
public?class?BaseApplication?implements?CommandLineRunner?{??
????@Autowired??
????private?CanalClient?canalClient;??
??
????@Override??
????public?void?run(String...?args)?throws?Exception?{??
????????canalClient.run();??
????}??
}??
啟動(dòng)程序,此時(shí)新增或修改數(shù)據(jù)庫中的數(shù)據(jù),我們就能從客戶端中監(jiān)聽到
不過我建議監(jiān)聽的信息放到消息隊(duì)列中,在空閑的時(shí)候去處理,所以直接配置Canal整合RabbitMQ更好。
Canal整合RabbitMQ
修改canal.properties中的serverMode:
canal.serverMode?=?rabbitMQ??
修改instance.properties中的topic:
canal.mq.topic=canal-routing-key??
然后找到關(guān)于RabbitMQ的配置:
#################################################??
########?????????RabbitMQ???????############??
#################################################??
#?連接rabbit,寫IP,因?yàn)橥瑐€(gè)網(wǎng)絡(luò)下,所以可以寫容器名??
rabbitmq.host?=?myrabbit??
rabbitmq.virtual.host?=?/??
#?交換器名稱,等等我們要去手動(dòng)創(chuàng)建??
rabbitmq.exchange?=?canal-exchange??
#?賬密??
rabbitmq.username?=?admin??
rabbitmq.password?=?123456??
#?暫不支持指定端口,使用的是默認(rèn)的5762,好在在本示例中適用??
重新啟動(dòng)容器,進(jìn)入RabbitMQ管理頁面創(chuàng)建exchange交換器和隊(duì)列queue:
-
新建exchange,命名為:
canal-exchange
-
新建queue,命名為:
canal-queue
-
綁定exchange和queue,routing-key設(shè)置為:
canal-routing-key
,這里對(duì)應(yīng)上面instance.properties
的canal.mq.topic
順帶一提,上面這段可以忽略,因?yàn)樵赟pringBoot的RabbitMQ配置中,會(huì)自動(dòng)創(chuàng)建交換器exchange和隊(duì)列queue,不過手動(dòng)創(chuàng)建的話,可以在忽略SpringBoot的基礎(chǔ)上,直接在RabbitMQ的管理頁面上看到修改記錄的消息。
SpringBoot整合RabbitMQ
依賴:
<amqp.version>2.3.4.RELEASE</amqp.version>??
??
<!--消息隊(duì)列-->??
<dependency>??
??<groupId>org.springframework.boot</groupId>??
??<artifactId>spring-boot-starter-amqp</artifactId>??
??<version>${amqp.version}</version>??
</dependency>??
application.yml?:
spring:??
??rabbitmq:??
????#????host:?myserverhost??
????host:?192.168.0.108??
????port:?5672??
????username:?admin??
????password:?RabbitMQ密碼??
????#?消息確認(rèn)配置項(xiàng)??
????#?確認(rèn)消息已發(fā)送到交換機(jī)(Exchange)??
????publisher-confirm-type:?correlated??
????#?確認(rèn)消息已發(fā)送到隊(duì)列(Queue)??
????publisher-returns:?true??
RabbitMQ配置類:
@Configuration??
public?class?RabbitConfig?{??
????@Bean??
????public?RabbitTemplate?rabbitTemplate(ConnectionFactory?connectionFactory)?{??
????????RabbitTemplate?template?=?new?RabbitTemplate();??
????????template.setConnectionFactory(connectionFactory);??
????????template.setMessageConverter(new?Jackson2JsonMessageConverter());??
??
????????return?template;??
????}??
??
????/**??
?????*?template.setMessageConverter(new?Jackson2JsonMessageConverter());??
?????*?這段和上面這行代碼解決RabbitListener循環(huán)報(bào)錯(cuò)的問題??
?????*/??
????@Bean??
????public?SimpleRabbitListenerContainerFactory?rabbitListenerContainerFactory(ConnectionFactory?connectionFactory)?{??
????????SimpleRabbitListenerContainerFactory?factory?=?new?SimpleRabbitListenerContainerFactory();??
????????factory.setConnectionFactory(connectionFactory);??
????????factory.setMessageConverter(new?Jackson2JsonMessageConverter());??
????????return?factory;??
????}??
}??
Canal消息生產(chǎn)者:
public?static?final?String?CanalQueue?=?"canal-queue";??
public?static?final?String?CanalExchange?=?"canal-exchange";??
public?static?final?String?CanalRouting?=?"canal-routing-key";??
/**??
?*?Canal消息提供者,canal-server生產(chǎn)的消息通過RabbitMQ消息隊(duì)列發(fā)送??
?*/??
@Configuration??
public?class?CanalProvider?{??
????/**??
?????*?隊(duì)列??
?????*/??
????@Bean??
????public?Queue?canalQueue()?{??
????????/**??
?????????*?durable:是否持久化,默認(rèn)false,持久化隊(duì)列:會(huì)被存儲(chǔ)在磁盤上,當(dāng)消息代理重啟時(shí)仍然存在;暫存隊(duì)列:當(dāng)前連接有效??
?????????*?exclusive:默認(rèn)為false,只能被當(dāng)前創(chuàng)建的連接使用,而且當(dāng)連接關(guān)閉后隊(duì)列即被刪除。此參考優(yōu)先級(jí)高于durable??
?????????*?autoDelete:是否自動(dòng)刪除,當(dāng)沒有生產(chǎn)者或者消費(fèi)者使用此隊(duì)列,該隊(duì)列會(huì)自動(dòng)刪除??
?????????*/??
????????return?new?Queue(RabbitConstant.CanalQueue,?true);??
????}??
??
????/**??
?????*?交換機(jī),這里使用直連交換機(jī)??
?????*/??
????@Bean??
????DirectExchange?canalExchange()?{??
????????return?new?DirectExchange(RabbitConstant.CanalExchange,?true,?false);??
????}??
??
????/**??
?????*?綁定交換機(jī)和隊(duì)列,并設(shè)置匹配鍵??
?????*/??
????@Bean??
????Binding?bindingCanal()?{??
????????return?BindingBuilder.bind(canalQueue()).to(canalExchange()).with(RabbitConstant.CanalRouting);??
????}??
}??
Canal消息消費(fèi)者:
/**??
?*?Canal消息消費(fèi)者??
?*/??
@Component??
@RabbitListener(queues?=?RabbitConstant.CanalQueue)??
public?class?CanalComsumer?{??
????private?final?SysBackupService?sysBackupService;??
??
????public?CanalComsumer(SysBackupService?sysBackupService)?{??
????????this.sysBackupService?=?sysBackupService;??
????}??
??
????@RabbitHandler??
????public?void?process(Map<String,?Object>?msg)?{??
????????System.out.println("收到canal消息:"?+?msg);??
????????boolean?isDdl?=?(boolean)?msg.get("isDdl");??
??
????????//?不處理DDL事件??
????????if?(isDdl)?{??
????????????return;??
????????}??
??
????????//?TiCDC的id,應(yīng)該具有唯一性,先保存再說??
????????int?tid?=?(int)?msg.get("id");??
????????//?TiCDC生成該消息的時(shí)間戳,13位毫秒級(jí)??
????????long?ts?=?(long)?msg.get("ts");??
????????//?數(shù)據(jù)庫??
????????String?database?=?(String)?msg.get("database");??
????????//?表??
????????String?table?=?(String)?msg.get("table");??
????????//?類型:INSERT/UPDATE/DELETE??
????????String?type?=?(String)?msg.get("type");??
????????//?每一列的數(shù)據(jù)值??
????????List<?>?data?=?(List<?>)?msg.get("data");??
????????//?僅當(dāng)type為UPDATE時(shí)才有值,記錄每一列的名字和UPDATE之前的數(shù)據(jù)值??
????????List<?>?old?=?(List<?>)?msg.get("old");??
??
????????//?跳過sys_backup,防止無限循環(huán)??
????????if?("sys_backup".equalsIgnoreCase(table))?{??
????????????return;??
????????}??
??
????????//?只處理指定類型??
????????if?(!"INSERT".equalsIgnoreCase(type)??
????????????????&&?!"UPDATE".equalsIgnoreCase(type)??
????????????????&&?!"DELETE".equalsIgnoreCase(type))?{??
????????????return;??
????????}??
????}??
}??
測(cè)試一下,修改MySQL中的一條消息,Canal就會(huì)發(fā)送信息到RabbitMQ,我們就能從監(jiān)聽的RabbitMQ隊(duì)列中得到該條消息。文章來源:http://www.zghlxwxcb.cn/news/detail-668294.html
?
到了這里,關(guān)于SpringBoot整合Canal+RabbitMQ監(jiān)聽數(shù)據(jù)變更的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!