1 整合原理及使用
Apache Spark 是一個(gè)快速、可擴(kuò)展的分布式計(jì)算引擎,而 Hive 則是一個(gè)數(shù)據(jù)倉(cāng)庫(kù)工具,它提供了數(shù)據(jù)存儲(chǔ)和查詢(xún)功能。在 Spark 中使用 Hive 可以提高數(shù)據(jù)處理和查詢(xún)的效率。
場(chǎng)景
歷史原因積累下來(lái)的,很多數(shù)據(jù)原先是采用Hive來(lái)進(jìn)行處理的,現(xiàn)想改用Spark操作數(shù)據(jù),須要求Spark能夠無(wú)縫對(duì)接已有的Hive的數(shù)據(jù),實(shí)現(xiàn)平滑過(guò)渡。
MetaStore
Hive底層的元數(shù)據(jù)信息是存儲(chǔ)在MySQL中,$HIVE_HOME/conf/hive-site.xml
Spark若能直接訪(fǎng)問(wèn)MySQL中已有的元數(shù)據(jù)信息 $SPARK_HOME/conf/hive-site.xml
前置條件
在使用 Spark 整合 Hive 之前,需要安裝配置以下軟件:
- Hadoop:用于數(shù)據(jù)存儲(chǔ)和分布式計(jì)算。
- Hive:用于數(shù)據(jù)存儲(chǔ)和查詢(xún)。
- Spark:用于分布式計(jì)算。
整合 Hive
在 Spark 中使用 Hive,需要將 Hive 的依賴(lài)庫(kù)添加到 Spark 的類(lèi)路徑中。在 Java 代碼中,可以使用 SparkConf 對(duì)象來(lái)設(shè)置 Spark 應(yīng)用程序的配置。下面是一個(gè)示例代碼:
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
public class SparkHiveIntegration {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("SparkHiveIntegration")
.setMaster("local[*]")
.set("spark.sql.warehouse.dir", "/user/hive/warehouse");
SparkSession spark = SparkSession.builder()
.config(conf)
.enableHiveSupport()
.getOrCreate();
spark.sql("SELECT * FROM mytable").show();
spark.stop();
}
}
在上面的代碼中,首先創(chuàng)建了一個(gè) SparkConf 對(duì)象,設(shè)置了應(yīng)用程序的名稱(chēng)、運(yùn)行模式以及 Hive 的元數(shù)據(jù)存儲(chǔ)路徑。然后,創(chuàng)建了一個(gè) SparkSession 對(duì)象,啟用了 Hive 支持。最后,使用 Spark SQL 查詢(xún)語(yǔ)句查詢(xún)了一個(gè)名為 mytable 的 Hive 表,并將結(jié)果打印出來(lái)。最后,停止了 SparkSession 對(duì)象。
需要注意的是,Spark SQL 語(yǔ)法與 Hive SQL 語(yǔ)法略有不同,可以參考 Spark SQL 官方文檔。
2 ThiriftServer使用
javaedge@JavaEdgedeMac-mini sbin % pwd
/Users/javaedge/Downloads/soft/spark-2.4.3-bin-2.6.0-cdh5.15.1/sbin
javaedge@JavaEdgedeMac-mini sbin % ./start-thriftserver.sh --master local --jars /Users/javaedge/.m2/repository/mysql/mysql-connector-java/8.0.15/mysql-connector-java-8.0.15.jar
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /Users/javaedge/Downloads/soft/spark-2.4.3-bin-2.6.0-cdh5.15.1/logs/spark-javaedge-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-JavaEdgedeMac-mini.local.out
beeline
內(nèi)置了一個(gè)客戶(hù)端工具:
javaedge@JavaEdgedeMac-mini bin % ./beeline -u jdbc:hive2://localhost:10000
Connecting to jdbc:hive2://localhost:10000
log4j:WARN No appenders could be found for logger (org.apache.hive.jdbc.Utils).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connected to: Spark SQL (version 2.4.3)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.2.1.spark2 by Apache Hive
0: jdbc:hive2://localhost:10000>
當(dāng)你執(zhí)行一條命令后:
就能在 Web UI 看到該命令記錄:
3 通過(guò)代碼訪(fǎng)問(wèn)數(shù)據(jù)
總是手敲命令行肯定太慢了,我們更多是代碼訪(fǎng)問(wèn):
package com.javaedge.bigdata.chapter06
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
object JDBCClientApp {
def main(args: Array[String]): Unit = {
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn: Connection = DriverManager.getConnection("jdbc:hive2://localhost:10000")
val pstmt: PreparedStatement = conn.prepareStatement("show tables")
val rs: ResultSet = pstmt.executeQuery()
while (rs.next()) {
println(rs.getObject(1) + " : " + rs.getObject(2))
}
}
}
最后打成 jar 包,扔到服務(wù)器定時(shí)運(yùn)行即可執(zhí)行作業(yè)啦。
ThiriftServer V.S Spark Application 例行作業(yè)
Thrift Server 獨(dú)立的服務(wù)器應(yīng)用程序,它允許多個(gè)客戶(hù)端通過(guò)網(wǎng)絡(luò)協(xié)議訪(fǎng)問(wèn)其上運(yùn)行的 Thrift 服務(wù)。Thrift 服務(wù)通常是由一組 Thrift 定義文件定義的,這些文件描述了可以從客戶(hù)端發(fā)送到服務(wù)器的請(qǐng)求和響應(yīng)消息的數(shù)據(jù)結(jié)構(gòu)和協(xié)議。Thrift Server 可以使用各種編程語(yǔ)言進(jìn)行開(kāi)發(fā),包括 Java、C++、Python 等,并支持多種傳輸和序列化格式,例如 TSocket、TFramedTransport、TBinaryProtocol 等。使用 Thrift Server,您可以輕松地創(chuàng)建高性能、可伸縮和跨平臺(tái)的分布式應(yīng)用程序。
Spark Application,基于 Apache Spark 的應(yīng)用程序,它使用 Spark 編寫(xiě)的 API 和庫(kù)來(lái)處理大規(guī)模數(shù)據(jù)集。Spark Application 可以部署在本地計(jì)算機(jī)或云環(huán)境中,并且支持各種數(shù)據(jù)源和格式,如 Hadoop 分布式文件系統(tǒng)(HDFS)、Apache Cassandra、Apache Kafka 等。Spark Application 可以并行處理數(shù)據(jù)集,以加快數(shù)據(jù)處理速度,并提供了廣泛的機(jī)器學(xué)習(xí)算法和圖形處理功能。使用 Spark Application,您可以輕松地處理海量數(shù)據(jù),提取有價(jià)值的信息和洞察,并幫助您做出更明智的業(yè)務(wù)決策。
因此,Thrift Server 和 Spark Application 適用不同的場(chǎng)景和應(yīng)用程序:
- 需要?jiǎng)?chuàng)建一個(gè)分布式服務(wù)并為多個(gè)客戶(hù)端提供接口,使用 Thrift Server
- 需要處理大規(guī)模數(shù)據(jù)集并使用分布式計(jì)算和機(jī)器學(xué)習(xí)算法來(lái)分析數(shù)據(jù),使用 Spark Application
4 Spark 代碼訪(fǎng)問(wèn) Hive 數(shù)據(jù)
5 Spark SQL 函數(shù)實(shí)戰(zhàn)
parallelize
SparkContext 一個(gè)方法,將一個(gè)本地?cái)?shù)據(jù)集轉(zhuǎn)為RDD。
parallelize` 方法接受一個(gè)集合作為輸入?yún)?shù),并根據(jù)指定的并行度創(chuàng)建一個(gè)新的 RDD。
語(yǔ)法:
// data表示要轉(zhuǎn)換為 RDD 的本地集合
// numSlices表示 RDD 的分區(qū)數(shù),通常等于集群中可用的 CPU 核心數(shù)量。
val rdd =
sc.parallelize(data, numSlices)
將一個(gè)包含整數(shù)值的本地?cái)?shù)組轉(zhuǎn)換為RDD:
import org.apache.spark.{SparkConf, SparkContext}
// 創(chuàng)建 SparkConf 對(duì)象
val conf = new SparkConf().setAppName("ParallelizeExample").setMaster("local[*]")
// 創(chuàng)建 SparkContext 對(duì)象
val sc = new SparkContext(conf)
// 定義本地序列
val data = Seq(1, 2, 3, 4, 5)
// 使用 parallelize 方法創(chuàng)建 RDD
val rdd = sc.parallelize(data)
// 執(zhí)行轉(zhuǎn)換操作
val result = rdd.map(_ * 2)
// 顯示輸出結(jié)果
result.foreach(println)
創(chuàng)建了一個(gè)包含整數(shù)值的本地序列 data
,然后使用 parallelize
方法將其轉(zhuǎn)換為一個(gè) RDD。接下來(lái),我們對(duì) RDD 進(jìn)行轉(zhuǎn)換操作,并打印輸出結(jié)果。
使用 parallelize
方法時(shí),請(qǐng)確保正確配置 Spark 應(yīng)用程序,并設(shè)置正確 CPU 核心數(shù)量和內(nèi)存大小。否則,可能會(huì)導(dǎo)致應(yīng)用程序性能下降或崩潰。
5.1 內(nèi)置函數(shù)
都在這:
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-413930.html
統(tǒng)計(jì) PV、UV 實(shí)例
package com.javaedge.bigdata.chapter06
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 內(nèi)置函數(shù)
*/
object BuiltFunctionApp {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.master("local").appName("HiveSourceApp")
.getOrCreate()
// day userid
val userAccessLog = Array(
"2016-10-01,1122",
"2016-10-01,1122",
"2016-10-01,1123",
"2016-10-01,1124",
"2016-10-01,1124",
"2016-10-02,1122",
"2016-10-02,1121",
"2016-10-02,1123",
"2016-10-02,1123"
)
import spark.implicits._
// Array ==> RDD
val userAccessRDD: RDD[String] = spark.sparkContext.parallelize(userAccessLog)
val userAccessDF: DataFrame = userAccessRDD.map(x => {
val splits: Array[String] = x.split(",")
Log(splits(0), splits(1).toInt)
}).toDF
userAccessDF.show()
import org.apache.spark.sql.functions._
// select day, count(user_id) from xxx group by day;
userAccessDF.groupBy("day").agg(count("userId").as("pv")).show()
userAccessDF.groupBy("day").agg(countDistinct("userId").as("uv")).show()
spark.stop()
}
private case class Log(day: String, userId: Int)
}
5.2 自定義函數(shù)
package com.javaedge.bigdata.chapter06
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 統(tǒng)計(jì)每個(gè)人愛(ài)好的個(gè)數(shù)
* pk:3
* jepson: 2
*
*
* 1)定義函數(shù)
* 2)注冊(cè)函數(shù)
* 3)使用函數(shù)
*/
object UDFFunctionApp {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.master("local").appName("HiveSourceApp")
.getOrCreate()
import spark.implicits._
val infoRDD: RDD[String] = spark.sparkContext.textFile(
"/Users/javaedge/Downloads/sparksql-train/data/hobbies.txt")
val infoDF: DataFrame = infoRDD.map(_.split("###")).map(x => {
Hobbies(x(0), x(1))
}).toDF
infoDF.show(false)
// TODO... 定義函數(shù) 和 注冊(cè)函數(shù)
spark.udf.register("hobby_num", (s: String) => s.split(",").size)
infoDF.createOrReplaceTempView("hobbies")
//TODO... 函數(shù)的使用
spark.sql("select name, hobbies, hobby_num(hobbies) as hobby_count from hobbies").show(false)
// select name, hobby_num(hobbies) from xxx
spark.stop()
}
private case class Hobbies(name: String, hobbies: String)
}
output:
+------+----------------------+
|name |hobbies |
+------+----------------------+
|pk |jogging,Coding,cooking|
|jepson|travel,dance |
+------+----------------------+
+------+----------------------+-----------+
|name |hobbies |hobby_count|
+------+----------------------+-----------+
|pk |jogging,Coding,cooking|3 |
|jepson|travel,dance |2 |
+------+----------------------+-----------+
6 總結(jié)
通過(guò)上述示例代碼,可以看到如何在 Java 中使用 Spark 整合 Hive。通過(guò)使用 Hive 的數(shù)據(jù)存儲(chǔ)和查詢(xún)功能,可以在 Spark 中高效地處理和分析數(shù)據(jù)。當(dāng)然,還有許多其他功能和配置可以使用,例如設(shè)置 Spark 應(yīng)用程序的資源分配、數(shù)據(jù)分區(qū)、數(shù)據(jù)格式轉(zhuǎn)換等等。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-413930.html
到了這里,關(guān)于Spark SQL實(shí)戰(zhàn)(08)-整合Hive的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!