?目錄
一. 訓(xùn)練要點(diǎn)
二.需求說明
三.關(guān)鍵實(shí)現(xiàn)思路及步驟
?四、LogCount.scala文件完整代碼實(shí)現(xiàn):
五、運(yùn)行過程與結(jié)果截圖:
??六、具體實(shí)現(xiàn)步驟
?七、相關(guān)知識點(diǎn)?
?1、過濾出訪問次數(shù)在 50 次以上的用戶記錄
?2、統(tǒng)計訪問 50 次以上的用戶主要訪問的前 5 類網(wǎng)頁
?3. 合并部分網(wǎng)頁
?4.根據(jù)訪問時間加入對應(yīng)時段:
實(shí)訓(xùn)題目:競賽網(wǎng)站訪問日志分析
一. 訓(xùn)練要點(diǎn)
(1)搭建Spurk工程環(huán)境。
(2) Spark編程。
(3)通過spark-submit提交應(yīng)用。
二.需求說明
? ? ?某競賽網(wǎng)站每年都會開展數(shù)據(jù)挖據(jù)的競賽,在競賽期間網(wǎng)站會有大量人群訪問,生成了大量的用戶訪向記錄?,F(xiàn)在提供2016年10月到2017年6月的部分脫敏訪問日志數(shù)據(jù)。日志數(shù)據(jù)的基本內(nèi)容如圖所示,僅提供以下6個字段。
屬性名稱 |
屬性解析 |
Id |
序號 |
Content_id |
網(wǎng)頁ID |
Page_path |
網(wǎng)址 |
Userid |
用戶ID |
Sessionid |
緩存生成ID |
Date_time |
訪問時間 |
? ? ?要求根據(jù)提供的用戶訪問日志數(shù)據(jù),利用Spark技術(shù)統(tǒng)計訪向的用戶數(shù)、被訪問的不同網(wǎng)頁個數(shù)以及每月的訪問量,并將結(jié)果保存到HDFS上。
文章所用文檔以及目錄等等說明:
(點(diǎn)擊可免費(fèi)下載)訪問日志數(shù)據(jù):? ??jc_content_viewlog.txt
IDEA內(nèi)實(shí)現(xiàn)代碼存儲路徑與名字:LogCount.scala
? jc_content_viewlog.txt? ?內(nèi)部分?jǐn)?shù)據(jù)如下圖:
三.關(guān)鍵實(shí)現(xiàn)思路及步驟
(1)配置好Spark的IntelliJ IDEA開發(fā)環(huán)境。
(2)啟動IntelliJ IDEA,并進(jìn)行Spark編程。
(3)對訪向記錄中的網(wǎng)頁去重,統(tǒng)計本周期內(nèi)被訪問網(wǎng)頁的個數(shù)。
val logs_all: RDD[Array[String]] = sc.textFile(args(0)).map{_.split(",")}
val wy_log: RDD[String] = logs_all.map(x=>(x(1).toString)).distinct()
val wy_count:RDD[(String, Int)]= wy_log.map(("wy_zs",_)).groupByKey().map(x => (x._1,x._2.size))
?(4) userid為用戶注冊登錄的標(biāo)識,對userid去重,統(tǒng)計登錄用戶的數(shù)量。
val user_log: RDD[String] = logs_all.map(x=>(x(3).toString)).distinct()
val user_count:RDD[(String, Int)]= user_log.map(("user_zs",_)).groupByKey().map(x => (x._1,x._2.size))
?(5)按月統(tǒng)計訪問記錄數(shù)。
val logs_all_new = logs_all.map{x=>(x(0),x(1),x(2),x(3),x(4),x(5),date_time(x(5)))}
val ny_count: RDD[(String, Int)] = logs_all_new.map(x=>(x._7,1)).reduceByKey((a, b)=>a+b)
?(6)將結(jié)果保存到不同文件中。
wy_count.repartition(1).saveAsTextFile(args(1))
user_count.repartition(1).saveAsTextFile(args(2))
ny_count.repartition(1).saveAsTextFile(args(3))
?(7)打包Spark工程,在集群提交應(yīng)用程序。
[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client --class net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3
[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client?? --class?? net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3
注:jc.jar是上面文件生成的jar包改名并上傳而來;
hdfs://node1:8020/user/root/jc_content_viewlog.txt? 是hdfs里面jc_content_viewlog.txt存儲路徑,也需要自己上傳,目錄自己決定;
hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3? 是設(shè)置它的輸出存儲路徑,因?yàn)闀敵鋈齻€不同數(shù)據(jù),需要三個目錄,不然會報錯。
?四、LogCount.scala文件完整代碼實(shí)現(xiàn):
package net
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object LogCount {
def main(args: Array[String]): Unit = {
if(args.length < 2){
println("請指定input和output")
System.exit(1)//非0表示非正常退出程序
}
//TODO 1.env/準(zhǔn)備sc/SparkContext/Spark上下文執(zhí)行環(huán)境
val conf: SparkConf = new SparkConf().setAppName("wc")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
//TODO 2.source/讀取數(shù)據(jù)
//RDD:A Resilient Distributed Dataset (RDD):彈性分布式數(shù)據(jù)集,簡單理解為分布式集合!使用起來和普通集合一樣簡單!
//RDD[就是一行行的數(shù)據(jù)]
val logs_all: RDD[Array[String]] = sc.textFile(args(0)).map{_.split(",")}
//TODO 3.transformation/數(shù)據(jù)操作/轉(zhuǎn)換
//對訪問記錄中的網(wǎng)頁去重,統(tǒng)計本周期內(nèi)被訪問網(wǎng)頁的個數(shù)
val wy_log: RDD[String] = logs_all.map(x=>(x(1).toString)).distinct()
val wy_count:RDD[(String, Int)]= wy_log.map(("wy_zs",_)).groupByKey().map(x => (x._1,x._2.size))
//userid為用戶注冊登錄的標(biāo)識,對userid去重,統(tǒng)計登錄用戶的數(shù)量
val user_log: RDD[String] = logs_all.map(x=>(x(3).toString)).distinct()
val user_count:RDD[(String, Int)]= user_log.map(("user_zs",_)).groupByKey().map(x => (x._1,x._2.size))
//按月統(tǒng)計訪問記錄數(shù)
val logs_all_new = logs_all.map{x=>(x(0),x(1),x(2),x(3),x(4),x(5),date_time(x(5)))}
val ny_count: RDD[(String, Int)] = logs_all_new.map(x=>(x._7,1)).reduceByKey((a, b)=>a+b)
//TODO 4.sink/輸出
//輸出到指定path(可以是文件/夾)
wy_count.repartition(1).saveAsTextFile(args(1))
user_count.repartition(1).saveAsTextFile(args(2))
ny_count.repartition(1).saveAsTextFile(args(3))
//為了便于查看Web-UI可以讓程序睡一會
Thread.sleep(1000 * 60)
//TODO 5.關(guān)閉資源
sc.stop()
}
//獲取年月,時間段作為輸入?yún)?shù)
def date_time(date:String):String={
val nianye =date.trim.substring(0,7)
nianye
}
}
五、運(yùn)行過程與結(jié)果截圖:
?
?六、具體實(shí)現(xiàn)步驟
1、修改打包好的jar名字,并把jar上傳到node1結(jié)點(diǎn)
2、開啟一系列集群:
start-dfs.sh? ?//一鍵開啟
start-yarn.sh? //開啟
cd /myserver/
?mr-jobhistory-daemon.sh start historyserver
?/myserver/spark301/sbin/start-history-server.sh
?jps? //查看
這里不再具體說明如何開啟。
3、上傳jc_content_viewlog.txt到node1節(jié)點(diǎn),并上傳到hdfs
?
[root@node1 ~]# hdfs dfs -put jc_content_viewlog.txt? /user/root/
?4、在集群提交應(yīng)用程序
[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client?? --class?? net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3
?七、相關(guān)知識點(diǎn)?
進(jìn)入spark-shell
[root@node1 bin]# /myserver/spark301/bin/spark-shell
?1、過濾出訪問次數(shù)在 50 次以上的用戶記錄
(1)統(tǒng)計用戶訪問次數(shù)并篩選出訪問次數(shù)在50次以上的用戶ID
scala> val data = sc.textFile("hdfs://node1:8020/user/root/jc_content_viewlog.txt").map{x=> x.split(",")}
data: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:24
scala> val userid=data.map(line=>(line(3),1)).reduceByKey((a,b)=>a+b).filter(x=>x._2>50).keys .collect
?(2)根據(jù)過濾后的用戶ID,在原數(shù)據(jù)中篩選出這一部分用戶的訪問記錄
scala> val valib_data=data.filter(x=>userid.contains(x(3)))
valib_data: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[7] at filter at <console>:27
scala> valib_data.take(2)? ?//查看
res1: Array[Array[String]] = Array(Array(480343, 611, /jingsa/611.jhtml, 1, 69463B3F2728DBEB045A5C31CA9C2E3A, 2017-03-01 09:56:49), Array(480358, 611, /jingsa/611.jhtml, 1, 69463B3F2728DBEB045A5C31CA9C2E3A, 2017-03-01 09:58:50))
?2、統(tǒng)計訪問 50 次以上的用戶主要訪問的前 5 類網(wǎng)頁
?scala> val web = valib_data.map(x=>(x(2),1)).reduceByKey((a,b)=>a+b).sortBy(x=>x._2,false)
web: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at sortBy at <console>:25
scala> web.take(5)
res2: Array[(String, Int)] = Array((/jingsa/1030.jhtml,67899), (/view/contentViewLog.jspx,5008), (/jingsa/712.jhtml,2551), (/youxiuzuopin/823.jhtml,1212), (/jingsa/613.jhtml,968))
?3. 合并部分網(wǎng)頁
(URL 后面帶有_1、_2 字樣的翻頁網(wǎng)址,統(tǒng)一為一個網(wǎng)址)通過字符串截取的方法,對網(wǎng)頁網(wǎng)址字符串進(jìn)行截取,只截取“_”前面的字符串
?scala> val data2=data.filter(_.length>=6).map{
? ? x=>
????? var page="";
????? if(x(2).contains("_"))
??????? { page=x(2).substring(0,x(2).lastIndexOf("_")) }
????? else
????? ??{ page=x(2) };
????? (x(0),x(1),page,x(3),x(4),x(5))
????? }
data2: org.apache.spark.rdd.RDD[(String, String, String, String, String, String)] = MapPartitionsRDD[14] at map at <console>:25
?4.根據(jù)訪問時間加入對應(yīng)時段:
6:30~11:30 為上午,11:30~14:00 為中午,14:00~17:30為下午,17:30~19:00 為傍晚,19:00~23:00 為晚上,23:00~6:30 為深夜,統(tǒng)計所有用戶各時段訪問情況
(1)首先定義一個函數(shù),用于匹配時間段并返回相應(yīng)的字段值
scala> def date_time(date:String):String={
val hour=date.substring(date.indexOf(" ")+1,date.indexOf(":")).toInt
val min=date.substring(date.indexOf(":")+1,date.lastIndexOf(":")).toInt
if(hour<6 && hour>=23) "深夜"
else if(hour==6 && min<=30) "深夜"
else if(hour<11 && hour>=6) "上午"
else if(hour==11 && min<=30) "上午"
else if(hour<14 && hour>=11) "中午"
else if(hour>=14 && hour<17) "下午"
else if(hour==17 && hour<=30) "下午"
else if(hour>=17 && hour<19) "傍晚"
else if(hour==19 && min<=30) "傍晚"
else "晚上"
}
date_time: (date: String)String
(2)通過map方法對每一條記錄的時間進(jìn)行匹配,增加一個時間段的值到記錄中
?scala> val data_new = data2.map{x=>(x._1,x._2,x._3,x._4,x._5,x._6,date_time(x._6))}
data_new: org.apache.spark.rdd.RDD[(String, String, String, String, String, String, String)] = MapPartitionsRDD[17] at map at <console>:27
(3)將時段值作為鍵,值為1,利用reduceByKey的方法統(tǒng)計各時段訪問情況?
scala> val date_count = data_new.map(x=>(x._7,1)).reduceByKey((a,b)=>a+b)
date_count: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:25
scala> date_count.take(10)文章來源:http://www.zghlxwxcb.cn/news/detail-430718.html
res3: Array[(String, Int)] = Array((上午,31675), (傍晚,14511), (中午,18799), (下午,39720), (深夜,81), (晚上,67073))文章來源地址http://www.zghlxwxcb.cn/news/detail-430718.html
到了這里,關(guān)于【Spark實(shí)訓(xùn)】--競賽網(wǎng)站訪問日志分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!