(113)ReduceJoin案例需求分析
現(xiàn)在有兩個(gè)文件:
- orders.txt,存放的是訂單ID、產(chǎn)品ID、產(chǎn)品數(shù)量
- pd.txt,這是一個(gè)產(chǎn)品碼表,存放的是產(chǎn)品ID、產(chǎn)品中文名;
現(xiàn)在是想通過join,來實(shí)現(xiàn)這么一個(gè)預(yù)期輸出,即訂單ID、產(chǎn)品中文名、產(chǎn)品數(shù)量。
以上是本次案例需求。
簡單思考一下思路。我們需要將關(guān)聯(lián)條件作為Map輸出的key,將兩表滿足Join條件的數(shù)據(jù)以及數(shù)據(jù)所來源的文件信息,發(fā)往同一個(gè)ReduceTask,在Reduce中進(jìn)行數(shù)據(jù)的串聯(lián)。
具體該怎么做呢?
Map中在處理的時(shí)候,需要獲取輸入的文件內(nèi)容和文件名(這個(gè)是可以在切片的時(shí)候獲取的),然后不同文件分別做不同處理,處理完成后封裝bean對象輸出。
注意,Map在輸出的時(shí)候,需要以產(chǎn)品ID作為key,只有這樣做,才能將相同產(chǎn)品ID的orders.txt記錄和pd.txt記錄,放在同一個(gè)reduceTask里,進(jìn)而實(shí)現(xiàn)最終的替換。value的話,選擇訂單ID、訂單數(shù)量、文件名。這里傳入文件名的原因是Reduce階段需要根據(jù)不同文件名實(shí)現(xiàn)不同處理,所以一定得需要傳一個(gè)文件名進(jìn)來。
另外提一句,封裝bean對象的時(shí)候,需要把兩個(gè)文件里的所有字段合起來作為一個(gè)bean對象,這樣子,orders文件的數(shù)據(jù)可以用這個(gè)bean對象,pd.txt里的數(shù)據(jù)也可以用這個(gè)bean對象。相當(dāng)于做一個(gè)大寬表。
reduce階段就很簡單了,相同產(chǎn)品ID的orders.txt記錄和pd.txt記錄,被放在同一個(gè)reduceTask里,可以把來自orders的bean放在一個(gè)集合里,來自pd的bean放在一個(gè)集合里,然后遍歷set覆蓋就可以。
(114)ReduceJoin案例代碼實(shí)操 - TableBean
首先需要定義一個(gè)Bean對象,用來序列化兩個(gè)輸入文件的數(shù)據(jù),我們命名為TableBean。
package com.atguigu.mapreduce.reducejoin;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class TableBean implements Writable {
private String id; //訂單id
private String pid; //產(chǎn)品id
private int amount; //產(chǎn)品數(shù)量
private String pname; //產(chǎn)品名稱
private String flag; //判斷是order表還是pd表的標(biāo)志字段
public TableBean() {
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
@Override
public String toString() {
return id + "\t" + pname + "\t" + amount;
}
// 序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(id);
out.writeUTF(pid);
out.writeInt(amount);
out.writeUTF(pname);
out.writeUTF(flag);
}
// 反序列化方法
// 注意,序列化的順序必須要跟反序列化的順序一致
@Override
public void readFields(DataInput in) throws IOException {
this.id = in.readUTF();
this.pid = in.readUTF();
this.amount = in.readInt();
this.pname = in.readUTF();
this.flag = in.readUTF();
}
}
注意,序列化的順序必須要跟反序列化的順序一致。
(115)ReduceJoin案例代碼實(shí)操 - TableMapper
TableMapper的主要作用,就是將輸入的數(shù)據(jù),劃分成指定的KV對,以供Reduce階段使用。
命名為TableMapper,獲取文件名稱的代碼也包含在這里。
package com.atguigu.mapreduce.reducejoin;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class TableMapper extends Mapper<LongWritable,Text,Text,TableBean> {
private String filename;
private Text outK = new Text();
private TableBean outV = new TableBean();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//獲取對應(yīng)文件名稱
InputSplit split = context.getInputSplit();
FileSplit fileSplit = (FileSplit) split;
filename = fileSplit.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//獲取一行
String line = value.toString();
//判斷是哪個(gè)文件,然后針對文件進(jìn)行不同的操作
if(filename.contains("order")){ //訂單表的處理
String[] split = line.split("\t");
//封裝outK
outK.set(split[1]);
//封裝outV
outV.setId(split[0]);
outV.setPid(split[1]);
outV.setAmount(Integer.parseInt(split[2]));
outV.setPname("");
outV.setFlag("order");
}else { //商品表的處理
String[] split = line.split("\t");
//封裝outK
outK.set(split[0]);
//封裝outV
outV.setId("");
outV.setPid(split[0]);
outV.setAmount(0);
outV.setPname(split[1]);
outV.setFlag("pd");
}
//寫出KV
context.write(outK,outV);
}
}
(116)ReduceJoin案例代碼實(shí)操 - Reducer及Driver
主要是編寫Reduce部分。Mapper之后,一組相同的key的數(shù)據(jù)會進(jìn)入一個(gè)ReduceTask,接下來需要編寫自定義邏輯,讓Reduce可以實(shí)現(xiàn)關(guān)聯(lián)后輸出。
需要?jiǎng)?chuàng)建兩個(gè)集合,每個(gè)集合接收不同文件,一個(gè)接收order文件數(shù)據(jù),另一個(gè)接收碼表數(shù)據(jù)。然后循環(huán)遍歷order集合,把碼表集合里的值set進(jìn)去。
新建TableReducer:
package com.atguigu.mapreduce.reducejoin;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
ArrayList<TableBean> orderBeans = new ArrayList<>();
TableBean pdBean = new TableBean();
for (TableBean value : values) {
//判斷數(shù)據(jù)來自哪個(gè)表
if("order".equals(value.getFlag())){ //訂單表
//創(chuàng)建一個(gè)臨時(shí)TableBean對象接收value
TableBean tmpOrderBean = new TableBean();
try {
BeanUtils.copyProperties(tmpOrderBean,value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
//將臨時(shí)TableBean對象添加到集合orderBeans
orderBeans.add(tmpOrderBean);
}else { //商品表
try {
BeanUtils.copyProperties(pdBean,value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
//遍歷集合orderBeans,替換掉每個(gè)orderBean的pid為pname,然后寫出
for (TableBean orderBean : orderBeans) {
orderBean.setPname(pdBean.getPname());
//寫出修改后的orderBean對象
context.write(orderBean,NullWritable.get());
}
}
}
根據(jù)教程上說的,這里只有一個(gè)地方需要注意,但是用處也不是很大,就是 集合在add value的時(shí)候,不能直接add 傳進(jìn)來的value,而是需要重新new一個(gè)TableBean,將value值賦值給這個(gè)新的TableBean,最后add這個(gè)新的TableBean。
這么做的原因是,傳進(jìn)來的values,其實(shí)是一個(gè)Iterable<TableBean>
,不是傳統(tǒng)意義上的迭代器,可以簡單理解成,Iterable<TableBean>
里的每個(gè)value用的是同一個(gè)內(nèi)存地址,每次讀取出value就總是賦給那個(gè)內(nèi)存地址,所以不能直接add value,否則add 一百次,也只會記住最后一次add的那個(gè)value。
這似乎是Hadoop為了避免因創(chuàng)建過多實(shí)例引起資源浪費(fèi),而做的優(yōu)化。
沒有測過,做簡單了解吧。
最后在驅(qū)動(dòng)類里注冊:文章來源:http://www.zghlxwxcb.cn/news/detail-723261.html
package com.atguigu.mapreduce.reducejoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class TableDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(TableDriver.class);
job.setMapperClass(TableMapper.class);
job.setReducerClass(TableReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean.class);
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("D:\\input"));
FileOutputFormat.setOutputPath(job, new Path("D:\\output"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
大功告成文章來源地址http://www.zghlxwxcb.cn/news/detail-723261.html
參考文獻(xiàn)
- 【尚硅谷大數(shù)據(jù)Hadoop教程,hadoop3.x搭建到集群調(diào)優(yōu),百萬播放】
到了這里,關(guān)于Hadoop3教程(十七):MapReduce之ReduceJoin案例分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!