国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

[Kafka集群] 配置支持Brokers內(nèi)部SSL認(rèn)證\外部客戶端支持SASL_SSL認(rèn)證并集成spring-cloud-starter-bus-kafka

這篇具有很好參考價(jià)值的文章主要介紹了[Kafka集群] 配置支持Brokers內(nèi)部SSL認(rèn)證\外部客戶端支持SASL_SSL認(rèn)證并集成spring-cloud-starter-bus-kafka。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

目錄

Kafka 集群配置

準(zhǔn)備

配置流程

Jaas(Java Authentication and Authorization Service?)文件

zookeeper 配置文件

SSL自簽名

啟動(dòng)zookeeper集群

啟動(dòng)kafka集群?

spring-cloud-starter-bus-kafka 集成


Kafka 集群配置

準(zhǔn)備

下載統(tǒng)一版本Kafka服務(wù)包至三臺(tái)不同的服務(wù)器上

文章使用版本為?kafka_2.13-3.5.0.tgz 下載地址

jdk版本Adopt JDK-17?OpenJDK17U-jdk_x64_linux_hotspot_17.0.7_7.tar.gz 下載地址

配置流程

Jaas(Java Authentication and Authorization Service?)文件

? ? ? ? 在kafka包解壓目錄下的?config?目錄下新建zookeeper認(rèn)證所需jaas文件,文件名隨意,以?.conf?結(jié)尾即可

? ? ? ? 文件內(nèi)容如下

? ? ? ? user_{username}為固定寫法 {username} 為用戶名 密碼為雙引號(hào)內(nèi)容

? ? ? ? 注意,這里zookeeper的jaas有三個(gè)用戶名和密碼分別對(duì)應(yīng)著三臺(tái)kafka broker去認(rèn)證時(shí)使用的用戶名和密碼,每一臺(tái)上的zookeeper的jaas文件內(nèi)容建議完全相同

Server {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       user_super="super-sec"
       user_kafkabroker1="kafkabroker1-sec"
       user_kafkabroker2="kafkabroker2-sec"
       user_kafkabroker3="kafkabroker3-sec";
};

? ? ? ? 在相同目錄下建立kafka所需認(rèn)證jaas文件

? ? ? ? 以下是三臺(tái)服務(wù)器中其中一臺(tái)的kafka jaas認(rèn)證文件內(nèi)容,Client內(nèi)容為本臺(tái)機(jī)器上的broker認(rèn)證本臺(tái)機(jī)器上的zookeeper的用戶名和密碼 ( 注意最后一行和倒數(shù)第二行需要有分號(hào)??!?) KafkaServer端有一對(duì) username="kbroker1"? password="kbroker1-sec"內(nèi)部brokers之間進(jìn)行認(rèn)證所用賬號(hào)密碼但是本文內(nèi)部broker配置為ssl鏈接,去掉應(yīng)該也沒事若不同則加一下

????????當(dāng)然每個(gè)broker的KafkaServer段也需要有定義這個(gè)用戶名和密碼( 對(duì)應(yīng)??user_kbroker1="kbroker1-sec" )???user_client="client-sec"?外部客戶端認(rèn)證時(shí)所需用戶名密碼

這里為了方便,全部brokers共享一個(gè)賬號(hào),客戶端user_client(也就是連接Kakfa時(shí)的producer、consumer或者編程語(yǔ)言SDK讀取或配置客戶端jaas文件時(shí))也為統(tǒng)一用戶名密碼

KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="kbroker1"
   password="kbroker1-sec"
   user_kbroker1="kbroker1-sec"
   user_client="client-sec";
};

Client {
   org.apache.zookeeper.server.auth.DigestLoginModule required
   username="kafkabroker3"
   password="kafkabroker3-sec";
};

另外兩臺(tái)用戶名密碼如下

KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="kbroker1"
   password="kbroker1-sec"
   user_kbroker1="kbroker1-sec"
   user_client="client-sec";
};

Client {
   org.apache.zookeeper.server.auth.DigestLoginModule required
   username="kafkabroker1"
   password="kafkabroker1-sec";
};
KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="kbroker1"
   password="kbroker1-sec"
   user_kbroker1="kbroker1-sec"
   user_client="client-sec";
};

Client {
   org.apache.zookeeper.server.auth.DigestLoginModule required
   username="kafkabroker2"
   password="kafkabroker2-sec";
};

zookeeper 配置文件

同樣是在config目錄下編輯zookeeper.properties文件

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/opt/kafka/zookeeper-dir
dataLogDir=/opt/kafka/zookeeper-log
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
# 一個(gè)ip最多可以對(duì)這個(gè)zookeeper服務(wù)進(jìn)行連接的數(shù)量
maxClientCnxns=5
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
tickTime=2000
initLimit=5
syncLimit=2
server.1=1.1.1.1:2182:1999
server.2=2.2.2.2:2182:1999
server.3=3.3.3.3:2182:1999

# security
# 開啟zookeeper sasl認(rèn)證必須配置
authProvider.sasl=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
maxClientCnxns=5
# 這里可以設(shè)置為false 同樣設(shè)置zookeeper jaas認(rèn)證也無效了
sessionRequireClientSASLAuth=true
jaasLoginRenew=360000000

注意 clientPort與 <外網(wǎng)ip>:<內(nèi)部互聯(lián)連端口>:<選舉專用端口> 這些端口要區(qū)分開來 不然zookeeper服務(wù)啟動(dòng)會(huì)報(bào)錯(cuò),三臺(tái)配置基本一直

注意 server.<int>=<外網(wǎng)ip> 若是連接本機(jī)有問題,可以將<外網(wǎng)ip>換成0.0.0.0

zookeeper啟動(dòng)腳本如下

#!/bin/bash

export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/kafka-server/config/zk_jaas.conf"
nohup /opt/kafka/kafka-server/bin/zookeeper-server-start.sh /opt/kafka/kafka-server/config/zookeeper.properties > kafka-zookeeper-start.log 2>&1 &

使用export 導(dǎo)出KAFKA_OPTS中的變量,讓zookeeper啟動(dòng)時(shí)加載jaas認(rèn)證文件,參數(shù)key為

-Djava.security.auth.login.config

nohup 可以讓zookeeper在后臺(tái)運(yùn)行,不占用終端,滾動(dòng)日志可以在?kafka-zookeeper-start.log 文件查看,若想滾動(dòng)查看日志可以用

tail -f?kafka-zookeeper-start.log

SSL自簽名

? ? 這個(gè)地方卡住很久,遇到了些bug都是關(guān)于自簽證書的問題,文章用下圖說明SSL證書在kafka中的使用流程 每個(gè)keystore包含數(shù)字簽名和證書

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

????????每臺(tái)機(jī)器上都應(yīng)該有自己的keystore與truststore文件,證書可以每臺(tái)上都使用openssl生成一張,但是,要把每臺(tái)機(jī)器上的證書必須互相導(dǎo)入到其他borkers的truststore與keystore中,而且每臺(tái)機(jī)器上的keystore還需要多次導(dǎo)入所有證書簽名之后生成的證書與數(shù)字簽名。不然在brokers互相創(chuàng)建SSL隧道時(shí)會(huì)有各種問題,例如下圖,將broker1機(jī)器上生成的kafka.client.truststore.jks直接 scp 傳輸?shù)降絙roker2 后使用,broker1 與 broker2建立SSL隧道時(shí),kafka config 目錄下log4j.properties修改TRACE級(jí)別日志記錄如下

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

?參考

原文連接

參考連接2

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

?很多網(wǎng)站上面只是做單臺(tái)機(jī)器或者單個(gè)證書的全部生成過程,這里記錄下自己的創(chuàng)建流程

?注意,全局只生成了一次CA ,僅包含一個(gè) ca-cert 與?ca-key

參考如下

?kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

?部署SSL 創(chuàng)建密鑰與證書,創(chuàng)建自簽名的頒發(fā)機(jī)構(gòu),證書簽名

?文件含義

?keystore 可以存儲(chǔ)私鑰、證書和對(duì)稱密鑰的存儲(chǔ)庫(kù)。
?引用stackoverflow的回答

?kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

?ca-cert 具體證書

?ca-key 證書私鑰

?ca-password 頒發(fā)機(jī)構(gòu)密鑰

?cert-file 導(dǎo)出未簽名的證書文件

?cert-signed 帶有數(shù)字簽名的證書

?首先在每個(gè)機(jī)器上面都要?jiǎng)?chuàng)建keystore密鑰庫(kù)

keytool命令無效可以去JAVA_HOME/bin目錄下找

SSL hostname校驗(yàn)可通過兩種方式配置

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

在kafka的配置文件中添加以下配置取消校驗(yàn)

ssl.endpoint.identification.algorithm=

或配置CN與SAN分別為hostname與FQDN什么是FQDN

?kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

?文章采用前者,忽略SSL對(duì)hostname的認(rèn)證并按照SAN格式創(chuàng)建keystore

keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity {validity} -genkey -storepass {keystore-pass} -keypass {key-pass} -dname {distinguished-name} -ext SAN=DNS:{hostname}

? ? ? ?-alias 后面用 hostname,localhost與hostname都可以

? ? ? ? {validity} 為過期時(shí)間 自簽可以長(zhǎng)一點(diǎn) 例 9999

????????{keystore-pass} 與?{key-pass} 為密碼,建議設(shè)為同一個(gè)值

????????-ext SAN=DNS:{hostname} 注意,必須為hostname (終端 鍵入hostname查看)

?這里my-host-name可用localhost代替? 或者用VPS云服務(wù)器專屬的hostnamekafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

?kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

?????????kafka.ser.keystore.jks生成結(jié)束

? ? ? ?創(chuàng)建CA證書

openssl req -new -x509 -keyout ca-key -out ca-cert -days 365

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

CN也要寫hostname或直接用localhost

此時(shí)一共生成三個(gè)文件

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

注意!copy ca-cert 與 ca-key 文件到所有kafka broker機(jī)器上,(若是想在其他機(jī)器上連接也要把這兩個(gè)文件拷貝過去,例如本地開發(fā)集成spring boot時(shí)),并放在固定位置?

ca-cert 與 ca-key 代表一張CA

導(dǎo)入 ca-cert 到所有brokers的kafka.server.truststore.jks中,終端交互輸入 yes

keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert

原文中多一部將kafka.server.keystore.jks密鑰庫(kù)的證書導(dǎo)出才簽名

keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file

證書簽名?

openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:123456

此時(shí)一共生成六個(gè)文件

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

向密鑰庫(kù) kafka.server.keystore.jks 導(dǎo)入證書與數(shù)字簽名

keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed

查看kafka.server.keystore.jks包含的內(nèi)容

keytool --list -v -keystore? kafka.server.keystore.jks

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

?kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

?全部命令如下,思路就是全局生成一張CA (包含ca-cert ca-key)

????????

  • 每臺(tái)機(jī)器生成
    • kafka.server.keystore.jks
  • ca-cert導(dǎo)入到每一個(gè) kafka.server.truststore.jks(ca-cert導(dǎo)完了就生成) 的CAroot中
  • 每一個(gè)kafka.server.keystore.jks導(dǎo)出一個(gè)cert-file
  • 用ca-cert ca-key 給 cert-file 簽出一個(gè) cert-signed,
  • ca-cert cert-signed都導(dǎo)入kafka.server.keystore.jks
keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity {validity} -genkey
openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}
keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed

原文鏈接

現(xiàn)在全部文件如下(忽略 auto-create-kafka-ssl-keys.sh 這個(gè)是自動(dòng)生成證書的腳本github連接?與 備份壓縮包demo.tar)

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

?編輯kafka服務(wù)配置文件 server.properties

advertised.listeners要和listeners對(duì)應(yīng),

advertised.listeners概述

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

?advertised.listeners不要綁定0.0.0.0端口,這里配置SSL為內(nèi)部訪問所以使用云服務(wù)器的hostname, brokers之間如何根據(jù)彼此的hostname來尋找呢?Linux可編輯/etc/hosts文件

末尾加上 <ip>:<port>添加dns映射 windows在C:\Windows\System32\drivers\etc目錄下找hosts文件


broker.id=1

############################# Socket Server Settings #############################

listeners=SSL://:9093,SASL_SSL://:9094
# 注意 這里SSL是做內(nèi)部brokers通信用的,外部暴露方式為SASL_SSL
advertised.listeners=SSL://Your-Host-name:9093,SASL_SSL://:9094


log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=1.1.1.1:2182,2.2.2.2:2182,3.3.3.3:2182
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Kafka Security ###########################
ssl.endpoint.identification.algorithm=
security.inter.broker.protocol=SSL
ssl.client.auth=required
# ssl加密協(xié)議選擇
ssl.enabled.protocols=TLSv1.3,TLSv1.1,TLSv1
# Broker security settings
sasl.enabled.mechanisms=PLAIN
#ssl.truststore.password=123456
ssl.truststore.password=123456
ssl.truststore.location=/opt/kafka/crkeys/kafka.server.truststore.jks
ssl.keystore.location=/opt/kafka/crkeys/kafka.server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
############################# Group Coordinator Settings #############################

group.initial.rebalance.delay.ms=0

啟動(dòng)zookeeper集群

編輯一個(gè)腳本分別啟動(dòng)每一個(gè)broker上的zookeeper??

kafka-zookeeper-quick-start.sh*

#!/bin/bash
# 讓jaas文件被zookeeper加載到運(yùn)行時(shí)環(huán)境
# KAFKA_OPTS為固定用法
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/kafka-server/config/zk_jaas.conf"
nohup /opt/kafka/kafka-server/bin/zookeeper-server-start.sh /opt/kafka/kafka-server/config/zookeeper.properties > kafka-zookeeper-start.log 2>&1 &

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

zookeeper集群?jiǎn)?dòng)結(jié)束

啟動(dòng)kafka集群?

編寫一個(gè)腳本啟動(dòng)每一個(gè)broker 優(yōu)先啟動(dòng)的broker會(huì)作為主節(jié)點(diǎn)

kafka-server-quick-start.sh

#!/bin/bash
export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"
export KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/kafka-server/config/kafka_server_jaas.conf
nohup /opt/kafka/kafka-server/bin/kafka-server-start.sh /opt/kafka/kafka-server/config/server.properties > kafka-server-start.log 2>&1 &
#/opt/kafka/kafka-server/bin/kafka-server-start.sh /opt/kafka/kafka-server/config/server.properties

?kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

?任意報(bào)錯(cuò)可以修改config目錄下的log4j.properties 將所有l(wèi)ogger設(shè)置成trace查看

?注意 設(shè)為trace之后 非kafka主節(jié)點(diǎn)會(huì)瘋狂滾動(dòng)一個(gè)controller就緒日志

?以下為主節(jié)點(diǎn)

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

??什么是kafka controllerkafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

?全局只有一個(gè)broker節(jié)點(diǎn)的controller會(huì)生效,暫不深究

?ssl 生效測(cè)試

本文Kafka配置為?TLSv1.3,TLSv1.1,TLSv1 可加入?TLSv1.2

具體協(xié)議版本會(huì)與jdk版本有關(guān)

openssl s_client --debug -connect <ip>:<port>?-tls1 次處 Verify return code: 0 代表最低版本tls協(xié)議生效

openssl s_client --debug -connect <ip>:<port>?-tls1_1

openssl s_client --debug -connect <ip>:<port>?-tls1_2

openssl s_client --debug -connect <ip>:<port>?-tls1_3

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

隨便登上一臺(tái)機(jī)器或者在開發(fā)本地在一個(gè)固定目錄下創(chuàng)建公用配置文件

client_security.properties

ssl.endpoint.identification.algorithm=
#security.protocol=SSL
security.protocol=SASL_SSL
ssl.truststore.location=/opt/kafka/crkeys/kafka.server.truststore.jks
ssl.truststore.password=123456
ssl.keystore.location=/opt/kafka/crkeys/kafka.server.keystore.jks
ssl.keystore.password=123456
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="client" \
    password="client-sec";

編寫一個(gè)腳本創(chuàng)建topic

腳本內(nèi)容

./bin/kafka-topics.sh --create --topic demo-topic-1 --command-config /opt/kafka/kafka-server/config/client_security.properties --partitions 3 --replication-factor 3 --bootstreap-server your-hostname-1:9094,your-hostname-2:9094,your-hostname-3:9094

編寫一個(gè)腳本測(cè)試producer和consumer

腳本內(nèi)容

/opt/kafka/kafka-server/bin/kafka-console-producer.sh --bootstrap-server hostname-1:9094,hostname-2:9094,hostname-3:9094 --topic demo-topic-1 --producer.config /opt/kafka/kafka-server/config/client_security.properties

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

?啟動(dòng)consumer

?腳本內(nèi)容

#!/bin/bash
/opt/kafka/kafka-server/bin/kafka-console-consumer.sh --bootstrap-server hostname-1:9094,hostname-2:9094,hostname-3:9094 --topic demo-topic-1 --consumer.config /opt/kafka/kafka-server/config/client_security.properties --from-beginning

這里consumer 添加了 --from-beginning 選項(xiàng),會(huì)從頭讀取producer寫入的數(shù)據(jù)

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

spring-cloud-starter-bus-kafka 集成

使用spring-cloud-dependencies-2021.0.8 版本?spring-boot-dependencies-2.7.13

spring-cloud-starter-bus-kafka 包含兩個(gè)依賴

注意,本地生成keystore與truststore步驟與上面的生成步驟一致,需要把全局唯一的ca-cert ca-key拷貝到本地來生成keystore與truststore

  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
      <version>3.2.4</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-bus</artifactId>
      <version>3.1.2</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>

application配置

server.port=13001
server.servlet.context-path=/liquid/configs-dev

# Kafka
spring.kafka.bootstrap-servers=1.1.1.1:9094,2.2.2.2:9094,3.3.3.3:9094
spring.kafka.security.protocol=SASL_SSL
spring.kafka.ssl.key-store-location=kafka.server.keystore.jks
spring.kafka.ssl.key-store-password=123456
spring.kafka.ssl.key-store-type=jks
spring.kafka.ssl.trust-store-location=kafka.server.truststore.jks
spring.kafka.ssl.trust-store-password=123456
spring.kafka.ssl.trust-store-type=jks
spring.kafka.retry.topic.attempts=3


# Kafka stream
spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
spring.cloud.stream.kafka.binder.configuration.ssl.endpoint.identification.algorithm=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=classpath:kafka.server.keystore.jks
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.password=123456
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=classpath:kafka.server.keystore.jks
spring.cloud.stream.kafka.binder.configuration.ssl.ssl.truststore.password=123456
spring.cloud.stream.kafka.binder.brokers=1.1.1.1:9094,2.2.2.2:9094,3.3.3.3:9094
spring.kafka.streams.replication-factor=1
spring.cloud.stream.kafka.binder.replication-factor=1
spring.cloud.stream.kafka.binder.auto-create-topics=false
# spring cloud config
spring.cloud.config.server.git.uri=https://github.com/spring-cloud-samples/config-repo

創(chuàng)建一個(gè)Java Base Configuration

package com.liquid.config.center.configs;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.*;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;


@Slf4j
@Configuration
public class LiquidKafkaConfiguration {


    @Value("${spring.kafka.bootstrap-servers}")
    public String bootstrapServers;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        configs.put("security.protocol", "SASL_SSL");
        configs.put("sasl.mechanism", "PLAIN");
        configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                "username=client" +
                "password=client-sec;");
        log.info(">>> Loading Kafka Admin With Jaas String end");
        return new KafkaAdmin(configs);
    }

    @Bean
    public ProducerFactory<Object, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "client", "client-sec"
        ));

        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "client", "client-sec"
        ));

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean("JaasLoginModuleConfiguration")
    public JaasLoginModuleConfiguration creatStreamJaasLoginModule() {
        Map<String, String> configs = new HashMap<>();
        configs.put("security.protocol", "SASL_SSL");
        configs.put("sasl.mechanism", "PLAIN");
        configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                "username=client" +
                "password=client-sec;");
        log.info(">>> Loading Kafka Admin with jaas string end");
        JaasLoginModuleConfiguration jaasLoginModuleConfiguration = new JaasLoginModuleConfiguration();
        jaasLoginModuleConfiguration.setOptions(configs);
        return jaasLoginModuleConfiguration;
    }

    @Bean
    @Primary
    public KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties(KafkaBinderConfigurationProperties properties) {
        String saslJaasConfigString = String.format("%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "client", "client-sec");
        Map<String, String> configMap = properties.getConfiguration();
        configMap.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfigString);
        return properties;
    }

}

最終啟動(dòng)后 日志

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

?SSL handshake completed successfully with peerHost

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

kafka_2.13-3.5.0.tgz下載,kafka,分布式,spring cloud,zookeeper

?至此 kafka內(nèi)部ssl 客戶端SASL_SSL認(rèn)證成功文章來源地址http://www.zghlxwxcb.cn/news/detail-763173.html

到了這里,關(guān)于[Kafka集群] 配置支持Brokers內(nèi)部SSL認(rèn)證\外部客戶端支持SASL_SSL認(rèn)證并集成spring-cloud-starter-bus-kafka的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • kafka開啟SSL認(rèn)證(包括內(nèi)置zookeeper開啟SSL)

    zookeeper和kafka的SSL開啟都可單獨(dú)進(jìn)行 使用jre自帶的keytool工具生成,linux和windows下生成的證書可以通用 生成含有一個(gè)私鑰的keystore文件,有效期10年(本文證書密碼統(tǒng)一使用test123) keytool -genkeypair -alias certificatekey -dname “CN=127.0.0.1, OU=127.0.0.1, O=127.0.0.1, L=SH, ST=SH, C=CN” -keyalg RS

    2024年01月17日
    瀏覽(17)
  • JAVA連接Kafka及SSL認(rèn)證

    1、Maven驅(qū)動(dòng)(注意一定要對(duì)應(yīng)自己的Kafka版本) 2、生產(chǎn)者生產(chǎn)數(shù)據(jù) 2.1 普通方式創(chuàng)建Producer 2.2 ssl加密和認(rèn)證創(chuàng)建Producer(Plain) 2.3 ssl加密和認(rèn)證創(chuàng)建Producer(Plain使用配置文件方式) kafka_client_jaas_plain配置文件信息: 具體代碼實(shí)現(xiàn): 2.4 ssl加密和認(rèn)證創(chuàng)建Producer(Scram) 2.5 ssl加密和認(rèn)證創(chuàng)

    2024年02月12日
    瀏覽(17)
  • kafka中Topic、Partition、Groups、Brokers概念辨析

    kafka中Topic、Partition、Groups、Brokers概念辨析

    kafka消息隊(duì)列有兩種消費(fèi)模式,分別是 點(diǎn)對(duì)點(diǎn)模式 和 訂閱/發(fā)布模式 。具體比較可以參考Kafka基礎(chǔ)–消息隊(duì)列與消費(fèi)模式。 下圖是一個(gè) 點(diǎn)對(duì)點(diǎn) 的Kafka結(jié)構(gòu)示意圖,其中有以下幾個(gè)部分: producer:消息生產(chǎn)者 consumer:消息消費(fèi)者 Topic:消息主題 partition:主題內(nèi)分區(qū) Brokers:消

    2024年02月04日
    瀏覽(39)
  • 【kafka+Kraft模式集群+SASL安全認(rèn)證】

    【kafka+Kraft模式集群+SASL安全認(rèn)證】

    準(zhǔn)備3個(gè)kafka,我這里用的kafka版本為:kafka_2.13-3.6.0,下載后解壓: 更改解壓后的文件名稱: cp kafka_2.13-3.6.0 kafka_2.13-3.6.0-1/2/3 分別得到kafka_2.13-3.6.0-1、kafka_2.13-3.6.0-2、kafka_2.13-3.6.0-3 copy一份config/kraft/server.properties配置文件,修改名稱 server-sasl.properties 進(jìn)入各個(gè)config/kraft/server

    2024年02月03日
    瀏覽(27)
  • nginx配置ssl的坑(TLSv1.3\ngx_http_ssl_module)

    查看openssl版本openssl version,一般騰訊云為1.0.2k版本。 到官網(wǎng) www.openssl.org 查看最新版本openssl,現(xiàn)在最新為1.1.1h版。 下載nginx 重新make下nginx,最后openssl配置上面升級(jí)后的 完成后執(zhí)行 make 命令,make失敗執(zhí)行 make clean后重新make 配置后的nginx.config 檢查配置 nginx -t 啟動(dòng)nginx ./ngin

    2024年02月16日
    瀏覽(22)
  • 配置https---Nginx認(rèn)證ssl證書

    配置https---Nginx認(rèn)證ssl證書

    nginx作為前端的負(fù)載均衡服務(wù)器已經(jīng)很熟悉了,項(xiàng)目需要使用https安全的時(shí)候就需要認(rèn)證證書了 dockerweb管理工具 Portainer 如果對(duì)docker不那么熟悉可以使用docker 第三方管理端 然后訪問本地9000端口,登錄后可以管理容器鏡像 有了該工具可以直接進(jìn)入容器查看日志等操作 nginx環(huán)境安裝

    2024年01月19日
    瀏覽(25)
  • 《Kafka系列》Offset Explorer連接Kafka問題集合,Timeout expired while.. topic metadata,Uable to find any brokers

    《Kafka系列》Offset Explorer連接Kafka問題集合,Timeout expired while.. topic metadata,Uable to find any brokers

    1.創(chuàng)建語(yǔ)句如下所示,按照習(xí)慣在添加zookeeper參數(shù)的時(shí)候,指定了 zxy:2181/kafka ,但是卻創(chuàng)建失敗, Error while executing topic command : Replication factor: 1 larger than available brokers: 0. 2.檢查各個(gè)broker的server.properties文件 發(fā)現(xiàn)在配置參數(shù)的時(shí)候, zookeeper.connect 指定的是 zxy:2181,zxy:2182,zxy:21

    2024年02月06日
    瀏覽(23)
  • 達(dá)夢(mèng)數(shù)據(jù)庫(kù)配置SSL認(rèn)證加密

    達(dá)夢(mèng)數(shù)據(jù)庫(kù)配置SSL認(rèn)證加密

    OS Version:Kylin Linux Advanced Server release V10 (SP1) /(Tercel)-x86_64-Build19/20210319 DB Version:DM V8 1-2-18-21.06.24-142387-10013-ENT Pack4 OpenSSL:OpenSSL 1.1.1f JAVA:openjdk version “1.8.0_242” 64bit 參考手冊(cè):《DM8_DISQL使用手冊(cè)》《DM8安全管理》《DM8程序員手冊(cè)》 DM8 產(chǎn)品手冊(cè) | 達(dá)夢(mèng)技術(shù)社區(qū) 1、配置ope

    2023年04月08日
    瀏覽(24)
  • emqx 配置ssl/tls 雙向認(rèn)證(親自測(cè)試有效)

    emqx 配置ssl/tls 雙向認(rèn)證(親自測(cè)試有效)

    bash腳本,生成自簽名ca、服務(wù)端、客戶端的key和證書 openssl.cnf配置文件 驗(yàn)證證書是否有效 將證書文件拷貝到emqxetccerts目錄下(默認(rèn)目錄),并修改配置文件emqx.conf。SSL/TLS 雙向連接的啟用及驗(yàn)證 mqttx連接驗(yàn)證 出現(xiàn)連接成功,代表測(cè)試無問題 ?

    2024年03月11日
    瀏覽(24)
  • 【kafka-ui】支持kafka with kraft的可視化集群管理工具

    【kafka-ui】支持kafka with kraft的可視化集群管理工具

    本文在kafka3.3.1版本基礎(chǔ)上進(jìn)行測(cè)試 在早期使用kafka的時(shí)候一般使用Kafka Tool或者kafka eagle,前者為桌面軟件,后者為瀏覽器軟件。總體來說體驗(yàn)一般,但是還比較夠用。 但是從kafka3.3.1開始,已經(jīng)正式拋棄zookeeper使用自己的仲裁器了,但是上述兩種kafka可視化工具的更新好像并

    2024年02月02日
    瀏覽(295)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包