??博主介紹: 博主從事應(yīng)用安全和大數(shù)據(jù)領(lǐng)域,有8年研發(fā)經(jīng)驗(yàn),5年面試官經(jīng)驗(yàn),Java技術(shù)專家,WEB架構(gòu)師,阿里云專家博主,華為云云享專家,51CTO TOP紅人
Java知識圖譜點(diǎn)擊鏈接:體系化學(xué)習(xí)Java(Java面試專題)
???? 感興趣的同學(xué)可以收藏關(guān)注下 ,不然下次找不到喲????
?? 感覺對你有幫助的朋友,可以給博主一個(gè)三連,非常感謝 ??????
1、什么是 Kafka 生產(chǎn)者
Kafka 生產(chǎn)者是指使用 Apache Kafka 的應(yīng)用程序,用于向 Kafka 集群發(fā)送消息。生產(chǎn)者將消息發(fā)布到 Kafka 主題(topic),然后消費(fèi)者可以從該主題訂閱并接收這些消息。Kafka 生產(chǎn)者是實(shí)現(xiàn)消息發(fā)布的一方,可以是任何編程語言中的應(yīng)用程序。
2、Java 如何使用 Kafka 生產(chǎn)者
- 首先,在Java項(xiàng)目中添加Kafka客戶端依賴項(xiàng)。您可以在構(gòu)建工具(如Maven或Gradle)中添加以下依賴項(xiàng):
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.2</version>
</dependency>
- 創(chuàng)建Kafka生產(chǎn)者配置。您需要指定Kafka集群的地址和端口等配置信息。以下是一個(gè)示例配置:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址和端口
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 鍵的序列化器
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器
Producer<String, String> producer;
try {
producer = new KafkaProducer<>(props);
String topic = "your-topic-name";
String key = "your-message-key";
String value = "your-message-value";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
producer.send(record);
} catch (Exception ex) {
} finally {
try {
producer.close();
} catch (Exception ex) {
}
}
但是在 SpringBoot 的項(xiàng)目中我們會使用 KafkaTemplate 去實(shí)現(xiàn)生產(chǎn)消息的發(fā)送。
3、SpringBoot 如何使用 Kafka 生產(chǎn)者
都需添加以下依賴項(xiàng):
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.2</version>
</dependency>
3.1、方式一:代碼
@Configuration
public class KafkaProducerConfig {
/**
* kafka 地址
*/
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
使用如下:
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String key, String value) {
kafkaTemplate.send(topic, key, value);
}
}
3.2、方式二:配置文件
可以 application.properties: 加上:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.value-serializer=org.apache.kafka.common.serialization.StringSerializer
或者 yml 里面加上
spring:
kafka:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
直接使用
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String key, String value) {
kafkaTemplate.send(topic, key, value);
}
}
以上也只是一個(gè)簡單的實(shí)例,后面我們根據(jù) 【項(xiàng)目實(shí)戰(zhàn)】手把手教你搭建前后端分離項(xiàng)目 SpringBoot + Vue + Element UI + Mysql, 在這個(gè)教程的基礎(chǔ)上,我們寫如何實(shí)戰(zhàn)。
4、Kafka Properties 的詳細(xì)講解
以下是所有參數(shù)的詳細(xì)解釋:
-
bootstrap.servers
:生產(chǎn)者用于與Kafka集群建立初始連接的主機(jī)和端口列表。 -
acks
:生產(chǎn)者要求leader在認(rèn)為請求完成之前接收的確認(rèn)數(shù)??赡艿闹涤校?/li>
-
0
:生產(chǎn)者不等待任何確認(rèn)。 -
1
:生產(chǎn)者等待leader確認(rèn)請求。 -
all
:生產(chǎn)者等待所有同步副本確認(rèn)請求。
-
retries
:在放棄之前,生產(chǎn)者將重試發(fā)送失敗的消息的次數(shù)。設(shè)置大于0的值以啟用重試。 -
batch.size
:生產(chǎn)者嘗試發(fā)送到Kafka代理的批次的大?。ㄒ宰止?jié)為單位)。較大的批次大小可以提高吞吐量,但會增加消息傳遞的延遲。 -
linger.ms
:生產(chǎn)者在將批次發(fā)送到Kafka代理之前等待更多消息累積的時(shí)間(以毫秒為單位)。這有助于批處理,減少發(fā)送到代理的請求數(shù)量。 -
buffer.memory
:生產(chǎn)者用于緩沖等待發(fā)送到Kafka代理的消息的總內(nèi)存量。 -
key.serializer
:用于將鍵對象序列化為字節(jié)的類。常見的選項(xiàng)是StringSerializer
或ByteArraySerializer
。 -
value.serializer
:用于將值對象序列化為字節(jié)的類。常見的選項(xiàng)是StringSerializer
或ByteArraySerializer
。 -
compression.type
:用于消息的壓縮類型。支持的值有none
、gzip
、snappy
或lz4
。壓縮可以減少網(wǎng)絡(luò)帶寬和存儲要求。 -
max.in.flight.requests.per.connection
:在阻塞之前,生產(chǎn)者可以有的未確認(rèn)請求的最大數(shù)量。將此值設(shè)置為較高的值可以增加吞吐量,但也會增加用于緩沖的內(nèi)存。 -
request.timeout.ms
:生產(chǎn)者在考慮請求失敗之前,從Kafka代理等待響應(yīng)的最長時(shí)間(以毫秒為單位)。 -
max.block.ms
:當(dāng)緩沖區(qū)已滿或元數(shù)據(jù)不可用時(shí),生產(chǎn)者在send()
方法中阻塞的最長時(shí)間(以毫秒為單位)。
以上這些是Kafka生產(chǎn)者配置中常用的一些屬性,使用方法如下:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址和端口
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 鍵的序列化器
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器
5、Spring-Kafka Yml 配置參數(shù)
spring:
kafka:
bootstrap-servers: <bootstrap-servers>
producer:
key-serializer: <key-serializer>
value-serializer: <value-serializer>
retries: <retries>
batch-size: <batch-size>
linger-ms: <linger-ms>
buffer-memory: <buffer-memory>
compression-type: <compression-type>
consumer:
group-id: <group-id>
key-deserializer: <key-deserializer>
value-deserializer: <value-deserializer>
auto-offset-reset: <auto-offset-reset>
enable-auto-commit: <enable-auto-commit>
max-poll-records: <max-poll-records>
以下是每個(gè)參數(shù)的解釋:
- bootstrap-servers :Kafka broker地址的逗號分隔列表。
- producer.key-serializer :用于將鍵對象序列化為字節(jié)的類。
- producer.value-serializer :用于將值對象序列化為字節(jié)的類。
- producer.retries :在放棄之前,生產(chǎn)者將重試發(fā)送失敗的消息的次數(shù)。
- producer.batch-size :生產(chǎn)者將嘗試發(fā)送到Kafka broker的批次的大小(以字節(jié)為單位)。
- producer.linger-ms :生產(chǎn)者在將批次發(fā)送到Kafka broker之前等待更多消息累積的時(shí)間(以毫秒為單位)。
- producer.buffer-memory :生產(chǎn)者用于緩沖等待發(fā)送到Kafka broker的消息的總內(nèi)存量。
- producer.compression-type :消息的壓縮類型。
- consumer.group-id :消費(fèi)者組ID。
- consumer.key-deserializer :用于將鍵對象從字節(jié)反序列化的類。
- consumer.value-deserializer :用于將值對象從字節(jié)反序列化的類。
- consumer.auto-offset-reset :當(dāng)Kafka中沒有初始偏移量或當(dāng)前偏移量不再存在時(shí),使用的策略。
- consumer.enable-auto-commit :消費(fèi)者的偏移量是否應(yīng)自動(dòng)提交。
- consumer.max-poll-records :消費(fèi)者在一次輪詢中最多獲取的記錄數(shù)。
6、Kafka 生產(chǎn)者異步回調(diào)方式生產(chǎn)消息
6.1、什么是異步回調(diào)
什么是異步回調(diào)要搞清楚,異步回調(diào)指的是我發(fā)送完成了,我就不管了,我不需要等你的返回。具體的定義如下:
異步回調(diào)是一種編程模式,用于處理異步操作的結(jié)果。在異步回調(diào)中,當(dāng)一個(gè)操作被觸發(fā)時(shí),程序不會立即阻塞等待結(jié)果,而是繼續(xù)執(zhí)行其他任務(wù)。當(dāng)操作完成后,系統(tǒng)會調(diào)用預(yù)先定義的回調(diào)函數(shù)來處理操作的結(jié)果。
異步回調(diào)常用于處理需要等待時(shí)間較長的操作,例如網(wǎng)絡(luò)請求、數(shù)據(jù)庫查詢等。通過使用異步回調(diào),可以提高系統(tǒng)的響應(yīng)性能和并發(fā)處理能力,避免阻塞和等待的情況。
在異步回調(diào)中,通常將回調(diào)函數(shù)作為參數(shù)傳遞給異步操作的方法。當(dāng)操作完成后,系統(tǒng)會調(diào)用回調(diào)函數(shù),并將操作的結(jié)果作為參數(shù)傳遞給回調(diào)函數(shù),以便進(jìn)行后續(xù)處理。
異步回調(diào)在編寫異步代碼時(shí)非常有用,可以幫助開發(fā)人員處理異步操作的結(jié)果,而無需顯式地等待操作完成。這種方式可以提高系統(tǒng)的性能和可伸縮性,同時(shí)保持代碼的簡潔性和可讀性。文章來源:http://www.zghlxwxcb.cn/news/detail-570015.html
6.2、匿名內(nèi)部類的方式做異步回調(diào)
public class KafkaProducerExample{
private static final String TOPIC_NAME = "test-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
String message = "Hello, Kafka! This is message " + i;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// 匿名內(nèi)部類的方式做異步回調(diào)
}
});
}
producer.close();
}
}
6.3、 KafkaTemplate 的異步回調(diào)
package com.pany.camp.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource;
/**
*
* @description: 生產(chǎn)者
* @copyright: @Copyright (c) 2022
* @company: Aiocloud
* @author: pany
* @version: 1.0.0
* @createTime: 2023-06-26 18:10
*/
@Component
public class KafkaProducer {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<>() {
@Override
public void onSuccess(Object o) {
}
@Override
public void onFailure(Throwable ex) {
// Handle failure callback
System.err.println("Failed to send message: " + ex.getMessage());
}
});
}
}
???? 本文由激流原創(chuàng),原創(chuàng)不易,希望大家關(guān)注、點(diǎn)贊、收藏,給博主一點(diǎn)鼓勵(lì),感謝?。。?/strong>
??????????????????????????????????????????????????????????????文章來源地址http://www.zghlxwxcb.cn/news/detail-570015.html
到了這里,關(guān)于【項(xiàng)目實(shí)戰(zhàn)】Java 開發(fā) Kafka 生產(chǎn)者的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!