国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Spark 部署與應(yīng)用程序交互簡單使用說明

這篇具有很好參考價值的文章主要介紹了Spark 部署與應(yīng)用程序交互簡單使用說明。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

前言

本文將講解 Spark 的部署,并通過三個簡單的步驟來編寫一個獨立應(yīng)用程序。
我們將使用本地模式,其中所有的處理都是在Spark shell中的一臺機器上完成的——這是學習框架的一種簡單方法,迭代執(zhí)行的方式可以及時反饋直接結(jié)果。使用Spark shell,可以在編寫復雜的Spark應(yīng)用程序之前使用小數(shù)據(jù)集對Spark進行操作驗證,但是對于想要獲得分布式執(zhí)行好處的大數(shù)據(jù)集或生產(chǎn)環(huán)境,建議使用YARN或Kubernetes部署模式。
雖然Spark shell只支持Scala、Python和R,但你可以用任何支持的語言(包括Java)編寫Spark應(yīng)用程序,并使用Spark SQL發(fā)出查詢。

步驟一:下載安裝包

進入Spark下載頁面,在步驟2的下拉菜單中選擇“Pre-built for Apache Hadoop 3.3”,然后點擊步驟3中的“download Spark”鏈接(如圖2-1所示)。
Spark 部署與應(yīng)用程序交互簡單使用說明,# Spark,大數(shù)據(jù),spark,大數(shù)據(jù)
圖2 - 1. Apache Spark下載頁面
這將下載壓縮包spark-3.5.1-bin-hadoop3.tgz。它包含在筆記本電腦上以本地模式運行Spark所需的所有與hadoop相關(guān)的二進制文件?;蛘?,如果要將它安裝在現(xiàn)有的HDFS或Hadoop安裝上,可以從下拉菜單中選擇匹配的Hadoop版本。如果想要以源碼編譯的方式部署,可以在官方文檔中閱讀更多相關(guān)內(nèi)容。
自Apache Spark 2.2發(fā)布以來,只關(guān)心在Python中學習Spark的開發(fā)人員可以選擇從PyPI存儲庫安裝PySpark。如果你只用Python編程,你不需要安裝運行Scala、Java或R所需的所有其他庫; 要從PyPI安裝PySpark,只需運行pip install PySpark。
可以通過pip install pyspark[SQL, ML, MLlib]安裝SQL, ML和MLlib的一些額外依賴項(如果只想要SQL依賴項,也可以通過pip install pyspark[SQL])。

NOTE
需要在機器上安裝Java 8或更高版本,并設(shè)置JAVA_HOME環(huán)境變量。有關(guān)如何下載和安裝Java的說明,請參閱文檔。

如果想以解釋性shell模式運行R,則必須先安裝R,然后再運行sparkR。要使用R進行分布式計算,還可以使用R社區(qū)創(chuàng)建的開源項目sparklyr

Spark的目錄和文件

本文中的所有命令和指令都是在 Unix 系統(tǒng)上運行的。下載完tarball后,cd到下載目錄,使用tar -xf spark-3.5.1-bin-hadoop3.tgz解壓tarball內(nèi)容,其中內(nèi)容如下:

$ cd spark-3.0.0-preview2-bin-hadoop2.7
$ ls
LICENSE   R          RELEASE   conf    examples   kubernetes  python   yarn
NOTICE    README.md  bin       data    jars       licenses    sbin

README.md

  • 這個文件包含了關(guān)于如何使用Spark shell、如何從源代碼構(gòu)建Spark、如何運行獨立的Spark示例、如何閱讀Spark文檔和配置指南的鏈接,以及如何為Spark做出貢獻的新的詳細說明.

bin

  • 顧名思義,該目錄包含用于與Spark交互的大多數(shù)腳本,包括Spark shell (Spark -sql、pyspark、Spark -shell和sparkR)。我們將在后面使用這個目錄中的shell和可執(zhí)行文件,使用Spark -submit提交一個獨立的Spark應(yīng)用程序,并編寫一個腳本,在Kubernetes支持下運行Spark時構(gòu)建和推送Docker 鏡像。

sbin

  • 該目錄中的大多數(shù)腳本都是用于管理的,用于在集群的各種部署模式下啟動和停止Spark組件。

kubernetes

  • 自從Spark 2.4發(fā)布以來,這個目錄包含了用于在Kubernetes集群上為Spark發(fā)行版創(chuàng)建Docker鏡像的Dockerfiles。它還包含一個文件,提供如何在構(gòu)建Docker映像之前構(gòu)建Spark發(fā)行版的說明。

data

  • 該目錄中填充了*.txt文件,這些文件作為Spark組件的輸入:MLlib、Structured Streaming和GraphX。

examples

  • Spark提供了Java、Python、R和Scala的示例,可以在學習該框架時用到它們。

步驟二:使用Scala或PySpark Shell

如前所述,Spark附帶了四個廣泛使用的解釋器,它們就像交互式“shell”一樣,支持臨時數(shù)據(jù)分析:pyspark、Spark -shell、Spark-sql和sparkR。
這些shell已經(jīng)支持連接到集群,并允許你將分布式數(shù)據(jù)加載到Spark worker的內(nèi)存中。無論你是在處理千兆字節(jié)的數(shù)據(jù)還是小數(shù)據(jù)集,Spark shell都有助于快速學習Spark。
要啟動PySpark, cd到 bin 目錄并輸入PySpark啟動shell。如果你已經(jīng)從PyPI安裝了PySpark,那么只需輸入PySpark就足夠了:

$ pyspark
Python 3.7.3 (default, Mar 27 2019, 09:23:15)
[Clang 10.0.1 (clang-1001.0.46.3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
20/02/16 19:28:48 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0-preview2
      /_/

Using Python version 3.7.3 (default, Mar 27 2019 09:23:15)
SparkSession available as 'spark'.
>>> spark.version
'3.0.0-preview2'
>>>

要用Scala啟動一個類似的Spark shell, cd到bin目錄并輸入Spark -shell:

$ spark-shell
20/05/07 19:30:26 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://10.0.1.7:4040
Spark context available as 'sc' (master = local[*], app id = local-1581910231902)
Spark session available as 'spark'.
Welcome to

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0-preview2
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_241)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.version
res0: String = 3.0.0-preview2
scala>

本地 shell 運行

Spark計算被表示為算子。然后,這些算子被轉(zhuǎn)換成低級的基于rdd的字節(jié)碼作為任務(wù),分發(fā)給Spark的執(zhí)行器執(zhí)行。
讓我們看一個簡短的示例,其中我們以DataFrame的形式讀取文本文件,顯示讀取的字符串示例,并計算文件中的總行數(shù)。這個簡單的例子說明了高級結(jié)構(gòu)化api的使用。DataFrame上的show(10, false)操作只顯示前10行,不截斷。
默認情況下,截斷布爾標志為true。下面是它在Scala shell中的樣子:

scala> val strings = spark.read.text("../README.md")
strings: org.apache.spark.sql.DataFrame = [value: string]

scala> strings.show(10, false)
+------------------------------------------------------------------------------+
|value                                                                         |
+------------------------------------------------------------------------------+
|# Apache Spark                                                                |
|                                                                              |
|Spark is a unified analytics engine for large-scale data processing. It       |
|provides high-level APIs in Scala, Java, Python, and R, and an optimized      |
|engine that supports general computation graphs for data analysis. It also    |
|supports a rich set of higher-level tools including Spark SQL for SQL and     |
|DataFrames, MLlib for machine learning, GraphX for graph processing,          |
| and Structured Streaming for stream processing.                              |
|                                                                              |
|<https://spark.apache.org/>                                                   |
+------------------------------------------------------------------------------+
only showing top 10 rows

scala> strings.count()
res2: Long = 109
scala>

讓我們看一個使用Python解釋性shell pyspark的類似示例:

$ pyspark
Python 3.7.3 (default, Mar 27 2019, 09:23:15)
[Clang 10.0.1 (clang-1001.0.46.3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform 
WARNING: Use --illegal-access=warn to enable warnings of further illegal 
reflective access operations
WARNING: All illegal access operations will be denied in a future release
20/01/10 11:28:29 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
Welcome to

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0-preview2
      /_/

Using Python version 3.7.3 (default, Mar 27 2019 09:23:15)
SparkSession available as 'spark'.
>>> strings = spark.read.text("../README.md")
>>> strings.show(10, truncate=False)
+------------------------------------------------------------------------------+
|value                                                                         |
+------------------------------------------------------------------------------+
|# Apache Spark                                                                |
|                                                                              |
|Spark is a unified analytics engine for large-scale data processing. It       |
|provides high-level APIs in Scala, Java, Python, and R, and an optimized      |
|engine that supports general computation graphs for data analysis. It also    |
|supports a rich set of higher-level tools including Spark SQL for SQL and     |
|DataFrames, MLlib for machine learning, GraphX for graph processing,          |
|and Structured Streaming for stream processing.                               |
|                                                                              |
|<https://spark.apache.org/>                                                   |
+------------------------------------------------------------------------------+
only showing top 10 rows

>>> strings.count()
109
>>>

要退出任何Spark shell,按Ctrl-D。這種與Spark shell的快速交互不僅有利于快速學習,也有利于快速驗證實驗。
我們使用高級結(jié)構(gòu)化api將文本文件讀入Spark DataFrame而不是RDD,目前基本上已經(jīng)很少直接使用 RDD 去操作數(shù)據(jù),而是使用 API。

NOTE
在高級結(jié)構(gòu)化api中表達的每一個計算都被分解為低級的RDD操作,然后轉(zhuǎn)換為Scala字節(jié)碼,供執(zhí)行器的jvm使用。這個生成的RDD操作代碼對用戶來說是不可訪問的,也與面向用戶的RDD api不一樣。

步驟3:理解Spark應(yīng)用中的概念

要理解我們的示例代碼在底層發(fā)生了什么,需要熟悉Spark應(yīng)用程序的一些關(guān)鍵概念,以及代碼如何作為任務(wù)在Spark執(zhí)行器之間轉(zhuǎn)換和執(zhí)行:
Application

  • 使用Spark的 APIs 構(gòu)建在Spark上的用戶程序, 它由集群上的 driver 和 executors 組成。

SparkSession

  • 它提供了與底層Spark功能交互的入口點的一個實例化對象,并允許使用Spark的api對Spark進行編程。在交互式Spark shell中,Spark driver 會自動實例化一個SparkSession,而在Spark應(yīng)用程序中,我們自己可以創(chuàng)建一個SparkSession對象。

Job

  • 由多個任務(wù)組成的并行計算,這些任務(wù)在響應(yīng)Spark操作(例如,save(), collect())時產(chǎn)生。

Stage

  • 每個任務(wù)被分成更小的任務(wù)集,稱為階段,這些階段相互依賴。

Task

  • 將被發(fā)送到Spark executor 的操作命令或單個執(zhí)行單元。

Spark Application and SparkSession

每個Spark application 的核心是Spark driver 程序,它創(chuàng)建一個SparkSession對象。當你使用Spark shell時,driver 是shell的一部分,并且創(chuàng)建了SparkSession對象(可通過變量Spark訪問),正如在啟動shell時在前面的示例中看到的那樣。
在這些示例中,因為在筆記本電腦上本地啟動了Spark shell,所以所有操作都在本地運行,在單個JVM中運行。但是,你可以像在本地模式下一樣輕松地啟動Spark shell來在集群上并行分析數(shù)據(jù)。命令Spark -shell——help或pyspark——help將向您展示如何連接到Spark集群管理器。圖2-2顯示了Spark在集群上的執(zhí)行情況。
Spark 部署與應(yīng)用程序交互簡單使用說明,# Spark,大數(shù)據(jù),spark,大數(shù)據(jù)
圖2 - 2. 在Spark的分布式架構(gòu)中,Spark組件通過Spark driver 進行通信
一旦有了SparkSession,就可以使用api對Spark進行編程來執(zhí)行Spark操作。

Spark Jobs

在與Spark shell的交互會話中,Driver 將我們的Spark應(yīng)用程序轉(zhuǎn)換為一個或多個Spark作業(yè)(圖2-3)。然后將每個作業(yè)轉(zhuǎn)換為DAG。本質(zhì)上,這就是Spark的執(zhí)行計劃,其中DAG中的每個節(jié)點可以是單個或多個Spark階段。
Spark 部署與應(yīng)用程序交互簡單使用說明,# Spark,大數(shù)據(jù),spark,大數(shù)據(jù)
圖2 - 3. 創(chuàng)建一個或多個Spark作業(yè)的Spark Driver

Spark Stages

作為DAG節(jié)點的一部分,階段是根據(jù)可以串行或并行執(zhí)行的算子創(chuàng)建的(圖2-4)。并非所有的Spark 算子都可以在一個階段中發(fā)生,因此它們可以被劃分為多個階段。通常階段是在運算符的計算邊界上劃分的,在那里它們規(guī)定了Spark executor 之間的數(shù)據(jù)傳輸。
Spark 部署與應(yīng)用程序交互簡單使用說明,# Spark,大數(shù)據(jù),spark,大數(shù)據(jù)
圖2 - 4. 創(chuàng)建一個或多個階段的Spark job

Spark Tasks

每個階段都由Spark任務(wù)(一個執(zhí)行單元)組成,然后在每個Spark executor 上執(zhí)行.
每個任務(wù)映射到一個核,在一個數(shù)據(jù)分區(qū)上工作(圖2-5)。因此,一個16核的執(zhí)行器可以在16個或更多的分區(qū)上并行運行16個或更多的任務(wù),這使得Spark的任務(wù)執(zhí)行并行度很高:
Spark 部署與應(yīng)用程序交互簡單使用說明,# Spark,大數(shù)據(jù),spark,大數(shù)據(jù)
圖2 - 5. 創(chuàng)建一個或多個任務(wù)以分發(fā)給 executor 的 Spark stage

轉(zhuǎn)換、立即執(zhí)行操作和延遲求值

分布式數(shù)據(jù)上的Spark操作可以分為兩種類型: 轉(zhuǎn)換執(zhí)行操作。顧名思義,轉(zhuǎn)換在不改變原始數(shù)據(jù)的情況下將Spark DataFrame轉(zhuǎn)換為新的DataFrame,從而使其具有不可變性。
換句話說,像select()filter()這樣的操作不會改變原始DataFrame;相反,它將返回轉(zhuǎn)換后的操作結(jié)果作為一個新的DataFrame。
所有的轉(zhuǎn)換都是延遲執(zhí)行的,它們的結(jié)果不是立即計算出來的,而是作為一個轉(zhuǎn)換關(guān)系被記錄。這些記錄允許Spark在稍后的執(zhí)行計劃中重新安排某些轉(zhuǎn)換,合并它們,或者將轉(zhuǎn)換優(yōu)化到更有效的執(zhí)行階段。延遲計算是Spark延遲執(zhí)行的策略,直到一個執(zhí)行操作被調(diào)用或數(shù)據(jù)被“使用”(從磁盤讀取或?qū)懭氪疟P)。
執(zhí)行操作觸發(fā)所有轉(zhuǎn)換記錄的延遲計算。在圖2-6中,所有的轉(zhuǎn)換T都被記錄下來,直到動作A被調(diào)用。每個轉(zhuǎn)換T產(chǎn)生一個新的DataFrame。
Spark 部署與應(yīng)用程序交互簡單使用說明,# Spark,大數(shù)據(jù),spark,大數(shù)據(jù)
圖2 - 6. 延遲轉(zhuǎn)換和立即執(zhí)行求值的操作
延遲求值通過轉(zhuǎn)換血緣關(guān)系和數(shù)據(jù)不變性提供了容錯性,允許Spark通過鏈式調(diào)用轉(zhuǎn)換來優(yōu)化查詢。由于Spark在轉(zhuǎn)換血緣關(guān)系中記錄了每個轉(zhuǎn)換,并且dataframe在轉(zhuǎn)換之間是不可變的,因此它可以通過簡單地重新執(zhí)行血緣關(guān)系的記錄來重現(xiàn)其原始狀態(tài),從而在發(fā)生故障時提供彈性。
下邊列出了一些轉(zhuǎn)換和操作的示例:
Spark 部署與應(yīng)用程序交互簡單使用說明,# Spark,大數(shù)據(jù),spark,大數(shù)據(jù)
這些動作和轉(zhuǎn)換構(gòu)成了一個Spark查詢計劃,在調(diào)用操作之前,查詢計劃中不會執(zhí)行任何內(nèi)容。下面的示例用Python和Scala顯示,有兩個轉(zhuǎn)換——read()filter()——和一個立即執(zhí)行操作 count()。該操作觸發(fā)了作為查詢執(zhí)行計劃的一部分記錄的所有轉(zhuǎn)換的執(zhí)行。在這個例子中,在shell中執(zhí)行filter .count()之前什么都不會發(fā)生:

# In Python 
>>> strings = spark.read.text("../README.md")
>>> filtered = strings.filter(strings.value.contains("Spark"))
>>> filtered.count()
20
// In Scala
scala> import org.apache.spark.sql.functions._
scala> val strings = spark.read.text("../README.md")
scala> val filtered = strings.filter(col("value").contains("Spark"))
scala> filtered.count()
res5: Long = 20s

窄變換和寬變換

如前所述,轉(zhuǎn)換是Spark 延遲計算的操作。延遲求值方案的一個巨大優(yōu)勢是,Spark可以檢查你的計算性查詢,并確定如何優(yōu)化它。這種優(yōu)化可以通過連接或管道化一些操作并將它們分配到一個階段來完成,或者通過確定哪些操作需要跨集群的shuffle或數(shù)據(jù)交換來將它們分解為階段來完成。
轉(zhuǎn)換可以分為窄依賴關(guān)系寬依賴關(guān)系。任何可以從單個輸入分區(qū)計算單個輸出分區(qū)的轉(zhuǎn)換都是窄轉(zhuǎn)換。例如,在前面的代碼片段中,filter()contains()表示狹窄的轉(zhuǎn)換,因為它們可以在單個分區(qū)上操作并生成結(jié)果輸出分區(qū),而無需交換任何數(shù)據(jù)。
但是,groupBy()orderBy()之類的轉(zhuǎn)換會指示Spark執(zhí)行寬轉(zhuǎn)換,其中來自其他分區(qū)的數(shù)據(jù)被讀入、合并并寫入磁盤。如果我們要通過調(diào)用.orderby()對前面示例中filtered后的DataFrame進行排序,那么每個分區(qū)都將在本地排序,但是我們需要強制對集群中每個執(zhí)行器分區(qū)中的數(shù)據(jù)進行過濾,以便對所有記錄進行排序。與窄轉(zhuǎn)換相比,寬轉(zhuǎn)換需要其他分區(qū)的輸出來計算最終的聚合。
下圖說明了兩種類型的依賴關(guān)系:
Spark 部署與應(yīng)用程序交互簡單使用說明,# Spark,大數(shù)據(jù),spark,大數(shù)據(jù)

Spark UI

Spark包含一個圖形用戶界面,可以使用它來檢查或監(jiān)視Spark應(yīng)用程序的各個分解階段(即job、state 和 tasks)。根據(jù)Spark的部署方式,驅(qū)動程序啟動一個web UI,默認在端口4040上運行,可以在其中查看指標和詳細信息,例如:

  • 調(diào)度 stages 和 tasks 的列表
  • RDD大小和內(nèi)存使用的概要描述
  • 運行環(huán)境相關(guān)信息
  • 正在運行的 executors 信息
  • 所有的Spark SQL 查詢

在本地模式下,可以通過瀏覽器http://:4040訪問該接口。

NOTE
當啟動spark-shell時,輸出日志部分會顯示要在端口4040上訪問的本地主機URL。

讓我們看一下前邊的Python示例是如何轉(zhuǎn)換為job、stage 和 tasks的。要查看DAG的外觀,單擊web UI中的“DAG可視化”。如下圖所示,Driver 創(chuàng)建了一個 job 和一個 stage:
Spark 部署與應(yīng)用程序交互簡單使用說明,# Spark,大數(shù)據(jù),spark,大數(shù)據(jù)
注意,這里不需要Exchange(執(zhí)行器之間交換數(shù)據(jù)的地方),因為只有一個階段。每個單獨的操作用藍框表示。
stage 0由一個task 組成。如果你有多個任務(wù),它們將并行執(zhí)行。在“stages”頁中可以查看各個stage的詳細信息,如下圖所示:
Spark 部署與應(yīng)用程序交互簡單使用說明,# Spark,大數(shù)據(jù),spark,大數(shù)據(jù)
在后續(xù)的文章系列中,我會詳細介紹 UI 界面的使用,這里先只做個簡單的介紹。

單機的應(yīng)用程序

Spark發(fā)行版為每個Spark組件提供了一組示例應(yīng)用程序。
從本地機器上的安裝目錄中,可以運行使用該命令提供的幾個Java或Scala示例程序之一:
bin/run-example _<class> [params]_

$ ./bin/run-example JavaWordCount README.md

這將在控制臺上INFO 信息中輸出 README.md 文件中每個單詞的列表及其計數(shù)(計數(shù)單詞是分布式計算的“Hello, World”)。

計算巧克力豆的數(shù)量

在前面的例子中,我們統(tǒng)計了文件中的單詞。如果文件很大,它將分布在一個被劃分為小數(shù)據(jù)塊的集群上,我們的Spark程序?qū)⒎峙溆嬎忝總€分區(qū)中每個單詞的任務(wù),并返回最終的聚合計數(shù),但這個例子已經(jīng)有點過時了。
讓我們來解決一個類似的問題,但是使用更大的數(shù)據(jù)集,并使用更多Spark的分發(fā)功能和DataFrame api。
如下圖有很多巧克力豆的餅干,我們需要將這些不同顏色的巧克力豆分配給不同的人。
Spark 部署與應(yīng)用程序交互簡單使用說明,# Spark,大數(shù)據(jù),spark,大數(shù)據(jù)
讓我們編寫一個Spark程序,讀取一個包含超過100,000個條目的文件(其中每行或每行都有一個<state, mnm_color, count>),并計算和匯總每種顏色和狀態(tài)的計數(shù)。這些匯總的計數(shù)告訴我們每個人喜歡的m&m巧克力豆的顏色。下邊給出了完整的 Python 代碼:

# Import the necessary libraries.
# Since we are using Python, import the SparkSession and related functions
# from the PySpark module.
import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: mnmcount <file>", file=sys.stderr)
        sys.exit(-1)

    # Build a SparkSession using the SparkSession APIs.
    # If one does not exist, then create an instance. There
    # can only be one SparkSession per JVM.
    spark = (SparkSession
             .builder
             .appName("PythonMnMCount")
             .getOrCreate())
    # Get the M&M data set filename from the command-line arguments
    mnm_file = sys.argv[1]
    # Read the file into a Spark DataFrame using the CSV
    # format by inferring the schema and specifying that the
    # file contains a header, which provides column names for comma-
    # separated fields.
    mnm_df = (spark.read.format("csv") 
              .option("header", "true") 
              .option("inferSchema", "true") 
              .load(mnm_file))

    # We use the DataFrame high-level APIs. Note
    # that we don't use RDDs at all. Because some of Spark's 
    # functions return the same object, we can chain function calls.
    # 1. Select from the DataFrame the fields "State", "Color", and "Count"
    # 2. Since we want to group each state and its M&M color count,
    #    we use groupBy()
    # 3. Aggregate counts of all colors and groupBy() State and Color
    # 4  orderBy() in descending order
    count_mnm_df = (mnm_df
                    .select("State", "Color", "Count")
                    .groupBy("State", "Color")
                    .sum("Count")
                    .orderBy("sum(Count)", ascending=False))
    # Show the resulting aggregations for all the states and colors;
    # a total count of each color per state.
    # Note show() is an action, which will trigger the above
    # query to be executed.
    count_mnm_df.show(n=60, truncate=False)
    print("Total Rows = %d" % (count_mnm_df.count()))
    # While the above code aggregated and counted for all 
    # the states, what if we just want to see the data for 
    # a single state, e.g., CA? 
    # 1. Select from all rows in the DataFrame
    # 2. Filter only CA state
    # 3. groupBy() State and Color as we did above
    # 4. Aggregate the counts for each color
    # 5. orderBy() in descending order  
    # Find the aggregate count for California by filtering
    ca_count_mnm_df = (mnm_df
                       .select("State", "Color", "Count")
                       .where(mnm_df.State == "CA")
                       .groupBy("State", "Color")
                       .sum("Count")
                       .orderBy("sum(Count)", ascending=False))
    # Show the resulting aggregation for California.
    # As above, show() is an action that will trigger the execution of the
    # entire computation. 
    ca_count_mnm_df.show(n=10, truncate=False)
    # Stop the SparkSession
    spark.stop()

創(chuàng)建 mnmcount.py 文件,mnn_datasets .csv 文件數(shù)據(jù)集下載地址,并使用安裝的bin目錄中的submit- Spark腳本將其作為Spark作業(yè)提交。將SPARK_HOME環(huán)境變量設(shè)置為在本地機器上安裝Spark的根目錄。

NOTE
前面的代碼使用DataFrame API,讀起來像高級DSL查詢。我將在后續(xù)文章中介紹這個和其他api。與RDD API不同,你可以使用它來指示Spark做什么,而不是如何做,這是清晰和簡單的!

為了避免將詳細的INFO消息打印到控制臺中,請復制 log4j.properties.template 模板文件到 log4j.properties。并設(shè)置log4j.conf/log4j.conf文件中的rootCategory=WARN
執(zhí)行提交命令,提交上邊的 Pyhton 代碼至 Spark 集群:

$SPARK_HOME/bin/spark-submit mnmcount.py data/mnm_dataset.csv

-----+------+----------+
|State| Color|sum(Count)|
+-----+------+----------+
|   CA|Yellow|    100956|
|   WA| Green|     96486|
|   CA| Brown|     95762|
|   TX| Green|     95753|
|   TX|   Red|     95404|
|   CO|Yellow|     95038|
|   NM|   Red|     94699|
|   OR|Orange|     94514|
|   WY| Green|     94339|
|   NV|Orange|     93929|
|   TX|Yellow|     93819|
|   CO| Green|     93724|
|   CO| Brown|     93692|
|   CA| Green|     93505|
|   NM| Brown|     93447|
|   CO|  Blue|     93412|
|   WA|   Red|     93332|
|   WA| Brown|     93082|
|   WA|Yellow|     92920|
|   NM|Yellow|     92747|
|   NV| Brown|     92478|
|   TX|Orange|     92315|
|   AZ| Brown|     92287|
|   AZ| Green|     91882|
|   WY|   Red|     91768|
|   AZ|Orange|     91684|
|   CA|   Red|     91527|
|   WA|Orange|     91521|
|   NV|Yellow|     91390|
|   UT|Orange|     91341|
|   NV| Green|     91331|
|   NM|Orange|     91251|
|   NM| Green|     91160|
|   WY|  Blue|     91002|
|   UT|   Red|     90995|
|   CO|Orange|     90971|
|   AZ|Yellow|     90946|
|   TX| Brown|     90736|
|   OR|  Blue|     90526|
|   CA|Orange|     90311|
|   OR|   Red|     90286|
|   NM|  Blue|     90150|
|   AZ|   Red|     90042|
|   NV|  Blue|     90003|
|   UT|  Blue|     89977|
|   AZ|  Blue|     89971|
|   WA|  Blue|     89886|
|   OR| Green|     89578|
|   CO|   Red|     89465|
|   NV|   Red|     89346|
|   UT|Yellow|     89264|
|   OR| Brown|     89136|
|   CA|  Blue|     89123|
|   UT| Brown|     88973|
|   TX|  Blue|     88466|
|   UT| Green|     88392|
|   OR|Yellow|     88129|
|   WY|Orange|     87956|
|   WY|Yellow|     87800|
|   WY| Brown|     86110|
+-----+------+----------+

Total Rows = 60

+-----+------+----------+
|State| Color|sum(Count)|
+-----+------+----------+
|   CA|Yellow|    100956|
|   CA| Brown|     95762|
|   CA| Green|     93505|
|   CA|   Red|     91527|
|   CA|Orange|     90311|
|   CA|  Blue|     89123|
+-----+------+----------+

首先我們看到每個地區(qū)的人喜歡的顏色的聚合數(shù)據(jù),下邊是單個地區(qū)的。
下邊是 Scala 版本代碼運行相同的應(yīng)用程序:

package main.scala.chapter2

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
 * Usage: MnMcount <mnm_file_dataset>
 */
object MnMcount {
  def main(args: Array[String]) {
    val spark = SparkSession
    .builder
    .appName("MnMCount")
    .getOrCreate()

    if (args.length < 1) {
      print("Usage: MnMcount <mnm_file_dataset>")
      sys.exit(1)
    }
    // Get the M&M data set filename
    val mnmFile = args(0)
    // Read the file into a Spark DataFrame
    val mnmDF = spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(mnmFile)
    // Aggregate counts of all colors and groupBy() State and Color
    // orderBy() in descending order
    val countMnMDF = mnmDF
    .select("State", "Color", "Count")
    .groupBy("State", "Color")
    .sum("Count")
    .orderBy(desc("sum(Count)"))
    // Show the resulting aggregations for all the states and colors
    countMnMDF.show(60)
    println(s"Total Rows = ${countMnMDF.count()}")
    println()
    // Find the aggregate counts for California by filtering
    val caCountMnNDF = mnmDF
    .select("State", "Color", "Count")
    .where(col("State") === "CA")
    .groupBy("State", "Color")
    .sum("Count")
    .orderBy(desc("sum(Count)"))
    // Show the resulting aggregations for California
    caCountMnMDF.show(10)
    // Stop the SparkSession
    spark.stop()
  }
}

單機編譯 Scala 程序

下邊將說明如何使用Scala構(gòu)建工具(sbt)構(gòu)建一個Scala Spark程序。
build.sbt 是規(guī)范文件,與makefile類似,它描述并指示Scala編譯器構(gòu)建與Scala相關(guān)的任務(wù),例如 jar 包、packages、要解析的依賴項以及在哪里查找它們。下邊是一個簡單構(gòu)建的例子:

// Name of the package
name := "main/scala/chapter2"
// Version of our package
version := "1.0"
// Version of Scala
scalaVersion := "2.12.10"
// Spark library dependencies
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.0.0-preview2",
  "org.apache.spark" %% "spark-sql"  % "3.0.0-preview2"
)

確保已經(jīng)安裝了Java開發(fā)工具包(JDK)和sbt,并設(shè)置了JAVA_HOME和SPARK_HOME,用一個命令,就可以構(gòu)建Spark應(yīng)用程序:

$ sbt clean package
[info] Updated file /Users/julesdamji/gits/LearningSparkV2/chapter2/scala/
project/build.properties: set sbt.version to 1.2.8
[info] Loading project definition from /Users/julesdamji/gits/LearningSparkV2/
chapter2/scala/project
[info] Updating 
[info] Done updating.
...
[info] Compiling 1 Scala source to /Users/julesdamji/gits/LearningSparkV2/
chapter2/scala/target/scala-2.12/classes ...
[info] Done compiling.
[info] Packaging /Users/julesdamji/gits/LearningSparkV2/chapter2/scala/target/
scala-2.12/main-scala-chapter2_2.12-1.0.jar ...
[info] Done packaging.
[success] Total time: 6 s, completed Jan 11, 2020, 4:11:02 PM

成功構(gòu)建后,您可以運行Scala版本的計數(shù)示例,如下所示:

$SPARK_HOME/bin/spark-submit --class main.scala.chapter2.MnMcount \ 
jars/main-scala-chapter2_2.12-1.0.jar data/mnm_dataset.csv
...
...
20/01/11 16:00:48 INFO TaskSchedulerImpl: Killing all running tasks in stage 4: 
Stage finished
20/01/11 16:00:48 INFO DAGScheduler: Job 4 finished: show at MnMcount.scala:49, 
took 0.264579 s
+-----+------+-----+
|State| Color|Total|
+-----+------+-----+
|   CA|Yellow| 1807|
|   CA| Green| 1723|
|   CA| Brown| 1718|
|   CA|Orange| 1657|
|   CA|   Red| 1656|
|   CA|  Blue| 1603|
+-----+------+-----+

總結(jié)

在本章中,我們介紹了開始使用Apache Spark所需的三個簡單步驟:下載框架,熟悉Scala或PySpark交互shell,掌握高級Spark應(yīng)用程序概念和術(shù)語。我們快速概述了使用轉(zhuǎn)換和操作來編寫Spark應(yīng)用程序的過程,并簡要介紹了使用Spark UI來檢查所創(chuàng)建的job、stage和task。
最后,通過一個簡短的示例,展示了如何使用高級結(jié)構(gòu)化api來告訴Spark要做什么——在下一篇文章我將更詳細地介紹這些api。文章來源地址http://www.zghlxwxcb.cn/news/detail-849275.html

到了這里,關(guān)于Spark 部署與應(yīng)用程序交互簡單使用說明的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 事件驅(qū)動編程:如何在應(yīng)用程序中處理用戶輸入和交互

    [toc] 引言 1.1. 背景介紹 隨著互聯(lián)網(wǎng)技術(shù)的快速發(fā)展,應(yīng)用程序被廣泛應(yīng)用于人們的生活和工作中。在這些應(yīng)用程序中,用戶輸入和交互是必不可少的組成部分。如何優(yōu)雅地處理用戶輸入和交互,讓應(yīng)用程序更加符合用戶的使用習慣,是擺在每個程序員面前的一個重要問題。

    2024年02月07日
    瀏覽(33)
  • 微信小程序前后端交互與WXS的應(yīng)用

    微信小程序前后端交互與WXS的應(yīng)用

    目錄 前言 一、后臺數(shù)據(jù)交互 1.數(shù)據(jù)表 2.后端代碼的實現(xiàn) 3.前后端交互 3.1.后端接口URL管理 3.2.發(fā)送后端請求 3.3.請求方式的封裝 4.前端代碼的編寫 二、WXS的使用 1、.wxs 文件 2.綜合運用 當今社交媒體的普及使得微信小程序成為了一種流行的應(yīng)用開發(fā)形式。微信小程序不僅可以

    2024年02月08日
    瀏覽(15)
  • Intellij IDEA編寫Spark應(yīng)用程序的環(huán)境配置和操作步驟

    Intellij IDEA編寫Spark應(yīng)用程序的環(huán)境配置和操作步驟

    本文介紹如何在win系統(tǒng)中使用IDEA開發(fā)spark應(yīng)用程序,并將其打成jar包上傳到虛擬機中的三個Ubuntu系統(tǒng),然后在分布式環(huán)境中運行。 主要步驟包括: 安裝Scala插件:在Intellij IDEA中安裝Scala插件,并重啟IDEA。 創(chuàng)建Maven項目:在Intellij IDEA中創(chuàng)建一個Maven項目,選擇Scala語言,并添加

    2024年02月12日
    瀏覽(12)
  • spark-shell(pyspark)單機模式使用和編寫?yīng)毩?yīng)用程序

    spark-shell(pyspark)單機模式使用和編寫?yīng)毩?yīng)用程序

    spark有四種部署方式:Local,Standalone,Spark on Mesos,Spark on yarn。第一個為單機模式,后三個為集群模式。 spark-shell支持python和scala,這里使用python。 1.啟動pyspark環(huán)境 在spark安裝目錄下 進入之后,如下圖:? 2.編寫程序 新建代碼文件WordCount.py,并編寫程序 運行代碼:python3 Wor

    2024年04月14日
    瀏覽(25)
  • C# 利用 UI 自動化框架與應(yīng)用程序的用戶界面進行交互來模擬點擊按鈕

    ①需要引入命名空間: using System.Windows.Automation; ②添加兩個引用: UIAutomationClient、UIAutomationTypes 當程序已經(jīng)啟動時, AutoClickLoginButton 方法會尋找名為\\\"FR\\\"的應(yīng)用程序進程。然后,它使用 AutomationElement.FromHandle 從該進程的主窗口句柄獲取根元素。 接著, FindLoginButton 方法被調(diào)用

    2024年01月25日
    瀏覽(30)
  • Windows應(yīng)用程序基礎(chǔ)(詳細,簡單易懂)

    Windows應(yīng)用程序基礎(chǔ)(詳細,簡單易懂)

    大家好!這是我的 第一篇博客 ,首先來個自我介紹吧!我是來自一所雙非院校的大一新生,所學的專業(yè)是 計算機科學與技術(shù) ,純純小白一枚。 那我為什么要寫博客呢 ?為什么?說實話,我一開始一直都沒有寫博客的想法,我都是看別人的博客,直到有一天,我的好同學

    2024年02月04日
    瀏覽(22)
  • 一個簡單的web應(yīng)用程序的創(chuàng)建

    一個簡單的web應(yīng)用程序的創(chuàng)建

    實體、屬性、關(guān)系 簡單介紹 簡單介紹

    2024年02月11日
    瀏覽(32)
  • 應(yīng)用程序部署方式演變

    應(yīng)用程序部署方式演變

    1.傳統(tǒng)部署 互聯(lián)網(wǎng)早期,會直接將應(yīng)用程序部署在物理機上 優(yōu)點:簡單,不需要其他技術(shù)的參與 缺點:不能為應(yīng)用程序定義資源使用邊界,很難合理地分配計算資源,而且程序之間容易產(chǎn)生影響。 2.虛擬化部署 可以在一臺物理機上運行多個虛擬機,每個虛擬機都是獨立的一

    2024年02月15日
    瀏覽(23)
  • .NET 應(yīng)用程序 部署

    .NET 應(yīng)用程序 部署

    **硬件支持型號 ?點擊 查看 硬件支持 詳情** DTU701?產(chǎn)品詳情 DTU702 產(chǎn)品詳情 DTU801?產(chǎn)品詳情 DTU802 產(chǎn)品詳情 DTU902?產(chǎn)品詳情 G5501 產(chǎn)品詳情 本文內(nèi)容 在設(shè)備上部署 dotnet應(yīng)用,與任何其他平臺的部署相同,可以2種方式: 依賴于框架的應(yīng)用 獨立應(yīng)用 2種方式各有優(yōu)勢 依賴于框架

    2024年02月13日
    瀏覽(25)
  • Qt實現(xiàn)一個簡單的應(yīng)用程序——桌面助手

    Qt實現(xiàn)一個簡單的應(yīng)用程序——桌面助手

    1、實現(xiàn)不同功能之間的界面切換 2、可查看日歷 3、可實現(xiàn)計時器功能 4、可實現(xiàn)計算器功能 5、ui界面及按鈕部件背景的設(shè)置 6、為軟件設(shè)置圖標 7、程序打包成軟件 ? ? ? 1、創(chuàng)建工程 ? ? ? ? New Project - Application - Qt Widgets Application然后下一步,使用ui界面文件能省去很多步驟

    2024年02月08日
    瀏覽(66)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包