Kafka后臺(tái)操作
1)主題
1.創(chuàng)建主題
./bin/kafka-topics.sh --create --bootstrap-server hadoop102:9092 --replication-factor 3 --partitions 1 --topic second
2.查看所有主題
./bin/kafka-topics.sh --list --bootstrap-server hadoop102:9092
3.查看詳細(xì)主題
./bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic test
序號(hào)從0開(kāi)始計(jì)算
Partition:分區(qū)數(shù),該主題有3個(gè)分區(qū)
Replica:副本數(shù),該主題有3個(gè)副本
Leader:副本數(shù)中的主的序號(hào),生產(chǎn)消費(fèi)的對(duì)象
2)分區(qū)
1.修改分區(qū)數(shù)
./bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --partitions 3 --topic test4
修改的分區(qū)數(shù)量不可以小于或者等于當(dāng)前主題分區(qū)的數(shù)量,否則會(huì)報(bào)錯(cuò)
在根目錄kafka-logs文件中存放了儲(chǔ)消費(fèi)者組的偏移量信息的文件。所有創(chuàng)建的主題都會(huì)存放在該文件夾中
__consumer_offsets-xx:當(dāng)中的xx是根據(jù)分區(qū)主題數(shù)量來(lái)確定的,里面存儲(chǔ)了當(dāng)前分區(qū)的消費(fèi)者組的偏移量信息。例如個(gè)話題有5個(gè)分區(qū),那么他就會(huì)有5個(gè)對(duì)應(yīng)的文件。而-xx前面代表的是主題的名稱(chēng)。
在創(chuàng)建Kafka集群時(shí)__consumer_offsets默認(rèn)會(huì)創(chuàng)建50個(gè)分區(qū),文件內(nèi)的 __consumer_offsets-xx的數(shù)量應(yīng)該大于50個(gè)
選擇一個(gè)文件進(jìn)去查看
log: 分區(qū)的日志文件,用于存儲(chǔ)消費(fèi)者組的偏移量信息。每個(gè)日志文件由多個(gè)消息記錄組成,其中包含了消費(fèi)者組的偏移量提交記錄。
index:分區(qū)的索引文件,用于加速偏移量查詢操作。索引文件保存了消息偏移量和日志文件中的物理偏移量之間的映射關(guān)系。
timeindex:分區(qū)的時(shí)間索引文件,用于加速偏移量查詢操作。時(shí)間索引文件保存了消息的時(shí)間戳和日志文件中的物理偏移量之間的映射關(guān)系。
snapshot:分區(qū)的快照文件,用于記錄偏移量提交的快照信息??煺瘴募似屏刻峤挥涗浀恼畔?,以便在恢復(fù)時(shí)加快恢復(fù)速度。
3)生產(chǎn)者
1.啟動(dòng)生產(chǎn)者
./bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
kafka-console-producer.sh
是 Kafka 提供的一個(gè) shell 腳本,用于在命令行中啟動(dòng)一個(gè)控制臺(tái)生產(chǎn)者,用于向 Kafka 集群發(fā)送消息
--broker-list
: 指定 Kafka brokers 的地址和端口號(hào),用冒號(hào)分隔
--topic
: 指定要發(fā)送消息的主題名。
4)消費(fèi)者組
1.查看消費(fèi)者組
./bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --list
2.創(chuàng)建消費(fèi)者從頭開(kāi)始讀并設(shè)置消費(fèi)者組
./bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --from-beginning --group testGroup
bootstrap-server :參數(shù)用于指定 Kafka 集群的地址
topic :參數(shù)用于指定要消費(fèi)的主題名稱(chēng)
group :參數(shù)用于指定消費(fèi)者組的名稱(chēng)
from-beginning :讀取最早的偏移量
group :指定消費(fèi)者組名稱(chēng)
3.創(chuàng)建消費(fèi)者時(shí)設(shè)置偏移量為最新
./bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --group testGroup --topic first --reset-offsets --to-latest --execute
#啟動(dòng)
./bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --group testGroup
reset-offsets:表示要重置偏移量
to-latest:表示將偏移量移到最新位置
execute:表示執(zhí)行偏移量重置操作
執(zhí)行偏移量重置后,會(huì)移動(dòng)到最新位置,如果需要修改,得重新設(shè)置,謹(jǐn)慎操作。
NEW-OFF:表示設(shè)置的新的偏移量
4.創(chuàng)建消費(fèi)者時(shí)設(shè)置具體的偏移量數(shù)據(jù)(重置偏移量到6)
./bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --group testGroup --topic first --reset-offsets --to-offset 6 --execute
#啟動(dòng)
./bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --group testGroup
5.創(chuàng)建消費(fèi)者時(shí)設(shè)置相對(duì)的偏移量數(shù)據(jù)(偏移量向前偏移2個(gè)位置)
./bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --group testGroup --topic first --reset-offsets --shift-by 2 --execute
reset-offsets:表示要重置偏移量
to-offset:參數(shù)可以將偏移量重置到指定的偏移量位置
shift-by:參數(shù)可以將偏移量進(jìn)行相對(duì)偏移量重置
6.查看消費(fèi)者組主具體信息
./bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --describe --group mentugroup
TOPIC:消費(fèi)主題名稱(chēng)
PARTITION:分區(qū)Id
CURRENT-OFFSET: 當(dāng)前消費(fèi)偏移量,即消費(fèi)者當(dāng)前消費(fèi)到的消息的偏移量
LOG-END-OFFSET: 分區(qū)的最新偏移量,即分區(qū)中最新消息的偏移量
LAG:消費(fèi)者落后于最新偏移量的消息數(shù)量,即LAG = LOG-END-OFFSET - CURRENT-OFFSET(最新-當(dāng)前)
CONSUMER-ID:消費(fèi)者ID
HOST:消費(fèi)者所在主機(jī)
CLIENT-ID:消費(fèi)者客戶端ID文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-507559.html
7.刪除消費(fèi)者組文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-507559.html
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGroupNew1 --delete
到了這里,關(guān)于Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費(fèi)者的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!