国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

FlinkSQL【分組聚合-多維分析-性能調(diào)優(yōu)】應(yīng)用實(shí)例分析

這篇具有很好參考價(jià)值的文章主要介紹了FlinkSQL【分組聚合-多維分析-性能調(diào)優(yōu)】應(yīng)用實(shí)例分析。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

FlinkSQL處理如下實(shí)時(shí)數(shù)據(jù)需求:
實(shí)時(shí)聚合不同 類型/賬號(hào)/發(fā)布時(shí)間 的各個(gè)指標(biāo)數(shù)據(jù),比如:初始化/初始化后刪除/初始化后取消/推送/成功/失敗 的指標(biāo)數(shù)據(jù)。要求實(shí)時(shí)產(chǎn)出指標(biāo)數(shù)據(jù),數(shù)據(jù)源是mysql cdc binlog數(shù)據(jù)。

代碼實(shí)例

--SET table.exec.state.ttl=86400s; --24 hour,默認(rèn): 0 ms
SET table.exec.state.ttl=2592000s; --30 days,默認(rèn): 0 ms
--MiniBatch 聚合
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
--Local-Global 聚合
SET table.optimizer.agg-phase-strategy = TWO_PHASE;

CREATE TABLE kafka_table (
     mid bigint,
     db string,
     sch string,
     tab string,
     opt string,
     ts bigint,
     ddl string,
     err string,
     src map<string,string>,
     cur map<string,string>,
     cus map<string,string>,
     account_id AS IF(cur['account_id'] IS NOT NULL , cur['account_id'], src ['account_id']),
     publish_time AS IF(cur['publish_time'] IS NOT NULL , cur['publish_time'], src ['publish_time']),
     msg_status AS IF(cur['msg_status'] IS NOT NULL , cur['msg_status'], src ['msg_status']),
     send_type AS IF(cur['send_type'] IS NOT NULL , cur['send_type'], src ['send_type'])
     --event_time as cast(IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)
     --WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE     --SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 't1',
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset
   --  'properties.enable.auto.commit',= 'true' -- default:false, 如果為false,則在發(fā)生checkpoint時(shí)觸發(fā)offset提交
  'format' = 'json'
);



CREATE TABLE es_sink(
     send_type      STRING
    ,account_id     STRING
    ,publish_time   STRING
    ,grouping_id       INTEGER
    ,init           INTEGER
    ,init_cancel    INTEGER
    ,push          INTEGER
    ,succ           INTEGER
    ,fail           INTEGER
    ,init_delete    INTEGER
    ,update_time    STRING
    ,PRIMARY KEY (group_id,send_type,account_id,publish_time) NOT ENFORCED
)
with (
    'connector' = 'elasticsearch-6',
    'index' = 'es_sink',
    'document-type' = 'es_sink',
    'hosts' = 'http://xxx:9200',
    'format' = 'json',
    'filter.null-value'='true',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size' = '10mb'
);

CREATE view  tmp as
select
    send_type,
    account_id,
    publish_time,
    msg_status,
    case when UPPER(opt) = 'INSERT' and msg_status='0'  then 1 else 0 end AS init,
    case when UPPER(opt) = 'UPDATE' and send_type='1' and msg_status='4' then 1 else 0 end AS init_cancel,
    case when UPPER(opt) = 'UPDATE' and msg_status='3' then 1 else 0 end AS push,
    case when UPPER(opt) = 'UPDATE' and (msg_status='1' or msg_status='5') then 1 else 0 end AS succ,
    case when UPPER(opt) = 'UPDATE' and (msg_status='2' or msg_status='6') then 1 else 0 end AS fail,
    case when UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0' then  1 else 0 end AS init_delete,
    event_time,
    opt,
    ts
FROM kafka_table
where (UPPER(opt) = 'INSERT' and msg_status='0' )
or        (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4','5','6'))
or        (UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0');


--send_type=1          send_type=0
--初始化->0             初始化->0
--取消->4
--推送->3               推送->3
--成功->1               成功->5
--失敗->2               失敗->6

CREATE view  tmp_groupby as
select
 COALESCE(send_type,'N') AS send_type
,COALESCE(account_id,'N') AS account_id
,COALESCE(publish_time,'N') AS publish_time
,case when send_type is null and account_id is null and publish_time is null then 1
         when send_type is not null and account_id is null and publish_time is null then 2
         when send_type is not null and account_id is not null and publish_time is null then 3
         when send_type is not null and account_id is not null and publish_time is not null then 4
         end grouping_id
,sum(init) as init
,sum(init_cancel) as init_cancel
,sum(push) as push
,sum(succ) as succ
,sum(fail) as fail
,sum(init_delete) as init_delete
from tmp
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id),(send_type), ())
GROUP BY ROLLUP (send_type,account_id,publish_time); --等同于以上

INSERT INTO es_sink
select
     send_type
    ,account_id
    ,publish_time
    ,grouping_id
    ,init
    ,init_cancel
    ,push
    ,succ
    ,fail
    ,init_delete
    ,CAST(LOCALTIMESTAMP AS STRING) as update_time
from tmp_groupby

其他配置

  • flink集群參數(shù)
state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.ttl.compaction.filter.enabled: true
state.backend.rocksdb.localdir: /export/io_tmp_dirs/rocksdb
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
rest.flamegraph.enabled: true
pipeline.operator-chaining: false
taskmanager.memory.managed.fraction: 0.7
taskmanager.memory.network.min: 128 mb
taskmanager.memory.network.max: 128 mb
taskmanager.memory.framework.off-heap.size: 32mb
taskmanager.memory.task.off-heap.size: 32mb
taskmanager.memory.jvm-metaspace.size: 256mb
taskmanager.memory.jvm-overhead.fraction: 0.03
  • 檢查點(diǎn)配置
    FlinkSQL【分組聚合-多維分析-性能調(diào)優(yōu)】應(yīng)用實(shí)例分析,flink,大數(shù)據(jù),flink

  • job運(yùn)行資源
    管理節(jié)點(diǎn)(JM) 1 個(gè), 節(jié)點(diǎn)規(guī)格 1 核 4 GB內(nèi)存, 磁盤 10Gi
    運(yùn)行節(jié)點(diǎn)(TM)10 個(gè), 節(jié)點(diǎn)規(guī)格 1 核 4 GB內(nèi)存, 磁盤 80Gi
    單TM槽位數(shù)(Slot): 1
    默認(rèn)并行度:8

  • es mapping

#POST app_cust_syyy_private_domain_syyy_group_msg/app_cust_syyy_private_domain_syyy_group_msg/_mapping
{
    "app_cust_syyy_private_domain_syyy_group_msg": {
        "properties": {
            "send_type": {
                "type": "keyword",
                "ignore_above": 256
            },
            "account_id": {
                "type": "keyword"
            },
           "publish_time": {
           	"type": "keyword",
           	"fields": {
           		"text": {
           			"type": "keyword"
           		},
           		"date": {
           			"type": "date",
           			"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",
           			"ignore_malformed":"true" # 忽略錯(cuò)誤的各式
           		}
           	}
           },
            "grouping_id": {
                "type": "integer"
            },
            "init": {
                "type": "integer"
            },
            "init_cancel": {
                "type": "integer"
            },
            "query": {
                "type": "integer"
            },
            "succ": {
                "type": "integer"
            },
            "fail": {
                "type": "integer"
            },
            "init_delete": {
                "type": "integer"
            },
            "update_time": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            }
        }
    }
}

性能調(diào)優(yōu)

是否開啟【MiniBatch 聚合】和【Local-Global 聚合】對(duì)分組聚合場(chǎng)景影響巨大,尤其是在數(shù)據(jù)量大的場(chǎng)景下。

  • 如果未開啟,在分組聚合,數(shù)據(jù)更新狀態(tài)時(shí),每條數(shù)據(jù)都會(huì)觸發(fā)聚合運(yùn)算,進(jìn)而更新StateBackend (尤其是對(duì)于 RocksDB StateBackend,火焰圖上反映就是一直在update rocksdb),造成上游算子背壓特別大。此外,生產(chǎn)中非常常見的數(shù)據(jù)傾斜會(huì)使這個(gè)問題惡化,并且容易導(dǎo)致 job 發(fā)生反壓。
    FlinkSQL【分組聚合-多維分析-性能調(diào)優(yōu)】應(yīng)用實(shí)例分析,flink,大數(shù)據(jù),flink

  • 在開啟【MiniBatch 聚合】和【Local-Global 聚合】后,配置如下:

--MiniBatch 聚合
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
--Local-Global 聚合
SET table.optimizer.agg-phase-strategy = TWO_PHASE;

開啟配置好會(huì)在DAG上添加兩個(gè)環(huán)節(jié)MiniBatchAssignerLocalGroupAggregate
FlinkSQL【分組聚合-多維分析-性能調(diào)優(yōu)】應(yīng)用實(shí)例分析,flink,大數(shù)據(jù),flink

對(duì)結(jié)果的影響

開啟了【MiniBatch 聚合】和【Local-Global 聚合】后,一天處理不完的數(shù)據(jù),在10分鐘內(nèi)處理完畢

輸出結(jié)果

FlinkSQL【分組聚合-多維分析-性能調(diào)優(yōu)】應(yīng)用實(shí)例分析,flink,大數(shù)據(jù),flinkFlinkSQL【分組聚合-多維分析-性能調(diào)優(yōu)】應(yīng)用實(shí)例分析,flink,大數(shù)據(jù),flink

參考:
Group Aggregation
Streaming Aggregation Performance Tuning文章來源地址http://www.zghlxwxcb.cn/news/detail-796621.html

到了這里,關(guān)于FlinkSQL【分組聚合-多維分析-性能調(diào)優(yōu)】應(yīng)用實(shí)例分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【Redisson】分布式鎖源碼分析如何實(shí)現(xiàn)多個(gè)應(yīng)用實(shí)例互斥

    lockName就是保存到Redis里面的key 直接進(jìn)行構(gòu)建方法里面的 super(commandExecutor, name); org.redisson.connection.ServiceManager : private final String id = UUID.randomUUID().toString(); 這個(gè) id 就是 UUID : this.id = getServiceManager().getId(); 這個(gè)entryName通過UUID可以區(qū)分是哪個(gè)應(yīng)用實(shí)例 entryName+threadId可以區(qū)分哪個(gè)應(yīng)

    2024年02月11日
    瀏覽(20)
  • MySQL修煉手冊(cè)4:分組與聚合:GROUP BY與HAVING的應(yīng)用

    MySQL修煉手冊(cè)4:分組與聚合:GROUP BY與HAVING的應(yīng)用

    MySQL數(shù)據(jù)庫的強(qiáng)大功能為我們提供了豐富的數(shù)據(jù)處理工具,其中GROUP BY與HAVING的應(yīng)用使得數(shù)據(jù)的分組與聚合變得更加靈活和高效。在本篇博客中,我們將深入研究GROUP BY與HAVING的基礎(chǔ)知識(shí),并通過實(shí)際案例,展示它們?cè)跀?shù)據(jù)分析中的強(qiáng)大威力。 首先,為了更好地演示GROUP BY與

    2024年02月01日
    瀏覽(17)
  • Flink 優(yōu)化(六) --------- FlinkSQL 調(diào)優(yōu)

    Flink 優(yōu)化(六) --------- FlinkSQL 調(diào)優(yōu)

    FlinkSQL 官網(wǎng)配置參數(shù): https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/config.html Flink SQL 新手有可能犯的錯(cuò)誤,其中之一就是忘記設(shè)置空閑狀態(tài)保留時(shí)間導(dǎo)致狀態(tài)爆炸。列舉兩個(gè)場(chǎng)景: ? FlinkSQL 的 regular join(inner、left、right),左右表的數(shù)據(jù)都會(huì)一直保存在狀態(tài)里,不

    2024年02月14日
    瀏覽(20)
  • Pix4Dmapper空間三維模型的應(yīng)用實(shí)例:GIS選址分析

    Pix4Dmapper空間三維模型的應(yīng)用實(shí)例:GIS選址分析

    ??本文介紹基于 無人機(jī)影像建模完成后的結(jié)果 ,利用 ArcMap 軟件進(jìn)行 空間選址分析 ,從而實(shí)現(xiàn)空間三維模型應(yīng)用的方法。 目錄 1 空間分析目標(biāo)確立 2 基于基本約束條件的選址求解 2.1 坡度計(jì)算與提取 2.2 海拔提取 2.3 LAS數(shù)據(jù)初探 2.4 淹沒分析 2.5 區(qū)域相交 2.6 面積約束 3 基于

    2024年02月04日
    瀏覽(23)
  • FlinkSQL 時(shí)間語義、窗口和聚合

    FlinkSQL 時(shí)間語義、窗口和聚合

    目錄 一、時(shí)間語義 1.1 事件時(shí)間 1.1.1 在創(chuàng)建表的DDL中定義 1.1.2?在數(shù)據(jù)流轉(zhuǎn)換為表時(shí)定義 1.2?處理時(shí)間? 1.2.1 在創(chuàng)建表的DDL中定義 二、窗口? 2.1 分組窗口(老版本,已經(jīng)棄用,未來的版本中可能會(huì)刪除) 2.2?窗口表值函數(shù) (Windowing TVFs,新版本,從1.13起) 2.2.1 滾動(dòng)窗口(TUMBLE) 2.2.2?滑動(dòng)

    2024年02月16日
    瀏覽(32)
  • 【海量數(shù)據(jù)挖掘/數(shù)據(jù)分析】之 決策樹模型(決策樹模型、決策樹構(gòu)成、決策樹常用算法、決策樹性能要求、信息增益、信息增益計(jì)算公式、決策樹信息增益計(jì)算實(shí)例)

    【海量數(shù)據(jù)挖掘/數(shù)據(jù)分析】之 決策樹模型(決策樹模型、決策樹構(gòu)成、決策樹常用算法、決策樹性能要求、信息增益、信息增益計(jì)算公式、決策樹信息增益計(jì)算實(shí)例)

    目錄 【海量數(shù)據(jù)挖掘/數(shù)據(jù)分析】之 決策樹模型(決策樹模型、決策樹構(gòu)成、決策樹常用算法、決策樹性能要求、信息增益、信息增益計(jì)算公式、決策樹信息增益計(jì)算實(shí)例) 一、決策樹模型 1、常用算法 2、屬性劃分策略 3、其他算法 三、決策樹算法性能要求 四、 決策樹模型

    2024年02月13日
    瀏覽(21)
  • 【kafka性能測(cè)試腳本詳解、性能測(cè)試、性能分析與性能調(diào)優(yōu)】

    【kafka性能測(cè)試腳本詳解、性能測(cè)試、性能分析與性能調(diào)優(yōu)】

    Apache Kafka 官方提供了兩個(gè)客戶端性能測(cè)試腳本,它們的存放位置如下: 生產(chǎn)者性能測(cè)試腳本:$KAFKA_HOME/bin/kafka-producer-perf-test.sh 消費(fèi)者性能測(cè)試腳本:$KAFKA_HOME/bin/kafka-consumer-perf-test.sh kafka-producer-perf-test.sh 支持測(cè)試的性能指標(biāo)包括:吞吐量(throughput)、最大時(shí)延(max-latenc

    2024年02月04日
    瀏覽(24)
  • 目標(biāo)檢測(cè)算法之YOLOv5的應(yīng)用實(shí)例(零售業(yè)庫存管理、無人機(jī)航拍分析、工業(yè)自動(dòng)化領(lǐng)域應(yīng)用的詳解)

    在零售業(yè)庫存管理中,YOLOv5可以幫助自動(dòng)化商品識(shí)別和庫存盤點(diǎn)過程。通過使用深度學(xué)習(xí)模型來實(shí)時(shí)識(shí)別貨架上的商品,零售商可以更高效地管理庫存,減少人工盤點(diǎn)的時(shí)間和成本。以下是一個(gè)使用YOLOv5進(jìn)行商品識(shí)別的Python腳本示例:

    2024年02月20日
    瀏覽(22)
  • FlinkSQL聚合函數(shù)(Aggregate Function)詳解

    FlinkSQL聚合函數(shù)(Aggregate Function)詳解

    使用場(chǎng)景: 聚合函數(shù)即 UDAF,常?于進(jìn)多條數(shù)據(jù),出?條數(shù)據(jù)的場(chǎng)景。 上圖展示了?個(gè) 聚合函數(shù)的例? 以及 聚合函數(shù)包含的重要?法 。 案例場(chǎng)景: 關(guān)于飲料的表,有三個(gè)字段,分別是 id、name、price,表?有 5 ?數(shù)據(jù),找到所有飲料?最貴的飲料的價(jià)格,即執(zhí)??個(gè) max(

    2024年02月04日
    瀏覽(20)
  • redis性能測(cè)試及瓶頸分析調(diào)優(yōu)

    redis性能測(cè)試及瓶頸分析調(diào)優(yōu)

    一、簡(jiǎn)介 Redis(Remote Dictionary Server ),即遠(yuǎn)程字典服務(wù),是一個(gè)開源的使用ANSI C語言編寫、支持網(wǎng)絡(luò)、可基于內(nèi)存亦可持久化的日志型、Key-Value數(shù)據(jù)庫,并提供多種語言的API mysql與redis的區(qū)別: 類型上mysql是關(guān)系型數(shù)據(jù)庫,而redis是緩存數(shù)據(jù)庫; 作用上mysql用于持久化的存儲(chǔ)數(shù)

    2024年02月06日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包