国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink 優(yōu)化(六) --------- FlinkSQL 調(diào)優(yōu)

這篇具有很好參考價值的文章主要介紹了Flink 優(yōu)化(六) --------- FlinkSQL 調(diào)優(yōu)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。


FlinkSQL 官網(wǎng)配置參數(shù):

https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/config.html

一、設(shè)置空閑狀態(tài)保留時間

Flink SQL 新手有可能犯的錯誤,其中之一就是忘記設(shè)置空閑狀態(tài)保留時間導(dǎo)致狀態(tài)爆炸。列舉兩個場景:

? FlinkSQL 的 regular join(inner、left、right),左右表的數(shù)據(jù)都會一直保存在狀態(tài)里,不會清理!要么設(shè)置 TTL,要么使用 FlinkSQL 的 interval join。

? 使用 Top-N 語法進(jìn)行去重,重復(fù)數(shù)據(jù)的出現(xiàn)一般都位于特定區(qū)間內(nèi) (例如一小時或一天內(nèi)),過了這段時間之后,對應(yīng)的狀態(tài)就不再需要了。

Flink SQL 可以指定空閑狀態(tài)(即未更新的狀態(tài))被保留的最小時間,當(dāng)狀態(tài)中某個 key 對應(yīng)的狀態(tài)未更新的時間達(dá)到閾值時,該條狀態(tài)被自動清理:

#API 指定
tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
#參數(shù)指定
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "1 h");

二、開啟 MiniBatch

MiniBatch 是微批處理,原理是緩存一定的數(shù)據(jù)后再觸發(fā)處理,以減少對 State 的訪問,從而提升吞吐并減少數(shù)據(jù)的輸出量。MiniBatch 主要依靠在每個 Task 上注冊的 Timer 線程來觸發(fā)微批,需要消耗一定的線程調(diào)度性能。

? MiniBatch 默認(rèn)關(guān)閉,開啟方式如下:

// 初始化 table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv 的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設(shè)置參數(shù):
// 開啟 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量輸出的間隔時間
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 設(shè)置每個批次最多緩存數(shù)據(jù)的條數(shù),可以設(shè)為 2 萬條
configuration.setString("table.exec.mini-batch.size", "20000");

? 適用場景

微批處理通過增加延遲換取高吞吐,如果有超低延遲的要求,不建議開啟微批處理。通常對于聚合的場景,微批處理可以顯著的提升系統(tǒng)性能,建議開啟。

flink sql 參數(shù),Flink,flink,java,jvm
? 注意事項:

1)目前,key-value 配置項僅被 Blink planner 支持。
2)1.12 之前的版本有 bug,開啟 miniBatch,不會清理過期狀態(tài),也就是說如果設(shè)置狀態(tài)的 TTL,無法清理過期狀態(tài)。1.12 版本才修復(fù)這個問題。

參考 ISSUE:https://issues.apache.org/jira/browse/FLINK-17096

三、開啟 LocalGlobal

原理概述

LocalGlobal 優(yōu)化將原先的 Aggregate 分成 Local+Global 兩階段聚合 , 即 MapReduce 模型中的 Combine+Reduce 處理模式。第一階段在上游節(jié)點(diǎn)本地攢一批數(shù)據(jù)進(jìn)行聚合 (localAgg) ,并輸出這次微批的增量值 (Accumulator)。第二階段再將收到的 Accumulator 合并 (Merge) ,得到最終的結(jié)果 (GlobalAgg) 。

LocalGlobal 本質(zhì)上能夠靠 LocalAgg 的聚合篩除部分傾斜數(shù)據(jù),從而降低 GlobalAgg的熱點(diǎn),提升性能。結(jié)合下圖理解 LocalGlobal 如何解決數(shù)據(jù)傾斜的問題。

flink sql 參數(shù),Flink,flink,java,jvm
由上圖可知:

  • 未開啟 LocalGlobal 優(yōu)化,由于流中的數(shù)據(jù)傾斜,Key 為紅色的聚合算子實例需要處理更多的記錄,這就導(dǎo)致了熱點(diǎn)問題。
  • 開啟 LocalGlobal 優(yōu)化后,先進(jìn)行本地聚合,再進(jìn)行全局聚合??纱蟠鬁p少 GlobalAgg 的熱點(diǎn),提高性能。

? LocalGlobal 開啟方式:

1)LocalGlobal 優(yōu)化需要先開啟 MiniBatch,依賴于 MiniBatch 的參數(shù)。
2)table.optimizer.agg-phase-strategy: 聚合策略。默認(rèn) AUTO,支持參數(shù) AUTO、TWO_PHASE(使用 LocalGlobal 兩階段聚合)、ONE_PHASE(僅使用 Global 一階段聚合)。

// 初始化 table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv 的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設(shè)置參數(shù):
// 開啟 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");

// 批量輸出的間隔時間
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 設(shè)置每個批次最多緩存數(shù)據(jù)的條數(shù),可以設(shè)為 2 萬條
configuration.setString("table.exec.mini-batch.size", "20000");
// 開啟 LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

? 注意事項:
1)需要先開啟 MiniBatch
2)開啟 LocalGlobal 需要 UDAF 實現(xiàn) Merge 方法。

四、開啟 Split Distinct

LocalGlobal 優(yōu)化針對普通聚合 (例如 SUM、COUNT、MAX、MIN 和 AVG ) 有較好的效果,對于 DISTINCT 的聚合 (如 COUNT DISTINCT) 收效不明顯,因為 COUNT DISTINCT 在 Local 聚合時,對于 DISTINCT KEY 的去重率不高,導(dǎo)致在 Global 節(jié)點(diǎn)仍然存在熱點(diǎn)。

原理概述

之前,為了解決 COUNT DISTINCT 的熱點(diǎn)問題,通常需要手動改寫為兩層聚合 (增加按 Distinct Key 取模的打散層 )。

從 Flink1.9.0 版本開始,提供了 COUNT DISTINCT 自動打散功能 , 通過HASH_CODE(distinct_key) % BUCKET_NUM 打散,不需要手動重寫。Split Distinct 和
LocalGlobal 的原理對比參見下圖。

flink sql 參數(shù),Flink,flink,java,jvm
Distinct 舉例:

SELECT a, COUNT(DISTINCT b)
FROM T
GROUP BY a

手動打散舉例:

SELECT a, SUM(cnt)
FROM (
 SELECT a, COUNT(DISTINCT b) as cnt
 FROM T
 GROUP BY a, MOD(HASH_CODE(b), 1024)
)
GROUP BY a

? Split Distinct 開啟方式

默認(rèn)不開啟,使用參數(shù)顯式開啟:

  • table.optimizer.distinct-agg.split.enabled: true,默認(rèn) false。
  • table.optimizer.distinct-agg.split.bucket-num: Split Distinct 優(yōu)化在第一層聚合中,被打散的 bucket 數(shù)目。默認(rèn) 1024。
// 初始化 table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv 的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設(shè)置參數(shù):(要結(jié)合 minibatch 一起使用)
// 開啟 Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一層打散的 bucket 數(shù)目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");

? 注意事項:

(1)目前不能在包含 UDAF 的 Flink SQL 中使用 Split Distinct 優(yōu)化方法。
(2)拆分出來的兩個 GROUP 聚合還可參與 LocalGlobal 優(yōu)化。
(3)該功能在 Flink1.9.0 版本及以上版本才支持。

五、多維 DISTINCT 使用 Filter

原理概述

在某些場景下,可能需要從不同維度來統(tǒng)計 count(distinct)的結(jié)果(比如統(tǒng)計 uv、app 端的 uv、web 端的 uv),可能會使用如下 CASE WHEN 語法。

SELECT
a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT CASE WHEN c IN ('A', 'B') THEN b ELSE NULL END) AS AB_b,
COUNT(DISTINCT CASE WHEN c IN ('C', 'D') THEN b ELSE NULL END) AS CD_b
FROM T
GROUP BY a

在這種情況下,建議使用 FILTER 語法, 目前的 Flink SQL 優(yōu)化器可以識別同一唯一鍵上的不同 FILTER 參數(shù)。如,在上面的示例中,三個 COUNT DISTINCT 都作用在 b 列上。

此時,經(jīng)過優(yōu)化器識別后,F(xiàn)link 可以只使用一個共享狀態(tài)實例,而不是三個狀態(tài)實例,可減少狀態(tài)的大小和對狀態(tài)的訪問。

將上邊的 CASE WHEN 替換成 FILTER 后,如下所示:

SELECT
a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT b) FILTER (WHERE c IN ('A', 'B')) AS AB_b,
COUNT(DISTINCT b) FILTER (WHERE c IN ('C', 'D')) AS CD_b
FROM T
GROUP BY a

六、設(shè)置參數(shù)總結(jié)

總結(jié)以上的調(diào)優(yōu)參數(shù),代碼如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-621400.html

// 初始化 table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv 的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設(shè)置參數(shù):
// 開啟 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量輸出的間隔時間
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 設(shè)置每個批次最多緩存數(shù)據(jù)的條數(shù),可以設(shè)為 2 萬條
configuration.setString("table.exec.mini-batch.size", "20000");
// 開啟 LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
// 開啟 Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一層打散的 bucket 數(shù)目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
// 指定時區(qū)
configuration.setString("table.local-time-zone", "Asia/Shanghai");

到了這里,關(guān)于Flink 優(yōu)化(六) --------- FlinkSQL 調(diào)優(yōu)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【業(yè)務(wù)功能篇86】微服務(wù)-springcloud-系統(tǒng)性能壓力測試-jmeter-性能優(yōu)化-JVM參數(shù)調(diào)優(yōu)

    【業(yè)務(wù)功能篇86】微服務(wù)-springcloud-系統(tǒng)性能壓力測試-jmeter-性能優(yōu)化-JVM參數(shù)調(diào)優(yōu)

    ??壓力測試是給軟件不斷加壓,強(qiáng)制其在極限的情況下運(yùn)行,觀察它可以運(yùn)行到何種程度,從而發(fā)現(xiàn)性能缺陷,是通過搭建與實際環(huán)境相似的測試環(huán)境,通過測試程序在同一時間內(nèi)或某一段時間內(nèi),向系統(tǒng)發(fā)送預(yù)期數(shù)量的交易請求、測試系統(tǒng)在不同壓力情況下的效率狀況,

    2024年02月10日
    瀏覽(15)
  • Flink 學(xué)習(xí)十 FlinkSQL

    Flink 學(xué)習(xí)十 FlinkSQL

    flink sql 基于flink core ,使用sql 語義方便快捷的進(jìn)行結(jié)構(gòu)化數(shù)據(jù)處理的上層庫; 類似理解sparksql 和sparkcore , hive和mapreduce 1.1 工作流程 整體架構(gòu)和工作流程 數(shù)據(jù)流,綁定元數(shù)據(jù) schema ,注冊成catalog 中的表 table / view 用戶使用table Api / table sql 來表達(dá)計算邏輯 table-planner利用 apache calci

    2024年02月10日
    瀏覽(17)
  • 【業(yè)務(wù)功能篇86】微服務(wù)-springcloud-系統(tǒng)性能壓力測試-jmeter-性能優(yōu)化-JVM參數(shù)調(diào)優(yōu)-Nginx實現(xiàn)動靜分離

    【業(yè)務(wù)功能篇86】微服務(wù)-springcloud-系統(tǒng)性能壓力測試-jmeter-性能優(yōu)化-JVM參數(shù)調(diào)優(yōu)-Nginx實現(xiàn)動靜分離

    ??壓力測試是給軟件不斷加壓,強(qiáng)制其在極限的情況下運(yùn)行,觀察它可以運(yùn)行到何種程度,從而發(fā)現(xiàn)性能缺陷,是通過搭建與實際環(huán)境相似的測試環(huán)境,通過測試程序在同一時間內(nèi)或某一段時間內(nèi),向系統(tǒng)發(fā)送預(yù)期數(shù)量的交易請求、測試系統(tǒng)在不同壓力情況下的效率狀況,

    2024年02月07日
    瀏覽(30)
  • Flink:FlinkSql解析嵌套Json

    Flink:FlinkSql解析嵌套Json

    日常開發(fā)中都是用的簡便json格式,但是偶爾也會遇到嵌套json的時候,因此在用flinksql的時候就有點(diǎn)麻煩,下面用簡單例子簡單定義處理下 1,數(shù)據(jù)是網(wǎng)上摘抄,但包含里常用的大部分格式 { ?? ?\\\"afterColumns\\\": { ?? ??? ?\\\"created\\\": \\\"1589186680\\\", ?? ??? ?\\\"extra\\\": { ?? ??? ??? ?\\\"

    2023年04月09日
    瀏覽(24)
  • JVM調(diào)優(yōu)篇:探索Java性能優(yōu)化的必備種子面試題

    JVM調(diào)優(yōu)篇:探索Java性能優(yōu)化的必備種子面試題

    首先面試官會詢問你在進(jìn)行JVM調(diào)優(yōu)之前,是否了解JVM內(nèi)存模型的基礎(chǔ)知識。這是一個重要的入門問題。JVM內(nèi)存模型主要包括程序計數(shù)器、堆、本地方法棧、Java棧和方法區(qū)(1.7之后更改為元空間,并直接使用系統(tǒng)內(nèi)存)。 正常堆內(nèi)存又分為年輕代和老年代。在Java虛擬機(jī)中,年

    2024年02月15日
    瀏覽(27)
  • Flink實戰(zhàn)-(6)FlinkSQL實現(xiàn)CDC

    FlinkSQL說明 Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設(shè)計的一套符合標(biāo)準(zhǔn) SQL 語義的開發(fā)語言。 自 2015 年開始,阿里巴巴開始調(diào)研開源流計算引擎,最終決定基于 Flink 打造新一代計算引擎,針對 Flink 存在的不足進(jìn)行優(yōu)化和改進(jìn),并且在 2019 年初

    2023年04月26日
    瀏覽(25)
  • flink學(xué)習(xí)35:flinkSQL查詢mysql

    flink學(xué)習(xí)35:flinkSQL查詢mysql

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, tableConversions} object sqlQueryTable { ? def main(args: Array[String]): Unit = { ??? //create env ??? val env = StreamExecutionEnvironment.getExecutionEnv

    2023年04月23日
    瀏覽(19)
  • 【Flink實戰(zhàn)】Flink hint更靈活、更細(xì)粒度的設(shè)置Flink sql行為與簡化hive連接器參數(shù)設(shè)置

    SQL 提示(SQL Hints)是和 SQL 語句一起使用來改變執(zhí)行計劃的。本章介紹如何使用 SQL 提示來實現(xiàn)各種干預(yù)。 SQL 提示一般可以用于以下: 增強(qiáng) planner:沒有完美的 planner, SQL 提示讓用戶更好地控制執(zhí)行; 增加元數(shù)據(jù)(或者統(tǒng)計信息):如\\\"已掃描的表索引\\\"和\\\"一些混洗鍵(shu

    2024年04月25日
    瀏覽(24)
  • 【Flink系列七】TableAPI和FlinkSQL初體驗

    【Flink系列七】TableAPI和FlinkSQL初體驗

    Apache Flink 有兩種關(guān)系型 API 來做流批統(tǒng)一處理:Table API 和 SQL Table API 是用于 Scala 和 Java 語言的查詢API,它可以用一種非常直觀的方式來組合使用選取、過濾、join 等關(guān)系型算子。 ?Flink SQL 是基于?Apache Calcite?來實現(xiàn)的標(biāo)準(zhǔn) SQL。無論輸入是連續(xù)的(流式)還是有界的(批處理

    2024年02月03日
    瀏覽(22)
  • Java線上故障排查(CPU、磁盤、內(nèi)存、網(wǎng)絡(luò)、GC)+JVM性能調(diào)優(yōu)監(jiān)控工具+JVM常用參數(shù)和命令

    Java線上故障排查(CPU、磁盤、內(nèi)存、網(wǎng)絡(luò)、GC)+JVM性能調(diào)優(yōu)監(jiān)控工具+JVM常用參數(shù)和命令

    根據(jù)服務(wù)部署和項目架構(gòu),從如下幾個方面排查: (1)運(yùn)用服務(wù)器:排查內(nèi)存,cpu,請求數(shù)等; (2)文件圖片服務(wù)器:排查內(nèi)存,cpu,請求數(shù)等; (3)計時器服務(wù)器:排查內(nèi)存,cpu,請求數(shù)等; (4)redis服務(wù)器:排查內(nèi)存,cpu,連接數(shù)等; (5)db服務(wù)器:排查內(nèi)存,cpu,連接數(shù)

    2024年02月07日
    瀏覽(29)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包