- 聊聊Flink的必知必會(huì)(三)
- 聊聊Flink必知必會(huì)(四)
從源碼中,根據(jù)關(guān)鍵的代碼,梳理一下Flink中的時(shí)間與窗口實(shí)現(xiàn)邏輯。
WindowedStream
對(duì)數(shù)據(jù)流執(zhí)行keyBy()
操作后,再調(diào)用window()
方法,就會(huì)返回WindowedStream
,表示分區(qū)后又加窗的數(shù)據(jù)流。如果數(shù)據(jù)流沒(méi)有經(jīng)過(guò)分區(qū),直接調(diào)用window()
方法則會(huì)返回AllWindowedStream
。
如下:
// 構(gòu)造函數(shù)
public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {
this.input = input;
this.builder =
new WindowOperatorBuilder<>(
windowAssigner,
windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),
input.getExecutionConfig(),
input.getType(),
input.getKeySelector(),
input.getKeyType());
}
// KeyedStream類(lèi)型,表示被加窗的輸入流。
private final KeyedStream<T, K> input;
// 用于構(gòu)建WindowOperator,最終會(huì)生成windowAssigner,Evictor,Trigger
private final WindowOperatorBuilder<T, K, W> builder;
在這里面還涉及到一些窗口的基本計(jì)算算子,比如reduce
,aggregate
,apply
,process
,sum
等等.
窗口相關(guān)模型的實(shí)現(xiàn)
Window
Window類(lèi)是Flink中對(duì)窗口的抽象。它是一個(gè)抽象類(lèi),包含抽象方法maxTimestamp(),用于獲取屬于該窗口的最大時(shí)間戳。
TimeWindow類(lèi)是其子類(lèi)。包含了窗口的start,end,offset等時(shí)間概念字段,這里會(huì)計(jì)算窗口的起始時(shí)間:
// 構(gòu)造函數(shù)
public TimeWindow(long start, long end) {
this.start = start;
this.end = end;
}
// timestamp:獲取窗口啟動(dòng)時(shí)的第一個(gè)時(shí)間戳epoch毫秒
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
final long remainder = (timestamp - offset) % windowSize;
// handle both positive and negative cases
if (remainder < 0) {
return timestamp - (remainder + windowSize);
} else {
return timestamp - remainder;
}
}
WindowAssigner
WindowAssigner表示窗口分配器,用來(lái)把元素分配到零個(gè)或多個(gè)窗口(Window對(duì)象)中。它是一個(gè)抽象類(lèi),其中重要的抽象方法為assignWindows()方法,用來(lái)給元素分配窗口。
Flink有多種類(lèi)型的窗口,如Tumbling Window、Sliding Window等。各種類(lèi)型的窗口又分為基于事件時(shí)間或處理時(shí)間的窗口。WindowAssigner的實(shí)現(xiàn)類(lèi)就對(duì)應(yīng)著具體類(lèi)型的窗口。
SlidingEventTimeWindows是WindowAssigner的另一個(gè)實(shí)現(xiàn)類(lèi),表示基于事件時(shí)間的Sliding Window。它有3個(gè)long類(lèi)型的字段size、slide和offset,分別表示窗口的大小、滑動(dòng)的步長(zhǎng)和窗口起始位置的偏移量。它對(duì)assignWindows()方法的實(shí)現(xiàn)如下:
@Override
public Collection<TimeWindow> assignWindows(
Object element, long timestamp, WindowAssignerContext context) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
if (timestamp > Long.MIN_VALUE) {
if (staggerOffset == null) {
staggerOffset =
windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
}
long start =
TimeWindow.getWindowStartWithOffset(
timestamp, (globalOffset + staggerOffset) % size, size);
// 返回構(gòu)建好起止時(shí)間的TimeWindow
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException(
"Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
+ "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
+ "'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
設(shè)置窗口觸發(fā)器Trigger
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
WindowAssigner與其主要實(shí)現(xiàn)類(lèi)的關(guān)系如下:
這些類(lèi)的含義分別如下
- GlobalWindows:將所有元素分配進(jìn)同一個(gè)窗口的全局窗口分配器。
- SlidingEventTimeWindows:基于事件時(shí)間的滑動(dòng)窗口分配器。
- SlidingProcessingTimeWindows:基于處理時(shí)間的滑動(dòng)窗口分配器。
- TumblingEventTimeWindows:基于事件時(shí)間的滾動(dòng)窗口分配器。
- TumblingProcessingTimeWindows:基于處理時(shí)間的滾動(dòng)窗口分配器。
- EventTimeSessionWindows:基于事件時(shí)間的會(huì)話窗口分配器。
- ProcessingTimeSessionWindows:基于處理時(shí)間的會(huì)話窗口分配器。
Trigger
Trigger表示窗口觸發(fā)器。它是一個(gè)抽象類(lèi),主要定義了下面3個(gè)方法用于確定窗口何時(shí)觸發(fā)計(jì)算:
// 每個(gè)元素到來(lái)時(shí)觸發(fā)
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
// 處理時(shí)間的定時(shí)器觸發(fā)時(shí)
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
// 事件時(shí)間的定時(shí)器觸發(fā)時(shí)調(diào)用
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
這3個(gè)方法的返回結(jié)果為T(mén)riggerResult對(duì)象。TriggerResult是一個(gè)枚舉類(lèi),包含兩個(gè)boolean類(lèi)型的字段fire和purge,分別表示窗口是否觸發(fā)計(jì)算和窗口內(nèi)的元素是否需要清空。
CONTINUE(false, false),
FIRE_AND_PURGE(true, true),
FIRE(true, false),
PURGE(false, true);
TriggerResult(boolean fire, boolean purge) {
this.purge = purge;
this.fire = fire;
}
窗口觸發(fā)器的實(shí)現(xiàn)由用戶根據(jù)業(yè)務(wù)需求自定義。Flink默認(rèn)基于事件時(shí)間的觸發(fā)器為EventTimeTrigger
,其三個(gè)方法處理如下
@Override
public TriggerResult onElement(
Object element, long timestamp, TimeWindow window, TriggerContext ctx)
throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// 如果水印已經(jīng)超過(guò)窗口,則立即觸發(fā)
return TriggerResult.FIRE;
} else {
// 注冊(cè)事件時(shí)間定時(shí)器
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
/*
* 處理時(shí)間,窗口不觸發(fā)計(jì)算也不清空內(nèi)部元素。
*/
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
throws Exception {
return TriggerResult.CONTINUE;
}
Trigger與其主要實(shí)現(xiàn)類(lèi)的繼承關(guān)系
這些類(lèi)的含義如下
- CountTrigger:元素?cái)?shù)達(dá)到設(shè)置的個(gè)數(shù)時(shí)觸發(fā)計(jì)算的觸發(fā)器。
- DeltaTrigger:基于DeltaFunction和設(shè)置的閾值觸發(fā)計(jì)算的觸發(fā)器。
- EventTimeTrigger:基于事件時(shí)間的觸發(fā)器。
- ProcessingTimeTrigger:基于處理時(shí)間的觸發(fā)器。
- PurgingTrigger:可包裝其他觸發(fā)器的清空觸發(fā)器。
- ContinuousEventTimeTrigger:基于事件時(shí)間并按照一定的時(shí)間間隔連續(xù)觸發(fā)計(jì)算的觸發(fā)器。
- ContinuousProcessingTimeTrigger:基于處理時(shí)間并按照一定的時(shí)間間隔連續(xù)觸發(fā)計(jì)算的觸發(fā)器。
windowOperator
從WindowedStream
的構(gòu)造函數(shù)中,會(huì)生成WindowOperatorBuilder
,該類(lèi)可以返回WindowOperator
,這兩個(gè)類(lèi)負(fù)責(zé)窗口分配器、窗口觸發(fā)器和窗口剔除器這些組件在運(yùn)行時(shí)的協(xié)同工作。
對(duì)于WindowOperator,除了窗口分配器和窗口觸發(fā)器的相關(guān)字段,可以先了解下面兩個(gè)字段。
// StateDescriptor類(lèi)型,表示窗口狀態(tài)描述符。
private final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
// 表示窗口的狀態(tài),窗口內(nèi)的元素都在其中維護(hù)。
private transient InternalAppendingState<K, W, IN, ACC, ACC> windowState;
窗口中的元素并沒(méi)有保存在Window對(duì)象中,而是維護(hù)在windowState中。windowStateDescriptor則是創(chuàng)建windowState所需用到的描述符。
當(dāng)有元素到來(lái)時(shí),會(huì)調(diào)用WindowOperator的processElement()方法:
public void processElement(StreamRecord<IN> element) throws Exception {
// 分配窗口
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
...
if (windowAssigner instanceof MergingWindowAssigner) { // Session Window的情況
...
} else {
for (W window: elementWindows) { // 非Session Window的情況
...
// 將Window對(duì)象設(shè)置為namespace并添加元素到windowState中
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = window;
// 獲取TriggerResult,確定接下來(lái)是否需要觸發(fā)計(jì)算或清空窗口
TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
// 觸發(fā)計(jì)算
emitWindowContents(window, contents);
}
if (triggerResult.isPurge()) {
// 清空窗口
windowState.clear();
}
...
}
}
...
}
在處理時(shí)間或事件時(shí)間的定時(shí)器觸發(fā)時(shí),會(huì)調(diào)用WindowOperator的onProcessingTime()方法或onEventTime()方法,其中的邏輯與onElement()方法的大同小異。
Watermarks
水位線(watermark)是選用事件時(shí)間來(lái)進(jìn)行數(shù)據(jù)處理時(shí)特有的概念。它的本質(zhì)就是時(shí)間戳,從上游流向下游,表示系統(tǒng)認(rèn)為數(shù)據(jù)中的事件時(shí)間在該時(shí)間戳之前的數(shù)據(jù)都已到達(dá)。
Flink中,Watermark類(lèi)表示水位。
/** Creates a new watermark with the given timestamp in milliseconds. */
public Watermark(long timestamp) {
this.timestamp = timestamp;
}
watermark的生成有兩種方式,這里不贅述,主要講述下基于配置的策略生成watermark的方式。如下的代碼是比較常見(jiàn)的配置:
// 分配事件時(shí)間與水印
.assignTimestampsAndWatermarks(
// forBoundedOutOfOrderness 會(huì)根據(jù)事件的時(shí)間戳和允許的最大亂序時(shí)間生成水印。
// Duration 設(shè)置了最大亂序時(shí)間為1秒。這意味著 Flink 將允許在這1秒的時(shí)間范圍內(nèi)的事件不按照事件時(shí)間的順序到達(dá),這個(gè)時(shí)間段內(nèi)的事件會(huì)被認(rèn)為是"有序的"。
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(1))
// 設(shè)置事件時(shí)間分配器,從Event對(duì)象中提取時(shí)間戳作為事件時(shí)間
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
在Flink內(nèi)部,會(huì)根據(jù)配置的策略調(diào)用BoundedOutOfOrdernessWatermarks
生成watermark。該類(lèi)的代碼如下:
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
/** The maximum timestamp encountered so far. */
private long maxTimestamp;
/** The maximum out-of-orderness that this watermark generator assumes. */
private final long outOfOrdernessMillis;
/**
* Creates a new watermark generator with the given out-of-orderness bound.
*
* @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
*/
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
// start so that our lowest watermark would be Long.MIN_VALUE.
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}
// ------------------------------------------------------------------------
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
// 每條數(shù)據(jù)都會(huì)更新最大值
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 發(fā)送 watermark 邏輯
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}
onEvent
決定每次事件都會(huì)取得最大的事件時(shí)間更新;onPeriodicEmit
則是周期性的更新并傳遞到下游。
AbstractStreamOperator
WatermarkGenerator
接口的調(diào)用是在AbstractStreamOperator抽象類(lèi)的子類(lèi)TimestampsAndWatermarksOperator
中。其生命周期open
函數(shù)與每個(gè)數(shù)據(jù)到來(lái)的處理函數(shù)processElement
,如下:
@Override
public void open() throws Exception {
super.open();
timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
watermarkGenerator =
emitProgressiveWatermarks
? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup)
: new NoWatermarksGenerator<>();
wmOutput = new WatermarkEmitter(output);
watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
if (watermarkInterval > 0 && emitProgressiveWatermarks) {
final long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
}
@Override
public void processElement(final StreamRecord<T> element) throws Exception {
final T event = element.getValue();
final long previousTimestamp =
element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
// 從分配器中提取事件時(shí)間戳
final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);
element.setTimestamp(newTimestamp);
output.collect(element);
// 調(diào)用水印生成器
watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
}
從方法的入?yún)⒖梢钥闯鰜?lái) flink 算子間的數(shù)據(jù)流動(dòng)是 StreamRecord 對(duì)象。它對(duì)數(shù)據(jù)的處理邏輯是什么都不做直接向下游發(fā)送,然后調(diào)用 onEvent 記錄最大時(shí)間戳,也就是說(shuō):flink 是先發(fā)送數(shù)據(jù)再生成 watermark,watermark 永遠(yuǎn)在生成它的數(shù)據(jù)之后。
總結(jié)
上面的一系列相關(guān)代碼,只是冰山一角,暫時(shí)只是把關(guān)鍵涉及到的部分捋了一下。最后畫(huà)個(gè)圖,展示其大致思路。
參考:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-746477.html
Flink Watermark 源碼解析文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-746477.html
到了這里,關(guān)于聊聊Flink必知必會(huì)(五)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!