Problem scenario
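To evaluate how well a Flink program processes data, I started it in local mode and inserted a large batch of rows into the upstream table in one go (roughly the maximum volume that a single parallel instance handles in 4 to 5 minutes in production) to trigger the computation.
However, the local run never finished. On inspection, the job was being computed over and over, and it turned out that Flink kept restoring from a checkpoint. Searching the log for "checkpoint" turned up the following error: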
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:98) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:67) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1934) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1906) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:96) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1990) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_391]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_391]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_391]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_391]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_391]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_391]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_391]
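The error appeared roughly once every 10 minutes. The exact timeline was:
- 11:30:44 - Flink program started
- 11:41:17 - first error (10 minutes 33 seconds after startup)
- 11:51:48 - second error (10 minutes 31 seconds after the previous error)
- 12:02:19 - third error (10 minutes 31 seconds after the previous error)
- ...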
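The intervals in the timeline can be checked with a few lines of Java. This is only a small sketch: the class name `CheckpointErrorGaps` and the helper `gapsInSeconds` are made up for illustration, and the timestamps are copied from the list above. Every gap comes out slightly above 600 seconds, which is a useful hint when looking for a fixed timeout.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CheckpointErrorGaps {

    /** Returns the gap in seconds between each pair of consecutive timestamps. */
    static List<Long> gapsInSeconds(List<LocalTime> times) {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < times.size(); i++) {
            gaps.add(Duration.between(times.get(i - 1), times.get(i)).getSeconds());
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Timestamps from the log timeline: startup, then the first three errors.
        List<LocalTime> times = Arrays.asList(
                LocalTime.parse("11:30:44"),
                LocalTime.parse("11:41:17"),
                LocalTime.parse("11:51:48"),
                LocalTime.parse("12:02:19"));

        for (long gap : gapsInSeconds(times)) {
            // Show how far each gap exceeds 600 seconds (10 minutes).
            System.out.println(gap + " s (" + (gap - 600) + " s over 10 minutes)");
        }
    }
}
```

Run as is, it prints gaps of 633 s, 631 s and 631 s.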
Root cause
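After some digging, the error turned out to be a checkpoint timeout. Flink's checkpoint timeout is 600 seconds, but this job needs about 11 minutes to finish. The local run is slower than production for two likely reasons: the local version contains some newly added logic, and MySQL requests in production may go over the internal network while the local run sends them over the public network.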
The default checkpoint timeout is set at the following place in the Flink source:
// org.apache.flink.streaming.api.environment.CheckpointConfig
public class CheckpointConfig implements Serializable {
    public CheckpointConfig() {
        this.checkpointingMode = DEFAULT_MODE;
        this.checkpointInterval = -1L;
        this.checkpointTimeout = 600000L; // 600,000 ms = 10 minutes
        this.minPauseBetweenCheckpoints = 0L;
        this.maxConcurrentCheckpoints = 1;
        this.alignmentTimeout = (Duration)ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT.defaultValue();
        this.failOnCheckpointingErrors = true;
        this.preferCheckpointForRecovery = false;
        this.tolerableCheckpointFailureNumber = -1;
    }
    // remaining members omitted
}
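To make the failure mechanism concrete, here is a toy model in plain Java. It is not Flink code: the class `CheckpointTimeoutModel` and both methods are hypothetical, and it assumes that the unset tolerance (`-1`) behaves as "zero failed checkpoints tolerated". That assumption matches what was observed (the very first timeout already triggered the error) but has not been checked against the Flink source here.

```java
public class CheckpointTimeoutModel {

    /**
     * Toy model: a checkpoint completes only if the time it needs
     * does not exceed the configured timeout.
     */
    static boolean checkpointCompletes(long neededSeconds, long timeoutSeconds) {
        return neededSeconds <= timeoutSeconds;
    }

    /**
     * Toy model of the failure threshold: the job fails once the number of
     * consecutive failed checkpoints exceeds the tolerated number.
     */
    static boolean jobFails(int consecutiveFailures, int tolerableFailures) {
        return consecutiveFailures > tolerableFailures;
    }

    public static void main(String[] args) {
        long neededSeconds = 11 * 60; // about 11 minutes, as reported for the local run
        long defaultTimeout = 600;    // checkpointTimeout = 600000L ms
        long raisedTimeout = 1200;    // hypothetical larger timeout
        int tolerableFailures = 0;    // ASSUMPTION: unset (-1) acts like 0

        boolean okByDefault = checkpointCompletes(neededSeconds, defaultTimeout);
        System.out.println("completes with default timeout: " + okByDefault);
        System.out.println("job fails after that timeout:   "
                + jobFails(okByDefault ? 0 : 1, tolerableFailures));
        System.out.println("completes with raised timeout:  "
                + checkpointCompletes(neededSeconds, raisedTimeout));
    }
}
```

Under these assumptions, a single timed-out checkpoint is enough to fail the job, and the restarted job runs into the same timeout again. That would explain a loop that repeats a little over every 10 minutes.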
Solution
When testing locally with a large data volume, explicitly set a longer checkpoint timeout:
env.getCheckpointConfig().setCheckpointTimeout(1200000); // 1,200,000 ms = 20 minutes
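The timeout is passed in milliseconds, so a bare literal like `1200000` is easy to misread. One option is to derive the value from a `java.time.Duration`. The sketch below uses only the JDK; the class `CheckpointTimeoutValue` and the helper `toTimeoutMillis` are hypothetical names, and the Flink call appears only as a comment because `env` is not defined in this standalone example.

```java
import java.time.Duration;

public class CheckpointTimeoutValue {

    /** Converts a readable duration into a millisecond value, rejecting non-positive input. */
    static long toTimeoutMillis(Duration timeout) {
        if (timeout.isZero() || timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must be positive: " + timeout);
        }
        return timeout.toMillis();
    }

    public static void main(String[] args) {
        // 20 minutes, comfortably above the ~11 minutes the local run needs.
        long timeoutMillis = toTimeoutMillis(Duration.ofMinutes(20));
        System.out.println(timeoutMillis);

        // In the Flink job, the value would then be passed on like this:
        // env.getCheckpointConfig().setCheckpointTimeout(timeoutMillis);
    }
}
```

This yields the same 1,200,000 ms as the literal above, while keeping the intended 20 minutes visible in the code.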