水善利萬物而不爭,處眾人之所惡,故幾于道??
一、 網(wǎng)站總流量數(shù)統(tǒng)計 - PV
? 1. 需求分析
? 2. 代碼實現(xiàn)
?? 方式一
?? 方式二
?? 方式三:使用process算子實現(xiàn)
?? 方式四:使用process算子實現(xiàn)
二、網(wǎng)站獨立訪客數(shù)統(tǒng)計 - UV
? 1. 需求分析
? 2. 代碼實現(xiàn)
一、 網(wǎng)站總流量數(shù)統(tǒng)計 - PV
??PV全稱 Page View,也就是一個網(wǎng)站的頁面瀏覽量。每當(dāng)用戶進(jìn)入網(wǎng)站加載或者刷新某個頁面時,就會給該網(wǎng)站帶來PV量,它往往用來衡量一個網(wǎng)站的流量和用戶活躍度。當(dāng)然了,單個指標(biāo)并不能全面的反映網(wǎng)站的實際情況,往往需要結(jié)合其他的指標(biāo)進(jìn)行分析。
1. 需求分析
??埋點采集到的數(shù)據(jù)格式大概是這個樣子(文件已上傳資源)第一個是userId、第二個是itemId、第三個是categoryId、第四個是behavior、第五個是timestamp
所以我們要統(tǒng)計PV的話要先從第四列中篩選出PV,然后再進(jìn)行累加,求出最終的PV
2. 代碼實現(xiàn)
方式一:
??先用readTextFile()
讀取文件,然后將讀取到的每行數(shù)據(jù)封裝成一個bean對象,再通過
filter
過濾出我們需要的PV數(shù)據(jù),這時得到的都是封裝好的一個個對象沒法直接sum,所以通過
map
將數(shù)據(jù)映射為一個個的元組類型(PV,1)然后,使用
keyBy()
將他們分到同一個并行度中進(jìn)行
sum
,得出最終的結(jié)果。
public class Flink01_Project_PV {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port",1000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(2);
env
.readTextFile("input/UserBehavior.csv")
// 將數(shù)據(jù)封裝成 UserBehavior 對象
.map(new MapFunction<String, UserBehavior>() {
@Override
public UserBehavior map(String line) throws Exception {
String[] data = line.split(",");
return new UserBehavior(
Long.valueOf(data[0]),
Long.valueOf(data[1]),
Integer.valueOf(data[2]),
data[3],
Long.valueOf(data[4]));
}
})
// 過濾出行為為PV的數(shù)據(jù)
.filter(new FilterFunction<UserBehavior>() {
@Override
public boolean filter(UserBehavior value) throws Exception {
return "pv".equals(value.getBehavior());
}
})
// 因為直接求和的話沒法求,所以做一次映射,映射成 (PV,1) 這樣的結(jié)構(gòu)
.map(new MapFunction<UserBehavior, Tuple2<String,Long>>() {
@Override
public Tuple2<String, Long> map(UserBehavior value) throws Exception {
return Tuple2.of(value.getBehavior(),1L);
}
})
// 然后 將他們通過key進(jìn)行分組,進(jìn)入同一個并行度里面 進(jìn)行求和
.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
@Override
public String getKey(Tuple2<String, Long> value) throws Exception {
return value.f0;
}
})
// 進(jìn)行求和
.sum(1)
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
運行結(jié)果:
方式二:
??這種方式省去了方式一的封裝對象,其他的思路都一樣。
public class Flink01_Project_PV {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port",1000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(2);
env
.readTextFile("input/UserBehavior.csv")
// 直接過濾出我們想要的數(shù)據(jù)
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
String[] data = value.split(",");
return "pv".equals(data[3]);
}
})
// 然后將結(jié)構(gòu)轉(zhuǎn)換為元組類型 (PV,1)
.map(new MapFunction<String, Tuple2<String,Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] data = value.split(",");
return Tuple2.of(data[3],1L);
}
})
// 通過key分組
.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
@Override
public String getKey(Tuple2<String, Long> value) throws Exception {
return value.f0;
}
})
// 求和
.sum(1)
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
運行結(jié)果:
方式三:使用process算子實現(xiàn)
??首先使用readTextFile
讀取數(shù)據(jù),使用map
將讀取到的數(shù)據(jù)封裝為對象,然后使用keyBy
進(jìn)行分組,最后使用process
算子進(jìn)行求解
- 為什么要使用keyBy():目的是讓pv數(shù)據(jù)進(jìn)入同一個并行度,如果不使用直接process的話,兩個并行度里面都有一個sum,結(jié)果就不對了
- 為什么不使用filter過濾呢?因為我們的過濾邏輯是再process里面完成的,所以不用再額外過濾
public class Flink02_Project_PV_process {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port",1000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(2);
env
.readTextFile("input/UserBehavior.csv")
// 封裝成 UserBehavior 對象
.map(new MapFunction<String, UserBehavior>() {
@Override
public UserBehavior map(String line) throws Exception {
String[] data = line.split(",");
return new UserBehavior(
Long.valueOf(data[0]),
Long.valueOf(data[1]),
Integer.valueOf(data[2]),
data[3],
Long.valueOf(data[4]));
}
})
// 通過key分組
.keyBy(new KeySelector<UserBehavior, String>() {
@Override
public String getKey(UserBehavior value) throws Exception {
return value.getBehavior();
}
})
// 使用proces算子實現(xiàn) PV 的統(tǒng)計
.process(new ProcessFunction<UserBehavior, String>() {
// 定義累加變量
long sum =0L ;
@Override
public void processElement(UserBehavior value, Context ctx, Collector<String> out) throws Exception {
// 判斷用戶行為是否是PV
if ("pv".equals(value.getBehavior())){
// 條件滿足 sum+1
sum++;
// 將結(jié)果收集
out.collect("pv = "+sum);
}
}
})
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
運行結(jié)果:
方式四:使用process算子實現(xiàn)
??方式四相比于方式三省去了對象的封裝,其他思路一樣。
public class Flink02_Project_PV_process {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port",1000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(2);
env
.readTextFile("input/UserBehavior.csv")
// 通過key分組
.keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String value) throws Exception {
return value.split(",")[3];
}
})
// 直接使用process求 PV
.process(new ProcessFunction<String, String>() {
// 定義累加變量
long sum = 0L;
@Override
public void processElement(String line, Context ctx, Collector<String> out) throws Exception {
// 將過來的每行數(shù)據(jù)切割
String[] datas = line.split(",");
// 判斷是否是我們想要的數(shù)據(jù)
if("pv".equals(datas[3])){
// 符合條件,將累加變量+1
sum++;
// 收集結(jié)果
out.collect("pv = "+sum);
}
}
})
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
運行結(jié)果:
二、網(wǎng)站獨立訪客數(shù)統(tǒng)計 - UV
??UV全稱 Unique Visitor,也就是獨立訪客數(shù)。在PV中,我們統(tǒng)計的是所有用戶對所有頁面的瀏覽行為,也就是同一個用戶的瀏覽行為會被重復(fù)統(tǒng)計。實際上我們關(guān)注的是在某一特定范圍內(nèi)(一天、一周或者一個月)內(nèi)訪問該網(wǎng)站的用戶數(shù),也就是每個訪客只計算一次。它能從側(cè)面反映出該網(wǎng)站的受歡迎程度和用戶規(guī)模的大小。
1. 需求分析
??要統(tǒng)計UV量的話,只需要對全量的PV,使用userId去重,然后就能得到獨立訪客數(shù)了。2. 代碼實現(xiàn)
??先filter
過濾出PV數(shù)據(jù),然后通過keyBy
將PV分到同一組,然后使用process
進(jìn)行處理,處理方法是:用set集合存放userId,如果下一個userId可以加入該集合說明是一個新的獨立訪客,則收集當(dāng)前集合的大小,若加入失敗,說明集合中已經(jīng)存在該userId,也就是不是一個新的獨立訪客,也就不做處理了。文章來源:http://www.zghlxwxcb.cn/news/detail-623517.html
public class Flink03_Project_UV {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port",1000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(2);
env
.readTextFile("input/UserBehavior.csv")
// 過濾出 PV 數(shù)據(jù)
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return "pv".equals(value.split(",")[3]);
}
})
// 將PV的數(shù)據(jù)分到同一個組里面
.keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String value) throws Exception {
return value.split(",")[3];
}
})
// 對同一組里面的數(shù)據(jù)進(jìn)行處理
.process(new ProcessFunction<String, String>() {
// 存放 userId 的容器,回自動對數(shù)據(jù)進(jìn)行去重,最后直接拿它的大小就知道UV了
Set<Long> userIdSet = new HashSet<>();
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
Long userId = Long.valueOf(value.split(",")[0]);
// 向set中添加userId,判斷是否添加成功
if (userIdSet.add(userId)) {
// 添加成功的話,說明是一個新的獨立訪客,收集到此時容器大小
out.collect("UV = "+ userIdSet.size());
}
}
})
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
運行結(jié)果:文章來源地址http://www.zghlxwxcb.cn/news/detail-623517.html
到了這里,關(guān)于基于埋點日志數(shù)據(jù)的網(wǎng)絡(luò)流量統(tǒng)計 - PV、UV的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!