(99)WritableComparable排序
什么是排序
排序是MR中最重要的操作之一,也是面試中可能被問到的重點。
MapTask和ReduceTask中都會對數(shù)據(jù)按照KEY來排序,主要是為了效率,排完序之后,相同key值的數(shù)據(jù)會被放在一起,更方便下一步(如Reducer())的匯總處理。
默認排序是按照字典順序(字母由小到大,或者是數(shù)字由小到大)排序,且實現(xiàn)該排序的方法是快速排序。
什么時候需要排序
MR的過程中,什么時候用到了排序呢?
Map階段:
- 環(huán)形緩沖區(qū)溢寫到磁盤之前,會將每個分區(qū)內(nèi)數(shù)據(jù)分別進行一個快排,這個排序是在內(nèi)存中完成的;(對key的索引,按照字典順序排列)
- 環(huán)形緩沖區(qū)多輪溢寫完畢后,會形成一堆文件,這時候會對這些文件做merge歸并排序,我理解是單個MapTask最終會匯總形成一個文件;
Reduce階段:
- ReduceTask會主動拉取MapTask們的輸出文件,理論上是會優(yōu)先保存到內(nèi)存里,但是往往內(nèi)存里放不下,所以多數(shù)情況下會直接溢寫到磁盤,于是我們會得到多個文件。當文件數(shù)量超過閾值,之后需要做歸并排序,合并成一個大文件。如果是內(nèi)存中的數(shù)據(jù)超過閾值,則會進行一次合并后將數(shù)據(jù)溢寫到磁盤。當所有數(shù)據(jù)拷貝完后,ReduceTask會統(tǒng)一對內(nèi)存和磁盤上的所有數(shù)據(jù)進行一次歸并排序。
- 文件合并后其實還可以進行一個分組排序,過于復雜,這里就不介紹了。
排序有哪些分類
MR里的排序還有部分排序、全排序、輔助排序、二次排序的不同說法,注意,它們之間不是像那種傳統(tǒng)的排序算法之間的區(qū)別,只是當排序在不同場景的時候,分別起了個名字。
MapReduce根據(jù)輸入記錄的鍵對數(shù)據(jù)集排序,保證輸出的每個文件內(nèi)部是有序的,這就是部分排序。
最終輸出結果只有一個文件,且文件內(nèi)部有序。這就是全排序。
全排序的實現(xiàn)方式是只設置一個ReduceTask。但是這種方式在處理大型文件時效率很低很低,因為一臺機器處理全部數(shù)據(jù),完全沒有利用MR所提供的并行架構的優(yōu)勢,生產(chǎn)環(huán)境上完全不適用。
所以生產(chǎn)環(huán)境里,常用的還是部分排序。
輔助排序,就是GroupingComparator分組。
這個似乎是可選的,是在Reduce階段,Reducer在從Map階段主動拉取完數(shù)據(jù)后,會對所有文件做一次歸并排序。做完歸并排序之后,理論上就可以進行輔助排序。
輔助排序有啥用呢,就是當接收到的Key是個bean對象時,輔助排序可以讓一個或者幾個字段相同的key(全部字段不相同)進入同一個Reduce(),所以也起名叫做分組排序。
二次排序比較簡單,在自定義排序過程中,如果compareTo中的判斷條件為兩個,那它就是二次排序。
如何實現(xiàn)自定義排序
說到這里,那 如何實現(xiàn)自定義排序 呢?
如果是bean對象作為key傳輸,那需要實現(xiàn)WritableComparable接口,重寫compareTo方法,就可以實現(xiàn)自定義排序。
@Override
public int compareTo(FlowBean bean) {
int result;
// 按照總流量大小,倒序排列
if (this.sumFlow > bean.getSumFlow()) {
result = -1;
}else if (this.sumFlow < bean.getSumFlow()) {
result = 1;
}else {
result = 0;
}
return result;
}
(100)全排序案例
案例需求
之前我們做過一個案例,輸入文件有一個,里面放的是每個手機號的上行流量和下行流量,輸出同樣是一個文件,里面放的除了手機號的上行流量和下行流量之外,還多了一行總流量。
這時候我們提一個新需求,就是我不止要這個輸出文件,我還要這個文件里的內(nèi)容,按照總流量降序排列。
思路分析
MapReduce里,只能對Key進行排序。在先前的需求里,我們是用手機號作為key,上行流量、下行流量和總流量組成一個bean,作為value,這樣的安排顯然不適合新需求。
因此我們需要改變一下,將上行流量、下行流量和總流量組成的bean作為key,而將手機號作為value,如此來排序。
所以第一步,我們需要對我們自定義的FlowBean對象聲明WritableComparable接口,并重寫CompareTo方法,這一步的目的是使得FlowBean可進行算數(shù)比較,從而允許排序:
@Override
public int CompareTo(FlowBean o){
// 按照總流量,降序排列
return this.sumFlow > o.getSumFlow()?-1:1;
}
注意這里,因為Hadoop里默認的字典排序是從小到大排序,如果想實現(xiàn)案例里由大到小的排序,那么當大于的時候,就要返回-1,從而將大的值排在前面。
其次,Mapper類里:
context.write(bean, 手機號)
bean成了key,手機號成了value。
最后,Reduce類里,需要循環(huán)輸出,避免出現(xiàn)總流量相同的情況。
for (Text text: values){
context.write(text, key); // 注意順序,原先的key放在value位置
}
2023-7-19 11:16:04 這里沒懂。。。
哦哦明白了,什么樣的數(shù)據(jù)會進一個Reducer呢,當然是key 值相同的會進同一個,又因為我們之前compareTo
的時候用的是總流量,所以最后是總流量相同的記錄會送進同一個Reducer,然后匯總成一條記錄做輸出,畢竟reducer就是用來做匯總的。
但"匯總成一條記錄"這并不是我們想要的,我們需要的是把這些數(shù)據(jù)原模原樣輸出來。這就是為什么我們在Reducer的reduce()里面,要加上循環(huán)輸出的原因。
實際代碼
貼一下教程里的代碼實現(xiàn):
首先是FlowBean對象,需要聲明WritableComparable
接口,并重寫CompareTo()
package com.atguigu.mapreduce.writablecompable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow; //上行流量
private long downFlow; //下行流量
private long sumFlow; //總流量
//提供無參構造
public FlowBean() {
}
//生成三個屬性的getter和setter方法
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
//實現(xiàn)序列化和反序列化方法,注意順序一定要一致
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(this.upFlow);
out.writeLong(this.downFlow);
out.writeLong(this.sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
//重寫ToString,最后要輸出FlowBean
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
@Override
public int compareTo(FlowBean o) {
//按照總流量比較,倒序排列
if(this.sumFlow > o.sumFlow){
return -1;
}else if(this.sumFlow < o.sumFlow){
return 1;
}else {
return 0;
}
}
}
然后編寫Mapper類:
package com.atguigu.mapreduce.writablecompable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
private FlowBean outK = new FlowBean();
private Text outV = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1 獲取一行數(shù)據(jù)
String line = value.toString();
//2 按照"\t",切割數(shù)據(jù)
String[] split = line.split("\t");
//3 封裝outK outV
outK.setUpFlow(Long.parseLong(split[1]));
outK.setDownFlow(Long.parseLong(split[2]));
outK.setSumFlow();
outV.set(split[0]);
//4 寫出outK outV
context.write(outK,outV);
}
}
然后編寫Reducer類:
package com.atguigu.mapreduce.writablecompable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//遍歷values集合,循環(huán)寫出,避免總流量相同的情況
for (Text value : values) {
//調(diào)換KV位置,反向寫出
context.write(value,key);
}
}
}
最后編寫驅動類:
package com.atguigu.mapreduce.writablecompable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1 獲取job對象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2 關聯(lián)本Driver類
job.setJarByClass(FlowDriver.class);
//3 關聯(lián)Mapper和Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
//4 設置Map端輸出數(shù)據(jù)的KV類型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
//5 設置程序最終輸出的KV類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//6 設置輸入輸出路徑
FileInputFormat.setInputPaths(job, new Path("D:\\inputflow2"));
FileOutputFormat.setOutputPath(job, new Path("D:\\comparout"));
//7 提交Job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
完成,僅做了解即可。
(101)二次排序案例
二次排序的概念很簡單,其實之前提過了,就是在自定義排序的時候,判斷條件有兩個。
比如說,原先我對一堆人排序,是按照身高從高到低排,但是身高一樣的就沒法排序了,這時候我可以再加入一個判斷條件,比如說如果身高一樣的話,就按體重排序。
具體就是修改FlowBean的CompareTo方法,在第一條件相等的時候,添加第二判定條件。
public int compareTo(FlowBean o) {
//按照總流量比較,倒序排列
if(this.sumFlow > o.sumFlow){
return -1;
}else if(this.sumFlow < o.sumFlow){
return 1;
}else {
if (this.upFlow > o.upFlow){
return 1;
} else if (this.upFlow < o.upFlow){
return -1;
}
else {
return 0;
}
}
}
如果有需要的話,還可以繼續(xù)加第三判定條件。
(102) 區(qū)內(nèi)排序案例
還是之前的手機號案例,之前我們想要的是,只有一個文件,然后文件內(nèi)所有數(shù)據(jù)按照總流量降序排列。
現(xiàn)在我們提出一個新要求,按照前3位來分區(qū)輸出,比如說136的在一個文件里,137的在一個文件里,以此類推。而且每個文件內(nèi)部,還需要按照總流量降序排列。
本質上就是之前說的分區(qū) + 排序,這兩部分的結合。需要額外定義好Partitioner類。
貼一下教程里的代碼示例,其實只需要在上一小節(jié)的基礎上補充自定義分區(qū)類即可:
首先自定義好分區(qū)類:
package com.atguigu.mapreduce.partitionercompable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner2 extends Partitioner<FlowBean, Text> {
@Override
public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
//獲取手機號前三位
String phone = text.toString();
String prePhone = phone.substring(0, 3);
//定義一個分區(qū)號變量partition,根據(jù)prePhone設置分區(qū)號
int partition;
if("136".equals(prePhone)){
partition = 0;
}else if("137".equals(prePhone)){
partition = 1;
}else if("138".equals(prePhone)){
partition = 2;
}else if("139".equals(prePhone)){
partition = 3;
}else {
partition = 4;
}
//最后返回分區(qū)號partition
return partition;
}
}
然后在驅動類里注冊好分區(qū)器:文章來源:http://www.zghlxwxcb.cn/news/detail-724441.html
// 設置自定義分區(qū)器
job.setPartitionerClass(ProvincePartitioner2.class);
// 設置對應的ReduceTask的個數(shù)
job.setNumReduceTasks(5);
其他跟上一小節(jié)保持一致即可。文章來源地址http://www.zghlxwxcb.cn/news/detail-724441.html
參考文獻
- 【尚硅谷大數(shù)據(jù)Hadoop教程,hadoop3.x搭建到集群調(diào)優(yōu),百萬播放】
到了這里,關于Hadoop3教程(十四):MapReduce中的排序的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!