SparkSQL 定義UDF函數(shù)
方式1語(yǔ)法:
udf對(duì)象 = sparksession.udf.register(參數(shù)1,參數(shù)2,參數(shù)3)
參數(shù)1:UDF名稱(chēng),可用于SQL風(fēng)格
參數(shù)2:被注冊(cè)成UDF的方法名
參數(shù)3:聲明UDF的返回值類(lèi)型
udf對(duì)象: 返回值對(duì)象,是一個(gè)UDF對(duì)象,可用于DSL風(fēng)格
方式2語(yǔ)法:
udf對(duì)象 = F.udf(參數(shù)1, 參數(shù)2)
參數(shù)1:被注冊(cè)成UDF的方法名
參數(shù)2:聲明UDF的返回值類(lèi)型
udf對(duì)象: 返回值對(duì)象,是一個(gè)UDF對(duì)象,可用于DSL風(fēng)格
其中F是:
from pyspark.sql import functions as F
其中,被注冊(cè)成UDF的方法名是指具體的計(jì)算方法,如:
def add(x, y): x + y
add就是將要被注冊(cè)成UDF的方法名
??
# coding:utf8
import time
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F
if __name__ == '__main__':
# 0. 構(gòu)建執(zhí)行環(huán)境入口對(duì)象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions", 2).\
getOrCreate()
sc = spark.sparkContext
# 構(gòu)建一個(gè)RDD
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x:[x])
df = rdd.toDF(["num"])
# TODO 1: 方式1 sparksession.udf.register(), DSL和SQL風(fēng)格均可以使用
# UDF的處理函數(shù)
def num_ride_10(num):
return num * 10
# 參數(shù)1: 注冊(cè)的UDF的名稱(chēng), 這個(gè)udf名稱(chēng), 僅可以用于 SQL風(fēng)格
# 參數(shù)2: UDF的處理邏輯, 是一個(gè)單獨(dú)的方法
# 參數(shù)3: 聲明UDF的返回值類(lèi)型, 注意: UDF注冊(cè)時(shí)候, 必須聲明返回值類(lèi)型, 并且UDF的真實(shí)返回值一定要和聲明的返回值一致
# 返回值對(duì)象: 這是一個(gè)UDF對(duì)象, 僅可以用于 DSL 語(yǔ)法
# 當(dāng)前這種方式定義的UDF, 可以通過(guò)參數(shù)1的名稱(chēng)用于SQL風(fēng)格, 通過(guò)返回值對(duì)象用戶(hù)DSL風(fēng)格
udf2 = spark.udf.register("udf1", num_ride_10, IntegerType())
# SQL風(fēng)格中使用
# selectExpr 以SELECT的表達(dá)式執(zhí)行, 表達(dá)式 SQL風(fēng)格的表達(dá)式(字符串)
# select方法, 接受普通的字符串字段名, 或者返回值是Column對(duì)象的計(jì)算
df.selectExpr("udf1(num)").show()
# DSL 風(fēng)格中使用
# 返回值UDF對(duì)象 如果作為方法使用, 傳入的參數(shù) 一定是Column對(duì)象
df.select(udf2(df['num'])).show()
# TODO 2: 方式2注冊(cè), 僅能用于DSL風(fēng)格
udf3 = F.udf(num_ride_10, IntegerType())
df.select(udf3(df['num'])).show()
df.selectExpr("udf3(num)").show()
?
# coding:utf8
import time
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as F
if __name__ == '__main__':
# 0. 構(gòu)建執(zhí)行環(huán)境入口對(duì)象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions", 2).\
getOrCreate()
sc = spark.sparkContext
# 構(gòu)建一個(gè)RDD
rdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])
df = rdd.toDF(["line"])
# 注冊(cè)UDF, UDF的執(zhí)行函數(shù)定義
def split_line(data):
return data.split(" ") # 返回值是一個(gè)Array對(duì)象
# TODO1 方式1 構(gòu)建UDF
udf2 = spark.udf.register("udf1", split_line, ArrayType(StringType()))
# DLS風(fēng)格
df.select(udf2(df['line'])).show()
# SQL風(fēng)格
df.createTempView("lines")
spark.sql("SELECT udf1(line) FROM lines").show(truncate=False)
# TODO 2 方式2的形式構(gòu)建UDF
udf3 = F.udf(split_line, ArrayType(StringType()))
df.select(udf3(df['line'])).show(truncate=False)
?
# coding:utf8
import string
import time
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as F
if __name__ == '__main__':
# 0. 構(gòu)建執(zhí)行環(huán)境入口對(duì)象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions", 2).\
getOrCreate()
sc = spark.sparkContext
# 假設(shè) 有三個(gè)數(shù)字 1 2 3 我們傳入數(shù)字 ,返回?cái)?shù)字所在序號(hào)對(duì)應(yīng)的 字母 然后和數(shù)字結(jié)合形成dict返回
# 比如傳入1 我們返回 {"num":1, "letters": "a"}
rdd = sc.parallelize([[1], [2], [3]])
df = rdd.toDF(["num"])
# 注冊(cè)UDF
def process(data):
return {"num": data, "letters": string.ascii_letters[data]}
"""
UDF的返回值是字典的話(huà), 需要用StructType來(lái)接收
"""
udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True).\
add("letters", StringType(), nullable=True))
df.selectExpr("udf1(num)").show(truncate=False)
df.select(udf1(df['num'])).show(truncate=False)
??
?SparkSQL 使用窗口函數(shù)
# coding:utf8
# 演示sparksql 窗口函數(shù)(開(kāi)窗函數(shù))
import string
from pyspark.sql import SparkSession
# 導(dǎo)入StructType對(duì)象
from pyspark.sql.types import ArrayType, StringType, StructType, IntegerType
import pandas as pd
from pyspark.sql import functions as F
if __name__ == '__main__':
spark = SparkSession.builder. \
appName("create df"). \
master("local[*]"). \
config("spark.sql.shuffle.partitions", "2"). \
getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([
('張三', 'class_1', 99),
('王五', 'class_2', 35),
('王三', 'class_3', 57),
('王久', 'class_4', 12),
('王麗', 'class_5', 99),
('王娟', 'class_1', 90),
('王軍', 'class_2', 91),
('王俊', 'class_3', 33),
('王君', 'class_4', 55),
('王珺', 'class_5', 66),
('鄭穎', 'class_1', 11),
('鄭輝', 'class_2', 33),
('張麗', 'class_3', 36),
('張張', 'class_4', 79),
('黃凱', 'class_5', 90),
('黃開(kāi)', 'class_1', 90),
('黃愷', 'class_2', 90),
('王凱', 'class_3', 11),
('王凱杰', 'class_1', 11),
('王開(kāi)杰', 'class_2', 3),
('王景亮', 'class_3', 99)
])
schema = StructType().add("name", StringType()). \
add("class", StringType()). \
add("score", IntegerType())
df = rdd.toDF(schema)
# 窗口函數(shù)只用于SQL風(fēng)格, 所以注冊(cè)表先
df.createTempView("stu")
# TODO 聚合窗口
spark.sql("""
SELECT *, AVG(score) OVER() AS avg_score FROM stu
""").show()
# SELECT *, AVG(score) OVER() AS avg_score FROM stu 等同于
# SELECT * FROM stu
# SELECT AVG(score) FROM stu
# 兩個(gè)SQL的結(jié)果集進(jìn)行整合而來(lái)
spark.sql("""
SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu
""").show()
# SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu 等同于
# SELECT * FROM stu
# SELECT AVG(score) FROM stu GROUP BY class
# 兩個(gè)SQL的結(jié)果集進(jìn)行整合而來(lái)
# TODO 排序窗口
spark.sql("""
SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank,
DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
RANK() OVER(ORDER BY score) AS rank
FROM stu
""").show()
# TODO NTILE
spark.sql("""
SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
""").show()
SparkSQL支持UDF和UDAF定義,但在Python中,暫時(shí)只能定義UDF
UDF定義支持2種方式, 1:使用SparkSession對(duì)象構(gòu)建. 2: 使用functions包中提供的UDF API構(gòu)建. 要注意, 方式1可用DSL和SQL風(fēng)格, 方式2 僅可用于DSL風(fēng)格
SparkSQL支持窗口函數(shù)使用, 常用SQL中的窗口函數(shù)均支持, 如聚合窗口\排序窗口\NTILE分組窗口等文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-674732.html
?文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-674732.html
到了這里,關(guān)于Spark 7:Spark SQL 函數(shù)定義的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!