国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【大數(shù)據(jù)】Flink SQL 語法篇(一):CREATE

這篇具有很好參考價值的文章主要介紹了【大數(shù)據(jù)】Flink SQL 語法篇(一):CREATE。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

CREATE 語句用于向當前或指定的 Catalog 中注冊庫、表、視圖或函數(shù)。注冊后的庫、表、視圖和函數(shù)可以在 SQL 查詢中使用。

目前 Flink SQL 支持下列 CREATE 語句:

  • CREATE TABLE
  • CREATE DATABASE
  • CREATE VIEW
  • CREATE FUNCTION

1.建表語句

下面的 SQL 語句就是建表語句的定義,根據(jù)指定的表名創(chuàng)建一個表,如果同名表已經(jīng)在 Catalog 中存在了,則無法注冊。

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]
   
<physical_column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]
  
<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<metadata_column_definition>:
  column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<source_table>:
  [catalog_name.][db_name.]table_name

<like_options>:
{
   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
 | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } 
}[, ...]

2.表中的列

2.1 常規(guī)列(物理列)

物理列 是數(shù)據(jù)庫中所說的 常規(guī)列。其定義了物理介質中存儲的數(shù)據(jù)中字段的名稱、類型和順序。其他類型的列可以在物理列之間聲明,但不會影響最終的物理列的讀取。

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING
) WITH (
  ...
);

2.2 元數(shù)據(jù)列

元數(shù)據(jù)列 是 SQL 標準的擴展,允許訪問數(shù)據(jù)源本身具有的一些元數(shù)據(jù)。元數(shù)據(jù)列由 METADATA 關鍵字標識。

例如,我們可以使用元數(shù)據(jù)列從 Kafka 數(shù)據(jù)中讀取 Kafka 數(shù)據(jù)自帶的時間戳(這個時間戳不是數(shù)據(jù)中的某個時間戳字段,而是數(shù)據(jù)寫入 Kafka 時,Kafka 引擎給這條數(shù)據(jù)打上的時間戳標記),然后我們可以在 Flink SQL 中使用這個時間戳,比如進行基于時間的窗口操作。

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  -- 讀取 kafka 本身自帶的時間戳
  `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka'
  ...
);

元數(shù)據(jù)列可以用于后續(xù)數(shù)據(jù)的處理,或者寫入到目標表中。

INSERT INTO MyTable 
SELECT 
    user_id
    , name
    , record_time + INTERVAL '1' SECOND 
FROM MyTable;

如果自定義的列名稱和 Connector 中定義 metadata 字段的名稱一樣的話,FROM xxx 子句是可以被省略的。

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  -- 讀取 kafka 本身自帶的時間戳
  `timestamp` TIMESTAMP_LTZ(3) METADATA
) WITH (
  'connector' = 'kafka'
  ...
);

關于 Flink SQL 的每種 Connector 都提供了哪些 metadata 字段,詳細可見 官網(wǎng)文檔。

如果自定義列的數(shù)據(jù)類型和 Connector 中定義的 metadata 字段的數(shù)據(jù)類型不一致的話,程序運行時會自動 cast 強轉。但是這要求兩種數(shù)據(jù)類型是可以強轉的。

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  -- 將時間戳強轉為 BIGINT
  `timestamp` BIGINT METADATA
) WITH (
  'connector' = 'kafka'
  ...
);

默認情況下,F(xiàn)link SQL Planner 認為 metadata 列是可以 讀取 也可以 寫入 的。但是有些外部存儲系統(tǒng)的元數(shù)據(jù)信息是只能用于讀取,不能寫入的。

那么在往一個表寫入的場景下,我們就可以使用 VIRTUAL 關鍵字來標識某個元數(shù)據(jù)列不寫入到外部存儲中(不持久化)。

CREATE TABLE MyTable (
  -- sink 時會寫入
  `timestamp` BIGINT METADATA,
  -- sink 時不寫入
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `name` STRING,
) WITH (
  'connector' = 'kafka'
  ...
);

在上面這個案例中,Kafka 引擎的 offset 是只讀的。所以我們在把 MyTable 作為數(shù)據(jù)源(輸入)表時,Schema 中是包含 offset 的。在把 MyTable 作為數(shù)據(jù)匯(輸出)表時,Schema 中是不包含 offset 的。如下:

所以這里在寫入時需要注意,不要在 SQL 的 INSERT INTO 語句中寫入 offset 列,否則 Flink SQL 任務會直接報錯。

2.3 計算列

計算列 其實就是在寫建表的 DDL 時,可以拿已有的一些列經(jīng)過一些自定義的運算生成的新列。這些列本身是沒有以物理形式存儲到數(shù)據(jù)源中的。

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `price` DOUBLE,
  `quantity` DOUBLE,
  -- cost 就是使用 price 和 quanitity 生成的計算列,計算方式為 price * quanitity
  `cost` AS price * quanitity,
) WITH (
  'connector' = 'kafka'
  ...
);

計算列可以包含其他列、常量或者函數(shù),但是不能寫一個子查詢進去。

小伙伴萌這時會問到一個問題,既然只能包含列、常量或者函數(shù)計算,我就直接在 DML Query 代碼中寫就完事了唄,為啥還要專門在 DDL 中定義呢?

沒錯,如果只是簡單的四則運算的話直接寫在 DML 中就可以,但是 計算列一般是用于定義時間屬性的(因為在 SQL 任務中時間屬性只能在 DDL 中定義,不能在 DML 語句中定義)。比如要把輸入數(shù)據(jù)的時間格式標準化。處理時間、事件時間分別舉例如下:

  • 處理時間:使用 PROCTIME() 函數(shù)來定義處理時間列。
  • 事件時間:事件時間的時間戳可以在聲明 Watermark 之前進行預處理。比如,如果字段不是 TIMESTAMP(3) 類型或者時間戳是嵌套在 JSON 字符串中的,則可以使用計算列進行預處理。

?注意:和虛擬 metadata 列是類似的,計算列也是只能讀不能寫的。

也就是說,我們在把 MyTable 作為數(shù)據(jù)源(輸入)表時,Schema 中是包含 cost 的。

在把 MyTable 作為數(shù)據(jù)匯(輸出)表時,Schema 中是不包含 cost 的。

-- 當做數(shù)據(jù)源(輸入)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, `cost` DOUBLE)

-- 當做數(shù)據(jù)匯(輸出)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE)

3.定義 Watermark

Watermark 是在 Create Table 中進行定義的。具體 SQL 語法標準是:

WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
  • rowtime_column_name:表的事件時間屬性字段。該列必須是 TIMESTAMP(3)TIMESTAMP_LTZ(3) 類,這個時間可以是一個計算列。
  • watermark_strategy_expression:定義 Watermark 的生成策略。Watermark 的一般都是由 rowtime_column_name 列減掉一段固定時間間隔。SQL 中 Watermark 的生產(chǎn)策略是:當前 Watermark 大于上次發(fā)出的 Watermark 時發(fā)出當前 Watermark。

注意:

  • 如果你使用的是事件時間語義,那么必須要設設置事件時間屬性和 WATERMARK 生成策略。
  • Watermark 的發(fā)出頻率:Watermark 發(fā)出一般是間隔一定時間的,Watermark 的發(fā)出間隔時間可以由 pipeline.auto-watermark-interval 進行配置,如果設置為 200ms 則每 200ms 會計算一次 Watermark,如果比之前發(fā)出的 Watermark 大,則發(fā)出。如果間隔設為 0ms,則 Watermark 只要滿足觸發(fā)條件就會發(fā)出,不會受到間隔時間控制。

Flink SQL 提供了幾種 WATERMARK 生產(chǎn)策略:

  • 有界無序:設置方式為 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit。此類策略就可以用于設置最大亂序時間,假如設置為 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND,則生成的是運行 5s 延遲的 Watermark。一般都用這種 Watermark 生成策略,此類 Watermark 生成策略通常用于有數(shù)據(jù)亂序的場景中,而對應到實際的場景中,數(shù)據(jù)都是會存在亂序的,所以基本都使用此類策略。
  • 嚴格升序:設置方式為 WATERMARK FOR rowtime_column AS rowtime_column。一般基本不用這種方式。如果你能保證你的數(shù)據(jù)源的時間戳是嚴格升序的,那就可以使用這種方式。嚴格升序代表 Flink 任務認為時間戳只會越來越大,也不存在相等的情況,只要相等或者小于之前的,就認為是遲到的數(shù)據(jù)。
  • 遞增:設置方式為 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND。一般基本不用這種方式。如果設置此類,則允許有相同的時間戳出現(xiàn)。

4.Create Table With 子句

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

可以看到 DDL 中 With 子句就是在建表時,描述數(shù)據(jù)源、數(shù)據(jù)匯的具體外部存儲的元數(shù)據(jù)信息的

一般 With 中的配置項由 Flink SQL 的 Connector(鏈接外部存儲的連接器) 來定義,每種 Connector 提供的 With 配置項都是不同的。

注意:

  • Flink SQL 中 Connector 其實就是 Flink 用于鏈接外部數(shù)據(jù)源的接口。舉一個類似的例子,在 Java 中想連接到 MySQL,需要使用 mysql-connector-java 包提供的 Java API 去鏈接。映射到 Flink SQL 中,在 Flink SQL 中要連接到 Kafka,需要使用 kafka connector。
  • Flink SQL 已經(jīng)提供了一系列的內置 Connector,具體可見:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/。

回到上述案例中,With 聲明了以下幾項信息:

  • 'connector' = 'kafka':聲明外部存儲是 Kafka。
  • 'topic' = 'user_behavior':聲明 Flink SQL 任務要連接的 Kafka 表的 topicuser_behavior。
  • 'properties.bootstrap.servers' = 'localhost:9092':聲明 Kafka 的 server iplocalhost:9092。
  • 'properties.group.id' = 'testGroup':聲明 Flink SQL 任務消費這個 Kafka topic,會使用 testGroupgroup id 去消費。
  • 'scan.startup.mode' = 'earliest-offset':聲明 Flink SQL 任務消費這個 Kafka topic 會從最早位點開始消費。
  • 'format' = 'csv':聲明 Flink SQL 任務讀入或者寫出時對于 Kafka 消息的序列化方式是 csv 格式。

從這里也可以看出來 With 中具體要配置哪些配置項都是和每種 Connector 決定的。

5.Create Table Like 子句

Like 子句是 Create Table 子句的一個延伸。

下面定義了一張 Orders 表:

CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3)
) WITH ( 
    'connector' = 'kafka',
    'scan.startup.mode' = 'earliest-offset'
);

但是忘記定義 Watermark 了,那如果想加上 Watermark,就可以用 Like 子句定義一張帶 Watermark 的新表:

CREATE TABLE Orders_with_watermark (
    -- 1. 添加了 WATERMARK 定義
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    -- 2. 覆蓋了原 Orders 表中 scan.startup.mode 參數(shù)
    'scan.startup.mode' = 'latest-offset'
)
-- 3. Like 子句聲明是在原來的 Orders 表的基礎上定義 Orders_with_watermark 表
LIKE Orders;

上面這個語句的效果就等同于:

CREATE TABLE Orders_with_watermark (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    'scan.startup.mode' = 'latest-offset'
);

不過這種不常使用。就不過多介紹了。如果小伙伴萌感興趣,直接去 官網(wǎng) 參考具體注意事項。文章來源地址http://www.zghlxwxcb.cn/news/detail-832557.html

到了這里,關于【大數(shù)據(jù)】Flink SQL 語法篇(一):CREATE的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 【yarn】 ‘husky install‘ fails if ‘.git‘ directory does not exists解決方法

    【yarn】 ‘husky install‘ fails if ‘.git‘ directory does not exists解決方法

    環(huán)境:win10 + yarn 1.22.19 問題:在使用yarn安裝前端依賴時,yarn install 出現(xiàn)錯誤: .git can’t be found (see https://git.io/Jc3F9) error Command failed with exit code 1. 截圖 根據(jù)設計,husky安裝必須在與 .git 相同的目錄中運行,但可以在準備腳本期間更改目錄并傳遞子目錄 打開web目錄下的 package.

    2024年02月16日
    瀏覽(21)
  • 【大數(shù)據(jù)】Flink SQL 語法篇(一):CREATE

    CREATE 語句用于向當前或指定的 Catalog 中注冊庫、表、視圖或函數(shù)。注冊后的庫、表、視圖和函數(shù)可以在 SQL 查詢中使用。 目前 Flink SQL 支持下列 CREATE 語句: CREATE TABLE CREATE DATABASE CREATE VIEW CREATE FUNCTION 下面的 SQL 語句就是建表語句的定義,根據(jù)指定的表名創(chuàng)建一個表,如果同

    2024年02月21日
    瀏覽(24)
  • 24、Flink 的table api與sql之Catalogs(java api操作視圖)-3

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。 3、

    2024年02月07日
    瀏覽(28)
  • 【大數(shù)據(jù)】Flink SQL 語法篇(六):Temporal Join

    《 Flink SQL 語法篇 》系列,共包含以下 10 篇文章: Flink SQL 語法篇(一):CREATE Flink SQL 語法篇(二):WITH、SELECT WHERE、SELECT DISTINCT Flink SQL 語法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE) Flink SQL 語法篇(四):Group 聚合、Over 聚合 Flink SQL 語法篇(五):Regular Join、

    2024年03月15日
    瀏覽(67)
  • SQL中exists和not exists的用法

    SQL中exists和not exists的用法

    exists 和 not exists用法 exists和not exists 用于檢查子查詢是否至少會返回一行數(shù)據(jù),該子查詢實際上并不返回任何數(shù)據(jù),而是返回值TRUE或FALSE。 只不過exists和not exists 正好相反 exists(sql 返回結果集,為真) exists括號中sql語句有結果,才會繼續(xù)執(zhí)行where 條件,沒結果視為條件不成立

    2024年02月16日
    瀏覽(23)
  • 24、Flink 的table api與sql之Catalogs(java api操作分區(qū)與函數(shù)、表)-4

    24、Flink 的table api與sql之Catalogs(java api操作分區(qū)與函數(shù)、表)-4

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。 3、

    2024年02月08日
    瀏覽(29)
  • 【大數(shù)據(jù)】Flink SQL 語法篇(七):Lookup Join、Array Expansion、Table Function

    《 Flink SQL 語法篇 》系列,共包含以下 10 篇文章: Flink SQL 語法篇(一):CREATE Flink SQL 語法篇(二):WITH、SELECT WHERE、SELECT DISTINCT Flink SQL 語法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE) Flink SQL 語法篇(四):Group 聚合、Over 聚合 Flink SQL 語法篇(五):Regular Join、

    2024年04月25日
    瀏覽(54)
  • SQL 相關子查詢 和 不相關子查詢、Exists 、Not Exists、 多表連接(包含自連接)

    SQL 相關子查詢 和 不相關子查詢、Exists 、Not Exists、 多表連接(包含自連接)

    不相關子查詢 子查詢的查詢條件不依賴于父查詢,稱不相關子查詢。子查詢可以單獨運行的 相關子查詢 ==== 關聯(lián)子查詢 子查詢的查詢條件依賴于父查詢,稱為 相關子查詢。子查詢不能單獨運行的 子查詢 也稱 內部查詢 父查詢 也稱 外部查詢 如果子查詢的執(zhí)行依賴于外部查

    2024年02月14日
    瀏覽(68)
  • Flink Catalog 解讀與同步 Hudi 表元數(shù)據(jù)的最佳實踐

    Flink Catalog 解讀與同步 Hudi 表元數(shù)據(jù)的最佳實踐

    博主歷時三年精心創(chuàng)作的《大數(shù)據(jù)平臺架構與原型實現(xiàn):數(shù)據(jù)中臺建設實戰(zhàn)》一書現(xiàn)已由知名IT圖書品牌電子工業(yè)出版社博文視點出版發(fā)行,點擊《重磅推薦:建大數(shù)據(jù)平臺太難了!給我發(fā)個工程原型吧!》了解圖書詳情,京東購書鏈接:https://item.jd.com/12677623.html,掃描左側

    2024年02月22日
    瀏覽(23)
  • 存儲過程不要使用IF EXISTS 使用@RecordCount = count(1) 查詢是否存在數(shù)據(jù)

    業(yè)務需求:在存儲過程中保存前判斷:是否數(shù)據(jù)庫中已經(jīng)存在要存儲的條碼信息,如果存在,則拋出提示信息,不存儲 錯誤代碼 該代碼IF EXISTS 無法走,不知道為什么 修改成

    2024年01月19日
    瀏覽(28)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包