国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Apache Hudi初探(五)(與flink的結(jié)合)--Flink 中hudi clean操作

這篇具有很好參考價(jià)值的文章主要介紹了Apache Hudi初探(五)(與flink的結(jié)合)--Flink 中hudi clean操作。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

背景

本文主要是具體說說Flink中的clean操作的實(shí)現(xiàn)

雜說閑談

在flink中主要是CleanFunction函數(shù):

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
    this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
    String instantTime = HoodieActiveTimeline.createNewInstantTime();
    LOG.info(String.format("exec clean with instant time %s...", instantTime));
    executor.execute(() -> writeClient.clean(instantTime), "wait for cleaning finish");
  }

  @Override
  public void notifyCheckpointComplete(long l) throws Exception {
    if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && isCleaning) {
      executor.execute(() -> {
        try {
          this.writeClient.waitForCleaningFinish();
        } finally {
          // ensure to switch the isCleaning flag
          this.isCleaning = false;
        }
      }, "wait for cleaning finish");
    }
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
      try {
        this.writeClient.startAsyncCleaning();
        this.isCleaning = true;
      } catch (Throwable throwable) {
        // catch the exception to not affect the normal checkpointing
        LOG.warn("Error while start async cleaning", throwable);
      }
    }
  }

  • open函數(shù)

    • writeClient =FlinkWriteClients.createWriteClient(conf, getRuntimeContext())
      創(chuàng)建FlinkWriteClient,用于寫hudi數(shù)據(jù)

    • this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
      創(chuàng)建一個(gè)只有一個(gè)線程的線程池,改線程池的主要作用來異步執(zhí)行hudi寫操作

    • executor.execute(() -> writeClient.clean(instantTime)
      異步執(zhí)行hudi的清理操作,該clean函數(shù)的主要代碼如下:

         if (!tableServicesEnabled(config)) {
           return null;
         }
         final Timer.Context timerContext = metrics.getCleanCtx();
         CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
             HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
      
         HoodieTable table = createTable(config, hadoopConf);
         if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
           LOG.info("Cleaner started");
           // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
           if (scheduleInline) {
             scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
             table.getMetaClient().reloadActiveTimeline();
           }
         }
      
         // Proceeds to execute any requested or inflight clean instances in the timeline
         HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, skipLocking);
         if (timerContext != null && metadata != null) {
           long durationMs = metrics.getDurationInMs(timerContext.stop());
           metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
           LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
               + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
               + " cleanerElapsedMs" + durationMs);
         }
         return metadata;
      
      • CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),HoodieTimeline.CLEAN_ACTION,() -> rollbackFailedWrites *
        根據(jù)配置
        hoodie.cleaner.policy.failed.writes* 默認(rèn)是EAGER,也就是在寫數(shù)據(jù)失敗的時(shí)候,會立即進(jìn)行這次寫失敗的數(shù)據(jù)的清理,在這種情況下,
        就不會執(zhí)行rollbackFailedWrites操作,也就是回滾寫失敗文件的操作

      • HoodieTable table = createTable *
        創(chuàng)建
        HoodieFlinkMergeOnReadTable*類型的hudi表,用來做clean等操作

      • scheduleTableServiceInternal
        如果hoodie.clean.allow.multiple為true(默認(rèn)為true)或者沒有正在運(yùn)行中clean操作,則會生成Clean計(jì)劃
        這里最終調(diào)用的是FlinkWriteClient.scheduleCleaning方法,即CleanPlanActionExecutor.execute方法

        這里最重要的就是requestClean方法:

        CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
        Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
        List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant)    
        int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
        Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
            .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
            .stream()
            .collect(Collectors.toMap(Pair::getKey, Pair::getValue))    
        Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey,
                e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue())))    
        List<String> partitionsToDelete = cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey)
            .collect(Collectors.toList())    
        return new HoodieCleanerPlan(earliestInstant
            .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
            planner.getLastCompletedCommitTimestamp(),
            config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),
            CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete)    
        
        • planner.getEarliestCommitToRetain();
          根據(jù)保留策略,獲取到最早需要保留的commit的HoodieInstant,在這里會兼顧考慮到hoodie.cleaner.commits.retained(默認(rèn)是10)以及hoodie.cleaner.hours.retained默認(rèn)是24小時(shí)以及hoodie.cleaner.policy策略(默認(rèn)是KEEP_LATEST_COMMITS)
        • planner.getPartitionPathsToClean(earliestInstant);
          根據(jù)保留的最新commit的HoodieInstant,得到要?jiǎng)h除的分區(qū),這里會根據(jù)配置hoodie.cleaner.incremental.mode(默認(rèn)是true)來進(jìn)行增量清理,
          這個(gè)時(shí)候就會根據(jù)上一次已經(jīng)clean的信息,只需要?jiǎng)h除差量的分區(qū)數(shù)據(jù)就行
        • cleanOpsWithPartitionMeta = context
          根據(jù)上面得到的需要?jiǎng)h除的分區(qū)信息,獲取需要?jiǎng)h除的文件信息,具體的實(shí)現(xiàn)可以參考CleanPlanner.getFilesToCleanKeepingLatestCommits
          這里的操作主要是先通過fileSystemView獲取分區(qū)下所有的FileGroup,之后再獲取每個(gè)FileGroup下的所有的FileSlice(這里的FileSlice就有版本的概念,也就是commit的版本),之后再與最新保留的commit的時(shí)間戳進(jìn)行比較得到需要?jiǎng)h除的文件信息
        • new HoodieCleanerPlan
          最后組裝成HoodieCleanPlan的計(jì)劃,并且在外層調(diào)用table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan)); 方法把clean request的狀態(tài)存儲到對應(yīng)的.hoodie目錄下,并建立一個(gè)xxxx.clean.requested的元數(shù)據(jù)文件
      • table.getMetaClient().reloadActiveTimeline()
        重新加載timeline,便于過濾出來剛才scheduleTableServiceInternal操作生成的xxxxxxxxxxxxxx.clean.requested的元數(shù)據(jù)文件

      • table.clean(context, cleanInstantTime, skipLocking)
        真正執(zhí)行clean的部分,主要是調(diào)用CleanActionExecutor.execute的方法,最終調(diào)用的是*runPendingClean(table, hoodieInstant)*方法:

         HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
         return runClean(table, cleanInstant, cleanerPlan);
        

        首先是反序列化CleanPlan,然后在進(jìn)行清理,主要是刪除1. 如果沒有滿足的分區(qū),直接刪除該分區(qū),2. 否則刪除該分區(qū)下的滿足條件的文件,最后返回HoodieCleanStat包含刪除的文件信息等。

  • snapshotState方法

    • 如果clean.async.enabled是true(默認(rèn)是true),并且不是正在進(jìn)行clean動(dòng)作,則會進(jìn)行異步清理
      this.writeClient.startAsyncCleaning(); 這里最終也是調(diào)用的writeClient.clean方法。
    • this.isCleaning = true;
      設(shè)置標(biāo)志位,用來保證clean操作的有序性
  • notifyCheckpointComplete方法文章來源地址http://www.zghlxwxcb.cn/news/detail-736703.html

    • 如果clean.async.enabled是true(默認(rèn)是true),并且正在進(jìn)行clean動(dòng)作,則等待clean操作完成,
      并且設(shè)置清理標(biāo)識位,用來和snapshotState方法進(jìn)行呼應(yīng)以保證clean操作的有序性

到了這里,關(guān)于Apache Hudi初探(五)(與flink的結(jié)合)--Flink 中hudi clean操作的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Apache Hudi初探(九)(與spark的結(jié)合)--非bulk_insert模式

    之前討論的都是’hoodie.datasource.write.operation’:\\\'bulk_insert’的前提下,在這種模式下,是沒有json文件的已形成如下的文件: 因?yàn)槭?bulk insert 操作,所以沒有去重的需要,所以直接采用spark原生的方式, 以下我們討論非spark原生的方式, 繼續(xù)Apache Hudi初探(八)(與spark的結(jié)合)–非

    2024年02月08日
    瀏覽(16)
  • 實(shí)時(shí)數(shù)據(jù)湖 Flink Hudi 實(shí)踐探索

    實(shí)時(shí)數(shù)據(jù)湖 Flink Hudi 實(shí)踐探索

    導(dǎo)讀: 首先做個(gè)自我介紹,我目前在阿里云云計(jì)算平臺,從事研究 Flink 和 Hudi 結(jié)合方向的相關(guān)工作。 目前,F(xiàn)link + Hudi 的方案推廣大概已經(jīng)有了一年半的時(shí)間,在國內(nèi)流行度也已比較高,主流的公司也會嘗試去迭代他們的數(shù)倉方案。所以,今天我介紹的主題是 Flink 和 Hudi 在

    2024年01月16日
    瀏覽(22)
  • 【數(shù)據(jù)湖Hudi-10-Hudi集成Flink-讀取方式&限流&寫入方式&寫入模式&Bucket索引】

    【數(shù)據(jù)湖Hudi-10-Hudi集成Flink-讀取方式&限流&寫入方式&寫入模式&Bucket索引】

    當(dāng)前表默認(rèn)是快照讀取,即讀取最新的全量快照數(shù)據(jù)并一次性返回。通過參數(shù) read.streaming.enabled 參數(shù)開啟流讀模式,通過 read.start-commit 參數(shù)指定起始消費(fèi)位置,支持指定 earliest 從最早消費(fèi)。 1.with參數(shù) 名稱 Required 默認(rèn)值 說明 read.streaming.enabled false false 設(shè)置 true 開啟流讀模式

    2024年02月14日
    瀏覽(20)
  • Flink Catalog 解讀與同步 Hudi 表元數(shù)據(jù)的最佳實(shí)踐

    Flink Catalog 解讀與同步 Hudi 表元數(shù)據(jù)的最佳實(shí)踐

    博主歷時(shí)三年精心創(chuàng)作的《大數(shù)據(jù)平臺架構(gòu)與原型實(shí)現(xiàn):數(shù)據(jù)中臺建設(shè)實(shí)戰(zhàn)》一書現(xiàn)已由知名IT圖書品牌電子工業(yè)出版社博文視點(diǎn)出版發(fā)行,點(diǎn)擊《重磅推薦:建大數(shù)據(jù)平臺太難了!給我發(fā)個(gè)工程原型吧!》了解圖書詳情,京東購書鏈接:https://item.jd.com/12677623.html,掃描左側(cè)

    2024年02月22日
    瀏覽(23)
  • Hudi(16):Hudi集成Flink之讀取方式

    目錄 0. 相關(guān)文章鏈接 1.?流讀(Streaming Query) 2.?增量讀?。↖ncremental Query) 3.?限流 ?Hudi文章匯總? ????????當(dāng)前表默認(rèn)是快照讀取,即讀取最新的全量快照數(shù)據(jù)并一次性返回。通過參數(shù)read.streaming.enabled 參數(shù)開啟流讀模式,通過 read.start-commit 參數(shù)指定起始消費(fèi)位置,支

    2024年02月06日
    瀏覽(20)
  • Hudi(17):Hudi集成Flink之寫入方式

    Hudi(17):Hudi集成Flink之寫入方式

    目錄 0. 相關(guān)文章鏈接 1.?CDC 數(shù)據(jù)同步 1.1.?準(zhǔn)備MySQL表 1.2.?flink讀取mysql binlog并寫入kafka 1.3.?flink讀取kafka數(shù)據(jù)并寫入hudi數(shù)據(jù)湖 1.4.?使用datafaker插入數(shù)據(jù) 1.5.?統(tǒng)計(jì)數(shù)據(jù)入Hudi情況 1.6.?實(shí)時(shí)查看數(shù)據(jù)入湖情況 2.?離線批量導(dǎo)入 2.1. 原理 2.2.?WITH 參數(shù) 2.3.?案例 3.?全量接增量 3.1.?

    2024年02月05日
    瀏覽(16)
  • Hudi(19):Hudi集成Flink之索引和Catalog

    目錄 0. 相關(guān)文章鏈接 1. Bucket索引(從 0.11 開始支持) 1.1.?WITH參數(shù) 1.2.?和 state 索引的對比 2.?Hudi Catalog(從 0.12.0 開始支持) 2.1. 概述 2.2.?WITH 參數(shù) 2.3.?使用dfs方式 ?Hudi文章匯總? ????????默認(rèn)的 flink 流式寫入使用 state 存儲索引信息:primary key 到 fileId 的映射關(guān)系。當(dāng)

    2024年02月05日
    瀏覽(24)
  • 大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)

    大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)

    Hudi(Hadoop Upserts Deletes and Incrementals) ,簡稱 Hudi ,是一個(gè) 流式數(shù)據(jù)湖平臺 ,關(guān)于Hudi的更多介紹可以參考我以下幾篇文章: 大數(shù)據(jù)Hadoop之——新一代流式數(shù)據(jù)湖平臺 Apache Hudi 大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(Spark,F(xiàn)link與Hudi整合) 這里主要講解Hive、Trino、Starr

    2023年04月20日
    瀏覽(21)
  • Hudi集成Flink

    Hudi集成Flink

    安裝Maven 1)上傳apache-maven-3.6.3-bin.tar.gz到/opt/software目錄,并解壓更名 tar -zxvf apache-maven-3.6. 3 -bin.tar.gz -C /opt/module/ mv ? apache -maven-3.6. 3 ?maven 2)添加環(huán)境變量到/etc/profile中 sudo ?vim /etc/profile #MAVEN_HOME export MAVEN_HOME=/opt/module/maven export PATH=$PATH:$MAVEN_HOME/bin 3)測試安裝結(jié)果 sourc

    2023年04月13日
    瀏覽(23)
  • Hudi(四)集成Flink(2)

    Hudi(四)集成Flink(2)

    ????????當(dāng)前表 默認(rèn)是快照讀取 ,即讀取最新的全量快照數(shù)據(jù)并一次性返回。通過參數(shù) read.streaming.enabled 參數(shù)開啟流讀模式,通過 read.start-commit 參數(shù)指定起始消費(fèi)位置,支持指定 earliest 從最早消費(fèi)。 1、WITH參數(shù) 名稱 Required 默認(rèn)值 說明 read.streaming.enabled false false 設(shè)置

    2024年02月07日
    瀏覽(27)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包