-
jvm內(nèi)存優(yōu)化
-
內(nèi)存優(yōu)化
-
netty優(yōu)化
-
akka優(yōu)化
-
并行度優(yōu)化
-
對(duì)象重用
-
checkpoint優(yōu)化
-
網(wǎng)絡(luò)內(nèi)存調(diào)優(yōu)
-
狀態(tài)優(yōu)化
-
flink數(shù)據(jù)傾斜優(yōu)化
-
flink背壓
jvm內(nèi)存參數(shù)調(diào)優(yōu)
Flink是依賴內(nèi)存計(jì)算,計(jì)算過(guò)程中內(nèi)存不夠?qū)link的執(zhí)行效率影響很大。可以通過(guò)監(jiān)控GC(Garbage Collection),評(píng)估內(nèi)存使用及剩余情況來(lái)判斷內(nèi)存是否變成性能瓶頸,并根據(jù)情況優(yōu)化。
監(jiān)控節(jié)點(diǎn)進(jìn)程的YARN的Container GC日志,如果頻繁出現(xiàn)Full GC,需要優(yōu)化GC。
GC的配置:在客戶端的"conf/flink-conf.yaml"配置文件中,在“env.java.opts”配置項(xiàng)中添加參數(shù):
-Xloggc:<LOG_DIR>/gc.log
-XX:+PrintGCDetails
-XX:-OmitStackTraceInFastThrow
-XX:+PrintGCTimeStamps
-XX:+PrintGCDateStamps
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=20
-XX:GCLogFileSize=20M
此處默認(rèn)已經(jīng)添加GC日志。
調(diào)整老年代和新生代的比值。在客戶端的“conf/flink-conf.yaml”配置文件中,在“env.java.opts”配置項(xiàng)中添加參數(shù):“-XX:NewRatio”。如“ -XX:NewRatio=2”,則表示老年代與新生代的比值為2:1,新生代占整個(gè)堆空間的1/3,老年代占2/3。
可以通過(guò)設(shè)置?jobmanager.memory.enable-jvm-direct-memory-limit
?對(duì) JobManager 進(jìn)程的?JVM 直接內(nèi)存進(jìn)行限制
Flink內(nèi)存調(diào)優(yōu)
flink進(jìn)程內(nèi)存
?jobmanager相關(guān)配置:
taskamanger相關(guān)配置:
yarn相關(guān)的配置:
yarn.appmaster.vcores YARN應(yīng)用程序主機(jī)使用的虛擬核心(vcore)的數(shù)量。
yarn.containers.vcores 每個(gè)YARN容器的虛擬核心數(shù)(vcore)。默認(rèn)情況下,vcore數(shù)設(shè)置為每個(gè)TaskManager的插槽數(shù)(如果已設(shè)置),否則設(shè)置為1。為了使用此參數(shù),您的群集必須啟用CPU調(diào)度。您可以通過(guò)設(shè)置來(lái)做到這一點(diǎn)org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler。
yarn.scheduler.maximum-allocation-vcores
yarn.scheduler.minimum-allocation-vcores
Flink單個(gè)task manager的slot數(shù)量必須介于這兩個(gè)值之間
yarn.scheduler.maximum-allocation-mb
yarn.scheduler.minimum-allocation-mb
Flink的job manager 和task manager內(nèi)存不得超過(guò)container最大分配內(nèi)存大小。
yarn.nodemanager.resource.cpu-vcores yarn的虛擬CPU內(nèi)核數(shù),建議設(shè)置為物理CPU核心數(shù)的2-3倍,如果設(shè)置過(guò)少,會(huì)導(dǎo)致CPU資源無(wú)法被充分利用,跑任務(wù)的時(shí)候CPU占用率不高。
netty優(yōu)化
Netty Shuffle環(huán)境
Network Communication (via Netty)
akka優(yōu)化
akka.ask.callstack ???
捕獲異步請(qǐng)求的調(diào)用堆棧。注意,如果有數(shù)百萬(wàn)個(gè)并發(fā)RPC調(diào)用,這可能會(huì)增加內(nèi)存占用。
akka.ask.timeout
用于期望并阻止Akka呼叫的超時(shí)。如果Flink由于超時(shí)而失敗,則應(yīng)嘗試增加此值。超時(shí)可能是由于計(jì)算機(jī)運(yùn)行緩慢或網(wǎng)絡(luò)擁塞引起的。超時(shí)值需要一個(gè)時(shí)間單位說(shuō)明符(ms / s / min / h / d)。
akka.client-socket-worker-pool.pool-size-factor
池大小因子使用以下公式確定線程池大小:ceil(available processors * factor)。然后,結(jié)果大小受pool-size-min和pool-size-max值限制。
akka.client-socket-worker-pool.pool-size-max??
要限制基于因素的最大線程數(shù)。
akka.client-socket-worker-pool.pool-size-min??
最小線程數(shù)以上限為基礎(chǔ)。
akka.client.timeout 60s
客戶端上所有阻塞呼叫的超時(shí)。
akka.fork-join-executor.parallelism-factor??
并行度因子用于通過(guò)以下公式確定線程池大?。篶eil(available processors * factor)。然后,所得到的大小由并行度最小值和并行度最大值限制。
akka.fork-join-executor.parallelism-max
最大線程數(shù)上限為基于因子的并行數(shù)。
akka.fork-join-executor.parallelism-min??
最小線程數(shù)以基于因素的并行度為上限。
akka.framesize 10485760b(10MB)
在JobManager和TaskManager之間發(fā)送的消息的最大大小。如果Flink因消息超出此限制而失敗,則應(yīng)增加該限制。消息大小需要大小單位說(shuō)明符。
akka.fork-join-executor.parallelism-factor
并行度因子用于使用以下公式確定線程池大小:ceil(可用處理器*因子)。然后,結(jié)果大小由并行度最小值和并行度最大值限制。
akka.fork-join-executor.parallelism-max
基于并行度的最大線程數(shù)上限
akka.fork-join-executor.parallelism-min
基于并行度的最大線程數(shù)下限
akka.framesize
JobManager和TaskManager之間發(fā)送的最大消息大小。如果Flink失敗是因?yàn)橄⒊^(guò)了這個(gè)限制,那么您應(yīng)該增加它。消息大小需要大小單位說(shuō)明符。
akka.retry-gate-closed-for
遠(yuǎn)程連接斷開(kāi)后,閘門(mén)應(yīng)關(guān)閉幾毫秒。
akka.server-socket-worker-pool.pool-size-factor
池大小因子用于使用以下公式確定線程池大?。篶eil(可用處理器*因子)。然后,結(jié)果大小由池大小最小值和池大小最大值限定。
akka.server-socket-worker-pool.pool-size-max
基于上限因子的最大線程數(shù)。
akka.server-socket-worker-pool.pool-size-min
基于上限因子的最小線程數(shù)
akka.tcp.timeout
所有出站連接超時(shí)。如果由于網(wǎng)絡(luò)速度慢而在連接TaskManager時(shí)遇到問(wèn)題,則應(yīng)增加此值。
akka.startup-timeout
超時(shí)之后,遠(yuǎn)程組件的啟動(dòng)被視為失敗。
并行度優(yōu)化
當(dāng)分區(qū)導(dǎo)致數(shù)據(jù)傾斜時(shí),需要考慮優(yōu)化分區(qū)。避免非并行度操作,有些對(duì)DataStream的操作會(huì)導(dǎo)致無(wú)法并行,例如WindowAll。keyBy盡量不要使用String。
并行度控制任務(wù)的數(shù)量,影響操作后數(shù)據(jù)被切分成的塊數(shù)。調(diào)整并行度讓任務(wù)的數(shù)量和每個(gè)任務(wù)處理的數(shù)據(jù)與機(jī)器的處理能力達(dá)到最優(yōu)。查看CPU使用情況和內(nèi)存占用情況,當(dāng)任務(wù)和數(shù)據(jù)不是平均分布在各節(jié)點(diǎn),而是集中在個(gè)別節(jié)點(diǎn)時(shí),可以增大并行度使任務(wù)和數(shù)據(jù)更均勻的分布在各個(gè)節(jié)點(diǎn)。增加任務(wù)的并行度,充分利用集群機(jī)器的計(jì)算能力,一般并行度設(shè)置為集群CPU核數(shù)總和的2-3倍。
taskmanager個(gè)數(shù):
num_of_tm = ceil(parallelism / slot) 即并行度除以slot個(gè)數(shù),結(jié)果向上取整。
算子層面并行度設(shè)置:
通過(guò)調(diào)用setParallelism()方法來(lái)指定
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word?Count?Example");
執(zhí)行環(huán)境層次
Flink程序運(yùn)行在執(zhí)行環(huán)境中。執(zhí)行環(huán)境為所有執(zhí)行的算子、數(shù)據(jù)源、data sink定義了一個(gè)默認(rèn)的并行度。
執(zhí)行環(huán)境的默認(rèn)并行度可以通過(guò)調(diào)用setParallelism()方法指定。例如:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();
????env.execute("Word?Count?Example");
客戶端層次
并行度可以在客戶端將job提交到Flink時(shí)設(shè)定。對(duì)于CLI客戶端,可以通過(guò)“-p”參數(shù)指定并行度。例如:./bin/flink run -p 10 ../examples/WordCount-java.jar
對(duì)象重用
對(duì)象重用的本質(zhì)就是在算子鏈中的下游算子使用上游對(duì)象的淺拷貝。若關(guān)閉對(duì)象重用,則必須經(jīng)過(guò)一輪序列化和反序列化,相當(dāng)于深拷貝,所以就不能100%地發(fā)揮算子鏈的優(yōu)化效果。
但正所謂魚(yú)與熊掌不可兼得,若啟用了對(duì)象重用,那么我們的業(yè)務(wù)代碼中必然不能出現(xiàn)以下兩種情況,以免造成混亂:
-
在下游修改上游發(fā)射的對(duì)象,或者上游存入其State中的對(duì)象;
-
同一條流直接對(duì)接多個(gè)處理邏輯(如stream.map(new AFunc())的同時(shí)還有stream.map(new BFunc()))。
總之,在enableObjectReuse()之前,需要謹(jǐn)慎評(píng)估業(yè)務(wù)代碼是否會(huì)帶來(lái)副作用。社區(qū)大佬David Anderson曾在Stack Overflow上給出了一個(gè)簡(jiǎn)單明晰的回答,可參見(jiàn)這里。
env.getConfig().enableObjectReuse();
當(dāng)調(diào)用了 enableObjectReuse 方法后, Flink 會(huì)把中間深拷貝的步驟都省略掉,SourceFunction 產(chǎn)生的數(shù)據(jù)直接作為 MapFunction 的輸入,可以減少 gc 壓力。但需要特別注意的是,這個(gè)方法不能隨便調(diào)用,必須要確保下游 Function 只有一種(也就是一個(gè)流只會(huì)被一個(gè)算子處理),或者下游的多個(gè) Function 均不會(huì)改變對(duì)象內(nèi)部的值。否則可能會(huì)有線程安全的問(wèn)題。
checkpoint優(yōu)化
監(jiān)控checkpoint?
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/monitoring/checkpoint_monitoring/
Checkpoint 時(shí)間間隔,需要根據(jù)業(yè)務(wù)場(chǎng)景對(duì)時(shí)效性的要求而定。如果時(shí)效性要求不高,可以設(shè)置到分鐘級(jí)別,比如5分鐘、10分鐘;如果對(duì)時(shí)效性要求很高,結(jié)合 flink ?控制頁(yè)面 Checkpoints 的Summary 中的 End to End Duration,通過(guò)最大值、最小值和平均值,合理設(shè)置時(shí)間間隔。注意,時(shí)間間隔需要比 End to End Duration 的時(shí)間要長(zhǎng),否則,可能會(huì)導(dǎo)致上一個(gè) checkpoint 沒(méi)結(jié)束,下一個(gè) checkpoint 已經(jīng)開(kāi)始。為了避免這一情況的發(fā)生,除了設(shè)置時(shí)間間隔,兩次 checkpoint 的最小時(shí)間間隔也可以起到作用,該配置決定在上一次 checkpoint 結(jié)束之后,至少等待多長(zhǎng)時(shí)間開(kāi)始下一次的 checkpoint。
設(shè)置原則:
-
Checkpoint 時(shí)間間隔不易過(guò)大。一般來(lái)說(shuō),Checkpoint 時(shí)間間隔越長(zhǎng),需要生產(chǎn)的 State 就越大。如此一來(lái),當(dāng)失敗恢復(fù)時(shí),需要更長(zhǎng)的追趕時(shí)間。
-
Checkpoint 時(shí)間間隔不易過(guò)小。如果 Checkpoint 時(shí)間間隔太小,那么 Flink 應(yīng)用程序就會(huì)頻繁 Checkpoint,導(dǎo)致部分資源被占有,無(wú)法專(zhuān)注地進(jìn)行數(shù)據(jù)處理。
-
Checkpoint 時(shí)間間隔大于 Checkpoint 的生產(chǎn)時(shí)間。當(dāng) Checkpoint 時(shí)間間隔比 Checkpoint 生產(chǎn)時(shí)間長(zhǎng)時(shí),在上次 Checkpoint 完成時(shí),不會(huì)立刻進(jìn)行下一次 Checkpoint,而是會(huì)等待一段時(shí)間,之后再進(jìn)行新的 Checkpoint。否則,每次 Checkpoint 完成時(shí),就會(huì)立即開(kāi)始下一次 Checkpoint,系統(tǒng)會(huì)有很多資源被 Checkpoint 占用,而真正任務(wù)計(jì)算的資源就會(huì)變少。
-
開(kāi)啟本地恢復(fù)。如果 Flink State 很大,在進(jìn)行恢復(fù)時(shí),需要從遠(yuǎn)程存儲(chǔ)上讀取 State 進(jìn)行恢復(fù),如果 State 文件過(guò)大,此時(shí)可能導(dǎo)致任務(wù)恢復(fù)很慢,大量的時(shí)間浪費(fèi)在網(wǎng)絡(luò)傳輸方面。此時(shí)可以設(shè)置 Flink 應(yīng)用程序本地 State 恢復(fù),應(yīng)用程序 State 本地恢復(fù)默認(rèn)沒(méi)有開(kāi)啟,可以設(shè)置參數(shù) state.backend.local-recovery 值為 true 進(jìn)行激活。
-
設(shè)置 Checkpoint 保存數(shù)。Checkpoint 保存數(shù)默認(rèn)是 1,也就是只保存最新的 Checkpoint 的 State 文件,當(dāng)進(jìn)行 State 恢復(fù)時(shí),如果最新的 Checkpoint 文件不可用時(shí) (比如文件損壞或者其他原因),那么 State 恢復(fù)就會(huì)失敗,如果設(shè)置 Checkpoint 保存數(shù) 3,即使最新的 Checkpoint 恢復(fù)失敗,那么 Flink 也會(huì)回滾到上一次 Checkpoint 的狀態(tài)文件進(jìn)行恢復(fù)??紤]到這種情況,可以通過(guò) state.checkpoints.num-retained 設(shè)置 Checkpoint 保存數(shù)。
// 使? RocksDBStateBackend 做為狀態(tài)后端,并開(kāi)啟增量 Checkpoint
RocksDBStateBackend rocksDBStateBackend = new
RocksDBStateBackend("hdfs://hadoop01:8020/flink/checkpoints", true);
env.setStateBackend(rocksDBStateBackend);
// 開(kāi)啟 Checkpoint,間隔為 1 分鐘
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1));
// 配置 Checkpoint
CheckpointConfig checkpointConf = env.getCheckpointConfig();
checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 最小間隔 2 分鐘
checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(2))
// 超時(shí)時(shí)間 10 分鐘
checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
// 保存 checkpoint
checkpointConf.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
配置 task 本地恢復(fù)
Task 本地恢復(fù)?默認(rèn)禁用,可以通過(guò) Flink 的?CheckpointingOptions.LOCAL_RECOVERY?配置中指定的鍵 state.backend.local-recovery 來(lái)啟用。此設(shè)置的值可以是?true?以啟用或?false(默認(rèn))以禁用本地恢復(fù)。
注意,unaligned checkpoints 目前不支持 task 本地恢復(fù)。
參考公眾號(hào)鏈接:flink狀態(tài)調(diào)優(yōu)
參考官網(wǎng):
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/state/large_state_tuning/
網(wǎng)絡(luò)內(nèi)存調(diào)優(yōu)
緩沖消脹機(jī)制(Buffer Debloating)
緩沖消脹機(jī)制嘗試通過(guò)自動(dòng)調(diào)整緩沖數(shù)據(jù)量到一個(gè)合理值來(lái)解決這個(gè)問(wèn)題。
緩沖消脹功能計(jì)算 subtask 可能達(dá)到的最大吞吐(始終保持繁忙狀態(tài)時(shí))并且通過(guò)調(diào)整緩沖數(shù)據(jù)量來(lái)使得數(shù)據(jù)的消費(fèi)時(shí)間達(dá)到配置值。
可以通過(guò)設(shè)置?t
askmanager.network.memory.buffer-debloat.enabled
?為?true
?來(lái)開(kāi)啟緩沖消脹機(jī)制。通過(guò)設(shè)置?taskmanager.network.memory.buffer-debloat.target
?為?duration
?類(lèi)型的值來(lái)指定消費(fèi)緩沖數(shù)據(jù)的目標(biāo)時(shí)間。默認(rèn)值應(yīng)該能滿足大多數(shù)場(chǎng)景。
這個(gè)功能使用過(guò)去的吞吐數(shù)據(jù)來(lái)預(yù)測(cè)消費(fèi)剩余緩沖數(shù)據(jù)的時(shí)間。如果預(yù)測(cè)不準(zhǔn),緩沖消脹機(jī)制會(huì)導(dǎo)致以下問(wèn)題:
-
沒(méi)有足夠的緩存數(shù)據(jù)來(lái)提供全量吞吐。
-
有太多緩沖數(shù)據(jù)對(duì) checkpoint barrier 推進(jìn)或者非對(duì)齊的 checkpoint 的大小造成不良影響。
如果您的作業(yè)負(fù)載經(jīng)常變化(即,突如其來(lái)的數(shù)據(jù)尖峰,定期的窗口聚合觸發(fā)或者 join ),您可能需要調(diào)整以下設(shè)置:
-
taskmanager.network.memory.buffer-debloat.period:這是緩沖區(qū)大小重算的最小時(shí)間周期。周期越小,緩沖消脹機(jī)制的反應(yīng)時(shí)間就越快,但是必要的計(jì)算會(huì)消耗更多的CPU。
-
taskmanager.network.memory.buffer-debloat.samples:調(diào)整用于計(jì)算平均吞吐量的采樣數(shù)。采集樣本的頻率可以通過(guò) taskmanager.network.memory.buffer-debloat.period 來(lái)設(shè)置。樣本數(shù)越少,緩沖消脹機(jī)制的反應(yīng)時(shí)間就越快,但是當(dāng)吞吐量突然飆升或者下降時(shí),緩沖消脹機(jī)制計(jì)算的最佳緩沖數(shù)據(jù)量會(huì)更容易出錯(cuò)。
-
taskmanager.network.memory.buffer-debloat.threshold-percentages:防止緩沖區(qū)大小頻繁改變的優(yōu)化(比如,新的大小跟舊的大小相差不大)。
您可以使用以下指標(biāo)來(lái)監(jiān)控當(dāng)前的緩沖區(qū)大?。?/p>
-
estimatedTimeToConsumeBuffersMs
:消費(fèi)所有輸入通道(input channel)中數(shù)據(jù)的總時(shí)間。 -
debloatedBufferSize
:當(dāng)前的緩沖區(qū)大小。
限制
多個(gè)輸入和合并
當(dāng)前,吞吐計(jì)算和緩沖消脹發(fā)生在 subtask 層面。
如果您的 subtask 有很多不同的輸入或者有一個(gè)合并的輸入,緩沖消脹可能會(huì)導(dǎo)致低吞吐的輸入有太多緩沖數(shù)據(jù),而高吞吐輸入的緩沖區(qū)數(shù)量可能太少而不夠維持當(dāng)前吞吐。當(dāng)不同的輸入吞吐差別比較大時(shí),這種現(xiàn)象會(huì)更加的明顯。我們推薦您在測(cè)試這個(gè)功能時(shí)重點(diǎn)關(guān)注這種 subtask。
緩沖區(qū)的尺寸和個(gè)數(shù)
當(dāng)前,緩沖消脹僅在使用的緩沖區(qū)大小上設(shè)置上限。實(shí)際的緩沖區(qū)大小和個(gè)數(shù)保持不變。這意味著緩沖消脹機(jī)制不會(huì)減少作業(yè)的內(nèi)存使用。您應(yīng)該手動(dòng)減少緩沖區(qū)的大小或者個(gè)數(shù)。
此外,如果您想減少緩沖數(shù)據(jù)量使其低于緩沖消脹當(dāng)前允許的量,您可能需要手動(dòng)的設(shè)置緩沖區(qū)的個(gè)數(shù)。
高并行度
目前,使用默認(rèn)配置,緩沖區(qū)去塊機(jī)制可能無(wú)法在高并行度(約200以上)下正確執(zhí)行。如果您觀察到吞吐量降低或檢查點(diǎn)時(shí)間高于預(yù)期,我們建議將浮動(dòng)緩沖區(qū)(taskmanager.network.memory.foating buffers per gate)的數(shù)量從默認(rèn)值增加到至少等于并行度的數(shù)量。
發(fā)生問(wèn)題的并行度的實(shí)際值因作業(yè)而異,但通常應(yīng)該超過(guò)幾百。
網(wǎng)絡(luò)緩沖生命周期?
Flink 有多個(gè)本地緩沖區(qū)池 —— 每個(gè)輸出和輸入流對(duì)應(yīng)一個(gè)。每個(gè)緩沖區(qū)池的大小被限制為
channels?*?taskmanager.network.memory.buffers-per-channel?+?taskmanager.network.memory.floating-buffers-per-gate
緩沖區(qū)的大小可以通過(guò)?taskmanager.memory.segment-size
?來(lái)設(shè)置。
輸入網(wǎng)絡(luò)緩沖?
輸入通道中的緩沖區(qū)被分為獨(dú)占緩沖區(qū)(exclusive buffer)和流動(dòng)緩沖區(qū)(floating buffer)。每個(gè)獨(dú)占緩沖區(qū)只能被一個(gè)特定的通道使用。一個(gè)通道可以從輸入流的共享緩沖區(qū)池中申請(qǐng)額外的流動(dòng)緩沖區(qū)。剩余的流動(dòng)緩沖區(qū)是可選的并且只有資源足夠的時(shí)候才能獲取。
在初始階段:
-
Flink 會(huì)為每一個(gè)輸入通道獲取配置數(shù)量的獨(dú)占緩沖區(qū)。
-
所有的獨(dú)占緩沖區(qū)都必須被滿足,否則作業(yè)會(huì)拋異常失敗。
-
Flink 至少要有一個(gè)流動(dòng)緩沖區(qū)才能運(yùn)行。
輸出網(wǎng)絡(luò)緩沖?
不像輸入緩沖區(qū)池,輸出緩沖區(qū)池只有一種類(lèi)型的緩沖區(qū)被所有的 subpartitions 共享。
為了避免過(guò)多的數(shù)據(jù)傾斜,每個(gè) subpartition 的緩沖區(qū)數(shù)量可以通過(guò)?taskmanager.network.memory.max-buffers-per-channel
?來(lái)限制。
不同于輸入緩沖區(qū)池,這里配置的獨(dú)占緩沖區(qū)和流動(dòng)緩沖區(qū)只被當(dāng)作推薦值。如果沒(méi)有足夠的緩沖區(qū),每個(gè)輸出 subpartition 可以只使用一個(gè)獨(dú)占緩沖區(qū)而沒(méi)有流動(dòng)緩沖區(qū)。
透支緩沖區(qū)(Overdraft buffers)?
另外,每個(gè) subtask 輸出數(shù)據(jù)時(shí)可以至多請(qǐng)求?taskmanager.network.memory.max-overdraft-buffers-per-gate
?(默認(rèn) 5)個(gè)額外的透支緩沖區(qū)(overdraft buffers)。只有當(dāng)前 subtask 被下游 subtasks 反壓且當(dāng)前 subtask 需要 請(qǐng)求超過(guò) 1 個(gè)網(wǎng)絡(luò)緩沖區(qū)(network buffer)才能完成當(dāng)前的操作時(shí),透支緩沖區(qū)才會(huì)被使用??赡馨l(fā)生在以下情況:
-
序列化非常大的 records,不能放到單個(gè)網(wǎng)絡(luò)緩沖區(qū)中。
-
類(lèi)似 flatmap 的算子,即:處理單個(gè) record 時(shí)可能會(huì)生產(chǎn)多個(gè) records。
-
周期性地或某些事件觸發(fā)產(chǎn)生大量 records 的算子(例如:
WindowOperator
?的觸發(fā))。
在這些情況下,如果沒(méi)有透支緩沖區(qū),F(xiàn)link 的 subtask 線程會(huì)被阻塞在反壓,從而阻止例如 Unaligned Checkpoint 的完成。為了緩解這種情況,增加了透支緩沖區(qū)的概念。這些透支緩沖區(qū)是可選的,F(xiàn)link 可以僅僅使用常規(guī)的緩沖區(qū)逐漸取得進(jìn)展,也就是 說(shuō)?0
?是?taskmanager.network.memory.max-overdraft-buffers-per-gate
?可以接受的配置值。
緩沖區(qū)的數(shù)量?
獨(dú)占緩沖區(qū)和流動(dòng)緩沖區(qū)的默認(rèn)配置應(yīng)該足以應(yīng)對(duì)最大吞吐。如果想要最小化緩沖數(shù)據(jù)量,那么可以將獨(dú)占緩沖區(qū)設(shè)置為?0
,同時(shí)減小內(nèi)存段的大小。
選擇緩沖區(qū)的大小?
在往下游 subtask 發(fā)送數(shù)據(jù)部分時(shí),緩沖區(qū)通過(guò)匯集 record 來(lái)優(yōu)化網(wǎng)絡(luò)開(kāi)銷(xiāo)。下游 subtask 應(yīng)該在接收到完整的 record 后才開(kāi)始處理它。
如果緩沖區(qū)太小,或者緩沖區(qū)刷新太頻繁(execution.buffer-timeout配置參數(shù)),這可能會(huì)導(dǎo)致吞吐量降低,因?yàn)樵贔link的運(yùn)行時(shí),每個(gè)緩沖區(qū)的開(kāi)銷(xiāo)明顯高于每個(gè)記錄的開(kāi)銷(xiāo)。
根據(jù)經(jīng)驗(yàn),我們不建議考慮增加緩沖區(qū)大小或緩沖區(qū)超時(shí),除非您可以在實(shí)際工作負(fù)載中觀察到網(wǎng)絡(luò)瓶頸(下游操作員空閑、上游背壓、輸出緩沖區(qū)隊(duì)列已滿、下游輸入隊(duì)列為空)。
如果緩沖區(qū)太大,會(huì)導(dǎo)致:
-
內(nèi)存使用高
-
大量的 checkpoint 數(shù)據(jù)量(針對(duì)非對(duì)齊的 checkpoints)
-
漫長(zhǎng)的 checkpoint 時(shí)間(針對(duì)對(duì)齊的 checkpoints)
-
execution.buffer-timeout
?較小時(shí)內(nèi)存分配使用率會(huì)比較低,因?yàn)榫彌_區(qū)還沒(méi)被塞滿數(shù)據(jù)就被發(fā)送下去了。
選擇緩沖區(qū)的數(shù)量?
緩沖區(qū)的數(shù)量是通過(guò)?taskmanager.network.memory.buffers-per-channel
?和?taskmanager.network.memory.floating-buffers-per-gate
?來(lái)配置的。
為了最好的吞吐率,我們建議使用獨(dú)占緩沖區(qū)和流動(dòng)緩沖區(qū)的默認(rèn)值(except you have one of limit cases)。如果緩沖數(shù)據(jù)量存在問(wèn)題,更建議打開(kāi)緩沖消脹。
您可以人工地調(diào)整網(wǎng)絡(luò)緩沖區(qū)的個(gè)數(shù),但是需要注意:
-
您應(yīng)該根據(jù)期待的吞吐量(單位?
bytes/second
)來(lái)調(diào)整緩沖區(qū)的數(shù)量。協(xié)調(diào)數(shù)據(jù)傳輸量(大約兩個(gè)節(jié)點(diǎn)之間的兩個(gè)往返消息)。延遲也取決于您的網(wǎng)絡(luò)。
使用 buffer 往返時(shí)間(大概?1ms
?在正常的本地網(wǎng)絡(luò)中),緩沖區(qū)大小和期待的吞吐,您可以通過(guò)下面的公式計(jì)算維持吞吐所需要的緩沖區(qū)數(shù)量:
number_of_buffers = expected_throughput * buffer_roundtrip / buffer_size
比如,期待吞吐為?320MB/s
,往返延遲為?1ms
,內(nèi)存段為默認(rèn)大小,為了維持吞吐需要使用10個(gè)活躍的緩沖區(qū):
number_of_buffers = 320MB/s * 1ms / 32KB = 10
-
流動(dòng)緩沖區(qū)的目的是為了處理數(shù)據(jù)傾斜。理想情況下,流動(dòng)緩沖區(qū)的數(shù)量(默認(rèn)8個(gè))和每個(gè)通道獨(dú)占緩沖區(qū)的數(shù)量(默認(rèn)2個(gè))能夠使網(wǎng)絡(luò)吞吐量飽和。但這并不總是可行和必要的。所有 subtask 中只有一個(gè)通道被使用也是非常罕見(jiàn)的。
-
獨(dú)占緩沖區(qū)的目的是提供一個(gè)流暢的吞吐量。當(dāng)一個(gè)緩沖區(qū)在傳輸數(shù)據(jù)時(shí),另一個(gè)緩沖區(qū)被填充。當(dāng)吞吐量比較高時(shí),獨(dú)占緩沖區(qū)的數(shù)量是決定 Flink 中緩沖數(shù)據(jù)的主要因素。
當(dāng)?shù)屯掏铝肯鲁霈F(xiàn)反壓時(shí),您應(yīng)該考慮減少獨(dú)占緩沖區(qū)。
總結(jié):
可以通過(guò)開(kāi)啟緩沖消脹機(jī)制來(lái)簡(jiǎn)化 Flink 網(wǎng)絡(luò)的內(nèi)存配置調(diào)整。您也可能需要調(diào)整它。
如果這不起作用,您可以關(guān)閉緩沖消脹機(jī)制并且人工地配置內(nèi)存段的大小和緩沖區(qū)個(gè)數(shù)。針對(duì)第二種場(chǎng)景,我們推薦:
-
使用默認(rèn)值以獲得最大吞吐
-
減少內(nèi)存段大小、獨(dú)占緩沖區(qū)的數(shù)量來(lái)加快 checkpoint 并減少網(wǎng)絡(luò)棧消耗的內(nèi)存量
flink狀態(tài)優(yōu)化
使用rocksdb狀態(tài)后端開(kāi)啟增量檢查點(diǎn)
Tuning MemTable
memtable 作為 LSM Tree 體系里的讀寫(xiě)緩存,對(duì)寫(xiě)性能有較大的影響。以下是一些值得注意的參數(shù)。為方便對(duì)比,下文都會(huì)將 RocksDB 的原始參數(shù)名與 Flink 配置中的參數(shù)名一并列出,用豎線分割。
-
write_buffer_size | state.backend.rocksdb.writebuffer.size單個(gè) memtable 的大小,默認(rèn)是64MB。當(dāng) memtable 大小達(dá)到此閾值時(shí),就會(huì)被標(biāo)記為不可變。一般來(lái)講,適當(dāng)增大這個(gè)參數(shù)可以減小寫(xiě)放大帶來(lái)的影響,但同時(shí)會(huì)增大 flush 后 L0、L1 層的壓力,所以還需要配合修改 compaction 參數(shù),后面再提。
-
max_write_buffer_number | state.backend.rocksdb.writebuffer.countmemtable?的最大數(shù)量(包含活躍的和不可變的),默認(rèn)是2。當(dāng)全部 memtable 都寫(xiě)滿但是 flush 速度較慢時(shí),就會(huì)造成寫(xiě)停頓,所以如果內(nèi)存充足或者使用的是機(jī)械硬盤(pán),建議適當(dāng)調(diào)大這個(gè)參數(shù),如4。
-
min_write_buffer_number_to_merge | state.backend.rocksdb.writebuffer.number-to-merge在 flush 發(fā)生之前被合并的 memtable 最小數(shù)量,默認(rèn)是1。舉個(gè)例子,如果此參數(shù)設(shè)為2,那么當(dāng)有至少兩個(gè)不可變 memtable 時(shí),才有可能觸發(fā) flush(亦即如果只有一個(gè)不可變 memtable,就會(huì)等待)。調(diào)大這個(gè)值的好處是可以使更多的更改在 flush 前就被合并,降低寫(xiě)放大,但同時(shí)又可能增加讀放大,因?yàn)樽x取數(shù)據(jù)時(shí)要檢查的 memtable 變多了。經(jīng)測(cè)試,該參數(shù)設(shè)為2或3相對(duì)較好。
Tuning Block/Block Cache
block 是 sstable 的基本存儲(chǔ)單位。block cache 則扮演讀緩存的角色,采用 LRU 算法存儲(chǔ)最近使用的 block,對(duì)讀性能有較大的影響。
-
block_size | state.backend.rocksdb.block.blocksizeblock 的大小,默認(rèn)值為4KB。在生產(chǎn)環(huán)境中總是會(huì)適當(dāng)調(diào)大一些,一般32KB比較合適,對(duì)于機(jī)械硬盤(pán)可以再增大到128~256KB,充分利用其順序讀取能力。但是需要注意,如果 block 大小增大而 block cache 大小不變,那么緩存的 block 數(shù)量會(huì)減少,無(wú)形中會(huì)增加讀放大。
-
block_cache_size | state.backend.rocksdb.block.cache-sizeblock cache 的大小,默認(rèn)為8MB。由上文所述的讀寫(xiě)流程可知,較大的 block cache 可以有效避免熱數(shù)據(jù)的讀請(qǐng)求落到 sstable 上,所以若內(nèi)存余量充足,建議設(shè)置到128MB甚至256MB,讀性能會(huì)有非常明顯的提升。
Tuning Compaction
compaction 在所有基于 LSM Tree 的存儲(chǔ)引擎中都是開(kāi)銷(xiāo)最大的操作,弄不好的話會(huì)非常容易阻塞讀寫(xiě)。建議看官先讀讀前面那篇關(guān)于 RocksDB 的 compaction 策略的文章,獲取一些背景知識(shí),這里不再贅述。
-
compaction_style | state.backend.rocksdb.compaction.stylecompaction 算法,使用默認(rèn)的 LEVEL(即 leveled compaction)即可,下面的參數(shù)也是基于此。
-
target_file_size_base | state.backend.rocksdb.compaction.level.target-file-size-baseL1層單個(gè) sstable 文件的大小閾值,默認(rèn)值為64MB。每向上提升一級(jí),閾值會(huì)乘以因子 target_file_size_multiplier(但默認(rèn)為1,即每級(jí)sstable最大都是相同的)。顯然,增大此值可以降低 compaction 的頻率,減少寫(xiě)放大,但是也會(huì)造成舊數(shù)據(jù)無(wú)法及時(shí)清理,從而增加讀放大。此參數(shù)不太容易調(diào)整,一般不建議設(shè)為256MB以上。
-
max_bytes_for_level_base | state.backend.rocksdb.compaction.level.max-size-level-baseL1層的數(shù)據(jù)總大小閾值,默認(rèn)值為256MB。每向上提升一級(jí),閾值會(huì)乘以因子 max_bytes_for_level_multiplier(默認(rèn)值為10)。由于上層的大小閾值都是以它為基礎(chǔ)推算出來(lái)的,所以要小心調(diào)整。建議設(shè)為 target_file_size_base 的倍數(shù),且不能太小,例如5~10倍。
-
level_compaction_dynamic_level_bytes | state.backend.rocksdb.compaction.level.use-dynamic-size這個(gè)參數(shù)之前講過(guò)。當(dāng)開(kāi)啟之后,上述閾值的乘法因子會(huì)變成除法因子,能夠動(dòng)態(tài)調(diào)整每層的數(shù)據(jù)量閾值,使得較多的數(shù)據(jù)可以落在最高一層,能夠減少空間放大,整個(gè) LSM Tree 的結(jié)構(gòu)也會(huì)更穩(wěn)定。對(duì)于機(jī)械硬盤(pán)的環(huán)境,強(qiáng)烈建議開(kāi)啟。
Generic Parameters
-
max_open_files | state.backend.rocksdb.files.open顧名思義,是 RocksDB 實(shí)例能夠打開(kāi)的最大文件數(shù),默認(rèn)為-1,表示不限制。由于sstable的索引和布隆過(guò)濾器默認(rèn)都會(huì)駐留內(nèi)存,并占用文件描述符,所以如果此值太小,索引和布隆過(guò)濾器無(wú)法正常加載,就會(huì)嚴(yán)重拖累讀取性能。
-
max_background_compactions/max_background_flushes|state.backend.rocksdb.thread.num
后臺(tái)負(fù)責(zé) flush 和 compaction 的最大并發(fā)線程數(shù),默認(rèn)為1。注意 Flink 將這兩個(gè)參數(shù)合二為一處理(對(duì)應(yīng) DBOptions.setIncreaseParallelism() 方法),鑒于 flush 和 compaction 都是相對(duì)重的操作,如果 CPU 余量比較充足,建議調(diào)大,在我們的實(shí)踐中一般設(shè)為4。
參考公眾號(hào)鏈接:flink狀態(tài)調(diào)優(yōu)
flink數(shù)據(jù)傾斜優(yōu)化
?參考公眾號(hào)鏈接:flink 數(shù)據(jù)傾斜的常見(jiàn)處理方式
背壓優(yōu)化
?反壓監(jiān)控指標(biāo):
-
backPressureTimeMsPerSecond
,subtask 被反壓的時(shí)間 -
idleTimeMsPerSecond
,subtask 等待某類(lèi)處理的時(shí)間 -
busyTimeMsPerSecond
,subtask 實(shí)際工作時(shí)間 在任何時(shí)間點(diǎn),這三個(gè)指標(biāo)相加都約等于1000ms
。
web ui觀測(cè):
閑置的 tasks 為藍(lán)色,完全被反壓的 tasks 為黑色,完全繁忙的 tasks 被標(biāo)記為紅色。中間的所有值都表示為這三種顏色之間的過(guò)渡色。
Job Overview觀測(cè)反壓狀態(tài):
常見(jiàn)解決方式:
-
消除背壓源頭,通過(guò)優(yōu)化 Flink 作業(yè),通過(guò)調(diào)整 Flink 或 JVM 參數(shù),或是通過(guò)擴(kuò)容。
-
減少 Flink 作業(yè)中緩沖在 In-flight 數(shù)據(jù)的數(shù)據(jù)量。
-
啟用非對(duì)齊 Checkpoints。這些選項(xiàng)并不是互斥的,可以組合在一起。本文檔重點(diǎn)介紹后兩個(gè)選項(xiàng)。
-
禁用/合并算子鏈chain或者資源槽共享
-
先贊批,再寫(xiě)入(滿足實(shí)時(shí)性要求的情況下,異步 io + 熱緩存來(lái)優(yōu)化讀寫(xiě)性能
-
增加并行度,增加資源。checkpoint時(shí)長(zhǎng)合理設(shè)置
緩沖區(qū) Debloating
Flink 1.14 引入了一個(gè)新的工具,用于自動(dòng)控制在 Flink 算子/子任務(wù)之間緩沖的 In-flight 數(shù)據(jù)的數(shù)據(jù)量。緩沖區(qū) Debloating 機(jī) 制可以通過(guò)將屬性taskmanager.network.memory.buffer-debloat.enabled
設(shè)置為true
來(lái)啟用。
此特性對(duì)對(duì)齊和非對(duì)齊 Checkpoint 都生效,并且在這兩種情況下都能縮短 Checkpointing 的時(shí)間,不過(guò) Debloating 的效果對(duì)于 對(duì)齊 Checkpoint 最明顯。當(dāng)在非對(duì)齊 Checkpoint 情況下使用緩沖區(qū) Debloating 時(shí),額外的好處是 Checkpoint 大小會(huì)更小,并且恢復(fù)時(shí)間更快 (需要保存 和恢復(fù)的 In-flight 數(shù)據(jù)更少)。
非對(duì)齊checkpoint
從Flink 1.11開(kāi)始,Checkpoint 可以是非對(duì)齊的。Unaligned checkpoints 包含 In-flight 數(shù)據(jù)(例如,存儲(chǔ)在緩沖區(qū)中的數(shù)據(jù))作為 Checkpoint State的一部分,允許 Checkpoint Barrier 跨越這些緩沖區(qū)。因此, Checkpoint 時(shí)長(zhǎng)變得與當(dāng)前吞吐量無(wú)關(guān),因?yàn)?Checkpoint Barrier 實(shí)際上已經(jīng)不再嵌入到數(shù)據(jù)流當(dāng)中。
如果您的 Checkpointing 由于背壓導(dǎo)致周期非常的長(zhǎng),您應(yīng)該使用非對(duì)齊 Checkpoint。這樣,Checkpointing 時(shí)間基本上就與 端到端延遲無(wú)關(guān)。請(qǐng)注意,非對(duì)齊 Checkpointing 會(huì)增加狀態(tài)存儲(chǔ)的 I/O,因此當(dāng)狀態(tài)存儲(chǔ)的 I/O 是 整個(gè) Checkpointing 過(guò)程當(dāng)中真 正的瓶頸時(shí),您不應(yīng)當(dāng)使用非對(duì)齊 Checkpointing。
為了啟用非對(duì)齊 Checkpoint,您可以:
// 啟用非對(duì)齊 Checkpoint
env.getCheckpointConfig().enableUnalignedCheckpoints();
限制?
并發(fā) Checkpoint?
Flink 當(dāng)前并不支持并發(fā)的非對(duì)齊 Checkpoint。然而,由于更可預(yù)測(cè)的和更短的 Checkpointing 時(shí)長(zhǎng),可能也根本就不需要并發(fā)的 Checkpoint。此外,Savepoint 也不能與非對(duì)齊 Checkpoint 同時(shí)發(fā)生,因此它們將會(huì)花費(fèi)稍長(zhǎng)的時(shí)間。
與 Watermark 的相互影響?
非對(duì)齊 Checkpoint 在恢復(fù)的過(guò)程中改變了關(guān)于 Watermark 的一個(gè)隱式保證。目前,F(xiàn)link 確保了 Watermark 作為恢復(fù)的第一步, 而不是將最近的 Watermark 存放在 Operator 中,以方便擴(kuò)縮容。在非對(duì)齊 Checkpoint 中,這意味著當(dāng)恢復(fù)時(shí),F(xiàn)link 會(huì)在恢復(fù) In-flight 數(shù)據(jù)后再生成 Watermark。如果您的 Pipeline 中使用了對(duì)每條記錄都應(yīng)用最新的 Watermark 的算子將會(huì)相對(duì)于 使用對(duì)齊 Checkpoint產(chǎn)生不同的結(jié)果。如果您的 Operator 依賴于最新的 Watermark 始終可用,解決辦法是將 Watermark 存放在 OperatorState 中。在這種情況下,Watermark 應(yīng)該使用單鍵 group 存放在 UnionState 以方便擴(kuò)縮容。
?與長(zhǎng)時(shí)間運(yùn)行的記錄處理相互作用
即使非對(duì)齊的檢查點(diǎn),但障礙物能夠超過(guò)隊(duì)列中的所有其他記錄。如果當(dāng)前記錄需要大量時(shí)間來(lái)處理,則該屏障的處理仍可能延遲。這種情況可能發(fā)生在同時(shí)觸發(fā)多個(gè)計(jì)時(shí)器時(shí),例如在窗口操作中。當(dāng)處理單個(gè)輸入記錄時(shí),系統(tǒng)被阻止等待多個(gè)網(wǎng)絡(luò)緩沖區(qū)可用性時(shí),可能會(huì)出現(xiàn)第二種問(wèn)題。Flink不能中斷單個(gè)輸入記錄的處理,未對(duì)齊的檢查點(diǎn)必須等待當(dāng)前處理的記錄被完全處理。這可能在兩種情況下造成問(wèn)題。由于串行化了一個(gè)不適合單個(gè)網(wǎng)絡(luò)緩沖區(qū)的大記錄,或者在flatMap操作中,一個(gè)輸入記錄產(chǎn)生了多個(gè)輸出記錄。在這種情況下,背壓可以阻止未對(duì)齊的檢查點(diǎn),直到處理單個(gè)輸入記錄所需的所有網(wǎng)絡(luò)緩沖區(qū)都可用。在處理單個(gè)記錄需要一段時(shí)間的任何其他情況下,也可能發(fā)生這種情況。因此,檢查點(diǎn)的時(shí)間可能比預(yù)期的要長(zhǎng),也可能會(huì)有所不同。?
某些數(shù)據(jù)分布模式?jīng)]有檢查點(diǎn)
有一部分包含屬性的的連接無(wú)法與 Channel 中的數(shù)據(jù)一樣保存在 Checkpoint 中。為了保留這些特性并且確保沒(méi)有狀態(tài)沖突或 非預(yù)期的行為,非對(duì)齊 Checkpoint 對(duì)于這些類(lèi)型的連接是禁用的。所有其他的交換仍然執(zhí)行非對(duì)齊 Checkpoint。
?點(diǎn)對(duì)點(diǎn)連接
我們目前沒(méi)有任何對(duì)于點(diǎn)對(duì)點(diǎn)連接中有關(guān)數(shù)據(jù)有序性的強(qiáng)保證。然而,由于數(shù)據(jù)已經(jīng)被以前置的 Source 或是 KeyBy 相同的方式隱式 組織,一些用戶會(huì)依靠這種特性在提供的有序性保證的同時(shí)將計(jì)算敏感型的任務(wù)劃分為更小的塊。
只要并行度不變,非對(duì)齊 Checkpoint(UC) 將會(huì)保留這些特性。但是如果加上UC的伸縮容,這些特性將會(huì)被改變。
如果我們想將并行度從 p=2 擴(kuò)容到 p=3,那么需要根據(jù) KeyGroup 將 KeyBy 的 Channel 中的數(shù)據(jù)突然的劃分到3個(gè) Channel 中去。這 很容易做到,通過(guò)使用 Operator 的 KeyGroup 范圍和確定記錄屬于某個(gè) Key(group) 的方法(不管實(shí)際使用的是什么方法)。對(duì)于 Forward 的 Channel,我們根本沒(méi)有 KeyContext。Forward Channel 里也沒(méi)有任何記錄被分配了任何 KeyGroup;也無(wú)法計(jì)算它,因?yàn)闊o(wú)法保證 Key仍然存在。
廣播 Connections
廣播 Connection 帶來(lái)了另一個(gè)問(wèn)題。無(wú)法保證所有 Channel 中的記錄都以相同的速率被消費(fèi)。這可能導(dǎo)致某些 Task 已經(jīng)應(yīng)用了與 特定廣播事件對(duì)應(yīng)的狀態(tài)變更,而其他任務(wù)則沒(méi)有,如圖所示。
廣播分區(qū)通常用于實(shí)現(xiàn)廣播狀態(tài),它應(yīng)該跨所有 Operator 都相同。Flink 實(shí)現(xiàn)廣播狀態(tài),通過(guò)僅 Checkpointing 有狀態(tài)算子的 SubTask 0 中狀態(tài)的單份副本。在恢復(fù)時(shí),我們將該份副本發(fā)往所有的 Operator。因此,可能會(huì)發(fā)生以下情況:某個(gè)算子將很快從它的 Checkpointed Channel 消費(fèi)數(shù)據(jù)并將修改應(yīng)有于記錄來(lái)獲得狀態(tài)。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-788792.html
文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-788792.html
到了這里,關(guān)于Flink性能優(yōu)化小結(jié)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!