Hadoop作為成熟的分布式計算框架在大數(shù)據(jù)生態(tài)領(lǐng)域已經(jīng)使用多年,本文簡要介紹Hadoop的核心組件MapReduce、YARN和HDFS,以加深了解。
1、Hadoop基本介紹
Hadoop是分布式計算框架,主要解決海量數(shù)據(jù)的存儲和計算問題。Hadoop主要組件包括分布式文件系統(tǒng)HDFS、分布式離線并行計算框架MapReduce、作業(yè)調(diào)度與集群資源管理框架YARN。Hadoop生態(tài)系統(tǒng)一系列框架和組件如下:
2、MapReduce計算框架
Hadoop 1.0主要由兩部分組成,分別是分布式文件系統(tǒng)HDFS和分布式計算框架Map/Reduce。其中分布式文件系統(tǒng)主要用于大規(guī)模數(shù)據(jù)的分布式存儲,而MapReduce 則構(gòu)建在分布式文件系統(tǒng)之上,對存儲在分布式文件系統(tǒng)中的數(shù)據(jù)進行分布式計算。MapReduce作為一個分布式運算程序的編程框架,其核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認組件整合一個完整的分布式運算程序,并發(fā)布到Hadoop集群上運行。
2.1 MapReduce架構(gòu)
Hadoop MapReduce采用了Master/Slave(M/S)架構(gòu),它主要由以下幾個組件組成:Client、JobTracker、 TaskTracker 和Task。
1)client
用戶自定義的MapReduce程序通過Client提交到JobTracker端;同時,用戶可通過Client提供的一些接口查看作業(yè)運行狀態(tài)。在Hadoop內(nèi)部用“作業(yè)”(Job)表示MapReduce程序,一個MapReduce程序可對應(yīng)若干個作業(yè),而每個作業(yè)會被分解成若干個Map/Reduce任務(wù)(Task)。
2)JobTracker
JobTracker主要負責資源監(jiān)控和作業(yè)調(diào)度。JobTracker監(jiān)控所有TaskTracker與作業(yè)的健康狀況,一旦發(fā)現(xiàn)失敗情況后,其會將相應(yīng)的任務(wù)轉(zhuǎn)移到其他節(jié)點;同時,JobTracker會跟蹤任務(wù)的執(zhí)行進度、資源使用量等信息,并將這些信息告訴任務(wù)調(diào)度器,而調(diào)度器會在資源出現(xiàn)空閑時,選擇合適的任務(wù)使用這些資源。在Hadoop中,任務(wù)調(diào)度器是一個可插拔的模塊,用戶可以根據(jù)自己的需要設(shè)計相應(yīng)的調(diào)度器。
3)TaskTracker
TaskTracker會周期性地通過Heartbeat將本節(jié)點上資源的使用情況和任務(wù)的運行進度匯報給JobTracker,同時接收JobTracker發(fā)送過來的命令并執(zhí)行相應(yīng)的操作(如啟動新任務(wù)、殺死任務(wù)等)。TaskTracker使用“slot”等量劃分本節(jié)點上的資源量?!皊lot”代表計算資源(CPU、內(nèi)存等)。一個Task獲取到一個slot后才有機會運行,而Hadoop調(diào)度器的作用就是將各個TaskTracker上的空閑slot分配給Task使用。slot分為Map slot和Reduce slot兩種,分別供MapTask和Reduce Task使用。TaskTracker通過slot數(shù)目(可配置參數(shù))限定Task的并發(fā)度。
4)Task
Task分為Map Task和Reduce Task兩種,均由TaskTracker啟動。從上一小節(jié)中我們知道,HDFS以固定大小的block為基本單位存儲數(shù)據(jù),而對于MapReduce而言,其處理單位是split。split是一個邏輯概念,它只包含一些元數(shù)據(jù)信息,比如數(shù)據(jù)起始位置、數(shù)據(jù)長度、數(shù)據(jù)所在節(jié)點等。它的劃分方法完全由用戶自己決定。但需要注意的是,split的多少決定了Map Task的數(shù)目,因為每個split會交由一個Map Task處理。
2.2 MapReduce編程模型
MapReduce適用的應(yīng)用場景具有一個共性:任務(wù)可以被分解為相互獨立的子任務(wù)?;谠撎攸c,MapReduce提供了其分布式編程的方法:
- 迭代(iteration)。遍歷輸入數(shù)據(jù),并將之解析成key/value對
- 將輸入key/value對映射(map)成另外一些key/value對
- 依據(jù)key對中間數(shù)據(jù)進行分組(grouping)
- 以組為單位對數(shù)據(jù)進行歸約(reduce)
- 迭代。將最終產(chǎn)生的key/value對保存到輸出文件中
MapReduce 編程模型主要思想分為InputFormat、Split、Mapper、Shuffle、Reducer 和OutputFormat。
- InputFormat:主要用于描述輸入數(shù)據(jù)的格式,將文件拆分為多個InputSplit,并由RecordReaders將InputSplit轉(zhuǎn)換為標準的<key,value>鍵值對,作為map的輸出;
- Split:對數(shù)據(jù)按行進行粗粒度切分,得到<Key,Value>型數(shù)據(jù)
- Map:細粒度切分,得到<Key,List>型數(shù)據(jù);在環(huán)形緩沖區(qū)內(nèi)對文件進行排序、分區(qū)操作,數(shù)據(jù)量大時會溢寫到磁盤,緩沖區(qū)大小將決定著MR任務(wù)的性能,默認size為100M。此過程可以設(shè)置Combine任務(wù),即將按照相同Key進行初步聚合
- Shuffle:將各個MapTask結(jié)果合并輸出到Reduce,此過程數(shù)據(jù)的輸出是一個Copy過程,此過程涉及到網(wǎng)絡(luò)IO ,是一個耗時的過程,也是一個核心的過程。
- Reduce:對拆開的數(shù)據(jù)碎片進行合并,會涉及到Merge排序。
- OutputFormat:主要用于描述輸出數(shù)據(jù)的格式,它能夠?qū)⒂脩籼峁┑膋ey/value對寫入特定格式的文件中
2.3 JobTracker內(nèi)部實現(xiàn)
JobTracker是整個MapReduce計算框架中的主服務(wù),負責整個集群的作業(yè)控制和資源管理。在Hadoop內(nèi)部,每個應(yīng)用程序表示為一個作業(yè),每個作業(yè)又進一步分成多個任務(wù)。JobTracker的作用就是就是負責作業(yè)的分解以及狀態(tài)監(jiān)控,其中狀態(tài)監(jiān)控又包括TaskTracker狀態(tài)監(jiān)控、作業(yè)狀態(tài)監(jiān)控和任務(wù)狀態(tài)監(jiān)控。
2.3.1 狀態(tài)監(jiān)控
狀態(tài)監(jiān)控的一個重要目的是實現(xiàn)容錯功能,借助監(jiān)控信息,JobTracker實現(xiàn)全方位的容錯機制,同時推測出運行緩慢的任務(wù),并啟動任務(wù)加快數(shù)據(jù)處理速度。
- Job和Task運行狀態(tài)監(jiān)控:JobTracker為每個作業(yè)創(chuàng)建一個JobInProgress對象以跟蹤其運行狀態(tài),同時也會將每個作業(yè)拆分成若干個任務(wù),并為每個任務(wù)創(chuàng)建一個TaskInProgress對象以跟蹤和監(jiān)控其運行狀態(tài)。
- JobInProgress:用于監(jiān)控和跟蹤作業(yè)運行狀態(tài),并為調(diào)度器提供最底層的調(diào)度接口。其中維護了兩種作業(yè)信息:靜態(tài)信息和動態(tài)信息,靜態(tài)信息在作業(yè)提交時已經(jīng)確定好、動態(tài)信息隨著作業(yè)的運行而動態(tài)變化。
- TaskInProgress:維護Task運行過程中的全部信息
2.3.2 資源管理
Hadoop資源管理器由兩部分組成:資源表示模型和資源分配模型,其中資源表示模型描述資源的組織方式,在Hadoop上使用slot組織各節(jié)點的資源;資源分配模型則決定如何將資源分配給各個作業(yè),在Hadoop上通過調(diào)度器完成。Hadoop中引入slot概念,將各個節(jié)點上的資源等量的切分成若干份,每一份用一個slot表示。
在MapReduce框架中,由JobTracker實現(xiàn)資源調(diào)度。JobTracker不斷接收各個TaskTracker周期性的發(fā)送過來的資源量和任務(wù)狀態(tài)等信息,并綜合考慮TaskTracker的數(shù)據(jù)分布、資源剩余量、作業(yè)優(yōu)先級和作業(yè)提交時間等因素,為TaskTracker分配最合適的任務(wù)。
- 客戶端提交作業(yè)提交函數(shù)將程序提交到JobTracker端
- JobTracker收到新作業(yè)后,通知任務(wù)調(diào)度器(TaskScheduler)對作業(yè)進行初始化
- 某個TaskScheduler向JobTracker匯報心跳,其中包含剩余的slot數(shù)目和能否接收新任務(wù)等信息
- 如果該TaskScheduler能夠接收新任務(wù),則JobTracker調(diào)用TaskScheduler對外函數(shù)asignTasks為該TaskScheduler分配新任務(wù)
- TaskScheduler按照一定的調(diào)度策略為該TaskScheduler選擇最合適的任務(wù)列表,并將該列表返回給JobTracker
- JobTracker將任務(wù)列表以心跳應(yīng)答的形式返回給對應(yīng)的TaskTracker
- TaskTracker收到心跳應(yīng)答后,發(fā)現(xiàn)有需要啟動的新任務(wù),則直接啟動該任務(wù)
2.4 TaskTrack內(nèi)部實現(xiàn)
TaskTracker是Hadoop集群中運行于各個節(jié)點上的服務(wù),是JobTracker和Tasks之間溝通的橋梁。TaskTracker主要實現(xiàn)兩個功能:
- 匯報心跳:周期性地將所在節(jié)點上的各種信息通過心跳機制匯報給JobTracker,包括節(jié)點健康狀況、資源使用情況、任務(wù)執(zhí)行進度、任務(wù)運行狀態(tài)等
- 執(zhí)行命令:根據(jù)心跳信息和當前作業(yè)的運行情況為該TaskTracker下達命令,包括啟動任務(wù)、提交任務(wù)、kill任務(wù)、kill作業(yè)和重新初始化
2.5 MapReduce運行過程
- 客戶端提交任務(wù)之前,(InputFormat)會根據(jù)配置策略將數(shù)據(jù)劃分為切片(SpiltSize默認為blockSize 128M) ,每個切片都對應(yīng)的提交給一個 MapTask (YARN 負責提交);
- MapTask 執(zhí)行任務(wù),根據(jù)map函數(shù),生成<K,V>對,將結(jié)果輸出到環(huán)形緩存區(qū),然后分區(qū)、排序、溢出;
- Shuffle即將map結(jié)果劃分到多個分區(qū)并分配給了多個reduce任務(wù),此過程即為Shuffle。
- Reduce拷貝map后分區(qū)的數(shù)據(jù)(fetch過程,默認5個線程執(zhí)行拷貝),全部完成后執(zhí)行合并操作。
2.5.1 Map Task
Map Task整體計算流程如圖所示,分為5個階段:
- Read階段:Map Task通過自定義的RecordReader,從輸入InputSplit中解析出一個個key/value
- Map階段:將解析出的key-value交給自定義的map()函數(shù),并產(chǎn)生一系列新的key-value
- Collect階段:在自定義的map()函數(shù)中,當數(shù)據(jù)處理完成后會調(diào)用OutPutCollectior.collect()輸出結(jié)果。在該函數(shù)內(nèi)部,會將生成的key/value分片,并寫入環(huán)形的內(nèi)存緩存區(qū)中
- Spill階段:當寫入環(huán)形緩沖區(qū)的數(shù)據(jù)達到最大值的80%(默認環(huán)形緩沖區(qū)大小100M)會觸發(fā)溢寫操作spill,會將緩沖區(qū)的數(shù)據(jù)先按照partition進行排序再按照key進行排序,并將數(shù)據(jù)寫入到磁盤中。
- Combine階段:當所有數(shù)據(jù)處理完畢后,Map Task將所有臨時文件進行一次合并,最終只會生成一個文件,并為這個文件提供一個索引文件記錄每個key對應(yīng)數(shù)據(jù)的偏移量
2.5.2 Reduce Task
Reduce Task整體計算流程如圖所示,共分為5個階段:
- Shuffle階段:也稱為copy階段,Reduce Task從各個map task上遠程拷貝一份數(shù)據(jù),并針對某一片數(shù)據(jù),如果其大小超過一定閾值,則寫到磁盤上,否則直接放到內(nèi)存中
- Merge階段:在遠程拷貝數(shù)據(jù)的同時,Reduce Task啟動了兩個后臺線程對內(nèi)存和磁盤上的文件進行合并,以防止內(nèi)存使用過多或磁盤上文件過多
- Sort階段:把分散的數(shù)據(jù)文件再次合并成一個大文件,再進行一次歸并排序
- Reduce階段:將每組數(shù)據(jù)依次提交給自定義的reduce()函數(shù)處理
- Write階段:將計算結(jié)果寫到HDFS中
2.5.3 Shuffle和Merge階段分析
在Reduce Task中,Shuffle階段和Merge階段是并行進行的,可劃分為三個子階段。
1) 準備運行完成的Map Task列表
GetMapEventThread線程周期性的通過RPC從TaskTracker獲取已完成的Map Task列表,并保存到映射表mapLocations中,其中保存了TaskTracker Host與已完成任務(wù)列表的映射關(guān)系。為防止出現(xiàn)網(wǎng)絡(luò)熱點,Reduce Task通過對所有TaskTracker Host進行混洗操作以打亂數(shù)據(jù)拷貝順序,并將調(diào)整后 的Map Task輸出數(shù)據(jù)位置保存到scheduledCopies中。
2) 遠程拷貝數(shù)據(jù)
Reduce Task同時啟動多個MapOutoutCopier線程,這些線程從scheduledCopies列表中獲取Map Task輸出位置,并通過HTTP get遠程拷貝數(shù)據(jù)。對于獲取的數(shù)據(jù)分片,如果大小超過一定的閾值,則存放在磁盤上,否則直接放到內(nèi)存中。
3) 合并內(nèi)存文件和磁盤文件
為了防止內(nèi)存或者磁盤上的文件數(shù)據(jù)過多,Reduce Task啟動了LocalFSMerger和InMemFSMergeThread兩個線程分別對內(nèi)存和磁盤上的文件進行合并。
3、YARN資源調(diào)度
YARN(Yet Another Resource Negotiator)是Hadoop 2.0中的資源管理系統(tǒng),基本設(shè)計思想是將Hadoop 1.0中的JobTracker拆分成了兩個獨立的服務(wù):一個全局的資源管理器ResourceManager和每個應(yīng)用程序特有的ApplicationMaster。其中ResourceManager負責整個系統(tǒng)的資源管理和分配,而ApplicationMaster負責單個應(yīng)用程序的管理。
3.1 YARN基本組成結(jié)構(gòu)
YARN總體上仍然是Master/Slave架構(gòu),其中Resource Manager為master負責對各個NodeManager上的資源進行統(tǒng)一管理和調(diào)度、Node Manager為slave是每個節(jié)點上的資源和任務(wù)管理器。當用戶提交一個應(yīng)用程序時,需要提供一個用以跟蹤和管理這個程序的ApplicationMaster,它負責向ResourceManager申請資源,并要求NodeManger啟動可以占用一定資源的任務(wù)。由于不同的ApplicationMaster被分布到不同的節(jié)點上,因此它們之間不會相互影響。如圖中所示,YARN主要是由Resource Manager、NodeManager和ApplicationMaster組成:
1)Resource Manager
RM是全局的資源管理器,負責整個系統(tǒng)的資源管理和分片,主要又兩個組件組成:調(diào)度器和應(yīng)用程序管理器
- 調(diào)度器:根據(jù)容量、隊列等限制條件,將系統(tǒng)中的資源分配給各個正在運行的應(yīng)用程序
- 應(yīng)用程序管理器:負責管理整個系統(tǒng)中所有的應(yīng)用程序,包括應(yīng)用程序提交、與調(diào)度器協(xié)商資源以啟動ApplicationMaster、監(jiān)控ApplicationMaster運行狀態(tài)并在失敗時重啟等
2)Application Master
用戶提交的每個應(yīng)用程序均包含1個AM,主要功能包括
- 與ResourceManager調(diào)度器協(xié)商以獲取資源
- 將得到的任務(wù)進一步分配給內(nèi)部的任務(wù)
- 與NodeManager通信以啟動/停止任務(wù)
- 監(jiān)控所有任務(wù)運行狀態(tài)并在任務(wù)運行失敗時重新為任務(wù)申請資源以重啟任務(wù)等
3)NodeManager
NM是每個節(jié)點上的資源和任務(wù)管理器,它不時向RM匯報本節(jié)點上的資源使用情況和各個Container的運行狀態(tài),同時也會接收并處理來自AM的啟動或停止請求。
4)Container
Container是YARN中的資源抽象,封裝了某個節(jié)點上的多維度資源,如內(nèi)存、CPU、磁盤、網(wǎng)絡(luò)等,當ApplicationMaster向ResourceManager申請資源時,返回的資源便是用Container表示的。YARN會為每個任務(wù)分配一個Container,且該任務(wù)只能使用該Container中描述的資源。
3.2 YARN工作流程
當用戶向YARN提交一個應(yīng)用程序后,YARN分兩個階段運行該應(yīng)用程序:第一階段是啟動ApplicationMaster;第二階段是由ApplicationMaster創(chuàng)建應(yīng)用程序,為它申請資源并監(jiān)控整個運行過程,直至運行完成。
YARN工作流程如下:
- 用戶向YARN中提交應(yīng)用程序,其中包括用戶程序、ApplicationMaster程序、ApplicationMaster啟動命令等。
- ResourceManager為應(yīng)用程序分配第一個Container,并與對應(yīng)的NodeManager通信,要求它在這個Container中啟動應(yīng)用程序的ApplicationMaster。
- ApplicationMaster首先向ResourceManager注冊,這樣用戶可以直接通過ResourceManager查看應(yīng)用程序的運行狀態(tài),然后ApplicationMaster為各個任務(wù)申請資源,并監(jiān)控它們的運行狀態(tài),直到運行結(jié)束,即重復(fù)步驟4-7。
- ApplicationMaster采用輪詢的方式通過RPC協(xié)議向ResourceManager申請和領(lǐng)取資源。
- 一旦ApplicationMaster成功申請到資源,便開始與對應(yīng)的NodeManager通信,要求它啟動任務(wù)。
- NodeManager為任務(wù)設(shè)置好運行環(huán)境后,將任務(wù)啟動命令寫到一個腳本中,并通過運行該腳本啟動任務(wù)。
- 各個任務(wù)通過某個RPC協(xié)議向ApplicationMaster匯報自己的狀態(tài)和進度,使ApplicationMaster能夠隨時掌握各個任務(wù)的運行狀態(tài),從而可以在任務(wù)失敗時重新啟動任務(wù)。在應(yīng)用程序運行過程中,用戶可隨時通過RPC向ApplicationMaster查詢應(yīng)用程序的當前運行狀態(tài)。
- 應(yīng)用程序運行完成后,ApplicationMaster通過RPC協(xié)議向ResourceManager注銷并關(guān)閉自己。
4、HDFS分布式文件系統(tǒng)
HDFS全稱Hadoop Distribute File System,是分布式計算中數(shù)據(jù)存儲管理的基礎(chǔ),解決了海量數(shù)據(jù)的存儲問題,具有高容錯性、可擴展性,適合大數(shù)據(jù)量處理和流式批處理,可運行于廉價的服務(wù)器上。傳統(tǒng)的HDFS架構(gòu)采用master/slave架構(gòu),一個HDFS集群是由一個Namenode和一定數(shù)目的Datanodes組成。
- NameNode是master,它是一個中心服務(wù)器,是這個集群的管理者,負責管理HDFS的命名空間(NameSpace)、配置副本策略和數(shù)據(jù)塊(Block)映射信息,同時也會處理客戶端讀寫請求。
- DataNode是slave,NameNode下達命令,DataNode執(zhí)行實際的操作。集群中的DataNode負責管理所在節(jié)點上的存儲,主要負責存儲實際的數(shù)據(jù)塊和執(zhí)行數(shù)據(jù)塊的讀/寫操作。
- Client負責與用戶交互,同時可以執(zhí)行以下操作
- 文件切分,文件上傳HDFS的時候,Client將文件切分成一個一個的Block,然后進行存儲。
- 與NameNode交互,獲取文件的位置信息
- 與DataNode交互,讀取或者寫入數(shù)據(jù)
- Client提供一些命令來管理HDFS,比如啟動或者關(guān)閉HDFS
- Client可以通過一些命令來訪問HDFS
- Secondary NameNode:并非NameNode的熱備。當NameNode掛掉的時候,它并不能馬上替換NameNode并提供服務(wù)
- 輔助NameNode,分擔其工作量。
- 定期合并fsimage和fsedits,并推送給NameNode。
- 在緊急情況下,可輔助恢復(fù)NameNode
備注:HDFS分布式文件系統(tǒng)有關(guān)知識在“分布式文件系統(tǒng)HDFS概述”中詳細介紹了,這里只做簡單的描述。
5、總結(jié)
本文主要介紹了分布式計算框架Hadoop的三大組件:MapReduce、YARN和HDFS。MapReduce是整個分布式計算的基礎(chǔ),完成分布式任務(wù)的數(shù)據(jù)拆分、歸并排序和匯總工作;YARN是通用的資源調(diào)度框架,不僅僅局限于MapReduce,也支持Spark、Storm、Flume等在線和流式計算框架;HDFS作為分布式文件系統(tǒng),支持大數(shù)據(jù)量的數(shù)據(jù)存儲、高吞吐訪問和橫向擴展。文章來源:http://www.zghlxwxcb.cn/news/detail-455248.html
參考資料:文章來源地址http://www.zghlxwxcb.cn/news/detail-455248.html
- 《Hadoop技術(shù)內(nèi)幕 深入理解MapReduce架構(gòu)設(shè)計與實現(xiàn)原理》,董西成著
- 《Hadoop技術(shù)內(nèi)幕:深入解析YARN架構(gòu)設(shè)計與實現(xiàn)原理》,董西成著
- 分布式文件系統(tǒng)HDFS概述
- https://blog.csdn.net/qq_41858402/article/details/108207777
- https://blog.csdn.net/qq_40589204/article/details/118160989
- https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html
Hadoop作為成熟的分布式計算框架在大數(shù)據(jù)生態(tài)領(lǐng)域已經(jīng)使用多年,本文簡要介紹Hadoop的核心組件MapReduce、YARN和HDFS,以加深了解。
1、Hadoop基本介紹
Hadoop是分布式計算框架,主要解決海量數(shù)據(jù)的存儲和計算問題。Hadoop主要組件包括分布式文件系統(tǒng)HDFS、分布式離線并行計算框架MapReduce、作業(yè)調(diào)度與集群資源管理框架YARN。Hadoop生態(tài)系統(tǒng)一系列框架和組件如下:
2、MapReduce計算框架
Hadoop 1.0主要由兩部分組成,分別是分布式文件系統(tǒng)HDFS和分布式計算框架Map/Reduce。其中分布式文件系統(tǒng)主要用于大規(guī)模數(shù)據(jù)的分布式存儲,而MapReduce 則構(gòu)建在分布式文件系統(tǒng)之上,對存儲在分布式文件系統(tǒng)中的數(shù)據(jù)進行分布式計算。MapReduce作為一個分布式運算程序的編程框架,其核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認組件整合一個完整的分布式運算程序,并發(fā)布到Hadoop集群上運行。
2.1 MapReduce架構(gòu)
Hadoop MapReduce采用了Master/Slave(M/S)架構(gòu),它主要由以下幾個組件組成:Client、JobTracker、 TaskTracker 和Task。
1)client
用戶自定義的MapReduce程序通過Client提交到JobTracker端;同時,用戶可通過Client提供的一些接口查看作業(yè)運行狀態(tài)。在Hadoop內(nèi)部用“作業(yè)”(Job)表示MapReduce程序,一個MapReduce程序可對應(yīng)若干個作業(yè),而每個作業(yè)會被分解成若干個Map/Reduce任務(wù)(Task)。
2)JobTracker
JobTracker主要負責資源監(jiān)控和作業(yè)調(diào)度。JobTracker監(jiān)控所有TaskTracker與作業(yè)的健康狀況,一旦發(fā)現(xiàn)失敗情況后,其會將相應(yīng)的任務(wù)轉(zhuǎn)移到其他節(jié)點;同時,JobTracker會跟蹤任務(wù)的執(zhí)行進度、資源使用量等信息,并將這些信息告訴任務(wù)調(diào)度器,而調(diào)度器會在資源出現(xiàn)空閑時,選擇合適的任務(wù)使用這些資源。在Hadoop中,任務(wù)調(diào)度器是一個可插拔的模塊,用戶可以根據(jù)自己的需要設(shè)計相應(yīng)的調(diào)度器。
3)TaskTracker
TaskTracker會周期性地通過Heartbeat將本節(jié)點上資源的使用情況和任務(wù)的運行進度匯報給JobTracker,同時接收JobTracker發(fā)送過來的命令并執(zhí)行相應(yīng)的操作(如啟動新任務(wù)、殺死任務(wù)等)。TaskTracker使用“slot”等量劃分本節(jié)點上的資源量。“slot”代表計算資源(CPU、內(nèi)存等)。一個Task獲取到一個slot后才有機會運行,而Hadoop調(diào)度器的作用就是將各個TaskTracker上的空閑slot分配給Task使用。slot分為Map slot和Reduce slot兩種,分別供MapTask和Reduce Task使用。TaskTracker通過slot數(shù)目(可配置參數(shù))限定Task的并發(fā)度。
4)Task
Task分為Map Task和Reduce Task兩種,均由TaskTracker啟動。從上一小節(jié)中我們知道,HDFS以固定大小的block為基本單位存儲數(shù)據(jù),而對于MapReduce而言,其處理單位是split。split是一個邏輯概念,它只包含一些元數(shù)據(jù)信息,比如數(shù)據(jù)起始位置、數(shù)據(jù)長度、數(shù)據(jù)所在節(jié)點等。它的劃分方法完全由用戶自己決定。但需要注意的是,split的多少決定了Map Task的數(shù)目,因為每個split會交由一個Map Task處理。
2.2 MapReduce編程模型
MapReduce適用的應(yīng)用場景具有一個共性:任務(wù)可以被分解為相互獨立的子任務(wù)?;谠撎攸c,MapReduce提供了其分布式編程的方法:
- 迭代(iteration)。遍歷輸入數(shù)據(jù),并將之解析成key/value對
- 將輸入key/value對映射(map)成另外一些key/value對
- 依據(jù)key對中間數(shù)據(jù)進行分組(grouping)
- 以組為單位對數(shù)據(jù)進行歸約(reduce)
- 迭代。將最終產(chǎn)生的key/value對保存到輸出文件中
MapReduce 編程模型主要思想分為InputFormat、Split、Mapper、Shuffle、Reducer 和OutputFormat。
- InputFormat:主要用于描述輸入數(shù)據(jù)的格式,將文件拆分為多個InputSplit,并由RecordReaders將InputSplit轉(zhuǎn)換為標準的<key,value>鍵值對,作為map的輸出;
- Split:對數(shù)據(jù)按行進行粗粒度切分,得到<Key,Value>型數(shù)據(jù)
- Map:細粒度切分,得到<Key,List>型數(shù)據(jù);在環(huán)形緩沖區(qū)內(nèi)對文件進行排序、分區(qū)操作,數(shù)據(jù)量大時會溢寫到磁盤,緩沖區(qū)大小將決定著MR任務(wù)的性能,默認size為100M。此過程可以設(shè)置Combine任務(wù),即將按照相同Key進行初步聚合
- Shuffle:將各個MapTask結(jié)果合并輸出到Reduce,此過程數(shù)據(jù)的輸出是一個Copy過程,此過程涉及到網(wǎng)絡(luò)IO ,是一個耗時的過程,也是一個核心的過程。
- Reduce:對拆開的數(shù)據(jù)碎片進行合并,會涉及到Merge排序。
- OutputFormat:主要用于描述輸出數(shù)據(jù)的格式,它能夠?qū)⒂脩籼峁┑膋ey/value對寫入特定格式的文件中
2.3 JobTracker內(nèi)部實現(xiàn)
JobTracker是整個MapReduce計算框架中的主服務(wù),負責整個集群的作業(yè)控制和資源管理。在Hadoop內(nèi)部,每個應(yīng)用程序表示為一個作業(yè),每個作業(yè)又進一步分成多個任務(wù)。JobTracker的作用就是就是負責作業(yè)的分解以及狀態(tài)監(jiān)控,其中狀態(tài)監(jiān)控又包括TaskTracker狀態(tài)監(jiān)控、作業(yè)狀態(tài)監(jiān)控和任務(wù)狀態(tài)監(jiān)控。
2.3.1 狀態(tài)監(jiān)控
狀態(tài)監(jiān)控的一個重要目的是實現(xiàn)容錯功能,借助監(jiān)控信息,JobTracker實現(xiàn)全方位的容錯機制,同時推測出運行緩慢的任務(wù),并啟動任務(wù)加快數(shù)據(jù)處理速度。
- Job和Task運行狀態(tài)監(jiān)控:JobTracker為每個作業(yè)創(chuàng)建一個JobInProgress對象以跟蹤其運行狀態(tài),同時也會將每個作業(yè)拆分成若干個任務(wù),并為每個任務(wù)創(chuàng)建一個TaskInProgress對象以跟蹤和監(jiān)控其運行狀態(tài)。
- JobInProgress:用于監(jiān)控和跟蹤作業(yè)運行狀態(tài),并為調(diào)度器提供最底層的調(diào)度接口。其中維護了兩種作業(yè)信息:靜態(tài)信息和動態(tài)信息,靜態(tài)信息在作業(yè)提交時已經(jīng)確定好、動態(tài)信息隨著作業(yè)的運行而動態(tài)變化。
- TaskInProgress:維護Task運行過程中的全部信息
2.3.2 資源管理
Hadoop資源管理器由兩部分組成:資源表示模型和資源分配模型,其中資源表示模型描述資源的組織方式,在Hadoop上使用slot組織各節(jié)點的資源;資源分配模型則決定如何將資源分配給各個作業(yè),在Hadoop上通過調(diào)度器完成。Hadoop中引入slot概念,將各個節(jié)點上的資源等量的切分成若干份,每一份用一個slot表示。
在MapReduce框架中,由JobTracker實現(xiàn)資源調(diào)度。JobTracker不斷接收各個TaskTracker周期性的發(fā)送過來的資源量和任務(wù)狀態(tài)等信息,并綜合考慮TaskTracker的數(shù)據(jù)分布、資源剩余量、作業(yè)優(yōu)先級和作業(yè)提交時間等因素,為TaskTracker分配最合適的任務(wù)。
- 客戶端提交作業(yè)提交函數(shù)將程序提交到JobTracker端
- JobTracker收到新作業(yè)后,通知任務(wù)調(diào)度器(TaskScheduler)對作業(yè)進行初始化
- 某個TaskScheduler向JobTracker匯報心跳,其中包含剩余的slot數(shù)目和能否接收新任務(wù)等信息
- 如果該TaskScheduler能夠接收新任務(wù),則JobTracker調(diào)用TaskScheduler對外函數(shù)asignTasks為該TaskScheduler分配新任務(wù)
- TaskScheduler按照一定的調(diào)度策略為該TaskScheduler選擇最合適的任務(wù)列表,并將該列表返回給JobTracker
- JobTracker將任務(wù)列表以心跳應(yīng)答的形式返回給對應(yīng)的TaskTracker
- TaskTracker收到心跳應(yīng)答后,發(fā)現(xiàn)有需要啟動的新任務(wù),則直接啟動該任務(wù)
2.4 TaskTrack內(nèi)部實現(xiàn)
TaskTracker是Hadoop集群中運行于各個節(jié)點上的服務(wù),是JobTracker和Tasks之間溝通的橋梁。TaskTracker主要實現(xiàn)兩個功能:
- 匯報心跳:周期性地將所在節(jié)點上的各種信息通過心跳機制匯報給JobTracker,包括節(jié)點健康狀況、資源使用情況、任務(wù)執(zhí)行進度、任務(wù)運行狀態(tài)等
- 執(zhí)行命令:根據(jù)心跳信息和當前作業(yè)的運行情況為該TaskTracker下達命令,包括啟動任務(wù)、提交任務(wù)、kill任務(wù)、kill作業(yè)和重新初始化
2.5 MapReduce運行過程
- 客戶端提交任務(wù)之前,(InputFormat)會根據(jù)配置策略將數(shù)據(jù)劃分為切片(SpiltSize默認為blockSize 128M) ,每個切片都對應(yīng)的提交給一個 MapTask (YARN 負責提交);
- MapTask 執(zhí)行任務(wù),根據(jù)map函數(shù),生成<K,V>對,將結(jié)果輸出到環(huán)形緩存區(qū),然后分區(qū)、排序、溢出;
- Shuffle即將map結(jié)果劃分到多個分區(qū)并分配給了多個reduce任務(wù),此過程即為Shuffle。
- Reduce拷貝map后分區(qū)的數(shù)據(jù)(fetch過程,默認5個線程執(zhí)行拷貝),全部完成后執(zhí)行合并操作。
2.5.1 Map Task
Map Task整體計算流程如圖所示,分為5個階段:
- Read階段:Map Task通過自定義的RecordReader,從輸入InputSplit中解析出一個個key/value
- Map階段:將解析出的key-value交給自定義的map()函數(shù),并產(chǎn)生一系列新的key-value
- Collect階段:在自定義的map()函數(shù)中,當數(shù)據(jù)處理完成后會調(diào)用OutPutCollectior.collect()輸出結(jié)果。在該函數(shù)內(nèi)部,會將生成的key/value分片,并寫入環(huán)形的內(nèi)存緩存區(qū)中
- Spill階段:當寫入環(huán)形緩沖區(qū)的數(shù)據(jù)達到最大值的80%(默認環(huán)形緩沖區(qū)大小100M)會觸發(fā)溢寫操作spill,會將緩沖區(qū)的數(shù)據(jù)先按照partition進行排序再按照key進行排序,并將數(shù)據(jù)寫入到磁盤中。
- Combine階段:當所有數(shù)據(jù)處理完畢后,Map Task將所有臨時文件進行一次合并,最終只會生成一個文件,并為這個文件提供一個索引文件記錄每個key對應(yīng)數(shù)據(jù)的偏移量
2.5.2 Reduce Task
Reduce Task整體計算流程如圖所示,共分為5個階段:
- Shuffle階段:也稱為copy階段,Reduce Task從各個map task上遠程拷貝一份數(shù)據(jù),并針對某一片數(shù)據(jù),如果其大小超過一定閾值,則寫到磁盤上,否則直接放到內(nèi)存中
- Merge階段:在遠程拷貝數(shù)據(jù)的同時,Reduce Task啟動了兩個后臺線程對內(nèi)存和磁盤上的文件進行合并,以防止內(nèi)存使用過多或磁盤上文件過多
- Sort階段:把分散的數(shù)據(jù)文件再次合并成一個大文件,再進行一次歸并排序
- Reduce階段:將每組數(shù)據(jù)依次提交給自定義的reduce()函數(shù)處理
- Write階段:將計算結(jié)果寫到HDFS中
2.5.3 Shuffle和Merge階段分析
在Reduce Task中,Shuffle階段和Merge階段是并行進行的,可劃分為三個子階段。
1) 準備運行完成的Map Task列表
GetMapEventThread線程周期性的通過RPC從TaskTracker獲取已完成的Map Task列表,并保存到映射表mapLocations中,其中保存了TaskTracker Host與已完成任務(wù)列表的映射關(guān)系。為防止出現(xiàn)網(wǎng)絡(luò)熱點,Reduce Task通過對所有TaskTracker Host進行混洗操作以打亂數(shù)據(jù)拷貝順序,并將調(diào)整后 的Map Task輸出數(shù)據(jù)位置保存到scheduledCopies中。
2) 遠程拷貝數(shù)據(jù)
Reduce Task同時啟動多個MapOutoutCopier線程,這些線程從scheduledCopies列表中獲取Map Task輸出位置,并通過HTTP get遠程拷貝數(shù)據(jù)。對于獲取的數(shù)據(jù)分片,如果大小超過一定的閾值,則存放在磁盤上,否則直接放到內(nèi)存中。
3) 合并內(nèi)存文件和磁盤文件
為了防止內(nèi)存或者磁盤上的文件數(shù)據(jù)過多,Reduce Task啟動了LocalFSMerger和InMemFSMergeThread兩個線程分別對內(nèi)存和磁盤上的文件進行合并。
3、YARN資源調(diào)度
YARN(Yet Another Resource Negotiator)是Hadoop 2.0中的資源管理系統(tǒng),基本設(shè)計思想是將Hadoop 1.0中的JobTracker拆分成了兩個獨立的服務(wù):一個全局的資源管理器ResourceManager和每個應(yīng)用程序特有的ApplicationMaster。其中ResourceManager負責整個系統(tǒng)的資源管理和分配,而ApplicationMaster負責單個應(yīng)用程序的管理。
3.1 YARN基本組成結(jié)構(gòu)
YARN總體上仍然是Master/Slave架構(gòu),其中Resource Manager為master負責對各個NodeManager上的資源進行統(tǒng)一管理和調(diào)度、Node Manager為slave是每個節(jié)點上的資源和任務(wù)管理器。當用戶提交一個應(yīng)用程序時,需要提供一個用以跟蹤和管理這個程序的ApplicationMaster,它負責向ResourceManager申請資源,并要求NodeManger啟動可以占用一定資源的任務(wù)。由于不同的ApplicationMaster被分布到不同的節(jié)點上,因此它們之間不會相互影響。如圖中所示,YARN主要是由Resource Manager、NodeManager和ApplicationMaster組成:
1)Resource Manager
RM是全局的資源管理器,負責整個系統(tǒng)的資源管理和分片,主要又兩個組件組成:調(diào)度器和應(yīng)用程序管理器
- 調(diào)度器:根據(jù)容量、隊列等限制條件,將系統(tǒng)中的資源分配給各個正在運行的應(yīng)用程序
- 應(yīng)用程序管理器:負責管理整個系統(tǒng)中所有的應(yīng)用程序,包括應(yīng)用程序提交、與調(diào)度器協(xié)商資源以啟動ApplicationMaster、監(jiān)控ApplicationMaster運行狀態(tài)并在失敗時重啟等
2)Application Master
用戶提交的每個應(yīng)用程序均包含1個AM,主要功能包括
- 與ResourceManager調(diào)度器協(xié)商以獲取資源
- 將得到的任務(wù)進一步分配給內(nèi)部的任務(wù)
- 與NodeManager通信以啟動/停止任務(wù)
- 監(jiān)控所有任務(wù)運行狀態(tài)并在任務(wù)運行失敗時重新為任務(wù)申請資源以重啟任務(wù)等
3)NodeManager
NM是每個節(jié)點上的資源和任務(wù)管理器,它不時向RM匯報本節(jié)點上的資源使用情況和各個Container的運行狀態(tài),同時也會接收并處理來自AM的啟動或停止請求。
4)Container
Container是YARN中的資源抽象,封裝了某個節(jié)點上的多維度資源,如內(nèi)存、CPU、磁盤、網(wǎng)絡(luò)等,當ApplicationMaster向ResourceManager申請資源時,返回的資源便是用Container表示的。YARN會為每個任務(wù)分配一個Container,且該任務(wù)只能使用該Container中描述的資源。
3.2 YARN工作流程
當用戶向YARN提交一個應(yīng)用程序后,YARN分兩個階段運行該應(yīng)用程序:第一階段是啟動ApplicationMaster;第二階段是由ApplicationMaster創(chuàng)建應(yīng)用程序,為它申請資源并監(jiān)控整個運行過程,直至運行完成。
YARN工作流程如下:
- 用戶向YARN中提交應(yīng)用程序,其中包括用戶程序、ApplicationMaster程序、ApplicationMaster啟動命令等。
- ResourceManager為應(yīng)用程序分配第一個Container,并與對應(yīng)的NodeManager通信,要求它在這個Container中啟動應(yīng)用程序的ApplicationMaster。
- ApplicationMaster首先向ResourceManager注冊,這樣用戶可以直接通過ResourceManager查看應(yīng)用程序的運行狀態(tài),然后ApplicationMaster為各個任務(wù)申請資源,并監(jiān)控它們的運行狀態(tài),直到運行結(jié)束,即重復(fù)步驟4-7。
- ApplicationMaster采用輪詢的方式通過RPC協(xié)議向ResourceManager申請和領(lǐng)取資源。
- 一旦ApplicationMaster成功申請到資源,便開始與對應(yīng)的NodeManager通信,要求它啟動任務(wù)。
- NodeManager為任務(wù)設(shè)置好運行環(huán)境后,將任務(wù)啟動命令寫到一個腳本中,并通過運行該腳本啟動任務(wù)。
- 各個任務(wù)通過某個RPC協(xié)議向ApplicationMaster匯報自己的狀態(tài)和進度,使ApplicationMaster能夠隨時掌握各個任務(wù)的運行狀態(tài),從而可以在任務(wù)失敗時重新啟動任務(wù)。在應(yīng)用程序運行過程中,用戶可隨時通過RPC向ApplicationMaster查詢應(yīng)用程序的當前運行狀態(tài)。
- 應(yīng)用程序運行完成后,ApplicationMaster通過RPC協(xié)議向ResourceManager注銷并關(guān)閉自己。
4、HDFS分布式文件系統(tǒng)
HDFS全稱Hadoop Distribute File System,是分布式計算中數(shù)據(jù)存儲管理的基礎(chǔ),解決了海量數(shù)據(jù)的存儲問題,具有高容錯性、可擴展性,適合大數(shù)據(jù)量處理和流式批處理,可運行于廉價的服務(wù)器上。傳統(tǒng)的HDFS架構(gòu)采用master/slave架構(gòu),一個HDFS集群是由一個Namenode和一定數(shù)目的Datanodes組成。
- NameNode是master,它是一個中心服務(wù)器,是這個集群的管理者,負責管理HDFS的命名空間(NameSpace)、配置副本策略和數(shù)據(jù)塊(Block)映射信息,同時也會處理客戶端讀寫請求。
- DataNode是slave,NameNode下達命令,DataNode執(zhí)行實際的操作。集群中的DataNode負責管理所在節(jié)點上的存儲,主要負責存儲實際的數(shù)據(jù)塊和執(zhí)行數(shù)據(jù)塊的讀/寫操作。
- Client負責與用戶交互,同時可以執(zhí)行以下操作
- 文件切分,文件上傳HDFS的時候,Client將文件切分成一個一個的Block,然后進行存儲。
- 與NameNode交互,獲取文件的位置信息
- 與DataNode交互,讀取或者寫入數(shù)據(jù)
- Client提供一些命令來管理HDFS,比如啟動或者關(guān)閉HDFS
- Client可以通過一些命令來訪問HDFS
- Secondary NameNode:并非NameNode的熱備。當NameNode掛掉的時候,它并不能馬上替換NameNode并提供服務(wù)
- 輔助NameNode,分擔其工作量。
- 定期合并fsimage和fsedits,并推送給NameNode。
- 在緊急情況下,可輔助恢復(fù)NameNode
備注:HDFS分布式文件系統(tǒng)有關(guān)知識在“分布式文件系統(tǒng)HDFS概述”中詳細介紹了,這里只做簡單的描述。
5、總結(jié)
本文主要介紹了分布式計算框架Hadoop的三大組件:MapReduce、YARN和HDFS。MapReduce是整個分布式計算的基礎(chǔ),完成分布式任務(wù)的數(shù)據(jù)拆分、歸并排序和匯總工作;YARN是通用的資源調(diào)度框架,不僅僅局限于MapReduce,也支持Spark、Storm、Flume等在線和流式計算框架;HDFS作為分布式文件系統(tǒng),支持大數(shù)據(jù)量的數(shù)據(jù)存儲、高吞吐訪問和橫向擴展。
參考資料:
- 《Hadoop技術(shù)內(nèi)幕 深入理解MapReduce架構(gòu)設(shè)計與實現(xiàn)原理》,董西成著
- 《Hadoop技術(shù)內(nèi)幕:深入解析YARN架構(gòu)設(shè)計與實現(xiàn)原理》,董西成著
- 分布式文件系統(tǒng)HDFS概述
- https://blog.csdn.net/qq_41858402/article/details/108207777
- https://blog.csdn.net/qq_40589204/article/details/118160989
- https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html
到了這里,關(guān)于分布式計算框架Hadoop核心組件的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!