寫(xiě)在前面:博主是一只經(jīng)過(guò)實(shí)戰(zhàn)開(kāi)發(fā)歷練后投身培訓(xùn)事業(yè)的“小山豬”,昵稱(chēng)取自動(dòng)畫(huà)片《獅子王》中的“彭彭”,總是以樂(lè)觀、積極的心態(tài)對(duì)待周邊的事物。本人的技術(shù)路線從Java全棧工程師一路奔向大數(shù)據(jù)開(kāi)發(fā)、數(shù)據(jù)挖掘領(lǐng)域,如今終有小成,愿將昔日所獲與大家交流一二,希望對(duì)學(xué)習(xí)路上的你有所助益。同時(shí),博主也想通過(guò)此次嘗試打造一個(gè)完善的技術(shù)圖書(shū)館,任何與文章技術(shù)點(diǎn)有關(guān)的異常、錯(cuò)誤、注意事項(xiàng)均會(huì)在末尾列出,歡迎大家通過(guò)各種方式提供素材。
- 對(duì)于文章中出現(xiàn)的任何錯(cuò)誤請(qǐng)大家批評(píng)指出,一定及時(shí)修改。
- 有任何想要討論和學(xué)習(xí)的問(wèn)題可聯(lián)系我:zhuyc@vip.163.com。
- 發(fā)布文章的風(fēng)格因?qū)诙?,均自成體系,不足之處請(qǐng)大家指正。
一個(gè)例子帶你了解MapReduce
本文關(guān)鍵字:大數(shù)據(jù)、Hadoop、MapReduce、WordCount
一、前期準(zhǔn)備
1. 運(yùn)行環(huán)境
想要運(yùn)行WordCount程序,其實(shí)可以不需要安裝任何的Hadoop軟件環(huán)境,因?yàn)閷?shí)際上執(zhí)行計(jì)算任務(wù)的是Hadoop框架集成的各種jar包。Hadoop啟動(dòng)后的各項(xiàng)進(jìn)程主要用于支持HDFS的使用,各個(gè)節(jié)點(diǎn)間的通訊,任務(wù)調(diào)度等等。所以如果我們只是想測(cè)試程序的可用性的話可以只新建一個(gè)Java項(xiàng)目,然后集成Hadoop相關(guān)的jar包,直接運(yùn)行程序即可。
這種方式只限于代碼測(cè)試,因?yàn)榭梢噪S時(shí)修改代碼并且執(zhí)行,結(jié)果也可以很方便查看。本文主要講解MapReduce的運(yùn)行流程,因此不需要搭建任何Hadoop環(huán)境,關(guān)于Hadoop任務(wù)的提交方式將在其它文章中詳細(xì)說(shuō)明。
2. 項(xiàng)目新建
- 首先在IDEA中新建一個(gè)Maven項(xiàng)目:
- 修改pom.xml,添加Hadoop相關(guān)的依賴(lài):
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.5</version>
</dependency>
</dependencies>
二、從WordCount開(kāi)始
對(duì)于Hadoop來(lái)說(shuō),它的Hello World經(jīng)典案例當(dāng)屬WordCount了,給出一段文本,我們統(tǒng)計(jì)出其中一共包含多少單詞。我們可以使用MapReduce的思想來(lái)將任務(wù)分步執(zhí)行,這樣的好處是更利于任務(wù)的分割與合并?,F(xiàn)在描述可能沒(méi)有多大的感覺(jué),我們直接來(lái)看下面兩個(gè)對(duì)比。
1. 基本流程梳理
按照常規(guī)思路,我們希望最終的結(jié)果是以Map形式存儲(chǔ),每個(gè)key存儲(chǔ)單詞,對(duì)應(yīng)的value存儲(chǔ)統(tǒng)計(jì)數(shù)量。于是,我們定義一個(gè)Map<String, Integer>
類(lèi)型用來(lái)存儲(chǔ)最終的結(jié)果。數(shù)據(jù)集先使用一個(gè)String[]
來(lái)代替,在最后的MR完整實(shí)現(xiàn)中,會(huì)從文件中進(jìn)行讀取。
static String[] text = {
"what day is today",
"today is a good day",
"good good study",
"day day up"
};
2. 常規(guī)思路實(shí)現(xiàn)
如果只是單個(gè)的Java程序,我們可以這樣做:
public static void main(String[] args) {
// 定義用于存放統(tǒng)計(jì)結(jié)果的Map結(jié)構(gòu)
Map<String, Integer> map = new HashMap<>();
// 讀取數(shù)組中的每個(gè)元素,模擬一次讀取一行
for (String line : text){
// 將每個(gè)單詞以空格分割
String[] words = line.split(" ");
// 讀取每一個(gè)單詞
for (String word : words){
// 每次將單詞的統(tǒng)計(jì)結(jié)果取出,加1后放回
if (map.containsKey(word)){
map.put(word, map.get(word) + 1);
}else {
// 如果是第一次遇到這個(gè)單詞,則存放1
map.put(word, 1);
}
}
}
// 輸出結(jié)果
System.out.println(map);
}
由于是簡(jiǎn)單的Java程序,這里就不過(guò)多說(shuō)明了,大家可以自己看一下注釋。
3. MR思想實(shí)現(xiàn)
從上面的程序可以看到,我們使用循環(huán)結(jié)構(gòu),逐行逐個(gè)的處理每行字符串中的每個(gè)單詞,然后將結(jié)果不斷的更新到Map結(jié)構(gòu)中。在這種情況下,如果我們讓不同的線程【相當(dāng)于不同的Hadoop節(jié)點(diǎn)】去處理不同行的數(shù)據(jù),再放到Map中時(shí),為了考慮線程安全問(wèn)題,其實(shí)是無(wú)法發(fā)揮最大作用的,很多時(shí)候要等待鎖的釋放。如果我們用MapReduce的思想來(lái)將程序改寫(xiě)一些就會(huì)不同了。
- 定義一個(gè)K-V鍵值對(duì)結(jié)構(gòu)
static class KeyValuePair<K,V>{
K key;
V value;
public KeyValuePair(K key, V value){
this.key = key;
this.value = value;
}
@Override
public String toString() {
return "{" +
"key=" + key +
", value=" + value +
'}';
}
}
以下程序的編寫(xiě)可以幫助大家理解MR過(guò)程中最為重要的3個(gè)核心步驟:Map、Shuffling、Reduce。這三個(gè)階段會(huì)完成許許多多的工作,對(duì)于開(kāi)發(fā)者來(lái)說(shuō)我們最關(guān)心的是數(shù)據(jù)結(jié)構(gòu)上的變化,因此,其中涉及到的排序等相關(guān)操作并沒(méi)有去實(shí)現(xiàn),想要深挖的小伙伴可以去看源碼。
- Map階段
在這一階段,會(huì)對(duì)數(shù)據(jù)逐行處理,key為偏移量,value則是這一行出現(xiàn)的數(shù)據(jù)鍵值對(duì)列表。
static Map<Integer, List<KeyValuePair<String, Integer>>> doMapper(){
Map<Integer, List<KeyValuePair<String, Integer>>> mapper = new HashMap<>();
// 定義偏移量指標(biāo),作為key
int offset = 0;
for (String line : text){
String[] words = line.split(" ");
List<KeyValuePair<String, Integer>> list = new ArrayList<>();
for (String word : words){
// 將出現(xiàn)的單詞作為鍵值對(duì)的key,將出現(xiàn)次數(shù)作為鍵值對(duì)的value
KeyValuePair<String, Integer> keyValuePair = new KeyValuePair<>(word, 1);
list.add(keyValuePair);
}
// 每次處理一行的數(shù)據(jù),生成對(duì)應(yīng)的鍵值對(duì)列表
mapper.put(offset, list);
// 調(diào)整偏移量,總字符加一個(gè)換行符
offset += line.length() + 1;
}
return mapper;
}
結(jié)果如下所示:
{0=[{key=what, value=1}, {key=day, value=1}, {key=is, value=1}, {key=today, value=1}], 18=[{key=today, value=1}, {key=is, value=1}, {key=a, value=1}, {key=good, value=1}, {key=day, value=1}], 38=[{key=good, value=1}, {key=good, value=1}, {key=study, value=1}], 54=[{key=day, value=1}, {key=day, value=1}, {key=up, value=1}]}
- Shuffling階段
在這一階段,將會(huì)把所有的key進(jìn)行排序,并把相同的value放在同一個(gè)列表中。
static Map<String, List<Integer>> doShuffle(Map<Integer, List<KeyValuePair<String, Integer>>> mapper){
Map<String, List<Integer>> shuffle = new HashMap<>();
for (Integer key : mapper.keySet()){
List<KeyValuePair<String, Integer>> keyValuePairs = mapper.get(key);
for (KeyValuePair<String, Integer> keyValuePair : keyValuePairs){
// 將出現(xiàn)過(guò)的相同單詞放在同一個(gè)列表中
if (shuffle.containsKey(keyValuePair.key)){
shuffle.get(keyValuePair.key).add(keyValuePair.value);
} else {
// 如果是第一次記錄,則創(chuàng)建一個(gè)列表
List<Integer> list = new ArrayList<>();
list.add(keyValuePair.value);
shuffle.put(keyValuePair.key, list);
}
}
}
return shuffle;
}
此時(shí),依然不涉及計(jì)算邏輯,結(jié)果如圖所示:
{a=[1], study=[1], what=[1], today=[1, 1], is=[1, 1], up=[1], day=[1, 1, 1, 1], good=[1, 1, 1]}
- Reduce階段
在這一階段,會(huì)在每個(gè)key對(duì)應(yīng)的value列表中執(zhí)行我們需要的計(jì)算邏輯。
static Map<String, Integer> doReducer(Map<String, List<Integer>> shuffle){
Map<String, Integer> reducer = new HashMap<>();
for (String key : shuffle.keySet()){
List<Integer> values = shuffle.get(key);
Integer result = 0;
// 此處對(duì)value進(jìn)行處理,執(zhí)行累加
for (Integer value : values){
result += value;
}
reducer.put(key, result);
}
return reducer;
}
得到最終結(jié)果,執(zhí)行結(jié)果如下:
{a=1, study=1, what=1, today=2, is=2, up=1, day=4, good=3}
- 程序運(yùn)行結(jié)果
三、MapReduce
上面的例子幫大家簡(jiǎn)單的梳理了一下整體流程,這樣我們就不需要debug去看每一步的執(zhí)行效果了,因?yàn)橹皇悄M實(shí)現(xiàn),所以省略了一些步驟。上面定義的KeyValuePair中出現(xiàn)的泛型也是整個(gè)流程的重要組成部分,實(shí)際執(zhí)行計(jì)算任務(wù)時(shí)經(jīng)常要根據(jù)需要合理的去定義Key與Value的類(lèi)型。
1. Mapper
新建一個(gè)Class,繼承Mapper,重寫(xiě)其中的map方法??梢韵榷x好泛型,然后再自動(dòng)生成map方法。
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
/**
* 以下泛型聲明的是map階段輸入和輸出數(shù)據(jù)的對(duì)應(yīng)類(lèi)型
* KEYIN: 偏移量,為整數(shù)類(lèi)型
* VALUEIN: 每一行的字符串,為文本類(lèi)型
* KEYOUT: 單詞,為文本類(lèi)型
* VALUEOUT: 出現(xiàn)次數(shù)1,為整數(shù)類(lèi)型
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
/**
* map階段將字符的偏移量作為key,每次得到的value為一行的數(shù)據(jù)
* @param key 字符偏移量,包含換行符
* @param value 整行的數(shù)據(jù)
* @param context 將結(jié)果輸出到下一階段的對(duì)象
*/
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
if (value != null){
// 獲取該行的數(shù)據(jù)
String line = value.toString();
// 根據(jù)空格分離出每個(gè)單詞
String[] words = StringUtils.split(line, ' ');
// 將每個(gè)單詞以鍵值對(duì)輸出
for(String word : words){
context.write(new Text(word), new LongWritable(1));
}
}
}
}
2. Reducer
新建一個(gè)Class,繼承Reducer,重寫(xiě)其中的reduce方法??梢韵榷x好泛型,然后再自動(dòng)生成reduce方法。
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* 以下泛型聲明的是reduce階段輸入和輸出數(shù)據(jù)的對(duì)應(yīng)類(lèi)型,輸入類(lèi)型對(duì)應(yīng)的是Map階段的輸出
* KEYIN: 單詞,為文本類(lèi)型
* VALUEIN: 出現(xiàn)次數(shù)1,為整數(shù)類(lèi)型
* KEYOUT: 單詞,為文本類(lèi)型
* VALUEOUT: 統(tǒng)計(jì)次數(shù),為整數(shù)類(lèi)型
*/
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
/**
* 本例中省略了對(duì)shuffle的自定義,獲取到的是默認(rèn)處理后的數(shù)據(jù)
* @param key 單詞
* @param values 出現(xiàn)1次的數(shù)據(jù)列表[1,1,...]
* @param context 將結(jié)果最終輸出的對(duì)象
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
// 定義用于記錄累加結(jié)果的變量
long sum = 0;
// 遍歷列表,執(zhí)行累加操作
for (LongWritable value : values){
sum += value.get();
}
// 輸出最后的統(tǒng)計(jì)結(jié)果
context.write(key, new LongWritable(sum));
}
}
3. Executor
新建一個(gè)Class,繼承Configured,并實(shí)現(xiàn)Tool接口,完整代碼如下:
import edu.sand.mapper.WordCountMapper;
import edu.sand.reducer.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WordCountExecutor extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
// 初始化配置,可以通過(guò)這個(gè)對(duì)象設(shè)置各種參數(shù)
Configuration conf = new Configuration();
// 完成Job初始化,設(shè)置任務(wù)名稱(chēng)
Job job = Job.getInstance(conf, "wordCount");
// 設(shè)置Job的運(yùn)行主類(lèi)
job.setJarByClass(WordCountExecutor.class);
// 設(shè)置Map階段的執(zhí)行類(lèi)
job.setMapperClass(WordCountMapper.class);
// 設(shè)置Map階段的數(shù)據(jù)輸出類(lèi)型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 設(shè)置Reduce階段的執(zhí)行類(lèi)
job.setReducerClass(WordCountReducer.class);
// 設(shè)置Reduce階段的數(shù)據(jù)輸出類(lèi)型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 指定數(shù)據(jù)輸入文件路徑,如果指定的是文件夾,將讀取目錄下所有文件
FileInputFormat.setInputPaths(job, new Path("input/"));
// 指定結(jié)果輸出文件路徑,最后一級(jí)路徑會(huì)自動(dòng)創(chuàng)建,每次重新執(zhí)行時(shí)需要?jiǎng)h除或修改名稱(chēng)
FileOutputFormat.setOutputPath(job, new Path("output/wordCount"));
// 使用job調(diào)用執(zhí)行,true代表顯示詳細(xì)信息,成功時(shí)返回0
return job.waitForCompletion(true) ? 0 : -1;
}
public static void main(String[] args) throws Exception {
// 調(diào)用執(zhí)行
ToolRunner.run(new Configuration(), new WordCountExecutor(), args);
}
}
4. 運(yùn)行結(jié)果
- 項(xiàng)目結(jié)構(gòu)說(shuō)明
由于是本地代碼運(yùn)行,所以數(shù)據(jù)輸入和結(jié)果輸出都保存在本地磁盤(pán)上,可以在src同級(jí)創(chuàng)建兩個(gè)文件夾input和output。
- 日志配置
如果希望看到更詳細(xì)的日志輸出,可以在resources文件夾下創(chuàng)建一個(gè)log4j.properties,內(nèi)容如下:
log4j.rootLogger=INFO,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%p\t%d{ISO8601}\t%r\t%c\t[%t]\t%m%n
第一行的日志級(jí)別可以設(shè)置為INOF或者DEBUG。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-775178.html
- 執(zhí)行結(jié)果
運(yùn)行后會(huì)在對(duì)應(yīng)路徑下自動(dòng)生成一個(gè)文件夾,其中主要包含3類(lèi)文件:任務(wù)執(zhí)行標(biāo)志文件、結(jié)果輸出文件、校驗(yàn)文件。以crc結(jié)尾的文件為校驗(yàn)類(lèi)文件,當(dāng)任務(wù)成功執(zhí)行時(shí),會(huì)產(chǎn)生一個(gè)**_SUCCESS文件,具體的運(yùn)行結(jié)果會(huì)存放在part-r-xxxxx**文件中,part文件的名稱(chēng)和個(gè)數(shù)取決于Reduce的數(shù)量以及開(kāi)發(fā)者的需要。
掃描下方二維碼,加入CSDN官方粉絲微信群,可以與我直接交流,還有更多福利哦~文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-775178.html
到了這里,關(guān)于【Hadoop】一個(gè)例子帶你了解MapReduce的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!