国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Hadoop3教程(十七):MapReduce之ReduceJoin案例分析

這篇具有很好參考價(jià)值的文章主要介紹了Hadoop3教程(十七):MapReduce之ReduceJoin案例分析。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

(113)ReduceJoin案例需求分析

現(xiàn)在有兩個(gè)文件:

  • orders.txt,存放的是訂單ID、產(chǎn)品ID、產(chǎn)品數(shù)量
  • pd.txt,這是一個(gè)產(chǎn)品碼表,存放的是產(chǎn)品ID、產(chǎn)品中文名;

現(xiàn)在是想通過join,來實(shí)現(xiàn)這么一個(gè)預(yù)期輸出,即訂單ID、產(chǎn)品中文名、產(chǎn)品數(shù)量。

以上是本次案例需求。

簡單思考一下思路。我們需要將關(guān)聯(lián)條件作為Map輸出的key,將兩表滿足Join條件的數(shù)據(jù)以及數(shù)據(jù)所來源的文件信息,發(fā)往同一個(gè)ReduceTask,在Reduce中進(jìn)行數(shù)據(jù)的串聯(lián)。

具體該怎么做呢?

Map中在處理的時(shí)候,需要獲取輸入的文件內(nèi)容和文件名(這個(gè)是可以在切片的時(shí)候獲取的),然后不同文件分別做不同處理,處理完成后封裝bean對象輸出。

注意,Map在輸出的時(shí)候,需要以產(chǎn)品ID作為key,只有這樣做,才能將相同產(chǎn)品ID的orders.txt記錄和pd.txt記錄,放在同一個(gè)reduceTask里,進(jìn)而實(shí)現(xiàn)最終的替換。value的話,選擇訂單ID、訂單數(shù)量、文件名。這里傳入文件名的原因是Reduce階段需要根據(jù)不同文件名實(shí)現(xiàn)不同處理,所以一定得需要傳一個(gè)文件名進(jìn)來。

另外提一句,封裝bean對象的時(shí)候,需要把兩個(gè)文件里的所有字段合起來作為一個(gè)bean對象,這樣子,orders文件的數(shù)據(jù)可以用這個(gè)bean對象,pd.txt里的數(shù)據(jù)也可以用這個(gè)bean對象。相當(dāng)于做一個(gè)大寬表。

reduce階段就很簡單了,相同產(chǎn)品ID的orders.txt記錄和pd.txt記錄,被放在同一個(gè)reduceTask里,可以把來自orders的bean放在一個(gè)集合里,來自pd的bean放在一個(gè)集合里,然后遍歷set覆蓋就可以。

(114)ReduceJoin案例代碼實(shí)操 - TableBean

首先需要定義一個(gè)Bean對象,用來序列化兩個(gè)輸入文件的數(shù)據(jù),我們命名為TableBean。

package com.atguigu.mapreduce.reducejoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TableBean implements Writable {

    private String id; //訂單id
    private String pid; //產(chǎn)品id
    private int amount; //產(chǎn)品數(shù)量
    private String pname; //產(chǎn)品名稱
    private String flag; //判斷是order表還是pd表的標(biāo)志字段

    public TableBean() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }

    // 序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    // 反序列化方法
    // 注意,序列化的順序必須要跟反序列化的順序一致
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();
    }
}

注意,序列化的順序必須要跟反序列化的順序一致。

(115)ReduceJoin案例代碼實(shí)操 - TableMapper

TableMapper的主要作用,就是將輸入的數(shù)據(jù),劃分成指定的KV對,以供Reduce階段使用。

命名為TableMapper,獲取文件名稱的代碼也包含在這里。

package com.atguigu.mapreduce.reducejoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class TableMapper extends Mapper<LongWritable,Text,Text,TableBean> {

    private String filename;
    private Text outK = new Text();
    private TableBean outV = new TableBean();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //獲取對應(yīng)文件名稱
        InputSplit split = context.getInputSplit();
        FileSplit fileSplit = (FileSplit) split;
        filename = fileSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //獲取一行
        String line = value.toString();

        //判斷是哪個(gè)文件,然后針對文件進(jìn)行不同的操作
        if(filename.contains("order")){  //訂單表的處理
            String[] split = line.split("\t");
            //封裝outK
            outK.set(split[1]);
            //封裝outV
            outV.setId(split[0]);
            outV.setPid(split[1]);
            outV.setAmount(Integer.parseInt(split[2]));
            outV.setPname("");
            outV.setFlag("order");
        }else {                             //商品表的處理
            String[] split = line.split("\t");
            //封裝outK
            outK.set(split[0]);
            //封裝outV
            outV.setId("");
            outV.setPid(split[0]);
            outV.setAmount(0);
            outV.setPname(split[1]);
            outV.setFlag("pd");
        }

        //寫出KV
        context.write(outK,outV);
    }
}

(116)ReduceJoin案例代碼實(shí)操 - Reducer及Driver

主要是編寫Reduce部分。Mapper之后,一組相同的key的數(shù)據(jù)會進(jìn)入一個(gè)ReduceTask,接下來需要編寫自定義邏輯,讓Reduce可以實(shí)現(xiàn)關(guān)聯(lián)后輸出。

需要?jiǎng)?chuàng)建兩個(gè)集合,每個(gè)集合接收不同文件,一個(gè)接收order文件數(shù)據(jù),另一個(gè)接收碼表數(shù)據(jù)。然后循環(huán)遍歷order集合,把碼表集合里的值set進(jìn)去。

新建TableReducer:

package com.atguigu.mapreduce.reducejoin;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {

        ArrayList<TableBean> orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();

        for (TableBean value : values) {

            //判斷數(shù)據(jù)來自哪個(gè)表
            if("order".equals(value.getFlag())){   //訂單表

			  //創(chuàng)建一個(gè)臨時(shí)TableBean對象接收value
                TableBean tmpOrderBean = new TableBean();

                try {
                    BeanUtils.copyProperties(tmpOrderBean,value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }

			  //將臨時(shí)TableBean對象添加到集合orderBeans
                orderBeans.add(tmpOrderBean);
            }else {                                    //商品表
                try {
                    BeanUtils.copyProperties(pdBean,value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        //遍歷集合orderBeans,替換掉每個(gè)orderBean的pid為pname,然后寫出
        for (TableBean orderBean : orderBeans) {

            orderBean.setPname(pdBean.getPname());

		   //寫出修改后的orderBean對象
            context.write(orderBean,NullWritable.get());
        }
    }
}

根據(jù)教程上說的,這里只有一個(gè)地方需要注意,但是用處也不是很大,就是 集合在add value的時(shí)候,不能直接add 傳進(jìn)來的value,而是需要重新new一個(gè)TableBean,將value值賦值給這個(gè)新的TableBean,最后add這個(gè)新的TableBean。

這么做的原因是,傳進(jìn)來的values,其實(shí)是一個(gè)Iterable<TableBean> ,不是傳統(tǒng)意義上的迭代器,可以簡單理解成,Iterable<TableBean> 里的每個(gè)value用的是同一個(gè)內(nèi)存地址,每次讀取出value就總是賦給那個(gè)內(nèi)存地址,所以不能直接add value,否則add 一百次,也只會記住最后一次add的那個(gè)value。

這似乎是Hadoop為了避免因創(chuàng)建過多實(shí)例引起資源浪費(fèi),而做的優(yōu)化。

沒有測過,做簡單了解吧。

最后在驅(qū)動(dòng)類里注冊:

package com.atguigu.mapreduce.reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

大功告成文章來源地址http://www.zghlxwxcb.cn/news/detail-723261.html

參考文獻(xiàn)

  1. 【尚硅谷大數(shù)據(jù)Hadoop教程,hadoop3.x搭建到集群調(diào)優(yōu),百萬播放】

到了這里,關(guān)于Hadoop3教程(十七):MapReduce之ReduceJoin案例分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Hadoop3 - MapReduce COVID-19 案例實(shí)踐

    Hadoop3 - MapReduce COVID-19 案例實(shí)踐

    上篇文章對 MapReduce 進(jìn)行了介紹,并編寫了 WordCount 經(jīng)典案例的實(shí)現(xiàn),本篇為繼續(xù)加深 MapReduce 的用法,實(shí)踐 COVID-19 新冠肺炎案例,下面是上篇文章的地址: https://blog.csdn.net/qq_43692950/article/details/127195121 COVID-19,簡稱“新冠肺炎”,世界衛(wèi)生組織命名為“2019冠狀病毒病” [1-

    2024年02月08日
    瀏覽(18)
  • Hadoop3.0大數(shù)據(jù)處理學(xué)習(xí)3(MapReduce原理分析、日志歸集、序列化機(jī)制、Yarn資源調(diào)度器)

    Hadoop3.0大數(shù)據(jù)處理學(xué)習(xí)3(MapReduce原理分析、日志歸集、序列化機(jī)制、Yarn資源調(diào)度器)

    前言:如果想知道一堆牌中有多少張紅桃,直接的方式是一張張的檢查,并數(shù)出有多少張紅桃。 而MapReduce的方法是,給所有的節(jié)點(diǎn)分配這堆牌,讓每個(gè)節(jié)點(diǎn)計(jì)算自己手中有幾張是紅桃,然后將這個(gè)數(shù)匯總,得到結(jié)果。 官方介紹:MapReduce是一種分布式計(jì)算模型,由Google提出,

    2024年02月08日
    瀏覽(44)
  • 【大數(shù)據(jù)基礎(chǔ)】Hadoop3.1.3安裝教程

    【大數(shù)據(jù)基礎(chǔ)】Hadoop3.1.3安裝教程

    來源: https://dblab.xmu.edu.cn/blog/2441/ 前言:重裝解決一切bug!事實(shí)上,問題中的絕大部分衍生問題都可以通過重裝解決。 創(chuàng)建Hadoop用戶 首先按 ctrl+alt+t 打開終端窗口,輸入如下命令創(chuàng)建新用戶 : 接著使用如下命令設(shè)置密碼,可簡單設(shè)置為 hadoop,按提示輸入兩次密碼: 可為

    2024年02月09日
    瀏覽(56)
  • 大數(shù)據(jù)Hadoop教程-學(xué)習(xí)筆記06【Hadoop生態(tài)綜合案例:陌陌聊天數(shù)據(jù)分析】

    大數(shù)據(jù)Hadoop教程-學(xué)習(xí)筆記06【Hadoop生態(tài)綜合案例:陌陌聊天數(shù)據(jù)分析】

    視頻教程:嗶哩嗶哩網(wǎng)站:黑馬大數(shù)據(jù)Hadoop入門視頻教程,總時(shí)長:14:22:04 教程資源:https://pan.baidu.com/s/1WYgyI3KgbzKzFD639lA-_g,提取碼:6666 【P001-P017】大數(shù)據(jù)Hadoop教程-學(xué)習(xí)筆記01【大數(shù)據(jù)導(dǎo)論與Linux基礎(chǔ)】【17p】 【P018-P037】大數(shù)據(jù)Hadoop教程-學(xué)習(xí)筆記02【Apache Hadoop、HDFS】【20p】

    2024年02月02日
    瀏覽(26)
  • Hadoop(01) Hadoop3.3.6安裝教程,單機(jī)/偽分布式配置

    Hadoop(01) Hadoop3.3.6安裝教程,單機(jī)/偽分布式配置

    在安裝 Hadoop 3.3.6 前,需要滿足以下前置條件: Java Development Kit (JDK):Hadoop 是用 Java 編寫的,因此需要安裝并配置適當(dāng)版本的 JDK。Hadoop 3.3.6 建議使用 JDK 8 或更高版本。確保正確安裝 JDK,并設(shè)置 JAVA_HOME 環(huán)境變量。 SSH:Hadoop 集群中的節(jié)點(diǎn)需要通過 SSH 進(jìn)行通信和管理。確保在

    2024年02月06日
    瀏覽(21)
  • Hadoop3教程(二):HDFS的定義及概述

    隨著實(shí)際生產(chǎn)環(huán)境中的數(shù)據(jù)越來越大,在一臺服務(wù)器上無法存儲下所有的數(shù)據(jù),那么就要把數(shù)據(jù)分散到多臺服務(wù)器的磁盤里存放。但是像這樣做跨服務(wù)器的數(shù)據(jù)管理和維護(hù)是很難的,所以就迫切需要一種方式,來協(xié)調(diào)管理多臺機(jī)器上的文件,這就是分布式文件管理系統(tǒng)。 HD

    2024年02月07日
    瀏覽(20)
  • Hadoop3.1.3安裝教程_單機(jī)/偽分布式配置_Hadoop3.1.3/Ubuntu18.04(16.04)

    Hadoop3.1.3安裝教程_單機(jī)/偽分布式配置_Hadoop3.1.3/Ubuntu18.04(16.04)

    目錄 前言: 一、VMware Workstation 二、Ubuntu系統(tǒng)安裝 新建虛擬機(jī) 三、Ubuntu系統(tǒng)的配置 四、更新apt 五、安裝SSH、配置SSH無密碼登陸? 六、安裝Java環(huán)境 七、安裝 Hadoop3.1.3 八、Hadoop單機(jī)配置(非分布式) 九、Hadoop偽分布式配置 前言: ????????本篇教程由作者本人進(jìn)行修改,原教

    2024年02月03日
    瀏覽(56)
  • Hadoop3教程(三):HDFS文件系統(tǒng)常用命令一覽

    hdfs命令的完整形式: 其中subcommand有三種形式: admin commands client commands:如本節(jié)重點(diǎn)要講的dfs daemon commands dfs(文件系統(tǒng)命令),這個(gè)是HDFS里,日常使用 最為頻繁的一種命令 ,用來在HDFS的文件系統(tǒng)上運(yùn)行各種文件系統(tǒng)命令,如查看文件、刪除文件等。 基本語法: 這倆基本

    2024年02月06日
    瀏覽(111)
  • Hadoop3教程(三十一):(生產(chǎn)調(diào)優(yōu)篇)異構(gòu)存儲

    Hadoop3教程(三十一):(生產(chǎn)調(diào)優(yōu)篇)異構(gòu)存儲

    異構(gòu)存儲,也叫做冷熱數(shù)據(jù)分離 。其中,經(jīng)常使用的數(shù)據(jù)被叫做是熱數(shù)據(jù),不經(jīng)常使用的數(shù)據(jù)被叫做冷數(shù)據(jù)。 把冷熱數(shù)據(jù),分別存儲在不同的存儲介質(zhì)里,從而達(dá)到對每個(gè)介質(zhì)的利用率最高,從而實(shí)現(xiàn)整體最佳性能,或者說性價(jià)比更高(比如說高性能硬盤放經(jīng)常使用的數(shù)據(jù)

    2024年02月08日
    瀏覽(22)
  • Hadoop3.1.3安裝教程單機(jī)偽分布式配置

    本教程使用 Ubuntu 18.04 64位 作為系統(tǒng)環(huán)境(或者Ubuntu 14.04,Ubuntu16.04 也行,32位、64位均可),請自行安裝系統(tǒng)。裝好了 Ubuntu 系統(tǒng)之后,在安裝 Hadoop 前還需要做一些必備工作。 創(chuàng)建hadoop用戶 如果你安裝 Ubuntu 的時(shí)候不是用的 “hadoop” 用戶,那么需要增加一個(gè)名為 hadoop 的用

    2024年02月04日
    瀏覽(27)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包