国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

spark-sql字段血緣實現(xiàn)

這篇具有很好參考價值的文章主要介紹了spark-sql字段血緣實現(xiàn)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

spark-sql字段血緣實現(xiàn)

背景

Apache Spark是一個開源的大數(shù)據(jù)處理框架,它提供了一種高效、易于使用的方式來處理大規(guī)模數(shù)據(jù)集。在Spark中,數(shù)據(jù)是通過DataFrame和Dataset的形式進(jìn)行操作的,這些數(shù)據(jù)結(jié)構(gòu)包含了一系列的字段(也稱為列)。字段血緣是Spark中的一個關(guān)鍵概念,它幫助我們理解數(shù)據(jù)的來源和流向,從而更好地理解和控制數(shù)據(jù)處理過程。

字段血緣是指在數(shù)據(jù)處理過程中,一個字段的值是如何從源數(shù)據(jù)產(chǎn)生并傳遞給目標(biāo)數(shù)據(jù)的。在Spark中,字段血緣是通過依賴關(guān)系進(jìn)行管理的。每個字段都有一個或多個依賴關(guān)系,這些依賴關(guān)系定義了字段的值如何從其他字段或數(shù)據(jù)源產(chǎn)生。

前提

spark版本:2.4.3
使用語言:java+scala

技術(shù)實現(xiàn)

1. spark-sql的執(zhí)行計劃,了解如何實現(xiàn)字段血緣解析

spark-sql字段血緣實現(xiàn),spark,spark,sql,大數(shù)據(jù)
一個sql會經(jīng)歷一些列的處理,最終生成spark-core的代碼,提交到集群運行。
首先看一下一個簡單的sql生成的邏輯執(zhí)行計劃長什么樣子

insert into default.jy_test
select * from default.jy_test

未解析的邏輯執(zhí)行計劃:

'InsertIntoTable 'UnresolvedRelation `default`.`jy_test`, false, false
+- 'Project [*]
   +- 'UnresolvedRelation `default`.`jy_test`

解析后(analyzer)的邏輯執(zhí)行計劃:

InsertIntoHiveTable `default`.`jy_test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [id, name]
+- Project [id#0, name#1]
   +- SubqueryAlias `default`.`jy_test`
      +- HiveTableRelation `default`.`jy_test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#0, name#1]

優(yōu)化后(optimizer)的邏輯執(zhí)行計劃:

InsertIntoHiveTable `default`.`jy_test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [id, name]
+- HiveTableRelation `default`.`jy_test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#0, name#1]

重點來了

  1. 所謂的邏輯執(zhí)行計劃就是一個樹形結(jié)構(gòu)
  2. 樹形結(jié)構(gòu)中的葉子結(jié)點,就是hive的表信息:庫名、表名、字段信息,并且spark給每一個字段生成了一個唯一的id
  3. 樹形結(jié)構(gòu)中的非葉子只包含了字段信息,不包含庫表信息

所以想要實現(xiàn)字段血緣,我們需要做的就是通過那個生成的唯一id去一層層的關(guān)聯(lián),當(dāng)關(guān)聯(lián)到葉子結(jié)點的時候,就找到了庫名表名

2. 構(gòu)建一顆與解析后的邏輯執(zhí)行計劃一模一樣的樹形結(jié)構(gòu)

  1. 首先定義node對象,用來存放節(jié)點信息
public abstract class Node {
    private String name;
    private List<Column> columnList = new ArrayList<>();
    private List<Node> children = new ArrayList<>();
    private Node parentNode;
    private String graphId;
}
  1. 其次定義column對象,用來存放字段信息
public class Column {
   private String name;
   private Long exprId;
   private String ColumnType;
   private ArrayList<Column> child = new ArrayList<Column>();
   private String tableName;
   private String process;
}
  1. 根據(jù)spark-sql生成的邏輯執(zhí)行計劃,我們?yōu)槊恳粋€邏輯節(jié)點創(chuàng)建對應(yīng)的結(jié)點,由于結(jié)點很多,我這里直接給個截圖,源碼會在文章最后提供出來
    spark-sql字段血緣實現(xiàn),spark,spark,sql,大數(shù)據(jù)

  2. 解析spark-sql生成的解析后的邏輯執(zhí)行計劃
    首先獲取邏輯執(zhí)行計劃,這里提供兩種方式:
    1.通過spark-session獲取,該方法可以用來做測試,非常的方便

     LogicalPlan logicalPlan = spark.sessionState().sqlParser().parsePlan(sql);
     LogicalPlan analyzer = spark.sessionState().analyzer().execute(logicalPlan);
    

    2.通過QueryExecution獲得,這里貼個圖,詳情看源碼
    spark-sql字段血緣實現(xiàn),spark,spark,sql,大數(shù)據(jù)

  3. 解析spark生成的analyzer,構(gòu)建我們自己的樹形結(jié)構(gòu)
    這里貼一下主要的邏輯,使用scala去遞歸解析抽象語法樹會方便很多

def resolveLogicPlan(plan: LogicalPlan, root: Node): Unit = {
    plan match {
      case plan: InsertIntoHadoopFsRelationCommand =>
        val node = root.asInstanceOf[Root]
        node.setName(NodeType.INSERTINTOHIVETABLE.getName)
        val database: String = plan.catalogTable.get.identifier.database.getOrElse("default")
        val table: String = plan.catalogTable.get.identifier.table
        val fullTableName = database + "." + table
        plan.catalogTable.get.schema.foreach { field => {
          val column = new Column()
          column.setName(field.name)
          column.setTableName(fullTableName)
          node.getColumnList.add(column)
        }
        }
        resolveLogicPlan(plan.query, node)
      case plan: SaveIntoDataSourceCommand =>
        val table: String = plan.options.get("dbtable").getOrElse("")
        val url: String = plan.options.get("url").getOrElse("")
        val user: String = plan.options.get("user").getOrElse("")
        val password: String = plan.options.get("password").getOrElse("")
        // 定義匹配數(shù)據(jù)庫名稱的正則表達(dá)式模式
        val pattern: Regex = ".*://[^/]+/(\\w+)".r
        // 使用正則表達(dá)式進(jìn)行匹配
        val dbNameOption: Option[String] = pattern.findFirstMatchIn(url).map(_.group(1))
        val fullTableName = dbNameOption.getOrElse("") + "." + table
        val node = root.asInstanceOf[Root]
        node.setName(NodeType.SAVEINTODATASOURCECOMMAND.getName)
        // 連接mysql,根據(jù)庫名表明獲取字段列表
        val fieldsList = getFieldsListFromMysql(url, user, password, table)
        fieldsList.foreach { field => {
          val column = new Column()
          column.setName(field)
          column.setTableName(fullTableName)
          node.getColumnList.add(column)
        }
        }
        resolveLogicPlan(plan.query, node)
      case plan: InsertIntoHiveTable =>
        val node = root.asInstanceOf[Root]
        node.setName(NodeType.INSERTINTOHIVETABLE.getName)
        val database: String = plan.table.identifier.database.getOrElse("default")
        val table: String = plan.table.identifier.table
        val fullTableName = database + "." + table

        node.setTableName(fullTableName)
        plan.table.schema.foreach { field => {
          val column = new Column()
          column.setName(field.name)
          column.setTableName(fullTableName)
          node.getColumnList.add(column)
        }
        }
        resolveLogicPlan(plan.query, node)
      case plan: Aggregate =>
        val node = new AggregateNode()
        insertNodeColumnsFromNamedExpression(node, plan.aggregateExpressions)
        node.setParentNode(root)
        root.getChildren.add(node)
        resolveLogicPlan(plan.child, node)

      case plan: Project =>
        val node = new ProjectNode()
        insertNodeColumnsFromNamedExpression(node, plan.projectList)
        node.setParentNode(root)
        root.getChildren.add(node)
        resolveLogicPlan(plan.child, node)
      case plan: LogicalRelation =>
        val node = new LogicalRelationNode()
        dfsLogicalRelation(plan, node)
        node.setParentNode(root)
        root.getChildren.add(node)

      case plan: HiveTableRelation =>
        val node = new LogicalRelationNode()
        dfsLogicalRelation(plan, node)
        node.setParentNode(root)
        root.getChildren.add(node)

      case plan: Filter =>
        val node = new FilterNode()
        node.setParentNode(root)
        node.setCondition(plan.condition.toString)
        root.getChildren.add(node)
        resolveLogicPlan(plan.child, node)

      case plan: Join =>
        val node = new JoinNode()
        node.setName(plan.joinType.toString + " " + node.getName)
        node.setParentNode(root)
        node.setCondition(plan.condition.toString)
        root.getChildren.add(node)
        resolveLogicPlan(plan.left, node)
        resolveLogicPlan(plan.right, node)
      case plan: Window =>
        val node = new WindowNode()
        insertNodeColumnsFromNamedExpression(node, plan.windowExpressions)
        node.setParentNode(root)
        root.getChildren.add(node)
        resolveLogicPlan(plan.child, node)
      case plan: Union =>
        val node = new UnionNode()
        node.setParentNode(root)
        root.getChildren.add(node)
        plan.children.foreach(resolveLogicPlan(_, node))

      case plan: SubqueryAlias =>
        val node = new SubqueryNode()
        node.setName(node.getName + " " + plan.name.toString())
        node.setParentNode(root)
        root.getChildren.add(node)
        resolveLogicPlan(plan.child, node)
      case plan: Generate =>
        val node = new GenerateNode()
        processGenerate(plan, node)
        node.setParentNode(root)
        root.getChildren.add(node)
        resolveLogicPlan(plan.child, node)

      case _ =>
        plan.children.foreach(resolveLogicPlan(_, root))
    }

  }
  1. 到這里,我們已經(jīng)得到了自己的樹形結(jié)構(gòu)。接下來要通過唯一id進(jìn)行關(guān)聯(lián),補充庫表信息。
    不知道大家注意沒有,我們在node對象中有一個方法,

    spark-sql字段血緣實現(xiàn),spark,spark,sql,大數(shù)據(jù)
    這里用到了訪問者設(shè)計模式,感興趣的同學(xué)可以學(xué)習(xí)一下,在spark-sql源碼中,同樣用的是訪問者設(shè)計模式。

這里主要說一下Visitor的定義及方法:processColumn方法主要是拿自己的ExprId和所有孩子結(jié)點的ExprId比較,如果相等的話,說明是同一個字段,那就表名復(fù)制過來。

public interface Visitor {

    void visit(Node node);

    default void processColumn(Node node) {
        for (Column column1 : node.getColumnList()) {
            for (Node nd : node.getChildren()) {
                for (Column column2 : nd.getColumnList()) {
                    processColumn(column1, column2);
                }
            }
        }

    }

     default void processColumn(Column column1, Column column2) {
        List<Column> child = column1.getChild();
        child.forEach(ch -> processColumn(ch, column2));
        if (column1.getExprId().equals(column2.getExprId())) {
            if(column2.getTableName() != null) {
                column1.setTableName(column2.getTableName());
            } else {
                column1.getChild().addAll(column2.getChild());
            }
        }
    }
}

LineageVisitor是Visitor的實現(xiàn)類, 主要用來做模式匹配,不同的結(jié)點處理方式會有不同,感興趣的同學(xué)看一下這塊的代碼。

public class LineageVisitor implements Visitor{
    @Override
    public void visit(Node node) {
        switch (node.getClass().getSimpleName()) {
            case "FilterNode" :
            case "SubqueryNode" :
            case "JoinNode" : copyChildColumnToThis(node); break;
            case "WindowNode" :
            case "GenerateNode" : copyChildColumnToThisWithProcess(node); break;
            case "Root" : processColumn((Root)node); break;
            case "UnionNode" : processColumn((UnionNode)node); break;
            default : processColumn(node);
        }
    }

    @Override
    public void processColumn(Node node) {
        node.getChildren().forEach( child -> child.accept(this));
        Visitor.super.processColumn(node);
    }

    public void copyChildColumnToThis(Node node) {
        node.getChildren().forEach( child -> child.accept(this));
        for (Node child : node.getChildren()) {
            node.getColumnList().addAll(child.getColumnList());
        }
    }
    public void copyChildColumnToThisWithProcess(Node node) {
        node.getChildren().forEach( child -> child.accept(this));
        Visitor.super.processColumn(node);
        for (Node child : node.getChildren()) {
            node.getColumnList().addAll(child.getColumnList());
        }

    }

    public void processColumn(Root node) {
        node.getChildren().forEach( child -> child.accept(this));
        if(node.getColumnList().size() > 0) {
            for (int i = 0; i < node.getChildren().get(0).getColumnList().size(); i++) {
                for (Node child : node.getChildren()) {
                    node.getColumnList().get(i).getChild().add(child.getColumnList().get(i));
                }
            }
        }
    }

    public void processColumn(UnionNode node) {
        node.getChildren().forEach( child -> child.accept(this));
        int size = node.getChildren().get(0).getColumnList().size();
        for (Column column : node.getChildren().get(0).getColumnList()) {
            Column column1 = new Column();
            column1.setName(column.getName());
            column1.setExprId(column.getExprId());
            node.getColumnList().add(column1);
        }
        for (int i = 0; i < size; i++) {
            for (Node child : node.getChildren()) {
                node.getColumnList().get(i).getChild().add(child.getColumnList().get(i));
            }
        }
    }
}

成果展示

還是拿最開始的sql ,看一下最終生成的字段血緣

insert into default.jy_test select * from default.jy_test

spark-sql字段血緣實現(xiàn),spark,spark,sql,大數(shù)據(jù)

最后

字段血緣實現(xiàn)起來還是比較困難的,需要了解spak-sql的底層原理和一些技巧。
這里方便大家使用、學(xué)習(xí)、交流,所以貢獻(xiàn)自己的源碼,倉庫地址:https://gitee.com/chenxiaoliang0901/crock/tree/main文章來源地址http://www.zghlxwxcb.cn/news/detail-786451.html

到了這里,關(guān)于spark-sql字段血緣實現(xiàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Spark-SQL小結(jié)

    Spark-SQL小結(jié)

    目錄 一、RDD、DataFrame、DataSet的概念、區(qū)別聯(lián)系、相互轉(zhuǎn)換操作 ? 1.RDD概念 ? 2.DataFrame概念 ? 3.DataSet概念 ? 4.RDD、DataFrame、DataSet的區(qū)別聯(lián)系 ? 5.RDD、DataFrame、DataSet的相互轉(zhuǎn)換操作 ? ?1 RDD-DataFrame、DataSet ? ?2? DataFrame-RDD,DataSet ? ?3 DataSet-RDD,DataFrame 二、Spark-SQL連接JDBC的方式

    2024年02月09日
    瀏覽(19)
  • Hudi-集成Spark之spark-sql方式

    啟動spark-sql 創(chuàng)建表 建表參數(shù): 參數(shù)名 默認(rèn)值 說明 primaryKey uuid 表的主鍵名,多個字段用逗號分隔。同 hoodie.datasource.write.recordkey.field preCombineField 表的預(yù)合并字段。同 hoodie.datasource.write.precombine.field type cow 創(chuàng)建的表類型: type = ‘cow’ type = \\\'mor’同 hoodie.datasource.write.table.ty

    2024年02月05日
    瀏覽(23)
  • Spark參數(shù)配置和調(diào)優(yōu),Spark-SQL、Config

    一、Hive-SQL / Spark-SQL參數(shù)配置和調(diào)優(yōu) 二、shell腳本spark-submit參數(shù)配置 三、sparkSession中配置參數(shù)

    2024年02月13日
    瀏覽(21)
  • Hudi(7):Hudi集成Spark之spark-sql方式

    目錄 0. 相關(guān)文章鏈接 1.?創(chuàng)建表 1.1.?啟動spark-sql 1.2.?建表參數(shù) 1.3.?創(chuàng)建非分區(qū)表 1.4.?創(chuàng)建分區(qū)表 1.5.?在已有的hudi表上創(chuàng)建新表 1.6.?通過CTAS (Create Table As Select)建表 2.?插入數(shù)據(jù) 2.1.?向非分區(qū)表插入數(shù)據(jù) 2.2.?向分區(qū)表動態(tài)分區(qū)插入數(shù)據(jù) 2.3.?向分區(qū)表靜態(tài)分區(qū)插入數(shù)據(jù) 2.4

    2024年02月06日
    瀏覽(21)
  • Spark-SQL連接Hive的五種方法

    Spark-SQL連接Hive的五種方法

    若使用Spark內(nèi)嵌的Hive,直接使用即可,什么都不需要做(在實際生產(chǎn)活動中,很少會使用這一模式) 步驟: 將Hive中conf/下的hive-site.xml拷貝到Spark的conf/目錄下; 把Mysql的驅(qū)動copy到j(luò)ars/目錄下; 如果訪問不到hdfs,則將core-site.xml和hdfs-site.xml拷貝到conf/目錄下; 重啟spark-shell;

    2024年02月16日
    瀏覽(21)
  • spark-sql: insert overwrite分區(qū)表問題

    spark-sql: insert overwrite分區(qū)表問題

    用spark-sql,insert overwrite分區(qū)表時發(fā)現(xiàn)兩個比較麻煩的問題: 從目標(biāo)表select出來再insert overwrite目標(biāo)表時報錯:Error in query: Cannot overwrite a path that is also being read from. 從其他表select出來再insert overwrite目標(biāo)表時,其他分區(qū)都被刪除了. 印象中這兩個問題也出現(xiàn)過,但憑經(jīng)驗和感覺,

    2024年02月11日
    瀏覽(21)
  • Spark-SQL連接JDBC的方式及代碼寫法

    Spark-SQL連接JDBC的方式及代碼寫法

    提示:文章內(nèi)容僅供參考! 目錄 一、數(shù)據(jù)加載與保存 通用方式: 加載數(shù)據(jù): 保存數(shù)據(jù): 二、Parquet 加載數(shù)據(jù): 保存數(shù)據(jù): 三、JSON 四、CSV ?五、MySQL SparkSQL 提供了通用的保存數(shù)據(jù)和數(shù)據(jù)加載的方式。這里的通用指的是使用相同的API,根據(jù)不同的參數(shù)讀取和保存不同格式的

    2024年02月13日
    瀏覽(22)
  • spark-sql處理json字符串的常用函數(shù)

    整理了spark-sql處理json字符串的幾個函數(shù): 1?get_json_object 解析不含數(shù)組的 json ? 2 from_json? 解析json 3 schema_of_json?提供生成json格式的方法 4 explode? ?把JSONArray轉(zhuǎn)為多行 get_json_object(string json_string, string path) :適合最外層為{}的json解析。 ?第一個參數(shù)是json對象變量,也就是含j

    2023年04月08日
    瀏覽(17)
  • 在 spark-sql / spark-shell / hive / beeline 中粘貼 sql、程序腳本時的常見錯誤

    《大數(shù)據(jù)平臺架構(gòu)與原型實現(xiàn):數(shù)據(jù)中臺建設(shè)實戰(zhàn)》一書由博主歷時三年精心創(chuàng)作,現(xiàn)已通過知名IT圖書品牌電子工業(yè)出版社博文視點出版發(fā)行,點擊《重磅推薦:建大數(shù)據(jù)平臺太難了!給我發(fā)個工程原型吧!》了解圖書詳情,京東購書鏈接:https://item.jd.com/12677623.html,掃描

    2024年02月14日
    瀏覽(21)
  • spark-sql(jdbc)本地模式導(dǎo)出csv或Excel文件

    注意: 當(dāng)前excel和commons-io版本都是較較新版本,而commons-io在spark的jars安裝目錄下也在commons-io的包,如版本沖突,找不到 orgapachecommonsiooutputByteArrayOutputStream.class 。如果spark的是2.4或者更低版本,則找不到 orgapachecommonsiooutputUnsynchronizedByteArrayOutputStream.class ,請同步spa

    2024年02月02日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包