国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka之offset位移

這篇具有很好參考價值的文章主要介紹了Kafka之offset位移。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

首先回顧下 offset 的定義

offset :在 Apache Kafka 中,offset 是一個用來唯一標識消息在分區(qū)中位置的數(shù)字。每個分區(qū)中的消息都會被分配一個唯一的 offset 值,用來表示該消息在該分區(qū)中的位置。消費者可以通過記錄自己消費的最后一個 offset 值來跟蹤自己消費消息的進度,確保不會漏掉消息或者重復消費消息。通過管理 offset,Kafka 實現(xiàn)了高效的消息傳遞和消費處理。

在 kafka0.9版本之前,consumer 默認將 offset 保存在 Zookeeper 中,但在0.9版本之后,offset被保存在 Kafka 一個內(nèi)置的 topic 中,該 topic 為 __consumer_offsets

__consumer_offsets :采用 KV 鍵值對的方式存儲,key:group.id + topic +?分區(qū)號,value :當前 offset 的值

__consumer_offsets 既然作為一個 topic 存在與 Kafka 中,那么它也可以通過消費者消費數(shù)據(jù)的方式進行消費。

自動提交 offset

在 Kafka 所提供的API中,enable.auto.commit 參數(shù)的值表示是否開啟自動提交 offset,默認為 true,消費者會自動周期性地向服務(wù)器提交偏移量,而 auto.commit.interval.ms 則表示自動提交?offset?的時間間隔,默認是5s。

核心代碼的實現(xiàn)

public class CustomConsumerAutoOffset {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 連接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");

        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 配置消費者組id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // 自動提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);

        // 提交時間間隔
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);

        // 1 創(chuàng)建一個消費者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2 訂閱主題first
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 3 消費數(shù)據(jù)
        while (true) {

            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }


    }
}

手動提交 offset

相比起自動提交,手動提交可以讓開發(fā)者更加容易的把握提交時機,同樣手動提交也分為同步提交(commitSync)和異步提交(commitAsync)

commitSync(同步提交):必須等待offset提交完畢,再去消費下一批數(shù)據(jù)。

commitAsync(異步提交) :發(fā)送完提交offset請求后,就開始消費下一批數(shù)據(jù)了。

首先將?enable.auto.commit 參數(shù)的值改為 false

// 手動提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

然后在消費數(shù)據(jù)的代碼中,加入手動提交 offset?

        // 3 消費數(shù)據(jù)
        while (true) {

            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }

            // 手動提交offset(同步提交)
//            kafkaConsumer.commitSync();
            // 異步提交(主要)
            kafkaConsumer.commitAsync();
        }

指定 offset 消費

當消費者組第一次消費或者服務(wù)器上不再存在當前偏移量時,可以通過設(shè)置 auto.offset.reset 參數(shù)來指定偏移量的重置策略。

  1. 如果設(shè)置為?earliest?,則會將偏移量重置為最早的可用偏移量,相當于從最早的消息開始消費(即 --from-beginning)。
  2. 如果設(shè)置為 latest(默認值),則會將偏移量重置為最新的偏移量,即從最新的消息開始消費。
  3. 如果設(shè)置為 none,當未找到消費者組的先前偏移量時,會向消費者拋出異常。
  4. 也可以通過任意指定 offset 位移來開始消費。

主要介紹下通過任意指定 offset 位移來開始消費。在消費者代碼的基礎(chǔ)上,指定所要消費的位置,以及指定 offset

        // 指定位置進行消費
        Set<TopicPartition> assignment = kafkaConsumer.assignment();

        // 保證分區(qū)分配方案已經(jīng)指定完畢
        while (assignment.size() == 0){
            kafkaConsumer.poll(Duration.ofSeconds(1));

            assignment = kafkaConsumer.assignment();
        }

        // 指定offset
        for (TopicPartition topicPartition : assignment) {
            kafkaConsumer.seek(topicPartition, 600);
        }

指定時間消費

除了上面說的指定 offset 進行消費,也可以指定時間進行消費,比如指定消費前一天以后的數(shù)據(jù)

核心思路是,將想要指定的時間轉(zhuǎn)換為對應(yīng)的 offset 值,會用到 Kafka 所提供的 API:offsetsForTimes (這里的邏輯比較饒,這里只做介紹)

核心代碼如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-846998.html

        // 指定位置進行消費
        Set<TopicPartition> assignment = kafkaConsumer.assignment();

        // 保證分區(qū)分配方案已經(jīng)指定完畢
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));

            assignment = kafkaConsumer.assignment();
        }

        // 希望將時間轉(zhuǎn)換為對應(yīng)的offset
        HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();

        // 封裝對應(yīng)集合
        for (TopicPartition topicPartition : assignment) {
            topicPartitionLongHashMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
        }

            Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);

        // 指定offset
        for (TopicPartition topicPartition : assignment) {

            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);

            kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());

        }
 

到了這里,關(guān)于Kafka之offset位移的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 分布式消息服務(wù)kafka

    分布式消息服務(wù)kafka

    什么是消息中間件? 消息中間件是分布式系統(tǒng)中重要的組件,本質(zhì)就是一個具有接收消息、存儲消息、分發(fā)消息的隊列,應(yīng)用程序通過讀寫隊列消息來通信。 例如:在淘寶購物時,訂單系統(tǒng)處理完訂單后,把訂單消息發(fā)送到消息中間件中,由消息中間件將訂單消息分發(fā)到下

    2024年02月01日
    瀏覽(23)
  • 【分布式應(yīng)用】kafka集群、Filebeat+Kafka+ELK搭建

    【分布式應(yīng)用】kafka集群、Filebeat+Kafka+ELK搭建

    主要原因是由于在高并發(fā)環(huán)境下,同步請求來不及處理,請求往往會發(fā)生阻塞。比如大量的請求并發(fā)訪問數(shù)據(jù)庫,導致行鎖表鎖,最后請求線程會堆積過多,從而觸發(fā) too many connection 錯誤,引發(fā)雪崩效應(yīng)。 我們使用消息隊列,通過異步處理請求,從而緩解系統(tǒng)的壓力。消息隊

    2024年02月16日
    瀏覽(96)
  • 【分布式技術(shù)】消息隊列Kafka

    【分布式技術(shù)】消息隊列Kafka

    目錄 一、Kafka概述 二、消息隊列Kafka的好處 三、消息隊列Kafka的兩種模式 四、Kafka 1、Kafka 定義 2、Kafka 簡介 3、Kafka 的特性 五、Kafka的系統(tǒng)架構(gòu) 六、實操部署Kafka集群 ?步驟一:在每一個zookeeper節(jié)點上完成kafka部署 ?編輯 步驟二:傳給其他節(jié)點 步驟三:啟動3個節(jié)點 kafka管理

    2024年01月23日
    瀏覽(27)
  • 分布式 - 消息隊列Kafka:Kafka消費者的分區(qū)分配策略

    分布式 - 消息隊列Kafka:Kafka消費者的分區(qū)分配策略

    Kafka 消費者負載均衡策略? Kafka 消費者分區(qū)分配策略? 1. 環(huán)境準備 創(chuàng)建主題 test 有5個分區(qū),準備 3 個消費者并進行消費,觀察消費分配情況。然后再停止其中一個消費者,再次觀察消費分配情況。 ① 創(chuàng)建主題 test,該主題有5個分區(qū),2個副本: ② 創(chuàng)建3個消費者CustomConsu

    2024年02月13日
    瀏覽(31)
  • 分布式 - 消息隊列Kafka:Kafka生產(chǎn)者架構(gòu)和配置參數(shù)

    分布式 - 消息隊列Kafka:Kafka生產(chǎn)者架構(gòu)和配置參數(shù)

    生產(chǎn)者發(fā)送消息流程參考圖1: 先從創(chuàng)建一個ProducerRecord對象開始,其中需要包含目標主題和要發(fā)送的內(nèi)容。另外,還可以指定鍵、分區(qū)、時間戳或標頭。在發(fā)送ProducerRecord對象時,生產(chǎn)者需要先把鍵和值對象序列化成字節(jié)數(shù)組,這樣才能在網(wǎng)絡(luò)上傳輸。 接下來,如果沒有顯式

    2024年02月13日
    瀏覽(28)
  • 分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    不管是把Kafka作為消息隊列、消息總線還是數(shù)據(jù)存儲平臺,總是需要一個可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個可以從Kafka讀取數(shù)據(jù)的消費者,或者一個兼具兩種角色的應(yīng)用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應(yīng)用程序,它們負責將消息發(fā)送到 Kafka 集群中的一個或多

    2024年02月13日
    瀏覽(29)
  • 分布式 - 消息隊列Kafka:Kafka消費者和消費者組

    分布式 - 消息隊列Kafka:Kafka消費者和消費者組

    1. Kafka 消費者是什么? 消費者負責訂閱Kafka中的主題,并且從訂閱的主題上拉取消息。與其他一些消息中間件不同的是:在Kafka的消費理念中還有一層消費組的概念,每個消費者都有一個對應(yīng)的消費組。當消息發(fā)布到主題后,只會被投遞給訂閱它的每個消費組中的一個消費者

    2024年02月13日
    瀏覽(29)
  • 分布式 - 消息隊列Kafka:Kafka 消費者消息消費與參數(shù)配置

    分布式 - 消息隊列Kafka:Kafka 消費者消息消費與參數(shù)配置

    01. 創(chuàng)建消費者 在讀取消息之前,需要先創(chuàng)建一個KafkaConsumer對象。創(chuàng)建KafkaConsumer對象與創(chuàng)建KafkaProducer對象非常相似——把想要傳給消費者的屬性放在Properties對象里。 為簡單起見,這里只提供4個必要的屬性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    瀏覽(27)
  • 分布式 - 消息隊列Kafka:Kafka消費者分區(qū)再均衡(Rebalance)

    分布式 - 消息隊列Kafka:Kafka消費者分區(qū)再均衡(Rebalance)

    01. Kafka 消費者分區(qū)再均衡是什么? 消費者群組里的消費者共享主題分區(qū)的所有權(quán)。當一個新消費者加入群組時,它將開始讀取一部分原本由其他消費者讀取的消息。當一個消費者被關(guān)閉或發(fā)生崩潰時,它將離開群組,原本由它讀取的分區(qū)將由群組里的其他消費者讀取。 分區(qū)

    2024年02月12日
    瀏覽(31)
  • 分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    01. Kafka 分區(qū)的作用 分區(qū)的作用就是提供負載均衡的能力,或者說對數(shù)據(jù)進行分區(qū)的主要原因,就是為了實現(xiàn)系統(tǒng)的高伸縮性。不同的分區(qū)能夠被放置到不同節(jié)點的機器上,而數(shù)據(jù)的讀寫操作也都是針對分區(qū)這個粒度而進行的,這樣每個節(jié)點的機器都能獨立地執(zhí)行各自分區(qū)的

    2024年02月13日
    瀏覽(32)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包