Background
This article is based on Spark 3.5.0.
At the moment our company uses the Spark Rebalance hint for small-file compaction. Its main job is to merge small files in the final write stage under AQE, so that the files that land on disk are neither too large nor too small. Three rules drive this: OptimizeSkewInRebalancePartitions, CoalesceShufflePartitions, and OptimizeShuffleWithLocalRead. This article focuses on OptimizeSkewInRebalancePartitions; briefly, CoalesceShufflePartitions merges shuffle partitions so the output files are not too small, and OptimizeShuffleWithLocalRead speeds up shuffle fetch.
Conclusion
OptimizeSkewInRebalancePartitions splits skewed (overly large) partitions so that the files written to disk are not too large. There is a pitfall here: if we use Rebalance(col) and the value of col is constant, for example always 20240320, then we need to pay attention to the configurations this rule relies on: spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled, spark.sql.adaptive.advisoryPartitionSizeInBytes, and spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor. If these are tuned badly, the write stage may run with only one task, so only one file is produced, and the overall runtime of the job grows dramatically.
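To make the scenario concrete, here is a minimal sketch of the risky pattern; the view name, column name and output path are hypothetical:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("rebalance-demo").getOrCreate()

// A toy table whose rebalance key is constant: dt is always '20240320'.
spark.range(0, 1000000L)
  .selectExpr("id", "'20240320' AS dt")
  .createOrReplaceTempView("t")

// REBALANCE(dt) shuffles rows by hash(dt); with a constant dt every row
// lands in the same reduce partition before the final write.
spark.sql("SELECT /*+ REBALANCE(dt) */ * FROM t")
  .write.mode("overwrite").parquet("/tmp/rebalance_demo")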
Analysis
Let's go straight to the code of OptimizeSkewInRebalancePartitions:
override def apply(plan: SparkPlan): SparkPlan = {
  if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {
    return plan
  }
  plan transformUp {
    case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>
      tryOptimizeSkewedPartitions(stage)
  }
}
If we disable skew handling for rebalance, i.e. set spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled to false (the default is true), this rule is not applied. In that case, with a constant Col, only one task performs the file write and only one file is produced, because a single reduce task pulls the data of every map task (hash(Col) is identical on each map task, so only one reduce task fetches any data). As an illustration: if hash(col) is 0, only reduceTask0 receives data while ReduceTask1 and the rest receive nothing, so in the end only ReduceTask0 writes a file, and there is exactly one file.
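A quick way to convince yourself, assuming a shuffle partition count of 200: Spark SQL's built-in hash (Murmur3, the same family used by hash partitioning) combined with pmod gives the reduce partition id of the constant key.

// The reduce partition id of the constant key is pmod(hash(key), numPartitions):
// one fixed number, so every row goes to the same reduce task.
spark.sql("SELECT pmod(hash('20240320'), 200) AS partition_id").show()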
Now let's look at the merge computation; the data flow is as follows:
tryOptimizeSkewedPartitions
||
\/
optimizeSkewedPartitions
||
\/
ShufflePartitionsUtil.createSkewPartitionSpecs
||
\/
ShufflePartitionsUtil.splitSizeListByTargetSize
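For context, in Spark 3.5.0 the last method in this chain lives in ShufflePartitionsUtil and has the following signature:

def splitSizeListByTargetSize(
    sizes: Array[Long],
    targetSize: Long,
    smallPartitionFactor: Double): Array[Int]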
The parameters involved in splitSizeListByTargetSize are:
- sizes: Array[Long] — the per-map-task sizes of the blocks belonging to the same reduce partition. For example, sizes = [100, 200, 300, 400] means there are 4 map tasks; index 0 is the size of this reduce partition's block on map task 0, index 1 on map task 1, and so on. In other words, reduceTask0 fetches blocks of 100, 200, 300 and 400 from the map tasks.
- targetSize — the value of spark.sql.adaptive.advisoryPartitionSizeInBytes; say it is 256MB.
- smallPartitionFactor — the value of spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor; the default is 0.2.
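Putting the knobs together, a minimal sketch of setting them on a session (the values are illustrative, not recommendations):

// Skew splitting for rebalance; this is on by default.
spark.conf.set("spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled", "true")
// Advisory post-shuffle partition size, i.e. the targetSize above.
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "256m")
// The smallPartitionFactor; 0.2 is the default.
spark.conf.set("spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor", "0.2")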
Here is the core of the computation:
def tryMergePartitions() = {
  // When we are going to start a new partition, it's possible that the current partition or
  // the previous partition is very small and it's better to merge the current partition into
  // the previous partition.
  // Note: MERGED_PARTITION_FACTOR is a constant 1.2 in ShufflePartitionsUtil.
  val shouldMergePartitions = lastPartitionSize > -1 &&
    ((currentPartitionSize + lastPartitionSize) < targetSize * MERGED_PARTITION_FACTOR ||
    (currentPartitionSize < targetSize * smallPartitionFactor ||
      lastPartitionSize < targetSize * smallPartitionFactor))
  if (shouldMergePartitions) {
    // We decide to merge the current partition into the previous one, so the start index of
    // the current partition should be removed.
    partitionStartIndices.remove(partitionStartIndices.length - 1)
    lastPartitionSize += currentPartitionSize
  } else {
    lastPartitionSize = currentPartitionSize
  }
}

// ...

while (i < sizes.length) {
  // If including the next size in the current partition exceeds the target size, package the
  // current partition and start a new partition.
  if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {
    tryMergePartitions()
    partitionStartIndices += i
    currentPartitionSize = sizes(i)
  } else {
    currentPartitionSize += sizes(i)
  }
  i += 1
}
tryMergePartitions()
partitionStartIndices.toArray
The computation is roughly: take the block belonging to the same reduce partition from each map task and accumulate the sizes in order; whenever adding the next size would exceed targetSize, close the current partition and call tryMergePartitions(), continuing until the last map task. Note the condition currentPartitionSize < targetSize * smallPartitionFactor inside tryMergePartitions: if the partition just closed is smaller than 256MB * 0.2 = 51.2MB, it is still merged into the previous partition. So if smallPartitionFactor is set too large, every split may get merged back into one partition, and the final file can reach tens of GB (on the order of targetSize * smallPartitionFactor * shuffleNum).
For example, consider the following test case:
val targetSize = 100
val smallPartitionFactor2 = 0.5
// merge last two partitions if their size is not bigger than smallPartitionFactor * target
val sizeList5 = Array[Long](50, 50, 40, 5)
assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
  sizeList5, targetSize, smallPartitionFactor2).toSeq ==
  Seq(0))

val sizeList6 = Array[Long](40, 5, 50, 45)
assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
  sizeList6, targetSize, smallPartitionFactor2).toSeq ==
  Seq(0))
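Tracing sizeList5 = [50, 50, 40, 5] through the loop above with targetSize = 100, smallPartitionFactor = 0.5 and MERGED_PARTITION_FACTOR = 1.2: the first partition packs 50 + 50 = 100; at index 2, 100 + 40 > 100 triggers a split, leaving a second partition of 40 + 5 = 45; the final tryMergePartitions() then sees 45 < 100 * 0.5, removes the split again, and everything collapses into a single partition starting at index 0, hence Seq(0). With the default factor 0.2 the same input would keep both partitions (the result would be Seq(0, 2)), since 45 is not below 100 * 0.2 and 100 + 45 = 145 is not below 100 * 1.2.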
In both cases, only one reduce task ends up running.