1.引入依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.yml配置
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
properties:
security:
protocol: SASL_PLAINTEXT
sasl:
mechanism: SCRAM-SHA-512
jaas:
config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
#producer:
#當retris為0時,produce不會重復。retirs重發(fā),此時repli節(jié)點完全成為leader節(jié)點,不會產(chǎn)生消息丟失。
#如果沒收到ack響應 重試次數(shù) 設置大于0的值,則客戶端會將發(fā)送失敗的記錄重新發(fā)送
#retries: 3
# 每次批量發(fā)送消息的數(shù)量,produce積累到一定數(shù)據(jù),一次發(fā)送 每次提交的批次大小 16K
# batch-size: 16384
#produce積累數(shù)據(jù)一次發(fā)送,緩存大小達到buffer.memory就發(fā)送數(shù)據(jù) 32M
# buffer-memory: 33554432
# 0 是直接響應返回 1是leader完成響應返回 -1(all) 是ISR里 leade follower 全部完成 響應
#procedure要求leader在考慮完成請求之前收到的確認數(shù),用于控制發(fā)送記錄在服務端的持久化,其值可以為如下:
#acks = 0 如果設置為零,則生產(chǎn)者將不會等待來自服務器的任何確認,該記錄將立即添加到套接字緩沖區(qū)并視為已發(fā)送。在這種情況下,無法保證服務器已收到記錄,并且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設置為-1。
#acks = 1 這意味著leader會將記錄寫入其本地日志,但無需等待所有副本服務器的完全確認即可做出回應,在這種情況下,如果leader在確認記錄后立即失敗,但在將數(shù)據(jù)復制到所有的副本服務器之前,則記錄將會丟失。
#acks = all 這意味著leader將等待完整的同步副本集以確認記錄,這保證了只要至少一個同步副本服務器仍然存活,記錄就不會丟失,這是最強有力的保證,這相當于acks = -1的設置。
#可以設置的值為:all, -1, 0, 1
# acks: 1
#ey value 的序列化
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
# 指定默認消費者group id --> 由于在kafka中,同一組中的consumer不會讀取到同一個消息,依靠groud.id設置組名
group-id: defaultName
#關(guān)閉自動提交
enable-auto-commit: false
#重置消費者的offset
# smallest和largest才有效,如果smallest重新0開始讀取,如果是largest從logfile的offset讀取。一般情況下我們都是設置smallest
auto-offset-reset: latest
#key value 的反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 5
listener:
# RECORD 當每一條記錄被消費者監(jiān)聽器(ListenerConsumer)處理后提交
# BATCH 當每一批poll()的數(shù)據(jù)被消費者監(jiān)聽器(ListenerConsumer)處理后提交
# TIME 當每一批poll()的數(shù)據(jù)被消費者監(jiān)聽器(ListenerConsumer)處理后,距離上次提交時間大于TIME時提交
# COUNT 當每一批poll()的數(shù)據(jù)被消費者監(jiān)聽器(ListenerConsumer)處理后,被處理record數(shù)量大于COUNT時提交
# COUNT_TIME TIME | COUMT 有一個條件滿足時提交
# MANUAL 當每一批poll()的數(shù)據(jù)被消費者監(jiān)聽器(ListenerConsumer)處理后,手動調(diào)用 Acknowledgment.acknowledge()后提交
# MANUAL_IMMEDIATE 手動調(diào)用 Acknowledgment.acknowledge() 之后 立即提交
ack-mode: manual_immediate
# 消費監(jiān)聽接口監(jiān)聽的主題不存在時,默認會報錯
missing-topics-fatal: false
3.設置消費
@Component
public class KafkaConsumer {
private final static String TOPIC_NAME="topic_NAME";
@KafkaListener(topics = TOPIC_NAME,groupId = "defaultName")
public void listenGroup(ConsumerRecord<String,String> record, Acknowledgment ack){
System.out.println(record.value());
System.out.println(record);
手動提交offset
ack.acknowledge();
}
}
文章來源地址http://www.zghlxwxcb.cn/news/detail-603771.html
文章來源:http://www.zghlxwxcb.cn/news/detail-603771.html
到了這里,關(guān)于springboot簡單使用kafka消費者監(jiān)聽,以及kafka配置賬號密碼的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!