国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka系列之:連接器客戶(hù)端配置覆蓋策略

這篇具有很好參考價(jià)值的文章主要介紹了Kafka系列之:連接器客戶(hù)端配置覆蓋策略。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

一、背景

KAFKA引入了每個(gè)源連接器和接收器連接器從工作線(xiàn)程屬性繼承其客戶(hù)端配置的功能。在工作線(xiàn)程屬性中,任何具有“生產(chǎn)者”或“消費(fèi)者”前綴的配置。分別應(yīng)用于所有源連接器和接收器連接器。雖然最初的提案允許覆蓋源連接器和接收器連接器,但它在允許連接器的不同配置方面仍然受到限制。通常,連接用戶(hù)希望能夠執(zhí)行以下操作:-

  • 對(duì)于每個(gè)連接器使用不同的主體,以便它們可以在細(xì)粒度級(jí)別控制 ACL
  • 能夠優(yōu)化每個(gè)連接器的生產(chǎn)者和消費(fèi)者配置,以便根據(jù)其性能特征設(shè)置連接器

KIP-296:客戶(hù)端配置的連接器級(jí)別可配置性旨在通過(guò)允許覆蓋所有配置來(lái)解決此問(wèn)題。但 KIP 不提供連接操作員控制連接器可以覆蓋的內(nèi)容的能力。如果沒(méi)有這種能力,連接器和工作人員之間將不會(huì)有清晰的分隔線(xiàn),因?yàn)檫B接器本身現(xiàn)在可以假設(shè)覆蓋可用。但從操作角度來(lái)看,最好執(zhí)行以下規(guī)定。

  • 能夠控制可以覆蓋的配置鍵。例如管理員可能永遠(yuǎn)不希望代理端點(diǎn)被覆蓋
  • 能夠控制被覆蓋的配置的允許值。這有助于管理員定義其集群的邊界并有效地管理多租戶(hù)集群。管理員可能永遠(yuǎn)不希望“send.buffer.bytes”超過(guò) 512 kb
  • 能夠根據(jù)連接器類(lèi)型、客戶(hù)端類(lèi)型(管理員、生產(chǎn)者、消費(fèi)者)等控制上述內(nèi)容。

基于上述上下文,該提案允許管理員圍繞可以覆蓋的內(nèi)容定義/實(shí)施策略,與“CreateTopicPolicy”非常相似,“CreateTopicPolicy”允許和控制在主題級(jí)別指定的配置。

二、公共接口

在較高層面上,該提案旨在引入類(lèi)似于 Core Kafka 中可用的 CreateTopicPolicy 的可配置策略,用于連接器客戶(hù)端配置覆蓋。更具體地說(shuō),我們將引入一個(gè)新的工作配置,該配置將允許管理員配置連接器客戶(hù)端配置覆蓋的策略。

新配置

Connector.client.config.override.policy - 這將是連接 API 中引入的新接口 ConnectorClientConfigOverridePolicy 的實(shí)現(xiàn)。默認(rèn)值為“None”,不允許任何覆蓋。由于用戶(hù)已經(jīng)使用建議的前綴進(jìn)行配置的可能性非常小,因此向后兼容性通常不是問(wèn)題。在極少數(shù)情況下,用戶(hù)在現(xiàn)有配置中擁有這些配置,他們必須刪除配置才能使其再次工作。

可以使用以下前綴在連接器配置中指定覆蓋

  • Producer.override. - 用于 SinkConnector 上下文中的源連接器的生產(chǎn)者和 DLQ 生產(chǎn)者
  • consumer.override. - 用于接收器連接器
  • admin.override. - 用于在 Sink Connector 中創(chuàng)建 DLQ 主題(KIP 還允許使用 admin 前綴在工作線(xiàn)程中指定 DLQ 設(shè)置,以便與生產(chǎn)者和消費(fèi)者保持一致)

管理員可以指定 ConnectorClientConfigOverridePolicy 實(shí)現(xiàn)的完全限定類(lèi)名或別名(別名被計(jì)算為接口名稱(chēng)“ConnectorClientConfigOverridePolicy”的前綴,這正是大多數(shù)現(xiàn)有連接插件計(jì)算其別名的方式)。

新接口將被視為新的連接插件,并將通過(guò)插件路徑機(jī)制加載。這些插件將通過(guò)類(lèi)似于 RestExtension 和 ConfigProvider 的服務(wù)加載器機(jī)制來(lái)發(fā)現(xiàn)。新接口的結(jié)構(gòu)及其請(qǐng)求描述如下:-

import org.apache.kafka.common.config.ConfigValue;
 
/**
 * <p>An interface for enforcing a policy on overriding of client configs via the connector configs.
 *
 * <p>Common use cases are ability to provide principal per connector, <code>sasl.jaas.config</code>
 * and/or enforcing that the producer/consumer configurations for optimizations are within acceptable ranges.
 */
public interface ConnectorClientConfigOverridePolicy extends Configurable, AutoCloseable {
 
 
    /**
     * Worker will invoke this while constructing the producer for the SourceConnectors,  DLQ for SinkConnectors and the consumer for the
     * SinkConnectors to validate if all of the overridden client configurations are allowed per the
     * policy implementation. This would also be invoked during the validate of connector configs via the Rest API.
     *
     * If there are any policy violations, the connector will not be started.
     *
     * @param connectorClientConfigRequest an instance of {@code ConnectorClientConfigRequest} that provides the configs to overridden and
     *                                     its context; never {@code null}
     * @return List of Config, each Config should indicate if they are allowed via {@link ConfigValue#errorMessages}
     */
    List<ConfigValue> validate(ConnectorClientConfigRequest connectorClientConfigRequest);
}
public class ConnectorClientConfigRequest {
 
    private Map<String, Object> clientProps;
    private ClientType  clientType;
    private String connectorName;
    private ConnectorType connectorType;
    private Class<? extends Connector> connectorClass;
 
    public ConnectorClientConfigRequest(
        String connectorName,
        ConnectorType connectorType,
        Class<? extends Connector> connectorClass,
        Map<String, Object> clientProps,
        ClientType clientType) {
        this.clientProps = clientProps;
        this.clientType = clientType;
        this.connectorName = connectorName;
        this.connectorType = connectorType;
        this.connectorClass = connectorClass;
    }
 
    /**
     * <pre>
     * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SOURCE}.
     * Provides Config with prefix {@code consumer.override.} for {@link ConnectorType#SINK}.
     * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SINK} for DLQ.
     * Provides Config with prefix {@code admin.override.} for {@link ConnectorType#SINK} for DLQ.
     * </pre>
     *
     * @return The client properties specified in the Connector Config with prefix {@code producer.override.} ,
     * {@code consumer.override.} and {@code admin.override.}. The configs returned don't include these prefixes.
     */
    public Map<String, Object> clientProps() {
        return clientProps;
    }
 
    /**
     * <pre>
     * {@link ClientType#PRODUCER} for {@link ConnectorType#SOURCE}
     * {@link ClientType#CONSUMER} for {@link ConnectorType#SINK}
     * {@link ClientType#PRODUCER} for DLQ in {@link ConnectorType#SINK}
     * {@link ClientType#ADMIN} for DLQ  Topic Creation in {@link ConnectorType#SINK}
     * </pre>
     *
     * @return enumeration specifying the client type that is being overriden by the worker; never null.
     */
    public ClientType clientType() {
        return clientType;
    }
 
    /**
     * Name of the connector specified in the connector config.
     *
     * @return name of the connector; never null.
     */
    public String connectorName() {
        return connectorName;
    }
 
    /**
     * Type of the Connector.
     *
     * @return enumeration specifying the type of the connector {@link ConnectorType#SINK} or {@link ConnectorType#SOURCE}.
     */
    public ConnectorType connectorType() {
        return connectorType;
    }
 
    /**
     * The class of the Connector.
     *
     * @return the class of the Connector being created; never null
     */
    public Class<? extends Connector> connectorClass() {
        return connectorClass;
    }
 
    public enum ClientType {
        PRODUCER, CONSUMER, ADMIN;
    }
}

KIP 引入了 ConnectorClientConfigOverridePolicy 的以下實(shí)現(xiàn),如下表所示

類(lèi)名 別名 描述
AllConnectorClientConfigOverridePolicy All 允許覆蓋生產(chǎn)者、消費(fèi)者和管理員前綴的所有配置。
NoneConnectorClientConfigOverridePolicy None 不允許任何配置覆蓋。這將是默認(rèn)策略。
PrincipalConnectorClientConfigOverridePolicy Principal 允許覆蓋生產(chǎn)者、消費(fèi)者和管理員前綴的“security.protocol”、“sasl.jaas.config”和“sasl.mechanism”。能夠?yàn)槊總€(gè)連接器使用不同的主體。

由于用戶(hù)可以指定任何這些策略,因此連接器本身不應(yīng)依賴(lài)于這些配置的可用性。這些覆蓋純粹是從操作角度使用的。

當(dāng)用戶(hù)嘗試創(chuàng)建連接器或驗(yàn)證連接器時(shí),將強(qiáng)制執(zhí)行策略本身。當(dāng)任何 ConfigValue 有錯(cuò)誤消息時(shí)

  • 驗(yàn)證期間,響應(yīng)將包含錯(cuò)誤,并且不符合策略的特定配置也會(huì)包含響應(yīng)中包含的錯(cuò)誤消息
  • 在創(chuàng)建/更新連接器期間,連接器將無(wú)法啟動(dòng)

三、推薦的改動(dòng)

如上一節(jié)所述,設(shè)計(jì)將包括引入新的工作配置和定義覆蓋策略的接口。

工作人員將在創(chuàng)建連接器流程期間應(yīng)用該策略,如下所示。被覆蓋的配置將在沒(méi)有策略前綴的情況下傳遞:-

  • 為 WorkerSourceTask 構(gòu)建生產(chǎn)者 - 使用“ Producer.override ”調(diào)用所有配置的驗(yàn)證。 prefix , ClientType=Producer, ConnectorType=Source 并覆蓋(如果沒(méi)有違反策略)

  • 為 DLQ 主題構(gòu)建 DeadLetterQueueReporter 的管理客戶(hù)端和生產(chǎn)者

    • 使用“ Producer.override ”調(diào)用所有配置的驗(yàn)證。 prefix , ClientType=Producer, ConnectorType=Sink 并覆蓋(如果沒(méi)有違反策略)
    • 使用“admin.override”調(diào)用所有配置的驗(yàn)證。 prefix , ClientType=Admin, ConnectorType=Sink 并覆蓋(如果沒(méi)有違反策略)
  • 為 WorkerSinkTask 構(gòu)建 Consumer - 使用“consumer.override”調(diào)用所有配置的驗(yàn)證。 prefix , ClientType=Consumer, ConnectorType=Sink 并覆蓋(如果沒(méi)有違反策略)

在 validate() 流程中,herder(AbstractHerder) 將按如下所示對(duì)所有覆蓋應(yīng)用該策略。被覆蓋的配置將在沒(méi)有前綴的情況下傳遞:-

  • 如果它是源連接器,請(qǐng)?jiān)诿總€(gè)帶有“生產(chǎn)者”的連接器配置上應(yīng)用策略。添加前綴并更新 ConfigInfos 結(jié)果(驗(yàn)證 API 的響應(yīng))
  • 如果是水槽連接器,將策略應(yīng)用到每個(gè)帶有“consumer”的連接器配置上。添加前綴并更新 ConfigInfos 結(jié)果(驗(yàn)證 API 的響應(yīng))
  • 使用“admin”在每個(gè)連接器配置上應(yīng)用該策略。啟用 DLQ 時(shí)添加前綴并更新 ConfigInfos 結(jié)果(驗(yàn)證 API 的響應(yīng))

四、兼容性、棄用和遷移計(jì)劃

有人擁有帶有建議前綴的連接器的可能性非常小,因此向后兼容性并不是真正的問(wèn)題。在極少數(shù)情況下,如果用戶(hù)具有帶有這些前綴的配置,他們必須刪除配置或更改策略才能使其正常工作。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-503916.html

到了這里,關(guān)于Kafka系列之:連接器客戶(hù)端配置覆蓋策略的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀(guān)點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【flink sql】kafka連接器

    Kafka 連接器提供從 Kafka topic 中消費(fèi)和寫(xiě)入數(shù)據(jù)的能力。 前面已經(jīng)介紹了flink sql創(chuàng)建表的語(yǔ)法及說(shuō)明:【flink sql】創(chuàng)建表 這篇博客聊聊怎么通過(guò)flink sql連接kafka 以下的連接器元數(shù)據(jù)可以在表定義中通過(guò)元數(shù)據(jù)列的形式獲取。 R/W 列定義了一個(gè)元數(shù)據(jù)是可讀的(R)還是可寫(xiě)的(

    2024年02月08日
    瀏覽(22)
  • 成集云 | 抖店連接器客戶(hù)靜默下單催付數(shù)據(jù)同步釘釘 | 解決方案

    成集云 | 抖店連接器客戶(hù)靜默下單催付數(shù)據(jù)同步釘釘 | 解決方案

    源系統(tǒng) 成集云 目標(biāo)系統(tǒng) 隨著各品牌全渠道鋪貨,主播在平臺(tái)上直播時(shí)客戶(hù)下了訂單后不能及時(shí)付款,第一時(shí)間客戶(hù)收不到提醒,不僅造成了客戶(hù)付款率下降,更大量消耗了企業(yè)的人力成本和經(jīng)濟(jì)。而成集云與釘釘深度合作,企業(yè)可以通過(guò)成集云-抖店連接器將電商平臺(tái)的數(shù)據(jù)

    2024年02月11日
    瀏覽(14)
  • Flink系列之:Elasticsearch SQL 連接器

    Sink: Batch Sink: Streaming Append Upsert Mode Elasticsearch 連接器允許將數(shù)據(jù)寫(xiě)入到 Elasticsearch 引擎的索引中。本文檔描述運(yùn)行 SQL 查詢(xún)時(shí)如何設(shè)置 Elasticsearch 連接器。 連接器可以工作在 upsert 模式,使用 DDL 中定義的主鍵與外部系統(tǒng)交換 UPDATE/DELETE 消息。 如果 DDL 中沒(méi)有定義主鍵,那么

    2024年02月04日
    瀏覽(22)
  • Flink系列之:JDBC SQL 連接器

    Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Append Upsert Mode JDBC 連接器允許使用 JDBC 驅(qū)動(dòng)向任意類(lèi)型的關(guān)系型數(shù)據(jù)庫(kù)讀取或者寫(xiě)入數(shù)據(jù)。本文檔描述了針對(duì)關(guān)系型數(shù)據(jù)庫(kù)如何通過(guò)建立 JDBC 連接器來(lái)執(zhí)行 SQL 查詢(xún)。 如果在 DDL 中定義了主鍵,JDBC sink 將以 upsert 模式與外

    2024年02月02日
    瀏覽(24)
  • Semantic Kernel 入門(mén)系列:?Connector連接器

    Semantic Kernel 入門(mén)系列:?Connector連接器

    當(dāng)我們使用Native Function的時(shí)候,除了處理一些基本的邏輯操作之外,更多的還是需要進(jìn)行外部數(shù)據(jù)源和服務(wù)的對(duì)接,要么是獲取相關(guān)的數(shù)據(jù),要么是保存輸出結(jié)果。這一過(guò)程在Semantic Kernel中可以被歸類(lèi)為Connector。 Connector更像是一種設(shè)計(jì)模式,并不像Function和Memory 一樣有強(qiáng)制和

    2023年04月15日
    瀏覽(26)
  • Debezium日常分享系列之:向 Debezium 連接器發(fā)送信號(hào)

    Debezium日常分享系列之:向 Debezium 連接器發(fā)送信號(hào)

    Debezium 信號(hào)機(jī)制提供了一種修改連接器行為或觸發(fā)一次性操作(例如啟動(dòng)表的臨時(shí)快照)的方法。要使用信號(hào)觸發(fā)連接器執(zhí)行指定操作,可以將連接器配置為使用以下一個(gè)或多個(gè)通道: 源信號(hào)通道:可以發(fā)出 SQL 命令將信號(hào)消息添加到專(zhuān)門(mén)的信令數(shù)據(jù)集合中。在源數(shù)據(jù)庫(kù)上創(chuàng)

    2024年02月03日
    瀏覽(25)
  • Kafka系列 - 生產(chǎn)者客戶(hù)端架構(gòu)以及3個(gè)重要參數(shù)

    Kafka系列 - 生產(chǎn)者客戶(hù)端架構(gòu)以及3個(gè)重要參數(shù)

    整個(gè)生產(chǎn)者客戶(hù)端由兩個(gè)縣城協(xié)調(diào)運(yùn)行,這兩個(gè)線(xiàn)程分別為主線(xiàn)程和Sender線(xiàn)程(發(fā)送線(xiàn)程)。 主線(xiàn)程中由KafkaProducer創(chuàng)建消息,然后通過(guò)可能的攔截器,序列化器和分區(qū)器之后緩存到 消息累加器(RecordAccumulator) 。Sender線(xiàn)程負(fù)責(zé)從RecordAccumulator中獲取消息并將其發(fā)送到kafka中。

    2024年02月04日
    瀏覽(22)
  • 【Kafka源碼走讀】Admin接口的客戶(hù)端與服務(wù)端的連接流程

    【Kafka源碼走讀】Admin接口的客戶(hù)端與服務(wù)端的連接流程

    注:本文對(duì)應(yīng)的kafka的源碼的版本是trunk分支。寫(xiě)這篇文章的主要目的是當(dāng)作自己閱讀源碼之后的筆記,寫(xiě)的有點(diǎn)凌亂,還望大佬們海涵,多謝! 最近在寫(xiě)一個(gè)Web版的kafka客戶(hù)端工具,然后查看Kafka官網(wǎng),發(fā)現(xiàn)想要與Server端建立連接,只需要執(zhí)行 方法即可,但其內(nèi)部是如何工作

    2024年02月16日
    瀏覽(27)
  • Debezium日常分享系列之:使用 Debezium 連接器實(shí)現(xiàn)密鑰外部化

    隱藏?cái)?shù)據(jù)庫(kù)的賬號(hào)和密碼 當(dāng) Debezium 連接器部署到 Kafka Connect 實(shí)例時(shí),有時(shí)需要對(duì) Connect API 的其他用戶(hù)隱藏?cái)?shù)據(jù)庫(kù)憑據(jù)。 讓我們回顧一下 MySQL Debezium connector的連接器注冊(cè)請(qǐng)求: 用戶(hù)名和密碼以純字符串形式傳遞給 API。更糟糕的是,任何有權(quán)訪(fǎng)問(wèn) Kafka Connect 集群及其 REST AP

    2024年02月16日
    瀏覽(22)
  • Flink系列之:Flink CDC深入了解MySQL CDC連接器

    Flink系列之:Flink CDC深入了解MySQL CDC連接器

    增量快照讀取是一種讀取表快照的新機(jī)制。與舊的快照機(jī)制相比,增量快照具有許多優(yōu)點(diǎn),包括: (1)在快照讀取期間,Source 支持并發(fā)讀取 (2)在快照讀取期間,Source 支持進(jìn)行 chunk 粒度的 checkpoint (3)在快照讀取之前,Source 不需要數(shù)據(jù)庫(kù)鎖權(quán)限。 如果希望 source 并行運(yùn)

    2024年02月02日
    瀏覽(30)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包