国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Spark SQL函數(shù)定義

這篇具有很好參考價(jià)值的文章主要介紹了Spark SQL函數(shù)定義。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

目錄

窗口函數(shù)

SQL函數(shù)分類

Spark原生自定義UDF函數(shù)

Pandas的UDF函數(shù)

Apache Arrow框架基本介紹

基于Arrow完成Pandas DataFrame和Spark DataFrame互轉(zhuǎn)

基于Pandas完成UDF函數(shù)

?自定義UDF函數(shù)

自定義UDAF函數(shù)


窗口函數(shù)

分析函數(shù) over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])

分析函數(shù)可以大致分成如下3類:
1- 第一類: 聚合函數(shù) sum() count() avg() max() min()
2- 第二類: row_number() rank() dense_rank() ntile()
3- 第三類: first_value() last_value() lead() lag()

在Spark SQL中使用窗口函數(shù)案例:

需求是找出每個(gè)cookie中pv排在前3位的數(shù)據(jù),也就是分組取TOPN問題

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window as win

# 綁定指定的Python解釋器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 創(chuàng)建SparkSession對象
    spark = SparkSession.builder\
        .config('spark.sql.shuffle.partitions',1)\
        .appName('sparksql_win_function')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 數(shù)據(jù)輸入
    init_df = spark.read.csv(
        path='file:///export/data/gz16_pyspark/02_spark_sql/data/cookie.txt',
        schema='cookie string,datestr string,pv int',
        sep=',',
        encoding='UTF-8'
    )

    init_df.createTempView('win_data')
    init_df.show()
    init_df.printSchema()

    # 3- 數(shù)據(jù)處理
    # SQL
    spark.sql("""
        select 
            cookie,datestr,pv
        from (
            select
                cookie,datestr,pv,
                row_number() over (partition by cookie order by pv desc) as rn
            from win_data
        ) tmp where rn<=3
    """).show()

    # DSL
    """
        select:注意點(diǎn),結(jié)果中需要看到哪幾個(gè)字段,就要明確寫出來
    """
    init_df.select(
        "cookie","datestr","pv",
        F.row_number().over(win.partitionBy('cookie').orderBy(F.desc('pv'))).alias('rn')
    ).where('rn<=3').select("cookie","datestr","pv").show()


    # 4- 數(shù)據(jù)輸出
    # 5- 釋放資源
    spark.stop()

SQL函數(shù)分類

SQL函數(shù),主要分為以下三大類:

  • UDF函數(shù):用戶自定義函數(shù)

    • 特點(diǎn):一對一,輸入一個(gè)得到一個(gè)

    • 例如:split() substr()

  • UDAF函數(shù):用戶自定義聚合函數(shù)

    • 特點(diǎn):多對一,輸入多個(gè)得到一個(gè)

    • 例如:sum() avg() count() min()

  • UDTF函數(shù):用戶自定義表數(shù)據(jù)生成函數(shù)

    • 特點(diǎn):一對多,輸入一個(gè)得到多個(gè)

    • 例如:explode()

在SQL中提供的所有的內(nèi)置函數(shù),都是屬于以上三類中某一類函數(shù)

思考:有這么多的內(nèi)置函數(shù),為啥還需要自定義函數(shù)呢?

為了擴(kuò)充函數(shù)功能。在實(shí)際使用中,并不能保證所有的操作函數(shù)都已經(jīng)提前的內(nèi)置好了。很多基于業(yè)務(wù)處理的功能,其實(shí)并沒有提供對應(yīng)的函數(shù),提供的函數(shù)更多是以公共功能函數(shù)。此時(shí)需要進(jìn)行自定義,來擴(kuò)充新的功能函數(shù)

Spark SQL函數(shù)定義,spark,sql,大數(shù)據(jù)

1- SparkSQL原生的時(shí)候,Python只能開發(fā)UDF函數(shù)
2- SparkSQL借助其他第三方組件,Python可以開發(fā)UDF、UDAF函數(shù)

在Spark SQL中,針對Python語言,對于自定義函數(shù),原生支持的并不是特別好。目前原生僅支持自定義UDF函數(shù),而無法自定義UDAF函數(shù)和UDTF函數(shù)。

在1.6版本后,Java 和scala語言支持自定義UDAF函數(shù),但Python并不支持。

Spark SQL原生存在的問題:大量的序列化和反序列

Spark SQL函數(shù)定義,spark,sql,大數(shù)據(jù)

?雖然Python支持自定義UDF函數(shù),但是其效率并不是特別的高效。因?yàn)樵谑褂玫臅r(shí)候,傳遞一行處理一行,返回一行的方式。這樣會帶來非常大的序列化的開銷的問題,導(dǎo)致原生UDF函數(shù)效率不好
?? ?
早期解決方案: 基于Java/Scala來編寫自定義UDF函數(shù),然后基于python調(diào)用即可
?? ?
目前主要的解決方案: 引入Arrow框架,可以基于內(nèi)存來完成數(shù)據(jù)傳輸工作,可以大大的降低了序列化的開銷,提供傳輸?shù)男剩鉀Q原生的問題。同時(shí)還可以基于pandas的自定義函數(shù),利用pandas的函數(shù)優(yōu)勢完成各種處理操作

Spark原生自定義UDF函數(shù)

?自定義函數(shù)流程:

第一步: 在PySpark中創(chuàng)建一個(gè)Python的函數(shù),在這個(gè)函數(shù)中書寫自定義的功能邏輯代碼即可

第二步: 將Python函數(shù)注冊到Spark SQL中
?? ?注冊方式一: udf對象 = sparkSession.udf.register(參數(shù)1,參數(shù)2,參數(shù)3)
?? ??? ?參數(shù)1: 【UDF函數(shù)名稱】,此名稱用于后續(xù)在SQL中使用,可以任意取值,但是要符合名稱的規(guī)范
?? ??? ?參數(shù)2: 【自定義的Python函數(shù)】,表示將哪個(gè)Python的函數(shù)注冊為Spark SQL的函數(shù)
?? ??? ?參數(shù)3: 【UDF函數(shù)的返回值類型】。用于表示當(dāng)前這個(gè)Python的函數(shù)返回的類型
?? ??? ?udf對象: 返回值對象,是一個(gè)UDF對象,可以在DSL中使用
?? ?
?? ??? ?說明: 如果通過方式一來注冊函數(shù), 【可以用在SQL和DSL】
?? ?
?? ?注冊方式二:? udf對象 = F.udf(參數(shù)1,參數(shù)2)
?? ??? ?參數(shù)1: Python函數(shù)的名稱,表示將那個(gè)Python的函數(shù)注冊為Spark SQL的函數(shù)
?? ??? ?參數(shù)2: 返回值的類型。用于表示當(dāng)前這個(gè)Python的函數(shù)返回的類型
?? ??? ?udf對象: 返回值對象,是一個(gè)UDF對象,可以在DSL中使用
?? ??? ?
?? ??? ?說明: 如果通過方式二來注冊函數(shù),【僅能用在DSL中】
?? ??? ?
?? ?注冊方式三:? 語法糖寫法? @F.udf(returnType=返回值類型)? 放置到對應(yīng)Python的函數(shù)上面
?? ??? ?說明: 實(shí)際是方式二的擴(kuò)展。如果通過方式三來注冊函數(shù),【僅能用在DSL中】
?? ?
?? ??? ?
第三步: 在Spark SQL的 DSL/ SQL 中進(jìn)行使用即可

# 自定義一個(gè)函數(shù),完成對數(shù)據(jù)統(tǒng)一添加一個(gè)后綴名的操作
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 綁定指定的Python解釋器
from pyspark.sql.types import StringType

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("請自定義一個(gè)函數(shù),完成對數(shù)據(jù)統(tǒng)一添加一個(gè)后綴名的操作_itheima")

    # 1- 創(chuàng)建SparkSession對象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('sparksql_udf_basetype')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 數(shù)據(jù)輸入
    init_df = spark.createDataFrame(
        data=[(1,'張三','廣州'),(2,'李四','深圳')],
        schema='id int,name string,address string'
    )
    init_df.printSchema()
    init_df.show()
    init_df.createTempView('tmp')

    # 3- 數(shù)據(jù)處理
    # 3.1- 創(chuàng)建自定義的Python函數(shù)
    def add_suffix(address):
        return address + "_itheima"

    # 3.2- 將Python函數(shù)注冊到Spark SQL
    # 注冊方式一
    dsl_add_suffix = spark.udf.register('sql_add_suffix',add_suffix,StringType())

    # 3.3- 在SQL/DSL中調(diào)用
    # SQL
    spark.sql("""
        select
            id,name,address,
            sql_add_suffix(address) as new_address
        from tmp
    """).show()

    # DSL
    init_df.select(
        "id",
        "name",
        "address",
        dsl_add_suffix("address").alias("new_address")
    ).show()

    print("-"*30)
    # 在錯(cuò)誤的地方調(diào)用了錯(cuò)誤的函數(shù)。spark.udf.register參數(shù)1取的函數(shù)名只能在SQL中使用,不能在DSL中用。
    # spark.sql("""
    #     select
    #         id,name,address,
    #         dsl_add_suffix(address) as new_address
    #     from tmp
    # """).show()

    # 注冊方式二:UDF返回值類型傳值方式一
    dsl2_add_suffix = F.udf(add_suffix,StringType())

    # DSL
    init_df.select(
        "id",
        "name",
        "address",
        dsl2_add_suffix("address").alias("new_address")
    ).show()

    # 注冊方式二:UDF返回值類型傳值方式二
    dsl3_add_suffix = F.udf(add_suffix, 'string')

    # DSL
    init_df.select(
        "id",
        "name",
        "address",
        dsl3_add_suffix("address").alias("new_address")
    ).show()

    # 注冊方式三:語法糖/裝飾器
    @F.udf(returnType=StringType())
    def add_suffix_candy(address):
        return address + "_itheima"

    # DSL
    init_df.select(
        "id",
        "name",
        "address",
        add_suffix_candy("address").alias("new_address")
    ).show()

    # 4- 數(shù)據(jù)輸出
    # 5- 釋放資源
    spark.stop()

Pandas的UDF函數(shù)

Apache Arrow框架基本介紹

Apache Arrow是Apache旗下的一款頂級的項(xiàng)目。是一個(gè)跨平臺的在內(nèi)存中以列式存儲的數(shù)據(jù)層,它的設(shè)計(jì)目標(biāo)就是作為一個(gè)跨平臺的數(shù)據(jù)層,來加快大數(shù)據(jù)分析項(xiàng)目的運(yùn)行效率

Pandas 與 Spark SQL 進(jìn)行交互的時(shí)候,建立在Apache Arrow上,帶來低開銷 高性能的UDF函數(shù)

Arrow并不會自動使用,在某些情況下,需要配置 以及在代碼中需要進(jìn)行小的更改才可以使用

如何安裝? 三個(gè)節(jié)點(diǎn)建議都安裝

檢查服務(wù)器上是否有安裝pyspark
pip list | grep pyspark? 或者 conda list | grep pyspark

如果服務(wù)器已經(jīng)安裝了pyspark的庫,那么僅需要執(zhí)行以下內(nèi)容,即可安裝。例如在 node1安裝
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark[sql]
?? ?
如果服務(wù)器中python環(huán)境中沒有安裝pyspark,建議執(zhí)行以下操作,即可安裝。例如在 node2 和 node3安裝
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyarrow==10.0.0

?

如何使用呢? 默認(rèn)不會自動啟動的, 一般建議手動配置

sparkSession.conf.set('spark.sql.execution.arrow.pyspark.enabled',True)

基于Arrow完成Pandas DataFrame和Spark DataFrame互轉(zhuǎn)

使用場景:

1- Spark的DataFrame -> Pandas的DataFrame:當(dāng)大數(shù)據(jù)處理到后期的時(shí)候,可能數(shù)據(jù)量會越來越少,這樣可以考慮使用單機(jī)版的Pandas來做后續(xù)數(shù)據(jù)的分析

2- Pandas的DataFrame -> Spark的DataFrame:當(dāng)數(shù)據(jù)量達(dá)到單機(jī)無法高效處理的時(shí)候,或者需要和其他大數(shù)據(jù)框架集成的時(shí)候,可以轉(zhuǎn)成Spark中的DataFrame

總結(jié):
Pandas的DataFrame -> Spark的DataFrame: spark.createDataFrame(data=pandas_df)
Spark的DataFrame -> Pandas的DataFrame: init_df.toPandas()

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession

# 綁定指定的Python解釋器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("基于Arrow完成Pandas DataFrame和Spark DataFrame互轉(zhuǎn)")

    # 1- 創(chuàng)建SparkSession對象
    spark = SparkSession.builder\
        .appName('dataframe')\
        .master('local[*]')\
        .getOrCreate()

    # 手動開啟Arrow框架
    spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

    # 2- 數(shù)據(jù)輸入
    init_df = spark.createDataFrame(
        data=[(1, '張三', '廣州'), (2, '李四', '深圳')],
        schema='id int,name string,address string'
    )

    # 3- 數(shù)據(jù)處理
    # sparksql dataframe -> pandas dataframe
    pd_df = init_df.toPandas()
    print(type(pd_df),pd_df)

    new_pd_df = pd_df[pd_df['id']==2]

    # pandas dataframe -> sparksql dataframe
    spark_df = spark.createDataFrame(data=new_pd_df)
    spark_df.show()
    spark_df.printSchema()

    # 4- 數(shù)據(jù)輸出
    # 5- 釋放資源
    spark.stop()

基于Pandas完成UDF函數(shù)

基于Pandas的UDF函數(shù)來轉(zhuǎn)換為Spark SQL的UDF函數(shù)進(jìn)行使用。底層是基于Arrow框架來完成數(shù)據(jù)傳輸,允許向量化(可以充分利用計(jì)算機(jī)CPU性能)操作。

Pandas的UDF函數(shù)其實(shí)本質(zhì)上就是Python的函數(shù),只不過函數(shù)的傳入數(shù)據(jù)類型為Pandas的類型

基于Pandas的UDF可以使用自定義UDF函數(shù)和自定義UDAF函數(shù)

自定義函數(shù)流程:

第一步: 在PySpark中創(chuàng)建一個(gè)Python的函數(shù),在這個(gè)函數(shù)中書寫自定義的功能邏輯代碼即可

第二步: 將Python函數(shù)包裝成Spark SQL的函數(shù)
?? ?注冊方式一: udf對象 = spark.udf.register(參數(shù)1, 參數(shù)2)
?? ??? ?參數(shù)1: UDF函數(shù)名稱。此名稱用于后續(xù)在SQL中使用,可以任意取值,但是要符合名稱的規(guī)范
?? ??? ?參數(shù)2: Python函數(shù)的名稱。表示將哪個(gè)Python的函數(shù)注冊為Spark SQL的函數(shù)
?? ??? ?使用: udf對象只能在DSL中使用。參數(shù)1指定的名稱只能在SQL中使用
?? ??? ?注意: 如果編寫的是UDAF函數(shù),那么注冊方式一需要配合注冊方式三,一起使用
?? ??? ?
?? ?注冊方式二: udf對象 = F.pandas_udf(參數(shù)1, 參數(shù)2)
?? ??? ?參數(shù)1: 自定義的Python函數(shù)。表示將哪個(gè)Python的函數(shù)注冊為Spark SQL的函數(shù)
?? ??? ?參數(shù)2: UDF函數(shù)的返回值類型。用于表示當(dāng)前這個(gè)Python的函數(shù)返回的類型對應(yīng)到Spark SQL的數(shù)據(jù)類型
?? ??? ?udf對象: 返回值對象,是一個(gè)UDF對象。僅能用在DSL中使用
?? ?
?? ?注冊方式三: 語法糖寫法? @F.pandas_udf(returnType=返回值Spark SQL的數(shù)據(jù)類型)? 放置到對應(yīng)Python的函數(shù)上面
?? ??? ?說明: 實(shí)際是方式一的擴(kuò)展。僅能用在DSL中使用
?? ?
?? ?
第三步: 在Spark SQL的 DSL/ SQL 中進(jìn)行使用即可

?自定義UDF函數(shù)

自定義Python函數(shù)的要求:SeriesToSeries

  • 表示:第一步中創(chuàng)建自定義Python函數(shù)的時(shí)候,輸入?yún)?shù)的類型和返回值類型必須都是Pandas中的Series類型

  • 需求:完成a列和b列的求和計(jì)算操作

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as F

# 綁定指定的Python解釋器
from pyspark.sql.types import IntegerType

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 創(chuàng)建SparkSession對象
    spark = SparkSession.builder\
        .appName('pandas_udf')\
        .master('local[*]')\
        .getOrCreate()

    # 手動開啟Arrow框架
    spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

    # 2- 數(shù)據(jù)輸入
    init_df = spark.createDataFrame(
        data=[(1,2),(2,3),(3,4)],
        schema='num1 int,num2 int'
    )

    init_df.createTempView('tmp')

    # 3- 數(shù)據(jù)處理
    # 3.1- 自定義Python函數(shù)
    """
        1- num1:pd.Series用來限定輸入的參數(shù)類型是Pandas中的Series對象
        2-  -> pd.Series用來限定返回值類型是Pandas中的Series對象
    """
    def my_sum(num1:pd.Series, num2:pd.Series) -> pd.Series:
        return num1+num2

    # 3.2- 注冊進(jìn)SparkSQL。注冊方式一
    dsl_my_sum = spark.udf.register('sql_my_sum',my_sum)

    # 3.3- 使用
    # SQL
    spark.sql("""
        select
            num1,num2,
            sql_my_sum(num1,num2) as result
        from tmp
    """).show()

    # DSL
    init_df.select(
        "num1",
        "num2",
        dsl_my_sum("num1", "num2").alias("result")
    ).show()


    # 注冊方式二
    dsl2_my_sum = F.pandas_udf(my_sum,IntegerType())

    # DSL
    init_df.select(
        "num1",
        "num2",
        dsl2_my_sum("num1", "num2").alias("result")
    ).show()

    # 注冊方式三
    @F.pandas_udf(IntegerType())
    def my_sum_candy(num1:pd.Series, num2:pd.Series) -> pd.Series:
        return num1+num2

    # DSL
    init_df.select(
        "num1",
        "num2",
        my_sum_candy("num1", "num2").alias("result")
    ).show()

    # 4- 數(shù)據(jù)輸出
    # 5- 釋放資源
    spark.stop()

自定義UDAF函數(shù)

自定義Python函數(shù)的要求:Series To 標(biāo)量

  • 表示:自定義函數(shù)的輸入數(shù)據(jù)類型是Pandas中的Series對象,返回值數(shù)據(jù)類型是標(biāo)量數(shù)據(jù)類型。也就是Python中的數(shù)據(jù)類型,例如:int、float、bool、list....

  • 需求:對某一列數(shù)據(jù)計(jì)算平均值的操作文章來源地址http://www.zghlxwxcb.cn/news/detail-813273.html

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as F

# 綁定指定的Python解釋器
from pyspark.sql.types import IntegerType, FloatType

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 創(chuàng)建SparkSession對象
    spark = SparkSession.builder\
        .appName('pandas_udaf')\
        .master('local[*]')\
        .getOrCreate()

    # 手動開啟Arrow框架
    spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

    # 2- 數(shù)據(jù)輸入
    init_df = spark.createDataFrame(
        data=[(1,2),(2,3),(3,3)],
        schema='num1 int,num2 int'
    )

    init_df.createTempView('tmp')

    # 3- 數(shù)據(jù)處理
    # 3.1- 自定義Python函數(shù)
    """
        UDAF對自定義Python函數(shù)的要求:輸入數(shù)據(jù)的類型必須是Pandas中的Series對象,返回值類型必須是Python中的標(biāo)量數(shù)據(jù)類型
    """
    @F.pandas_udf(returnType=FloatType())
    def my_avg(num2_col:pd.Series) -> float:
        print(type(num2_col))
        print(num2_col)
        # 計(jì)算平均值
        return num2_col.mean()

    # 3.2- 注冊進(jìn)SparkSQL。注冊方式一
    dsl_my_avg = spark.udf.register('sql_my_avg',my_avg)

    # 3.3- 使用
    # SQL
    spark.sql("""
        select
            sql_my_avg(num2) as result
        from tmp
    """).show()

    # DSL
    init_df.select(dsl_my_avg("num2").alias("result")).show()

    # 4- 數(shù)據(jù)輸出
    # 5- 釋放資源
    spark.stop()

到了這里,關(guān)于Spark SQL函數(shù)定義的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Spark SQL自定義collect_list分組排序

    想要在spark sql中對group by + concat_ws()的字段進(jìn)行排序,可以參考如下方法。 原始數(shù)據(jù)如下: 目標(biāo)數(shù)據(jù)如下: spark-shell: 1.使用開窗函數(shù) 因?yàn)槭褂瞄_窗函數(shù)本身會使用比較多的資源, 這種方式在大數(shù)據(jù)量下性能會比較慢,所以嘗試下面的操作。 2.使用struct和sort_array(array,asc?tru

    2024年02月01日
    瀏覽(22)
  • Spark【Spark SQL(二)RDD轉(zhuǎn)換DataFrame、Spark SQL讀寫數(shù)據(jù)庫 】

    Spark【Spark SQL(二)RDD轉(zhuǎn)換DataFrame、Spark SQL讀寫數(shù)據(jù)庫 】

    Saprk 提供了兩種方法來實(shí)現(xiàn)從 RDD 轉(zhuǎn)換得到 DataFrame: 利用反射機(jī)制推斷 RDD 模式 使用編程方式定義 RDD 模式 下面使用到的數(shù)據(jù) people.txt : ????????在利用反射機(jī)制推斷 RDD 模式的過程時(shí),需要先定義一個(gè) case 類,因?yàn)橹挥?case 類才能被 Spark 隱式地轉(zhuǎn)換為DataFrame對象。 注意

    2024年02月09日
    瀏覽(26)
  • Spark SQL示例用法所有函數(shù)示例權(quán)威詳解一【建議收藏】

    Spark中所有功能的入口點(diǎn)是 SparkSession 類。要?jiǎng)?chuàng)建一個(gè)基本的 SparkSession ,只需使用 SparkSession.builder() : 完整示例代碼可在Spark存儲庫的“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala”中找到。 在Spark 2.0中, SparkSession 提供了 對Hive功能的內(nèi)置支持 ,包括 使用Hi

    2024年02月05日
    瀏覽(20)
  • 大數(shù)據(jù)技術(shù)之Spark——Spark SQL

    大數(shù)據(jù)技術(shù)之Spark——Spark SQL

    ? ? ? ? Spark SQL是Spark用于結(jié)構(gòu)化數(shù)據(jù)處理的Spark模塊。 ????????我們之前學(xué)習(xí)過hive,hive是一個(gè)基于hadoop的SQL引擎工具,目的是為了簡化mapreduce的開發(fā)。由于mapreduce開發(fā)效率不高,且學(xué)習(xí)較為困難,為了提高mapreduce的開發(fā)效率,出現(xiàn)了hive,用SQL的方式來簡化mapreduce:hiv

    2024年02月12日
    瀏覽(20)
  • spark-sql處理json字符串的常用函數(shù)

    整理了spark-sql處理json字符串的幾個(gè)函數(shù): 1?get_json_object 解析不含數(shù)組的 json ? 2 from_json? 解析json 3 schema_of_json?提供生成json格式的方法 4 explode? ?把JSONArray轉(zhuǎn)為多行 get_json_object(string json_string, string path) :適合最外層為{}的json解析。 ?第一個(gè)參數(shù)是json對象變量,也就是含j

    2023年04月08日
    瀏覽(17)
  • Spark SQL數(shù)據(jù)源:JDBC

    Spark SQL數(shù)據(jù)源:JDBC

    Spark SQL還可以使用JDBC API從其他關(guān)系型數(shù)據(jù)庫讀取數(shù)據(jù),返回的結(jié)果仍然是一個(gè)DataFrame,可以很容易地在Spark SQL中處理,或者與其他數(shù)據(jù)源進(jìn)行連接查詢。 在使用JDBC連接數(shù)據(jù)庫時(shí)可以指定相應(yīng)的連接屬性 屬性 介紹 url 連接的JDBC URL driver JDBC驅(qū)動的類名 user 數(shù)據(jù)庫用戶名 pass

    2024年02月09日
    瀏覽(20)
  • 【Spark大數(shù)據(jù)習(xí)題】習(xí)題_Spark SQL&&&Kafka&& HBase&&Hive

    PDF資源路徑-Spark1 PDF資源路徑-Spark2 一、填空題 1、Scala語言的特性包含面向?qū)ο缶幊?、函?shù)式編程的、靜態(tài)類型的、可擴(kuò)展的、可以交互操作的。 2、在Scala數(shù)據(jù)類型層級結(jié)構(gòu)的底部有兩個(gè)數(shù)據(jù)類型,分別是 Nothing和Null。 3、在Scala中,聲明變量的有var聲明變量和val聲明常

    2024年02月06日
    瀏覽(21)
  • Spark大數(shù)據(jù)處理講課筆記4.1 Spark SQL概述、數(shù)據(jù)幀與數(shù)據(jù)集

    Spark大數(shù)據(jù)處理講課筆記4.1 Spark SQL概述、數(shù)據(jù)幀與數(shù)據(jù)集

    ? 目錄 零、本講學(xué)習(xí)目標(biāo) 一、Spark SQL (一)Spark SQL概述 (二)Spark SQL功能 (三)Spark SQL結(jié)構(gòu) 1、Spark SQL架構(gòu)圖 2、Spark SQL三大過程 3、Spark SQL內(nèi)部五大組件 (四)Spark SQL工作流程 (五)Spark SQL主要特點(diǎn) 1、將SQL查詢與Spark應(yīng)用程序無縫組合 2、Spark SQL以相同方式連接多種數(shù)據(jù)

    2024年02月09日
    瀏覽(25)
  • Spark大數(shù)據(jù)處理講課筆記4.2 Spark SQL數(shù)據(jù)源 - 基本操作

    Spark大數(shù)據(jù)處理講課筆記4.2 Spark SQL數(shù)據(jù)源 - 基本操作

    ? 目錄 零、本講學(xué)習(xí)目標(biāo) 一、基本操作 二、默認(rèn)數(shù)據(jù)源 (一)默認(rèn)數(shù)據(jù)源Parquet (二)案例演示讀取Parquet文件 1、在Spark Shell中演示 2、通過Scala程序演示 三、手動指定數(shù)據(jù)源 (一)format()與option()方法概述 (二)案例演示讀取不同數(shù)據(jù)源 1、讀取房源csv文件 2、讀取json,保

    2024年02月09日
    瀏覽(26)
  • Spark SQL數(shù)據(jù)源:Hive表

    Spark SQL數(shù)據(jù)源:Hive表

    Spark SQL還支持讀取和寫入存儲在Apache Hive中的數(shù)據(jù)。然而,由于Hive有大量依賴項(xiàng),這些依賴項(xiàng)不包括在默認(rèn)的Spark發(fā)行版中,如果在classpath上配置了這些Hive依賴項(xiàng),Spark就會自動加載它們。需要注意的是,這些Hive依賴項(xiàng)必須出現(xiàn)在所有Worker節(jié)點(diǎn)上,因?yàn)樗鼈冃枰L問Hive序列化

    2024年02月11日
    瀏覽(37)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包