容錯(cuò)機(jī)制
容錯(cuò):指出錯(cuò)后不影響數(shù)據(jù)的繼續(xù)處理,并且恢復(fù)到出錯(cuò)前的狀態(tài)。
檢查點(diǎn):用存檔讀檔的方式,將之前的某個(gè)時(shí)間點(diǎn)的所有狀態(tài)保存下來,故障恢復(fù)繼續(xù)處理的結(jié)果應(yīng)該和發(fā)送故障前完全一致,這就是所謂的檢查點(diǎn)。
檢查點(diǎn)的控制節(jié)點(diǎn):jobManager里面的檢查點(diǎn)協(xié)調(diào)器,向source節(jié)點(diǎn)的數(shù)據(jù)插入barrier標(biāo)記。
檢查點(diǎn)的保存:
- 周期性觸發(fā)保存
- 保存的時(shí)間點(diǎn):所有算子恰好處理完一個(gè)相同的輸入數(shù)據(jù)時(shí)(使用Barrier機(jī)制)
檢查點(diǎn)分界線Barrier
barrier標(biāo)記表示這個(gè)標(biāo)記之前的所有數(shù)據(jù)已經(jīng)將狀態(tài)更改存入當(dāng)前檢查點(diǎn)。后續(xù)的算子節(jié)點(diǎn)只要遇到它就開始對狀態(tài)做持久化快照保存。在它之后對數(shù)據(jù)狀態(tài)的改變,只能保存到下一個(gè)檢查點(diǎn)中。
檢查點(diǎn)算法:Chandy-Lamport算法的一種變體。
算法兩個(gè)原則:
- 當(dāng)上游任務(wù)向多個(gè)并行下游任務(wù)發(fā)送barrier時(shí),需要廣播出去
- 而當(dāng)多個(gè)上游任務(wù)向同一個(gè)下游任務(wù)傳遞分界線時(shí),需要在下游任務(wù)執(zhí)行“分界線對齊”操作,也就是需要等到所有并行分區(qū)的barrier都到齊,才可以開始狀態(tài)的保存。
分界線對齊策略
-
精確一次(等待分界線2,先到的數(shù)據(jù)暫不進(jìn)行處理):處理多次的結(jié)果是一樣的
-
至少一次(對先到的數(shù)據(jù)進(jìn)行處理):檢查點(diǎn)中記錄了先到數(shù)據(jù)對狀態(tài)的更新信息,但是還未保存到狀態(tài)后端,如果此時(shí)發(fā)生故障進(jìn)行故障恢復(fù),會(huì)導(dǎo)致從source重復(fù)發(fā)送剛剛已經(jīng)處理過的先到數(shù)據(jù)。
分界線非對齊策略
- 非對齊策略只有精準(zhǔn)一次
- 缺點(diǎn)是需要將算子左邊,分界線右邊的所有數(shù)據(jù)存儲(chǔ)起來,增加內(nèi)存壓力。
檢查點(diǎn)配置
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//指定一致性語義
// checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//檢查點(diǎn)的存儲(chǔ)
//JobManagerCheckpointStorage:將檢查點(diǎn)存儲(chǔ)到JobManager的內(nèi)存中
//FileSystemCehckpointSotrage:將檢查點(diǎn)存儲(chǔ)到指定的文件系統(tǒng)中
checkpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://hadoop102:8020/flink/checkpoint"));
//狀態(tài)后端
// env.setStateBackend(new EmbeddedRocksDBStateBackend());
//檢查點(diǎn)間隔
checkpointConfig.setCheckpointInterval(2000L);
//檢查點(diǎn)超時(shí)時(shí)間
checkpointConfig.setCheckpointTimeout(10000);
//同時(shí)存在的檢查點(diǎn)個(gè)數(shù)
checkpointConfig.setMaxConcurrentCheckpoints(1);
//兩次檢查點(diǎn)之間的間隔
checkpointConfig.setMinPauseBetweenCheckpoints(1000L);
//檢查點(diǎn)清理
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
//檢查點(diǎn)允許的失敗次數(shù)
checkpointConfig.setTolerableCheckpointFailureNumber(5);
//開啟非對齊模式:只有在精準(zhǔn)一次時(shí)才能開啟,且最大同時(shí)存在檢查點(diǎn)只能為1
checkpointConfig.enableUnalignedCheckpoints();
//對齊超時(shí),自動(dòng)開啟非對齊
checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(5));
//最終檢查點(diǎn):
//開啟changlog
env.enableChangelogStateBackend(true);
通用增量changelog配置:hashmap本身不支持增量存儲(chǔ)狀態(tài),rockDB是支持的。changeLog可以不論hashmap還是rockDB,都實(shí)現(xiàn)增量存儲(chǔ)。開啟該配置可以減少檢查點(diǎn)的持續(xù)時(shí)間,在創(chuàng)建檢查點(diǎn)時(shí),只有changlog中的相關(guān)部分需要上傳。
- 創(chuàng)建更多的文件
- 殘留更多的文件
- 使用更多的IO來上傳狀態(tài)
- 占用更多的CPU資源來序列化狀態(tài)變更
保存點(diǎn)savepoint
檢查點(diǎn)與保存點(diǎn)的區(qū)別:文章來源:http://www.zghlxwxcb.cn/news/detail-758182.html
- 檢查點(diǎn)
- 檢查點(diǎn)是頻繁觸發(fā)的,設(shè)計(jì)目標(biāo)就是輕量和盡快恢復(fù)
- 檢查點(diǎn)的數(shù)據(jù)在作業(yè)終止后是否刪除可以配置
- 數(shù)據(jù)存儲(chǔ)格式可能是增量的
- 保存點(diǎn)
- 設(shè)計(jì)更側(cè)重于可移植和操作靈活性,即運(yùn)維
- 針對計(jì)劃中的,手動(dòng)的運(yùn)維
- 保存點(diǎn)在作業(yè)終止和恢復(fù)后都不會(huì)刪除
- 保存點(diǎn)的數(shù)據(jù)格式以狀態(tài)后端獨(dú)立的(標(biāo)準(zhǔn)的)數(shù)據(jù)格式存儲(chǔ)
保存點(diǎn)的用途:文章來源地址http://www.zghlxwxcb.cn/news/detail-758182.html
- 版本管理和歸檔存儲(chǔ)
- 更新Flink版本
- 更新應(yīng)用程序
- 調(diào)整并行度
保存點(diǎn)的使用之切換狀態(tài)后端
- 開啟flink集群
- 提交任務(wù) bin/flink run -d -c -Dstate.backend=hashmap 全類名 jar路徑
- 保存點(diǎn)的落盤: bin/flink -yid -type canonical yarn_id job_id hdfs://hadoop102/flink-savepoint
- 切換狀態(tài)后重啟:bin/flink run -d -Dstate.backend=rocksdb -s hdfs保存點(diǎn)路徑 全類名 jar包路徑
到了這里,關(guān)于Flink的容錯(cuò)機(jī)制的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!