一、實(shí)驗(yàn)?zāi)康?/h4>
-
了解Mapper類,Reducer類和Job類
-
掌握什么是MapReduce及使用MapReduce進(jìn)行運(yùn)算
-
掌握挖掘父子輩關(guān)系,給出祖孫輩關(guān)系的表格
二、實(shí)驗(yàn)內(nèi)容
-
使用Map/Reduce計(jì)算班級(jí)中年齡最大的學(xué)生
-
使用Map/Reduce編程實(shí)現(xiàn)文件合并和去重操作
-
對(duì)給定的表格進(jìn)行信息挖掘
-
編寫(xiě)實(shí)現(xiàn)日期操作的程序
三、實(shí)驗(yàn)步驟
了解Mapper類,Reducer類和Job類
掌握什么是MapReduce及使用MapReduce進(jìn)行運(yùn)算
掌握挖掘父子輩關(guān)系,給出祖孫輩關(guān)系的表格
使用Map/Reduce計(jì)算班級(jí)中年齡最大的學(xué)生
使用Map/Reduce編程實(shí)現(xiàn)文件合并和去重操作
對(duì)給定的表格進(jìn)行信息挖掘
編寫(xiě)實(shí)現(xiàn)日期操作的程序
(一)使用Map/Reduce計(jì)算班級(jí)中年齡最大的學(xué)生
什么是MapReduce
MapReduce
是一種可用于數(shù)據(jù)處理的編程模型,我們現(xiàn)在設(shè)想一個(gè)場(chǎng)景,你接到一個(gè)任務(wù),任務(wù)是:挖掘分析我國(guó)氣象中心近年來(lái)的數(shù)據(jù)日志,該數(shù)據(jù)日志大小有3T
,讓你分析計(jì)算出每一年的最高氣溫,如果你現(xiàn)在只有一臺(tái)計(jì)算機(jī),如何處理呢?我想你應(yīng)該會(huì)讀取這些數(shù)據(jù),并且將讀取到的數(shù)據(jù)與目前的最大氣溫值進(jìn)行比較。比較完所有的數(shù)據(jù)之后就可以得出最高氣溫了。不過(guò)以我們的經(jīng)驗(yàn)都知道要處理這么多數(shù)據(jù)肯定是非常耗時(shí)的。
創(chuàng)建file01
輸入內(nèi)容:
Hello World Bye World
創(chuàng)建file02
輸入內(nèi)容:
Hello Hadoop Goodbye Hadoop
將文件上傳到HDFS
的/usr/input/
目錄下:
不要忘了啟動(dòng)DFS
: start-dfs.sh
然后創(chuàng)建文件夾并上傳:
點(diǎn)擊評(píng)測(cè),運(yùn)行代碼,可以看到/usr/output
目錄下已經(jīng)生成了文件。
我們來(lái)查看part--r-00000
文件的內(nèi)容:
可以看到統(tǒng)計(jì)的數(shù)據(jù)已經(jīng)生成在文件中了。
編程要求
使用MapReduce
計(jì)算班級(jí)每個(gè)學(xué)生的最好成績(jī),輸入文件路徑為/user/test/input
,請(qǐng)將計(jì)算后的結(jié)果輸出到/user/test/output/
目錄下。
輸入文件的數(shù)據(jù)格式如下: 張三
12
李四
13
張三
89
李四
92
...
依照如上格式你應(yīng)該輸出:
張三
89
李四
92
相關(guān)代碼:
1. //首先在命令行啟動(dòng) hadoop: start-all.sh
2. import java.io.IOException;
3. import java.util.StringTokenizer;
4.
5. import java.io.IOException;
6. import java.util.StringTokenizer;
7. import org.apache.hadoop.conf.Configuration;
8. import org.apache.hadoop.fs.Path;
9. import org.apache.hadoop.io.*;
10. import org.apache.hadoop.io.Text;
11. import org.apache.hadoop.mapreduce.Job;
12. import org.apache.hadoop.mapreduce.Mapper;
13. import org.apache.hadoop.mapreduce.Reducer;
14. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
16. import org.apache.hadoop.util.GenericOptionsParser;
17.
18. public class WordCount {
19. /********** Begin **********/
20. public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
21. private final static IntWritable one = new IntWritable(1);
22. private Text word = new Text();
23. private int maxValue = 0;
24. public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
25. StringTokenizer itr = new StringTokenizer(value.toString(),"\n");
26. while (itr.hasMoreTokens()) {
27. String[] str = itr.nextToken().split(" ");
28. String name = str[0];
29. one.set(Integer.parseInt(str[1]));
30. word.set(name);
31. context.write(word,one);
32. }
33. //context.write(word,one);
34. }
35. }
36. public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
37. private IntWritable result = new IntWritable();
38. public void reduce(Text key, Iterable<IntWritable> values, Context context)
39. throws IOException, InterruptedException {
40. int maxAge = 0;
41. int age = 0;
42. for (IntWritable intWritable : values) {
43. maxAge = Math.max(maxAge, intWritable.get());
44. }
45. result.set(maxAge);
46. context.write(key, result);
47. }
48. }
49. public static void main(String[] args) throws Exception {
50. Configuration conf = new Configuration();
51. Job job = new Job(conf, "word count");
52. job.setJarByClass(WordCount.class);
53. job.setMapperClass(TokenizerMapper.class);
54. job.setCombinerClass(IntSumReducer.class);
55. job.setReducerClass(IntSumReducer.class);
56. job.setOutputKeyClass(Text.class);
57. job.setOutputValueClass(IntWritable.class);
58. String inputfile = "/user/test/input";
59. String outputFile = "/user/test/output/";
60. FileInputFormat.addInputPath(job, new Path(inputfile));
61. FileOutputFormat.setOutputPath(job, new Path(outputFile));
62. job.waitForCompletion(true);
63. /********** End **********/
64. }
65. }
(二)使用Map/Reduce編程實(shí)現(xiàn)文件合并和去重操作
map類
首先我們來(lái)看看Mapper
對(duì)象:
在編寫(xiě)MapReduce
程序時(shí),要編寫(xiě)一個(gè)類繼承Mapper
類,這個(gè)Mapper
類是一個(gè)泛型類型,它有四個(gè)形參類型,分別指定了map()
函數(shù)的輸入鍵,輸入值,和輸出鍵,輸出值的類型。就第一關(guān)的例子來(lái)說(shuō),輸入鍵是一個(gè)長(zhǎng)整型,輸入值是一行文本,輸出鍵是單詞,輸出值是單詞出現(xiàn)的次數(shù)。
Hadoop
提供了一套可優(yōu)化網(wǎng)絡(luò)序列化傳輸?shù)幕绢愋?,而不是直接使?/span>Java
內(nèi)嵌的類型。這些類型都在org.apache.hadoop.io
包中,這里使用LongWritable
(相當(dāng)于Java
中的Long
類型),Text
類型(相當(dāng)于Java
中的String
類型)和IntWritable
(相當(dāng)于Integer
類型)。
map()
函數(shù)的輸入是一個(gè)鍵和一個(gè)值,我們一般首先將包含有一行輸入的text
值轉(zhuǎn)換成Java
的String
類型,然后再使用對(duì)字符串操作的類或者其他方法進(jìn)行操作即可。
Reducer類
同樣Reducer
也有四個(gè)參數(shù)類型用于指定輸入和輸出類型,reduce()
函數(shù)的輸入類型必須匹配map
函數(shù)的輸出類型,即Text
類型和IntWritable
類型,在這種情況下,reduce
函數(shù)的輸出類型也必須是Text
和IntWritable
類型,即分別輸出單詞和次數(shù)。
Job類
一般我們用Job
對(duì)象來(lái)運(yùn)行MapReduce
作業(yè),Job
對(duì)象用于指定作業(yè)執(zhí)行規(guī)范,我們可以用它來(lái)控制整個(gè)作業(yè)的運(yùn)行,我們?cè)?/span>Hadoop
集群上運(yùn)行這個(gè)作業(yè)時(shí),要把代碼打包成一個(gè)JAR
文件(Hadoop
在集群上發(fā)布的這個(gè)文件),不用明確指定JAR
文件的名稱,在Job
對(duì)象的setJarByClass()
函數(shù)中傳入一個(gè)類即可,Hadoop
利用這個(gè)類來(lái)查找包含他的JAR
文件。addInputPath()
函數(shù)和setOutputPath()
函數(shù)用來(lái)指定作業(yè)的輸入路徑和輸出路徑。值的注意的是,輸出路徑在執(zhí)行程序之前不能存在,否則Hadoop
會(huì)拒絕執(zhí)行你的代碼。
最后我們使用waitForCompletion()
方法提交代碼并等待執(zhí)行,該方法唯一的參數(shù)是一個(gè)布爾類型的值,當(dāng)該值為true
時(shí),作業(yè)會(huì)把執(zhí)行過(guò)程打印到控制臺(tái),該方法也會(huì)返回一個(gè)布爾值,表示執(zhí)行的成敗。
編程要求
對(duì)于兩個(gè)輸入文件,即文件file1
和文件file2
,請(qǐng)編寫(xiě)MapReduce
程序,對(duì)兩個(gè)文件進(jìn)行合并,并剔除其中重復(fù)的內(nèi)容,得到一個(gè)新的輸出文件file3
。 為了完成文件合并去重的任務(wù),你編寫(xiě)的程序要能將含有重復(fù)內(nèi)容的不同文件合并到一個(gè)沒(méi)有重復(fù)的整合文件,規(guī)則如下:
- 第一列按學(xué)號(hào)排列;
- 學(xué)號(hào)相同,按
x,y,z
排列; - 輸入文件路徑為:
/user/tmp/input/
; - 輸出路徑為:
/user/tmp/output/
。
程序會(huì)對(duì)你編寫(xiě)的代碼進(jìn)行測(cè)試: 輸入已經(jīng)指定了測(cè)試文本數(shù)據(jù):需要你的程序輸出合并去重后的結(jié)果。 下面是輸入文件和輸出文件的一個(gè)樣例供參考。
輸入文件file1
的樣例如下: 20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x
輸入文件file2
的樣例如下: 20150101 y
20150102 y
20150103 x
20150104 z
20150105 y
根據(jù)輸入文件file1
和file2
合并得到的輸出文件file3
的樣例如下:
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x
相關(guān)代碼:
1. import java.io.IOException;
2.
3.
4. import java.util.*;
5. import org.apache.hadoop.conf.Configuration;
6. import org.apache.hadoop.fs.Path;
7. import org.apache.hadoop.io.*;
8. import org.apache.hadoop.mapreduce.Job;
9. import org.apache.hadoop.mapreduce.Mapper;
10. import org.apache.hadoop.mapreduce.Reducer;
11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
13. import org.apache.hadoop.util.GenericOptionsParser;
14. public class Merge {
15.
16. /**
17. * @param args
18. * 對(duì)A,B兩個(gè)文件進(jìn)行合并,并剔除其中重復(fù)的內(nèi)容,得到一個(gè)新的輸出文件C
19. */
20. //在這重載map函數(shù),直接將輸入中的value復(fù)制到輸出數(shù)據(jù)的key上 注意在map方法中要拋出異常:throws IOException,InterruptedException
21. /********** Begin **********/
22. public static class Map extends Mapper<LongWritable, Text, Text, Text >
23. {
24. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
25. throws IOException, InterruptedException {
26. String str = value.toString();
27. String[] data = str.split(" ");
28. Text t1= new Text(data[0]);
29. Text t2 = new Text(data[1]);
30. context.write(t1,t2);
31. }
32. }
33. /********** End **********/
34.
35. //在這重載reduce函數(shù),直接將輸入中的key復(fù)制到輸出數(shù)據(jù)的key上 注意在reduce方法上要拋出異常:throws IOException,InterruptedException
36. /********** Begin **********/
37. public static class Reduce extends Reducer<Text, Text, Text, Text>
38. {
39. protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
40. throws IOException, InterruptedException {
41. List<String> list = new ArrayList<>();
42. for (Text text : values) {
43. String str = text.toString();
44. if(!list.contains(str)){
45. list.add(str);
46. }
47. }
48. Collections.sort(list);
49. for (String text : list) {
50. context.write(key, new Text(text));
51. }
52. }
53. /********** End **********/
54. }
55.
56. public static void main(String[] args) throws Exception{
57. Configuration conf = new Configuration();
58. Job job = new Job(conf, "word count");
59. job.setJarByClass(Merge.class);
60. job.setMapperClass(Map.class);
61. job.setCombinerClass(Reduce.class);
62. job.setReducerClass(Reduce.class);
63. job.setOutputKeyClass(Text.class);
64. job.setOutputValueClass(Text.class);
65. String inputPath = "/user/tmp/input/"; //在這里設(shè)置輸入路徑
66. String outputPath = "/user/tmp/output/"; //在這里設(shè)置輸出路徑
67. FileInputFormat.addInputPath(job, new Path(inputPath));
68. FileOutputFormat.setOutputPath(job, new Path(outputPath));
69. System.exit(job.waitForCompletion(true) ? 0 : 1);
70. }
71. }
(三)對(duì)給定的表格進(jìn)行信息挖掘
編程要求
你編寫(xiě)的程序要能挖掘父子輩關(guān)系,給出祖孫輩關(guān)系的表格。規(guī)則如下:
- 孫子在前,祖父在后;
- 輸入文件路徑:
/user/reduce/input
; - 輸出文件路徑:
/user/reduce/output
。
輸入文件內(nèi)容如下: child parent
Steven Lucy
Steven Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Frank
Jack Alice
Jack Jesse
David Alice
David Jesse
Philip David
Philip Alma
Mark David
Mark Alma
輸出文件內(nèi)容如下:
grand_child??? grand_parent
Mark??? Jesse
Mark??? Alice
Philip??? Jesse
Philip??? Alice
Jone??? Jesse
Jone??? Alice
Steven??? Jesse
Steven??? Alice
Steven??? Frank
Steven??? Mary
Jone??? Frank
Jone??? Mary
相關(guān)代碼:
1. import java.io.IOException;
2. import java.util.*;
3.
4. import org.apache.hadoop.conf.Configuration;
5. import org.apache.hadoop.fs.Path;
6. import org.apache.hadoop.io.IntWritable;
7. import org.apache.hadoop.io.Text;
8. import org.apache.hadoop.mapreduce.Job;
9. import org.apache.hadoop.mapreduce.Mapper;
10. import org.apache.hadoop.mapreduce.Reducer;
11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
13. import org.apache.hadoop.util.GenericOptionsParser;
14.
15. public class simple_data_mining {
16. public static int time = 0;
17.
18. /**
19. * @param args
20. * 輸入一個(gè)child-parent的表格
21. * 輸出一個(gè)體現(xiàn)grandchild-grandparent關(guān)系的表格
22. */
23. //Map將輸入文件按照空格分割成child和parent,然后正序輸出一次作為右表,反序輸出一次作為左表,需要注意的是在輸出的value中必須加上左右表區(qū)別標(biāo)志
24. public static class Map extends Mapper<Object, Text, Text, Text>{
25. public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
26. /********** Begin **********/
27.
28.
29. String child_name = new String();
30. String parent_name = new String();
31. String relation_type = new String();
32. String line = value.toString();
33. int i = 0;
34. while(line.charAt(i) != ' '){
35. i++;
36. }
37. String[] values = {line.substring(0,i),line.substring(i+1)};
38. if(values[0].compareTo("child") != 0){
39. child_name = values[0];
40. parent_name = values[1];
41. relation_type = "1";//左右表區(qū)分標(biāo)志
42. context.write(new Text(values[1]), new Text(relation_type+"+"+child_name+"+"+parent_name));
43. //左表
44. relation_type = "2";
45. context.write(new Text(values[0]), new Text(relation_type+"+"+child_name+"+"+parent_name));
46. //右表
47. }
48.
49.
50.
51.
52.
53.
54. /********** End **********/
55. }
56. }
57.
58. public static class Reduce extends Reducer<Text, Text, Text, Text>{
59. public void reduce(Text key, Iterable<Text> values,Context context) throws IOException,InterruptedException{
60. /********** Begin **********/
61. if(time == 0){ //輸出表頭
62. context.write(new Text("grand_child"), new Text("grand_parent"));
63. time++;
64. }
65. int grand_child_num = 0;
66. String grand_child[] = new String[10];
67. int grand_parent_num = 0;
68. String grand_parent[]= new String[10];
69. Iterator ite = values.iterator();
70. while(ite.hasNext()){
71. String record = ite.next().toString();
72. int len = record.length();
73. int i = 2;
74. if(len == 0) continue;
75. char relation_type = record.charAt(0);
76. String child_name = new String();
77. String parent_name = new String();
78. //獲取value-list中value的child
79. while(record.charAt(i) != '+'){
80. child_name = child_name + record.charAt(i);
81. i++;
82. }
83. i=i+1;
84. //獲取value-list中value的parent
85. while(i<len){
86. parent_name = parent_name+record.charAt(i);
87. i++;
88. }
89. //左表,取出child放入grand_child
90. if(relation_type == '1'){
91. grand_child[grand_child_num] = child_name;
92. grand_child_num++;
93. }
94. else{//右表,取出parent放入grand_parent
95. grand_parent[grand_parent_num] = parent_name;
96. grand_parent_num++;
97. }
98. }
99. if(grand_parent_num != 0 && grand_child_num != 0 ){
100. for(int m = 0;m<grand_child_num;m++){
101. for(int n=0;n<grand_parent_num;n++){
102. context.write(new Text(grand_child[m]), new Text(grand_parent[n]));
103. //輸出結(jié)果
104. }
105. }
106. }
107.
108. /********** End **********/
109.
110. }
111. }
112. public static void main(String[] args) throws Exception{
113. // TODO Auto-generated method stub
114. Configuration conf = new Configuration();
115. Job job = Job.getInstance(conf,"Single table join");
116. job.setJarByClass(simple_data_mining.class);
117. job.setMapperClass(Map.class);
118. job.setReducerClass(Reduce.class);
119. job.setOutputKeyClass(Text.class);
120. job.setOutputValueClass(Text.class);
121. String inputPath = "/user/reduce/input"; //設(shè)置輸入路徑
122. String outputPath = "/user/reduce/output"; //設(shè)置輸出路徑
123. FileInputFormat.addInputPath(job, new Path(inputPath));
124. FileOutputFormat.setOutputPath(job, new Path(outputPath));
125. System.exit(job.waitForCompletion(true) ? 0 : 1);
126.
127. }
128.
129. }
四、實(shí)驗(yàn)心得?
掌握了什么是MapReduce及使用MapReduce進(jìn)行運(yùn)算文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-423540.html
掌握了挖掘父子輩關(guān)系,給出祖孫輩關(guān)系的表格文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-423540.html
到了這里,關(guān)于云計(jì)算與大數(shù)據(jù)實(shí)驗(yàn)五 MapReduce編程的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!