在開始介紹Rebalance操作的實現(xiàn)細節(jié)之前,我們需要明確在哪幾種情況下會觸發(fā)Rebalance操作:
- 有新的消費者加入Consumer Group。
- 有消費者宕機下線。消費者并不一定需要真正下線,例如遇到長時間的GC、網(wǎng)絡(luò)延遲導(dǎo)致消費者長時間未向GroupCoordinator發(fā)送HeartbeatRequest時,GroupCoordinator會認(rèn)為消費者下線。
- 有消費者主動退出Consumer Group。
- Consumer Group訂閱的任一Topic出現(xiàn)分區(qū)數(shù)量的變化。
- 消費者調(diào)用unsubscrible()取消對某Topic的訂閱。
第一階段
Rebalance操作的第一步就是查找GroupCoordinator,這個階段消費者會向Kafka集群中的任意一個Broker發(fā)送GroupCoordinatorRequest請求,并處理返回的GroupCoordinatorResponse響應(yīng)。
GroupCoordinatorRequest消息體的格式比較簡單,只包含了Consumer Group的id。GroupCoordinatorResponse消息體包含了錯誤碼(short類型)、coordinator的節(jié)點Id(int類型)、GroupCoordinator的host(String類型)、GroupCoordinator的端口號(int類型)。
發(fā)送GroupCoordinatorRequest請求的入口是ConsumerCoordinator的ensureCoordinatorReady方法,其流程如圖所示。
-
首先檢測是否需要重新查找GroupCoordinator,主要是檢查coordinator字段是否為空以及與GroupCoordinator之間的連接是否正常。
-
查找集群負載最低的Node節(jié)點,并創(chuàng)建GroupCoordinatorRequest請求。調(diào)用client.send方法將請求放入unsent隊列中等待發(fā)送,并返回RequestFuture對象。返回的RequestFuture對象經(jīng)過了compose方法適配,原理同HeartbeatCompletionHandler。
-
調(diào)用ConsumerNetworkClient.poll(future)方法,將GroupCoordinatorRequest請求發(fā)送出去。此處使用阻塞的方式發(fā)送,直到收到GroupCoordinatorResponse響應(yīng)或異常完成,才從此方法返回。
-
檢測檢查RequestFuture對象的狀態(tài)。如果出現(xiàn)RetriableException異常,則調(diào)用ConsumerNetworkClient.awaitMetadataUpdate()方法阻塞更新Metadata中記錄的集群元數(shù)據(jù)后跳轉(zhuǎn)到步驟1繼續(xù)執(zhí)行。如果不是RetriableException異常則直接報錯。
-
如果成功找到GroupCoordinator節(jié)點,但是網(wǎng)絡(luò)連接失敗,則將其unsent中對應(yīng)的請求清空,并將coordinator字段置為null,準(zhǔn)備重新查找GroupCoordinator,退避一段時間后跳轉(zhuǎn)到步驟1繼續(xù)執(zhí)行。
下面介紹處理GroupCoordinatorResponse的相關(guān)操作。通過對sendGroupCoordinatorRequest方法的分析我們知道,handleGroupMetadataResponse)方法是處理GroupCoordinatorResponse的入口,其步驟如下:
- 調(diào)用coordinatorUnknown()檢測是否已經(jīng)找到GroupCoordinator且成功連接。如果是則忽略此GroupCoordinatorResponse,因為在發(fā)送GroupCoordinatorRequest時并沒有防止重發(fā)的機制,可能有多個GroupCoordinatorResponse;否則,繼續(xù)下面的步驟。
- 解析GroupCoordinatorResponse得到服務(wù)端GroupCoordinator的信息。
- 構(gòu)建Node對象賦值給coordinator字段,并嘗試與GroupCoordinator建立連接。
- 啟動HeartbeatTask定時任務(wù)。
- 最后,調(diào)用RequestFuture.complete()方法將正常收到GroupCoordinatorResponse的事件傳播出去。
- 如果GroupCoordinatorResponse中的錯誤碼不為NONE,則調(diào)用RequestFuture.raise方法將異常傳播出去。最終由ensureCoordinatorReady方法中的步驟4處理。
第二階段
在成功查找到對應(yīng)的GroupCoordinator之后進入Join Group階段。在此階段,消費者會向GroupCoordinator發(fā)送JoinGroupRequest請求,并處理響應(yīng)。先來了解JoinGroupRequest和JoinGroupResponse的消息體格式,如圖所示。
了解了JoinGroupRequest和JoinGroupResponse的格式之后,再來分析第二階段的相關(guān)處理流程,其入口函數(shù)是ensurePartitionAssignment方法。
ensurePartitionAssignment方法的流程如圖所示。
-
調(diào)用SubscriptionState.partitionsAutoAssigned方法,檢測Consumer的訂閱是否是AUTO_TOPICS或AUTO_PATTERN。因為USER_ASSIGNED不需要進行Rebalance操作,而是由用戶手動指定分區(qū)。
-
如果訂閱模式是AUTO_PATTERN,則檢查Metadata是否需要更新。
在前面提到過,在ConsumerCoordinator的構(gòu)造函數(shù)中為Metadata添加了監(jiān)聽器。當(dāng)Metadata更新時就會使用SubscriptionState中的正則表達式過濾Topic,并更改SubscriptionState中的訂閱信息。同時,也會使用metadataSnapshot字段記錄當(dāng)前的Metadata的快照。這里要更新Metadata的原因,是為了防止因使用過期的Metadata進行Rebalance操作而導(dǎo)致多次連續(xù)的Rebalance操作。
-
調(diào)用ConsumerCoordinator.needRejoin()方法判斷是要發(fā)送JoinGroupRequest加入ConsumerGroup,其實現(xiàn)是檢測是否使用了AUTO_TOPICS或AUTO_PATTERN模式,檢測rejoinNeeded和needsPartitionAssignment兩個字段的值。
-
調(diào)用onJoinPrepare方法進行發(fā)送JoinGroupRequest請求之前的準(zhǔn)備,做了三件事:一是如果開啟了自動提交offset則進行同步提交offset,提交offset的內(nèi)容后面會詳細介紹,此步驟可能會阻塞線程;二是調(diào)用注冊在SubscriptionState中的ConsumerRebalanceListener上的 回調(diào)方法;三是將SubscriptionState的needsPartitionAssignment字段設(shè)置為true并收縮groupSubscription集合。
-
再次調(diào)用needRejoin方法檢測,之后調(diào)用ensureCoordinatorReady方法檢測已經(jīng)找到GroupCoordinator且與之建立了連接。
-
如果還有發(fā)往GroupCoordinator所在Node的請求,則阻塞等待這些請求全部發(fā)送完成并收到響應(yīng)(即等待unsent及InFlightRequests的對應(yīng)隊列為空),然后返回步驟5繼續(xù)執(zhí)行,主要是為了避免重復(fù)發(fā)送JoinGroupRequest請求。
-
調(diào)用sendJoinGroupRequest方法創(chuàng)建JoinGroupRequest請求,并調(diào)用ConsumerNetworkClient.send方法將請求放入unsent中緩存,等待發(fā)送。
-
在步驟7返回的RequestFuture對象上添加RequestFutureListener。
-
調(diào)用ConsumerNetworkClient.poll方法發(fā)送JoinGroupRequest,這里會阻塞等待,直到收到JoinGroupResponse或出現(xiàn)異常。
-
檢測RequestFuture.fail。如果出現(xiàn)RetriableException異常則進行重試,其他異常則報錯。如果無異常,則整個第二階段操作完成。
通過前面對JoinGroupRequest發(fā)送流程的分析,我們了解到JoinGrouResponse處理流程的入口是JoinGroupResponseHandler:handle()方法,其中還包括了SyncGroupRequest發(fā)送的操作。
JoinGrouResponse的處理流程如圖所示。
- 解析JoinGroupResponse,獲取GroupCoordinator分配的memberld、generation等信息,更新到本地。
- 消費者根據(jù)leaderld檢測自己是不是Leader。如果是Leader則進入onJoinLeader方法,如果不是Leader則進入onJoinFollower方法。從上面的流程圖也可以看出,onJoinFollower()方法的邏輯是onJoinLeader()方法的子集,下面主要分析onJoinLeader方法。
- Leader根據(jù)JoinGroupResponse的group_protocol字段指定的Parition分配策略,查找相應(yīng)的PartitionAssignor對象。
- Leader將JoinGroupResponse的members字段進行反序列化,得到ConsumerGroup中全部消費者訂閱的Topic。Leader會將這些Topic信息添加到其SubscriptionState.groupSubscription集合中。而Follower則只關(guān)心自己訂閱的Topic信息。
- 第4步可能有新的Topic添加進來,所以更新Metadata信息。
- 待Metadata更新完成后,會在assignmentSnapshot字段中存儲一個Metadata快照(即通過Metadata的Listener創(chuàng)建的快照)。
- 調(diào)用PartitionAssignor.assign()方法進行分區(qū)分配。
- 將分配結(jié)果序列化,保存到Map中返回,其中key是消費者的memberld,value是分配結(jié)果序列化后的ByteBuffer。
第三階段
完成分區(qū)分配之后就進入了Synchronizing Group State 階段,主要邏輯是向GroupCoordinator 發(fā)送 SyncGroupRequest 請求并處理 SyncGroupResponse 響應(yīng)。
先來了解SyncGroupRequest 和 SyncGroupResponse 的消息體格式。
SyncGroupRequest 中各個字段的含義如表
SyncGroupResponse 中各個字段的含義如表
通過前面對onJoinLeader方法分析,我們知道發(fā)送 SyncGroupRequest 請求的邏輯緊接在分區(qū)分配操作之后,也是在 onJoinLeader方法中完成的。下面是其流程:
- 得到序列化后的分區(qū)分配結(jié)果后,Leader將其封裝成 SyncGroupRequest,而Follower形成的SyncGroupRequest中這部分為空集合。
- 調(diào)用ConsumerNetworkClient.send方法將請求放入unsent集合中等待發(fā)送。
對SyncGroupResponse處理的入口是SyncGroupResponseHandler.handle方法。對于正常完成的情況,解析SyncGroupResponse,從中拿到分區(qū)分配結(jié)果并將其傳遞出去;對于出現(xiàn)異常情況,將rejoinNeeded設(shè)置為true,并針對不用的錯誤碼進行不同的處理。
從SyncGroupResponse中得到的分區(qū)分配結(jié)果最終由ConsumerCoordinator.onJoinComplete()方法處理,調(diào)用此方法的是在第二階段ensureActiveGroup方法的步驟8中添加的RequestFutureListener中調(diào)用。onJoinComplete()方法的流程如圖所示。文章來源:http://www.zghlxwxcb.cn/news/detail-809760.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-809760.html
- 在第二階段Leader開始分配分區(qū)之前,Leader使用assignmentSnapshot字段記錄了Metadata快照。此時在Leader中,將此快照與最新的Metadata快照進行對比。如果快照不一致則表示分區(qū)分配過程中出現(xiàn)了Topic增刪或分區(qū)數(shù)量的變化,則將needsPartitionAssignment置為true,需重新進行分區(qū)分配。
- 反序列化拿到分配給當(dāng)前消費者的分區(qū),并添加到SubscriptionStata.assignment集合中,之后消費者會按照此集合指定的分區(qū)進行消費,將needsPartitionAssignment置為false。
- 調(diào)用PartitionAssignor的onAssignment()回調(diào)函數(shù),默認(rèn)是空實現(xiàn)。當(dāng)用戶自定義PartitionAssignor時,可以自定義此方法。
- 如果開啟了自動提交offset的功能,則重新啟動AutoCommitTask定時任務(wù)。
- 調(diào)用SubscriptionState中注冊的ConsumerRebalanceListener。
- 將needsJoinPrepare重置為true,為下次Rebalance操作做準(zhǔn)備。
- 重啟HeartbeatTask定時任務(wù),定時發(fā)送心跳。
到了這里,關(guān)于Kafka-消費者-KafkaConsumer分析-Rebalance的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!