一、前言
本文基于flink-1.13.6
SQL Client: Init scripts and Statement Sets
這個(gè)版本極大地改進(jìn)了 SQL 客戶端的功能。現(xiàn)在 SQL Client 和 SQL 腳本都支持 通過Java 應(yīng)用程序執(zhí)行的幾乎所有操作(從 TableEnvironment 以編程方式啟動(dòng)查詢)。這意味著 SQL 用戶在 SQL 部署中需要的代碼少了很多。其中最核心的功能就是支持了 -i
命令用來初始化腳本,-f
命令用來執(zhí)行 SQL 語句,之前的 YAML 文件這個(gè)版本不再支持了,相反更多的是通過 SQL 腳本的方式來配置會(huì)話和提交任務(wù).
類似于下面這種方式:
sql-client.sh -i init.sql -f test.sql
1.1、 -i 初始化 SQL Client
SET execution.runtime-mode=batch;
SET sql-client.execution.result-mode=TABLEAU;
SET pipeline.name=batch_demo
init.sql 初始化腳本文件支持的功能還非常多,我這里就簡單的設(shè)置了幾個(gè),更多的屬性可以參考官網(wǎng).
使用 -i <init.sql> 選項(xiàng)初始化 SQL Client 會(huì)話時(shí),初始化 SQL 文件中允許以下語句:
DDL(CREATE/DROP/ALTER),
USE CATALOG/DATABASE,
LOAD/UNLOAD MODULE,
SET command,
RESET command.
1.2、-f SQL腳本
create table rate_history (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3),
WATERMARK FOR update_time AS update_time
) WITH (
'connector.type' = 'filesystem',
'connector.path' = '/tmp/ratesHistory.csv',
'format.type' = 'csv'
);
CREATE TABLE printb
(
num bigint
)
WITH ('connector' = 'print');
-- 兩條sql語句
insert into printb select count(1) from rate_history;
insert into printb select count(1) from rate_history;
執(zhí)行:
./bin/sql-client.sh -i test/init.sql -f test/batch.sql
查看flink web 頁面發(fā)現(xiàn)兩個(gè)job
SQL Client 將每個(gè) INSERT INTO 語句作為單個(gè) Flink 作業(yè)執(zhí)行。但是,由于管道的某些部分可以重復(fù)使用,因此有時(shí)不是最佳選擇。
SQL Client 支持 STATEMENT SET 語法來執(zhí)行一組 SQL 語句。這是 Table API 中StatementSet 的等效功能。STATEMENT SET 語法包含一個(gè)或多個(gè) INSERT INTO 語句。全面優(yōu)化了STATEMENT SET 塊中的所有語句,并將其作為單個(gè) Flink 作業(yè)執(zhí)行。聯(lián)合優(yōu)化和執(zhí)行允許重用常見的中間結(jié)果,因此可以顯著提高執(zhí)行多個(gè)查詢的效率。
STATEMENT SET 的語法格式如下:
BEGIN STATEMENT SET;
-- one or more INSERT INTO statements
{ INSERT INTO|OVERWRITE <select_statement>; }+
END;
-- 修改上面的sql腳本
-- 兩條sql語句
BEGIN STATEMENT SET;
insert into printb select count(1) from rate_history;
insert into printb select count(1) from rate_history;
END;
.接下來就來看一下底層源碼是怎么實(shí)現(xiàn)的.
二、源碼分析
2.1、從sql-client.sh 找到執(zhí)行的入口類是 org.apache.flink.table.client.SqlClient
然后來看下 SqlClient 對(duì)象屬性源碼如下:
public class SqlClient {
// 標(biāo)記是否是 embedded 模式
private final boolean isEmbedded;
// 提交命令選項(xiàng)
private final CliOptions options;
// 用來返回結(jié)果的
private final Supplier<Terminal> terminalFactory;
// 目前只支持 embedded
public static final String MODE_EMBEDDED = "embedded";
public static final String MODE_GATEWAY = "gateway";
// ...
}
2.2、接著來看 SqlClient 的 main 方法,也就是程序的入口
main 方法里面調(diào)用的是 startClient 方法,所以直接來看 startClient 方法的源碼:
@VisibleForTesting
protected static void startClient(String[] args, Supplier<Terminal> terminalFactory) {
final String mode;
final String[] modeArgs;
// 設(shè)置啟動(dòng)模式默認(rèn)是 embedded
if (args.length < 1 || args[0].startsWith("-")) {
// mode is not specified, use the default `embedded` mode
mode = MODE_EMBEDDED;
modeArgs = args;
} else {
// mode is specified, extract the mode value and reaming args
mode = args[0];
// remove mode
modeArgs = Arrays.copyOfRange(args, 1, args.length);
}
switch (mode) {
case MODE_EMBEDDED:
// 解析提交命令里的參數(shù)
final CliOptions options = CliOptionsParser.parseEmbeddedModeClient(modeArgs);
// 打印參數(shù)說明
if (options.isPrintHelp()) {
CliOptionsParser.printHelpEmbeddedModeClient();
} else {
try {
// 構(gòu)建 SqlClient 對(duì)象
final SqlClient client = new SqlClient(true, options, terminalFactory);
client.start();
} catch (SqlClientException e) {
//...
}
}
break;
case MODE_GATEWAY:
// gateway 模式暫時(shí)不支持
throw new SqlClientException("Gateway mode is not supported yet.");
default:
CliOptionsParser.printHelpClient();
}
}
2.2.1、解析參數(shù)
調(diào)用 parseEmbeddedModeClient 方法解析提交命令里面的各種參數(shù).包括我們上面用到的 -i 和 -f 都是在這一步解析并賦值的.
public static CliOptions parseEmbeddedModeClient(String[] args) {
try {
DefaultParser parser = new DefaultParser();
CommandLine line = parser.parse(EMBEDDED_MODE_CLIENT_OPTIONS, args, true);
return new CliOptions(
line.hasOption(CliOptionsParser.OPTION_HELP.getOpt()),
checkSessionId(line),
// 解析 -i 初始化文件
checkUrl(line, CliOptionsParser.OPTION_INIT_FILE),
// 解析 -f sql腳本
checkUrl(line, CliOptionsParser.OPTION_FILE),
checkUrls(line, CliOptionsParser.OPTION_JAR),
checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
getPythonConfiguration(line));
} catch (ParseException e) {
throw new SqlClientException(e.getMessage());
}
}
public static final Option OPTION_INIT_FILE =
Option.builder("i")
.required(false)
.longOpt("init")
.numberOfArgs(1)
.argName("initialization file")
.desc(
"Script file that used to init the session context. "
+ "If get error in execution, the sql client will exit. Notice it's not allowed to add query or insert into the init file.")
.build();
public static final Option OPTION_FILE =
Option.builder("f")
.required(false)
.longOpt("file")
.numberOfArgs(1)
.argName("script file")
.desc(
"Script file that should be executed. In this mode, "
+ "the client will not open an interactive terminal.")
.build();
2.2.2、構(gòu)建 SqlClient
final SqlClient client = new SqlClient(true, options, terminalFactory);
2.2.3、啟動(dòng) SqlClient
client.start();
private void start() {
if (isEmbedded) {
// create local executor with default environment
DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
// 創(chuàng)建一個(gè) LocalExecutor 對(duì)象,用于本地執(zhí)行程序
final Executor executor = new LocalExecutor(defaultContext);
executor.start();
// Open an new session
String sessionId = executor.openSession(options.getSessionId());
try {
// add shutdown hook
Runtime.getRuntime()
.addShutdownHook(new EmbeddedShutdownThread(sessionId, executor));
// do the actual work 真正執(zhí)行 SQL 的地方
openCli(sessionId, executor);
} finally {
executor.closeSession(sessionId);
}
}
else {
throw new SqlClientException("Gateway mode is not supported yet.");
}
}
2.2.4、真正執(zhí)行 SQL 的地方是 openCli 方法
/**
* Opens the CLI client for executing SQL statements.
*
* @param sessionId session identifier for the current client.
* @param executor executor
*/
private void openCli(String sessionId, Executor executor) {
Path historyFilePath;
if (options.getHistoryFilePath() != null) {
historyFilePath = Paths.get(options.getHistoryFilePath());
} else {
historyFilePath =
Paths.get(
System.getProperty("user.home"),
SystemUtils.IS_OS_WINDOWS ? "flink-sql-history" : ".flink-sql-history");
}
boolean hasSqlFile = options.getSqlFile() != null;
boolean hasUpdateStatement = options.getUpdateStatement() != null;
if (hasSqlFile && hasUpdateStatement) {
throw new IllegalArgumentException(//...
}
try (CliClient cli = new CliClient(terminalFactory, sessionId, executor, historyFilePath)) {
// 執(zhí)行初始化 SQL -i 參數(shù)
if (options.getInitFile() != null) {
boolean success = cli.executeInitialization(readFromURL(options.getInitFile()));
if (!success) { // ...}
}
if (!hasSqlFile && !hasUpdateStatement) {
cli.executeInInteractiveMode();
} else {
// 執(zhí)行真正的 SQL 文件 -f
cli.executeInNonInteractiveMode(readExecutionContent());
}
}
}
這個(gè)里面會(huì)先獲取 historyFilePath 的路徑,然后判斷是否存在 -i -f 這兩個(gè)文件,如果有的話會(huì)先調(diào)用 executeInitialization 執(zhí)行初始化的腳本.實(shí)際調(diào)用的是 executeInitialization#executeFile 方法來執(zhí)行腳本,executeFile 的源碼如下:
private boolean executeFile(String content, ExecutionMode mode) {
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_EXECUTE_FILE).toAnsi());
for (String statement : CliStatementSplitter.splitContent(content)) {
terminal.writer()
.println(
new AttributedString(String.format("%s%s", prompt, statement))
.toString());
terminal.flush();
// 執(zhí)行
if (!executeStatement(statement, mode)) {
// cancel execution when meet error or ctrl + C;
return false;
}
}
return true;
}
其實(shí)不管是 -i 還是 -f 最終都會(huì)調(diào)用 executeFile 這個(gè)方法去解析腳本里的內(nèi)容并且執(zhí)行,這里方法里面先調(diào)用 splitContent 方法去做解析.
public static List<String> splitContent(String content) {
List<String> statements = new ArrayList<>();
List<String> buffer = new ArrayList<>();
for (String line : content.split("\n")) {
if (isEndOfStatement(line)) {
buffer.add(line);
statements.add(String.join("\n", buffer));
buffer.clear();
} else {
buffer.add(line);
}
}
if (!buffer.isEmpty()) {
statements.add(String.join("\n", buffer));
}
return statements;
}
private static boolean isEndOfStatement(String line) {
return line.replaceAll(MASK, "").trim().endsWith(";");
}
其實(shí)就是一行一行的讀取初始化腳本和 SQL 腳本里面的內(nèi)容,然后放到一個(gè) List 里面.然后循環(huán)這個(gè) List 調(diào)用 executeStatement 方法去執(zhí)行 SQL 腳本.
// 執(zhí)行 SQL 腳本.
private boolean executeStatement(String statement, ExecutionMode executionMode) {
try {
final Optional<Operation> operation = parseCommand(statement);
operation.ifPresent(op -> callOperation(op, executionMode));
} catch (SqlExecutionException e) {
printExecutionException(e);
return false;
}
return true;
}
執(zhí)行之前會(huì)先對(duì) SQL 做一個(gè)清洗,具體邏輯在 parseCommand 方法中.
// 其實(shí)就是把 SQL 后面的 ; 去掉,并在遇到 bad case 的時(shí)候返回空.然后調(diào)用 parseStatement 方法將 SQL 語句解析成 Operation,后面的過程就跟 Flink SQL 翻譯成代碼的過程差不多.就不在往后面跟了.
private Optional<Operation> parseCommand(String stmt) {
// normalize
stmt = stmt.trim();
// remove ';' at the end
if (stmt.endsWith(";")) {
stmt = stmt.substring(0, stmt.length() - 1).trim();
}
// meet bad case, e.g ";\n"
if (stmt.trim().isEmpty()) {
return Optional.empty();
}
Operation operation = executor.parseStatement(sessionId, stmt);
return Optional.of(operation);
}
-f 參數(shù)調(diào)用的是 executeInNonInteractiveMode 方法,實(shí)際也會(huì)調(diào)用 executeFile 方法,跟 -i 的執(zhí)行邏輯是一樣的.這里就不再分析了.文章來源:http://www.zghlxwxcb.cn/news/detail-426518.html
另外當(dāng)前的 SQL Client 僅支持嵌入式模式(也就是 embedded 模式)。將來,社區(qū)計(jì)劃通過提供基于 REST 的SQL 客戶端網(wǎng)關(guān)來擴(kuò)展其功能,有關(guān)更多信息,請(qǐng)參見 FLIP-24 和 FLIP-91。文章來源地址http://www.zghlxwxcb.cn/news/detail-426518.html
到了這里,關(guān)于FlinkSQL-- sql-client及源碼解析 -- flink-1.13.6的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!