国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka-消費者-KafkaConsumer分析-Rebalance

這篇具有很好參考價值的文章主要介紹了Kafka-消費者-KafkaConsumer分析-Rebalance。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

在開始介紹Rebalance操作的實現(xiàn)細節(jié)之前,我們需要明確在哪幾種情況下會觸發(fā)Rebalance操作:

  1. 有新的消費者加入Consumer Group。
  2. 有消費者宕機下線。消費者并不一定需要真正下線,例如遇到長時間的GC、網(wǎng)絡(luò)延遲導(dǎo)致消費者長時間未向GroupCoordinator發(fā)送HeartbeatRequest時,GroupCoordinator會認(rèn)為消費者下線。
  3. 有消費者主動退出Consumer Group。
  4. Consumer Group訂閱的任一Topic出現(xiàn)分區(qū)數(shù)量的變化。
  5. 消費者調(diào)用unsubscrible()取消對某Topic的訂閱。

第一階段

Rebalance操作的第一步就是查找GroupCoordinator,這個階段消費者會向Kafka集群中的任意一個Broker發(fā)送GroupCoordinatorRequest請求,并處理返回的GroupCoordinatorResponse響應(yīng)。

GroupCoordinatorRequest消息體的格式比較簡單,只包含了Consumer Group的id。GroupCoordinatorResponse消息體包含了錯誤碼(short類型)、coordinator的節(jié)點Id(int類型)、GroupCoordinator的host(String類型)、GroupCoordinator的端口號(int類型)。
發(fā)送GroupCoordinatorRequest請求的入口是ConsumerCoordinator的ensureCoordinatorReady方法,其流程如圖所示。

Kafka-消費者-KafkaConsumer分析-Rebalance,隊列,kafka,分布式

  1. 首先檢測是否需要重新查找GroupCoordinator,主要是檢查coordinator字段是否為空以及與GroupCoordinator之間的連接是否正常。

  2. 查找集群負載最低的Node節(jié)點,并創(chuàng)建GroupCoordinatorRequest請求。調(diào)用client.send方法將請求放入unsent隊列中等待發(fā)送,并返回RequestFuture對象。返回的RequestFuture對象經(jīng)過了compose方法適配,原理同HeartbeatCompletionHandler。

  3. 調(diào)用ConsumerNetworkClient.poll(future)方法,將GroupCoordinatorRequest請求發(fā)送出去。此處使用阻塞的方式發(fā)送,直到收到GroupCoordinatorResponse響應(yīng)或異常完成,才從此方法返回。

  4. 檢測檢查RequestFuture對象的狀態(tài)。如果出現(xiàn)RetriableException異常,則調(diào)用ConsumerNetworkClient.awaitMetadataUpdate()方法阻塞更新Metadata中記錄的集群元數(shù)據(jù)后跳轉(zhuǎn)到步驟1繼續(xù)執(zhí)行。如果不是RetriableException異常則直接報錯。

  5. 如果成功找到GroupCoordinator節(jié)點,但是網(wǎng)絡(luò)連接失敗,則將其unsent中對應(yīng)的請求清空,并將coordinator字段置為null,準(zhǔn)備重新查找GroupCoordinator,退避一段時間后跳轉(zhuǎn)到步驟1繼續(xù)執(zhí)行。

下面介紹處理GroupCoordinatorResponse的相關(guān)操作。通過對sendGroupCoordinatorRequest方法的分析我們知道,handleGroupMetadataResponse)方法是處理GroupCoordinatorResponse的入口,其步驟如下:

  1. 調(diào)用coordinatorUnknown()檢測是否已經(jīng)找到GroupCoordinator且成功連接。如果是則忽略此GroupCoordinatorResponse,因為在發(fā)送GroupCoordinatorRequest時并沒有防止重發(fā)的機制,可能有多個GroupCoordinatorResponse;否則,繼續(xù)下面的步驟。
  2. 解析GroupCoordinatorResponse得到服務(wù)端GroupCoordinator的信息。
  3. 構(gòu)建Node對象賦值給coordinator字段,并嘗試與GroupCoordinator建立連接。
  4. 啟動HeartbeatTask定時任務(wù)。
  5. 最后,調(diào)用RequestFuture.complete()方法將正常收到GroupCoordinatorResponse的事件傳播出去。
  6. 如果GroupCoordinatorResponse中的錯誤碼不為NONE,則調(diào)用RequestFuture.raise方法將異常傳播出去。最終由ensureCoordinatorReady方法中的步驟4處理。

第二階段

在成功查找到對應(yīng)的GroupCoordinator之后進入Join Group階段。在此階段,消費者會向GroupCoordinator發(fā)送JoinGroupRequest請求,并處理響應(yīng)。先來了解JoinGroupRequest和JoinGroupResponse的消息體格式,如圖所示。

Kafka-消費者-KafkaConsumer分析-Rebalance,隊列,kafka,分布式
了解了JoinGroupRequest和JoinGroupResponse的格式之后,再來分析第二階段的相關(guān)處理流程,其入口函數(shù)是ensurePartitionAssignment方法。

ensurePartitionAssignment方法的流程如圖所示。

Kafka-消費者-KafkaConsumer分析-Rebalance,隊列,kafka,分布式

  1. 調(diào)用SubscriptionState.partitionsAutoAssigned方法,檢測Consumer的訂閱是否是AUTO_TOPICS或AUTO_PATTERN。因為USER_ASSIGNED不需要進行Rebalance操作,而是由用戶手動指定分區(qū)。

  2. 如果訂閱模式是AUTO_PATTERN,則檢查Metadata是否需要更新。

    在前面提到過,在ConsumerCoordinator的構(gòu)造函數(shù)中為Metadata添加了監(jiān)聽器。當(dāng)Metadata更新時就會使用SubscriptionState中的正則表達式過濾Topic,并更改SubscriptionState中的訂閱信息。同時,也會使用metadataSnapshot字段記錄當(dāng)前的Metadata的快照。這里要更新Metadata的原因,是為了防止因使用過期的Metadata進行Rebalance操作而導(dǎo)致多次連續(xù)的Rebalance操作。

  3. 調(diào)用ConsumerCoordinator.needRejoin()方法判斷是要發(fā)送JoinGroupRequest加入ConsumerGroup,其實現(xiàn)是檢測是否使用了AUTO_TOPICS或AUTO_PATTERN模式,檢測rejoinNeeded和needsPartitionAssignment兩個字段的值。

  4. 調(diào)用onJoinPrepare方法進行發(fā)送JoinGroupRequest請求之前的準(zhǔn)備,做了三件事:一是如果開啟了自動提交offset則進行同步提交offset,提交offset的內(nèi)容后面會詳細介紹,此步驟可能會阻塞線程;二是調(diào)用注冊在SubscriptionState中的ConsumerRebalanceListener上的 回調(diào)方法;三是將SubscriptionState的needsPartitionAssignment字段設(shè)置為true并收縮groupSubscription集合。

  5. 再次調(diào)用needRejoin方法檢測,之后調(diào)用ensureCoordinatorReady方法檢測已經(jīng)找到GroupCoordinator且與之建立了連接。

  6. 如果還有發(fā)往GroupCoordinator所在Node的請求,則阻塞等待這些請求全部發(fā)送完成并收到響應(yīng)(即等待unsent及InFlightRequests的對應(yīng)隊列為空),然后返回步驟5繼續(xù)執(zhí)行,主要是為了避免重復(fù)發(fā)送JoinGroupRequest請求。

  7. 調(diào)用sendJoinGroupRequest方法創(chuàng)建JoinGroupRequest請求,并調(diào)用ConsumerNetworkClient.send方法將請求放入unsent中緩存,等待發(fā)送。

  8. 在步驟7返回的RequestFuture對象上添加RequestFutureListener。

  9. 調(diào)用ConsumerNetworkClient.poll方法發(fā)送JoinGroupRequest,這里會阻塞等待,直到收到JoinGroupResponse或出現(xiàn)異常。

  10. 檢測RequestFuture.fail。如果出現(xiàn)RetriableException異常則進行重試,其他異常則報錯。如果無異常,則整個第二階段操作完成。

通過前面對JoinGroupRequest發(fā)送流程的分析,我們了解到JoinGrouResponse處理流程的入口是JoinGroupResponseHandler:handle()方法,其中還包括了SyncGroupRequest發(fā)送的操作。

JoinGrouResponse的處理流程如圖所示。

Kafka-消費者-KafkaConsumer分析-Rebalance,隊列,kafka,分布式

  1. 解析JoinGroupResponse,獲取GroupCoordinator分配的memberld、generation等信息,更新到本地。
  2. 消費者根據(jù)leaderld檢測自己是不是Leader。如果是Leader則進入onJoinLeader方法,如果不是Leader則進入onJoinFollower方法。從上面的流程圖也可以看出,onJoinFollower()方法的邏輯是onJoinLeader()方法的子集,下面主要分析onJoinLeader方法。
  3. Leader根據(jù)JoinGroupResponse的group_protocol字段指定的Parition分配策略,查找相應(yīng)的PartitionAssignor對象。
  4. Leader將JoinGroupResponse的members字段進行反序列化,得到ConsumerGroup中全部消費者訂閱的Topic。Leader會將這些Topic信息添加到其SubscriptionState.groupSubscription集合中。而Follower則只關(guān)心自己訂閱的Topic信息。
  5. 第4步可能有新的Topic添加進來,所以更新Metadata信息。
  6. 待Metadata更新完成后,會在assignmentSnapshot字段中存儲一個Metadata快照(即通過Metadata的Listener創(chuàng)建的快照)。
  7. 調(diào)用PartitionAssignor.assign()方法進行分區(qū)分配。
  8. 將分配結(jié)果序列化,保存到Map中返回,其中key是消費者的memberld,value是分配結(jié)果序列化后的ByteBuffer。

第三階段

完成分區(qū)分配之后就進入了Synchronizing Group State 階段,主要邏輯是向GroupCoordinator 發(fā)送 SyncGroupRequest 請求并處理 SyncGroupResponse 響應(yīng)。

先來了解SyncGroupRequest 和 SyncGroupResponse 的消息體格式。

Kafka-消費者-KafkaConsumer分析-Rebalance,隊列,kafka,分布式
SyncGroupRequest 中各個字段的含義如表

Kafka-消費者-KafkaConsumer分析-Rebalance,隊列,kafka,分布式
SyncGroupResponse 中各個字段的含義如表

Kafka-消費者-KafkaConsumer分析-Rebalance,隊列,kafka,分布式
通過前面對onJoinLeader方法分析,我們知道發(fā)送 SyncGroupRequest 請求的邏輯緊接在分區(qū)分配操作之后,也是在 onJoinLeader方法中完成的。下面是其流程:

  1. 得到序列化后的分區(qū)分配結(jié)果后,Leader將其封裝成 SyncGroupRequest,而Follower形成的SyncGroupRequest中這部分為空集合。
  2. 調(diào)用ConsumerNetworkClient.send方法將請求放入unsent集合中等待發(fā)送。
    對SyncGroupResponse處理的入口是SyncGroupResponseHandler.handle方法。對于正常完成的情況,解析SyncGroupResponse,從中拿到分區(qū)分配結(jié)果并將其傳遞出去;對于出現(xiàn)異常情況,將rejoinNeeded設(shè)置為true,并針對不用的錯誤碼進行不同的處理。

從SyncGroupResponse中得到的分區(qū)分配結(jié)果最終由ConsumerCoordinator.onJoinComplete()方法處理,調(diào)用此方法的是在第二階段ensureActiveGroup方法的步驟8中添加的RequestFutureListener中調(diào)用。onJoinComplete()方法的流程如圖所示。

Kafka-消費者-KafkaConsumer分析-Rebalance,隊列,kafka,分布式文章來源地址http://www.zghlxwxcb.cn/news/detail-809760.html

  1. 在第二階段Leader開始分配分區(qū)之前,Leader使用assignmentSnapshot字段記錄了Metadata快照。此時在Leader中,將此快照與最新的Metadata快照進行對比。如果快照不一致則表示分區(qū)分配過程中出現(xiàn)了Topic增刪或分區(qū)數(shù)量的變化,則將needsPartitionAssignment置為true,需重新進行分區(qū)分配。
  2. 反序列化拿到分配給當(dāng)前消費者的分區(qū),并添加到SubscriptionStata.assignment集合中,之后消費者會按照此集合指定的分區(qū)進行消費,將needsPartitionAssignment置為false。
  3. 調(diào)用PartitionAssignor的onAssignment()回調(diào)函數(shù),默認(rèn)是空實現(xiàn)。當(dāng)用戶自定義PartitionAssignor時,可以自定義此方法。
  4. 如果開啟了自動提交offset的功能,則重新啟動AutoCommitTask定時任務(wù)。
  5. 調(diào)用SubscriptionState中注冊的ConsumerRebalanceListener。
  6. 將needsJoinPrepare重置為true,為下次Rebalance操作做準(zhǔn)備。
  7. 重啟HeartbeatTask定時任務(wù),定時發(fā)送心跳。

到了這里,關(guān)于Kafka-消費者-KafkaConsumer分析-Rebalance的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Kafka-消費者-KafkaConsumer分析-Heartbeat

    Kafka-消費者-KafkaConsumer分析-Heartbeat

    在前面分析Rebalance操作的原理時介紹到,消費者定期向服務(wù)端的GroupCoordinator發(fā)送HeartbeatRequest來確定彼此在線。 下面就來詳細分析KafkaConsumer中Heartbeat的相關(guān)實現(xiàn)。 首先了解一下心跳請求和響應(yīng)的格式。HeartbeatRequest的消息體格式比較簡單,依次包含group_id(String)、group_generati

    2024年01月20日
    瀏覽(26)
  • Kafka-消費者-KafkaConsumer分析-Rebalance

    Kafka-消費者-KafkaConsumer分析-Rebalance

    在開始介紹Rebalance操作的實現(xiàn)細節(jié)之前,我們需要明確在哪幾種情況下會觸發(fā)Rebalance操作: 有新的消費者加入Consumer Group。 有消費者宕機下線。消費者并不一定需要真正下線,例如遇到長時間的GC、網(wǎng)絡(luò)延遲導(dǎo)致消費者長時間未向GroupCoordinator發(fā)送HeartbeatRequest時,GroupCoordina

    2024年01月20日
    瀏覽(19)
  • 多個消費者訂閱一個Kafka的Topic(使用KafkaConsumer和KafkaProducer)

    記錄 :466 場景 :一個KafkaProducer在一個Topic發(fā)布消息,多個消費者KafkaConsumer訂閱Kafka的Topic。每個KafkaConsumer指定一個特定的ConsumerGroup,達到一條消息被多個不同的ConsumerGroup消費。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka集群安裝 :https://blog.csdn.net/zha

    2024年02月16日
    瀏覽(47)
  • kafka在創(chuàng)建KafkaConsumer消費者時,發(fā)生Exception in thread “main“ org.apache.kafka.common.KafkaException: Faile

    kafka在創(chuàng)建KafkaConsumer消費者時,發(fā)生Exception in thread “main“ org.apache.kafka.common.KafkaException: Faile

    原因:可能是序列化和反序列化沒正確使用。將以下代碼修改正確再次運行。 將以上代碼的 StringDeserializer 反序列化,確認(rèn)無誤?。?!

    2024年02月13日
    瀏覽(23)
  • 保障效率與可用,分析Kafka的消費者組與Rebalance機制

    保障效率與可用,分析Kafka的消費者組與Rebalance機制

    上手第一關(guān),手把手教你安裝kafka與可視化工具kafka-eagle Kafka是什么,以及如何使用SpringBoot對接Kafka 架構(gòu)必備能力——kafka的選型對比及應(yīng)用場景 Kafka存取原理與實現(xiàn)分析,打破面試難關(guān) 防止消息丟失與消息重復(fù)——Kafka可靠性分析及優(yōu)化實踐 我們上一期從可靠性分析了消息

    2024年02月06日
    瀏覽(18)
  • 13、Kafka ------ kafka 消費者API用法(消費者消費消息代碼演示)

    13、Kafka ------ kafka 消費者API用法(消費者消費消息代碼演示)

    消費者API的核心類是 KafkaConsumer,它提供了如下常用方法: 下面這些方法都體現(xiàn)了Kafka是一個數(shù)據(jù)流平臺,消費者通過這些方法可以從分區(qū)的任意位置、重新開始讀取數(shù)據(jù)。 根據(jù)KafkaConsumer不難看出,使用消費者API拉取消息很簡單,基本只要幾步: 1、創(chuàng)建KafkaConsumer對象,創(chuàng)建

    2024年04月11日
    瀏覽(30)
  • 分布式 - 消息隊列Kafka:Kafka消費者和消費者組

    分布式 - 消息隊列Kafka:Kafka消費者和消費者組

    1. Kafka 消費者是什么? 消費者負責(zé)訂閱Kafka中的主題,并且從訂閱的主題上拉取消息。與其他一些消息中間件不同的是:在Kafka的消費理念中還有一層消費組的概念,每個消費者都有一個對應(yīng)的消費組。當(dāng)消息發(fā)布到主題后,只會被投遞給訂閱它的每個消費組中的一個消費者

    2024年02月13日
    瀏覽(29)
  • kafka配置多個消費者groupid kafka多個消費者消費同一個partition(java)

    kafka配置多個消費者groupid kafka多個消費者消費同一個partition(java)

    kafka是由Apache軟件基金會開發(fā)的一個開源流處理平臺。kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費者在網(wǎng)站中的所有動作流數(shù)據(jù)。 kafka中partition類似數(shù)據(jù)庫中的分表數(shù)據(jù),可以起到水平擴展數(shù)據(jù)的目的,比如有a,b,c,d,e,f 6個數(shù)據(jù),某個topic有兩個partition,一

    2024年01月22日
    瀏覽(161)
  • Kafka3.0.0版本——消費者(消費者組詳細消費流程圖解及消費者重要參數(shù))

    Kafka3.0.0版本——消費者(消費者組詳細消費流程圖解及消費者重要參數(shù))

    創(chuàng)建一個消費者網(wǎng)絡(luò)連接客戶端,主要用于與kafka集群進行交互,如下圖所示: 調(diào)用sendFetches發(fā)送消費請求,如下圖所示: (1)、Fetch.min.bytes每批次最小抓取大小,默認(rèn)1字節(jié) (2)、fetch.max.wait.ms一批數(shù)據(jù)最小值未達到的超時時間,默認(rèn)500ms (3)、Fetch.max.bytes每批次最大抓取大小,默

    2024年02月09日
    瀏覽(21)
  • 10、Kafka ------ 消費者組 和 消費者實例,分區(qū) 和 消費者實例 之間的分配策略

    10、Kafka ------ 消費者組 和 消費者實例,分區(qū) 和 消費者實例 之間的分配策略

    形象來說:你可以把主題內(nèi)的多個分區(qū)當(dāng)成多個子任務(wù)、多個子任務(wù)組成項目,每個消費者實例就相當(dāng)于一個員工,假如你們 team 包含2個員工。 同理: 同一主題下,每個分區(qū)最多只會分給同一個組內(nèi)的一個消費者實例 消費者以組的名義來訂閱主題,前面的 kafka-console-consu

    2024年01月19日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包