国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

MapReduce序列化【用戶流量使用統(tǒng)計(jì)】

這篇具有很好參考價(jià)值的文章主要介紹了MapReduce序列化【用戶流量使用統(tǒng)計(jì)】。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

目錄

什么是序列化和反序列化?

序列化

反序列化

為什么要序列化?

序列化的主要應(yīng)用場景

MapReduce實(shí)現(xiàn)序列化

自定義bean對象實(shí)現(xiàn)Writable接口

1.實(shí)現(xiàn)Writable接口

2.無參構(gòu)造

3.重寫序列化方法

4.重寫反序列化方法

5.順序一致

6.重寫toString

7.實(shí)現(xiàn)Comparable接口

MapReduce自定義序列化案例?

案例

解決思路

Map階段

Reduce階段

Bean

Coding

1、編寫B(tài)ean

2、編寫Mapper類

3、編寫Reducer類

4、編寫Runner類?

運(yùn)行結(jié)果

斷點(diǎn)設(shè)置技巧


什么是序列化和反序列化?

MapReduce序列化【用戶流量使用統(tǒng)計(jì)】

序列化

序列化是將對象的狀態(tài)信息轉(zhuǎn)化為可以存儲(chǔ)或傳輸?shù)男问降倪^程,通常指將對象在內(nèi)存中的狀態(tài)信息轉(zhuǎn)換為可以被存儲(chǔ)在外部介質(zhì)上的二進(jìn)制流或其他格式的數(shù)據(jù),以便在需要時(shí)可以重新讀取和還原對象的狀態(tài)信息。

反序列化

反序列化則是將存儲(chǔ)或傳輸?shù)臄?shù)據(jù)重新裝配成對象的過程。

為什么要序列化?

因?yàn)镸apReduce是一個(gè)分布式計(jì)算框架,需要將數(shù)據(jù)在各個(gè)節(jié)點(diǎn)之間傳輸。而網(wǎng)絡(luò)傳輸必須是二進(jìn)制數(shù)據(jù),所以不同節(jié)點(diǎn)之間的數(shù)據(jù)傳輸就需要將數(shù)據(jù)轉(zhuǎn)換為二進(jìn)制流進(jìn)行傳輸,因此需要進(jìn)行序列化。

序列化的主要應(yīng)用場景

  1. 對象的持久化:將對象保存到磁盤或數(shù)據(jù)庫中,以便在需要時(shí)可以重新讀取和還原對象的狀態(tài)信息。
  2. 遠(yuǎn)程方法調(diào)用(RPC):將對象轉(zhuǎn)換為可以在網(wǎng)絡(luò)上傳輸?shù)母袷?,以便在不同的進(jìn)程或計(jì)算機(jī)之間進(jìn)行遠(yuǎn)程通信。

    MapReduce序列化【用戶流量使用統(tǒng)計(jì)】

  3. 分布式計(jì)算:將對象轉(zhuǎn)換為可以在分布式計(jì)算環(huán)境中進(jìn)行傳輸和計(jì)算的格式,以便在不同的計(jì)算節(jié)點(diǎn)之間進(jìn)行數(shù)據(jù)傳輸和計(jì)算。

MapReduce實(shí)現(xiàn)序列化

在實(shí)際開發(fā)中,基本的序列化類型往往不能滿足實(shí)際開發(fā)需求,比如在Hadoop內(nèi)部傳遞一個(gè)bean對象,Hadoop的基本序列化類型是沒有這種類型的,所以這就需要我們自己去構(gòu)造該序列化類型。

自定義bean對象實(shí)現(xiàn)Writable接口

Writable接口是Hadoop序列化框架的核心接口,用戶可以通過實(shí)現(xiàn)該接口來實(shí)現(xiàn)自定義的序列化類型。Writable接口的實(shí)現(xiàn)類包括IntWritable、DoubleWritable、Text等,我們可以通過繼承Writable接口來實(shí)現(xiàn)自定義的序列化類。

1.實(shí)現(xiàn)Writable接口

2.無參構(gòu)造

反序列化需要反射調(diào)用無參構(gòu)造函數(shù),所以必須有無參構(gòu)造

3.重寫序列化方法

4.重寫反序列化方法

5.順序一致

序列化和反序列化的順序必須完全一致,也就是說序列化的順序?yàn)椋╝,b,c),那么反序列化的順序也應(yīng)該為(a,b,c)

6.重寫toString

如果需要把結(jié)果顯示在文件中,需要重寫toString,不然對象輸出就是一個(gè)地址值

7.實(shí)現(xiàn)Comparable接口

如果需要將我們自定義的bean放在key中傳輸,就必須重寫Comparable接口,因?yàn)镸apReduce框架中的Shuffle過程要求key必須能夠排序。

MapReduce自定義序列化案例?

案例

統(tǒng)計(jì)每一個(gè)用戶耗費(fèi)的總上行流量、總下行流量、總流量。

輸入案例:

1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
1363157991076 	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99			2	4	132	1512	200
1363154400022 	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4			4	0	240	0	200
1363157993044 	18211575961	94-71-AC-CD-E6-18:CMCC-EASY	120.196.100.99	iface.qiyi.com	視頻網(wǎng)站	15	12	1527	2106	200
1363157995074 	84138413	5C-0E-8B-8C-E8-20:7DaysInn	120.197.40.4	122.72.52.12		20	16	4116	1432	200
1363157993055 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
1363157995033 	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.#	信息安全	20	20	3156	2936	200
1363157983019 	13719199419	68-A1-B7-03-07-B1:CMCC-EASY	120.196.100.82			4	0	240	0	200
1363157984041 	13660577991	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.#	站點(diǎn)統(tǒng)計(jì)	24	9	6960	690	200
1363157973098 	15013685858	5C-0E-8B-C7-F7-90:CMCC	120.197.40.4	rank.ie.sogou.com	搜索引擎	28	27	3659	3538	200
1363157986029 	15989002119	E8-99-C4-4E-93-E0:CMCC-EASY	120.196.100.99	www.umeng.com	站點(diǎn)統(tǒng)計(jì)	3	3	1938	180	200
1363157992093 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			15	9	918	4938	200
1363157986041 	13480253104	5C-0E-8B-C7-FC-80:CMCC-EASY	120.197.40.4			3	3	180	180	200
1363157984040 	13602846565	5C-0E-8B-8B-B6-00:CMCC	120.197.40.4	2052.flash2-http.qq.com	綜合門戶	15	12	1938	2910	200
1363157995093 	13922314466	00-FD-07-A2-EC-BA:CMCC	120.196.100.82	img.qfc.cn		12	12	3008	3720	200
1363157982040 	13502468823	5C-0A-5B-6A-0B-D4:CMCC-EASY	120.196.100.99	y0.ifengimg.com	綜合門戶	57	102	7335	110349	200
1363157986072 	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	2412	200
1363157990043 	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜索引擎	69	63	11058	48243	200
1363157988072 	13760778710	00-FD-07-A4-7B-08:CMCC	120.196.100.82			2	2	120	120	200
1363157985066 	13726238888	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157993055 	13560436666	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200

輸入格式

????????時(shí)間戳、電話號(hào)碼、基站的物理地址、訪問網(wǎng)址的ip、網(wǎng)站域名、數(shù)據(jù)包、接包數(shù)、上行/傳流量、下行/載流量、響應(yīng)碼

輸出格式

????????手機(jī)號(hào)碼 上行流量 下行流量 總流量

解決思路

Map階段

  1. 讀取一行數(shù)據(jù),切分字段
  2. 獲取我們需要的值(手機(jī)號(hào)、上行流和下行流量)
  3. 以手機(jī)號(hào)為key,bean對象為value輸出(即context.write(手機(jī)號(hào),bean))

Map<KRYIN,VALUEIN,KEYOUT,VALUEOUT>

  1. KEYIN:map階段key是文本偏移量,不需要設(shè)置。
  2. VALUEIN:同樣value是文本中一行的數(shù)據(jù),我們不需要設(shè)置。
  3. KEYOUT:我們希望對相同的手機(jī)號(hào)的流量進(jìn)行累加,所以KEYOUT應(yīng)該是手機(jī)號(hào)。
  4. VALUEOUT:map階段我們希望輸出一個(gè)手機(jī)號(hào)(KEY)對應(yīng)三個(gè)值(上行流量、下行流量和總流量)。

Reduce階段

累加上行流量和下行總流量得到總流量(合并相同手機(jī)號(hào)的上行流量和下行流量,即<手機(jī)號(hào),bean1+bean2+...>)

  1. KEYIN:reduce階段KEYIN就是map階段的輸出KEYOUT,上面設(shè)計(jì)好了我們就不需要設(shè)置了。
  2. VALUEIN:同樣VALUEIN就是map階段的輸出VALUEOUT,我們不需要設(shè)置。
  3. KEYOUT:輸出手機(jī)號(hào)。
  4. VALUEOUT:輸出該手機(jī)號(hào)對應(yīng)的bean對象(需要重寫toString)。

Bean

我們這里的Bean是作為輸出的Value,所以不需要繼承Comparable接口,僅僅需要注意是就是重寫toString方法。

Coding

1、編寫B(tài)ean

  • 我們這里的Bean不需要繼承Comparable接口,因?yàn)樗蛔鳛镵ey,我們這里的Key是手機(jī)號(hào),是一個(gè)字符串,它是Text序列化類型,在Hadoop中,它已經(jīng)繼承了Comparable接口。
  • 下面的Bean中,我們重載了setSumFlow方法,因?yàn)閟umFlow并不是原始數(shù)據(jù)中存在的,而是我們我們通過獲取upFlow和downFlow計(jì)算和得來的。
  • 我們重寫了toString方法來滿足輸出格式的要求。
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * 1.實(shí)現(xiàn)Writable接口
 * 2.重寫序列化接口和反序列化接口
 * 3.重寫無參構(gòu)造
 * 4.重寫toString方法
 */
public class FlowBean implements Writable {
    private long upFlow;    //上行流量
    private long downFlow;    //下行流量
    private long sumFlow;    //總流量

    //3.無參構(gòu)造
    public FlowBean(){
    }

    //2.1序列化方法
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        //序列化順序無所謂,但是必須和反序列化順序一致
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);

    }

    //2.2反序列化方法
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
    //重載setSumFlow方法
    public void setSumFlow() {
        this.sumFlow = this.upFlow+this.downFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}

2、編寫Mapper類

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
    //盡可能節(jié)省內(nèi)存,不要每次讀取一行就新建對象
    private String phone;   //手機(jī)號(hào)
    private long upFlow;  //上行流量
    private long downFlow;    //下行流量

    private Text outKey = new Text();
    private FlowBean outValue = new FlowBean();


    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //1.獲取一行
        //1363157991076 	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99			2	4	132	1512	200
        String line = value.toString();

        //2.切割
        String[] words =  StringUtils.split(line,'\t');

        //3.獲取想要的數(shù)據(jù)
        phone = words[1];
        upFlow = Long.parseLong(words[words.length-3]);
        downFlow = Long.parseLong(words[words.length-2]);

        //4.封裝
        outKey.set(phone);
        outValue.setUpFlow(upFlow);
        outValue.setDownFlow(downFlow);
        outValue.setSumFlow();

        //5.寫出
        context.write(outKey,outValue);
    }

}

3、編寫Reducer類

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text,FlowBean,Text,FlowBean> {

    private FlowBean outValue = new FlowBean();

    //reduce方法每次只計(jì)算相同的key,所以totalUp和totalDown必須放在reduce方法內(nèi)部,否則會(huì)把所有key的上行流量和下行流量加在一起
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

        //1.遍歷集合,累加值
        long totalUp = 0;    //上行流量
        long totalDown = 0;    //下行流量


        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }

        //2.封裝outKey,outValue
        outValue.setUpFlow(totalUp);
        outValue.setDownFlow(totalDown);
        outValue.setSumFlow();

        //3. 寫出
        context.write(key,outValue);
    }
}

4、編寫Runner類?

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowRunner extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(),new FlowRunner(),args);
    }

    @Override
    public int run(String[] args) throws Exception {
        //1.獲取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "flow compu");

        //2.配置jar包路徑
        job.setJarByClass(FlowRunner.class);

        //3.關(guān)聯(lián)mapper和reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        //4.設(shè)置map、reduce輸出的k、v類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //5.設(shè)置數(shù)據(jù)輸入的路徑
        FileInputFormat.setInputPaths(job,new Path("D:\\MapReduce_Data_Test\\writable\\input1"));

        //6.設(shè)置輸出路徑-輸出目錄不可存在
        FileOutputFormat.setOutputPath(job,new Path("D:\\MapReduce_Data_Test\\writable\\output1"));

        //7.提交job
        return job.waitForCompletion(true) ? 0 : 1;//verbose:是否監(jiān)控并打印job的信息
    }
}

運(yùn)行結(jié)果

計(jì)算正確 !

MapReduce序列化【用戶流量使用統(tǒng)計(jì)】

斷點(diǎn)設(shè)置技巧

MapReduce序列化【用戶流量使用統(tǒng)計(jì)】

?調(diào)試MapReduce程序的時(shí)候,我們一般把斷點(diǎn)設(shè)置在map和reduce方法內(nèi)部。

MapReduce序列化【用戶流量使用統(tǒng)計(jì)】文章來源地址http://www.zghlxwxcb.cn/news/detail-462783.html

到了這里,關(guān)于MapReduce序列化【用戶流量使用統(tǒng)計(jì)】的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包