一、Kafka消費(fèi)者提交Offset的策略
Kafka消費(fèi)者提交Offset的策略有
- 自動(dòng)提交Offset:
- 消費(fèi)者將消息拉取下來(lái)以后未被消費(fèi)者消費(fèi)前,直接自動(dòng)提交offset。
- 自動(dòng)提交可能丟失數(shù)據(jù),比如消息在被消費(fèi)者消費(fèi)前已經(jīng)提交了offset,有可能消息拉取下來(lái)以后,消費(fèi)者掛了
- 手動(dòng)提交Offset
- 消費(fèi)者在消費(fèi)消息時(shí)/后,再提交offset,在消費(fèi)者中實(shí)現(xiàn)
- 手動(dòng)提交Offset分為:手動(dòng)同步提交(commitSync)、手動(dòng)異步提交(commitAsync)
- 什么是Offset
- 參考文章:Linux:【Kafka三】組件介紹
二、自動(dòng)提交策略
? ? ? ? Kafka消費(fèi)者默認(rèn)是自動(dòng)提交Offset的策略
? ? ? ? 可設(shè)置自動(dòng)提交的時(shí)間間隔
package com.demo.lxb.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* @Description: kafka消費(fèi)者消費(fèi)消息,自動(dòng)提交offset
* @Author: lvxiaobu
* @Date: 2023-10-24 16:26
**/
public class MyConsumerAutoSubmitOffset {
private final static String CONSUMER_GROUP_NAME = "GROUP1";
private final static String TOPIC_NAME = "topic0921";
public static void main(String[] args) {
Properties props = new Properties();
// 一、設(shè)置參數(shù)
// 配置kafka地址
// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
// "192.168.151.28:9092"); // 單機(jī)配置
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置
// 配置消息 鍵值的序列化規(guī)則
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 配置消費(fèi)者組
props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);
// 設(shè)置消費(fèi)者offset的提交方式
// 自動(dòng)提交:默認(rèn)配置
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
// 自動(dòng)提交offset的時(shí)間間隔
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
// 二、創(chuàng)建消費(fèi)者
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
// 三、消費(fèi)者訂閱主題
consumer.subscribe(Arrays.asList(TOPIC_NAME));
// 四、拉取消息,開(kāi)始消費(fèi)
while (true){
// 從kafka集群中拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// 消費(fèi)消息,當(dāng)前是自動(dòng)提交模式,在消息上一行消息被拉取下來(lái)以后,offset就自動(dòng)被提交了,下面的代碼如果出錯(cuò),或者此時(shí)
// 消費(fèi)者掛掉了,那么消費(fèi)其實(shí)是沒(méi)有進(jìn)行消費(fèi)的(也就是業(yè)務(wù)邏輯處理)
for (ConsumerRecord<String, String> record : records) {
System.out.println("接收到的消息: 分區(qū): " + record.partition() + ", offset: " + record.offset()
+ ", key值: " + record.key() + " , value值: "+record.value());
}
}
}
}
上述代碼中的如下代碼是自動(dòng)提交策略的相關(guān)設(shè)置?
// 設(shè)置消費(fèi)者offset的提交方式
// 自動(dòng)提交:默認(rèn)配置
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
// 自動(dòng)提交offset的時(shí)間間隔
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
三、手動(dòng)提交策略
3.1、手動(dòng)同步提交策略
????????手動(dòng)同步提交,會(huì)在提交offset處阻塞。當(dāng)消費(fèi)者接收到 kafka集群返回的消費(fèi)者提交offset成功的ack后,才開(kāi)始執(zhí)行消費(fèi)者中后續(xù)的代碼。
? ? ? ? 因?yàn)槭褂卯惒教峤蝗菀讈G失消息,固一般使用同步提交,在同步提交后不要再做其他邏輯處理。
package com.demo.lxb.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* @Description: kafka消費(fèi)者消費(fèi)消息,手動(dòng)同步提交offset
* @Author: lvxiaobu
* @Date: 2023-10-24 16:26
**/
public class MyConsumerMauSubmitOffset {
private final static String CONSUMER_GROUP_NAME = "GROUP1";
private final static String TOPIC_NAME = "topic0921";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);
// 關(guān)鍵代碼:關(guān)閉自動(dòng)提交
// 手動(dòng)提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
// 自動(dòng)提交offset的時(shí)間間隔:此時(shí)不再需要設(shè)置該值
// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("接收到的消息: 分區(qū): " + record.partition() + ", offset: " + record.offset()
+ ", key值: " + record.key() + " , value值: "+record.value());
}
// 關(guān)鍵代碼:commitSync():同步提交方法
// 同步方式提交,此時(shí)會(huì)產(chǎn)生阻塞,當(dāng)kafka集群返回了提交成功的ack以后,才會(huì)消除阻塞,進(jìn)行后續(xù)的代碼邏輯。
// 一般使用同步提交,在同步提交后不再做其他邏輯處理
consumer.commitSync();
// do anything
}
}
}
3.2、手動(dòng)異步提交策略文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-715700.html
????????異步提交,不會(huì)在提交offset代碼處阻塞,即消費(fèi)者提交了offset后,不需要等待kafka集群返回的ack即可繼續(xù)執(zhí)行后續(xù)代碼。但是在提交offset時(shí)需要提供一個(gè)回調(diào)方法,供kafka集群回調(diào),來(lái)告訴消費(fèi)者提交offset的結(jié)果。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-715700.html
package com.demo.lxb.kafka;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
/**
* @Description: kafka消費(fèi)者消費(fèi)消息,手動(dòng)異步提交offset
* @Author: lvxiaobu
* @Date: 2023-10-24 16:26
**/
public class MyConsumerMauSubmitOffset2 {
private final static String CONSUMER_GROUP_NAME = "GROUP1";
private final static String TOPIC_NAME = "topic0921";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);
// 關(guān)鍵代碼:關(guān)閉自動(dòng)提交
// 手動(dòng)提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
// 自動(dòng)提交offset的時(shí)間間隔:此時(shí)不再需要設(shè)置該值
// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("接收到的消息: 分區(qū): " + record.partition() + ", offset: " + record.offset()
+ ", key值: " + record.key() + " , value值: "+record.value());
}
// 關(guān)鍵代碼:commitAsync() 異步提交
// new OffsetCommitCallback是kafka集群會(huì)回調(diào)的方法,告訴消費(fèi)者提交offset的結(jié)果
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e != null){
// 可將提交失敗的消息記錄到日志
System.out.println("記錄提交offset失敗的消息到日志");
System.out.println("消費(fèi)者提交offset拋出異常:" + Arrays.toString(e.getStackTrace()));
System.out.println("消費(fèi)者提交offset異常的消息信息:" + JSONObject.toJSONString(map));
}
}
});
// 后續(xù)邏輯處理,不需要等到kafka集群返回了提交成功的ack以后才開(kāi)始處理。
//do anything
}
}
}
到了這里,關(guān)于Kafka-Java四:Spring配置Kafka消費(fèi)者提交Offset的策略的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!