一、【SparkCore篇】項目實(shí)戰(zhàn)—電商用戶行為分析
前言:數(shù)據(jù)準(zhǔn)備
我們看看在實(shí)際的工作中如何使用這些 API 實(shí)現(xiàn)具體的需求。這些需求是電商網(wǎng)站的真實(shí)需求,所以在實(shí)現(xiàn)功能前,咱們必須先將數(shù)據(jù)準(zhǔn)備好。
上面的數(shù)據(jù)圖是從數(shù)據(jù)文件中截取的一部分內(nèi)容,表示為電商網(wǎng)站的用戶行為數(shù)據(jù),主要包含用戶的 4 種行為:搜索,點(diǎn)擊,下單,支付。
1、數(shù)據(jù)規(guī)則如下:
- 數(shù)據(jù)文件中每行數(shù)據(jù)采用下劃線分隔數(shù)據(jù)
- 每一行數(shù)據(jù)表示用戶的一次行為,這個行為只能是 4 種行為的一種
- 如果搜索關(guān)鍵字為 null,表示數(shù)據(jù)不是搜索數(shù)據(jù)
- 如果點(diǎn)擊的品類 ID 和產(chǎn)品 ID 為-1,表示數(shù)據(jù)不是點(diǎn)擊數(shù)據(jù)
- 針對于下單行為,一次可以下單多個商品,所以品類 ID 和產(chǎn)品 ID 可以是多個,id 之間采用逗號分隔,如果本次不是下單行為,則數(shù)據(jù)采用 null 表示
- 支付行為和下單行為類似
2、詳細(xì)字段說明:
3、樣例類
//用戶訪問動作表
case class UserVisitAction(
date: String,//用戶點(diǎn)擊行為的日期
user_id: Long,//用戶的 ID
session_id: String,//Session 的 ID
page_id: Long,//某個頁面的 ID
action_time: String,//動作的時間點(diǎn)
search_keyword: String,//用戶搜索的關(guān)鍵詞
click_category_id: Long,//某一個商品品類的 ID
click_product_id: Long,//某一個商品的 ID
order_category_ids: String,//一次訂單中所有品類的 ID 集合
order_product_ids: String,//一次訂單中所有商品的 ID 集合
pay_category_ids: String,//一次支付中所有品類的 ID 集合
pay_product_ids: String,//一次支付中所有商品的 ID 集合
city_id: Long
)//城市 id
(一)需求1:TOP10熱門品類
1、需求說明
不同的公司可能對熱門的定義不一樣。我們按照每個品類的點(diǎn)擊、下單、支付的量來統(tǒng)計熱門品類。
本項目需求優(yōu)化為:先按照點(diǎn)擊數(shù)排名,靠前的就排名高;如果點(diǎn)擊數(shù)相同,再比較下單數(shù);下單數(shù)再相同,就比較支付數(shù)。
2、代碼實(shí)現(xiàn)方案1
package req
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_req01_HotCategeryTop10 {
def main(args: Array[String]): Unit = {
//TODO 熱門品類
val sparkconf: SparkConf = new SparkConf().setAppName("hotCategery").setMaster("local[*]")
val sc: SparkContext = new SparkContext(sparkconf)
//1、讀取原始日志數(shù)據(jù)
val actionRDD: RDD[String] = sc.textFile("data/user_visit_action.txt")
actionRDD.cache() //調(diào)用緩存
//2、統(tǒng)計品類的點(diǎn)擊數(shù)量:(品類id,點(diǎn)擊數(shù)量)
val clickActionRDD: RDD[String] = actionRDD.filter(
action => {
val datas = action.split("_")
datas(6) != "-1"
}
)
val clickcountRDD: RDD[(String, Int)] = clickActionRDD.map(
action => {
val datas = action.split("_")
(datas(6), 1)
}
).reduceByKey(_ + _)
clickcountRDD.sortBy(_._2, false).take(10).foreach(println)
//3、統(tǒng)計品類的下單數(shù)量:(品類id,下單數(shù)量)
val orderActionRDD: RDD[String] = actionRDD.filter(
action => {
val datas = action.split("_")
datas(8) != "null" //下單不為null值
}
)
//orderid=>1,2,3 【(1,1),(2,1),(3,1)】
//1個order拆分成多個商品
val ordercountRdd: RDD[(String, Int)] = orderActionRDD.flatMap(
action => {
val datas = action.split("_")
val cid: String = datas(8)
val cids: Array[String] = cid.split(",")
cids.map(id => (id, 1))
}
).reduceByKey(_ + _)
ordercountRdd.sortBy(_._2).take(10).foreach(println)
//4、統(tǒng)計品類的支付數(shù)量:(品類id,支付數(shù)量)
val payActionRDD: RDD[String] = actionRDD.filter(
action => {
val datas = action.split("_")
datas(10) != "null" //下單不為null值
}
)
//orderid=>1,2,3 【(1,1),(2,1),(3,1)】
val paycountRdd: RDD[(String, Int)] = payActionRDD.flatMap(
action => {
val datas = action.split("_")
val cid: String = datas(10)
val cids: Array[String] = cid.split(",")
cids.map(id => (id, 1))
}
).reduceByKey(_ + _)
paycountRdd.sortBy(_._2).take(10).foreach(println)
//5、將品類進(jìn)行排序,并且取前10名
// 點(diǎn)擊數(shù)量排序,下單數(shù)量排序,支付數(shù)量排序 => 使用元組排序:先比較第1個,再比較第2個,再比較第3個
// (品類ID,(點(diǎn)擊數(shù)量,下單數(shù)量,支付數(shù)量)),后面的括號構(gòu)成一個元組
//join:從原則上,點(diǎn)擊、下單、支付并非一定存在,會出現(xiàn)一些商品點(diǎn)擊數(shù)很多,但是沒有支付的情況,所以不用join
//leftoutjoin:有些商品可能沒有瀏覽頁點(diǎn)擊,直接通過下單進(jìn)入,所以leftoutjoin也不合適
// cogroup=connect+group
val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] =
clickcountRDD.cogroup(ordercountRdd, paycountRdd)
val analysisRDD = cogroupRDD.mapValues {
case (clickIter, orderIter, payIter) => {
var clickcnt = 0
val iter1 = clickIter.iterator //Iterator(迭代器)it
if (iter1.hasNext) { //it.hasNext() 用于檢測集合中是否還有元素
clickcnt = iter1.next() //it.next() 會返回迭代器的下一個元素
}
var ordercnt = 0
val iter2 = orderIter.iterator
if (iter2.hasNext) {
ordercnt = iter2.next()
}
var paycnt = 0
val iter3 = payIter.iterator
if (iter3.hasNext) {
paycnt = iter3.next()
}
(clickcnt, ordercnt, paycnt)
}
}
val resultRDD: Array[(String, (Int, Int, Int))] = analysisRDD.sortBy(_._2, false).take(10)
//6、將結(jié)果采集到控制臺打印出來
resultRDD.foreach(println)
sc.stop()
}
}
(二)需求2:TOP10熱門品類中每個品類的TOP10活躍Session統(tǒng)計
1、需求說明
在需求1的基礎(chǔ)上,增加每個品類用戶session的點(diǎn)擊統(tǒng)計文章來源:http://www.zghlxwxcb.cn/news/detail-480810.html
2、需求分析
- 過濾原始數(shù)據(jù),保留點(diǎn)擊和前10品類ID
- 根據(jù)品類ID和sessionID進(jìn)行點(diǎn)擊量的統(tǒng)計
- 將統(tǒng)計結(jié)果進(jìn)行結(jié)構(gòu)轉(zhuǎn)換:((品類ID,sessionID),sum)=> (品類ID,(sessionID,sum))
- 相同品類進(jìn)行分組groupByKey
- 將分組后的數(shù)據(jù)進(jìn)行點(diǎn)擊量的排序,取前10名
3、代碼實(shí)現(xiàn)
package req
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_req02_HotTop10Session {
def main(args: Array[String]): Unit = {
//TODO 熱門品類
val sparkconf: SparkConf = new SparkConf().setAppName("hotCategery").setMaster("local[*]")
val sc: SparkContext = new SparkContext(sparkconf)
//1、讀取原始日志數(shù)據(jù)
val actionRDD: RDD[String] = sc.textFile("data/user_visit_action.txt")
actionRDD.cache() //調(diào)用緩存
val top10IDS: Array[String] = top10category(actionRDD)
//1、過濾原始數(shù)據(jù),保留點(diǎn)擊和前10品類ID
val filteractionRDD: RDD[String] = actionRDD.filter(
action => {
val datas: Array[String] = action.split("_")
if (datas(6) != "-1") {
top10IDS.contains(datas(6))
} else {
false
}
}
)
//2、根據(jù)品類ID和sessionID進(jìn)行點(diǎn)擊量的統(tǒng)計
val reduceRDD: RDD[((String, String), Int)] = filteractionRDD.map(
action => {
val datas: Array[String] = action.split("_")
((datas(6), datas(2)), 1)
}
).reduceByKey(_ + _)
//3、將統(tǒng)計結(jié)果進(jìn)行結(jié)構(gòu)轉(zhuǎn)換
//((品類ID,sessionID),sum)=> (品類ID,(sessionID,sum))
val mapRDD: RDD[(String, (String, Int))] = reduceRDD.map {
case ((cid, sid), sum) => {
(cid, (sid, sum))
}
}
//4、相同品類進(jìn)行分組
val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupByKey()
//5、將分組后的數(shù)據(jù)進(jìn)行點(diǎn)擊量的排序,取前10名
val resultRDD: RDD[(String, List[(String, Int)])] = groupRDD.mapValues(
iter => {
iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
}
)
resultRDD.collect().foreach(println)
sc.stop()
}
def top10category(actionRDD: RDD[String]) = {
val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(
action => {
val datas = action.split("_")
if (datas(6) != "-1") {
//點(diǎn)擊的場合
List((datas(6), (1, 0, 0)))
} else if (datas(8) != "null") {
//下單的場合
val ids = datas(8).split(",")
ids.map(id => (id, (0, 1, 0)))
} else if (datas(10) != "null") {
//支付的場合
val ids = datas(10).split(",")
ids.map(id => ((id, (0, 0, 1))))
} else {
Nil
}
}
)
val analysisRDD = flatRDD.reduceByKey(
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
}
)
analysisRDD.sortBy(_._2, false).take(10).map(_._1)
}
}
(三)需求 3:頁面單跳轉(zhuǎn)換率統(tǒng)計
1、需求說明
1)頁面單跳轉(zhuǎn)化率
- 計算頁面單跳轉(zhuǎn)化率,什么是頁面單跳轉(zhuǎn)換率,比如一個用戶在一次 Session 過程中,訪問的頁面路徑 3,5,7,9,10,21,那么頁面 3 跳到頁面 5 叫一次單跳,7-9 也叫一次單跳,那么單跳轉(zhuǎn)化率就是要統(tǒng)計頁面點(diǎn)擊的概率。
- 比如:計算 3-5 的單跳轉(zhuǎn)化率,先獲取符合條件的 Session 對于頁面 3 的訪問次數(shù)(PV) 為 A,然后獲取符合條件的 Session 中訪問了頁面 3 又緊接著訪問了頁面 5 的次數(shù)為 B,那么 B/A 就是 3-5 的頁面單跳轉(zhuǎn)化率。
2)指標(biāo)意義
- 這個指標(biāo)可以用來分析,整個網(wǎng)站,產(chǎn)品,各個頁面的表現(xiàn)怎么樣,是不是需要去優(yōu)化產(chǎn)品的布局;吸引用戶最終可以進(jìn)入最后的支付頁面。
2、需求分析
文章來源地址http://www.zghlxwxcb.cn/news/detail-480810.html
3、功能實(shí)現(xiàn)
package req
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_req03_PageflowAnalysis {
def main(args: Array[String]): Unit = {
//TODO 熱門品類
val sparkconf: SparkConf = new SparkConf().setAppName("hotCategery").setMaster("local[*]")
val sc: SparkContext = new SparkContext(sparkconf)
//讀取原始日志數(shù)據(jù)
val actionRDD: RDD[String] = sc.textFile("data/user_visit_action.txt")
actionRDD.cache() //調(diào)用緩存
val actiondataRDD= actionRDD.map(
action => {
val datas= action.split("_")
UserVisitAction(
datas(0) ,
datas(1).toLong ,
datas(2),
datas(3).toLong ,
datas(4),
datas(5) ,
datas(6).toLong,
datas(7).toLong ,
datas(8),
datas(9),
datas(10),
datas(11),
datas(12).toLong,
)
}
)
//TODO 對指定的頁面連續(xù)跳轉(zhuǎn)進(jìn)行統(tǒng)計;統(tǒng)計頁面1-6的跳轉(zhuǎn)率
// 1-2,2-3,3-4,4-5,5-6,6-7
val ids: List[Int] = List(1, 2, 3, 4, 5, 6, 7)
val okflowids = ids.zip(ids.tail)
//TODO 計算分母
val pageidcount= actiondataRDD.filter(
action => {
ids.init.contains(action.page_id)
}
).map(
action => {
(action.page_id, 1L)
}
).reduceByKey(_ + _).collect().toMap
//TODO 計算分子
//根據(jù)session進(jìn)行分組
val sessionRDD: RDD[(String, Iterable[UserVisitAction])] = actiondataRDD.groupBy(_.session_id)
//分組后,根據(jù)訪問時間進(jìn)行排序(升序)
val mvRDD = sessionRDD.mapValues(
iter => {
val sortList: List[UserVisitAction] = iter.toList.sortBy(_.action_time)
//[1,2,3,4]
//[1,2].[2,3],[3,4] => 滑窗sliding
val flowids: List[Long] = sortList.map(_.page_id)
val pageflowids: List[(Long, Long)] = flowids.zip(flowids.tail)
//將不合法的頁面過濾
pageflowids.filter(
t=> {
okflowids.contains(t)
}
).map(
t => {(t,1)}
)
}
)
//((1,2),1)
val flatRDD: RDD[((Long, Long), Int)] = mvRDD.map(_._2).flatMap(list => list)
//((1,2),1) => ((1,2),sum)
val dataRDD: RDD[((Long, Long), Int)] = flatRDD.reduceByKey(_ + _)
//TODO 計算單跳轉(zhuǎn)換率:分子/分母
dataRDD.foreach{
case((pageid1,pageid2),sum) => {
val lon= pageidcount.getOrElse(pageid1, 0L)
println(s"頁面${pageid1}跳轉(zhuǎn)到頁面${pageid2}單跳轉(zhuǎn)換率為:"+( sum.toDouble / lon))
}
}
sc.stop()
}
case class UserVisitAction(
date: String,//用戶點(diǎn)擊行為的日期
user_id: Long,//用戶的 ID
session_id: String,//Session 的 ID
page_id: Long,//某個頁面的 ID
action_time: String,//動作的時間點(diǎn)
search_keyword: String,//用戶搜索的關(guān)鍵詞
click_category_id: Long,//某一個商品品類的 ID
click_product_id: Long,//某一個商品的 ID
order_category_ids: String,//一次訂單中所有品類的 ID 集合
order_product_ids: String,//一次訂單中所有商品的 ID 集合
pay_category_ids: String,//一次支付中所有品類的 ID 集合
pay_product_ids: String,//一次支付中所有商品的 ID 集合
city_id: Long
)//城市 id
}
到了這里,關(guān)于Spark項目實(shí)戰(zhàn)—電商用戶行為分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!