国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka3.0.0版本——消費者(手動提交offset)

這篇具有很好參考價值的文章主要介紹了Kafka3.0.0版本——消費者(手動提交offset)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、消費者(手動提交 offset)的概述

1.1、手動提交offset的兩種方式

  • commitSync(同步提交):必須等待offset提交完畢,再去消費下一批數(shù)據(jù)。
  • commitAsync(異步提交) :發(fā)送完提交offset請求后,就開始消費下一批數(shù)據(jù)了。

1.2、手動提交offset兩種方式的區(qū)別

  • 相同點:都會將本次提交的一批數(shù)據(jù)最高的偏移量提交。
  • 不同點是:同步提交阻塞當前線程,一直到提交成功,并且會自動失敗重試(由不可控因素導(dǎo)致,也會出現(xiàn)提交失敗);而異步提交則沒有失敗重試機制,故有可能提交失敗。

1.3、手動提交offset的圖解

Kafka3.0.0版本——消費者(手動提交offset),kafka,kafka

二、消費者(手動提交 offset)的代碼示例

2.1、手動提交 offset(采用同步提交的方式)代碼

  • 同步提交代碼
    由于同步提交 offset 有失敗重試機制,故更加可靠,但是由于一直等待提交結(jié)果,提交的效率比較低。

     // 是否自動提交 offset
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    // 手動提交offset(同步提交)
    kafkaConsumer.commitSync();
    
  • 同步提交完整代碼

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumerByHandSync {
    
        public static void main(String[] args) {
    
            // 配置
            Properties properties = new Properties();
    
            // 連接 bootstrap.servers
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 配置消費者組id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");
    
            // 手動提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    
            // 1 創(chuàng)建一個消費者  "", "hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 訂閱主題 sevenTopic
            ArrayList<String> topics = new ArrayList<>();
            topics.add("sevenTopic");
            kafkaConsumer.subscribe(topics);
    
            // 3 消費數(shù)據(jù)
            while (true){
    
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
    
                // 手動提交offset(同步提交)
                kafkaConsumer.commitSync();
            }
        }
    }
    

2.1、手動提交 offset(采用異步提交的方式)代碼

  • 異步提交代碼
    雖然同步提交 offset 更可靠一些,但是由于其會阻塞當前線程,直到提交成功。因此吞吐量會受到很大的影響。因此更多的情況下,會選用異步提交 offset的方式。

     // 是否自動提交 offset
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    // 手動提交offset(異步提交)
    kafkaConsumer.commitAsync();
    
  • 異步提交完整代碼文章來源地址http://www.zghlxwxcb.cn/news/detail-709427.html

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumerByHandSync {
    
        public static void main(String[] args) {
    
            // 0 配置
            Properties properties = new Properties();
    
            // 連接 bootstrap.servers
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 配置消費者組id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");
    
            // 手動提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    
            // 1 創(chuàng)建一個消費者  "", "hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 訂閱主題 sevenTopic
            ArrayList<String> topics = new ArrayList<>();
            topics.add("sevenTopic");
            kafkaConsumer.subscribe(topics);
    
            // 3 消費數(shù)據(jù)
            while (true){
    
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
                // 手動提交offset(異步提交)
                kafkaConsumer.commitAsync();
            }
        }
    }
    

到了這里,關(guān)于Kafka3.0.0版本——消費者(手動提交offset)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Kafka3.0.0版本——消費者(消費者組詳細消費流程圖解及消費者重要參數(shù))

    Kafka3.0.0版本——消費者(消費者組詳細消費流程圖解及消費者重要參數(shù))

    創(chuàng)建一個消費者網(wǎng)絡(luò)連接客戶端,主要用于與kafka集群進行交互,如下圖所示: 調(diào)用sendFetches發(fā)送消費請求,如下圖所示: (1)、Fetch.min.bytes每批次最小抓取大小,默認1字節(jié) (2)、fetch.max.wait.ms一批數(shù)據(jù)最小值未達到的超時時間,默認500ms (3)、Fetch.max.bytes每批次最大抓取大小,默

    2024年02月09日
    瀏覽(21)
  • Kafka3.0.0版本——消費者(獨立消費者消費某一個主題數(shù)據(jù)案例__訂閱主題)

    Kafka3.0.0版本——消費者(獨立消費者消費某一個主題數(shù)據(jù)案例__訂閱主題)

    1.1、案例需求 創(chuàng)建一個獨立消費者,消費firstTopic主題中數(shù)據(jù),所下圖所示: 注意:在消費者 API 代碼中必須配置消費者組 id。命令行啟動消費者不填寫消費者組id 會被自動填寫隨機的消費者組 id。 1.2、案例代碼 代碼 1.3、測試 在 Kafka 集群控制臺,創(chuàng)建firstTopic主題 在 IDEA中

    2024年02月09日
    瀏覽(26)
  • Kafka3.0.0版本——消費者(分區(qū)的分配以及再平衡)

    Kafka3.0.0版本——消費者(分區(qū)的分配以及再平衡)

    1.1、消費者分區(qū)及消費者組的概述 一個consumer group中有多個consumer組成,一個 topic有多個partition組成。 1.2、如何確定哪個consumer來消費哪個partition的數(shù)據(jù) Kafka有四種主流的分區(qū)分配策略: Range、RoundRobin、Sticky、CooperativeSticky。 可以通過配置參數(shù) partition.assignment.strategy ,修改分

    2024年02月07日
    瀏覽(28)
  • Kafka3.0.0版本——消費者(獨立消費者消費某一個主題中某個分區(qū)數(shù)據(jù)案例__訂閱分區(qū))

    Kafka3.0.0版本——消費者(獨立消費者消費某一個主題中某個分區(qū)數(shù)據(jù)案例__訂閱分區(qū))

    1.1、案例需求 創(chuàng)建一個獨立消費者,消費firstTopic主題 0 號分區(qū)的數(shù)據(jù),所下圖所示: 1.2、案例代碼 生產(chǎn)者往firstTopic主題 0 號分區(qū)發(fā)送數(shù)據(jù)代碼 消費者消費firstTopic主題 0 分區(qū)數(shù)據(jù)代碼 1.3、測試 在 IDEA 中執(zhí)行消費者程序,如下圖: 在 IDEA 中執(zhí)行生產(chǎn)者程序 ,在控制臺觀察

    2024年02月09日
    瀏覽(34)
  • Kafka3.0.0版本——消費者(Sticky分區(qū)分配策略以及再平衡)

    Kafka3.0.0版本——消費者(Sticky分區(qū)分配策略以及再平衡)

    粘性分區(qū)定義:可以理解為分配的結(jié)果帶有“粘性的”。即在執(zhí)行一次新的分配之前,考慮上一次分配的結(jié)果,盡量少的調(diào)整分配的變動,可以節(jié)省大量的開銷。 粘性分區(qū)是 Kafka 從 0.11.x 版本開始引入這種分配策略, 首先會盡量均衡的放置分區(qū)到消費者上面, 在出現(xiàn)同一消

    2024年02月09日
    瀏覽(45)
  • Kafka3.0.0版本——消費者(RoundRobin分區(qū)分配策略以及再平衡)

    Kafka3.0.0版本——消費者(RoundRobin分區(qū)分配策略以及再平衡)

    RoundRobin 針對集群中 所有Topic而言。 RoundRobin 輪詢分區(qū)策略,是把 所有的 partition 和所有的consumer 都列出來 ,然后 按照 hashcode 進行排序 ,最后通過 輪詢算法 來分配 partition 給到各個消費者。 2.1、創(chuàng)建帶有7個分區(qū)的sixTopic主題 在 Kafka 集群控制臺,創(chuàng)建帶有7個分區(qū)的sixTopi

    2024年02月07日
    瀏覽(22)
  • Kafka:消費者手動提交

    雖然自動提交offset十分簡單便利,但由于其是基于時間提交的,開發(fā)人員難以把握offset提交的時機。 兩種手動提交方式: commitSync(同步提交): 必須等待offset提交完畢,再去消費下一批數(shù)據(jù)。 同步提交阻塞當前線程,一直到提交成功,并且會自動失敗重試(由不可控因素

    2024年02月11日
    瀏覽(22)
  • Spring Boot 整合kafka:生產(chǎn)者ack機制和消費者AckMode消費模式、手動提交ACK

    Kafka 生產(chǎn)者的 ACK 機制指的是生產(chǎn)者在發(fā)送消息后,對消息副本的確認機制。ACK 機制可以幫助生產(chǎn)者確保消息被成功寫入 Kafka 集群中的多個副本,并在需要時獲取確認信息。 Kafka 提供了三種 ACK 機制的配置選項,分別是: acks=0:生產(chǎn)者在成功將消息發(fā)送到網(wǎng)絡(luò)緩沖區(qū)后即視

    2024年02月04日
    瀏覽(28)
  • Kafka篇——Kafka消費者端常見配置,涵蓋自動手動提交offset、poll消息細節(jié)、健康狀態(tài)檢查、新消費組消費offset規(guī)則以及指定分區(qū)等技術(shù)點配置,全面無死角,一篇文章拿下!

    Kafka篇——Kafka消費者端常見配置,涵蓋自動手動提交offset、poll消息細節(jié)、健康狀態(tài)檢查、新消費組消費offset規(guī)則以及指定分區(qū)等技術(shù)點配置,全面無死角,一篇文章拿下!

    一、自動提交offset 1、概念 Kafka中默認是自動提交offset。消費者在poll到消息后默認情況下,會自動向Broker的_consumer_offsets主題提交當前 主題-分區(qū)消費的偏移量 2、自動提交offset和手動提交offset流程圖 3、在Java中實現(xiàn)配置 4、自動提交offset問題 自動提交會丟消息。因為如果消費

    2024年01月22日
    瀏覽(22)
  • Kafka3.0.0版本——手動調(diào)整分區(qū)副本示例

    Kafka3.0.0版本——手動調(diào)整分區(qū)副本示例

    四臺服務(wù)器 原始服務(wù)器名稱 原始服務(wù)器ip 節(jié)點 centos7虛擬機1 192.168.136.27 broker0 centos7虛擬機2 192.168.136.28 broker1 centos7虛擬機3 192.168.136.29 broker2 centos7虛擬機4 192.168.136.30 broker3 2.1、先啟動zookeeper集群 啟動zookeeper集群 2.2、再啟動kafka集群 啟動kafka集群 3.1、手動調(diào)整分區(qū)副本的前提

    2024年02月11日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包