国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Spark SQL實(shí)戰(zhàn)(08)-整合Hive

這篇具有很好參考價(jià)值的文章主要介紹了Spark SQL實(shí)戰(zhàn)(08)-整合Hive。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

1 整合原理及使用

Apache Spark 是一個(gè)快速、可擴(kuò)展的分布式計(jì)算引擎,而 Hive 則是一個(gè)數(shù)據(jù)倉(cāng)庫(kù)工具,它提供了數(shù)據(jù)存儲(chǔ)和查詢(xún)功能。在 Spark 中使用 Hive 可以提高數(shù)據(jù)處理和查詢(xún)的效率。

場(chǎng)景

歷史原因積累下來(lái)的,很多數(shù)據(jù)原先是采用Hive來(lái)進(jìn)行處理的,現(xiàn)想改用Spark操作數(shù)據(jù),須要求Spark能夠無(wú)縫對(duì)接已有的Hive的數(shù)據(jù),實(shí)現(xiàn)平滑過(guò)渡。

MetaStore
Hive底層的元數(shù)據(jù)信息是存儲(chǔ)在MySQL中,$HIVE_HOME/conf/hive-site.xml

Spark若能直接訪(fǎng)問(wèn)MySQL中已有的元數(shù)據(jù)信息 $SPARK_HOME/conf/hive-site.xml

前置條件

在使用 Spark 整合 Hive 之前,需要安裝配置以下軟件:

  • Hadoop:用于數(shù)據(jù)存儲(chǔ)和分布式計(jì)算。
  • Hive:用于數(shù)據(jù)存儲(chǔ)和查詢(xún)。
  • Spark:用于分布式計(jì)算。

整合 Hive

在 Spark 中使用 Hive,需要將 Hive 的依賴(lài)庫(kù)添加到 Spark 的類(lèi)路徑中。在 Java 代碼中,可以使用 SparkConf 對(duì)象來(lái)設(shè)置 Spark 應(yīng)用程序的配置。下面是一個(gè)示例代碼:

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class SparkHiveIntegration {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SparkHiveIntegration")
                .setMaster("local[*]")
                .set("spark.sql.warehouse.dir", "/user/hive/warehouse");
        SparkSession spark = SparkSession.builder()
                .config(conf)
                .enableHiveSupport()
                .getOrCreate();
        spark.sql("SELECT * FROM mytable").show();
        spark.stop();
    }
}

在上面的代碼中,首先創(chuàng)建了一個(gè) SparkConf 對(duì)象,設(shè)置了應(yīng)用程序的名稱(chēng)、運(yùn)行模式以及 Hive 的元數(shù)據(jù)存儲(chǔ)路徑。然后,創(chuàng)建了一個(gè) SparkSession 對(duì)象,啟用了 Hive 支持。最后,使用 Spark SQL 查詢(xún)語(yǔ)句查詢(xún)了一個(gè)名為 mytable 的 Hive 表,并將結(jié)果打印出來(lái)。最后,停止了 SparkSession 對(duì)象。

需要注意的是,Spark SQL 語(yǔ)法與 Hive SQL 語(yǔ)法略有不同,可以參考 Spark SQL 官方文檔。

2 ThiriftServer使用

javaedge@JavaEdgedeMac-mini sbin % pwd
/Users/javaedge/Downloads/soft/spark-2.4.3-bin-2.6.0-cdh5.15.1/sbin

javaedge@JavaEdgedeMac-mini sbin % ./start-thriftserver.sh --master local --jars /Users/javaedge/.m2/repository/mysql/mysql-connector-java/8.0.15/mysql-connector-java-8.0.15.jar

starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /Users/javaedge/Downloads/soft/spark-2.4.3-bin-2.6.0-cdh5.15.1/logs/spark-javaedge-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-JavaEdgedeMac-mini.local.out

Spark SQL實(shí)戰(zhàn)(08)-整合Hive

beeline

內(nèi)置了一個(gè)客戶(hù)端工具:

javaedge@JavaEdgedeMac-mini bin % ./beeline -u jdbc:hive2://localhost:10000
Connecting to jdbc:hive2://localhost:10000
log4j:WARN No appenders could be found for logger (org.apache.hive.jdbc.Utils).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connected to: Spark SQL (version 2.4.3)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.2.1.spark2 by Apache Hive
0: jdbc:hive2://localhost:10000>

當(dāng)你執(zhí)行一條命令后:

Spark SQL實(shí)戰(zhàn)(08)-整合Hive

就能在 Web UI 看到該命令記錄:

Spark SQL實(shí)戰(zhàn)(08)-整合Hive

3 通過(guò)代碼訪(fǎng)問(wèn)數(shù)據(jù)

總是手敲命令行肯定太慢了,我們更多是代碼訪(fǎng)問(wèn):

package com.javaedge.bigdata.chapter06

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

object JDBCClientApp {

  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    val conn: Connection = DriverManager.getConnection("jdbc:hive2://localhost:10000")
    val pstmt: PreparedStatement = conn.prepareStatement("show tables")

    val rs: ResultSet = pstmt.executeQuery()

    while (rs.next()) {
      println(rs.getObject(1) + " : " + rs.getObject(2))
    }
  }
}

最后打成 jar 包,扔到服務(wù)器定時(shí)運(yùn)行即可執(zhí)行作業(yè)啦。

ThiriftServer V.S Spark Application 例行作業(yè)

Thrift Server 獨(dú)立的服務(wù)器應(yīng)用程序,它允許多個(gè)客戶(hù)端通過(guò)網(wǎng)絡(luò)協(xié)議訪(fǎng)問(wèn)其上運(yùn)行的 Thrift 服務(wù)。Thrift 服務(wù)通常是由一組 Thrift 定義文件定義的,這些文件描述了可以從客戶(hù)端發(fā)送到服務(wù)器的請(qǐng)求和響應(yīng)消息的數(shù)據(jù)結(jié)構(gòu)和協(xié)議。Thrift Server 可以使用各種編程語(yǔ)言進(jìn)行開(kāi)發(fā),包括 Java、C++、Python 等,并支持多種傳輸和序列化格式,例如 TSocket、TFramedTransport、TBinaryProtocol 等。使用 Thrift Server,您可以輕松地創(chuàng)建高性能、可伸縮和跨平臺(tái)的分布式應(yīng)用程序。

Spark Application,基于 Apache Spark 的應(yīng)用程序,它使用 Spark 編寫(xiě)的 API 和庫(kù)來(lái)處理大規(guī)模數(shù)據(jù)集。Spark Application 可以部署在本地計(jì)算機(jī)或云環(huán)境中,并且支持各種數(shù)據(jù)源和格式,如 Hadoop 分布式文件系統(tǒng)(HDFS)、Apache Cassandra、Apache Kafka 等。Spark Application 可以并行處理數(shù)據(jù)集,以加快數(shù)據(jù)處理速度,并提供了廣泛的機(jī)器學(xué)習(xí)算法和圖形處理功能。使用 Spark Application,您可以輕松地處理海量數(shù)據(jù),提取有價(jià)值的信息和洞察,并幫助您做出更明智的業(yè)務(wù)決策。

因此,Thrift Server 和 Spark Application 適用不同的場(chǎng)景和應(yīng)用程序:

  • 需要?jiǎng)?chuàng)建一個(gè)分布式服務(wù)并為多個(gè)客戶(hù)端提供接口,使用 Thrift Server
  • 需要處理大規(guī)模數(shù)據(jù)集并使用分布式計(jì)算和機(jī)器學(xué)習(xí)算法來(lái)分析數(shù)據(jù),使用 Spark Application

4 Spark 代碼訪(fǎng)問(wèn) Hive 數(shù)據(jù)

5 Spark SQL 函數(shù)實(shí)戰(zhàn)

parallelize

SparkContext 一個(gè)方法,將一個(gè)本地?cái)?shù)據(jù)集轉(zhuǎn)為RDD。parallelize` 方法接受一個(gè)集合作為輸入?yún)?shù),并根據(jù)指定的并行度創(chuàng)建一個(gè)新的 RDD。

語(yǔ)法:

// data表示要轉(zhuǎn)換為 RDD 的本地集合
// numSlices表示 RDD 的分區(qū)數(shù),通常等于集群中可用的 CPU 核心數(shù)量。 
val rdd = 
sc.parallelize(data, numSlices)

將一個(gè)包含整數(shù)值的本地?cái)?shù)組轉(zhuǎn)換為RDD:

import org.apache.spark.{SparkConf, SparkContext}

// 創(chuàng)建 SparkConf 對(duì)象
val conf = new SparkConf().setAppName("ParallelizeExample").setMaster("local[*]")

// 創(chuàng)建 SparkContext 對(duì)象
val sc = new SparkContext(conf)

// 定義本地序列
val data = Seq(1, 2, 3, 4, 5)

// 使用 parallelize 方法創(chuàng)建 RDD
val rdd = sc.parallelize(data)

// 執(zhí)行轉(zhuǎn)換操作
val result = rdd.map(_ * 2)

// 顯示輸出結(jié)果
result.foreach(println)

創(chuàng)建了一個(gè)包含整數(shù)值的本地序列 data,然后使用 parallelize 方法將其轉(zhuǎn)換為一個(gè) RDD。接下來(lái),我們對(duì) RDD 進(jìn)行轉(zhuǎn)換操作,并打印輸出結(jié)果。

使用 parallelize 方法時(shí),請(qǐng)確保正確配置 Spark 應(yīng)用程序,并設(shè)置正確 CPU 核心數(shù)量和內(nèi)存大小。否則,可能會(huì)導(dǎo)致應(yīng)用程序性能下降或崩潰。

5.1 內(nèi)置函數(shù)

都在這:

Spark SQL實(shí)戰(zhàn)(08)-整合Hive

統(tǒng)計(jì) PV、UV 實(shí)例
package com.javaedge.bigdata.chapter06

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 內(nèi)置函數(shù)
 */
object BuiltFunctionApp {

  def main(args: Array[String]): Unit = {


    val spark: SparkSession = SparkSession.builder()
      .master("local").appName("HiveSourceApp")
      .getOrCreate()

    // day  userid
    val userAccessLog = Array(
      "2016-10-01,1122",
      "2016-10-01,1122",
      "2016-10-01,1123",
      "2016-10-01,1124",
      "2016-10-01,1124",
      "2016-10-02,1122",
      "2016-10-02,1121",
      "2016-10-02,1123",
      "2016-10-02,1123"
    )

    import spark.implicits._

    // Array ==> RDD
    val userAccessRDD: RDD[String] = spark.sparkContext.parallelize(userAccessLog)

    val userAccessDF: DataFrame = userAccessRDD.map(x => {
      val splits: Array[String] = x.split(",")
      Log(splits(0), splits(1).toInt)
    }).toDF

    userAccessDF.show()

    import org.apache.spark.sql.functions._

    // select day, count(user_id) from xxx group by day;
    userAccessDF.groupBy("day").agg(count("userId").as("pv")).show()

    userAccessDF.groupBy("day").agg(countDistinct("userId").as("uv")).show()
    spark.stop()
  }

  private case class Log(day: String, userId: Int)
}

5.2 自定義函數(shù)

package com.javaedge.bigdata.chapter06

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}


/**
 * 統(tǒng)計(jì)每個(gè)人愛(ài)好的個(gè)數(shù)
 * pk:3
 * jepson: 2
 *
 *
 * 1)定義函數(shù)
 * 2)注冊(cè)函數(shù)
 * 3)使用函數(shù)
 */
object UDFFunctionApp {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .master("local").appName("HiveSourceApp")
      .getOrCreate()


    import spark.implicits._

    val infoRDD: RDD[String] = spark.sparkContext.textFile(
      "/Users/javaedge/Downloads/sparksql-train/data/hobbies.txt")
    val infoDF: DataFrame = infoRDD.map(_.split("###")).map(x => {
      Hobbies(x(0), x(1))
    }).toDF

    infoDF.show(false)

    // TODO... 定義函數(shù) 和 注冊(cè)函數(shù)
    spark.udf.register("hobby_num", (s: String) => s.split(",").size)

    infoDF.createOrReplaceTempView("hobbies")

    //TODO... 函數(shù)的使用
    spark.sql("select name, hobbies, hobby_num(hobbies) as hobby_count from hobbies").show(false)

    // select name, hobby_num(hobbies) from xxx

    spark.stop()
  }

  private case class Hobbies(name: String, hobbies: String)
}

output:
+------+----------------------+
|name  |hobbies               |
+------+----------------------+
|pk    |jogging,Coding,cooking|
|jepson|travel,dance          |
+------+----------------------+

+------+----------------------+-----------+
|name  |hobbies               |hobby_count|
+------+----------------------+-----------+
|pk    |jogging,Coding,cooking|3          |
|jepson|travel,dance          |2          |
+------+----------------------+-----------+

6 總結(jié)

通過(guò)上述示例代碼,可以看到如何在 Java 中使用 Spark 整合 Hive。通過(guò)使用 Hive 的數(shù)據(jù)存儲(chǔ)和查詢(xún)功能,可以在 Spark 中高效地處理和分析數(shù)據(jù)。當(dāng)然,還有許多其他功能和配置可以使用,例如設(shè)置 Spark 應(yīng)用程序的資源分配、數(shù)據(jù)分區(qū)、數(shù)據(jù)格式轉(zhuǎn)換等等。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-413930.html

到了這里,關(guān)于Spark SQL實(shí)戰(zhàn)(08)-整合Hive的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀(guān)點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • superset連接Apache Spark SQL(hive)過(guò)程中的各種報(bào)錯(cuò)解決

    superset連接Apache Spark SQL(hive)過(guò)程中的各種報(bào)錯(cuò)解決

    我的博客原文:superset連接Apache Spark SQL(hive)過(guò)程中的各種報(bào)錯(cuò)解決 我們用的是Apache Spark SQL,所以首先需要安裝下pyhive Apache Spark SQL連接的格式 ?安裝包下載完成,可以測(cè)試是否可以連接hive了。 因?yàn)轵?qū)動(dòng)不匹配導(dǎo)致的,返回重新下載依賴(lài)包 連接數(shù)據(jù)庫(kù)的時(shí)候一直報(bào)無(wú)法連

    2024年04月14日
    瀏覽(24)
  • Apache Doris (二十八):Doris 數(shù)據(jù)導(dǎo)入(六)Spark Load 1- 原理及配置

    目錄 1.?基本原理 ?2. Spark集群搭建 2.1?Spark Standalone 集群搭建 2.2?Spark On Yarn 配置

    2024年02月16日
    瀏覽(22)
  • SparkBug解決:Type mismatch; found : org.apache.spark.sql.Column required: Double

    assginFlag 方法中的條件判斷條件? (index = 0 index 720) ?返回的是一個(gè)布爾值,需要返回一個(gè)Option[Int]類(lèi)型。將判斷條件改為? if (index = 0 index 720) Some(index) else None ?來(lái)返回一個(gè)Option[Int]

    2024年04月10日
    瀏覽(26)
  • Spark SQL調(diào)優(yōu)實(shí)戰(zhàn)

    Spark SQL調(diào)優(yōu)實(shí)戰(zhàn)

    1、 新添參數(shù)說(shuō)明 // D river 和Executor內(nèi)存和CPU資源相關(guān)配置 -- 是否開(kāi)啟 executor 動(dòng)態(tài)分配 , 開(kāi)啟時(shí) spark.executor.instances 不生效 spark.dynamicAllocation.enabled= false --配置Driver內(nèi)存 spark.dirver.memory=5g --driver最大結(jié)果大小,設(shè)置為0代表不限制,driver在拉取結(jié)果時(shí),如果結(jié)果超過(guò)閾值會(huì)報(bào)異

    2024年02月21日
    瀏覽(18)
  • ChatGPT實(shí)戰(zhàn)100例 - (08) 數(shù)據(jù)庫(kù)設(shè)計(jì)轉(zhuǎn)化為SQL并獲取ER圖

    ChatGPT實(shí)戰(zhàn)100例 - (08) 數(shù)據(jù)庫(kù)設(shè)計(jì)轉(zhuǎn)化為SQL并獲取ER圖

    在你還在手?jǐn)]SQL?ChatGPT笑暈在廁所 這篇博文中 針對(duì)經(jīng)典3表設(shè)計(jì): 學(xué)生表 S(SNO,SNAME,AGE,SEX),其屬性表示學(xué)生的學(xué)號(hào)、姓名、年齡和性別; 選課表 SC(SNO,CNO,GRADE),其屬性表示學(xué)生的學(xué)號(hào)、所學(xué)課程的課程號(hào)和成績(jī); 課程表 C(CNO,CNAME,TEACHER),其屬性表示課程

    2024年02月10日
    瀏覽(23)
  • Spark SQL實(shí)戰(zhàn)(07)-Data Sources

    Spark SQL實(shí)戰(zhàn)(07)-Data Sources

    Spark SQL通過(guò)DataFrame接口支持對(duì)多種數(shù)據(jù)源進(jìn)行操作。 DataFrame可使用關(guān)系型變換進(jìn)行操作,也可用于創(chuàng)建臨時(shí)視圖。將DataFrame注冊(cè)為臨時(shí)視圖可以讓你對(duì)其數(shù)據(jù)運(yùn)行SQL查詢(xún)。 本節(jié)介紹使用Spark數(shù)據(jù)源加載和保存數(shù)據(jù)的一般方法,并進(jìn)一步介紹可用于內(nèi)置數(shù)據(jù)源的特定選項(xiàng)。 數(shù)

    2023年04月08日
    瀏覽(22)
  • Exception in thread “main“ org.apache.spark.sql.AnalysisException: Cannot write incompatible data to

    Exception in thread “main“ org.apache.spark.sql.AnalysisException: Cannot write incompatible data to

    這個(gè)問(wèn)題發(fā)生在 Spark SQL 將數(shù)據(jù)遷移進(jìn) Hive 時(shí)會(huì)出現(xiàn)。 這是因?yàn)閺?Spark 3.0.0 開(kāi)始,Spark SQL 增加了一個(gè)安全策略,不對(duì)非同類(lèi)型的數(shù)據(jù)進(jìn)行強(qiáng)制轉(zhuǎn)換,然后就會(huì)出現(xiàn)這個(gè)錯(cuò)誤。 我們?cè)谠创a文件 SQLConf.scala 中發(fā)現(xiàn)有這樣一個(gè)配置 StoreAssignmentPolicy : 其中有三種策略: ANSI 策略(

    2024年02月13日
    瀏覽(25)
  • Spark SQL實(shí)戰(zhàn)(04)-API編程之DataFrame

    Spark SQL實(shí)戰(zhàn)(04)-API編程之DataFrame

    Spark Core: SparkContext Spark SQL: 難道就沒(méi)有SparkContext? 2.x之后統(tǒng)一的 1.x的Spark SQL編程入口點(diǎn) SQLContext HiveContext Spark SQL中,SQLContext、HiveContext都是用來(lái)創(chuàng)建DataFrame和Dataset主要入口點(diǎn),二者區(qū)別如下: 數(shù)據(jù)源支持:SQLContext支持的數(shù)據(jù)源包括JSON、Parquet、JDBC等等,而HiveContext除了支持

    2023年04月09日
    瀏覽(21)
  • Apache Spark 練習(xí)六:使用Spark分析音樂(lè)專(zhuān)輯數(shù)據(jù)

    本章所分析的數(shù)據(jù)來(lái)自于Kaggle公開(kāi)的、人工合成的音樂(lè)專(zhuān)輯發(fā)行數(shù)據(jù)(https://www.kaggle.com/datasets/revilrosa/music-label-dataset)。以下,我們只針對(duì)albums.csv文件進(jìn)行分析。該數(shù)據(jù)具體包括以下字段: id: the album identifier; artist_id: the artist identifier; album_title: the title of the album; genre: the

    2024年02月15日
    瀏覽(34)
  • ssm整合原理與實(shí)戰(zhàn)

    ssm整合原理與實(shí)戰(zhàn)

    在前面,已經(jīng)發(fā)布過(guò)Maven,Spring,mybatis,SpringMvc的文章了,在這里進(jìn)行ssm整合。 微觀(guān) :將學(xué)習(xí)的Spring SpringMVC Mybatis框架應(yīng)用到項(xiàng)目中! SpringMVC框架負(fù)責(zé)控制層 Spring 框架負(fù)責(zé)整體和業(yè)務(wù)層的聲明式事務(wù)管理 MyBatis框架負(fù)責(zé)數(shù)據(jù)庫(kù)訪(fǎng)問(wèn)層 宏觀(guān) :Spring接管一切(將框架核心組件交

    2024年02月05日
    瀏覽(29)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包