序言
依據(jù)1.17.1 最新版本的內(nèi)容研究下期運作原理,總的來說其實就是設置一些參數(shù),這些參數(shù)就會影響到如何存儲checkpoint的問題.用起來沒什么難的,參數(shù)配置的組合到是挺多cuiyaonan2000@163.com
參考資料:
- Checkpointing | Apache Flink
- State Backends | Apache Flink
Checkpointing?#
Flink 中的每個方法或算子都能夠是有狀態(tài)的(閱讀?working with state?了解更多)。 狀態(tài)化的方法在處理單個 元素/事件 的時候存儲數(shù)據(jù),讓狀態(tài)成為使各個類型的算子更加精細的重要部分。 為了讓狀態(tài)容錯,F(xiàn)link 需要為狀態(tài)添加?checkpoint(檢查點)。Checkpoint 使得 Flink 能夠恢復狀態(tài)和在流中的位置,從而向應用提供和無故障執(zhí)行時一樣的語義。
容錯文檔?中介紹了 Flink 流計算容錯機制內(nèi)部的技術(shù)原理。
Flink 的 checkpoint 機制會和持久化存儲進行交互,讀寫流與狀態(tài)。一般需要:
- 一個能夠回放一段時間內(nèi)數(shù)據(jù)的持久化數(shù)據(jù)源,例如持久化消息隊列(例如 Apache Kafka、RabbitMQ、 Amazon Kinesis、 Google PubSub 等)或文件系統(tǒng)(例如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
- 存放狀態(tài)的持久化存儲,通常為分布式文件系統(tǒng)(比如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
開啟與配置 Checkpoint?#
默認情況下 checkpoint 是禁用的。通過調(diào)用?StreamExecutionEnvironment
?的?enableCheckpointing(n)
?來啟用 checkpoint,里面的?n?是進行 checkpoint 的間隔,單位毫秒。----這個就是checkpoint的間隔時間,沒有最小時間好
另外一種是通過TableEnvironment 中的conf來開啟checkpoint
? ? ? ? ? StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
? ? ? ? ? // TableEnvironment tableEnv = TableEnvironment.create(settings);
?? ??? ? ?Configuration configuration = tableEnv.getConfig().getConfiguration();
?? ??? ? ?configuration.setString("table.exec.mini-batch.enabled", "true");
?? ??? ? ?configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
?? ??? ? ?configuration.setString("table.exec.mini-batch.size", "50");
?? ??? ? ?configuration.setString("table.dml-sync", "false");
?? ??? ? ?configuration.setString("execution.checkpointing.interval", "3s");
這種方式可以設置更多的屬性
Checkpoint 其他的屬性包括:
-
精確一次(exactly-once)對比至少一次(at-least-once):你可以選擇向?
enableCheckpointing(long interval, CheckpointingMode mode)
?方法中傳入一個模式來選擇使用兩種保證等級中的哪一種。 對于大多數(shù)應用來說,精確一次是較好的選擇。至少一次可能與某些延遲超低(始終只有幾毫秒)的應用的關聯(lián)較大。 -
checkpoint 超時:如果 checkpoint 執(zhí)行的時間超過了該配置的閾值,還在進行中的 checkpoint 操作就會被拋棄。---checkpoint超時就放棄上一個checkpoint,去執(zhí)行下一個,這樣子有可能永遠都不會執(zhí)行成功,因為每一個都超時cuiyaonan2000@163.com
-
checkpoints 之間的最小時間:該屬性定義在 checkpoint 之間需要多久的時間,以確保流應用在 checkpoint 之間有足夠的進展。如果值設置為了?5000, 無論 checkpoint 持續(xù)時間與間隔是多久,在前一個 checkpoint 完成時的至少五秒后會才開始下一個 checkpoint。
往往使用“checkpoints 之間的最小時間”來配置應用會比 checkpoint 間隔容易很多,因為“checkpoints 之間的最小時間”在 checkpoint 的執(zhí)行時間超過平均值時不會受到影響(例如如果目標的存儲系統(tǒng)忽然變得很慢)。----最小間隔貌似不會讓 checkpoint間隔失效,而是根據(jù)情況進行二選一,最小間隔不會關注checkpoin的執(zhí)行時間,及時超時了,也不會被拋棄cuiyaonan2000@163.com
注意這個值也意味著并發(fā) checkpoint 的數(shù)目是一。
-
checkpoint 可容忍連續(xù)失敗次數(shù):該屬性定義可容忍多少次連續(xù)的 checkpoint 失敗。超過這個閾值之后會觸發(fā)作業(yè)錯誤 fail over。 默認次數(shù)為“0”,這意味著不容忍 checkpoint 失敗,作業(yè)將在第一次 checkpoint 失敗時fail over。 可容忍的checkpoint失敗僅適用于下列情形:Job Manager的IOException,TaskManager做checkpoint時異步部分的失敗, checkpoint超時等。TaskManager做checkpoint時同步部分的失敗會直接觸發(fā)作業(yè)fail over。其它的checkpoint失?。ㄈ缫粋€checkpoint被另一個checkpoint包含)會被忽略掉。
-
并發(fā) checkpoint 的數(shù)目: 默認情況下,在上一個 checkpoint 未完成(失敗或者成功)的情況下,系統(tǒng)不會觸發(fā)另一個 checkpoint。這確保了拓撲不會在 checkpoint 上花費太多時間,從而影響正常的處理流程。 不過允許多個 checkpoint 并行進行是可行的,對于有確定的處理延遲(例如某方法所調(diào)用比較耗時的外部服務),但是仍然想進行頻繁的 checkpoint 去最小化故障后重跑的 pipelines 來說,是有意義的。
該選項不能和 “checkpoints 間的最小時間"同時使用。
-
externalized checkpoints: 你可以配置周期存儲 checkpoint 到外部系統(tǒng)中。Externalized checkpoints 將他們的元數(shù)據(jù)寫到持久化存儲上并且在 job 失敗的時候不會被自動刪除。 這種方式下,如果你的 job 失敗,你將會有一個現(xiàn)有的 checkpoint 去恢復。更多的細節(jié)請看?Externalized checkpoints 的部署文檔。
設置如上的配置代碼示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 1000ms 開始一次 checkpoint
env.enableCheckpointing(1000);
// 高級選項:
// 設置模式為精確一次 (這是默認值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 確認 checkpoints 之間的時間會進行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Checkpoint 必須在一分鐘內(nèi)完成,否則就會被拋棄
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 允許兩個連續(xù)的 checkpoint 錯誤
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
// 同一時間只允許一個 checkpoint 進行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 使用 externalized checkpoints,這樣 checkpoint 在作業(yè)取消后仍就會被保留
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 開啟實驗性的 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
State Backend?#
狀態(tài)內(nèi)部的存儲格式、狀態(tài)在 CheckPoint 時如何持久化以及持久化在哪里均取決于選擇的?State Backend。
Flink 的?checkpointing 機制?會將 timer 以及 stateful 的 operator 進行快照,然后存儲下來, 包括連接器(connectors),窗口(windows)以及任何用戶自定義的狀態(tài)。 Checkpoint 存儲在哪里取決于所配置的?State Backend(比如 JobManager memory、 file system、 database)。
默認情況下,狀態(tài)是保持在 TaskManagers 的內(nèi)存中,checkpoint 保存在 JobManager 的內(nèi)存中。為了合適地持久化大體量狀態(tài), Flink 支持各種各樣的途徑去存儲 checkpoint 狀態(tài)到其他的 state backends 上。通過?StreamExecutionEnvironment.setStateBackend(…)
?來配置所選的 state backends。
內(nèi)置的 State Backends?#
Flink 內(nèi)置了以下這些開箱即用的 state backends :
- HashMapStateBackend
- EmbeddedRocksDBStateBackend
如果不設置,默認使用 HashMapStateBackend
HashMapStateBackend?#
HashMapStateBackend
?是非??斓?,因為每個狀態(tài)的讀取和算子對于 objects 的更新都是在 Java 的 heap 上
在?HashMapStateBackend?內(nèi)部,數(shù)據(jù)以 Java 對象的形式存儲在堆中。 Key/value 形式的狀態(tài)和窗口算子會持有一個 hash table,其中存儲著狀態(tài)值、觸發(fā)器。
HashMapStateBackend 的適用場景:
- 有較大 state,較長 window 和較大 key/value 狀態(tài)的 Job。
- 所有的高可用場景。
建議同時將?managed memory?設為0,以保證將最大限度的內(nèi)存分配給 JVM 上的用戶代碼。
與 EmbeddedRocksDBStateBackend 不同的是,由于 HashMapStateBackend 將數(shù)據(jù)以對象形式存儲在堆中,因此重用這些對象數(shù)據(jù)是不安全的。
EmbeddedRocksDBStateBackend?#
RocksDB
?可以根據(jù)可用的 disk 空間擴展,并且只有它支持增量 snapshot。
EmbeddedRocksDBStateBackend 將正在運行中的狀態(tài)數(shù)據(jù)保存在?RocksDB?數(shù)據(jù)庫中,RocksDB 數(shù)據(jù)庫默認將數(shù)據(jù)存儲在 TaskManager 的數(shù)據(jù)目錄。 不同于?HashMapStateBackend
?中的 java 對象,數(shù)據(jù)被以序列化字節(jié)數(shù)組的方式存儲,這種方式由序列化器決定,因此 key 之間的比較是以字節(jié)序的形式進行而不是使用 Java 的?hashCode
?或?equals()
?方法。
EmbeddedRocksDBStateBackend 的適用場景:
- 狀態(tài)非常大、窗口非常長、key/value 狀態(tài)非常大的 Job。
- 所有高可用的場景。
EmbeddedRocksDBStateBackend 是目前唯一支持增量 CheckPoint 的 State Backend (見?這里)。
EmbeddedRocksDBStateBackend 將會使應用程序的最大吞吐量降低。 所有的讀寫都必須序列化、反序列化操作,這個比基于堆內(nèi)存的 state backend 的效率要低很多。 同時因為存在這些序列化、反序列化操作,重用放入 EmbeddedRocksDBStateBackend 的對象是安全的。
設置
你能在?flink-conf.yaml?中為所有 Job 設置其他默認的 State Backend。?
也可以使用如下代碼為為每個任務設置State Backend;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
如果你想在 IDE 中使用?EmbeddedRocksDBStateBackend
,或者需要在作業(yè)中通過編程方式動態(tài)配置它,必須添加以下依賴到 Flink 項目中。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>1.17.1</version>
<scope>provided</scope>
</dependency>
flink-conf.yaml
?相關配置
可選值包括:
- ?jobmanager?(HashMapStateBackend),?
- rocksdb?(EmbeddedRocksDBStateBackend),
- 或使用實現(xiàn)了 state backend 工廠?StateBackendFactory?的類的全限定類名, 例如: EmbeddedRocksDBStateBackend 對應為?
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory
。
state.checkpoints.dir
?選項指定了所有 State Backend 寫 CheckPoint 數(shù)據(jù)和寫元數(shù)據(jù)文件的目錄。 你能在?這里?找到關于 CheckPoint 目錄結(jié)構(gòu)的詳細信息
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
增量快照
RocksDB 支持增量快照。不同于產(chǎn)生一個包含所有數(shù)據(jù)的全量備份,增量快照中只包含自上一次快照完成之后被修改的記錄,因此可以顯著減少快照完成的耗時。
一個增量快照是基于(通常多個)前序快照構(gòu)建的(相當于是現(xiàn)有1個全量快照,后面跟了幾個增量快照來進行數(shù)據(jù)的回復cuiyaonan2000@163.com)
增量快照會造成重復的數(shù)據(jù),因此checkpoin的文件會變大,會更多的占有網(wǎng)絡,但是在計算恢復的時候會變快.
設置
雖然狀態(tài)數(shù)據(jù)量很大時我們推薦使用增量快照,但這并不是默認的快照機制,您需要通過下述配置手動開啟該功能:文章來源:http://www.zghlxwxcb.cn/news/detail-522402.html
- 在?
flink-conf.yaml
?中設置:state.backend.incremental: true
?或者 - 在代碼中按照右側(cè)方式配置(來覆蓋默認配置):
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
需要注意的是,一旦啟用了增量快照,網(wǎng)頁上展示的?Checkpointed Data Size
?只代表增量上傳的數(shù)據(jù)量,而不是一次快照的完整數(shù)據(jù)量。文章來源地址http://www.zghlxwxcb.cn/news/detail-522402.html
到了這里,關于Flink: checkPoint的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!