kafka最先是作為日志數(shù)據(jù)采集,后用于消息傳遞,kafka能承擔(dān)tb級(jí)別數(shù)據(jù)存儲(chǔ),確保服務(wù)的可用性,允許少量數(shù)據(jù)的丟失
作為消息中間件就有異步、解耦、削峰三個(gè)作用
一、單機(jī)搭建
單機(jī)ip:192.168.64.133
下載地址:Apache Kafka?選擇kafka_2.13-3.4.0.tgz進(jìn)行下載
關(guān)于kafka的版本,前面的2.13是開發(fā)kafka的scala語(yǔ)言的版本,后面的3.4.0是kafka應(yīng)用的版本。
下載Zookeeper,下載地址?Apache ZooKeeper?,kafka有內(nèi)置的zookeeper,Zookeeper的版本并沒(méi)有強(qiáng)制要求,這里我們選擇比較新的3.6.1版本。
#下載解壓
cd /usr/local/kafka
wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -zxvf kafka_2.13-3.4.0.tgz
?1、啟動(dòng)Kafka之前需要先啟動(dòng)Zookeeper。**這里就用Kafka自帶的Zookeeper。啟動(dòng)腳本在bin目錄下。
cd kafka_2.13-3.4.0/
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
? 從nohup.out中可以看到zookeeper默認(rèn)會(huì)在2181端口啟動(dòng)。通過(guò)jps指令看到一個(gè)QuorumPeerMain進(jìn)程,確定服務(wù)啟動(dòng)成功。?
?2、啟動(dòng)Kafka。
nohup bin/kafka-server-start.sh config/server.properties &
啟動(dòng)完成后,使用jps指令,看到一個(gè)kafka進(jìn)程,確定服務(wù)啟動(dòng)成功。服務(wù)會(huì)默認(rèn)在9092端口啟動(dòng)。
3、簡(jiǎn)單收發(fā)消息
? Kafka的基礎(chǔ)工作機(jī)制是消息發(fā)送者可以將消息發(fā)送到kafka上指定的topic,而消息消費(fèi)者,可以從指定的topic上消費(fèi)消息。
??首先,可以使用Kafka提供的客戶端腳本創(chuàng)建Topic
#查看幫助命令
bin/kafka-topics.sh --help
#創(chuàng)建Topic
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
#查看Topic
bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
?然后,啟動(dòng)一個(gè)消息發(fā)送者端。往一個(gè)名為test的Topic發(fā)送消息。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
當(dāng)命令行出現(xiàn) > 符號(hào)后,隨意輸入一些字符然后enter。Ctrl+C 退出命令行。這樣就完成了往kafka發(fā)消息的操作。
??然后啟動(dòng)一個(gè)消息消費(fèi)端,從名為test的Topic上接收消息。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
4、其他消費(fèi)模式
? 之前我們通過(guò)kafka提供的生產(chǎn)者和消費(fèi)者腳本,啟動(dòng)了一個(gè)簡(jiǎn)單的消息生產(chǎn)者以及消息消費(fèi)者,實(shí)際上,kafka還提供了豐富的消息消費(fèi)方式。
指定消費(fèi)進(jìn)度
? 通過(guò)kafka-console.consumer.sh啟動(dòng)的控制臺(tái)消費(fèi)者,會(huì)將獲取到的內(nèi)容在命令行中輸出。如果想要消費(fèi)之前發(fā)送的消息,可以通過(guò)添加--from-begining參數(shù)指定。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
如果需要更精確的消費(fèi)消息,甚至可以指定從哪一條消息開始消費(fèi)。?
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --partition 0 --offset 4 --topic test
這表示從第0號(hào)Partition上的第四個(gè)消息開始讀起。Partition和Offset是什么呢,可以用以下指令查看。
分組消費(fèi)
對(duì)于每個(gè)消費(fèi)者,可以指定一個(gè)消費(fèi)者組。kafka中的同一條消息,只能被同一個(gè)消費(fèi)者組下的某一個(gè)消費(fèi)者消費(fèi)。而不屬于同一個(gè)消費(fèi)者組的其他消費(fèi)者,也可以消費(fèi)到這一條消息。在kafka-console-consumer.sh腳本中,可以通過(guò)--consumer-property group.id=testGroup來(lái)指定所屬的消費(fèi)者組。例如,可以啟動(dòng)三個(gè)消費(fèi)者組,來(lái)驗(yàn)證一下分組消費(fèi)機(jī)制:?
#兩個(gè)消費(fèi)者實(shí)例屬于同一個(gè)消費(fèi)者組
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGrroup --topic test
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGrroup --topic test
#這個(gè)消費(fèi)者實(shí)例屬于不同的消費(fèi)者組
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGrroup2 --topic test
查看消費(fèi)者組的偏移量
? 接下來(lái),還可以使用kafka-consumer-groups.sh觀測(cè)消費(fèi)者組的情況。包括他們的消費(fèi)進(jìn)度。
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
二、集群搭建
集群就為了保證服務(wù)的高可用和數(shù)據(jù)的安全性,這就是我的理解,萬(wàn)一掛了其它節(jié)點(diǎn)依舊可以對(duì)外提供服務(wù)
1、部署zookeeper集群
#下載解壓
cd /usr/local/zookeeper
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
#配置
cd /usr/local/zookeeper/apache-zookeeper-3.6.1-bin/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
#zoo.cfg文件中修改如下配置
#Zookeeper的本地?cái)?shù)據(jù)目錄,默認(rèn)是/tmp/zookeeper。這是Linux的臨時(shí)目錄,隨時(shí)會(huì)被刪掉。
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/log
#Zookeeper的服務(wù)端口
clientPort=2181
#集群節(jié)點(diǎn)配置
server.1=192.168.64.133:2888:3888
server.2=192.168.64.134:2888:3888
server.3=192.168.64.128:2888:3888
#切換到dataDir目錄下
vim myid
#myid文件內(nèi)容:比如當(dāng)前節(jié)點(diǎn)是server.x,那么內(nèi)容就是x
其中,clientPort 2181是對(duì)客戶端開放的服務(wù)端口。
集群配置部分, server.x這個(gè)x就是節(jié)點(diǎn)在集群中的myid。后面的2888端口是集群內(nèi)部數(shù)據(jù)傳輸使用的端口。3888是集群內(nèi)部進(jìn)行選舉使用的端口。
? 接下來(lái)將整個(gè)Zookeeper的應(yīng)用目錄分發(fā)到另外兩臺(tái)機(jī)器上。就可以在三臺(tái)機(jī)器上都啟動(dòng)Zookeeper服務(wù)了。
#切換到zookeeper的home目錄下,運(yùn)行如下命令啟動(dòng)
bin/zkServer.sh --config conf start
啟動(dòng)完成后,使用jps指令可以看到一個(gè)QuorumPeerMain進(jìn)程就表示服務(wù)啟動(dòng)成功。
? 三臺(tái)機(jī)器都啟動(dòng)完成后,可以查看下集群狀態(tài)。
root@gaorufeng20221210:/usr/local/zookeeper/apache-zookeeper-3.6.1-bin# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/apache-zookeeper-3.6.1-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
root@gaorufeng20221210:/usr/local/zookeeper/apache-zookeeper-3.6.1-bin#
這其中Mode 為leader就是主節(jié)點(diǎn),follower就是從節(jié)點(diǎn)。
2、部署kafka集群
? kafka服務(wù)并不需要進(jìn)行選舉,因此也沒(méi)有奇數(shù)臺(tái)服務(wù)的建議。
? 部署Kafka的方式跟部署Zookeeper差不多,就是解壓、配置、啟服務(wù)三板斧。
? 首先將Kafka解壓到/usr/local/kafka目錄下。
? 然后進(jìn)入config目錄,修改server.properties。這個(gè)配置文件里面的配置項(xiàng)非常多,下面列出幾個(gè)要重點(diǎn)關(guān)注的配置。
#broker 的全局唯一編號(hào),不能重復(fù),只能是數(shù)字。
broker.id=0
#監(jiān)聽(tīng)端口,要么就配置hosts文件
listeners=PLAINTEXT://192.168.64.133:9092
#數(shù)據(jù)文件地址。同樣默認(rèn)是給的/tmp目錄。
log.dirs=/usr/local/kafka/kafka-logs
#默認(rèn)的每個(gè)Topic的分區(qū)數(shù)
num.partitions=1
#zookeeper的服務(wù)地址
zookeeper.connect=192.168.64.133:2181,192.168.64.134:2181,192.168.64.128:2181
#可以選擇指定zookeeper上的基礎(chǔ)節(jié)點(diǎn)。
#zookeeper.connect=192.168.64.133:2181,192.168.64.134:2181,192.168.64.128:2181/kafka
broker.id需要每個(gè)服務(wù)器上不一樣,分發(fā)到其他服務(wù)器上時(shí),要注意修改一下。
多個(gè)Kafka服務(wù)注冊(cè)到同一個(gè)zookeeper集群上的節(jié)點(diǎn),會(huì)自動(dòng)組成集群。
配置文件中的注釋非常細(xì)致,可以關(guān)注一下。下面是server.properties文件中比較重要的核心配置
Property Default Description broker.id 0 broker的“名字”,你可以選擇任意你喜歡的數(shù)字作為id,只要id是唯每個(gè)broker都可以用一個(gè)唯一的非負(fù)整數(shù)id進(jìn)行標(biāo)識(shí);這個(gè)id可以作為一的即可。 log.dirs /tmp/kafka-logs kafka存放數(shù)據(jù)的路徑。這個(gè)路徑并不是唯一的,可以是多個(gè),路徑之間只需要使用逗號(hào)分隔即可;每當(dāng)創(chuàng)建新partition時(shí),都會(huì)選擇在包含最少partitions的路徑下進(jìn)行。 listeners PLAINTEXT://127.0.0.1:9092 server接受客戶端連接的端口,ip配置kafka本機(jī)ip即可 zookeeper.connect localhost:2181 zookeeper連接地址。hostname:port。如果是Zookeeper集群,用逗號(hào)連接。 log.retention.hours 168 每個(gè)日志文件刪除之前保存的時(shí)間。 num.partitions 1 創(chuàng)建topic的默認(rèn)分區(qū)數(shù) default.replication.factor 1 自動(dòng)創(chuàng)建topic的默認(rèn)副本數(shù)量 min.insync.replicas 1 當(dāng)producer設(shè)置acks為-1時(shí),min.insync.replicas指定replicas的最小數(shù)目(必須確認(rèn)每一個(gè)repica的寫數(shù)據(jù)都是成功的),如果這個(gè)數(shù)目沒(méi)有達(dá)到,producer發(fā)送消息會(huì)產(chǎn)生異常 delete.topic.enable false 是否允許刪除主題
? 接下來(lái)就可以啟動(dòng)kafka服務(wù)了。啟動(dòng)服務(wù)時(shí)需要指定配置文件。
bin/kafka-server-start.sh -daemon config/server.properties
-daemon表示后臺(tái)啟動(dòng)kafka服務(wù),這樣就不會(huì)占用當(dāng)前命令窗口。
? 通過(guò)jps指令可以查看Kafka的進(jìn)程。
三、Kraft集群
? 在Kafka的config目錄下,提供了一個(gè)kraft的文件夾,在這里面就是Kraft協(xié)議的參考配置文件。在這個(gè)文件夾中有三個(gè)配置文件,broker.properties,controller.properties,server.properties,分別給出了Kraft中三種不同角色的示例配置。
- broker.properties: 數(shù)據(jù)節(jié)點(diǎn)
- controller.properties: Controller控制節(jié)點(diǎn)
- server.properties: 即可以是數(shù)據(jù)節(jié)點(diǎn),又可以是Controller控制節(jié)點(diǎn)。
這里同樣列出幾個(gè)比較關(guān)鍵的配置項(xiàng),按照自己的環(huán)境進(jìn)行定制即可
#配置當(dāng)前節(jié)點(diǎn)的角色。Controller相當(dāng)于Zookeeper的功能,負(fù)責(zé)集群管理。Broker提供具體的消息轉(zhuǎn)發(fā)服務(wù)。
process.roles=broker,controller
#配置當(dāng)前節(jié)點(diǎn)的id。與普通集群一樣,要求集群內(nèi)每個(gè)節(jié)點(diǎn)的ID不能重復(fù)。
node.id=1
#配置集群的投票節(jié)點(diǎn)。其中@前面的是節(jié)點(diǎn)的id,后面是節(jié)點(diǎn)的地址和端口,這個(gè)端口跟客戶端訪問(wèn)的端口是不一樣的。通常將集群內(nèi)的所有Controllor節(jié)點(diǎn)都配置進(jìn)去。
controller.quorum.voters=1@worker1:9093,2@worker2:9093,3@worker3:9093
#Broker對(duì)客戶端暴露的服務(wù)地址?;赑LAINTEXT協(xié)議。
advertised.listeners=PLAINTEXT://worker1:9092
#Controller服務(wù)協(xié)議的別名。默認(rèn)就是CONTROLLER
controller.listener.names=CONTROLLER
#配置監(jiān)聽(tīng)服務(wù)。不同的服務(wù)可以綁定不同的接口。這種配置方式在端口前面是省略了一個(gè)主機(jī)IP的,主機(jī)IP默認(rèn)是使用的java.net.InetAddress.getCanonicalHostName()
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
#數(shù)據(jù)文件地址。默認(rèn)配置在/tmp目錄下。
log.dirs=/app/kafka/kraft-log
#topic默認(rèn)的partition分區(qū)數(shù)。
num.partitions=2
將配置文件分發(fā),并修改每個(gè)服務(wù)器上的node.id屬性和advertised.listeners屬性。
? 由于Kafka的Kraft集群對(duì)數(shù)據(jù)格式有另外的要求,所以在啟動(dòng)Kraft集群前,還需要對(duì)日志目錄進(jìn)行格式化。
[oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-storage.sh random-uuid
j8XGPOrcR_yX4F7ospFkTA
[oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-storage.sh format -t j8XGPOrcR_yX4F7ospFkTA -c config/kraft/server.properties?
Formatting /app/kafka/kraft-log with metadata.version 3.4-IV0.
-t 表示集群ID,三個(gè)服務(wù)器上可以使用同一個(gè)集群ID。
? 接下來(lái)就可以指定配置文件,啟動(dòng)Kafka的服務(wù)了。 例如,在Worker1上,啟動(dòng)Broker和Controller服務(wù)。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-729196.html
[oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-server-start.sh -daemon config/kraft/server.properties?
[oper@worker1 kafka_2.13-3.4.0]$ jps
10993 Jps
10973 Kafka
? 等三個(gè)服務(wù)都啟動(dòng)完成后,就可以像普通集群一樣去創(chuàng)建Topic,并維護(hù)Topic的信息了。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-729196.html
到了這里,關(guān)于kafka環(huán)境搭建以及基本原理的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!