管理Kafka
Kafka 提供了一些命令行工具,用于管理集群的變更。這些工具使用 Java 類實現(xiàn),Kafka 提供了一些腳本來調(diào)用這些 Java 類。不過,它們只提供了一些基本的功能,無法完成那 些復(fù)雜的操作。
雖然 Kafka 實現(xiàn)了操作主題的認證和授權(quán)控制,但還不支持集群的其他大部 分操作。也就是說,在沒有認證的情況下也可以使用這些命令行工具,在沒 有安全檢查和審計的情況下也可以執(zhí)行諸如主題變更之類的操作。
1、主題操作(kafka-topic.sh)
使用 kafka-topics.sh 工具可以執(zhí)行主題的大部分操作(配置變更部分已經(jīng)被棄用并被移動 到 kafka-configs.sh 工具當(dāng)中)。我們可以用它創(chuàng)建、修改、刪除和查看集群里的主題。要使用該工具的全部功能,需要通過 --bootstrap-server 參數(shù)提供broker的連接字符串。
1.1、創(chuàng)建主題(–create)
在集群里創(chuàng)建一個主題需要用到 3 個參數(shù)。這些參數(shù)是必須提供的,盡管有些已經(jīng)有了broker 級別的默認值。
- 主題名字:題名字可以包含字母、數(shù)字、下劃線以及英文狀態(tài)下的破折號和句號。主題名字的開頭部分包含兩個下劃線是合法的,但不建議這么做。具有這種 格式的主題一般是集群的內(nèi)部主題(比如 __consumer_offsets 主題用于保存 消費者群組的偏移量)。也不建議在單個集群里使用英文狀態(tài)下的句號和下 劃線來命名,因為主題的名字會被用在度量指標上,句號會被替換成下劃線
(比如“topic.1”會變成“topic_1”)。 - 復(fù)制系數(shù):主題的副本數(shù)量。
- 分區(qū):主題的分區(qū)數(shù)量
語法:
kafka-topics.sh --bootstrap-server <zookeeper connect> --create --topic <string> --replication-factor <integer> --partitions <integer>
示例:
# 使用以下命令創(chuàng)建一個叫作my-topic的主題,主題包含8個分區(qū),每個分區(qū)擁有1個副本。
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --replication-factor 1 -- partitions 8
忽略重復(fù)創(chuàng)建主題的錯誤
在自動化系統(tǒng)里調(diào)用這個腳本時,可以使用 --if-not-exists 參數(shù),這樣即 使主題已經(jīng)存在,也不會拋出重復(fù)創(chuàng)建主題的錯誤。
1.2、增加分區(qū)(–alter)
主題基于分區(qū)進行伸縮和復(fù)制,增加分區(qū)主要是 為了擴展主題容量或者降低單個分區(qū)的吞吐量。如果要在單個消費者群組內(nèi)運行更多的消 費者,那么主題數(shù)量也需要相應(yīng)增加,因為一個分區(qū)只能由群組里的一個消費者讀取。
- 調(diào)整基于鍵的主題
從消費者角度來看,為基于鍵的主題添加分區(qū)是很困難的。因為如果改變了 分區(qū)的數(shù)量,鍵到分區(qū)之間的映射也會發(fā)生變化。所以,對于基于鍵的主題 來說,建議在一開始就設(shè)置好分區(qū)數(shù)量,避免以后對其進行調(diào)整。 - 忽略主題不存在的錯誤
在使用 --alter 命令修改主題時,如果指定了 --if-exists 參數(shù),主題不存 在的錯誤就會被忽略。如果要修改的主題不存在,該命令并不會返回任何錯 誤。在主題不存在的時候本應(yīng)該創(chuàng)建主題,但它卻把錯誤隱藏起來,因此不 建議使用這個參數(shù)。
示例:
#將my-topic主題的分區(qū)增加到16
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 16
1.3、減少分區(qū)數(shù)量(無)
我們無法減少主題的分區(qū)數(shù)量。因為如果刪除了分區(qū),分區(qū)里的數(shù)據(jù)也一并 被刪除,導(dǎo)致數(shù)據(jù)不一致。我們也無法將這些數(shù)據(jù)分配給其他分區(qū),因為這 樣做很難,而且會出現(xiàn)消息亂序。所以,如果一定要減少分區(qū)數(shù)量,只能刪 除整個主題,然后重新創(chuàng)建它。
1.4、刪除主題(–delete)
如果一個主題不再被使用,只要它還存在于集群里,就會占用一定數(shù)量的磁盤空間和文件 句柄。把它刪除就可以釋放被占用的資源。為了能夠刪除主題,broker 的 delete.topic. enable
參數(shù)必須被設(shè)置為 true。如果該參數(shù)被設(shè)為 false,刪除主題的請求會被忽略。
示例:
# 刪除my-topic主題
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-topic
1.5、列出集群里的所有主題(–list)
可以使用主題工具列出集群里的所有主題。每個主題占用一行輸出,主題之間沒有特定的順序。
示例:
kafka-topics.sh --bootstrap-server localhost:9092 --list
1.6、列出主題詳細信息(–describe)
主題工具還能用來獲取主題的詳細信息。信息里包含了分區(qū)數(shù)量、主題的覆蓋配置以及 每個分區(qū)的副本清單。如果通過 --topic 參數(shù)指定特定的主題,就可以只列出指定主題 的詳細信息。
示例:
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic
describe 命令還提供了一些參數(shù),用于過濾輸出結(jié)果,這在診斷集群問題時會很有用。不要為這些參數(shù)指定 --topic 參數(shù)(因為這些參數(shù)的目的是為了找出集群里所有滿足條件的 主題和分區(qū))。這些參數(shù)也無法與 list 命令一起使用(最后一部分會詳細說明原因)。
使用 --topics-with-overrides
參數(shù)可以找出所有包含覆蓋配置的主題,它只會列出包含了 與集群不一樣配置的主題。
有兩個參數(shù)可用于找出有問題的分區(qū)。--under-replicated-partitions
參數(shù)可以列出 所有包含不同步副本的分區(qū)。--unavailable-partitions
參數(shù)可以列出所有沒有首領(lǐng) 的分區(qū),這些分區(qū)已經(jīng)處于離線狀態(tài),對于生產(chǎn)者和消費者來說是不可用的。
# 列出包含不同步副本的分區(qū)
kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions
1.7、修改或刪除配置(–config)
覆蓋已有的topic參數(shù):
kafka-topics.sh --bootstrap-server zk_host:port --topic TEST --alter --config flush.messages=1
刪除topic級別配置參數(shù):
kafka-topics.sh --bootstrap-server zk_host:port --alter --topic TEST --delete-config flush.messages=1
2、消費者群組(kafka- consumer-groups.sh )
在 Kafka 里,有兩個地方保存著消費者群組的信息。對于舊版本的消費者來說,它們的信 息保存在 Zookeeper 上;對于新版本的消費者來說,它們的信息保存在 broker 上。kafka- consumer-groups.sh 工具可以用于列出上述兩種消費者群組。它也可以用于刪除消費者群 組和偏移量信息,不過這個功能僅限于舊版本的消費者群組(信息保存在 Zookeeper 上)。 在對舊版本的消費者群組進行操作時,需要通過 --zookeeper 參數(shù)指定 Zookeeper 的地址; 在對新版本的消費者群組進行操作時,則需要使用 --bootstrap-server 參數(shù)指定 broker 的 主機名和端口。
語法:
kafka-consumer-groups.sh [-h] [--bootstrap-server <server to use>] [--command-config <command configuration property file>]
[--group <consumer-group>] [--new-consumer | --zookeeper] [--describe] [--delete] [--reset-offsets]
[--reset-offsets-by-duration <duration controlling how far back to reset>] [--reset-offsets-by-topic <topic to reset>]
[--reset-offsets-by-times] [--all-topics] [--topic <topic>] [--exclude-internal] [--dry-run]
- -–bootstrap-server:Kafka集群的地址,多個地址使用逗號分隔。
- -–command-config:kafka的安全認證配置文件路徑。
- -–group:指定要操作的消費組。
- -–describe:列出消費組的詳情。
- -–delete:刪除消費組。
- -–reset-offsets:重置消費組的偏移量。
- -–reset-offsets-by-duration:指定重置的時間(從現(xiàn)在往前)。
- -–reset-offsets-by-topic:指定重置的topic和partition。
- -–reset-offsets-by-times:指定重置的時間點。
- -–new-consumer:使用新消費者API。
- -–zookeeper:使用舊的Zookeeper API。
- -–all-topics:列出所有topic的所有消費組。
- -–topic:指定要操作的topic。
- -–exclude-internal:不列出.kafka/*的topic。
- -–dry-run:僅輸出要執(zhí)行的操作,不實際運行。
2.1、列出群組(–list)
在使用舊版本的消費者客戶端時,可以使用 --zookeeper 和 --list 參數(shù)列出消費者群 組;在使用新版本的消費者客戶端時,則要使用 --bootstrap-server、–list 參數(shù)。
示例:列出舊版本的消費者群組。
kafka-consumer-groups.sh --zookeeper localhost:2181/kafka-cluster --list
示例:列出新版本的消費者群組。
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
2.2、獲取群組詳細信息(–describe)
使用 --describe,并通過 --group 指定特定的群組, 就可以獲取該群組的詳細信息。它會列出群組里所有主題的信息和每個分區(qū)的偏移量。
示例:獲取消費者群組testGroup的詳細信息
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
字段 | 描述 |
---|---|
GROUP | 消費者群組的名字 |
TOPIC | 正在被讀取的主題名字 |
PARTITION | 正在被讀取的分區(qū) ID |
CURRENT-OFFSET | 消費者群組最近提交的偏移量,也就是消費者在分區(qū)里讀取的當(dāng)前位置 |
LOG-END-OFFSET | 當(dāng)前高水位偏移量,也就是最近一個被讀取消息的偏移量,同時也是最近一個被提 交到集群的偏移量 |
LAG | 消費者的 CURRENT-OFFSET 和 broker 的 LOG-END-OFFSET 之間的差距 |
CONSUMER-ID | 消費者群組里正在讀取該分區(qū)的消費者。這是一個消費者的 ID |
HOST | 消費者主機IP |
2.3、偏移量管理(–reset-offsets)
能夠執(zhí)行成功的一個前提是 消費組這會是不可用狀態(tài);
- 執(zhí)行模式:
–dry-run:這個參數(shù)表示預(yù)執(zhí)行,會打印出來將要處理的結(jié)果;
–excute:真正執(zhí)行;
- 執(zhí)行范圍:
–group:指定具體的消費組;
–all-group:指定所有的消費組;
- 重置模式:
相關(guān)重置Offset的模式;
參數(shù) | 描述 | 示例 |
---|---|---|
--to-earliest | 重置offset到最開始的offset(未被刪除的最早的offset) | |
--to-current | 直接重置offset到當(dāng)前的offset,也就是LOE | |
--to-latest | 重置到最后一個offset | |
--to-detetime | 重置到指定時間的offset;格式為:YYYY-MM-DDTHH:mm:SS.sss | --to-datetime “2021-6-26T00:00:00.000” |
--to-offset | 重置到指定的offset,但是通常情況下,匹配到多個分區(qū),這里是將匹配到的所有分區(qū)都重置到這一個值;如果目標最大offset < to-offset,這個時候重置為目標最大offset;如果目標最小offset > to-offset,則重置為最?。?/td> | --to-offset 300 |
--shift-by | 按照偏移量增加或者減少多少個offsete;正數(shù)向前增加、負數(shù)向后退 | --shift-by 100、--shift-by -100 |
--from-file | 根據(jù)CVS文檔來重置 |
示例:將消費組”testGroup”的”testTopic”上的所有分區(qū)的偏移量為300。
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGroup --reset-offsets --topic testTopic --to-offset 300 --execute
示例:將消費組”testGroup”的”testTopic”上的所有分區(qū)的偏移量向前移動100。
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGroup --reset-offsets --topic testTopic --shift-by -100 --execute
示例:通過cvs文檔配置消費組”testGroup”的”testTopic”上的所有分區(qū)的偏移量為10000
offsets.cvs:
格式為:Topic,分區(qū)號,偏移量
testTopic,0,10000
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGroup --reset-offsets --from-file offsets.cvs --execute
2.4、刪除偏移量(–delete-offsets)
能夠執(zhí)行成功的一個前提是 消費組這會是不可用狀態(tài);
偏移量被刪除了之后,Consumer Group下次啟動的時候,會從頭消費;
示例:將消費組”testGroup”的”testTopic”上的所有分區(qū)的偏移量刪除
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGroup --topic testTopic --delete-offsets
2.4、查詢消費者成員信息(–members)
示例:
kafka-consumer-groups.sh --describe --all-groups --bootstrap-server localhost:9092 --members
2.5、查詢消費者狀態(tài)信息(–state)
示例:
kafka-consumer-groups.sh --describe --all-groups --bootstrap-server localhost:9092 --state
2.6、刪除消費組(–delete)
想要刪除消費組前提是這個消費組的所有客戶端都停止消費/不在線才能夠成功刪除;否則會報下面異常
示例:刪除指定消費組
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group testGroup
示例:刪除所有消費組
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --all-groups
3、動態(tài)配置變更(kafka-configs.sh)
kafka-configs.sh可以在集群處于運行狀態(tài)時覆蓋主題配置和客戶端的配額參數(shù)。這樣就可以 為特定的主題和客戶端指定配置參數(shù)。一旦設(shè)置完畢,它們就成為集群的永久配置。
參數(shù) | 說明 |
---|---|
--zookeeper | 使用zk配置操作集群,支持三種配置類型topics、clients、users |
--bootstrap | 使用broker連接方式、僅支持一種配置類型brokers,格式為brokerIp01:port,brokerIp02:port,… |
--command-config | 包含要傳遞給Admin Client的配置的屬性文件。僅與--bootstrap-server選項一起使用,用于描述和更改代理配置 |
--alter | 指定需要修改配置 |
--describe | 列舉出指定的實體配置 |
--entity-type | 實體配置類型(topics、users、brokers) |
--entity-name | entity名稱(topicName、clientId、userId、brokerId) |
--add-config | 要添加的鍵值對配置。方括號可用于對包含逗號的值進行分組:‘k1=v1,k2=[v1,v2,v2],k3=v3’ |
--entity-default | clients/users/brokers的默認entity-name,生產(chǎn)zk相對路徑的節(jié)點<default> |
--delete-config | 指定配置項刪除’k1,k2’ |
--force | 禁止控制臺提示 |
3.1、Brokers類型動態(tài)配置(–entity-type brokers)
配置brokers只能指定–bootstrap-server,zk不支持。
3.1.1、增加配置項(–add-config)
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-default --add-config 'max.connections.per.ip=200,max.connections.per.ip.overrides=[ip1:100,ip2:120]'
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-name $brokerId --add-config 'max.connections.per.ip=200,max.connections.per.ip.overrides=[ip1:100]'
3.1.2、刪除配置項(–delete-config)
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-default --delete-config 'max.connections.per.ip,max.connections.per.ip.overrides'
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-name $brokerId --delete-config 'max.connections.per.ip,max.connections.per.ip.overrides'
3.1.3、列出配置項詳情(–describe)
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name $brokerId --describe
3.2、Topics類型動態(tài)配置(–entity-type topics)
Topics類型配置是Brokers類型配置的子集,Brokers類型包含Topics類型所有配置,brokers只是在topics配置項前加了前綴。
3.2.1、增加配置項(–add-config)
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name testTopic --add-config 'max.message.bytes=50000000,flush.messages=50000,flush.ms=5000'
3.2.2、刪除配置項(–delete-config)
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name testTopic --delete-config 'max.message.bytes,flush.messages,flush.ms'
3.2.3、列出配置項(–describe)
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name testTopic --describe
3.2.4、示例
示例:將主題testTopic的消息保留時間設(shè)為一個小時(3600000ms)
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name testTopic --add-config 'retention.ms=3600000'
示例:刪除retention.ms配置
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name testTopic --delete-config 'retention.ms'
3.3、Clients類型動態(tài)配置(–entity-type clients)
對于 Kafka 客戶端來說,只能覆蓋生產(chǎn)者配額和消費者配額參數(shù)。這兩個配額都以字節(jié)每 秒為單位,表示客戶端在每個 broker 上的生產(chǎn)速率或消費速率。也就是說,如果集群里有 5 個 broker,生產(chǎn)者的配額是 10MB/s,那么它可以以 10MB/s 的速率在單個 broker 上生成 數(shù)據(jù),總共的速率可以達到 50MB/s。
配置項 | 描述 |
---|---|
producer_bytes_rate | 單個生產(chǎn)者每秒種可以往單個broker上生成的消息字節(jié)數(shù) |
consumer_bytes_rate | 單個消費者每秒鐘可以從單個broker讀取的消息字節(jié)數(shù) |
3.3.1、新增配置項(–add-config)
示例:broker內(nèi)所有clientId累加總和最大producer生產(chǎn)速率為20MB/sec
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type clients --entity-default --add-config 'producer_byte_rate=20971520'
示例:broker內(nèi)clientA的最大producer生產(chǎn)速率為20MB/sec
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type clients --entity-name clientA --add-config 'producer_byte_rate=20971520'
3.3.2、刪除配置項(–delete-config)
示例:刪除broker內(nèi)所有clientId的配置項producer_byte_rate
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type clients --entity-default --delete-config 'producer_byte_rate'
3.3.3、列出配置項(–describe)
示例:列出broker內(nèi)所有clientId的配置
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type clients --entity-default --describe
4、首領(lǐng)選舉(kafka-leader-election)
參數(shù):
參數(shù) | 描述 | 例子 |
---|---|---|
--bootstrap-server 指定kafka服務(wù) | 指定連接到的kafka服務(wù) | --bootstrap-server localhost:9092 |
--topic | 指定Topic,此參數(shù)跟--all-topic-partitions和path-to-json-file 三者互斥 | |
--partition | 指定分區(qū),跟--topic搭配使用 | |
--election-type | 兩個選舉策略(PREFERRED: 優(yōu)先副本選舉,如果第一個副本不在線的話會失敗;UNCLEAN: 策略) | |
--all-topic-partitions | 所有topic所有分區(qū)執(zhí)行Leader重選舉; 此參數(shù)跟--topic和path-to-json-file 三者互斥 | |
--path-to-json-file | 配置文件批量選舉,此參數(shù)跟--topic和all-topic-partitions 三者互斥 |
4.1、指定Topic指定分區(qū)用重新PREFERRED:優(yōu)先副本策略 進行Leader重選舉
示例:指定testTopic主題的0分區(qū)重新選舉
kafka-leader-election.sh --bootstrap-server localhost:9092 --topic testTopic --election-type PREFERRED --partition 0
4.2、所有Topic所有分區(qū)用重新PREFERRED:優(yōu)先副本策略 進行Leader重選舉
示例:所有Topic重新選舉
kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --all-topic-partitions
4.3、設(shè)置配置文件批量指定topic和分區(qū)進行Leader重選舉
配置leader-election.json文件:
{
"partitions": [
{
"topic": "testTopic1",
"partition": 1
},
{
"topic": "testTopic2",
"partition": 2
}
]
}
kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --path-to-json-file config/leader-election.json
5、分區(qū)分配(kafka-reassign-partitions.sh)
Kafka系統(tǒng)提供了一個分區(qū)重新分配工具(kafka-reassign-partitions.sh),該工具可用于在Broker之間遷移分區(qū)。理想情況下,將確保所有Broker的數(shù)據(jù)和分區(qū)均勻分配。分區(qū)重新分配工具無法自動分析Kafka群集中的數(shù)據(jù)分布并遷移分區(qū)以實現(xiàn)均勻的負載均衡。因此,管理員在操作的時候,必須弄清楚應(yīng)該遷移哪些Topic或分區(qū)。
5.1、分區(qū)遷移
分區(qū)重新分配工具可以在3種互斥模式下運行:
-
--generate
:在此模式下,給定Topic列表和Broker列表,該工具會生成候選重新??分配,以將指定Topic的所有分區(qū)遷移到新Broker中。此選項僅提供了一種方便的方法,可在給定Topic和目標Broker列表的情況下生成分區(qū)重新分配計劃。 -
--execute
:在此模式下,該工具將根據(jù)用戶提供的重新分配計劃啟動分區(qū)的重新分配。 (使用–reassignment-json-file選項)。由管理員手動制定自定義重新分配計劃,也可以使用–generate選項提供。 -
--verify
:在此模式下,該工具將驗證最后一次–execute期間列出的所有分區(qū)的重新分配狀態(tài)。狀態(tài)可以有成功、失敗或正在進行等狀態(tài)。
使用該工具需要經(jīng)過三個步驟:
- 第一步,根據(jù) broker 清單和主題清單生成一組遷移步驟;
- 第二步,執(zhí)行這些遷移步驟。
- 第三個步驟是可選的,也就是可以使用生成的遷移步驟驗證分區(qū)重分配的進度和完成情況。
案例:原兩臺機器,broker.id分別為0和2。新添加一條機器,broker.id為3。確定要遷移的topic,topic有兩個分區(qū),partition:0分區(qū)存儲在0,2broker上;先要將partiiton:0分區(qū)遷移到2,3上
步驟一:為了生成遷移步驟,需要先創(chuàng)建一個包含了主題清單的 JSON 文件topics-to-move.json,文件格式如下(目前的版本號都是 1)
{
"topics": [
{
"topic": "test2"
}
],
"version": 1
}
執(zhí)行g(shù)enerate:
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "2,3" --generate
"2,3"為目標broker.id,填寫的個數(shù)不能小于副本數(shù)量。運行結(jié)果(生產(chǎn)兩段腳本:當(dāng)期分區(qū)副本分配和建議副本分配配置):
步驟二:這個時候,分區(qū)操作還沒有開始,它只是告訴你當(dāng)前分區(qū)副本配置和建議的分區(qū)副本配置。應(yīng)該保存當(dāng)前分區(qū)副本配置,以防您想要回滾到它。建議的分區(qū)副本配置應(yīng)該保存在一個json文件(例如topic-reassignment.json)
如對任務(wù)編寫熟悉:可以直接跳過第一步;手動編寫建議的分區(qū)副本配置,執(zhí)行execute:
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file topic-reassignment.json --execute
該命令會將指定分區(qū)的副本重新分配到新的 broker 上。集群控制器通過為每個分區(qū)添加 新副本實現(xiàn)重新分配(增加復(fù)制系數(shù))。新的副本將從分區(qū)的首領(lǐng)那里復(fù)制所有數(shù)據(jù)。根 據(jù)分區(qū)大小的不同,復(fù)制過程可能需要花一些時間,因為數(shù)據(jù)是通過網(wǎng)絡(luò)復(fù)制到新副本上 的。在復(fù)制完成之后,控制器將舊副本從副本清單里移除(恢復(fù)到原先的復(fù)制系數(shù))。
第三步:在重分配進行過程中或者完成之后,可以使用 kafka-reassign-partitions.sh 工具驗證重分配 的狀態(tài)。它可以顯示重分配的進度、已經(jīng)完成重分配的分區(qū)以及錯誤信息(如果有的話)。 為了做到這一點,需要在執(zhí)行過程中使用 JSON 對象文件。
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file topic-reassignment.json --verify
5.2、修改復(fù)制系數(shù)
分區(qū)重分配工具提供了一些特性,用于改變分區(qū)的復(fù)制系數(shù),這些特性并沒有在文檔里 說明。如果在創(chuàng)建分區(qū)時指定了錯誤的復(fù)制系數(shù)(比如在創(chuàng)建主題時沒有足夠多可用的 broker),那么就有必要修改它們。這可以通過創(chuàng)建一個 JSON 對象來完成,該對象使用分 區(qū)重新分配的執(zhí)行步驟中使用的格式,顯式指定分區(qū)所需的副本數(shù)量。集群將完成重分配 過程,并使用新的復(fù)制系數(shù)。
例如,假設(shè)主題 my-topic 有一個分區(qū),該分區(qū)的復(fù)制系數(shù)為 1。
{
"partitions": [
{
"topic": "my-topic",
"partition": 0,
"replicas": [1]
}
],
"version": 1
}
在分區(qū)重新分配的執(zhí)行步驟中使用以下 JSON 可以將復(fù)制系數(shù)改為 2。
{
"partitions": [
{
"topic": "my-topic",
"partition": 0,
"replicas": [1,2]
}
],
"version": 1
}
也可以通過類似的方式減小分區(qū)的復(fù)制系數(shù)。
6、刪除消息(kafka-delete-records.sh)
示例:刪除testTopic的0分區(qū)的消息刪除至offset為1024
先配置json文件offset-json-file.json
{
"partitions": [
{
"topic": "testTopic",
"partition": 0,
"offset": 1024
}
],
"version": 1
}
執(zhí)行命令:
kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file config/offset-json-file.json
從最開始的地方刪除消息到 1024的offset; 是從最前面開始刪除的。
7、查看Broker磁盤信息(kafka-log-dirs.sh)
-
--bootstrap-server
:kafka地址 -
--broker-list
:要查詢的broker地址列表,broker之間逗號隔開,不配置該命令則查詢所有broker -
--topic-list
:指定查詢的topic列表,逗號隔開 -
--command-config
:配置Admin Client -
--describe
:顯示詳情
示例:查詢指定topic磁盤信息 --topic-list testTopic
kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --topic-list testTopic
結(jié)果:
{
"version": 1,
"brokers": [
{
"broker": 0,
"logDirs": [
{
"logDir": "/tmp/kafka-logs",
"error": null,
"partitions": [
{
"partition": "testTopic-0",
"size": 27090690,
"offsetLag": 0,
"isFuture": false
}
]
}
]
}
]
}
示例:查詢指定Broker磁盤信息
kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --topic-list testTopic --broker-list 0
8、查看日志文件(kafka-dump-log.sh)
參數(shù) | 描述 | 例子 |
---|---|---|
--deep-iteration | ||
--files <String: file1, file2, ...> | 必需; 讀取的日志文件 | --files 0000009000.log |
--key-decoder-class | 如果設(shè)置,則用于反序列化鍵。這類應(yīng)實現(xiàn)kafka.serializer。解碼器特性。自定義jar應(yīng)該是在kafka/libs目錄中提供 | |
--max-message-size | 最大的數(shù)據(jù)量,默認:5242880 | |
--offsets-decoder | 如果設(shè)置了,日志數(shù)據(jù)將被解析為來自__consumer_offsets主題的偏移量數(shù)據(jù)。 | |
--print-data-log | 打印內(nèi)容 | |
--transaction-log-decoder | 如果設(shè)置,日志數(shù)據(jù)將被解析為來自__transaction_state主題的事務(wù)元數(shù)據(jù) | |
--value-decoder-class [String] | 如果已設(shè)置,則用于反序列化消息。這個類應(yīng)該實現(xiàn)kafka。序列化程序。解碼器特性。自定義jar應(yīng)該在kafka/libs目錄中可用。(默認值:kafka.serializer.StringDecoder) | |
--verify-index-only | 如果設(shè)置了,只需驗證索引日志,而不打印其內(nèi)容。 |
8.1、查詢Log文件
kafka-dump-log.sh --files /tmp/kafka-logs/testTopic-0/00000000000000000000.log
下面每條消息都表示的是batchRecord
baseOffset為起始位置,lastOffset為終止位置,count為本次消息數(shù)量。
baseOffset: 1044628 lastOffset: 1044676 count: 49 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 27083143 CreateTime: 1691300649226 size: 1139 magic: 2 compresscodec: none crc: 2048338167 isvalid: true
baseOffset: 1044677 lastOffset: 1044773 count: 97 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 27084282 CreateTime: 1691300649228 size: 2228 magic: 2 compresscodec: none crc: 1293136921 isvalid: true
8.2、查詢Log文件具體信息(–print-data-log)
kafka-dump-log.sh --files /tmp/kafka-logs/testTopic-0/00000000000000000000.log --print-data-log
下面為每條消息的具體信息:
| offset: 399407 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 0 payload: Message_399408
| offset: 399408 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 1 payload: Message_399409
| offset: 399409 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 2 payload: Message_399410
| offset: 399410 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 3 payload: Message_399411
| offset: 399411 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 4 payload: Message_399412
| offset: 399412 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 5 payload: Message_399413
| offset: 399413 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 6 payload: Message_399414
| offset: 399414 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 7 payload: Message_399415
| offset: 399415 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 8 payload: Message_399416
| offset: 399416 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 9 payload: Message_399417
8.3、查詢index文件
kafka-dump-log.sh --files /tmp/kafka-logs/testTopic-0/00000000000000000000.index
offset為索引值,position為具體位置,可以看到大概每隔600條消息,就建立一個索引。配置項為log.index.size.max.bytes,來控制創(chuàng)建索引的大小;
offset: 972865 position: 25163202
offset: 973495 position: 25179579
offset: 974125 position: 25195956
offset: 974755 position: 25212333
offset: 975385 position: 25228710
offset: 976015 position: 25245087
offset: 976645 position: 25261464
offset: 977275 position: 25277841
8.4、查詢timeindex文件
kafka-dump-log.sh --files /tmp/kafka-logs/testTopic-0/00000000000000000000.timeindex
timestamp: 1691292274425 offset: 475709
timestamp: 1691292274426 offset: 476947
timestamp: 1691292274427 offset: 478255
timestamp: 1691292274428 offset: 479543
timestamp: 1691292274429 offset: 480848
timestamp: 1691292274430 offset: 481767
timestamp: 1691292274431 offset: 483209
timestamp: 1691292274432 offset: 484869
timestamp: 1691292274433 offset: 486408
9、副本一致性驗證(kafka-replica-verification.sh)
可以使用 kafka-replica-verification.sh 工具來驗證集群分區(qū)副本的一致性。它會從指定分區(qū) 的副本上獲取消息,并檢查所有副本是否具有相同的消息。我們必須使用正則表達式將待 驗證主題的名字傳給它。如果不提供這個參數(shù),它會驗證所有的主題。除此之外,還需要 顯式地提供 broker 的地址清單。
示例:對 broker 1 和 broker 2 上以 my- 開頭的主題副本進行驗證。
kafka-replica-verification.sh --broker-list kafka1.example.com:9092,kafka2.example.com:9092 --topic-white-list 'my-.*'
10、控制臺消費者(kafka-console-consumer.sh)
kafka-console-consumer.sh 工具提供了一種從一個或多個主題上讀取消息的方式。消息被打 印在標準輸出上,消息之間以空行分隔。默認情況下,它會打印沒有經(jīng)過格式化的原始消 息字節(jié)(使用 DefaultFormatter)。它有很多可選參數(shù),其中有一些基本的參數(shù)是必選的。
參數(shù) | 描述 | 例子 |
---|---|---|
--group | 指定消費者所屬組的ID | |
--topic | 被消費的topic | |
--partition | 指定分區(qū) ;除非指定–offset,否則從分區(qū)結(jié)束(latest)開始消費 | --partition 0 |
--offset | 執(zhí)行消費的起始offset位置 ;默認值: latest; /latest /earliest /偏移量 | --offset 10 |
--whitelist | 正則表達式匹配topic;--topic就不用指定了; 匹配到的所有topic都會消費; 當(dāng)然用了這個參數(shù),--partition --offset等就不能使用了 | |
--consumer-property | 將用戶定義的屬性以key=value的形式傳遞給使用者 | --consumer-property group.id=test-consumer-group |
--consumer.config | 消費者配置屬性文件請注意,[consumer-property]優(yōu)先于此配置 | --consumer.config config/consumer.properties |
--property | 初始化消息格式化程序的屬性 | print.timestamp=true,false 、print.key=true,false 、print.value=true,false 、key.separator=<key.separator> 、line.separator=<line.separator>、key.deserializer=<key.deserializer>、value.deserializer=<value.deserializer> |
--from-beginning | 從存在的最早消息開始,而不是從最新消息開始,注意如果配置了客戶端名稱并且之前消費過,那就不會從頭消費了 | |
--max-messages | 消費的最大數(shù)據(jù)量,若不指定,則持續(xù)消費下去 | --max-messages 100 |
--skip-message-on-error | 如果處理消息時出錯,請?zhí)^它而不是暫停 | |
--isolation-level | 設(shè)置為read_committed以過濾掉未提交的事務(wù)性消息,設(shè)置為read_uncommitted以讀取所有消息,默認值:read_uncommitted | |
--formatter | 格式化器: kafka.tools.DefaultMessageFormatter有一些非常有用的配置選項,這些選項可以通過--property 命令行參數(shù)傳給它(print.timestamp:如果被設(shè)為 true,就會打印每個消息的時間戳。 print.key:如果被設(shè)為 true,除了打印消息的值之外,還會打印消息的鍵。 key.separator: 指定打印消息的鍵和消息的值所使用的分隔符。 line.separator: 指定消息之間的分隔符。 key.deserializer: 指定打印消息的鍵所使用的反序列化器類名。 value.deserializer: 指定打印消息的值所使用的反序列化器類名。 反序列化類必須實現(xiàn) org.apache.kafka.common.serialization.Deserializer 接口,控制 臺消費者會調(diào)用它們的 toString() 方法獲取輸出結(jié)果。一般來說,在使用 kafka_console_ consumer.sh 工具之前,需要通過環(huán)境變量 CLASSPATH 將這些實現(xiàn)類添加到類路徑里。) kafka.tools.LoggingMessageFormatter將消息輸出到日志,而不是輸出到標準的輸出設(shè)備。日志級別為 INFO,并且包含了時 間戳、鍵和值。 kafka.tools.NoOpMessageFormatter讀取消息但不打印消息。 kafka.tools.ChecksumMessageFormatter只打印消息的校驗和。 |
10.1、新客戶端從頭消費(–from-beginning)
注意這里是新客戶端,如果之前已經(jīng)消費過了是不會從頭消費的) 下面沒有指定客戶端名稱,所以每次執(zhí)行都是新客戶端都會從頭消費
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning
10.2、正則表達式匹配topic進行消費(–whitelist)
示例:消費所有的test開頭的topic,監(jiān)聽新的消費:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist 'test.*'
示例:費所有的test開頭的topic,并且從頭消費:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist 'test.*' --from-beginning
10.3、顯示key進行消費(–property print.key=true)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --property print.key=true
10.4、指定分區(qū)消費(–partition)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --partition 0
10.5、定起始偏移量消費(–offset)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --partition 0 --offset 100
10.6、給客戶端命名(–group)
注意給客戶端命名之后,如果之前有過消費,那么–from-beginning 就不會再從頭消費了
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --group test-group
10.7、添加客戶端屬性(–consumer-property)
這個參數(shù)也可以給客戶端添加屬性,但是注意 不能多個地方配置同一個屬性,他們是互斥的;比如在下面的基礎(chǔ)上還加上屬性–group test-group 那肯定不行
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --consumer-property group.id=test-consumer-group
10.8、添加客戶端屬性(–consumer.config)
跟–consumer-property 一樣的性質(zhì),都是添加客戶端的屬性,不過這里是指定一個文件,把屬性寫在文件里面, --consumer-property 的優(yōu)先級大于 --consumer.config
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --consumer.config config/consumer.properties
10.9、讀取偏移量主題(舊版本)
有時候,我們需要知道提交的消費者群組偏移量是多少,比如某個特定的群組是否在提交 偏移量,或者偏移量提交的頻度。這個可以通過讓控制臺消費者讀取一個特殊的內(nèi)部主題 __consumer_offsets 來實現(xiàn)。所有消費者的偏移量都以消息的形式寫到這個主題上。為了 解碼這個主題的消息,需要使用 kafka.coordinator.GroupMetadataManager$OffsetsMessage Formatter 這個格式化器。
示例:從偏移量主題讀取一個消息。
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter' --max-messages 10
11、控制臺生產(chǎn)者(kafka-console-producer.sh)
與控制臺消費者類似,kafka-console-producer.sh 工具可以用于向 Kafka 主題寫入消息。默認情況下,該工具將命令行輸入的每一行視為一個消息,消息的鍵和值以 Tab 字符分隔 (如果沒有出現(xiàn) Tab 字符,那么鍵就是 null)。
參數(shù) | 值類型 | 說明 | 有效值 |
---|---|---|---|
--bootstrap-server | String | 要連接的服務(wù)器必需(除非指定--broker-list) | 如:host1:prot1,host2:prot2 |
--topic | String | (必需)接收消息的主題名稱 | |
--batch-size | Integer | 單個批處理中發(fā)送的消息數(shù) | 200(默認值) |
--compression-codec | String | 壓縮編解碼器 | none、gzip(默認值)snappy、lz4、zstd |
--max-block-ms | Long | 在發(fā)送請求期間,生產(chǎn)者將阻止的最長時間 | 60000(默認值) |
--max-memory-bytes | Long | 生產(chǎn)者用來緩沖等待發(fā)送到服務(wù)器的總內(nèi)存 | 33554432(默認值) |
--max-partition-memory-bytes | Long | 為分區(qū)分配的緩沖區(qū)大小 | 16384 |
--message-send-max-retries | Integer | 最大的重試發(fā)送次數(shù) | 3 |
--metadata-expiry-ms | Long | 強制更新元數(shù)據(jù)的時間閾值(ms) | 300000 |
--producer-property | String | 將自定義屬性傳遞給生成器的機制 | 如:key=value |
--producer.config | String | 生產(chǎn)者配置屬性文件[--producer-property]優(yōu)先于此配置 配置文件完整路徑 | |
--property | String | 自定義消息讀取器 | parse.key=true/false key.separator=<key.separator>ignore.error=true/false |
--request-required-acks | String | 生產(chǎn)者請求的確認方式 | 0、1(默認值)、all |
--request-timeout-ms | Integer | 生產(chǎn)者請求的確認超時時間 | 1500(默認值) |
--retry-backoff-ms | Integer | 生產(chǎn)者重試前,刷新元數(shù)據(jù)的等待時間閾值 | 100(默認值) |
--socket-buffer-size | Integer | TCP接收緩沖大小 | 102400(默認值) |
--timeout | Integer | 消息排隊異步等待處理的時間閾值 | 1000(默認值) |
--sync | 同步發(fā)送消息 | ||
--version | 顯示 Kafka 版本 | 不配合其他參數(shù)時,顯示為本地Kafka版本 | |
--help | 打印幫助信息 |
11.1、生產(chǎn)無key消息
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic testTopic --producer.config ../config/producer.properties
11.2、生產(chǎn)有key消息(–property parse.key=true)
默認消息key與消息value間使用“Tab鍵”進行分隔,所以消息key以及value中切勿使用轉(zhuǎn)義字符(\t):
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic testTopic --producer.config ../config/producer.properties --property parse.key=true
12、持續(xù)批量推送消息(kafka-verifiable-producer.sh)
該腳本可以生產(chǎn)測試數(shù)據(jù)發(fā)送到指定topic,并將數(shù)據(jù)已json格式打印到控制臺。
- –topic:主題名稱
- –broker-list:broker列表, HOST1:PORT1,HOST2:PORT2,…
- –max-messages:最大消息數(shù)量,默認-1,一直生產(chǎn)消息
- –throughput:設(shè)置吞吐量,默認-1
- –acks:指定分區(qū)中必須有多少個副本收到這條消息,才算消息發(fā)送成功,默認-1
- –producer.config:配置文件
- –message-create-time:設(shè)置消息創(chuàng)建的時間,時間戳
- –value-prefix:設(shè)置消息前綴
- –repeating-keys:key從0開始,每次遞增1,直到指定的值,然后再從0開始
12.1、單次發(fā)送100條消息(–max-messages 100)
kafka-verifiable-producer.sh --bootstrap-server localhost:9092 --topic testTopic --max-messages 100
12.2、每秒發(fā)送最大吞吐量不超過10 (–throughput 10)
推送消息時的吞吐量,單位messages/sec。默認為-1,表示沒有限制
kafka-verifiable-producer.sh --bootstrap-server localhost:9092 --topic testTopic --throughput 10
12.3、發(fā)送的消息體帶前綴(–value-prefix)
kafka-verifiable-producer.sh --bootstrap-server localhost:9092 --topic testTopic --value-prefix 666
注意 --value-prefix 666必須是整數(shù),發(fā)送的消息體的格式是加上一個 點號. 例如: 666.
其他參數(shù): --producer.config CONFIG_FILE 指定producer的配置文件 --acks ACKS 每次推送消息的ack值,默認是-1
13、持續(xù)批量拉取消息(kafka-verifiable-consumer.sh)
- –bootstrap-server:指定kafka服務(wù) 指定連接到的kafka服務(wù);
- –topic:指定消費的topic
- –group-id:消費者id;不指定的話每次都是新的組id
- –group-instance-id:消費組實例ID,唯一值
- –max-messages:單次最大消費的消息數(shù)量
- –enable-autocommit:是否開啟offset自動提交;默認為false
- –reset-policy:當(dāng)以前沒有消費記錄時,選擇要拉取offset的策略,可以是earliest, latest,none。默認是earliest
- –assignment-strategy:consumer分配分區(qū)策略,默認是org.apache.kafka.clients.consumer.RangeAssignor
- –consumer.config:指定consumer的配置文件
13.1、持續(xù)消費
新的groupId,默認從頭消費:
kafka-verifiable-consumer.sh --bootstrap-server localhost:9092 --group-id test_consumer --topic testTopic
13.2、單次最大消費(–max-messages)
kafka-verifiable-consumer.sh --bootstrap-server localhost:9092 --group-id test_consumer --topic testTopic --max-messages 10
14、生產(chǎn)者壓力測試(kafka-producer-perf-test.sh)
參數(shù) | 描述 | 例子 |
---|---|---|
--topic | 指定消費的topic | |
--num-records | 發(fā)送多少條消息 | |
--throughput | 每秒消息最大吞吐量 | |
--producer-props | 生產(chǎn)者配置, k1=v1,k2=v2 | --producer-props bootstrap.servers= localhost:9092,client.id=test_client |
--producer.config | 生產(chǎn)者配置文件 | --producer.config config/producer.propeties |
--print-metrics | 在test結(jié)束的時候打印監(jiān)控信息,默認false | --print-metrics true |
--transactional-id | 指定事務(wù) ID,測試并發(fā)事務(wù)的性能時需要,只有在 --transaction-duration-ms > 0 時生效,默認值為 performance-producer-default-transactional-id | |
--transaction-duration-ms | 指定事務(wù)持續(xù)的最長時間,超過這段時間后就會調(diào)用 commitTransaction 來提交事務(wù),只有指定了 > 0 的值才會開啟事務(wù),默認值為 0 | |
--record-size | 一條消息的大小byte; 和 --payload-file 兩個中必須指定一個,但不能同時指定 | |
--payload-file | 指定消息的來源文件,只支持 UTF-8 編碼的文本文件,文件的消息分隔符通過 --payload-delimeter 指定,默認是用換行\(zhòng)nl來分割的,和 --record-size 兩個中必須指定一個,但不能同時指定 ; 如果提供的消息 | |
--payload-delimeter | 如果通過 --payload-file 指定了從文件中獲取消息內(nèi)容,那么這個參數(shù)的意義是指定文件的消息分隔符,默認值為 \n,即文件的每一行視為一條消息;如果未指定--payload-file則此參數(shù)不生效;發(fā)送消息的時候是隨機送文件里面選擇消息發(fā)送的; |
- 發(fā)送1024條消息 --num-records 100并且每條消息大小為1KB–record-size 1024 最大吞吐量每秒10000條–throughput 100
kafka-producer-perf-test.sh --topic testTopic --num-records 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9092 --record-size 1024
2.用指定消息文件–payload-file 發(fā)送100條消息最大吞吐量每秒100條–throughput 100
先配置好消息文件batchmessage.txt:
1
2
3
4
5
6
7
8
9
0
然后執(zhí)行命令 發(fā)送的消息會從batchmessage.txt里面隨機選擇; 注意這里我們沒有用參數(shù)–payload-delimeter指定分隔符,默認分隔符是\n換行;
kafka-producer-perf-test.sh --topic testTopic --num-records 1024 --throughput 100 --producer-props bootstrap.servers=localhost:9092 --payload-file config/batchmessage.txt
15、消費者壓力測試(kafka-consumer-perf-test.sh)
參數(shù) | 描述 | 例子 |
---|---|---|
--bootstrap-server | ||
--consumer.config | 消費者配置文件 | |
--date-format | 結(jié)果打印出來的時間格式化 | 默認:yyyy-MM-dd HH:mm:ss:SSS |
--fetch-size | 單次請求獲取數(shù)據(jù)的大小 | 默認1048576 |
--topic | 指定消費的topic | |
--from-latest | ||
--group | 消費組ID | |
--hide-header | 如果設(shè)置了,則不打印header信息 | |
--messages | 需要消費的數(shù)量 | |
--num-fetch-threads | feth 數(shù)據(jù)的線程數(shù)(廢棄無效) | 默認:1 |
--print-metrics | 結(jié)束的時候打印監(jiān)控數(shù)據(jù) | |
--show-detailed-stats | 如果設(shè)置,則按照--report_interval配置的方式報告每個報告間隔的統(tǒng)計信息 | |
--threads | 消費線程數(shù);(廢棄無效) | 默認 10 |
--reporting-interval | 打印進度信息的時間間隔(以毫秒為單位) |
消費100條消息 --messages 100
kafka-consumer-perf-test.sh -topic testTopic --bootstrap-server localhost:9092 --messages 100
16、常用操作
16.1、查看 topic 指定分區(qū) offset 的最大值或最小值
time 為 -1 時表示最大值,為 -2 時表示最小值
kafka-run-class.sh kafka.tools.GetOffsetShell --topic testTopic --time -1 --broker-list 127.0.0.1:9092 --partitions 0
16.2、查詢topic的offset的范圍
查詢offset最小值:
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic testTopic --time -2
查詢offset最大值:
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic testTopic --time -1
16.3、重置消費者offset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGroup --reset-offsets --execute --to-offset NEW_OFFSET --topic testTopic
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGroup --reset-offsets --execute --to-earliest/--to-latest --topic testTopic
16.4、刪除topic下的數(shù)據(jù)
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic testTopic --config cleanup.policy=delete
16.5、給指定TOPIC設(shè)置消息存儲時間 – 針對數(shù)據(jù)量大,磁盤小的情況
查看某一個topic設(shè)置過期時間:文章來源:http://www.zghlxwxcb.cn/news/detail-720469.html
kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-name testTopic --entity-type topics
單獨對某一個topic設(shè)置過期時間文章來源地址http://www.zghlxwxcb.cn/news/detail-720469.html
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-name testTopic --entity-type topics --add-config retention.ms=86400000
到了這里,關(guān)于Kafka——管理Kafka(命令行工具)詳解的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!