Previously, when I queried Redis from Flink, my consistency requirements were not very strict, so I used a MapFunction + State approach: buffer a large batch of records in State and, once a certain count was reached, submit the batched keys to Redis in a single query.
Because Redis is extremely fast, this never caused problems. Later I learned about Flink's async I/O mechanism and felt that an async I/O implementation would be more elegant. This post records my initial understanding of Flink async I/O.
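The async operator is mainly used to interact with external systems, increasing throughput and reducing time spent waiting on responses. Users only need to focus on business logic; message ordering and consistency are handled by the Flink framework.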
[Figure from the official Flink async I/O documentation]
Async I/O supports both ordered and unordered output, as well as watermarks and exactly-once semantics.
The core async I/O code lives in AsyncWaitOperator, which picks the queue implementation based on the output mode:
```java
switch (outputMode) {
    case ORDERED:
        queue = new OrderedStreamElementQueue<>(capacity);
        break;
    case UNORDERED:
        queue = new UnorderedStreamElementQueue<>(capacity);
        break;
    default:
        throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
}
```
orderedWait (ordered): records are emitted in exactly the order in which they were received (watermarks included).
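unorderedWait (unordered): in processing time, output is fully unordered, i.e. whichever request finishes first is emitted first (lowest latency and overhead). In event time, watermarks act as boundaries: records between two watermarks may be emitted out of order, but ordering across watermarks is preserved, so no record overtakes a watermark.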
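To make the difference between the two modes concrete, here is a minimal plain-Java simulation (no Flink dependency; the class and method names are hypothetical). Each input record gets a completion time for its async call. In ordered mode a record can only leave once it and every earlier record have finished; in unordered mode records leave in completion order. Watermarks are deliberately not modeled here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical simulation of the two async output modes in processing time (no watermarks).
// Record i is the i-th input; completionTimes[i] is when its async call finishes.
final class OutputModeDemo {

    // ORDERED: records leave in input order, so a record can only be emitted once it
    // and every earlier record have finished (prefix maximum of completion times).
    static long[] orderedEmitTimes(long[] completionTimes) {
        long[] emit = new long[completionTimes.length];
        long latest = Long.MIN_VALUE;
        for (int i = 0; i < completionTimes.length; i++) {
            latest = Math.max(latest, completionTimes[i]);
            emit[i] = latest;
        }
        return emit;
    }

    // UNORDERED: whichever call finishes first is emitted first (ties keep input order).
    static List<Integer> unorderedEmitOrder(long[] completionTimes) {
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < completionTimes.length; i++) {
            order.add(i);
        }
        order.sort(Comparator.comparingLong(idx -> completionTimes[idx])); // List.sort is stable
        return order;
    }

    public static void main(String[] args) {
        long[] done = {30, 10, 20}; // record 0 is slow, records 1 and 2 are fast
        System.out.println(Arrays.toString(orderedEmitTimes(done))); // [30, 30, 30]
        System.out.println(unorderedEmitOrder(done));                // [1, 2, 0]
    }
}
```

In ordered mode the two fast records sit behind the slow one and all leave at t=30; that head-of-line wait is exactly the extra latency unordered mode avoids.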
Internally, the async operator calls userFunction.asyncInvoke(element.getValue(), resultHandler) to run the user's own code on each record. userFunction is the user-defined function, and resultHandler is the handle the user calls, once the asynchronous call has finished, to pass the result back into the async I/O operator:
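(PS: the user function is typically implemented with CompletableFuture, as the official example does. CompletableFuture was introduced in Java 8 and implements the CompletionStage interface; it offers a rich set of methods for handling asynchronous operations and combining the results of multiple tasks. It supports chaining, which makes it easy to express dependencies between tasks and to transform results. Compared with the traditional Future interface, CompletableFuture is more flexible and powerful. For a concrete demo, see the official example or the links in the references below.)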
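Below is a minimal sketch of that pattern without a Flink dependency. ResultCallback is a hypothetical stand-in for Flink's ResultFuture (which exposes complete(Collection) and completeExceptionally(Throwable)), and RedisLookupSketch is a hypothetical user function whose "Redis" is just an in-memory map served from a thread pool. The point is the shape: asyncInvoke returns immediately, and the CompletableFuture chain calls back into the handler when the lookup finishes.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for Flink's ResultFuture: the callback the operator hands to the user function.
interface ResultCallback<OUT> {
    void complete(Collection<OUT> results);
    void completeExceptionally(Throwable error);
}

// Hypothetical user function shaped like AsyncFunction#asyncInvoke, built on CompletableFuture.
final class RedisLookupSketch implements AutoCloseable {
    private final ExecutorService ioPool = Executors.newFixedThreadPool(4);
    private final Map<String, String> fakeRedis; // in-memory map standing in for a Redis client

    RedisLookupSketch(Map<String, String> fakeRedis) {
        this.fakeRedis = fakeRedis;
    }

    void asyncInvoke(String key, ResultCallback<String> callback) {
        CompletableFuture
                .supplyAsync(() -> fakeRedis.getOrDefault(key, "<missing>"), ioPool) // lookup runs off the caller thread
                .thenApply(value -> key + "=" + value)                               // chained transformation
                .whenComplete((result, error) -> {
                    if (error != null) {
                        callback.completeExceptionally(error);               // propagate the failure
                    } else {
                        callback.complete(Collections.singletonList(result)); // hand the result back
                    }
                });
        // asyncInvoke returns here without waiting; the callback fires later on an ioPool thread
    }

    @Override
    public void close() {
        ioPool.shutdown();
    }
}
```

In a real job the same chain would end in resultFuture.complete(...), and the function would be wired in with AsyncDataStream.orderedWait or unorderedWait.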
```java
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    // add element first to the queue
    final ResultFuture<OUT> entry = addToWorkQueue(element);

    // ResultHandler wraps the input record together with its ResultFuture (queue entry)
    final ResultHandler resultHandler = new ResultHandler(element, entry);

    // register a timeout for the entry if timeout is configured
    if (timeout > 0L) {
        final long timeoutTimestamp =
                timeout + getProcessingTimeService().getCurrentProcessingTime();

        final ScheduledFuture<?> timeoutTimer =
                getProcessingTimeService()
                        .registerTimer(
                                timeoutTimestamp,
                                timestamp -> userFunction.timeout(element.getValue(), resultHandler));

        resultHandler.setTimeoutTimer(timeoutTimer);
    }

    // Call the user-defined function; the resultHandler passed in is what the user
    // calls to hand back results once the async work completes
    userFunction.asyncInvoke(element.getValue(), resultHandler);
}

// Inside ResultHandler: complete() is what the user function calls to pass back results.
// It ends in processInMailbox(results), which hands the results to the operator's mailbox
// (task) thread, from where completed entries are emitted to the downstream operator.
@Override
public void complete(Collection<OUT> results) {
    Preconditions.checkNotNull(
            results, "Results must not be null, use empty collection to emit nothing");

    // already completed (exceptionally or with previous complete call from ill-written AsyncFunction), so
    // ignore additional result
    if (!completed.compareAndSet(false, true)) {
        return;
    }

    processInMailbox(results);
}
```
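The compareAndSet guard in complete() is what lets a timeout and a late real result race safely: whichever arrives first wins and the other is dropped. The following is a simplified, hypothetical sketch of that idea using only the JDK (OnceResultHandler is not a Flink class, and it collects results into a list instead of going through the mailbox). It also completes the timeout with a fallback value for illustration; Flink's default AsyncFunction.timeout() instead completes exceptionally with a TimeoutException.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified version of the ResultHandler pattern: the first of
// "real result" or "timeout" wins, and every later completion is ignored.
final class OnceResultHandler<OUT> {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final List<OUT> emitted = Collections.synchronizedList(new ArrayList<>());
    private volatile ScheduledFuture<?> timeoutTimer;

    void setTimeoutTimer(ScheduledFuture<?> timer) {
        this.timeoutTimer = timer;
    }

    void complete(Collection<OUT> results) {
        if (!completed.compareAndSet(false, true)) {
            return; // already completed (e.g. by the timeout): drop this result
        }
        ScheduledFuture<?> timer = timeoutTimer;
        if (timer != null) {
            timer.cancel(false); // a result is in: the timeout is no longer needed
        }
        emitted.addAll(results); // stands in for processInMailbox(results)
    }

    List<OUT> emitted() {
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
        OnceResultHandler<String> handler = new OnceResultHandler<>();
        // Timeout fires after 50 ms and completes with a fallback value (illustration only)
        handler.setTimeoutTimer(timers.schedule(
                () -> handler.complete(List.of("TIMEOUT")), 50, TimeUnit.MILLISECONDS));
        Thread.sleep(300);                     // the "real" async call is too slow...
        handler.complete(List.of("real"));     // ...so its result is ignored
        System.out.println(handler.emitted()); // [TIMEOUT]
        timers.shutdown();
    }
}
```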
orderedWait implementation:
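The ordered case is simple: all entries go into a single queue, and elements are only ever taken from its head, so a completed element has to wait until every element ahead of it has completed.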
```java
public OrderedStreamElementQueue(int capacity) {
    Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");

    this.capacity = capacity;
    // All elements are kept in this single queue
    this.queue = new ArrayDeque<>(capacity);
}

@Override
public boolean hasCompletedElements() {
    // Plain FIFO: only the head can be emitted, and only once it is done
    return !queue.isEmpty() && queue.peek().isDone();
}
```
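A stripped-down, hypothetical version of the same idea, using CompletableFuture as the queue entry (OrderedQueueSketch is not a Flink class). The tryPut method returns an empty Optional when the queue is full, loosely mirroring how the real queue signals back-pressure; everything else is just the FIFO-with-done-head rule shown above.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical minimal ordered queue: FIFO, and only the head may leave, once it is done.
final class OrderedQueueSketch<OUT> {
    private final int capacity;
    private final ArrayDeque<CompletableFuture<OUT>> queue;

    OrderedQueueSketch(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("The capacity must be larger than 0.");
        }
        this.capacity = capacity;
        this.queue = new ArrayDeque<>(capacity);
    }

    // Returns the entry the async call should complete, or empty if the queue is full.
    Optional<CompletableFuture<OUT>> tryPut() {
        if (queue.size() >= capacity) {
            return Optional.empty();
        }
        CompletableFuture<OUT> entry = new CompletableFuture<>();
        queue.addLast(entry);
        return Optional.of(entry);
    }

    // Same rule as OrderedStreamElementQueue: look at the head only.
    boolean hasCompletedElements() {
        return !queue.isEmpty() && queue.peekFirst().isDone();
    }

    // Removes and returns the head's result; check hasCompletedElements() first.
    OUT poll() {
        return queue.pollFirst().join();
    }
}
```

Completing the second entry first does not make anything emittable; only when the head finishes can both leave, in input order.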
unorderedWait implementation:
The unordered implementation is somewhat more complex. The queue does not hold individual records but segments. Records are stored inside segments, and segments are separated by watermarks (each watermark gets a segment of its own).
```java
public UnorderedStreamElementQueue(int capacity) {
    Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");

    this.capacity = capacity;
    // most likely scenario are 4 segments <elements, watermark, elements, watermark>
    this.segments = new ArrayDeque<>(4);
    this.numberOfEntries = 0;
}

// Each segment holds two collections: a set of incomplete entries and a queue of completed
// ones. When an incomplete entry finishes, it moves to the completed queue, from which it
// is emitted to the downstream operator.
static class Segment<OUT> {
    /** Unfinished input elements. */
    private final Set<StreamElementQueueEntry<OUT>> incompleteElements;

    /** Undrained finished elements. */
    private final Queue<StreamElementQueueEntry<OUT>> completedElements;

    Segment(int initialCapacity) {
        incompleteElements = new HashSet<>(initialCapacity);
        completedElements = new ArrayDeque<>(initialCapacity);
    }

    /** Signals that an entry finished computation. */
    void completed(StreamElementQueueEntry<OUT> elementQueueEntry) {
        // adding only to completed queue if not completed before
        // there may be a real result coming after a timeout result, which is updated in the queue entry but
        // the entry is not re-added to the complete queue
        if (incompleteElements.remove(elementQueueEntry)) {
            completedElements.add(elementQueueEntry);
        }
    }

    // ... (remaining methods omitted)
}
```
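The segment layout is what produces the "unordered within watermarks, ordered across them" behavior. Here is a simplified, hypothetical model of it (UnorderedQueueSketch and its methods are illustrative names, not Flink's API; capacity and timeouts are left out). Records join the last segment unless that segment holds a watermark; a watermark is complete as soon as it is added; and only the first segment may emit, so nothing can overtake a pending record in an earlier segment.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical simplified model of UnorderedStreamElementQueue: records between two
// watermarks may leave in any order, but nothing crosses a watermark.
final class UnorderedQueueSketch {

    static final class Entry {
        final String label;
        Segment segment;
        Entry(String label) { this.label = label; }
    }

    static final class Segment {
        final Set<Entry> incomplete = new HashSet<>();
        final Queue<Entry> completed = new ArrayDeque<>();
        final boolean watermark;
        Segment(boolean watermark) { this.watermark = watermark; }
        boolean drained() { return incomplete.isEmpty() && completed.isEmpty(); }
    }

    private final ArrayDeque<Segment> segments = new ArrayDeque<>();

    // A record joins the last segment unless that segment holds a watermark.
    Entry putRecord(String label) {
        Segment last = segments.peekLast();
        if (last == null || last.watermark) {
            last = new Segment(false);
            segments.addLast(last);
        }
        Entry e = new Entry(label);
        e.segment = last;
        last.incomplete.add(e);
        return e;
    }

    // Each watermark gets its own segment and counts as completed immediately.
    void putWatermark(String label) {
        Segment s = new Segment(true);
        Entry e = new Entry(label);
        e.segment = s;
        s.completed.add(e);
        segments.addLast(s);
    }

    // The async call for a record finished: move it from incomplete to completed (only once).
    void complete(Entry e) {
        if (e.segment.incomplete.remove(e)) {
            e.segment.completed.add(e);
        }
    }

    // Emit whatever is ready, but only from the first segment.
    List<String> drain() {
        List<String> out = new ArrayList<>();
        while (!segments.isEmpty()) {
            Segment head = segments.peekFirst();
            while (!head.completed.isEmpty()) {
                out.add(head.completed.poll().label);
            }
            if (!head.drained()) {
                break;            // still waiting on records in this segment
            }
            segments.pollFirst(); // segment fully emitted: move on to the next one
        }
        return out;
    }
}
```

For the input r1, r2, W1, r3: if r2 finishes first it is emitted immediately (same segment as r1); if r3 then finishes it stays blocked behind W1 until r1 completes, after which r1, W1, r3 leave together.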
Consistency implementation:
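The consistency mechanism looks simple: on each checkpoint, every entry still in the queue, whether its async call has finished or not, is backed up into operator state. Here, queue is the OrderedStreamElementQueue or UnorderedStreamElementQueue described above: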
```java
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);

    ListState<StreamElement> partitionableState =
            getOperatorStateBackend()
                    .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
    partitionableState.clear();

    // queue is either an OrderedStreamElementQueue or an UnorderedStreamElementQueue
    try {
        partitionableState.addAll(queue.values());
    } catch (Exception e) {
        partitionableState.clear();

        throw new Exception(
                "Could not add stream element queue entries to operator state "
                        + "backend of operator " + getOperatorName() + '.',
                e);
    }
}
```
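What queue.values() hands to the state backend are the original input elements of the queued entries, not their results; after a restore, the operator processes those inputs again. The following hypothetical sketch (AsyncSnapshotSketch is not a Flink class, and watermarks are ignored) models just that snapshot/restore round trip. A practical consequence: the external system may receive the same request twice after a failure, so exactly-once here concerns Flink's state, and lookups inside asyncInvoke should be safe to repeat (a Redis read naturally is).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch of the checkpoint idea: persist the *inputs* of every entry still
// in the queue (finished or not) and re-issue them after a restore.
final class AsyncSnapshotSketch<IN, OUT> {

    private static final class Entry<I, O> {
        final I input;
        final CompletableFuture<O> future;
        Entry(I input, CompletableFuture<O> future) {
            this.input = input;
            this.future = future;
        }
    }

    private final ArrayDeque<Entry<IN, OUT>> queue = new ArrayDeque<>();
    private final Function<IN, CompletableFuture<OUT>> asyncCall; // stands in for asyncInvoke

    AsyncSnapshotSketch(Function<IN, CompletableFuture<OUT>> asyncCall) {
        this.asyncCall = asyncCall;
    }

    // Like processElement(): enqueue the entry and fire the async call.
    void process(IN input) {
        queue.addLast(new Entry<>(input, asyncCall.apply(input)));
    }

    // Like snapshotState(): only the original inputs are stored, not futures or results.
    List<IN> snapshot() {
        List<IN> state = new ArrayList<>();
        for (Entry<IN, OUT> e : queue) {
            state.add(e.input);
        }
        return state;
    }

    // Restore path: re-process every recovered input, i.e. call the external system again.
    static <IN, OUT> AsyncSnapshotSketch<IN, OUT> restore(
            List<IN> state, Function<IN, CompletableFuture<OUT>> asyncCall) {
        AsyncSnapshotSketch<IN, OUT> op = new AsyncSnapshotSketch<>(asyncCall);
        for (IN input : state) {
            op.process(input);
        }
        return op;
    }
}
```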
References:
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/datastream/operators/asyncio/ (official documentation)
[Flink] Flink Async I/O: Principles and Implementation (Zhihu)
Flink Async I/O example (CSDN blog)