国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

自定義kafka客戶端消費topic

這篇具有很好參考價值的文章主要介紹了自定義kafka客戶端消費topic。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

自定義kafka客戶端消費topic

結(jié)論

使用自定義的KafkaConsumer給spring進(jìn)行管理,之后在注入topic的set方法中,開單線程主動訂閱和讀取該topic的消息。

1 背景

后端服務(wù)不需要啟動時就開始監(jiān)聽消費,而是根據(jù)啟動的模塊或者用戶自定義監(jiān)聽需要監(jiān)聽或者停止的topic

2 spring集成2.1.8.RELEASE版本不支持autoStartup屬性

使用的spring集成2.1.8.RELEASE的版本,在@KafkaListener注解中沒有找到可以直接配置屬性autoStartup = "false"來手動啟動topic,可能是版本低的原因,如果有可以支持的版本,也可以打在評論區(qū),我去驗證一下。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.8.RELEASE</version>
</dependency>
@KafkaListener(topics = "<Kafka主題>", autoStartup = "false") 
public void receive(String message) {    
	// 處理接收到的消息 
}

3 自定義kafka客戶端消費topic

3.1 yml配置

spring:
  kafka:
      bootstrap-servers: 19.125.105.6:9092,19.125.105.7,19.125.105.8:9092
      consumer:
        group-id: data-dev
        enable-auto-commit: true
        auto-offset-reset: latest
        auto-commit-interval: 1000
      topic:
        costomTopic: costomData

3.2 KafkaConfig客戶端配置

kafka其他配置項和原有的kafka客戶端配置一樣,只有額外增加了一個cutomConsumer讓spring來管理,方便手動啟動客戶端來使用

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    //    @Value("${spring.kafka.listener.concurrency}")
//    private Integer concurrency;
    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // concurrency
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }

    @Bean
    public KafkaConsumer cutomConsumer() {
        // 新建一個自定義啟動消費者
        KafkaConsumer consumer = new KafkaConsumer<>(consumerConfigs());
        return consumer;
    }
}

3.3 手動啟動消費客戶端

這里手動啟動消費客戶端只有在配置了costomTopic才開始啟動,如果需要動態(tài)指定啟停topic

@Component
public class CutomKafkaConsumer {

    // 使用cutomConsumer實例消費
    @Autowired
    private KafkaConsumer cutomConsumer;

    @Value("${spring.kafka.topic.costomTopic:}")
    public void setCostomTopic(String costomTopic) {
        // 手動啟動消費類,防止下級模塊默認(rèn)不配置costomTopic導(dǎo)致啟動報錯
        if (StringUtils.isEmpty(costomTopic)) {
            return;
        }
        // 使這個消費者訂閱對應(yīng)話題
        cutomConsumer.subscribe(Collections.singleton(costomTopic));
        // 單線程拉取消息
        ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
        consumerExecutor.submit(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    ConsumerRecords<String, String> records = cutomConsumer.poll(3000);
                    if (!records.iterator().hasNext()) {
                        continue;
                    }
                    try {
                        // 捕獲異常,防止頂級消費循環(huán)被異常中斷
                        records.forEach(record -> operate(record));
                    } catch (Exception e) {
                        log.error("消費數(shù)據(jù)失敗,失敗原因: {}", e.getMessage(), e);
                    }
                    // 通過異步的方式提交位移
                    cutomConsumer.commitAsync(((offsets, exception) -> {
                        if (exception == null) {
                            offsets.forEach((topicPartition, metadata) -> {
                                System.out.println(topicPartition + " -> offset=" + metadata.offset());
                            });
                        } else {
                            exception.printStackTrace();
                            // 如果出錯了,同步提交位移
                            cutomConsumer.commitSync(offsets);
                        }
                    }));
                }
            }
        });
    }
}    

public void operate(ConsumerRecord<String, String> record) {
    log.info("kafkaTwoContainerFactory.operate start. key: {}, value : {}", record.key(), record.value());
}

參考:
Kafka消費者——API開發(fā)
Kafka Consumer如何實現(xiàn)精確一次消費數(shù)據(jù)
Apache Kafka - 靈活控制Kafka消費_動態(tài)開啟/關(guān)閉監(jiān)聽實現(xiàn)
@KafkaListener 詳解及消息消費啟??刂?br>kafka多個消費者消費一個topic_kafka消費者組與重平衡機(jī)制,了解一下
kafka學(xué)習(xí)(五):消費者分區(qū)策略(再平衡機(jī)制)
Kafka 3.0 源碼筆記(3)-Kafka 消費者的核心流程源碼分析文章來源地址http://www.zghlxwxcb.cn/news/detail-784786.html

到了這里,關(guān)于自定義kafka客戶端消費topic的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Kafka-客戶端使用

    Kafka-客戶端使用

    Kafka提供了兩套客戶端API,HighLevel API和LowLevel API。 HighLevel API 封裝了kafka的運行細(xì)節(jié),使用起來比較簡單,是企業(yè)開發(fā)過程中最常用的客戶端API。 LowLevel API則需要客戶端自己管理Kafka的運行細(xì)節(jié),Partition,Offset這些數(shù)據(jù)都由客戶端自行管理。這層API功能更靈活,但是使用起來

    2024年02月22日
    瀏覽(19)
  • SpringCloud系列篇:核心組件之聲明式HTTP客戶端組件【遠(yuǎn)程消費】

    SpringCloud系列篇:核心組件之聲明式HTTP客戶端組件【遠(yuǎn)程消費】

    接下來看看由輝輝所寫的關(guān)于SpringCloud的相關(guān)操作吧 目錄 ????Welcome Huihui\\\'s Code World ! !???? 一.?遠(yuǎn)程消費組件是什么 二.?遠(yuǎn)程消費組件的詳解 場景模擬 代碼實操 1.生產(chǎn)者 2.消費者 3.復(fù)雜參數(shù)的處理 DTO 屬性賦值 ???????? 聲明式HTTP客戶端組件是一種用于簡化HTTP請求的

    2024年02月02日
    瀏覽(21)
  • kafka 02——三個重要的kafka客戶端

    kafka 02——三個重要的kafka客戶端

    請參考下面的文章: Kafka 01——Kafka的安裝及簡單入門使用. AdminClient API: 允許管理和檢測Topic、Broker以及其他Kafka對象。 Producer API: 發(fā)布消息到一個或多個API。 Consumer API: 訂閱一個或多個Topic,并處理產(chǎn)生的消息。 如下: 完整的pom 關(guān)于配置,可參考官網(wǎng): https://kafka.apa

    2024年02月13日
    瀏覽(28)
  • kafka客戶端應(yīng)用參數(shù)詳解

    kafka客戶端應(yīng)用參數(shù)詳解

    Kafka提供了非常簡單的客戶端API。只需要引入一個Maven依賴即可: 1、消息發(fā)送者主流程? 然后可以使用Kafka提供的Producer類,快速發(fā)送消息。 ? 整體來說,構(gòu)建Producer分為三個步驟: 設(shè)置Producer核心屬性 ?:Producer可選的屬性都可以由ProducerConfig類管理。比如ProducerConfig.BOOTST

    2024年02月07日
    瀏覽(26)
  • kafka客戶端工具(Kafka Tool)的安裝

    kafka客戶端工具(Kafka Tool)的安裝

    官方下載 根據(jù)不同的系統(tǒng)下載對應(yīng)的版本,點擊下載后雙擊,如何一直下一步,安裝 kafka環(huán)境搭建請參考:CentOS 搭建Kafka集群 (1)連接kafka (2)簡單使用 ?

    2024年04月23日
    瀏覽(36)
  • kafka之java客戶端實戰(zhàn)

    kafka之java客戶端實戰(zhàn)

    ????????Kafka提供了兩套客戶端API, HighLevel API和LowLevel API 。 HighLevel API封裝了kafka的運行細(xì)節(jié),使用起來比較簡單,是企業(yè)開發(fā)過程中最常用的客戶端API。 而LowLevel API則需要客戶端自己管理Kafka的運行細(xì)節(jié),Partition,Offset這些數(shù)據(jù)都由客戶端自行管理。這層API功能更靈活,

    2024年01月17日
    瀏覽(22)
  • python-kafka客戶端封裝

    本文對python的kafka包做簡單封裝,方便kafka初學(xué)者使用。包安裝: kafka_helper.py kafka_test.py Kafka入門,這一篇就夠了(安裝,topic,生產(chǎn)者,消費者)

    2024年02月09日
    瀏覽(20)
  • kafka:java集成 kafka(springboot集成、客戶端集成)

    kafka:java集成 kafka(springboot集成、客戶端集成)

    摘要 對于java的kafka集成,一般選用springboot集成kafka,但可能由于對接方kafka老舊、kafka不安全等問題導(dǎo)致kafak版本與spring版本不兼容,這個時候就得自己根據(jù)kafka客戶端api集成了。 一、springboot集成kafka 具體官方文檔地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    瀏覽(94)
  • c#客戶端Kafka的使用方法

    c#客戶端Kafka的使用方法

    Apache Kafka是一個分布式流處理平臺,最初由LinkedIn開發(fā),現(xiàn)在是Apache軟件基金會的頂級項目之一。Kafka能夠處理大規(guī)模的實時數(shù)據(jù)流,支持高可靠性、高可擴(kuò)展性、低延遲和高吞吐量。它主要用于構(gòu)建實時數(shù)據(jù)管道和流式處理應(yīng)用程序。 Kafka的核心概念包括:Producer(生產(chǎn)者)

    2024年02月12日
    瀏覽(22)
  • Kafka客戶端程序無法連接到Kafka集群的解決方法

    Kafka是一個高性能、分布式的流式數(shù)據(jù)平臺,廣泛用于構(gòu)建實時數(shù)據(jù)流處理應(yīng)用程序。然而,有時候我們可能會遇到Kafka客戶端程序無法連接到Kafka集群的問題。在本文中,我將介紹一些可能導(dǎo)致連接問題的常見原因,并提供相應(yīng)的解決方案。 網(wǎng)絡(luò)配置問題 首先,確保Kafka集群

    2024年01月21日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包