前言
在上一篇文章中,寫到了如何在springboot中生產(chǎn)者如何使用kafka的事務(wù),詳情鏈接:Springboot使用kafka事務(wù)-生產(chǎn)者方
那么,這一篇就接著上篇所寫的內(nèi)容,講解一下再springboot中消費(fèi)者如何使用kafka的事務(wù)。
實(shí)現(xiàn)
在springboot中kafka的消費(fèi)者配置也和生產(chǎn)者一樣,有兩種配置的方式:
- 第一種是使用springboot提供的自定裝配機(jī)制
- 第二種是自定義配置
自動(dòng)裝配機(jī)制
在springboot的配置文件中加入以下代碼即可實(shí)現(xiàn)
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: test_group #默認(rèn)組id 后面會(huì)配置多個(gè)消費(fèi)者組
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
isolation-level: read_committed
enable-auto-commit: false #關(guān)閉自動(dòng)提交
auto-commit-interval: 100
max-poll-records: 20 #批量消費(fèi) 一次接收的最大數(shù)量
這樣就實(shí)現(xiàn)了事務(wù)的自動(dòng)狀態(tài),特別注意的是配置文件中的isolation-level屬性,這個(gè)屬性一定要設(shè)置讀已提交的事務(wù)級(jí)別,這樣才能配合生產(chǎn)者實(shí)現(xiàn)事務(wù)的特性。
使用
這種配置方式的使用就很簡(jiǎn)單了,
第一:新建一個(gè)管理類,類名上用Component注解標(biāo)識(shí)為需要springboot管理
@Component
public class kafkaConfigs {
}
第二:使用springboot提供的KafkaListener注解,即可使用
@KafkaListener
public void testListener(String data) {
log.info("接受到的數(shù)據(jù)為: {} ",data);
}
全部代碼如下:
@Component
public class kafkaConfigs {
@KafkaListener
public void testListener(String data) {
log.info("接受到的數(shù)據(jù)為: {} ",data);
}
}
缺點(diǎn)
自動(dòng)裝配機(jī)制是很方便的,但是在一些場(chǎng)景下,我們需要連接多個(gè)kafka的地址來(lái)實(shí)現(xiàn)不同的業(yè)務(wù),而且有的場(chǎng)景之下我們并不需要kafka的事務(wù)管理機(jī)制,所以這就需要用到我們的第二種方法,自定義配置了。
自定義配置
這次,我們使用springboot為我們提供的KafkaListener注解來(lái)實(shí)現(xiàn)這個(gè)功能。
在yml配置文件中加入第二個(gè)kakfa的連接地址,并且將事務(wù)紫隔離級(jí)別去掉即可。
spring:
kafka:
bootstrap-servers: localhost:9092
bootstrap-servers-2: localhost2:9092
consumer:
group-id: test_group #默認(rèn)組id 后面會(huì)配置多個(gè)消費(fèi)者組
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: false #關(guān)閉自動(dòng)提交
auto-commit-interval: 100
max-poll-records: 20 #批量消費(fèi) 一次接收的最大數(shù)量
注意bootstrap-servers-2這個(gè)key,是我們自定義的key,它在kafka的自動(dòng)配置包里面是沒有的。
使用
自定義配置的使用和第一種使用方式大同小異,具體為:
第一:新建一個(gè)管理類,類名上用Component注解標(biāo)識(shí)為需要springboot管理
@Component
public class kafkaConfigs {
}
第二:使用springboot提供的KafkaListener注解,并且在這里標(biāo)識(shí)需要使用到的kafka連接地址以及事務(wù)隔離級(jí)別
@KafkaListener(topics = "my-topics2" , groupId = "my-group2",properties = {"bootstrap.servers=${spring.kafka.bootstrap-servers-2}","isolation.level=read_committed"})
public void testListener1(String data) {
log.info("接受到的數(shù)據(jù)為: {} ",data);
}
全代碼如下:
@Component
public class kafkaConfigs {
@KafkaListener(topics = "my-topics2" , groupId = "my-group2",properties = {"bootstrap.servers=${spring.kafka.bootstrap-servers-2}","isolation.level=read_committed"})
public void testListener1(String data) {
log.info("接受到的數(shù)據(jù)為: {} ",data);
}
}
可以看到,我們使用了properties屬性指定了需要連接的kafka地址,并且指定了事務(wù)的隔離級(jí)別,這樣就實(shí)現(xiàn)了一個(gè)具有事務(wù)功能的消費(fèi)者,并且對(duì)其他方法不產(chǎn)生任何影響。
總結(jié)
以上,我們使用兩種方式配置springboot中kafka消費(fèi)者如何使用事務(wù),讀者可以結(jié)合上篇文章結(jié)合食用,效果更佳!文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-720950.html
上篇鏈接:Springboot使用kafka事務(wù)-生產(chǎn)者方文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-720950.html
到了這里,關(guān)于SpringBoot使用kafka事務(wù)-消費(fèi)者方的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!