国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

22、Flink 的table api與sql之創(chuàng)建表的DDL

這篇具有很好參考價(jià)值的文章主要介紹了22、Flink 的table api與sql之創(chuàng)建表的DDL。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

Flink 系列文章

一、Flink 專欄

Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。

  • 1、Flink 部署系列
    本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。

  • 2、Flink基礎(chǔ)系列
    本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。

  • 3、Flik Table API和SQL基礎(chǔ)系列
    本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫、表用法、查詢、窗口函數(shù)、catalog等等內(nèi)容。

  • 4、Flik Table API和SQL提高與應(yīng)用系列
    本部分是table api 和sql的應(yīng)用部分,和實(shí)際的生產(chǎn)應(yīng)用聯(lián)系更為密切,以及有一定開發(fā)難度的內(nèi)容。

  • 5、Flink 監(jiān)控系列
    本部分和實(shí)際的運(yùn)維、監(jiān)控工作相關(guān)。

二、Flink 示例專欄

Flink 示例專欄是 Flink 專欄的輔助說明,一般不會(huì)介紹知識(shí)點(diǎn)的信息,更多的是提供一個(gè)一個(gè)可以具體使用的示例。本專欄不再分目錄,通過鏈接即可看出介紹的內(nèi)容。

兩專欄的所有文章入口點(diǎn)擊:Flink 系列文章匯總索引



本文介紹了Flink 的table api和sql中的DDL操作與示例。
本文比較簡(jiǎn)單,僅僅是介紹Flink 的DDL。

一、DDL概述

CREATE 語句用于向當(dāng)前或指定的 Catalog 中注冊(cè)表、視圖或函數(shù)。注冊(cè)后的表、視圖和函數(shù)可以在 SQL 查詢中使用。
目前 Flink SQL 支持下列 CREATE 語句:

  • CREATE TABLE
  • CREATE CATALOG
  • CREATE DATABASE
  • CREATE VIEW
  • CREATE FUNCTION

二、執(zhí)行 CREATE 語句

可以使用 TableEnvironment 中的 executeSql() 方法執(zhí)行 CREATE 語句。 若 CREATE 操作執(zhí)行成功,executeSql() 方法返回 ‘OK’,否則會(huì)拋出異常。

1、java

以下的例子展示了如何在 TableEnvironment 中執(zhí)行一個(gè) CREATE 語句。

EnvironmentSettings settings = EnvironmentSettings.newInstance()...
TableEnvironment tableEnv = TableEnvironment.create(settings);

// 對(duì)已注冊(cè)的表進(jìn)行 SQL 查詢
// 注冊(cè)名為 “Orders” 的表
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
// 在表上執(zhí)行 SQL 查詢,并把得到的結(jié)果作為一個(gè)新的表
Table result = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

// 對(duì)已注冊(cè)的表進(jìn)行 INSERT 操作
// 注冊(cè) TableSink
tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");
// 在表上執(zhí)行 INSERT 語句并向 TableSink 發(fā)出結(jié)果
tableEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

2、SQL Cli

Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
[INFO] Table has been created.

Flink SQL> CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...);
[INFO] Table has been created.

Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
[INFO] Submitting SQL update statement to the cluster...

三、CREATE TABLE語法

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] | AS select_query ]
   
<physical_column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]
  
<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<metadata_column_definition>:
  column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<source_table>:
  [catalog_name.][db_name.]table_name

<like_options>:
{
   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
 | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } 
}[, ...]

根據(jù)指定的表名創(chuàng)建一個(gè)表,如果同名表已經(jīng)在 catalog 中存在了,則無法注冊(cè)。

1、Columns

1、Physical / Regular Columns

物理列是數(shù)據(jù)庫中已知的常規(guī)列。它們定義物理數(shù)據(jù)中字段的名稱、類型和順序。因此,物理列表示從外部系統(tǒng)讀取和寫入的有效負(fù)載。連接器和格式使用這些列(按定義的順序)來配置自身??梢栽谖锢砹兄g聲明其他類型的列,但不會(huì)影響最終的physical schema。

以下語句創(chuàng)建一個(gè)僅包含常規(guī)列的表:

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING
) WITH (
  ...
);

2、Metadata Columns

元數(shù)據(jù)列是SQL標(biāo)準(zhǔn)的擴(kuò)展,允許訪問連接器和/或格式化表的每一行的特定字段。元數(shù)據(jù)列由元數(shù)據(jù)關(guān)鍵字指示。例如,元數(shù)據(jù)列可用于從 Kafka 記錄讀取和寫入時(shí)間戳,以便進(jìn)行基于時(shí)間的操作。連接器和格式文檔列出了每個(gè)組件的可用元數(shù)據(jù)字段。但是,在table’s schema中聲明元數(shù)據(jù)列是可選的。

以下語句創(chuàng)建一個(gè)表,其中包含引用元數(shù)據(jù)字段時(shí)間戳的附加元數(shù)據(jù)列:

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'    -- reads and writes a Kafka record's timestamp
) WITH (
  'connector' = 'kafka'
  ...
);

每個(gè)元數(shù)據(jù)字段都由基于字符串的鍵標(biāo)識(shí),并具有記錄的數(shù)據(jù)類型。例如,Kafka 連接器公開一個(gè)元數(shù)據(jù)字段,其中包含可用于讀取和寫入記錄的鍵時(shí)間戳和數(shù)據(jù)類型 TIMESTAMP_LTZ(3)。

在上面的示例中,元數(shù)據(jù)列record_time成為table’s schema的一部分,并且可以像常規(guī)列一樣進(jìn)行轉(zhuǎn)換和存儲(chǔ):

INSERT INTO MyTable SELECT user_id, name, record_time + INTERVAL '1' SECOND FROM MyTable;

為方便起見,如果列名應(yīng)用作標(biāo)識(shí)元數(shù)據(jù)鍵,則可以省略 FROM 子句:

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  `timestamp` TIMESTAMP_LTZ(3) METADATA    -- use column name as metadata key
) WITH (
  'connector' = 'kafka'
  ...
);

為方便起見,如果列的數(shù)據(jù)類型與元數(shù)據(jù)字段的數(shù)據(jù)類型不同,運(yùn)行時(shí)將執(zhí)行顯式強(qiáng)制轉(zhuǎn)換。當(dāng)然,這要求兩種數(shù)據(jù)類型兼容。

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  `timestamp` BIGINT METADATA    -- cast the timestamp as BIGINT
) WITH (
  'connector' = 'kafka'
  ...
);

默認(rèn)情況下,計(jì)劃器假定元數(shù)據(jù)列可用于讀取和寫入。但是,在許多情況下,外部系統(tǒng)提供的只讀元數(shù)據(jù)字段多于可寫字段。因此,可以使用 VIRTUAL 關(guān)鍵字從持久保留中排除元數(shù)據(jù)列。

CREATE TABLE MyTable (
  `timestamp` BIGINT METADATA,       -- part of the query-to-sink schema
  `offset` BIGINT METADATA VIRTUAL,  -- not part of the query-to-sink schema
  `user_id` BIGINT,
  `name` STRING,
) WITH (
  'connector' = 'kafka'
  ...
);

在上面的示例中,偏移量是只讀元數(shù)據(jù)列, 在query-to-sink schema中排除。因此,source-to-query schema(對(duì)于 SELECT)和query-to-sink (對(duì)于INSERT INTO)架構(gòu)不同:

  • source-to-query schema:
MyTable(`timestamp` BIGINT, `offset` BIGINT, `user_id` BIGINT, `name` STRING)
  • query-to-sink schema:
MyTable(`timestamp` BIGINT, `user_id` BIGINT, `name` STRING)

3、Computed Columns

計(jì)算列是使用 AS computed_column_expression語法column_name生成的虛擬列。

計(jì)算列計(jì)算可引用同一表中聲明的其他列的表達(dá)式??梢栽L問物理列和元數(shù)據(jù)列。列本身不以物理方式存儲(chǔ)在表中。列的數(shù)據(jù)類型是從給定表達(dá)式自動(dòng)派生的,不必手動(dòng)聲明。

計(jì)劃器將在源之后將計(jì)算列轉(zhuǎn)換為常規(guī)投影。對(duì)于優(yōu)化或水印策略下推,評(píng)估可能會(huì)分布在運(yùn)算符之間、多次執(zhí)行或在給定查詢不需要時(shí)跳過。

例如,計(jì)算列可以定義為:

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `price` DOUBLE,
  `quantity` DOUBLE,
  `cost` AS price * quanitity,  -- evaluate expression and supply the result to queries
) WITH (
  'connector' = 'kafka'
  ...
);

表達(dá)式可以包含列、常量或函數(shù)的任意組合。表達(dá)式不能包含子查詢。

計(jì)算列在 Flink 中通常用于定義 CREATE TABLE 語句中的時(shí)間屬性。

處理時(shí)間屬性可以通過 proc AS PROCTIME() 使用系統(tǒng)的 PROCTIME() 函數(shù)輕松定義。
可以在 WATERMARK 聲明之前預(yù)處理事件時(shí)間屬性時(shí)間戳。例如,如果原始字段不是 TIMESTAMP(3) 類型或嵌套在 JSON 字符串中,則可以使用計(jì)算列。

與虛擬元數(shù)據(jù)列類似,計(jì)算列從持久化中排除。因此,計(jì)算列不能是 INSERT INTO 語句的目標(biāo)。因此,source-to-query schema(對(duì)于 SELECT)和query-to-sink (for INSERT INTO) schema不同:

  • source-to-query schema:
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, `cost` DOUBLE)
  • query-to-sink schema:
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE)

2、WATERMARK

WATERMARK 定義了表的事件時(shí)間屬性,其形式為 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 。

rowtime_column_name 把一個(gè)現(xiàn)有的列定義為一個(gè)為表標(biāo)記事件時(shí)間的屬性。該列的類型必須為 TIMESTAMP(3),且是 schema 中的頂層列,它也可以是一個(gè)計(jì)算列。

watermark_strategy_expression 定義了 watermark 的生成策略。它允許使用包括計(jì)算列在內(nèi)的任意非查詢表達(dá)式來計(jì)算 watermark ;表達(dá)式的返回類型必須是 TIMESTAMP(3),表示了從 Epoch 以來的經(jīng)過的時(shí)間。 返回的 watermark 只有當(dāng)其不為空且其值大于之前發(fā)出的本地 watermark 時(shí)才會(huì)被發(fā)出(以保證 watermark 遞增)。每條記錄的 watermark 生成表達(dá)式計(jì)算都會(huì)由框架完成。 框架會(huì)定期發(fā)出所生成的最大的 watermark ,如果當(dāng)前 watermark 仍然與前一個(gè) watermark 相同、為空、或返回的 watermark 的值小于最后一個(gè)發(fā)出的 watermark ,則新的 watermark 不會(huì)被發(fā)出。 Watermark 根據(jù) pipeline.auto-watermark-interval 中所配置的間隔發(fā)出。 若 watermark 的間隔是 0ms ,那么每條記錄都會(huì)產(chǎn)生一個(gè) watermark,且 watermark 會(huì)在不為空并大于上一個(gè)發(fā)出的 watermark 時(shí)發(fā)出。

使用事件時(shí)間語義時(shí),表必須包含事件時(shí)間屬性和 watermark 策略。

Flink 提供了三種常用的 watermark 策略。

  • 嚴(yán)格遞增時(shí)間戳: WATERMARK FOR rowtime_column AS rowtime_column。
    發(fā)出到目前為止已觀察到的最大時(shí)間戳的 watermark ,時(shí)間戳大于最大時(shí)間戳的行被認(rèn)為沒有遲到。

  • 遞增時(shí)間戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
    發(fā)出到目前為止已觀察到的最大時(shí)間戳減 1 的 watermark ,時(shí)間戳大于或等于最大時(shí)間戳的行被認(rèn)為沒有遲到。

  • 有界亂序時(shí)間戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。
    發(fā)出到目前為止已觀察到的最大時(shí)間戳減去指定延遲的 watermark ,例如, WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘5’ SECOND 是一個(gè) 5 秒延遲的 watermark 策略。

CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );

3、PRIMARY KEY

主鍵用作 Flink 優(yōu)化的一種提示信息。主鍵限制表明一張表或視圖的某個(gè)(些)列是唯一的并且不包含 Null 值。 主鍵聲明的列都是非 nullable 的。因此主鍵可以被用作表行級(jí)別的唯一標(biāo)識(shí)。

主鍵可以和列的定義一起聲明,也可以獨(dú)立聲明為表的限制屬性,不管是哪種方式,主鍵都不可以重復(fù)定義,否則 Flink 會(huì)報(bào)錯(cuò)。

SQL 標(biāo)準(zhǔn)主鍵限制可以有兩種模式:ENFORCED 或者 NOT ENFORCED。 它申明了是否輸入/出數(shù)據(jù)會(huì)做合法性檢查(是否唯一)。Flink 不存儲(chǔ)數(shù)據(jù)因此只支持 NOT ENFORCED 模式,即不做檢查,用戶需要自己保證唯一性。

Flink 假設(shè)聲明了主鍵的列都是不包含 Null 值的,Connector 在處理數(shù)據(jù)時(shí)需要自己保證語義正確。

在 CREATE TABLE 語句中,創(chuàng)建主鍵會(huì)修改列的 nullable 屬性,主鍵聲明的列默認(rèn)都是非 Nullable 的。

4、PARTITIONED BY

根據(jù)指定的列對(duì)已經(jīng)創(chuàng)建的表進(jìn)行分區(qū)。若表使用 filesystem sink ,則將會(huì)為每個(gè)分區(qū)創(chuàng)建一個(gè)目錄。

5、WITH Options

表屬性用于創(chuàng)建 table source/sink ,一般用于尋找和創(chuàng)建底層的連接器。

表達(dá)式 key1=val1 的鍵和值必須為字符串文本常量。請(qǐng)參考 連接外部系統(tǒng)Flink(十六)Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式

表名可以為以下三種格式

  1. catalog_name.db_name.table_name ,使用catalog_name.db_name.table_name 的表將會(huì)與名為 “catalog_name” 的 catalog 和名為 “db_name” 的數(shù)據(jù)庫一起注冊(cè)到 metastore 中
  2. db_name.table_name ,使用 db_name.table_name 的表將會(huì)被注冊(cè)到當(dāng)前執(zhí)行的 table environment 中的 catalog 且數(shù)據(jù)庫會(huì)被命名為 “db_name”
  3. table_name,對(duì)于 table_name, 數(shù)據(jù)表將會(huì)被注冊(cè)到當(dāng)前正在運(yùn)行的catalog和數(shù)據(jù)庫中
使用 CREATE TABLE 語句注冊(cè)的表均可用作 table source 和 table sink。 在被 DML 語句引用前,我們無法決定其實(shí)際用于 source 抑或是 sink。

6、LIKE

LIKE 子句來源于兩種 SQL 特性的變體/組合(Feature T171,“表定義中的 LIKE 語法” 和 Feature T173,“表定義中的 LIKE 語法擴(kuò)展”)。LIKE 子句可以基于現(xiàn)有表的定義去創(chuàng)建新表,并且可以擴(kuò)展或排除原始表中的某些部分。與 SQL 標(biāo)準(zhǔn)相反,LIKE 子句必須在 CREATE 語句中定義,并且是基于 CREATE 語句的更上層定義,這是因?yàn)?LIKE 子句可以用于定義表的多個(gè)部分,而不僅僅是 schema 部分。

可以使用該子句,重用(或改寫)指定的連接器配置屬性或者可以向外部表添加 watermark 定義,例如可以向 Apache Hive 中定義的表添加 watermark 定義。

示例如下:

CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3)
) WITH ( 
    'connector' = 'kafka',
    'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE Orders_with_watermark (
    -- 添加 watermark 定義
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    -- 改寫 startup-mode 屬性
    'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;

結(jié)果表 Orders_with_watermark 等效于使用以下語句創(chuàng)建的表:

CREATE TABLE Orders_with_watermark (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    'scan.startup.mode' = 'latest-offset'
);

表屬性的合并邏輯可以用 like options 來控制。

可以控制合并的表屬性如下:

  • CONSTRAINTS - 主鍵和唯一鍵約束
  • GENERATED - 計(jì)算列
  • OPTIONS - 連接器信息、格式化方式等配置項(xiàng)
  • PARTITIONS - 表分區(qū)信息
  • WATERMARKS - watermark 定義

并且有三種不同的表屬性合并策略:

  • INCLUDING - 新表包含源表(source table)所有的表屬性,如果和源表的表屬性重復(fù)則會(huì)直接失敗,例如新表和源表存在相同 key 的屬性
  • EXCLUDING - 新表不包含源表指定的任何表屬性
  • OVERWRITING - 新表包含源表的表屬性,但如果出現(xiàn)重復(fù)項(xiàng),則會(huì)用新表的表屬性覆蓋源表中的重復(fù)表屬性,例如,兩個(gè)表中都存在相同 key 的屬性,則會(huì)使用當(dāng)前語句中定義的 key 的屬性值

并且你可以使用 INCLUDING/EXCLUDING ALL 這種聲明方式來指定使用怎樣的合并策略,例如使用 EXCLUDING ALL INCLUDING WATERMARKS,那么代表只有源表的 WATERMARKS 屬性才會(huì)被包含進(jìn)新表。

示例如下:

-- 存儲(chǔ)在文件系統(tǒng)的源表
CREATE TABLE Orders_in_file (
    `user` BIGINT,
    product STRING,
    order_time_string STRING,
    order_time AS to_timestamp(order_time)
)
PARTITIONED BY (`user`) 
WITH ( 
    'connector' = 'filesystem',
    'path' = '...'
);

-- 對(duì)應(yīng)存儲(chǔ)在 kafka 的源表
CREATE TABLE Orders_in_kafka (
    -- 添加 watermark 定義
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    ...
)
LIKE Orders_in_file (
    -- 排除需要生成 watermark 的計(jì)算列之外的所有內(nèi)容。
    -- 去除不適用于 kafka 的所有分區(qū)和文件系統(tǒng)的相關(guān)屬性。
    EXCLUDING ALL
    INCLUDING GENERATED
);

如果未提供 like 配置項(xiàng)(like options),默認(rèn)將使用 INCLUDING ALL OVERWRITING OPTIONS 的合并策略。

您無法選擇物理列的合并策略,當(dāng)物理列進(jìn)行合并時(shí)就如使用了 INCLUDING 策略。

源表 source_table 可以是一個(gè)組合 ID。您可以指定不同 catalog 或者 DB 的表作為源表: 例如,my_catalog.my_db.MyTable 指定了源表 MyTable 來源于名為 MyCatalog 的 catalog 和名為 my_db 的 DB ,my_db.MyTable 指定了源表 MyTable 來源于當(dāng)前 catalog 和名為 my_db 的 DB。

7、AS select_statement

表也可以通過一個(gè) CTAS 語句中的查詢結(jié)果來創(chuàng)建并填充數(shù)據(jù),CTAS 是一種簡(jiǎn)單、快捷的創(chuàng)建表并插入數(shù)據(jù)的方法。

CTAS 有兩個(gè)部分,SELECT 部分可以是 Flink SQL 支持的任何 SELECT 查詢。 CREATE 部分從 SELECT 查詢中獲取列信息,并創(chuàng)建目標(biāo)表。 與 CREATE TABLE 類似,CTAS 要求必須在目標(biāo)表的 WITH 子句中指定必填的表屬性。

CTAS 的建表操作需要依賴目標(biāo) Catalog。比如,Hive Catalog 會(huì)自動(dòng)在 Hive 中創(chuàng)建物理表。但是基于內(nèi)存的 Catalog 只會(huì)將表的元信息注冊(cè)在執(zhí)行 SQL 的 Client 的內(nèi)存中。

示例如下:

CREATE TABLE my_ctas_table
WITH (
    'connector' = 'kafka',
    ...
)
AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;

結(jié)果表 my_ctas_table 等效于使用以下語句創(chuàng)建表并寫入數(shù)據(jù):

CREATE TABLE my_ctas_table (
    id BIGINT,
    name STRING,
    age INT
) WITH (
    'connector' = 'kafka',
    ...
);
 
INSERT INTO my_ctas_table SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;

注意 CTAS 有如下約束:

  • 暫不支持創(chuàng)建臨時(shí)表。
  • 暫不支持指定列信息。
  • 暫不支持指定 Watermark。
  • 暫不支持創(chuàng)建分區(qū)表。
  • 暫不支持主鍵約束。

目前,CTAS 創(chuàng)建的目標(biāo)表是非原子性的,如果在向表中插入數(shù)據(jù)時(shí)發(fā)生錯(cuò)誤,該表不會(huì)被自動(dòng)刪除。

三、CREATE CATALOG

CREATE CATALOG catalog_name
  WITH (key1=val1, key2=val2, ...)

使用給定的目錄屬性創(chuàng)建目錄。如果已存在同名目錄,則會(huì)引發(fā)異常。

用于存儲(chǔ)與此目錄相關(guān)的額外信息的目錄屬性。表達(dá)式 key1=val1 的鍵和值都應(yīng)該是字符串文本。

關(guān)于Catalogs,請(qǐng)參考Flink(二十四)Flink 的table api與sql之Catalogs

四、CREATE DATABASE

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
  [COMMENT database_comment]
  WITH (key1=val1, key2=val2, ...)

根據(jù)給定的表屬性創(chuàng)建數(shù)據(jù)庫。若數(shù)據(jù)庫中已存在同名表會(huì)拋出異常。

  • 1、IF NOT EXISTS

若數(shù)據(jù)庫已經(jīng)存在,則不會(huì)進(jìn)行任何操作。

  • 2、WITH OPTIONS

數(shù)據(jù)庫屬性一般用于存儲(chǔ)關(guān)于這個(gè)數(shù)據(jù)庫額外的信息。 表達(dá)式 key1=val1 中的鍵和值都需要是字符串文本常量。

五、CREATE VIEW

CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
  [{columnName [, columnName ]* }] [COMMENT view_comment]
  AS query_expression

根據(jù)給定的 query 語句創(chuàng)建一個(gè)視圖。若數(shù)據(jù)庫中已經(jīng)存在同名視圖會(huì)拋出異常.

  • 1、TEMPORARY

創(chuàng)建一個(gè)有 catalog 和數(shù)據(jù)庫命名空間的臨時(shí)視圖,并覆蓋原有的視圖。

  • 2、IF NOT EXISTS

若該視圖已經(jīng)存在,則不會(huì)進(jìn)行任何操作。

六、CREATE FUNCTION

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
  [IF NOT EXISTS] [[catalog_name.]db_name.]function_name
  AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
  [USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]

創(chuàng)建一個(gè)有 catalog 和數(shù)據(jù)庫命名空間的 catalog function ,需要指定一個(gè) identifier ,可指定 language tag 。 若 catalog 中,已經(jīng)有同名的函數(shù)注冊(cè)了,則無法注冊(cè)。

如果 language tag 是 JAVA 或者 SCALA ,則 identifier 是 UDF 實(shí)現(xiàn)類的全限定名。關(guān)于 JAVA/SCALA UDF 的實(shí)現(xiàn),請(qǐng)參考 Flink(二十五)Flink 的table api與sql之函數(shù)。

  • TEMPORARY

創(chuàng)建一個(gè)有 catalog 和數(shù)據(jù)庫命名空間的臨時(shí) catalog function ,并覆蓋原有的 catalog function 。

  • TEMPORARY SYSTEM

創(chuàng)建一個(gè)沒有數(shù)據(jù)庫命名空間的臨時(shí)系統(tǒng) catalog function ,并覆蓋系統(tǒng)內(nèi)置的函數(shù)。

  • IF NOT EXISTS

若該函數(shù)已經(jīng)存在,則不會(huì)進(jìn)行任何操作。

  • LANGUAGE JAVA|SCALA|PYTHON

Language tag 用于指定 Flink runtime 如何執(zhí)行這個(gè)函數(shù)。目前,只支持 JAVA, SCALA 和 PYTHON,且函數(shù)的默認(rèn)語言為 JAVA。

  • USING

指定包含該函數(shù)的實(shí)現(xiàn)及其依賴的 jar 資源列表。該 jar 應(yīng)該位于 Flink 當(dāng)前支持的本地或遠(yuǎn)程文件系統(tǒng) 中,比如 hdfs/s3/oss。

注意 目前只有 JAVA、SCALA 語言支持 USING 子句。

以上,介紹了Flink 的table api和sql中的DDL操作與示例。文章來源地址http://www.zghlxwxcb.cn/news/detail-660699.html

到了這里,關(guān)于22、Flink 的table api與sql之創(chuàng)建表的DDL的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月24日
    瀏覽(19)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(14)- 時(shí)態(tài)表的join(java版本)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月02日
    瀏覽(20)
  • Flink流批一體計(jì)算(13):PyFlink Tabel API之SQL DDL

    1. TableEnvironment 創(chuàng)建 TableEnvironment TableEnvironment 是 Table API 和 SQL 集成的核心概念。 TableEnvironment 可以用來: ·創(chuàng)建 Table ·將 Table 注冊(cè)成臨時(shí)表 ·執(zhí)行 SQL 查詢 ·注冊(cè)用戶自定義的 (標(biāo)量,表值,或者聚合) 函數(shù) ·配置作業(yè) ·管理 Python 依賴 ·提交作業(yè)執(zhí)行 創(chuàng)建 source 表 創(chuàng)建 sink

    2024年02月12日
    瀏覽(23)
  • Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

    Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

    ? ? ? ?今天一天爭(zhēng)取搞完最后這一部分,學(xué)完趕緊把 Kafka 和 Flume 學(xué)完,就要開始做實(shí)時(shí)數(shù)倉了。據(jù)說是應(yīng)屆生得把實(shí)時(shí)數(shù)倉搞個(gè) 80%~90% 才能差不多找個(gè)工作,太牛馬了。 ????????之前我們已經(jīng)用過了一些簡(jiǎn)單的內(nèi)置連接器,比如 \\\'datagen\\\' 、\\\'print\\\' ,其它的可以查看官網(wǎng):

    2024年01月24日
    瀏覽(52)
  • 《十堂課學(xué)習(xí) Flink》第五章:Table API 以及 Flink SQL 入門

    《十堂課學(xué)習(xí) Flink》第五章:Table API 以及 Flink SQL 入門

    第四章中介紹了 DataStream API 以及 DataSet API 的入門案例,本章開始介紹 Table API 以及基于此的高層應(yīng)用 Flink SQL 的基礎(chǔ)。 Flink 提供了兩個(gè)關(guān)系A(chǔ)PI——Table API 和 SQL——用于統(tǒng)一的流和批處理。Table API 是一種針對(duì)Java、Scala和Python的語言集成查詢API,它允許以非常直觀的方式組合來

    2024年02月03日
    瀏覽(48)
  • 【Flink SQL】Flink SQL 基礎(chǔ)概念(一):SQL & Table 運(yùn)行環(huán)境、基本概念及常用 API

    《 Flink SQL 基礎(chǔ)概念 》系列,共包含以下 5 篇文章: Flink SQL 基礎(chǔ)概念(一):SQL Table 運(yùn)行環(huán)境、基本概念及常用 API Flink SQL 基礎(chǔ)概念(二):數(shù)據(jù)類型 Flink SQL 基礎(chǔ)概念(三):SQL 動(dòng)態(tài)表 連續(xù)查詢 Flink SQL 基礎(chǔ)概念(四):SQL 的時(shí)間屬性 Flink SQL 基礎(chǔ)概念(五):SQL 時(shí)區(qū)問

    2024年03月21日
    瀏覽(99)
  • Flink Table API 與 SQL 編程整理

    Flink Table API 與 SQL 編程整理

    Flink API 總共分為 4 層這里主要整理 Table API 的使用 Table API 是流處理和批處理通用的關(guān)系型 API , Table API 可以基于流輸入或者批輸入來運(yùn)行而不需要進(jìn)行任何修改。 Table API 是 SQL 語言的超集并專門為 Apache Flink 設(shè)計(jì)的, Table API 是 Scala 和 Java 語言集成式的 API 。與常規(guī) SQL 語言

    2024年02月04日
    瀏覽(25)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(內(nèi)聯(lián)接、外聯(lián)接以及聯(lián)接自定義函數(shù)等)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月19日
    瀏覽(26)
  • Flink(十三)Flink 的table api與sql的基本概念、通用api介紹及入門示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月15日
    瀏覽(23)
  • Flink Table API/SQL 多分支sink

    在某個(gè)場(chǎng)景中,需要從Kafka中獲取數(shù)據(jù),經(jīng)過轉(zhuǎn)換處理后,需要同時(shí)sink到多個(gè)輸出源中(kafka、mysql、hologres)等。兩次調(diào)用execute, 阿里云Flink vvr引擎報(bào)錯(cuò): 使用 StreamStatementSet. 具體參考官網(wǎng): https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/data_stream_api/#converting-betwe

    2024年02月11日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包