国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka3.0.0版本——消費(fèi)者(自動(dòng)提交 offset)

這篇具有很好參考價(jià)值的文章主要介紹了Kafka3.0.0版本——消費(fèi)者(自動(dòng)提交 offset)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

一、自動(dòng)提交offset的相關(guān)參數(shù)

  • 官網(wǎng)文檔
    Kafka3.0.0版本——消費(fèi)者(自動(dòng)提交 offset),kafka,kafka

  • 參數(shù)解釋

    參數(shù) 描述
    enable.auto.commi 默認(rèn)值為 true,消費(fèi)者會(huì)自動(dòng)周期性地向服務(wù)器提交偏移量。
    auto.commit.interval.ms 如果設(shè)置了 enable.auto.commit 的值為 true, 則該值定義了消費(fèi)者偏移量向 Kafka 提交的頻率,默認(rèn) 5s。
  • 圖解分析

    Kafka3.0.0版本——消費(fèi)者(自動(dòng)提交 offset),kafka,kafka

二、消費(fèi)者(自動(dòng)提交 offset)代碼示例

  • 消費(fèi)者自動(dòng)提交 offset代碼

    // 自動(dòng)提交
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
    // 提交時(shí)間間隔 1秒
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
    
  • 消費(fèi)者自動(dòng)提交 offset代碼完整代碼文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-709435.html

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumerAutoOffset {
    
        public static void main(String[] args) {
    
            // 配置
            Properties properties = new Properties();
    
            // 連接 bootstrap.servers
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 配置消費(fèi)者組id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");
    
            // 自動(dòng)提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
    
            // 提交時(shí)間間隔 1秒
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
    
            // 1 創(chuàng)建一個(gè)消費(fèi)者  "", "hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 訂閱主題 first
            ArrayList<String> topics = new ArrayList<>();
            topics.add("sevenTopic");
            kafkaConsumer.subscribe(topics);
    
            // 3 消費(fèi)數(shù)據(jù)
            while (true){
    
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }
    

到了這里,關(guān)于Kafka3.0.0版本——消費(fèi)者(自動(dòng)提交 offset)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Kafka3.0.0版本——消費(fèi)者(消費(fèi)者組詳細(xì)消費(fèi)流程圖解及消費(fèi)者重要參數(shù))

    Kafka3.0.0版本——消費(fèi)者(消費(fèi)者組詳細(xì)消費(fèi)流程圖解及消費(fèi)者重要參數(shù))

    創(chuàng)建一個(gè)消費(fèi)者網(wǎng)絡(luò)連接客戶端,主要用于與kafka集群進(jìn)行交互,如下圖所示: 調(diào)用sendFetches發(fā)送消費(fèi)請(qǐng)求,如下圖所示: (1)、Fetch.min.bytes每批次最小抓取大小,默認(rèn)1字節(jié) (2)、fetch.max.wait.ms一批數(shù)據(jù)最小值未達(dá)到的超時(shí)時(shí)間,默認(rèn)500ms (3)、Fetch.max.bytes每批次最大抓取大小,默

    2024年02月09日
    瀏覽(21)
  • Kafka3.0.0版本——消費(fèi)者(獨(dú)立消費(fèi)者消費(fèi)某一個(gè)主題數(shù)據(jù)案例__訂閱主題)

    Kafka3.0.0版本——消費(fèi)者(獨(dú)立消費(fèi)者消費(fèi)某一個(gè)主題數(shù)據(jù)案例__訂閱主題)

    1.1、案例需求 創(chuàng)建一個(gè)獨(dú)立消費(fèi)者,消費(fèi)firstTopic主題中數(shù)據(jù),所下圖所示: 注意:在消費(fèi)者 API 代碼中必須配置消費(fèi)者組 id。命令行啟動(dòng)消費(fèi)者不填寫消費(fèi)者組id 會(huì)被自動(dòng)填寫隨機(jī)的消費(fèi)者組 id。 1.2、案例代碼 代碼 1.3、測(cè)試 在 Kafka 集群控制臺(tái),創(chuàng)建firstTopic主題 在 IDEA中

    2024年02月09日
    瀏覽(26)
  • Kafka3.0.0版本——消費(fèi)者(分區(qū)的分配以及再平衡)

    Kafka3.0.0版本——消費(fèi)者(分區(qū)的分配以及再平衡)

    1.1、消費(fèi)者分區(qū)及消費(fèi)者組的概述 一個(gè)consumer group中有多個(gè)consumer組成,一個(gè) topic有多個(gè)partition組成。 1.2、如何確定哪個(gè)consumer來(lái)消費(fèi)哪個(gè)partition的數(shù)據(jù) Kafka有四種主流的分區(qū)分配策略: Range、RoundRobin、Sticky、CooperativeSticky。 可以通過(guò)配置參數(shù) partition.assignment.strategy ,修改分

    2024年02月07日
    瀏覽(28)
  • Kafka3.0.0版本——消費(fèi)者(獨(dú)立消費(fèi)者消費(fèi)某一個(gè)主題中某個(gè)分區(qū)數(shù)據(jù)案例__訂閱分區(qū))

    Kafka3.0.0版本——消費(fèi)者(獨(dú)立消費(fèi)者消費(fèi)某一個(gè)主題中某個(gè)分區(qū)數(shù)據(jù)案例__訂閱分區(qū))

    1.1、案例需求 創(chuàng)建一個(gè)獨(dú)立消費(fèi)者,消費(fèi)firstTopic主題 0 號(hào)分區(qū)的數(shù)據(jù),所下圖所示: 1.2、案例代碼 生產(chǎn)者往firstTopic主題 0 號(hào)分區(qū)發(fā)送數(shù)據(jù)代碼 消費(fèi)者消費(fèi)firstTopic主題 0 分區(qū)數(shù)據(jù)代碼 1.3、測(cè)試 在 IDEA 中執(zhí)行消費(fèi)者程序,如下圖: 在 IDEA 中執(zhí)行生產(chǎn)者程序 ,在控制臺(tái)觀察

    2024年02月09日
    瀏覽(34)
  • Kafka3.0.0版本——消費(fèi)者(Sticky分區(qū)分配策略以及再平衡)

    Kafka3.0.0版本——消費(fèi)者(Sticky分區(qū)分配策略以及再平衡)

    粘性分區(qū)定義:可以理解為分配的結(jié)果帶有“粘性的”。即在執(zhí)行一次新的分配之前,考慮上一次分配的結(jié)果,盡量少的調(diào)整分配的變動(dòng),可以節(jié)省大量的開(kāi)銷。 粘性分區(qū)是 Kafka 從 0.11.x 版本開(kāi)始引入這種分配策略, 首先會(huì)盡量均衡的放置分區(qū)到消費(fèi)者上面, 在出現(xiàn)同一消

    2024年02月09日
    瀏覽(45)
  • Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡)

    Kafka3.0.0版本——消費(fèi)者(RoundRobin分區(qū)分配策略以及再平衡)

    RoundRobin 針對(duì)集群中 所有Topic而言。 RoundRobin 輪詢分區(qū)策略,是把 所有的 partition 和所有的consumer 都列出來(lái) ,然后 按照 hashcode 進(jìn)行排序 ,最后通過(guò) 輪詢算法 來(lái)分配 partition 給到各個(gè)消費(fèi)者。 2.1、創(chuàng)建帶有7個(gè)分區(qū)的sixTopic主題 在 Kafka 集群控制臺(tái),創(chuàng)建帶有7個(gè)分區(qū)的sixTopi

    2024年02月07日
    瀏覽(23)
  • Kafka:消費(fèi)者手動(dòng)提交

    雖然自動(dòng)提交offset十分簡(jiǎn)單便利,但由于其是基于時(shí)間提交的,開(kāi)發(fā)人員難以把握offset提交的時(shí)機(jī)。 兩種手動(dòng)提交方式: commitSync(同步提交): 必須等待offset提交完畢,再去消費(fèi)下一批數(shù)據(jù)。 同步提交阻塞當(dāng)前線程,一直到提交成功,并且會(huì)自動(dòng)失敗重試(由不可控因素

    2024年02月11日
    瀏覽(22)
  • Kafka篇——Kafka消費(fèi)者端常見(jiàn)配置,涵蓋自動(dòng)手動(dòng)提交offset、poll消息細(xì)節(jié)、健康狀態(tài)檢查、新消費(fèi)組消費(fèi)offset規(guī)則以及指定分區(qū)等技術(shù)點(diǎn)配置,全面無(wú)死角,一篇文章拿下!

    Kafka篇——Kafka消費(fèi)者端常見(jiàn)配置,涵蓋自動(dòng)手動(dòng)提交offset、poll消息細(xì)節(jié)、健康狀態(tài)檢查、新消費(fèi)組消費(fèi)offset規(guī)則以及指定分區(qū)等技術(shù)點(diǎn)配置,全面無(wú)死角,一篇文章拿下!

    一、自動(dòng)提交offset 1、概念 Kafka中默認(rèn)是自動(dòng)提交offset。消費(fèi)者在poll到消息后默認(rèn)情況下,會(huì)自動(dòng)向Broker的_consumer_offsets主題提交當(dāng)前 主題-分區(qū)消費(fèi)的偏移量 2、自動(dòng)提交offset和手動(dòng)提交offset流程圖 3、在Java中實(shí)現(xiàn)配置 4、自動(dòng)提交offset問(wèn)題 自動(dòng)提交會(huì)丟消息。因?yàn)槿绻M(fèi)

    2024年01月22日
    瀏覽(22)
  • Kafka消費(fèi)者提交偏移量

    在Kafka中,偏移量(offset)是一個(gè)與分區(qū)相關(guān)的概念,用于跟蹤一個(gè)消費(fèi)者在分區(qū)中已經(jīng)處理的消息位置。每個(gè)分區(qū)都有自己的偏移量,用于記錄已經(jīng)傳遞給消費(fèi)者的消息的位置。 每個(gè)分區(qū)都有一個(gè)偏移量: Kafka中的每個(gè)分區(qū)都會(huì)維護(hù)一個(gè)偏移量,表示消費(fèi)者在該分區(qū)中的消

    2024年01月24日
    瀏覽(28)
  • 分布式 - 消息隊(duì)列Kafka:Kafka 消費(fèi)者消費(fèi)位移的提交方式

    分布式 - 消息隊(duì)列Kafka:Kafka 消費(fèi)者消費(fèi)位移的提交方式

    最簡(jiǎn)單的提交方式是讓消費(fèi)者自動(dòng)提交偏移量,自動(dòng)提交 offset 的相關(guān)參數(shù): enable.auto.commit:是否開(kāi)啟自動(dòng)提交 offset 功能,默認(rèn)為 true; auto.commit.interval.ms:自動(dòng)提交 offset 的時(shí)間間隔,默認(rèn)為5秒; 如果 enable.auto.commit 被設(shè)置為true,那么每過(guò)5秒,消費(fèi)者就會(huì)自動(dòng)提交 poll() 返

    2024年02月12日
    瀏覽(32)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包