国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【項(xiàng)目實(shí)戰(zhàn)】Java 開發(fā) Kafka 生產(chǎn)者

這篇具有很好參考價(jià)值的文章主要介紹了【項(xiàng)目實(shí)戰(zhàn)】Java 開發(fā) Kafka 生產(chǎn)者。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

??博主介紹: 博主從事應(yīng)用安全和大數(shù)據(jù)領(lǐng)域,有8年研發(fā)經(jīng)驗(yàn),5年面試官經(jīng)驗(yàn),Java技術(shù)專家,WEB架構(gòu)師,阿里云專家博主,華為云云享專家,51CTO TOP紅人

Java知識圖譜點(diǎn)擊鏈接:體系化學(xué)習(xí)Java(Java面試專題)

???? 感興趣的同學(xué)可以收藏關(guān)注下 ,不然下次找不到喲????

?? 感覺對你有幫助的朋友,可以給博主一個(gè)三連,非常感謝 ??????

【項(xiàng)目實(shí)戰(zhàn)】Java 開發(fā) Kafka 生產(chǎn)者,中間件,java,kafka,生產(chǎn)者,項(xiàng)目實(shí)戰(zhàn),原力計(jì)劃

1、什么是 Kafka 生產(chǎn)者

【項(xiàng)目實(shí)戰(zhàn)】Java 開發(fā) Kafka 生產(chǎn)者,中間件,java,kafka,生產(chǎn)者,項(xiàng)目實(shí)戰(zhàn),原力計(jì)劃

Kafka 生產(chǎn)者是指使用 Apache Kafka 的應(yīng)用程序,用于向 Kafka 集群發(fā)送消息。生產(chǎn)者將消息發(fā)布到 Kafka 主題(topic),然后消費(fèi)者可以從該主題訂閱并接收這些消息。Kafka 生產(chǎn)者是實(shí)現(xiàn)消息發(fā)布的一方,可以是任何編程語言中的應(yīng)用程序。

2、Java 如何使用 Kafka 生產(chǎn)者

  1. 首先,在Java項(xiàng)目中添加Kafka客戶端依賴項(xiàng)。您可以在構(gòu)建工具(如Maven或Gradle)中添加以下依賴項(xiàng):
<dependency>
 	<groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>
  1. 創(chuàng)建Kafka生產(chǎn)者配置。您需要指定Kafka集群的地址和端口等配置信息。以下是一個(gè)示例配置:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址和端口
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 鍵的序列化器
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器

Producer<String, String> producer;
try {
	producer = new KafkaProducer<>(props);
	
	String topic = "your-topic-name";
	String key = "your-message-key";
	String value = "your-message-value";
	ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
	producer.send(record);
} catch (Exception ex) {
} finally {
	try {
		producer.close();
	} catch (Exception ex) {
	}
}

但是在 SpringBoot 的項(xiàng)目中我們會使用 KafkaTemplate 去實(shí)現(xiàn)生產(chǎn)消息的發(fā)送。

3、SpringBoot 如何使用 Kafka 生產(chǎn)者

都需添加以下依賴項(xiàng):

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>		
</dependency>

3.1、方式一:代碼

@Configuration
public class KafkaProducerConfig {
	/**
	 * kafka 地址
	 */
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;
    
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

使用如下:

@Service
public class KafkaProducerService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendMessage(String topic, String key, String value) {
        kafkaTemplate.send(topic, key, value);
    }
}

3.2、方式二:配置文件

可以 application.properties: 加上:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.value-serializer=org.apache.kafka.common.serialization.StringSerializer

或者 yml 里面加上

spring:
  kafka:
    bootstrap-servers: localhost:9092
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer

直接使用

@Service
public class KafkaProducerService {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendMessage(String topic, String key, String value) {
        kafkaTemplate.send(topic, key, value);
    }
}

以上也只是一個(gè)簡單的實(shí)例,后面我們根據(jù) 【項(xiàng)目實(shí)戰(zhàn)】手把手教你搭建前后端分離項(xiàng)目 SpringBoot + Vue + Element UI + Mysql, 在這個(gè)教程的基礎(chǔ)上,我們寫如何實(shí)戰(zhàn)。

4、Kafka Properties 的詳細(xì)講解

以下是所有參數(shù)的詳細(xì)解釋:

  1. bootstrap.servers :生產(chǎn)者用于與Kafka集群建立初始連接的主機(jī)和端口列表。
  2. acks :生產(chǎn)者要求leader在認(rèn)為請求完成之前接收的確認(rèn)數(shù)??赡艿闹涤校?/li>
  • 0 :生產(chǎn)者不等待任何確認(rèn)。
  • 1 :生產(chǎn)者等待leader確認(rèn)請求。
  • all :生產(chǎn)者等待所有同步副本確認(rèn)請求。
  1. retries :在放棄之前,生產(chǎn)者將重試發(fā)送失敗的消息的次數(shù)。設(shè)置大于0的值以啟用重試。
  2. batch.size :生產(chǎn)者嘗試發(fā)送到Kafka代理的批次的大?。ㄒ宰止?jié)為單位)。較大的批次大小可以提高吞吐量,但會增加消息傳遞的延遲。
  3. linger.ms :生產(chǎn)者在將批次發(fā)送到Kafka代理之前等待更多消息累積的時(shí)間(以毫秒為單位)。這有助于批處理,減少發(fā)送到代理的請求數(shù)量。
  4. buffer.memory :生產(chǎn)者用于緩沖等待發(fā)送到Kafka代理的消息的總內(nèi)存量。
  5. key.serializer :用于將鍵對象序列化為字節(jié)的類。常見的選項(xiàng)是 StringSerializerByteArraySerializer 。
  6. value.serializer :用于將值對象序列化為字節(jié)的類。常見的選項(xiàng)是 StringSerializerByteArraySerializer 。
  7. compression.type :用于消息的壓縮類型。支持的值有 none 、 gzipsnappylz4 。壓縮可以減少網(wǎng)絡(luò)帶寬和存儲要求。
  8. max.in.flight.requests.per.connection :在阻塞之前,生產(chǎn)者可以有的未確認(rèn)請求的最大數(shù)量。將此值設(shè)置為較高的值可以增加吞吐量,但也會增加用于緩沖的內(nèi)存。
  9. request.timeout.ms :生產(chǎn)者在考慮請求失敗之前,從Kafka代理等待響應(yīng)的最長時(shí)間(以毫秒為單位)。
  10. max.block.ms :當(dāng)緩沖區(qū)已滿或元數(shù)據(jù)不可用時(shí),生產(chǎn)者在 send() 方法中阻塞的最長時(shí)間(以毫秒為單位)。

以上這些是Kafka生產(chǎn)者配置中常用的一些屬性,使用方法如下:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址和端口
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 鍵的序列化器
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器

5、Spring-Kafka Yml 配置參數(shù)

spring:
  kafka:
    bootstrap-servers: <bootstrap-servers>
    producer:
      key-serializer: <key-serializer>
      value-serializer: <value-serializer>
      retries: <retries>
      batch-size: <batch-size>
      linger-ms: <linger-ms>
      buffer-memory: <buffer-memory>
      compression-type: <compression-type>
    consumer:
      group-id: <group-id>
      key-deserializer: <key-deserializer>
      value-deserializer: <value-deserializer>
      auto-offset-reset: <auto-offset-reset>
      enable-auto-commit: <enable-auto-commit>
      max-poll-records: <max-poll-records>

以下是每個(gè)參數(shù)的解釋:

  • bootstrap-servers :Kafka broker地址的逗號分隔列表。
  • producer.key-serializer :用于將鍵對象序列化為字節(jié)的類。
  • producer.value-serializer :用于將值對象序列化為字節(jié)的類。
  • producer.retries :在放棄之前,生產(chǎn)者將重試發(fā)送失敗的消息的次數(shù)。
  • producer.batch-size :生產(chǎn)者將嘗試發(fā)送到Kafka broker的批次的大小(以字節(jié)為單位)。
  • producer.linger-ms :生產(chǎn)者在將批次發(fā)送到Kafka broker之前等待更多消息累積的時(shí)間(以毫秒為單位)。
  • producer.buffer-memory :生產(chǎn)者用于緩沖等待發(fā)送到Kafka broker的消息的總內(nèi)存量。
  • producer.compression-type :消息的壓縮類型。
  • consumer.group-id :消費(fèi)者組ID。
  • consumer.key-deserializer :用于將鍵對象從字節(jié)反序列化的類。
  • consumer.value-deserializer :用于將值對象從字節(jié)反序列化的類。
  • consumer.auto-offset-reset :當(dāng)Kafka中沒有初始偏移量或當(dāng)前偏移量不再存在時(shí),使用的策略。
  • consumer.enable-auto-commit :消費(fèi)者的偏移量是否應(yīng)自動(dòng)提交。
  • consumer.max-poll-records :消費(fèi)者在一次輪詢中最多獲取的記錄數(shù)。

6、Kafka 生產(chǎn)者異步回調(diào)方式生產(chǎn)消息

6.1、什么是異步回調(diào)

什么是異步回調(diào)要搞清楚,異步回調(diào)指的是我發(fā)送完成了,我就不管了,我不需要等你的返回。具體的定義如下:

異步回調(diào)是一種編程模式,用于處理異步操作的結(jié)果。在異步回調(diào)中,當(dāng)一個(gè)操作被觸發(fā)時(shí),程序不會立即阻塞等待結(jié)果,而是繼續(xù)執(zhí)行其他任務(wù)。當(dāng)操作完成后,系統(tǒng)會調(diào)用預(yù)先定義的回調(diào)函數(shù)來處理操作的結(jié)果。

異步回調(diào)常用于處理需要等待時(shí)間較長的操作,例如網(wǎng)絡(luò)請求、數(shù)據(jù)庫查詢等。通過使用異步回調(diào),可以提高系統(tǒng)的響應(yīng)性能和并發(fā)處理能力,避免阻塞和等待的情況。

在異步回調(diào)中,通常將回調(diào)函數(shù)作為參數(shù)傳遞給異步操作的方法。當(dāng)操作完成后,系統(tǒng)會調(diào)用回調(diào)函數(shù),并將操作的結(jié)果作為參數(shù)傳遞給回調(diào)函數(shù),以便進(jìn)行后續(xù)處理。

異步回調(diào)在編寫異步代碼時(shí)非常有用,可以幫助開發(fā)人員處理異步操作的結(jié)果,而無需顯式地等待操作完成。這種方式可以提高系統(tǒng)的性能和可伸縮性,同時(shí)保持代碼的簡潔性和可讀性。

6.2、匿名內(nèi)部類的方式做異步回調(diào)

public class KafkaProducerExample{
     
   private static final String TOPIC_NAME = "test-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            String message = "Hello, Kafka! This is message " + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // 匿名內(nèi)部類的方式做異步回調(diào)
                }
            });
        }

        producer.close();
    }
}

6.3、 KafkaTemplate 的異步回調(diào)

package com.pany.camp.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;

/**
 *
 * @description:  生產(chǎn)者
 * @copyright: @Copyright (c) 2022
 * @company: Aiocloud
 * @author: pany
 * @version: 1.0.0
 * @createTime: 2023-06-26 18:10
 */
@Component
public class KafkaProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);

        future.addCallback(new ListenableFutureCallback<>() {

            @Override
            public void onSuccess(Object o) {
            }

            @Override
            public void onFailure(Throwable ex) {
                // Handle failure callback
                System.err.println("Failed to send message: " + ex.getMessage());
            }
        });
    }

}

???? 本文由激流原創(chuàng),原創(chuàng)不易,希望大家關(guān)注、點(diǎn)贊、收藏,給博主一點(diǎn)鼓勵(lì),感謝?。。?/strong>
??????????????????????????????????????????????????????????????
【項(xiàng)目實(shí)戰(zhàn)】Java 開發(fā) Kafka 生產(chǎn)者,中間件,java,kafka,生產(chǎn)者,項(xiàng)目實(shí)戰(zhàn),原力計(jì)劃文章來源地址http://www.zghlxwxcb.cn/news/detail-570015.html

到了這里,關(guān)于【項(xiàng)目實(shí)戰(zhàn)】Java 開發(fā) Kafka 生產(chǎn)者的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • java:Kafka生產(chǎn)者推送數(shù)據(jù)與消費(fèi)者接收數(shù)據(jù)(參數(shù)配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式為host1:port1,host2:port2,…。生產(chǎn)者會從這些Broker中選擇一個(gè)可用的Broker作為消息發(fā)送的目標(biāo)Broker。 acks :Broker對消息的確認(rèn)模式??蛇x值為0、1、all。0表示生產(chǎn)者不會等待Broker的任何確認(rèn)消息;1表示生產(chǎn)者會等待Broker的Leader副本確認(rèn)

    2024年02月16日
    瀏覽(35)
  • 三、Kafka生產(chǎn)者1---Kafka生產(chǎn)者初始化-new KafkaProducer

    概述 本文主要是分享Kafka初始化生產(chǎn)者的 大體過程 初始化過程中會新建很多對象,本文暫先分享部分對象 1.分區(qū)器---Partitioner partitioner 2.重試時(shí)間---long retryBackoffMs 3.序列化器---SerializerK keySerializer,SerializerV valueSerializer 4.攔截器--- ListProducerInterceptorK, V interceptorList 5.累加器-

    2024年03月14日
    瀏覽(36)
  • Kafka生產(chǎn)者原理 kafka生產(chǎn)者發(fā)送流程 kafka消息發(fā)送到集群步驟 kafka如何發(fā)送消息 kafka詳解

    Kafka生產(chǎn)者原理 kafka生產(chǎn)者發(fā)送流程 kafka消息發(fā)送到集群步驟 kafka如何發(fā)送消息 kafka詳解

    kafka尚硅谷視頻: 10_尚硅谷_Kafka_生產(chǎn)者_(dá)原理_嗶哩嗶哩_bilibili ? ???? 1. producer初始化:加載默認(rèn)配置,以及配置的參數(shù),開啟網(wǎng)絡(luò)線程 ???? 2. 攔截器攔截 ???? 3. 序列化器進(jìn)行消息key, value序列化 ???? 4. 進(jìn)行分區(qū) ???? 5. kafka broker集群 獲取metaData ???? 6. 消息緩存到

    2024年02月11日
    瀏覽(21)
  • (三)Kafka 生產(chǎn)者

    (三)Kafka 生產(chǎn)者

    創(chuàng)建一個(gè) ProducerRecord 對象,需要包含目標(biāo)主題和要發(fā)送的內(nèi)容,還可以指定鍵、分區(qū)、時(shí)間戳或標(biāo)頭。 在發(fā)送 ProducerRecord 對象時(shí),生產(chǎn)者需要先把鍵和值對象序列化成字節(jié)數(shù)組,這樣才能在網(wǎng)絡(luò)上傳輸。 如果沒有顯式地指定分區(qū),那么數(shù)據(jù)將被傳給分區(qū)器。分區(qū)器通常會基

    2024年02月09日
    瀏覽(20)
  • kafka學(xué)習(xí)-生產(chǎn)者

    kafka學(xué)習(xí)-生產(chǎn)者

    目錄 1、消息生產(chǎn)流程 2、生產(chǎn)者常見參數(shù)配置 3、序列化器 基本概念 自定義序列化器 4、分區(qū)器 默認(rèn)分區(qū)規(guī)則 自定義分區(qū)器 5、生產(chǎn)者攔截器 作用 自定義攔截器 6、生產(chǎn)者原理解析 在Kafka中保存的數(shù)據(jù)都是字節(jié)數(shù)組。 消息發(fā)送前,需要將消息序列化為字節(jié)數(shù)組進(jìn)行發(fā)送。

    2024年02月09日
    瀏覽(26)
  • Kafka-生產(chǎn)者

    Kafka-生產(chǎn)者

    Kafka在實(shí)際應(yīng)用中,經(jīng)常被用作高性能、可擴(kuò)展的消息中間件。 Kafka自定義了一套網(wǎng)絡(luò)協(xié)議,只要遵守這套協(xié)議的格式,就可以向Kafka發(fā)送消息,也可以從Kafka中拉取消息。 在實(shí)踐生產(chǎn)過程中,一套API封裝良好、靈活易用的客戶端可以避免開發(fā)人員重復(fù)勞動(dòng),提高開發(fā)效率,也

    2024年01月20日
    瀏覽(22)
  • 三、Kafka生產(chǎn)者

    三、Kafka生產(chǎn)者

    1 發(fā)送原理 在消息發(fā)送的過程中,涉及到了兩個(gè)線程——main 線程和 Sender 線程。在 main 線程中創(chuàng)建了一個(gè)雙端隊(duì)列 RecordAccumulator。main 線程將消息發(fā)送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka Broker 【RecordAccumulator緩沖的結(jié)構(gòu): 每一個(gè)分區(qū)對應(yīng)一

    2024年02月12日
    瀏覽(21)
  • Kafka(生產(chǎn)者)

    Kafka(生產(chǎn)者)

    目 前 企 業(yè) 中 比 較 常 見 的 消 息 隊(duì) 列 產(chǎn) 品 主 要 有 Kafka(在大數(shù)據(jù)場景主要采用 Kafka 作為消息隊(duì)列。) ActiveMQ RabbitMQ RocketMQ 1.1.1 傳統(tǒng)消息隊(duì)列的應(yīng)用場景 傳統(tǒng)的消息隊(duì)列的主要應(yīng)用場景包括: 緩存/消峰 、 解耦 和 異步通信 。 緩沖/消峰: 有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過

    2024年02月11日
    瀏覽(26)
  • 「Kafka」生產(chǎn)者篇

    「Kafka」生產(chǎn)者篇

    在消息發(fā)送的過程中,涉及到了 兩個(gè)線程 —— main 線程 和 Sender 線程 。 在 main 線程中創(chuàng)建了 一個(gè) 雙端隊(duì)列 RecordAccumulator 。 main線程將消息發(fā)送給RecordAccumulator,Sender線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka Broker。 main線程創(chuàng)建 Producer 對象,調(diào)用 send 函數(shù)發(fā)送消息,

    2024年01月19日
    瀏覽(23)
  • Kafka 生產(chǎn)者

    Kafka 生產(chǎn)者

    目錄 一、kafka生產(chǎn)者原理 二、kafka異步發(fā)送 配置kafka 創(chuàng)建對象,發(fā)送數(shù)據(jù) 帶回調(diào)函數(shù)的異步發(fā)送 同步發(fā)送 ? 三、kafka生產(chǎn)者分區(qū) 分區(qū)策略 指定分區(qū): ?指定key: 什么都不指定: 自定義分區(qū)器 四、生產(chǎn)者提高吞吐量 五、數(shù)據(jù)的可靠性 ACK應(yīng)答級別 數(shù)據(jù)完全可靠條件 可靠性

    2023年04月15日
    瀏覽(27)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包