Kafka與時間輪
Kafka中存在大量的延時操作。
1、發(fā)送消息-超時+重試機制
2、ACKS 用于指定分區(qū)中必須要有多少副本收到這條消息,生產(chǎn)者才認為寫入成功(延時 等)
Kafka并沒有使用JDK自帶的Timer或者DelayQueue來實現(xiàn)延遲的功能,而是基于時間輪自定義了一個用于實現(xiàn)延遲功能的定時器(SystemTimer)
JDK的Timer和DelayQueue插入和刪除操作的平均時間復(fù)雜度為O(log(n)),并不能滿足Kafka的高性能要求,而基于時間輪可以將插入和刪除操作的時間復(fù)雜度都降為O(1)。
時間輪的應(yīng)用并非Kafka獨有,其應(yīng)用場景還有很多,在Netty、Akka、Quartz、Zookeeper等組件中都存在時間輪的蹤影。
時間輪
Java中任務(wù)調(diào)度
要回答這個問題,我們先從Java中最原始的任務(wù)調(diào)度的方法說起。
給你一批任務(wù)(假設(shè)有1000個任務(wù)),都是不同的時間執(zhí)行的,時間精確到秒,你怎么實現(xiàn)對所有的任務(wù)的調(diào)度?
第一種思路是啟動一個線程,每秒鐘對所有的任務(wù)進行遍歷,找出執(zhí)行時間跟當(dāng)前時間匹配的,執(zhí)行它。如果任務(wù)數(shù)量太大,遍歷和比較所有任務(wù)會比較浪費時間。
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-Cu92Jn2L-1690353345189)(file:///C:/Users/root/AppData/Local/Temp/ksohtml10964/wps14.jpg)]
第二個思路,把這些任務(wù)進行排序,執(zhí)行時間近(先觸發(fā))的放在前面。
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-GfOAIem8-1690353345192)(file:///C:/Users/root/AppData/Local/Temp/ksohtml10964/wps15.jpg)]
如果是數(shù)組的時間的話,這里會涉及到大量的元素移動(新加入任務(wù),任務(wù)執(zhí)行–刪除任務(wù)之類,都需要重新排序)
那么在Java代碼怎么實現(xiàn)呢?
JDK包里面自帶了一個Timer工具類(java.util包下),可以實現(xiàn)延時任務(wù)(例如30分鐘以后觸發(fā)),也可以實現(xiàn)周期性任務(wù)(例如每1小時觸發(fā)一次)。
它的本質(zhì)是一個優(yōu)先隊列(TaskQueue),和一個執(zhí)行任務(wù)的線程(TimerThread)。
(普通的隊列是一種先進先出的數(shù)據(jù)結(jié)構(gòu),元素在隊列尾追加,而從隊列頭刪除。在優(yōu)先隊列中,元素被賦予優(yōu)先級。當(dāng)訪問元素時,具有最高優(yōu)先級的元素最先刪除。優(yōu)先隊列具有最高級先出 (first in, largest out)的行為特征。通常采用堆數(shù)據(jù)結(jié)構(gòu)來實現(xiàn)。)
在這個優(yōu)先隊列中,最先需要執(zhí)行的任務(wù)排在優(yōu)先隊列的第一個。然后 TimerThread 不斷地拿第一個任務(wù)的執(zhí)行時間和當(dāng)前時間做對比。如果時間到了先看看這個任務(wù)是不是周期性執(zhí)行的任務(wù),如果是則修改當(dāng)前任務(wù)時間為下次執(zhí)行的時間,如果不是周期性任務(wù)則將任務(wù)從優(yōu)先隊列中移除。最后執(zhí)行任務(wù)。
但是Timer是單線程的,在很多場景下不能滿足業(yè)務(wù)需求。
在JDK1.5之后,引入了一個支持多線程的任務(wù)調(diào)度工具ScheduledThreadPoolExecutor用來替代TImer,它是幾種常用的線程池之一??纯礃?gòu)造函數(shù),里面是一個延遲隊列DelayedWorkQueue,也是一個優(yōu)先隊列。
DelayedWorkQueue的最小堆實現(xiàn)
優(yōu)先隊列的使用的是最小堆實現(xiàn)。
最小堆的含義: 一種完全二叉樹, 父結(jié)點的值小于或等于它的左子節(jié)點和右子節(jié)點
比如插入以下的數(shù)據(jù) [1,2,3,7,17,19,25,36,100]
最小堆就長成這個樣子。
優(yōu)先隊列的插入和刪除的時間復(fù)雜度是O(logn),當(dāng)數(shù)據(jù)量大的時候,頻繁的入堆出堆性能不是很好。
比如要插入0,過程如下:
1、插入末尾元素
2、0比19小,所以要向上移動且互換。
3、0比2小,所以要向上移動且互換。
4、0比2小,所以要向上移動且互換。
算法復(fù)雜度
N個數(shù)據(jù)的最小堆, 共有l(wèi)ogN層, 最壞的情況下, 需要移動logN次
時間輪
這里我們先考慮對所有的任務(wù)進行分組,把相同執(zhí)行時刻的任務(wù)放在一起。比如這里,數(shù)組里面的一個下標就代表1秒鐘。它就會變成一個數(shù)組加鏈表的數(shù)據(jù)結(jié)構(gòu)。分組以后遍歷和比較的時間會減少一些。
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-dpk4Jicj-1690353345195)(file:///C:/Users/root/AppData/Local/Temp/ksohtml10964/wps16.jpg)]
但是還是有問題,如果任務(wù)數(shù)量非常大,而且時間都不一樣,或者有執(zhí)行時間非常遙遠的任務(wù),那這個數(shù)組長度是不是要非常地長?比如有個任務(wù)2個月之后執(zhí)行,從現(xiàn)在開始計算,它的下標是5253120。
所以長度肯定不能是無限的,只能是固定長度的。比如固定長度是8,一個格子代表1秒(現(xiàn)在叫做一個bucket槽),一圈可以表示8秒。遍歷的線程只要一個格子一個格子的獲取任務(wù),并且執(zhí)行就OK了。
固定長度的數(shù)組怎么用來表示超出最大長度的時間呢?可以用循環(huán)數(shù)組。
比如一個循環(huán)數(shù)組長度8,可以表示8秒。8秒以后執(zhí)行的任務(wù)怎么放進去?只要除以8,用得到的余數(shù),放到對應(yīng)的格子就OK了。比如10%8=2,它放在第2個格子。這里就有了輪次的概念,第10秒的任務(wù)是第二輪的時候才執(zhí)行。
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-qCMzQcjf-1690353345196)(file:///C:/Users/root/AppData/Local/Temp/ksohtml10964/wps17.jpg)]
這時候,時間輪的概念已經(jīng)出來了。
如果任務(wù)數(shù)量太多,相同時刻執(zhí)行的任務(wù)很多,會導(dǎo)致鏈表變得非常長。這里我們可以進一步對這個時間輪做一個改造,做一個多層的時間輪。
比如:最內(nèi)層8個格子,每個格子1秒;外層8個格子,每個格子8*8=64秒;最內(nèi)層走一圈,外層走一格。這時候時間輪就跟時鐘更像了。隨著時間流動,任務(wù)會降級,外層的任務(wù)會慢慢地向內(nèi)層移動。
時間輪任務(wù)插入和刪除時間復(fù)雜度都為O(1),應(yīng)用范圍非常廣泛,更適合任務(wù)數(shù)很大的延時場景。Dubbo、Netty、Kafka中都有實現(xiàn)。
Kafka中時間輪實現(xiàn)
Kafka里面TimingWheel的數(shù)據(jù)結(jié)構(gòu)
kafka會啟動一個線程,去推動時間輪的指針轉(zhuǎn)動。其實現(xiàn)原理其實就是通過queue.poll()取出放在最前面的槽的TimerTaskList
添加新的延遲任務(wù)
往時間輪添加新的任務(wù)
時間輪指針的推進
第二層時間輪的創(chuàng)建代碼如下
Kafka性能問題
1、kafka如何確保消息的可靠性傳輸
這個問題需要從以下3個方面分析和解決
(1)消費端弄丟了數(shù)據(jù)
唯一可能導(dǎo)致消費者弄丟數(shù)據(jù)的情況,就是說,你那個消費到了這個消息,然后消費者那邊自動提交了offset,讓kafka以為你已經(jīng)消費好了這個消息,其實你剛準備處理這個消息,你還沒處理,你自己就掛了,此時這條消息就丟咯。
大家都知道kafka會自動提交offset,那么只要關(guān)閉自動提交offset,在處理完之后自己手動提交offset,就可以保證數(shù)據(jù)不會丟。但是此時確實還是會重復(fù)消費,比如你剛處理完,還沒提交offset,結(jié)果自己掛了,此時肯定會重復(fù)消費一次,自己保證冪等性就好了。
生產(chǎn)環(huán)境碰到的一個問題,就是說我們的kafka消費者消費到了數(shù)據(jù)之后是寫到一個內(nèi)存的queue里先緩沖一下,結(jié)果有的時候,你剛把消息寫入內(nèi)存queue,然后消費者會自動提交offset。
然后此時我們重啟了系統(tǒng),就會導(dǎo)致內(nèi)存queue里還沒來得及處理的數(shù)據(jù)就丟失了
(2)kafka弄丟了數(shù)據(jù)
這塊比較常見的一個場景,就是kafka某個broker宕機,然后重新選舉partiton的leader時。大家想想,要是此時其他的follower剛好還有些數(shù)據(jù)沒有同步,結(jié)果此時leader掛了,然后選舉某個follower成leader之后,他不就少了一些數(shù)據(jù)?這就丟了一些數(shù)據(jù)啊。
所以此時一般是要求起碼設(shè)置如下4個參數(shù):
給這個topic設(shè)置replication.factor參數(shù):這個值必須大于1,要求每個partition必須有至少2個副本。
在kafka服務(wù)端設(shè)置min.insync.replicas參數(shù):這個值必須大于1,這個是要求一個leader至少感知到有至少一個follower還跟自己保持聯(lián)系,沒掉隊,這樣才能確保leader掛了還有一個follower吧。
在producer端設(shè)置acks=all:這個是要求每條數(shù)據(jù),必須是寫入所有replica之后,才能認為是寫成功了。
在producer端設(shè)置retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這里了。
(3)生產(chǎn)者會不會弄丟數(shù)據(jù)
如果按照上述的思路設(shè)置了ack=all,一定不會丟,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才認為本次寫成功了。如果沒滿足這個條件,生產(chǎn)者會自動不斷的重試,重試無限次。
2、如何實現(xiàn)Kafka的高性能?
1、宏觀架構(gòu)層面利用Partition實現(xiàn)并行處理
Kafka中每個Topic都包含一個或多個Partition,不同Partition可位于不同節(jié)點。同時Partition在物理上對應(yīng)一個本地文件夾,每個Partition包含一個或多個Segment,每個Segment包含一個數(shù)據(jù)文件和一個與之對應(yīng)的索引文件。在邏輯上,可以把一個Partition當(dāng)作一個非常長的數(shù)組,可通過這個“數(shù)組”的索引(offset)去訪問其數(shù)據(jù)。
一方面,由于不同Partition可位于不同機器,因此可以充分利用集群優(yōu)勢,實現(xiàn)機器間的并行處理。另一方面,由于Partition在物理上對應(yīng)一個文件夾,即使多個Partition位于同一個節(jié)點,也可通過配置讓同一節(jié)點上的不同Partition置于不同的disk drive上,從而實現(xiàn)磁盤間的并行處理,充分發(fā)揮多磁盤的優(yōu)勢。
利用多磁盤的具體方法是,將不同磁盤mount到不同目錄,然后在server.properties中,將log.dirs設(shè)置為多目錄(用逗號分隔)。Kafka會自動將所有Partition盡可能均勻分配到不同目錄也即不同目錄(也即不同disk)上。
Partition是最小并發(fā)粒度,Partition個數(shù)決定了可能的最大并行度。
2、充分利用PageCache
Page Cache,又稱pcache,其中文名稱為頁高速緩沖存儲器,簡稱頁高緩。page cache的大小為一頁,通常為4K。在linux讀寫文件時,它用于緩存文件的邏輯內(nèi)容,從而加快對磁盤上映像和數(shù)據(jù)的訪問。 是Linux操作系統(tǒng)的一個特色。
1、讀Cache
當(dāng)內(nèi)核發(fā)起一個讀請求時(例如進程發(fā)起read()請求),首先會檢查請求的數(shù)據(jù)是否緩存到了Page Cache中。
如果有,那么直接從內(nèi)存中讀取,不需要訪問磁盤,這被稱為cache命中(cache hit);
如果cache中沒有請求的數(shù)據(jù),即cache未命中(cache miss),就必須從磁盤中讀取數(shù)據(jù)。然后內(nèi)核將讀取的數(shù)據(jù)緩存到cache中,這樣后續(xù)的讀請求就可以命中cache了。
page可以只緩存一個文件部分的內(nèi)容,不需要把整個文件都緩存進來。
2、寫Cache
當(dāng)內(nèi)核發(fā)起一個寫請求時(例如進程發(fā)起write()請求),同樣是直接往cache中寫入,后備存儲中的內(nèi)容不會直接更新(當(dāng)服務(wù)器出現(xiàn)斷電關(guān)機時,存在數(shù)據(jù)丟失風(fēng)險)。
內(nèi)核會將被寫入的page標記為dirty,并將其加入dirty list中。內(nèi)核會周期性地將dirty list中的page寫回到磁盤上,從而使磁盤上的數(shù)據(jù)和內(nèi)存中緩存的數(shù)據(jù)一致。
當(dāng)滿足以下兩個條件之一將觸發(fā)臟數(shù)據(jù)刷新到磁盤操作:
數(shù)據(jù)存在的時間超過了dirty_expire_centisecs(默認300厘秒,即30秒)時間;
臟數(shù)據(jù)所占內(nèi)存 > dirty_background_ratio,也就是說當(dāng)臟數(shù)據(jù)所占用的內(nèi)存占總內(nèi)存的比例超過dirty_background_ratio(默認10,即系統(tǒng)內(nèi)存的10%)的時候會觸發(fā)pdflush刷新臟數(shù)據(jù)。
如何查看Page Cache參數(shù)
執(zhí)行命令 sysctl -a|grep dirty
如何調(diào)整內(nèi)核參數(shù)來優(yōu)化IO性能?
(1)vm.dirty_background_ratio參數(shù)優(yōu)化
這個參數(shù)指定了當(dāng)文件系統(tǒng)緩存臟頁數(shù)量達到系統(tǒng)內(nèi)存百分之多少時(如5%)就會觸發(fā)后臺回寫進程運行,將一定緩存的臟頁異步地刷入磁盤;
當(dāng)cached中緩存當(dāng)數(shù)據(jù)占總內(nèi)存的比例達到這個參數(shù)設(shè)定的值時將觸發(fā)刷磁盤操作。
把這個參數(shù)適當(dāng)調(diào)小,這樣可以把原來一個大的IO刷盤操作變?yōu)槎鄠€小的IO刷盤操作,從而把IO寫峰值削平。
對于內(nèi)存很大和磁盤性能比較差的服務(wù)器,應(yīng)該把這個值設(shè)置的小一點。
(2)vm.dirty_ratio參數(shù)優(yōu)化
這個參數(shù)則指定了當(dāng)文件系統(tǒng)緩存臟頁數(shù)量達到系統(tǒng)內(nèi)存百分之多少時(如10%),系統(tǒng)不得不開始處理緩存臟頁(因為此時臟頁數(shù)量已經(jīng)比較多,為了避免數(shù)據(jù)丟失需要將一定臟頁刷入外存);在此過程中很多應(yīng)用進程可能會因為系統(tǒng)轉(zhuǎn)而處理文件IO而阻塞。
對于寫壓力特別大的,建議把這個參數(shù)適當(dāng)調(diào)大;對于寫壓力小的可以適當(dāng)調(diào)?。蝗绻鹀ached的數(shù)據(jù)所占比例(這里是占總內(nèi)存的比例)超過這個設(shè)置,
系統(tǒng)會停止所有的應(yīng)用層的IO寫操作,等待刷完數(shù)據(jù)后恢復(fù)IO。所以萬一觸發(fā)了系統(tǒng)的這個操作,對于用戶來說影響非常大的。
(3)vm.dirty_expire_centisecs參數(shù)優(yōu)化
這個參數(shù)會和參數(shù)vm.dirty_background_ratio一起來作用,一個表示大小比例,一個表示時間;即滿足其中任何一個的條件都達到刷盤的條件。
為什么要這么設(shè)計呢?我們來試想一下以下場景:
如果只有參數(shù) vm.dirty_background_ratio ,也就是說cache中的數(shù)據(jù)需要超過這個閥值才會滿足刷磁盤的條件;
如果數(shù)據(jù)一直沒有達到這個閥值,那相當(dāng)于cache中的數(shù)據(jù)就永遠無法持久化到磁盤,這種情況下,一旦服務(wù)器重啟,那么cache中的數(shù)據(jù)必然丟失。
結(jié)合以上情況,所以添加了一個數(shù)據(jù)過期時間參數(shù)。當(dāng)數(shù)據(jù)量沒有達到閥值,但是達到了我們設(shè)定的過期時間,同樣可以實現(xiàn)數(shù)據(jù)刷盤。
這樣可以有效的解決上述存在的問題,其實這種設(shè)計在絕大部分框架中都有。
(4)vm.dirty_writeback_centisecs參數(shù)優(yōu)化
理論上調(diào)小這個參數(shù),可以提高刷磁盤的頻率,從而盡快把臟數(shù)據(jù)刷新到磁盤上。但一定要保證間隔時間內(nèi)一定可以讓數(shù)據(jù)刷盤完成。
(5)vm.swappiness參數(shù)優(yōu)化
禁用swap空間,設(shè)置vm.swappiness=0
3、減少網(wǎng)絡(luò)開銷批處理
批處理是一種常用的用于提高I/O性能的方式。對Kafka而言,批處理既減少了網(wǎng)絡(luò)傳輸?shù)腛verhead,又提高了寫磁盤的效率。
Kafka 的send方法并非立即將消息發(fā)送出去,而是通過batch.size和linger.ms控制實際發(fā)送頻率,從而實現(xiàn)批量發(fā)送。
由于每次網(wǎng)絡(luò)傳輸,除了傳輸消息本身以外,還要傳輸非常多的網(wǎng)絡(luò)協(xié)議本身的一些內(nèi)容(稱為Overhead),所以將多條消息合并到一起傳輸,可有效減少網(wǎng)絡(luò)傳輸?shù)腛verhead,進而提高了傳輸效率。
4、數(shù)據(jù)壓縮降低網(wǎng)絡(luò)負載
Kafka支持將數(shù)據(jù)壓縮后再傳輸給Broker。除了可以將每條消息單獨壓縮然后傳輸外,Kafka還支持在批量發(fā)送時,將整個Batch的消息一起壓縮后傳輸。數(shù)據(jù)壓縮的一個基本原理是,重復(fù)數(shù)據(jù)越多壓縮效果越好。因此將整個Batch的數(shù)據(jù)一起壓縮能更大幅度減小數(shù)據(jù)量,從而更大程度提高網(wǎng)絡(luò)傳輸效率。
Broker接收消息后,并不直接解壓縮,而是直接將消息以壓縮后的形式持久化到磁盤。Consumer Fetch到數(shù)據(jù)后再解壓縮。因此Kafka的壓縮不僅減少了Producer到Broker的網(wǎng)絡(luò)傳輸負載,同時也降低了Broker磁盤操作的負載,也降低了Consumer與Broker間的網(wǎng)絡(luò)傳輸量,從而極大得提高了傳輸效率,提高了吞吐量。
5、高效的序列化方式
Kafka消息的Key和Value的類型可自定義,只需同時提供相應(yīng)的序列化器和反序列化器即可。文章來源:http://www.zghlxwxcb.cn/news/detail-610180.html
因此用戶可以通過使用快速且緊湊的序列化-反序列化方式(如Avro,Protocal Buffer)來減少實際網(wǎng)絡(luò)傳輸和磁盤存儲的數(shù)據(jù)規(guī)模,從而提高吞吐率。這里要注意,如果使用的序列化方法太慢,即使壓縮比非常高,最終的效率也不一定高。文章來源地址http://www.zghlxwxcb.cn/news/detail-610180.html
到了這里,關(guān)于八、Kafka時間輪與常見問題的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!