手動提交offset
雖然offset十分遍歷,但是由于其是基于時間提交的,開發(fā)人員難以把握offset提交的實(shí)際。因此Kafka還提供了手動提交offset的API
手動提交offset的方法有兩種:分別commitSync(同步提交)和commitAsync(異步提交)。兩者的相同點(diǎn)是,都會將本次提交的一批數(shù)據(jù)最高的偏移量提交:不同點(diǎn)是,同步提交阻塞當(dāng)前線程,一致到提交成功,并且會自動失敗重試(由不可控因素導(dǎo)致,也會出現(xiàn)提交失?。┒惒教峤粍t沒有重試機(jī)制,故有可能提交失敗。
commitSync(同步提交):必須等待offset提交完畢,再去消費(fèi)下一批數(shù)據(jù)。
commitAsync(異步提交):發(fā)送完提交offset請求后,就開始消費(fèi)下一批數(shù)據(jù)了
同步提交
是否自動提交offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
同步提交offset kafkaConsumer.commitSync();
由于同步提交offset有失敗重試機(jī)制,故更加可靠,但是由于一致等待提交結(jié)果,提交的效率比較低。以下為同步提交offset的示例
package com.longer.handsync;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumerByHandSync {
public static void main(String[] args) {
//創(chuàng)建消費(fèi)者的配置對象
Properties properties=new Properties();
//2、給消費(fèi)者配置對象添加參數(shù)
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
//配置序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//配置消費(fèi)者組(組名任意起名)必須
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
//修改分區(qū)策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
// properties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG,"false");
//是否自動提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
//創(chuàng)建消費(fèi)者對象
KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
//注冊要消費(fèi)的主題
ArrayList<String> topics=new ArrayList<>();
topics.add("two");
kafkaConsumer.subscribe(topics);
while (true){
//設(shè)置1s中消費(fèi)一批數(shù)據(jù)
ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
//打印消費(fèi)到的數(shù)據(jù)
for(ConsumerRecord<String,String> record:consumerRecords){
System.out.println(record);
}
//同步提交offset
kafkaConsumer.commitSync();
}
}
}
異步提交
雖然同步提交offset更可靠一些,但是由于其會阻塞當(dāng)前線程,直到提交成功。因此吞吐量會收到很大的影響,因此更多情況下會選擇異步offset的方式kafkaConsumer.commitAsync();
文章來源:http://www.zghlxwxcb.cn/news/detail-665281.html
package com.longer.handasync;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
/**
* 同步提交
*/
public class CustomConsumerByHandAsync {
public static void main(String[] args) {
//創(chuàng)建消費(fèi)者的配置對象
Properties properties=new Properties();
//2、給消費(fèi)者配置對象添加參數(shù)
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
//配置序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//配置消費(fèi)者組(組名任意起名)必須
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
//修改分區(qū)策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
// properties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG,"false");
//是否自動提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
//創(chuàng)建消費(fèi)者對象
KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
//注冊要消費(fèi)的主題
ArrayList<String> topics=new ArrayList<>();
topics.add("two");
kafkaConsumer.subscribe(topics);
while (true){
//設(shè)置1s中消費(fèi)一批數(shù)據(jù)
ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
//打印消費(fèi)到的數(shù)據(jù)
for(ConsumerRecord<String,String> record:consumerRecords){
System.out.println(record);
}
//同步提交offset
kafkaConsumer.commitAsync();
}
}
}
指定 Offset 消費(fèi)
auto.offset.reset = earliest | latest | none 默認(rèn)是latest
當(dāng)Kafka中沒有初始偏移量(消費(fèi)者組第一次消費(fèi))或服務(wù)器上不再存在當(dāng)前偏移量時(例如該數(shù)據(jù)已被刪除),該怎么辦?
1)earliest:自動將偏移量重置為最早的偏移量,–from-beginning
2) latest(默認(rèn)值):自動將偏移量重置為最新偏移量
3)如果未找到消費(fèi)者組的先前偏移量,則向消費(fèi)者拋出異常。
主要代碼文章來源地址http://www.zghlxwxcb.cn/news/detail-665281.html
Set<TopicPartition> assigment=new HashSet<>();
while (assigment.size()==0){
kafkaConsumer.poll(Duration.ofSeconds(1));
//獲取消費(fèi)者分區(qū)分配信息(有了分區(qū)分配信息才能開始消費(fèi))
assigment= kafkaConsumer.assignment();
}
//遍歷所有分區(qū),并指定從100得位置開始消費(fèi)
for (TopicPartition tp : assigment) {
kafkaConsumer.seek(tp,100);
}
package com.longer.seek;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
public class CustomConsumerSeek {
public static void main(String[] args) {
//創(chuàng)建消費(fèi)者的配置對象
Properties properties=new Properties();
//2、給消費(fèi)者配置對象添加參數(shù)
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
//配置序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//配置消費(fèi)者組(組名任意起名)必須
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
//修改分區(qū)策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
// properties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG,"false");
//創(chuàng)建消費(fèi)者對象
KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
//注冊要消費(fèi)的主題
ArrayList<String> topics=new ArrayList<>();
topics.add("two");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assigment=new HashSet<>();
while (assigment.size()==0){
kafkaConsumer.poll(Duration.ofSeconds(1));
//獲取消費(fèi)者分區(qū)分配信息(有了分區(qū)分配信息才能開始消費(fèi))
assigment= kafkaConsumer.assignment();
}
//遍歷所有分區(qū),并指定從100得位置開始消費(fèi)
for (TopicPartition tp : assigment) {
kafkaConsumer.seek(tp,100);
}
while (true){
//設(shè)置1s中消費(fèi)一批數(shù)據(jù)
ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
//打印消費(fèi)到的數(shù)據(jù)
for(ConsumerRecord<String,String> record:consumerRecords){
System.out.println(record);
}
}
}
}
到了這里,關(guān)于Kafka入門,手動提交offset,同步提交,異步提交,指定 Offset 消費(fèi)(二十三)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!