国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink: checkPoint

這篇具有很好參考價值的文章主要介紹了Flink: checkPoint。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

序言

依據(jù)1.17.1 最新版本的內(nèi)容研究下期運作原理,總的來說其實就是設置一些參數(shù),這些參數(shù)就會影響到如何存儲checkpoint的問題.用起來沒什么難的,參數(shù)配置的組合到是挺多cuiyaonan2000@163.com

參考資料:

  1. Checkpointing | Apache Flink
  2. State Backends | Apache Flink

Checkpointing?#

Flink 中的每個方法或算子都能夠是有狀態(tài)的(閱讀?working with state?了解更多)。 狀態(tài)化的方法在處理單個 元素/事件 的時候存儲數(shù)據(jù),讓狀態(tài)成為使各個類型的算子更加精細的重要部分。 為了讓狀態(tài)容錯,F(xiàn)link 需要為狀態(tài)添加?checkpoint(檢查點)。Checkpoint 使得 Flink 能夠恢復狀態(tài)和在流中的位置,從而向應用提供和無故障執(zhí)行時一樣的語義。

容錯文檔?中介紹了 Flink 流計算容錯機制內(nèi)部的技術(shù)原理。

Flink 的 checkpoint 機制會和持久化存儲進行交互,讀寫流與狀態(tài)。一般需要:

  • 一個能夠回放一段時間內(nèi)數(shù)據(jù)的持久化數(shù)據(jù)源,例如持久化消息隊列(例如 Apache Kafka、RabbitMQ、 Amazon Kinesis、 Google PubSub 等)或文件系統(tǒng)(例如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
  • 存放狀態(tài)的持久化存儲,通常為分布式文件系統(tǒng)(比如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。

開啟與配置 Checkpoint?#

默認情況下 checkpoint 是禁用的。通過調(diào)用?StreamExecutionEnvironment?的?enableCheckpointing(n)?來啟用 checkpoint,里面的?n?是進行 checkpoint 的間隔,單位毫秒。----這個就是checkpoint的間隔時間,沒有最小時間好

另外一種是通過TableEnvironment 中的conf來開啟checkpoint

? ? ? ? ? StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
? ? ? ? ? // TableEnvironment tableEnv = TableEnvironment.create(settings);
?? ??? ? ?Configuration configuration = tableEnv.getConfig().getConfiguration();
?? ??? ? ?configuration.setString("table.exec.mini-batch.enabled", "true");
?? ??? ? ?configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
?? ??? ? ?configuration.setString("table.exec.mini-batch.size", "50");
?? ??? ? ?configuration.setString("table.dml-sync", "false");
?? ??? ? ?configuration.setString("execution.checkpointing.interval", "3s");
這種方式可以設置更多的屬性

Checkpoint 其他的屬性包括:

  • 精確一次(exactly-once)對比至少一次(at-least-once):你可以選擇向?enableCheckpointing(long interval, CheckpointingMode mode)?方法中傳入一個模式來選擇使用兩種保證等級中的哪一種。 對于大多數(shù)應用來說,精確一次是較好的選擇。至少一次可能與某些延遲超低(始終只有幾毫秒)的應用的關聯(lián)較大。

  • checkpoint 超時:如果 checkpoint 執(zhí)行的時間超過了該配置的閾值,還在進行中的 checkpoint 操作就會被拋棄。---checkpoint超時就放棄上一個checkpoint,去執(zhí)行下一個,這樣子有可能永遠都不會執(zhí)行成功,因為每一個都超時cuiyaonan2000@163.com

  • checkpoints 之間的最小時間:該屬性定義在 checkpoint 之間需要多久的時間,以確保流應用在 checkpoint 之間有足夠的進展。如果值設置為了?5000, 無論 checkpoint 持續(xù)時間與間隔是多久,在前一個 checkpoint 完成時的至少五秒后會才開始下一個 checkpoint。

    往往使用“checkpoints 之間的最小時間”來配置應用會比 checkpoint 間隔容易很多,因為“checkpoints 之間的最小時間”在 checkpoint 的執(zhí)行時間超過平均值時不會受到影響(例如如果目標的存儲系統(tǒng)忽然變得很慢)。----最小間隔貌似不會讓 checkpoint間隔失效,而是根據(jù)情況進行二選一,最小間隔不會關注checkpoin的執(zhí)行時間,及時超時了,也不會被拋棄cuiyaonan2000@163.com

    注意這個值也意味著并發(fā) checkpoint 的數(shù)目是。

  • checkpoint 可容忍連續(xù)失敗次數(shù)該屬性定義可容忍多少次連續(xù)的 checkpoint 失敗。超過這個閾值之后會觸發(fā)作業(yè)錯誤 fail over。 默認次數(shù)為“0”,這意味著不容忍 checkpoint 失敗,作業(yè)將在第一次 checkpoint 失敗時fail over。 可容忍的checkpoint失敗僅適用于下列情形:Job Manager的IOException,TaskManager做checkpoint時異步部分的失敗, checkpoint超時等。TaskManager做checkpoint時同步部分的失敗會直接觸發(fā)作業(yè)fail over。其它的checkpoint失?。ㄈ缫粋€checkpoint被另一個checkpoint包含)會被忽略掉。

  • 并發(fā) checkpoint 的數(shù)目: 默認情況下,在上一個 checkpoint 未完成(失敗或者成功)的情況下,系統(tǒng)不會觸發(fā)另一個 checkpoint。這確保了拓撲不會在 checkpoint 上花費太多時間,從而影響正常的處理流程。 不過允許多個 checkpoint 并行進行是可行的,對于有確定的處理延遲(例如某方法所調(diào)用比較耗時的外部服務),但是仍然想進行頻繁的 checkpoint 去最小化故障后重跑的 pipelines 來說,是有意義的。

    該選項不能和 “checkpoints 間的最小時間"同時使用。

  • externalized checkpoints: 你可以配置周期存儲 checkpoint 到外部系統(tǒng)中。Externalized checkpoints 將他們的元數(shù)據(jù)寫到持久化存儲上并且在 job 失敗的時候不會被自動刪除。 這種方式下,如果你的 job 失敗,你將會有一個現(xiàn)有的 checkpoint 去恢復。更多的細節(jié)請看?Externalized checkpoints 的部署文檔。

設置如上的配置代碼示例:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 每 1000ms 開始一次 checkpoint
env.enableCheckpointing(1000);

// 高級選項:

// 設置模式為精確一次 (這是默認值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 確認 checkpoints 之間的時間會進行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// Checkpoint 必須在一分鐘內(nèi)完成,否則就會被拋棄
env.getCheckpointConfig().setCheckpointTimeout(60000);

// 允許兩個連續(xù)的 checkpoint 錯誤
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        
// 同一時間只允許一個 checkpoint 進行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 使用 externalized checkpoints,這樣 checkpoint 在作業(yè)取消后仍就會被保留
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 開啟實驗性的 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();

State Backend?#

狀態(tài)內(nèi)部的存儲格式、狀態(tài)在 CheckPoint 時如何持久化以及持久化在哪里均取決于選擇的?State Backend。

Flink 的?checkpointing 機制?會將 timer 以及 stateful 的 operator 進行快照,然后存儲下來, 包括連接器(connectors),窗口(windows)以及任何用戶自定義的狀態(tài)。 Checkpoint 存儲在哪里取決于所配置的?State Backend(比如 JobManager memory、 file system、 database)。

默認情況下,狀態(tài)是保持在 TaskManagers 的內(nèi)存中,checkpoint 保存在 JobManager 的內(nèi)存中。為了合適地持久化大體量狀態(tài), Flink 支持各種各樣的途徑去存儲 checkpoint 狀態(tài)到其他的 state backends 上。通過?StreamExecutionEnvironment.setStateBackend(…)?來配置所選的 state backends。

內(nèi)置的 State Backends?#

Flink 內(nèi)置了以下這些開箱即用的 state backends :

  • HashMapStateBackend
  • EmbeddedRocksDBStateBackend

如果不設置,默認使用 HashMapStateBackend

HashMapStateBackend?#

HashMapStateBackend?是非??斓?,因為每個狀態(tài)的讀取和算子對于 objects 的更新都是在 Java 的 heap 上

在?HashMapStateBackend?內(nèi)部,數(shù)據(jù)以 Java 對象的形式存儲在堆中。 Key/value 形式的狀態(tài)和窗口算子會持有一個 hash table,其中存儲著狀態(tài)值、觸發(fā)器。

HashMapStateBackend 的適用場景:

  • 有較大 state,較長 window 和較大 key/value 狀態(tài)的 Job。
  • 所有的高可用場景。

建議同時將?managed memory?設為0,以保證將最大限度的內(nèi)存分配給 JVM 上的用戶代碼。

與 EmbeddedRocksDBStateBackend 不同的是,由于 HashMapStateBackend 將數(shù)據(jù)以對象形式存儲在堆中,因此重用這些對象數(shù)據(jù)是不安全的。

EmbeddedRocksDBStateBackend?#

RocksDB?可以根據(jù)可用的 disk 空間擴展,并且只有它支持增量 snapshot。

EmbeddedRocksDBStateBackend 將正在運行中的狀態(tài)數(shù)據(jù)保存在?RocksDB?數(shù)據(jù)庫中,RocksDB 數(shù)據(jù)庫默認將數(shù)據(jù)存儲在 TaskManager 的數(shù)據(jù)目錄。 不同于?HashMapStateBackend?中的 java 對象,數(shù)據(jù)被以序列化字節(jié)數(shù)組的方式存儲,這種方式由序列化器決定,因此 key 之間的比較是以字節(jié)序的形式進行而不是使用 Java 的?hashCode?或?equals()?方法。

EmbeddedRocksDBStateBackend 的適用場景:

  • 狀態(tài)非常大、窗口非常長、key/value 狀態(tài)非常大的 Job。
  • 所有高可用的場景。

EmbeddedRocksDBStateBackend 是目前唯一支持增量 CheckPoint 的 State Backend (見?這里)。

EmbeddedRocksDBStateBackend 將會使應用程序的最大吞吐量降低。 所有的讀寫都必須序列化、反序列化操作,這個比基于堆內(nèi)存的 state backend 的效率要低很多。 同時因為存在這些序列化、反序列化操作,重用放入 EmbeddedRocksDBStateBackend 的對象是安全的。

設置

你能在?flink-conf.yaml?中為所有 Job 設置其他默認的 State Backend。?

也可以使用如下代碼為為每個任務設置State Backend;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());

如果你想在 IDE 中使用?EmbeddedRocksDBStateBackend,或者需要在作業(yè)中通過編程方式動態(tài)配置它,必須添加以下依賴到 Flink 項目中。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb</artifactId>
    <version>1.17.1</version>
    <scope>provided</scope>
</dependency>

flink-conf.yaml?相關配置

可選值包括:

  1. ?jobmanager?(HashMapStateBackend),?
  2. rocksdb?(EmbeddedRocksDBStateBackend),
  3. 或使用實現(xiàn)了 state backend 工廠?StateBackendFactory?的類的全限定類名, 例如: EmbeddedRocksDBStateBackend 對應為?org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory

state.checkpoints.dir?選項指定了所有 State Backend 寫 CheckPoint 數(shù)據(jù)和寫元數(shù)據(jù)文件的目錄。 你能在?這里?找到關于 CheckPoint 目錄結(jié)構(gòu)的詳細信息

# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

增量快照

RocksDB 支持增量快照。不同于產(chǎn)生一個包含所有數(shù)據(jù)的全量備份,增量快照中只包含自上一次快照完成之后被修改的記錄,因此可以顯著減少快照完成的耗時。

一個增量快照是基于(通常多個)前序快照構(gòu)建的(相當于是現(xiàn)有1個全量快照,后面跟了幾個增量快照來進行數(shù)據(jù)的回復cuiyaonan2000@163.com)

增量快照會造成重復的數(shù)據(jù),因此checkpoin的文件會變大,會更多的占有網(wǎng)絡,但是在計算恢復的時候會變快.

設置

雖然狀態(tài)數(shù)據(jù)量很大時我們推薦使用增量快照,但這并不是默認的快照機制,您需要通過下述配置手動開啟該功能:

  • 在?flink-conf.yaml?中設置:state.backend.incremental: true?或者
  • 在代碼中按照右側(cè)方式配置(來覆蓋默認配置):EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);

需要注意的是,一旦啟用了增量快照,網(wǎng)頁上展示的?Checkpointed Data Size?只代表增量上傳的數(shù)據(jù)量,而不是一次快照的完整數(shù)據(jù)量。文章來源地址http://www.zghlxwxcb.cn/news/detail-522402.html

到了這里,關于Flink: checkPoint的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權(quán),不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • IDEA 中使用 Big Data Tools 連接大數(shù)據(jù)組件

    IDEA 中使用 Big Data Tools 連接大數(shù)據(jù)組件

    簡介 Big Data Tools 插件可用于 Intellij Idea 2019.2 及以后的版本。它提供了使用 Zeppelin,AWS S3,Spark,Google Cloud Storage,Minio,Linode,數(shù)字開放空間,Microsoft Azure 和 Hadoop 分布式文件系統(tǒng)(HDFS)來監(jiān)視和處理數(shù)據(jù)的特定功能。 下面來看一下 Big Data Tools 的安裝和使用,主要會配置

    2023年04月08日
    瀏覽(19)
  • Databases and Big Data Technologies: Essential Knowledg

    作者:禪與計算機程序設計藝術(shù) 互聯(lián)網(wǎng)正在改變著傳統(tǒng)行業(yè)和新興行業(yè)的結(jié)構(gòu),電子商務、社交網(wǎng)絡、移動應用程序等新興產(chǎn)業(yè)的迅速發(fā)展也催生了基于數(shù)據(jù)中心的數(shù)據(jù)庫應用的需求,而這方面的知識技能是越來越重要。然而,除了數(shù)據(jù)庫技術(shù)的基礎知識和技術(shù)棧外,基于數(shù)

    2024年02月07日
    瀏覽(38)
  • Big Data Tools插件(詳細講解安裝,連接,包教包會?。。?

    Big Data Tools插件(詳細講解安裝,連接,包教包會?。?!)

    ??博主syst1m 帶你 acquire knowledge! ?博客首頁——syst1m的博客?? ??《CTF專欄》超級詳細的解析,寶寶級教學讓你從蹣跚學步到健步如飛?? ??《大數(shù)據(jù)專欄》大數(shù)據(jù)從0到禿頭??,從分析到?jīng)Q策,無所不能? ?? 《python面向?qū)ο?人狗大戰(zhàn))》突破百萬的閱讀量,上過各種各樣

    2024年02月03日
    瀏覽(24)
  • Establishing a RealTime Big Data Platform for Transport

    作者:禪與計算機程序設計藝術(shù) Apache Kafka是一個開源的分布式流處理平臺,它最初由LinkedIn公司開發(fā),用于實時數(shù)據(jù)管道及流動計算,隨著時間的推移,Kafka已成為最流行的開源消息代理之一。同時,它還是一個快速、可靠的分布式存儲系統(tǒng),它可以作為消息隊列來用。Mong

    2024年02月07日
    瀏覽(44)
  • Apache Hadoop: Building a Big Data Distributed Environm

    作者:禪與計算機程序設計藝術(shù) Apache Hadoop (以下簡稱HDFS)是一個開源的分布式文件系統(tǒng),用來存儲大量的數(shù)據(jù)集并進行計算處理。它可以處理超大數(shù)據(jù)集、實時數(shù)據(jù)分析、日志聚類等應用場景。HDFS被廣泛應用于企業(yè)數(shù)據(jù)倉庫、電子商務網(wǎng)站、搜索引擎、Hadoop生態(tài)系統(tǒng)中的大多

    2024年02月06日
    瀏覽(18)
  • How AI is changing Big Data and Business

    作者:禪與計算機程序設計藝術(shù) 隨著人工智能的不斷進步、計算機算力的不斷提高,以及基于云計算平臺的大數(shù)據(jù)產(chǎn)生的越來越多的數(shù)據(jù),人工智能已成為經(jīng)濟界和產(chǎn)業(yè)界的一股重要力量。而人工智能究竟能給企業(yè)帶來哪些新的機遇和變化,如何運用人工智能為企業(yè)提供更好

    2024年02月08日
    瀏覽(15)
  • Python Packages for Big Data Analysis and Visualization

    作者:禪與計算機程序設計藝術(shù) Python第三方庫主要分為兩類:數(shù)據(jù)處理、可視化。下面是用于大數(shù)據(jù)分析與可視化的常用的Python第三方庫列表(按推薦順序排序): NumPy: NumPy 是用 Python 編寫的一個科學計算庫,其功能強大且全面,尤其適用于對大型多維數(shù)組和矩陣進行快速

    2024年02月07日
    瀏覽(21)
  • Building a big data platform system, architecture desig

    作者:禪與計算機程序設計藝術(shù) Apache Hadoop是一個開源的分布式計算平臺,它可以運行在廉價的商用硬件上,并提供可擴展性和高容錯性。作為Hadoop框架的一部分,MapReduce是一種編程模型和執(zhí)行引擎,用于對大數(shù)據(jù)集進行并行處理。但是,由于其復雜性和龐大的體系結(jié)構(gòu),開

    2024年02月05日
    瀏覽(32)
  • 大數(shù)據(jù):HDFS操作的客戶端big data tools和NFS

    大數(shù)據(jù):HDFS操作的客戶端big data tools和NFS

    2022找工作是學歷、能力和運氣的超強結(jié)合體,遇到寒冬,大廠不招人,可能很多算法學生都得去找開發(fā),測開 測開的話,你就得學數(shù)據(jù)庫,sql,oracle,尤其sql要學,當然,像很多金融企業(yè)、安全機構(gòu)啥的,他們必須要用oracle數(shù)據(jù)庫 這oracle比sql安全,強大多了,所以你需要學

    2024年02月09日
    瀏覽(16)
  • An Introduction to Hadoop Streaming API in Big Data

    作者:禪與計算機程序設計藝術(shù) Hadoop Streaming 是 Hadoop 的一個子項目,它可以讓用戶在 Hadoop 上運行離線批處理作業(yè)或?qū)崟r流處理作業(yè)。其主要工作原理是從標準輸入(stdin)讀取數(shù)據(jù),對其進行處理,然后輸出到標準輸出(stdout)。Hadoop Streaming 的計算模型是 MapReduce-like,每

    2024年02月08日
    瀏覽(18)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包