国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

在Spring Boot微服務集成Kafka客戶端(kafka-clients)操作Kafka

這篇具有很好參考價值的文章主要介紹了在Spring Boot微服務集成Kafka客戶端(kafka-clients)操作Kafka。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

記錄:459

場景:在Spring Boot微服務集成Kafka客戶端kafka-clients-3.0.0操作Kafka。使用kafka-clients的原生KafkaProducer操作Kafka生產(chǎn)者Producer。使用kafka-clients的原生KafkaConsumer操作Kafka的消費者Consumer。

版本:JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。

Kafka安裝:https://blog.csdn.net/zhangbeizhen18/article/details/129071395

1.基礎概念

Event:An event records the fact that "something happened" in the world or in your business. It is also called record or message in the documentation.

Broker:一個Kafka節(jié)點就是一個broker;多個Broker可以組成一個Kafka集群。

Topic:Kafka根據(jù)Topic對消息進行歸類,發(fā)布到Kafka的每條消息都需要指定一個Topic。

Producer:消息生產(chǎn)者,向Broker發(fā)送消息的客戶端。

Consumer:消息消費者,從Broker讀取消息的客戶端。

ConsumerGroup:每個Consumer屬于一個特定的ConsumerGroup,一條消息可以被多個不同的ConsumerGroup消費;但是一個ConsumerGroup中只能有一個Consumer能夠消費該消息。

Partition:一個topic可以分為多個partition,每個partition內部消息是有序的。

publish:發(fā)布,使用Producer向Kafka寫入數(shù)據(jù)。

subscribe:訂閱,使用Consumer從Kafka讀取數(shù)據(jù)。

2.微服務中配置Kafka信息

(1)在pom.xml添加依賴

pom.xml文件:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.0.0</version>
</dependency>

解析:使用原生的kafka-clients,版本:3.0.0。操作kafka的生產(chǎn)者、消費、Topic。

3.配置Kafka生產(chǎn)者和消費者

使用原生的kafka-clients,需配置KafkaProducer和KafkaConsumer,把Kafka的配置信息注入到這兩個對象,便可以操作了生產(chǎn)者和消費者。

配置細節(jié)在官網(wǎng)的configuration:https://kafka.apache.org/documentation/

3.1配置KafkaProducer生產(chǎn)者

(1)示例代碼

@Configuration
public class KafkaConfig {
  @Bean
  public KafkaProducer kafkaProducer() {
    Map<String, Object> configs = new HashMap<>();
    //#kafka服務端的IP和端口,格式:(ip:port)
    configs.put("bootstrap.servers", "192.168.19.203:29001");
    //客戶端發(fā)送服務端失敗的重試次數(shù)
    configs.put("retries", 2);
    //多個記錄被發(fā)送到同一個分區(qū)時,生產(chǎn)者將嘗試將記錄一起批處理成更少的請求.
    //此設置有助于提高客戶端和服務器的性能,配置控制默認批量大小(以字節(jié)為單位)
    configs.put("batch.size", 16384);
    //生產(chǎn)者可用于緩沖等待發(fā)送到服務器的記錄的總內存字節(jié)數(shù)(以字節(jié)為單位)
    configs.put("buffer-memory", 33554432);
    //生產(chǎn)者producer要求leader節(jié)點在考慮完成請求之前收到的確認數(shù),用于控制發(fā)送記錄在服務端的持久化
    //acks=0,設置為0,則生產(chǎn)者producer將不會等待來自服務器的任何確認.該記錄將立即添加到套接字(socket)緩沖區(qū)并視為已發(fā)送.在這種情況下,無法保證服務器已收到記錄,并且重試配置(retries)將不會生效(因為客戶端通常不會知道任何故障),每條記錄返回的偏移量始終設置為-1.
    //acks=1,設置為1,leader節(jié)點會把記錄寫入本地日志,不需要等待所有follower節(jié)點完全確認就會立即應答producer.在這種情況下,在follower節(jié)點復制前,leader節(jié)點確認記錄后立即失敗的話,記錄將會丟失.
    //acks=all,acks=-1,leader節(jié)點將等待所有同步復制副本完成再確認記錄,這保證了只要至少有一個同步復制副本存活,記錄就不會丟失.
    configs.put("acks", "-1");
    //指定key使用的序列化類
    Serializer keySerializer = new StringSerializer();
    //指定value使用的序列化類
    Serializer valueSerializer = new StringSerializer();
    //創(chuàng)建Kafka生產(chǎn)者
    KafkaProducer kafkaProducer = new KafkaProducer(configs, keySerializer, valueSerializer);
    return kafkaProducer;
  }
}

(2)解析代碼

把Kafka的配置信息注入到KafkaProducer,并創(chuàng)建KafkaProducer對象。

使用@Configuration和@Bean注解把KafkaProducer對象注入到Spring的IOC容器,在Spring環(huán)境就可以使用KafkaProducer了。

KafkaProducer的底層使用配置類是ProducerConfig,在配置時可以參考。

全稱:org.apache.kafka.clients.producer.ProducerConfig。

3.2配置KafkaConsumer消費者

(1)示例代碼

@Configuration
public class KafkaConfig {
  @Bean
  public KafkaConsumer kafkaConsumer() {
    Map<String, Object> configs = new HashMap<>();
    //kafka服務端的IP和端口,格式:(ip:port)
    configs.put("bootstrap.servers", "192.168.19.203:29001");
    //開啟consumer的偏移量(offset)自動提交到Kafka
    configs.put("enable.auto.commit", true);
    //consumer的偏移量(offset) 自動提交的時間間隔,單位毫秒
    configs.put("auto.commit.interval", 5000);
    //在Kafka中沒有初始化偏移量或者當前偏移量不存在情況
    //earliest, 在偏移量無效的情況下, 自動重置為最早的偏移量
    //latest, 在偏移量無效的情況下, 自動重置為最新的偏移量
    //none, 在偏移量無效的情況下, 拋出異常.
    configs.put("auto.offset.reset", "latest");
    //請求阻塞的最大時間(毫秒)
    configs.put("fetch.max.wait", 500);
    //請求應答的最小字節(jié)數(shù)
    configs.put("fetch.min.size", 1);
    //心跳間隔時間(毫秒)
    configs.put("heartbeat-interval", 3000);
    //一次調用poll返回的最大記錄條數(shù)
    configs.put("max.poll.records", 500);
    //指定消費組
    configs.put("group.id", "hub-topic-city-01-group");
    //指定key使用的反序列化類
    Deserializer keyDeserializer = new StringDeserializer();
    //指定value使用的反序列化類
    Deserializer valueDeserializer = new StringDeserializer();
    //創(chuàng)建Kafka消費者
    KafkaConsumer kafkaConsumer = new KafkaConsumer(configs, keyDeserializer, valueDeserializer);
    return kafkaConsumer;
  }
}

(2)解析代碼

把Kafka的配置信息注入到KafkaConsumer,并創(chuàng)建KafkaConsumer對象。

使用@Configuration和@Bean注解把KafkaConsumer對象注入到Spring的IOC容器,在Spring環(huán)境就可以使用KafkaConsumer了。

KafkaConsumer的底層使用配置類是ConsumerConfig,在配置時可以參考。

全稱:org.apache.kafka.clients.consumer.ConsumerConfig。

4.使用KafkaProducer操作Kafka生產(chǎn)者Producer

使用原生kafka-clients的KafkaProducer操作Kafka生產(chǎn)者Producer。

KafkaProducer全稱:org.apache.kafka.clients.producer.KafkaProducer。

(1)示例代碼

@RestController
@RequestMapping("/hub/example/producer")
@Slf4j
public class UseKafkaProducerController {
  @Autowired
  private KafkaProducer kafkaProducer;
  private final String topicName = "hub-topic-city-02";
  @GetMapping("/f01_1")
  public Object f01_1() {
    try {
        //1.獲取業(yè)務數(shù)據(jù)
        CityDTO cityDTO = CityDTO.buildDto(2023061501L, "杭州", "杭州是一個好城市");
        String cityStr = JSONObject.toJSONString(cityDTO);
        log.info("向Kafka的Topic: {} ,寫入數(shù)據(jù):", topicName);
        log.info(cityStr);
        //2.使用KafkaProducer向Kafka寫入數(shù)據(jù)
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, cityStr);
        kafkaProducer.send(producerRecord);
    } catch (Exception e) {
        log.info("Producer寫入Topic異常.");
        e.printStackTrace();
    }
    return "寫入成功";
  }
}

(2)解析代碼

創(chuàng)建ProducerRecord對象,指定指定Kafka的Topic名稱和需要寫入的數(shù)據(jù)。ProducerRecord就是需要寫入Kafka中的一條數(shù)據(jù),

使用KafkaProducer 的send方法,傳入ProducerRecord,就能完成Producer向Kafka的Broker節(jié)點寫入數(shù)據(jù)。

5.使用KafkaConsumer操作Kafka的消費者Consumer

使用原生kafka-clients的KafkaConsumer操作Kafka生產(chǎn)者Consumer。

KafkaConsumer全稱:org.apache.kafka.clients.consumer.KafkaConsumer。

(1)示例代碼

@Component
@Slf4j
public class UseKafkaConsumer implements InitializingBean {
  @Autowired
  private KafkaConsumer kafkaConsumer;
  private final String topicName = "hub-topic-city-02";
  @Override
  public void afterPropertiesSet() throws Exception {
    Thread thread = new Thread(() -> {
        log.info("啟動線程監(jiān)聽Topic: {}", topicName);
        ThreadUtil.sleep(1000);
        Collection<String> topics = Lists.newArrayList(topicName);
        kafkaConsumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord consumerRecord : consumerRecords) {
                //1.從ConsumerRecord中獲取消費數(shù)據(jù)
                String originalMsg = (String) consumerRecord.value();
                log.info("從Kafka中消費的原始數(shù)據(jù): " + originalMsg);
                //2.把消費數(shù)據(jù)轉換為DTO對象
                CityDTO cityDTO = JSONUtil.toBean(originalMsg, CityDTO.class);
                log.info("消費數(shù)據(jù)轉換為DTO對象: " + cityDTO.toString());
            }
        }
    });
    thread.start();
  }
}

(2)解析代碼

使用?while (true)實時遍歷KafkaConsumer消費者,實質即使實時監(jiān)聽Kafka消費者。

使用KafkaConsumer的subscribe方法訂閱需要監(jiān)聽的Kafka的Topic。

使用KafkaConsumer的poll方法輪詢消費者獲取消費消息ConsumerRecord。

從ConsumerRecord中獲取具體消費的業(yè)務數(shù)據(jù)。

6.測試

(1)使用Postman測試,調用生產(chǎn)者寫入數(shù)據(jù)

請求RUL:http://127.0.0.1:18209/hub-209-kafka/hub/example/producer/f01_1

(2)消費者自動消費數(shù)據(jù)

日志信息:

向Kafka的Topic: hub-topic-city-02 ,寫入數(shù)據(jù):
{"cityDescribe":"杭州是一個好城市","cityId":2023061501,"cityName":"杭州","updateTime":"2023-06-17 11:27:52"}
從Kafka中消費的原始數(shù)據(jù): {"cityDescribe":"杭州是一個好城市","cityId":2023061501,"cityName":"杭州","updateTime":"2023-06-17 11:27:52"}
消費數(shù)據(jù)轉換為DTO對象: CityDTO(cityId=2023061501, cityName=杭州, cityDescribe=杭州是一個好城市, updateTime=Sat Jun 17 11:27:52 CST 2023)

7.輔助類

@Data
@Builder
public class CityDTO {
  private Long cityId;
  private String cityName;
  private String cityDescribe;
  @JsonFormat(
          pattern = "yyyy-MM-dd HH:mm:ss"
  )
  private Date updateTime;
  public static CityDTO buildDto(Long cityId, String cityName,
                                 String cityDescribe) {
      return builder().cityId(cityId)
              .cityName(cityName).cityDescribe(cityDescribe)
              .updateTime(new Date()).build();
  }
}

以上,感謝。

2023年6月17日文章來源地址http://www.zghlxwxcb.cn/news/detail-518008.html

到了這里,關于在Spring Boot微服務集成Kafka客戶端(kafka-clients)操作Kafka的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 使用Kafka客戶端(kafka-clients)的Java API操作Kafka的Topic

    記錄 :460 場景 :在Spring Boot微服務集成Kafka客戶端kafka-clients-3.0.0操作Kafka的Topic的創(chuàng)建和刪除。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安裝 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服務中 配置Kafka信息 1.1在pom.xml添加依賴 pom.xml文件: 解析

    2024年02月09日
    瀏覽(95)
  • Kafka增加安全驗證安全認證,SASL認證,并通過spring boot-Java客戶端連接配置

    公司Kafka一直沒做安全驗證,由于是誘捕程序故需要面向外網(wǎng)連接,需要增加Kafka連接驗證,保證Kafka不被非法連接,故開始研究Kafka安全驗證 使用Kafka版本為2.4.0版本,主要參考官方文檔 官網(wǎng)對2.4版本安全驗證介紹以及使用方式地址: https://kafka.apache.org/24/documentation.html#secu

    2024年02月01日
    瀏覽(93)
  • Netty示例教程:結合Spring Boot構建客戶端/服務器通信

    當涉及到在客戶端/服務器應用程序中使用Netty進行通信時,以下是一個結合Spring Boot的示例教程,演示如何使用Netty構建客戶端和服務器應用程序。 簡介 本教程將指導您如何使用Netty結合Spring Boot構建客戶端和服務器應用程序。通過Netty的可靠網(wǎng)絡通信功能,您可以輕松構建高

    2024年02月15日
    瀏覽(96)
  • 基于Spring Boot2.0 & HTTP/2 實現(xiàn)服務器、客戶端

    基于Spring Boot2.0 & HTTP/2 實現(xiàn)服務器、客戶端

    HTTP協(xié)議由于其無狀態(tài)的特性以及超高的普及率,是當下大部分網(wǎng)站選擇使用的應用層協(xié)議。然而,HTTP/1.x的底層傳輸方式的幾個特性,已經(jīng)對應用的整體性能產(chǎn)生了負面影響。特別是,HTTP/1.0在每次的TCP連接上只允許發(fā)送一次請求,在HTTP/1.1中增加了請求管線,但是這僅僅解決

    2023年04月09日
    瀏覽(83)
  • kafka:java集成 kafka(springboot集成、客戶端集成)

    kafka:java集成 kafka(springboot集成、客戶端集成)

    摘要 對于java的kafka集成,一般選用springboot集成kafka,但可能由于對接方kafka老舊、kafka不安全等問題導致kafak版本與spring版本不兼容,這個時候就得自己根據(jù)kafka客戶端api集成了。 一、springboot集成kafka 具體官方文檔地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    瀏覽(94)
  • 【Spring Boot Admin】客戶端服務無法注冊到監(jiān)控平臺的相關問題及解決方案

    1、客戶端服務整合了Spring Security 通過URL注冊,需在客戶端服務中添加如下配置 通過注冊中心注冊,需在客戶端服務中添加如下配置 2、客戶端服務配置了server.port.context-path參數(shù),并且客戶端服務通過注冊中心注冊 需在客戶端服務中添加如下配置 3、Spring Boot Admin 監(jiān)控平臺使

    2024年02月16日
    瀏覽(94)
  • [Kafka集群] 配置支持Brokers內部SSL認證\外部客戶端支持SASL_SSL認證并集成spring-cloud-starter-bus-kafka

    [Kafka集群] 配置支持Brokers內部SSL認證\外部客戶端支持SASL_SSL認證并集成spring-cloud-starter-bus-kafka

    目錄 Kafka 集群配置 準備 配置流程 Jaas(Java Authentication and Authorization Service?)文件 zookeeper 配置文件 SSL自簽名 啟動zookeeper集群 啟動kafka集群? spring-cloud-starter-bus-kafka 集成 下載統(tǒng)一版本Kafka服務包至三臺不同的服務器上 文章使用版本為? kafka_2.13-3.5.0.tgz 下載地址 jdk版本 為 Ado

    2024年02月04日
    瀏覽(100)
  • Spring Boot 整合 Redis,使用 RedisTemplate 客戶端

    Spring Boot 整合 Redis,使用 RedisTemplate 客戶端

    1.1.1 添加依賴 redis 的依賴: 1.1.2 yml 配置文件 1.1.3 Config 配置文件 1.1.4 使用示例 注入 RedisTemplate,即可操作 Redis,簡單示例如下: 1.2.1 RedisTemplate 簡介 RedisTemplate 是 Spring Data Redis 項目的一部分,旨在簡化在Java應用程序中使用 Redis 的過程。它提供了一組簡單的方法,可以在

    2024年02月09日
    瀏覽(28)
  • 利用Spring Boot實現(xiàn)客戶端IP地理位置獲取

    利用Spring Boot實現(xiàn)客戶端IP地理位置獲取

    在當今互聯(lián)的世界中,了解客戶端的地理位置對于提供個性化服務和增強用戶體驗至關重要。無論是根據(jù)地區(qū)偏好定制內容,還是確保符合本地法規(guī),訪問客戶端IP位置都是一項寶貴的資產(chǎn)。如抖音評論區(qū)、用戶頁都會展示用戶的IP屬地信息。 在本文中,我們將探討一個Spri

    2024年02月20日
    瀏覽(95)
  • 使用Spring Boot和Apache HttpClient構建REST客戶端

    介紹: 在本文中,我們將學習如何使用Spring Boot和Apache HttpClient創(chuàng)建一個REST客戶端。我們將探討如何與遠程服務器進行通信、處理JSON響應,并為Web應用程序配置跨源資源共享(CORS)。讓我們深入代碼吧! ClientService 類負責發(fā)起HTTP請求并處理響應。它使用 @Service 注解表示它應

    2024年01月16日
    瀏覽(93)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包