国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

全網(wǎng)最詳細(xì)地理解Kafka中的Topic和Partition以及關(guān)于kafka的消息分發(fā)、服務(wù)端如何消費(fèi)指定分區(qū)、kafka的分區(qū)分配策略(range策略和RoundRobin策略)

這篇具有很好參考價(jià)值的文章主要介紹了全網(wǎng)最詳細(xì)地理解Kafka中的Topic和Partition以及關(guān)于kafka的消息分發(fā)、服務(wù)端如何消費(fèi)指定分區(qū)、kafka的分區(qū)分配策略(range策略和RoundRobin策略)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

1. 文章引言

最近在學(xué)習(xí)kafka相關(guān)的知識(shí),特將學(xué)習(xí)成功記錄成文章,以供大家共同學(xué)習(xí)。

首先要注意的是,Kafka中的TopicActiveMQ中的Topic是不一樣的。

Kafka中,Topic是一個(gè)存儲(chǔ)消息的邏輯概念,可以認(rèn)為是一個(gè)消息集合。每條消息發(fā)送到Kafka集群的消息都有一個(gè)類(lèi)別。

物理上來(lái)說(shuō),不同的Topic的消息是分開(kāi)存儲(chǔ)的,每個(gè)Topic可以有多個(gè)生產(chǎn)者向它發(fā)送消息,也可以有多個(gè)消費(fèi)者去消費(fèi)其中的消息。

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)

每個(gè)Topic可以劃分多個(gè)分區(qū)**(每個(gè)Topic至少有一個(gè)分區(qū))**,同一Topic 下的不同分區(qū)包含的消息是不同的。

每個(gè)消息在被添加到分區(qū)時(shí),都會(huì)被分配一個(gè)offset,它是消息在此分區(qū)中的唯一編號(hào),Kafka通過(guò)offset保證消息在分區(qū)內(nèi)的順序。

offset的順序不跨分區(qū),即Kafka只保證在同一個(gè)分區(qū)內(nèi)的消息是有序的。

消息是每次追加到對(duì)應(yīng)的Partition的后面:

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)

2. Topic & Partition的存儲(chǔ)

Topic是一個(gè)邏輯上的概念,具體的存儲(chǔ)還是基于Partition來(lái)的。

創(chuàng)建一個(gè)test2 Topic(注意這里的 partitions 參數(shù)為 3)

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)

可以進(jìn)入/tmp/kafka-logs目錄下進(jìn)行查看(當(dāng)前機(jī)器IP 192.168.220.135),如下圖所示:

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)

在另外一臺(tái)136機(jī)器上:

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)

可以發(fā)現(xiàn):

  1. 135機(jī)器上有test2-0test2-2

  2. 136機(jī)器上有test2-1

接下來(lái),再結(jié)合Kafka的消息分發(fā)策略來(lái)看。

3. Kafka的消息分發(fā)

Kafka中最基本的數(shù)據(jù)單元就是消息,而一條消息其實(shí)是由Key + Value組成:

  1. Key是可選項(xiàng),可傳空值

  2. Value也可以傳空值

這也是與ActiveMQ不同的一個(gè)地方。

在發(fā)送一條消息時(shí),我們可以指定這 Key,那 Producer會(huì)根據(jù)Keypartition機(jī)制來(lái)判斷當(dāng)前這條消息應(yīng)該發(fā)送并存儲(chǔ)到哪個(gè)partition 中(這個(gè)就跟分片機(jī)制類(lèi)似)。

我們可以根據(jù)需要進(jìn)行擴(kuò)展Producerpartition機(jī)制 (默認(rèn)算法是hash%。

如下擴(kuò)展自己的partition代碼所示:

 
/**
 * 消息發(fā)送后會(huì)調(diào)用自定義的策略
 *
 * @author super先生
 * @date 2023/2/10 14:20
 */
public class MyPartitioner implements Partitioner {
 
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //獲取當(dāng)前 topic 有多少個(gè)分區(qū)(分區(qū)列表)
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int partitionNum = 0;
        if (key == null) { //之前介紹過(guò) Key 是可以傳空值的
            partitionNum = new Random().nextInt(partitions.size());   //隨機(jī)
        } else {
            //取 %
            partitionNum = Math.abs((key.hashCode()) % partitions.size());
        }
        System.out.println("key:" + key + ",value:" + value + ",partitionNum:" + partitionNum);
        //發(fā)送到指定分區(qū)
        return partitionNum;
    }
 
    @Override
    public void close() {}
 
    @Override
    public void configure(Map<String, ?> configs) {}
}

生產(chǎn)者和消費(fèi)者代碼可參考我的博文:實(shí)現(xiàn)kafka的生產(chǎn)者(Producer)和消費(fèi)者(Consumer)的代碼

如下創(chuàng)建kafka生產(chǎn)者(Producer)的代碼:

/**
 * @author super先生
 * @date 2023/2/10 14:40
 */
public class KafkaProducerDemo extends Thread {
    /**
     * 消息發(fā)送者
     */
    private final KafkaProducer<Integer, String> producer;
 
    /**
     * topic
     */
    private final String topic;
 
    private final Boolean isAsync;
 
    public KafkaProducerDemo(String topic, Boolean isAsync) {
        this.isAsync = isAsync;
        //構(gòu)建相關(guān)屬性
        //@see ProducerConfig
        Properties properties = new Properties();
        //Kafka 地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.220.135:9092,192.168.220.136:9092");
        //kafka 客戶(hù)端 Demo
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
        //The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent.
        /**發(fā)送端消息確認(rèn)模式:
         *  0:消息發(fā)送給broker后,不需要確認(rèn)(性能較高,但是會(huì)出現(xiàn)數(shù)據(jù)丟失,而且風(fēng)險(xiǎn)最大,因?yàn)楫?dāng) server 宕機(jī)時(shí),數(shù)據(jù)將會(huì)丟失)
         *  1:只需要獲得集群中的 leader節(jié)點(diǎn)的確認(rèn)即可返回
         *  -1/all:需要 ISR 中的所有的 Replica進(jìn)行確認(rèn)(集群中的所有節(jié)點(diǎn)確認(rèn)),最安全的,也有可能出現(xiàn)數(shù)據(jù)丟失(因?yàn)?ISR 可能會(huì)縮小到僅包含一個(gè) Replica)
         */
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
 
        /**【調(diào)優(yōu)】
         * batch.size 參數(shù)(默認(rèn) 16kb)
         *  public static final String BATCH_SIZE_CONFIG = "batch.size";
         *
         *  producer對(duì)于同一個(gè) 分區(qū) 來(lái)說(shuō),會(huì)按照 batch.size 的大小進(jìn)行統(tǒng)一收集進(jìn)行批量發(fā)送,相當(dāng)于消息并不會(huì)立即發(fā)送,而是會(huì)收集整理大小至 16kb.若將該值設(shè)為0,則不會(huì)進(jìn)行批處理
         */
 
        /**【調(diào)優(yōu)】
         * linger.ms 參數(shù)
         *  public static final String LINGER_MS_CONFIG = "linger.ms";
         *  一個(gè)毫秒值。Kafka 默認(rèn)會(huì)把兩次請(qǐng)求的時(shí)間間隔之內(nèi)的消息進(jìn)行搜集。相當(dāng)于會(huì)有一個(gè) delay 操作。比如定義的是1000(1s),消息一秒鐘發(fā)送5條,那么這 5條消息不會(huì)立馬發(fā)送,而是會(huì)有一個(gè) delay操作進(jìn)行聚合,
         *  delay以后再次批量發(fā)送到 broker。默認(rèn)是 0,就是不延遲(同 TCP Nagle算法),那么 batch.size 也就不生效了
         */
        //linger.ms 參數(shù)和batch.size 參數(shù)只要滿(mǎn)足其中一個(gè)都會(huì)發(fā)送
 
        /**【調(diào)優(yōu)】
         * max.request.size 參數(shù)(默認(rèn)是1M)   設(shè)置請(qǐng)求最大字節(jié)數(shù)
         * public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
         * 如果設(shè)置的過(guò)大,發(fā)送的性能會(huì)受到影響,同時(shí)寫(xiě)入接收的性能也會(huì)受到影響。
         */
 
        //設(shè)置 key的序列化,key 是 Integer類(lèi)型,使用 IntegerSerializer
        //org.apache.kafka.common.serialization
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        //設(shè)置 value 的序列化
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
 
        //指定分區(qū)策略
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"superJson.kafka.partition.MyPartitioner");
 
        //構(gòu)建 kafka Producer,這里 key 是 Integer 類(lèi)型,Value 是 String 類(lèi)型
        producer = new KafkaProducer<Integer, String>(properties);
        this.topic = topic;
    }
 
    public static void main(String[] args) {
        new KafkaProducerDemo("test2",true).start();
    }
 
    @Override
    public void run() {
        int num = 0;
        while (num < 100) {
            String message = "message--->" + num;
            System.out.println("start to send message 【 " + message + " 】");
            if (isAsync) {  //如果是異步發(fā)送
                producer.send(new ProducerRecord<Integer, String>(topic, message), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (metadata!=null){
                            System.out.println("async-offset:"+metadata.offset()+"-> partition"+metadata.partition());
                        }
                    }
                });
            } else {   //同步發(fā)送
                try {
                    RecordMetadata metadata = producer.send(new ProducerRecord<Integer, String>(topic, message)).get();
                    System.out.println("sync-offset:"+metadata.offset()+"-> partition"+metadata.partition());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            num++;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

實(shí)現(xiàn)kafka的消費(fèi)者(Consumer)中在接收消息的時(shí)候輸出分區(qū),如下代碼所示:

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)

 
/**
 * @author super先生
 * @date 2023/2/10 15:10
 */
public class KafkaConsumerDemo extends Thread {
 
    private final KafkaConsumer<Integer, String> kafkaConsumer;
 
    public KafkaConsumerDemo(String topic) {
        //構(gòu)建相關(guān)屬性
        //@see ConsumerConfig
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.220.135:9092,192.168.220.136:9092");
        //消費(fèi)組
        /**
         * consumer group是kafka提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)者機(jī)制。既然是
         一個(gè)組,那么組內(nèi)必然可以有多個(gè)消費(fèi)者或消費(fèi)者實(shí)例((consumer instance),
         它們共享一個(gè)公共的ID,即group ID。組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來(lái)消費(fèi)訂
         閱主題(subscribed topics)的所有分區(qū)(partition)。當(dāng)然,每個(gè)分區(qū)只能由同一
         個(gè)消費(fèi)組內(nèi)的一個(gè)consumer來(lái)消費(fèi).后面會(huì)進(jìn)一步介紹。
         */
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo");
 
        /** auto.offset.reset 參數(shù)  從什么時(shí)候開(kāi)始消費(fèi)
         *  public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
         *
         *  這個(gè)參數(shù)是針對(duì)新的groupid中的消費(fèi)者而言的,當(dāng)有新groupid的消費(fèi)者來(lái)消費(fèi)指定的topic時(shí),對(duì)于該參數(shù)的配置,會(huì)有不同的語(yǔ)義
         *  auto.offset.reset=latest情況下,新的消費(fèi)者將會(huì)從其他消費(fèi)者最后消費(fèi)的offset處開(kāi)始消費(fèi)topic下的消息
         *  auto.offset.reset= earliest情況下,新的消費(fèi)者會(huì)從該topic最早的消息開(kāi)始消費(fèi)
            auto.offset.reset=none情況下,新的消費(fèi)組加入以后,由于之前不存在 offset,則會(huì)直接拋出異常。說(shuō)白了,新的消費(fèi)組不要設(shè)置這個(gè)值
         */
 
        //enable.auto.commit
        //消費(fèi)者消費(fèi)消息以后自動(dòng)提交,只有當(dāng)消息提交以后,該消息才不會(huì)被再次接收到(如果沒(méi)有 commit,消息可以重復(fù)消費(fèi),也沒(méi)有 offset),還可以配合auto.commit.interval.ms控制自動(dòng)提交的頻率。
        //當(dāng)然,我們也可以通過(guò)consumer.commitSync()的方式實(shí)現(xiàn)手動(dòng)提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
 
        /**max.poll.records
         *此參數(shù)設(shè)置限制每次調(diào)用poll返回的消息數(shù),這樣可以更容易的預(yù)測(cè)每次poll間隔
         要處理的最大值。通過(guò)調(diào)整此值,可以減少poll間隔
         */
 
        //間隔時(shí)間
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //反序列化 key
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        //反序列化 value
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //構(gòu)建 KafkaConsumer
        kafkaConsumer = new KafkaConsumer<>(properties);
        //設(shè)置 topic
        kafkaConsumer.subscribe(Collections.singletonList(topic));
    }
 
 
    /**
     * 接收消息
      */
    @Override
    public void run() {
        while (true) {
            //拉取消息
            ConsumerRecords<Integer, String> consumerRecord = kafkaConsumer.poll(100000000);
            for (ConsumerRecord<Integer, String> record : consumerRecord) {
                //record.partition() 獲取當(dāng)前分區(qū)
                System.out.println(record.partition()+"】】  message receive 【" + record.value() + "】");
            }
        }
    }
 
    public static void main(String[] args) {
        new KafkaConsumerDemo("test2").start();
    }
 
}

首先啟動(dòng)Consumer,再啟動(dòng)Producer

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)

可以看到是能夠?qū)Φ纳系摹?/p>

默認(rèn)情況下,Kafka采用的是hash 取 % 的分區(qū)算法。

如果Keynull,則會(huì)隨機(jī)分配一個(gè)分區(qū)。

這個(gè)隨機(jī)是在這個(gè)參數(shù)metadata.max.age.ms的時(shí)間范圍內(nèi)隨機(jī)選擇一個(gè)。

對(duì)于這個(gè)時(shí)間段內(nèi),如果Key為 null,則只會(huì)發(fā)送到唯一的分區(qū)。這個(gè)值默認(rèn)情況下是 10 分鐘更新一次 (因?yàn)?partition 狀態(tài)可能會(huì)發(fā)生變化)。

4. 關(guān)于 Metadata

Metadata包含 TopicPartitionbroker 的映射關(guān)系,每一個(gè) Topic 的每一個(gè) partition,需要知道對(duì)應(yīng)的 broker 列表是什么,Leader 是誰(shuí),Follower 是誰(shuí)。

這些信息都是存儲(chǔ)在Metadata這個(gè)類(lèi)中,如下圖所示:

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)

5. 消費(fèi)端如何消費(fèi)指定分區(qū)

Consumer可以指定具體消費(fèi)的分區(qū),如下圖所示:

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)

再重新啟動(dòng)ConsumerProducer

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)

可以看到Consumer只消費(fèi)了分區(qū)為1的消息。

以上是單個(gè)Consumer消費(fèi)(指定)分區(qū)的情況。

一般每個(gè)Topic都會(huì)有多個(gè)partition(主要是用于數(shù)據(jù)分片,減少消息的容量,從而提升 I/O 性能)。

當(dāng)然也可以使用多個(gè)Consumer從而提高消費(fèi)能力,有一個(gè)消費(fèi)組的概念(具體可參看:一文了解kafka消息隊(duì)列)

如果Consumer1、Consumer2 和 Consumer3 都屬于group.id 為1 的消費(fèi)組,那么消費(fèi)情況如下:

  1. Consumer1就會(huì)消費(fèi) p0

  2. Consumer2就會(huì)消費(fèi)p1

  3. Consumer3就會(huì)消費(fèi)p2

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)

不使用指定分區(qū)的方式創(chuàng)建三個(gè)Consumer

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)

而且,它們都是同一個(gè)消費(fèi)組:

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)
同時(shí)啟動(dòng)三個(gè)ConsumerProducer,而且它們都是同一個(gè)消費(fèi)組:

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)

可以看到三個(gè)Consumer分別消費(fèi)三個(gè)Partition,很均勻。

對(duì)同一個(gè)Group來(lái)說(shuō),其中的Consumer可以消費(fèi)指定分區(qū)也可以消費(fèi)自動(dòng)分配的分區(qū)(這里是 Consumer數(shù)量和partition數(shù)量一致,均勻分配)。

如果下述情況如何處理:

  1. 如果Consumer數(shù)量大于partition數(shù)量呢?
  2. 如果Consumer數(shù)量小于partition數(shù)量呢?

這兩種情況,讀者可自行測(cè)試。

但要注意如下情況:

  1. 如果Consumer數(shù)量比partition數(shù)量多,會(huì)有的Consumer閑置無(wú)法消費(fèi),這樣是一個(gè)浪費(fèi)。

  2. 如果Consumer數(shù)量小于partition數(shù)量會(huì)有一個(gè)Consumer消費(fèi)多個(gè)partition。

Kafkapartition上是不允許并發(fā)的。Consuemr數(shù)量建議最好是partition的整數(shù)倍。

還有一點(diǎn),如果Consumer從多個(gè)partiton上讀取數(shù)據(jù),是不保證順序性的。Kafka只保證一個(gè)partition的順序性,跨partition是不保證順序性的。增減 Consumer、broker、partition 會(huì)導(dǎo)致 Rebalance。

6. Kafka 分區(qū)分配策略

Kafka中,同一個(gè)Group中的消費(fèi)者對(duì)于一個(gè)Topic中的多個(gè)partition存在一定的分區(qū)分配策略——分區(qū)分配策略有如下兩種:

  1. Range(默認(rèn))策略

  2. RoundRobin(輪詢(xún))策略

通過(guò)partition.assignment.strategy這個(gè)參數(shù)來(lái)設(shè)置。

6.1 Range strategy(范圍分區(qū))

Range策略是對(duì)每個(gè)主題而言的,首先對(duì)同一個(gè)主題里面的分區(qū)按照序號(hào)進(jìn)行排序,并對(duì)消費(fèi)者按照字母順序進(jìn)行排序。

假設(shè)我們有10個(gè)分區(qū),3個(gè)消費(fèi)者,排完序的分區(qū)將會(huì)是0,1,2,3,4,5,6,7,8,9;消費(fèi)者線(xiàn)程排完序?qū)?huì)是C1-0, C2-0, C3-0。

然后partitions的個(gè)數(shù)除于消費(fèi)者線(xiàn)程的總數(shù)來(lái)決定每個(gè)消費(fèi)者線(xiàn)程消費(fèi)幾個(gè)分區(qū)。如果除不盡,那么前面幾個(gè)消費(fèi)者線(xiàn)程將會(huì)多消費(fèi)一個(gè)分區(qū)。

假如在 Topic1中有10個(gè)分區(qū),3個(gè)消費(fèi)者線(xiàn)程,10/3 = 3,而且除不盡,那么消費(fèi)者線(xiàn)程C1-0將會(huì)多消費(fèi)一個(gè)分區(qū),所以最后分區(qū)分配的結(jié)果是這樣的:

  1. C1-0將消費(fèi)0,1,2,3分區(qū)

  2. C2-0將消費(fèi)4,5,6分區(qū)

  3. C3-0將消費(fèi)7,8,9分區(qū)

假如在Topic1中有11個(gè)分區(qū),那么最后分區(qū)分配的結(jié)果看起來(lái)是這樣的:

  1. C1-0將消費(fèi)0,1,2,3分區(qū)

  2. C2-0將消費(fèi)4, 5, 6, 7分區(qū)

  3. C3-0將消費(fèi)8,9,10分區(qū)

假如有兩個(gè)Topic:Topic1 和 Topic2,都有10個(gè)分區(qū)。那么,最后分區(qū)分配的結(jié)果看起來(lái)是這樣的:

  1. C1-0將消費(fèi)Topic10,1,2,3分區(qū)和Topic10,1,2,3分區(qū)

  2. C2-0將消費(fèi)Topic14,5,6分區(qū)和Topic24,5,6分區(qū)

  3. C3-0將消費(fèi)Topic17,8,9分區(qū)和Topic27,8,9分區(qū)

其實(shí)這樣就會(huì)有一個(gè)問(wèn)題,C1-0就會(huì)多消費(fèi)兩個(gè)分區(qū),這就是一個(gè)很明顯的弊端。

6.2 RoundRobin strategy(輪詢(xún)分區(qū))

輪詢(xún)分區(qū)策略是把所有partition和所有Consumer線(xiàn)程都列出來(lái),然后按照hashcode進(jìn)行排序。

最后通過(guò)輪詢(xún)算法分配partition給消費(fèi)線(xiàn)程。如果所有Consumer實(shí)例的訂閱是相同的,那么partition會(huì)均勻分布。

假如按照hashCode排序完的Topic / partitions組依次為:

  • T1一5
  • T1一3
  • T1-0
  • T1-8
  • T1-2
  • T1-1
  • T1-4
  • T1-7
  • T1-6
  • T1-9

消費(fèi)者線(xiàn)程排序?yàn)椋?/p>

  • C1-0
  • C1-1
  • C2-0
  • C2-1

最后的分區(qū)分配的結(jié)果為:

  • C1-0將消費(fèi)T1-5, T1-2, T1-6分區(qū)
  • C1-1將消費(fèi)T1-3, T1-1, T1-9分區(qū)
  • C2-0將消費(fèi)T1-0, T1-4分區(qū)
  • C2-1將消費(fèi)T1-8, T1-7分區(qū)

使用輪詢(xún)分區(qū)策略必須滿(mǎn)足兩個(gè)條件:

  • 每個(gè)主題的消費(fèi)者實(shí)例具有相同數(shù)量的流

  • 每個(gè)消費(fèi)者訂閱的主題必須是相同的

1. 什么時(shí)候會(huì)觸發(fā)這個(gè)策略呢?

當(dāng)出現(xiàn)以下幾種情況時(shí),Kafka會(huì)進(jìn)行一次分區(qū)分配操作,也就是Kafka ConsumerRebalance

  • 同一個(gè)Consumer group內(nèi)新增了消費(fèi)者

  • 消費(fèi)者離開(kāi)當(dāng)前所屬的Consumer group,比如主動(dòng)停機(jī)或者宕機(jī)

  • Topic新增了分區(qū)(也就是分區(qū)數(shù)量發(fā)生了變化)

Kafka ConsuemrRebalance機(jī)制規(guī)定了一個(gè)Consumer group下的所有Consumer如何達(dá)成一致來(lái)分配訂閱Topic的每個(gè)分區(qū)。

而具體如何執(zhí)行分區(qū)策略,就是前面提到過(guò)的兩種內(nèi)置的分區(qū)策略。而Kafka對(duì)于分配策略這塊,提供了可插拔的實(shí)現(xiàn)方式,也就是說(shuō),除了這兩種之外,我們還可以創(chuàng)建自己的分配機(jī)制。

2. 誰(shuí)來(lái)執(zhí)行Rebalance以及管理Consumergroup呢?

Consumer group如何確定自己的coordinator是誰(shuí)呢,消費(fèi)者向Kafka集群中的任意一個(gè)broker發(fā)送一個(gè) GroupCoord inatorRequest請(qǐng)求,服務(wù)端會(huì)返回一個(gè)負(fù)載最小的broker節(jié)點(diǎn)的id,并將該broker設(shè)置為 coordinator

3. JoinGroup的過(guò)程

Rebalance之前,需要保證coordinator是已經(jīng)確定好了的,整個(gè)Rebalance的過(guò)程分為兩個(gè)步驟,Join和Syncjoin:表示加入到Consumer group中,在這一步中,所有的成員都會(huì)向coordinator發(fā)送 joinGroup的請(qǐng)求。

一旦所有成員都發(fā)了joinGroup請(qǐng)求,那么coordinator會(huì)選擇一個(gè)Consumer擔(dān)任leader角色,并把組成員信息和訂閱信息發(fā)送給消費(fèi)者:

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)

  • protocol-metadata:序列化后的消費(fèi)者的訂閱信息

  • leader id:消費(fèi)組中的消費(fèi)者,coordinator會(huì)選擇一個(gè)作為leader,對(duì)應(yīng)的就是member id

  • member metadata:對(duì)應(yīng)消費(fèi)者的訂閱信息

  • members: consumer group中全部的消費(fèi)者的訂閱信息

  • generation_id:年代信息,類(lèi)似于ZooKeepe 中的epoch,對(duì)于每一輪Rebalancegeneration_id都會(huì)遞增。主要用來(lái)保護(hù)consumer group,隔離無(wú)效的offset提交。也就是上一輪的consumer成員無(wú)法提交 offset到新的Consumer group中。

4. Synchronizing Group State 階段

完成分區(qū)分配之后,就進(jìn)入了Synchronizing Group Stat 階段,主要邏輯是向GroupCoordinator發(fā)送SyncGroupRequest請(qǐng)求,并且處理SyncGroupResponse響應(yīng),簡(jiǎn)單來(lái)說(shuō),就是leader將消費(fèi)者對(duì)應(yīng)的 partition分配方案同步給Consumer group中的所有Consumer

kafka topic partition設(shè)置,免費(fèi)專(zhuān)欄,kafka,java,分布式,后端,運(yùn)維開(kāi)發(fā)

每個(gè)消費(fèi)者都會(huì)向coordinator發(fā)送syncgroup請(qǐng)求,不過(guò)只有leader節(jié)點(diǎn)會(huì)發(fā)送分配方案,其他消費(fèi)者只是打打醬油而已。

當(dāng)leader把方案發(fā)給coordinator以后,coordinator會(huì)把結(jié)果設(shè)置到SyncGroupResponse中。這樣所有成員都知道自己應(yīng)該消費(fèi)哪個(gè)分區(qū)。

Consumer group的分區(qū)分配方案是在客戶(hù)端執(zhí)行的!Kafka將這個(gè)權(quán)利下放給客戶(hù)端主要是因?yàn)檫@樣做可以有更好的靈活性。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-823649.html

7.參考文獻(xiàn)

  1. https://dongguabai.blog.csdn.net/article/details/86536894

到了這里,關(guān)于全網(wǎng)最詳細(xì)地理解Kafka中的Topic和Partition以及關(guān)于kafka的消息分發(fā)、服務(wù)端如何消費(fèi)指定分區(qū)、kafka的分區(qū)分配策略(range策略和RoundRobin策略)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Kafka某Topic的部分partition無(wú)法消費(fèi)問(wèn)題

    Kafka某Topic的部分partition無(wú)法消費(fèi)問(wèn)題

    今天同事反饋有個(gè)topic出現(xiàn)積壓。于是上kfk管理平臺(tái)查看該topic對(duì)應(yīng)的group。發(fā)現(xiàn)6個(gè)分區(qū)中有2個(gè)不消費(fèi),另外4個(gè)消費(fèi)也較慢,總體lag在增長(zhǎng)。查看服務(wù)器日志,日志中有rebalance 12?retry 。。。Exception,之后改消費(fèi)線(xiàn)程停止。 查閱相關(guān)rebalance資料: ? 分析Rebalance?可能是 Consu

    2024年02月12日
    瀏覽(26)
  • kafka中Topic、Partition、Groups、Brokers概念辨析

    kafka中Topic、Partition、Groups、Brokers概念辨析

    kafka消息隊(duì)列有兩種消費(fèi)模式,分別是 點(diǎn)對(duì)點(diǎn)模式 和 訂閱/發(fā)布模式 。具體比較可以參考Kafka基礎(chǔ)–消息隊(duì)列與消費(fèi)模式。 下圖是一個(gè) 點(diǎn)對(duì)點(diǎn) 的Kafka結(jié)構(gòu)示意圖,其中有以下幾個(gè)部分: producer:消息生產(chǎn)者 consumer:消息消費(fèi)者 Topic:消息主題 partition:主題內(nèi)分區(qū) Brokers:消

    2024年02月04日
    瀏覽(39)
  • kafka消費(fèi)報(bào)錯(cuò) UNKNOWN_TOPIC_OR_PARTITION

    kafka消費(fèi)報(bào)錯(cuò) UNKNOWN_TOPIC_OR_PARTITION

    查看topic 發(fā)現(xiàn)分區(qū)數(shù)量為:1 我們是多個(gè)消費(fèi)服務(wù)

    2024年02月11日
    瀏覽(43)
  • 大數(shù)據(jù)篇Kafka消息隊(duì)列指定Topic打印Key、Value、Offset和Partition

    說(shuō)到Apache Kafka消息傳遞系統(tǒng)時(shí),以下是一些關(guān)鍵概念的解釋?zhuān)?Key(鍵):Kafka消息由Key和Value組成。Key是一個(gè)可選的字段,它通常用于消息的路由和分區(qū)策略。Key的目的是確保具有相同Key的消息被寫(xiě)入同一個(gè)分區(qū)。當(dāng)消費(fèi)者接收到消息時(shí),可以使用Key來(lái)進(jìn)行消息處理和路由操

    2024年02月16日
    瀏覽(27)
  • 關(guān)于java中的多態(tài)和對(duì)實(shí)例化對(duì)象以及向上、向下轉(zhuǎn)型的一些理解

    關(guān)于java中的多態(tài)和對(duì)實(shí)例化對(duì)象以及向上、向下轉(zhuǎn)型的一些理解

    java面向?qū)ο笕筇卣骷礊椋豪^承封裝多態(tài)。而多態(tài)需要三大必要條件。分別是:繼承、方法重寫(xiě)、父類(lèi)引用指向子類(lèi)對(duì)象。我們先一個(gè)一個(gè)來(lái)理解。 1、首先是繼承和重寫(xiě)。這個(gè)很簡(jiǎn)單。因?yàn)槎鄳B(tài)就是建立在不同的重寫(xiě)之上的。也就是說(shuō)多態(tài)就是在使用著一個(gè)方法的不同重寫(xiě)

    2024年02月02日
    瀏覽(22)
  • 全面理解java中的構(gòu)造方法以及this關(guān)鍵字的用法(超詳細(xì))

    全面理解java中的構(gòu)造方法以及this關(guān)鍵字的用法(超詳細(xì))

    Hello,各位鐵汁們!我是小??兒哈!今天我又來(lái)更新我的Java基礎(chǔ)學(xué)習(xí)博客了。 本篇主要內(nèi)容概述: 1、??如何用構(gòu)造方法初始化對(duì)象 2、??為啥要有this這個(gè) 3、??this.屬性名訪(fǎng)問(wèn)成員變量、成員方法 4、??this.方法名 || this.()的用法 目錄 初識(shí)構(gòu)造方法? 構(gòu)造方法的使

    2023年04月09日
    瀏覽(22)
  • Kafka系列之:基于Apache Kafka Connect實(shí)現(xiàn)端到端topic數(shù)據(jù)字段級(jí)加密的詳細(xì)方法

    與其他通信工具一樣,加密在 Apache Kafka 中很有價(jià)值,可以保護(hù)數(shù)據(jù)。 希望通過(guò)與 Apache Kafka Connect 集成來(lái)加密數(shù)據(jù)來(lái)實(shí)現(xiàn)這一目標(biāo)。 Kafka 可以利用多種安全功能,從身份驗(yàn)證和授權(quán)到基于 TLS 的數(shù)據(jù)進(jìn)出 Kafka 主題的線(xiàn)上流量加密。盡管這些措施可以保護(hù)傳輸中的數(shù)據(jù),但它

    2024年02月13日
    瀏覽(30)
  • 理解RabbitMQ中的AMQP-0-9-1模型,全網(wǎng)最全

    理解RabbitMQ中的AMQP-0-9-1模型,全網(wǎng)最全

    Type:交換器的類(lèi)型。 Durability:(交換器)持久化特性,如果啟動(dòng)此特性,則Broker重啟后交換器依然存在,否則交換器會(huì)被刪除。 Auto-delete:是否自動(dòng)刪除,如果啟用此特性,當(dāng)最后一個(gè)隊(duì)列解除與交換器的綁定關(guān)系,交換器會(huì)被刪除。 Arguments:可選參數(shù),一般配合插件或者

    2024年04月26日
    瀏覽(28)
  • 關(guān)于訪(fǎng)問(wèn)后端接口報(bào)404的問(wèn)題——全網(wǎng)最詳細(xì)的404錯(cuò)誤詳解

    關(guān)于訪(fǎng)問(wèn)后端接口報(bào)404的問(wèn)題——全網(wǎng)最詳細(xì)的404錯(cuò)誤詳解

    當(dāng)我們通過(guò)前端向后端發(fā)起一個(gè)請(qǐng)求調(diào)用后端接口時(shí),經(jīng)常會(huì)遇到404的問(wèn)題。網(wǎng)上關(guān)于對(duì)404問(wèn)題介紹的一大堆,其實(shí)404問(wèn)題的本質(zhì)就兩點(diǎn)。 在介紹404問(wèn)題之前先溫習(xí)一個(gè)小的知識(shí)點(diǎn)——項(xiàng)目訪(fǎng)問(wèn)路徑 項(xiàng)目訪(fǎng)問(wèn)路徑:就是定位一個(gè)項(xiàng)目的路徑,可以理解為項(xiàng)目名,但是一般這

    2024年02月02日
    瀏覽(18)
  • 【mysql進(jìn)階-彩蛋篇】深入理解順序io和隨機(jī)io(全網(wǎng)最詳細(xì)篇)

    【mysql進(jìn)階-彩蛋篇】深入理解順序io和隨機(jī)io(全網(wǎng)最詳細(xì)篇)

    MySql系列整體欄目 內(nèi)容 鏈接地址 【一】深入理解mysql索引本質(zhì) https://blog.csdn.net/zhenghuishengq/article/details/121027025 【二】深入理解mysql索引優(yōu)化以及explain https://blog.csdn.net/zhenghuishengq/article/details/124552080 【三】深入理解mysql的索引分類(lèi),覆蓋索引(失效),回表,MRR https://bl

    2024年02月05日
    瀏覽(21)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包