前言
通常我們?cè)陂_(kāi)發(fā)完Flink任務(wù)提交運(yùn)行后,需要對(duì)任務(wù)的參數(shù)進(jìn)行一些調(diào)整,通常需要調(diào)整的情況是任務(wù)消費(fèi)速度跟不上數(shù)據(jù)寫(xiě)入速度,從而導(dǎo)致實(shí)時(shí)任務(wù)出現(xiàn)反壓、內(nèi)存GC頻繁(FullGC)頻繁、內(nèi)存溢出導(dǎo)致TaskManager被Kill。
今天講一下Flink任務(wù)中常見(jiàn)的性能場(chǎng)景及解決思路。
反壓
在Flink任務(wù)中多個(gè)Task之間需要進(jìn)行數(shù)據(jù)交換,在流式計(jì)算中數(shù)據(jù)的生產(chǎn)方的生產(chǎn)速度和消費(fèi)方的消費(fèi)速度不匹配時(shí),可能會(huì)導(dǎo)致計(jì)算節(jié)點(diǎn)OOM或丟失數(shù)據(jù),在Flink中通過(guò)反壓機(jī)制平衡數(shù)據(jù)生產(chǎn)方和消費(fèi)方的處理速度,以求系統(tǒng)達(dá)到整體的平衡。
實(shí)時(shí)任務(wù)出現(xiàn)反壓時(shí),在Blink版本中做了大量的改進(jìn),從資源使用、作業(yè)調(diào)優(yōu)、日志查詢等維度新增了大量功能,使得用戶可以更方便的對(duì)Flink作業(yè)進(jìn)行運(yùn)維,Vertex 增加了InQueue,OutQueue等多項(xiàng)指標(biāo),可以方便的追蹤數(shù)據(jù)的反壓、過(guò)濾及傾斜情況通常,我們可以通過(guò)在Flink Web?UI中觀察出現(xiàn)紅色的Vertex節(jié)點(diǎn)及其上下游,重點(diǎn)需要關(guān)注的指標(biāo)是Out Queue的占用率,當(dāng)Out Queue占用率高表示該節(jié)點(diǎn)的下游節(jié)點(diǎn)消費(fèi)能力不足,需要重點(diǎn)調(diào)解該下游節(jié)點(diǎn)的計(jì)算資源(已貢獻(xiàn)社區(qū))。
如果是老的Flink版本,可以先在 Flink web ui 中,定位到具體的算子之后,查看?BackPressure
?模塊,通過(guò)顏色和數(shù)值來(lái)判斷任務(wù)的繁忙和反壓情況(若顏色為紅色,表示當(dāng)前算子繁忙,有反壓的情況;若顏色為綠色,標(biāo)識(shí)當(dāng)前算子不繁忙,沒(méi)有反壓)。
如果你看到 subtasks 的狀態(tài)為?OK?表示沒(méi)有反壓。HIGH?表示這個(gè) subtask 被反壓。狀態(tài)用如下定義:
- OK: 0% <= 反壓比例 <= 10%
- LOW: 10% < 反壓比例 <= 50%
- HIGH: 50% < 反壓比例 <= 100%
常見(jiàn)場(chǎng)景及解決思路
場(chǎng)景一、任務(wù)反壓(算子消費(fèi)瓶頸)
典型場(chǎng)景為,一連串的計(jì)算節(jié)點(diǎn)都是紅色,Out Queue都是100%,此時(shí)需要定位到最后一個(gè)Out Queue為100%的算子節(jié)點(diǎn)的下游節(jié)點(diǎn),該節(jié)點(diǎn)的消費(fèi)能力不達(dá)標(biāo),導(dǎo)致上游消息堆積。我們可以對(duì)該算子的資源進(jìn)行調(diào)整,如 適當(dāng)調(diào)大并發(fā)度,對(duì)應(yīng)內(nèi)存可適當(dāng)調(diào)小,如果是窗口聚合節(jié)點(diǎn)則可以調(diào)大內(nèi)存(在開(kāi)窗場(chǎng)景下,window數(shù)據(jù)計(jì)算節(jié)點(diǎn)需要緩存窗口大小時(shí)長(zhǎng)的數(shù)據(jù),并在checkpoint時(shí)需要將窗口的中間狀態(tài)存儲(chǔ),因此需要增加窗口計(jì)算節(jié)點(diǎn)的堆內(nèi)存)
場(chǎng)景二、任務(wù)無(wú)反壓,但延遲高(source端瓶頸)
這種情況表現(xiàn)為,整體沒(méi)有出現(xiàn)明顯反壓,即所有計(jì)算節(jié)點(diǎn)的Out Queue都不高。
這種情況的出現(xiàn),有可能是上游源頭節(jié)點(diǎn)的并發(fā)度不夠,如kafka的topic有三個(gè)分區(qū),消費(fèi)的時(shí)候,只開(kāi)了一個(gè)并發(fā),通常建議消費(fèi)并發(fā)數(shù)和topic的分區(qū)一致。
如果增加source的并發(fā)度之后,延遲沒(méi)有下降,則可能是在任務(wù)源頭節(jié)點(diǎn)包含復(fù)雜計(jì)算,且該算子和源頭并發(fā)一致,出現(xiàn)了合并任務(wù)鏈(operater chain),此時(shí)可以考慮將source算子單獨(dú)剝離出來(lái),即調(diào)整source下游算子的并發(fā)度,解除合并任務(wù)鏈。
場(chǎng)景三、任務(wù)異常(內(nèi)存超用)
實(shí)時(shí)任務(wù)異常Failover的情況下,我們需要關(guān)注任務(wù)是否因?yàn)槟硞€(gè)TaskManager內(nèi)存超用被kill的情況,如果發(fā)現(xiàn)異常日志中記錄了:
"org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'null'. This might indicate that the remote task manager was lost"
則普遍情況是因?yàn)閮?nèi)存超用,我們需要根據(jù)異常信息中提示的任務(wù)節(jié)點(diǎn),調(diào)整執(zhí)行計(jì)劃中對(duì)應(yīng)節(jié)點(diǎn)的內(nèi)存配置,具體可在WebUI中查看Exceptions模塊中查看,其中Root Exception里面記錄了最新一次發(fā)生的異常棧,Exception History中記錄的是任務(wù)運(yùn)行過(guò)程中所有發(fā)生的異常,以及每次異常的計(jì)算節(jié)點(diǎn)是哪些。
場(chǎng)景四、GroupBy
針對(duì)group by場(chǎng)景,可以通過(guò)配置minibatch,來(lái)提升吞吐,降低狀態(tài)的訪問(wèn),減少對(duì)下游的輸出壓力。
在Stram SQL純流模式下,每進(jìn)來(lái)一條數(shù)據(jù)都會(huì)去操作state,IO消耗較大,設(shè)置minibatch后,同一個(gè)key的一批數(shù)據(jù)只訪問(wèn)一次state,且只輸出最新的一條數(shù)據(jù),即減少了state的訪問(wèn)也減少了向下游的數(shù)據(jù)更新,minibatch的配置如下:
# 1. 表示整個(gè)job允許延遲
blink.miniBatch.allowLatencyMs=5000
# 2. 單個(gè)batch的size
blink.miniBatch.size=1000
場(chǎng)景五、任務(wù)重啟,并設(shè)置重啟時(shí)間(初始時(shí)間)
這種情況一般出現(xiàn)在任務(wù)剛啟動(dòng)時(shí)有非常高的延遲,可能是因?yàn)樵谌蝿?wù)啟動(dòng)時(shí)或重啟時(shí)設(shè)置了一個(gè)比較老的start time,導(dǎo)致任務(wù)從很早的時(shí)間開(kāi)始拉取數(shù)據(jù),會(huì)導(dǎo)致剛開(kāi)始整個(gè)任務(wù)的qps非常高,在監(jiān)控上的表現(xiàn)為一開(kāi)始有很高的延遲,隨后緩慢下降直到正常水平,若沒(méi)有下降則可以適當(dāng)增加資源,一般來(lái)說(shuō)這種情況不需要特殊處理,可以根據(jù)實(shí)際需求來(lái)判斷是否需要調(diào)整start time為當(dāng)前時(shí)間。
場(chǎng)景六、Time Interval Join 代替 雙流Join
建議在雙流join的時(shí)候,使用時(shí)間窗口join,而不是雙流join。
默認(rèn)情況下雙流join會(huì)將兩條流的數(shù)據(jù)都緩存到狀態(tài)中,默認(rèn)狀態(tài)存儲(chǔ)時(shí)長(zhǎng)為1.5天,狀態(tài)太大會(huì)導(dǎo)致join算子性能低下。
而實(shí)際上大部分場(chǎng)景,join都是由時(shí)效性要求的,比如商品曝光1分鐘引導(dǎo)的點(diǎn)擊,其業(yè)務(wù)上隱含了數(shù)據(jù)的時(shí)效性關(guān)聯(lián)條件,當(dāng)數(shù)據(jù)失效后,它的狀態(tài)是可以清理掉釋放資源。
?文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-560254.html
總結(jié)
- 判斷是否出現(xiàn)反壓,在反壓節(jié)點(diǎn)定位算子,增加并發(fā)或調(diào)整cpu資源;
- 若無(wú)明顯反壓,則可能是source端瓶頸,可以提升并發(fā)度,盡量和source源的分區(qū)數(shù)量一致,另外可以查看是否是因?yàn)閟ource數(shù)據(jù)處理的算子邏輯太復(fù)雜,且和讀算子并行一致出現(xiàn)合并任務(wù)鏈(operater chain)的情況,此時(shí)可以調(diào)整該計(jì)算算子的并行度,將source算子剝離出鏈。
- 參數(shù)優(yōu)化,配置minibatch(針對(duì)GroupBy),可提升吞吐,降低狀態(tài)的訪問(wèn)次數(shù),減少對(duì)下游的輸出壓力。
- 雙流join場(chǎng)景中使用Time Interval Join,而不是雙流Join,雙流Join會(huì)把狀態(tài)保持1.5天,非常消耗資源。
- 重置任務(wù)時(shí),根據(jù)實(shí)際需求出發(fā),若默認(rèn)很久以前的數(shù)據(jù)可放棄,則可以調(diào)整start time為較近的時(shí)間。
- 提升batchSize增加讀寫(xiě)IO。
希望本文對(duì)你有幫助,請(qǐng)點(diǎn)個(gè)贊鼓勵(lì)一下作者吧~ 謝謝!文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-560254.html
到了這里,關(guān)于Flink實(shí)時(shí)任務(wù)性能調(diào)優(yōu)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!