系列文章目錄
spark第一章:環(huán)境安裝
spark第二章:sparkcore實(shí)例
spark第三章:工程化代碼
spark第四章:SparkSQL基本操作
前言
接下來(lái)我們學(xué)習(xí)SparkSQL他和Hql有些相似。Hql是將操作裝換成MR,SparkSQL也是,不過(guò)是使用Spark引擎來(lái)操作,效率更高一些
一、添加pom
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.2.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.2.3</version>
以上是這次博客需要的所有依賴(lài),一次性全加上。
二、常用操作
一共這么多,挨個(gè)講解一下
1.類(lèi)型轉(zhuǎn)換
SparkSQL中有三種常用的類(lèi)型,RDD之前說(shuō)過(guò)就不說(shuō)了。
DataFrame
Spark SQL 的 DataFrame API 允許我們使用 DataFrame 而不用必須去注冊(cè)臨時(shí)表或者生成 SQL 表達(dá)式。DataFrame API 既有 transformation 操作也有 action 操作。
DSL 語(yǔ)法
DataFrame 提供一個(gè)特定領(lǐng)域語(yǔ)言(domain-specific language, DSL)去管理結(jié)構(gòu)化的數(shù)據(jù)??梢栽?Scala, Java, Python 和 R 中使用 DSL,使用 DSL 語(yǔ)法風(fēng)格不必去創(chuàng)建臨時(shí)視圖了
SparkSql_Basic.scala
package com.atguigu.bigdata.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object SparkSql_Basic {
def main(args: Array[String]): Unit = {
// 創(chuàng)建SparkSQL的運(yùn)行環(huán)境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
// val df: DataFrame = spark.read.json("datas/user.json")
// df.show()
//DataFrame => SQL
// df.createOrReplaceTempView("user")
// spark.sql("select age from user").show()
//DtaFrame => DSL
// 在使用DataFrame時(shí),如何涉及到轉(zhuǎn)換操作,需要引入轉(zhuǎn)換規(guī)則
// df.select("age","username").show()
// df.select($"age"+1).show()
// df.select('age+1).show()
// DataSet
// DataFrame 是特定泛型的DataSet
// val seq: Seq[Int] = Seq(1, 2, 3, 4)
// val ds: Dataset[Int] = seq.toDS()
// ds.show()
// RDD <=>DataFrame
val rdd=spark.sparkContext.makeRDD(List((1,"zhangsan",30),(2,"lisi",40)))
val df: DataFrame = rdd.toDF("id", "name", "age")
val rowRDD: RDD[Row] = df.rdd
// DataFrame <=> DatsSet
val ds: Dataset[User] = df.as[User]
val df1: DataFrame = ds.toDF()
// RDD <=> DataSet
val ds1: Dataset[User] = rdd.map {
case (id, name, age) => {
User(id, name, age)
}
}.toDS()
val userRDD: RDD[User] = ds1.rdd
// 關(guān)閉環(huán)境
spark.close()
}
case class User(id:Int,name:String,age:Int)
}
2.連接mysql
SparkSQL提供了多種數(shù)據(jù)接口,我們可以通過(guò)JDBC連接Mysql數(shù)據(jù)庫(kù),我們先隨便在數(shù)據(jù)庫(kù)里邊寫(xiě)點(diǎn)東西。
SparkSql_JDBC.scala
package com.atguigu.bigdata.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object SparkSql_JDBC {
def main(args: Array[String]): Unit = {
// 創(chuàng)建SparkSQL的運(yùn)行環(huán)境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val df: DataFrame = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/spark-sql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "000000")
.option("dbtable", "user")
.option("useSSL","false")
.load()
df.show
df.write
.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/spark-sql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "000000")
.option("dbtable", "user1")
.option("useSSL","false")
.mode(SaveMode.Append)
.save()
// 關(guān)閉環(huán)境
spark.close()
}
}
3.UDF函數(shù)
這個(gè)函數(shù)可以對(duì)簡(jiǎn)單的數(shù)據(jù)進(jìn)行處理,但是比較局限.
這次我們從json文件讀取數(shù)據(jù)
{"username": "zhangsan", "age": 20}
{"username": "lisi", "age": 30}
{"username": "wangwu", "age": 40}
SparkSql_UDF.scala
package com.atguigu.bigdata.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object SparkSql_UDF {
def main(args: Array[String]): Unit = {
// 創(chuàng)建SparkSQL的運(yùn)行環(huán)境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val df: DataFrame = spark.read.json("datas/user.json")
df.createOrReplaceTempView("user")
spark.udf.register("prefixName",(name:String)=>{
"Name:" + name
})
spark.sql("select age ,prefixName(username) from user").show()
// 關(guān)閉環(huán)境
spark.close()
}
}
4.UDAF函數(shù)
UDAF函數(shù)的處理能力就比UDF強(qiáng)大多了,可以完成一些更復(fù)雜的操作.
SparkSql_UDAF1.scala
package com.atguigu.bigdata.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, Row, SparkSession, functions}
object SparkSql_UDAF1 {
def main(args: Array[String]): Unit = {
// 創(chuàng)建SparkSQL的運(yùn)行環(huán)境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val df: DataFrame = spark.read.json("datas/user.json")
df.createOrReplaceTempView("user")
//計(jì)算平均年齡
spark.udf.register("ageAvg", functions.udaf(new MyAvgUDAF()))
spark.sql("select ageAvg(age) from user").show()
// 關(guān)閉環(huán)境
spark.close()
}
case class Buff( var total:Long,var count:Long)
class MyAvgUDAF extends Aggregator[Long,Buff,Long]{
//初始值
override def zero: Buff = {
Buff(0L,0L)
}
//更新緩沖區(qū)
override def reduce(buff: Buff, in: Long): Buff = {
buff.total=buff.total+in
buff.count=buff.count+1
buff
}
//合并緩沖區(qū)
override def merge(buff1: Buff, buff2: Buff): Buff = {
buff1.total=buff1.total+buff2.total
buff1.count=buff1.count+buff2.count
buff1
}
//計(jì)算結(jié)果
override def finish(buff: Buff): Long = {
buff.total/buff.count
}
//緩沖區(qū)編碼操作
override def bufferEncoder: Encoder[Buff] = Encoders.product
//輸出的編碼操作
override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
}
還有一種方法,在Spark3已經(jīng)不被官方推薦了,所以這里就不敘述了.
5.連接hive
首先我們?cè)诩合?啟動(dòng)Hadoop和Hive
然后將jdbc的jar包放到hive的lib文件中
這個(gè)jar包在安裝Hive環(huán)境時(shí),使用過(guò).
將虛擬機(jī)中的hive配置文件,hive-site.xml導(dǎo)出
放到idea的resource文件夾中,然后最好吧target文件夾刪除,因?yàn)閕dea有可能從target中直接讀取之前的數(shù)據(jù),從而沒(méi)有掃描hive-site.xml
我們就做最簡(jiǎn)單的查詢(xún)操作
SparkSql_Hive.scala
package com.atguigu.bigdata.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object SparkSql_Hive {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()
spark.sql("show tables").show
// 關(guān)閉環(huán)境
spark.close()
}
}
如果能查詢(xún)hive中的數(shù)據(jù)庫(kù),代表成功.文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-465018.html
總結(jié)
SparkSQL的常用操作基本就這些,至于項(xiàng)目嗎,下次專(zhuān)門(mén)在寫(xiě)一次吧文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-465018.html
到了這里,關(guān)于spark第四章:SparkSQL基本操作的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!