分布式計算模型MapReduce
1.MapReduce設(shè)計思想 2.MapReduce分布式計算的基本原理 3.使用Java進(jìn)行MapReduce編程 4.在Hadoop集群中提交MapReduce任務(wù) 5.Yarn工作機(jī)制 |
1. MapReduce設(shè)計思想
1.1? 什么是MapReduce 1)MapReduce是一個分布式計算框架 它將大型數(shù)據(jù)操作作業(yè)分解為可以跨服務(wù)器集群并行執(zhí)行的單個任務(wù)。 起源于Google 2)適用于大規(guī)模數(shù)據(jù)處理場景 每個節(jié)點(diǎn)處理存儲在該節(jié)點(diǎn)的數(shù)據(jù) 3)每個job包含Map(分類kv)和Reduce(計算)兩部分 |
1.2?? MapReduce的設(shè)計思想 1)分而治之:簡化并行計算的編程模型 2)構(gòu)建抽象模型:Map和Reduce 開發(fā)人員專注于實(shí)現(xiàn)Mapper和Reducer函數(shù) 3)隱藏系統(tǒng)層細(xì)節(jié):開發(fā)人員專注于業(yè)務(wù)邏輯實(shí)現(xiàn) |
1.3? ?MapReduce特點(diǎn) 1)優(yōu)點(diǎn):易于編程;可擴(kuò)展性;高容錯性;高吞吐量 2)不適用領(lǐng)域:難以實(shí)時計算;不適合流式計算;不適合DGA(有向圖)計算 |
1.4? MapReduce編程規(guī)范 1)MapReduce框架處理的數(shù)據(jù)格式是<K,V>鍵值對形式 Mapper?? Map端接收<K,V>鍵值對數(shù)據(jù),經(jīng)過處理輸出新的<K,V>鍵值對 Map端處理邏輯寫在Mapper類中map()方法中 2)Reducer Reduce端搜集多個Mapper端輸出的<K,V>數(shù)據(jù),進(jìn)行匯總 Reducer的業(yè)務(wù)邏輯寫在reduce()方法中 每一組相同k的<k,Iterator<v>>組調(diào)用一次reduce()方法 |
2. 使用Java進(jìn)行MapReduce編程
3.1? WordCount功能實(shí)現(xiàn) (1)物料準(zhǔn)備:wordcount (2)WordCountMapper (3)WordCountReduce (4)WordCountDriver 執(zhí)行后輸出: |
3. Hadoop集群中提交MapReduce任務(wù)
Idea打包工程成jar包,執(zhí)行命令 |
4. MapReduce分布式計算的基本原理
4.1? Hadoop序列化 什么是序列化:序列化就是把內(nèi)存中的對象,轉(zhuǎn)換成字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)以便于存儲到磁盤(持久化)和網(wǎng)絡(luò)傳輸。 反序列化就是將收到字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)或者是磁盤的持久化數(shù)據(jù),轉(zhuǎn)換成內(nèi)存中的對象。 (1)必須可序列化(serializable) 作用:網(wǎng)絡(luò)傳輸以及持久化存儲 IntWritable、LongWriteable、FloatWritable、Text、DoubleWritable, BooleanWritable、NullWritable等 (2)都繼承了Writable接口 并實(shí)現(xiàn)write()和readFields()方法 (3)Keys必須實(shí)現(xiàn)WritableComparable接口 MapReduce框架會按照Key進(jìn)行排序 Reduce階段需要sort keys需要可比較 |
4.2? MapReduce框架原理 MapReduce執(zhí)行流程: (1)split階段:計算分片 (2)map階段:調(diào)用map()方法對數(shù)據(jù)進(jìn)行處理 (3)shffule階段:主要負(fù)責(zé)將map端生成的數(shù)據(jù)傳遞給reduce端 (4)reduce階段:對Shffule階段傳來的數(shù)據(jù)進(jìn)行最后的整理合并 |
4.3? MapTask (1)Read階段:MapTask通過InputFormat獲得的RecordReader,從輸入InputSplit中解析出一個個key/value。 (2)Map階段:該節(jié)點(diǎn)主要是將解析出的key/value交給用戶編寫map()函數(shù)處理,并產(chǎn)生一系列新的key/value。 (3)Collect收集階段:在用戶編寫map()函數(shù)中,當(dāng)數(shù)據(jù)處理完成后,一般會調(diào)用OutputCollector.collect()輸出結(jié)果。在該函數(shù)內(nèi)部,它會將生成的key/value分區(qū)(調(diào)用Partitioner),并寫入一個環(huán)形內(nèi)存緩沖區(qū)中。 (4)Spill階段:即“溢寫”,當(dāng)環(huán)形緩沖區(qū)滿后,MapReduce會將數(shù)據(jù)寫到本地磁盤上,生成一個臨時文件。需要注意的是,將數(shù)據(jù)寫入本地磁盤之前,先要對數(shù)據(jù)進(jìn)行一次本地排序,并在必要時對數(shù)據(jù)進(jìn)行合并、壓縮等操作。 ??????? 溢寫階段詳情: ??????? 步驟1:利用快速排序算法對緩存區(qū)內(nèi)的數(shù)據(jù)進(jìn)行排序,排序方式是,先按照分區(qū)編號Partition進(jìn)行排序,然后按照key進(jìn)行排序。這樣,經(jīng)過排序后,數(shù)據(jù)以分區(qū)為單位聚集在一起,且同一分區(qū)內(nèi)所有數(shù)據(jù)按照key有序。 ??????? 步驟2:按照分區(qū)編號由小到大依次將每個分區(qū)中的數(shù)據(jù)寫入任務(wù)工作目錄下的臨時文件output/spillN.out(N表示當(dāng)前溢寫次數(shù))中。如果用戶設(shè)置了Combiner,則寫入文件之前,對每個分區(qū)中的數(shù)據(jù)進(jìn)行一次聚集操作。 ??????? 步驟3:將分區(qū)數(shù)據(jù)的元信息寫到內(nèi)存索引數(shù)據(jù)結(jié)構(gòu)SpillRecord中,其中每個分區(qū)的元信息包括在臨時文件中的偏移量、壓縮前數(shù)據(jù)大小和壓縮后數(shù)據(jù)大小。如果當(dāng)前內(nèi)存索引大小超過1MB,則將內(nèi)存索引寫到文件output/spillN.out.index中。 (5)Merge階段:當(dāng)所有數(shù)據(jù)處理完成后,MapTask對所有臨時文件進(jìn)行一次合并,以確保最終只會生成一個數(shù)據(jù)文件。 ??????? 當(dāng)所有數(shù)據(jù)處理完后,MapTask會將所有臨時文件合并成一個大文件,并保存到文件output/file.out中,同時生成相應(yīng)的索引文件output/file.out.index。 ??????? 在進(jìn)行文件合并過程中,MapTask以分區(qū)為單位進(jìn)行合并。對于某個分區(qū),它將采用多輪遞歸合并的方式。每輪合并mapreduce.task.io.sort.factor(默認(rèn)10)個文件,并將產(chǎn)生的文件重新加入待合并列表中,對文件排序后,重復(fù)以上過程,直到最終得到一個大文件。 ??????? 讓每個MapTask最終只生成一個數(shù)據(jù)文件,可避免同時打開大量文件和同時讀取大量小文件產(chǎn)生的隨機(jī)讀取帶來的開銷。 |
4.4? ReduceTask (1)Copy階段:ReduceTask從各個MapTask上遠(yuǎn)程拷貝一片數(shù)據(jù),并針對某一片數(shù)據(jù),如果其大小超過一定閾值,則寫到磁盤上,否則直接放到內(nèi)存中。 (2)Sort階段:在遠(yuǎn)程拷貝數(shù)據(jù)的同時,ReduceTask啟動了兩個后臺線程對內(nèi)存和磁盤上的文件進(jìn)行合并,以防止內(nèi)存使用過多或磁盤上文件過多。按照MapReduce語義,用戶編寫reduce()函數(shù)輸入數(shù)據(jù)是按key進(jìn)行聚集的一組數(shù)據(jù)。為了將key相同的數(shù)據(jù)聚在一起,Hadoop采用了基于排序的策略。由于各個MapTask已經(jīng)實(shí)現(xiàn)對自己的處理結(jié)果進(jìn)行了局部排序,因此,ReduceTask只需對所有數(shù)據(jù)進(jìn)行一次歸并排序即可。 (3)Reduce階段:reduce()函數(shù)將計算結(jié)果寫到HDFS上。 |
4.5? InputFormat數(shù)據(jù)輸入接口 切片與MapTask并行度決定機(jī)制1)問題引出 MapTask的并行度決定Map階段的任務(wù)處理并發(fā)度,進(jìn)而影響到整個Job的處理速度。 思考:1G的數(shù)據(jù),啟動8個MapTask,可以提高集群的并發(fā)處理能力。那么1K的數(shù)據(jù),也啟動8個MapTask,會提高集群性能嗎?MapTask并行任務(wù)是否越多越好呢?哪些因素影響了MapTask并行度? 2)MapTask并行度決定機(jī)制 數(shù)據(jù)塊:Block是HDFS物理上把數(shù)據(jù)分成一塊一塊。數(shù)據(jù)塊是HDFS存儲數(shù)據(jù)單位。 數(shù)據(jù)切片:數(shù)據(jù)切片只是在邏輯上對輸入進(jìn)行分片,并不會在磁盤上將其切分成片進(jìn)行存儲。數(shù)據(jù)切片是MapReduce程序計算輸入數(shù)據(jù)的單位,一個切片會對應(yīng)啟動一個MapTask。 |
4.6? InputSplit(輸入分片) |
4.7? Shuffle階段 數(shù)據(jù)從Map輸出到Reduce輸入的過程 |
4.8? Combiner類 (1)Combiner相當(dāng)于本地化的Reduce操作 在shuffle之前進(jìn)行本地聚合 用于性能優(yōu)化,可選項 輸入和輸出類型一致 (2)Reducer可以被用作Combiner的條件 符合交換律和結(jié)合律 (3)實(shí)現(xiàn)Combiner job.setCombinerClass(WCReducer.class) |
4.9? Partitioner類 (1)用于在Map端對key進(jìn)行分區(qū) 默認(rèn)使用的是HashPartitioner 獲取key的哈希值 使用key的哈希值對Reduce任務(wù)數(shù)求模 決定每條記錄應(yīng)該送到哪個Reducer處理 (2)自定義Partitioner 繼承抽象類Partitioner,重寫getPartition方法 job.setPartitionerClass(MyPartitioner.class) |
5. MapReduce實(shí)現(xiàn) SQL Join操作
4.1? map端join 1)使用場景 Map Join適用于一張表十分小、一張表很大的場景。 2)優(yōu)點(diǎn) 思考:在Reduce端處理過多的表,非常容易產(chǎn)生數(shù)據(jù)傾斜。怎么辦? 在Map端緩存多張表,提前處理業(yè)務(wù)邏輯,這樣增加Map端業(yè)務(wù),減少Reduce端數(shù)據(jù)的壓力,盡可能的減少數(shù)據(jù)傾斜。 3)具體辦法:采用DistributedCache ?????? (1)在Mapper的setup階段,將文件讀取到緩存集合中。 ?????? (2)在Driver驅(qū)動類中加載緩存。 //緩存普通文件到Task運(yùn)行節(jié)點(diǎn)。 job.addCacheFile(new URI("file:///e:/cache/pd.txt")); //如果是集群運(yùn)行,需要設(shè)置HDFS路徑 job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt")); |
4.2? reduce端join Map端的主要工作:為來自不同表或文件的key/value對,打標(biāo)簽以區(qū)別不同來源的記錄。然后用連接字段作為key,其余部分和新加的標(biāo)志作為value,最后進(jìn)行輸出。 ??? Reduce端的主要工作:在Reduce端以連接字段作為key的分組已經(jīng)完成,我們只需要在每一個分組當(dāng)中將那些來源于不同文件的記錄(在Map階段已經(jīng)打標(biāo)志)分開,最后進(jìn)行合并就ok了。 |
6. Yarn
Yarn工作機(jī)制 (1)MR程序提交到客戶端所在的節(jié)點(diǎn)。 (2)YarnRunner向ResourceManager申請一個Application。 (3)RM將該應(yīng)用程序的資源路徑返回給YarnRunner。 (4)該程序?qū)⑦\(yùn)行所需資源提交到HDFS上。 (5)程序資源提交完畢后,申請運(yùn)行mrAppMaster。 (6)RM將用戶的請求初始化成一個Task。 (7)其中一個NodeManager領(lǐng)取到Task任務(wù)。 (8)該NodeManager創(chuàng)建容器Container,并產(chǎn)生MRAppmaster。 (9)Container從HDFS上拷貝資源到本地。 (10)MRAppmaster向RM 申請運(yùn)行MapTask資源。 (11)RM將運(yùn)行MapTask任務(wù)分配給另外兩個NodeManager,另兩個NodeManager分別領(lǐng)取任務(wù)并創(chuàng)建容器。 (12)MR向兩個接收到任務(wù)的NodeManager發(fā)送程序啟動腳本,這兩個NodeManager分別啟動MapTask,MapTask對數(shù)據(jù)分區(qū)排序。 (13)MrAppMaster等待所有MapTask運(yùn)行完畢后,向RM申請容器,運(yùn)行ReduceTask。 (14)ReduceTask向MapTask獲取相應(yīng)分區(qū)的數(shù)據(jù)。 (15)程序運(yùn)行完畢后,MR會向RM申請注銷自己。 HDFS、YARN、MapReduce三者關(guān)系 作業(yè)提交過程之HDFS & MapReduce 作業(yè)提交全過程詳解 (1)作業(yè)提交 第1步:Client調(diào)用job.waitForCompletion方法,向整個集群提交MapReduce作業(yè)。 第2步:Client向RM申請一個作業(yè)id。 第3步:RM給Client返回該job資源的提交路徑和作業(yè)id。 第4步:Client提交jar包、切片信息和配置文件到指定的資源提交路徑。 第5步:Client提交完資源后,向RM申請運(yùn)行MrAppMaster。 (2)作業(yè)初始化 第6步:當(dāng)RM收到Client的請求后,將該job添加到容量調(diào)度器中。 第7步:某一個空閑的NM領(lǐng)取到該Job。 第8步:該NM創(chuàng)建Container,并產(chǎn)生MRAppmaster。 第9步:下載Client提交的資源到本地。 (3)任務(wù)分配 第10步:MrAppMaster向RM申請運(yùn)行多個MapTask任務(wù)資源。 第11步:RM將運(yùn)行MapTask任務(wù)分配給另外兩個NodeManager,另兩個NodeManager分別領(lǐng)取任務(wù)并創(chuàng)建容器。 (4)任務(wù)運(yùn)行 第12步:MR向兩個接收到任務(wù)的NodeManager發(fā)送程序啟動腳本,這兩個NodeManager分別啟動MapTask,MapTask對數(shù)據(jù)分區(qū)排序。 第13步:MrAppMaster等待所有MapTask運(yùn)行完畢后,向RM申請容器,運(yùn)行ReduceTask。 第14步:ReduceTask向MapTask獲取相應(yīng)分區(qū)的數(shù)據(jù)。 第15步:程序運(yùn)行完畢后,MR會向RM申請注銷自己。 (5)進(jìn)度和狀態(tài)更新 YARN中的任務(wù)將其進(jìn)度和狀態(tài)(包括counter)返回給應(yīng)用管理器, 客戶端每秒(通過mapreduce.client.progressmonitor.pollinterval設(shè)置)向應(yīng)用管理器請求進(jìn)度更新, 展示給用戶。 (6)作業(yè)完成文章來源:http://www.zghlxwxcb.cn/news/detail-695883.html 除了向應(yīng)用管理器請求作業(yè)進(jìn)度外, 客戶端每5秒都會通過調(diào)用waitForCompletion()來檢查作業(yè)是否完成。時間間隔可以通過mapreduce.client.completion.pollinterval來設(shè)置。作業(yè)完成之后, 應(yīng)用管理器和Container會清理工作狀態(tài)。作業(yè)的信息會被作業(yè)歷史服務(wù)器存儲以備之后用戶核查。文章來源地址http://www.zghlxwxcb.cn/news/detail-695883.html |
到了這里,關(guān)于hadoop-MapReduce的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!