零、本節(jié)學習目標
- 利用RDD計算總分與平均分
- 利用RDD統(tǒng)計每日新增用戶
- 利用RDD實現(xiàn)分組排行榜
一、利用RDD計算總分與平均分
(一)提出任務
- 針對成績表,計算每個學生總分和平均分
?文章來源地址http://www.zghlxwxcb.cn/news/detail-462220.html
(二)實現(xiàn)思路
- 讀取成績文件,生成lines;定義二元組成績列表;遍歷lines,填充二元組成績列表;基于二元組成績列表創(chuàng)建RDD;對rdd按鍵歸約得到rdd1,計算總分;將rdd1映射成rdd2,計算總分與平均分。
(三)準備工作
1、啟動HDFS服務
- 執(zhí)行命令:
start-dfs.sh
2、啟動Spark服務
- 執(zhí)行命令:
start-all.sh
3、在本地創(chuàng)建成績文件
- 在
/home
里創(chuàng)建scores.txt
文件
4、將成績文件上傳到HDFS
- 在HDFS上創(chuàng)建
/scoresumavg/input
目錄,將成績文件上傳至該目錄
?
四)完成任務
1、在Spark Shell里完成任務
(1)讀取成績文件,生成RDD
- 執(zhí)行命令:
val lines = sc.textFile("hdfs://master:9000/scoresumavg/input/scores.txt")
?
(2)定義二元組成績列表
- 執(zhí)行命令:
import scala.collection.mutable.ListBuffer
- 執(zhí)行命令:
val scores = new ListBuffer[(String, Int)]()
?
(3)利用RDD填充二元組成績列表
lines.collect.foreach(line => {
val fields = line.split(" ")
scores.append((fields(0), fields(1).toInt))
scores.append((fields(0), fields(2).toInt))
scores.append((fields(0), fields(3).toInt))
})
scores.foreach(println)
- 執(zhí)行上述代碼
?
(4)基于二元組成績列表創(chuàng)建RDD
- 執(zhí)行命令:
val rdd = sc.makeRDD(scores);
?
?
(5)對rdd按鍵歸約得到rdd1,計算總分
- 執(zhí)行命令:
val rdd1 = rdd.reduceByKey(_ + _)
(6)將rdd1映射成rdd2,計算總分與平均分
- 執(zhí)行命令:
val rdd2 = rdd1.map(score => (score._1, score._2, (score._2 / 3.0).formatted("%.2f")))
?
2、在IntelliJ IDEA里完成任務
(1)打開RDD項目
SparkRDDDemo
(2)創(chuàng)建計算總分平均分對象
- 在
net.cl.rdd
包里創(chuàng)建day07
子包,然后在子包里創(chuàng)建CalculateSumAvg
對象
package net.huawei.rdd.day07
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
/**
* 功能:統(tǒng)計總分與平均分
* 作者:華衛(wèi)
* 日期:2023年05月11日
*/
object CalculateSumAvg {
def main(args: Array[String]): Unit = {
// 創(chuàng)建Spark配置對象
val conf = new SparkConf()
.setAppName("CalculateSumAvg ") // 設置應用名稱
.setMaster("local[*]") // 設置主節(jié)點位置(本地調(diào)試)
// 基于Spark配置對象創(chuàng)建Spark容器
val sc = new SparkContext(conf)
// 讀取成績文件,生成RDD
val lines = sc.textFile("hdfs://master:9000/scoresumavg/input/scores.txt")
// 定義二元組成績列表
val scores = new ListBuffer[(String, Int)]()
// 利用RDD填充二元組成績列表
lines.collect.foreach(line => {
val fields = line.split(" ")
scores.append((fields(0), fields(1).toInt))
scores.append((fields(0), fields(2).toInt))
scores.append((fields(0), fields(3).toInt))
})
// 基于二元組成績列表創(chuàng)建RDD
val rdd = sc.makeRDD(scores);
// 對rdd按鍵歸約得到rdd1,計算總分
val rdd1 = rdd.reduceByKey(_ + _)
// 將rdd1映射成rdd2,計算總分與平均分
val rdd2 = rdd1.map(score => (score._1, score._2, (score._2 / 3.0).formatted("%.2f")))
// 在控制臺輸出rdd2的內(nèi)容
rdd2.collect.foreach(println)
// 將rdd2內(nèi)容保存到HDFS指定位置
rdd2.saveAsTextFile("hdfs://master:9000/scoresumavg/output")
// 關閉Spark容器
sc.stop()
}
}
(3)運行程序,查看結果
- 運行程序
CalculateSumAvg
,控制臺結果
- 查看HDFS的結果文件
?
二、利用RDD統(tǒng)計每日新增用戶
(一)提出任務
- 已知有以下用戶訪問歷史數(shù)據(jù),第一列為用戶訪問網(wǎng)站的日期,第二列為用戶名。
2023-05-01,mike
2023-05-01,alice
2023-05-01,brown
2023-05-02,mike
2023-05-02,alice
2023-05-02,green
2023-05-03,alice
2023-05-03,smith
2023-05-03,brian
?
?
- 現(xiàn)需要根據(jù)上述數(shù)據(jù)統(tǒng)計每日新增的用戶數(shù)量,期望統(tǒng)計結果。
2023-05-01新增用戶數(shù):3
2023-05-02新增用戶數(shù):1
2023-05-03新增用戶數(shù):2
- 即2023-05-01新增了3個用戶(分別為mike、alice、brown),2023-05-02新增了1個用戶(green),2023-05-03新增了兩個用戶(分別為smith、brian)。
(二)實現(xiàn)思路
- 使用倒排索引法,若將用戶名看作關鍵詞,訪問日期看作文檔ID,則用戶名與訪問日期的映射關系如下圖所示。
- 若同一個用戶對應多個訪問日期,則最小的日期為該用戶的注冊日期,即新增日期,其他日期為重復訪問日期,不應統(tǒng)計在內(nèi)。因此每個用戶應該只計算用戶訪問的最小日期即可。如下圖所示,將每個用戶訪問的最小日期都移到第一列,第一列為有效數(shù)據(jù),只統(tǒng)計第一列中每個日期的出現(xiàn)次數(shù),即為對應日期的新增用戶數(shù)。
?
(三)準備工作
1、在本地創(chuàng)建用戶文件
- 在
/home
目錄里創(chuàng)建users.txt
文件
2、將用戶文件上傳到HDFS指定位置
- 先創(chuàng)建
/newusers/input
目錄,再將用戶文件上傳到該目錄
(四)完成任務
1、在Spark Shell里完成任務
(1)讀取文件,得到RDD
- 執(zhí)行命令:
val rdd1 = sc.textFile("hdfs://master:9000/newusers/input/users.txt")
?
(2)倒排,互換RDD中元組的元素順序
val rdd2 = rdd1.map(
line => {
val fields = line.split(",")
(fields(1), fields(0))
}
)
rdd2.collect.foreach(println)
- 執(zhí)行上述語句
?
(3)倒排后的RDD按鍵分組
- 執(zhí)行命令:
val rdd3 = rdd2.groupByKey()
?
(4)取分組后的日期集合最小值,計數(shù)為1
- 執(zhí)行命令:
val rdd4 = rdd3.map(line => (line._2.min, 1))
(5)按鍵計數(shù),得到每日新增用戶數(shù)
- 執(zhí)行命令:
val result = rdd4.countByKey()
- 執(zhí)行命令:
result.keys.foreach(key => println(key + "新增用戶:" + result(key)))
(6)讓輸出結果按日期升序
- 映射不能直接排序,只能讓鍵集轉(zhuǎn)成列表之后先排序,再遍歷鍵集輸出映射
- 執(zhí)行命令:
val keys = result.keys.toList.sorted
,讓鍵集升序排列
?
2、在IntelliJ IDEA里完成任務
(1)打開RDD項目
- SparkRDDDemo
(2)創(chuàng)建統(tǒng)計新增用戶對象
- 在
net.cl.day07
包里創(chuàng)建CountNewUsers
對象
package net.cl.rdd.day07
import org.apache.spark.{SparkConf, SparkContext}
object CountNewUsers {
def main(args: Array[String]): Unit = {
// 創(chuàng)建Spark配置對象
val conf = new SparkConf()
.setAppName("CountNewUsers") // 設置應用名稱
.setMaster("local[*]") // 設置主節(jié)點位置(本地調(diào)試)
// 基于Spark配置對象創(chuàng)建Spark容器
val sc = new SparkContext(conf)
// 讀取文件,得到RDD
val rdd1 = sc.textFile("hdfs://master:9000/newusers/input/users.txt")
// 倒排,互換RDD中元組的元素順序
val rdd2 = rdd1.map(
line => {
val fields = line.split(",")
(fields(1), fields(0))
}
)
// 倒排后的RDD按鍵分組
val rdd3 = rdd2.groupByKey()
// 取分組后的日期集合最小值,計數(shù)為1
val rdd4 = rdd3.map(line => (line._2.min, 1))
// 按鍵計數(shù),得到每日新增用戶數(shù)
val result = rdd4.countByKey()
// 讓統(tǒng)計結果按日期升序
val keys = result.keys.toList.sorted
keys.foreach(key => println(key + "新增用戶:" + result(key)))
// 停止Spark容器
sc.stop()
}
}
(3)運行程序,查看結果
- 運行程序
CountNewUsers
,控制臺結果
?
三、利用RDD實現(xiàn)分組排行榜
(一)提出任務
- 分組求TopN是大數(shù)據(jù)領域常見的需求,主要是根據(jù)數(shù)據(jù)的某一列進行分組,然后將分組后的每一組數(shù)據(jù)按照指定的列進行排序,最后取每一組的前N行數(shù)據(jù)。
- 有一組學生成績數(shù)據(jù)
張三豐 90
李孟達 85
張三豐 87
王曉云 93
李孟達 65
張三豐 76
王曉云 78
李孟達 60
張三豐 94
王曉云 97
李孟達 88
張三豐 80
王曉云 88
李孟達 82
王曉云 98
- 同一個學生有多門成績,現(xiàn)需要計算每個學生分數(shù)最高的前3個成績,期望輸出結果如下所示
張三豐:94 90 87 李孟達:88 85 82 王曉云:98 97 93
(二)實現(xiàn)思路
- 使用Spark RDD的
groupByKey()
算子可以對(key, value)
形式的RDD按照key進行分組,key相同的元素的value將聚合到一起,形成(key, value-list)
,將value-list
中的元素降序排列取前N個即可。
(三)準備工作
1、在本地創(chuàng)建成績文件
- 在
/home
目錄里創(chuàng)建grades.txt
文件
?
2、將成績文件上傳到HDFS上指定目錄
- 將
grades.txt
上傳到HDFS的/topn/input
目錄
?
(四)完成任務
1、在Spark Shell里完成任務
(1)讀取成績文件得到RDD
- 執(zhí)行命令:
val lines = sc.textFile("hdfs://master:9000/topn/input/grades.txt")
- 將
grades.txt
上傳到HDFS的/topn/input
目錄
?
(2)利用映射算子生成二元組構成的RDD
val grades = lines.map(line => {
val fields = line.split(" ")
(fields(0), fields(1))
})
grades.collect.foreach(println)
- 執(zhí)行上述代碼
?
(3)按鍵分組得到新的二元組構成的RDD
- 執(zhí)行命令:
val groupGrades = grades.groupByKey()
?
(4)按值排序,取前三
val top3 = groupGrades.map(item => {
val name = item._1
val top3 = item._2.toList.sortWith(_ > _).take(3)
(name, top3)
})
top3.collect.foreach(println)
- 執(zhí)行上述代碼
?
(5)按指定格式輸出結果
top3.collect.foreach(line => {
val name = line._1
var scores = ""
line._2.foreach(score => scores = scores + " " + score)
println(name + ":" + scores)
})
- 執(zhí)行上述代碼
?
- 其實,代碼可以優(yōu)化
?
2、在IntelliJ IDEA里完成任務
(1)打開RDD項目
SparkRDDDemo
(2)創(chuàng)建分組排行榜單例對象
- 在
net.cl.rdd.day07
包里創(chuàng)建GradeTopN
單例對象
package net.huawei.rdd.day07
import org.apache.spark.{SparkConf, SparkContext}
/**
* 功能:成績分組排行榜
* 作者:華衛(wèi)
* 日期:2023年05月11日
*/
object GradeTopN {
def main(args: Array[String]): Unit = {
// 創(chuàng)建Spark配置對象
val conf = new SparkConf()
.setAppName("GradeTopN") // 設置應用名稱
.setMaster("local[*]") // 設置主節(jié)點位置(本地調(diào)試)
// 基于Spark配置對象創(chuàng)建Spark容器
val sc = new SparkContext(conf)
// 實現(xiàn)分組排行榜
val top3 = sc.textFile("hdfs://master:9000/topn/input/grades.txt")
.map(line => {
val fields = line.split(" ")
(fields(0), fields(1))
}) // 將每行成績映射成二元組(name, grade)
.groupByKey() // 按鍵分組
.map(item => {
val name = item._1
val top3 = item._2.toList.sortWith(_ > _).take(3)
(name, top3)
}) // 值排序,取前三
// 輸出分組排行榜結果
top3.collect.foreach(line => {
val name = line._1
val scores = line._2.mkString(" ")
println(name + ": " + scores)
})
// 停止Spark容器,結束任務
sc.stop()
}
}
(3)運行程序,查看結果
- 在控制臺查看輸出結果
文章來源:http://www.zghlxwxcb.cn/news/detail-462220.html
?
到了這里,關于Spark大數(shù)據(jù)處理講課筆記---Spark RDD典型案例的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!