Experiment Environment
- Ubuntu 18.04 virtual machine and a Windows 10 physical host
- Development environment: IDEA
- Virtual machine IP: 192.168.1.108
- JDK: 1.8
Experiment Content
Write a WordCount program in Java, package it as a JAR, and run the JAR inside the virtual machine.
First, create a Maven project in IDEA.
In pom.xml, add the dependencies and the maven-jar-plugin configuration that packages the project as a JAR and sets the main class (these sections go inside the <project> element):
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>2.4.11</version>
        <type>pom</type>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.4.11</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>2.4.11</version>
    </dependency>
</dependencies>
<build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.MyProgramDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </pluginManagement>
</build>
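A note on these dependencies: hadoop-core 1.2.1 is a legacy Hadoop 1.x artifact that is pulled in here for the org.apache.hadoop.mapreduce classes, which are not part of hadoop-common or hadoop-hdfs. On a Hadoop 3.x cluster it would arguably be cleaner to depend on hadoop-mapreduce-client-core (version 3.3.2, matching the other Hadoop artifacts) instead, and the HBase dependencies are not actually used by any of the code in this experiment, so they could be dropped.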
Write the corresponding programs.
The MyProgramDriver class serves as the program entry point:
package com;

import org.apache.hadoop.util.ProgramDriver;

public class MyProgramDriver {
    public static void main(String[] args) {
        int exitCode = -1;
        ProgramDriver programDriver = new ProgramDriver();
        try {
            // Register WordCount under the name "com.WordCount".
            programDriver.addClass("com.WordCount", WordCount.class, "com.WordCount Program");
            exitCode = programDriver.run(args);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        System.exit(exitCode);
    }
}
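With ProgramDriver, the first command-line argument passed to the driver selects a registered program by name and the remaining arguments are forwarded to that program's main method; this is why the jar is later invoked as bin/hadoop jar <jar> com.WordCount <in> <out>.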
The WordCount program:
package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // All arguments except the last one are input paths; the last is the output path.
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum all counts emitted for the same word.
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Split the line into tokens and emit <word, 1> for each token.
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
}
Project structure screenshot:
On the right-hand side of IDEA, open the Maven panel and run package to build the project into a JAR (equivalently, run mvn package from the command line).
After packaging finishes, the JAR is located in the target directory.
Then copy the JAR to the virtual machine. I put it in /root/hadoop/a_dir; any location works, as long as you remember where it is.
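For example, the copy from the host can be done with scp (assuming SSH access to the VM as root at the IP listed in the environment section; adjust the user and paths to your setup):
scp target/MyMapReduce-1.0-SNAPSHOT.jar root@192.168.1.108:/root/hadoop/a_dir/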
Next, create the input files input1 and input2 with the following contents:
Then upload the two files to HDFS. Here I put them under /root/input in HDFS; note that this is not a path on the local file system, but a path in the distributed file system that exists once Hadoop is running.
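A minimal way to do the upload from the Hadoop installation directory (assuming input1 and input2 are in the current local directory; adapt the paths if yours differ):
bin/hdfs dfs -mkdir -p /root/input
bin/hdfs dfs -put input1 input2 /root/input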
Then run the program:
bin/hadoop jar a_dir/MyMapReduce-1.0-SNAPSHOT.jar com.WordCount /root/input/* /root/out
Here a_dir/MyMapReduce-1.0-SNAPSHOT.jar is the path of the JAR to execute, and com.WordCount is the name of the WordCount program to run, which is exactly the name registered in MyProgramDriver.
/root/input/* are the input files and /root/out is the output path; note that the output directory must not already exist, since MapReduce creates it and refuses to overwrite an existing one.
View the output:
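For example, with a single reducer the result can be read with (the part file name part-r-00000 assumes one reducer):
bin/hdfs dfs -cat /root/out/part-r-00000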
Implementing file merging and deduplication
Sample input:
20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x
20150101 y
20150102 y
20150103 x
20150104 z
20150105 y
Main idea: in the map phase, split each line with a whitespace regular expression into a key and a value; for example, "20150101 x" becomes the key 20150101 and the value x, both as Text. The map output is sent through the shuffle phase to the reducers, and in reduce a HashSet (whose elements are unique by definition) is used to deduplicate the values that arrive for each key.
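For the sample input above, the deduplicated output should contain each distinct <date, letter> pair exactly once. Keys come out sorted by the framework, while the order of values within one key depends on HashSet iteration order, so the expected result is roughly:
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x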
The Merge program:
package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.HashSet;

public class Merge {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: merge <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "merge");
        job.setJarByClass(Merge.class);
        job.setMapperClass(Merge.MyMapper.class);
        job.setCombinerClass(Merge.MyReduce.class);
        job.setReducerClass(Merge.MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // All arguments except the last one are input paths; the last is the output path.
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class MyMapper extends Mapper<Object, Text, Text, Text> {

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Split the line on whitespace into <date, letter>.
            String[] split = line.split("\\s+");
            if (split.length <= 1) {
                return;
            }
            context.write(new Text(split[0]), new Text(split[1]));
        }
    }

    public static class MyReduce extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Use a HashSet to remove duplicate values for the same key.
            HashSet<String> hashSet = new HashSet<>();
            for (Text value : values) {
                hashSet.add(value.toString());
            }
            for (String val : hashSet) {
                context.write(key, new Text(val));
            }
        }
    }
}
將Merge程序?qū)懭隡yProgramDriver類:
package com;

import org.apache.hadoop.util.ProgramDriver;

public class MyProgramDriver {
    public static void main(String[] args) {
        int exitCode = -1;
        ProgramDriver programDriver = new ProgramDriver();
        try {
            programDriver.addClass("com.WordCount", WordCount.class, "com.WordCount Program");
            programDriver.addClass("Merge", Merge.class, "merge and deduplicate the input files");
            exitCode = programDriver.run(args);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        System.exit(exitCode);
    }
}
Package the project again, copy the JAR to the virtual machine, and run:
bin/hadoop jar a_dir/MyMapReduce-1.0-SNAPSHOT.jar Merge /root/input/* /root/out
Result:
Implementing sorting of the input files
Idea: in the map phase, extract each number and emit a <number, 1> key-value pair. Since sorting by key is the default behavior of MapReduce, the reduce phase only needs to write out what the map phase produced, using the map-side key as the reduce-side value and an increasing rank as the reduce-side key.
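As a concrete illustration (hypothetical numbers, not the actual test data): if the input files contain the values 33, 37, 12 and 40, the job should produce
1 12
2 33
3 37
4 40
where the key is the rank and the value is the number in ascending order.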
The MyConf class:
Here the configuration that every job needs is extracted into a helper class, to reduce duplication in the code that follows.
package com.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class MyConf {

    public static void setConf(Class mainClass, Class outKeyClass, Class outValueClass, String[] args)
            throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("otherArgs length error, length < 2");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, mainClass.getName());
        // Look up the public inner classes of the job class and wire up the ones
        // named MyMapper and MyReduce as the mapper and reducer.
        Class[] innerClass = mainClass.getClasses();
        for (Class c : innerClass) {
            if (c.getSimpleName().equals("MyReduce")) {
                job.setReducerClass(c);
                // job.setCombinerClass(c);
            } else if (c.getSimpleName().equals("MyMapper")) {
                job.setMapperClass(c);
            }
        }
        job.setJarByClass(mainClass);
        job.setOutputKeyClass(outKeyClass);
        job.setOutputValueClass(outValueClass);
        // All arguments except the last one are input paths; the last is the output path.
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
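With this helper in place, a new job class only has to define inner classes named MyMapper and MyReduce and call MyConf.setConf(...) from its main method, as the Sort and InfoFind classes below do.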
The Sort class:
package com;

import com.utils.MyConf;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Sort {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {
        MyConf.setConf(Sort.class, IntWritable.class, IntWritable.class, args);
    }

    public static class MyMapper extends Mapper<Object, Text, IntWritable, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Emit <number, 1>; the framework sorts the keys during the shuffle.
            String var = value.toString();
            context.write(new IntWritable(Integer.parseInt(var.trim())), new IntWritable(1));
        }
    }

    public static class MyReduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        // Running rank; this only yields a correct global ranking with a single
        // reducer (the default), since each reducer has its own counter.
        static int sort = 1;

        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Write <rank, number> once per occurrence of the number.
            for (IntWritable va : values) {
                context.write(new IntWritable(sort), key);
                sort++;
            }
        }
    }
}
Then register the Sort class in MyProgramDriver in the same way.
Program input:
Package the project, copy it to the virtual machine, and run:
bin/hadoop jar a_dir/MyMapReduce-1.0-SNAPSHOT.jar Sort /root/input* /root/out5
Result:
Mining information from a given table
Idea (based on the references at the end): take the following example input:
steven lucy
lucy mary
After this input passes through map (see the code below for the exact logic), the output is:
<steven, old#lucy>, <lucy, young#steven>, <lucy, old#mary>, <mary, young#lucy>
After the shuffle phase, the reduce input becomes:
<steven, old#lucy>, <lucy, <young#steven, old#mary>>, <mary, young#lucy>
Each of these key-value pairs is then fed into reduce.
The pair <lucy, <young#steven, old#mary>> produces one valid output after the reduce logic:
<steven, mary>
The InfoFind class:
package com;

import com.utils.MyConf;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;

public class InfoFind {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {
        MyConf.setConf(InfoFind.class, Text.class, Text.class, args);
    }

    public static class MyMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] splStr = value.toString().split("\\s+");
            // Guard against blank or malformed lines.
            if (splStr.length < 2) {
                return;
            }
            String child = splStr[0];
            String parent = splStr[1];
            // Skip the header line of the table.
            if (child.equals("child") && parent.equals("parent")) {
                return;
            }
            // For each <child, parent> row, emit the relation in both directions:
            // the child sees the parent as "old#", the parent sees the child as "young#".
            context.write(new Text(child), new Text("old#" + parent));
            context.write(new Text(parent), new Text("young#" + child));
        }
    }

    public static class MyReduce extends Reducer<Text, Text, Text, Text> {
        // Write the header only once; this assumes a single reducer (the default).
        private static boolean head = true;

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            if (head) {
                context.write(new Text("grandchild"), new Text("grandparent"));
                head = false;
            }
            // For the current person, collect their children (young#) and parents (old#);
            // every child-parent combination is a grandchild-grandparent pair.
            ArrayList<String> grandchild = new ArrayList<>();
            ArrayList<String> grandparent = new ArrayList<>();
            for (Text val : values) {
                String[] temp = val.toString().split("#");
                if (temp[0].equals("young")) {
                    grandchild.add(temp[1]);
                } else {
                    grandparent.add(temp[1]);
                }
            }
            for (String gc : grandchild) {
                for (String gp : grandparent) {
                    context.write(new Text(gc), new Text(gp));
                }
            }
        }
    }
}
Input:
Run:
bin/hadoop jar a_dir/MyMapReduce-1.0-SNAPSHOT.jar InfoFind /root/input/* /root/out6
Output:
References
https://blog.csdn.net/u013384984/article/details/80229459 (the main reference)
https://blog.csdn.net/qq_43310845/article/details/123298811
https://blog.csdn.net/zhangwenbingbingyu/article/details/52210348
https://www.cnblogs.com/ginkgo-/p/13273671.html