一、RDD 簡介
1、RDD 概念
RDD 英文全稱為 " Resilient Distributed Datasets " , 對應(yīng)中文名稱 是 " 彈性分布式數(shù)據(jù)集 " ;
Spark 是用于 處理大規(guī)模數(shù)據(jù) 的 分布式計(jì)算引擎 ;
RDD 是 Spark 的基本數(shù)據(jù)單元 , 該 數(shù)據(jù)結(jié)構(gòu) 是 只讀的 , 不可寫入更改 ;
RDD 對象 是 通過 SparkContext 執(zhí)行環(huán)境入口對象 創(chuàng)建的 ;
SparkContext 讀取數(shù)據(jù)時(shí) , 通過將數(shù)據(jù)拆分為多個(gè)分區(qū) , 以便在 服務(wù)器集群 中進(jìn)行并行處理 ;
每個(gè) RDD 數(shù)據(jù)分區(qū) 都可以在 服務(wù)器集群 中的 不同服務(wù)器節(jié)點(diǎn) 上 并行執(zhí)行 計(jì)算任務(wù) , 可以提高數(shù)據(jù)處理速度 ;
2、RDD 中的數(shù)據(jù)存儲與計(jì)算
PySpark 中 處理的 所有的數(shù)據(jù) ,
- 數(shù)據(jù)存儲 : PySpark 中的數(shù)據(jù)都是以 RDD 對象的形式承載的 , 數(shù)據(jù)都存儲在 RDD 對象中 ;
- 計(jì)算方法 : 大數(shù)據(jù)處理過程中使用的計(jì)算方法 , 也都定義在了 RDD 對象中 ;
- 計(jì)算結(jié)果 : 使用 RDD 中的計(jì)算方法對 RDD 中的數(shù)據(jù)進(jìn)行計(jì)算處理 , 獲得的結(jié)果數(shù)據(jù)也是封裝在 RDD 對象中的 ;
PySpark 中 , 通過 SparkContext 執(zhí)行環(huán)境入口對象 讀取 基礎(chǔ)數(shù)據(jù)到 RDD 對象中 , 調(diào)用 RDD 對象中的計(jì)算方法 , 對 RDD 對象中的數(shù)據(jù)進(jìn)行處理 , 得到新的 RDD 對象 其中有 上一次的計(jì)算結(jié)果 , 再次對新的 RDD 對象中的數(shù)據(jù)進(jìn)行處理 , 執(zhí)行上述若干次計(jì)算 , 會 得到一個(gè)最終的 RDD 對象 , 其中就是數(shù)據(jù)處理結(jié)果 , 將其保存到文件中 , 或者寫入到數(shù)據(jù)庫中 ;
二、Python 容器數(shù)據(jù)轉(zhuǎn) RDD 對象
1、RDD 轉(zhuǎn)換
在 Python 中 , 使用 PySpark 庫中的 SparkContext # parallelize 方法 , 可以將 Python 容器數(shù)據(jù) 轉(zhuǎn)換為 PySpark 的 RDD 對象 ;
PySpark 支持下面幾種 Python 容器變量 轉(zhuǎn)為 RDD 對象 :
- 列表 list : 可重復(fù) , 有序元素 ;
- 元組 tuple : 可重復(fù) , 有序元素 , 可讀不可寫 , 不可更改 ;
- 集合 set : 不可重復(fù) , 無序元素 ;
- 字典 dict : 鍵值對集合 , 鍵 Key 不可重復(fù) ;
- 字符串 str : 字符串 ;
2、轉(zhuǎn)換 RDD 對象相關(guān) API
調(diào)用 SparkContext # parallelize 方法 可以將 Python 容器數(shù)據(jù)轉(zhuǎn)為 RDD 對象 ;
# 將數(shù)據(jù)轉(zhuǎn)換為 RDD 對象
rdd = sparkContext.parallelize(data)
調(diào)用 RDD # getNumPartitions 方法 , 可以獲取 RDD 的分區(qū)數(shù) ;
print("RDD 分區(qū)數(shù)量: ", rdd.getNumPartitions())
調(diào)用 RDD # collect 方法 , 可以查看 RDD 數(shù)據(jù) ;
print("RDD 元素: ", rdd.collect())
完整代碼示例 :
# 創(chuàng)建一個(gè)包含列表的數(shù)據(jù)
data = [1, 2, 3, 4, 5]
# 將數(shù)據(jù)轉(zhuǎn)換為 RDD 對象
rdd = sparkContext.parallelize(data)
# 打印 RDD 的分區(qū)數(shù)和元素
print("RDD 分區(qū)數(shù)量: ", rdd.getNumPartitions())
print("RDD 元素: ", rdd.collect())
3、代碼示例 - Python 容器轉(zhuǎn) RDD 對象 ( 列表 )
在下面的代碼中 ,
首先 , 創(chuàng)建 SparkConf 對象 , 并將 PySpark 任務(wù) 命名為 " hello_spark " , 并設(shè)置為本地單機(jī)運(yùn)行 ;
# 創(chuàng)建 SparkConf 實(shí)例對象 , 該對象用于配置 Spark 任務(wù)
# setMaster("local[*]") 表示在單機(jī)模式下 本機(jī)運(yùn)行
# setAppName("hello_spark") 是給 Spark 程序起一個(gè)名字
sparkConf = SparkConf() \
.setMaster("local[*]") \
.setAppName("hello_spark")
然后 , 創(chuàng)建了一個(gè) SparkContext 對象 , 傳入 SparkConf 實(shí)例對象作為參數(shù) ;
# 創(chuàng)建 PySpark 執(zhí)行環(huán)境 入口對象
sparkContext = SparkContext(conf=sparkConf)
再后 , 創(chuàng)建一個(gè)包含整數(shù)的簡單列表 ;
# 創(chuàng)建一個(gè)包含列表的數(shù)據(jù)
data = [1, 2, 3, 4, 5]
再后 , 并使用 parallelize() 方法將其轉(zhuǎn)換為 RDD 對象 ;
# 將數(shù)據(jù)轉(zhuǎn)換為 RDD 對象
rdd = sparkContext.parallelize(data)
最后 , 我們打印出 RDD 的分區(qū)數(shù)和所有元素 ;
# 打印 RDD 的分區(qū)數(shù)和元素
print("RDD 分區(qū)數(shù)量: ", rdd.getNumPartitions())
print("RDD 元素: ", rdd.collect())
代碼示例 :
"""
PySpark 數(shù)據(jù)處理
"""
# 導(dǎo)入 PySpark 相關(guān)包
from pyspark import SparkConf, SparkContext
# 創(chuàng)建 SparkConf 實(shí)例對象 , 該對象用于配置 Spark 任務(wù)
# setMaster("local[*]") 表示在單機(jī)模式下 本機(jī)運(yùn)行
# setAppName("hello_spark") 是給 Spark 程序起一個(gè)名字
sparkConf = SparkConf() \
.setMaster("local[*]") \
.setAppName("hello_spark")
# 創(chuàng)建 PySpark 執(zhí)行環(huán)境 入口對象
sparkContext = SparkContext(conf=sparkConf)
# 打印 PySpark 版本號
print("PySpark 版本號 : ", sparkContext.version)
# 創(chuàng)建一個(gè)包含列表的數(shù)據(jù)
data = [1, 2, 3, 4, 5]
# 將數(shù)據(jù)轉(zhuǎn)換為 RDD 對象
rdd = sparkContext.parallelize(data)
# 打印 RDD 的分區(qū)數(shù)和元素
print("RDD 分區(qū)數(shù)量: ", rdd.getNumPartitions())
print("RDD 元素: ", rdd.collect())
# 停止 PySpark 程序
sparkContext.stop()
執(zhí)行結(jié)果 :
Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
23/07/30 20:11:35 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/30 20:11:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本號 : 3.4.1
RDD 分區(qū)數(shù)量: 12
RDD 元素: [1, 2, 3, 4, 5]
Process finished with exit code 0
4、代碼示例 - Python 容器轉(zhuǎn) RDD 對象 ( 列表 / 元組 / 集合 / 字典 / 字符串 )
除了 列表 list 之外 , 還可以將其他容器數(shù)據(jù)類型 轉(zhuǎn)換為 RDD 對象 , 如 : 元組 / 集合 / 字典 / 字符串 ;
調(diào)用 RDD # collect 方法 , 打印出來的 RDD 數(shù)據(jù)形式 :
- 列表 / 元組 / 集合 轉(zhuǎn)換后的 RDD 數(shù)據(jù)打印出來都是列表 ;
data1 = [1, 2, 3, 4, 5]
data2 = (1, 2, 3, 4, 5)
data3 = {1, 2, 3, 4, 5}
# 輸出結(jié)果
rdd1 分區(qū)數(shù)量和元素: 12 , [1, 2, 3, 4, 5]
rdd2 分區(qū)數(shù)量和元素: 12 , [1, 2, 3, 4, 5]
rdd3 分區(qū)數(shù)量和元素: 12 , [1, 2, 3, 4, 5]
- 字典 轉(zhuǎn)換后的 RDD 數(shù)據(jù)打印出來只有 鍵 Key , 沒有值 ;
data4 = {"Tom": 18, "Jerry": 12}
# 輸出結(jié)果
rdd4 分區(qū)數(shù)量和元素: 12 , ['Tom', 'Jerry']
- 字符串 轉(zhuǎn)換后的 RDD 數(shù)據(jù)打印出來 是 列表 , 元素是單個(gè)字符 ;
data5 = "Tom"
# 輸出結(jié)果
rdd5 分區(qū)數(shù)量和元素: 12 , ['T', 'o', 'm']
代碼示例 :
"""
PySpark 數(shù)據(jù)處理
"""
# 導(dǎo)入 PySpark 相關(guān)包
from pyspark import SparkConf, SparkContext
# 創(chuàng)建 SparkConf 實(shí)例對象 , 該對象用于配置 Spark 任務(wù)
# setMaster("local[*]") 表示在單機(jī)模式下 本機(jī)運(yùn)行
# setAppName("hello_spark") 是給 Spark 程序起一個(gè)名字
sparkConf = SparkConf() \
.setMaster("local[*]") \
.setAppName("hello_spark")
# 創(chuàng)建 PySpark 執(zhí)行環(huán)境 入口對象
sparkContext = SparkContext(conf=sparkConf)
# 打印 PySpark 版本號
print("PySpark 版本號 : ", sparkContext.version)
# 創(chuàng)建一個(gè)包含列表的數(shù)據(jù)
data1 = [1, 2, 3, 4, 5]
data2 = (1, 2, 3, 4, 5)
data3 = {1, 2, 3, 4, 5}
data4 = {"Tom": 18, "Jerry": 12}
data5 = "Tom"
# 將數(shù)據(jù)轉(zhuǎn)換為 RDD 對象
rdd1 = sparkContext.parallelize(data1)
rdd2 = sparkContext.parallelize(data2)
rdd3 = sparkContext.parallelize(data3)
rdd4 = sparkContext.parallelize(data4)
rdd5 = sparkContext.parallelize(data5)
# 打印 RDD 的元素
print("rdd1 分區(qū)數(shù)量和元素: ", rdd1.getNumPartitions(), " , ", rdd1.collect())
print("rdd2 分區(qū)數(shù)量和元素: ", rdd2.getNumPartitions(), " , ", rdd2.collect())
print("rdd3 分區(qū)數(shù)量和元素: ", rdd3.getNumPartitions(), " , ", rdd3.collect())
print("rdd4 分區(qū)數(shù)量和元素: ", rdd4.getNumPartitions(), " , ", rdd4.collect())
print("rdd5 分區(qū)數(shù)量和元素: ", rdd5.getNumPartitions(), " , ", rdd5.collect())
# 停止 PySpark 程序
sparkContext.stop()
執(zhí)行結(jié)果 :
Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
23/07/30 20:37:03 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/30 20:37:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本號 : 3.4.1
rdd1 分區(qū)數(shù)量和元素: 12 , [1, 2, 3, 4, 5]
rdd2 分區(qū)數(shù)量和元素: 12 , [1, 2, 3, 4, 5]
rdd3 分區(qū)數(shù)量和元素: 12 , [1, 2, 3, 4, 5]
rdd4 分區(qū)數(shù)量和元素: 12 , ['Tom', 'Jerry']
rdd5 分區(qū)數(shù)量和元素: 12 , ['T', 'o', 'm']
Process finished with exit code 0
三、文件文件轉(zhuǎn) RDD 對象
調(diào)用 SparkContext#textFile 方法 , 傳入 文件的 絕對路徑 或 相對路徑 , 可以將 文本文件 中的數(shù)據(jù) 讀取并轉(zhuǎn)為 RDD 數(shù)據(jù) ;
文本文件數(shù)據(jù) :
Tom
18
Jerry
12
代碼示例 :
"""
PySpark 數(shù)據(jù)處理
"""
# 導(dǎo)入 PySpark 相關(guān)包
from pyspark import SparkConf, SparkContext
# 創(chuàng)建 SparkConf 實(shí)例對象 , 該對象用于配置 Spark 任務(wù)
# setMaster("local[*]") 表示在單機(jī)模式下 本機(jī)運(yùn)行
# setAppName("hello_spark") 是給 Spark 程序起一個(gè)名字
sparkConf = SparkConf() \
.setMaster("local[*]") \
.setAppName("hello_spark")
# 創(chuàng)建 PySpark 執(zhí)行環(huán)境 入口對象
sparkContext = SparkContext(conf=sparkConf)
# 打印 PySpark 版本號
print("PySpark 版本號 : ", sparkContext.version)
# 讀取文件內(nèi)容到 RDD 中
rdd = sparkContext.textFile("data.txt")
# 打印 RDD 的元素
print("rdd1 分區(qū)數(shù)量和元素: ", rdd.getNumPartitions(), " , ", rdd.collect())
# 停止 PySpark 程序
sparkContext.stop()
執(zhí)行結(jié)果 :文章來源:http://www.zghlxwxcb.cn/news/detail-621775.html
Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
23/07/30 20:43:21 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/30 20:43:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本號 : 3.4.1
rdd1 分區(qū)數(shù)量和元素: 2 , ['Tom', '18', 'Jerry', '12']
Process finished with exit code 0
文章來源地址http://www.zghlxwxcb.cn/news/detail-621775.html
到了這里,關(guān)于【Python】PySpark 數(shù)據(jù)輸入 ① ( RDD 簡介 | RDD 中的數(shù)據(jù)存儲與計(jì)算 | Python 容器數(shù)據(jù)轉(zhuǎn) RDD 對象 | 文件文件轉(zhuǎn) RDD 對象 )的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!