目錄
Spark SQL基本介紹
Spark SQL特點
Spark SQL與Hive的異同
Spark SQL的數(shù)據(jù)結(jié)構(gòu)
Spark SQL的入門
創(chuàng)建SparkSession對象
DataFrame詳解
DataFrame基本介紹
?DataFrame的構(gòu)建方式
RDD構(gòu)建DataFrame
?內(nèi)部初始化數(shù)據(jù)得到DataFrame
schema總結(jié)
讀取外部文件得到DataFrame
Text方式讀取
CSV方式讀取
JSON方式讀取
Spark SQL基本介紹
概念:Spark SQL是Spark多種組件中其中一個,主要是用于處理大規(guī)模的結(jié)構(gòu)化數(shù)據(jù)
結(jié)構(gòu)化數(shù)據(jù):可以轉(zhuǎn)化為二維表格的數(shù)據(jù),一份數(shù)據(jù),每一行,每一列的了下都是一致的,我們將這樣的數(shù)據(jù)稱為結(jié)構(gòu)化數(shù)據(jù)
例如:mysql的表數(shù)據(jù)
??????? 1 張三 10
??????? 2 李四 18
??????? 3 王五 20
Spark SQL特點
1.融合性:既可以使用標(biāo)準(zhǔn)SQL語言,也可以編寫代碼,同時支持混合使用
2.統(tǒng)一的數(shù)據(jù)訪問:可以通過統(tǒng)一的API來對接不同的數(shù)據(jù)源
3.Hive的兼容性:Spark SQL可以和Hive進(jìn)行整合,整合后替換執(zhí)行引擎為Spark,核心是基于Hive的metastore來管理元數(shù)據(jù)
4.標(biāo)準(zhǔn)化連接:Spark SQL也支持JDBC/ODBC的連接方式
Spark SQL與Hive的異同
相同點:
??????? 1.都是分布式SQL計算引擎
??????? 2.都可以處理大規(guī)模的結(jié)構(gòu)化數(shù)據(jù)
??????? 3.都可以建立YARN集群上運行
不同點:
??????? 1.Spark SQL是基于內(nèi)存計算,而Hive SQL是基于磁盤進(jìn)行計算的
??????? 2.Spark SQL沒有元數(shù)據(jù)管理服務(wù)(自己維護),而Hive SQL是有metastore元數(shù)據(jù)管理服務(wù)的
??????? 3.Spark SQL底層執(zhí)行的是Spark RDD程序,而Hive SQL底層執(zhí)行的是mapreduce程序
??????? 4.Spark SQL可以編寫SQL也可以編寫代碼,而Hive SQL僅能編寫SQL語句
Spark SQL的數(shù)據(jù)結(jié)構(gòu)
核心 | 數(shù)據(jù)結(jié)構(gòu) | 區(qū)別 |
Pandas | DataFrame | ● 二維表數(shù)據(jù)結(jié)構(gòu) ● 處理單機(本地集合)結(jié)構(gòu)數(shù)據(jù) |
SparkCore | RDD | ● 無標(biāo)準(zhǔn)數(shù)據(jù)結(jié)構(gòu)(任何的數(shù)據(jù)結(jié)構(gòu)) ● 大規(guī)模的分布式結(jié)構(gòu)數(shù)據(jù)(分區(qū)) |
SparkSQL | DataFrame | ● 二維表格結(jié)構(gòu) ● 大規(guī)模的分布式結(jié)構(gòu)數(shù)據(jù)(分區(qū)) |
以圖為例:
RDD:存儲直接就是對象,存儲就是一個Person的對象,無法看到對象的數(shù)據(jù)內(nèi)容
DataFrame:將Person中的各個字段數(shù)據(jù),進(jìn)行結(jié)構(gòu)化存儲,形成一個DataFrame,可以直接看到數(shù)據(jù)
Dataset:將Person對象中的數(shù)據(jù)按照結(jié)構(gòu)化的方式存儲,同時保留對象的類型,從而知道來源于開一個Person對象
Spark SQL的入門
創(chuàng)建SparkSession對象
????????Spark SQL需要將頂級對象SparkContext變成SparkSesssion對象。SparkContext是RDD中的頂級對象,里面沒有和SQL編程相關(guān)的API/方法。通過SparkSession對象還是可以得到SparkContext對象。
# 如何構(gòu)建一個SparkSession對象呢?
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
# 綁定指定的Python解釋器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 創(chuàng)建SparkSQL中的頂級對象SparkSession
"""
注意事項:
1- SparkSession和builder都沒有小括號
2- appName():給應(yīng)用程序取名詞。等同于SparkCore中的setAppName()
3- master():設(shè)置運行時集群類型。等同于SparkCore中的setMaster()
"""
spark = SparkSession.builder\
.appName('create_sparksession_demo')\
.master('local[*]')\
.getOrCreate()
# 通過SparkSQL的頂級對象獲取SparkCore中的頂級對象
sc = spark.sparkContext
# 釋放資源
sc.stop()
spark.stop()
DataFrame詳解
DataFrame基本介紹
DataFrame:表示的是一個二維得表,存在行,列等表結(jié)構(gòu)描述信息
表結(jié)構(gòu)描述信息(元數(shù)據(jù)schema):strucType對象
字段:structField對象,可以描述字段名稱,字段數(shù)據(jù)類型,是否可以為空
行:Row對象
列:column對象,包括字段名稱和字段值
在一個StructType對象下,由多個StructField組成,構(gòu)建成一個完整的元數(shù)據(jù)信息
?DataFrame的構(gòu)建方式
RDD構(gòu)建DataFrame
場景:RDD可以存儲任意結(jié)構(gòu)的數(shù)據(jù);而DataFrame只能處理二維表數(shù)據(jù)。在使用Spark處理數(shù)據(jù)的初期,可能輸入進(jìn)來的數(shù)據(jù)是半結(jié)構(gòu)化或者是非結(jié)構(gòu)化的數(shù)據(jù),那么我可以先通過RDD對數(shù)據(jù)進(jìn)行ETL處理成結(jié)構(gòu)化數(shù)據(jù),再使用開發(fā)效率高的SparkSQL來對后續(xù)數(shù)據(jù)進(jìn)行處理分析。
# 導(dǎo)包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
# 綁定指定的python解釋器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 創(chuàng)建main函數(shù)
if __name__ == '__main__':
print('通過Rdd創(chuàng)建DataFrame')
# 創(chuàng)建SparkSession對象
spark = SparkSession \
.builder.appName('rdd_to_DataFrame_demo') \
.master('local[*]') \
.getOrCreate()
# 通過SparkSession對象創(chuàng)建SparkContext頂級對象
sc = spark.sparkContext
# 數(shù)據(jù)輸入
# 構(gòu)建rdd
init_rdd = sc.parallelize(['1,張三,18', '2,李四,20', '3,王五,22'])
# 將qrdd數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)成二維結(jié)構(gòu)
new_rdd = init_rdd.map(lambda line: (
int(line.split(',')[0]),
line.split(',')[1],
int(line.split(',')[2], )))
# 將RDD轉(zhuǎn)成DataFrame:方式一
# 構(gòu)建schema方式一
schema = StructType() \
.add('id', IntegerType(), False) \
.add('name', StringType(), False) \
.add('age', IntegerType(), False)
# 構(gòu)建schema方式二
schema = StructType([
StructField('id', IntegerType(), False),
StructField('name', StringType(), False),
StructField('age', IntegerType(), False),
])
# 構(gòu)建schema方式三
schema = "id:int,name:string,age:int"
schema = "id int,name string,age int"
# 構(gòu)建schema方式四,不能指定字段類型
schema = ['id', 'name', 'age']
# 構(gòu)建DataFrame
init_df = spark.createDataFrame(data=new_rdd, schema=schema)
# 數(shù)據(jù)輸出
init_df.show()
# 輸出schema
init_df.printSchema()
print('=' * 50)
# 將RDD轉(zhuǎn)成DataFrame:方式二
"""
toDF:中的schema既可以傳List,也可以傳字符串形式的schema信息
"""
# 方式一:傳入列表
init_df2 = new_rdd.toDF(schema=['id', 'name', 'age'])
# 方式一:傳入字符串
init_df2 = new_rdd.toDF(schema="id:int,name:string,age:int")
init_df2 = new_rdd.toDF(schema="id int,name string,age int")
# 數(shù)據(jù)輸出
init_df2.show()
# 輸出schema
init_df2.printSchema()
# 釋放資源
spark.stop()
sc.stop()
?內(nèi)部初始化數(shù)據(jù)得到DataFrame
場景:一般用在開發(fā)和測試中。因為只能處理少量的數(shù)據(jù)
# 導(dǎo)包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
# 綁定指定的python解釋器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 創(chuàng)建main函數(shù)
if __name__ == '__main__':
print('內(nèi)部初始化數(shù)據(jù)得到DataFrame')
# 創(chuàng)建SparkSession對象
spark = SparkSession \
.builder \
.appName('inner_create_dataframe') \
.master('local[*]') \
.getOrCreate()
# 2- 數(shù)據(jù)輸入
"""
內(nèi)部初始化數(shù)據(jù)得到DataFrame
通過createDataFrame創(chuàng)建DataFrame,schema數(shù)據(jù)類型可以是:DataType、字符串、List
字符串:格式要求
格式一 字段1 字段類型,字段2 字段類型
格式二(推薦) 字段1:字段類型,字段2:字段類型
List:格式要求
["字段1","字段2"]
"""
# 方式一
init_df = spark.createDataFrame(
data=[(1, '張三', 18), (2, '李四', 20), (3, '王五', 22)],
schema='id int,name string,age int'
)
# 方式二
init_df = spark.createDataFrame(
data=[(1, '張三', 18), (2, '李四', 20), (3, '王五', 22)],
schema='id:int,name:string,age:int'
)
# 方式三,列表形式不能指定字段類型,有輸入的數(shù)據(jù)自動推斷字段類型
init_df = spark.createDataFrame(
data=[(1, '張三', 18), (2, '李四', 20), (3, '王五', 22)],
schema=['id', 'name', 'age']
)
# 數(shù)據(jù)輸出
init_df.show()
# 輸出schema信息
init_df.printSchema()
# 是否資源
spark.stop()
schema總結(jié)
通過createDataFrame創(chuàng)建DataFrame,schema數(shù)據(jù)類型可以是:DataType、字符串、List
1: 字符串
??? 格式一 字段1 字段類型,字段2 字段類型
??? 格式二(推薦) 字段1:字段類型,字段2:字段類型
?? ?
2: List
??? ["字段1","字段2"]
?? ?
3: DataType(推薦,用的最多)
??? 格式一 schema = StructType()\
??????????? .add('id',IntegerType(),False)\
??????????? .add('name',StringType(),True)\
??????????? .add('age',IntegerType(),False)??? 格式二 schema = StructType([
??????????? StructField('id',IntegerType(),False),
??????????? StructField('name',StringType(),True),
??????????? StructField('age',IntegerType(),False)
????????? ])
讀取外部文件得到DataFrame
復(fù)雜API
統(tǒng)一API格式:
Sparksession.read
??????? .format('text | csv | json | parquet | orc |? avro | jdbc | ......')??? # 讀取外部文件的方式
??????? .option('k','v')????????? # 選項,可以設(shè)置相關(guān)的參數(shù)(可選)
??????? .schema(structType | string)? # 設(shè)置表的結(jié)構(gòu)信息
??????? .load('加載數(shù)據(jù)路徑')??? # 讀取外部文件的路徑,支持HDFS也支持本地
?簡寫API
注意:所有的復(fù)雜API外部讀取方式,都有簡單的寫法,spark內(nèi)置了一些常用的讀取方案的簡寫
格式:
?????????? spark.read.讀取方式()
例如:
??????? df=spark.read.csv(
??????????????? path='文件路徑',
??????????????? header=True,
??????????????? sep=' ',
??????????????? inferschema=True,
??????????????? encoding='utf-8'
????????)
Text方式讀取
load:支持讀取HDFS文件系統(tǒng)和本地文件系統(tǒng)
??????????? HDFS文件系統(tǒng):hdfs://node1:8020/文件路徑
??????????? 本地文件系統(tǒng):file:///文件路徑
?????????? ?
??????? text方式讀取文件總結(jié):
??????????? 1- 不管文件中內(nèi)容是什么樣的,text會將所有內(nèi)容全部放到一個列中處理
??????????? 2- 默認(rèn)生成的列名叫value,數(shù)據(jù)類型string
??????????? 3- 我們只能夠在schema中修改字段value的名稱,其他任何內(nèi)容不能修改
# 導(dǎo)包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# 綁定指定的python解釋器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 創(chuàng)建main函數(shù)
if __name__ == '__main__':
# text方式讀取
print('text方式讀取外部文件')
# 創(chuàng)建sparksession對象
spark = SparkSession.builder.appName('text_demo').master('local[*]').getOrCreate()
# 復(fù)雜API方式
# 數(shù)據(jù)輸入
init_df = spark.read \
.format('text') \
.schema('my_file string') \
.load('file:///export/data/pyspark_projects/02_spark_sql/data/stu.txt')
# 數(shù)據(jù)輸出
init_df.show()
# 輸出schema
init_df.printSchema()
# 簡寫API方式
init_df = spark.read.text(
paths='file:///export/data/pyspark_projects/02_spark_sql/data/stu.txt'
)
init_df.show()
# 輸出schema
init_df.printSchema()
CSV方式讀取
csv格式讀取外部文件總結(jié):
??? 1- 復(fù)雜API和簡寫API都須掌握
??? 2- 相關(guān)參數(shù)作用說明:
??????? 2.1- path:指定讀取的文件路徑。支持HDFS和本地文件路徑
??????? 2.2- schema:手動指定元數(shù)據(jù)信息
??????? 2.3- sep:指定字段間的分隔符
??????? 2.4- encoding:指定文件的編碼方式
??????? 2.5- header:指定文件中的第一行是否是字段名稱
??????? 2.6- inferSchema:根據(jù)數(shù)據(jù)內(nèi)容自動推斷數(shù)據(jù)類型。但是,推斷結(jié)果可能不精確文章來源:http://www.zghlxwxcb.cn/news/detail-792126.html
# 導(dǎo)包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# 綁定指定的python解釋器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 創(chuàng)建main函數(shù)
if __name__ == '__main__':
# json方式讀取
print('csv方式讀取外部文件')
# 創(chuàng)建sparksession對象
spark = SparkSession.builder.appName('csv_demo').master('local[*]').getOrCreate()
# 復(fù)雜API方式
# 數(shù)據(jù)輸入
init_df = spark.read \
.format('csv') \
.option('sep', ' ') \
.option('encoding', 'utf8') \
.option('header', 'True') \
.schema(schema='id int,name string,address string,sex string,age int') \
.load('file:///export/data/pyspark_projects/02_spark_sql/data/stu.txt')
# 數(shù)據(jù)輸出
init_df.show()
# 輸出schema
init_df.printSchema()
print('='*50)
#簡寫API方式
init_df = spark.read.csv(
path='file:///export/data/pyspark_projects/02_spark_sql/data/stu.txt',
schema='id int,name string,address string,sex string,age int',
sep=' ',
encoding='utf8',
header=True
)
init_df.show()
# 輸出schema
init_df.printSchema()
JSON方式讀取
json讀取數(shù)據(jù)總結(jié):
???? 1- 需要手動指定schema信息。如果手動指定的時候,字段名稱與json中的key名稱不一致,會解析不成功,以null值填充
???? 2- csv/json中schema的結(jié)構(gòu),如果是字符串類型,那么字段名稱和字段數(shù)據(jù)類型間,只能以空格分隔文章來源地址http://www.zghlxwxcb.cn/news/detail-792126.html
# 導(dǎo)包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# 綁定指定的python解釋器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 創(chuàng)建main函數(shù)
if __name__ == '__main__':
# json方式讀取
print('json方式讀取外部文件')
# 創(chuàng)建sparksession對象
spark = SparkSession.builder.appName('json_demo').master('local[*]').getOrCreate()
# 復(fù)雜API方式
# 數(shù)據(jù)輸入
init_df = spark.read \
.format('json') \
.option('sep', ':') \
.option('header', 'True') \
.option('encoding', 'utf8') \
.schema(schema='id int,name string,age int,address string') \
.load('file:///export/data/pyspark_projects/02_spark_sql/data/json.txt')
# 數(shù)據(jù)輸出
init_df.show()
# 輸出schema
init_df.printSchema()
print('=' * 50)
# # 簡寫API方式
init_df = spark.read.json(
path='file:///export/data/pyspark_projects/02_spark_sql/data/json.txt',
schema='id int,name string,age int,address string',
encoding='utf8'
)
init_df.show()
# 輸出schema
init_df.printSchema()
# 釋放資源
spark.stop()
到了這里,關(guān)于PySpark-Spark SQL基本介紹的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!