零、本講學(xué)習(xí)目標(biāo)
- 了解RDD容錯機(jī)制
- 理解RDD檢查點機(jī)制的特點與用處
- 理解共享變量的類別、特點與使用
一、RDD容錯機(jī)制
- 當(dāng)Spark集群中的某一個節(jié)點由于宕機(jī)導(dǎo)致數(shù)據(jù)丟失,則可以通過Spark中的RDD進(jìn)行容錯恢復(fù)已經(jīng)丟失的數(shù)據(jù)。RDD提供了兩種故障恢復(fù)的方式,分別是血統(tǒng)(Lineage)方式和設(shè)置檢查點(checkpoint)方式。
(一)血統(tǒng)方式
- 根據(jù)RDD之間依賴關(guān)系對丟失數(shù)據(jù)的RDD進(jìn)行數(shù)據(jù)恢復(fù)。若丟失數(shù)據(jù)的子RDD進(jìn)行窄依賴運(yùn)算,則只需要把丟失數(shù)據(jù)的父RDD的對應(yīng)分區(qū)進(jìn)行重新計算,不依賴其他節(jié)點,并且在計算過程中不存在冗余計算;若丟失數(shù)據(jù)的RDD進(jìn)行寬依賴運(yùn)算,則需要父RDD所有分區(qū)都要進(jìn)行從頭到尾計算,計算過程中存在冗余計算。
(二)設(shè)置檢查點方式
- 本質(zhì)是將RDD寫入磁盤存儲。當(dāng)RDD進(jìn)行寬依賴運(yùn)算時,只要在中間階段設(shè)置一個檢查點進(jìn)行容錯,即Spark中的sparkContext調(diào)用setCheckpoint()方法,設(shè)置容錯文件系統(tǒng)目錄作為檢查點checkpoint,將checkpoint的數(shù)據(jù)寫入之前設(shè)置的容錯文件系統(tǒng)中進(jìn)行持久化存儲,若后面有節(jié)點宕機(jī)導(dǎo)致分區(qū)數(shù)據(jù)丟失,則以從做檢查點的RDD開始重新計算,不需要從頭到尾的計算,從而減少開銷。
二、RDD檢查點
(一)RDD檢查點機(jī)制
- RDD的檢查點機(jī)制(Checkpoint)相當(dāng)于對RDD數(shù)據(jù)進(jìn)行快照,可以將經(jīng)常使用的RDD快照到指定的文件系統(tǒng)中,最好是共享文件系統(tǒng),例如HDFS。當(dāng)機(jī)器發(fā)生故障導(dǎo)致內(nèi)存或磁盤中的RDD數(shù)據(jù)丟失時,可以快速從快照中對指定的RDD進(jìn)行恢復(fù),而不需要根據(jù)RDD的依賴關(guān)系從頭進(jìn)行計算,大大提高了計算效率。
(二)與RDD持久化的區(qū)別
- cache()或者persist()是將數(shù)據(jù)存儲于機(jī)器本地的內(nèi)存或磁盤,當(dāng)機(jī)器發(fā)生故障時無法進(jìn)行數(shù)據(jù)恢復(fù),而檢查點是將RDD數(shù)據(jù)存儲于外部的共享文件系統(tǒng)(例如HDFS),共享文件系統(tǒng)的副本機(jī)制保證了數(shù)據(jù)的可靠性。
- 在Spark應(yīng)用程序執(zhí)行結(jié)束后,cache()或者persist()存儲的數(shù)據(jù)將被清空,而檢查點存儲的數(shù)據(jù)不會受影響,將永久存在,除非手動將其移除。因此,檢查點數(shù)據(jù)可以被下一個Spark應(yīng)用程序使用,而cache()或者persist()數(shù)據(jù)只能被當(dāng)前Spark應(yīng)用程序使用。
(三)RDD檢查點案例演示
- 在
net.cl.rdd
包里創(chuàng)建CheckpointDemo
對象
package net.cl.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CheckpointDemo {
def main(args: Array[String]): Unit = {
// 設(shè)置系統(tǒng)屬性(本地運(yùn)行必須設(shè)置,否則無權(quán)訪問HDFS)
System.setProperty("HADOOP_USER_NAME", "root")
// 創(chuàng)建SparkConf對象
val conf = new SparkConf()
// 設(shè)置應(yīng)用程序名稱,可在Spark WebUI里顯示
conf.setAppName("Spark-CheckpointDemo")
// 設(shè)置集群Master節(jié)點訪問地址
conf.setMaster("local[2]")
// 設(shè)置測試內(nèi)存
conf.set("spark.testing.memory", "2147480000")
// 基于SparkConf對象創(chuàng)建SparkContext對象,該對象是提交Spark應(yīng)用程序的入口
val sc = new SparkContext(conf)
// 設(shè)置檢查點數(shù)據(jù)存儲路徑
sc.setCheckpointDir("hdfs://master:9000/spark-ck")
// 創(chuàng)建模擬數(shù)據(jù)RDD
val rdd: RDD[Int] = sc.parallelize(List(1, 1, 2, 3, 5, 8, 13))
// 過濾結(jié)果
val resultRDD = rdd.filter(_ >= 5)
// 持久化RDD到內(nèi)存中
resultRDD.cache()
// 將resultRDD標(biāo)記為檢查點
resultRDD.checkpoint()
// 第一次行動算子計算時,將把標(biāo)記為檢查點的RDD數(shù)據(jù)存儲到文件系統(tǒng)指定路徑中
val result: String = resultRDD.collect().mkString(", ")
println(result)
// 第二次行動算子計算時,將直接從文件系統(tǒng)讀取resultRDD數(shù)據(jù),而不需要從頭計算
val count = resultRDD.count()
println(count)
// 停止Spark容器
sc.stop()
}
}
- 上述代碼使用checkpoint()方法將RDD標(biāo)記為檢查點(只是標(biāo)記,遇到行動算子才會執(zhí)行)。在第一次行動計算時,被標(biāo)記為檢查點的RDD的數(shù)據(jù)將以文件的形式保存在setCheckpointDir()方法指定的文件系統(tǒng)目錄中,并且該RDD的所有父RDD依賴關(guān)系將被移除,因為下一次對該RDD計算時將直接從文件系統(tǒng)中讀取數(shù)據(jù),而不需要根據(jù)依賴關(guān)系重新計算。
- Spark建議,在將RDD標(biāo)記為檢查點之前,最好將RDD持久化到內(nèi)存,因為Spark會單獨啟動一個任務(wù)將標(biāo)記為檢查點的RDD的數(shù)據(jù)寫入文件系統(tǒng),如果RDD的數(shù)據(jù)已經(jīng)持久化到了內(nèi)存,將直接從內(nèi)存中讀取數(shù)據(jù),然后進(jìn)行寫入,提高數(shù)據(jù)寫入效率,否則需要重復(fù)計算一遍RDD的數(shù)據(jù)。
- 創(chuàng)建檢查點保存數(shù)據(jù)的目錄
?文章來源地址http://www.zghlxwxcb.cn/news/detail-458249.html
- 運(yùn)行程序,在控制臺查看結(jié)果
?
- 查看HDFS檢查點目錄,執(zhí)行命令:
hdfs dfs -ls -R /spark-ck
?
三、共享變量
- 通常情況下,Spark應(yīng)用程序運(yùn)行的時候,Spark算子(例如map(func)或filter(func))中的函數(shù)func會被發(fā)送到遠(yuǎn)程的多個Worker節(jié)點上執(zhí)行,如果一個算子中使用了某個外部變量,該變量就會復(fù)制到Worker節(jié)點的每一個Task任務(wù)中,各個Task任務(wù)對變量的操作相互獨立。當(dāng)變量所存儲的數(shù)據(jù)量非常大時(例如一個大型集合)將增加網(wǎng)絡(luò)傳輸及內(nèi)存的開銷。因此,Spark提供了兩種共享變量:廣播變量和累加器。
(一)廣播變量
- 廣播變量是將一個變量通過廣播的形式發(fā)送到每個Worker節(jié)點的緩存中,而不是發(fā)送到每個Task任務(wù)中,各個Task任務(wù)可以共享該變量的數(shù)據(jù)。因此,廣播變量是只讀的。
1、默認(rèn)情況下變量的傳遞
- map()算子傳入的函數(shù)中使用外部變量arr
scala> val arr = Array(1, 2, 3, 4, 5)
scala> val lines = sc.textFile("data.txt")
scala> val result = lines.map((_, arr))
scala> result.collect()
?
- 上述代碼中,傳遞給map()算子的函數(shù)
(_, arr)
會被發(fā)送到Executor端執(zhí)行,而變量arr
將發(fā)送到Worker節(jié)點
的所有Task任務(wù)
中。變量arr傳遞的流程如下圖所示
- 假設(shè)變量
arr
存儲的數(shù)據(jù)量大小有100MB
,則每一個Task任務(wù)都需要維護(hù)100MB
的副本,若某一個Executor中啟動了3
個Task任務(wù),則該Executor將消耗300MB
內(nèi)存。
2、使用廣播變量時變量的傳遞
- 廣播變量其實是對普通變量的封裝,在分布式函數(shù)中可以通過Broadcast對象的
value
方法訪問廣播變量的值
?
- 使用廣播變量將數(shù)組arr傳遞給map()算子
scala> val arr = Array(1, 2, 3, 4, 5)
scala> val broadcastVar = sc.broadcast(arr)
scala> val lines = sc.textFile("data.txt")
scala> val result = lines.map((_, broadcastVar))
scala> result.collect()
?
- 上述代碼使用broadcast()方法向集群發(fā)送(廣播)了一個只讀變量,該方法只發(fā)送一次,并返回一個廣播變量broadcastVar,該變量是一個org.apache.spark.broadcast.Broadcast對象。Broadcast對象是只讀的,緩存在集群的每個Worker節(jié)點中。使用廣播變量進(jìn)行變量傳遞的流程如下圖所示。
?
- Worker節(jié)點的每個Task任務(wù)共享唯一的一份廣播變量,大大減少了網(wǎng)絡(luò)傳輸和內(nèi)存開銷。
- 輸出result的數(shù)據(jù)
?
(二)累加器
1、累加器功能
- 累加器提供了將Worker節(jié)點的值聚合到Driver的功能,可以用于實現(xiàn)計數(shù)和求和。
2、不使用累加器
- 對一個整型數(shù)組求和
?
- 上述代碼由于
sum
變量在Driver
中定義,而累加操作sum = sum + x
會發(fā)送到Executor
中執(zhí)行,因此輸出結(jié)果不正確。
3、使用累加器
- 對一個整型數(shù)組求和
文章來源:http://www.zghlxwxcb.cn/news/detail-458249.html
?
scala> val myacc = sc.longAccumulator("My Accumulator") // 聲明累加器
scala> val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5))
scala> rdd.foreach(x => myacc.add(x)) // 向累加器添加值
scala> println("sum = " + myacc.value) // 在Driver端輸出結(jié)果
- 上述代碼通過調(diào)用SparkContext對象的
longAccumulator ()
方法創(chuàng)建了一個Long
類型的累加器,默認(rèn)初始值為0
。也可以使用doubleAccumulator()
方法創(chuàng)建Double
類型的累加器。 - 累加器只能在
Driver端定義
,在Executor端更新
。Executor端不能讀取累加器的值,需要在Driver端
使用value
屬性讀取。
到了這里,關(guān)于Spark基礎(chǔ)學(xué)習(xí)筆記----RDD檢查點與共享變量的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!