每日一句正能量
人生很長(zhǎng),不必慌張。你未長(zhǎng)大,我要擔(dān)當(dāng)。
第3章 Spark RDD彈性分布式數(shù)據(jù)集
章節(jié)概要
傳統(tǒng)的MapReduce雖然具有自動(dòng)容錯(cuò)、平衡負(fù)載和可拓展性的優(yōu)點(diǎn),但是其最大缺點(diǎn)是采用非循環(huán)式的數(shù)據(jù)流模型,使得在迭代計(jì)算式要進(jìn)行大量的磁盤IO操作。Spark中的RDD可以很好的解決這一缺點(diǎn)。
RDD是Spark提供的最重要的抽象概念,我們可以將RDD理解為一個(gè)分布式存儲(chǔ)在集群中的大型數(shù)據(jù)集合,不同RDD之間可以通過轉(zhuǎn)換操作形成依賴關(guān)系實(shí)現(xiàn)管道化,從而避免了中間結(jié)果的I/O操作,提高數(shù)據(jù)處理的速度和性能。接下來,本章將針對(duì)RDD進(jìn)行詳細(xì)講解。
3.3 RDD的處理過程
Spark用Scala語言實(shí)現(xiàn)了RDD的API,程序開發(fā)者可以通過調(diào)用API對(duì)RDD進(jìn)行操作處理。RDD經(jīng)過一系列的“轉(zhuǎn)換”操作,每一次轉(zhuǎn)換都會(huì)產(chǎn)生不同的RDD,以供給下一次“轉(zhuǎn)換”操作使用,直到最后一個(gè)RDD經(jīng)過“行動(dòng)”操作才會(huì)被真正計(jì)算處理,并輸出到外部數(shù)據(jù)源中,若是中間的數(shù)據(jù)結(jié)果需要復(fù)用,則可以進(jìn)行緩存處理,將數(shù)據(jù)緩存到內(nèi)存中。
Spark用Scala語言實(shí)現(xiàn)了RDD的API,程序開發(fā)者可以通過調(diào)用API對(duì)RDD進(jìn)行操作處理。下面,通過一張圖來描述RDD的處理過程。
RDD經(jīng)過一系列的"轉(zhuǎn)換”操作,每一次轉(zhuǎn)換都會(huì)產(chǎn)生不同的RDD,以供給下一次轉(zhuǎn)換”操作使用,直到最后一個(gè)RDD經(jīng)過“行動(dòng)”操作才會(huì)被真正計(jì)算處理。
需要注意的是,RDD采用了惰性調(diào)用,即在RDD的處理過程中,真正的計(jì)算發(fā)生在RDD的"行動(dòng)”操作,對(duì)于"行動(dòng)"之前的所有"轉(zhuǎn)換"操作,Spark只是記錄下“轉(zhuǎn)換”操作應(yīng)用的一些基礎(chǔ)數(shù)據(jù)集以及RDD相互之間的依賴關(guān)系,而不會(huì)觸發(fā)真正的計(jì)算處理。
3.3.1 轉(zhuǎn)換算子
RDD處理過程中的“轉(zhuǎn)換”操作主要用于根據(jù)已有RDD創(chuàng)建新的RDD,每一次通過Transformation算子計(jì)算后都會(huì)返回一個(gè)新RDD,供給下一個(gè)轉(zhuǎn)換算子使用。下面,通過一張表來列舉一些常用轉(zhuǎn)換算子操作的API,具體如下。
下面,我們通過結(jié)合具體的示例對(duì)這些轉(zhuǎn)換算子API進(jìn)行詳細(xì)講解。
- filter(func)
filter(func)操作會(huì)篩選出滿足函數(shù)func的元素,并返回一個(gè)新的數(shù)據(jù)集。假設(shè),有一個(gè)文件test.txt(內(nèi)容如前面所示),下面,通過一張圖來描述如何通過filter算子操作,篩選出包含單詞“spark”的元素。
通過從test.txt文件中加載數(shù)據(jù)的方式創(chuàng)建RDD,然后通過map操作將文件的每一行內(nèi)容都拆分成一個(gè)個(gè)的單詞元素,這些元素組成的集合是一個(gè)新的RDD。接下來,通過代碼來進(jìn)行演示,具體代碼如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[6] at textFile at <console>:24
scala> val linesWithSpark=lines.filter(line=>line.contains("spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at filter at <console>:25
具體步驟如下:
1.進(jìn)入到hadoop01,進(jìn)入/export/data目錄,命令如下cd /export/data
2.修改test.txt文件的內(nèi)容與源數(shù)據(jù)保持一致(vi test.txt
)。
hadoop spark
itcast heima
scala spark
spark itcast
iscast hadoop
3.進(jìn)入到spark shell(參考之前的啟動(dòng))。
cd export/servers/spark/
bin /spark-shell --master local[2]
4.加載文件并產(chǎn)生RDD,代碼如下。
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[6] at textFile at <console>:24
scala> val linesWithSpark=lines.filter(line=>line.contains("spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at filter at <console>:25
結(jié)果如下圖所示
- map(func)
map(func)操作將每個(gè)元素傳遞到函數(shù)func中,并將結(jié)果返回為一個(gè)新的數(shù)據(jù)集。假設(shè),有一個(gè)文件test.txt,接下來,通過一張圖來描述如何通過map算子操作把文件內(nèi)容拆分成一個(gè)個(gè)的單詞并封裝在數(shù)組對(duì)象中,具體過程如下圖所示。
通過從test.txt文件中加載數(shù)據(jù)的方式創(chuàng)建RDD,然后通過map操作將文件的每一行內(nèi)容都拆分成一個(gè)個(gè)的單詞元素,這些元素組成的集合是一個(gè)新的RDD。接下來,通過代碼來進(jìn)行演示,具體代碼如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[9] at textFile at <console>:24
scala> var words=lines.map(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[10] at map at <console>:25
結(jié)果如下所示:
- flatMap(func)
flatMap(func)與map(func)相似,但是每個(gè)輸入的元素都可以映射到0或者多個(gè)輸出的結(jié)果。有一個(gè)文件test.txt,接下來,通過一張圖來描述如何通過flatMap算子操作,把文件內(nèi)容拆分成一個(gè)個(gè)的單詞。具體過程如下圖所示。
通過從test.txt文件中加載數(shù)據(jù)的方式創(chuàng)建RDD,然后通過flatMap操作將文件的每一行內(nèi)容都拆分成一個(gè)個(gè)的單詞元素,這些元素組成的集合是一個(gè)新的RDD。接下來,通過代碼來進(jìn)行演示,具體如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[12] at textFile at <console>:24
scala> val words=lines.flatMap(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:25
結(jié)果如下所示:
- groupByKey()
groupByKey()主要用于(Key,Value)鍵值對(duì)的數(shù)據(jù)集,將具有相同Key的Value進(jìn)行分組,會(huì)返回一個(gè)新的(Key ,lterable)形式的數(shù)據(jù)集。同樣以文件test.txt為例,接下來,通過一張圖來描述如何通過groupByKey算子操作,將文件內(nèi)容中的所有單詞進(jìn)行分組。具體過程如下圖所示。
通過groupByKey操作把(Key,Value))鍵值對(duì)類型的RDD,按單詞將單詞出現(xiàn)的次數(shù)進(jìn)行分組,這些元素組成的集合是一個(gè)新的RDD。接下來,通過代碼來進(jìn)行演示,具體代碼如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[15] at textFile at <console>:24
scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[17] at map at <console>:25
scala> val groupWords=words.groupByKey()
groupWords: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[18] at groupByKey at <console>:25
結(jié)果如下所示:
- reduceByKey(func)
reduceByKey()主要用于(Key,Value)鍵值對(duì)的數(shù)據(jù)集,返回的是一個(gè)新的(Key,Iterable)形式的數(shù)據(jù)集,該數(shù)據(jù)集是每個(gè)Key傳遞給函數(shù)func進(jìn)行聚合運(yùn)算后得到的結(jié)果。同樣以文件test.txt,接下來,通過一張圖來描述如何通過reduceByKey算子操作統(tǒng)計(jì)單詞出現(xiàn)的次數(shù)。具體過程如下圖所示。
通過groupByKey操作把(Key,Value)鍵值對(duì)類型的RDD,按單詞將單詞出現(xiàn)的次數(shù)進(jìn)行分組,這些元素組成的集合是一個(gè)新的RDD。接下來,通過代碼來進(jìn)行演示,具體代碼如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[20] at textFile at <console>:24
scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:25
scala> var reduceWords=words.reduceByKey((a,b)=>a+b)
reduceWords: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:25
結(jié)果如下所示:
3.3.2 行動(dòng)算子
行動(dòng)算子主要是將在數(shù)據(jù)集上運(yùn)行計(jì)算后的數(shù)值返回到驅(qū)動(dòng)程序,從而觸發(fā)真正的計(jì)算。下面,通過一張表來列舉一些常用行動(dòng)算子操作的API,具體如下。
下面,結(jié)合具體的示例對(duì)這些行動(dòng)算子API進(jìn)行詳細(xì)講解。
- count ()
count()主要用于返回?cái)?shù)據(jù)集中的元素個(gè)數(shù)。假設(shè),現(xiàn)有一個(gè)arrRdd,如果要統(tǒng)計(jì)arrRdd元素的個(gè)數(shù),示例代碼如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.count()
res0: Long = 5
- first()
first()主要用于返回教組的第一個(gè)元素?,F(xiàn)有一個(gè)arrRdd,如果要獲取arrRdd中第一個(gè)元素,示例代碼如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.first()
res1: Int = 1
從上述結(jié)果可以看出,當(dāng)執(zhí)行arrRdd.first()操作后返回的結(jié)果是1,說明成功獲取到了第1個(gè)元素。
- take()
take()主要用于以數(shù)組的形式返回?cái)?shù)組集中的前n個(gè)元素。現(xiàn)有一個(gè)arrRdd,如果要獲取arrRdd中的前三個(gè)元素,示例代碼如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.take(3)
res2: Array[Int] = Array(1, 2, 3)
從上述代碼可以看出,執(zhí)行arrRdd.take(3)操作后返回的結(jié)果是Array(1,2,3),說明成功獲取到了RDD數(shù)據(jù)集的前3個(gè)元素。
- reduce(func)
reduce()主要用于通過函數(shù)func(輸入兩個(gè)參數(shù)并返回一個(gè)值)聚合數(shù)據(jù)集中的元素?,F(xiàn)有一個(gè)arrRdd,如果要對(duì)arrRdd中的元素進(jìn)行聚合,示例代碼如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.reduce((a,b)=>a+b)
res3: Int = 15
- collect()
collect()主要用于以數(shù)組的形式返回?cái)?shù)據(jù)集中的所有元素?,F(xiàn)有一個(gè)rdd,如果希望rdd中的元素以數(shù)組的形式輸出,示例代碼如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.collect()
res4: Array[Int] = Array(1, 2, 3, 4, 5)
- foreach(func)
foreach()主要用于將數(shù)據(jù)集中的每個(gè)元素傳遞到函數(shù)func中運(yùn)行?,F(xiàn)有一個(gè)arrRdd,如果希望遍歷輸出arrRdd中的元素,示例代碼如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.foreach(x=>println(x))
1
2
3
4
5
3.3.3 編寫WordCount詞頻統(tǒng)計(jì)案例
在Linux本地系統(tǒng)的/export/data目錄下,有一個(gè)test.txt文件,文件里有多行文本,每行文本都是由2個(gè)單詞構(gòu)成,且單詞之間都是用空格分隔?,F(xiàn)在,我們需要通過RDD統(tǒng)計(jì)每個(gè)單詞出現(xiàn)的次數(shù)(即詞頻),具體操作過程如下。
具體參見書本內(nèi)容文章來源:http://www.zghlxwxcb.cn/news/detail-835565.html
轉(zhuǎn)載自:https://blog.csdn.net/u014727709/article/details/136032993
歡迎 ??點(diǎn)贊?評(píng)論?收藏,歡迎指正文章來源地址http://www.zghlxwxcb.cn/news/detail-835565.html
到了這里,關(guān)于Spark大數(shù)據(jù)分析與實(shí)戰(zhàn)筆記(第三章 Spark RDD 彈性分布式數(shù)據(jù)集-02)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!