国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink:流式 Join 類型 / 分類 盤點 (一)

這篇具有很好參考價值的文章主要介紹了Flink:流式 Join 類型 / 分類 盤點 (一)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Flink:流式 Join 類型 / 分類 盤點 (一),大數據專題,flink,join,類型,Temporal,Interval,Lookup,維表 博主歷時三年精心創(chuàng)作的《大數據平臺架構與原型實現:數據中臺建設實戰(zhàn)》一書現已由知名IT圖書品牌電子工業(yè)出版社博文視點出版發(fā)行,點擊《重磅推薦:建大數據平臺太難了!給我發(fā)個工程原型吧!》了解圖書詳情,京東購書鏈接:https://item.jd.com/12677623.html,掃描左側二維碼進入京東手機購書頁面。

在Flink中,實現流之間連接的操作可以分為兩類。第一類是基于原生State狀態(tài)存儲的Connect算子操作,這種方式可以實現低延遲的數據連接和轉換;第二類則是基于窗口的JOIN操作,這種方式又可以細分為window join和interval join兩種,通過對數據進行時間窗口和滑動窗口的劃分,實現不同粒度的數據關聯和計算。

1. Regular Join(常規(guī) Join):

從 SQL 上看,它只是一條普通的 SQL,和批處理的 SQL 無異:

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id

在為了維持常規(guī) Join 結果的準確性,不難判斷的是:Flink 需要將 Join 輸入的兩邊數據永遠保持在狀態(tài)中,所以,計算查詢結果所需的狀態(tài)可能會無限增長,對于長時間運行的大數據量的流來說,這種 Join 的代價是負擔不起的。當然,我們可以通過配置狀態(tài)的 TTL 來緩解這一問題,但這可能會導致結果不準確??偟脕碚f就是:在流上,常規(guī) Join 是可用的,但要慎用。

Regular Join 又會細分為 INNER Equi-JOIN 和 OUTER Equi-JOIN,具體參考文檔,此處不再贅述

2. Interval Join(時間區(qū)間 Join)

Regular Join 的條件太寬松,導致 Join 成本巨大,Interval Join 會添加一個時間范圍限制,讓流上僅處于指定時間區(qū)間內的數據參與 Join。

SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time

上述 SQL 是一個典型的 Interval Join,它試圖關聯訂單和它的發(fā)貨記錄以便獲得更多信息,如果業(yè)務上保證:下單之后 4 小時以內即可發(fā)貨,那上述 SQL 就能保證 order 和它的 shipment 可以關聯上。我們看到,從 SQL 上來說,Interval Join 區(qū)別于 Regular Join 的地方就是:它在 Regular Join 的基礎上又追加了時間范圍條件,這就大大地減輕了維持 Join 狀態(tài)數據的負擔。以下是一些典型的 Interval Join 條件:

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

3. Temporal Join (版本表 Join)

Temporal Join 并沒有對應一個精準的中文稱為,但別簡單地把它稱為 Temporal Table Join,因為它 Join 的是 Temporal Table (時態(tài)表)中的版本表,如果要精準描述的話,應該說是:版本表 Join。

要了解 Temporal Join 必須得先了解很么是 Temporal Table (時態(tài)表),對此,請參考 《關于 動態(tài)表 / 時態(tài)表 / 版本表 概念的澄清》一文,本文就不再解釋了。Temporal Join 就是 join 了一張版本表,那這到底有何不同呢?我們知道,既然版本表中一條記錄在不同時刻可能會有不同的值(版本),那這就會引申出一個問題:當我們 join 一張版本表時,應該 join 一條記錄的哪個版本呢?如果沒有特別配置,那么默認行為自然是應該 join 當前的最新值(版本),那有沒有需要 join 過去某個時間點的值(版本)的場景呢?有!并且有很多!官方文檔給出的就是一個典型的例子:對于一張訂單,我們總是應該參照下單時的匯率表去轉換為一種統(tǒng)一貨幣的總價,這就需要訂單表去 Join 匯率表在下單時刻的那個版本值

3.1. 基于事件時間的 Temporal Join

為了便于描述,我們按官方文檔的介紹,讓版本表作為被關聯表,用”右表“指代,把主動需要關聯的表稱為”左表“。既然 Temporal Join 關聯的右表是版本表,則關聯的一方,也就是版本表必然已經定義了事件時間屬性,如果關聯的另一張表,也就是”左表“,也定義了事件時間屬性(通過 Wartermark),且在 Join 時通過 FOR SYSTEM_TIME AS OF 關鍵字指定了左表上的這個事件時間屬性,那么,這就是一個”基于事件時間的 Temporal Join“,以下是一個示例:

-- 左表:orders, 注意 orders 表也定義了事件時間列:order_time
CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
) WITH (/* ... */);

-- 右表:currency_rates 是一張版本表,因為它定義了主鍵和事件時間
-- 這種表的數據通常來自 CDC 數據,也就是 數據庫的 changelog
-- 顯然,這里使用的是存放在kafka中的debezium-json格式的 changelog 數據 
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
    WATERMARK FOR update_time AS update_time - INTERVAL '15' SECOND,
    PRIMARY KEY(currency) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'value.format' = 'debezium-json',
   /* ... */
);

SELECT 
     order_id,
     price,
     orders.currency,
     conversion_rate,
     order_time
FROM orders
-- 關鍵字:FOR SYSTEM_TIME AS OF 用于指定左表中的一個時間類型的字段,Flink會根據這個時間和
-- 版本表上的指定的事件時間字段(即 currency_rates.update_time)進行比對,找到對應版本的記錄
-- 與之進行關聯。這里“對應版本”的邏輯應該是:在order_time這個時刻,currency_rates 所對應著的
-- 當時版本的記錄值
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;

在上面這個基于事件時間的 Temporal Join 中,最核心的一個邏輯在于:在按匯率計算一個 order 的總價時,到底是讀的 currency_rates 版本表中的哪一個版本值?我們假設: 一個 order 的 order_time 是 13:42,currency_rates 對應貨幣的匯率在 13:4013:45各有一個版本(假設每5分鐘更新一次),分別是 6.886.89,若當前時間 是13:46 分,則這個 order join 的哪一個匯率呢?顯然是 6.88。

從 基于事件時間的 Temporal Join 的行為特征上不難看出:對于正在實時 Join 的兩個流來說,如我們需要一張表總是 Join 其記錄所代表的事件在發(fā)生的當時另一張表上當時的數據,此時就應該使用 “基于事件時間的 Temporal Join”,簡單總結一下的話可以說是:當時對當時,這應該符合大多數流式的 Join 需求。

3.2. 基于處理時間的 Temporal Join

基于處理時間的 Temporal Join常常用在使用外部系統(tǒng)來豐富流的數據,典型的例子是:維表 Join。

從 基于處理時間的 Temporal Join 的行為特征上不難看出:對于正在實時 Join 的兩個流來說,如我們需要一張表總是 Join 另一張表上當前的最新數據,此時就應該使用 “基于處理時間的 Temporal Join”,簡單總結一下的話可以說是:當時對現在,維表 Join 通常是此類情形的典型代表(通常維表的變化是很緩慢的)文章來源地址http://www.zghlxwxcb.cn/news/detail-840794.html

到了這里,關于Flink:流式 Join 類型 / 分類 盤點 (一)的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包