首先回顧下 offset 的定義
offset :在 Apache Kafka 中,offset 是一個用來唯一標識消息在分區(qū)中位置的數(shù)字。每個分區(qū)中的消息都會被分配一個唯一的 offset 值,用來表示該消息在該分區(qū)中的位置。消費者可以通過記錄自己消費的最后一個 offset 值來跟蹤自己消費消息的進度,確保不會漏掉消息或者重復消費消息。通過管理 offset,Kafka 實現(xiàn)了高效的消息傳遞和消費處理。
在 kafka0.9版本之前,consumer 默認將 offset 保存在 Zookeeper 中,但在0.9版本之后,offset被保存在 Kafka 一個內(nèi)置的 topic 中,該 topic 為 __consumer_offsets
__consumer_offsets :采用 KV 鍵值對的方式存儲,key:group.id + topic +?分區(qū)號,value :當前 offset 的值
__consumer_offsets 既然作為一個 topic 存在與 Kafka 中,那么它也可以通過消費者消費數(shù)據(jù)的方式進行消費。
自動提交 offset
在 Kafka 所提供的API中,enable.auto.commit 參數(shù)的值表示是否開啟自動提交 offset,默認為 true,消費者會自動周期性地向服務(wù)器提交偏移量,而 auto.commit.interval.ms 則表示自動提交?offset?的時間間隔,默認是5s。
核心代碼的實現(xiàn)
public class CustomConsumerAutoOffset {
public static void main(String[] args) {
// 0 配置
Properties properties = new Properties();
// 連接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消費者組id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 自動提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 提交時間間隔
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
// 1 創(chuàng)建一個消費者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 訂閱主題first
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 3 消費數(shù)據(jù)
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
手動提交 offset
相比起自動提交,手動提交可以讓開發(fā)者更加容易的把握提交時機,同樣手動提交也分為同步提交(commitSync)和異步提交(commitAsync)
commitSync(同步提交):必須等待offset提交完畢,再去消費下一批數(shù)據(jù)。
commitAsync(異步提交) :發(fā)送完提交offset請求后,就開始消費下一批數(shù)據(jù)了。
首先將?enable.auto.commit 參數(shù)的值改為 false
// 手動提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
然后在消費數(shù)據(jù)的代碼中,加入手動提交 offset?
// 3 消費數(shù)據(jù)
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
// 手動提交offset(同步提交)
// kafkaConsumer.commitSync();
// 異步提交(主要)
kafkaConsumer.commitAsync();
}
指定 offset 消費
當消費者組第一次消費或者服務(wù)器上不再存在當前偏移量時,可以通過設(shè)置 auto.offset.reset 參數(shù)來指定偏移量的重置策略。
- 如果設(shè)置為?earliest?,則會將偏移量重置為最早的可用偏移量,相當于從最早的消息開始消費(即 --from-beginning)。
- 如果設(shè)置為 latest(默認值),則會將偏移量重置為最新的偏移量,即從最新的消息開始消費。
- 如果設(shè)置為 none,當未找到消費者組的先前偏移量時,會向消費者拋出異常。
- 也可以通過任意指定 offset 位移來開始消費。
主要介紹下通過任意指定 offset 位移來開始消費。在消費者代碼的基礎(chǔ)上,指定所要消費的位置,以及指定 offset
// 指定位置進行消費
Set<TopicPartition> assignment = kafkaConsumer.assignment();
// 保證分區(qū)分配方案已經(jīng)指定完畢
while (assignment.size() == 0){
kafkaConsumer.poll(Duration.ofSeconds(1));
assignment = kafkaConsumer.assignment();
}
// 指定offset
for (TopicPartition topicPartition : assignment) {
kafkaConsumer.seek(topicPartition, 600);
}
指定時間消費
除了上面說的指定 offset 進行消費,也可以指定時間進行消費,比如指定消費前一天以后的數(shù)據(jù)
核心思路是,將想要指定的時間轉(zhuǎn)換為對應(yīng)的 offset 值,會用到 Kafka 所提供的 API:offsetsForTimes (這里的邏輯比較饒,這里只做介紹)文章來源:http://www.zghlxwxcb.cn/news/detail-846998.html
核心代碼如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-846998.html
// 指定位置進行消費
Set<TopicPartition> assignment = kafkaConsumer.assignment();
// 保證分區(qū)分配方案已經(jīng)指定完畢
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
assignment = kafkaConsumer.assignment();
}
// 希望將時間轉(zhuǎn)換為對應(yīng)的offset
HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();
// 封裝對應(yīng)集合
for (TopicPartition topicPartition : assignment) {
topicPartitionLongHashMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);
// 指定offset
for (TopicPartition topicPartition : assignment) {
OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
}
到了這里,關(guān)于Kafka之offset位移的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!