国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Spark重溫筆記(三):Spark在企業(yè)中為什么能這么強?——持久化、Checkpoint機制、共享變量與內(nèi)核調(diào)度原理全攻略“

這篇具有很好參考價值的文章主要介紹了Spark重溫筆記(三):Spark在企業(yè)中為什么能這么強?——持久化、Checkpoint機制、共享變量與內(nèi)核調(diào)度原理全攻略“。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Spark學(xué)習(xí)筆記

前言:今天是溫習(xí) Spark 的第 3 天啦!主要梳理了 Spark 核心數(shù)據(jù)結(jié)構(gòu):RDD(彈性分布式數(shù)據(jù)集),包括RDD持久化,checkpoint機制,spark兩種共享變量以及spark內(nèi)核調(diào)度原理,希望對大家有幫助!

Tips:"分享是快樂的源泉??,在我的博客里,不僅有知識的海洋??,還有滿滿的正能量加持??,快來和我一起分享這份快樂吧??!

喜歡我的博客的話,記得點個紅心??和小關(guān)小注哦!您的支持是我創(chuàng)作的動力!"


(本節(jié)的所有數(shù)據(jù)集放在我的資源下載區(qū)哦,感興趣的小伙伴可以自行下載:最全面的SparkCore系列案例數(shù)據(jù)集

5. RDD持久化[掌握]

(1)為什么使用緩存
  • 緩存可以加速計算,比如在wordcount操作的時候?qū)educeByKey算子進行cache的緩存操作,這時候后續(xù)的操作直接基于緩存后續(xù)的計算
  • 緩存可以解決容錯問題,因為RDD是基于依賴鏈的Dependency
  • 使用經(jīng)驗:一次緩存可以多次使用
(2)如何進行緩存
  • spark中提供cache方法
  • spark中提供persist方法
_15_acheOrpersist.py

# -*- coding: utf-8 -*-
# Program function:演示join操作
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
import time
if __name__ == '__main__':
    print('PySpark join Function Program')
    # TODO:1、創(chuàng)建應(yīng)用程序入口SparkContext實例對象
    conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    # TODO: 2、從本地文件系統(tǒng)創(chuàng)建RDD數(shù)據(jù)集
    x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])
    y = sc.parallelize([(1001, "sales"), (1002, "tech")])
    # TODO:3、使用join完成聯(lián)合操作
    join_result_rdd = x.join(y)
    print(join_result_rdd.collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
    print(x.leftOuterJoin(y).collect())
    print(x.rightOuterJoin(y).collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
    # 緩存--基于內(nèi)存緩存-cache底層調(diào)用的是self.persist(StorageLevel.MEMORY_ONLY)
    join_result_rdd.cache()
    # join_result_rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
    # 如果執(zhí)行了緩存的操作,需要使用action算子觸發(fā),在4040頁面上看到綠顏色標(biāo)識
    join_result_rdd.collect()
    # 如果后續(xù)執(zhí)行任何的操作會直接基于上述緩存的數(shù)據(jù)執(zhí)行,比如count==============================> 4040端口出現(xiàn)綠點
    print(join_result_rdd.count())
    print(join_result_rdd.first())
    time.sleep(600)
    sc.stop()
[(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
[(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech')), (1003, ('wangwu', None)), (1004, ('zhangliu', None))]
[(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
2
(1001, ('zhangsan', 'sales'))

(3)何時緩存數(shù)據(jù)
  • rdd來之不易
  • 經(jīng)過很長依賴鏈計算
  • 經(jīng)過shuffle
  • rdd被使用多次
  • 緩存cache或persist

問題1:緩存將數(shù)據(jù)保存在內(nèi)存或磁盤中,內(nèi)存或磁盤都屬于易失介質(zhì)

  • 內(nèi)存在重啟之后沒有數(shù)據(jù)了,磁盤也會數(shù)據(jù)丟失

  • 注意:緩存會將依賴鏈進行保存的

  • 問題2:如何解決基于cache或persist的存儲在易失介質(zhì)的問題?

  • 引入checkpoint檢查點機制

  • 將元數(shù)據(jù)和數(shù)據(jù)統(tǒng)統(tǒng)存儲在HDFS的非易失介質(zhì),HDFS有副本機制

  • checkpoint切斷依賴鏈,直接基于保存在hdfs的中元數(shù)據(jù)和數(shù)據(jù)進行后續(xù)計算

  • 什么是元數(shù)據(jù)?

  • 管理數(shù)據(jù)的數(shù)據(jù)

  • 比如,數(shù)據(jù)大小,位置等都是元數(shù)據(jù)

6. Checkpoint機制[掌握]

(1) 為什么要檢查點

為什么有檢查點機制?

  • 1-因為cache或perisist將數(shù)據(jù)緩存在內(nèi)存或磁盤中,會有丟失數(shù)據(jù)情況,引入檢查點機制,可以將數(shù)據(jù)斬斷依賴之后存儲到HDFS的非易失介質(zhì)中,解決Spark的容錯問題
  • 2-Spark的容錯問題?
    • 有一些rdd出錯怎么辦?可以借助于cache或Persist,或checkpoint
(2)如何進行檢查點

如何使用檢查點機制?

  • 1-指定數(shù)據(jù)保存在哪里?:sc.setCheckpointDir(“hdfs://node1:9820/chehckpoint/”)
  • 2-對誰緩存?:算子
  • 3-rdd1.checkpoint() :斬斷依賴關(guān)系進行檢查點
  • 4-檢查點機制觸發(fā)方式:action算子可以觸發(fā)
  • 5-后續(xù)的計算過程:Spark機制直接從checkpoint中讀取數(shù)據(jù)
(3)檢查點機制有哪些作用

檢查點機制那些作用?

  • 將數(shù)據(jù)和元數(shù)據(jù)保存在HDFS中
  • 后續(xù)執(zhí)行rdd的計算直接基于checkpoint的rdd
  • 起到了容錯的作用
(4) 如何實現(xiàn)spark的容錯

面試題:如何實現(xiàn)Spark的容錯?

  • 1-首先會查看Spark是否對數(shù)據(jù)緩存,cache或perisist,直接從緩存中提取數(shù)據(jù)
  • 2-否則查看checkpoint是否保存數(shù)據(jù)
  • 3-否則根據(jù)依賴關(guān)系重建RDD
(5)持久化和檢查點的區(qū)別
  • 1-存儲位置:緩存放在內(nèi)存或本地磁盤,檢查點機制在hdfs
  • 2-生命周期:緩存通過LRU或unpersist釋放,檢查點機制會根據(jù)文件一直存在
  • 3-依賴關(guān)系:緩存保存依賴關(guān)系,檢查點斬斷依賴關(guān)系鏈
_16_checkpoint.py
# -*- coding: utf-8 -*-
# Program function:checkpoint RDD

from pyspark import SparkContext, SparkConf
import os
import time

from pyspark.storagelevel import StorageLevel

os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python3"
# 當(dāng)存在多個版本時,不指定很可能會導(dǎo)致出錯
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
    print('PySpark checkpoint Program')
    # TODO:1、創(chuàng)建應(yīng)用程序入口SparkContext實例對象
    conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    # TODO: 2、RDD的checkpoint
    sc.setCheckpointDir("file:///export/data/spark_practice/PySpark-SparkCore_3.1.2/data/checkpoint1")
    # TODO: 3、調(diào)用集合RDD中函數(shù)處理分析數(shù)據(jù)
    fileRDD = sc.textFile("file:///export/data/spark_practice/PySpark-SparkCore_3.1.2/data/words.txt")
    # TODO: 調(diào)用checkpoint函數(shù),將RDD進行備份,需要RDD中Action函數(shù)觸發(fā)
    fileRDD.checkpoint()
    print(fileRDD.count())
    # TODO: 再次執(zhí)行count函數(shù), 此時從checkpoint讀取數(shù)據(jù)
    print(fileRDD.count())

    time.sleep(100)
    print('停止 PySpark SparkSession 對象')
    # 關(guān)閉SparkContext
    sc.stop()

2
2
停止 PySpark SparkSession 對象  
(6)持久化和檢查點并存

先cache 再 checkpoint測試

  • 1-讀取數(shù)據(jù)文件
  • 2-設(shè)置檢查點目錄
  • 3-rdd.checkpoint() 和rdd.cache()
  • 4-執(zhí)行action操作,根據(jù)spark容錯選擇首先從cache中讀取數(shù)據(jù),時間更少,速度更快
  • 5-如果對rdd實現(xiàn)unpersist
  • 6-從checkpoint中讀取rdd的數(shù)據(jù)
  • 7-通過action可以查看時間
_17_acheCheckpoint.py
# -*- coding: utf-8 -*-
# Program function:cache&checkpoint RDD

from pyspark import SparkContext, SparkConf
import os
import time

from pyspark.storagelevel import StorageLevel

os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python3"
# 當(dāng)存在多個版本時,不指定很可能會導(dǎo)致出錯
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
    print('PySpark cache&checkpoint Program')
    # TODO:1、創(chuàng)建應(yīng)用程序入口SparkContext實例對象
    conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    # TODO: 2、RDD的checkpoint
    sc.setCheckpointDir("file:///export/data/spark_practice/PySpark-SparkCore_3.1.2/data/checkpoint1")
    # TODO: 3、調(diào)用集合RDD中函數(shù)處理分析數(shù)據(jù)
    fileRDD = sc.textFile("/export/data/spark_practice/PySpark-SparkCore_3.1.2/data/words.txt")
    # TODO: 調(diào)用checkpoint和cache函數(shù),將RDD進行容錯,需要RDD中Action函數(shù)觸發(fā)
    print("=======1-同時做cache和Perisist========")
    fileRDD.cache()
    fileRDD.checkpoint()
    print("=======2-啟動Job1跑正常任務(wù),啟動Job2就會先從Cache讀取數(shù)據(jù),Web頁面可以看到ProcessLocal========")
    fileRDD.count()
    # TODO: 再次執(zhí)行count函數(shù), 此時從checkpoint讀取數(shù)據(jù)
    fileRDD.count()
    print("=======3-啟動一個Job發(fā)現(xiàn)查詢數(shù)據(jù)從checkpoint的hdfs中查找========")
    # TODO:釋放cache之后如果在查詢數(shù)據(jù)從哪里讀取? 答案是checkpoint的hdfs的數(shù)據(jù)中。
    fileRDD.unpersist(True)
    fileRDD.count()

    time.sleep(100)
    print('停止 PySpark SparkSession 對象')
    # 關(guān)閉SparkContext
    sc.stop()

7.兩種共享變量[掌握]

(1)累加器
  • 1-原理
    • 在Driver端和exeutor端可以共享Executor執(zhí)行計算的結(jié)果
  • 2-不使用累加器
    • python本地集合可以直接得到結(jié)果
    • 但是在分布式集合中得不到累加的
  • 3-使用累加器
    • acc=sc.accumulate(10),10是初始值
    • acc.add(num)
    • print(acc.value)通過value獲取累加器的值
(2)廣播變量
  • 1-廣播變量不是在每個Task擁有一份變量,而是每個節(jié)點的executor一份副本
  • 2-廣播變量通過本地的executor從blockmanager中過去driver上面變量的副本(計算資源+計算程序)

8. Spark的內(nèi)核調(diào)度

(1) RDD依賴
  • RDD依賴
  • 為什么設(shè)計依賴?
    • 1-為了實現(xiàn)Spark的容錯,rdd1-rdd2-rdd3-rdd4
    • 2-并行計算,劃分依賴、
  • 為什么劃分寬窄依賴?
    • 為了加速并行計算
    • 窄依賴可以并行計算,如果是寬依賴無法并行計算
  • 依賴的劃分
    • 窄依賴:*父 RDD 與子 RDD 間的分區(qū)是一對一的*
    • 寬依賴:劃分Stage
      • *父 RDD 中的分區(qū)可能會被多個子 RDD 分區(qū)使用*
    • 如何區(qū)分寬窄依賴?
      • 比如map。filter,flatMap 窄依賴,無需進行shuffle
      • 比如reduceByKey(合并多個窄依賴),groupByKey,寬依賴(shuffle)
      • 不能說:一個子RDD依賴于多個父rdd,該種情況無法判斷
(2) DAG

什么是DAG?

  • 有向無環(huán)圖
  • DAG如何劃分Stage?
    • 一個Dag就是一個Job,一個Dag是由Action算子進行劃分
    • 一個Job下面有很多Stage,根據(jù)寬依賴Shuffle依賴劃分Stage
  • 一個Spark應(yīng)用程序包括Job、Stage及Task:
    • 第一:Job是以Action方法為界,遇到一個Action方法則觸發(fā)一個Job;一個Job就是dag
    • 第二:Stage是Job的子集,以RDD寬依賴(即Shuffle)為界,遇到Shuffle做一次劃分;
    • 第三:Task是Stage的子集,以并行度(分區(qū)數(shù))來衡量,分區(qū)數(shù)是多少,則有多少個task。
(3) Job的調(diào)度流程
  • 1-用戶代碼編寫: 用戶根據(jù)需求編寫 Spark 應(yīng)用程序,包括定義 RDD、轉(zhuǎn)換操作和行動操作等。

  • 2-DAG 構(gòu)建: Spark 將用戶編寫的代碼進行解析,并構(gòu)建出一個有向無環(huán)圖(DAG),該圖表示了任務(wù)之間的依賴關(guān)系。DAG 由一系列的階段(Stage)組成,每個階段包含一組可以并行執(zhí)行的任務(wù)。

  • 3-Stage 劃分: 根據(jù)任務(wù)之間的依賴關(guān)系,Spark 將 DAG 進一步劃分為不同的階段。一個階段包含一組可以在無需 shuffle 的情況下并行執(zhí)行的任務(wù)。

  • 4-Task 劃分: 對于每個階段,Spark 將其劃分為一系列的任務(wù)(Task),每個任務(wù)對應(yīng)于一個 RDD partition 的處理。任務(wù)的劃分是根據(jù)數(shù)據(jù)的分區(qū)方式和計算的轉(zhuǎn)換操作來確定的。

  • 5-資源分配: Spark 根據(jù)集群的資源情況,將任務(wù)分配給可用的 Executor,以便在集群中并行執(zhí)行。

  • 6-DAG 調(diào)度: Spark 根據(jù)階段之間的依賴關(guān)系,按照拓?fù)漤樞蛘{(diào)度階段的執(zhí)行。每個階段的任務(wù)會在 Executor 上啟動,并且會根據(jù)需要進行數(shù)據(jù)的 shuffle 操作。

  • 7-任務(wù)執(zhí)行: Executor 在分配到的資源上并行執(zhí)行任務(wù)。每個任務(wù)會根據(jù)用戶編寫的轉(zhuǎn)換操作對 RDD 進行處理,并將結(jié)果傳遞給下一個階段的任務(wù)。

  • 8-結(jié)果輸出: 最后一個階段完成后,Spark 將最終的結(jié)果返回給用戶代碼,或者將結(jié)果寫入外部存儲系統(tǒng),如 HDFS、數(shù)據(jù)庫等。文章來源地址http://www.zghlxwxcb.cn/news/detail-845165.html

到了這里,關(guān)于Spark重溫筆記(三):Spark在企業(yè)中為什么能這么強?——持久化、Checkpoint機制、共享變量與內(nèi)核調(diào)度原理全攻略“的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 職場工作多年,為什么成長這么慢

    在職場工作多年,卻沒有成長,是許多人都會遇到的問題。這種情況可能讓人感到沮喪和無助,但是它的根本原因是什么呢?在本文中,我們將探討為什么會出現(xiàn)這種情況,以及如何克服這種困境。 成長需要我們對自己的能力和表現(xiàn)進行評估和反思。如果沒有對自己的工作進

    2023年04月16日
    瀏覽(32)
  • 為什么這么設(shè)計—— Go的GC

    Go語言采用了3色標(biāo)記清理法來對內(nèi)存進行自動垃圾回收, 過程是這樣的: (1)起初所有的對象都是白色的; (2)從根對象出發(fā)掃描所有可達(dá)對象,標(biāo)記為灰色,放入待處理隊列; (3)從待處理隊列中取出灰色對象,將其引用的對象標(biāo)記為灰色并放入待處理隊列中,自身標(biāo)

    2024年02月12日
    瀏覽(21)
  • 為什么現(xiàn)在原生家庭的問題這么嚴(yán)重?

    匿名用戶 191 人贊同了該回答 換一個玄學(xué)的角度來看這個問題,之前看b站,有一個up主說,中國有歷史記載的人口數(shù)一直都很穩(wěn)定,7-8千萬到1億左右,明朝2億,清朝到民國算是增長比較多的,有4億,但是從開國到現(xiàn)在增長了10億,從輪回的角度來講,哪來那么多的人來轉(zhuǎn)世

    2024年02月13日
    瀏覽(36)
  • 48 | DMA:為什么Kafka這么快?

    48 | DMA:為什么Kafka這么快?

    過去幾年里,整個計算機產(chǎn)業(yè)界,都在嘗試不停地提升 I/O 設(shè)備的速度。把 HDD 硬盤換成 SSD 硬盤,我們?nèi)匀挥X得不夠快;用 PCI Express 接口的 SSD 硬盤替代 SATA 接口的 SSD 硬盤,我們還是覺得不夠快,所以,現(xiàn)在就有了傲騰(Optane)這樣的技術(shù)。 但是,無論 I/O 速度如何提升,

    2024年02月21日
    瀏覽(16)
  • 玩轉(zhuǎn)Discord:為什么它這么吸引加密社區(qū)?

    玩轉(zhuǎn)Discord:為什么它這么吸引加密社區(qū)?

    ? ? Twitter、Telegram、Discord,目前加密貨幣項目和社區(qū)必備的三件套,其重要程度堪比國內(nèi)所說的“兩微一抖(微博、微信和抖音)”。 Twitter和Telegram國內(nèi)的用戶還算了解,Discord相對來說就比較陌生了,但是近一年以來,隨著國內(nèi)社交平臺的審查收緊,NFT、DAO的盛行,Discor

    2024年02月04日
    瀏覽(29)
  • 為什么C++這么復(fù)雜還不被淘汰?

    為什么C++這么復(fù)雜還不被淘汰?

    C++是一門廣泛使用的編程語言,主要用于系統(tǒng)和應(yīng)用程序的開發(fā)。盡管C++具有一些復(fù)雜的語法和概念,但它仍然是編程界的重量級選手,在編程語言排行榜中一直位居前列。 為什么C++這么復(fù)雜還不被淘汰呢? C++有以下優(yōu)勢 1、C++具有高性能 C++是一門編譯型語言,可以直接編

    2024年02月05日
    瀏覽(19)
  • ChatGPT是怎么實現(xiàn)的?為什么它這么有效?

    ChatGPT是怎么實現(xiàn)的?為什么它這么有效?

    ChatGPT 能夠自動生成類似于人類寫作的文本,這一點非常引人注目,也令人意外。但它是如何實現(xiàn)的?為什么它能夠如此出色地生成我們認(rèn)為有意義的文本?我的目的是在這里概述ChatGPT內(nèi)部的運行情況,并探討它能夠如此出色地產(chǎn)生有意義文本的原因。 首先需要解釋的是,

    2023年04月26日
    瀏覽(23)
  • 記錄--強制緩存這么暴力,為什么不使用協(xié)商緩存

    記錄--強制緩存這么暴力,為什么不使用協(xié)商緩存

    前段時間在看面經(jīng)的時候,發(fā)現(xiàn)很多份面經(jīng)中都被問到了 強緩存 和 協(xié)商緩存 。因此我覺得有必要寫一篇文章來好好聊聊這兩者。 瀏覽器緩存是瀏覽器在本地磁盤對用戶最近請求過的文檔進行存儲,當(dāng)訪問者再次訪問同一頁面時,瀏覽器就可以直接從本地磁盤加載文檔,其中瀏覽

    2024年02月10日
    瀏覽(32)
  • 電腦為什么這么卡?6個方法處理電腦卡頓

    電腦為什么這么卡?6個方法處理電腦卡頓

    你是否打開電腦就卡到不行?電腦的開機速度慢,就連打開網(wǎng)頁也在轉(zhuǎn)圈圈,一直加載不出來。世界上最痛苦的事莫過于此,想要好好工作,卻一直加載不出網(wǎng)頁。 你知道電腦為什么這么卡嗎? 其實大多數(shù)的原因都在這篇文章列出來了,有興趣的朋友一起來看看,下面還有

    2024年02月11日
    瀏覽(30)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包