之前筆者參加了公司內(nèi)部舉辦的一個 Big Data Workshop,接觸了一些 Spark 的皮毛,后來在工作中陸陸續(xù)續(xù)又學(xué)習(xí)了一些 Spark 的實戰(zhàn)知識。
本文筆者從小白的視角出發(fā),給大家普及 Spark 的應(yīng)用知識。
什么是 Spark
Spark
集群是基于 Apache Spark
的分布式計算環(huán)境,用于處理大規(guī)模數(shù)據(jù)集
的計算任務(wù)。Apache Spark
是一個開源的、快速而通用的集群計算系統(tǒng),提供了高級的數(shù)據(jù)處理接口,包括 Spark SQL
、Spark Streaming
、MLlib
(機器學(xué)習(xí)庫)和GraphX
(圖計算庫)。Spark
的一個主要特點是能夠在內(nèi)存中進行數(shù)據(jù)處理,從而大大加速計算速度。
Scala
編程語言是 Spark
的首選編程語言之一。Spark
最初是用 Scala
編寫的,而且 Scala
具有強大的靜態(tài)類型系統(tǒng)和函數(shù)式編程特性,使其成為 Spark
的理想選擇。Spark
支持多種編程語言,包括 Java
、Python
和 R
,但 Scala
在 Spark
社區(qū)中仍然占據(jù)重要地位。
什么是 RDD?它在 Spark 架構(gòu)中扮演著怎樣的角色?
提到 Spark 就不能不提到 RDD.
Spark 架構(gòu)中的RDD(Resilient Distributed Dataset,彈性分布式數(shù)據(jù)集)是一種基本的數(shù)據(jù)結(jié)構(gòu),它在 Spark 分布式計算中扮演著關(guān)鍵的角色。RDD 是 Spark 的核心抽象,它提供了一種容錯的、可并行處理的數(shù)據(jù)結(jié)構(gòu),用于在集群中存儲和操作數(shù)據(jù)。
RDD 將數(shù)據(jù)劃分為多個分區(qū),這些分區(qū)可以并行地在集群中進行處理。RDD 提供了一種高度抽象的數(shù)據(jù)處理接口,使得開發(fā)者可以方便地執(zhí)行并行計算任務(wù)。
RDD 顧名思義,具有下面這些特性:
-
彈性(Resilient):RDD 具有容錯性,即使在節(jié)點故障時也能夠自動從先前的轉(zhuǎn)換中恢復(fù)。這通過 RDD 的依賴信息和轉(zhuǎn)換操作日志實現(xiàn),使得 Spark 能夠在節(jié)點失敗時重新計算丟失的數(shù)據(jù)。
-
分布式(Distributed):RDD 將數(shù)據(jù)劃分為多個分區(qū),并在集群中分布存儲這些分區(qū)。這樣,計算可以在分布式環(huán)境中并行執(zhí)行,提高了處理速度。
-
不可變(Immutable):RDD 是不可變的數(shù)據(jù)結(jié)構(gòu),一旦創(chuàng)建就不能被修改。這確保了數(shù)據(jù)的一致性,并簡化了并行計算的實現(xiàn)。
RDD 實戰(zhàn)(一):平方和的計算
我們通過一個計算整數(shù)集合平方和的簡單例子,來學(xué)習(xí) RDD 的實戰(zhàn)。
首先,我們創(chuàng)建一個RDD:
`data = [1, 2, 3, 4, 5]`
`rdd = sparkContext.parallelize(data)`
接下來,我們可以使用轉(zhuǎn)換操作對 RDD 執(zhí)行平方操作:
`squared_rdd = rdd.map(lambda x: x ** 2)`
現(xiàn)在,我們得到了一個新的 RDD squared_rdd
,它包含了原始 RDD 中每個元素的平方。最后,我們可以使用行動操作計算平方和:
`result = squared_rdd.reduce(lambda x, y: x + y)`
在這個例子中,RDD 允許我們以并行的方式對數(shù)據(jù)執(zhí)行轉(zhuǎn)換和計算操作,而不需要顯式的循環(huán)或迭代。同時,RDD 的容錯性確保了在計算過程中節(jié)點失敗時的可靠性。
RDD 實戰(zhàn)(二):統(tǒng)計 text 文件中每個單詞的出現(xiàn)次數(shù)
有了前面的基礎(chǔ),我們再來完成一個稍微復(fù)雜一些的大數(shù)據(jù)分析任務(wù)。
我用 Java 編寫了一個應(yīng)用程序,這個 Java 應(yīng)用接收一個輸入?yún)?shù),該參數(shù)代表一個 text 文件的絕對路徑,這個 text 文件的內(nèi)容是一本英文小說。
這個 Java 應(yīng)用,可以使用 Spark RDD 的 API,來高效統(tǒng)計 text 文件里,每個單詞的出現(xiàn)頻次。
完整的可運行的 Java 代碼如下:
package org.apache.spark.examples;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
// Maven source
public final class JavaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
@SuppressWarnings({ "resource", "serial" })
public static void main(String[] args) throws Exception {
if (args.length < 1) {
System.err.println("Usage: JavaWordCount <file>");
System.exit(1);
}
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile(args[0], 1);
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String s) {
return Arrays.asList(SPACE.split(s));
}
});
JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
List<Tuple2<String, Integer>> output = counts.collect();
for (Tuple2<?, ?> tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());
}
ctx.stop();
}
}
package org.apache.spark.examples;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
// Maven source
public final class JavaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
@SuppressWarnings({ "resource", "serial" })
public static void main(String[] args) throws Exception {
if (args.length < 1) {
System.err.println("Usage: JavaWordCount <file>");
System.exit(1);
}
SparkConf sparkConf = new SparkConf().setAppName(`JavaWordCount`);
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile(args[0], 1);
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String s) {
return Arrays.asList(SPACE.split(s));
}
});
JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
List<Tuple2<String, Integer>> output = counts.collect();
for (Tuple2<?, ?> tuple : output) {
System.out.println(tuple._1() + `: ` + tuple._2());
}
ctx.stop();
}
}
這段 Java 程序從 Apache Spark 中導(dǎo)入 RDD API:org.apache.spark.api.java.JavaRDD, 然后進行下面的邏輯:
- 定義一個正則表達式模式
SPACE
,用于按空格分隔單詞。 - 創(chuàng)建一個JavaSparkContext對象
ctx
,它是Spark的入口點,用于連接到集群。 - 通過命令行參數(shù)獲取輸入文件路徑,如果參數(shù)數(shù)量小于1,則打印用法說明并退出程序。
- 創(chuàng)建一個SparkConf對象
sparkConf
,設(shè)置應(yīng)用程序名稱為 “JavaWordCount”。 - 使用
ctx.textFile
讀取輸入文件,將每一行作為一個元素組成的RDD(Resilient Distributed Dataset)。 - 使用
flatMap
操作將每行文本拆分為單詞,并生成一個包含所有單詞的新RDDwords
。 - 使用
mapToPair
操作將每個單詞映射為鍵值對(單詞, 1),生成新的Pair RDDones
。 - 使用
reduceByKey
操作對相同鍵的值進行累加,得到最終的單詞計數(shù)結(jié)果,生成新的Pair RDDcounts
。 - 使用
collect
操作將結(jié)果收集到Driver程序中,得到一個包含單詞和計數(shù)的列表output
。 - 遍歷輸出列表,將結(jié)果打印到控制臺。
- 停止SparkContext,釋放資源。
將這個 Java 程序編譯成 .class 文件后,使用下面的命令行,將該 class 文件包含的 RDD 計算邏輯,以 Job 的形式,提交到 spark 集群上:
./spark-submit --class "org.apache.spark.examples.JavawordCount" --master spark://NKGV50849583FV1:7077 /root/devExpert/spark-1.4.l
1/bin/test.txt
命令行里的 spark://NKGV50849583FV1:7077
是我在一臺 Linux 服務(wù)器上安裝的 Spark 集群,如下圖所示:
文章來源:http://www.zghlxwxcb.cn/news/detail-824704.html
至此,我們完成了通過 Spark RDD 進行大數(shù)據(jù)處理分析的一個實際需求。文章來源地址http://www.zghlxwxcb.cn/news/detail-824704.html
到了這里,關(guān)于Spark 大數(shù)據(jù)實戰(zhàn):基于 RDD 的大數(shù)據(jù)處理分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!