多副本機(jī)制
副本是分布式系統(tǒng)中對數(shù)據(jù)和服務(wù)提供的一種冗余方式。為了對外提供可用的服務(wù),往往會對數(shù)據(jù)和服務(wù)進(jìn)行副本處理。
- 數(shù)據(jù)副本:在不同的節(jié)點(diǎn)持久化同一份數(shù)據(jù),當(dāng)某個(gè)節(jié)點(diǎn)存儲的數(shù)據(jù)丟失時(shí),可以從副本中讀取數(shù)據(jù),這是分布式系統(tǒng)解決數(shù)據(jù)丟失問題的最有效的手段。
- 服務(wù)副本:多個(gè)節(jié)點(diǎn)提供相同的服務(wù),每個(gè)節(jié)點(diǎn)都有能力接收外部的請求并進(jìn)行相應(yīng)的處理。
Kafka 從 0.8 版本開始為分區(qū)引入了多副本機(jī)制,通過增加副本數(shù)量提升數(shù)據(jù)容災(zāi)能力。同時(shí),Kafka通過多副本機(jī)制實(shí)現(xiàn)了故障自動轉(zhuǎn)移,在 Kafka 集群中的某個(gè)節(jié)點(diǎn)失效的情況下仍然保證服務(wù)可用。從生產(chǎn)者發(fā)出的一條消息,首先會被寫入分區(qū)的 Leader 副本,然后需要等待 ISR 集合中的所有 Follower 副本同步完成之后才能被認(rèn)為已經(jīng)提交,接著更新分區(qū)的 HW,進(jìn)而消費(fèi)者可以消費(fèi)到這條消息。
副本:相對于分區(qū)而言的,即副本是特定分區(qū)的副本。一個(gè)分區(qū)包含一個(gè)或者多個(gè)副本,其中一個(gè)為 Leader 副本,其余為 Follower 副本,各個(gè)副本位于不同的 broker 節(jié)點(diǎn)上。只有 Leader 副本對外提供服務(wù),F(xiàn)ollower 副本只負(fù)責(zé)與 Leader 副本進(jìn)行數(shù)據(jù)同步。
AR:分區(qū)中的所有副本的統(tǒng)稱。
ISR:所有與 Leader 副本保持同步狀態(tài)的副本集合(Leader 副本也是 ISR 集合的一員)。
LEO:標(biāo)識每個(gè)分區(qū)的最后一條消息的下一個(gè)位置,分區(qū)的每個(gè)副本都有自己的 LEO。
HW:ISR 中最小的 LEO,俗稱高水位,消費(fèi)者只能拉取到 HW 之前的消息。
失效副本
處于失效狀態(tài)(同步失效或者功能失效)的副本會被剝離出 ISR 集合,失效副本對應(yīng)的分區(qū)稱為失效分區(qū),即 under-replicated 分區(qū)。
可以通過如下腳本命令查看失效分區(qū):
bin/kafka-topics.sh --bootstrap-server 10.211.55.6:9092,10.211.55.7:9092,10.211.55.8:9092 --describe --topic thing1 --under-replicated-partitions
-
功能失效:比如副本下線。
-
同步失效:當(dāng) ISR 集合中的一個(gè) Follower 副本滯后 Leader 副本的時(shí)間超過 broker 端參數(shù) -
replica.lag.time.max.ms
(默認(rèn)10000)就會判定為同步失效。 比如 Follower 副本的 I/O 開銷過大導(dǎo)致 Follower 副本同步速度太慢,在一段時(shí)間內(nèi)都無法追趕上 Leader 副本;頻繁的 Full GC 導(dǎo)致進(jìn)程卡住,在一段時(shí)間內(nèi)沒有向 Leader 副本發(fā)送同步請求。
ISR的伸縮與擴(kuò)充
當(dāng)檢測到 ISR 集合中有失效副本時(shí),就會收縮 ISR 集合。
ISR的收縮過程如下圖:
1、Kafka啟動的時(shí)候會開啟兩個(gè)與 ISR 相關(guān)的定時(shí)任務(wù) - “isr-expiration”、“isr-change-propagation”。
2、“isr-expiration” 任務(wù) 每隔 replica.lag.time.max.ms / 2
(默認(rèn)5000ms)檢測每個(gè)分區(qū)是否需要縮減其 ISR 集合,當(dāng)檢測到 ISR 集合中有失效副本時(shí),就會收縮 ISR 集合。如果某個(gè)分區(qū)的 ISR 集合發(fā)生變更,則將變更后的數(shù)據(jù)記錄到 ZooKeeper 的 /brokers/topics/<topic>/partition/<partition>/state
節(jié)點(diǎn)中。此外,還會將變更后的記錄緩存到 isrChangeSet 中。
3、“isr-change-propagation” 任務(wù) 每隔 2500ms 檢查 isrChangeSet,如果發(fā)現(xiàn) isrChangeSet 中有 ISR 集合的變更記錄,則在 ZooKeeper 的 “/isr_change_notification” 路徑下創(chuàng)建一個(gè)以 “isr_change” 開頭的持久順序節(jié)點(diǎn),并將 isrChangeSet 中的信息保存到這個(gè)節(jié)點(diǎn)中。
4、Kafka的控制器為 “/isr_change_notification” 添加一個(gè) Watcher,當(dāng)這個(gè)節(jié)點(diǎn)有子節(jié)點(diǎn)發(fā)生變化時(shí),會觸發(fā) Watcher 的處理,通知控制器更新元數(shù)據(jù)并向它管理的 Kafka Broker 節(jié)點(diǎn)發(fā)送更新元數(shù)據(jù)的請求,最后刪除 /isr_change_notification 路徑下已經(jīng)處理過的子節(jié)點(diǎn)。
Note:為了避免頻繁觸發(fā) Watcher 的處理影響 Kafka 控制器、其它broker節(jié)點(diǎn)、ZooKeeper 的性能,Kafka 添加了限定條件,當(dāng)檢測到分區(qū)的 ISR 集合發(fā)生變化時(shí),還需要檢查以下兩個(gè)條件:
(1)上一次 ISR 集合發(fā)生變化距離現(xiàn)在已經(jīng)超過5s。
(2)上一次寫入 ZooKeeper 的時(shí)間距離現(xiàn)在已經(jīng)超過60s。
滿足以上兩個(gè)條件之一才可以將 ISR 集合的變化寫入 ZooKeeper 中。
當(dāng) Follower 副本不斷與 Leader 副本進(jìn)行數(shù)據(jù)同步,并最終追趕上 Leader 副本(當(dāng)前 Follower 副本的 LEO 大于等于 Leader 副本的 HW)時(shí),F(xiàn)ollower 副本就有資格進(jìn)入 ISR 集合。ISR 集合的擴(kuò)充過程與收縮過程相似,這里不再展開分析。
副本更新LEO、HW的過程
生產(chǎn)者往 Leader 副本寫入消息,消息被追加到 Leader 副本的本地日志,并且會更新 LEO。
之后 Follower 副本向 Leader 副本拉取消息,并且?guī)в凶陨淼?LEO 信息。
Leader 副本讀取本地日志,返回給 Follower 副本對應(yīng)的消息,并且?guī)в凶陨淼?HW 信息。
Follower 副本收到 Leader 副本返回的消息,會將消息追加到本地日志中,并且更新 LEO、HW。
Follower 副本更新 HW 的算法:比較當(dāng)前 LEO 與 Leader 副本返回的 HW 的值,取較小值作為自己的 HW
Follower 副本再次請求拉取 Leader 副本中的消息,并且?guī)в懈潞蟮?HW 的值。
Leader 副本收到 Follower 的請求后,選擇最小的 LEO 作為 HW。
Follower 副本收到 Leader 副本返回的消息,會接著將消息追加到本地日志中,并且更新 LEO、HW。
Leader Epoch 的引入
在 0.11.0.0 版本之前,Kafka 使用的是基于 HW 的同步方式。這種方式可能會出現(xiàn) 數(shù)據(jù)丟失 或者 Leader 副本和 Follower 副本數(shù)據(jù)不一致 的問題。
Follower 副本在更新 LEO 為 2 和 更新 HW 為 2 之間存在一輪的 FetchRequest/FetchResponse。如果在這個(gè)過程中,F(xiàn)ollower 副本宕機(jī)了,那么重啟后會根據(jù)之前記錄的 HW(讀取 replication-offset-checkpoint 文件)進(jìn)行日志截?cái)?,那么會?dǎo)致 b 這條消息被刪除。然后 Follower 副本再向 Leader 副本拉取消息,如果此時(shí) Leader 副本宕機(jī)并且 Follower 副本被選舉為新的 Leader,接著原來的 Leader 副本恢復(fù)后就會成為 Follower 副本,由于 Follower 副本的 HW 不能比 Leader 副本的 HW 高,所以還會進(jìn)行一次日志截?cái)啵纱?b 這條消息就丟失了。或者原來的 Leader 副本無法恢復(fù),b 這條消息也是會丟失的。
如果 min.insync.replicas=1
的場景下,上述兩個(gè)副本處于掛掉狀態(tài),Replica B 先恢復(fù)并成為 Leader 副本,接著寫入消息 c 并更新 LEO、HW 為 2。此時(shí) Replica A 也恢復(fù)過來了,成為 Follower 副本并且需要根據(jù) HW 截?cái)嗳罩疽约跋?Leader 副本拉取數(shù)據(jù),由于此時(shí) Replica A 的 HW 也是 2,所以可以不做任何調(diào)整。如此一來,Replica A 和 Replica B 就會出現(xiàn)數(shù)據(jù)不一致的問題。
為了解決上述兩個(gè)問題,Kafka 從0.11.0.0開始引入了 Leader Epoch 的概念,在需要截?cái)鄶?shù)據(jù)的時(shí)候使用 Leader Epoch 作為參考依據(jù)。Leader Epoch 初始值為 0,代表 Leader 的紀(jì)元,每次 Leader 變更,該值都會加一。每個(gè)副本在本地的 leader-epoch-checkpoint
文件中記錄 <LeaderEpoch => StartOffset> 信息。
- 解決數(shù)據(jù)丟失問題:Follower 副本重啟,先發(fā)送請求(包含 Leader Epoch 值)給 Leader 副本,如果 Leader 副本收到請求后發(fā)現(xiàn)當(dāng)前的 Leader Epoch 與 Follower 傳過來的 Leader Epoch 一致,則返回當(dāng)前的 LEO;如果不一致,則 Leader 副本會查找 Follower 傳過來的 Leader Epoch + 1 對應(yīng)的 StartOffset 返回給 Follower 副本。Follower 副本根據(jù) Leader 副本返回的 StartOffset 判斷是否需要截?cái)嗳罩尽?/li>
StartOffset:當(dāng)前 Leader Epoch 下的寫入的第一條消息的偏移量
-
解決數(shù)據(jù)不一致的問題:還是在
min.insync.replicas=1
的場景下,Replica A、Replica B 都處于掛掉的狀態(tài),Replica B 先恢復(fù)通過選舉成為 Leader并更新 LE,然后寫入 c 這條消息并更新 LEO、HW。這時(shí) Replica A 恢復(fù)過來成為 Follower,向 Replica B 發(fā)送請求并攜帶自身的 LE。Replica B 接收到請求后,比較兩者的 LE 發(fā)現(xiàn)不一致,然后返回 Replica B 當(dāng)前 LE 對應(yīng)的 StartOffset。Replica A 比較 Replica B 返回的 StartOffset 與自己的 LEO ,判斷是否需要日志截?cái)唷?/li>
文章來源:http://www.zghlxwxcb.cn/news/detail-475700.html
? 文章來源地址http://www.zghlxwxcb.cn/news/detail-475700.html
到了這里,關(guān)于Kafka架構(gòu)篇 - 多副本機(jī)制的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!