国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

SpringBoot整合Kafka

這篇具有很好參考價(jià)值的文章主要介紹了SpringBoot整合Kafka。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

一、首先下載windows版本的Kafka

官網(wǎng):Apache Kafka

二、啟動(dòng)Kafka

cmd進(jìn)入到kafka安裝目錄:

1:cmd啟動(dòng)zookeeer

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

2:cmd啟動(dòng)kafka server

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

3:使用cmd窗口啟動(dòng)一個(gè)生產(chǎn)者命令:

.\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic Topic1

4:cmd啟動(dòng)zookeeer

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 -topic Topic1

?三、引入kafka依賴(lài)

       <!--kafka依賴(lài)-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

四、配置文件

server:
    port: 8080

spring:
    application:
        name: kafka-demo
    kafka:
        bootstrap-servers: localhost:9092
        producer: # producer 生產(chǎn)者
            retries: 0 # 重試次數(shù)
            acks: 1 # 應(yīng)答級(jí)別:多少個(gè)分區(qū)副本備份完成時(shí)向生產(chǎn)者發(fā)送ack確認(rèn)(可選0、1、all/-1)
            batch-size: 16384 # 批量大小
            buffer-memory: 33554432 # 生產(chǎn)端緩沖區(qū)大小
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            #      value-serializer: com.itheima.demo.config.MySerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer

        consumer: # consumer消費(fèi)者
            group-id: javagroup # 默認(rèn)的消費(fèi)組ID
            enable-auto-commit: true # 是否自動(dòng)提交offset
            auto-commit-interval: 100  # 提交offset延時(shí)(接收到消息后多久提交offset)

            # earliest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),從頭開(kāi)始消費(fèi)
            # latest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù)
            # none:topic各分區(qū)都存在已提交的offset時(shí),從offset后開(kāi)始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset,則拋出異常
            auto-offset-reset: earliest
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            #      value-deserializer: com.itheima.demo.config.MyDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

?五、編寫(xiě)生產(chǎn)者發(fā)送消息

1:異步發(fā)送

@RestController
@Api(tags = "異步接口")
@RequestMapping("/kafka")
public class KafkaProducer {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("/kafka/test/{msg}")
    public void sendMessage(@PathVariable("msg") String msg) {
        Message message = new Message();
        message.setMessage(msg);
        kafkaTemplate.send("Topic3", JSON.toJSONString(message));
    }
}

1:同步發(fā)送

//測(cè)試同步發(fā)送與監(jiān)聽(tīng)
@RestController
@Api(tags = "同步接口")
@RequestMapping("/kafka")
public class AsyncProducer {
    private final static Logger logger = LoggerFactory.getLogger(AsyncProducer.class);
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    //同步發(fā)送
    @GetMapping("/kafka/sync/{msg}")
    public void sync(@PathVariable("msg") String msg) throws Exception {
        Message message = new Message();
        message.setMessage(msg);
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("Topic3", JSON.toJSONString(message));
        //注意,可以設(shè)置等待時(shí)間,超出后,不再等候結(jié)果
        SendResult<String, Object> result = future.get(3, TimeUnit.SECONDS);
        logger.info("send result:{}",result.getProducerRecord().value());
    }

}

六、消費(fèi)者編寫(xiě)

@Component
public class KafkaConsumer {
    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    //不指定group,默認(rèn)取yml里配置的
    @KafkaListener(topics = {"Topic3"})
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("message:{}", msg);
        }
    }
}

?通過(guò)swagger,進(jìn)行生產(chǎn)者發(fā)送消息,觀察控制臺(tái)結(jié)果

SpringBoot整合Kafka,spring boot,kafka,后端

SpringBoot整合Kafka,spring boot,kafka,后端

?至此,一個(gè)簡(jiǎn)單的整合就完成了。

后續(xù)會(huì)持續(xù)更新kafka相關(guān)內(nèi)容(多多關(guān)注哦?。?mark hidden color="red">文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-809695.html

SpringBoot整合Kafka,spring boot,kafka,后端文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-809695.html

到了這里,關(guān)于SpringBoot整合Kafka的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 第三章 Spring Boot 整合 Kafka消息隊(duì)列 消息者

    第一章 Kafka 配置部署及SASL_PLAINTEXT安全認(rèn)證 第二章??Spring Boot 整合 Kafka消息隊(duì)列?生產(chǎn)者 第三章??Spring Boot 整合 Kafka消息隊(duì)列?消息者 ????????Kafka 是一個(gè)消息隊(duì)列產(chǎn)品,基于Topic partitions的設(shè)計(jì),能達(dá)到非常高的消息發(fā)送處理性能。本文主是基于Spirng Boot封裝了Apache 的

    2024年02月22日
    瀏覽(15)
  • 第二章 Spring Boot 整合 Kafka消息隊(duì)列 生產(chǎn)者

    第一章 Kafka 配置部署及SASL_PLAINTEXT安全認(rèn)證 第二章??Spring Boot 整合 Kafka消息隊(duì)列?生產(chǎn)者 第三章??Spring Boot 整合 Kafka消息隊(duì)列?消息者 ????????Kafka 是一個(gè)消息隊(duì)列產(chǎn)品,基于Topic partitions的設(shè)計(jì),能達(dá)到非常高的消息發(fā)送處理性能。本文主是基于Spirng Boot封裝了Apache 的

    2024年01月25日
    瀏覽(17)
  • Spring Boot 整合kafka消費(fèi)模式AckMode以及手動(dòng)消費(fèi) 依賴(lài)管理

    在pom.xml文件中導(dǎo)入依賴(lài) 需要自己配置AckMode時(shí)候的配置 kafka支持的消費(fèi)模式,設(shè)置在 AbstractMessageListenerContainer.AckMode 的枚舉中,下面就介紹下各個(gè)模式的區(qū)別 AckMode模式 AckMode模式 作用 MANUAL 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后, 手動(dòng)調(diào)用Acknowledgment

    2024年02月16日
    瀏覽(24)
  • 從零到Kafka:萬(wàn)字帶你體驗(yàn)Spring Boot整合消息驅(qū)動(dòng)的奇妙之旅

    從零到Kafka:萬(wàn)字帶你體驗(yàn)Spring Boot整合消息驅(qū)動(dòng)的奇妙之旅

    主頁(yè)傳送門(mén):?? 傳送 Spring boot : | 基于Spring的開(kāi)源框架,用于簡(jiǎn)化新Spring應(yīng)用的初始搭建以及開(kāi)發(fā)過(guò)程 特性: | 快速開(kāi)發(fā)、輕量級(jí)、無(wú)代碼生成和獨(dú)立運(yùn)行等特性 優(yōu)勢(shì): | 簡(jiǎn)化配置,提供自動(dòng)配置,減少開(kāi)發(fā)時(shí)間 應(yīng)用場(chǎng)景: | 適用于微服務(wù)架構(gòu)、云原生應(yīng)用等場(chǎng)景 環(huán)境

    2024年02月05日
    瀏覽(23)
  • Spring Boot 整合kafka:生產(chǎn)者ack機(jī)制和消費(fèi)者AckMode消費(fèi)模式、手動(dòng)提交ACK

    Kafka 生產(chǎn)者的 ACK 機(jī)制指的是生產(chǎn)者在發(fā)送消息后,對(duì)消息副本的確認(rèn)機(jī)制。ACK 機(jī)制可以幫助生產(chǎn)者確保消息被成功寫(xiě)入 Kafka 集群中的多個(gè)副本,并在需要時(shí)獲取確認(rèn)信息。 Kafka 提供了三種 ACK 機(jī)制的配置選項(xiàng),分別是: acks=0:生產(chǎn)者在成功將消息發(fā)送到網(wǎng)絡(luò)緩沖區(qū)后即視

    2024年02月04日
    瀏覽(28)
  • Spring Boot 整合 Shiro(后端)

    Spring Boot 整合 Shiro(后端)

    1 Shiro 什么是 Shiro 官網(wǎng): http://shiro.apache.org/ 是一款主流的 Java 安全框架,不依賴(lài)任何容器,可以運(yùn)行在 Java SE 和 Java EE 項(xiàng)目中,它的主要作用是對(duì)訪問(wèn)系統(tǒng)的用戶(hù)進(jìn)行身份認(rèn)證、 授權(quán)、會(huì)話管理、加密等操作。 Shiro 就是用來(lái)解決安全管理的系統(tǒng)化框架。 2 Shiro 核心組件 用

    2024年02月09日
    瀏覽(26)
  • java阻塞隊(duì)列/kafka/spring整合kafka

    java阻塞隊(duì)列/kafka/spring整合kafka

    queue增加刪除元素 增加元素 add方法在添加元素的時(shí)候,若超出了度列的長(zhǎng)度會(huì)直接拋出異常: put方法,若向隊(duì)尾添加元素的時(shí)候發(fā)現(xiàn)隊(duì)列已經(jīng)滿(mǎn)了會(huì)發(fā)生阻塞一直等待空間,以加入元素 offer方法在添加元素時(shí),如果發(fā)現(xiàn)隊(duì)列已滿(mǎn)無(wú)法添加的話,會(huì)直接返回false 刪除元素 pol

    2024年02月12日
    瀏覽(22)
  • 什么是kafka,如何學(xué)習(xí)kafka,整合SpringBoot

    什么是kafka,如何學(xué)習(xí)kafka,整合SpringBoot

    目錄 一、什么是Kafka,如何學(xué)習(xí) 二、如何整合SpringBoot 三、Kafka的優(yōu)勢(shì) ? Kafka是一種分布式的消息隊(duì)列系統(tǒng),它可以用于處理大量實(shí)時(shí)數(shù)據(jù)流 。學(xué)習(xí)Kafka需要掌握如何安裝、配置和運(yùn)行Kafka集群,以及如何使用Kafka API編寫(xiě)生產(chǎn)者和消費(fèi)者代碼來(lái)讀寫(xiě)數(shù)據(jù)。此外,還需要了解Ka

    2024年02月10日
    瀏覽(26)
  • Spring整合kafka

    只用spring-kafka依賴(lài)就行 ?注入KafkaTemplate模板 消息發(fā)送 ?監(jiān)聽(tīng)消息消費(fèi) 測(cè)試發(fā)送 spring-kafka和kafka-clients結(jié)合使用(推薦) ?消費(fèi)者組件 生產(chǎn)者組件 生產(chǎn)消息和消費(fèi)消息 ?注: 這里記錄一下生產(chǎn)發(fā)生的問(wèn)題 關(guān)于max.poll.interval.ms配置的問(wèn)題,根據(jù)自己的業(yè)務(wù)配置poll拉去間隔等待

    2024年02月02日
    瀏覽(13)
  • springboot整合kafka入門(mén)

    springboot整合kafka入門(mén)

    producer: 生產(chǎn)者,負(fù)責(zé)發(fā)布消息到kafka cluster(kafka集群)中。生產(chǎn)者可以是web前端產(chǎn)生的page view,或者是服務(wù)器日志,系統(tǒng)CPU、memory等。 consumer: 消費(fèi)者,每個(gè)consumer屬于一個(gè)特定的consuer group(可為每個(gè)consumer指定group name,若不指定group name則屬于默認(rèn)的group)。創(chuàng)建消費(fèi)者時(shí),

    2024年02月07日
    瀏覽(24)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包