国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

springboot kafka消費(fèi)者啟動(dòng)/停止監(jiān)聽控制,啟動(dòng)時(shí)只消費(fèi)此時(shí)之后的數(shù)據(jù)

這篇具有很好參考價(jià)值的文章主要介紹了springboot kafka消費(fèi)者啟動(dòng)/停止監(jiān)聽控制,啟動(dòng)時(shí)只消費(fèi)此時(shí)之后的數(shù)據(jù)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

1.業(yè)務(wù)需求

在springboot項(xiàng)目中,使用spring-kafka消費(fèi)kafka數(shù)據(jù)。希望能夠控制消費(fèi)者(KafkaConsumer)啟動(dòng)或停止消費(fèi),并且在啟動(dòng)消費(fèi)時(shí)只消費(fèi)當(dāng)前時(shí)刻以后生產(chǎn)的數(shù)據(jù)(最新生產(chǎn)的數(shù)據(jù)),也就是說,啟動(dòng)消費(fèi)之前未消費(fèi)的數(shù)據(jù)不再消費(fèi)。

2.實(shí)現(xiàn)

2.1.創(chuàng)建消費(fèi)監(jiān)聽

按照官方文檔創(chuàng)建一個(gè)監(jiān)聽。
官方文檔地址

KafkaConsumer.java

@Slf4j
@Component
public class KafkaConsumer {
	@KafkaListener(
            id = "consumer-id",
            topics = {"topic1", "topic1", "topic3"},
            groupId = "group-id"
    )
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            String topic = record.topic();
            log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);
        }
    }
}

2.2.控制啟動(dòng)/停止消費(fèi)

通過KafkaListenerEndpointRegistry拿到listenerContainer,操作它即可達(dá)到控制目的。
創(chuàng)建一個(gè)Kafak控制類,實(shí)現(xiàn)控制代碼。

KafkaCtrlHandler.java

@Slf4j
@Component
public class KafkaCtrlHandler {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    /**
     * 開始消費(fèi)
     */
    public void start() {
        MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
        assert listenerContainer != null;
        if (!listenerContainer.isRunning()) {
            listenerContainer.start();
        }
        listenerContainer.resume();
        log.info("kafka consumer開始消費(fèi)");
    }

    /**
     * 停止消費(fèi)
     */
    public void stop() {
        MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
        assert listenerContainer != null;
        listenerContainer.pause();
        log.info("kafka consumer停止消費(fèi)");
    }
}

這樣即可通過KafkaCtrlHandler 實(shí)例來控制消費(fèi)者開始或者暫停監(jiān)聽。

2.3.控制啟動(dòng)消費(fèi)時(shí)只消費(fèi)最新數(shù)據(jù)

讓KafkaConsumer類實(shí)現(xiàn)org.springframework.kafka.listener包下的ConsumerSeekAware接口,并實(shí)現(xiàn)onPartitionsAssigned方法。
監(jiān)聽創(chuàng)建時(shí),設(shè)置各個(gè)分區(qū)的偏移量。
具體原理待研究,有懂的大佬請(qǐng)留言指教。

新的KafkaConsumer.java文章來源地址http://www.zghlxwxcb.cn/news/detail-414049.html

@Slf4j
@Component
public class KafkaConsumer implements ConsumerSeekAware{
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
                                     @NonNull ConsumerSeekAware.ConsumerSeekCallback callback) {
        assignments.keySet().forEach(topicPartition-> callback.seekToEnd(topicPartition.topic(),
        			 topicPartition.partition()));
    }
    
    @KafkaListener(
            id = "consumer-id",
            topics = {"topic1", "topic1", "topic3"},
            groupId = "group-id"
    )
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            String topic = record.topic();
            log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);
        }
    }
}

注意,修改上面的kafka控制類KafkaCtrlHandler.java,停止消費(fèi)時(shí)讓監(jiān)聽器停止(stop)而非暫停(pause)。這樣監(jiān)聽才會(huì)重新創(chuàng)建并設(shè)置各分區(qū)的偏移量。

新KafkaCtrlHandler.java

@Slf4j
@Component
public class KafkaCtrlHandler {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    /**
     * 開始消費(fèi)
     */
    public void start() {
        MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
        assert listenerContainer != null;
        if (!listenerContainer.isRunning()) {
            listenerContainer.start();
        }
        listenerContainer.resume();
        log.info("kafka consumer開始消費(fèi)");
    }

    /**
     * 停止消費(fèi)
     */
    public void stop() {
        MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
        assert listenerContainer != null;
        // !?。?!這里變了!?。?!
        listenerContainer.stop();
        log.info("kafka consumer停止消費(fèi)");
    }
}

2.4.設(shè)置springboot 啟動(dòng)時(shí)消費(fèi)者監(jiān)聽不自動(dòng)啟動(dòng)

創(chuàng)建配置類

KafkaInitialConfiguration.java

@Slf4j
@Configuration
public class KafkaInitialConfiguration {
    // 監(jiān)聽器工廠
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> customContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String,String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        //設(shè)置是否自動(dòng)啟動(dòng)
        factory.setAutoStartup(false);
        return factory;
    }
}

配置監(jiān)聽工廠

新的KafkaConsumer.java

@Slf4j
@Component
public class KafkaConsumer implements ConsumerSeekAware{
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
                                     @NonNull ConsumerSeekAware.ConsumerSeekCallback callback) {
        assignments.keySet().forEach(topicPartition-> callback.seekToEnd(topicPartition.topic(),
        			 topicPartition.partition()));
    }
    
    @KafkaListener(
            id = "consumer-id",
            topics = {"topic1", "topic1", "topic3"},
            groupId = "group-id",
            // !?。∵@里變了?。。。?!
            containerFactory = "customContainerFactory"
    )
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            String topic = record.topic();
            log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);
        }
    }
}

到了這里,關(guān)于springboot kafka消費(fèi)者啟動(dòng)/停止監(jiān)聽控制,啟動(dòng)時(shí)只消費(fèi)此時(shí)之后的數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包