目錄
窗口函數(shù)
SQL函數(shù)分類
Spark原生自定義UDF函數(shù)
Pandas的UDF函數(shù)
Apache Arrow框架基本介紹
基于Arrow完成Pandas DataFrame和Spark DataFrame互轉(zhuǎn)
基于Pandas完成UDF函數(shù)
?自定義UDF函數(shù)
自定義UDAF函數(shù)
窗口函數(shù)
分析函數(shù) over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])
分析函數(shù)可以大致分成如下3類:
1- 第一類: 聚合函數(shù) sum() count() avg() max() min()
2- 第二類: row_number() rank() dense_rank() ntile()
3- 第三類: first_value() last_value() lead() lag()
在Spark SQL中使用窗口函數(shù)案例:
需求是找出每個(gè)cookie中pv排在前3位的數(shù)據(jù),也就是分組取TOPN問題
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window as win
# 綁定指定的Python解釋器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 創(chuàng)建SparkSession對象
spark = SparkSession.builder\
.config('spark.sql.shuffle.partitions',1)\
.appName('sparksql_win_function')\
.master('local[*]')\
.getOrCreate()
# 2- 數(shù)據(jù)輸入
init_df = spark.read.csv(
path='file:///export/data/gz16_pyspark/02_spark_sql/data/cookie.txt',
schema='cookie string,datestr string,pv int',
sep=',',
encoding='UTF-8'
)
init_df.createTempView('win_data')
init_df.show()
init_df.printSchema()
# 3- 數(shù)據(jù)處理
# SQL
spark.sql("""
select
cookie,datestr,pv
from (
select
cookie,datestr,pv,
row_number() over (partition by cookie order by pv desc) as rn
from win_data
) tmp where rn<=3
""").show()
# DSL
"""
select:注意點(diǎn),結(jié)果中需要看到哪幾個(gè)字段,就要明確寫出來
"""
init_df.select(
"cookie","datestr","pv",
F.row_number().over(win.partitionBy('cookie').orderBy(F.desc('pv'))).alias('rn')
).where('rn<=3').select("cookie","datestr","pv").show()
# 4- 數(shù)據(jù)輸出
# 5- 釋放資源
spark.stop()
SQL函數(shù)分類
SQL函數(shù),主要分為以下三大類:
-
UDF函數(shù):用戶自定義函數(shù)
-
特點(diǎn):一對一,輸入一個(gè)得到一個(gè)
-
例如:split() substr()
-
-
UDAF函數(shù):用戶自定義聚合函數(shù)
-
特點(diǎn):多對一,輸入多個(gè)得到一個(gè)
-
例如:sum() avg() count() min()
-
-
UDTF函數(shù):用戶自定義表數(shù)據(jù)生成函數(shù)
-
特點(diǎn):一對多,輸入一個(gè)得到多個(gè)
-
例如:explode()
-
在SQL中提供的所有的內(nèi)置函數(shù),都是屬于以上三類中某一類函數(shù)
思考:有這么多的內(nèi)置函數(shù),為啥還需要自定義函數(shù)呢?
為了擴(kuò)充函數(shù)功能。在實(shí)際使用中,并不能保證所有的操作函數(shù)都已經(jīng)提前的內(nèi)置好了。很多基于業(yè)務(wù)處理的功能,其實(shí)并沒有提供對應(yīng)的函數(shù),提供的函數(shù)更多是以公共功能函數(shù)。此時(shí)需要進(jìn)行自定義,來擴(kuò)充新的功能函數(shù)
1- SparkSQL原生的時(shí)候,Python只能開發(fā)UDF函數(shù)
2- SparkSQL借助其他第三方組件,Python可以開發(fā)UDF、UDAF函數(shù)在Spark SQL中,針對Python語言,對于自定義函數(shù),原生支持的并不是特別好。目前原生僅支持自定義UDF函數(shù),而無法自定義UDAF函數(shù)和UDTF函數(shù)。
在1.6版本后,Java 和scala語言支持自定義UDAF函數(shù),但Python并不支持。
Spark SQL原生存在的問題:大量的序列化和反序列
?雖然Python支持自定義UDF函數(shù),但是其效率并不是特別的高效。因?yàn)樵谑褂玫臅r(shí)候,傳遞一行處理一行,返回一行的方式。這樣會帶來非常大的序列化的開銷的問題,導(dǎo)致原生UDF函數(shù)效率不好
?? ?
早期解決方案: 基于Java/Scala來編寫自定義UDF函數(shù),然后基于python調(diào)用即可
?? ?
目前主要的解決方案: 引入Arrow框架,可以基于內(nèi)存來完成數(shù)據(jù)傳輸工作,可以大大的降低了序列化的開銷,提供傳輸?shù)男剩鉀Q原生的問題。同時(shí)還可以基于pandas的自定義函數(shù),利用pandas的函數(shù)優(yōu)勢完成各種處理操作
Spark原生自定義UDF函數(shù)
?自定義函數(shù)流程:
第一步: 在PySpark中創(chuàng)建一個(gè)Python的函數(shù),在這個(gè)函數(shù)中書寫自定義的功能邏輯代碼即可
第二步: 將Python函數(shù)注冊到Spark SQL中
?? ?注冊方式一: udf對象 = sparkSession.udf.register(參數(shù)1,參數(shù)2,參數(shù)3)
?? ??? ?參數(shù)1: 【UDF函數(shù)名稱】,此名稱用于后續(xù)在SQL中使用,可以任意取值,但是要符合名稱的規(guī)范
?? ??? ?參數(shù)2: 【自定義的Python函數(shù)】,表示將哪個(gè)Python的函數(shù)注冊為Spark SQL的函數(shù)
?? ??? ?參數(shù)3: 【UDF函數(shù)的返回值類型】。用于表示當(dāng)前這個(gè)Python的函數(shù)返回的類型
?? ??? ?udf對象: 返回值對象,是一個(gè)UDF對象,可以在DSL中使用
?? ?
?? ??? ?說明: 如果通過方式一來注冊函數(shù), 【可以用在SQL和DSL】
?? ?
?? ?注冊方式二:? udf對象 = F.udf(參數(shù)1,參數(shù)2)
?? ??? ?參數(shù)1: Python函數(shù)的名稱,表示將那個(gè)Python的函數(shù)注冊為Spark SQL的函數(shù)
?? ??? ?參數(shù)2: 返回值的類型。用于表示當(dāng)前這個(gè)Python的函數(shù)返回的類型
?? ??? ?udf對象: 返回值對象,是一個(gè)UDF對象,可以在DSL中使用
?? ??? ?
?? ??? ?說明: 如果通過方式二來注冊函數(shù),【僅能用在DSL中】
?? ??? ?
?? ?注冊方式三:? 語法糖寫法? @F.udf(returnType=返回值類型)? 放置到對應(yīng)Python的函數(shù)上面
?? ??? ?說明: 實(shí)際是方式二的擴(kuò)展。如果通過方式三來注冊函數(shù),【僅能用在DSL中】
?? ?
?? ??? ?
第三步: 在Spark SQL的 DSL/ SQL 中進(jìn)行使用即可
# 自定義一個(gè)函數(shù),完成對數(shù)據(jù)統(tǒng)一添加一個(gè)后綴名的操作
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 綁定指定的Python解釋器
from pyspark.sql.types import StringType
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("請自定義一個(gè)函數(shù),完成對數(shù)據(jù)統(tǒng)一添加一個(gè)后綴名的操作_itheima")
# 1- 創(chuàng)建SparkSession對象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('sparksql_udf_basetype')\
.master('local[*]')\
.getOrCreate()
# 2- 數(shù)據(jù)輸入
init_df = spark.createDataFrame(
data=[(1,'張三','廣州'),(2,'李四','深圳')],
schema='id int,name string,address string'
)
init_df.printSchema()
init_df.show()
init_df.createTempView('tmp')
# 3- 數(shù)據(jù)處理
# 3.1- 創(chuàng)建自定義的Python函數(shù)
def add_suffix(address):
return address + "_itheima"
# 3.2- 將Python函數(shù)注冊到Spark SQL
# 注冊方式一
dsl_add_suffix = spark.udf.register('sql_add_suffix',add_suffix,StringType())
# 3.3- 在SQL/DSL中調(diào)用
# SQL
spark.sql("""
select
id,name,address,
sql_add_suffix(address) as new_address
from tmp
""").show()
# DSL
init_df.select(
"id",
"name",
"address",
dsl_add_suffix("address").alias("new_address")
).show()
print("-"*30)
# 在錯(cuò)誤的地方調(diào)用了錯(cuò)誤的函數(shù)。spark.udf.register參數(shù)1取的函數(shù)名只能在SQL中使用,不能在DSL中用。
# spark.sql("""
# select
# id,name,address,
# dsl_add_suffix(address) as new_address
# from tmp
# """).show()
# 注冊方式二:UDF返回值類型傳值方式一
dsl2_add_suffix = F.udf(add_suffix,StringType())
# DSL
init_df.select(
"id",
"name",
"address",
dsl2_add_suffix("address").alias("new_address")
).show()
# 注冊方式二:UDF返回值類型傳值方式二
dsl3_add_suffix = F.udf(add_suffix, 'string')
# DSL
init_df.select(
"id",
"name",
"address",
dsl3_add_suffix("address").alias("new_address")
).show()
# 注冊方式三:語法糖/裝飾器
@F.udf(returnType=StringType())
def add_suffix_candy(address):
return address + "_itheima"
# DSL
init_df.select(
"id",
"name",
"address",
add_suffix_candy("address").alias("new_address")
).show()
# 4- 數(shù)據(jù)輸出
# 5- 釋放資源
spark.stop()
Pandas的UDF函數(shù)
Apache Arrow框架基本介紹
Apache Arrow是Apache旗下的一款頂級的項(xiàng)目。是一個(gè)跨平臺的在內(nèi)存中以列式存儲的數(shù)據(jù)層,它的設(shè)計(jì)目標(biāo)就是作為一個(gè)跨平臺的數(shù)據(jù)層,來加快大數(shù)據(jù)分析項(xiàng)目的運(yùn)行效率
Pandas 與 Spark SQL 進(jìn)行交互的時(shí)候,建立在Apache Arrow上,帶來低開銷 高性能的UDF函數(shù)
Arrow并不會自動使用,在某些情況下,需要配置 以及在代碼中需要進(jìn)行小的更改才可以使用
如何安裝? 三個(gè)節(jié)點(diǎn)建議都安裝
檢查服務(wù)器上是否有安裝pyspark
pip list | grep pyspark? 或者 conda list | grep pyspark如果服務(wù)器已經(jīng)安裝了pyspark的庫,那么僅需要執(zhí)行以下內(nèi)容,即可安裝。例如在 node1安裝
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark[sql]
?? ?
如果服務(wù)器中python環(huán)境中沒有安裝pyspark,建議執(zhí)行以下操作,即可安裝。例如在 node2 和 node3安裝
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyarrow==10.0.0?
如何使用呢? 默認(rèn)不會自動啟動的, 一般建議手動配置
sparkSession.conf.set('spark.sql.execution.arrow.pyspark.enabled',True)
基于Arrow完成Pandas DataFrame和Spark DataFrame互轉(zhuǎn)
使用場景:
1- Spark的DataFrame -> Pandas的DataFrame:當(dāng)大數(shù)據(jù)處理到后期的時(shí)候,可能數(shù)據(jù)量會越來越少,這樣可以考慮使用單機(jī)版的Pandas來做后續(xù)數(shù)據(jù)的分析
2- Pandas的DataFrame -> Spark的DataFrame:當(dāng)數(shù)據(jù)量達(dá)到單機(jī)無法高效處理的時(shí)候,或者需要和其他大數(shù)據(jù)框架集成的時(shí)候,可以轉(zhuǎn)成Spark中的DataFrame
總結(jié):
Pandas的DataFrame -> Spark的DataFrame: spark.createDataFrame(data=pandas_df)
Spark的DataFrame -> Pandas的DataFrame: init_df.toPandas()
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
# 綁定指定的Python解釋器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("基于Arrow完成Pandas DataFrame和Spark DataFrame互轉(zhuǎn)")
# 1- 創(chuàng)建SparkSession對象
spark = SparkSession.builder\
.appName('dataframe')\
.master('local[*]')\
.getOrCreate()
# 手動開啟Arrow框架
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
# 2- 數(shù)據(jù)輸入
init_df = spark.createDataFrame(
data=[(1, '張三', '廣州'), (2, '李四', '深圳')],
schema='id int,name string,address string'
)
# 3- 數(shù)據(jù)處理
# sparksql dataframe -> pandas dataframe
pd_df = init_df.toPandas()
print(type(pd_df),pd_df)
new_pd_df = pd_df[pd_df['id']==2]
# pandas dataframe -> sparksql dataframe
spark_df = spark.createDataFrame(data=new_pd_df)
spark_df.show()
spark_df.printSchema()
# 4- 數(shù)據(jù)輸出
# 5- 釋放資源
spark.stop()
基于Pandas完成UDF函數(shù)
基于Pandas的UDF函數(shù)來轉(zhuǎn)換為Spark SQL的UDF函數(shù)進(jìn)行使用。底層是基于Arrow框架來完成數(shù)據(jù)傳輸,允許向量化(可以充分利用計(jì)算機(jī)CPU性能)操作。
Pandas的UDF函數(shù)其實(shí)本質(zhì)上就是Python的函數(shù),只不過函數(shù)的傳入數(shù)據(jù)類型為Pandas的類型
基于Pandas的UDF可以使用自定義UDF函數(shù)和自定義UDAF函數(shù)
自定義函數(shù)流程:
第一步: 在PySpark中創(chuàng)建一個(gè)Python的函數(shù),在這個(gè)函數(shù)中書寫自定義的功能邏輯代碼即可
第二步: 將Python函數(shù)包裝成Spark SQL的函數(shù)
?? ?注冊方式一: udf對象 = spark.udf.register(參數(shù)1, 參數(shù)2)
?? ??? ?參數(shù)1: UDF函數(shù)名稱。此名稱用于后續(xù)在SQL中使用,可以任意取值,但是要符合名稱的規(guī)范
?? ??? ?參數(shù)2: Python函數(shù)的名稱。表示將哪個(gè)Python的函數(shù)注冊為Spark SQL的函數(shù)
?? ??? ?使用: udf對象只能在DSL中使用。參數(shù)1指定的名稱只能在SQL中使用
?? ??? ?注意: 如果編寫的是UDAF函數(shù),那么注冊方式一需要配合注冊方式三,一起使用
?? ??? ?
?? ?注冊方式二: udf對象 = F.pandas_udf(參數(shù)1, 參數(shù)2)
?? ??? ?參數(shù)1: 自定義的Python函數(shù)。表示將哪個(gè)Python的函數(shù)注冊為Spark SQL的函數(shù)
?? ??? ?參數(shù)2: UDF函數(shù)的返回值類型。用于表示當(dāng)前這個(gè)Python的函數(shù)返回的類型對應(yīng)到Spark SQL的數(shù)據(jù)類型
?? ??? ?udf對象: 返回值對象,是一個(gè)UDF對象。僅能用在DSL中使用
?? ?
?? ?注冊方式三: 語法糖寫法? @F.pandas_udf(returnType=返回值Spark SQL的數(shù)據(jù)類型)? 放置到對應(yīng)Python的函數(shù)上面
?? ??? ?說明: 實(shí)際是方式一的擴(kuò)展。僅能用在DSL中使用
?? ?
?? ?
第三步: 在Spark SQL的 DSL/ SQL 中進(jìn)行使用即可
?自定義UDF函數(shù)
自定義Python函數(shù)的要求:SeriesToSeries
表示:第一步中創(chuàng)建自定義Python函數(shù)的時(shí)候,輸入?yún)?shù)的類型和返回值類型必須都是Pandas中的Series類型
需求:完成a列和b列的求和計(jì)算操作
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as F
# 綁定指定的Python解釋器
from pyspark.sql.types import IntegerType
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 創(chuàng)建SparkSession對象
spark = SparkSession.builder\
.appName('pandas_udf')\
.master('local[*]')\
.getOrCreate()
# 手動開啟Arrow框架
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
# 2- 數(shù)據(jù)輸入
init_df = spark.createDataFrame(
data=[(1,2),(2,3),(3,4)],
schema='num1 int,num2 int'
)
init_df.createTempView('tmp')
# 3- 數(shù)據(jù)處理
# 3.1- 自定義Python函數(shù)
"""
1- num1:pd.Series用來限定輸入的參數(shù)類型是Pandas中的Series對象
2- -> pd.Series用來限定返回值類型是Pandas中的Series對象
"""
def my_sum(num1:pd.Series, num2:pd.Series) -> pd.Series:
return num1+num2
# 3.2- 注冊進(jìn)SparkSQL。注冊方式一
dsl_my_sum = spark.udf.register('sql_my_sum',my_sum)
# 3.3- 使用
# SQL
spark.sql("""
select
num1,num2,
sql_my_sum(num1,num2) as result
from tmp
""").show()
# DSL
init_df.select(
"num1",
"num2",
dsl_my_sum("num1", "num2").alias("result")
).show()
# 注冊方式二
dsl2_my_sum = F.pandas_udf(my_sum,IntegerType())
# DSL
init_df.select(
"num1",
"num2",
dsl2_my_sum("num1", "num2").alias("result")
).show()
# 注冊方式三
@F.pandas_udf(IntegerType())
def my_sum_candy(num1:pd.Series, num2:pd.Series) -> pd.Series:
return num1+num2
# DSL
init_df.select(
"num1",
"num2",
my_sum_candy("num1", "num2").alias("result")
).show()
# 4- 數(shù)據(jù)輸出
# 5- 釋放資源
spark.stop()
自定義UDAF函數(shù)
自定義Python函數(shù)的要求:Series To 標(biāo)量
表示:自定義函數(shù)的輸入數(shù)據(jù)類型是Pandas中的Series對象,返回值數(shù)據(jù)類型是標(biāo)量數(shù)據(jù)類型。也就是Python中的數(shù)據(jù)類型,例如:int、float、bool、list....文章來源:http://www.zghlxwxcb.cn/news/detail-813273.html
需求:對某一列數(shù)據(jù)計(jì)算平均值的操作文章來源地址http://www.zghlxwxcb.cn/news/detail-813273.html
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as F
# 綁定指定的Python解釋器
from pyspark.sql.types import IntegerType, FloatType
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 創(chuàng)建SparkSession對象
spark = SparkSession.builder\
.appName('pandas_udaf')\
.master('local[*]')\
.getOrCreate()
# 手動開啟Arrow框架
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
# 2- 數(shù)據(jù)輸入
init_df = spark.createDataFrame(
data=[(1,2),(2,3),(3,3)],
schema='num1 int,num2 int'
)
init_df.createTempView('tmp')
# 3- 數(shù)據(jù)處理
# 3.1- 自定義Python函數(shù)
"""
UDAF對自定義Python函數(shù)的要求:輸入數(shù)據(jù)的類型必須是Pandas中的Series對象,返回值類型必須是Python中的標(biāo)量數(shù)據(jù)類型
"""
@F.pandas_udf(returnType=FloatType())
def my_avg(num2_col:pd.Series) -> float:
print(type(num2_col))
print(num2_col)
# 計(jì)算平均值
return num2_col.mean()
# 3.2- 注冊進(jìn)SparkSQL。注冊方式一
dsl_my_avg = spark.udf.register('sql_my_avg',my_avg)
# 3.3- 使用
# SQL
spark.sql("""
select
sql_my_avg(num2) as result
from tmp
""").show()
# DSL
init_df.select(dsl_my_avg("num2").alias("result")).show()
# 4- 數(shù)據(jù)輸出
# 5- 釋放資源
spark.stop()
到了這里,關(guān)于Spark SQL函數(shù)定義的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!