云計算與大數(shù)據(jù)入門實驗四 —— MapReduce 初級編程實踐
實驗目的
-
通過實驗掌握基本的 MapReduce 編程方法
-
掌握用 MapReduce 解決一些常見的數(shù)據(jù)處理問題,包括數(shù)據(jù)去重、數(shù)據(jù)排序和數(shù)據(jù)挖掘等
實驗內(nèi)容
(一)編程實現(xiàn)文件合并和去重操作
對于兩個輸入文件,即文件A和文件B,請編寫MapReduce程序,對兩個文件進行合并,并剔除其中重復的內(nèi)容,得到一個新的輸出文件C。下面是輸入文件和輸出文件的一個樣例供參考。
輸入文件A的樣例如下:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 x
輸入文件B的樣例如下:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y
根據(jù)輸入文件A和B合并得到的輸出文件C的樣例如下:
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 x
(二)編寫程序?qū)崿F(xiàn)對輸入文件的排序
現(xiàn)在有多個輸入文件,每個文件中的每行內(nèi)容均為一個整數(shù)。要求讀取所有文件中的整數(shù),進行升序排序后,輸出到一個新的文件中,輸出的數(shù)據(jù)格式為每行兩個整數(shù),第一個數(shù)字為第二個整數(shù)的排序位次,第二個整數(shù)為原待排列的整數(shù)。下面是輸入文件和輸出文件的一個樣例供參考。
輸入文件1的樣例如下:
33
37
12
40
輸入文件2的樣例如下:
4
16
39
5
輸入文件3的樣例如下:
1
45
25
根據(jù)輸入文件1、2和3得到的輸出文件如下:
1 1
2 4
3 5
4 12
5 16
6 25
7 33
8 37
9 39
10 40
11 45
(三)對給定的表格進行信息挖掘
下面給出一個child-parent的表格,要求挖掘其中的父子輩關系,給出祖孫輩關系的表格。
輸入文件內(nèi)容如下:
child parent
Steven Lucy
Steven Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Frank
Jack Alice
Jack Jesse
David Alice
David Jesse
Philip David
Philip Alma
Mark David
Mark Alma
輸出文件內(nèi)容如下:
grandchild grandparent
Steven Alice
Steven Jesse
Jone Alice
Jone Jesse
Steven Mary
Steven Frank
Jone Mary
Jone Frank
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse
實驗步驟
- 編程實現(xiàn)文件合并和去重操作
對于兩個輸入文件,即文件 A 和文件 B,請編寫 MapReduce 程序,對兩個文件進行合并,并剔除其中重復的內(nèi)容,得到一個新的輸出文件 C
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Main {
/**
// * @param args 對 A,B 兩個文件進行合并,并剔除其中重復的內(nèi)容,得到一個新的輸出文件 C
*/
//重載 map 函數(shù),直接將輸入中的 value 復制到輸出數(shù)據(jù)的 key 上
public static class Map extends Mapper<Object, Text, Text, Text> {
private static Text text = new Text();
public void map(Object key, Text value, Context context) throws
IOException, InterruptedException {
text = value;
context.write(text, new Text(""));
}
}
//重載 reduce 函數(shù),直接將輸入中的 key 復制到輸出數(shù)據(jù)的 key 上
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
context.write(key, new Text(""));
}
}
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:8088");
String[] otherArgs = new String[]{"input", "output"}; /* 直接設置輸入?yún)?shù)
*/
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in><out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "Merge and duplicate removal");
job.setJarByClass(Main.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
編寫程序?qū)崿F(xiàn)對輸入文件的排序
現(xiàn)在有多個輸入文件,每個文件中的每行內(nèi)容均為一個整數(shù)。要求讀取所有文件中的整數(shù),進行升序排序后,輸出到一個新的文件中,輸出的數(shù)據(jù)格式為每行兩個整數(shù),第一個數(shù)字為第二個整數(shù)的排序位次,第二個整數(shù)為原待排列的整數(shù)。
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Main {
/**
// * @param args
* 輸入多個文件,每個文件中的每行內(nèi)容均為一個整數(shù)
* 輸出到一個新的文件中,輸出的數(shù)據(jù)格式為每行兩個整數(shù),第一個數(shù)字為第二個整
數(shù)的排序位次,第二個整數(shù)為原待排列的整數(shù)
*/
//map 函數(shù)讀取輸入中的 value,將其轉(zhuǎn)化成 IntWritable 類型,最后作為輸出 key
public static class Map extends Mapper<Object, Text, IntWritable, IntWritable>{
private static IntWritable data = new IntWritable();
public void map(Object key, Text value, Context context) throws
IOException,InterruptedException{
String text = value.toString();
data.set(Integer.parseInt(text));
context.write(data, new IntWritable(1));
}
}
//reduce 函數(shù)將 map 輸入的 key 復制到輸出的 value 上,然后根據(jù)輸入的 value-list
// 中元素的個數(shù)決定 key 的輸出次數(shù),定義一個全局變量 line_num 來代表 key 的位次
public static class Reduce extends Reducer<IntWritable, IntWritable,
IntWritable, IntWritable>{
private static IntWritable line_num = new IntWritable(1);
public void reduce(IntWritable key, Iterable<IntWritable> values, Context
context) throws IOException,InterruptedException{
for(IntWritable val : values){
context.write(line_num, key);
line_num = new IntWritable(line_num.get() + 1);
}
}
}
//自定義 Partition 函數(shù),此函數(shù)根據(jù)輸入數(shù)據(jù)的最大值和 MapReduce 框架中
// Partition 的數(shù)量獲取將輸入數(shù)據(jù)按照大小分塊的邊界,然后根據(jù)輸入數(shù)值和邊界的關系返
// 回對應的 Partiton ID
public static class Partition extends Partitioner<IntWritable, IntWritable>{
public int getPartition(IntWritable key, IntWritable value, int
num_Partition){
int Maxnumber = 65223;//int 型的最大數(shù)值
int bound = Maxnumber/num_Partition+1;
int keynumber = key.get();
for (int i = 0; i<num_Partition; i++){
if(keynumber<bound * (i+1) && keynumber>=bound * i){
return i;
}
}
return -1;
}
}
public static void main(String[] args) throws Exception{
// TODO Auto-generated method stub
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://localhost:8088");
String[] otherArgs = new String[]{"input","output"}; /* 直接設置輸入?yún)?shù)
*/
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in><out>");
System.exit(2);
}
Job job = Job.getInstance(conf,"Merge and sort");
job.setJarByClass(Main.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setPartitionerClass(Partition.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
對給定的表格進行信息挖掘
下面給出一個 child-parent 的表格,要求挖掘其中的父子輩關系,給出祖孫輩關系的表格文章來源:http://www.zghlxwxcb.cn/news/detail-752427.html
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Main {
public static int time = 0;
/**
// * @param args
* 輸入一個 child-parent 的表格
* 輸出一個體現(xiàn) grandchild-grandparent 關系的表格
*/
//Map 將輸入文件按照空格分割成 child 和 parent,然后正序輸出一次作為右表,反序
// 輸出一次作為左表,
// 需要注意的是在輸出的 value
// 中必須加上左右表區(qū)別標志
public static class Map extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value, Context context) throws
IOException, InterruptedException {
String child_name = new String();
String parent_name = new String();
String relation_type = new String();
String line = value.toString();
int i = 0;
while (line.charAt(i) != ' ') {
i++;
}
String[] values = {line.substring(0, i), line.substring(i + 1)};
if (values[0].compareTo("child") != 0) {
child_name = values[0];
parent_name = values[1];
relation_type = "1";//左右表區(qū)分標志
context.write(new Text(values[1]), new
Text(relation_type + "+" + child_name + "+" + parent_name));
//左表
relation_type = "2";
context.write(new Text(values[0]), new
Text(relation_type + "+" + child_name + "+" + parent_name));
//右表
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws
IOException, InterruptedException {
if (time == 0) { //輸出表頭
context.write(new Text("grand_child"), new
Text("grand_parent"));
time++;
}
int grand_child_num = 0;
String grand_child[] = new String[10];
int grand_parent_num = 0;
String grand_parent[] = new String[10];
Iterator ite = values.iterator();
while (ite.hasNext()) {
String record = ite.next().toString();
int len = record.length();
int i = 2;
if (len == 0) continue;
char relation_type = record.charAt(0);
String child_name = new String();
String parent_name = new String();
//獲取 value-list 中 value 的 child
while (record.charAt(i) != '+') {
child_name = child_name + record.charAt(i);
i++;
}
i = i + 1;
//獲取 value-list 中 value 的 parent
while (i < len) {
parent_name = parent_name + record.charAt(i);
i++;
}
//左表,取出 child 放入 grand_child
if (relation_type == '1') {
grand_child[grand_child_num] = child_name;
grand_child_num++;
} else {//右表,取出 parent 放入 grand_parent
grand_parent[grand_parent_num] = parent_name;
grand_parent_num++;
}
}
if (grand_parent_num != 0 && grand_child_num != 0) {
for (int m = 0; m < grand_child_num; m++) {
for (int n = 0; n < grand_parent_num; n++) {
context.write(new Text(grand_child[m]), new
Text(grand_parent[n]));
//輸出結(jié)果
}
}
}
}
}
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:8088");
String[] otherArgs = new String[]{"input", "output"}; /* 直接設置輸入?yún)?shù)
*/
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in><out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "Single table join");
job.setJarByClass(Main.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
文章來源地址http://www.zghlxwxcb.cn/news/detail-752427.html
到了這里,關于云計算與大數(shù)據(jù)入門實驗四 —— MapReduce 初級編程實踐的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!