1 Security
在Kafka的官方文檔中介紹了如下幾種安全方式,以滿足用戶不同場(chǎng)景對(duì)安全的需求。
● SASL/GSSAPI (Kerberos) - starting at version 0.9.0.0。主要是為 Kerberos 使用,如果當(dāng)前已有 Kerberos 認(rèn)證,只需要為集群中每個(gè) Broker 和訪問(wèn)用戶申請(qǐng) Principle ,然后在 Kafka 配置文件中開啟 Kerberos 的支持即可。官方文檔詳見(jiàn) Authentication using SASL/Kerberos。
● SASL/PLAIN - starting at version 0.10.0.0。一個(gè)簡(jiǎn)單的用戶名和密碼身份認(rèn)證機(jī)制,通常與 TLS/SSL 一起用于加密,以實(shí)現(xiàn)身份驗(yàn)證。是一種比較容易使用的方式,但是也有一個(gè)很明顯的缺點(diǎn),這種方式會(huì)把用戶賬戶文件配置到靜態(tài)文件中,每次想要添加新的賬戶都需要重啟 Kafka 去加載靜態(tài)文件,才能生效,十分不方便。官方文檔詳見(jiàn) Authentication using SASL/PLAIN。
● SASL/SCRAM-SHA-256 和 SASL/SCRAM-SHA-512 - starting at version 0.10.2.0。通過(guò)將認(rèn)證信息保存在 ZooKeeper 里面,從而動(dòng)態(tài)的獲取用戶信息,相當(dāng)于把 ZK 用作一個(gè)認(rèn)證中心使用。這種認(rèn)證可以在使用過(guò)程中,使用 Kafka 提供的命令動(dòng)態(tài)地創(chuàng)建和刪除用戶,無(wú)需重啟整個(gè)集群,十分方便。官方文檔詳見(jiàn) Authentication using SASL/SCRAM 。
● SASL/OAUTHBEARER - starting at version 2.0。 Kafka 引入的新認(rèn)證機(jī)制,主要是為了實(shí)現(xiàn)與 OAuth2 框架的集成,Kafka 不提倡單純使用 OAUTHBEARER,因?yàn)樗傻牟话踩?Json Web Token,必須配以 SSL 加密才能在生產(chǎn)環(huán)境中使用。官方文檔詳見(jiàn) Authentication using SASL/OAUTHBEARER 。
2 SASL+ACL實(shí)現(xiàn)用戶及權(quán)限認(rèn)證
以 Kafka 3.3.1 為例。
2.1 下載
wget https://repo.huaweicloud.com/apache/kafka/3.3.1/kafka_2.12-3.3.1.tgz -P /data
cd /data
tar -zxf kafka_2.12-3.3.1.tgz
ln -s kafka_2.12-3.3.1 kafka
cd /data/kafka
2.2 Kafka服務(wù)配置
Kafka server 的 jaas 文件如下 /data/kafka/config/jaas/kafka_server_jaas.conf
。這里創(chuàng)建五個(gè)用戶,密碼分別如下,后會(huì)會(huì)對(duì)其進(jìn)行分別授權(quán)。
用戶 | 密碼 |
---|---|
admin | admin123 |
kwriter | kwriter123 |
kreader | kreader123 |
sharga | sharga123 |
shargb | shargb123 |
mkdir /data/kafka/config/jaas/
vim /data/kafka/config/jaas/kafka_server_jaas.conf
配置如下:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin123"
user_admin="admin123"
user_kwriter="kwriter123"
user_kreader="kreader123"
user_sharga="sharga123"
user_shargb="shargb123";
};
2.3 修改Kafka 服務(wù)啟動(dòng)腳本
1 復(fù)制一份作為帶安全的啟動(dòng)腳本
cp /data/kafka/bin/kafka-server-start.sh /data/kafka/bin/sasl_kafka-server-start.sh
2 修改大概 44 行為如下,引入上面的 jaas.conf 文件,修改為如下
export LOG_DIR=/var/log/kafka-logs
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/kafka/config/jaas/kafka_server_jaas.conf"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
2.4 配置server.properties
拷貝一份 cp /data/kafka/config/server.properties /data/kafka/config/jaas/server.properties
,配置如下:
broker.id=0
listeners=SASL_PLAINTEXT://192.168.179.3:9092
advertised.listeners=SASL_PLAINTEXT://192.168.179.3:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# ACL 入口類
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 設(shè)置 admin 用戶為超級(jí)管理員
super.users=User:admin
auto.create.topics.enable=true
log.dirs=/data/kafka/data/kafka-logs
zookeeper.connect=192.168.179.3:2181/kafka
創(chuàng)建需要的目錄
mkdir -p /data/kafka/data/{kafka-logs,zookeeper}
2.5 啟動(dòng)Zookeeper
ZK服務(wù)地址可以使用Kafka自帶的,也可以使用已部署在的ZK。如果已有ZK可以跳過(guò)這一步。這里以Kafka自帶的為例。
配置 zookeeper.properties, vim /data/kafka/config/zookeeper.properties
:
dataDir=/data/kafka/data/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
啟動(dòng)ZK
/data/kafka/bin/zookeeper-server-start.sh -daemon /data/kafka/config/zookeeper.properties
查看日志:
tail -f /var/log/zookeeper/server.log
可以通過(guò)如下命令進(jìn)一步查看 znode 信息
/data/kafka/bin/zookeeper-shell.sh 192.168.179.3:2181
2.6 啟動(dòng)Kafka 集群
# 1 啟動(dòng) Kafka
/data/kafka/bin/sasl_kafka-server-start.sh -daemon /data/kafka/config/jaas/server.properties
# 2 查看啟動(dòng)日志
tail -f /var/log/kafka-logs/server.log
關(guān)閉的命令如下:
/data/kafka/bin/kafka-server-stop.sh
/data/kafka/bin/zookeeper-server-stop.sh
如果需要調(diào)整Kafka JVM參數(shù),可以如下進(jìn)行修改,調(diào)整期堆大小,或者其它參數(shù)
vim /data/kafka/bin/sasl_kafka-server-start.sh +28
2.7 ACL
接下來(lái)我們對(duì)用戶進(jìn)行授權(quán),大概按照如下進(jìn)行設(shè)置。
用戶 | 密碼 | 權(quán)限 | 說(shuō)明 |
---|---|---|---|
admin | admin123 | All | 超級(jí)管理員用戶 |
kwriter | kwriter123 | producer | 生產(chǎn)者類權(quán)限,Writer、Create、Describe |
kreader | kreader123 | consumer | 消費(fèi)者類權(quán)限,Read、Describe |
sharga | sharga123 | All | 僅對(duì)sharga-* 類Topic有所有權(quán) |
shargb | shargb123 | All | 僅對(duì)shargb-* 類Topic有所有權(quán) |
# 1 授予kwriter用戶為生產(chǎn)者權(quán)限
/data/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.179.3:2181/kafka \
--add --allow-principal User:kwriter --producer --topic "*"
# 2 授予kreader 用戶為消費(fèi)者權(quán)限
/data/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.179.3:2181/kafka \
--add --allow-principal User:kreader --consumer --topic "*" --group "*"
# 3 對(duì)sharga用戶進(jìn)行授權(quán),前綴模式
/data/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.179.3:2181/kafka \
--add --allow-principal User:sharga --group "*" --topic "sharga" --resource-pattern-type prefixed
# 4 對(duì)shargb用戶進(jìn)行授權(quán),前綴模式
/data/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.179.3:2181/kafka \
--add --allow-principal User:shargb --group "*" --topic "shargb" --resource-pattern-type prefixed
# 5 查看賦權(quán)
/data/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.179.3:2181/kafka --list
小的測(cè)試驗(yàn)證(在沒(méi)有使用正確用戶時(shí),執(zhí)行如下會(huì)失?。?/p>
# 1 創(chuàng)建 Topic(客戶端沒(méi)有使用認(rèn)證是,創(chuàng)建topic會(huì)失?。?/span>
/data/kafka/bin/kafka-topics.sh --create --bootstrap-server node01:9092 \
--partitions 1 --replication-factor 1 --topic test_sasl
# 2 查看 topic(客戶端沒(méi)有使用認(rèn)證是,獲取topic會(huì)失?。?/span>
/data/kafka/bin/kafka-topics.sh --list --bootstrap-server node01:9092 | grep test
# 3 查看 topic詳情(客戶端沒(méi)有使用認(rèn)證是,獲取topic詳情會(huì)失敗)
/data/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.179.3:9092 \
--describe --topic test_sasl
# 4 生產(chǎn)者(此時(shí)會(huì)因?yàn)闄?quán)限問(wèn)題,而報(bào)錯(cuò),下面來(lái)解決這個(gè)問(wèn)題)
/data/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.179.3:9092 \
--topic test_sasl
# 5 消費(fèi)者(此時(shí)會(huì)因?yàn)闄?quán)限問(wèn)題,而報(bào)錯(cuò),下面來(lái)解決這個(gè)問(wèn)題)
/data/kafka/bin/kafka-console-consumer.sh --bootstrap-server node01:9092 \
--from-beginning --topic test_sasl
2.7.1 admin
新增admin配置屬性文件 vim /data/kafka/config/jaas/admin.properties
,內(nèi)容如下:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin123";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
以 admin 身份執(zhí)行如下命令
# 1 再次創(chuàng)建 topic,此時(shí)可以成功創(chuàng)建
/data/kafka/bin/kafka-topics.sh --create --bootstrap-server node01:9092 \
--command-config /data/kafka/config/jaas/admin.properties \
--partitions 1 --replication-factor 1 --topic test_sasl
# 2 list
/data/kafka/bin/kafka-topics.sh --list --bootstrap-server node01:9092 \
--command-config /data/kafka/config/jaas/admin.properties
# 3 topic詳新信息
/data/kafka/bin/kafka-topics.sh --bootstrap-server node01:9092 \
--command-config /data/kafka/config/jaas/admin.properties \
--describe --topic test_sasl
如果需要?jiǎng)h除Topic,可使用如下命令
/data/kafka/bin/kafka-topics.sh --bootstrap-server node01:9092 \
--command-config /data/kafka/config/jaas/admin.properties \
--delete --topic test_sasl
其它(可以執(zhí)行完下面生產(chǎn)者或消費(fèi)者命令后再執(zhí)行查看)
# 1 查看當(dāng)前的消費(fèi)者組
/data/kafka/bin/kafka-consumer-groups.sh --bootstrap-server node01:9092 --command-config \
/data/kafka/config/jaas/admin.properties --list
# 2 查看消費(fèi)者消費(fèi)進(jìn)度
/data/kafka/bin/kafka-consumer-groups.sh --bootstrap-server node01:9092 --command-config \
/data/kafka/config/jaas/admin.properties --describe --group test-consumer-group
# 3 查看 Kafka Topic 的 offset 信息
/data/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server node01:9092 \
--command-config /data/kafka/config/jaas/admin.properties --time -1 --topic test_sasl
2.7.2 生產(chǎn)者
新增生產(chǎn)者kwrite用戶的配置屬性文件 vim /data/kafka/config/jaas/kwriter.properties
,內(nèi)容如下:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kwriter" password="kwriter123";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
使用客戶端命令行測(cè)試
/data/kafka/bin/kafka-console-producer.sh --bootstrap-server node01:9092 \
--producer.config /data/kafka/config/jaas/kwriter.properties --topic test_sasl
發(fā)送如下的數(shù)據(jù):
{"id":100,"name":"one","age":1}
{"id":101,"name":"two","age":2}
{"id":102,"name":"three","age":3}
2.7.3 消費(fèi)者
新增生產(chǎn)者kreader用戶的配置屬性文件 vim /data/kafka/config/jaas/kreader.properties
,內(nèi)容如下:
group.id=test-consumer-group
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kreader" password="kreader123";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
消費(fèi)者測(cè)試:
/data/kafka/bin/kafka-console-consumer.sh --bootstrap-server node01:9092 \
--consumer.config /data/kafka/config/jaas/kreader.properties --from-beginning --topic test_sasl
2.7.4 sharga用戶
新增生產(chǎn)者sharga用戶的配置屬性文件 vim /data/kafka/config/jaas/sharga.properties
,內(nèi)容如下:
group.id=test-sharga-group
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="sharga" password="sharga123";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
執(zhí)行如下命令:
# 1 創(chuàng)建自己的topic
## sharga-t01
/data/kafka/bin/kafka-topics.sh --create --bootstrap-server node01:9092 --command-config \
/data/kafka/config/jaas/sharga.properties --partitions 1 --replication-factor 1 --topic sharga-t01
# 2 生產(chǎn)者
/data/kafka/bin/kafka-console-producer.sh --bootstrap-server node01:9092 --producer.config \
/data/kafka/config/jaas/sharga.properties --topic sharga-t01
向sharga-t01的Topic發(fā)送如下消息:
{"id":100,"name":"one","age":1}
{"id":101,"name":"two","age":2}
{"id":102,"name":"three","age":3}
#3 同樣創(chuàng)建 sharga-t02 Topic
/data/kafka/bin/kafka-topics.sh --create --bootstrap-server node01:9092 --command-config \
/data/kafka/config/jaas/sharga.properties --partitions 1 --replication-factor 1 --topic sharga-t02
# 4 生產(chǎn)者
/data/kafka/bin/kafka-console-producer.sh --bootstrap-server node01:9092 --producer.config \
/data/kafka/config/jaas/sharga.properties --topic sharga-t02
向sharga-t02的Topic發(fā)送如下消息:
{"id":100,"sex":"man"}
{"id":101,"sex":"man"}
{"id":102,"sex":"woman"}
# 5 查看當(dāng)前的 topic 列表
/data/kafka/bin/kafka-topics.sh --list --bootstrap-server node01:9092 --command-config \
/data/kafka/config/jaas/sharga.properties
sharga用戶僅能查看到自己的Topic
2.7.5 shargb用戶
新增生產(chǎn)者shargb用戶的配置屬性文件 vim /data/kafka/config/jaas/shargb.properties
,內(nèi)容如下:
group.id=test-shargb-group
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="shargb" password="shargb123";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
執(zhí)行如下命令:
# 1 創(chuàng)建自己的topic
## shargb-t01
/data/kafka/bin/kafka-topics.sh --create --bootstrap-server node01:9092 --command-config \
/data/kafka/config/jaas/shargb.properties --partitions 1 --replication-factor 1 --topic shargb-t01
這里如果用 sharga用戶創(chuàng)建 shargb-t01 就會(huì)包錯(cuò)(沒(méi)有權(quán)限)
# 2 生產(chǎn)者
/data/kafka/bin/kafka-console-producer.sh --bootstrap-server node01:9092 --producer.config \
/data/kafka/config/jaas/shargb.properties --topic shargb-t01
向shargb-t01的Topic發(fā)送如下消息:
{"id":200,"name":"four","age":4}
{"id":201,"name":"five","age":5}
#3 同樣創(chuàng)建 shargb-t02 Topic
/data/kafka/bin/kafka-topics.sh --create --bootstrap-server node01:9092 --command-config \
/data/kafka/config/jaas/shargb.properties --partitions 1 --replication-factor 1 --topic shargb-t02
# 4 生產(chǎn)者
/data/kafka/bin/kafka-console-producer.sh --bootstrap-server node01:9092 --producer.config \
/data/kafka/config/jaas/shargb.properties --topic shargb-t02
向shargb-t02的Topic發(fā)送如下消息:
{"id":200,"addr":"北京"}
{"id":201,"addr":"杭州"}
# 5 查看當(dāng)前的 topic 列表
/data/kafka/bin/kafka-topics.sh --list --bootstrap-server node01:9092 --command-config \
/data/kafka/config/jaas/shargb.properties
shargb用戶同樣只能查看到自己的Topic文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-771624.html
2.7.6 說(shuō)明
這種方式是將用戶和密碼相關(guān)的信息保存在了服務(wù)器本地文件中了,經(jīng)過(guò)前面的配置,基本涉及到安全認(rèn)證的配置都放置到了/data/kafka/config/jaas
目錄下,因此如果需要在真實(shí)服務(wù)上使用時(shí),這個(gè)目錄需要經(jīng)讀寫權(quán)限詳細(xì)的分配給系統(tǒng)的每個(gè)用戶,對(duì)于不允許使用此用戶配置信息的其他用戶,這個(gè)文件的讀寫權(quán)限可以不授予給這個(gè)用戶。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-771624.html
2.8 生產(chǎn)者客戶端代碼設(shè)置
Properties props = new Properties();
//...
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='kwriter' password='kwriter123';");
2.9 消費(fèi)者客戶端代碼設(shè)置
Properties props = new Properties();
//...
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='kreader' password='kreader123';");
2.10 Spring-kafka需配置如下
spring:
#……
kafka:
bootstrap-servers: node01:9092 #指定kafka server的地址,集群配多個(gè),中間,逗號(hào)隔開
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
# sasl 安全認(rèn)證
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username='kwriter' password='kwriter123';
consumer:
group-id: app_consumer_group #群組ID
enable-auto-commit: true
auto-commit-interval: 1000s
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username='kreader' password='kreader123';
2.11 Flink SQL
create table kafka_sharga_t01(
id int,
name string,
age int
) with (
'format' = 'json',
'properties.bootstrap.servers' = '192.168.179.3:9092',
'connector' = 'kafka',
'topic' = 'sharga-t01',
'properties.group.id' = 'group1',
'scan.startup.mode' = 'earliest-offset',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="sharga" password="sharga123";',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN'
);
到了這里,關(guān)于Kafka安全(以SASL+ACL為例)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!