国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka核心原理第一彈——更新中

這篇具有很好參考價值的文章主要介紹了Kafka核心原理第一彈——更新中。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

架構原理

一、高性能讀寫架構原理——順序寫+零拷貝

首先了解兩個專業(yè)術語,研究kafka這個東西,你必須得搞清楚這兩個概念,吞吐量,延遲。

寫數據請求發(fā)送給kafka一直到他處理成功,你認為寫請求成功,假設是1毫秒,這個就說明性能很高,這個就是延遲。

kafka,每毫秒可以處理1條數據,每秒可以處理1000條數據,這個單位時間內可以處理多少條數據,就叫做吞吐量,1000條數據,每條數據10kb,10mb,吞吐量相當于是每秒處理10mb的數據

1. Kafka是如何利用順序磁盤寫機制實現單機每秒幾十萬消息寫入的?

Kafka核心原理第一彈——更新中,Java架構,kafka,分布式,java
kafka的特點:高吞吐低延遲

直接寫入os的page cache中

文件,kafka僅僅是追加數據到文件末尾,磁盤順序寫,性能極高,幾乎跟寫內存是一樣高的。磁盤隨機寫,你要隨機在文件的某個位置修改數據,這個叫做磁盤隨機寫,性能是很低的,磁盤順序寫,僅僅追加數據到文件末尾

而且寫磁盤的方式是順序寫,不是隨機寫,性能跟內存寫幾乎一樣。就是僅僅在磁盤文件的末尾追加寫,不能在文件隨機位置寫入

假設基于上面說的os cache寫 + 磁盤順序寫,0.01毫秒,低延遲,高吞吐,每毫秒可以處理100條數據,每秒可以處理10萬條數據,不需要依托類似spark straeming那種batch微批處理的機制

正是依靠了這個超高的寫入性能,單物理機可以做到每秒幾十萬條消息寫入Kafka

這種方式讓kafka的寫性能極高,最大程度減少了每條數據處理的時間開銷,反過來就大幅度提升了每秒處理數據的吞吐量,一般kafka部署在物理機上,單機每秒寫入幾萬到幾十萬條消息是沒問題的

這種方式是不是就兼顧了低延遲和高吞吐兩個要求,盡量把每條消息的寫入性能壓榨到極致,就可以實現低延遲的寫入,同時對應的每秒的吞吐量自然就提升了

所以這是kafka非常核心的一個底層機制

而且這里很關鍵的一點,比如rabbitmq這種消息中間件,他會先把數據寫入內存里,然后到了一定時候再把數據一次性從內存寫入磁盤里,但是kafka不是這種機制,他收到數據直接寫磁盤

只不過是先寫的page cache,然后是磁盤順序寫,所以寫入的性能非常高,而且這樣不需要讓kafka自身的jvm進程占用過多內存,可以更多的把內存空間留給os的page cache來緩存磁盤文件的數據

只要能讓更多的磁盤數據緩存在os cache里,那么后續(xù)消費數據從磁盤讀的時候,就可以直接走os cache讀數據了,性能是非常高的

2. Kafka是如何利用零拷貝和頁緩存技術實現高性能讀取的?

Kafka核心原理第一彈——更新中,Java架構,kafka,分布式,java
那么在消費數據的時候,需要從磁盤文件里讀取數據后通過網絡發(fā)送出去,這個時候怎么提升性能呢?

首先就是利用了page cache技術,之前說過,kafka寫入數據到磁盤文件的時候,實際上是寫入page cache的,沒有直接發(fā)生磁盤IO,所以寫入的數據大部分都是停留在os層的page cache里的

這個本質其實跟elasticsearch的實現原理是類似的

然后在讀取的時候,如果正常情況下從磁盤讀取數據,先嘗試從page cache讀,讀不到才從磁盤IO讀,讀到數據以后先會放在os層的一個page cache里,接著會發(fā)生上下文切換到系統(tǒng)那邊,把os的讀緩存數據拷貝到應用緩存里

接著再次發(fā)生上下文二切換到os層,把應用緩存的數據拷貝到os的socket緩存中,最后數據再發(fā)送到網卡上

這個過程里,發(fā)生了好幾次上下文切換,而且還涉及到了好幾次數據拷貝,如果不考慮跟硬件之間的交互,起碼是從os cache到用戶緩存,從用戶緩存到socket緩存,有兩次拷貝是絕對沒必要的
Kafka核心原理第一彈——更新中,Java架構,kafka,分布式,java
但是如果用零拷貝技術,就是linux的sendfile,就可以直接把操作交給os,os看page cache里是否有數據,如果沒有就從磁盤上讀取,如果有的話直接把os cache里的數據拷貝給網卡了,中間不用走那么多步驟了

對比一下,是不是所謂的零考別了?

所以呢,通過零拷貝技術來讀取磁盤上的數據,還有page cahce的幫助,這個性能就非常高了

3. Kafka的底層數據存儲結構:日志文件以及offset

Kafka核心原理第一彈——更新中,Java架構,kafka,分布式,java
基本上可以認為每個partition就是一個日志文件,存在于某臺Kafka服務器上,然后這個日志里寫入了很多消息,每個消息在partition日志文件里都有一個序號,叫做offset,代表這個消息是日志文件里的第幾條消息

但是在消費消息的時候也有一個所謂的offset,這個offset是代表消費者目前在partition日志文件里消費到了第幾條消息,是兩回事兒

4. Kafka是如何通過精心設計消息格式節(jié)約磁盤空間占用開銷的?

kafka的消息格式如下:
crc32,magic,attribute,時間戳,key長度,key,value長度,value

kafka是直接通過NIO的ByteBuffer以二進制的方式來保存消息的,這種二級制緊湊保存格式可以比使用Java對象保存消息要節(jié)約40%的內存空間

然后這個消息實際上是封裝在一個log entry里的,你可以認為是一個日志條目吧,在kafka里認為每個partition實際上就是一個磁盤上的日志文件,寫到parttion里去的消息就是一個日志,所以log entry就是一個日志

這個日志條目包含了一個offset,一個消息的大小,然后是消息自身,就是上面那個數據結構,但是這里要注意的一點,就是這個message里可能會包含多條消息壓縮在一起,所以可能找一條消息,需要從這個壓縮數據里遍歷搜索

而且這里還有一個概念就是消息集合,一個消息集合里包含多個日志,最新名稱叫做RecordBatch

后來消息格式演化為了如下所示:
(1)消息總長度
(2)屬性:廢棄了,已經不用
(3)時間戳增量:跟RecordBatch的時間戳的增量差值
(4)offset增量:跟RecordBatch的offset的增量差值
(5)key長度
(6)key
(7)value長度
(8)value
(9)header個數
(10)header:自定義的消息元數據,key-value對

通過時間戳、offset、key長度等都用可變長度來盡可能減少空間占用,v2版本的數據格式比v1版本的數據格式要節(jié)約很多磁盤開銷

5. 如何實現TB量級的數據在Kafka集群中分布式的存儲?

但是這里有一個很大的問題,就是不可能說把TB量級的數據都放在一臺Kafka服務器上吧?這樣肯定會遇到容量有限的問題,所以Kafka是支持分布式存儲的,也就是說你的一個topic,代表了邏輯上的一個數據集

你大概可以認為一個業(yè)務上的數據集合吧,比如說用戶行為日志都走一個topic,數據庫里的每個表的數據分別是一個topic,訂單表的增刪改的變更記錄進入一個topic,促銷表的增刪改的變更記錄進入一個topic

每個topic都有很多個partition,你認為是數據分區(qū),或者是數據分片,大概這些意思都可以,就是說這個topic假設有10TB的數據量需要存儲在磁盤上,此時你給他分配了5個partition,那么每個partition都可以存放2TB的數據

然后每個partition不就可以放在一臺機器上,通過這個方式就可以實現數據的分布式存儲了,每臺機器上都運行一個Kafka的進程,叫做Broker,以后大家記住,broker就是一個kafka進程,在一臺服務器上就可以了
Kafka核心原理第一彈——更新中,Java架構,kafka,分布式,java

二、高可用架構原理——異步復制+ISR列表

1. 如何基于多副本冗余機制保證Kafka宕機時還具備高可用性?

但是這里就有一個問題了,如果此時Kafka某臺機器宕機了,那么一個topic就丟失了一個partition的數據,此時不就導致數據丟失了嗎?所以啊,所以對數據做多副本冗余,也就是每個parttion都有副本

比如最基本的就是每個partition做一個副本,副本放在另外一臺機器上
Kafka核心原理第一彈——更新中,Java架構,kafka,分布式,java
然后呢kafka自動從一個partition的多個副本中選舉出來一個leader partition,這個leader partition就負責對外提供這個partiton的數據讀寫,接收到寫過來的數據,就可以把數據復制到副本partition上去

這個時候如果說某臺機器宕機了,上面的leader partition沒了,此時怎么辦呢?通過zookeeper來維持跟每個kafka的會話,如果一個kafka進程宕機了,此時kafka集群就會重新選舉一個leader partition,就是用他的某個副本partition即可

通過副本partition可以繼續(xù)體統(tǒng)這個partition的數據寫入和讀取,這樣就可以實現容錯了,這個副本partition的專業(yè)術語叫做follower partition,所以每個partitino都有多個副本,其中一個是leader,是選舉出來的,其他的都是follower partition

多副本冗余的機制,就可以實現Kafka高可用架構

2. 保證寫入Kafka的數據不丟失:ISR機制到底是什么意思?

Kafka核心原理第一彈——更新中,Java架構,kafka,分布式,java
光是依靠多副本機制能保證Kafka的高可用性,但是能保證數據不丟失嗎?不行,因為如果leader宕機,但是leader的數據還沒同步到follower上去,此時即使選舉了follower作為新的leader,當時剛才的數據已經丟失了

ISR是:in-sync replica,就是跟leader partition保持同步的follower partition的數量,只有處于ISR列表中的follower才可以在leader宕機之后被選舉為新的leader,因為在這個ISR列表里代表他的數據跟leader是同步的

如果要保證寫入kafka的數據不丟失,首先需要保證ISR中至少有一個follower,其次就是在一條數據寫入了leader partition之后,要求必須復制給ISR中所有的follower partition,才能說代表這條數據已提交,絕對不會丟失,這是Kafka給出的承諾

3. 如何讓Kafka集群處理請求的時候實現負載均衡的效果?

假如說很多partition的leader都在一臺機器上,那么不就會導致大量的客戶端都請求那一臺機器?這樣是不對的,kafka集群會自動實現負載均衡的算法,盡量把leader partition均勻分布在集群各個機器上

然后客戶端在請求的時候,就會盡可能均勻的請求到kafka集群的每一臺機器上去了,假如出現了partition leader的變動,那么客戶端會感知到,然后下次就可以請求最新的那個leader partition了

4. 基于ZooKeeper實現Kafka無狀態(tài)可伸縮的架構設計思路

Kafka核心原理第一彈——更新中,Java架構,kafka,分布式,java

5. Kafka集群是如何基于Zookeeper實現節(jié)點發(fā)現與故障感知的?

Kafka核心原理第一彈——更新中,Java架構,kafka,分布式,java

6. 再看ISR機制:Leader宕機時只能選舉同步的Follower

7. Partition的幾個核心offset:高水位offset、LEO代表了什么?

Kafka核心原理第一彈——更新中,Java架構,kafka,分布式,java
實際上來說,每次leader接收到一條消息,都會更新自己的LEO,也就是log end offset,把最后一位offset + 1,這個大家都能理解吧?接著各個follower會從leader請求同步數據,這是持續(xù)進行的

offset = 0 ~ offset = 4,LEO = 5,代表了最后一條數據后面的offset,下一次將要寫入的數據的offset,LEO,你一定要明白他的名詞

然后follower同步到數據之后,就會更新自己的LEO

并不是leader主動推送數據給follower,他實際上是follower主動向leader嘗試獲取數據,不斷的發(fā)送請求到leader來fetch最新的數據

然后對于接收到的某一條數據,所有follower的LEO都更新之后,leader才會把自己的HW(High Water Mark)高水位offset + 1,這個高水位offset表示的就是最新的一條所有follower都同步完成的消息

partition中最開始的一條數據的offset是base offset

LEO和HW分別是干什么的呢?

LEO很重要的一個功能,是負責用來更新HW的,就是如果leader和follower的LEO同步了,此時HW就可以更新

所有對于消費者來說,他只能看到base offset到HW offset之間的數據因為只有這之間的數據才表明是所有follower都同步完成的,這些數據叫做“已提交”的,也就是committed,是可以被消費到的

HW offset到LEO之間的數據,是“未提交的”,這時候消費者是看不到的

HW offset表示的是當前已經提交的數據offset,LEO表示的是下一個要寫入的數據的offset

8. 深入探究Leader與Follower上的LEO是如何更新的?

Kafka核心原理第一彈——更新中,Java架構,kafka,分布式,java
首先leader接收到數據字后就會更新自己的LEO值

接著follower會不斷的向leader發(fā)送fetch請求同步數據,然后每次一條數據同步到follower之后,他的LEO就會更新,同時leader發(fā)送數據給follower的時候,在leader端會維護所有follower的LEO值

follower發(fā)送fetch請求給leader的時候會帶上自己的LEO值,然后leader每次收到一個fetch請求就會更新自己維護的每個follower的LEO值

所以這里大家要知道的是,leader上是會保存所有follower的LEO值的,這個是非常關鍵和核心的一點

9. 深入探究Leader與Follower上的高水位offset是如何更新的?

Kafka核心原理第一彈——更新中,Java架構,kafka,分布式,java
每次leader發(fā)送數據給follower的時候,都會發(fā)送自己的HW值,然后follower獲取到leader HW之后,就會跟自己的LEO比較一下,取里面小的那個值作為自己的HW值,換句話說,如果follower的LEO比leader HW大了,那么follower的HW就是leader HW

但是如果follower的LEO比leader HW小,說明自己明顯落后于leader,那么follower的HW就是自己的LEO值

然后leader上的HW就很明顯了,那就是主要是他在接收follower的fetch請求的時候,就會在更新自己維護的所有follower的LEO之后,判斷一下當前自己的LEO是否跟所有follower都保持一致,那么就會自動更新自己的HW值

這個leader partition的HW值,代表了從這個partition的哪個offset之前可以被消費數據

10. 用真實場景圖解剖析Leader與Follower的LEO與高水位如何更新?

Kafka核心原理第一彈——更新中,Java架構,kafka,分布式,java
假設leader收到第一條數據,此時leader LEO = 1,HW = 0,因為他發(fā)現其他follower的LEO也是0,所以HW必須是0

接著follower來發(fā)送fetch請求給leader同步數據,帶過去follower的LEO = 0,所以leader上維護的follower LEO = 0,更新了一下,此時發(fā)現follower的LEO還是0,所以leader的HW繼續(xù)是0

接著leader發(fā)送一條數據給follower,這里帶上了leader的HW = 0,因為發(fā)現leader的HW = 0,此時follower LEO更新為1,但是follower HW = 0,取leader HW

接著下次follower再次發(fā)送fetch請求給leader的時候,就會帶上自己的LEO = 1,leader更新自己維護的follower LEO = 1,此時發(fā)現follower跟自己的LEO同步了,那么leader的HW更新為1

接著leader發(fā)送給follower的數據里包含了HW = 1,此時follower發(fā)現leader HW = 1,自己的LEO = 1,此時follower的HW有更新為1

5個數據:全部都要往前推進更新,需要2次請求,第一次請求是僅僅是更新兩邊的LEO,第二次請求是更新另外leader管理的follower LEO,以及兩個HW

11. 高水位機制可能導致leader切換時發(fā)生數據丟失問題

基于之前說的高水位機制,可能會導致一些問題,比如數據丟失

假如說生產者的min.insync.replicas設置為1,這個就會導致說生產者發(fā)送消息給leader,leader寫入log成功后,生產者就會認為寫成功了,此時假設生產者發(fā)送了兩條數據給leader,leader寫成功了

此時leader的LEO = 1,HW = 0,因為follower還沒同步,HW肯定是0

接著follower發(fā)送fetch請求,此時leader發(fā)現follower LEO = 0,所以HW還是0,給follower帶回去的HW也是0,然后follower開始同步數據也寫入了兩條數據,自己的LEO = 1,但是HW = 0,因為leader HW為0

接著follower再次發(fā)送fetch請求過來,自己的LEO = 1,leader發(fā)現自己LEO = 1,follower LEO = 1,所以HW更新為1,同時會把HW = 1帶回給follower,但是此時follower還沒更新HW的時候,HW還是0

這個時候假如說follower機器宕機了,重啟機器之后,follower的LEO會自動被調整為0,因為會依據HW來調整LEO,而且自己的那兩條數據會被從日志文件里刪除,數據就沒了

這個時候如果leader宕機,就會選舉follower為leader,此時HW = 0,接著leader那臺機器被重啟后作為follower,這個follower會從leader同步HW是0,此時會截斷自己的日志,刪除兩條數據

這種場景就會導致數據的丟失

非常極端的一個場景,數據可能會莫名其妙的丟失

12. 高水位機制可能導致leader切換時發(fā)生數據不一致問題

Kafka核心原理第一彈——更新中,Java架構,kafka,分布式,java
假設min.insync.replicas = 1,那么只要leader寫入成功,生產者而就會認為寫入成功

如果leader寫入了兩條數據,但是follower才同步了一條數據,第二條數據還沒同步,假設這個時候leader HW = 2,follower HW = 1,因為follower LEO小于leader HW,所以follower HW取自己的LEO

這個時候如果leader掛掉,切換follower變成leader,此時HW = 1,就一條數據,然后生產者又發(fā)了一條數據給新leader,此時HW變?yōu)?,但是第二條數據是新的數據。接著老leader重啟變?yōu)閒ollower,這個時候發(fā)現兩者的HW都是2

所以他們倆就會繼續(xù)運行了

這個時候他們倆數據是不一致的,本來合理的應該是新的follower要刪掉自己原來的第二條數據,跟新leader同步的,讓他們倆的數據一致,但是因為依賴HW發(fā)現一樣,所以就不會截斷數據了

13. Kafka 0.11.x版本引入leader epoch機制解決高水位機制弊端

Kafka核心原理第一彈——更新中,Java架構,kafka,分布式,java
所謂的leader epoch大致理解為每個leader的版本號,以及自己是從哪個offset開始寫數據的,類似[epoch = 0, offset = 0],這個就是epoch是版本號的意思,接著的話,按照之前的那個故障場景

假如說follower先宕機再重啟,他會找leader繼續(xù)同步最新的數據,更新自己的LEO和HW,不會截斷數據,因為他會看看自己這里有沒有[epoch, offset]對,如果有的話,除非是自己的offset大于了leader的offset,才會截斷自己的數據

而且人家leader的最新offset = 1,自己的offset = 0,明顯自己落后于人家,有什么資格去截斷數據呢?對不對,就是這個道理。而且還會去從leader同步最新的數據過來,此時自己跟Leader數據一致。

如果此時leader宕機,切換到follower上,此時就會更新自己的[epoch = 1, offset = 2],意思是自己的leader版本號是epoch = 1,自己從offset = 2開始寫數據的

然后接著老leader恢復變?yōu)閒ollower,從新leader看一下epoch跟自己對比,人家offset = 2,自己的offset = 0,也不需要做任何數據截斷,直接同步人家數據就可以了

然后針對數據不一致的場景,如果說老leader恢復之后作為follower,從新leader看到[epoch = 1, offset = 1],此時會發(fā)現自己的offset也是1,但是人家新leader是從offset = 1開始寫的,自己的offset = 1怎么已經有數據了呢?

此時就會截斷掉自己一條數據,然后跟人家同步保持數據一致

14. Kafka為Partition維護ISR列表的底層機制是如何設計的?

很多公司比較常用的一個kafka的版本,是0.8.2.x系列,這個系列的版本是非常經典的,在過去幾年相當大比例的公司都是用這個版本的kafka。當然,現在很多公司開始用更新版本的kafka了,就是0.9.x或者是1.x系列的

我們先說說在0.9.x之前的版本里,這個kafka到底是如何維護ISR列表的,什么樣的follower才有資格放到ISR列表里呢?

在之前的版本里,有一個核心的參數:replica.lag.max.messages。這個參數就規(guī)定了follower如果落后leader的消息數量超過了這個參數指定的數量之后,就會認為follower是out-of-sync,就會從ISR列表里移除了

咱們來舉個例子好了,假設一個partition有3個副本,其中一個leader,兩個follower,然后replica.lag.max.messages = 3,剛開始的時候leader和follower都有3條數據,此時HW和LEO都是offset = 2的位置,大家都同步上來了

現在來了一條數據,leader和其中一個follower都寫入了,但是另外一個follower因為自身所在機器性能突然降低,導致沒及時去同步數據,follower所在機器的網絡負載、內存負載、磁盤負載過高,導致整體性能下降了,此時leader partition的HW還是offset = 2的位置,沒動,但是LEO變成了offset = 3的位置

依托LEO來更新ISR的話,在每個follower不斷的發(fā)送Fetch請求過來的時候,就會判斷l(xiāng)eader和follower的LEO相差了多少,如果差的數量超過了replica.lag.max.messages參數設置的一個閾值之后,就會把follower給踢出ISR列表

但是這個時候第二個follower的LEO就落后了leader才1個offset,還沒到replica.lag.max.messages = 3,所以第二個follower實際上還在ISR列表里,只不過剛才那條消息沒有算“提交的”,在HW外面,所以消費者是讀不到的

而且這個時候,生產者寫數據的時候,如果默認值是要求必須同步所有follower才算寫成功的,可能這個時候會導致生產者一直卡在那兒,認為自己還沒寫成功,這個是有可能的

一共有3個副本,1個leaderr,2個是follower,此時其中一個follower落后,被ISR踢掉了,ISR里還有2個副本,此時一個leader和另外一個follower都同步成功了,此時就可以讓那些卡住的生產者就可以返回,認為寫數據就成功了

min.sync.replicas = 2,ack = -1,生產者要求你必須要有2個副本在isr里,才可以寫,此外,必須isr里的副本全部都接受到數據,才可以算寫入成功了,一旦說你的isr副本里面少于2了,其實還是可能會導致你生產數據被卡住的

假設這個時候,第二個follower fullgc持續(xù)了幾百毫秒然后結束了,接著從leader同步了那條數據,此時大家LEO都一樣,而且leader發(fā)現所有follower都同步了這條數據,leader就會把HW推進一位,HW變成offset = 3

這個時候,消費者就可以讀到這條在HW范圍內的數據了,而且生產者認為寫成功了

但是要是此時follower fullgc一直持續(xù)了好幾秒鐘,此時其他的生產者一直在發(fā)送數據過來,leader和第一個follower的LEO又推進了2位,LEO offset = 5,但是HW還是停留在offset = 2,這個時候HW后面的數據都是消費不了的,而且HW后面的那幾條數據的生產者可能都會認為寫未成功

現在導致第二個follower的LEO跟leader的LEO差距超過3了,此時觸發(fā)閾值,follower認為是out-of-sync,就會從ISR列表里移除了

一旦第二個follower從ISR列表里移除了,世界清靜了,此時ISR列表里就leader和第一個follower兩個副本了,此時leader和第一個follower的LEO都是offset = 5,是同步的,leader就會把HW推進到offset = 5,此時消費者就可以消費全部數據了,生產者也認為他們的寫操作成功了

那如果第二個follower后來他的fullgc結束了,開始大力追趕leader的數據,慢慢LEO又控制在replica.lag.max.messages限定的范圍內了,此時follower會重新加回到ISR列表里去

上面就是ISR的工作原理和機制,一般導致follower跟不上的情況主要就是以下三種:

(1)follower所在機器的性能變差,比如說網絡負載過高,IO負載過高,CPU負載過高,機器負載過高,都可能導致機器性能變差,同步 過慢,這個時候就可能導致某個follower的LEO一直跟不上leader,就從ISR列表里移除了

我們生產環(huán)境遇到的一些問題,kafka,機器層面,某臺機器磁盤壞了,物理機的磁盤有故障,寫入性能特別差,此時就會導致follower,CPU負載太高了,線程間的切換太頻繁了,CPU忙不過來了,網卡被其他的程序給打滿了,就導致網絡傳輸的速度特別慢

(2)follower所在的broker進程卡頓,常見的就是fullgc問題

kafka自己本身對jvm的使用是很有限的,生產集群部署的時候,他主要是接收到數據直接寫本地磁盤,寫入os cache,他一般不怎么在自己的內存里維護過多的數據,主要是依托os cache(緩存)來提高讀和寫的性能的

(3)kafka是支持動態(tài)調節(jié)副本數量的,如果動態(tài)增加了partition的副本,就會增加新的follower,此時新的follower會拼命從leader上同步數據,但是這個是需要過程的,所以此時需要等待一段時間才能跟leader同步

replica.lag.max.messages主要是解決第一種情況的,還有一個replica.lag.time.max.ms是解決第二種情況的,比如設置為500ms,那么如果在500ms內,follower沒法送請求找leader來同步數據,說明他可能在fullgc,此時就會從ISR里移除

15. Kafka 0.8.2.x版本的ISR機制在生產環(huán)境有什么缺陷?

之前說的那套ISR機制是kafka 0.8.x系列的機制,其實是有缺陷的,那個參數默認的值是4000,也就是follower落后4000條數據就認為是out-of-sync,但是這里有一個問題,就是這個數字是固定死的

如果說現在生產端突然之間涌入幾萬條數據呢?是不是有可能leader瞬間剛接收到幾萬條消息,然后所有follower還沒來得及同步過去,此時所有follower都會被踢出ISR列表?然后同步了之后,再回到ISR列表

所以這種依靠固定參數判斷的機制,會導致可能在系統(tǒng)高峰時期,follower會頻繁的踢出ISR列表再回到ISR列表,這種完全無意義的事情

一般來說在kafka 0.8.2.x系列版本上生產的時候,一遍都會把這個ISR落后 判定閾值設置的大一些,避免上述的情況出現,你可以設置個幾萬,10萬,4000,如果你公司里沒那么大的高峰并發(fā)量,每秒就是幾千的并發(fā),那就沒問題了

16. Kafka 0.9x版本之后的ISR機制做了哪些優(yōu)化適應生產環(huán)境?

kafka 0.9.x之后去掉了原來的replica.lag.max.messages參數,引入了一個新的replica.lag.time.max.ms參數,默認值是10秒,這個就不按照落后的條數來判斷了,而是說如果某個follower的LEO一直落后leader超過了10秒,那么才判定這個follower是out-of-sync的

這樣假如說線上出現了流量洪峰,一下子導致幾個follower都落后了不少數據,但是只要盡快追上來,在10秒內別一直落后,就不會認為是out-of-sync,這個機制在線上實踐會發(fā)現效果要好多了

三、稀疏索引文件底層原理

1. 深入看看Kafka在磁盤上是如何采用分段機制保存日志的?

每個分區(qū)對應的目錄,就是“topic-分區(qū)號”的格式,比如說有個topic叫做“order-topic”,那么假設他有3個分區(qū),每個分區(qū)在一臺機器上,那么3臺機器上分別會有3個目錄,“order-topic-0”,“order-topic-1”,“order-topic-2”

每個分區(qū)里面就是很多的log segment file,也就是日志段文件,每個分區(qū)的數據會被拆分為多個段,放在多個文件里,每個文件還有自己的索引文件,大概格式可能如下所示:

00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex

00000000000005367851.index
00000000000005367851.log
00000000000005367851.timeindex

00000000000009936472.index
00000000000009936472.log
00000000000009936472.timeindex

這個9936472之類的數字,就是代表了這個日志段文件里包含的起始offset,也就說明這個分區(qū)里至少都寫入了接近1000萬條數據了

kafka broker有一個參數,log.segment.bytes,限定了每個日志段文件的大小,最大就是1GB,一個日志段文件滿了,就自動開一個新的日志段文件來寫入,避免單個文件過大,影響文件的讀寫性能,這個過程叫做log rolling

正在被寫入的那個日志段文件,叫做active log segment

2. 引入索引文件之后如何基于二分查找快速定位數據?

日志段文件,.log文件會對應一個.index和.timeindex兩個索引文件

kafka在寫入日志文件的時候,同時會寫索引文件,就是.index和.timeindex,一個是位移索引,一個是時間戳索引,是兩種索引

默認情況下,有個參數log.index.interval.bytes限定了在日志文件寫入多少數據,就要在索引文件寫一條索引,默認是4KB,寫4kb的數據然后在索引里寫一條索引,所以索引本身是稀疏格式的索引,不是每條數據對應一條索引的

而且索引文件里的數據是按照位移和時間戳升序排序的,所以kafka在查找索引的時候,會用二分查找,時間復雜度是O(logN),找到索引,就可以在.log文件里定位到數據了

.index

44576 物理文件(.log位置)
57976 物理文件(.log位置)
64352 物理文件(.log位置)

offset = 58892 => 57976這條數據對應的.log文件的位置

接著就可以從.log文件里的57976這條數對應的位置開始查找,去找offset = 58892這條數據在.log里的完整數據

.timeindex是時間戳索引文件,如果要查找某段時間范圍內的時間,先在這個文件里二分查找找到offset,然后再去.index里根據offset二分查找找對應的.log文件里的位置,最后就去.log文件里查找對應的數據

3. 磁盤上的日志文件是按照什么策略定期清理騰出空間的?

大家可以想,不可能說每天涌入的數據都一直留存在磁盤上,本質kafka是一個流式數據的中間件,不需要跟離線存儲系統(tǒng)一樣保存全量的大數據,所以kafka是會定期清理掉數據的,這里有幾個清理策略

kafka默認是保留最近7天的數據,每天都會把7天以前的數據給清理掉,包括.log、.index和.timeindex幾個文件,log.retention.hours參數,可以自己設置數據要保留多少天,你可以根據自己線上的場景來判斷一下

只要你的數據保留在kafka里,你隨時可以通過offset的指定,隨時可以從kafka樓出來幾天之前的數據,數據回放一遍,下游的數據,有多么的重要,如果是特別核心的數據,在kafka這個層面,可以保留7天,甚至是15天的數據

下游的消費者消費了數據之后,數據丟失了,你需要從kafka里樓出來3天前的數據,重新來回放處理一遍

在大數據的實時分析的項目里,其實就會涉及到這個東西的一個使用,如果你今天實時分析的一些數據出錯了,此時你就需要把過去幾天的數據重新樓出來回放一遍,重新來算一遍。實時數據分析的結果和hadoop離線分析的結果做一個比對

你每天都會從kafka里摟出來幾天前的數據,算一下,跟離線數據的結果做一個比對

kafka broker會在后臺啟動線程異步的進行日志清理的工作

四、Reactor網絡模型

1. Kafka是如何自定義TCP之上的通信協議以及使用長連接通信的

kafka的通信主要發(fā)生于生產端和broker之間,broker和消費端之間,broker和broker之間,這些通信都是基于TCP協議進行的,大家自己看看網絡課程,底層基于TCP連接和傳輸數據,應用層的協議,是Kafka自己自定義的

所謂自定義協議,就是定好傳輸數據的格式,請求格式、響應格式,這樣大家就可以統(tǒng)一按照規(guī)定好的格式來封裝、傳輸和解析數據了

生產端發(fā)送數據到kafka broker來,此時發(fā)送的數據是這樣子的:

sent data: 一大串數據

kafka broker直接就從sent data:截取一大段數據就可以用了,如果你沒有自定義一套完整的協議,是沒辦法進行通信的

http協議,生產端也可以發(fā)送http協議的數據給kafka broker,http請求,http響應。應用層的協議,規(guī)定了數據請求和響應的種種復雜的格式,大家全部按照這個格式和規(guī)范來走,不要亂來

request v1.1

isCache: true

一大串數據

對于生產端和broker,消費端和broker來說,還會基于TCP建立長連接(具體見網絡課程),也就是維護一批長連接,然后通過固定的連接不斷的傳輸數據,避免頻繁的創(chuàng)建連接和銷毀連接的開銷

broker端會構造一個請求隊列,然后不停的獲取請求放入隊列,后臺再搞一堆的線程來獲取請求進行處理

2. Broker是如何基于Reactor模式進行多路復用請求處理的?

Kafka核心原理第一彈——更新中,Java架構,kafka,分布式,java
每個broker上都有一個acceptor線程和很多個processor線程,可以用num.network.threads參數設置processor線程的數量,默認是3,client跟一個broker之間只會創(chuàng)建一個socket長連接,他會復用

然后broker就用一個acceptor來監(jiān)聽每個socket連接的接入,分配這個socket連接給一個processor線程,processor線程負責處理這個socket連接,監(jiān)聽socket連接的數據傳輸以及客戶端發(fā)送過來的請求,acceptor線程會不停的輪詢各個processor來分配接入的socket連接

proessor需要處理多個客戶端的socket連接,就是通過java nio的selector多路復用思想來實現的,用一個selector監(jiān)聽各個socket連接,看其是否有請求發(fā)送過來,這樣一個processor就可以處理多個客戶端的socket連接了

processor線程會負責把請求放入一個broker全局唯一的請求隊列,默認大小是500,是queued.max.requests參數控制的,所以那幾個processor會不停的把請求放入這個請求隊列中

接著就是一個KafkaRequestHandler線程池負責不停的從請求隊列中獲取請求來處理,這個線程池大小默認是8個,由num.io.threads參數來控制,處理完請求后的響應,會放入每個processor自己的響應隊列里

每個processor其實就是負責對多個socket連接不停的監(jiān)聽其傳入的請求,放入請求隊列讓KafkaRequestHandler來處理,然后會監(jiān)聽自己的響應隊列,把響應拿出來通過socket連接發(fā)送回客戶端

五、Controller選舉與故障轉移原理剖析

1. 如何對Kafka集群進行整體控制:Controller是什么東西?

不知道大家有沒有思考過一個問題,就是Kafka集群中某個broker宕機之后,是誰負責感知到他的宕機,以及負責進行Leader Partition的選舉?如果你在Kafka集群里新加入了一些機器,此時誰來負責把集群里的數據進行負載均衡的遷移?

包括你的kafka集群的各種元數據,比如說每臺機器上有哪些partition,誰是leader,誰是follower,是誰來管理的?如果你要刪除一個topic,那么背后的各種partition如何刪除,是誰來控制?

還有就是比如kafka集群擴容加入一個新的broker,是誰負責監(jiān)聽這個broker的加入?如果某個broker崩潰了,是誰負責監(jiān)聽這個broker崩潰?

這里就需要一個kafka集群的總控組件,Controller。他負責管理整個kafka集群范圍內的各種東西

2. 如何基于Zookeeper實現Controller的選舉以及故障轉移

在kafka集群啟動的時候,會自動選舉一臺broker出來承擔controller的責任,然后負責管理整個集群,這個過程就是說集群中每個broker都會嘗試在zk上創(chuàng)建一個/controller臨時節(jié)點

zk的一些基礎知識和臨時節(jié)點是什么,百度一下zookeeper入門

但是zk會保證只有一個人可以創(chuàng)建成功,這個人就是所謂controller角色

一旦controller所在broker宕機了,此時臨時節(jié)點消失,集群里其他broker會一直監(jiān)聽這個臨時節(jié)點,發(fā)現臨時節(jié)點消失了,就爭搶再次創(chuàng)建臨時節(jié)點,保證有一臺新的broker會成為controller角色

3. 創(chuàng)建Topic時Kafka Controller是如何完成Leader選舉的呢?

如果你現在創(chuàng)建一個Topic,肯定會分配幾個Partition,每個partition還會指定幾個副本,這個時候創(chuàng)建的過程中就會在zookeeper中注冊對應的topic的元數據,包括他有幾個partition,每個partition有幾個副本,每個partition副本的狀態(tài),此時狀態(tài)都是:NonExistentReplica

然后Kafka Controller本質其實是會監(jiān)聽zk上的數據變更的,所以此時就會感知到topic變動,接著會從zk中加載所有partition副本到內存里,把這些partition副本狀態(tài)變更為:NewReplica,然后選擇的第一個副本作為leader,其他都是follower,并且把他們都放到partition的ISR列表中

比如說你創(chuàng)建一topic,order_topic,3個partition,每個partition有2個副本,寫入zk里去

/topics/order_topic

partitions = 3, replica_factor = 2

[partition0_1, partition0_2]
[partition1_1, partition1_2]
[partition2_1, partition2_2]

從每個parititon的副本列表中取出來第一個作為leader,其他的就是follower,把這些東西給放到partition對應的ISR列表里去

每個partition的副本在哪臺機器上呢?會做一個均勻的分配,把partition分散在各個機器上面,通過算法來保證,盡可能把每個leader partition均勻分配在各個機器上,讀寫請求流量都是打在leader partition上的

同時還會設置整個Partition的狀態(tài):OnlinePartition

接著Controller會把這個partition和副本所有的信息(包括誰是leader,誰是follower,ISR列表),都發(fā)送給所有broker讓他們知曉,在kafka集群里,controller負責集群的整體控制,但是每個broker都有一份元數據

4. 刪除Topic時又是如何通過Kafka Controller控制數據清理?

如果你要是刪除某個Topic的話,Controller會發(fā)送請求給這個Topic所有Partition所在的broker機器,通知設置所有Partition副本的狀態(tài)為:OfflineReplica,也就是讓副本全部下線,接著Controller接續(xù)將全部副本狀態(tài)變?yōu)椋篟eplicaDeletionStarted

然后Controller還要發(fā)送請求給broker,把各個partition副本的數據給刪了,其實對應的就是刪除磁盤上的那些文件,刪除成功之后,副本狀態(tài)變?yōu)椋篟eplicaDeletionSuccessful,接著再變?yōu)镹onExistentReplica

而且還會設置分區(qū)狀態(tài)為:Offline文章來源地址http://www.zghlxwxcb.cn/news/detail-676188.html

5. Kafka Controller是如何基于ZK感知Broker的上線以及崩潰的?

到了這里,關于Kafka核心原理第一彈——更新中的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • kafka第一課-Kafka快速實戰(zhàn)以及基本原理詳解

    kafka第一課-Kafka快速實戰(zhàn)以及基本原理詳解

    Kafka是一個分布式的發(fā)布-訂閱消息系統(tǒng),可以快速地處理高吞吐量的數據流,并將數據實時地分發(fā)到多個消費者中。Kafka消息系統(tǒng)由多個broker(服務器)組成,這些broker可以在多個數據中心之間分布式部署,以提供高可用性和容錯性。 Kafka的基本架構由生產者、消費者和主題

    2024年02月16日
    瀏覽(21)
  • Kafka的核心原理

    Kafka的核心原理

    目錄 Tpoic的分區(qū)和副本機制 分區(qū) 副本 ?消息存儲機制和查詢機制 消息存儲機制 ? log文件和index文件的解析 ?index文件內容基本結構 查詢機制 Kafka中生產者數據分發(fā)策略 隨機分發(fā)策略 指定分區(qū)策略 Hash取模策略 自定義分區(qū)策略 輪詢分發(fā)策略 和 粘性分發(fā)策略 Kafka消費者的負

    2024年01月17日
    瀏覽(18)
  • Kafka核心原理之精準一次性投遞

    Kafka核心原理之精準一次性投遞

    在Kafka中,精準一次性投遞(Exactly Once)=至少投遞一次(At Least Once)+冪等性。 至少投遞一次(At Least Once):將生產端參數acks設置為-1(all),可以保證生產端發(fā)送到Broker的消息不會丟失,即:至少投遞一次(At Least Once)。 冪等性: 冪等生產者冪保證單分區(qū)單會話內精準一

    2024年04月25日
    瀏覽(27)
  • 軟考A計劃-系統(tǒng)架構師-學習筆記-第一彈

    軟考A計劃-系統(tǒng)架構師-學習筆記-第一彈

    點擊跳轉專欄=Unity3D特效百例 點擊跳轉專欄=案例項目實戰(zhàn)源碼 點擊跳轉專欄=游戲腳本-輔助自動化 點擊跳轉專欄=Android控件全解手冊 點擊跳轉專欄=Scratch編程案例 點擊跳轉=軟考全系列 專注于 Android/Unity 和各種游戲開發(fā)技巧,以及 各種資源分享 (網站、工具、素材、源碼、

    2024年02月08日
    瀏覽(16)
  • 深入Kafka核心設計與實踐原理讀書筆記第二章

    深入Kafka核心設計與實踐原理讀書筆記第二章

    配置生產者客戶端參數及創(chuàng)建相應的生產者實例。 構建待發(fā)送的消息。 發(fā)送消息 關閉實列 參數說明 bootstrap.servers :用來指定生產者客戶端鏈接Kafka集群搜需要的broker地址清單,具體格式 host1:port1,host2:port2,可以設置一個或多個地址中間,號分割,參數默認 空串。 這里要注意

    2023年04月08日
    瀏覽(54)
  • kafka架構和原理詳解

    kafka架構和原理詳解

    Apache Kafka 是一個分布式流數據平臺,用于高吞吐量、持久性、可擴展的發(fā)布和訂閱消息。它具有高度的可靠性,被廣泛用于構建實時數據流處理、日志收集和數據管道等應用。 1. 主題(Topic): 主題是消息的邏輯分類 生產者將消息發(fā)布到特定的主題中,而消費者可以訂閱一

    2024年02月10日
    瀏覽(28)
  • Kafka架構原理(超級詳細)

    Kafka架構原理(超級詳細)

    1 ) Kafka 是開源 消息系統(tǒng) 2 )最初由 LinkedIn 公司開發(fā),2011年開源,2012年10月從 Apache 畢業(yè)。 項目目標是:為處理實時數據,提供一個統(tǒng)一、高通量、低等待的平臺。 3 ) Kafka 是一個分布式消息隊列。 Kafka 對消息根據 Topic 進行歸類。發(fā)送消息 Producer,接收消息 Consumer kafka

    2024年02月08日
    瀏覽(22)
  • Kafka架構原理(三)

    Kafka架構原理(三)

    3.1 整體架構圖 一個典型的kafka集群中包含若干個Producer,若干個Broker,若干個Consumer,以及一個zookeeper集群; kafka通過zookeeper管理集群配置,選舉leader,以及在Consumer Group發(fā)生變化時進行Rebalance(負載均衡);Producer使用push模式將消息發(fā)布到Broker;Consumer使用pull模式從Broker中訂

    2024年02月05日
    瀏覽(15)
  • kafka原理&架構深入

    kafka原理&架構深入

    https://www.cnblogs.com/zhangzhonghui/articles/12444070.html kafka配置詳解 若kafka運行在內網服務器允許外網訪問,例如內網ip: 172.10.22.134,外網ip: 9.70.168.130 進行如下配置: 外網訪問時使用 9.70.168.130:9092 訪問即可 cd kafka安裝目錄 后臺啟動 停止 topic producer 查看 consumer-groups 開啟consumer消費某

    2024年02月11日
    瀏覽(29)
  • Kafka核心設計與實踐原理:設計理念、基本概念、主要功能與應用場景

    Kafka核心設計與實踐原理:設計理念、基本概念、主要功能與應用場景

    詳細介紹Kafka作為分布式流式處理平臺的設計理念、基本概念,以及其主要功能與應用場景,包括消息系統(tǒng)、容錯的持久化、流式處理平臺等功能,同時探討如何保證消息的唯一性、消費順序等問題。

    2024年02月22日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包