国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

SpringBoot-Learning系列之Kafka整合

這篇具有很好參考價(jià)值的文章主要介紹了SpringBoot-Learning系列之Kafka整合。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

SpringBoot-Learning系列之Kafka整合

本系列是一個(gè)獨(dú)立的SpringBoot學(xué)習(xí)系列,本著 What Why How 的思想去整合Java開發(fā)領(lǐng)域各種組件。

SpringBoot-Learning系列之Kafka整合

  • 消息系統(tǒng)

    • 主要應(yīng)用場景
      • 流量消峰(秒殺 搶購)、應(yīng)用解耦(核心業(yè)務(wù)與非核心業(yè)務(wù)之間的解耦)
      • 異步處理、順序處理
      • 實(shí)時(shí)數(shù)據(jù)傳輸管道
      • 異構(gòu)語言架構(gòu)系統(tǒng)之間的通信
        • 如 C語言的CS客戶端的HIS系統(tǒng)與java語言開發(fā)的互聯(lián)網(wǎng)在線診療系統(tǒng)的交互
  • Kafka是什么

    kafka是一個(gè)消息隊(duì)列產(chǎn)品,基于Topic partitions的設(shè)計(jì),能達(dá)到非常高的消息發(fā)送處理性能。是java領(lǐng)域常用的消息隊(duì)列。

    核心概念:

    • 生產(chǎn)者(Producer) 生產(chǎn)者應(yīng)用向主題隊(duì)列中投送消息數(shù)據(jù)
    • 消費(fèi)者 (Consumer) 消費(fèi)者應(yīng)用從訂閱的Kafka的主題隊(duì)列中獲取數(shù)據(jù)、處理數(shù)據(jù)等后續(xù)操作
    • 主題 (Topic) 可以理解為生產(chǎn)者與消費(fèi)者交互的橋梁
    • 分區(qū) (Partition) 默認(rèn)一個(gè)主題有一個(gè)分區(qū),用戶可以設(shè)置多個(gè)分區(qū)。每個(gè)分區(qū)可以有多個(gè)副本(Replica)。分區(qū)的作用是,將數(shù)據(jù)劃分為多個(gè)小塊,提高并發(fā)性和可擴(kuò)展性。每個(gè)分區(qū)都有一個(gè)唯一的標(biāo)識(shí)符,稱為分區(qū)號。消息按照鍵(key)來進(jìn)行分區(qū),相同鍵的消息會(huì)被分配到同一個(gè)分區(qū)中。分區(qū)可以有不同的消費(fèi)者同時(shí)消費(fèi)。副本的作用是提供數(shù)據(jù)的冗余和故障恢復(fù)。每個(gè)分區(qū)可以有多個(gè)副本,其中一個(gè)被稱為領(lǐng)導(dǎo)者(leader),其他副本被稱為追隨者(follower)。領(lǐng)導(dǎo)者負(fù)責(zé)處理讀寫請求,而追隨者只負(fù)責(zé)復(fù)制領(lǐng)導(dǎo)者的數(shù)據(jù)。如果領(lǐng)導(dǎo)者宕機(jī)或不可用,某個(gè)追隨者會(huì)被選舉為新的領(lǐng)導(dǎo)者,保證數(shù)據(jù)的可用性。
  • windows 安裝kafka

    本地環(huán)境DockerDeskTop+WSL2,基于Docker方式安裝Kafka

    2.8.0后不需要依賴zk了

    • 拉取鏡像

      docker pull wurstmeister/zookeeper
      
      docker pull wurstmeister/kafka
      
    • 創(chuàng)建網(wǎng)絡(luò)

      docker network create kafka-net --driver bridge
      
    • 安裝zk

      docker run --net=kafka-net --name zookeeper -p 21810:2181 -d wurstmeister/zookeeper
      
    • 安裝kafka

      docker run -d --name kafka --publish 9092:9092 \
      --link zookeeper \
      --env KAFKA_ZOOKEEPER_CONNECT=172.31.192.1:2181 \
      --env KAFKA_ADVERTISED_HOST_NAME=172.31.192.1 \
      --env KAFKA_ADVERTISED_PORT=9092  \
      --volume /etc/localtime:/etc/localtime \
      wurstmeister/kafka:latest
      
    • 測試

      telnet localhost:9092
      
  • SpringBoot集成

    SpringBoot3.1.0+jdk17

    • pom依賴

      								```
      										<?xml version="1.0" encoding="UTF-8"?>
      										<project xmlns="http://maven.apache.org/POM/4.0.0"
      														 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      														 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      												<modelVersion>4.0.0</modelVersion>
      												<parent>
      														<groupId>org.springframework.boot</groupId>
      														<artifactId>spring-boot-starter-parent</artifactId>
      														<version>3.1.0</version>
      														<relativePath/> <!-- lookup parent from repository -->
      												</parent>
      												<groupId>io.github.vino42</groupId>
      												<artifactId>springboot-kafka</artifactId>
      												<version>1.0-SNAPSHOT</version>
      
      												<properties>
      														<java.version>17</java.version>
      														<maven.compiler.source>17</maven.compiler.source>
      														<maven.compiler.target>17</maven.compiler.target>
      														<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      												</properties>
      
      
      												<dependencies>
      														<dependency>
      																<groupId>org.projectlombok</groupId>
      																<artifactId>lombok</artifactId>
      																<optional>true</optional>
      														</dependency>
      														<dependency>
      																<groupId>org.springframework.boot</groupId>
      																<artifactId>spring-boot-starter-test</artifactId>
      																<scope>test</scope>
      																<exclusions>
      																		<exclusion>
      																				<groupId>org.springframework.boot</groupId>
      																				<artifactId>spring-boot-starter-logging</artifactId>
      																		</exclusion>
      																</exclusions>
      														</dependency>
      														<dependency>
      																<groupId>org.springframework.boot</groupId>
      																<artifactId>spring-boot-starter-web</artifactId>
      																<exclusions>
      																		<exclusion>
      																				<groupId>org.springframework.boot</groupId>
      																				<artifactId>spring-boot-starter-logging</artifactId>
      																		</exclusion>
      																</exclusions>
      														</dependency>
      														<dependency>
      																<groupId>org.springframework.boot</groupId>
      																<artifactId>spring-boot-starter-log4j2</artifactId>
      														</dependency>
      														<!--kafka-->
      														<dependency>
      																<groupId>org.springframework.kafka</groupId>
      																<artifactId>spring-kafka</artifactId>
      																<exclusions>
      																		<!--排除掉 自行添加最新的官方clients依賴-->
      																		<exclusion>
      																				<groupId>org.apache.kafka</groupId>
      																				<artifactId>kafka-clients</artifactId>
      																		</exclusion>
      																</exclusions>
      														</dependency>
      														<dependency>
      																<groupId>org.apache.kafka</groupId>
      																<artifactId>kafka-clients</artifactId>
      																<version>3.5.1</version>
      														</dependency>
      														<dependency>
      																<groupId>com.google.code.gson</groupId>
      																<artifactId>gson</artifactId>
      																<version>2.10.1</version>
      														</dependency>
      														<dependency>
      																<groupId>cn.hutool</groupId>
      																<artifactId>hutool-all</artifactId>
      																<version>5.8.21</version>
      														</dependency>
      
      												</dependencies>
      												<build>
      														<plugins>
      																<plugin>
      																		<groupId>org.springframework.boot</groupId>
      																		<artifactId>spring-boot-maven-plugin</artifactId>
      																		<version>3.1.0</version>
      																</plugin>
      														</plugins>
      												</build>
      										</project>
      						```
      
    • 配置

      spring:
        kafka:
          bootstrap-servers: 172.31.192.1:9092
          producer:
            retries: 0
            # 每次批量發(fā)送消息的數(shù)量
            batch-size: 16384
            buffer-memory: 33554432
            # 指定消息key和消息體的編解碼方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
          listener:
            missing-topics-fatal: false
      #      MANUAL	poll()拉取一批消息,處理完業(yè)務(wù)后,手動(dòng)調(diào)用Acknowledgment.acknowledge()先將offset存放到map本地緩存,在下一次poll之前從緩存拿出來批量提交
            #      MANUAL_IMMEDIATE	每處理完業(yè)務(wù)手動(dòng)調(diào)用Acknowledgment.acknowledge()后立即提交
            #      RECORD	當(dāng)每一條記錄被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后提交
            #      BATCH	當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后提交
            #      TIME	當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后,距離上次提交時(shí)間大于TIME時(shí)提交
            #      COUNT	當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后,被處理record數(shù)量大于等于COUNT時(shí)提交
            #      COUNT_TIME	TIME或COUNT滿足其中一個(gè)時(shí)提交
            ack-mode: manual_immediate
          consumer:
            group-id: test
            # 是否自動(dòng)提交
            enable-auto-commit: false
            max-poll-records: 100
            #      用于指定消費(fèi)者在啟動(dòng)時(shí)、重置消費(fèi)偏移量時(shí)的行為。
            #      earliest:消費(fèi)者會(huì)將消費(fèi)偏移量重置為最早的可用偏移量,也就是從最早的消息開始消費(fèi)。
            #      latest:消費(fèi)者會(huì)將消費(fèi)偏移量重置為最新的可用偏移量,也就是只消費(fèi)最新發(fā)送的消息。
            #      none:如果找不到已保存的消費(fèi)偏移量,消費(fèi)者會(huì)拋出一個(gè)異常
            auto-offset-reset: earliest
            auto-commit-interval: 100
            # 指定消息key和消息體的編解碼方式
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            properties:
              max.poll.interval.ms: 3600000
      server:
        port: 8888spring:
        kafka:
          bootstrap-servers: 172.31.192.1:9092
          producer:
            retries: 0
            # 每次批量發(fā)送消息的數(shù)量
            batch-size: 16384
            buffer-memory: 33554432
            # 指定消息key和消息體的編解碼方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
          listener:
            missing-topics-fatal: false
            ack-mode: manual_immediate
          consumer:
            group-id: test
            enable-auto-commit: false
            max-poll-records: 100
            auto-offset-reset: earliest
            auto-commit-interval: 100
            # 指定消息key和消息體的編解碼方式
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            properties:
              max.poll.interval.ms: 3600000
      
    • 生產(chǎn)者代碼示例

      package io.github.vino42.publiser;
      
      import com.google.gson.Gson;
      import com.google.gson.GsonBuilder;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.stereotype.Component;
      
      /**
       * =====================================================================================
       *
       * @Created :   2023/8/30 21:29
       * @Compiler :  jdk 17
       * @Author :    VINO
       * @Copyright : VINO
       * @Decription : kafak 消息生產(chǎn)者
       * =====================================================================================
       */
      @Component
      public class KafkaPublishService {
          @Autowired
          KafkaTemplate kafkaTemplate;
      
          /**
           * 這里為了簡單 直接發(fā)送json字符串
           *
           * @param json
           */
          public void send(String topic, String json) {
              kafkaTemplate.send(topic, json);
          }
      }
      
      
          @RequestMapping("/send")
          public String send() {
              IntStream.range(0, 10000).forEach(d -> {
                  kafkaPublishService.send("test", RandomUtil.randomString(16));
              });
              return "ok";
          }
      
      
    • 消費(fèi)者

      @Component
      @Slf4j
      public class CustomKafkaListener {
      
          @org.springframework.kafka.annotation.KafkaListener(topics = "test")
          public void listenUser(ConsumerRecord<?, String> record, Acknowledgment acknowledgment) {
              try {
                  String key = String.valueOf(record.key());
                  String body = record.value();
                  log.info("\n=====\ntopic:test,key{},message:{}\n=====\n", key, body);
                  log.info("\n=====\ntopic:test,key{},payLoadJson:{}\n=====\n", key, body);
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  //手動(dòng)ack
                  acknowledgment.acknowledge();
              }
          }
      }
      

SpringBoot Learning系列 是筆者總結(jié)整理的一個(gè)SpringBoot學(xué)習(xí)集合??梢哉f算是一個(gè)SpringBoot學(xué)習(xí)的大集合。歡迎Star關(guān)注。謝謝觀看。

SpringBoot-Learning系列之Kafka整合
關(guān)注公眾號不迷路文章來源地址http://www.zghlxwxcb.cn/news/detail-706486.html

到了這里,關(guān)于SpringBoot-Learning系列之Kafka整合的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • springboot整合kafka-筆記

    這里我的springboot版本是2.3.8.RELEASE,使用的kafka-mq的版本是2.12 測試發(fā)送kafka消息-控制臺(tái)日志

    2024年02月12日
    瀏覽(27)
  • SpringBoot整合Kafka

    SpringBoot整合Kafka

    官網(wǎng):Apache Kafka cmd進(jìn)入到kafka安裝目錄: 1:cmd啟動(dòng)zookeeer .binwindowszookeeper-server-start.bat .configzookeeper.properties 2:cmd啟動(dòng)kafka server .binwindowszookeeper-server-start.bat .configzookeeper.properties 3:使用cmd窗口啟動(dòng)一個(gè)生產(chǎn)者命令: .binwindowskafka-console-producer.bat --bootstrap-server local

    2024年01月20日
    瀏覽(21)
  • 什么是kafka,如何學(xué)習(xí)kafka,整合SpringBoot

    什么是kafka,如何學(xué)習(xí)kafka,整合SpringBoot

    目錄 一、什么是Kafka,如何學(xué)習(xí) 二、如何整合SpringBoot 三、Kafka的優(yōu)勢 ? Kafka是一種分布式的消息隊(duì)列系統(tǒng),它可以用于處理大量實(shí)時(shí)數(shù)據(jù)流 。學(xué)習(xí)Kafka需要掌握如何安裝、配置和運(yùn)行Kafka集群,以及如何使用Kafka API編寫生產(chǎn)者和消費(fèi)者代碼來讀寫數(shù)據(jù)。此外,還需要了解Ka

    2024年02月10日
    瀏覽(26)
  • Kafka 基礎(chǔ)整理、 Springboot 簡單整合

    Kafka 基礎(chǔ)整理、 Springboot 簡單整合

    定義: Kafka 是一個(gè)分布式的基于發(fā)布/訂閱默認(rèn)的消息隊(duì)列 是一個(gè)開源的分布式事件流平臺(tái),被常用用于數(shù)據(jù)管道、流分析、數(shù)據(jù)集成、關(guān)鍵任務(wù)應(yīng)用 消費(fèi)模式: 點(diǎn)對點(diǎn)模式 (少用) 消費(fèi)者主動(dòng)拉取數(shù)據(jù),消息收到后清除消息 發(fā)布/訂閱模式 生產(chǎn)者推送消息到隊(duì)列,都消費(fèi)者

    2024年02月03日
    瀏覽(21)
  • Kafka入門(安裝和SpringBoot整合)

    Kafka入門(安裝和SpringBoot整合)

    app-tier:網(wǎng)絡(luò)名稱 –driver:網(wǎng)絡(luò)類型為bridge Kafka依賴zookeeper所以先安裝zookeeper -p:設(shè)置映射端口(默認(rèn)2181) -d:后臺(tái)啟動(dòng) 安裝并運(yùn)行Kafka, –name:容器名稱 -p:設(shè)置映射端口(默認(rèn)9092 ) -d:后臺(tái)啟動(dòng) ALLOW_PLAINTEXT_LISTENER任何人可以訪問 KAFKA_CFG_ZOOKEEPER_CONNECT鏈接的zookeeper K

    2024年02月11日
    瀏覽(21)
  • 實(shí)戰(zhàn):徹底搞定 SpringBoot 整合 Kafka

    kafka是一個(gè)消息隊(duì)列產(chǎn)品,基于Topic partitions的設(shè)計(jì),能達(dá)到非常高的消息發(fā)送處理性能。Spring創(chuàng)建了一個(gè)項(xiàng)目Spring-kafka,封裝了Apache 的Kafka-client,用于在Spring項(xiàng)目里快速集成kafka。 除了簡單的收發(fā)消息外,Spring-kafka還提供了很多高級功能,下面我們就來一一探秘這些用法。

    2024年02月10日
    瀏覽(26)
  • springboot整合ELK+kafka采集日志

    springboot整合ELK+kafka采集日志

    在分布式的項(xiàng)目中,各功能模塊產(chǎn)生的日志比較分散,同時(shí)為滿足性能要求,同一個(gè)微服務(wù)會(huì)集群化部署,當(dāng)某一次業(yè)務(wù)報(bào)錯(cuò)后,如果不能確定產(chǎn)生的節(jié)點(diǎn),那么只能逐個(gè)節(jié)點(diǎn)去查看日志文件;logback中RollingFileAppender,ConsoleAppender這類同步化記錄器也降低系統(tǒng)性能,綜上一些

    2024年02月15日
    瀏覽(23)
  • springboot整合kafka多數(shù)據(jù)源

    springboot整合kafka多數(shù)據(jù)源

    在很多與第三方公司對接的時(shí)候,或者處在不同的網(wǎng)絡(luò)環(huán)境下,比如在互聯(lián)網(wǎng)和政務(wù)外網(wǎng)的分布部署服務(wù)的時(shí)候,我們需要對接多臺(tái)kafka來達(dá)到我們的業(yè)務(wù)需求,那么當(dāng)kafka存在多數(shù)據(jù)源的情況,就與單機(jī)的情況有所不同。 單機(jī)的情況 如果是單機(jī)的kafka我們直接通過springboot自

    2024年02月13日
    瀏覽(29)
  • SpringBoot整合Kafka簡單配置實(shí)現(xiàn)生產(chǎn)消費(fèi)

    *本文基于SpringBoot整合Kafka,通過簡單配置實(shí)現(xiàn)生產(chǎn)及消費(fèi),包括生產(chǎn)消費(fèi)的配置說明、消費(fèi)者偏移設(shè)置方式等。更多功能細(xì)節(jié)可參考 spring kafka 文檔:https://docs.spring.io/spring-kafka/docs/current/reference/html 搭建Kafka環(huán)境,參考Kafka集群環(huán)境搭建及使用 Java環(huán)境:JDK1.8 Maven版本:apach

    2024年02月16日
    瀏覽(22)
  • 【SpringBoot整合系列】SpringBoot整合FastDFS(一)

    【SpringBoot整合系列】SpringBoot整合FastDFS(一)

    FastDFS(Fast Distributed File System)是一款開源的分布式文件系統(tǒng),它提供了高性能、高可靠性、高擴(kuò)展性和高容錯(cuò)性的分布式文件存儲(chǔ)解決方案。 FastDFS采用了類似于Google File System(GFS)的架構(gòu),它的設(shè)計(jì)目標(biāo)是解決大規(guī)模數(shù)據(jù)存儲(chǔ)和高訪問速度的問題。 分布式架構(gòu): FastDFS采用

    2024年04月27日
    瀏覽(17)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包