1.簡(jiǎn)介
Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者在網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)。 這種動(dòng)作(網(wǎng)頁(yè)瀏覽,搜索和其他用戶的行動(dòng))是在現(xiàn)代網(wǎng)絡(luò)上的許多社會(huì)功能的一個(gè)關(guān)鍵因素。 這些數(shù)據(jù)通常是由于吞吐量的要求而通過(guò)處理日志和日志聚合來(lái)解決。
Kafka?[1]??是一種高吞吐量?[2]??的分布式發(fā)布訂閱消息系統(tǒng),有如下特性:
通過(guò)O(1)的磁盤數(shù)據(jù)結(jié)構(gòu)提供消息的持久化,這種結(jié)構(gòu)對(duì)于即使數(shù)以TB的消息存儲(chǔ)也能夠保持長(zhǎng)時(shí)間的穩(wěn)定性能。
高吞吐量?[2]??:即使是非常普通的硬件Kafka也可以支持每秒數(shù)百萬(wàn)?[2]??的消息。
1.1 使用消息隊(duì)列好處
1)、提高擴(kuò)展性:因?yàn)橄㈥?duì)列解耦了處理過(guò)程,有新增需求時(shí)只要另外增加處理過(guò)程即可。
2)、提高峰值處理能力:在訪問(wèn)量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用。使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問(wèn)壓力,而不會(huì)因?yàn)橥话l(fā)的超負(fù)荷請(qǐng)求而完全崩潰;
3)、提高系統(tǒng)的可恢復(fù)性:消息隊(duì)列降低了進(jìn)程間的耦合度,所以即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)回復(fù)后被處理
1.2 kafka的結(jié)構(gòu)
- Producer:消息生產(chǎn)者,產(chǎn)生的消息將會(huì)被發(fā)送到某個(gè)topic
- Consumer:消息消費(fèi)者,消費(fèi)的消息內(nèi)容來(lái)自某個(gè)topic
- Topic:消息根據(jù)topic進(jìn)行歸類,topic其本質(zhì)是一個(gè)目錄,即將同一主題消息歸類到同一個(gè)目錄
- Broker:每一個(gè)kafka實(shí)例(或者說(shuō)每臺(tái)kafka服務(wù)器節(jié)點(diǎn))就是一個(gè)broker,一個(gè)broker可以有多個(gè)topic
2.下載
獲取我的baidu網(wǎng)盤中ELK
kafka.apache.org 官方網(wǎng)址
由于我下載的filebeat是7.9.3,與這個(gè)版本filebeat配合的kafka版本為0.11--2.2.2
https://www.elastic.co/guide/en/beats/filebeat/7.9/kafka-output.html 這里有說(shuō)明。
Compatibility
This output works with all Kafka versions in between 0.11 and 2.2.2. Older versions might work as well, but are
not supported.
3.安裝
tar zxvf kafka_2.11-2.0.1.tgz -C /usr/local
ls
cd /usr/local/
mv kafka_2.11-2.0.1 kafka
./bin 執(zhí)行文件
./config 配置文件主配置文件為 server.properties
4.配置
vi server.properties
broker.id=0 集群內(nèi)的唯一標(biāo)識(shí)
listeners=PLAINTEXT://ip[或者主機(jī)名]:9092 ? ? ? ? ?#配置監(jiān)聽(tīng)端口 ?9092位kafka默認(rèn)端口
log.dirs=/usr/local/kafka/logs ? ? ?#日志及傳輸?shù)臄?shù)據(jù)存放目錄可以通過(guò)“,”分隔存放到多個(gè)目錄,如
log.dirs=/data1,/data2
num.partitions=6 ?#定義新的topic有多少個(gè)分區(qū)
num.retention.hours=60 ?消息日志保留的時(shí)間,單位是小時(shí)
#num.retention.minutes 或者num.retention.ms,可以多個(gè)參數(shù)同時(shí)設(shè)置,時(shí)間最小生效
log.segment.bytes=1073741824 ?每個(gè)段文件的大小,1G為默認(rèn)值,topic--partition--segement
zookeeper.connect=10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181
auto.create.topics.enable=true ?#可以自動(dòng)創(chuàng)建topics
delete.topic.enable=true ?#可以自動(dòng)刪除空閑的topics,是邏輯的刪除,非物理刪除。
zookeeper.connect.timeout.ms=6000
mkdir /data1
mkdir /data2
5.啟動(dòng)kafka?
cd /usr/local/kafka
./bin/kafka-server-start.sh ./config/server.properties -d ? ? ?#-d 后臺(tái)執(zhí)行
或者nohup ./kafka-server-start.sh ../config/server.properties &
啟動(dòng)時(shí)可能會(huì)報(bào)錯(cuò),注意關(guān)閉服務(wù)器的防火墻
jps查看
[root@localhost kafka]# jps
23696 Kafka
9271 QuorumPeerMain
24077 Jps
kafka提供了多個(gè)命令來(lái)查看,創(chuàng)建,修改,刪除topic信息,也可以通過(guò)命令來(lái)測(cè)試日志消息
命令存放路徑為usr/local/kafka/bin
[root@localhost bin]# pwd
/usr/local/kafka/bin
[root@localhost bin]# ls
connect-distributed.sh ? ? ? ?kafka-consumer-groups.sh ? ? kafka-preferred-replica-election.sh ?kafka-streams-
application-reset.sh ?zookeeper-server-start.sh
connect-standalone.sh ? ? ? ? kafka-consumer-perf-test.sh ?kafka-producer-perf-test.sh ? ? ? ? ?kafka-topics.sh ?
? ? ? ? ? ? ? ? ? ?zookeeper-server-stop.sh
kafka-acls.sh ? ? ? ? ? ? ? ? kafka-delegation-tokens.sh ? kafka-reassign-partitions.sh ? ? ? ? kafka-
verifiable-consumer.sh ? ? ? ?zookeeper-shell.sh
kafka-broker-api-versions.sh ?kafka-delete-records.sh ? ? ?kafka-replica-verification.sh ? ? ? ?kafka-
verifiable-producer.sh
kafka-configs.sh ? ? ? ? ? ? ?kafka-dump-log.sh ? ? ? ? ? ?kafka-run-class.sh ? ? ? ? ? ? ? ? ? trogdor.sh
kafka-console-consumer.sh ? ? kafka-log-dirs.sh ? ? ? ? ? ?kafka-server-start.sh ? ? ? ? ? ? ? ?windows
kafka-console-producer.sh ? ? kafka-mirror-maker.sh ? ? ? ?kafka-server-stop.sh ? ? ? ? ? ? ? ? zookeeper-
security-migration.sh
例子:kafka的任何操作都需要zookeeper
查看有多少個(gè)topic
./kafka-topics.sh ?--zookeeper 10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181 --list
創(chuàng)建topic
./kafka-topics.sh ?--create --zookeeper 10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181 --replication-factor
1 --partitions 3 --topic mytopic
實(shí)例:
[root@localhost bin]# ./kafka-topics.sh ?--create --zookeeper 10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181
--replication-factor 1 --partitions 3 --topic mytopic
Created topic "mytopic".
[root@localhost bin]# ./kafka-topics.sh ?--zookeeper 10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181 --list ?
? ? ? ? ? ?mytopic
[root@localhost bin]#
看看topic的具體信息
./kafka-topics.sh ?--describe --zookeeper 10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181 ?--topic mytopic
[root@localhost bin]# ./kafka-topics.sh ?--describe --zookeeper
10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181 ?--topic mytopic
Topic:mytopic ? PartitionCount:3 ? ? ? ?ReplicationFactor:1 ? ? Configs:
? ? ? ? Topic: mytopic ?Partition: 0 ? ?Leader: 1 ? ? ? Replicas: 1 ? ? Isr: 1
? ? ? ? Topic: mytopic ?Partition: 1 ? ?Leader: 2 ? ? ? Replicas: 2 ? ? Isr: 2
? ? ? ? Topic: mytopic ?Partition: 2 ? ?Leader: 3 ? ? ? Replicas: 3 ? ? Isr: 3
消費(fèi)信息
交互式的生成消息到topic mytopic中
./kafka-console-producer.sh ?--broker-list 10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092 --topic mytopic
ctrl +c ?結(jié)束查看消費(fèi)消息
./kafka-console-consumer.sh ?--bootstrap-server 10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092 --topic
mytopic
./kafka-console-consumer.sh ?--bootstrap-server 10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092 --topic
mytopic
?--from-beginning ?#從頭開(kāi)始消費(fèi)
刪除topic文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-694089.html
[root@localhost bin]# ./kafka-topics.sh ?--delete --zookeeper 10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181
--topic mytopic
[root@localhost bin]# ./kafka-topics.sh ?--delete --zookeeper 10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181
--topic mytopic
Topic mytopic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
?文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-694089.html
到了這里,關(guān)于ELK安裝、部署、調(diào)試(四)KAFKA消息隊(duì)列的安裝和部署的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!