国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

三、Kafka生產(chǎn)者

這篇具有很好參考價值的文章主要介紹了三、Kafka生產(chǎn)者。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

3.1 生產(chǎn)者消息發(fā)送流程

1 發(fā)送原理

在消息發(fā)送的過程中,涉及到了兩個線程——main 線程和 Sender 線程。在 main 線程中創(chuàng)建了一個雙端隊(duì)列 RecordAccumulator。main 線程將消息發(fā)送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka Broker
【RecordAccumulator緩沖的結(jié)構(gòu):每一個分區(qū)對應(yīng)一個雙端隊(duì)列deque,存放的單元是ProducerBatch,一個Batch中存放了多個Record,那么存消息是自動放到尾端,而讀取消息(發(fā)送線程讀?。┦菑念^部開始的,目的是讓發(fā)送的消息更加緊湊,節(jié)約空間,提高效率。注意這個大的緩沖池,默認(rèn)是32M,如果超出了會阻塞send()方法,可以設(shè)置參數(shù)來調(diào)節(jié)這個大小。】

三、Kafka生產(chǎn)者,Kafka,kafka



3.2 異步發(fā)送 API

public class CustomProducer {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 連接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");

        // 指定對應(yīng)的key和value的序列化類型 key.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1 創(chuàng)建kafka生產(chǎn)者對象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 發(fā)送數(shù)據(jù)
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
        }

        // 3 關(guān)閉資源
        kafkaProducer.close();
    }
}

帶回調(diào)函數(shù)的異步發(fā)送

回調(diào)函數(shù)會在 producer 收到 ack 時調(diào)用,為異步調(diào)用,該方法有兩個參數(shù),分別是元數(shù)據(jù)信息(RecordMetadata)和異常信息(Exception),如果 Exception 為 null,說明消息發(fā)送成功,如果 Exception 不為 null,說明消息發(fā)送失敗。

注意:消息發(fā)送失敗會自動重試,不需要我們在回調(diào)函數(shù)中手動重試。

public class CustomProducerCallback {

    public static void main(String[] args) throws InterruptedException {

        // 0 配置
        Properties properties = new Properties();

        // 連接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");

        // 指定對應(yīng)的key和value的序列化類型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1 創(chuàng)建kafka生產(chǎn)者對象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 發(fā)送數(shù)據(jù)
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (exception == null){
                        System.out.println("主題: "+metadata.topic() + " 分區(qū): "+ metadata.partition());
                    }
                }
            });

            Thread.sleep(2);
        }

        // 3 關(guān)閉資源
        kafkaProducer.close();
    }
}



3.3 同步發(fā)送數(shù)據(jù)

只需在異步發(fā)送的基礎(chǔ)上,再調(diào)用一下 get()方法即可。

public class CustomProducerSync {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 0 配置
        Properties properties = new Properties();

        // 連接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");

        // 指定對應(yīng)的key和value的序列化類型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1 創(chuàng)建kafka生產(chǎn)者對象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 發(fā)送數(shù)據(jù)
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i)).get();
        }

        // 3 關(guān)閉資源
        kafkaProducer.close();
    }
}

3.4 生產(chǎn)者分區(qū)

1 kafka分區(qū)的好處

  • 便于合理使用存儲資源,每個Partition在一個Broker上存儲,可以把海量的數(shù)據(jù)按照分區(qū)切割成一塊一塊數(shù)據(jù)存儲在多臺Broker上。合理控制分區(qū)的任務(wù),可以實(shí)現(xiàn)負(fù)載均衡的效果
  • 提高并行度,生產(chǎn)者可以以分區(qū)為單位發(fā)送數(shù)據(jù);消費(fèi)者可以以分區(qū)為單位進(jìn)行消費(fèi)數(shù)據(jù)。

2 生產(chǎn)者發(fā)送消息的分區(qū)策略

  • 指定分區(qū)
  • 沒有指定分區(qū),但是傳入了key值
  • 既沒有指定分區(qū),也沒有傳入key值
    三、Kafka生產(chǎn)者,Kafka,kafka

3 自定義分區(qū)器

1)需求:
例如我們實(shí)現(xiàn)一個分區(qū)器實(shí)現(xiàn),發(fā)送過來的數(shù)據(jù)中如果包含 atguigu,就發(fā)往 0 號分區(qū),不包含 atguigu,就發(fā)往 1 號分區(qū)

2)定義類實(shí)現(xiàn) Partitioner 接口,重寫 partition()方法。

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // 獲取數(shù)據(jù) atguigu  hello
        String msgValues = value.toString();

        int partition;

        if (msgValues.contains("atguigu")){
            partition = 0;
        }else {
            partition = 1;
        }

        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

3)使用分區(qū)器的方法,在生產(chǎn)者的配置中添加分區(qū)器參數(shù)

public class CustomProducerCallbackPartitions {

    public static void main(String[] args) throws InterruptedException {

        // 0 配置
        Properties properties = new Properties();

        // 連接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.11:9092");

        // 指定對應(yīng)的key和value的序列化類型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 關(guān)聯(lián)自定義分區(qū)器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.MyPartitioner");

        // 1 創(chuàng)建kafka生產(chǎn)者對象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 發(fā)送數(shù)據(jù)
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (exception == null) {
                        System.out.println("主題: " + metadata.topic() + " 分區(qū): " + metadata.partition());
                    }
                }
            });

            Thread.sleep(2);
        }

        // 3 關(guān)閉資源
        kafkaProducer.close();
    }
}

三、Kafka生產(chǎn)者,Kafka,kafka



3.5 生產(chǎn)者如何提高吞吐量

  • 分批次發(fā)送消息
  • 對生產(chǎn)者消息采用壓縮

四個重要參數(shù):
三、Kafka生產(chǎn)者,Kafka,kafka

public class CustomProducerParameters {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 連接kafka集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");

        // 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 緩沖區(qū)大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

        // 批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);

        // linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // 壓縮
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");


        // 1 創(chuàng)建生產(chǎn)者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 發(fā)送數(shù)據(jù)
        for (int i = 0; i < 50; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
        }

        // 3 關(guān)閉資源
        kafkaProducer.close();
    }
}


3.6 數(shù)據(jù)可靠性

1、ACK應(yīng)答級別

  • 0:生產(chǎn)者發(fā)送過來的數(shù)據(jù),不需要等數(shù)據(jù)落盤應(yīng)答
  • 1:生產(chǎn)者發(fā)送過來的數(shù)據(jù),Leader收到數(shù)據(jù)后應(yīng)答
  • -1 :生產(chǎn)者發(fā)送過來的數(shù)據(jù),Leader+和isr隊(duì)列里面的所有節(jié)點(diǎn)收齊數(shù)據(jù)后應(yīng)答。-1和all等價。
    三、Kafka生產(chǎn)者,Kafka,kafka



ACK=-1時存在的問題?
Leader收到數(shù)據(jù),所有Follower都開始同步數(shù)據(jù),但有一個Follower,因?yàn)槟撤N故障,遲遲不能與Leader進(jìn)行同步,那這個問題怎么解決呢?

Leader維護(hù)了一個動態(tài)的in-sync replica set(ISR),意為和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。

如果Follower長時間未向Leader發(fā)送通信請求或同步數(shù)據(jù),則該Follower將被踢出ISR。該時間閾值由replica.lag.time.max.ms參數(shù)設(shè)定,默認(rèn)30s

數(shù)據(jù)完全可靠條件 = ACK級別設(shè)置為-1 + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)量大于等于2

在生產(chǎn)環(huán)境中,acks=0很少使用;acks=1,一般用于傳輸普通日志,允許丟個別數(shù)據(jù);acks=-1,一般用于傳輸和錢相關(guān)的數(shù)據(jù)對可靠性要求比較高的場景,



數(shù)據(jù)重復(fù)性問題
三、Kafka生產(chǎn)者,Kafka,kafka

2、代碼演示

// acks
properties.put(ProducerConfig.ACKS_CONFIG,“1”);
// 重試次數(shù)
properties.put(ProducerConfig.RETRIES_CONFIG,3);

public class CustomProducerAcks {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 連接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定對應(yīng)的key和value的序列化類型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // acks
        properties.put(ProducerConfig.ACKS_CONFIG,"1");

        // 重試次數(shù)
        properties.put(ProducerConfig.RETRIES_CONFIG,3);

        // 1 創(chuàng)建kafka生產(chǎn)者對象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 發(fā)送數(shù)據(jù)
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
        }

        // 3 關(guān)閉資源
        kafkaProducer.close();
    }
}



3.7 數(shù)據(jù)去重

1、三個概念

保證消息至少被消費(fèi)一次 :ACK級別設(shè)置為-1 + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)量大于等于2

保證消息最多被消費(fèi)一次: ACK級別設(shè)置為0

保證消息只能精確消費(fèi)一次: (ACK級別設(shè)置為-1 + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)量大于等于2) + 消息消費(fèi)的冪等性

Kafka 0.11版本以后,引入了一項(xiàng)重大特性:冪等性和事務(wù)。



2 冪等性

冪等性就是指Producer不論向Broker發(fā)送多少次重復(fù)數(shù)據(jù),Broker端都只會持久化一條,保證了不重復(fù)。

如何使用冪等性
開啟參數(shù) enable.idempotence 默認(rèn)為 true,false 關(guān)閉



3 生產(chǎn)者事務(wù)

1) kafka的事務(wù)原理
這里講的不好,待會找另外的視頻看一下

2)kafka事務(wù)的api
Kafka 的事務(wù)一共有如下 5 個 API


        // 1 初始化事務(wù)
        void initTransactions();
        // 2 開啟事務(wù)
        void beginTransaction() throws ProducerFencedException;
        // 3 在事務(wù)內(nèi)提交已經(jīng)消費(fèi)的偏移量(主要用于消費(fèi)者)
        void sendOffsetsToTransaction (Map < TopicPartition, OffsetAndMetadata > offsets,String consumerGroupId) throws ProducerFencedException;
        // 4 提交事務(wù)
        void commitTransaction() throws ProducerFencedException;
        // 5 放棄事務(wù)(類似于回滾事務(wù)的操作)
        void abortTransaction() throws ProducerFencedException;

事務(wù)使用的demo

public class CustomProducerTranactions {

    public static void main(String[] args) {
        // 0 配置
        Properties properties = new Properties();
        // 連接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.11:9092");
        // 指定對應(yīng)的key和value的序列化類型 key.serializer
//      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 指定事務(wù)id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01");

        // 1 創(chuàng)建kafka生產(chǎn)者對象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        kafkaProducer.initTransactions();

        kafkaProducer.beginTransaction();

        try {
            // 2 發(fā)送數(shù)據(jù)
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i));
            }
            int i = 1 / 0;
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            kafkaProducer.abortTransaction();
        } finally {
            // 3 關(guān)閉資源
            kafkaProducer.close();
        }
    }
}

3.8 數(shù)據(jù)有序性

三、Kafka生產(chǎn)者,Kafka,kafka

3.9 數(shù)據(jù)亂序

三、Kafka生產(chǎn)者,Kafka,kafka文章來源地址http://www.zghlxwxcb.cn/news/detail-660738.html

到了這里,關(guān)于三、Kafka生產(chǎn)者的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Kafka 生產(chǎn)者

    Kafka 生產(chǎn)者

    目錄 一、kafka生產(chǎn)者原理 二、kafka異步發(fā)送 配置kafka 創(chuàng)建對象,發(fā)送數(shù)據(jù) 帶回調(diào)函數(shù)的異步發(fā)送 同步發(fā)送 ? 三、kafka生產(chǎn)者分區(qū) 分區(qū)策略 指定分區(qū): ?指定key: 什么都不指定: 自定義分區(qū)器 四、生產(chǎn)者提高吞吐量 五、數(shù)據(jù)的可靠性 ACK應(yīng)答級別 數(shù)據(jù)完全可靠條件 可靠性

    2023年04月15日
    瀏覽(24)
  • kafka學(xué)習(xí)-生產(chǎn)者

    kafka學(xué)習(xí)-生產(chǎn)者

    目錄 1、消息生產(chǎn)流程 2、生產(chǎn)者常見參數(shù)配置 3、序列化器 基本概念 自定義序列化器 4、分區(qū)器 默認(rèn)分區(qū)規(guī)則 自定義分區(qū)器 5、生產(chǎn)者攔截器 作用 自定義攔截器 6、生產(chǎn)者原理解析 在Kafka中保存的數(shù)據(jù)都是字節(jié)數(shù)組。 消息發(fā)送前,需要將消息序列化為字節(jié)數(shù)組進(jìn)行發(fā)送。

    2024年02月09日
    瀏覽(26)
  • Kafka生產(chǎn)者

    1.acks 如果acks=0,生產(chǎn)者在成功寫入消息之前不會等待任何來自服務(wù)器的響應(yīng)。 缺點(diǎn):如果當(dāng)中出現(xiàn)了問題,導(dǎo)致服務(wù)器沒有收到消息,那么生產(chǎn)者就無從得知,消息就丟失了 優(yōu)點(diǎn):因?yàn)樯a(chǎn)者不需要等待服務(wù)器的響應(yīng),所有他可以以網(wǎng)絡(luò)能夠支持的最大速度發(fā)送消息,從而

    2024年01月19日
    瀏覽(33)
  • 三、Kafka生產(chǎn)者

    三、Kafka生產(chǎn)者

    1 發(fā)送原理 在消息發(fā)送的過程中,涉及到了兩個線程——main 線程和 Sender 線程。在 main 線程中創(chuàng)建了一個雙端隊(duì)列 RecordAccumulator。main 線程將消息發(fā)送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka Broker 【RecordAccumulator緩沖的結(jié)構(gòu): 每一個分區(qū)對應(yīng)一

    2024年02月12日
    瀏覽(17)
  • 「Kafka」生產(chǎn)者篇

    「Kafka」生產(chǎn)者篇

    在消息發(fā)送的過程中,涉及到了 兩個線程 —— main 線程 和 Sender 線程 。 在 main 線程中創(chuàng)建了 一個 雙端隊(duì)列 RecordAccumulator 。 main線程將消息發(fā)送給RecordAccumulator,Sender線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka Broker。 main線程創(chuàng)建 Producer 對象,調(diào)用 send 函數(shù)發(fā)送消息,

    2024年01月19日
    瀏覽(21)
  • Kafka-生產(chǎn)者

    Kafka-生產(chǎn)者

    Kafka在實(shí)際應(yīng)用中,經(jīng)常被用作高性能、可擴(kuò)展的消息中間件。 Kafka自定義了一套網(wǎng)絡(luò)協(xié)議,只要遵守這套協(xié)議的格式,就可以向Kafka發(fā)送消息,也可以從Kafka中拉取消息。 在實(shí)踐生產(chǎn)過程中,一套API封裝良好、靈活易用的客戶端可以避免開發(fā)人員重復(fù)勞動,提高開發(fā)效率,也

    2024年01月20日
    瀏覽(22)
  • (三)Kafka 生產(chǎn)者

    (三)Kafka 生產(chǎn)者

    創(chuàng)建一個 ProducerRecord 對象,需要包含目標(biāo)主題和要發(fā)送的內(nèi)容,還可以指定鍵、分區(qū)、時間戳或標(biāo)頭。 在發(fā)送 ProducerRecord 對象時,生產(chǎn)者需要先把鍵和值對象序列化成字節(jié)數(shù)組,這樣才能在網(wǎng)絡(luò)上傳輸。 如果沒有顯式地指定分區(qū),那么數(shù)據(jù)將被傳給分區(qū)器。分區(qū)器通常會基

    2024年02月09日
    瀏覽(18)
  • Kafka(生產(chǎn)者)

    Kafka(生產(chǎn)者)

    目 前 企 業(yè) 中 比 較 常 見 的 消 息 隊(duì) 列 產(chǎn) 品 主 要 有 Kafka(在大數(shù)據(jù)場景主要采用 Kafka 作為消息隊(duì)列。) ActiveMQ RabbitMQ RocketMQ 1.1.1 傳統(tǒng)消息隊(duì)列的應(yīng)用場景 傳統(tǒng)的消息隊(duì)列的主要應(yīng)用場景包括: 緩存/消峰 、 解耦 和 異步通信 。 緩沖/消峰: 有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過

    2024年02月11日
    瀏覽(24)
  • 【Kafka】高級特性:生產(chǎn)者

    【Kafka】高級特性:生產(chǎn)者

    整個流程如下: Producer創(chuàng)建時,會創(chuàng)建一個Sender線程并設(shè)置為守護(hù)線程。 生產(chǎn)消息時,內(nèi)部其實(shí)是異步流程;生產(chǎn)的消息先經(jīng)過攔截器-序列化器-分區(qū)器,然后將消息緩存在緩沖區(qū)(該緩沖區(qū)也是在Producer創(chuàng)建時創(chuàng)建)。 批次發(fā)送的條件為:緩沖區(qū)數(shù)據(jù)大小達(dá)到batch.size或者

    2024年01月24日
    瀏覽(23)
  • Kafka生產(chǎn)者相關(guān)概念

    Kafka生產(chǎn)者相關(guān)概念

    Kafka中消息是以topic進(jìn)行分類的,Producer生產(chǎn)消息,Consumer消費(fèi)消息,都是面向topic的。 Topic是邏輯上的概念,Partition是物理上的概念,每個Partition對應(yīng)著一個log文件,該log文件中存儲的就是producer生產(chǎn)的數(shù)據(jù)。 寫入方式 producer采用推(push)模式將消息發(fā)布到broker,每條消息都

    2024年04月13日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包