SpringBoot-Learning系列之Kafka整合
本系列是一個(gè)獨(dú)立的SpringBoot學(xué)習(xí)系列,本著 What Why How 的思想去整合Java開發(fā)領(lǐng)域各種組件。
-
消息系統(tǒng)
- 主要應(yīng)用場景
- 流量消峰(秒殺 搶購)、應(yīng)用解耦(核心業(yè)務(wù)與非核心業(yè)務(wù)之間的解耦)
- 異步處理、順序處理
- 實(shí)時(shí)數(shù)據(jù)傳輸管道
- 異構(gòu)語言架構(gòu)系統(tǒng)之間的通信
- 如 C語言的CS客戶端的HIS系統(tǒng)與java語言開發(fā)的互聯(lián)網(wǎng)在線診療系統(tǒng)的交互
- 主要應(yīng)用場景
-
Kafka是什么
kafka是一個(gè)消息隊(duì)列產(chǎn)品,基于Topic partitions的設(shè)計(jì),能達(dá)到非常高的消息發(fā)送處理性能。是java領(lǐng)域常用的消息隊(duì)列。
核心概念:
- 生產(chǎn)者(Producer) 生產(chǎn)者應(yīng)用向主題隊(duì)列中投送消息數(shù)據(jù)
- 消費(fèi)者 (Consumer) 消費(fèi)者應(yīng)用從訂閱的Kafka的主題隊(duì)列中獲取數(shù)據(jù)、處理數(shù)據(jù)等后續(xù)操作
- 主題 (Topic) 可以理解為生產(chǎn)者與消費(fèi)者交互的橋梁
- 分區(qū) (Partition) 默認(rèn)一個(gè)主題有一個(gè)分區(qū),用戶可以設(shè)置多個(gè)分區(qū)。每個(gè)分區(qū)可以有多個(gè)副本(Replica)。分區(qū)的作用是,將數(shù)據(jù)劃分為多個(gè)小塊,提高并發(fā)性和可擴(kuò)展性。每個(gè)分區(qū)都有一個(gè)唯一的標(biāo)識(shí)符,稱為分區(qū)號。消息按照鍵(key)來進(jìn)行分區(qū),相同鍵的消息會(huì)被分配到同一個(gè)分區(qū)中。分區(qū)可以有不同的消費(fèi)者同時(shí)消費(fèi)。副本的作用是提供數(shù)據(jù)的冗余和故障恢復(fù)。每個(gè)分區(qū)可以有多個(gè)副本,其中一個(gè)被稱為領(lǐng)導(dǎo)者(leader),其他副本被稱為追隨者(follower)。領(lǐng)導(dǎo)者負(fù)責(zé)處理讀寫請求,而追隨者只負(fù)責(zé)復(fù)制領(lǐng)導(dǎo)者的數(shù)據(jù)。如果領(lǐng)導(dǎo)者宕機(jī)或不可用,某個(gè)追隨者會(huì)被選舉為新的領(lǐng)導(dǎo)者,保證數(shù)據(jù)的可用性。
-
windows 安裝kafka
本地環(huán)境DockerDeskTop+WSL2,基于Docker方式安裝Kafka
2.8.0后不需要依賴zk了
-
拉取鏡像
docker pull wurstmeister/zookeeper docker pull wurstmeister/kafka
-
創(chuàng)建網(wǎng)絡(luò)
docker network create kafka-net --driver bridge
-
安裝zk
docker run --net=kafka-net --name zookeeper -p 21810:2181 -d wurstmeister/zookeeper
-
安裝kafka
docker run -d --name kafka --publish 9092:9092 \ --link zookeeper \ --env KAFKA_ZOOKEEPER_CONNECT=172.31.192.1:2181 \ --env KAFKA_ADVERTISED_HOST_NAME=172.31.192.1 \ --env KAFKA_ADVERTISED_PORT=9092 \ --volume /etc/localtime:/etc/localtime \ wurstmeister/kafka:latest
-
測試
telnet localhost:9092
-
-
SpringBoot集成
SpringBoot3.1.0+jdk17
-
pom依賴
``` <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.1.0</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>io.github.vino42</groupId> <artifactId>springboot-kafka</artifactId> <version>1.0-SNAPSHOT</version> <properties> <java.version>17</java.version> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <!--kafka--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <exclusions> <!--排除掉 自行添加最新的官方clients依賴--> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.5.1</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.10.1</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.21</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>3.1.0</version> </plugin> </plugins> </build> </project> ```
-
配置
spring: kafka: bootstrap-servers: 172.31.192.1:9092 producer: retries: 0 # 每次批量發(fā)送消息的數(shù)量 batch-size: 16384 buffer-memory: 33554432 # 指定消息key和消息體的編解碼方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer listener: missing-topics-fatal: false # MANUAL poll()拉取一批消息,處理完業(yè)務(wù)后,手動(dòng)調(diào)用Acknowledgment.acknowledge()先將offset存放到map本地緩存,在下一次poll之前從緩存拿出來批量提交 # MANUAL_IMMEDIATE 每處理完業(yè)務(wù)手動(dòng)調(diào)用Acknowledgment.acknowledge()后立即提交 # RECORD 當(dāng)每一條記錄被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后提交 # BATCH 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后提交 # TIME 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后,距離上次提交時(shí)間大于TIME時(shí)提交 # COUNT 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后,被處理record數(shù)量大于等于COUNT時(shí)提交 # COUNT_TIME TIME或COUNT滿足其中一個(gè)時(shí)提交 ack-mode: manual_immediate consumer: group-id: test # 是否自動(dòng)提交 enable-auto-commit: false max-poll-records: 100 # 用于指定消費(fèi)者在啟動(dòng)時(shí)、重置消費(fèi)偏移量時(shí)的行為。 # earliest:消費(fèi)者會(huì)將消費(fèi)偏移量重置為最早的可用偏移量,也就是從最早的消息開始消費(fèi)。 # latest:消費(fèi)者會(huì)將消費(fèi)偏移量重置為最新的可用偏移量,也就是只消費(fèi)最新發(fā)送的消息。 # none:如果找不到已保存的消費(fèi)偏移量,消費(fèi)者會(huì)拋出一個(gè)異常 auto-offset-reset: earliest auto-commit-interval: 100 # 指定消息key和消息體的編解碼方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: max.poll.interval.ms: 3600000 server: port: 8888spring: kafka: bootstrap-servers: 172.31.192.1:9092 producer: retries: 0 # 每次批量發(fā)送消息的數(shù)量 batch-size: 16384 buffer-memory: 33554432 # 指定消息key和消息體的編解碼方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer listener: missing-topics-fatal: false ack-mode: manual_immediate consumer: group-id: test enable-auto-commit: false max-poll-records: 100 auto-offset-reset: earliest auto-commit-interval: 100 # 指定消息key和消息體的編解碼方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: max.poll.interval.ms: 3600000
-
生產(chǎn)者代碼示例
package io.github.vino42.publiser; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; /** * ===================================================================================== * * @Created : 2023/8/30 21:29 * @Compiler : jdk 17 * @Author : VINO * @Copyright : VINO * @Decription : kafak 消息生產(chǎn)者 * ===================================================================================== */ @Component public class KafkaPublishService { @Autowired KafkaTemplate kafkaTemplate; /** * 這里為了簡單 直接發(fā)送json字符串 * * @param json */ public void send(String topic, String json) { kafkaTemplate.send(topic, json); } }
@RequestMapping("/send") public String send() { IntStream.range(0, 10000).forEach(d -> { kafkaPublishService.send("test", RandomUtil.randomString(16)); }); return "ok"; }
-
消費(fèi)者
@Component @Slf4j public class CustomKafkaListener { @org.springframework.kafka.annotation.KafkaListener(topics = "test") public void listenUser(ConsumerRecord<?, String> record, Acknowledgment acknowledgment) { try { String key = String.valueOf(record.key()); String body = record.value(); log.info("\n=====\ntopic:test,key{},message:{}\n=====\n", key, body); log.info("\n=====\ntopic:test,key{},payLoadJson:{}\n=====\n", key, body); } catch (Exception e) { e.printStackTrace(); } finally { //手動(dòng)ack acknowledgment.acknowledge(); } } }
-
SpringBoot Learning系列 是筆者總結(jié)整理的一個(gè)SpringBoot學(xué)習(xí)集合??梢哉f算是一個(gè)SpringBoot學(xué)習(xí)的大集合。歡迎Star關(guān)注。謝謝觀看。文章來源:http://www.zghlxwxcb.cn/news/detail-706486.html
關(guān)注公眾號不迷路文章來源地址http://www.zghlxwxcb.cn/news/detail-706486.html
到了這里,關(guān)于SpringBoot-Learning系列之Kafka整合的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!