国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

FlinkSQL-- sql-client及源碼解析 -- flink-1.13.6

這篇具有很好參考價(jià)值的文章主要介紹了FlinkSQL-- sql-client及源碼解析 -- flink-1.13.6。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一、前言

本文基于flink-1.13.6

SQL Client: Init scripts and Statement Sets

這個(gè)版本極大地改進(jìn)了 SQL 客戶端的功能。現(xiàn)在 SQL Client 和 SQL 腳本都支持 通過Java 應(yīng)用程序執(zhí)行的幾乎所有操作(從 TableEnvironment 以編程方式啟動(dòng)查詢)。這意味著 SQL 用戶在 SQL 部署中需要的代碼少了很多。其中最核心的功能就是支持了 -i 命令用來初始化腳本,-f 命令用來執(zhí)行 SQL 語句之前的 YAML 文件這個(gè)版本不再支持了,相反更多的是通過 SQL 腳本的方式來配置會(huì)話和提交任務(wù).

類似于下面這種方式:

sql-client.sh -i init.sql -f test.sql

1.1、 -i 初始化 SQL Client

SET execution.runtime-mode=batch;
SET sql-client.execution.result-mode=TABLEAU;
SET pipeline.name=batch_demo

init.sql 初始化腳本文件支持的功能還非常多,我這里就簡單的設(shè)置了幾個(gè),更多的屬性可以參考官網(wǎng).

使用 -i <init.sql> 選項(xiàng)初始化 SQL Client 會(huì)話時(shí),初始化 SQL 文件中允許以下語句:

DDL(CREATE/DROP/ALTER),
USE CATALOG/DATABASE,
LOAD/UNLOAD MODULE,
SET command,
RESET command.

1.2、-f SQL腳本

create table rate_history ( 
 currency STRING, 
 conversion_rate DECIMAL(32, 2), 
 update_time TIMESTAMP(3),
 WATERMARK FOR update_time AS update_time  
) WITH ( 
 'connector.type' = 'filesystem', 
 'connector.path' = '/tmp/ratesHistory.csv', 
 'format.type' = 'csv' 
);

CREATE TABLE printb
(
  num bigint
)
WITH ('connector' = 'print');

-- 兩條sql語句
insert into printb select count(1) from rate_history;
insert into printb select count(1) from rate_history;

執(zhí)行:

./bin/sql-client.sh -i test/init.sql -f test/batch.sql 

FlinkSQL-- sql-client及源碼解析 -- flink-1.13.6
查看flink web 頁面發(fā)現(xiàn)兩個(gè)job
FlinkSQL-- sql-client及源碼解析 -- flink-1.13.6

SQL Client 將每個(gè) INSERT INTO 語句作為單個(gè) Flink 作業(yè)執(zhí)行。但是,由于管道的某些部分可以重復(fù)使用,因此有時(shí)不是最佳選擇。

SQL Client 支持 STATEMENT SET 語法來執(zhí)行一組 SQL 語句。這是 Table API 中StatementSet 的等效功能。STATEMENT SET 語法包含一個(gè)或多個(gè) INSERT INTO 語句。全面優(yōu)化了STATEMENT SET 塊中的所有語句,并將其作為單個(gè) Flink 作業(yè)執(zhí)行。聯(lián)合優(yōu)化和執(zhí)行允許重用常見的中間結(jié)果,因此可以顯著提高執(zhí)行多個(gè)查詢的效率。

STATEMENT SET 的語法格式如下:

BEGIN STATEMENT SET;
  -- one or more INSERT INTO statements
  { INSERT INTO|OVERWRITE <select_statement>; }+
END;

-- 修改上面的sql腳本
-- 兩條sql語句
BEGIN STATEMENT SET;
insert into printb select count(1) from rate_history;
insert into printb select count(1) from rate_history;
END;

FlinkSQL-- sql-client及源碼解析 -- flink-1.13.6

.接下來就來看一下底層源碼是怎么實(shí)現(xiàn)的.

二、源碼分析

2.1、從sql-client.sh 找到執(zhí)行的入口類是 org.apache.flink.table.client.SqlClient

然后來看下 SqlClient 對(duì)象屬性源碼如下:

public class SqlClient { 
    // 標(biāo)記是否是 embedded 模式
    private final boolean isEmbedded;
    // 提交命令選項(xiàng)
    private final CliOptions options;
    // 用來返回結(jié)果的
    private final Supplier<Terminal> terminalFactory;
    // 目前只支持 embedded
    public static final String MODE_EMBEDDED = "embedded";
    public static final String MODE_GATEWAY = "gateway";
	// ...
}

2.2、接著來看 SqlClient 的 main 方法,也就是程序的入口

main 方法里面調(diào)用的是 startClient 方法,所以直接來看 startClient 方法的源碼:

@VisibleForTesting
protected static void startClient(String[] args, Supplier<Terminal> terminalFactory) {
    final String mode;
    final String[] modeArgs;
    // 設(shè)置啟動(dòng)模式默認(rèn)是 embedded
    if (args.length < 1 || args[0].startsWith("-")) {
        // mode is not specified, use the default `embedded` mode
        mode = MODE_EMBEDDED;
        modeArgs = args;
    } else {
        // mode is specified, extract the mode value and reaming args
        mode = args[0];
        // remove mode
        modeArgs = Arrays.copyOfRange(args, 1, args.length);
    }
    
    switch (mode) {
        case MODE_EMBEDDED:
            // 解析提交命令里的參數(shù)
            final CliOptions options = CliOptionsParser.parseEmbeddedModeClient(modeArgs);
            // 打印參數(shù)說明
            if (options.isPrintHelp()) {
                CliOptionsParser.printHelpEmbeddedModeClient();
            } else {
                try {
                    // 構(gòu)建 SqlClient 對(duì)象
                    final SqlClient client = new SqlClient(true, options, terminalFactory);
                    client.start();
                } catch (SqlClientException e) {
                    //...
                }
            }
            break;
        case MODE_GATEWAY:
        	// gateway 模式暫時(shí)不支持
            throw new SqlClientException("Gateway mode is not supported yet.");
        default:
            CliOptionsParser.printHelpClient();
    }
}

2.2.1、解析參數(shù)

調(diào)用 parseEmbeddedModeClient 方法解析提交命令里面的各種參數(shù).包括我們上面用到的 -i 和 -f 都是在這一步解析并賦值的.

    public static CliOptions parseEmbeddedModeClient(String[] args) {
        try {
            DefaultParser parser = new DefaultParser();
            CommandLine line = parser.parse(EMBEDDED_MODE_CLIENT_OPTIONS, args, true);
            return new CliOptions(
                    line.hasOption(CliOptionsParser.OPTION_HELP.getOpt()),
                    checkSessionId(line),
                    // 解析 -i  初始化文件
                    checkUrl(line, CliOptionsParser.OPTION_INIT_FILE),
                    // 解析 -f sql腳本
                    checkUrl(line, CliOptionsParser.OPTION_FILE),
                    checkUrls(line, CliOptionsParser.OPTION_JAR),
                    checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
                    line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
                    line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
                    getPythonConfiguration(line));
        } catch (ParseException e) {
            throw new SqlClientException(e.getMessage());
        }
    }
    public static final Option OPTION_INIT_FILE =
            Option.builder("i")
                    .required(false)
                    .longOpt("init")
                    .numberOfArgs(1)
                    .argName("initialization file")
                    .desc(
                            "Script file that used to init the session context. "
                                    + "If get error in execution, the sql client will exit. Notice it's not allowed to add query or insert into the init file.")
                    .build();

    public static final Option OPTION_FILE =
            Option.builder("f")
                    .required(false)
                    .longOpt("file")
                    .numberOfArgs(1)
                    .argName("script file")
                    .desc(
                            "Script file that should be executed. In this mode, "
                                    + "the client will not open an interactive terminal.")
                    .build();

2.2.2、構(gòu)建 SqlClient

final SqlClient client = new SqlClient(true, options, terminalFactory);

2.2.3、啟動(dòng) SqlClient

client.start();

private void start() {
        if (isEmbedded) {
            // create local executor with default environment
            DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
            // 創(chuàng)建一個(gè) LocalExecutor 對(duì)象,用于本地執(zhí)行程序
            final Executor executor = new LocalExecutor(defaultContext);
            executor.start();

            // Open an new session
            String sessionId = executor.openSession(options.getSessionId());
            try {
                // add shutdown hook
                Runtime.getRuntime()
                        .addShutdownHook(new EmbeddedShutdownThread(sessionId, executor));

                // do the actual work  真正執(zhí)行 SQL 的地方
                openCli(sessionId, executor);
            } finally {
                executor.closeSession(sessionId);
            }
        }
        else {
            throw new SqlClientException("Gateway mode is not supported yet.");
        }
    }
2.2.4、真正執(zhí)行 SQL 的地方是 openCli 方法
  /**
     * Opens the CLI client for executing SQL statements.
     *
     * @param sessionId session identifier for the current client.
     * @param executor executor
     */
    private void openCli(String sessionId, Executor executor) {
        Path historyFilePath;
        if (options.getHistoryFilePath() != null) {
            historyFilePath = Paths.get(options.getHistoryFilePath());
        } else {
            historyFilePath =
                    Paths.get(
                            System.getProperty("user.home"),
                            SystemUtils.IS_OS_WINDOWS ? "flink-sql-history" : ".flink-sql-history");
        }

        boolean hasSqlFile = options.getSqlFile() != null;
        boolean hasUpdateStatement = options.getUpdateStatement() != null;
        if (hasSqlFile && hasUpdateStatement) {
            throw new IllegalArgumentException(//...
        }

        try (CliClient cli = new CliClient(terminalFactory, sessionId, executor, historyFilePath)) {
        	// 執(zhí)行初始化 SQL -i 參數(shù)
            if (options.getInitFile() != null) {
                boolean success = cli.executeInitialization(readFromURL(options.getInitFile()));
                if (!success) { // ...}
            }

            if (!hasSqlFile && !hasUpdateStatement) {
                cli.executeInInteractiveMode();
            } else {
            	// 執(zhí)行真正的 SQL 文件 -f
                cli.executeInNonInteractiveMode(readExecutionContent());
            }
        }
    }

這個(gè)里面會(huì)先獲取 historyFilePath 的路徑,然后判斷是否存在 -i -f 這兩個(gè)文件,如果有的話會(huì)先調(diào)用 executeInitialization 執(zhí)行初始化的腳本.實(shí)際調(diào)用的是 executeInitialization#executeFile 方法來執(zhí)行腳本,executeFile 的源碼如下:

private boolean executeFile(String content, ExecutionMode mode) {
    terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_EXECUTE_FILE).toAnsi());
    for (String statement : CliStatementSplitter.splitContent(content)) {
        terminal.writer()
                .println(
                        new AttributedString(String.format("%s%s", prompt, statement))
                                .toString());
        terminal.flush();
        // 執(zhí)行 
        if (!executeStatement(statement, mode)) {
            // cancel execution when meet error or ctrl + C;
            return false;
        }
    }
    return true;
}

其實(shí)不管是 -i 還是 -f 最終都會(huì)調(diào)用 executeFile 這個(gè)方法去解析腳本里的內(nèi)容并且執(zhí)行,這里方法里面先調(diào)用 splitContent 方法去做解析.

public static List<String> splitContent(String content) {
    List<String> statements = new ArrayList<>();
    List<String> buffer = new ArrayList<>();
    for (String line : content.split("\n")) {
        if (isEndOfStatement(line)) {
            buffer.add(line);
            statements.add(String.join("\n", buffer));
            buffer.clear();
        } else {
            buffer.add(line);
        }
    }
    if (!buffer.isEmpty()) {
        statements.add(String.join("\n", buffer));
    }
    return statements;
}
private static boolean isEndOfStatement(String line) {
    return line.replaceAll(MASK, "").trim().endsWith(";");
}

其實(shí)就是一行一行的讀取初始化腳本和 SQL 腳本里面的內(nèi)容,然后放到一個(gè) List 里面.然后循環(huán)這個(gè) List 調(diào)用 executeStatement 方法去執(zhí)行 SQL 腳本.

// 執(zhí)行 SQL 腳本.
private boolean executeStatement(String statement, ExecutionMode executionMode) {
    try {
        final Optional<Operation> operation = parseCommand(statement);
        operation.ifPresent(op -> callOperation(op, executionMode));
    } catch (SqlExecutionException e) {
        printExecutionException(e);
        return false;
    }
    return true;
}

執(zhí)行之前會(huì)先對(duì) SQL 做一個(gè)清洗,具體邏輯在 parseCommand 方法中.

// 其實(shí)就是把 SQL 后面的 ; 去掉,并在遇到 bad case 的時(shí)候返回空.然后調(diào)用 parseStatement 方法將 SQL 語句解析成 Operation,后面的過程就跟 Flink SQL 翻譯成代碼的過程差不多.就不在往后面跟了.
private Optional<Operation> parseCommand(String stmt) {
    // normalize
    stmt = stmt.trim();
    // remove ';' at the end
    if (stmt.endsWith(";")) {
        stmt = stmt.substring(0, stmt.length() - 1).trim();
    }
    // meet bad case, e.g ";\n"
    if (stmt.trim().isEmpty()) {
        return Optional.empty();
    }
    
    Operation operation = executor.parseStatement(sessionId, stmt);
    return Optional.of(operation);
}

-f 參數(shù)調(diào)用的是 executeInNonInteractiveMode 方法,實(shí)際也會(huì)調(diào)用 executeFile 方法,跟 -i 的執(zhí)行邏輯是一樣的.這里就不再分析了.

另外當(dāng)前的 SQL Client 僅支持嵌入式模式(也就是 embedded 模式)。將來,社區(qū)計(jì)劃通過提供基于 REST 的SQL 客戶端網(wǎng)關(guān)來擴(kuò)展其功能,有關(guān)更多信息,請(qǐng)參見 FLIP-24 和 FLIP-91。文章來源地址http://www.zghlxwxcb.cn/news/detail-426518.html

到了這里,關(guān)于FlinkSQL-- sql-client及源碼解析 -- flink-1.13.6的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink:FlinkSql解析嵌套Json

    Flink:FlinkSql解析嵌套Json

    日常開發(fā)中都是用的簡便json格式,但是偶爾也會(huì)遇到嵌套json的時(shí)候,因此在用flinksql的時(shí)候就有點(diǎn)麻煩,下面用簡單例子簡單定義處理下 1,數(shù)據(jù)是網(wǎng)上摘抄,但包含里常用的大部分格式 { ?? ?\\\"afterColumns\\\": { ?? ??? ?\\\"created\\\": \\\"1589186680\\\", ?? ??? ?\\\"extra\\\": { ?? ??? ??? ?\\\"

    2023年04月09日
    瀏覽(25)
  • 【源碼解析】flink sql執(zhí)行源碼概述:flink sql執(zhí)行過程中有哪些階段,這些階段的源碼大概位置在哪里

    【源碼解析】flink sql執(zhí)行源碼概述:flink sql執(zhí)行過程中有哪些階段,這些階段的源碼大概位置在哪里

    本文大致分析了flink sql執(zhí)行過程中的各個(gè)階段的源碼邏輯,這樣可以在flink sql執(zhí)行過程中, 能夠定位到任務(wù)執(zhí)行的某個(gè)階段的代碼大概分布在哪里,為更針對(duì)性的分析此階段的細(xì)節(jié)邏輯打下基礎(chǔ),比如create 的邏輯是怎么執(zhí)行的,select的邏輯是怎么生成的,優(yōu)化邏輯都做了哪

    2024年02月04日
    瀏覽(26)
  • 20、Flink SQL之SQL Client: 不用編寫代碼就可以嘗試 Flink SQL,可以直接提交 SQL 任務(wù)到集群上

    20、Flink SQL之SQL Client: 不用編寫代碼就可以嘗試 Flink SQL,可以直接提交 SQL 任務(wù)到集群上

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月11日
    瀏覽(23)
  • 【flink番外篇】21、Flink 通過SQL client 和 table api注冊(cè)catalog示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月21日
    瀏覽(25)
  • 基于Flink CDC實(shí)時(shí)同步PostgreSQL與Tidb【Flink SQL Client模式下親測(cè)可行,詳細(xì)教程】

    操作系統(tǒng):ubuntu-22.04,運(yùn)行于wsl 2【 注意,請(qǐng)務(wù)必使用wsl 2 ;wsl 1會(huì)出現(xiàn)各種各樣的問題】 軟件版本:PostgreSQL 14.9,TiDB v7.3.0,flink 1.7.1,flink cdc 2.4.0 已有postgre的跳過此步 (1)pg安裝 https://zhuanlan.zhihu.com/p/143156636 (2)pg配置 可能出現(xiàn)的問題 sudo -u postgres psql 報(bào)錯(cuò): psql: err

    2024年02月11日
    瀏覽(30)
  • Flink流批一體計(jì)算(13):PyFlink Tabel API之SQL DDL

    1. TableEnvironment 創(chuàng)建 TableEnvironment TableEnvironment 是 Table API 和 SQL 集成的核心概念。 TableEnvironment 可以用來: ·創(chuàng)建 Table ·將 Table 注冊(cè)成臨時(shí)表 ·執(zhí)行 SQL 查詢 ·注冊(cè)用戶自定義的 (標(biāo)量,表值,或者聚合) 函數(shù) ·配置作業(yè) ·管理 Python 依賴 ·提交作業(yè)執(zhí)行 創(chuàng)建 source 表 創(chuàng)建 sink

    2024年02月12日
    瀏覽(23)
  • Flink 學(xué)習(xí)十 FlinkSQL

    Flink 學(xué)習(xí)十 FlinkSQL

    flink sql 基于flink core ,使用sql 語義方便快捷的進(jìn)行結(jié)構(gòu)化數(shù)據(jù)處理的上層庫; 類似理解sparksql 和sparkcore , hive和mapreduce 1.1 工作流程 整體架構(gòu)和工作流程 數(shù)據(jù)流,綁定元數(shù)據(jù) schema ,注冊(cè)成catalog 中的表 table / view 用戶使用table Api / table sql 來表達(dá)計(jì)算邏輯 table-planner利用 apache calci

    2024年02月10日
    瀏覽(17)
  • Flink 優(yōu)化(六) --------- FlinkSQL 調(diào)優(yōu)

    Flink 優(yōu)化(六) --------- FlinkSQL 調(diào)優(yōu)

    FlinkSQL 官網(wǎng)配置參數(shù): https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/config.html Flink SQL 新手有可能犯的錯(cuò)誤,其中之一就是忘記設(shè)置空閑狀態(tài)保留時(shí)間導(dǎo)致狀態(tài)爆炸。列舉兩個(gè)場(chǎng)景: ? FlinkSQL 的 regular join(inner、left、right),左右表的數(shù)據(jù)都會(huì)一直保存在狀態(tài)里,不

    2024年02月14日
    瀏覽(21)
  • 13、Flink 的table api與sql的基本概念、通用api介紹及入門示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月04日
    瀏覽(23)
  • Flink實(shí)戰(zhàn)-(6)FlinkSQL實(shí)現(xiàn)CDC

    FlinkSQL說明 Flink SQL 是 Flink 實(shí)時(shí)計(jì)算為簡化計(jì)算模型,降低用戶使用實(shí)時(shí)計(jì)算門檻而設(shè)計(jì)的一套符合標(biāo)準(zhǔn) SQL 語義的開發(fā)語言。 自 2015 年開始,阿里巴巴開始調(diào)研開源流計(jì)算引擎,最終決定基于 Flink 打造新一代計(jì)算引擎,針對(duì) Flink 存在的不足進(jìn)行優(yōu)化和改進(jìn),并且在 2019 年初

    2023年04月26日
    瀏覽(26)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包