A checkpoint can be triggered in two ways:
- The first is triggering the checkpoint on the source nodes, which is coordinated and controlled by the CheckpointCoordinator component. The CheckpointCoordinator registers a timer and triggers the checkpoint on the source nodes at the configured interval. Each source node then emits a Checkpoint Barrier event to its downstream operators.
- The second is on the downstream operator nodes, which use the Checkpoint Barrier events sent from upstream to decide when to trigger their own checkpoint: a node triggers its checkpoint only after it has received the Barrier event from every upstream input.
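To make the second trigger path more concrete, here is a minimal sketch of barrier alignment that uses only the JDK. `BarrierAligner`, `onBarrier`, and the integer channel indexes are hypothetical names chosen for illustration; Flink's actual barrier handling is considerably more involved (buffering, unaligned checkpoints, and so on).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not a Flink class: tracks which input channels have
// already delivered the barrier of the checkpoint currently being aligned.
class BarrierAligner {
    private final int numInputChannels;
    private final Set<Integer> arrived = new HashSet<>();
    private long currentCheckpointId = -1L;

    BarrierAligner(int numInputChannels) {
        this.numInputChannels = numInputChannels;
    }

    // Returns true exactly when the barrier from the last missing channel arrives,
    // which is the moment the operator may take its own snapshot.
    boolean onBarrier(long checkpointId, int channel) {
        if (checkpointId < currentCheckpointId) {
            return false; // stale barrier from an older checkpoint: ignore it
        }
        if (checkpointId > currentCheckpointId) {
            // A newer checkpoint supersedes any incomplete alignment.
            currentCheckpointId = checkpointId;
            arrived.clear();
        }
        arrived.add(channel);
        if (arrived.size() == numInputChannels) {
            arrived.clear();
            return true;
        }
        return false;
    }
}
```

With two input channels, the first barrier of a checkpoint only records its channel; the snapshot is taken when the second channel delivers the barrier with the same ID.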
This article covers the first path: triggering an operator's checkpoint through the CheckpointCoordinator.
The CheckpointCoordinator acts as the checkpoint coordinator for the whole job. It triggers checkpoints on the source nodes, manages the checkpoints of the entire job, and receives the Ack messages that the TaskManagers return once they have finished executing a checkpoint.
Source: http://www.zghlxwxcb.cn/news/detail-834237.html
I. Starting the CheckpointCoordinator
When the job's JobStatus changes to RUNNING, the CheckpointCoordinatorDeActivator listener is notified and starts the CheckpointCoordinator service.
The CheckpointCoordinatorDeActivator.jobStatusChanges() method shown in the code contains the following logic.
> 1. When `newJobStatus == JobStatus.RUNNING`, it immediately calls
> coordinator.startCheckpointScheduler() to start the job-wide checkpoint scheduler of the
> CheckpointCoordinator. From this point on, checkpoint triggering is coordinated by the CheckpointCoordinator.
>
> 2. For any other JobStatus, it calls coordinator.stopCheckpointScheduler()
> to stop the checkpoint operations of the current job.
public class CheckpointCoordinatorDeActivator implements JobStatusListener {
    private final CheckpointCoordinator coordinator;

    public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
        this.coordinator = checkNotNull(coordinator);
    }

    @Override
    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp,
            Throwable error) {
        if (newJobStatus == JobStatus.RUNNING) {
            // Start the checkpoint scheduler
            coordinator.startCheckpointScheduler();
        } else {
            // Any other status: stop the checkpoint scheduler right away
            coordinator.stopCheckpointScheduler();
        }
    }
}
II. Starting the CheckpointScheduler thread
Next, CheckpointCoordinator.startCheckpointScheduler() calls scheduleTriggerWithDelay(), which adds a periodically scheduled Runnable to the checkpointCoordinatorTimer thread pool created earlier.
As the code shows, CheckpointCoordinator.scheduleTriggerWithDelay() passes the baseInterval parameter as the interval between checkpoints. The timer then runs the ScheduledTrigger periodically, and the actual checkpoint work is implemented in ScheduledTrigger.
private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
    return timer.scheduleAtFixedRate(
        new ScheduledTrigger(),
        initDelay, baseInterval, TimeUnit.MILLISECONDS);
}
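The start/stop behaviour described in sections I and II can be sketched with the JDK's `ScheduledExecutorService` alone. This is an illustrative sketch, not Flink code: `PeriodicTriggerSketch`, `start()`, `stop()`, and `isActive()` are hypothetical names, and the real coordinator also handles initial delays, minimum pauses between checkpoints, and shutdown state.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the coordinator's periodic scheduler.
class PeriodicTriggerSketch {
    // Daemon timer thread so the sketch never keeps the JVM alive on its own.
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "checkpoint-timer");
                t.setDaemon(true);
                return t;
            });
    private final long baseIntervalMillis;
    private final Runnable trigger;
    private ScheduledFuture<?> currentPeriodicTrigger;

    PeriodicTriggerSketch(long baseIntervalMillis, Runnable trigger) {
        this.baseIntervalMillis = baseIntervalMillis;
        this.trigger = trigger;
    }

    // Corresponds to the RUNNING branch: (re)start periodic triggering.
    synchronized void start() {
        stop();
        currentPeriodicTrigger = timer.scheduleAtFixedRate(
                trigger, baseIntervalMillis, baseIntervalMillis, TimeUnit.MILLISECONDS);
    }

    // Corresponds to the non-RUNNING branch: cancel the periodic trigger if active.
    synchronized void stop() {
        if (currentPeriodicTrigger != null) {
            currentPeriodicTrigger.cancel(false);
            currentPeriodicTrigger = null;
        }
    }

    synchronized boolean isActive() {
        return currentPeriodicTrigger != null;
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

A job status listener would call `start()` when the job becomes RUNNING and `stop()` for every other status, mirroring CheckpointCoordinatorDeActivator.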
III. Triggering a checkpoint
As the code shows, ScheduledTrigger is an inner class of CheckpointCoordinator that implements the Runnable interface. ScheduledTrigger.run() calls CheckpointCoordinator.triggerCheckpoint() to trigger and execute the checkpoint.
private final class ScheduledTrigger implements Runnable {
    @Override
    public void run() {
        try {
            // Call triggerCheckpoint() to trigger a periodic checkpoint
            triggerCheckpoint(System.currentTimeMillis(), true);
        }
        catch (Exception e) {
            LOG.error("Exception while triggering checkpoint for job {}.", job, e);
        }
    }
}
CheckpointCoordinator.triggerCheckpoint() contains a lot of logic, so this section focuses on the main parts. The process by which the CheckpointCoordinator triggers a checkpoint can be divided into the following steps.
1. Work done before the checkpoint is executed
- First, the execution environment and parameters are checked, and the checkpoint is triggered only if the conditions are met (the execution itself is either asynchronous or synchronous).
preCheckBeforeTriggeringCheckpoint() performs the pre-checks, mainly whether the CheckpointCoordinator is currently shut down and whether the number of checkpoint attempts exceeds the configured maximum.
- Build the array of Executions for the Task instances on which the checkpoint will be triggered. The tasksToTrigger array holds the ExecutionVertex elements that trigger the checkpoint, which in practice are all the source nodes.
The CheckpointCoordinator triggers the checkpoint only on source nodes; all other nodes are triggered through Barrier alignment.
- Build the collection of ExecutionVertex instances that must send an Ack message, which is derived from tasksToWaitFor.
tasksToWaitFor holds every ExecutionVertex in the ExecutionGraph, which means the Task instance of every ExecutionVertex has to report an Ack message to the CheckpointCoordinator.
// Run the pre-checks
synchronized (lock) {
    preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
}
// Array of Executions for the tasks on which the checkpoint is triggered
Execution[] executions = new Execution[tasksToTrigger.length];
// Iterate over tasksToTrigger and fill the executions array
for (int i = 0; i < tasksToTrigger.length; i++) {
    // Get the current Execution attempt of the task
    Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
    if (ee == null) {
        // No current Execution means the task is not being executed: abort
        LOG.info("Checkpoint triggering task {} of job {} is not being "
            + "executed at the moment. Aborting checkpoint.",
            tasksToTrigger[i].getTaskNameWithSubtaskIndex(), job);
        throw new CheckpointException(
            CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
    } else if (ee.getState() == ExecutionState.RUNNING) {
        // The Execution is RUNNING: add it to the executions array
        executions[i] = ee;
    } else {
        // The Execution is in any state other than RUNNING: abort
        LOG.info("Checkpoint triggering task {} of job {} is not in state {} "
            + "but {} instead. Aborting checkpoint.",
            tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
            job,
            ExecutionState.RUNNING,
            ee.getState());
        throw new CheckpointException(
            CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
    }
}
// Assemble the map of tasks that have to send an Ack message
Map<ExecutionAttemptID, ExecutionVertex> ackTasks =
    new HashMap<>(tasksToWaitFor.length);
for (ExecutionVertex ev : tasksToWaitFor) {
    Execution ee = ev.getCurrentExecutionAttempt();
    if (ee != null) {
        ackTasks.put(ee.getAttemptId(), ev);
    } else {
        LOG.info("Checkpoint acknowledging task {} of job {} is not being "
            + "executed at the moment. Aborting checkpoint.",
            ev.getTaskNameWithSubtaskIndex(), job);
        throw new CheckpointException(
            CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
    }
}
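The rule enforced by the first loop above (every task to trigger must have a RUNNING attempt, otherwise the checkpoint is aborted before it starts) can be reduced to a few lines. The sketch below is a simplified model with hypothetical types (`TriggerPrecheckSketch`, `TaskAttempt`, `State`); it throws a plain `IllegalStateException` where Flink throws a `CheckpointException`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the "all trigger tasks must be RUNNING" rule.
class TriggerPrecheckSketch {
    enum State { CREATED, DEPLOYING, RUNNING, FINISHED, FAILED }

    static final class TaskAttempt {
        final String name;
        final State state; // null models "no current execution attempt"

        TaskAttempt(String name, State state) {
            this.name = name;
            this.state = state;
        }
    }

    // Returns the names of the tasks to trigger, or throws if any of them is not
    // RUNNING, which corresponds to aborting the checkpoint.
    static List<String> collectTriggerTargets(List<TaskAttempt> tasksToTrigger) {
        List<String> targets = new ArrayList<>(tasksToTrigger.size());
        for (TaskAttempt task : tasksToTrigger) {
            if (task.state != State.RUNNING) {
                throw new IllegalStateException(
                        "Task " + task.name + " is not RUNNING; aborting checkpoint");
            }
            targets.add(task.name);
        }
        return targets;
    }
}
```

Failing fast here is cheap: no checkpoint ID has been allocated and no storage location has been created yet, so there is nothing to clean up.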
2. Creating the PendingCheckpoint
Before the checkpoint is executed, a PendingCheckpoint object has to be built. As the name suggests, it represents a checkpoint that is still pending.
From the moment the checkpoint starts until the Task instances return their Ack messages confirming success, the checkpoint stays in the pending state, which makes it possible to verify that it completes successfully.
The code follows this logic:
- Each checkpoint is identified by a unique checkpointID, and the counter implementation depends on the high-availability mode.
In a ZooKeeper-based high-availability cluster, ZooKeeperCheckpointIDCounter provides the checkpointID; in a non-HA cluster, StandaloneCheckpointIDCounter does.
- Create the checkpointStorageLocation, which defines where the state snapshot data is stored during the checkpoint.
The checkpointStorageLocation is created and initialized by the checkpointStorage, and different checkpointStorage implementations create different kinds of checkpointStorageLocation.
- Create the PendingCheckpoint object.
Its parameters include the checkpointID, ackTasks, and checkpointStorageLocation. The new PendingCheckpoint is stored in the pendingCheckpoints collection and is then executed asynchronously.
final CheckpointStorageLocation checkpointStorageLocation;
final long checkpointID;
try {
    // Obtain the checkpointID from checkpointIdCounter
    checkpointID = checkpointIdCounter.getAndIncrement();
    // Initialize the checkpointStorageLocation (savepoint or regular checkpoint)
    checkpointStorageLocation = props.isSavepoint() ?
        checkpointStorage
            .initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
        checkpointStorage.initializeLocationForCheckpoint(checkpointID);
}
// Part of the code is omitted here (including the handler for the try block above)
// Create the PendingCheckpoint object
final PendingCheckpoint checkpoint = new PendingCheckpoint(
    job,
    checkpointID,
    timestamp,
    ackTasks,
    masterHooks.keySet(),
    props,
    checkpointStorageLocation,
    executor);
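The idea behind the pending state, a checkpoint that completes only once every expected task has acknowledged it, can be sketched as follows. `PendingCheckpointSketch` is a hypothetical, heavily simplified stand-in: it identifies tasks by plain strings and ignores state handles, master hooks, and storage, all of which the real PendingCheckpoint tracks.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical miniature of a pending checkpoint; not Flink's class.
class PendingCheckpointSketch {
    private final long checkpointId;
    private final Set<String> notYetAcknowledged;
    private final CompletableFuture<Long> completion = new CompletableFuture<>();

    PendingCheckpointSketch(long checkpointId, Set<String> tasksToAck) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToAck);
    }

    // Records one task's Ack and completes the future once every task has reported.
    synchronized boolean acknowledge(String attemptId) {
        if (completion.isDone() || !notYetAcknowledged.remove(attemptId)) {
            return false; // unknown task, duplicate Ack, or checkpoint already finished
        }
        if (notYetAcknowledged.isEmpty()) {
            completion.complete(checkpointId);
        }
        return true;
    }

    // Aborts the checkpoint, for example on timeout or when a task declines.
    synchronized void abort(String reason) {
        completion.completeExceptionally(new IllegalStateException(reason));
    }

    CompletableFuture<Long> getCompletionFuture() {
        return completion;
    }
}
```

The caller holds on to the completion future, in the same spirit as `checkpoint.getCompletionFuture()` in the coordinator code later in this section.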
3. Triggering and executing the checkpoint
In CheckpointCoordinator.triggerCheckpoint(), the concrete checkpoint logic is defined and executed inside a synchronized (lock) block. It consists of the following steps.
- Acquire the coordinator-wide lock and run the pre-checks for triggering the checkpoint, mainly the CheckpointCoordinator state and the number of PendingCheckpoint attempts.
- Store the PendingCheckpoint in the pendingCheckpoints key-value map and use the timer to create a cancellerHandle, which is used to clean up expired checkpoints.
- Set the checkpoint's canceller handle through checkpoint.setCancellerHandle(). The method returns true on success; if it returns false, the checkpoint has already been disposed.
- Invoke the MasterHooks. By implementing a MasterHook, you can prepare the environment of an external system or trigger a corresponding operation in it.
- Iterate over the Executions in the executions array and check props.isSynchronous(). If it is true, call triggerSynchronousSavepoint() to execute the checkpoint synchronously.
Otherwise, call triggerCheckpoint() to execute the checkpoint asynchronously.
// Acquire the coordinator-wide lock
synchronized (lock) {
    // Pre-checks before triggering the checkpoint
    preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
    LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp,
        job);
    // Store the checkpoint in the pendingCheckpoints key-value map
    pendingCheckpoints.put(checkpointID, checkpoint);
    // Schedule the canceller, which cleans up the checkpoint once it has expired
    ScheduledFuture<?> cancellerHandle = timer.schedule(
        canceller,
        checkpointTimeout, TimeUnit.MILLISECONDS);
    // If the checkpoint has already been disposed, cancel the canceller again
    if (!checkpoint.setCancellerHandle(cancellerHandle)) {
        cancellerHandle.cancel(false);
    }
    // Invoke the MasterHooks
    for (MasterTriggerRestoreHook<?> masterHook : masterHooks.values()) {
        final MasterState masterState =
            MasterHooks.triggerHook(masterHook, checkpointID, timestamp, executor)
                .get(checkpointTimeout, TimeUnit.MILLISECONDS);
        checkpoint.acknowledgeMasterState(masterHook.getIdentifier(), masterState);
    }
    Preconditions.checkState(checkpoint.areMasterStatesFullyAcknowledged());
}
// Create the CheckpointOptions
final CheckpointOptions checkpointOptions = new CheckpointOptions(
    props.getCheckpointType(),
    checkpointStorageLocation.getLocationReference());
// Trigger each Execution in executions
for (Execution execution : executions) {
    if (props.isSynchronous()) {
        // Synchronous case: call triggerSynchronousSavepoint()
        execution.triggerSynchronousSavepoint(checkpointID, timestamp,
            checkpointOptions,
            advanceToEndOfTime);
    } else {
        // All other cases: call the asynchronous triggerCheckpoint()
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }
}
// Reset the counter of unsuccessful trigger attempts
numUnsuccessfulCheckpointsTriggers.set(0);
// Return the checkpoint's CompletionFuture
return checkpoint.getCompletionFuture();
This completes everything the CheckpointCoordinator does to trigger a checkpoint; the actual execution is delegated to the Execution objects.
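The canceller mechanism used in this step (a timer task that expires a checkpoint that takes too long, and that is itself cancelled once the checkpoint finishes) can be illustrated with JDK classes only. `CheckpointTimeoutSketch` and `armCanceller` are hypothetical names; a `CompletableFuture<Long>` stands in for the pending checkpoint, which is an assumption made for brevity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of a checkpoint expiry timer; not Flink code.
class CheckpointTimeoutSketch {
    // Schedules a canceller that fails the pending checkpoint if it is still
    // incomplete after timeoutMillis. The canceller is cancelled as soon as the
    // checkpoint completes for any reason.
    static ScheduledFuture<?> armCanceller(
            ScheduledExecutorService timer,
            CompletableFuture<Long> pendingCheckpoint,
            long timeoutMillis) {
        ScheduledFuture<?> cancellerHandle = timer.schedule(
                () -> {
                    pendingCheckpoint.completeExceptionally(
                            new TimeoutException("checkpoint expired before completing"));
                },
                timeoutMillis, TimeUnit.MILLISECONDS);
        // Once the checkpoint is done, the timer task is no longer needed.
        pendingCheckpoint.whenComplete((id, error) -> cancellerHandle.cancel(false));
        return cancellerHandle;
    }
}
```

Passing `false` to `cancel` matches the coordinator code above: a canceller that is already running is allowed to finish rather than being interrupted.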
IV. Checkpoint operations on Task nodes
Execution.triggerCheckpoint() actually calls triggerCheckpointHelper() to carry out the checkpoint for the Task node that belongs to the Execution, and the checkpoint on the source node is triggered through the Task instance, as the code shows.
1. Preparing the trigger
- Get the LogicalSlot currently assigned to the Execution. A non-null LogicalSlot means the Execution was successfully allocated slot resources; otherwise the Execution has no resources and its Task instance is not deployed or running.
- Call the TaskManagerGateway.triggerCheckpoint() RPC method to trigger and execute the checkpoint for the specified Task.
- After the TaskExecutor receives the checkpoint trigger request from the CheckpointCoordinator, it carries out the checkpoint for the corresponding Task instance inside the TaskExecutor.
private void triggerCheckpointHelper(long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        boolean advanceToEndOfEventTime) {
    final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
    if (advanceToEndOfEventTime
            && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
        throw new IllegalArgumentException("Only synchronous savepoints are "
            + "allowed to advance the watermark to MAX.");
    }
    // Get the LogicalSlot currently assigned to this Execution
    final LogicalSlot slot = assignedResource;
    // A non-null LogicalSlot means the Execution has resources assigned
    if (slot != null) {
        // Obtain the TaskManagerGateway from the slot
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
        // Call triggerCheckpoint() on the gateway (RPC)
        taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(),
            checkpointId, timestamp,
            checkpointOptions,
            advanceToEndOfEventTime);
    } else {
        // No slot assigned: the Task instance of this Execution is no longer running
        LOG.debug("The execution has no slot assigned. This indicates that the "
            + "execution is no longer running.");
    }
}
2. Calling the TaskExecutor to execute the checkpoint
When the TaskExecutor receives the checkpoint trigger request from the CheckpointCoordinator, it immediately uses the Execution information to identify the Task instance and calls that Task instance to trigger and execute the checkpoint on the source node. The logic of TaskExecutor.triggerCheckpoint() shown in the code is as follows.
- Check the CheckpointType. There are three types: CHECKPOINT, SAVEPOINT, and SYNC_SAVEPOINT. Only a synchronous savepoint is allowed to advance the watermark to MAX.
- Look up the Task instance for the Execution in taskSlotTable. If the Task instance is not null, call task.triggerCheckpointBarrier() to execute the checkpoint in the Task instance.
- If the Task instance is null, the Task is in an abnormal state and cannot execute the checkpoint. In that case FutureUtils.completedExceptionally() is called with a CheckpointException, which is returned to the CheckpointCoordinator on the master node for handling.
public CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions,
        boolean advanceToEndOfEventTime) {
    log.debug("Trigger checkpoint {}@{} for {}.", checkpointId,
        checkpointTimestamp, executionAttemptID);
    // Check the CheckpointType: only a synchronous savepoint may advance the watermark to MAX
    final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
    if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() &&
            checkpointType.isSavepoint())) {
        throw new IllegalArgumentException("Only synchronous savepoints are "
            + "allowed to advance the watermark to MAX.");
    }
    // Look up the Task for this Execution in taskSlotTable
    final Task task = taskSlotTable.getTask(executionAttemptID);
    // If the task exists, call triggerCheckpointBarrier()
    if (task != null) {
        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp,
            checkpointOptions, advanceToEndOfEventTime);
        // Return an already completed CompletableFuture
        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        // If the task is unknown, return a future failed with a CheckpointException
        final String message = "TaskManager received a checkpoint request for "
            + "unknown task " + executionAttemptID + '.';
        log.debug(message);
        return FutureUtils.completedExceptionally(
            new CheckpointException(message,
                CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
    }
}
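The pattern in this method, answering an RPC with either an already completed future or an already failed one, is worth isolating. The sketch below uses hypothetical names (`TaskLookupSketch`, `CheckpointableTask`, string attempt IDs, and an `"ACK"` string in place of `Acknowledge`) and builds the failed future by hand, so it does not rely on Flink's `FutureUtils`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical miniature of the "look up the task, or fail the future" pattern.
class TaskLookupSketch {
    interface CheckpointableTask {
        void triggerCheckpointBarrier(long checkpointId, long timestamp);
    }

    private final Map<String, CheckpointableTask> tasksByAttemptId = new ConcurrentHashMap<>();

    void register(String attemptId, CheckpointableTask task) {
        tasksByAttemptId.put(attemptId, task);
    }

    CompletableFuture<String> triggerCheckpoint(
            String attemptId, long checkpointId, long timestamp) {
        CheckpointableTask task = tasksByAttemptId.get(attemptId);
        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId, timestamp);
            return CompletableFuture.completedFuture("ACK");
        }
        // Unknown task: hand back a future that is already failed.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
                "Received a checkpoint request for unknown task " + attemptId));
        return failed;
    }
}
```

Returning a failed future instead of throwing keeps the error on the asynchronous path, so the caller handles a missing task the same way as any other failed RPC.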
V. Executing the checkpoint in StreamTask
Task.triggerCheckpointBarrier() relies on the triggerCheckpointAsync() method provided by AbstractInvokable to trigger and execute the checkpoint in the StreamTask.
public Future<Boolean> triggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        boolean advanceToEndOfEventTime) {
    // Submit the checkpoint to the mailbox so that it runs asynchronously
    return mailboxProcessor.getMainMailboxExecutor().submit(
        () -> triggerCheckpoint(checkpointMetaData,
            checkpointOptions, advanceToEndOfEventTime),
        "checkpoint %s with %s",
        checkpointMetaData,
        checkpointOptions);
}
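The reason for submitting the checkpoint to a mailbox rather than running it on the calling thread is that all task actions then execute on a single thread, so no extra locking is needed between record processing and checkpointing. A rough approximation with a single-threaded JDK executor is shown below; `MailboxSketch` is a hypothetical name, and Flink's mailbox has priorities and other features this sketch leaves out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical miniature of a mailbox: every action runs on one dedicated thread.
class MailboxSketch {
    private final ExecutorService mailboxThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "task-mailbox");
        t.setDaemon(true);
        return t;
    });

    // Enqueues the action and returns immediately; the caller (for example an
    // RPC thread) never executes the checkpoint logic itself.
    <T> CompletableFuture<T> submit(Supplier<T> action) {
        return CompletableFuture.supplyAsync(action, mailboxThread);
    }

    void close() {
        mailboxThread.shutdown();
    }
}
```

For example, `mailbox.submit(() -> Thread.currentThread().getName()).join()` yields the mailbox thread's name, not the caller's.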
The main logic of StreamTask.triggerCheckpoint() is as follows.
- Call StreamTask.performCheckpoint() to execute the checkpoint. It returns a success flag that indicates whether the checkpoint was executed successfully.
- If success is false, the checkpoint was not executed successfully, and declineCheckpoint() is called to decline it.
boolean success = performCheckpoint(checkpointMetaData, checkpointOptions,
    checkpointMetrics, advanceToEndOfEventTime);
if (!success) {
    declineCheckpoint(checkpointMetaData.getCheckpointId());
}
return success;
StreamTask.performCheckpoint() does the actual checkpoint work for the Task instance. Besides being triggered through the CheckpointCoordinator, the same method is also called when a downstream operator triggers its checkpoint through CheckpointBarrier alignment.
The next article continues with the flow in which CheckpointBarrier alignment triggers the checkpoint, and looks at how performCheckpoint() in StreamTask executes the checkpoint to snapshot and persist the state data.
Reference: 《Flink设计与实现：核心原理与源码解析》, 张利兵 (Zhang Libing)