使用事件時間時,需要通過 Flink API 的 WatermarkStrategy
接口配置 watermark 的生成策略。
我們將逐段來看這個 API 的各個部分。
繼承關(guān)系
Flink 使用 WatermarkStrategy<T>
接口來構(gòu)建 Watermark 策略,其中泛型 T
為輸入數(shù)據(jù)流類型。
WatermarkStrategy
接口繼承了 TimestampAssignerSupplier
和 WatermarkGeneratorSupplier
,即相當于包含了 TimestampAssigner
和 WatermarkGenerator
,具體地:
-
TimestampAssigner
用于從消息記錄中提取事件時間戳 -
WatermarkGenerator
用于根據(jù)事件時間戳生成 watermark
源碼:
flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L19
【Github】package org.apache.flink.api.common.eventtime; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import java.io.Serializable; import java.time.Duration; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @Public public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
TimestampAssignerSupplier
和 WatermarkGeneratorSupplier
均繼承了 Serializable
,所以WatermarkStrategy
也繼承了 Serializable
,是可序列化的。這是因為在分布式計算過程中,WatermarkStrategy
可能會在不同節(jié)點之間傳輸。
接口方法
需要實現(xiàn)或重寫的方法
在 WatermarkStrategy
接口中,有如下 3 個方法是需要被實現(xiàn)的,這 3 個方法分別對應(yīng)需要確定的 3 個問題:
-
createWatermarkGenerator()
:返回實現(xiàn)了WatermarkGenerator
接口的對象,該對象用于根據(jù)輸入流記錄的事件時間生成 watermark -
createTimestampAssigner()
:返回實現(xiàn)了TimestampAssigner
接口的對象,該對象用于從輸入流中獲取每條記錄的事件時間 -
getAlignmentParameters()
:返回WatermarkAlignmentParams
類的實例,該實例用于控制是否需要對齊不同輸入流的 watermark
源碼|Github|org.apache.flink.api.common.eventtime.WatermarkStrategy
(部分)
// ------------------------------------------------------------------------
// Methods that implementors need to implement.
// ------------------------------------------------------------------------
/** Instantiates a WatermarkGenerator that generates watermarks according to this strategy. */
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
/**
* Instantiates a {@link TimestampAssigner} for assigning timestamps according to this strategy.
*/
@Override
default TimestampAssigner<T> createTimestampAssigner(
TimestampAssignerSupplier.Context context) {
// By default, this is {@link RecordTimestampAssigner},
// for cases where records come out of a source with valid timestamps, for example from
// Kafka.
return new RecordTimestampAssigner<>();
}
/**
* Provides configuration for watermark alignment of a maximum watermark of multiple
* sources/tasks/partitions in the same watermark group. The group may contain completely
* independent sources (e.g. File and Kafka).
*
* <p>Once configured Flink will "pause" consuming from a source/task/partition that is ahead of
* the emitted watermark in the group by more than the maxAllowedWatermarkDrift.
*/
@PublicEvolving
default WatermarkAlignmentParams getAlignmentParameters() {
return WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED;
}
其中,只有 createWatermarkGenerator(WatermarkGeneratorSupplier.Context context)
方法是非默認的,這樣就可以通過 Java 的 lambda 表達式語法來實現(xiàn) WatermarkStrategy
接口。
指定 WatermarkGenerator
的方法
WatermarkStrategy
接口提供了如下 4 個直接指定 WatermarkGenerator
以實現(xiàn) WatermarkStrategy
的默認方法。因為 WatermarkStrategy
接口只有 createWatermarkGenerator
這 1 個方法沒有默認實現(xiàn),所以在實現(xiàn)上均使用了 Java 的 lambda 表達式。
-
forMonotonousTimestamps
:使用AscendingTimestampsWatermarks
生成 watermark,適用于事件時間單調(diào)遞增的場景 -
forBoundedOutOfOrderness
:使用BoundedOutOfOrdernessWatermarks
生成 watermark,適用于事件時間雖然不單調(diào)遞增,但延遲時間有限的場景 -
forGenerator
:使用參數(shù)提供的WatermarkGeneratorSupplier
生成的 watermark 生成器 -
noWatermarks
:不使用 watermark
源碼|Github|org/apache/flink/api/common/eventtime/WatermarkStrategy.java:197
static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
return (ctx) -> new AscendingTimestampsWatermarks<>();
}
static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
}
static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
return generatorSupplier::createWatermarkGenerator;
}
static <T> WatermarkStrategy<T> noWatermarks() {
return (ctx) -> new NoWatermarksGenerator<>();
}
指定 TimestampAssigner
的方法
在 WatermarkStrategy
接口中,提供了如下 3 個指定 TimestampAssigner
的默認方法。這些方法會創(chuàng)建一個新的 WatermarkStrategy
類,并將調(diào)用該方法的基礎(chǔ) WatermarkStrategy
類封裝起來;在調(diào)用新類的 createWatermarkGenerator
方法時,會使用新類中重寫的方法;在調(diào)用新類的其他方法時,會返回被封裝的基礎(chǔ)類的方法。
-
withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner)
:使用參數(shù)指定的TimestampAssigner
的提供者類 -
withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner)
:使用參數(shù)指定的可序列化的TimestampAssigner
源碼|Github|org/apache/flink/api/common/eventtime/WatermarkStrategy
(部分)文章來源:http://www.zghlxwxcb.cn/news/detail-846325.html
default WatermarkStrategy<T> withTimestampAssigner(
TimestampAssignerSupplier<T> timestampAssigner) {
checkNotNull(timestampAssigner, "timestampAssigner");
return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner);
}
default WatermarkStrategy<T> withTimestampAssigner(
SerializableTimestampAssigner<T> timestampAssigner) {
checkNotNull(timestampAssigner, "timestampAssigner");
return new WatermarkStrategyWithTimestampAssigner<>(
this, TimestampAssignerSupplier.of(timestampAssigner));
}
制定處理空閑數(shù)據(jù)源的方法
在 WatermarkStrategy
接口中,提供了 withIdleness(Duration idleTimeout)
方法用于處理存在空閑數(shù)據(jù)源的情況,其參數(shù)為標記空閑數(shù)據(jù)源的時間閾值,即當某個數(shù)據(jù)源超過 idleTimeout
沒有生產(chǎn)消息時則標記為空閑數(shù)據(jù)源。
源碼|Github|org.apache.flink.api.common.eventtime.WatermarkStrategy#withIdleness
/**
* Creates a new enriched {@link WatermarkStrategy} that also does idleness detection in the
* created {@link WatermarkGenerator}.
*
* <p>Add an idle timeout to the watermark strategy. If no records flow in a partition of a
* stream for that amount of time, then that partition is considered "idle" and will not hold
* back the progress of watermarks in downstream operators.
*
* <p>Idleness can be important if some partitions have little data and might not have events
* during some periods. Without idleness, these streams can stall the overall event time
* progress of the application.
*/
default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
checkNotNull(idleTimeout, "idleTimeout");
checkArgument(
!(idleTimeout.isZero() || idleTimeout.isNegative()),
"idleTimeout must be greater than zero");
return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
}
指定 AlignmentParameters
的方法
在 WatermarkStrategy
接口中,還提供了如下 2 個方法,用于指定 watermark 的對齊方法。在實現(xiàn)上,這 2 個方法也與指定 TimestampAssigner
的方法類似,在新類中重寫 getAlignmentParameters()
方法。
-
withWatermarkAlignment(String watermarkGroup, Duration maxAllowedWatermarkDrift)
:指定 watermark 對齊策略;其中參數(shù)watermarkGroup
為 watermark 的組名,maxAllowedWatermarkDrift
為在暫停消費前允許超出 watermark 時間 -
withWatermarkAlignment(String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval)
:指定 watermark 對齊策略;其中新增的參數(shù)updateInterval
為 task 上報 watermark 以及調(diào)度器制定對齊 watermark 的時間間隔
源碼|Github|org/apache/flink/api/common/eventtime/WatermarkStrategy
(部分)
@PublicEvolving
default WatermarkStrategy<T> withWatermarkAlignment(
String watermarkGroup, Duration maxAllowedWatermarkDrift) {
return withWatermarkAlignment(
watermarkGroup,
maxAllowedWatermarkDrift,
WatermarksWithWatermarkAlignment.DEFAULT_UPDATE_INTERVAL);
}
@PublicEvolving
default WatermarkStrategy<T> withWatermarkAlignment(
String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval) {
return new WatermarksWithWatermarkAlignment<T>(
this, watermarkGroup, maxAllowedWatermarkDrift, updateInterval);
}
整體說明
通過上述設(shè)計,使 WatermarkStrategy
接口允許使用實現(xiàn) WatermarkGenerator
方法的 Java Lambda 表達式實現(xiàn);對于已實現(xiàn)的 WatermarkStrategy
匿名類,允許通過調(diào)用指定 TimestampAssigner
和 AlignmentParameters
的方法,來覆蓋掉這兩個方法的默認實現(xiàn)。文章來源地址http://www.zghlxwxcb.cn/news/detail-846325.html
到了這里,關(guān)于Flink 源碼學習|使用 Watermark 策略(WatermarkStrategy)【v2 修訂版】的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!