網(wǎng)上有很多文章講述 Kafka rebalance 的原理,本文是列舉常見的幾種 rebalance 場景。文章來源:http://www.zghlxwxcb.cn/news/detail-430348.html
rebalance 期間,當前 consumer group 的所有 consumer 都要暫停消費,開銷較大。因此應該盡量減少 rebalance ,而 relalance 的原因通常是 consumer 數(shù)量變化,常見的幾種情況如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-430348.html
- 如果一個 consumer 剛啟動,則會向 broker 發(fā)送 JoinGroup 請求,加入 group ,被分配一個 member id ,觸發(fā)一次 rebalance 。
- 如果一個 consumer 終止,不再運行。則等到 Heartbeat 超時,broker 會認為該 consumer 下線,觸發(fā)一次 rebalance 。
- 上述 consumer 啟動、終止的情況一般不頻繁,可以容忍它觸發(fā) rebalance 。但有的情況下,consumer 會頻繁啟動、終止,比如被 k8s HPA 改變 consumer 數(shù)量。
- 解決方案:額外開發(fā)一個應用,稱為 dispatcher ,讓它作為唯一的 consumer 連接到 broker ,獲取消息。而原本的應用連接到 dispatcher ,間接獲取消息。
- 使用 dispatcher 還能解決另一個問題:group 中的 consumer 數(shù)量,大于當前 topic 的 partition 數(shù)量,導致部分 consumer 空閑、不能消費。
- 如果一個 consumer 消費太慢,連續(xù)調(diào)用 poll() 的時間間隔超過 max.poll.interval.ms ,也會導致 Heartbeat 超時,觸發(fā) rebalance 。
- 解決方案:增加 max.poll.interval.ms 閾值,或者優(yōu)化 consumer 客戶端代碼,例如減少每次拉取的數(shù)據(jù)量從而減少消費耗時、更快地開始下一次消費,例如從同步消費改為異步消費。
- 如果一個 consumer 終止,然后又重啟。則不記得自己之前的 member id ,依然會發(fā)送 JoinGroup 請求,加入 group ,被分配一個新的隨機 member id ,觸發(fā)一次 rebalance 。
- 而舊的 member id 不再使用,等到 Heartbeat 超時,又會觸發(fā)一次 rebalance 。因此 consumer 重啟時會觸發(fā)兩次 rebalance 。
- 解決方案:Kafka v2.3 開始,consumer 增加了配置參數(shù) group.instance.id ,用于避免 consumer 重啟時觸發(fā) rebalance 。
- 給該參數(shù)賦值為非空字符串時,consumer 會從默認的 Dynamic Member 類型變成 Static Member 類型,并采用該參數(shù)的值作為 client.id。
當 consumer 重啟之后發(fā)送 JoinGroup 請求時,Coordinator 會識別出它是 Static Member ,屬于 rejoin ,因此分配一個新 member id ,并刪除舊的 member id 。
這樣不會觸發(fā) rebalance ,除非 consumer 重啟太慢,導致 Heartbeat 超時。 - 日志示例:
INFO [GroupCoordinator 1]: Static member Some(static_member_1) of group test_group_1 with unknown member id rejoins, assigning new member id static_member_1-cdf1c4ea-2f1c-4f4d-bc46-bf443e5f7322, while old member id static_member_1-8b5d89b3-0757-4441-aeaa-50e7f9f55cee will be removed. INFO [GroupCoordinator 1]: Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.
- 如果一個 group 中只運行了一個 consumer ,則用戶可以配置一個固定的 group.instance.id 值。
- 如果一個 group 中運行了多個 consumer ,則用戶需要在客戶端增加一些代碼,給每個 consumer 配置一個互不相同的、長期不變的 group.instance.id 值。
例如以 k8s StatefulSet 方式部署多個 consumer ,它們的 Pod 名稱會從 0 開始編號??勺屆總€ consumer 通過環(huán)境變量讀取自己的 POD_NAME ,用作 group.instance.id 。
- 給該參數(shù)賦值為非空字符串時,consumer 會從默認的 Dynamic Member 類型變成 Static Member 類型,并采用該參數(shù)的值作為 client.id。
到了這里,關于Kafka rebalance 的幾種原因與解決方案的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!