国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

使用Kafka客戶端(spring-kafka)的Java API操作Kafka的Topic

這篇具有很好參考價值的文章主要介紹了使用Kafka客戶端(spring-kafka)的Java API操作Kafka的Topic。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

記錄:458

場景:在Spring Boot微服務(wù)集成Kafka客戶端spring-kafka-2.8.2操作Kafka的Topic的創(chuàng)建和刪除。

版本:JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。

Kafka安裝:https://blog.csdn.net/zhangbeizhen18/article/details/129071395

1.微服務(wù)中配置Kafka信息

1.1在pom.xml添加依賴

pom.xml文件:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.8.2</version>
</dependency>

解析:spring-kafka選擇一般是使用spring-boot集成的對應(yīng)版本。

請知悉:spring-kafka框架底層使用了原生的kafka-clients。本例對應(yīng)版本:3.0.0。

1.2在application.yml中配置Kafka信息

配置細節(jié)在官網(wǎng)的configuration:https://kafka.apache.org/documentation/

(1)application.yml配置內(nèi)容

spring:
  kafka:
    #kafka服務(wù)端的IP和端口,格式:(ip:port)
    bootstrap-servers: 192.168.19.203:29001
    #生產(chǎn)者
    producer:
      #客戶端發(fā)送服務(wù)端失敗的重試次數(shù)
      retries: 2
      #多個記錄被發(fā)送到同一個分區(qū)時,生產(chǎn)者將嘗試將記錄一起批處理成更少的請求.
      #此設(shè)置有助于提高客戶端和服務(wù)器的性能,配置控制默認批量大小(以字節(jié)為單位)
      batch-size: 16384
      #生產(chǎn)者可用于緩沖等待發(fā)送到服務(wù)器的記錄的總內(nèi)存字節(jié)數(shù)(以字節(jié)為單位)
      buffer-memory: 33554432
      #指定key使用的序列化類
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #指定value使用的序列化類
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #生產(chǎn)者producer要求leader節(jié)點在考慮完成請求之前收到的確認數(shù),用于控制發(fā)送記錄在服務(wù)端的持久化
      #acks=0,設(shè)置為0,則生產(chǎn)者producer將不會等待來自服務(wù)器的任何確認.該記錄將立即添加到套接字(socket)緩沖區(qū)并視為已發(fā)送.在這種情況下,無法保證服務(wù)器已收到記錄,并且重試配置(retries)將不會生效(因為客戶端通常不會知道任何故障),每條記錄返回的偏移量始終設(shè)置為-1.
      #acks=1,設(shè)置為1,leader節(jié)點會把記錄寫入本地日志,不需要等待所有follower節(jié)點完全確認就會立即應(yīng)答producer.在這種情況下,在follower節(jié)點復(fù)制前,leader節(jié)點確認記錄后立即失敗的話,記錄將會丟失.
      #acks=all,acks=-1,leader節(jié)點將等待所有同步復(fù)制副本完成再確認記錄,這保證了只要至少有一個同步復(fù)制副本存活,記錄就不會丟失.
      acks: -1
    consumer:
      #開啟consumer的偏移量(offset)自動提交到Kafka
      enable-auto-commit: true
      #consumer的偏移量(offset)自動提交的時間間隔,單位毫秒
      auto-commit-interval: 1000
      #在Kafka中沒有初始化偏移量或者當前偏移量不存在情況
      #earliest,在偏移量無效的情況下,自動重置為最早的偏移量
      #latest,在偏移量無效的情況下,自動重置為最新的偏移量
      #none,在偏移量無效的情況下,拋出異常.
      auto-offset-reset: latest
      #一次調(diào)用poll返回的最大記錄條數(shù)
      max-poll-records: 500
      #請求阻塞的最大時間(毫秒)
      fetch-max-wait: 500
      #請求應(yīng)答的最小字節(jié)數(shù)
      fetch-min-size: 1
      #心跳間隔時間(毫秒)
      heartbeat-interval: 3000
      #指定key使用的反序列化類
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #指定value使用的反序列化類
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

(2)解析

配置類在spring boot自動注解包:spring-boot-autoconfigure-2.6.3.jar。

類:org.springframework.boot.autoconfigure.kafka.KafkaProperties。

使用@ConfigurationProperties注解使其生效,前綴是:spring.kafka。

1.3加載邏輯

Spring Boot微服務(wù)在啟動時,Spring Boot會讀取application.yml的配置信息,根據(jù)配置內(nèi)容在spring-boot-autoconfigure-2.6.3.jar找到KafkaProperties并注入到對應(yīng)屬性。Spring Boot微服務(wù)在啟動完成后,在Spring環(huán)境中就能取到KafkaProperties的配置信息。

Spring的spring-kafka框架將KafkaProperties配置信息注入到KafkaAdmin。

使用KafkaAdminClient創(chuàng)建AdminClient,再使用AdminClient操作Topic。

2.使用AdminClient創(chuàng)建Kafka的Topic

AdminClient全稱:org.apache.kafka.clients.admin.AdminClient

盡管集成spring-kafka,但是在操作Kafka的Topic方面,主要還是以kafka-clients的API為主。

(1)示例代碼

@RestController
@RequestMapping("/hub/example/topic")
@Slf4j
public class OperateKafkaTopicController {
  @Autowired
  private KafkaAdmin kafkaAdmin;
  private String topicName = "hub-topic-city-01";
  @GetMapping("/f01_1")
  public Object f01_1() {
    try {
        //1.獲取Kafka配置信息
        Map<String, Object> configs = kafkaAdmin.getConfigurationProperties();
        //2.創(chuàng)建客戶端AdminClient
        AdminClient adminClient = KafkaAdminClient.create(configs);
        //3.獲取Topic清單
        Set<String> topicSet = adminClient.listTopics().names().get();
        log.info("在Kafka已建Topic數(shù)量: {} ,清單:", topicSet.size());
        topicSet.forEach(System.out::println);
        //4.創(chuàng)建Topic
        if (!topicSet.contains(topicName)) {
            log.info("新建Topic: {}", topicName);
            // Topic名稱,分區(qū)Partition數(shù)目,復(fù)制因子(replication Factor)
            NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
            Collection<NewTopic> newTopics = Lists.newArrayList(newTopic);
            adminClient.createTopics(newTopics);
        }
    } catch (Exception e) {
        log.info("創(chuàng)建Topic異常.");
        e.printStackTrace();
    }
    return "創(chuàng)建成功";
  }
}

(2)解析代碼

注入spring-kafka框架的KafkaAdmin主要目的是獲取配置內(nèi)容。

操作Kafka的Topic需要先創(chuàng)建AdminClient,使用AdminClient的API創(chuàng)建Topic。

創(chuàng)建Topic一般只需指定Topic名稱,分區(qū)Partition數(shù)目,復(fù)制因子(replication Factor)就行。

3.使用AdminClient刪除Kafka的Topic

AdminClient全稱:org.apache.kafka.clients.admin.AdminClient

盡管集成spring-kafka,但是在操作Kafka的Topic方面,主要還是以kafka-clients的API為主。

(1)示例代碼

@RestController
@RequestMapping("/hub/example/topic")
@Slf4j
public class OperateKafkaTopicController {
  @Autowired
  private KafkaAdmin kafkaAdmin;
  private String topicName = "hub-topic-city-01";
  @GetMapping("/f01_2")
  public Object f01_2() {
      try {
          //1.獲取Kafka配置信息
          Map<String, Object> configs = kafkaAdmin.getConfigurationProperties();
          //2.創(chuàng)建客戶端AdminClient
          AdminClient adminClient = KafkaAdminClient.create(configs);
          //3.獲取Topic清單
          Set<String> topicSet = adminClient.listTopics().names().get();
          //4.刪除Topic
          if (topicSet.contains(topicName)) {
              log.info("刪除Topic: {}", topicName);
              Collection<String> topics = Lists.newArrayList(topicName);
              DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topics);
              deleteTopicsResult.all().get();
          }
      } catch (Exception e) {
          log.info("刪除Topic異常.");
          e.printStackTrace();
      }
      return "刪除成功";
  }
}

(2)解析代碼

注入spring-kafka框架的KafkaAdmin主要目的是獲取配置內(nèi)容。

操作Kafka的Topic需要先創(chuàng)建AdminClient,使用AdminClient的API刪除Topic。

創(chuàng)建Topic一般只需指定Topic名稱就行。

4.測試

創(chuàng)建請求RUL:http://127.0.0.1:18208/hub-208-kafka/hub/example/topic/f01_1

刪除請求RUL:http://127.0.0.1:18208/hub-208-kafka/hub/example/topic/f01_2

以上,感謝。

2023年6月17日文章來源地址http://www.zghlxwxcb.cn/news/detail-488204.html

到了這里,關(guān)于使用Kafka客戶端(spring-kafka)的Java API操作Kafka的Topic的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 使用Kafka客戶端(kafka-clients)的Java API操作Kafka的Topic

    記錄 :460 場景 :在Spring Boot微服務(wù)集成Kafka客戶端kafka-clients-3.0.0操作Kafka的Topic的創(chuàng)建和刪除。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安裝 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服務(wù)中 配置Kafka信息 1.1在pom.xml添加依賴 pom.xml文件: 解析

    2024年02月09日
    瀏覽(95)
  • Kafka增加安全驗證安全認證,SASL認證,并通過spring boot-Java客戶端連接配置

    公司Kafka一直沒做安全驗證,由于是誘捕程序故需要面向外網(wǎng)連接,需要增加Kafka連接驗證,保證Kafka不被非法連接,故開始研究Kafka安全驗證 使用Kafka版本為2.4.0版本,主要參考官方文檔 官網(wǎng)對2.4版本安全驗證介紹以及使用方式地址: https://kafka.apache.org/24/documentation.html#secu

    2024年02月01日
    瀏覽(93)
  • [Kafka集群] 配置支持Brokers內(nèi)部SSL認證\外部客戶端支持SASL_SSL認證并集成spring-cloud-starter-bus-kafka

    [Kafka集群] 配置支持Brokers內(nèi)部SSL認證\外部客戶端支持SASL_SSL認證并集成spring-cloud-starter-bus-kafka

    目錄 Kafka 集群配置 準備 配置流程 Jaas(Java Authentication and Authorization Service?)文件 zookeeper 配置文件 SSL自簽名 啟動zookeeper集群 啟動kafka集群? spring-cloud-starter-bus-kafka 集成 下載統(tǒng)一版本Kafka服務(wù)包至三臺不同的服務(wù)器上 文章使用版本為? kafka_2.13-3.5.0.tgz 下載地址 jdk版本 為 Ado

    2024年02月04日
    瀏覽(99)
  • kafka 02——三個重要的kafka客戶端

    kafka 02——三個重要的kafka客戶端

    請參考下面的文章: Kafka 01——Kafka的安裝及簡單入門使用. AdminClient API: 允許管理和檢測Topic、Broker以及其他Kafka對象。 Producer API: 發(fā)布消息到一個或多個API。 Consumer API: 訂閱一個或多個Topic,并處理產(chǎn)生的消息。 如下: 完整的pom 關(guān)于配置,可參考官網(wǎng): https://kafka.apa

    2024年02月13日
    瀏覽(26)
  • kafka客戶端工具(Kafka Tool)的安裝

    kafka客戶端工具(Kafka Tool)的安裝

    官方下載 根據(jù)不同的系統(tǒng)下載對應(yīng)的版本,點擊下載后雙擊,如何一直下一步,安裝 kafka環(huán)境搭建請參考:CentOS 搭建Kafka集群 (1)連接kafka (2)簡單使用 ?

    2024年04月23日
    瀏覽(35)
  • kafka客戶端應(yīng)用參數(shù)詳解

    kafka客戶端應(yīng)用參數(shù)詳解

    Kafka提供了非常簡單的客戶端API。只需要引入一個Maven依賴即可: 1、消息發(fā)送者主流程? 然后可以使用Kafka提供的Producer類,快速發(fā)送消息。 ? 整體來說,構(gòu)建Producer分為三個步驟: 設(shè)置Producer核心屬性 ?:Producer可選的屬性都可以由ProducerConfig類管理。比如ProducerConfig.BOOTST

    2024年02月07日
    瀏覽(26)
  • kafka:java集成 kafka(springboot集成、客戶端集成)

    kafka:java集成 kafka(springboot集成、客戶端集成)

    摘要 對于java的kafka集成,一般選用springboot集成kafka,但可能由于對接方kafka老舊、kafka不安全等問題導致kafak版本與spring版本不兼容,這個時候就得自己根據(jù)kafka客戶端api集成了。 一、springboot集成kafka 具體官方文檔地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    瀏覽(94)
  • Kafka客戶端程序無法連接到Kafka集群的解決方法

    Kafka是一個高性能、分布式的流式數(shù)據(jù)平臺,廣泛用于構(gòu)建實時數(shù)據(jù)流處理應(yīng)用程序。然而,有時候我們可能會遇到Kafka客戶端程序無法連接到Kafka集群的問題。在本文中,我將介紹一些可能導致連接問題的常見原因,并提供相應(yīng)的解決方案。 網(wǎng)絡(luò)配置問題 首先,確保Kafka集群

    2024年01月21日
    瀏覽(25)
  • 【Kafka】Kafka客戶端認證失?。篊luster authorization failed.

    【Kafka】Kafka客戶端認證失敗:Cluster authorization failed.

    kafka客戶端是公司內(nèi)部基于spring-kafka封裝的 spring-boot版本:3.x spring-kafka版本:2.1.11.RELEASE 集群認證方式:SASL_PLAINTEXT/SCRAM-SHA-512 經(jīng)過多年的經(jīng)驗,以及實際驗證,配置是沒問題的,但是業(yè)務(wù)方反饋用相同的配置,還是報錯! 封裝的kafka客戶端版本過低,高版本的配置項:secu

    2024年01月17日
    瀏覽(21)
  • 一個基于Kafka客戶端封裝的工具,Kafka開發(fā)效率神器

    一個基于Kafka客戶端封裝的工具,Kafka開發(fā)效率神器

    GitHub源碼https://github.com/zhangchuangiie/SimpleKafka 一個基于Kafka客戶端封裝的工具,Kafka開發(fā)效率神器 封裝了常用的Kafka客戶端操作,無需維護配置,無需初始化客戶端,真正實現(xiàn)了一行代碼調(diào)用 將連接池的維護封裝在工具類里面,多線程使用也無需維護客戶端集合 只需要集成1個

    2024年02月05日
    瀏覽(31)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包