自定義kafka客戶端消費topic
結(jié)論
使用自定義的KafkaConsumer給spring進(jìn)行管理,之后在注入topic的set方法中,開單線程主動訂閱和讀取該topic的消息。
1 背景
后端服務(wù)不需要啟動時就開始監(jiān)聽消費,而是根據(jù)啟動的模塊或者用戶自定義監(jiān)聽需要監(jiān)聽或者停止的topic
2 spring集成2.1.8.RELEASE版本不支持autoStartup屬性
使用的spring集成2.1.8.RELEASE的版本,在@KafkaListener注解中沒有找到可以直接配置屬性autoStartup = "false"來手動啟動topic,可能是版本低的原因,如果有可以支持的版本,也可以打在評論區(qū),我去驗證一下。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
@KafkaListener(topics = "<Kafka主題>", autoStartup = "false")
public void receive(String message) {
// 處理接收到的消息
}
3 自定義kafka客戶端消費topic
3.1 yml配置
spring:
kafka:
bootstrap-servers: 19.125.105.6:9092,19.125.105.7,19.125.105.8:9092
consumer:
group-id: data-dev
enable-auto-commit: true
auto-offset-reset: latest
auto-commit-interval: 1000
topic:
costomTopic: costomData
3.2 KafkaConfig客戶端配置
kafka其他配置項和原有的kafka客戶端配置一樣,只有額外增加了一個cutomConsumer讓spring來管理,方便手動啟動客戶端來使用
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
// @Value("${spring.kafka.listener.concurrency}")
// private Integer concurrency;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private Integer autoCommitInterval;
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// concurrency
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
private ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Bean
public KafkaConsumer cutomConsumer() {
// 新建一個自定義啟動消費者
KafkaConsumer consumer = new KafkaConsumer<>(consumerConfigs());
return consumer;
}
}
3.3 手動啟動消費客戶端
這里手動啟動消費客戶端只有在配置了costomTopic才開始啟動,如果需要動態(tài)指定啟停topic文章來源:http://www.zghlxwxcb.cn/news/detail-784786.html
@Component
public class CutomKafkaConsumer {
// 使用cutomConsumer實例消費
@Autowired
private KafkaConsumer cutomConsumer;
@Value("${spring.kafka.topic.costomTopic:}")
public void setCostomTopic(String costomTopic) {
// 手動啟動消費類,防止下級模塊默認(rèn)不配置costomTopic導(dǎo)致啟動報錯
if (StringUtils.isEmpty(costomTopic)) {
return;
}
// 使這個消費者訂閱對應(yīng)話題
cutomConsumer.subscribe(Collections.singleton(costomTopic));
// 單線程拉取消息
ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
consumerExecutor.submit(new Runnable() {
@Override
public void run() {
while (true) {
ConsumerRecords<String, String> records = cutomConsumer.poll(3000);
if (!records.iterator().hasNext()) {
continue;
}
try {
// 捕獲異常,防止頂級消費循環(huán)被異常中斷
records.forEach(record -> operate(record));
} catch (Exception e) {
log.error("消費數(shù)據(jù)失敗,失敗原因: {}", e.getMessage(), e);
}
// 通過異步的方式提交位移
cutomConsumer.commitAsync(((offsets, exception) -> {
if (exception == null) {
offsets.forEach((topicPartition, metadata) -> {
System.out.println(topicPartition + " -> offset=" + metadata.offset());
});
} else {
exception.printStackTrace();
// 如果出錯了,同步提交位移
cutomConsumer.commitSync(offsets);
}
}));
}
}
});
}
}
public void operate(ConsumerRecord<String, String> record) {
log.info("kafkaTwoContainerFactory.operate start. key: {}, value : {}", record.key(), record.value());
}
參考:
Kafka消費者——API開發(fā)
Kafka Consumer如何實現(xiàn)精確一次消費數(shù)據(jù)
Apache Kafka - 靈活控制Kafka消費_動態(tài)開啟/關(guān)閉監(jiān)聽實現(xiàn)
@KafkaListener 詳解及消息消費啟??刂?br>kafka多個消費者消費一個topic_kafka消費者組與重平衡機(jī)制,了解一下
kafka學(xué)習(xí)(五):消費者分區(qū)策略(再平衡機(jī)制)
Kafka 3.0 源碼筆記(3)-Kafka 消費者的核心流程源碼分析文章來源地址http://www.zghlxwxcb.cn/news/detail-784786.html
到了這里,關(guān)于自定義kafka客戶端消費topic的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!