分析&回答
Flink的窗口機(jī)制是其底層核心之一,也是高效流處理的關(guān)鍵。Flink窗口分配的基類是WindowAssigner抽象類,下面的類圖示出了Flink能夠提供的所有窗口類型。
Flink窗口分為滾動(dòng)(tumbling)、滑動(dòng)(sliding)和會(huì)話(session)窗口三大類,本文要說的是滑動(dòng)窗口。
下圖示出一個(gè)典型的統(tǒng)計(jì)用戶訪問的滑動(dòng)窗口,來自官方文檔。
假設(shè)每兩條虛線之間代表1分鐘時(shí)間差,那么窗口大?。╯ize)就是2分鐘,滑動(dòng)步長(slide)是1分鐘。若時(shí)間特征為事件時(shí)間,代碼如下。
dataStream .keyBy("userId") .window(SlidingEventTimeWindows.of(Time.minutes(2), Time.minutes(1))); 由圖可知,當(dāng)前滑動(dòng)窗口與上一個(gè)滑動(dòng)窗口會(huì)有重疊。在窗口大小size是步長slide的2倍的情況下,(幾乎)每個(gè)DataStream元素都會(huì)處于2個(gè)窗口內(nèi)。
我們簡(jiǎn)單參考一下相關(guān)的Flink源碼,以加深理解。以下是窗口算子WindowOperator的processElement()方法的部分源碼。
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
boolean isSkippedElement = true;
final K key = this.<K>getKeyedStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
// 會(huì)話窗口的處理邏輯,略去
} else {
for (W window : elementWindows) {
if (isWindowLate(window)) {
continue;
}
isSkippedElement = false;
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = window;
TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
emitWindowContents(window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
registerCleanupTimer(window);
}
}
// 最后是側(cè)輸出遲到數(shù)據(jù)的邏輯,略去
}
復(fù)制代碼
該方法先調(diào)用WindowAssigner.assignWindows()方法,根據(jù)輸入元素的時(shí)間戳判斷它應(yīng)該屬于哪些窗口。接著遍歷所有窗口,將該元素加入對(duì)應(yīng)的窗口狀態(tài)(即緩存)中,并根據(jù)觸發(fā)器返回的TriggerResult決定是輸出(fire)還是清除(purge)窗口的內(nèi)容,emitWindowContents()方法會(huì)調(diào)用用戶函數(shù)。最后,還要調(diào)用registerCleanupTimer()方法注冊(cè)計(jì)時(shí)器用來在窗口徹底過期時(shí)清除窗口狀態(tài)。
以下是SlidingEventTimeWindows.assignWindows()方法的源碼。
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
windows.add(new TimeWindow(start, start + size));
}
return windows;
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
復(fù)制代碼
這段代碼就不難理解了,先調(diào)用getWindowStartWithOffset()方法根據(jù)元素的時(shí)間戳計(jì)算出其窗口的起點(diǎn)時(shí)間戳,再逐次循環(huán)向后滑動(dòng),產(chǎn)生size / slide個(gè)窗口。我們可以將size / slide叫做“粒度”,亦即上述代碼中返回的Collection集合的大小。粒度越大(“細(xì)”),滑動(dòng)窗口之間的重合也越大。
代碼讀完了,有一個(gè)貌似稀松平常的需求:
以3分鐘的頻率實(shí)時(shí)計(jì)算App內(nèi)各個(gè)子模塊近24小時(shí)的PV和UV。
直覺上我們需要用粒度為1440 / 3 = 480的滑動(dòng)窗口來實(shí)現(xiàn)它,但是細(xì)粒度的滑動(dòng)窗口會(huì)帶來性能問題,有兩點(diǎn):
狀態(tài) 由代碼可知,WindowOperator內(nèi)維護(hù)了窗口本身的內(nèi)部狀態(tài)windowState(類型為InternalAppendingState)。對(duì)于一個(gè)元素,會(huì)將其寫入對(duì)應(yīng)的(key, window)二元組所圈定的狀態(tài)中??梢?,如果粒度為480,那么每個(gè)元素到來,更新windowState時(shí)都要遍歷480個(gè)窗口并寫入,開銷是非常大的。在采用HDFS/RocksDB作為狀態(tài)后端時(shí),checkpoint的瓶頸也尤其明顯。
定時(shí)器 在Flink中,定時(shí)器的實(shí)際實(shí)現(xiàn)是TimerHeapInternalTimer類,并且是用Flink自己實(shí)現(xiàn)的優(yōu)先隊(duì)列維護(hù)在堆內(nèi)存中的。而在WindowOperator中,每一個(gè)(key, window)二元組都需要注冊(cè)兩個(gè)定時(shí)器:一是觸發(fā)器注冊(cè)的定時(shí)器,用于決定窗口數(shù)據(jù)何時(shí)輸出;二是registerCleanupTimer()方法注冊(cè)的清理定時(shí)器,用于在窗口徹底過期(如allowedLateness過期)之后及時(shí)清理掉窗口的內(nèi)部狀態(tài)。細(xì)粒度滑動(dòng)窗口會(huì)造成維護(hù)的定時(shí)器增多,內(nèi)存負(fù)擔(dān)加重。
在官方文檔Windows最后一節(jié)的最后,也有如下的提醒:
Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several of each element, as explained in the Window Assigners section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.
可能有看官會(huì)問:預(yù)聚合不能解決細(xì)粒度窗口的問題嗎?答案是不能。預(yù)聚合只是讓AggregateFunction/ReduceFunction之后的數(shù)據(jù)量降低,但是進(jìn)入WindowOperator的窗口狀態(tài)的數(shù)據(jù)還是沒變的。換句話說,就算觸發(fā)器實(shí)現(xiàn)為FIRE_AND_PURGE,遍歷大量窗口并寫入狀態(tài)的開銷也是無法消除的。
扯了這么多,有解決方案嗎?
當(dāng)然是有的,辦法總比困難多。我們一般使用 滾動(dòng)窗口+在線存儲(chǔ)+讀時(shí)聚合 的思路作為workaround。簡(jiǎn)單來講就是:
棄用滑動(dòng)窗口,用長度等于原滑動(dòng)窗口步長的滾動(dòng)窗口代替; 每個(gè)滾動(dòng)窗口將其周期內(nèi)的數(shù)據(jù)做聚合,打入外部在線存儲(chǔ)(內(nèi)存數(shù)據(jù)庫如Redis,LSM-based NoSQL存儲(chǔ)如HBase); 掃描在線存儲(chǔ)中對(duì)應(yīng)時(shí)間區(qū)間(可以靈活指定)的所有行,并將計(jì)算結(jié)果返回給前端展示。 針對(duì)上面的PV/UV問題,如果采用Redis作為在線存儲(chǔ),我們可以將時(shí)間戳放在key內(nèi),并設(shè)定24小時(shí)過期時(shí)間。用數(shù)字字符串存儲(chǔ)3分鐘周期內(nèi)的PV量,用HyperLogLog存儲(chǔ)3分鐘周期內(nèi)的UV量。近24小時(shí)的PV和UV就分別可以通過簡(jiǎn)單加減和HyperLogLog的pfmerge/pfcount命令得出了。當(dāng)然,實(shí)際操作起來還是要根據(jù)需求和服務(wù)器資源而定。文章來源:http://www.zghlxwxcb.cn/news/detail-686696.html
喵嗚面試助手:一站式解決面試問題,你可以搜索微信小程序 [喵嗚面試助手]?或關(guān)注 [喵嗚刷題] -> 面試助手?免費(fèi)刷題。如有好的面試知識(shí)或技巧期待您的共享!文章來源地址http://www.zghlxwxcb.cn/news/detail-686696.html
到了這里,關(guān)于說說FLINK細(xì)粒度滑動(dòng)窗口如何處理的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!