在同一個Consumer Group中,同一個Topic的不同分區(qū)會分配給不同的消費者進行消費,那么為消費者分配分區(qū)的操作是在Kafka服務(wù)端完成的嗎?分區(qū)是如何進行分配呢?下面來分析Rebalance操作的原理。
方案一
Kafka最開始的解決方案是通過ZooKeeper的Watcher實現(xiàn)的。
每個Consumer Group在ZooKeeper下都維護了一個“/consumers/[group_id]/ids”路徑,在此路徑下使用臨時節(jié)點記錄屬于此Consumer Group的消費者的Id,由Consumer啟動時創(chuàng)建。
還有兩個與ids節(jié)點同級的節(jié)點,它們分別是:
owners節(jié)點,記錄了分區(qū)與消費者的對應(yīng)關(guān)系;
offsets節(jié)點,記錄了此Consumer Group在某個分區(qū)上的消費位置。
每個Broker、Topic以及分區(qū)在ZooKeeper中也都對應(yīng)一個路徑,如下所示。
- /brokers/ids/broker_id:記錄了host、port以及分配在此Broker上的Topic的分區(qū)列表。
- /brokers/topics/[topic_name]:記錄了每個Partition的Leader、ISR等信息。
- /brokers/topics/[topic_name]/partitions/[partition_num]/state:記錄了當前Leader、選舉epoch等信息
路徑圖如圖所示。
每個Consumer都分別在“/consumers/[group_id]/ids”和“brokers/ids”路徑上注冊一個Watcher。
當“/consumers/[group_id]/ids”路徑的子節(jié)點發(fā)生變化時,表示ConsumerGroup中的消費者出現(xiàn)了變化;
當“/brokers/ids”路徑的子節(jié)點發(fā)生變化時,表示Broker出現(xiàn)了增減。
這樣,通過Watcher,每個消費者就可以監(jiān)控Consumer Group和Kafka集群的狀態(tài)了。
這個方案看上去不錯,但是嚴重依賴于ZooKeeper集群,有兩個比較嚴重的問題:
-
羊群效應(yīng)(Herd Effect):先解釋一下什么是“羊群效應(yīng)”,一個被Watch的ZooKeeper節(jié)點變化,導致大量的Watcher通知需要被發(fā)送給客戶端,這將導致在通知期間其他操作延遲。
一般出現(xiàn)這種情況的主要原因就是沒有找到客戶端真正的關(guān)注點,也算是濫用Watcher的一種場景。
繼續(xù)前面的分析,任何Broker或Consumer加入或退出,都會向其余所有的Consumer發(fā)送Watcher通知觸發(fā)Rebalance,就出現(xiàn)了“羊群效應(yīng)”。 -
腦裂(Split Brain):每個Consumer都是通過ZooKeeper中保存的這些元數(shù)據(jù)判斷Consumer Group狀態(tài)、Broker的狀態(tài)以及Rebalance結(jié)果的,由于ZooKeeper只保證“最終一致性”,不保證“Simultaneously Consistent Cross-Client Views”,不同Consumer在同一時刻可能連接到ZooKeeper集群中不同的服務(wù)器,看到的元數(shù)據(jù)就可能不一樣,這就會造成不正確的Rebalance嘗試。
方案二
由于上述兩個原因,Kafka的后續(xù)版本對Rebalance操作進行了改進,也對消費者進行了重新設(shè)計。
其核心設(shè)計思想是:將全部的Consumer Group分成多個子集,每個Consumer Group子集在服務(wù)端對應(yīng)一個GroupCoordinator對其進行管理,GroupCoordinator是KafkaServer中用于管理Consumer Group的組件,消費者不再依賴ZooKeeper,而只有GroupCoordinator在ZooKeeper上添加Watcher。
消費者在加入或退出Consumer Group時會修改ZooKeeper中保存的元數(shù)據(jù),這點與上文描述的方案一類似,此時會觸發(fā)GroupCoordinator設(shè)置的Watcher,通知GroupCoordinator開始Rebalance操作。
下面簡述這個過程:
-
當前消費者準備加入某Consumer Group或是GroupCoordinator發(fā)生故障轉(zhuǎn)移時,消費者并不知道GroupCoordinator的網(wǎng)絡(luò)位置,消費者會向Kafka集群中的任一Broker發(fā)送ConsumerMetadataRequest,此請求中包含了其Consumer Group的Groupld,收到請求的Broker會返回ConsumerMetadataResponse作為響應(yīng),其中包含了管理此ConsumerGroup的GroupCoordinator的相關(guān)信息。
-
消費者根據(jù)ConsumerMetadataResponse中的GroupCoordinator信息,連接到GroupCoordinator并周期性地發(fā)送HeartbeatRequest,HeartbeatRequest的具體格式在后面會詳細介紹。
發(fā)送HeartbeatRequest的主要作用是為了告訴GroupCoordinator此消費者正常在線,GroupCoordinator會認為長時間未發(fā)送HeartbeatRequest的消費者已經(jīng)下線,觸發(fā)新一輪的Rebalance操作。
-
如果HeartbeatResponse中帶有IllegalGeneration異常,說明GroupCoordinator發(fā)起了Rebalance操作,此時消費者發(fā)送JoinGroupRequest(具體格式在后面介紹)給GroupCoordinator,JoinGroupRequest的主要目的是為了通知GroupCoordinator,當前消費者要加入指定的Consumer Group。
之后,GroupCoordinator會根據(jù)收到的JoinGroupRequest和ZooKeeper中的元數(shù)據(jù)完成對此Consumer Group的分區(qū)分配。
-
GroupCoordinator會在分配完成后,將分配結(jié)果寫入ZooKeeper保存,并通過JoinGroupResponse返回給消費者。消費者就可以根據(jù)JoinGroupResponse中分配的分區(qū)開始消費數(shù)據(jù)。
-
消費者成功成為Consumer Group的成員后,會周期性發(fā)送HeartbeatRequest。如果HeartbeatResponse包含IlegalGeneration異常,則執(zhí)行步驟3。如果找不到對應(yīng)的GroupCoordinator(HeartbeatResponse包含NotCoordinatorForGroup異常),則周期性地執(zhí)行步驟1,直至成功。
這里只是簡略地描述此方案的步驟,整個方案還是有點復(fù)雜的,其中比較嚴謹?shù)孛枋隽讼M者和GroupCoordinator的狀態(tài)圖和各個階段可能發(fā)生的故障以及故障轉(zhuǎn)移處理,本文重點關(guān)注Consumer Group Rebalance方面。
上面這種方案雖然解決了“羊群效應(yīng)”、“腦裂”問題,但是還是有兩個問題:
-
分區(qū)分配的操作是在服務(wù)端的GroupCoordinator中完成的,這就要求服務(wù)端實現(xiàn)Partition的分配策略。當要使用新的Partition分配策略時,就必須修改服務(wù)端的代碼或配置,之后重啟服務(wù),這就顯得比較麻煩。
-
不同的Rebalance策略有不同的驗證需求。當需要自定義分區(qū)分配策略和驗證需求時,就會很麻煩。
方案三
為了解決上述問題,Kafka進行了重新設(shè)計,將分區(qū)分配的工作放到了消費者這一端進行處理,而Consumer Group管理的工作則依然由GroupCoordinator處理。
這就讓不同的模塊關(guān)注不同的業(yè)務(wù),實現(xiàn)了業(yè)務(wù)的切分和解耦,這種思想在設(shè)計時很重要。
重新設(shè)計后的協(xié)議在上一版本的協(xié)議上進行了修改,將JoinGroupRequest的處理過程拆分成了兩個階段,分別是Join Group階段和Synchronizing Group State階段。
當消費者查找到管理當前Consumer Group的GroupCoordinator后,就會進入Join Group階段,Consumer首先向GroupCoordinator發(fā)送JoinGroupRequest請求,其中包含消費者的相關(guān)信息;
服務(wù)端的GroupCoordinator收到JoinGroupRequest后會暫存消息,收集到全部消費者之后,根據(jù)JoinGroupRequest中的信息來確定Consumer Group中可用的消費者,從中選取一個消費者成為Group Leader,還會選取使用的分區(qū)分配策略,最后將這些信息封裝成JoinGroupResponse返回給消費者。
雖然每個消費者都會收到JoinGroupResponse,但是只有Group Leader收到的JoinGroupResponse中封裝了所有消費者的信息。
當消費者確定自己是Group Leader后,會根據(jù)消費者的信息以及選定的分區(qū)分配策略進行分區(qū)分配。
在Synchronizing Group State階段,每個消費者會發(fā)送SyncGroupRequest到GroupCoordinator,但是只有Group Leader的SyncGroupRequest請求包含了分區(qū)的分配結(jié)果,GroupCoordinator根據(jù)Group Leader的分區(qū)分配結(jié)果,形成SyncGroupResponse返回給所有Consumer。
消費者收到SyncGroupResponse后進行解析,即可獲取分配給自身的分區(qū)。
最后,我們來了解消費者的狀態(tài)轉(zhuǎn)移與各請求之間的關(guān)系,如圖所示。文章來源:http://www.zghlxwxcb.cn/news/detail-802914.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-802914.html
到了這里,關(guān)于Kafka-消費者-Consumer Group Rebalance設(shè)計的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!