FlinkSQL處理如下實(shí)時(shí)數(shù)據(jù)需求:
實(shí)時(shí)聚合不同 類型/賬號(hào)/發(fā)布時(shí)間 的各個(gè)指標(biāo)數(shù)據(jù),比如:初始化/初始化后刪除/初始化后取消/推送/成功/失敗 的指標(biāo)數(shù)據(jù)。要求實(shí)時(shí)產(chǎn)出指標(biāo)數(shù)據(jù),數(shù)據(jù)源是mysql cdc binlog數(shù)據(jù)。
代碼實(shí)例
-- State TTL: how long idle aggregation state is kept before cleanup.
-- Default is 0 ms (keep forever), which makes the ROLLUP aggregation state grow unboundedly.
--SET table.exec.state.ttl=86400s; --24 hours (alternative, shorter retention)
SET table.exec.state.ttl=2592000s; --30 days (default: 0 ms)
--MiniBatch aggregation: buffer incoming records and fire the aggregate once per
--micro-batch, cutting per-record state-backend (RocksDB) reads/writes.
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s; -- flush a batch at least once per second
SET table.exec.mini-batch.size = 10000;       -- ...or once 10000 records are buffered
--Local-Global aggregation: pre-aggregate before the keyed shuffle to mitigate data skew.
SET table.optimizer.agg-phase-strategy = TWO_PHASE;
-- Source: MySQL CDC binlog events delivered through Kafka as JSON.
-- src holds the row's pre-image and cur the post-image; business fields are exposed
-- as computed columns that prefer the post-image and fall back to the pre-image
-- (cur is absent for DELETE events).
CREATE TABLE kafka_table (
    mid bigint,
    db string,
    sch string,
    tab string,
    opt string,              -- binlog operation: INSERT / UPDATE / DELETE
    ts bigint,               -- binlog event timestamp
    ddl string,
    err string,
    src map<string,string>,  -- row image before the change
    cur map<string,string>,  -- row image after the change
    cus map<string,string>,
    -- COALESCE replaces the vendor-style IF(x IS NOT NULL, x, y) with the ANSI idiom.
    account_id AS COALESCE(cur['account_id'], src['account_id']),
    publish_time AS COALESCE(cur['publish_time'], src['publish_time']),
    msg_status AS COALESCE(cur['msg_status'], src['msg_status']),
    send_type AS COALESCE(cur['send_type'], src['send_type'])
    -- Optional event-time column + watermark if event-time processing is needed later:
    --event_time as cast(COALESCE(cur['update_time'], src['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)
    --WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE --SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 't1',
    'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
    'properties.group.id' = 'g1',
    'scan.startup.mode' = 'earliest-offset', -- group-offsets/earliest-offset/latest-offset
    -- 'properties.enable.auto.commit' = 'true' -- default: false; when false, offsets are committed on checkpoint
    'format' = 'json'
);
-- Sink: aggregated counters upserted into Elasticsearch 6.
-- The PRIMARY KEY determines the ES document id, so each (grouping level, key)
-- combination maps to a single, continuously-updated document.
CREATE TABLE es_sink(
     send_type STRING
    ,account_id STRING
    ,publish_time STRING
    ,grouping_id INTEGER  -- ROLLUP level: 1=grand total, 2=send_type, 3=+account_id, 4=+publish_time
    ,init INTEGER
    ,init_cancel INTEGER
    ,push INTEGER
    ,succ INTEGER
    ,fail INTEGER
    ,init_delete INTEGER
    ,update_time STRING
    -- FIX: was "group_id", which is not a declared column; the column is "grouping_id".
    ,PRIMARY KEY (grouping_id,send_type,account_id,publish_time) NOT ENFORCED
)
with (
    'connector' = 'elasticsearch-6',
    'index' = 'es_sink',
    'document-type' = 'es_sink',
    'hosts' = 'http://xxx:9200',
    'format' = 'json',
    'filter.null-value'='true', -- NOTE(review): not an open-source connector option; looks vendor-specific — verify
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size' = '10mb'
);
-- One row per relevant binlog event, with a 0/1 indicator column per metric.
-- FIX: dropped the reference to event_time, which is not defined on kafka_table
-- (the computed column there is commented out) and is unused downstream; keeping
-- it made the job fail at planning with "column not found".
CREATE view tmp as
select
    send_type,
    account_id,
    publish_time,
    msg_status,
    -- msg_status semantics depend on send_type:
    --   send_type=1: 0=init, 4=cancel, 3=push, 1=success, 2=failure
    --   send_type=0: 0=init,           3=push, 5=success, 6=failure
    case when UPPER(opt) = 'INSERT' and msg_status='0' then 1 else 0 end AS init,
    case when UPPER(opt) = 'UPDATE' and send_type='1' and msg_status='4' then 1 else 0 end AS init_cancel,
    case when UPPER(opt) = 'UPDATE' and msg_status='3' then 1 else 0 end AS push,
    case when UPPER(opt) = 'UPDATE' and (msg_status='1' or msg_status='5') then 1 else 0 end AS succ,
    case when UPPER(opt) = 'UPDATE' and (msg_status='2' or msg_status='6') then 1 else 0 end AS fail,
    case when UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0' then 1 else 0 end AS init_delete,
    opt,
    ts
FROM kafka_table
-- Keep only events that can contribute to at least one metric.
where (UPPER(opt) = 'INSERT' and msg_status='0' )
    or (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4','5','6'))
    or (UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0');
--send_type=1 send_type=0
--初始化->0 初始化->0
--取消->4
--推送->3 推送->3
--成功->1 成功->5
--失敗->2 失敗->6
-- Multi-dimensional aggregation via ROLLUP, producing four grouping levels:
-- (), (send_type), (send_type,account_id), (send_type,account_id,publish_time).
CREATE view tmp_groupby as
select
-- Rolled-up (NULL) key columns are replaced with the sentinel 'N' so they can
-- take part in the sink's primary key.
COALESCE(send_type,'N') AS send_type
,COALESCE(account_id,'N') AS account_id
,COALESCE(publish_time,'N') AS publish_time
-- Derive the grouping level from which key columns ROLLUP nulled out.
-- NOTE(review): this assumes the source columns are never naturally NULL; a
-- natural NULL would be misclassified — confirm upstream guarantees.
,case when send_type is null and account_id is null and publish_time is null then 1
when send_type is not null and account_id is null and publish_time is null then 2
when send_type is not null and account_id is not null and publish_time is null then 3
when send_type is not null and account_id is not null and publish_time is not null then 4
end grouping_id
,sum(init) as init
,sum(init_cancel) as init_cancel
,sum(push) as push
,sum(succ) as succ
,sum(fail) as fail
,sum(init_delete) as init_delete
from tmp
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id),(send_type), ())
GROUP BY ROLLUP (send_type,account_id,publish_time); -- equivalent to the GROUPING SETS form above
-- Continuously upsert the rolled-up counters into Elasticsearch; update_time
-- records the wall-clock time of the latest refresh of each document.
-- FIX: added the statement-terminating semicolon, which was missing.
INSERT INTO es_sink
select
    send_type
    ,account_id
    ,publish_time
    ,grouping_id
    ,init
    ,init_cancel
    ,push
    ,succ
    ,fail
    ,init_delete
    ,CAST(LOCALTIMESTAMP AS STRING) as update_time
from tmp_groupby;
其他配置
- flink集群參數(shù)
state.backend: rocksdb            # keyed state kept in RocksDB (scales beyond heap)
state.backend.incremental: true   # incremental checkpoints: only upload changed SST files
# NOTE(review): this option was removed in newer Flink releases (the TTL compaction
# filter is always enabled there) — verify against the cluster's Flink version.
state.backend.rocksdb.ttl.compaction.filter.enabled: true
state.backend.rocksdb.localdir: /export/io_tmp_dirs/rocksdb
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
rest.flamegraph.enabled: true     # flame graphs in the web UI, used for the tuning analysis below
pipeline.operator-chaining: false # unchain operators so per-operator backpressure/metrics are visible
taskmanager.memory.managed.fraction: 0.7  # enlarged managed memory; presumably for RocksDB block cache — confirm
taskmanager.memory.network.min: 128 mb
taskmanager.memory.network.max: 128 mb
taskmanager.memory.framework.off-heap.size: 32mb
taskmanager.memory.task.off-heap.size: 32mb
taskmanager.memory.jvm-metaspace.size: 256mb
taskmanager.memory.jvm-overhead.fraction: 0.03
-
檢查點(diǎn)配置
-
job運(yùn)行資源
管理節(jié)點(diǎn)(JM) 1 個(gè), 節(jié)點(diǎn)規(guī)格 1 核 4 GB內(nèi)存, 磁盤 10Gi
運(yùn)行節(jié)點(diǎn)(TM)10 個(gè), 節(jié)點(diǎn)規(guī)格 1 核 4 GB內(nèi)存, 磁盤 80Gi
單TM槽位數(shù)(Slot): 1
默認(rèn)并行度:8 -
es mapping
#POST app_cust_syyy_private_domain_syyy_group_msg/app_cust_syyy_private_domain_syyy_group_msg/_mapping
{
"app_cust_syyy_private_domain_syyy_group_msg": {
"properties": {
"send_type": {
"type": "keyword",
"ignore_above": 256
},
"account_id": {
"type": "keyword"
},
"publish_time": {
"type": "keyword",
"fields": {
"text": {
"type": "keyword"
},
"date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",
"ignore_malformed":"true" # 忽略錯(cuò)誤的各式
}
}
},
"grouping_id": {
"type": "integer"
},
"init": {
"type": "integer"
},
"init_cancel": {
"type": "integer"
},
"query": {
"type": "integer"
},
"succ": {
"type": "integer"
},
"fail": {
"type": "integer"
},
"init_delete": {
"type": "integer"
},
"update_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
}
}
}
}
性能調(diào)優(yōu)
是否開啟【MiniBatch 聚合】和【Local-Global 聚合】對(duì)分組聚合場(chǎng)景影響巨大,尤其是在數(shù)據(jù)量大的場(chǎng)景下。
-
如果未開啟,在分組聚合,數(shù)據(jù)更新狀態(tài)時(shí),每條數(shù)據(jù)都會(huì)觸發(fā)聚合運(yùn)算,進(jìn)而更新StateBackend (尤其是對(duì)于 RocksDB StateBackend,火焰圖上反映就是一直在update rocksdb),造成上游算子背壓特別大。此外,生產(chǎn)中非常常見的數(shù)據(jù)傾斜會(huì)使這個(gè)問題惡化,并且容易導(dǎo)致 job 發(fā)生反壓。
-
在開啟【MiniBatch 聚合】和【Local-Global 聚合】后,配置如下:
--MiniBatch 聚合
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
--Local-Global 聚合
SET table.optimizer.agg-phase-strategy = TWO_PHASE;
開啟配置好會(huì)在DAG上添加兩個(gè)環(huán)節(jié)MiniBatchAssigner和LocalGroupAggregate
對(duì)結(jié)果的影響
開啟了【MiniBatch 聚合】和【Local-Global 聚合】后,一天處理不完的數(shù)據(jù),在10分鐘內(nèi)處理完畢
輸出結(jié)果
文章來源:http://www.zghlxwxcb.cn/news/detail-796621.html
參考:
Group Aggregation
Streaming Aggregation Performance Tuning文章來源地址http://www.zghlxwxcb.cn/news/detail-796621.html
到了這里,關(guān)于FlinkSQL【分組聚合-多維分析-性能調(diào)優(yōu)】應(yīng)用實(shí)例分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!