国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Spark Rebalance hint的傾斜的處理(OptimizeSkewInRebalancePartitions)

這篇具有很好參考價(jià)值的文章主要介紹了Spark Rebalance hint的傾斜的處理(OptimizeSkewInRebalancePartitions)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

背景

本文基于Spark 3.5.0
目前公司在做小文件合并的時(shí)候用到了 Spark Rebalance 這個(gè)算子,這個(gè)算子的主要作用是在AQE階段的最后寫(xiě)文件的階段進(jìn)行小文件的合并,使得最后落盤的文件不會(huì)太大也不會(huì)太小,從而達(dá)到小文件合并的作用,這其中的主要原理是在于三個(gè)規(guī)則:OptimizeSkewInRebalancePartitions,CoalesceShufflePartitions,OptimizeShuffleWithLocalRead,這里主要說(shuō)一下OptimizeSkewInRebalancePartitions規(guī)則,CoalesceShufflePartitions的作用主要是進(jìn)行文件的合并,是得文件不會(huì)太小,OptimizeShuffleWithLocalRead的作用是加速shuffle fetch的速度。

結(jié)論

OptimizeSkewInRebalancePartitions的作用是對(duì)小文件進(jìn)行拆分,使得羅盤的文件不會(huì)太大,這個(gè)會(huì)有個(gè)問(wèn)題,如果我們?cè)谑褂?code>Rebalance(col)這種情況的時(shí)候,如果col的值是固定的,比如說(shuō)值永遠(yuǎn)是20240320,那么這里就得注意一下,關(guān)于OptimizeSkewInRebalancePartitions涉及到的參數(shù)spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled,spark.sql.adaptive.advisoryPartitionSizeInBytes,spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 這些值配置,如果這些配置調(diào)整的不合適,就會(huì)導(dǎo)致寫(xiě)文件的時(shí)候有可能只有一個(gè)Task在運(yùn)行,那么最終就只有一個(gè)文件。而且大大加長(zhǎng)了整個(gè)任務(wù)的運(yùn)行時(shí)間。

分析

直接到OptimizeSkewInRebalancePartitions中的代碼中來(lái):

  override def apply(plan: SparkPlan): SparkPlan = {
    if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {
      return plan
    }

    plan transformUp {
      case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>
        tryOptimizeSkewedPartitions(stage)
    }
  }

如果我們禁用掉對(duì)rebalance的傾斜處理,也就是spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled為false(默認(rèn)是true),那么就不會(huì)應(yīng)用此規(guī)則,那么如果Col為固定值的情況下,就只會(huì)有一個(gè)Task進(jìn)行文件的寫(xiě)入操作,也就只有一個(gè)文件,因?yàn)橐粋€(gè)Task會(huì)拉取所有的Map的數(shù)據(jù)(因?yàn)榇藭r(shí)每個(gè)maptask上的hash(Col)都是一樣的,此時(shí)只有一個(gè)reduce task去拉取數(shù)據(jù)),如圖:

Spark Rebalance hint的傾斜的處理(OptimizeSkewInRebalancePartitions),大數(shù)據(jù),spark,分布式,spark,大數(shù)據(jù),分布式
假如說(shuō)hash(col)為0,那實(shí)際上只有reduceTask0有數(shù)據(jù),其他的ReduceTask1等等都是沒(méi)有數(shù)據(jù)的,所以最終只有ReduceTask0寫(xiě)文件,并且只有一個(gè)文件。

在看合并的計(jì)算公式,該數(shù)據(jù)流如下:

 tryOptimizeSkewedPartitions
      ||
      \/
 optimizeSkewedPartitions
      ||
      \/
 ShufflePartitionsUtil.createSkewPartitionSpecs
      ||
      \/
 ShufflePartitionsUtil.splitSizeListByTargetSize

splitSizeListByTargetSize方法中涉及到的參數(shù)解釋如下 :

  • 參數(shù) sizes: Array[Long] 表示屬于同一個(gè)reduce任務(wù)的maptask任務(wù)的大小數(shù)組,舉例 sizes = [100,200,300,400]
    表明該任務(wù)有4個(gè)maptask,0表示maptask為0的所屬reduce的大小,1表示maptask為1的所屬reduce的大小,依次類推,圖解如下:

Spark Rebalance hint的傾斜的處理(OptimizeSkewInRebalancePartitions),大數(shù)據(jù),spark,分布式,spark,大數(shù)據(jù),分布式
比如說(shuō)reduceTask0的從Maptask拉取的數(shù)據(jù)的大小分別是100,200,300,400.

  • 參數(shù)targetSize 為 spark.sql.adaptive.advisoryPartitionSizeInBytes的值,假如說(shuō)是256MB
  • 參數(shù)smallPartitionFactor為spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 的值,默認(rèn)是0.2
    這里有個(gè)計(jì)算公式:
    def tryMergePartitions() = {
      // When we are going to start a new partition, it's possible that the current partition or
      // the previous partition is very small and it's better to merge the current partition into
      // the previous partition.
      val shouldMergePartitions = lastPartitionSize > -1 &&
        ((currentPartitionSize + lastPartitionSize) < targetSize * MERGED_PARTITION_FACTOR ||
        (currentPartitionSize < targetSize * smallPartitionFactor ||
          lastPartitionSize < targetSize * smallPartitionFactor))
      if (shouldMergePartitions) {
        // We decide to merge the current partition into the previous one, so the start index of
        // the current partition should be removed.
        partitionStartIndices.remove(partitionStartIndices.length - 1)
        lastPartitionSize += currentPartitionSize
      } else {
        lastPartitionSize = currentPartitionSize
      }
    }
    。。。
    while (i < sizes.length) {
      // If including the next size in the current partition exceeds the target size, package the
      // current partition and start a new partition.
      if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {
        tryMergePartitions()
        partitionStartIndices += i
        currentPartitionSize = sizes(i)
      } else {
        currentPartitionSize += sizes(i)
      }
      i += 1
    }
    tryMergePartitions()
    partitionStartIndices.toArray

這里的計(jì)算公式大致就是:從每個(gè)maptask中的獲取到屬于同一個(gè)reduce的數(shù)值,依次累加,如果大于targetSize就嘗試合并,直至到最后一個(gè)maptask
可以看到tryMergePartitions有個(gè)計(jì)算公式:currentPartitionSize < targetSize * smallPartitionFactor,也就是說(shuō)如果當(dāng)前maptask的對(duì)應(yīng)的reduce分區(qū)數(shù)據(jù) 小于 256MB*0.2 = 51.2MB 的話,也還是會(huì)合并到前一個(gè)分區(qū)中去,如果smallPartitionFactor設(shè)置過(guò)大,可能會(huì)導(dǎo)致所有的分區(qū)都會(huì)合并到一個(gè)分區(qū)中去,最終會(huì)導(dǎo)致一個(gè)文件會(huì)有幾十GB(也就是targetSize * smallPartitionFactor`*shuffleNum),
比如說(shuō)以下的測(cè)試案例:

    val targetSize = 100
    val smallPartitionFactor2 = 0.5
    // merge last two partition if their size is not bigger than smallPartitionFactor * target
    val sizeList5 = Array[Long](50, 50, 40, 5)
    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
      sizeList5, targetSize, smallPartitionFactor2).toSeq ==
      Seq(0))

    val sizeList6 = Array[Long](40, 5, 50, 45)
    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
      sizeList6, targetSize, smallPartitionFactor2).toSeq ==
      Seq(0))

這種情況下,就會(huì)只有一個(gè)reduce任務(wù)運(yùn)行。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-851201.html

到了這里,關(guān)于Spark Rebalance hint的傾斜的處理(OptimizeSkewInRebalancePartitions)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Hive & Spark & Flink 數(shù)據(jù)傾斜

    絕大部分任務(wù)都很快完成,只有一個(gè)或者少數(shù)幾個(gè)任務(wù)執(zhí)行的很慢甚至最終執(zhí)行失敗, 這樣的現(xiàn)象為數(shù)據(jù)傾斜現(xiàn)象。 任務(wù)進(jìn)度長(zhǎng)時(shí)間維持在 99%或者 100%的附近,查看任務(wù)監(jiān)控頁(yè)面,發(fā)現(xiàn)只有少量 reduce 子任務(wù)未完成,因?yàn)槠涮幚淼臄?shù)據(jù)量和其他的 reduce 差異過(guò)大。 單一 redu

    2024年02月07日
    瀏覽(31)
  • Spark數(shù)據(jù)傾斜場(chǎng)景及解決思路

    絕大多數(shù) task 執(zhí)行得都非??欤珎€(gè)別 task 執(zhí)行極慢。 在進(jìn)行 shuffle 的時(shí)候,必須將各個(gè)節(jié)點(diǎn)上相同的 key 拉取到某個(gè)節(jié)點(diǎn)上的一個(gè) task 來(lái)進(jìn)行處理,比如按照 key 進(jìn)行聚合或 join 等操 作。此時(shí)如果某個(gè) key 對(duì)應(yīng)的數(shù)據(jù)量特別大的話,就會(huì)發(fā)生數(shù)據(jù)傾斜。 因此出現(xiàn)數(shù)據(jù)傾斜的

    2023年04月24日
    瀏覽(27)
  • hive/spark數(shù)據(jù)傾斜解決方案

    hive/spark數(shù)據(jù)傾斜解決方案

    數(shù)據(jù)傾斜主要表現(xiàn)在,mapreduce程序執(zhí)行時(shí),reduce節(jié)點(diǎn)大部分執(zhí)行完畢,但是有一個(gè)或者幾個(gè)reduce節(jié)點(diǎn)運(yùn)行很慢,導(dǎo)致整個(gè)程序的處理時(shí)間很長(zhǎng),這是因?yàn)槟骋粋€(gè)key的條數(shù)比其他key多很多(有時(shí)是百倍或者千倍之多),這條Key所在的reduce節(jié)點(diǎn)所處理的數(shù)據(jù)量比其他節(jié)點(diǎn)就大很多,

    2024年02月11日
    瀏覽(26)
  • 萬(wàn)字解決Flink|Spark|Hive 數(shù)據(jù)傾斜

    萬(wàn)字解決Flink|Spark|Hive 數(shù)據(jù)傾斜

    此篇主要總結(jié)到Hive,Flink,Spark出現(xiàn)數(shù)據(jù)傾斜的表現(xiàn),原因和解決辦法。首先會(huì)讓大家認(rèn)識(shí)到不同框架或者計(jì)算引擎處理傾斜的方案。最后你會(huì)發(fā)現(xiàn)計(jì)算框架只是“異曲”,文末總結(jié)才是“同工之妙”。點(diǎn)擊收藏與分享,工作和漲薪用得到?。。?數(shù)據(jù)傾斜最籠統(tǒng)概念就是數(shù)據(jù)的

    2024年02月03日
    瀏覽(28)
  • spark 的group by ,join數(shù)據(jù)傾斜調(diào)優(yōu)

    spark任務(wù)中最常見(jiàn)的耗時(shí)原因就是數(shù)據(jù)分布不均勻,從而導(dǎo)致有些task運(yùn)行時(shí)間很長(zhǎng),長(zhǎng)尾效應(yīng)導(dǎo)致的整個(gè)job運(yùn)行耗時(shí)很長(zhǎng) 首先我們要定位數(shù)據(jù)傾斜,我們可以通過(guò)在spark ui界面中查看某個(gè)stage下的task的耗時(shí),如果發(fā)現(xiàn)某些task耗時(shí)很長(zhǎng),對(duì)應(yīng)要處理的數(shù)據(jù)很多,證明有數(shù)據(jù)傾斜

    2024年02月21日
    瀏覽(24)
  • 【詳解】Spark數(shù)據(jù)傾斜問(wèn)題由基礎(chǔ)到深入詳解-完美理解-費(fèi)元星

    數(shù)據(jù)傾斜定義:顧名思義,就是大量相似或相同數(shù)據(jù)聚集在一個(gè)塊的節(jié)點(diǎn)里,導(dǎo)致計(jì)算和資源分配不均導(dǎo)致的計(jì)算緩慢(長(zhǎng)尾)問(wèn)題。 數(shù)據(jù)傾斜原因: count(distinct field) group by? NULL 空值 Shuffle (概率最高、發(fā)生最普遍的數(shù)據(jù)傾斜問(wèn)題,本文重點(diǎn)講述這個(gè)) ##########################

    2024年02月20日
    瀏覽(29)
  • spark sql 數(shù)據(jù)傾斜--join 同時(shí)開(kāi)窗去重的問(wèn)題優(yōu)化

    背景: 需求:在一張查詢?nèi)罩颈碇?,有百億數(shù)據(jù),需要join上維表,再根據(jù)幾個(gè)字段進(jìn)行去重 開(kāi)窗去重和join 一定要分步進(jìn)行 ,按照需求先做join再開(kāi)窗,或者去重完成后在進(jìn)行join。 dwd_tmp1 中存在百億用戶查詢?nèi)罩緮?shù)據(jù) 數(shù)據(jù)傾斜 數(shù)據(jù)量超百億,資源給到200 * 2c * 20G,執(zhí)行引擎

    2024年02月11日
    瀏覽(26)
  • 被修飾成單棟的傾斜攝影處理思路

    被修飾成單棟的傾斜攝影處理思路

    作者:kele 傾斜攝影數(shù)據(jù)是三維項(xiàng)目系統(tǒng)中的??汀T谀承╉?xiàng)目中,為了給傾斜攝影上的建筑賦予屬性信息,實(shí)現(xiàn)點(diǎn)擊建筑高亮并展示屬性的功能,客戶將傾斜攝影數(shù)據(jù)進(jìn)行了模型單體化(使用pdmodeler或者其它軟件,將傾斜攝影中的建筑提取成單個(gè)單個(gè)的對(duì)象)。這樣處理后

    2023年04月27日
    瀏覽(18)
  • 如何提高傾斜攝影超大場(chǎng)景的三維模型輕量化處理速度和效率?

    如何提高傾斜攝影超大場(chǎng)景的三維模型輕量化處理速度和效率?

    ?傾斜攝影超大場(chǎng)景的三維模型輕量化處理是將高精度的三維模型進(jìn)行降采樣、簡(jiǎn)化等處理,以達(dá)到減少數(shù)據(jù)大小和提高渲染性能的目的。為了提高輕量化處理速度,可以從以下方面入手: 1、選擇合適的輕量化算法。當(dāng)前已有很多成熟的三維模型輕量化算法,如基于多分辨率

    2024年02月01日
    瀏覽(31)
  • Python 基于 OpenCV 視覺(jué)圖像處理實(shí)戰(zhàn) 之 OpenCV 簡(jiǎn)單實(shí)戰(zhàn)案例 之六 簡(jiǎn)單圖像傾斜校正處理效果

    Python 基于 OpenCV 視覺(jué)圖像處理實(shí)戰(zhàn) 之 OpenCV 簡(jiǎn)單實(shí)戰(zhàn)案例 之六 簡(jiǎn)單圖像傾斜校正處理效果

    目錄 Python 基于 OpenCV 視覺(jué)圖像處理實(shí)戰(zhàn) 之 OpenCV 簡(jiǎn)單實(shí)戰(zhàn)案例 之六 簡(jiǎn)單圖像傾斜校正處理效果 一、簡(jiǎn)單介紹 二、簡(jiǎn)單圖像傾斜校正處理效果實(shí)現(xiàn)原理 三、簡(jiǎn)單圖像傾斜校正處理效果案例實(shí)現(xiàn)簡(jiǎn)單步驟 四、注意事項(xiàng) Python是一種跨平臺(tái)的計(jì)算機(jī)程序設(shè)計(jì)語(yǔ)言。是一種面向?qū)?/p>

    2024年04月13日
    瀏覽(28)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包