Contents
Kafka Cluster Configuration
Prerequisites
Configuration Steps
JAAS (Java Authentication and Authorization Service) Files
ZooKeeper Configuration File
Self-Signed SSL
Starting the ZooKeeper Cluster
Starting the Kafka Cluster
spring-cloud-starter-bus-kafka Integration
Kafka Cluster Configuration
Prerequisites
Download the same version of the Kafka distribution to each of the three servers.
This article uses kafka_2.13-3.5.0.tgz (download link).
The JDK is Adopt JDK 17, OpenJDK17U-jdk_x64_linux_hotspot_17.0.7_7.tar.gz (download link).
Configuration Steps
JAAS (Java Authentication and Authorization Service) Files
In the config directory of the extracted Kafka package, create the JAAS file that ZooKeeper needs for authentication. The file name is arbitrary as long as it ends in .conf.
The file content is as follows.
user_{username} is fixed syntax: {username} is the user name, and the quoted value is the password.
Note that this ZooKeeper JAAS file defines three username/password pairs, one for each of the three Kafka brokers to use when authenticating against ZooKeeper. The ZooKeeper JAAS file should be identical on all three machines.
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_super="super-sec"
user_kafkabroker1="kafkabroker1-sec"
user_kafkabroker2="kafkabroker2-sec"
user_kafkabroker3="kafkabroker3-sec";
};
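To make the user_{username} convention concrete, the Server section above can be generated from a map of credentials. This is a hypothetical helper for illustration only, not part of Kafka's or ZooKeeper's tooling:

```python
def zk_jaas_server_section(users):
    """Render a ZooKeeper JAAS Server section from a {username: password} map.

    Each entry becomes a user_<name>="<password>" line; the final option
    line carries the semicolon that JAAS requires before the closing brace.
    """
    lines = [
        "Server {",
        "org.apache.zookeeper.server.auth.DigestLoginModule required",
    ]
    entries = [f'user_{name}="{password}"' for name, password in users.items()]
    entries[-1] += ";"  # JAAS terminates the option list with ';'
    lines.extend(entries)
    lines.append("};")
    return "\n".join(lines)

print(zk_jaas_server_section({
    "super": "super-sec",
    "kafkabroker1": "kafkabroker1-sec",
}))
```

Generating the file this way makes it hard to forget the trailing semicolon, which is a common cause of JAAS parse failures.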
In the same directory, create the JAAS file that Kafka needs for authentication.
Below is the Kafka JAAS file for one of the three servers. The Client section holds the username and password this machine's broker uses to authenticate against the ZooKeeper on the same machine (note that the last option line of each section must end with a semicolon, and so must the closing brace!). The KafkaServer section's username="kbroker1" / password="kbroker1-sec" pair is the account used for authentication between brokers; since this article configures inter-broker communication over SSL, it can probably be removed, but add it back if your setup differs.
Each broker's KafkaServer section must also define that same user (the matching user_kbroker1="kbroker1-sec" entry). user_client="client-sec" is the username/password that external clients authenticate with.
For simplicity, all brokers here share one inter-broker account, and external clients (producers, consumers, or language SDKs reading or configuring a client JAAS file when connecting to Kafka) share the single user_client account.
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kbroker1"
password="kbroker1-sec"
user_kbroker1="kbroker1-sec"
user_client="client-sec";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="kafkabroker3"
password="kafkabroker3-sec";
};
The JAAS files on the other two servers are as follows.
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kbroker1"
password="kbroker1-sec"
user_kbroker1="kbroker1-sec"
user_client="client-sec";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="kafkabroker1"
password="kafkabroker1-sec";
};
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kbroker1"
password="kbroker1-sec"
user_kbroker1="kbroker1-sec"
user_client="client-sec";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="kafkabroker2"
password="kafkabroker2-sec";
};
ZooKeeper Configuration File
Likewise in the config directory, edit the zookeeper.properties file:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/opt/kafka/zookeeper-dir
dataLogDir=/opt/kafka/zookeeper-log
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
# maximum number of connections a single IP may open to this ZooKeeper server
maxClientCnxns=5
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
tickTime=2000
initLimit=5
syncLimit=2
server.1=1.1.1.1:2182:1999
server.2=2.2.2.2:2182:1999
server.3=3.3.3.3:2182:1999
# security
# required to enable ZooKeeper SASL authentication
authProvider.sasl=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
# setting this to false also disables the ZooKeeper JAAS authentication configured above
sessionRequireClientSASLAuth=true
jaasLoginRenew=360000000
Note: clientPort and the ports in <public-ip>:<peer-port>:<election-port> must all be distinct from each other, otherwise the ZooKeeper service fails to start. The three servers' configurations are essentially identical.
Note: in server.<id>=<public-ip>, if the machine has trouble connecting to itself, replace its own <public-ip> with 0.0.0.0.
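The port rule above can be sanity-checked with a short script. This is a sketch; the parsing assumes the exact server.N=<host>:<peer>:<election> format used in this article:

```python
def check_zk_ports(client_port, server_lines):
    """Return a list of port conflicts between clientPort and the
    peer/election ports declared in server.N lines."""
    conflicts = []
    for line in server_lines:
        _, value = line.split("=", 1)          # "server.1" / "host:peer:election"
        host, peer, election = value.split(":")
        for port in (int(peer), int(election)):
            if port == client_port:
                conflicts.append((host, port))
        if peer == election:                    # peer and election ports must differ too
            conflicts.append((host, "peer==election"))
    return conflicts

servers = ["server.1=1.1.1.1:2182:1999", "server.2=2.2.2.2:2182:1999"]
print(check_zk_ports(2181, servers))   # prints: []  (no conflicts for this config)
```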
The ZooKeeper startup script:
#!/bin/bash
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/kafka-server/config/zk_jaas.conf"
nohup /opt/kafka/kafka-server/bin/zookeeper-server-start.sh /opt/kafka/kafka-server/config/zookeeper.properties > kafka-zookeeper-start.log 2>&1 &
export makes the variable in KAFKA_OPTS visible so that ZooKeeper loads the JAAS file at startup; the relevant flag is
-Djava.security.auth.login.config
nohup keeps ZooKeeper running in the background without holding the terminal; output goes to kafka-zookeeper-start.log. To follow the log:
tail -f kafka-zookeeper-start.log
Self-Signed SSL
This part took a long time; most of the bugs encountered were self-signed-certificate problems. The figure below illustrates how SSL certificates are used in Kafka: each keystore contains the digital signature and the certificate.
Every machine should have its own keystore and truststore files. You can generate a certificate on each machine with openssl, but each machine's certificate must be imported into the truststores and keystores of all the other brokers, and each machine's keystore must also import the CA certificate and the signed certificate produced from signing. Otherwise the brokers hit assorted failures when establishing SSL tunnels to each other. For example, after copying the kafka.client.truststore.jks generated on broker1 directly to broker2 via scp, the TRACE-level log (enabled in log4j.properties under the Kafka config directory) during the broker1 to broker2 SSL handshake looked like the screenshot below.
References:
Original article
Reference 2
Most articles online only walk through the generation process on a single machine or for a single certificate, so the full flow used for this cluster is recorded here.
Note: the CA is generated exactly once globally; there is only one ca-cert and one ca-key.
The reference flow is:

Deploying SSL: create keys and certificates, create a self-signed certificate authority, and sign the certificates.
File meanings:
keystore: a repository that can hold private keys, certificates, and symmetric keys.
(quoting a Stack Overflow answer)

ca-cert: the CA certificate itself
ca-key: the CA private key
ca-password: the CA key passphrase
cert-file: the exported, unsigned certificate (the CSR)
cert-signed: the certificate carrying the CA's digital signature
First, create a keystore on every machine.
If the keytool command is not found, look under JAVA_HOME/bin.
SSL hostname verification can be configured in one of two ways.
Either add the following to the Kafka configuration to disable verification:
ssl.endpoint.identification.algorithm=
Or set CN and SAN to the hostname and FQDN respectively (what is an FQDN?).

This article takes the former approach: SSL hostname verification is skipped, and the keystore is still created with a SAN entry.
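For intuition, what the empty ssl.endpoint.identification.algorithm setting does can be expressed with Python's ssl module. This is an illustration of the concept only, not Kafka configuration: the certificate chain is still verified against the truststore, but the hostname is not matched against CN/SAN.

```python
import ssl

# A TLS client context that still requires a trusted certificate
# but skips hostname (CN/SAN) matching.
ctx = ssl.create_default_context()
ctx.check_hostname = False            # skip hostname verification
ctx.verify_mode = ssl.CERT_REQUIRED   # still require a trusted certificate

print(ctx.check_hostname, ctx.verify_mode == ssl.CERT_REQUIRED)  # prints: False True
```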
keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity {validity} -genkey -storepass {keystore-pass} -keypass {key-pass} -dname {distinguished-name} -ext SAN=DNS:{hostname}
For -alias, use the hostname (localhost also works).
{validity} is the validity period in days; for a self-signed certificate it can be long, e.g. 9999.
{keystore-pass} and {key-pass} are passwords; using the same value for both is recommended.
-ext SAN=DNS:{hostname}: this must be the machine's hostname (run hostname in a terminal to check).
Here my-host-name can be replaced with localhost, or with the VPS's own hostname.

kafka.server.keystore.jks has now been generated.
Create the CA certificate:
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
For CN, again use the hostname, or simply localhost.
At this point three files have been generated in total.
Important: copy the ca-cert and ca-key files to every Kafka broker machine (and to any other machine that will connect, e.g. your local machine when integrating Spring Boot), and keep them in a fixed location.
Together, ca-cert and ca-key constitute the single CA.
Import ca-cert into kafka.server.truststore.jks on every broker; answer yes at the terminal prompt:
keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert
The original article has one more step: export the certificate from the kafka.server.keystore.jks keystore before signing it.
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
Sign the certificate:
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:123456
At this point six files have been generated in total.
Import the CA certificate and the signed certificate into the kafka.server.keystore.jks keystore:
keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed
Inspect the contents of kafka.server.keystore.jks:
keytool -list -v -keystore kafka.server.keystore.jks
All commands are listed below. The idea is to generate one CA globally (containing ca-cert and ca-key), then:
- each machine generates its own kafka.server.keystore.jks
- ca-cert is imported into every kafka.server.truststore.jks (the truststore is created by the import itself) under the CARoot alias
- each kafka.server.keystore.jks exports a cert-file
- ca-cert and ca-key sign cert-file to produce cert-signed
- both ca-cert and cert-signed are imported into kafka.server.keystore.jks
keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity {validity} -genkey
openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}
keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed
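Since this sequence must be repeated per broker, it can be parameterized in a small script. This is a sketch that restates the commands above; the hostname, validity, and password values are placeholders:

```python
def ssl_command_sequence(hostname="localhost", validity=9999, password="123456"):
    """Ordered keytool/openssl commands for one broker, mirroring the flow
    above: keystore -> CA -> truststore imports -> CSR -> sign -> import
    the CA cert and the signed cert back into the keystore."""
    return [
        f"keytool -keystore kafka.server.keystore.jks -alias {hostname} "
        f"-keyalg RSA -validity {validity} -genkey",
        f"openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}",
        "keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file ca-cert",
        "keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert",
        f"keytool -keystore kafka.server.keystore.jks -alias {hostname} -certreq -file cert-file",
        f"openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed "
        f"-days {validity} -CAcreateserial -passin pass:{password}",
        "keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert",
        f"keytool -keystore kafka.server.keystore.jks -alias {hostname} -importcert -file cert-signed",
    ]

for cmd in ssl_command_sequence():
    print(cmd)
```

Running the generated commands (or feeding them to a shell script) keeps the order consistent across all three brokers.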
All files are now present (ignore auto-create-kafka-ssl-keys.sh, a script that generates the certificates automatically, GitHub link, and the backup archive demo.tar).
Edit the Kafka server configuration file server.properties.
advertised.listeners must correspond to listeners.
advertised.listeners overview
Do not bind advertised.listeners to 0.0.0.0. Here SSL is configured for internal (inter-broker) access, so the cloud server's hostname is used. How do the brokers find each other by hostname? On Linux, edit /etc/hosts and append entries in the form <ip> <hostname> to add the mappings; on Windows, the hosts file is under C:\Windows\System32\drivers\etc. For example (hostnames are placeholders):
1.1.1.1 your-hostname-1
2.2.2.2 your-hostname-2
3.3.3.3 your-hostname-3
broker.id=1
############################# Socket Server Settings #############################
listeners=SSL://:9093,SASL_SSL://:9094
# note: SSL here is for inter-broker communication; the externally exposed listener is SASL_SSL
advertised.listeners=SSL://Your-Host-name:9093,SASL_SSL://:9094
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=1.1.1.1:2182,2.2.2.2:2182,3.3.3.3:2182
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Kafka Security ###########################
ssl.endpoint.identification.algorithm=
security.inter.broker.protocol=SSL
ssl.client.auth=required
# enabled SSL/TLS protocol versions
ssl.enabled.protocols=TLSv1.3,TLSv1.1,TLSv1
# Broker security settings
sasl.enabled.mechanisms=PLAIN
ssl.truststore.password=123456
ssl.truststore.location=/opt/kafka/crkeys/kafka.server.truststore.jks
ssl.keystore.location=/opt/kafka/crkeys/kafka.server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
Starting the ZooKeeper Cluster
Write a script to start ZooKeeper on each broker machine:
kafka-zookeeper-quick-start.sh
#!/bin/bash
# let ZooKeeper load the JAAS file into its runtime environment
# KAFKA_OPTS is the standard hook for passing these JVM options
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/kafka-server/config/zk_jaas.conf"
nohup /opt/kafka/kafka-server/bin/zookeeper-server-start.sh /opt/kafka/kafka-server/config/zookeeper.properties > kafka-zookeeper-start.log 2>&1 &
The ZooKeeper cluster is now up.
Starting the Kafka Cluster
Write a script to start each broker; the first broker to start becomes the controller:
kafka-server-quick-start.sh
#!/bin/bash
export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"
export KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/kafka-server/config/kafka_server_jaas.conf
nohup /opt/kafka/kafka-server/bin/kafka-server-start.sh /opt/kafka/kafka-server/config/server.properties > kafka-server-start.log 2>&1 &
#/opt/kafka/kafka-server/bin/kafka-server-start.sh /opt/kafka/kafka-server/config/server.properties
For any startup error, edit log4j.properties in the config directory and set all loggers to TRACE to investigate.
Note: with TRACE enabled, non-controller brokers flood the log with controller-readiness messages.
The following is from the controller node.
What is the Kafka controller?
Only one broker's controller is active at a time across the cluster; this article does not go deeper into it.
Testing that SSL is in effect
This article configures Kafka with TLSv1.3,TLSv1.1,TLSv1; TLSv1.2 can be added as well.
Which protocol versions actually work also depends on the JDK version.
openssl s_client -debug -connect <ip>:<port> -tls1
openssl s_client -debug -connect <ip>:<port> -tls1_1
openssl s_client -debug -connect <ip>:<port> -tls1_2
openssl s_client -debug -connect <ip>:<port> -tls1_3
Here, Verify return code: 0 on the -tls1 probe means the lowest configured TLS version is accepted.
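The same probes can be scripted with Python's ssl module. This is a sketch: the host and port are placeholders, and TLSv1/TLSv1.1 may be unavailable depending on the local OpenSSL build.

```python
import socket
import ssl


def probe_tls(host, port, version):
    """Attempt a handshake pinned to a single TLS version; return True if
    the server accepts it. Mirrors `openssl s_client -tls1_x`."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE   # probing protocol support only, no trust decision
    ctx.minimum_version = version
    ctx.maximum_version = version
    try:
        with socket.create_connection((host, port), timeout=5) as sock:
            with ctx.wrap_socket(sock) as tls:
                return tls.version() is not None
    except (ssl.SSLError, OSError):
        return False


# Example against a broker's SSL listener (hostname is a placeholder):
# print(probe_tls("your-hostname-1", 9093, ssl.TLSVersion.TLSv1_3))
```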
Log on to any of the machines (or your local development machine) and create a shared client configuration file in a fixed directory:
client_security.properties
ssl.endpoint.identification.algorithm=
#security.protocol=SSL
security.protocol=SASL_SSL
ssl.truststore.location=/opt/kafka/crkeys/kafka.server.truststore.jks
ssl.truststore.password=123456
ssl.keystore.location=/opt/kafka/crkeys/kafka.server.keystore.jks
ssl.keystore.password=123456
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="client" \
password="client-sec";
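The sasl.jaas.config value is easy to get wrong (missing quotes around the values, or a missing trailing semicolon), so a small helper makes the required shape explicit. This is a sketch for illustration, not part of any Kafka client library:

```python
def plain_sasl_jaas_config(username, password):
    """Build a PLAIN sasl.jaas.config string in the exact shape Kafka
    expects: module class, 'required', quoted options, trailing ';'."""
    return (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )

print(plain_sasl_jaas_config("client", "client-sec"))
```

The same shape is reused later in the Spring configuration, where string concatenation mistakes produce hard-to-diagnose authentication failures.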
Write a script to create a topic.
Script content:
./bin/kafka-topics.sh --create --topic demo-topic-1 --command-config /opt/kafka/kafka-server/config/client_security.properties --partitions 3 --replication-factor 3 --bootstrap-server your-hostname-1:9094,your-hostname-2:9094,your-hostname-3:9094
Write scripts to test a producer and a consumer.
Producer script content:
/opt/kafka/kafka-server/bin/kafka-console-producer.sh --bootstrap-server hostname-1:9094,hostname-2:9094,hostname-3:9094 --topic demo-topic-1 --producer.config /opt/kafka/kafka-server/config/client_security.properties
Start the consumer.
Consumer script content:
#!/bin/bash
/opt/kafka/kafka-server/bin/kafka-console-consumer.sh --bootstrap-server hostname-1:9094,hostname-2:9094,hostname-3:9094 --topic demo-topic-1 --consumer.config /opt/kafka/kafka-server/config/client_security.properties --from-beginning
Because the consumer passes the --from-beginning option, it reads everything the producer has written from the start of the topic.
spring-cloud-starter-bus-kafka Integration
This uses spring-cloud-dependencies 2021.0.8 with spring-boot-dependencies 2.7.13.
spring-cloud-starter-bus-kafka pulls in two dependencies:
Note: generating the local keystore and truststore follows the same steps as above; copy the globally unique ca-cert and ca-key to the local machine to generate them.
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>3.2.4</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-bus</artifactId>
<version>3.1.2</version>
<scope>compile</scope>
</dependency>
</dependencies>
The application configuration:
server.port=13001
server.servlet.context-path=/liquid/configs-dev
# Kafka
spring.kafka.bootstrap-servers=1.1.1.1:9094,2.2.2.2:9094,3.3.3.3:9094
spring.kafka.security.protocol=SASL_SSL
spring.kafka.ssl.key-store-location=kafka.server.keystore.jks
spring.kafka.ssl.key-store-password=123456
spring.kafka.ssl.key-store-type=jks
spring.kafka.ssl.trust-store-location=kafka.server.truststore.jks
spring.kafka.ssl.trust-store-password=123456
spring.kafka.ssl.trust-store-type=jks
spring.kafka.retry.topic.attempts=3
# Kafka stream
spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
spring.cloud.stream.kafka.binder.configuration.ssl.endpoint.identification.algorithm=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=classpath:kafka.server.keystore.jks
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.password=123456
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=classpath:kafka.server.truststore.jks
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=123456
spring.cloud.stream.kafka.binder.brokers=1.1.1.1:9094,2.2.2.2:9094,3.3.3.3:9094
spring.kafka.streams.replication-factor=1
spring.cloud.stream.kafka.binder.replication-factor=1
spring.cloud.stream.kafka.binder.auto-create-topics=false
# spring cloud config
spring.cloud.config.server.git.uri=https://github.com/spring-cloud-samples/config-repo
Create a Java-based configuration class:
package com.liquid.config.center.configs;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.*;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
public class LiquidKafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
public String bootstrapServers;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put("security.protocol", "SASL_SSL");
configs.put("sasl.mechanism", "PLAIN");
configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"client\" " +
"password=\"client-sec\";");
log.info(">>> Loading Kafka Admin With Jaas String end");
return new KafkaAdmin(configs);
}
@Bean
public ProducerFactory<Object, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
"%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "client", "client-sec"
));
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
"%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "client", "client-sec"
));
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean("JaasLoginModuleConfiguration")
public JaasLoginModuleConfiguration createStreamJaasLoginModule() {
Map<String, String> configs = new HashMap<>();
configs.put("security.protocol", "SASL_SSL");
configs.put("sasl.mechanism", "PLAIN");
configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"client\" " +
"password=\"client-sec\";");
log.info(">>> Loading Kafka stream JAAS configuration end");
JaasLoginModuleConfiguration jaasLoginModuleConfiguration = new JaasLoginModuleConfiguration();
jaasLoginModuleConfiguration.setOptions(configs);
return jaasLoginModuleConfiguration;
}
@Bean
@Primary
public KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties(KafkaBinderConfigurationProperties properties) {
String saslJaasConfigString = String.format("%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "client", "client-sec");
Map<String, String> configMap = properties.getConfiguration();
configMap.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfigString);
return properties;
}
}
After the final startup, the log shows:
SSL handshake completed successfully with peerHost
At this point, inter-broker SSL and external client SASL_SSL authentication are both working.