国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算

這篇具有很好參考價值的文章主要介紹了大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

前言

  • CDH 6.2.0 搭建的環(huán)境,并不能直接使用 spark 相關(guān)資源,需要對此服務端環(huán)境進行一些修改
  • Spark 目前僅支持 JDK1.8, Java項目運行環(huán)境只能使用JDK 1.8
  • 我這里使用的是 CDH6.2.0集群,因此使用的依賴為CDH專用依賴,需要先添加倉庫
  • spark 使用scala 語言編寫,因此項目中使用的scala依賴版本要和cdh中的 scala 版本一致
  • 因為需要將計算結(jié)果寫入到MySQL,所以當前項目中需要加入MySQL-JDBC驅(qū)動程序
  • Spark 在運行過程中,會將JAR上傳到節(jié)點,進行網(wǎng)絡傳輸,因此,Spark計算類,必須實現(xiàn)序列化接口 java.io.Serializable,同時設(shè)置序列化id( private static final long serialVersionUID = 1L;),如果不知道怎么設(shè)置,那就默認值1L,每次更新代碼,切記 maven clean package,缺一不可
  • Spark 在進行RDD計算的時候,可能會在集群中的任一節(jié)點上,因此每個節(jié)點也需要有 MySQL的JDBC驅(qū)動程序,否則無法創(chuàng)建數(shù)據(jù)庫表,我這里用了偷懶的方式,將JAR上傳到HDFS,通過配置文件進行加載啟動

代碼庫地址:https://github.com/lcy19930619/cdh-demo

環(huán)境處理

步驟一:添加 spark 基礎(chǔ)環(huán)境

大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算

步驟二,處理對應的 master 和 slave 節(jié)點

修改基礎(chǔ)環(huán)境配置文件

文件:/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/conf/spark-env.sh
在文件上方添加以下內(nèi)容

export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera # jdk 路徑
SPARK_LOCAL_IP=10.8.0.6 # 此ip為我的遠程訪問ip地址,spark 默認只處理鏈接此ip的數(shù)據(jù)
SPARK_MASTER_HOST=10.8.0.6 # master 節(jié)點ip地址

大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算

修改端口

文件:/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/conf/spark-defaults.conf
修改內(nèi)容:
將 7337 端口修改為 7447

spark.shuffle.service.port=7447

大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算

分別啟動節(jié)點

文件路徑:/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/sbin
啟動 master 執(zhí)行:

./start-master.sh

啟動 slave 執(zhí)行:

./start-slaves.sh # 注意,這個腳本是有 s 的,還有一個是start-slave.sh,別啟動錯了

確認正常啟動

大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算

了解 Spark

  • Apache Spark 是專為大規(guī)模數(shù)據(jù)處理而設(shè)計的快速通用的計算引擎。Spark是UC Berkeley AMP lab (加州大學伯克利分校的AMP實驗室) 所開源的類Hadoop MapReduce的通用并行框架
  • 擁有Hadoop MapReduce所具有的優(yōu)點;但不同于MapReduce的是——Job中間輸出結(jié)果可以保存在內(nèi)存中,從而不再需要讀寫HDFS,因此Spark能更好地適用于數(shù)據(jù)挖掘與機器學習等需要迭代的MapReduce的算法。
  • Spark 比 MapReduce 快,MapReduce只能進行離線運算,并且需要完全依靠HDFS,數(shù)據(jù)需要從磁盤加載,然后才能進行計算,因此MapReduce速度較慢,但Spark可以將計算結(jié)果存儲到內(nèi)存中,也可以進行流式計算,因此速度比MapReduce 快
  • Spark 提供了大量的庫,包括Spark Core、Spark SQL、Spark Streaming、MLlib、GraphX。 開發(fā)者可以在同一個應用程序中無縫組合使用這些庫。
  • Spark 支持 Hadoop YARN,Apache Mesos,及其自帶的獨立集群管理器

大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算

組件

  • SparkCore:相當于MapReduce,是spark的核心引擎。

  • SparkSQL:是一個用于處理結(jié)構(gòu)化數(shù)據(jù)的Spark組件,主要用于結(jié)構(gòu)化數(shù)據(jù)處理和對數(shù)據(jù)執(zhí)行類SQL查詢??梢葬槍Σ煌瑪?shù)據(jù)格式(如:JSON,Parquet, ORC等)和數(shù)據(jù)源執(zhí)行ETL操作(如:HDFS、數(shù)據(jù)庫等),完成特定的查詢操作。

  • SparkStreaming:微批處理的流處理引擎,將流數(shù)據(jù)分片以后用SparkCore的計算引擎中進行處理,可以進行實時運算。

  • Mllib和GraphX主要一些機器學習和圖計算的算法庫。

  • SparkR是一個R語言包,它提供了輕量級的方式使得可以在R語言中使用 Spark。
    大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算

Spark數(shù)據(jù)結(jié)構(gòu)

在Spark中,數(shù)據(jù)以RDD或者DataFrame的格式儲存。

RDD

RDD 是 Spark 中最重要的概念之一,全稱為 Resilient Distributed Dataset,即彈性分布式數(shù)據(jù)集。它是一種容錯、可并行計算的數(shù)據(jù)類型,可以跨多個節(jié)點進行分布式計算。RDD 是 Spark 提供的核心分布式數(shù)據(jù)結(jié)構(gòu),可以通過一系列的轉(zhuǎn)換和動作(operation)進行處理,從而實現(xiàn)大規(guī)模數(shù)據(jù)處理。

在 Spark 中,RDD 表示一個不可變、可分區(qū)、支持并行操作的數(shù)據(jù)集合,每個 RDD 可以被分為多個分區(qū),這些分區(qū)可以被放置在不同的節(jié)點上,使得計算可以在節(jié)點間并行進行。用戶可以通過一系列的操作來構(gòu)建和轉(zhuǎn)換 RDD。

RDD 的特點如下:

  • 以分布式方式存儲在多個節(jié)點上,通過網(wǎng)絡進行傳輸,可以實現(xiàn)高效的數(shù)據(jù)計算和處理。

  • 支持多種數(shù)據(jù)來源,例如 HDFS, HBase, Cassandra 等大規(guī)模數(shù)據(jù)存儲系統(tǒng)。

  • 可以容錯并快速恢復,當某個節(jié)點失敗或數(shù)據(jù)損壞時,RDD 可以快速恢復原始數(shù)據(jù)。

  • 支持多種操作,例如轉(zhuǎn)換(transformation)和動作(action),可以在 RDD 上進行各種復雜的數(shù)據(jù)處理、過濾、排序等操作。

DataFrame

DataFrame 是 Spark SQL 中內(nèi)置的、分布式的數(shù)據(jù)處理結(jié)構(gòu)。它可以看做是基于 RDD 的分布式數(shù)據(jù)集合,但相對于 RDD,DataFrame 提供了更高層次的抽象,使得數(shù)據(jù)處理更加方便、高效。DataFrame 可以將不同數(shù)據(jù)源中的數(shù)據(jù)統(tǒng)一表示為一個分布式的表格,提供了一套 SQL 的查詢語言,支持豐富的數(shù)據(jù)轉(zhuǎn)換以及數(shù)據(jù)分析處理功能。

與 RDD 不同的是,DataFrame 中的數(shù)據(jù)結(jié)構(gòu)是由一組命名的列組成的,支持多種數(shù)據(jù)類型,并且可以自動推斷數(shù)據(jù)模式(schema)。而且 DataFrame 中的數(shù)據(jù)都是以列存儲的,因此可以更加高效地進行數(shù)據(jù)壓縮和編碼,提高數(shù)據(jù)處理的速度和效率。除此之外,DataFrame 還提供了很多類似于 SQL 的數(shù)據(jù)操作方法,例如 select, filter, groupBy, orderBy 等等。

使用 DataFrame 可以更加方便地進行數(shù)據(jù)處理工作,將常用的大部分復雜計算交給 Spark SQL 來處理,而不需要過多地自己實現(xiàn)。

例如,在 Spark SQL 中可以讀取各種結(jié)構(gòu)化數(shù)據(jù)(如 JSON, CSV, parquet 等等),然后使用 DataFrame API 進行數(shù)據(jù)查詢、篩選、聚合甚至機器學習算法的處理。在某些情況下,DataFrame 甚至可以代替編寫 MapReduce 作業(yè)來處理數(shù)據(jù)。

Dataset

在 Spark 中,Dataset 是一種強類型的、可分布式處理的數(shù)據(jù)集合,可以運用 Spark 的函數(shù)式編程方式,提供了更加方便、穩(wěn)定的 API 接口,支持如 SQL 語法風格的查詢,也可以與原生 Scala、Java 等語言的 API 緊密結(jié)合,支持對各種數(shù)據(jù)源的讀取和寫入等操作。

Dataset 實現(xiàn)了兩個經(jīng)典的 Spark 數(shù)據(jù)結(jié)構(gòu):RDD 和 DataFrame。與 RDD 相比,Dataset 提供了更加高級的類型約束和更好的性能優(yōu)化,可以在編譯期間捕獲類型錯誤,并且能夠利用 Catalyst(Spark 的高性能查詢優(yōu)化器)對查詢語句進行優(yōu)化。

與 DataFrame 相比,Dataset 不僅支持強類型編程,還支持面向?qū)ο缶幊獭?梢酝ㄟ^編寫強類型類來指定數(shù)據(jù)結(jié)構(gòu),支持使用標準 Scala、Java 類型檢查器進行檢查,避免了在運行時出現(xiàn)類型不匹配的錯誤。

一次Spark Job的運行過程簡述

  • 配置與初始化。在這個階段中,Spark 根據(jù)用戶設(shè)定的配置信息,對集群進行初始化,并加載用戶指定的代碼和依賴項。這個階段還包括 Spark 上下文的創(chuàng)建和運行環(huán)境的初始化等操作。

  • 轉(zhuǎn)換與優(yōu)化。在這個階段中,Spark 根據(jù)用戶設(shè)定的代碼和數(shù)據(jù)輸入,進行一系列的轉(zhuǎn)換和優(yōu)化操作,包括分區(qū)、排序、過濾、聚合等操作。Spark 會根據(jù) DAG (Directed Acyclic Graph) 的形式將轉(zhuǎn)換操作組織起來,并進行邏輯優(yōu)化和物理優(yōu)化。

  • 計算與執(zhí)行。在這個階段中,Spark 根據(jù) DAG 的構(gòu)建結(jié)果,將代碼和數(shù)據(jù)輸入根據(jù) DAG 拆分為多個階段,并按照計算依賴關(guān)系進行并行計算和執(zhí)行。Spark 會在集群中的多個節(jié)點上運行任務,從而實現(xiàn)高效的數(shù)據(jù)并行處理。

  • 結(jié)果輸出和保存。在這個階段中,Spark 將計算結(jié)果進行輸出和保存,可以將結(jié)果保存到內(nèi)存、磁盤或是外部存儲系統(tǒng)中(如 HDFS, S3, HBase 等)??梢酝ㄟ^ API 代碼或 Spark SQL 等途徑直接與結(jié)果進行交互和查詢。

運行角色

在 Spark 集群中,有以下幾個運行角色:

  • Driver:Driver 是整個 Spark 應用程序的主類,通過調(diào)用 SparkContext 來創(chuàng)建 RDD 并且定義數(shù)據(jù)處理流程。Driver 維護著集群任務的整體狀態(tài)、資源分配和任務調(diào)度等職責,是整個應用的控制節(jié)點。

  • Executor:Executor 是 Spark 中真正執(zhí)行任務的角色,每個應用程序啟動時,Spark 會為每個節(jié)點分配一個或多個 Executor,Executor 會在該節(jié)點上負責執(zhí)行分配給它的任務,包括數(shù)據(jù)的計算和轉(zhuǎn)換、計算結(jié)果的緩存和存儲、以及任務的監(jiān)控和重試等職責。

  • Cluster Manager:Cluster Manager 是 Spark 集群的管理組件,負責分配和管理集群的資源、監(jiān)控集群的狀態(tài)和健康狀況、處理節(jié)點的故障和重啟等職責。Spark 支持多種 Cluster Manager,包括 Standalone、Mesos、YARN、Kubernetes 等。

  • Worker:Worker 是 Spark 集群中的節(jié)點,可以是物理機、虛擬機或 Docker 容器等,它們負責提供計算和存儲資源、啟動和運行 Executor、定期向 Cluster Manager 匯報節(jié)點狀態(tài)等職責。

  • Client:Client 是 Spark 應用程序的啟動者,負責啟動 Driver 進程,向 Cluster Manager 請求計算資源、提交應用程序代碼等職責。通常來說,Client 與 Driver 運行在同一臺機器上。

常用的配置參數(shù)

SparkConf 是 Spark 配置的核心類,你可以使用 SparkConf 配置類來設(shè)置 Spark 應用程序的各種參數(shù)。下面是一些常見的 SparkConf 配置參數(shù)及其說明:

  • spark.master: 指定 Spark 應用程序運行在哪個集群(Standalone、Mesos 或 YARN)的哪個節(jié)點上,以及運行模式(local、client 或 cluster);示例:spark://master:7077(集群模式)或 local[*](本地模式)。
  • spark.app.name: 指定應用程序的名稱,方便在 Spark Web UI 和日志中定位;示例:MyApp
  • spark.driver.memory: 指定 Driver 程序占用的內(nèi)存大小,包括 JVM Heap 和其他內(nèi)存(如 I/O 緩存);示例:2g。
  • spark.executor.memory: 指定 Executor 程序占用的內(nèi)存大小,包括 JVM Heap 和其他內(nèi)存(如 I/O 緩存);示例:4g。
  • spark.executor.instances: 指定 Spark 應用程序啟動的 Executor 數(shù)量;示例:10。
  • spark.executor.cores: 指定每個 Executor 占用的 CPU 核數(shù);示例:4。
  • spark.default.parallelism: 指定默認的并行度,通常和數(shù)據(jù)分區(qū)數(shù)保持一致;示例:100。
  • spark.sql.shuffle.partitions: 指定 Spark SQL Shuffle 操作的默認并行度,通常和數(shù)據(jù)分區(qū)數(shù)保持一致;示例:100。
  • spark.hadoop.fs.s3a.endpoint: 指定 Object Store 的訪問地址;示例:s3.amazonaws.com。
  • spark.hadoop.fs.s3a.access.key: 指定 Object Store 的訪問 Key;示例:AKIATXDGSSAACXEXAMPLE
  • spark.hadoop.fs.s3a.secret.key: 指定 Object Store 的訪問 Secret;示例:wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY。

除了上面列出的常用配置參數(shù)外,還有許多其他的配置參數(shù),
以下是 Spark 官方文檔的鏈接:

  • Spark 配置指南
  • Spark SQL 配置指南
  • Spark Streaming 配置指南

在這些文檔中,可以找到 Spark 所有模塊的配置參數(shù),包括 Spark Core、Spark SQL、Spark Streaming、機器學習庫 MLlib 等。同時,這些文檔還提供了每個配置參數(shù)的用途、默認值和可用值范圍等信息。

代碼編寫

查看scala 版本和 spark 版本

登陸 Spark Master 服務器

# 找到執(zhí)行 spark-shell 的目錄
cd /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/bin
# 執(zhí)行該命令
./spark-shell

觀察執(zhí)行結(jié)果
大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算
Spark version: 2.4.0-cdh6.2.0
Scala version: 2.11.12

pom.xml

已知 Scala 版本,和 spark 版本,所以針對性的添加依賴文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>cdh-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>cdh-demo</name>
    <description>cdh-demo</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.6.13</spring-boot.version>
        <cdh.version>2.4.0-cdh6.2.0</cdh.version>
        <scala.version>2.11.12</scala.version>
        <hadoop.version>3.0.0-cdh6.2.0</hadoop.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- scala 依賴 開始 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
            <version>3.0.8</version>
        </dependency>
        <!-- scala 依賴 結(jié)束-->

        <!-- spark 依賴 開始-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${cdh.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-reload4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${cdh.version}</version>
        </dependency>
        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-xml_2.12</artifactId>
            <version>0.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${cdh.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${cdh.version}</version>
        </dependency>
        <!-- spark 依賴 結(jié)束-->

        <dependency>
            <groupId>org.glassfish.jersey.inject</groupId>
            <artifactId>jersey-hk2</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-reload4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-reload4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-reload4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-reload4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.example.cdh.CdhDemoApplication</mainClass>
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>cloudera.repo</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>spring</id>
            <url>https://maven.aliyun.com/repository/central</url>
        </repository>
        <repository>
            <id>jcenter</id>
            <url>https://maven.aliyun.com/repository/jcenter</url>
        </repository>
        <repository>
            <id>public</id>
            <url>https://maven.aliyun.com/repository/public</url>
        </repository>
    </repositories>
</project>

yml

spark:
  jars:
    # 當前 JAR 的名字,支持相對路徑,如果使用匿名內(nèi)部類,會生成$1的class,不添加jar,會出現(xiàn)ClassNotFoundException
    - target/cdh-demo-0.0.1-SNAPSHOT.jar
  app-name: cdh-demo
  master-url: spark://cdh-slave-1:7077
  driver:
    memory: 1g
    host: 10.8.0.5
    # JDBC 驅(qū)動地址,手動上傳到 hdfs 的
    extraClassPath: hdfs://cdh-slave-1:8020/jars/mysql-connector-java-5.1.47.jar
  worker:
    memory: 1g
  executor:
    memory: 1g
  rpc:
    message:
      maxSize: 1024

logging:
  level:
    org:
      apache:
        spark:
          storage: WARN
          deploy:
            client: WARN
          scheduler:
            cluster: WARN
hadoop:
  url: hdfs://cdh-slave-1:8020
  replication: 3
  blockSize: 2097152
  user: root

SparkAutoConfiguration

package com.example.cdh.configuration;

import com.example.cdh.properties.spark.SparkProperties;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.AbstractEnvironment;
import org.springframework.core.env.Environment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.MutablePropertySources;
import org.springframework.core.env.PropertySource;

/**
 * @author lcy
 */
@Configuration
public class SparkAutoConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(SparkAutoConfiguration.class);
    @Autowired
    private SparkProperties sparkProperties;
    @Autowired
    private Environment env;
    /**
     * spark 的基本配置
     *
     * @return 把 yml 里配置的內(nèi)容都寫入該配置項
     */
    @Bean
    public SparkConf sparkConf() {
        List<String> jars = sparkProperties.getJars();
        String[] sparkJars = jars.toArray(new String[0]);
        SparkConf conf = new SparkConf()
            .setAppName(sparkProperties.getAppName())
            .setMaster(sparkProperties.getMasterUrL())
            .setJars(sparkJars);
        AbstractEnvironment abstractEnvironment = ((AbstractEnvironment) env);

        MutablePropertySources sources = abstractEnvironment.getPropertySources();
        for (PropertySource<?> source : sources) {
            if (source instanceof MapPropertySource) {
                Map<String, Object> propertyMap = ((MapPropertySource) source).getSource();
                for (Map.Entry<String, Object> entry : propertyMap.entrySet()) {
                    String key = entry.getKey();
                    if (key.startsWith("spark.")) {
                        if ("spark.jars".equals(key)){
                            continue;
                        }
                        String value = env.getProperty(key);
                        conf.set(key,value);
                        logger.info("已識別 spark 配置屬性,{}:{}",key,value);
                    }
                }
            }
        }
     //   也可以通過此方式設(shè)置 (方式二)   二選一即可
     //   conf.set("spark.driver.extraClassPath","hdfs://cdh-slave-1:8020/jars/mysql-connector-java-5.1.47.jar");
     //   也可以通過此方式設(shè)置 (方式三)    二選一即可 
     //   conf.set("spark.executor.extraClassPath","hdfs://cdh-slave-1:8020/jars/mysql-connector-java-5.1.47.jar");
        return conf;
    }

    /**
     * 連接 spark 集群
     *
     * @param sparkConf
     * @return
     */
    @Bean
    @ConditionalOnMissingBean(JavaSparkContext.class)
    public JavaSparkContext javaSparkContext(SparkConf sparkConf) {
        return new JavaSparkContext(sparkConf);
    }

    /**
     *
     * @param javaSparkContext
     * @return
     */
    @Bean
    public SparkSession sparkSession(JavaSparkContext javaSparkContext) {
        return SparkSession
            .builder()
            .sparkContext(javaSparkContext.sc())
            .appName(sparkProperties.getAppName())
            .getOrCreate();
    }


}

踩坑記錄

ClassNotFoundException:xxxxxx$1

異常信息截圖

大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算

產(chǎn)生的原因分析

Spark 在執(zhí)行過程中,會將jar 進行網(wǎng)絡傳輸,但是代碼中包含了匿名內(nèi)部類,因此產(chǎn)生了$1這種后綴的class 文件

示例

大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算

大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算

解決方案

在裝配時候,通過 setJars方法,添加當前的jar包作為傳輸對象,該路徑可以為相對路徑,或者 hdfs 路徑
大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算
大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算

示例代碼目標

使用 spark sql 進行簡單的查詢示例

  • 簡單的條件查詢
  • 稍微復雜一些的聚合查詢
  • 每行數(shù)據(jù)對象,轉(zhuǎn)換為自定義Class對象
  • 目標數(shù)據(jù)存儲到MySQL數(shù)據(jù)庫中
  • 符合斷言判斷
package com.example.cdh.service;

import com.example.cdh.dto.UserDTO;
import java.io.Serializable;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import static org.apache.spark.sql.functions.column;
import static org.apache.spark.sql.functions.count;

/**
 * 使用 spark sql 離線計算
 *
 * @author chunyang.leng
 * @date 2023-04-12 14:53
 */
@Component
public class SparkOfflineService implements Serializable {
    private static final Logger logger = LoggerFactory.getLogger(SparkOfflineService.class);
    private static final long serialVersionUID = 1L;

    @Autowired
    private SparkSession sparkSession;

    /**
     * 統(tǒng)計 hdfs 中一個csv文件的行數(shù)
     *
     * @param hdfsPath demo: hdfs://cdh-slave-1:8020/demo/csv/input.csv
     * @return
     */
    public long countHdfsCsv(String hdfsPath) {
        return sparkSession.read().csv(hdfsPath).count();
    }

    /**
     * 小于等于 計算示例
     * <br/>
     * <pre>
     * {@code  select name, age from xx where age <=#{age} }
     * </pre>
     * @param hdfsPath 要計算的文件
     * @param age 閾值
     * @return 算出來的數(shù)據(jù)總量
     */
    public long lte(String hdfsPath, int age) {
        // 臨時表名稱
        String tempTableName = "cdh_demo_lte";
        // 加載 csv 數(shù)據(jù)
        Dataset<UserDTO> data = loadCsv(hdfsPath);

        // 創(chuàng)建臨時表
        data.createOrReplaceTempView(tempTableName);
        // 執(zhí)行 sql 語句
        Dataset<Row> sqlData = sparkSession
            .sql("select name,age from " + tempTableName + " where age <= " + age);

        // 存儲數(shù)據(jù)
        saveToMySQL(tempTableName, sqlData);
        return sqlData.count();
    }

    /**
     * 簡單的聚合查詢示例
     * <br/>
     * <pre>
     * {@code
     * select
     *      count(name) as c,
     *      age
     * from
     *      xx
     * group by age
     *
     * having c > #{count}
     *
     * order by c desc
     * }
     * </pre>
     * @param hdfsPath 要統(tǒng)計的文件
     * @param count having > #{count}
     * @return
     */
    public long agg(String hdfsPath, int count){
        // 臨時表名稱
        String tempTableName = "cdh_demo_agg";
        // 加載 csv 數(shù)據(jù)
        Dataset<UserDTO> data = loadCsv(hdfsPath);

        // 創(chuàng)建臨時表
        data.createOrReplaceTempView(tempTableName);
        // 執(zhí)行 sql 語句
        Dataset<Row> sqlData = sparkSession
            .sql("select name,age from " + tempTableName)
            .groupBy(column("age").alias("age"))
            .agg(count("name").alias("c"))
            // filter = having
            .filter(column("c").gt(count))
            // 按照統(tǒng)計出來的數(shù)量,降序排序
            .orderBy(column("c").desc());

        saveToMySQL(tempTableName, sqlData);
        return sqlData.count();
    }

    /**
     * 加載 hdfs 中 csv 文件內(nèi)容
     * @param hdfsPath
     * @return
     */
    private Dataset<UserDTO> loadCsv(String hdfsPath) {

        // 自定義數(shù)據(jù)類型,也可以使用數(shù)據(jù)類型自動推斷
        StructField nameField = DataTypes.createStructField("name", DataTypes.StringType, true);
        StructField ageField = DataTypes.createStructField("age", DataTypes.IntegerType, true);

        StructField[] fields = new StructField[2];
        fields[0] = nameField;
        fields[1] = ageField;
        StructType schema = new StructType(fields);

        return sparkSession
                .read()
                .schema(schema)
                .csv(hdfsPath)
                .map(new MapFunction<Row, UserDTO>() {
                    @Override
                    public UserDTO call(Row row) throws Exception {	
                        UserDTO dto = new UserDTO();
                        // 防止 npe 
                        if (!row.isNullAt(0)){
                            dto.setName(row.getString(0));
                        }
                        // 防止 npe 
                        if (!row.isNullAt(1)) {
                            dto.setAge(row.getInt(1));
                        }
                        return dto;
                    }
                }, Encoders.bean(UserDTO.class));
    }

    /**
     * 數(shù)據(jù)存儲到 mysql
     * @param tableName 表名字
     * @param dataset 數(shù)據(jù)
     */
    private void saveToMySQL(String tableName,Dataset<Row> dataset){
        dataset
            .write()
            // 覆蓋模式,原始數(shù)據(jù)會被覆蓋掉,如果需要追加,換成 SaveMode.Append
            .mode(SaveMode.Overwrite)
            .format("jdbc")
            .option("url", "jdbc:mysql://10.8.0.4/test")
            .option("driver", "com.mysql.jdbc.Driver")
            .option("dbtable", tableName)
            .option("user", "root")
            .option("password", "q")
            .save();
    }
}

測試用例代碼

package com.example.cdh.spark;

import com.example.cdh.service.HdfsService;
import com.example.cdh.service.SparkOfflineService;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.Assert;

/**
 * @author chunyang.leng
 * @date 2023-04-12 14:55
 */
@SpringBootTest
public class SparkOfflineTest {
    private static final Logger logger = LoggerFactory.getLogger(SparkOfflineTest.class);
    @Autowired
    private SparkOfflineService sparkOfflineService;
    @Autowired
    private HdfsService hdfsService;
    String path = "hdfs://cdh-slave-1:8020/demo/csv/input.csv";


    @Test
    public void countHdfsCsvTest() throws IOException {
        initHdfsData();
        long count = sparkOfflineService.countHdfsCsv(path);
        cleanup();
        Assert.isTrue(count == 6,"查詢的結(jié)果應該為6");
        logger.info("統(tǒng)計測試執(zhí)行完畢");
    }

    @Test
    public void lteTest() throws  IOException {
        initHdfsData();
        long count = sparkOfflineService.lte(path, 19);
        cleanup();
        Assert.isTrue(count == 3,"查詢的結(jié)果應該為 3");
        logger.info("簡單條件測試執(zhí)行完畢");

    }

    @Test
    public void aggTest() throws IOException {
        initHdfsData();
        long count = sparkOfflineService.agg(path, 1);
        cleanup();
        Assert.isTrue(count == 1,"查詢的結(jié)果應該為 1");
        logger.info("聚合測試執(zhí)行完畢");
    }


    private void initHdfsData() throws IOException {
        String data =
            "name,age\n"+
            "n0,17\n" +
            "n1,18\n" +
            "n2,19\n" +
            "n3,20\n" +
            "n4,20\n";
        hdfsService.delete(path,true);
        hdfsService.uploadFile(data.getBytes(StandardCharsets.UTF_8),path,true);
    }

    private void cleanup() throws IOException {
        hdfsService.delete(path,true);
    }
}

測試結(jié)果

單元測試通過

大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算

lte臨時表數(shù)據(jù)驗證通過

大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算

agg 臨時表數(shù)據(jù)驗證通過

大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算文章來源地址http://www.zghlxwxcb.cn/news/detail-472469.html

到了這里,關(guān)于大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務器費用

相關(guān)文章

  • CDH6.3.2集成Kerberos

    CDH6.3.2集成Kerberos

    CDH enable kerberos: Kerberos Security Artifacts Overview | 6.3.x | Cloudera Documentation CDH disable kerberos:https://www.sameerahmad.net/blog/disable-kerberos-on-CDH; https://community.cloudera.com/t5/Support-Questions/Disabling-Kerberos/td-p/19654 進入Cloudera Manager的**“管理”-“安全”**界面 1)選擇“啟用Kerberos”,進入如下界面

    2024年02月10日
    瀏覽(29)
  • CDH整合Flink(CDH6.3.0+Flink1.12.1)

    CDH整合Flink(CDH6.3.0+Flink1.12.1)

    下載 準備FLINK1.12.1包 準備paecel環(huán)境 修改配置文件 執(zhí)行這部分操作需要稍等一會,打包結(jié)束后執(zhí)行另外一個操作 生成這倆包為:FLINK-1.12.1-BIN-SCALA_2.12.tar FLINK_ON_YARN-1.12.1.jar 由于Flink1.12版本編譯后確實沒有flink-shaded-hadoop-2-uber 3.0.0-cdh6.3.0-10.0文件,但是flink-shaded-10.0也適配flink

    2024年01月23日
    瀏覽(24)
  • CDH6.3.2-組件安裝&安全認證

    CDH6.3.2-組件安裝&安全認證

    1.選擇自定義。 2.選擇HDFS ZK YARN然后點繼續(xù)。? ? 3.選擇安裝的主機。 4.審核更改默認就行,點繼續(xù)。? 5.配置HDFS的HA。 ? ?安裝好以后點擊hdfs進入實例就能夠看到啟動了高可用。 6.啟動YARN的高可用。 ? ? ? ? 更具需求修改資源? ? 一直點繼續(xù)就行了? ? ? ? ? ? ? ? ?在/

    2024年02月16日
    瀏覽(21)
  • CDH6.3.2企業(yè)級安裝實戰(zhàn)

    1、環(huán)境介紹 IP 操作系統(tǒng) 聯(lián)網(wǎng) 10.191.15.15 Centos 7.4 離網(wǎng) 10.191.15.16 Centos 7.4 離網(wǎng) 10.191.15.17 Centos 7.4 離網(wǎng) 10.191.15.18 Centos 7.4 離網(wǎng) 2、搭建本地Yum源 2.1 配置本地基礎(chǔ)Yum源 1、上傳鏡像到服務器 下載的Centos鏡像為 CentOS-7-x86_64-Everything-1708.iso , 放置目錄為: /root/download

    2024年01月18日
    瀏覽(35)
  • CDH6.3.2搭建HIVE ON TEZ

    CDH6.3.2搭建HIVE ON TEZ

    參考 https://blog.csdn.net/ly8951677/article/details/124152987 ----配置hive運行引擎 在/etc/hive/conf/hive-site.xml中修改如下: hive.execution.engine mr–tez 或者運行代碼的時候: 如果內(nèi)存不夠:可以修改如下參數(shù)設(shè)置 在配置文件設(shè)置后,如果集群重啟會把配置的恢復,需要再CDH界面配置:

    2024年02月13日
    瀏覽(31)
  • CDH6.3.2 集成 Flink 1.17.0 失敗過程

    CDH6.3.2 集成 Flink 1.17.0 失敗過程

    目錄 一:下載Flink,并制作parcel包 1.相關(guān)資源下載 2. 修改配置 準備工作一: 準備工作二: 3. 開始build 二:開始在CDH頁面分發(fā)激活 ?三:CDH添加Flink-yarn 服務 ?四:啟動不起來的問題解決 五:CDH6.3.2集群集成zookeeper3.6.3 六:重新適配Flink服務 環(huán)境說明: cdh版本:cdh6.3.2 組件版本信

    2024年01月17日
    瀏覽(27)
  • Spring Boot入門(14):使用Mybatis-Plus輕松實現(xiàn)高效自定義SQL操作!

    ? ? ? ? 在上幾期,我們既講了如何整合Mybatis-Plus進行數(shù)據(jù)庫的增刪改查,也講解了如何使用MP的 Wrapper 構(gòu)造器,但若是遇到復雜業(yè)務邏輯,如多表聯(lián)查、動態(tài)拼接條件等,這些操作往往會讓代碼變得冗長且難以維護。但是,有了Mybatis-Plus這個優(yōu)秀的框架,我們可以輕松實現(xiàn)

    2024年02月10日
    瀏覽(25)
  • 基于數(shù)據(jù)湖的流批一體:flink1.15.3與Hudi0.12.1集成,并配置基于CDH6.3.2的hive catalog

    基于數(shù)據(jù)湖的流批一體:flink1.15.3與Hudi0.12.1集成,并配置基于CDH6.3.2的hive catalog

    前言:為實現(xiàn)基于數(shù)據(jù)湖的流批一體,采用業(yè)內(nèi)主流技術(shù)棧hudi、flink、CDH(hive、spark)。flink使用sql client與hive的catalog打通,可以與hive共享元數(shù)據(jù),使用sql client可操作hive中的表,實現(xiàn)批流一體;flink與hudi集成可以實現(xiàn)數(shù)據(jù)實時入湖;hudi與hive集成可以實現(xiàn)湖倉一體,用flink實

    2024年02月12日
    瀏覽(26)
  • Spring Boot 中的 Redis 數(shù)據(jù)操作配置和使用

    Spring Boot 中的 Redis 數(shù)據(jù)操作配置和使用

    Redis(Remote Dictionary Server)是一種高性能的開源內(nèi)存數(shù)據(jù)庫,用于緩存、消息隊列、會話管理和數(shù)據(jù)存儲。在Spring Boot應用程序中,Redis被廣泛用于各種用例,包括緩存、持久性存儲和分布式鎖。本文將探討如何在Spring Boot中配置和使用Redis,包括數(shù)據(jù)操作和常見用例。 要在S

    2024年02月07日
    瀏覽(29)
  • Spring Boot入門(09):如何使用MyBatis的XML配置方式實現(xiàn)MySQL的增刪改查操作?

    Spring Boot入門(09):如何使用MyBatis的XML配置方式實現(xiàn)MySQL的增刪改查操作?

    ????????想要快速高效地開發(fā)Java Web應用程序,選擇使用Spring Boot和MyBatis無疑是明智之舉。本篇文章將教你使用MyBatis的XML配置方式,結(jié)合MySQL數(shù)據(jù)庫,實現(xiàn)常見的增刪改查操作,讓你的應用程序更加實用和強大。跟隨本文一起來探索MyBatis在Spring Boot中的力量吧! ? ? ? ?

    2024年02月11日
    瀏覽(33)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包