環(huán)境準(zhǔn)備
環(huán)境資源包:
jdk-8u341-linux-x64.tar.gz
kafka_2.12-2.2.0.tgz
zookeeper-3.4.14.tar.gz
server-id | ip | 狀態(tài) |
---|---|---|
server1 | 10.206.120.10 | leader |
server2 | 10.206.120.2 | follower |
server3 | 10.206.120.3 | follower |
一、安裝jdk
因?yàn)閗afka需要Java環(huán)境,所以優(yōu)先配置jdk環(huán)境,若已經(jīng)配置了java環(huán)境,此步驟可以忽略
[root@VM-120-2-centos ~]# tar -xvf jdk-8u341-linux-x64.tar.gz
[root@VM-120-2-centos ~]# mv jdk1.8.0_341/ /usr/local/
#在文件末尾加入以下語句
[root@VM-120-2-centos ~]# vim /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_341
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib
[root@VM-120-2-centos ~]# source /etc/profile
[root@VM-120-2-centos ~]# java -version
java version "1.8.0_341"
Java(TM) SE Runtime Environment (build 1.8.0_341-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.341-b10, mixed mode)
至此jdk環(huán)境配置完成
二、zookeeper集群安裝
[root@VM-120-2-centos ~]# cd
[root@VM-120-2-centos ~]# tar -xvf zookeeper-3.4.14.tar.gz
[root@VM-120-2-centos ~]# mv zookeeper-3.4.14 /usr/local/zookeeper
[root@VM-120-2-centos ~]# cd /usr/local/zookeeper/conf/
[root@VM-120-2-centos ~]# cp zoo_sample.cfg zoo.cfg
[root@VM-120-2-centos ~]# vim zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataLogDir=/usr/local/zookeeper/logs
dataDir=/usr/local/zookeeper/data
clientPort=2181
autopurge.snapRetainCount=500
autopurge.purgeInterval=24
server.1= 10.206.120.10:2888:3888
server.2= 10.206.120.2:2888:3888
server.3= 10.206.120.2:2888:3888
[root@VM-120-2-centos ~]# mkdir /usr/local/zookeeper/data
#10.206.120.10服務(wù)器上執(zhí)行
[root@VM-120-2-centos ~]# echo "1" > /usr/local/zookeeper/data/myid
#10.206.120.2服務(wù)器上執(zhí)行
[root@VM-120-2-centos ~]# echo "2" > /usr/local/zookeeper/data/myid
#10.206.120.3服務(wù)器上執(zhí)行
[root@VM-120-2-centos ~]# echo "3" > /usr/local/zookeeper/data/myid
[root@VM-120-2-centos ~]# cd ../bin/
[root@VM-120-2-centos ~]# ./zkServer.sh start
[root@VM-120-2-centos ~]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader
[root@VM-120-2-centos ~]# netstat -ntlp
看看端口是否正常啟動
二、kafka集群安裝
[root@VM-120-2-centos ~]# cd
[root@VM-120-2-centos ~]# tar -xvf kafka_2.12-2.2.0.tgz
[root@VM-120-2-centos ~]# mv kafka_2.12-2.2.0 /usr/local/kafka
[root@VM-120-2-centos ~]# cd /usr/local/kafka/config/
[root@VM-120-2-centos ~]# cp server.properties server.properties.bak
#10.206.120.2服務(wù)器上將以下內(nèi)容寫入文件
[root@VM-120-2-centos ~]# vim server.properties
broker.id=2
listeners=PLAINTEXT://10.206.120.2:9092
advertised.listeners=PLAINTEXT://43.137.8.225:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=204857600
log.dirs=/usr/local/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=1680
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.206.120.2:2181,10.206.120.10:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=10
max.poll.interval.ms=800000
max.poll.records=50
#10.206.120.10服務(wù)器上將以下內(nèi)容寫入文件
[root@VM-120-2-centos ~]# vim server.properties
broker.id=1
listeners=PLAINTEXT://10.206.120.10:9092
advertised.listeners=PLAINTEXT://118.195.137.101:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=204857600
log.dirs=/usr/local/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=1680
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.206.120.2:2181,10.206.120.10:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=10
max.poll.interval.ms=800000
max.poll.records=50
#10.206.120.3服務(wù)器上將以下內(nèi)容寫入文件
[root@VM-120-2-centos ~]# vim server.properties
broker.id=3
listeners=PLAINTEXT://10.206.120.3:9092
advertised.listeners=PLAINTEXT://175.27.146.204:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=204857600
log.dirs=/usr/local/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=1680
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.206.120.2:2181,10.206.120.10:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=10
max.poll.interval.ms=800000
max.poll.records=50
[root@VM-120-2-centos ~]# cd ../bin/
[root@VM-120-2-centos ~]# nohup ./kafka-server-start.sh ../config/server.properties &
#查看9092端口是否啟動,啟動即為正常
[root@VM-120-10-centos bin]# netstat -ntlp
#下面所有命令在kafka的bin目錄下執(zhí)行(筆者是/usr/local/kafka/bin)
#創(chuàng)建topic話題
./kafka-topics.sh --create --bootstrap-server 10.206.120.10:2181,10.206.120.2:2181,10.206.120.3:2181 --replication-factor 3 --topic test --partitions 3
#開啟一個(gè)生產(chǎn)者
./kafka-console-producer.sh --broker-list 10.206.120.10:2181,10.206.120.2:2181,10.206.120.3:2181 --topic test
#開啟一個(gè)消費(fèi)者
./kafka-console-consumer.sh --bootstrap-server 10.206.120.10:2181,10.206.120.2:2181,10.206.120.3:2181 --topic test --from-beginning
#列出所有topic
./kafka-topics.sh --zookeeper 10.206.120.10:2181,10.206.120.2:2181,10.206.120.3:2181 -list
#輸出某個(gè)topic
./kafka-topics.sh --zookeeper 10.206.120.10:2181,10.206.120.2:2181,10.206.120.3:2181 --delete --topic test
#詳細(xì)描述某個(gè)topic
./kafka-topics.sh --zookeeper 10.206.120.10:2181,10.206.120.2:2181,10.206.120.3:2181 --describe --topic test
至此集群搭建成功,可以正常使用了
四、測試
外網(wǎng)測試
#1、上傳測試壓縮包并解壓如下圖1
#2、更改application.yml將ip更改為集群主機(jī)的外網(wǎng)ip
spring:
application:
name: kafka-tester
kafka:
bootstrap-servers:
- 10.206.120.3:9092
- 10.206.120.2:9092
- 10.206.120.10:9092
producer:
topic: test
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
topic: test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#3、 后臺啟動kafka測試包
nohup ./test.sh &
#4、開啟一個(gè)消費(fèi)者,查看jar包生產(chǎn)結(jié)果圖2(說明kafka-test.jar包啟動正常,kafka集群工作正常)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 175.27.146.204:9092,118.195.137.101:9092,43.137.8.225:9092 --topic test --from-beginning
#5、查看consumer_offsets只有一個(gè)副本數(shù)圖三
/usr/local/kafka/bin/kafka-topics.sh --zookeeper 175.27.146.204:2181,118.195.137.101:2181,43.137.8.225:2181 --describe --topic __consumer_offsets
將集群中的一個(gè)follower關(guān)機(jī),模擬服務(wù)器宕機(jī)
結(jié)果發(fā)現(xiàn)消息隊(duì)列出現(xiàn)問題(kafka集群不可用)
將服務(wù)器啟動,并將zookeeper和kafka集群啟動,此時(shí)集群才正常
故當(dāng)__consumer_offsets副本數(shù)設(shè)置為1時(shí),若服務(wù)器宕機(jī)了,則kafka集群無法正常使用,服務(wù)器啟動,并將zookeeper和kafka集群啟動,此時(shí)集群才正常
解決方案
1.修改系統(tǒng)_offsets副本數(shù)為3
修改kafka的核心配置文件server.properties
將num.partitions參數(shù)(默認(rèn)為1)
修改為3,offsets.topic.replication.factor=3(默認(rèn)為1)
另外需要添加auto.create.topics.enable=true
由于__consumer_offsets是kafka默認(rèn)的主題,無法刪除,我們可以刪除zookeeper中的__consumer_offsets。
進(jìn)入zookeeper/bin目錄執(zhí)行./zkCli.sh
ls /broksers/topics
rmr /brokers/topics/__consumer_offsets
ls /broksers/topics
先將集群停掉
在重新啟動zookeeper和kafka
再次查看__consumer_offsets。發(fā)現(xiàn)副本數(shù)已經(jīng)是3
將集群中的一個(gè)follower關(guān)機(jī),模擬服務(wù)器宕機(jī)
消費(fèi)隊(duì)列正常
cd /usr/local/zookeeper/bin/
./zkServer.sh start
cd /usr/local/kafka/bin/
nohup ./kafka-server-start.sh …/config/server.properties &
./kafka-topics.sh --zookeeper 175.27.146.204:2181,118.195.137.101:2181,43.137.8.225:2181 --describe --topic __consumer_offsets文章來源:http://www.zghlxwxcb.cn/news/detail-758648.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-758648.html
到了這里,關(guān)于jdk+zookeeper+kafka 搭建kafka集群的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!