Level 1: Score Statistics
Task description
This level's task: use MapReduce to find the oldest student in the class.
Background
To complete this task you need to understand: 1. what MapReduce is, and 2. how to run a computation with MapReduce.
What is MapReduce?
MapReduce is a programming model for data processing. Imagine the following scenario: you are asked to analyze the recent data logs of the national meteorological center, about 3 TB in total, and work out the highest temperature for each year. With only one computer, how would you proceed? Most likely you would read the records one by one, comparing each reading against the current maximum; after scanning all the data you would have the answer. Experience tells us, though, that processing this much data on a single machine is extremely time-consuming.
What if you were given three machines instead? The better approach is to split the data into three blocks and process each block separately (map), send the partial results to one machine to be merged (merge), and then compute over the merged data to summarize (reduce) and output the final result.
Taken together, that is essentially a complete MapReduce job.
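To make the split, map, merge and reduce steps concrete, here is a minimal single-machine sketch in plain Java (no Hadoop). The class name, the three lists standing in for the three data blocks, and the temperature records are all made up for illustration; this only shows the shape of the computation, not how Hadoop actually schedules it.

import java.util.*;

// A minimal single-machine sketch of the split -> map -> merge -> reduce idea from the
// temperature example above. Plain Java, no Hadoop; the records and the three "blocks"
// are made-up sample data.
public class MaxTemperatureSketch {
    public static void main(String[] args) {
        // Three splits, as if the large log had been cut into three blocks,
        // each line being "<year> <temperature>".
        List<List<String>> splits = Arrays.asList(
                Arrays.asList("2019 31", "2019 35", "2020 28"),
                Arrays.asList("2020 33", "2021 30"),
                Arrays.asList("2021 38", "2019 29"));

        // Map + merge: each split emits (year, temperature) pairs, which are then
        // grouped by year in one place.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (List<String> split : splits) {          // in Hadoop, each split runs in parallel
            for (String line : split) {
                String[] fields = line.split(" ");
                grouped.computeIfAbsent(fields[0], k -> new ArrayList<>())
                       .add(Integer.parseInt(fields[1]));
            }
        }

        // Reduce: take the maximum temperature per year.
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            System.out.println(e.getKey() + " " + Collections.max(e.getValue()));
        }
    }
}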
Now begin the task. Good luck!
Answer code --------------------------------------
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
/********** Begin **********/
    // Mapper: each input line is "<name> <number>"; emit (name, number).
    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private IntWritable num = new IntWritable();
        private Text word = new Text();
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString(), "\n");
            while (itr.hasMoreTokens()) {
                String[] str = itr.nextToken().split(" ");
                word.set(str[0]);                    // the name
                num.set(Integer.parseInt(str[1]));   // the numeric field (score/age)
                context.write(word, num);
            }
        }
    }
    // Reducer: for each name, keep only the maximum of the values received.
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
            int maxAge = 0;
for (IntWritable intWritable : values) {
maxAge = Math.max(maxAge, intWritable.get());
}
result.set(maxAge);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
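        // Taking a maximum is associative and commutative, so the same reducer can safely run as a combiner.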
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
        String inputFile = "/user/test/input";
        String outputFile = "/user/test/output/";
        FileInputFormat.addInputPath(job, new Path(inputFile));
        FileOutputFormat.setOutputPath(job, new Path(outputFile));
job.waitForCompletion(true);
/********** End **********/
}
}
Command line
touch file01
echo Hello World Bye World >file01
cat file01
touch file02
echo Hello Hadoop Goodbye Hadoop >file02
cat file02
start-dfs.sh
hadoop fs -mkdir /usr
hadoop fs -mkdir /usr/input
hadoop fs -ls /
hadoop fs -ls /usr
hadoop fs -put file01 /usr/input
hadoop fs -put file02 /usr/input
hadoop fs -ls /usr/input
Level 2: Merge File Contents and Remove Duplicates
Task description
This level's task: use MapReduce to merge two files and remove duplicate entries.
Background
The previous level showed the overall shape of a MapReduce program; in this level we look at the Mapper class, the Reducer class and the Job class.
The Mapper class
First, let's look at the Mapper:
When writing a MapReduce program, you write a class that extends Mapper. Mapper is a generic type with four type parameters, which specify the types of the map() function's input key, input value, output key and output value. In the Level 1 example the input key is a long integer (the byte offset of the line in the file), the input value is a line of text, the output key is a word, and the output value is the number associated with that word.
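For reference, here is a commented restatement of the Level 1 mapper's class header, showing how the four type parameters line up. The class name MapperSignatureExample is just a placeholder, and the body of map() is omitted; nothing new is computed here.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>, as used in the Level 1 answer.
public class MapperSignatureExample {
    public static class TokenizerMapper
            extends Mapper<LongWritable,   // KEYIN:   byte offset of the current line
                           Text,           // VALUEIN: the line of text itself
                           Text,           // KEYOUT:  the word (the student name in Level 1)
                           IntWritable> {  // VALUEOUT: the number attached to that word
        // map() would be overridden here; it is omitted in this signature-only sketch.
    }
}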
Answer code -------------------
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Merge {
    /**
     * @param args
     * Merge files A and B, removing any duplicated content, to produce a new output file C.
     */
    // Override map() here: copy the input value straight through to the output key.
    // Note that map() must declare: throws IOException, InterruptedException
public static class Map extends Mapper<Object, Text, Text, Text>{
/********** Begin **********/
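        // Each input line has two whitespace-separated columns; emit them as (key, value).
        // Duplicate lines then share the same key and value and collapse in the reduce stage.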
public void map(Object key, Text value, Context content)
throws IOException, InterruptedException {
Text text1 = new Text();
Text text2 = new Text();
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
text1.set(itr.nextToken());
text2.set(itr.nextToken());
content.write(text1, text2);
}
}
/********** End **********/
}
    // Override reduce() here: copy the input key straight through to the output key.
    // Note that reduce() must declare: throws IOException, InterruptedException
public static class Reduce extends Reducer<Text, Text, Text, Text> {
/********** Begin **********/
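        // values holds everything seen for this key; a TreeSet keeps one copy of each
        // distinct value, in sorted order, which is exactly the deduplication we want.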
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Set<String> set = new TreeSet<String>();
for(Text tex : values){
set.add(tex.toString());
}
for(String tex : set){
context.write(key, new Text(tex));
}
}
/********** End **********/
}
public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
Job job = Job.getInstance(conf,"Merge and duplicate removal");
job.setJarByClass(Merge.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
        String inputPath = "/user/tmp/input/";    // input path
        String outputPath = "/user/tmp/output/";  // output path
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Level 3: Information Mining - Mining Parent-Child Relationships
Task description
This level's task: mine information from the given table.
Requirements
Your program must mine the parent-child relationships in the input and produce a table of grandchild-grandparent relationships. The rules are as follows:
the grandchild comes first, the grandparent second;
input file path: /user/reduce/input;
output file path: /user/reduce/output.
Test notes
Your code will be tested as follows:
Given the child-parent table below, your program must mine the parent-child relationships in it and produce the corresponding grandchild-grandparent table.
The input file contains:
child parent
Steven Lucy
Steven Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Frank
Jack Alice
Jack Jesse
David Alice
David Jesse
Philip David
Philip Alma
Mark David
Mark Alma
The output file should contain:
grand_child grand_parent
Mark Jesse
Mark Alice
Philip Jesse
Philip Alice
Jone Jesse
Jone Alice
Steven Jesse
Steven Alice
Steven Frank
Steven Mary
Jone Frank
Jone Mary
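Where do these rows come from? A common approach, and the one used in the answer below, is a reduce-side self-join: the mapper emits every (child, parent) pair twice with a flag, so that for a given key the reducer sees both that person's children and that person's parents, and their cross product is the grandchild-grandparent set. Here is a plain-Java sketch restricted to the single key Lucy; the class name and the hard-coded value list simply mirror what the answer code's mapper would emit for that key.

import java.util.*;

// Reduce-side self-join for the single key "Lucy" from the sample data.
// Flag 1 records say "X is a child of Lucy"; flag 2 records say "Lucy's parent is Y".
public class JoinSketch {
    public static void main(String[] args) {
        List<String> valuesForLucy = Arrays.asList(
                "1+Steven+Lucy", "1+Jone+Lucy",   // Lucy's children
                "2+Lucy+Mary", "2+Lucy+Frank");   // Lucy's parents

        List<String> grandChild = new ArrayList<>();
        List<String> grandParent = new ArrayList<>();
        for (String v : valuesForLucy) {
            String[] r = v.split("\\+");          // r[0]=flag, r[1]=child, r[2]=parent
            if ("1".equals(r[0])) grandChild.add(r[1]);
            else grandParent.add(r[2]);
        }
        // Cross product: {Steven, Jone} x {Mary, Frank} gives four grandchild/grandparent rows.
        for (String c : grandChild) {
            for (String p : grandParent) {
                System.out.println(c + "\t" + p);
            }
        }
    }
}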
Now begin the task. Good luck!
Answer code ------------------------
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class simple_data_mining {
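    // Counter used so the grand_child/grand_parent header row is written only once, by the first reduce() call.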
public static int time = 0;
    /**
     * @param args
     * Input: a child-parent table.
     * Output: a table expressing the grandchild-grandparent relationships.
     */
    // The Map splits each input line on spaces into child and parent, then writes the pair out
    // twice: once in forward order as the "right table" and once in reverse order as the "left
    // table". Note that each emitted value must carry a flag telling the two tables apart.
public static class Map extends Mapper<Object, Text, Text, Text>{
public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
/********** Begin **********/
String line = value.toString();
String[] childAndParent = line.split(" ");
List<String> list = new ArrayList<>(2);
for (String childOrParent : childAndParent) {
if (!"".equals(childOrParent)) {
list.add(childOrParent);
}
}
if (!"child".equals(list.get(0))) {
String childName = list.get(0);
String parentName = list.get(1);
String relationType = "1";
context.write(new Text(parentName), new Text(relationType + "+"
+ childName + "+" + parentName));
relationType = "2";
context.write(new Text(childName), new Text(relationType + "+"
+ childName + "+" + parentName));
}
/********** End **********/
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key, Iterable<Text> values,Context context) throws IOException,InterruptedException{
/********** Begin **********/
            // Write the header row once, from the first reduce() call.
if (time == 0) {
context.write(new Text("grand_child"), new Text("grand_parent"));
time++;
}
            // children of the current key (taken from left-table records, flag "1")
            List<String> grandChild = new ArrayList<>();
            // parents of the current key (taken from right-table records, flag "2")
            List<String> grandParent = new ArrayList<>();
            // Walk the value list: flag "1" records contribute a child, flag "2" records a parent.
            for (Text text : values) {
String s = text.toString();
String[] relation = s.split("\\+");
String relationType = relation[0];
String childName = relation[1];
String parentName = relation[2];
if ("1".equals(relationType)) {
grandChild.add(childName);
} else {
grandParent.add(parentName);
}
}
            // Join: every (grandchild, grandparent) combination across the two lists is a result row.
int grandParentNum = grandParent.size();
int grandChildNum = grandChild.size();
if (grandParentNum != 0 && grandChildNum != 0) {
for (int m = 0; m < grandChildNum; m++) {
for (int n = 0; n < grandParentNum; n++) {
                        // emit one grandchild-grandparent pair
context.write(new Text(grandChild.get(m)), new Text(
grandParent.get(n)));
}
}
}
/********** End **********/
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf,"Single table join");
job.setJarByClass(simple_data_mining.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
        String inputPath = "/user/reduce/input";    // input path
        String outputPath = "/user/reduce/output";  // output path
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
That concludes this walkthrough of the answers to each level of the educoder MapReduce basics practice.