關(guān)于calcite的概念相關(guān)的內(nèi)容,在我另一篇帖子
深入理解flinksql執(zhí)行流程,擴(kuò)展解析器實(shí)現(xiàn)語法的擴(kuò)展
1 CodeGen
首先闡述一下 codegen:
Codegen是基于ObjectWeb ASM的低開銷的java代碼生成器,他可以根據(jù)預(yù)先填好的規(guī)則與條件,通過編譯代碼,自動(dòng)生成java類
在遞歸調(diào)用各個(gè)節(jié)點(diǎn) DataStreamRel 的 translateToPlan 方法時(shí),會(huì)利用CodeGen元編程成Flink的各種算子,就相當(dāng)于我們直接利用Flink的DataSet或DataStream API開發(fā)的程序。
還是以上面的Demo為例,跟蹤進(jìn) DataStreamScan 的 translateToPlan 方法中,會(huì)發(fā)現(xiàn)相關(guān)邏輯:
- 首先生成 function 代碼的字符串形式,并封裝成 GeneratedFunction 對(duì)象;
- 然后使用 CodeGen 進(jìn)行編譯;
- 在需要使用 Function 的時(shí)候使用反射進(jìn)行加載使用。
后續(xù)在 擴(kuò)展 flink語法(如join維表)時(shí),需要針對(duì)上述步驟,拼接生成 function 的字符串形式。
2 flink 語法擴(kuò)展
了解完 Flink Sql 的執(zhí)行流程之后,就可以針對(duì) Flink Sql 做語法、功能上的擴(kuò)展。
在Flink老版本上,F(xiàn)link不支持 COUNT(DISTINCT aaa) 語法,但是如果需要對(duì) Flink 做此功能拓展,需要結(jié)合 前面說到的 Flink Sql 執(zhí)行流程,做相應(yīng)修改。
修改點(diǎn):
- 在進(jìn)行 Rule 規(guī)則匹配時(shí),放開對(duì) Distinct 的限制
- DataStreamRelNode 轉(zhuǎn)為 DataStream 過程中,拼接CodeGen所需的 Function String
2.1 在進(jìn)行 Rule 規(guī)則匹配時(shí),放開對(duì) Distinct 的限制
在 DATASTREAM_OPT_RULES.DataStreamGroupWindowAggregateRule 中放開對(duì) Distinct 的限制:
2.2下面附上一個(gè) 利用codegen來生成所需類的例子:
新建一個(gè)項(xiàng)目 ,從源碼中拷貝出codegen的代碼文件夾
在配置文件中,添加好,sql的保留字,關(guān)鍵字,類的名字等信息,這里就不多說了,有需要的同學(xué)可以百度具體的原理技術(shù)
新建一個(gè)SqlUseFunction.java 這個(gè)就是上文中說到的function 代碼的字符串形式,
在flink中,就是通過 拼裝來拼裝出一個(gè)類,調(diào)用codegen來進(jìn)行編譯得到繼承抽象類SqlNode 的方法,所以在開發(fā)完的源代碼中是找不到codegen相關(guān)的東西的,但實(shí)際他是參與了工作的。
package com;
import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;
import java.util.List;
public class SqlUseFunction extends SqlCall {
private static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("USE FUNCTION",
SqlKind.OTHER_FUNCTION);
private final SqlIdentifier funcName;
private final SqlNodeList funcProps;
/**
* SqlUseFunction constructor.
*
* @param pos sql define location
* @param funcName function name
* @param funcProps function property
* */
public SqlUseFunction(SqlParserPos pos, SqlIdentifier funcName, SqlNodeList funcProps) {
super(pos);
this.funcName = funcName;
this.funcProps = funcProps;
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("USE FUNCTION");
funcName.unparse(writer, leftPrec, rightPrec);
if (funcProps != null) {
writer.keyword("WITH");
SqlWriter.Frame frame = writer.startList("(", ")");
for (SqlNode c : funcProps) {
writer.sep(",");
c.unparse(writer, 0, 0);
}
writer.endList(frame);
}
}
@Override
public SqlOperator getOperator() {
return OPERATOR;
}
@Override
public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(funcName, funcProps);
}
}
pom文件中指定好,使用fmpp技術(shù),以及codegen的地址等
<build>
<plugins>
<!-- adding fmpp code gen -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-fmpp-resources</id>
<phase>initialize</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/codegen</outputDirectory>
<resources>
<resource>
<directory>src/main/codegen</directory>
<filtering>false</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<!-- 從calcite-core.jar提取解析器語法模板,并放入在${project.build}freemarker模板所在的目錄 -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>unpack-parser-template</id>
<phase>initialize</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.18.0</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/</outputDirectory>
<includes>**/Parser.jj</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<configuration>
<cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
<outputDirectory>target/generated-sources</outputDirectory>
<templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
</configuration>
<groupId>com.googlecode.fmpp-maven-plugin</groupId>
<artifactId>fmpp-maven-plugin</artifactId>
<version>1.0</version>
<dependencies>
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
<version>2.3.28</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>generate-fmpp-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<id>javacc</id>
<goals>
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
<includes>
<include>**/Parser.jj</include>
</includes>
<lookAhead>2</lookAhead>
<isStatic>false</isStatic>
<outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
這里引入一張圖 說明calcite中jacc和fmpp在其中起的作用
看到codegen中的fmpp了么 就是這個(gè)fmpp
3 flink使用calcite 生成解析器FlinkSqlParserImpl
以下面這個(gè)案例出發(fā)(代碼基于 flink 1.13.1 版本):
public class ParserTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
DataStream<Tuple3<String, Long, Long>> tuple3DataStream =
env.fromCollection(Arrays.asList(
Tuple3.of("2", 1L, 1627254000000L),
Tuple3.of("2", 1L, 1627218000000L + 5000L),
Tuple3.of("2", 101L, 1627218000000L + 6000L),
Tuple3.of("2", 201L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 86400000 + 7000L)))
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Long>>(Time.seconds(0L)) {
@Override
public long extractTimestamp(Tuple3<String, Long, Long> element) {
return element.f2;
}
});
tEnv.registerFunction("mod", new Mod_UDF());
tEnv.registerFunction("status_mapper", new StatusMapper_UDF());
tEnv.createTemporaryView("source_db.source_table", tuple3DataStream,
"status, id, timestamp, rowtime.rowtime");
String sql = "SELECT\n"
+ " count(1),\n"
+ " cast(tumble_start(rowtime, INTERVAL '1' DAY) as string)\n"
+ "FROM\n"
+ " source_db.source_table\n"
+ "GROUP BY\n"
+ " tumble(rowtime, INTERVAL '1' DAY)";
Table result = tEnv.sqlQuery(sql);
tEnv.toAppendStream(result, Row.class).print();
env.execute();
}
}
debug 過程如之前分析 sql -> SqlNode 過程所示,如下圖直接定位到 SqlParser:
如上圖可以看到具體的 Parser 就是 FlinkSqlParserImpl。
定位到具體的代碼如下圖所示(flink-table-palnner-blink-2.11-1.13.1.jar)。
最終 parse 的結(jié)果 SqlNode 如下圖。
再來看看 FlinkSqlParserImpl 是怎么使用 calcite 生成的。
具體到 flink 中的實(shí)現(xiàn),位于源碼中的 flink-table.flink-sql-parser 模塊(源碼基于 flink 1.13.1)。
flink 是依賴 maven 插件實(shí)現(xiàn)的上面的整體流程。
3.1 FlinkSqlParserImpl 的生成
接下來看看整個(gè) Parser 生成流程。
3.1.1 flink 引入 calcite
使用 maven-dependency-plugin 將 calcite 解壓到 flink 項(xiàng)目 build 目錄下。
<plugin>
<!-- Extract parser grammar template from calcite-core.jar and put
it under ${project.build.directory} where all freemarker templates are. -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>unpack-parser-template</id>
<phase>initialize</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/</outputDirectory>
<includes>**/Parser.jj</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
3.1.2 fmpp 生成 Parser.jj
使用 maven-resources-plugin 將 Parser.jj 代碼生成。
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-fmpp-resources</id>
<phase>initialize</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/codegen</outputDirectory>
<resources>
<resource>
<directory>src/main/codegen</directory>
<filtering>false</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.googlecode.fmpp-maven-plugin</groupId>
<artifactId>fmpp-maven-plugin</artifactId>
<version>1.0</version>
<dependencies>
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
<version>2.3.28</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>generate-fmpp-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
<outputDirectory>target/generated-sources</outputDirectory>
<templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
</configuration>
</execution>
</executions>
</plugin>
3.1.3 javacc 生成 parser
使用 javacc 將根據(jù) Parser.jj 文件生成 Parser。
<plugin>
<!-- This must be run AFTER the fmpp-maven-plugin -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<id>javacc</id>
<goals>
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
<includes>
<include>**/Parser.jj</include>
</includes>
<!-- This must be kept synced with Apache Calcite. -->
<lookAhead>1</lookAhead>
<isStatic>false</isStatic>
<outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
3.1.4 看看 Parser
最終生成的 Parser 就是 FlinkSqlParserImpl。
3.1.5 blink planner 引入 flink-sql-parser
blink planner(flink-table-planner-blink) 在打包時(shí)將 flink-sql-parser、flink-sql-parser-hive 打包進(jìn)去。
4 calcite 規(guī)則優(yōu)化器
4.1 什么是查詢優(yōu)化器
查詢優(yōu)化器是傳統(tǒng)數(shù)據(jù)庫的核心模塊,也是大數(shù)據(jù)計(jì)算引擎的核心模塊,開源大數(shù)據(jù)引擎如 Impala、Presto、Drill、HAWQ、 Spark、Hive 等都有自己的查詢優(yōu)化器。Calcite 就是從 Hive 的優(yōu)化器演化而來的。
優(yōu)化器的作用:將解析器生成的關(guān)系代數(shù)表達(dá)式轉(zhuǎn)換成執(zhí)行計(jì)劃,供執(zhí)行引擎執(zhí)行,在這個(gè)過程中,會(huì)應(yīng)用一些規(guī)則優(yōu)化,以幫助生成更高效的執(zhí)行計(jì)劃。
4.2 基于規(guī)則優(yōu)化(RBO)
基于規(guī)則的優(yōu)化器(Rule-Based Optimizer,RBO):根據(jù)優(yōu)化規(guī)則對(duì)關(guān)系表達(dá)式進(jìn)行轉(zhuǎn)換,這里的轉(zhuǎn)換是說一個(gè)關(guān)系表達(dá)式經(jīng)過優(yōu)化規(guī)則后會(huì)變成另外一個(gè)關(guān)系表達(dá)式,同時(shí)原有表達(dá)式會(huì)被裁剪掉,經(jīng)過一系列轉(zhuǎn)換后生成最終的執(zhí)行計(jì)劃。
RBO 中包含了一套有著嚴(yán)格順序的優(yōu)化規(guī)則,同樣一條 SQL,無論讀取的表中數(shù)據(jù)是怎么樣的,最后生成的執(zhí)行計(jì)劃都是一樣的。同時(shí),在 RBO 中 SQL 寫法的不同很有可能影響最終的執(zhí)行計(jì)劃,從而影響執(zhí)行計(jì)劃的性能。
4.3 基于成本優(yōu)化(CBO)
基于代價(jià)的優(yōu)化器(Cost-Based Optimizer,CBO):根據(jù)優(yōu)化規(guī)則對(duì)關(guān)系表達(dá)式進(jìn)行轉(zhuǎn)換,這里的轉(zhuǎn)換是說一個(gè)關(guān)系表達(dá)式經(jīng)過優(yōu)化規(guī)則后會(huì)生成另外一個(gè)關(guān)系表達(dá)式,同時(shí)原有表達(dá)式也會(huì)保留,經(jīng)過一系列轉(zhuǎn)換后會(huì)生成多個(gè)執(zhí)行計(jì)劃,然后 CBO 會(huì)根據(jù)統(tǒng)計(jì)信息和代價(jià)模型 (Cost Model) 計(jì)算每個(gè)執(zhí)行計(jì)劃的 Cost,從中挑選 Cost 最小的執(zhí)行計(jì)劃。
由上可知,CBO 中有兩個(gè)依賴:統(tǒng)計(jì)信息和代價(jià)模型。統(tǒng)計(jì)信息的準(zhǔn)確與否、代價(jià)模型的合理與否都會(huì)影響 CBO 選擇最優(yōu)計(jì)劃。 從上述描述可知,CBO 是優(yōu)于 RBO 的,原因是 RBO 是一種只認(rèn)規(guī)則,對(duì)數(shù)據(jù)不敏感的呆板的優(yōu)化器,而在實(shí)際過程中,數(shù)據(jù)往往是有變化的,通過 RBO 生成的執(zhí)行計(jì)劃很有可能不是最優(yōu)的。事實(shí)上目前各大數(shù)據(jù)庫和大數(shù)據(jù)計(jì)算引擎都傾向于使用 CBO,但是對(duì)于流式計(jì)算引擎來說,使用 CBO 還是有很大難度的,因?yàn)椴⒉荒芴崆邦A(yù)知數(shù)據(jù)量等信息,這會(huì)極大地影響優(yōu)化效果,CBO 主要還是應(yīng)用在離線的場(chǎng)景。
4.4 優(yōu)化規(guī)則
無論是 RBO,還是 CBO 都包含了一系列優(yōu)化規(guī)則,這些優(yōu)化規(guī)則可以對(duì)關(guān)系表達(dá)式進(jìn)行等價(jià)轉(zhuǎn)換,常見的優(yōu)化規(guī)則包含:
- 謂詞下推 Predicate Pushdown
- 常量折疊 Constant Folding
- 列裁剪 Column Pruning
- 其他
在 Calcite 的代碼里,有一個(gè)測(cè)試類(org.apache.calcite.test.RelOptRulesTest)匯集了對(duì)目前內(nèi)置所有 Rules 的測(cè)試 case,這個(gè)測(cè)試類可以方便我們了解各個(gè) Rule 的作用。在這里有下面一條 SQL,通過這條語句來說明一下上面介紹的這三種規(guī)則。
select 10 + 30, users.name, users.age
from users join jobs on users.id= user.id
where users.age > 30 and jobs.id>10
4.4.1 謂詞下推(Predicate Pushdown)
關(guān)于謂詞下推,它主要還是從關(guān)系型數(shù)據(jù)庫借鑒而來,關(guān)系型數(shù)據(jù)中將謂詞下推到外部數(shù)據(jù)庫用以減少數(shù)據(jù)傳輸;屬于邏輯優(yōu)化,優(yōu)化器將謂詞過濾下推到數(shù)據(jù)源,使物理執(zhí)行跳過無關(guān)數(shù)據(jù)。最常見的例子就是 join 與 filter 操作一起出現(xiàn)時(shí),提前執(zhí)行 filter 操作以減少處理的數(shù)據(jù)量,將 filter 操作下推,以上面例子為例,示意圖如下(對(duì)應(yīng) Calcite 中的 FilterJoinRule.FilterIntoJoinRule.FILTER_ON_JOIN Rule):
在進(jìn)行 join 前進(jìn)行相應(yīng)的過濾操作,可以極大地減少參加 join 的數(shù)據(jù)量。
4.4.2 常量折疊(Constant Folding)
常量折疊也是常見的優(yōu)化策略,這個(gè)比較簡(jiǎn)單、也很好理解,可以看下 編譯器優(yōu)化 – 常量折疊 這篇文章,基本不用動(dòng)腦筋就能理解,對(duì)于我們這里的示例,有一個(gè)常量表達(dá)式 10 + 30,如果不進(jìn)行常量折疊,那么每行數(shù)據(jù)都需要進(jìn)行計(jì)算,進(jìn)行常量折疊后的結(jié)果如下圖所示( 對(duì)應(yīng) Calcite 中的 ReduceExpressionsRule.PROJECT_INSTANCE Rule):
4.4.3 列裁剪(Column Pruning)
列裁剪也是一個(gè)經(jīng)典的優(yōu)化規(guī)則,在本示例中對(duì)于jobs 表來說,并不需要掃描它的所有列值,而只需要列值 id,所以在掃描 jobs 之后需要將其他列進(jìn)行裁剪,只留下列 id。這個(gè)優(yōu)化帶來的好處很明顯,大幅度減少了網(wǎng)絡(luò) IO、內(nèi)存數(shù)據(jù)量的消耗。裁剪前后的示意圖如下(不過并沒有找到 Calcite 對(duì)應(yīng)的 Rule):
如果想自定義rule
可以百度 calcite 自定義rule 來實(shí)現(xiàn)對(duì)flinksql的 語法優(yōu)化
擴(kuò)展資料
Apache Calcite的優(yōu)化器規(guī)則解析
Flink Sql 之 Calcite Volcano優(yōu)化器(源碼解析)
calcite
calcite 規(guī)則優(yōu)化開發(fā)文章來源:http://www.zghlxwxcb.cn/news/detail-835917.html
所有代碼都在我的git上,需要的同學(xué)可以自取,如果找不到可以私信我文章來源地址http://www.zghlxwxcb.cn/news/detail-835917.html
到了這里,關(guān)于calcite在flink中的二次開發(fā),介紹解析器與優(yōu)化器的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!