目錄
1 定義UDF函數(shù)
?1.1? 返回值是數(shù)組類型的UDF定義
1.2 返回字典類型的UDF定義
2 窗口函數(shù)
1 定義UDF函數(shù)
目前python僅支持UDF
兩種定義方式:
1. sparksession.udf.register()
注冊(cè)的UDF可以用于DSL和SQL
返回值用于DSL風(fēng)格,傳參內(nèi)給的名字用于SQL風(fēng)格
????????方法一語(yǔ)法:
udf對(duì)象 =? sparksession.udf.register( 參數(shù)1, 參數(shù)2, 參數(shù)3)
參數(shù)1:UDF名稱,可用于SQL風(fēng)格
參數(shù)2:被注冊(cè)成UDF的方法名
參數(shù)3:聲明UDF的返回值類型
udf 對(duì)象:返回值對(duì)象,是一個(gè) UDF對(duì)象,可用于DSL風(fēng)格
2.pyspark.sql.functions.udf
僅能用于DSL風(fēng)格
? ? ? ? 方式二語(yǔ)法:
udf對(duì)象 =? F.udf( 參數(shù)1, 參數(shù)2 )
參數(shù)1:被注冊(cè)成UDF的方法名
參數(shù)2:聲明UDF的返回值類型
udf 對(duì)象:返回值對(duì)象,是一個(gè) UDF對(duì)象,可用于DSL風(fēng)格
其中F是:
from pyspark.sql import functions as F
其中,被注冊(cè)成UDF的方法名是指具體的計(jì)算方法,如:
def add( x, y ):x+y
add就是將要被注冊(cè)成UDF的方法名
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F
if __name__ == '__main__':
# 構(gòu)建執(zhí)行環(huán)境入口對(duì)象SparkSession
spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext
# 構(gòu)建一個(gè)RDD
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x: [x])
df = rdd.toDF(["num"])
def num_ride_10(num):
return num * 10
# 參數(shù)1: 注冊(cè)的UDF名稱, 這個(gè)udf名稱,僅可以用于SQL風(fēng)格
# 參數(shù)2: UDF處理邏輯,是一個(gè)單獨(dú)的方法
# 參數(shù)3: UDF的返回值類型,注意,UDF注冊(cè)時(shí)候,必須聲明返回值類型,并且UDF的真實(shí)返回值一定要和聲明返回值一樣
# 返回值對(duì)象: 這是一個(gè)UDF對(duì)象,僅可以用于DSL語(yǔ)法
# 當(dāng)前這種方式定義的UDF,可以通過(guò)參數(shù)1的名稱用于SQL風(fēng)格,通過(guò)返回值對(duì)象用的DSL風(fēng)格
udf2 = spark.udf.register('udf1', num_ride_10, IntegerType())
# SQL風(fēng)格中使用
# selectExpr 以SELECT的表達(dá)式執(zhí)行, 表達(dá)式SQL風(fēng)格的表達(dá)式(字符串)
# select方法, 接受普通的字符串段名,或者返回值是Column 對(duì)象的計(jì)算
df.selectExpr("udf1(num)").show()
# DSL風(fēng)格中使用
# 返回值UDF對(duì)象, 如果作為方法使用,傳入的參數(shù)一定是 Column對(duì)象
df.select(udf2(df['num'])). show()
# 方式2注冊(cè), 僅能用于DSL風(fēng)格
udf_3 = F.udf(num_ride_10, IntegerType())
df.select(udf_3(df['num'])).show()
# 輸出
+---------+
|udf1(num)|
+---------+
| 10|
| 20|
| 30|
| 40|
| 50|
| 60|
| 70|
+---------+
+---------+
|udf1(num)|
+---------+
| 10|
| 20|
| 30|
| 40|
| 50|
| 60|
| 70|
+---------+
+----------------+
|num_ride_10(num)|
+----------------+
| 10|
| 20|
| 30|
| 40|
| 50|
| 60|
| 70|
+----------------+
?1.1? 返回值是數(shù)組類型的UDF定義
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
from pyspark.sql import functions as F
if __name__ == '__main__':
# 構(gòu)建執(zhí)行環(huán)境入口對(duì)象SparkSession
spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext
# 構(gòu)建一個(gè)RDD
rdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])
df = rdd.toDF(["line"])
# 注冊(cè) UDF
def split_line(data):
return data.split(" ") # 返回值是一個(gè)數(shù)組對(duì)象
# 方式1 構(gòu)建UDF
udf1 = spark.udf.register('udf1', split_line, ArrayType(StringType()))
# DSL風(fēng)格
df.select(udf1(df["line"])).show()
# SQL風(fēng)格
df.createTempView("line")
spark.sql("SELECT udf1(line) FROM line").show(truncate=False)
# 方式2構(gòu)建
udf3 = F.udf(split_line, ArrayType(StringType()))
df.select(udf3(df["line"])).show()
# 輸出
+--------------------+
| udf1(line)|
+--------------------+
|[hadoop, spark, f...|
|[hadoop, flink, j...|
+--------------------+
+----------------------+
|udf1(line) |
+----------------------+
|[hadoop, spark, flink]|
|[hadoop, flink, java] |
+----------------------+
+--------------------+
| split_line(line)|
+--------------------+
|[hadoop, spark, f...|
|[hadoop, flink, j...|
+--------------------+
1.2 返回字典類型的UDF定義
# coding:utf8
import string
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
from pyspark.sql import functions as F
if __name__ == '__main__':
# 構(gòu)建執(zhí)行環(huán)境入口對(duì)象SparkSession
spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext
# 假設(shè) 有三個(gè)數(shù)字 1,2,3 我們傳入數(shù)字, 返回?cái)?shù)字所在序號(hào)對(duì)應(yīng)的 字母 然后和數(shù)字結(jié)合形成dict返回
# 比如傳入 1 我們返回 {“num" : 1, "letters": "a"}
rdd = sc.parallelize([[1], [2], [3]])
df = rdd.toDF(["num"])
def process(data):
return {"num":data, "letters": string.ascii_letters[data]}
# UDF的返回值是字典的話,需要用 StructType來(lái)接受
udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True).\
add("letters", StringType(), nullable=True))
df.selectExpr("udf1(num)").show(truncate=False)
df.select(udf1(df['num'])).show(truncate=False)
# 輸出結(jié)果
+---------+
|udf1(num)|
+---------+
|{1, b} |
|{2, c} |
|{3, d} |
+---------+
+---------+
|udf1(num)|
+---------+
|{1, b} |
|{2, c} |
|{3, d} |
+---------+
2 窗口函數(shù)
????????開(kāi)窗函數(shù)的引入是為了既顯示聚集前的數(shù)據(jù),又顯示聚集后的數(shù)據(jù)。即在每一行的最后一列添加聚合函數(shù)的結(jié)果。開(kāi)窗用于為定義一個(gè)窗口(這里的窗口是指運(yùn)算將要操作的行的集合),它對(duì)一組值進(jìn)行操作,不需要使用GROUP BY 子句 對(duì)數(shù)據(jù)進(jìn)行分組,能夠在同一行中同時(shí)返回基礎(chǔ)行的列和聚合列
? ? ? ? 聚合函數(shù) 和 開(kāi)窗函數(shù)
聚合函數(shù)是將多行變成一行,count,acg....
開(kāi)窗函數(shù)是將一行變成多行
聚合函數(shù)如果要顯示其他的列必須將列加入到 group by中
開(kāi)窗函數(shù)可以不使用group by, 直接將所有信息顯示出來(lái)
? ? ? ? 開(kāi)窗函數(shù)分類
1.聚合開(kāi)窗函數(shù)
聚合函數(shù)(列)OVER(選項(xiàng)),這里的選項(xiàng)可以是PARTITION BY 子句, 但不可以是 ORDER BY 句子
2.排序開(kāi)窗函數(shù)
排序函數(shù)(列)OVER(選項(xiàng)),這里的選項(xiàng)可以是ORDER BY 子句 ,也可以是OVER(?PARTITION BY子句?ORDER BY 子句 ?),但不可以是PARTITION BY 子句文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-463143.html
3.分區(qū)類型NTIL的窗口函數(shù)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-463143.html
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
from pyspark.sql import functions as F
if __name__ == '__main__':
# 構(gòu)建執(zhí)行環(huán)境入口對(duì)象SparkSession
spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([
("張三", "class_1", 99),
("張1", "class_2", 19),
("張2", "class_2", 19),
("張3", "class_2", 29),
("張4", "class_3", 49),
("張5", "class_3", 12),
("張6", "class_3", 14),
("張7", "class_3", 32),
("張8", "class_3", 22),
("張22", "class_4", 24),
("張11", "class_4", 49)
])
schema = StructType().add("name", StringType()).\
add("class", StringType()).\
add("score", IntegerType())
df = rdd.toDF(schema=schema)
df.createTempView("stu")
# 聚合窗口函數(shù) 的展示
spark.sql(
"""
SELECT *, avg(score) OVER() AS avg_score FROM stu
"""
).show()
# 排序相關(guān)的 窗口函數(shù)計(jì)算
# RANK over ,DENSE_RANK over ROW_NUMBER over
spark.sql("""
SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank,
DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
RANK() OVER(order by score) AS rank
FROM stu
""").show()
# NTILE
spark.sql("""
SELECT *, NTILE(6) OVER(ORDER BY score DESC) from stu
""").show()
# 輸出
+----+-------+-----+-----------------+
|name| class|score| avg_score|
+----+-------+-----+-----------------+
|張三|class_1| 99|33.45454545454545|
| 張1|class_2| 19|33.45454545454545|
| 張2|class_2| 19|33.45454545454545|
| 張3|class_2| 29|33.45454545454545|
| 張4|class_3| 49|33.45454545454545|
| 張5|class_3| 12|33.45454545454545|
| 張6|class_3| 14|33.45454545454545|
| 張7|class_3| 32|33.45454545454545|
| 張8|class_3| 22|33.45454545454545|
|張22|class_4| 24|33.45454545454545|
|張11|class_4| 49|33.45454545454545|
+----+-------+-----+-----------------+
+----+-------+-----+---------------+----------+----+
|name| class|score|row_number_rank|dense_rank|rank|
+----+-------+-----+---------------+----------+----+
|張三|class_1| 99| 1| 1| 11|
| 張3|class_2| 29| 5| 1| 7|
| 張1|class_2| 19| 8| 2| 3|
| 張2|class_2| 19| 9| 2| 3|
| 張4|class_3| 49| 2| 1| 9|
| 張7|class_3| 32| 4| 2| 8|
| 張8|class_3| 22| 7| 3| 5|
| 張6|class_3| 14| 10| 4| 2|
| 張5|class_3| 12| 11| 5| 1|
|張11|class_4| 49| 3| 1| 9|
|張22|class_4| 24| 6| 2| 6|
+----+-------+-----+---------------+----------+----+
+----+-------+-----+-----------------------------------------------------------------------------------------------+
|name| class|score|ntile(6) OVER (ORDER BY score DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)|
+----+-------+-----+-----------------------------------------------------------------------------------------------+
|張三|class_1| 99| 1|
| 張4|class_3| 49| 1|
|張11|class_4| 49| 2|
| 張7|class_3| 32| 2|
| 張3|class_2| 29| 3|
|張22|class_4| 24| 3|
| 張8|class_3| 22| 4|
| 張1|class_2| 19| 4|
| 張2|class_2| 19| 5|
| 張6|class_3| 14| 5|
| 張5|class_3| 12| 6|
+----+-------+-----+-----------------------------------------------------------------------------------------------+
到了這里,關(guān)于SparkSQL函數(shù)定義——UDF函數(shù),窗口函數(shù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!