目錄
什么是序列化和反序列化?
序列化
反序列化
為什么要序列化?
序列化的主要應(yīng)用場景
MapReduce實(shí)現(xiàn)序列化
自定義bean對象實(shí)現(xiàn)Writable接口
1.實(shí)現(xiàn)Writable接口
2.無參構(gòu)造
3.重寫序列化方法
4.重寫反序列化方法
5.順序一致
6.重寫toString
7.實(shí)現(xiàn)Comparable接口
MapReduce自定義序列化案例?
案例
解決思路
Map階段
Reduce階段
Bean
Coding
1、編寫B(tài)ean
2、編寫Mapper類
3、編寫Reducer類
4、編寫Runner類?
運(yùn)行結(jié)果
斷點(diǎn)設(shè)置技巧
什么是序列化和反序列化?
序列化
序列化是將對象的狀態(tài)信息轉(zhuǎn)化為可以存儲(chǔ)或傳輸?shù)男问降倪^程,通常指將對象在內(nèi)存中的狀態(tài)信息轉(zhuǎn)換為可以被存儲(chǔ)在外部介質(zhì)上的二進(jìn)制流或其他格式的數(shù)據(jù),以便在需要時(shí)可以重新讀取和還原對象的狀態(tài)信息。
反序列化
反序列化則是將存儲(chǔ)或傳輸?shù)臄?shù)據(jù)重新裝配成對象的過程。
為什么要序列化?
因?yàn)镸apReduce是一個(gè)分布式計(jì)算框架,需要將數(shù)據(jù)在各個(gè)節(jié)點(diǎn)之間傳輸。而網(wǎng)絡(luò)傳輸必須是二進(jìn)制數(shù)據(jù),所以不同節(jié)點(diǎn)之間的數(shù)據(jù)傳輸就需要將數(shù)據(jù)轉(zhuǎn)換為二進(jìn)制流進(jìn)行傳輸,因此需要進(jìn)行序列化。
序列化的主要應(yīng)用場景
- 對象的持久化:將對象保存到磁盤或數(shù)據(jù)庫中,以便在需要時(shí)可以重新讀取和還原對象的狀態(tài)信息。
-
遠(yuǎn)程方法調(diào)用(RPC):將對象轉(zhuǎn)換為可以在網(wǎng)絡(luò)上傳輸?shù)母袷?,以便在不同的進(jìn)程或計(jì)算機(jī)之間進(jìn)行遠(yuǎn)程通信。
- 分布式計(jì)算:將對象轉(zhuǎn)換為可以在分布式計(jì)算環(huán)境中進(jìn)行傳輸和計(jì)算的格式,以便在不同的計(jì)算節(jié)點(diǎn)之間進(jìn)行數(shù)據(jù)傳輸和計(jì)算。
MapReduce實(shí)現(xiàn)序列化
在實(shí)際開發(fā)中,基本的序列化類型往往不能滿足實(shí)際開發(fā)需求,比如在Hadoop內(nèi)部傳遞一個(gè)bean對象,Hadoop的基本序列化類型是沒有這種類型的,所以這就需要我們自己去構(gòu)造該序列化類型。
自定義bean對象實(shí)現(xiàn)Writable接口
Writable接口是Hadoop序列化框架的核心接口,用戶可以通過實(shí)現(xiàn)該接口來實(shí)現(xiàn)自定義的序列化類型。Writable接口的實(shí)現(xiàn)類包括IntWritable、DoubleWritable、Text等,我們可以通過繼承Writable接口來實(shí)現(xiàn)自定義的序列化類。
1.實(shí)現(xiàn)Writable接口
2.無參構(gòu)造
反序列化需要反射調(diào)用無參構(gòu)造函數(shù),所以必須有無參構(gòu)造
3.重寫序列化方法
4.重寫反序列化方法
5.順序一致
序列化和反序列化的順序必須完全一致,也就是說序列化的順序?yàn)椋╝,b,c),那么反序列化的順序也應(yīng)該為(a,b,c)
6.重寫toString
如果需要把結(jié)果顯示在文件中,需要重寫toString,不然對象輸出就是一個(gè)地址值
7.實(shí)現(xiàn)Comparable接口
如果需要將我們自定義的bean放在key中傳輸,就必須重寫Comparable接口,因?yàn)镸apReduce框架中的Shuffle過程要求key必須能夠排序。
MapReduce自定義序列化案例?
案例
統(tǒng)計(jì)每一個(gè)用戶耗費(fèi)的總上行流量、總下行流量、總流量。
輸入案例:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 視頻網(wǎng)站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.# 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.# 站點(diǎn)統(tǒng)計(jì) 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站點(diǎn)統(tǒng)計(jì) 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 綜合門戶 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 綜合門戶 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
輸入格式:
????????時(shí)間戳、電話號(hào)碼、基站的物理地址、訪問網(wǎng)址的ip、網(wǎng)站域名、數(shù)據(jù)包、接包數(shù)、上行/傳流量、下行/載流量、響應(yīng)碼
輸出格式:
????????手機(jī)號(hào)碼 上行流量 下行流量 總流量
解決思路
Map階段
- 讀取一行數(shù)據(jù),切分字段
- 獲取我們需要的值(手機(jī)號(hào)、上行流和下行流量)
- 以手機(jī)號(hào)為key,bean對象為value輸出(即context.write(手機(jī)號(hào),bean))
Map<KRYIN,VALUEIN,KEYOUT,VALUEOUT>
- KEYIN:map階段key是文本偏移量,不需要設(shè)置。
- VALUEIN:同樣value是文本中一行的數(shù)據(jù),我們不需要設(shè)置。
- KEYOUT:我們希望對相同的手機(jī)號(hào)的流量進(jìn)行累加,所以KEYOUT應(yīng)該是手機(jī)號(hào)。
- VALUEOUT:map階段我們希望輸出一個(gè)手機(jī)號(hào)(KEY)對應(yīng)三個(gè)值(上行流量、下行流量和總流量)。
Reduce階段
累加上行流量和下行總流量得到總流量(合并相同手機(jī)號(hào)的上行流量和下行流量,即<手機(jī)號(hào),bean1+bean2+...>)
- KEYIN:reduce階段KEYIN就是map階段的輸出KEYOUT,上面設(shè)計(jì)好了我們就不需要設(shè)置了。
- VALUEIN:同樣VALUEIN就是map階段的輸出VALUEOUT,我們不需要設(shè)置。
- KEYOUT:輸出手機(jī)號(hào)。
- VALUEOUT:輸出該手機(jī)號(hào)對應(yīng)的bean對象(需要重寫toString)。
Bean
我們這里的Bean是作為輸出的Value,所以不需要繼承Comparable接口,僅僅需要注意是就是重寫toString方法。
Coding
1、編寫B(tài)ean
- 我們這里的Bean不需要繼承Comparable接口,因?yàn)樗蛔鳛镵ey,我們這里的Key是手機(jī)號(hào),是一個(gè)字符串,它是Text序列化類型,在Hadoop中,它已經(jīng)繼承了Comparable接口。
- 下面的Bean中,我們重載了setSumFlow方法,因?yàn)閟umFlow并不是原始數(shù)據(jù)中存在的,而是我們我們通過獲取upFlow和downFlow計(jì)算和得來的。
- 我們重寫了toString方法來滿足輸出格式的要求。
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 1.實(shí)現(xiàn)Writable接口
* 2.重寫序列化接口和反序列化接口
* 3.重寫無參構(gòu)造
* 4.重寫toString方法
*/
public class FlowBean implements Writable {
private long upFlow; //上行流量
private long downFlow; //下行流量
private long sumFlow; //總流量
//3.無參構(gòu)造
public FlowBean(){
}
//2.1序列化方法
@Override
public void write(DataOutput dataOutput) throws IOException {
//序列化順序無所謂,但是必須和反序列化順序一致
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
//2.2反序列化方法
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
//重載setSumFlow方法
public void setSumFlow() {
this.sumFlow = this.upFlow+this.downFlow;
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
2、編寫Mapper類
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
//盡可能節(jié)省內(nèi)存,不要每次讀取一行就新建對象
private String phone; //手機(jī)號(hào)
private long upFlow; //上行流量
private long downFlow; //下行流量
private Text outKey = new Text();
private FlowBean outValue = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1.獲取一行
//1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
String line = value.toString();
//2.切割
String[] words = StringUtils.split(line,'\t');
//3.獲取想要的數(shù)據(jù)
phone = words[1];
upFlow = Long.parseLong(words[words.length-3]);
downFlow = Long.parseLong(words[words.length-2]);
//4.封裝
outKey.set(phone);
outValue.setUpFlow(upFlow);
outValue.setDownFlow(downFlow);
outValue.setSumFlow();
//5.寫出
context.write(outKey,outValue);
}
}
3、編寫Reducer類
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
private FlowBean outValue = new FlowBean();
//reduce方法每次只計(jì)算相同的key,所以totalUp和totalDown必須放在reduce方法內(nèi)部,否則會(huì)把所有key的上行流量和下行流量加在一起
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
//1.遍歷集合,累加值
long totalUp = 0; //上行流量
long totalDown = 0; //下行流量
for (FlowBean value : values) {
totalUp += value.getUpFlow();
totalDown += value.getDownFlow();
}
//2.封裝outKey,outValue
outValue.setUpFlow(totalUp);
outValue.setDownFlow(totalDown);
outValue.setSumFlow();
//3. 寫出
context.write(key,outValue);
}
}
4、編寫Runner類?
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FlowRunner extends Configured implements Tool {
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(),new FlowRunner(),args);
}
@Override
public int run(String[] args) throws Exception {
//1.獲取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "flow compu");
//2.配置jar包路徑
job.setJarByClass(FlowRunner.class);
//3.關(guān)聯(lián)mapper和reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
//4.設(shè)置map、reduce輸出的k、v類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//5.設(shè)置數(shù)據(jù)輸入的路徑
FileInputFormat.setInputPaths(job,new Path("D:\\MapReduce_Data_Test\\writable\\input1"));
//6.設(shè)置輸出路徑-輸出目錄不可存在
FileOutputFormat.setOutputPath(job,new Path("D:\\MapReduce_Data_Test\\writable\\output1"));
//7.提交job
return job.waitForCompletion(true) ? 0 : 1;//verbose:是否監(jiān)控并打印job的信息
}
}
運(yùn)行結(jié)果
計(jì)算正確 !
斷點(diǎn)設(shè)置技巧
?調(diào)試MapReduce程序的時(shí)候,我們一般把斷點(diǎn)設(shè)置在map和reduce方法內(nèi)部。文章來源:http://www.zghlxwxcb.cn/news/detail-462783.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-462783.html
到了這里,關(guān)于MapReduce序列化【用戶流量使用統(tǒng)計(jì)】的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!