1. 自動(dòng)提交消費(fèi)位移
最簡(jiǎn)單的提交方式是讓消費(fèi)者自動(dòng)提交偏移量,自動(dòng)提交 offset 的相關(guān)參數(shù):
- enable.auto.commit:是否開(kāi)啟自動(dòng)提交 offset 功能,默認(rèn)為 true;
- auto.commit.interval.ms:自動(dòng)提交 offset 的時(shí)間間隔,默認(rèn)為5秒;
如果 enable.auto.commit 被設(shè)置為true,那么每過(guò)5秒,消費(fèi)者就會(huì)自動(dòng)提交 poll() 返回的最大偏移量,即將拉取到的每個(gè)分區(qū)中最大的消息位移進(jìn)行提交。提交時(shí)間間隔通過(guò) auto.commit.interval.ms 來(lái)設(shè)定,默認(rèn)是5秒。與消費(fèi)者中的其他處理過(guò)程一樣,自動(dòng)提交也是在輪詢(xún)循環(huán)中進(jìn)行的。消費(fèi)者會(huì)在每次輪詢(xún)時(shí)檢查是否該提交偏移量了,如果是,就會(huì)提交最后一次輪詢(xún)返回的偏移量。
① 啟動(dòng)消費(fèi)者消費(fèi)程序,并設(shè)置為自動(dòng)提交消費(fèi)者位移的方式:
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-ni");
// 顯式配置消費(fèi)者自動(dòng)提交位移
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 顯式配置消費(fèi)者自動(dòng)提交位移的事件間隔
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,4);
// 創(chuàng)建消費(fèi)者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// 訂閱主題
consumer.subscribe(Arrays.asList("ni"));
// 消費(fèi)數(shù)據(jù)
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.printf("主題 = %s, 分區(qū) = %d, 位移 = %d, " + "消息鍵 = %s, 消息值 = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
}
}
② 啟動(dòng)生產(chǎn)者程序發(fā)送3條消息,消息的內(nèi)容都為 hello,kafka
③ 查看消費(fèi)者消費(fèi)的消息記錄:
主題 = ni, 分區(qū) = 0, 位移 = 0, 消息鍵 = null, 消息值 = hello,kafka
主題 = ni, 分區(qū) = 0, 位移 = 1, 消息鍵 = null, 消息值 = hello,kafka
主題 = ni, 分區(qū) = 0, 位移 = 2, 消息鍵 = null, 消息值 = hello,kafka
可以看到,消費(fèi)者消費(fèi)分區(qū)的最新消息的位移為 offset= 2,即消費(fèi)者的消息位移為 offset =2;
④ 查看消費(fèi)者提交的位移:
[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
[group-ni,ni,0]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1692168114999, expireTimestamp=None)
可以看到,消費(fèi)者的消息位移為 offset =2,但是消費(fèi)者的提交位移為 offset =3;
2. 自動(dòng)提交消費(fèi)位移存在的問(wèn)題?
假設(shè)剛剛提交完一次消費(fèi)位移,然后拉取一批消息進(jìn)行消費(fèi),在下一次自動(dòng)提交消費(fèi)位移之前,消費(fèi)者崩潰了,那么又得從上一次位移提交的地方重新開(kāi)始消費(fèi),這樣便發(fā)生了重復(fù)消費(fèi)的現(xiàn)象(對(duì)于再均衡的情況同樣適用,再均衡完成之后,接管分區(qū)的消費(fèi)者將從最后一次提交的偏移量的位置開(kāi)始讀取消息)??梢酝ㄟ^(guò)修改提交時(shí)間間隔來(lái)更頻繁地提交偏移量,縮小可能導(dǎo)致重復(fù)消息的時(shí)間窗口,但無(wú)法完全避免。
在使用自動(dòng)提交時(shí),到了該提交偏移量的時(shí)候,輪詢(xún)方法將提交上一次輪詢(xún)返回的偏移量,但它并不知道具體哪些消息已經(jīng)被處理過(guò)了。所以,在再次調(diào)用poll()之前,要確保上一次poll()返回的所有消息都已經(jīng)處理完畢(調(diào)用close()方法也會(huì)自動(dòng)提交偏移量)。通常情況下這不會(huì)有什么問(wèn)題,但在處理異?;蛱崆巴顺鲚喸?xún)循環(huán)時(shí)需要特別小心。
雖然自動(dòng)提交很方便,但是沒(méi)有為避免開(kāi)發(fā)者重復(fù)處理消息留有余地。
3. 手動(dòng)提交消費(fèi)位移
在Kafka中還提供了手動(dòng)位移提交的方式,這樣可以使得開(kāi)發(fā)人員對(duì)消費(fèi)位移的管理控制更加靈活。很多時(shí)候并不是說(shuō)拉取到消息就算消費(fèi)完成,而是需要將消息寫(xiě)入數(shù)據(jù)庫(kù)、寫(xiě)入本地緩存,或者是更加復(fù)雜的業(yè)務(wù)處理。在這些場(chǎng)景下,所有的業(yè)務(wù)處理完成才能認(rèn)為消息被成功消費(fèi),手動(dòng)的提交方式可以讓開(kāi)發(fā)人員根據(jù)程序的邏輯在合適的地方進(jìn)行位移提交。
開(kāi)啟手動(dòng)提交功能的前提是消費(fèi)者客戶(hù)端參數(shù) enable.auto.commit 配置為 false,讓?xiě)?yīng)用程序自己決定何時(shí)提交偏移量。手動(dòng)提交可以細(xì)分為同步提交和異步提交,對(duì)應(yīng)于 KafkaConsumer 中的 commitSync() 和 commitAsync() 兩種類(lèi)型的方法。
① 同步提交位移是指消費(fèi)者在提交位移時(shí)會(huì)阻塞,直到提交完成并收到確認(rèn)。它會(huì)提交 poll() 返回的最新偏移量,提交成功后馬上返回,如果由于某些原因提交失敗就拋出異常。 commitAsync() 方法有四個(gè)不同的重載方法,具體定義如下:
public void commitSync()
public void commitSync(Duration timeout)
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout)
② 異步提交位移在執(zhí)行的時(shí)候消費(fèi)者線程不會(huì)被阻塞,可能在提交消費(fèi)位移的結(jié)果還未返回之前就開(kāi)始了新一次的拉取操作。異步提交可以使消費(fèi)者的性能得到一定的增強(qiáng)。commitAsync方法有三個(gè)不同的重載方法,具體定義如下:
public void commitAsync()
public void commitAsync(OffsetCommitCallback callback)
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
1. 同步提交消費(fèi)位移
在消費(fèi)消息的循環(huán)中,處理完當(dāng)前批次的消息后,在輪詢(xún)更多的消息之前,調(diào)用 commitSync() 方法提交當(dāng)前批次最新的偏移量,這會(huì)阻塞當(dāng)前線程,直到位移提交完成并收到確認(rèn)。 只要沒(méi)有發(fā)生不可恢復(fù)的錯(cuò)誤,commitSync() 方法就會(huì)一直嘗試直至提交成功。如果提交失敗,就把異常記錄到錯(cuò)誤日志里。
public void commitSync()
@Slf4j
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
// 顯式配置消費(fèi)者手動(dòng)提交位移
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
// 創(chuàng)建消費(fèi)者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// 訂閱主題
consumer.subscribe(Arrays.asList("topic-01"));
// 消費(fèi)數(shù)據(jù)
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : consumerRecords) {
// 業(yè)務(wù)處理拉取的消息
}
try{
// 消費(fèi)者手動(dòng)提交消費(fèi)位移:同步提交方式
consumer.commitSync();
}catch (CommitFailedException exception){
log.error("commit failed....");
}
}
}
}
還可以將消費(fèi)者程序修改為批量處理+批量提交的方式:
@Slf4j
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
// 顯式配置消費(fèi)者手動(dòng)提交位移
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
// 創(chuàng)建消費(fèi)者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// 訂閱主題
consumer.subscribe(Arrays.asList("topic-01"));
// 消費(fèi)數(shù)據(jù)
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
int minSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
for (ConsumerRecord<String, String> record : consumerRecords) {
buffer.add(record);
}
try{
// 消費(fèi)者手動(dòng)提交消費(fèi)位移:同步提交方式
if(buffer.size()>minSize){
// 批量處理消息
// ...
}
// 手動(dòng)提交位移:同步方式
consumer.commitSync();
}catch (CommitFailedException exception){
log.error("commit failed....");
}
}
}
}
上面的示例中將拉取到的消息存入緩存 buffer,等到積累到足夠多的時(shí)候,也就是大于等于200個(gè)的時(shí)候,再做相應(yīng)的批量處理,之后再做批量提交。
commitSync() 方法會(huì)根據(jù) poll() 方法拉取的最新位移來(lái)進(jìn)行提交,只要沒(méi)有發(fā)生不可恢復(fù)的錯(cuò)誤,它就會(huì)阻塞消費(fèi)者線程直至位移提交完成。對(duì)于不可恢復(fù)的錯(cuò)誤,比如 CommitFailedException、WakeupException、InterruptException、AuthenticationException、AuthorizationException 等,我們可以將其捕獲并做針對(duì)性的處理。
需要注意的是,同步提交位移時(shí)需要確保在處理完消息后再進(jìn)行提交,因?yàn)?commitSync() 將會(huì)提交 poll() 返回的最新偏移量,如果你在處理完所有記錄之前就調(diào)用了 commitSync(),那么一旦應(yīng)用程序發(fā)生崩潰,就會(huì)有丟失消息的風(fēng)險(xiǎn)(消息已被提交但未被處理)。如果應(yīng)用程序在處理記錄時(shí)發(fā)生崩潰,但 commitSync() 還沒(méi)有被調(diào)用,那么從最近批次的開(kāi)始位置到發(fā)生再均衡時(shí)的所有消息都將被再次處理——這或許比丟失消息更好,或許更壞。
2. 異步提交消費(fèi)位移
同步提交有一個(gè)缺點(diǎn),在broker對(duì)請(qǐng)求做出回應(yīng)之前,應(yīng)用程序會(huì)一直阻塞,這樣會(huì)限制應(yīng)用程序的吞吐量??梢酝ㄟ^(guò)降低提交頻率來(lái)提升吞吐量,但如果發(fā)生了再均衡,則會(huì)增加潛在的消息重復(fù)。這個(gè)時(shí)候可以使用異步提交API。只管發(fā)送請(qǐng)求,無(wú)須等待broker做出響應(yīng)。
public void commitAsync()
@Slf4j
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
// 顯式配置消費(fèi)者手動(dòng)提交位移
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
// 創(chuàng)建消費(fèi)者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// 訂閱主題
consumer.subscribe(Arrays.asList("topic-01"));
// 消費(fèi)數(shù)據(jù)
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// 業(yè)務(wù)邏輯處理
}
// 異步提交消費(fèi)位移
consumer.commitAsync();
}
}
}
在提交成功或碰到無(wú)法恢復(fù)的錯(cuò)誤之前,commitSync() 會(huì)一直重試,但commitAsync()不會(huì),這是commitAsync() 的一個(gè)缺點(diǎn)。之所以不進(jìn)行重試,是因?yàn)?commitAsync() 在收到服務(wù)器端的響應(yīng)時(shí),可能已經(jīng)有一個(gè)更大的位移提交成功。假設(shè)我們發(fā)出一個(gè)提交位移2000的請(qǐng)求,這個(gè)時(shí)候出現(xiàn)了短暫的通信問(wèn)題,服務(wù)器收不到請(qǐng)求,自然也不會(huì)做出響應(yīng)。與此同時(shí),我們處理了另外一批消息,并成功提交了位移3000。如果此時(shí) commitAsync() 重新嘗試提交位移2000,則有可能在位移3000之后提交成功。這個(gè)時(shí)候如果發(fā)生再均衡,就會(huì)導(dǎo)致消息重復(fù)。
之所以提到這個(gè)問(wèn)題并強(qiáng)調(diào)提交順序的重要性,是因?yàn)?commitAsync() 也支持回調(diào),回調(diào)會(huì)在broker返回響應(yīng)時(shí)執(zhí)行?;卣{(diào)經(jīng)常被用于記錄位移提交錯(cuò)誤或生成指標(biāo),如果要用它來(lái)重試提交位移,那么一定要注意提交順序。
public void commitAsync(OffsetCommitCallback callback)
@Slf4j
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
// 顯式配置消費(fèi)者手動(dòng)提交位移
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
// 創(chuàng)建消費(fèi)者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// 訂閱主題
consumer.subscribe(Arrays.asList("topic-01"));
// 消費(fèi)數(shù)據(jù)
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// 業(yè)務(wù)邏輯處理
}
// 異步提交消費(fèi)位移
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap, Exception exception) {
if(exception!=null){
log.info("fail to commit offsets:{}",offsetAndMetadataMap,exception);
}
}
});
}
}
}
異步提交中如何實(shí)現(xiàn)重試:我們可以設(shè)置一個(gè)遞增的序號(hào)來(lái)維護(hù)異步提交的順序,每次位移提交之后就增加序號(hào)相對(duì)應(yīng)的值。在遇到位移提交失敗需要重試的時(shí)候,可以檢查所提交的位移和序號(hào)的值的大小,如果前者小于后者,則說(shuō)明有更大的位移已經(jīng)提交了,不需要再進(jìn)行本次重試;如果兩者相同,則說(shuō)明可以進(jìn)行重試提交。
3. 同步和異步組合提交消費(fèi)位移
一般情況下,偶爾提交失敗但不進(jìn)行重試不會(huì)有太大問(wèn)題,因?yàn)槿绻峤皇∈怯捎谂R時(shí)問(wèn)題導(dǎo)致的,后續(xù)的提交總會(huì)成功。如果消費(fèi)者異常退出,那么這個(gè)重復(fù)消費(fèi)的問(wèn)題就很難避免,因?yàn)檫@種情況下無(wú)法及時(shí)提交消費(fèi)位移;但如果這是發(fā)生在消費(fèi)者被關(guān)閉或再均衡前的最后一次提交,則要確保提交是成功的,可以在退出或再均衡執(zhí)行之前使用同步提交的方式做最后的把關(guān)。
@Slf4j
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
// 顯式配置消費(fèi)者手動(dòng)提交位移
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList("topic-01"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll( Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 業(yè)務(wù)邏輯處理
}
// 異步提交位移
consumer.commitAsync();
}
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
// 同步提交位移
consumer.commitSync();
}finally{
consumer.close();
}
}
}
}
4. 提交特定的消費(fèi)位移
對(duì)于采用 commitSync() 的無(wú)參方法而言,它提交消費(fèi)位移的頻率和拉取批次消息、處理批次消息的頻率是一樣的。但如果想要更頻繁地提交位移該怎么辦?如果 poll() 返回了一大批數(shù)據(jù),那么為了避免可能因再均衡引起的消息重復(fù),想要在批次處理過(guò)程中提交位移該怎么辦?這個(gè)時(shí)候不能只是調(diào)用 commitSync() 或commitAsync(),因?yàn)樗鼈冎粫?huì)提交消息批次里的最后一個(gè)位移。
幸運(yùn)的是,消費(fèi)者API允許在調(diào)用 commitSync() 和 commitAsync() 時(shí)傳給它們想要提交的分區(qū)和位移:
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
如圖:消費(fèi)者的提交位移=當(dāng)前一次poll拉取的分區(qū)消息的最大位移offset + 1,這個(gè)提交位移就是下次
@Slf4j
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
// 顯式配置消費(fèi)者手動(dòng)提交位移
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList("topic-01"));
ConcurrentHashMap<TopicPartition,OffsetAndMetadata> offsets = new ConcurrentHashMap<>();
int count = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 消息所屬的主題和分區(qū)
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
// 消費(fèi)者提交的消費(fèi)位移=當(dāng)前消費(fèi)消息的位移+1
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);
offsets.put(topicPartition, offsetAndMetadata);
if(count % 1000 == 0){
consumer.commitAsync(offsets,null);
}
count++;
}
}
}
}
5. 按分區(qū)提交消費(fèi)位移
@Slf4j
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
// 顯式配置消費(fèi)者手動(dòng)提交位移
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList("topic-01"));
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
// 獲取拉取的消息包含的所有分區(qū)列表
Set<TopicPartition> partitions = consumerRecords.partitions();
for (TopicPartition partition : partitions) {
// 獲取當(dāng)前分區(qū)要消費(fèi)的消息
List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);
// 獲取當(dāng)前分區(qū)消息的最大位移
long lastConsumerOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
// 當(dāng)前分區(qū)的消費(fèi)位移提交 = 當(dāng)前分區(qū)消息的最大位移 + 1
Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = Collections.singletonMap(partition, new OffsetAndMetadata(lastConsumerOffset + 1));
consumer.commitSync(topicPartitionOffsetAndMetadataMap);
}
}
}
}
4. 消費(fèi)者查找不到消費(fèi)位移時(shí)怎么辦?
當(dāng)一個(gè)新的消費(fèi)組建立的時(shí)候,它根本沒(méi)有可以查找的消費(fèi)位移。或者消費(fèi)組內(nèi)的一個(gè)新消費(fèi)者訂閱了一個(gè)新的主題,它也沒(méi)有可以查找的消費(fèi)位移。當(dāng)__consumer_offsets 主題中有關(guān)這個(gè)消費(fèi)組的位移信息過(guò)期而被刪除后,它也沒(méi)有可以查找的消費(fèi)位移。當(dāng) Kafka 中沒(méi)有初始位移或服務(wù)器上不再存在當(dāng)前位移時(shí),該怎么辦?
此時(shí)會(huì)根據(jù)消費(fèi)者客戶(hù)端參數(shù) auto.offset.reset 的配置來(lái)決定從何處開(kāi)始進(jìn)行消費(fèi),auto.offset.reset 參數(shù)的取值如下:
- latest(默認(rèn)值):表示從分區(qū)末尾開(kāi)始消費(fèi)消息。
- earliest: 表示消費(fèi)者會(huì)從起始處,也就是0開(kāi)始消費(fèi)。
- none:查到不到消費(fèi)位移的時(shí)候,既不從最新的消息位置處開(kāi)始消費(fèi),也不從最早的消息位置處開(kāi)始消費(fèi),此時(shí)會(huì)報(bào)出NoOffsetForPartitionException異常。如果能夠找到消費(fèi)位移,那么配置為“none”不會(huì)出現(xiàn)任何異常。
如果配置的不是“l(fā)atest”、“earliest”和“none”,則會(huì)報(bào)出ConfigException異常。
auto.offset.reset 參數(shù)用于指定消費(fèi)者在啟動(dòng)時(shí),如果找不到消費(fèi)位移應(yīng)該從哪里開(kāi)始消費(fèi)消息。 如果能夠找到消費(fèi)位移,那么消費(fèi)者會(huì)從該位移處開(kāi)始消費(fèi)消息,那么 auto.offset.reset 參數(shù)并不會(huì)奏效,只有在找不到消費(fèi)位移時(shí)才會(huì)生效。如果發(fā)生位移越界,即消費(fèi)位移超出了消息隊(duì)列中消息的數(shù)量或位置范圍,那么 auto.offset.reset 參數(shù)也會(huì)生效。
5. 如何從特定分區(qū)位移處讀取消息?
如果消費(fèi)者能夠找到消費(fèi)位移,使用 poll() 可以從各個(gè)分區(qū)的最新位移處讀取消息, 而且提供的 auto.offset.reset 參數(shù)也可以在找不到消費(fèi)位移或位移越界的情況下粗粒度地從開(kāi)頭或末尾開(kāi)始消費(fèi)。但是有些時(shí)候,我們需要一種更細(xì)粒度的掌控,可以讓我們從特定的位移處開(kāi)始拉取消息,而 KafkaConsumer 中的 seek() 方法正好提供了這個(gè)功能,讓我們得以追前消費(fèi)或回溯消費(fèi)。
public void seek(TopicPartition partition, long offset)
public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)
① seek() 方法中的參數(shù) partition 表示分區(qū),而 offset 參數(shù)用來(lái)指定從分區(qū)的哪個(gè)位置開(kāi)始消費(fèi)。seek() 方法只能重置消費(fèi)者分配到的分區(qū)的消費(fèi)位置,而分區(qū)的分配是在 poll() 方法的調(diào)用過(guò)程中實(shí)現(xiàn)的。也就是說(shuō),在執(zhí)行 seek() 方法之前需要先執(zhí)行一次poll()方法,等到分配到分區(qū)之后才可以重置消費(fèi)位置:
@Slf4j
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
// 顯式配置消費(fèi)者手動(dòng)提交位移
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList("topic-01"));
// 執(zhí)行一次poll() 方法完成分區(qū)分配的邏輯
// ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
Set<TopicPartition> topicPartitions = consumer.assignment();
for (TopicPartition topicPartition : topicPartitions) {
consumer.seek(topicPartition,10);
}
while (true) {
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
// ...
}
}
}
② 如果 poll() 方法中的參數(shù)為0,此方法立刻返回,那么 poll() 方法內(nèi)部進(jìn)行分區(qū)分配的邏輯就會(huì)來(lái)不及實(shí)施,也就是說(shuō),消費(fèi)者此時(shí)并未分配到任何分區(qū),那么 topicPartitions 便是一個(gè)空列表。那么這里的 timeout 參數(shù)設(shè)置為多少合適呢?太短會(huì)使分配分區(qū)的動(dòng)作失敗,太長(zhǎng)又有可能造成一些不必要的等待。我們可以通過(guò) KafkaConsumer的 assignment()方法來(lái)判定是否分配到了相應(yīng)的分區(qū):
@Slf4j
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
// 顯式配置消費(fèi)者手動(dòng)提交位移
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList("topic-01"));
Set<TopicPartition> topicPartitions = consumer.assignment();
// 此時(shí)說(shuō)明還未完成分區(qū)分配
while (topicPartitions.size()==0){
consumer.poll(Duration.ofMillis(100));
topicPartitions = consumer.assignment();
}
for (TopicPartition topicPartition : topicPartitions) {
// 重置每個(gè)分區(qū)的消費(fèi)位置為10
consumer.seek(topicPartition,10);
}
while (true) {
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
// 消費(fèi)消息
}
}
}
③ 如果對(duì)未分配到的分區(qū)執(zhí)行seek() 方法,那么會(huì)報(bào)出 IllegalStateException 的異常。類(lèi)似在調(diào)用subscribe() 方法之后直接調(diào)用seek() 方法:
@Slf4j
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
// 顯式配置消費(fèi)者手動(dòng)提交位移
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList("topic-01"));
// 未完成分區(qū)分配,直接調(diào)用seek方法,重置分區(qū)1的消費(fèi)位置為10
consumer.seek(new TopicPartition("topic-01",1),10);
while (true) {
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
// 消費(fèi)消息
}
}
}
報(bào)錯(cuò):
Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition topic-01-1
④ 如果消費(fèi)組內(nèi)的消費(fèi)者在啟動(dòng)的時(shí)候能夠找到消費(fèi)位移,那么消費(fèi)者就會(huì)從該位移處開(kāi)始消費(fèi)消息。除非發(fā)生位移越界,即消費(fèi)位移超出了消息隊(duì)列中消息的數(shù)量或位置范圍,否則 auto.offset.reset 參數(shù)并不會(huì)奏效,此時(shí)如果想指定從開(kāi)頭或末尾開(kāi)始消費(fèi),就需要seek() 方法的幫助了,指定從分區(qū)末尾開(kāi)始消費(fèi):
endOffsets() 方法用來(lái)獲取指定分區(qū)的末尾的消息位置, endOffsets 的具體方法定義如下:
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout)
其中 partitions 參數(shù)表示分區(qū)集合,而 timeout 參數(shù)用來(lái)設(shè)置等待獲取的超時(shí)時(shí)間。如果沒(méi)有指定 timeout 參數(shù)的值,那么 endOffsets() 方法的等待時(shí)間由客戶(hù)端參數(shù) request.timeout.ms 來(lái)設(shè)置,默認(rèn)值為 30000。與 endOffsets 對(duì)應(yīng)的是 beginningOffset() 方法,一個(gè)分區(qū)的起始位置起初是0,但并不代表每時(shí)每刻都為0,因?yàn)槿罩厩謇淼膭?dòng)作會(huì)清理舊的數(shù)據(jù),所以分區(qū)的起始位置會(huì)自然而然地增加,beginningOffsets() 方法的具體定義如下:
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions)
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout)
beginningOffsets() 方法中的參數(shù)內(nèi)容和含義都與 endOffsets() 方法中的一樣,配合這兩個(gè)方法我們就可以從分區(qū)的開(kāi)頭或末尾開(kāi)始消費(fèi)。其實(shí)KafkaConsumer中直接提供了seekToBeginning() 方法和seekToEnd() 方法來(lái)實(shí)現(xiàn)這兩個(gè)功能,這兩個(gè)方法的具體定義如下:
public void seekToBeginning(Collection<TopicPartition> partitions)
public void seekToEnd(Collection<TopicPartition> partitions)
⑤ 有時(shí)候我們并不知道特定的消費(fèi)位置,卻知道一個(gè)相關(guān)的時(shí)間點(diǎn),比如我們想要消費(fèi)昨天8點(diǎn)之后的消息,這個(gè)需求更符合正常的思維邏輯。此時(shí)我們無(wú)法直接使用seek() 方法來(lái)追溯到相應(yīng)的位置。KafkaConsumer同樣考慮到了這種情況,它提供了一個(gè)offsetsForTimes() 方法,通過(guò)timestamp來(lái)查詢(xún)與此對(duì)應(yīng)的分區(qū)位置:
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout)
offsetsForTimes() 方法的參數(shù) timestampsToSearch 是一個(gè)Map類(lèi)型,key為待查詢(xún)的分區(qū),而 value 為待查詢(xún)的時(shí)間戳,該方法會(huì)返回時(shí)間戳大于等于待查詢(xún)時(shí)間的第一條消息對(duì)應(yīng)的位置和時(shí)間戳,對(duì)應(yīng)于 OffsetAndTimestamp 中的 offset 和 timestamp字段。下面的示例演示了 offsetsForTimes() 和 seek() 之間的使用方法,首先通過(guò) offsetsForTimes() 方法獲取一天之前的消息位置,然后使用 seek() 方法追溯到相應(yīng)位置開(kāi)始消費(fèi):
@Slf4j
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
// 顯式配置消費(fèi)者手動(dòng)提交位移
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList("topic-01"));
Map<TopicPartition,Long> timestampToSearch = new HashMap<>();
Set<TopicPartition> topicPartitionSet = consumer.assignment();
// 查詢(xún)的分區(qū)以及查詢(xún)的時(shí)間戳
for (TopicPartition topicPartition : topicPartitionSet) {
timestampToSearch.put(topicPartition,System.currentTimeMillis()-1*24*3600*1000);
}
// 獲取時(shí)間戳大于等于待查詢(xún)時(shí)間的第一條消息對(duì)應(yīng)的位置和時(shí)間戳
Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(timestampToSearch);
for (TopicPartition topicPartition : topicPartitionSet) {
OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
// seek 方法重置消費(fèi)的位移
if(offsetAndTimestamp != null){
consumer.seek(topicPartition,offsetAndTimestamp.offset());
}
}
}
}
⑥ 位移越界也會(huì)觸發(fā) auto.offset.reset 參數(shù)的執(zhí)行,位移越界是指知道消費(fèi)位置卻無(wú)法在實(shí)際的分區(qū)中查找到,比如原本拉取位置為101(fetch offset 101),但已經(jīng)越界了(out of range),所以此時(shí)會(huì)根據(jù) auto.offset.reset 參數(shù)的默認(rèn)值來(lái)將拉取位置重置(resetting offset)為100,我們也能知道此時(shí)分區(qū)中最大的消息 offset 為99。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-657359.html
6. 如何優(yōu)雅地退出輪詢(xún)循環(huán)消費(fèi)?
如何優(yōu)雅地退出輪詢(xún)循環(huán),如果你確定馬上要關(guān)閉消費(fèi)者(即使消費(fèi)者還在等待一個(gè)poll()返回),那么可以在另一個(gè)線程中調(diào)用consumer.wakeup()。如果輪詢(xún)循環(huán)運(yùn)行在主線程中,那么可以在ShutdownHook里調(diào)用這個(gè)方法。需要注意的是,consumer.wakeup() 是消費(fèi)者唯一一個(gè)可以在其他線程中安全調(diào)用的方法。調(diào)用 consumer.wakeup() 會(huì)導(dǎo)致poll()拋出WakeupException,如果調(diào)用 consumer.wakeup() 時(shí)線程沒(méi)有在輪詢(xún),那么異常將在下一次調(diào)用 poll() 時(shí)拋出。不一定要處理WakeupException,但在退出線程之前必須調(diào)用consumer.close() 。消費(fèi)者在被關(guān)閉時(shí)會(huì)提交還沒(méi)有提交的偏移量,并向消費(fèi)者協(xié)調(diào)器發(fā)送消息,告知自己正在離開(kāi)群組。協(xié)調(diào)器會(huì)立即觸發(fā)再均衡,被關(guān)閉的消費(fèi)者所擁有的分區(qū)將被重新分配給群組里其他的消費(fèi)者,不需要等待會(huì)話(huà)超時(shí)。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-657359.html
到了這里,關(guān)于分布式 - 消息隊(duì)列Kafka:Kafka 消費(fèi)者消費(fèi)位移的提交方式的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!