架構(gòu)
所有的分布式計(jì)算引擎都需要有集群的資源管理器,例如:可以把MapReduce、Spark程序運(yùn)行在YARN集群中、或者是Mesos中。Flink也是一個(gè)分布式計(jì)算引擎,要運(yùn)行Flink程序,也需要一個(gè)資源管理器。而學(xué)習(xí)每一種分布式計(jì)算引擎,首先需要搞清楚的就是:我們開發(fā)的分布式應(yīng)用程序是如何在集群中執(zhí)行的,這其中一定會涉及到與資源管理器的交互。其實(shí),可以把資源管理看成是一個(gè)cluster的抽象。
我們來看一下Flink集群會涉及到的重要角色。
- client
client將編寫的代碼轉(zhuǎn)換為程序的Dataflow,并對Dataflow進(jìn)行優(yōu)化,生成Dataflow Graph,再將job提交給JobManager。我們編寫的Flink代碼,其實(shí)主要是用來描述Flink程序在集群中應(yīng)該如何執(zhí)行,F(xiàn)link集群當(dāng)然也不是像運(yùn)行編寫的單機(jī)程序一樣,順序往下執(zhí)行。它只會接受一個(gè)一個(gè)的Job,然后運(yùn)行Job中一個(gè)個(gè)的任務(wù)。
- Job Manager
Job Manager其實(shí)是Flink集群的作業(yè)管理器,它負(fù)責(zé)調(diào)度、管理集群的計(jì)算資源。
- Task Manager
一個(gè)集群往往由很多的Task Manager組成,Task Manager負(fù)責(zé)管理、運(yùn)行具體的任務(wù)。Task Manager與Task Manager之間也是能夠互相通信的。
組件 |
用途 |
實(shí)現(xiàn) |
Flink Client |
將批處理或流式應(yīng)用程序編譯成數(shù)據(jù)流圖,然后提交給JobManager。 |
|
JobManager |
Flink系統(tǒng)的管理節(jié)點(diǎn),管理所有的TaskManager,并決策用戶任務(wù)在哪些Taskmanager執(zhí)行。 JobManager的作業(yè)提交模式有三種 Application Mode Per-Job Mode Session Mode |
|
TaskManager |
Flink系統(tǒng)的業(yè)務(wù)執(zhí)行節(jié)點(diǎn),執(zhí)行具體的用戶任務(wù)、Flink作業(yè)。 |
調(diào)度
Flink通過Task Slots來定義執(zhí)行資源。每個(gè)TaskManager有一到多個(gè)task slot,每個(gè)task slot 可以運(yùn)行一條由多個(gè)并行task組成的流水線。 這樣一條pipeline由多個(gè)連續(xù)的task組成。
每個(gè)slot能夠使用的資源是固定的,例如:如果一個(gè)TaskManager上配置了3個(gè)slot,那每個(gè)slot能夠使用的內(nèi)存為TaskManager管理的內(nèi)存的1/3。slot與slot之間并不存在內(nèi)存資源上的競爭。Flink運(yùn)行用戶調(diào)整TaskManager的slot數(shù)量,如果slot數(shù)量為1,那表示每個(gè)任務(wù)都是在獨(dú)立的JVM中執(zhí)行。而如果大于1,表示多個(gè)任務(wù)運(yùn)行在一個(gè)JVM中。
每個(gè)slot運(yùn)行可以運(yùn)行一個(gè)任務(wù)。一個(gè)JOB中如果Operator和并行度比較多,就會包含很多任務(wù),而Flink集群中的默認(rèn)配置,任務(wù)是可以共享Slot的。也就是說,一個(gè)Slot中可以運(yùn)行多個(gè)任務(wù)。
client將Flink代碼解析為JobGraph,并且會將一些子任務(wù)打包到一個(gè)任務(wù)中,每個(gè)任務(wù)運(yùn)行在一個(gè)線程中。每一個(gè)任務(wù)都是運(yùn)行在TaskManager中的Slot中。針對流式處理,F(xiàn)link都會將一個(gè)完整的pipeline放在一個(gè)Slot中。
這樣一個(gè)程序運(yùn)行在一個(gè)有兩個(gè)TaskManager、每個(gè)TaskManager有3個(gè)slot的Flink集群中。Flink并不是基于每個(gè)Operator執(zhí)行實(shí)例來調(diào)度的,而是優(yōu)先會將一個(gè)完整的Pipeline,調(diào)度到一個(gè)slot中。我們看到,針對此處的并行度設(shè)置,有三個(gè)slot中,都調(diào)度了完整的pipeline。
這種方式,可以提高程序運(yùn)行的吞吐量。如果每一個(gè)operator并行度都以獨(dú)立的線程執(zhí)行,那么當(dāng)線程數(shù)量較多時(shí),線程需要不停地切換、緩存,這是會有一定開銷的。
JobManager數(shù)據(jù)結(jié)構(gòu)
在作業(yè)執(zhí)行期間,JobManager會持續(xù)跟蹤各個(gè)task,決定何時(shí)調(diào)度下一個(gè)或一組task,處理已完成的task或執(zhí)行失敗的情況。
JobManager 接收 JobGraph,JobGraph 是數(shù)據(jù)流的表現(xiàn)形式,包括算子(JobVertex)和中間結(jié)果(IntermediateDataSet)。每個(gè)算子都有諸如并行度和執(zhí)行代碼等屬性。
我們編寫的代碼會轉(zhuǎn)換為JobGraph。其實(shí)它也是有向無環(huán)圖。既然是圖結(jié)構(gòu),那就一定會有Vertex(頂點(diǎn))以及Edge(邊)。Flink中的JobGraph頂點(diǎn)就是JobVertex,它其實(shí)就是Flink中的Operator,而JobGraph的邊就是IntermediateDataSet,Operator處理后的中間結(jié)果。
每個(gè)JobVertex都有自己的屬性。例如:并行度、以及Operator要執(zhí)行的代碼。而且,為了確保每個(gè)JobVertex中的代碼能夠正確的在JVM中運(yùn)行,每個(gè)JobGraph還得包含一組庫(一堆的jar包)
而要真正在集群中運(yùn)行Flink程序,需要將JobGraph轉(zhuǎn)換為ExecutionGraph。其實(shí),可以把ExecutionGraph理解為JobGraph的并行版本,或者是JobGraph的并行放大。
ExecutionGraph中的頂點(diǎn)為ExecutionVertex。如果某個(gè)JobVertex的并行度為50,那么在ExecutionGraph中將會有50個(gè)ExecutionVertex(頂點(diǎn))。每個(gè)ExecutionVertex包含了每個(gè)任務(wù)的執(zhí)行狀態(tài)。ExecutionGraph中的邊就是IntermediatePartition。因?yàn)槊總€(gè)并行度頂點(diǎn)對應(yīng)的中間結(jié)果數(shù)據(jù)其實(shí)就是一個(gè)個(gè)的分區(qū)。
作業(yè)狀態(tài)
每個(gè)ExecutionGraph都有一個(gè)與之相關(guān)的作業(yè)狀態(tài)信息,用來描述當(dāng)前的作業(yè)執(zhí)行狀態(tài)。
- 一次完整的執(zhí)行
Flink作業(yè)剛開始會處于一個(gè)created狀態(tài),然后開始調(diào)度運(yùn)行時(shí),切換到running狀態(tài)。在作業(yè)運(yùn)行完后切換到finished狀態(tài)。
- 作業(yè)運(yùn)行出現(xiàn)故障
?如果期間出現(xiàn)故障,作業(yè)首先切換到failing狀態(tài)以便取消所有正在運(yùn)行的task。如果所有job節(jié)點(diǎn)都到達(dá)最終狀態(tài)并且job無法重啟, 那么job 進(jìn)入failed狀態(tài)。
- 作業(yè)重啟
如果作業(yè)運(yùn)行期間出現(xiàn)故障,且作業(yè)可以重新啟動(dòng),則作業(yè)會進(jìn)入重啟restarting狀態(tài),當(dāng)作業(yè)徹底重啟之后會進(jìn)入到created狀態(tài)。
- 用戶手動(dòng)取消作業(yè)
如果用戶手動(dòng)取消作業(yè),它會進(jìn)入到cancelling狀態(tài),并取消所有正在運(yùn)行的 task。當(dāng)所有正在運(yùn)行的task進(jìn)入到最終狀態(tài)的時(shí)候,作業(yè)轉(zhuǎn)換為cancelled狀態(tài)。
- 作業(yè)掛起
Finished、canceled和failed會導(dǎo)致全局的終結(jié)狀態(tài),并且觸發(fā)作業(yè)的清理。跟這些狀態(tài)不同,suspended狀態(tài)只是一個(gè)局部的終結(jié)。局部的終結(jié)意味著作業(yè)的執(zhí)行已經(jīng)被對應(yīng)的JobManager 終結(jié),但是集群中另外的JobManager 依然可以從高可用存儲里獲取作業(yè)信息并重啟。因此一個(gè)處于suspended狀態(tài)的作業(yè)不會被徹底清理掉。
Finished、Canceled、Failed狀態(tài)都是全局終端狀態(tài),這些狀態(tài)會觸發(fā)作業(yè)的清理工作。而掛起suspended狀態(tài)是一種本地終端狀態(tài)。它意味著,如果作業(yè)已經(jīng)在一個(gè)JobManager上是終止的,但如果是HA集群,另一個(gè)JobManager依然可以從HA存儲中檢索到Job,并重新啟動(dòng)。所以Suspended狀態(tài)是不會進(jìn)行Job的完全清理的。
- 任務(wù)的狀態(tài)
在整個(gè)ExecutionGraph執(zhí)行期間,每個(gè)并行task都會經(jīng)歷多個(gè)階段,從created狀態(tài)到finished或failed。下圖展示了各種狀態(tài)以及他們之間的轉(zhuǎn)換關(guān)系。由于一個(gè) task可能會被執(zhí)行多次(比如在異?;謴?fù)時(shí)),ExecutionVertex的執(zhí)行是由Execution來跟蹤的,每個(gè)ExecutionVertex 會記錄當(dāng)前的執(zhí)行,以及之前的執(zhí)行。文章來源:http://www.zghlxwxcb.cn/news/detail-499498.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-499498.html
到了這里,關(guān)于Flink流批一體計(jì)算(3):FLink作業(yè)調(diào)度的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!