一、RDD#reduceByKey 方法
1、RDD#reduceByKey 方法概念
RDD#reduceByKey 方法 是 PySpark 中 提供的計算方法 ,
- 首先 , 對 鍵值對 KV 類型 RDD 對象 數(shù)據(jù) 中 相同 鍵 key 對應(yīng)的 值 value 進(jìn)行分組 ,
- 然后 , 按照 開發(fā)者 提供的 算子 ( 邏輯 / 函數(shù) ) 進(jìn)行 聚合操作 ;
上面提到的 鍵值對 KV 型 的數(shù)據(jù) , 指的是 二元元組 , 也就是 RDD 對象中存儲的數(shù)據(jù)是 二元元組 ;
元組 可以看做為 只讀列表 ;
二元元組 指的是 元組 中的數(shù)據(jù) , 只有兩個 , 如 :
("Tom", 18)
("Jerry", 12)
PySpark 中 , 將 二元元組 中
- 第一個元素 稱為 鍵 Key ,
- 第二個元素 稱為 值 Value ;
按照 鍵 Key 分組 , 就是按照 二元元組 中的 第一個元素 的值進(jìn)行分組 ;
[("Tom", 18), ("Jerry", 12), ("Tom", 17), ("Jerry", 13)]
將上述列表中的 二元元組 進(jìn)行分組 , 按照 二元元組 第一個元素進(jìn)行分組 ,
-
("Tom", 18)
和("Tom", 17)
元組分為一組 , 在這一組中 , 將 18 和 17 兩個數(shù)據(jù)進(jìn)行聚合 , 如 : 相加操作 , 最終聚合結(jié)果是 35 ; -
("Jerry", 12)
和("Jerry", 13)
分為一組 ;
如果 鍵 Key 有 A, B, C 三個 值 Value 要進(jìn)行聚合 , 首先將 A 和 B 進(jìn)行聚合 得到 X , 然后將 X 與 C 進(jìn)行聚合得到新的值 Y ;
具體操作方法是 : 先將相同 鍵 key 對應(yīng)的 值 value 列表中的元素進(jìn)行 reduce 操作 , 返回一個減少后的值,并將該鍵值對存儲在RDD中 ;
2、RDD#reduceByKey 方法工作流程
RDD#reduceByKey 方法 工作流程 : reduceByKey(func)
;
- 首先 , 對 RDD 對象中的數(shù)據(jù) 分區(qū) , 每個分區(qū)中的相同 鍵 key 對應(yīng)的 值 value 被組成一個列表 ;
- 然后 , 對于 每個 鍵 key 對應(yīng)的 值 value 列表 , 使用 reduceByKey 方法提供的 函數(shù)參數(shù) func 進(jìn)行 reduce 操作 , 將列表中的元素減少為一個 ;
- 最后 , 將減少后的 鍵值對 存儲在新的 RDD 對象中 ;
3、RDD#reduceByKey 函數(shù)語法
RDD#reduceByKey 語法 :
reduceByKey(func, numPartitions=None)
- func 參數(shù) : 用于聚合的函數(shù) ;
- numPartitions 是可選參數(shù) , 指定 RDD 對象的分區(qū)數(shù) ;
傳入的 func 函數(shù)的類型為 :
(V, V) -> V
V 是泛型 , 指的是任意類型 , 上面的 三個 V 可以是任意類型 , 但是必須是 相同的類型 ;
該函數(shù) 接收 兩個 V 類型的參數(shù) , 參數(shù)類型要相同 , 返回一個 V 類型的返回值 , 傳入的兩個參數(shù)和返回值都是 V 類型的 ;
使用 reduceByKey 方法 , 需要保證函數(shù)的
- 可結(jié)合性 ( associativity ) : 將兩個具有 相同 參數(shù)類型 和 返回類型 的方法結(jié)合在一起 , 不會改變它們的行為的性質(zhì) ; 兩個方法結(jié)合使用的結(jié)果與執(zhí)行順序無關(guān) ;
- 可重入性 ( commutativity ) : 在多任務(wù)環(huán)境下 , 一個方法可以被多個任務(wù)調(diào)用 , 而不會出現(xiàn)數(shù)據(jù)競爭或狀態(tài)錯誤的問題 ;
以便在并行計算時能夠正確地聚合值列表 ;
二、代碼示例 - RDD#reduceByKey 方法
1、代碼示例
在下面的代碼中 , 要處理的數(shù)據(jù)是 列表 , 列表元素是 二元元組 ;
[("Tom", 18), ("Tom", 3), ("Jerry", 12), ("Jerry", 21)]
對 值 Value 進(jìn)行的聚合操作就是相加 , 也就是把同一個 鍵 Key 下的多個 Value 值 進(jìn)行相加操作 ,
# 應(yīng)用 reduceByKey 操作,將同一個 Key 下的 Value 相加
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
代碼示例 :
"""
PySpark 數(shù)據(jù)處理
"""
# 導(dǎo)入 PySpark 相關(guān)包
from pyspark import SparkConf, SparkContext
# 為 PySpark 配置 Python 解釋器
import os
os.environ['PYSPARK_PYTHON'] = "D:/001_Develop/022_Python/Python39/python.exe"
# 創(chuàng)建 SparkConf 實(shí)例對象 , 該對象用于配置 Spark 任務(wù)
# setMaster("local[*]") 表示在單機(jī)模式下 本機(jī)運(yùn)行
# setAppName("hello_spark") 是給 Spark 程序起一個名字
sparkConf = SparkConf() \
.setMaster("local[*]") \
.setAppName("hello_spark")
# 創(chuàng)建 PySpark 執(zhí)行環(huán)境 入口對象
sparkContext = SparkContext(conf=sparkConf)
# 打印 PySpark 版本號
print("PySpark 版本號 : ", sparkContext.version)
# 將 字符串列表 轉(zhuǎn)為 RDD 對象
rdd = sparkContext.parallelize([("Tom", 18), ("Tom", 3), ("Jerry", 12), ("Jerry", 21)])
# 應(yīng)用 reduceByKey 操作,將同一個 Key 下的 Value 相加
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
# 打印新的 RDD 中的內(nèi)容
print(rdd2.collect())
# 停止 PySpark 程序
sparkContext.stop()
2、執(zhí)行結(jié)果
D:\001_Develop\022_Python\Python39\python.exe D:/002_Project/011_Python/HelloPython/Client.py
23/08/01 10:16:04 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/01 10:16:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本號 : 3.4.1
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
[('Jerry', 33), ('Tom', 21)]
Process finished with exit code 0
三、代碼示例 - 使用 RDD#reduceByKey 統(tǒng)計文件內(nèi)容
1、需求分析
給定一個 文本文件 word.txt , 文件內(nèi)容為 :
Tom Jerry
Tom Jerry Tom
Jack Jerry
讀取文件中的內(nèi)容 , 統(tǒng)計文件中單詞的個數(shù) ;
思路 :
- 先 讀取數(shù)據(jù)到 RDD 中 ,
- 然后 按照空格分割開 再展平 , 獲取到每個單詞 ,
- 根據(jù)上述單詞列表 , 生成一個 二元元組 列表 , 列表中每個元素的 鍵 Key 為單詞 , 值 Value 為 數(shù)字 1 ,
- 對上述 二元元組 列表 進(jìn)行 聚合操作 , 相同的 鍵 Key 對應(yīng)的 值 Value 進(jìn)行相加 ;
2、代碼示例
首先 , 讀取文件 , 將 文件轉(zhuǎn)為 RDD 對象 , 該 RDD 對象中 , 列表中的元素是 字符串 類型 , 每個字符串的內(nèi)容是 整行的數(shù)據(jù) ;
# 將 文件 轉(zhuǎn)為 RDD 對象
rdd = sparkContext.textFile("word.txt")
# 內(nèi)容為 ['Tom Jerry', 'Tom Jerry Tom', 'Jack Jerry']
然后 , 通過 flatMap 展平文件, 先按照 空格 切割每行數(shù)據(jù)為 字符串 列表 , 然后展平數(shù)據(jù)解除嵌套 ;
# 通過 flatMap 展平文件, 先按照 空格 切割每行數(shù)據(jù)為 字符串 列表
# 然后展平數(shù)據(jù)解除嵌套
rdd2 = rdd.flatMap(lambda element: element.split(" "))
# 內(nèi)容為 : ['Tom', 'Jerry', 'Tom', 'Jerry', 'Tom', 'Jack', 'Jerry']
再后 , 將 rdd 數(shù)據(jù) 的 列表中的元素 轉(zhuǎn)為二元元組 , 第一個元素設(shè)置為 單詞 字符串 , 第二個元素設(shè)置為 1
# 將 rdd 數(shù)據(jù) 的 列表中的元素 轉(zhuǎn)為二元元組, 第二個元素設(shè)置為 1
rdd3 = rdd2.map(lambda element: (element, 1))
# 內(nèi)容為 [('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jack', 1), ('Jerry', 1)]
最后 , 應(yīng)用 reduceByKey 操作 , 對相同 鍵 Key 對應(yīng)的 值 Value 進(jìn)行聚合操作 , 將同一個 Key 下的 Value 相加, 也就是統(tǒng)計 鍵 Key 的個數(shù) ;
# 應(yīng)用 reduceByKey 操作,
# 將同一個 Key 下的 Value 相加, 也就是統(tǒng)計 鍵 Key 的個數(shù)
rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
# [('Tom', 3), ('Jack', 1), ('Jerry', 3)]
代碼示例 :
"""
PySpark 數(shù)據(jù)處理
"""
# 導(dǎo)入 PySpark 相關(guān)包
from pyspark import SparkConf, SparkContext
# 為 PySpark 配置 Python 解釋器
import os
os.environ['PYSPARK_PYTHON'] = "D:/001_Develop/022_Python/Python39/python.exe"
# 創(chuàng)建 SparkConf 實(shí)例對象 , 該對象用于配置 Spark 任務(wù)
# setMaster("local[*]") 表示在單機(jī)模式下 本機(jī)運(yùn)行
# setAppName("hello_spark") 是給 Spark 程序起一個名字
sparkConf = SparkConf() \
.setMaster("local[*]") \
.setAppName("hello_spark")
# 創(chuàng)建 PySpark 執(zhí)行環(huán)境 入口對象
sparkContext = SparkContext(conf=sparkConf)
# 打印 PySpark 版本號
print("PySpark 版本號 : ", sparkContext.version)
# 將 文件 轉(zhuǎn)為 RDD 對象
rdd = sparkContext.textFile("word.txt")
print("查看文件內(nèi)容 : ", rdd.collect())
# 通過 flatMap 展平文件, 先按照 空格 切割每行數(shù)據(jù)為 字符串 列表
# 然后展平數(shù)據(jù)解除嵌套
rdd2 = rdd.flatMap(lambda element: element.split(" "))
print("查看文件內(nèi)容展平效果 : ", rdd2.collect())
# 將 rdd 數(shù)據(jù) 的 列表中的元素 轉(zhuǎn)為二元元組, 第二個元素設(shè)置為 1
rdd3 = rdd2.map(lambda element: (element, 1))
print("轉(zhuǎn)為二元元組效果 : ", rdd3.collect())
# 應(yīng)用 reduceByKey 操作,
# 將同一個 Key 下的 Value 相加, 也就是統(tǒng)計 鍵 Key 的個數(shù)
rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
print("最終統(tǒng)計單詞 : ", rdd4.collect())
# 停止 PySpark 程序
sparkContext.stop()
執(zhí)行結(jié)果 :文章來源:http://www.zghlxwxcb.cn/news/detail-623052.html
D:\001_Develop\022_Python\Python39\python.exe D:/002_Project/011_Python/HelloPython/Client.py
23/08/01 11:25:24 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/01 11:25:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本號 : 3.4.1
查看文件內(nèi)容 : ['Tom Jerry', 'Tom Jerry Tom', 'Jack Jerry']
查看文件內(nèi)容展平效果 : ['Tom', 'Jerry', 'Tom', 'Jerry', 'Tom', 'Jack', 'Jerry']
轉(zhuǎn)為二元元組效果 : [('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jack', 1), ('Jerry', 1)]
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
最終統(tǒng)計單詞 : [('Tom', 3), ('Jack', 1), ('Jerry', 3)]
Process finished with exit code 0
文章來源地址http://www.zghlxwxcb.cn/news/detail-623052.html
到了這里,關(guān)于【Python】PySpark 數(shù)據(jù)計算 ③ ( RDD#reduceByKey 函數(shù)概念 | RDD#reduceByKey 方法工作流程 | RDD#reduceByKey 語法 | 代碼示例 )的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!