1. Overview
In practice we often care about how many distinct users have visited a site, so another important traffic metric is the site's number of unique visitors (Unique Visitor, UV). Source: http://www.zghlxwxcb.cn/news/detail-680671.html
2. Data preparation
package com.lyh.flink06;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserBehavior {
    private Long userId;
    private Long itemId;
    private Integer categoryId;
    private String behavior;
    private Long timestamp;
}
3. Approach
With the UserBehavior data source, users can be told apart directly by userId.
Put each userId into a Set and take the size of the set; that size is the site's visitor count.
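The idea above can be sketched in plain Java, without Flink, to show why a Set is enough: adding an ID that is already present leaves the set unchanged, so its size equals the number of distinct visitors. The class name UvSketch, the method countUniqueVisitors, and the sample IDs are hypothetical and used only for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UvSketch {
    // Hypothetical helper: counts distinct user IDs by collecting them into a HashSet.
    public static int countUniqueVisitors(List<Long> userIds) {
        Set<Long> seen = new HashSet<>();
        for (Long id : userIds) {
            seen.add(id); // adding an ID that is already present has no effect
        }
        return seen.size();
    }

    public static void main(String[] args) {
        // Made-up sample: five page views from three distinct users.
        List<Long> views = Arrays.asList(1L, 2L, 1L, 3L, 2L);
        System.out.println("uv:" + countUniqueVisitors(views)); // prints "uv:3"
    }
}
```

Note that this approach keeps every distinct ID in memory, so the set grows with the number of visitors.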
4. Code
package com.lyh.flink06;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashSet;
import java.util.Set;

public class PUcount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env.readTextFile("input/UserBehavior.csv")
                // Parse each CSV line: userId,itemId,categoryId,behavior,timestamp
                .map(line -> {
                    String[] data = line.split(",");
                    return new UserBehavior(
                            Long.valueOf(data[0]),
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2]),
                            data[3],
                            Long.valueOf(data[4])
                    );
                })
                // Keep only page-view records
                .filter(ub -> "pv".equals(ub.getBehavior()))
                // After the filter every record has behavior "pv", so all records share one key
                .keyBy(UserBehavior::getBehavior)
                .process(new KeyedProcessFunction<String, UserBehavior, String>() {
                    // Plain in-memory set, not Flink managed state, so it is not checkpointed.
                    // It works here because there is only one key ("pv").
                    Set<Long> userIdset = new HashSet<>();

                    @Override
                    public void processElement(UserBehavior value,
                                               Context ctx,
                                               Collector<String> out) throws Exception {
                        userIdset.add(value.getUserId());
                        // Emit the running UV count after every record
                        out.collect("uv:" + userIdset.size());
                    }
                }).print();

        env.execute();
    }
}
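Because processElement calls out.collect for every record, the job prints a running count rather than a single total, and a repeat visitor simply re-emits the previous value. The following plain-Java sketch replays that per-record logic outside Flink to make the output shape visible; the class RunningUvSketch, its methods, and the sample IDs are hypothetical and not part of the original job.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RunningUvSketch {
    // Same role as userIdset in the job: every user ID seen so far.
    private final Set<Long> seen = new HashSet<>();

    // Mirrors the body of processElement: add the ID, then report the current set size.
    public String onRecord(long userId) {
        seen.add(userId);
        return "uv:" + seen.size();
    }

    // Hypothetical driver: feeds the IDs in order and collects what would be emitted.
    public static List<String> replay(long[] userIds) {
        RunningUvSketch counter = new RunningUvSketch();
        List<String> emitted = new ArrayList<>();
        for (long id : userIds) {
            emitted.add(counter.onRecord(id));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Made-up sample: user 1 visits twice.
        System.out.println(replay(new long[] {1L, 2L, 1L, 3L}));
        // prints [uv:1, uv:2, uv:2, uv:3]
    }
}
```

The last emitted value is the UV total for the input; if only changes matter, one option would be to emit only when seen.add returns true.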