spark優(yōu)化總結(jié):
一、spark?代碼優(yōu)
六大代碼優(yōu)化: 避免創(chuàng)建重復(fù)的RDD 盡可能復(fù)用同一個(gè)RDD 對(duì)多次使用的RDD進(jìn)行持久化 盡量避免使用shuffle類算子 使用map-side預(yù)聚合的shuffle操作 使用高性能的算子 廣播大變量 使用Kryo優(yōu)化序列化性能 優(yōu)化數(shù)據(jù)結(jié)構(gòu) 使用高性能的庫(kù)fastutil
1.?對(duì)多次使用的RDD進(jìn)行持久化
同常內(nèi)存夠的時(shí)候建議使用:MEMORY_ONLY
如果內(nèi)存不夠的時(shí)候使用通常建議使用:MEMORY_AND_DISK_SER策略,而不是 MEMORY_AND_DISK策略。
如何選擇一種最合適的持久化策略 1 默認(rèn)情況下,性能最高的當(dāng)然是MEMORY_ONLY,但前提是你的內(nèi)存必須足夠足夠大, 可以綽綽有余地存放下整個(gè)RDD的所有數(shù)據(jù)。因?yàn)椴贿M(jìn)行序列化與反序列化操作,就避 免了這部分的性能開(kāi)銷;對(duì)這個(gè)RDD的后續(xù)算子操作, 都是基于純內(nèi)存中的數(shù)據(jù)的操作 ,不需要從磁盤文件中讀取數(shù)據(jù),性能也很高;而且不需要復(fù)制一份數(shù)據(jù)副本,并遠(yuǎn)程傳 送到其他節(jié)點(diǎn)上。但是這里必須要注意的是,在實(shí)際的生產(chǎn)環(huán)境中,恐怕能夠直接用這種 策略的場(chǎng)景還是有限的, 如果RDD中數(shù)據(jù)比較多時(shí)(比如幾十億),直接用這種持久化 級(jí)別,會(huì)導(dǎo)致JVM的OOM內(nèi)存溢出異常。 如果使用MEMORY_ONLY級(jí)別時(shí)發(fā)生了內(nèi)存溢出,那么建議嘗試使用 MEMORY_ONLY_SER級(jí)別。該級(jí)別會(huì)將RDD數(shù)據(jù)序列化后再保存在內(nèi)存中,此時(shí)每個(gè) partition僅僅是一個(gè)字節(jié)數(shù)組而已,大大減少了對(duì)象數(shù)量,并降低了內(nèi)存占用。 這種級(jí)別 比MEMORY_ONLY多出來(lái)的性能開(kāi)銷,主要就是序列化與反序列化的開(kāi)銷。但是后續(xù)算 子可以基于純內(nèi)存進(jìn)行操作,因此性能總體還是比較高的。此外,可能發(fā)生的問(wèn)題同上, 如果RDD中的數(shù)據(jù)量過(guò)多的話, 還是可能會(huì)導(dǎo)致OOM內(nèi)存溢出的異常。
如何選擇一種最合適的持久化策略 2 如果純內(nèi)存的級(jí)別都無(wú)法使用,那么建議使用MEMORY_AND_DISK_SER策略,而不是 MEMORY_AND_DISK策略。因?yàn)榧热坏搅诉@一步,就說(shuō)明RDD的數(shù)據(jù)量很大,內(nèi)存無(wú) 法完全放下。序列化后的數(shù)據(jù)比較少, 可以節(jié)省內(nèi)存和磁盤的空間開(kāi)銷。同時(shí)該策略會(huì)優(yōu) 先盡量嘗試將數(shù)據(jù)緩存在內(nèi)存中,內(nèi)存緩存不下才會(huì)寫入磁盤。 通常不建議使用DISK_ONLY和后綴為_(kāi)2的級(jí)別:因?yàn)橥耆诖疟P文件進(jìn)行數(shù)據(jù)的讀寫 ,會(huì)導(dǎo)致性能急劇降低,有時(shí)還不如重新計(jì)算一次所有RDD。后綴為_(kāi)2的級(jí)別,必須將 所有數(shù)據(jù)都復(fù)制一份副本,并發(fā)送到其他節(jié)點(diǎn)上, 數(shù)據(jù)復(fù)制以及網(wǎng)絡(luò)傳輸會(huì)導(dǎo)致較大的性 能開(kāi)銷,除非是要求作業(yè)的高可用性,否則不建議使用。
2. 使用高性能的算子
使用reduceByKey/aggregateByKey替代groupByKey 使用mapPartitions替代普通map Transformation算子 使用foreachPartitions替代foreach Action算子 使用filter之后進(jìn)行coalesce操作 使用repartitionAndSortWithinPartitions替代repartition與sort類操作代碼 repartition:coalesce(numPartitions,true) 增多分區(qū)使用這個(gè) coalesce(numPartitions,false) 減少分區(qū) 沒(méi)有shuffle只是合并 partition
3. 廣播大變量
1. 開(kāi)發(fā)過(guò)程中,會(huì)遇到需要在算子函數(shù)中使用外部變量的場(chǎng)景(尤其是大變量,比如 100M以上的大集合),那么此時(shí)就應(yīng)該使用Spark的廣播(Broadcast)功能來(lái)提 升性能 2. 函數(shù)中使用到外部變量時(shí),默認(rèn)情況下,Spark會(huì)將該變量復(fù)制多個(gè)副本,通過(guò)網(wǎng)絡(luò) 傳輸?shù)絫ask中,此時(shí)每個(gè)task都有一個(gè)變量副本。如果變量本身比較大的話(比如 100M,甚至1G), 那么大量的變量副本在網(wǎng)絡(luò)中傳輸?shù)男阅荛_(kāi)銷,以及在各個(gè)節(jié) 點(diǎn)的Executor中占用過(guò)多內(nèi)存導(dǎo)致的頻繁GC(垃圾回收),都會(huì)極大地影響性能 3. 如果使用的外部變量比較大,建議使用Spark的廣播功能,對(duì)該變量進(jìn)行廣播。廣播 后的變量,會(huì)保證每個(gè)Executor的內(nèi)存中,只駐留一份變量副本, 而Executor中的 task執(zhí)行時(shí)共享該Executor中的那份變量副本。這樣的話,可以大大減少變量副本 的數(shù)量,從而減少網(wǎng)絡(luò)傳輸?shù)男阅荛_(kāi)銷,并減少對(duì)Executor內(nèi)存的占用開(kāi)銷,降低 GC的頻率 4. 廣播大變量發(fā)送方式:Executor一開(kāi)始并沒(méi)有廣播變量,而是task運(yùn)行需要用到廣 播變量,會(huì)找executor的blockManager要,bloackManager找Driver里面的 blockManagerMaster要。
4. 使用Kryo優(yōu)化序列化性能
在Spark中,主要有三個(gè)地方涉及到了序列化: 在算子函數(shù)中使用到外部變量時(shí),該變量會(huì)被序列化后進(jìn)行網(wǎng)絡(luò)傳輸 將自定義的類型作為RDD的泛型類型時(shí)(比如JavaRDD,SXT是自定義類型),所有自 定義類型對(duì)象,都會(huì)進(jìn)行序列化。因此這種情況下,也要求自定義的類必須實(shí)現(xiàn) Serializable接口。 使用可序列化的持久化策略時(shí)(比如MEMORY_ONLY_SER),Spark會(huì)將RDD中的每個(gè) partition都序列化成一個(gè)大的字節(jié)數(shù)組。
Kryo序列化器介紹: Spark支持使用Kryo序列化機(jī)制。Kryo序列化機(jī)制,比默認(rèn)的Java序列化機(jī)制,速度要快 ,序列化后的數(shù)據(jù)要更小,大概是Java序列化機(jī)制的1/10。所以Kryo序列化優(yōu)化以后,可 以讓網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)變少;在集群中耗費(fèi)的內(nèi)存資源大大減少。 對(duì)于這三種出現(xiàn)序列化的地方,我們都可以通過(guò)使用Kryo序列化類庫(kù),來(lái)優(yōu)化序列化和 反序列化的性能。Spark默認(rèn)使用的是Java的序列化機(jī)制,也就是 ObjectOutputStream/ObjectInputStream API來(lái)進(jìn)行序列化和反序列化。但是Spark同 時(shí)支持使用Kryo序列化庫(kù),Kryo序列化類庫(kù)的性能比Java序列化類庫(kù)的性能要高很多。 官方介紹,Kryo序列化機(jī)制比Java序列化機(jī)制,性能高10倍左右。Spark之所以默認(rèn)沒(méi)有 使用Kryo作為序列化類庫(kù),是因?yàn)镵ryo要求最好要注冊(cè)所有需要進(jìn)行序列化的自定義類 型,因此對(duì)于開(kāi)發(fā)者來(lái)說(shuō),這種方式比較麻煩
5. 優(yōu)化數(shù)據(jù)結(jié)構(gòu)
Java中,有三種類型比較耗費(fèi)內(nèi)存: 對(duì)象,每個(gè)Java對(duì)象都有對(duì)象頭、引用等額外的信息,因此比較占用內(nèi)存空間。 字符串,每個(gè)字符串內(nèi)部都有一個(gè)字符數(shù)組以及長(zhǎng)度等額外信息。 集合類型,比如HashMap、LinkedList等,因?yàn)榧项愋蛢?nèi)部通常會(huì)使用一些內(nèi)部類來(lái) 封裝集合元素,比如Map.Entry。 因此Spark官方建議,在Spark編碼實(shí)現(xiàn)中,特別是對(duì)于算子函數(shù)中的代碼,盡 量不要使用上述三種數(shù)據(jù)結(jié)構(gòu),盡量使用字符串替代對(duì)象,使用原始類型(比如 Int、Long)替代字符串, 使用數(shù)組替代集合類型,這樣盡可能地減少內(nèi)存占用 ,從而降低GC頻率,提升性能。
6. 使用高性能的庫(kù)fastutil
fastutil介紹: fastutil是擴(kuò)展了Java標(biāo)準(zhǔn)集合框架(Map、List、Set;HashMap、ArrayList、 HashSet)的類庫(kù),提供了特殊類型的map、set、list和queue; fastutil能夠提供更小的內(nèi)存占用,更快的存取速度;我們使用fastutil提供的集合類,來(lái) 替代自己平時(shí)使用的JDK的原生的Map、List、Set,好處在于, fastutil集合類,可以減 小內(nèi)存的占用,并且在進(jìn)行集合的遍歷、根據(jù)索引(或者key)獲取元素的值和設(shè)置元素 的值的時(shí)候,提供更快的存取速度; fastutil最新版本要求Java 7以及以上版本; fastutil的每一種集合類型,都實(shí)現(xiàn)了對(duì)應(yīng)的Java中的標(biāo)準(zhǔn)接口(比如fastutil的map,實(shí) 現(xiàn)了Java的Map接口),因此可以直接放入已有系統(tǒng)的任何代碼中。 fastutil的每一種集合類型,都實(shí)現(xiàn)了對(duì)應(yīng)的Java中的標(biāo)準(zhǔn)接口(比如fastutil的 map,實(shí)現(xiàn)了Java的Map接口),因此可以直接放入已有系統(tǒng)的任何代碼中。 使用? IDEA中導(dǎo)入依賴
<!-- https://mvnrepository.com/artifact/fastutil/fastutil -->
<dependency>
<groupId>fastutil</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.9</version>
</dependency>
二、spark 參數(shù)調(diào)優(yōu)
--num-executors executor的數(shù)量
--executor-memory 每一個(gè)executor的內(nèi)存
--executor-cores 每一個(gè)executor的核心數(shù)
--driver-memory Driver的內(nèi)存1G-2G(保存廣播變量)
--spark.storage.memoryFraction 用于緩存的內(nèi)存占比默認(rèn)時(shí)0.6,如果代碼中沒(méi)有用到緩存 可以將內(nèi)存分配給shuffle
--spark.shuffle.memoryFraction 用戶shuffle的內(nèi)存占比默認(rèn)0.2
總的內(nèi)存=num-executors*executor-memory
總的核數(shù)=num-executors*executor-cores
spark on yarn 資源設(shè)置標(biāo)準(zhǔn)
1、單個(gè)任務(wù)總的內(nèi)存和總的核數(shù)一般做多在yarn總資源的1/3到1/2之間
比如公司集群有10太服務(wù)器
單臺(tái)服務(wù)器內(nèi)存是128G,核數(shù)是40
yarn總的內(nèi)存=10*128G=1280G*0.8=960G 需要預(yù)留一般分內(nèi)存給系統(tǒng)進(jìn)程
yarn總的核數(shù)=40*10=400
提交單個(gè)spark任務(wù)資源上線
總的內(nèi)存=960G *(1/3| 1/2) = 300G-500G
總的核數(shù)=400 * (1/3| 1/2) = 120 - 200
2、在上線內(nèi)再按照需要處理的數(shù)據(jù)量來(lái)合理指定資源 -- 最理想的情況是一個(gè)task對(duì)應(yīng)一個(gè)core
2.1、數(shù)據(jù)量比較小 - 10G
10G = 80個(gè)block = rdd80分區(qū) = 80個(gè)task
- 最理想資源指定 -- 剩余資源充足
--num-executors=40
--executor-memory=4G
--executor-cores=2
- 資源里面最優(yōu)的方式 -- 剩余資源不是很充足時(shí)
--num-executors=20
--executor-memory=4G
--executor-cores=2
2.2、數(shù)據(jù)量比較大時(shí) - 80G
80G = 640block = 640分區(qū) = 640task
- 最理想資源指定 -- 剩余資源充足, 如果剩余資源不夠,還需要減少指定的資源
--num-executors=100
--executor-memory=4G
--executor-cores=2
-- spark.locality.wait: spark task 再executor中執(zhí)行前的等待時(shí)間 默認(rèn)3秒
spark.yarn.executor.memoryOverhead : 堆外內(nèi)存 默認(rèn)等于堆內(nèi)存的10%
spark.network.timeout spark網(wǎng)絡(luò)鏈接的超時(shí)時(shí)間 默認(rèn)120s
提高數(shù)據(jù)本地化優(yōu)先級(jí)別
附錄:參數(shù)調(diào)優(yōu)詳解
1參數(shù)調(diào)優(yōu) 1.1num-executors 參數(shù)說(shuō)明:該參數(shù)用于設(shè)置Spark作業(yè)總共要用多少個(gè)Executor進(jìn)程來(lái)執(zhí)行。Driver在向YARN集群管理器申請(qǐng)資源時(shí),YARN集群管理器會(huì)盡可能按照你的設(shè)置來(lái)在集群的各個(gè)工作節(jié)點(diǎn)上,啟動(dòng)相應(yīng)數(shù)量的Executor進(jìn)程。 這個(gè)參數(shù)非常之重要,如果不設(shè)置的話,默認(rèn)只會(huì)給你啟動(dòng)少量的Executor進(jìn)程,此時(shí)你的Spark作業(yè)的運(yùn)行速度是非常慢的。 參數(shù)調(diào)優(yōu)建議:每個(gè)Spark作業(yè)的運(yùn)行一般設(shè)置50~100個(gè)左右的Executor進(jìn)程比較合適,設(shè)置太少或太多的Executor進(jìn)程都不好。設(shè)置的太少,無(wú)法充分利用集群資源;設(shè)置的太多的話,大部分隊(duì)列可能無(wú)法給予充分的資源。 1.2executor-memory 參數(shù)說(shuō)明:該參數(shù)用于設(shè)置每個(gè)Executor進(jìn)程的內(nèi)存。Executor內(nèi)存的大小,很多時(shí)候直接決定了Spark作業(yè)的性能,而且跟常見(jiàn)的JVM OOM異常,也有直接的關(guān)聯(lián)。 參數(shù)調(diào)優(yōu)建議:每個(gè)Executor進(jìn)程的內(nèi)存設(shè)置4G~8G較為合適。但是這只是一個(gè)參考值,具體的設(shè)置還是得根據(jù)不同部門的資源隊(duì)列來(lái)定??梢钥纯醋约簣F(tuán)隊(duì)的資源隊(duì)列的最大內(nèi)存限制是多少, num-executors乘以executor-memory,就代表了你的Spark作業(yè)申請(qǐng)到的總內(nèi)存量(也就是所有Executor進(jìn)程的內(nèi)存總和),這個(gè)量是不能超過(guò)隊(duì)列的最大內(nèi)存量的。此外,如果你是跟團(tuán)隊(duì)里其他人共享這個(gè)資源隊(duì)列, 那么申請(qǐng)的總內(nèi)存量最好不要超過(guò)資源隊(duì)列最大總內(nèi)存的1/3~1/2,避免你自己的Spark作業(yè)占用了隊(duì)列所有的資源,導(dǎo)致別的同學(xué)的作業(yè)無(wú)法運(yùn)行。 1.3executor-cores 可以用total-executor-cores總的核數(shù) executor-cores = total-executor-cores / num-executors 參數(shù)說(shuō)明:該參數(shù)用于設(shè)置每個(gè)Executor進(jìn)程的CPU core數(shù)量。這個(gè)參數(shù)決定了每個(gè)Executor進(jìn)程并行執(zhí)行task線程的能力。因?yàn)槊總€(gè)CPU core同一時(shí)間只能執(zhí)行一個(gè)task線程,因此每個(gè)Executor進(jìn)程的CPU core數(shù)量越多, 越能夠快速地執(zhí)行完分配給自己的所有task線程。 參數(shù)調(diào)優(yōu)建議:Executor的CPU core數(shù)量設(shè)置為2~4個(gè)較為合適。同樣得根據(jù)不同部門的資源隊(duì)列來(lái)定,可以看看自己的資源隊(duì)列的最大CPU core限制是多少,再依據(jù)設(shè)置的Executor數(shù)量, 來(lái)決定每個(gè)Executor進(jìn)程可以分配到幾個(gè)CPU core。同樣建議,如果是跟他人共享這個(gè)隊(duì)列,那么num-executors * executor-cores不要超過(guò)隊(duì)列總CPU core的1/3~1/2左右比較合適,也是避免影響其他同學(xué)的作業(yè)運(yùn)行。 1.4driver-memory 參數(shù)說(shuō)明:該參數(shù)用于設(shè)置Driver進(jìn)程的內(nèi)存。 參數(shù)調(diào)優(yōu)建議:Driver的內(nèi)存通常來(lái)說(shuō)不設(shè)置,或者設(shè)置1G左右應(yīng)該就夠了。唯一需要注意的一點(diǎn)是,如果需要使用collect算子將RDD的數(shù)據(jù)全部拉取到Driver上進(jìn)行處理,那么必須確保Driver的內(nèi)存足夠大, 否則會(huì)出現(xiàn)OOM內(nèi)存溢出的問(wèn)題。 1.5spark.default.parallelism 參數(shù)說(shuō)明:該參數(shù)用于設(shè)置每個(gè)stage的默認(rèn)task數(shù)量。這個(gè)參數(shù)極為重要,如果不設(shè)置可能會(huì)直接影響你的Spark作業(yè)性能。 參數(shù)調(diào)優(yōu)建議:Spark作業(yè)的默認(rèn)task數(shù)量為500~1000個(gè)較為合適。很多同學(xué)常犯的一個(gè)錯(cuò)誤就是不去設(shè)置這個(gè)參數(shù),那么此時(shí)就會(huì)導(dǎo)致Spark自己根據(jù)底層HDFS的block數(shù)量來(lái)設(shè)置task的數(shù)量, 默認(rèn)是一個(gè)HDFS block對(duì)應(yīng)一個(gè)task。通常來(lái)說(shuō),Spark默認(rèn)設(shè)置的數(shù)量是偏少的(比如就幾十個(gè)task),如果task數(shù)量偏少的話,就會(huì)導(dǎo)致你前面設(shè)置好的Executor的參數(shù)都前功盡棄。試想一下,無(wú)論你的Executor進(jìn)程有多少個(gè), 內(nèi)存和CPU有多大,但是task只有1個(gè)或者10個(gè),那么90%的Executor進(jìn)程可能根本就沒(méi)有task執(zhí)行,也就是白白浪費(fèi)了資源!因此Spark官網(wǎng)建議的設(shè)置原則是,設(shè)置該參數(shù)為num-executors * executor-cores的2~3倍較為合適, 比如Executor的總CPU core數(shù)量為300個(gè),那么設(shè)置1000個(gè)task是可以的,此時(shí)可以充分地利用Spark集群的資源。 1.6spark.storage.memoryFraction 參數(shù)說(shuō)明:該參數(shù)用于設(shè)置RDD持久化數(shù)據(jù)在Executor內(nèi)存中能占的比例,默認(rèn)是0.6。也就是說(shuō),默認(rèn)Executor 60%的內(nèi)存,可以用來(lái)保存持久化的RDD數(shù)據(jù)。根據(jù)你選擇的不同的持久化策略,如果內(nèi)存不夠時(shí), 可能數(shù)據(jù)就不會(huì)持久化,或者數(shù)據(jù)會(huì)寫入磁盤。 參數(shù)調(diào)優(yōu)建議:如果Spark作業(yè)中,有較多的RDD持久化操作,該參數(shù)的值可以適當(dāng)提高一些,保證持久化的數(shù)據(jù)能夠容納在內(nèi)存中。避免內(nèi)存不夠緩存所有的數(shù)據(jù),導(dǎo)致數(shù)據(jù)只能寫入磁盤中,降低了性能。 但是如果Spark作業(yè)中的shuffle類操作比較多,而持久化操作比較少,那么這個(gè)參數(shù)的值適當(dāng)降低一些比較合適。此外,如果發(fā)現(xiàn)作業(yè)由于頻繁的gc導(dǎo)致運(yùn)行緩慢(通過(guò)spark web ui可以觀察到作業(yè)的gc耗時(shí)), 意味著task執(zhí)行用戶代碼的內(nèi)存不夠用,那么同樣建議調(diào)低這個(gè)參數(shù)的值。 1.7spark.shuffle.memoryFraction 參數(shù)說(shuō)明:該參數(shù)用于設(shè)置shuffle過(guò)程中一個(gè)task拉取到上個(gè)stage的task的輸出后,進(jìn)行聚合操作時(shí)能夠使用的Executor內(nèi)存的比例,默認(rèn)是0.2。也就是說(shuō),Executor默認(rèn)只有20%的內(nèi)存用來(lái)進(jìn)行該操作。 shuffle操作在進(jìn)行聚合時(shí),如果發(fā)現(xiàn)使用的內(nèi)存超出了這個(gè)20%的限制,那么多余的數(shù)據(jù)就會(huì)溢寫到磁盤文件中去,此時(shí)就會(huì)極大地降低性能。 參數(shù)調(diào)優(yōu)建議:如果Spark作業(yè)中的RDD持久化操作較少,shuffle操作較多時(shí),建議降低持久化操作的內(nèi)存占比,提高shuffle操作的內(nèi)存占比比例,避免shuffle過(guò)程中數(shù)據(jù)過(guò)多時(shí)內(nèi)存不夠用,必須溢寫到磁盤上, 降低了性能。此外,如果發(fā)現(xiàn)作業(yè)由于頻繁的gc導(dǎo)致運(yùn)行緩慢,意味著task執(zhí)行用戶代碼的內(nèi)存不夠用,那么同樣建議調(diào)低這個(gè)參數(shù)的值。 資源參數(shù)的調(diào)優(yōu),沒(méi)有一個(gè)固定的值,需要同學(xué)們根據(jù)自己的實(shí)際情況(包括Spark作業(yè)中的shuffle操作數(shù)量、RDD持久化操作數(shù)量以及spark web ui中顯示的作業(yè)gc情況),同時(shí)參考本篇文章中給出的原理以及調(diào)優(yōu)建議, 合理地設(shè)置上述參數(shù)。
spark任務(wù)提交參數(shù)設(shè)置模板(企業(yè)中)
spark-submit
--master yarn-cluster
--num-executors = 50
--executor-memory = 4G
--executor-cores = 2
--driver-memory = 2G
--conf spark.storage.memoryFraction=0.4
--conf spark.shuffle.memoryFraction=0.4
--conf spark.locality.wait=10s
--conf spark.shuffle.file.buffer=64kb
--conf spark.yarn.executor.memoryOverhead=1024
--conf spark.network.timeout=200s
三、spark 數(shù)據(jù)傾斜
1、使用Hive ETL預(yù)處理數(shù)據(jù)
方案適用場(chǎng)景:如果導(dǎo)致數(shù)據(jù)傾斜的是Hive表。如果該Hive表中的數(shù)據(jù)本身很不均勻(比如某個(gè) key對(duì)應(yīng)了100萬(wàn)數(shù)據(jù),其他key才對(duì)應(yīng)了10條數(shù)據(jù)),而且業(yè)務(wù)場(chǎng)景需要頻繁使用Spark對(duì)Hive表 執(zhí)行某個(gè)分析操作, 那么比較適合使用這種技術(shù)方案。 方案實(shí)現(xiàn)思路:此時(shí)可以評(píng)估一下,是否可以通過(guò)Hive來(lái)進(jìn)行數(shù)據(jù)預(yù)處理(即通過(guò)Hive ETL預(yù)先對(duì) 數(shù)據(jù)按照key進(jìn)行聚合,或者是預(yù)先和其他表進(jìn)行join),然后在Spark作業(yè)中針對(duì)的數(shù)據(jù)源就不是 原來(lái)的Hive表了, 而是預(yù)處理后的Hive表。此時(shí)由于數(shù)據(jù)已經(jīng)預(yù)先進(jìn)行過(guò)聚合或join操作了,那么 在Spark作業(yè)中也就不需要使用原先的shuffle類算子執(zhí)行這類操作了。 方案實(shí)現(xiàn)原理:這種方案從根源上解決了數(shù)據(jù)傾斜,因?yàn)閺氐妆苊饬嗽赟park中執(zhí)行shuffle類算子 ,那么肯定就不會(huì)有數(shù)據(jù)傾斜的問(wèn)題了。但是這里也要提醒一下大家,這種方式屬于治標(biāo)不治本。 因?yàn)楫吘箶?shù)據(jù)本身就存在分布不均勻的問(wèn)題,所以Hive ETL中進(jìn)行g(shù)roup by或者join等shuffle操作 時(shí),還是會(huì)出現(xiàn)數(shù)據(jù)傾斜,導(dǎo)致Hive ETL的速度很慢。我們只是把數(shù)據(jù)傾斜的發(fā)生提前到了Hive ETL中, 避免Spark程序發(fā)生數(shù)據(jù)傾斜而已。
2、過(guò)濾少數(shù)導(dǎo)致傾斜的key
方案適用場(chǎng)景:如果發(fā)現(xiàn)導(dǎo)致傾斜的key就少數(shù)幾個(gè),而且對(duì)計(jì)算本身的影響并不大的話,那么很 適合使用這種方案。比如99%的key就對(duì)應(yīng)10條數(shù)據(jù),但是只有一個(gè)key對(duì)應(yīng)了100萬(wàn)數(shù)據(jù),從而導(dǎo) 致了數(shù)據(jù)傾斜。 方案實(shí)現(xiàn)思路:如果我們判斷那少數(shù)幾個(gè)數(shù)據(jù)量特別多的key,對(duì)作業(yè)的執(zhí)行和計(jì)算結(jié)果不是特別 重要的話,那么干脆就直接過(guò)濾掉那少數(shù)幾個(gè)key。比如,在Spark SQL中可以使用where子句過(guò)濾掉這些key或者在Spark Core中 對(duì)RDD執(zhí)行filter算子過(guò)濾掉這些key。如果需要每次作業(yè)執(zhí)行時(shí), 動(dòng)態(tài)判定哪些key的數(shù)據(jù)量最多然后再進(jìn)行過(guò)濾,那么可以使用sample算子對(duì)RDD進(jìn)行采樣,然后 計(jì)算出每個(gè)key的數(shù)量,取數(shù)據(jù)量最多的key過(guò)濾掉即可。 方案實(shí)現(xiàn)原理:將導(dǎo)致數(shù)據(jù)傾斜的key給過(guò)濾掉之后,這些key就不會(huì)參與計(jì)算了,自然不可能產(chǎn)生 數(shù)據(jù)傾斜。
3、提高shuffle操作的并行度
方案實(shí)現(xiàn)思路:在對(duì)RDD執(zhí)行shuffle算子時(shí),給shuffle算子傳入一個(gè)參數(shù),比如 reduceByKey(1000),該參數(shù)就設(shè)置了這個(gè)shuffle算子執(zhí)行時(shí)shuffle read task的數(shù)量。對(duì)于 Spark SQL中的shuffle類語(yǔ)句, 比如group by、join等,需要設(shè)置一個(gè)參數(shù),即 spark.sql.shuffle.partitions,該參數(shù)代表了shuffle read task的并行度,該值默認(rèn)是200,對(duì)于很 多場(chǎng)景來(lái)說(shuō)都有點(diǎn)過(guò)小。 方案實(shí)現(xiàn)原理:增加shuffle read task的數(shù)量,可以讓原本分配給一個(gè)task的多個(gè)key分配給多個(gè) task,從而讓每個(gè)task處理比原來(lái)更少的數(shù)據(jù)。舉例來(lái)說(shuō),如果原本有5個(gè)key,每個(gè)key對(duì)應(yīng)10條 數(shù)據(jù), 這5個(gè)key都是分配給一個(gè)task的,那么這個(gè)task就要處理50條數(shù)據(jù)。而增加了shuffle read task以后,每個(gè)task就分配到一個(gè)key,即每個(gè)task就處理10條數(shù)據(jù),那么自然每個(gè)task的執(zhí)行時(shí) 間都會(huì)變短了。具體原理如下圖所示。
4、雙重聚合join
方案適用場(chǎng)景:對(duì)RDD執(zhí)行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by 語(yǔ)句進(jìn)行分組聚合時(shí),比較適用這種方案。 方案實(shí)現(xiàn)思路:這個(gè)方案的核心實(shí)現(xiàn)思路就是進(jìn)行兩階段聚合。第一次是局部聚合,先給每個(gè)key 都打上一個(gè)隨機(jī)數(shù),比如10以內(nèi)的隨機(jī)數(shù),此時(shí)原先一樣的key就變成不一樣的了, 比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會(huì)變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著 對(duì)打上隨機(jī)數(shù)后的數(shù)據(jù),執(zhí)行reduceByKey等聚合操作, 進(jìn)行局部聚合,那么局部聚合結(jié)果,就會(huì) 變成了(1_hello, 2) (2_hello, 2)。然后將各個(gè)key的前綴給去掉,就會(huì)變成(hello,2)(hello,2),再次 進(jìn)行全局聚合操作,就可以得到最終結(jié)果了,比如(hello, 4)。 方案實(shí)現(xiàn)原理:將原本相同的key通過(guò)附加隨機(jī)前綴的方式,變成多個(gè)不同的key,就可以讓原本被 一個(gè)task處理的數(shù)據(jù)分散到多個(gè)task上去做局部聚合,進(jìn)而解決單個(gè)task處理數(shù)據(jù)量過(guò)多的問(wèn)題。 接著去除掉隨機(jī)前綴, 再次進(jìn)行全局聚合,就可以得到最終的結(jié)果
5、將reduce join轉(zhuǎn)為map join
方案適用場(chǎng)景:在對(duì)RDD使用join類操作,或者是在Spark SQL中使用join語(yǔ)句時(shí),而且join操作中 的一個(gè)RDD或表的數(shù)據(jù)量比較小(比如幾百M(fèi)或者一兩G),比較適用此方案。 方案實(shí)現(xiàn)思路:不使用join算子進(jìn)行連接操作,而使用Broadcast變量與map類算子實(shí)現(xiàn)join操作, 進(jìn)而完全規(guī)避掉shuffle類的操作,徹底避免數(shù)據(jù)傾斜的發(fā)生和出現(xiàn)。 將較小RDD中的數(shù)據(jù)直接通過(guò) collect算子拉取到Driver端的內(nèi)存中來(lái),然后對(duì)其創(chuàng)建一個(gè)Broadcast變量;接著對(duì)另外一個(gè)RDD 執(zhí)行map類算子,在算子函數(shù)內(nèi),從Broadcast變量中獲取較小RDD的全量數(shù)據(jù), 與當(dāng)前RDD的每 一條數(shù)據(jù)按照連接key進(jìn)行比對(duì),如果連接key相同的話,那么就將兩個(gè)RDD的數(shù)據(jù)用你需要的方式 連接起來(lái)。 方案實(shí)現(xiàn)原理:普通的join是會(huì)走shuffle過(guò)程的,而一旦shuffle,就相當(dāng)于會(huì)將相同key的數(shù)據(jù)拉 取到一個(gè)shuffle read task中再進(jìn)行join,此時(shí)就是reduce join。但是如果一個(gè)RDD是比較小的, 則可以采用廣播小RDD全量數(shù)據(jù)+map算子來(lái)實(shí)現(xiàn)與join同樣的效果,也就是map join,此時(shí)就不 會(huì)發(fā)生shuffle操作,也就不會(huì)發(fā)生數(shù)據(jù)傾斜
6、采樣傾斜key并分拆join操作
方案適用場(chǎng)景:兩個(gè)RDD/Hive表進(jìn)行join的時(shí)候,如果數(shù)據(jù)量都比較大,無(wú)法采用“解決方案五 ”,那么此時(shí)可以看一下兩個(gè)RDD/Hive表中的key分布情況。如果出現(xiàn)數(shù)據(jù)傾斜, 是因?yàn)槠渲心骋?個(gè)RDD/Hive表中的少數(shù)幾個(gè)key的數(shù)據(jù)量過(guò)大,而另一個(gè)RDD/Hive表中的所有key都分布比較均 勻,那么采用這個(gè)解決方案是比較合適的。 方案實(shí)現(xiàn)思路: 對(duì)包含少數(shù)幾個(gè)數(shù)據(jù)量過(guò)大的key的那個(gè)RDD,通過(guò)sample算子采樣出一份樣本來(lái),然后統(tǒng)計(jì)一下每個(gè) key的數(shù)量,計(jì)算出來(lái)數(shù)據(jù)量最大的是哪幾個(gè)key。 然后將這幾個(gè)key對(duì)應(yīng)的數(shù)據(jù)從原來(lái)的RDD中拆分出來(lái),形成一個(gè)單獨(dú)的RDD,并給每個(gè)key都打上n以 內(nèi)的隨機(jī)數(shù)作為前綴,而不會(huì)導(dǎo)致傾斜的大部分key形成另外一個(gè)RDD。 接著將需要join的另一個(gè)RDD,也過(guò)濾出來(lái)那幾個(gè)傾斜key對(duì)應(yīng)的數(shù)據(jù)并形成一個(gè)單獨(dú)的RDD,將每條數(shù) 據(jù)膨脹成n條數(shù)據(jù),這n條數(shù)據(jù)都按順序附加一個(gè)0~n的前綴,不會(huì)導(dǎo)致傾斜的大部分key也形成另外一個(gè) RDD。 再將附加了隨機(jī)前綴的獨(dú)立RDD與另一個(gè)膨脹n倍的獨(dú)立RDD進(jìn)行join,此時(shí)就可以將原先相同的key打 散成n份,分散到多個(gè)task中去進(jìn)行join了。 而另外兩個(gè)普通的RDD就照常join即可。 最后將兩次join的結(jié)果使用union算子合并起來(lái)即可,就是最終的join結(jié)果。
7、使用隨機(jī)前綴和擴(kuò)容RDD進(jìn)行join
方案適用場(chǎng)景:如果在進(jìn)行join操作時(shí),RDD中有大量的key導(dǎo)致數(shù)據(jù)傾斜,那么進(jìn)行分拆key也沒(méi) 什么意義,此時(shí)就只能使用最后一種方案來(lái)解決問(wèn)題了。 方案實(shí)現(xiàn)思路: 該方案的實(shí)現(xiàn)思路基本和“解決方案六”類似,首先查看RDD/Hive表中的數(shù)據(jù)分布情況,找到那個(gè)造成 數(shù)據(jù)傾斜的RDD/Hive表,比如有多個(gè)key都對(duì)應(yīng)了超過(guò)1萬(wàn)條數(shù)據(jù)。 然后將該RDD的每條數(shù)據(jù)都打上一個(gè)n以內(nèi)的隨機(jī)前綴。 同時(shí)對(duì)另外一個(gè)正常的RDD進(jìn)行擴(kuò)容,將每條數(shù)據(jù)都擴(kuò)容成n條數(shù)據(jù),擴(kuò)容出來(lái)的每條數(shù)據(jù)都依次打上一 個(gè)0~n的前綴。 最后將兩個(gè)處理后的RDD進(jìn)行join即可。 方案實(shí)現(xiàn)原理:將原先一樣的key通過(guò)附加隨機(jī)前綴變成不一樣的key,然后就可以將這些處理后的 “不同key”分散到多個(gè)task中去處理,而不是讓一個(gè)task處理大量的相同key。該方案與“解決方 案六”的不同之處就在于,上一種方案是盡量只對(duì)少數(shù)傾斜key對(duì)應(yīng)的數(shù)據(jù)進(jìn)行特殊處理,由于處 理過(guò)程需要擴(kuò)容RDD,因此上一種方案擴(kuò)容RDD后對(duì)內(nèi)存的占用并不大;而這一種方案是針對(duì)有大 量?jī)A斜key的情況,沒(méi)法將部分key拆分出來(lái)進(jìn)行單獨(dú)處理,因此只能對(duì)整個(gè)RDD進(jìn)行數(shù)據(jù)擴(kuò)容,對(duì) 內(nèi)存資源要求很高。
一、常規(guī)性能調(diào)優(yōu)
1. 最優(yōu)資源配置
Spark性能調(diào)優(yōu)的第一步,就是為任務(wù)分配更多的資源,在一定范圍內(nèi),增加資源的分配與性能的提升是成正比的,實(shí)現(xiàn)了最優(yōu)的資源配置后,在此基礎(chǔ)上再考慮進(jìn)行后面論述的性能調(diào)優(yōu)策略。資源的分配在使用腳本提交Spark任務(wù)時(shí)進(jìn)行指定,標(biāo)準(zhǔn)的Spark任務(wù)提交腳本如下所示:
bin/spark-submit \
--class com.fancyry.spark.Analysis \
--master yarn--deploy-mode cluster
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
/usr/opt/modules/spark/jar/spark.jar \
可以進(jìn)行分配的資源如表所示:
名稱?? ?說(shuō)明
–num-executors?? ?配置Executor的數(shù)量
–driver-memory?? ?配置Driver內(nèi)存(影響不大)
–executor-memory?? ?配置每個(gè)Executor的內(nèi)存大小
–executor-cores?? ?配置每個(gè)Executor的CPU core數(shù)量
調(diào)節(jié)原則:盡量將任務(wù)分配的資源調(diào)節(jié)到可以使用的資源的最大限度。對(duì)于具體資源的分配,我們分別討論Spark的兩種Cluster運(yùn)行模式:
?第一種是SparkStandalone模式,你在提交任務(wù)前,一定知道或者可以從運(yùn)維部門獲取到你可以使用的資源情況,在編寫submit腳本的時(shí)候,就根據(jù)可用的資源情況進(jìn)行資源的分配,比如說(shuō)集群有15臺(tái)機(jī)器,每臺(tái)機(jī)器為8G內(nèi)存,2個(gè)CPU core,那么就指定15個(gè)Executor,每個(gè)Executor分配8G內(nèi)存,2個(gè)CPUcore。
?第二種是SparkYarn模式,由于Yarn使用資源隊(duì)列進(jìn)行資源的分配和調(diào)度,在編寫submit腳本的時(shí)候,就根據(jù)Spark作業(yè)要提交到的資源隊(duì)列,進(jìn)行資源的分配,比如資源隊(duì)列有400G內(nèi)存,100個(gè)CPU core,那么指定50個(gè)Executor,每個(gè)Executor分配8G內(nèi)存,2個(gè)CPU core。
對(duì)各項(xiàng)資源進(jìn)行了調(diào)節(jié)后,得到的性能提升會(huì)有如下表現(xiàn):
名稱?? ?解析
增加Executor·個(gè)數(shù)?? ?在資源允許的情況下,增加Executor的個(gè)數(shù)可以提高執(zhí)行task的并行度。比如有4個(gè)Executor,每個(gè)Executor有2個(gè)CPU core,那么可以并行執(zhí)行8個(gè)task,如果將Executor的個(gè)數(shù)增加到8個(gè)(資源允許的情況下),那么可以并行執(zhí)行16個(gè)task,此時(shí)的并行能力提升了一倍。
增加每個(gè)Executor的CPU core個(gè)數(shù)?? ?在資源允許的情況下,增加每個(gè)Executor的Cpu core個(gè)數(shù),可以提高執(zhí)行task的并行度。比如有4個(gè)Executor,每個(gè)Executor有2個(gè)CPU core,那么可以并行執(zhí)行8個(gè)task,如果將每個(gè)Executor的CPU core個(gè)數(shù)增加到4個(gè) (資源允許的情況下),那么可以并行執(zhí)行16個(gè)task,此時(shí)的并行能力提升了一倍。
增加每個(gè)Executor的內(nèi)存量?? ?在資源允許的情況下,增加每個(gè)Executor的內(nèi)存量以后,對(duì)性能的提升有三點(diǎn):1.可以緩存更多的數(shù)據(jù)(即對(duì)RDD進(jìn)行cache),寫入磁盤的數(shù)據(jù)相應(yīng)減少,甚至可以不寫入磁盤,減少了可能的磁盤IO;2.可以為shuffle操作提供更多內(nèi)存,即有更多空間來(lái)存放reduce端拉取的數(shù)據(jù),寫入磁盤的數(shù)據(jù)相應(yīng)減少,甚至可以不寫入磁盤,減少了可能的磁盤IO;3.可以為task的執(zhí)行提供更多內(nèi)存,在task的執(zhí)行過(guò)程中可能創(chuàng)建很多對(duì)象,內(nèi)存較小時(shí)會(huì)引發(fā)頻繁的GC,增加內(nèi)存后,可以避免頻繁的GC,提升整體性能。
補(bǔ)充:生產(chǎn)環(huán)境 Spark submit 腳本配置
bin/spark-submit \
--class com.fancy.spark.WordCount \
--master yarn \
--deploy-modecluster \
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
--queue root.default \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.core.connection.ack.wait.timeout=300 \/usr/local/spark/spark.jar
參數(shù)配置參考值:
2. RDD優(yōu)化
A、RDD復(fù)用
在對(duì) RDD 進(jìn)行算子時(shí),要避免相同的算子和計(jì)算邏輯之下對(duì) RDD 進(jìn)行重復(fù)的計(jì)算
對(duì)上圖中的RDD計(jì)算架構(gòu)進(jìn)行修改,得到如下圖所示的優(yōu)化結(jié)果:
B、RDD持久化
在Spark中,當(dāng)多次對(duì)同一個(gè)RDD執(zhí)行算子操作時(shí),每一次都會(huì)對(duì)這個(gè)RDD以之前的父RDD重新計(jì)算一次,這種情況是必須要避免的,對(duì)同一個(gè)RDD的重復(fù)計(jì)算是對(duì)資源的極大浪費(fèi),因此,必須對(duì)多次使用的RDD進(jìn)行持久化,通過(guò)持久化將公共RDD的數(shù)據(jù)緩存到內(nèi)存/磁盤中,之后對(duì)于公共RDD的計(jì)算都會(huì)從內(nèi)存/磁盤中直接獲取RDD數(shù)據(jù)。對(duì)于RDD的持久化,有兩點(diǎn)需要說(shuō)明:
?RDD的持久化是可以進(jìn)行序列化的,當(dāng)內(nèi)存無(wú)法將RDD的數(shù)據(jù)完整的進(jìn)行存放的時(shí)候,可以考慮使用序列化的方式減小數(shù)據(jù)體積,將數(shù)據(jù)完整存儲(chǔ)在內(nèi)存中。
?如果對(duì)于數(shù)據(jù)的可靠性要求很高,并且內(nèi)存充足,可以使用副本機(jī)制,對(duì)RDD數(shù)據(jù)進(jìn)行持久化。當(dāng)持久化啟用了復(fù)本機(jī)制時(shí),對(duì)于持久化的每個(gè)數(shù)據(jù)單元都存儲(chǔ)一個(gè)副本,放在其他節(jié)點(diǎn)上面,由此實(shí)現(xiàn)數(shù)據(jù)的容錯(cuò),一旦一個(gè)副本數(shù)據(jù)丟失,不需要重新計(jì)算,還可以使用另外一個(gè)副本。
C、RDD盡可能早的filter操作
獲取到初始RDD后,應(yīng)該考慮盡早地過(guò)濾掉不需要的數(shù)據(jù),進(jìn)而減少對(duì)內(nèi)存的占用,從而提升Spark作業(yè)的運(yùn)行效率。
3. 并行度調(diào)節(jié)
Spark作業(yè)中的并行度指各個(gè)stage的task的數(shù)量。如果并行度設(shè)置不合理而導(dǎo)致并行度過(guò)低,會(huì)導(dǎo)致資源的極大浪費(fèi),例如,20個(gè)Executor,每個(gè)Executor分配3個(gè)CPU core,而Spark作業(yè)有40個(gè)task,這樣每個(gè)Executor分配到的task個(gè)數(shù)是2個(gè),這就使得每個(gè)Executor有一個(gè)CPU core空閑,導(dǎo)致資源的浪費(fèi)。
理想的并行度設(shè)置,應(yīng)該是讓并行度與資源相匹配,簡(jiǎn)單來(lái)說(shuō)就是在資源允許的前提下,并行度要設(shè)置的盡可能大,達(dá)到可以充分利用集群資源。合理的設(shè)置并行度,可以提升整個(gè)Spark作業(yè)的性能和運(yùn)行速度。
Spark官方推薦,task數(shù)量應(yīng)該設(shè)置為Spark作業(yè)總CPU core數(shù)量的2~3倍。之所以沒(méi)有推薦task數(shù)量與CPU core總數(shù)相等,是因?yàn)閠ask的執(zhí)行時(shí)間不同,有的task執(zhí)行速度快而有的task執(zhí)行速度慢,如果task數(shù)量與CPU core總數(shù)相等,那么執(zhí)行快的task執(zhí)行完成后,會(huì)出現(xiàn)CPU core空閑的情況。如果task數(shù)量設(shè)置為CPU core總數(shù)的2~3倍,那么一個(gè)task執(zhí)行完畢后,CPU core會(huì)立刻執(zhí)行下一個(gè)task,降低了資源的浪費(fèi),同時(shí)提升了Spark作業(yè)運(yùn)行的效率。
Spark作業(yè)并行度的設(shè)置如下所示:
valconf = new SparkConf().set("spark.default.parallelism", "500")
1
4. 廣播大變量
默認(rèn)情況下,task中的算子中如果使用了外部的變量,每個(gè)task都會(huì)獲取一份變量的復(fù)本,這就造成了內(nèi)存的極大消耗。
一方面,如果后續(xù)對(duì)RDD進(jìn)行持久化,可能就無(wú)法將RDD數(shù)據(jù)存入內(nèi)存,只能寫入磁盤,磁盤IO將會(huì)嚴(yán)重消耗性能;另一方面,task在創(chuàng)建對(duì)象的時(shí)候,也許會(huì)發(fā)現(xiàn)堆內(nèi)存無(wú)法存放新創(chuàng)建的對(duì)象,這就會(huì)導(dǎo)致頻繁的GC,GC會(huì)導(dǎo)致工作線程停止,進(jìn)而導(dǎo)致Spark暫停工作一段時(shí)間,嚴(yán)重影響Spark性能。
假設(shè)當(dāng)前任務(wù)配置了20個(gè)Executor,指定500個(gè)task,有一個(gè)20M的變量被所有task共用,此時(shí)會(huì)在500個(gè)task中產(chǎn)生500個(gè)副本,耗費(fèi)集群10G的內(nèi)存,如果使用了廣播變量,那么每個(gè)Executor保存一個(gè)副本,一共消耗400M內(nèi)存,內(nèi)存消耗減少了5倍。廣播變量在每個(gè)Executor保存一個(gè)副本,此Executor的所有task共用此廣播變量,這讓變量產(chǎn)生的副本數(shù)量大大減少。
在初始階段,廣播變量只在Driver中有一份副本。task在運(yùn)行的時(shí)候,想要使用廣播變量中的數(shù)據(jù),此時(shí)首先會(huì)在自己本地的Executor對(duì)應(yīng)的BlockManager中嘗試獲取變量,如果本地沒(méi)有,BlockManager就會(huì)從Driver或者其他節(jié)點(diǎn)的BlockManager上遠(yuǎn)程拉取變量的復(fù)本,并由本地的BlockManager進(jìn)行管理;之后此Executor的所有task都會(huì)直接從本地的BlockManager中獲取變量。
5. Kryo序列化
默認(rèn)情況下,Spark使用Java的序列化機(jī)制。Java的序列化機(jī)制使用方便,不需要額外的配置,在算子中使用的變量實(shí)現(xiàn)Serializable接口即可,但是,Java序列化機(jī)制的效率不高,序列化速度慢并且序列化后的數(shù)據(jù)所占用的空間依然較大。
Kryo 序列化機(jī)制比 Java 序列化機(jī)制性能提高 10 倍左右,Spark 之所以沒(méi)有默認(rèn)使用 Kryo 作為序列化類庫(kù),是因?yàn)樗恢С炙袑?duì)象的序列化,同時(shí) Kryo 需要用戶在使用前注冊(cè)需要序列化的類型,不夠方便,但從 Spark 2.0.0 版本開(kāi)始,簡(jiǎn)單類型、簡(jiǎn)單類型數(shù)組、字符串類型的Shuffling RDDs 已經(jīng)默認(rèn)使用Kryo序列化方式了。
public class MyKryoRegistrator implements KryoRegistrator {?
? ? ?@Overridepublic void registerClasses(Kryo kryo) {
? ? ??? ?kryo.register(StartupReportLogs.class);
? ? ?}
}
配置Kryo序列化方式的實(shí)例代碼:
//創(chuàng)建SparkConf對(duì)象
val conf = new SparkConf().setMaster(...).setAppName(...)
//使用Kryo序列化庫(kù),如果要使用Java序列化庫(kù),需要把該行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); ?
//在Kryo序列化庫(kù)中注冊(cè)自定義的類集合,如果要使用Java序列化庫(kù),需要把該行屏蔽掉
conf.set("spark.kryo.registrator", "fancy.com.MyKryoRegistrator");
6. 調(diào)節(jié)本地化等待時(shí)長(zhǎng)
Spark 作業(yè)運(yùn)行過(guò)程中,Driver 會(huì)對(duì)每一個(gè) stage 的 task 進(jìn)行分配。根據(jù) Spark 的 task 分配算法,Spark 希望 task 能夠運(yùn)行在它要計(jì)算的數(shù)據(jù)算在的節(jié)點(diǎn) (數(shù)據(jù)本地化思想),這樣就可以避免數(shù)據(jù)的網(wǎng)絡(luò)傳輸。通常來(lái)說(shuō),task可能不會(huì)被分配到它處理的數(shù)據(jù)所在的節(jié)點(diǎn),因?yàn)檫@些節(jié)點(diǎn)可用的資源可能已經(jīng)用盡,此時(shí),Spark會(huì)等待一段時(shí)間,默認(rèn)3s,如果等待指定時(shí)間后仍然無(wú)法在指定節(jié)點(diǎn)運(yùn)行,那么會(huì)自動(dòng)降級(jí),嘗試將task分配到比較差的本地化級(jí)別所對(duì)應(yīng)的節(jié)點(diǎn)上,比如將task分配到離它要計(jì)算的數(shù)據(jù)比較近的一個(gè)節(jié)點(diǎn),然后進(jìn)行計(jì)算,如果當(dāng)前級(jí)別仍然不行,那么繼續(xù)降級(jí)。
當(dāng) task 要處理的數(shù)據(jù)不在 task 所在節(jié)點(diǎn)上時(shí),會(huì)發(fā)生數(shù)據(jù)的傳輸。task 會(huì)通過(guò)所在節(jié)點(diǎn)的 BlockManager 獲取數(shù)據(jù),BlockManager 發(fā)現(xiàn)數(shù)據(jù)不在本地時(shí),戶通過(guò)網(wǎng)絡(luò)傳輸組件從數(shù)據(jù)所在節(jié)點(diǎn)的BlockManager 處獲取數(shù)據(jù)。網(wǎng)絡(luò)傳輸數(shù)據(jù)的情況是我們不愿意看到的,大量的網(wǎng)絡(luò)傳輸會(huì)嚴(yán)重影響性能,因此,我們希望通過(guò)調(diào)節(jié)本地化等待時(shí)長(zhǎng),如果在等待時(shí)長(zhǎng)這段時(shí)間內(nèi),目標(biāo)節(jié)點(diǎn)處理完成了一部分 task,那么當(dāng)前的 task 將有機(jī)會(huì)得到執(zhí)行,這樣就能夠改善 Spark 作業(yè)的整體性能。Spark 的本地化等級(jí)如表所示
名稱?? ?解析
PROCESS_LOCAL?? ?進(jìn)程本地化,task和數(shù)據(jù)在同一個(gè)Executor中,性能最好。
NODE_LOCAL?? ?節(jié)點(diǎn)本地化,task和數(shù)據(jù)在同一個(gè)節(jié)點(diǎn)中,但是task和數(shù)據(jù)不在同一個(gè)Executor中,數(shù)據(jù)需要在進(jìn)程間進(jìn)行傳輸。
RACK_LOCAL?? ?機(jī)架本地化,task和數(shù)據(jù)在同一個(gè)機(jī)架的兩個(gè)節(jié)點(diǎn)上,數(shù)據(jù)需要通過(guò)網(wǎng)絡(luò)在節(jié)點(diǎn)之間進(jìn)行傳輸。
NO_PREF?? ?對(duì)于task來(lái)說(shuō),從哪里獲取都一樣,沒(méi)有好壞之分。
ANY?? ?task和數(shù)據(jù)可以在集群的任何地方,而且不在一個(gè)機(jī)架中,性能最差。
在Spark項(xiàng)目開(kāi)發(fā)階段,可以使用client模式對(duì)程序進(jìn)行測(cè)試,此時(shí),可以在本地看到比較全的日志信息,日志信息中有明確的task數(shù)據(jù)本地化的級(jí)別,如果大部分都是 PROCESS_LOCAL,那么就無(wú)需進(jìn)行調(diào)節(jié),但是如果發(fā)現(xiàn)很多的級(jí)別都是 NODE_LOCAL、ANY,那么需要對(duì)本地化的等待時(shí)長(zhǎng)進(jìn)行調(diào)節(jié),通過(guò)延長(zhǎng)本地化等待時(shí)長(zhǎng),看看 task 的本地化級(jí)別有沒(méi)有提升,并觀察 Spark 作業(yè)的運(yùn)行時(shí)間有沒(méi)有縮短。注意,過(guò)猶不及,不要將本地化等待時(shí)長(zhǎng)延長(zhǎng)地過(guò)長(zhǎng),導(dǎo)致因?yàn)榇罅康牡却龝r(shí)長(zhǎng),使得 Spark作業(yè)的運(yùn)行時(shí)間反而增加了。Spark本地化等待時(shí)長(zhǎng)的設(shè)置如代碼所示:
val conf = new SparkConf().set("spark.locality.wait", "6")
1
二、算子調(diào)優(yōu)
1. mapPartitions
普通的map算子對(duì)RDD中的每一個(gè)元素進(jìn)行操作,而mapPartitions算子對(duì)RDD中每一個(gè)分區(qū)進(jìn)行操作。如果是普通的map算子,假設(shè)一個(gè)partition有1萬(wàn)條數(shù)據(jù),那么map算子中的function要執(zhí)行1萬(wàn)次,也就是對(duì)每個(gè)元素進(jìn)行操作。
如果是 mapPartition 算子,由于一個(gè) task 處理一個(gè) RDD 的 partition,那么一個(gè)task只會(huì)執(zhí)行一次function,function 一次接收所有的 partition 數(shù)據(jù),效率比較高。
比如,當(dāng)要把RDD中的所有數(shù)據(jù)通過(guò)JDBC寫入數(shù)據(jù),如果使用map算子,那么需要對(duì)RDD中的每一個(gè)元素都創(chuàng)建一個(gè)數(shù)據(jù)庫(kù)連接,這樣對(duì)資源的消耗很大,如果使用mapPartitions算子,那么針對(duì)一個(gè)分區(qū)的數(shù)據(jù),只需要建立一個(gè)數(shù)據(jù)庫(kù)連接。
mapPartitions算子也存在一些缺點(diǎn):對(duì)于普通的map操作,一次處理一條數(shù)據(jù),如果在處理了2000條數(shù)據(jù)后內(nèi)存不足,那么可以將已經(jīng)處理完的2000條數(shù)據(jù)從內(nèi)存中垃圾回收掉;但是如果使用mapPartitions算子,但數(shù)據(jù)量非常大時(shí),function一次處理一個(gè)分區(qū)的數(shù)據(jù),如果一旦內(nèi)存不足,此時(shí)無(wú)法回收內(nèi)存,就可能會(huì)OOM,即內(nèi)存溢出。
因此,mapPartitions算子適用于數(shù)據(jù)量不是特別大的時(shí)候,此時(shí)使用mapPartitions算子對(duì)性能的提升效果還是不錯(cuò)的。(當(dāng)數(shù)據(jù)量很大的時(shí)候,一旦使用mapPartitions算子,就會(huì)直接OOM)在項(xiàng)目中,應(yīng)該首先估算一下RDD的數(shù)據(jù)量、每個(gè)partition的數(shù)據(jù)量,以及分配給每個(gè)Executor的內(nèi)存資源,如果資源允許,可以考慮使用mapPartitions算子代替map。
2. foreachPartition 優(yōu)化數(shù)據(jù)庫(kù)操作
在生產(chǎn)環(huán)境中,通常使用 foreachPartition 算子來(lái)完成數(shù)據(jù)庫(kù)的寫入,通過(guò) foreachPartition 算子的特性,可以優(yōu)化寫數(shù)據(jù)庫(kù)的性能。
如果使用 foreach 算子完成數(shù)據(jù)庫(kù)的操作,由于 foreach 算子是遍歷RDD的每條數(shù)據(jù),因此,每條數(shù)據(jù)都會(huì)建立一個(gè)數(shù)據(jù)庫(kù)連接,這是對(duì)資源的極大浪費(fèi),因此,對(duì)于寫數(shù)據(jù)庫(kù)操作,我們應(yīng)當(dāng)使用 foreachPartition 算子。
與 mapPartitions 算子非常相似,foreachPartition 是將RDD 的每個(gè)分區(qū)作為遍歷對(duì)象,一次處理一個(gè)分區(qū)的數(shù)據(jù),也就是說(shuō),如果涉及數(shù)據(jù)庫(kù)的相關(guān)操作,一個(gè)分區(qū)的數(shù)據(jù)只需要?jiǎng)?chuàng)建一次數(shù)據(jù)庫(kù)連接,如圖所示:
使用了foreachPartition算子后,可以獲得以下的性能提升:
?對(duì)于我們寫的function函數(shù),一次處理一整個(gè)分區(qū)的數(shù)據(jù);
?對(duì)于一個(gè)分區(qū)內(nèi)的數(shù)據(jù),創(chuàng)建唯一的數(shù)據(jù)庫(kù)連接;
?只需要向數(shù)據(jù)庫(kù)發(fā)送一次SQL語(yǔ)句和多組參數(shù);
在生產(chǎn)環(huán)境中,全部都會(huì)使用foreachPartition算子完成數(shù)據(jù)庫(kù)操作。foreachPartition算子存在一個(gè)問(wèn)題,與mapPartitions算子類似,如果一個(gè)分區(qū)的數(shù)據(jù)量特別大,可能會(huì)造成OOM,即內(nèi)存溢出。
3. filter 與 coalesce 的配合使用
在Spark任務(wù)中我們經(jīng)常會(huì)使用filter算子完成RDD中數(shù)據(jù)的過(guò)濾,在任務(wù)初始階段,從各個(gè)分區(qū)中加載到的數(shù)據(jù)量是相近的,但是一旦進(jìn)過(guò)filter過(guò)濾后,每個(gè)分區(qū)的數(shù)據(jù)量有可能會(huì)存在較大差異,如圖所示:
根據(jù)圖中信息我們可以發(fā)現(xiàn)兩個(gè)問(wèn)題:
? 每個(gè)partition的數(shù)據(jù)量變小了,如果還按照之前與partition相等的task個(gè)數(shù)去處理當(dāng)前數(shù)據(jù),有點(diǎn)浪費(fèi)task的計(jì)算資源;
? 每個(gè)partition的數(shù)據(jù)量不一樣,會(huì)導(dǎo)致后面的每個(gè)task處理每個(gè)partition數(shù)據(jù)的時(shí)候,每個(gè)task要處理的數(shù)據(jù)量不同,這很有可能導(dǎo)致數(shù)據(jù)傾斜問(wèn)題。
如上圖所示,第二個(gè)分區(qū)的數(shù)據(jù)過(guò)濾后只剩100條,而第三個(gè)分區(qū)的數(shù)據(jù)過(guò)濾后剩下800條,在相同的處理邏輯下,第二個(gè)分區(qū)對(duì)應(yīng)的task處理的數(shù)據(jù)量與第三個(gè)分區(qū)對(duì)應(yīng)的task處理的數(shù)據(jù)量差距達(dá)到了8倍,這也會(huì)導(dǎo)致運(yùn)行速度可能存在數(shù)倍的差距,這也就是數(shù)據(jù)傾斜問(wèn)題。
針對(duì)上述的兩個(gè)問(wèn)題,我們分別進(jìn)行分析:
?針對(duì)第一個(gè)問(wèn)題,既然分區(qū)的數(shù)據(jù)量變小了,我們希望可以對(duì)分區(qū)數(shù)據(jù)進(jìn)行重新分配,比如將原來(lái)4個(gè)分區(qū)的數(shù)據(jù)轉(zhuǎn)化到2個(gè)分區(qū)中,這樣只需要用后面的兩個(gè)task進(jìn)行處理即可,避免了資源的浪費(fèi)。
?針對(duì)第二個(gè)問(wèn)題,解決方法和第一個(gè)問(wèn)題的解決方法非常相似,對(duì)分區(qū)數(shù)據(jù)重新分配,讓每個(gè)partition中的數(shù)據(jù)量差不多,這就避免了數(shù)據(jù)傾斜問(wèn)題。
那么具體應(yīng)該如何實(shí)現(xiàn)上面的解決思路?我們需要 coalesce 算子。repartition 與 coalesce 都可以用來(lái)進(jìn)行重分區(qū),其中 repartition 只是 coalesce 接口中 shuffle為 true 的簡(jiǎn)易實(shí)現(xiàn),coalesce 默認(rèn)情況下不進(jìn)行 shuffle,但是可以通過(guò)參數(shù)進(jìn)行設(shè)置。假設(shè)我們希望將原本的分區(qū)個(gè)數(shù) A 通過(guò)重新分區(qū)變?yōu)?B,那么有以下幾種情況:
?A > B(多數(shù)分區(qū)合并為少數(shù)分區(qū))
A、A與B相差值不大此時(shí)使用coalesce即可,無(wú)需shuffle過(guò)程。
B、A與B相差值很大
此時(shí)可以使用coalesce并且不啟用shuffle過(guò)程,但是會(huì)導(dǎo)致合并過(guò)程性能低下,所以推薦設(shè)置coalesce的第二個(gè)參數(shù)為true,即啟動(dòng)shuffle過(guò)程。
?A < B(少數(shù)分區(qū)分解為多數(shù)分區(qū))
此時(shí)使用 repartition 即可,如果使用 coalesce 需要將 shuffle 設(shè)置為 true,否則 coalesce 無(wú)效。我們可以在filter 操作之后,使用coalesce算子針對(duì)每個(gè)partition的數(shù)據(jù)量各不相同的情況,壓縮partition的數(shù)量,而且讓每個(gè)partition的數(shù)據(jù)量盡量均勻緊湊,以便于后面的task進(jìn)行計(jì)算操作,在某種程度上能夠在一定程度上提升性能。
注意:local模式是進(jìn)程內(nèi)模擬集群運(yùn)行,已經(jīng)對(duì)并行度和分區(qū)數(shù)量有了一定的內(nèi)部?jī)?yōu)化,因此不用去設(shè)置并行度和分區(qū)數(shù)量。
4. repartition解決SparkSQL低并行度問(wèn)題
在第一節(jié)的常規(guī)性能調(diào)優(yōu)中我們講解了并行度的調(diào)節(jié)策略,但是,并行度的設(shè)置對(duì)于 Spark SQL 是不生效的,用戶設(shè)置的并行度只對(duì)于 Spark SQL 以外的所有 Spark 的 stage 生效。
Spark SQL 的并行度不允許用戶自己指定,Spark SQL自己會(huì)默認(rèn)根據(jù) hive 表對(duì)應(yīng)的 HDFS 文件的 split 個(gè)數(shù)自動(dòng)設(shè)置 Spark SQL 所在的那個(gè) stage 的并行度,用戶自己通 spark.default.parallelism 參數(shù)指定的并行度,只會(huì)在沒(méi) Spark SQL的stage中生效。
由于Spark SQL所在stage的并行度無(wú)法手動(dòng)設(shè)置,如果數(shù)據(jù)量較大,并且此stage中后續(xù)的transformation操作有著復(fù)雜的業(yè)務(wù)邏輯,而Spark SQL自動(dòng)設(shè)置的task數(shù)量很少,這就意味著每個(gè)task要處理為數(shù)不少的數(shù)據(jù)量,然后還要執(zhí)行非常復(fù)雜的處理邏輯,這就可能表現(xiàn)為第一個(gè)有Spark SQL的 stage 速度很慢,而后續(xù)的沒(méi)有 Spark SQL 的 stage 運(yùn)行速度非常快。
為了解決Spark SQL無(wú)法設(shè)置并行度和task數(shù)量的問(wèn)題,我們可以使用repartition算子。
Spark SQL這一步的并行度和task數(shù)量肯定是沒(méi)有辦法去改變了,但是,對(duì)于Spark SQL查詢出來(lái)的RDD,立即使用repartition算子,去重新進(jìn)行分區(qū),這樣可以重新分區(qū)為多個(gè)partition,從repartition之后的RDD操作,由于不再設(shè)計(jì)Spark SQL,因此stage的并行度就會(huì)等于你手動(dòng)設(shè)置的值,這樣就避免了Spark SQL所在的stage只能用少量的task去處理大量數(shù)據(jù)并執(zhí)行復(fù)雜的算法邏輯。
5. reduceByKey 預(yù)聚合
reduceByKey相較于普通的shuffle操作一個(gè)顯著的特點(diǎn)就是會(huì)進(jìn)行map端的本地聚合,map端會(huì)先對(duì)本地的數(shù)據(jù)進(jìn)行combine操作,然后將數(shù)據(jù)寫入給下個(gè)stage的每個(gè)task創(chuàng)建的文件中,也就是在map端,對(duì)每一個(gè)key對(duì)應(yīng)的value,執(zhí)行reduceByKey算子函數(shù)。reduceByKey算子的執(zhí)行過(guò)程如圖所示:
使用reduceByKey對(duì)性能的提升如下:
?本地聚合后,在map端的數(shù)據(jù)量變少,減少了磁盤IO,也減少了對(duì)磁盤空間的占用;
?本地聚合后,下一個(gè)stage拉取的數(shù)據(jù)量變少,減少了網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量;
?本地聚合后,在reduce端進(jìn)行數(shù)據(jù)緩存的內(nèi)存占用減少;
?本地聚合后,在reduce端進(jìn)行聚合的數(shù)據(jù)量減少。
基于reduceByKey的本地聚合特征,我們應(yīng)該考慮使用reduceByKey代替其他的shuffle算子,例如groupByKey。reduceByKey與groupByKey的運(yùn)行原理如圖所示:
根據(jù)上圖可知,groupByKey 不會(huì)進(jìn)行 map 端的聚合,而是將所有 map 端的數(shù)據(jù) shuffle 到 reduce 端,然后在 reduce 端進(jìn)行數(shù)據(jù)的聚合操作。由于 reduceByKey 有 map 端聚合的特性,使得網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量減小,因此效率要明顯高于 groupByKey。
三、Shuffle調(diào)優(yōu)
1. 調(diào)節(jié)map端緩沖區(qū)大小
在 Spark 任務(wù)運(yùn)行過(guò)程中,如果 shuffle 的 map 端處理的數(shù)據(jù)量比較大,但是 map 端緩沖的大小是固定的,可能會(huì)出現(xiàn) map 端緩沖數(shù)據(jù)頻繁 spill 溢寫到磁盤文件中的情況,使得性能非常低下,通過(guò)調(diào)節(jié)map 端緩沖的大小,可以避免頻繁的磁盤IO操作,進(jìn)而提升Spark任務(wù)的整體性能。
map 端緩沖的默認(rèn)配置是 32KB,如果每個(gè) task 處理 640KB 的數(shù)據(jù),那么會(huì)發(fā)生 640/32 = 20 次溢寫,如果每個(gè) task 處理 64000KB的數(shù)據(jù),機(jī)會(huì)發(fā)生64000/32=2000此溢寫,這對(duì)于性能的影響是非常嚴(yán)重的。
map端緩沖的配置方法如代碼清單所示:
val conf = new SparkConf().set("spark.shuffle.file.buffer", "64")
1
2. 調(diào)節(jié)reduce端拉取數(shù)據(jù)緩沖區(qū)大小
Spark Shuffle過(guò)程中,shuffle reduce task的 buffer緩沖區(qū)大小決定了reduce task每次能夠緩沖的數(shù)據(jù)量,也就是每次能夠拉取的數(shù)據(jù)量,如果內(nèi)存資源較為充足,適當(dāng)增加拉取數(shù)據(jù)緩沖區(qū)的大小,可以減少拉取數(shù)據(jù)的次數(shù),也就可以減少網(wǎng)絡(luò)傳輸?shù)拇螖?shù),進(jìn)而提升性能。
reduce 端數(shù)據(jù)拉取緩沖區(qū)的大小可以通過(guò) spark.reducer.maxSizeInFlight 參數(shù)進(jìn)行設(shè)置,默認(rèn)為48MB,該參數(shù)的設(shè)置方法如代碼清單所示:
val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "96")
1
3. 調(diào)節(jié)reduce端拉取數(shù)據(jù)重試次數(shù)
Spark Shuffle 過(guò)程中,reduce task 拉取屬于自己的數(shù)據(jù)時(shí),如果因?yàn)榫W(wǎng)絡(luò)異常等原因?qū)е率?huì)自動(dòng)進(jìn)行重試。對(duì)于那些包含了特別耗時(shí)的 shuffle 操作的作業(yè),建議增加重試最大次數(shù) (比如60次),以避免由于 JVM 的full gc或者網(wǎng)絡(luò)不穩(wěn)定等因素導(dǎo)致的數(shù)據(jù)拉取失敗。在實(shí)踐中發(fā)現(xiàn),對(duì)于針對(duì)超大數(shù)據(jù)量(數(shù)十億~上百億)的shuffle過(guò)程,調(diào)節(jié)該參數(shù)可以大幅度提升穩(wěn)定性。
reduce端拉取數(shù)據(jù)重試次數(shù)可以通過(guò)spark.shuffle.io.maxRetries參數(shù)進(jìn)行設(shè)置,該參數(shù)就代表了可以重試的最大次數(shù)。如果在指定次數(shù)之內(nèi)拉取還是沒(méi)有成功,就可能會(huì)導(dǎo)致作業(yè)執(zhí)行失敗,默認(rèn)為3,該參數(shù)的設(shè)置方法如代碼清單所示:
val conf = new SparkConf().set("spark.shuffle.io.maxRetries", "6")
1
4. 調(diào)節(jié)reduce端拉取數(shù)據(jù)等待間隔
Spark Shuffle過(guò)程中,reduce task拉取屬于自己的數(shù)據(jù)時(shí),如果因?yàn)榫W(wǎng)絡(luò)異常等原因?qū)е率?huì)自動(dòng)進(jìn)行重試,在一次失敗后,會(huì)等待一定的時(shí)間間隔再進(jìn)行重試,可以通過(guò)加大間隔時(shí)長(zhǎng) (比如60s),以增加shuffle 操作的穩(wěn)定性。reduce端拉取數(shù)據(jù)等待間隔可以通過(guò) spark.shuffle.io.retryWait 參數(shù)進(jìn)行設(shè)置,默認(rèn)值為 5s,該參數(shù)的設(shè)置方法如代碼清單所示:
val conf = new SparkConf().set("spark.shuffle.io.retryWait", "60s")
1
5. 調(diào)節(jié)SortShuffle排序操作閾值
對(duì)于SortShuffleManager,如果shuffle reduce task的數(shù)量小于某一閾值則shuffle write過(guò)程中不會(huì)進(jìn)行排序操作,而是直接按照未經(jīng)優(yōu)化的HashShuffleManager的方式去寫數(shù)據(jù),但是最后會(huì)將每個(gè)task產(chǎn)生的所有臨時(shí)磁盤文件都合并成一個(gè)文件,并會(huì)創(chuàng)建單獨(dú)的索引文件。
當(dāng)你使用SortShuffleManager時(shí),如果的確不需要排序操作,那么建議將這個(gè)參數(shù)調(diào)大一些,大于shuffle read task的數(shù)量,那么此時(shí)map-side就不會(huì)進(jìn)行排序了,減少了排序的性能開(kāi)銷,但是這種方式下,依然會(huì)產(chǎn)生大量的磁盤文件,因此shuffle write性能有待提高。SortShuffleManager排序操作閾值的設(shè)置可以通過(guò)spark.shuffle.sort. bypassMergeThreshold這一參數(shù)進(jìn)行設(shè)置,默認(rèn)值為200,該參數(shù)的設(shè)置方法如代碼清單所示:
val conf = new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "400")
1
四、JVM調(diào)優(yōu)
對(duì)于JVM調(diào)優(yōu),首先應(yīng)該明確,full gc/minor gc,都會(huì)導(dǎo)致JVM的工作線程停止工作,即stop the world。
1. 降低cache操作的內(nèi)存占比
A、靜態(tài)內(nèi)存管理機(jī)制根據(jù)Spark靜態(tài)內(nèi)存管理機(jī)制,堆內(nèi)存被劃分為了兩塊,Storage和Execution。
Storage 主要用于緩存 RDD 數(shù)據(jù)和 broadcast 數(shù)據(jù),Execution 主要用于緩存在shuffle過(guò)程中產(chǎn)生的中間數(shù)據(jù),Storage占系統(tǒng)內(nèi)存的60%,Execution占系統(tǒng)內(nèi)存的20%,并且兩者完全獨(dú)立。在一般情況下,Storage的內(nèi)存都提供給了cache操作,但是如果在某些情況下cache操作內(nèi)存不是很緊張,而task的算子中創(chuàng)建的對(duì)象很多,Execution內(nèi)存又相對(duì)較小,這回導(dǎo)致頻繁的minor gc,甚至于頻繁的full gc,進(jìn)而導(dǎo)致Spark頻繁的停止工作,性能影響會(huì)很大。
在Spark UI中可以查看每個(gè)stage的運(yùn)行情況,包括每個(gè)task的運(yùn)行時(shí)間、gc時(shí)間等等,如果發(fā)現(xiàn)gc太頻繁,時(shí)間太長(zhǎng),就可以考慮調(diào)節(jié)Storage的內(nèi)存占比,讓task執(zhí)行算子函數(shù)式,有更多的內(nèi)存可以使用。Storage內(nèi)存區(qū)域可以通過(guò)spark.storage.memoryFraction參數(shù)進(jìn)行指定,默認(rèn)為0.6,即60%,可以逐級(jí)向下遞減,如代碼清單所示:
val conf = new SparkConf().set("spark.storage.memoryFraction", "0.4")
1
B、統(tǒng)一內(nèi)存管理機(jī)制根據(jù)Spark統(tǒng)一內(nèi)存管理機(jī)制,堆內(nèi)存被劃分為了兩塊,Storage和Execution。Storage主要用于緩存數(shù)據(jù),Execution主要用于緩存在shuffle過(guò)程中產(chǎn)生的中間數(shù)據(jù),兩者所組成的內(nèi)存部分稱為統(tǒng)一內(nèi)存,Storage和Execution各占統(tǒng)一內(nèi)存的50%,由于動(dòng)態(tài)占用機(jī)制的實(shí)現(xiàn),shuffle過(guò)程需要的內(nèi)存過(guò)大時(shí),會(huì)自動(dòng)占用Storage的內(nèi)存區(qū)域,因此無(wú)需手動(dòng)進(jìn)行調(diào)節(jié)。
2. 調(diào)節(jié)Executor堆外內(nèi)存
Executor的堆外內(nèi)存主要用于程序的共享庫(kù)、PermSpace、線程Stack和一些Memory mapping等, 或者類C方式allocate object。有時(shí),如果你的Spark作業(yè)處理的數(shù)據(jù)量非常大,達(dá)到幾億的數(shù)據(jù)量,此時(shí)運(yùn)行Spark作業(yè)會(huì)時(shí)不時(shí)地報(bào)錯(cuò),例如shuffle output file cannot find,executor lost,task lost,out of memory等,這可能是Executor的堆外內(nèi)存不太夠用,導(dǎo)致Executor在運(yùn)行的過(guò)程中內(nèi)存溢出。
stage的task在運(yùn)行的時(shí)候,可能要從一些Executor中去拉取shuffle map output文件,但是Executor可能已經(jīng)由于內(nèi)存溢出掛掉了,其關(guān)聯(lián)的BlockManager也沒(méi)有了,這就可能會(huì)報(bào)出shuffle output file cannot find,executor lost,task lost,out of memory等錯(cuò)誤,此時(shí),就可以考慮調(diào)節(jié)一下Executor的堆外內(nèi)存,也就可以避免報(bào)錯(cuò),與此同時(shí),堆外內(nèi)存調(diào)節(jié)的比較大的時(shí)候,對(duì)于性能來(lái)講,也會(huì)帶來(lái)一定的提升。
默認(rèn)情況下,Executor堆外內(nèi)存上限大概為300多MB,在實(shí)際的生產(chǎn)環(huán)境下,對(duì)海量數(shù)據(jù)進(jìn)行處理的時(shí)候,這里都會(huì)出現(xiàn)問(wèn)題,導(dǎo)致Spark作業(yè)反復(fù)崩潰,無(wú)法運(yùn)行,此時(shí)就會(huì)去調(diào)節(jié)這個(gè)參數(shù),到至少1G,甚至于2G、4G。
Executor堆外內(nèi)存的配置需要在spark-submit腳本里配置,如代碼清單所示:
--conf spark.yarn.executor.memoryOverhead=2048?
1
以上參數(shù)配置完成后,會(huì)避免掉某些JVM OOM的異常問(wèn)題,同時(shí),可以提升整體 Spark 作業(yè)的性能。
3. 調(diào)節(jié)連接等待時(shí)長(zhǎng)
在Spark作業(yè)運(yùn)行過(guò)程中,Executor優(yōu)先從自己本地關(guān)聯(lián)的BlockManager中獲取某份數(shù)據(jù),如果本地BlockManager沒(méi)有的話,會(huì)通過(guò)TransferService遠(yuǎn)程連接其他節(jié)點(diǎn)上Executor的BlockManager來(lái)獲取數(shù)據(jù)。
如果task在運(yùn)行過(guò)程中創(chuàng)建大量對(duì)象或者創(chuàng)建的對(duì)象較大,會(huì)占用大量的內(nèi)存,這回導(dǎo)致頻繁的垃圾回收,但是垃圾回收會(huì)導(dǎo)致工作現(xiàn)場(chǎng)全部停止,也就是說(shuō),垃圾回收一旦執(zhí)行,Spark的Executor進(jìn)程就會(huì)停止工作,無(wú)法提供相應(yīng),此時(shí),由于沒(méi)有響應(yīng),無(wú)法建立網(wǎng)絡(luò)連接,會(huì)導(dǎo)致網(wǎng)絡(luò)連接超時(shí)。
在生產(chǎn)環(huán)境下,有時(shí)會(huì)遇到file not found、file lost這類錯(cuò)誤,在這種情況下,很有可能是 Executor 的BlockManager在拉取數(shù)據(jù)的時(shí)候,無(wú)法建立連接,然后超過(guò)默認(rèn)的連接等待時(shí)長(zhǎng)60s后,宣告數(shù)據(jù)拉取失敗,如果反復(fù)嘗試都拉取不到數(shù)據(jù),可能會(huì)導(dǎo)致Spark作業(yè)的崩潰。這種情況也可能會(huì)導(dǎo)致DAGScheduler反復(fù)提交幾次stage,TaskScheduler返回提交幾次task,大大延長(zhǎng)了我們的Spark作業(yè)的運(yùn)行時(shí)間。
優(yōu)化目的
Spark調(diào)優(yōu)的目標(biāo)是在不影響其他業(yè)務(wù)正常運(yùn)行的前提下,高效的完成業(yè)務(wù)目標(biāo),通常為了達(dá)成該目標(biāo),一般需要最大限度利用集群的物理資源,如CPU、內(nèi)存、磁盤IO,使其某一項(xiàng)達(dá)到瓶頸。
Spark-core的優(yōu)化
Yarn 模式下動(dòng)態(tài)資源調(diào)度
原理:
動(dòng)態(tài)資源調(diào)度就是為了解決的資源浪費(fèi)和資源不合理,根據(jù)當(dāng)前應(yīng)用任務(wù)的負(fù)載情況,實(shí)時(shí)的增減Executor個(gè)數(shù),從而實(shí)現(xiàn)動(dòng)態(tài)分配資源,使整個(gè)Spark系統(tǒng)更加健康。
適合場(chǎng)景:批任務(wù)。特別在使用Spark作為一個(gè)常駐的服務(wù)時(shí)候,動(dòng)態(tài)資源調(diào)度將大大的提高資源的利用率。例如JDBCServer服務(wù),大多數(shù)時(shí)間該進(jìn)程并不接受JDBC請(qǐng)求,因此將這段空閑時(shí)間的資源釋放出來(lái),將極大的節(jié)約集群的資源
條件:必須開(kāi)啟Yarn External Shuffle才能使用這個(gè)功能
spark.shuffle.service.enabled=true
spark.dynamicAllocation.enabled=true //開(kāi)啟動(dòng)態(tài)資源調(diào)度
spark.dynamicAllocation.minExecutors //最小Executor個(gè)數(shù)。 ??
spark.dynamicAllocation.initialExecutors //初始Executor個(gè)數(shù)。 ??
spark.dynamicAllocation.maxExecutors ? //最大executor個(gè) ?
spark.dynamicAllocation.executorIdleTimeout ? ?//普通Executor空閑超時(shí)時(shí)間。 ? ?
Shuffle階段調(diào)優(yōu)
1)使用序列化KryoSerializer方式
spark支持使用kryo序列化機(jī)制。kryo序列化機(jī)制,比默認(rèn)的java序列化機(jī)制,速度要快,序列化后的數(shù)據(jù)要更小,大概是java序列化機(jī)制的1/10,所以kryo序列化優(yōu)化后,可以讓網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)變少,在集群中耗費(fèi)的內(nèi)存資源大大減少。
第一步,在sparkconf中設(shè)置:SparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
第二步,注冊(cè)你使用到的,需要通過(guò)kryo序列化的一些自定義類: SparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer").registerKryoClasses(new ?Class[]{CategorySortKey.class})
2)設(shè)置合理并行度
調(diào)整并行度讓任務(wù)的數(shù)量和每個(gè)任務(wù)處理的數(shù)據(jù)與機(jī)器的處理能力達(dá)到最優(yōu)。 查看CPU使用情況和內(nèi)存占用情況,當(dāng)任務(wù)和數(shù)據(jù)不是平均分布在各節(jié)點(diǎn),而是集中在個(gè)別節(jié)點(diǎn)時(shí),可以增大并行度使任務(wù)和數(shù)據(jù)更均勻的分布在各個(gè)節(jié)點(diǎn)。增加任務(wù)的并行度,充分利用集群機(jī)器的計(jì)算能力,一般并行度設(shè)置為集群CPU總和的2-3倍。
設(shè)置平行度的方法:
(1)在會(huì)產(chǎn)生shuffle的操作函數(shù)內(nèi)設(shè)置并行度參數(shù),優(yōu)先級(jí)最高
? ? RDD. groupByKey(10);
(2)在代碼配置參數(shù)中設(shè)置并行度,優(yōu)先級(jí)次之
SparkConf spConf=new SparkConf().setMaster("local[4]")
? ? ? ? ? .set("spark.default.parallelism", "10")
(3)在spark-defaults.conf配置文件中配置,優(yōu)先級(jí)最低
spark.default.parallelism=10
3)使用廣播變量
原理:
Broadcast把數(shù)據(jù)集合分發(fā)到每一個(gè)節(jié)點(diǎn)上,Spark任務(wù)在執(zhí)行過(guò)程中要使用這個(gè)數(shù)據(jù)集合時(shí),就會(huì)在本地查找Broadcast過(guò)來(lái)的數(shù)據(jù)集合。如果不使用Broadcast,每次任務(wù)需要數(shù)據(jù)集合時(shí),都會(huì)把數(shù)據(jù)序列化到任務(wù)里面,不但耗時(shí),還使任務(wù)變得很大。
使用場(chǎng)景:
每個(gè)任務(wù)分片在執(zhí)行中都需要同一份數(shù)據(jù)集合時(shí),就可以把公共數(shù)據(jù)集Broadcast到每個(gè)節(jié)點(diǎn),讓每個(gè)節(jié)點(diǎn)在本地都保存一份。
大表和小表做join操作時(shí)可以把小表Broadcast到各個(gè)節(jié)點(diǎn),從而就可以把join操作轉(zhuǎn)變成普通的操作,減少了shuffle操作。
ArrayList list= new ArrayList();
? ? ? ?list.add("test");
? ? ? ?Broadcast bc= javaSparkContext.broadcast(list);
4)使用緩存
如果在應(yīng)用程序中多次使用同一個(gè)RDD,可以將該RDD緩存起來(lái),也就是把中間計(jì)算結(jié)果緩存起來(lái),避免每次迭代重計(jì)算。
常用的緩存方式:
MEMORY_ONLY_SER
? ?MEMORY_ONLY
? ?MEMORY_AND_DISK
? ?DISK_ONLY
rdd.persist(StorageLevel.MEMORY_ONLY);
rdd.unpersist;
5)使用Checkpoint
checkpoint在spark中主要有兩塊應(yīng)用:一塊是在spark core中對(duì)RDD做checkpoint,可以切斷做checkpoint RDD的依賴關(guān)系,將RDD數(shù)據(jù)保存到可靠存儲(chǔ)(如HDFS)以便數(shù)據(jù)恢復(fù);另外一塊是應(yīng)用在spark streaming中,使用checkpoint用來(lái)保存DStreamGraph以及相關(guān)配置信息,以便在Driver崩潰重啟的時(shí)候能夠接著之前進(jìn)度繼續(xù)進(jìn)行處理(如之前waiting batch的job會(huì)在重啟后繼續(xù)處理)。
rdd.checkpoint() 并不會(huì)觸發(fā)計(jì)算,只有在遇到action方法后,才會(huì)觸發(fā)計(jì)算,在job執(zhí)行完畢后,會(huì)啟動(dòng)checkpoint計(jì)算,對(duì)這個(gè)rdd再次觸發(fā)一個(gè)job執(zhí)行checkpoint計(jì)算。所以在checkpoint前,對(duì)rdd做cache,可以避免checkpoint計(jì)算過(guò)程中重新根據(jù)rdd依賴鏈計(jì)算。
javaSparkContext.setCheckpointDir(pathName);
rdd.cache();
rdd.checkpoint();
6)設(shè)置spark.shuffle.memoryFraction
該參數(shù)用于設(shè)置shuffle過(guò)程中一個(gè)task拉取到上個(gè)stage的task的輸出后,進(jìn)行聚合操作時(shí)能夠使用的Executor內(nèi)存的比例,默認(rèn)是0.2。也就是說(shuō),Executor默認(rèn)只有20%的內(nèi)存用來(lái)進(jìn)行該操作。shuffle操作在進(jìn)行聚合時(shí),如果發(fā)現(xiàn)使用的內(nèi)存超出了這個(gè)20%的限制,那么多余的數(shù)據(jù)就會(huì)溢寫到磁盤文件中去,此時(shí)就會(huì)極大地降低性能。
7)開(kāi)啟consolidateFiles優(yōu)化
shuffle read的拉取過(guò)程是一邊拉取一邊進(jìn)行聚合的。每個(gè)shuffle read task都會(huì)有一個(gè)自己的buffer緩沖,每次都只能拉取與buffer緩沖(這個(gè)緩存大小可以通過(guò)上面的參數(shù)來(lái)設(shè)定)相同大小的數(shù)據(jù),然后通過(guò)內(nèi)存中的一個(gè)Map進(jìn)行聚合等操作。聚合完一批數(shù)據(jù)后,再拉取下一批數(shù)據(jù),并放到buffer緩沖中進(jìn)行聚合操作。一直循環(huán),直到最后將所有數(shù)據(jù)到拉取完,并得到最終的結(jié)果。
開(kāi)啟consolidate機(jī)制之后,在shuffle write過(guò)程中,task就不是為下游stage的每個(gè)task創(chuàng)建一個(gè)磁盤文件了。此時(shí)會(huì)出現(xiàn)shuffleFileGroup的概念,每個(gè)shuffleFileGroup會(huì)對(duì)應(yīng)一批磁盤文件,磁盤文件的數(shù)量與下游stage的task數(shù)量是相同的。一個(gè)Executor上有多少個(gè)CPU core,就可以并行執(zhí)行多少個(gè)task。而第一批并行執(zhí)行的每個(gè)task都會(huì)創(chuàng)建一個(gè)shuffleFileGroup,并將數(shù)據(jù)寫入對(duì)應(yīng)的磁盤文件內(nèi)。
當(dāng)Executor的CPU core執(zhí)行完一批task,接著執(zhí)行下一批task時(shí),下一批task就會(huì)復(fù)用之前已有的shuffleFileGroup,包括其中的磁盤文件。也就是說(shuō),此時(shí)task會(huì)將數(shù)據(jù)寫入已有的磁盤文件中,而不會(huì)寫入新的磁盤文件中。因此,consolidate機(jī)制允許不同的task復(fù)用同一批磁盤文件,這樣就可以有效將多個(gè)task的磁盤文件進(jìn)行一定程度上的合并,從而大幅度減少磁盤文件的數(shù)量,進(jìn)而提升shuffle write的性能。
?new SparkConf().set("spark.shuffle.consolidateFiles", "true") 默認(rèn)為false。
MapPartitions分區(qū)替換map計(jì)算結(jié)果
使用mapPartitions,按每個(gè)分區(qū)計(jì)算結(jié)果
使用foreachPartitions替代foreach
原理類似于“使用mapPartitions替代map”,也是一次函數(shù)調(diào)用處理一個(gè)partition的所有數(shù)據(jù),而不是一次函數(shù)調(diào)用處理一條數(shù)據(jù)。在實(shí)踐中發(fā)現(xiàn),foreachPartitions類的算子,對(duì)性能的提升還是很有幫助的。比如在foreach函數(shù)中,將RDD中所有數(shù)據(jù)寫Oracle,那么如果是普通的foreach算子,就會(huì)一條數(shù)據(jù)一條數(shù)據(jù)地寫,每次函數(shù)調(diào)用可能就會(huì)創(chuàng)建一個(gè)數(shù)據(jù)庫(kù)連接,此時(shí)就勢(shì)必會(huì)頻繁地創(chuàng)建和銷毀數(shù)據(jù)庫(kù)連接,性能是非常低下;但是如果用foreachPartitions算子一次性處理一個(gè)partition的數(shù)據(jù),那么對(duì)于每個(gè)partition,只要?jiǎng)?chuàng)建一個(gè)數(shù)據(jù)庫(kù)連接即可,然后執(zhí)行批量插入操作,此時(shí)性能是比較高的。
設(shè)置num-executors參數(shù)
該參數(shù)用于設(shè)置Spark作業(yè)總共要用多少個(gè)Executor進(jìn)程來(lái)執(zhí)行。Driver在向YARN集群管理器申請(qǐng)資源時(shí),YARN集群管理器會(huì)盡可能按照設(shè)置來(lái)在集群的各個(gè)工作節(jié)點(diǎn)上,啟動(dòng)相應(yīng)數(shù)量的Executor進(jìn)程。這個(gè)參數(shù)非常之重要,如果不設(shè)置的話,默認(rèn)只會(huì)給你啟動(dòng)少量的Executor進(jìn)程,此時(shí)你的Spark作業(yè)的運(yùn)行速度是非常慢的。
該參數(shù)設(shè)置的太少,無(wú)法充分利用集群資源;設(shè)置的太多的話,大部分隊(duì)列可能無(wú)法給予充分的資源。建議該參數(shù)設(shè)置1-5。
設(shè)置executor-memory參數(shù)
該參數(shù)用于設(shè)置每個(gè)Executor進(jìn)程的內(nèi)存。Executor內(nèi)存的大小,很多時(shí)候直接決定了Spark作業(yè)的性能,而且跟常見(jiàn)的JVM OOM異常也有直接的關(guān)聯(lián)。
針對(duì)數(shù)據(jù)交換的業(yè)務(wù)場(chǎng)景,建議本參數(shù)設(shè)置在512M及以下。
?設(shè)置executor-cores
該參數(shù)用于設(shè)置每個(gè)Executor進(jìn)程的CPU core數(shù)量。這個(gè)參數(shù)決定了每個(gè)Executor進(jìn)程并行執(zhí)行task線程的能力。因?yàn)槊總€(gè)CPU core同一時(shí)間只能執(zhí)行一個(gè)task線程,因此每個(gè)Executor進(jìn)程的CPU core數(shù)量越多,越能夠快速地執(zhí)行完分配給自己的所有task線程。
注意Collect的使用
collect操作會(huì)將Executor的數(shù)據(jù)發(fā)送到Driver端,因此使用collect前需要確保Driver端內(nèi)存足夠,以免Driver進(jìn)程發(fā)生OutOfMemory異常。當(dāng)不確定數(shù)據(jù)量大小時(shí),可使用saveAsTextFile等操作把數(shù)據(jù)寫入HDFS中。只有在能夠大致確定數(shù)據(jù)大小且driver內(nèi)存充足的時(shí)候,才能使用collect。
使用reduceByKey替換groupByKey
reduceByKey會(huì)在Map端做本地聚合,使得Shuffle過(guò)程更加平緩,而groupByKey等Shuffle操作不會(huì)在Map端做聚合。因此能使用reduceByKey的地方盡量使用該算子,避免出現(xiàn)groupByKey。
數(shù)據(jù)傾斜
當(dāng)數(shù)據(jù)發(fā)生傾斜,雖然沒(méi)有GC(Gabage Collection,垃圾回收),但是task執(zhí)行時(shí)間嚴(yán)重不一致。
需要重新設(shè)計(jì)key,以更小粒度的key使得task大小合理化。
修改并行度。
將HDFS上的文本格式數(shù)據(jù)轉(zhuǎn)換為Parquet格式數(shù)據(jù)
列式存儲(chǔ)布局查詢中只涉及到部分列,所以只需讀取這些列對(duì)應(yīng)的數(shù)據(jù)塊,而不需要讀取整個(gè)表的數(shù)據(jù),從而減少I/O開(kāi)銷。Parquet還支持靈活的壓縮選項(xiàng),可以顯著減少磁盤上的存儲(chǔ)。
Spark-sql的優(yōu)化
使用分區(qū)表
數(shù)據(jù)量在1GB以上的大表之間相互關(guān)聯(lián),或者對(duì)大表進(jìn)行聚合操作前,可以在建表時(shí)先對(duì)大表根據(jù)關(guān)聯(lián)字段或聚合字段進(jìn)行分區(qū)。這樣可以避免Shuffle操作,提高性能。
使用廣播
將小表BroadCast到各個(gè)節(jié)點(diǎn)上,從而轉(zhuǎn)變成非shuffle操作,提高任務(wù)執(zhí)行性能。
spark.sql.autoBroadcastJoin.Threshold= 10485760 ?//-1表示不廣播
使用重分區(qū)優(yōu)化小文件
df.repartition(5).write.mode(SaveMode.Append).saveAsTable("t1");
寬依賴和窄依賴
1)寬依賴:是指1個(gè)父RDD分區(qū)對(duì)應(yīng)多個(gè)子RDD的分區(qū)
如:groupByKey,reduceByKey,sortByKey,join 即使shuffle操作算子一般屬于寬依賴。
2)窄依賴:是指一個(gè)或多個(gè)父RDD分區(qū)對(duì)應(yīng)一個(gè)子RDD分區(qū)
如:map,filter,union,co-partioned join
即使:寬依賴就是1對(duì)多,窄依賴就是一對(duì)一或者多對(duì)一
Spark Shuffle
在Spark的中,兩個(gè)Stage之間就是shuffle,負(fù)責(zé)shuffle過(guò)程的執(zhí)行、計(jì)算和處理的組件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager隨著Spark的發(fā)展有兩種實(shí)現(xiàn)的方式,分別為HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle兩種。
在Spark 1.2以前,默認(rèn)的shuffle計(jì)算引擎是HashShuffleManager,Spark 1.2以后的版本中,默認(rèn)的ShuffleManager改成了SortShuffleManager。HashShuffleManager會(huì)產(chǎn)生很多中間小文件,SortShuffleManager相較于HashShuffleManager來(lái)說(shuō),有了一定的改進(jìn)。主要就在于,每個(gè)Task在進(jìn)行shuffle操作時(shí),雖然也會(huì)產(chǎn)生較多的臨時(shí)磁盤文件,但是最后會(huì)將所有的臨時(shí)文件合并(merge)成一個(gè)磁盤文件,因此每個(gè)Task就只有一個(gè)磁盤文件。在下一個(gè)stage的shuffle read task拉取自己的數(shù)據(jù)時(shí),只要根據(jù)索引讀取每個(gè)磁盤文件中的部分?jǐn)?shù)據(jù)即可。
Spark Stage的劃分
對(duì)RDD的操作分為transformation和action兩類,真正的作業(yè)提交運(yùn)行發(fā)生在action之后,調(diào)用action之后會(huì)將對(duì)原始輸入數(shù)據(jù)的所有transformation操作封裝成作業(yè)并向集群提交運(yùn)行。這個(gè)過(guò)程大致可以如下描述:
由DAGScheduler對(duì)RDD之間的依賴性進(jìn)行分析,通過(guò)DAG來(lái)分析各個(gè)RDD之間的轉(zhuǎn)換依賴關(guān)系
根據(jù)DAGScheduler分析得到的RDD依賴關(guān)系將Job劃分成多個(gè)stage
每個(gè)stage會(huì)生成一個(gè)TaskSet并提交給TaskScheduler,調(diào)度權(quán)轉(zhuǎn)交給TaskScheduler,由它來(lái)負(fù)責(zé)分發(fā)task到worker執(zhí)行
stage的劃分:
stage的劃分基于DAG確定依賴關(guān)系,將依賴鏈斷開(kāi),每個(gè)stage內(nèi)部可以并行運(yùn)行,整個(gè)作業(yè)按照stage順序依次執(zhí)行,最終完成整個(gè)Job。Spark利用依賴關(guān)系,調(diào)度器從DAG圖末端出發(fā),逆向遍歷整個(gè)依賴關(guān)系鏈,遇到ShuffleDependency(寬依賴關(guān)系的一種叫法)就斷開(kāi),遇到NarrowDependency(窄依賴)就將其加入到當(dāng)前stage。stage中task數(shù)目由stage末端的RDD分區(qū)個(gè)數(shù)來(lái)決定,RDD轉(zhuǎn)換是基于分區(qū)的一種粗粒度計(jì)算,一個(gè)stage執(zhí)行的結(jié)果就是這幾個(gè)分區(qū)構(gòu)成的RDD。
yarn-cluster和yarn-client模式
yarn-cluster和yarn-client模式的區(qū)別其實(shí)就是Application Master進(jìn)程的區(qū)別,yarn-cluster模式下,driver運(yùn)行在AM(Application Master)中,它負(fù)責(zé)向YARN申請(qǐng)資源,并監(jiān)督作業(yè)的運(yùn)行狀況。當(dāng)用戶提交了作業(yè)之后,就可以關(guān)掉Client,作業(yè)會(huì)繼續(xù)在YARN上運(yùn)行。然而yarn-cluster模式不適合運(yùn)行交互類型的作業(yè)。而yarn-client模式下,Application Master僅僅向YARN請(qǐng)求executor,client會(huì)和請(qǐng)求的container通信來(lái)調(diào)度他們工作,也就是說(shuō)Client不能離開(kāi)。
yarn-cluster適用于生產(chǎn)環(huán)境;而yarn-client適用于交互和調(diào)試。
Spark在Executor上的內(nèi)存分配
spark.serializer (default org.apache.spark.serializer.JavaSerializer )
? ? 建議設(shè)置為 org.apache.spark.serializer.KryoSerializer,因?yàn)镵ryoSerializer比JavaSerializer快,但是有可能會(huì)有些Object會(huì)序列化失敗,這個(gè)時(shí)候就需要顯示的對(duì)序列化失敗的類進(jìn)行KryoSerializer的注冊(cè),這個(gè)時(shí)候要配置spark.kryo.registrator參數(shù)
Spark的Executor內(nèi)存分配
Spark Executor有兩種內(nèi)存:
堆內(nèi)內(nèi)存:受JVM管理
堆外內(nèi)存:不受jvm管理
Executo堆內(nèi)內(nèi)存:
Spark在一個(gè)Executor中的內(nèi)存分為三塊,一塊是execution內(nèi)存,一塊是storage內(nèi)存,一塊是other內(nèi)存。
execution和storage是Spark Executor中內(nèi)存的大戶,other占用內(nèi)存相對(duì)少很多。在spark-1.6.0以前的版本,execution和storage的內(nèi)存分配是固定的,使用的參數(shù)配置分別是spark.shuffle.memoryFraction(execution內(nèi)存占Executor總內(nèi)存大小,default 0.2)和spark.storage.memoryFraction(storage內(nèi)存占Executor內(nèi)存大小,default 0.6),因?yàn)槭?.6.0以前這兩塊內(nèi)存是互相隔離的,這就導(dǎo)致了Executor的內(nèi)存利用率不高,而且需要根據(jù)Application的具體情況,使用者自己來(lái)調(diào)節(jié)這兩個(gè)參數(shù)才能優(yōu)化Spark的內(nèi)存使用。在spark-1.6.0以上的版本,execution內(nèi)存和storage內(nèi)存可以相互借用,提高了內(nèi)存的Spark中內(nèi)存的使用率,同時(shí)也減少了OOM的情況。
execution內(nèi)存是執(zhí)行內(nèi)存,文檔中說(shuō)join,aggregate都在這部分內(nèi)存中執(zhí)行,shuffle的數(shù)據(jù)也會(huì)先緩存在這個(gè)內(nèi)存中,滿了再寫入磁盤,能夠減少IO。其實(shí)map過(guò)程也是在這個(gè)內(nèi)存中執(zhí)行的。默認(rèn)總內(nèi)存的0.2,由Spark應(yīng)用程序啟動(dòng)時(shí)的–executor-memory或spark.executor.memory參數(shù)配置。
storage內(nèi)存是存儲(chǔ)broadcast,cache,persist數(shù)據(jù)的地方。默認(rèn)總內(nèi)存的0.6,通過(guò)spark.storage.storageFraction參數(shù)設(shè)置。
other內(nèi)存是程序執(zhí)行時(shí)預(yù)留給自己的內(nèi)存。默認(rèn)總內(nèi)存的0.2。
Executo堆外內(nèi)存:
在默認(rèn)情況下,堆外內(nèi)存并不啟用,可通過(guò)配置spark.memory.offHeap.enabled參數(shù)啟用,并由spark.memory.offHeap.size參數(shù)設(shè)定堆外空間的大小。堆外內(nèi)存主要存儲(chǔ)經(jīng)過(guò)序列化的二進(jìn)制數(shù)據(jù)。
堆外的空間分配較為簡(jiǎn)單,除了沒(méi)有 other空間,存儲(chǔ)內(nèi)存、執(zhí)行內(nèi)存的大小同樣是固定的,所有運(yùn)行中的并發(fā)任務(wù)共享存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存。
Job的劃分
1、Application :
應(yīng)用,創(chuàng)建一個(gè)SparkContext可以認(rèn)為創(chuàng)建了一個(gè)Application
2、Job;
在一個(gè)app中每執(zhí)行一次行動(dòng)算子就會(huì)創(chuàng)建一個(gè)Job,一個(gè)application會(huì)有多個(gè)job
3、stage;
階段,每碰到一個(gè)shuffle算子,會(huì)產(chǎn)生一個(gè)新的stage,一個(gè)Job中可以包含多個(gè)stage;
4、task
任務(wù),表示階段執(zhí)行的時(shí)候的并行度,一個(gè)stage會(huì)有多個(gè)task;
spark性能優(yōu)化
一:Spark的性能優(yōu)化,主要手段包括:
1、使用高性能序列化類庫(kù)
2、優(yōu)化數(shù)據(jù)結(jié)構(gòu)
3、對(duì)多次使用的RDD進(jìn)行持久化 / Checkpoint
4、使用序列化的持久化級(jí)別
5、Java虛擬機(jī)垃圾回收調(diào)優(yōu)
6、提高并行度
7、廣播共享數(shù)據(jù)
8、數(shù)據(jù)本地化
9、reduceByKey和groupByKey的合理使用
10、Shuffle調(diào)優(yōu)(核心中的核心,重中之重)
二:spark診斷內(nèi)存消耗
java主要的內(nèi)存消耗
1、每個(gè)Java對(duì)象,都有一個(gè)對(duì)象頭,會(huì)占用16個(gè)字節(jié),主要是包括了一些對(duì)象的元信息,比如指向它的類的指針。如果一個(gè)對(duì)象本身很小,比如就包括了一個(gè)int類型的field,那么它的對(duì)象頭實(shí)際上比對(duì)象自己還要大。 2、Java的String對(duì)象,會(huì)比它內(nèi)部的原始數(shù)據(jù),要多出40個(gè)字節(jié)。因?yàn)樗鼉?nèi)部使用char數(shù)組來(lái)保存內(nèi)部的字符序列的,并且還得保存諸如數(shù)組長(zhǎng)度之類的信息。而且因?yàn)镾tring使用的是UTF-16編碼,所以每個(gè)字符會(huì)占用2個(gè)字節(jié)。比如,包含10個(gè)字符的String,會(huì)占用60個(gè)字節(jié)。 3、Java中的集合類型,比如HashMap和LinkedList,內(nèi)部使用的是鏈表數(shù)據(jù)結(jié)構(gòu),所以對(duì)鏈表中的每一個(gè)數(shù)據(jù),都使用了Entry對(duì)象來(lái)包裝。Entry對(duì)象不光有對(duì)象頭,還有指向下一個(gè)Entry的指針,通常占用8個(gè)字節(jié)。 4、元素類型為原始數(shù)據(jù)類型(比如int)的集合,內(nèi)部通常會(huì)使用原始數(shù)據(jù)類型的包裝類型,比如Integer,來(lái)存儲(chǔ)元素。
怎么判斷程序消耗的內(nèi)存:
1、首先,自己設(shè)置RDD的并行度,有兩種方式:要不然,在parallelize()、textFile()等方法中,傳入第二個(gè)參數(shù),設(shè)置RDD的task / partition的數(shù)量;要不然,用SparkConf.set()方法,設(shè)置一個(gè)參數(shù),spark.default.parallelism,可以統(tǒng)一設(shè)置這個(gè)application所有RDD的partition數(shù)量。 2、其次,在程序中將RDD cache到內(nèi)存中,調(diào)用RDD.cache()方法即可。 3、最后,觀察Driver的log,你會(huì)發(fā)現(xiàn)類似于:“INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)”的日志信息。這就顯示了每個(gè)partition占用了多少內(nèi)存。 4、將這個(gè)內(nèi)存信息乘以partition數(shù)量,即可得出RDD的內(nèi)存占用量。
三:spark高性能序列化庫(kù)
兩種序列化機(jī)制
spark默認(rèn)使用了第一種序列化機(jī)制: 1、Java序列化機(jī)制:默認(rèn)情況下,Spark使用Java自身的ObjectInputStream和ObjectOutputStream機(jī)制進(jìn)行對(duì)象的序列化。只要你的類實(shí)現(xiàn)了Serializable接口,那么都是可以序列化的。而且Java序列化機(jī)制是提供了自定義序列化支持的,只要你實(shí)現(xiàn)Externalizable接口即可實(shí)現(xiàn)自己的更高性能的序列化算法。Java序列化機(jī)制的速度比較慢,而且序列化后的數(shù)據(jù)占用的內(nèi)存空間比較大。 2、Kryo序列化機(jī)制:Spark也支持使用Kryo類庫(kù)來(lái)進(jìn)行序列化。Kryo序列化機(jī)制比Java序列化機(jī)制更快,而且序列化后的數(shù)據(jù)占用的空間更小,通常比Java序列化的數(shù)據(jù)占用的空間要小10倍。Kryo序列化機(jī)制之所以不是默認(rèn)序列化機(jī)制的原因是,有些類型雖然實(shí)現(xiàn)了Seriralizable接口,但是它也不一定能夠進(jìn)行序列化;此外,如果你要得到最佳的性能,Kryo還要求你在Spark應(yīng)用程序中,對(duì)所有你需要序列化的類型都進(jìn)行注冊(cè)。
如何使用Kroyo序列
方式一:SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 方式二:如果要注冊(cè)自定義的類型,那么就使用如下的代碼,即可: Scala版本: val conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Array(classOf[Counter] )) val sc = new SparkContext(conf) Java版本: SparkConf conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Counter.class) JavaSparkContext sc = new JavaSparkContext(conf)
?使用Kroyo序列的建議:
1、優(yōu)化緩存大小 如果注冊(cè)的要序列化的自定義的類型,本身特別大,比如包含了超過(guò)100個(gè)field。那么就會(huì)導(dǎo)致要序列化的對(duì)象過(guò)大。此時(shí)就需要對(duì)Kryo本身進(jìn)行優(yōu)化。因?yàn)镵ryo內(nèi)部的緩存可能不夠存放那么大的class對(duì)象。此時(shí)就需要調(diào)用SparkConf.set()方法,設(shè)置spark.kryoserializer.buffer.mb參數(shù)的值,將其調(diào)大。 默認(rèn)情況下它的值是2,就是說(shuō)最大能緩存2M的對(duì)象,然后進(jìn)行序列化。可以在必要時(shí)將其調(diào)大。比如設(shè)置為10。 2、預(yù)先注冊(cè)自定義類型 雖然不注冊(cè)自定義類型,Kryo類庫(kù)也能正常工作,但是那樣的話,對(duì)于它要序列化的每個(gè)對(duì)象,都會(huì)保存一份它的全限定類名。此時(shí)反而會(huì)耗費(fèi)大量?jī)?nèi)存。因此通常都建議預(yù)先注冊(cè)號(hào)要序列化的自定義的類。
使用場(chǎng)景:
首先,這里討論的都是Spark的一些普通的場(chǎng)景,一些特殊的場(chǎng)景,比如RDD的持久化,在后面會(huì)講解。這里先不說(shuō)。 那么,這里針對(duì)的Kryo序列化類庫(kù)的使用場(chǎng)景,就是算子函數(shù)使用到了外部的大數(shù)據(jù)的情況。比如說(shuō)吧,我們?cè)谕獠慷x了一個(gè)封裝了應(yīng)用所有配置的對(duì)象,比如自定義了一個(gè)MyConfiguration對(duì)象,里面包含了100m的數(shù)據(jù)。然后,在算子函數(shù)里面,使用到了這個(gè)外部的大對(duì)象。 此時(shí)呢,如果默認(rèn)情況下,讓Spark用java序列化機(jī)制來(lái)序列化這種外部的大對(duì)象,那么就會(huì)導(dǎo)致,序列化速度緩慢,并且序列化以后的數(shù)據(jù)還是比較大,比較占用內(nèi)存空間。 因此,在這種情況下,比較適合,切換到Kryo序列化類庫(kù),來(lái)對(duì)外部的大對(duì)象進(jìn)行序列化操作。一是,序列化速度會(huì)變快;二是,會(huì)減少序列化后的數(shù)據(jù)占用的內(nèi)存空間。
?四:Spark優(yōu)化數(shù)據(jù)結(jié)構(gòu)
目的:使用數(shù)據(jù)結(jié)構(gòu)是為了減少數(shù)據(jù)的占用量,從而減少內(nèi)存的開(kāi)銷。
優(yōu)化的對(duì)象:主要就是優(yōu)化你的算子函數(shù),內(nèi)部使用到的局部數(shù)據(jù),或者是算子函數(shù)外部的數(shù)據(jù)。都可以進(jìn)行數(shù)據(jù)結(jié)構(gòu)的優(yōu)化。優(yōu)化之后,都會(huì)減少其對(duì)內(nèi)存的消耗和占用。
優(yōu)化方式:
1、優(yōu)先使用數(shù)組以及字符串,而不是集合類。也就是說(shuō),優(yōu)先用array,而不是ArrayList、LinkedList、HashMap等集合。 比如,有個(gè)List<Integer> list = new ArrayList<Integer>(),將其替換為int[] arr = new int[]。這樣的話,array既比List少了額外信息的存儲(chǔ)開(kāi)銷,還能使用原始數(shù)據(jù)類型(int)來(lái)存儲(chǔ)數(shù)據(jù),比List中用Integer這種包裝類型存儲(chǔ)數(shù)據(jù),要節(jié)省內(nèi)存的多。 還比如,通常企業(yè)級(jí)應(yīng)用中的做法是,對(duì)于HashMap、List這種數(shù)據(jù),統(tǒng)一用String拼接成特殊格式的字符串,比如Map<Integer, Person> persons = new HashMap<Integer, Person>()??梢詢?yōu)化為,特殊的字符串格式:id:name,address|id:name,address...。 2、避免使用多層嵌套的對(duì)象結(jié)構(gòu)。比如說(shuō),public class Teacher { private List<Student> students = new ArrayList<Student>() }。就是非常不好的例子。因?yàn)門eacher類的內(nèi)部又嵌套了大量的小Student對(duì)象。 比如說(shuō),對(duì)于上述例子,也完全可以使用特殊的字符串來(lái)進(jìn)行數(shù)據(jù)的存儲(chǔ)。比如,用json字符串來(lái)存儲(chǔ)數(shù)據(jù),就是一個(gè)很好的選擇。 {"teacherId": 1, "teacherName": "leo", students:[{"studentId": 1, "studentName": "tom"},{"studentId":2, "studentName":"marry"}]} 3、對(duì)于有些能夠避免的場(chǎng)景,盡量使用int替代String。因?yàn)镾tring雖然比ArrayList、HashMap等數(shù)據(jù)結(jié)構(gòu)高效多了,占用內(nèi)存量少多了,但是之前分析過(guò),還是有額外信息的消耗。比如之前用String表示id,那么現(xiàn)在完全可以用數(shù)字類型的int,來(lái)進(jìn)行替代。 這里提醒,在spark應(yīng)用中,id就不要用常用的uuid了,因?yàn)闊o(wú)法轉(zhuǎn)成int,就用自增的int類型的id即可。(sdfsdfdf-234242342-sdfsfsfdfd)
五:對(duì)多次使用的RDD進(jìn)行持久化操作 或 CheckPoint
對(duì)多次運(yùn)算的RDD進(jìn)行持久化或放到內(nèi)存,可以減少對(duì)重復(fù)計(jì)算的代價(jià);
如果要保證在RDD的持久化數(shù)據(jù)可能丟失的情況下,還要保證高性能,那么可以對(duì)RDD進(jìn)行Checkpoint操作。
對(duì)數(shù)據(jù)的持久化有多重級(jí)別:
除了對(duì)多次使用的RDD進(jìn)行持久化操作之外,還可以進(jìn)一步優(yōu)化其性能。因?yàn)楹苡锌赡埽琑DD的數(shù)據(jù)是持久化到內(nèi)存,或者磁盤中的。那么,此時(shí),如果內(nèi)存大小不是特別充足,完全可以使用序列化的持久化級(jí)別,比如MEMORY_ONLY_SER、MEMORY_AND_DISK_SER等。使用RDD.persist(StorageLevel.MEMORY_ONLY_SER)這樣的語(yǔ)法即可。 這樣的話,將數(shù)據(jù)序列化之后,再持久化,可以大大減小對(duì)內(nèi)存的消耗。此外,數(shù)據(jù)量小了之后,如果要寫入磁盤,那么磁盤io性能消耗也比較小。 對(duì)RDD持久化序列化后,RDD的每個(gè)partition的數(shù)據(jù),都是序列化為一個(gè)巨大的字節(jié)數(shù)組。這樣,對(duì)于內(nèi)存的消耗就小的多了。但是唯一的缺點(diǎn)就是,獲取RDD數(shù)據(jù)時(shí),需要對(duì)其進(jìn)行反序列化,會(huì)增大其性能開(kāi)銷。 因此,對(duì)于序列化的持久化級(jí)別,還可以進(jìn)一步優(yōu)化,也就是說(shuō),使用Kryo序列化類庫(kù),這樣,可以獲得更快的序列化速度,并且占用更小的內(nèi)存空間。但是要記住,如果RDD的元素(RDD<T>的泛型類型),是自定義類型的話,在Kryo中提前注冊(cè)自定義類型。
六:JVM虛擬機(jī)垃圾回收
主要是創(chuàng)建少量的對(duì)象,以及創(chuàng)建對(duì)象的大小。編程中避免大對(duì)象。
還有一些jvm的通用方法。都是通用的,可以參考一些通用方法。
七:提高并行度
實(shí)際上Spark集群的資源并不一定會(huì)被充分利用到,所以要盡量設(shè)置合理的并行度,來(lái)充分地利用集群的資源。才能充分提高Spark應(yīng)用程序的性能。
Spark會(huì)自動(dòng)設(shè)置以文件作為輸入源的RDD的并行度,依據(jù)其大小,比如HDFS,就會(huì)給每一個(gè)block創(chuàng)建一個(gè)partition,也依據(jù)這個(gè)設(shè)置并行度。對(duì)于reduceByKey等會(huì)發(fā)生shuffle的操作,就使用并行度最大的父RDD的并行度即可。
可以手動(dòng)使用textFile()、parallelize()等方法的第二個(gè)參數(shù)來(lái)設(shè)置并行度;也可以使用spark.default.parallelism參數(shù),來(lái)設(shè)置統(tǒng)一的并行度。
比如說(shuō),spark-submit設(shè)置了executor數(shù)量是10個(gè),每個(gè)executor要求分配2個(gè)core,那么application總共會(huì)有20個(gè)core。此時(shí)可以設(shè)置new SparkConf().set("spark.default.parallelism", "60")來(lái)設(shè)置合理的并行度,從而充分利用資源。
官方建議設(shè)置的并行數(shù)量為2-3倍的cpu cores的數(shù)量,這樣可以使一些計(jì)算能力較弱的cpu少計(jì)算一些數(shù)據(jù)。能力好的cpu計(jì)算多一些數(shù)據(jù)。
八:廣播共享文件
如果你的算子函數(shù)中,使用到了特別大的數(shù)據(jù),那么,這個(gè)時(shí)候,推薦將該數(shù)據(jù)進(jìn)行廣播。這樣的話,就不至于將一個(gè)大數(shù)據(jù)拷貝到每一個(gè)task上去。而是給每個(gè)節(jié)點(diǎn)拷貝一份,然后節(jié)點(diǎn)上的task共享該數(shù)據(jù)。
這樣的話,就可以減少大數(shù)據(jù)在節(jié)點(diǎn)上的內(nèi)存消耗。并且可以減少數(shù)據(jù)到節(jié)點(diǎn)的網(wǎng)絡(luò)傳輸消耗。
九:數(shù)據(jù)本地化
?基于移動(dòng)計(jì)算的成本要遠(yuǎn)遠(yuǎn)小于移動(dòng)數(shù)據(jù)的原則。
數(shù)據(jù)本地化級(jí)別:
數(shù)據(jù)本地化,指的是,數(shù)據(jù)離計(jì)算它的代碼有多近。基于數(shù)據(jù)距離代碼的距離,有幾種數(shù)據(jù)本地化級(jí)別: 1、PROCESS_LOCAL:數(shù)據(jù)和計(jì)算它的代碼在同一個(gè)JVM進(jìn)程中。 2、NODE_LOCAL:數(shù)據(jù)和計(jì)算它的代碼在一個(gè)節(jié)點(diǎn)上,但是不在一個(gè)進(jìn)程中,比如在不同的executor進(jìn)程中,或者是數(shù)據(jù)在HDFS文件的block中。 3、NO_PREF:數(shù)據(jù)從哪里過(guò)來(lái),性能都是一樣的。 4、RACK_LOCAL:數(shù)據(jù)和計(jì)算它的代碼在一個(gè)機(jī)架上。 5、ANY:數(shù)據(jù)可能在任意地方,比如其他網(wǎng)絡(luò)環(huán)境內(nèi),或者其他機(jī)架上。
優(yōu)化方案:
Spark傾向于使用最好的本地化級(jí)別來(lái)調(diào)度task,但是這是不可能的。如果沒(méi)有任何未處理的數(shù)據(jù)在空閑的executor上,那么Spark就會(huì)放低本地化級(jí)別。這時(shí)有兩個(gè)選擇:第一,等待,直到executor上的cpu釋放出來(lái),那么就分配task過(guò)去;第二,立即在任意一個(gè)executor上啟動(dòng)一個(gè)task。 Spark默認(rèn)會(huì)等待一會(huì)兒,來(lái)期望task要處理的數(shù)據(jù)所在的節(jié)點(diǎn)上的executor空閑出一個(gè)cpu,從而將task分配過(guò)去。只要超過(guò)了時(shí)間,那么Spark就會(huì)將task分配到其他任意一個(gè)空閑的executor上。 可以設(shè)置參數(shù),spark.locality系列參數(shù),來(lái)調(diào)節(jié)Spark等待task可以進(jìn)行數(shù)據(jù)本地化的時(shí)間。spark.locality.wait(3000毫秒)、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack。
十: groupByKey 和 ReduceByKey
如果能用reduceByKey,那就用reduceByKey,因?yàn)樗鼤?huì)在map端,先進(jìn)行本地combine,可以大大減少要傳輸?shù)絩educe端的數(shù)據(jù)量,減小網(wǎng)絡(luò)傳輸?shù)拈_(kāi)銷。
只有在reduceByKey處理不了時(shí),才用groupByKey().map()來(lái)替代。
十一: shuffle優(yōu)化
了解下shuffle的過(guò)程:
優(yōu)化參數(shù):
new SparkConf().set("spark.shuffle.consolidateFiles", "true") spark.shuffle.consolidateFiles:是否開(kāi)啟shuffle block file的合并,默認(rèn)為false spark.reducer.maxSizeInFlight:reduce task的拉取緩存,默認(rèn)48m spark.shuffle.file.buffer:map task的寫磁盤緩存,默認(rèn)32k spark.shuffle.io.maxRetries:拉取失敗的最大重試次數(shù),默認(rèn)3次 spark.shuffle.io.retryWait:拉取失敗的重試間隔,默認(rèn)5s spark.shuffle.memoryFraction:用于reduce端聚合的內(nèi)存比例,默認(rèn)0.2,超過(guò)比例就會(huì)溢出到磁盤上
1-?spark.shuffle.consolidateFiles參數(shù)優(yōu)化
沒(méi)有開(kāi)啟consolidation機(jī)制的時(shí)候,shuffle write的性能是比較低下的,而且會(huì)影響到shuffle read的性能,也會(huì)比較低下。
因?yàn)樵趕huffle map端創(chuàng)建的磁盤文件太多了,導(dǎo)致shuffle write要耗費(fèi)大量的性能到磁盤文件的創(chuàng)建,以及磁盤的io上。對(duì)于shuffle read,也是一樣的,每個(gè)result task可能都需要通過(guò)磁盤io讀取多個(gè)文件的數(shù)據(jù),都只shuffle read,性能可能也會(huì)受到影響。做主要的還是shuffle write,因?yàn)橐獙懙拇疟P文件太多。
比如每個(gè)節(jié)點(diǎn)有100個(gè)shuffle map task,10個(gè)CPU core是,總共有1000個(gè)result task。所以,每個(gè)節(jié)點(diǎn)上的磁盤文件為100*1000個(gè)。
設(shè)置為true時(shí),每個(gè)cpu為每個(gè)result task寫一個(gè)文件(文件內(nèi)容是之前的數(shù)據(jù)進(jìn)行合并的結(jié)果),每個(gè)節(jié)點(diǎn)上的磁盤文件為10*1000個(gè)。
2-?spark.reducer.maxSizeInFlight
如果內(nèi)存足夠的話,這個(gè)量應(yīng)該增大,這樣,result task拉取的次數(shù)會(huì)減少(每次拉取數(shù)據(jù)量增加)。
3-?spark.shuffle.file.buffer
可以適量增大,這樣每次寫入到文件的數(shù)據(jù)量減少,從而減少寫文件的次數(shù)。
4-?spark.shuffle.io.maxRetries
拉取數(shù)據(jù)的時(shí)候,可能jvm在full GC。
5-?spark.shuffle.io.retryWait
可以適當(dāng)增加時(shí)間。為了應(yīng)對(duì)jvm 的full GC。
6-?spark.shuffle.memoryFraction
可以適當(dāng)?shù)恼{(diào)大。
執(zhí)行reduce task的Excetor中,有一部分內(nèi)存是用來(lái)匯聚各個(gè)reduce task拉取的數(shù)據(jù),放到map集合中,進(jìn)行聚合。
當(dāng)該數(shù)據(jù)超過(guò)總緩存*比例時(shí),會(huì)把該內(nèi)存的數(shù)據(jù)寫入到磁盤上。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-500445.html
7- 如果jvm GC沒(méi)有調(diào)優(yōu)好,會(huì)導(dǎo)致每次gc都需要1min。那么拉取的最大默認(rèn)時(shí)間為3*5s=15s。就會(huì)導(dǎo)致頻繁的很多文件拉取失敗。會(huì)報(bào)shuffle output file lost。然后DAGScheduler會(huì)重試task和stage。最后甚至導(dǎo)致application掛掉。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-500445.html
到了這里,關(guān)于spark 數(shù)據(jù)傾斜處理的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!