1.背景
要了解spark參數(shù)調優(yōu),首先需要清楚一部分背景資料Spark SQL的執(zhí)行原理,方便理解各種參數(shù)對任務的具體影響。
一條SQL語句生成執(zhí)行引擎可識別的程序,解析(Parser)、優(yōu)化(Optimizer)、執(zhí)行(Execution) 三大過程。其中Spark SQL 解析和優(yōu)化如下圖
-
Parser模塊:未解析的邏輯計劃,將SparkSql字符串解析為一個抽象語法樹/AST。語法檢查,不涉及表名字段。
-
Analyzer模塊:解析后的邏輯計劃,該模塊會遍歷整個AST,并對AST上的每個節(jié)點進行數(shù)據(jù)類型的綁定以及函數(shù)綁定,然后根據(jù)元數(shù)據(jù)信息Catalog對數(shù)據(jù)表中的字段和基本函數(shù)進行解析。
-
Optimizer模塊:該模塊是Catalyst的核心,主要分為RBO和CBO兩種優(yōu)化策略,其中RBO是基于規(guī)則優(yōu)化(謂詞下推(Predicate Pushdown) 、常量累加(Constant Folding) 、列值裁剪(Column Pruning)),CBO是基于代價優(yōu)化。
-
SparkPlanner模塊:優(yōu)化后的邏輯執(zhí)行計劃OptimizedLogicalPlan依然是邏輯的,并不能被Spark系統(tǒng)理解,此時需要將OptimizedLogicalPlan轉換成physical plan(物理計劃),如join算子BroadcastHashJoin、ShuffleHashJoin以及SortMergejoin 。
-
CostModel模塊:主要根據(jù)過去的性能統(tǒng)計數(shù)據(jù),選擇最佳的物理執(zhí)行計劃。這個過程的優(yōu)化就是CBO(基于代價優(yōu)化)。
在實際Spark執(zhí)行完成一個數(shù)據(jù)生產(chǎn)任務(執(zhí)行一條SQL)的基本過程:
(1)對SQL進行語法分析,生成邏輯執(zhí)行計劃
(2)從Hive metastore server獲取表信息,結合邏輯執(zhí)行計劃生成并優(yōu)化物理執(zhí)行計劃
(3)根據(jù)物理執(zhí)行計劃向Yarn申請資源(executor),調度task到executor執(zhí)行。
(4)從HDFS讀取數(shù)據(jù),任務執(zhí)行,任務執(zhí)行結束后將數(shù)據(jù)寫回HDFS。
上述運行過程
過程 (2)主要是driver的處理能力
過程 (3)主要是executor 、driver的處理能力、作業(yè)運行行為
本文從作業(yè)的運行過程(2)(3)各選擇一個參數(shù)介紹從而了解運行過程。
目前的spark參數(shù)以及相關生態(tài)的參數(shù)列表幾百個:
Hadoop參數(shù):https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml
hive參數(shù):Configuration Properties - Apache Hive - Apache Software Foundation
spark參數(shù):spark 配置參數(shù) Configuration - Spark 3.5.0 Documentation
? ? ? ? ? ? ? ? ? ? ?spark 優(yōu)化參數(shù) Performance Tuning - Spark 3.5.0 Documentation
? ? ? ? ? ? ? ? ? ? ?spark 執(zhí)行參數(shù) Spark SQL and DataFrames - Spark 2.0.0 Documentation
? ? ? ? ? ? ? ? ? ? ?各個公司自定義參數(shù):set spark.sql.insertRebalancePartitionsBeforeWrite.enabled = true
其他網(wǎng)上參考的參數(shù):Hive常用參數(shù)總結-CSDN博客
參數(shù)列表
參數(shù)類型 |
參數(shù) |
設置值 |
描述 |
資源利用 |
spark.driver.memory |
5g |
--driver-memory 5G 每個exector的內存大小,后綴"k", "m", "g" or "t" |
input split |
spark.hadoop.hive.exec.orc.split.strategy spark.hadoop.mapreduce.input.fileinputformat.split.maxsize; spark.hadoop.mapreduce.input.fileinputformat.split.minsize; |
BI 、ETL 、HYBRID |
|
shuffle |
spark.sql.shuffle.partitions |
200 |
|
spark.default.parallelism |
80, 100, 200, 300 |
||
join |
1.spark.hadoop.hive.exec.orc.split.strategy 參數(shù)
? ? ? ? ? ? 1. 參數(shù)作用:參數(shù)控制在讀取ORC表時生成split的策略,影響任務執(zhí)行時driver壓力和mapper 數(shù)量。
? ? ? ? ? ? 2. 參數(shù)介紹 : 參數(shù)來源于hive ?:hive.exec.orc.split.strategy官方定義如下圖,當任務執(zhí)行開始時,ORC有三種分割文件的策略 BI 、ETL 、HYBRID(默認)
HYBRID模式:文件數(shù)過多和文件小的場景下,當文件數(shù)大于mapper count (總文件大小/hadoop默認分割大小128M) 且文件大小小于HDFS默認(128M)的大小。
ETL:生成分割文件之前首先讀取ORC文件的footer(存儲文件信息的文件),
BI: 直接分割文件,沒有訪問HDFS上的數(shù)據(jù)。
ORC文件的footer是什么?
? ORC 文件原理:全稱 Optimized Row Columnar?1.ORC是一個文件格式比較高效的讀取、寫入、處理hive數(shù)據(jù)。(我之前理解是一個高效壓縮文件)。2.序列化和壓縮: intger和String 序列化。按照文件塊增量的壓縮。
文件結構:三級結構:stripes 存在具體的數(shù)據(jù)行組(索引、數(shù)據(jù)行、stripe footer 的信息),file footer 文件的輔助信息(stripe的列表、每個stripe行數(shù)、列的數(shù)據(jù)類型、列上聚合信息 最大值最小值),psotscipt 文件的壓縮參數(shù)和壓縮后的大小。
? ? ? ?3.使用方法和場景: 因此ETL模式下讀取的file footer是每個orc文件塊的輔助信息。對于一些較大的ORC表,footer可能非常大,ETL模式下讀取大量hdfs的數(shù)據(jù)信息切分文件,導致driver的開銷壓力過大,這種情況適用BI模式比較合適。
? ? 一些配合使用參數(shù) 如:spark.hadoop.mapreduce.input.fileinputformat.split.maxsize;?spark.hadoop.mapreduce.input.fileinputformat.split.minsize; map輸入最小最大分割塊,maxsize 和minsize在輸入端控制ORC文件的分割合并。當spark 從hive表中讀取數(shù)據(jù)是會創(chuàng)建一個HadoopRDD的實例,HadoopRDD根據(jù)computeSplitSize方法分割文件(org.apache.hadoop.mapreduce.lib.input.FileInputFormat?)?Math.max(minSize, Math.min(maxSize, blockSize) 源代碼Source code,因此文件表的小文件過多3M大小,根據(jù)公式一個小文件就是一個split分割生成大量的patitions,導致tasks數(shù)量就巨大,整個任務性能瓶頸可能在讀取資源數(shù)據(jù)緩解。
文件分割源碼
? ?spark.sql.files.maxPartitionBytes ?單partition的最大字節(jié)數(shù), 為了防止把已經(jīng)設置好的分割塊再次合并,可以將 set spark.hadoopRDD.targetBytesInPartition=-1。
2.spark.sql.shuffle.partitions
? ? 參數(shù)作用: 在任務有shuffle時候(join或者聚合場景下)控制partitions的數(shù)量。
? ? 參數(shù)介紹:
Property Name |
Default |
meaning |
鏈接 |
翻譯 |
不同點 |
共同點 |
spark.sql.shuffle.partitions |
200 |
Configures the number of partitions to use when shuffling data for joins or aggregations. |
Spark SQL and DataFrames - Spark 2.0.0 Documentation |
Spark SQL中shuffle過程中Partition的數(shù)量 |
僅適用于DataFrame ,group By, join 觸發(fā)數(shù)據(jù)shuffle,因此這些數(shù)據(jù)轉換后的結果會導致分區(qū)大小需要通過Spark.sql.shuffle.partitions 中設置的值。 |
配置shuffle partitions 的數(shù)量 |
spark.default.parallelism |
For distributed shuffle operations like?reduceByKeyand?join, the largest number of partitions in a parent RDD. For operations like?parallelizewith no parent RDDs, it depends on the cluster manager:
|
Default number of partitions in RDDs returned by transformations like?join,?reduceByKey, and?parallelize?when not set by user. |
Configuration - Spark 3.5.0 Documentation |
1.reduceByKey 1.若當前RDD執(zhí)行shuffle操算子如reducebykey 和join ,則為在父RDD中最大的partition數(shù)。 |
spark.default.parallelism 是隨 RDD 引入的,當用戶未設置時候,返回reduceByKey(), groupByKey(), join() 轉換的默認分區(qū)數(shù),僅適用于RDD。 |
參數(shù)用法:在提交作業(yè)的通過 --conf 來修改這兩個設置的值,方法如下:或者
? ? ? ? ?spark-submit --conf spark.sql.shuffle.partitions=300?--conf spark.default.parallelism=300
? ? ? ? ? ? ? ? ? ? ?sqlContext.setConf("spark.sql.shuffle.partitions", "300")
? ? ? ? ? ? ? ? ? ? ?sqlContext.setConf("spark.default.parallelism", "300”)
參數(shù)介紹2.0:chatGPT3.5 的答案
? ? ?理解spark的并行度:
-
?資源的并行 ?exector數(shù)和cpu core數(shù)
-
?數(shù)據(jù)的并行 ?spark作業(yè)在各個stage的task 的數(shù)量是并行執(zhí)行,task數(shù)量設置成Spark Application總CPU core數(shù)量的2~3倍,同時盡量提升Spark運行效率和速度;
? ??
? ? ?擴展: flink 的并行度文章來源:http://www.zghlxwxcb.cn/news/detail-729748.html
參考文檔:
1.Spark SQL底層執(zhí)行流程詳解(好文收藏)-騰訊云開發(fā)者社區(qū)-騰訊云? spark 執(zhí)行原理
2.ORC 參數(shù):Configuration Properties - Apache Hive - Apache Software Foundation
3.ORC文件定義:?LanguageManual ORC - Apache Hive - Apache Software Foundation
4.oRC解讀:?深入理解ORC文件結構-CSDN博客
5.hadoop input:?How does Spark SQL decide the number of partitions it will use when loading data from a Hive table? - Stack Overflow
6.文件分割:從源碼看Spark讀取Hive表數(shù)據(jù)小文件和分塊的問題 - 掘金,?How does Spark SQL decide the number of partitions it will use when loading data from a Hive table? - Stack Overflow
7.spark手冊:How to Set Apache Spark Executor Memory - Spark By {Examples}
8.并行:?performance - What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism? - Stack Overflow
9.flink的并行 :?并行執(zhí)行 | Apache Flink
10.reducebykey :scala - reduceByKey: How does it work internally? - Stack Overflow
11.key values :?4. Working with Key/Value Pairs - Learning Spark [Book]
12.spark并行:? ??Spark調優(yōu)之 -- Spark的并行度深入理解(別再讓資源浪費了)_spark并行度-CSDN博客
13.場景:??spark SQL 任務參數(shù)調優(yōu)1文章來源地址http://www.zghlxwxcb.cn/news/detail-729748.html
到了這里,關于spark SQL 任務參數(shù)調優(yōu)1的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!