国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Spark基礎(chǔ)學(xué)習(xí)筆記----RDD檢查點與共享變量

這篇具有很好參考價值的文章主要介紹了Spark基礎(chǔ)學(xué)習(xí)筆記----RDD檢查點與共享變量。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

零、本講學(xué)習(xí)目標(biāo)

  1. 了解RDD容錯機(jī)制
  2. 理解RDD檢查點機(jī)制的特點與用處
  3. 理解共享變量的類別、特點與使用

一、RDD容錯機(jī)制

  • 當(dāng)Spark集群中的某一個節(jié)點由于宕機(jī)導(dǎo)致數(shù)據(jù)丟失,則可以通過Spark中的RDD進(jìn)行容錯恢復(fù)已經(jīng)丟失的數(shù)據(jù)。RDD提供了兩種故障恢復(fù)的方式,分別是血統(tǒng)(Lineage)方式設(shè)置檢查點(checkpoint)方式。

(一)血統(tǒng)方式

  • 根據(jù)RDD之間依賴關(guān)系對丟失數(shù)據(jù)的RDD進(jìn)行數(shù)據(jù)恢復(fù)。若丟失數(shù)據(jù)的子RDD進(jìn)行窄依賴運(yùn)算,則只需要把丟失數(shù)據(jù)的父RDD的對應(yīng)分區(qū)進(jìn)行重新計算,不依賴其他節(jié)點,并且在計算過程中不存在冗余計算;若丟失數(shù)據(jù)的RDD進(jìn)行寬依賴運(yùn)算,則需要父RDD所有分區(qū)都要進(jìn)行從頭到尾計算,計算過程中存在冗余計算。

(二)設(shè)置檢查點方式

  • 本質(zhì)是將RDD寫入磁盤存儲。當(dāng)RDD進(jìn)行寬依賴運(yùn)算時,只要在中間階段設(shè)置一個檢查點進(jìn)行容錯,即Spark中的sparkContext調(diào)用setCheckpoint()方法,設(shè)置容錯文件系統(tǒng)目錄作為檢查點checkpoint,將checkpoint的數(shù)據(jù)寫入之前設(shè)置的容錯文件系統(tǒng)中進(jìn)行持久化存儲,若后面有節(jié)點宕機(jī)導(dǎo)致分區(qū)數(shù)據(jù)丟失,則以從做檢查點的RDD開始重新計算,不需要從頭到尾的計算,從而減少開銷。

二、RDD檢查點

(一)RDD檢查點機(jī)制

  • RDD的檢查點機(jī)制(Checkpoint)相當(dāng)于對RDD數(shù)據(jù)進(jìn)行快照,可以將經(jīng)常使用的RDD快照到指定的文件系統(tǒng)中,最好是共享文件系統(tǒng),例如HDFS。當(dāng)機(jī)器發(fā)生故障導(dǎo)致內(nèi)存或磁盤中的RDD數(shù)據(jù)丟失時,可以快速從快照中對指定的RDD進(jìn)行恢復(fù),而不需要根據(jù)RDD的依賴關(guān)系從頭進(jìn)行計算,大大提高了計算效率。

(二)與RDD持久化的區(qū)別

  • cache()或者persist()是將數(shù)據(jù)存儲于機(jī)器本地的內(nèi)存或磁盤,當(dāng)機(jī)器發(fā)生故障時無法進(jìn)行數(shù)據(jù)恢復(fù),而檢查點是將RDD數(shù)據(jù)存儲于外部的共享文件系統(tǒng)(例如HDFS),共享文件系統(tǒng)的副本機(jī)制保證了數(shù)據(jù)的可靠性。
  • 在Spark應(yīng)用程序執(zhí)行結(jié)束后,cache()或者persist()存儲的數(shù)據(jù)將被清空,而檢查點存儲的數(shù)據(jù)不會受影響,將永久存在,除非手動將其移除。因此,檢查點數(shù)據(jù)可以被下一個Spark應(yīng)用程序使用,而cache()或者persist()數(shù)據(jù)只能被當(dāng)前Spark應(yīng)用程序使用。

(三)RDD檢查點案例演示

  • net.cl.rdd包里創(chuàng)建CheckpointDemo對象
package net.cl.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointDemo {
  def main(args: Array[String]): Unit = {
    // 設(shè)置系統(tǒng)屬性(本地運(yùn)行必須設(shè)置,否則無權(quán)訪問HDFS)
    System.setProperty("HADOOP_USER_NAME", "root")
    // 創(chuàng)建SparkConf對象
    val conf = new SparkConf()
    // 設(shè)置應(yīng)用程序名稱,可在Spark WebUI里顯示
    conf.setAppName("Spark-CheckpointDemo")
    // 設(shè)置集群Master節(jié)點訪問地址
    conf.setMaster("local[2]")
    // 設(shè)置測試內(nèi)存
    conf.set("spark.testing.memory", "2147480000")
    // 基于SparkConf對象創(chuàng)建SparkContext對象,該對象是提交Spark應(yīng)用程序的入口
    val sc = new SparkContext(conf)

    // 設(shè)置檢查點數(shù)據(jù)存儲路徑
    sc.setCheckpointDir("hdfs://master:9000/spark-ck")
    // 創(chuàng)建模擬數(shù)據(jù)RDD
    val rdd: RDD[Int] = sc.parallelize(List(1, 1, 2, 3, 5, 8, 13))
    // 過濾結(jié)果
    val resultRDD = rdd.filter(_ >= 5)
    // 持久化RDD到內(nèi)存中
    resultRDD.cache()
    // 將resultRDD標(biāo)記為檢查點
    resultRDD.checkpoint()

    // 第一次行動算子計算時,將把標(biāo)記為檢查點的RDD數(shù)據(jù)存儲到文件系統(tǒng)指定路徑中
    val result: String = resultRDD.collect().mkString(", ")
    println(result)
    // 第二次行動算子計算時,將直接從文件系統(tǒng)讀取resultRDD數(shù)據(jù),而不需要從頭計算
    val count = resultRDD.count()
    println(count)

    // 停止Spark容器
    sc.stop()
  }
}
  • 上述代碼使用checkpoint()方法將RDD標(biāo)記為檢查點(只是標(biāo)記,遇到行動算子才會執(zhí)行)。在第一次行動計算時,被標(biāo)記為檢查點的RDD的數(shù)據(jù)將以文件的形式保存在setCheckpointDir()方法指定的文件系統(tǒng)目錄中,并且該RDD的所有父RDD依賴關(guān)系將被移除,因為下一次對該RDD計算時將直接從文件系統(tǒng)中讀取數(shù)據(jù),而不需要根據(jù)依賴關(guān)系重新計算。
  • Spark建議,在將RDD標(biāo)記為檢查點之前,最好將RDD持久化到內(nèi)存,因為Spark會單獨啟動一個任務(wù)將標(biāo)記為檢查點的RDD的數(shù)據(jù)寫入文件系統(tǒng),如果RDD的數(shù)據(jù)已經(jīng)持久化到了內(nèi)存,將直接從內(nèi)存中讀取數(shù)據(jù),然后進(jìn)行寫入,提高數(shù)據(jù)寫入效率,否則需要重復(fù)計算一遍RDD的數(shù)據(jù)。
  • 創(chuàng)建檢查點保存數(shù)據(jù)的目錄

Spark基礎(chǔ)學(xué)習(xí)筆記----RDD檢查點與共享變量

?文章來源地址http://www.zghlxwxcb.cn/news/detail-458249.html

  • 運(yùn)行程序,在控制臺查看結(jié)果

Spark基礎(chǔ)學(xué)習(xí)筆記----RDD檢查點與共享變量

?

  • 查看HDFS檢查點目錄,執(zhí)行命令:hdfs dfs -ls -R /spark-ck

Spark基礎(chǔ)學(xué)習(xí)筆記----RDD檢查點與共享變量

?

三、共享變量

  • 通常情況下,Spark應(yīng)用程序運(yùn)行的時候,Spark算子(例如map(func)或filter(func))中的函數(shù)func會被發(fā)送到遠(yuǎn)程的多個Worker節(jié)點上執(zhí)行,如果一個算子中使用了某個外部變量,該變量就會復(fù)制到Worker節(jié)點的每一個Task任務(wù)中,各個Task任務(wù)對變量的操作相互獨立。當(dāng)變量所存儲的數(shù)據(jù)量非常大時(例如一個大型集合)將增加網(wǎng)絡(luò)傳輸及內(nèi)存的開銷。因此,Spark提供了兩種共享變量:廣播變量和累加器。

(一)廣播變量

  • 廣播變量是將一個變量通過廣播的形式發(fā)送到每個Worker節(jié)點的緩存中,而不是發(fā)送到每個Task任務(wù)中,各個Task任務(wù)可以共享該變量的數(shù)據(jù)。因此,廣播變量是只讀的。

1、默認(rèn)情況下變量的傳遞

  • map()算子傳入的函數(shù)中使用外部變量arr

Spark基礎(chǔ)學(xué)習(xí)筆記----RDD檢查點與共享變量

scala> val arr = Array(1, 2, 3, 4, 5)
scala> val lines = sc.textFile("data.txt")
scala> val result = lines.map((_, arr))
scala> result.collect()

?

  • 上述代碼中,傳遞給map()算子的函數(shù)(_, arr)會被發(fā)送到Executor端執(zhí)行,而變量arr將發(fā)送到Worker節(jié)點所有Task任務(wù)中。變量arr傳遞的流程如下圖所示

Spark基礎(chǔ)學(xué)習(xí)筆記----RDD檢查點與共享變量

  • 假設(shè)變量arr存儲的數(shù)據(jù)量大小有100MB,則每一個Task任務(wù)都需要維護(hù)100MB的副本,若某一個Executor中啟動了3個Task任務(wù),則該Executor將消耗300MB內(nèi)存。

2、使用廣播變量時變量的傳遞

  • 廣播變量其實是對普通變量的封裝,在分布式函數(shù)中可以通過Broadcast對象的value方法訪問廣播變量的值

Spark基礎(chǔ)學(xué)習(xí)筆記----RDD檢查點與共享變量

?

  • 使用廣播變量將數(shù)組arr傳遞給map()算子

Spark基礎(chǔ)學(xué)習(xí)筆記----RDD檢查點與共享變量

scala> val arr = Array(1, 2, 3, 4, 5)
scala> val broadcastVar = sc.broadcast(arr)
scala> val lines = sc.textFile("data.txt")
scala> val result = lines.map((_, broadcastVar))
scala> result.collect()

?

  • 上述代碼使用broadcast()方法向集群發(fā)送(廣播)了一個只讀變量,該方法只發(fā)送一次,并返回一個廣播變量broadcastVar,該變量是一個org.apache.spark.broadcast.Broadcast對象。Broadcast對象是只讀的,緩存在集群的每個Worker節(jié)點中。使用廣播變量進(jìn)行變量傳遞的流程如下圖所示。

Spark基礎(chǔ)學(xué)習(xí)筆記----RDD檢查點與共享變量

?

  • Worker節(jié)點的每個Task任務(wù)共享唯一的一份廣播變量,大大減少了網(wǎng)絡(luò)傳輸和內(nèi)存開銷。
  • 輸出result的數(shù)據(jù)

Spark基礎(chǔ)學(xué)習(xí)筆記----RDD檢查點與共享變量

?

(二)累加器

1、累加器功能

  • 累加器提供了將Worker節(jié)點的值聚合到Driver的功能,可以用于實現(xiàn)計數(shù)和求和。

2、不使用累加器

  • 對一個整型數(shù)組求和

Spark基礎(chǔ)學(xué)習(xí)筆記----RDD檢查點與共享變量

?

  • 上述代碼由于sum變量在Driver中定義,而累加操作sum = sum + x會發(fā)送到Executor中執(zhí)行,因此輸出結(jié)果不正確。

3、使用累加器

  • 對一個整型數(shù)組求和

Spark基礎(chǔ)學(xué)習(xí)筆記----RDD檢查點與共享變量

?

scala> val myacc = sc.longAccumulator("My Accumulator") // 聲明累加器
scala> val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5))
scala> rdd.foreach(x => myacc.add(x)) // 向累加器添加值
scala> println("sum = " + myacc.value) // 在Driver端輸出結(jié)果
  • 上述代碼通過調(diào)用SparkContext對象的longAccumulator ()方法創(chuàng)建了一個Long類型的累加器,默認(rèn)初始值為0。也可以使用doubleAccumulator()方法創(chuàng)建Double類型的累加器。
  • 累加器只能在Driver端定義,在Executor端更新。Executor端不能讀取累加器的值,需要在Driver端使用value屬性讀取。

到了這里,關(guān)于Spark基礎(chǔ)學(xué)習(xí)筆記----RDD檢查點與共享變量的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Flink狀態(tài)管理與檢查點機(jī)制

    Flink狀態(tài)管理與檢查點機(jī)制

    本專欄案例代碼和數(shù)據(jù)集鏈接:? https://download.csdn.net/download/shangjg03/88477960 相對于其他流計算框架,F(xiàn)link?一個比較重要的特性就是其支持有狀態(tài)計算。即你可以將中間的計算結(jié)果進(jìn)行保存,并提供給后續(xù)的計算使用: 具體而言,F(xiàn)link?又將狀態(tài)?(State)?分為?Keyed?State?與?O

    2024年02月07日
    瀏覽(91)
  • 深入了解 Flink 的檢查點機(jī)制

    Flink 是一個流處理框架,用于實時數(shù)據(jù)處理。檢查點(checkpoint)機(jī)制是 Flink 的一個核心組件,用于保證流處理作業(yè)的可靠性和容錯性。在這篇文章中,我們將深入了解 Flink 的檢查點機(jī)制,涵蓋其核心概念、算法原理、實例代碼以及未來發(fā)展趨勢。 Flink 的檢查點機(jī)制是一種保存

    2024年02月20日
    瀏覽(22)
  • loadrunner入門教程(14)--檢查點

    loadrunner入門教程(14)--檢查點

    檢查點函數(shù)原理:回放腳本時搜索特定的文本或者字符串,從而驗證服務(wù)器相應(yīng)的正確性;驗證請求是否成功,可以添加檢查點。以檢查從服務(wù)器返回的內(nèi)容是否正確。本任務(wù)針對腳本開發(fā)–檢查點進(jìn)行介紹 掌握基于loadrunner性能測試腳本開發(fā)——檢查點 1.單擊Design→Insert

    2024年02月05日
    瀏覽(31)
  • 怎么理解flink的異步檢查點機(jī)制

    flink的checkpoint監(jiān)控頁面那里有兩個指標(biāo)Sync Duration 和Async Duration,一個是開始進(jìn)行同步checkpoint所需的時間,一個是異步checkpoint過程所需的時間,你是否也有過疑惑,是否只是同步過程中的時間才會阻塞正常的數(shù)據(jù)處理,而異步checkpoint的時間不會影響正常的數(shù)據(jù)處理流程? 這

    2024年02月09日
    瀏覽(24)
  • Flink流式計算狀態(tài)檢查點與恢復(fù)

    Flink流式計算狀態(tài)檢查點與恢復(fù) Apache Flink是一個流處理框架,用于實時數(shù)據(jù)處理和分析。Flink可以處理大規(guī)模數(shù)據(jù)流,并提供一種高效、可靠的方法來處理和分析這些數(shù)據(jù)。Flink流式計算狀態(tài)檢查點與恢復(fù)是流處理的關(guān)鍵組件,它們確保Flink應(yīng)用程序在故障時能夠恢復(fù)并繼續(xù)處

    2024年02月19日
    瀏覽(26)
  • Flink系列之:背壓下的檢查點

    Flink系列之:背壓下的檢查點

    通常情況下,對齊 Checkpoint 的時長主要受 Checkpointing 過程中的同步和異步兩個部分的影響。 然而,當(dāng) Flink 作業(yè)正運(yùn)行在嚴(yán)重的背壓下時,Checkpoint 端到端延遲的主要影響因子將會是傳遞 Checkpoint Barrier 到 所有的算子/子任務(wù)的時間。這在 checkpointing process) 的概述中有說明原因

    2024年02月04日
    瀏覽(18)
  • 論文閱讀-多級檢查點重新啟動MPI應(yīng)用的共同設(shè)計

    論文閱讀-多級檢查點重新啟動MPI應(yīng)用的共同設(shè)計

    論文名稱: Co-Designing Multi-Level Checkpoint Restart for MPI Applications 摘要—高性能計算(HPC)系統(tǒng)繼續(xù)通過包含更多硬件組件來支持更大的應(yīng)用部署來擴(kuò)展。關(guān)鍵是,這種擴(kuò)展往往會減少故障之間的平均時間,從而使容錯成為一個越來越重要的挑戰(zhàn)。在HPC中容錯的標(biāo)準(zhǔn)做法是檢查點

    2024年04月09日
    瀏覽(33)
  • 【大數(shù)據(jù)】Flink 架構(gòu)(五):檢查點 Checkpoint(看完即懂)

    【大數(shù)據(jù)】Flink 架構(gòu)(五):檢查點 Checkpoint(看完即懂)

    《 Flink 架構(gòu) 》系列(已完結(jié)),共包含以下 6 篇文章: Flink 架構(gòu)(一):系統(tǒng)架構(gòu) Flink 架構(gòu)(二):數(shù)據(jù)傳輸 Flink 架構(gòu)(三):事件時間處理 Flink 架構(gòu)(四):狀態(tài)管理 Flink 架構(gòu)(五):檢查點 Checkpoint(看完即懂) Flink 架構(gòu)(六):保存點 Savepoint ?? 如果您覺得這篇

    2024年02月19日
    瀏覽(23)
  • 【漲薪技術(shù)】0到1學(xué)會性能測試 —— LR錄制回放&事務(wù)&檢查點

    上一次推文我們分享了性能測試分類和應(yīng)用領(lǐng)域,今天帶大家學(xué)習(xí)性能測試工作原理、事務(wù)、檢查點!后續(xù)文章都會系統(tǒng)分享干貨,帶大家從0到1學(xué)會性能測試,另外還有教程等同步資料,文末免費獲取~ ?通常我們認(rèn)為LoadRunner是由三部分組成:VuGen、Controller、Analysis VuGen:錄

    2024年02月05日
    瀏覽(16)
  • Flink任務(wù)失敗,檢查點失效:Exceeded checkpoint tolerable failure threshold.

    Flink任務(wù)失敗,檢查點失效:Exceeded checkpoint tolerable failure threshold.

    最近實時平臺flink任務(wù)頻繁失敗,報檢查點方面的錯誤,最近集群的hdfs也經(jīng)常報警:運(yùn)行狀況不良,不知道是否和該情況有關(guān),我的狀態(tài)后端位置是hdfs,廢話不多說,干貨搞起來~ 日志中報錯如下: 在報 Exceeded checkpoint tolerable failure threshold. 錯誤的之前,是先報的是 Checkpoi

    2024年02月07日
    瀏覽(55)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包