国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

為什么kafka 需要 subscribe 的 group.id?我們是否需要使用 commitSync 手動提交偏移量?

這篇具有很好參考價值的文章主要介紹了為什么kafka 需要 subscribe 的 group.id?我們是否需要使用 commitSync 手動提交偏移量?。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、為什么需要帶有 subscribe 的 group.id

  • 消費概念:
    Kafka 使用消費者組的概念來實現(xiàn)主題的并行消費 - 每條消息都將在每個消費者組中傳遞一次,無論該組中實際有多少個消費者。所以 group 參數(shù)是強制性的,如果沒有組,Kafka 將不知道如何對待訂閱同一主題的其他消費者。
  • 偏移量
    每當(dāng)我們啟動一個消費者時,它都會加入一個消費者組,然后根據(jù)該消費者組中的其他消費者數(shù)量,為其分配要讀取的分區(qū)。對于這些分區(qū),它會檢查列表讀取偏移量是否已知,如果找到,它將從這一點開始讀取消息。如果沒有找到偏移量,則參數(shù) auto.offset.reset 控制是從分區(qū)中最早的消息還是從最新的消息開始讀取。

二、我們需要使用commitSync手動提交偏移量嗎?

  • 是否需要手動提交偏移?
    是否需要提交偏移量取決于作為參數(shù) enable.auto.commit 選擇的值。默認情況下,此設(shè)置為 true,這意味著消費者將定期自動提交其偏移量(由auto.commit.interval.ms 決定提交的頻率)。如果將其設(shè)置為 false,那么將需要自己提交偏移量。這種默認行為可能也是導(dǎo)致很多發(fā)現(xiàn) kafka 總是從最新的開始消費的原因,由于偏移量是自動提交的,因此它將使用該偏移量。

  • 有沒有辦法從頭開始重播消息?
    如果想每次都從頭開始讀取,可以調(diào)用seekToBeginning,如果不帶參數(shù)調(diào)用,它將重置為所有訂閱分區(qū)中的第一條消息,或者僅重置您傳入的那些分區(qū)。

  • seekToBeginning
    查找每個給定分區(qū)的第一個偏移量。poll(long) 該函數(shù)延遲計算,僅在調(diào)用或時才查找所有分區(qū)中的第一個偏移量position(TopicPartition)。如果未提供分區(qū),則查找所有當(dāng)前分配的分區(qū)的第一個偏移量。

    public class MyListener implements ConsumerSeekAware {
    
    ...
    
      @Override
      public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
          callback.seekToBeginning(assignments.keySet());
      }
    
    }
    
  • 有沒有辦法從最后開始重播消息?
    有的,可以使用 seekToEnd() 查找所有分配的分區(qū)到最后?;蛘呤褂?seekToTimestamp(long time)- 查找所有分配的分區(qū)到該時間戳表示的偏移量。

    public class MyListener extends AbstractConsumerSeekAware {
    
      @KafkaListener(...)
      void listn(...) {
          ...
      }
    }
    
    public class SomeOtherBean {
    
      MyListener listener;
    
      ...
    
      void someMethod() {
          this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);
      }
    
    }
    

三、如果我想手動提交偏移量,該怎么做?

  • 1、禁用自動提交

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    
  • 提交方法
    對于手動提交,KafkaConsumers提供了兩種方法,即 commitSync() 和 commitAsync()。commitSync()是一個阻塞調(diào)用,在偏移量成功提交后返回,commitAsync()則立即返回。如果想知道提交是否成功,可以為回調(diào)處理程序 ( OffsetCommitCallback) 提供一個方法參數(shù)。請注意,在兩次提交調(diào)用中,消費者都會提交最新poll()調(diào)用的偏移量。
    舉個例子:假設(shè)一個分區(qū)主題有一個消費者并且最后一次調(diào)用poll()返回偏移量為 4、5、6 的消息。提交時,偏移量 6 將被提交,因為這是消費者客戶端跟蹤的最新偏移量。
    同時,commitSync() 和 commitAsync() 都允許更多地控制我們想要提交的偏移量:如果你使用允許你指定的相應(yīng)重載,那么Map<TopicPartition, OffsetAndMetadata>消費者將僅提交指定的偏移量(即,映射可以包含分配的分區(qū)的任何子集) ,并且指定的偏移量可以為任意值)。

  • 同步提交:
    阻塞線程,直到提交成功或遇到不可恢復(fù)的錯誤(在這種情況下,它被拋出給調(diào)用者)

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
          System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
          consumer.commitSync();
      }
    }
    

    對于 for 循環(huán)中的每次迭代,只有在consumer.commitSync()成功返回或因拋出異常而中斷后,代碼才會移至下一次迭代。

  • 異步提交:
    是一種非阻塞方法。調(diào)用它不會阻塞線程。相反,它將繼續(xù)處理以下指令,無論最終是成功還是失敗。

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
          System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
          consumer.commitAsync(callback);
      }
    }
    

    對于 for 循環(huán)中的每次迭代,無論consumer.commitAsync()最終會發(fā)生什么,代碼都會移至下一次迭代。并且,提交的結(jié)果將由定義的回調(diào)函數(shù)處理。

  • 權(quán)衡:延遲與數(shù)據(jù)一致性
    1、如果必須確保數(shù)據(jù)一致性,請選擇commitSync(),因為它將確保在執(zhí)行任何進一步操作之前,你將知道偏移量提交是成功還是失敗。但由于它是同步和阻塞的,你將花費更多的時間來等待提交完成,這會導(dǎo)致高延遲。
    2、如果可以接受某些數(shù)據(jù)不一致并希望具有低延遲,請選擇commitAsync(),因為它不會等待完成。相反,它只會發(fā)出提交請求并稍后處理來自 Kafka 的響應(yīng)(成功或失?。?,同時代碼將繼續(xù)執(zhí)行。文章來源地址http://www.zghlxwxcb.cn/news/detail-658353.html

到了這里,關(guān)于為什么kafka 需要 subscribe 的 group.id?我們是否需要使用 commitSync 手動提交偏移量?的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 視覺化洞察:為什么我們需要數(shù)據(jù)可視化?

    視覺化洞察:為什么我們需要數(shù)據(jù)可視化?

    為什么我們需要數(shù)據(jù)可視化?這個問題在信息時代變得愈發(fā)重要。數(shù)據(jù),如今已成為生活的一部分,我們每天都在產(chǎn)生大量的數(shù)據(jù),從社交媒體到購物記錄,從健康數(shù)據(jù)到工作表現(xiàn),數(shù)據(jù)無處不在。然而,數(shù)據(jù)本身通常是冷冰冰的數(shù)字,對于大多數(shù)人而言,理解和分析這些數(shù)

    2024年02月10日
    瀏覽(31)
  • 什么是Web3.0?為什么我們需要 Web 3.0

    什么是Web3.0?為什么我們需要 Web 3.0

    為了更好地理解什么是 Web 3.0,我們需要知道什么是 Web 1.0 和 2.0。 為了不讓你厭煩,這里簡單的解釋一下: WEB 3.0 例子:xiaqo.com Web 1.0 ?—— 信息僅從網(wǎng)站傳遞給用戶。 Web 2.0 ?—— 信息是雙向的。 用戶可以與網(wǎng)站交互互動。 Web 3.0 ?—— 偉大的超越。 信息變得開放、分散

    2024年02月03日
    瀏覽(25)
  • 【云原生-白皮書】簡章1:為什么我們需要云原生架構(gòu)?

    【云原生-白皮書】簡章1:為什么我們需要云原生架構(gòu)?

    聲明:本文為《阿里云云原生架構(gòu)核心技術(shù)白皮書》的一些讀書筆記與感想。 一文大致了解云原生架構(gòu)模式特點傳送門:五分鐘了解云原生的架構(gòu)模式 聲明:本文是閱讀阿里云云原生架構(gòu)核心技術(shù)白皮書的一些讀書筆記與感想。 云原生架構(gòu)是一種創(chuàng)新的軟件開發(fā)方法,專為

    2023年04月26日
    瀏覽(25)
  • 什么是分布式操作系統(tǒng)?我們?yōu)槭裁葱枰植际讲僮飨到y(tǒng)?

    什么是分布式操作系統(tǒng)?我們?yōu)槭裁葱枰植际讲僮飨到y(tǒng)?

    分布式操作系統(tǒng)是一種特殊的操作系統(tǒng),本質(zhì)上屬于多機操作系統(tǒng),是傳統(tǒng)單機操作系統(tǒng)的發(fā)展和延伸。它是將一個計算機系統(tǒng)劃分為多個獨立的計算單元(或者也可稱為節(jié)點),這些節(jié)點被部署到每臺計算機上,然后被網(wǎng)絡(luò)連接起來,并保持著持續(xù)的通信狀態(tài)。在分布式操作

    2024年02月16日
    瀏覽(37)
  • 【Golang】三分鐘讓你快速了解Go語言&為什么我們需要Go語言?

    【Golang】三分鐘讓你快速了解Go語言&為什么我們需要Go語言?

    博主簡介: 努力學(xué)習(xí)的大一在校計算機專業(yè)學(xué)生,熱愛學(xué)習(xí)和創(chuàng)作。目前在學(xué)習(xí)和分享:數(shù)據(jù)結(jié)構(gòu)、Go,Java等相關(guān)知識。 博主主頁: @是瑤瑤子啦 所屬專欄: Go語言核心編程 近期目標(biāo): 寫好專欄的每一篇文章 Go 語言從 2009 年 9 月 21 日開始作為谷歌公司 20% 兼職項目,即相關(guān)

    2023年04月21日
    瀏覽(29)
  • Linux drm內(nèi)存管理(一) 淺談TTM與GEM,為什么我們需要TTM和GEM?

    @[TOC](Linux drm內(nèi)存管理(一) 為什么我們需要TTM和GEM?) 系列文章(更新中): Linux drm內(nèi)存管理(二) TTM內(nèi)存管理基礎(chǔ)概念 ??目前Kernel中DRM中GPU的VRAM(GPU片上顯存)的管理框架是有GEM和TTM,其中TTM早于GEM出現(xiàn),GEM的出現(xiàn)是為了解決TTM復(fù)雜的使用方法,將大部分的VRAM管理實現(xiàn)邏輯交由

    2023年04月20日
    瀏覽(29)
  • Python(一):為什么我們要學(xué)習(xí)Python?

    Python(一):為什么我們要學(xué)習(xí)Python?

    ?? 專欄簡介:本專欄記錄了我個人從零開始學(xué)習(xí)Python編程的過程。在這個專欄中,我將分享我在學(xué)習(xí)Python的過程中的學(xué)習(xí)筆記、學(xué)習(xí)路線以及各個知識點。 ?? 專欄適用人群 :本專欄適用于希望學(xué)習(xí)Python編程的初學(xué)者和有一定編程基礎(chǔ)的人。無論你是學(xué)生、職場人士還是

    2024年02月13日
    瀏覽(95)
  • Elasticsearch:什么是向量和向量存儲數(shù)據(jù)庫,我們?yōu)槭裁搓P(guān)心?

    Elasticsearch:什么是向量和向量存儲數(shù)據(jù)庫,我們?yōu)槭裁搓P(guān)心?

    Elasticsearch 從 7.3 版本開始支持向量搜索。從 8.0 開始支持帶有 HNSW 的 ANN 向量搜索。目前 Elasticsearch 已經(jīng)是全球下載量最多的向量數(shù)據(jù)庫。它允許使用密集向量和向量比較來搜索文檔。 向量搜索在人工智能和機器學(xué)習(xí)領(lǐng)域有許多重要的應(yīng)用。 有效存儲和檢索向量的數(shù)據(jù)庫對于

    2024年02月08日
    瀏覽(31)
  • 數(shù)據(jù)結(jié)構(gòu)與算法這么難,為什么我們還要學(xué)習(xí)?

    提到數(shù)據(jù)結(jié)構(gòu)與算法,就一定會伴隨著諸多所謂的堅持和抱怨。同時,還有兩個詞總是出現(xiàn),一個是內(nèi)功,是對知識的定位,一個是吃透,是對自己

    2024年01月19日
    瀏覽(29)
  • 為什么需要uboot?

    bootROM: 一種固化在芯片內(nèi)部的只讀存儲器(ROM),用于啟動和初始化系統(tǒng)。BootROM 中通常包含了一些預(yù)先編寫好的代碼,用于完成系統(tǒng)啟動前的基本初始化和配置, 例如初始化時鐘、GPIO控制器、中斷控制器、存儲設(shè)備(SD卡、NAND Flash、SPicy Flash)等硬件資源, 檢測啟動設(shè)備

    2023年04月23日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包