一、消費者(手動提交 offset)的概述
1.1、手動提交offset的兩種方式
- commitSync(同步提交):必須等待offset提交完畢,再去消費下一批數(shù)據(jù)。
- commitAsync(異步提交) :發(fā)送完提交offset請求后,就開始消費下一批數(shù)據(jù)了。
1.2、手動提交offset兩種方式的區(qū)別
- 相同點:都會將本次提交的一批數(shù)據(jù)最高的偏移量提交。
- 不同點是:同步提交阻塞當前線程,一直到提交成功,并且會自動失敗重試(由不可控因素導(dǎo)致,也會出現(xiàn)提交失敗);而異步提交則沒有失敗重試機制,故有可能提交失敗。
1.3、手動提交offset的圖解
二、消費者(手動提交 offset)的代碼示例
2.1、手動提交 offset(采用同步提交的方式)代碼
-
同步提交代碼
由于同步提交 offset 有失敗重試機制,故更加可靠,但是由于一直等待提交結(jié)果,提交的效率比較低。// 是否自動提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 手動提交offset(同步提交) kafkaConsumer.commitSync();
-
同步提交完整代碼
package com.xz.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties; public class CustomConsumerByHandSync { public static void main(String[] args) { // 配置 Properties properties = new Properties(); // 連接 bootstrap.servers properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092"); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 配置消費者組id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3"); // 手動提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 1 創(chuàng)建一個消費者 "", "hello" KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 2 訂閱主題 sevenTopic ArrayList<String> topics = new ArrayList<>(); topics.add("sevenTopic"); kafkaConsumer.subscribe(topics); // 3 消費數(shù)據(jù) while (true){ ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } // 手動提交offset(同步提交) kafkaConsumer.commitSync(); } } }
2.1、手動提交 offset(采用異步提交的方式)代碼
-
異步提交代碼
雖然同步提交 offset 更可靠一些,但是由于其會阻塞當前線程,直到提交成功。因此吞吐量會受到很大的影響。因此更多的情況下,會選用異步提交 offset的方式。文章來源:http://www.zghlxwxcb.cn/news/detail-709427.html// 是否自動提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 手動提交offset(異步提交) kafkaConsumer.commitAsync();
-
異步提交完整代碼文章來源地址http://www.zghlxwxcb.cn/news/detail-709427.html
package com.xz.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties; public class CustomConsumerByHandSync { public static void main(String[] args) { // 0 配置 Properties properties = new Properties(); // 連接 bootstrap.servers properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092"); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 配置消費者組id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3"); // 手動提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 1 創(chuàng)建一個消費者 "", "hello" KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 2 訂閱主題 sevenTopic ArrayList<String> topics = new ArrayList<>(); topics.add("sevenTopic"); kafkaConsumer.subscribe(topics); // 3 消費數(shù)據(jù) while (true){ ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } // 手動提交offset(異步提交) kafkaConsumer.commitAsync(); } } }
到了這里,關(guān)于Kafka3.0.0版本——消費者(手動提交offset)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!