国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Spark_SQL函數定義(定義UDF函數、使用窗口函數)

這篇具有很好參考價值的文章主要介紹了Spark_SQL函數定義(定義UDF函數、使用窗口函數)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

????????????????? ??一、UDF函數定義

? ? ? ? (1)函數定義

? ? ? ? (2)Spark支持定義函數

? ? ? ? (3)定義UDF函數

? ? ? ? ? ? ? ? (4)定義返回Array類型的UDF

????????(5)定義返回字典類型的UDF

二、窗口函數

? ? ? ? (1)開窗函數簡述

? ? ? ? (2)窗口函數的語法


一、UDF函數定義

? ? ? ? (1)函數定義

????????無論Hive還是SparkSQL分析處理數據時,往往需要使用函數,SparkSQL模塊本身自帶很多實現(xiàn)公共功能的函數,在pyspark.sql.functions中。SparkSQL與Hive一樣支持定義函數:UDF和UDAF,尤其是UDF函數在實際項目中使用最為廣泛。
????????Hive中自定義函數有三種類型:

? ? ? ? 第一種:UDF(User-Defined_-function)函數

? ? ? ? ? ? ? ? · 一對一的關系,輸入一個值經過函數以后輸出一個值;

? ? ? ? ? ? ? ? · 在Hive中繼承UDF類,方法名稱為evaluate,返回值不能為void,其實就是實現(xiàn)一個方法;

? ? ? ? 第二種:UDAF(User-Defined Aggregation Function)聚合函數

? ? ? ? ? ? ? ? · 多對一的關系,輸入多個值輸出一個值,通常于groupBy聯(lián)合使用;

? ? ? ? 第三種:UDTF(User-Defined Table-Generating Functions)函數

? ? ? ? ? ? ? ? · 一對多的關系,輸入一個值輸出多個值(一行變多為行);

? ? ? ? ? ? ? ? · 用戶自定義生成函數,有點像flatMap;

? ? ? ? (2)Spark支持定義函數

????????目前來說Spark框架各個版本及各種語言對自定義函數的支持:在SparkSQL中,目前僅僅支持UDF函數和UDAF函數,目前Python僅支持UDF。

Spark版本及支持函數定義
Apache Spark Version Spark SQL UDF(Python,Java,Scala) Spark SQL UDAF(Java,Scala) Spark SQL UDF(R) Hive UDF,UDAF,UDTF
1.1-1.4
1.5 experimental
1.6
2.0
? ? ? ? (3)定義UDF函數

? ? ? ? ①sparksession.udf.register()

? ? ? ? 注冊的UDF可以用于DSL和SQL,返回值用于DSL風格,傳參內給的名字用于SQL風格。

spark-sql創(chuàng)建hive udf,大數據,1024程序員節(jié)

? ? ? ? ②pyspark.sql.functions.udf

? ? ? ? 僅能用于DSL風格。

spark-sql創(chuàng)建hive udf,大數據,1024程序員節(jié)

? ? ? ? 其中F是:from pyspark.sql import functions as F。其中,被注冊為UDF的方法名是指具體的計算方法,如:def add(x, y): x + y? 。 add就是將要被注冊成UDF的方法名

# cording:utf8
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # 構建一個RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x:[x])
    df = rdd.toDF(['num'])

    # TODO 1:方式1 sparksession.udf.register(),DSL和SQL風格均可使用
    # UDF的處理函數
    def num_ride_10(num):
        return num * 10
    # 參數1:注冊的UDF的名稱,這個UDF名稱,僅可以用于SQL風格
    # 參數2:UDF的處理邏輯,是一個單獨定義的方法
    # 參數3:聲明UDF的返回值類型,注意:UDF注冊時候,必要聲明返回值類型,并且UDF的真實返回值一定要和聲明的返回值一致
    # 當前這種方式定義的UDF,可以通過參數1的名稱用于SQL風格,通過返回值對象用戶的DSL風格
    udf2 = spark.udf.register('udf1', num_ride_10, IntegerType())

    # SQL風格中使用
    # selectExpr 以SELECT的表達式執(zhí)行,表達式SQL風格的表達式(字符串)
    # select方法,接受普通的字符串字段名,或者返回值時Column對象的計算
    df.selectExpr('udf1(num)').show()

    # DSL 風格使用
    # 返回值UDF對象,如果作為方法使用,傳入的參數一定是Column對象
    df.select(udf2(df['num'])).show()

    # TODO 2:方式2注冊,僅能用于DSL風格
    udf3 = F.udf(num_ride_10, IntegerType())
    df.select(udf3(df['num'])).show()

? ? ? ? 方式1結果:

spark-sql創(chuàng)建hive udf,大數據,1024程序員節(jié)

? ? ? ? 方式2結果:

spark-sql創(chuàng)建hive udf,大數據,1024程序員節(jié)

? ? ? ? ? ? ? ? (4)定義返回Array類型的UDF

????????注意:數組或者list類型,可以使用spark的ArrayType來描述即可。

????????注意:聲明ArrayType要類似這樣::ArrayType(StringType()),在ArrayType中傳入數組內的數據類型。

# cording:utf8
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # 構建一個RDD
    rdd = sc.parallelize([['hadoop spark flink'], ['hadoop flink java']])
    df = rdd.toDF(['line'])

    # 注冊UDF,UDF的執(zhí)行函數定義
    def split_line(data):
        return data.split(' ')

    # TODO 1:方式1 后見UDF
    udf2 = spark.udf.register('udf1', split_line, ArrayType(StringType()))

    # DLS 風格
    df.select(udf2(df['line'])).show()

    # SQL風格
    df.createTempView('lines')
    spark.sql('SELECT udf1(line) FROM lines').show(truncate=False)

    # TODO 2:方式的形式構建UDF
    udf3 = F.udf(split_line, ArrayType(StringType()))
    df.select(udf3(df['line'])).show(truncate=False)

spark-sql創(chuàng)建hive udf,大數據,1024程序員節(jié)????????

????????(5)定義返回字典類型的UDF

????????注意:字典類型返回值,可以用StructType來進行描述,StructType是—個普通的Spark支持的結構化類型.
????????只是可以用在:
? ? ? ? ? ? ? ? · DF中用于描述Schema
? ? ? ? ? ? ? ? · UDF中用于描述返回值是字典的數據

# cording:utf8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # 假設 有三個數字: 1 2 3 在傳入數字,返回數字所在序號對應的 字母 然后和數字結合組成dict返回
    # 例:傳入1 返回{'num':1, 'letters': 'a'}
    rdd = sc.parallelize([[1], [2], [3]])
    df = rdd.toDF(['num'])

    # 注冊UDF
    def process(data):
        return {'num': data, 'letters': string.ascii_letters[data]}

    '''
    UDF返回值是字典的話,需要用StructType來接收
    '''
    udf1 = spark.udf.register('udf1', process, StructType().add('num', IntegerType(), nullable=True).\
                              add('letters', StringType(), nullable=True))
    # SQL風格
    df.selectExpr('udf1(num)').show(truncate=False)
    # DSL風格
    df.select(udf1(df['num'])).show(truncate=False)

spark-sql創(chuàng)建hive udf,大數據,1024程序員節(jié)

? ? ? ? (6)通過RDD構建UDAF函數

# cording:utf8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
    df = rdd.map(lambda x: [x]).toDF(['num'])

    # 方法:使用RDD的mapPartitions 算子來完成聚合操作
    # 如果用mapPartitions API 完成UDAF聚合,一定要單分區(qū)
    single_partition_rdd = df.rdd.repartition(1)

    def process(iter):
        sum = 0
        for row in iter:
            sum += row['num']

        return [sum]    # 一定要嵌套list,因為mapPartitions方法要求返回值是list對象

    print(single_partition_rdd.mapPartitions(process).collect())

spark-sql創(chuàng)建hive udf,大數據,1024程序員節(jié)

二、窗口函數

? ? ? ? (1)開窗函數簡述

????????●介紹

????????開窗函數的引入是為了既顯示聚集前的數據又顯示聚集后的數據。即在每一行的最后一列添加聚合函數的結果。 開窗用于為行定義一個窗口(這里的窗口是指運算將要操作的行的集合),它對一組值進行操作,不需要使用GROUP BY子句對數據進行分組,能夠在同一行中同時返回基礎行的列和聚合列。

????????●聚合函數和開窗函數

????????聚合函數是將多行變成一行,count,avg...

????????開窗函數是將一行變成多行;

????????聚合函數如果要顯示其他的列必須將列加入到group by中,開窗函數可以不使用group by,直接將所有信息顯示出來。

????????●開窗函數分類

????????1.聚合開窗函數 聚合函數(列)OVER(選項),這里的選項可以是PARTITION BY子句,但不可以是ORDER BY子句。

????????2.排序開窗函數 排序函數(列)OVER(選項),這里的選項可以是ORDER BY子句,也可以是OVER(PARTITION BY子句ORDER BY子句),但不可以是PARTITION BY子句。

????????3.分區(qū)類型NTILE的窗口函數

? ? ? ? (2)窗口函數的語法

? ? ? ? 窗口函數的語法:

spark-sql創(chuàng)建hive udf,大數據,1024程序員節(jié)

# cording:utf8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([
        ('張三', 'class_1', 99),
        ('王五', 'class_2', 35),
        ('王三', 'class_3', 57),
        ('王久', 'class_4', 12),
        ('王麗', 'class_5', 99),
        ('王娟', 'class_1', 90),
        ('王軍', 'class_2', 91),
        ('王俊', 'class_3', 33),
        ('王君', 'class_4', 55),
        ('王珺', 'class_5', 66),
        ('鄭穎', 'class_1', 11),
        ('鄭輝', 'class_2', 33),
        ('張麗', 'class_3', 36),
        ('張張', 'class_4', 79),
        ('黃凱', 'class_5', 90),
        ('黃開', 'class_1', 90),
        ('黃愷', 'class_2', 90),
        ('王凱', 'class_3', 11),
        ('王凱杰', 'class_1', 11),
        ('王開杰', 'class_2', 3),
        ('王景亮', 'class_3', 99)])
    schema = StructType().add('name', StringType()).\
        add('class', StringType()).\
        add('score', IntegerType())
    df = rdd.toDF(schema)
    # 創(chuàng)建表
    df.createTempView('stu')

    # TODO 1:聚合窗口函數的演示
    spark.sql('''
        SELECT *, AVG(score) over() AS avg_socre FROM stu
    ''').show()

    # TODO 2: 排序相關的窗口函數計算
    # RANK over, DENSE_RANK over, ROW_NUMBER over
    spark.sql('''
        SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank,
        DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
        RANK() OVER(ORDER BY score) AS RANK
        FROM stu
    ''').show()

    # TODO NTILE
    spark.sql('''
        SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
    ''').show()

? ? ? ? TODO1結果:

spark-sql創(chuàng)建hive udf,大數據,1024程序員節(jié)

? ? ? ? TODO2結果展示:

spark-sql創(chuàng)建hive udf,大數據,1024程序員節(jié)

? ? ? ? TODO3結果展示:

spark-sql創(chuàng)建hive udf,大數據,1024程序員節(jié)文章來源地址http://www.zghlxwxcb.cn/news/detail-774358.html

到了這里,關于Spark_SQL函數定義(定義UDF函數、使用窗口函數)的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯(lián)網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • flinksql 流表轉換, 自定義udf/udtf,SQL 內置函數及自定義函數

    flinksql 流表轉換, 自定義udf/udtf,SQL 內置函數及自定義函數

    1、在大多數情況下,用戶定義的函數必須先注冊,然后才能在查詢中使用。不需要專門為 Scala 的 Table API 注冊函數。 2、函數通過調用 registerFunction()方法在 TableEnvironment 中注冊。當用戶定義的函數 被注冊時,它被插入到 TableEnvironment 的函數目錄中, 這樣 Table API 或 SQL 解

    2024年02月22日
    瀏覽(27)
  • spark-sql

    [root@localhost bin]# ./spark-sql Error: Failed to load class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver. Failed to load main class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver. You need to build Spark with -Phive and -Phive-thriftserver. 24/02/22?00:23:20 INFO ShutdownHookManager: Shutdown hook called 24/02/22?00:23:20 INFO Shutd

    2024年02月22日
    瀏覽(16)
  • Spark-SQL小結

    Spark-SQL小結

    目錄 一、RDD、DataFrame、DataSet的概念、區(qū)別聯(lián)系、相互轉換操作 ? 1.RDD概念 ? 2.DataFrame概念 ? 3.DataSet概念 ? 4.RDD、DataFrame、DataSet的區(qū)別聯(lián)系 ? 5.RDD、DataFrame、DataSet的相互轉換操作 ? ?1 RDD-DataFrame、DataSet ? ?2? DataFrame-RDD,DataSet ? ?3 DataSet-RDD,DataFrame 二、Spark-SQL連接JDBC的方式

    2024年02月09日
    瀏覽(19)
  • spark-sql字段血緣實現(xiàn)

    spark-sql字段血緣實現(xiàn)

    Apache Spark是一個開源的大數據處理框架,它提供了一種高效、易于使用的方式來處理大規(guī)模數據集。在Spark中,數據是通過DataFrame和Dataset的形式進行操作的,這些數據結構包含了一系列的字段(也稱為列)。字段血緣是Spark中的一個關鍵概念,它幫助我們理解數據的來源和流

    2024年02月02日
    瀏覽(20)
  • Hudi-集成Spark之spark-sql方式

    啟動spark-sql 創(chuàng)建表 建表參數: 參數名 默認值 說明 primaryKey uuid 表的主鍵名,多個字段用逗號分隔。同 hoodie.datasource.write.recordkey.field preCombineField 表的預合并字段。同 hoodie.datasource.write.precombine.field type cow 創(chuàng)建的表類型: type = ‘cow’ type = \\\'mor’同 hoodie.datasource.write.table.ty

    2024年02月05日
    瀏覽(24)
  • Spark參數配置和調優(yōu),Spark-SQL、Config

    一、Hive-SQL / Spark-SQL參數配置和調優(yōu) 二、shell腳本spark-submit參數配置 三、sparkSession中配置參數

    2024年02月13日
    瀏覽(21)
  • Hudi(7):Hudi集成Spark之spark-sql方式

    目錄 0. 相關文章鏈接 1.?創(chuàng)建表 1.1.?啟動spark-sql 1.2.?建表參數 1.3.?創(chuàng)建非分區(qū)表 1.4.?創(chuàng)建分區(qū)表 1.5.?在已有的hudi表上創(chuàng)建新表 1.6.?通過CTAS (Create Table As Select)建表 2.?插入數據 2.1.?向非分區(qū)表插入數據 2.2.?向分區(qū)表動態(tài)分區(qū)插入數據 2.3.?向分區(qū)表靜態(tài)分區(qū)插入數據 2.4

    2024年02月06日
    瀏覽(21)
  • Spark-SQL連接Hive的五種方法

    Spark-SQL連接Hive的五種方法

    若使用Spark內嵌的Hive,直接使用即可,什么都不需要做(在實際生產活動中,很少會使用這一模式) 步驟: 將Hive中conf/下的hive-site.xml拷貝到Spark的conf/目錄下; 把Mysql的驅動copy到jars/目錄下; 如果訪問不到hdfs,則將core-site.xml和hdfs-site.xml拷貝到conf/目錄下; 重啟spark-shell;

    2024年02月16日
    瀏覽(21)
  • spark-udf函數

    from pyspark.sql import SparkSession from pyspark.sql.types import * ss = SparkSession.builder.getOrCreate() df_csv = ss.read.csv(‘hdfs://node1:8020/user/hive/warehouse/data/stu.csv’, schema=‘name string,age int,gender string,phone string,email string,city string,address string’) df_csv.show() def func(email): username = email.split(‘@’)[0] email

    2024年01月22日
    瀏覽(16)
  • Spark-SQL連接JDBC的方式及代碼寫法

    Spark-SQL連接JDBC的方式及代碼寫法

    提示:文章內容僅供參考! 目錄 一、數據加載與保存 通用方式: 加載數據: 保存數據: 二、Parquet 加載數據: 保存數據: 三、JSON 四、CSV ?五、MySQL SparkSQL 提供了通用的保存數據和數據加載的方式。這里的通用指的是使用相同的API,根據不同的參數讀取和保存不同格式的

    2024年02月13日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包