国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【Flink狀態(tài)管理(六)】Checkpoint的觸發(fā)方式(1)通過(guò)CheckpointCoordinator觸發(fā)算子的Checkpoint操作

這篇具有很好參考價(jià)值的文章主要介紹了【Flink狀態(tài)管理(六)】Checkpoint的觸發(fā)方式(1)通過(guò)CheckpointCoordinator觸發(fā)算子的Checkpoint操作。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

Checkpoint的觸發(fā)方式有兩種

  • 一種是數(shù)據(jù)源節(jié)點(diǎn)中的Checkpoint操作觸發(fā),通過(guò)CheckpointCoordinator組件進(jìn)行協(xié)調(diào)和控制。 CheckpointCoordinator通過(guò)注冊(cè)定時(shí)器的方式按照配置的時(shí)間間隔觸發(fā)數(shù)據(jù)源節(jié)點(diǎn)的Checkpoint操作。數(shù)據(jù)源節(jié)點(diǎn)會(huì)向下游算子發(fā)出Checkpoint Barrier事件,供下游節(jié)點(diǎn)使用。
  • 另一種是下游算子節(jié)點(diǎn)根據(jù)上游發(fā)送的Checkpoint Barrier事件控制算子中Checkpoint操作的觸發(fā)時(shí)機(jī),即只有接收到所有上游Barrier事件后,才會(huì)觸發(fā)本節(jié)點(diǎn)的Checkpoint操作。

本文先介紹通過(guò)CheckpointCoordinator觸發(fā)算子的Checkpoint操作

CheckpointCoordinator在整個(gè)作業(yè)中扮演了Checkpoint協(xié)調(diào)者的角色,負(fù)責(zé)在數(shù)據(jù)源節(jié)點(diǎn)觸發(fā)Checkpoint以及整個(gè)作業(yè)的Checkpoint管理,并且CheckpointCoordinator組件會(huì)接收TaskMananger在Checkpoint執(zhí)行完成后返回的Ack消息。

?

一. 啟動(dòng)CheckpointCoordinator

當(dāng)作業(yè)的JobStatus轉(zhuǎn)換為Running時(shí),通知CheckpointCoordinatorDeActivator監(jiān)聽(tīng)器啟動(dòng)CheckpointCoordinator服務(wù)。

如代碼CheckpointCoordinatorDeActivator.jobStatusChanges()方法主要包含如下邏輯。

> 1. 當(dāng)`newJobStatus == JobStatus.RUNNING`時(shí),立即調(diào)用
> coordinator.startCheckpointScheduler()方法啟動(dòng)整個(gè)Job的調(diào)度器
> CheckpointCoordinator,此時(shí)Checkpoint的觸發(fā)依靠CheckpointCoordinator進(jìn)行協(xié)調(diào)。
> 
> 2. 當(dāng)JobStatus為其他類(lèi)型狀態(tài)時(shí),調(diào)用coordinator.stopCheckpointScheduler()方法,
> 停止當(dāng)前Job中的Checkpoint操作。



public class CheckpointCoordinatorDeActivator implements JobStatusListener {
   private final CheckpointCoordinator coordinator;
   public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
      this.coordinator = checkNotNull(coordinator);
   }
   @Override
   public void jobStatusChanges(JobID jobId,JobStatus newJobStatus, long timestamp,
                              Throwable error) {
      if (newJobStatus == JobStatus.RUNNING) {
         // 啟動(dòng)Checkpoint調(diào)度程序
         coordinator.startCheckpointScheduler();
      } else {
         // 直接停止CheckpointScheduler
         coordinator.stopCheckpointScheduler();
      }
   }
}

?

二. 開(kāi)啟CheckpointScheduler線(xiàn)程

接下來(lái)在CheckpointCoordinator.startCheckpointScheduler()方法中調(diào)用scheduleTriggerWithDelay()方法進(jìn)行后續(xù)操作,向創(chuàng)建好的checkpointCoordinatorTimer線(xiàn)程池添加定時(shí)調(diào)度執(zhí)行的Runnable線(xiàn)程。

如代碼所示:

在CheckpointCoordinator.scheduleTriggerWithDelay()方法中指定baseInterval參數(shù),設(shè)定執(zhí)行Checkpoint操作的時(shí)間間隔,通過(guò)定時(shí)器周期性地觸發(fā)ScheduledTrigger線(xiàn)程,Checkpoint的具體操作在ScheduledTrigger線(xiàn)程中實(shí)現(xiàn)。

private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
   return timer.scheduleAtFixedRate(
      new ScheduledTrigger(),
      initDelay, baseInterval, TimeUnit.MILLISECONDS);
}

?

三. 觸發(fā)Checkpoint

如代碼,ScheduledTrigger也是CheckpointCoordinator的內(nèi)部類(lèi),實(shí)現(xiàn)了Runnable接口。在ScheduledTrigger.run()方法中調(diào)用了CheckpointCoordinator.triggerCheckpoint()方法觸發(fā)和執(zhí)行Checkpoint操作。

private final class ScheduledTrigger implements Runnable {
   @Override
   public void run() {
      try {
         // 調(diào)用triggerCheckpoint()方法觸發(fā)Checkpoint操作
         triggerCheckpoint(System.currentTimeMillis(), true);
      }
      catch (Exception e) {
         LOG.error("Exception while triggering checkpoint for job {}.", job, e);
      }
   }
}

CheckpointCoordinator.triggerCheckpoint()方法包含的執(zhí)行邏輯非常多,這里重點(diǎn)介紹其中的主要邏輯。根據(jù)CheckpointCoordinator觸發(fā)Checkpoint操作的過(guò)程分為以下幾個(gè)部分。

1. Checkpoint執(zhí)行前的工作

  1. 首先檢查Checkpoint的執(zhí)行環(huán)境和參數(shù),滿(mǎn)足條件后觸發(fā)執(zhí)行Checkpoint操作。Checkpoint執(zhí)行過(guò)程分為異步和同步兩種:

調(diào)用preCheckBeforeTriggeringCheckpoint()方法進(jìn)行一些前置檢查,主要包括檢查CheckpointCoordinator當(dāng)前的狀態(tài)是否為shutdown、Checkpoint嘗試次數(shù)是否超過(guò)配置的最大值。

  1. 構(gòu)建執(zhí)行和觸發(fā)Checkpoint操作對(duì)應(yīng)的Task節(jié)點(diǎn)實(shí)例的Execution集合,其中tasksToTrigger數(shù)組中存儲(chǔ)了觸發(fā)Checkpoint操作的ExecutionVertex元素,實(shí)際上就是所有的數(shù)據(jù)源節(jié)點(diǎn)。

CheckpointCoordinator僅會(huì)觸發(fā)數(shù)據(jù)源節(jié)點(diǎn)的Checkpoint操作,其他節(jié)點(diǎn)則是通過(guò)Barrier對(duì)齊的方式觸發(fā)的。

  1. 構(gòu)建需要發(fā)送Ack消息的ExecutionVertex集合,主要是從tasksToWaitFor集合中轉(zhuǎn)換而來(lái)。

tasksToWaitFor中存儲(chǔ)了ExecutonGraph中所有的ExecutionVertex,也就是說(shuō)每個(gè)ExecutionVertex節(jié)點(diǎn)對(duì)應(yīng)的Task實(shí)例都需要向CheckpointCoordinator中匯報(bào)Ack消息。

// 主要做前置檢查
   synchronized (lock) {
      preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
   }
   // 創(chuàng)建需要執(zhí)行的Task對(duì)應(yīng)的Execution集合
  Execution[] executions = new Execution[tasksToTrigger.length];
   // 遍歷tasksToTrigger集合,構(gòu)建Execution集合
   for (int i = 0; i < tasksToTrigger.length; i++) {
   //獲取Task對(duì)應(yīng)的Execution集合
      Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
      if (ee == null) {
      // 如果Task對(duì)應(yīng)的Execution集合為空,代表Task沒(méi)有被執(zhí)行,則拋出異常
         LOG.info("Checkpoint triggering task {} of job {} is not being 
            executed at the moment. Aborting checkpoint.", tasksToTrigger[i].
            getTaskNameWithSubtaskIndex(), job);
         throw new CheckpointException(
            CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
      } else if (ee.getState() == ExecutionState.RUNNING) {
         // 如果ExecutionState為RUNNING,則添加到executions集合中
      executions[i] = ee;
      } else {
      // 如果其他ExecutionState不為RUNNING,則拋出異常
         LOG.info("Checkpoint triggering task {} of job {} is not in state {} 
           but {} instead. Aborting checkpoint.",
             tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
             job,
             ExecutionState.RUNNING,
             ee.getState());
         throw new CheckpointException(
            CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
      }
   }
   // 組裝用于需要發(fā)送Ack消息的Task集合
   Map<ExecutionAttemptID, ExecutionVertex> ackTasks = 
      new HashMap<>(tasksToWaitFor.length);
   for (ExecutionVertex ev : tasksToWaitFor) {
      Execution ee = ev.getCurrentExecutionAttempt();
      if (ee != null) {
         ackTasks.put(ee.getAttemptId(), ev);
      } else {
         LOG.info("Checkpoint acknowledging task {} of job {} is not being 
            executed at the moment. Aborting checkpoint.", ev.getTaskNameWith
               SubtaskIndex(), job);
         throw new CheckpointException(
            CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
      }
}

?

2. 創(chuàng)建PendingCheckpoint

在執(zhí)行Checkpoint操作之前,需要構(gòu)建PendingCheckpoint對(duì)象,從字面意思上講就是掛起Checkpoint操作。

從開(kāi)始執(zhí)行Checkpoint操作直到Task實(shí)例返回Ack確認(rèn)成功消息,Checkpoint會(huì)一直處于Pending狀態(tài),確保Checkpoint能被成功執(zhí)行。

如代碼邏輯:

  1. Checkpoint有唯一的checkpointID標(biāo)記,根據(jù)高可用模式選擇不同的計(jì)數(shù)器。

如果基于ZooKeeper實(shí)現(xiàn)了高可用集群,會(huì)調(diào)用ZooKeeperCheckpointIDCounter實(shí)現(xiàn)checkpointID計(jì)數(shù);如果是非高可用集群,則會(huì)通過(guò)StandaloneCheckpointIDCounter完成checkpointID計(jì)數(shù)。

  1. 創(chuàng)建checkpointStorageLocation,用于定義Checkpoint過(guò)程中狀態(tài)快照數(shù)據(jù)存放的位置。

checkpointStorageLocation通過(guò)checkpointStorage創(chuàng)建和初始化,不同的checkpointStorage實(shí)現(xiàn)創(chuàng)建的checkpointStorageLocation會(huì)有所不同。

  1. 創(chuàng)建PendingCheckpoint對(duì)象。

包括checkpointID、ackTasks以及checkpointStorageLocation等參數(shù)信息。將創(chuàng)建好的PendingCheckpoint存儲(chǔ)在pendingCheckpoints集合中,并異步執(zhí)行PendingCheckpoint操作。

final CheckpointStorageLocation checkpointStorageLocation;
final long checkpointID;
try {
   //通過(guò)checkpointIdCounter獲取checkpointID
   checkpointID = checkpointIdCounter.getAndIncrement();
      // 獲取checkpointStorageLocation
   checkpointStorageLocation = props.isSavepoint() ?
         checkpointStorage
      .initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
         checkpointStorage.initializeLocationForCheckpoint(checkpointID);
}
// 省略部分代碼
// 創(chuàng)建PendingCheckpoint對(duì)象
final PendingCheckpoint checkpoint = new PendingCheckpoint(
   job,
   checkpointID,
   timestamp,
   ackTasks,
   masterHooks.keySet(),
   props,
   checkpointStorageLocation,
   executor);

?

3. Checkpoint的觸發(fā)與執(zhí)行

在CheckpointCoordinator.triggerCheckpoint()方法中,會(huì)在synchronized(lock)模塊內(nèi)定義和執(zhí)行Checkpoint操作的具體邏輯,主要包含如下步驟。

  1. 獲取coordinator對(duì)象鎖,對(duì)TriggeringCheckpoint對(duì)象進(jìn)行預(yù)檢查,主要包括檢查CheckpointCoordinator狀態(tài)和PendingCheckpoint嘗試次數(shù)等。

  2. 將PendingCheckpoint存儲(chǔ)在pendingCheckpoints鍵值對(duì)中,使用定時(shí)器創(chuàng)建cancellerHandle對(duì)象,cancellerHandle用于清理過(guò)期的Checkpoint操作。

通過(guò)checkpoint.setCancellerHandle()方法設(shè)置Checkpoint的CancellerHandle,設(shè)置成功則返回True,如果失敗則返回false,說(shuō)明當(dāng)前Checkpoint已經(jīng)被釋放。

  1. 調(diào)用并執(zhí)行MasterHook??梢酝ㄟ^(guò)實(shí)現(xiàn)MasterHook函數(shù),準(zhǔn)備外部系統(tǒng)環(huán)境或觸發(fā)相應(yīng)的系統(tǒng)操作。

  2. 遍歷執(zhí)行executions集合中的Execution節(jié)點(diǎn),判斷props.isSynchronous()方法是否為T(mén)rue,如果為T(mén)rue則調(diào)用triggerSynchronousSavepoint()方法同步執(zhí)行Checkpoint操作。
    其他情況則調(diào)用triggerCheckpoint()方法異步執(zhí)行Checkpoint操作。

// 獲取coordinator-wide lock
synchronized (lock) {
   // TriggeringCheckpoint檢查
   preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
   LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, 
      job);
      // 將checkpoint存儲(chǔ)在pendingCheckpoints KV集合中
   pendingCheckpoints.put(checkpointID, checkpoint);
      // 調(diào)度canceller線(xiàn)程,清理過(guò)期的Checkpoint對(duì)象
   ScheduledFuture<?> cancellerHandle = timer.schedule(
         canceller,
         checkpointTimeout, TimeUnit.MILLISECONDS);
      // 確定Checkpoint是否已經(jīng)被釋放
   if (!checkpoint.setCancellerHandle(cancellerHandle)) {
      cancellerHandle.cancel(false);
   }
   // 調(diào)用MasterHook方法
   for (MasterTriggerRestoreHook<?> masterHook : masterHooks.values()) {
      final MasterState masterState =
         MasterHooks.triggerHook(masterHook, checkpointID, timestamp, executor)
            .get(checkpointTimeout, TimeUnit.MILLISECONDS);
      checkpoint.acknowledgeMasterState(masterHook.getIdentifier(), masterState);
   }
   Preconditions.checkState(checkpoint.areMasterStatesFullyAcknowledged());
}
// 創(chuàng)建CheckpointOptions
final CheckpointOptions checkpointOptions = new CheckpointOptions(
      props.getCheckpointType(),
      checkpointStorageLocation.getLocationReference());
// 分別執(zhí)行executions中的Execution節(jié)點(diǎn)
for (Execution execution: executions) {
   if (props.isSynchronous()) {
      // 如果是同步執(zhí)行,則調(diào)用triggerSynchronousSavepoint()方法
      execution.triggerSynchronousSavepoint(checkpointID, timestamp, 
                                            checkpointOptions,
                                            advanceToEndOfTime);
   } else {
      // 其他情況則調(diào)用triggerCheckpoint()異步方法執(zhí)行
      execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
   }
}
// 返回Checkpoint中的CompletionFuture對(duì)象
numUnsuccessfulCheckpointsTriggers.set(0);
return checkpoint.getCompletionFuture();

以上就完成了在CheckpointCoordinator中觸發(fā)Checkpoint的全部操作,具體的執(zhí)行過(guò)程調(diào)用Execution完成。

?

四. Task節(jié)點(diǎn)的Checkpoint操作

在Execution.triggerCheckpoint()方法中實(shí)際上調(diào)用triggerCheckpointHelper()方法完成Execution對(duì)應(yīng)的Task節(jié)點(diǎn)的Checkpoint操作,并通過(guò)Task實(shí)例觸發(fā)數(shù)據(jù)源節(jié)點(diǎn)的Checkpoint操作,如代碼所示。

1. 觸發(fā)準(zhǔn)備

  1. 獲取當(dāng)前Execution分配的LogicalSlot,如果LogicalSlot不為空,說(shuō)明Execution成功分配到Slot計(jì)算資源,否則說(shuō)明Execution中沒(méi)有資源,Execution對(duì)應(yīng)的Task實(shí)例不會(huì)被執(zhí)行和啟動(dòng)。

  2. 調(diào)用TaskManagerGateway.triggerCheckpoint()的RPC方法,觸發(fā)和執(zhí)行指定Task的Checkpoint操作。

  3. TaskExecutor收到來(lái)自CheckpointCoordinator的Checkpoint觸發(fā)請(qǐng)求后,會(huì)在TaskExecutor實(shí)例中完成對(duì)應(yīng)Task實(shí)例的Checkpoint操作。

private void triggerCheckpointHelper(long checkpointId, 
                                     long timestamp, 
                                     CheckpointOptions checkpointOptions, 
                                     boolean advanceToEndOfEventTime) {
   final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
   if (advanceToEndOfEventTime 
       && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
       throw new IllegalArgumentException("Only synchronous savepoints are 
         allowed to advance the watermark to MAX.");
   }
      // 獲取當(dāng)前Execution分配的LogicalSlot資源
   final LogicalSlot slot = assignedResource;
   // 如果LogicalSlot不為空,說(shuō)明Execution運(yùn)行正常
   if (slot != null) {
      // 通過(guò)slot獲取TaskManagerGateway對(duì)象
      final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            // 調(diào)用triggerCheckpoint()方法
      taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), 
                                           checkpointId, timestamp, 
                                           checkpointOptions,
                                           advanceToEndOfEventTime);
   } else {
      // 否則說(shuō)明Execution中沒(méi)有資源,不再執(zhí)行Execution對(duì)應(yīng)的Task實(shí)例
      LOG.debug("The execution has no slot assigned. This indicates that the 
      execution is no longer running.");
   }
}

?

2. 調(diào)用TaskExecutor執(zhí)行Checkpoint操作

TaskExecutor接收到來(lái)自CheckpointCoordinator的Checkpoint觸發(fā)請(qǐng)求后,立即根據(jù)Execution信息確認(rèn)Task實(shí)例線(xiàn)程,并且調(diào)用Task實(shí)例觸發(fā)和執(zhí)行數(shù)據(jù)源節(jié)點(diǎn)的Checkpoint操作。如代碼,TaskExecutor.triggerCheckpoint()方法邏輯如下。

  1. 檢查CheckpointType的類(lèi)型,CheckpointType共有三種類(lèi)型,分別為CHECKPOINT、SAVEPOINT和SYNC_SAVEPOINT,且只有在同步Savepoints操作時(shí)才能調(diào)整Watermark為MAX。

  2. 從taskSlotTable中獲取Execution對(duì)應(yīng)的Task實(shí)例,如果Task實(shí)例不為空,則調(diào)用task.triggerCheckpointBarrier()方法執(zhí)行Task實(shí)例中的Checkpoint操作。

  3. 如果Task實(shí)例為空,說(shuō)明Task目前處于異常,無(wú)法執(zhí)行Checkpoint操作。此時(shí)調(diào)用FutureUtils.completedExceptionally()方法,并封裝CheckpointException異常信息,返回給管理節(jié)點(diǎn)的CheckpointCoordinator進(jìn)行處理。

public CompletableFuture<Acknowledge> triggerCheckpoint(
      ExecutionAttemptID executionAttemptID,
      long checkpointId,
      long checkpointTimestamp,
      CheckpointOptions checkpointOptions,
      boolean advanceToEndOfEventTime) {
   log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, 
      checkpointTimestamp, executionAttemptID);
      //檢查CheckpointType,確保只有同步的savepoint操作才能將Watermark調(diào)整為MAX
   final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
   if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && 
       checkpointType.isSavepoint())) {
      throw new IllegalArgumentException("Only synchronous savepoints are 
         allowed to advance the watermark to MAX.");
   }
      // 從taskSlotTable中獲取當(dāng)前Execution對(duì)應(yīng)的Task
   final Task task = taskSlotTable.getTask(executionAttemptID);
   // 如果task不為空,則調(diào)用triggerCheckpointBarrier()方法
   if (task != null) {
      task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, 
         checkpointOptions, advanceToEndOfEventTime);
   // 返回CompletableFuture對(duì)象
      return CompletableFuture.completedFuture(Acknowledge.get());
   } else {
      final String message = "TaskManager received a checkpoint request for 
         unknown task " + executionAttemptID + '.';
      // 如果task為空,則返回CheckpointException異常
      log.debug(message);
      return FutureUtils.completedExceptionally(
          new CheckpointException(message,
CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
   }
}

?

五. 在StreamTask中執(zhí)行Checkpoint操作

在執(zhí)行Task.triggerCheckpointBarrier()方法時(shí),會(huì)借助AbstractInvokable中提供的triggerCheckpointAsync()方法觸發(fā)并執(zhí)行StreamTask中的Checkpoint操作。

public Future<Boolean> triggerCheckpointAsync(
      CheckpointMetaData checkpointMetaData,
      CheckpointOptions checkpointOptions,
      boolean advanceToEndOfEventTime) {
   // 異步提交Checkpoint操作
   return mailboxProcessor.getMainMailboxExecutor().submit(    
      () -> triggerCheckpoint(checkpointMetaData, 
                              checkpointOptions, advanceToEndOfEventTime),
      "checkpoint %s with %s",
      checkpointMetaData,
      checkpointOptions);
}

StreamTask.triggerCheckpoint()方法主要邏輯如下。

  1. 調(diào)用StreamTask.performCheckpoint()方法執(zhí)行Checkpoint并返回success信息,用于判斷Checkpoint操作是否成功執(zhí)行。
  2. 如果success信息為False,表明Checkpoint操作沒(méi)有成功執(zhí)行,此時(shí)調(diào)用declineCheckpoint()方法回退。
boolean success = performCheckpoint(checkpointMetaData, checkpointOptions, 
                                    checkpointMetrics, advanceToEndOfEventTime);
if (!success) {
   declineCheckpoint(checkpointMetaData.getCheckpointId());
}
return success;

在StreamTask.performCheckpoint()方法中,主要執(zhí)行了Task實(shí)例的Checkpoint操作,該方法除了會(huì)通過(guò)CheckpointCoordinator觸發(fā)之外,在下游算子通過(guò)CheckpointBarrier對(duì)齊觸發(fā)Checkpoint操作時(shí),也會(huì)調(diào)用該方法執(zhí)行具體Task的Checkpoint操作。

?
下篇我們繼續(xù)看CheckpointBarrier對(duì)齊觸發(fā)Checkpoint的流程,了解StreamTask中performCheckpoint()方法如何執(zhí)行Checkpoint操作,實(shí)現(xiàn)狀態(tài)數(shù)據(jù)快照與持久化操作。

?

參考:《Flink設(shè)計(jì)與實(shí)現(xiàn):核心原理與源碼解析》–張利兵文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-834237.html

到了這里,關(guān)于【Flink狀態(tài)管理(六)】Checkpoint的觸發(fā)方式(1)通過(guò)CheckpointCoordinator觸發(fā)算子的Checkpoint操作的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink分流,合流,狀態(tài),checkpoint和精準(zhǔn)一次筆記

    第8章 分流 1.使用側(cè)輸出流 2.合流 2.1 union :使用 ProcessFunction 處理合流后的數(shù)據(jù) 2.2 Connect : 兩條流的格式可以不一樣, map操作使用CoMapFunction,process 傳入:CoProcessFunction 2.2 BroadcastConnectedStream keyBy 進(jìn)行了按鍵分區(qū),那么要傳入的就是 KeyedBroadcastProcessFunction; 如果沒(méi)有按鍵分

    2024年02月12日
    瀏覽(20)
  • el-date-picker實(shí)現(xiàn)通過(guò)其他方式觸發(fā)日期選擇器

    el-date-picker實(shí)現(xiàn)通過(guò)其他方式觸發(fā)日期選擇器

    el-date-picker 目前只能通過(guò)點(diǎn)擊input輸入框觸發(fā)日期選擇器,項(xiàng)目中需要通過(guò)其他方式觸發(fā)日期選擇器同時(shí)把input輸入框去掉,如點(diǎn)擊另一個(gè)按鈕事件 來(lái)觸發(fā)日期選擇器框展開(kāi)。 該模塊由于后端接口數(shù)據(jù)傳輸限制 在前面文章里做了些許改動(dòng)。 需求左右切換 可以快速找到年份,

    2023年04月08日
    瀏覽(25)
  • Flink State 狀態(tài)管理

    狀態(tài)在Flink中叫做State,用來(lái)保存中間計(jì)算結(jié)果或者緩存數(shù)據(jù)。要做到比較好的狀態(tài)管理,需要考慮以下幾點(diǎn)內(nèi)容: 狀態(tài)數(shù)據(jù)的存儲(chǔ)和訪問(wèn) 在Task內(nèi)部,如何高效地保存狀態(tài)數(shù)據(jù)和使用狀態(tài)數(shù)據(jù)。 狀態(tài)數(shù)據(jù)的備份和恢復(fù) 作業(yè)失敗是無(wú)法避免的,那么就要考慮如何高效地將狀態(tài)

    2024年01月17日
    瀏覽(23)
  • Flink中的狀態(tài)管理

    Flink中的狀態(tài)管理

    在Flink中,算子任務(wù)可以分為 有狀態(tài) 和 無(wú)狀態(tài) 兩種狀態(tài)。 無(wú)狀態(tài)的算子任務(wù)只需要觀察每個(gè)獨(dú)立事件,根據(jù)當(dāng)前輸入的數(shù)據(jù)直接轉(zhuǎn)換輸出結(jié)果。 例如 Map 、 Filter 、 FlatMap 都是屬于 無(wú)狀態(tài)算子 。? 而 有狀態(tài)的算子任務(wù),就是除了當(dāng)前數(shù)據(jù)外,還需要一些其他的數(shù)據(jù)來(lái)得到

    2024年01月23日
    瀏覽(20)
  • Flink非對(duì)齊checkpoint原理(Flink Unaligned Checkpoint)

    Flink非對(duì)齊checkpoint原理(Flink Unaligned Checkpoint)

    為什么提出Unaligned Checkpoint(UC)? 因?yàn)榉磯簢?yán)重時(shí)會(huì)導(dǎo)致Checkpoint失敗,可能導(dǎo)致如下問(wèn)題 恢復(fù)時(shí)間長(zhǎng)-服務(wù)效率低 非冪等和非事務(wù)會(huì)導(dǎo)致數(shù)據(jù)重復(fù) 持續(xù)反壓導(dǎo)致任務(wù)加入死循環(huán)(可能導(dǎo)致數(shù)據(jù)丟失,例如超過(guò)kafka的過(guò)期時(shí)間無(wú)法重置offset) UC的原理 UC有兩個(gè)階段(UC主要是

    2024年02月14日
    瀏覽(28)
  • 《Flink學(xué)習(xí)筆記》——第八章 狀態(tài)管理

    《Flink學(xué)習(xí)筆記》——第八章 狀態(tài)管理

    8.1 Flink中的狀態(tài) 8.1.1 概述 在Flink中,算子任務(wù)可以分為無(wú)狀態(tài)和有狀態(tài)兩種情況。 **無(wú)狀態(tài)的算子:**每個(gè)事件不依賴(lài)其它數(shù)據(jù),自己處理完就輸出,也不需要依賴(lài)中間結(jié)果。例如:打印操作,每個(gè)數(shù)據(jù)只需要它本身就可以完成。 **有狀態(tài)的算子:**事件需要依賴(lài)中間或者外

    2024年02月11日
    瀏覽(25)
  • 【大數(shù)據(jù)】Flink 架構(gòu)(四):狀態(tài)管理

    【大數(shù)據(jù)】Flink 架構(gòu)(四):狀態(tài)管理

    《 Flink 架構(gòu) 》系列(已完結(jié)),共包含以下 6 篇文章: Flink 架構(gòu)(一):系統(tǒng)架構(gòu) Flink 架構(gòu)(二):數(shù)據(jù)傳輸 Flink 架構(gòu)(三):事件時(shí)間處理 Flink 架構(gòu)(四):狀態(tài)管理 Flink 架構(gòu)(五):檢查點(diǎn) Checkpoint(看完即懂) Flink 架構(gòu)(六):保存點(diǎn) Savepoint ?? 如果您覺(jué)得這篇

    2024年02月19日
    瀏覽(22)
  • Flink狀態(tài)管理與檢查點(diǎn)機(jī)制

    Flink狀態(tài)管理與檢查點(diǎn)機(jī)制

    本專(zhuān)欄案例代碼和數(shù)據(jù)集鏈接:? https://download.csdn.net/download/shangjg03/88477960 相對(duì)于其他流計(jì)算框架,F(xiàn)link?一個(gè)比較重要的特性就是其支持有狀態(tài)計(jì)算。即你可以將中間的計(jì)算結(jié)果進(jìn)行保存,并提供給后續(xù)的計(jì)算使用: 具體而言,F(xiàn)link?又將狀態(tài)?(State)?分為?Keyed?State?與?O

    2024年02月07日
    瀏覽(91)
  • 【狀態(tài)管理|概述】Flink的狀態(tài)管理:為什么需要state、怎么保存state、對(duì)于state過(guò)大怎么處理

    按照數(shù)據(jù)的劃分和擴(kuò)張方式,F(xiàn)link中大致分為2類(lèi): Keyed States:記錄每個(gè)Key對(duì)應(yīng)的狀態(tài)值 因?yàn)橐粋€(gè)任務(wù)的并行度有多少,就會(huì)有多少個(gè)子任務(wù),當(dāng)key的范圍大于并行度時(shí),就會(huì)出現(xiàn)一個(gè)subTask上可能包含多個(gè)Key(),但不同Task上不會(huì)出現(xiàn)相同的Key(解決了shuffle的問(wèn)題?) ? 常

    2024年02月01日
    瀏覽(19)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包