@KafkaListener(id = "eventConsumer", topics = "perception_event", groupId = "defaultConsumerGroup", containerFactory = "kafkaListenerContainerFactory")
public void consume(List<ConsumerRecord<String, String>> consumerRecordList) {
.......
}
1.kafka批量消費(fèi)消息,使用containerFactory 監(jiān)聽消費(fèi)失敗消息?文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-515022.html
/**
* 消費(fèi)失敗消息最大重試15次,存入到死信隊(duì)列中
*
* @param configurer kafkaConsumerFactory kafkaTemplate
* @return factory
*/
@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<Object, Object> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
//最大重試15次
RetryingBatchErrorHandler retryingBatchErrorHandler = new RetryingBatchErrorHandler(new FixedBackOff(500L, 15L),
createConsumerRecordRecoverer());
factory.setBatchErrorHandler(retryingBatchErrorHandler);
return factory;
}
/**
* 最終消費(fèi)失敗打印日志即可
*/
private ConsumerRecordRecoverer createConsumerRecordRecoverer() {
return (consumerRecord, exception) -> {
log.error("consumer event last fail:{}, exception:{}", SensitiveUtils.phone(consumerRecord.toString()), exception.toString());
};
}
2.使用RetryingBatchErrorHandler 指定批量消費(fèi)時(shí)失敗消息的重試次數(shù)和時(shí)間,如果不是批量消費(fèi),則使用RetryingErrorHandler來(lái)指定重試次數(shù)和間隔時(shí)間文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-515022.html
到了這里,關(guān)于kafka消費(fèi)失敗重試機(jī)制的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!