1. 前言
RDD之間進(jìn)行相互迭代計算(Transformation的轉(zhuǎn)換),當(dāng)執(zhí)行開啟后,新RDD的生成,代表老RDD的消息,RDD的數(shù)據(jù)只在處理的過程中存在,一旦處理完成,就不見了,所以RDD的數(shù)據(jù)是過程數(shù)據(jù)。
RDD數(shù)據(jù)是過程數(shù)據(jù)的這個特性可以最大化的利用資源,老舊的RDD沒用了就會從內(nèi)存中清理,給后續(xù)的計算騰出內(nèi)存空間。
如上圖,rdd3被2次使用,第一次使用之后,其實(shí)rdd3就不存在了,在第二次使用的時候,只能基于RDD的血緣關(guān)系,從RDD1重新執(zhí)行,構(gòu)建出來RDD3,供RDD5使用。
2. RDD的緩存
Spark中提供了緩存API,可以讓我們通過調(diào)用API,將指定的RDD數(shù)據(jù)保留在內(nèi)存或者硬盤上。上述場景如果使用緩存API,RDD3就不會消失,第二次使用RDD3的時候就不會在通過血緣關(guān)系重新開始構(gòu)建出RDD3
# RDD3 被2次使用,可以加入緩存進(jìn)行優(yōu)化
rdd3.cache() # 緩存到內(nèi)存中.
rdd3.persist(StorageLevel.MEMORY_ONLY) # 僅內(nèi)存緩存
rdd3.persist(StorageLevel.MEMORY_ONLY_2) # 僅內(nèi)存緩存,2個副本
rdd3.persist(StorageLevel.DISK_ONLY) # 僅緩存硬盤上
rdd3.persist(StorageLevel.DISK_ONLY_2) # 僅緩存硬盤上,2個副本
rdd3.persist(StorageLevel.DISK_ONLY_3) # 僅緩存硬盤上,3個副本
rdd3.persist(StorageLevel.MEMORY_AND_DISK) # 先放內(nèi)存,不夠放硬盤
rdd3.persist(StorageLevel.MEMORY_AND_DISK_2) # 先放內(nèi)存,不夠放硬盤,2個副本
rdd3.persist(StorageLevel.OFF_HEAP) # 堆放內(nèi)存(系統(tǒng)內(nèi)存)
# 如上API,自行選擇使用即可
# 一般建議使用rdd3.persist(StorageLevel.MEMORY_AND_DISK)
# 如果內(nèi)存比較小的集群,建議使用rdd3.persist(StorageLevel.DISK_ONLY) 或者別用緩存了 用CheckPoint
# 主動清理緩存的API
rdd.unpersist()
如上圖,RDD是將自己分區(qū)的數(shù)據(jù),每個分區(qū)自行將其數(shù)據(jù)保存在其所在的Executor內(nèi)存和硬盤上,這就是分散存儲
。
緩存技術(shù)可以將過程RDD數(shù)據(jù),持久化保存到內(nèi)存或者硬盤上,但是這個保存在設(shè)定上是認(rèn)為不安全的,存在丟失的風(fēng)險,所以緩存有一個特點(diǎn)就是保存RDD之間的血緣關(guān)系
。
一旦緩存丟失,可以基于血緣關(guān)系的記錄,重新計算這個RDD的數(shù)據(jù)。
緩存一般是如果丟失的?
- 在內(nèi)存中的存儲是不安全的,比如斷電\計算任務(wù)內(nèi)存不足,把緩存清理給計算讓路
- 硬盤中因?yàn)橛脖P的損壞也是可能丟失的
3. RDD的CheckPoint
Spark中Checkpoint技術(shù),也是將RDD的數(shù)據(jù)保存起來,但是它只支持硬盤存儲,并且它被設(shè)計認(rèn)為是安全的,不保留血緣關(guān)系。
如上圖,Checkpoint存儲的RDD數(shù)據(jù)是集中收集各個分區(qū)的數(shù)據(jù)進(jìn)行存儲,而緩存是分散存儲
緩存和Checkpoint的對比:
- CheckPoint不管分區(qū)數(shù)量多少,風(fēng)險都一樣。 緩存:分區(qū)越多,風(fēng)險越多
- CheckPoint支持寫入HDFS,緩存不行。HDFS是高可靠存儲,CheckPoint被認(rèn)為是安全的
- CheckPoint不支持內(nèi)存,緩存可以。緩存如果寫內(nèi)存 性能比 CheckPoint 要好一些
- CheckPoint因?yàn)樵O(shè)計是安全的,所以
不保留血緣關(guān)系
,而緩存則相反。
實(shí)現(xiàn):
# 設(shè)置CheckPoint第一件事情,選擇Checkpoint的保存路徑
# 如果Local模式,可以支持本地文件系統(tǒng),如果在集群運(yùn)行,千萬要用HDFS
sc.setCheckpointDir("hdfs://master:8020/output/11111")
# 用的時候,直接調(diào)用checkpoint算子即可,但是需要有action算子觸發(fā)
rdd.checkpoint()
rdd.count()
# TODO: 再次執(zhí)行count函數(shù), 此時從checkpoint讀取數(shù)據(jù)
rdd.count()
Checkpoint是一種重量級的使用,也就是RDD的重新計算成本很高的時候,我們采用Checkpoint比較合適,或者數(shù)據(jù)量很大的時候,采用Checkpoint比較合適。如果數(shù)據(jù)量小,或者RDD重新計算也是非??斓模苯邮褂镁彺婕纯伞?mark hidden color="red">文章來源:http://www.zghlxwxcb.cn/news/detail-406419.html
**注意:**Spark中緩存和Checkpoint兩個API都不是action算子,所以需要后面跟action算子才能觸發(fā)。文章來源地址http://www.zghlxwxcb.cn/news/detail-406419.html
到了這里,關(guān)于PySpark RDD的緩存和Checkpoint的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!