Source code analysis: how Flink serializes data reading at the source node with checkpoints
Flink version: 1.13.6
Prerequisites: a checkpoint of a source node is triggered by the CheckpointCoordinator, which, via RPC, ultimately has the StreamTask of the corresponding Task on the TaskManager execute its performCheckpoint method.
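Approach: this article first analyzes the checkpoint phase, then the data reading phase, and concludes that both checkpointing and data reading at the source node must acquire the monitor of the lock field in SourceStreamTask, which is how checkpointing and data emission end up running serially.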
Checkpoint phase
The checkpoint is executed in StreamTask's performCheckpoint method, which looks like this:
// In StreamTask: performs the checkpoint
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetricsBuilder checkpointMetrics)
        throws Exception {
    if (isRunning) {
        // Trigger the checkpoint synchronously through actionExecutor
        actionExecutor.runThrowing(
                () -> {
                    // ... a series of checks (elided)
                    subtaskCheckpointCoordinator.checkpointState(
                            checkpointMetaData,
                            checkpointOptions,
                            checkpointMetrics,
                            operatorChain,
                            this::isRunning);
                });
        return true;
    } else {
        // ... (elided)
    }
}
The code above shows that the checkpoint is executed by the actionExecutor.
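To make the control flow concrete, here is a minimal model of performCheckpoint as excerpted above. It is a sketch under stated assumptions, not Flink code: ActionExecutor, CheckpointModel, and the statesCheckpointed counter are hypothetical stand-ins for StreamTaskActionExecutor, StreamTask, and subtaskCheckpointCoordinator.checkpointState(...). It only shows that the checkpoint body is handed to the executor, and only while the task is running; which executor is plugged in decides whether that body is serialized with anything else.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CheckpointModel {
    // Hypothetical stand-in for StreamTaskActionExecutor: decides how an action is run.
    interface ActionExecutor {
        void runThrowing(Runnable action);
    }

    private final ActionExecutor actionExecutor;
    private final AtomicInteger statesCheckpointed = new AtomicInteger();
    private volatile boolean isRunning = true;

    CheckpointModel(ActionExecutor actionExecutor) {
        this.actionExecutor = actionExecutor;
    }

    // Same shape as StreamTask#performCheckpoint: the checkpoint body goes through the executor.
    boolean performCheckpoint() {
        if (isRunning) {
            actionExecutor.runThrowing(() -> {
                // ... checks elided, as in the excerpt
                statesCheckpointed.incrementAndGet(); // stands in for checkpointState(...)
            });
            return true;
        } else {
            return false; // the excerpt elides this branch; the model only reports "not taken"
        }
    }

    void stop() { isRunning = false; }

    int statesCheckpointed() { return statesCheckpointed.get(); }

    public static void main(String[] args) {
        // A pass-through executor: runs the action directly on the calling thread.
        CheckpointModel task = new CheckpointModel(Runnable::run);
        System.out.println(task.performCheckpoint()); // true
        task.stop();
        System.out.println(task.performCheckpoint()); // false
        System.out.println(task.statesCheckpointed()); // 1
    }
}
```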
Implementation and initialization of the StreamTask field actionExecutor
Implementation of the StreamTask field actionExecutor
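The field's Javadoc tells us that, for source tasks, the implementation is StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor, and the source of SynchronizedStreamTaskActionExecutor shows that every action it executes must first acquire the monitor of the mutex object: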
/**
* All actions outside of the task {@link #mailboxProcessor mailbox} (i.e. performed by another
* thread) must be executed through this executor to ensure that we don't have concurrent method
* calls that void consistent checkpoints.
*
* <p>CheckpointLock is superseded by {@link MailboxExecutor}, with {@link
* StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor
* SynchronizedStreamTaskActionExecutor} to provide lock to {@link SourceStreamTask}.
*/
private final StreamTaskActionExecutor actionExecutor;
class SynchronizedStreamTaskActionExecutor implements StreamTaskActionExecutor {
    private final Object mutex;
    public SynchronizedStreamTaskActionExecutor(Object mutex) {
        this.mutex = mutex;
    }
    @Override
    public void run(RunnableWithException runnable) throws Exception {
        synchronized (mutex) {
            runnable.run();
        }
    }
    // The runThrowing(...) variant used by performCheckpoint is elided here;
    // it likewise executes the action inside synchronized (mutex).
}
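The guarantee this executor provides can be checked with a small standalone sketch. SyncExecutor below is a hypothetical re-implementation of the same idea (every action runs inside synchronized (mutex)), not the Flink class: two threads perform non-atomic read-modify-write updates through it, and because both enter the same monitor no update is lost.

```java
public class SyncExecutorDemo {
    // Hypothetical analogue of SynchronizedStreamTaskActionExecutor.
    static final class SyncExecutor {
        private final Object mutex;

        SyncExecutor(Object mutex) { this.mutex = mutex; }

        void run(Runnable action) {
            synchronized (mutex) { // every caller enters the same monitor
                action.run();
            }
        }
    }

    private static int counter = 0; // deliberately neither volatile nor atomic

    /** Two threads each perform `perThread` plain increments through one shared executor. */
    static int countWithExecutor(int perThread) {
        counter = 0;
        SyncExecutor executor = new SyncExecutor(new Object());
        Runnable work = () -> {
            for (int i = 0; i < perThread; i++) {
                executor.run(() -> counter++); // read-modify-write, safe only under the mutex
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println(countWithExecutor(100_000)); // 200000
    }
}
```

Had each thread been given its own executor with its own mutex, the increments could interleave and the total would generally come out below 200000.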
Initialization of the StreamTask field actionExecutor
actionExecutor is declared in StreamTask and initialized in its constructor; SourceStreamTask calls that constructor and passes in a SynchronizedStreamTaskActionExecutor object, as the following code shows:
// In SourceStreamTask
private SourceStreamTask(Environment env, Object lock) throws Exception {
    // Calls the StreamTask constructor with a SynchronizedStreamTaskActionExecutor
    super(
            env,
            null,
            FatalExitExceptionHandler.INSTANCE,
            // initializes actionExecutor around the same lock object
            StreamTaskActionExecutor.synchronizedExecutor(lock));
    // Stores the same lock object in the field "lock"
    this.lock = Preconditions.checkNotNull(lock);
    this.sourceThread = new LegacySourceFunctionThread();
    getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);
}
// In StreamTask
protected StreamTask(
        Environment environment,
        @Nullable TimerService timerService,
        Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
        // the executor supplied by the subclass (here: SourceStreamTask)
        StreamTaskActionExecutor actionExecutor)
        throws Exception {
    this(
            environment,
            timerService,
            uncaughtExceptionHandler,
            actionExecutor,
            new TaskMailboxImpl(Thread.currentThread()));
}
protected StreamTask(
        Environment environment,
        @Nullable TimerService timerService,
        Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
        StreamTaskActionExecutor actionExecutor,
        TaskMailbox mailbox)
        throws Exception {
    super(environment);
    this.configuration = new StreamConfig(getTaskConfiguration());
    this.recordWriter = createRecordWriterDelegate(configuration, environment);
    // initializes actionExecutor
    this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
    this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
    // ...
}
Summary
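The actionExecutor must acquire mutex every time it executes an action, and mutex is precisely the lock object of SourceStreamTask. In other words, every checkpoint of the source task can only proceed once it holds the monitor of SourceStreamTask's lock object.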
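The wiring behind this summary fits in a few lines. In the hypothetical sketch below, BaseTask, LegacySourceTask, and SyncExecutor stand in for StreamTask, SourceStreamTask, and SynchronizedStreamTaskActionExecutor; the point is only that the object handed to the executor and the object stored in the lock field are one and the same reference.

```java
import java.util.Objects;

public class LockWiringDemo {
    // Stand-in for StreamTaskActionExecutor.synchronizedExecutor(lock).
    static final class SyncExecutor {
        final Object mutex;

        SyncExecutor(Object mutex) { this.mutex = mutex; }

        void run(Runnable action) {
            synchronized (mutex) {
                action.run();
            }
        }
    }

    // Stand-in for StreamTask: stores whatever executor the subclass provides.
    static class BaseTask {
        final SyncExecutor actionExecutor;

        BaseTask(SyncExecutor actionExecutor) {
            this.actionExecutor = Objects.requireNonNull(actionExecutor);
        }
    }

    // Stand-in for SourceStreamTask: one lock object feeds both the executor and the field.
    static final class LegacySourceTask extends BaseTask {
        final Object lock;

        LegacySourceTask(Object lock) {
            super(new SyncExecutor(lock));
            this.lock = Objects.requireNonNull(lock);
        }

        LegacySourceTask() { this(new Object()); }
    }

    public static void main(String[] args) {
        LegacySourceTask task = new LegacySourceTask();
        System.out.println(task.actionExecutor.mutex == task.lock); // true
    }
}
```

Java monitors belong to objects, so reference identity is what matters: had the executor been built around a separate new Object(), checkpoints and emits would no longer exclude each other.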
Data reading phase
If the source is to be held back while a checkpoint is executing, the control point must be where SourceContext's collect method is called, as in this simple source:
@Override
public void run(SourceContext<String> ctx) throws Exception {
    int i = 0;
    while (true) {
        // Every record goes through ctx.collect; this is where the control happens
        ctx.collect(String.valueOf(i++));
    }
}
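A legacy source usually also needs a way to stop, and Flink's SourceFunction.SourceContext exposes the same lock through getCheckpointLock(), so a source can update the state it checkpoints together with the emit. The sketch below is self-contained and hypothetical: SimpleSourceContext, CountingSource, and ListContext are simplified stand-ins rather than Flink classes, so it runs without Flink on the classpath, and the test context cancels the source after a fixed number of records.

```java
import java.util.ArrayList;
import java.util.List;

public class CountingSourceDemo {
    // Hypothetical, reduced version of SourceFunction.SourceContext.
    interface SimpleSourceContext<T> {
        void collect(T element);
        Object getCheckpointLock();
    }

    // A user source that emits "0", "1", "2", ... until cancel() is called.
    static final class CountingSource {
        private volatile boolean running = true;
        private long offset = 0; // the value a checkpoint would snapshot

        void run(SimpleSourceContext<String> ctx) {
            while (running) {
                // Emit and advance the offset as one step with respect to checkpoints.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(String.valueOf(offset));
                    offset++;
                }
            }
        }

        void cancel() { running = false; }
    }

    // Test context: records output under its lock and cancels the source after `limit` records.
    static final class ListContext implements SimpleSourceContext<String> {
        final List<String> out = new ArrayList<>();
        private final Object lock = new Object();
        private final CountingSource source;
        private final int limit;

        ListContext(CountingSource source, int limit) {
            this.source = source;
            this.limit = limit;
        }

        @Override
        public void collect(String element) {
            synchronized (lock) { // same pattern as NonTimestampContext.collect
                out.add(element);
                if (out.size() == limit) {
                    source.cancel();
                }
            }
        }

        @Override
        public Object getCheckpointLock() {
            return lock;
        }
    }

    /** Runs the source until `limit` (at least 1) records have been collected. */
    static List<String> runUntil(int limit) {
        CountingSource source = new CountingSource();
        ListContext ctx = new ListContext(source, limit);
        source.run(ctx);
        return ctx.out;
    }

    public static void main(String[] args) {
        System.out.println(runUntil(3)); // [0, 1, 2]
    }
}
```

Holding the lock across collect and offset++ means a checkpoint, which needs the same lock, can never see a record as emitted while the offset still points at it. The nested synchronized in collect is harmless because Java monitors are re-entrant.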
Opening the implementations of collect and picking NonTimestampContext, collect() is implemented as follows:
@Override
public void collect(T element) {
    synchronized (lock) {
        output.collect(reuse.replace(element));
    }
}
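The effect of this synchronized block can be observed directly. In the hypothetical sketch below, LockedContext is a reduced stand-in for NonTimestampContext, and the main thread plays the checkpoint by holding the lock: the source thread's collect call stays blocked on the monitor and emits nothing until the lock is released.

```java
import java.util.ArrayList;
import java.util.List;

public class CollectBlocksDemo {
    // Hypothetical reduced NonTimestampContext: every emit holds the shared lock.
    static final class LockedContext<T> {
        private final Object lock;
        private final List<T> output = new ArrayList<>();

        LockedContext(Object lock) { this.lock = lock; }

        void collect(T element) {
            synchronized (lock) {
                output.add(element);
            }
        }

        int emittedCount() {
            synchronized (lock) {
                return output.size();
            }
        }
    }

    /** Returns {records emitted while the "checkpoint" held the lock, records after release}. */
    static int[] demo() {
        Object lock = new Object();
        LockedContext<String> ctx = new LockedContext<>(lock);
        Thread source = new Thread(() -> ctx.collect("record-0"));
        int during;
        synchronized (lock) {                 // the "checkpoint" holds the lock
            source.start();
            // Wait until the source thread is parked on the monitor inside collect().
            while (source.getState() != Thread.State.BLOCKED) {
                Thread.onSpinWait();
            }
            during = ctx.emittedCount();      // re-entrant: this thread already owns lock
        }
        try {
            source.join();                    // lock released, so collect() can finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {during, ctx.emittedCount()};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("during: " + r[0] + ", after: " + r[1]); // during: 0, after: 1
    }
}
```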
So data emission here is controlled through lock. How is lock initialized?
Following the NonTimestampContext constructor leads to the StreamSourceContexts.getSourceContext method:
public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
        TimeCharacteristic timeCharacteristic,
        ProcessingTimeService processingTimeService,
        Object checkpointLock,
        StreamStatusMaintainer streamStatusMaintainer,
        Output<StreamRecord<OUT>> output,
        long watermarkInterval,
        long idleTimeout) {
    final SourceFunction.SourceContext<OUT> ctx;
    switch (timeCharacteristic) {
        // ... (other cases elided)
        case ProcessingTime:
            // creates the NonTimestampContext with the checkpoint lock
            ctx = new NonTimestampContext<>(checkpointLock, output);
            break;
        default:
            throw new IllegalArgumentException(String.valueOf(timeCharacteristic));
    }
    return ctx;
}
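For this argument the factory only needs to pass checkpointLock through unchanged. Below is a hypothetical reduced version that models just the ProcessingTime branch shown above; unlike Flink, it throws for the characteristics whose branches the excerpt elides.

```java
import java.util.Objects;

public class SourceContextFactoryDemo {
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    // Hypothetical reduced NonTimestampContext: keeps the lock it was given.
    static final class NonTimestampContext<T> {
        final Object lock;

        NonTimestampContext(Object checkpointLock) {
            this.lock = Objects.requireNonNull(checkpointLock);
        }
    }

    static <T> NonTimestampContext<T> getSourceContext(
            TimeCharacteristic timeCharacteristic, Object checkpointLock) {
        switch (timeCharacteristic) {
            case ProcessingTime:
                // The lock is passed through as-is; no copy or wrapper is made.
                return new NonTimestampContext<>(checkpointLock);
            default:
                // Flink handles the other characteristics; this sketch does not model them.
                throw new IllegalArgumentException(String.valueOf(timeCharacteristic));
        }
    }

    public static void main(String[] args) {
        Object lock = new Object();
        NonTimestampContext<String> ctx =
                getSourceContext(TimeCharacteristic.ProcessingTime, lock);
        System.out.println(ctx.lock == lock); // true
    }
}
```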
Tracing further up, getSourceContext is called from StreamSource:
public void run(
        final Object lockingObject,
        final StreamStatusMaintainer streamStatusMaintainer,
        final Output<StreamRecord<OUT>> collector,
        final OperatorChain<?, ?> operatorChain)
        throws Exception {
    // ...
    this.ctx =
            StreamSourceContexts.getSourceContext(
                    timeCharacteristic,
                    getProcessingTimeService(),
                    lockingObject,
                    streamStatusMaintainer,
                    collector,
                    watermarkInterval,
                    -1);
    // ...
}
// One level further up: this run method is called by the shorter run overload in the same class
public void run(
        final Object lockingObject,
        final StreamStatusMaintainer streamStatusMaintainer,
        final OperatorChain<?, ?> operatorChain)
        throws Exception {
    run(lockingObject, streamStatusMaintainer, output, operatorChain);
}
// Further up still: the source thread of SourceStreamTask (LegacySourceFunctionThread) calls mainOperator.run in its run() method
@Override
public void run() {
    try {
        // passes the task's field "lock"
        mainOperator.run(lock, getStreamStatusMaintainer(), operatorChain);
        if (!wasStoppedExternally && !isCanceled()) {
            synchronized (lock) {
                operatorChain.setIgnoreEndOfInput(false);
            }
        }
        completionFuture.complete(null);
    } catch (Throwable t) {
        // Note, t can be also an InterruptedException
        completionFuture.completeExceptionally(t);
    }
}
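Putting the read path together, the task's lock field travels unchanged through the source thread, the operator's run overloads, and the context factory. The sketch below is hypothetical: Task, SourceOperator, and Ctx stand in for SourceStreamTask with its LegacySourceFunctionThread, StreamSource, and NonTimestampContext, and the thread body is invoked inline with run() (rather than start()) so the result is deterministic.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

public class ReadPathDemo {
    // Stand-in for NonTimestampContext: remembers the lock it emits under.
    static final class Ctx {
        final Object lock;

        Ctx(Object lock) { this.lock = Objects.requireNonNull(lock); }
    }

    // Stand-in for StreamSource: the short run(...) overload delegates to the long one.
    static final class SourceOperator {
        final Object output = new Object(); // placeholder for the operator's Output
        Ctx ctx;

        void run(Object lockingObject) {
            run(lockingObject, output);
        }

        void run(Object lockingObject, Object collector) {
            // corresponds to StreamSourceContexts.getSourceContext(..., lockingObject, ...)
            this.ctx = new Ctx(lockingObject);
        }
    }

    // Stand-in for SourceStreamTask plus its LegacySourceFunctionThread.
    static final class Task {
        final Object lock = new Object();
        final SourceOperator mainOperator = new SourceOperator();
        final CompletableFuture<Void> completionFuture = new CompletableFuture<>();
        final Thread sourceThread = new Thread(() -> {
            try {
                mainOperator.run(lock); // passes the task's own lock field
                completionFuture.complete(null);
            } catch (Throwable t) {
                completionFuture.completeExceptionally(t);
            }
        });
    }

    static boolean contextUsesTaskLock() {
        Task task = new Task();
        task.sourceThread.run(); // inline for determinism; the real task starts the thread
        return task.completionFuture.isDone() && task.mainOperator.ctx.lock == task.lock;
    }

    public static void main(String[] args) {
        System.out.println(contextUsesTaskLock()); // true
    }
}
```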
Summary
So when the source emits data, it must first hold the monitor of SourceStreamTask's field lock, and that field is the very same object the action executor synchronizes on.
Conclusion
When a Flink source operator checkpoints, the lock object SourceStreamTask.lock is what keeps data production at the source and the checkpoint in order: the two never run concurrently.
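The conclusion can be checked end to end with a small simulation. This is a hypothetical model, not Flink code: a source thread emits records and advances an offset under lock (like NonTimestampContext.collect plus the user's state update), while a checkpointing thread takes snapshots under the same lock (like SynchronizedStreamTaskActionExecutor running checkpointState). Since the two paths never interleave, every snapshot should see an offset equal to the number of records emitted so far.

```java
import java.util.ArrayList;
import java.util.List;

public class SerializedCheckpointDemo {
    private final Object lock = new Object(); // plays SourceStreamTask.lock
    private final List<Long> emitted = new ArrayList<>();
    private long offset = 0;                   // state the source would checkpoint

    // Source path: emit a record and advance the offset under the lock.
    void emitNext() {
        synchronized (lock) {
            emitted.add(offset);
            offset++;
        }
    }

    // Checkpoint path: read {offset, records emitted} under the same lock.
    long[] snapshot() {
        synchronized (lock) {
            return new long[] {offset, emitted.size()};
        }
    }

    /** Runs both paths concurrently and returns the number of torn snapshots. */
    static int countInconsistentSnapshots(int records, int checkpoints) {
        SerializedCheckpointDemo task = new SerializedCheckpointDemo();
        List<long[]> snapshots = new ArrayList<>(); // touched only by the checkpointer until join
        Thread source = new Thread(() -> {
            for (int i = 0; i < records; i++) task.emitNext();
        });
        Thread checkpointer = new Thread(() -> {
            for (int i = 0; i < checkpoints; i++) snapshots.add(task.snapshot());
        });
        source.start();
        checkpointer.start();
        try {
            source.join();
            checkpointer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        int torn = 0;
        for (long[] s : snapshots) {
            if (s[0] != s[1]) torn++;
        }
        return torn;
    }

    public static void main(String[] args) {
        System.out.println(countInconsistentSnapshots(200_000, 1_000)); // 0
    }
}
```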