Background
When Flink consumes messages from Kafka, we often rely on FlinkKafkaConsumer to emit watermarks. This article walks through the source code to trace how periodic watermarks, configured via FlinkKafkaConsumer.assignTimestampsAndWatermarks, are emitted.
Watermark emission in FlinkKafkaConsumer
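1. Start with the fetcher class. When the fetcher is created, it builds a periodic watermark emitter and starts it, but only if a watermark generator is configured and the auto-watermark interval is positive: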
```java
// if we have periodic watermarks, kick off the interval scheduler
if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {
    PeriodicWatermarkEmitter<T, KPH> periodicEmitter =
            new PeriodicWatermarkEmitter<>(
                    checkpointLock,
                    subscribedPartitionStates,
                    watermarkOutputMultiplexer,
                    processingTimeProvider,
                    autoWatermarkInterval);

    periodicEmitter.start();
}
```
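The start condition can be checked in isolation. The plain-Java sketch below is not Flink code: the nested enum `WatermarkMode` and the method `shouldStartPeriodicEmitter` are hypothetical stand-ins for the fetcher's `timestampWatermarkMode` and `autoWatermarkInterval`. It shows that no periodic emitter is started when no watermark generator is configured or when the interval is 0.

```java
// Hypothetical sketch of the start condition in step 1 (not Flink source).
final class EmitterStartCheck {

    /** Hypothetical stand-in for the fetcher's timestamp/watermark mode. */
    enum WatermarkMode {
        NO_TIMESTAMPS_WATERMARKS, // no generator configured
        WITH_WATERMARK_GENERATOR  // a generator/assigner was configured
    }

    private EmitterStartCheck() {}

    /** Mirrors the if-condition that guards creating the periodic emitter. */
    static boolean shouldStartPeriodicEmitter(WatermarkMode mode, long autoWatermarkIntervalMs) {
        return mode == WatermarkMode.WITH_WATERMARK_GENERATOR && autoWatermarkIntervalMs > 0;
    }

    public static void main(String[] args) {
        System.out.println(shouldStartPeriodicEmitter(WatermarkMode.WITH_WATERMARK_GENERATOR, 200)); // true
        System.out.println(shouldStartPeriodicEmitter(WatermarkMode.WITH_WATERMARK_GENERATOR, 0));   // false
        System.out.println(shouldStartPeriodicEmitter(WatermarkMode.NO_TIMESTAMPS_WATERMARKS, 200)); // false
    }
}
```

In a real job the interval is taken from the job's `ExecutionConfig` (settable with `setAutoWatermarkInterval`), so setting it to 0 switches periodic emission off.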
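2. Next, PeriodicWatermarkEmitter registers a processing-time timer. Each time the timer fires, the callback emits watermarks and registers the next timer, so the emitter runs periodically: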
```java
public void start() {
    timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
}

@Override
public void onProcessingTime(long timestamp) {
    synchronized (checkpointLock) {
        for (KafkaTopicPartitionState<?, ?> state : allPartitions) {
            // each Kafka partition consumed by this subtask records its own watermark
            state.onPeriodicEmit();
        }

        // the subtask takes the minimum watermark over all Kafka partitions it consumes
        // and emits it as its own watermark; note that this happens per subtask
        watermarkOutputMultiplexer.onPeriodicEmit();
    }

    // schedule the next watermark
    timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
}
```
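The pattern in step 2 is a one-shot timer that re-registers itself on every firing, which turns a one-shot timer service into a periodic callback. The plain-Java sketch below simulates this with a hypothetical, single-threaded `ManualTimerService` driven by a manual clock; it stands in for Flink's processing-time service, and all names here are assumptions made for illustration.

```java
import java.util.PriorityQueue;

final class SelfReschedulingTimerSketch {

    /** Hypothetical callback, playing the role of onProcessingTime. */
    interface Callback {
        void onProcessingTime(long timestamp);
    }

    /** Hypothetical single-threaded timer service with a manually advanced clock. */
    static final class ManualTimerService {
        private static final class Timer {
            final long time;
            final Callback callback;
            Timer(long time, Callback callback) { this.time = time; this.callback = callback; }
        }

        private final PriorityQueue<Timer> timers =
                new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));
        private long now;

        long getCurrentProcessingTime() { return now; }

        void registerTimer(long time, Callback callback) { timers.add(new Timer(time, callback)); }

        /** Advances the clock to target, firing every due timer in time order. */
        void advanceTo(long target) {
            while (!timers.isEmpty() && timers.peek().time <= target) {
                Timer t = timers.poll();
                now = t.time; // the clock jumps to the timer's due time
                t.callback.onProcessingTime(t.time);
            }
            now = target;
        }
    }

    /** Re-registers itself on every firing, like PeriodicWatermarkEmitter. */
    static final class PeriodicEmitter implements Callback {
        private final ManualTimerService timerService;
        private final long interval;
        int emissions;

        PeriodicEmitter(ManualTimerService timerService, long interval) {
            this.timerService = timerService;
            this.interval = interval;
        }

        void start() {
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
        }

        @Override
        public void onProcessingTime(long timestamp) {
            emissions++; // stands in for the onPeriodicEmit() calls
            // schedule the next "watermark"
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
        }
    }

    static int emissionsAfter(long interval, long elapsed) {
        ManualTimerService service = new ManualTimerService();
        PeriodicEmitter emitter = new PeriodicEmitter(service, interval);
        emitter.start();
        service.advanceTo(elapsed);
        return emitter.emissions;
    }

    public static void main(String[] args) {
        System.out.println(emissionsAfter(200, 1000)); // 5: fires at 200, 400, 600, 800, 1000
    }
}
```

Because the next timer is computed from the current time inside the callback, a late firing in a real system shifts all later firings as well; the simulation fires exactly on time, so it shows no drift.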
3. The state.onPeriodicEmit() call ends up in the method below, which records the watermark of each Kafka partition (this version wraps a legacy periodic assigner held in the field wms):
```java
@Override
public void onPeriodicEmit(WatermarkOutput output) {
    final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark();
    if (next != null) {
        output.emitWatermark(new Watermark(next.getTimestamp()));
    }
}
```
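The null check matters for legacy periodic assigners: returning `null` from `getCurrentWatermark()` means "no watermark to emit", and the method above then forwards nothing. The plain-Java sketch below reproduces that forwarding rule; `LegacyPeriodicAssigner` and `RecordingOutput` are hypothetical stand-ins for the Flink interfaces, and watermarks are plain `long` timestamps.

```java
import java.util.ArrayList;
import java.util.List;

final class PeriodicAdapterSketch {

    /** Hypothetical stand-in for a legacy periodic assigner: null means "no watermark". */
    interface LegacyPeriodicAssigner {
        Long getCurrentWatermark();
    }

    /** Hypothetical stand-in for WatermarkOutput that records what was emitted. */
    static final class RecordingOutput {
        final List<Long> emitted = new ArrayList<>();
        void emitWatermark(long timestamp) { emitted.add(timestamp); }
    }

    /** Same rule as onPeriodicEmit above: forward only non-null watermarks. */
    static void onPeriodicEmit(LegacyPeriodicAssigner wms, RecordingOutput output) {
        Long next = wms.getCurrentWatermark();
        if (next != null) {
            output.emitWatermark(next);
        }
    }

    public static void main(String[] args) {
        RecordingOutput output = new RecordingOutput();
        onPeriodicEmit(() -> null, output); // nothing emitted
        onPeriodicEmit(() -> 42L, output);  // 42 emitted
        System.out.println(output.emitted); // [42]
    }
}
```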
Here the WatermarkOutput is a DeferredOutput, and the emitWatermark behind output.emitWatermark(new Watermark(next.getTimestamp())) is implemented as follows:
```java
public DeferredOutput(OutputState state) {
    this.state = state;
}

@Override
public void emitWatermark(Watermark watermark) {
    state.setWatermark(watermark.getTimestamp());
}
```
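So the only effect here is to set the watermark on the corresponding state, i.e. on one Kafka partition (note that a single subtask may consume several Kafka partitions). OutputState.setWatermark looks like this: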
```java
/**
 * Returns true if the watermark was advanced, that is if the new watermark is larger than
 * the previous one.
 *
 * <p>Setting a watermark will clear the idleness flag.
 */
public boolean setWatermark(long watermark) {
    this.idle = false;
    final boolean updated = watermark > this.watermark;
    // this also shows that even if the code emits a smaller watermark, the watermark never moves backwards
    this.watermark = Math.max(watermark, this.watermark);
    return updated;
}
```
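Two properties of the per-partition state can be checked directly: the stored watermark never decreases, and every setWatermark call clears the idle flag, while a deferred emit only updates this state and forwards nothing downstream by itself. The plain-Java sketch below re-creates `OutputState` and `DeferredOutput` with the same update logic as the snippets above; the `Long.MIN_VALUE` starting value and the `markIdle()` helper are assumptions for the example.

```java
final class OutputStateSketch {

    /** Per-partition state with the same update rule as setWatermark above. */
    static final class OutputState {
        private long watermark = Long.MIN_VALUE; // assumed initial value: "no watermark yet"
        private boolean idle = false;

        long getWatermark() { return watermark; }
        boolean isIdle() { return idle; }
        void markIdle() { idle = true; } // hypothetical helper for the example

        /** Returns true only if the watermark advanced; never moves it backwards. */
        boolean setWatermark(long newWatermark) {
            this.idle = false;
            final boolean updated = newWatermark > this.watermark;
            this.watermark = Math.max(newWatermark, this.watermark);
            return updated;
        }
    }

    /** Like DeferredOutput: only records into the state, forwards nothing downstream. */
    static final class DeferredOutput {
        private final OutputState state;
        DeferredOutput(OutputState state) { this.state = state; }
        void emitWatermark(long timestamp) { state.setWatermark(timestamp); }
    }

    public static void main(String[] args) {
        OutputState partition0 = new OutputState();
        DeferredOutput out = new DeferredOutput(partition0);
        out.emitWatermark(10);
        out.emitWatermark(5);                          // smaller value is ignored
        System.out.println(partition0.getWatermark()); // 10
        partition0.markIdle();
        out.emitWatermark(7);                          // no advance, but idleness is cleared
        System.out.println(partition0.isIdle());       // false
    }
}
```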
4. The method a subtask uses to combine the watermarks of all partitions it consumes, reached from watermarkOutputMultiplexer.onPeriodicEmit():
```java
private void updateCombinedWatermark() {
    long minimumOverAllOutputs = Long.MAX_VALUE;

    boolean hasOutputs = false;
    boolean allIdle = true;
    for (OutputState outputState : watermarkOutputs) {
        if (!outputState.isIdle()) {
            minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());
            allIdle = false;
        }
        hasOutputs = true;
    }

    // if we don't have any outputs minimumOverAllOutputs is not valid, it's still
    // at its initial Long.MAX_VALUE state and we must not emit that
    // If the subtask consumes no partitions, it emits no watermark at all. Is this why the
    // number of consumer subtasks should not exceed the number of partitions of the Kafka topic?
    if (!hasOutputs) {
        return;
    }

    if (allIdle) {
        // all partitions of this subtask are idle: mark the subtask idle so that
        // downstream operators can keep advancing their watermarks
        underlyingOutput.markIdle();
    } else if (minimumOverAllOutputs > combinedWatermark) {
        combinedWatermark = minimumOverAllOutputs;
        underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));
    }
}
```
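The combination rule can be exercised without Flink. The sketch below re-implements `updateCombinedWatermark` in plain Java; `Partition` and `Downstream` are hypothetical stand-ins for `OutputState` and the underlying output. It covers the three cases: the minimum over non-idle partitions is emitted only when it advances, a subtask whose partitions are all idle is marked idle, and a subtask with no partitions emits nothing.

```java
import java.util.ArrayList;
import java.util.List;

final class CombinedWatermarkSketch {

    /** Hypothetical per-partition state: current watermark plus idle flag. */
    static final class Partition {
        long watermark;
        boolean idle;
        Partition(long watermark, boolean idle) { this.watermark = watermark; this.idle = idle; }
    }

    /** Hypothetical downstream output that records what the subtask sends. */
    static final class Downstream {
        final List<Long> watermarks = new ArrayList<>();
        int idleMarks;
        void emitWatermark(long ts) { watermarks.add(ts); }
        void markIdle() { idleMarks++; }
    }

    private final List<Partition> partitions = new ArrayList<>();
    private final Downstream downstream;
    private long combinedWatermark = Long.MIN_VALUE;

    CombinedWatermarkSketch(Downstream downstream) { this.downstream = downstream; }

    Partition addPartition(long watermark, boolean idle) {
        Partition p = new Partition(watermark, idle);
        partitions.add(p);
        return p;
    }

    /** Same logic as updateCombinedWatermark above. */
    void updateCombinedWatermark() {
        long minimum = Long.MAX_VALUE;
        boolean hasOutputs = false;
        boolean allIdle = true;
        for (Partition p : partitions) {
            if (!p.idle) {
                minimum = Math.min(minimum, p.watermark);
                allIdle = false;
            }
            hasOutputs = true;
        }
        if (!hasOutputs) {
            return; // no partitions: Long.MAX_VALUE is not a valid watermark, emit nothing
        }
        if (allIdle) {
            downstream.markIdle();
        } else if (minimum > combinedWatermark) {
            combinedWatermark = minimum;
            downstream.emitWatermark(minimum);
        }
    }

    public static void main(String[] args) {
        Downstream out = new Downstream();
        CombinedWatermarkSketch subtask = new CombinedWatermarkSketch(out);
        Partition p0 = subtask.addPartition(100, false);
        Partition p1 = subtask.addPartition(40, false);
        subtask.updateCombinedWatermark(); // emits min(100, 40) = 40
        p1.idle = true;
        subtask.updateCombinedWatermark(); // p1 ignored while idle: emits 100
        p0.idle = true;
        subtask.updateCombinedWatermark(); // all idle: marks the subtask idle
        System.out.println(out.watermarks + " idleMarks=" + out.idleMarks); // [40, 100] idleMarks=1
    }
}
```

Note that a single slow but non-idle partition holds back the whole subtask's watermark, since only the minimum is forwarded.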