一、基本概念
Hadoop解決了大數(shù)據(jù)面臨的兩個(gè)核心問題:海量數(shù)據(jù)的存儲(chǔ)問題、海量數(shù)據(jù)的計(jì)算問題
其中MapReduce就是專門設(shè)計(jì)用來解決海量數(shù)據(jù)計(jì)算問題的,同時(shí)MapReduce和HDFS不一樣的地方在于,雖然兩者均為分布式組件,但是HDFS是一個(gè)完善的軟件,我們只需要使用即可,不需要去進(jìn)行任何的邏輯的編輯。而MapReduce進(jìn)行數(shù)據(jù)計(jì)算,計(jì)算什么樣的數(shù)據(jù),使用什么樣的邏輯,MR程序都不清楚,因此MR只是一個(gè)分布式的計(jì)算【框架】,所謂的框架就是MR程序把分布式計(jì)算的思想和邏輯全部封裝好了,我們只需要按照框架的思維編寫計(jì)算代碼(就是我們自己處理數(shù)據(jù)的邏輯代碼),編寫完成之后,我們的程序必然是分布式的程序。
使用分布式計(jì)算框架的好處就在于我們開發(fā)人員只需要把關(guān)注點(diǎn)和重點(diǎn)放在業(yè)務(wù)的邏輯開發(fā),而非分布式計(jì)算程序邏輯的邏輯。
二、MapReduce的分布式計(jì)算核心思想
MR框架實(shí)現(xiàn)分布式計(jì)算的邏輯是將MR程序分成了兩部分:Map階段、Reduce階段
其中運(yùn)行一個(gè)計(jì)算程序先執(zhí)行Map階段,map階段又可以同時(shí)運(yùn)行多個(gè)計(jì)算程序(MapTask)去計(jì)算map階段的邏輯,Map階段主要負(fù)責(zé)分?jǐn)?shù)據(jù),而且map階段的多個(gè)MapTask并行運(yùn)行互不干擾。
第二階段Reduce階段,Reduce階段也可以同時(shí)運(yùn)行多個(gè)計(jì)算程序(ReduceTask),Reduce階段的任務(wù)主要負(fù)責(zé)合數(shù)據(jù),同時(shí)多個(gè)ReduceTask同時(shí)運(yùn)行互不干擾的。
任何一個(gè)MR程序,只能有一個(gè)Map階段,一個(gè)Reduce階段。
三、MapReduce程序在運(yùn)行過程中三個(gè)核心進(jìn)程
MRAppMaster(一個(gè)):負(fù)責(zé)整個(gè)分布式程序的監(jiān)控
MapTask(多個(gè)):Map階段的核心進(jìn)程,每一個(gè)MapTask處理數(shù)據(jù)源的一部分?jǐn)?shù)據(jù)
ReduceTask(多個(gè)):Reduce階段的核心進(jìn)程,每一個(gè)ReduceTask負(fù)責(zé)處理Map階段輸出的一部分?jǐn)?shù)據(jù)
四、如何編寫MapReduce計(jì)算程序:(編程步驟)
1、編寫MapTask的計(jì)算邏輯
1、編寫一個(gè)Java類繼承Mapper類,繼承Mapper類之后必須指定四個(gè)泛型,四個(gè)泛型分別代表了MapTask階段的輸入的數(shù)據(jù)和輸出的數(shù)據(jù)類型。
MR程序要求輸入的數(shù)據(jù)和輸出的數(shù)據(jù)類型必須都得是key-value鍵值對(duì)類型的數(shù)據(jù)。
2、重寫繼承的Mapper類當(dāng)中的map方法,map方法處理數(shù)據(jù)的時(shí)候是文件中的一行數(shù)據(jù)調(diào)用一次map方法,map方法的計(jì)算邏輯就是MapTask的核心計(jì)算邏輯。
3、同時(shí)map方法中數(shù)據(jù)計(jì)算完成,需要把數(shù)據(jù)以指定的key-value格式類型輸出。
2、編寫ReduceTask的計(jì)算邏輯
1、編寫一個(gè)Java類繼承Reducer類,繼承Reducer類之后必須指定四個(gè)泛型,四個(gè)泛型分別代表了Reduce階段的輸入和輸出的KV數(shù)據(jù)類型。
Reduce的輸入的KV類型就是Map階段的輸出的KV類型。
Reduce的輸出類型自定義的。
2、重寫Reducer類當(dāng)中提供的reduce方法,reduce方法處理數(shù)據(jù)的時(shí)候一組相同的key調(diào)用一次reduce方法,reduce方法的計(jì)算邏輯就是ReduceTask的核心計(jì)算邏輯。
3、調(diào)用reduce方法,reduce邏輯處理完成,需要把數(shù)據(jù)以指定的key-value格式類型輸出。
3、編寫Driver驅(qū)動(dòng)程序
Driver驅(qū)動(dòng)程序是用來組裝MR程序,組裝MR程序的處理的文件路徑、MR程序的Map階段的計(jì)算邏輯、MR程序的Reduce階段的計(jì)算邏輯、MR程序運(yùn)行完成之后的結(jié)果的輸出路徑。
Driver驅(qū)動(dòng)程序本質(zhì)上就是一個(gè)main函數(shù)
MapReduce底層是由Java開發(fā)的,因此MR程序我們要編寫的話支持使用Java代碼來進(jìn)行編寫
五、MapReduce的案例實(shí)現(xiàn) —— 大數(shù)據(jù)分布式計(jì)算的經(jīng)典案例WordCount(單詞計(jì)數(shù))
1、案例需求
現(xiàn)在有一個(gè)文件,文件很大,文件中存儲(chǔ)的每一行數(shù)據(jù)都是由空格分割的多個(gè)單詞組成的,現(xiàn)在需要通過大數(shù)據(jù)分布式計(jì)算技術(shù)去統(tǒng)計(jì)文件中每一個(gè)單詞出現(xiàn)的總次數(shù)
2、案例分析(基于MapReduce)
3、代碼開發(fā)
1、創(chuàng)建一個(gè)maven管理的Java項(xiàng)目
2、引入MR的編程依賴
- hadoop-client
- hadoop-hdfs
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kang</groupId>
<artifactId>mr-study</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>mr-study</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>3.1.4</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
</project>
3、編寫Mapper階段的計(jì)算邏輯
package com.kang.wc;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 單詞計(jì)數(shù)的MapTask的計(jì)算邏輯
* 1、繼承Mapper類。同時(shí)需要指定四個(gè)泛型 兩兩一組 分別 代表輸入的key value 和輸出的key value的數(shù)據(jù)類型
* 默認(rèn)情況下,map階段讀取文件數(shù)據(jù)是以每一行的偏移量為key 整數(shù)類型 每一行的數(shù)據(jù)為value讀取的 字符串類型
* map階段輸出以單詞為key 字符串 以1為value輸出 整數(shù)
* 數(shù)據(jù)類型不能使用Java中的數(shù)據(jù)類型,數(shù)據(jù)類型必須是Hadoop的一種序列化類型
* Int —— hadoop.io.IntWritable
* Long —— hadoop.io.LongWritable
* String —— hadoop.io.Text
* 2、重寫map方法
*/
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
/**
* map方法就是MapTask的核心計(jì)算邏輯方法
* map方法是切片中的一行數(shù)據(jù)調(diào)用一次
* @param key 這一行數(shù)據(jù)的偏移量
* @param value 這一行數(shù)據(jù)
* @param context 上下文對(duì)象 用于輸出map階段處理完成的key value數(shù)據(jù)
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
//拿到一行數(shù)據(jù),并且將一行數(shù)據(jù)轉(zhuǎn)成字符串類型
String line = value.toString();
//字符串以空格切割得到一個(gè)數(shù)組,數(shù)組中存放的就是一行的多個(gè)單詞
String[] words = line.split(" ");
//遍歷數(shù)組 得到每一個(gè)單詞 以單詞為key 以1為value輸出數(shù)據(jù)即可
for (String word : words) {
context.write(new Text(word),new LongWritable(1L));
}
}
}
4、編寫Reducer階段的計(jì)算邏輯
package com.kang.wc;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* Reduce的編程邏輯:
* 1、繼承Reducer類,指定輸入和輸出的kv類型
* 輸入KV就是Map階段的輸出KV Text LongWritable
* 輸出kv Text LongWritable
* 2、重寫reduce方法
*/
public class WCReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
/**
* Reduce方法是Reduce階段的核心計(jì)算邏輯
* reduce方法是一組相同的key執(zhí)行一次
* @param key 一組相同的key 某一個(gè)單詞
* @param values 是一個(gè)集合,集合存放的就是這一個(gè)單詞的所有的value值
* @param context 上下文對(duì)象 用于reduce階段輸出數(shù)據(jù)
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
//只需要將某個(gè)單詞聚合起來的value數(shù)據(jù)累加起來 得到總次數(shù)
long sum = 0L;
for (LongWritable value : values) {
sum += value.get();
}
//只需要以單詞為key 以sum為value輸出即可
context.write(key,new LongWritable(sum));
}
}
5、編寫Driver驅(qū)動(dòng)程序
package com.kang.wc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* Driver驅(qū)動(dòng)程序說白了就是封裝MR程序的
* Driver驅(qū)動(dòng)程序其實(shí)就是一個(gè)main函數(shù)
*/
public class WCDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1、準(zhǔn)備一個(gè)配置文件對(duì)象Configuration
Configuration conf = new Configuration();
//指定HDFS的地址
conf.set("fs.defaultFS","hdfs://192.168.31.104:9000");
//2、創(chuàng)建封裝MR程序使用一個(gè)Job對(duì)象
Job job = Job.getInstance(conf);
//3、封裝處理的文件路徑hdfs://single:9000/wc.txt
FileInputFormat.setInputPaths(job,new Path("/wc.txt"));
//4、封裝MR程序的Mapper階段,還要封裝Mapper階段輸出的key-value類型
job.setMapperClass(WCMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//5、封裝MR程序的Reducer階段,還要封裝reduce的輸出kv類型
job.setReducerClass(WCReducer.class);
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setNumReduceTasks(1);//指定reduce階段只有一個(gè)ReduceTask
//6、封裝MR程序的輸出路徑 —— 輸出路徑一定不能存在 如果存在會(huì)報(bào)錯(cuò)
FileOutputFormat.setOutputPath(job,new Path("/wcoutput"));
//7、提交運(yùn)行MR程序
boolean flag = job.waitForCompletion(true);
System.exit(flag?0:1);
}
}
文章來源:http://www.zghlxwxcb.cn/news/detail-697294.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-697294.html
到了這里,關(guān)于Hadoop的第二個(gè)核心組件:MapReduce框架第一節(jié)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!