Background
This post takes a concrete look at how the clean operation is implemented in Hudi's Flink integration.
Discussion
In Flink, the work is done by CleanFunction; its key methods are:
```java
@Override
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
  this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();

  String instantTime = HoodieActiveTimeline.createNewInstantTime();
  LOG.info(String.format("exec clean with instant time %s...", instantTime));
  executor.execute(() -> writeClient.clean(instantTime), "wait for cleaning finish");
}

@Override
public void notifyCheckpointComplete(long l) throws Exception {
  if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && isCleaning) {
    executor.execute(() -> {
      try {
        this.writeClient.waitForCleaningFinish();
      } finally {
        // ensure to switch the isCleaning flag
        this.isCleaning = false;
      }
    }, "wait for cleaning finish");
  }
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
  if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
    try {
      this.writeClient.startAsyncCleaning();
      this.isCleaning = true;
    } catch (Throwable throwable) {
      // catch the exception to not affect the normal checkpointing
      LOG.warn("Error while start async cleaning", throwable);
    }
  }
}
```
The open function

- `writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext())`
  creates the FlinkWriteClient that is used to write data to Hudi.
- `this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();`
  creates a thread pool with a single thread; its main purpose is to run Hudi operations asynchronously (see the sketch below).
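NonThrownExecutor is, in essence, a single-thread executor that logs task failures rather than rethrowing them to the caller. A minimal sketch of that pattern, with a hypothetical class name and simplified semantics (not Hudi's actual implementation):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical sketch: a "non-throwing" single-thread executor. Tasks run one
// at a time, and failures are logged instead of propagating to the caller.
public class SingleThreadNonThrowingExecutor {
  private static final Logger LOG =
      LoggerFactory.getLogger(SingleThreadNonThrowingExecutor.class);
  private final ExecutorService pool = Executors.newSingleThreadExecutor();

  public void execute(Runnable task, String actionName) {
    pool.execute(() -> {
      try {
        task.run();
      } catch (Throwable t) {
        // swallow the error so callers (e.g. the checkpoint thread) are unaffected
        LOG.error("Executor executes action [" + actionName + "] error", t);
      }
    });
  }

  public void close() throws InterruptedException {
    // roughly what waitForTasksFinish(true) implies: drain pending tasks on close
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.HOURS);
  }
}
```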
- `executor.execute(() -> writeClient.clean(instantTime), ...)`
  runs the Hudi clean asynchronously. The core of the clean method looks like this:

```java
if (!tableServicesEnabled(config)) {
  return null;
}
final Timer.Context timerContext = metrics.getCleanCtx();
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
    HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));

HoodieTable table = createTable(config, hadoopConf);
if (config.allowMultipleCleans()
    || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
  LOG.info("Cleaner started");
  // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
  if (scheduleInline) {
    scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
    table.getMetaClient().reloadActiveTimeline();
  }
}

// Proceeds to execute any requested or inflight clean instances in the timeline
HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, skipLocking);
if (timerContext != null && metadata != null) {
  long durationMs = metrics.getDurationInMs(timerContext.stop());
  metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
  LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
      + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
      + " cleanerElapsedMs" + durationMs);
}
return metadata;
```
- `CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(), HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking))`
  This is driven by the config hoodie.cleaner.policy.failed.writes, which defaults to EAGER: when a write fails, its data is cleaned up immediately at write time, so in that case no rollbackFailedWrites, i.e. no rollback of failed-write files, is performed here.
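For reference, a hypothetical snippet collecting the cleaner-related option keys discussed in this post (the keys themselves are real Hudi/Flink configs; the surrounding code is illustrative only):

```java
import java.util.HashMap;
import java.util.Map;

public class CleanerOptionsSketch {
  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    // EAGER (default): a failed write is cleaned up immediately at write time.
    // LAZY: failed writes are left for the cleaner to roll back later.
    options.put("hoodie.cleaner.policy.failed.writes", "EAGER");
    // Flink-side switch checked by CleanFunction (FlinkOptions.CLEAN_ASYNC_ENABLED)
    options.put("clean.async.enabled", "true");
    System.out.println(options);
  }
}
```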
- `HoodieTable table = createTable(...)`
  creates a Hudi table of type HoodieFlinkMergeOnReadTable, which is used for clean and other table-service operations.
- `scheduleTableServiceInternal`
  If hoodie.clean.allow.multiple is true (the default) or there is no clean currently in flight, a clean plan is generated. This ultimately goes through the FlinkWriteClient.scheduleCleaning method, i.e. CleanPlanActionExecutor.execute; the most important part there is the requestClean method:
```java
CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());

Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
    .map(partitionsToClean,
        partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)),
        cleanerParallelism)
    .stream()
    .collect(Collectors.toMap(Pair::getKey, Pair::getValue));

Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
    .collect(Collectors.toMap(Map.Entry::getKey,
        e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue())));

List<String> partitionsToDelete = cleanOpsWithPartitionMeta.entrySet().stream()
    .filter(entry -> entry.getValue().getKey())
    .map(Map.Entry::getKey)
    .collect(Collectors.toList());

return new HoodieCleanerPlan(
    earliestInstant
        .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name()))
        .orElse(null),
    planner.getLastCompletedCommitTimestamp(),
    config.getCleanerPolicy().name(),
    CollectionUtils.createImmutableMap(),
    CleanPlanner.LATEST_CLEAN_PLAN_VERSION,
    cleanOps,
    partitionsToDelete);
```
- `planner.getEarliestCommitToRetain();`
  Determines, from the retention policy, the earliest commit (HoodieInstant) that must be kept. This takes into account hoodie.cleaner.commits.retained (default 10), hoodie.cleaner.hours.retained (default 24 hours), and hoodie.cleaner.policy (default KEEP_LATEST_COMMITS).
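A simplified sketch of how these settings could combine under KEEP_LATEST_COMMITS (an illustration under stated assumptions, not Hudi's actual CleanPlanner code): keep the latest hoodie.cleaner.commits.retained commits, then widen the window so that no commit younger than hoodie.cleaner.hours.retained gets cleaned.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;

// Illustrative retention logic for KEEP_LATEST_COMMITS (simplified).
public class RetentionSketch {

  private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

  // Hudi instant times look like 20231001123045 (possibly with millis appended)
  static Instant toInstant(String instantTime) {
    return LocalDateTime.parse(instantTime.substring(0, 14), FMT)
        .atZone(ZoneId.systemDefault()).toInstant();
  }

  /** Returns the earliest instant to retain, or null if nothing can be cleaned yet. */
  static String earliestCommitToRetain(List<String> completedCommitsAsc,
                                       int commitsRetained, int hoursRetained) {
    if (completedCommitsAsc.size() <= commitsRetained) {
      return null; // fewer commits than the retention count: clean nothing
    }
    int idx = completedCommitsAsc.size() - commitsRetained;
    Instant cutoff = Instant.now().minusSeconds(hoursRetained * 3600L);
    // widen retention so that no commit younger than the hour cutoff is cleaned
    while (idx > 0 && toInstant(completedCommitsAsc.get(idx - 1)).isAfter(cutoff)) {
      idx--;
    }
    return completedCommitsAsc.get(idx);
  }
}
```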
- `planner.getPartitionPathsToClean(earliestInstant);`
  Given the earliest retained HoodieInstant, computes the partitions to clean. With hoodie.cleaner.incremental.mode enabled (default true) this is done incrementally: based on what the previous clean already covered, only the delta partitions need to be cleaned (sketched below).
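A rough sketch of the incremental idea (names and inputs here are hypothetical): only the partitions touched by commits since the previous clean are candidates, instead of every partition in the table.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative only: incremental partition discovery for cleaning.
public class IncrementalPartitionsSketch {
  /**
   * @param commitsSinceLastClean commit time -> partitions written by that commit,
   *                              for commits after the previous clean's retained instant
   * @param newEarliestToRetain   the newly computed "earliest commit to retain"
   */
  static Set<String> partitionsToClean(Map<String, List<String>> commitsSinceLastClean,
                                       String newEarliestToRetain) {
    Set<String> partitions = new HashSet<>();
    commitsSinceLastClean.forEach((commitTime, touchedPartitions) -> {
      // only commits that fall out of the retention window can have files to clean
      if (commitTime.compareTo(newEarliestToRetain) < 0) {
        partitions.addAll(touchedPartitions);
      }
    });
    return partitions;
  }
}
```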
- `cleanOpsWithPartitionMeta = context...`
  For the partitions determined above, collects the concrete files to delete; see CleanPlanner.getFilesToCleanKeepingLatestCommits for the implementation. The gist: use the fileSystemView to get all FileGroups under a partition, then all FileSlices of each FileGroup (a FileSlice carries a version, i.e. the commit it belongs to), and compare each slice's commit timestamp against the earliest retained commit to decide which files to delete.
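A simplified sketch of the per-FileGroup decision (ignoring savepoints, pending compactions, and MOR log files; the FileSlice class below is a stand-in, not Hudi's): slices at or after the earliest retained commit are kept, the latest slice before it is kept as well (it is still needed to serve that commit), and everything older is deleted.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative version selection, in the spirit of getFilesToCleanKeepingLatestCommits.
public class FileSliceCleanSketch {

  static class FileSlice {
    final String commitTime;   // the commit (version) this slice belongs to
    final String dataFilePath;
    FileSlice(String commitTime, String dataFilePath) {
      this.commitTime = commitTime;
      this.dataFilePath = dataFilePath;
    }
  }

  /** @param slicesDesc the file slices of one FileGroup, newest first */
  static List<String> pathsToDelete(List<FileSlice> slicesDesc, String earliestRetained) {
    List<String> toDelete = new ArrayList<>();
    boolean keptOneBeforeRetained = false;
    for (FileSlice slice : slicesDesc) {
      if (slice.commitTime.compareTo(earliestRetained) >= 0) {
        continue; // within the retention window: keep
      }
      if (!keptOneBeforeRetained) {
        keptOneBeforeRetained = true; // latest version before the window: keep
        continue;
      }
      toDelete.add(slice.dataFilePath); // older versions: delete
    }
    return toDelete;
  }
}
```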
- `new HoodieCleanerPlan`
  Finally everything is assembled into a HoodieCleanerPlan, and the caller persists the requested clean via table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan)); this writes the clean-requested state into the table's .hoodie directory as an xxxx.clean.requested metadata file.
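Illustratively, the timeline directory then contains something like this (instant times made up):

```
/path/to/table/.hoodie/
├── 20231001120000001.commit
├── 20231001130000002.commit
└── 20231001140000003.clean.requested   <- serialized HoodieCleanerPlan
```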
- `table.getMetaClient().reloadActiveTimeline()`
  Reloads the timeline so that the xxxxxxxxxxxxxx.clean.requested metadata file just produced by scheduleTableServiceInternal can be picked up.
- `table.clean(context, cleanInstantTime, skipLocking)`
  This is where the clean is actually executed, mainly via CleanActionExecutor.execute, which ultimately calls runPendingClean(table, hoodieInstant):

```java
HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
return runClean(table, cleanInstant, cleanerPlan);
```

  It first deserializes the cleaner plan and then performs the cleanup: (1) if a partition has no files left that need to be retained, the partition itself is deleted directly; (2) otherwise, only the qualifying files under the partition are deleted. It finally returns a HoodieCleanStat with details such as the files deleted (see the sketch below).
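A condensed sketch of the two deletion cases (the real CleanActionExecutor parallelizes this across the engine context and records the results in HoodieCleanStat; this standalone version shows only the filesystem calls):

```java
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only: the two deletion shapes performed during a clean.
public class CleanExecuteSketch {
  static void cleanPartition(FileSystem fs, String basePath, String partition,
                             List<String> filesToDelete, boolean deleteWholePartition)
      throws IOException {
    if (deleteWholePartition) {
      // case 1: nothing in the partition needs to be retained, drop the directory
      fs.delete(new Path(basePath, partition), true);
      return;
    }
    // case 2: delete only the qualifying files under the partition
    for (String file : filesToDelete) {
      fs.delete(new Path(file), false);
    }
  }
}
```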
The snapshotState method

- If clean.async.enabled is true (default true) and no clean is currently in progress, an asynchronous clean is started:
  - `this.writeClient.startAsyncCleaning();` ultimately this also calls the writeClient.clean method.
  - `this.isCleaning = true;` sets the flag used to guarantee the ordering of clean operations.
The notifyCheckpointComplete method

- If clean.async.enabled is true (default true) and a clean is in progress, wait for the clean to finish, then reset the cleaning flag; this pairs with the snapshotState method to guarantee the ordering of clean operations.
That concludes this installment, Apache Hudi初探(五)(與flink的結(jié)合): the Hudi clean operation in Flink.