spark-sql字段血緣實現(xiàn)
背景
Apache Spark是一個開源的大數(shù)據(jù)處理框架,它提供了一種高效、易于使用的方式來處理大規(guī)模數(shù)據(jù)集。在Spark中,數(shù)據(jù)是通過DataFrame和Dataset的形式進(jìn)行操作的,這些數(shù)據(jù)結(jié)構(gòu)包含了一系列的字段(也稱為列)。字段血緣是Spark中的一個關(guān)鍵概念,它幫助我們理解數(shù)據(jù)的來源和流向,從而更好地理解和控制數(shù)據(jù)處理過程。
字段血緣是指在數(shù)據(jù)處理過程中,一個字段的值是如何從源數(shù)據(jù)產(chǎn)生并傳遞給目標(biāo)數(shù)據(jù)的。在Spark中,字段血緣是通過依賴關(guān)系進(jìn)行管理的。每個字段都有一個或多個依賴關(guān)系,這些依賴關(guān)系定義了字段的值如何從其他字段或數(shù)據(jù)源產(chǎn)生。
前提
spark版本:2.4.3
使用語言:java+scala
技術(shù)實現(xiàn)
1. spark-sql的執(zhí)行計劃,了解如何實現(xiàn)字段血緣解析
一個sql會經(jīng)歷一些列的處理,最終生成spark-core的代碼,提交到集群運行。
首先看一下一個簡單的sql生成的邏輯執(zhí)行計劃長什么樣子
insert into default.jy_test
select * from default.jy_test
未解析的邏輯執(zhí)行計劃:
'InsertIntoTable 'UnresolvedRelation `default`.`jy_test`, false, false
+- 'Project [*]
+- 'UnresolvedRelation `default`.`jy_test`
解析后(analyzer)的邏輯執(zhí)行計劃:
InsertIntoHiveTable `default`.`jy_test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [id, name]
+- Project [id#0, name#1]
+- SubqueryAlias `default`.`jy_test`
+- HiveTableRelation `default`.`jy_test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#0, name#1]
優(yōu)化后(optimizer)的邏輯執(zhí)行計劃:
InsertIntoHiveTable `default`.`jy_test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [id, name]
+- HiveTableRelation `default`.`jy_test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#0, name#1]
重點來了
- 所謂的邏輯執(zhí)行計劃就是一個樹形結(jié)構(gòu)
- 樹形結(jié)構(gòu)中的葉子結(jié)點,就是hive的表信息:庫名、表名、字段信息,并且spark給每一個字段生成了一個唯一的id
- 樹形結(jié)構(gòu)中的非葉子只包含了字段信息,不包含庫表信息
所以想要實現(xiàn)字段血緣,我們需要做的就是通過那個生成的唯一id去一層層的關(guān)聯(lián),當(dāng)關(guān)聯(lián)到葉子結(jié)點的時候,就找到了庫名表名
2. 構(gòu)建一顆與解析后的邏輯執(zhí)行計劃一模一樣的樹形結(jié)構(gòu)
- 首先定義node對象,用來存放節(jié)點信息
public abstract class Node {
private String name;
private List<Column> columnList = new ArrayList<>();
private List<Node> children = new ArrayList<>();
private Node parentNode;
private String graphId;
}
- 其次定義column對象,用來存放字段信息
public class Column {
private String name;
private Long exprId;
private String ColumnType;
private ArrayList<Column> child = new ArrayList<Column>();
private String tableName;
private String process;
}
-
根據(jù)spark-sql生成的邏輯執(zhí)行計劃,我們?yōu)槊恳粋€邏輯節(jié)點創(chuàng)建對應(yīng)的結(jié)點,由于結(jié)點很多,我這里直接給個截圖,源碼會在文章最后提供出來
-
解析spark-sql生成的解析后的邏輯執(zhí)行計劃
首先獲取邏輯執(zhí)行計劃,這里提供兩種方式:
1.通過spark-session獲取,該方法可以用來做測試,非常的方便LogicalPlan logicalPlan = spark.sessionState().sqlParser().parsePlan(sql); LogicalPlan analyzer = spark.sessionState().analyzer().execute(logicalPlan);
2.通過QueryExecution獲得,這里貼個圖,詳情看源碼
-
解析spark生成的analyzer,構(gòu)建我們自己的樹形結(jié)構(gòu)
這里貼一下主要的邏輯,使用scala去遞歸解析抽象語法樹會方便很多
def resolveLogicPlan(plan: LogicalPlan, root: Node): Unit = {
plan match {
case plan: InsertIntoHadoopFsRelationCommand =>
val node = root.asInstanceOf[Root]
node.setName(NodeType.INSERTINTOHIVETABLE.getName)
val database: String = plan.catalogTable.get.identifier.database.getOrElse("default")
val table: String = plan.catalogTable.get.identifier.table
val fullTableName = database + "." + table
plan.catalogTable.get.schema.foreach { field => {
val column = new Column()
column.setName(field.name)
column.setTableName(fullTableName)
node.getColumnList.add(column)
}
}
resolveLogicPlan(plan.query, node)
case plan: SaveIntoDataSourceCommand =>
val table: String = plan.options.get("dbtable").getOrElse("")
val url: String = plan.options.get("url").getOrElse("")
val user: String = plan.options.get("user").getOrElse("")
val password: String = plan.options.get("password").getOrElse("")
// 定義匹配數(shù)據(jù)庫名稱的正則表達(dá)式模式
val pattern: Regex = ".*://[^/]+/(\\w+)".r
// 使用正則表達(dá)式進(jìn)行匹配
val dbNameOption: Option[String] = pattern.findFirstMatchIn(url).map(_.group(1))
val fullTableName = dbNameOption.getOrElse("") + "." + table
val node = root.asInstanceOf[Root]
node.setName(NodeType.SAVEINTODATASOURCECOMMAND.getName)
// 連接mysql,根據(jù)庫名表明獲取字段列表
val fieldsList = getFieldsListFromMysql(url, user, password, table)
fieldsList.foreach { field => {
val column = new Column()
column.setName(field)
column.setTableName(fullTableName)
node.getColumnList.add(column)
}
}
resolveLogicPlan(plan.query, node)
case plan: InsertIntoHiveTable =>
val node = root.asInstanceOf[Root]
node.setName(NodeType.INSERTINTOHIVETABLE.getName)
val database: String = plan.table.identifier.database.getOrElse("default")
val table: String = plan.table.identifier.table
val fullTableName = database + "." + table
node.setTableName(fullTableName)
plan.table.schema.foreach { field => {
val column = new Column()
column.setName(field.name)
column.setTableName(fullTableName)
node.getColumnList.add(column)
}
}
resolveLogicPlan(plan.query, node)
case plan: Aggregate =>
val node = new AggregateNode()
insertNodeColumnsFromNamedExpression(node, plan.aggregateExpressions)
node.setParentNode(root)
root.getChildren.add(node)
resolveLogicPlan(plan.child, node)
case plan: Project =>
val node = new ProjectNode()
insertNodeColumnsFromNamedExpression(node, plan.projectList)
node.setParentNode(root)
root.getChildren.add(node)
resolveLogicPlan(plan.child, node)
case plan: LogicalRelation =>
val node = new LogicalRelationNode()
dfsLogicalRelation(plan, node)
node.setParentNode(root)
root.getChildren.add(node)
case plan: HiveTableRelation =>
val node = new LogicalRelationNode()
dfsLogicalRelation(plan, node)
node.setParentNode(root)
root.getChildren.add(node)
case plan: Filter =>
val node = new FilterNode()
node.setParentNode(root)
node.setCondition(plan.condition.toString)
root.getChildren.add(node)
resolveLogicPlan(plan.child, node)
case plan: Join =>
val node = new JoinNode()
node.setName(plan.joinType.toString + " " + node.getName)
node.setParentNode(root)
node.setCondition(plan.condition.toString)
root.getChildren.add(node)
resolveLogicPlan(plan.left, node)
resolveLogicPlan(plan.right, node)
case plan: Window =>
val node = new WindowNode()
insertNodeColumnsFromNamedExpression(node, plan.windowExpressions)
node.setParentNode(root)
root.getChildren.add(node)
resolveLogicPlan(plan.child, node)
case plan: Union =>
val node = new UnionNode()
node.setParentNode(root)
root.getChildren.add(node)
plan.children.foreach(resolveLogicPlan(_, node))
case plan: SubqueryAlias =>
val node = new SubqueryNode()
node.setName(node.getName + " " + plan.name.toString())
node.setParentNode(root)
root.getChildren.add(node)
resolveLogicPlan(plan.child, node)
case plan: Generate =>
val node = new GenerateNode()
processGenerate(plan, node)
node.setParentNode(root)
root.getChildren.add(node)
resolveLogicPlan(plan.child, node)
case _ =>
plan.children.foreach(resolveLogicPlan(_, root))
}
}
-
到這里,我們已經(jīng)得到了自己的樹形結(jié)構(gòu)。接下來要通過唯一id進(jìn)行關(guān)聯(lián),補充庫表信息。
不知道大家注意沒有,我們在node對象中有一個方法,
這里用到了訪問者設(shè)計模式,感興趣的同學(xué)可以學(xué)習(xí)一下,在spark-sql源碼中,同樣用的是訪問者設(shè)計模式。
這里主要說一下Visitor的定義及方法:processColumn方法主要是拿自己的ExprId和所有孩子結(jié)點的ExprId比較,如果相等的話,說明是同一個字段,那就表名復(fù)制過來。
public interface Visitor {
void visit(Node node);
default void processColumn(Node node) {
for (Column column1 : node.getColumnList()) {
for (Node nd : node.getChildren()) {
for (Column column2 : nd.getColumnList()) {
processColumn(column1, column2);
}
}
}
}
default void processColumn(Column column1, Column column2) {
List<Column> child = column1.getChild();
child.forEach(ch -> processColumn(ch, column2));
if (column1.getExprId().equals(column2.getExprId())) {
if(column2.getTableName() != null) {
column1.setTableName(column2.getTableName());
} else {
column1.getChild().addAll(column2.getChild());
}
}
}
}
LineageVisitor是Visitor的實現(xiàn)類, 主要用來做模式匹配,不同的結(jié)點處理方式會有不同,感興趣的同學(xué)看一下這塊的代碼。
public class LineageVisitor implements Visitor{
@Override
public void visit(Node node) {
switch (node.getClass().getSimpleName()) {
case "FilterNode" :
case "SubqueryNode" :
case "JoinNode" : copyChildColumnToThis(node); break;
case "WindowNode" :
case "GenerateNode" : copyChildColumnToThisWithProcess(node); break;
case "Root" : processColumn((Root)node); break;
case "UnionNode" : processColumn((UnionNode)node); break;
default : processColumn(node);
}
}
@Override
public void processColumn(Node node) {
node.getChildren().forEach( child -> child.accept(this));
Visitor.super.processColumn(node);
}
public void copyChildColumnToThis(Node node) {
node.getChildren().forEach( child -> child.accept(this));
for (Node child : node.getChildren()) {
node.getColumnList().addAll(child.getColumnList());
}
}
public void copyChildColumnToThisWithProcess(Node node) {
node.getChildren().forEach( child -> child.accept(this));
Visitor.super.processColumn(node);
for (Node child : node.getChildren()) {
node.getColumnList().addAll(child.getColumnList());
}
}
public void processColumn(Root node) {
node.getChildren().forEach( child -> child.accept(this));
if(node.getColumnList().size() > 0) {
for (int i = 0; i < node.getChildren().get(0).getColumnList().size(); i++) {
for (Node child : node.getChildren()) {
node.getColumnList().get(i).getChild().add(child.getColumnList().get(i));
}
}
}
}
public void processColumn(UnionNode node) {
node.getChildren().forEach( child -> child.accept(this));
int size = node.getChildren().get(0).getColumnList().size();
for (Column column : node.getChildren().get(0).getColumnList()) {
Column column1 = new Column();
column1.setName(column.getName());
column1.setExprId(column.getExprId());
node.getColumnList().add(column1);
}
for (int i = 0; i < size; i++) {
for (Node child : node.getChildren()) {
node.getColumnList().get(i).getChild().add(child.getColumnList().get(i));
}
}
}
}
成果展示
還是拿最開始的sql ,看一下最終生成的字段血緣
insert into default.jy_test select * from default.jy_test
文章來源:http://www.zghlxwxcb.cn/news/detail-786451.html
最后
字段血緣實現(xiàn)起來還是比較困難的,需要了解spak-sql的底層原理和一些技巧。
這里方便大家使用、學(xué)習(xí)、交流,所以貢獻(xiàn)自己的源碼,倉庫地址:https://gitee.com/chenxiaoliang0901/crock/tree/main文章來源地址http://www.zghlxwxcb.cn/news/detail-786451.html
到了這里,關(guān)于spark-sql字段血緣實現(xiàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!