一、自動(dòng)提交offset的相關(guān)參數(shù)
-
官網(wǎng)文檔
-
參數(shù)解釋
參數(shù) 描述 enable.auto.commi 默認(rèn)值為 true,消費(fèi)者會(huì)自動(dòng)周期性地向服務(wù)器提交偏移量。 auto.commit.interval.ms 如果設(shè)置了 enable.auto.commit 的值為 true, 則該值定義了消費(fèi)者偏移量向 Kafka 提交的頻率,默認(rèn) 5s。 -
圖解分析
二、消費(fèi)者(自動(dòng)提交 offset)代碼示例
-
消費(fèi)者自動(dòng)提交 offset代碼文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-709435.html
// 自動(dòng)提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); // 提交時(shí)間間隔 1秒 properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
-
消費(fèi)者自動(dòng)提交 offset代碼完整代碼文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-709435.html
package com.xz.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties; public class CustomConsumerAutoOffset { public static void main(String[] args) { // 配置 Properties properties = new Properties(); // 連接 bootstrap.servers properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092"); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 配置消費(fèi)者組id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3"); // 自動(dòng)提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); // 提交時(shí)間間隔 1秒 properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); // 1 創(chuàng)建一個(gè)消費(fèi)者 "", "hello" KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 2 訂閱主題 first ArrayList<String> topics = new ArrayList<>(); topics.add("sevenTopic"); kafkaConsumer.subscribe(topics); // 3 消費(fèi)數(shù)據(jù) while (true){ ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } } }
到了這里,關(guān)于Kafka3.0.0版本——消費(fèi)者(自動(dòng)提交 offset)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!