国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

主題配置和 KafkaTemplate 的使用

這篇具有很好參考價值的文章主要介紹了主題配置和 KafkaTemplate 的使用。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

一、主題

1.1、配置主題

  • 在應(yīng)用程序上下文定義一個 KafkaAdmin Bean, 它可以自動將主題添加到代理。通過這個Bean可以將
    每一個新建的主題 Topic 添加到應(yīng)用程序上下文中。下面是一個簡單的示例:

也可以創(chuàng)建 TopicBuilder 類,使用它創(chuàng)建 Bean 更加簡單。

@Bean
public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaAdmin(configs);
        }

@Bean
public KafkaAdmin.NewTopics topics456() {
        return new NewTopics(
        TopicBuilder.name("defaultBoth")
        .build(),
        TopicBuilder.name("defaultPart")
        .replicas(1)
        .build(),
        TopicBuilder.name("defaultRepl")
        .partitions(3)
        .build());
        }

使用 Spring Boot 時,KafkaAdminbean 會自動注冊
默認(rèn)情況下,代理不可用時會記錄一條消息,然后上下文會繼續(xù)加載。可以以編程方式調(diào)用Admin的initialize()方法以稍后重試。
也可將 admin 的fatalIfBrokerNotAvailable屬性設(shè)置為true。然后上下文無法初始化。

1.2、在運(yùn)行時檢查和創(chuàng)建主題

目前有兩種方法來進(jìn)行操作:

  • createOrModifyTopics
  • describeTopics
    或者使用 AdminClient 來直接使用:
@Autowired
private KafkaAdmin admin;

...

    AdminClient client = AdminClient.create(admin.getConfigurationProperties());
    ...
    client.close();

二、消息發(fā)送

2.1、使用 KafkaTemplate

2.1.1、KafkaTemplate 介紹

KafkaTemplate 包裝了生產(chǎn)者并提供了將數(shù)據(jù)發(fā)送到 Kafka 主題的便捷方法。

2.1.2、配置 KafkaTemplate

要使用模板,需要配置生產(chǎn)者工廠并在模板的構(gòu)造函數(shù)中提供。

  • 單個生產(chǎn)者配置
@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}
  • 多個生產(chǎn)者配置

    使用來自同一工廠的不同生產(chǎn)者配置創(chuàng)建模板,需要覆蓋工廠的ProducerConfig屬性。

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

然后調(diào)用 KafkaTemplate 的方法來使用它。

  • 異步消息發(fā)布示例
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    ListenableFuture<SendResult<Integer, String>> future = template.send(record);
    future.addCallback(new KafkaSendCallback<SendResult<Integer, String>>() {

        @Override
        public void onSuccess(SendResult<Integer, String> result) {
            handleSuccess(data);
        }

        @Override
        public void onFailure(KafkaProducerException ex) {
            handleFailure(data, record, ex);
        }

    });
}
  • 阻塞發(fā)布示例
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

ExecutionException 在于 KafkaProducerException 屬性 failedProducerRecord 中

2.1.3、發(fā)布結(jié)果查看

  • 異步

發(fā)布成功還是失敗可以向偵聽器注冊回調(diào)以異步接收發(fā)送結(jié)果:

ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
        future.addCallback(new KafkaSendCallback<Integer, String>() {

@Override
public void onSuccess(SendResult<Integer, String> result) {
        ...
        }

@Override
public void onFailure(KafkaProducerException ex) {
        ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
        ...
        }

        });

或者使用 lambda:

ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(result -> {
        ...
    }, (KafkaFailureCallback<Integer, String>) ex -> {
            ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
            ...
    });
  • 同步

阻塞發(fā)送線程等待結(jié)果需要調(diào)用 future 的 get()方法,可以使用帶超時的 get() 方法。文章來源地址http://www.zghlxwxcb.cn/news/detail-701683.html

到了這里,關(guān)于主題配置和 KafkaTemplate 的使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 多個消費(fèi)者訂閱一個Kafka的Topic(使用@KafkaListener和KafkaTemplate)

    記錄 :465 場景 :一個Producer在一個Topic發(fā)布消息,多個消費(fèi)者Consumer訂閱Kafka的Topic。每個Consumer指定一個特定的ConsumerGroup,達(dá)到一條消息被多個不同的ConsumerGroup消費(fèi)。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安裝 :https://blog.csdn.net/zhangbeizhen18/arti

    2024年02月15日
    瀏覽(22)
  • Spring for Apache Kafka Deep Dive – Part 2: Apache Kafka and Spring Cloud Stream

    Spring for Apache Kafka Deep Dive – Part 2: Apache Kafka and Spring Cloud Stream

    On the heels of part 1 in this blog series,?Spring for Apache Kafka – Part 1: Error Handling, Message Conversion and Transaction Support, here in part 2 we’ll focus on another project that enhances the developer experience when building streaming applications on Kafka: Spring Cloud Stream. We will cover the following in this post: Overview of Spring Clo

    2024年02月19日
    瀏覽(50)
  • flink如何監(jiān)聽kafka主題配置變更

    flink如何監(jiān)聽kafka主題配置變更

    從前一篇文章我們知道flink消費(fèi)kafka主題時是采用的手動assign指定分區(qū)的方式,這種消費(fèi)方式是不處理主題的rebalance操作的,也就是消費(fèi)者組中即使有消費(fèi)者退出或者進(jìn)入也是不會觸發(fā)消費(fèi)者所消費(fèi)的分區(qū)的,那么疑問就來了,那是否比如kafka主題分區(qū)變多,或者新增了滿足

    2024年02月14日
    瀏覽(16)
  • 【項目實戰(zhàn)】SpringBoot整合Kafka消息隊列(基于KafkaTemplate和@KafkaListener實現(xiàn))

    【項目實戰(zhàn)】SpringBoot整合Kafka消息隊列(基于KafkaTemplate和@KafkaListener實現(xiàn))

    Apache Kafka是分布式發(fā)布-訂閱消息系統(tǒng)。 它最初由LinkedIn公司開發(fā),之后成為Apache項目的一部分。 Kafka是一種快速、可擴(kuò)展的、設(shè)計內(nèi)在就是分布式的,分區(qū)的和可復(fù)制的提交日志服務(wù)。 Apache Kafka與傳統(tǒng)消息系統(tǒng)相比,有以下不同: 它將消息持久化到磁盤,因此可用于批量消

    2023年04月09日
    瀏覽(26)
  • Spring for Apache Kafka概述和簡單入門

    Spring for Apache Kafka 的高級概述以及底層概念和可運(yùn)行的示例代碼。 注意:進(jìn)行工作開始之前至少要有一個 Apache Kafka 環(huán)境 使用 Spring Boot 使用 Spring Boot 時,省略版本,Boot 將自動引入與您的 Boot 版本兼容的正確版本 使用 Spring 使用Spring 時必須要申明使用的版本。 Apache Kafka 客戶

    2024年02月09日
    瀏覽(30)
  • Kafka【問題 02】KafkaTemplate 報錯 Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected 問題解決

    主要的報錯信息: Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. 和 Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected 報錯詳情如下: 配置信息: 很顯然是配置文件沒有生效 ?? 原來是一個很高階( diji )的錯誤 ?? kafka的配置應(yīng)該是在sprin

    2024年02月16日
    瀏覽(19)
  • Windows上安裝和配置Apache Kafka

    Windows上安裝和配置Apache Kafka

    首先,讓我們從Apache Kafka的官方網(wǎng)站下載最新的二進(jìn)制發(fā)行版。您可以在以下網(wǎng)址找到下載鏈接:Apache Kafka 選擇適用于Windows的版本并下載壓縮文件。一旦下載完成,將文件解壓到您選擇的目錄中。 接下來,您需要進(jìn)行一些配置,以確保Kafka在Windows上正常運(yùn)行。 2.1 配置Kafk

    2024年02月09日
    瀏覽(17)
  • 在Windows上搭建Kafka環(huán)境的步驟,包括安裝Java、下載Kafka、配置Zookeeper和Kafka、啟動Zookeeper和Kafka、創(chuàng)建主題和生產(chǎn)者/消費(fèi)者等

    1. 安裝Java Kafka需要Java環(huán)境支持。可以從Oracle官網(wǎng)下載JDK,或者使用OpenJDK。 2. 下載Kafka 可以從Kafka官網(wǎng)下載Kafka二進(jìn)制壓縮包。解壓后可以看到bin、config、libs等目錄。 3. 配置Zookeeper Kafka依賴Zookeeper實現(xiàn)分布式協(xié)作??梢允褂肒afka自帶的Zookeeper,也可以獨(dú)立安裝Zookeeper。 如果使

    2024年02月11日
    瀏覽(22)
  • redis的配置和使用、redis的數(shù)據(jù)結(jié)構(gòu)以及緩存遇見的常見問題

    redis的配置和使用、redis的數(shù)據(jù)結(jié)構(gòu)以及緩存遇見的常見問題

    目錄 1.緩存 2.redis不僅僅可以做緩存,只不過說他的大部分場景,是做緩存。本地緩存重啟后緩存里的東西就沒有了,但是redis有。 3.redis有幾個特性:查詢快,但是是放到內(nèi)存里的〈斷電或者重啟,數(shù)據(jù)就丟了),所以他有特定的持久化機(jī)制 4.服務(wù)器(centos)安裝redis 5.?redis在

    2024年02月14日
    瀏覽(35)
  • Spring Boot與Apache Kafka實現(xiàn)高吞吐量消息處理:解決大規(guī)模數(shù)據(jù)處理問題

    現(xiàn)代數(shù)據(jù)量越來越龐大對數(shù)據(jù)處理的效率提出了更高的要求。Apache Kafka是目前流行的分布式消息隊列之一。Spring Boot是現(xiàn)代Java應(yīng)用程序快速開發(fā)的首選框架。綜合使用Spring Boot和Apache Kafka可以實現(xiàn)高吞吐量消息處理。 Apache Kafka采用分布式發(fā)布-訂閱模式具有高度的可擴(kuò)展性和可

    2024年02月05日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包