前言
CDH 6.2.0 搭建的環(huán)境,并不能直接使用 spark 相關(guān)資源,需要對此服務端環(huán)境進行一些修改
Spark 目前僅支持 JDK1.8, Java項目運行環(huán)境只能使用JDK 1.8
我這里使用的是 CDH6.2.0集群,因此使用的依賴為CDH專用依賴,需要先添加倉庫
spark 使用scala 語言編寫,因此項目中使用的scala依賴版本要和cdh中的 scala 版本一致
因為需要將計算結(jié)果寫入到MySQL,所以當前項目中需要加入MySQL-JDBC驅(qū)動程序
Spark 在運行過程中,會將JAR上傳到節(jié)點,進行網(wǎng)絡傳輸,因此,Spark計算類,必須實現(xiàn)序列化接口 java.io.Serializable,同時設(shè)置序列化id( private static final long serialVersionUID = 1L;),如果不知道怎么設(shè)置,那就默認值1L,每次更新代碼,切記 maven clean package,缺一不可
Spark 在進行RDD計算的時候,可能會在集群中的任一節(jié)點上,因此每個節(jié)點也需要有 MySQL的JDBC驅(qū)動程序,否則無法創(chuàng)建數(shù)據(jù)庫表,我這里用了偷懶的方式,將JAR上傳到HDFS,通過配置文件進行加載啟動
代碼庫地址:https://github.com/lcy19930619/cdh-demo
環(huán)境處理
步驟一:添加 spark 基礎(chǔ)環(huán)境
步驟二,處理對應的 master 和 slave 節(jié)點
修改基礎(chǔ)環(huán)境配置文件
文件:/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/conf/spark-env.sh
在文件上方添加以下內(nèi)容
export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera # jdk 路徑
SPARK_LOCAL_IP=10.8.0.6 # 此ip為我的遠程訪問ip地址,spark 默認只處理鏈接此ip的數(shù)據(jù)
SPARK_MASTER_HOST=10.8.0.6 # master 節(jié)點ip地址
修改端口
文件:/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/conf/spark-defaults.conf
修改內(nèi)容:
將 7337 端口修改為 7447
spark.shuffle.service.port=7447
分別啟動節(jié)點
文件路徑:/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/sbin
啟動 master 執(zhí)行:
./start-master.sh
啟動 slave 執(zhí)行:
./start-slaves.sh # 注意,這個腳本是有 s 的,還有一個是start-slave.sh,別啟動錯了
確認正常啟動
了解 Spark
- Apache Spark 是專為大規(guī)模數(shù)據(jù)處理而設(shè)計的快速通用的計算引擎。Spark是UC Berkeley AMP lab (加州大學伯克利分校的AMP實驗室) 所開源的
類Hadoop MapReduce的通用并行框架
- 擁有Hadoop MapReduce所具有的優(yōu)點;但不同于MapReduce的是——
Job中間輸出結(jié)果可以保存在內(nèi)存中
,從而不再需要讀寫HDFS,因此Spark能更好地適用于數(shù)據(jù)挖掘與機器學習等需要迭代的MapReduce的算法。 - Spark 比 MapReduce 快,MapReduce只能進行離線運算,并且需要完全依靠HDFS,數(shù)據(jù)需要從磁盤加載,然后才能進行計算,因此MapReduce速度較慢,但Spark可以將計算結(jié)果存儲到內(nèi)存中,也可以進行流式計算,因此速度比MapReduce 快
- Spark 提供了大量的庫,包括Spark Core、Spark SQL、Spark Streaming、MLlib、GraphX。 開發(fā)者可以在同一個應用程序中無縫組合使用這些庫。
- Spark 支持 Hadoop YARN,Apache Mesos,及其自帶的獨立集群管理器
組件
-
SparkCore:相當于MapReduce,是spark的核心引擎。
-
SparkSQL:是一個用于處理結(jié)構(gòu)化數(shù)據(jù)的Spark組件,主要用于結(jié)構(gòu)化數(shù)據(jù)處理和對數(shù)據(jù)執(zhí)行類SQL查詢??梢葬槍Σ煌瑪?shù)據(jù)格式(如:JSON,Parquet, ORC等)和數(shù)據(jù)源執(zhí)行ETL操作(如:HDFS、數(shù)據(jù)庫等),完成特定的查詢操作。
-
SparkStreaming:微批處理的流處理引擎,將流數(shù)據(jù)分片以后用SparkCore的計算引擎中進行處理,可以進行實時運算。
-
Mllib和GraphX主要一些機器學習和圖計算的算法庫。
-
SparkR是一個R語言包,它提供了輕量級的方式使得可以在R語言中使用 Spark。
Spark數(shù)據(jù)結(jié)構(gòu)
在Spark中,數(shù)據(jù)以RDD或者DataFrame的格式儲存。
RDD
RDD 是 Spark 中最重要的概念之一,全稱為 Resilient Distributed Dataset,即彈性分布式數(shù)據(jù)集。它是一種容錯、可并行計算的數(shù)據(jù)類型,可以跨多個節(jié)點進行分布式計算。RDD 是 Spark 提供的核心分布式數(shù)據(jù)結(jié)構(gòu),可以通過一系列的轉(zhuǎn)換和動作(operation)進行處理,從而實現(xiàn)大規(guī)模數(shù)據(jù)處理。
在 Spark 中,RDD 表示一個不可變、可分區(qū)、支持并行操作的數(shù)據(jù)集合,每個 RDD 可以被分為多個分區(qū),這些分區(qū)可以被放置在不同的節(jié)點上,使得計算可以在節(jié)點間并行進行。用戶可以通過一系列的操作來構(gòu)建和轉(zhuǎn)換 RDD。
RDD 的特點如下:
-
以分布式方式存儲在多個節(jié)點上,通過網(wǎng)絡進行傳輸,可以實現(xiàn)高效的數(shù)據(jù)計算和處理。
-
支持多種數(shù)據(jù)來源,例如 HDFS, HBase, Cassandra 等大規(guī)模數(shù)據(jù)存儲系統(tǒng)。
-
可以容錯并快速恢復,當某個節(jié)點失敗或數(shù)據(jù)損壞時,RDD 可以快速恢復原始數(shù)據(jù)。
-
支持多種操作,例如轉(zhuǎn)換(transformation)和動作(action),可以在 RDD 上進行各種復雜的數(shù)據(jù)處理、過濾、排序等操作。
DataFrame
DataFrame 是 Spark SQL 中內(nèi)置的、分布式的數(shù)據(jù)處理結(jié)構(gòu)。它可以看做是基于 RDD 的分布式數(shù)據(jù)集合,但相對于 RDD,DataFrame 提供了更高層次的抽象,使得數(shù)據(jù)處理更加方便、高效。DataFrame 可以將不同數(shù)據(jù)源中的數(shù)據(jù)統(tǒng)一表示為一個分布式的表格,提供了一套 SQL 的查詢語言,支持豐富的數(shù)據(jù)轉(zhuǎn)換以及數(shù)據(jù)分析處理功能。
與 RDD 不同的是,DataFrame 中的數(shù)據(jù)結(jié)構(gòu)是由一組命名的列組成的,支持多種數(shù)據(jù)類型,并且可以自動推斷數(shù)據(jù)模式(schema)。而且 DataFrame 中的數(shù)據(jù)都是以列存儲的,因此可以更加高效地進行數(shù)據(jù)壓縮和編碼,提高數(shù)據(jù)處理的速度和效率。除此之外,DataFrame 還提供了很多類似于 SQL 的數(shù)據(jù)操作方法,例如 select, filter, groupBy, orderBy 等等。
使用 DataFrame 可以更加方便地進行數(shù)據(jù)處理工作,將常用的大部分復雜計算交給 Spark SQL 來處理,而不需要過多地自己實現(xiàn)。
例如,在 Spark SQL 中可以讀取各種結(jié)構(gòu)化數(shù)據(jù)(如 JSON, CSV, parquet 等等),然后使用 DataFrame API 進行數(shù)據(jù)查詢、篩選、聚合甚至機器學習算法的處理。在某些情況下,DataFrame 甚至可以代替編寫 MapReduce 作業(yè)來處理數(shù)據(jù)。
Dataset
在 Spark 中,Dataset 是一種強類型的、可分布式處理的數(shù)據(jù)集合,可以運用 Spark 的函數(shù)式編程方式,提供了更加方便、穩(wěn)定的 API 接口,支持如 SQL 語法風格的查詢,也可以與原生 Scala、Java 等語言的 API 緊密結(jié)合,支持對各種數(shù)據(jù)源的讀取和寫入等操作。
Dataset 實現(xiàn)了兩個經(jīng)典的 Spark 數(shù)據(jù)結(jié)構(gòu):RDD 和 DataFrame。與 RDD 相比,Dataset 提供了更加高級的類型約束和更好的性能優(yōu)化,可以在編譯期間捕獲類型錯誤,并且能夠利用 Catalyst(Spark 的高性能查詢優(yōu)化器)對查詢語句進行優(yōu)化。
與 DataFrame 相比,Dataset 不僅支持強類型編程,還支持面向?qū)ο缶幊獭?梢酝ㄟ^編寫強類型類來指定數(shù)據(jù)結(jié)構(gòu),支持使用標準 Scala、Java 類型檢查器進行檢查,避免了在運行時出現(xiàn)類型不匹配的錯誤。
一次Spark Job的運行過程簡述
-
配置與初始化。在這個階段中,Spark 根據(jù)用戶設(shè)定的配置信息,對集群進行初始化,并加載用戶指定的代碼和依賴項。這個階段還包括 Spark 上下文的創(chuàng)建和運行環(huán)境的初始化等操作。
-
轉(zhuǎn)換與優(yōu)化。在這個階段中,Spark 根據(jù)用戶設(shè)定的代碼和數(shù)據(jù)輸入,進行一系列的轉(zhuǎn)換和優(yōu)化操作,包括分區(qū)、排序、過濾、聚合等操作。Spark 會根據(jù) DAG (Directed Acyclic Graph) 的形式將轉(zhuǎn)換操作組織起來,并進行邏輯優(yōu)化和物理優(yōu)化。
-
計算與執(zhí)行。在這個階段中,Spark 根據(jù) DAG 的構(gòu)建結(jié)果,將代碼和數(shù)據(jù)輸入根據(jù) DAG 拆分為多個階段,并按照計算依賴關(guān)系進行并行計算和執(zhí)行。Spark 會在集群中的多個節(jié)點上運行任務,從而實現(xiàn)高效的數(shù)據(jù)并行處理。
-
結(jié)果輸出和保存。在這個階段中,Spark 將計算結(jié)果進行輸出和保存,可以將結(jié)果保存到內(nèi)存、磁盤或是外部存儲系統(tǒng)中(如 HDFS, S3, HBase 等)??梢酝ㄟ^ API 代碼或 Spark SQL 等途徑直接與結(jié)果進行交互和查詢。
運行角色
在 Spark 集群中,有以下幾個運行角色:
-
Driver:Driver 是整個 Spark 應用程序的主類,通過調(diào)用 SparkContext 來創(chuàng)建 RDD 并且定義數(shù)據(jù)處理流程。Driver 維護著集群任務的整體狀態(tài)、資源分配和任務調(diào)度等職責,是整個應用的控制節(jié)點。
-
Executor:Executor 是 Spark 中真正執(zhí)行任務的角色,每個應用程序啟動時,Spark 會為每個節(jié)點分配一個或多個 Executor,Executor 會在該節(jié)點上負責執(zhí)行分配給它的任務,包括數(shù)據(jù)的計算和轉(zhuǎn)換、計算結(jié)果的緩存和存儲、以及任務的監(jiān)控和重試等職責。
-
Cluster Manager:Cluster Manager 是 Spark 集群的管理組件,負責分配和管理集群的資源、監(jiān)控集群的狀態(tài)和健康狀況、處理節(jié)點的故障和重啟等職責。Spark 支持多種 Cluster Manager,包括 Standalone、Mesos、YARN、Kubernetes 等。
-
Worker:Worker 是 Spark 集群中的節(jié)點,可以是物理機、虛擬機或 Docker 容器等,它們負責提供計算和存儲資源、啟動和運行 Executor、定期向 Cluster Manager 匯報節(jié)點狀態(tài)等職責。
-
Client:Client 是 Spark 應用程序的啟動者,負責啟動 Driver 進程,向 Cluster Manager 請求計算資源、提交應用程序代碼等職責。通常來說,Client 與 Driver 運行在同一臺機器上。
常用的配置參數(shù)
SparkConf
是 Spark 配置的核心類,你可以使用 SparkConf
配置類來設(shè)置 Spark 應用程序的各種參數(shù)。下面是一些常見的 SparkConf 配置參數(shù)及其說明:
-
spark.master
: 指定 Spark 應用程序運行在哪個集群(Standalone、Mesos 或 YARN)的哪個節(jié)點上,以及運行模式(local、client 或 cluster);示例:spark://master:7077
(集群模式)或local[*]
(本地模式)。 -
spark.app.name
: 指定應用程序的名稱,方便在 Spark Web UI 和日志中定位;示例:MyApp
。 -
spark.driver.memory
: 指定 Driver 程序占用的內(nèi)存大小,包括 JVM Heap 和其他內(nèi)存(如 I/O 緩存);示例:2g
。 -
spark.executor.memory
: 指定 Executor 程序占用的內(nèi)存大小,包括 JVM Heap 和其他內(nèi)存(如 I/O 緩存);示例:4g
。 -
spark.executor.instances
: 指定 Spark 應用程序啟動的 Executor 數(shù)量;示例:10
。 -
spark.executor.cores
: 指定每個 Executor 占用的 CPU 核數(shù);示例:4
。 -
spark.default.parallelism
: 指定默認的并行度,通常和數(shù)據(jù)分區(qū)數(shù)保持一致;示例:100
。 -
spark.sql.shuffle.partitions
: 指定 Spark SQL Shuffle 操作的默認并行度,通常和數(shù)據(jù)分區(qū)數(shù)保持一致;示例:100
。 -
spark.hadoop.fs.s3a.endpoint
: 指定 Object Store 的訪問地址;示例:s3.amazonaws.com
。 -
spark.hadoop.fs.s3a.access.key
: 指定 Object Store 的訪問 Key;示例:AKIATXDGSSAACXEXAMPLE
。 -
spark.hadoop.fs.s3a.secret.key
: 指定 Object Store 的訪問 Secret;示例:wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY
。
除了上面列出的常用配置參數(shù)外,還有許多其他的配置參數(shù),
以下是 Spark 官方文檔的鏈接:
- Spark 配置指南
- Spark SQL 配置指南
- Spark Streaming 配置指南
在這些文檔中,可以找到 Spark 所有模塊的配置參數(shù),包括 Spark Core、Spark SQL、Spark Streaming、機器學習庫 MLlib 等。同時,這些文檔還提供了每個配置參數(shù)的用途、默認值和可用值范圍等信息。
代碼編寫
查看scala 版本和 spark 版本
登陸 Spark Master 服務器
# 找到執(zhí)行 spark-shell 的目錄
cd /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/bin
# 執(zhí)行該命令
./spark-shell
觀察執(zhí)行結(jié)果
Spark version: 2.4.0-cdh6.2.0
Scala version: 2.11.12
pom.xml
已知 Scala 版本,和 spark 版本,所以針對性的添加依賴文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>cdh-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>cdh-demo</name>
<description>cdh-demo</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.6.13</spring-boot.version>
<cdh.version>2.4.0-cdh6.2.0</cdh.version>
<scala.version>2.11.12</scala.version>
<hadoop.version>3.0.0-cdh6.2.0</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- scala 依賴 開始 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.8</version>
</dependency>
<!-- scala 依賴 結(jié)束-->
<!-- spark 依賴 開始-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${cdh.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-reload4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${cdh.version}</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-xml_2.12</artifactId>
<version>0.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${cdh.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${cdh.version}</version>
</dependency>
<!-- spark 依賴 結(jié)束-->
<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-reload4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-reload4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-reload4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-reload4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<configuration>
<mainClass>com.example.cdh.CdhDemoApplication</mainClass>
<skip>true</skip>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>cloudera.repo</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>spring</id>
<url>https://maven.aliyun.com/repository/central</url>
</repository>
<repository>
<id>jcenter</id>
<url>https://maven.aliyun.com/repository/jcenter</url>
</repository>
<repository>
<id>public</id>
<url>https://maven.aliyun.com/repository/public</url>
</repository>
</repositories>
</project>
yml
spark:
jars:
# 當前 JAR 的名字,支持相對路徑,如果使用匿名內(nèi)部類,會生成$1的class,不添加jar,會出現(xiàn)ClassNotFoundException
- target/cdh-demo-0.0.1-SNAPSHOT.jar
app-name: cdh-demo
master-url: spark://cdh-slave-1:7077
driver:
memory: 1g
host: 10.8.0.5
# JDBC 驅(qū)動地址,手動上傳到 hdfs 的
extraClassPath: hdfs://cdh-slave-1:8020/jars/mysql-connector-java-5.1.47.jar
worker:
memory: 1g
executor:
memory: 1g
rpc:
message:
maxSize: 1024
logging:
level:
org:
apache:
spark:
storage: WARN
deploy:
client: WARN
scheduler:
cluster: WARN
hadoop:
url: hdfs://cdh-slave-1:8020
replication: 3
blockSize: 2097152
user: root
SparkAutoConfiguration
package com.example.cdh.configuration;
import com.example.cdh.properties.spark.SparkProperties;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.AbstractEnvironment;
import org.springframework.core.env.Environment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.MutablePropertySources;
import org.springframework.core.env.PropertySource;
/**
* @author lcy
*/
@Configuration
public class SparkAutoConfiguration {
private static final Logger logger = LoggerFactory.getLogger(SparkAutoConfiguration.class);
@Autowired
private SparkProperties sparkProperties;
@Autowired
private Environment env;
/**
* spark 的基本配置
*
* @return 把 yml 里配置的內(nèi)容都寫入該配置項
*/
@Bean
public SparkConf sparkConf() {
List<String> jars = sparkProperties.getJars();
String[] sparkJars = jars.toArray(new String[0]);
SparkConf conf = new SparkConf()
.setAppName(sparkProperties.getAppName())
.setMaster(sparkProperties.getMasterUrL())
.setJars(sparkJars);
AbstractEnvironment abstractEnvironment = ((AbstractEnvironment) env);
MutablePropertySources sources = abstractEnvironment.getPropertySources();
for (PropertySource<?> source : sources) {
if (source instanceof MapPropertySource) {
Map<String, Object> propertyMap = ((MapPropertySource) source).getSource();
for (Map.Entry<String, Object> entry : propertyMap.entrySet()) {
String key = entry.getKey();
if (key.startsWith("spark.")) {
if ("spark.jars".equals(key)){
continue;
}
String value = env.getProperty(key);
conf.set(key,value);
logger.info("已識別 spark 配置屬性,{}:{}",key,value);
}
}
}
}
// 也可以通過此方式設(shè)置 (方式二) 二選一即可
// conf.set("spark.driver.extraClassPath","hdfs://cdh-slave-1:8020/jars/mysql-connector-java-5.1.47.jar");
// 也可以通過此方式設(shè)置 (方式三) 二選一即可
// conf.set("spark.executor.extraClassPath","hdfs://cdh-slave-1:8020/jars/mysql-connector-java-5.1.47.jar");
return conf;
}
/**
* 連接 spark 集群
*
* @param sparkConf
* @return
*/
@Bean
@ConditionalOnMissingBean(JavaSparkContext.class)
public JavaSparkContext javaSparkContext(SparkConf sparkConf) {
return new JavaSparkContext(sparkConf);
}
/**
*
* @param javaSparkContext
* @return
*/
@Bean
public SparkSession sparkSession(JavaSparkContext javaSparkContext) {
return SparkSession
.builder()
.sparkContext(javaSparkContext.sc())
.appName(sparkProperties.getAppName())
.getOrCreate();
}
}
踩坑記錄
ClassNotFoundException:xxxxxx$1
異常信息截圖
產(chǎn)生的原因分析
Spark 在執(zhí)行過程中,會將jar 進行網(wǎng)絡傳輸,但是代碼中包含了匿名內(nèi)部類,因此產(chǎn)生了$1這種后綴的class 文件
示例
解決方案
在裝配時候,通過 setJars方法,添加當前的jar包作為傳輸對象,該路徑可以為相對路徑,或者 hdfs 路徑
示例代碼目標
使用 spark sql 進行簡單的查詢示例
- 簡單的條件查詢
- 稍微復雜一些的聚合查詢
- 每行數(shù)據(jù)對象,轉(zhuǎn)換為自定義Class對象
- 目標數(shù)據(jù)存儲到MySQL數(shù)據(jù)庫中
- 符合斷言判斷
package com.example.cdh.service;
import com.example.cdh.dto.UserDTO;
import java.io.Serializable;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import static org.apache.spark.sql.functions.column;
import static org.apache.spark.sql.functions.count;
/**
* 使用 spark sql 離線計算
*
* @author chunyang.leng
* @date 2023-04-12 14:53
*/
@Component
public class SparkOfflineService implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(SparkOfflineService.class);
private static final long serialVersionUID = 1L;
@Autowired
private SparkSession sparkSession;
/**
* 統(tǒng)計 hdfs 中一個csv文件的行數(shù)
*
* @param hdfsPath demo: hdfs://cdh-slave-1:8020/demo/csv/input.csv
* @return
*/
public long countHdfsCsv(String hdfsPath) {
return sparkSession.read().csv(hdfsPath).count();
}
/**
* 小于等于 計算示例
* <br/>
* <pre>
* {@code select name, age from xx where age <=#{age} }
* </pre>
* @param hdfsPath 要計算的文件
* @param age 閾值
* @return 算出來的數(shù)據(jù)總量
*/
public long lte(String hdfsPath, int age) {
// 臨時表名稱
String tempTableName = "cdh_demo_lte";
// 加載 csv 數(shù)據(jù)
Dataset<UserDTO> data = loadCsv(hdfsPath);
// 創(chuàng)建臨時表
data.createOrReplaceTempView(tempTableName);
// 執(zhí)行 sql 語句
Dataset<Row> sqlData = sparkSession
.sql("select name,age from " + tempTableName + " where age <= " + age);
// 存儲數(shù)據(jù)
saveToMySQL(tempTableName, sqlData);
return sqlData.count();
}
/**
* 簡單的聚合查詢示例
* <br/>
* <pre>
* {@code
* select
* count(name) as c,
* age
* from
* xx
* group by age
*
* having c > #{count}
*
* order by c desc
* }
* </pre>
* @param hdfsPath 要統(tǒng)計的文件
* @param count having > #{count}
* @return
*/
public long agg(String hdfsPath, int count){
// 臨時表名稱
String tempTableName = "cdh_demo_agg";
// 加載 csv 數(shù)據(jù)
Dataset<UserDTO> data = loadCsv(hdfsPath);
// 創(chuàng)建臨時表
data.createOrReplaceTempView(tempTableName);
// 執(zhí)行 sql 語句
Dataset<Row> sqlData = sparkSession
.sql("select name,age from " + tempTableName)
.groupBy(column("age").alias("age"))
.agg(count("name").alias("c"))
// filter = having
.filter(column("c").gt(count))
// 按照統(tǒng)計出來的數(shù)量,降序排序
.orderBy(column("c").desc());
saveToMySQL(tempTableName, sqlData);
return sqlData.count();
}
/**
* 加載 hdfs 中 csv 文件內(nèi)容
* @param hdfsPath
* @return
*/
private Dataset<UserDTO> loadCsv(String hdfsPath) {
// 自定義數(shù)據(jù)類型,也可以使用數(shù)據(jù)類型自動推斷
StructField nameField = DataTypes.createStructField("name", DataTypes.StringType, true);
StructField ageField = DataTypes.createStructField("age", DataTypes.IntegerType, true);
StructField[] fields = new StructField[2];
fields[0] = nameField;
fields[1] = ageField;
StructType schema = new StructType(fields);
return sparkSession
.read()
.schema(schema)
.csv(hdfsPath)
.map(new MapFunction<Row, UserDTO>() {
@Override
public UserDTO call(Row row) throws Exception {
UserDTO dto = new UserDTO();
// 防止 npe
if (!row.isNullAt(0)){
dto.setName(row.getString(0));
}
// 防止 npe
if (!row.isNullAt(1)) {
dto.setAge(row.getInt(1));
}
return dto;
}
}, Encoders.bean(UserDTO.class));
}
/**
* 數(shù)據(jù)存儲到 mysql
* @param tableName 表名字
* @param dataset 數(shù)據(jù)
*/
private void saveToMySQL(String tableName,Dataset<Row> dataset){
dataset
.write()
// 覆蓋模式,原始數(shù)據(jù)會被覆蓋掉,如果需要追加,換成 SaveMode.Append
.mode(SaveMode.Overwrite)
.format("jdbc")
.option("url", "jdbc:mysql://10.8.0.4/test")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", tableName)
.option("user", "root")
.option("password", "q")
.save();
}
}
測試用例代碼
package com.example.cdh.spark;
import com.example.cdh.service.HdfsService;
import com.example.cdh.service.SparkOfflineService;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.Assert;
/**
* @author chunyang.leng
* @date 2023-04-12 14:55
*/
@SpringBootTest
public class SparkOfflineTest {
private static final Logger logger = LoggerFactory.getLogger(SparkOfflineTest.class);
@Autowired
private SparkOfflineService sparkOfflineService;
@Autowired
private HdfsService hdfsService;
String path = "hdfs://cdh-slave-1:8020/demo/csv/input.csv";
@Test
public void countHdfsCsvTest() throws IOException {
initHdfsData();
long count = sparkOfflineService.countHdfsCsv(path);
cleanup();
Assert.isTrue(count == 6,"查詢的結(jié)果應該為6");
logger.info("統(tǒng)計測試執(zhí)行完畢");
}
@Test
public void lteTest() throws IOException {
initHdfsData();
long count = sparkOfflineService.lte(path, 19);
cleanup();
Assert.isTrue(count == 3,"查詢的結(jié)果應該為 3");
logger.info("簡單條件測試執(zhí)行完畢");
}
@Test
public void aggTest() throws IOException {
initHdfsData();
long count = sparkOfflineService.agg(path, 1);
cleanup();
Assert.isTrue(count == 1,"查詢的結(jié)果應該為 1");
logger.info("聚合測試執(zhí)行完畢");
}
private void initHdfsData() throws IOException {
String data =
"name,age\n"+
"n0,17\n" +
"n1,18\n" +
"n2,19\n" +
"n3,20\n" +
"n4,20\n";
hdfsService.delete(path,true);
hdfsService.uploadFile(data.getBytes(StandardCharsets.UTF_8),path,true);
}
private void cleanup() throws IOException {
hdfsService.delete(path,true);
}
}
測試結(jié)果
單元測試通過
lte臨時表數(shù)據(jù)驗證通過
文章來源:http://www.zghlxwxcb.cn/news/detail-472469.html
agg 臨時表數(shù)據(jù)驗證通過
文章來源地址http://www.zghlxwxcb.cn/news/detail-472469.html
到了這里,關(guān)于大數(shù)據(jù)技術(shù)(入門篇)--- 使用Spring Boot 操作 CDH6.2.0 Spark SQL進行離線計算的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!