国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

源碼解析Flink源節(jié)點數(shù)據(jù)讀取是如何與checkpoint串行執(zhí)行

這篇具有很好參考價值的文章主要介紹了源碼解析Flink源節(jié)點數(shù)據(jù)讀取是如何與checkpoint串行執(zhí)行。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

源碼解析Flink源節(jié)點數(shù)據(jù)讀取是如何與checkpoint串行執(zhí)行

Flink版本:1.13.6

前置知識:源節(jié)點的Checkpoint是由Checkpointcoordinate觸發(fā),具體是通過RPC調(diào)用TaskManager中對應的Task的StreamTask類的performChecpoint方法執(zhí)行Checkpoint。

本文思路:本文先分析checkpoint階段,然后再分析數(shù)據(jù)讀取階段,最后得出結(jié)論:源節(jié)點Checkpoint時和源節(jié)點讀取數(shù)據(jù)時,都需要搶SourceStreamTask類中l(wèi)ock變量的鎖,最終實現(xiàn)串行執(zhí)行checkpoint與寫數(shù)據(jù)

Checkpoint階段

Checkpoint在StreamTask的performCheckpoint方法中執(zhí)行,該方法調(diào)用過程如下

// 在StreamTask類中 執(zhí)行checkpoint操作
private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetricsBuilder checkpointMetrics )
            throws Exception {
        if (isRunning) {
            //使用actionExecutor 同步觸發(fā)checkpoint
            actionExecutor.runThrowing(
                    () -> {
    					....//經(jīng)過一系列檢查
                        subtaskCheckpointCoordinator.checkpointState(
                                checkpointMetaData,
                                checkpointOptions,
                                checkpointMetrics,
                                operatorChain,
                                this::isRunning);
                    });
            return true;
        } else {
    		....
        }
    }

從上述代碼可以看出,Checkpoint執(zhí)行是由actionExecutor執(zhí)行器執(zhí)行

StreamTask類變量actionExecutor的實現(xiàn)和初始化

StreamTask類變量actionExecution的實現(xiàn)

通過代碼注釋可以知道該執(zhí)行器的實現(xiàn)是StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor;從SynchronizedStreamTaskActionExecutor源代碼可知,該執(zhí)行器每次執(zhí)行都需要獲得mutex對象鎖

  /**
     * All actions outside of the task {@link #mailboxProcessor mailbox} (i.e. performed by another
     * thread) must be executed through this executor to ensure that we don't have concurrent method
     * calls that void consistent checkpoints.
     *
     * <p>CheckpointLock is superseded by {@link MailboxExecutor}, with {@link
     * StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor
     * SynchronizedStreamTaskActionExecutor} to provide lock to {@link SourceStreamTask}.
     */
private final StreamTaskActionExecutor actionExecutor;


class SynchronizedStreamTaskActionExecutor implements StreamTaskActionExecutor {
    private final Object mutex;

    public SynchronizedStreamTaskActionExecutor(Object mutex) {
        this.mutex = mutex;
    }

    @Override
    public void run(RunnableWithException runnable) throws Exception {
        synchronized (mutex) {
            runnable.run();
        }
    }
}

StreamTask變量actionExecution初始化

actionExecutor變量在StreamTask中定義,在構造方法中初始化;該構造方法由SourceStreamTask調(diào)用,并傳入SynchronizedStreamTaskActionExecutor對象,代碼如下所示

//   SourceStreamTask的方法
private SourceStreamTask(Environment env, Object lock) throws Exception {
    //調(diào)用的StreamTask構造函數(shù),傳入SynchronizedStreamTaskActionExecutor對象
    super(
            env,
            null,
            FatalExitExceptionHandler.INSTANCE,
            //初始化actionExecutor
            StreamTaskActionExecutor.synchronizedExecutor(lock));
    //將lock對象賦值給類變量lock
    this.lock = Preconditions.checkNotNull(lock);
    this.sourceThread = new LegacySourceFunctionThread();

    getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);
}

//  StreamTask的方法
protected StreamTask(
        Environment environment,
        @Nullable TimerService timerService,
        Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
    	//初始化actionExecutor
        StreamTaskActionExecutor actionExecutor)
        throws Exception {
    this(
            environment,
            timerService,
            uncaughtExceptionHandler,
            actionExecutor,
            new TaskMailboxImpl(Thread.currentThread()));
}

protected StreamTask(
        Environment environment,
        @Nullable TimerService timerService,
        Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
        StreamTaskActionExecutor actionExecutor,
        TaskMailbox mailbox)
        throws Exception {
    super(environment);
    this.configuration = new StreamConfig(getTaskConfiguration());
    this.recordWriter = createRecordWriterDelegate(configuration, environment);
    //初始化actionExecutor
    this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
    this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
    .......}
小結(jié)

actionExecutor執(zhí)行器每次執(zhí)行都需要獲得mutex對象,mutex對象就是SourceStreamTask類中的lock對象;即算子每次執(zhí)行Checkpoint時都需要獲得SourceStreamTask類中l(wèi)ock對象鎖才能進行

數(shù)據(jù)讀取階段

在執(zhí)行Checkpoint時控制讀取源端,則控制點必定是在調(diào)用SourceContext的collect方法時

@Override
public void run(SourceContext<String> ctx) throws Exception {
    int i = 0;
    while (true) {
		//在這個方法里處理
        ctx.collect(String.valueOf(i));
    }
}

點擊collection查看實現(xiàn),選擇NonTimestampContext查看代碼,collect()實現(xiàn)如下

@Override
public void collect(T element) {
    synchronized (lock) {
        output.collect(reuse.replace(element));
    }
}

所以這里控制數(shù)據(jù)讀取發(fā)送是通過lock來控制,lock是如何初始化的?

通過NonTimestampContext構造方法可以定位到StreamSourceContexts->getSourceContext方法;

public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
        TimeCharacteristic timeCharacteristic,
        ProcessingTimeService processingTimeService,
        Object checkpointLock,
        StreamStatusMaintainer streamStatusMaintainer,
        Output<StreamRecord<OUT>> output,
        long watermarkInterval,
        long idleTimeout) {

    final SourceFunction.SourceContext<OUT> ctx;
    switch (timeCharacteristic) {
		....
        case ProcessingTime:
            //初始化NonTimestampContext
            ctx = new NonTimestampContext<>(checkpointLock, output);
            break;
        default:
            throw new IllegalArgumentException(String.valueOf(timeCharacteristic));
    }
    return ctx;
}

向上追蹤,在StreamSource類中調(diào)用getSourceContext:

public void run(
        final Object lockingObject,
        final StreamStatusMaintainer streamStatusMaintainer,
        final Output<StreamRecord<OUT>> collector,
        final OperatorChain<?, ?> operatorChain)
        throws Exception {
        ....
        this.ctx =
        
        StreamSourceContexts.getSourceContext(
                timeCharacteristic,
                getProcessingTimeService(),
                lockingObject,
                streamStatusMaintainer,
                collector,
                watermarkInterval,
                -1);
        ....
        }
// 再向上最終run方法的調(diào)用點->是由內(nèi)部方法run調(diào)用
public void run(
        final Object lockingObject,
        final StreamStatusMaintainer streamStatusMaintainer,
        final OperatorChain<?, ?> operatorChain)
        throws Exception {

    run(lockingObject, streamStatusMaintainer, output, operatorChain);
}

//再向上最終run方法的調(diào)用點->SourceStreamTask 調(diào)用run 然后再代用mainOpterator run方法
@Override
public void run() {
    try {
        // 使用的是類變量lock
        mainOperator.run(lock, getStreamStatusMaintainer(), operatorChain);
        if (!wasStoppedExternally && !isCanceled()) {
            synchronized (lock) {
                operatorChain.setIgnoreEndOfInput(false);
            }
        }
        completionFuture.complete(null);
    } catch (Throwable t) {
        // Note, t can be also an InterruptedException
        completionFuture.completeExceptionally(t);
    }
}
小結(jié)

所以在源端寫數(shù)據(jù)時,必須獲得SourceStreamTask中的類變量lock的鎖才能進行寫數(shù)據(jù);類變量lock剛好和執(zhí)行器時同一個對象

總結(jié)

flink的source算子在Checkpoint時,是通過鎖對象SourceStreamTask.lock,來控制源端數(shù)據(jù)產(chǎn)生和Checkpoint的有序進行文章來源地址http://www.zghlxwxcb.cn/news/detail-631151.html

到了這里,關于源碼解析Flink源節(jié)點數(shù)據(jù)讀取是如何與checkpoint串行執(zhí)行的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • flink 從savepoint、checkpoint中恢復數(shù)據(jù)

    flink 從savepoint、checkpoint中恢復數(shù)據(jù)

    提示:flink checkpoint重啟: ??flink作業(yè)因為故障導致restart strategy失敗或升級flink版本重新發(fā)布任務,這時就需要從最近的checkpoint恢復。一般而言有兩種方案,第一種方案是 開啟checkpoint且任務取消時不刪除checkpoint (調(diào)整參數(shù)execution.checkpointing.externalized-checkpoint-retention),第

    2024年02月10日
    瀏覽(20)
  • flink正常消費kafka數(shù)據(jù),flink沒有做checkpoint,kafka位點沒有提交

    1、背景 flink消費kafka數(shù)據(jù),多并發(fā),實現(xiàn)雙流join 2、現(xiàn)象 (1)flink任務消費kafka數(shù)據(jù),其中數(shù)據(jù)正常消費,kafka顯示消息堆積,位點沒有提交,并且flink任務沒有做checkpoint (2)其中一個流的subtask顯示finished (3)無背壓 3、問題原因 (1)其中一個topic分區(qū)為1 (2)配置的并行

    2024年02月13日
    瀏覽(22)
  • 【大數(shù)據(jù)】Flink 架構(五):檢查點 Checkpoint(看完即懂)

    【大數(shù)據(jù)】Flink 架構(五):檢查點 Checkpoint(看完即懂)

    《 Flink 架構 》系列(已完結(jié)),共包含以下 6 篇文章: Flink 架構(一):系統(tǒng)架構 Flink 架構(二):數(shù)據(jù)傳輸 Flink 架構(三):事件時間處理 Flink 架構(四):狀態(tài)管理 Flink 架構(五):檢查點 Checkpoint(看完即懂) Flink 架構(六):保存點 Savepoint ?? 如果您覺得這篇

    2024年02月19日
    瀏覽(23)
  • 【Unity 3D】C#從JSON文件中讀取、解析、保存數(shù)據(jù)(附源碼)

    JSON是一種輕量級的數(shù)據(jù)交換格式,采用完全獨立于編程語言的文本格式存儲和表示數(shù)據(jù),簡潔和清晰的層次結(jié)構使JSON成為理想的數(shù)據(jù)交換語言,易于讀者閱讀和編寫,同時也易于機器解析和生成,并有效的提高網(wǎng)絡傳輸效率 生成JSON數(shù)據(jù)實例代碼如下 下面的代碼將JSON中數(shù)據(jù)

    2024年02月11日
    瀏覽(161)
  • Flink系列之:使用Flink CDC從數(shù)據(jù)庫采集數(shù)據(jù),設置checkpoint支持數(shù)據(jù)采集中斷恢復,保證數(shù)據(jù)不丟失

    博主相關技術博客: Flink系列之:Debezium采集Mysql數(shù)據(jù)庫表數(shù)據(jù)到Kafka Topic,同步kafka topic數(shù)據(jù)到StarRocks數(shù)據(jù)庫 Flink系列之:使用Flink Mysql CDC基于Flink SQL同步mysql數(shù)據(jù)到StarRocks數(shù)據(jù)庫

    2024年02月11日
    瀏覽(31)
  • 深入解析 Flink CDC 增量快照讀取機制

    深入解析 Flink CDC 增量快照讀取機制

    Flink CDC 1.x 使用 Debezium 引擎集成來實現(xiàn)數(shù)據(jù)采集,支持全量加增量模式,確保數(shù)據(jù)的一致性。然而,這種集成存在一些痛點需要注意: 一致性通過加鎖保證 :在保證數(shù)據(jù)一致性時,Debezium 需要對讀取的庫或表加鎖。全局鎖可能導致數(shù)據(jù)庫出現(xiàn)掛起情況,而表級鎖會影響表的

    2024年02月03日
    瀏覽(22)
  • Java如何快速讀取&解析JSON數(shù)據(jù)(文件),獲取想要的內(nèi)容?

    Java如何快速讀取&解析JSON數(shù)據(jù)(文件),獲取想要的內(nèi)容?

    手打不易,如果轉(zhuǎn)摘,請注明出處! 注明原文: https://zhangxiaofan.blog.csdn.net/article/details/132764186 目錄 前言 準備工作 Json數(shù)據(jù)(示例) 解析Json文件 第一步:創(chuàng)建一個空類 第二步:使用?Gsonformat 插件 ?第三步:復制Json內(nèi)容,創(chuàng)建對應類 第四步:讀取Json文件,提取目標數(shù)據(jù)

    2024年02月05日
    瀏覽(22)
  • Flink非對齊checkpoint原理(Flink Unaligned Checkpoint)

    Flink非對齊checkpoint原理(Flink Unaligned Checkpoint)

    為什么提出Unaligned Checkpoint(UC)? 因為反壓嚴重時會導致Checkpoint失敗,可能導致如下問題 恢復時間長-服務效率低 非冪等和非事務會導致數(shù)據(jù)重復 持續(xù)反壓導致任務加入死循環(huán)(可能導致數(shù)據(jù)丟失,例如超過kafka的過期時間無法重置offset) UC的原理 UC有兩個階段(UC主要是

    2024年02月14日
    瀏覽(28)
  • Flink 如何定位反壓節(jié)點?

    Flink 如何定位反壓節(jié)點?

    Flink Web UI 自帶的反壓監(jiān)控 —— 直接方式 Flink Web UI 的反壓監(jiān)控提供了 Subtask 級別 的反壓監(jiān)控。監(jiān)控的原理是 通過Thread.getStackTrace() 采集在 TaskManager 上正在運行的所有線程,收集在緩沖區(qū)請求中阻塞的線程數(shù)(意味著下游阻塞),并計算緩沖區(qū)阻塞線程數(shù)與總線程數(shù)的比值

    2024年02月10日
    瀏覽(26)
  • 【Flink】 FlinkCDC讀取Mysql( DataStream 方式)(帶完整源碼,直接可使用)

    【Flink】 FlinkCDC讀取Mysql( DataStream 方式)(帶完整源碼,直接可使用)

    簡介: ? ? FlinkCDC讀取Mysql數(shù)據(jù)源,程序中使用了自定義反序列化器,完整的Flink結(jié)構,開箱即用。 本工程提供 1、項目源碼及詳細注釋,簡單修改即可用在實際生產(chǎn)代碼 2、成功編譯截圖 3、自己編譯過程中可能出現(xiàn)的問題 4、mysql建表語句及測試數(shù)據(jù) 5、修復FlinkCDC讀取Mysql數(shù)

    2024年02月07日
    瀏覽(17)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包