一、背景
KAFKA引入了每個(gè)源連接器和接收器連接器從工作線(xiàn)程屬性繼承其客戶(hù)端配置的功能。在工作線(xiàn)程屬性中,任何具有“生產(chǎn)者”或“消費(fèi)者”前綴的配置。分別應(yīng)用于所有源連接器和接收器連接器。雖然最初的提案允許覆蓋源連接器和接收器連接器,但它在允許連接器的不同配置方面仍然受到限制。通常,連接用戶(hù)希望能夠執(zhí)行以下操作:-
- 對(duì)于每個(gè)連接器使用不同的主體,以便它們可以在細(xì)粒度級(jí)別控制 ACL
- 能夠優(yōu)化每個(gè)連接器的生產(chǎn)者和消費(fèi)者配置,以便根據(jù)其性能特征設(shè)置連接器
KIP-296:客戶(hù)端配置的連接器級(jí)別可配置性旨在通過(guò)允許覆蓋所有配置來(lái)解決此問(wèn)題。但 KIP 不提供連接操作員控制連接器可以覆蓋的內(nèi)容的能力。如果沒(méi)有這種能力,連接器和工作人員之間將不會(huì)有清晰的分隔線(xiàn),因?yàn)檫B接器本身現(xiàn)在可以假設(shè)覆蓋可用。但從操作角度來(lái)看,最好執(zhí)行以下規(guī)定。
- 能夠控制可以覆蓋的配置鍵。例如管理員可能永遠(yuǎn)不希望代理端點(diǎn)被覆蓋
- 能夠控制被覆蓋的配置的允許值。這有助于管理員定義其集群的邊界并有效地管理多租戶(hù)集群。管理員可能永遠(yuǎn)不希望“send.buffer.bytes”超過(guò) 512 kb
- 能夠根據(jù)連接器類(lèi)型、客戶(hù)端類(lèi)型(管理員、生產(chǎn)者、消費(fèi)者)等控制上述內(nèi)容。
基于上述上下文,該提案允許管理員圍繞可以覆蓋的內(nèi)容定義/實(shí)施策略,與“CreateTopicPolicy”非常相似,“CreateTopicPolicy”允許和控制在主題級(jí)別指定的配置。
二、公共接口
在較高層面上,該提案旨在引入類(lèi)似于 Core Kafka 中可用的 CreateTopicPolicy 的可配置策略,用于連接器客戶(hù)端配置覆蓋。更具體地說(shuō),我們將引入一個(gè)新的工作配置,該配置將允許管理員配置連接器客戶(hù)端配置覆蓋的策略。
新配置
Connector.client.config.override.policy - 這將是連接 API 中引入的新接口 ConnectorClientConfigOverridePolicy 的實(shí)現(xiàn)。默認(rèn)值為“None”,不允許任何覆蓋。由于用戶(hù)已經(jīng)使用建議的前綴進(jìn)行配置的可能性非常小,因此向后兼容性通常不是問(wèn)題。在極少數(shù)情況下,用戶(hù)在現(xiàn)有配置中擁有這些配置,他們必須刪除配置才能使其再次工作。
可以使用以下前綴在連接器配置中指定覆蓋
-
Producer.override.
- 用于 SinkConnector 上下文中的源連接器的生產(chǎn)者和 DLQ 生產(chǎn)者 -
consumer.override.
- 用于接收器連接器 -
admin.override.
- 用于在 Sink Connector 中創(chuàng)建 DLQ 主題(KIP 還允許使用admin
前綴在工作線(xiàn)程中指定 DLQ 設(shè)置,以便與生產(chǎn)者和消費(fèi)者保持一致)
管理員可以指定 ConnectorClientConfigOverridePolicy 實(shí)現(xiàn)的完全限定類(lèi)名或別名(別名被計(jì)算為接口名稱(chēng)“ConnectorClientConfigOverridePolicy”的前綴,這正是大多數(shù)現(xiàn)有連接插件計(jì)算其別名的方式)。
新接口將被視為新的連接插件,并將通過(guò)插件路徑機(jī)制加載。這些插件將通過(guò)類(lèi)似于 RestExtension 和 ConfigProvider 的服務(wù)加載器機(jī)制來(lái)發(fā)現(xiàn)。新接口的結(jié)構(gòu)及其請(qǐng)求描述如下:-
import org.apache.kafka.common.config.ConfigValue;
/**
* <p>An interface for enforcing a policy on overriding of client configs via the connector configs.
*
* <p>Common use cases are ability to provide principal per connector, <code>sasl.jaas.config</code>
* and/or enforcing that the producer/consumer configurations for optimizations are within acceptable ranges.
*/
public interface ConnectorClientConfigOverridePolicy extends Configurable, AutoCloseable {
/**
* Worker will invoke this while constructing the producer for the SourceConnectors, DLQ for SinkConnectors and the consumer for the
* SinkConnectors to validate if all of the overridden client configurations are allowed per the
* policy implementation. This would also be invoked during the validate of connector configs via the Rest API.
*
* If there are any policy violations, the connector will not be started.
*
* @param connectorClientConfigRequest an instance of {@code ConnectorClientConfigRequest} that provides the configs to overridden and
* its context; never {@code null}
* @return List of Config, each Config should indicate if they are allowed via {@link ConfigValue#errorMessages}
*/
List<ConfigValue> validate(ConnectorClientConfigRequest connectorClientConfigRequest);
}
public class ConnectorClientConfigRequest {
private Map<String, Object> clientProps;
private ClientType clientType;
private String connectorName;
private ConnectorType connectorType;
private Class<? extends Connector> connectorClass;
public ConnectorClientConfigRequest(
String connectorName,
ConnectorType connectorType,
Class<? extends Connector> connectorClass,
Map<String, Object> clientProps,
ClientType clientType) {
this.clientProps = clientProps;
this.clientType = clientType;
this.connectorName = connectorName;
this.connectorType = connectorType;
this.connectorClass = connectorClass;
}
/**
* <pre>
* Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SOURCE}.
* Provides Config with prefix {@code consumer.override.} for {@link ConnectorType#SINK}.
* Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SINK} for DLQ.
* Provides Config with prefix {@code admin.override.} for {@link ConnectorType#SINK} for DLQ.
* </pre>
*
* @return The client properties specified in the Connector Config with prefix {@code producer.override.} ,
* {@code consumer.override.} and {@code admin.override.}. The configs returned don't include these prefixes.
*/
public Map<String, Object> clientProps() {
return clientProps;
}
/**
* <pre>
* {@link ClientType#PRODUCER} for {@link ConnectorType#SOURCE}
* {@link ClientType#CONSUMER} for {@link ConnectorType#SINK}
* {@link ClientType#PRODUCER} for DLQ in {@link ConnectorType#SINK}
* {@link ClientType#ADMIN} for DLQ Topic Creation in {@link ConnectorType#SINK}
* </pre>
*
* @return enumeration specifying the client type that is being overriden by the worker; never null.
*/
public ClientType clientType() {
return clientType;
}
/**
* Name of the connector specified in the connector config.
*
* @return name of the connector; never null.
*/
public String connectorName() {
return connectorName;
}
/**
* Type of the Connector.
*
* @return enumeration specifying the type of the connector {@link ConnectorType#SINK} or {@link ConnectorType#SOURCE}.
*/
public ConnectorType connectorType() {
return connectorType;
}
/**
* The class of the Connector.
*
* @return the class of the Connector being created; never null
*/
public Class<? extends Connector> connectorClass() {
return connectorClass;
}
public enum ClientType {
PRODUCER, CONSUMER, ADMIN;
}
}
KIP 引入了 ConnectorClientConfigOverridePolicy 的以下實(shí)現(xiàn),如下表所示
類(lèi)名 | 別名 | 描述 |
---|---|---|
AllConnectorClientConfigOverridePolicy | All | 允許覆蓋生產(chǎn)者、消費(fèi)者和管理員前綴的所有配置。 |
NoneConnectorClientConfigOverridePolicy | None | 不允許任何配置覆蓋。這將是默認(rèn)策略。 |
PrincipalConnectorClientConfigOverridePolicy | Principal | 允許覆蓋生產(chǎn)者、消費(fèi)者和管理員前綴的“security.protocol”、“sasl.jaas.config”和“sasl.mechanism”。能夠?yàn)槊總€(gè)連接器使用不同的主體。 |
由于用戶(hù)可以指定任何這些策略,因此連接器本身不應(yīng)依賴(lài)于這些配置的可用性。這些覆蓋純粹是從操作角度使用的。
當(dāng)用戶(hù)嘗試創(chuàng)建連接器或驗(yàn)證連接器時(shí),將強(qiáng)制執(zhí)行策略本身。當(dāng)任何 ConfigValue 有錯(cuò)誤消息時(shí)
- 驗(yàn)證期間,響應(yīng)將包含錯(cuò)誤,并且不符合策略的特定配置也會(huì)包含響應(yīng)中包含的錯(cuò)誤消息
- 在創(chuàng)建/更新連接器期間,連接器將無(wú)法啟動(dòng)
三、推薦的改動(dòng)
如上一節(jié)所述,設(shè)計(jì)將包括引入新的工作配置和定義覆蓋策略的接口。
工作人員將在創(chuàng)建連接器流程期間應(yīng)用該策略,如下所示。被覆蓋的配置將在沒(méi)有策略前綴的情況下傳遞:-
-
為 WorkerSourceTask 構(gòu)建生產(chǎn)者 - 使用“ Producer.override ”調(diào)用所有配置的驗(yàn)證。 prefix , ClientType=Producer, ConnectorType=Source 并覆蓋(如果沒(méi)有違反策略)
-
為 DLQ 主題構(gòu)建 DeadLetterQueueReporter 的管理客戶(hù)端和生產(chǎn)者
- 使用“ Producer.override ”調(diào)用所有配置的驗(yàn)證。 prefix , ClientType=Producer, ConnectorType=Sink 并覆蓋(如果沒(méi)有違反策略)
- 使用“admin.override”調(diào)用所有配置的驗(yàn)證。 prefix , ClientType=Admin, ConnectorType=Sink 并覆蓋(如果沒(méi)有違反策略)
-
為 WorkerSinkTask 構(gòu)建 Consumer - 使用“consumer.override”調(diào)用所有配置的驗(yàn)證。 prefix , ClientType=Consumer, ConnectorType=Sink 并覆蓋(如果沒(méi)有違反策略)
在 validate() 流程中,herder(AbstractHerder) 將按如下所示對(duì)所有覆蓋應(yīng)用該策略。被覆蓋的配置將在沒(méi)有前綴的情況下傳遞:-文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-503916.html
- 如果它是源連接器,請(qǐng)?jiān)诿總€(gè)帶有“生產(chǎn)者”的連接器配置上應(yīng)用策略。添加前綴并更新 ConfigInfos 結(jié)果(驗(yàn)證 API 的響應(yīng))
- 如果是水槽連接器,將策略應(yīng)用到每個(gè)帶有“consumer”的連接器配置上。添加前綴并更新 ConfigInfos 結(jié)果(驗(yàn)證 API 的響應(yīng))
- 使用“admin”在每個(gè)連接器配置上應(yīng)用該策略。啟用 DLQ 時(shí)添加前綴并更新 ConfigInfos 結(jié)果(驗(yàn)證 API 的響應(yīng))
四、兼容性、棄用和遷移計(jì)劃
有人擁有帶有建議前綴的連接器的可能性非常小,因此向后兼容性并不是真正的問(wèn)題。在極少數(shù)情況下,如果用戶(hù)具有帶有這些前綴的配置,他們必須刪除配置或更改策略才能使其正常工作。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-503916.html
到了這里,關(guān)于Kafka系列之:連接器客戶(hù)端配置覆蓋策略的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!