国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Debeizum 增量快照

這篇具有很好參考價值的文章主要介紹了Debeizum 增量快照。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

?在Debeizum1.6版本發(fā)布之后,成功推出了Incremental Snapshot(增量快照)的功能,同時取代了原有的實(shí)驗(yàn)性的Parallel Snapshot(并行快照)。在本篇博客中,我將介紹全新快照方式的原理,以及深入研究其實(shí)現(xiàn)細(xì)節(jié)。

1、快照機(jī)制

? 在以往的Debezium的中,我們需要借助其提供的Snapshot機(jī)制來獲取數(shù)據(jù)源中的歷史數(shù)據(jù)。以MySQL為例,Debezium提供了多種鎖表方式(snapshot.locking.mode),其中minimal是最小化的鎖表方式,connector會在初始化過程中讀取database schemas和其他元數(shù)據(jù)時獲取全局讀鎖,耗時一般不超過1s。然后使用REPEATABLE READS的方式讀取表中的記錄完成后續(xù)的操作。

? 看上去這種方式和mysqldump的邏輯差不多,但這種方式還是有一些硬通病:

  • 這種快照方式依然不能中斷,無法暫停和恢復(fù),一旦失敗就要重新開始,這種語義類似事務(wù)機(jī)制(必須完全執(zhí)行或者根本不執(zhí)行);
  • 如果是運(yùn)行了一段時間的connector需要重新同步歷史數(shù)據(jù),需要暫停當(dāng)前增量任務(wù)并新建新的全量任務(wù),在全量結(jié)束后重新配置增量任務(wù)并且重啟;
  • 在快照生成的過程中,任何對表中進(jìn)行的操作變更都無法捕獲,直到快照完成。這種情況特別是在歷史數(shù)據(jù)非常大時尤其嚴(yán)重;
  • 無法在connector運(yùn)行過程中添加新表。

? 直到2019年底,Netfix開發(fā)了一套參考流式系統(tǒng)中Watermark(水位)概念的數(shù)據(jù)捕獲框架,并在DBLog: A Watermark Based Change-Data-Capture Framework?該篇論文中介紹了該框架的詳細(xì)設(shè)計。其原理簡單來就是將增量任務(wù)和全量任務(wù)一起執(zhí)行,框架將高水位標(biāo)識和低水位標(biāo)識插入到事務(wù)日志中(例如MySQL的binlog),并且在二者發(fā)生在同一水位區(qū)間時做合并。

? ?Debezium 采取了這個思路,實(shí)現(xiàn)了一套增量快照機(jī)制。新的增量快照一次只讀取部分?jǐn)?shù)據(jù),不需要從頭到尾、持續(xù)運(yùn)行,并且支持隨時增加新表,還可以隨時觸發(fā)快照,而不是只在任務(wù)開始時執(zhí)行。更重要的是,快照過程中有數(shù)據(jù)變更,它也可以近乎實(shí)時地把變更也打入Kafka流之中。下面將來介紹這一實(shí)現(xiàn)細(xì)節(jié)。

2、增量快照

? 下面我們以Debezium-MySQL的視角介紹他們是增量快照的實(shí)現(xiàn)。當(dāng)一個表需要獲取其當(dāng)前快照的時候,Debeizum會做兩件事:

  1. 獲取當(dāng)前表中最大的主鍵,作為快照結(jié)束的標(biāo)準(zhǔn),并且將該值存儲在connector offset中;
  2. 根據(jù)主鍵的順序,以及increment.snapshot.chunk.size配置的大小將表分成多個塊(chunk)

? 當(dāng)查詢一個塊時將構(gòu)建一個動態(tài)SQL語句,選擇下一個increment.snapshot.chunk.size數(shù)量記錄,其最小的主鍵大于前一個塊的最后一個主鍵,并且小于或等于快照初始化時記錄的表中最大的主鍵。除此之外,當(dāng)增量快照異常停止恢復(fù)后,可以從記錄的執(zhí)行過的主鍵開始重新執(zhí)行。

? Debezium讀取到一個chunk之后,并不著急立即發(fā)送,而是將chunk放在一個叫snapshot-window的內(nèi)存窗口中間。參考以下過程:

  1. 發(fā)送一個snapshot-window-open的信號;
  2. 讀取當(dāng)前表中的一個chunk,并記錄到內(nèi)存的緩沖區(qū)中;
  3. 發(fā)送一個snapshot-window-close的信號。

snapshot-window可以是需要進(jìn)行快照的數(shù)據(jù)庫中一個表,這里的發(fā)送信號也只是往這個表里插入一條數(shù)據(jù)。時間線可以參考下圖:

Debeizum 增量快照,數(shù)據(jù)庫

? ?圖中T1~T6分別表示數(shù)據(jù)庫當(dāng)前執(zhí)行的事務(wù)從prepare到commit所經(jīng)歷的時間,注意在MySQL中只有commit的事務(wù)才會被記錄到Binlog中,Debezium從發(fā)出OPEN信號到發(fā)送CLOSE信號的過程中,只有T1~T5能夠被監(jiān)聽到。T6因?yàn)槭窃贑LOSE信號之外提交的,所以沒法監(jiān)聽到。(OPEN和CLOSE兩個信號也屬于事務(wù),有自己的binlog記錄以及commit時間)

? Debezium并不是訪問數(shù)據(jù)庫的唯一進(jìn)程。我們可以預(yù)期大量進(jìn)程同時訪問數(shù)據(jù)庫,可能訪問當(dāng)前被快照的相同主鍵記錄。如上圖所示,對數(shù)據(jù)的任何更改都會根據(jù)提交順序?qū)懭胧聞?wù)日志(例如MySQL的binlog)。由于不可能精確地確定塊讀事務(wù)的時間以識別潛在沖突,因此添加了打開和關(guān)閉窗口事件來劃分沖突可能發(fā)生的時間。Debezium的任務(wù)就是消除這些沖突。

??為此,Debezium將塊生成的所有事件記錄到緩沖區(qū)中。當(dāng)接收到snapshot-window-open信號時,將檢查來自事務(wù)日志的所有事件是否屬于快照表。如果是,則檢查緩沖區(qū)是否包含了事務(wù)日志中相同記錄的主鍵。如果是,則快照事件重復(fù)主鍵的記錄將從緩沖區(qū)中刪除,因?yàn)檫@是一個潛在的沖突。由于不可能對快照和事務(wù)日志事件進(jìn)行正確排序,因此只保留事務(wù)日志事件(事務(wù)日志新于快照日志)。當(dāng)接收到快照窗口關(guān)閉信號時,緩沖區(qū)中剩余的快照事件被發(fā)送到下游。如下圖所示:

Debeizum 增量快照,數(shù)據(jù)庫

? 上圖表示,數(shù)據(jù)庫中存在了K2、K3和K4三條記錄。在OPEN信號發(fā)送前,插入了一條K1記錄,更新了K2記錄和刪除了K3記錄,所以當(dāng)前數(shù)據(jù)庫的情況是包含了K1、K2和K4三條記錄。然后在OPEN信號發(fā)送直到CLOSE信號發(fā)送這段時間里,事務(wù)日志里面包含了K4被刪除、K5插入以及K6插入三個事件,而內(nèi)存緩沖區(qū)里面則是讀取了K1、K2、K4和剛剛插入的K5總共4條記錄(沒有加上鎖的情況,所以在讀取快照的過程中是可以讀到窗口打開時插入的數(shù)據(jù))。在窗口打開的范圍內(nèi),存在K4和K5重復(fù)的主鍵,所以從緩沖區(qū)中刪除這兩條消息,然后把事務(wù)日志刷到下游(注意沒有清空事務(wù)日志中的同ID記錄,事務(wù)日志還是原封不動刷到下游的),遇到CLOSE事件之后,將當(dāng)前緩沖區(qū)中的快照數(shù)據(jù)刷到下游去,并清空緩沖區(qū)。這里有幾個注意點(diǎn):

  1. 事務(wù)日志和讀取快照時間不可能保持一致,所以這里一旦事務(wù)日志和緩沖區(qū)內(nèi)存在了相同ID沖突,Debezium保留了事務(wù)日志刷到下游,不然可能會丟失部分刪除恢復(fù)事件。(舉個例子,在A窗口內(nèi)K4記錄被刪除并發(fā)送到事務(wù)日志中,在B窗口中K4記錄重新插入進(jìn)數(shù)據(jù)庫,但是因?yàn)樵隽垦舆t導(dǎo)致讀取快照時增量快照只讀到A窗口所在時間,這里保留了事務(wù)日志,那么會發(fā)送刪除事件到下游,恢復(fù)事件在下次讀取時發(fā)送)
  2. 快照事件應(yīng)該有別于INSERT操作,DEBEZIUM用op:r(有的版本是op:c)表示。

3、實(shí)現(xiàn)分析

以下代碼分析基于Debezium1.9版本介紹MySQL快照,區(qū)別于一開始的全量數(shù)據(jù)同步,增量快照是在運(yùn)行增量同步的同時運(yùn)行的,在Debezium運(yùn)行的過程中,允許通過外部信號的方式觸發(fā)增量快照,默認(rèn)情況下是通過監(jiān)聽某個Kafka的topic獲取信號的。

Debezium的源碼實(shí)現(xiàn)中,會通過Source表示事件源。例如MySQL的增量事件源是MysqlStreamChangeEventSource,而增量快照事件源的實(shí)現(xiàn)放在MysqlReadOnlyIncrementalSnapshotChangeEventSource。不過,要知道如何在增量執(zhí)行同時,執(zhí)行全量快照,需要我們回到增量發(fā)送數(shù)據(jù)到下游時,也就是EventDispatcher.dispatchDataChangeEvent的邏輯中。

    public boolean dispatchDataChangeEvent(P partition, T dataCollectionId, ChangeRecordEmitter<P> changeRecordEmitter) throws InterruptedException {
        try {
            boolean handled = false;
            // 如果從binlog中獲取到的數(shù)據(jù)不需要被訂閱,則忽略
            if (!filter.isIncluded(dataCollectionId)) {
                LOGGER.trace("Filtered data change event for {}", dataCollectionId);
                eventListener.onFilteredEvent(partition, "source = " + dataCollectionId, changeRecordEmitter.getOperation());
                dispatchFilteredEvent(changeRecordEmitter.getPartition(), changeRecordEmitter.getOffset());
            }
            else {
                // 拿到表結(jié)構(gòu)
                DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);

                // TODO handle as per inconsistent schema info option
                if (dataCollectionSchema == null) {
                    final Optional<DataCollectionSchema> replacementSchema = inconsistentSchemaHandler.handle(partition,
                            dataCollectionId, changeRecordEmitter);
                    if (!replacementSchema.isPresent()) {
                        return false;
                    }
                    dataCollectionSchema = replacementSchema.get();
                }

                // 發(fā)送到下游
                changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new Receiver<P>() {

                    @Override
                    public void changeRecord(P partition,
                                             DataCollectionSchema schema,
                                             Operation operation,
                                             Object key, Struct value,
                                             OffsetContext offset,
                                             ConnectHeaders headers)
                            throws InterruptedException {
                        if (operation == Operation.CREATE && connectorConfig.isSignalDataCollection(dataCollectionId) && sourceSignalChannel != null) {
                            sourceSignalChannel.process(value);

                            if (signalProcessor != null) {
                                // This is a synchronization point to immediately execute an eventual stop signal, just before emitting the CDC event
                                // in this way the offset context updated by signaling will be correctly saved
                                signalProcessor.processSourceSignal();
                            }
                        }

                        if (neverSkip || !skippedOperations.contains(operation)) {
                            transactionMonitor.dataEvent(partition, dataCollectionId, offset, key, value);
                            eventListener.onEvent(partition, dataCollectionId, offset, key, value, operation);
                            if (incrementalSnapshotChangeEventSource != null) {
                                // 交給下游的snapshot,但是如果window沒有打開的話,這里是不會傳輸給snapshot的
                                // 注意這里只需要傳遞Key就行,因?yàn)槿绻鹶alue一樣的話,默認(rèn)忽略,由stream傳遞給下游
                                // 但我看了下這里是共用同一個dispatcher,所以會影響到增量的發(fā)送
                                incrementalSnapshotChangeEventSource.processMessage(partition, dataCollectionId, key, offset);
                            }
                            // 交給下游的stream增量數(shù)據(jù)
                            streamingReceiver.changeRecord(partition, schema, operation, key, value, offset, headers);
                        }
                    }
                });
                handled = true;
            }
            ...
         

注意一個binlog的event中可能會存在修改多個row,所以這里是每發(fā)送一個row在下游之前,就z需要執(zhí)行一下incrementalSnapshotChangeEventSource.processMessage

    // MySqlReadOnlyIncrementalSnapshotChangeEventSource
    public void processMessage(MySqlPartition partition, DataCollectionId dataCollectionId, Object key, OffsetContext offsetContext) throws InterruptedException {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
            return;
        }
        LOGGER.trace("Checking window for table '{}', key '{}', window contains '{}'", dataCollectionId, key, window);
        // 如果當(dāng)前snapshot的窗口已經(jīng)關(guān)閉了,則立即發(fā)送當(dāng)前window里面的event
        boolean windowClosed = getContext().updateWindowState(offsetContext);
        if (windowClosed) {
            sendWindowEvents(partition, offsetContext);
            // 重新再讀一個chunk的數(shù)據(jù)
            readChunk(partition, offsetContext);
        }
        // 如果還沒關(guān)閉,則delete掉重復(fù)的key數(shù)據(jù)
        else if (!window.isEmpty() && getContext().deduplicationNeeded()) {
            deduplicateWindow(dataCollectionId, key);
        }
    }

增量快照會先檢測到當(dāng)前讀取數(shù)據(jù)窗口是否已經(jīng)關(guān)閉了,如果已經(jīng)關(guān)閉了則立即發(fā)送當(dāng)前窗口中的所有snapshotEvent到下游中,然后讀取下一個chunk的數(shù)據(jù)。

但是這里筆者在閱讀時候想到一個問題,這里是在一個線程中執(zhí)行的操作,檢測到一個row,然后檢查窗口是否關(guān)閉,關(guān)閉了就立即發(fā)送并讀取下一個chunk的數(shù)據(jù)。這樣就很奇怪,它這樣操作會加大發(fā)送延遲不說,每次只能去檢測一個row是否在一個chunk中,這樣未免效率有點(diǎn)低。

所以這里的windowClosed,我們來看下這里的updateWindowState實(shí)現(xiàn):

    /**
     * 如果一個高低水印的GTID集合不包含一個binlog事件的GTID,那么這個水印被傳遞并且窗口處理模式被更新。多個binlog事件可以具有相同的GTID,
     * 這就是為什么算法等待在水印的GTID之外的binlog事件來關(guān)閉窗口,而不是在達(dá)到最大事務(wù)id時立即關(guān)閉它。
     * 重復(fù)數(shù)據(jù)刪除從低水位之后的第一個事件開始,因?yàn)橹钡紾TID包含在低水位(在chunk select語句之前捕獲的executed_gtid_set)。
     * 低水位之后的COMMIT用于確保塊選擇看到在執(zhí)行之前提交的更改。
     * 所有高水位的事件繼續(xù)重復(fù)數(shù)據(jù)刪除。重復(fù)數(shù)據(jù)刪除的塊事件插入在高水位之外的第一個事件之前。
     */
    public boolean updateWindowState(OffsetContext offsetContext) {
        // 獲取當(dāng)前處理了的event對應(yīng)的binlog中g(shù)tid的值
        String currentGtid = getCurrentGtid(offsetContext);
        // windowOpened這個可不是chunk的window打開的標(biāo)志,每一個chunk讀取的時候都是直接讀取然后關(guān)閉的
        // 所以不需要這個值,這個值默認(rèn)為false,只有在監(jiān)聽消息topic收到openWindow的時候這個值才會設(shè)置為true(這里不討論這個場景)
        // 因?yàn)榍懊嫒绻x過一個chunk,那么這里的lowWatermark不會為空,而是當(dāng)時讀取前的gtid的值
        if (!windowOpened && lowWatermark != null) {
            // 如果當(dāng)前stream處理的gtid不存在于增量快照的低水位中且低水位不為空,則表示window打開,設(shè)置windowOpened為true
            // 注意這里的gtid是一個范圍,類似1-100這種,所以這里的contain只需判斷是否在當(dāng)前低水位的范圍內(nèi)
            boolean pastLowWatermark = !lowWatermark.contains(currentGtid);
            if (pastLowWatermark) {
                LOGGER.debug("Current gtid {}, low watermark {}", currentGtid, lowWatermark);
                windowOpened = true;
            }
        }
        // 如果windowOpened為true,而且chunk讀取完了,那么這里的highWatermark就是讀取完后的gtid
        // 否則返回false,表示chunk窗口沒關(guān)閉,全量還沒執(zhí)行完
        if (windowOpened && highWatermark != null) {
            // 正常這里讀取了一大批數(shù)據(jù)的話,高水位應(yīng)該是不包含當(dāng)前stream處理的gtid,應(yīng)該為true
            boolean pastHighWatermark = !highWatermark.contains(currentGtid);
            if (pastHighWatermark) {
                LOGGER.debug("Current gtid {}, high watermark {}", currentGtid, highWatermark);
                // 關(guān)閉窗口,同時情況高低水位信息
                closeWindow();
                return true;
            }
        }
        return false;
    }


    // GtidSet MySQL水位用gtid表示高低水位
    public boolean contains(String gtid) {
        // split獲取出serverId和transactionId范圍
        String[] split = GTID_DELIMITER.split(gtid);
        // 這里叫serverId才對
        String sourceId = split[0];
        // 根據(jù)serverId拿到transactionId,我估計這里用Map存儲的原因是因?yàn)橛锌赡苤鲝那袚Q后
        // 一個gtid里面會存在多個serverId以及對應(yīng)的transactionId
        // gtid類似這樣 4160e9b3-58d9-11e8-b174-005056af6f24:1-19,甚至可以是多個8eed0f5b-6f9b-11e9-94a9-005056a57a4e:1-3:11:47-49
        // GTID = server_uuid :transaction_id
        UUIDSet uuidSet = forServerWithId(sourceId);
        if (uuidSet == null) {
            return false;
        }
        // 你用show master status看的話可能是連著的多個,8eed0f5b-6f9b-11e9-94a9-005056a57a4e:1-3:11:47-49
        // 但是一個行的話只能是一個8eed0f5b-6f9b-11e9-94a9-005056a57a4e:23
        long transactionId = Long.parseLong(split[1]);
        return uuidSet.contains(transactionId);
    }

    // GtidSet
    public boolean contains(long transactionId) {
        for (Interval interval : this.intervals) {
            if (interval.contains(transactionId)) {
                return true;
            }
        }
        return false;
    }

    // GtidSet
    public boolean contains(long transactionId) {
        return getStart() <= transactionId && transactionId <= getEnd();
    }

當(dāng)updateWindowState返回true的時候,就會嘗試發(fā)送快照窗口中的所有數(shù)據(jù)到下游,然后重新讀取一個chunk的數(shù)據(jù),否則調(diào)用deduplicateWindow刪除窗口中與當(dāng)前row同個ID的快照數(shù)據(jù)。

首先,通過SHOW MASTER STATUS獲取到GTID,并設(shè)置為低水位,當(dāng)時獲取到的GTID集合應(yīng)該是類似xxx:1-465,也就是在當(dāng)前集群應(yīng)用過的事務(wù)合集。而從binlog拿出的每一個row,其GTID應(yīng)該是xxx:467這樣的類型。這里的updateWindowState的邏輯,主要是用于判斷當(dāng)前ROW是否在低水位的后面,或者在高水位的后面,以此檢測row是否在窗口的范圍之內(nèi)的流式數(shù)據(jù)。

Debeizum 增量快照,數(shù)據(jù)庫

一旦當(dāng)前row不在低水位的范圍內(nèi),那么表示窗口打開(windowOpen=true),而如果row在高水位的范圍內(nèi),那么當(dāng)前row應(yīng)該是窗口的增量數(shù)據(jù),直到不在這個范圍里面則表示關(guān)閉且應(yīng)該flush掉這些窗口中的數(shù)據(jù)到下游。所以updateWindowState的作用就是檢測增量數(shù)據(jù)是否在窗口的高低水位范圍內(nèi)。對于在范圍內(nèi)的,會采用dedeplicateWindow的邏輯剔除出窗口里的快照數(shù)據(jù)。

    protected void deduplicateWindow(DataCollectionId dataCollectionId, Object key) {
        if (context.currentDataCollectionId() == null || !context.currentDataCollectionId().getId().equals(dataCollectionId)) {
            return;
        }
        if (key instanceof Struct) {
            // 直接remove掉
            if (window.remove((Struct) key) != null) {
                LOGGER.info("Removed '{}' from window", key);
            }
        }
    }

最后看下readChunk的邏輯,這里是每次去源集群中獲取足夠多的數(shù)據(jù)。

    // AbstractIncrementalSnapshotChangeEventSource
    protected void readChunk(P partition, OffsetContext offsetContext) throws InterruptedException {
        if (!context.snapshotRunning()) {
            LOGGER.info("Skipping read chunk because snapshot is not running");
            postIncrementalSnapshotCompleted();
            return;
        }
        if (context.isSnapshotPaused()) {
            LOGGER.info("Incremental snapshot was paused.");
            return;
        }
        try {
            preReadChunk(context);
            // This commit should be unnecessary and might be removed later
            jdbcConnection.commit();
            // 開始讀取一個新的chunk
            context.startNewChunk();
            // 打開一個新的窗口,這在Mysql中是設(shè)置GTID為一個窗口的低水位
            emitWindowOpen();
            while (context.snapshotRunning()) {
                if (isTableInvalid(partition, offsetContext)) {
                    continue;
                }
                if (connectorConfig.isIncrementalSnapshotSchemaChangesEnabled() && !schemaHistoryIsUpToDate()) {
                    // Schema has changed since the previous window.
                    // Closing the current window and repeating schema verification within the following window.
                    break;
                }
                final TableId currentTableId = (TableId) context.currentDataCollectionId().getId();
                // 當(dāng)前上下文中沒有關(guān)于currentTableId的key最大值
                if (!context.maximumKey().isPresent()) {
                    // 重新獲取表結(jié)構(gòu)
                    currentTable = refreshTableSchema(currentTable);
                    Object[] maximumKey;
                    try {
                        // 獲取當(dāng)前表的最大key,作為快照結(jié)束的標(biāo)志
                        maximumKey = jdbcConnection.queryAndMap(
                                buildMaxPrimaryKeyQuery(currentTable, context.currentDataCollectionId().getAdditionalCondition()), rs -> {
                                    if (!rs.next()) {
                                        return null;
                                    }
                                    return keyFromRow(jdbcConnection.rowToArray(currentTable, rs,
                                            ColumnUtils.toArray(rs, currentTable)));
                                });
                        context.maximumKey(maximumKey);
                    }
                    catch (SQLException e) {
                        LOGGER.error("Failed to read maximum key for table {}", currentTableId, e);
                        nextDataCollection(partition, offsetContext);
                        continue;
                    }
                    if (!context.maximumKey().isPresent()) {
                        LOGGER.info(
                                "No maximum key returned by the query, incremental snapshotting of table '{}' finished as it is empty",
                                currentTableId);
                        nextDataCollection(partition, offsetContext);
                        continue;
                    }
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("Incremental snapshot for table '{}' will end at position {}", currentTableId,
                                context.maximumKey().orElse(new Object[0]));
                    }
                }
                // 獲取關(guān)于該表的dataEvent,從這里開始讀取表中的數(shù)據(jù)
                if (createDataEventsForTable(partition)) {

                    String dataCollections = context.getDataCollections().stream()
                            .map(DataCollection::getId)
                            .map(DataCollectionId::identifier).collect(
                                    Collectors.joining(","));

                    // 如果窗口中捕獲不到任何數(shù)據(jù),則立即開始關(guān)于下一個dataCollection的數(shù)據(jù)獲取
                    if (window.isEmpty()) {
                        LOGGER.info("No data returned by the query, incremental snapshotting of table '{}' finished",
                                currentTableId);

                        notificationService.notify(buildNotificationWith(SnapshotStatus.TABLE_SCAN_COMPLETED,
                                Map.of(
                                        "data_collections", dataCollections,
                                        "total_rows_scanned", String.valueOf(totalRowsScanned)),
                                offsetContext),
                                Offsets.of(partition, offsetContext));

                        tableScanCompleted(partition);
                        // 開始下一個表dataCollection的獲取
                        nextDataCollection(partition, offsetContext);
                    }
                    else {
                        // 事件通知
                        notificationService.notify(buildNotificationWith(SnapshotStatus.IN_PROGRESS,
                                Map.of(
                                        "data_collections", dataCollections,
                                        "current_collection_in_progress", context.currentDataCollectionId().getId().identifier(),
                                        "maximum_key", context.maximumKey().orElse(new Object[0])[0].toString(),
                                        "last_processed_key", context.chunkEndPosititon()[0].toString()),
                                offsetContext),
                                Offsets.of(partition, offsetContext));
                        break;
                    }
                }
                else {
                    context.revertChunk();
                    break;
                }
            }
            // 關(guān)閉當(dāng)前窗口,設(shè)置gtid為高水位
            emitWindowClose(partition, offsetContext);
        }
        catch (SQLException e) {
            throw new DebeziumException(String.format("Database error while executing incremental snapshot for table '%s'", context.currentDataCollectionId()), e);
        }
        finally {
            postReadChunk(context);
            if (!context.snapshotRunning()) {
                postIncrementalSnapshotCompleted();
            }
        }
    }

這里去讀取快照數(shù)據(jù)之前,會先獲取到當(dāng)前table最大的主鍵的值,作為增量快照結(jié)束的點(diǎn)。關(guān)鍵是在這里的createDataEventsForTable(partition)這里。

    // AbstractIncrementalSnapshotChangeEventSource
    private boolean createDataEventsForTable(P partition) {
        long exportStart = clock.currentTimeInMillis();
        LOGGER.debug("Exporting data chunk from table '{}' (total {} tables)", currentTable.id(), context.dataCollectionsToBeSnapshottedCount());

        // 構(gòu)建chunk查詢sql
        final String selectStatement = buildChunkQuery(currentTable, context.currentDataCollectionId().getAdditionalCondition());
        LOGGER.debug("\t For table '{}' using select statement: '{}', key: '{}', maximum key: '{}'", currentTable.id(),
                selectStatement, context.chunkEndPosititon(), context.maximumKey().get());

        final TableSchema tableSchema = databaseSchema.schemaFor(currentTable.id());

        try (PreparedStatement statement = readTableChunkStatement(selectStatement);
                ResultSet rs = statement.executeQuery()) {
            // 檢查表結(jié)構(gòu)是否發(fā)生變化,如果失敗應(yīng)該返回false,并重新讀取表結(jié)構(gòu)和最大key
            if (checkSchemaChanges(rs)) {
                return false;
            }
            final ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, currentTable);
            long rows = 0;
            Timer logTimer = getTableScanLogTimer();

            Object[] lastRow = null;
            Object[] firstRow = null;
            while (rs.next()) {
                rows++;
                // 這里是取出表中的記錄的所有字段
                final Object[] row = jdbcConnection.rowToArray(currentTable, rs, columnArray);
                if (firstRow == null) {
                    firstRow = row;
                }
                // 將獲取到的快照數(shù)據(jù)塞入window這個值中,后續(xù)發(fā)送和刪除重復(fù)key都是在這個值中操作
                final Struct keyStruct = tableSchema.keyFromColumnData(row);
                window.put(keyStruct, row);
                if (logTimer.expired()) {
                    long stop = clock.currentTimeInMillis();
                    LOGGER.debug("\t Exported {} records for table '{}' after {}", rows, currentTable.id(),
                            Strings.duration(stop - exportStart));
                    logTimer = getTableScanLogTimer();
                }
                lastRow = row;
            }
            final Object[] firstKey = keyFromRow(firstRow);
            // 獲取到的數(shù)據(jù)都是根據(jù)id嚴(yán)格排序的,所以這里的lastKey可以作為下一次讀取chunk的查詢條件
            final Object[] lastKey = keyFromRow(lastRow);
            if (context.isNonInitialChunk()) {
                progressListener.currentChunk(partition, context.currentChunkId(), firstKey, lastKey);
            }
            else {
                progressListener.currentChunk(partition, context.currentChunkId(), firstKey, lastKey, context.maximumKey().orElse(null));
            }
            // 記錄lastKey,作為下一次chunk的查詢條件
            context.nextChunkPosition(lastKey);
            if (lastRow != null) {
                LOGGER.debug("\t Next window will resume from {}", (Object) context.chunkEndPosititon());
            }

            LOGGER.debug("\t Finished exporting {} records for window of table table '{}'; total duration '{}'", rows,
                    currentTable.id(), Strings.duration(clock.currentTimeInMillis() - exportStart));
            incrementTableRowsScanned(partition, rows);
        }
        catch (SQLException e) {
            throw new DebeziumException("Snapshotting of table " + currentTable.id() + " failed", e);
        }
        return true;
    }

    // AbstractIncrementalSnapshotChangeEventSource
    protected PreparedStatement readTableChunkStatement(String sql) throws SQLException {
        final PreparedStatement statement = jdbcConnection.readTablePreparedStatement(connectorConfig, sql,
                OptionalLong.empty());
        if (context.isNonInitialChunk()) {
            final Object[] maximumKey = context.maximumKey().get();
            final Object[] chunkEndPosition = context.chunkEndPosititon();
            // Fill boundaries placeholders
            int pos = 0;
            for (int i = 0; i < chunkEndPosition.length; i++) {
                for (int j = 0; j < i + 1; j++) {
                    statement.setObject(++pos, chunkEndPosition[j]);
                }
            }
            // Fill maximum key placeholders
            for (int i = 0; i < chunkEndPosition.length; i++) {
                for (int j = 0; j < i + 1; j++) {
                    statement.setObject(++pos, maximumKey[j]);
                }
            }
        }
        return statement;
    }

這里作者考慮到表的主鍵可能是復(fù)合主鍵,在每一次重新去讀取chunk的時候,都需要讀取比上一次讀取的最大主鍵大一定數(shù)量的快照數(shù)據(jù)。注意下面批量讀取chunk的設(shè)計,不是用offset和limit的方式,而是使用主鍵進(jìn)行比對,充分使用B+樹的特性文章來源地址http://www.zghlxwxcb.cn/news/detail-623978.html

    // AbstractIncrementalSnapshotChangeEventSource
    protected String buildChunkQuery(Table table, int limit, Optional<String> additionalCondition) {
        String condition = null;
        // Add condition when this is not the first query
        if (context.isNonInitialChunk()) {
            final StringBuilder sql = new StringBuilder();
            // Window boundaries
            addLowerBound(table, sql);
            // Table boundaries
            sql.append(" AND NOT ");
            addLowerBound(table, sql);
            condition = sql.toString();
        }
        final String orderBy = getQueryColumns(table).stream()
                .map(c -> jdbcConnection.quotedColumnIdString(c.name()))
                .collect(Collectors.joining(", "));
        return jdbcConnection.buildSelectWithRowLimits(table.id(),
                limit,
                buildProjection(table),
                Optional.ofNullable(condition),
                additionalCondition,
                orderBy);
    }

    // AbstractIncrementalSnapshotChangeEventSource
    private void addLowerBound(Table table, StringBuilder sql) {
        // To make window boundaries working for more than one column it is necessary to calculate
        // with independently increasing values in each column independently.
        // For one column the condition will be (? will always be the last value seen for the given column)
        // (k1 > ?)
        // For two columns
        // (k1 > ?) OR (k1 = ? AND k2 > ?)
        // For four columns
        // (k1 > ?) OR (k1 = ? AND k2 > ?) OR (k1 = ? AND k2 = ? AND k3 > ?) OR (k1 = ? AND k2 = ? AND k3 = ? AND k4 > ?)
        // etc.
        // 獲取pk column
        final List<Column> pkColumns = getQueryColumns(table);
        if (pkColumns.size() > 1) {
            sql.append('(');
        }
        // 這里的兩個i,j循環(huán)的意思是,根據(jù)主鍵列用OR拼接出主鍵列數(shù)量的條件,例如主鍵有3個,分別是pk1,pk2,pk3
        // 那么拼接出來的條件就是 (pk1 > ?) OR (pk1 = ? AND pk2 > ?) OR (pk1 = ? AND pk2 = ? AND pk3 > ?)
        // 后面還有l(wèi)imit,以此獲取足夠多的chunk,而且根據(jù)逐漸數(shù)量遞增
        for (int i = 0; i < pkColumns.size(); i++) {
            // 是否是最后一列
            final boolean isLastIterationForI = (i == pkColumns.size() - 1);
            sql.append('(');
            for (int j = 0; j < i + 1; j++) {
                final boolean isLastIterationForJ = (i == j);
                // quotedColumnIdString 是避免用戶用關(guān)鍵字作為字段,所以加上開閉服務(wù),類似MySQL可以用`columnName`
                sql.append(jdbcConnection.quotedColumnIdString(pkColumns.get(j).name()));
                // 這里加上  > 是用于保證id大于某個值?
                sql.append(isLastIterationForJ ? " > ?" : " = ?");
                if (!isLastIterationForJ) {
                    sql.append(" AND ");
                }
            }
            sql.append(")");
            if (!isLastIterationForI) {
                sql.append(" OR ");
            }
        }
        if (pkColumns.size() > 1) {
            sql.append(')');
        }
    }

到了這里,關(guān)于Debeizum 增量快照的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink CDC 2.4 正式發(fā)布,新增 Vitess 數(shù)據(jù)源,更多連接器支持增量快照,升級 Debezium 版本

    Flink CDC 2.4 正式發(fā)布,新增 Vitess 數(shù)據(jù)源,更多連接器支持增量快照,升級 Debezium 版本

    Flink CDC [1] 是基于數(shù)據(jù)庫的日志 CDC 技術(shù),實(shí)現(xiàn)了全增量一體化讀取的數(shù)據(jù)集成框架。配合 Flink 優(yōu)秀的管道能力和豐富的上下游生態(tài),F(xiàn)link CDC 可以高效實(shí)現(xiàn)海量數(shù)據(jù)的實(shí)時集成。 作為新一代的實(shí)時數(shù)據(jù)集成框架,F(xiàn)link CDC 具有全增量一體化、無鎖讀取、并行讀取、表結(jié)構(gòu)變更

    2024年02月12日
    瀏覽(23)
  • Elasticsearch:增量快照如何工作?

    Elasticsearch:增量快照如何工作?

    作者:Lutf ur Rehman Elastic 提供許多由講師指導(dǎo)的面對面和虛擬現(xiàn)場培訓(xùn)以及點(diǎn)播培訓(xùn)。 我們的旗艦課程是 Elasticsearch 工程師、Kibana 數(shù)據(jù)分析和 Elastic 可觀測性工程師。 所有這些課程都會獲得認(rèn)證。如果你想更多了解這些認(rèn)證方面的知識,請閱讀文章 “Elastic:如何成為一名

    2024年02月11日
    瀏覽(20)
  • 深入解析 Flink CDC 增量快照讀取機(jī)制

    深入解析 Flink CDC 增量快照讀取機(jī)制

    Flink CDC 1.x 使用 Debezium 引擎集成來實(shí)現(xiàn)數(shù)據(jù)采集,支持全量加增量模式,確保數(shù)據(jù)的一致性。然而,這種集成存在一些痛點(diǎn)需要注意: 一致性通過加鎖保證 :在保證數(shù)據(jù)一致性時,Debezium 需要對讀取的庫或表加鎖。全局鎖可能導(dǎo)致數(shù)據(jù)庫出現(xiàn)掛起情況,而表級鎖會影響表的

    2024年02月03日
    瀏覽(22)
  • Flink CDC 2.4 正式發(fā)布,5分鐘了解CDC 2.4新內(nèi)容,新增 Vitess 數(shù)據(jù)源,更多連接器支持增量快照,升級 Debezium 版本

    Flink CDC 2.4 正式發(fā)布,5分鐘了解CDC 2.4新內(nèi)容,新增 Vitess 數(shù)據(jù)源,更多連接器支持增量快照,升級 Debezium 版本

    來源:https://ververica.github.io/flink-cdc-connectors/master/ Flink CDC [1] 是基于數(shù)據(jù)庫的日志 CDC 技術(shù),實(shí)現(xiàn)了全增量一體化讀取的數(shù)據(jù)集成框架。配合 Flink 優(yōu)秀的管道能力和豐富的上下游生態(tài),F(xiàn)link CDC 可以高效實(shí)現(xiàn)海量數(shù)據(jù)的實(shí)時集成。 具體關(guān)于Flink CDC是什么?可以看下這篇文字 作

    2024年02月12日
    瀏覽(26)
  • hive 全量表、增量表、快照表、切片表和拉鏈表

    hive 全量表、增量表、快照表、切片表和拉鏈表

    全量表 :記錄每天的所有的最新狀態(tài)的數(shù)據(jù), 增量表 :記錄每天的新增數(shù)據(jù),增量數(shù)據(jù)是上次導(dǎo)出之后的新數(shù)據(jù)。 快照表 :按日分區(qū),記錄截止數(shù)據(jù)日期的全量數(shù)據(jù) 切片表 :切片表根據(jù)基礎(chǔ)表,往往只反映某一個維度的相應(yīng)數(shù)據(jù)。其表結(jié)構(gòu)與基礎(chǔ)表結(jié)構(gòu)相同,但數(shù)據(jù)往往

    2024年02月13日
    瀏覽(23)
  • Flink CDC 2.3 發(fā)布,持續(xù)優(yōu)化性能,更多連接器支持增量快照,新增 Db2 支持

    Flink CDC 2.3 發(fā)布,持續(xù)優(yōu)化性能,更多連接器支持增量快照,新增 Db2 支持

    01 Flink CDC 簡介 Flink CDC? [ 1] 是基于數(shù)據(jù)庫的日志 CDC 技術(shù),實(shí)現(xiàn)了全增量一體化讀取的數(shù)據(jù)集成框架。配合 Flink 優(yōu)秀的管道能力和豐富的上下游生態(tài),F(xiàn)link CDC 可以高效實(shí)現(xiàn)海量數(shù)據(jù)的實(shí)時集成。 作為新一代的實(shí)時數(shù)據(jù)集成框架,F(xiàn)link CDC 具有全增量一體化、無鎖讀取、并行讀

    2024年02月01日
    瀏覽(22)
  • 【數(shù)據(jù)庫原理】(32)數(shù)據(jù)庫設(shè)計-數(shù)據(jù)庫物理設(shè)計

    數(shù)據(jù)庫的物理設(shè)計是數(shù)據(jù)庫設(shè)計過程中至關(guān)重要的一個階段。其核心目標(biāo)是選擇一個適合應(yīng)用環(huán)境的物理結(jié)構(gòu),以滿足特定的性能、存儲和訪問需求。這一階段涉及的關(guān)鍵任務(wù)可以分為兩個主要步驟: 1. 確定數(shù)據(jù)的物理結(jié)構(gòu) 存儲結(jié)構(gòu)和存取方法的選擇 :這包括決定數(shù)據(jù)在物

    2024年01月19日
    瀏覽(31)
  • 【數(shù)據(jù)庫概論】圖數(shù)據(jù)庫 Vs 關(guān)系數(shù)據(jù)庫(1)

    【數(shù)據(jù)庫概論】圖數(shù)據(jù)庫 Vs 關(guān)系數(shù)據(jù)庫(1)

    假設(shè)有一個社交網(wǎng)絡(luò)需要用數(shù)據(jù)庫存儲,其中人與人之間的關(guān)系有:朋友(friend)、父母(parent) 首先用關(guān)系數(shù)據(jù)庫來實(shí)現(xiàn)朋友關(guān)系,需要 3 張表:people、people_relation、relation 如果要查詢 Jam 的所有朋友的信息,那么就需要連接三張表: 如果表的數(shù)據(jù)量較大,那么查詢效率就

    2024年03月14日
    瀏覽(39)
  • 【數(shù)據(jù)庫】 | 初始數(shù)據(jù)庫

    【數(shù)據(jù)庫】 | 初始數(shù)據(jù)庫

    ??? 博客新人,希望大家一起加油進(jìn)步 ??? 乾坤未定,你我皆黑馬 1、什么是數(shù)據(jù)庫 存儲數(shù)據(jù)用文件就可以了,為什么還要弄個數(shù)據(jù)庫? 文件保存數(shù)據(jù)有以下幾個缺點(diǎn): 文件的安全性問題 文件不利于數(shù)據(jù)查詢和管理 文件不利于存儲海量數(shù)據(jù) 文件在程序中控制不方便 數(shù)據(jù)

    2023年04月23日
    瀏覽(32)
  • 【數(shù)據(jù)庫】數(shù)據(jù)庫設(shè)計

    【數(shù)據(jù)庫】數(shù)據(jù)庫設(shè)計

    數(shù)據(jù)庫設(shè)計面對的主要有哪些問題 (1) 懂?dāng)?shù)據(jù)庫原理同時懂甲方軟件專業(yè)知識的人缺少; (2) 應(yīng)用的數(shù)據(jù)庫系統(tǒng)的最終目標(biāo)往往在一開始不能完全明確,與開發(fā)者與用戶方最初沒在要求完全一致有關(guān); (3) 應(yīng)用業(yè)務(wù)系統(tǒng)千差萬別的,難以找到一種通用的工具和方法。 (1) 對人員

    2024年02月05日
    瀏覽(40)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包