国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink性能優(yōu)化小結(jié)

這篇具有很好參考價(jià)值的文章主要介紹了Flink性能優(yōu)化小結(jié)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

  • jvm內(nèi)存優(yōu)化

  • 內(nèi)存優(yōu)化

  • netty優(yōu)化

  • akka優(yōu)化

  • 并行度優(yōu)化

  • 對(duì)象重用

  • checkpoint優(yōu)化

  • 網(wǎng)絡(luò)內(nèi)存調(diào)優(yōu)

  • 狀態(tài)優(yōu)化

  • flink數(shù)據(jù)傾斜優(yōu)化

  • flink背壓

jvm內(nèi)存參數(shù)調(diào)優(yōu)

Flink是依賴內(nèi)存計(jì)算,計(jì)算過(guò)程中內(nèi)存不夠?qū)link的執(zhí)行效率影響很大。可以通過(guò)監(jiān)控GC(Garbage Collection),評(píng)估內(nèi)存使用及剩余情況來(lái)判斷內(nèi)存是否變成性能瓶頸,并根據(jù)情況優(yōu)化。
監(jiān)控節(jié)點(diǎn)進(jìn)程的YARN的Container GC日志,如果頻繁出現(xiàn)Full GC,需要優(yōu)化GC。

GC的配置:在客戶端的"conf/flink-conf.yaml"配置文件中,在“env.java.opts”配置項(xiàng)中添加參數(shù):

-Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=20M

此處默認(rèn)已經(jīng)添加GC日志。

調(diào)整老年代和新生代的比值。在客戶端的“conf/flink-conf.yaml”配置文件中,在“env.java.opts”配置項(xiàng)中添加參數(shù):“-XX:NewRatio”。如“ -XX:NewRatio=2”,則表示老年代與新生代的比值為2:1,新生代占整個(gè)堆空間的1/3,老年代占2/3。

Flink性能優(yōu)化小結(jié),flink,性能優(yōu)化,大數(shù)據(jù)

可以通過(guò)設(shè)置?jobmanager.memory.enable-jvm-direct-memory-limit?對(duì) JobManager 進(jìn)程的?JVM 直接內(nèi)存進(jìn)行限制

Flink內(nèi)存調(diào)優(yōu)

flink進(jìn)程內(nèi)存

Flink性能優(yōu)化小結(jié),flink,性能優(yōu)化,大數(shù)據(jù)

?jobmanager相關(guān)配置:

Flink性能優(yōu)化小結(jié),flink,性能優(yōu)化,大數(shù)據(jù)

taskamanger相關(guān)配置:

Flink性能優(yōu)化小結(jié),flink,性能優(yōu)化,大數(shù)據(jù)

yarn相關(guān)的配置:
yarn.appmaster.vcores    YARN應(yīng)用程序主機(jī)使用的虛擬核心(vcore)的數(shù)量。yarn.containers.vcores   每個(gè)YARN容器的虛擬核心數(shù)(vcore)。默認(rèn)情況下,vcore數(shù)設(shè)置為每個(gè)TaskManager的插槽數(shù)(如果已設(shè)置),否則設(shè)置為1。為了使用此參數(shù),您的群集必須啟用CPU調(diào)度。您可以通過(guò)設(shè)置來(lái)做到這一點(diǎn)org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler。
yarn.scheduler.maximum-allocation-vcoresyarn.scheduler.minimum-allocation-vcoresFlink單個(gè)task manager的slot數(shù)量必須介于這兩個(gè)值之間
yarn.scheduler.maximum-allocation-mbyarn.scheduler.minimum-allocation-mbFlink的job manager 和task manager內(nèi)存不得超過(guò)container最大分配內(nèi)存大小。
yarn.nodemanager.resource.cpu-vcores yarn的虛擬CPU內(nèi)核數(shù),建議設(shè)置為物理CPU核心數(shù)的2-3倍,如果設(shè)置過(guò)少,會(huì)導(dǎo)致CPU資源無(wú)法被充分利用,跑任務(wù)的時(shí)候CPU占用率不高。

netty優(yōu)化

Netty Shuffle環(huán)境

Flink性能優(yōu)化小結(jié),flink,性能優(yōu)化,大數(shù)據(jù)

Network Communication (via Netty)

Flink性能優(yōu)化小結(jié),flink,性能優(yōu)化,大數(shù)據(jù)

akka優(yōu)化

akka.ask.callstack ???捕獲異步請(qǐng)求的調(diào)用堆棧。注意,如果有數(shù)百萬(wàn)個(gè)并發(fā)RPC調(diào)用,這可能會(huì)增加內(nèi)存占用。
akka.ask.timeout      用于期望并阻止Akka呼叫的超時(shí)。如果Flink由于超時(shí)而失敗,則應(yīng)嘗試增加此值。超時(shí)可能是由于計(jì)算機(jī)運(yùn)行緩慢或網(wǎng)絡(luò)擁塞引起的。超時(shí)值需要一個(gè)時(shí)間單位說(shuō)明符(ms / s / min / h / d)。
akka.client-socket-worker-pool.pool-size-factor  池大小因子使用以下公式確定線程池大小:ceil(available processors * factor)。然后,結(jié)果大小受pool-size-min和pool-size-max值限制。
akka.client-socket-worker-pool.pool-size-max??要限制基于因素的最大線程數(shù)。
akka.client-socket-worker-pool.pool-size-min??最小線程數(shù)以上限為基礎(chǔ)。
akka.client.timeout  60s 客戶端上所有阻塞呼叫的超時(shí)。
akka.fork-join-executor.parallelism-factor??并行度因子用于通過(guò)以下公式確定線程池大?。篶eil(available processors * factor)。然后,所得到的大小由并行度最小值和并行度最大值限制。
akka.fork-join-executor.parallelism-max  最大線程數(shù)上限為基于因子的并行數(shù)。
akka.fork-join-executor.parallelism-min??最小線程數(shù)以基于因素的并行度為上限。
akka.framesize  10485760b(10MB)  在JobManager和TaskManager之間發(fā)送的消息的最大大小。如果Flink因消息超出此限制而失敗,則應(yīng)增加該限制。消息大小需要大小單位說(shuō)明符。
akka.fork-join-executor.parallelism-factor  并行度因子用于使用以下公式確定線程池大小:ceil(可用處理器*因子)。然后,結(jié)果大小由并行度最小值和并行度最大值限制。
akka.fork-join-executor.parallelism-max      基于并行度的最大線程數(shù)上限
akka.fork-join-executor.parallelism-min      基于并行度的最大線程數(shù)下限
akka.framesize     JobManager和TaskManager之間發(fā)送的最大消息大小。如果Flink失敗是因?yàn)橄⒊^(guò)了這個(gè)限制,那么您應(yīng)該增加它。消息大小需要大小單位說(shuō)明符。
akka.retry-gate-closed-for    遠(yuǎn)程連接斷開(kāi)后,閘門(mén)應(yīng)關(guān)閉幾毫秒。
akka.server-socket-worker-pool.pool-size-factor   池大小因子用于使用以下公式確定線程池大?。篶eil(可用處理器*因子)。然后,結(jié)果大小由池大小最小值和池大小最大值限定。
akka.server-socket-worker-pool.pool-size-max     基于上限因子的最大線程數(shù)。
akka.server-socket-worker-pool.pool-size-min     基于上限因子的最小線程數(shù)
akka.tcp.timeout 所有出站連接超時(shí)。如果由于網(wǎng)絡(luò)速度慢而在連接TaskManager時(shí)遇到問(wèn)題,則應(yīng)增加此值。
akka.startup-timeout  超時(shí)之后,遠(yuǎn)程組件的啟動(dòng)被視為失敗。

并行度優(yōu)化

當(dāng)分區(qū)導(dǎo)致數(shù)據(jù)傾斜時(shí),需要考慮優(yōu)化分區(qū)。避免非并行度操作,有些對(duì)DataStream的操作會(huì)導(dǎo)致無(wú)法并行,例如WindowAll。keyBy盡量不要使用String。
并行度控制任務(wù)的數(shù)量,影響操作后數(shù)據(jù)被切分成的塊數(shù)。調(diào)整并行度讓任務(wù)的數(shù)量和每個(gè)任務(wù)處理的數(shù)據(jù)與機(jī)器的處理能力達(dá)到最優(yōu)。查看CPU使用情況和內(nèi)存占用情況,當(dāng)任務(wù)和數(shù)據(jù)不是平均分布在各節(jié)點(diǎn),而是集中在個(gè)別節(jié)點(diǎn)時(shí),可以增大并行度使任務(wù)和數(shù)據(jù)更均勻的分布在各個(gè)節(jié)點(diǎn)。增加任務(wù)的并行度,充分利用集群機(jī)器的計(jì)算能力,一般并行度設(shè)置為集群CPU核數(shù)總和的2-3倍。
taskmanager個(gè)數(shù)

num_of_tm = ceil(parallelism / slot) 即并行度除以slot個(gè)數(shù),結(jié)果向上取整。
算子層面并行度設(shè)置:

通過(guò)調(diào)用setParallelism()方法來(lái)指定

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = [...]DataStream<Tuple2<String, Integer>> wordCounts = text    .flatMap(new LineSplitter())    .keyBy(0)    .timeWindow(Time.seconds(5))    .sum(1).setParallelism(5);wordCounts.print();env.execute("Word?Count?Example");
執(zhí)行環(huán)境層次

Flink程序運(yùn)行在執(zhí)行環(huán)境中。執(zhí)行環(huán)境為所有執(zhí)行的算子、數(shù)據(jù)源、data sink定義了一個(gè)默認(rèn)的并行度。

執(zhí)行環(huán)境的默認(rèn)并行度可以通過(guò)調(diào)用setParallelism()方法指定。例如:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    env.setParallelism(3);    DataStream<String> text = [...]    DataStream<Tuple2<String, Integer>> wordCounts = [...]    wordCounts.print();????env.execute("Word?Count?Example");
客戶端層次

并行度可以在客戶端將job提交到Flink時(shí)設(shè)定。對(duì)于CLI客戶端,可以通過(guò)“-p”參數(shù)指定并行度。例如:./bin/flink run -p 10 ../examples/WordCount-java.jar

對(duì)象重用

對(duì)象重用的本質(zhì)就是在算子鏈中的下游算子使用上游對(duì)象的淺拷貝。若關(guān)閉對(duì)象重用,則必須經(jīng)過(guò)一輪序列化和反序列化,相當(dāng)于深拷貝,所以就不能100%地發(fā)揮算子鏈的優(yōu)化效果。

但正所謂魚(yú)與熊掌不可兼得,若啟用了對(duì)象重用,那么我們的業(yè)務(wù)代碼中必然不能出現(xiàn)以下兩種情況,以免造成混亂:

  • 在下游修改上游發(fā)射的對(duì)象,或者上游存入其State中的對(duì)象;

  • 同一條流直接對(duì)接多個(gè)處理邏輯(如stream.map(new AFunc())的同時(shí)還有stream.map(new BFunc()))。

總之,在enableObjectReuse()之前,需要謹(jǐn)慎評(píng)估業(yè)務(wù)代碼是否會(huì)帶來(lái)副作用。社區(qū)大佬David Anderson曾在Stack Overflow上給出了一個(gè)簡(jiǎn)單明晰的回答,可參見(jiàn)這里。

env.getConfig().enableObjectReuse();

當(dāng)調(diào)用了 enableObjectReuse 方法后, Flink 會(huì)把中間深拷貝的步驟都省略掉,SourceFunction 產(chǎn)生的數(shù)據(jù)直接作為 MapFunction 的輸入,可以減少 gc 壓力。但需要特別注意的是,這個(gè)方法不能隨便調(diào)用,必須要確保下游 Function 只有一種(也就是一個(gè)流只會(huì)被一個(gè)算子處理),或者下游的多個(gè) Function 均不會(huì)改變對(duì)象內(nèi)部的值。否則可能會(huì)有線程安全的問(wèn)題。

checkpoint優(yōu)化

監(jiān)控checkpoint?
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/monitoring/checkpoint_monitoring/

Checkpoint 時(shí)間間隔,需要根據(jù)業(yè)務(wù)場(chǎng)景對(duì)時(shí)效性的要求而定。如果時(shí)效性要求不高,可以設(shè)置到分鐘級(jí)別,比如5分鐘、10分鐘;如果對(duì)時(shí)效性要求很高,結(jié)合 flink ?控制頁(yè)面 Checkpoints 的Summary 中的 End to End Duration,通過(guò)最大值、最小值和平均值,合理設(shè)置時(shí)間間隔。注意,時(shí)間間隔需要比 End to End Duration 的時(shí)間要長(zhǎng),否則,可能會(huì)導(dǎo)致上一個(gè) checkpoint 沒(méi)結(jié)束,下一個(gè) checkpoint 已經(jīng)開(kāi)始。為了避免這一情況的發(fā)生,除了設(shè)置時(shí)間間隔,兩次 checkpoint 的最小時(shí)間間隔也可以起到作用,該配置決定在上一次 checkpoint 結(jié)束之后,至少等待多長(zhǎng)時(shí)間開(kāi)始下一次的 checkpoint。

設(shè)置原則:
  • Checkpoint 時(shí)間間隔不易過(guò)大。一般來(lái)說(shuō),Checkpoint 時(shí)間間隔越長(zhǎng),需要生產(chǎn)的 State 就越大。如此一來(lái),當(dāng)失敗恢復(fù)時(shí),需要更長(zhǎng)的追趕時(shí)間。

  • Checkpoint 時(shí)間間隔不易過(guò)小。如果 Checkpoint 時(shí)間間隔太小,那么 Flink 應(yīng)用程序就會(huì)頻繁 Checkpoint,導(dǎo)致部分資源被占有,無(wú)法專(zhuān)注地進(jìn)行數(shù)據(jù)處理。

  • Checkpoint 時(shí)間間隔大于 Checkpoint 的生產(chǎn)時(shí)間。當(dāng) Checkpoint 時(shí)間間隔比 Checkpoint 生產(chǎn)時(shí)間長(zhǎng)時(shí),在上次 Checkpoint 完成時(shí),不會(huì)立刻進(jìn)行下一次 Checkpoint,而是會(huì)等待一段時(shí)間,之后再進(jìn)行新的 Checkpoint。否則,每次 Checkpoint 完成時(shí),就會(huì)立即開(kāi)始下一次 Checkpoint,系統(tǒng)會(huì)有很多資源被 Checkpoint 占用,而真正任務(wù)計(jì)算的資源就會(huì)變少。

  • 開(kāi)啟本地恢復(fù)。如果 Flink State 很大,在進(jìn)行恢復(fù)時(shí),需要從遠(yuǎn)程存儲(chǔ)上讀取 State 進(jìn)行恢復(fù),如果 State 文件過(guò)大,此時(shí)可能導(dǎo)致任務(wù)恢復(fù)很慢,大量的時(shí)間浪費(fèi)在網(wǎng)絡(luò)傳輸方面。此時(shí)可以設(shè)置 Flink 應(yīng)用程序本地 State 恢復(fù),應(yīng)用程序 State 本地恢復(fù)默認(rèn)沒(méi)有開(kāi)啟,可以設(shè)置參數(shù) state.backend.local-recovery 值為 true 進(jìn)行激活。

  • 設(shè)置 Checkpoint 保存數(shù)。Checkpoint 保存數(shù)默認(rèn)是 1,也就是只保存最新的 Checkpoint 的 State 文件,當(dāng)進(jìn)行 State 恢復(fù)時(shí),如果最新的 Checkpoint 文件不可用時(shí) (比如文件損壞或者其他原因),那么 State 恢復(fù)就會(huì)失敗,如果設(shè)置 Checkpoint 保存數(shù) 3,即使最新的 Checkpoint 恢復(fù)失敗,那么 Flink 也會(huì)回滾到上一次 Checkpoint 的狀態(tài)文件進(jìn)行恢復(fù)??紤]到這種情況,可以通過(guò) state.checkpoints.num-retained 設(shè)置 Checkpoint 保存數(shù)。

// 使? RocksDBStateBackend 做為狀態(tài)后端,并開(kāi)啟增量 CheckpointRocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("hdfs://hadoop01:8020/flink/checkpoints", true);env.setStateBackend(rocksDBStateBackend);// 開(kāi)啟 Checkpoint,間隔為 1 分鐘env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1));// 配置 CheckpointCheckpointConfig checkpointConf = env.getCheckpointConfig();checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)// 最小間隔 2 分鐘checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(2))// 超時(shí)時(shí)間 10 分鐘checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));// 保存 checkpointcheckpointConf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
配置 task 本地恢復(fù)

Task 本地恢復(fù)?默認(rèn)禁用,可以通過(guò) Flink 的?CheckpointingOptions.LOCAL_RECOVERY?配置中指定的鍵 state.backend.local-recovery 來(lái)啟用。此設(shè)置的值可以是?true?以啟用或?false(默認(rèn))以禁用本地恢復(fù)。

注意,unaligned checkpoints 目前不支持 task 本地恢復(fù)。

參考公眾號(hào)鏈接:flink狀態(tài)調(diào)優(yōu)

參考官網(wǎng):

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/state/large_state_tuning/

網(wǎng)絡(luò)內(nèi)存調(diào)優(yōu)

緩沖消脹機(jī)制(Buffer Debloating)

緩沖消脹機(jī)制嘗試通過(guò)自動(dòng)調(diào)整緩沖數(shù)據(jù)量到一個(gè)合理值來(lái)解決這個(gè)問(wèn)題。

緩沖消脹功能計(jì)算 subtask 可能達(dá)到的最大吞吐(始終保持繁忙狀態(tài)時(shí))并且通過(guò)調(diào)整緩沖數(shù)據(jù)量來(lái)使得數(shù)據(jù)的消費(fèi)時(shí)間達(dá)到配置值。

可以通過(guò)設(shè)置?taskmanager.network.memory.buffer-debloat.enabled?為?true?來(lái)開(kāi)啟緩沖消脹機(jī)制。通過(guò)設(shè)置?taskmanager.network.memory.buffer-debloat.target?為?duration?類(lèi)型的值來(lái)指定消費(fèi)緩沖數(shù)據(jù)的目標(biāo)時(shí)間。默認(rèn)值應(yīng)該能滿足大多數(shù)場(chǎng)景。

這個(gè)功能使用過(guò)去的吞吐數(shù)據(jù)來(lái)預(yù)測(cè)消費(fèi)剩余緩沖數(shù)據(jù)的時(shí)間。如果預(yù)測(cè)不準(zhǔn),緩沖消脹機(jī)制會(huì)導(dǎo)致以下問(wèn)題:

  • 沒(méi)有足夠的緩存數(shù)據(jù)來(lái)提供全量吞吐。

  • 有太多緩沖數(shù)據(jù)對(duì) checkpoint barrier 推進(jìn)或者非對(duì)齊的 checkpoint 的大小造成不良影響。

如果您的作業(yè)負(fù)載經(jīng)常變化(即,突如其來(lái)的數(shù)據(jù)尖峰,定期的窗口聚合觸發(fā)或者 join ),您可能需要調(diào)整以下設(shè)置:

  • taskmanager.network.memory.buffer-debloat.period:這是緩沖區(qū)大小重算的最小時(shí)間周期。周期越小,緩沖消脹機(jī)制的反應(yīng)時(shí)間就越快,但是必要的計(jì)算會(huì)消耗更多的CPU。

  • taskmanager.network.memory.buffer-debloat.samples:調(diào)整用于計(jì)算平均吞吐量的采樣數(shù)。采集樣本的頻率可以通過(guò) taskmanager.network.memory.buffer-debloat.period 來(lái)設(shè)置。樣本數(shù)越少,緩沖消脹機(jī)制的反應(yīng)時(shí)間就越快,但是當(dāng)吞吐量突然飆升或者下降時(shí),緩沖消脹機(jī)制計(jì)算的最佳緩沖數(shù)據(jù)量會(huì)更容易出錯(cuò)。

  • taskmanager.network.memory.buffer-debloat.threshold-percentages:防止緩沖區(qū)大小頻繁改變的優(yōu)化(比如,新的大小跟舊的大小相差不大)。

您可以使用以下指標(biāo)來(lái)監(jiān)控當(dāng)前的緩沖區(qū)大?。?/p>

  • estimatedTimeToConsumeBuffersMs:消費(fèi)所有輸入通道(input channel)中數(shù)據(jù)的總時(shí)間。

  • debloatedBufferSize:當(dāng)前的緩沖區(qū)大小。

限制
多個(gè)輸入和合并

當(dāng)前,吞吐計(jì)算和緩沖消脹發(fā)生在 subtask 層面。

如果您的 subtask 有很多不同的輸入或者有一個(gè)合并的輸入,緩沖消脹可能會(huì)導(dǎo)致低吞吐的輸入有太多緩沖數(shù)據(jù),而高吞吐輸入的緩沖區(qū)數(shù)量可能太少而不夠維持當(dāng)前吞吐。當(dāng)不同的輸入吞吐差別比較大時(shí),這種現(xiàn)象會(huì)更加的明顯。我們推薦您在測(cè)試這個(gè)功能時(shí)重點(diǎn)關(guān)注這種 subtask。

緩沖區(qū)的尺寸和個(gè)數(shù)

當(dāng)前,緩沖消脹僅在使用的緩沖區(qū)大小上設(shè)置上限。實(shí)際的緩沖區(qū)大小和個(gè)數(shù)保持不變。這意味著緩沖消脹機(jī)制不會(huì)減少作業(yè)的內(nèi)存使用。您應(yīng)該手動(dòng)減少緩沖區(qū)的大小或者個(gè)數(shù)。

此外,如果您想減少緩沖數(shù)據(jù)量使其低于緩沖消脹當(dāng)前允許的量,您可能需要手動(dòng)的設(shè)置緩沖區(qū)的個(gè)數(shù)。

高并行度

目前,使用默認(rèn)配置,緩沖區(qū)去塊機(jī)制可能無(wú)法在高并行度(約200以上)下正確執(zhí)行。如果您觀察到吞吐量降低或檢查點(diǎn)時(shí)間高于預(yù)期,我們建議將浮動(dòng)緩沖區(qū)(taskmanager.network.memory.foating buffers per gate)的數(shù)量從默認(rèn)值增加到至少等于并行度的數(shù)量。

發(fā)生問(wèn)題的并行度的實(shí)際值因作業(yè)而異,但通常應(yīng)該超過(guò)幾百。

網(wǎng)絡(luò)緩沖生命周期?

Flink 有多個(gè)本地緩沖區(qū)池 —— 每個(gè)輸出和輸入流對(duì)應(yīng)一個(gè)。每個(gè)緩沖區(qū)池的大小被限制為

channels?*?taskmanager.network.memory.buffers-per-channel?+?taskmanager.network.memory.floating-buffers-per-gate

緩沖區(qū)的大小可以通過(guò)?taskmanager.memory.segment-size?來(lái)設(shè)置。

輸入網(wǎng)絡(luò)緩沖?

輸入通道中的緩沖區(qū)被分為獨(dú)占緩沖區(qū)(exclusive buffer)和流動(dòng)緩沖區(qū)(floating buffer)。每個(gè)獨(dú)占緩沖區(qū)只能被一個(gè)特定的通道使用。一個(gè)通道可以從輸入流的共享緩沖區(qū)池中申請(qǐng)額外的流動(dòng)緩沖區(qū)。剩余的流動(dòng)緩沖區(qū)是可選的并且只有資源足夠的時(shí)候才能獲取。

在初始階段:

  • Flink 會(huì)為每一個(gè)輸入通道獲取配置數(shù)量的獨(dú)占緩沖區(qū)。

  • 所有的獨(dú)占緩沖區(qū)都必須被滿足,否則作業(yè)會(huì)拋異常失敗。

  • Flink 至少要有一個(gè)流動(dòng)緩沖區(qū)才能運(yùn)行。

輸出網(wǎng)絡(luò)緩沖?

不像輸入緩沖區(qū)池,輸出緩沖區(qū)池只有一種類(lèi)型的緩沖區(qū)被所有的 subpartitions 共享。

為了避免過(guò)多的數(shù)據(jù)傾斜,每個(gè) subpartition 的緩沖區(qū)數(shù)量可以通過(guò)?taskmanager.network.memory.max-buffers-per-channel?來(lái)限制。

不同于輸入緩沖區(qū)池,這里配置的獨(dú)占緩沖區(qū)和流動(dòng)緩沖區(qū)只被當(dāng)作推薦值。如果沒(méi)有足夠的緩沖區(qū),每個(gè)輸出 subpartition 可以只使用一個(gè)獨(dú)占緩沖區(qū)而沒(méi)有流動(dòng)緩沖區(qū)。

透支緩沖區(qū)(Overdraft buffers)?

另外,每個(gè) subtask 輸出數(shù)據(jù)時(shí)可以至多請(qǐng)求?taskmanager.network.memory.max-overdraft-buffers-per-gate?(默認(rèn) 5)個(gè)額外的透支緩沖區(qū)(overdraft buffers)。只有當(dāng)前 subtask 被下游 subtasks 反壓且當(dāng)前 subtask 需要 請(qǐng)求超過(guò) 1 個(gè)網(wǎng)絡(luò)緩沖區(qū)(network buffer)才能完成當(dāng)前的操作時(shí),透支緩沖區(qū)才會(huì)被使用??赡馨l(fā)生在以下情況:

  • 序列化非常大的 records,不能放到單個(gè)網(wǎng)絡(luò)緩沖區(qū)中。

  • 類(lèi)似 flatmap 的算子,即:處理單個(gè) record 時(shí)可能會(huì)生產(chǎn)多個(gè) records。

  • 周期性地或某些事件觸發(fā)產(chǎn)生大量 records 的算子(例如:WindowOperator?的觸發(fā))。

在這些情況下,如果沒(méi)有透支緩沖區(qū),F(xiàn)link 的 subtask 線程會(huì)被阻塞在反壓,從而阻止例如 Unaligned Checkpoint 的完成。為了緩解這種情況,增加了透支緩沖區(qū)的概念。這些透支緩沖區(qū)是可選的,F(xiàn)link 可以僅僅使用常規(guī)的緩沖區(qū)逐漸取得進(jìn)展,也就是 說(shuō)?0?是?taskmanager.network.memory.max-overdraft-buffers-per-gate?可以接受的配置值。

緩沖區(qū)的數(shù)量?

獨(dú)占緩沖區(qū)和流動(dòng)緩沖區(qū)的默認(rèn)配置應(yīng)該足以應(yīng)對(duì)最大吞吐。如果想要最小化緩沖數(shù)據(jù)量,那么可以將獨(dú)占緩沖區(qū)設(shè)置為?0,同時(shí)減小內(nèi)存段的大小。

選擇緩沖區(qū)的大小?

在往下游 subtask 發(fā)送數(shù)據(jù)部分時(shí),緩沖區(qū)通過(guò)匯集 record 來(lái)優(yōu)化網(wǎng)絡(luò)開(kāi)銷(xiāo)。下游 subtask 應(yīng)該在接收到完整的 record 后才開(kāi)始處理它。

如果緩沖區(qū)太小,或者緩沖區(qū)刷新太頻繁(execution.buffer-timeout配置參數(shù)),這可能會(huì)導(dǎo)致吞吐量降低,因?yàn)樵贔link的運(yùn)行時(shí),每個(gè)緩沖區(qū)的開(kāi)銷(xiāo)明顯高于每個(gè)記錄的開(kāi)銷(xiāo)。

根據(jù)經(jīng)驗(yàn),我們不建議考慮增加緩沖區(qū)大小或緩沖區(qū)超時(shí),除非您可以在實(shí)際工作負(fù)載中觀察到網(wǎng)絡(luò)瓶頸(下游操作員空閑、上游背壓、輸出緩沖區(qū)隊(duì)列已滿、下游輸入隊(duì)列為空)。

如果緩沖區(qū)太大,會(huì)導(dǎo)致:

  • 內(nèi)存使用高

  • 大量的 checkpoint 數(shù)據(jù)量(針對(duì)非對(duì)齊的 checkpoints)

  • 漫長(zhǎng)的 checkpoint 時(shí)間(針對(duì)對(duì)齊的 checkpoints)

  • execution.buffer-timeout?較小時(shí)內(nèi)存分配使用率會(huì)比較低,因?yàn)榫彌_區(qū)還沒(méi)被塞滿數(shù)據(jù)就被發(fā)送下去了。

選擇緩沖區(qū)的數(shù)量?

緩沖區(qū)的數(shù)量是通過(guò)?taskmanager.network.memory.buffers-per-channel?和?taskmanager.network.memory.floating-buffers-per-gate?來(lái)配置的。

為了最好的吞吐率,我們建議使用獨(dú)占緩沖區(qū)和流動(dòng)緩沖區(qū)的默認(rèn)值(except you have one of limit cases)。如果緩沖數(shù)據(jù)量存在問(wèn)題,更建議打開(kāi)緩沖消脹。

您可以人工地調(diào)整網(wǎng)絡(luò)緩沖區(qū)的個(gè)數(shù),但是需要注意:

  1. 您應(yīng)該根據(jù)期待的吞吐量(單位?bytes/second)來(lái)調(diào)整緩沖區(qū)的數(shù)量。協(xié)調(diào)數(shù)據(jù)傳輸量(大約兩個(gè)節(jié)點(diǎn)之間的兩個(gè)往返消息)。延遲也取決于您的網(wǎng)絡(luò)。

使用 buffer 往返時(shí)間(大概?1ms?在正常的本地網(wǎng)絡(luò)中),緩沖區(qū)大小和期待的吞吐,您可以通過(guò)下面的公式計(jì)算維持吞吐所需要的緩沖區(qū)數(shù)量:

number_of_buffers = expected_throughput * buffer_roundtrip / buffer_size

比如,期待吞吐為?320MB/s,往返延遲為?1ms,內(nèi)存段為默認(rèn)大小,為了維持吞吐需要使用10個(gè)活躍的緩沖區(qū):

number_of_buffers = 320MB/s * 1ms / 32KB = 10
  1. 流動(dòng)緩沖區(qū)的目的是為了處理數(shù)據(jù)傾斜。理想情況下,流動(dòng)緩沖區(qū)的數(shù)量(默認(rèn)8個(gè))和每個(gè)通道獨(dú)占緩沖區(qū)的數(shù)量(默認(rèn)2個(gè))能夠使網(wǎng)絡(luò)吞吐量飽和。但這并不總是可行和必要的。所有 subtask 中只有一個(gè)通道被使用也是非常罕見(jiàn)的。

  2. 獨(dú)占緩沖區(qū)的目的是提供一個(gè)流暢的吞吐量。當(dāng)一個(gè)緩沖區(qū)在傳輸數(shù)據(jù)時(shí),另一個(gè)緩沖區(qū)被填充。當(dāng)吞吐量比較高時(shí),獨(dú)占緩沖區(qū)的數(shù)量是決定 Flink 中緩沖數(shù)據(jù)的主要因素。

當(dāng)?shù)屯掏铝肯鲁霈F(xiàn)反壓時(shí),您應(yīng)該考慮減少獨(dú)占緩沖區(qū)。

總結(jié):

可以通過(guò)開(kāi)啟緩沖消脹機(jī)制來(lái)簡(jiǎn)化 Flink 網(wǎng)絡(luò)的內(nèi)存配置調(diào)整。您也可能需要調(diào)整它。

如果這不起作用,您可以關(guān)閉緩沖消脹機(jī)制并且人工地配置內(nèi)存段的大小和緩沖區(qū)個(gè)數(shù)。針對(duì)第二種場(chǎng)景,我們推薦:

  • 使用默認(rèn)值以獲得最大吞吐

  • 減少內(nèi)存段大小、獨(dú)占緩沖區(qū)的數(shù)量來(lái)加快 checkpoint 并減少網(wǎng)絡(luò)棧消耗的內(nèi)存量

flink狀態(tài)優(yōu)化

使用rocksdb狀態(tài)后端開(kāi)啟增量檢查點(diǎn)


Tuning MemTable

memtable 作為 LSM Tree 體系里的讀寫(xiě)緩存,對(duì)寫(xiě)性能有較大的影響。以下是一些值得注意的參數(shù)。為方便對(duì)比,下文都會(huì)將 RocksDB 的原始參數(shù)名與 Flink 配置中的參數(shù)名一并列出,用豎線分割。

  • write_buffer_size | state.backend.rocksdb.writebuffer.size單個(gè) memtable 的大小,默認(rèn)是64MB。當(dāng) memtable 大小達(dá)到此閾值時(shí),就會(huì)被標(biāo)記為不可變。一般來(lái)講,適當(dāng)增大這個(gè)參數(shù)可以減小寫(xiě)放大帶來(lái)的影響,但同時(shí)會(huì)增大 flush 后 L0、L1 層的壓力,所以還需要配合修改 compaction 參數(shù),后面再提。

  • max_write_buffer_number | state.backend.rocksdb.writebuffer.countmemtable?的最大數(shù)量(包含活躍的和不可變的),默認(rèn)是2。當(dāng)全部 memtable 都寫(xiě)滿但是 flush 速度較慢時(shí),就會(huì)造成寫(xiě)停頓,所以如果內(nèi)存充足或者使用的是機(jī)械硬盤(pán),建議適當(dāng)調(diào)大這個(gè)參數(shù),如4。

  • min_write_buffer_number_to_merge | state.backend.rocksdb.writebuffer.number-to-merge在 flush 發(fā)生之前被合并的 memtable 最小數(shù)量,默認(rèn)是1。舉個(gè)例子,如果此參數(shù)設(shè)為2,那么當(dāng)有至少兩個(gè)不可變 memtable 時(shí),才有可能觸發(fā) flush(亦即如果只有一個(gè)不可變 memtable,就會(huì)等待)。調(diào)大這個(gè)值的好處是可以使更多的更改在 flush 前就被合并,降低寫(xiě)放大,但同時(shí)又可能增加讀放大,因?yàn)樽x取數(shù)據(jù)時(shí)要檢查的 memtable 變多了。經(jīng)測(cè)試,該參數(shù)設(shè)為2或3相對(duì)較好。

Tuning Block/Block Cache

block 是 sstable 的基本存儲(chǔ)單位。block cache 則扮演讀緩存的角色,采用 LRU 算法存儲(chǔ)最近使用的 block,對(duì)讀性能有較大的影響。

  • block_size | state.backend.rocksdb.block.blocksizeblock 的大小,默認(rèn)值為4KB。在生產(chǎn)環(huán)境中總是會(huì)適當(dāng)調(diào)大一些,一般32KB比較合適,對(duì)于機(jī)械硬盤(pán)可以再增大到128~256KB,充分利用其順序讀取能力。但是需要注意,如果 block 大小增大而 block cache 大小不變,那么緩存的 block 數(shù)量會(huì)減少,無(wú)形中會(huì)增加讀放大。

  • block_cache_size | state.backend.rocksdb.block.cache-sizeblock cache 的大小,默認(rèn)為8MB。由上文所述的讀寫(xiě)流程可知,較大的 block cache 可以有效避免熱數(shù)據(jù)的讀請(qǐng)求落到 sstable 上,所以若內(nèi)存余量充足,建議設(shè)置到128MB甚至256MB,讀性能會(huì)有非常明顯的提升。

Tuning Compaction

compaction 在所有基于 LSM Tree 的存儲(chǔ)引擎中都是開(kāi)銷(xiāo)最大的操作,弄不好的話會(huì)非常容易阻塞讀寫(xiě)。建議看官先讀讀前面那篇關(guān)于 RocksDB 的 compaction 策略的文章,獲取一些背景知識(shí),這里不再贅述。

  • compaction_style | state.backend.rocksdb.compaction.stylecompaction 算法,使用默認(rèn)的 LEVEL(即 leveled compaction)即可,下面的參數(shù)也是基于此。

  • target_file_size_base | state.backend.rocksdb.compaction.level.target-file-size-baseL1層單個(gè) sstable 文件的大小閾值,默認(rèn)值為64MB。每向上提升一級(jí),閾值會(huì)乘以因子 target_file_size_multiplier(但默認(rèn)為1,即每級(jí)sstable最大都是相同的)。顯然,增大此值可以降低 compaction 的頻率,減少寫(xiě)放大,但是也會(huì)造成舊數(shù)據(jù)無(wú)法及時(shí)清理,從而增加讀放大。此參數(shù)不太容易調(diào)整,一般不建議設(shè)為256MB以上。

  • max_bytes_for_level_base | state.backend.rocksdb.compaction.level.max-size-level-baseL1層的數(shù)據(jù)總大小閾值,默認(rèn)值為256MB。每向上提升一級(jí),閾值會(huì)乘以因子 max_bytes_for_level_multiplier(默認(rèn)值為10)。由于上層的大小閾值都是以它為基礎(chǔ)推算出來(lái)的,所以要小心調(diào)整。建議設(shè)為 target_file_size_base 的倍數(shù),且不能太小,例如5~10倍。

  • level_compaction_dynamic_level_bytes | state.backend.rocksdb.compaction.level.use-dynamic-size這個(gè)參數(shù)之前講過(guò)。當(dāng)開(kāi)啟之后,上述閾值的乘法因子會(huì)變成除法因子,能夠動(dòng)態(tài)調(diào)整每層的數(shù)據(jù)量閾值,使得較多的數(shù)據(jù)可以落在最高一層,能夠減少空間放大,整個(gè) LSM Tree 的結(jié)構(gòu)也會(huì)更穩(wěn)定。對(duì)于機(jī)械硬盤(pán)的環(huán)境,強(qiáng)烈建議開(kāi)啟。

Generic Parameters
  • max_open_files | state.backend.rocksdb.files.open顧名思義,是 RocksDB 實(shí)例能夠打開(kāi)的最大文件數(shù),默認(rèn)為-1,表示不限制。由于sstable的索引和布隆過(guò)濾器默認(rèn)都會(huì)駐留內(nèi)存,并占用文件描述符,所以如果此值太小,索引和布隆過(guò)濾器無(wú)法正常加載,就會(huì)嚴(yán)重拖累讀取性能。

  • max_background_compactions/max_background_flushes|state.backend.rocksdb.thread.num

    后臺(tái)負(fù)責(zé) flush 和 compaction 的最大并發(fā)線程數(shù),默認(rèn)為1。注意 Flink 將這兩個(gè)參數(shù)合二為一處理(對(duì)應(yīng) DBOptions.setIncreaseParallelism() 方法),鑒于 flush 和 compaction 都是相對(duì)重的操作,如果 CPU 余量比較充足,建議調(diào)大,在我們的實(shí)踐中一般設(shè)為4。

參考公眾號(hào)鏈接:flink狀態(tài)調(diào)優(yōu)

flink數(shù)據(jù)傾斜優(yōu)化

?參考公眾號(hào)鏈接:flink 數(shù)據(jù)傾斜的常見(jiàn)處理方式

背壓優(yōu)化

?反壓監(jiān)控指標(biāo):

  • backPressureTimeMsPerSecond,subtask 被反壓的時(shí)間

  • idleTimeMsPerSecond,subtask 等待某類(lèi)處理的時(shí)間

  • busyTimeMsPerSecond,subtask 實(shí)際工作時(shí)間 在任何時(shí)間點(diǎn),這三個(gè)指標(biāo)相加都約等于1000ms。

web ui觀測(cè):

Flink性能優(yōu)化小結(jié),flink,性能優(yōu)化,大數(shù)據(jù)

閑置的 tasks 為藍(lán)色,完全被反壓的 tasks 為黑色,完全繁忙的 tasks 被標(biāo)記為紅色。中間的所有值都表示為這三種顏色之間的過(guò)渡色。

Job Overview觀測(cè)反壓狀態(tài):

Flink性能優(yōu)化小結(jié),flink,性能優(yōu)化,大數(shù)據(jù)

常見(jiàn)解決方式:

  1. 消除背壓源頭,通過(guò)優(yōu)化 Flink 作業(yè),通過(guò)調(diào)整 Flink 或 JVM 參數(shù),或是通過(guò)擴(kuò)容。

  2. 減少 Flink 作業(yè)中緩沖在 In-flight 數(shù)據(jù)的數(shù)據(jù)量。

  3. 啟用非對(duì)齊 Checkpoints。這些選項(xiàng)并不是互斥的,可以組合在一起。本文檔重點(diǎn)介紹后兩個(gè)選項(xiàng)。

  4. 禁用/合并算子鏈chain或者資源槽共享

  5. 先贊批,再寫(xiě)入(滿足實(shí)時(shí)性要求的情況下,異步 io + 熱緩存來(lái)優(yōu)化讀寫(xiě)性能

  6. 增加并行度,增加資源。checkpoint時(shí)長(zhǎng)合理設(shè)置

緩沖區(qū) Debloating

Flink 1.14 引入了一個(gè)新的工具,用于自動(dòng)控制在 Flink 算子/子任務(wù)之間緩沖的 In-flight 數(shù)據(jù)的數(shù)據(jù)量。緩沖區(qū) Debloating 機(jī) 制可以通過(guò)將屬性taskmanager.network.memory.buffer-debloat.enabled設(shè)置為true來(lái)啟用。

此特性對(duì)對(duì)齊和非對(duì)齊 Checkpoint 都生效,并且在這兩種情況下都能縮短 Checkpointing 的時(shí)間,不過(guò) Debloating 的效果對(duì)于 對(duì)齊 Checkpoint 最明顯。當(dāng)在非對(duì)齊 Checkpoint 情況下使用緩沖區(qū) Debloating 時(shí),額外的好處是 Checkpoint 大小會(huì)更小,并且恢復(fù)時(shí)間更快 (需要保存 和恢復(fù)的 In-flight 數(shù)據(jù)更少)。
非對(duì)齊checkpoint

從Flink 1.11開(kāi)始,Checkpoint 可以是非對(duì)齊的。Unaligned checkpoints 包含 In-flight 數(shù)據(jù)(例如,存儲(chǔ)在緩沖區(qū)中的數(shù)據(jù))作為 Checkpoint State的一部分,允許 Checkpoint Barrier 跨越這些緩沖區(qū)。因此, Checkpoint 時(shí)長(zhǎng)變得與當(dāng)前吞吐量無(wú)關(guān),因?yàn)?Checkpoint Barrier 實(shí)際上已經(jīng)不再嵌入到數(shù)據(jù)流當(dāng)中。

如果您的 Checkpointing 由于背壓導(dǎo)致周期非常的長(zhǎng),您應(yīng)該使用非對(duì)齊 Checkpoint。這樣,Checkpointing 時(shí)間基本上就與 端到端延遲無(wú)關(guān)。請(qǐng)注意,非對(duì)齊 Checkpointing 會(huì)增加狀態(tài)存儲(chǔ)的 I/O,因此當(dāng)狀態(tài)存儲(chǔ)的 I/O 是 整個(gè) Checkpointing 過(guò)程當(dāng)中真 正的瓶頸時(shí),您不應(yīng)當(dāng)使用非對(duì)齊 Checkpointing。

為了啟用非對(duì)齊 Checkpoint,您可以:

// 啟用非對(duì)齊 Checkpointenv.getCheckpointConfig().enableUnalignedCheckpoints();
限制?
并發(fā) Checkpoint?

Flink 當(dāng)前并不支持并發(fā)的非對(duì)齊 Checkpoint。然而,由于更可預(yù)測(cè)的和更短的 Checkpointing 時(shí)長(zhǎng),可能也根本就不需要并發(fā)的 Checkpoint。此外,Savepoint 也不能與非對(duì)齊 Checkpoint 同時(shí)發(fā)生,因此它們將會(huì)花費(fèi)稍長(zhǎng)的時(shí)間。

與 Watermark 的相互影響?

非對(duì)齊 Checkpoint 在恢復(fù)的過(guò)程中改變了關(guān)于 Watermark 的一個(gè)隱式保證。目前,F(xiàn)link 確保了 Watermark 作為恢復(fù)的第一步, 而不是將最近的 Watermark 存放在 Operator 中,以方便擴(kuò)縮容。在非對(duì)齊 Checkpoint 中,這意味著當(dāng)恢復(fù)時(shí),F(xiàn)link 會(huì)在恢復(fù) In-flight 數(shù)據(jù)后再生成 Watermark。如果您的 Pipeline 中使用了對(duì)每條記錄都應(yīng)用最新的 Watermark 的算子將會(huì)相對(duì)于 使用對(duì)齊 Checkpoint產(chǎn)生不同的結(jié)果。如果您的 Operator 依賴于最新的 Watermark 始終可用,解決辦法是將 Watermark 存放在 OperatorState 中。在這種情況下,Watermark 應(yīng)該使用單鍵 group 存放在 UnionState 以方便擴(kuò)縮容。

?與長(zhǎng)時(shí)間運(yùn)行的記錄處理相互作用

即使非對(duì)齊的檢查點(diǎn),但障礙物能夠超過(guò)隊(duì)列中的所有其他記錄。如果當(dāng)前記錄需要大量時(shí)間來(lái)處理,則該屏障的處理仍可能延遲。這種情況可能發(fā)生在同時(shí)觸發(fā)多個(gè)計(jì)時(shí)器時(shí),例如在窗口操作中。當(dāng)處理單個(gè)輸入記錄時(shí),系統(tǒng)被阻止等待多個(gè)網(wǎng)絡(luò)緩沖區(qū)可用性時(shí),可能會(huì)出現(xiàn)第二種問(wèn)題。Flink不能中斷單個(gè)輸入記錄的處理,未對(duì)齊的檢查點(diǎn)必須等待當(dāng)前處理的記錄被完全處理。這可能在兩種情況下造成問(wèn)題。由于串行化了一個(gè)不適合單個(gè)網(wǎng)絡(luò)緩沖區(qū)的大記錄,或者在flatMap操作中,一個(gè)輸入記錄產(chǎn)生了多個(gè)輸出記錄。在這種情況下,背壓可以阻止未對(duì)齊的檢查點(diǎn),直到處理單個(gè)輸入記錄所需的所有網(wǎng)絡(luò)緩沖區(qū)都可用。在處理單個(gè)記錄需要一段時(shí)間的任何其他情況下,也可能發(fā)生這種情況。因此,檢查點(diǎn)的時(shí)間可能比預(yù)期的要長(zhǎng),也可能會(huì)有所不同。?

某些數(shù)據(jù)分布模式?jīng)]有檢查點(diǎn)

有一部分包含屬性的的連接無(wú)法與 Channel 中的數(shù)據(jù)一樣保存在 Checkpoint 中。為了保留這些特性并且確保沒(méi)有狀態(tài)沖突或 非預(yù)期的行為,非對(duì)齊 Checkpoint 對(duì)于這些類(lèi)型的連接是禁用的。所有其他的交換仍然執(zhí)行非對(duì)齊 Checkpoint。

?點(diǎn)對(duì)點(diǎn)連接

我們目前沒(méi)有任何對(duì)于點(diǎn)對(duì)點(diǎn)連接中有關(guān)數(shù)據(jù)有序性的強(qiáng)保證。然而,由于數(shù)據(jù)已經(jīng)被以前置的 Source 或是 KeyBy 相同的方式隱式 組織,一些用戶會(huì)依靠這種特性在提供的有序性保證的同時(shí)將計(jì)算敏感型的任務(wù)劃分為更小的塊。

只要并行度不變,非對(duì)齊 Checkpoint(UC) 將會(huì)保留這些特性。但是如果加上UC的伸縮容,這些特性將會(huì)被改變。

Flink性能優(yōu)化小結(jié),flink,性能優(yōu)化,大數(shù)據(jù)

如果我們想將并行度從 p=2 擴(kuò)容到 p=3,那么需要根據(jù) KeyGroup 將 KeyBy 的 Channel 中的數(shù)據(jù)突然的劃分到3個(gè) Channel 中去。這 很容易做到,通過(guò)使用 Operator 的 KeyGroup 范圍和確定記錄屬于某個(gè) Key(group) 的方法(不管實(shí)際使用的是什么方法)。對(duì)于 Forward 的 Channel,我們根本沒(méi)有 KeyContext。Forward Channel 里也沒(méi)有任何記錄被分配了任何 KeyGroup;也無(wú)法計(jì)算它,因?yàn)闊o(wú)法保證 Key仍然存在。

廣播 Connections

廣播 Connection 帶來(lái)了另一個(gè)問(wèn)題。無(wú)法保證所有 Channel 中的記錄都以相同的速率被消費(fèi)。這可能導(dǎo)致某些 Task 已經(jīng)應(yīng)用了與 特定廣播事件對(duì)應(yīng)的狀態(tài)變更,而其他任務(wù)則沒(méi)有,如圖所示。

Flink性能優(yōu)化小結(jié),flink,性能優(yōu)化,大數(shù)據(jù)

廣播分區(qū)通常用于實(shí)現(xiàn)廣播狀態(tài),它應(yīng)該跨所有 Operator 都相同。Flink 實(shí)現(xiàn)廣播狀態(tài),通過(guò)僅 Checkpointing 有狀態(tài)算子的 SubTask 0 中狀態(tài)的單份副本。在恢復(fù)時(shí),我們將該份副本發(fā)往所有的 Operator。因此,可能會(huì)發(fā)生以下情況:某個(gè)算子將很快從它的 Checkpointed Channel 消費(fèi)數(shù)據(jù)并將修改應(yīng)有于記錄來(lái)獲得狀態(tài)。

Flink性能優(yōu)化小結(jié),flink,性能優(yōu)化,大數(shù)據(jù)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-788792.html

到了這里,關(guān)于Flink性能優(yōu)化小結(jié)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink 內(nèi)容分享(四):Fink原理、實(shí)戰(zhàn)與性能優(yōu)化(四)

    目錄 Transformations Sink 分區(qū)策略 Transformations算子可以將一個(gè)或者多個(gè)算子轉(zhuǎn)換成一個(gè)新的數(shù)據(jù)流,使用Transformations算子組合可以處理復(fù)雜的業(yè)務(wù)處理。 Map DataStream → DataStream 遍歷數(shù)據(jù)流中的每一個(gè)元素,產(chǎn)生一個(gè)新的元素。 FlatMap DataStream → DataStream 遍歷數(shù)據(jù)流中的每一個(gè)元

    2024年02月03日
    瀏覽(55)
  • Flink CDC 2.3 發(fā)布,持續(xù)優(yōu)化性能,更多連接器支持增量快照,新增 Db2 支持

    Flink CDC 2.3 發(fā)布,持續(xù)優(yōu)化性能,更多連接器支持增量快照,新增 Db2 支持

    01 Flink CDC 簡(jiǎn)介 Flink CDC? [ 1] 是基于數(shù)據(jù)庫(kù)的日志 CDC 技術(shù),實(shí)現(xiàn)了全增量一體化讀取的數(shù)據(jù)集成框架。配合 Flink 優(yōu)秀的管道能力和豐富的上下游生態(tài),F(xiàn)link CDC 可以高效實(shí)現(xiàn)海量數(shù)據(jù)的實(shí)時(shí)集成。 作為新一代的實(shí)時(shí)數(shù)據(jù)集成框架,F(xiàn)link CDC 具有全增量一體化、無(wú)鎖讀取、并行讀

    2024年02月01日
    瀏覽(22)
  • 大數(shù)據(jù)Flink(五十二):Flink中的批和流以及性能比較

    大數(shù)據(jù)Flink(五十二):Flink中的批和流以及性能比較

    文章目錄 Flink中的批和流以及性能比較 ??????????????一、Flink中的批和流

    2024年02月15日
    瀏覽(15)
  • Flink實(shí)時(shí)大數(shù)據(jù)處理性能測(cè)試

    Flink是一個(gè)開(kāi)源的流處理框架,用于實(shí)時(shí)大數(shù)據(jù)處理。它可以處理大量數(shù)據(jù),提供低延遲和高吞吐量。Flink的性能測(cè)試是一項(xiàng)重要的任務(wù),可以幫助我們了解其在實(shí)際應(yīng)用中的表現(xiàn)。在本文中,我們將討論Flink實(shí)時(shí)大數(shù)據(jù)處理性能測(cè)試的背景、核心概念、算法原理、代碼實(shí)例、

    2024年03月18日
    瀏覽(27)
  • 實(shí)時(shí)Flink的數(shù)據(jù)庫(kù)與Kafka集成優(yōu)化案例

    在現(xiàn)代數(shù)據(jù)處理系統(tǒng)中,實(shí)時(shí)數(shù)據(jù)處理和分析是至關(guān)重要的。Apache Flink是一個(gè)流處理框架,可以用于實(shí)時(shí)數(shù)據(jù)處理和分析。在許多場(chǎng)景下,F(xiàn)link需要與數(shù)據(jù)庫(kù)和Kafka等消息系統(tǒng)進(jìn)行集成,以實(shí)現(xiàn)更高效的數(shù)據(jù)處理。本文將討論Flink與數(shù)據(jù)庫(kù)和Kafka集成的優(yōu)化案例,并提供實(shí)際示

    2024年02月20日
    瀏覽(29)
  • 滌生大數(shù)據(jù)實(shí)戰(zhàn):基于Flink+ODPS歷史累計(jì)計(jì)算項(xiàng)目分析與優(yōu)化(下)

    滌生大數(shù)據(jù)實(shí)戰(zhàn):基于Flink+ODPS歷史累計(jì)計(jì)算項(xiàng)目分析與優(yōu)化(下)

    滌生大數(shù)據(jù)實(shí)戰(zhàn):基于Flink+ODPS歷史累計(jì)計(jì)算項(xiàng)目分析與優(yōu)化(二) 問(wèn)題分析 在 ODPS計(jì)算期間 或者 odps表同步到hbase表期間,發(fā)生了查詢,會(huì)導(dǎo)致數(shù)據(jù)錯(cuò)誤。出現(xiàn)問(wèn)題的地方就是這兩個(gè)時(shí)間窗口:ODPS計(jì)算期間 和 odps表同步到hbase表期間。那就針對(duì)性分析,各個(gè)擊破。? 解決方案

    2024年03月27日
    瀏覽(37)
  • 【Flink精講】Flink性能調(diào)優(yōu):CPU核數(shù)與并行度

    【Flink精講】Flink性能調(diào)優(yōu):CPU核數(shù)與并行度

    提交任務(wù)命令: bin/flink run -t yarn-per-job -d -p 5 指定并行度 -Dyarn.application.queue=test 指定 yarn 隊(duì)列 -Djobmanager.memory.process.size=2048mb JM2~4G 足夠 -Dtaskmanager.memory.process.size=4096mb 單個(gè) TM2~8G 足夠 -Dtaskmanager.numberOfTaskSlots=2 與容器核數(shù) 1core: 1slot 或 2core: 1slot -c com.atguigu.flin

    2024年04月11日
    瀏覽(23)
  • Flink流批一體計(jì)算(7):Flink優(yōu)化

    目錄 配置內(nèi)存 設(shè)置并行度 操作場(chǎng)景 具體設(shè)置 補(bǔ)充 配置進(jìn)程參數(shù) 操作場(chǎng)景 具體配置 配置netty網(wǎng)絡(luò)通信 操作場(chǎng)景 具體配置 配置內(nèi)存 Flink 是依賴內(nèi)存計(jì)算,計(jì)算過(guò)程中內(nèi)存不夠?qū)?Flink 的執(zhí)行效率影響很大??梢酝ㄟ^(guò)監(jiān)控 GC ( Garbage Collection ),評(píng)估內(nèi)存使用及剩余情況來(lái)判

    2024年02月12日
    瀏覽(54)
  • Flink 內(nèi)容分享(十四):美團(tuán) Flink 資源調(diào)度優(yōu)化實(shí)踐

    Flink 內(nèi)容分享(十四):美團(tuán) Flink 資源調(diào)度優(yōu)化實(shí)踐

    目錄 相關(guān)背景和問(wèn)題 解決思路分析 資源調(diào)度優(yōu)化實(shí)踐 資源冗余申請(qǐng) 黑名單機(jī)制 故障節(jié)點(diǎn)感知策略 異常節(jié)點(diǎn)處理機(jī)制 規(guī)避慢節(jié)點(diǎn)場(chǎng)景 其他優(yōu)化 后續(xù)規(guī)劃 在計(jì)算規(guī)模方面,目前我們有 7w 多作業(yè),部署在 1.7w 臺(tái)機(jī)器上,高峰期流量達(dá)到每秒 9 億條。在部署方式上,目前我們

    2024年02月02日
    瀏覽(26)
  • 記一次Flink遇到性能瓶頸

    記一次Flink遇到性能瓶頸

    這周的主要時(shí)間花在Flink上面,做了一個(gè)簡(jiǎn)單的從文本文件中讀取數(shù)據(jù),然后存入數(shù)據(jù)庫(kù)的例子,能夠正常的實(shí)現(xiàn)功能,但是遇到個(gè)問(wèn)題,我有四臺(tái)機(jī)器,自己搭建了一個(gè)standalone的集群,不論我把并行度設(shè)置多少,跑起來(lái)的耗時(shí)都非常接近,實(shí)在是百思不得其解。機(jī)器多似乎

    2023年04月15日
    瀏覽(26)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包