MapReduce Programming Practice
Lab 1
Problem Description
Given records of users' mobile internet activity, design a program based on the MapReduce programming model to compute the total traffic used by each distinct phone number. The fields of a data record are described below.
Index | Field | Type | Description |
---|---|---|---|
0 | reportTime | long | timestamp of the record |
1 | msisdn | String | phone number |
2 | apmac | String | AP MAC address |
3 | acmac | String | AC MAC address |
4 | host | String | visited host |
5 | siteType | String | site category |
6 | upPackNum | long | number of upstream packets |
7 | downPackNum | long | number of downstream packets |
8 | upPayLoad | long | total upstream traffic, in bytes (mind the unit conversion) |
9 | downPayLoad | long | total downstream traffic, in bytes (mind the unit conversion) |
10 | httpStatus | String | HTTP response status |
The data file contains the following records:
```
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 iface.qiyi.co 視頻網站 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 s19.# 站點統(tǒng)計 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 rank.ie.sogou.com 搜索引擎 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 2052.flash2-http.qq.com 綜合門戶 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 視頻網站 15 12 1527 2106 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 sug.so.# 信息安全 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.# 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 input.shouji.sogou.com 搜索引擎 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.# 站點統(tǒng)計 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站點統(tǒng)計 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 rank.ie.sogou.com 搜索引擎 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 sug.so.# 信息安全 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 綜合門戶 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 站點統(tǒng)計 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 綜合門戶 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 t3.baidu.com 搜索引擎 2 2 120 120 200
1363157985079 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 t3.baidu.com 搜索引擎 6 3 360 180 200
1363157985069 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 18 138 1080 186852 200
```
Procedure
We need to compute each user's total traffic across all of their requests, i.e. the sum of the upstream traffic (field index 8) and downstream traffic (field index 9) over every request made by that phone number, and write the results to a separate output file.
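Before turning to Hadoop, the core computation can be sketched in plain Java. The field indices and tab separator below follow the field table above, and the two sample records are the ones for phone 13560439658; this is only an illustrative sketch, not the MapReduce program itself:

```java
import java.util.HashMap;
import java.util.Map;

public class FlowSketch {
    // Sums upstream (index 8) and downstream (index 9) traffic per phone (index 1).
    // Each phone maps to {up, down, total}.
    static Map<String, long[]> totals(String[] lines) {
        Map<String, long[]> result = new HashMap<>();
        for (String line : lines) {
            String[] f = line.split("\t");
            long[] t = result.computeIfAbsent(f[1], k -> new long[3]);
            t[0] += Long.parseLong(f[8]); // upstream bytes
            t[1] += Long.parseLong(f[9]); // downstream bytes
            t[2] = t[0] + t[1];           // running total
        }
        return result;
    }

    public static void main(String[] args) {
        // The two records for phone 13560439658 from the data above, tab-separated.
        String[] sample = {
            "1363157993055\t13560439658\tC4-17-FE-BA-DE-D9:CMCC\t120.196.100.99\tsug.so.#\t信息安全\t18\t15\t1116\t954\t200",
            "1363157992093\t13560439658\tC4-17-FE-BA-DE-D9:CMCC\t120.196.100.99\trank.ie.sogou.com\t搜索引擎\t15\t9\t918\t4938\t200"
        };
        long[] t = totals(sample).get("13560439658");
        // up = 1116 + 918 = 2034, down = 954 + 4938 = 5892, total = 7926
        System.out.println(t[0] + "\t" + t[1] + "\t" + t[2]);
    }
}
```

MapReduce distributes exactly this per-key grouping and summing across the cluster, which is what the program below implements.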
Part 1: Writing the MapReduce Program
1. Create a Maven project
2. Import the Hadoop dependencies
Add the following dependencies to pom.xml:
```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.0</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.30</version>
</dependency>
```
As shown in the figure:
After importing, remember to save the file (Ctrl+S) so the dependencies are reloaded.
3. Create the classes
3.1 The Flow class
Enter the following code in the Flow class:
```java
package com.njupt.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Flow implements Writable {
    private String phone; // phone number
    private long up;      // upstream traffic
    private long down;    // downstream traffic
    private long sum;     // total traffic

    // No-arg constructor, required by Hadoop's Writable deserialization
    public Flow() {
    }

    public Flow(String phone, long up, long down) {
        super();
        this.phone = phone;
        this.up = up;
        this.down = down;
        this.sum = this.up + this.down;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.phone);
        out.writeLong(this.up);
        out.writeLong(this.down);
        out.writeLong(this.sum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read in exactly the order write() emits them
        this.phone = in.readUTF();
        this.up = in.readLong();
        this.down = in.readLong();
        this.sum = in.readLong();
    }

    @Override
    public String toString() {
        return this.up + "\t" + this.down + "\t" + this.sum;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public long getUp() {
        return up;
    }

    public void setUp(long up) {
        this.up = up;
    }

    public long getDown() {
        return down;
    }

    public void setDown(long down) {
        this.down = down;
    }

    public long getSum() {
        return sum;
    }
}
```
As shown in the figure:
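Hand-written Writable implementations like Flow are easy to break by reordering fields between write and readFields. The symmetry can be checked with a quick round-trip sketch; FlowRoundTrip below is a trimmed stand-in that mirrors Flow's field order using only java.io, so no Hadoop dependency is needed:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowRoundTrip {
    // Serializes in Flow's write() order: phone (UTF), up, down, sum (longs).
    static byte[] write(String phone, long up, long down) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeUTF(phone);
        out.writeLong(up);
        out.writeLong(down);
        out.writeLong(up + down); // sum, exactly as Flow's constructor computes it
        return buf.toByteArray();
    }

    // Deserializes in the same order, as Flow's readFields() must.
    static long readSum(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        in.readUTF();         // phone
        in.readLong();        // up
        in.readLong();        // down
        return in.readLong(); // sum comes back intact only if the order matches
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = write("13726230503", 2481, 24681);
        System.out.println(readSum(bytes)); // 2481 + 24681 = 27162
    }
}
```

If either side's field order were changed without the other, every record crossing the shuffle would be silently corrupted.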
3.2 The FlowSumMapper class
Create the FlowSumMapper class and enter the following code:
```java
package com.njupt.flowsum;

import java.io.IOException;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowSumMapper extends Mapper<LongWritable, Text, Text, Flow> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Grab one line of input
        String line = value.toString();
        // Split it into fields on tab characters
        String[] fields = StringUtils.split(line, "\t");
        // Pull out the fields we need
        String phone = fields[1];
        long up = Long.parseLong(fields[8]);
        long down = Long.parseLong(fields[9]);
        // Emit a <phone, Flow> key-value pair
        context.write(new Text(phone), new Flow(phone, up, down));
    }
}
```
As shown in the figure:
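One caveat: the mapper above splits on tab characters, so if data.txt ends up space-separated (copy-pasting the listing above can lose the tabs), fields[8] will not exist and the map tasks will fail with ArrayIndexOutOfBoundsException. A more forgiving alternative, sketched here with the standard library rather than commons-lang3, splits on any run of whitespace:

```java
public class FieldSplit {
    // Splits a record on any run of whitespace (tabs or spaces), then
    // pulls out phone (index 1), up (index 8), down (index 9).
    static String[] extract(String line) {
        String[] f = line.trim().split("\\s+");
        return new String[]{f[1], f[8], f[9]};
    }

    public static void main(String[] args) {
        // A space-separated record, as it appears in the listing above.
        String line = "1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC "
                    + "120.196.100.82 iface.qiyi.co 視頻網站 24 27 2481 24681 200";
        String[] parts = extract(line);
        System.out.println(parts[0] + " up=" + parts[1] + " down=" + parts[2]);
    }
}
```

This only works while no field itself contains spaces; if you keep the tab-only split, make sure the uploaded file really is tab-separated.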
3.3 The FlowSumReducer class
Create the FlowSumReducer class and enter the following code:
```java
package com.njupt.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSumReducer extends Reducer<Text, Flow, Text, Flow> {

    @Override
    protected void reduce(Text key, Iterable<Flow> values, Context context)
            throws IOException, InterruptedException {
        // Input: <phone, {flow, flow, flow, ...}>
        // The reduce logic simply iterates over the values and accumulates the sums
        long up = 0;
        long down = 0;
        for (Flow flow : values) {
            up += flow.getUp();
            down += flow.getDown();
        }
        context.write(key, new Flow(key.toString(), up, down));
    }
}
```
As shown in the figure:
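Because this reduce is a plain sum, which is associative and commutative, the same class could in principle also be registered as a combiner (job.setCombinerClass(FlowSumReducer.class), an optional optimization not used in this lab) to pre-aggregate on the map side and shrink shuffle traffic. The sketch below shows why the two orders of summing agree:

```java
import java.util.Arrays;
import java.util.List;

public class CombinerCheck {
    // The reducer's logic: sum the (up, down) pairs for one key.
    static long[] reduce(List<long[]> values) {
        long up = 0, down = 0;
        for (long[] v : values) {
            up += v[0];
            down += v[1];
        }
        return new long[]{up, down};
    }

    public static void main(String[] args) {
        // Three (up, down) records for one phone.
        List<long[]> all = Arrays.asList(
            new long[]{1116, 954}, new long[]{918, 4938}, new long[]{100, 200});
        // Reducing everything at once...
        long[] direct = reduce(all);
        // ...equals reducing a map-side partial sum first (what a combiner does),
        // then reducing the partial together with the remaining record.
        long[] partial = reduce(all.subList(0, 2));
        long[] combined = reduce(Arrays.asList(partial, all.get(2)));
        System.out.println(Arrays.equals(direct, combined)); // true: sums associate
    }
}
```

The type signatures happen to line up here because the map output value and the reduce output value are both Flow; a combiner is only safe when its output types match the map output types like this.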
3.4 The FlowSumRunner class
Create the FlowSumRunner class and enter the following code:
```java
package com.njupt.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowSumRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration injected by ToolRunner
        Configuration conf = getConf();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowSumRunner.class);
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);
        // Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Flow.class);
        // Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Flow.class);
        // Input path, e.g. /flow/input
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // If the output path already exists, delete it first
        Path output = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        // Output path, e.g. /flow/output
        FileOutputFormat.setOutputPath(job, output);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
        System.exit(status);
    }
}
```
As shown in the figure:
4. Package the project as a jar
Check that pom.xml contains this line:
`<packaging>jar</packaging>`
As shown in the figure:
Package the flowsum project.
When the terminal shows BUILD SUCCESS, the build is done, as shown in the figure:
Refresh the flowsum project.
The packaged jar now appears in the target folder.
Rename the jar to fs.jar and move it to a folder you can easily find.
Part 2: Submitting to the Hadoop Cluster for Testing
1. Create the data
Create a text file named "data.txt".
Copy the data records above into the file and save it, as shown in the figure:
2. Start the Hadoop cluster
2.1 Start the virtual machines node1, node2, and node3
2.2 Start Hadoop
Run the following commands:
```shell
start-dfs.sh
start-yarn.sh
```
As shown in the figure:
2.3 Open the Hadoop web UI in a browser
http://192.168.198.130:9870 (replace 192.168.198.130 with your own node1's IP address)
Browse the file system:
3. Test the MapReduce job
3.1 Upload the data file and the jar with Xftp
3.2 On node1, cd into the /export/server/hadoop-3.3.0/share/hadoop/mapreduce directory
```shell
cd /export/server/hadoop-3.3.0/share/hadoop/mapreduce
```
3.3 Run the following command to create the /flow/input directory on HDFS
```shell
hdfs dfs -mkdir -p /flow/input
```
As shown in the figure:
You can check the new directory in the browser.
3.4 Run the following command to upload data.txt into the input directory
```shell
hdfs dfs -put data.txt /flow/input
```
As shown in the figure:
You can check the upload in the browser:
3.5 Run the following command to execute fs.jar and compute the total traffic per phone number
```shell
hadoop jar fs.jar com.njupt.flowsum.FlowSumRunner /flow/input /flow/output
```
As shown in the figure:
3.6 View the output with the following command
```shell
hdfs dfs -text /flow/output/part-r-00000
```
As shown in the figure:
You can also download the result file from the browser: