国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【Flink狀態(tài)管理五】Checkpoint的設(shè)計(jì)與實(shí)現(xiàn)

這篇具有很好參考價(jià)值的文章主要介紹了【Flink狀態(tài)管理五】Checkpoint的設(shè)計(jì)與實(shí)現(xiàn)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

由于系統(tǒng)原因?qū)е翭link作業(yè)無法正常運(yùn)行的情況非常多,且很多時(shí)候都是無法避免的。對(duì)于Flink集群來講,能夠快速?gòu)漠惓顟B(tài)中恢復(fù),同時(shí)保證處理數(shù)據(jù)的正確性和一致性非常重要。Flink主要借助Checkpoint的方式保障整個(gè)系統(tǒng)狀態(tài)數(shù)據(jù)的一致性,也就是基于ABS算法實(shí)現(xiàn)輕量級(jí)快照服務(wù)。

本節(jié)我們?cè)敿?xì)了解Checkpoint的設(shè)計(jì)與實(shí)現(xiàn)。

?

1. Checkpoint的整體設(shè)計(jì)

Checkpoint的執(zhí)行過程分為三個(gè)階段:?jiǎn)?dòng)、執(zhí)行以及確認(rèn)完成。其中Checkpoint的啟動(dòng)過程由JobManager管理節(jié)點(diǎn)中的CheckpointCoordinator組件控制,該組件會(huì)周期性地向數(shù)據(jù)源節(jié)點(diǎn)發(fā)送執(zhí)行Checkpoint的請(qǐng)求,執(zhí)行頻率取決于用戶配置的CheckpointInterval參數(shù)。

執(zhí)行過程:

  1. 在JobManager管理節(jié)點(diǎn)通過CheckpointCoordinator組件向每個(gè)數(shù)據(jù)源節(jié)點(diǎn)發(fā)送Checkpoint執(zhí)行請(qǐng)求,此時(shí)數(shù)據(jù)源節(jié)點(diǎn)中的算子會(huì)將消費(fèi)數(shù)據(jù)對(duì)應(yīng)的Position發(fā)送到JobManager管理節(jié)點(diǎn)中。
  2. JobManager節(jié)點(diǎn)會(huì)存儲(chǔ)Checkpoint元數(shù)據(jù),用于記錄每次執(zhí)行Checkpoint操作過程中算子的元數(shù)據(jù)信息,例如在FlinkKafkaConsumer中會(huì)記錄消費(fèi)Kafka主題的偏移量,用于確認(rèn)從Kafka主題中讀取數(shù)據(jù)的位置。
  3. 在數(shù)據(jù)源節(jié)點(diǎn)執(zhí)行完Checkpoint操作后,繼續(xù)向下游節(jié)點(diǎn)發(fā)送CheckpointBarrier事件,下游算子通過對(duì)齊Barrier事件,觸發(fā)該算子的Checkpoint操作。
    當(dāng)下游的map算子接收到數(shù)據(jù)源節(jié)點(diǎn)的Checkpoint
    Barrier事件后,首先對(duì)當(dāng)前算子的數(shù)據(jù)進(jìn)行處理,并等待其他上游數(shù)據(jù)源節(jié)點(diǎn)的Barrier事件到達(dá)。該過程就是Checkpoint
    Barrier對(duì)齊,目的是確保屬于同一Checkpoint的數(shù)據(jù)能夠全部到達(dá)當(dāng)前節(jié)點(diǎn)。

【Flink狀態(tài)管理五】Checkpoint的設(shè)計(jì)與實(shí)現(xiàn),# flink源碼,flink

Barrier事件的作用就是切分不同Checkpoint批次的數(shù)據(jù)。

  • 當(dāng)map算子接收到所有上游的Barrier事件后,就會(huì)觸發(fā)當(dāng)前算子的Checkpoint操作,并將狀態(tài)數(shù)據(jù)快照到指定的外部持久化介質(zhì)中,該操作主要借助狀態(tài)后端存儲(chǔ)實(shí)現(xiàn)。

  • 當(dāng)狀態(tài)數(shù)據(jù)執(zhí)行完畢后,繼續(xù)將Barrier事件發(fā)送至下游的算子,進(jìn)行后續(xù)算子的Checkpoint操作。

  • 另外,在map算子中執(zhí)行完Checkpoint操作后,也會(huì)向JobManager管理節(jié)點(diǎn)發(fā)送Ack消息,確認(rèn)當(dāng)前算子的Checkpoint操作正常執(zhí)行。此時(shí)Checkpoint數(shù)據(jù)會(huì)存儲(chǔ)該算子對(duì)應(yīng)的狀態(tài)數(shù)據(jù),如果StateBackend為MemoryStateBackend,則主要會(huì)將狀態(tài)數(shù)據(jù)存儲(chǔ)在JobManager的堆內(nèi)存中。

sink節(jié)點(diǎn)的ack

像map算子節(jié)點(diǎn)一樣,當(dāng)Barrier事件到達(dá)sink類型的節(jié)點(diǎn)后,sink節(jié)點(diǎn)也會(huì)進(jìn)行Barrier對(duì)齊操作,確認(rèn)上游節(jié)點(diǎn)的數(shù)據(jù)全部接入。然后對(duì)接入的數(shù)據(jù)進(jìn)行處理,將結(jié)果輸出到外部系統(tǒng)中。完成以上步驟后,sink節(jié)點(diǎn)會(huì)向JobManager管理節(jié)點(diǎn)發(fā)送Ack確認(rèn)消息,確認(rèn)當(dāng)前Checkpoint中的狀態(tài)數(shù)據(jù)都正常進(jìn)行了持久化操作。(之后呢?當(dāng)任務(wù)結(jié)束之后,cp會(huì)消失還是?)

?

2. Checkpoint創(chuàng)建源碼解析

通過調(diào)用StreamExecutionEnvironment.enableCheckpointing(),開啟Checkpoint。
此時(shí)Checkpoint的配置會(huì)被存儲(chǔ)在StreamGraph中,然后將StreamGraph中的CheckpointConfig轉(zhuǎn)換為JobCheckpointingSettings數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)在JobGraph對(duì)象中,并伴隨JobGraph提交到集群運(yùn)行。啟動(dòng)JobMaster服務(wù)后,JobMaster調(diào)度和執(zhí)行Checkpoint操作。

2.1. DefaultExecutionGraphBuilder.buildGraph

如下代碼,通過JobGraph構(gòu)建ExecutionGraph的過程中,獲取JobGraph中存儲(chǔ)的JobCheckpointingSettings配置,然后創(chuàng)建ExecutionGraph。

1)根據(jù)snapshotSettings配置獲取triggerVertices、ackVertices以及confirmVertices節(jié)點(diǎn)集合,并轉(zhuǎn)換為對(duì)應(yīng)的ExecutionJobVertex集合。

  • 其中triggerVertices集合存儲(chǔ)了所有SourceOperator節(jié)點(diǎn),這些節(jié)點(diǎn)通過CheckpointCoordinator主動(dòng)觸發(fā)Checkpoint操作。
  • ackVertices和confirmVertices集合存儲(chǔ)了StreamGraph中的全部節(jié)點(diǎn),代表所有節(jié)點(diǎn)都需要返回Ack確認(rèn)信息并確認(rèn)Checkpoint執(zhí)行成功。

2)創(chuàng)建CompletedCheckpointStore組件,用于存儲(chǔ)Checkpoint過程中的元數(shù)據(jù)。

  • 當(dāng)對(duì)作業(yè)進(jìn)行恢復(fù)操作時(shí)會(huì)在CompletedCheckpointStore中檢索最新完成的Checkpoint元數(shù)據(jù)信息,然后基于元數(shù)據(jù)信息恢復(fù)Checkpoint中存儲(chǔ)的狀態(tài)數(shù)據(jù)。CompletedCheckpointStore有兩種實(shí)現(xiàn),分別為StandaloneCompletedCheckpointStore和ZooKeeperCompletedCheckpointStore。
  • 在CompletedCheckpointStore中通過maxNumberOfCheckpointsToRetain參數(shù)配置以及結(jié)合checkpointIdCounter計(jì)數(shù)器保證只會(huì)存儲(chǔ)固定數(shù)量的CompletedCheckpoint。

3)創(chuàng)建CheckpointStatsTracker實(shí)例
用于監(jiān)控和追蹤C(jī)heckpoint執(zhí)行和更新的情況,包括Checkpoint執(zhí)行的統(tǒng)計(jì)信息以及執(zhí)行狀況,WebUI中顯示的Checkpoint監(jiān)控?cái)?shù)據(jù)主要來自CheckpointStatsTracker。

4)創(chuàng)建StateBackend,從UserClassLoader中反序列化出應(yīng)用指定的StateBackend并設(shè)定為applicationConfiguredBackend。

5)初始化用戶自定義的Checkpoint Hook函數(shù)

6)最終調(diào)用executionGraph.enableCheckpointing()方法,在作業(yè)的執(zhí)行和調(diào)度過程中開啟Checkpoint。

// 配置狀態(tài)數(shù)據(jù)checkpointing
// 從jobGraph中獲取JobCheckpointingSettings
JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
//如果snapshotSettings不為空,則開啟checkpoint功能
if (snapshotSettings != null) {
   List<ExecutionJobVertex> triggerVertices =
         idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
   List<ExecutionJobVertex> ackVertices =
         idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);
   List<ExecutionJobVertex> confirmVertices =
         idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);
   //創(chuàng)建CompletedCheckpointStore
   CompletedCheckpointStore completedCheckpoints;
   CheckpointIDCounter checkpointIdCounter;
   try {
      int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
          CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);
      if (maxNumberOfCheckpointsToRetain <= 0) {
         maxNumberOfCheckpointsToRetain = CheckpointingOptions.MAX_RETAINED_
            CHECKPOINTS.defaultValue();
      }
      // 通過recoveryFactory創(chuàng)建CheckpointStore
      completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, 
         maxNumberOfCheckpointsToRetain, classLoader);   
      // 通過recoveryFactory創(chuàng)建CheckpointIDCounter
      checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
   }
   catch (Exception e) {
      throw new JobExecutionException(jobId, "Failed to initialize high-
         availability checkpoint handler", e);
   }
   // 獲取checkpoints最長(zhǎng)的記錄次數(shù)
   int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
   // 創(chuàng)建CheckpointStatsTracker實(shí)例
   CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
         historySize,
         ackVertices,
         snapshotSettings.getCheckpointCoordinatorConfiguration(),
         metrics);
   // 從application中獲取StateBackend
   final StateBackend applicationConfiguredBackend;
   final SerializedValue<StateBackend> serializedAppConfigured = 
      snapshotSettings.getDefaultStateBackend();
   if (serializedAppConfigured == null) {
      applicationConfiguredBackend = null;
   }
   else {
      try {
         applicationConfiguredBackend = serializedAppConfigured.
            deserializeValue(classLoader);
      } catch (IOException | ClassNotFoundException e) {
         throw new JobExecutionException(jobId,
            "Could not deserialize application-defined state backend.", e);
      }
   }
   // 獲取最終的rootBackend
   final StateBackend rootBackend;
   try {
      rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(
         applicationConfiguredBackend, jobManagerConfig, classLoader, log);
   }
   catch (IllegalConfigurationException | IOException | 
      DynamicCodeLoadingException e) {
         throw new JobExecutionException(jobId, 
            "Could not instantiate configured state backend", e);
   }
   // 初始化用戶自定義的checkpoint Hooks函數(shù)
   final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = 
      snapshotSettings.getMasterHooks();
   final List<MasterTriggerRestoreHook<?>> hooks;
   // 如果serializedHooks為空,則hooks為空
   if (serializedHooks == null) {
      hooks = Collections.emptyList();
   }
   else {
   // 加載MasterTriggerRestoreHook
      final MasterTriggerRestoreHook.Factory[] hookFactories;
      try {
         hookFactories = serializedHooks.deserializeValue(classLoader);
      }
      catch (IOException | ClassNotFoundException e) {
         throw new JobExecutionException(jobId, 
            "Could not instantiate user-defined checkpoint hooks", e);
      }
      // 設(shè)定ClassLoader為UserClassLoader
      final Thread thread = Thread.currentThread();
      final ClassLoader originalClassLoader = thread.getContextClassLoader();
      thread.setContextClassLoader(classLoader);
      // 創(chuàng)建hooks函數(shù)
      try {
         hooks = new ArrayList<>(hookFactories.length);
         for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
            hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
         }
      }
      // 將thread的ContextClassLoader設(shè)定為originalClassLoader
      finally {
         thread.setContextClassLoader(originalClassLoader);
      }
   }
   // 獲取CheckpointCoordinatorConfiguration
   final CheckpointCoordinatorConfiguration chkConfig = 
      snapshotSettings.getCheckpointCoordinatorConfiguration();
   // 開啟executionGraph中的Checkpoint功能
   executionGraph.enableCheckpointing(
      chkConfig,
      triggerVertices,
      ackVertices,
      confirmVertices,
      hooks,
      checkpointIdCounter,
      completedCheckpoints,
      rootBackend,
      checkpointStatsTracker);
}

?

2.2. ExecutionGraph.enableCheckpointing

繼續(xù)看ExecutionGraph.enableCheckpointing()方法的實(shí)現(xiàn),包含如下邏輯。

  1. 將tasksToTrigger、tasksToWaitFor以及tasksToCommitTo三個(gè)ExecutionJobVertex集合轉(zhuǎn)換為ExecutionVertex[]數(shù)組,每個(gè)ExecutionVertex代表ExecutionJobVertex中的一個(gè)SubTask節(jié)點(diǎn)。
  2. 容錯(cuò)管理:創(chuàng)建CheckpointFailureManager,用于Checkpoint執(zhí)行過程中的容錯(cuò)管理,包含failJob和failJobDueToTaskFailure兩個(gè)處理方法。
  3. 定時(shí)調(diào)度和執(zhí)行:創(chuàng)建checkpointCoordinatorTimer,用于Checkpoint異步線程的定時(shí)調(diào)度和執(zhí)行。
  4. 協(xié)調(diào)和管理作業(yè)中的Checkpoint:創(chuàng)建CheckpointCoordinator組件,通過CheckpointCoordinator協(xié)調(diào)和管理作業(yè)中的Checkpoint,同時(shí)收集各Task節(jié)點(diǎn)中Checkpoint的執(zhí)行狀況等信息。
  5. Hook:將Master Hook注冊(cè)到CheckpointCoordinator中,實(shí)現(xiàn)用戶自定義Hook代碼的調(diào)用。
  6. 控制CheckpointCoordinator的啟停:將JobStatusListener的實(shí)現(xiàn)類CheckpointCoordinatorDeActivator注冊(cè)到JobManager中,此時(shí)系統(tǒng)會(huì)根據(jù)作業(yè)的運(yùn)行狀態(tài)控制CheckpointCoordinator的啟停,當(dāng)作業(yè)的狀態(tài)為Running時(shí)會(huì)觸發(fā)啟動(dòng)CheckpointCoordinator組件。
public void enableCheckpointing(
      CheckpointCoordinatorConfiguration chkConfig,
      List<ExecutionJobVertex> verticesToTrigger,
      List<ExecutionJobVertex> verticesToWaitFor,
      List<ExecutionJobVertex> verticesToCommitTo,
      List<MasterTriggerRestoreHook<?>> masterHooks,
      CheckpointIDCounter checkpointIDCounter,
      CompletedCheckpointStore checkpointStore,
      StateBackend checkpointStateBackend,
      CheckpointStatsTracker statsTracker) {
   checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
   checkState(checkpointCoordinator == null, "checkpointing already enabled");
   ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
   ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
   ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
   checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
   // 創(chuàng)建CheckpointFailureManager
   CheckpointFailureManager failureManager = new CheckpointFailureManager(
      chkConfig.getTolerableCheckpointFailureNumber(),
      new CheckpointFailureManager.FailJobCallback() {
         @Override
         public void failJob(Throwable cause) {
            getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause));
         }
         @Override
         public void failJobDueToTaskFailure(Throwable cause, 
                                             ExecutionAttemptID failingTask) {
            getJobMasterMainThreadExecutor()
               .execute(()  -> failGlobalIfExecutionIsStillRunning(cause, 
                  failingTask));
         }
      }
   );
   // 創(chuàng)建checkpointCoordinatorTimer
   checkState(checkpointCoordinatorTimer == null);
   checkpointCoordinatorTimer = Executors.newSingleThreadScheduledExecutor(
      new DispatcherThreadFactory(
         Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
   // 創(chuàng)建checkpointCoordinator
   checkpointCoordinator = new CheckpointCoordinator(
      jobInformation.getJobId(),
      chkConfig,
      tasksToTrigger,
      tasksToWaitFor,
      tasksToCommitTo,
      checkpointIDCounter,
      checkpointStore,
      checkpointStateBackend,
      ioExecutor,
      new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
      SharedStateRegistry.DEFAULT_FACTORY,
      failureManager);
   // 向checkpoint Coordinator中注冊(cè)master Hooks
   for (MasterTriggerRestoreHook<?> hook : masterHooks) {
      if (!checkpointCoordinator.addMasterHook(hook)) {
         LOG.warn("Trying to register multiple checkpoint hooks with the name: {}",
                  hook.getIdentifier());
      }
   }
   //向checkpointCoordinator中設(shè)定checkpointStatsTracker
   checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
     // 注冊(cè)JobStatusListener,用于自動(dòng)啟動(dòng)CheckpointCoordinator
   if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
      registerJobStatusListener(checkpointCoordinator.
         createActivatorDeactivator());
   }
   this.stateBackendName = checkpointStateBackend.getClass().getSimpleName();
}

?

參考:《Flink設(shè)計(jì)與實(shí)現(xiàn):核心原理與源碼解析》–張利兵文章來源地址http://www.zghlxwxcb.cn/news/detail-833463.html

到了這里,關(guān)于【Flink狀態(tài)管理五】Checkpoint的設(shè)計(jì)與實(shí)現(xiàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink分流,合流,狀態(tài),checkpoint和精準(zhǔn)一次筆記

    第8章 分流 1.使用側(cè)輸出流 2.合流 2.1 union :使用 ProcessFunction 處理合流后的數(shù)據(jù) 2.2 Connect : 兩條流的格式可以不一樣, map操作使用CoMapFunction,process 傳入:CoProcessFunction 2.2 BroadcastConnectedStream keyBy 進(jìn)行了按鍵分區(qū),那么要傳入的就是 KeyedBroadcastProcessFunction; 如果沒有按鍵分

    2024年02月12日
    瀏覽(20)
  • Flink源碼之Checkpoint執(zhí)行流程

    Flink源碼之Checkpoint執(zhí)行流程

    Checkpoint完整流程如上圖所示: JobMaster的CheckpointCoordinator向所有SourceTask發(fā)送RPC觸發(fā)一次CheckPoint SourceTask向下游廣播CheckpointBarrier SouceTask完成狀態(tài)快照后向JobMaster發(fā)送快照結(jié)果 非SouceTask在Barrier對(duì)齊后完成狀態(tài)快照向JobMaster發(fā)送快照結(jié)果 JobMaster保存SubTask快照結(jié)果 JobMaster收到所

    2024年02月11日
    瀏覽(44)
  • 源碼解析Flink源節(jié)點(diǎn)數(shù)據(jù)讀取是如何與checkpoint串行執(zhí)行

    源碼解析Flink源節(jié)點(diǎn)數(shù)據(jù)讀取是如何與checkpoint串行執(zhí)行 Flink版本:1.13.6 前置知識(shí):源節(jié)點(diǎn)的Checkpoint是由Checkpointcoordinate觸發(fā),具體是通過RPC調(diào)用TaskManager中對(duì)應(yīng)的Task的StreamTask類的performChecpoint方法執(zhí)行Checkpoint。 本文思路:本文先分析checkpoint階段,然后再分析數(shù)據(jù)讀取階段,

    2024年02月14日
    瀏覽(27)
  • Flink的checkpoint是怎么實(shí)現(xiàn)的?

    Flink的checkpoint是怎么實(shí)現(xiàn)的?

    Checkpoint介紹 Checkpoint容錯(cuò)機(jī)制是Flink可靠性的基石,可以保證Flink集群在某個(gè)算子因?yàn)槟承┰?如 異常退出)出現(xiàn)故障時(shí),能夠?qū)⒄麄€(gè)應(yīng)用流圖的狀態(tài)恢復(fù)到故障之前的某一狀態(tài),保證應(yīng)用流圖狀態(tài)的一致性。Flink的Checkpoint機(jī)制原理來自“Chandy-Lamport algorithm”算法。 Barriers flink 分

    2024年02月10日
    瀏覽(11)
  • Flink非對(duì)齊checkpoint原理(Flink Unaligned Checkpoint)

    Flink非對(duì)齊checkpoint原理(Flink Unaligned Checkpoint)

    為什么提出Unaligned Checkpoint(UC)? 因?yàn)榉磯簢?yán)重時(shí)會(huì)導(dǎo)致Checkpoint失敗,可能導(dǎo)致如下問題 恢復(fù)時(shí)間長(zhǎng)-服務(wù)效率低 非冪等和非事務(wù)會(huì)導(dǎo)致數(shù)據(jù)重復(fù) 持續(xù)反壓導(dǎo)致任務(wù)加入死循環(huán)(可能導(dǎo)致數(shù)據(jù)丟失,例如超過kafka的過期時(shí)間無法重置offset) UC的原理 UC有兩個(gè)階段(UC主要是

    2024年02月14日
    瀏覽(28)
  • Flink中的狀態(tài)管理

    Flink中的狀態(tài)管理

    在Flink中,算子任務(wù)可以分為 有狀態(tài) 和 無狀態(tài) 兩種狀態(tài)。 無狀態(tài)的算子任務(wù)只需要觀察每個(gè)獨(dú)立事件,根據(jù)當(dāng)前輸入的數(shù)據(jù)直接轉(zhuǎn)換輸出結(jié)果。 例如 Map 、 Filter 、 FlatMap 都是屬于 無狀態(tài)算子 。? 而 有狀態(tài)的算子任務(wù),就是除了當(dāng)前數(shù)據(jù)外,還需要一些其他的數(shù)據(jù)來得到

    2024年01月23日
    瀏覽(20)
  • Flink State 狀態(tài)管理

    狀態(tài)在Flink中叫做State,用來保存中間計(jì)算結(jié)果或者緩存數(shù)據(jù)。要做到比較好的狀態(tài)管理,需要考慮以下幾點(diǎn)內(nèi)容: 狀態(tài)數(shù)據(jù)的存儲(chǔ)和訪問 在Task內(nèi)部,如何高效地保存狀態(tài)數(shù)據(jù)和使用狀態(tài)數(shù)據(jù)。 狀態(tài)數(shù)據(jù)的備份和恢復(fù) 作業(yè)失敗是無法避免的,那么就要考慮如何高效地將狀態(tài)

    2024年01月17日
    瀏覽(23)
  • 《Flink學(xué)習(xí)筆記》——第八章 狀態(tài)管理

    《Flink學(xué)習(xí)筆記》——第八章 狀態(tài)管理

    8.1 Flink中的狀態(tài) 8.1.1 概述 在Flink中,算子任務(wù)可以分為無狀態(tài)和有狀態(tài)兩種情況。 **無狀態(tài)的算子:**每個(gè)事件不依賴其它數(shù)據(jù),自己處理完就輸出,也不需要依賴中間結(jié)果。例如:打印操作,每個(gè)數(shù)據(jù)只需要它本身就可以完成。 **有狀態(tài)的算子:**事件需要依賴中間或者外

    2024年02月11日
    瀏覽(25)
  • 【大數(shù)據(jù)】Flink 架構(gòu)(四):狀態(tài)管理

    【大數(shù)據(jù)】Flink 架構(gòu)(四):狀態(tài)管理

    《 Flink 架構(gòu) 》系列(已完結(jié)),共包含以下 6 篇文章: Flink 架構(gòu)(一):系統(tǒng)架構(gòu) Flink 架構(gòu)(二):數(shù)據(jù)傳輸 Flink 架構(gòu)(三):事件時(shí)間處理 Flink 架構(gòu)(四):狀態(tài)管理 Flink 架構(gòu)(五):檢查點(diǎn) Checkpoint(看完即懂) Flink 架構(gòu)(六):保存點(diǎn) Savepoint ?? 如果您覺得這篇

    2024年02月19日
    瀏覽(22)
  • Flink 源碼剖析|鍵控狀態(tài)的 API 層

    Flink 源碼剖析|鍵控狀態(tài)的 API 層

    在 Flink 中有如下 5 種鍵控狀態(tài)(Keyed State),這些狀態(tài)僅能在鍵控?cái)?shù)據(jù)流(Keyed Stream)的算子(operator)上使用。鍵控流使用鍵(key)對(duì)數(shù)據(jù)流中的記錄進(jìn)行分區(qū),同時(shí)也會(huì)對(duì)狀態(tài)進(jìn)行分區(qū)。要?jiǎng)?chuàng)建鍵控流,只需要在 DataStream 上使用 keyBy() 方法指定鍵即可。 具體地,這 5 種鍵

    2024年02月20日
    瀏覽(17)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包