在數據處理領域,Apache Spark已成為一個強大且多功能的框架。然而,隨著數據量和復雜性不斷增長,確保最佳性能變得至關重要。
在這篇博文中,我們將探討解釋 計劃如何成為調試和優(yōu)化 Spark 應用程序的秘密武器。我們將深入探討 Spark Scala 的基礎知識并提供清晰的示例,以幫助您了解如何利用這個有價值的工具。
解釋計劃是什么?
解釋計劃是 Spark 處理數據所遵循的邏輯和物理執(zhí)行步驟的全面細分。將其視為指導您完成 Spark 作業(yè)內部運作的路線圖。
Spark 解釋計劃的兩個重要組成部分是:
邏輯計劃:邏輯計劃代表 Spark 應用程序中指定的高級轉換和操作。它是對您想要對數據執(zhí)行的操作的抽象描述。
物理計劃:另一方面,物理計劃提供了 Spark 如何將邏輯計劃轉化為一組具體操作的具體細節(jié)。它揭示了 Spark 如何優(yōu)化您的工作以獲得性能。
Explain API還有一些其他重載方法:
explain()- 打印物理計劃。
explain(extended: Boolean)- 打印計劃(邏輯和物理)。
explain(mode: String)- 使用給定解釋模式指定的格式打印計劃(邏輯和物理):
simple僅打印物理計劃。
extended:打印邏輯計劃和物理計劃。
codegen:打印物理計劃并生成代碼(如果可用)。
cost:打印邏輯計劃和統(tǒng)計數據(如果有)。
formatted:將解釋輸出分為兩部分:物理計劃大綱和節(jié)點詳細信息。
解釋計劃的使用
我們可以使用explain()DataFrame 或 Dataset 上的方法來做到這一點。這是使用解釋計劃的一個簡單示例:
import org.apache.spark.sql.SparkSession // 創(chuàng)建 SparkSession val spark = SparkSession.builder() .appName("ExplainPlanExample") .getOrCreate() // 為員工創(chuàng)建一個虛擬 DataFrame val employeesData = Seq( (1, "Alice", "HR"), (2, "Bob", "Engineering"), (3, "Charlie", "Sales"), (4, "David", "Engineering") ) val employeesDF = employeesData.toDF("employee_id", "employee_name", "department") // 為工資創(chuàng)建另一個虛擬 DataFrame val salariesData = Seq( (1, 50000), (2, 60000), (3, 55000), (4, 62000) ) val salariesDF = salariesData.toDF("employee_id", "salary") // 將 DataFrame 注冊為 SQL 臨時表 employeesDF.createOrReplaceTempView("employees") salariesDF.createOrReplaceTempView("salaries") // 使用Spark SQL計算每個部門的平均工資 val avgSalaryDF = spark.sql(""" SELECT department, AVG(salary) as avg_salary FROM employees e JOIN salaries s ON e.employee_id = s.employee_id GROUP BY department """) // 使用擴展模式調用解釋計劃來打印物理和邏輯計劃 avgSalaryDF.explain(true) // 停止 SparkSession spark.stop()
在上面的示例中,我們創(chuàng)建了一個實例employeeData和salariesData DataFrame,并執(zhí)行連接,然后進行聚合以獲得部門的平均工資。以下是給定數據框的解釋計劃。
scala> avgSalaryDF.explain(true) == Parsed Logical Plan == 'Aggregate ['department], ['department, 'AVG('salary) AS avg_salary#150] +- 'Join Inner, ('e.employee_id = 's.employee_id) :- 'SubqueryAlias e : +- 'UnresolvedRelation [employees], [], false +- 'SubqueryAlias s +- 'UnresolvedRelation [salaries], [], false == Analyzed Logical Plan == department: string, avg_salary: double Aggregate [department#135], [department#135, avg(salary#147) AS avg_salary#150] +- Join Inner, (employee_id#133 = employee_id#146) :- SubqueryAlias e : +- SubqueryAlias employees : +- View (`employees`, [employee_id#133,employee_name#134,department#135]) : +- Project [_1#126 AS employee_id#133, _2#127 AS employee_name#134, _3#128 AS department#135] : +- LocalRelation [_1#126, _2#127, _3#128] +- SubqueryAlias s +- SubqueryAlias salaries +- View (`salaries`, [employee_id#146,salary#147]) +- Project [_1#141 AS employee_id#146, _2#142 AS salary#147] +- LocalRelation [_1#141, _2#142] == Optimized Logical Plan == Aggregate [department#135], [department#135, avg(salary#147) AS avg_salary#150] +- Project [department#135, salary#147] +- Join Inner, (employee_id#133 = employee_id#146) :- LocalRelation [employee_id#133, department#135] +- LocalRelation [employee_id#146, salary#147] == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[department#135], functions=[avg(salary#147)], output=[department#135, avg_salary#150]) +- Exchange hashpartitioning(department#135, 200), ENSURE_REQUIREMENTS, [plan_id=271] +- HashAggregate(keys=[department#135], functions=[partial_avg(salary#147)], output=[department#135, sum#162, count#163L]) +- Project [department#135, salary#147] +- BroadcastHashJoin [employee_id#133], [employee_id#146], Inner, BuildRight, false :- LocalTableScan [employee_id#133, department#135] +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=266] +- LocalTableScan [employee_id#146, salary#147] scala>
正如您在上面看到的,當extended標志設置為 時true,我們有解析的邏輯計劃、分析的邏輯計劃、優(yōu)化的邏輯計劃和物理計劃。
在嘗試理解計劃之前,我們需要自下而上地閱讀所有計劃。因此我們將在底部看到任何數據幀的創(chuàng)建或讀取。
我們將了解其中的每一項:
解析邏輯計劃
這是Spark解析用戶提供的SQL或 DataFrame 操作并創(chuàng)建查詢的解析表示的初始階段。使用 Spark SQL 查詢時,任何語法錯誤都會在此處捕獲。如果我們在這里觀察,列名稱尚未解析。
在上面解析的邏輯計劃中,我們可以看到 UnresolvedRelation。這意味著架構尚未解決。解析后的邏輯計劃概述了查詢的邏輯結構,包括聚合和連接操作。employees它還定義別名并標識和DataFrame的源salaries。未解析的關系將在查詢執(zhí)行期間解析為其實際數據源。
分析邏輯計劃
解析之后,Spark 會經歷一個稱為語義分析或解析的過程。在此階段,Spark 根據可用表和列的目錄檢查查詢,解析列名稱,驗證數據類型,并確保查詢在語義上正確。結果是經過分析的邏輯計劃,其中包含有關所涉及的表和列的元數據和信息。
該計劃表示解析和語義分析后查詢的初始邏輯結構。它顯示了包含兩列的結果架構,department和avg_salary。該計劃由兩個子查詢e,和組成s,它們對應于employees和salariesDataFrame。employee_id內連接操作作為連接鍵應用于這些子查詢之間。該計劃還包括別名 (e和s) 以及用于選擇特定列的投影操作。
優(yōu)化的邏輯計劃
一旦分析了查詢并且 Spark 對數據和模式有了清晰的了解,它就會繼續(xù)優(yōu)化查詢。在優(yōu)化過程中,Spark會對查詢計劃應用各種邏輯優(yōu)化來提高性能。這可能涉及謂詞下推、常量折疊和基于規(guī)則的轉換等技術。優(yōu)化的邏輯計劃代表了在數據檢索和處理方面更高效的查詢版本。
優(yōu)化階段簡化了計劃以獲得更好的性能。在這種情況下,計劃被簡化以刪除子查詢和不必要的投影操作。它使用連接鍵直接連接兩個本地關系 (employees和salaries) employee_id,然后應用聚合來計算每個部門的平均工資。
物理計劃
物理計劃也稱為執(zhí)行計劃,是查詢優(yōu)化的最后階段。此時,Spark 會生成一個關于如何在集群上物理執(zhí)行查詢的計劃。它考慮了數據分區(qū)、數據洗牌和跨節(jié)點的任務分配等因素。物理計劃是實際執(zhí)行查詢的藍圖,它考慮了可用資源和并行性以有效地執(zhí)行查詢。
物理計劃概述了 Spark 執(zhí)行查詢所采取的實際執(zhí)行步驟。它涉及聚合、聯接和數據掃描,以及廣播聯接等優(yōu)化技術以提高效率。該計劃反映了 Spark 將遵循的執(zhí)行策略來計算查詢結果。現在,讓我們仔細檢查每一行以更深入地了解(從下到上)。
LocalTableScan:這些是本地表的掃描。在本例中,它們代表表或 DataFrames employee_id#133、department#135、employee_id#146和salary#147。這些掃描從本地分區(qū)檢索數據。
BroadcastExchange:此操作將較小的 DataFrame (employee_id#146和salary#147) 廣播到所有工作節(jié)點以進行廣播連接。它將廣播模式指定為HashedRelationBroadcastMode并指示應廣播輸入數據。
BroadcastHashJoin:這是兩個數據源(employee_id#133和employee_id#146)之間使用inner join. 它構建連接的右側,因為它被標記為“BuildRight”。此操作執(zhí)行廣播連接,這意味著它將較小的 DataFrame(右側)廣播到較大的 DataFrame(左側)所在的所有節(jié)點。當一個 DataFrame 明顯小于另一個 DataFrame 時,這樣做是為了優(yōu)化目的。
項目:此操作從數據中選擇department和列。salary
HashAggregate (partial_avg):這是一個部分聚合操作,計算每個部門的平均工資。它包括附加列 sum#162和count#163L,分別表示工資總和和記錄數。
Exchange hashpartitioning:此操作基于列對數據執(zhí)行哈希分區(qū)department#135。它的目標是將數據均勻分布在 200 個分區(qū)中。該ENSURE_REQUIREMENTS屬性表明該操作保證了后續(xù)操作的要求。
HashAggregate:avg(salary#147)這是一個聚合操作,計算列中每個唯一值的平均工資 ( ) department。輸出包括兩列:department#135和avg_salary#150。
AdaptiveSparkPlan:這表示 Spark 查詢的頂級執(zhí)行計劃。該屬性isFinalPlan=false表明該計劃尚未最終確定,這表明 Spark 可能會在執(zhí)行期間根據運行時統(tǒng)計數據調整該計劃。
結論
了解 Spark SQL 生成的執(zhí)行計劃對于開發(fā)人員來說非常有價值,體現在以下幾個方面:
查詢優(yōu)化:通過檢查物理計劃,開發(fā)人員可以深入了解 Spark 如何優(yōu)化其 SQL 查詢。它可以幫助他們查看查詢是否有效地使用可用資源、分區(qū)和聯接。
性能調優(yōu):開發(fā)人員可以識別計劃中潛在的性能瓶頸。例如,如果他們注意到不必要的改組或數據重新分配,他們可以修改查詢或調整 Spark 配置以提高性能。
調試:當查詢未產生預期結果或發(fā)生錯誤時,物理計劃可以提供有關問題可能出在哪里的線索。開發(fā)人員可以查明計劃中存在問題的階段或轉換。
高效連接:了解廣播連接等連接策略可以幫助開發(fā)人員就廣播哪些表做出明智的決定。這可以顯著減少shuffle并提高查詢性能。文章來源:http://www.zghlxwxcb.cn/article/479.html
文章來源地址http://www.zghlxwxcb.cn/article/479.html
到此這篇關于使用解釋計劃調試 Apache Spark 性能的文章就介紹到這了,更多相關內容可以在右上角搜索或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!