国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink異步IO初步了解

這篇具有很好參考價(jià)值的文章主要介紹了Flink異步IO初步了解。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

? ? ? ? 之前使用Flink查詢(xún)Redis數(shù)據(jù)的過(guò)程中,由于對(duì)數(shù)據(jù)一致性的要求并不是很高,當(dāng)時(shí)是用MapFunction +? State 的方案。先緩存一大堆數(shù)據(jù)到State中,達(dá)到一定數(shù)量之后,將批量Key提交到Redis中進(jìn)行查詢(xún)。

????????由于Redis性能極高,所以并沒(méi)有出現(xiàn)什么問(wèn)題,后來(lái)了解到了Flink異步IO機(jī)制,感覺(jué)使用異步IO機(jī)制實(shí)現(xiàn)會(huì)更加優(yōu)雅一點(diǎn)。本文就是記錄下自己對(duì)Flink異步IO的一個(gè)初步認(rèn)識(shí)。

異步算子主要應(yīng)用于和外部系統(tǒng)交互,提高吞吐量,減少等待延遲。用戶(hù)只需關(guān)注業(yè)務(wù)邏輯即可,消息順序性和一致性由Flink框架來(lái)處理:

圖來(lái)自官網(wǎng):

? ? ? ?Flink異步IO初步了解,flink,大數(shù)據(jù)

異步IO支持輸出無(wú)序和有序,也支持watermark以及ExactlyOnce語(yǔ)義:

異步IO的核心代碼都在AsyncWaitOperator里面:

switch (outputMode) {
   case ORDERED:
      queue = new OrderedStreamElementQueue<>(capacity);
      break;
   case UNORDERED:
      queue = new UnorderedStreamElementQueue<>(capacity);
      break;
   default:
      throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
}

????????orderedWait(有序):消息的發(fā)送順序與接收到的順序完全相同(包括 watermark )。

????????unorderWait(無(wú)序):在ProcessingTime中是完全無(wú)序的,即哪個(gè)先完成先發(fā)送(最低延遲和消耗);在EventTime中,以watermark為邊界,介于兩個(gè)watermark之間的消息是亂序的,但是多個(gè)watermark之間的消息是有序的。

????????異步IO處理內(nèi)部會(huì)執(zhí)行 userFunction.asyncInvoke(element.getValue(), resultHandler)?調(diào)用用戶(hù)自己編寫(xiě)的方法來(lái)處理數(shù)據(jù)。userFunction就是用戶(hù)自己編寫(xiě)的自定義方法。resultHandler就是用戶(hù)在完成異步調(diào)用自己,如何把結(jié)果傳入到異步IO算子中:

(ps: userFunction是基于CompletableFuture來(lái)完成開(kāi)發(fā)的。CompletableFuture是 Java 8 中引入的一個(gè)類(lèi),它實(shí)現(xiàn)了CompletionStage接口,提供了一組豐富的方法來(lái)處理異步操作和多個(gè)任務(wù)的結(jié)果。它支持鏈?zhǔn)讲僮?可以方便地處理任務(wù)的依賴(lài)關(guān)系和結(jié)果轉(zhuǎn)換。相比于傳統(tǒng)的Future接口,CompletableFuture更加靈活和強(qiáng)大。具體demo可以看官網(wǎng)示例 或者?看下面參考中的鏈接)

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
   // add element first to the queue
   final ResultFuture<OUT> entry = addToWorkQueue(element);

?  // 這里的ResultHandler就是對(duì)數(shù)據(jù)和ResultFuture的一個(gè)封裝
   final ResultHandler resultHandler = new ResultHandler(element, entry);

   // register a timeout for the entry if timeout is configured
   if (timeout > 0L) {
      final long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();

      final ScheduledFuture<?> timeoutTimer = getProcessingTimeService().registerTimer(
         timeoutTimestamp,
         timestamp -> userFunction.timeout(element.getValue(), resultHandler));

      resultHandler.setTimeoutTimer(timeoutTimer);
   }

?  // 調(diào)用用戶(hù)編寫(xiě)的方法。 傳入的resultHandler就是讓用戶(hù)在異步完成的時(shí)候傳值用的
   userFunction.asyncInvoke(element.getValue(), resultHandler);
}
@Override
// resultHandler類(lèi)內(nèi)部的complete方法就是在用戶(hù)自定義函數(shù)中傳結(jié)果用的,最終執(zhí)行結(jié)果會(huì)調(diào)用processInMainBox(results)方法,將結(jié)果發(fā)送給下游算子
public void complete(Collection<OUT> results) {
   Preconditions.checkNotNull(results, "Results must not be null, use empty collection to emit nothing");

   // already completed (exceptionally or with previous complete call from ill-written AsyncFunction), so
   // ignore additional result
   if (!completed.compareAndSet(false, true)) {
      return;
   }

   processInMailbox(results);
}

orderedWait 實(shí)現(xiàn):

? ? ? ? 有序的話很簡(jiǎn)單,就是創(chuàng)建一個(gè)隊(duì)列,然后從隊(duì)首取元素即可

public OrderedStreamElementQueue(int capacity) {
   Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");

   this.capacity = capacity;
?  // 所有的元素都放在這么一個(gè)隊(duì)列里面
   this.queue = new ArrayDeque<>(capacity);
}

@Override
public boolean hasCompletedElements() {
?  // 然后FIFO就好了
   return !queue.isEmpty() && queue.peek().isDone();
}

unorderWait 實(shí)現(xiàn):

????????無(wú)序的話實(shí)現(xiàn)就會(huì)稍微復(fù)雜點(diǎn)。queue里面放的不是一條條數(shù)據(jù),而是一個(gè)個(gè)segment。數(shù)據(jù)存放在segment中,中間使用watermark分隔(每條watermark都會(huì)有自己?jiǎn)为?dú)的segment)。

public UnorderedStreamElementQueue(int capacity) {
   Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");

   this.capacity = capacity;
   // most likely scenario are 4 segments <elements, watermark, elements, watermark>
   this.segments = new ArrayDeque<>(4);
   this.numberOfEntries = 0;
}
// 每個(gè)segment內(nèi)部會(huì)有兩個(gè)隊(duì)列:未完成 和 已完成。未完成的數(shù)據(jù)在完成之后會(huì)放置到已完成隊(duì)列里面,然后發(fā)送到下游算子
static class Segment<OUT> {
   /** Unfinished input elements. */
   private final Set<StreamElementQueueEntry<OUT>> incompleteElements;

   /** Undrained finished elements. */
   private final Queue<StreamElementQueueEntry<OUT>> completedElements;

   Segment(int initialCapacity) {
      incompleteElements = new HashSet<>(initialCapacity);
      completedElements = new ArrayDeque<>(initialCapacity);
   }

   /**
    * Signals that an entry finished computation.
    */
   void completed(StreamElementQueueEntry<OUT> elementQueueEntry) {
      // adding only to completed queue if not completed before
      // there may be a real result coming after a timeout result, which is updated in the queue entry but
      // the entry is not re-added to the complete queue
      if (incompleteElements.remove(elementQueueEntry)) {
         completedElements.add(elementQueueEntry);
      }
   }

一致性實(shí)現(xiàn):

? ? ? ? 一致性實(shí)現(xiàn)看起來(lái)很簡(jiǎn)單,就是將queue中未完成/已完成的數(shù)據(jù)備份下來(lái)。這里的queue就是上面的?OrderedStreamElementQueue?和?UnorderedStreamElementQueue:

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
   super.snapshotState(context);

   ListState<StreamElement> partitionableState =
      getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
   partitionableState.clear();

?  // 這里的queue == OrderedStreamElementQueue?/?UnorderedStreamElementQueue
   try {
      partitionableState.addAll(queue.values());
   } catch (Exception e) {
      partitionableState.clear();

      throw new Exception("Could not add stream element queue entries to operator state " +
         "backend of operator " + getOperatorName() + '.', e);
   }
}

參考:

??? ? https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/datastream/operators/asyncio/(官網(wǎng)文章)

????????[Flink] Flink異步I/O原理和實(shí)現(xiàn) - 知乎?

flink 異步 io(Async I/O) 示例_java.util.concurrent.cancellationexception flink-CSDN博客文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-769946.html

到了這里,關(guān)于Flink異步IO初步了解的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【flink番外篇】14、Flink異步I/O訪問(wèn)外部數(shù)據(jù)示例

    一、Flink 專(zhuān)欄 Flink 專(zhuān)欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月16日
    瀏覽(21)
  • 大數(shù)據(jù)學(xué)習(xí)之Flink、10分鐘了解Flink的核心組件以及它們的工作原理

    ?第一章、Flink的容錯(cuò)機(jī)制 第二章、Flink核心組件和工作原理 第三章、Flink的恢復(fù)策略 第四章、Flink容錯(cuò)機(jī)制的注意事項(xiàng) 第五章、Flink的容錯(cuò)機(jī)制與其他框架的容錯(cuò)機(jī)制相比較 目錄 第二章、Flink核心組件和工作原理 Ⅰ、核心組件 1. Checkpoint組件: 2. Savepoint組件: 3. Barrier組件

    2024年01月23日
    瀏覽(23)
  • 55、Flink之用于外部數(shù)據(jù)訪問(wèn)的異步 I/O介紹及示例

    一、Flink 專(zhuān)欄 Flink 專(zhuān)欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月17日
    瀏覽(17)
  • 大數(shù)據(jù)學(xué)習(xí)之Flink算子、了解(Transformation)轉(zhuǎn)換算子(基礎(chǔ)篇三)

    目錄 Transformation轉(zhuǎn)換算子(基礎(chǔ)篇三) 三、轉(zhuǎn)換算子(Transformation) 1.基本轉(zhuǎn)換算子 1.1 映射(Map) 1.2 過(guò)濾(filter) 1.3 扁平映射(flatmap) 1.4基本轉(zhuǎn)換算子的例子 2.聚合算子(Aggregation) 2.1 按鍵分區(qū)(keyBy) 2.2 簡(jiǎn)單聚合 2.3 歸約聚合(reduce) 3.用戶(hù)自定義函數(shù)(UDF) 3.1?函

    2024年02月20日
    瀏覽(22)
  • 大數(shù)據(jù)學(xué)習(xí)之Flink算子、了解DataStream API(基礎(chǔ)篇一)

    大數(shù)據(jù)學(xué)習(xí)之Flink算子、了解DataStream API(基礎(chǔ)篇一)

    注: 本文只涉及DataStream 原因:隨著大數(shù)據(jù)和流式計(jì)算需求的增長(zhǎng),處理實(shí)時(shí)數(shù)據(jù)流變得越來(lái)越重要。因此,DataStream由于其處理實(shí)時(shí)數(shù)據(jù)流的特性和能力,逐漸替代了DataSet成為了主流的數(shù)據(jù)處理方式。 目錄 DataStream API (基礎(chǔ)篇) 前摘: 一、執(zhí)行環(huán)境 1. 創(chuàng)建執(zhí)行環(huán)境 2. 執(zhí)

    2024年01月23日
    瀏覽(27)
  • [Flink01] 了解Flink

    [Flink01] 了解Flink

    Flink入門(mén)系列文章主要是為了給想學(xué)習(xí)Flink的你建立一個(gè)大體上的框架,助力快速上手Flink。學(xué)習(xí)Flink最有效的方式是先入門(mén)了解框架和概念,然后邊寫(xiě)代碼邊實(shí)踐,然后再把官網(wǎng)看一遍。 Flink入門(mén)分為四篇,第一篇是《了解Flink》,第二篇《架構(gòu)和原理》,第三篇是《DataStre

    2024年02月19日
    瀏覽(14)
  • Flink實(shí)戰(zhàn)(1)-了解Flink

    Flink實(shí)戰(zhàn)(1)-了解Flink

    ? ? ? ? ??伙伴們,好久不見(jiàn)!這里是 葉蒼ii ? ? ? ? ?? ? 作為一名大數(shù)據(jù)博主,我一直致力于分享最新的技術(shù)趨勢(shì)和實(shí)戰(zhàn)經(jīng)驗(yàn)。近期,我在參加Flink的顧客營(yíng)銷(xiāo)項(xiàng)目,使用了PyFlink項(xiàng)目進(jìn)行數(shù)據(jù)處理和分析。 ???????? ??? 在這個(gè)文章合集中,我將與大家分享我的實(shí)戰(zhàn)

    2024年01月16日
    瀏覽(39)
  • 關(guān)于Flink的旁路緩存與異步操作

    關(guān)于Flink的旁路緩存與異步操作

    將數(shù)據(jù)庫(kù)中的數(shù)據(jù),比較經(jīng)常訪問(wèn)的數(shù)據(jù),保存起來(lái),以減少和硬盤(pán)數(shù)據(jù)庫(kù)的交互 比如: 我們使用mysql時(shí) 經(jīng)常查詢(xún)一個(gè)表 , 而這個(gè)表又一般不會(huì)變化,就可以放在內(nèi)存中,查找時(shí)直接對(duì)內(nèi)存進(jìn)行查找,而不需要再和mysql交互 dim層使用的是hbase存儲(chǔ),因?yàn)閐im層可能會(huì)出現(xiàn)大表,出現(xiàn)數(shù)據(jù)量過(guò)

    2024年02月01日
    瀏覽(10)
  • 怎么理解flink的異步檢查點(diǎn)機(jī)制

    flink的checkpoint監(jiān)控頁(yè)面那里有兩個(gè)指標(biāo)Sync Duration 和Async Duration,一個(gè)是開(kāi)始進(jìn)行同步checkpoint所需的時(shí)間,一個(gè)是異步checkpoint過(guò)程所需的時(shí)間,你是否也有過(guò)疑惑,是否只是同步過(guò)程中的時(shí)間才會(huì)阻塞正常的數(shù)據(jù)處理,而異步checkpoint的時(shí)間不會(huì)影響正常的數(shù)據(jù)處理流程? 這

    2024年02月09日
    瀏覽(24)
  • 一、Flink使用異步算子+線程池查詢(xún)MySQL

    一、Flink使用異步算子+線程池查詢(xún)MySQL

    目錄 Flink異步算子使用介紹 使用Flink異步算子+多線程異步查詢(xún)MySQL 相關(guān)閱讀 1 Flink使用異步算子請(qǐng)求高德地圖獲取位置信息 1、概述 1)Flink異步算子使用介紹 1.異步與同步概述 同步:向數(shù)據(jù)庫(kù)發(fā)送一個(gè)請(qǐng)求然后一直等待,直到收到響應(yīng)。在許多情況下,等待占據(jù)了函數(shù)運(yùn)行的

    2024年02月14日
    瀏覽(14)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包