啟用檢查點(diǎn)
開(kāi)啟自動(dòng)保存快照 (默認(rèn):關(guān)閉) :
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔 1 秒啟動(dòng)一次檢查點(diǎn)保存
env.enableCheckpointing(1000);
間隔調(diào)整 :
- 對(duì)性能的影響更小,就調(diào)大間隔時(shí)間
- 為了更好的容錯(cuò)性,就以調(diào)小間隔時(shí)間
檢查點(diǎn)存儲(chǔ)
檢查點(diǎn)存儲(chǔ) (CheckpointStorage) : 持久化存儲(chǔ)位置
- JobManager 的堆內(nèi)存 (JobManagerCheckpointStorage) : 默認(rèn)
- 文件系統(tǒng) (FileSystemCheckpointStorage) : 常用 , (HDFS , S3)
// 配置存儲(chǔ)檢查點(diǎn)到 JobManager 堆內(nèi)存
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
// 配置存儲(chǔ)檢查點(diǎn)到文件系統(tǒng)
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));
通用增量
Rocksdb 狀態(tài)后端 : 啟用增量 checkpoint
- Flink 1.15 后 , HashMap , Rocksdb 都能開(kāi)啟通用的增量 checkpoint
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
增量 checkpoint 過(guò)程 :
- 帶狀態(tài)的算子任務(wù) , 將狀態(tài)更改 , 寫入變更日志(記錄狀態(tài))
- 狀態(tài)物化:狀態(tài)表定期保存,獨(dú)立于檢查點(diǎn)
- 狀態(tài)物化完成后,狀態(tài)變更日志 , 就截?cái)嗟较鄳?yīng)的點(diǎn)
注意點(diǎn) :
- HDFS : 文件數(shù)變多
- 上傳變更日志 : IO 寬帶較大
- 序列化狀態(tài)變更 : CPU 消耗較大
- 緩存狀態(tài)變更 : TaskManager 內(nèi)存消耗較大
- Checkpint 最大并發(fā) = 1
- Flink 1.15 , Memory 測(cè)試階段
- 不支持 NO_ClAIM 模式
配置文件 :
state.backend.changelog.enabled: true
state.backend.changelog.storage: filesystem
# 存儲(chǔ) changelog 數(shù)據(jù)
dstl.dfs.base-path: hdfs://hadoop102:8020/changelog
execution.checkpointing.max-concurrent-checkpoints: 1
execution.savepoint-restore-mode: CLAIM
代碼配置 :
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-changelog</artifactId>
<version>${flink.version}</version>
<scope>runtime</scope>
</dependency>
// 開(kāi)啟changelog:
env.enableChangelogStateBackend(true);
最終檢查點(diǎn)
當(dāng)有界數(shù)據(jù) , 部分Task 完成 , Flink 1.14 后 , 它們依然能進(jìn)行檢查點(diǎn)文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-495734.html
禁用 (Flink 1.15 后, 默認(rèn)啟用) :文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-495734.html
Configuration config = new Configuration();
// 禁用最終檢查點(diǎn)
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
配置建議
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 獲取所有配置
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// 檢查點(diǎn)模式 (CheckpointingMode) :
// 精確一次 : exactly-once (默認(rèn))
// 至少一次 : at-least-once (效率更高)
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 最大并發(fā)檢查點(diǎn)數(shù)量(maxConcurrentCheckpoints):
// 檢查點(diǎn)最多有多少個(gè)
checkpointConfig.setMaxConcurrentCheckpoints(1)
// 啟用非對(duì)齊的檢查點(diǎn)保存
// 限制: CheckpointingMode= exctly-once , 并發(fā)的檢查點(diǎn) = 1
checkpointConfig.enableUnalignedCheckpoints();
// 默認(rèn): 0: 用非對(duì)齊的檢查點(diǎn)
// > 0: 用 對(duì)齊的檢查點(diǎn)(barrier對(duì)齊)
// 當(dāng)對(duì)齊時(shí)間 > 閾值, 為: 非對(duì)齊檢查點(diǎn)(barrier非對(duì)齊)
checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1));
// 超時(shí)時(shí)間 (checkpointTimeout) :
// 檢查點(diǎn)保存的超時(shí)時(shí)間,當(dāng)超時(shí)就丟棄
// 單位 : 長(zhǎng)整型毫秒數(shù)
checkpointConfig.setCheckpointTimeout(60000)
//最小間隔時(shí)間 (minPauseBetweenCheckpoints):
// 上個(gè) checkpoint 完成后, 最快多久觸發(fā)另個(gè) checkpoint
checkpointConfig.setMinPauseBetweenCheckpoints(500)
// 開(kāi)啟檢查點(diǎn)的外部持久化
// DELETE_ON_CANCELLATION: 作業(yè)取消時(shí), 自動(dòng)刪除外部檢查點(diǎn),但作業(yè)失敗退出,就保留檢查點(diǎn)
// RETAIN_ON_CANCELLATION:作業(yè)取消時(shí), 也保留外部檢查點(diǎn)
checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// 檢查點(diǎn)異常時(shí), 是否整個(gè)任務(wù)失敗
// true : 失敗提出
// false: 丟棄, 并繼續(xù)運(yùn)行
checkpointConfig.setFailOnCheckpointingErrors(true)
到了這里,關(guān)于Flink 檢查點(diǎn)配置的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!