国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka3.0.0版本——消費(fèi)者(獨(dú)立消費(fèi)者消費(fèi)某一個(gè)主題中某個(gè)分區(qū)數(shù)據(jù)案例__訂閱分區(qū))

這篇具有很好參考價(jià)值的文章主要介紹了Kafka3.0.0版本——消費(fèi)者(獨(dú)立消費(fèi)者消費(fèi)某一個(gè)主題中某個(gè)分區(qū)數(shù)據(jù)案例__訂閱分區(qū))。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

一、獨(dú)立消費(fèi)者消費(fèi)某一個(gè)主題中某個(gè)分區(qū)數(shù)據(jù)案例

1.1、案例需求

  • 創(chuàng)建一個(gè)獨(dú)立消費(fèi)者,消費(fèi)firstTopic主題 0 號(hào)分區(qū)的數(shù)據(jù),所下圖所示:
    Kafka3.0.0版本——消費(fèi)者(獨(dú)立消費(fèi)者消費(fèi)某一個(gè)主題中某個(gè)分區(qū)數(shù)據(jù)案例__訂閱分區(qū)),kafka,kafka

1.2、案例代碼

  • 生產(chǎn)者往firstTopic主題 0 號(hào)分區(qū)發(fā)送數(shù)據(jù)代碼

    package com.xz.kafka.producer;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    
    public class CustomProducerCallback {
    
        public static void main(String[] args) throws InterruptedException {
    
            //1、創(chuàng)建 kafka 生產(chǎn)者的配置對(duì)象
            Properties properties = new Properties();
    
            //2、給 kafka 配置對(duì)象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            //3、指定對(duì)應(yīng)的key和value的序列化類型 key.serializer value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            //4、創(chuàng)建 kafka 生產(chǎn)者對(duì)象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //5、調(diào)用 send 方法,發(fā)送消息
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("firstTopic", 0,"","hello kafka" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null){
                            System.out.println("主題: "+metadata.topic() + " 分區(qū): "+ metadata.partition());
                        }
                    }
                });
                Thread.sleep(2);
            }
    
            // 3 關(guān)閉資源
            kafkaProducer.close();
        }
    }
    
    
  • 消費(fèi)者消費(fèi)firstTopic主題 0 分區(qū)數(shù)據(jù)代碼文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-700897.html

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumerPartition {
    
        public static void main(String[] args) {
            // 配置
            Properties properties = new Properties();
    
            // 連接
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 組id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    
            // 1 創(chuàng)建一個(gè)消費(fèi)者
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 訂閱主題對(duì)應(yīng)的分區(qū)
            ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
            topicPartitions.add(new TopicPartition("firstTopic",0));
            kafkaConsumer.assign(topicPartitions);
    
            // 3 消費(fèi)數(shù)據(jù)
            while (true){
    
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }
    

1.3、測(cè)試

  • 在 IDEA 中執(zhí)行消費(fèi)者程序,如下圖:
    Kafka3.0.0版本——消費(fèi)者(獨(dú)立消費(fèi)者消費(fèi)某一個(gè)主題中某個(gè)分區(qū)數(shù)據(jù)案例__訂閱分區(qū)),kafka,kafka
  • 在 IDEA 中執(zhí)行生產(chǎn)者程序 ,在控制臺(tái)觀察生成幾個(gè) 0號(hào)分區(qū)的數(shù)據(jù),如下圖:
    Kafka3.0.0版本——消費(fèi)者(獨(dú)立消費(fèi)者消費(fèi)某一個(gè)主題中某個(gè)分區(qū)數(shù)據(jù)案例__訂閱分區(qū)),kafka,kafka
  • 在 IDEA 控制臺(tái),觀察接收到的數(shù)據(jù),只能消費(fèi)到 0 號(hào)分區(qū)數(shù)據(jù)表示正確。
    Kafka3.0.0版本——消費(fèi)者(獨(dú)立消費(fèi)者消費(fèi)某一個(gè)主題中某個(gè)分區(qū)數(shù)據(jù)案例__訂閱分區(qū)),kafka,kafka

到了這里,關(guān)于Kafka3.0.0版本——消費(fèi)者(獨(dú)立消費(fèi)者消費(fèi)某一個(gè)主題中某個(gè)分區(qū)數(shù)據(jù)案例__訂閱分區(qū))的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Kafka3.0.0版本——消費(fèi)者(消費(fèi)者組詳細(xì)消費(fèi)流程圖解及消費(fèi)者重要參數(shù))

    Kafka3.0.0版本——消費(fèi)者(消費(fèi)者組詳細(xì)消費(fèi)流程圖解及消費(fèi)者重要參數(shù))

    創(chuàng)建一個(gè)消費(fèi)者網(wǎng)絡(luò)連接客戶端,主要用于與kafka集群進(jìn)行交互,如下圖所示: 調(diào)用sendFetches發(fā)送消費(fèi)請(qǐng)求,如下圖所示: (1)、Fetch.min.bytes每批次最小抓取大小,默認(rèn)1字節(jié) (2)、fetch.max.wait.ms一批數(shù)據(jù)最小值未達(dá)到的超時(shí)時(shí)間,默認(rèn)500ms (3)、Fetch.max.bytes每批次最大抓取大小,默

    2024年02月09日
    瀏覽(21)
  • Kafka3.0.0版本——消費(fèi)者(手動(dòng)提交offset)

    Kafka3.0.0版本——消費(fèi)者(手動(dòng)提交offset)

    1.1、手動(dòng)提交offset的兩種方式 commitSync(同步提交):必須等待offset提交完畢,再去消費(fèi)下一批數(shù)據(jù)。 commitAsync(異步提交) :發(fā)送完提交offset請(qǐng)求后,就開(kāi)始消費(fèi)下一批數(shù)據(jù)了。 1.2、手動(dòng)提交offset兩種方式的區(qū)別 相同點(diǎn):都會(huì)將本次提交的一批數(shù)據(jù)最高的偏移量提交。 不

    2024年02月09日
    瀏覽(27)
  • Kafka3.0.0版本——消費(fèi)者(自動(dòng)提交 offset)

    Kafka3.0.0版本——消費(fèi)者(自動(dòng)提交 offset)

    官網(wǎng)文檔 參數(shù)解釋 參數(shù) 描述 enable.auto.commi 默認(rèn)值為 true,消費(fèi)者會(huì)自動(dòng)周期性地向服務(wù)器提交偏移量。 auto.commit.interval.ms 如果設(shè)置了 enable.auto.commit 的值為 true, 則該值定義了消費(fèi)者偏移量向 Kafka 提交的頻率,默認(rèn) 5s。 圖解分析 消費(fèi)者自動(dòng)提交 offset代碼 消費(fèi)者自動(dòng)提交

    2024年02月09日
    瀏覽(27)
  • Kafka3.0.0版本——消費(fèi)者(分區(qū)的分配以及再平衡)

    Kafka3.0.0版本——消費(fèi)者(分區(qū)的分配以及再平衡)

    1.1、消費(fèi)者分區(qū)及消費(fèi)者組的概述 一個(gè)consumer group中有多個(gè)consumer組成,一個(gè) topic有多個(gè)partition組成。 1.2、如何確定哪個(gè)consumer來(lái)消費(fèi)哪個(gè)partition的數(shù)據(jù) Kafka有四種主流的分區(qū)分配策略: Range、RoundRobin、Sticky、CooperativeSticky。 可以通過(guò)配置參數(shù) partition.assignment.strategy ,修改分

    2024年02月07日
    瀏覽(28)
  • Kafka3.0.0版本——消費(fèi)者(Sticky分區(qū)分配策略以及再平衡)

    Kafka3.0.0版本——消費(fèi)者(Sticky分區(qū)分配策略以及再平衡)

    粘性分區(qū)定義:可以理解為分配的結(jié)果帶有“粘性的”。即在執(zhí)行一次新的分配之前,考慮上一次分配的結(jié)果,盡量少的調(diào)整分配的變動(dòng),可以節(jié)省大量的開(kāi)銷。 粘性分區(qū)是 Kafka 從 0.11.x 版本開(kāi)始引入這種分配策略, 首先會(huì)盡量均衡的放置分區(qū)到消費(fèi)者上面, 在出現(xiàn)同一消

    2024年02月09日
    瀏覽(45)
  • Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡)

    Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡)

    RoundRobin 針對(duì)集群中 所有Topic而言。 RoundRobin 輪詢分區(qū)策略,是把 所有的 partition 和所有的consumer 都列出來(lái) ,然后 按照 hashcode 進(jìn)行排序 ,最后通過(guò) 輪詢算法 來(lái)分配 partition 給到各個(gè)消費(fèi)者。 2.1、創(chuàng)建帶有7個(gè)分區(qū)的sixTopic主題 在 Kafka 集群控制臺(tái),創(chuàng)建帶有7個(gè)分區(qū)的sixTopi

    2024年02月07日
    瀏覽(22)
  • kafka復(fù)習(xí):(22)一個(gè)分區(qū)只能被消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)嗎?

    kafka復(fù)習(xí):(22)一個(gè)分區(qū)只能被消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)嗎?

    默認(rèn)情況下,一個(gè)分區(qū)只能被消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)。但可以自定義PartitionAssignor來(lái)打破這個(gè)限制。 一、自定義PartitionAssignor. 二、定義兩個(gè)消費(fèi)者,給其配置上述PartitionAssignor. 在kafka創(chuàng)建只有一個(gè)分區(qū)的topic : study2023 創(chuàng)建一個(gè)生產(chǎn)者往study2023這個(gè) topic發(fā)送消息: 分別

    2024年02月10日
    瀏覽(25)
  • kafka配置多個(gè)消費(fèi)者groupid kafka多個(gè)消費(fèi)者消費(fèi)同一個(gè)partition(java)

    kafka配置多個(gè)消費(fèi)者groupid kafka多個(gè)消費(fèi)者消費(fèi)同一個(gè)partition(java)

    kafka是由Apache軟件基金會(huì)開(kāi)發(fā)的一個(gè)開(kāi)源流處理平臺(tái)。kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者在網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)。 kafka中partition類似數(shù)據(jù)庫(kù)中的分表數(shù)據(jù),可以起到水平擴(kuò)展數(shù)據(jù)的目的,比如有a,b,c,d,e,f 6個(gè)數(shù)據(jù),某個(gè)topic有兩個(gè)partition,一

    2024年01月22日
    瀏覽(161)
  • 多個(gè)消費(fèi)者訂閱一個(gè)Kafka的Topic(使用KafkaConsumer和KafkaProducer)

    記錄 :466 場(chǎng)景 :一個(gè)KafkaProducer在一個(gè)Topic發(fā)布消息,多個(gè)消費(fèi)者KafkaConsumer訂閱Kafka的Topic。每個(gè)KafkaConsumer指定一個(gè)特定的ConsumerGroup,達(dá)到一條消息被多個(gè)不同的ConsumerGroup消費(fèi)。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka集群安裝 :https://blog.csdn.net/zha

    2024年02月16日
    瀏覽(47)
  • 多個(gè)消費(fèi)者訂閱一個(gè)Kafka的Topic(使用@KafkaListener和KafkaTemplate)

    記錄 :465 場(chǎng)景 :一個(gè)Producer在一個(gè)Topic發(fā)布消息,多個(gè)消費(fèi)者Consumer訂閱Kafka的Topic。每個(gè)Consumer指定一個(gè)特定的ConsumerGroup,達(dá)到一條消息被多個(gè)不同的ConsumerGroup消費(fèi)。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安裝 :https://blog.csdn.net/zhangbeizhen18/arti

    2024年02月15日
    瀏覽(22)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包