Spark學(xué)習(xí)筆記
前言:今天是溫習(xí) Spark 的第 3 天啦!主要梳理了 Spark 核心數(shù)據(jù)結(jié)構(gòu):RDD(彈性分布式數(shù)據(jù)集),包括RDD持久化,checkpoint機制,spark兩種共享變量以及spark內(nèi)核調(diào)度原理,希望對大家有幫助!
Tips:"分享是快樂的源泉??,在我的博客里,不僅有知識的海洋??,還有滿滿的正能量加持??,快來和我一起分享這份快樂吧??!
喜歡我的博客的話,記得點個紅心??和小關(guān)小注哦!您的支持是我創(chuàng)作的動力!"
(本節(jié)的所有數(shù)據(jù)集放在我的資源下載區(qū)哦,感興趣的小伙伴可以自行下載:
最全面的SparkCore系列案例數(shù)據(jù)集
)
5. RDD持久化[掌握]
(1)為什么使用緩存
- 緩存可以加速計算,比如在wordcount操作的時候?qū)educeByKey算子進行cache的緩存操作,這時候后續(xù)的操作直接基于緩存后續(xù)的計算
- 緩存可以解決容錯問題,因為RDD是基于依賴鏈的Dependency
- 使用經(jīng)驗:一次緩存可以多次使用
(2)如何進行緩存
- spark中提供cache方法
- spark中提供persist方法
_15_acheOrpersist.py
# -*- coding: utf-8 -*-
# Program function:演示join操作
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
import time
if __name__ == '__main__':
print('PySpark join Function Program')
# TODO:1、創(chuàng)建應(yīng)用程序入口SparkContext實例對象
conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
# TODO: 2、從本地文件系統(tǒng)創(chuàng)建RDD數(shù)據(jù)集
x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])
y = sc.parallelize([(1001, "sales"), (1002, "tech")])
# TODO:3、使用join完成聯(lián)合操作
join_result_rdd = x.join(y)
print(join_result_rdd.collect()) # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
print(x.leftOuterJoin(y).collect())
print(x.rightOuterJoin(y).collect()) # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
# 緩存--基于內(nèi)存緩存-cache底層調(diào)用的是self.persist(StorageLevel.MEMORY_ONLY)
join_result_rdd.cache()
# join_result_rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
# 如果執(zhí)行了緩存的操作,需要使用action算子觸發(fā),在4040頁面上看到綠顏色標(biāo)識
join_result_rdd.collect()
# 如果后續(xù)執(zhí)行任何的操作會直接基于上述緩存的數(shù)據(jù)執(zhí)行,比如count==============================> 4040端口出現(xiàn)綠點
print(join_result_rdd.count())
print(join_result_rdd.first())
time.sleep(600)
sc.stop()
[(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
[(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech')), (1003, ('wangwu', None)), (1004, ('zhangliu', None))]
[(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
2
(1001, ('zhangsan', 'sales'))
(3)何時緩存數(shù)據(jù)
- rdd來之不易
- 經(jīng)過很長依賴鏈計算
- 經(jīng)過shuffle
- rdd被使用多次
- 緩存cache或persist
問題1:緩存將數(shù)據(jù)保存在內(nèi)存或磁盤中,內(nèi)存或磁盤都屬于易失介質(zhì)
內(nèi)存在重啟之后沒有數(shù)據(jù)了,磁盤也會數(shù)據(jù)丟失
注意:緩存會將依賴鏈進行保存的
問題2:如何解決基于cache或persist的存儲在易失介質(zhì)的問題?
引入checkpoint檢查點機制
將元數(shù)據(jù)和數(shù)據(jù)統(tǒng)統(tǒng)存儲在HDFS的非易失介質(zhì),HDFS有副本機制
checkpoint切斷依賴鏈,直接基于保存在hdfs的中元數(shù)據(jù)和數(shù)據(jù)進行后續(xù)計算
什么是元數(shù)據(jù)?
管理數(shù)據(jù)的數(shù)據(jù)
比如,數(shù)據(jù)大小,位置等都是元數(shù)據(jù)
6. Checkpoint機制[掌握]
(1) 為什么要檢查點
為什么有檢查點機制?
- 1-因為cache或perisist將數(shù)據(jù)緩存在內(nèi)存或磁盤中,會有丟失數(shù)據(jù)情況,引入檢查點機制,可以將數(shù)據(jù)斬斷依賴之后存儲到HDFS的非易失介質(zhì)中,解決Spark的容錯問題
- 2-Spark的容錯問題?
- 有一些rdd出錯怎么辦?可以借助于cache或Persist,或checkpoint
(2)如何進行檢查點
如何使用檢查點機制?
- 1-指定數(shù)據(jù)保存在哪里?:sc.setCheckpointDir(“hdfs://node1:9820/chehckpoint/”)
- 2-對誰緩存?:算子
- 3-rdd1.checkpoint() :斬斷依賴關(guān)系進行檢查點
- 4-檢查點機制觸發(fā)方式:action算子可以觸發(fā)
- 5-后續(xù)的計算過程:Spark機制直接從checkpoint中讀取數(shù)據(jù)
(3)檢查點機制有哪些作用
檢查點機制那些作用?
- 將數(shù)據(jù)和元數(shù)據(jù)保存在HDFS中
- 后續(xù)執(zhí)行rdd的計算直接基于checkpoint的rdd
- 起到了容錯的作用
(4) 如何實現(xiàn)spark的容錯
面試題:如何實現(xiàn)Spark的容錯?
- 1-首先會查看Spark是否對數(shù)據(jù)緩存,cache或perisist,直接從緩存中提取數(shù)據(jù)
- 2-否則查看checkpoint是否保存數(shù)據(jù)
- 3-否則根據(jù)依賴關(guān)系重建RDD
(5)持久化和檢查點的區(qū)別
- 1-存儲位置:緩存放在內(nèi)存或本地磁盤,檢查點機制在hdfs
- 2-生命周期:緩存通過LRU或unpersist釋放,檢查點機制會根據(jù)文件一直存在
- 3-依賴關(guān)系:緩存保存依賴關(guān)系,檢查點斬斷依賴關(guān)系鏈
_16_checkpoint.py
# -*- coding: utf-8 -*-
# Program function:checkpoint RDD
from pyspark import SparkContext, SparkConf
import os
import time
from pyspark.storagelevel import StorageLevel
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python3"
# 當(dāng)存在多個版本時,不指定很可能會導(dǎo)致出錯
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
if __name__ == '__main__':
print('PySpark checkpoint Program')
# TODO:1、創(chuàng)建應(yīng)用程序入口SparkContext實例對象
conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
# TODO: 2、RDD的checkpoint
sc.setCheckpointDir("file:///export/data/spark_practice/PySpark-SparkCore_3.1.2/data/checkpoint1")
# TODO: 3、調(diào)用集合RDD中函數(shù)處理分析數(shù)據(jù)
fileRDD = sc.textFile("file:///export/data/spark_practice/PySpark-SparkCore_3.1.2/data/words.txt")
# TODO: 調(diào)用checkpoint函數(shù),將RDD進行備份,需要RDD中Action函數(shù)觸發(fā)
fileRDD.checkpoint()
print(fileRDD.count())
# TODO: 再次執(zhí)行count函數(shù), 此時從checkpoint讀取數(shù)據(jù)
print(fileRDD.count())
time.sleep(100)
print('停止 PySpark SparkSession 對象')
# 關(guān)閉SparkContext
sc.stop()
2
2
停止 PySpark SparkSession 對象
(6)持久化和檢查點并存
先cache 再 checkpoint測試
- 1-讀取數(shù)據(jù)文件
- 2-設(shè)置檢查點目錄
- 3-rdd.checkpoint() 和rdd.cache()
- 4-執(zhí)行action操作,根據(jù)spark容錯選擇首先從cache中讀取數(shù)據(jù),時間更少,速度更快
- 5-如果對rdd實現(xiàn)unpersist
- 6-從checkpoint中讀取rdd的數(shù)據(jù)
- 7-通過action可以查看時間
_17_acheCheckpoint.py
# -*- coding: utf-8 -*-
# Program function:cache&checkpoint RDD
from pyspark import SparkContext, SparkConf
import os
import time
from pyspark.storagelevel import StorageLevel
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python3"
# 當(dāng)存在多個版本時,不指定很可能會導(dǎo)致出錯
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
if __name__ == '__main__':
print('PySpark cache&checkpoint Program')
# TODO:1、創(chuàng)建應(yīng)用程序入口SparkContext實例對象
conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
# TODO: 2、RDD的checkpoint
sc.setCheckpointDir("file:///export/data/spark_practice/PySpark-SparkCore_3.1.2/data/checkpoint1")
# TODO: 3、調(diào)用集合RDD中函數(shù)處理分析數(shù)據(jù)
fileRDD = sc.textFile("/export/data/spark_practice/PySpark-SparkCore_3.1.2/data/words.txt")
# TODO: 調(diào)用checkpoint和cache函數(shù),將RDD進行容錯,需要RDD中Action函數(shù)觸發(fā)
print("=======1-同時做cache和Perisist========")
fileRDD.cache()
fileRDD.checkpoint()
print("=======2-啟動Job1跑正常任務(wù),啟動Job2就會先從Cache讀取數(shù)據(jù),Web頁面可以看到ProcessLocal========")
fileRDD.count()
# TODO: 再次執(zhí)行count函數(shù), 此時從checkpoint讀取數(shù)據(jù)
fileRDD.count()
print("=======3-啟動一個Job發(fā)現(xiàn)查詢數(shù)據(jù)從checkpoint的hdfs中查找========")
# TODO:釋放cache之后如果在查詢數(shù)據(jù)從哪里讀取? 答案是checkpoint的hdfs的數(shù)據(jù)中。
fileRDD.unpersist(True)
fileRDD.count()
time.sleep(100)
print('停止 PySpark SparkSession 對象')
# 關(guān)閉SparkContext
sc.stop()
7.兩種共享變量[掌握]
(1)累加器
- 1-原理
- 在Driver端和exeutor端可以共享Executor執(zhí)行計算的結(jié)果
- 2-不使用累加器
- python本地集合可以直接得到結(jié)果
- 但是在分布式集合中得不到累加的
- 3-使用累加器
- acc=sc.accumulate(10),10是初始值
- acc.add(num)
- print(acc.value)通過value獲取累加器的值
(2)廣播變量
- 1-廣播變量不是在每個Task擁有一份變量,而是每個節(jié)點的executor一份副本
- 2-廣播變量通過本地的executor從blockmanager中過去driver上面變量的副本(計算資源+計算程序)
8. Spark的內(nèi)核調(diào)度
(1) RDD依賴
- RDD依賴
- 為什么設(shè)計依賴?
- 1-為了實現(xiàn)Spark的容錯,rdd1-rdd2-rdd3-rdd4
- 2-并行計算,劃分依賴、
- 為什么劃分寬窄依賴?
- 為了加速并行計算
- 窄依賴可以并行計算,如果是寬依賴無法并行計算
- 依賴的劃分
- 窄依賴:*父 RDD 與子 RDD 間的分區(qū)是一對一的*
- 寬依賴:劃分Stage
- *父 RDD 中的分區(qū)可能會被多個子 RDD 分區(qū)使用*
- 如何區(qū)分寬窄依賴?
- 比如map。filter,flatMap 窄依賴,無需進行shuffle
- 比如reduceByKey(合并多個窄依賴),groupByKey,寬依賴(shuffle)
- 不能說:一個子RDD依賴于多個父rdd,該種情況無法判斷
(2) DAG
什么是DAG?
- 有向無環(huán)圖
- DAG如何劃分Stage?
- 一個Dag就是一個Job,一個Dag是由Action算子進行劃分
- 一個Job下面有很多Stage,根據(jù)寬依賴Shuffle依賴劃分Stage
- 一個Spark應(yīng)用程序包括Job、Stage及Task:
- 第一:Job是以Action方法為界,遇到一個Action方法則觸發(fā)一個Job;一個Job就是dag
- 第二:Stage是Job的子集,以RDD寬依賴(即Shuffle)為界,遇到Shuffle做一次劃分;
- 第三:Task是Stage的子集,以并行度(分區(qū)數(shù))來衡量,分區(qū)數(shù)是多少,則有多少個task。
(3) Job的調(diào)度流程
-
1-用戶代碼編寫: 用戶根據(jù)需求編寫 Spark 應(yīng)用程序,包括定義 RDD、轉(zhuǎn)換操作和行動操作等。
-
2-DAG 構(gòu)建: Spark 將用戶編寫的代碼進行解析,并構(gòu)建出一個有向無環(huán)圖(DAG),該圖表示了任務(wù)之間的依賴關(guān)系。DAG 由一系列的階段(Stage)組成,每個階段包含一組可以并行執(zhí)行的任務(wù)。
-
3-Stage 劃分: 根據(jù)任務(wù)之間的依賴關(guān)系,Spark 將 DAG 進一步劃分為不同的階段。一個階段包含一組可以在無需 shuffle 的情況下并行執(zhí)行的任務(wù)。
-
4-Task 劃分: 對于每個階段,Spark 將其劃分為一系列的任務(wù)(Task),每個任務(wù)對應(yīng)于一個 RDD partition 的處理。任務(wù)的劃分是根據(jù)數(shù)據(jù)的分區(qū)方式和計算的轉(zhuǎn)換操作來確定的。
-
5-資源分配: Spark 根據(jù)集群的資源情況,將任務(wù)分配給可用的 Executor,以便在集群中并行執(zhí)行。
-
6-DAG 調(diào)度: Spark 根據(jù)階段之間的依賴關(guān)系,按照拓?fù)漤樞蛘{(diào)度階段的執(zhí)行。每個階段的任務(wù)會在 Executor 上啟動,并且會根據(jù)需要進行數(shù)據(jù)的 shuffle 操作。
-
7-任務(wù)執(zhí)行: Executor 在分配到的資源上并行執(zhí)行任務(wù)。每個任務(wù)會根據(jù)用戶編寫的轉(zhuǎn)換操作對 RDD 進行處理,并將結(jié)果傳遞給下一個階段的任務(wù)。文章來源:http://www.zghlxwxcb.cn/news/detail-845165.html
-
8-結(jié)果輸出: 最后一個階段完成后,Spark 將最終的結(jié)果返回給用戶代碼,或者將結(jié)果寫入外部存儲系統(tǒng),如 HDFS、數(shù)據(jù)庫等。文章來源地址http://www.zghlxwxcb.cn/news/detail-845165.html
到了這里,關(guān)于Spark重溫筆記(三):Spark在企業(yè)中為什么能這么強?——持久化、Checkpoint機制、共享變量與內(nèi)核調(diào)度原理全攻略“的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!