大數(shù)據(jù)學(xué)習(xí)筆記02
Pyspark交互式編程
- 有該數(shù)據(jù)集Data01.txt 該數(shù)據(jù)集包含了某大學(xué)計算機系的成績,數(shù)據(jù)格式如下所示:
Tom,DataBase,80
Tom,Algorithm,50
Jim,DataBase,90
Jim,Algorithm,60......
根據(jù)給定的數(shù)據(jù)集,在pyspark中通過編程來完成以下內(nèi)容:文章來源地址http://www.zghlxwxcb.cn/news/detail-403708.html
- 該系總共有多少學(xué)生; (提前啟動好pyspark)
lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")加載文件
res = lines.map(lambda x:x.split(",")).map(lambda x: x[0]) //獲取每行數(shù)據(jù)的第1列
distinct_res = res.distinct() //去重操作
distinct_res.count()//取元素總個數(shù)
- 該系共開設(shè)了多少門課程;
lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
res = lines.map(lambda x:x.split(",")).map(lambda x:x[1]) //獲取每行數(shù)據(jù)的第2列
distinct_res = res.distinct()//去重操作
distinct_res.count()//取元素總個數(shù)
- Tom同學(xué)的總成績平均分是多少;
lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
res = lines.map(lambda x:x.split(",")).filter(lambda x:x[0]=="Tom") //篩選Tom同學(xué)的成績信息
res.foreach(print)
score = res.map(lambda x:int(x[2])) //提取Tom同學(xué)的每門成績,并轉(zhuǎn)換為int類型
num = res.count() //Tom同學(xué)選課門數(shù)
sum_score = score.reduce(lambda x,y:x+y) //Tom同學(xué)的總成績
avg = sum_score/num // 總成績/門數(shù)=平均分
print(avg)
- 求每名同學(xué)的選修的課程門數(shù);
lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
res = lines.map(lambda x:x.split(",")).map(lambda x:(x[0],1)) //學(xué)生每門課程都對應(yīng)(學(xué)生姓名,1),學(xué)生有n門課程則有n個(學(xué)生姓名,1)
each_res = res.reduceByKey(lambda x,y: x+y) //按學(xué)生姓名獲取每個學(xué)生的選課總數(shù)
each_res.foreach(print)
- 該系DataBase課程共有多少人選修;
lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase")
res.count()
- 各門課程的平均分是多少;
lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
res = lines.map(lambda x:x.split(",")).map(lambda x:(x[1],(int(x[2]),1))) //為每門課程的分數(shù)后面新增一列1,表示1個學(xué)生選擇了該課程。格式如('ComputerNetwork', (44, 1))
temp = res.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])) //按課程名聚合課程總分和選課人數(shù)。格式如('ComputerNetwork', (7370, 142))
avg = temp.map(lambda x:(x[0], round(x[1][0]/x[1][1],2)))//課程總分/選課人數(shù) = 平均分,并利用round(x,2)保留兩位小數(shù)
avg.foreach(print)
- 使用累加器計算共有多少人選了DataBase這門課。
lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase")//篩選出選了DataBase課程的數(shù)據(jù)
accum = sc.accumulator(0) //定義一個從0開始的累加器accum
res.foreach(lambda x:accum.add(1))//遍歷res,每掃描一條數(shù)據(jù),累加器加1
accum.value //輸出累加器的最終值
編寫?yīng)毩?yīng)用程序?qū)崿F(xiàn)數(shù)據(jù)去重
- 對于兩個輸入文件A和B,編寫Spark獨立應(yīng)用程序,對兩個文件進行合并,并剔除其中重復(fù)的內(nèi)容,得到一個新文件C。
- 假設(shè)當(dāng)前目錄為/usr/local/spark/mycode/remdup,該目錄下新建一個remdup.py文件
from pyspark import SparkContext#初始化SparkContext
sc = SparkContext('local','remdup')#加載兩個文件A和B
lines1 = sc.textFile("file:///usr/local/spark/mycode/remdup/A")
lines2 = sc.textFile("file:///usr/local/spark/mycode/remdup/B")#合并兩個文件的內(nèi)容
lines = lines1.union(lines2)#去重操作
distinct_lines = lines.distinct() #排序操作
res = distinct_lines.sortBy(lambda x:x)
#將結(jié)果寫入result文件中,repartition(1)的作用是讓結(jié)果合并到一個文件中,不加的話會結(jié)果寫入到兩個文件
res.repartition(1).saveAsTextFile("file:///usr/local/spark/mycode/remdup/result")
#最后在目錄/usr/local/spark/mycode/remdup下執(zhí)行下面命令執(zhí)行程序(注意執(zhí)行程序時請先退出pyspark shell,否則會出現(xiàn)“地址已在使用”的警告);
編寫?yīng)毩?yīng)用程序?qū)崿F(xiàn)求平均值問題
- 每個輸入文件表示班級學(xué)生某個學(xué)科的成績,每行內(nèi)容由兩個字段組成,第一個是學(xué)生名字,第二個是學(xué)生的成績;編寫Spark獨立應(yīng)用程序求出所有學(xué)生的平均成績,并輸出到一個新文件中。
- 假設(shè)當(dāng)前目錄為/usr/local/spark/mycode/avgscore,在當(dāng)前目錄下新建一個avgscore.py,執(zhí)行如下代碼
from pyspark import SparkContext
sc = SparkContext('local',' avgscore')
lines1 = sc.textFile("file:///usr/local/spark/mycode/avgscore/Algorithm.txt")
lines2 = sc.textFile("file:///usr/local/spark/mycode/avgscore/Database.txt")
lines3 = sc.textFile("file:///usr/local/spark/mycode/avgscore/Python.txt")
lines = lines1.union(lines2).union(lines3)
data = lines.map(lambda x:x.split(" ")).map(lambda x:(x[0],(int(x[1]),1)))
res = data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
result = res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))
result.repartition(1).saveAsTextFile("file:///usr/local/spark/mycode/avgscore/result")
文章來源:http://www.zghlxwxcb.cn/news/detail-403708.html
到了這里,關(guān)于Pyspark交互式編程的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!