CREATE 語句用于向當前或指定的 Catalog 中注冊庫、表、視圖或函數(shù)。注冊后的庫、表、視圖和函數(shù)可以在 SQL 查詢中使用。
目前 Flink SQL 支持下列 CREATE 語句:
CREATE TABLE
CREATE DATABASE
CREATE VIEW
CREATE FUNCTION
1.建表語句
下面的 SQL 語句就是建表語句的定義,根據(jù)指定的表名創(chuàng)建一個表,如果同名表已經(jīng)在 Catalog 中存在了,則無法注冊。
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( <like_options> )] ]
<physical_column_definition>:
column_name column_type [ <column_constraint> ] [COMMENT column_comment]
<column_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<metadata_column_definition>:
column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
<watermark_definition>:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
<source_table>:
[catalog_name.][db_name.]table_name
<like_options>:
{
{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]
2.表中的列
2.1 常規(guī)列(物理列)
物理列 是數(shù)據(jù)庫中所說的 常規(guī)列。其定義了物理介質中存儲的數(shù)據(jù)中字段的名稱、類型和順序。其他類型的列可以在物理列之間聲明,但不會影響最終的物理列的讀取。
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING
) WITH (
...
);
2.2 元數(shù)據(jù)列
元數(shù)據(jù)列 是 SQL 標準的擴展,允許訪問數(shù)據(jù)源本身具有的一些元數(shù)據(jù)。元數(shù)據(jù)列由 METADATA
關鍵字標識。
例如,我們可以使用元數(shù)據(jù)列從 Kafka 數(shù)據(jù)中讀取 Kafka 數(shù)據(jù)自帶的時間戳(這個時間戳不是數(shù)據(jù)中的某個時間戳字段,而是數(shù)據(jù)寫入 Kafka 時,Kafka 引擎給這條數(shù)據(jù)打上的時間戳標記),然后我們可以在 Flink SQL 中使用這個時間戳,比如進行基于時間的窗口操作。
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 讀取 kafka 本身自帶的時間戳
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka'
...
);
元數(shù)據(jù)列可以用于后續(xù)數(shù)據(jù)的處理,或者寫入到目標表中。
INSERT INTO MyTable
SELECT
user_id
, name
, record_time + INTERVAL '1' SECOND
FROM MyTable;
如果自定義的列名稱和 Connector 中定義 metadata
字段的名稱一樣的話,FROM xxx
子句是可以被省略的。
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 讀取 kafka 本身自帶的時間戳
`timestamp` TIMESTAMP_LTZ(3) METADATA
) WITH (
'connector' = 'kafka'
...
);
關于 Flink SQL 的每種 Connector 都提供了哪些 metadata
字段,詳細可見 官網(wǎng)文檔。
如果自定義列的數(shù)據(jù)類型和 Connector 中定義的 metadata
字段的數(shù)據(jù)類型不一致的話,程序運行時會自動 cast
強轉。但是這要求兩種數(shù)據(jù)類型是可以強轉的。
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 將時間戳強轉為 BIGINT
`timestamp` BIGINT METADATA
) WITH (
'connector' = 'kafka'
...
);
默認情況下,F(xiàn)link SQL Planner 認為 metadata
列是可以 讀取 也可以 寫入 的。但是有些外部存儲系統(tǒng)的元數(shù)據(jù)信息是只能用于讀取,不能寫入的。
那么在往一個表寫入的場景下,我們就可以使用 VIRTUAL
關鍵字來標識某個元數(shù)據(jù)列不寫入到外部存儲中(不持久化)。
CREATE TABLE MyTable (
-- sink 時會寫入
`timestamp` BIGINT METADATA,
-- sink 時不寫入
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`name` STRING,
) WITH (
'connector' = 'kafka'
...
);
在上面這個案例中,Kafka 引擎的 offset
是只讀的。所以我們在把 MyTable
作為數(shù)據(jù)源(輸入)表時,Schema 中是包含 offset
的。在把 MyTable
作為數(shù)據(jù)匯(輸出)表時,Schema 中是不包含 offset
的。如下:
所以這里在寫入時需要注意,不要在 SQL 的 INSERT INTO
語句中寫入 offset
列,否則 Flink SQL 任務會直接報錯。
2.3 計算列
計算列 其實就是在寫建表的 DDL 時,可以拿已有的一些列經(jīng)過一些自定義的運算生成的新列。這些列本身是沒有以物理形式存儲到數(shù)據(jù)源中的。
CREATE TABLE MyTable (
`user_id` BIGINT,
`price` DOUBLE,
`quantity` DOUBLE,
-- cost 就是使用 price 和 quanitity 生成的計算列,計算方式為 price * quanitity
`cost` AS price * quanitity,
) WITH (
'connector' = 'kafka'
...
);
計算列可以包含其他列、常量或者函數(shù),但是不能寫一個子查詢進去。
小伙伴萌這時會問到一個問題,既然只能包含列、常量或者函數(shù)計算,我就直接在 DML Query 代碼中寫就完事了唄,為啥還要專門在 DDL 中定義呢?
沒錯,如果只是簡單的四則運算的話直接寫在 DML 中就可以,但是 計算列一般是用于定義時間屬性的(因為在 SQL 任務中時間屬性只能在 DDL 中定義,不能在 DML 語句中定義)。比如要把輸入數(shù)據(jù)的時間格式標準化。處理時間、事件時間分別舉例如下:
- 處理時間:使用
PROCTIME()
函數(shù)來定義處理時間列。 - 事件時間:事件時間的時間戳可以在聲明 Watermark 之前進行預處理。比如,如果字段不是
TIMESTAMP(3)
類型或者時間戳是嵌套在 JSON 字符串中的,則可以使用計算列進行預處理。
?注意:和虛擬
metadata
列是類似的,計算列也是只能讀不能寫的。
也就是說,我們在把 MyTable 作為數(shù)據(jù)源(輸入)表時,Schema 中是包含 cost
的。
在把 MyTable 作為數(shù)據(jù)匯(輸出)表時,Schema 中是不包含 cost
的。
-- 當做數(shù)據(jù)源(輸入)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, `cost` DOUBLE)
-- 當做數(shù)據(jù)匯(輸出)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE)
3.定義 Watermark
Watermark 是在 Create Table 中進行定義的。具體 SQL 語法標準是:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
-
rowtime_column_name
:表的事件時間屬性字段。該列必須是TIMESTAMP(3)
、TIMESTAMP_LTZ(3)
類,這個時間可以是一個計算列。 -
watermark_strategy_expression
:定義 Watermark 的生成策略。Watermark 的一般都是由rowtime_column_name
列減掉一段固定時間間隔。SQL 中 Watermark 的生產(chǎn)策略是:當前 Watermark 大于上次發(fā)出的 Watermark 時發(fā)出當前 Watermark。
注意:
- 如果你使用的是事件時間語義,那么必須要設設置事件時間屬性和 WATERMARK 生成策略。
- Watermark 的發(fā)出頻率:Watermark 發(fā)出一般是間隔一定時間的,Watermark 的發(fā)出間隔時間可以由
pipeline.auto-watermark-interval
進行配置,如果設置為 200ms 則每 200ms 會計算一次 Watermark,如果比之前發(fā)出的 Watermark 大,則發(fā)出。如果間隔設為 0ms,則 Watermark 只要滿足觸發(fā)條件就會發(fā)出,不會受到間隔時間控制。
Flink SQL 提供了幾種 WATERMARK 生產(chǎn)策略:
-
有界無序:設置方式為
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit
。此類策略就可以用于設置最大亂序時間,假如設置為WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND
,則生成的是運行 5s 延遲的 Watermark。一般都用這種 Watermark 生成策略,此類 Watermark 生成策略通常用于有數(shù)據(jù)亂序的場景中,而對應到實際的場景中,數(shù)據(jù)都是會存在亂序的,所以基本都使用此類策略。 -
嚴格升序:設置方式為
WATERMARK FOR rowtime_column AS rowtime_column
。一般基本不用這種方式。如果你能保證你的數(shù)據(jù)源的時間戳是嚴格升序的,那就可以使用這種方式。嚴格升序代表 Flink 任務認為時間戳只會越來越大,也不存在相等的情況,只要相等或者小于之前的,就認為是遲到的數(shù)據(jù)。 -
遞增:設置方式為
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
。一般基本不用這種方式。如果設置此類,則允許有相同的時間戳出現(xiàn)。
4.Create Table With 子句
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
可以看到 DDL 中 With 子句就是在建表時,描述數(shù)據(jù)源、數(shù)據(jù)匯的具體外部存儲的元數(shù)據(jù)信息的。
一般 With 中的配置項由 Flink SQL 的 Connector(鏈接外部存儲的連接器) 來定義,每種 Connector 提供的 With 配置項都是不同的。
注意:
- Flink SQL 中 Connector 其實就是 Flink 用于鏈接外部數(shù)據(jù)源的接口。舉一個類似的例子,在 Java 中想連接到 MySQL,需要使用
mysql-connector-java
包提供的 Java API 去鏈接。映射到 Flink SQL 中,在 Flink SQL 中要連接到 Kafka,需要使用kafka connector
。 - Flink SQL 已經(jīng)提供了一系列的內置 Connector,具體可見:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/。
回到上述案例中,With 聲明了以下幾項信息:
-
'connector' = 'kafka'
:聲明外部存儲是 Kafka。 -
'topic' = 'user_behavior'
:聲明 Flink SQL 任務要連接的 Kafka 表的topic
是user_behavior
。 -
'properties.bootstrap.servers' = 'localhost:9092'
:聲明 Kafka 的server ip
是localhost:9092
。 -
'properties.group.id' = 'testGroup'
:聲明 Flink SQL 任務消費這個 Kafka topic,會使用testGroup
的group id
去消費。 -
'scan.startup.mode' = 'earliest-offset'
:聲明 Flink SQL 任務消費這個 Kafka topic 會從最早位點開始消費。 -
'format' = 'csv'
:聲明 Flink SQL 任務讀入或者寫出時對于 Kafka 消息的序列化方式是csv
格式。
從這里也可以看出來 With 中具體要配置哪些配置項都是和每種 Connector 決定的。
5.Create Table Like 子句
Like 子句是 Create Table 子句的一個延伸。
下面定義了一張 Orders
表:
CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
);
但是忘記定義 Watermark 了,那如果想加上 Watermark,就可以用 Like 子句定義一張帶 Watermark 的新表:
CREATE TABLE Orders_with_watermark (
-- 1. 添加了 WATERMARK 定義
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- 2. 覆蓋了原 Orders 表中 scan.startup.mode 參數(shù)
'scan.startup.mode' = 'latest-offset'
)
-- 3. Like 子句聲明是在原來的 Orders 表的基礎上定義 Orders_with_watermark 表
LIKE Orders;
上面這個語句的效果就等同于:文章來源:http://www.zghlxwxcb.cn/news/detail-832557.html
CREATE TABLE Orders_with_watermark (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'latest-offset'
);
不過這種不常使用。就不過多介紹了。如果小伙伴萌感興趣,直接去 官網(wǎng) 參考具體注意事項。文章來源地址http://www.zghlxwxcb.cn/news/detail-832557.html
到了這里,關于【大數(shù)據(jù)】Flink SQL 語法篇(一):CREATE的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!