国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【Hadoop】一個(gè)例子帶你了解MapReduce

這篇具有很好參考價(jià)值的文章主要介紹了【Hadoop】一個(gè)例子帶你了解MapReduce。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

寫(xiě)在前面:博主是一只經(jīng)過(guò)實(shí)戰(zhàn)開(kāi)發(fā)歷練后投身培訓(xùn)事業(yè)的“小山豬”,昵稱(chēng)取自動(dòng)畫(huà)片《獅子王》中的“彭彭”,總是以樂(lè)觀、積極的心態(tài)對(duì)待周邊的事物。本人的技術(shù)路線從Java全棧工程師一路奔向大數(shù)據(jù)開(kāi)發(fā)、數(shù)據(jù)挖掘領(lǐng)域,如今終有小成,愿將昔日所獲與大家交流一二,希望對(duì)學(xué)習(xí)路上的你有所助益。同時(shí),博主也想通過(guò)此次嘗試打造一個(gè)完善的技術(shù)圖書(shū)館,任何與文章技術(shù)點(diǎn)有關(guān)的異常、錯(cuò)誤、注意事項(xiàng)均會(huì)在末尾列出,歡迎大家通過(guò)各種方式提供素材。

  • 對(duì)于文章中出現(xiàn)的任何錯(cuò)誤請(qǐng)大家批評(píng)指出,一定及時(shí)修改。
  • 有任何想要討論和學(xué)習(xí)的問(wèn)題可聯(lián)系我:zhuyc@vip.163.com。
  • 發(fā)布文章的風(fēng)格因?qū)诙?,均自成體系,不足之處請(qǐng)大家指正。

一個(gè)例子帶你了解MapReduce

本文關(guān)鍵字:大數(shù)據(jù)、Hadoop、MapReduce、WordCount

一、前期準(zhǔn)備

1. 運(yùn)行環(huán)境

想要運(yùn)行WordCount程序,其實(shí)可以不需要安裝任何的Hadoop軟件環(huán)境,因?yàn)閷?shí)際上執(zhí)行計(jì)算任務(wù)的是Hadoop框架集成的各種jar包。Hadoop啟動(dòng)后的各項(xiàng)進(jìn)程主要用于支持HDFS的使用,各個(gè)節(jié)點(diǎn)間的通訊,任務(wù)調(diào)度等等。所以如果我們只是想測(cè)試程序的可用性的話可以只新建一個(gè)Java項(xiàng)目,然后集成Hadoop相關(guān)的jar包,直接運(yùn)行程序即可。
這種方式只限于代碼測(cè)試,因?yàn)榭梢噪S時(shí)修改代碼并且執(zhí)行,結(jié)果也可以很方便查看。本文主要講解MapReduce的運(yùn)行流程,因此不需要搭建任何Hadoop環(huán)境,關(guān)于Hadoop任務(wù)的提交方式將在其它文章中詳細(xì)說(shuō)明。

2. 項(xiàng)目新建

  • 首先在IDEA中新建一個(gè)Maven項(xiàng)目:

通過(guò)一個(gè)例子談?wù)勀銓?duì)mapreduce的理解,學(xué)習(xí)路上,# Hadoop,mapreduce,hadoop,hdfs

  • 修改pom.xml,添加Hadoop相關(guān)的依賴(lài):
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.5</version>
        </dependency>
    </dependencies>

二、從WordCount開(kāi)始

對(duì)于Hadoop來(lái)說(shuō),它的Hello World經(jīng)典案例當(dāng)屬WordCount了,給出一段文本,我們統(tǒng)計(jì)出其中一共包含多少單詞。我們可以使用MapReduce的思想來(lái)將任務(wù)分步執(zhí)行,這樣的好處是更利于任務(wù)的分割與合并?,F(xiàn)在描述可能沒(méi)有多大的感覺(jué),我們直接來(lái)看下面兩個(gè)對(duì)比。

1. 基本流程梳理

按照常規(guī)思路,我們希望最終的結(jié)果是以Map形式存儲(chǔ),每個(gè)key存儲(chǔ)單詞,對(duì)應(yīng)的value存儲(chǔ)統(tǒng)計(jì)數(shù)量。于是,我們定義一個(gè)Map<String, Integer>類(lèi)型用來(lái)存儲(chǔ)最終的結(jié)果。數(shù)據(jù)集先使用一個(gè)String[]來(lái)代替,在最后的MR完整實(shí)現(xiàn)中,會(huì)從文件中進(jìn)行讀取。

    static String[] text = {
            "what day is today",
            "today is a good day",
            "good good study",
            "day day up"
    };

2. 常規(guī)思路實(shí)現(xiàn)

如果只是單個(gè)的Java程序,我們可以這樣做:

    public static void main(String[] args) {
        // 定義用于存放統(tǒng)計(jì)結(jié)果的Map結(jié)構(gòu)
        Map<String, Integer> map = new HashMap<>();
        // 讀取數(shù)組中的每個(gè)元素,模擬一次讀取一行
        for (String line : text){
            // 將每個(gè)單詞以空格分割
            String[] words = line.split(" ");
            // 讀取每一個(gè)單詞
            for (String word : words){
                // 每次將單詞的統(tǒng)計(jì)結(jié)果取出,加1后放回
                if (map.containsKey(word)){
                    map.put(word, map.get(word) + 1);
                }else {
                    // 如果是第一次遇到這個(gè)單詞,則存放1
                    map.put(word, 1);
                }
            }
        }
        // 輸出結(jié)果
        System.out.println(map);
    }

由于是簡(jiǎn)單的Java程序,這里就不過(guò)多說(shuō)明了,大家可以自己看一下注釋。

3. MR思想實(shí)現(xiàn)

從上面的程序可以看到,我們使用循環(huán)結(jié)構(gòu),逐行逐個(gè)的處理每行字符串中的每個(gè)單詞,然后將結(jié)果不斷的更新到Map結(jié)構(gòu)中。在這種情況下,如果我們讓不同的線程【相當(dāng)于不同的Hadoop節(jié)點(diǎn)】去處理不同行的數(shù)據(jù),再放到Map中時(shí),為了考慮線程安全問(wèn)題,其實(shí)是無(wú)法發(fā)揮最大作用的,很多時(shí)候要等待鎖的釋放。如果我們用MapReduce的思想來(lái)將程序改寫(xiě)一些就會(huì)不同了。

  • 定義一個(gè)K-V鍵值對(duì)結(jié)構(gòu)
    static class KeyValuePair<K,V>{
        K key;
        V value;

        public KeyValuePair(K key, V value){
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "{" +
                    "key=" + key +
                    ", value=" + value +
                    '}';
        }
    }

以下程序的編寫(xiě)可以幫助大家理解MR過(guò)程中最為重要的3個(gè)核心步驟:Map、Shuffling、Reduce。這三個(gè)階段會(huì)完成許許多多的工作,對(duì)于開(kāi)發(fā)者來(lái)說(shuō)我們最關(guān)心的是數(shù)據(jù)結(jié)構(gòu)上的變化,因此,其中涉及到的排序等相關(guān)操作并沒(méi)有去實(shí)現(xiàn),想要深挖的小伙伴可以去看源碼。

  • Map階段

在這一階段,會(huì)對(duì)數(shù)據(jù)逐行處理,key為偏移量,value則是這一行出現(xiàn)的數(shù)據(jù)鍵值對(duì)列表。

    static Map<Integer, List<KeyValuePair<String, Integer>>> doMapper(){
        Map<Integer, List<KeyValuePair<String, Integer>>> mapper = new HashMap<>();
        // 定義偏移量指標(biāo),作為key
        int offset = 0;
        for (String line : text){
            String[] words = line.split(" ");
            List<KeyValuePair<String, Integer>> list = new ArrayList<>();
            for (String word : words){
                // 將出現(xiàn)的單詞作為鍵值對(duì)的key,將出現(xiàn)次數(shù)作為鍵值對(duì)的value
                KeyValuePair<String, Integer> keyValuePair = new KeyValuePair<>(word, 1);
                list.add(keyValuePair);
            }
            // 每次處理一行的數(shù)據(jù),生成對(duì)應(yīng)的鍵值對(duì)列表
            mapper.put(offset, list);
            // 調(diào)整偏移量,總字符加一個(gè)換行符
            offset += line.length() + 1;
        }
        return mapper;
    }

結(jié)果如下所示:

{0=[{key=what, value=1}, {key=day, value=1}, {key=is, value=1}, {key=today, value=1}], 18=[{key=today, value=1}, {key=is, value=1}, {key=a, value=1}, {key=good, value=1}, {key=day, value=1}], 38=[{key=good, value=1}, {key=good, value=1}, {key=study, value=1}], 54=[{key=day, value=1}, {key=day, value=1}, {key=up, value=1}]}
  • Shuffling階段

在這一階段,將會(huì)把所有的key進(jìn)行排序,并把相同的value放在同一個(gè)列表中。

    static Map<String, List<Integer>> doShuffle(Map<Integer, List<KeyValuePair<String, Integer>>> mapper){
        Map<String, List<Integer>> shuffle = new HashMap<>();
        for (Integer key : mapper.keySet()){
            List<KeyValuePair<String, Integer>> keyValuePairs = mapper.get(key);
            for (KeyValuePair<String, Integer> keyValuePair : keyValuePairs){
                // 將出現(xiàn)過(guò)的相同單詞放在同一個(gè)列表中
                if (shuffle.containsKey(keyValuePair.key)){
                    shuffle.get(keyValuePair.key).add(keyValuePair.value);
                } else {
                    // 如果是第一次記錄,則創(chuàng)建一個(gè)列表
                    List<Integer> list = new ArrayList<>();
                    list.add(keyValuePair.value);
                    shuffle.put(keyValuePair.key, list);
                }
            }
        }
        return shuffle;
    }

此時(shí),依然不涉及計(jì)算邏輯,結(jié)果如圖所示:

{a=[1], study=[1], what=[1], today=[1, 1], is=[1, 1], up=[1], day=[1, 1, 1, 1], good=[1, 1, 1]}
  • Reduce階段

在這一階段,會(huì)在每個(gè)key對(duì)應(yīng)的value列表中執(zhí)行我們需要的計(jì)算邏輯。

    static Map<String, Integer> doReducer(Map<String, List<Integer>> shuffle){
        Map<String, Integer> reducer = new HashMap<>();
        for (String key : shuffle.keySet()){
            List<Integer> values = shuffle.get(key);
            Integer result = 0;
            // 此處對(duì)value進(jìn)行處理,執(zhí)行累加
            for (Integer value : values){
                result += value;
            }
            reducer.put(key, result);
        }
        return reducer;
    }

得到最終結(jié)果,執(zhí)行結(jié)果如下:

{a=1, study=1, what=1, today=2, is=2, up=1, day=4, good=3}
  • 程序運(yùn)行結(jié)果

通過(guò)一個(gè)例子談?wù)勀銓?duì)mapreduce的理解,學(xué)習(xí)路上,# Hadoop,mapreduce,hadoop,hdfs

三、MapReduce

上面的例子幫大家簡(jiǎn)單的梳理了一下整體流程,這樣我們就不需要debug去看每一步的執(zhí)行效果了,因?yàn)橹皇悄M實(shí)現(xiàn),所以省略了一些步驟。上面定義的KeyValuePair中出現(xiàn)的泛型也是整個(gè)流程的重要組成部分,實(shí)際執(zhí)行計(jì)算任務(wù)時(shí)經(jīng)常要根據(jù)需要合理的去定義Key與Value的類(lèi)型。

1. Mapper

新建一個(gè)Class,繼承Mapper,重寫(xiě)其中的map方法??梢韵榷x好泛型,然后再自動(dòng)生成map方法。

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

/**
 * 以下泛型聲明的是map階段輸入和輸出數(shù)據(jù)的對(duì)應(yīng)類(lèi)型
 * KEYIN: 偏移量,為整數(shù)類(lèi)型
 * VALUEIN: 每一行的字符串,為文本類(lèi)型
 * KEYOUT: 單詞,為文本類(lèi)型
 * VALUEOUT: 出現(xiàn)次數(shù)1,為整數(shù)類(lèi)型
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /**
     * map階段將字符的偏移量作為key,每次得到的value為一行的數(shù)據(jù)
     * @param key 字符偏移量,包含換行符
     * @param value 整行的數(shù)據(jù)
     * @param context 將結(jié)果輸出到下一階段的對(duì)象
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        if (value != null){
            // 獲取該行的數(shù)據(jù)
            String line = value.toString();
            // 根據(jù)空格分離出每個(gè)單詞
            String[] words = StringUtils.split(line, ' ');
            // 將每個(gè)單詞以鍵值對(duì)輸出
            for(String word : words){
                context.write(new Text(word), new LongWritable(1));
            }

        }
    }

}

2. Reducer

新建一個(gè)Class,繼承Reducer,重寫(xiě)其中的reduce方法??梢韵榷x好泛型,然后再自動(dòng)生成reduce方法。

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * 以下泛型聲明的是reduce階段輸入和輸出數(shù)據(jù)的對(duì)應(yīng)類(lèi)型,輸入類(lèi)型對(duì)應(yīng)的是Map階段的輸出
 * KEYIN: 單詞,為文本類(lèi)型
 * VALUEIN: 出現(xiàn)次數(shù)1,為整數(shù)類(lèi)型
 * KEYOUT: 單詞,為文本類(lèi)型
 * VALUEOUT: 統(tǒng)計(jì)次數(shù),為整數(shù)類(lèi)型
 */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    /**
     * 本例中省略了對(duì)shuffle的自定義,獲取到的是默認(rèn)處理后的數(shù)據(jù)
     * @param key 單詞
     * @param values 出現(xiàn)1次的數(shù)據(jù)列表[1,1,...]
     * @param context 將結(jié)果最終輸出的對(duì)象
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        // 定義用于記錄累加結(jié)果的變量
        long sum = 0;
        // 遍歷列表,執(zhí)行累加操作
        for (LongWritable value : values){
            sum += value.get();
        }
        // 輸出最后的統(tǒng)計(jì)結(jié)果
        context.write(key, new LongWritable(sum));
    }
}

3. Executor

新建一個(gè)Class,繼承Configured,并實(shí)現(xiàn)Tool接口,完整代碼如下:

import edu.sand.mapper.WordCountMapper;
import edu.sand.reducer.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class WordCountExecutor extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        // 初始化配置,可以通過(guò)這個(gè)對(duì)象設(shè)置各種參數(shù)
        Configuration conf = new Configuration();
        // 完成Job初始化,設(shè)置任務(wù)名稱(chēng)
        Job job = Job.getInstance(conf, "wordCount");
        // 設(shè)置Job的運(yùn)行主類(lèi)
        job.setJarByClass(WordCountExecutor.class);
        // 設(shè)置Map階段的執(zhí)行類(lèi)
        job.setMapperClass(WordCountMapper.class);
        // 設(shè)置Map階段的數(shù)據(jù)輸出類(lèi)型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 設(shè)置Reduce階段的執(zhí)行類(lèi)
        job.setReducerClass(WordCountReducer.class);
        // 設(shè)置Reduce階段的數(shù)據(jù)輸出類(lèi)型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 指定數(shù)據(jù)輸入文件路徑,如果指定的是文件夾,將讀取目錄下所有文件
        FileInputFormat.setInputPaths(job, new Path("input/"));
        // 指定結(jié)果輸出文件路徑,最后一級(jí)路徑會(huì)自動(dòng)創(chuàng)建,每次重新執(zhí)行時(shí)需要?jiǎng)h除或修改名稱(chēng)
        FileOutputFormat.setOutputPath(job, new Path("output/wordCount"));
        // 使用job調(diào)用執(zhí)行,true代表顯示詳細(xì)信息,成功時(shí)返回0
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        // 調(diào)用執(zhí)行
        ToolRunner.run(new Configuration(), new WordCountExecutor(), args);
    }
}

4. 運(yùn)行結(jié)果

  • 項(xiàng)目結(jié)構(gòu)說(shuō)明

由于是本地代碼運(yùn)行,所以數(shù)據(jù)輸入和結(jié)果輸出都保存在本地磁盤(pán)上,可以在src同級(jí)創(chuàng)建兩個(gè)文件夾inputoutput
通過(guò)一個(gè)例子談?wù)勀銓?duì)mapreduce的理解,學(xué)習(xí)路上,# Hadoop,mapreduce,hadoop,hdfs

  • 日志配置

如果希望看到更詳細(xì)的日志輸出,可以在resources文件夾下創(chuàng)建一個(gè)log4j.properties,內(nèi)容如下:

log4j.rootLogger=INFO,stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%p\t%d{ISO8601}\t%r\t%c\t[%t]\t%m%n

第一行的日志級(jí)別可以設(shè)置為INOF或者DEBUG。

  • 執(zhí)行結(jié)果

運(yùn)行后會(huì)在對(duì)應(yīng)路徑下自動(dòng)生成一個(gè)文件夾,其中主要包含3類(lèi)文件:任務(wù)執(zhí)行標(biāo)志文件、結(jié)果輸出文件、校驗(yàn)文件。以crc結(jié)尾的文件為校驗(yàn)類(lèi)文件,當(dāng)任務(wù)成功執(zhí)行時(shí),會(huì)產(chǎn)生一個(gè)**_SUCCESS文件,具體的運(yùn)行結(jié)果會(huì)存放在part-r-xxxxx**文件中,part文件的名稱(chēng)和個(gè)數(shù)取決于Reduce的數(shù)量以及開(kāi)發(fā)者的需要。
通過(guò)一個(gè)例子談?wù)勀銓?duì)mapreduce的理解,學(xué)習(xí)路上,# Hadoop,mapreduce,hadoop,hdfs
掃描下方二維碼,加入CSDN官方粉絲微信群,可以與我直接交流,還有更多福利哦~
通過(guò)一個(gè)例子談?wù)勀銓?duì)mapreduce的理解,學(xué)習(xí)路上,# Hadoop,mapreduce,hadoop,hdfs文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-775178.html

到了這里,關(guān)于【Hadoop】一個(gè)例子帶你了解MapReduce的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包