系列文章目錄
第一章 Kafka 配置部署及SASL_PLAINTEXT安全認(rèn)證
第二章 Spring Boot 整合 Kafka消息隊(duì)列 生產(chǎn)者
第三章 Spring Boot 整合 Kafka消息隊(duì)列 消費(fèi)者
前言
        Kafka 是一個(gè)消息隊(duì)列產(chǎn)品,基于Topic partitions的設(shè)計(jì),能達(dá)到非常高的消息發(fā)送處理性能。本文主要是基于Spring Boot封裝了Apache 的Kafka-client,用于在Spring Boot 項(xiàng)目里快速集成kafka。
一、Kafka 是什么?
Apache Kafka是分布式發(fā)布-訂閱消息系統(tǒng)。
它最初由LinkedIn公司開(kāi)發(fā),之后成為Apache項(xiàng)目的一部分。
Kafka是一種快速、可擴(kuò)展的、設(shè)計(jì)內(nèi)在就是分布式的,分區(qū)的和可復(fù)制的提交日志服務(wù)。
二、生產(chǎn)者
1.引入庫(kù)
引入需要依賴的jar包,引入POM文件
<dependencies>
    <!-- Spring for Apache Kafka; the version is managed by the Spring Boot dependency BOM -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
2.配置文件
spring:
  custom:
    kafka:
      # SASL/PLAIN credentials; when both are set, SASL_PLAINTEXT auth is enabled
      username: admin
      password: admin-secret
      # number of partitions per topic
      partitions: 1
      # disable consumer auto-commit of offsets
      enable-auto-commit: false
      # disable batch listening
      batch-listener: false
      # Kafka broker address list (host:port)
      bootstrap-servers:
        - 192.168.1.95:9092
3.啟動(dòng)類
啟動(dòng)類名 EnableAutoKafka
package com.cdkjframework.kafka.producer.annotation;

import com.cdkjframework.kafka.producer.config.KafkaMarkerConfiguration;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;

/**
 * Enables the Kafka producer module. Placing this annotation on a configuration class
 * imports {@link KafkaMarkerConfiguration}; the Marker bean it registers in turn
 * activates the conditional Kafka auto-configuration.
 *
 * @author xiaLin
 * @version 1.0
 * @since 2023/7/18
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({KafkaMarkerConfiguration.class})
public @interface EnableAutoKafka {
}
4.spring.factories配置文件
# Register the Kafka auto-configuration with Spring Boot's auto-configuration mechanism
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.cdkjframework.kafka.producer.config.KafkaAutoConfiguration
5.配置類
5.1 kafka 配置 KafkaConfig
package com.cdkjframework.kafka.producer.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * Kafka configuration properties bound from the {@code spring.custom.kafka} prefix.
 * Holds both producer and consumer tuning knobs; producer fields are consumed by
 * ProducerConfiguration, the admin fields by TopicConfig.
 *
 * @author xiaLin
 * @version 1.0
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.custom.kafka")
public class KafkaConfig {

    /**
     * Kafka broker addresses (host:port), e.g. 192.168.1.95:9092.
     */
    private List<String> bootstrapServers;

    /**
     * Topic names to work with.
     */
    private List<String> topics;

    /**
     * SASL username; SASL_PLAINTEXT auth is enabled only when username AND password are set.
     */
    private String username;

    /**
     * SASL password.
     */
    private String password;

    /**
     * Producer linger.ms — batching delay in milliseconds (default 1 ms).
     */
    private Integer linger = 1;

    /**
     * Producer batch.size in bytes.
     */
    private Integer batchSize = 16384;

    /**
     * Producer retries; 0 disables the retry mechanism.
     */
    private Integer retries = 0;

    /**
     * Producer max.block.ms — how long send() may block, in milliseconds.
     * NOTE(review): the Kafka client default is 60000 ms; this default (6000) is
     * ten times smaller — confirm it is intentional.
     */
    private Integer maxBlock = 6000;

    /**
     * Producer acks: "0", "1" or "all".
     */
    private String acks = "1";

    /**
     * security.providers client property.
     */
    private String securityProviders;

    /**
     * Whether the consumer auto-commits offsets.
     */
    private boolean enableAutoCommit = true;

    /**
     * Consumer session timeout in milliseconds (presumably session.timeout.ms —
     * the consumer module is not in this file; confirm against it).
     */
    private String sessionTimeout = "5000";

    /**
     * Consumer max poll interval in milliseconds (presumably max.poll.interval.ms —
     * confirm against the consumer module).
     */
    private Integer maxPollInterval = 10000;

    /**
     * Consumer group id.
     */
    private String groupId = "defaultGroup";

    /**
     * Maximum records returned per poll.
     */
    private Integer maxPollRecords = 1;

    /**
     * Listener container concurrency.
     */
    private Integer concurrency = 3;

    /**
     * Poll timeout in milliseconds.
     */
    private Integer pollTimeout = 60000;

    /**
     * Whether to use a batch listener.
     */
    private boolean batchListener = false;

    /**
     * Replica count (NOTE(review): field is named "sort" but documented as replica
     * count — not referenced in this file; verify against the consumer/topic code).
     */
    private String sort = "1";

    /**
     * Topic partition count. NOTE(review): ProducerConfiguration also feeds this value
     * into max.in.flight.requests.per.connection — confirm that dual use is intended.
     */
    private Integer partitions = 3;

    /**
     * Producer compression.type: none, lz4, gzip or snappy. Consumers decompress
     * automatically, so this is a producer-side-only setting.
     */
    private String compressionType = "none";

    /**
     * Consumer auto.offset.reset policy.
     */
    private String autoOffsetReset = "earliest";

    /**
     * Auto-commit interval in milliseconds.
     */
    private Integer autoCommitInterval = 100;

    /**
     * Producer buffer.memory — total bytes available to buffer records awaiting send.
     */
    private Integer bufferMemory = 33554432;

    /**
     * Producer max.request.size — upper bound on a single message, default 1048576 (1 MB).
     */
    private Integer maxRequestSize = 1048576;
}
5.2 kafka 自動(dòng)配置 KafkaAutoConfiguration
package com.cdkjframework.kafka.producer.config;

import com.cdkjframework.kafka.producer.ProducerConfiguration;
import com.cdkjframework.kafka.producer.util.ProducerUtils;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.web.reactive.function.client.WebClientAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

/**
 * Kafka auto-configuration, activated only when the {@link KafkaMarkerConfiguration.Marker}
 * bean is present (i.e. when a configuration class is annotated with EnableAutoKafka).
 *
 * @author xiaLin
 * @version 1.0
 * @since 2023/7/18
 */
@Lazy(false)
@RequiredArgsConstructor
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(KafkaConfig.class)
@AutoConfigureAfter({WebClientAutoConfiguration.class})
@ImportAutoConfiguration(ProducerConfiguration.class)
@ConditionalOnBean(KafkaMarkerConfiguration.Marker.class)
public class KafkaAutoConfiguration {

    /**
     * Bound Kafka configuration properties.
     */
    private final KafkaConfig kafkaConfig;

    /**
     * Registers the topic bootstrap trigger. The {@code initMethod = "kafkaAdmin"}
     * invokes {@code TopicConfig#kafkaAdmin()} on startup.
     * NOTE(review): the KafkaAdmin returned by that init method is discarded, so the
     * admin client is never exposed as a bean — confirm that is intended.
     *
     * @return the topic configuration trigger
     */
    @Bean(initMethod = "kafkaAdmin")
    @ConditionalOnMissingBean
    public TopicConfig kafkaTopic() {
        return new TopicConfig(kafkaConfig);
    }

    /**
     * Registers the producer utility; its {@code start} init method captures the
     * KafkaTemplate into the utility's static handle.
     * (Renamed from {@code Producer()} to follow Java lowerCamelCase method naming;
     * the bean is looked up by type, not by name.)
     *
     * @return the producer utility bean
     */
    @Bean(initMethod = "start")
    @ConditionalOnMissingBean
    public ProducerUtils producer() {
        return new ProducerUtils();
    }
}
5.3 kafka 標(biāo)記配置 KafkaMarkerConfiguration
package com.cdkjframework.kafka.producer.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

/**
 * Marker configuration imported by the EnableAutoKafka annotation. Registering the
 * nested {@link Marker} bean is the switch that turns on KafkaAutoConfiguration
 * (which is guarded by {@code @ConditionalOnBean(Marker.class)}).
 *
 * @author xiaLin
 * @version 1.0
 * @since 2023/12/6
 */
@EnableKafka
@Configuration(proxyBeanMethods = false)
public class KafkaMarkerConfiguration {

    /**
     * Marker type: its mere presence in the application context activates the
     * Kafka auto-configuration.
     */
    public static class Marker {
    }

    /**
     * Exposes the marker bean.
     *
     * @return a new marker instance
     */
    @Bean
    public Marker kafkaMarker() {
        return new Marker();
    }
}
5.4 topic配置 TopicConfig
package com.cdkjframework.kafka.producer.config;

import com.cdkjframework.constant.IntegerConsts;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.kafka.core.KafkaAdmin;

import java.util.HashMap;
import java.util.Map;

/**
 * Topic configuration: builds a KafkaAdmin pointed at the configured brokers so the
 * cluster can be checked for missing topics (and create them) on startup.
 *
 * @author xiaLin
 * @version 1.0
 */
public class TopicConfig {

    /**
     * Bound Kafka configuration properties.
     */
    private final KafkaConfig kafkaConfig;

    /**
     * Constructor.
     *
     * @param kafkaConfig bound Kafka configuration
     */
    public TopicConfig(KafkaConfig kafkaConfig) {
        this.kafkaConfig = kafkaConfig;
    }

    /**
     * Creates a KafkaAdmin that can auto-detect whether topics exist in the cluster
     * and create the ones that are missing.
     * (Fixed: the original put the bootstrap-servers entry twice; the duplicate is removed.)
     *
     * @return the admin client wrapper
     */
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>(IntegerConsts.ONE);
        // Multiple brokers may be supplied, e.g. 192.168.2.11:9092,192.168.2.12:9092,192.168.2.13:9092
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
        return new KafkaAdmin(configs);
    }
}
6.生產(chǎn)者配置
生產(chǎn)者配置類 ProducerConfiguration
package com.cdkjframework.kafka.producer;

import com.cdkjframework.kafka.producer.config.KafkaConfig;
import com.cdkjframework.kafka.producer.util.ProducerUtils;
import com.cdkjframework.util.tool.StringUtils;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * Producer configuration: declares the KafkaTemplate and its backing ProducerFactory,
 * translating the bound KafkaConfig properties into Kafka client settings.
 *
 * @author xiaLin
 * @version 1.0
 */
@Configuration
@RequiredArgsConstructor
public class ProducerConfiguration {

    /**
     * Bound Kafka configuration properties.
     */
    private final KafkaConfig kafkaConfig;

    /**
     * JAAS template for SASL/PLAIN. Values are double-quoted as required by the JAAS
     * config grammar (the original unquoted form fails to parse for credentials
     * containing special characters).
     */
    private static final String JAAS_CONFIG =
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";

    /**
     * SASL mechanism used when credentials are configured.
     */
    private static final String SASL_MECHANISM = "PLAIN";

    /**
     * Producer template bean.
     *
     * @return a KafkaTemplate backed by {@link #producerFactory()}
     */
    @Bean(name = "kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Producer factory bean.
     *
     * @return a DefaultKafkaProducerFactory configured from {@link #producerConfigs()}
     */
    @Bean(name = "producerFactory")
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Builds the raw producer client properties from the bound configuration.
     *
     * @return map of Kafka producer settings
     */
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Broker address list; multiple brokers may be supplied
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
        // Retry count; 0 disables retries
        props.put(ProducerConfig.RETRIES_CONFIG, kafkaConfig.getRetries());
        // Replication acknowledgement level, default "1":
        // acks=0   success as soon as the message leaves the producer
        // acks=1   success once the leader partition has written the message
        // acks=all success only after all follower replicas have synced the message
        props.put(ProducerConfig.ACKS_CONFIG, kafkaConfig.getAcks());
        // How long send() may block when the producer buffer is full (Kafka default 60s)
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, kafkaConfig.getMaxBlock());
        // security.providers
        props.put(ProducerConfig.SECURITY_PROVIDERS_CONFIG, kafkaConfig.getSecurityProviders());
        // Batch size in bytes
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaConfig.getBatchSize());
        // Linger (batching delay) in ms; reduces the number of send round-trips
        props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaConfig.getLinger());
        // Total memory available to buffer records awaiting transmission
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaConfig.getBufferMemory());
        // Maximum size of a single request, default 1048576 (1MB)
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaConfig.getMaxRequestSize());
        // Key serializer
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Value serializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Compression: none, lz4, gzip or snappy (default none).
        // Consumers decompress automatically, so only the producer needs this.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, kafkaConfig.getCompressionType());
        // NOTE(review): partitions is reused as max.in.flight.requests.per.connection here —
        // confirm this dual use is intended.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, kafkaConfig.getPartitions());
        // Enable SASL_PLAINTEXT only when both credentials are present
        if (StringUtils.isNotNullAndEmpty(kafkaConfig.getUsername()) &&
                StringUtils.isNotNullAndEmpty(kafkaConfig.getPassword())) {
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
            props.put(SaslConfigs.SASL_MECHANISM, SASL_MECHANISM);
            props.put(SaslConfigs.SASL_JAAS_CONFIG,
                    String.format(JAAS_CONFIG, kafkaConfig.getUsername(), kafkaConfig.getPassword()));
        }
        return props;
    }
}
7. 生產(chǎn)者工具
生產(chǎn)者端 ProducerUtils
package com.cdkjframework.kafka.producer.util;
import com.cdkjframework.constant.IntegerConsts;
import com.cdkjframework.util.log.LogUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @ProjectName: cdkj-framework
* @Package: com.cdkjframework.kafka.producer.util
* @ClassName: ProducerUtils
* @Description: 生產(chǎn)工具
* @Author: xiaLin
* @Version: 1.0
*/
public class ProducerUtils {
/**
* 日志
*/
private static LogUtils logUtils = LogUtils.getLogger(ProducerUtils.class);
/**
* 模板
*/
private static KafkaTemplate kafkaTemplate;
/**
* 數(shù)據(jù)模板
*/
@Resource(name = "kafkaTemplate")
private KafkaTemplate template;
/**
* 初始化工具
*/
private void start() {
kafkaTemplate = template;
}
/**
* producer 同步方式發(fā)送數(shù)據(jù)
*
* @param topic topic名稱
* @param message producer發(fā)送的數(shù)據(jù)
* @throws InterruptedException 異常信息
* @throws ExecutionException 異常信息
* @throws TimeoutException 異常信息
*/
public static void sendMessageSync(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
kafkaTemplate.send(topic, message).get(IntegerConsts.TEN, TimeUnit.SECONDS);
}
/**
* producer 異步方式發(fā)送數(shù)據(jù)
*
* @param topic topic名稱
* @param message producer發(fā)送的數(shù)據(jù)
*/
public static void sendMessageAsync(String topic, String message) {
kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable throwable) {
logUtils.error("topic:" + topic + ",message:" + message);
logUtils.error(throwable, throwable.getMessage());
}
@Override
public void onSuccess(Object o) {
logUtils.info("topic:" + topic + ",發(fā)送成功");
}
});
}
/**
* producer 異步方式發(fā)送數(shù)據(jù)
*
* @param topic topic名稱
* @param key key值
* @param message producer發(fā)送的數(shù)據(jù)
*/
public static void sendMessageAsync(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message).addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable throwable) {
logUtils.error("topic:" + topic + ",message:" + message);
logUtils.error(throwable, throwable.getMessage());
}
@Override
public void onSuccess(Object o) {
logUtils.info("topic:" + topic + ",發(fā)送成功");
}
});
}
}
總結(jié)
以上就是今天要講的內(nèi)容,本文僅僅簡(jiǎn)單介紹了 Spring Boot 集成 Kafka 消息生產(chǎn)者的封裝,消費(fèi)者部分待續(xù)。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-823847.html
相對(duì)應(yīng)的開(kāi)源項(xiàng)目歡迎訪問(wèn):維基框架文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-823847.html
到了這里,關(guān)于第二章 Spring Boot 整合 Kafka消息隊(duì)列 生產(chǎn)者的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!