国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink-SQL——時(shí)態(tài)表(Temporal Table)

這篇具有很好參考價(jià)值的文章主要介紹了Flink-SQL——時(shí)態(tài)表(Temporal Table)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

時(shí)態(tài)表(Temporal Table)

數(shù)據(jù)庫(kù)時(shí)態(tài)表的實(shí)現(xiàn)邏輯

這里我們需要注意一下的是雖然我們介紹的是Flink 的 Temporal Table 但是這個(gè)概念最早是在數(shù)據(jù)庫(kù)中提出的

在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的數(shù)據(jù)庫(kù)廠商也先后實(shí)現(xiàn)了這個(gè)標(biāo)準(zhǔn)。Temporal Table記錄了歷史上任何時(shí)間點(diǎn)所有的數(shù)據(jù)改動(dòng),Temporal Table的工作流程如下:

  1. 創(chuàng)建時(shí)態(tài)表
  2. 執(zhí)行DML 操作
  3. 執(zhí)行基于時(shí)態(tài)表的查詢(xún)

這里我們先介紹一下什么是時(shí)態(tài)表,時(shí)態(tài)表(Temporal Table)是一張隨時(shí)間變化的表,其實(shí)這是句廢話,因?yàn)樗械谋矶茧S著時(shí)間變化,那時(shí)態(tài)表的意義在那里呢,時(shí)態(tài)表有一個(gè)功能就是可以返回當(dāng)時(shí)表的狀態(tài)(數(shù)據(jù)),也就是說(shuō)我們可以查詢(xún)?nèi)我鈺r(shí)間點(diǎn)的數(shù)據(jù),有點(diǎn)類(lèi)似快照,只不過(guò)這個(gè)快照是根據(jù)我們提供的時(shí)間篩選出來(lái)的,也就是說(shuō)時(shí)態(tài)表做到是時(shí)間旅行。

Temporal table 相比普通的表會(huì)做多一個(gè) history table, 把所有修改/刪除的結(jié)果存起來(lái), 所以任何數(shù)據(jù)都不會(huì)丟失,要找回也比較容易,有點(diǎn)類(lèi)似WAL 日志,但是不是,WAL存儲(chǔ)的是操作本身,Temporal table 存儲(chǔ)的是操作結(jié)果,和普通的表很像,只不過(guò)是每一條記錄提供了一個(gè)有效期的狀態(tài)

版本: 時(shí)態(tài)表可以劃分成一系列帶版本的表快照集合,表快照中的版本代表了快照中所有記錄的有效區(qū)間,有效區(qū)間的開(kāi)始時(shí)間和結(jié)束時(shí)間可以通過(guò)用戶(hù)指定,根據(jù)時(shí)態(tài)表是否可以追蹤自身的歷史版本與否,時(shí)態(tài)表可以分為 版本表普通表。

版本表: 如果時(shí)態(tài)表中的記錄可以追蹤和并訪問(wèn)它的歷史版本,這種表我們稱(chēng)之為版本表,來(lái)自數(shù)據(jù)庫(kù)的 changelog 可以定義成版本表。

普通表: 如果時(shí)態(tài)表中的記錄僅僅可以追蹤并和它的最新版本,這種表我們稱(chēng)之為普通表,來(lái)自數(shù)據(jù)庫(kù) 或 HBase 的表可以定義成普通表。

時(shí)態(tài)表的實(shí)現(xiàn)原理

時(shí)態(tài)表是有一對(duì)表而不是一個(gè)表,當(dāng)前表和歷史表。這些表都包含2個(gè)額外的datetime2字段用來(lái)定義每個(gè)行的可用期限:

  • 期限開(kāi)始列:系統(tǒng)把行的開(kāi)始時(shí)間記錄在這個(gè)列上,稱(chēng)為SysStartTime
  • 期限結(jié)束列:系統(tǒng)把行的結(jié)束時(shí)間記錄在這個(gè)列上,稱(chēng)為SysEndTime

當(dāng)前表包含了每個(gè)行的當(dāng)前值。歷史表包含每個(gè)行的之前的只,starttime,endtime表示行的可用期限。

Flink-SQL——時(shí)態(tài)表(Temporal Table),# Flink SQL,flink,sql

對(duì)于每個(gè)操作時(shí)態(tài)表都進(jìn)行相應(yīng)的操作

  1. INSERT:對(duì)于一個(gè)insert,系統(tǒng)會(huì)設(shè)置SysStartTime列為當(dāng)前事務(wù)的開(kāi)始時(shí)間,SysEndTime為最大的值9999-12-31
  2. UPDATE:對(duì)于update,系統(tǒng)會(huì)報(bào)之前的行保存到歷史表并且設(shè)置SysEndTime為當(dāng)前事務(wù)的啟動(dòng)時(shí)間。行被關(guān)閉,這個(gè)期限就是這個(gè)行的可用期限。這個(gè)行在當(dāng)前表上的值被修改,那么SysStartTime被設(shè)置為當(dāng)前事務(wù)的開(kāi)始時(shí)間。SysEndTime被設(shè)置為最大時(shí)間。
  3. DELETE:對(duì)于刪除,系統(tǒng)把之前的行保存到history表,并且設(shè)置SysEndtime為事務(wù)的開(kāi)始時(shí)間。標(biāo)記行關(guān)閉,期限記錄表示行的可用期限。當(dāng)前表中行被刪除。當(dāng)前的查詢(xún)不會(huì)被查到當(dāng)前行。只有帶時(shí)間的查詢(xún),或者直接查詢(xún)歷史表才能查到這個(gè)行。
  4. MERGE:對(duì)于MERGE涉及到3個(gè)操作INSERT,UPDATE,DELETE,根據(jù)操作的不同做不同的記錄(有的數(shù)據(jù)庫(kù)沒(méi)有這個(gè)操作)

時(shí)態(tài)表的查詢(xún)實(shí)現(xiàn)

可以使用select from的for system_time子句來(lái)查詢(xún)當(dāng)前表和歷史表的數(shù)據(jù)。其實(shí)這里我們可以看到就涉及到一個(gè)路由的問(wèn)題就是這個(gè)查詢(xún)事發(fā)生在當(dāng)前表和歷史表上的

Flink-SQL——時(shí)態(tài)表(Temporal Table),# Flink SQL,flink,sql

SELECT * FROM Employee  
    FOR SYSTEM_TIME    
        BETWEEN '2014-01-01 00:00:00.0000000' AND '2015-01-01 00:00:00.0000000'   
            WHERE EmployeeID = 1000 ORDER BY ValidFrom ;

FOR SYSTEM_TIME會(huì)過(guò)濾掉SysStartTime=SysEndTime的數(shù)據(jù)。這些行在同一個(gè)事務(wù)里面操作了同一行兒產(chǎn)生。只能通過(guò)查詢(xún)歷史表才能返回

關(guān)于SYSTEM_TIME過(guò)濾

表達(dá)式 符合條件的行 Description
AS OF<date_time> SysStartTime <= date_time AND SysEndTime > date_time 返回一個(gè)表,其行中包含過(guò)去指定時(shí)間點(diǎn)的實(shí)際(當(dāng)前)值。 在內(nèi)部,臨時(shí)表及其歷史記錄表之間將進(jìn)行聯(lián)合,然后篩選結(jié)果以返回在 <date_time> 參數(shù)指定的時(shí)間點(diǎn)有效的行中的值。 如果 system_start_time_column_name 值小于或等于 <date_time> 參數(shù)值,并且 system_end_time_column_name 值大于 <date_time> 參數(shù)值,則此行的值視為有效。
FROM<start_date_time>TO<end_date_time> SysStartTime < end_date_time AND SysEndTime > start_date_time 返回一個(gè)表,其中包含在指定的時(shí)間范圍內(nèi)保持活動(dòng)狀態(tài)的所有行版本的值,不管這些版本是在 FROM 自變量的 <start_date_time> 參數(shù)之前開(kāi)始活動(dòng),還是在 TO 自變量的 <end_date_time> 參數(shù)值之后停止活動(dòng)。 在內(nèi)部,將在臨時(shí)表及其歷史記錄表之間進(jìn)行聯(lián)合,然后篩選結(jié)果,以返回在指定時(shí)間范圍內(nèi)任意時(shí)間保持活動(dòng)狀態(tài)的所有行版本的值。 正好在 FROM 終結(jié)點(diǎn)定義的下限時(shí)間停止活動(dòng)的行將被排除,正好在 TO 終結(jié)點(diǎn)定義的上限時(shí)間開(kāi)始活動(dòng)的記錄也將被排除。
BETWEEN<start_date_time>AND<end_date_time> SysStartTime <= end_date_time AND SysEndTime > start_date_time 與上面的 FOR SYSTEM_TIME FROM <start_date_time>TO<end_date_time> 描述相同,不過(guò),返回的行表包括在 <end_date_time> 終結(jié)點(diǎn)定義的上限時(shí)間激活的行。
CONTAINED IN (<start_date_time> , <end_date_time>) SysStartTime >= start_date_time AND SysEndTime <= end_date_time 返回一個(gè)表,其中包含在 CONTAINED IN 參數(shù)的兩個(gè)日期時(shí)間值定義的時(shí)間范圍內(nèi)打開(kāi)和關(guān)閉的所有行版本的值。 正好在下限時(shí)間激活的記錄,或者在上限時(shí)間停止活動(dòng)的行將包括在內(nèi)。
ALL 所有行 返回屬于當(dāng)前表和歷史記錄表的行的聯(lián)合。

時(shí)態(tài)表的意義

  1. 審核所有數(shù)據(jù)更改并在必要時(shí)執(zhí)行數(shù)據(jù)取證
  2. 重建過(guò)去任何時(shí)間的數(shù)據(jù)狀態(tài)
  3. 計(jì)算一段時(shí)間內(nèi)的趨勢(shì)
  4. 為決策支持應(yīng)用程序維護(hù)緩慢變化的維度
  5. 從意外的數(shù)據(jù)更改和應(yīng)用程序錯(cuò)誤中恢復(fù)

時(shí)態(tài)表是一種用戶(hù)表,旨在保存數(shù)據(jù)更改的完整歷史記錄,從而實(shí)現(xiàn)輕松的時(shí)間點(diǎn)分析。這種類(lèi)型的時(shí)態(tài)又被稱(chēng)為系統(tǒng)版本的時(shí)態(tài)表,因?yàn)槊恳恍械挠行诙加上到y(tǒng)(即數(shù)據(jù)庫(kù)引擎)管理。

Flink中的時(shí)態(tài)表

時(shí)態(tài)表中的每條記錄都關(guān)聯(lián)了一個(gè)或多個(gè)時(shí)間段,所有的 Flink 表都是動(dòng)態(tài),這里的動(dòng)態(tài)和動(dòng)態(tài)表的概念一致,就是說(shuō)數(shù)據(jù)是不斷變化更新的,只不過(guò)時(shí)態(tài)表在動(dòng)態(tài)表的基礎(chǔ)上提供了更加強(qiáng)大的功能時(shí)間旅行——?dú)v史數(shù)據(jù)回溯。

Flink 使用主鍵約束和事件時(shí)間來(lái)定義一張時(shí)態(tài)表,我們有時(shí)候也稱(chēng)之為版本表,對(duì)應(yīng)的歷史表也稱(chēng)為版本試圖

設(shè)計(jì)初衷

這里我們看一下Flink中為什么要引入時(shí)態(tài)表,因?yàn)槲覀兊臄?shù)據(jù)是實(shí)時(shí)進(jìn)來(lái)的,而且我們的維度表是會(huì)發(fā)生變化的,所以對(duì)于實(shí)時(shí)進(jìn)來(lái)的數(shù)據(jù)我們希望在關(guān)聯(lián)的時(shí)候關(guān)聯(lián)上的是當(dāng)時(shí)的維度數(shù)據(jù),而不是當(dāng)前的

產(chǎn)品價(jià)格的例子——時(shí)態(tài)表

以訂單流關(guān)聯(lián)產(chǎn)品表這個(gè)場(chǎng)景舉例,orders 表包含了來(lái)自 Kafka 的實(shí)時(shí)訂單流,product_changelog 表來(lái)自數(shù)據(jù)庫(kù)表 products 的 changelog , 產(chǎn)品的價(jià)格在數(shù)據(jù)庫(kù)表 products 中是隨時(shí)間實(shí)時(shí)變化的。

SELECT * FROM product_changelog;

(changelog kind)  update_time  product_id product_name price
================= ===========  ========== ============ ===== 
+(INSERT)         00:01:00     p_001      scooter      11.11
+(INSERT)         00:02:00     p_002      basketball   23.11
-(UPDATE_BEFORE)  12:00:00     p_001      scooter      11.11
+(UPDATE_AFTER)   12:00:00     p_001      scooter      12.99
-(UPDATE_BEFORE)  12:00:00     p_002      basketball   23.11 
+(UPDATE_AFTER)   12:00:00     p_002      basketball   19.99
-(DELETE)         18:00:00     p_001      scooter      12.99 

product_changelog 表示數(shù)據(jù)庫(kù)表 products不斷增長(zhǎng)的 changelog, 比如,產(chǎn)品 scooter 在時(shí)間點(diǎn) 00:01:00的初始價(jià)格是 11.11, 在 12:00:00 的時(shí)候漲價(jià)到了 12.99, 在 18:00:00 的時(shí)候這條產(chǎn)品價(jià)格記錄被刪除。

如果我們想輸出 product_changelog 表在 10:00:00 對(duì)應(yīng)的版本,表的內(nèi)容如下所示:

update_time  product_id product_name price
===========  ========== ============ ===== 
00:01:00     p_001      scooter      11.11
00:02:00     p_002      basketball   23.11

如果我們想輸出 product_changelog 表在 13:00:00 對(duì)應(yīng)的版本,表的內(nèi)容如下所示:

update_time  product_id product_name price
===========  ========== ============ ===== 
12:00:00     p_001      scooter      12.99
12:00:00     p_002      basketball   19.99

上述例子中,products 表的版本是通過(guò) update_timeproduct_id 進(jìn)行追蹤的,product_id 對(duì)應(yīng) product_changelog 表的主鍵,update_time 對(duì)應(yīng)事件時(shí)間。

匯率的例子——普通表

另一方面,某些用戶(hù)案列需要連接變化的維表,該表是外部數(shù)據(jù)庫(kù)表。

假設(shè) LatestRates 是一個(gè)物化的最新匯率表 (比如:一張 HBase 表),LatestRates 總是表示 HBase 表 Rates 的最新內(nèi)容。

我們?cè)?10:15:00 時(shí)查詢(xún)到的內(nèi)容如下所示:

10:15:00 > SELECT * FROM LatestRates;

currency  rate
========= ====
US Dollar 102
Euro      114
Yen       1

我們?cè)?11:00:00 時(shí)查詢(xún)到的內(nèi)容如下所示:

11:00:00 > SELECT * FROM LatestRates;

currency  rate
========= ====
US Dollar 102
Euro      116
Yen       1

其實(shí)我們可以看到我們的匯率是實(shí)時(shí)變化的,如果是普通表的話我們永遠(yuǎn)只能查詢(xún)當(dāng)前最新的狀態(tài),無(wú)法獲取歷史的數(shù)據(jù)情況

聲明版本表

在 Flink 中,定義了主鍵約束和事件時(shí)間屬性的表就是版本表。

-- 定義一張版本表
CREATE TABLE product_changelog (
  product_id STRING,
  product_name STRING,
  product_price DECIMAL(10, 4),
  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) 定義主鍵約束
  WATERMARK FOR update_time AS update_time   -- (2) 通過(guò) watermark 定義事件時(shí)間              
) WITH (
  'connector' = 'kafka',
  'topic' = 'products',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'value.format' = 'debezium-json'
);

(1) 為表 product_changelog 定義了主鍵, 行 (2)update_time 定義為表 product_changelog 的事件時(shí)間,因此 product_changelog 是一張版本表。

注意: METADATA FROM 'value.source.timestamp' VIRTUAL 語(yǔ)法的意思是從每條 changelog 中抽取 changelog 對(duì)應(yīng)的數(shù)據(jù)庫(kù)表中操作的執(zhí)行時(shí)間,強(qiáng)烈推薦使用數(shù)據(jù)庫(kù)表中操作的 執(zhí)行時(shí)間作為事件時(shí)間 ,否則通過(guò)時(shí)間抽取的版本可能和數(shù)據(jù)庫(kù)中的版本不匹配。

聲明版本視圖

Flink 也支持定義版本視圖只要一個(gè)視圖包含主鍵和事件時(shí)間便是一個(gè)版本視圖。

假設(shè)我們有表 RatesHistory 如下所示:

-- 定義一張 append-only 表
CREATE TABLE RatesHistory (
    currency_time TIMESTAMP(3),
    currency STRING,
    rate DECIMAL(38, 10),
    WATERMARK FOR currency_time AS currency_time   -- 定義事件時(shí)間
) WITH (
  'connector' = 'kafka',
  'topic' = 'rates',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'                                -- 普通的 append-only 流
)

RatesHistory 代表一個(gè)兌換日元貨幣匯率表(日元匯率為1),該表是不斷增長(zhǎng)的 append-only 表。 例如,歐元 兌換 日元09:00:0010:45:00 的匯率為 114。從 10:45:0011:15:00 的匯率為 116。

SELECT * FROM RatesHistory;

currency_time currency  rate
============= ========= ====
09:00:00      US Dollar 102
09:00:00      Euro      114
09:00:00      Yen       1
10:45:00      Euro      116
11:15:00      Euro      119
11:49:00      Pounds    108

為了在 RatesHistory 上定義版本表,F(xiàn)link 支持通過(guò)去重查詢(xún)定義版本視圖, 去重查詢(xún)可以產(chǎn)出一個(gè)有序的 changelog 流,去重查詢(xún)能夠推斷主鍵并保留原始數(shù)據(jù)流的事件時(shí)間屬性。

CREATE VIEW versioned_rates AS              
SELECT currency, rate, currency_time            -- (1) `currency_time` 保留了事件時(shí)間
  FROM (
      SELECT *,
      ROW_NUMBER() OVER (PARTITION BY currency  -- (2) `currency` 是去重 query 的 unique key,可以作為主鍵
         ORDER BY currency_time DESC) AS rowNum 
      FROM RatesHistory )
WHERE rowNum = 1; 

-- 視圖 `versioned_rates` 將會(huì)產(chǎn)出如下的 changelog:

(changelog kind) currency_time currency   rate
================ ============= =========  ====
+(INSERT)        09:00:00      US Dollar  102
+(INSERT)        09:00:00      Euro       114
+(INSERT)        09:00:00      Yen        1
+(UPDATE_AFTER)  10:45:00      Euro       116
+(UPDATE_AFTER)  11:15:00      Euro       119
+(INSERT)        11:49:00      Pounds     108

(1) 保留了事件時(shí)間作為視圖 versioned_rates 的事件時(shí)間,行 (2) 使得視圖 versioned_rates 有了主鍵, 因此視圖 versioned_rates 是一個(gè)版本視圖。

視圖中的去重 query 會(huì)被 Flink 優(yōu)化并高效地產(chǎn)出 changelog stream, 產(chǎn)出的 changelog 保留了主鍵約束和事件時(shí)間。

如果我們想輸出 versioned_rates 表在 11:00:00 對(duì)應(yīng)的版本,表的內(nèi)容如下所示:

currency_time currency   rate  
============= ========== ====
09:00:00      US Dollar  102
09:00:00      Yen        1
10:45:00      Euro       116

如果我們想輸出 versioned_rates 表在 12:00:00 對(duì)應(yīng)的版本,表的內(nèi)容如下所示:

currency_time currency   rate  
============= ========== ====
09:00:00      US Dollar  102
09:00:00      Yen        1
10:45:00      Euro       119
11:49:00      Pounds     108

聲明普通表

普通表的聲明和 Flink 建表 DDL 一致,參考 create table 頁(yè)面獲取更多如何建表的信息。

-- 用 DDL 定義一張 HBase 表,然后我們可以在 SQL 中將其當(dāng)作一張時(shí)態(tài)表使用
-- 'currency' 列是 HBase 表中的 rowKey
 CREATE TABLE LatestRates (   
     currency STRING,   
     fam1 ROW<rate DOUBLE>   
 ) WITH (   
    'connector' = 'hbase-1.4',   
    'table-name' = 'rates',   
    'zookeeper.quorum' = 'localhost:2181'   
 );

注意 理論上講任意都能用作時(shí)態(tài)表并在基于處理時(shí)間的時(shí)態(tài)表 Join 中使用,但當(dāng)前支持作為時(shí)態(tài)表的普通表必須實(shí)現(xiàn)接口 LookupableTableSource。接口 LookupableTableSource 的實(shí)例只能作為時(shí)態(tài)表用于基于處理時(shí)間的時(shí)態(tài) Join 。

一個(gè)完整的例子

舉個(gè)例子,假設(shè)你在Mysql中有兩張表: browse_event、product_history_info。

  • browse_event: 事件表,某個(gè)用戶(hù)在某個(gè)時(shí)刻瀏覽了某個(gè)商品,以及商品的價(jià)值。如下
SELECT * FROM browse_event;
    
+--------+---------------------+-----------+-----------+--------------+
| userID | eventTime           | eventType | productID | productPrice |
+--------+---------------------+-----------+-----------+--------------+
| user_1 | 2016-01-01 00:00:00 | browse    | product_5 |           20 |
| user_1 | 2016-01-01 00:00:01 | browse    | product_5 |           20 |
| user_1 | 2016-01-01 00:00:02 | browse    | product_5 |           20 |
| user_1 | 2016-01-01 00:00:03 | browse    | product_5 |           20 |
| user_1 | 2016-01-01 00:00:04 | browse    | product_5 |           20 |
| user_1 | 2016-01-01 00:00:05 | browse    | product_5 |           20 |
| user_1 | 2016-01-01 00:00:06 | browse    | product_5 |           20 |
| user_2 | 2016-01-01 00:00:01 | browse    | product_3 |           20 |
| user_2 | 2016-01-01 00:00:02 | browse    | product_3 |           20 |
| user_2 | 2016-01-01 00:00:05 | browse    | product_3 |           20 |
| user_2 | 2016-01-01 00:00:06 | browse    | product_3 |           20 |
+--------+---------------------+-----------+-----------+--------------+

  • product_history_info:商品基礎(chǔ)信息表,記錄了商品歷史以來(lái)的基礎(chǔ)信息。如下:
SELECT * FROM product_history_info;
+-----------+-------------+-----------------+---------------------+
| productID | productName | productCategory | updatedAt           |
+-----------+-------------+-----------------+---------------------+
| product_5 | name50      | category50      | 2016-01-01 00:00:00 |
| product_5 | name52      | category52      | 2016-01-01 00:00:02 |
| product_5 | name55      | category55      | 2016-01-01 00:00:05 |
| product_3 | name32      | category32      | 2016-01-01 00:00:02 |
| product_3 | name35      | category35      | 2016-01-01 00:00:05 |
+-----------+-------------+-----------------+---------------------+

此刻,你想獲取事件發(fā)生時(shí),對(duì)應(yīng)的最新的商品基礎(chǔ)信息。可能需要借助以下SQL實(shí)現(xiàn):

SELECT l.userID,
       l.eventTime,
       l.eventType,
       l.productID,
       l.productPrice,
       r.productID,
       r.productName,
       r.productCategory,
       r.updatedAt
FROM
    browse_event AS l,
    product_history_info AS r
WHERE r.productID = l.productID
 AND r.updatedAt = (
    SELECT max(updatedAt)
    FROM product_history_info AS r2
    WHERE r2.productID = l.productID
      AND r2.updatedAt <= l.eventTime
)

// 結(jié)果
+--------+---------------------+-----------+-----------+--------------+-----------+-------------+-----------------+---------------------+
| userID | eventTime           | eventType | productID | productPrice | productID | productName | productCategory | updatedAt           |
+--------+---------------------+-----------+-----------+--------------+-----------+-------------+-----------------+---------------------+
| user_1 | 2016-01-01 00:00:00 | browse    | product_5 |           20 | product_5 | name50      | category50      | 2016-01-01 00:00:00 |
| user_1 | 2016-01-01 00:00:01 | browse    | product_5 |           20 | product_5 | name50      | category50      | 2016-01-01 00:00:00 |
| user_1 | 2016-01-01 00:00:02 | browse    | product_5 |           20 | product_5 | name52      | category52      | 2016-01-01 00:00:02 |
| user_1 | 2016-01-01 00:00:03 | browse    | product_5 |           20 | product_5 | name52      | category52      | 2016-01-01 00:00:02 |
| user_1 | 2016-01-01 00:00:04 | browse    | product_5 |           20 | product_5 | name52      | category52      | 2016-01-01 00:00:02 |
| user_1 | 2016-01-01 00:00:05 | browse    | product_5 |           20 | product_5 | name55      | category55      | 2016-01-01 00:00:05 |
| user_1 | 2016-01-01 00:00:06 | browse    | product_5 |           20 | product_5 | name55      | category55      | 2016-01-01 00:00:05 |
| user_2 | 2016-01-01 00:00:02 | browse    | product_3 |           20 | product_3 | name32      | category32      | 2016-01-01 00:00:02 |
| user_2 | 2016-01-01 00:00:05 | browse    | product_3 |           20 | product_3 | name35      | category35      | 2016-01-01 00:00:05 |
| user_2 | 2016-01-01 00:00:06 | browse    | product_3 |           20 | product_3 | name35      | category35      | 2016-01-01 00:00:05 |
+--------+---------------------+-----------+-----------+--------------+-----------+-------------+-----------------+---------------------+

Temporal Table可以簡(jiǎn)化和加速此類(lèi)查詢(xún),并減少對(duì)狀態(tài)的使用。Temporal Table是將一個(gè)Append-Only表(如上product_history_info)中追加的行,根據(jù)設(shè)置的主鍵和時(shí)間(如上productID、updatedAt),解釋成Chanlog,并在特定時(shí)間提供數(shù)據(jù)的版本。

測(cè)試數(shù)據(jù)

自己造的測(cè)試數(shù)據(jù),browse log和product history info,如下:

// browse log
{"userID": "user_1", "eventTime": "2016-01-01 00:00:00", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 00:00:01", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 00:00:02", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 00:00:03", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 00:00:04", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 00:00:05", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 00:00:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 00:00:01", "eventType": "browse", "productID": "product_3", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 00:00:02", "eventType": "browse", "productID": "product_3", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 00:00:05", "eventType": "browse", "productID": "product_3", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 00:00:06", "eventType": "browse", "productID": "product_3", "productPrice": 20}

// product history info
{"productID":"product_5","productName":"name50","productCategory":"category50","updatedAt":"2016-01-01 00:00:00"}
{"productID":"product_5","productName":"name52","productCategory":"category52","updatedAt":"2016-01-01 00:00:02"}
{"productID":"product_5","productName":"name55","productCategory":"category55","updatedAt":"2016-01-01 00:00:05"}
{"productID":"product_3","productName":"name32","productCategory":"category32","updatedAt":"2016-01-01 00:00:02"}
{"productID":"product_3","productName":"name35","productCategory":"category35","updatedAt":"2016-01-01 00:00:05"}

代碼實(shí)現(xiàn)

package com.bigdata.flink.tableSqlTemporalTable;

import com.alibaba.fastjson.JSON;
import com.bigdata.flink.beans.table.ProductInfo;
import com.bigdata.flink.beans.table.UserBrowseLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;


/**
 * Summary:
 *  時(shí)態(tài)表(Temporal Table)
 */
@Slf4j
public class Test {
    public static void main(String[] args) throws Exception{

        args=new String[]{"--application","flink/src/main/java/com/bigdata/flink/tableSqlTemporalTable/application.properties"};

        //1、解析命令行參數(shù)
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));

        //browse log
        String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
        String browseTopic = parameterTool.getRequired("browseTopic");
        String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");

        //product history info
        String productInfoTopic = parameterTool.getRequired("productHistoryInfoTopic");
        String productInfoGroupID = parameterTool.getRequired("productHistoryInfoGroupID");

        //2、設(shè)置運(yùn)行環(huán)境
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);

        //3、注冊(cè)Kafka數(shù)據(jù)源
        //注意: 為了在北京時(shí)間和時(shí)間戳之間有直觀的認(rèn)識(shí),這里的UserBrowseLog中增加了一個(gè)字段eventTimeTimestamp作為eventTime的時(shí)間戳
        Properties browseProperties = new Properties();
        browseProperties.put("bootstrap.servers",kafkaBootstrapServers);
        browseProperties.put("group.id",browseTopicGroupID);
        DataStream<UserBrowseLog> browseStream=streamEnv
                .addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
                .process(new BrowseKafkaProcessFunction())
                .assignTimestampsAndWatermarks(new BrowseTimestampExtractor(Time.seconds(0)));

        tableEnv.registerDataStream("browse",browseStream,"userID,eventTime,eventTimeTimestamp,eventType,productID,productPrice,browseRowtime.rowtime");
        //tableEnv.toAppendStream(tableEnv.scan("browse"),Row.class).print();

        //4、注冊(cè)時(shí)態(tài)表(Temporal Table)
        //注意: 為了在北京時(shí)間和時(shí)間戳之間有直觀的認(rèn)識(shí),這里的ProductInfo中增加了一個(gè)字段updatedAtTimestamp作為updatedAt的時(shí)間戳
        Properties productInfoProperties = new Properties();
        productInfoProperties.put("bootstrap.servers",kafkaBootstrapServers);
        productInfoProperties.put("group.id",productInfoGroupID);
        DataStream<ProductInfo> productInfoStream=streamEnv
                .addSource(new FlinkKafkaConsumer010<>(productInfoTopic, new SimpleStringSchema(), productInfoProperties))
                .process(new ProductInfoProcessFunction())
                .assignTimestampsAndWatermarks(new ProductInfoTimestampExtractor(Time.seconds(0)));

        tableEnv.registerDataStream("productInfo",productInfoStream, "productID,productName,productCategory,updatedAt,updatedAtTimestamp,productInfoRowtime.rowtime");
        //設(shè)置Temporal Table的時(shí)間屬性和主鍵
        TemporalTableFunction productInfo = tableEnv.scan("productInfo").createTemporalTableFunction("productInfoRowtime", "productID");
        //注冊(cè)TableFunction
        tableEnv.registerFunction("productInfoFunc",productInfo);
        //tableEnv.toAppendStream(tableEnv.scan("productInfo"),Row.class).print();

        //5、運(yùn)行SQL
        String sql = ""
                + "SELECT "
                + "browse.userID, "
                + "browse.eventTime, "
                + "browse.eventTimeTimestamp, "
                + "browse.eventType, "
                + "browse.productID, "
                + "browse.productPrice, "
                + "productInfo.productID, "
                + "productInfo.productName, "
                + "productInfo.productCategory, "
                + "productInfo.updatedAt, "
                + "productInfo.updatedAtTimestamp "
                + "FROM "
                + " browse, "
                + " LATERAL TABLE (productInfoFunc(browse.browseRowtime)) as productInfo "
                + "WHERE "
                + " browse.productID=productInfo.productID";

        Table table = tableEnv.sqlQuery(sql);
        tableEnv.toAppendStream(table,Row.class).print();

        //6、開(kāi)始執(zhí)行
        tableEnv.execute(Test.class.getSimpleName());


    }


    /**
     * 解析Kafka數(shù)據(jù)
     */
    static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {
        @Override
        public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
            try {

                UserBrowseLog log = JSON.parseObject(value, UserBrowseLog.class);

                // 增加一個(gè)long類(lèi)型的時(shí)間戳
                // 指定eventTime為yyyy-MM-dd HH:mm:ss格式的北京時(shí)間
                DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                OffsetDateTime eventTime = LocalDateTime.parse(log.getEventTime(), format).atOffset(ZoneOffset.of("+08:00"));
                // 轉(zhuǎn)換成毫秒時(shí)間戳
                long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
                log.setEventTimeTimestamp(eventTimeTimestamp);

                out.collect(log);
            }catch (Exception ex){
                log.error("解析Kafka數(shù)據(jù)異常...",ex);
            }
        }
    }

    /**
     * 提取時(shí)間戳生成水印
     */
    static class BrowseTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog> {

        BrowseTimestampExtractor(Time maxOutOfOrderness) {
            super(maxOutOfOrderness);
        }

        @Override
        public long extractTimestamp(UserBrowseLog element) {
            return element.getEventTimeTimestamp();
        }
    }





    /**
     * 解析Kafka數(shù)據(jù)
     */
    static class ProductInfoProcessFunction extends ProcessFunction<String, ProductInfo> {
        @Override
        public void processElement(String value, Context ctx, Collector<ProductInfo> out) throws Exception {
            try {

                ProductInfo log = JSON.parseObject(value, ProductInfo.class);

                // 增加一個(gè)long類(lèi)型的時(shí)間戳
                // 指定eventTime為yyyy-MM-dd HH:mm:ss格式的北京時(shí)間
                DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                OffsetDateTime eventTime = LocalDateTime.parse(log.getUpdatedAt(), format).atOffset(ZoneOffset.of("+08:00"));
                // 轉(zhuǎn)換成毫秒時(shí)間戳
                long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
                log.setUpdatedAtTimestamp(eventTimeTimestamp);

                out.collect(log);
            }catch (Exception ex){
                log.error("解析Kafka數(shù)據(jù)異常...",ex);
            }
        }
    }

    /**
     * 提取時(shí)間戳生成水印
     */
    static class ProductInfoTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<ProductInfo> {

        ProductInfoTimestampExtractor(Time maxOutOfOrderness) {
            super(maxOutOfOrderness);
        }

        @Override
        public long extractTimestamp(ProductInfo element) {
            return element.getUpdatedAtTimestamp();
        }
    }

}

測(cè)試結(jié)果

在對(duì)應(yīng)Kafka Topic中發(fā)送如上測(cè)試數(shù)據(jù)后,得到結(jié)果。

// 可以看到,獲取到了,事件發(fā)生時(shí),對(duì)應(yīng)的歷史最新的商品基礎(chǔ)信息
user_1,2016-01-01 00:00:01,1451577601000,browse,product_5,20,product_5,name50,category50,2016-01-01 00:00:00,1451577600000
user_1,2016-01-01 00:00:04,1451577604000,browse,product_5,20,product_5,name52,category52,2016-01-01 00:00:02,1451577602000
user_1,2016-01-01 00:00:02,1451577602000,browse,product_5,20,product_5,name52,category52,2016-01-01 00:00:02,1451577602000
user_1,2016-01-01 00:00:05,1451577605000,browse,product_5,20,product_5,name55,category55,2016-01-01 00:00:05,1451577605000
user_1,2016-01-01 00:00:00,1451577600000,browse,product_5,20,product_5,name50,category50,2016-01-01 00:00:00,1451577600000
user_1,2016-01-01 00:00:03,1451577603000,browse,product_5,20,product_5,name52,category52,2016-01-01 00:00:02,1451577602000
user_2,2016-01-01 00:00:02,1451577602000,browse,product_3,20,product_3,name32,category32,2016-01-01 00:00:02,1451577602000
user_2,2016-01-01 00:00:05,1451577605000,browse,product_3,20,product_3,name35,category35,2016-01-01 00:00:05,1451577605000

總結(jié)

Temporal Table可以簡(jiǎn)化和加速我們對(duì)歷史狀態(tài)數(shù)據(jù)的查詢(xún),并減少對(duì)狀態(tài)的使用。Temporal Table是將一個(gè)Append-Only表(如上product_history_info)中追加的行,根據(jù)設(shè)置的主鍵和時(shí)間(如上productID、updatedAt),解釋成Chanlog,并在特定時(shí)間提供數(shù)據(jù)的版本。

在使用時(shí)態(tài)表(Temporal Table)時(shí),要注意以下問(wèn)題。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-793448.html

  1. Temporal Table可提供歷史某個(gè)時(shí)間點(diǎn)上的數(shù)據(jù)。
  2. Temporal Table根據(jù)時(shí)間來(lái)跟蹤版本。
  3. Temporal Table需要提供時(shí)間屬性和主鍵。
  4. Temporal Table一般和關(guān)鍵詞LATERAL TABLE結(jié)合使用。
  5. Temporal Table在基于ProcessingTime時(shí)間屬性處理時(shí),每個(gè)主鍵只保存最新版本的數(shù)據(jù)。
  6. Temporal Table在基于EventTime時(shí)間屬性處理時(shí),每個(gè)主鍵保存從上個(gè)Watermark到當(dāng)前系統(tǒng)時(shí)間的所有版本。
  7. 左側(cè)Append-Only表Join右側(cè)Temporal Table,本質(zhì)上還是左表驅(qū)動(dòng)Join,即從左表拿到Key,根據(jù)Key和時(shí)間(可能是歷史時(shí)間)去右側(cè)Temporal Table表中查詢(xún)。

到了這里,關(guān)于Flink-SQL——時(shí)態(tài)表(Temporal Table)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • flink-sql對(duì)kafka數(shù)據(jù)進(jìn)行清洗過(guò)濾

    今天這篇blog主要記錄使用flink-sql對(duì)kafka中的數(shù)據(jù)進(jìn)行過(guò)濾。 以前對(duì)kafka數(shù)據(jù)進(jìn)行實(shí)時(shí)處理時(shí)都是使用java來(lái)進(jìn)行flink開(kāi)發(fā),需要?jiǎng)?chuàng)建一個(gè)工程,并且打成jar包再提交,流程固定但對(duì)于簡(jiǎn)單任務(wù)來(lái)說(shuō)還是比較繁瑣的。 今天我們要對(duì)logstash采集到kafka中的數(shù)據(jù)進(jìn)行過(guò)濾篩選,將篩選

    2024年02月16日
    瀏覽(30)
  • Flink-SQL join 優(yōu)化 -- MiniBatch + local-global

    Flink-SQL join 優(yōu)化 -- MiniBatch + local-global

    問(wèn)題1. 近期在開(kāi)發(fā)flink-sql期間,發(fā)現(xiàn)數(shù)據(jù)在啟動(dòng)后,任務(wù)總是進(jìn)行重試,運(yùn)行一段時(shí)間后,container heartbeat timeout,內(nèi)存溢出(GC overhead limit exceede) ,作業(yè)無(wú)法進(jìn)行正常工作 問(wèn)題2. 未出現(xiàn)container心跳超時(shí)的,作業(yè)運(yùn)行緩慢,超過(guò)一天 ,作業(yè)仍存在反壓情況 查看日志內(nèi)容發(fā)現(xiàn),出

    2024年02月06日
    瀏覽(29)
  • 【大數(shù)據(jù)】Flink SQL 語(yǔ)法篇(六):Temporal Join

    《 Flink SQL 語(yǔ)法篇 》系列,共包含以下 10 篇文章: Flink SQL 語(yǔ)法篇(一):CREATE Flink SQL 語(yǔ)法篇(二):WITH、SELECT WHERE、SELECT DISTINCT Flink SQL 語(yǔ)法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE) Flink SQL 語(yǔ)法篇(四):Group 聚合、Over 聚合 Flink SQL 語(yǔ)法篇(五):Regular Join、

    2024年03月15日
    瀏覽(67)
  • 11 flink-sql 中基于 mysql-cdc 連接 mysql-pxc 集群無(wú)法獲取增量數(shù)據(jù)問(wèn)題

    11 flink-sql 中基于 mysql-cdc 連接 mysql-pxc 集群無(wú)法獲取增量數(shù)據(jù)問(wèn)題

    問(wèn)題是來(lái)自于 群友, 2024.03.29, 也是花了一些時(shí)間 來(lái)排查這個(gè)問(wèn)題? 大致的問(wèn)題是用 mysql-cdc 連接了一個(gè) mysql-pxc 集群, 然后創(chuàng)建了一個(gè) test_user 表? 使用 \\\"select * from test_user\\\" 獲取數(shù)據(jù)表的數(shù)據(jù), 可以拿到 查詢(xún)時(shí)的快照, 但是 無(wú)法獲取到后續(xù)對(duì)于 test_user 表的增量操作的數(shù)據(jù), 比如

    2024年04月15日
    瀏覽(43)
  • Flink SQL Regular Join 、Interval Join、Temporal Join、Lookup Join 詳解

    Flink SQL Regular Join 、Interval Join、Temporal Join、Lookup Join 詳解

    Flink ?持?常多的數(shù)據(jù) Join ?式,主要包括以下三種: 動(dòng)態(tài)表(流)與動(dòng)態(tài)表(流)的 Join 動(dòng)態(tài)表(流)與外部維表(?如 Redis)的 Join 動(dòng)態(tài)表字段的列轉(zhuǎn)?(?種特殊的 Join) 細(xì)分 Flink SQL ?持的 Join: Regular Join:流與流的 Join,包括 Inner Equal Join、Outer Equal Join Interval Joi

    2024年02月04日
    瀏覽(23)
  • 【Flink SQL】Flink SQL 基礎(chǔ)概念(一):SQL & Table 運(yùn)行環(huán)境、基本概念及常用 API

    《 Flink SQL 基礎(chǔ)概念 》系列,共包含以下 5 篇文章: Flink SQL 基礎(chǔ)概念(一):SQL Table 運(yùn)行環(huán)境、基本概念及常用 API Flink SQL 基礎(chǔ)概念(二):數(shù)據(jù)類(lèi)型 Flink SQL 基礎(chǔ)概念(三):SQL 動(dòng)態(tài)表 連續(xù)查詢(xún) Flink SQL 基礎(chǔ)概念(四):SQL 的時(shí)間屬性 Flink SQL 基礎(chǔ)概念(五):SQL 時(shí)區(qū)問(wèn)

    2024年03月21日
    瀏覽(99)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(14)- 時(shí)態(tài)表的join(java版本)

    一、Flink 專(zhuān)欄 Flink 專(zhuān)欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月02日
    瀏覽(20)
  • Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

    Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

    ? ? ? ?今天一天爭(zhēng)取搞完最后這一部分,學(xué)完趕緊把 Kafka 和 Flume 學(xué)完,就要開(kāi)始做實(shí)時(shí)數(shù)倉(cāng)了。據(jù)說(shuō)是應(yīng)屆生得把實(shí)時(shí)數(shù)倉(cāng)搞個(gè) 80%~90% 才能差不多找個(gè)工作,太牛馬了。 ????????之前我們已經(jīng)用過(guò)了一些簡(jiǎn)單的內(nèi)置連接器,比如 \\\'datagen\\\' 、\\\'print\\\' ,其它的可以查看官網(wǎng):

    2024年01月24日
    瀏覽(52)
  • 《十堂課學(xué)習(xí) Flink》第五章:Table API 以及 Flink SQL 入門(mén)

    《十堂課學(xué)習(xí) Flink》第五章:Table API 以及 Flink SQL 入門(mén)

    第四章中介紹了 DataStream API 以及 DataSet API 的入門(mén)案例,本章開(kāi)始介紹 Table API 以及基于此的高層應(yīng)用 Flink SQL 的基礎(chǔ)。 Flink 提供了兩個(gè)關(guān)系A(chǔ)PI——Table API 和 SQL——用于統(tǒng)一的流和批處理。Table API 是一種針對(duì)Java、Scala和Python的語(yǔ)言集成查詢(xún)API,它允許以非常直觀的方式組合來(lái)

    2024年02月03日
    瀏覽(48)
  • Flink Table API 與 SQL 編程整理

    Flink Table API 與 SQL 編程整理

    Flink API 總共分為 4 層這里主要整理 Table API 的使用 Table API 是流處理和批處理通用的關(guān)系型 API , Table API 可以基于流輸入或者批輸入來(lái)運(yùn)行而不需要進(jìn)行任何修改。 Table API 是 SQL 語(yǔ)言的超集并專(zhuān)門(mén)為 Apache Flink 設(shè)計(jì)的, Table API 是 Scala 和 Java 語(yǔ)言集成式的 API 。與常規(guī) SQL 語(yǔ)言

    2024年02月04日
    瀏覽(26)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包