国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡)

這篇具有很好參考價值的文章主要介紹了Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

一、RoundRobin 分區(qū)分配策略原理

  • RoundRobin 針對集群中所有Topic而言。
  • RoundRobin 輪詢分區(qū)策略,是把所有的 partition 和所有的consumer 都列出來,然后按照 hashcode 進(jìn)行排序,最后通過輪詢算法來分配 partition 給到各個消費(fèi)者。
    Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡),kafka,kafka

二、RoundRobin分區(qū)分配策略代碼案例

2.1、創(chuàng)建帶有7個分區(qū)的sixTopic主題

  • 在 Kafka 集群控制臺,創(chuàng)建帶有7個分區(qū)的sixTopic主題

    bin/kafka-topics.sh --bootstrap-server 192.168.136.27:9092 --create --partitions 7 --replication-factor 1 --topic sixTopic
    

    Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡),kafka,kafka

2.3、創(chuàng)建三個消費(fèi)者 組成 消費(fèi)者組

  • 復(fù)制 CustomConsumer1類,創(chuàng)建 CustomConsumer2和CustomConsumer3。這樣可以由三個消費(fèi)者組成消費(fèi)者組,組名都為“test1”,設(shè)置分區(qū)分配策略為 RoundRobin。

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumer1 {
    
        public static void main(String[] args) {
    
            // 0 配置
            Properties properties = new Properties();
    
            // 連接 bootstrap.servers
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 配置消費(fèi)者組id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
            // 設(shè)置分區(qū)分配策略
            properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
    
            // 1 創(chuàng)建一個消費(fèi)者  "", "hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 訂閱主題 sixTopic
            ArrayList<String> topics = new ArrayList<>();
            topics.add("sixTopic");
            kafkaConsumer.subscribe(topics);
    
            // 3 消費(fèi)數(shù)據(jù)
            while (true){
    
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }
    

2.3、創(chuàng)建生產(chǎn)者

  • 創(chuàng)建CustomProducer生產(chǎn)者。

    package com.xz.kafka.producer;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    
    public class CustomProducerCallback {
    
        public static void main(String[] args) throws InterruptedException {
    
            //1、創(chuàng)建 kafka 生產(chǎn)者的配置對象
            Properties properties = new Properties();
    
            //2、給 kafka 配置對象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            //3、指定對應(yīng)的key和value的序列化類型 key.serializer value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            //4、創(chuàng)建 kafka 生產(chǎn)者對象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //5、調(diào)用 send 方法,發(fā)送消息
            for (int i = 0; i < 200; i++) {
                kafkaProducer.send(new ProducerRecord<>("sixTopic", "hello kafka" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null){
                            System.out.println("主題: "+metadata.topic() + " 分區(qū): "+ metadata.partition());
                        }
                    }
                });
                Thread.sleep(2);
            }
    
            // 3 關(guān)閉資源
            kafkaProducer.close();
        }
    }
    
    

2.4、測試

  • 首先,在 IDEA中分別啟動消費(fèi)者1、消費(fèi)者2和消費(fèi)者3代碼
    Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡),kafka,kafka

  • 然后,在 IDEA中分別啟動生產(chǎn)者代碼
    Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡),kafka,kafka

  • 在 IDEA 控制臺觀察消費(fèi)者1、消費(fèi)者2和消費(fèi)者3控制臺接收到的數(shù)據(jù),如下圖所示:

    Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡),kafka,kafka
    Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡),kafka,kafka
    Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡),kafka,kafka

2.5、RoundRobin分區(qū)分配策略代碼案例說明

  • 由上述測試輸出結(jié)果截圖可知: 消費(fèi)者1消費(fèi)1、4分區(qū)的數(shù)據(jù);消費(fèi)者2消費(fèi)2和5分區(qū)的數(shù)據(jù);消費(fèi)者3消費(fèi)0、3、6分區(qū)的數(shù)據(jù)。
  • 說明:Kafka 采用修改后的RoundRobin分區(qū)分配策略。

三、RoundRobin 分區(qū)分配再平衡案例

3.1、停止某一個消費(fèi)者后,(45s 以內(nèi))重新發(fā)送消息示例

  • 由下圖控制臺輸出可知:2號消費(fèi)者 消費(fèi)到 2、5號分區(qū)數(shù)據(jù)。
    Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡),kafka,kafka
  • 由下圖控制臺輸出可知:3號消費(fèi)者 消費(fèi)到 0、3、6號分區(qū)數(shù)據(jù)。
    Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡),kafka,kafka

3.2、停止某一個消費(fèi)者后,(45s 以后)重新發(fā)送消息示例

  • 由下圖控制臺輸出可知:2號消費(fèi)者 消費(fèi)到 1、3、5號分區(qū)數(shù)據(jù)。
    Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡),kafka,kafka

  • 由下圖控制臺輸出可知:3號消費(fèi)者 消費(fèi)到 0、2、4、6號分區(qū)數(shù)據(jù)。
    Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡),kafka,kafka文章來源地址http://www.zghlxwxcb.cn/news/detail-733573.html

3.3、RoundRobin 分區(qū)分配再平衡案例說明

  • 1號消費(fèi)者掛掉后,消費(fèi)者組需要按照超時時間 45s 來判斷它是否退出,所以需要等待,時間到了 45s 后,判斷它真的退出就會把任務(wù)分配給其他 broker 執(zhí)行。
  • 消費(fèi)者 1 已經(jīng)被踢出消費(fèi)者組,所以重新按照 RoundRobin 方式分配。

到了這里,關(guān)于Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Kafka3.0.0版本——消費(fèi)者(消費(fèi)者總體工作流程圖解)

    Kafka3.0.0版本——消費(fèi)者(消費(fèi)者總體工作流程圖解)

    角色劃分:生產(chǎn)者、zookeeper、kafka集群、消費(fèi)者、消費(fèi)者組。如下圖所示: 生產(chǎn)者發(fā)送消息給leader,followerr主動從leader同步數(shù)據(jù),一個消費(fèi)者可以消費(fèi)某一個分區(qū)數(shù)據(jù)或者一個消費(fèi)者可以消費(fèi)多個分區(qū)數(shù)據(jù)。如下圖所示: 每個分區(qū)的數(shù)據(jù)只能由消費(fèi)者組中一個消費(fèi)者消費(fèi)。如下

    2024年02月09日
    瀏覽(37)
  • Kafka3.0.0版本——消費(fèi)者(消費(fèi)者組初始化流程圖解)

    Kafka3.0.0版本——消費(fèi)者(消費(fèi)者組初始化流程圖解)

    每個consumer都發(fā)送JoinGroup請求,如下圖所示: 選出一個consumer作為leader,如下圖所示: 把要消費(fèi)的topic情況發(fā)送給leader 消費(fèi)者,如下圖所示: leader會負(fù)責(zé)制定消費(fèi)方案,并把消費(fèi)方案發(fā)給coordinator,如下圖所示: Coordinator就把消費(fèi)方案下發(fā)給各個consumer,如下圖所示: 每個消

    2024年02月09日
    瀏覽(23)
  • Kafka3.0.0版本——消費(fèi)者(消費(fèi)者組詳細(xì)消費(fèi)流程圖解及消費(fèi)者重要參數(shù))

    Kafka3.0.0版本——消費(fèi)者(消費(fèi)者組詳細(xì)消費(fèi)流程圖解及消費(fèi)者重要參數(shù))

    創(chuàng)建一個消費(fèi)者網(wǎng)絡(luò)連接客戶端,主要用于與kafka集群進(jìn)行交互,如下圖所示: 調(diào)用sendFetches發(fā)送消費(fèi)請求,如下圖所示: (1)、Fetch.min.bytes每批次最小抓取大小,默認(rèn)1字節(jié) (2)、fetch.max.wait.ms一批數(shù)據(jù)最小值未達(dá)到的超時時間,默認(rèn)500ms (3)、Fetch.max.bytes每批次最大抓取大小,默

    2024年02月09日
    瀏覽(21)
  • Kafka3.0.0版本——消費(fèi)者(自動提交 offset)

    Kafka3.0.0版本——消費(fèi)者(自動提交 offset)

    官網(wǎng)文檔 參數(shù)解釋 參數(shù) 描述 enable.auto.commi 默認(rèn)值為 true,消費(fèi)者會自動周期性地向服務(wù)器提交偏移量。 auto.commit.interval.ms 如果設(shè)置了 enable.auto.commit 的值為 true, 則該值定義了消費(fèi)者偏移量向 Kafka 提交的頻率,默認(rèn) 5s。 圖解分析 消費(fèi)者自動提交 offset代碼 消費(fèi)者自動提交

    2024年02月09日
    瀏覽(27)
  • Kafka3.0.0版本——消費(fèi)者(手動提交offset)

    Kafka3.0.0版本——消費(fèi)者(手動提交offset)

    1.1、手動提交offset的兩種方式 commitSync(同步提交):必須等待offset提交完畢,再去消費(fèi)下一批數(shù)據(jù)。 commitAsync(異步提交) :發(fā)送完提交offset請求后,就開始消費(fèi)下一批數(shù)據(jù)了。 1.2、手動提交offset兩種方式的區(qū)別 相同點(diǎn):都會將本次提交的一批數(shù)據(jù)最高的偏移量提交。 不

    2024年02月09日
    瀏覽(28)
  • Kafka3.0.0版本——消費(fèi)者(獨(dú)立消費(fèi)者消費(fèi)某一個主題數(shù)據(jù)案例__訂閱主題)

    Kafka3.0.0版本——消費(fèi)者(獨(dú)立消費(fèi)者消費(fèi)某一個主題數(shù)據(jù)案例__訂閱主題)

    1.1、案例需求 創(chuàng)建一個獨(dú)立消費(fèi)者,消費(fèi)firstTopic主題中數(shù)據(jù),所下圖所示: 注意:在消費(fèi)者 API 代碼中必須配置消費(fèi)者組 id。命令行啟動消費(fèi)者不填寫消費(fèi)者組id 會被自動填寫隨機(jī)的消費(fèi)者組 id。 1.2、案例代碼 代碼 1.3、測試 在 Kafka 集群控制臺,創(chuàng)建firstTopic主題 在 IDEA中

    2024年02月09日
    瀏覽(26)
  • 【Kafka-Consumer分區(qū)分配策略】Kafka 消費(fèi)者組三種分區(qū)分配策略 Range Assignor、RoundRobin Assignor、Sticky Assignor 詳細(xì)解析

    【Kafka-Consumer分區(qū)分配策略】Kafka 消費(fèi)者組三種分區(qū)分配策略 Range Assignor、RoundRobin Assignor、Sticky Assignor 詳細(xì)解析

    1、一個 consumer group 中有多個 consumer 組成,一個 topic 有多個 partition 組成,現(xiàn)在的問題是,到底由哪個 consumer 來消費(fèi)哪個 partition 的數(shù)據(jù)。 2、Kafka有四種主流的分區(qū)分配策略: Range、RoundRobin、Sticky、CooperativeSticky。 可以通過配置參數(shù) partition.assignment.strategy ,修改分區(qū)的分配

    2024年02月22日
    瀏覽(21)
  • Kafka3.0.0版本——手動調(diào)整分區(qū)副本示例

    Kafka3.0.0版本——手動調(diào)整分區(qū)副本示例

    四臺服務(wù)器 原始服務(wù)器名稱 原始服務(wù)器ip 節(jié)點(diǎn) centos7虛擬機(jī)1 192.168.136.27 broker0 centos7虛擬機(jī)2 192.168.136.28 broker1 centos7虛擬機(jī)3 192.168.136.29 broker2 centos7虛擬機(jī)4 192.168.136.30 broker3 2.1、先啟動zookeeper集群 啟動zookeeper集群 2.2、再啟動kafka集群 啟動kafka集群 3.1、手動調(diào)整分區(qū)副本的前提

    2024年02月11日
    瀏覽(24)
  • Kafka學(xué)習(xí)---4、消費(fèi)者(分區(qū)消費(fèi)、分區(qū)平衡策略、offset、漏消費(fèi)和重復(fù)消費(fèi))

    Kafka學(xué)習(xí)---4、消費(fèi)者(分區(qū)消費(fèi)、分區(qū)平衡策略、offset、漏消費(fèi)和重復(fù)消費(fèi))

    1.1 Kafka消費(fèi)方式 1、pull(拉)模式:consumer采用從broker中主動拉取數(shù)據(jù)。 2、push(推)模式:Kafka沒有采用這種方式。因為broker決定消息發(fā)生速率,很難適應(yīng)所有消費(fèi)者的消費(fèi)速率。例如推送的速度是50M/s,Consumer1、Consumer2就來不及處理消息。 pull模式不足之處是如果Kafka沒有數(shù)

    2024年02月16日
    瀏覽(24)
  • 10、Kafka ------ 消費(fèi)者組 和 消費(fèi)者實(shí)例,分區(qū) 和 消費(fèi)者實(shí)例 之間的分配策略

    10、Kafka ------ 消費(fèi)者組 和 消費(fèi)者實(shí)例,分區(qū) 和 消費(fèi)者實(shí)例 之間的分配策略

    形象來說:你可以把主題內(nèi)的多個分區(qū)當(dāng)成多個子任務(wù)、多個子任務(wù)組成項目,每個消費(fèi)者實(shí)例就相當(dāng)于一個員工,假如你們 team 包含2個員工。 同理: 同一主題下,每個分區(qū)最多只會分給同一個組內(nèi)的一個消費(fèi)者實(shí)例 消費(fèi)者以組的名義來訂閱主題,前面的 kafka-console-consu

    2024年01月19日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包