MapReduce是Hadoop系統(tǒng)核心組件之一,它是一種可用于大數(shù)據(jù)并行處理的計(jì)算模型、框架和平臺,主要解決海量數(shù)據(jù)的計(jì)算,是目前分布式計(jì)算模型中應(yīng)用較為廣泛的一種。
練習(xí):計(jì)算a.txt文件中每個(gè)單詞出現(xiàn)的次數(shù)
hello world
hello hadoop
hello 51doit
hadoop mapreduce
mapreduce spark
public class WordCount {
public static void main(String[] args) throws IOException {
//獲取到resource文件夾下a.txt的路徑
URL resource = WordCount.class.getClassLoader().getResource("a.txt");
String path = resource.getPath();
//使用FileUtils將文件讀取成字符串
String s = FileUtils.readFileToString(new File(path),"utf-8");
//將文件使用空格進(jìn)行切割 \s可以切割 空格 tab鍵
String[] arr = s.split("\\s+");
//創(chuàng)建Map集合
Map<String,Integer> map = new HashMap<>();
//遍歷數(shù)組
for (String s1 : arr) {
//判斷集合是否包含指定鍵
if(!map.containsKey(s1)){
//如果不包含 添加 單詞 1
map.put(s1,1);
}else{
//如果包含 獲取當(dāng)前鍵的次數(shù) +1 在添加回集合
Integer count = map.get(s1);
map.put(s1,count+1);
}
}
System.out.println(map);
}
}
通過以上的方式 計(jì)算出來了a.txt文件中每個(gè)單詞出現(xiàn)的次數(shù),但是我們想一下 ,如果a.txt文件非常大,怎么辦?
比如有一個(gè)a.txt文件10個(gè)T的大小。這時(shí)一臺計(jì)算機(jī)就沒有辦法計(jì)算了,因?yàn)槲覀兏敬鎯?chǔ)不了,計(jì)算不了,那么一臺計(jì)算機(jī)無法計(jì)算,就使用多臺計(jì)算機(jī)來進(jìn)行計(jì)算!
MapReduce核心思想
? MapReduce的核心思想是“分而治之”。所謂“分而治之”就是把一個(gè)復(fù)雜的問題,按照一定的“分解”方法分為等價(jià)的規(guī)模較小的若干部分,然后逐個(gè)解決,分別找出各部分的結(jié)果,把各部分的結(jié)果組成整個(gè)問題的結(jié)果,這種思想來源于日常生活與工作時(shí)的經(jīng)驗(yàn),同樣也完全適合技術(shù)領(lǐng)域。
為了更好地理解“分而治之”思想,我們光來舉一個(gè)生活的例子。例如,某大型公司在全國設(shè)立了分公司,假設(shè)現(xiàn)在要統(tǒng)計(jì)公司今年的營收情況制作年報(bào),有兩種統(tǒng)計(jì)方式,第1種方式是全國分公司將自己的賬單數(shù)據(jù)發(fā)送至總部,由總部統(tǒng)一計(jì)算公司今年的營收報(bào)表:第2種方式是采用分而治之的思想,也就是說,先要求分公司各自統(tǒng)計(jì)營收情況,再將統(tǒng)計(jì)結(jié)果發(fā)給總部進(jìn)行統(tǒng)一匯總計(jì)算。這兩種方式相比,顯然第2種方式的策略更好,工作效率更高效。
MapReduce 作為一種分布式計(jì)算模型,它主要用于解決海量數(shù)據(jù)的計(jì)算問題。使用MapReduce操作海量數(shù)據(jù)時(shí),每個(gè)MapReduce程序被初始化為一個(gè)工作任務(wù),每個(gè)工作任務(wù)可以分為Map 和l Reducc兩個(gè)階段,具體介紹如下:
Map階段::負(fù)責(zé)將任務(wù)分解,即把復(fù)雜的任務(wù)分解成若干個(gè)“簡單的任務(wù)”來行處理,但前提是這些任務(wù)沒有必然的依賴關(guān)系,可以單獨(dú)執(zhí)行任務(wù)。
Reduce階段:負(fù)責(zé)將任務(wù)合并,即把Map階段的結(jié)果進(jìn)行全局匯總。下面通過一個(gè)圖來描述上述MapReduce 的核心思想。
MapReduce就是“任務(wù)的分解與結(jié)和的匯總”。即使用戶不懂分布式計(jì)算框架的內(nèi)部運(yùn)行機(jī)制,但是只要能用Map和 Reduce思想描述清楚要處理的問題,就能輕松地在Hadoop集群上實(shí)現(xiàn)分布式計(jì)算功能。
MapReduce編程模型
MapReduce是一種編程模型,用于處理大規(guī)模數(shù)據(jù)集的并行運(yùn)算。使用MapReduce執(zhí)行計(jì)算任務(wù)的時(shí)候,每個(gè)任務(wù)的執(zhí)行過程都會(huì)被分為兩個(gè)階段,分別是Map和Reduce,其中Map階段用于對原始數(shù)據(jù)進(jìn)行處理,Reduce階段用于對Map階段的結(jié)果進(jìn)行匯總,得到最終結(jié)果。
MapReduce編程模型借鑒了函數(shù)式程序設(shè)計(jì)語言的設(shè)計(jì)思想,其程序?qū)崿F(xiàn)過程是通過map()和l reduce()函數(shù)來完成的。從數(shù)據(jù)格式上來看,map()函數(shù)接收的數(shù)據(jù)格式是鍵值對,生的輸出結(jié)果也是鍵值對形式,reduce()函數(shù)會(huì)將map()函數(shù)輸出的鍵值對作為輸入,把相同key 值的 value進(jìn)行匯總,輸出新的鍵值對。
(1)將原始數(shù)據(jù)處理成鍵值對<K1,V1>形式。
(2)將解析后的鍵值對<K1,V1>傳給map()函數(shù),map()函數(shù)會(huì)根據(jù)映射規(guī)則,將鍵值對<K1,V1>映射為一系列中間結(jié)果形式的鍵值對<K2,V2>。
(3)將中間形式的鍵值對<K2,V2>形成<K2,{V2,....>形式傳給reduce()函數(shù)處理,把具有相同key的value合并在一起,產(chǎn)生新的鍵值對<K3,V3>,此時(shí)的鍵值對<K3,V3>就是最終輸出的結(jié)果。
詞頻統(tǒng)計(jì)
因?yàn)槲覀兊臄?shù)據(jù)都存儲(chǔ)在不同的計(jì)算機(jī)中,那么將對象中的數(shù)據(jù)從網(wǎng)絡(luò)中傳輸,就一定要用到序列化!
/*
JDK序列化對象的弊端
我們進(jìn)行序列化 其實(shí)最主要的目的是為了 序列化對象的屬性數(shù)據(jù)
比如如果序列化一個(gè)Person對象 new Person("柳巖",38); 其實(shí)我們想要的是 柳巖 38
但是如果直接序列化一個(gè)對象的話 JDK為了反序列化方便 會(huì)在文件中加入其他的數(shù)據(jù) 這樣
序列化后的文件會(huì)變的很大,占用空間
*/
public class Test {
public static void main(String[] args) throws Exception {
ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("d:\\person.txt"));
//JDK序列化對象
Person p = new Person();
p.setName("柳巖");
p.setAge(38);
oos.writeObject(p);
oos.close();
}
}
本來其實(shí)數(shù)據(jù)就占幾個(gè)字節(jié),序列化后,多占用了很多字節(jié),這樣如果序列化多的話就會(huì)浪費(fèi)很多空間.
/*
可以通過序列化屬性的方式解決問題
只序列化屬性 可以減小序列化后的文件大小
*/
public class Test {
public static void main(String[] args) throws Exception {
ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("d:\\person.txt"));
Person p = new Person();
p.setName("柳巖");
p.setAge(38);
//只序列化屬性
oos.writeUTF(p.getName());
oos.writeInt(p.getAge());
oos.close();
}
}
/*
需要注意
反序列化時(shí) 需要按照序列化的順序來反序列化
*/
public class Test {
public static void main(String[] args) throws Exception {
ObjectInputStream ois = new ObjectInputStream(new FileInputStream("d:\\person.txt"));
//先反序列化name 在反序列化age
String name = ois.readUTF();
int age = ois.readInt();
System.out.println(name + " "+age);
ois.close();
}
}
Hadoop對java的序列化又進(jìn)行了優(yōu)化,對一些類型進(jìn)行了進(jìn)一步的封裝,方便按照自己的方式序列化
Integer ----> IntWritable
Long ----> LongWritable
String ----> Text
Double ----> DoubleWritable
Boolean ----> BooleanWritable
WorldCount代碼編寫
map函數(shù)定義
/*
KEYIN: K1
VALUIN: V1
KEYOUT:K2
VALUEOUT:V2
*/
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}
}
我們只需要繼承Mapper類,重寫map方法就好
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
K1 : 行起始位置 數(shù)字 Long ---- > LongWritable
V1 : 一行數(shù)據(jù) 字符串 String -----> Text
K2 : 單詞 字符串 String -----> Text
V2 : 固定數(shù)字1 數(shù)組 Long -----> LongWritable
*/
public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
/**
*
* @param key K1
* @param value V1
* @param context 上下文對象 將map的結(jié)果 輸出給reduce
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//將一行數(shù)據(jù) 轉(zhuǎn)換成字符串 按照空格切割
String[] arr = value.toString().split("\\s+");
for (String k2 : arr) {
//將單詞輸出給reduce
context.write(new Text(k2),new LongWritable(1));
}
}
}
reduce函數(shù)定義
/*
KEYIN:K2
VALUEIN:V2
KEYOUT:K3
VALUEOUT:V3
*/
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}
}
我們只需要繼承Reducer類型重寫reduce方法就好
/*
K2:單詞 String ----> Text
V2:固定數(shù)字 1 Long ----> LongWritable
K3:單詞 String ----> Text
V3:相加后的結(jié)果 Long ----> LongWritable
*/
public class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
/**
*
* @param key K2
* @param values V2的集合 {1,1,1,1}
* @param context 上下文對象 輸出結(jié)果
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
//將次數(shù)相加
for (LongWritable value : values) {
count+=value.get();
}
//寫出 k3 v3
context.write(key,new LongWritable(count));
}
}
最后編寫啟動(dòng)程序文章來源:http://www.zghlxwxcb.cn/news/detail-500797.html
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class Test {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//創(chuàng)建配置對象
Configuration conf = new Configuration();
//創(chuàng)建工作任務(wù)
Job job = Job.getInstance(conf, "wordCount");
//設(shè)置Map類
job.setMapperClass(WordCountMapper.class);
//設(shè)置Reduce類
job.setReducerClass(WordCountReducer.class);
//設(shè)置map的輸出類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//設(shè)置reduce的輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//設(shè)置讀取文件位置 可以是文件 也可以是文件夾
FileInputFormat.setInputPaths(job,new Path("d:\\work\\abc"));
//設(shè)置輸出文件位置
FileOutputFormat.setOutputPath(job,new Path("d:\\work\\abc\\out_put"));
//提交任務(wù) 并等待任務(wù)結(jié)束
job.waitForCompletion(true);
}
}
如果拋這個(gè)異常 需要查看windows環(huán)境
Exception in thread "main"java.lang .UnsatisfiedLinkError: org.apache .hadoop.io.nativeio.NativeIO$windows.access0(Ljava/lang/string;1) .
如果已經(jīng)配置了環(huán)境 還不行 在src新建包 org.apache.hadoop.io.nativeio
然后hadoop02文件夾中的 NativeIO.java添加到這個(gè)包下 重新運(yùn)行嘗試
若要顯示報(bào)錯(cuò)信息在resouces目錄下添加log4j.properties
內(nèi)容如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-500797.html
log4j.rootCategory=INFO,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
到了這里,關(guān)于MapReduce分布式計(jì)算(一)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!