常見問題
舉個(gè)例子
提交任務(wù)命令:
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定 yarn 隊(duì)列
-Djobmanager.memory.process.size=2048mb \ JM2~4G 足夠
-Dtaskmanager.memory.process.size=4096mb \ 單個(gè) TM2~8G 足夠
-Dtaskmanager.numberOfTaskSlots=2 \ 與容器核數(shù) 1core: 1slot 或 2core: 1slot
-c com.atguigu.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
并行度為5,每個(gè)任務(wù)占用槽數(shù)為2,則需要申請(qǐng)3個(gè)容器(2*3=6),JobManager需要一個(gè)容器,共需要4個(gè)容器。6個(gè)vcore+JobManager的1個(gè)vcore共7個(gè)vcore。而實(shí)際上是4個(gè)容器,4個(gè)vcore,這是為什么呢?
實(shí)際運(yùn)行效果:?
Yarn調(diào)度器設(shè)置
這跟yarn的調(diào)度器設(shè)置相關(guān),找到capacity-scheduler.xml
- default的方式只會(huì)參考內(nèi)存來申請(qǐng)容器,不會(huì)考慮cpu的需求。
- 調(diào)整為下面domian的方式,會(huì)綜合考慮內(nèi)存+CPU的需求來申請(qǐng)資源。
調(diào)整后運(yùn)行效果:
刷新一下
?指定容器核心數(shù)
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Dyarn.containers.vcores=3 \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
一個(gè)容器3個(gè)核,2個(gè)slot,不是1:1的關(guān)系也可以。
slot主要隔離內(nèi)存,不隔離cpu資源。
solt還有一個(gè)共享機(jī)制,一個(gè)slot可以同時(shí)跑多個(gè)task,一個(gè)solt可以不只使用一個(gè)線程。
通常讓系統(tǒng)自動(dòng)來設(shè)置,通常跟solt數(shù)1比1
并行度設(shè)置
- 配置文件:默認(rèn)并行度,默認(rèn)1
- 提交參數(shù):如-p 5
- 代碼env
- 代碼算子
優(yōu)先級(jí)下面的高。
全局并行度計(jì)算
????????開發(fā)完成后,先進(jìn)行壓測(cè)。任務(wù)并行度給 10 以下,測(cè)試單個(gè)并行度的處理上限。然后
總QPS / 單并行度的處理能力 = 并行度
QPS使用高峰期的。
????????開發(fā)完 Flink 作業(yè),壓測(cè)的方式很簡單,先在 kafka 中積壓數(shù)據(jù),之后開啟 Flink 任務(wù),
出現(xiàn)反壓,就是處理瓶頸。相當(dāng)于水庫先積水,一下子泄洪。
????????不能只從 QPS 去得出并行度,因?yàn)橛行┳侄紊佟⑦壿嫼唵蔚娜蝿?wù),單并行度一秒處理
幾萬條數(shù)據(jù)。 而有些數(shù)據(jù)字段多,處理邏輯復(fù)雜, 單并行度一秒只能處理 1000 條數(shù)據(jù)。
最好根據(jù)高峰期的 QPS 壓測(cè), 并行度*1.2 倍,富余一些資源。
查看單個(gè)任務(wù)的輸出量:numRecordsOutPerSecond,單并行度7000條/秒,生成環(huán)境高峰期的qps:30000/s,30000/7000 = 4.x,并行度5,再乘以個(gè)冗余1.2 = 6個(gè)
如果數(shù)據(jù)源是kafka,可以按kafka分區(qū)數(shù)來設(shè)置并行度。?
大部分情況下并行度10以下即可。
Source 端并行度的配置
????????數(shù)據(jù)源端是 Kafka, Source 的并行度設(shè)置為 Kafka 對(duì)應(yīng) Topic 的分區(qū)數(shù)。
????????如果已經(jīng)等于 Kafka 的分區(qū)數(shù), 消費(fèi)速度仍跟不上數(shù)據(jù)生產(chǎn)速度, 考慮下 Kafka 要擴(kuò)
大分區(qū), 同時(shí)調(diào)大并行度等于分區(qū)數(shù)。
????????Flink 的一個(gè)并行度可以處理一至多個(gè)分區(qū)的數(shù)據(jù),如果并行度多于 Kafka 的分區(qū)數(shù),
那么就會(huì)造成有的并行度空閑,浪費(fèi)資源。
Transform 端并行度的配置
Keyby 之前的算子
一般不會(huì)做太重的操作,都是比如 map、 filter、 flatmap 等處理較快的算子,并行度
可以和 source 保持一致。
Keyby 之后的算子
如果并發(fā)較大,建議設(shè)置并行度為 2 的整數(shù)次冪,例如: 128、 256、 512;
小并發(fā)任務(wù)的并行度不一定需要設(shè)置成 2 的整數(shù)次冪;
大并發(fā)任務(wù)如果沒有 KeyBy,并行度也無需設(shè)置為 2 的整數(shù)次冪;文章來源:http://www.zghlxwxcb.cn/news/detail-848021.html
Sink 端并行度的配置
????????Sink 端是數(shù)據(jù)流向下游的地方,可以根據(jù) Sink 端的數(shù)據(jù)量及下游的服務(wù)抗壓能力進(jìn)行評(píng)估。 如果 Sink 端是 Kafka,可以設(shè)為 Kafka 對(duì)應(yīng) Topic 的分區(qū)數(shù)。
????????Sink 端的數(shù)據(jù)量小, 比較常見的就是監(jiān)控告警的場(chǎng)景,并行度可以設(shè)置的小一些。
????????Source 端的數(shù)據(jù)量是最小的,拿到 Source 端流過來的數(shù)據(jù)后做了細(xì)粒度的拆分,數(shù)據(jù)量不斷的增加,到 Sink 端的數(shù)據(jù)量就非常大。那么在 Sink 到下游的存儲(chǔ)中間件的時(shí)候就需要提高并行度。
????????另外 Sink 端要與下游的服務(wù)進(jìn)行交互,并行度還得根據(jù)下游的服務(wù)抗壓能力來設(shè)置,如果在 Flink Sink 這端的數(shù)據(jù)量過大的話, 且 Sink 處并行度也設(shè)置的很大,但下游的服務(wù)完全撐不住這么大的并發(fā)寫入,可能會(huì)造成下游服務(wù)直接被寫掛,所以最終還是要在 Sink處的并行度做一定的權(quán)衡。文章來源地址http://www.zghlxwxcb.cn/news/detail-848021.html
到了這里,關(guān)于【Flink精講】Flink性能調(diào)優(yōu):CPU核數(shù)與并行度的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!