目錄
前言
一、PySpark基礎功能
?1.Spark SQL 和DataFrame
2.Pandas API on Spark
3.Streaming
4.MLBase/MLlib
5.Spark Core
二、PySpark依賴
Dependencies
三、DataFrame
1.創(chuàng)建
創(chuàng)建不輸入schema格式的DataFrame
創(chuàng)建帶有schema的DataFrame
從Pandas DataFrame創(chuàng)建
通過由元組列表組成的RDD創(chuàng)建
2.查看
DataFrame.show()
spark.sql.repl.eagerEval.enabled
縱向顯示
?查看DataFrame格式和列名
查看統(tǒng)計描述信息
PySpark DataFrame轉(zhuǎn)換為Pandas DataFrame
?3.查詢
添加新列實例:
條件查詢DataFrame.filter()
?4.運算
Pandas_udf
DataFrame.mapInPandas
5.分組
?聯(lián)合分組和應用函數(shù)
?6.獲取數(shù)據(jù)輸入/輸出
CSV
?Parquet
?ORC
?四、結(jié)合Spark SQL
點關(guān)注,防走丟,如有紕漏之處,請留言指教,非常感謝
前言
要想了解PySpark能夠干什么可以去看看我之前寫的文章,里面很詳細介紹了Spark的生態(tài):
Spark框架深度理解一:開發(fā)緣由及優(yōu)缺點
Spark框架深度理解二:生態(tài)圈
Spark框架深度理解三:運行架構(gòu)、核心數(shù)據(jù)集RDD
PySpark只是通過JVM轉(zhuǎn)換使得Python代碼能夠在Spark集群上識別運行。故Spark的絕大多數(shù)功能都可以被Python程序使用。
上篇文章:一文速學-PySpark數(shù)據(jù)分析基礎:PySpark原理詳解
已經(jīng)把PySpark運行原理講的很清楚了,現(xiàn)在我們需要了解PySpark語法基礎來逐漸編寫PySpark程序?qū)崿F(xiàn)分布式數(shù)據(jù)計算。
已搭建環(huán)境:
Spark:3.3.0
Hadoop:3.3.3
Scala:2.11.12
JDK:1.8.0_201
PySpark:3.1.2
一、PySpark基礎功能
PySpark是Python中Apache Spark的接口。它不僅可以使用Python API編寫Spark應用程序,還提供了PySpark shell,用于在分布式環(huán)境中交互分析數(shù)據(jù)。PySpark支持Spark的大多數(shù)功能,如Spark SQL、DataFrame、Streaming、MLlib(機器學習)和Spark Core。
?1.Spark SQL 和DataFrame
Spark SQL是用于結(jié)構(gòu)化數(shù)據(jù)處理的Spark模塊。它提供了一種稱為DataFrame的編程抽象,是由SchemaRDD發(fā)展而來。不同于SchemaRDD直接繼承RDD,DataFrame自己實現(xiàn)了RDD的絕大多數(shù)功能??梢园裇park SQL DataFrame理解為一個分布式的Row對象的數(shù)據(jù)集合。
Spark SQL已經(jīng)集成在spark-shell中,因此只要啟動spark-shell就可以使用Spark SQL的Shell交互接口。如果在spark-shell中執(zhí)行SQL語句,需要使用SQLContext對象來調(diào)用sql()方法。Spark SQL對數(shù)據(jù)的查詢分成了兩個分支:SQLContext和HiveContext,其中HiveContext繼承了SQLContext,因此HiveContext除了擁有SQLContext的特性之外還擁有自身的特性。
Spark SQL允許開發(fā)人員直接處理RDD,同時也可查詢例如在 Apache Hive上存在的外部數(shù)據(jù)。Spark SQL的一個重要特點是其能夠統(tǒng)一處理關(guān)系表和RDD,使得開發(fā)人員可以輕松地使用SQL命令進行外部查詢,同時進行更復雜的數(shù)據(jù)分析。
2.Pandas API on Spark
Spark上的pandas API可以擴展使用 python pandas庫。
- 輕松切換到pandas API和PySpark API上下文,無需任何開銷。
- 有一個既適用于pandas(測試,較小的數(shù)據(jù)集)又適用于Spark(分布式數(shù)據(jù)集)的代碼庫。
- 熟練使用pandas的話很快上手
3.Streaming
Apache Spark中的Streaming功能運行在Spark之上,支持跨Streaming和歷史數(shù)據(jù)的強大交互和分析應用程序,同時繼承了Spark的易用性和容錯特性。Spark Streaming是將流式計算分解成一系列短小的批處理作業(yè)。這里的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數(shù)據(jù)按照batch size(如1秒)分成一段一段的數(shù)據(jù)(Discretized Stream),每一段數(shù)據(jù)都轉(zhuǎn)換成Spark中的RDD(Resilient Distributed Dataset),然后將Spark Streaming中對DStream的Transformation操作變?yōu)獒槍park中對RDD的Transformation操作,將RDD經(jīng)過操作變成中間結(jié)果保存在內(nèi)存中。
4.MLBase/MLlib
MLlib構(gòu)建在Spark之上,是一個可擴展的機器學習庫,它提供了一組統(tǒng)一的高級API,幫助用戶創(chuàng)建和調(diào)整實用的機器學習管道。MLBase分為四部分:MLlib、MLI、ML Optimizer和MLRuntime。
- ML Optimizer會選擇它認為最適合的已經(jīng)在內(nèi)部實現(xiàn)好了的機器學習算法和相關(guān)參數(shù),來處理用戶輸入的數(shù)據(jù),并返回模型或別的幫助分析的結(jié)果;
- MLI 是一個進行特征抽取和高級ML編程抽象的算法實現(xiàn)的API或平臺;
- MLlib是Spark實現(xiàn)一些常見的機器學習算法和實用程序,包括分類、回歸、聚類、協(xié)同過濾、降維以及底層優(yōu)化,該算法可以進行可擴充; MLRuntime 基于Spark計算框架,將Spark的分布式計算應用到機器學習領域。
?
5.Spark Core
Spark Core是Spark平臺的底層通用執(zhí)行引擎,所有其他功能都構(gòu)建在其之上。它提供了RDD(彈性分布式數(shù)據(jù)集)和內(nèi)存計算能力。
二、PySpark依賴
Dependencies
Package | 最低版本限制 | Note |
pandas | 1.0.5 | 支撐Spark SQL |
Numpy | 1.7 | 滿足支撐MLlib基礎API |
pyarrow | 1.0.0 | 支撐Spark SQL |
Py4j | 0.10.9.5 | 要求 |
pandas | 1.0.5 | pandas API on Spark需要 |
pyarrow | 1.0.0 | pandas API on Spark需要 |
Numpy | 1.14 | pandas API on Spark需要 |
請注意,PySpark需要Java 8或更高版本,并正確設置Java_HOME。如果使用JDK 11,請設置Dio.netty.tryReflectionSetAccessible=true
?以獲取與箭頭相關(guān)的功能。
AArch64(ARM64)用戶注意:PyArrow是PySpark SQL所必需的,但PyArrow 4.0.0中引入了對AArch64的PyArrow支持。如果由于PyArrow安裝錯誤導致PyArrow安裝在AArch64上失敗,可以按如下方式安裝PyArrow>=4.0.0:
pip install "pyarrow>=4.0.0" --prefer-binary
三、DataFrame
PySpark應用程序從初始化SparkSession開始,SparkSession是PySpark的入口點,如下所示。如果通過PySpark可執(zhí)行文件在PySpark shell中運行它,shell會自動在變量spark中為用戶創(chuàng)建會話。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
1.創(chuàng)建
PySpark DataFrame能夠通過pyspark.sql.SparkSession.createDataFrame創(chuàng)建,通常通過傳遞列表(list)、元組(tuples)和字典(dictionaries)的列表和pyspark.sql.Rows,Pandas DataFrame,由此類列表組成的RDD轉(zhuǎn)換。pyspark.sql.SparkSession.createDataFrame接收schema參數(shù)指定DataFrame的架構(gòu)(優(yōu)化可加速)。省略時,PySpark通過從數(shù)據(jù)中提取樣本來推斷相應的模式。
創(chuàng)建不輸入schema格式的DataFrame
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
df = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
創(chuàng)建帶有schema的DataFrame
df = spark.createDataFrame([
(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
從Pandas DataFrame創(chuàng)建
pandas_df = pd.DataFrame({
'a': [1, 2, 3],
'b': [2., 3., 4.],
'c': ['string1', 'string2', 'string3'],
'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
通過由元組列表組成的RDD創(chuàng)建
rdd = spark.sparkContext.parallelize([
(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
?以上的DataFrame格式創(chuàng)建的都是一樣的。
df.printSchema()
root |-- a: long (nullable = true) |-- b: double (nullable = true) |-- c: string (nullable = true) |-- d: date (nullable = true) |-- e: timestamp (nullable = true)
2.查看
DataFrame.show()
使用格式:
df.show(<int>)
df.show(1)
+---+---+-------+----------+-------------------+ | a| b| c| d| e| +---+---+-------+----------+-------------------+ | 1|2.0|string1|2000-01-01|2000-01-01 12:00:00| +---+---+-------+----------+-------------------+ only showing top 1 row
spark.sql.repl.eagerEval.enabled
spark.sql.repl.eagerEval.enabled用于在notebooks(如Jupyter)中快速生成PySpark DataFrame的配置??刂菩袛?shù)可以使用spark.sql.repl.eagerEval.maxNumRows。
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df
?
spark.conf.set('spark.sql.repl.eagerEval.maxNumRows',1)
df
?
縱向顯示
行也可以垂直顯示。當行太長而無法水平顯示時,縱向顯示就很明顯。
df.show(1, vertical=True)
-RECORD 0------------------ a | 1 b | 2.0 c | string1 d | 2000-01-01 e | 2000-01-01 12:00:00 only showing top 1 row
?查看DataFrame格式和列名
df.columns
['a', 'b', 'c', 'd', 'e']
df.printSchema()
root |-- a: long (nullable = true) |-- b: double (nullable = true) |-- c: string (nullable = true) |-- d: date (nullable = true) |-- e: timestamp (nullable = true)
查看統(tǒng)計描述信息
df.select("a", "b", "c").describe().show()
+-------+---+---+-------+ |summary| a| b| c| +-------+---+---+-------+ | count| 3| 3| 3| | mean|2.0|3.0| null| | stddev|1.0|1.0| null| | min| 1|2.0|string1| | max| 3|4.0|string3| +-------+---+---+-------+
DataFrame.collect()將分布式數(shù)據(jù)收集到驅(qū)動程序端,作為Python中的本地數(shù)據(jù)。請注意,當數(shù)據(jù)集太大而無法容納在驅(qū)動端時,這可能會引發(fā)內(nèi)存不足錯誤,因為它將所有數(shù)據(jù)從執(zhí)行器收集到驅(qū)動端。
df.collect()
[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)), Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)), Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]
?為了避免引發(fā)內(nèi)存不足異??梢允褂肈ataFrame.take()或者是DataFrame.tail():
df.take(1)
[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]
df.tail(1)
[Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]
PySpark DataFrame轉(zhuǎn)換為Pandas DataFrame
?PySpark DataFrame還提供了到pandas DataFrame的轉(zhuǎn)換,以利用pandas API。注意,toPandas還將所有數(shù)據(jù)收集到driver端,當數(shù)據(jù)太大而無法放入driver端時,很容易導致內(nèi)存不足錯誤。
df.toPandas()
?
?3.查詢
PySpark DataFrame是惰性計算的,僅選擇一列不會觸發(fā)計算,但它會返回一個列實例:
df.a
Column<'a'>
大多數(shù)按列操作都返回列:
from pyspark.sql import Column
from pyspark.sql.functions import upper
type(df.c) == type(upper(df.c)) == type(df.c.isNull())
True
上述生成的Column可用于從DataFrame中選擇列。例如,DataFrame.select()獲取返回另一個DataFrame的列實例:
df.select(df.c).show()
+-------+ | c| +-------+ |string1| |string2| |string3| +-------+
添加新列實例:
df.withColumn('upper_c', upper(df.c)).show()
+---+---+-------+----------+-------------------+-------+ | a| b| c| d| e|upper_c| +---+---+-------+----------+-------------------+-------+ | 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1| | 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2| | 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3| +---+---+-------+----------+-------------------+-------+
條件查詢DataFrame.filter()
df.filter(df.a == 1).show()
+---+---+-------+----------+-------------------+ | a| b| c| d| e| +---+---+-------+----------+-------------------+ | 1|2.0|string1|2000-01-01|2000-01-01 12:00:00| +---+---+-------+----------+-------------------+
?4.運算
Pandas_udf
PySpark支持各種UDF和API,允許用戶執(zhí)行Python本機函數(shù)。另請參閱最新的Pandas UDF(?Pandas UDFs)和Pandas Function API(?Pandas Function APIs)。例如,下面的示例允許用戶在Python本機函數(shù)中直接使用pandas Series中的API。
Apache Arrow in PySpark
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
# Simply plus one by using pandas Series.
return series + 1
df.select(pandas_plus_one(df.a)).show()
+------------------+ |pandas_plus_one(a)| +------------------+ | 2| | 3| | 4| +------------------+
DataFrame.mapInPandas
DataFrame.mapInPandas允許用戶在pandas DataFrame中直接使用API,而不受結(jié)果長度等任何限制。
def pandas_filter_func(iterator):
for pandas_df in iterator:
yield pandas_df[pandas_df.a == 1]
df.mapInPandas(pandas_filter_func, schema=df.schema).show()
+---+---+-------+----------+-------------------+ | a| b| c| d| e| +---+---+-------+----------+-------------------+ | 1|2.0|string1|2000-01-01|2000-01-01 12:00:00| +---+---+-------+----------+-------------------+
5.分組
PySpark DataFrame還提供了一種使用常見方法,即拆分-應用-合并策略來處理分組數(shù)據(jù)的方法。它根據(jù)特定條件對數(shù)據(jù)進行分組,對每個組應用一個函數(shù),然后將它們組合回DataFrame。
df = spark.createDataFrame([
['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()
+-----+------+---+---+ |color| fruit| v1| v2| +-----+------+---+---+ | red|banana| 1| 10| | blue|banana| 2| 20| | red|carrot| 3| 30| | blue| grape| 4| 40| | red|carrot| 5| 50| |black|carrot| 6| 60| | red|banana| 7| 70| | red| grape| 8| 80| +-----+------+---+---+
?分組,然后將avg()函數(shù)應用于結(jié)果組。
df.groupby('color').avg().show()
+-----+-------+-------+ |color|avg(v1)|avg(v2)| +-----+-------+-------+ | red| 4.8| 48.0| | blue| 3.0| 30.0| |black| 6.0| 60.0| +-----+-------+-------+
還可以使用pandas API對每個組應用Python自定義函數(shù)。
def plus_mean(pandas_df):
return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())
df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()
+-----+------+---+---+ |color| fruit| v1| v2| +-----+------+---+---+ |black|carrot| 0| 60| | blue|banana| -1| 20| | blue| grape| 1| 40| | red|banana| -3| 10| | red|carrot| -1| 30| | red|carrot| 0| 50| | red|banana| 2| 70| | red| grape| 3| 80| +-----+------+---+---+
?聯(lián)合分組和應用函數(shù)
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
('time', 'id', 'v1'))
df2 = spark.createDataFrame(
[(20000101, 1, 'x'), (20000101, 2, 'y')],
('time', 'id', 'v2'))
def asof_join(l, r):
return pd.merge_asof(l, r, on='time', by='id')
df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(
asof_join, schema='time int, id int, v1 double, v2 string').show()
+--------+---+---+---+ | time| id| v1| v2| +--------+---+---+---+ |20000101| 1|1.0| x| |20000102| 1|3.0| x| |20000101| 2|2.0| y| |20000102| 2|4.0| y| +--------+---+---+---+
?6.獲取數(shù)據(jù)輸入/輸出
CSV簡單易用。Parquet和ORC是高效緊湊的文件格式,讀寫速度更快。
PySpark中還有許多其他可用的數(shù)據(jù)源,如JDBC、text、binaryFile、Avro等。另請參閱Apache Spark文檔中最新的Spark SQL、DataFrames和Datasets指南。Spark SQL, DataFrames and Datasets Guide
CSV
df.write.csv('foo.csv', header=True)
spark.read.csv('foo.csv', header=True).show()
這里記錄一個報錯:
java.lang.UnsatisfiedLinkError:org.apache.hadoop.io.nativeio.NativeIO$Windows.access0
?將Hadoop安裝目錄下的 bin 文件夾中的 hadoop.dll 和 winutils.exe 這兩個文件拷貝到 C:\Windows\System32 下,問題解決。
+---+---+-------+----------+--------------------+ | a| b| c| d| e| +---+---+-------+----------+--------------------+ | 1|2.0|string1|2000-01-01|2000-01-01T12:00:...| | 2|3.0|string2|2000-02-01|2000-01-02T12:00:...| | 3|4.0|string3|2000-03-01|2000-01-03T12:00:...| +---+---+-------+----------+--------------------+
?Parquet
df.write.parquet('bar.parquet')
spark.read.parquet('bar.parquet').show()
+-----+------+---+---+ |color| fruit| v1| v2| +-----+------+---+---+ |black|carrot| 6| 60| | blue|banana| 2| 20| | blue| grape| 4| 40| | red|carrot| 5| 50| | red|banana| 7| 70| | red|banana| 1| 10| | red|carrot| 3| 30| | red| grape| 8| 80| +-----+------+---+---+
?ORC
df.write.orc('zoo.orc')
spark.read.orc('zoo.orc').show()
+-----+------+---+---+ |color| fruit| v1| v2| +-----+------+---+---+ | red|banana| 7| 70| | red| grape| 8| 80| |black|carrot| 6| 60| | blue|banana| 2| 20| | red|banana| 1| 10| | red|carrot| 5| 50| | red|carrot| 3| 30| | blue| grape| 4| 40| +-----+------+---+---+
?四、結(jié)合Spark SQL
DataFrame和Spark SQL共享同一個執(zhí)行引擎,因此可以無縫地互換使用。例如,可以將數(shù)據(jù)幀注冊為表,并按如下方式輕松運行SQL:
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()
+--------+ |count(1)| +--------+ | 8| +--------+
?此外UDF也可在現(xiàn)成的SQL中注冊和調(diào)用
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
return s + 1
spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(v1) FROM tableA").show()
?
這些SQL表達式可以直接混合并用作PySpark列。
from pyspark.sql.functions import expr
df.selectExpr('add_one(v1)').show()
df.select(expr('count(*)') > 0).show()
文章來源:http://www.zghlxwxcb.cn/news/detail-801315.html
點關(guān)注,防走丟,如有紕漏之處,請留言指教,非常感謝
以上就是本期全部內(nèi)容。我是fanstuck ,有問題大家隨時留言討論 ,我們下期見。文章來源地址http://www.zghlxwxcb.cn/news/detail-801315.html
到了這里,關(guān)于PySpark數(shù)據(jù)分析基礎:PySpark基礎功能及DataFrame操作基礎語法詳解的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!