Flink 系列文章
一、Flink 專欄
Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。
-
1、Flink 部署系列
本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 -
2、Flink基礎(chǔ)系列
本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 -
3、Flik Table API和SQL基礎(chǔ)系列
本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫、表用法、查詢、窗口函數(shù)、catalog等等內(nèi)容。 -
4、Flik Table API和SQL提高與應(yīng)用系列
本部分是table api 和sql的應(yīng)用部分,和實(shí)際的生產(chǎn)應(yīng)用聯(lián)系更為密切,以及有一定開發(fā)難度的內(nèi)容。 -
5、Flink 監(jiān)控系列
本部分和實(shí)際的運(yùn)維、監(jiān)控工作相關(guān)。
二、Flink 示例專欄
Flink 示例專欄是 Flink 專欄的輔助說明,一般不會(huì)介紹知識(shí)點(diǎn)的信息,更多的是提供一個(gè)一個(gè)可以具體使用的示例。本專欄不再分目錄,通過鏈接即可看出介紹的內(nèi)容。
兩專欄的所有文章入口點(diǎn)擊:Flink 系列文章匯總索引
本文介紹了Flink 的table api和sql中的DDL操作與示例。
本文比較簡(jiǎn)單,僅僅是介紹Flink 的DDL。
一、DDL概述
CREATE 語句用于向當(dāng)前或指定的 Catalog 中注冊(cè)表、視圖或函數(shù)。注冊(cè)后的表、視圖和函數(shù)可以在 SQL 查詢中使用。
目前 Flink SQL 支持下列 CREATE 語句:
- CREATE TABLE
- CREATE CATALOG
- CREATE DATABASE
- CREATE VIEW
- CREATE FUNCTION
二、執(zhí)行 CREATE 語句
可以使用 TableEnvironment 中的 executeSql() 方法執(zhí)行 CREATE 語句。 若 CREATE 操作執(zhí)行成功,executeSql() 方法返回 ‘OK’,否則會(huì)拋出異常。
1、java
以下的例子展示了如何在 TableEnvironment 中執(zhí)行一個(gè) CREATE 語句。
EnvironmentSettings settings = EnvironmentSettings.newInstance()...
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 對(duì)已注冊(cè)的表進(jìn)行 SQL 查詢
// 注冊(cè)名為 “Orders” 的表
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
// 在表上執(zhí)行 SQL 查詢,并把得到的結(jié)果作為一個(gè)新的表
Table result = tableEnv.sqlQuery(
"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
// 對(duì)已注冊(cè)的表進(jìn)行 INSERT 操作
// 注冊(cè) TableSink
tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");
// 在表上執(zhí)行 INSERT 語句并向 TableSink 發(fā)出結(jié)果
tableEnv.executeSql(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
2、SQL Cli
Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
[INFO] Table has been created.
Flink SQL> CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...);
[INFO] Table has been created.
Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
[INFO] Submitting SQL update statement to the cluster...
三、CREATE TABLE語法
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( <like_options> )] | AS select_query ]
<physical_column_definition>:
column_name column_type [ <column_constraint> ] [COMMENT column_comment]
<column_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<metadata_column_definition>:
column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
<watermark_definition>:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
<source_table>:
[catalog_name.][db_name.]table_name
<like_options>:
{
{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]
根據(jù)指定的表名創(chuàng)建一個(gè)表,如果同名表已經(jīng)在 catalog 中存在了,則無法注冊(cè)。
1、Columns
1、Physical / Regular Columns
物理列是數(shù)據(jù)庫中已知的常規(guī)列。它們定義物理數(shù)據(jù)中字段的名稱、類型和順序。因此,物理列表示從外部系統(tǒng)讀取和寫入的有效負(fù)載。連接器和格式使用這些列(按定義的順序)來配置自身??梢栽谖锢砹兄g聲明其他類型的列,但不會(huì)影響最終的physical schema。
以下語句創(chuàng)建一個(gè)僅包含常規(guī)列的表:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING
) WITH (
...
);
2、Metadata Columns
元數(shù)據(jù)列是SQL標(biāo)準(zhǔn)的擴(kuò)展,允許訪問連接器和/或格式化表的每一行的特定字段。元數(shù)據(jù)列由元數(shù)據(jù)關(guān)鍵字指示。例如,元數(shù)據(jù)列可用于從 Kafka 記錄讀取和寫入時(shí)間戳,以便進(jìn)行基于時(shí)間的操作。連接器和格式文檔列出了每個(gè)組件的可用元數(shù)據(jù)字段。但是,在table’s schema中聲明元數(shù)據(jù)列是可選的。
以下語句創(chuàng)建一個(gè)表,其中包含引用元數(shù)據(jù)字段時(shí)間戳的附加元數(shù)據(jù)列:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' -- reads and writes a Kafka record's timestamp
) WITH (
'connector' = 'kafka'
...
);
每個(gè)元數(shù)據(jù)字段都由基于字符串的鍵標(biāo)識(shí),并具有記錄的數(shù)據(jù)類型。例如,Kafka 連接器公開一個(gè)元數(shù)據(jù)字段,其中包含可用于讀取和寫入記錄的鍵時(shí)間戳和數(shù)據(jù)類型 TIMESTAMP_LTZ(3)。
在上面的示例中,元數(shù)據(jù)列record_time成為table’s schema的一部分,并且可以像常規(guī)列一樣進(jìn)行轉(zhuǎn)換和存儲(chǔ):
INSERT INTO MyTable SELECT user_id, name, record_time + INTERVAL '1' SECOND FROM MyTable;
為方便起見,如果列名應(yīng)用作標(biāo)識(shí)元數(shù)據(jù)鍵,則可以省略 FROM 子句:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`timestamp` TIMESTAMP_LTZ(3) METADATA -- use column name as metadata key
) WITH (
'connector' = 'kafka'
...
);
為方便起見,如果列的數(shù)據(jù)類型與元數(shù)據(jù)字段的數(shù)據(jù)類型不同,運(yùn)行時(shí)將執(zhí)行顯式強(qiáng)制轉(zhuǎn)換。當(dāng)然,這要求兩種數(shù)據(jù)類型兼容。
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`timestamp` BIGINT METADATA -- cast the timestamp as BIGINT
) WITH (
'connector' = 'kafka'
...
);
默認(rèn)情況下,計(jì)劃器假定元數(shù)據(jù)列可用于讀取和寫入。但是,在許多情況下,外部系統(tǒng)提供的只讀元數(shù)據(jù)字段多于可寫字段。因此,可以使用 VIRTUAL 關(guān)鍵字從持久保留中排除元數(shù)據(jù)列。
CREATE TABLE MyTable (
`timestamp` BIGINT METADATA, -- part of the query-to-sink schema
`offset` BIGINT METADATA VIRTUAL, -- not part of the query-to-sink schema
`user_id` BIGINT,
`name` STRING,
) WITH (
'connector' = 'kafka'
...
);
在上面的示例中,偏移量是只讀元數(shù)據(jù)列, 在query-to-sink schema中排除。因此,source-to-query schema(對(duì)于 SELECT)和query-to-sink (對(duì)于INSERT INTO)架構(gòu)不同:
- source-to-query schema:
MyTable(`timestamp` BIGINT, `offset` BIGINT, `user_id` BIGINT, `name` STRING)
- query-to-sink schema:
MyTable(`timestamp` BIGINT, `user_id` BIGINT, `name` STRING)
3、Computed Columns
計(jì)算列是使用 AS computed_column_expression語法column_name生成的虛擬列。
計(jì)算列計(jì)算可引用同一表中聲明的其他列的表達(dá)式??梢栽L問物理列和元數(shù)據(jù)列。列本身不以物理方式存儲(chǔ)在表中。列的數(shù)據(jù)類型是從給定表達(dá)式自動(dòng)派生的,不必手動(dòng)聲明。
計(jì)劃器將在源之后將計(jì)算列轉(zhuǎn)換為常規(guī)投影。對(duì)于優(yōu)化或水印策略下推,評(píng)估可能會(huì)分布在運(yùn)算符之間、多次執(zhí)行或在給定查詢不需要時(shí)跳過。
例如,計(jì)算列可以定義為:
CREATE TABLE MyTable (
`user_id` BIGINT,
`price` DOUBLE,
`quantity` DOUBLE,
`cost` AS price * quanitity, -- evaluate expression and supply the result to queries
) WITH (
'connector' = 'kafka'
...
);
表達(dá)式可以包含列、常量或函數(shù)的任意組合。表達(dá)式不能包含子查詢。
計(jì)算列在 Flink 中通常用于定義 CREATE TABLE 語句中的時(shí)間屬性。
處理時(shí)間屬性可以通過 proc AS PROCTIME() 使用系統(tǒng)的 PROCTIME() 函數(shù)輕松定義。
可以在 WATERMARK 聲明之前預(yù)處理事件時(shí)間屬性時(shí)間戳。例如,如果原始字段不是 TIMESTAMP(3) 類型或嵌套在 JSON 字符串中,則可以使用計(jì)算列。
與虛擬元數(shù)據(jù)列類似,計(jì)算列從持久化中排除。因此,計(jì)算列不能是 INSERT INTO 語句的目標(biāo)。因此,source-to-query schema(對(duì)于 SELECT)和query-to-sink (for INSERT INTO) schema不同:
- source-to-query schema:
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, `cost` DOUBLE)
- query-to-sink schema:
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE)
2、WATERMARK
WATERMARK 定義了表的事件時(shí)間屬性,其形式為 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 。
rowtime_column_name 把一個(gè)現(xiàn)有的列定義為一個(gè)為表標(biāo)記事件時(shí)間的屬性。該列的類型必須為 TIMESTAMP(3),且是 schema 中的頂層列,它也可以是一個(gè)計(jì)算列。
watermark_strategy_expression 定義了 watermark 的生成策略。它允許使用包括計(jì)算列在內(nèi)的任意非查詢表達(dá)式來計(jì)算 watermark ;表達(dá)式的返回類型必須是 TIMESTAMP(3),表示了從 Epoch 以來的經(jīng)過的時(shí)間。 返回的 watermark 只有當(dāng)其不為空且其值大于之前發(fā)出的本地 watermark 時(shí)才會(huì)被發(fā)出(以保證 watermark 遞增)。每條記錄的 watermark 生成表達(dá)式計(jì)算都會(huì)由框架完成。 框架會(huì)定期發(fā)出所生成的最大的 watermark ,如果當(dāng)前 watermark 仍然與前一個(gè) watermark 相同、為空、或返回的 watermark 的值小于最后一個(gè)發(fā)出的 watermark ,則新的 watermark 不會(huì)被發(fā)出。 Watermark 根據(jù) pipeline.auto-watermark-interval 中所配置的間隔發(fā)出。 若 watermark 的間隔是 0ms ,那么每條記錄都會(huì)產(chǎn)生一個(gè) watermark,且 watermark 會(huì)在不為空并大于上一個(gè)發(fā)出的 watermark 時(shí)發(fā)出。
使用事件時(shí)間語義時(shí),表必須包含事件時(shí)間屬性和 watermark 策略。
Flink 提供了三種常用的 watermark 策略。
-
嚴(yán)格遞增時(shí)間戳: WATERMARK FOR rowtime_column AS rowtime_column。
發(fā)出到目前為止已觀察到的最大時(shí)間戳的 watermark ,時(shí)間戳大于最大時(shí)間戳的行被認(rèn)為沒有遲到。 -
遞增時(shí)間戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
發(fā)出到目前為止已觀察到的最大時(shí)間戳減 1 的 watermark ,時(shí)間戳大于或等于最大時(shí)間戳的行被認(rèn)為沒有遲到。 -
有界亂序時(shí)間戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。
發(fā)出到目前為止已觀察到的最大時(shí)間戳減去指定延遲的 watermark ,例如, WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘5’ SECOND 是一個(gè) 5 秒延遲的 watermark 策略。
CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );
3、PRIMARY KEY
主鍵用作 Flink 優(yōu)化的一種提示信息。主鍵限制表明一張表或視圖的某個(gè)(些)列是唯一的并且不包含 Null 值。 主鍵聲明的列都是非 nullable 的。因此主鍵可以被用作表行級(jí)別的唯一標(biāo)識(shí)。
主鍵可以和列的定義一起聲明,也可以獨(dú)立聲明為表的限制屬性,不管是哪種方式,主鍵都不可以重復(fù)定義,否則 Flink 會(huì)報(bào)錯(cuò)。
SQL 標(biāo)準(zhǔn)主鍵限制可以有兩種模式:ENFORCED 或者 NOT ENFORCED。 它申明了是否輸入/出數(shù)據(jù)會(huì)做合法性檢查(是否唯一)。Flink 不存儲(chǔ)數(shù)據(jù)因此只支持 NOT ENFORCED 模式,即不做檢查,用戶需要自己保證唯一性。
Flink 假設(shè)聲明了主鍵的列都是不包含 Null 值的,Connector 在處理數(shù)據(jù)時(shí)需要自己保證語義正確。
在 CREATE TABLE 語句中,創(chuàng)建主鍵會(huì)修改列的 nullable 屬性,主鍵聲明的列默認(rèn)都是非 Nullable 的。
4、PARTITIONED BY
根據(jù)指定的列對(duì)已經(jīng)創(chuàng)建的表進(jìn)行分區(qū)。若表使用 filesystem sink ,則將會(huì)為每個(gè)分區(qū)創(chuàng)建一個(gè)目錄。
5、WITH Options
表屬性用于創(chuàng)建 table source/sink ,一般用于尋找和創(chuàng)建底層的連接器。
表達(dá)式 key1=val1 的鍵和值必須為字符串文本常量。請(qǐng)參考 連接外部系統(tǒng)Flink(十六)Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式
表名可以為以下三種格式
- catalog_name.db_name.table_name ,使用catalog_name.db_name.table_name 的表將會(huì)與名為 “catalog_name” 的 catalog 和名為 “db_name” 的數(shù)據(jù)庫一起注冊(cè)到 metastore 中
- db_name.table_name ,使用 db_name.table_name 的表將會(huì)被注冊(cè)到當(dāng)前執(zhí)行的 table environment 中的 catalog 且數(shù)據(jù)庫會(huì)被命名為 “db_name”
- table_name,對(duì)于 table_name, 數(shù)據(jù)表將會(huì)被注冊(cè)到當(dāng)前正在運(yùn)行的catalog和數(shù)據(jù)庫中
使用 CREATE TABLE 語句注冊(cè)的表均可用作 table source 和 table sink。 在被 DML 語句引用前,我們無法決定其實(shí)際用于 source 抑或是 sink。
6、LIKE
LIKE 子句來源于兩種 SQL 特性的變體/組合(Feature T171,“表定義中的 LIKE 語法” 和 Feature T173,“表定義中的 LIKE 語法擴(kuò)展”)。LIKE 子句可以基于現(xiàn)有表的定義去創(chuàng)建新表,并且可以擴(kuò)展或排除原始表中的某些部分。與 SQL 標(biāo)準(zhǔn)相反,LIKE 子句必須在 CREATE 語句中定義,并且是基于 CREATE 語句的更上層定義,這是因?yàn)?LIKE 子句可以用于定義表的多個(gè)部分,而不僅僅是 schema 部分。
可以使用該子句,重用(或改寫)指定的連接器配置屬性或者可以向外部表添加 watermark 定義,例如可以向 Apache Hive 中定義的表添加 watermark 定義。
示例如下:
CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
);
CREATE TABLE Orders_with_watermark (
-- 添加 watermark 定義
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- 改寫 startup-mode 屬性
'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;
結(jié)果表 Orders_with_watermark 等效于使用以下語句創(chuàng)建的表:
CREATE TABLE Orders_with_watermark (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'latest-offset'
);
表屬性的合并邏輯可以用 like options 來控制。
可以控制合并的表屬性如下:
- CONSTRAINTS - 主鍵和唯一鍵約束
- GENERATED - 計(jì)算列
- OPTIONS - 連接器信息、格式化方式等配置項(xiàng)
- PARTITIONS - 表分區(qū)信息
- WATERMARKS - watermark 定義
并且有三種不同的表屬性合并策略:
- INCLUDING - 新表包含源表(source table)所有的表屬性,如果和源表的表屬性重復(fù)則會(huì)直接失敗,例如新表和源表存在相同 key 的屬性
- EXCLUDING - 新表不包含源表指定的任何表屬性
- OVERWRITING - 新表包含源表的表屬性,但如果出現(xiàn)重復(fù)項(xiàng),則會(huì)用新表的表屬性覆蓋源表中的重復(fù)表屬性,例如,兩個(gè)表中都存在相同 key 的屬性,則會(huì)使用當(dāng)前語句中定義的 key 的屬性值
并且你可以使用 INCLUDING/EXCLUDING ALL 這種聲明方式來指定使用怎樣的合并策略,例如使用 EXCLUDING ALL INCLUDING WATERMARKS,那么代表只有源表的 WATERMARKS 屬性才會(huì)被包含進(jìn)新表。
示例如下:
-- 存儲(chǔ)在文件系統(tǒng)的源表
CREATE TABLE Orders_in_file (
`user` BIGINT,
product STRING,
order_time_string STRING,
order_time AS to_timestamp(order_time)
)
PARTITIONED BY (`user`)
WITH (
'connector' = 'filesystem',
'path' = '...'
);
-- 對(duì)應(yīng)存儲(chǔ)在 kafka 的源表
CREATE TABLE Orders_in_kafka (
-- 添加 watermark 定義
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
...
)
LIKE Orders_in_file (
-- 排除需要生成 watermark 的計(jì)算列之外的所有內(nèi)容。
-- 去除不適用于 kafka 的所有分區(qū)和文件系統(tǒng)的相關(guān)屬性。
EXCLUDING ALL
INCLUDING GENERATED
);
如果未提供 like 配置項(xiàng)(like options),默認(rèn)將使用 INCLUDING ALL OVERWRITING OPTIONS 的合并策略。
您無法選擇物理列的合并策略,當(dāng)物理列進(jìn)行合并時(shí)就如使用了 INCLUDING 策略。
源表 source_table 可以是一個(gè)組合 ID。您可以指定不同 catalog 或者 DB 的表作為源表: 例如,my_catalog.my_db.MyTable 指定了源表 MyTable 來源于名為 MyCatalog 的 catalog 和名為 my_db 的 DB ,my_db.MyTable 指定了源表 MyTable 來源于當(dāng)前 catalog 和名為 my_db 的 DB。
7、AS select_statement
表也可以通過一個(gè) CTAS 語句中的查詢結(jié)果來創(chuàng)建并填充數(shù)據(jù),CTAS 是一種簡(jiǎn)單、快捷的創(chuàng)建表并插入數(shù)據(jù)的方法。
CTAS 有兩個(gè)部分,SELECT 部分可以是 Flink SQL 支持的任何 SELECT 查詢。 CREATE 部分從 SELECT 查詢中獲取列信息,并創(chuàng)建目標(biāo)表。 與 CREATE TABLE 類似,CTAS 要求必須在目標(biāo)表的 WITH 子句中指定必填的表屬性。
CTAS 的建表操作需要依賴目標(biāo) Catalog。比如,Hive Catalog 會(huì)自動(dòng)在 Hive 中創(chuàng)建物理表。但是基于內(nèi)存的 Catalog 只會(huì)將表的元信息注冊(cè)在執(zhí)行 SQL 的 Client 的內(nèi)存中。
示例如下:
CREATE TABLE my_ctas_table
WITH (
'connector' = 'kafka',
...
)
AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;
結(jié)果表 my_ctas_table 等效于使用以下語句創(chuàng)建表并寫入數(shù)據(jù):
CREATE TABLE my_ctas_table (
id BIGINT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
...
);
INSERT INTO my_ctas_table SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;
注意 CTAS 有如下約束:
- 暫不支持創(chuàng)建臨時(shí)表。
- 暫不支持指定列信息。
- 暫不支持指定 Watermark。
- 暫不支持創(chuàng)建分區(qū)表。
- 暫不支持主鍵約束。
目前,CTAS 創(chuàng)建的目標(biāo)表是非原子性的,如果在向表中插入數(shù)據(jù)時(shí)發(fā)生錯(cuò)誤,該表不會(huì)被自動(dòng)刪除。
三、CREATE CATALOG
CREATE CATALOG catalog_name
WITH (key1=val1, key2=val2, ...)
使用給定的目錄屬性創(chuàng)建目錄。如果已存在同名目錄,則會(huì)引發(fā)異常。
用于存儲(chǔ)與此目錄相關(guān)的額外信息的目錄屬性。表達(dá)式 key1=val1 的鍵和值都應(yīng)該是字符串文本。
關(guān)于Catalogs,請(qǐng)參考Flink(二十四)Flink 的table api與sql之Catalogs
四、CREATE DATABASE
CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
[COMMENT database_comment]
WITH (key1=val1, key2=val2, ...)
根據(jù)給定的表屬性創(chuàng)建數(shù)據(jù)庫。若數(shù)據(jù)庫中已存在同名表會(huì)拋出異常。
- 1、IF NOT EXISTS
若數(shù)據(jù)庫已經(jīng)存在,則不會(huì)進(jìn)行任何操作。
- 2、WITH OPTIONS
數(shù)據(jù)庫屬性一般用于存儲(chǔ)關(guān)于這個(gè)數(shù)據(jù)庫額外的信息。 表達(dá)式 key1=val1 中的鍵和值都需要是字符串文本常量。
五、CREATE VIEW
CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
[{columnName [, columnName ]* }] [COMMENT view_comment]
AS query_expression
根據(jù)給定的 query 語句創(chuàng)建一個(gè)視圖。若數(shù)據(jù)庫中已經(jīng)存在同名視圖會(huì)拋出異常.
- 1、TEMPORARY
創(chuàng)建一個(gè)有 catalog 和數(shù)據(jù)庫命名空間的臨時(shí)視圖,并覆蓋原有的視圖。
- 2、IF NOT EXISTS
若該視圖已經(jīng)存在,則不會(huì)進(jìn)行任何操作。
六、CREATE FUNCTION
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
[IF NOT EXISTS] [[catalog_name.]db_name.]function_name
AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
[USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]
創(chuàng)建一個(gè)有 catalog 和數(shù)據(jù)庫命名空間的 catalog function ,需要指定一個(gè) identifier ,可指定 language tag 。 若 catalog 中,已經(jīng)有同名的函數(shù)注冊(cè)了,則無法注冊(cè)。
如果 language tag 是 JAVA 或者 SCALA ,則 identifier 是 UDF 實(shí)現(xiàn)類的全限定名。關(guān)于 JAVA/SCALA UDF 的實(shí)現(xiàn),請(qǐng)參考 Flink(二十五)Flink 的table api與sql之函數(shù)。
- TEMPORARY
創(chuàng)建一個(gè)有 catalog 和數(shù)據(jù)庫命名空間的臨時(shí) catalog function ,并覆蓋原有的 catalog function 。
- TEMPORARY SYSTEM
創(chuàng)建一個(gè)沒有數(shù)據(jù)庫命名空間的臨時(shí)系統(tǒng) catalog function ,并覆蓋系統(tǒng)內(nèi)置的函數(shù)。
- IF NOT EXISTS
若該函數(shù)已經(jīng)存在,則不會(huì)進(jìn)行任何操作。
- LANGUAGE JAVA|SCALA|PYTHON
Language tag 用于指定 Flink runtime 如何執(zhí)行這個(gè)函數(shù)。目前,只支持 JAVA, SCALA 和 PYTHON,且函數(shù)的默認(rèn)語言為 JAVA。
- USING
指定包含該函數(shù)的實(shí)現(xiàn)及其依賴的 jar 資源列表。該 jar 應(yīng)該位于 Flink 當(dāng)前支持的本地或遠(yuǎn)程文件系統(tǒng) 中,比如 hdfs/s3/oss。
注意 目前只有 JAVA、SCALA 語言支持 USING 子句。文章來源:http://www.zghlxwxcb.cn/news/detail-660699.html
以上,介紹了Flink 的table api和sql中的DDL操作與示例。文章來源地址http://www.zghlxwxcb.cn/news/detail-660699.html
到了這里,關(guān)于22、Flink 的table api與sql之創(chuàng)建表的DDL的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!