国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

SpringBoot整合Canal+RabbitMQ監(jiān)聽數(shù)據(jù)變更

這篇具有很好參考價(jià)值的文章主要介紹了SpringBoot整合Canal+RabbitMQ監(jiān)聽數(shù)據(jù)變更。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

  • 需求

  • 步驟

  • 環(huán)境搭建

  • 整合SpringBoot Canal實(shí)現(xiàn)客戶端

  • Canal整合RabbitMQ

  • SpringBoot整合RabbitMQ

hevc?url=https%3A%2F%2Fmmbiz.qpic.cn%2Fmmbiz_jpg%2F8KKrHK5ic6XAiccUfgo6IXHY0tFbibtGvYdKcc2qqPSbqAIb5KYL8nlq29RCD7cf0QqxOcISM3I4j2fQ4fq6LlrMA%2F640%3Fwx_fmt%3Djpeg%26tp%3Dwxpic%26wxfrom%3D5%26wx_lazy%3D1%26wx_co%3D1&type=jpg

?文章來源地址http://www.zghlxwxcb.cn/news/detail-668294.html


需求

我想要在SpringBoot中采用一種與業(yè)務(wù)代碼解耦合的方式,來實(shí)現(xiàn)數(shù)據(jù)的變更記錄,記錄的內(nèi)容是新數(shù)據(jù),如果是更新操作還得有舊數(shù)據(jù)內(nèi)容。

經(jīng)過調(diào)研發(fā)現(xiàn),使用Canal來監(jiān)聽MySQL的binlog變化可以實(shí)現(xiàn)這個(gè)需求,可是在監(jiān)聽到變化后需要馬上保存變更記錄,除非再做一些邏輯處理,于是我又結(jié)合了RabbitMQ來處理保存變更記錄的操作。

基于 Spring Boot + MyBatis Plus + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能

  • 項(xiàng)目地址:https://github.com/YunaiV/ruoyi-vue-pro

  • 視頻教程:https://doc.iocoder.cn/video/

步驟

  • 啟動(dòng)MySQL環(huán)境,并開啟binlog

  • 啟動(dòng)Canal環(huán)境,為其創(chuàng)建一個(gè)MySQL賬號(hào),然后以Slave的形式連接MySQL

  • Canal服務(wù)模式設(shè)為TCP,用Java編寫客戶端代碼,監(jiān)聽MySQL的binlog修改

  • Canal服務(wù)模式設(shè)為RabbitMQ,啟動(dòng)RabbitMQ環(huán)境,配置Canal和RabbitMQ的連接,用消息隊(duì)列去接收binlog修改事件

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能

  • 項(xiàng)目地址:https://github.com/YunaiV/yudao-cloud

  • 視頻教程:https://doc.iocoder.cn/video/

環(huán)境搭建

環(huán)境搭建基于docker-compose:

version:?"3"??
services:??
????mysql:??
????????network_mode:?mynetwork??
????????container_name:?mymysql??
????????ports:??
????????????-?3306:3306??
????????restart:?always??
????????volumes:??
????????????-?/etc/localtime:/etc/localtime??
????????????-?/home/mycontainers/mymysql/data:/data??
????????????-?/home/mycontainers/mymysql/mysql:/var/lib/mysql??
????????????-?/home/mycontainers/mymysql/conf:/etc/mysql??
????????environment:??
????????????-?MYSQL_ROOT_PASSWORD=root??
????????command:???
????????????--character-set-server=utf8mb4??
????????????--collation-server=utf8mb4_unicode_ci??
????????????--log-bin=/var/lib/mysql/mysql-bin??
????????????--server-id=1??
????????????--binlog-format=ROW??
????????????--expire_logs_days=7??
????????????--max_binlog_size=500M??
????????image:?mysql:5.7.20??
????rabbitmq:?????
????????container_name:?myrabbit??
????????ports:??
????????????-?15672:15672??
????????????-?5672:5672??
????????restart:?always??
????????volumes:??
????????????-?/etc/localtime:/etc/localtime??
????????????-?/home/mycontainers/myrabbit/rabbitmq:/var/lib/rabbitmq??
????????network_mode:?mynetwork??
????????environment:??
????????????-?RABBITMQ_DEFAULT_USER=admin??
????????????-?RABBITMQ_DEFAULT_PASS=123456??
????????image:?rabbitmq:3.8-management??
????canal-server:??
????????container_name:?canal-server??
????????restart:?always??
????????ports:??
????????????-?11110:11110??
????????????-?11111:11111??
????????????-?11112:11112??
????????volumes:??
????????????-?/home/mycontainers/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties??
????????????-?/home/mycontainers/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties??
????????????-?/home/mycontainers/canal-server/logs:/home/admin/canal-server/logs??
????????network_mode:?mynetwork??
????????depends_on:??
????????????-?mysql??
????????????-?rabbitmq??
????????????#?-?canal-admin??
????????image:?canal/canal-server:v1.1.5??

我們需要修改下Canal環(huán)境的配置文件:canal.propertiesinstance.properties,映射Canal中的以下兩個(gè)路徑:

  • /home/admin/canal-server/conf/canal.properties

配置文件中,canal.destinations意思是server上部署的instance列表,

  • /home/admin/canal-server/conf/example/instance.properties

這里的/example是指instance即實(shí)例名,要和上面canal.properties內(nèi)instance配置對(duì)應(yīng),canal會(huì)為實(shí)例創(chuàng)建對(duì)應(yīng)的文件夾,一個(gè)Client對(duì)應(yīng)一個(gè)實(shí)例

以下是我們需要準(zhǔn)備的兩個(gè)配置文件具體內(nèi)容:

canal.properties

################################################??
########?????common?argument???############??
################################################??
#?tcp?bind?ip??
canal.ip?=??
#?register?ip?to?zookeeper??
canal.register.ip?=??
canal.port?=?11111??
canal.metrics.pull.port?=?11112??
#?canal?instance?user/passwd??
#?canal.user?=?canal??
#?canal.passwd?=?E3619321C1A937C46A0D8BD1DAC39F93B27D4458??
??
#?canal?admin?config??
#?canal.admin.manager?=?canal-admin:8089??
??
#?canal.admin.port?=?11110??
#?canal.admin.user?=?admin??
#?canal.admin.passwd?=?6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9??
??
#?admin?auto?register?自動(dòng)注冊(cè)??
#?canal.admin.register.auto?=?true??
#?集群名,單機(jī)則不寫??
#?canal.admin.register.cluster?=??
#?Canal?Server?名字??
#?canal.admin.register.name?=?canal-admin??
??
canal.zkServers?=??
#?flush?data?to?zk??
canal.zookeeper.flush.period?=?1000??
canal.withoutNetty?=?false??
#?tcp,?kafka,?rocketMQ,?rabbitMQ,?pulsarMQ??
canal.serverMode?=?tcp??
#?flush?meta?cursor/parse?position?to?file??
canal.file.data.dir?=?${canal.conf.dir}??
canal.file.flush.period?=?1000??
#?memory?store?RingBuffer?size,?should?be?Math.pow(2,n)??
canal.instance.memory.buffer.size?=?16384??
#?memory?store?RingBuffer?used?memory?unit?size?,?default?1kb??
canal.instance.memory.buffer.memunit?=?1024???
#?meory?store?gets?mode?used?MEMSIZE?or?ITEMSIZE??
canal.instance.memory.batch.mode?=?MEMSIZE??
canal.instance.memory.rawEntry?=?true??
??
#?detecing?config??
canal.instance.detecting.enable?=?false??
#canal.instance.detecting.sql?=?insert?into?retl.xdual?values(1,now())?on?duplicate?key?update?x=now()??
canal.instance.detecting.sql?=?select?1??
canal.instance.detecting.interval.time?=?3??
canal.instance.detecting.retry.threshold?=?3??
canal.instance.detecting.heartbeatHaEnable?=?false??
??
#?support?maximum?transaction?size,?more?than?the?size?of?the?transaction?will?be?cut?into?multiple?transactions?delivery??
canal.instance.transaction.size?=??1024??
#?mysql?fallback?connected?to?new?master?should?fallback?times??
canal.instance.fallbackIntervalInSeconds?=?60??
??
#?network?config??
canal.instance.network.receiveBufferSize?=?16384??
canal.instance.network.sendBufferSize?=?16384??
canal.instance.network.soTimeout?=?30??
??
#?binlog?filter?config??
canal.instance.filter.druid.ddl?=?true??
canal.instance.filter.query.dcl?=?false??
canal.instance.filter.query.dml?=?false??
canal.instance.filter.query.ddl?=?false??
canal.instance.filter.table.error?=?false??
canal.instance.filter.rows?=?false??
canal.instance.filter.transaction.entry?=?false??
canal.instance.filter.dml.insert?=?false??
canal.instance.filter.dml.update?=?false??
canal.instance.filter.dml.delete?=?false??
??
#?binlog?format/image?check??
canal.instance.binlog.format?=?ROW,STATEMENT,MIXED???
canal.instance.binlog.image?=?FULL,MINIMAL,NOBLOB??
??
#?binlog?ddl?isolation??
canal.instance.get.ddl.isolation?=?false??
??
#?parallel?parser?config??
canal.instance.parser.parallel?=?true??
#?concurrent?thread?number,?default?60%?available?processors,?suggest?not?to?exceed?Runtime.getRuntime().availableProcessors()??
canal.instance.parser.parallelThreadSize?=?16??
#?disruptor?ringbuffer?size,?must?be?power?of?2??
canal.instance.parser.parallelBufferSize?=?256??
??
#?table?meta?tsdb?info??
canal.instance.tsdb.enable?=?true??
canal.instance.tsdb.dir?=?${canal.file.data.dir:../conf}/${canal.instance.destination:}??
canal.instance.tsdb.url?=?jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;??
canal.instance.tsdb.dbUsername?=?canal??
canal.instance.tsdb.dbPassword?=?canal??
#?dump?snapshot?interval,?default?24?hour??
canal.instance.tsdb.snapshot.interval?=?24??
#?purge?snapshot?expire?,?default?360?hour(15?days)??
canal.instance.tsdb.snapshot.expire?=?360??
??
################################################??
########?????destinations????############??
################################################??
canal.destinations?=?canal-exchange??
#?conf?root?dir??
canal.conf.dir?=?../conf??
#?auto?scan?instance?dir?add/remove?and?start/stop?instance??
canal.auto.scan?=?true??
canal.auto.scan.interval?=?5??
#?set?this?value?to?'true'?means?that?when?binlog?pos?not?found,?skip?to?latest.??
#?WARN:?pls?keep?'false'?in?production?env,?or?if?you?know?what?you?want.??
canal.auto.reset.latest.pos.mode?=?false??
??
canal.instance.tsdb.spring.xml?=?classpath:spring/tsdb/h2-tsdb.xml??
#canal.instance.tsdb.spring.xml?=?classpath:spring/tsdb/mysql-tsdb.xml??
??
canal.instance.global.mode?=?spring??
canal.instance.global.lazy?=?false??
canal.instance.global.manager.address?=?${canal.admin.manager}??
#canal.instance.global.spring.xml?=?classpath:spring/memory-instance.xml??
canal.instance.global.spring.xml?=?classpath:spring/file-instance.xml??
#canal.instance.global.spring.xml?=?classpath:spring/default-instance.xml??
??
#################################################??
########?????????MQ?Properties??????############??
#################################################??
#?aliyun?ak/sk?,?support?rds/mq??
canal.aliyun.accessKey?=??
canal.aliyun.secretKey?=??
canal.aliyun.uid=??
??
canal.mq.flatMessage?=?true??
canal.mq.canalBatchSize?=?50??
canal.mq.canalGetTimeout?=?100??
#?Set?this?value?to?"cloud",?if?you?want?open?message?trace?feature?in?aliyun.??
canal.mq.accessChannel?=?local??
??
canal.mq.database.hash?=?true??
canal.mq.send.thread.size?=?30??
canal.mq.build.thread.size?=?8??
??
#################################################??
########?????????RabbitMQ???????############??
#################################################??
rabbitmq.host?=?myrabbit??
rabbitmq.virtual.host?=?/??
rabbitmq.exchange?=?canal-exchange??
rabbitmq.username?=?admin??
rabbitmq.password?=?RabbitMQ密碼??
rabbitmq.deliveryMode?=??

此時(shí)canal.serverMode = tcp,即TCP直連,我們先開啟這個(gè)服務(wù),然后手寫Java客戶端代碼去連接它,等下再改為RabbitMQ。

通過注釋可以看到,canal支持的服務(wù)模式有:tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ,即主流的消息隊(duì)列都支持。

instance.properties

################################################??
#?mysql?serverId?,?v1.0.26+?will?autoGen??
#canal.instance.mysql.slaveId=123??
??
#?enable?gtid?use?true/false??
canal.instance.gtidon=false??
??
#?position?info??
canal.instance.master.address=mymysql:3306??
canal.instance.master.journal.name=??
canal.instance.master.position=??
canal.instance.master.timestamp=??
canal.instance.master.gtid=??
??
#?rds?oss?binlog??
canal.instance.rds.accesskey=??
canal.instance.rds.secretkey=??
canal.instance.rds.instanceId=??
??
#?table?meta?tsdb?info??
canal.instance.tsdb.enable=true??
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb??
#canal.instance.tsdb.dbUsername=canal??
#canal.instance.tsdb.dbPassword=canal??
??
#canal.instance.standby.address?=??
#canal.instance.standby.journal.name?=??
#canal.instance.standby.position?=??
#canal.instance.standby.timestamp?=??
#canal.instance.standby.gtid=??
??
#?username/password??
canal.instance.dbUsername=canal??
canal.instance.dbPassword=canal??
canal.instance.connectionCharset?=?UTF-8??
#?enable?druid?Decrypt?database?password??
canal.instance.enableDruid=false??
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==??
??
#?table?regex??
canal.instance.filter.regex=.*\..*??
#?table?black?regex??
canal.instance.filter.black.regex=mysql\.slave_.*??
#?table?field?filter(format:?schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)??
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch??
#?table?field?black?filter(format:?schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)??
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch??
??
#?mq?config??
canal.mq.topic=canal-routing-key??
#?dynamic?topic?route?by?schema?or?table?regex??
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\..*,.*\..*??
canal.mq.partition=0??

把這兩個(gè)配置文件映射好,再次提醒,注意實(shí)例的路徑名,默認(rèn)是:/example/instance.properties

修改canal配置文件

我們需要修改這個(gè)實(shí)例配置文件,去連接MySQL,確保以下的配置正確:

canal.instance.master.address=mymysql:3306??
canal.instance.dbUsername=canal??
canal.instance.dbPassword=canal??

mymysql是同為docker容器的MySQL環(huán)境,端口3306是指內(nèi)部端口。

這里多說明一下,docker端口配置時(shí)假設(shè)為:13306:3306,那么容器對(duì)外的端口就是13306,內(nèi)部是3306,在本示例中,MySQL和Canal都是容器環(huán)境,所以Canal連接MySQL需要滿足以下條件:

  • 處于同一網(wǎng)段(docker-compose.yml中的mynetwork)

  • 訪問內(nèi)部端口(即3306,而非13306)

dbUsername和dbPassword為MySQL賬號(hào)密碼,為了開發(fā)方便可以使用root/root,但是我仍建議自行創(chuàng)建用戶并分配訪問權(quán)限:

#?進(jìn)入docker中的mysql容器??
docker?exec?-it?mymysql?bash??
#?進(jìn)入mysql指令模式??
mysql?-uroot?-proot??
??
#?編寫MySQL語句并執(zhí)行??
>?...??
--?選擇mysql??
use?mysql;??
--?創(chuàng)建canal用戶,賬密:canal/canal??
create?user?'canal'@'%'?identified?by?'canal';??
--?分配權(quán)限,以及允許所有主機(jī)登錄該用戶??
grant?SELECT,?INSERT,?UPDATE,?DELETE,?REPLICATION?SLAVE,?REPLICATION?CLIENT?on?*.*?to?'canal'@'%';??
??
--?刷新一下使其生效??
flush?privileges;??
??
--?附帶一個(gè)刪除用戶指令??
drop?user?'canal'@'%';??

用navicat或者shell去登錄canal這個(gè)用戶,可以訪問即創(chuàng)建成功

整合SpringBoot Canal實(shí)現(xiàn)客戶端

Maven依賴:

<canal.version>1.1.5</canal.version>??
??
<!--canal-->??
<dependency>??
??<groupId>com.alibaba.otter</groupId>??
??<artifactId>canal.client</artifactId>??
??<version>${canal.version}</version>??
</dependency>??
<dependency>??
??<groupId>com.alibaba.otter</groupId>??
??<artifactId>canal.protocol</artifactId>??
??<version>${canal.version}</version>??
</dependency>???

新增組件并啟動(dòng):

import?com.alibaba.otter.canal.client.CanalConnector;??
import?com.alibaba.otter.canal.client.CanalConnectors;??
import?com.alibaba.otter.canal.protocol.CanalEntry;??
import?com.alibaba.otter.canal.protocol.Message;??
import?org.springframework.boot.CommandLineRunner;??
import?org.springframework.stereotype.Component;??
??
import?java.net.InetSocketAddress;??
import?java.util.List;??
??
@Component??
public?class?CanalClient?{??
??
????private?final?static?int?BATCH_SIZE?=?1000;??
??
????public?void?run()?{??
????????//?創(chuàng)建鏈接??
????????CanalConnector?connector?=?CanalConnectors.newSingleConnector(new?InetSocketAddress("localhost",?11111),?"canal-exchange",?"canal",?"canal");??
????????try?{??
????????????//打開連接??
????????????connector.connect();??
????????????//訂閱數(shù)據(jù)庫表,全部表??
????????????connector.subscribe(".*\..*");??
????????????//回滾到未進(jìn)行ack的地方,下次fetch的時(shí)候,可以從最后一個(gè)沒有ack的地方開始拿??
????????????connector.rollback();??
????????????while?(true)?{??
????????????????//?獲取指定數(shù)量的數(shù)據(jù)??
????????????????Message?message?=?connector.getWithoutAck(BATCH_SIZE);??
????????????????//獲取批量ID??
????????????????long?batchId?=?message.getId();??
????????????????//獲取批量的數(shù)量??
????????????????int?size?=?message.getEntries().size();??
????????????????//如果沒有數(shù)據(jù)??
????????????????if?(batchId?==?-1?||?size?==?0)?{??
????????????????????try?{??
????????????????????????//線程休眠2秒??
????????????????????????Thread.sleep(2000);??
????????????????????}?catch?(InterruptedException?e)?{??
????????????????????????e.printStackTrace();??
????????????????????}??
????????????????}?else?{??
????????????????????//如果有數(shù)據(jù),處理數(shù)據(jù)??
????????????????????printEntry(message.getEntries());??
????????????????}??
????????????????//進(jìn)行?batch?id?的確認(rèn)。確認(rèn)之后,小于等于此?batchId?的?Message?都會(huì)被確認(rèn)。??
????????????????connector.ack(batchId);??
????????????}??
????????}?catch?(Exception?e)?{??
????????????e.printStackTrace();??
????????}?finally?{??
????????????connector.disconnect();??
????????}??
????}??
??
????/**??
?????*?打印canal?server解析binlog獲得的實(shí)體類信息??
?????*/??
????private?static?void?printEntry(List<CanalEntry.Entry>?entrys)?{??
????????for?(CanalEntry.Entry?entry?:?entrys)?{??
????????????if?(entry.getEntryType()?==?CanalEntry.EntryType.TRANSACTIONBEGIN?||?entry.getEntryType()?==?CanalEntry.EntryType.TRANSACTIONEND)?{??
????????????????//開啟/關(guān)閉事務(wù)的實(shí)體類型,跳過??
????????????????continue;??
????????????}??
????????????//RowChange對(duì)象,包含了一行數(shù)據(jù)變化的所有特征??
????????????//比如isDdl?是否是ddl變更操作?sql?具體的ddl?sql?beforeColumns?afterColumns?變更前后的數(shù)據(jù)字段等等??
????????????CanalEntry.RowChange?rowChage;??
????????????try?{??
????????????????rowChage?=?CanalEntry.RowChange.parseFrom(entry.getStoreValue());??
????????????}?catch?(Exception?e)?{??
????????????????throw?new?RuntimeException("ERROR?#?parser?of?eromanga-event?has?an?error?,?data:"?+?entry.toString(),?e);??
????????????}??
????????????//獲取操作類型:insert/update/delete類型??
????????????CanalEntry.EventType?eventType?=?rowChage.getEventType();??
????????????//打印Header信息??
????????????System.out.println(String.format("================》;?binlog[%s:%s]?,?name[%s,%s]?,?eventType?:?%s",??
????????????????????entry.getHeader().getLogfileName(),?entry.getHeader().getLogfileOffset(),??
????????????????????entry.getHeader().getSchemaName(),?entry.getHeader().getTableName(),??
????????????????????eventType));??
????????????//判斷是否是DDL語句??
????????????if?(rowChage.getIsDdl())?{??
????????????????System.out.println("================》;isDdl:?true,sql:"?+?rowChage.getSql());??
????????????}??
????????????//獲取RowChange對(duì)象里的每一行數(shù)據(jù),打印出來??
????????????for?(CanalEntry.RowData?rowData?:?rowChage.getRowDatasList())?{??
????????????????//如果是刪除語句??
????????????????if?(eventType?==?CanalEntry.EventType.DELETE)?{??
????????????????????printColumn(rowData.getBeforeColumnsList());??
????????????????????//如果是新增語句??
????????????????}?else?if?(eventType?==?CanalEntry.EventType.INSERT)?{??
????????????????????printColumn(rowData.getAfterColumnsList());??
????????????????????//如果是更新的語句??
????????????????}?else?{??
????????????????????//變更前的數(shù)據(jù)??
????????????????????System.out.println("------->;?before");??
????????????????????printColumn(rowData.getBeforeColumnsList());??
????????????????????//變更后的數(shù)據(jù)??
????????????????????System.out.println("------->;?after");??
????????????????????printColumn(rowData.getAfterColumnsList());??
????????????????}??
????????????}??
????????}??
????}??
??
????private?static?void?printColumn(List<CanalEntry.Column>?columns)?{??
????????for?(CanalEntry.Column?column?:?columns)?{??
????????????System.out.println(column.getName()?+?"?:?"?+?column.getValue()?+?"????update="?+?column.getUpdated());??
????????}??
????}??
}??

啟動(dòng)類Application:

@SpringBootApplication??
public?class?BaseApplication?implements?CommandLineRunner?{??
????@Autowired??
????private?CanalClient?canalClient;??
??
????@Override??
????public?void?run(String...?args)?throws?Exception?{??
????????canalClient.run();??
????}??
}??

啟動(dòng)程序,此時(shí)新增或修改數(shù)據(jù)庫中的數(shù)據(jù),我們就能從客戶端中監(jiān)聽到

不過我建議監(jiān)聽的信息放到消息隊(duì)列中,在空閑的時(shí)候去處理,所以直接配置Canal整合RabbitMQ更好。

Canal整合RabbitMQ

修改canal.properties中的serverMode:

canal.serverMode?=?rabbitMQ??

修改instance.properties中的topic:

canal.mq.topic=canal-routing-key??

然后找到關(guān)于RabbitMQ的配置:

#################################################??
########?????????RabbitMQ???????############??
#################################################??
#?連接rabbit,寫IP,因?yàn)橥瑐€(gè)網(wǎng)絡(luò)下,所以可以寫容器名??
rabbitmq.host?=?myrabbit??
rabbitmq.virtual.host?=?/??
#?交換器名稱,等等我們要去手動(dòng)創(chuàng)建??
rabbitmq.exchange?=?canal-exchange??
#?賬密??
rabbitmq.username?=?admin??
rabbitmq.password?=?123456??
#?暫不支持指定端口,使用的是默認(rèn)的5762,好在在本示例中適用??

重新啟動(dòng)容器,進(jìn)入RabbitMQ管理頁面創(chuàng)建exchange交換器和隊(duì)列queue:

  • 新建exchange,命名為:canal-exchange

  • 新建queue,命名為:canal-queue

  • 綁定exchange和queue,routing-key設(shè)置為:canal-routing-key,這里對(duì)應(yīng)上面instance.propertiescanal.mq.topic

順帶一提,上面這段可以忽略,因?yàn)樵赟pringBoot的RabbitMQ配置中,會(huì)自動(dòng)創(chuàng)建交換器exchange和隊(duì)列queue,不過手動(dòng)創(chuàng)建的話,可以在忽略SpringBoot的基礎(chǔ)上,直接在RabbitMQ的管理頁面上看到修改記錄的消息。

SpringBoot整合RabbitMQ

依賴:

<amqp.version>2.3.4.RELEASE</amqp.version>??
??
<!--消息隊(duì)列-->??
<dependency>??
??<groupId>org.springframework.boot</groupId>??
??<artifactId>spring-boot-starter-amqp</artifactId>??
??<version>${amqp.version}</version>??
</dependency>??

application.yml?:

spring:??
??rabbitmq:??
????#????host:?myserverhost??
????host:?192.168.0.108??
????port:?5672??
????username:?admin??
????password:?RabbitMQ密碼??
????#?消息確認(rèn)配置項(xiàng)??
????#?確認(rèn)消息已發(fā)送到交換機(jī)(Exchange)??
????publisher-confirm-type:?correlated??
????#?確認(rèn)消息已發(fā)送到隊(duì)列(Queue)??
????publisher-returns:?true??

RabbitMQ配置類:

@Configuration??
public?class?RabbitConfig?{??
????@Bean??
????public?RabbitTemplate?rabbitTemplate(ConnectionFactory?connectionFactory)?{??
????????RabbitTemplate?template?=?new?RabbitTemplate();??
????????template.setConnectionFactory(connectionFactory);??
????????template.setMessageConverter(new?Jackson2JsonMessageConverter());??
??
????????return?template;??
????}??
??
????/**??
?????*?template.setMessageConverter(new?Jackson2JsonMessageConverter());??
?????*?這段和上面這行代碼解決RabbitListener循環(huán)報(bào)錯(cuò)的問題??
?????*/??
????@Bean??
????public?SimpleRabbitListenerContainerFactory?rabbitListenerContainerFactory(ConnectionFactory?connectionFactory)?{??
????????SimpleRabbitListenerContainerFactory?factory?=?new?SimpleRabbitListenerContainerFactory();??
????????factory.setConnectionFactory(connectionFactory);??
????????factory.setMessageConverter(new?Jackson2JsonMessageConverter());??
????????return?factory;??
????}??
}??

Canal消息生產(chǎn)者:

public?static?final?String?CanalQueue?=?"canal-queue";??
public?static?final?String?CanalExchange?=?"canal-exchange";??
public?static?final?String?CanalRouting?=?"canal-routing-key";??

/**??
?*?Canal消息提供者,canal-server生產(chǎn)的消息通過RabbitMQ消息隊(duì)列發(fā)送??
?*/??
@Configuration??
public?class?CanalProvider?{??
????/**??
?????*?隊(duì)列??
?????*/??
????@Bean??
????public?Queue?canalQueue()?{??
????????/**??
?????????*?durable:是否持久化,默認(rèn)false,持久化隊(duì)列:會(huì)被存儲(chǔ)在磁盤上,當(dāng)消息代理重啟時(shí)仍然存在;暫存隊(duì)列:當(dāng)前連接有效??
?????????*?exclusive:默認(rèn)為false,只能被當(dāng)前創(chuàng)建的連接使用,而且當(dāng)連接關(guān)閉后隊(duì)列即被刪除。此參考優(yōu)先級(jí)高于durable??
?????????*?autoDelete:是否自動(dòng)刪除,當(dāng)沒有生產(chǎn)者或者消費(fèi)者使用此隊(duì)列,該隊(duì)列會(huì)自動(dòng)刪除??
?????????*/??
????????return?new?Queue(RabbitConstant.CanalQueue,?true);??
????}??
??
????/**??
?????*?交換機(jī),這里使用直連交換機(jī)??
?????*/??
????@Bean??
????DirectExchange?canalExchange()?{??
????????return?new?DirectExchange(RabbitConstant.CanalExchange,?true,?false);??
????}??
??
????/**??
?????*?綁定交換機(jī)和隊(duì)列,并設(shè)置匹配鍵??
?????*/??
????@Bean??
????Binding?bindingCanal()?{??
????????return?BindingBuilder.bind(canalQueue()).to(canalExchange()).with(RabbitConstant.CanalRouting);??
????}??
}??

Canal消息消費(fèi)者:

/**??
?*?Canal消息消費(fèi)者??
?*/??
@Component??
@RabbitListener(queues?=?RabbitConstant.CanalQueue)??
public?class?CanalComsumer?{??
????private?final?SysBackupService?sysBackupService;??
??
????public?CanalComsumer(SysBackupService?sysBackupService)?{??
????????this.sysBackupService?=?sysBackupService;??
????}??
??
????@RabbitHandler??
????public?void?process(Map<String,?Object>?msg)?{??
????????System.out.println("收到canal消息:"?+?msg);??
????????boolean?isDdl?=?(boolean)?msg.get("isDdl");??
??
????????//?不處理DDL事件??
????????if?(isDdl)?{??
????????????return;??
????????}??
??
????????//?TiCDC的id,應(yīng)該具有唯一性,先保存再說??
????????int?tid?=?(int)?msg.get("id");??
????????//?TiCDC生成該消息的時(shí)間戳,13位毫秒級(jí)??
????????long?ts?=?(long)?msg.get("ts");??
????????//?數(shù)據(jù)庫??
????????String?database?=?(String)?msg.get("database");??
????????//?表??
????????String?table?=?(String)?msg.get("table");??
????????//?類型:INSERT/UPDATE/DELETE??
????????String?type?=?(String)?msg.get("type");??
????????//?每一列的數(shù)據(jù)值??
????????List<?>?data?=?(List<?>)?msg.get("data");??
????????//?僅當(dāng)type為UPDATE時(shí)才有值,記錄每一列的名字和UPDATE之前的數(shù)據(jù)值??
????????List<?>?old?=?(List<?>)?msg.get("old");??
??
????????//?跳過sys_backup,防止無限循環(huán)??
????????if?("sys_backup".equalsIgnoreCase(table))?{??
????????????return;??
????????}??
??
????????//?只處理指定類型??
????????if?(!"INSERT".equalsIgnoreCase(type)??
????????????????&&?!"UPDATE".equalsIgnoreCase(type)??
????????????????&&?!"DELETE".equalsIgnoreCase(type))?{??
????????????return;??
????????}??
????}??
}??

測(cè)試一下,修改MySQL中的一條消息,Canal就會(huì)發(fā)送信息到RabbitMQ,我們就能從監(jiān)聽的RabbitMQ隊(duì)列中得到該條消息。

?

到了這里,關(guān)于SpringBoot整合Canal+RabbitMQ監(jiān)聽數(shù)據(jù)變更的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • SpringBoot+Canal+RabbitMQ實(shí)戰(zhàn)

    SpringBoot+Canal+RabbitMQ實(shí)戰(zhàn)

    1. Canal簡(jiǎn)介 https://github.com/alibaba/canal 1.1 Canal工作原理 MySQL主備復(fù)制原理 MySQL master 將數(shù)據(jù)變更寫入二進(jìn)制日志( binary log, 其中記錄叫做二進(jìn)制日志事件binary log events,可以通過 show binlog events 進(jìn)行查看) MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log) MySQL slave 重放

    2024年02月03日
    瀏覽(13)
  • SpringBoot快速整合canal1.1.5(TCP模式)

    SpringBoot快速整合canal1.1.5(TCP模式) 安裝并配置MySQL主從? 1:Docker安裝MySQL8.0.28 2:創(chuàng)建目錄: 3:編寫my.cnf文件: 內(nèi)容如下:(注意:把binlog-do-db的值修改成你需要canal監(jiān)聽的數(shù)據(jù)庫名稱,如果需要監(jiān)聽多個(gè)數(shù)據(jù)庫,一定要在下面寫多個(gè)binlog-do-db,而不是用“,”分隔) 4:?jiǎn)?/p>

    2024年02月06日
    瀏覽(19)
  • 接過火炬,升級(jí)canal-client-springboot-starter,一個(gè)支持rabbitmq的CanalClient(1)

    接過火炬,升級(jí)canal-client-springboot-starter,一個(gè)支持rabbitmq的CanalClient(1)

    connect = factory.newConnection(); channel = connect.createChannel(); } catch (IOException | TimeoutException e) { throw new CanalClientException(“Start RabbitMQ producer error”, e); } } 不過也是,截止2021年10月26日,canal官方推薦的正式版本仍然是1.1.4,對(duì)客戶端支持rabbitmq還沒有做足夠的支持。 二、最終方案 參考

    2024年04月13日
    瀏覽(18)
  • springboot-rabbitmq 實(shí)現(xiàn)動(dòng)態(tài)配置監(jiān)聽容器

    springboot-rabbitmq 實(shí)現(xiàn)動(dòng)態(tài)配置監(jiān)聽容器

    1.1.1從factories我們可以看到mq的啟動(dòng)配置類 1.1.2然后我們找到 RabbitAutoConfiguration ,發(fā)現(xiàn)它引入了 RabbitAnnotationDrivenConfiguration 這個(gè)配置類 1.1.3進(jìn)入 RabbitAnnotationDrivenConfiguration 滑到最低部看到這里引入了 @EnableRabbit 這個(gè)注解,找個(gè)注解里面又引出 RabbitBootstrapConfiguration 這個(gè)配置類

    2023年04月09日
    瀏覽(25)
  • RabbitMQ: SpringBoot 整合 RabbitMQ

    重點(diǎn)是這個(gè)依賴 通過 ? ? ? ? ? ? ?和上一個(gè)一樣 ?

    2024年02月09日
    瀏覽(29)
  • 【RabbitMQ】RabbitMQ整合SpringBoot案例

    【RabbitMQ】RabbitMQ整合SpringBoot案例

    【RabbitMQ】消息隊(duì)列-RabbitMQ篇章 RabbitMQ實(shí)現(xiàn)流程 2.1 實(shí)現(xiàn)架構(gòu)總覽 實(shí)現(xiàn)步驟: 1:創(chuàng)建生產(chǎn)者工程:sspringboot-rabbitmq-fanout-producer 2:創(chuàng)建消費(fèi)者工程:springboot-rabbitmq-fanout-consumer 3:引入spring-boot-rabbitmq的依賴 4:進(jìn)行消息的分發(fā)和測(cè)試 5:查看和觀察web控制臺(tái)的狀況 2.2 具體實(shí)現(xiàn)

    2024年02月12日
    瀏覽(22)
  • SpringBoot項(xiàng)目整合RabbitMQ

    消息隊(duì)列(Message Queue)是分布式系統(tǒng)中常用的組件,它允許不同的應(yīng)用程序之間通過發(fā)送和接收消息進(jìn)行通信。Spring Boot提供了簡(jiǎn)單且強(qiáng)大的方式來整合消息隊(duì)列,其中包括RabbitMQ、ActiveMQ、Kafka等多種消息隊(duì)列實(shí)現(xiàn)。 本文將以RabbitMQ為例,詳細(xì)介紹如何使用Spring Boot來整合消

    2024年02月09日
    瀏覽(30)
  • SpringBoot 整合RabbitMQ

    SpringBoot 整合RabbitMQ

    2007 年發(fā)布,是一個(gè)在 AMQP(高級(jí)消息隊(duì)列協(xié)議)基礎(chǔ)上完成的,可復(fù)用的企業(yè)消息系統(tǒng),是當(dāng)前最主流的消息中間件之一。 RabbitMQ是一個(gè)由erlang開發(fā)的AMQP(Advanced Message Queue 高級(jí)消息隊(duì)列協(xié)議 )的開源實(shí)現(xiàn),由于erlang 語言的高并發(fā)特性,性能較好,本質(zhì)是個(gè)隊(duì)列,F(xiàn)IFO 先入先出

    2024年02月15日
    瀏覽(25)
  • SpringBoot 整合 RabbitMQ

    SpringBoot 整合 RabbitMQ

    由于有的 Idea 不選擇插線無法創(chuàng)建 Spring Boot 項(xiàng)目,這里我們先隨便選一個(gè)插件,大家也可以根據(jù)需求選擇~~ 把版本改為 2.7.14 引入這兩個(gè)依賴: 配置 application.yml文件 Config 類 : RabbitMQConfig 測(cè)試類: RabbitMQConfigTests 結(jié)果 當(dāng)我們啟動(dòng) 測(cè)試類 之后就可以發(fā)現(xiàn)我們的 rabbitmq 界面里的

    2024年02月10日
    瀏覽(16)
  • SpringBoot整合RabbitMQ

    SpringBoot整合RabbitMQ

    ??作者簡(jiǎn)介:練習(xí)時(shí)長(zhǎng)兩年半的Java up主 ??個(gè)人主頁:程序員老茶 ?? ps:點(diǎn)贊??是免費(fèi)的,卻可以讓寫博客的作者開心好久好久?? ??系列專欄:Java全棧,計(jì)算機(jī)系列(火速更新中) ?? 格言:種一棵樹最好的時(shí)間是十年前,其次是現(xiàn)在 ??動(dòng)動(dòng)小手,點(diǎn)個(gè)關(guān)注不迷路,感

    2024年02月21日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包