国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Spark 7:Spark SQL 函數(shù)定義

這篇具有很好參考價(jià)值的文章主要介紹了Spark 7:Spark SQL 函數(shù)定義。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

SparkSQL 定義UDF函數(shù)

Spark 7:Spark SQL 函數(shù)定義,spark,大數(shù)據(jù),分布式

Spark 7:Spark SQL 函數(shù)定義,spark,大數(shù)據(jù),分布式

方式1語(yǔ)法:
udf對(duì)象 = sparksession.udf.register(參數(shù)1,參數(shù)2,參數(shù)3)
參數(shù)1:UDF名稱(chēng),可用于SQL風(fēng)格
參數(shù)2:被注冊(cè)成UDF的方法名
參數(shù)3:聲明UDF的返回值類(lèi)型
udf對(duì)象: 返回值對(duì)象,是一個(gè)UDF對(duì)象,可用于DSL風(fēng)格
方式2語(yǔ)法:
udf對(duì)象 = F.udf(參數(shù)1, 參數(shù)2)
參數(shù)1:被注冊(cè)成UDF的方法名
參數(shù)2:聲明UDF的返回值類(lèi)型
udf對(duì)象: 返回值對(duì)象,是一個(gè)UDF對(duì)象,可用于DSL風(fēng)格
其中F是:
from pyspark.sql import functions as F
其中,被注冊(cè)成UDF的方法名是指具體的計(jì)算方法,如:
def add(x, y): x + y
add就是將要被注冊(cè)成UDF的方法名

Spark 7:Spark SQL 函數(shù)定義,spark,大數(shù)據(jù),分布式??

# coding:utf8
import time

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 構(gòu)建執(zhí)行環(huán)境入口對(duì)象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 構(gòu)建一個(gè)RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x:[x])
    df = rdd.toDF(["num"])

    # TODO 1: 方式1 sparksession.udf.register(), DSL和SQL風(fēng)格均可以使用
    # UDF的處理函數(shù)
    def num_ride_10(num):
        return num * 10
    # 參數(shù)1: 注冊(cè)的UDF的名稱(chēng), 這個(gè)udf名稱(chēng), 僅可以用于 SQL風(fēng)格
    # 參數(shù)2: UDF的處理邏輯, 是一個(gè)單獨(dú)的方法
    # 參數(shù)3: 聲明UDF的返回值類(lèi)型, 注意: UDF注冊(cè)時(shí)候, 必須聲明返回值類(lèi)型, 并且UDF的真實(shí)返回值一定要和聲明的返回值一致
    # 返回值對(duì)象: 這是一個(gè)UDF對(duì)象, 僅可以用于 DSL 語(yǔ)法
    # 當(dāng)前這種方式定義的UDF, 可以通過(guò)參數(shù)1的名稱(chēng)用于SQL風(fēng)格, 通過(guò)返回值對(duì)象用戶(hù)DSL風(fēng)格
    udf2 = spark.udf.register("udf1", num_ride_10, IntegerType())

    # SQL風(fēng)格中使用
    # selectExpr 以SELECT的表達(dá)式執(zhí)行, 表達(dá)式 SQL風(fēng)格的表達(dá)式(字符串)
    # select方法, 接受普通的字符串字段名, 或者返回值是Column對(duì)象的計(jì)算
    df.selectExpr("udf1(num)").show()

    # DSL 風(fēng)格中使用
    # 返回值UDF對(duì)象 如果作為方法使用, 傳入的參數(shù) 一定是Column對(duì)象
    df.select(udf2(df['num'])).show()

    # TODO 2: 方式2注冊(cè), 僅能用于DSL風(fēng)格
    udf3 = F.udf(num_ride_10, IntegerType())
    df.select(udf3(df['num'])).show()

    df.selectExpr("udf3(num)").show()

Spark 7:Spark SQL 函數(shù)定義,spark,大數(shù)據(jù),分布式

Spark 7:Spark SQL 函數(shù)定義,spark,大數(shù)據(jù),分布式?

# coding:utf8
import time

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 構(gòu)建執(zhí)行環(huán)境入口對(duì)象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 構(gòu)建一個(gè)RDD
    rdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])
    df = rdd.toDF(["line"])

    # 注冊(cè)UDF, UDF的執(zhí)行函數(shù)定義
    def split_line(data):
        return data.split(" ")  # 返回值是一個(gè)Array對(duì)象
    # TODO1 方式1 構(gòu)建UDF
    udf2 = spark.udf.register("udf1", split_line, ArrayType(StringType()))

    # DLS風(fēng)格
    df.select(udf2(df['line'])).show()
    # SQL風(fēng)格
    df.createTempView("lines")
    spark.sql("SELECT udf1(line) FROM lines").show(truncate=False)

    # TODO 2 方式2的形式構(gòu)建UDF
    udf3 = F.udf(split_line, ArrayType(StringType()))
    df.select(udf3(df['line'])).show(truncate=False)

Spark 7:Spark SQL 函數(shù)定義,spark,大數(shù)據(jù),分布式

Spark 7:Spark SQL 函數(shù)定義,spark,大數(shù)據(jù),分布式?

# coding:utf8
import string
import time

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 構(gòu)建執(zhí)行環(huán)境入口對(duì)象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 假設(shè) 有三個(gè)數(shù)字  1 2 3  我們傳入數(shù)字 ,返回?cái)?shù)字所在序號(hào)對(duì)應(yīng)的 字母 然后和數(shù)字結(jié)合形成dict返回
    # 比如傳入1 我們返回 {"num":1, "letters": "a"}
    rdd = sc.parallelize([[1], [2], [3]])
    df = rdd.toDF(["num"])

    # 注冊(cè)UDF
    def process(data):
        return {"num": data, "letters": string.ascii_letters[data]}

    """
    UDF的返回值是字典的話(huà), 需要用StructType來(lái)接收
    """
    udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True).\
                              add("letters", StringType(), nullable=True))

    df.selectExpr("udf1(num)").show(truncate=False)
    df.select(udf1(df['num'])).show(truncate=False)

Spark 7:Spark SQL 函數(shù)定義,spark,大數(shù)據(jù),分布式??

Spark 7:Spark SQL 函數(shù)定義,spark,大數(shù)據(jù),分布式

?SparkSQL 使用窗口函數(shù)

Spark 7:Spark SQL 函數(shù)定義,spark,大數(shù)據(jù),分布式

Spark 7:Spark SQL 函數(shù)定義,spark,大數(shù)據(jù),分布式

# coding:utf8
# 演示sparksql 窗口函數(shù)(開(kāi)窗函數(shù))
import string
from pyspark.sql import SparkSession
# 導(dǎo)入StructType對(duì)象
from pyspark.sql.types import ArrayType, StringType, StructType, IntegerType
import pandas as pd
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder. \
        appName("create df"). \
        master("local[*]"). \
        config("spark.sql.shuffle.partitions", "2"). \
        getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([
    ('張三', 'class_1', 99),
    ('王五', 'class_2', 35),
    ('王三', 'class_3', 57),
    ('王久', 'class_4', 12),
    ('王麗', 'class_5', 99),
    ('王娟', 'class_1', 90),
    ('王軍', 'class_2', 91),
    ('王俊', 'class_3', 33),
    ('王君', 'class_4', 55),
    ('王珺', 'class_5', 66),
    ('鄭穎', 'class_1', 11),
    ('鄭輝', 'class_2', 33),
    ('張麗', 'class_3', 36),
    ('張張', 'class_4', 79),
    ('黃凱', 'class_5', 90),
    ('黃開(kāi)', 'class_1', 90),
    ('黃愷', 'class_2', 90),
    ('王凱', 'class_3', 11),
    ('王凱杰', 'class_1', 11),
    ('王開(kāi)杰', 'class_2', 3),
    ('王景亮', 'class_3', 99)
])
schema = StructType().add("name", StringType()). \
    add("class", StringType()). \
    add("score", IntegerType())
df = rdd.toDF(schema)
# 窗口函數(shù)只用于SQL風(fēng)格, 所以注冊(cè)表先
df.createTempView("stu")
# TODO 聚合窗口
spark.sql("""
SELECT *, AVG(score) OVER() AS avg_score FROM stu
""").show()
# SELECT *, AVG(score) OVER() AS avg_score FROM stu 等同于
# SELECT * FROM stu
# SELECT AVG(score) FROM stu
# 兩個(gè)SQL的結(jié)果集進(jìn)行整合而來(lái)
spark.sql("""
SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu
""").show()
# SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu 等同于
# SELECT * FROM stu
# SELECT AVG(score) FROM stu GROUP BY class
# 兩個(gè)SQL的結(jié)果集進(jìn)行整合而來(lái)
# TODO 排序窗口
spark.sql("""
SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank, 
DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank, 
RANK() OVER(ORDER BY score) AS rank
FROM stu
""").show()
# TODO NTILE
spark.sql("""
SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
""").show()

SparkSQL支持UDF和UDAF定義,但在Python中,暫時(shí)只能定義UDF
UDF定義支持2種方式, 1:使用SparkSession對(duì)象構(gòu)建. 2: 使用functions包中提供的UDF API構(gòu)建. 要注意, 方式1可用DSL和SQL風(fēng)格, 方式2 僅可用于DSL風(fēng)格
SparkSQL支持窗口函數(shù)使用, 常用SQL中的窗口函數(shù)均支持, 如聚合窗口\排序窗口\NTILE分組窗口等

?文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-674732.html

到了這里,關(guān)于Spark 7:Spark SQL 函數(shù)定義的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Spark大數(shù)據(jù)分析與實(shí)戰(zhàn)筆記(第三章 Spark RDD 彈性分布式數(shù)據(jù)集-02)

    Spark大數(shù)據(jù)分析與實(shí)戰(zhàn)筆記(第三章 Spark RDD 彈性分布式數(shù)據(jù)集-02)

    人生很長(zhǎng),不必慌張。你未長(zhǎng)大,我要擔(dān)當(dāng)。 傳統(tǒng)的MapReduce雖然具有自動(dòng)容錯(cuò)、平衡負(fù)載和可拓展性的優(yōu)點(diǎn),但是其最大缺點(diǎn)是采用非循環(huán)式的數(shù)據(jù)流模型,使得在迭代計(jì)算式要進(jìn)行大量的磁盤(pán)IO操作。Spark中的RDD可以很好的解決這一缺點(diǎn)。 RDD是Spark提供的最重要的抽象概念

    2024年02月22日
    瀏覽(372)
  • 大數(shù)據(jù)課程K2——Spark的RDD彈性分布式數(shù)據(jù)集

    大數(shù)據(jù)課程K2——Spark的RDD彈性分布式數(shù)據(jù)集

    文章作者郵箱:yugongshiye@sina.cn? ? ? ? ? ? ? 地址:廣東惠州 ? 了解Spark的RDD結(jié)構(gòu); ??掌握Spark的RDD操作方法; ??掌握Spark的RDD常用變換方法、常用執(zhí)行方法; 初學(xué)Spark時(shí),把RDD看做是一個(gè)集合類(lèi)型(類(lèi)似于Array或List),用于存儲(chǔ)數(shù)據(jù)和操作數(shù)據(jù),但RDD和普通集合的區(qū)別

    2024年02月12日
    瀏覽(97)
  • 大數(shù)據(jù)開(kāi)源框架環(huán)境搭建(七)——Spark完全分布式集群的安裝部署

    大數(shù)據(jù)開(kāi)源框架環(huán)境搭建(七)——Spark完全分布式集群的安裝部署

    前言:七八九用于Spark的編程實(shí)驗(yàn) 大數(shù)據(jù)開(kāi)源框架之基于Spark的氣象數(shù)據(jù)處理與分析_木子一個(gè)Lee的博客-CSDN博客_spark輿情分析 目錄 實(shí)驗(yàn)環(huán)境: 實(shí)驗(yàn)步驟: 一、解壓 二、配置環(huán)境變量:? 三、修改配置文件? 1.修改spark-env.sh配置文件: 2.修改配置文件slaves: 3.分發(fā)配置文件:

    2024年02月11日
    瀏覽(94)
  • 云計(jì)算與大數(shù)據(jù)第16章 分布式內(nèi)存計(jì)算平臺(tái)Spark習(xí)題

    1、Spark是Hadoop生態(tài)(? B? )組件的替代方案。 A. Hadoop? ???B. MapReduce ???????C. Yarn ????????????D.HDFS 2、以下(? D? )不是Spark的主要組件。 A. Driver?? ???B. SparkContext ??????C. ClusterManager D. ResourceManager 3、Spark中的Executor是(? A? )。 A.執(zhí)行器????? B.主節(jié)

    2024年02月14日
    瀏覽(449)
  • 分布式計(jì)算中的大數(shù)據(jù)處理:Hadoop與Spark的性能優(yōu)化

    大數(shù)據(jù)處理是現(xiàn)代計(jì)算機(jī)科學(xué)的一個(gè)重要領(lǐng)域,它涉及到處理海量數(shù)據(jù)的技術(shù)和方法。隨著互聯(lián)網(wǎng)的發(fā)展,數(shù)據(jù)的規(guī)模不斷增長(zhǎng),傳統(tǒng)的計(jì)算方法已經(jīng)無(wú)法滿(mǎn)足需求。因此,分布式計(jì)算技術(shù)逐漸成為了主流。 Hadoop和Spark是目前最為流行的分布式計(jì)算框架之一,它們都提供了高

    2024年01月23日
    瀏覽(93)
  • 分布式計(jì)算框架:Spark、Dask、Ray
分布式計(jì)算哪家強(qiáng):Spark、Dask、Ray

    分布式計(jì)算框架:Spark、Dask、Ray 分布式計(jì)算哪家強(qiáng):Spark、Dask、Ray

    目錄 什么是分布式計(jì)算 分布式計(jì)算哪家強(qiáng):Spark、Dask、Ray 2 選擇正確的框架 2.1 Spark 2.2 Dask 2.3 Ray 分布式計(jì)算是一種計(jì)算方法,和集中式計(jì)算是相對(duì)的。 隨著計(jì)算技術(shù)的發(fā)展, 有些應(yīng)用需要非常巨大的計(jì)算能力才能完成,如果采用集中式計(jì)算,需要耗費(fèi)相當(dāng)長(zhǎng)的時(shí)間來(lái)完成

    2024年02月11日
    瀏覽(102)
  • 數(shù)據(jù)存儲(chǔ)和分布式計(jì)算的實(shí)際應(yīng)用:如何使用Spark和Flink進(jìn)行數(shù)據(jù)處理和分析

    作為一名人工智能專(zhuān)家,程序員和軟件架構(gòu)師,我經(jīng)常涉及到數(shù)據(jù)處理和分析。在當(dāng)前大數(shù)據(jù)和云計(jì)算的時(shí)代,分布式計(jì)算已經(jīng)成為了一個(gè)重要的技術(shù)方向。Spark和Flink是當(dāng)前比較流行的分布式計(jì)算框架,它們提供了強(qiáng)大的分布式計(jì)算和數(shù)據(jù)分析功能,為數(shù)據(jù)處理和分析提供了

    2024年02月16日
    瀏覽(92)
  • Spark單機(jī)偽分布式環(huán)境搭建、完全分布式環(huán)境搭建、Spark-on-yarn模式搭建

    Spark單機(jī)偽分布式環(huán)境搭建、完全分布式環(huán)境搭建、Spark-on-yarn模式搭建

    搭建Spark需要先配置好scala環(huán)境。三種Spark環(huán)境搭建互不關(guān)聯(lián),都是從零開(kāi)始搭建。 如果將文章中的配置文件修改內(nèi)容復(fù)制粘貼的話(huà),所有配置文件添加的內(nèi)容后面的注釋記得刪除,可能會(huì)報(bào)錯(cuò)。保險(xiǎn)一點(diǎn)刪除最好。 上傳安裝包解壓并重命名 rz上傳 如果沒(méi)有安裝rz可以使用命

    2024年02月06日
    瀏覽(106)
  • 【Spark分布式內(nèi)存計(jì)算框架——Spark 基礎(chǔ)環(huán)境】1. Spark框架概述

    【Spark分布式內(nèi)存計(jì)算框架——Spark 基礎(chǔ)環(huán)境】1. Spark框架概述

    第一章 說(shuō)明 整個(gè)Spark 框架分為如下7個(gè)部分,總的來(lái)說(shuō)分為Spark 基礎(chǔ)環(huán)境、Spark 離線分析和Spark實(shí)時(shí)分析三個(gè)大的方面,如下圖所示: 第一方面、Spark 基礎(chǔ)環(huán)境 主要講述Spark框架安裝部署及開(kāi)發(fā)運(yùn)行,如何在本地模式和集群模式運(yùn)行,使用spark-shell及IDEA開(kāi)發(fā)應(yīng)用程序,測(cè)試及

    2024年02月11日
    瀏覽(92)
  • spark分布式解壓工具

    ? spark解壓縮工具,目前支持tar、gz、zip、bz2、7z壓縮格式,默認(rèn)解壓到當(dāng)前路下,也支持自定義的解壓輸出路徑。另外支持多種提交模式,進(jìn)行解壓任務(wù),可通過(guò)自定義配置文件,作為spark任務(wù)的資源設(shè)定 2.1 使用hadoop的FileSystem類(lèi),對(duì)tos文件的進(jìn)行讀取、查找、寫(xiě)入等操作

    2024年02月02日
    瀏覽(96)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包