国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Spring for Apache Kafka概述和簡單入門

這篇具有很好參考價值的文章主要介紹了Spring for Apache Kafka概述和簡單入門。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、概述

Spring for Apache Kafka 的高級概述以及底層概念和可運行的示例代碼。

二、準備工作

注意:進行工作開始之前至少要有一個 Apache Kafka 環(huán)境

2.1、依賴

  • 使用 Spring Boot
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

使用 Spring Boot 時,省略版本,Boot 將自動引入與您的 Boot 版本兼容的正確版本

  • 使用 Spring
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.7.14</version>
</dependency>

使用Spring 時必須要申明使用的版本。

2.2、版本兼容性

  • Apache Kafka 客戶端 2.7.0 或 2.8.0

  • Spring 框架 5.3.x 或 Spring Boot 2.7.x

  • 最低 Java 版本:8

在 Spring Boot 應(yīng)用程序中使用 Apache Kafka 時, Apache Kafka 依賴項版本由 Spring Boot 的依賴項管理確定。
想要使用不同于Spring Boot版本的 Apache Kafka時需要覆蓋所有關(guān)聯(lián)的依賴項。
尤其在使用嵌入式 Kafka 代理時特別要注意。

2.3、依賴覆蓋

在 Spring Boot 應(yīng)用程序中使用 Apache Kafka 時, Apache Kafka 依賴項版本由 Spring Boot 的依賴項管理確定。
如果要使用kafka-clients或的不同版本kafka-streams(例如 2.x), 則需要覆蓋所有關(guān)聯(lián)的依賴項。
尤其是在 spring-kafka-test 中使用嵌入式 Kafka 代理時。

并非所有的 Spring Boot都會向下兼容

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.14</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.7.14</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>{kafka-version}</version>
</dependency>

<!-- 可選  僅在使用 kafka 流時需要 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>{kafka-version}</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>{kafka-version}</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>{kafka-version}</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>{kafka-version}</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

三、Spring Boot消費者

3.1、應(yīng)用程序

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(String in) {
        System.out.println(in);
    }

}

3.2、配置項

spring.kafka.consumer.auto-offset-reset=earliest //從提交的offset開始消費;無提交的offset時,從頭開始消費

四、Srping Boot生產(chǎn)者

4.1、應(yīng)用程序

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic1", "test");
        };
    }

}

五、不使用 Spring Boot

在不使用 Spring Boot 時必須定義幾個基礎(chǔ)的Bean。

@Configuration
@EnableKafka
public class Config {

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // ...
        return props;
    }

    @Bean
    public Sender sender(KafkaTemplate<Integer, String> template) {
        return new Sender(template);
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }

    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //...
        return props;
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
        return new KafkaTemplate<Integer, String>(producerFactory);
    }

}

在 Spring 上下文之外創(chuàng)建偵聽器容器,則必須滿足容器實現(xiàn)的所有接口,否則有些功能會出現(xiàn)異常工作。

不包括 Spring Boot的完整示例如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-701680.html

public class Sender {

	public static void main(String[] args) {
		AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
		context.getBean(Sender.class).send("test", 42);
	}

	private final KafkaTemplate<Integer, String> template;

	public Sender(KafkaTemplate<Integer, String> template) {
		this.template = template;
	}

	public void send(String toSend, int key) {
		this.template.send("topic1", key, toSend);
	}

}

public class Listener {

    @KafkaListener(id = "listen1", topics = "topic1")
    public void listen1(String in) {
        System.out.println(in);
    }

}

@Configuration
@EnableKafka
public class Config {

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // ...
        return props;
    }

    @Bean
    public Sender sender(KafkaTemplate<Integer, String> template) {
        return new Sender(template);
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }

    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //...
        return props;
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
        return new KafkaTemplate<Integer, String>(producerFactory);
    }

}

到了這里,關(guān)于Spring for Apache Kafka概述和簡單入門的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 【SpringBoot】| Spring Boot 概述和入門程序剖析

    【SpringBoot】| Spring Boot 概述和入門程序剖析

    目錄 一:Spring Boot 入門 1. Spring能做什么? 2. SpringBoot特點 3. 如何學(xué)習(xí)SpringBoot 4.?創(chuàng)建Spring Boot項目 Maven的配置 入門案例: SpringBoot中幾個重要的注解 5. 了解自動配置原理 依賴管理 自動配置 6.?SpringBoot核心配置文件 多環(huán)境測試 自定義配置 7.?SpringBoot中使用JSP(了解) 8.?S

    2024年02月06日
    瀏覽(28)
  • Spring Boot與Apache Kafka實現(xiàn)高吞吐量消息處理:解決大規(guī)模數(shù)據(jù)處理問題

    現(xiàn)代數(shù)據(jù)量越來越龐大對數(shù)據(jù)處理的效率提出了更高的要求。Apache Kafka是目前流行的分布式消息隊列之一。Spring Boot是現(xiàn)代Java應(yīng)用程序快速開發(fā)的首選框架。綜合使用Spring Boot和Apache Kafka可以實現(xiàn)高吞吐量消息處理。 Apache Kafka采用分布式發(fā)布-訂閱模式具有高度的可擴展性和可

    2024年02月05日
    瀏覽(25)
  • kafka--技術(shù)文檔--spring-boot集成基礎(chǔ)簡單使用

    kafka--技術(shù)文檔--spring-boot集成基礎(chǔ)簡單使用

    ? ? ? ? 查閱了很多資料了解到,使用了spring-boot中整合的kafka的使用是被封裝好的。也就是說這些使用其實和在linux中的使用kafka代碼的使用其實沒有太大關(guān)系。但是邏輯是一樣的。這點要注意! 核心配置為: 如果在下面規(guī)定了spring-boot的版本那么就不需要再使用版本號,如

    2024年02月11日
    瀏覽(23)
  • Spring Security 6.x 系列【1】基礎(chǔ)篇之概述及入門案例

    Spring Security 6.x 系列【1】基礎(chǔ)篇之概述及入門案例

    有道無術(shù),術(shù)尚可求,有術(shù)無道,止于術(shù)。 本系列Spring Boot 版本 3.0.4 本系列Spring Security 版本 6.0.2 源碼地址:https://gitee.com/pearl-organization/study-spring-security-demo 本系列基于最新 Spring Boot 3.0 + Spring Security 6.0 版本,由淺入深,從實戰(zhàn)到源碼分析,詳細講解各種 Spring Security 的使用

    2024年02月06日
    瀏覽(25)
  • Spring Security OAuth2.0(3):Spring Security簡單入門

    Spring Security OAuth2.0(3):Spring Security簡單入門

    Spring Security 快速入門。 本章代碼已分享至Gitee:https://gitee.com/lengcz/security-spring-security qquad Spring Secutiry 是一個能夠為基于Spring的企業(yè)應(yīng)用系統(tǒng)提供聲明式的安全訪問控制解決方案的安全框架。由于它是Spring生態(tài)系統(tǒng)的一員,因此它伴隨著整個Spring生態(tài)系統(tǒng)不斷修正、升級,

    2024年02月13日
    瀏覽(28)
  • Spring Boot快速入門:構(gòu)建簡單的Web應(yīng)用

    ??Spring Boot是一個用于簡化Spring應(yīng)用程序開發(fā)的框架,它通過提供開箱即用的配置和一組常用的功能,使得構(gòu)建高效、可維護的應(yīng)用變得非常容易。在本篇博客中,我們將一步步地介紹如何快速入門Spring Boot,并構(gòu)建一個簡單的Web應(yīng)用。 步驟1:準備開發(fā)環(huán)境 Java Development

    2024年02月07日
    瀏覽(25)
  • 快速入門使用spring詳細步驟(介紹、導(dǎo)入依賴、第一個簡單程序)

    快速入門使用spring詳細步驟(介紹、導(dǎo)入依賴、第一個簡單程序)

    目錄 一、spring介紹 二、spring使用步驟 (一)創(chuàng)建maven項目? (二)?maven項目導(dǎo)入spring依賴 (三)開始編寫第一個spring程序 三、新篇章之springboot(額外篇) spring是作為Java EE企業(yè)級開發(fā)很好的一個框架,這篇文章就來講解一下怎么使用spring。要使用spring,現(xiàn)在一般都是 使用

    2024年02月04日
    瀏覽(18)
  • UI for Apache Kafka

    UI for Apache Kafka

    文章Overview of UI Tools for Monitoring and Management of Apache Kafka Clusters | by German Osin | Towards Data Science中介紹了8種常見的kafka UI工具,這些產(chǎn)品的核心功能對比信息如下圖所示, 通過對比發(fā)現(xiàn) UI for Apache Kafka 功能齊全且免費,因此可以作為我們的首選。本文通過二進制jar包的方式進行安

    2024年02月04日
    瀏覽(20)
  • Splunk Connect for Kafka – Connecting Apache Kafka with Splunk

    1: 背景: 1: splunk 有時要去拉取kafka 上的數(shù)據(jù): 下面要用的有用的插件:Splunk Connect for Kafka 先說一下這個Splunk connect for kafka 是什么: Spunk Connect for Kafka is a “sink connector” built on the Kafka Connect framework for exporting data from Kafka topics into Splunk. With a focus on speed and reliability, include

    2024年02月05日
    瀏覽(17)
  • Spring學(xué)習(xí)筆記之spring概述

    Spring學(xué)習(xí)筆記之spring概述

    Spring是一個輕量級的控制反轉(zhuǎn)和面向切面的容器框架 Spring最初的出現(xiàn)是為了解決EJB臃腫的設(shè)計,以及難以測試等問題。 Spring為了簡化開發(fā)而生,讓程序員只需關(guān)注核心業(yè)務(wù)的實現(xiàn),盡可能的不再關(guān)注非業(yè)務(wù)邏輯代碼(事務(wù)控制,安全日志等) 八大模塊中有兩大核心模塊,

    2024年02月14日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包