FlinkSQL 官網(wǎng)配置參數(shù):
https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/config.html
一、設(shè)置空閑狀態(tài)保留時間
Flink SQL 新手有可能犯的錯誤,其中之一就是忘記設(shè)置空閑狀態(tài)保留時間導(dǎo)致狀態(tài)爆炸。列舉兩個場景:
? FlinkSQL 的 regular join(inner、left、right),左右表的數(shù)據(jù)都會一直保存在狀態(tài)里,不會清理!要么設(shè)置 TTL,要么使用 FlinkSQL 的 interval join。
? 使用 Top-N 語法進(jìn)行去重,重復(fù)數(shù)據(jù)的出現(xiàn)一般都位于特定區(qū)間內(nèi) (例如一小時或一天內(nèi)),過了這段時間之后,對應(yīng)的狀態(tài)就不再需要了。
Flink SQL 可以指定空閑狀態(tài)(即未更新的狀態(tài))被保留的最小時間,當(dāng)狀態(tài)中某個 key 對應(yīng)的狀態(tài)未更新的時間達(dá)到閾值時,該條狀態(tài)被自動清理:
#API 指定
tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
#參數(shù)指定
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "1 h");
二、開啟 MiniBatch
MiniBatch 是微批處理,原理是緩存一定的數(shù)據(jù)后再觸發(fā)處理,以減少對 State 的訪問,從而提升吞吐并減少數(shù)據(jù)的輸出量。MiniBatch 主要依靠在每個 Task 上注冊的 Timer 線程來觸發(fā)微批,需要消耗一定的線程調(diào)度性能。
? MiniBatch 默認(rèn)關(guān)閉,開啟方式如下:
// 初始化 table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv 的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設(shè)置參數(shù):
// 開啟 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量輸出的間隔時間
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 設(shè)置每個批次最多緩存數(shù)據(jù)的條數(shù),可以設(shè)為 2 萬條
configuration.setString("table.exec.mini-batch.size", "20000");
? 適用場景
微批處理通過增加延遲換取高吞吐,如果有超低延遲的要求,不建議開啟微批處理。通常對于聚合的場景,微批處理可以顯著的提升系統(tǒng)性能,建議開啟。
? 注意事項:
1)目前,key-value 配置項僅被 Blink planner 支持。
2)1.12 之前的版本有 bug,開啟 miniBatch,不會清理過期狀態(tài),也就是說如果設(shè)置狀態(tài)的 TTL,無法清理過期狀態(tài)。1.12 版本才修復(fù)這個問題。
參考 ISSUE:https://issues.apache.org/jira/browse/FLINK-17096
三、開啟 LocalGlobal
原理概述
LocalGlobal 優(yōu)化將原先的 Aggregate 分成 Local+Global 兩階段聚合 , 即 MapReduce 模型中的 Combine+Reduce 處理模式。第一階段在上游節(jié)點(diǎn)本地攢一批數(shù)據(jù)進(jìn)行聚合 (localAgg) ,并輸出這次微批的增量值 (Accumulator)。第二階段再將收到的 Accumulator 合并 (Merge) ,得到最終的結(jié)果 (GlobalAgg) 。
LocalGlobal 本質(zhì)上能夠靠 LocalAgg 的聚合篩除部分傾斜數(shù)據(jù),從而降低 GlobalAgg的熱點(diǎn),提升性能。結(jié)合下圖理解 LocalGlobal 如何解決數(shù)據(jù)傾斜的問題。
由上圖可知:
- 未開啟 LocalGlobal 優(yōu)化,由于流中的數(shù)據(jù)傾斜,Key 為紅色的聚合算子實例需要處理更多的記錄,這就導(dǎo)致了熱點(diǎn)問題。
- 開啟 LocalGlobal 優(yōu)化后,先進(jìn)行本地聚合,再進(jìn)行全局聚合??纱蟠鬁p少 GlobalAgg 的熱點(diǎn),提高性能。
? LocalGlobal 開啟方式:
1)LocalGlobal 優(yōu)化需要先開啟 MiniBatch,依賴于 MiniBatch 的參數(shù)。
2)table.optimizer.agg-phase-strategy: 聚合策略。默認(rèn) AUTO,支持參數(shù) AUTO、TWO_PHASE(使用 LocalGlobal 兩階段聚合)、ONE_PHASE(僅使用 Global 一階段聚合)。
// 初始化 table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv 的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設(shè)置參數(shù):
// 開啟 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量輸出的間隔時間
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 設(shè)置每個批次最多緩存數(shù)據(jù)的條數(shù),可以設(shè)為 2 萬條
configuration.setString("table.exec.mini-batch.size", "20000");
// 開啟 LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
? 注意事項:
1)需要先開啟 MiniBatch
2)開啟 LocalGlobal 需要 UDAF 實現(xiàn) Merge 方法。
四、開啟 Split Distinct
LocalGlobal 優(yōu)化針對普通聚合 (例如 SUM、COUNT、MAX、MIN 和 AVG ) 有較好的效果,對于 DISTINCT 的聚合 (如 COUNT DISTINCT) 收效不明顯,因為 COUNT DISTINCT 在 Local 聚合時,對于 DISTINCT KEY 的去重率不高,導(dǎo)致在 Global 節(jié)點(diǎn)仍然存在熱點(diǎn)。
原理概述
之前,為了解決 COUNT DISTINCT 的熱點(diǎn)問題,通常需要手動改寫為兩層聚合 (增加按 Distinct Key 取模的打散層 )。
從 Flink1.9.0 版本開始,提供了 COUNT DISTINCT 自動打散功能 , 通過HASH_CODE(distinct_key) % BUCKET_NUM 打散,不需要手動重寫。Split Distinct 和
LocalGlobal 的原理對比參見下圖。
Distinct 舉例:
SELECT a, COUNT(DISTINCT b)
FROM T
GROUP BY a
手動打散舉例:
SELECT a, SUM(cnt)
FROM (
SELECT a, COUNT(DISTINCT b) as cnt
FROM T
GROUP BY a, MOD(HASH_CODE(b), 1024)
)
GROUP BY a
? Split Distinct 開啟方式
默認(rèn)不開啟,使用參數(shù)顯式開啟:
- table.optimizer.distinct-agg.split.enabled: true,默認(rèn) false。
- table.optimizer.distinct-agg.split.bucket-num: Split Distinct 優(yōu)化在第一層聚合中,被打散的 bucket 數(shù)目。默認(rèn) 1024。
// 初始化 table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv 的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設(shè)置參數(shù):(要結(jié)合 minibatch 一起使用)
// 開啟 Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一層打散的 bucket 數(shù)目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
? 注意事項:
(1)目前不能在包含 UDAF 的 Flink SQL 中使用 Split Distinct 優(yōu)化方法。
(2)拆分出來的兩個 GROUP 聚合還可參與 LocalGlobal 優(yōu)化。
(3)該功能在 Flink1.9.0 版本及以上版本才支持。
五、多維 DISTINCT 使用 Filter
原理概述
在某些場景下,可能需要從不同維度來統(tǒng)計 count(distinct)的結(jié)果(比如統(tǒng)計 uv、app 端的 uv、web 端的 uv),可能會使用如下 CASE WHEN 語法。
SELECT
a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT CASE WHEN c IN ('A', 'B') THEN b ELSE NULL END) AS AB_b,
COUNT(DISTINCT CASE WHEN c IN ('C', 'D') THEN b ELSE NULL END) AS CD_b
FROM T
GROUP BY a
在這種情況下,建議使用 FILTER 語法, 目前的 Flink SQL 優(yōu)化器可以識別同一唯一鍵上的不同 FILTER 參數(shù)。如,在上面的示例中,三個 COUNT DISTINCT 都作用在 b 列上。
此時,經(jīng)過優(yōu)化器識別后,F(xiàn)link 可以只使用一個共享狀態(tài)實例,而不是三個狀態(tài)實例,可減少狀態(tài)的大小和對狀態(tài)的訪問。
將上邊的 CASE WHEN 替換成 FILTER 后,如下所示:文章來源:http://www.zghlxwxcb.cn/news/detail-621400.html
SELECT
a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT b) FILTER (WHERE c IN ('A', 'B')) AS AB_b,
COUNT(DISTINCT b) FILTER (WHERE c IN ('C', 'D')) AS CD_b
FROM T
GROUP BY a
六、設(shè)置參數(shù)總結(jié)
總結(jié)以上的調(diào)優(yōu)參數(shù),代碼如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-621400.html
// 初始化 table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv 的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設(shè)置參數(shù):
// 開啟 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量輸出的間隔時間
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 設(shè)置每個批次最多緩存數(shù)據(jù)的條數(shù),可以設(shè)為 2 萬條
configuration.setString("table.exec.mini-batch.size", "20000");
// 開啟 LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
// 開啟 Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一層打散的 bucket 數(shù)目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
// 指定時區(qū)
configuration.setString("table.local-time-zone", "Asia/Shanghai");
到了這里,關(guān)于Flink 優(yōu)化(六) --------- FlinkSQL 調(diào)優(yōu)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!