Kafka 監(jiān)聽器詳解
Kafka Assistant 是一款 Kafka GUI 管理工具——管理Broker,Topic,Group、查看消費(fèi)詳情、監(jiān)控服務(wù)器狀態(tài)、支持多種消息格式。
你需要將
advertised.listeners
(如果你使用Docker鏡像,則為
KAFKA_ADVERTISED_LISTENERS
)設(shè)置為外部地址(host/IP),以便客戶端可以正確地連接到它。否則,他們會(huì)嘗試連接到內(nèi)部主機(jī)地址–如果無法到達(dá),問題就會(huì)接踵而來。
換一種說法,由Spencer Ruport提供。
listeners
是Kafka所綁定的接口。advertised.listeners
是客戶端的連接方式。
在這篇文章中,我將談?wù)摓槭裁催@是有必要的配置 listeners
和 advertised.listeners
,然后展示如何基于幾個(gè)場景–Docker和AWS來做。
是誰在監(jiān)聽
Apache Kafka是一個(gè)分布式系統(tǒng)。數(shù)據(jù)是從一個(gè)給定的分區(qū)的領(lǐng)導(dǎo)者那里讀取和寫入的,這個(gè)領(lǐng)導(dǎo)者可以是集群中的任何一個(gè)Broker。當(dāng)一個(gè)客戶端(生產(chǎn)者/消費(fèi)者)啟動(dòng)時(shí),它將請求關(guān)于哪個(gè)broker是一個(gè)分區(qū)的領(lǐng)導(dǎo)者的元數(shù)據(jù)–它可以從任何broker那里做到這一點(diǎn)。返回的元數(shù)據(jù)將包括該分區(qū)的領(lǐng)導(dǎo)者broker的可用端點(diǎn),然后客戶端將使用這些端點(diǎn)連接到broker,根據(jù)需要讀/寫數(shù)據(jù)。
正是這些端點(diǎn)給人們帶來了麻煩。在單機(jī)上,運(yùn)行裸機(jī)(沒有虛擬機(jī),沒有Docker),可能只需要使用主機(jī)名(或只是localhost),這很容易。但是,一旦你進(jìn)入更復(fù)雜的網(wǎng)絡(luò)設(shè)置和多節(jié)點(diǎn),你就必須更加注意。
讓我們假設(shè)你有一個(gè)以上的網(wǎng)絡(luò)。比如:
- Docker內(nèi)部網(wǎng)絡(luò)()加上主機(jī)
- 云中的Broker(如AWS EC2)和本地的企業(yè)內(nèi)部機(jī)器(甚至在另一個(gè)云中)。
你需要告訴Kafka Broker如何相互聯(lián)系,但也要確保外部客戶端(生產(chǎn)者/消費(fèi)者)可以聯(lián)系到他們需要聯(lián)系的Broker。
最關(guān)鍵的是,當(dāng)你運(yùn)行一個(gè)客戶端時(shí),你傳遞給它的Broker只是它要去的地方,并從那里獲得集群中Broker的元數(shù)據(jù)。它為讀/寫數(shù)據(jù)而連接的實(shí)際主機(jī)和IP是基于Broker在初始連接中傳遞回來的數(shù)據(jù)–即使它只是一個(gè)單一的節(jié)點(diǎn),而且返回的Broker與它所連接的Broker相同。
為了正確配置,你需要了解Kafka Broker可以有多個(gè)監(jiān)聽器。一個(gè)監(jiān)聽器是一個(gè)組合:
- Host/IP
- Port
- Protocol
讓我們來看看一些配置。通常情況下,協(xié)議也被用于監(jiān)聽器的名稱,但在這里,讓我們通過使用監(jiān)聽器的抽象名稱來使它變得漂亮和清晰。
KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
我使用的是Docker配置名稱–如果你直接配置 server.properties
(例如,在AWS等),其對應(yīng)名稱在以下列表中縮進(jìn)顯示。
-
KAFKA_LISTENERS
是一個(gè)以逗號分隔的監(jiān)聽器列表,以及Kafka綁定監(jiān)聽的主機(jī)/IP和端口。對于更復(fù)雜的網(wǎng)絡(luò),這可能是一個(gè)與機(jī)器上的特定網(wǎng)絡(luò)接口相關(guān)的IP地址。默認(rèn)是0.0.0.0,這意味著在所有接口上進(jìn)行監(jiān)聽。listeners
-
KAFKA_ADVERTISED_LISTENERS
是一個(gè)以逗號分隔的監(jiān)聽器列表,包括它們的主機(jī)/IP和端口。這是傳回給客戶端的元數(shù)據(jù)。advertised.listeners
-
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
定義了每個(gè)監(jiān)聽器名稱要使用的安全協(xié)議的鍵/值對。listener.security.protocol.map
Kafka Broker之間的通信,通常是在內(nèi)部網(wǎng)絡(luò)(如Docker網(wǎng)絡(luò),AWS VPC等)。要定義使用哪個(gè)監(jiān)聽器,請指定 KAFKA_INTER_BROKER_LISTENER_NAME
(inter.broker.listener.name
)。使用的主機(jī)/IP必須是其他人可以從Broker機(jī)器上訪問的。
Kafka客戶端很可能不在Broker的網(wǎng)絡(luò)中,這就是額外監(jiān)聽器的作用。
每個(gè)監(jiān)聽器在被連接到時(shí),都會(huì)報(bào)告它可以被接連到的地址。你到達(dá)Broker的地址取決于所使用的網(wǎng)絡(luò)。如果你從內(nèi)部網(wǎng)絡(luò)連接到經(jīng)紀(jì)人,它將是一個(gè)不同于外部連接的主機(jī)/IP。
當(dāng)連接到Broker時(shí),返回給客戶端的監(jiān)聽器將是你連接到的監(jiān)聽器(基于端口)。
kafkacat是一個(gè)探索這個(gè)問題的有用工具。使用 -L
,你可以看到你所連接的監(jiān)聽器的元數(shù)據(jù)?;谏鲜鱿嗤谋O(jiān)聽器配置(LISTENER_BOB/LISTENER_FRED
),注意觀察返回的結(jié)果:
- 在9092端口(我們映射為
LISTENER_FRED
)進(jìn)行連接,Broker的地址被反饋為localhost
。
$ kafkacat -b kafka0:9092 \
-L
Metadata for all topics (from broker -1: kafka0:9092/bootstrap):
1 brokers:
broker 0 at localhost:9092
- 在29092端口(我們將其映射為
LISTENER_BOB
)進(jìn)行連接,Broker的地址被反饋為kafka0。
$ kafkacat -b kafka0:29092 \
-L
Metadata for all topics (from broker 0: kafka0:29092/0):
1 brokers:
broker 0 at kafka0:29092
你也可以用tcpdump來檢查連接到Broker服務(wù)器的客戶端的流量,并發(fā)現(xiàn)從代理服務(wù)器返回的主機(jī)名。
為什么我可以連接到Broker,但客戶端仍然失?。?/h3>
即使你能與Broker建立初始連接,在元數(shù)據(jù)中返回的地址仍然可能是一個(gè)你的客戶端無法訪問的主機(jī)名。
讓我們一步一步來分析。
- 我們在AWS上有一個(gè)Broker。我們想從我們的筆記本電腦向它發(fā)送一個(gè)消息。我們知道EC2實(shí)例的外部主機(jī)名(ec2-54-191-84-122.us-west-2.compute.amazonaws.com)。我們已經(jīng)在安全組中創(chuàng)建了必要的條目,為我們的入站流量打開Broker的端口。保險(xiǎn)起見,還要檢查我們的本地機(jī)器是否可以連接到AWS實(shí)例上的端口。
$ nc -vz ec2-54-191-84-122.us-west-2.compute.amazonaws.com 9092
found 0 associations
found 1 connections:
1: flags=82<CONNECTED,PREFERRED>
outif utun5
src 172.27.230.23 port 53352
dst 54.191.84.122 port 9092
rank info not available
TCP aux info available
我們連上了9092端口,繼續(xù)
echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
這一步發(fā)生了什么:
- 我們的筆記本電腦成功地解析了
ec2-54-191-84-122.us-west-2.compute.amazonaws.com
(到IP地址54.191.84.122
),并在9092端口連接到AWS的機(jī)器。 - Broker在端口9092上接收入站連接。它把元數(shù)據(jù)返回給客戶端,主機(jī)名是
ip-172-31-18-160.us-west-2.compute.internal
,因?yàn)檫@是Broker的主機(jī)名和監(jiān)聽器的默認(rèn)值。 - 然后,客戶端試圖使用它被賦予的元數(shù)據(jù)向代理發(fā)送數(shù)據(jù)。由于
ip-172-31-18-160.us-west-2.compute.internal
不能從互聯(lián)網(wǎng)上解析,所以它失敗了。
$ echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
>>[2022-07-30 15:08:41,932] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 1547 ms has passed since batch creation plus linger time
- 但是,如果我們從Broker所在的機(jī)器上嘗試同樣的事情。
$ echo "foo"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
>>
$ kafka-console-consumer --bootstrap-server ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test --from-beginning
foo
它工作得很好! 這是因?yàn)槲覀冋谶B接到 9092 端口,它被配置為內(nèi)部監(jiān)聽器,因此報(bào)告其主機(jī)名為 ip-172-31-18-160.us-west-2.compute.internal
,這可以從Broker機(jī)器上解析(因?yàn)檫@是它自己的主機(jī)名?。?/p>
- 我們可以通過使用
kafkacat -L
標(biāo)志,看到Broker返回的元數(shù)據(jù)。
$ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L
Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap):
1 brokers:
broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092
很明顯,內(nèi)部主機(jī)名被返回。這也使得這個(gè)看似混亂的錯(cuò)誤變得更有意義–連接到一個(gè)主機(jī)名,在另一個(gè)主機(jī)上得到一個(gè)查詢錯(cuò)誤。
$ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -C -t test
% ERROR: Local: Host resolution failure: ip-172-31-18-160.us-west-2.compute.internal:9092/0: Failed to resolve 'ip-172-31-18-160.us-west-2.compute.internal:9092': nodename nor servname provided, or not known
這里,我們在本地機(jī)器上使用消費(fèi)者模式(-C)的kafkacat來嘗試從主題中讀取。和以前一樣,因?yàn)槲覀儚脑獢?shù)據(jù)中的Broker那里得到了內(nèi)部監(jiān)聽器的主機(jī)名,所以客戶端無法解析這個(gè)主機(jī)名來進(jìn)行讀/寫。
修改我的hosts文件
我看到一個(gè)Stack Overflow的答案,建議直接更新我的hosts文件…這不是更簡單嗎?
這可以解決問題,而不是真正修復(fù)它。
如果Broker服務(wù)器報(bào)告了一個(gè)客戶端無法連接的主機(jī)名,那么在本地 /etc/hosts
中硬編碼主機(jī)名/IP組合可能看起來是一個(gè)很好的修復(fù)。但這是一個(gè)非常脆弱的、手動(dòng)的解決方案。當(dāng)IP發(fā)生變化時(shí),當(dāng)你移動(dòng)主機(jī)而忘記帶配置hosts時(shí),以及當(dāng)其他人想做同樣的事情時(shí),會(huì)發(fā)生什么?
了解并實(shí)際修復(fù)你的網(wǎng)絡(luò)的 advertised.listeners
設(shè)置要好得多。
如何連接在Docker上的Kafka
為了在Docker中運(yùn)行,你需要為Kafka配置兩個(gè)監(jiān)聽器。
-
**Docker網(wǎng)絡(luò)內(nèi)的通信。**這可能是Broker之間的通信以及在Docker中運(yùn)行的其他組件之間的通信,如Kafka Connect或第三方客戶端或生產(chǎn)者。對于這些通信,我們需要使用Docker容器的主機(jī)名。同一Docker網(wǎng)絡(luò)上的每個(gè)Docker容器將使用Kafka Broker容器的主機(jī)名來到達(dá)它。
-
**非Docker網(wǎng)絡(luò)流量。**這可能是在Docker主機(jī)上本地運(yùn)行的客戶端,例如。假設(shè)他們將在localhost上連接到從Docker容器暴露出來的端口。這是Docker Compose的片段。
- Docker網(wǎng)絡(luò)中的客戶端使用監(jiān)聽器BOB進(jìn)行連接,端口為29092,主機(jī)名為kafka0。 這樣做,他們會(huì)得到要連接的主機(jī)名kafka0。每個(gè)Docker容器將使用Docker的內(nèi)部網(wǎng)絡(luò)來解析kafka0,并能夠到達(dá)Broker。
- Docker網(wǎng)絡(luò)外部(但都在同一個(gè)Host Machine里)的客戶端使用監(jiān)聽器FRED連接,端口為9092,主機(jī)名為localhost。9092端口是由Docker容器暴露的,因此可以連接到。當(dāng)客戶端連接時(shí),他們被賦予Broker元數(shù)據(jù)的主機(jī)名localhost,因此在讀/寫數(shù)據(jù)時(shí)連接到此。
- 上述配置無法處理Docker外部和主機(jī)外部的客戶端想要連接的情況。這是因?yàn)闊o論是kafka0(Docker內(nèi)部的主機(jī)名)還是localhost(Docker主機(jī)的環(huán)回地址)都是無法解析的。
如何連接到AWS/IaaS上的Kafka
我命名AWS是因?yàn)樗谴蠖鄶?shù)人使用的,但這適用于任何IaaS/云解決方案。
這里適用的概念與Docker完全相同。主要的區(qū)別是,在Docker中,外部連接很可能只是在localhost上(如上),而在云托管的Kafka中(如AWS上),外部連接將來自非本地的機(jī)器。
另一個(gè)復(fù)雜的問題是,雖然Docker網(wǎng)絡(luò)與主機(jī)的網(wǎng)絡(luò)嚴(yán)重隔離,但在IaaS上,外部主機(jī)名往往是可以在內(nèi)部解析的,這使得你可能真正遇到這些問題時(shí),一發(fā)不可收拾。
有兩種方法,取決于你要連接到Broker服務(wù)器的外部地址是否也可以在網(wǎng)絡(luò)(如VPC)上的所有Broker服務(wù)器上進(jìn)行本地解析。
選項(xiàng)1:外部地址是可以在本地解析的
在這里,你可以用一個(gè)監(jiān)聽器搞定?,F(xiàn)有的監(jiān)聽器叫 PLAINTEXT
,只需要設(shè)置 advertised.listeners
(即傳遞給入站客戶的那個(gè))。
advertised.listeners=PLAINTEXT://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092
現(xiàn)在內(nèi)部和外部的連接都將使用 ec2-54-191-84-122.us-west-2.compute.amazonaws.com
進(jìn)行連接。因?yàn)?ec2-54-191-84-122.us-west-2.com compute.amazonaws.com
既可以在本地也可以在外部解決,所以事情進(jìn)展順利。
選項(xiàng)2:外部地址在本地?zé)o法解析
你將需要為Kafka配置兩個(gè)監(jiān)聽器。
- **AWS網(wǎng)絡(luò)(VPC)內(nèi)的通信。**這可能是Broker之間的通信和在VPC中運(yùn)行的其他組件之間的通信,如Kafka Connect或第三方客戶端或生產(chǎn)商。對于這些通信,我們需要使用EC2機(jī)器的內(nèi)部IP(或主機(jī)名,如果配置了DNS)。
- **外部AWS流量。**這可能是測試來自筆記本電腦的連接,或者只是來自不在亞馬遜托管的機(jī)器。在這兩種情況下,需要使用實(shí)例的外部IP(或主機(jī)名,如果配置了DNS)。
下面是一個(gè)例子:文章來源:http://www.zghlxwxcb.cn/news/detail-734660.html
listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
advertised.listeners=INTERNAL://ip-172-31-18-160.us-west-2.compute.internal:19092,EXTERNAL://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092
inter.broker.listener.name=INTERNAL
原文地址:文章來源地址http://www.zghlxwxcb.cn/news/detail-734660.html
- Kafka Listeners – Explained
到了這里,關(guān)于Kafka 監(jiān)聽器詳解的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!