国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

SparkSQL函數(shù)定義——UDF函數(shù),窗口函數(shù)

這篇具有很好參考價(jià)值的文章主要介紹了SparkSQL函數(shù)定義——UDF函數(shù),窗口函數(shù)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

目錄

1 定義UDF函數(shù)

?1.1? 返回值是數(shù)組類型的UDF定義

1.2 返回字典類型的UDF定義

2 窗口函數(shù)


1 定義UDF函數(shù)

目前python僅支持UDF

兩種定義方式:

1. sparksession.udf.register()

注冊(cè)的UDF可以用于DSL和SQL

返回值用于DSL風(fēng)格,傳參內(nèi)給的名字用于SQL風(fēng)格

????????方法一語(yǔ)法:

udf對(duì)象 =? sparksession.udf.register( 參數(shù)1, 參數(shù)2, 參數(shù)3)

參數(shù)1:UDF名稱,可用于SQL風(fēng)格

參數(shù)2:被注冊(cè)成UDF的方法名

參數(shù)3:聲明UDF的返回值類型

udf 對(duì)象:返回值對(duì)象,是一個(gè) UDF對(duì)象,可用于DSL風(fēng)格

2.pyspark.sql.functions.udf

僅能用于DSL風(fēng)格

? ? ? ? 方式二語(yǔ)法:

udf對(duì)象 =? F.udf( 參數(shù)1, 參數(shù)2 )

參數(shù)1:被注冊(cè)成UDF的方法名

參數(shù)2:聲明UDF的返回值類型

udf 對(duì)象:返回值對(duì)象,是一個(gè) UDF對(duì)象,可用于DSL風(fēng)格

其中F是:

from pyspark.sql import functions as F

其中,被注冊(cè)成UDF的方法名是指具體的計(jì)算方法,如:

def add( x, y ):x+y

add就是將要被注冊(cè)成UDF的方法名

# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F
if __name__ == '__main__':
    # 構(gòu)建執(zhí)行環(huán)境入口對(duì)象SparkSession
    spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
    sc = spark.sparkContext

    # 構(gòu)建一個(gè)RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x: [x])
    df = rdd.toDF(["num"])

    def num_ride_10(num):
        return num * 10

    # 參數(shù)1: 注冊(cè)的UDF名稱, 這個(gè)udf名稱,僅可以用于SQL風(fēng)格
    # 參數(shù)2: UDF處理邏輯,是一個(gè)單獨(dú)的方法
    # 參數(shù)3: UDF的返回值類型,注意,UDF注冊(cè)時(shí)候,必須聲明返回值類型,并且UDF的真實(shí)返回值一定要和聲明返回值一樣
    # 返回值對(duì)象: 這是一個(gè)UDF對(duì)象,僅可以用于DSL語(yǔ)法
    # 當(dāng)前這種方式定義的UDF,可以通過(guò)參數(shù)1的名稱用于SQL風(fēng)格,通過(guò)返回值對(duì)象用的DSL風(fēng)格
    udf2 = spark.udf.register('udf1', num_ride_10, IntegerType())

    # SQL風(fēng)格中使用
    # selectExpr 以SELECT的表達(dá)式執(zhí)行, 表達(dá)式SQL風(fēng)格的表達(dá)式(字符串)
    # select方法, 接受普通的字符串段名,或者返回值是Column 對(duì)象的計(jì)算
    df.selectExpr("udf1(num)").show()

    # DSL風(fēng)格中使用
    # 返回值UDF對(duì)象, 如果作為方法使用,傳入的參數(shù)一定是 Column對(duì)象
    df.select(udf2(df['num'])). show()

    # 方式2注冊(cè), 僅能用于DSL風(fēng)格
    udf_3 = F.udf(num_ride_10, IntegerType())
    df.select(udf_3(df['num'])).show()

# 輸出
+---------+
|udf1(num)|
+---------+
|       10|
|       20|
|       30|
|       40|
|       50|
|       60|
|       70|
+---------+

+---------+
|udf1(num)|
+---------+
|       10|
|       20|
|       30|
|       40|
|       50|
|       60|
|       70|
+---------+

+----------------+
|num_ride_10(num)|
+----------------+
|              10|
|              20|
|              30|
|              40|
|              50|
|              60|
|              70|
+----------------+

?1.1? 返回值是數(shù)組類型的UDF定義

# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
from pyspark.sql import functions as F
if __name__ == '__main__':
    # 構(gòu)建執(zhí)行環(huán)境入口對(duì)象SparkSession
    spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
    sc = spark.sparkContext

    # 構(gòu)建一個(gè)RDD
    rdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])
    df = rdd.toDF(["line"])

    # 注冊(cè) UDF
    def split_line(data):
        return data.split(" ")  # 返回值是一個(gè)數(shù)組對(duì)象

    # 方式1 構(gòu)建UDF
    udf1 = spark.udf.register('udf1', split_line, ArrayType(StringType()))
    # DSL風(fēng)格
    df.select(udf1(df["line"])).show()

    # SQL風(fēng)格
    df.createTempView("line")
    spark.sql("SELECT udf1(line) FROM line").show(truncate=False)

    # 方式2構(gòu)建
    udf3 = F.udf(split_line, ArrayType(StringType()))
    df.select(udf3(df["line"])).show()

# 輸出
+--------------------+
|          udf1(line)|
+--------------------+
|[hadoop, spark, f...|
|[hadoop, flink, j...|
+--------------------+

+----------------------+
|udf1(line)            |
+----------------------+
|[hadoop, spark, flink]|
|[hadoop, flink, java] |
+----------------------+

+--------------------+
|    split_line(line)|
+--------------------+
|[hadoop, spark, f...|
|[hadoop, flink, j...|
+--------------------+

1.2 返回字典類型的UDF定義

# coding:utf8
import string

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
from pyspark.sql import functions as F
if __name__ == '__main__':
    # 構(gòu)建執(zhí)行環(huán)境入口對(duì)象SparkSession
    spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
    sc = spark.sparkContext

    # 假設(shè) 有三個(gè)數(shù)字 1,2,3 我們傳入數(shù)字, 返回?cái)?shù)字所在序號(hào)對(duì)應(yīng)的 字母 然后和數(shù)字結(jié)合形成dict返回
    # 比如傳入 1 我們返回 {“num" : 1, "letters": "a"}

    rdd = sc.parallelize([[1], [2], [3]])
    df = rdd.toDF(["num"])

    def process(data):
        return {"num":data, "letters": string.ascii_letters[data]}


    # UDF的返回值是字典的話,需要用 StructType來(lái)接受

    udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True).\
                             add("letters", StringType(), nullable=True))
    df.selectExpr("udf1(num)").show(truncate=False)
    df.select(udf1(df['num'])).show(truncate=False)

# 輸出結(jié)果
+---------+
|udf1(num)|
+---------+
|{1, b}   |
|{2, c}   |
|{3, d}   |
+---------+

+---------+
|udf1(num)|
+---------+
|{1, b}   |
|{2, c}   |
|{3, d}   |
+---------+

2 窗口函數(shù)

????????開(kāi)窗函數(shù)的引入是為了既顯示聚集前的數(shù)據(jù),又顯示聚集后的數(shù)據(jù)。即在每一行的最后一列添加聚合函數(shù)的結(jié)果。開(kāi)窗用于為定義一個(gè)窗口(這里的窗口是指運(yùn)算將要操作的行的集合),它對(duì)一組值進(jìn)行操作,不需要使用GROUP BY 子句 對(duì)數(shù)據(jù)進(jìn)行分組,能夠在同一行中同時(shí)返回基礎(chǔ)行的列和聚合列

? ? ? ? 聚合函數(shù) 和 開(kāi)窗函數(shù)

聚合函數(shù)是將多行變成一行,count,acg....

開(kāi)窗函數(shù)是將一行變成多行

聚合函數(shù)如果要顯示其他的列必須將列加入到 group by中

開(kāi)窗函數(shù)可以不使用group by, 直接將所有信息顯示出來(lái)

? ? ? ? 開(kāi)窗函數(shù)分類

1.聚合開(kāi)窗函數(shù)

聚合函數(shù)(列)OVER(選項(xiàng)),這里的選項(xiàng)可以是PARTITION BY 子句, 但不可以是 ORDER BY 句子

2.排序開(kāi)窗函數(shù)

排序函數(shù)(列)OVER(選項(xiàng)),這里的選項(xiàng)可以是ORDER BY 子句 ,也可以是OVER(?PARTITION BY子句?ORDER BY 子句 ?),但不可以是PARTITION BY 子句

3.分區(qū)類型NTIL的窗口函數(shù)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-463143.html

# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
from pyspark.sql import functions as F
if __name__ == '__main__':
    # 構(gòu)建執(zhí)行環(huán)境入口對(duì)象SparkSession
    spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([
        ("張三", "class_1", 99),
        ("張1", "class_2", 19),
        ("張2", "class_2", 19),
        ("張3", "class_2", 29),
        ("張4", "class_3", 49),
        ("張5", "class_3", 12),
        ("張6", "class_3", 14),
        ("張7", "class_3", 32),
        ("張8", "class_3", 22),
        ("張22", "class_4", 24),
        ("張11", "class_4", 49)
    ])
    schema = StructType().add("name", StringType()).\
        add("class", StringType()).\
        add("score", IntegerType())

    df = rdd.toDF(schema=schema)

    df.createTempView("stu")

    # 聚合窗口函數(shù) 的展示
    spark.sql(
        """
            SELECT *, avg(score) OVER() AS avg_score FROM stu
        """
    ).show()

    # 排序相關(guān)的 窗口函數(shù)計(jì)算
    # RANK over ,DENSE_RANK over  ROW_NUMBER over
    spark.sql("""
        SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank,
        DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
        RANK() OVER(order by score) AS rank
        FROM stu
    """).show()

    # NTILE
    spark.sql("""
        SELECT *, NTILE(6) OVER(ORDER BY score DESC) from stu
    """).show()
# 輸出

+----+-------+-----+-----------------+
|name|  class|score|        avg_score|
+----+-------+-----+-----------------+
|張三|class_1|   99|33.45454545454545|
| 張1|class_2|   19|33.45454545454545|
| 張2|class_2|   19|33.45454545454545|
| 張3|class_2|   29|33.45454545454545|
| 張4|class_3|   49|33.45454545454545|
| 張5|class_3|   12|33.45454545454545|
| 張6|class_3|   14|33.45454545454545|
| 張7|class_3|   32|33.45454545454545|
| 張8|class_3|   22|33.45454545454545|
|張22|class_4|   24|33.45454545454545|
|張11|class_4|   49|33.45454545454545|
+----+-------+-----+-----------------+


+----+-------+-----+---------------+----------+----+
|name|  class|score|row_number_rank|dense_rank|rank|
+----+-------+-----+---------------+----------+----+
|張三|class_1|   99|              1|         1|  11|
| 張3|class_2|   29|              5|         1|   7|
| 張1|class_2|   19|              8|         2|   3|
| 張2|class_2|   19|              9|         2|   3|
| 張4|class_3|   49|              2|         1|   9|
| 張7|class_3|   32|              4|         2|   8|
| 張8|class_3|   22|              7|         3|   5|
| 張6|class_3|   14|             10|         4|   2|
| 張5|class_3|   12|             11|         5|   1|
|張11|class_4|   49|              3|         1|   9|
|張22|class_4|   24|              6|         2|   6|
+----+-------+-----+---------------+----------+----+


+----+-------+-----+-----------------------------------------------------------------------------------------------+
|name|  class|score|ntile(6) OVER (ORDER BY score DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)|
+----+-------+-----+-----------------------------------------------------------------------------------------------+
|張三|class_1|   99|                                                                                              1|
| 張4|class_3|   49|                                                                                              1|
|張11|class_4|   49|                                                                                              2|
| 張7|class_3|   32|                                                                                              2|
| 張3|class_2|   29|                                                                                              3|
|張22|class_4|   24|                                                                                              3|
| 張8|class_3|   22|                                                                                              4|
| 張1|class_2|   19|                                                                                              4|
| 張2|class_2|   19|                                                                                              5|
| 張6|class_3|   14|                                                                                              5|
| 張5|class_3|   12|                                                                                              6|
+----+-------+-----+-----------------------------------------------------------------------------------------------+

到了這里,關(guān)于SparkSQL函數(shù)定義——UDF函數(shù),窗口函數(shù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包