????????????????? ??一、UDF函數定義
? ? ? ? (1)函數定義
? ? ? ? (2)Spark支持定義函數
? ? ? ? (3)定義UDF函數
? ? ? ? ? ? ? ? (4)定義返回Array類型的UDF
????????(5)定義返回字典類型的UDF
二、窗口函數
? ? ? ? (1)開窗函數簡述
? ? ? ? (2)窗口函數的語法
一、UDF函數定義
? ? ? ? (1)函數定義
????????無論Hive還是SparkSQL分析處理數據時,往往需要使用函數,SparkSQL模塊本身自帶很多實現(xiàn)公共功能的函數,在pyspark.sql.functions中。SparkSQL與Hive一樣支持定義函數:UDF和UDAF,尤其是UDF函數在實際項目中使用最為廣泛。
????????Hive中自定義函數有三種類型:
? ? ? ? 第一種:UDF(User-Defined_-function)函數
? ? ? ? ? ? ? ? · 一對一的關系,輸入一個值經過函數以后輸出一個值;
? ? ? ? ? ? ? ? · 在Hive中繼承UDF類,方法名稱為evaluate,返回值不能為void,其實就是實現(xiàn)一個方法;
? ? ? ? 第二種:UDAF(User-Defined Aggregation Function)聚合函數
? ? ? ? ? ? ? ? · 多對一的關系,輸入多個值輸出一個值,通常于groupBy聯(lián)合使用;
? ? ? ? 第三種:UDTF(User-Defined Table-Generating Functions)函數
? ? ? ? ? ? ? ? · 一對多的關系,輸入一個值輸出多個值(一行變多為行);
? ? ? ? ? ? ? ? · 用戶自定義生成函數,有點像flatMap;
? ? ? ? (2)Spark支持定義函數
????????目前來說Spark框架各個版本及各種語言對自定義函數的支持:在SparkSQL中,目前僅僅支持UDF函數和UDAF函數,目前Python僅支持UDF。
Apache Spark Version | Spark SQL UDF(Python,Java,Scala) | Spark SQL UDAF(Java,Scala) | Spark SQL UDF(R) | Hive UDF,UDAF,UDTF |
1.1-1.4 | √ | √ | ||
1.5 | √ | experimental | √ | |
1.6 | √ | √ | √ | |
2.0 | √ | √ | √ | √ |
? ? ? ? (3)定義UDF函數
? ? ? ? ①sparksession.udf.register()
? ? ? ? 注冊的UDF可以用于DSL和SQL,返回值用于DSL風格,傳參內給的名字用于SQL風格。
? ? ? ? ②pyspark.sql.functions.udf
? ? ? ? 僅能用于DSL風格。
? ? ? ? 其中F是:from pyspark.sql import functions as F。其中,被注冊為UDF的方法名是指具體的計算方法,如:def add(x, y): x + y? 。 add就是將要被注冊成UDF的方法名
# cording:utf8
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType
if __name__ == '__main__':
spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
sc = spark.sparkContext
# 構建一個RDD
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x:[x])
df = rdd.toDF(['num'])
# TODO 1:方式1 sparksession.udf.register(),DSL和SQL風格均可使用
# UDF的處理函數
def num_ride_10(num):
return num * 10
# 參數1:注冊的UDF的名稱,這個UDF名稱,僅可以用于SQL風格
# 參數2:UDF的處理邏輯,是一個單獨定義的方法
# 參數3:聲明UDF的返回值類型,注意:UDF注冊時候,必要聲明返回值類型,并且UDF的真實返回值一定要和聲明的返回值一致
# 當前這種方式定義的UDF,可以通過參數1的名稱用于SQL風格,通過返回值對象用戶的DSL風格
udf2 = spark.udf.register('udf1', num_ride_10, IntegerType())
# SQL風格中使用
# selectExpr 以SELECT的表達式執(zhí)行,表達式SQL風格的表達式(字符串)
# select方法,接受普通的字符串字段名,或者返回值時Column對象的計算
df.selectExpr('udf1(num)').show()
# DSL 風格使用
# 返回值UDF對象,如果作為方法使用,傳入的參數一定是Column對象
df.select(udf2(df['num'])).show()
# TODO 2:方式2注冊,僅能用于DSL風格
udf3 = F.udf(num_ride_10, IntegerType())
df.select(udf3(df['num'])).show()
? ? ? ? 方式1結果:
? ? ? ? 方式2結果:
? ? ? ? ? ? ? ? (4)定義返回Array類型的UDF
????????注意:數組或者list類型,可以使用spark的ArrayType來描述即可。
????????注意:聲明ArrayType要類似這樣::ArrayType(StringType()),在ArrayType中傳入數組內的數據類型。
# cording:utf8
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
sc = spark.sparkContext
# 構建一個RDD
rdd = sc.parallelize([['hadoop spark flink'], ['hadoop flink java']])
df = rdd.toDF(['line'])
# 注冊UDF,UDF的執(zhí)行函數定義
def split_line(data):
return data.split(' ')
# TODO 1:方式1 后見UDF
udf2 = spark.udf.register('udf1', split_line, ArrayType(StringType()))
# DLS 風格
df.select(udf2(df['line'])).show()
# SQL風格
df.createTempView('lines')
spark.sql('SELECT udf1(line) FROM lines').show(truncate=False)
# TODO 2:方式的形式構建UDF
udf3 = F.udf(split_line, ArrayType(StringType()))
df.select(udf3(df['line'])).show(truncate=False)
????????
????????(5)定義返回字典類型的UDF
????????注意:字典類型返回值,可以用StructType來進行描述,StructType是—個普通的Spark支持的結構化類型.
????????只是可以用在:
? ? ? ? ? ? ? ? · DF中用于描述Schema
? ? ? ? ? ? ? ? · UDF中用于描述返回值是字典的數據
# cording:utf8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
sc = spark.sparkContext
# 假設 有三個數字: 1 2 3 在傳入數字,返回數字所在序號對應的 字母 然后和數字結合組成dict返回
# 例:傳入1 返回{'num':1, 'letters': 'a'}
rdd = sc.parallelize([[1], [2], [3]])
df = rdd.toDF(['num'])
# 注冊UDF
def process(data):
return {'num': data, 'letters': string.ascii_letters[data]}
'''
UDF返回值是字典的話,需要用StructType來接收
'''
udf1 = spark.udf.register('udf1', process, StructType().add('num', IntegerType(), nullable=True).\
add('letters', StringType(), nullable=True))
# SQL風格
df.selectExpr('udf1(num)').show(truncate=False)
# DSL風格
df.select(udf1(df['num'])).show(truncate=False)
? ? ? ? (6)通過RDD構建UDAF函數
# cording:utf8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
df = rdd.map(lambda x: [x]).toDF(['num'])
# 方法:使用RDD的mapPartitions 算子來完成聚合操作
# 如果用mapPartitions API 完成UDAF聚合,一定要單分區(qū)
single_partition_rdd = df.rdd.repartition(1)
def process(iter):
sum = 0
for row in iter:
sum += row['num']
return [sum] # 一定要嵌套list,因為mapPartitions方法要求返回值是list對象
print(single_partition_rdd.mapPartitions(process).collect())
二、窗口函數
? ? ? ? (1)開窗函數簡述
????????●介紹
????????開窗函數的引入是為了既顯示聚集前的數據又顯示聚集后的數據。即在每一行的最后一列添加聚合函數的結果。 開窗用于為行定義一個窗口(這里的窗口是指運算將要操作的行的集合),它對一組值進行操作,不需要使用GROUP BY子句對數據進行分組,能夠在同一行中同時返回基礎行的列和聚合列。
????????●聚合函數和開窗函數
????????聚合函數是將多行變成一行,count,avg...
????????開窗函數是將一行變成多行;
????????聚合函數如果要顯示其他的列必須將列加入到group by中,開窗函數可以不使用group by,直接將所有信息顯示出來。
????????●開窗函數分類
????????1.聚合開窗函數 聚合函數(列)OVER(選項),這里的選項可以是PARTITION BY子句,但不可以是ORDER BY子句。
????????2.排序開窗函數 排序函數(列)OVER(選項),這里的選項可以是ORDER BY子句,也可以是OVER(PARTITION BY子句ORDER BY子句),但不可以是PARTITION BY子句。
????????3.分區(qū)類型NTILE的窗口函數
? ? ? ? (2)窗口函數的語法
? ? ? ? 窗口函數的語法:
# cording:utf8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([
('張三', 'class_1', 99),
('王五', 'class_2', 35),
('王三', 'class_3', 57),
('王久', 'class_4', 12),
('王麗', 'class_5', 99),
('王娟', 'class_1', 90),
('王軍', 'class_2', 91),
('王俊', 'class_3', 33),
('王君', 'class_4', 55),
('王珺', 'class_5', 66),
('鄭穎', 'class_1', 11),
('鄭輝', 'class_2', 33),
('張麗', 'class_3', 36),
('張張', 'class_4', 79),
('黃凱', 'class_5', 90),
('黃開', 'class_1', 90),
('黃愷', 'class_2', 90),
('王凱', 'class_3', 11),
('王凱杰', 'class_1', 11),
('王開杰', 'class_2', 3),
('王景亮', 'class_3', 99)])
schema = StructType().add('name', StringType()).\
add('class', StringType()).\
add('score', IntegerType())
df = rdd.toDF(schema)
# 創(chuàng)建表
df.createTempView('stu')
# TODO 1:聚合窗口函數的演示
spark.sql('''
SELECT *, AVG(score) over() AS avg_socre FROM stu
''').show()
# TODO 2: 排序相關的窗口函數計算
# RANK over, DENSE_RANK over, ROW_NUMBER over
spark.sql('''
SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank,
DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
RANK() OVER(ORDER BY score) AS RANK
FROM stu
''').show()
# TODO NTILE
spark.sql('''
SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
''').show()
? ? ? ? TODO1結果:
? ? ? ? TODO2結果展示:
? ? ? ? TODO3結果展示:文章來源:http://www.zghlxwxcb.cn/news/detail-774358.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-774358.html
到了這里,關于Spark_SQL函數定義(定義UDF函數、使用窗口函數)的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!