一、MapReduce基礎(chǔ)
MapReduce的思想核心是“分而治之”,適用于大量復(fù)雜的任務(wù)處理場景(大規(guī)模數(shù)據(jù)處理場景)。
Map負(fù)責(zé)“分”,把復(fù)雜的任務(wù)分解為若干個“簡單的任務(wù)”來并行處理。可以進(jìn)行拆分的前提是這些小任務(wù)可以并行計算,彼此間幾乎沒有依賴關(guān)系。
Reduce負(fù)責(zé)“合”,即對map階段的結(jié)果進(jìn)行全局匯總。
MapReduce運行在yarn集群。ResourceManager+NodeManager這兩個階段合起來就是MapReduce思想的體現(xiàn)。

1.1 MapReduce設(shè)計構(gòu)思
MapReduce是一個分布式運算程序的編程框架,核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個完整的分布式運算程序,并發(fā)運行在Hadoop集群上。
MapReduce設(shè)計并提供了一個同意的計算框架,為程序員隱藏了絕大多數(shù)系統(tǒng)層面的處理細(xì)節(jié),為程序員提供了一個抽象和高層的編程接口和框架。程序員僅需要關(guān)心應(yīng)用層的具體計算問題,僅需要編寫少量的處理應(yīng)用本身計算問題的程序代碼。
Map和Reduce為程序員提供了一個清晰的操作接口抽象描述。MapReduce中定義了如下的Map和Reduce兩個抽象的編程接口,由用戶去編程實現(xiàn)Map和Reduce,MapReduce處理的數(shù)據(jù)類型是<key,value>鍵值對。

一個完整的MapReduce程序在分布式運行時有三類實例進(jìn)程:
MRAppMaster 負(fù)責(zé)整個程序的過程調(diào)度及狀態(tài)協(xié)調(diào)
MapTask 負(fù)責(zé)map階段的整個數(shù)據(jù)處理流程
ReduceTask 負(fù)責(zé)reduce階段的整個數(shù)據(jù)處理流程
1.2 MapReduce工作原理


1、分片操作:
FileInputstream,首先要計算切片大小,F(xiàn)ileInputstream是一個抽象類,繼承InputFormat接口,真正完成工作的是它的實現(xiàn)類,默認(rèn)為是TextInputFormat,TextInputFormat是讀取文件的,默認(rèn)為一行一行讀取,將輸入文件切分為邏輯上的多個input split,input split是MapReduce對文件進(jìn)行處理和運算的輸入單位,只是一個邏輯概念。
在進(jìn)行Map計算之前,MapReduce會根據(jù)輸入文件計算的切片數(shù)開啟map任務(wù),一個輸入切片對應(yīng)一個maptask,輸入分片存儲的并非數(shù)據(jù)本身,而是一個分片長度和一個記錄數(shù)據(jù)位置的集合,每個input spilt中存儲著該分片的數(shù)據(jù)信息如:文件塊信息、起始位置、數(shù)據(jù)長度、所在節(jié)點列表等,并不是對文件實際分割成多個小文件,輸入切片大小往往與hdfs的block關(guān)系密切,默認(rèn)一個切片對應(yīng)一個block,大小為128M;注意:盡管我們可以使用默認(rèn)塊大小或自定義的方式來定義分片的大小,但一個文件的大小,如果是在切片大小的1.1倍以內(nèi),仍作為一個片存儲,而不會將那多出來的0.1單獨分片。
2、數(shù)據(jù)格式化操作:
TextInputFormat 會創(chuàng)建RecordReader去讀取數(shù)據(jù),通過getCurrentkey,getCurrentvalue,nextkey,value等方法來讀取,讀取結(jié)果會形成key,value形式返回給maptask,key為偏移量,value為每一行的內(nèi)容,此操作的作用為在分片中每讀取一條記錄就調(diào)用一次map方法,反復(fù)這一過程直到將整個分片讀取完畢。
3、map階段操作:
此階段就是程序員通過需求偏寫了map函數(shù),將數(shù)據(jù)格式化的<K,V>鍵值對通過Mapper的map()方法邏輯處理,形成新的<k,v>鍵值對,通過Context.write輸出到OutPutCollector收集器
map端的shuffle(數(shù)據(jù)混洗)過程:溢寫(分區(qū),排序,合并,歸并)
溢寫:
由map處理的結(jié)果并不會直接寫入磁盤,而是會在內(nèi)存中開啟一個環(huán)形內(nèi)存緩沖區(qū),先將map結(jié)果寫入緩沖區(qū),這個緩沖區(qū)默認(rèn)大小為100M,并且在配置文件里為這個緩沖區(qū)設(shè)了一個閥值,默認(rèn)為0.8,同時map還會為輸出操作啟動一個守護(hù)線程,如果緩沖區(qū)內(nèi)存達(dá)到了閥值0.8,這個線程會將內(nèi)容寫入到磁盤上,這個過程叫作spill(溢寫)。
分區(qū)Partition
當(dāng)數(shù)據(jù)寫入內(nèi)存時,決定數(shù)據(jù)由哪個Reduce處理,從而需要分區(qū),默認(rèn)分區(qū)方式采用hash函數(shù)對key進(jìn)行哈布后再用Reduce任務(wù)數(shù)量進(jìn)行取模,表示為hash(key)modR,這樣就可以把map輸出結(jié)果均勻分配給Reduce任務(wù)處理,Partition與Reduce是一一對應(yīng)關(guān)系,類似于一個分片對應(yīng)一個map task,最終形成的形式為(分區(qū)號,key,value)
排序Sort:
在溢出的數(shù)據(jù)寫入磁盤前,會對數(shù)據(jù)按照key進(jìn)行排序,默認(rèn)采用快速排序,第一關(guān)鍵字為分區(qū)號,第二關(guān)鍵字為key。
合并combiner:
程序員可選是否合并,數(shù)據(jù)合并,在Reduce計算前對相同的key數(shù)據(jù)、value值合并,減少輸出量,如(“a”,1)(“a”,1)合并之后(“a”,2)
歸并menge
每塊溢寫會成一個溢寫文件,這些溢寫文件最終需要被歸并為一個大文件,生成key對應(yīng)的value-list,會進(jìn)行歸并排序<"a",1><"a",1>歸并后<"a",<1,1>>。
Reduce 端的shffle:
數(shù)據(jù)copy:map端的shffle結(jié)束后,所有map的輸出結(jié)果都會保存在map節(jié)點的本地磁盤上,文件都經(jīng)過分區(qū),不同的分區(qū)會被copy到不同的Recuce任務(wù)并進(jìn)行并行處理,每個Reduce任務(wù)會不斷通過RPC向JobTracker詢問map任務(wù)是否完成,JobTracker檢測到map位務(wù)完成后,就會通過相關(guān)Reduce任務(wù)去aopy拉取數(shù)據(jù),Recluce收到通知就會從Map任務(wù)節(jié)點Copy自己分區(qū)的數(shù)據(jù)此過程一般是Reduce任務(wù)采用寫個線程從不同map節(jié)點拉取
歸并數(shù)據(jù):
Map端接取的數(shù)據(jù)會被存放到 Reduce端的緩存中,如果緩存被占滿,就會溢寫到磁盤上,緩存數(shù)據(jù)來自不同的Map節(jié)點,會存在很多合并的鍵值對,當(dāng)溢寫啟動時,相同的keg會被歸并,最終各個溢寫文件會被歸并為一個大類件歸并時會進(jìn)行排序,磁盤中多個溢寫文許歸并為一個大文許可能需要多次歸并,一次歸并溢寫文件默認(rèn)為10個
4、Reduce階段:
Reduce任務(wù)會執(zhí)行Reduce函數(shù)中定義的各種映射,輸出結(jié)果存在分布式文件系統(tǒng)中。
二、 MapReduce編程模型
2.1 編程模型概述
2.1.1 Map階段2個步驟
設(shè)置InputFormat類,將數(shù)據(jù)切分為Key-Value(K1和V1)對,輸入到第二步
自定義Map邏輯,將第一步的結(jié)果轉(zhuǎn)換為另外的Key-Value(K2和V2)對,輸出結(jié)果
2.1.2 Shuffle階段4個步驟
對輸出的Key-Value進(jìn)行分區(qū)
對不同分區(qū)的數(shù)據(jù)按照相同的key排序
(可選)對分組過的數(shù)據(jù)初步規(guī)約,降低數(shù)據(jù)的網(wǎng)絡(luò)拷貝
對數(shù)據(jù)進(jìn)行分組,相同的Key和Value放入一個集合中
2.1.3 Reduce階段2個步驟
對多個Map任務(wù)的結(jié)果進(jìn)行排序以及合并,編寫Reduce函數(shù)實現(xiàn)自己的邏輯,對輸入的Key-Value進(jìn)行處理,轉(zhuǎn)為新的Key-Value(K3和V3)輸出
設(shè)置OutputFromat處理并保存Reduce輸出的Key-Value數(shù)據(jù)
2.2 編程模型三部曲
(1)Input:一系列(K1,V1)。
(2)Map和Reduce:
Map:(K1,V1) -> list(K2,V2) (其中K2/V2是中間結(jié)果對)
Reduce:(K2,list(v2)) -> list(K3,V3)
(3)Output:一系列(K3,V3)。
三、詞頻統(tǒng)計WordCount
3.1 添加Maven依賴
# 將maven版本改為1.8
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
# 添加如下依賴
<dependency>
? ? <groupId>org.apache.hadoop</groupId>
? ? <artifactId>hadoop-common</artifactId>
? ? <version>${hadoop-version}</version>
</dependency>
<dependency>
? ? <groupId>org.apache.hadoop</groupId>
? ? <artifactId>hadoop-hdfs</artifactId>
? ? <version>${hadoop-version}</version>
</dependency>
<dependency>
? ? <groupId>org.apache.hadoop</groupId>
? ? <artifactId>hadoop-mapreduce-client-core</artifactId>
? ? <version>${hadoop-version}</version>
</dependency>
<dependency>
? ? <groupId>org.apache.hadoop</groupId>
? ? <artifactId>hadoop-mapreduce-client-common</artifactId>
? ? <version>${hadoop-version}</version>
</dependency>
3.2 代碼實現(xiàn)
《wordcount.txt》
hello java
hello hadoop
hello java
hello java hadoop
java hadoop
hadoop java
Mapper類
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Text text = new Text();
IntWritable intWritable = new IntWritable();
System.out.println("WordCountMap stage Key:"+key+" Value:"+value);
String[] words = value.toString().split(" "); // "hello world"->[hello,world]
for (String word : words) {
text.set(word);
intWritable.set(1);
context.write(text,intWritable); // <hello,1> <word,1>
}
}
}
Reduce類
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReduce extends Reducer<Text, IntWritable,Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
System.out.println("Reduce stage Key:"+key+" Values"+values.toString());
int count=0;
for (IntWritable intWritable : values) {
count += intWritable.get();
}
LongWritable longWritable = new LongWritable(count);
System.out.println("Key:"+key+" ResultValue:"+longWritable.get());
context.write(key,longWritable);
}
}
Driver類
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(WordCountDriver.class);
// 設(shè)置mapper類
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 設(shè)置Reduce類
job.setReducerClass(WordCountReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 指定map輸入的文件路徑
FileInputFormat.
setInputPaths(job,new Path("D:\\Servers\\hadoopstu\\in\\wordcount.txt"));
// 指定reduce結(jié)果的輸出路徑
Path path = new Path("D:\\Servers\\hadoopstu\\out1");
FileSystem fileSystem = FileSystem.get(path.toUri(), configuration);
if(fileSystem.exists(path)){
fileSystem.delete(path,true);
}
FileOutputFormat.setOutputPath(job,path);
job.waitForCompletion(true);
}
}
3.3 代碼說明
3.3.1 對于map函數(shù)的方法

protected void map(LongWritable key, Text value, Context context)
繼承Mapper類,實現(xiàn)map方法,重寫的map方法中包含三個參數(shù),key、value就是輸入的key、value鍵值對(<K1,V1>),context記錄的是整個上下文,可以通過context將數(shù)據(jù)寫出去。

3.3.2 對于reduce函數(shù)的方法

protected void reduce(Text key, Iterable<IntWritable> values, Context context)
繼承Reduce類,實現(xiàn)reduce方法,reduce函數(shù)輸入的是一個<K,V>形式,但是這里的value是以迭代器的形式Iterable<IntWritable> value。即,reduce的輸入是一個key對應(yīng)一組value。
reduce中context參數(shù)與map中的reduce參數(shù)作用一致。

3.3.3 對于main函數(shù)的調(diào)用

創(chuàng)建configuration()類,作用是讀取MapReduce系統(tǒng)配置信息
創(chuàng)建job類
設(shè)置map函數(shù)、map函數(shù)輸出的key、value類型文章來源:http://www.zghlxwxcb.cn/news/detail-786888.html
設(shè)置reduce函數(shù)、reduce函數(shù)輸出的key、value類型,即最終存儲再HDFS結(jié)果文件中的key、value類型文章來源地址http://www.zghlxwxcb.cn/news/detail-786888.html
到了這里,關(guān)于Hadoop三大框架之MapReduce工作流程的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!