国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)

這篇具有很好參考價(jià)值的文章主要介紹了Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

1、kafka生產(chǎn)者

1.1 生產(chǎn)者消息發(fā)送流程
1.1.1 發(fā)送原理

在消息發(fā)生的過程中,設(shè)計(jì)到了兩個(gè)線程——main線程和Sender線程。在main線程中創(chuàng)建了一個(gè)雙端隊(duì)列RecordAccumulator。main線程將消息發(fā)給RecordAccumulator,Sender線程不斷從RecordAccumulator中拉取消息發(fā)送到Kafka Broker。
Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)

  • batch.size:只有數(shù)據(jù)積累到batch.size之后,sender才會(huì)發(fā)送數(shù)據(jù)。默認(rèn)16k
  • linger.ms:如果數(shù)據(jù)遲遲未達(dá)到batch.size,sender等待linger.ms設(shè)置的時(shí)間到了之后就會(huì)發(fā)送數(shù)據(jù)。單位ms,默認(rèn)值數(shù)0ms,表示沒有延遲。

應(yīng)答acks:

  • 0:生產(chǎn)者發(fā)生過來的數(shù)據(jù),不需要等數(shù)據(jù)落盤應(yīng)答。
  • 1:生產(chǎn)者發(fā)生過來的數(shù)據(jù),Leader收到數(shù)據(jù)后應(yīng)答
  • -1(all):生產(chǎn)者發(fā)送過來的數(shù)據(jù),Leader和ISR隊(duì)列里面的所有節(jié)點(diǎn)收齊數(shù)據(jù)后應(yīng)答。-1和all等價(jià)。
1.1.2 生產(chǎn)者重要參數(shù)列表
參數(shù)名稱 描述
bootstrap.servers 生產(chǎn)者連接集群所需的Broker地址清單。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以設(shè)置1個(gè)或多個(gè),中間用逗號(hào)隔開。注意這里并非需要所有broker地址,因?yàn)樯a(chǎn)者從給定的broker里查到其他broker信息
key.serializer和value.serializer 指定發(fā)生信息的key和value的序列化類型。一定要寫全類名
buffer.memory RecordAccumulator緩沖區(qū)總大小,默認(rèn)32MB
batch.size 緩沖區(qū)一批數(shù)據(jù)最大值,默認(rèn)16K。適當(dāng)增加該值,可以提高吞吐量,但是如果該值設(shè)置太大,會(huì)導(dǎo)致數(shù)據(jù)傳輸延遲增加
linger.ms 如果數(shù)據(jù)遲遲未到batch.size,sender等待linger.time之后就會(huì)發(fā)送數(shù)據(jù)。單位ms,默認(rèn)值是0ms,表示沒有延遲。生產(chǎn)環(huán)境建議該值大小5-100ms之間
acks 0:生產(chǎn)者發(fā)生過來的數(shù)據(jù),不需要等數(shù)據(jù)落盤應(yīng)答。1: 生產(chǎn)者發(fā)送過來的數(shù)據(jù),Leader收到數(shù)據(jù)后應(yīng)答。-1(all):生產(chǎn)者發(fā)給過來的數(shù)據(jù),Leader和isr隊(duì)列里面的所有節(jié)點(diǎn)收齊數(shù)據(jù)后應(yīng)答。默認(rèn)值是-1,-1和all是等價(jià)的
max.in.flight.requests.per.connection 允許最多沒有返回ack的次數(shù),默認(rèn)為5,開啟冪等性包保證該值是1-5的數(shù)字
retries 當(dāng)消息發(fā)給出現(xiàn)錯(cuò)誤的時(shí)候,系統(tǒng)會(huì)重發(fā)消息。retries表示重試的次數(shù)。默認(rèn)是int的最大值,2147483647.如果設(shè)置了重試,還想抱著消息的有序性,需要設(shè)置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否則在重試此失敗消息的時(shí)候,其他的消息可能發(fā)送成功了
retry.backoff.ms 兩次重試之間的時(shí)間間隔,默認(rèn)是 100ms。
enable.idempotence 是否開啟冪等性,默認(rèn) true,開啟冪等性。
compression.type 生產(chǎn)者發(fā)送的所有數(shù)據(jù)的壓縮方式。默認(rèn)是 none,也就是不壓縮。支持壓縮類型:none、gzip、snappy、lz4 和 zstd。
1.2 異步發(fā)送API
1.2.1 普通異步發(fā)送

1、需求:創(chuàng)建 Kafka 生產(chǎn)者,采用異步的方式發(fā)送到 Kafka Broker
2、代碼編寫
(1)創(chuàng)建工程(KafkaDemo)
(2)導(dǎo)入依賴

<dependencies>
 <dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>3.0.0</version>
 </dependency>
</dependencies>

(3)創(chuàng)建包名org.zhm.producer
(4)編寫不帶回調(diào)函數(shù)的API代碼

package org.zhm.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @ClassName CustomProducer
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/12 18:35
 * @Version 1.0
 */
public class CustomProducer {
    public static void main(String[] args) {
        //1、創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
        Properties properties=new Properties();

        //2、給kafka配置對(duì)象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        //key,value序列化(必須):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //3、創(chuàng)建kafka生產(chǎn)者對(duì)象
        KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);

        //4、調(diào)用send()方法,發(fā)生消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","zhm"+i));
        }

        //5、關(guān)閉資源
        kafkaProducer.close();
    }
}


(5)測(cè)試
①在 hadoop102 上開啟 Kafka 消費(fèi)者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)
②在 IDEA 中執(zhí)行代碼,觀察 hadoop102 控制臺(tái)中是否接收到消息。
Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)

1.2.2 帶回調(diào)函數(shù)的異步發(fā)送

回調(diào)函數(shù)會(huì)在Producer收到ack時(shí)調(diào)用,為異步調(diào)用,該方法有兩個(gè)參數(shù),分別是元數(shù)據(jù)信息(RecordMetadata)和異常信息·(Exception),如果Exception為null,說明消息發(fā)生成功,如果Exception不為null,說明消息發(fā)送失敗。

注意:消息發(fā)送失敗會(huì)自動(dòng)重試,不需要我們?cè)诨卣{(diào)函數(shù)中手動(dòng)重試。

package org.zhm.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @ClassName CustoProducerCallback
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/12 18:44
 * @Version 1.0
 */
public class CustoProducerCallback {
    public static void main(String[] args) throws InterruptedException {
        //1、創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
        Properties properties=new Properties();

        //2、給kafka配置對(duì)象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        //key、value序列化(必須)
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());


        //3、創(chuàng)建kafka生產(chǎn)者對(duì)象
        KafkaProducer<String,String> producer=new KafkaProducer<>(properties);

        //4、調(diào)用send()方法 發(fā)送信息
        for (int i = 0; i < 6; i++) {
            //添加回調(diào)
            producer.send(new ProducerRecord<>("first", "zhm" + i), new Callback() {
                //該方法在Producer收到ack時(shí)調(diào)用,為異步調(diào)用
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e==null){
                        //沒有異常,輸出信息到控制臺(tái)
                        System.out.println("主題:"+recordMetadata.topic()+"->"+"分區(qū):"
                                +recordMetadata.partition());

                    }
                    else {
                        //出現(xiàn)異常打印
                        e.printStackTrace();
                    }
                }
            });

            //延遲一會(huì)會(huì)看到數(shù)據(jù)發(fā)往不同分區(qū)
            Thread.sleep(20);

        }

        //5、關(guān)閉資源
        producer.close();
    }
}


1、測(cè)試
①在 hadoop102 上開啟 Kafka 消費(fèi)者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中執(zhí)行代碼,觀察 hadoop102 控制臺(tái)中是否接收到消息。
Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)

1.3 同步發(fā)送API

只需在異步發(fā)送的基礎(chǔ)上,再調(diào)用一下 get()方法即可。

package org.zhm.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @ClassName CustomProducerSync
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/12 18:58
 * @Version 1.0
 */
public class CustomProducerSync {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1、創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
        Properties properties=new Properties();

        //2、給kafka配置對(duì)象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        //key、value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //3、創(chuàng)建kafka生產(chǎn)者對(duì)象
        KafkaProducer<String,String> producer=new KafkaProducer<>(properties);

        //4、調(diào)用send方法,發(fā)送信息
        for (int i = 0; i < 10; i++) {
            //異步發(fā)送 默認(rèn)
//            producer.send(new ProducerRecord<>("first","zhm"+i));
            //同步發(fā)送
            producer.send(new ProducerRecord<>("first","zhmzhm"+i)).get();

        }

        //5、關(guān)閉資源
        producer.close();
    }
}


1、測(cè)試
①在 hadoop102 上開啟 Kafka 消費(fèi)者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中執(zhí)行代碼,觀察 hadoop102 控制臺(tái)中是否接收到消息。

Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)

1.4 生產(chǎn)者分區(qū)
1.4.1 分區(qū)好處

1、便于合理使用儲(chǔ)存資源,每個(gè)Partition在一個(gè)Broker上儲(chǔ)存,可以把海量的數(shù)據(jù)按照分區(qū)切割成一塊一塊數(shù)據(jù)儲(chǔ)存在多臺(tái)Broker上。合理控制分區(qū)的任務(wù),可以實(shí)現(xiàn)負(fù)載均衡的效果。
2、提高并行度,生產(chǎn)者可以以分區(qū)為單位發(fā)送數(shù)據(jù);消費(fèi)者可以以分區(qū)為單位進(jìn)行消費(fèi)數(shù)據(jù)。
Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)

1.4.2 生產(chǎn)者發(fā)生消息的分區(qū)

1、默認(rèn)分區(qū)器DefaultPartitioner
(1)指明partition的情況下,直接將指明的值作為partition值;例如partition=0,所有數(shù)據(jù)寫入分區(qū)0。
(2)沒有指明partition值但有key的情況下,將key的hash值與topic的partition數(shù)進(jìn)行取余得到partition值;例如:key1的hash值=5, key2的hash值=6 ,topic的partition數(shù)=2,那么key1 對(duì)應(yīng)的value1寫入1號(hào)分區(qū),key2對(duì)應(yīng)的value2寫入0號(hào)分區(qū)。
(3)既沒有partition值又沒有key值的情況下,Kafka采用Sticky Partition(黏性分區(qū)器),會(huì)隨機(jī)選擇一個(gè)分區(qū),并盡可能一直
使用該分區(qū),待該分區(qū)的batch已滿或者已完成,Kafka再隨機(jī)一個(gè)分區(qū)進(jìn)行使用(和上一次的分區(qū)不同)。
例如:第一次隨機(jī)選擇0號(hào)分區(qū),等0號(hào)分區(qū)當(dāng)前批次滿了(默認(rèn)16k)或者linger.ms設(shè)置的時(shí)間到, Kafka再隨機(jī)一個(gè)分區(qū)進(jìn)
行使用(如果還是0會(huì)繼續(xù)隨機(jī))。
2、案例一
將數(shù)據(jù)發(fā)往指定 partition 的情況

package org.zhm.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @ClassName CustomProducerCallbackPartitions
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/12 19:10
 * @Version 1.0
 */
public class CustomProducerCallbackPartitions {
    public static void main(String[] args) {
        //1、創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
        Properties properties=new Properties();

        //2、給kafka配置對(duì)象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        //鍵值序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //3、創(chuàng)建生產(chǎn)者對(duì)象
        KafkaProducer<String ,String> producer=new KafkaProducer<String, String>(properties);

        //4、調(diào)用send方法,發(fā)送信息
        for (int i = 0; i < 5; i++) {
            //指定數(shù)據(jù)發(fā)送到1號(hào)分區(qū),key1為空
            producer.send(new ProducerRecord<>("first", 1, "", "zhm" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e==null){
                        System.out.println("主題:"+recordMetadata.topic()+"->"+"分區(qū):"+recordMetadata.partition());
                    }else {
                        e.printStackTrace();
                    }
                }
            });
        }

        //5、關(guān)閉資源
        producer.close();
    }
}


(1)測(cè)試
①在 hadoop102 上開啟 Kafka 消費(fèi)者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中執(zhí)行代碼,觀察 hadoop102 控制臺(tái)中是否接收到消息。
Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)
3、案例二
沒有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition 數(shù)進(jìn)行取余得到 partition 值。

package org.zhm.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @ClassName CustomProducerCallback1
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/12 19:21
 * @Version 1.0
 */
public class CustomProducerCallback1 {
    public static void main(String[] args) {
        Properties properties=new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        KafkaProducer<String,String> kafkaProducer=new KafkaProducer(properties);

        for (int i = 0; i < 5; i++) {
            //依次指定key值為a、b、f,數(shù)據(jù)key的hash值與3分別發(fā)往1、2、0
            kafkaProducer.send(new ProducerRecord<>("first", "a", "zhm" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e==null){
                        System.out.println("當(dāng)key為a時(shí):"+"主題:"+recordMetadata.topic()+"分區(qū):"+recordMetadata.partition());
                    }else {
                        e.printStackTrace();
                    }
                }
            });

        }
        for (int i = 0; i < 5; i++) {
            //依次指定key值為a、b、f,數(shù)據(jù)key的hash值與3分別發(fā)往1、2、0
            kafkaProducer.send(new ProducerRecord<>("first", "b", "zhm" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e==null){
                        System.out.println("當(dāng)key為b時(shí):"+"主題:"+recordMetadata.topic()+"分區(qū):"+recordMetadata.partition());
                    }else {
                        e.printStackTrace();
                    }
                }
            });

        }
        for (int i = 0; i < 5; i++) {
            //依次指定key值為a、b、f,數(shù)據(jù)key的hash值與3分別發(fā)往1、2、0
            kafkaProducer.send(new ProducerRecord<>("first", "f", "zhm" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e==null){
                        System.out.println("當(dāng)key為f時(shí):"+"主題:"+recordMetadata.topic()+"分區(qū):"+recordMetadata.partition());
                    }else {
                        e.printStackTrace();
                    }
                }
            });

        }
        kafkaProducer.close();
    }
}


(1)測(cè)試
①在 hadoop102 上開啟 Kafka 消費(fèi)者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中執(zhí)行代碼,觀察 hadoop102 控制臺(tái)中是否接收到消息。
Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)

1.4.3 自定義分區(qū)器

如果研發(fā)人員可以根據(jù)企業(yè)需求,自己重新實(shí)現(xiàn)分區(qū)器
1、例如我們實(shí)現(xiàn)一個(gè)分區(qū)器實(shí)現(xiàn),發(fā)送過來的數(shù)據(jù)中如果包含 atguigu,就發(fā)往 0 號(hào)分區(qū),不包含 atguigu,就發(fā)往 1 號(hào)分區(qū)。
2、案例實(shí)現(xiàn)
(1)定義類實(shí)現(xiàn) Partitioner 接口。
(2)重寫 partition()方法。

package org.zhm.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * @ClassName Mypartitioner
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/12 19:28
 * @Version 1.0
 */

/**
 1、實(shí)現(xiàn)接口Partitioner
 2、實(shí)現(xiàn)三個(gè)方法:Partition、close、configure
 3、編寫Partition方法,返回分區(qū)號(hào)
 */
public class MyPartitioner implements Partitioner {

    /*
    *
     * @description:返回信息對(duì)應(yīng)的分區(qū)
     * @author: zouhuiming
     * @date: 2023/6/12 19:30
     * @param: [s, o, bytes, o1, bytes1, cluster]
     * [主題、消息的key、消息的key序列化后的字節(jié)數(shù)組、消息的value、消息的value序列哈后字節(jié)數(shù)組、集群元數(shù)據(jù)可以查看的分區(qū)信息]
     * @return: int
     **/
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        //獲取信息
        String msyValue = o1.toString();

        //創(chuàng)建partition
        int partition;

        //判斷信息是否包含zhm
        if (msyValue.contains("zhm")){
            partition=0;
        }
        else {
            partition=1;
        }
        //返回分區(qū)號(hào)
        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}


(3)使用分區(qū)器的方法,在生產(chǎn)者的配置中添加分區(qū)器參數(shù)。

package org.zhm.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @ClassName CustomProducerCallbackPartitionsMine
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/12 19:35
 * @Version 1.0
 */
public class CustomProducerCallbackPartitionsMine {
    public static void main(String[] args) {
        Properties properties=new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //添加自定義分區(qū)器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"org.zhm.producer.MyPartitioner");

        KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);

        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "zhm" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e==null){
                        System.out.println("主題:"+recordMetadata.topic()+"分區(qū):"+recordMetadata.partition());
                    }else {
                        e.printStackTrace();
                    }
                }
            });

        }
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "hello" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e==null){
                        System.out.println("主題:"+recordMetadata.topic()+"分區(qū):"+recordMetadata.partition());
                    }else {
                        e.printStackTrace();
                    }
                }
            });

        }

        kafkaProducer.close();
    }
}


(4)測(cè)試
①在 hadoop102 上開啟 Kafka 消費(fèi)者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 控制臺(tái)觀察回調(diào)信息。
Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)

1.5 生產(chǎn)經(jīng)驗(yàn)——生產(chǎn)者如何提高吞吐量
  • batch.size:批次大小,默認(rèn)16k
  • linger.ms:等待時(shí)間,修改為5-100ms
  • compression.type:壓縮snappy
  • RecordAccumulator:緩存區(qū)大小,修改1為64MB
1.6 生產(chǎn)經(jīng)驗(yàn)——數(shù)據(jù)可靠性

1、ack應(yīng)答原理
Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)
Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)
可靠性總結(jié):

  • acks=0,生產(chǎn)者發(fā)送過來數(shù)據(jù)就不管了,可靠性差,效率高;
  • acks=1,生產(chǎn)者發(fā)送過來數(shù)據(jù)Leader應(yīng)答,可靠性中等,效率中等;
  • acks=-1(all),,生產(chǎn)者發(fā)送過來數(shù)據(jù)Leader和ISR隊(duì)列里面所有Follwer應(yīng)答,可靠性高,效率低;
    在生產(chǎn)環(huán)境中,acks=0很少使用;acks=1,一般用于傳輸普通日志,允許丟個(gè)別數(shù)據(jù);acks=-1,一般用于傳輸和錢相關(guān)的數(shù)據(jù),對(duì)可靠性要求比較高的場(chǎng)景。

數(shù)據(jù)重復(fù)分析
Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)

1.7 生產(chǎn)經(jīng)驗(yàn)——數(shù)據(jù)去重
1.7.1 數(shù)據(jù)傳遞語(yǔ)義
  • 至少一次(At Least Once) =ACK級(jí)別設(shè)置為-1+分區(qū)副本數(shù)大于等于2+ISR里應(yīng)答的最小副本數(shù)量大于等于2
  • 最多一次(At Most Once)=ACK級(jí)別設(shè)置為0
  • 總結(jié)
    • At Least Once可以保證數(shù)據(jù)不丟失,但是不能保證數(shù)據(jù)不重復(fù);
    • At Most Once可以保證數(shù)據(jù)不重復(fù),但是不能保證數(shù)據(jù)不丟失。
  • 精確一次(Exactly Once):對(duì)于一些非常重要的信息,比如和錢相關(guān)的數(shù)據(jù),要求數(shù)據(jù)既不能重復(fù)也不丟失。Kafka 0.11版本以后,引入了一項(xiàng)重大特性:冪等性和事務(wù)。
1.7.2 冪等性

冪等性就是指Producer不論向Broker發(fā)送多少次重復(fù)數(shù)據(jù),Broker端都只會(huì)持久化一條,保證了不重復(fù)。
精確一次(Exactly Once) = 冪等性 + 至少一次( ack=-1 + 分區(qū)副本數(shù)>=2 + ISR最小副本數(shù)量>=2) 。

重復(fù)數(shù)據(jù)的判斷標(biāo)準(zhǔn):具有<PID,Partition,SeqNumber>相同主鍵的消息提交時(shí),Broker只會(huì)持久化一條。其中PID是Kafka每次重啟都會(huì)分配一個(gè)新的;Partition表示分區(qū)號(hào);Sequence Number是單調(diào)自增的。
所以冪等性只能保證的是在單分區(qū)單會(huì)話內(nèi)不重復(fù)。
Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)
如何啟用冪等性
開啟參數(shù) enable.idempotence 默認(rèn)為 true,false 關(guān)閉

1.7.3 生產(chǎn)者事務(wù)

1、Kafka事務(wù)原理
注意:開啟事務(wù),必須開啟冪等性
Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)
2、Kafka 的事務(wù)一共有如下 5 個(gè) API

// 1 初始化事務(wù)
void initTransactions();
// 2 開啟事務(wù)
void beginTransaction() throws ProducerFencedException;
// 3 在事務(wù)內(nèi)提交已經(jīng)消費(fèi)的偏移量(主要用于消費(fèi)者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
 String consumerGroupId) throws 
ProducerFencedException;
// 4 提交事務(wù)
void commitTransaction() throws ProducerFencedException;
// 5 放棄事務(wù)(類似于回滾事務(wù)的操作)
void abortTransaction() throws ProducerFencedException;
1.8 生產(chǎn)經(jīng)驗(yàn)——數(shù)據(jù)有序

Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)

1.8 生產(chǎn)檢驗(yàn)——數(shù)據(jù)亂序

1、kafka在1.x版本之前保證數(shù)據(jù)單分區(qū)有序,條件如下:
max.in.flight.requests.per.connection=1(不需要考慮是否開啟冪等性)。
2、kafka在1.x及以后版本保證數(shù)據(jù)單分區(qū)有序,條件如下:
(1)未開啟冪等性
max.in.flight.requests.per.connection需要設(shè)置為1。
(2)開啟冪等性
max.in.flight.requests.per.connection需要設(shè)置小于等于5。
原因說明:因?yàn)樵趉afka1.x以后,啟用冪等后,kafka服務(wù)端會(huì)緩存producer發(fā)來的最近5個(gè)request的元數(shù)據(jù),故無(wú)論如何,都可以保證最近5個(gè)request的數(shù)據(jù)都是有序的。
Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)文章來源地址http://www.zghlxwxcb.cn/news/detail-488080.html

到了這里,關(guān)于Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • kafka入門(五):kafka生產(chǎn)者發(fā)送消息

    構(gòu)建消息,即創(chuàng)建 ProduceRecord 對(duì)象。 (1) kafka發(fā)送消息,最常見的構(gòu)造方法是: topic 表示主題, value 表示值。 (2) kafka發(fā)送消息指定key,ProducerRecord 的 key ,既可以作為消息的唯一id,也可以用來決定消息該被寫到主題的哪個(gè)分區(qū)。擁有相同key 的消息,將被寫到同一個(gè)分區(qū)。

    2024年01月17日
    瀏覽(41)
  • 多圖詳解 kafka 生產(chǎn)者消息發(fā)送過程

    多圖詳解 kafka 生產(chǎn)者消息發(fā)送過程

    生產(chǎn)者客戶端代碼 KafkaProducer 通過解析 producer.propeties 文件里面的屬性來構(gòu)造自己。例如 :分區(qū)器、Key 和 Value 序列化器、攔截器、 RecordAccumulator消息累加器 、 元信息更新器 、啟動(dòng)發(fā)送請(qǐng)求的后臺(tái)線程 生產(chǎn)者元信息更新器 我們之前有講過. 客戶端都會(huì)保存集群的元信息,例如

    2023年04月09日
    瀏覽(31)
  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    不管是把Kafka作為消息隊(duì)列、消息總線還是數(shù)據(jù)存儲(chǔ)平臺(tái),總是需要一個(gè)可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個(gè)可以從Kafka讀取數(shù)據(jù)的消費(fèi)者,或者一個(gè)兼具兩種角色的應(yīng)用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應(yīng)用程序,它們負(fù)責(zé)將消息發(fā)送到 Kafka 集群中的一個(gè)或多

    2024年02月13日
    瀏覽(29)
  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    01. Kafka 分區(qū)的作用 分區(qū)的作用就是提供負(fù)載均衡的能力,或者說對(duì)數(shù)據(jù)進(jìn)行分區(qū)的主要原因,就是為了實(shí)現(xiàn)系統(tǒng)的高伸縮性。不同的分區(qū)能夠被放置到不同節(jié)點(diǎn)的機(jī)器上,而數(shù)據(jù)的讀寫操作也都是針對(duì)分區(qū)這個(gè)粒度而進(jìn)行的,這樣每個(gè)節(jié)點(diǎn)的機(jī)器都能獨(dú)立地執(zhí)行各自分區(qū)的

    2024年02月13日
    瀏覽(32)
  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    不管是把Kafka作為消息隊(duì)列、消息總線還是數(shù)據(jù)存儲(chǔ)平臺(tái),總是需要一個(gè)可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個(gè)可以從Kafka讀取數(shù)據(jù)的消費(fèi)者,或者一個(gè)兼具兩種角色的應(yīng)用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應(yīng)用程序,它們負(fù)責(zé)將消息發(fā)送到 Kafka 集群中的一個(gè)或多

    2024年02月13日
    瀏覽(28)
  • Kafka 入門到起飛系列 - 生產(chǎn)者發(fā)送消息流程解析

    Kafka 入門到起飛系列 - 生產(chǎn)者發(fā)送消息流程解析

    生產(chǎn)者通過 producerRecord 對(duì)象封裝消息主題、消息的value(內(nèi)容)、timestamp(時(shí)間戳)等 生產(chǎn)者通過 send() 方法發(fā)送消息,send()方法會(huì)經(jīng)過如下幾步 1. 首先將消息交給 攔截器(Interceptor) 處理, 攔截器對(duì)生產(chǎn)者而言,對(duì)所有消息都是生效的,攔截器也支持鏈?zhǔn)骄幊蹋ㄘ?zé)任器鏈)的

    2024年02月16日
    瀏覽(24)
  • kafka服務(wù)端允許生產(chǎn)者發(fā)送最大消息體大小

    ????????server.properties中加上的message.max.bytes配置,我目前設(shè)置為5242880,即5MB,可以根據(jù)實(shí)際情況增大。 ????????在生產(chǎn)者端配置max.request.size,這是單個(gè)消息最大字節(jié)數(shù),根據(jù)實(shí)際調(diào)整,max.request.size 必須小于 message.max.bytes 以及消費(fèi)者的 max.partition.fetch.bytes。這樣消息

    2024年02月15日
    瀏覽(24)
  • Kafka中的生產(chǎn)者如何處理消息發(fā)送失敗的情況?

    在Kafka中,生產(chǎn)者可以通過以下方式處理消息發(fā)送失敗的情況: 同步發(fā)送模式(Sync Mode):在同步發(fā)送模式下,生產(chǎn)者發(fā)送消息后會(huì)阻塞等待服務(wù)器的響應(yīng)。如果發(fā)送失敗,生產(chǎn)者會(huì)拋出異常(例如 ProducerRecord 發(fā)送異常)或返回錯(cuò)誤信息。開發(fā)者可以捕獲異常并根據(jù)需要進(jìn)行

    2024年02月06日
    瀏覽(23)
  • 07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費(fèi)者(演示 監(jiān)聽消息)

    07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費(fèi)者(演示 監(jiān)聽消息)

    簡(jiǎn)單來說,就是一個(gè)數(shù)據(jù)項(xiàng)。 ▲ 消息就是 Kafka 所記錄的數(shù)據(jù)節(jié)點(diǎn),消息在 Kafka 中又被稱為記錄(record)或事件(event)。 從存儲(chǔ)上來看,消息就是存儲(chǔ)在分區(qū)文件(有點(diǎn)類似于List)中的一個(gè)數(shù)據(jù)項(xiàng),消息具有 key、value、時(shí)間戳 和 可選的元數(shù)據(jù)頭。 ▲ 下面是一個(gè)示例事件

    2024年01月20日
    瀏覽(46)
  • kafka學(xué)習(xí)-生產(chǎn)者

    kafka學(xué)習(xí)-生產(chǎn)者

    目錄 1、消息生產(chǎn)流程 2、生產(chǎn)者常見參數(shù)配置 3、序列化器 基本概念 自定義序列化器 4、分區(qū)器 默認(rèn)分區(qū)規(guī)則 自定義分區(qū)器 5、生產(chǎn)者攔截器 作用 自定義攔截器 6、生產(chǎn)者原理解析 在Kafka中保存的數(shù)據(jù)都是字節(jié)數(shù)組。 消息發(fā)送前,需要將消息序列化為字節(jié)數(shù)組進(jìn)行發(fā)送。

    2024年02月09日
    瀏覽(26)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包