Spark 3 AQE
1. Background
In Spark 2.x, jobs suffering from data skew had to be tuned by hand, which was time-consuming and labor-intensive. When data is unevenly distributed across Reduce Tasks in the Reduce stage, resource utilization across executor nodes becomes unbalanced and job execution efficiency suffers. AQE, a new feature in Spark 3, greatly improves the execution efficiency of such jobs.
2. Why does Spark need AQE? (Why)
RBO (Rule Based Optimization) relies on a set of rules and strategies, such as predicate pushdown and column pruning, drawn from accumulated experience in the database field. RBO is, in essence, empiricism.
The weakness of empiricism is that similar problems and scenarios all get handled with the same playbook. Recognizing this limitation of RBO, the Spark community introduced CBO (Cost Based Optimization) in the 2.x releases.
CBO chooses optimization strategies based on table statistics (such as table size and column data distribution). The statistics CBO supports are rich: table row counts, per-column cardinality, null counts, maximum and minimum values, and so on. Because it is backed by statistics, the strategies CBO selects are usually better than the rules RBO applies.
However, CBO falls short in three respects:
- Its applicability is narrow: CBO only supports tables registered in the Hive Metastore, while in many other scenarios the data sources are files stored on distributed file systems, such as Parquet, ORC, and CSV.
- Statistics collection is inefficient: for tables registered in the Hive Metastore, developers have to run ANALYZE TABLE ... COMPUTE STATISTICS to gather statistics, and collecting all of this information takes a long time (a minimal example follows this list).
- Optimization is static: once an RBO or CBO execution plan has been produced, it is executed exactly as planned; if the data distribution changes at runtime, the plan produced earlier is not adjusted or adapted.
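As a concrete illustration of the second point, here is a minimal sketch of collecting table- and column-level statistics for CBO; the table and column names (sales, order_id, amount) are hypothetical:

```scala
// A minimal sketch of collecting CBO statistics on a Hive Metastore table.
// The table and column names (sales, order_id, amount) are hypothetical.
import org.apache.spark.sql.SparkSession

object CollectCboStats {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cbo-stats-demo")
      .enableHiveSupport()
      .config("spark.sql.cbo.enabled", "true") // let the optimizer use the stats
      .getOrCreate()

    // Table-level statistics: row count and size in bytes.
    spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")

    // Column-level statistics (cardinality, null count, min/max) -- the slow part.
    spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS order_id, amount")

    spark.stop()
  }
}
```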
The CBO-based execution planning pipeline:
- Parsing: Spark parses the query and creates the Unresolved Logical Plan.
  - The syntax of the query is validated.
  - The semantics are not validated yet: column existence and data types are not checked.
- Analysis: using Catalyst, the Unresolved Logical Plan is converted into a Resolved Logical Plan (a.k.a. the Logical Plan).
  - The catalog holds column names and data types; in this step the columns referenced in the query are validated against the catalog.
- Optimization: the Logical Plan is converted into an Optimized Logical Plan.
- Planning: one or more Physical Plans are created from the Optimized Logical Plan.
- Cost model: the cost of each Physical Plan is calculated and the best one is selected; this is where CBO picks the winner.
- RDD generation: RDDs are generated; this final phase of query optimization emits the RDDs as Java bytecode.
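These plans can be inspected directly. The following is a minimal sketch (the DataFrame, view name, and columns are made up for illustration) that prints the parsed, analyzed, and optimized logical plans together with the physical plan that is finally chosen:

```scala
// A minimal sketch that prints the plans described above. The DataFrame,
// view name, and columns are made up for illustration.
import org.apache.spark.sql.SparkSession

object ExplainDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("explain-demo")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    Seq((1, "a", 10), (2, "b", 20)).toDF("id", "key", "value").createOrReplaceTempView("t")

    // "extended" prints the Parsed (unresolved), Analyzed and Optimized Logical
    // Plans, followed by the Physical Plan that was finally selected.
    spark.sql("SELECT key, SUM(value) AS total FROM t GROUP BY key")
      .explain(mode = "extended")

    spark.stop()
  }
}
```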
3. What exactly is AQE? (What)
Given all these limitations of RBO and CBO, Spark 3.0 introduced AQE (Adaptive Query Execution). In one sentence, AQE is a dynamic optimization mechanism in Spark SQL: at runtime, every time a Shuffle Map stage finishes, AQE combines that stage's statistics with a set of predefined rules to dynamically adjust and correct the logical and physical plans that have not yet been executed, thereby optimizing the original query at runtime.
The statistics AQE relies on differ from CBO's: they are not about a particular table or column, but about the intermediate files written out by the Shuffle Map stage.
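A minimal sketch of switching the mechanism on (these are real Spark SQL configs; AQE is enabled by default starting with Spark 3.2, so setting them explicitly is only for clarity):

```scala
// A minimal sketch of enabling AQE and its two companion features.
import org.apache.spark.sql.SparkSession

object EnableAqeDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("aqe-demo")
      .config("spark.sql.adaptive.enabled", "true")                    // master switch for AQE
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true") // automatic partition coalescing
      .config("spark.sql.adaptive.skewJoin.enabled", "true")           // automatic skew handling
      .getOrCreate()

    // Shuffle-heavy queries run here are re-optimized after every Shuffle Map stage.

    spark.stop()
  }
}
```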
The AQE-based execution plan
4. How do you use AQE? (How)
AQE has three major features: automatic partition coalescing, automatic skew handling, and join strategy adjustment.
4.1 Automatic partition coalescing
After a shuffle, data is unevenly distributed across Reduce Tasks, and AQE automatically merges partitions that are too small.
So how does AQE decide which partitions are small enough to merge, and how large may a merged partition grow before merging stops?
AQE scans all partitions, regardless of size, from left to right in partition-number order, accumulating partition sizes as it goes. Once the combined size of the adjacent partitions it has scanned exceeds the "advisory size", AQE merges those scanned partitions into one. It then continues scanning to the right and applies the same algorithm, merging the remaining partitions toward the target size until all partitions have been processed.
The "advisory size" is set by spark.sql.adaptive.advisoryPartitionSizeInBytes.
| Configuration | Description |
|---|---|
| spark.sql.adaptive.coalescePartitions.enabled | When true and spark.sql.adaptive.enabled is true, Spark coalesces contiguous shuffle partitions according to the target size (specified by spark.sql.adaptive.advisoryPartitionSizeInBytes) to avoid too many small tasks. Enables automatic partition coalescing; on by default. |
| spark.sql.adaptive.advisoryPartitionSizeInBytes | The advisory size in bytes of a shuffle partition during adaptive optimization (when spark.sql.adaptive.enabled is true). It takes effect when Spark coalesces small shuffle partitions or splits a skewed shuffle partition. The advisory size after coalescing; default 64 MB. |
| spark.sql.adaptive.coalescePartitions.minPartitionSize | The minimum size of shuffle partitions after coalescing. Useful when the adaptively calculated target size is too small during partition coalescing. Default 1 MB. |
Suppose the advisory size is 100 MB and the post-shuffle partition sizes are 70 MB, 30 MB, 80 MB, 90 MB, 10 MB, and 20 MB.
Processing them in order, 4 reduce tasks would be launched (a standalone sketch of this scan follows the list):
- Task 1 handles 70 MB + 30 MB
- Task 2 handles 80 MB
- Task 3 handles 90 MB
- Task 4 handles 10 MB + 20 MB
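The following is a standalone toy sketch of this greedy left-to-right scan, not Spark's internal implementation; the handling of exact ties at the boundary (e.g. the 90/10/20 MB partitions) may therefore differ slightly from the hand-worked grouping above:

```scala
// A toy, standalone sketch of the greedy left-to-right coalescing scan described
// above -- not Spark's internal code. Partition sizes are in MB.
object CoalesceSketch {
  def coalesce(sizes: Seq[Long], advisory: Long): Seq[Seq[Long]] =
    sizes.foldLeft(Vector.empty[Vector[Long]]) { (groups, size) =>
      groups.lastOption match {
        // keep adding to the current group while it stays within the advisory size
        case Some(last) if last.sum + size <= advisory =>
          groups.init :+ (last :+ size)
        // otherwise close the current group and start a new one
        case _ =>
          groups :+ Vector(size)
      }
    }

  def main(args: Array[String]): Unit = {
    // 70, 30, 80, 90, 10, 20 MB with a 100 MB advisory size
    // -> 4 merged partitions for these sizes; the exact grouping around the
    //    90/10/20 MB boundary depends on the tie rule and may differ from the
    //    hand-worked example above.
    println(coalesce(Seq(70L, 30L, 80L, 90L, 10L, 20L), 100L))
  }
}
```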
After merging this way in Spark 3.0, the partitions are still unbalanced, which causes mild data skew in the downstream computation.
The Spark 3.2 documentation adds spark.sql.adaptive.coalescePartitions.minPartitionSize (the minimum partition size after coalescing). If this parameter were set equal to the advisory size, would only 3 reduce tasks be launched, each handling 100 MB? (My own guess: merging would then no longer be strictly sequential, but would first look at all the partition sizes so that the merged partitions end up close in size.)
You do not need to set a proper shuffle partition number to fit your dataset. Spark can pick the proper shuffle partition number at runtime once you set a large enough initial number of shuffle partitions via the spark.sql.adaptive.coalescePartitions.initialPartitionNum configuration. In short, the official recommendation is that you do not have to tune the partition count yourself; Spark will choose a suitable number automatically.
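A hedged sketch of the coalescing-related settings discussed in this subsection; the concrete values (200 MB advisory size, 1000 initial partitions, 20 MB minimum) are illustrative, not recommendations:

```scala
// A sketch of the partition-coalescing configs; the values are illustrative only.
import org.apache.spark.sql.SparkSession

object CoalesceConfigDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("coalesce-config-demo")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      // target size each merged partition should approach
      .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "200m")
      // start with plenty of shuffle partitions and let AQE merge them down
      .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")
      // never produce merged partitions smaller than this
      .config("spark.sql.adaptive.coalescePartitions.minPartitionSize", "20m")
      .getOrCreate()

    // Shuffle-heavy queries here should end up with reduce partitions close to 200 MB.
    spark.stop()
  }
}
```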
4.2 Automatic skew handling
AQE automatically splits oversized partitions in the Reduce stage, reducing the workload of any single Reduce Task.
How does AQE decide whether a partition is skewed, and how does it split a large partition into several smaller ones?
| Configuration | Description |
|---|---|
| spark.sql.adaptive.skewJoin.enabled | When true and spark.sql.adaptive.enabled is true, Spark dynamically handles skew in shuffled joins (sort-merge and shuffled hash) by splitting (and replicating if needed) skewed partitions. Enables AQE's automatic skew handling. |
| spark.sql.adaptive.skewJoin.skewedPartitionFactor | A partition is considered skewed if its size is larger than this factor multiplied by the median partition size and also larger than spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes. Part of the skew test; default 5. |
| spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | A partition is considered skewed if its size in bytes is larger than this threshold and also larger than spark.sql.adaptive.skewJoin.skewedPartitionFactor multiplied by the median partition size. Ideally this should be set larger than spark.sql.adaptive.advisoryPartitionSizeInBytes. Part of the skew test; default 256 MB. |
Skew detection: a partition is judged skewed if its size is greater than the median partition size × spark.sql.adaptive.skewJoin.skewedPartitionFactor AND greater than spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes.
Suppose table A has 3 partitions of 80 MB, 100 MB, and 512 MB. Sorted by size, the median is 100 MB. With the default skewedPartitionFactor of 5 and the default skewedPartitionThresholdInBytes of 256 MB, the 512 MB partition is larger than 100 MB × 5 = 500 MB and larger than 256 MB, so it is judged to be a skewed partition.
Splitting the skewed partition: in the example above, the 512 MB partition is split into 512 / 256 = 2 partitions (note that, per the config table above, splitting is actually driven by spark.sql.adaptive.advisoryPartitionSizeInBytes, so the real split count can be larger).
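The skew test itself is easy to express. Below is a standalone sketch (not Spark's internal code) that applies the two conditions to the 80/100/512 MB example, using the config defaults:

```scala
// A standalone sketch of the skew test described above -- not Spark's internal
// code. Sizes are in bytes; the factor and threshold mirror the config defaults.
object SkewSketch {
  val skewedPartitionFactor = 5L                     // spark.sql.adaptive.skewJoin.skewedPartitionFactor
  val skewedPartitionThreshold = 256L * 1024 * 1024  // ...skewedPartitionThresholdInBytes (256 MB)

  def isSkewed(size: Long, medianSize: Long): Boolean =
    size > medianSize * skewedPartitionFactor && size > skewedPartitionThreshold

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024
    val sizes = Seq(80 * mb, 100 * mb, 512 * mb)
    val median = sizes.sorted.apply(sizes.length / 2) // 100 MB for this 3-partition example
    sizes.foreach { s =>
      println(s"${s / mb} MB skewed = ${isSkewed(s, median)}")
    }
    // Prints: 80 MB skewed = false, 100 MB skewed = false, 512 MB skewed = true
  }
}
```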
4.3 Join strategy adjustment
If a table, after filtering, is smaller than the broadcast threshold, any join it participates in is demoted from a Shuffle Sort Merge Join to the more efficient Broadcast Hash Join.
Join strategy adjustment means that Spark SQL, at runtime, dynamically replaces the original Shuffle Join strategy with the more efficient Broadcast Join.
Concretely, whenever a Map stage in the DAG completes, Spark SQL combines the statistics of the shuffle intermediate files to recompute the size of each table feeding the Reduce stage. If the base table turns out to be smaller than the broadcast threshold, Spark SQL switches the next stage's Shuffle Join to a Broadcast Join.
A Broadcast Join distributes the small table's full data to every Executor in the cluster, so the large table's data can be joined with the small table's data at the Process local level. Locality levels rank Process local < Node local < Rack local < Any.
| Configuration | Description |
|---|---|
| spark.sql.adaptive.autoBroadcastJoinThreshold | The maximum size in bytes for a table to be broadcast to all worker nodes when performing a join. Setting it to -1 disables broadcasting. The default is the same as spark.sql.autoBroadcastJoinThreshold. Note that this config is used only in the adaptive framework. Threshold below which a table can be broadcast; default 10 MB. |
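A hedged sketch of how the runtime demotion might be exercised; the file paths, the join column user_id, and the 50 MB threshold are made up for illustration:

```scala
// A sketch of the broadcast-threshold setting discussed above. Paths, columns,
// and the 50 MB threshold are illustrative; the default threshold is about 10 MB.
import org.apache.spark.sql.SparkSession

object BroadcastDemoteDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("aqe-join-demo")
      .config("spark.sql.adaptive.enabled", "true")
      // threshold AQE uses at runtime when deciding to switch to Broadcast Join
      .config("spark.sql.adaptive.autoBroadcastJoinThreshold", "50m")
      .getOrCreate()

    val big   = spark.read.parquet("/data/orders")     // hypothetical paths
    val small = spark.read.parquet("/data/dim_users")
      .filter("country = 'CN'")                        // the filter shrinks the table

    // Planned as a Sort Merge Join; if the filtered `small` side measures below
    // 50 MB at runtime, AQE rewrites the remaining plan as a Broadcast Hash Join.
    val joined = big.join(small, "user_id")
    joined.explain() // run an action first if you want the final (re-optimized) plan

    spark.stop()
  }
}
```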
5. Comparison and verification
Spark 3.2 with AQE enabled
http://szzb-bg-uat-etl-16:18080/history/application_1665940579703_5390/
Spark 3.2 with AQE disabled
http://szzb-bg-uat-etl-16:18080/history/application_1665940579703_5812/
Spark 2.2 (no AQE)
http://szzb-bg-uat-etl-11:18080/history/application_1665940579703_9257/
5.1 Execution time
Resources used: --driver-memory 6g --executor-memory 6g --executor-cores 6
| Run | Elapsed time |
|---|---|
| Spark 3.2, AQE enabled | 4.3 min |
| Spark 3.2, AQE disabled | 6.7 min |
| Spark 2.2, no AQE | > 9.6 min (OOM) |
5.2 Automatic partition coalescing
5.3 Automatic skew handling
Compare the execution time of each stage:
- Spark 3.2 with AQE enabled: stage execution times are close to one another (check the tasks of each stage).
- Spark 2.2 without AQE: stage execution times differ widely (check the tasks of each stage).
Look at the data distribution of the longest-running stage:
- Spark 3.2 with AQE enabled: Shuffle Read/Write volumes are fairly balanced.
- Spark 2.2 without AQE: Shuffle Read/Write volumes are unbalanced.
6. Conclusion
With AQE enabled in Spark 3.2.2 (it is on by default), small partitions are merged automatically when data is unevenly distributed across Reduce Tasks, and large partitions are split when the Reduce stage suffers from data skew. The execution-time comparison shows that AQE can greatly improve job execution efficiency.