1、基礎(chǔ)準(zhǔn)備
?pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark
# 導(dǎo)包
from pyspark import SparkConf,SparkContext
#創(chuàng)建SparkConf類對象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
#基于SparkXConf類對象創(chuàng)建SparkContext對象
sc=SparkContext(conf=conf)
#打印PySpark的運行版本
print(sc.version)
#停止SparkContext對象的運行(停止pySpark程序)
sc.stop()
?2、數(shù)據(jù)輸入
from pyspark import SparkConf,SparkContext
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
# 通過parallelize方法將Python對象加載到Spark內(nèi),成為RDD對象
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize((1,2,3,4,5))
rdd3=sc.parallelize("abcdefg")
rdd4=sc.parallelize({1,2,3,4,5})
rdd5=sc.parallelize({"key1":"value1","key2":"value2"})
#如果要查看RDD里面有什么內(nèi)容,需要用collect()方法
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())
sc.stop()
注意:字符串返回的是['a','b','c','d','e','f','g']? ?字典返回的是['key1','key2']??
讀取hello.txt的內(nèi)容:
from pyspark import SparkConf,SparkContext
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
# # 通過parallelize方法將Python對象加載到Spark內(nèi),成為RDD對象
# rdd1=sc.parallelize([1,2,3,4,5])
# rdd2=sc.parallelize((1,2,3,4,5))
# rdd3=sc.parallelize("abcdefg")
# rdd4=sc.parallelize({1,2,3,4,5})
# rdd5=sc.parallelize({"key1":"value1","key2":"value2"})
#
# #如果要查看RDD里面有什么內(nèi)容,需要用collect()方法
# print(rdd1.collect())
# print(rdd2.collect())
# print(rdd3.collect())
# print(rdd4.collect())
# print(rdd5.collect())
#用textFile方法,讀取文件數(shù)據(jù)加載到Spark內(nèi),成為RDD對象
rdd=sc.textFile("C:/Users/GYH/Desktop/data/pyspark_heima/hello.txt")
print(rdd.collect())
sc.stop()
3、數(shù)據(jù)計算-map方法
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/GYH/AppData/Local/Programs/Python/Python310/python.exe" #python解釋器的位置
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
# 準(zhǔn)備一個RDD
rdd=sc.parallelize([1,2,3,4,5])
#通過map方法將全部數(shù)據(jù)都乘以10
def func(data):
return data*10
rdd2=rdd.map(func) #(T) -> U
#(T) -> T
print(rdd2.collect())
#鏈?zhǔn)秸{(diào)用
注意:
import os
os.environ['PYSPARK_PYTHON']="C:/Users/GYH/AppData/Local/Programs/Python/Python310/python.exe" #python解釋器的位置
如果沒有添加上行代碼程序會報出錯誤!
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
?解釋器的位置: (是在電腦中安裝的位置)
代碼中:
def func(data):
return data*10
可以替換成lambda
rdd2=rdd.map(lambda x:x*10)
完整代碼:
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/GYH/AppData/Local/Programs/Python/Python310/python.exe" #python解釋器的位置
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
# 準(zhǔn)備一個RDD
rdd=sc.parallelize([1,2,3,4,5])
#通過map方法將全部數(shù)據(jù)都乘以10
# def func(data):
# return data*10
rdd2=rdd.map(lambda x:x*10) #(T) -> U
#(T) -> T
print(rdd2.collect())
#鏈?zhǔn)秸{(diào)用
?鏈?zhǔn)秸{(diào)用 可以直接使用.的方式
rdd2=rdd.map(lambda x:x*10).map(lambda x:x+5)
完整代碼:
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/GYH/AppData/Local/Programs/Python/Python310/python.exe" #python解釋器的位置
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
# 準(zhǔn)備一個RDD
rdd=sc.parallelize([1,2,3,4,5])
#通過map方法將全部數(shù)據(jù)都乘以10
# def func(data):
# return data*10
rdd2=rdd.map(lambda x:x*10).map(lambda x:x+5) #(T) -> U
#(T) -> T
print(rdd2.collect())
#鏈?zhǔn)秸{(diào)用
5、flatMap方法
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/GYH/AppData/Local/Programs/Python/Python310/python.exe" #python解釋器的位置
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
# 準(zhǔn)備一個RDD
rdd=sc.parallelize(["itheima itcast 666","itheima itheima it cast","python itheima"])
#需求,將RDD數(shù)據(jù)里面的一個個單詞提取出來
rdd2=rdd.map(lambda x:x.split(" "))
rdd1=rdd.flatMap(lambda x:x.split(" "))
print(rdd1.collect())
print(rdd2.collect())
flatMap算子
計算邏輯和map一樣
可以比map多出,接觸一層嵌套的功能
6、 reduceByKey算子
reduceBeKey中的聚合邏輯是:
[1,2,3,4,5] 然后聚合函數(shù):lambda a,b:a+b
a? b
1+2=3
? ? 3+3=6
? ? ? 6+4=10
? ? ? ? ? 10+5=15
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/GYH/AppData/Local/Programs/Python/Python310/python.exe" #python解釋器的位置
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
#準(zhǔn)備一個RDD
rdd =sc.parallelize([('男',99),('男',88),('女',99),('女',66)])
# 求男生和女生兩組的成績之和
rdd2=rdd.reduceByKey(lambda a,b:a+b)
print(rdd2.collect())
出現(xiàn):?UserWarning: Please install psutil to have better support with spilling??
在cmd中 pip install psutil 即可
?文章來源地址http://www.zghlxwxcb.cn/news/detail-745160.html
?7、數(shù)據(jù)計算練習(xí)案例
要求:
# 完成單詞計數(shù)統(tǒng)計 # 1.構(gòu)建執(zhí)行環(huán)境入口對象 # 2.讀取數(shù)據(jù)文件 # 3.取出全部單詞 # 4.將所有單詞都轉(zhuǎn)換為二元元組,單詞為Key value 設(shè)置為1 # 5.分組并求和 # 6.打印輸出
# 完成單詞計數(shù)統(tǒng)計
# 1.構(gòu)建執(zhí)行環(huán)境入口對象
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/GYH/AppData/Local/Programs/Python/Python310/python.exe" #python解釋器的位置
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
#2.讀取數(shù)據(jù)文件
rdd=sc.textFile("C:/Users/GYH\Desktop/data/pyspark_heima/hello.txt")
#3.取出全部單詞
word_rdd=rdd.flatMap(lambda x:x.split(" "))
# print(word_rdd.collect())
#4.將所有單詞都轉(zhuǎn)換為二元元組,單詞為Key value 設(shè)置為1
word_with_one_rdd=word_rdd.map(lambda word:(word,1))
# print(word_with_one_rdd.collect())
#5.分組并求和
result=word_with_one_rdd.reduceByKey(lambda a,b:a+b)
#6打印輸出
print(result.collect())
8、filter方法?
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/GYH/AppData/Local/Programs/Python/Python310/python.exe" #python解釋器的位置
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
#準(zhǔn)備一個RDD
rdd=sc.parallelize([1,2,3,4,5])
# 對RDD的數(shù)據(jù)進行過濾
rdd2=rdd.filter(lambda num:num%2==0)
print(rdd2.collect())
9、distinct方法
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/GYH/AppData/Local/Programs/Python/Python310/python.exe" #python解釋器的位置
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
#準(zhǔn)備一個RDD
rdd=sc.parallelize([1,1,3,3,5,5,7,8,8,9,10])
#對RDD的數(shù)據(jù)進行去重
rdd2=rdd.distinct()
print(rdd2.collect())
?
10、sortBy方法
rdd.sortBy(func,ascending=Flase,numPartition=1)
#func(T)-->U:告知按照rdd中的哪一個數(shù)據(jù)進行排序,比如lambda x:x[1]表示按照rdd中的第二列元素進行排序
#ascending True升序 Flase降序
#numPartitions:用多少分區(qū)排序
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/GYH/AppData/Local/Programs/Python/Python310/python.exe" #python解釋器的位置
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
#1.讀取文件
rdd=sc.textFile("C:/Users/GYH/Desktop/data/pyspark_heima/hello.txt")
#2.取出全部單詞
word_rdd=rdd.flatMap(lambda x:x.split(" "))
#3.將所有單詞都轉(zhuǎn)換為二元元組,單詞為Key,value設(shè)置為1
word_with_one_rdd=word_rdd.map(lambda word:(word,1))
#4.分組并求和
ressult_rdd=word_with_one_rdd.reduceByKey(lambda a,b:a+b)
print(ressult_rdd.collect())
#5.對結(jié)果進行排序
final_rdd=ressult_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(final_rdd.collect())
11、數(shù)據(jù)計算-練習(xí)案例2
from pyspark import SparkConf,SparkContext
import os
import json
os.environ['PYSPARK_PYTHON']="C:/Users/GYH/AppData/Local/Programs/Python/Python310/python.exe" #python解釋器的位置
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
# TODO 需求1 城市銷售額排名
# 1.1 讀取文件到RDD
file_rdd=sc.textFile("C:/Users/GYH/Desktop/data/pyspark_heima/orders.txt")
# 1.2 取出一個JSON字符串
json_str_rdd=file_rdd.flatMap(lambda x:x.split("|"))
# print(json_str_rdd)
# 1.3 將一個個JSON字符串轉(zhuǎn)換為字典
dict_rdd=json_str_rdd.map(lambda x:json.loads(x))
print(dict_rdd.collect())
# 1.4 取出城市和銷售數(shù)據(jù)
# (城市,銷售額)
city_with_money_rdd=dict_rdd.map(lambda x:(x['areaName'],int(x['money'])))
# 1.5 按城市分組按銷售聚合
city_result_edd=city_with_money_rdd.reduceByKey(lambda a,b:a+b)
# 1.6 按銷售額聚合結(jié)果進行排序
result1_rdd=city_result_edd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(f"需求1的結(jié)果是{result1_rdd.collect()}")
# TODO 需求2:取出城市有哪些商品類別在銷售
# 2.1 取出全部的商品類別
category_rdd=dict_rdd.map(lambda x:x['category']).distinct()
print(f"需求2的結(jié)果{category_rdd.collect()}")
#2.2 對全部商品類別進行去重
# TODO 需求3
# 3.1過濾北京市的數(shù)據(jù)
beijing_data_rdd=dict_rdd.filter(lambda x:x['areaName']=='北京')
# 3.2 取出全部商品類別
result3_rdd=beijing_data_rdd.map(lambda x:x['category']).distinct()
print(f"需求3的結(jié)果:{result3_rdd.collect()}")
# 3.3 進行商品類別去重
#12、輸出為Python對象
數(shù)據(jù)輸出的方法
collect 將RDD內(nèi)容轉(zhuǎn)換為list
reduce 對RDD內(nèi)容進行自定義聚合
take 取出RDD的前N個元素組成list
count 統(tǒng)計RDD元素個數(shù)
from pyspark import SparkConf,SparkContext
import os
import json
os.environ['PYSPARK_PYTHON']="C:/Users/GYH/AppData/Local/Programs/Python/Python310/python.exe" #python解釋器的位置
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
#準(zhǔn)備RDD
rdd=sc.parallelize([1,2,3,4,5])
# collect算子,輸出RDD為list的對象
rdd_list:list=rdd.collect()
print(rdd_list)
print(type(rdd_list))
# reduce 算子,對RDD進行兩兩聚合
num=rdd.reduce(lambda a,b:a+b)
print(num)
# take算子,取出RDD前N個元素,組成list返回
take_list=rdd.take(3)
print(take_list)
# count,統(tǒng)計rdd內(nèi)有多少條數(shù)據(jù),返回值為數(shù)字
number_count=rdd.count()
print(f"rdd內(nèi)有{number_count}個元素")
sc.stop()
?#13、數(shù)據(jù)輸出到文件中
1、下載hadoop3.3.0壓縮包
? ? ? 百度網(wǎng)盤:鏈接:https://pan.baidu.com/s/1y4a2w4D8zCzYKEDY9aPWtw?
? ? ? ? ? ? ? ? ? 提取碼:1234
? ? ? hadoop3.3.0解壓到任意位置即可
2、將haoop3.3.0的bin文件夾下的 hadoop.dll? 復(fù)制到C:\Windows\System32中
在pycharm中添加如下代碼
os.environ['HADOOP_HOME']="E:/spark/hadoop-3.3.0"
?運行后成功寫入:
?14、綜合案例
讀取文件轉(zhuǎn)換成RDD,并完成:
打印輸出:熱門搜索時間段(小時精度)Top3
打印輸出:統(tǒng)計黑馬程序員關(guān)鍵字在哪個時段被搜索最多
將數(shù)據(jù)轉(zhuǎn)換為JSON格式,寫出文件
1、熱門搜索時間段(小時精度)Top3
from pyspark import SparkConf,SparkContext
import os
import json
os.environ['PYSPARK_PYTHON']="C:/Users/GYH/AppData/Local/Programs/Python/Python310/python.exe" #python解釋器的位置
os.environ['HADOOP_HOME']="E:/spark/hadoop-3.3.0"
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
conf.set("spark.default.parallelism","1")
sc=SparkContext(conf=conf)
# 讀取文件轉(zhuǎn)換成RDD
# TODO 需求1:熱門城市時間段TOP3(小時精度)
file_rdd=sc.textFile("C:/Users/GYH/Desktop/data/pyspark_heima/SogouQ.txt")
#1.1取出全部時間并轉(zhuǎn)換為小時
#1.2轉(zhuǎn)換為(小時,1)的二元元組
#1.3Key分組聚合Value
#1.4排序(降序)
#1.5取前3
result1=file_rdd.map(lambda x:x.split("\t")).\
map(lambda x:x[0][:2]).\
map(lambda x:(x,1)).\
reduceByKey(lambda a,b:a+b).\
sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
take(3)
print(f"需求1的結(jié)果:{result1}")
?# TODO 需求2:熱門搜索詞TOP3
# TODO 需求2:熱門搜索詞TOP3
# 2.1 取出全部搜索詞
# 2.2 (詞,1) 二元元組
# 2.3 分組聚合
# 2.4 排序
# 2.5 TOP3
file_rdd=sc.textFile("C:/Users/GYH/Desktop/data/pyspark_heima/SogouQ.txt")
result2=file_rdd.map(lambda x:x.split('\t')).\
map(lambda x:x[2]).\
map(lambda x:(x,1)).\
reduceByKey(lambda a,b:a+b).\
sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
take(3)
print(f"需求2的結(jié)果:{result2}")
?# TODO 需求3:統(tǒng)計黑馬程序員關(guān)鍵字在什么時段被搜索的最多
# TODO 需求3:統(tǒng)計黑馬程序員關(guān)鍵字在什么時段被搜索的最多
# 3.1 過濾內(nèi)容 只能保留黑馬程序員關(guān)鍵字
# 3.2 轉(zhuǎn)換為(小時,1)的二元元組
# 3.3 Key分組聚合Value
# 3.4 排序(降序)
# 3.5 取前1
file_rdd=sc.textFile("C:/Users/GYH/Desktop/data/pyspark_heima/SogouQ.txt")
result3=file_rdd.map(lambda x:x.split("\t")).\
filter(lambda x:x[2]=='黑馬程序員').\
map(lambda x:(x[0][:2],1)).\
reduceByKey(lambda a,b:a+b).\
sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
take(1)
print(f"需求3的結(jié)果{result3}")
?# TODO 需求4:將數(shù)據(jù)轉(zhuǎn)換為JSON格式,寫到文件中
# TODO 需求4:將數(shù)據(jù)轉(zhuǎn)換為JSON格式,寫到文件中
file_rdd=sc.textFile("C:/Users/GYH/Desktop/data/pyspark_heima/SogouQ.txt")
# 4.1 轉(zhuǎn)換為JSON格式的RDD
# 4.2 寫出為文件
file_rdd.map(lambda x:x.split("\t")).\
map(lambda x:{"time":x[0],"user_id":x[1],"key_word":x[2],"rank1":x[3],"rank2":x[4],"url":x[5]}).\
saveAsTextFile("C:/Users/GYH/Desktop/data/pyspark_heima/output1_JSON")
打開output1_JSON文件夾下的part_00000?
?成功寫入:
文章來源:http://www.zghlxwxcb.cn/news/detail-745160.html
?
到了這里,關(guān)于Python黑馬程序員(Spark實戰(zhàn))筆記的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!