3.5 MapReduce 內(nèi)核源碼解析
3.5.1 MapTask 工作機(jī)制
(1)Read階段:MapTask通過(guò)InputFormat獲得的RecordReader,從輸入InputSplit中解析出一個(gè)個(gè)key/value。
(2)Map階段:該節(jié)點(diǎn)主要是將解析出的key/value交給用戶編寫map()函數(shù)處理,并產(chǎn)生一系列新的key/value。
(3)Collect 收集階段:在用戶編寫 map()函數(shù)中,當(dāng)數(shù)據(jù)處理完成后,一般會(huì)調(diào)用OutputCollector.collect()輸出結(jié)果。在該函數(shù)內(nèi)部,它會(huì)將生成的 key/value 分區(qū)(調(diào)用Partitioner),并寫入一個(gè)環(huán)形內(nèi)存緩沖區(qū)中。
(4)Spill 階段:即“溢寫”,當(dāng)環(huán)形緩沖區(qū)滿后,MapReduce會(huì)將數(shù)據(jù)寫到本地磁盤上,生成一個(gè)臨時(shí)文件。需要注意的是,將數(shù)據(jù)寫入本地磁盤之前,先要對(duì)數(shù)據(jù)進(jìn)行一次本地排序,并在必要時(shí)對(duì)數(shù)據(jù)進(jìn)行合并、壓縮等操作。
溢寫階段詳情:
步驟1:利用快速排序算法對(duì)緩存區(qū)內(nèi)的數(shù)據(jù)進(jìn)行排序,排序方式是,先按照分區(qū)編號(hào)Partition 進(jìn)行排序,然后按照key進(jìn)行排序。這樣,經(jīng)過(guò)排序后,數(shù)據(jù)以分區(qū)為單位聚集在一起,且同一分區(qū)內(nèi)所有數(shù)據(jù)按照key有序。
步驟2:按照分區(qū)編號(hào)由小到大依次將每個(gè)分區(qū)中的數(shù)據(jù)寫入任務(wù)工作目錄下的臨時(shí)文件output/spillN.out(N 表示當(dāng)前溢寫次數(shù))中。如果用戶設(shè)置了 Combiner,則寫入文件之前,對(duì)每個(gè)分區(qū)中的數(shù)據(jù)進(jìn)行一次聚集操作。
步驟3:將分區(qū)數(shù)據(jù)的元信息寫到內(nèi)存索引數(shù)據(jù)結(jié)構(gòu)SpillRecord中,其中每個(gè)分區(qū)的元信息包括在臨時(shí)文件中的偏移量、壓縮前數(shù)據(jù)大小和壓縮后數(shù)據(jù)大小。如果當(dāng)前內(nèi)存索引大小超過(guò)1MB,則將內(nèi)存索引寫到文件output/spillN.out.index中。
(5)Merge 階段:當(dāng)所有數(shù)據(jù)處理完成后,MapTask 對(duì)所有臨時(shí)文件進(jìn)行一次合并,以確保最終只會(huì)生成一個(gè)數(shù)據(jù)文件。 當(dāng)所有數(shù)據(jù)處理完后,MapTask 會(huì)將所有臨時(shí)文件合并成一個(gè)大文件,并保存到文件output/file.out 中,同時(shí)生成相應(yīng)的索引文件output/file.out.index。 在進(jìn)行文件合并過(guò)程中,MapTask以分區(qū)為單位進(jìn)行合并。對(duì)于某個(gè)分區(qū),它將采用多輪遞歸合并的方式。每輪合并mapreduce.task.io.sort.factor(默認(rèn) 10)個(gè)文件,并將產(chǎn)生的文件重新加入待合并列表中,對(duì)文件排序后,重復(fù)以上過(guò)程,直到最終得到一個(gè)大文件。
讓每個(gè) MapTask 最終只生成一個(gè)數(shù)據(jù)文件,可避免同時(shí)打開大量文件和同時(shí)讀取大量小文件產(chǎn)生的隨機(jī)讀取帶來(lái)的開銷。
3.5.2 ReduceTask 工作機(jī)制
(1)Copy 階段:ReduceTask 從各個(gè) MapTask 上遠(yuǎn)程拷貝一片數(shù)據(jù),并針對(duì)某一片數(shù)據(jù),如果其大小超過(guò)一定閾值,則寫到磁盤上,否則直接放到內(nèi)存中。
(2)Sort 階段:在遠(yuǎn)程拷貝數(shù)據(jù)的同時(shí),ReduceTask啟動(dòng)了兩個(gè)后臺(tái)線程對(duì)內(nèi)存和磁盤上的文件進(jìn)行合并,以防止內(nèi)存使用過(guò)多或磁盤上文件過(guò)多。按照MapReduce語(yǔ)義,用戶編寫reduce()函數(shù)輸入數(shù)據(jù)是按key進(jìn)行聚集的一組數(shù)據(jù)。為了將key相同的數(shù)據(jù)聚在一
起,Hadoop采用了基于排序的策略。由于各個(gè)MapTask已經(jīng)實(shí)現(xiàn)對(duì)自己的處理結(jié)果進(jìn)行了局部排序,因此,ReduceTask只需對(duì)所有數(shù)據(jù)進(jìn)行一次歸并排序即可。
(3)Reduce 階段:reduce()函數(shù)將計(jì)算結(jié)果寫到HDFS上。
3.5.3 ReduceTask 并行度決定機(jī)制
回顧:MapTask并行度由切片個(gè)數(shù)決定,切片個(gè)數(shù)由輸入文件和切片規(guī)則決定。
思考:ReduceTask并行度由誰(shuí)決定?
1)設(shè)置ReduceTask并行度(個(gè)數(shù))
ReduceTask 的并行度同樣影響整個(gè) Job 的執(zhí)行并發(fā)度和執(zhí)行效率,但與MapTask的并發(fā)數(shù)由切片數(shù)決定不同,ReduceTask數(shù)量的決定是可以直接手動(dòng)設(shè)置:
// 默認(rèn)值是1,手動(dòng)設(shè)置為4
job.setNumReduceTasks(4);
2)實(shí)驗(yàn):測(cè)試ReduceTask多少合適
(1)實(shí)驗(yàn)環(huán)境:1個(gè)Master節(jié)點(diǎn),16個(gè)Slave節(jié)點(diǎn):CPU:8GHZ,內(nèi)存: 2G
(2)實(shí)驗(yàn)結(jié)論:
3)注意事項(xiàng)
(1)ReduceTask=0,表示沒有Reduce階段,輸出文件個(gè)數(shù)和Map個(gè)數(shù)一致。
(2)ReduceTask默認(rèn)值就是1,所以輸出文件個(gè)數(shù)為一個(gè)。
(3)如果數(shù)據(jù)分布不均勻,就有可能在Reduce階段產(chǎn)生數(shù)據(jù)傾斜
(4)ReduceTask數(shù)量并不是任意設(shè)置,還要考慮業(yè)務(wù)邏輯需求,有些情況下,需要計(jì)算全局匯總結(jié)果,就只能有1個(gè)ReduceTask。
(5)具體多少個(gè)ReduceTask,需要根據(jù)集群性能而定。
(6)如果分區(qū)數(shù)不是1,但是ReduceTask為1,是否執(zhí)行分區(qū)過(guò)程。答案是:不執(zhí)行分區(qū)過(guò)程。因?yàn)樵贛apTask的源碼中,執(zhí)行分區(qū)的前提是先判斷ReduceNum個(gè)數(shù)是否大于1。不大于1肯定不執(zhí)行。
3.6 數(shù)據(jù)清洗(ETL)
“ETL,是英文 Extract-Transform-Load 的縮寫,用來(lái)描述將數(shù)據(jù)從來(lái)源端經(jīng)過(guò)抽取
(Extract)、轉(zhuǎn)換(Transform)、加載(Load)至目的端的過(guò)程。ETL一詞較常用在數(shù)據(jù)倉(cāng)庫(kù),但其對(duì)象并不限于數(shù)據(jù)倉(cāng)庫(kù)
在運(yùn)行核心業(yè)務(wù)MapReduce 程序之前,往往要先對(duì)數(shù)據(jù)進(jìn)行清洗,清理掉不符合用戶要求的數(shù)據(jù)。==清理的過(guò)程往往只需要運(yùn)行Mapper程序,不需要運(yùn)行Reduce程序。 ==
1)需求
去除日志中字段個(gè)數(shù)小于等于11的日志。
(1)輸入數(shù)據(jù)
(2)期望輸出數(shù)據(jù)
每行字段長(zhǎng)度都大于11。
2)需求分析
需要在Map階段對(duì)輸入的數(shù)據(jù)根據(jù)規(guī)則進(jìn)行過(guò)濾清洗。
3)實(shí)現(xiàn)代碼
(1)編寫WebLogMapper類
package com.atguigu.mapreduce.weblog;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WebLogMapper extends Mapper<LongWritable, Text, Text,
NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 1 獲取1行數(shù)據(jù)
String line = value.toString();
// 2 解析日志
boolean result = parseLog(line,context);
// 3 日志不合法退出
if (!result) {
return;
}
// 4 日志合法就直接寫出
context.write(value, NullWritable.get());
}
// 2 封裝解析日志的方法
private boolean parseLog(String line, Context context) {
// 1 截取
String[] fields = line.split(" ");
// 2 日志長(zhǎng)度大于11的為合法
if (fields.length > 11) {
return true;
}else {
return false;
}
}
}
(2)編寫WebLogDriver類
package com.atguigu.mapreduce.weblog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WebLogDriver {
public static void main(String[] args) throws Exception {
// 輸入輸出路徑需要根據(jù)自己電腦上實(shí)際的輸入輸出路徑設(shè)置
args = new String[] { "D:/input/inputlog", "D:/output1" };
// 1 獲取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 加載jar包
job.setJarByClass(LogDriver.class);
// 3 關(guān)聯(lián)map
job.setMapperClass(WebLogMapper.class);
// 4 設(shè)置最終輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 設(shè)置reducetask個(gè)數(shù)為0
job.setNumReduceTasks(0);
// 5 設(shè)置輸入和輸出路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6 提交
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
3.7 MapReduce 開發(fā)總結(jié)
1)輸入數(shù)據(jù)接口:InputFormat
(1)默認(rèn)使用的實(shí)現(xiàn)類是:TextInputFormat
(2)TextInputFormat 的功能邏輯是:一次讀一行文本,然后將該行的起始偏移量作為key,行內(nèi)容作為value返回。
(3)CombineTextInputFormat 可以把多個(gè)小文件合并成一個(gè)切片處理,提高處理效率。
2)邏輯處理接口:Mapper
用戶根據(jù)業(yè)務(wù)需求實(shí)現(xiàn)其中三個(gè)方法:map() setup() cleanup ()
3)Partitioner 分區(qū)
(1)有默認(rèn)實(shí)現(xiàn) HashPartitioner,邏輯是根據(jù)key的哈希值和numReduces來(lái)返回一個(gè)分區(qū)號(hào);key.hashCode()&Integer.MAXVALUE % numReduces
(2)如果業(yè)務(wù)上有特別的需求,可以自定義分區(qū)。
4)Comparable 排序
(1)當(dāng)我們用自定義的對(duì)象作為key來(lái)輸出時(shí),就必須要實(shí)現(xiàn)WritableComparable 接口,重寫其中的compareTo()方法。
(2)部分排序:對(duì)最終輸出的每一個(gè)文件進(jìn)行內(nèi)部排序。
(3)全排序:對(duì)所有數(shù)據(jù)進(jìn)行排序,通常只有一個(gè)Reduce。
(4)二次排序:排序的條件有兩個(gè)。
5)Combiner 合并
Combiner 合并可以提高程序執(zhí)行效率,減少IO傳輸。但是使用時(shí)必須不能影響原有的
業(yè)務(wù)處理結(jié)果。
6)邏輯處理接口:Reducer
用戶根據(jù)業(yè)務(wù)需求實(shí)現(xiàn)其中三個(gè)方法:reduce() setup() cleanup ()文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-656739.html
7)輸出數(shù)據(jù)接口:OutputFormat
(1)默認(rèn)實(shí)現(xiàn)類是TextOutputFormat,功能邏輯是:將每一個(gè)KV對(duì),向目標(biāo)文本文件
輸出一行。
(2)用戶還可以自定義OutputFormat。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-656739.html
到了這里,關(guān)于Hadoop學(xué)習(xí):深入解析MapReduce的大數(shù)據(jù)魔力(三)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!