国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka:java集成 kafka(springboot集成、客戶(hù)端集成)

這篇具有很好參考價(jià)值的文章主要介紹了kafka:java集成 kafka(springboot集成、客戶(hù)端集成)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

摘要

對(duì)于java的kafka集成,一般選用springboot集成kafka,但可能由于對(duì)接方kafka老舊、kafka不安全等問(wèn)題導(dǎo)致kafak版本與spring版本不兼容,這個(gè)時(shí)候就得自己根據(jù)kafka客戶(hù)端api集成了。

一、springboot集成kafka

具體官方文檔地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

kafka:java集成 kafka(springboot集成、客戶(hù)端集成)

1、加入依賴(lài),spring-boot-starter-web和spring-kafka 的版本號(hào)可以看它們依賴(lài)的spring版本是否一致,這里pom依賴(lài)如下:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.7.9</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.9.6</version>
        </dependency>

2、添加application.yml配置,具體如下:

server:
  port: 8087
spring:
  mvc:
    pathmatch:
      matching-strategy: ant_path_matcher
  kafka:
    bootstrap-servers: 192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094
    consumer:
      properties:
        group:
          id: boot-kafka

3、發(fā)送消息,由于KafkaTemplate是自動(dòng)裝配的,所以只要在spring的bean里注入KafkaTemplate發(fā)送消息即可,具體如下:

package com.longqi.bootkafka.controller;

import com.longqi.bootkafka.entity.MessageParam;
import com.longqi.bootkafka.entity.Wrapper;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.validation.Valid;

/**
 * <p>
 * 測(cè)試 前端控制器
 * </p>
 * @author LongQi
 * @since 2021-06-23
 */

@Slf4j
@RestController
@RequestMapping("/test")
@Api(value = "TestController", tags = {"測(cè)試 API"})
public class TestController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Boolean isSend = true;

    @PostMapping("/kafka/sendMessage")
    @ApiOperation(httpMethod = "POST", value = "發(fā)送kafka告警消息", response = Wrapper.class)
    public Wrapper sendKafkaMessage(@Valid @ApiParam("參數(shù)") @RequestBody MessageParam param) {
        kafkaTemplate.send(param.getTopic(), param.getMessage());
        return Wrapper.ok(true);
    }

}

這里用參數(shù){"message": "asd54a6d46a4ds","topic": "device-alarm-test"}進(jìn)行測(cè)試,會(huì)報(bào)如下日志:

kafka:java集成 kafka(springboot集成、客戶(hù)端集成)

發(fā)現(xiàn)會(huì)報(bào)警告:[Producer clientId=producer-1] Error while fetching metadata with correlation id 34 : {device-alarm-test=LEADER_NOT_AVAILABLE},獲取主題元數(shù)據(jù)錯(cuò)誤,這個(gè)可以忽略,查找元數(shù)據(jù)失敗,kafka默認(rèn)會(huì)自動(dòng)創(chuàng)建主題的,后續(xù)再次發(fā)送消息,是不會(huì)報(bào)這個(gè)錯(cuò)誤的。

查看可視化工具EFAK,發(fā)現(xiàn)主題device-alarm-test是自動(dòng)創(chuàng)建成功,分區(qū)數(shù)是kafka的集群配置service.properties里配置的分區(qū)9,具體如下:

kafka:java集成 kafka(springboot集成、客戶(hù)端集成)
kafka:java集成 kafka(springboot集成、客戶(hù)端集成)

可以看到,其中一個(gè)分區(qū)保存了這個(gè)消息,logsize變成了1,說(shuō)明這個(gè)消息是發(fā)送成功的。另外也可以看到主題的各分區(qū)主備消息所在的節(jié)點(diǎn)是不一樣的。

4、接收消息,接收消息也很簡(jiǎn)單,只要在spring的bean里使用KafkaListener注解即可,具體如下:

kafka:java集成 kafka(springboot集成、客戶(hù)端集成)

可視化工具也能看到該主題該消費(fèi)者9個(gè)分區(qū)的消費(fèi)情況,具體如下:

kafka:java集成 kafka(springboot集成、客戶(hù)端集成)

logSize為存入分區(qū)parttion消息數(shù)量,Offset為消費(fèi)的偏移量(已消費(fèi)的數(shù)量),Lag為未消費(fèi)的數(shù)量(積壓的數(shù)量),Owner為消費(fèi)者,目前可以看到消費(fèi)者為同一個(gè),即只有1個(gè)線程在消費(fèi)這9個(gè)分區(qū)的消息。

二、客戶(hù)端集成kafka

直接使用kafka客戶(hù)端,建議使用最新版的客戶(hù)端,畢竟沒(méi)有其他框架版本限制,能用最新的就用最新的,畢竟新的一般性能強(qiáng)也修復(fù)了bug。好比23年2月份出現(xiàn)的kafka安全漏洞:遠(yuǎn)程代碼執(zhí)行漏洞CVE-2023-25194,對(duì)現(xiàn)在最新版3.4.0無(wú)效,對(duì)以前大部分版本就有效。

1、添加依賴(lài),具體如下:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>

2、發(fā)送和消費(fèi)消息,具體代碼如下:

package com.longqi.bootkafka.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @author LongQi
 * @projectName boot-integration
 * @description: kafka配置
 * @date 2023/3/13 14:42
 */

public class KafkaConfig {

    public static void main(String[] args) {
        // 聲明主題
        String topic = "device-alarm-test";
        // 創(chuàng)建消費(fèi)者
        Properties consumerConfig = new Properties();
        consumerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"boot-kafka");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerConfig);
        // 訂閱主題并循環(huán)拉取消息
        kafkaConsumer.subscribe(Arrays.asList(topic));
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
                    for(ConsumerRecord<String, String> record:records){
                        System.out.println(record.value());
                    }
                }
            }
        }).start();
        // 創(chuàng)建生產(chǎn)者
        Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG,"boot-kafka-client");
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer<>(producerConfig);
        // 給主題發(fā)送消息
        producer.send(new ProducerRecord<>(topic, "hello,"+System.currentTimeMillis()));
    }
}

最后可以看到打印消息如下:

kafka:java集成 kafka(springboot集成、客戶(hù)端集成)

成功接收到消息并打印文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-421243.html

到了這里,關(guān)于kafka:java集成 kafka(springboot集成、客戶(hù)端集成)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • SpringBoot集成WebSocket實(shí)現(xiàn)客戶(hù)端與服務(wù)端通信

    SpringBoot集成WebSocket實(shí)現(xiàn)客戶(hù)端與服務(wù)端通信

    話不多說(shuō),直接上代碼看效果! 一、服務(wù)端: 1、引用依賴(lài) 2、添加配置文件 WebSocketConfig 3、編寫(xiě)WebSocket服務(wù)端接收、發(fā)送功能 ? 聲明接口代碼: ? 實(shí)現(xiàn)類(lèi)代碼: 4、如果不需要實(shí)現(xiàn)客戶(hù)端功能,此處可選擇前端調(diào)用,奉上代碼 二、客戶(hù)端: 1、引用依賴(lài) 2、自定義WebSocket客

    2024年01月23日
    瀏覽(24)
  • SpringBoot集成Elasticsearch客戶(hù)端(新舊版本)(2023-01-28)

    SpringBoot集成Elasticsearch客戶(hù)端(新舊版本)(2023-01-28)

    第一章 SpringBoot集成ElasticSearch(2023-01-28) 例如:業(yè)務(wù)中需要使用es,所以做一些客戶(hù)端選型,熟悉一下基本的操作,所以記錄這篇博客,有關(guān)概念理論性的文章還在整理過(guò)程中,后續(xù)會(huì)整理個(gè)系列 Spring認(rèn)證中國(guó)教育管理中心-Spring Data Elasticsearch教程一 SpringData集成Elasticsearch Sp

    2024年02月07日
    瀏覽(25)
  • Springboot 集成WebSocket作為客戶(hù)端,含重連接功能,開(kāi)箱即用

    使用演示 只需要init后調(diào)用sendMessage方法即可,做到開(kāi)箱即用。內(nèi)部封裝了失敗重連接、斷線重連接等功能。 基于Springboot工程 引入websocket依賴(lài) 開(kāi)箱即用的工具類(lèi)

    2024年02月04日
    瀏覽(36)
  • SpringBoot集成WebSocket實(shí)現(xiàn)客戶(hù)端與服務(wù)端長(zhǎng)連接通信

    SpringBoot集成WebSocket實(shí)現(xiàn)客戶(hù)端與服務(wù)端長(zhǎng)連接通信

    場(chǎng)景: 1、WebSocket協(xié)議是用于前后端長(zhǎng)連接交互的技術(shù),此技術(shù)多用于交互不斷開(kāi)的場(chǎng)景。特點(diǎn)是連接不間斷、更輕量,只有在關(guān)閉瀏覽器窗口、或者關(guān)閉瀏覽器、或主動(dòng)close,當(dāng)前會(huì)話對(duì)象才會(huì)關(guān)閉。 2、相較于 Http/Https?通信只能由客戶(hù)端主動(dòng)發(fā)起請(qǐng)求,而 Socket?通信不僅能

    2024年02月02日
    瀏覽(29)
  • kafka之java客戶(hù)端實(shí)戰(zhàn)

    kafka之java客戶(hù)端實(shí)戰(zhàn)

    ????????Kafka提供了兩套客戶(hù)端API, HighLevel API和LowLevel API 。 HighLevel API封裝了kafka的運(yùn)行細(xì)節(jié),使用起來(lái)比較簡(jiǎn)單,是企業(yè)開(kāi)發(fā)過(guò)程中最常用的客戶(hù)端API。 而LowLevel API則需要客戶(hù)端自己管理Kafka的運(yùn)行細(xì)節(jié),Partition,Offset這些數(shù)據(jù)都由客戶(hù)端自行管理。這層API功能更靈活,

    2024年01月17日
    瀏覽(22)
  • springboot集成webstock實(shí)戰(zhàn):服務(wù)端數(shù)據(jù)推送數(shù)據(jù)到客戶(hù)端實(shí)現(xiàn)實(shí)時(shí)刷新

    springboot集成webstock實(shí)戰(zhàn):服務(wù)端數(shù)據(jù)推送數(shù)據(jù)到客戶(hù)端實(shí)現(xiàn)實(shí)時(shí)刷新

    ????之前介紹過(guò)springboot集成webstock方式,具體參考: springboot集成websocket實(shí)戰(zhàn):站內(nèi)消息實(shí)時(shí)推送 這里補(bǔ)充另外一個(gè)使用webstock的場(chǎng)景,方便其他同學(xué)理解和使用,廢話不多說(shuō)了,直接開(kāi)始!簡(jiǎn)單介紹一下業(yè)務(wù)場(chǎng)景: ????現(xiàn)在有一個(gè)投票活動(dòng),活動(dòng)詳情中會(huì)顯示投票活動(dòng)的參與人數(shù)、訪

    2024年02月08日
    瀏覽(48)
  • SpringBoot集成Milo庫(kù)實(shí)現(xiàn)OPC UA客戶(hù)端:連接、遍歷節(jié)點(diǎn)、讀取、寫(xiě)入、訂閱與批量訂閱

    SpringBoot集成Milo庫(kù)實(shí)現(xiàn)OPC UA客戶(hù)端:連接、遍歷節(jié)點(diǎn)、讀取、寫(xiě)入、訂閱與批量訂閱

    前面我們搭建了一個(gè)本地的 PLC 仿真環(huán)境,并通過(guò) KEPServerEX6 讀取 PLC 上的數(shù)據(jù),最后還使用 UAExpert 作為OPC客戶(hù)端完成從 KEPServerEX6 這個(gè)OPC服務(wù)器的數(shù)據(jù)讀取與訂閱功能。在這篇文章中,我們將通過(guò) SpringBoot 集成 Milo 庫(kù)實(shí)現(xiàn)一個(gè) OPC UA 客戶(hù)端,包括連接、遍歷節(jié)點(diǎn)、讀取、寫(xiě)入

    2024年02月09日
    瀏覽(29)
  • 使用Kafka客戶(hù)端(spring-kafka)的Java API操作Kafka的Topic

    記錄 :458 場(chǎng)景 :在Spring Boot微服務(wù)集成Kafka客戶(hù)端spring-kafka-2.8.2操作Kafka的Topic的創(chuàng)建和刪除。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka安裝 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服務(wù)中 配置Kafka信息 1.1在pom.xml添加依賴(lài) pom.xml文件: 解析

    2024年02月09日
    瀏覽(20)
  • 使用Kafka客戶(hù)端(kafka-clients)的Java API操作Kafka的Topic

    記錄 :460 場(chǎng)景 :在Spring Boot微服務(wù)集成Kafka客戶(hù)端kafka-clients-3.0.0操作Kafka的Topic的創(chuàng)建和刪除。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安裝 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服務(wù)中 配置Kafka信息 1.1在pom.xml添加依賴(lài) pom.xml文件: 解析

    2024年02月09日
    瀏覽(94)
  • [Kafka集群] 配置支持Brokers內(nèi)部SSL認(rèn)證\外部客戶(hù)端支持SASL_SSL認(rèn)證并集成spring-cloud-starter-bus-kafka

    [Kafka集群] 配置支持Brokers內(nèi)部SSL認(rèn)證\外部客戶(hù)端支持SASL_SSL認(rèn)證并集成spring-cloud-starter-bus-kafka

    目錄 Kafka 集群配置 準(zhǔn)備 配置流程 Jaas(Java Authentication and Authorization Service?)文件 zookeeper 配置文件 SSL自簽名 啟動(dòng)zookeeper集群 啟動(dòng)kafka集群? spring-cloud-starter-bus-kafka 集成 下載統(tǒng)一版本Kafka服務(wù)包至三臺(tái)不同的服務(wù)器上 文章使用版本為? kafka_2.13-3.5.0.tgz 下載地址 jdk版本 為 Ado

    2024年02月04日
    瀏覽(99)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包