1.業(yè)務(wù)需求
在springboot項(xiàng)目中,使用spring-kafka消費(fèi)kafka數(shù)據(jù)。希望能夠控制消費(fèi)者(KafkaConsumer)啟動(dòng)或停止消費(fèi),并且在啟動(dòng)消費(fèi)時(shí)只消費(fèi)當(dāng)前時(shí)刻以后生產(chǎn)的數(shù)據(jù)(最新生產(chǎn)的數(shù)據(jù)),也就是說,啟動(dòng)消費(fèi)之前未消費(fèi)的數(shù)據(jù)不再消費(fèi)。
2.實(shí)現(xiàn)
2.1.創(chuàng)建消費(fèi)監(jiān)聽
按照官方文檔創(chuàng)建一個(gè)監(jiān)聽。
官方文檔地址
KafkaConsumer.java
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(
id = "consumer-id",
topics = {"topic1", "topic1", "topic3"},
groupId = "group-id"
)
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
String topic = record.topic();
log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);
}
}
}
2.2.控制啟動(dòng)/停止消費(fèi)
通過KafkaListenerEndpointRegistry拿到listenerContainer,操作它即可達(dá)到控制目的。
創(chuàng)建一個(gè)Kafak控制類,實(shí)現(xiàn)控制代碼。
KafkaCtrlHandler.java
@Slf4j
@Component
public class KafkaCtrlHandler {
@Autowired
private KafkaListenerEndpointRegistry registry;
/**
* 開始消費(fèi)
*/
public void start() {
MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
assert listenerContainer != null;
if (!listenerContainer.isRunning()) {
listenerContainer.start();
}
listenerContainer.resume();
log.info("kafka consumer開始消費(fèi)");
}
/**
* 停止消費(fèi)
*/
public void stop() {
MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
assert listenerContainer != null;
listenerContainer.pause();
log.info("kafka consumer停止消費(fèi)");
}
}
這樣即可通過KafkaCtrlHandler 實(shí)例來控制消費(fèi)者開始或者暫停監(jiān)聽。
2.3.控制啟動(dòng)消費(fèi)時(shí)只消費(fèi)最新數(shù)據(jù)
讓KafkaConsumer類實(shí)現(xiàn)org.springframework.kafka.listener包下的ConsumerSeekAware接口,并實(shí)現(xiàn)onPartitionsAssigned方法。
監(jiān)聽創(chuàng)建時(shí),設(shè)置各個(gè)分區(qū)的偏移量。
具體原理待研究,有懂的大佬請(qǐng)留言指教。
新的KafkaConsumer.java文章來源地址http://www.zghlxwxcb.cn/news/detail-414049.html
@Slf4j
@Component
public class KafkaConsumer implements ConsumerSeekAware{
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
@NonNull ConsumerSeekAware.ConsumerSeekCallback callback) {
assignments.keySet().forEach(topicPartition-> callback.seekToEnd(topicPartition.topic(),
topicPartition.partition()));
}
@KafkaListener(
id = "consumer-id",
topics = {"topic1", "topic1", "topic3"},
groupId = "group-id"
)
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
String topic = record.topic();
log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);
}
}
}
注意,修改上面的kafka控制類KafkaCtrlHandler.java,停止消費(fèi)時(shí)讓監(jiān)聽器停止(stop)而非暫停(pause)。這樣監(jiān)聽才會(huì)重新創(chuàng)建并設(shè)置各分區(qū)的偏移量。
新KafkaCtrlHandler.java
@Slf4j
@Component
public class KafkaCtrlHandler {
@Autowired
private KafkaListenerEndpointRegistry registry;
/**
* 開始消費(fèi)
*/
public void start() {
MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
assert listenerContainer != null;
if (!listenerContainer.isRunning()) {
listenerContainer.start();
}
listenerContainer.resume();
log.info("kafka consumer開始消費(fèi)");
}
/**
* 停止消費(fèi)
*/
public void stop() {
MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
assert listenerContainer != null;
// !?。?!這里變了!?。?!
listenerContainer.stop();
log.info("kafka consumer停止消費(fèi)");
}
}
2.4.設(shè)置springboot 啟動(dòng)時(shí)消費(fèi)者監(jiān)聽不自動(dòng)啟動(dòng)
創(chuàng)建配置類
KafkaInitialConfiguration.java
@Slf4j
@Configuration
public class KafkaInitialConfiguration {
// 監(jiān)聽器工廠
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> customContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String,String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
//設(shè)置是否自動(dòng)啟動(dòng)
factory.setAutoStartup(false);
return factory;
}
}
配置監(jiān)聽工廠文章來源:http://www.zghlxwxcb.cn/news/detail-414049.html
新的KafkaConsumer.java
@Slf4j
@Component
public class KafkaConsumer implements ConsumerSeekAware{
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
@NonNull ConsumerSeekAware.ConsumerSeekCallback callback) {
assignments.keySet().forEach(topicPartition-> callback.seekToEnd(topicPartition.topic(),
topicPartition.partition()));
}
@KafkaListener(
id = "consumer-id",
topics = {"topic1", "topic1", "topic3"},
groupId = "group-id",
// !?。∵@里變了?。。。?!
containerFactory = "customContainerFactory"
)
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
String topic = record.topic();
log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);
}
}
}
到了這里,關(guān)于springboot kafka消費(fèi)者啟動(dòng)/停止監(jiān)聽控制,啟動(dòng)時(shí)只消費(fèi)此時(shí)之后的數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!