鏈接: 大數(shù)據(jù)技術(shù)原理與應(yīng)用實(shí)驗(yàn)1——熟悉常用的HDFS操作
鏈接: 大數(shù)據(jù)技術(shù)原理與應(yīng)用實(shí)驗(yàn)2——熟悉常用的Hbase操作
鏈接: 大數(shù)據(jù)技術(shù)原理與應(yīng)用實(shí)驗(yàn)3——NoSQL和關(guān)系數(shù)據(jù)庫(kù)的操作比較
一、實(shí)驗(yàn)?zāi)康?/h2>
(1)通過(guò)實(shí)驗(yàn)掌握基本的MapReduce編程方法;
(2)掌握用MapReduce解決一些常見(jiàn)的數(shù)據(jù)處理問(wèn)題,包括數(shù)據(jù)去重、數(shù)據(jù)排序和數(shù)據(jù)挖掘等。
二、實(shí)驗(yàn)環(huán)境
(1)Linux操作系統(tǒng)(CentOS7.5)
(2)VMware Workstation Pro 15.5
(3)遠(yuǎn)程終端工具Xshell7
(4)Xftp7傳輸工具;
(5)Hadoop版本:3.1.3;
(6)HBase版本:2.2.2;
(7)JDK版本:1.8;
(8)Java IDE:Idea;
(9)MySQL版本:5.7;
三、實(shí)驗(yàn)內(nèi)容
(一)編程實(shí)現(xiàn)文件合并和去重操作
1. 具體內(nèi)容
對(duì)于兩個(gè)輸入文件,即文件A和文件B,請(qǐng)編寫(xiě)MapReduce程序,對(duì)兩個(gè)文件進(jìn)行合并,并剔除其中重復(fù)的內(nèi)容,得到一個(gè)新的輸出文件C。下面是輸入文件和輸出文件的一個(gè)樣例供參考。
輸入文件A的樣例如下:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 x
輸入文件B的樣例如下:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y
根據(jù)輸入文件A和B合并得到的輸出文件C的樣例如下:
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 x
2. 操作過(guò)程
1.啟動(dòng) hadoop:
2. 需要首先刪除HDFS中與當(dāng)前Linux用戶hadoop對(duì)應(yīng)的input和output目錄(即HDFS中的“/opt/module/hadoop-3.1.3/input”和“/opt/module/hadoop-3.1.3/output”目錄),這樣確保后面程序運(yùn)行不會(huì)出現(xiàn)問(wèn)題
cd /opt/module/hadoop-3.1.3/
./bin/hdfs dfs -rm -r input
./bin/hdfs dfs -rm -r output
3. 再在HDFS中新建與當(dāng)前Linux用戶hadoop對(duì)應(yīng)的input目錄,即“/opt/module/hadoop-3.1.3/input”目錄
./bin/hdfs dfs -mkdir /input/test1/
創(chuàng)建A.txt B.txt,輸入上述內(nèi)容
vim A.txt
vim B.txt
4. 將A,B上傳到HDFS中
./bin/hdfs dfs -put ./A.txt /input/test1/
./bin/hdfs dfs -put ./B.txt /input/test1/
3. 實(shí)驗(yàn)代碼
package com.xusheng.mapreduce.shiyan;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class Merge {
/**
* @param xusheng
* 對(duì)A,B兩個(gè)文件進(jìn)行合并,并剔除其中重復(fù)的內(nèi)容,得到一個(gè)新的輸出文件C
*/
//重載map函數(shù),直接將輸入中的value復(fù)制到輸出數(shù)據(jù)的key上
public static class Map extends Mapper<Object, Text, Text, Text>{
private static Text text = new Text();
public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
text = value;
context.write(text, new Text(""));
}
}
//重載reduce函數(shù),直接將輸入中的key復(fù)制到輸出數(shù)據(jù)的key上
public static class Reduce extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key, Iterable<Text> values, Context context ) throws IOException,InterruptedException{
context.write(key, new Text(""));
}
}
public static void main(String[] args) throws Exception{
// TODO Auto-generated method stub
Configuration conf = new Configuration();
//conf.set("fs.default.name","hdfs://localhost:9000");
conf.set("fs.defaultFS","hdfs://hadoop102:8020");
String[] otherArgs = new String[]{"/input/test1","/output/test1"}; //* 直接設(shè)置輸入?yún)?shù) *//*
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in><out>");
System.exit(2);
}
Job job = Job.getInstance(conf,"Merge and duplicate removal");
job.setJarByClass(Merge.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
4. 運(yùn)行結(jié)果
./bin/hdfs dfs -cat /output/test1/*
(二)編寫(xiě)程序?qū)崿F(xiàn)對(duì)輸入文件的排序
1. 具體內(nèi)容
現(xiàn)在有多個(gè)輸入文件,每個(gè)文件中的每行內(nèi)容均為一個(gè)整數(shù)。要求讀取所有文件中的整數(shù),進(jìn)行升序排序后,輸出到一個(gè)新的文件中,輸出的數(shù)據(jù)格式為每行兩個(gè)整數(shù),第一個(gè)數(shù)字為第二個(gè)整數(shù)的排序位次,第二個(gè)整數(shù)為原待排列的整數(shù)。下面是輸入文件和輸出文件的一個(gè)樣例供參考。
輸入文件1的樣例如下:
33
37
12
40
輸入文件2的樣例如下:
4
16
39
5
輸入文件3的樣例如下:
1
45
25
根據(jù)輸入文件1、2和3得到的輸出文件如下:
1 1
2 4
3 5
4 12
5 16
6 25
7 33
8 37
9 39
10 40
11 45
2. 操作過(guò)程
1.創(chuàng)建1.txt ,2.txt ,3.txt,輸入上述內(nèi)容
再在HDFS中新建與當(dāng)前Linux用戶hadoop對(duì)應(yīng)的input目錄,即“/opt/module/hadoop-3.1.3/input”目錄
./bin/hdfs dfs -mkdir /input/test2/
vim 1.txt
vim 2.txt
vim 3.txt
2.將1.txt ,2.txt ,3.txt上傳到HDFS中
./bin/hdfs dfs -put ./1.txt /input/test2/
./bin/hdfs dfs -put ./2.txt /input/test2/
./bin/hdfs dfs -put ./3.txt /input/test2/
3. 實(shí)驗(yàn)代碼
package com.xusheng.mapreduce.shiyan;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class MergeSort {
/**
* @param xusheng
* 輸入多個(gè)文件,每個(gè)文件中的每行內(nèi)容均為一個(gè)整數(shù)
* 輸出到一個(gè)新的文件中,輸出的數(shù)據(jù)格式為每行兩個(gè)整數(shù),第一個(gè)數(shù)字為第二個(gè)整數(shù)的排序位次,第二個(gè)整數(shù)為原待排列的整數(shù)
*/
//map函數(shù)讀取輸入中的value,將其轉(zhuǎn)化成IntWritable類型,最后作為輸出key
public static class Map extends Mapper<Object, Text, IntWritable, IntWritable>{
private static IntWritable data = new IntWritable();
public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
String text = value.toString();
data.set(Integer.parseInt(text));
context.write(data, new IntWritable(1));
}
}
//reduce函數(shù)將map輸入的key復(fù)制到輸出的value上,然后根據(jù)輸入的value-list中元素的個(gè)數(shù)決定key的輸出次數(shù),定義一個(gè)全局變量line_num來(lái)代表key的位次
public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
private static IntWritable line_num = new IntWritable(1);
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException{
for(IntWritable val : values){
context.write(line_num, key);
line_num = new IntWritable(line_num.get() + 1);
}
}
}
//自定義Partition函數(shù),此函數(shù)根據(jù)輸入數(shù)據(jù)的最大值和MapReduce框架中Partition的數(shù)量獲取將輸入數(shù)據(jù)按照大小分塊的邊界,然后根據(jù)輸入數(shù)值和邊界的關(guān)系返回對(duì)應(yīng)的Partiton ID
public static class Partition extends Partitioner<IntWritable, IntWritable>{
public int getPartition(IntWritable key, IntWritable value, int num_Partition){
int Maxnumber = 65223;//int型的最大數(shù)值
int bound = Maxnumber/num_Partition+1;
int keynumber = key.get();
for (int i = 0; i<num_Partition; i++){
if(keynumber<bound * (i+1) && keynumber>=bound * i){
return i;
}
}
return -1;
}
}
public static void main(String[] args) throws Exception{
// TODO Auto-generated method stub
Configuration conf = new Configuration();
//conf.set("fs.default.name","hdfs://localhost:9000");
conf.set("fs.defaultFS","hdfs://hadoop102:8020");
String[] otherArgs = new String[]{"/input/test2","/output/test2"}; /* 直接設(shè)置輸入?yún)?shù) */
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in><out>");
System.exit(2);
}
Job job = Job.getInstance(conf,"Merge and sort");//實(shí)例化Merge類
job.setJarByClass(MergeSort.class);//設(shè)置主類名
job.setMapperClass(Map.class);//指定使用上述代碼自定義的Map類
job.setReducerClass(Reduce.class);//指定使用上述代碼自定義的Reduce類
job.setPartitionerClass(Partition.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);//設(shè)定Reduce類輸出的<K,V>,V類型
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//添加輸入文件位置
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//設(shè)置輸出結(jié)果文件位置
System.exit(job.waitForCompletion(true) ? 0 : 1);//提交任務(wù)并監(jiān)控任務(wù)狀態(tài)
}
}
4. 運(yùn)行結(jié)果
./bin/hdfs dfs -cat /output/test2/*
(三)對(duì)給定的表格進(jìn)行信息挖掘
1. 具體內(nèi)容
下面給出一個(gè)child-parent的表格,要求挖掘其中的父子輩關(guān)系,給出祖孫輩關(guān)系的表格。
輸入文件內(nèi)容如下:
child parent
Steven Lucy
Steven Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Frank
Jack Alice
Jack Jesse
David Alice
David Jesse
Philip David
Philip Alma
Mark David
Mark Alma
輸出文件內(nèi)容如下:
grandchild grandparent
Steven Alice
Steven Jesse
Jone Alice
Jone Jesse
Steven Mary
Steven Frank
Jone Mary
Jone Frank
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse
2. 操作過(guò)程
1.創(chuàng)建child.txt,輸入上述內(nèi)容
再在HDFS中新建與當(dāng)前Linux用戶hadoop對(duì)應(yīng)的input目錄,即“/opt/module/hadoop-3.1.3/input”目錄
./bin/hdfs dfs -mkdir /input/test3/
vim child.txt
2. 將child.txt上傳到HDFS中
./bin/hdfs dfs -put ./ child.txt /input/test3/
3. 實(shí)驗(yàn)代碼
package com.xusheng.mapreduce.shiyan;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class simple_data_mining {
public static int time = 0;
/**
* @param xusheng
* 輸入一個(gè)child-parent的表格
* 輸出一個(gè)體現(xiàn)grandchild-grandparent關(guān)系的表格
*/
//Map將輸入文件按照空格分割成child和parent,然后正序輸出一次作為右表,反序輸出一次作為左表,需要注意的是在輸出的value中必須加上左右表區(qū)別標(biāo)志
public static class Map extends Mapper<Object, Text, Text, Text>{
public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
String child_name = new String();
String parent_name = new String();
String relation_type = new String();
String line = value.toString();
int i = 0;
while(line.charAt(i) != ' '){
i++;
}
String[] values = {line.substring(0,i),line.substring(i+1)};
if(values[0].compareTo("child") != 0){
child_name = values[0];
parent_name = values[1];
relation_type = "1";//左右表區(qū)分標(biāo)志
context.write(new Text(values[1]), new Text(relation_type+"+"+child_name+"+"+parent_name));
//左表
relation_type = "2";
context.write(new Text(values[0]), new Text(relation_type+"+"+child_name+"+"+parent_name));
//右表
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key, Iterable<Text> values,Context context) throws IOException,InterruptedException{
if(time == 0){ //輸出表頭
context.write(new Text("grand_child"), new Text("grand_parent"));
time++;
}
int grand_child_num = 0;
String grand_child[] = new String[10];
int grand_parent_num = 0;
String grand_parent[]= new String[10];
Iterator ite = values.iterator();
while(ite.hasNext()){
String record = ite.next().toString();
int len = record.length();
int i = 2;
if(len == 0) continue;
char relation_type = record.charAt(0);
String child_name = new String();
String parent_name = new String();
//獲取value-list中value的child
while(record.charAt(i) != '+'){
child_name = child_name + record.charAt(i);
i++;
}
i=i+1;
//獲取value-list中value的parent
while(i<len){
parent_name = parent_name+record.charAt(i);
i++;
}
//左表,取出child放入grand_child
if(relation_type == '1'){
grand_child[grand_child_num] = child_name;
grand_child_num++;
}
else{//右表,取出parent放入grand_parent
grand_parent[grand_parent_num] = parent_name;
grand_parent_num++;
}
}
if(grand_parent_num != 0 && grand_child_num != 0 ){
for(int m = 0;m<grand_child_num;m++){
for(int n=0;n<grand_parent_num;n++){
context.write(new Text(grand_child[m]), new Text(grand_parent[n]));
//輸出結(jié)果
}
}
}
}
}
public static void main(String[] args) throws Exception{
// TODO Auto-generated method stub
Configuration conf = new Configuration();
//conf.set("fs.default.name","hdfs://localhost:9000");
conf.set("fs.default.name","hdfs://hadoop102:8020");
String[] otherArgs = new String[]{"/input/test3","/output/test3"}; /* 直接設(shè)置輸入?yún)?shù) */
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in><out>");
System.exit(2);
}
Job job = Job.getInstance(conf,"Single table join");
job.setJarByClass(simple_data_mining.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
4. 運(yùn)行結(jié)果
./bin/hdfs dfs -cat /output/test3/*
四、實(shí)驗(yàn)總結(jié)
三個(gè)實(shí)驗(yàn)的思路:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-455655.html
(1)編程實(shí)現(xiàn)文件合并和去重操作
本道題主要目的是去重,我在編寫(xiě)的時(shí)候的思路就是 通過(guò)map函數(shù)讀取 key,value 因?yàn)槲业哪康氖侨ブ?所以在這里我完全可以把整個(gè)數(shù)據(jù)作為一個(gè)key,而value我可以不管他 而reduce會(huì)接受到的是<key,value-list>形式的數(shù)據(jù),我們只需要輸出他接受到的key就可以了,因?yàn)橹貜?fù)的值體現(xiàn)在value-list里面,而key是位移的.
可以從python字典的角度來(lái)理解,我們把文檔的每一行作為字典的鍵,出現(xiàn)的次數(shù)作為值,最終我們循環(huán)輸出所有的鍵
(2)編寫(xiě)程序?qū)崿F(xiàn)對(duì)輸入文件的排序
因?yàn)镸R自帶排序,所以我們只要把輸入的數(shù)字以int的形式交給map map將這個(gè)數(shù)字作為key輸出,而rerduce函數(shù)將map輸入的key復(fù)制到輸出的value上即可,因?yàn)橐斎肱判虻男蛱?hào),所以再定義個(gè)變量用來(lái)記錄輸出數(shù)字的排序即可
(3)對(duì)給定的表格進(jìn)行信息挖掘
本題其實(shí)相當(dāng)于一個(gè)表的自身join,但是我們需要轉(zhuǎn)化一下,輸入的文件只是child和parent ,將他正序輸出一次作為右表,反序輸出一次作為左表,這樣就可以完成child parent grand三個(gè)字段的兩張表操作,輸出的時(shí)候加上兩張表的標(biāo)識(shí)來(lái)區(qū)分 reduce函數(shù)則用來(lái)取出左表中的child 即為grandchild 再取出右表的parent相當(dāng)于grandparent即可。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-455655.html
到了這里,關(guān)于大數(shù)據(jù)技術(shù)原理與應(yīng)用實(shí)驗(yàn)4——MapReduce初級(jí)編程實(shí)踐的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!