Spark SQL是Apache Spark的一個模塊,它提供了一種基于結(jié)構(gòu)化數(shù)據(jù)的編程接口。
Spark SQL支持結(jié)構(gòu)化數(shù)據(jù)的處理,包括數(shù)據(jù)的讀取、轉(zhuǎn)換和查詢。它可以將傳統(tǒng)的基于表和SQL的操作和Spark的分布式計算相結(jié)合,提供強大的數(shù)據(jù)處理和分析能力。
Spark SQL也可以與其他Spark組件集成,如MLlib和GraphX,以支持更廣泛的數(shù)據(jù)處理場景。
- 讀入數(shù)據(jù)
val spark: SparkSession = SparkSession.builder().master("local").appName("agent_log_df").getOrCreate()
val fileRDD: RDD[String] = spark.sparkContext.textFile("datas/agent.log")
val rowRDD: RDD[Row] = fileRDD.map(_.split(" ")).map(
line => Row(line(0), line(1).toInt, line(2).toInt)
)
- 創(chuàng)建表結(jié)構(gòu)
// 定義表結(jié)構(gòu)
val df: DataFrame = spark.sqlContext.createDataFrame(rowRDD,
StructType(Seq(StructField("t1", StringType), StructField("t2", IntegerType), StructField("t3", IntegerType)))
)
- 創(chuàng)建臨時表
df.createTempView("tmp_table")
- sql邏輯
val sql =
"""
|select t1,t2,t3
|from (
|select t1, sum(t2) as t2, sum(t3) as t3 from tmp_table group by t1
|) t
|order by t2 desc,t3 desc
|limit 10
|""".stripMargin
- sql執(zhí)行
val result: DataFrame = spark.sql(sql)
- 結(jié)果展示
result.show()
上述中有幾個關(guān)鍵的類和方法:
- sqlContext
Spark的SQLContext是負(fù)責(zé)Spark SQL操作的上下文對象,它提供了許多與SQL相關(guān)的功能,包括讀取和處理各種數(shù)據(jù)源中的數(shù)據(jù)、執(zhí)行SQL查詢、創(chuàng)建數(shù)據(jù)框架和表等等。
通過SQLContext,用戶可以使用DataFrame API來以結(jié)構(gòu)化和類型安全的方式處理數(shù)據(jù),并可以使用SQL語言和Spark SQL的內(nèi)置函數(shù)來進(jìn)行數(shù)據(jù)分析和查詢。
總體來說,Spark的SQLContext是非常強大和靈活的,可以適應(yīng)各種數(shù)據(jù)處理和分析需求,并且在處理大規(guī)模數(shù)據(jù)時具有出色的性能和擴(kuò)展性。
在使用Spark的SqlContext之前,需要首先初始化一個SparkContext對象并創(chuàng)建一個RDD。
使用SqlContext需要進(jìn)行以下步驟:
創(chuàng)建一個SparkConf對象,并設(shè)置一些參數(shù),如AppName和Master。
使用SparkConf對象創(chuàng)建一個SparkContext對象。
通過SparkContext對象創(chuàng)建一個SqlContext對象。
使用SqlContext對象加載數(shù)據(jù),并將其轉(zhuǎn)換為DataFrame類型。
以下是具體的代碼示例:
from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf
#創(chuàng)建SparkConf
conf = SparkConf().setAppName("sql_example").setMaster("local[*]")
#創(chuàng)建SparkContext
sc = SparkContext(conf=conf)
#創(chuàng)建SqlContext
sqlContext = SQLContext(sc)
#讀取數(shù)據(jù)文件
people = sc.textFile("people.txt")
#將數(shù)據(jù)轉(zhuǎn)換為一個DataFrame對象
people_df = sqlContext.createDataFrame(people.map(lambda row: row.split(",")), ["name", "age"])
在以上示例中,我們首先創(chuàng)建了一個SparkConf對象,并設(shè)置了AppName和Master屬性。然后使用SparkConf對象創(chuàng)建了一個SparkContext對象,并將其傳遞給SqlContext構(gòu)造函數(shù)。接著讀取了一個數(shù)據(jù)文件,并使用SqlContext對象將數(shù)據(jù)轉(zhuǎn)換成DataFrame對象。
注意:使用SqlContext時需要將數(shù)據(jù)轉(zhuǎn)換成DataFrame對象,而不是RDD對象。如果需要在SqlContext中使用RDD對象,可以將其轉(zhuǎn)換為DataFrame對象,再進(jìn)行操作。
- StructType
Spark的StructType是一種定義結(jié)構(gòu)化數(shù)據(jù)的數(shù)據(jù)類型。
它類似于SQL表的結(jié)構(gòu),每個StructType都由一組結(jié)構(gòu)字段組成,每個結(jié)構(gòu)字段都有一個名稱和數(shù)據(jù)類型。
使用StructType,用戶可以對結(jié)構(gòu)化數(shù)據(jù)進(jìn)行索引、查詢和分析。
StructType被廣泛應(yīng)用于Spark中的DataFrame API和SQL查詢中。
使用方法:
導(dǎo)入 Spark SQL 的相關(guān)包
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
定義 StructType
例如,假設(shè)要定義一個包含 name 和 age 兩個字段的 StructType。則可以按照以下方式定義:
val schema = StructType(
StructField("name", StringType, true) ::
StructField("age", IntegerType, true) :: Nil
)
// 其中,StructType 用于表示整個數(shù)據(jù)結(jié)構(gòu),StructField 用于表示每個字段的信息,StringType 用于表示字段類型為字符串類,IntegerType 用于表示字段類型為整數(shù)。
// 使用 StructType
// 在創(chuàng)建 DataFrame 時,可以通過傳遞定義好的 StructType 對象來指定 DataFrame 的結(jié)構(gòu)。例如:
val data = spark.sparkContext.parallelize(Seq(("John", 25), ("Mary", 30), ("Jack", 22)))
val df = spark.createDataFrame(data).toDF("name", "age")
df.printSchema()
df.show()
輸出結(jié)果為:文章來源:http://www.zghlxwxcb.cn/news/detail-611258.html
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
+----+---+
|name|age|
+----+---+
|John| 25|
|Mary| 30|
|Jack| 22|
+----+---+
栗子
給一個日志文件,過濾出兩張表,然后設(shè)計表結(jié)構(gòu),使用Spark SQL實現(xiàn)兩張表的連接文章來源地址http://www.zghlxwxcb.cn/news/detail-611258.html
object spark_sql_code_1 {
def main(args: Array[String]): Unit = {
// TODO 1: 創(chuàng)建spark環(huán)境
val spark: SparkSession = SparkSession.builder().master("local").appName("spark sql code").getOrCreate()
// TODO 2: 讀取數(shù)據(jù)
val rowRDD: RDD[Row] = spark.sparkContext.textFile("datas/agent.log")
.map(line => {
val words: Array[String] = line.split(" ");
Row(words(1), words(2).toInt, words(3).toInt)
})
rowRDD.persist()
val tableRDD1: RDD[Row] = rowRDD.filter(row => {
row.getInt(1) % 2 == 0
})
val tableRDD2: RDD[Row] = rowRDD.filter(row => {
row.getInt(2) % 2 == 0
})
// TODO 3: 創(chuàng)建表結(jié)構(gòu)和臨時表
// 定義表結(jié)構(gòu)
val df1: DataFrame = spark.sqlContext.createDataFrame(tableRDD1,
StructType(Seq(StructField("t1", StringType), StructField("t2", IntegerType), StructField("t3", IntegerType)))
)
df1.createTempView("t")
val df2: DataFrame = spark.sqlContext.createDataFrame(tableRDD2,
StructType(Seq(StructField("r1", StringType), StructField("r2", IntegerType), StructField("r3", IntegerType)))
)
df2.createTempView("r")
// TODO 4: sql邏輯
val sql: String =
"""
|select r1 as t1, r2 as t2, r3 as t3, 'r' as tp from r
|""".stripMargin
// TODO 5: 執(zhí)行sql
val result: DataFrame = spark.sql(sql)
// TODO 6: 結(jié)果顯示
result.show()
// TODO 7: 關(guān)閉spark環(huán)境
spark.stop()
}
到了這里,關(guān)于【Spark】Spark SQL基礎(chǔ)使用詳解和案例的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!