購物信息分析基于spark
目錄
本案例中三個(gè)文案例中需要處理的文件為 order_goods.txt、products.txt 以及 orders.txt 三個(gè)文件,三個(gè)文件的說明如下
一、本實(shí)訓(xùn)項(xiàng)目針對(duì)實(shí)驗(yàn)數(shù)據(jù)主要完成了哪些處理?
二、Hadoop+Spark集群環(huán)境的搭建步驟有哪些?(只介紹完全分布式集群環(huán)境的搭建)
三、本人在搭建Hadoop+Spark完全分布式集群過程中出現(xiàn)了哪些問題?如何解決的?
四、描述數(shù)據(jù)清理過程過程,最終要得到的數(shù)據(jù)中是對(duì)原數(shù)據(jù)做了哪些處理?
五、計(jì)算商鋪服裝風(fēng)格實(shí)現(xiàn)思路是?最終程序的運(yùn)行結(jié)果是?
??????? 代碼
六、計(jì)算店鋪訂單數(shù)時(shí)如何實(shí)現(xiàn)的?結(jié)果如何?
七、計(jì)算買家服裝風(fēng)格畫像的實(shí)現(xiàn)思路是?最終程序的運(yùn)行結(jié)果是?
八、 消費(fèi)習(xí)慣是如何實(shí)現(xiàn)的?最終的運(yùn)行結(jié)果是?
九 、開發(fā)過程中遇到的技術(shù)難點(diǎn)有哪些?如何解決的?
十、本項(xiàng)目開發(fā)中獲得的經(jīng)驗(yàn)和不足?
本案例中三個(gè)文案例中需要處理的文件為 order_goods.txt、products.txt 以及 orders.txt 三個(gè)文件,三個(gè)文件的說明如下
order_goods.txt
products.txt
orders.txt
一、本實(shí)訓(xùn)項(xiàng)目針對(duì)實(shí)驗(yàn)數(shù)據(jù)主要完成了哪些處理?
1數(shù)據(jù)清洗
1.1需求描述 數(shù)據(jù)清洗--DataClear
1)order_goods.txt 中缺少元素的
2) products.txt 中缺少元素的
1.2 解題思路 案例中需要處理的文件為 order_goods.txt、products.txt,讀取每個(gè)文件并按行處理,使 用 filter 函數(shù)過濾行中個(gè)字段是否為空字符串,如為空字符串則丟掉。
2商戶畫像
2.1 需求描述 計(jì)算店鋪畫像,既合并商戶的所有不同服裝的風(fēng)格
2.2 解題思路
a. 讀取文件并切分
b. 重新排列數(shù)據(jù),商戶 id 為 key,以服裝風(fēng)格為值重新排列
c. 按照商戶 id分組并合并服裝風(fēng)格并且去重
3計(jì)算店鋪訂單數(shù)
3.1 ?需求描述 計(jì)算每個(gè)商鋪的總訂單數(shù)
3.2 解題思路
a. 以商品 id 作為關(guān)聯(lián),將 order_goods 和 product 進(jìn)行 join 操作,操作結(jié)果形成單商品 對(duì)應(yīng)多((用戶 id,商品風(fēng)格),店鋪 id))結(jié)構(gòu), 格式為(商品 id,((用戶 id,商品風(fēng) 格),店鋪 id)));
b. 取出店鋪 id 計(jì)算數(shù)量
4計(jì)算用戶(賣家)畫像
4.1 需求描述 通過買家購買的服裝,計(jì)算買家穿衣風(fēng)格畫像
4.2. 解題思路
a.讀取 order_goods 和 products,以商品 id 為 key 進(jìn)行 join 操作,生成數(shù)據(jù)結(jié)構(gòu)為(商 品 id,(用戶 id,服裝風(fēng)格))
b. 取出(用戶 id,服裝風(fēng)格)進(jìn)行 groupByKey(或 reduceByKey,建議 reduceByKey)操作, 將用戶購買服裝風(fēng)格進(jìn)行連接,
c. 去除掉結(jié)果數(shù)據(jù)中重復(fù)的風(fēng)格
5消費(fèi)習(xí)慣
5.1 需求描述 取每個(gè) uid 下訂單最多的那一天,并判斷是周末還是工作日(即提取用戶是喜歡在周末購物 還是工作日購物特征)
5.2 解題思路
a.讀取 orders.txt,根據(jù)毫秒值計(jì)算星期幾
b.按照整條記錄分組,也就是將用戶名和周 X 整個(gè)作為分組條件
c.分組后,t._2 默認(rèn)是 CompactBuffer 類型,將其轉(zhuǎn)換為 List 然后計(jì)算其元素?cái)?shù)量,就是 這個(gè)用戶在周 X
二、Hadoop+Spark集群環(huán)境的搭建步驟有哪些?(只介紹完全分布式集群環(huán)境的搭建)
1.安裝虛擬機(jī)
?????
2設(shè)置網(wǎng)絡(luò)
???????
3主機(jī)名和ip映射
???????
4上傳安裝包并解壓安裝
5設(shè)置環(huán)境變量
???????
6啟動(dòng)hadoop
???????
7使用jps 顯示進(jìn)程??? Hadoop的hdfs和yarn成功啟動(dòng)
8上傳spark安裝包并且安裝 spark本地模式安裝完畢
三、本人在搭建Hadoop+Spark完全分布式集群過程中出現(xiàn)了哪些問題?如何解決的?
1.經(jīng)常直接關(guān)閉虛擬機(jī)損壞了hdfs文件,導(dǎo)致進(jìn)入安全模式
刪除所有hdfs受損文件或者強(qiáng)制退出安全模式
2.清理電腦內(nèi)存時(shí),不小心刪除了鏡像文件,導(dǎo)致虛擬機(jī)全部崩潰
重新下載鏡像文件,并且重置網(wǎng)絡(luò)設(shè)置
3.Hdfs的從節(jié)點(diǎn)中有一個(gè)datanode不啟動(dòng)
關(guān)閉集群刪除從節(jié)點(diǎn)所有日志和文件,主節(jié)點(diǎn)重新格式化hdfs,啟動(dòng)集群
4.集群中的hdfs datanode節(jié)點(diǎn)容易崩潰
重置了網(wǎng)絡(luò),改用手機(jī)熱點(diǎn),比以前穩(wěn)點(diǎn)了許多
四、描述數(shù)據(jù)清理過程過程,最終要得到的數(shù)據(jù)中是對(duì)原數(shù)據(jù)做了哪些處理?
1讀取文件中的數(shù)據(jù)構(gòu)建RDD,然后把其中的每個(gè)數(shù)據(jù)按照\t進(jìn)行分割構(gòu)成數(shù)組,然后遍歷數(shù)組中的數(shù)據(jù)如果有空數(shù)據(jù)就不寫入RDD,數(shù)據(jù)齊全寫入RDD ??
兩個(gè)數(shù)據(jù)清洗類似,就不做過多介紹。
代碼如下
import org.apache.spark.{SparkConf, SparkContext}
object DataClean {
def main(args: Array[String]): Unit = {
//第一步 數(shù)據(jù)清洗 order_goods.txt、products.txt 買家和賣家信息
val Conf = new SparkConf().setMaster("local").setAppName("DataClean")
val sc = new SparkContext(Conf)
val data1 = sc.textFile("data/products.txt")
data1.filter(num => {
//將數(shù)據(jù)按行分割
val lines = num.split("\t")
var judge = true
//遍歷數(shù)組元素
for (s <- lines) {
if (s.isEmpty) {
judge = false
}
}
//返回boolean類型
judge
}).saveAsTextFile("data/product_clean1")
val data2 = sc.textFile("data/order_goods.txt")
data2.filter(num => {
//將數(shù)據(jù)按行分割
val lines = num.split("\t")
var judge = true
//遍歷數(shù)組元素
for (s <- lines) {
if (s.isEmpty) {
judge = false
}
}
//返回boolean類型
judge
}).saveAsTextFile("data/order_good_clean1")
sc.stop()
}
}
2運(yùn)行結(jié)果
五、計(jì)算商鋪服裝風(fēng)格實(shí)現(xiàn)思路是?最終程序的運(yùn)行結(jié)果是?
??????? 代碼
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CompactStyle {
def main(args: Array[String]): Unit = {
//第二步 商家畫像 將商戶所有的風(fēng)格進(jìn)行合并
val Conf = new SparkConf().setMaster("local").setAppName("CompactStyle")
val sc = new SparkContext(Conf)
val value = sc.textFile("data/product_clean1/part-00000")
.map(line => line.split("\t"))
.map(kv => (kv(2), kv(1)))
.reduceByKey(_ + _)
.mapValues(v => {
val strings: Array[String] = v.split(";")
var list = List[String]()
for(values<-strings){
if(!list.contains(values)){
list=values::list
}
}
list.mkString(";")
})
.map(t => t._1 + "\t" + t._2)
.saveAsTextFile("data/product_compact2")
}
}
1讀入數(shù)據(jù)后,將數(shù)據(jù)進(jìn)行切分,然后將店鋪id和風(fēng)格映射成(店鋪id,風(fēng)格)的元組
???????
2運(yùn)用reduceBykey將數(shù)據(jù)進(jìn)行累加
???????
3但是我們不難發(fā)現(xiàn)第一行數(shù)據(jù)中風(fēng)格有重復(fù),所以我們要去重,運(yùn)用mapvalues將數(shù)據(jù)中的風(fēng)格生成list,遍歷通過是否包含來達(dá)到去重的目的,結(jié)果如下也是最終結(jié)果。
??????? ???????
六、計(jì)算店鋪訂單數(shù)時(shí)如何實(shí)現(xiàn)的?結(jié)果如何?
??????? 代碼
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object OrdersCalculate {
def main(args: Array[String]): Unit = {
val Conf = new SparkConf().setMaster("local").setAppName("Calculate")
val sc = new SparkContext(Conf)
val data1: RDD[String] = sc.textFile("data/order_good_clean1/part-00000")
val data2: RDD[String] = sc.textFile("data/product_clean1/part-00000")
val data1_split: RDD[Array[String]] = data1.map(line => line.split("\t"))
val data2_split: RDD[Array[String]] = data2.map(line => line.split("\t"))
val rd1: RDD[(String, String)] = data1_split.map(v => (v(1), v(0))) //(商品id,用戶id)
val rd2: RDD[(String, (String, String))] = data2_split.map(v => (v(0), (v(1), v(2)))) //(商品id,(風(fēng)格,店鋪id))
val rd3: RDD[(String, (String, (String, String)))] = rd1.join(rd2)
val rd4: RDD[(String, ((String, String), String))] = rd3.map(v => (v._1, ((v._2._1, v._2._2._1), v._2._2._2)))
val rd5: RDD[(String, Int)] = rd4.map(v => (v._2._2, 1))
val rd6: RDD[(String, Int)] = rd5.reduceByKey(_ + _)
//rd6.collect().foreach(println)
val data3: RDD[String] = sc.textFile("data/product_compact2")
val rdd1: RDD[Array[String]] = data3.map(line => line.split("\t"))
val rdd2: RDD[(String, String)] = rdd1.map(kv => (kv(0), kv(1)))
val rdd3: RDD[(String, (Int, String))] = rd6.join(rdd2)
rdd3.collect().foreach(println)
val rd7: RDD[(String, (Int, String))] = rdd3.sortBy(_._2._1, false)
//rd7.collect().foreach(println)
val rd8: RDD[String] = rd7.map(v => v._1 + "\t" + v._2._1 + "\t" + v._2._2)
rd8.saveAsTextFile("data/store_number3")
sc.stop()
}
}
1讀取product和order清洗過的數(shù)據(jù)并且進(jìn)行映射,兩者的key都用商品id為接下來的join操作做準(zhǔn)備
(商品id,用戶id)
???????
(商品id,(風(fēng)格,店鋪id))
3通過join鏈接兩個(gè)rdd,相同商品id的會(huì)鏈接在一起 結(jié)果如下圖1
??然后通過map變成題目要求中rdd
(商品 id,((用戶 id,商品風(fēng) 格),店鋪 id)))如下圖2
???????
?
4取出店鋪id,通過map操作將其映射為(店鋪id,1),然后通過reducebykey累加,計(jì)算該店鋪的訂單但數(shù)量,至此我們得到了(店鋪id,數(shù)量)的rdd。
???????
5我們一定得到了店鋪的訂單數(shù)量的一個(gè)rdd,假設(shè)我們是一個(gè)大數(shù)據(jù)推薦系統(tǒng)的話,我們肯定會(huì)把訂單數(shù)量最高的推薦給用戶,亦或者把用戶所需要的風(fēng)格銷量最高的店鋪或者商品推薦給用戶,因?yàn)橹拔覀冏鲞^商戶畫像,所以這里直接拿它的結(jié)果來用,進(jìn)行一個(gè)jion操作,并且進(jìn)行排序。
畫像結(jié)果如下
???????
通過店鋪id join的結(jié)果為(店鋪id,(訂單數(shù),風(fēng)格))如下
???????
通過訂單數(shù)進(jìn)行排序得到的最終結(jié)果就是如下
???????
通過此圖可以看出,如果用戶趨向于買某個(gè)風(fēng)格的衣服,可以從上而下進(jìn)行篩選推薦,如果我想買ol的衣服,就會(huì)推薦79,531,721,357店鋪,更加立體顯明。
七、計(jì)算買家服裝風(fēng)格畫像的實(shí)現(xiàn)思路是?最終程序的運(yùn)行結(jié)果是?
代碼如下
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object OrderStyle {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local").setAppName("OrderStyle")
val sc = new SparkContext(sparkConf)
val data1: RDD[(String,String)] = sc.textFile("data/order_good_clean1/part-00000")
.map(v => v.split("\t"))
.map(v => (v(1),v(0))) //商品id,用戶id
val data2: RDD[(String, String)] = sc.textFile("data/product_clean1/part-00000")
.map(v => v.split("\t"))
.map(v => (v(0), v(1))) //商品id,風(fēng)格
val data_join: RDD[(String, (String, String))] = data1.join(data2)
//data_join.collect().foreach(println)
val rd1: RDD[(String, String)] = data_join.map(v => (v._2._1, v._2._2))
//rd1.collect().foreach(println)
val rd2: RDD[(String, String)] = rd1.reduceByKey(_ + _)
rd2.collect().foreach(println)
val rd3: RDD[(String, String)] = rd2.mapValues(v=>{
val strings: Array[String] = v.split(";")
var list = List[String]()
for(values<-strings){
if(!list.contains(values)){
list=values::list
}
}
list.mkString(";")
})
rd3.collect().foreach(println)
//rd3.map(v => v._1+"\t"+v._2).saveAsTextFile("data/orders_style_sta4")
sc.stop()
}
}
1讀取producst和order_goods清洗過的數(shù)據(jù)并且進(jìn)行映射,兩者的key都用商品id為接下來的join操作做準(zhǔn)備
結(jié)果為(商品id,(用戶id,風(fēng)格)如下
???????
2我們可從圖中看出,無論是店鋪還是用戶都是由重復(fù)的,接下來就是要進(jìn)行風(fēng)格的合并,店鋪id對(duì)我們來說已經(jīng)沒有用了,我們要將其舍棄,將其映射為(用戶id,風(fēng)格)
結(jié)果為
???????
3通過用戶id進(jìn)行風(fēng)格累加并且去重
累加結(jié)果為
???????
我們可以看到有很多風(fēng)格重復(fù)
去重結(jié)果
???????
最終結(jié)果就是將其存儲(chǔ) ?如下
???????
八、 消費(fèi)習(xí)慣是如何實(shí)現(xiàn)的?最終的運(yùn)行結(jié)果是?
代碼如下
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import java.util.{Calendar, Date}
object BuyTime {
def main(args: Array[String]): Unit = {
def weekArr(i: Int):Int ={
var a = i
if(a==1) {
a = 7
}else{
a -=1
}
a
}
val sparkConf = new SparkConf().setMaster("local").setAppName("BuyTime")
sparkConf.set("spark.default.parallelism", "1")
val sc = new SparkContext(sparkConf)
val data: RDD[String] = sc.textFile("data/orders.txt")
val datas: RDD[Array[String]] = data.map(_.split("\t"))
val value_date: RDD[(String, Int)] = datas.map(d => {
val date = new Date(d(1).toLong)
val cal = Calendar.getInstance()
cal.setTime(date)
val date_week = weekArr(cal.get(Calendar.DAY_OF_WEEK))
(d(0),date_week)
})
//value_date.collect().foreach(println)
val week_gp: RDD[((String, Int), Iterable[(String, Int)])] = value_date.groupBy(week => week)
//week_gp.collect().foreach(println)
val week_tj: RDD[((String, Int), Int)] = week_gp.map(week => (week._1, week._2.toList.count(num => num != null)))
week_tj.collect().foreach(println)
val week_px: RDD[((String, Int), Int)] = week_tj.sortBy({
t => t._2
},false)
.sortBy({
t => t._1._2
},true)
week_px.map(k => k._1._1+"\t"+k._1._2+"\t"+k._2)
.saveAsTextFile("data/shopping_habit5")
sc.stop()
}
}
1讀入數(shù)據(jù)拆分,首先要對(duì)購買日期進(jìn)行處理,將毫秒值轉(zhuǎn)換成星期,因?yàn)橹型馕幕町?,國外一周的第一天是周六,所以有編寫了函?shù)date_week()將其轉(zhuǎn)化成中式的
并且將其映射成(用戶id,周幾)
???????
2通過groupby分組 CompactBuffer的長度就是購買次數(shù)
???????
3通過map操作構(gòu)成((用戶id,周幾),次數(shù))
???????
4根據(jù)購買次數(shù)排序,根據(jù)周數(shù)排序要將同一周的排在一起
結(jié)果如下
???????
九 、開發(fā)過程中遇到的技術(shù)難點(diǎn)有哪些?如何解決的?
1.針對(duì)同一店鋪或者同一賣家,進(jìn)行風(fēng)格累加時(shí),會(huì)造成風(fēng)格重復(fù)問題
解決:運(yùn)用mapvalues針對(duì)值進(jìn)行操作,按照‘;’進(jìn)行分割,遍歷通過是否包含來達(dá)到去重的目的。
2.join問題,不理解這個(gè)join操作認(rèn)為這一步?jīng)]有用
解決:仔細(xì)發(fā)現(xiàn)我們所需要的數(shù)據(jù)在兩個(gè)文件中,只能通過join操作鏈接,這個(gè)操作類似于數(shù)據(jù)空中的鏈接操作。
3.排序問題(最后一步的)
解決:如果用兩個(gè)單獨(dú)的排序達(dá)不到想要的結(jié)果,只有連續(xù)的排序才可以,就是先安周數(shù)排序,再組內(nèi)排序。
十、本項(xiàng)目開發(fā)中獲得的經(jīng)驗(yàn)和不足?
??????? 經(jīng)驗(yàn):親身經(jīng)歷了從數(shù)據(jù)清洗到結(jié)果輸出的過程,越發(fā)理解大數(shù)據(jù)的內(nèi)涵,把大量的,雜亂的數(shù)據(jù)通過spark技術(shù)得出相應(yīng)的更加立體的數(shù)據(jù)結(jié)果的時(shí)候,才真正發(fā)現(xiàn)數(shù)據(jù)的價(jià)值,更能通過這些數(shù)據(jù)結(jié)果對(duì)未來進(jìn)行預(yù)測(cè),是一件非常有成就感的事情,借此希望學(xué)習(xí)更多的大數(shù)據(jù)技術(shù)。文章來源:http://www.zghlxwxcb.cn/news/detail-402391.html
??????? 不足:發(fā)現(xiàn)自己處理數(shù)據(jù)的思路單一,不連貫,不夠抽象化,可能還是接觸項(xiàng)目太少吧,希望以后浪潮能夠提供更多的項(xiàng)目案例,提升實(shí)戰(zhàn)經(jīng)驗(yàn)提高自我。文章來源地址http://www.zghlxwxcb.cn/news/detail-402391.html
到了這里,關(guān)于【spark大數(shù)據(jù)】spark大數(shù)據(jù)處理技術(shù)入門項(xiàng)目--購物信息分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!