阿丹:
? ? ? ? 查閱了很多資料了解到,使用了spring-boot中整合的kafka的使用是被封裝好的。也就是說這些使用其實(shí)和在linux中的使用kafka代碼的使用其實(shí)沒有太大關(guān)系。但是邏輯是一樣的。這點(diǎn)要注意!
使用spring-boot整合kafka
1、導(dǎo)入依賴
核心配置為:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
如果在下面規(guī)定了spring-boot的版本那么就不需要再使用版本號,如果沒有的話就需要規(guī)定版本號。?
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
<!--配置文件報(bào)錯問題-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>provided</scope>
</dependency>
</dependencies>
2、寫入配置
#服務(wù)端口號
server:
port: 8025
spring:
main:
allow-circular-references: true
application:
name: producer
kafka:
bootstrap-servers: kafka的ip地址:9092
producer:
# 發(fā)生錯誤后,消息重發(fā)的次數(shù)。
retries: 1
#當(dāng)有多個消息需要被發(fā)送到同一個分區(qū)時,生產(chǎn)者會把它們放在同一個批次里。該參數(shù)指定了一個批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算。
batch-size: 16384
# 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小。
buffer-memory: 33554432
# 鍵的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生產(chǎn)者在成功寫入消息之前不會等待任何來自服務(wù)器的響應(yīng)。
# acks=1 : 只要集群的首領(lǐng)節(jié)點(diǎn)收到消息,生產(chǎn)者就會收到一個來自服務(wù)器成功響應(yīng)。
# acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)全部收到消息時,生產(chǎn)者才會收到一個來自服務(wù)器的成功響應(yīng)。
acks: 1
consumer:
# 該屬性指定了消費(fèi)者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理:
# latest(默認(rèn)值)在偏移量無效的情況下,消費(fèi)者將從最新的記錄開始讀取數(shù)據(jù)(在消費(fèi)者啟動之后生成的記錄)
# earliest :在偏移量無效的情況下,消費(fèi)者將從起始位置讀取分區(qū)的記錄
auto-offset-reset: earliest
# 是否自動提交偏移量,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為false,然后手動提交偏移量
enable-auto-commit: false
# 鍵的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在偵聽器容器中運(yùn)行的線程數(shù)。
concurrency: 5
#listner負(fù)責(zé)ack,每調(diào)用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
3、生產(chǎn)者
將發(fā)送封裝為一個工具類
public void send(Object obj){
String obj2String = JSON.toJSONString(obj);
log.info("準(zhǔn)備發(fā)送消息為:{}",obj2String);
//發(fā)送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj2String);
//回調(diào)
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
//發(fā)送失敗的處理
log.info(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息失敗:" + ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
//成功的處理
log.info(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息成功:" + result.toString());
}
});
4、消費(fèi)者
?如果需要使用多線程來監(jiān)聽的話使用這個策略。文章來源:http://www.zghlxwxcb.cn/news/detail-672526.html
@KafkaListener(topics = "Hello-Kafka", groupId = "group1")
public void onMessage1(ConsumerRecord<?, ?> record) {
// 消息處理邏輯
}
@KafkaListener(topics = "Hello-Kafka", groupId = "group2")
public void onMessage2(ConsumerRecord<?, ?> record) {
// 消息處理邏輯
}
以上就可以簡單實(shí)現(xiàn)一個kafka的監(jiān)聽消費(fèi)。文章來源地址http://www.zghlxwxcb.cn/news/detail-672526.html
到了這里,關(guān)于kafka--技術(shù)文檔--spring-boot集成基礎(chǔ)簡單使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!