国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

深入解析 Flink CDC 增量快照讀取機(jī)制

這篇具有很好參考價(jià)值的文章主要介紹了深入解析 Flink CDC 增量快照讀取機(jī)制。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一、Flink-CDC 1.x 痛點(diǎn)

Flink CDC 1.x 使用 Debezium 引擎集成來實(shí)現(xiàn)數(shù)據(jù)采集,支持全量加增量模式,確保數(shù)據(jù)的一致性。然而,這種集成存在一些痛點(diǎn)需要注意:

  1. 一致性通過加鎖保證:在保證數(shù)據(jù)一致性時(shí),Debezium 需要對讀取的庫或表加鎖。全局鎖可能導(dǎo)致數(shù)據(jù)庫出現(xiàn)掛起情況,而表級鎖會影響表的寫操作。

  2. 只支持單并發(fā)讀取:Flink CDC 1.x版本只支持單并發(fā)讀取,對于大表讀取非常耗時(shí)。如果需要讀取的數(shù)據(jù)量較大,可能會導(dǎo)致性能瓶頸。

  3. 全量讀取階段不支持 checkpoint:CDC 的initial模式下讀取分為兩個(gè)階段,全量和增量。然而,在全量讀取階段,不支持 checkpoint 的功能。如果出現(xiàn)故障,必須重新進(jìn)行全量讀取操作。

1.1、全局鎖

在 Flink CDC 1.x 中,全量讀取時(shí)的鎖機(jī)制流程如下:

  1. 開始全量讀?。寒?dāng) Flink CDC 啟動全量讀取任務(wù)時(shí),它會與 MySQL 數(shù)據(jù)庫建立連接,并開始讀取源表的數(shù)據(jù)。

  2. 獲取讀取的鎖:為了保證數(shù)據(jù)的一致性,F(xiàn)link CDC 在全量讀取過程中需要獲取讀取的鎖。在默認(rèn)情況下,F(xiàn)link CDC 使用全局鎖(Global Lock)來確保數(shù)據(jù)的一致性。

  3. 全局鎖的獲取:Flink CDC 通過向 MySQL 數(shù)據(jù)庫發(fā)送命令來獲取全局鎖。全局鎖將阻塞其他對源表進(jìn)行寫操作的事務(wù),確保在全量讀取期間不會有數(shù)據(jù)的變更。

  4. 全量讀取數(shù)據(jù):一旦獲得全局鎖,F(xiàn)link CDC 開始進(jìn)行全量讀取。它會掃描源表的所有數(shù)據(jù),并將其傳輸?shù)侥繕?biāo)系統(tǒng)(如 Doris)進(jìn)行加載和處理。

  5. 釋放全局鎖:當(dāng)全量讀取完成后,F(xiàn)link CDC 會釋放全局鎖,允許其他事務(wù)對源表進(jìn)行寫操作。

全局鎖的獲取可能會導(dǎo)致一些潛在的問題:

  1. 長時(shí)間鎖定:全局鎖通常需要在全量讀取過程中長時(shí)間持有,這可能會對其他業(yè)務(wù)操作產(chǎn)生影響。如果全量讀取任務(wù)的持續(xù)時(shí)間較長,其他事務(wù)可能需要等待較長時(shí)間才能執(zhí)行讀寫操作。
  2. 性能影響:獲取全局鎖可能導(dǎo)致性能下降。當(dāng)全局鎖被獲取時(shí),其他事務(wù)需要等待鎖的釋放,這可能導(dǎo)致并發(fā)性下降,特別是在高負(fù)載的情況下。長時(shí)間的等待可能會導(dǎo)致數(shù)據(jù)庫掛起(hang),影響整體系統(tǒng)的吞吐量和響應(yīng)時(shí)間。

1.2、表級鎖

在 Flink CDC 1.x 中,全量讀取表時(shí)的表鎖機(jī)制流程如下:

  1. 開始全量讀?。寒?dāng) Flink CDC 啟動全量讀取任務(wù)時(shí),它會與 MySQL 數(shù)據(jù)庫建立連接,并準(zhǔn)備開始讀取源表的數(shù)據(jù)。

  2. 獲取表級鎖:為了確保數(shù)據(jù)的一致性,在全量讀取期間需要獲取源表的表級鎖。表級鎖將阻塞其他事務(wù)對源表進(jìn)行寫操作,以保證讀取過程中數(shù)據(jù)不會發(fā)生變化。

  3. 發(fā)起鎖請求:Flink CDC 向 MySQL 數(shù)據(jù)庫發(fā)送請求,嘗試獲取源表的表級鎖。這個(gè)請求將被發(fā)送到 MySQL 的鎖管理器。

  4. 等待鎖釋放:如果源表的表級鎖已經(jīng)被其他事務(wù)占用,F(xiàn)link CDC 將等待鎖釋放的信號。在等待期間,F(xiàn)link CDC 將一直保持連接并監(jiān)測鎖的狀態(tài)。

  5. 獲取鎖成功:一旦源表的表級鎖被成功獲取,F(xiàn)link CDC 可以開始進(jìn)行全量數(shù)據(jù)的讀取操作。它會掃描源表的所有數(shù)據(jù),并將其傳輸?shù)侥繕?biāo)系統(tǒng)進(jìn)行加載和處理。

  6. 釋放表級鎖:當(dāng)全量讀取完成后,F(xiàn)link CDC 會釋放源表的表級鎖,允許其他事務(wù)對源表進(jìn)行寫操作。

表級鎖的獲取和釋放可能會帶來一些潛在的問題:

  1. 數(shù)據(jù)一致性問題:表級鎖在全量讀取期間會鎖定整張表,以保證數(shù)據(jù)的一致性。然而,在某些情況下,如果全量讀取過程中出現(xiàn)了長時(shí)間的阻塞或異常情況,可能會導(dǎo)致數(shù)據(jù)一致性問題。
  2. 長時(shí)間鎖定:表級鎖通常需要在讀取過程中長時(shí)間持有,特別是在全量讀取時(shí)。這可能會對其他事務(wù)產(chǎn)生長時(shí)間的阻塞,影響系統(tǒng)的響應(yīng)性能。

二、Flink-CDC 2.x 新特性

Flink 2.x不僅引入了增量快照讀取機(jī)制,還帶來了一些其他功能的改進(jìn)。以下是對Flink 2.x的主要功能的介紹:

  1. 增量快照讀?。篎link 2.x引入了增量快照讀取機(jī)制,這是一種全新的數(shù)據(jù)讀取方式。該機(jī)制支持并發(fā)讀取和以chunk為粒度進(jìn)行checkpoint。在增量快照讀取過程中,F(xiàn)link首先根據(jù)表的主鍵將其劃分為多個(gè)塊(chunk),然后將這些塊分配給多個(gè)讀取器并行讀取數(shù)據(jù)。這一機(jī)制極大地提高了數(shù)據(jù)讀取的效率。
  2. 精確一次性處理:Flink 2.x引入了Exactly-Once語義,確保數(shù)據(jù)處理結(jié)果的精確一次性。MySQL CDC 連接器是Flink的Source連接器,可以利用Flink的checkpoint機(jī)制來確保精確一次性處理。
  3. 動態(tài)加表:Flink 2.x支持動態(tài)加表,通過使用savepoint來復(fù)用之前作業(yè)的狀態(tài),解決了動態(tài)加表的問題。
  4. 無主鍵表的處理:Flink 2.x對無主鍵表的讀取和處理進(jìn)行了優(yōu)化。在無主鍵表中,F(xiàn)link可以通過一些額外的字段來識別數(shù)據(jù)記錄的唯一性,從而實(shí)現(xiàn)準(zhǔn)確的數(shù)據(jù)讀取和處理。

本文主要介紹了Flink 2.x引入的重要特性之一:增量快照讀取機(jī)制。該機(jī)制帶來了并發(fā)讀取、chunk粒度的checkpoint等優(yōu)勢,提升了數(shù)據(jù)讀取的效率。

三、增量快照讀取機(jī)制

3.1、功能

增量快照讀取基本功能:

  1. 并發(fā)讀?。涸谠隽靠煺兆x取期間,源(Source)可以支持并發(fā)讀取。這意味著多個(gè)讀取器可以同時(shí)讀取數(shù)據(jù),從而提高讀取的速度和效率。
  2. Chunk級別的checkpoint:增量快照讀取期間,源可以進(jìn)行chunk級別的checkpoint。這意味著在讀取過程中,可以對數(shù)據(jù)進(jìn)行更細(xì)粒度的檢查點(diǎn),提高故障恢復(fù)的準(zhǔn)確性和效率。
  3. 全量增量無鎖讀取算法:相比于舊的快照機(jī)制,全量快照讀取不需要源具有數(shù)據(jù)庫鎖權(quán)限。這降低了對數(shù)據(jù)庫的依賴和權(quán)限要求,簡化了配置和部署的過程。

3.2、并發(fā)讀取

增量快照讀取的并行讀取功能利用了Flink的Source并行度來控制源的并行度。你可以通過設(shè)置作業(yè)的并行度(parallelism.default)來實(shí)現(xiàn)。

在SQL CLI中,可以使用以下命令進(jìn)行設(shè)置:

Flink SQL> SET 'parallelism.default' = 4;

通過將并行度設(shè)置為4,F(xiàn)link CDC Source算子將占用4個(gè)slot來并行讀取數(shù)據(jù)。這樣可以最大程度地利用系統(tǒng)資源,提高數(shù)據(jù)讀取的效率和速度。

3.3、Chunk級別的checkpoint

3.3.1、Chunk

為了充分利用并行Source,MySQL CDC Source在增量快照讀取過程中使用主鍵列將表劃分為多個(gè)分片(chunk)。默認(rèn)情況下,MySQL CDC Source會識別表的主鍵列,并使用主鍵中的第一列作為分片列。如果表中沒有主鍵,增量快照讀取將失敗。你可以通過禁用scan.incremental.snapshot.enabled來回退到舊的快照讀取機(jī)制。

對于數(shù)值和自動增量拆分列,MySQL CDC Source會按照固定步長高效地拆分塊。例如,如果你有一個(gè)主鍵列為id的表,類型為自動增量的BIGINT,最小值為0,最大值為100,并設(shè)置表選項(xiàng)scan.incremental.snapshot.chunk.size的值為25,那么表將被拆分為以下塊:

(-∞, 25),
[25, 50),
[50, 75),
[75, 100),
[100, +∞)

對于其他類型的主鍵列,MySQL CDC Source執(zhí)行類似以下形式的語句來獲取每個(gè)塊的低值和高值:SELECT MAX(STR_ID) AS chunk_high FROM (SELECT * FROM TestTable WHERE STR_ID > 'uuid-001' limit 25),然后將塊集分割如下:

(-∞, 'uuid-001'),
['uuid-001', 'uuid-009'),
['uuid-009', 'uuid-abc'),
['uuid-abc', 'uuid-def'),
[uuid-def, +∞).

通過這種分片方式,MySQL CDC Source可以高效地劃分表數(shù)據(jù),以實(shí)現(xiàn)并行的增量快照讀取。每個(gè)讀取器將負(fù)責(zé)讀取和處理一個(gè)或多個(gè)分片的數(shù)據(jù),從而提高整體的讀取性能和效率。

注意,scan.incremental.snapshot.chunk.size的默認(rèn)值為8096

3.3.2、原理

在 Flink CDC 中實(shí)現(xiàn) Chunk 級別的 checkpoint 本質(zhì)是使用 Flink 的 Checkpointing 機(jī)制和相應(yīng)的配置,啟用 Chunk 級別的 checkpoint 后,F(xiàn)link CDC 將在每個(gè) Chunk 完成讀取后進(jìn)行一次 checkpoint,以確保數(shù)據(jù)的一致性和容錯(cuò)性。

注意,F(xiàn)link 的 checkpoint 機(jī)制包括兩種類型的 checkpoint:時(shí)間驅(qū)動和計(jì)數(shù)驅(qū)動。但Flink CDC 中 Chunk 級別的 checkpoint 并不是直接利用Flink 計(jì)數(shù)驅(qū)動的 checkpoint 來實(shí)現(xiàn)的,相反,它是 Flink CDC 根據(jù)自身的機(jī)制自己實(shí)現(xiàn)的。它提供了在每個(gè) Chunk 完成讀取時(shí)進(jìn)行一次 checkpoint 的能力,以實(shí)現(xiàn)更細(xì)粒度的數(shù)據(jù)一致性和容錯(cuò)性保障。

3.4、全量增量無鎖讀取算法【重點(diǎn)】

3.4.1、原理

3.4.1.1、全量無鎖讀取算法流程
  1. 首先,F(xiàn)linkCDC 會先根據(jù)主鍵和粒度將要讀取的表劃分為多個(gè)分片(chunk)。

  2. 每個(gè) MySQL CDC Source 負(fù)責(zé)讀取一個(gè)分片,多個(gè)Source 可以并發(fā)讀取多個(gè)chunk,完成當(dāng)前分片處理后才可以讀取下一個(gè)分片,直到讀取完所有分片。

  3. 在讀取每個(gè)分片時(shí),F(xiàn)linkCDC 使用一種名為偏移信號算法的方法來獲取快照區(qū)塊的最終一致輸出。以下是該算法的簡要步驟:

    • (1) 在讀取chunk數(shù)據(jù)前先記錄當(dāng)前的 binlog 位置,即 LOW 偏移量。

    • (2) 執(zhí)行語句 SELECT * FROM MyTable WHERE id > chunk_low AND id <= chunk_high,讀取chunk分片內(nèi)的數(shù)據(jù)并緩存至快照區(qū)塊。

    • (3) 讀取完chunk后再次記錄當(dāng)前的 binlog 位置記錄,即 HIGH 偏移量,如下圖:
      深入解析 Flink CDC 增量快照讀取機(jī)制,flink,flink,大數(shù)據(jù),flinkcdc,數(shù)據(jù)同步,mysql

    • (4) 讀取binlog:從 LOW 偏移量到 HIGH 偏移量之間的 binlog 記錄,讀取到的數(shù)據(jù) append 到上個(gè)隊(duì)列后面,并將此時(shí)binlog的最終offset保存至最后,如下圖:

    深入解析 Flink CDC 增量快照讀取機(jī)制,flink,flink,大數(shù)據(jù),flinkcdc,數(shù)據(jù)同步,mysql

    • (5) 檢查讀取到的 binlog 每條記錄,如果屬于chunk分片范圍,則對之前緩存的chunk隊(duì)列里的數(shù)據(jù)進(jìn)行修正,最后將修正后的記錄作為快照區(qū)塊的最終輸出,如下圖:

    深入解析 Flink CDC 增量快照讀取機(jī)制,flink,flink,大數(shù)據(jù),flinkcdc,數(shù)據(jù)同步,mysql

    • (6) 將此次chunk的元信息【lw,hw等】保存至MySqlSourceReader 進(jìn)行備份【checkponit階段也會保存此數(shù)據(jù)】,為后續(xù)增量讀取做準(zhǔn)備。
  4. 當(dāng)所有chunk都被消費(fèi)完畢后,即全量階段同步完畢,此時(shí)將結(jié)束Source的并發(fā)讀取,改為單線程讀取binlog日志進(jìn)行后續(xù)同步,此步驟在3.4.1.2、增量無鎖讀取算法流程。


  • 為了方便理解舉例:表當(dāng)前總數(shù)據(jù)為9條,Chunk切分粒度scan.incremental.snapshot.chunk.size=5;作業(yè)的并發(fā)數(shù)為2,故Mysql CDC Source 會有兩個(gè)Task并行讀取Chunk01,Chunk02,讀取過程如下:

深入解析 Flink CDC 增量快照讀取機(jī)制,flink,flink,大數(shù)據(jù),flinkcdc,數(shù)據(jù)同步,mysql

  • chunk01的數(shù)據(jù)流轉(zhuǎn)過程如下:由于update#6、update#9 不屬于chunk01分片范圍故不做處理。

深入解析 Flink CDC 增量快照讀取機(jī)制,flink,flink,大數(shù)據(jù),flinkcdc,數(shù)據(jù)同步,mysql

  • chunk02的數(shù)據(jù)流轉(zhuǎn)過程如下:update#9、delete#7屬于切片范圍故修正緩存數(shù)據(jù),而 update#4 不屬于chunk02分片范圍故不做處理。

深入解析 Flink CDC 增量快照讀取機(jī)制,flink,flink,大數(shù)據(jù),flinkcdc,數(shù)據(jù)同步,mysql

FAQ[常見問題]:

  • chunk01 與 chunk02階段有重疊部分,即 update#9,是否會影響數(shù)據(jù)準(zhǔn)確性?
    • 答:不會,因?yàn)閏hunk只會對屬于該分片范圍的數(shù)據(jù)進(jìn)行處理,故不會重復(fù)執(zhí)行。
  • chunk01 與 chunk02 均未處理 update#4 日志,是否會影響數(shù)據(jù)準(zhǔn)確性?
    • 答:不會,因?yàn)楫?dāng)所有chunk階段結(jié)束后,MySqlSourceEnumerator調(diào)查員會根據(jù)所有chunk中的min(lw) 再次讀取binlog,選擇性補(bǔ)全數(shù)據(jù),具體細(xì)節(jié)在:3.4.1.2、增量無鎖讀取算法流程
  • chun02 沒有讀取update#6的日志,是否會影響數(shù)據(jù)準(zhǔn)確性?
    • 答:不會,因?yàn)閡pdate#6的日志 < lw,說明chunk02在lw時(shí)已經(jīng)讀取到了update#6后的最新數(shù)據(jù),故不會影響數(shù)據(jù)準(zhǔn)確性。
3.4.1.2、增量無鎖讀取算法流程
  1. 當(dāng)全量階段同步完畢后, MySqlSourceReader 會將每個(gè) chunk 的 lw,hw等元數(shù)據(jù)匯報(bào)給 MySqlSourceEnumerator調(diào)查員,如下圖:

深入解析 Flink CDC 增量快照讀取機(jī)制,flink,flink,大數(shù)據(jù),flinkcdc,數(shù)據(jù)同步,mysql

  1. MySqlSourceEnumerator調(diào)查員取所有chunk中最小的lw 作為offset 來讀取binlog日志,如下圖:

    深入解析 Flink CDC 增量快照讀取機(jī)制,flink,flink,大數(shù)據(jù),flinkcdc,數(shù)據(jù)同步,mysql

  2. 當(dāng)一個(gè) binlog 記錄屬于一個(gè)分片的主鍵范圍內(nèi)時(shí),如果該記錄在這個(gè)分片的 hw 之后,則該記錄應(yīng)該發(fā)送給下游,如下圖:update#6、update#9雖然數(shù)據(jù)chunk02分片范圍但<=hw 故舍棄;而update#4屬于chunk01分片范圍 且 >hw 代表缺失該條記錄故發(fā)送至下游。

深入解析 Flink CDC 增量快照讀取機(jī)制,flink,flink,大數(shù)據(jù),flinkcdc,數(shù)據(jù)同步,mysql

  1. 當(dāng)一個(gè) binlog 記錄已經(jīng)處于所有chunk中最大的hw時(shí),即表示日志記錄已經(jīng)進(jìn)入 Pure Binlog Phase,對于這樣的 binlog 記錄,不需進(jìn)行比較,直接發(fā)送給下游,如下圖:

深入解析 Flink CDC 增量快照讀取機(jī)制,flink,flink,大數(shù)據(jù),flinkcdc,數(shù)據(jù)同步,mysql

至此增量無鎖讀取算法流程完畢

3.4.2、源碼分析

  • MySql cdc 類圖關(guān)系如下:

深入解析 Flink CDC 增量快照讀取機(jī)制,flink,flink,大數(shù)據(jù),flinkcdc,數(shù)據(jù)同步,mysql

  • 快照讀取chunk分片邏輯:MySqlSnapshotSplitReadTask#doExecute
protected SnapshotResult doExecute(
            ChangeEventSourceContext context,
            SnapshotContext snapshotContext,
            SnapshottingTask snapshottingTask)
            throws Exception {
        final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
                (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
        ctx.offset = offsetContext;
        final SignalEventDispatcher signalEventDispatcher =
                new SignalEventDispatcher(
                        offsetContext.getPartition(),
                        topicSelector.topicNameFor(snapshotSplit.getTableId()),
                        dispatcher.getQueue());

        final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
        LOG.info(
                "Snapshot step 1 - Determining low watermark {} for split {}",
                lowWatermark,
                snapshotSplit);
        ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
                .setLowWatermark(lowWatermark);
        signalEventDispatcher.dispatchWatermarkEvent(
                snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);

        LOG.info("Snapshot step 2 - Snapshotting data");
        createDataEvents(ctx, snapshotSplit.getTableId());

        final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
        LOG.info(
                "Snapshot step 3 - Determining high watermark {} for split {}",
                highWatermark,
                snapshotSplit);
        signalEventDispatcher.dispatchWatermarkEvent(
                snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);
        ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
                .setHighWatermark(highWatermark);

        return SnapshotResult.completed(ctx.offset);
}
  • chunk分片數(shù)據(jù)讀取后進(jìn)行格式處理歸一邏輯:RecordUtils#normalizedSplitRecords
/**
     * Normalize the records of snapshot split which represents the split records state on high
     * watermark. data input: [low watermark event] [snapshot events ] [high watermark event]
     * [binlog events] [binlog-end event] data output: [low watermark event] [normalized events]
     * [high watermark event]
     */
    public static List<SourceRecord> normalizedSplitRecords(
            MySqlSnapshotSplit snapshotSplit,
            List<SourceRecord> sourceRecords,
            SchemaNameAdjuster nameAdjuster) {
        List<SourceRecord> normalizedRecords = new ArrayList<>();
        Map<Struct, SourceRecord> snapshotRecords = new HashMap<>();
        List<SourceRecord> binlogRecords = new ArrayList<>();
        if (!sourceRecords.isEmpty()) {

            SourceRecord lowWatermark = sourceRecords.get(0);
            checkState(
                    isLowWatermarkEvent(lowWatermark),
                    String.format(
                            "The first record should be low watermark signal event, but is %s",
                            lowWatermark));
            SourceRecord highWatermark = null;
            int i = 1;
            for (; i < sourceRecords.size(); i++) {
                SourceRecord sourceRecord = sourceRecords.get(i);
                if (!isHighWatermarkEvent(sourceRecord)) {
                    snapshotRecords.put((Struct) sourceRecord.key(), sourceRecord);
                } else {
                    highWatermark = sourceRecord;
                    i++;
                    break;
                }
            }

            if (i < sourceRecords.size() - 1) {
                List<SourceRecord> allBinlogRecords =
                        sourceRecords.subList(i, sourceRecords.size() - 1);
                for (SourceRecord binlog : allBinlogRecords) {
                    if (isDataChangeRecord(binlog)) {
                        Object[] key =
                                getSplitKey(snapshotSplit.getSplitKeyType(), binlog, nameAdjuster);
                        // 當(dāng)獲取chunk lw hw 的binlog后會先判斷是否數(shù)據(jù)chunk的區(qū)間內(nèi),只有負(fù)責(zé)chunk區(qū)間內(nèi)的數(shù)據(jù)才會被更正
                        if (splitKeyRangeContains(
                                key, snapshotSplit.getSplitStart(), snapshotSplit.getSplitEnd())) {
                            binlogRecords.add(binlog);
                        }
                    }
                }
            }
            checkState(
                    isHighWatermarkEvent(highWatermark),
                    String.format(
                            "The last record should be high watermark signal event, but is %s",
                            highWatermark));
            // chunk數(shù)據(jù)修正邏輯函數(shù):upsertBinlog
            normalizedRecords =
                    upsertBinlog(
                            snapshotSplit,
                            lowWatermark,
                            highWatermark,
                            snapshotRecords,
                            binlogRecords);
        }
        return normalizedRecords;
    }
  • chunk數(shù)據(jù)修正邏輯:RecordUtils#upsertBinlog

    private static List<SourceRecord> upsertBinlog(
                MySqlSplit split,
                SourceRecord lowWatermarkEvent,
                SourceRecord highWatermarkEvent,
                Map<Struct, SourceRecord> snapshotRecords,
                List<SourceRecord> binlogRecords) {
            final List<SourceRecord> normalizedBinlogRecords = new ArrayList<>();
            normalizedBinlogRecords.add(lowWatermarkEvent);
            // upsert binlog events to snapshot events of split
            if (!binlogRecords.isEmpty()) {
                for (SourceRecord binlog : binlogRecords) {
                    Struct key = (Struct) binlog.key();
                    Struct value = (Struct) binlog.value();
                    if (value != null) {
                        Envelope.Operation operation =
                                Envelope.Operation.forCode(
                                        value.getString(Envelope.FieldName.OPERATION));
                        switch (operation) {
                            case UPDATE:
                                Envelope envelope = Envelope.fromSchema(binlog.valueSchema());
                                Struct source = value.getStruct(Envelope.FieldName.SOURCE);
                                Struct updateAfter = value.getStruct(Envelope.FieldName.AFTER);
                                Instant ts =
                                        Instant.ofEpochMilli(
                                                (Long) source.get(Envelope.FieldName.TIMESTAMP));
                                SourceRecord record =
                                        new SourceRecord(
                                                binlog.sourcePartition(),
                                                binlog.sourceOffset(),
                                                binlog.topic(),
                                                binlog.kafkaPartition(),
                                                binlog.keySchema(),
                                                binlog.key(),
                                                binlog.valueSchema(),
                                                envelope.read(updateAfter, source, ts));
                                snapshotRecords.put(key, record);
                                break;
                            case DELETE:
                                snapshotRecords.remove(key);
                                break;
                            case CREATE:
                                snapshotRecords.put(key, binlog);
                                break;
                            case READ:
                                throw new IllegalStateException(
                                        String.format(
                                                "Binlog record shouldn't use READ operation, the the record is %s.",
                                                binlog));
                        }
                    }
                }
            }
            normalizedBinlogRecords.addAll(snapshotRecords.values());
            normalizedBinlogRecords.add(highWatermarkEvent);
            return normalizedBinlogRecords;
        }
    
  • 全量快照結(jié)束后MySqlSourceReader 整合各個(gè)split,匯報(bào)給MySqlSourceEnumerator邏輯:handleSourceEvents

@Override
    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {
            FinishedSnapshotSplitsAckEvent ackEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;
            LOG.debug(
                    "The subtask {} receives ack event for {} from enumerator.",
                    subtaskId,
                    ackEvent.getFinishedSplits());
            for (String splitId : ackEvent.getFinishedSplits()) {
                this.finishedUnackedSplits.remove(splitId);
            }
        } else if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {
            // report finished snapshot splits
            LOG.debug(
                    "The subtask {} receives request to report finished snapshot splits.",
                    subtaskId);
            reportFinishedSnapshotSplitsIfNeed();
        } else if (sourceEvent instanceof BinlogSplitMetaEvent) {
            LOG.debug(
                    "The subtask {} receives binlog meta with group id {}.",
                    subtaskId,
                    ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
            fillMetaDataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
        } else {
            super.handleSourceEvents(sourceEvent);
        }
    }

    private void reportFinishedSnapshotSplitsIfNeed() {
        if (!finishedUnackedSplits.isEmpty()) {
            final Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
            for (MySqlSnapshotSplit split : finishedUnackedSplits.values()) {
                finishedOffsets.put(split.splitId(), split.getHighWatermark());
            }
            FinishedSnapshotSplitsReportEvent reportEvent =
                    new FinishedSnapshotSplitsReportEvent(finishedOffsets);
            context.sendSourceEventToCoordinator(reportEvent);
            LOG.debug(
                    "The subtask {} reports offsets of finished snapshot splits {}.",
                    subtaskId,
                    finishedOffsets);
        }
    }
  • MySqlSourceEnumerator 收到全量快照結(jié)束后處理邏輯:createBinlogSplit

當(dāng) MySqlSourceEnumerator 將所有 split 的 hw 收齊之后,會創(chuàng)建一個(gè) binlog split,該分片包含了需要讀取 binlog 的起始位置(所有分片 hw 的最小值)和所有分片的 hw 信息。

private MySqlBinlogSplit createBinlogSplit() {
        final List<MySqlSnapshotSplit> assignedSnapshotSplit =
                snapshotSplitAssigner.getAssignedSplits().values().stream()
                        .sorted(Comparator.comparing(MySqlSplit::splitId))
                        .collect(Collectors.toList());

        Map<String, BinlogOffset> splitFinishedOffsets =
                snapshotSplitAssigner.getSplitFinishedOffsets();
        final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();

        BinlogOffset minBinlogOffset = null;
        for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
            // find the min binlog offset
            BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
            if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
                minBinlogOffset = binlogOffset;
            }
            finishedSnapshotSplitInfos.add(
                    new FinishedSnapshotSplitInfo(
                            split.getTableId(),
                            split.splitId(),
                            split.getSplitStart(),
                            split.getSplitEnd(),
                            binlogOffset));
        }

        // the finishedSnapshotSplitInfos is too large for transmission, divide it to groups and
        // then transfer them

        boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize;
        return new MySqlBinlogSplit(
                BINLOG_SPLIT_ID,
                minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
                BinlogOffset.NO_STOPPING_OFFSET,
                divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos,
                new HashMap<>(),
                finishedSnapshotSplitInfos.size());
    }
  • 增量階段邏輯:shouldEmit

當(dāng) MySqlSourceEnumerator 將 binlog 分片分配給 MySqlSourceReader 時(shí),任務(wù)從全量階段轉(zhuǎn)變?yōu)樵隽侩A段。MySqlSourceReader 在讀取 binlog 數(shù)據(jù)后,使用 shouldEmit 來判斷是否應(yīng)該將該記錄發(fā)送給下游。文章來源地址http://www.zghlxwxcb.cn/news/detail-772375.html

/**
     * Returns the record should emit or not.
     *
     * <p>The watermark signal algorithm is the binlog split reader only sends the binlog event that
     * belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid
     * since the offset is after its high watermark.
     *
     * <pre> E.g: the data input is :
     *    snapshot-split-0 info : [0,    1024) highWatermark0
     *    snapshot-split-1 info : [1024, 2048) highWatermark1
     *  the data output is:
     *  only the binlog event belong to [0,    1024) and offset is after highWatermark0 should send,
     *  only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.
     * </pre>
     */
    private boolean shouldEmit(SourceRecord sourceRecord) {
        if (isDataChangeRecord(sourceRecord)) {
            TableId tableId = getTableId(sourceRecord);
            BinlogOffset position = getBinlogPosition(sourceRecord);
           	// 判斷是否處于純凈的binlog區(qū)域
            if (hasEnterPureBinlogPhase(tableId, position)) {
                return true;
            }
            // only the table who captured snapshot splits need to filter
            if (finishedSplitsInfo.containsKey(tableId)) {
                RowType splitKeyType =
                        ChunkUtils.getSplitType(
                                statefulTaskContext.getDatabaseSchema().tableFor(tableId));
                Object[] key =
                        getSplitKey(
                                splitKeyType,
                                sourceRecord,
                                statefulTaskContext.getSchemaNameAdjuster());
                for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
                    if (RecordUtils.splitKeyRangeContains(
                                    key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
                            && position.isAfter(splitInfo.getHighWatermark())) { // 判斷該binlog是否屬于chunk區(qū)間且是否>該chunk的hw
                        return true;
                    }
                }
            }
            // not in the monitored splits scope, do not emit
            return false;
        }
        // always send the schema change event and signal event
        // we need record them to state of Flink
        return true;
    }

	private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
        // the existed tables those have finished snapshot reading
        if (maxSplitHighWatermarkMap.containsKey(tableId)
                && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
            return true;
        }
        // capture dynamically new added tables
        // TODO: there is still very little chance that we can't capture new added table.
        //  That the tables dynamically added after discovering captured tables in enumerator
        //  and before the lowest binlog offset of all table splits. This interval should be
        //  very short, so we don't support it for now.
        return !maxSplitHighWatermarkMap.containsKey(tableId)
                && capturedTableFilter.isIncluded(tableId);
    }

四、相關(guān)文檔

  • 官方文檔
  • Flink CDC 設(shè)計(jì)文檔
  • FAQ

到了這里,關(guān)于深入解析 Flink CDC 增量快照讀取機(jī)制的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 十分鐘掌握 Flink CDC,實(shí)現(xiàn)Mysql數(shù)據(jù)增量備份到Clickhouse [純干貨,建議收藏]

    十分鐘掌握 Flink CDC,實(shí)現(xiàn)Mysql數(shù)據(jù)增量備份到Clickhouse [純干貨,建議收藏]

    Clickhouse的優(yōu)點(diǎn). 真正的面向列的 DBMS ClickHouse 是一個(gè) DBMS,而不是一個(gè)單一的數(shù)據(jù)庫。它允許在運(yùn)行時(shí)創(chuàng)建表和數(shù)據(jù)庫、加載數(shù)據(jù)和運(yùn)行 查詢,而無需重新配置和重新啟動服務(wù)器。 數(shù)據(jù)壓縮 一些面向列的 DBMS(InfiniDB CE 和 MonetDB)不使用數(shù)據(jù)壓縮。但是,數(shù)據(jù)壓縮確實(shí)提高了

    2024年04月14日
    瀏覽(95)
  • 實(shí)戰(zhàn)Java springboot 采用Flink CDC操作SQL Server數(shù)據(jù)庫獲取增量變更數(shù)據(jù)

    目錄 前言: 1、springboot引入依賴: 2、yml配置文件 3、創(chuàng)建SQL server CDC變更數(shù)據(jù)監(jiān)聽器 4、反序列化數(shù)據(jù),轉(zhuǎn)為變更JSON對象 5、CDC 數(shù)據(jù)實(shí)體類 6、自定義ApplicationContextUtil 7、自定義sink 交由spring管理,處理變更數(shù)據(jù) ? ? ? ? 我的場景是從SQL Server數(shù)據(jù)庫獲取指定表的增量數(shù)據(jù),查

    2024年02月10日
    瀏覽(24)
  • Flink系列之:Flink CDC深入了解MySQL CDC連接器

    Flink系列之:Flink CDC深入了解MySQL CDC連接器

    增量快照讀取是一種讀取表快照的新機(jī)制。與舊的快照機(jī)制相比,增量快照具有許多優(yōu)點(diǎn),包括: (1)在快照讀取期間,Source 支持并發(fā)讀取 (2)在快照讀取期間,Source 支持進(jìn)行 chunk 粒度的 checkpoint (3)在快照讀取之前,Source 不需要數(shù)據(jù)庫鎖權(quán)限。 如果希望 source 并行運(yùn)

    2024年02月02日
    瀏覽(30)
  • 11 flink-sql 中基于 mysql-cdc 連接 mysql-pxc 集群無法獲取增量數(shù)據(jù)問題

    11 flink-sql 中基于 mysql-cdc 連接 mysql-pxc 集群無法獲取增量數(shù)據(jù)問題

    問題是來自于 群友, 2024.03.29, 也是花了一些時(shí)間 來排查這個(gè)問題? 大致的問題是用 mysql-cdc 連接了一個(gè) mysql-pxc 集群, 然后創(chuàng)建了一個(gè) test_user 表? 使用 \\\"select * from test_user\\\" 獲取數(shù)據(jù)表的數(shù)據(jù), 可以拿到 查詢時(shí)的快照, 但是 無法獲取到后續(xù)對于 test_user 表的增量操作的數(shù)據(jù), 比如

    2024年04月15日
    瀏覽(43)
  • flink-cdc之讀取mysql變化數(shù)據(jù)

    flink-cdc之讀取mysql變化數(shù)據(jù)

    pom 代碼 注意開啟checkpoint 和不開啟是有區(qū)別的(savepoint也可以 啟動的flink指定時(shí)候 -s savepath) 不開啟,如果項(xiàng)目重啟了,會重新讀取所有的數(shù)據(jù) 開啟了,項(xiàng)目重啟了額,會根據(jù)保留的信息去讀取變化的數(shù)據(jù) ?mysql ? 數(shù)據(jù)庫表 ?增加一條數(shù)據(jù) 打印日志 op:c 是create ==FlinkCDC==

    2024年02月16日
    瀏覽(29)
  • 使用Flink CDC從數(shù)據(jù)庫采集數(shù)據(jù),保證數(shù)據(jù)不丟失:實(shí)現(xiàn)斷點(diǎn)續(xù)傳機(jī)制

    大數(shù)據(jù)技術(shù)在當(dāng)前的數(shù)據(jù)分析和處理中扮演著重要的角色。Apache Flink作為一種快速、可靠的流處理引擎,在大規(guī)模數(shù)據(jù)處理中廣受歡迎。本文將介紹如何使用Flink CDC(Change Data Capture)從數(shù)據(jù)庫采集數(shù)據(jù),并通過設(shè)置checkpoint來支持?jǐn)?shù)據(jù)采集中斷恢復(fù),從而保證數(shù)據(jù)不丟失。

    2024年02月04日
    瀏覽(27)
  • 為什么Flink-CDC讀取Decimal等數(shù)值類型變成了非數(shù)值字符串

    為什么Flink-CDC讀取Decimal等數(shù)值類型變成了非數(shù)值字符串

    每遇到一個(gè)問題,在經(jīng)過努力研究明白之后,總想寫點(diǎn)東西記錄。怎奈又沒這個(gè)好習(xí)慣,過了一兩天這個(gè)激情就沒了,想寫也寫不出來了。最近在做一個(gè)flink-cdc采集數(shù)據(jù)的測試和產(chǎn)品化開發(fā),遇到一個(gè)數(shù)據(jù)轉(zhuǎn)換的問題,折騰了我兩個(gè)早上,有些心血來潮,就記錄一下吧,對我

    2023年04月09日
    瀏覽(27)
  • 深入了解 Flink 的檢查點(diǎn)機(jī)制

    Flink 是一個(gè)流處理框架,用于實(shí)時(shí)數(shù)據(jù)處理。檢查點(diǎn)(checkpoint)機(jī)制是 Flink 的一個(gè)核心組件,用于保證流處理作業(yè)的可靠性和容錯(cuò)性。在這篇文章中,我們將深入了解 Flink 的檢查點(diǎn)機(jī)制,涵蓋其核心概念、算法原理、實(shí)例代碼以及未來發(fā)展趨勢。 Flink 的檢查點(diǎn)機(jī)制是一種保存

    2024年02月20日
    瀏覽(22)
  • 源碼解析Flink源節(jié)點(diǎn)數(shù)據(jù)讀取是如何與checkpoint串行執(zhí)行

    源碼解析Flink源節(jié)點(diǎn)數(shù)據(jù)讀取是如何與checkpoint串行執(zhí)行 Flink版本:1.13.6 前置知識:源節(jié)點(diǎn)的Checkpoint是由Checkpointcoordinate觸發(fā),具體是通過RPC調(diào)用TaskManager中對應(yīng)的Task的StreamTask類的performChecpoint方法執(zhí)行Checkpoint。 本文思路:本文先分析checkpoint階段,然后再分析數(shù)據(jù)讀取階段,

    2024年02月14日
    瀏覽(28)
  • 【Flink-CDC】Flink CDC 介紹和原理概述

    【Flink-CDC】Flink CDC 介紹和原理概述

    CDC是( Change Data Capture 變更數(shù)據(jù)獲取 )的簡稱。 核心思想是, 監(jiān)測并捕獲數(shù)據(jù)庫的變動(包括數(shù)據(jù) 或 數(shù)據(jù)表的插入INSERT、更新UPDATE、刪除DELETE等),將這些變更按發(fā)生的順序完整記錄下來,寫入到消息中間件中以供其他服務(wù)進(jìn)行訂閱及消費(fèi)。 CDC 主要分為基于查詢和基于

    2024年01月20日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包