国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Hadoop3教程(十四):MapReduce中的排序

這篇具有很好參考價值的文章主要介紹了Hadoop3教程(十四):MapReduce中的排序。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

(99)WritableComparable排序

什么是排序

排序是MR中最重要的操作之一,也是面試中可能被問到的重點。

MapTask和ReduceTask中都會對數(shù)據(jù)按照KEY來排序,主要是為了效率,排完序之后,相同key值的數(shù)據(jù)會被放在一起,更方便下一步(如Reducer())的匯總處理。

默認排序是按照字典順序(字母由小到大,或者是數(shù)字由小到大)排序,且實現(xiàn)該排序的方法是快速排序。

什么時候需要排序

MR的過程中,什么時候用到了排序呢?

Map階段:

  • 環(huán)形緩沖區(qū)溢寫到磁盤之前,會將每個分區(qū)內(nèi)數(shù)據(jù)分別進行一個快排,這個排序是在內(nèi)存中完成的;(對key的索引,按照字典順序排列)
  • 環(huán)形緩沖區(qū)多輪溢寫完畢后,會形成一堆文件,這時候會對這些文件做merge歸并排序,我理解是單個MapTask最終會匯總形成一個文件;

Reduce階段:

  • ReduceTask會主動拉取MapTask們的輸出文件,理論上是會優(yōu)先保存到內(nèi)存里,但是往往內(nèi)存里放不下,所以多數(shù)情況下會直接溢寫到磁盤,于是我們會得到多個文件。當文件數(shù)量超過閾值,之后需要做歸并排序,合并成一個大文件。如果是內(nèi)存中的數(shù)據(jù)超過閾值,則會進行一次合并后將數(shù)據(jù)溢寫到磁盤。當所有數(shù)據(jù)拷貝完后,ReduceTask會統(tǒng)一對內(nèi)存和磁盤上的所有數(shù)據(jù)進行一次歸并排序。
  • 文件合并后其實還可以進行一個分組排序,過于復雜,這里就不介紹了。

排序有哪些分類

MR里的排序還有部分排序、全排序輔助排序、二次排序的不同說法,注意,它們之間不是像那種傳統(tǒng)的排序算法之間的區(qū)別,只是當排序在不同場景的時候,分別起了個名字。

MapReduce根據(jù)輸入記錄的鍵對數(shù)據(jù)集排序,保證輸出的每個文件內(nèi)部是有序的,這就是部分排序

最終輸出結果只有一個文件,且文件內(nèi)部有序。這就是全排序

全排序的實現(xiàn)方式是只設置一個ReduceTask。但是這種方式在處理大型文件時效率很低很低,因為一臺機器處理全部數(shù)據(jù),完全沒有利用MR所提供的并行架構的優(yōu)勢,生產(chǎn)環(huán)境上完全不適用。

所以生產(chǎn)環(huán)境里,常用的還是部分排序。

輔助排序,就是GroupingComparator分組。

這個似乎是可選的,是在Reduce階段,Reducer在從Map階段主動拉取完數(shù)據(jù)后,會對所有文件做一次歸并排序。做完歸并排序之后,理論上就可以進行輔助排序。

輔助排序有啥用呢,就是當接收到的Key是個bean對象時,輔助排序可以讓一個或者幾個字段相同的key(全部字段不相同)進入同一個Reduce(),所以也起名叫做分組排序。

二次排序比較簡單,在自定義排序過程中,如果compareTo中的判斷條件為兩個,那它就是二次排序。

如何實現(xiàn)自定義排序

說到這里,那 如何實現(xiàn)自定義排序 呢?

如果是bean對象作為key傳輸,那需要實現(xiàn)WritableComparable接口,重寫compareTo方法,就可以實現(xiàn)自定義排序。

@Override
public int compareTo(FlowBean bean) {

	int result;
		
	// 按照總流量大小,倒序排列
	if (this.sumFlow > bean.getSumFlow()) {
		result = -1;
	}else if (this.sumFlow < bean.getSumFlow()) {
		result = 1;
	}else {
		result = 0;
	}

	return result;
}

(100)全排序案例

案例需求

之前我們做過一個案例,輸入文件有一個,里面放的是每個手機號的上行流量和下行流量,輸出同樣是一個文件,里面放的除了手機號的上行流量和下行流量之外,還多了一行總流量。

這時候我們提一個新需求,就是我不止要這個輸出文件,我還要這個文件里的內(nèi)容,按照總流量降序排列。

思路分析

MapReduce里,只能對Key進行排序。在先前的需求里,我們是用手機號作為key,上行流量、下行流量和總流量組成一個bean,作為value,這樣的安排顯然不適合新需求。

因此我們需要改變一下,將上行流量、下行流量和總流量組成的bean作為key,而將手機號作為value,如此來排序。

所以第一步,我們需要對我們自定義的FlowBean對象聲明WritableComparable接口,并重寫CompareTo方法,這一步的目的是使得FlowBean可進行算數(shù)比較,從而允許排序:

@Override
public int CompareTo(FlowBean o){
    // 按照總流量,降序排列
    return this.sumFlow > o.getSumFlow()?-1:1;
}

注意這里,因為Hadoop里默認的字典排序是從小到大排序,如果想實現(xiàn)案例里由大到小的排序,那么當大于的時候,就要返回-1,從而將大的值排在前面。

其次,Mapper類里:

context.write(bean, 手機號)

bean成了key,手機號成了value。

最后,Reduce類里,需要循環(huán)輸出,避免出現(xiàn)總流量相同的情況。

for (Text text: values){
    context.write(text, key);	// 注意順序,原先的key放在value位置
}

2023-7-19 11:16:04 這里沒懂。。。

哦哦明白了,什么樣的數(shù)據(jù)會進一個Reducer呢,當然是key 值相同的會進同一個,又因為我們之前compareTo的時候用的是總流量,所以最后是總流量相同的記錄會送進同一個Reducer,然后匯總成一條記錄做輸出,畢竟reducer就是用來做匯總的。

但"匯總成一條記錄"這并不是我們想要的,我們需要的是把這些數(shù)據(jù)原模原樣輸出來。這就是為什么我們在Reducer的reduce()里面,要加上循環(huán)輸出的原因。

實際代碼

貼一下教程里的代碼實現(xiàn):

首先是FlowBean對象,需要聲明WritableComparable接口,并重寫CompareTo()

package com.atguigu.mapreduce.writablecompable;

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow; //上行流量
    private long downFlow; //下行流量
    private long sumFlow; //總流量

    //提供無參構造
    public FlowBean() {
    }

    //生成三個屬性的getter和setter方法
    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    //實現(xiàn)序列化和反序列化方法,注意順序一定要一致
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.upFlow);
        out.writeLong(this.downFlow);
        out.writeLong(this.sumFlow);

    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    //重寫ToString,最后要輸出FlowBean
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    @Override
    public int compareTo(FlowBean o) {

        //按照總流量比較,倒序排列
        if(this.sumFlow > o.sumFlow){
            return -1;
        }else if(this.sumFlow < o.sumFlow){
            return 1;
        }else {
            return 0;
        }
    }
}

然后編寫Mapper類:

package com.atguigu.mapreduce.writablecompable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    private FlowBean outK = new FlowBean();
    private Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //1 獲取一行數(shù)據(jù)
        String line = value.toString();

        //2 按照"\t",切割數(shù)據(jù)
        String[] split = line.split("\t");

        //3 封裝outK outV
        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        outK.setSumFlow();
        outV.set(split[0]);

        //4 寫出outK outV
        context.write(outK,outV);
    }
}

然后編寫Reducer類:

package com.atguigu.mapreduce.writablecompable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        //遍歷values集合,循環(huán)寫出,避免總流量相同的情況
        for (Text value : values) {
            //調(diào)換KV位置,反向寫出
            context.write(value,key);
        }
    }
}

最后編寫驅動類:

package com.atguigu.mapreduce.writablecompable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class FlowDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1 獲取job對象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //2 關聯(lián)本Driver類
        job.setJarByClass(FlowDriver.class);

        //3 關聯(lián)Mapper和Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        //4 設置Map端輸出數(shù)據(jù)的KV類型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        //5 設置程序最終輸出的KV類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //6 設置輸入輸出路徑
        FileInputFormat.setInputPaths(job, new Path("D:\\inputflow2"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\comparout"));

        //7 提交Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

完成,僅做了解即可。

(101)二次排序案例

二次排序的概念很簡單,其實之前提過了,就是在自定義排序的時候,判斷條件有兩個。

比如說,原先我對一堆人排序,是按照身高從高到低排,但是身高一樣的就沒法排序了,這時候我可以再加入一個判斷條件,比如說如果身高一樣的話,就按體重排序。

具體就是修改FlowBean的CompareTo方法,在第一條件相等的時候,添加第二判定條件。

public int compareTo(FlowBean o) {

    //按照總流量比較,倒序排列
    if(this.sumFlow > o.sumFlow){
        return -1;
    }else if(this.sumFlow < o.sumFlow){
        return 1;
    }else {
        if (this.upFlow > o.upFlow){
            return 1;
        } else if (this.upFlow < o.upFlow){
            return -1;
        }
        else {
            return 0;
        }
        
    }
}

如果有需要的話,還可以繼續(xù)加第三判定條件。

(102) 區(qū)內(nèi)排序案例

還是之前的手機號案例,之前我們想要的是,只有一個文件,然后文件內(nèi)所有數(shù)據(jù)按照總流量降序排列。

現(xiàn)在我們提出一個新要求,按照前3位來分區(qū)輸出,比如說136的在一個文件里,137的在一個文件里,以此類推。而且每個文件內(nèi)部,還需要按照總流量降序排列。

本質上就是之前說的分區(qū) + 排序,這兩部分的結合。需要額外定義好Partitioner類。

貼一下教程里的代碼示例,其實只需要在上一小節(jié)的基礎上補充自定義分區(qū)類即可:

首先自定義好分區(qū)類:

package com.atguigu.mapreduce.partitionercompable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner2 extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        //獲取手機號前三位
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);

        //定義一個分區(qū)號變量partition,根據(jù)prePhone設置分區(qū)號
        int partition;
        if("136".equals(prePhone)){
            partition = 0;
        }else if("137".equals(prePhone)){
            partition = 1;
        }else if("138".equals(prePhone)){
            partition = 2;
        }else if("139".equals(prePhone)){
            partition = 3;
        }else {
            partition = 4;
        }

        //最后返回分區(qū)號partition
        return partition;
    }
}

然后在驅動類里注冊好分區(qū)器:

// 設置自定義分區(qū)器
job.setPartitionerClass(ProvincePartitioner2.class);

// 設置對應的ReduceTask的個數(shù)
job.setNumReduceTasks(5);

其他跟上一小節(jié)保持一致即可。文章來源地址http://www.zghlxwxcb.cn/news/detail-724441.html

參考文獻

  1. 【尚硅谷大數(shù)據(jù)Hadoop教程,hadoop3.x搭建到集群調(diào)優(yōu),百萬播放】

到了這里,關于Hadoop3教程(十四):MapReduce中的排序的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Hadoop3教程(二十四):Yarn的常用命令與參數(shù)配置實例

    Hadoop3教程(二十四):Yarn的常用命令與參數(shù)配置實例

    本章我是僅做了解,所以很多地方并沒有深入去探究,用處估計不大,可酌情參考。 列出所有Application : yarn application -list 根據(jù)Application狀態(tài)過濾出指定Application ,如過濾出已完成的Application: yarn application -list -appStates FINISHED Application的狀態(tài)有:ALL、NEW、NEW_SAVING、SUBMITTED、

    2024年02月08日
    瀏覽(76)
  • Hadoop3.0大數(shù)據(jù)處理學習3(MapReduce原理分析、日志歸集、序列化機制、Yarn資源調(diào)度器)

    Hadoop3.0大數(shù)據(jù)處理學習3(MapReduce原理分析、日志歸集、序列化機制、Yarn資源調(diào)度器)

    前言:如果想知道一堆牌中有多少張紅桃,直接的方式是一張張的檢查,并數(shù)出有多少張紅桃。 而MapReduce的方法是,給所有的節(jié)點分配這堆牌,讓每個節(jié)點計算自己手中有幾張是紅桃,然后將這個數(shù)匯總,得到結果。 官方介紹:MapReduce是一種分布式計算模型,由Google提出,

    2024年02月08日
    瀏覽(45)
  • Hadoop3 - MapReduce COVID-19 案例實踐

    Hadoop3 - MapReduce COVID-19 案例實踐

    上篇文章對 MapReduce 進行了介紹,并編寫了 WordCount 經(jīng)典案例的實現(xiàn),本篇為繼續(xù)加深 MapReduce 的用法,實踐 COVID-19 新冠肺炎案例,下面是上篇文章的地址: https://blog.csdn.net/qq_43692950/article/details/127195121 COVID-19,簡稱“新冠肺炎”,世界衛(wèi)生組織命名為“2019冠狀病毒病” [1-

    2024年02月08日
    瀏覽(18)
  • 【大數(shù)據(jù)基礎】Hadoop3.1.3安裝教程

    【大數(shù)據(jù)基礎】Hadoop3.1.3安裝教程

    來源: https://dblab.xmu.edu.cn/blog/2441/ 前言:重裝解決一切bug!事實上,問題中的絕大部分衍生問題都可以通過重裝解決。 創(chuàng)建Hadoop用戶 首先按 ctrl+alt+t 打開終端窗口,輸入如下命令創(chuàng)建新用戶 : 接著使用如下命令設置密碼,可簡單設置為 hadoop,按提示輸入兩次密碼: 可為

    2024年02月09日
    瀏覽(56)
  • Hadoop3教程(三十六):(生產(chǎn)調(diào)優(yōu)篇)企業(yè)開發(fā)場景中的參數(shù)調(diào)優(yōu)案例概述

    這章僅做興趣了解即可。 需求:從1G數(shù)據(jù)中,統(tǒng)計每個單詞出現(xiàn)次數(shù)。服務器3臺,每臺配置4G內(nèi)存,4核CPU,4線程。 需求分析: 1G / 128m = 8個MapTask;1個ReduceTask;1個mrAppMaster 平均每個節(jié)點運行10個 / 3臺 ≈ 3個任務(4 3 3) 當然,這只是個案例演示,生產(chǎn)環(huán)境中一般是結合機器

    2024年02月08日
    瀏覽(25)
  • 大數(shù)據(jù)-安裝 Hadoop3.1.3 詳細教程-偽分布式配置(Centos7)

    大數(shù)據(jù)-安裝 Hadoop3.1.3 詳細教程-偽分布式配置(Centos7)

    **相關資源:**https://musetransfer.com/s/q43oznf6f(有效期至2023年3月16日)|【Muse】你有一份文件待查收,請點擊鏈接獲取文件 1.檢查是否安裝ssh (CentOS 7 即使是最小化安裝也已附帶openssh 可跳過本步驟) 若已安裝進行下一步驟 若未安裝 請自行百度 本教程不做過多講解 2.配置ss

    2023年04月08日
    瀏覽(24)
  • MapReduce排序機制(Hadoop)

    MapReduce排序機制(Hadoop)

    在MapReduce中, 排序的目的是為了方便Reduce階段的處理,通常是為了將相同鍵的鍵值對聚合在一起,以便進行聚合操作或其他處理。 對于MapTask,它會將處理的結果暫時放到環(huán)形緩沖區(qū)中,當環(huán)形緩沖區(qū)使 用率達到一定 閾值 后,再對緩沖區(qū)中的數(shù)據(jù)進行一次快速排序,并將這

    2024年04月24日
    瀏覽(20)
  • Hadoop3教程(二十八):(生產(chǎn)調(diào)優(yōu)篇)NN、DN的多目錄配置及磁盤間數(shù)據(jù)均衡

    Hadoop3教程(二十八):(生產(chǎn)調(diào)優(yōu)篇)NN、DN的多目錄配置及磁盤間數(shù)據(jù)均衡

    NN多目錄的意思是,本地目錄可以配置成多個,且每個目錄存放內(nèi)容相同,這樣的目的是增加可靠性。比如說下圖這樣: 但其實生產(chǎn)中不常用哈, 生產(chǎn)中要增加NN的可靠性的話,一般會開啟NN的高可用,即在不同節(jié)點上開啟多個NN,靠zookeeper來協(xié)調(diào) 。 所以本節(jié)就 了解一下即可

    2024年02月08日
    瀏覽(27)
  • Hadoop-MapReduce排序(超級詳細)

    Hadoop-MapReduce排序(超級詳細)

    ———————————————————————— ————————————————————————? (1)map map task 會從本地?件系統(tǒng)讀取數(shù)據(jù),轉換成key-value形式的鍵值對集合。使?的是hadoop內(nèi)置的數(shù)據(jù)類型,?如longwritable、text等。 (2)shuffle [1] 溢出 [2] 分區(qū)

    2024年02月03日
    瀏覽(15)
  • Hadoop(01) Hadoop3.3.6安裝教程,單機/偽分布式配置

    Hadoop(01) Hadoop3.3.6安裝教程,單機/偽分布式配置

    在安裝 Hadoop 3.3.6 前,需要滿足以下前置條件: Java Development Kit (JDK):Hadoop 是用 Java 編寫的,因此需要安裝并配置適當版本的 JDK。Hadoop 3.3.6 建議使用 JDK 8 或更高版本。確保正確安裝 JDK,并設置 JAVA_HOME 環(huán)境變量。 SSH:Hadoop 集群中的節(jié)點需要通過 SSH 進行通信和管理。確保在

    2024年02月06日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包