《Flink SQL 語法篇》系列,共包含以下 10 篇文章:
- Flink SQL 語法篇(一):CREATE
- Flink SQL 語法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
- Flink SQL 語法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
- Flink SQL 語法篇(四):Group 聚合、Over 聚合
- Flink SQL 語法篇(五):Regular Join、Interval Join
- Flink SQL 語法篇(六):Temporal Join
- Flink SQL 語法篇(七):Lookup Join、Array Expansion、Table Function
- Flink SQL 語法篇(八):集合、Order By、Limit、TopN
- Flink SQL 語法篇(九):Window TopN、Deduplication
- Flink SQL 語法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints
?? 如果您覺得這篇文章有用 ?? 的話,請給博主一個一鍵三連 ?????? 吧 (點贊 ??、關(guān)注 ??、收藏 ??)!??!您的支持 ?????? 將激勵 ?? 博主輸出更多優(yōu)質(zhì)內(nèi)容?。?!
1.Lookup Join(維表 Join)
Lookup Join 定義(支持 Batch / Streaming):Lookup Join 其實就是維表 Join,比如拿離線數(shù)倉來說,常常會有用戶畫像,設(shè)備畫像等數(shù)據(jù),而對應到實時數(shù)倉場景中,這種實時獲取外部緩存的 Join 就叫做維表 Join。
應用場景:小伙伴萌會問,我們既然已經(jīng)有了上面介紹的 Regular Join,Interval Join 等,為啥還需要一種 Lookup Join?因為上面說的這幾種 Join 都是 流與流之間的 Join,而 Lookup Join 是流與 Redis,MySQL,HBase 這種存儲介質(zhì)的 Join。Lookup 的意思就是實時查找,而實時的畫像數(shù)據(jù)一般都是存儲在 Redis,MySQL,HBase 中,這就是 Lookup Join 的由來。
實際案例:使用曝光用戶日志流(show_log
)關(guān)聯(lián)用戶畫像維表(user_profile
)關(guān)聯(lián)到用戶的維度之后,提供給下游計算分性別,年齡段的曝光用戶數(shù)使用。
- 曝光用戶日志流(
show_log
)數(shù)據(jù)(數(shù)據(jù)存儲在 Kafka 中)
log_id timestamp user_id
1 2021-11-01 00:01:03 a
2 2021-11-01 00:03:00 b
3 2021-11-01 00:05:00 c
4 2021-11-01 00:06:00 b
5 2021-11-01 00:07:00 c
- 用戶畫像維表(
user_profile
)數(shù)據(jù)(數(shù)據(jù)存儲在 Redis 中)
user_id(主鍵) age sex
a 12-18 男
b 18-24 女
c 18-24 男
注意:Redis 中的數(shù)據(jù)結(jié)構(gòu)存儲是按照 Key-Value 去存儲的。其中 Key 為
user_id
,Value 為age
,sex
的 JSON。
具體 SQL:
CREATE TABLE show_log (
log_id BIGINT,
`timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)),
user_id STRING,
proctime AS PROCTIME()
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE user_profile (
user_id STRING,
age STRING,
sex STRING
) WITH (
'connector' = 'redis',
'hostname' = '127.0.0.1',
'port' = '6379',
'format' = 'json',
'lookup.cache.max-rows' = '500',
'lookup.cache.ttl' = '3600',
'lookup.max-retries' = '1'
);
CREATE TABLE sink_table (
log_id BIGINT,
`timestamp` TIMESTAMP(3),
user_id STRING,
proctime TIMESTAMP(3),
age STRING,
sex STRING
) WITH (
'connector' = 'print'
);
-- lookup join 的 query 邏輯
INSERT INTO sink_table
SELECT
s.log_id as log_id,
s.`timestamp` as `timestamp`,
s.user_id as user_id,
s.proctime as proctime,
u.sex as sex,
u.age as age
FROM show_log AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
ON s.user_id = u.user_id
輸出數(shù)據(jù)如下:
log_id timestamp user_id age sex
1 2021-11-01 00:01:03 a 12-18 男
2 2021-11-01 00:03:00 b 18-24 女
3 2021-11-01 00:05:00 c 18-24 男
4 2021-11-01 00:06:00 b 18-24 女
5 2021-11-01 00:07:00 c 18-24 男
注意:實時的 Lookup 維表關(guān)聯(lián)能使用 處理時間 去做關(guān)聯(lián)。
- 同一條數(shù)據(jù)關(guān)聯(lián)到的維度數(shù)據(jù)可能不同:實時數(shù)倉中常用的實時維表都是在不斷的變化中的,當前流表數(shù)據(jù)關(guān)聯(lián)完維表數(shù)據(jù)后,如果同一個
key
的維表的數(shù)據(jù)發(fā)生了變化,已關(guān)聯(lián)到的維表的結(jié)果數(shù)據(jù)不會再同步更新。舉個例子,維表中user_id
為 1 1 1 的數(shù)據(jù)在 08 : 00 08:00 08:00 時age
由12-18
變?yōu)榱?18-24
,那么當我們的任務在 08 : 01 08:01 08:01failover
之后從 07 : 59 07:59 07:59 開始回溯數(shù)據(jù)時,原本應該關(guān)聯(lián)到12-18
的數(shù)據(jù)會關(guān)聯(lián)到18-24
的age
數(shù)據(jù)。這是有可能會影響數(shù)據(jù)質(zhì)量的。所以小伙伴萌在評估你們的實時任務時要考慮到這一點。 - 會發(fā)生實時的新建及更新的維表博主建議小伙伴萌應該建立起數(shù)據(jù)延遲的監(jiān)控機制,防止出現(xiàn)流表數(shù)據(jù)先于維表數(shù)據(jù)到達,導致關(guān)聯(lián)不到維表數(shù)據(jù)。
再說說維表常見的性能問題及優(yōu)化思路。
所有的維表性能問題都可以總結(jié)為:高 QPS 下訪問維表存儲引擎產(chǎn)生的任務背壓,數(shù)據(jù)產(chǎn)出延遲問題。
舉個例子:
- 在沒有使用維表的情況下:一條數(shù)據(jù)從輸入 Flink 任務到輸出 Flink 任務的時延假如為 0.1 ? m s 0.1\ ms 0.1?ms,那么并行度為 1 1 1 的任務的吞吐可以達到 1 ? q u e r y ? / ? 0.1 ? m s = 10000 ? q p s 1\ query\ /\ 0.1\ ms = 10000\ qps 1?query?/?0.1?ms=10000?qps。
- 在使用維表之后:每條數(shù)據(jù)訪問維表的外部存儲的時長為 2 ? m s 2\ ms 2?ms,那么一條數(shù)據(jù)從輸入 Flink 任務到輸出 Flink 任務的時延就會變成 2.1 ? m s 2.1\ ms 2.1?ms,那么同樣并行度為 1 的任務的吞吐只能達到 1 ? q u e r y ? / ? 2.1 ? m s = 476 ? q p s 1\ query\ /\ 2.1\ ms = 476\ qps 1?query?/?2.1?ms=476?qps。兩者的吞吐量相差 21 21 21 倍。
這就是為什么維表 Join 的算子會產(chǎn)生背壓,任務產(chǎn)出會延遲。
那么當然,解決方案也是有很多的。拋開 Flink SQL 想一下,如果我們使用 DataStream API,甚至是在做一個后端應用,需要訪問外部存儲時,常用的優(yōu)化方案有哪些?這里列舉一下:
- 1?? 按照 Redis 維表的 key 分桶 + local cache:通過按照
key
分桶的方式,讓大多數(shù)據(jù)的維表關(guān)聯(lián)的數(shù)據(jù)訪問走之前訪問過的local cache
即可。這樣就可以把訪問外部存儲 2.1 ? m s 2.1\ ms 2.1?ms 處理一個 Query 變?yōu)樵L問內(nèi)存的 0.1 ? m s 0.1\ ms 0.1?ms 處理一個 Query 的時長。 - 2?? 異步訪問外存:DataStream API 有異步算子,可以利用線程池去同時多次請求維表外部存儲。這樣就可以把 2.1 ? m s 2.1\ ms 2.1?ms 處理 1 1 1 個 Query 變?yōu)? 2.1 ? m s 2.1\ ms 2.1?ms 處理 10 10 10 個 Query。吞吐可變優(yōu)化到 10 ? q u e r y ? / ? 2.1 ? m s = 4761 ? q p s 10\ query\ /\ 2.1\ ms = 4761\ qps 10?query?/?2.1?ms=4761?qps。
- 3?? 批量訪問外存:除了異步訪問之外,我們還可以批量訪問外部存儲。舉一個例子:在訪問 Redis 維表的
1
1
1 Query 占用
2.1
?
m
s
2.1\ ms
2.1?ms 時長中,其中可能有
2
?
m
s
2\ ms
2?ms 都是在網(wǎng)絡(luò)請求上面的耗時 ,其中只有
0.1
?
m
s
0.1\ ms
0.1?ms 是 Redis Server 處理請求的時長。那么我們就可以使用 Redis 提供的
pipeline
能力,在客戶端(也就是 Flink 任務lookup join
算子中),攢一批數(shù)據(jù),使用pipeline
去同時訪問 Redis Sever。這樣就可以把 2.1 ? m s 2.1\ ms 2.1?ms 處理 1 1 1 個 Query 變?yōu)? 7 ? m s = 2 ? m s + 50 ? 0.1 ? m s 7\ ms=2\ ms + 50 * 0.1\ ms 7?ms=2?ms+50?0.1?ms 處理 50 50 50 個 Query。吞吐可變?yōu)? 50 ? q u e r y ? / ? 7 ? m s = 7143 ? q p s 50\ query\ /\ 7\ ms = 7143\ qps 50?query?/?7?ms=7143?qps。
博主認為上述優(yōu)化效果中,最好用的是 1?? + 3??,2?? 相比 3?? 還是一條一條發(fā)請求,性能會差一些。
既然 DataStream 可以這樣做,F(xiàn)link SQL 必須必的也可以借鑒上面的這些優(yōu)化方案。具體怎么操作呢?看下文騷操作
- 1?? 按照 Redis 維表的 key 分桶 + local cache:SQL 中如果要做分桶,得先做
group by
,但是如果做了group by
的聚合,就只能在udaf
(user defined aggregation function
)中做訪問 Redis 處理,并且udaf
產(chǎn)出的結(jié)果只能是一條,所以這種實現(xiàn)起來非常復雜。我們選擇不做keyby
分桶。但是我們可以直接使用local cache
去做本地緩存,雖然【直接緩存】的效果比【先按照key
分桶再做緩存】的效果差,但是也能一定程度上減少訪問 Redis 壓力。在博主實現(xiàn)的 Redis Connector 中,內(nèi)置了local cache
的實現(xiàn)。 - 2?? 異步訪問外存:目前博主實現(xiàn)的 Redis Connector 不支持異步訪問,但是官方實現(xiàn)的 HBase Connector 支持這個功能,參考下面鏈接文章的,點開之后搜索
lookup.async
。https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/ - 3?? 批量訪問外存:這玩意官方必然沒有實現(xiàn)啊,但是,但是,但是,經(jīng)過博主周末兩天的瘋狂 debug,改了改源碼,搞定了基于 Redis 的批量訪問外存優(yōu)化的功能。
2.Array Expansion(數(shù)組列轉(zhuǎn)行)
應用場景(支持 Batch / Streaming):將表中 ARRAY 類型字段(列)拍平,轉(zhuǎn)為多行。
實際案例:比如某些場景下,日志是合并、攢批上報的,就可以使用這種方式將一個 Array 轉(zhuǎn)為多行。
CREATE TABLE show_log_table (
log_id BIGINT,
show_params ARRAY<STRING>
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
log_id BIGINT,
show_param STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
log_id,
t.show_param as show_param
FROM show_log_table
-- array 炸開語法
CROSS JOIN UNNEST(show_params) AS t (show_param)
show_log_table
原始數(shù)據(jù):
+I[7, [a, b, c]]
+I[5, [d, e, f]]
輸出結(jié)果如下所示:
-- +I[7, [a, b, c]] 一行轉(zhuǎn)為 3 行
+I[7, a]
+I[7, b]
+I[7, b]
-- +I[5, [d, e, f]] 一行轉(zhuǎn)為 3 行
+I[5, d]
+I[5, e]
+I[5, f]
3.Table Function(自定義列轉(zhuǎn)行)
應用場景(支持 Batch / Streaming):這個其實和 Array Expansion 功能類似,但是 Table Function 本質(zhì)上是個 UDTF 函數(shù),和離線 Hive SQL 一樣,我們可以自定義 UDTF 去決定列轉(zhuǎn)行的邏輯。
Table Function 使用分類:文章來源:http://www.zghlxwxcb.cn/news/detail-857337.html
-
Inner Join Table Function
:如果 UDTF 返回結(jié)果為空,則相當于 1 1 1 行轉(zhuǎn)為 0 0 0 行,這行數(shù)據(jù)直接被丟棄。 -
Left Join Table Function
:如果 UDTF 返回結(jié)果為空,折行數(shù)據(jù)不會被丟棄,只會在結(jié)果中填充null
值。
public class TableFunctionInnerJoin_Test {
public static void main(String[] args) throws Exception {
FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
String sql = "CREATE FUNCTION user_profile_table_func AS 'flink.examples.sql._07.query._06_joins._06_table_function"
+ "._01_inner_join.TableFunctionInnerJoin_Test$UserProfileTableFunction';\n"
+ "\n"
+ "CREATE TABLE source_table (\n"
+ " user_id BIGINT NOT NULL,\n"
+ " name STRING,\n"
+ " row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n"
+ " WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n"
+ ") WITH (\n"
+ " 'connector' = 'datagen',\n"
+ " 'rows-per-second' = '10',\n"
+ " 'fields.name.length' = '1',\n"
+ " 'fields.user_id.min' = '1',\n"
+ " 'fields.user_id.max' = '10'\n"
+ ");\n"
+ "\n"
+ "CREATE TABLE sink_table (\n"
+ " user_id BIGINT,\n"
+ " name STRING,\n"
+ " age INT,\n"
+ " row_time TIMESTAMP(3)\n"
+ ") WITH (\n"
+ " 'connector' = 'print'\n"
+ ");\n"
+ "\n"
+ "INSERT INTO sink_table\n"
+ "SELECT user_id,\n"
+ " name,\n"
+ " age,\n"
+ " row_time\n"
+ "FROM source_table,\n"
// Table Function Join 語法對應 LATERAL TABLE
+ "LATERAL TABLE(user_profile_table_func(user_id)) t(age)";
Arrays.stream(sql.split(";"))
.forEach(flinkEnv.streamTEnv()::executeSql);
}
public static class UserProfileTableFunction extends TableFunction<Integer> {
public void eval(long userId) {
// 自定義輸出邏輯
if (userId <= 5) {
// 一行轉(zhuǎn) 1 行
collect(1);
} else {
// 一行轉(zhuǎn) 3 行
collect(1);
collect(2);
collect(3);
}
}
}
}
執(zhí)行結(jié)果如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-857337.html
-- userId <= 5,則只有 1 行結(jié)果
+I[3, 7, 1, 2021-05-01T18:23:42.560]
-- userId > 5,則有行 3 結(jié)果
+I[8, e, 1, 2021-05-01T18:23:42.560]
+I[8, e, 2, 2021-05-01T18:23:42.560]
+I[8, e, 3, 2021-05-01T18:23:42.560]
-- userId <= 5,則只有 1 行結(jié)果
+I[4, 9, 1, 2021-05-01T18:23:42.561]
-- userId > 5,則有行 3 結(jié)果
+I[8, c, 1, 2021-05-01T18:23:42.561]
+I[8, c, 2, 2021-05-01T18:23:42.561]
+I[8, c, 3, 2021-05-01T18:23:42.561]
到了這里,關(guān)于【大數(shù)據(jù)】Flink SQL 語法篇(七):Lookup Join、Array Expansion、Table Function的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!