前言
本文將講解 Spark 的部署,并通過三個簡單的步驟來編寫一個獨立應(yīng)用程序。
我們將使用本地模式,其中所有的處理都是在Spark shell中的一臺機器上完成的——這是學習框架的一種簡單方法,迭代執(zhí)行的方式可以及時反饋直接結(jié)果。使用Spark shell,可以在編寫復雜的Spark應(yīng)用程序之前使用小數(shù)據(jù)集對Spark進行操作驗證,但是對于想要獲得分布式執(zhí)行好處的大數(shù)據(jù)集或生產(chǎn)環(huán)境,建議使用YARN或Kubernetes部署模式。
雖然Spark shell只支持Scala、Python和R,但你可以用任何支持的語言(包括Java)編寫Spark應(yīng)用程序,并使用Spark SQL發(fā)出查詢。
步驟一:下載安裝包
進入Spark下載頁面,在步驟2的下拉菜單中選擇“Pre-built for Apache Hadoop 3.3”,然后點擊步驟3中的“download Spark”鏈接(如圖2-1所示)。
圖2 - 1. Apache Spark下載頁面
這將下載壓縮包spark-3.5.1-bin-hadoop3.tgz。它包含在筆記本電腦上以本地模式運行Spark所需的所有與hadoop相關(guān)的二進制文件?;蛘?,如果要將它安裝在現(xiàn)有的HDFS或Hadoop安裝上,可以從下拉菜單中選擇匹配的Hadoop版本。如果想要以源碼編譯的方式部署,可以在官方文檔中閱讀更多相關(guān)內(nèi)容。
自Apache Spark 2.2發(fā)布以來,只關(guān)心在Python中學習Spark的開發(fā)人員可以選擇從PyPI存儲庫安裝PySpark。如果你只用Python編程,你不需要安裝運行Scala、Java或R所需的所有其他庫; 要從PyPI安裝PySpark,只需運行pip install PySpark
。
可以通過pip install pyspark[SQL, ML, MLlib]
安裝SQL, ML和MLlib的一些額外依賴項(如果只想要SQL依賴項,也可以通過pip install pyspark[SQL]
)。
NOTE
需要在機器上安裝Java 8或更高版本,并設(shè)置JAVA_HOME環(huán)境變量。有關(guān)如何下載和安裝Java的說明,請參閱文檔。
如果想以解釋性shell模式運行R,則必須先安裝R,然后再運行sparkR
。要使用R進行分布式計算,還可以使用R社區(qū)創(chuàng)建的開源項目sparklyr
。
Spark的目錄和文件
本文中的所有命令和指令都是在 Unix 系統(tǒng)上運行的。下載完tarball后,cd
到下載目錄,使用tar -xf spark-3.5.1-bin-hadoop3.tgz
解壓tarball內(nèi)容,其中內(nèi)容如下:
$ cd spark-3.0.0-preview2-bin-hadoop2.7
$ ls
LICENSE R RELEASE conf examples kubernetes python yarn
NOTICE README.md bin data jars licenses sbin
README.md
- 這個文件包含了關(guān)于如何使用Spark shell、如何從源代碼構(gòu)建Spark、如何運行獨立的Spark示例、如何閱讀Spark文檔和配置指南的鏈接,以及如何為Spark做出貢獻的新的詳細說明.
bin
- 顧名思義,該目錄包含用于與Spark交互的大多數(shù)腳本,包括Spark shell (Spark -sql、pyspark、Spark -shell和sparkR)。我們將在后面使用這個目錄中的shell和可執(zhí)行文件,使用Spark -submit提交一個獨立的Spark應(yīng)用程序,并編寫一個腳本,在Kubernetes支持下運行Spark時構(gòu)建和推送Docker 鏡像。
sbin
- 該目錄中的大多數(shù)腳本都是用于管理的,用于在集群的各種部署模式下啟動和停止Spark組件。
kubernetes
- 自從Spark 2.4發(fā)布以來,這個目錄包含了用于在Kubernetes集群上為Spark發(fā)行版創(chuàng)建Docker鏡像的Dockerfiles。它還包含一個文件,提供如何在構(gòu)建Docker映像之前構(gòu)建Spark發(fā)行版的說明。
data
- 該目錄中填充了*.txt文件,這些文件作為Spark組件的輸入:MLlib、Structured Streaming和GraphX。
examples
- Spark提供了Java、Python、R和Scala的示例,可以在學習該框架時用到它們。
步驟二:使用Scala或PySpark Shell
如前所述,Spark附帶了四個廣泛使用的解釋器,它們就像交互式“shell”一樣,支持臨時數(shù)據(jù)分析:pyspark、Spark -shell、Spark-sql和sparkR。
這些shell已經(jīng)支持連接到集群,并允許你將分布式數(shù)據(jù)加載到Spark worker的內(nèi)存中。無論你是在處理千兆字節(jié)的數(shù)據(jù)還是小數(shù)據(jù)集,Spark shell都有助于快速學習Spark。
要啟動PySpark, cd
到 bin 目錄并輸入PySpark
啟動shell。如果你已經(jīng)從PyPI安裝了PySpark,那么只需輸入PySpark就足夠了:
$ pyspark
Python 3.7.3 (default, Mar 27 2019, 09:23:15)
[Clang 10.0.1 (clang-1001.0.46.3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
20/02/16 19:28:48 WARN NativeCodeLoader: Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.0.0-preview2
/_/
Using Python version 3.7.3 (default, Mar 27 2019 09:23:15)
SparkSession available as 'spark'.
>>> spark.version
'3.0.0-preview2'
>>>
要用Scala啟動一個類似的Spark shell, cd
到bin目錄并輸入Spark -shell
:
$ spark-shell
20/05/07 19:30:26 WARN NativeCodeLoader: Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://10.0.1.7:4040
Spark context available as 'sc' (master = local[*], app id = local-1581910231902)
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.0.0-preview2
/_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_241)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.version
res0: String = 3.0.0-preview2
scala>
本地 shell 運行
Spark計算被表示為算子。然后,這些算子被轉(zhuǎn)換成低級的基于rdd的字節(jié)碼作為任務(wù),分發(fā)給Spark的執(zhí)行器執(zhí)行。
讓我們看一個簡短的示例,其中我們以DataFrame的形式讀取文本文件,顯示讀取的字符串示例,并計算文件中的總行數(shù)。這個簡單的例子說明了高級結(jié)構(gòu)化api的使用。DataFrame上的show(10, false)操作只顯示前10行,不截斷。
默認情況下,截斷布爾標志為true。下面是它在Scala shell中的樣子:
scala> val strings = spark.read.text("../README.md")
strings: org.apache.spark.sql.DataFrame = [value: string]
scala> strings.show(10, false)
+------------------------------------------------------------------------------+
|value |
+------------------------------------------------------------------------------+
|# Apache Spark |
| |
|Spark is a unified analytics engine for large-scale data processing. It |
|provides high-level APIs in Scala, Java, Python, and R, and an optimized |
|engine that supports general computation graphs for data analysis. It also |
|supports a rich set of higher-level tools including Spark SQL for SQL and |
|DataFrames, MLlib for machine learning, GraphX for graph processing, |
| and Structured Streaming for stream processing. |
| |
|<https://spark.apache.org/> |
+------------------------------------------------------------------------------+
only showing top 10 rows
scala> strings.count()
res2: Long = 109
scala>
讓我們看一個使用Python解釋性shell pyspark的類似示例:
$ pyspark
Python 3.7.3 (default, Mar 27 2019, 09:23:15)
[Clang 10.0.1 (clang-1001.0.46.3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal
reflective access operations
WARNING: All illegal access operations will be denied in a future release
20/01/10 11:28:29 WARN NativeCodeLoader: Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.0.0-preview2
/_/
Using Python version 3.7.3 (default, Mar 27 2019 09:23:15)
SparkSession available as 'spark'.
>>> strings = spark.read.text("../README.md")
>>> strings.show(10, truncate=False)
+------------------------------------------------------------------------------+
|value |
+------------------------------------------------------------------------------+
|# Apache Spark |
| |
|Spark is a unified analytics engine for large-scale data processing. It |
|provides high-level APIs in Scala, Java, Python, and R, and an optimized |
|engine that supports general computation graphs for data analysis. It also |
|supports a rich set of higher-level tools including Spark SQL for SQL and |
|DataFrames, MLlib for machine learning, GraphX for graph processing, |
|and Structured Streaming for stream processing. |
| |
|<https://spark.apache.org/> |
+------------------------------------------------------------------------------+
only showing top 10 rows
>>> strings.count()
109
>>>
要退出任何Spark shell,按Ctrl-D。這種與Spark shell的快速交互不僅有利于快速學習,也有利于快速驗證實驗。
我們使用高級結(jié)構(gòu)化api將文本文件讀入Spark DataFrame而不是RDD,目前基本上已經(jīng)很少直接使用 RDD 去操作數(shù)據(jù),而是使用 API。
NOTE
在高級結(jié)構(gòu)化api中表達的每一個計算都被分解為低級的RDD操作,然后轉(zhuǎn)換為Scala字節(jié)碼,供執(zhí)行器的jvm使用。這個生成的RDD操作代碼對用戶來說是不可訪問的,也與面向用戶的RDD api不一樣。
步驟3:理解Spark應(yīng)用中的概念
要理解我們的示例代碼在底層發(fā)生了什么,需要熟悉Spark應(yīng)用程序的一些關(guān)鍵概念,以及代碼如何作為任務(wù)在Spark執(zhí)行器之間轉(zhuǎn)換和執(zhí)行:
Application
- 使用Spark的 APIs 構(gòu)建在Spark上的用戶程序, 它由集群上的 driver 和 executors 組成。
SparkSession
- 它提供了與底層Spark功能交互的入口點的一個實例化對象,并允許使用Spark的api對Spark進行編程。在交互式Spark shell中,Spark driver 會自動實例化一個SparkSession,而在Spark應(yīng)用程序中,我們自己可以創(chuàng)建一個SparkSession對象。
Job
- 由多個任務(wù)組成的并行計算,這些任務(wù)在響應(yīng)Spark操作(例如,
save()
,collect()
)時產(chǎn)生。
Stage
- 每個任務(wù)被分成更小的任務(wù)集,稱為階段,這些階段相互依賴。
Task
- 將被發(fā)送到Spark executor 的操作命令或單個執(zhí)行單元。
Spark Application and SparkSession
每個Spark application 的核心是Spark driver 程序,它創(chuàng)建一個SparkSession對象。當你使用Spark shell時,driver 是shell的一部分,并且創(chuàng)建了SparkSession對象(可通過變量Spark訪問),正如在啟動shell時在前面的示例中看到的那樣。
在這些示例中,因為在筆記本電腦上本地啟動了Spark shell,所以所有操作都在本地運行,在單個JVM中運行。但是,你可以像在本地模式下一樣輕松地啟動Spark shell來在集群上并行分析數(shù)據(jù)。命令Spark -shell——help或pyspark——help將向您展示如何連接到Spark集群管理器。圖2-2顯示了Spark在集群上的執(zhí)行情況。
圖2 - 2. 在Spark的分布式架構(gòu)中,Spark組件通過Spark driver 進行通信
一旦有了SparkSession,就可以使用api對Spark進行編程來執(zhí)行Spark操作。
Spark Jobs
在與Spark shell的交互會話中,Driver 將我們的Spark應(yīng)用程序轉(zhuǎn)換為一個或多個Spark作業(yè)(圖2-3)。然后將每個作業(yè)轉(zhuǎn)換為DAG。本質(zhì)上,這就是Spark的執(zhí)行計劃,其中DAG中的每個節(jié)點可以是單個或多個Spark階段。
圖2 - 3. 創(chuàng)建一個或多個Spark作業(yè)的Spark Driver
Spark Stages
作為DAG節(jié)點的一部分,階段是根據(jù)可以串行或并行執(zhí)行的算子創(chuàng)建的(圖2-4)。并非所有的Spark 算子都可以在一個階段中發(fā)生,因此它們可以被劃分為多個階段。通常階段是在運算符的計算邊界上劃分的,在那里它們規(guī)定了Spark executor 之間的數(shù)據(jù)傳輸。
圖2 - 4. 創(chuàng)建一個或多個階段的Spark job
Spark Tasks
每個階段都由Spark任務(wù)(一個執(zhí)行單元)組成,然后在每個Spark executor 上執(zhí)行.
每個任務(wù)映射到一個核,在一個數(shù)據(jù)分區(qū)上工作(圖2-5)。因此,一個16核的執(zhí)行器可以在16個或更多的分區(qū)上并行運行16個或更多的任務(wù),這使得Spark的任務(wù)執(zhí)行并行度很高:
圖2 - 5. 創(chuàng)建一個或多個任務(wù)以分發(fā)給 executor 的 Spark stage
轉(zhuǎn)換、立即執(zhí)行操作和延遲求值
分布式數(shù)據(jù)上的Spark操作可以分為兩種類型: 轉(zhuǎn)換和執(zhí)行操作。顧名思義,轉(zhuǎn)換在不改變原始數(shù)據(jù)的情況下將Spark DataFrame轉(zhuǎn)換為新的DataFrame,從而使其具有不可變性。
換句話說,像select()
或filter()
這樣的操作不會改變原始DataFrame;相反,它將返回轉(zhuǎn)換后的操作結(jié)果作為一個新的DataFrame。
所有的轉(zhuǎn)換都是延遲執(zhí)行的,它們的結(jié)果不是立即計算出來的,而是作為一個轉(zhuǎn)換關(guān)系被記錄。這些記錄允許Spark在稍后的執(zhí)行計劃中重新安排某些轉(zhuǎn)換,合并它們,或者將轉(zhuǎn)換優(yōu)化到更有效的執(zhí)行階段。延遲計算是Spark延遲執(zhí)行的策略,直到一個執(zhí)行操作被調(diào)用或數(shù)據(jù)被“使用”(從磁盤讀取或?qū)懭氪疟P)。
執(zhí)行操作觸發(fā)所有轉(zhuǎn)換記錄的延遲計算。在圖2-6中,所有的轉(zhuǎn)換T都被記錄下來,直到動作A被調(diào)用。每個轉(zhuǎn)換T產(chǎn)生一個新的DataFrame。
圖2 - 6. 延遲轉(zhuǎn)換和立即執(zhí)行求值的操作
延遲求值通過轉(zhuǎn)換血緣關(guān)系和數(shù)據(jù)不變性提供了容錯性,允許Spark通過鏈式調(diào)用轉(zhuǎn)換來優(yōu)化查詢。由于Spark在轉(zhuǎn)換血緣關(guān)系中記錄了每個轉(zhuǎn)換,并且dataframe在轉(zhuǎn)換之間是不可變的,因此它可以通過簡單地重新執(zhí)行血緣關(guān)系的記錄來重現(xiàn)其原始狀態(tài),從而在發(fā)生故障時提供彈性。
下邊列出了一些轉(zhuǎn)換和操作的示例:
這些動作和轉(zhuǎn)換構(gòu)成了一個Spark查詢計劃,在調(diào)用操作之前,查詢計劃中不會執(zhí)行任何內(nèi)容。下面的示例用Python和Scala顯示,有兩個轉(zhuǎn)換——read()
和filter()
——和一個立即執(zhí)行操作 count()
。該操作觸發(fā)了作為查詢執(zhí)行計劃的一部分記錄的所有轉(zhuǎn)換的執(zhí)行。在這個例子中,在shell中執(zhí)行filter .count()
之前什么都不會發(fā)生:
# In Python
>>> strings = spark.read.text("../README.md")
>>> filtered = strings.filter(strings.value.contains("Spark"))
>>> filtered.count()
20
// In Scala
scala> import org.apache.spark.sql.functions._
scala> val strings = spark.read.text("../README.md")
scala> val filtered = strings.filter(col("value").contains("Spark"))
scala> filtered.count()
res5: Long = 20s
窄變換和寬變換
如前所述,轉(zhuǎn)換是Spark 延遲計算的操作。延遲求值方案的一個巨大優(yōu)勢是,Spark可以檢查你的計算性查詢,并確定如何優(yōu)化它。這種優(yōu)化可以通過連接或管道化一些操作并將它們分配到一個階段來完成,或者通過確定哪些操作需要跨集群的shuffle或數(shù)據(jù)交換來將它們分解為階段來完成。
轉(zhuǎn)換可以分為窄依賴關(guān)系和寬依賴關(guān)系。任何可以從單個輸入分區(qū)計算單個輸出分區(qū)的轉(zhuǎn)換都是窄轉(zhuǎn)換。例如,在前面的代碼片段中,filter()
和contains()
表示狹窄的轉(zhuǎn)換,因為它們可以在單個分區(qū)上操作并生成結(jié)果輸出分區(qū),而無需交換任何數(shù)據(jù)。
但是,groupBy()
或orderBy()
之類的轉(zhuǎn)換會指示Spark執(zhí)行寬轉(zhuǎn)換,其中來自其他分區(qū)的數(shù)據(jù)被讀入、合并并寫入磁盤。如果我們要通過調(diào)用.orderby()
對前面示例中filtered
后的DataFrame進行排序,那么每個分區(qū)都將在本地排序,但是我們需要強制對集群中每個執(zhí)行器分區(qū)中的數(shù)據(jù)進行過濾,以便對所有記錄進行排序。與窄轉(zhuǎn)換相比,寬轉(zhuǎn)換需要其他分區(qū)的輸出來計算最終的聚合。
下圖說明了兩種類型的依賴關(guān)系:
Spark UI
Spark包含一個圖形用戶界面,可以使用它來檢查或監(jiān)視Spark應(yīng)用程序的各個分解階段(即job、state 和 tasks)。根據(jù)Spark的部署方式,驅(qū)動程序啟動一個web UI,默認在端口4040上運行,可以在其中查看指標和詳細信息,例如:
- 調(diào)度 stages 和 tasks 的列表
- RDD大小和內(nèi)存使用的概要描述
- 運行環(huán)境相關(guān)信息
- 正在運行的 executors 信息
- 所有的Spark SQL 查詢
在本地模式下,可以通過瀏覽器http://:4040訪問該接口。
NOTE
當啟動spark-shell時,輸出日志部分會顯示要在端口4040上訪問的本地主機URL。
讓我們看一下前邊的Python示例是如何轉(zhuǎn)換為job、stage 和 tasks的。要查看DAG的外觀,單擊web UI中的“DAG可視化”。如下圖所示,Driver 創(chuàng)建了一個 job 和一個 stage:
注意,這里不需要Exchange
(執(zhí)行器之間交換數(shù)據(jù)的地方),因為只有一個階段。每個單獨的操作用藍框表示。
stage 0由一個task 組成。如果你有多個任務(wù),它們將并行執(zhí)行。在“stages”頁中可以查看各個stage的詳細信息,如下圖所示:
在后續(xù)的文章系列中,我會詳細介紹 UI 界面的使用,這里先只做個簡單的介紹。
單機的應(yīng)用程序
Spark發(fā)行版為每個Spark組件提供了一組示例應(yīng)用程序。
從本地機器上的安裝目錄中,可以運行使用該命令提供的幾個Java或Scala示例程序之一:bin/run-example _<class> [params]_
$ ./bin/run-example JavaWordCount README.md
這將在控制臺上INFO 信息中輸出 README.md 文件中每個單詞的列表及其計數(shù)(計數(shù)單詞是分布式計算的“Hello, World”)。
計算巧克力豆的數(shù)量
在前面的例子中,我們統(tǒng)計了文件中的單詞。如果文件很大,它將分布在一個被劃分為小數(shù)據(jù)塊的集群上,我們的Spark程序?qū)⒎峙溆嬎忝總€分區(qū)中每個單詞的任務(wù),并返回最終的聚合計數(shù),但這個例子已經(jīng)有點過時了。
讓我們來解決一個類似的問題,但是使用更大的數(shù)據(jù)集,并使用更多Spark的分發(fā)功能和DataFrame api。
如下圖有很多巧克力豆的餅干,我們需要將這些不同顏色的巧克力豆分配給不同的人。
讓我們編寫一個Spark程序,讀取一個包含超過100,000個條目的文件(其中每行或每行都有一個<state, mnm_color, count>),并計算和匯總每種顏色和狀態(tài)的計數(shù)。這些匯總的計數(shù)告訴我們每個人喜歡的m&m巧克力豆的顏色。下邊給出了完整的 Python 代碼:
# Import the necessary libraries.
# Since we are using Python, import the SparkSession and related functions
# from the PySpark module.
import sys
from pyspark.sql import SparkSession
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: mnmcount <file>", file=sys.stderr)
sys.exit(-1)
# Build a SparkSession using the SparkSession APIs.
# If one does not exist, then create an instance. There
# can only be one SparkSession per JVM.
spark = (SparkSession
.builder
.appName("PythonMnMCount")
.getOrCreate())
# Get the M&M data set filename from the command-line arguments
mnm_file = sys.argv[1]
# Read the file into a Spark DataFrame using the CSV
# format by inferring the schema and specifying that the
# file contains a header, which provides column names for comma-
# separated fields.
mnm_df = (spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(mnm_file))
# We use the DataFrame high-level APIs. Note
# that we don't use RDDs at all. Because some of Spark's
# functions return the same object, we can chain function calls.
# 1. Select from the DataFrame the fields "State", "Color", and "Count"
# 2. Since we want to group each state and its M&M color count,
# we use groupBy()
# 3. Aggregate counts of all colors and groupBy() State and Color
# 4 orderBy() in descending order
count_mnm_df = (mnm_df
.select("State", "Color", "Count")
.groupBy("State", "Color")
.sum("Count")
.orderBy("sum(Count)", ascending=False))
# Show the resulting aggregations for all the states and colors;
# a total count of each color per state.
# Note show() is an action, which will trigger the above
# query to be executed.
count_mnm_df.show(n=60, truncate=False)
print("Total Rows = %d" % (count_mnm_df.count()))
# While the above code aggregated and counted for all
# the states, what if we just want to see the data for
# a single state, e.g., CA?
# 1. Select from all rows in the DataFrame
# 2. Filter only CA state
# 3. groupBy() State and Color as we did above
# 4. Aggregate the counts for each color
# 5. orderBy() in descending order
# Find the aggregate count for California by filtering
ca_count_mnm_df = (mnm_df
.select("State", "Color", "Count")
.where(mnm_df.State == "CA")
.groupBy("State", "Color")
.sum("Count")
.orderBy("sum(Count)", ascending=False))
# Show the resulting aggregation for California.
# As above, show() is an action that will trigger the execution of the
# entire computation.
ca_count_mnm_df.show(n=10, truncate=False)
# Stop the SparkSession
spark.stop()
創(chuàng)建 mnmcount.py 文件,mnn_datasets .csv 文件數(shù)據(jù)集下載地址,并使用安裝的bin目錄中的submit- Spark腳本將其作為Spark作業(yè)提交。將SPARK_HOME環(huán)境變量設(shè)置為在本地機器上安裝Spark的根目錄。
NOTE
前面的代碼使用DataFrame API,讀起來像高級DSL查詢。我將在后續(xù)文章中介紹這個和其他api。與RDD API不同,你可以使用它來指示Spark做什么,而不是如何做,這是清晰和簡單的!
為了避免將詳細的INFO消息打印到控制臺中,請復制 log4j.properties.template 模板文件到 log4j.properties。并設(shè)置log4j.conf/log4j.conf
文件中的rootCategory=WARN
。
執(zhí)行提交命令,提交上邊的 Pyhton 代碼至 Spark 集群:
$SPARK_HOME/bin/spark-submit mnmcount.py data/mnm_dataset.csv
-----+------+----------+
|State| Color|sum(Count)|
+-----+------+----------+
| CA|Yellow| 100956|
| WA| Green| 96486|
| CA| Brown| 95762|
| TX| Green| 95753|
| TX| Red| 95404|
| CO|Yellow| 95038|
| NM| Red| 94699|
| OR|Orange| 94514|
| WY| Green| 94339|
| NV|Orange| 93929|
| TX|Yellow| 93819|
| CO| Green| 93724|
| CO| Brown| 93692|
| CA| Green| 93505|
| NM| Brown| 93447|
| CO| Blue| 93412|
| WA| Red| 93332|
| WA| Brown| 93082|
| WA|Yellow| 92920|
| NM|Yellow| 92747|
| NV| Brown| 92478|
| TX|Orange| 92315|
| AZ| Brown| 92287|
| AZ| Green| 91882|
| WY| Red| 91768|
| AZ|Orange| 91684|
| CA| Red| 91527|
| WA|Orange| 91521|
| NV|Yellow| 91390|
| UT|Orange| 91341|
| NV| Green| 91331|
| NM|Orange| 91251|
| NM| Green| 91160|
| WY| Blue| 91002|
| UT| Red| 90995|
| CO|Orange| 90971|
| AZ|Yellow| 90946|
| TX| Brown| 90736|
| OR| Blue| 90526|
| CA|Orange| 90311|
| OR| Red| 90286|
| NM| Blue| 90150|
| AZ| Red| 90042|
| NV| Blue| 90003|
| UT| Blue| 89977|
| AZ| Blue| 89971|
| WA| Blue| 89886|
| OR| Green| 89578|
| CO| Red| 89465|
| NV| Red| 89346|
| UT|Yellow| 89264|
| OR| Brown| 89136|
| CA| Blue| 89123|
| UT| Brown| 88973|
| TX| Blue| 88466|
| UT| Green| 88392|
| OR|Yellow| 88129|
| WY|Orange| 87956|
| WY|Yellow| 87800|
| WY| Brown| 86110|
+-----+------+----------+
Total Rows = 60
+-----+------+----------+
|State| Color|sum(Count)|
+-----+------+----------+
| CA|Yellow| 100956|
| CA| Brown| 95762|
| CA| Green| 93505|
| CA| Red| 91527|
| CA|Orange| 90311|
| CA| Blue| 89123|
+-----+------+----------+
首先我們看到每個地區(qū)的人喜歡的顏色的聚合數(shù)據(jù),下邊是單個地區(qū)的。
下邊是 Scala 版本代碼運行相同的應(yīng)用程序:
package main.scala.chapter2
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/**
* Usage: MnMcount <mnm_file_dataset>
*/
object MnMcount {
def main(args: Array[String]) {
val spark = SparkSession
.builder
.appName("MnMCount")
.getOrCreate()
if (args.length < 1) {
print("Usage: MnMcount <mnm_file_dataset>")
sys.exit(1)
}
// Get the M&M data set filename
val mnmFile = args(0)
// Read the file into a Spark DataFrame
val mnmDF = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(mnmFile)
// Aggregate counts of all colors and groupBy() State and Color
// orderBy() in descending order
val countMnMDF = mnmDF
.select("State", "Color", "Count")
.groupBy("State", "Color")
.sum("Count")
.orderBy(desc("sum(Count)"))
// Show the resulting aggregations for all the states and colors
countMnMDF.show(60)
println(s"Total Rows = ${countMnMDF.count()}")
println()
// Find the aggregate counts for California by filtering
val caCountMnNDF = mnmDF
.select("State", "Color", "Count")
.where(col("State") === "CA")
.groupBy("State", "Color")
.sum("Count")
.orderBy(desc("sum(Count)"))
// Show the resulting aggregations for California
caCountMnMDF.show(10)
// Stop the SparkSession
spark.stop()
}
}
單機編譯 Scala 程序
下邊將說明如何使用Scala構(gòu)建工具(sbt)構(gòu)建一個Scala Spark程序。build.sbt
是規(guī)范文件,與makefile類似,它描述并指示Scala編譯器構(gòu)建與Scala相關(guān)的任務(wù),例如 jar 包、packages、要解析的依賴項以及在哪里查找它們。下邊是一個簡單構(gòu)建的例子:
// Name of the package
name := "main/scala/chapter2"
// Version of our package
version := "1.0"
// Version of Scala
scalaVersion := "2.12.10"
// Spark library dependencies
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.0.0-preview2",
"org.apache.spark" %% "spark-sql" % "3.0.0-preview2"
)
確保已經(jīng)安裝了Java開發(fā)工具包(JDK)和sbt,并設(shè)置了JAVA_HOME和SPARK_HOME,用一個命令,就可以構(gòu)建Spark應(yīng)用程序:
$ sbt clean package
[info] Updated file /Users/julesdamji/gits/LearningSparkV2/chapter2/scala/
project/build.properties: set sbt.version to 1.2.8
[info] Loading project definition from /Users/julesdamji/gits/LearningSparkV2/
chapter2/scala/project
[info] Updating
[info] Done updating.
...
[info] Compiling 1 Scala source to /Users/julesdamji/gits/LearningSparkV2/
chapter2/scala/target/scala-2.12/classes ...
[info] Done compiling.
[info] Packaging /Users/julesdamji/gits/LearningSparkV2/chapter2/scala/target/
scala-2.12/main-scala-chapter2_2.12-1.0.jar ...
[info] Done packaging.
[success] Total time: 6 s, completed Jan 11, 2020, 4:11:02 PM
成功構(gòu)建后,您可以運行Scala版本的計數(shù)示例,如下所示:文章來源:http://www.zghlxwxcb.cn/news/detail-849275.html
$SPARK_HOME/bin/spark-submit --class main.scala.chapter2.MnMcount \
jars/main-scala-chapter2_2.12-1.0.jar data/mnm_dataset.csv
...
...
20/01/11 16:00:48 INFO TaskSchedulerImpl: Killing all running tasks in stage 4:
Stage finished
20/01/11 16:00:48 INFO DAGScheduler: Job 4 finished: show at MnMcount.scala:49,
took 0.264579 s
+-----+------+-----+
|State| Color|Total|
+-----+------+-----+
| CA|Yellow| 1807|
| CA| Green| 1723|
| CA| Brown| 1718|
| CA|Orange| 1657|
| CA| Red| 1656|
| CA| Blue| 1603|
+-----+------+-----+
總結(jié)
在本章中,我們介紹了開始使用Apache Spark所需的三個簡單步驟:下載框架,熟悉Scala或PySpark交互shell,掌握高級Spark應(yīng)用程序概念和術(shù)語。我們快速概述了使用轉(zhuǎn)換和操作來編寫Spark應(yīng)用程序的過程,并簡要介紹了使用Spark UI來檢查所創(chuàng)建的job、stage和task。
最后,通過一個簡短的示例,展示了如何使用高級結(jié)構(gòu)化api來告訴Spark要做什么——在下一篇文章我將更詳細地介紹這些api。文章來源地址http://www.zghlxwxcb.cn/news/detail-849275.html
到了這里,關(guān)于Spark 部署與應(yīng)用程序交互簡單使用說明的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!