国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Hadoop三大框架之MapReduce工作流程

這篇具有很好參考價值的文章主要介紹了Hadoop三大框架之MapReduce工作流程。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、MapReduce基礎(chǔ)

MapReduce的思想核心是“分而治之”,適用于大量復(fù)雜的任務(wù)處理場景(大規(guī)模數(shù)據(jù)處理場景)。

  • Map負(fù)責(zé)“分”,把復(fù)雜的任務(wù)分解為若干個“簡單的任務(wù)”來并行處理。可以進(jìn)行拆分的前提是這些小任務(wù)可以并行計算,彼此間幾乎沒有依賴關(guān)系。

  • Reduce負(fù)責(zé)“合”,即對map階段的結(jié)果進(jìn)行全局匯總。

  • MapReduce運行在yarn集群。ResourceManager+NodeManager這兩個階段合起來就是MapReduce思想的體現(xiàn)。

mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大數(shù)據(jù),Powered by 金山文檔

1.1 MapReduce設(shè)計構(gòu)思

MapReduce是一個分布式運算程序的編程框架,核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個完整的分布式運算程序,并發(fā)運行在Hadoop集群上。

MapReduce設(shè)計并提供了一個同意的計算框架,為程序員隱藏了絕大多數(shù)系統(tǒng)層面的處理細(xì)節(jié),為程序員提供了一個抽象和高層的編程接口和框架。程序員僅需要關(guān)心應(yīng)用層的具體計算問題,僅需要編寫少量的處理應(yīng)用本身計算問題的程序代碼。

Map和Reduce為程序員提供了一個清晰的操作接口抽象描述。MapReduce中定義了如下的Map和Reduce兩個抽象的編程接口,由用戶去編程實現(xiàn)Map和Reduce,MapReduce處理的數(shù)據(jù)類型是<key,value>鍵值對。

mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大數(shù)據(jù),Powered by 金山文檔

一個完整的MapReduce程序在分布式運行時有三類實例進(jìn)程:

  1. MRAppMaster 負(fù)責(zé)整個程序的過程調(diào)度及狀態(tài)協(xié)調(diào)

  1. MapTask 負(fù)責(zé)map階段的整個數(shù)據(jù)處理流程

  1. ReduceTask 負(fù)責(zé)reduce階段的整個數(shù)據(jù)處理流程

1.2 MapReduce工作原理

mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大數(shù)據(jù),Powered by 金山文檔
mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大數(shù)據(jù),Powered by 金山文檔

1、分片操作:

FileInputstream,首先要計算切片大小,F(xiàn)ileInputstream是一個抽象類,繼承InputFormat接口,真正完成工作的是它的實現(xiàn)類,默認(rèn)為是TextInputFormat,TextInputFormat是讀取文件的,默認(rèn)為一行一行讀取,將輸入文件切分為邏輯上的多個input split,input split是MapReduce對文件進(jìn)行處理和運算的輸入單位,只是一個邏輯概念。

在進(jìn)行Map計算之前,MapReduce會根據(jù)輸入文件計算的切片數(shù)開啟map任務(wù),一個輸入切片對應(yīng)一個maptask,輸入分片存儲的并非數(shù)據(jù)本身,而是一個分片長度和一個記錄數(shù)據(jù)位置的集合,每個input spilt中存儲著該分片的數(shù)據(jù)信息如:文件塊信息、起始位置、數(shù)據(jù)長度、所在節(jié)點列表等,并不是對文件實際分割成多個小文件,輸入切片大小往往與hdfs的block關(guān)系密切,默認(rèn)一個切片對應(yīng)一個block,大小為128M;注意:盡管我們可以使用默認(rèn)塊大小或自定義的方式來定義分片的大小,但一個文件的大小,如果是在切片大小的1.1倍以內(nèi),仍作為一個片存儲,而不會將那多出來的0.1單獨分片。

2、數(shù)據(jù)格式化操作:

TextInputFormat 會創(chuàng)建RecordReader去讀取數(shù)據(jù),通過getCurrentkey,getCurrentvalue,nextkey,value等方法來讀取,讀取結(jié)果會形成key,value形式返回給maptask,key為偏移量,value為每一行的內(nèi)容,此操作的作用為在分片中每讀取一條記錄就調(diào)用一次map方法,反復(fù)這一過程直到將整個分片讀取完畢。

3、map階段操作:

此階段就是程序員通過需求偏寫了map函數(shù),將數(shù)據(jù)格式化的<K,V>鍵值對通過Mapper的map()方法邏輯處理,形成新的<k,v>鍵值對,通過Context.write輸出到OutPutCollector收集器

map端的shuffle(數(shù)據(jù)混洗)過程:溢寫(分區(qū),排序,合并,歸并)

溢寫:

由map處理的結(jié)果并不會直接寫入磁盤,而是會在內(nèi)存中開啟一個環(huán)形內(nèi)存緩沖區(qū),先將map結(jié)果寫入緩沖區(qū),這個緩沖區(qū)默認(rèn)大小為100M,并且在配置文件里為這個緩沖區(qū)設(shè)了一個閥值,默認(rèn)為0.8,同時map還會為輸出操作啟動一個守護(hù)線程,如果緩沖區(qū)內(nèi)存達(dá)到了閥值0.8,這個線程會將內(nèi)容寫入到磁盤上,這個過程叫作spill(溢寫)。

分區(qū)Partition

當(dāng)數(shù)據(jù)寫入內(nèi)存時,決定數(shù)據(jù)由哪個Reduce處理,從而需要分區(qū),默認(rèn)分區(qū)方式采用hash函數(shù)對key進(jìn)行哈布后再用Reduce任務(wù)數(shù)量進(jìn)行取模,表示為hash(key)modR,這樣就可以把map輸出結(jié)果均勻分配給Reduce任務(wù)處理,Partition與Reduce是一一對應(yīng)關(guān)系,類似于一個分片對應(yīng)一個map task,最終形成的形式為(分區(qū)號,key,value)

排序Sort:

在溢出的數(shù)據(jù)寫入磁盤前,會對數(shù)據(jù)按照key進(jìn)行排序,默認(rèn)采用快速排序,第一關(guān)鍵字為分區(qū)號,第二關(guān)鍵字為key。

合并combiner:

程序員可選是否合并,數(shù)據(jù)合并,在Reduce計算前對相同的key數(shù)據(jù)、value值合并,減少輸出量,如(“a”,1)(“a”,1)合并之后(“a”,2)

歸并menge

每塊溢寫會成一個溢寫文件,這些溢寫文件最終需要被歸并為一個大文件,生成key對應(yīng)的value-list,會進(jìn)行歸并排序<"a",1><"a",1>歸并后<"a",<1,1>>。

Reduce 端的shffle

數(shù)據(jù)copy:map端的shffle結(jié)束后,所有map的輸出結(jié)果都會保存在map節(jié)點的本地磁盤上,文件都經(jīng)過分區(qū),不同的分區(qū)會被copy到不同的Recuce任務(wù)并進(jìn)行并行處理,每個Reduce任務(wù)會不斷通過RPC向JobTracker詢問map任務(wù)是否完成,JobTracker檢測到map位務(wù)完成后,就會通過相關(guān)Reduce任務(wù)去aopy拉取數(shù)據(jù),Recluce收到通知就會從Map任務(wù)節(jié)點Copy自己分區(qū)的數(shù)據(jù)此過程一般是Reduce任務(wù)采用寫個線程從不同map節(jié)點拉取

歸并數(shù)據(jù)

Map端接取的數(shù)據(jù)會被存放到 Reduce端的緩存中,如果緩存被占滿,就會溢寫到磁盤上,緩存數(shù)據(jù)來自不同的Map節(jié)點,會存在很多合并的鍵值對,當(dāng)溢寫啟動時,相同的keg會被歸并,最終各個溢寫文件會被歸并為一個大類件歸并時會進(jìn)行排序,磁盤中多個溢寫文許歸并為一個大文許可能需要多次歸并,一次歸并溢寫文件默認(rèn)為10個

4、Reduce階段:

Reduce任務(wù)會執(zhí)行Reduce函數(shù)中定義的各種映射,輸出結(jié)果存在分布式文件系統(tǒng)中。

二、 MapReduce編程模型

2.1 編程模型概述

2.1.1 Map階段2個步驟

  1. 設(shè)置InputFormat類,將數(shù)據(jù)切分為Key-Value(K1和V1)對,輸入到第二步

  1. 自定義Map邏輯,將第一步的結(jié)果轉(zhuǎn)換為另外的Key-Value(K2和V2)對,輸出結(jié)果

2.1.2 Shuffle階段4個步驟

  1. 對輸出的Key-Value進(jìn)行分區(qū)

  1. 對不同分區(qū)的數(shù)據(jù)按照相同的key排序

  1. (可選)對分組過的數(shù)據(jù)初步規(guī)約,降低數(shù)據(jù)的網(wǎng)絡(luò)拷貝

  1. 對數(shù)據(jù)進(jìn)行分組,相同的Key和Value放入一個集合中

2.1.3 Reduce階段2個步驟

  1. 對多個Map任務(wù)的結(jié)果進(jìn)行排序以及合并,編寫Reduce函數(shù)實現(xiàn)自己的邏輯,對輸入的Key-Value進(jìn)行處理,轉(zhuǎn)為新的Key-Value(K3和V3)輸出

  1. 設(shè)置OutputFromat處理并保存Reduce輸出的Key-Value數(shù)據(jù)

2.2 編程模型三部曲

(1)Input:一系列(K1,V1)。

(2)Map和Reduce:

Map:(K1,V1) -> list(K2,V2) (其中K2/V2是中間結(jié)果對)

Reduce:(K2,list(v2)) -> list(K3,V3)

(3)Output:一系列(K3,V3)。

三、詞頻統(tǒng)計WordCount

3.1 添加Maven依賴

# 將maven版本改為1.8
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>

# 添加如下依賴
<dependency>
? ? <groupId>org.apache.hadoop</groupId>
? ? <artifactId>hadoop-common</artifactId>
? ? <version>${hadoop-version}</version>
</dependency>
<dependency>
? ? <groupId>org.apache.hadoop</groupId>
? ? <artifactId>hadoop-hdfs</artifactId>
? ? <version>${hadoop-version}</version>
</dependency>
<dependency>
? ? <groupId>org.apache.hadoop</groupId>
? ? <artifactId>hadoop-mapreduce-client-core</artifactId>
? ? <version>${hadoop-version}</version>
</dependency>
<dependency>
? ? <groupId>org.apache.hadoop</groupId>
? ? <artifactId>hadoop-mapreduce-client-common</artifactId>
? ? <version>${hadoop-version}</version>
</dependency>

3.2 代碼實現(xiàn)

《wordcount.txt》

hello java
hello hadoop
hello java
hello java hadoop
java hadoop
hadoop java

Mapper類

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        IntWritable intWritable = new IntWritable();

        System.out.println("WordCountMap stage Key:"+key+" Value:"+value);
        String[] words = value.toString().split(" "); // "hello world"->[hello,world]
        for (String word : words) {
            text.set(word);
            intWritable.set(1);
            context.write(text,intWritable); // <hello,1> <word,1>
        }
    }
}

Reduce類

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReduce extends Reducer<Text, IntWritable,Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        System.out.println("Reduce stage Key:"+key+" Values"+values.toString());
        int count=0;
        for (IntWritable intWritable : values) {
            count += intWritable.get();
        }

        LongWritable longWritable = new LongWritable(count);
        System.out.println("Key:"+key+" ResultValue:"+longWritable.get());

        context.write(key,longWritable);
    }
}

Driver類

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(WordCountDriver.class);

        // 設(shè)置mapper類
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 設(shè)置Reduce類
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 指定map輸入的文件路徑
        FileInputFormat.
                setInputPaths(job,new Path("D:\\Servers\\hadoopstu\\in\\wordcount.txt"));
        // 指定reduce結(jié)果的輸出路徑
        Path path = new Path("D:\\Servers\\hadoopstu\\out1");

        FileSystem fileSystem = FileSystem.get(path.toUri(), configuration);
        if(fileSystem.exists(path)){
            fileSystem.delete(path,true);
        }

        FileOutputFormat.setOutputPath(job,path);

        job.waitForCompletion(true);

    }
}

3.3 代碼說明

3.3.1 對于map函數(shù)的方法

mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大數(shù)據(jù),Powered by 金山文檔

protected void map(LongWritable key, Text value, Context context)

繼承Mapper類,實現(xiàn)map方法,重寫的map方法中包含三個參數(shù),key、value就是輸入的key、value鍵值對(<K1,V1>),context記錄的是整個上下文,可以通過context將數(shù)據(jù)寫出去。

mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大數(shù)據(jù),Powered by 金山文檔

3.3.2 對于reduce函數(shù)的方法

mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大數(shù)據(jù),Powered by 金山文檔

protected void reduce(Text key, Iterable<IntWritable> values, Context context)

繼承Reduce類,實現(xiàn)reduce方法,reduce函數(shù)輸入的是一個<K,V>形式,但是這里的value是以迭代器的形式Iterable<IntWritable> value。即,reduce的輸入是一個key對應(yīng)一組value。

reduce中context參數(shù)與map中的reduce參數(shù)作用一致。

mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大數(shù)據(jù),Powered by 金山文檔

3.3.3 對于main函數(shù)的調(diào)用

mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大數(shù)據(jù),Powered by 金山文檔
  1. 創(chuàng)建configuration()類,作用是讀取MapReduce系統(tǒng)配置信息

  1. 創(chuàng)建job類

  1. 設(shè)置map函數(shù)、map函數(shù)輸出的key、value類型

  1. 設(shè)置reduce函數(shù)、reduce函數(shù)輸出的key、value類型,即最終存儲再HDFS結(jié)果文件中的key、value類型文章來源地址http://www.zghlxwxcb.cn/news/detail-786888.html

到了這里,關(guān)于Hadoop三大框架之MapReduce工作流程的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 大數(shù)據(jù)框架之Hadoop:MapReduce(三)MapReduce框架原理——MapTask工作機(jī)制

    大數(shù)據(jù)框架之Hadoop:MapReduce(三)MapReduce框架原理——MapTask工作機(jī)制

    MapTask工作機(jī)制如下圖所示。 (1)Read階段:MapTask通過用戶編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。 (2)Map階段:該節(jié)點主要是將解析出的key/value交給用戶編寫map()函數(shù)處理,并產(chǎn)生一系列新的key/value。 (3)Collect收集階段:在用戶編寫map()函數(shù)中,當(dāng)數(shù)據(jù)處

    2023年04月08日
    瀏覽(19)
  • Hadoop三大“金剛”完美剖析 ─────── HDFS、MapReduce、YARN

    Hadoop三大“金剛”完美剖析 ─────── HDFS、MapReduce、YARN

    因為HDFS是分布式儲存文件的模式,所以在儲存文件的數(shù)據(jù)時,會將文件切分為大小一致的數(shù)據(jù)塊, 如果出現(xiàn)文件大小不是128M的倍數(shù)時,那么最后一個文件會與之前切分文件大小不一致。 被切分成的數(shù)據(jù)塊就是Block塊,NameNode將Block塊進(jìn)行分布式儲存到DataNode中。? ? (Block塊

    2024年04月10日
    瀏覽(17)
  • Hadoop MapReduce 是如何工作的?

    作者:禪與計算機(jī)程序設(shè)計藝術(shù) Hadoop MapReduce(以下簡稱MR)是一個分布式計算框架,基于Google開發(fā),用于并行處理海量數(shù)據(jù)集。其提供簡單、高效的數(shù)據(jù)處理能力,并可運行于多種平臺上,廣泛應(yīng)用于數(shù)據(jù)分析領(lǐng)域。因此,掌握MR的原理及其工作方式對于利用它進(jìn)行海量數(shù)據(jù)

    2024年02月10日
    瀏覽(20)
  • Hadoop-MapReduce-跟著日志理解整體流程

    Hadoop-MapReduce-跟著日志理解整體流程

    vi?input_01.txt vi?input_02.txt vi?input_03.txt 文本內(nèi)容如下: -----------------input_01.txt---------------- java ? ?scala ? python c++ ? ? java ? ?js go ? ? ?go ? ? ?vba c ? ? ? c ? ? ? c++ java ? ?scala ? python php ? ? css ? ? html js ? ? ?java ? ?java ? ?scala ? vba c# ? ? ?.net R ? ? ? R ? ? ? R ? ?

    2024年01月25日
    瀏覽(13)
  • Hadoop的第二個核心組件:MapReduce框架第四節(jié)

    Hadoop的第二個核心組件:MapReduce框架第四節(jié)

    MapReduce可以對海量數(shù)據(jù)進(jìn)行計算,但是有些情況下,計算的結(jié)果可能來自于多個文件,每個文件的數(shù)據(jù)格式是不一致,但是多個文件存在某種關(guān)聯(lián)關(guān)系,類似于MySQL中外鍵關(guān)系,如果想計算這樣的結(jié)果,MR程序也是支持的。這種計算我們稱之為join計算。 MR的join根據(jù)join數(shù)據(jù)的位

    2024年02月09日
    瀏覽(24)
  • Hadoop的第二個核心組件:MapReduce框架第三節(jié)

    Hadoop的第二個核心組件:MapReduce框架第三節(jié)

    InputFormat階段 :兩個作用 負(fù)責(zé)對輸入的數(shù)據(jù)進(jìn)行切片,切片的數(shù)據(jù)和Mapper階段的MapTask的數(shù)量是相對應(yīng)的。 負(fù)責(zé)MapTask讀取切片數(shù)據(jù)時,如何將切片的數(shù)據(jù)轉(zhuǎn)換成為Key-value類型的數(shù)據(jù),包括key-value的數(shù)據(jù)類型的定義。 Mapper階段 作用處理每一個切片數(shù)據(jù)的計算邏輯。 map方法的執(zhí)

    2024年02月09日
    瀏覽(24)
  • Hadoop的第二個核心組件:MapReduce框架第二節(jié)

    Hadoop的第二個核心組件:MapReduce框架第二節(jié)

    1、客戶端在執(zhí)行MR程序時,客戶端先根據(jù)設(shè)置的InputFormat實現(xiàn)類去對輸入的數(shù)據(jù)文件進(jìn)行切片(getSplits),如果沒有設(shè)置InputFormat實現(xiàn)類,MR程序會使用默認(rèn)的實現(xiàn)類(TextInputFormat–FileInputFormat的子類)進(jìn)行切片規(guī)劃,生成一個切片規(guī)劃文件 2、客戶端的切片規(guī)劃文件生成以后

    2024年02月09日
    瀏覽(29)
  • Hadoop的第二個核心組件:MapReduce框架第一節(jié)

    Hadoop的第二個核心組件:MapReduce框架第一節(jié)

    Hadoop解決了大數(shù)據(jù)面臨的兩個核心問題:海量數(shù)據(jù)的存儲問題、海量數(shù)據(jù)的計算問題 其中MapReduce就是專門設(shè)計用來解決海量數(shù)據(jù)計算問題的,同時MapReduce和HDFS不一樣的地方在于,雖然兩者均為分布式組件,但是HDFS是一個完善的軟件,我們只需要使用即可,不需要去進(jìn)行任何

    2024年02月09日
    瀏覽(22)
  • Hadoop入門學(xué)習(xí)筆記——四、MapReduce的框架配置和YARN的部署

    Hadoop入門學(xué)習(xí)筆記——四、MapReduce的框架配置和YARN的部署

    視頻課程地址:https://www.bilibili.com/video/BV1WY4y197g7 課程資料鏈接:https://pan.baidu.com/s/15KpnWeKpvExpKmOC8xjmtQ?pwd=5ay8 Hadoop入門學(xué)習(xí)筆記(匯總) 本次YARN的部署結(jié)構(gòu)如下圖所示: 當(dāng)前,共有三臺服務(wù)器(虛擬機(jī))構(gòu)成集群,集群規(guī)劃如下所示: 主機(jī) 部署的服務(wù) node1 ResourceManager、N

    2024年02月04日
    瀏覽(17)
  • MapReduce是Hadoop的一個核心組件,它是一個編程模型和計算框架

    MapReduce是Hadoop的一個核心組件,它是一個編程模型和計算框架

    MapReduce是Hadoop的一個核心組件,它是一個編程模型和計算框架,用于處理和生成大數(shù)據(jù)集。MapReduce模型將大數(shù)據(jù)處理任務(wù)分解為兩個階段:Map階段和Reduce階段。在Map階段,輸入的數(shù)據(jù)被分割成一系列的鍵值對,然后通過用戶定義的函數(shù)進(jìn)行處理,生成中間的鍵值對。在Reduce階

    2024年02月03日
    瀏覽(20)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包