国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka系列之:對(duì)源連接器的的Exactly-Once支持

這篇具有很好參考價(jià)值的文章主要介紹了Kafka系列之:對(duì)源連接器的的Exactly-Once支持。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

一、背景

冪等生產(chǎn)者:

  • 寫(xiě)入 Kafka 的應(yīng)用程序重復(fù)記錄的一個(gè)常見(jiàn)來(lái)源是自動(dòng)生產(chǎn)者重試,其中生產(chǎn)者會(huì)在某些情況下將批次重新發(fā)送到 Kafka,即使該批次已由代理提交。KIP-98 允許用戶將其生產(chǎn)者配置為冪等地執(zhí)行這些重試,這樣下游應(yīng)用程序只能看到自動(dòng)重新發(fā)送的生產(chǎn)者批次中任何消息的一個(gè)實(shí)例,即使該批次已由生產(chǎn)者多次發(fā)送到 Kafka。

交易生產(chǎn)者:

  • 該 KIP 面向用戶的變化的另一個(gè)主要組成部分是事務(wù)生產(chǎn)者的引入,它可以創(chuàng)建跨越多個(gè)主題分區(qū)的事務(wù)并對(duì)它們執(zhí)行原子寫(xiě)入。此外,事務(wù)生產(chǎn)者能夠“隔離”其他生產(chǎn)者,這涉及聲明事務(wù) ID 的所有權(quán),并禁止具有相同事務(wù) ID 的任何先前生產(chǎn)者實(shí)例能夠?qū)?Kafka 進(jìn)行任何后續(xù)寫(xiě)入。這對(duì)于防止客戶端應(yīng)用程序的僵尸實(shí)例發(fā)送重復(fù)消息特別有用。

術(shù)語(yǔ)
此處使用“防護(hù)”作為通用術(shù)語(yǔ)來(lái)描述禁用某些類型的應(yīng)用程序的舊實(shí)例或?qū)嵗M。

  • “隔離”事務(wù)生產(chǎn)者意味著禁止該生產(chǎn)者對(duì) Kafka 執(zhí)行任何進(jìn)一步的寫(xiě)入操作(例如,實(shí)現(xiàn)此目的的一種方法是使用相同的事務(wù) ID 實(shí)例化一個(gè)新的事務(wù)生產(chǎn)者)
  • “隔離”一代源任務(wù)意味著禁止該代中的每個(gè)源任務(wù)向 Kafka 生成更多源記錄,或提交更多源偏移量

“源分區(qū)”在這里有兩個(gè)含義。它可以指:

  • 源連接器從中讀取并能夠分配給單個(gè)任務(wù)的外部系統(tǒng)的一部分。例如,數(shù)據(jù)庫(kù)中的表或 Kafka 源連接器(例如 MirrorMaker 2)中的主題分區(qū)
  • 源任務(wù)向 Connect 框架提供的鍵/值對(duì)中的鍵以及源記錄(例如,請(qǐng)參閱SourceRecord 構(gòu)造函數(shù)),用于跟蹤源分區(qū)的消費(fèi)進(jìn)度
  • “源偏移量”可以用來(lái)指上述鍵/值對(duì)中以“源分區(qū)”為鍵的值,也可以指整個(gè)鍵/值對(duì)(與“offset”可以用來(lái)指主題分區(qū)和其中某個(gè)位置的組合,或者只是主題/分區(qū)/偏移三重奏中的最后一個(gè)坐標(biāo))

二、目標(biāo)

接收器連接器已經(jīng)可以實(shí)現(xiàn)一次性交付一段時(shí)間了,并且在具有獨(dú)特鍵約束等功能的系統(tǒng)中,甚至相對(duì)容易。然而,鑒于 Kafka 缺乏允許輕松修復(fù)和/或防止重復(fù)的功能,對(duì)于源連接器來(lái)說(shuō)情況并非如此,并且如果沒(méi)有框架級(jí)別的調(diào)整,源連接器的一次性交付仍然是不可能的。

造成這種情況的主要原因有兩個(gè):

源偏移準(zhǔn)確性:一旦源任務(wù)對(duì)應(yīng)的源記錄已成功發(fā)送到 Kafka, Connect 框架就會(huì)以可配置的時(shí)間間隔定期將源任務(wù)偏移寫(xiě)入內(nèi)部 Kafka 主題。這為大多數(shù)源連接器提供了至少一次傳送保證,但在源記錄寫(xiě)入 Kafka 但工作線程在將這些記錄的偏移量寫(xiě)入 Kafka 之前被中斷的情況下,允許重復(fù)寫(xiě)入的可能性。

僵尸:某些場(chǎng)景可能會(huì)導(dǎo)致多個(gè)任務(wù)同時(shí)運(yùn)行并為同一源分區(qū)(例如數(shù)據(jù)庫(kù)表或 Kafka 主題分區(qū))生成數(shù)據(jù)。這也可能導(dǎo)致框架產(chǎn)生重復(fù)的記錄。

源連接器中重復(fù)記錄的另一個(gè)常見(jiàn)來(lái)源是自動(dòng)生產(chǎn)者重試。但是,用戶已經(jīng)可以通過(guò)將源連接器配置為通過(guò)工作人員級(jí)別 producer.enable.idempotence 或連接器級(jí)別 producer.override.enable.idempotence 屬性使用冪等生產(chǎn)者來(lái)解決此問(wèn)題。

為了支持源連接器的一次性交付保證,框架應(yīng)該擴(kuò)展為以原子方式將源記錄及其源偏移寫(xiě)入 Kafka,并防止僵尸任務(wù)向 Kafka 生成數(shù)據(jù)。

通過(guò)這些更改,任何源連接器的每個(gè)源分區(qū)一次最多分配給一個(gè)任務(wù),并且能夠在啟動(dòng)時(shí)根據(jù)先前任務(wù)向框架提供的源偏移量恢復(fù)上游源的消耗,應(yīng)該能夠一次性交付。

目標(biāo)

  • 需要盡可能少地更改每個(gè)連接器。Connect 生態(tài)系統(tǒng)目前已經(jīng)相當(dāng)成熟,即使是像新 SourceTask 方法這樣輕量級(jí)的東西也必須應(yīng)用于潛在的數(shù)十個(gè)甚至數(shù)百個(gè)連接器。當(dāng)前的提案不需要對(duì)現(xiàn)有連接器進(jìn)行此類更改,只要它們正確使用源偏移量 API 即可。
  • 最大限度地減少配置更改。此功能的實(shí)現(xiàn)可能會(huì)變得相當(dāng)復(fù)雜,但用戶應(yīng)該很容易啟用和理解。 例如, KIP-98添加了對(duì) Kafka 及其 Java 客戶端的一次性支持,并且僅向用戶公開(kāi)了 3 個(gè)生產(chǎn)者屬性和 1 個(gè)消費(fèi)者屬性。當(dāng)前的提案僅添加了五個(gè)面向用戶的配置屬性。
  • 最大限度地減少用戶的操作負(fù)擔(dān)。此功能很可能會(huì)被大量使用,甚至可能在 Connect 的更高版本中默認(rèn)啟用。應(yīng)盡可能少地支持各種 Connect 部署方式和環(huán)境,包括但不限于安全意識(shí)和資源意識(shí)環(huán)境。當(dāng)前提案在安全意識(shí)環(huán)境中的要求定義明確且易于預(yù)期,并且不會(huì)顯著改變 Connect 的資源分配情況。
  • 最大限度地減少問(wèn)題和潛在的問(wèn)題。沒(méi)有人喜歡精美的印刷品。如果我們能夠解決 Connect 用戶或 Connect 開(kāi)發(fā)人員的常見(jiàn)用例,我們應(yīng)該;為了更簡(jiǎn)單的設(shè)計(jì)或較小的更改而放棄某些內(nèi)容只能作為最后的手段。即使有詳細(xì)記錄,用例中的已知差距(尤其是那些因連接器而異的差距)也可能不會(huì)傳播給最終用戶或被最終用戶理解,并且可能被視為此功能質(zhì)量的缺陷。
  • 總的來(lái)說(shuō),讓這個(gè)功能給人們帶來(lái)使用的樂(lè)趣,而不是痛苦。在投票通過(guò)之前,這個(gè)問(wèn)題還沒(méi)有定論!

三、公共接口

參數(shù) 類型 默認(rèn)值 重要性 描述
exactly.once.source.support STRING disabled HIGH 是否通過(guò)在 Kafka 事務(wù)中寫(xiě)入源記錄及其偏移量,以及在啟動(dòng)新任務(wù)之前主動(dòng)隔離舊任務(wù)代,來(lái)啟用對(duì)集群中源連接器的一次性支持。請(qǐng)注意,必須在集群中的每個(gè)工作線程上啟用此功能,才能保證一次性交付,并且即使啟用此支持,某些源連接器可能仍然無(wú)法提供一次性交付保證。允許的值為“禁用”、“準(zhǔn)備”和“啟用”。為了安全地啟用對(duì)源連接器的一次性支持,必須首先更新集群中的所有工作線程以使用此屬性的“準(zhǔn)備”值。完成此操作后,應(yīng)對(duì)集群中的所有工作線程執(zhí)行第二次更新,以將此屬性的值更改為“啟用”。
exactly.once.support STRING “requested” MEDIUM 允許的值是“請(qǐng)求的”和“必需的”。如果設(shè)置為“必需”,則強(qiáng)制對(duì)連接器進(jìn)行預(yù)檢,以確保它可以使用給定的配置提供一次性交付。某些連接器可能能夠提供一次性傳送,但不會(huì)向 Connect 發(fā)出信號(hào)表明它們支持此功能;在這種情況下,在創(chuàng)建連接器之前應(yīng)仔細(xì)查閱連接器的文檔,并且該屬性的值應(yīng)設(shè)置為“請(qǐng)求”。此外,如果該值設(shè)置為“必需”,但執(zhí)行預(yù)檢驗(yàn)證的工作線程沒(méi)有為源連接器啟用恰好一次支持,則創(chuàng)建或驗(yàn)證連接器的請(qǐng)求將失敗。
transaction.boundary STRING “poll” MEDIUM 允許的值為“輪詢”、“連接器”和“間隔”。如果設(shè)置為“輪詢”,則將為該連接器中的每個(gè)任務(wù)提供給 Connect 的每批記錄啟動(dòng)并提交一個(gè)新的生產(chǎn)者事務(wù)。如果設(shè)置為“連接器”,則依賴于連接器定義的事務(wù)邊界;請(qǐng)注意,并非所有連接器都能夠定義自己的事務(wù)邊界,在這種情況下,嘗試將此屬性設(shè)置為“連接器”來(lái)創(chuàng)建它們將會(huì)失敗。最后,如果設(shè)置為“interval”,則僅在用戶定義的時(shí)間間隔過(guò)去后提交事務(wù)。
offsets.storage.topic STRING null LOW 用于此連接器的單獨(dú)偏移主題的名稱。如果為空或未指定,則將使用工作人員的全局偏移量主題名稱。如果指定,如果該連接器的目標(biāo) Kafka 集群上尚不存在偏移量主題,則將創(chuàng)建該偏移量主題(如果連接器生產(chǎn)者的 bootstrap.servers 屬性已被設(shè)置,則該偏移量主題可能與用于工作線程全局偏移量主題的主題不同)從工人的覆蓋)。
transaction.boundary.interval.ms STRING null LOW 如果“transaction.boundary”設(shè)置為“interval”,則確定連接器任務(wù)提交生產(chǎn)者事務(wù)的時(shí)間間隔。如果未設(shè)置,則默認(rèn)為工作級(jí)別“offset.flush.interval.ms”屬性的值。

四、連接器 API 擴(kuò)展

此處任何新引入的接口、類等都將添加到工件中 connect-api ,因?yàn)樗鼈儗⒊蔀?Connect 公共 API 的一部分。

ExactlyOnceSupport 引入了一個(gè)新的 枚舉:

ExactlyOnce

package org.apache.kafka.connect.source;
 
/**
 * An enum to represent the level of support for exactly-once delivery from a source connector.
 */
public enum ExactlyOnceSupport {
    /**
     * Signals that a connector supports exactly-once delivery.
     */
    SUPPORTED,
    /**
     * Signals that a connector does not support exactly-once delivery.
     */
    UNSUPPORTED
}

API SourceConnector 已擴(kuò)展為允許開(kāi)發(fā)人員指定其連接器是否支持一次性交付:
源連接器

package org.apache.kafka.connect.source;
 
public abstract class SourceConnector extends Connector {
    // Existing fields and methods omitted
 
    /**
     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
     * The default implementation will return {@code null}.
     * @param connectorConfigs the configuration that will be used for the connector.
     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support,
     * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code null}, it is assumed that the
     * connector cannot.
     */
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        return null;
    }
}

TransactionContext 引入了一個(gè)新 界面:

事務(wù)上下文

package org.apache.kafka.connect.source;
 
/**
 * Provided to source tasks to allow them to define their own producer transaction boundaries when
 * exactly-once support is enabled.
 */
public interface TransactionContext {
    /**
     * Request a transaction commit after the next batch of records from {@link SourceTask#poll()}
     * is processed.
     */
    void commitTransaction();
 
    /**
     * Request a transaction commit after a source record is processed. The source record will be the
     * last record in the committed transaction.
     * @param record the record to commit the transaction after.
     */
    void commitTransaction(SourceRecord record);
 
    /**
     * Requests a transaction abort the next batch of records from {@link SourceTask#poll()}. All of
     * the records in that transaction will be discarded and will not appear in a committed transaction..
     */
    void abortTransaction();
 
    /**
     * Requests a transaction abort after a source record is processed. The source record will be the
     * last record in the aborted transaction. All of the records in that transaction will be discarded
     * and will not appear in a committed transaction.
     * @param record the record to abort the transaction after.
     */
    void abortTransaction(SourceRecord record);
}

該 SourceTaskContext 接口經(jīng)過(guò)擴(kuò)展,為開(kāi)發(fā)人員提供對(duì) TransactionContext 實(shí)例的訪問(wèn)(Javadocs 大部分是從 上的現(xiàn)有文檔復(fù)制的SinkTaskContext::errantRecordReporter ):

源任務(wù)上下文

package org.apache.kafka.connect.source;
 
public interface SourceTaskContext {
    // Existing fields and methods omitted
 
    /**
     * Get a {@link TransactionContext} that can be used to define producer transaction boundaries
     * when exactly-once support is enabled for the connector.
     *
     * <p>This method was added in Apache Kafka 3.0. Source tasks that use this method but want to
     * maintain backward compatibility so they can also be deployed to older Connect runtimes
     * should guard the call to this method with a try-catch block, since calling this method will result in a
     * {@link NoSuchMethodException} or {@link NoClassDefFoundError} when the source connector is deployed to
     * Connect runtimes older than Kafka 3.0. For example:
     * <pre>
     *     TransactionContext transactionContext;
     *     try {
     *         transactionContext = context.transactionContext();
     *     } catch (NoSuchMethodError | NoClassDefFoundError e) {
     *         transactionContext = null;
     *     }
     * </pre>
     *
     * @return the transaction context, or null if the user does not want the connector to define
     * its own transaction boundaries
     * @since 3.0
     */
    default TransactionContext transactionContext() {
        return null
    }
}

ConnectorTransactionBoundaries 引入了一個(gè)新的 枚舉:

源連接器

package org.apache.kafka.connect.source;
 
/**
 * An enum to represent the level of support for connector-defined transaction boundaries.
 */
public enum ConnectorTransactionBoundaries {
    /**
     * Signals that a connector can define its own transaction boundaries.
     */
    SUPPORTED,
    /**
     * Signals that a connector cannot define its own transaction boundaries.
     */
    UNSUPPORTED
}

API還 SourceConnector 通過(guò)第二種新方法進(jìn)行了擴(kuò)展,允許開(kāi)發(fā)人員指定其連接器是否可以定義自己的事務(wù)邊界:源連接器

package org.apache.kafka.connect.source;
 
public abstract class SourceConnector extends Connector {
    // Existing fields and methods omitted
 
    /**
     * Signals whether the connector can define its own transaction boundaries with the proposed
     * configuration. Developers must override this method if they wish to add connector-defined
     * transaction boundary support; if they do not, users will be unable to create instances of
     * this connector that use connector-defined transaction boundaries. The default implementation
     * will return {@code UNSUPPORTED}.
     * @param connectorConfigs the configuration that will be used for the connector
     * @return whether the connector can define its own transaction boundaries  with the given
     * config.
     */
    public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
        return ConnectorTransactionBoundaries.UNSUPPORTED;
    }
}

五、REST API驗(yàn)證

當(dāng)用戶今天提交新的連接器配置時(shí),Connect 會(huì)采取措施確保連接器配置在將其寫(xiě)入配置主題之前有效。這些步驟是同步執(zhí)行的,如果檢測(cè)到任何錯(cuò)誤或發(fā)生意外故障,則會(huì)在其請(qǐng)求的 HTTP 響應(yīng)正文和狀態(tài)中向用戶報(bào)告。

此驗(yàn)證在以下端點(diǎn)上進(jìn)行:

  • POST /connectors/
  • PUT /connectors/{connector}/config
  • PUT /connector-plugins/{connectorType}/config/validate
  • 將添加一些新的飛行前驗(yàn)證邏輯,以實(shí)現(xiàn)同樣適用于這些端點(diǎn)(以及將來(lái)執(zhí)行類似飛行前連接器配置驗(yàn)證的任何其他端點(diǎn))的一次性邏輯。

當(dāng)需要一次性支持時(shí)

  • 如果將exact.once.support 連接器配置設(shè)置為 required ,則將查詢連接器的 SourceConnector::exactlyOnceSupport 方法。
  • 如果結(jié)果為 UNSUPPORTED ,則將報(bào)告錯(cuò)誤,并包含 Excellent.once.support 屬性,指出連接器不提供給定配置的 Exactly-Once 支持。
  • 如果結(jié)果為 null ,則將報(bào)告一個(gè)錯(cuò)誤,并包含 Excellent.once.support 屬性,指出 Connect 無(wú)法確定連接器是否提供精確一次保證,并且用戶應(yīng)在繼續(xù)操作之前仔細(xì)查閱連接器的文檔,方法可能是設(shè)置“請(qǐng)求”該屬性。
  • 如果結(jié)果是 SUPPORTED ,則不會(huì)報(bào)錯(cuò)。

當(dāng)需要連接器定義的事務(wù)邊界時(shí)

  • 如果 transaction.boundary 連接器配置設(shè)置為 Connector ,則將查詢連接器的 SourceConnector::canDefineTransactionBoundaries 方法。
  • 如果結(jié)果為 ConnectorTransactionBoundaries.UNSUPPORTED 或 null ,則 transaction.boundary 屬性將報(bào)告錯(cuò)誤,指出連接器不支持定義自己的事務(wù)邊界,并且將鼓勵(lì)用戶使用不同的事務(wù)邊界配置連接器類型。
  • 如果結(jié)果是 ConnectorTransactionBoundaries.SUPPORTED ,則不會(huì)報(bào)告任何錯(cuò)誤。

六、新指標(biāo)

將添加三個(gè)新的任務(wù)級(jí) JMX 屬性:

MBean name: kafka.connect:type=source-task-metrics,connector=([-.\w]+),task=([\d]+)

Kafka系列之:對(duì)源連接器的的Exactly-Once支持

七、計(jì)劃變更

偏移讀取

當(dāng)為worker設(shè)置為啟用exactly.once.source.support時(shí),該worker用于從偏移量主題讀取源偏移量的消費(fèi)者將配置一個(gè)新的默認(rèn)屬性isolation.level=read_comfilled。這將導(dǎo)致它們忽略屬于未提交事務(wù)的任何記錄,但仍然消耗根本不屬于事務(wù)的記錄。

用戶將無(wú)法在工作線程配置中使用屬性consumer.isolation.level或在連接器配置中使用屬性consumer.override.isolation.level顯式覆蓋此設(shè)置。如果他們嘗試這樣做,用戶提供的值將被忽略,并且將記錄一條警告消息,通知他們這一事實(shí)。

偏移(和記錄)寫(xiě)入

當(dāng)為worker設(shè)置為啟用exactly.once.source.support時(shí),該worker創(chuàng)建的所有源任務(wù)將使用事務(wù)生產(chǎn)者寫(xiě)入Kafka。他們提供給工作人員的所有源記錄都將在事務(wù)中寫(xiě)入 Kafka。一旦提交該事務(wù),當(dāng)前批次的偏移量信息將被寫(xiě)入同一事務(wù)內(nèi)的偏移量主題。然后,事務(wù)將被提交。這將確保當(dāng)且僅當(dāng)該批次的源記錄也寫(xiě)入 Kafka 時(shí),源偏移量才會(huì)提交到 Kafka。

用戶將能夠使用新引入的 transaction.boundary 連接器配置屬性(在“公共接口”部分中更詳細(xì)地描述)來(lái)配置連接器的事務(wù)邊界。

一旦偏移量提交完成,如果連接器(隱式或顯式)配置了單獨(dú)的偏移量主題,則提交的偏移量也將使用非事務(wù)性生產(chǎn)者和工作人員的主體寫(xiě)入工作人員的全局偏移主題。這將在與任務(wù)工作和偏移提交線程分開(kāi)的線程上處理,并且根本不應(yīng)該阻塞或干擾任務(wù)。如果工作線程由于某種原因未能寫(xiě)入這些偏移量,它將無(wú)限期地重試,但不會(huì)使任務(wù)失敗。這樣做是為了促進(jìn)“硬”降級(jí)以及用戶從每個(gè)連接器偏移主題切換回全局偏移主題的情況。

任務(wù)生產(chǎn)者將獲得一個(gè)事務(wù) ID g r o u p I d ? {groupId}- groupId?{connector}-${taskId},其中 g r o u p I d 是 C o n n e c t 集群的組 I D , {groupId} 是 Connect 集群的組 ID, groupIdConnect集群的組ID{connector} 是連接器的名稱,并且${taskId} 是任務(wù)的 ID(從零開(kāi)始)。用戶將無(wú)法使用工作人員級(jí)別的 Producer.transactional.id 或連接器級(jí)別的 Producer.override.transactional.id 屬性覆蓋此屬性。如果他們嘗試這樣做,用戶提供的值將被忽略,并且將記錄一條警告消息,通知他們這一事實(shí)。

對(duì)于恰好一次的源任務(wù),工作線程級(jí)別的 offset.flush.timeout.ms 屬性將被忽略。它們將被允許花費(fèi)必要的時(shí)間來(lái)完成偏移量提交,因?yàn)榇藭r(shí)失敗的成本是使源任務(wù)失敗。目前,所有源任務(wù)偏移量提交都發(fā)生在單個(gè)共享工作全局線程上。為了支持沒(méi)有超時(shí)的源任務(wù)提交,同時(shí)也防止滯后任務(wù)破壞集群上其他任務(wù)的可用性,將修改工作線程以允許同時(shí)源任務(wù)偏移量提交。

任務(wù)將其所有記錄刷新到 Kafka 所需的時(shí)間可能會(huì)長(zhǎng)于事務(wù)超時(shí)。在這種情況下,用戶可以采取一些補(bǔ)救措施來(lái)使連接器恢復(fù)健康:調(diào)整生產(chǎn)者配置以獲得更高的吞吐量,增加連接器使用的生產(chǎn)者的事務(wù)超時(shí),減少偏移提交間隔(如果使用間隔) -基于事務(wù)邊界),或切換到 transaction.boundary 屬性的輪詢值。對(duì)于由于生產(chǎn)者事務(wù)超時(shí)而失敗的任務(wù),我們將在錯(cuò)誤消息中包含這些步驟。

SourceTask記錄提交API

SourceTask::commit 和 SourceTask::commitRecord 方法將在每次成功的偏移提交后立即調(diào)用,包括在任務(wù)關(guān)閉期間發(fā)生的生命周期結(jié)束偏移提交。首先,將為已提交批次中的每條記錄調(diào)用 commitRecord,然后調(diào)用 commit。在成功提交偏移量和完成這些調(diào)用之間,工作人員或任務(wù)可能會(huì)突然死亡。

因此,不能保證每次成功提交偏移量后都會(huì)調(diào)用這些方法。然而,只要任務(wù)能夠僅根據(jù)提供給 Connect 框架的偏移信息及其生成的每條記錄來(lái)準(zhǔn)確恢復(fù),就不會(huì)損害一次性交付保證。這些方法可以而且應(yīng)該仍然用于在保證記錄交付后清理和放棄資源,但使用它們來(lái)跟蹤偏移量將阻止連接器實(shí)現(xiàn)一次性支持。

每個(gè)連接器偏移主題
無(wú)論worker上的exactly.once.source.support屬性的值如何,源連接器都將被允許使用自定義偏移主題,可通過(guò)offsets.storage.topic屬性進(jìn)行配置。

首先,隨著 KIP-458 的出現(xiàn),現(xiàn)在可以建立一個(gè)單獨(dú)的 Connect 集群,其連接器每個(gè)都針對(duì)不同的 Kafka 集群(通過(guò) Producer.override.bootstrap.servers、consumer.override.bootstrap.servers 和/或 admin.override.bootstrap.servers 連接器屬性)。目前,源任務(wù)偏移量仍然在整個(gè) Connect 集群的單個(gè)全局偏移量主題中進(jìn)行跟蹤;但是,如果任務(wù)的生產(chǎn)者用于寫(xiě)入源偏移量,并且該生產(chǎn)者的目標(biāo) Kafka 集群與 Connect 工作線程用于其內(nèi)部主題的集群不同,則這種情況將不再可能。為了很好地支持這種情況,必須對(duì)每個(gè)連接器的偏移主題進(jìn)行推理,以允許平滑升級(jí)和降級(jí),而不會(huì)在偏移數(shù)據(jù)中產(chǎn)生不必要的大間隙(請(qǐng)參閱下面的“遷移”)。盡管在這種情況下,用戶不一定需要控制每個(gè)連接器的偏移主題的名稱(例如,工作人員可以簡(jiǎn)單地在連接器覆蓋的 Kafka 集群上創(chuàng)建一個(gè)具有相同名稱的偏移主題),但公開(kāi)此級(jí)別一旦意識(shí)到每個(gè)連接器偏移主題的必要性,控制的增加就不會(huì)增加此設(shè)計(jì)的復(fù)雜性。這樣做的好處是,它應(yīng)該使用戶可以更輕松地在安全環(huán)境中配置連接器,在安全環(huán)境中,連接器主體對(duì)其目標(biāo) Kafka 集群的訪問(wèn)權(quán)限可能受到限制。對(duì)于熟悉 Kafka Streams 的用戶和開(kāi)發(fā)人員來(lái)說(shuō),這是導(dǎo)致全局偏移主題不可能實(shí)現(xiàn)的主要差異化因素。

其次,它消除了潛在的安全漏洞,惡意連接器可能會(huì)破壞集群上其他連接器可用的偏移量信息。目前可以通過(guò)將連接器配置為與工作人員主體不同的主體,并且僅向工作人員主體授予對(duì)偏移量主題的寫(xiě)訪問(wèn)權(quán)限來(lái)防止這種情況。但是,由于現(xiàn)在將使用同一個(gè)生產(chǎn)者來(lái)寫(xiě)入偏移數(shù)據(jù)和寫(xiě)入源任務(wù)記錄,因此該方法將不再起作用。相反,允許連接器使用自己的偏移量主題應(yīng)該允許管理員維護(hù)其集群的安全性,尤其是在多租戶環(huán)境中。

最后,它允許用戶限制在偏移量主題上掛起交易所產(chǎn)生的影響。如果任務(wù)A和B使用相同的offsets主題,并且任務(wù)A在任務(wù)B啟動(dòng)之前在該offsets主題上發(fā)起事務(wù),那么任務(wù)A在沒(méi)有提交其事務(wù)的情況下突然死亡,任務(wù)B將不得不等待該事務(wù)超時(shí)在它可以讀取到偏移量主題的末尾之前。如果任務(wù) A 的事務(wù)超時(shí)設(shè)置得非常高(例如,為了適應(yīng)高吞吐量的突發(fā)),這將阻止任務(wù) B 長(zhǎng)時(shí)間處理任何數(shù)據(jù)。盡管這種情況在某些情況下可能是不可避免的,但為每個(gè)連接器使用專用的偏移量主題應(yīng)該允許集群管理員隔離偏移量主題上掛起事務(wù)的影響范圍。這樣,雖然同一個(gè)連接器的任務(wù)仍然可能互相干擾,但至少不會(huì)干擾其他連接器的任務(wù)。這對(duì)于大多數(shù)多租戶環(huán)境來(lái)說(shuō)應(yīng)該足夠了。

配置
用戶只能配置每個(gè)連接器偏移主題的名稱。其他屬性(例如分區(qū)數(shù)量和復(fù)制因子)將從工作配置中派生。

添加了工作程序啟動(dòng)時(shí)創(chuàng)建這些主題(如果這些主題不存在)的功能,并使用以下屬性來(lái)定義這些新主題的復(fù)制因子和分區(qū)數(shù)量:

config.storage.replication.factor – 默認(rèn)為 3,值必須 >= 1
offset.storage.replication.factor – 默認(rèn)為 3,值必須 >= 1
status.storage.replication.factor – 默認(rèn)為 3,值必須 >= 1
offset.storage.partitions – 默認(rèn)為 25,值必須 >= 1
status.storage.partitions – 默認(rèn)為 5,值必須 >= 1

此外,所有工作線程配置將更改為與以下模式匹配的其他可選屬性,并在嘗試創(chuàng)建內(nèi)部主題時(shí)將它們(不攜帶)傳遞給 Connect 所有工作線程創(chuàng)建的新主題請(qǐng)求:

屬性 類型 默認(rèn)值 描述 額外的屬性
config.storage.<topic-specific-setting> several broker value 創(chuàng)建 Connect 存儲(chǔ)連接器配置的內(nèi)部 Kafka 主題時(shí)使用的其他特定于主題的設(shè)置。這里的“<topic-specific-setting>”必須是應(yīng)創(chuàng)建主題的 Kafka 代理版本的任何有效的 Kafka 主題級(jí)配置;如果代理不知道“<主題特定設(shè)置>”,則 Connect 工作線程將在啟動(dòng)時(shí)失敗。 partitions (always set to 1)cleanup.policy (always set to compact)
offset.storage.<topic-specific-setting> several broker value 創(chuàng)建內(nèi)部 Kafka 主題時(shí)使用的其他特定于主題的設(shè)置,其中 Connect 存儲(chǔ)源連接器的源偏移量。這里的“<topic-specific-setting>”必須是應(yīng)創(chuàng)建主題的 Kafka 代理版本的任何有效的 Kafka 主題級(jí)配置;如果代理不知道“<主題特定設(shè)置>”,則 Connect 工作線程將在啟動(dòng)時(shí)失敗。 cleanup.policy (always set to compact)
status.storage.<topic-specific-setting> several broker value 創(chuàng)建內(nèi)部 Kafka 主題(其中 Connect 存儲(chǔ)連接器和任務(wù)狀態(tài))時(shí)使用的其他特定于主題的設(shè)置。這里的“<topic-specific-setting>”必須是應(yīng)創(chuàng)建主題的 Kafka 代理版本的任何有效的 Kafka 主題級(jí)配置;如果代理不知道“<主題特定設(shè)置>”,則 Connect 工作線程將在啟動(dòng)時(shí)失敗。 cleanup.policy (always set to compact)

托管Kafka集群
無(wú)論worker上的exactly.once.source.support屬性的值是什么,如果連接器配置包含offsets.storage.topic屬性的值,它將在它生成的Kafka集群上使用具有該名稱的偏移量主題數(shù)據(jù)到(可能與托管工作人員的全局偏移量主題的數(shù)據(jù)不同)。

隱式使用
在某些情況下,可能會(huì)隱式配置每個(gè)連接器偏移主題。具體來(lái)說(shuō),如果連接器的配置不包含 offsets.storage.topic 屬性,但確實(shí)包含覆蓋的 bootstrap.servers 值,該值會(huì)導(dǎo)致連接器針對(duì)與其他 Kafka 集群不同的連接器,則會(huì)發(fā)生這種情況(啟用一次源支持時(shí))承載工作人員的全局偏移量主題的主題。

像這樣的連接器將被稱為“隱式配置”以使用單獨(dú)的偏移量主題。

該主題的名稱將與全局偏移主題的名稱相同,該名稱由工作配置中的 offsets.storage.topic 屬性控制。

創(chuàng)建
如果連接器顯式或隱式配置為使用單獨(dú)的偏移量主題,但該主題尚不存在,則工作線程將在啟動(dòng)之前自動(dòng)嘗試為屬于該連接器的任何任務(wù)或連接器實(shí)例創(chuàng)建主題。這將使用從連接器配置構(gòu)建的管理客戶端(使用各種 admin.override.* 連接器屬性、admin.* 工作器屬性和按優(yōu)先級(jí)降序排列的頂級(jí)工作器屬性)來(lái)完成,使用的邏輯與工作人員已將其用于其內(nèi)部主題。

平滑遷移
當(dāng)?shù)谝淮蝿?chuàng)建單獨(dú)的offsets主題時(shí),它自然是空的。為了避免丟失可能存儲(chǔ)在全局偏移量主題中的偏移量信息,當(dāng)連接器或任務(wù)實(shí)例向工作器請(qǐng)求偏移量時(shí),它將獲得其單獨(dú)的偏移量主題和工作器的偏移量信息的組合視圖。全局偏移主題。將優(yōu)先考慮單獨(dú)偏移量主題中存在的偏移量信息。

例如,如果連接器的全局偏移主題中存儲(chǔ)的偏移量為:

工作人員的全局偏移量主題中存在的偏移量

{
  "partition": {
    "subreddit": "apachekafka"
  },
  "offset": {
    "timestamp": "4761"
  }
}
{
  "partition": {
    "subreddit": "CatsStandingUp"
  },
  "offset": {
    "timestamp": "2112"
  }
}

連接器單獨(dú)的偏移量主題中的偏移量是:
單獨(dú)偏移量主題中存在的偏移量

{
  "partition": {
    "subreddit": "CatsStandingUp"
  },
  "offset": {
    "timestamp": "2169"
  }
}
{
  "partition": {
    "subreddit": "grilledcheese"
  },
  "offset": {
    "timestamp": "489"
  }
}

由工作線程傳遞到連接器的偏移量將是:
提交給任務(wù)的偏移量

{
  "partition": {
    "subreddit": "apachekafka"
  },
  "offset": {
    "timestamp": "4761" // No offset for this partition was present in the separate offsets topic, so the one in the global offsets topic is used instead
  }
}
{
  "partition": {
    "subreddit": "CatsStandingUp"
  },
  "offset": {
    "timestamp": "2169" // Preference is given to the offset for this partition that comes from the separate offsets topic
  }
}
{
  "partition": {
    "subreddit": "grilledcheese"
  },
  "offset": {
    "timestamp": "489"
  }
}

八、任務(wù)計(jì)數(shù)記錄

配置主題中將使用一種新類型的記錄,即“任務(wù)計(jì)數(shù)記錄”。如果在使用新的任務(wù)配置集啟動(dòng)任何任務(wù)之前重新配置連接器,則此記錄顯式跟蹤必須被隔離的任務(wù)生產(chǎn)者的數(shù)量,并且將隱式跟蹤是否有必要在開(kāi)始之前執(zhí)行一輪隔離連接器的任務(wù)。

名為“reddit-source”的連接器(包含 11 個(gè)任務(wù))的此記錄示例如下所示:

鍵:“任務(wù)計(jì)數(shù)-reddit-源”
值:{“任務(wù)”:11}

如果連接器的最新任務(wù)計(jì)數(shù)記錄位于配置主題中的最新任務(wù)配置集之后,則工作人員可以安全地為該連接器創(chuàng)建和運(yùn)行任務(wù)。例如,如果配置主題的內(nèi)容(由每條記錄的鍵表示)如下:

偏移量 0:任務(wù)-reddit-source-0
偏移量 1:任務(wù)-reddit-source-1
偏移量2:commit-reddit-source
偏移量 3:task-count-reddit-source

然后工作人員將能夠安全地為 reddit 源連接器啟動(dòng)任務(wù)。

但是,如果配置主題的內(nèi)容是這樣的:

偏移量 0:任務(wù)計(jì)數(shù)-reddit-源
偏移量 1:任務(wù)-reddit-source-0
偏移量 2:任務(wù)-reddit-source-1
偏移量3:commit-reddit-source

新的內(nèi)部端點(diǎn)將添加到 REST API 中:

PUT /連接器/{連接器}/柵欄

該端點(diǎn)將通過(guò) KIP-507:保護(hù)內(nèi)部連接 ??REST 端點(diǎn)中引入的會(huì)話密鑰機(jī)制進(jìn)行保護(hù),并且僅用于工作人員間通信;用戶不應(yīng)該直接查詢它。無(wú)論worker的exactly.once.source.support值是多少,它都可用。

當(dāng)工作人員收到對(duì)此端點(diǎn)的請(qǐng)求時(shí),它將:

  • 檢查一下是否是領(lǐng)導(dǎo)者。如果不是領(lǐng)導(dǎo)者,則要么將請(qǐng)求轉(zhuǎn)發(fā)給領(lǐng)導(dǎo)者,要么使請(qǐng)求失敗,最多使用兩跳。這種兩跳策略已經(jīng)在 Connect 中為其他需要轉(zhuǎn)發(fā)請(qǐng)求的 REST 端點(diǎn)實(shí)現(xiàn)了,因此這里不再詳細(xì)描述。
  • 驗(yàn)證請(qǐng)求 URL 中的連接器是否存在并且是源連接器;如果不是,則請(qǐng)求失敗。
  • 閱讀配置主題的末尾。
  • 如果在連接器的最新任務(wù)配置集之后的配置主題中存在連接器的任務(wù)計(jì)數(shù)記錄,則提供空體 200 響應(yīng)。
  • 如果配置主題中存在連接器的現(xiàn)有任務(wù)計(jì)數(shù)記錄,并且該記錄中的任務(wù)計(jì)數(shù)大于 1,或者連接器的新任務(wù)數(shù)大于 1*:
    • 通過(guò)使用連接器主體實(shí)例化管理客戶端并調(diào)用 Admin::fenceProducers(如下所述的新 API;請(qǐng)參閱用于隔離事務(wù)性生產(chǎn)者的管理 API),隔離對(duì)該連接器的先前任務(wù)實(shí)例可能仍處于活動(dòng)狀態(tài)的所有生產(chǎn)者。事務(wù) ID 將是連接器的每個(gè)任務(wù)將使用的 ID,假設(shè)活動(dòng)的任務(wù)數(shù)量與連接器的最新任務(wù)計(jì)數(shù)記錄一樣多。這將并行完成。
    • 如果領(lǐng)導(dǎo)者在此期間收到為連接器寫(xiě)入新任務(wù)配置的請(qǐng)求,則該輪防護(hù)將立即取消,并提供 HTTP 409 CONFLICT 響應(yīng)。
  • 將隔離連接器的新任務(wù)計(jì)數(shù)記錄寫(xiě)入配置主題
  • 讀取配置主題的末尾以驗(yàn)證任務(wù)計(jì)數(shù)記錄是否已成功寫(xiě)入。
  • 提供空的 200 響應(yīng)。

第 6 步將在單獨(dú)的線程上進(jìn)行,以避免阻塞工作線程的牧羊人蜱線程(這對(duì)于檢測(cè)和處理重新平衡以及執(zhí)行某些 REST 請(qǐng)求的工作至關(guān)重要)。完成后,步驟 7 到 9 將在工作人員的牧羊人蜱線程上處理,以確保工作人員能夠準(zhǔn)確地了解在步驟 6 期間是否已提交連接器的新任務(wù)配置。

    • 檢查任務(wù)計(jì)數(shù)是為了避免對(duì)永久單任務(wù)連接器(例如 Debezium 的 CDC 源連接器)進(jìn)行不必要的防護(hù)工作。如果連接器的最新任務(wù)計(jì)數(shù)記錄顯示一項(xiàng)任務(wù),則只有一項(xiàng)任務(wù)需要隔離。而且,如果該連接器的新配置包含一個(gè)任務(wù),則該新任務(wù)將自動(dòng)隔離其單個(gè)前任任務(wù),因?yàn)樗鼈儗⑹褂孟嗤氖聞?wù) ID。同樣的邏輯不適用于多任務(wù)連接器,即使重新配置后任務(wù)數(shù)量不變;

九、重新平衡的準(zhǔn)備

當(dāng)將正好.once.source.support 設(shè)置為啟用并且檢測(cè)到連接器的一組新任務(wù)配置時(shí),工作人員將搶先停止該連接器的源任務(wù)。更詳細(xì)地說(shuō):

  • 當(dāng)觸發(fā)重新平衡時(shí),在重新加入集群組之前,工作人員將搶先停止所有源連接器的所有任務(wù),這些連接器的任務(wù)配置出現(xiàn)在最新任務(wù)計(jì)數(shù)記錄之后的配置主題中。此步驟對(duì)于保留一次性交付保證不是必需的,但應(yīng)該為任務(wù)在被強(qiáng)制隔離之前提供一個(gè)合理的機(jī)會(huì)來(lái)優(yōu)雅地關(guān)閉。
  • 停止這些任務(wù)將并行完成(因?yàn)榉植际焦ぷ骶€程已經(jīng)完成了集體連接器/任務(wù)停止/啟動(dòng)),并且工作線程在正常任務(wù)關(guān)閉超時(shí)期限??內(nèi)無(wú)法停止的任何任務(wù)(默認(rèn)為五秒,但可以通過(guò)task.shutdown.graceful.timeout.ms工作者屬性控制)將被放棄并允許繼續(xù)運(yùn)行。如果/當(dāng)領(lǐng)導(dǎo)者在一輪僵尸圍欄期間將其生產(chǎn)者圍出時(shí)(如下所述),它們將被禁用。
  • 由于這項(xiàng)工作將并行完成,任務(wù)關(guān)閉的默認(rèn)超時(shí)時(shí)間相當(dāng)?shù)停ㄎ迕耄?,并且?dāng)前用于執(zhí)行集體任務(wù)停止的線程數(shù)為 8,因此這不太可能妨礙工作人員的工作。能夠在重新平衡超時(shí)(默認(rèn)為 60 秒)超時(shí)之前重新加入組。

這部分是對(duì)牧民重新平衡機(jī)制的誤解的結(jié)果,實(shí)際上是沒(méi)有必要的。在重新平衡期間(重新)加入組之前,工作線程已對(duì)所有重新配置的任務(wù)(如果使用增量重新平衡)或所有任務(wù)(如果使用急切重新平衡)執(zhí)行此搶先停止。

十、源任務(wù)啟動(dòng)

當(dāng)為工作線程設(shè)置為啟用正好.once.source.support 時(shí),將采取額外的步驟來(lái)確保任務(wù)僅在安全時(shí)運(yùn)行。

在工作線程啟動(dòng)源任務(wù)之前,它將首先向領(lǐng)導(dǎo)者的內(nèi)部僵尸防護(hù)端點(diǎn)發(fā)送任務(wù)連接器的請(qǐng)求。如果該請(qǐng)求因任何原因失敗,該任務(wù)將被標(biāo)記為 FAILED 并且啟動(dòng)將中止。

一旦工作線程實(shí)例化了源任務(wù)的生產(chǎn)者,它將再次讀取配置主題的末尾,并且如果已為該連接器生成一組新的任務(wù)配置,它將中止任務(wù)的啟動(dòng)。

總而言之,任務(wù)的啟動(dòng)過(guò)程將是:

  1. Worker開(kāi)始啟動(dòng)一個(gè)源任務(wù)(由于用戶手動(dòng)重啟、重新平衡、重新配置等)
  2. Worker向Leader發(fā)出連接請(qǐng)求;如果此請(qǐng)求因任何原因失敗,則任務(wù)將被標(biāo)記為 FAILED 并且啟動(dòng)將中止
  3. Worker 讀取配置主題的末尾,并驗(yàn)證在連接器的最新任務(wù)配置集之后現(xiàn)在是否存在連接器的任務(wù)計(jì)數(shù)記錄;如果不是這種情況,則放棄任務(wù)啟動(dòng)*
  4. 如果連接器隱式或顯式配置為使用單獨(dú)的偏移量主題:
    Worker嘗試創(chuàng)建主題(如果主題已經(jīng)存在,則默默地吞下錯(cuò)誤);如果由于不可接受的原因而失?。ㄊ褂门c工作線程在創(chuàng)建其內(nèi)部主題時(shí)相同的邏輯來(lái)區(qū)分可接受和不可接受的錯(cuò)誤),則任務(wù)將被標(biāo)記為 FAILED 并且啟動(dòng)將中止
  5. Worker 為任務(wù)實(shí)例化事務(wù)生產(chǎn)者
  6. Worker 讀取配置主題的末尾
  7. 如果此后為連接器生成了一組新的任務(wù)配置,則放棄任務(wù)啟動(dòng)*
    否則,開(kāi)始輪詢?nèi)蝿?wù)以獲取數(shù)據(jù)
    • 如果發(fā)生這種情況,將自動(dòng)啟動(dòng)一個(gè)新任務(wù)來(lái)代替該任務(wù),以響應(yīng)配置主題中的新任務(wù)配置集。用戶無(wú)需執(zhí)行任何操作(例如重新啟動(dòng)任務(wù))。

十一、領(lǐng)導(dǎo)者訪問(wèn)配置主題

當(dāng)將正好.once.source.support 設(shè)置為為某個(gè)工作程序準(zhǔn)備或啟用時(shí),如果該工作程序是集群的領(lǐng)導(dǎo)者,它現(xiàn)在將使用事務(wù)生產(chǎn)者來(lái)保證最多一個(gè)工作程序能夠?qū)懭肱渲弥黝}隨時(shí)。更詳細(xì)地說(shuō):

  • 重新平衡后,如果一個(gè)worker發(fā)現(xiàn)自己成為集群的leader,它會(huì)實(shí)例化一個(gè)事務(wù)生產(chǎn)者,其事務(wù)ID為 connect-cluster-${groupId} ,其中 ${groupId } 是集群的組ID。如果用戶嘗試通過(guò)在工作程序配置中設(shè)置 transactional.id 屬性來(lái)覆蓋此設(shè)置,則用戶提供的值將被忽略,并且將記錄一條警告消息,通知他們這一事實(shí)。工作線程將使用這個(gè)生產(chǎn)者來(lái)執(zhí)行它在配置主題上執(zhí)行的所有寫(xiě)入操作。它將開(kāi)始并為其寫(xiě)入的每條記錄提交一個(gè)事務(wù)。這可能看起來(lái)不尋?!绻聞?wù)并不是真正必要的話,為什么要使用事務(wù)性生產(chǎn)者?——但它確保只有最新的領(lǐng)導(dǎo)者才能夠生產(chǎn)配置主題,而僵尸領(lǐng)導(dǎo)者(盡管可能很少見(jiàn))會(huì)不能夠。
  • 例如,如果領(lǐng)導(dǎo)者在嘗試為連接器排除上一代任務(wù)生產(chǎn)者時(shí)停滯不前,那么它可能會(huì)脫離組并成為僵尸。如果此時(shí)另一個(gè)工作進(jìn)程被選舉為領(lǐng)導(dǎo)者,然后重新配置連接器,則僵尸領(lǐng)導(dǎo)者可能會(huì)解除阻塞,然后在一組新的任務(wù)配置寫(xiě)入主題后嘗試將任務(wù)計(jì)數(shù)記錄寫(xiě)入配置主題。這會(huì)破壞配置主題的狀態(tài)并違反一個(gè)關(guān)鍵假設(shè):連接器的任務(wù)配置之后出現(xiàn)的任務(wù)計(jì)數(shù)記錄意味著在啟動(dòng)該連接器的任務(wù)之前沒(méi)有必要隔離上一代生產(chǎn)者。
  • 通過(guò)在領(lǐng)導(dǎo)者上使用事務(wù)生產(chǎn)者,我們可以保證,如果從那時(shí)起沒(méi)有其他工作人員成為領(lǐng)導(dǎo)者并為該主題的連接器寫(xiě)入新的任務(wù)配置,那么領(lǐng)導(dǎo)者只能將任務(wù)計(jì)數(shù)記錄寫(xiě)入配置主題。我們還解決了 Connect 中現(xiàn)有的邊緣情況,其中僵尸領(lǐng)導(dǎo)者可能會(huì)向配置主題寫(xiě)入不正確的信息。

十二、用于隔離事務(wù)生產(chǎn)者的管理 API

Java API 添加
管理界面將進(jìn)行擴(kuò)展,以包含通過(guò)事務(wù) ID 隔離生產(chǎn)者的新方法。通過(guò)使用一個(gè)或多個(gè)事務(wù)性生產(chǎn)者并調(diào)用其 initTransactions 方法,已經(jīng)可以實(shí)現(xiàn)相同的功能,但在管理客戶端上進(jìn)行了復(fù)制,以便更容易用于其他情況(有關(guān)更多詳細(xì)信息,請(qǐng)參閱僅 Fencing 生產(chǎn)者拒絕的替代方案):

Admin.java

public interface Admin {
 
    /**
     * Fence out all active producers that use any of the provided transactional IDs.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @param options          The options to use when fencing the producers.
     * @return The FenceProducersResult.
     */
    FenceProducersResult fenceProducers(Collection<String> transactionalIds,
                                        FenceProducersOptions options);
 
    /**
     * Fence out all active producers that use any of the provided transactional IDs, with the default options.
     * <p>
     * This is a convenience method for {@link #fenceProducers(Collection, FenceProducersOptions)}
     * with default options. See the overload for more details.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @return The FenceProducersResult.
     */
    default FenceProducersResult fenceProducers(Collection<String> transactionalIds) {
        return fenceProducers(transactionalIds, new FenceProducersOptions());
    }
 
}

API 的新類也將被添加。

FenceProducersResult 類:

FenceProducersResult.java

package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#fenceProducers(Collection)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersResult {
 
    /**
     * Return a map from transactional ID to futures which can be used to check the status of
     * individual fencings.
     */
    public Map<String, KafkaFuture<Void>> fencedProducers() {
        // Implementation goes here
    }
 
    /**
     * Returns a future that provides the producer ID generated while initializing the given transaction when the request completes.
     */
    public KafkaFuture<Long> producerId(String transactionalId) {
        // Implementation goes here
    }
 
    /**
     * Returns a future that provides the epoch ID generated while initializing the given transaction when the request completes.
     */
    public KafkaFuture<Short> epochId(String transactionalId) {
        // Implementation goes here
    }
 
    /**
     * Return a future which succeeds only if all the producer fencings succeed.
     */
    public KafkaFuture<Void> all() {
        // Implementation goes here
    }
 
}

以及 FenceProducersOptions 類:
FenceProducersOptions.java

package org.apache.kafka.clients.admin;
 
/**
 * Options for {@link Admin#fenceProducers(Collection, FenceProducersOptions)}
 *
 * The API of this class is evolving. See {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersOptions extends AbstractOptions<FenceProducersOptions> {
    // No options (yet) besides those inherited from AbstractOptions
}

執(zhí)行
在底層,這將通過(guò) FindCoordinator 協(xié)議查找每個(gè)事務(wù) ID 的事務(wù)協(xié)調(diào)器,然后通過(guò)調(diào)用沒(méi)有生產(chǎn)者 ID 和生產(chǎn)者紀(jì)元的 InitProducerId 協(xié)議來(lái)使用該 ID 初始化新事務(wù)來(lái)實(shí)現(xiàn)。

  • 提高 PID 的紀(jì)元,以便生產(chǎn)者的任何先前的僵尸實(shí)例都被隔離并且無(wú)法繼續(xù)其事務(wù)。
  • 恢復(fù)(前滾或回滾)生產(chǎn)者的前一個(gè)實(shí)例留下的任何未完成的事務(wù)。

ACL
此新 API 所需的 ACL 將與為每個(gè)指定事務(wù) ID 使用事務(wù)生產(chǎn)者所需的 ACL 相同。具體來(lái)說(shuō),這相當(dāng)于對(duì) TransactionalId 資源上的 Write 和Describe 操作的授權(quán),以及對(duì) Cluster 資源上的 IdempoteWrite 操作的授權(quán)。

請(qǐng)注意,IdempotWrite ACL 從 2.8 開(kāi)始已被棄用(請(qǐng)參閱 KIP-679),并且僅對(duì)于在 2.8 之前的 Kafka 集群上運(yùn)行的 Connect 集群是必需的。

限制

  • 需要分布式模式;一次性源連接器尚不支持獨(dú)立模式
  • 只能對(duì)所有源連接器啟用或不啟用 Exactly-once 源支持;它不能在每個(gè)連接器的基礎(chǔ)上進(jìn)行切換
  • 為了實(shí)現(xiàn)一次性交付,連接器必須一次將源分區(qū)最多分配給一個(gè)任務(wù)(否則可能會(huì)發(fā)生重復(fù)寫(xiě)入)
  • 為了實(shí)現(xiàn)一次性交付,連接器必須使用 Connect 源偏移 API 來(lái)跟蹤進(jìn)度(否則可能會(huì)發(fā)生重復(fù)寫(xiě)入或丟失記錄)

十三、解決了故障/退化場(chǎng)景

這些場(chǎng)景不會(huì)損害集群的一次性交付保證。

在僵尸防護(hù)期間,Leader 無(wú)法將任務(wù)計(jì)數(shù)記錄寫(xiě)入配置主題

在一輪僵尸防護(hù)期間,領(lǐng)導(dǎo)者將無(wú)限期地嘗試將任務(wù)計(jì)數(shù)寫(xiě)入配置主題,并在寫(xiě)入后通過(guò)從主題讀回來(lái)驗(yàn)證它是否已寫(xiě)入。

如果花費(fèi)的時(shí)間太長(zhǎng),工人可能會(huì)從小組中退出,并且可能會(huì)選舉出新的領(lǐng)導(dǎo)者。如果(現(xiàn)已被廢黜的)領(lǐng)導(dǎo)者在此過(guò)程中變得暢通無(wú)阻并嘗試寫(xiě)入配置主題,它將被其替代者隔離。如果它在讀取配置主題時(shí)被阻止,它將醒來(lái)并完成響應(yīng)服務(wù)。在牧羊人蜱線程的下一次迭代中,它將發(fā)現(xiàn)它已脫離組并嘗試重新加入。

如果讀取/寫(xiě)入配置主題的嘗試沒(méi)有被阻止,而是由于某種原因失敗,領(lǐng)導(dǎo)者將立即離開(kāi)組,然后嘗試重新加入,提供 HTTP 500 ERROR 響應(yīng),其中包含交互失敗的堆棧跟蹤與配置主題。

在僵尸圍欄期間無(wú)法將生產(chǎn)者拒之門(mén)外
例如,如果領(lǐng)導(dǎo)者無(wú)法聯(lián)系或找到事務(wù)協(xié)調(diào)器,或者連接器的管理主體缺乏隔離生產(chǎn)者執(zhí)行其任務(wù)所需的 ACL,則領(lǐng)導(dǎo)者可能無(wú)法在一輪僵尸隔離期間隔離生產(chǎn)者。

如果發(fā)生這種情況,領(lǐng)導(dǎo)者將提供 HTTP 500 ERROR 響應(yīng),其中包含未能隔離生產(chǎn)者的堆棧跟蹤。

對(duì)于某些錯(cuò)誤(例如由 ACL 不足引起的錯(cuò)誤),堆棧跟蹤將包含一條有用的錯(cuò)誤消息,其中包含有關(guān)如何修復(fù)故障的說(shuō)明(例如授予連接器的管理員主體權(quán)限,以使用適用的列表或描述來(lái)隔離生產(chǎn)者)交易 ID)。

收到此 500 響應(yīng)的工作線程將在狀態(tài)主題中標(biāo)記任務(wù)對(duì)象 FAILED,并使用響應(yīng)中包含的堆棧跟蹤來(lái)填充狀態(tài)消息的跟蹤字段。

手動(dòng)重新啟動(dòng)隔離任務(wù)

如果任務(wù)由于其生產(chǎn)者已被隔離而失敗,用戶可以嘗試通過(guò) REST API 手動(dòng)重新啟動(dòng)該任務(wù)。如果工作人員對(duì)配置主題的視圖是最新的,這只會(huì)導(dǎo)致重新啟動(dòng)的任務(wù)替換同一任務(wù)的任何現(xiàn)有實(shí)例,并且將保留一次性交付保證。但是,如果工作線程的配置主題視圖已過(guò)時(shí),則如果該任務(wù)最終從已分配給最新任務(wù)配置集中的不同任務(wù)的源分區(qū)生成數(shù)據(jù),則重新啟動(dòng)該任務(wù)可能會(huì)損害一次性保證對(duì)于連接器。

為了防止手動(dòng)用戶重新啟動(dòng)損害一次性交付保證,Connect 工作人員將在繼續(xù)任何源任務(wù)重新啟動(dòng)之前閱讀配置主題的末尾。如果工作線程無(wú)法讀取到配置主題的末尾,則重新啟動(dòng)請(qǐng)求將收到 HTTP 500 TIMEOUT 響應(yīng)。如果工作線程在讀取主題末尾時(shí)發(fā)現(xiàn)任務(wù)不再存在(因?yàn)檫B接器已被刪除或其任務(wù)計(jì)數(shù)減少),它將提供通常的 HTTP 404 NOT FOUND 響應(yīng)。如果由于從配置主題讀取新信息而需要重新平衡,則將提供 HTTP 409 CONFLICT 響應(yīng)。

Worker 在被分配源任務(wù)之后但在實(shí)例化生產(chǎn)者之前出現(xiàn)滯后
如果為工作人員分配了一個(gè)源任務(wù),并在該連接器的最新一組任務(wù)配置之后在配置主題中看到任務(wù)計(jì)數(shù)記錄,則假設(shè)該連接器的所有先前任務(wù)生成的所有生產(chǎn)者都已被隔離。而且,在領(lǐng)導(dǎo)者進(jìn)行擊劍時(shí),這個(gè)假設(shè)可能成立。但是,工作人員可能會(huì)被分配一個(gè)源任務(wù),觀察配置主題中的任務(wù)計(jì)數(shù)記錄,表明可以安全地啟動(dòng)該任務(wù),然后在能夠構(gòu)造生產(chǎn)者之前阻塞一段較長(zhǎng)的時(shí)間為了那個(gè)任務(wù)。例如,考慮一個(gè)場(chǎng)景,其中工作人員 F(追隨者)、O(其他追隨者)和 L(領(lǐng)導(dǎo)者)在源連接器 C 的任務(wù) T 上進(jìn)行操作:

  1. 連接器 C 已重新配置,并且新的任務(wù)配置已寫(xiě)入配置主題
  2. 觸發(fā)重新平衡,worker L 將任務(wù) T 分配給worker F
  3. 工作人員 F 收到分配的任務(wù),并在向工作人員 L 請(qǐng)求成功一輪防護(hù)后,能夠從配置主題讀取連接器 C 的最新任務(wù)計(jì)數(shù)記錄
  4. 工作線程 F 在能夠?qū)嵗蝿?wù) T 的事務(wù)生產(chǎn)者之前會(huì)阻塞
  5. 連接器 C 已重新配置,并且新的任務(wù)配置已寫(xiě)入配置主題
  6. 觸發(fā)重新平衡,由于 F 已脫離組,工作人員 L 將任務(wù) T 分配給工作人員 O
  7. 工作人員 O 收到其分配,并在向工作人員 L 請(qǐng)求成功一輪防護(hù)后,能夠從配置主題讀取連接器 C 的最新任務(wù)計(jì)數(shù)記錄,實(shí)例化任務(wù) T 的事務(wù)生產(chǎn)者,然后開(kāi)始處理數(shù)據(jù)從那個(gè)任務(wù)
  8. 工作線程 F 解除阻塞,實(shí)例化任務(wù) T 的事務(wù)生產(chǎn)者,然后開(kāi)始處理該任務(wù)的數(shù)據(jù)

在這種情況下,如果最初分配給任務(wù) T 的源分區(qū)在步驟 5 的重新配置期間被重新分配給不同的任務(wù),則工作線程 F 在步驟 8 中創(chuàng)建的任務(wù)實(shí)例將開(kāi)始產(chǎn)生重復(fù)數(shù)據(jù),并且一次性交付保證將受到損害。

這是可能的,因?yàn)樵陬I(lǐng)導(dǎo)者執(zhí)行隔離時(shí),不能保證所有可能對(duì)連接器處于活動(dòng)狀態(tài)的生產(chǎn)者都已創(chuàng)建。如果生產(chǎn)者尚未創(chuàng)建,則無(wú)法將其排除在外;如果它是在隔離發(fā)生之后創(chuàng)建的,則必須采取不同的方法來(lái)確保不會(huì)產(chǎn)生重復(fù)數(shù)據(jù)。

在為源任務(wù)啟動(dòng)事務(wù)生產(chǎn)者之后,工作線程應(yīng)通過(guò)對(duì)配置主題末尾的附加讀取來(lái)覆蓋這種情況。如果在工作線程被阻止并且發(fā)生一輪生產(chǎn)者防護(hù)后重新配置連接器,則工作線程將為其源任務(wù)啟動(dòng)事務(wù)性生產(chǎn)者,但隨后發(fā)現(xiàn)連接器重新配置并中止啟動(dòng)。如果工作線程啟動(dòng)了事務(wù)性生產(chǎn)者,然后在完成對(duì)配置主題末尾的讀取之前被阻塞,則領(lǐng)導(dǎo)者發(fā)起的這一輪生產(chǎn)者圍欄應(yīng)該隔離該生產(chǎn)者,并且任何后續(xù)的重新啟動(dòng)嘗試都將阻塞,直到/除非工作線程能夠完成對(duì)配置主題末尾的讀?。ú⑻幚碓诖诉^(guò)程中從主題讀取的新信息所需的任何重新平衡)。

意外的任務(wù)提交丟失的消息
如果事務(wù)生產(chǎn)者成功實(shí)例化,它不會(huì)再次嘗試聯(lián)系代理,直到嘗試實(shí)際發(fā)送記錄。這可能會(huì)導(dǎo)致有趣的行為:生產(chǎn)者啟動(dòng)并能夠與事務(wù)協(xié)調(diào)器進(jìn)行初始聯(lián)系,然后整個(gè) Kafka 集群死亡,然后生產(chǎn)者開(kāi)始并提交事務(wù),而不嘗試發(fā)送任何記錄,并且沒(méi)有異?;蚱渌闆r生產(chǎn)者在此事件序列中的任何點(diǎn)生成失敗指示。

什么時(shí)候可能會(huì)發(fā)生這種情況?如果任務(wù)配置了激進(jìn)的 SMT,會(huì)刪除給定批次中的所有記錄,則其生產(chǎn)者在提交當(dāng)前事務(wù)之前絕不會(huì)嘗試發(fā)送任何記錄。并且,一旦發(fā)生這種情況,工作線程將調(diào)用 SourceTask::commit,這可能會(huì)導(dǎo)致任務(wù)從上游系統(tǒng)刪除數(shù)據(jù)(例如,通過(guò)確認(rèn)來(lái)自 JMS 代理的一批消息)。即使發(fā)生這種情況,也不應(yīng)該成為問(wèn)題:可以從上游系統(tǒng)中刪除本應(yīng)由連接器刪除的消息,無(wú)論這些記錄的源偏移量是否已提交給 Kafka,因?yàn)樽罱K結(jié)果是無(wú)論哪種方式都一樣。

十四、允許的故障場(chǎng)景

這些場(chǎng)景可能會(huì)損害集群的一次性交付保證。

異構(gòu)集群

如果工作線程處于活動(dòng)狀態(tài)并且不支持精確一次傳送(因?yàn)槲磳⒄?once.source.support設(shè)置為啟用,或者工作線程正在運(yùn)行舊版本的 Connect,該功能不可用),整個(gè)集群提供一次性保證的能力將受到損害。沒(méi)有辦法排除不合規(guī)的工人。即使開(kāi)發(fā)出來(lái)了,問(wèn)題也只會(huì)略有改變:如果一個(gè)工作人員處于活動(dòng)狀態(tài)并且無(wú)法被集群中的其他工作人員隔離,那么我們將處于與以前完全相同的位置。

這種失敗場(chǎng)景需要在公共文檔中指出,以便用戶了解他們有責(zé)任確保沒(méi)有不合規(guī)的工作人員處于活動(dòng)狀態(tài)。

非工人、非領(lǐng)導(dǎo)者生產(chǎn)者配置主題

如果除有效的領(lǐng)導(dǎo)者工作人員之外的任何內(nèi)容最終寫(xiě)入配置主題,則集群依賴于確定啟動(dòng)任務(wù)是否安全以及是否需要隔離的所有假設(shè)都可能無(wú)效。在相當(dāng)安全的 Connect 集群中,這種情況永遠(yuǎn)不應(yīng)該發(fā)生,如果發(fā)生了,與破壞配置主題的其他后果相比,受損的交付保證可能會(huì)顯得蒼白無(wú)力。

十五、兼容性、棄用和遷移計(jì)劃

與再平衡協(xié)議的互操作性

通過(guò)急切和增量合作再平衡協(xié)議都可以實(shí)現(xiàn) Exactly-once 支持。支持這兩種協(xié)議的額外設(shè)計(jì)復(fù)雜性是最小的,并且考慮到 Connect 框架仍然支持這兩種協(xié)議,一些用戶可能仍然會(huì)使用急切的重新平衡。如果支持這兩種協(xié)議的工作不是令人望而卻步,他們就不必升級(jí)到較新的重新平衡協(xié)議來(lái)獲得此功能。

Worker主體權(quán)限
在將正好.once.source.support 設(shè)置為preparing 或enabled 之前,必須向worker 的生產(chǎn)者主體授予其寫(xiě)入的Kafka 集群的以下權(quán)限:

操作 資源類型 Resource Name
Write TransactionalId connect-cluster-${groupId} ,其中 ${groupId } 是集群的組 ID
Describe TransactionalId connect-cluster-${groupId} ,其中 ${groupId } 是集群的組 ID
IdempotentWrite * Cluster Connect 集群定位的 Kafka 集群
    • 請(qǐng)注意,IdempotWrite ACL 從 2.8 開(kāi)始已被棄用,并且僅對(duì)于在 2.8 之前的 Kafka 集群上運(yùn)行的 Connect 集群是必需的。

連接器主體權(quán)限
Producer
啟用一次性源支持后,每個(gè)源連接器的生產(chǎn)者主體必須在其寫(xiě)入的 Kafka 集群上獲得以下權(quán)限:

操作 資源類型 Resource Name
Write TransactionalId g r o u p I d ? {groupId}- groupId?{connector}-${taskId},對(duì)于連接器將創(chuàng)建的每個(gè)任務(wù),其中 g r o u p I d 是 C o n n e c t 集群的組 I D , {groupId} 是 Connect 集群的組 ID, groupIdConnect集群的組ID,{connector} 是連接器的名稱,并且${taskId} 是任務(wù)的 ID(從零開(kāi)始)。如果不存在與其他事務(wù) ID 沖突的風(fēng)險(xiǎn)或者用戶可以接受沖突,則可以使用 g r o u p I d ? {groupId}- groupId?{connector}* 通配符前綴以方便起見(jiàn)。
Describe TransactionalId g r o u p I d ? {groupId}- groupId?{connector}-${taskId},對(duì)于連接器將創(chuàng)建的每個(gè)任務(wù),其中 g r o u p I d 是 C o n n e c t 集群的組 I D , {groupId} 是 Connect 集群的組 ID, groupIdConnect集群的組ID,{connector} 是連接器的名稱,并且${taskId} 是任務(wù)的 ID(從零開(kāi)始)。如果不存在與其他事務(wù) ID 沖突的風(fēng)險(xiǎn)或者用戶可以接受沖突,則可以使用 g r o u p I d ? {groupId}- groupId?{connector}* 通配符前綴以方便起見(jiàn)。
Write Topic 連接器使用的偏移主題,它是連接器配置中的 offsets.storage.topic 屬性的值(如果提供),或者是工作線程配置中的 offsets.storage.topic 屬性的值(如果沒(méi)有)。
IdempotentWrite * Cluster 連接器針對(duì)的 Kafka 集群。
    • 請(qǐng)注意,IdempotWrite ACL 從 2.8 開(kāi)始已被棄用,并且僅對(duì)于在 2.8 之前的 Kafka 集群上運(yùn)行的 Connect 集群是必需的。

消費(fèi)者
每個(gè)源連接器的消費(fèi)者主體必須在其讀取偏移量的 Kafka 集群上獲得以下權(quán)限:

操作 資源類型 Resource Name
Read TransactionalId 連接器使用的偏移主題,它是連接器配置中的 offsets.storage.topic 屬性的值(如果提供),或者是工作線程配置中的 offsets.storage.topic 屬性的值(如果沒(méi)有)。

請(qǐng)注意,當(dāng)前沒(méi)有 Connect 用例需要為源連接器配置使用者主體。因此,此提議的更改在技術(shù)上為源連接器引入了“新”配置屬性:以 Consumer.override 為前綴的消費(fèi)者級(jí)別覆蓋。

Admin
啟用一次性源支持后,必須向連接器的管理主體授予其從中讀取偏移量的 Kafka 集群的以下權(quán)限:

操作 資源類型 Resource Name
Write TransactionalId g r o u p I d ? {groupId}- groupId?{connector}-${taskId},對(duì)于連接器將創(chuàng)建的每個(gè)任務(wù),其中 g r o u p I d 是 C o n n e c t 集群的組 I D , {groupId} 是 Connect 集群的組 ID, groupIdConnect集群的組ID,{connector} 是連接器的名稱,并且${taskId} 是任務(wù)的 ID(從零開(kāi)始)。如果不存在與其他事務(wù) ID 沖突的風(fēng)險(xiǎn)或者用戶可以接受沖突,則可以使用 g r o u p I d ? {groupId}- groupId?{connector}* 通配符前綴以方便起見(jiàn)。
Describe TransactionalId g r o u p I d ? {groupId}- groupId?{connector}-${taskId},對(duì)于連接器將創(chuàng)建的每個(gè)任務(wù),其中 g r o u p I d 是 C o n n e c t 集群的組 I D , {groupId} 是 Connect 集群的組 ID, groupIdConnect集群的組ID,{connector} 是連接器的名稱,并且${taskId} 是任務(wù)的 ID(從零開(kāi)始)。如果不存在與其他事務(wù) ID 沖突的風(fēng)險(xiǎn)或者用戶可以接受沖突,則可以使用 g r o u p I d ? {groupId}- groupId?{connector}* 通配符前綴以方便起見(jiàn)。

如果連接器的(隱式或顯式配置的)偏移量主題尚不存在,則必須在將托管偏移量主題的 Kafka 集群上為其管理主體授予以下權(quán)限:

操作 資源類型 Resource Name
Create Topic 連接器使用的偏移量主題,它是連接器配置中 offsets.storage.topic 屬性的值。如果未提供,并且連接器針對(duì)工作線程用于其內(nèi)部主題的同一 Kafka 集群,則不需要此 ACL,因?yàn)楣ぷ骶€程的共享偏移量主題應(yīng)在啟動(dòng)任何連接器之前自動(dòng)創(chuàng)建。

此外,無(wú)論如何,連接器的管理主體都必須在它讀取偏移量的 Kafka 集群上獲得以下權(quán)限:

操作 資源類型 Resource Name
Describe Topic 連接器使用的偏移量主題,它是連接器配置中 offsets.storage.topic 屬性的值。如果未提供,并且連接器針對(duì)工作線程用于其內(nèi)部主題的同一 Kafka 集群,則不需要此 ACL,因?yàn)楣ぷ骶€程的共享偏移量主題應(yīng)在啟動(dòng)任何連接器之前自動(dòng)創(chuàng)建。

為了確保 Connect 工作線程已讀取到偏移量主題的末尾,這是必要的,即使在讀取到末尾時(shí)該主題上存在打開(kāi)的事務(wù)。這將通過(guò)以下方式完成:使用具有 read_uncommissed 隔離級(jí)別的管理客戶端列出偏移量主題的結(jié)束偏移量,然后使用具有 read_commissed 隔離級(jí)別的該主題進(jìn)行消費(fèi),直到至少管理客戶端列出的那些偏移量為止。

十六、滾動(dòng)升級(jí)

最多需要兩次滾動(dòng)升級(jí)才能在 Connect 集群上啟用一次性源支持。

  • 第一個(gè)滾動(dòng)升級(jí)是將每個(gè)工作人員升級(jí)到可以提供一次性源支持的 Connect 版本,并將 just.once.source.support 設(shè)置為preparing。
  • 第二個(gè)滾動(dòng)升級(jí)將是在每個(gè)工作線程上實(shí)際啟用一次源支持(通過(guò)將正好.once.source.support 設(shè)置為enabled)。

需要進(jìn)行兩次升級(jí),以確保內(nèi)部防護(hù)端點(diǎn)在每個(gè)工作人員需要之前都可用,并且在升級(jí)期間非領(lǐng)導(dǎo)工作人員無(wú)法寫(xiě)入配置主題。

十七、降級(jí)

可能有兩種降級(jí)??梢酝ㄟ^(guò)將exact.once.source.support設(shè)置為disabled或?yàn)榧褐械膚orker準(zhǔn)備(“軟”降級(jí))來(lái)禁用對(duì)集群的exactly-once支持,或者可以將worker恢復(fù)為使用舊版本的Connect框架根本不支持一次性源(“硬”降級(jí))。

軟降級(jí)
軟降級(jí)應(yīng)該可以通過(guò)集群滾動(dòng)來(lái)實(shí)現(xiàn),其中每個(gè)工作線程都被關(guān)閉,其配置被更改為禁用一次性源支持,然后重新啟動(dòng)。在此過(guò)程中,所有連接器及其任務(wù)應(yīng)繼續(xù)運(yùn)行和處理數(shù)據(jù),這要?dú)w功于使?jié)L動(dòng)升級(jí)成為可能的相同邏輯。

硬降級(jí)
偏置精度
由于升級(jí)工作線程上的源任務(wù)偏移量仍會(huì)寫(xiě)入工作線程的全局偏移主題,因此即使降級(jí)工作線程不支持每個(gè)連接器偏移主題,它仍然可以獲取其連接器的相對(duì)最新的源偏移量。其中一些偏移量可能已經(jīng)過(guò)時(shí)或比連接器單獨(dú)的偏移量主題中的偏移量更舊,但唯一的后果是連接器重復(fù)寫(xiě)入,這在沒(méi)有啟用一次性支持的任何集群上都是可能的。如果沒(méi)有對(duì)全局偏移量主題進(jìn)行任何寫(xiě)入,則自切換到專用偏移量主題以來(lái)連接器處理的所有記錄都將在降級(jí)后重新處理,并可能導(dǎo)致大量重復(fù)項(xiàng)。雖然在技術(shù)上是允許的,因?yàn)樵谶@種情況下,用戶將故意切換到不支持一次性源連接器的 Connect 框架版本(因此容易受到記錄的重復(fù)傳遞),但這種情況下的用戶體驗(yàn)可能會(huì)受到影響。非常糟糕,因此 Connect 需要付出一些額外的努力來(lái)顯著減少這種情況下降級(jí)的影響。

如果此時(shí)需要重新升級(jí),則可能需要事先刪除任何單獨(dú)的每個(gè)連接器偏移主題。否則,工作人員將優(yōu)先考慮現(xiàn)有的單獨(dú)偏移量主題,即使該主題中的數(shù)據(jù)已過(guò)時(shí)并且全局偏移量主題中實(shí)際上存在更新的信息。

兩步降級(jí)
執(zhí)行硬降級(jí)的最安全方法是遵循與滾動(dòng)升級(jí)相同的步驟,但相反。具體來(lái)說(shuō):

  • 執(zhí)行初始滾動(dòng)降級(jí),其中每個(gè)工作線程的正好.once.source.support 設(shè)置為 false。
  • 執(zhí)行第二次滾動(dòng)降級(jí),其中每個(gè)工作線程都被修改為使用早期版本的 Connect。

由于使?jié)L動(dòng)升級(jí)成為可能的相同邏輯,所有連接器和任務(wù)都應(yīng)繼續(xù)運(yùn)行和處理數(shù)據(jù)。

單步降級(jí)
如果以單步滾動(dòng)降級(jí)方式執(zhí)行硬降級(jí)(即每個(gè)工作線程關(guān)閉后,立即降級(jí)到較低版本的 Connect),則某些任務(wù)可能會(huì)開(kāi)始失敗,因?yàn)槠涔ぷ骶€程將無(wú)法訪問(wèn)已降級(jí)的工作人員的內(nèi)部防護(hù)端點(diǎn)。降級(jí)完成后,重新啟動(dòng)這些任務(wù)就足以讓它們?cè)俅芜\(yùn)行。

十八、附加配置屬性

這里引入的工作線程級(jí)別的正好.once.source.support和連接器級(jí)別的offsets.storage.topic、transaction.boundary、exact.once.support和transaction.boundary.interval.ms配置屬性不應(yīng)違反向后兼容性,因?yàn)樗鼈兌紟в斜A衄F(xiàn)有行為的默認(rèn)值。從技術(shù)上講,現(xiàn)有連接器可能會(huì)公開(kāi) offsets.storage.topic 配置屬性,該屬性現(xiàn)在將與新引入的框架屬性發(fā)生沖突,但風(fēng)險(xiǎn)足夠低,可以接受。

新引入的源連接器consumer.override。 -prefixed 屬性在技術(shù)上也將向后不兼容,但出于同樣的原因,這些屬性最初是為接收器連接器和類似的 Producer.override 引入的。 - 和 admin.override。 -為源連接器引入了前綴屬性,這應(yīng)該是可以接受的。風(fēng)險(xiǎn)足夠低。

十九、新的管理API

用于隔離生產(chǎn)者的新管理方法將使用現(xiàn)有協(xié)議(特別是 FindCoordinator 和 InitProducerId)來(lái)實(shí)現(xiàn),因此不需要對(duì) Kafka 二進(jìn)制協(xié)議進(jìn)行任何更改。因此,這些新方法應(yīng)該適用于支持這些協(xié)議的任何代理。

二十、未來(lái)的工作

每個(gè)連接器的粒度
我們可能希望在每個(gè)連接器的基礎(chǔ)上啟用一次。這可以通過(guò)使用“合理的生產(chǎn)者”實(shí)例化每個(gè)源任務(wù)來(lái)實(shí)現(xiàn),該生產(chǎn)者使用事務(wù) ID 寫(xiě)入 Kafka,但實(shí)際上并不在事務(wù)內(nèi)生成記錄。這樣,如果為該連接器啟用了恰好一次,則其第一輪僵尸防護(hù)將能夠隔離所有先前的生產(chǎn)者實(shí)例,即使它們不使用傳統(tǒng)的事務(wù)性生產(chǎn)者。

獨(dú)立模式支持
由于源記錄及其偏移量的原子寫(xiě)入設(shè)計(jì)依賴于存儲(chǔ)在 Kafka 主題中的源偏移量,因此獨(dú)立模式不符合條件。如果有足夠的需求,我們將來(lái)可能會(huì)將此功能添加到獨(dú)立模式中。

默認(rèn)啟用
在該功能已可用于多個(gè)版本并且看起來(lái)足夠穩(wěn)定之后,我們可能希望默認(rèn)啟用該功能。

更保守的任務(wù)圍欄
如果源分區(qū)在這些任務(wù)之間的劃分不改變,則可能沒(méi)有必要隔離上一代的所有任務(wù)。但是,不可能知道源分區(qū)到任務(wù)的實(shí)際分配,因?yàn)檫B接器沒(méi)有 API 能夠向 Connect 框架提供此信息??梢詫?duì) Connect 框架進(jìn)行擴(kuò)展以添加此 API,但留待將來(lái)工作。對(duì)于此類 API 需要考慮的一個(gè)重要問(wèn)題是,它是否會(huì)考慮連接器本身不可見(jiàn)的配置更改,例如轉(zhuǎn)換器或轉(zhuǎn)換鏈更改。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-508619.html

到了這里,關(guān)于Kafka系列之:對(duì)源連接器的的Exactly-Once支持的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【flink sql】kafka連接器

    Kafka 連接器提供從 Kafka topic 中消費(fèi)和寫(xiě)入數(shù)據(jù)的能力。 前面已經(jīng)介紹了flink sql創(chuàng)建表的語(yǔ)法及說(shuō)明:【flink sql】創(chuàng)建表 這篇博客聊聊怎么通過(guò)flink sql連接kafka 以下的連接器元數(shù)據(jù)可以在表定義中通過(guò)元數(shù)據(jù)列的形式獲取。 R/W 列定義了一個(gè)元數(shù)據(jù)是可讀的(R)還是可寫(xiě)的(

    2024年02月08日
    瀏覽(22)
  • Flink系列之:Elasticsearch SQL 連接器

    Sink: Batch Sink: Streaming Append Upsert Mode Elasticsearch 連接器允許將數(shù)據(jù)寫(xiě)入到 Elasticsearch 引擎的索引中。本文檔描述運(yùn)行 SQL 查詢時(shí)如何設(shè)置 Elasticsearch 連接器。 連接器可以工作在 upsert 模式,使用 DDL 中定義的主鍵與外部系統(tǒng)交換 UPDATE/DELETE 消息。 如果 DDL 中沒(méi)有定義主鍵,那么

    2024年02月04日
    瀏覽(22)
  • Flink系列之:JDBC SQL 連接器

    Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Append Upsert Mode JDBC 連接器允許使用 JDBC 驅(qū)動(dòng)向任意類型的關(guān)系型數(shù)據(jù)庫(kù)讀取或者寫(xiě)入數(shù)據(jù)。本文檔描述了針對(duì)關(guān)系型數(shù)據(jù)庫(kù)如何通過(guò)建立 JDBC 連接器來(lái)執(zhí)行 SQL 查詢。 如果在 DDL 中定義了主鍵,JDBC sink 將以 upsert 模式與外

    2024年02月02日
    瀏覽(24)
  • Semantic Kernel 入門(mén)系列:?Connector連接器

    Semantic Kernel 入門(mén)系列:?Connector連接器

    當(dāng)我們使用Native Function的時(shí)候,除了處理一些基本的邏輯操作之外,更多的還是需要進(jìn)行外部數(shù)據(jù)源和服務(wù)的對(duì)接,要么是獲取相關(guān)的數(shù)據(jù),要么是保存輸出結(jié)果。這一過(guò)程在Semantic Kernel中可以被歸類為Connector。 Connector更像是一種設(shè)計(jì)模式,并不像Function和Memory 一樣有強(qiáng)制和

    2023年04月15日
    瀏覽(26)
  • Debezium日常分享系列之:向 Debezium 連接器發(fā)送信號(hào)

    Debezium日常分享系列之:向 Debezium 連接器發(fā)送信號(hào)

    Debezium 信號(hào)機(jī)制提供了一種修改連接器行為或觸發(fā)一次性操作(例如啟動(dòng)表的臨時(shí)快照)的方法。要使用信號(hào)觸發(fā)連接器執(zhí)行指定操作,可以將連接器配置為使用以下一個(gè)或多個(gè)通道: 源信號(hào)通道:可以發(fā)出 SQL 命令將信號(hào)消息添加到專門(mén)的信令數(shù)據(jù)集合中。在源數(shù)據(jù)庫(kù)上創(chuàng)

    2024年02月03日
    瀏覽(24)
  • Debezium日常分享系列之:使用 Debezium 連接器實(shí)現(xiàn)密鑰外部化

    隱藏?cái)?shù)據(jù)庫(kù)的賬號(hào)和密碼 當(dāng) Debezium 連接器部署到 Kafka Connect 實(shí)例時(shí),有時(shí)需要對(duì) Connect API 的其他用戶隱藏?cái)?shù)據(jù)庫(kù)憑據(jù)。 讓我們回顧一下 MySQL Debezium connector的連接器注冊(cè)請(qǐng)求: 用戶名和密碼以純字符串形式傳遞給 API。更糟糕的是,任何有權(quán)訪問(wèn) Kafka Connect 集群及其 REST AP

    2024年02月16日
    瀏覽(22)
  • Flink系列之:Flink CDC深入了解MySQL CDC連接器

    Flink系列之:Flink CDC深入了解MySQL CDC連接器

    增量快照讀取是一種讀取表快照的新機(jī)制。與舊的快照機(jī)制相比,增量快照具有許多優(yōu)點(diǎn),包括: (1)在快照讀取期間,Source 支持并發(fā)讀取 (2)在快照讀取期間,Source 支持進(jìn)行 chunk 粒度的 checkpoint (3)在快照讀取之前,Source 不需要數(shù)據(jù)庫(kù)鎖權(quán)限。 如果希望 source 并行運(yùn)

    2024年02月02日
    瀏覽(30)
  • 16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫(xiě)外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)

    16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫(xiě)外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月10日
    瀏覽(27)
  • 滿足新能源三電系統(tǒng)氣密和電性能測(cè)試的E10系列多功能電連接器

    滿足新能源三電系統(tǒng)氣密和電性能測(cè)試的E10系列多功能電連接器

    在新能源汽車(chē)的測(cè)試領(lǐng)域中,三電系統(tǒng)的測(cè)試是質(zhì)量管控過(guò)程中非常重要的組成部分。無(wú)論是防水防塵的氣密性測(cè)試,還是EOL/DCR等電性能相關(guān)的測(cè)試,都是確保新能源汽車(chē)正常工作中不可缺少的一部分。 在以往的測(cè)試中,每種測(cè)試都是獨(dú)立的。每個(gè)測(cè)試工序中,都需要獨(dú)立

    2024年02月07日
    瀏覽(18)
  • 汽車(chē)連接器接線端子和多芯線束連接界面

    汽車(chē)連接器接線端子和多芯線束連接界面

    冷壓接的開(kāi)式壓接和閉式壓接以及熱壓接的超聲波焊接對(duì)汽車(chē)連接器接線端子和多芯線束連接界面 連接器接線端子和多芯線束的連接是電子線束行業(yè),特別是汽車(chē)行業(yè)常用的導(dǎo)線連接方式。汽車(chē)整車(chē)線束又由許多分支線束組成,而分支線束必須通過(guò)連接器實(shí)現(xiàn)連接,連接核心

    2024年01月19日
    瀏覽(16)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包