国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink1.14 sql基礎語法(二) flink sql表定義詳解

這篇具有很好參考價值的文章主要介紹了flink1.14 sql基礎語法(二) flink sql表定義詳解。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

flink1.14 sql基礎語法(二) flink sql表定義詳解

一、表的概念和類別

1.1 表的標識結構

每一個表的標識由 3 部分組成:

  • catalog name (常用于標識不同的“源”,比如 hive catalog,inner catalog 等)

  • database name(通常語義中的“庫”)

  • table name (通常語義中的“表”)

package cn.yyds.sql;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class _09_FlinkTableDb {
    public static void main(String[] args) {
        // 1、混合環(huán)境的創(chuàng)建
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2、建表
        TableDescriptor descriptor = TableDescriptor
                .forConnector("kafka")  // 指定連接器
                .schema(
                        Schema.newBuilder() // 指定表結構
                                .column("id", DataTypes.INT())
                                .column("name", DataTypes.STRING())
                                .column("age", DataTypes.INT())
                                .column("gender", DataTypes.STRING())
                                .build()
                )
                .format("json")
                .option("topic","kfa_person")
                .option("properties.bootstrap.servers","centos01:9092")
                .option("properties.group.id","g1")
                .option("scan.startup.mode","earliest-offset")
                .option("json.fail-on-missing-field","false")
                .option("json.ignore-parse-errors","true")
                .build();

        Table table = tableEnv.from(descriptor);
        
        // 注冊在默認的catalog和默認的database中
        tableEnv.createTemporaryView("kfa_person",table);

        // 注冊在默認的catalog和指定的database中
        tableEnv.createTemporaryView("ods.kfa_person",table);

        // 注冊在指定的catalog和指定的database中(可以和hive整合,保存到mysql中)
        tableEnv.createTemporaryView("hive_catalog.ods.kfa_person",table);
        
    }
}

1個flinksql程序在運行時,tableEnvironment 通過持有一個 map 結構來記錄所注冊的 catalog;

public final class CatalogManager {
    private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class);
    private final Map<String, Catalog> catalogs;
    private final Map<ObjectIdentifier, CatalogBaseTable> temporaryTables;
    ......
}

1.2 表和視圖

Flinksql中的表,可以是 virtual的 (view 視圖) 和 regular 的 (table 常規(guī)表)

  • table 描述了一個物理上的外部數(shù)據(jù)源,如文件、數(shù)據(jù)庫表、kafka 消息 topic

  • view 則基于表創(chuàng)建,代表一個或多個表上的一段計算邏輯(就是對一段查詢計劃的邏輯封裝);
    不管是 table 還是 view,在 tableAPI 中得到的都是 Table 對象

1.3 臨時和永久

臨時表(視圖) :

  • 創(chuàng)建時帶 temporary 關鍵字 (crate temporary view,createtemporary table)

永久表(視圖) :

  • 創(chuàng)建時不帶 temporary 關鍵字 (create view ,create table )

臨時表與永久表的本質區(qū)別: schema 信息是否被持久化存儲
臨時表(視圖)

  • 表 schema 只維護在所屬 flink session 運行時內存中

  • 當所屬的 flink session 結束后表信息將不復存在,且該表無法在 flink session 間共享。

常規(guī)表(視圖)

  • 表 schema 可記錄在外部持久化的元數(shù)據(jù)管理器中(比如 hive 的 metastore)

  • 當所屬 flink session 結束后,該表信息不會丟失,且在不同 flink session 中都可訪問到該表的信息。

// sql 定義方式
tableEnv.executeSql("create view view_1 as select .. from projectedTable");
tableEnv.executeSql("create temporary view_2 as select .. from projectedTable");
                    
                    
tableEnv.executeSql("create table (id int,...) with ( 'connector'= ...)");
tableEnv.executeSql("create temporary table (id int,...) with ('connector'= ...)");
                    
  
// table api方式
tenv.createTemporaryView("v_1", dataStreamschema);
tenv.createTemporaryView("v_1", table);


tenv.createTable("t_1", tableDescriptor);
tenv.createTemporaryTable("t_1", tableDescriptor);

二、表定義概覽

2.1 Table Api創(chuàng)建

flink1.14 sql基礎語法(二) flink sql表定義詳解,# flink,sql,flink

Table 對象獲取方式解析:

  • 從已注冊的表

  • 從 TableDescriptor (連接器/format/schema/options)

  • 從 DataStream

  • 從 Table 對象上的查詢 api 生成

  • 從測試數(shù)據(jù)

涉及的核心參數(shù):

  • 已注冊的表名 (catalog name.database_name.object_name)

  • TableDescriptor (表描述器,核心是 connector 連接器)

  • Datastream(底層流)

  • 測試數(shù)據(jù)值

package cn.yyds.sql;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import static org.apache.flink.table.api.Expressions.$;

/**
 * 創(chuàng)建table的幾種方式
 *
 * 1、從已注冊的表
 * 2、從 TableDescriptor (連接器/format/schema/options)
 * 3、從 DataStream
 * 4、從 Table 對象上的查詢 api 生成
 * 5、從測試數(shù)據(jù)
 */
public class _04_TableCreate {
    public static void main(String[] args) {

        // 混合環(huán)境的創(chuàng)建
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);


        // 1、從 TableDescriptor (連接器/format/schema/options)
        TableDescriptor descriptor = TableDescriptor
                .forConnector("kafka")  // 指定連接器
                .schema(
                        Schema.newBuilder() // 指定表結構
                                .column("id", DataTypes.INT())
                                .column("name", DataTypes.STRING())
                                .column("age", DataTypes.INT())
                                .column("gender", DataTypes.STRING())
                                .build()
                )
                .format("json")
                .option("topic","t_kafka_1")
                .option("properties.bootstrap.servers","centos01:9092")
                .option("properties.group.id","g1")
                .option("scan.startup.mode","earliest-offset")
                .option("json.fail-on-missing-field","false")
                .option("json.ignore-parse-errors","true")
                .build();

        Table table1 = tableEnv.from(descriptor);


        // 2、從已注冊的表
        Table table2 = tableEnv.from("t_kafka_1");


        // 3、從 DataStream
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                // 設置訂閱的目標主題
                .setTopics("tp01")
                // 設置消費者組id
                .setGroupId("gp01")
                // 設置kafka服務器地址
                .setBootstrapServers("centos01:9092")
                // 起始消費位移的指定:
                //    OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST) 消費起始位移選擇之前所提交的偏移量(如果沒有,則重置為LATEST)
                //    OffsetsInitializer.earliest()  消費起始位移直接選擇為 “最早”
                //    OffsetsInitializer.latest()  消費起始位移直接選擇為 “最新”
                //    OffsetsInitializer.offsets(Map<TopicPartition,Long>)  消費起始位移選擇為:方法所傳入的每個分區(qū)和對應的起始偏移量
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                // 設置value數(shù)據(jù)的反序列化器
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // 開啟kafka底層消費者的自動位移提交機制
                //    它會把最新的消費位移提交到kafka的consumer_offsets中
                //    就算把自動位移提交機制開啟,KafkaSource依然不依賴自動位移提交機制
                //    (宕機重啟時,優(yōu)先從flink自己的狀態(tài)中去獲取偏移量<更可靠>)
                .setProperty("auto.offset.commit", "true")
                .build();

        // env.addSource();  //  接收的是  SourceFunction接口的 實現(xiàn)類
        DataStreamSource<String> streamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk-source");//  接收的是 Source 接口的實現(xiàn)類


        Table table3 = tableEnv.fromDataStream(streamSource);

        // 4、從 Table 對象上的查詢 api 生成
        Table table4 = table1.groupBy($("gender"))
                .select($("gender"), $("age").avg().as("avg_age"));



        // 5、從測試數(shù)據(jù)
        Table table5 = tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("info", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
                        DataTypes.FIELD("ts1", DataTypes.TIMESTAMP(3)),
                        DataTypes.FIELD("ts3", DataTypes.TIMESTAMP_LTZ(3))
                ),
                Row.of(1, "a", null, "2023-02-02 13:00:00.200", 1654236105000L)
        );
    }
}

2.2 Table Sql創(chuàng)建

flink1.14 sql基礎語法(二) flink sql表定義詳解,# flink,sql,flink

注冊 sql表 (視圖)方式

  • 從已存在的 datastream 注冊

  • 從已存在的 Table 對象注冊

  • 從 TableDescriptor (連接器)注冊

  • 執(zhí)行 Sql 的 DDL 語句來注冊

package cn.yyds.sql;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

/**
 * 注冊 sql表 (視圖)方式
 *   從已存在的 datastream 注冊
 *   從已存在的 Table 對象注冊
 *   從 TableDescriptor (連接器)注冊
 *   執(zhí)行 Sql 的 DDL 語句來注冊
 */
public class _04_SqlCreate {
    public static void main(String[] args) {
        // 混合環(huán)境的創(chuàng)建
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);


        // 1、從 TableDescriptor (連接器)注冊
        TableDescriptor descriptor = TableDescriptor
                .forConnector("kafka")  // 指定連接器
                .schema(
                        Schema.newBuilder() // 指定表結構
                                .column("id", DataTypes.INT())
                                .column("name", DataTypes.STRING())
                                .column("age", DataTypes.INT())
                                .column("gender", DataTypes.STRING())
                                .build()
                )
                .format("json")
                .option("topic","t_kafka_1")
                .option("properties.bootstrap.servers","centos01:9092")
                .option("properties.group.id","g1")
                .option("scan.startup.mode","earliest-offset")
                .option("json.fail-on-missing-field","false")
                .option("json.ignore-parse-errors","true")
                .build();


        tableEnv.createTable("kfk_person",descriptor);


        // 2、從已存在的 datastream 注冊
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                // 設置訂閱的目標主題
                .setTopics("tp01")
                // 設置消費者組id
                .setGroupId("gp01")
                // 設置kafka服務器地址
                .setBootstrapServers("centos01:9092")
                // 起始消費位移的指定:
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                // 設置value數(shù)據(jù)的反序列化器
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("auto.offset.commit", "true")
                .build();

        DataStreamSource<String> streamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk-source");//  接收的是 Source 接口的實現(xiàn)類


        tableEnv.createTemporaryView("kfk_source",streamSource);



        // 3、從已存在的 Table 對象注冊
        Table table = null;
        tableEnv.createTemporaryView("k_table",table);
        
        // 4、執(zhí)行 Sql 的 DDL 語句來注冊
        tableEnv.executeSql("create table t_kafka_1(\n" +
                "  id int,\n" +
                "  name string,\n" +
                "  age int,\n" +
                "  gender string\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 't_kafka_1',\n" +
                " 'properties.bootstrap.servers' = 'centos01:9092',\n" +
                " 'properties.group.id' = 'g1',\n" +
                " 'format' = 'json',\n" +
                " 'scan.startup.mode' = 'earliest-offset',\n" +
                " 'json.fail-on-missing-field' = 'false',\n" +
                " 'json.ignore-parse-errors' = 'true'\n" +
                ")");
        

    }
}

三、Catalog詳解

3.1 catalog概念

catalog 就是一個元數(shù)據(jù)空間,簡單說就是記錄、獲取元數(shù)據(jù)(表定義信息)的實體。

flink sql 在運行時,可以擁有多個 catalog,它們由 catalogManager 模塊來注冊、管理。

CatalogManager 中可以注冊多個元數(shù)據(jù)空間。

1、環(huán)境創(chuàng)建之初,就會初始化一個默認的元數(shù)據(jù)空間

  • 空間名稱: default_catalog

  • 空間實現(xiàn)類: GenericInMemoryCatalog(基于內存)

public class GenericInMemoryCatalog extends AbstractCatalog {
    public static final String DEFAULT_DB = "default";
    // 用于記錄 本catalog空間所有database
    private final Map<String, CatalogDatabase> databases;
     // 用于記錄 本catalog空間所有table
    private final Map<ObjectPath, CatalogBaseTable> tables;
    ......
}    

2、用戶還可以向環(huán)境中注冊更多的 catalog,如下代碼新增注冊了一個 hivecatalog

// 創(chuàng)建hive元數(shù)據(jù)空間的實現(xiàn)對象
HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "d:/conf/hiveconf");

// 將hive的元數(shù)據(jù)對象注冊到環(huán)境中
tableEnv.registerCatalog("hive_catalog",hiveCatalog);

注意:需要導入jar包,并把hive-site.xml的配置文件放入到hiveconf目錄下

<!--flink-sql使用hive-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-hive-3.1.2_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>

3.2 臨時表與永久表的底層差異

結論 1: 如果選擇 hive 元數(shù)據(jù)空間來創(chuàng)建表、視圖,則

  • 永久表(視圖)的元信息,都會被寫入 hive 的元數(shù)據(jù)管理器中,從而可以實現(xiàn)永久存

  • 在臨時表(視圖)的元信息,并不會寫入 hive 的元數(shù)據(jù)管理其中,而是放在 catalogManager 的一個 temporaryTables 的內存 hashmap 中記錄

  • 臨時表空間中的表名(全名) 如果與 hive 空間中的表名相同,則查詢時會優(yōu)先選擇臨時表空間的表

結論 2: 如果選擇 GenericInMemoryCatalog 元數(shù)據(jù)空間來創(chuàng)建表、視圖,則

  • 永久表(視圖)的元信息,都會被寫入 GenericInMemoryCatalog 的元數(shù)據(jù)管理器中(內存中)

  • 臨時表(視圖)的元信息,放在 catalogManager 的一個 temporaryTables 的內存 hashmap 中記

3.3 理解Hive catalog

flink sql利用 hive catalog 來建表 (查詢、修改、刪除表),本質上只是利用了 hive 的 metastore 服務

更具體來說,flinksql 只是把 flinksal 的表定義信息,按照 hive 元數(shù)據(jù)的形式,托管到 hive 的 metastore中而已。

當然,hive 中也能看到這些托管的表信息,但是,并不能利用它底層的 mapreduce 或者 spark 引擎來查詢這些表

因為 mapreduce 或者 spark 引擎,并不能理解 flinksql 表定義中的信息,也無法為這些定義信息提供相應的組件去讀取數(shù)據(jù)(比如,mr 或者 spark 就沒有 flinksql 中的各種 connector 組件)

四、表定義詳解

定義表時所需的核心要素

  • 表名 (catalog_name.database_name.object_name)

  • TableDescriptor

TableDescriptor 核心要素

  • Schema 表結構(字段)

  • Format 數(shù)據(jù)格式

  • Connector 連接器

  • Option 連接器參數(shù)

4.1 Schema字段定義詳解

4.1.1 physical column(物理字段)

物理字段: 源自于外部存儲系統(tǒng)本身 schema 中的字段

如 kafka 消息的 key、value (json 格式)中的字段;mysql表中的字段…

-- 一些連接器需要設置主鍵,例如upsert-kafka,因為支持change-log流
-- 單字段主鍵約束語法
id INT PRIMARY KEY NOT ENFORCED ,
name STRING


-- 多字段主鍵約束語法:
id,
name,
PRIMARY KEY(id,name) NOT ENFORCED

4.1.2 computed column(表達式字段)

表達式字段(邏輯字段) : 在物理字段上施加一個 sql 表達式,并將表達式結果定義為一個字段。

4.1.3 metadata column(元數(shù)據(jù)字段)

元數(shù)據(jù)字段: 來源于 connector 從外部存儲系統(tǒng)中獲取到的 外部系統(tǒng)元信息

? 比如,kafka 的消息,通常意義上的數(shù)據(jù)內容是在 record 的 key 和 value 中的,而實質上 (底層角度來看), kafka 中的每一條 record,不光帶了 key 和 value 數(shù)據(jù)內容,還帶了這條record 所屬的 topic,所屬的 partition,所在的 offset,以及 record 的 timetamp 和 timestamp 類型等“元信息”。

? fink 的 connector 可以獲取并暴露這些元信息,并允許用戶將這些信息定義成 flinksal表中的字段

官網(wǎng)中可以查到暴露的元數(shù)據(jù)字段

比如kafka元數(shù)據(jù)字段: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/

flink1.14 sql基礎語法(二) flink sql表定義詳解,# flink,sql,flink

// DDL方式
tableEnv.executeSql("create table t_kafka_person(\n" +
                "  id int,                                          -- 物理字段\n" +
                "  name string,                                     -- 物理字段\n" +
                "  nick string,                                     -- 物理字段\n" +
                "  age int,                                         -- 物理字段\n" +
                "  big_age as age + 10,                             -- 表達式字段\n" +
                "  my_offset bigint METADATA FROM 'offset',         --元數(shù)據(jù)字段,來自kafka\n" +
                "  ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',   --元數(shù)據(jù)字段,來自kafka\n" +
                "  gender string\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 't_kafka_2',\n" +
                " 'properties.bootstrap.servers' = 'centos01:9092',\n" +
                " 'properties.group.id' = 'g1',\n" +
                " 'format' = 'json',\n" +
                " 'scan.startup.mode' = 'earliest-offset',\n" +
                " 'json.fail-on-missing-field' = 'false',\n" +
                " 'json.ignore-parse-errors' = 'true'\n" +
                ")");
        // API 方式
        TableDescriptor descriptor = TableDescriptor
                .forConnector("kafka")  // 指定連接器
                .schema(
                        Schema.newBuilder() // 指定表結構
                                .column("id", DataTypes.INT())                   //column是物理字段
                                .column("name", DataTypes.STRING())              //column是物理字段
                                .column("nick", DataTypes.STRING())              //column是物理字段
                                .column("age", DataTypes.INT())                  //column是物理字段
                                .column("gender", DataTypes.STRING())            //column是物理字段
                                .columnByExpression("big_age","age + 10")                   // 聲明表達式字段
                                .columnByMetadata("my_offset",DataTypes.BIGINT(),"offset")  // 聲明元數(shù)據(jù)字段
                                // 聲明元數(shù)據(jù)字段 isVirtual表示,當這個表被當作sink表時候,該字段是否出現(xiàn)在schema中
                                .columnByMetadata("ts",DataTypes.TIMESTAMP_LTZ(3),"timestamp",true)
                                /*.primaryKey("id")*/  // 主鍵約束,upsert-kafka需要填寫主鍵
                                .build()
                )
                .format("json")
                .option("topic","t_kafka_2")
                .option("properties.bootstrap.servers","centos01:9092")
                .option("properties.group.id","g1")
                .option("scan.startup.mode","earliest-offset")
                .option("json.fail-on-missing-field","false")
                .option("json.ignore-parse-errors","true")
                .build();

4.2 format概述

connector 連接器在對接外部存儲時,根據(jù)外部存儲中的數(shù)據(jù)格式不同,需要用到不同的 format 組件

format 組件的作用就是:告訴連接器,如何解析外部存儲中的數(shù)據(jù)及映射到表 schema

format 組件的使用要點

  • 導入 format 組件的 jar 包依賴

  • 指定 format 組件的名稱

  • 設置 format 組件所需的參數(shù)(不同 format 組件有不同的參數(shù)配置需求)

官網(wǎng):https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/overview/

flink1.14 sql基礎語法(二) flink sql表定義詳解,# flink,sql,flink

4.2.1 json format

官網(wǎng):https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/json/

1、需要引入依賴
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>1.14.4</version>
</dependency>
2、常用參數(shù)
參數(shù) 是否必須 默認值 類型 描述
format required (none) String 組件名json
json.fail-on-missing-field optional false Boolean 缺失字段是否失敗
json.ignore-parse-errors optional false Boolean 是否忽略json解析錯誤
json.timestamp-format.standard optional 'SQL' String json中timestamp類型字段格式
json.map-null-key.mode optional 'FAIL' String 可選值'FAIL', 'DROP' ,'LITERAL'
json.map-null-key.literal optional ‘null’ String 替換null的字符串
3、數(shù)據(jù)類型映射
Flink SQL type JSON type
CHAR / VARCHAR / STRING string
BOOLEAN boolean
BINARY / VARBINARY string with encoding: base64
DECIMAL number
TINYINT number
SMALLINT number
INT number
BIGINT number
FLOAT number
DOUBLE number
DATE string with format: date
TIME string with format: time
TIMESTAMP string with format: date-time
TIMESTAMP_WITH_LOCAL_TIME_ZONE string with format: date-time (with UTC time zone)
INTERVAL number
ARRAY array
MAP / MULTISET object
ROW object
4、使用案例(復雜json解析)
package cn.yyds.sql;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * 文件中有如下的數(shù)據(jù):
 *
 * {"id":10, "name":"tom", "age":28, "ts":"2023-03-02 00:00:00.000"}
 */
public class _10_JsonFormatTest1 {
    public static void main(String[] args)  {

        // 創(chuàng)建環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);


        tableEnv.executeSql("create table t_kafka_p(\n" +
                "  id int,\n" +
                "  name string,\n" +
                "  age int,\n" +
                "  ts TIMESTAMP(3)\n" +
                ") WITH (\n" +
                " 'connector' = 'filesystem',\n" +
                " 'path' = 'file:///D:/works/flink-live/files/sql-data/test1.txt',\n" +
                " 'format' = 'json',\n" +
                " 'json.ignore-parse-errors' = 'true'\n" +
                ")");


        tableEnv.executeSql("select * from t_kafka_p").print();
    }
}
+----+-------------+--------------------------------+-------------+-------------------------+
| op |          id |                           name |         age |                      ts |
+----+-------------+--------------------------------+-------------+-------------------------+
| +I |          10 |                            tom |          28 | 2023-03-02 00:00:00.000 |
+----+-------------+--------------------------------+-------------+-------------------------+

復雜json類型的解析

{
    "id":1238123899121,
    "name":"hank",
    "date":"2022-10-14",
    "obj":{
        "time1":"12:12:43Z",
        "str":"sfasfafs",
        "lg":2324342345
    },
    "arr":[
        {
            "f1":"f1str11",
            "f2":134
        },
        {
            "f1":"f1str22",
            "f2":555
        }
    ],
    "time":"12:12:43Z",
    "timestamp":"2022-10-14T12:12:43Z",
    "map":{
        "flink":123
    },
    "mapinmap":{
        "inner_map":{
            "key":234
        }
    }
}
-- 復雜json解析的表定義
CREATE TABLE json_source (
    id            BIGINT,
    name          STRING,
    `date`        DATE,
    obj           ROW<time1 TIME,str STRING,lg BIGINT>,
    arr           ARRAY<ROW<f1 STRING,f2 INT>>,
    `time`        TIME,
    `timestamp`   TIMESTAMP(3),
    `map`         MAP<STRING,BIGINT>,
    mapinmap      MAP<STRING,MAP<STRING,INT>>,
    proctime as PROCTIME()
 ) WITH (
 'connector' = 'filesystem',
 'path' = 'file:///D:\doit\works\flink-live\files\sql-data\test3.txt',
 'format' = 'json',
 'json.ignore-parse-errors' = 'true'
);

-- 從表中獲取數(shù)據(jù)
-- 注意數(shù)組index從1開始
select id, name,`date`,obj.str,arr[1].f1,`map`['flink'],mapinmap['inner_map']['key'] from json_source;

4.2.2 csv format

官網(wǎng): https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/csv/

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>1.14.4</version>
</dependency>

參數(shù)解釋

參數(shù) 是否必須 默認值 類型 描述
format required (none) String csv
csv.field-delimiter optional , String 分割符
csv.allow-comments optional false Boolean 是否允許注釋'默認#開頭注釋'
csv.ignore-parse-errors optional false Boolean 是否忽略解析錯誤
csv.array-element-delimiter optional ; String 數(shù)組元素之間分隔符
csv.escape-character optional (none) String 轉義字符
csv.null-literal optional (none) String null的字面量字符串

4.3 watermark和時間屬性

時間屬性定義,主要是用于各類基于時間的運算操作(如基于時間窗口的查詢計算)。

4.3.1 eventTime和watermark定義

核心要點:

  • 需要一個 timestamp(3)類型字段(可以是物理字段,也可以是表達式字段,也可以是元數(shù)據(jù)字段)

  • 需要用一個 watermarkExpression 來指定 watermark 策略

package cn.yyds.sql;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class _11_SqlWatermark {
    public static void main(String[] args) {

        // 創(chuàng)建環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);


        // guid,uuid,eventId,pageId,ts
        DataStreamSource<String> sourceStream = env.socketTextStream("centos04", 9999);

        SingleOutputStreamOperator<EventBean> mapStream = sourceStream.map(line -> {
            String[] arr = line.split(",");
            return new EventBean(Integer.parseInt(arr[0]), arr[1], arr[2], arr[3], Long.parseLong(arr[4]));
        });


        // 分配wm
        SingleOutputStreamOperator<EventBean> wmStream = mapStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<EventBean>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<EventBean>() {
                            @Override
                            public long extractTimestamp(EventBean eventBean, long l) {
                                return eventBean.getTs();
                            }
                        })
        );

        // 轉換為table
        Table table = tableEnv.fromDataStream(wmStream,
                Schema.newBuilder()
                        // 聲明表達式字段,并聲明為 processing time 屬性字段
                        // .columnByExpression("pt","proctime()")
                        // 聲明表達式字段
                        .columnByExpression("rt","to_timestamp_ltz(ts, 3)")
                        // 將 rt 字段指定為 event time 屬性字段,并基于它指定 watermark 策略: = rt
                        .watermark("rt","rt")
                        // 將 rt 字段指定為 event time 屬性字段,并基于它指定 watermark 策略: = rt-8s
                        .watermark("rt","rt - interval '8' second")
                        //  將 rt 字段指定為 event time 屬性字段,并沿用“源頭流”的 watermark
                        .watermark("rt","source_watermark()")
                        .build()
        );

        table.printSchema();
    }
}
-- DDL方式定義水位線


-- {"id":1,"eventId":"e1","ts":1679754806020,"pageId":"p01"}
--加上水位線及處理時間
create table t_kafka_wm(
  id int,
  eventId string,
  ts bigint,
  pageId string,             -- 物理字段
  pt as PROCTIME(),          -- 聲明處理時間
  wc_time as TO_TIMESTAMP_LTZ(ts, 3),                     -- 表達式字段,將long轉換為TIMESTAMP_LTZ
  WATERMARK FOR wc_time AS wc_time - INTERVAL '5' SECOND  -- 水位線
) WITH (
 'connector' = 'kafka',
 'topic' = 't_kafka_3',
 'properties.bootstrap.servers' = 'centos01:9092',
 'properties.group.id' = 'g1',
 'format' = 'json',
 'scan.startup.mode' = 'earliest-offset',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
)

4.3.2 processing time

定義一個表達式字段,并用表達式 proctime() 將其聲明為 processing time 即可;

        // 轉換為table
        Table table = tableEnv.fromDataStream(wmStream,
                Schema.newBuilder()
                        // 聲明表達式字段,并聲明為 processing time 屬性字段
                        .columnByExpression("pt","proctime()")
                        .build()                      
               )                               

4.3.3 表和流之間水位線的傳遞

4.3.3.1 流轉表的時候

流轉表的過程中,無論“源流”是否存在 watermark,都不會自動傳遞 watermark

如需時間運算(如時間窗口等),需要在轉換定義中顯式聲明 watermark 策略

  • 先設法定義一個 timestamp(3)或者 timestamp_ltz(3)類型的字段 (可以來自于數(shù)據(jù)字段,也可以來自于一個元數(shù)據(jù): rowtime)
rt as to_timestamp_ltz(ts,3)  -- 從一個bigint中得到timestamp(3)類型的字段

rt timestamp(3) metadata from 'rowtime'
  • 然后基于該字段,用 watermarkExpression 聲明 watermark 策略
watermark for rt AS rt - interval '1' second  

watermark for rt AS source_watermark()   -- 代表使用底層流的 watermark 策略
4.3.3.2 表轉流的時候

源表定義了 wartermark 策略,則將表轉成流時,將會自動傳遞源表的 watermark。

        /**
         * 前提:table是一個存在watermark的表對象
         */
        tableEnv.toDataStream(table)
                .process(new ProcessFunction<Row, String>() {
                    @Override
                    public void processElement(Row value, Context ctx, Collector<String> out) throws Exception {
                        long watermark = ctx.timerService().currentWatermark();
                        System.out.println(watermark + "=>" + value);
                    }
                }).print();

4.4 connector詳解

  • connector 通常是用于對接外部存儲建表(源表或目標表)時的映射器、橋接器

  • connector 本質上是對 flink 的 table source /table sink 算子的封裝

連接器使用的核心要素

  • 1、導入連接器jar 包依賴

  • 2、指定連接器類型名

  • 3、指定連接器所需的參數(shù) (不同連接器有不同的參數(shù)配置需求)

  • 4、獲取連接器所提供的元數(shù)據(jù)

flink1.14支持的連接器

flink1.14 sql基礎語法(二) flink sql表定義詳解,# flink,sql,flink

4.4.1 kafka連接器

產生的數(shù)據(jù)以及能接受的數(shù)據(jù)流,是 append-only 流 (只有 +I 這種 changemode)

所需依賴

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.12</artifactId>
      <version>1.14.4</version>
</dependency>

入門案例

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

復雜案例

解析kafka生產者產生具有key以及headers的數(shù)據(jù)

package cn.yyds.sql;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.*;

/**
 * 生產者生產數(shù)據(jù)
 */
public class _12_KafkaProducer {
    public static void main(String[] args) throws InterruptedException {

        // 泛型 K: 要發(fā)送的數(shù)據(jù)中的key
        // 泛型 V: 要發(fā)送的數(shù)據(jù)中的value
        // 隱含之意: kafka中的 message,是 Key-value結構的 (可以沒有key)
        Properties props = new Properties();
        // 因為kafka底層的存儲是沒有類型維護機制的,用戶所發(fā)的所有數(shù)據(jù)類型,都必須變成 序列化后的byte[]
        // 所以,kafka的producer需要一個針對用戶要發(fā)送的數(shù)據(jù)類型的序列化工具類
        // 且這個序列化工具類,需要實現(xiàn)kafka所提供的序列工具接口: org.apache.kafka.common.serialization.Serializer
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos01:9092,centos02:9092,centos03:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.ACKS_CONFIG, "all"); // 消息發(fā)送應答級別


        // 構造一個生產者客戶端
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 檢查是否發(fā)送成功的消費者命令:
        //    kafka-console-consumer.sh  --bootstrap-server centos01:9092 --topic abcd
        for(int i = 0; i < 10; i++){
            // 將業(yè)務數(shù)據(jù)封裝成客戶端所能發(fā)送的封裝格式
            // 0->abc0
            // 1->abc1
            List<Header> headers = new ArrayList<>();
            headers.add(new RecordHeader("k1", "v1".getBytes()));

            JSONObject jsonObject = new JSONObject();
            jsonObject.put("guid",i);
            jsonObject.put("pageId","page" + i);
            jsonObject.put("eventId","e" + i);
            jsonObject.put("eventTime",System.currentTimeMillis());

            ProducerRecord<String, String> message = new ProducerRecord<>("abcd", 0, "key_" + (i % 3), jsonObject.toJSONString(),headers);

            // 調用客戶端去發(fā)送
            // 數(shù)據(jù)的發(fā)送動作在producer的底層是異步線程去異步發(fā)送的
            producer.send(message);

            Thread.sleep(100);
        }


        // 關閉客戶端
        producer.close();
    }
}

{"eventId":"e0","eventTime":1680615780889,"guid":0,"pageId":"page0"}
{"eventId":"e1","eventTime":1680615781420,"guid":1,"pageId":"page1"}
{"eventId":"e2","eventTime":1680615781521,"guid":2,"pageId":"page2"}
{"eventId":"e3","eventTime":1680615781622,"guid":3,"pageId":"page3"}
{"eventId":"e4","eventTime":1680615781724,"guid":4,"pageId":"page4"}
{"eventId":"e5","eventTime":1680615781825,"guid":5,"pageId":"page5"}
{"eventId":"e6","eventTime":1680615781925,"guid":6,"pageId":"page6"}
{"eventId":"e7","eventTime":1680615782027,"guid":7,"pageId":"page7"}
{"eventId":"e8","eventTime":1680615782129,"guid":8,"pageId":"page8"}
{"eventId":"e9","eventTime":1680615782229,"guid":9,"pageId":"page9"}

·

-- 解析kafka
create table t_kafka_w(
  guid int,
  pageId string, 
  eventId string,
  eventTime bigint,
  msgkey string,
  `partition` bigint METADATA VIRTUAL,
  `offset` bigint METADATA VIRTUAL,
  `headers` MAP<string,bytes> METADATA FROM 'headers'
) WITH (
 'connector' = 'kafka',
 'topic' = 'abcd',
 'properties.bootstrap.servers' = 'centos01:9092',
 'properties.group.id' = 'g1',
 --'format' = 'json',
 'key.format' = 'raw',     -- 解析key用raw
 'key.fields' = 'msgkey',
 'value.format' = 'json',  -- 解析value用json
  -- 解析key的值是,要加上 'value.fields-include' = 'EXCEPT_KEY' 參數(shù)
  -- 不然這個 key_field列也會被當成 value 的一部分參與 value 的解析,從而導致解析不出來數(shù)據(jù)
 'value.fields-include' = 'EXCEPT_KEY',
 'scan.startup.mode' = 'earliest-offset'
)


-- 查找數(shù)據(jù)
select guid,pageId,eventId,eventTime,msgkey,`partition`,`offset`,cast(headers['k1'] as string) as headers_value from t_kafka_w


+----+-------------+--------------------------------+--------------------------------+----------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| op |        guid |                         pageId |                        eventId |            eventTime |                         msgkey |            partition |               offset |                  headers_value |
+----+-------------+--------------------------------+--------------------------------+----------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| +I |           0 |                          page0 |                             e0 |        1680615780889 |                          key_0 |                    0 |                    0 |                             v1 |
| +I |           1 |                          page1 |                             e1 |        1680615781420 |                          key_1 |                    0 |                    1 |                             v1 |
| +I |           2 |                          page2 |                             e2 |        1680615781521 |                          key_2 |                    0 |                    2 |                             v1 |
| +I |           3 |                          page3 |                             e3 |        1680615781622 |                          key_0 |                    0 |                    3 |                             v1 |
| +I |           4 |                          page4 |                             e4 |        1680615781724 |                          key_1 |                    0 |                    4 |                             v1 |
| +I |           5 |                          page5 |                             e5 |        1680615781825 |                          key_2 |                    0 |                    5 |                             v1 |
| +I |           6 |                          page6 |                             e6 |        1680615781925 |                          key_0 |                    0 |                    6 |                             v1 |
| +I |           7 |                          page7 |                             e7 |        1680615782027 |                          key_1 |                    0 |                    7 |                             v1 |
| +I |           8 |                          page8 |                             e8 |        1680615782129 |                          key_2 |                    0 |                    8 |                             v1 |
| +I |           9 |                          page9 |                             e9 |        1680615782229 |                          key_0 |                    0 |                    9 |                             v1 |

4.4.2 upsert kafka連接器

所需依賴和kafka相同。

作為source

根據(jù)所定義的主鍵,將讀取到的數(shù)據(jù)轉換為 +I/-U/+U 記錄,如果讀到 null,則轉換為-D 記錄。

-- kafka 中假設有如下數(shù)據(jù)
1,zs,18
1,zs,28


-- kafka-connector產生出 appendonly 流
+I[1,zs,18]
+I[1,zs,28]


-- upsert-kafka-connector 產生出 upsert 模式的 changelog 流
+I [1,zs,18]
-U [1,zs,18]
+U [1,zs,28]

作為sink

  • 對于 -U/+U/+I 記錄,都以正常的 append 消息寫入 kafka

  • 對于-D 記錄,則寫入一個 null 到 kafka 來表示 delete 操作:

案例

package cn.yyds.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class _13_UpsertKafka {
    public static void main(String[] args) {
        // 創(chuàng)建環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 創(chuàng)建測試數(shù)據(jù)
        Table table = tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("province", DataTypes.STRING()),
                        DataTypes.FIELD("user_id", DataTypes.STRING())
                ),
                Row.of("sh","u001"),
                Row.of("sh","u002"),
                Row.of("sh","u003")
        );

        tableEnv.createTemporaryView("s_source",table);

        // 創(chuàng)建upsert-kafka sink表
        tableEnv.executeSql("create table t_upsert_kafka_w(\n" +
                "  province string,\n" +
                "  pv bigint, \n" +
                "  primary  key(province) not enforced -- 需要設置主鍵字段  \n" +
                ") WITH (\n" +
                " 'connector' = 'upsert-kafka',\n" +
                " 'topic' = 't_upsert_kafka',\n" +
                " 'properties.bootstrap.servers' = 'centos01:9092',\n" +
                " 'key.format' = 'csv',\n" +
                " 'value.format' = 'csv'\n" +
                ")");


        tableEnv.executeSql("insert into t_upsert_kafka_w select province,count(distinct user_id) as uv from s_source group by province");

        /**
         *+----+--------------------------------+----------------------+
         * | op |                       province |                   uv |
         * +----+--------------------------------+----------------------+
         * | +I |                             sh |                    1 |
         * | -U |                             sh |                    1 |
         * | +U |                             sh |                    2 |
         * | -U |                             sh |                    2 |
         * | +U |                             sh |                    3 |
         * +----+--------------------------------+----------------------+
         */
        // 從kafka讀取結果
        tableEnv.executeSql("select * from t_upsert_kafka_w").print();

    }
}

4.4.3 jdbc連接器

jdbc connector作為source有如下特性

  • 可作為scan source,底層產生bounded stream

  • 可作為 lookup source,底層是“事件驅動"式查詢??梢詫dbc連接器作為一個維表進行時態(tài)關聯(lián)。
    具體可參考:flink1.14 sql基礎語法(一) flink sql表查詢詳解

jdbc connector作為sink有如下特性

  • 可作為 Batch 模式的sink

  • 可作為Stream模式下的append sink和upsert sink

所需依賴

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.14.4</version>
</dependency>


根據(jù)所連接的數(shù)據(jù)庫不同,還需要相應的 jdbc 驅動,比如連接 mysql
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>

冪等寫出

  • jdbc connector 可以利用目標數(shù)據(jù)庫的特性,實現(xiàn)冪等寫出

  • 冪等寫出可以避免在 failover 發(fā)生后的可能產生的數(shù)據(jù)重復

實現(xiàn)冪等寫出,本身并不需要對jdbc connector 做額外的配置,只需要指定主鍵字段,jdbc connector 就會利用目標數(shù)據(jù)庫的 upsert 語法,來實現(xiàn)冪等寫出。

package cn.yyds.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class _14_UpsertJdbcSink {
    public static void main(String[] args) {
        // 創(chuàng)建環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);


        Table table = tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("province", DataTypes.STRING()),
                        DataTypes.FIELD("user_id", DataTypes.STRING())
                ),
                Row.of("sh","u001"),
                Row.of("sh","u002"),
                Row.of("sh","u003")
        );

        tableEnv.createTemporaryView("s_source",table);

        
        // 創(chuàng)建jdbc sink表
        tableEnv.executeSql("create table t_province_uv(\n" +
                "  province string,\n" +
                "  uv bigint, \n" +
                "  primary  key(province) not enforced \n" +
                ") with(\n" +
                "    'connector' = 'jdbc',\n" +
                "    'url' = 'jdbc:mysql://localhost:3306/test?serverTimezone=UTC',\n" +
                "    'table-name' = 't_province_uv_res',\n" +
                "    'driver' = 'com.mysql.jdbc.Driver',\n" +
                "    'username' = 'root',\n" +
                "    'password' = 'root'\n" +
                ")");


        tableEnv.executeSql("insert into t_province_uv select province,count(distinct user_id) as uv from s_source group by province");

        /**
         * +----+--------------------------------+----------------------+
         * | op |                       province |                   uv |
         * +----+--------------------------------+----------------------+
         * | +I |                             sh |                    3 |
         * +----+--------------------------------+----------------------+
         */
        // 從kafka讀取結果
        tableEnv.executeSql("select * from t_province_uv").print();

    }
}

flink1.14 sql基礎語法(二) flink sql表定義詳解,# flink,sql,flink

分區(qū)并行讀取 (partitioned scan)
jdbc connector 持有一個多并行度的 source task,因而可以多并行度加快表數(shù)據(jù)的讀取

通過設置如下參數(shù)即可實現(xiàn)多并行讀取

  • scan.partition.column: 劃分并行任務的參照列

  • scan.partition.num: 任務并行數(shù)

  • scan.partition.lower-bound: 首分區(qū)的參照字段最小值

  • scan.partition.upper-bound: 末分區(qū)的參照字段最大值
    分區(qū)參照字段必須是: numeric, date,或 timestamp 類型

4.4.4 filesystem連接器

filesystem connector 表特性

  • 可讀可寫

  • 作為 source 表時,支持持續(xù)監(jiān)視讀取目錄下新文件,且每個新文件只會被讀取一次

  • 作為 sink 表時,支持 多種文件格式、分區(qū)、文件滾動、壓縮設置等功能

CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  part_name1 INT,
  part_name2 STRING
)
PARTITIONED BY (part_name1, part_name2)
WITH (
  'connector' = 'filesystem',                   -- 必填: 指定連接器名稱
  'path' = 'file:///path/to/whatever',          -- 必填: 目錄路徑
  'format' = '...',                             -- 必填: 文件系統(tǒng)連接器要求指定一個format格式化
  'partition.default-name' = '...',             -- 可選: 如果動態(tài)分區(qū)字段值為null/空字符串,則使用指定的默認分區(qū)名稱
  'sink.shuffle-by-partition.enable' = '...',   --可選:在sink階段開啟對動態(tài)分區(qū)文件數(shù)據(jù)的shuffle,開啟之后可以減少寫出文件的數(shù)量,但是有可能造成數(shù)據(jù)傾斜。默認為false。
  ...
);
1、分區(qū)文件

文件系統(tǒng)分區(qū)支持使用標準的hive format格式,而且,它不要求分區(qū)被預注冊在表的catalog中。分區(qū)通過目錄結構來進行發(fā)現(xiàn)和推斷。比如,下面基于目錄的表分區(qū)將會被推斷為包含日期和小時分區(qū)。

path
└── datetime=2019-08-25
    └── hour=11
        ├── part-0.parquet
        ├── part-1.parquet
    └── hour=12
        ├── part-0.parquet
└── datetime=2019-08-26
    └── hour=6
        ├── part-0.parquet
12345678910

使用insert overwrite覆蓋一個分區(qū)表時,只有相關聯(lián)的分區(qū)被覆蓋,而不是整張表。

2、文件format

文件系統(tǒng)連接器支持多種format格式:

  • CSV: RFC-4180. 未壓縮
  • JSON: 注意,文件系統(tǒng)的JSON格式并不是標準的JSON文件,而是未壓縮的newline delimited JSON。
  • Avro: Apache Avro. 支持通過配置avro.codec來支持壓縮。
  • Parquet: Apache Parquet. 兼容Hive.
  • Orc: Apache Orc. 兼容Hive.
  • Debezium-JSON: debezium-json.
  • Canal-JSON: canal-json.
  • Raw: raw.
3、Source

file system 連接器在單個表中可以被用于讀取單個文件,或者是整個目錄。

當使用目錄作為 source 路徑時,目錄中的文件并沒有定義好的讀取順序。

目錄監(jiān)控

默認情況下,file system 連接器是有界的,該連接器只會讀取一次配置的目錄,然后關閉它。

你可以通過配置 option source.monitor-interval 選項配置持續(xù)的目錄監(jiān)控:

Key 默認值 類型 描述
source.monitor-interval (none) Duration source 檢查新文件的時間間隔,該數(shù)值必須大于0。每個文件都會使用他們自己的路徑作為唯一標識符,并且在被發(fā)現(xiàn)后處理一次。已經被處理過的文件集合會在整個 source 的生命周期內被保存到 state 中,因此他們和 source state 一起被持久化到 checkpoint 和 savepoint 中。 更小的時間間隔意味著文件會更快被發(fā)現(xiàn),但是會對文件系統(tǒng)或對象存儲進行更頻繁的文件列出或目錄遍歷。如果沒有配置該選項,則提供的路徑將只會被掃描一次,此時該 source 將會是有界的。
可用元數(shù)據(jù)

下面的連接器元數(shù)據(jù)可以通過被定義為表的元數(shù)據(jù)字段來訪問,所有的元數(shù)據(jù)都是只讀的。

Key 數(shù)據(jù)類型 描述
file.path STRING NOT NULL 輸入文件的路徑
file.name STRING NOT NULL 文件名稱,他是距離文件路徑根目錄最遠的元素。
file.size BIGINT NOT NULL 文件的字節(jié)數(shù)。
file.modification-time TIMESTAMP_LTZ(3) NOT NULL 文件的修改時間。

下面的代碼片段展示了 CREATE TABLE 案例如何訪問元數(shù)據(jù)屬性:

CREATE TABLE MyUserTableWithFilepath (
    column_name1 INT,
    column_name2 STRING,
    `file.path` STRING NOT NULL METADATA
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///path/to/whatever',
    'format' = 'json'
)
4、Streaming Sink

文件系統(tǒng)連接器基于Streaming File Sink 寫入記錄到文件以支持文件系統(tǒng)連接器流式寫入。行編碼格式支持csvjson。塊編碼格式支持parquet、orcavro

可以通過sql直接寫入,插入流數(shù)據(jù)到不分區(qū)的表中。如果是分區(qū)表,可以配置分區(qū)關聯(lián)操作。

滾動策略

數(shù)據(jù)通過分區(qū)目錄會被切分為多個文件。每個分區(qū)將包含其對應sink子任務接收到數(shù)據(jù)之后寫入的至少一個文件,正在處理的文件將會根據(jù)配置的滾動策略來關閉并成為分區(qū)中的一個文件。文件的滾動策略基于大小、文件可以被打開的最大超時時間間隔來配置。

Key 要求 是否可被傳遞 默認值 類型 描述
sink.rolling-policy.file-size 可選 128MB MemorySize 滾動之前文件的最大大小。
sink.rolling-policy.rollover-interval 可選 30 min Duration 被滾動之前,一個文件可以保持打開的最大時間間隔(默認為30分鐘,以避免產生很多小文件)。通過 sink.rolling-policy.check-interval 選項來控制檢查的頻率。
sink.rolling-policy.check-interval 可選 1 min Duration 滾動策略的檢查時間間隔。該選項基于 sink.rolling-policy.rollover-interval 選項來控制檢查文件是否可以被滾動。

注:對于塊格式(parquet、orc、avro),滾動策略將會根據(jù)checkpoint間隔來控制大小和他們的數(shù)量,checkpoint決定文件的寫入完成。

注:對于行格式(csvjson),如果想查看文件是否在文件系統(tǒng)中存在,并且不想等待過長的時間,則可以在連接器配置 sink.rolling-policy.file-sizesink.rolling-policy.rollover-interval ,并且在flink-conf.yaml中設置 execution.checkpointing.interval 參數(shù)。

對于其他的格式(avro、orc),可以只在flink-conf.yaml中配置execution.checkpointing.interval參數(shù)。

文件壓縮

文件系統(tǒng)sink支持文件壓縮,該特性允許應用程序設置更小的checkpoint間隔,而不會產生很多的文件。

Key 要求 是否可被傳遞 默認值 類型 描述
auto-compaction 可選 false Boolean 是否在流slink中開啟自動壓縮。數(shù)據(jù)將會被寫入臨時文件。checkpoint完成之后,通過checkpoint生成的臨時文件將會被壓縮。臨時文件在被壓縮之前是不可見的。
compaction.file-size 可選 (none) Boolean 壓縮的目標文件大小,默認值為滾動文件大小。

如果開啟,文件壓縮將會基于目標文件大小合并多個小文件為大文件。在生產生運行文件壓縮時,需要注意以下問題:

  • 只有單個checkpoint中的文件可以被合并,因此,至少有和checkpoint次數(shù)相同的文件被生成。
  • 文件在被合并之前是不可見的,因此文件可見時間為:checkpoint間隔+壓縮時間
  • 如果壓縮運行時間過長,則將會造成任務的反壓,并且增加checkpoint的時間。
5、分區(qū)提交

通常來說,寫入分區(qū)之后通知下游應用程序是非常必要的。比如:增加分區(qū)信息到hive的元數(shù)據(jù),或者是在分區(qū)目錄中寫入一個 _SUCCESS 文件。文件系統(tǒng)sink連接器提供了分區(qū)提交特性,以允許配置自定義策略。提交行為基于合并的觸發(fā)器和策略。

Trigger觸發(fā)器:分區(qū)提交的時間可以通過水印或處理時間來確定。

Policy策略:如何提交一個分區(qū),支持通過success文件和元數(shù)據(jù)提交,也可以自定義實現(xiàn)策略。比如觸發(fā)hive的指標分區(qū),或者是和并小文件等等。

注:分區(qū)提交只在動態(tài)分區(qū)插入時起作用。

分區(qū)提交觸發(fā)器

定義何時提交分區(qū),提供分區(qū)提交觸發(fā)器:

Key 要求 是否可被傳遞 默認值 類型 描述
sink.partition-commit.trigger 可選 process-time String 分區(qū)提交觸發(fā)的類型: process-time:基于機器時間,既不需要分區(qū)時間提取,也不需要水印生成。一旦當前系統(tǒng)時間超過了分區(qū)創(chuàng)建時的系統(tǒng)時間加上指定的delay延遲就會提交分區(qū)。 partition-time:基于分區(qū)字段值提取的時間,要求生成水印。當水印超過了分區(qū)值提取的時間加上delay延遲時提交水印。
sink.partition-commit.delay 可選 0 s Duration 分區(qū)在延遲時間到達之前不會提交。如果是按天分區(qū),則應該是1 d,如果是按小時分區(qū),則應該是1 h。
sink.partition-commit.watermark-time-zone 可選 UTC String 轉換long類型的水印值為TIMESTAMP類型是使用的時區(qū),轉換之后的水印時間戳將被用于和分區(qū)時間計算,以決定分區(qū)是否應該被提交。 該選項只有在 sink.partition-commit.trigger 選項設置為 partition-time 時起作用。如果該選項沒有被正確配置,比如source的rowtime被定義為TIMESTAMP_LTZ字段,但是該選項沒有配置,則用戶將會延遲幾小時之后看到提交的分區(qū)。 默認值為UTC,這意味著水印需要被定義為TIMESTAMP字段,或者是不被定義。如果水印被定義為TIMESTAMP_LTZ字段,則水印時區(qū)為會話時區(qū)。該選項值可以是完全名稱,比如America/Los_Angeles,或者是自定義的時區(qū)id,比如GMT+08:00。

有兩種觸發(fā)器類型:

  • 第一個是分區(qū)的處理時間,既不要求分區(qū)時間提取,也不要求水印生成。該觸發(fā)器根據(jù)分區(qū)的創(chuàng)建時間和當前系統(tǒng)時間觸發(fā)分區(qū)提交。該觸發(fā)器更常用,但不是很精確。比如,數(shù)據(jù)延遲或失敗,將會導致不成熟的分區(qū)提交。
  • 第二個是根據(jù)水印和從分區(qū)中提取的時間來觸發(fā)分區(qū)提交。該觸發(fā)器要求任務有水印生成,并且分區(qū)根據(jù)時間來劃分,比如按小時或按天分區(qū)。

如果想要下游盡快看到新分區(qū),而不管數(shù)據(jù)寫入是否完成:

  • ‘sink.partition-commit.trigger’=‘process-time’ (默認值)
  • ‘sink.partition-commit.delay’=‘0s’ (默認值),分區(qū)一旦寫入數(shù)據(jù),將會立即提交。注:分區(qū)可能會被提交多次。

如果想要下游在數(shù)據(jù)寫入完成之后看到分區(qū),并且job任務有水印生成,則可以通過分區(qū)值來提取時間:

  • ‘sink.partition-commit.trigger’=‘partition-time’
  • ‘sink.partition-commit.delay’=‘1h’ (如果分區(qū)為小時分區(qū),則使用 1h,取決于分區(qū)時間類型)這是提交分區(qū)更準確的方式。它將嘗試在數(shù)據(jù)寫入完成之后再提交分區(qū)。

如果想要下游在數(shù)據(jù)寫入完成之后看到分區(qū),但是沒有水印,或者是無法從分區(qū)值提取時間:

  • ‘ink.partition-commit.trigger’=‘process-time’ (默認值)
  • ‘sink.partition-commit.delay’=‘1h’ (如果分區(qū)為小時分區(qū),則使用 1h,取決于分區(qū)時間類型)嘗試準確的提交分區(qū),但是遲到的數(shù)據(jù)或者是失敗將會導致不成熟的分區(qū)提交。

遲到數(shù)據(jù)處理:支持寫入分區(qū)的記錄將會被寫入已經提交的分區(qū),并且該分區(qū)提交將會被再次觸發(fā)。

默認提取器基于分區(qū)屬性和時間戳默認組成。也可以通過實現(xiàn) PartitionTimeExtractor 接口來完全自定義分區(qū)提取器。

public class HourPartTimeExtractor implements PartitionTimeExtractor {
    @Override
    public LocalDateTime extract(List<String> keys, List<String> values) {
        String dt = values.get(0);
        String hour = values.get(1);
        return Timestamp.valueOf(dt + " " + hour + ":00:00").toLocalDateTime();
    }
}
分區(qū)提交策略

分區(qū)提交策略定義分區(qū)提交時執(zhí)行哪些操作

  • 第一個是元數(shù)據(jù),只有hive表支持元數(shù)據(jù)策略,文件系統(tǒng)通過目錄結構管理分區(qū)。
  • 第二個是success文件,在分區(qū)對一個的目錄下寫一個空文件。
Key 要求 是否可被傳遞 默認值 類型 描述
sink.partition-commit.policy.kind 可選 (none) String 指定提交分區(qū)并通知下游應用程序,該分區(qū)已經完成寫入并可進行讀取的策略。 metastore:將分區(qū)寫入元數(shù)據(jù)。只有hive表支持元數(shù)據(jù)策略,文件系統(tǒng)通過目錄結構來管理分區(qū)。 success-file:在目錄中增加 _success 文件。這兩個方式可以同時配置: metastore,success-file custom:使用策略類創(chuàng)建一個提交策略。 支持配置多個策略:metastore,success-file。
sink.partition-commit.policy.class 可選 (none) String 實現(xiàn)了PartitionCommitPolicy接口的分區(qū)提交策略實現(xiàn)類。只在自定義custom提交策略中起作用。
sink.partition-commit.success-file.name 可選 _SUCCESS String success-file分區(qū)提交的文件名稱,默認為: _SUCCESS
6、sink并行度

寫入文件到外部文件系統(tǒng)的并行度(包括hive),可以通過表的option選項來配置,流模式和批模式都支持這么做。
默認情況下,slink的并行度和上游鏈在一起的算子并行度一致。如果配置了和上游算子不同的并行度,則寫入文件算子的并行度將使用配置的并行度。

Key 要求 是否可被傳遞 默認值 類型 描述
sink.parallelism 可選 (none) Integer 將文件寫入外部文件系統(tǒng)的并行度。數(shù)值應該大于0,否則將拋出異常。

注:目前,配置sink并行度只支持上游算子為僅插入INERT-ONLY類型的變更日志模式,否則將拋出異常。

7、完整案例

下面的例子展示文件系統(tǒng)連接器如何通過流查詢從kafka讀取數(shù)據(jù),然后寫入文件系統(tǒng),并且通過批查詢從文件系統(tǒng)中讀取寫入的數(shù)據(jù)。

CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);

CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector'='filesystem',
  'path'='...',
  'format'='parquet',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='success-file'
);

-- streaming sql, insert into file system table
INSERT INTO fs_table 
SELECT 
    user_id, 
    order_amount, 
    DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
    DATE_FORMAT(log_ts, 'HH') 
FROM kafka_table;

-- 批式sql,查詢指定分區(qū)下的數(shù)據(jù)
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';

如果水印定義在TIMESTAMP_LTZ類型的字段上,并且被用于分區(qū)提交時間,則sink.partition-commit.watermark-time-zone配置必須設置為會話時間分區(qū),否則分區(qū)提交將會晚幾個小時。

CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  ts BIGINT, -- 毫秒值
  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- 在TIMESTAMP_LTZ字段上定義水印
) WITH (...);

CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector'='filesystem',
  'path'='...',
  'format'='parquet',
  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- 表名用戶配置的時區(qū)為:'Asia/Shanghai'
  'sink.partition-commit.policy.kind'='success-file'
);

-- 流式sql,插入數(shù)據(jù)到文件系統(tǒng)
INSERT INTO fs_table
SELECT 
    user_id, 
    order_amount, 
    DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
    DATE_FORMAT(ts_ltz, 'HH') 
FROM kafka_table;

-- 批式sql,查詢指定分區(qū)下的數(shù)據(jù)
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';

4.4.5 第三方連接器

例如:flink-doris-connector

create table cdc_mysql_source(
  id int,
  name varchar,
  primary key(id) not enforced
)with(
  'connector' = 'mysql-cdc',
  'hostname' = 'centos01',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'test',
  'table-name' = 't_test'
)

-- 支持刪除事件同步(sink.enable-delete='true'),需要 Doris 表開啟批量刪除功能
CREATE TABLE doris_sink (
  id INT,
  name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' ='centos01:8030',
    'table.identifier' = 'test.t_test',
    'username' = 'root',
    'password' = 'root',
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true',
    'sink.enable-delete' = 'true'
)

insert into doris_sink select id,name from cdc_mysql_source;

flink-hudi-connector文章來源地址http://www.zghlxwxcb.cn/news/detail-626695.html

-- 1、創(chuàng)建測試表
CREATE TABLE sourceT (
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

create table t2(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
)
with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_flink/t2',
  'table.type' = 'MERGE_ON_READ'
);

-- 2、執(zhí)行插入
insert into t2 select * from sourceT;

到了這里,關于flink1.14 sql基礎語法(二) flink sql表定義詳解的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • flink1.17 自定義trigger ContinuousEventTimeTrigger

    在?ContinuousEventTimeTrigger 的基礎上新增了timeout,如果超時后窗口都沒關閉,那么就硬輸出一波,避免間斷數(shù)據(jù),留存窗口太久. ContinuousEventTimeTrigger連續(xù)事件時間觸發(fā)器與ContinuousProcessingTimeTrigger連續(xù)處理時間觸發(fā)器,指定一個固定時間間隔interval,不需要等到窗口結束才能獲取結果

    2024年02月14日
    瀏覽(41)
  • Flink1.14提交任務報錯classloader.check-leaked-classloader問題解決

    我的hadoop版本是3.1.3,F(xiàn)link版本是1.14。不知道是hadoop版本的原因還是Flink版本更新的原因。當我運行一個簡單的Flink測試時,雖然結果出來了但是后面還跟著一段報錯信息。 測試命令: flink run -m yarn-cluster -p 2 -yjm 2G -ytm 2G $FLINK_HOME/examples/batch/WordCount.jar 報錯信息: Trying to acce

    2024年02月11日
    瀏覽(26)
  • 使用Flink1.16.0的SQLGateway遷移Hive SQL任務

    使用Flink1.16.0的SQLGateway遷移Hive SQL任務

    使用Flink的SQL Gateway遷移Hive SQL任務 我們有數(shù)萬個離線任務,主要還是默認的DataPhin調度CDP集群的Hive On Tez這種低成本任務,當然也有PySpark、打Jar包的Spark和打Jar包的Flink任務這種高成本的任務【Java和Scala都有】。畢竟SQL上手門檻極低,是個人都能寫幾下并且跑起來,還可以很容

    2023年04月08日
    瀏覽(22)
  • Flink1.17 基礎知識

    Flink1.17 基礎知識

    來源:B站尚硅谷 Flink 概述 Flink 是什么 Flink的核心目標是“ 數(shù)據(jù)流上的有狀態(tài)計算 ” (Stateful Computations over Data Streams)。 具體來說:Apache Flink是一個 框架式和分布式處理引擎 ,用于對無界和有界數(shù)據(jù)流進行有 狀態(tài)計算 。 Flink特點 處理數(shù)據(jù)的目標是: 低延遲、高吞吐、結

    2024年01月25日
    瀏覽(24)
  • 實時數(shù)倉|基于Flink1.11的SQL構建實時數(shù)倉探索實踐

    實時數(shù)倉主要是為了解決傳統(tǒng)數(shù)倉數(shù)據(jù)時效性低的問題,實時數(shù)倉通常會用在實時的 OLAP 分析、實時的數(shù)據(jù)看板、業(yè)務指標實時監(jiān)控等場景。雖然關于實時數(shù)倉的架構及技術選型與傳統(tǒng)的離線數(shù)倉會存在差異,但是關于數(shù)倉建設的基本方法論是一致的。本文會分享基于 Flink

    2024年02月16日
    瀏覽(22)
  • [flink1.14.4]Unable to create a source for reading table ‘default_catalog.default_database.new_buyer

    [flink1.14.4]Unable to create a source for reading table ‘default_catalog.default_database.new_buyer

    升級flink1.14.4報錯? Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table \\\'default_catalog.default_database.new_buyer_trade_order2\\\'? ? ?source表未加主鍵導致,注釋放開,提交成功

    2024年02月15日
    瀏覽(19)
  • (二開)Flink 修改源碼拓展 SQL 語法

    (二開)Flink 修改源碼拓展 SQL 語法

    1、Flink 擴展 calcite 中的語法解析 1)定義需要的 SqlNode 節(jié)點類-以 SqlShowCatalogs 為例 a)類位置 flink/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java 核心方法 : b)類血緣 2)修改 includes 目錄下的 .ftl 文件,在 parserImpls.ftl 文件中添加語法邏輯 a)文件位

    2024年02月07日
    瀏覽(13)
  • Flink---14、Flink SQL(SQL-Client準備、流處理中的表、時間屬性、DDL)

    Flink---14、Flink SQL(SQL-Client準備、流處理中的表、時間屬性、DDL)

    ?????????????????????? 星光下的趕路人star的個人主頁 ?????????????????????? 你生而真實,而非完美 Table API和SQL是最上層的API,在Flink中這兩種API被集成在一起,SQL執(zhí)行的對象也是Flink中的表(Table),所以我們一般會認

    2024年02月06日
    瀏覽(23)
  • 【大數(shù)據(jù)】Flink SQL 語法篇(一):CREATE

    CREATE 語句用于向當前或指定的 Catalog 中注冊庫、表、視圖或函數(shù)。注冊后的庫、表、視圖和函數(shù)可以在 SQL 查詢中使用。 目前 Flink SQL 支持下列 CREATE 語句: CREATE TABLE CREATE DATABASE CREATE VIEW CREATE FUNCTION 下面的 SQL 語句就是建表語句的定義,根據(jù)指定的表名創(chuàng)建一個表,如果同

    2024年02月21日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包