第一部分:Spark基礎(chǔ)篇_奔跑者-輝的博客-CSDN博客
第二部分:Spark進(jìn)階篇_奔跑者-輝的博客-CSDN博客
第三部分:Spark調(diào)優(yōu)篇_奔跑者-輝的博客-CSDN博客
第一部分:Flink基礎(chǔ)篇_奔跑者-輝的博客-CSDN博客?(*建議收藏*)
實(shí)時(shí)數(shù)倉(cāng)之 Kappa 架構(gòu)與 Lambda 架構(gòu)_奔跑者-輝的博客-CSDN博客(*建議收藏*)
目錄
1 Spark作業(yè)運(yùn)行流程
2 任務(wù)提交四個(gè)階段
3 Spark運(yùn)行原理
4 Spark 生態(tài)圈都包含哪些組件
5 Spark 與 Mapreduce 的區(qū)別
5.1 Spark效率 比 MR更高的原因
5.2 Spark 與 MR的Shuffle的區(qū)別
6 RDD
6.1 什么是RDD?
6.2 RDD五大屬性
6.3 關(guān)于彈性
6.4 RDD特點(diǎn)
6.5 RDD持久化原理
6.6 RDD有哪些缺陷
6.7 區(qū)分RDD的寬窄依賴
6.8 為什么要設(shè)計(jì)寬窄依賴
7 DAG
7.1 什么是DAG
7.2 DAG中為什么要?jiǎng)澐?Stage
7.3 DAG的Stage如何劃分
7.4 DAG 劃分為 Stage的算法了解嗎
8 算子類
8.1 Transformation算子?
8.2 Action算子?
8.3 groupByKey與reduceByKey的區(qū)別
8.4 map和mapPartitions區(qū)別
8.5 updateStateBykey與mapwithState區(qū)別
8.6 Repartition和Coalesce區(qū)別
8.7 HashPartitioner 與 RangePartitioner區(qū)別
Spark是一種基于內(nèi)存的快捷、通用、可擴(kuò)展的大數(shù)據(jù)分析引擎,總結(jié)了其重中之重的spark相關(guān)知識(shí)點(diǎn),建議收藏。
1 Spark作業(yè)運(yùn)行流程
① 構(gòu)建Spark Application的運(yùn)行環(huán)境(啟動(dòng)SparkContext),SparkContext向資源管理器(YARN)注冊(cè)并申請(qǐng)運(yùn)行Executor資源;
② 資源管理器分配并啟動(dòng)Executor,Executor的運(yùn)行情況將隨著心跳發(fā)送到資源管理器上;
③ SparkContext構(gòu)建成DAG圖,將DAG圖分解成Stage(Taskset),并把Taskset發(fā)送給Task Scheduler。Executor向SparkContext申請(qǐng)Task;
④ Task Scheduler將Task發(fā)放給Executor運(yùn)行,同時(shí)SparkContext將應(yīng)用程序代碼發(fā)放給Executor;
⑤ Task在Executor上運(yùn)行,運(yùn)行完畢釋放所有資源。
2 任務(wù)提交四個(gè)階段
① 構(gòu)建DAG
用戶提交的job將首先被轉(zhuǎn)換成一系列RDD,并通過(guò)RDD之間的依賴關(guān)系構(gòu)建DAG,然后將DAG提交到調(diào)度系統(tǒng);
② DAG調(diào)度
DagSucheduler將DAG切分stage(切分依據(jù)是shuffle),將stage中生成的task以taskset的形式發(fā)送給TaskScheduler;
③ TaskScheduler調(diào)度Task(根據(jù)資源情況將task調(diào)度到Executors);
④ Executors接收task,然后將task交給線程池執(zhí)行。?
3 Spark運(yùn)行原理
Spark應(yīng)用程序以"進(jìn)程集合"為單位在分布式集群上運(yùn)行,通過(guò)driver程序的main方法創(chuàng)建的SparkContext對(duì)象與集群交互的;
① Spark通過(guò)SparkContext向Cluster manager(資源管理器)申請(qǐng)所需執(zhí)行的資源(cpu、內(nèi)存等);
② Cluster manager分配應(yīng)用程序執(zhí)行需要的資源,在Worker節(jié)點(diǎn)上創(chuàng)建Executor;
③ SparkContext將程序代碼(jar包或python文件)和Task任務(wù)發(fā)送給Executor執(zhí)行,并收集結(jié)果給Driver。
相關(guān)組件功能:
master:? ? ? ? ? ? 管理集群和節(jié)點(diǎn),不參與計(jì)算;
worker:? ? ? ? ? ? 計(jì)算節(jié)點(diǎn),進(jìn)程本身不參與計(jì)算,和 master 匯報(bào);
Driver:? ? ? ? ? ? ?運(yùn)行程序的 main 方法,創(chuàng)建 spark context 對(duì)象;
spark context: ?控制整個(gè) application 的生命周期,包括 dagsheduler 和 task scheduler 等組件;
client:? ? ? ? ? ? ? 用戶提交程序的入口。
4 Spark 生態(tài)圈都包含哪些組件
如下圖所示:
Spark Core:Spark 的核心模塊,包含 RDD、任務(wù)調(diào)度、內(nèi)存管理、錯(cuò)誤恢復(fù)、與存儲(chǔ)系統(tǒng)交互等功能;
Spark SQL:主要用于進(jìn)行結(jié)構(gòu)化數(shù)據(jù)的處理。它提供的最核心的編程抽象就是 DataFrame,將其作為分布式 SQL 查詢引擎,通過(guò)將 Spark SQL 轉(zhuǎn)化為 RDD 來(lái)執(zhí)行各種操作;
Spark Streaming:Spark 提供的對(duì)實(shí)時(shí)數(shù)據(jù)進(jìn)行流式計(jì)算的組件。提供了用來(lái)操作數(shù)據(jù)流的 API;
Spark MLlib:提供常見(jiàn)的機(jī)器學(xué)習(xí)(ML)功能的程序庫(kù)。包括分類、回歸、聚類、協(xié)同過(guò)濾等,還提供了模型評(píng)估、數(shù)據(jù)導(dǎo)入等額外的支持功能;
GraphX(圖計(jì)算):Spark 中用于圖計(jì)算的 API,性能良好,擁有豐富的功能和運(yùn)算符,能在海量數(shù)據(jù)上自如地運(yùn)行復(fù)雜的圖算法;
集群管理器:Spark 設(shè)計(jì)為可以高效地在一個(gè)計(jì)算節(jié)點(diǎn)到數(shù)千個(gè)計(jì)算節(jié)點(diǎn)之間伸縮計(jì)算;
Structured Streaming:處理結(jié)構(gòu)化流,統(tǒng)一了離線和實(shí)時(shí)的 API。
5 Spark 與 Mapreduce 的區(qū)別
spark是借鑒了Mapreduce,并在其基礎(chǔ)上發(fā)展起來(lái)的,繼承了其分布式計(jì)算的優(yōu)點(diǎn)并進(jìn)行了改進(jìn),spark生態(tài)更為豐富,功能更為強(qiáng)大,性能更加適用范圍廣,mapreduce更簡(jiǎn)單,穩(wěn)定性好。主要區(qū)別:
① spark把運(yùn)算的中間數(shù)據(jù)(shuffle階段產(chǎn)生的數(shù)據(jù))存放在內(nèi)存,減少低效的磁盤交互,迭代計(jì)算效率更高。mapreduce的中間結(jié)果需要落地,保存到磁盤;
② spark容錯(cuò)性高,它通過(guò)彈性分布式數(shù)據(jù)集RDD來(lái)實(shí)現(xiàn)高效容錯(cuò),某一部分丟失或者出錯(cuò),可以通過(guò)整個(gè)數(shù)據(jù)集的計(jì)算流程的血緣關(guān)系來(lái)實(shí)現(xiàn)重建,mapreduce的容錯(cuò)只能重新計(jì)算;
③ spark是基于內(nèi)存的分布式計(jì)算架構(gòu),提供更加豐富的數(shù)據(jù)集操作類型,主要分成transformation和action兩類算子,數(shù)據(jù)分析更加快速,所以適合低時(shí)延環(huán)境下計(jì)算的應(yīng)用;
④ spark與hadoop最大的區(qū)別在于迭代式計(jì)算模型:
mapreduce框架的Hadoop主要分為map和reduce兩個(gè)階段,兩個(gè)階段完了就結(jié)束了,所以在一個(gè)job里面能做的處理很有限;
spark計(jì)算模型是基于內(nèi)存的迭代式計(jì)算模型,可以分為n個(gè)階段,根據(jù)用戶編寫(xiě)的RDD算子和程序,在處理完一個(gè)階段后可以繼續(xù)往下處理很多個(gè)階段,而不只是兩個(gè)階段。所以spark相較于mapreduce,計(jì)算模型更加靈活,可以提供更強(qiáng)大的功能。
5.1 Spark效率 比 MR更高的原因
① 基于內(nèi)存計(jì)算,減少低效的磁盤交互;
② 高效的調(diào)度算法,基于DAG;
③ 容錯(cuò)機(jī)制 Linage。
重點(diǎn)部分就是 DAG 和 Lingae
spark就是為了解決mr落盤導(dǎo)致效率低下的問(wèn)題而產(chǎn)生的,原理還是mr的原理,只是shuffle放在內(nèi)存中計(jì)算了,所以效率提高很多。
5.2 Spark 與 MR的Shuffle的區(qū)別
shuffle過(guò)程本質(zhì):都是將 Map 端獲得的數(shù)據(jù)使用分區(qū)機(jī)制進(jìn)行劃分,并將數(shù)據(jù)發(fā)送給對(duì)應(yīng)的Reducer 的過(guò)程;
相同點(diǎn): 都是將 mapper(Spark 里是 ShuffleMapTask)的輸出進(jìn)行 partition,不同的 partition 送到不同的 reducer;
不同點(diǎn):
mapReduce 默認(rèn)是排序的,spark 默認(rèn)不排序,除非使用 sortByKey 算子;
mr 落盤,spark 不落盤,spark 可以解決 mr 落盤導(dǎo)致效率低下的問(wèn)題;
mapReduce 可以劃分成 split,map()、spill、merge、shuffle、sort、reduce()等階段,spark 沒(méi)有明顯的階段劃分,只有不同的 stage 和 算子操作。
6 RDD
6.1 什么是RDD?
RDD是彈性分布數(shù)據(jù)集,是Spark的基本數(shù)據(jù)結(jié)構(gòu),它是一個(gè)不可變的分布式對(duì)象集合,每個(gè)數(shù)據(jù)集被劃分為邏輯分區(qū),其可以在集群的不同節(jié)點(diǎn)上計(jì)算。
6.2 RDD五大屬性
① 1個(gè)分區(qū)列表:? ? ? ? ?每一個(gè)數(shù)據(jù)集都對(duì)應(yīng)一個(gè)分區(qū),通過(guò)getPartitions獲?。?/p>
② 1個(gè)分區(qū)計(jì)算函數(shù): ?真正獲取分區(qū)數(shù)據(jù)的函數(shù),函數(shù)作用在每一個(gè)partition(split)上;
③ 1組依賴關(guān)系:? ? ? ? ?業(yè)務(wù)邏輯轉(zhuǎn)換,形成依賴表;
④ 1個(gè)分區(qū)器:? ? ? ? ? ? ?對(duì)數(shù)據(jù)如何分區(qū),默認(rèn)是hash分區(qū)器;
⑤ 1個(gè)最優(yōu)位置列表: ?數(shù)據(jù)在哪里,計(jì)算就在哪里。移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算。(分區(qū)計(jì)算函數(shù):MapPartition、getPartition)。
6.3 關(guān)于彈性
?存儲(chǔ)的彈性:內(nèi)存與磁盤的自動(dòng)切換
?容錯(cuò)的彈性:數(shù)據(jù)丟失可以自動(dòng)恢復(fù)
?計(jì)算的彈性:計(jì)算出錯(cuò)重試機(jī)制
?分片的彈性:可根據(jù)需要重新分片
6.4 RDD特點(diǎn)
① 彈性:可以基于內(nèi)存、磁盤的存儲(chǔ),數(shù)據(jù)集可大可小 ;
② 可容錯(cuò):容錯(cuò)性 (task容錯(cuò):task失敗重試 ,stage容錯(cuò):stage失敗重試 ,RDD本身容錯(cuò)。)
③ 并行處理:數(shù)據(jù)集可分區(qū)并行處理;
④ 不可變:RDD一旦創(chuàng)建是不能更新和刪除的,但是可以基于RDD進(jìn)行計(jì)算,獲取新的RDD數(shù)據(jù)集。
6.5 RDD持久化原理
調(diào)用cache()和persist()方法即可。cache()和persist()的區(qū)別在于,cache()是persist()的一種簡(jiǎn)化方式,cache()的底層就是調(diào)用persist()的無(wú)參版本persist(MEMORY_ONLY),將數(shù)據(jù)持久化到內(nèi)存中;
如果需要從內(nèi)存中清除緩存,可以使用unpersist()方法。RDD持久化是可以手動(dòng)選擇不同的策略的。在調(diào)用persist()時(shí)傳入對(duì)應(yīng)的StorageLevel即可。
6.6 RDD有哪些缺陷
① 不支持細(xì)粒度的寫(xiě)和更新操作,spark寫(xiě)數(shù)據(jù)是粗粒度的。所謂粗粒度,就是批量寫(xiě)入數(shù)據(jù),為了提高效率;
② 不支持增量迭代計(jì)算,F(xiàn)link支持。
6.7 區(qū)分RDD的寬窄依賴
?窄依賴:父RDD的一個(gè)分區(qū)只會(huì)被子rdd的一個(gè)分區(qū)依賴(一對(duì)一)。例如map、filter、union等這些算子;
寬依賴:父RDD的一個(gè)分區(qū)會(huì)被子rdd的多個(gè)分區(qū)依賴(會(huì)引起shuffle) (一對(duì)多)。例如groupByKey、 reduceByKey,sortBykey等算子。
6.8 為什么要設(shè)計(jì)寬窄依賴
① 對(duì)于窄依賴:
窄依賴的多個(gè)分區(qū)可以并行計(jì)算;
窄依賴的一個(gè)分區(qū)的數(shù)據(jù)如果丟失只需要重新計(jì)算對(duì)應(yīng)的分區(qū)的數(shù)據(jù)就可以了。
② 對(duì)于寬依賴:
劃分 Stage(階段)的依據(jù): 對(duì)于寬依賴,必須等到上一階段計(jì)算完成才能計(jì)算下一階段。
7 DAG
7.1 什么是DAG
DAG(Directed Acyclic Graph 有向無(wú)環(huán)圖)指的是數(shù)據(jù)轉(zhuǎn)換執(zhí)行的過(guò)程,有方向,無(wú)閉環(huán)(其實(shí)就是 RDD 執(zhí)行的流程);
原始的RDD通過(guò)一系列的轉(zhuǎn)換操作就形成了DAG有向無(wú)環(huán)圖,任務(wù)執(zhí)行時(shí),可以按照 DAG 的描述,執(zhí)行真正的計(jì)算。
7.2 DAG中為什么要?jiǎng)澐?Stage
并行計(jì)算。
一個(gè)復(fù)雜的業(yè)務(wù)邏輯如果有 shuffle,那么就意味著前面階段產(chǎn)生結(jié)果后,才能執(zhí)行下一個(gè)階段,即下一個(gè)階段的計(jì)算要依賴上一個(gè)階段的數(shù)據(jù)。那么我們按照 shuffle 進(jìn)行劃分(也就是按照寬依賴就行劃分),就可以將一個(gè) DAG 劃分成多 個(gè) Stage/階段,在同一個(gè) Stage 中,會(huì)有多個(gè)算子操作,可以形成一個(gè)pipeline 流水線,流水線內(nèi)的多個(gè)平行的分區(qū)可以并行執(zhí)行。
7.3 DAG的Stage如何劃分
DAG叫做有向無(wú)環(huán)圖,原始的RDD通過(guò)一系列的轉(zhuǎn)換就形成了DAG,根據(jù)RDD之間的依賴關(guān)系不同將DAG劃分成不同的stage。
對(duì)于"窄依賴",partition的轉(zhuǎn)換處理在stage中完成計(jì)算,不劃分。
對(duì)于"寬依賴",由于shuffle的存在,只能在父RDD處理完成后,才能開(kāi)始接下來(lái)的計(jì)算,也就是說(shuō)需要?jiǎng)澐謘tage。且"寬依賴"是劃分stage的依據(jù)。
7.4 DAG 劃分為 Stage的算法了解嗎
核心算法:回溯算法
從后往前回溯/反向解析,遇到窄依賴加入本 Stage,遇見(jiàn)寬依賴進(jìn)行 Stage 切分。
8 算子類
8.1 Transformation算子?
transformation變換/轉(zhuǎn)換算子:用來(lái)將rdd進(jìn)行轉(zhuǎn)化,構(gòu)建rdd的血緣關(guān)系,這種變換并不觸發(fā)提交作業(yè);
transformation有"惰性",操作是延遲計(jì)算的,Action觸發(fā)的時(shí)候才會(huì)真正的計(jì)算;
(1) map:對(duì)RDD中所有元素施加一個(gè)函數(shù)映射,返回一個(gè)新RDD,該RDD有原RDD中的每個(gè)元素經(jīng)過(guò)function轉(zhuǎn)換后組成。 特點(diǎn):輸入一條,輸出一條;
(2) filter:過(guò)濾符合條件的記錄數(shù),true保留,false過(guò)濾掉;
(3) flatmap:通過(guò)傳入函數(shù)進(jìn)行映射,對(duì)每一個(gè)元素進(jìn)行處理。先map,后flat,與map相似,每個(gè)輸入項(xiàng)可映射0到多個(gè)輸出項(xiàng);
(4) repartition:增加或減少分區(qū),會(huì)產(chǎn)生shuffle (多分區(qū)到一個(gè)分區(qū)不會(huì)產(chǎn)生shuffle);
(5) MapPartitions:每次處理一個(gè)分區(qū)的數(shù)據(jù),這個(gè)分區(qū)的數(shù)據(jù)處理完后,原RDD中分區(qū)數(shù)據(jù)才能釋放,但是數(shù)據(jù)量大時(shí)會(huì)導(dǎo)致oom;
(6) MapPartitionsWithIndex:與MapPartition相似,除此之外還會(huì)帶分區(qū)索引值;
(7) foreache:循環(huán)遍歷數(shù)據(jù)集中每個(gè)元素,并運(yùn)行相應(yīng)的邏輯;
(8) sample:隨機(jī)抽樣算子,對(duì)傳進(jìn)去的數(shù)按比例放回或不放回的抽樣;
(9) GroupByKey:對(duì)數(shù)據(jù)會(huì)按照key進(jìn)行分組,key相同會(huì)在同一個(gè)分區(qū)里;
(10) ReduceByKey:將相同的key,將按照相應(yīng)的邏輯進(jìn)行處理。先進(jìn)行本地聚合(分區(qū)聚合),在進(jìn)行全局聚合;
(11) sortbykey:如果源RDD包含源類型(k,v)對(duì),其中k可排序,則返回新RDD包含(k,v)對(duì),并按照k排序;
(12) union:返回源數(shù)據(jù)集合參數(shù)數(shù)據(jù)的并集;
(13) distinct:返回對(duì)源數(shù)據(jù)集對(duì)元素去重后的新數(shù)據(jù)集;
還有intersection、aggregateBykey、join、cogroup、cartesian、pipe、coalesce、repartition、Repartition and SortWithPartition等算子。
8.2 Action算子?
action算子會(huì)觸發(fā)Spark提交作業(yè)(job),并將數(shù)據(jù)輸出spark系統(tǒng)。
(1) reduce: 根據(jù)聚合邏輯聚合數(shù)據(jù)集中每個(gè)元素;
(2) take(n): 返回一個(gè)數(shù)據(jù)集包含前n個(gè)元素的集合;
(3) first: ?first=take(1)意思是返回?cái)?shù)據(jù)集中的第1個(gè)元素;
(4) count: 返回?cái)?shù)據(jù)集中元素的個(gè)數(shù)。會(huì)在結(jié)果計(jì)算完成后回收到Driver端;
(5) collect:將計(jì)算結(jié)果回收到Driver端;
(6) foreach: 循環(huán)遍歷數(shù)據(jù)中每個(gè)元素,運(yùn)行相應(yīng)的邏輯;
(7) foreachPartition:遍歷每個(gè)partition里邊的數(shù)據(jù);
還有takeSample、saveAsTextfile、takeOrdered、Save As SequenceFile、SaveAsObjectFile、countByKey等算子。
8.3 groupByKey與reduceByKey的區(qū)別
groupByKey: 主要實(shí)現(xiàn)分組,key相同會(huì)在同1個(gè)分區(qū)里,沒(méi)有預(yù)聚合作用;
reduceByKey:分局部聚合(每個(gè)分區(qū)的聚合) + 全局聚合(每個(gè)分區(qū)的匯總聚合),具有預(yù)聚合操作;
reduceByKey效率更高些,盡量避免使用groupByKey
相同點(diǎn):都是transformation類型的算子,所有的算子都是根據(jù)key進(jìn)行分組,都會(huì)發(fā)生shuffle過(guò)程。
8.4 map和mapPartitions區(qū)別
map:每次處理一條數(shù)據(jù),對(duì)每一個(gè)元素作遍歷;
mapPartitions:每次處理一個(gè)分區(qū)數(shù)據(jù),這個(gè)分區(qū)數(shù)據(jù)處理完后,原RDD中分區(qū)數(shù)據(jù)才能釋放,但是數(shù)據(jù)量大時(shí)可能導(dǎo)致ooM;
開(kāi)發(fā)指導(dǎo): 當(dāng)內(nèi)存空間比較大的時(shí)候,建議使用mapPartition(),以提高效率。
相同點(diǎn): 都是基于分區(qū)數(shù)據(jù)的計(jì)算。
8.5 updateStateBykey與mapwithState區(qū)別
updateStateBykey:統(tǒng)計(jì)全局key的狀態(tài),但是就算沒(méi)有數(shù)據(jù)出入,它也會(huì)返回之前key的狀態(tài);
缺點(diǎn): 如果數(shù)據(jù)量太大的話,我們需要checkpoint數(shù)據(jù)會(huì)占用較大的存儲(chǔ),而且效率也不高。
mapwithState(效率更高,生產(chǎn)中建議使用):用戶統(tǒng)計(jì)全局key的狀態(tài),但是它如果沒(méi)有數(shù)據(jù)輸入,便不會(huì)返回之前key的狀態(tài)。我們可以只是關(guān)心那些已經(jīng)發(fā)生變化的key,對(duì)于沒(méi)有數(shù)據(jù)輸入,則不會(huì)返回那些沒(méi)有變化key的數(shù)據(jù);
優(yōu)點(diǎn): 即使數(shù)據(jù)量很大,checkpoint也不會(huì)像updateStateBykey那樣,占用較大的存儲(chǔ)。
相同點(diǎn): 對(duì)實(shí)時(shí)數(shù)據(jù)進(jìn)行全局的匯總,有狀態(tài)的計(jì)算。
8.6 Repartition和Coalesce區(qū)別
① 關(guān)系:
兩者都是用來(lái)改變RDD的partition數(shù)量的,repartition底層調(diào)用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
② 區(qū)別:
repartition一定會(huì)發(fā)生shuffle,coalesce根據(jù)傳入的參數(shù)來(lái)判斷是否發(fā)生shuffle;
一般情況下增大rdd的partition數(shù)量使用repartition,減少partition數(shù)量時(shí)使用coalesce。
8.7 HashPartitioner 與 RangePartitioner區(qū)別
HashPartitioner:使用key計(jì)算其hashCode,除以分區(qū)的個(gè)數(shù)取余,得到的值作為分區(qū)ID,其結(jié)果可能導(dǎo)致分區(qū)中的數(shù)據(jù)量不均勻,產(chǎn)生數(shù)據(jù)傾斜;文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-454669.html
RangePartitioner:盡量保證每個(gè)分區(qū)中數(shù)據(jù)量的均勻,而且分區(qū)與分區(qū)之間是有序的,但是分區(qū)內(nèi)的元素是不能保證有序的,即就是將一點(diǎn)范圍的數(shù)據(jù)映射到某一個(gè)分區(qū)內(nèi)。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-454669.html
到了這里,關(guān)于第一部分:Spark基礎(chǔ)篇的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!