国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka——管理Kafka(命令行工具)詳解

這篇具有很好參考價值的文章主要介紹了Kafka——管理Kafka(命令行工具)詳解。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

管理Kafka

Kafka 提供了一些命令行工具,用于管理集群的變更。這些工具使用 Java 類實現(xiàn),Kafka 提供了一些腳本來調(diào)用這些 Java 類。不過,它們只提供了一些基本的功能,無法完成那 些復(fù)雜的操作。

雖然 Kafka 實現(xiàn)了操作主題的認證和授權(quán)控制,但還不支持集群的其他大部 分操作。也就是說,在沒有認證的情況下也可以使用這些命令行工具,在沒 有安全檢查和審計的情況下也可以執(zhí)行諸如主題變更之類的操作。

1、主題操作(kafka-topic.sh)

使用 kafka-topics.sh 工具可以執(zhí)行主題的大部分操作(配置變更部分已經(jīng)被棄用并被移動 到 kafka-configs.sh 工具當(dāng)中)。我們可以用它創(chuàng)建、修改、刪除和查看集群里的主題。要使用該工具的全部功能,需要通過 --bootstrap-server 參數(shù)提供broker的連接字符串。

1.1、創(chuàng)建主題(–create)

在集群里創(chuàng)建一個主題需要用到 3 個參數(shù)。這些參數(shù)是必須提供的,盡管有些已經(jīng)有了broker 級別的默認值。

  1. 主題名字:題名字可以包含字母、數(shù)字、下劃線以及英文狀態(tài)下的破折號和句號。主題名字的開頭部分包含兩個下劃線是合法的,但不建議這么做。具有這種 格式的主題一般是集群的內(nèi)部主題(比如 __consumer_offsets 主題用于保存 消費者群組的偏移量)。也不建議在單個集群里使用英文狀態(tài)下的句號和下 劃線來命名,因為主題的名字會被用在度量指標上,句號會被替換成下劃線
    (比如“topic.1”會變成“topic_1”)。
  2. 復(fù)制系數(shù):主題的副本數(shù)量。
  3. 分區(qū):主題的分區(qū)數(shù)量

語法:

kafka-topics.sh --bootstrap-server <zookeeper connect> --create --topic <string> --replication-factor <integer> --partitions <integer>

示例:

# 使用以下命令創(chuàng)建一個叫作my-topic的主題,主題包含8個分區(qū),每個分區(qū)擁有1個副本。
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --replication-factor 1 -- partitions 8

忽略重復(fù)創(chuàng)建主題的錯誤
在自動化系統(tǒng)里調(diào)用這個腳本時,可以使用 --if-not-exists 參數(shù),這樣即 使主題已經(jīng)存在,也不會拋出重復(fù)創(chuàng)建主題的錯誤。

1.2、增加分區(qū)(–alter)

主題基于分區(qū)進行伸縮和復(fù)制,增加分區(qū)主要是 為了擴展主題容量或者降低單個分區(qū)的吞吐量。如果要在單個消費者群組內(nèi)運行更多的消 費者,那么主題數(shù)量也需要相應(yīng)增加,因為一個分區(qū)只能由群組里的一個消費者讀取。

  • 調(diào)整基于鍵的主題
    從消費者角度來看,為基于鍵的主題添加分區(qū)是很困難的。因為如果改變了 分區(qū)的數(shù)量,鍵到分區(qū)之間的映射也會發(fā)生變化。所以,對于基于鍵的主題 來說,建議在一開始就設(shè)置好分區(qū)數(shù)量,避免以后對其進行調(diào)整。
  • 忽略主題不存在的錯誤
    在使用 --alter 命令修改主題時,如果指定了 --if-exists 參數(shù),主題不存 在的錯誤就會被忽略。如果要修改的主題不存在,該命令并不會返回任何錯 誤。在主題不存在的時候本應(yīng)該創(chuàng)建主題,但它卻把錯誤隱藏起來,因此不 建議使用這個參數(shù)。

示例:

#將my-topic主題的分區(qū)增加到16
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 16

1.3、減少分區(qū)數(shù)量(無)

我們無法減少主題的分區(qū)數(shù)量。因為如果刪除了分區(qū),分區(qū)里的數(shù)據(jù)也一并 被刪除,導(dǎo)致數(shù)據(jù)不一致。我們也無法將這些數(shù)據(jù)分配給其他分區(qū),因為這 樣做很難,而且會出現(xiàn)消息亂序。所以,如果一定要減少分區(qū)數(shù)量,只能刪 除整個主題,然后重新創(chuàng)建它。

1.4、刪除主題(–delete)

如果一個主題不再被使用,只要它還存在于集群里,就會占用一定數(shù)量的磁盤空間和文件 句柄。把它刪除就可以釋放被占用的資源。為了能夠刪除主題,broker 的 delete.topic. enable 參數(shù)必須被設(shè)置為 true。如果該參數(shù)被設(shè)為 false,刪除主題的請求會被忽略。

示例:

# 刪除my-topic主題
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-topic

1.5、列出集群里的所有主題(–list)

可以使用主題工具列出集群里的所有主題。每個主題占用一行輸出,主題之間沒有特定的順序。

示例:

kafka-topics.sh --bootstrap-server localhost:9092 --list

kafka管理工具,kafka,kafka,命令行工具

1.6、列出主題詳細信息(–describe)

主題工具還能用來獲取主題的詳細信息。信息里包含了分區(qū)數(shù)量、主題的覆蓋配置以及 每個分區(qū)的副本清單。如果通過 --topic 參數(shù)指定特定的主題,就可以只列出指定主題 的詳細信息。

示例:

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic

kafka管理工具,kafka,kafka,命令行工具

describe 命令還提供了一些參數(shù),用于過濾輸出結(jié)果,這在診斷集群問題時會很有用。不要為這些參數(shù)指定 --topic 參數(shù)(因為這些參數(shù)的目的是為了找出集群里所有滿足條件的 主題和分區(qū))。這些參數(shù)也無法與 list 命令一起使用(最后一部分會詳細說明原因)。

使用 --topics-with-overrides 參數(shù)可以找出所有包含覆蓋配置的主題,它只會列出包含了 與集群不一樣配置的主題。

有兩個參數(shù)可用于找出有問題的分區(qū)。
--under-replicated-partitions 參數(shù)可以列出 所有包含不同步副本的分區(qū)。
--unavailable-partitions 參數(shù)可以列出所有沒有首領(lǐng) 的分區(qū),這些分區(qū)已經(jīng)處于離線狀態(tài),對于生產(chǎn)者和消費者來說是不可用的。

# 列出包含不同步副本的分區(qū)
kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions

1.7、修改或刪除配置(–config)

覆蓋已有的topic參數(shù):

kafka-topics.sh --bootstrap-server zk_host:port  --topic TEST --alter --config flush.messages=1

刪除topic級別配置參數(shù):

kafka-topics.sh --bootstrap-server zk_host:port --alter --topic TEST --delete-config flush.messages=1

2、消費者群組(kafka- consumer-groups.sh )

在 Kafka 里,有兩個地方保存著消費者群組的信息。對于舊版本的消費者來說,它們的信 息保存在 Zookeeper 上;對于新版本的消費者來說,它們的信息保存在 broker 上。kafka- consumer-groups.sh 工具可以用于列出上述兩種消費者群組。它也可以用于刪除消費者群 組和偏移量信息,不過這個功能僅限于舊版本的消費者群組(信息保存在 Zookeeper 上)。 在對舊版本的消費者群組進行操作時,需要通過 --zookeeper 參數(shù)指定 Zookeeper 的地址; 在對新版本的消費者群組進行操作時,則需要使用 --bootstrap-server 參數(shù)指定 broker 的 主機名和端口。

語法:

kafka-consumer-groups.sh [-h] [--bootstrap-server <server to use>] [--command-config <command configuration property file>]
  [--group <consumer-group>] [--new-consumer | --zookeeper] [--describe] [--delete] [--reset-offsets]
  [--reset-offsets-by-duration <duration controlling how far back to reset>] [--reset-offsets-by-topic <topic to reset>]
  [--reset-offsets-by-times] [--all-topics] [--topic <topic>] [--exclude-internal] [--dry-run]
  • -–bootstrap-server:Kafka集群的地址,多個地址使用逗號分隔。
  • -–command-config:kafka的安全認證配置文件路徑。
  • -–group:指定要操作的消費組。
  • -–describe:列出消費組的詳情。
  • -–delete:刪除消費組。
  • -–reset-offsets:重置消費組的偏移量。
  • -–reset-offsets-by-duration:指定重置的時間(從現(xiàn)在往前)。
  • -–reset-offsets-by-topic:指定重置的topic和partition。
  • -–reset-offsets-by-times:指定重置的時間點。
  • -–new-consumer:使用新消費者API。
  • -–zookeeper:使用舊的Zookeeper API。
  • -–all-topics:列出所有topic的所有消費組。
  • -–topic:指定要操作的topic。
  • -–exclude-internal:不列出.kafka/*的topic。
  • -–dry-run:僅輸出要執(zhí)行的操作,不實際運行。

2.1、列出群組(–list)

在使用舊版本的消費者客戶端時,可以使用 --zookeeper 和 --list 參數(shù)列出消費者群 組;在使用新版本的消費者客戶端時,則要使用 --bootstrap-server、–list 參數(shù)。

示例:列出舊版本的消費者群組。

kafka-consumer-groups.sh --zookeeper localhost:2181/kafka-cluster --list

示例:列出新版本的消費者群組。

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

2.2、獲取群組詳細信息(–describe)

使用 --describe,并通過 --group 指定特定的群組, 就可以獲取該群組的詳細信息。它會列出群組里所有主題的信息和每個分區(qū)的偏移量。

示例:獲取消費者群組testGroup的詳細信息

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup

kafka管理工具,kafka,kafka,命令行工具

字段 描述
GROUP 消費者群組的名字
TOPIC 正在被讀取的主題名字
PARTITION 正在被讀取的分區(qū) ID
CURRENT-OFFSET 消費者群組最近提交的偏移量,也就是消費者在分區(qū)里讀取的當(dāng)前位置
LOG-END-OFFSET 當(dāng)前高水位偏移量,也就是最近一個被讀取消息的偏移量,同時也是最近一個被提 交到集群的偏移量
LAG 消費者的 CURRENT-OFFSET 和 broker 的 LOG-END-OFFSET 之間的差距
CONSUMER-ID 消費者群組里正在讀取該分區(qū)的消費者。這是一個消費者的 ID
HOST 消費者主機IP

2.3、偏移量管理(–reset-offsets)

能夠執(zhí)行成功的一個前提是 消費組這會是不可用狀態(tài);

  1. 執(zhí)行模式:
    –dry-run:這個參數(shù)表示預(yù)執(zhí)行,會打印出來將要處理的結(jié)果;
    –excute:真正執(zhí)行;
  1. 執(zhí)行范圍:
    –group:指定具體的消費組;
    –all-group:指定所有的消費組;
  1. 重置模式:
    相關(guān)重置Offset的模式;
參數(shù) 描述 示例
--to-earliest 重置offset到最開始的offset(未被刪除的最早的offset)
--to-current 直接重置offset到當(dāng)前的offset,也就是LOE
--to-latest 重置到最后一個offset
--to-detetime 重置到指定時間的offset;格式為:YYYY-MM-DDTHH:mm:SS.sss --to-datetime “2021-6-26T00:00:00.000”
--to-offset 重置到指定的offset,但是通常情況下,匹配到多個分區(qū),這里是將匹配到的所有分區(qū)都重置到這一個值;如果目標最大offset < to-offset,這個時候重置為目標最大offset;如果目標最小offset > to-offset,則重置為最?。?/td> --to-offset 300
--shift-by 按照偏移量增加或者減少多少個offsete;正數(shù)向前增加、負數(shù)向后退 --shift-by 100、--shift-by -100
--from-file 根據(jù)CVS文檔來重置

示例:將消費組”testGroup”的”testTopic”上的所有分區(qū)的偏移量為300。

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGroup --reset-offsets --topic testTopic --to-offset 300 --execute

示例:將消費組”testGroup”的”testTopic”上的所有分區(qū)的偏移量向前移動100。

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGroup --reset-offsets --topic testTopic --shift-by -100 --execute

示例:通過cvs文檔配置消費組”testGroup”的”testTopic”上的所有分區(qū)的偏移量為10000

offsets.cvs:
格式為:Topic,分區(qū)號,偏移量

testTopic,0,10000
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGroup --reset-offsets --from-file offsets.cvs --execute

2.4、刪除偏移量(–delete-offsets)

能夠執(zhí)行成功的一個前提是 消費組這會是不可用狀態(tài);
偏移量被刪除了之后,Consumer Group下次啟動的時候,會從頭消費;

示例:將消費組”testGroup”的”testTopic”上的所有分區(qū)的偏移量刪除

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGroup --topic testTopic --delete-offsets

2.4、查詢消費者成員信息(–members)

示例:

kafka-consumer-groups.sh --describe --all-groups --bootstrap-server localhost:9092 --members

kafka管理工具,kafka,kafka,命令行工具

2.5、查詢消費者狀態(tài)信息(–state)

示例:

kafka-consumer-groups.sh --describe --all-groups --bootstrap-server localhost:9092 --state

kafka管理工具,kafka,kafka,命令行工具

2.6、刪除消費組(–delete)

想要刪除消費組前提是這個消費組的所有客戶端都停止消費/不在線才能夠成功刪除;否則會報下面異常

示例:刪除指定消費組

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group testGroup 

示例:刪除所有消費組

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --all-groups

3、動態(tài)配置變更(kafka-configs.sh)

kafka-configs.sh可以在集群處于運行狀態(tài)時覆蓋主題配置和客戶端的配額參數(shù)。這樣就可以 為特定的主題和客戶端指定配置參數(shù)。一旦設(shè)置完畢,它們就成為集群的永久配置。

參數(shù) 說明
--zookeeper 使用zk配置操作集群,支持三種配置類型topics、clients、users
--bootstrap 使用broker連接方式、僅支持一種配置類型brokers,格式為brokerIp01:port,brokerIp02:port,…
--command-config 包含要傳遞給Admin Client的配置的屬性文件。僅與--bootstrap-server選項一起使用,用于描述和更改代理配置
--alter 指定需要修改配置
--describe 列舉出指定的實體配置
--entity-type 實體配置類型(topics、users、brokers)
--entity-name entity名稱(topicName、clientId、userId、brokerId)
--add-config 要添加的鍵值對配置。方括號可用于對包含逗號的值進行分組:‘k1=v1,k2=[v1,v2,v2],k3=v3’
--entity-default clients/users/brokers的默認entity-name,生產(chǎn)zk相對路徑的節(jié)點<default>
--delete-config 指定配置項刪除’k1,k2’
--force 禁止控制臺提示

3.1、Brokers類型動態(tài)配置(–entity-type brokers)

kafka管理工具,kafka,kafka,命令行工具

kafka管理工具,kafka,kafka,命令行工具
kafka管理工具,kafka,kafka,命令行工具
配置brokers只能指定–bootstrap-server,zk不支持。

3.1.1、增加配置項(–add-config)
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-default  --add-config 'max.connections.per.ip=200,max.connections.per.ip.overrides=[ip1:100,ip2:120]' 

kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-name  $brokerId --add-config 'max.connections.per.ip=200,max.connections.per.ip.overrides=[ip1:100]' 
3.1.2、刪除配置項(–delete-config)
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-default  --delete-config  'max.connections.per.ip,max.connections.per.ip.overrides' 

kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-name  $brokerId --delete-config  'max.connections.per.ip,max.connections.per.ip.overrides' 
3.1.3、列出配置項詳情(–describe)
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe

kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name $brokerId --describe

3.2、Topics類型動態(tài)配置(–entity-type topics)

Topics類型配置是Brokers類型配置的子集,Brokers類型包含Topics類型所有配置,brokers只是在topics配置項前加了前綴。

kafka管理工具,kafka,kafka,命令行工具
kafka管理工具,kafka,kafka,命令行工具

3.2.1、增加配置項(–add-config)
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name testTopic --add-config 'max.message.bytes=50000000,flush.messages=50000,flush.ms=5000'
3.2.2、刪除配置項(–delete-config)
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name testTopic --delete-config 'max.message.bytes,flush.messages,flush.ms'
3.2.3、列出配置項(–describe)
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name testTopic --describe
3.2.4、示例

示例:將主題testTopic的消息保留時間設(shè)為一個小時(3600000ms)

kafka-configs.sh --bootstrap-server localhost:9092  --alter --entity-type topics --entity-name testTopic --add-config 'retention.ms=3600000'

示例:刪除retention.ms配置

kafka-configs.sh --bootstrap-server localhost:9092  --alter --entity-type topics --entity-name testTopic --delete-config 'retention.ms'

3.3、Clients類型動態(tài)配置(–entity-type clients)

對于 Kafka 客戶端來說,只能覆蓋生產(chǎn)者配額和消費者配額參數(shù)。這兩個配額都以字節(jié)每 秒為單位,表示客戶端在每個 broker 上的生產(chǎn)速率或消費速率。也就是說,如果集群里有 5 個 broker,生產(chǎn)者的配額是 10MB/s,那么它可以以 10MB/s 的速率在單個 broker 上生成 數(shù)據(jù),總共的速率可以達到 50MB/s。

配置項 描述
producer_bytes_rate 單個生產(chǎn)者每秒種可以往單個broker上生成的消息字節(jié)數(shù)
consumer_bytes_rate 單個消費者每秒鐘可以從單個broker讀取的消息字節(jié)數(shù)
3.3.1、新增配置項(–add-config)

示例:broker內(nèi)所有clientId累加總和最大producer生產(chǎn)速率為20MB/sec

kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type clients --entity-default  --add-config 'producer_byte_rate=20971520'

示例:broker內(nèi)clientA的最大producer生產(chǎn)速率為20MB/sec

kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type clients  --entity-name clientA  --add-config 'producer_byte_rate=20971520'
3.3.2、刪除配置項(–delete-config)

示例:刪除broker內(nèi)所有clientId的配置項producer_byte_rate

kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type clients  --entity-default  --delete-config 'producer_byte_rate'
3.3.3、列出配置項(–describe)

示例:列出broker內(nèi)所有clientId的配置

kafka-configs.sh --bootstrap-server localhost:9092 --entity-type clients --entity-default --describe

4、首領(lǐng)選舉(kafka-leader-election)

參數(shù):

參數(shù) 描述 例子
--bootstrap-server 指定kafka服務(wù) 指定連接到的kafka服務(wù) --bootstrap-server localhost:9092
--topic 指定Topic,此參數(shù)跟--all-topic-partitions和path-to-json-file 三者互斥
--partition 指定分區(qū),跟--topic搭配使用
--election-type 兩個選舉策略(PREFERRED: 優(yōu)先副本選舉,如果第一個副本不在線的話會失敗;UNCLEAN: 策略)
--all-topic-partitions 所有topic所有分區(qū)執(zhí)行Leader重選舉; 此參數(shù)跟--topic和path-to-json-file 三者互斥
--path-to-json-file 配置文件批量選舉,此參數(shù)跟--topic和all-topic-partitions 三者互斥

4.1、指定Topic指定分區(qū)用重新PREFERRED:優(yōu)先副本策略 進行Leader重選舉

示例:指定testTopic主題的0分區(qū)重新選舉

kafka-leader-election.sh --bootstrap-server localhost:9092 --topic testTopic --election-type PREFERRED --partition 0

4.2、所有Topic所有分區(qū)用重新PREFERRED:優(yōu)先副本策略 進行Leader重選舉

示例:所有Topic重新選舉

kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --all-topic-partitions

4.3、設(shè)置配置文件批量指定topic和分區(qū)進行Leader重選舉

配置leader-election.json文件:

{
  "partitions": [
    {
      "topic": "testTopic1",
      "partition": 1
    },
    {
      "topic": "testTopic2",
      "partition": 2
    }
  ]
}
kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred  --path-to-json-file config/leader-election.json
 

5、分區(qū)分配(kafka-reassign-partitions.sh)

Kafka系統(tǒng)提供了一個分區(qū)重新分配工具(kafka-reassign-partitions.sh),該工具可用于在Broker之間遷移分區(qū)。理想情況下,將確保所有Broker的數(shù)據(jù)和分區(qū)均勻分配。分區(qū)重新分配工具無法自動分析Kafka群集中的數(shù)據(jù)分布并遷移分區(qū)以實現(xiàn)均勻的負載均衡。因此,管理員在操作的時候,必須弄清楚應(yīng)該遷移哪些Topic或分區(qū)。

5.1、分區(qū)遷移

分區(qū)重新分配工具可以在3種互斥模式下運行:

  1. --generate:在此模式下,給定Topic列表和Broker列表,該工具會生成候選重新??分配,以將指定Topic的所有分區(qū)遷移到新Broker中。此選項僅提供了一種方便的方法,可在給定Topic和目標Broker列表的情況下生成分區(qū)重新分配計劃。
  2. --execute:在此模式下,該工具將根據(jù)用戶提供的重新分配計劃啟動分區(qū)的重新分配。 (使用–reassignment-json-file選項)。由管理員手動制定自定義重新分配計劃,也可以使用–generate選項提供。
  3. --verify:在此模式下,該工具將驗證最后一次–execute期間列出的所有分區(qū)的重新分配狀態(tài)。狀態(tài)可以有成功、失敗或正在進行等狀態(tài)。

使用該工具需要經(jīng)過三個步驟:

  • 第一步,根據(jù) broker 清單和主題清單生成一組遷移步驟;
  • 第二步,執(zhí)行這些遷移步驟。
  • 第三個步驟是可選的,也就是可以使用生成的遷移步驟驗證分區(qū)重分配的進度和完成情況。

案例:原兩臺機器,broker.id分別為0和2。新添加一條機器,broker.id為3。確定要遷移的topic,topic有兩個分區(qū),partition:0分區(qū)存儲在0,2broker上;先要將partiiton:0分區(qū)遷移到2,3上

步驟一:為了生成遷移步驟,需要先創(chuàng)建一個包含了主題清單的 JSON 文件topics-to-move.json,文件格式如下(目前的版本號都是 1)

{
  "topics": [
    {
      "topic": "test2"
    }
  ],
  "version": 1
}

執(zhí)行g(shù)enerate:

kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "2,3" --generate

"2,3"為目標broker.id,填寫的個數(shù)不能小于副本數(shù)量。運行結(jié)果(生產(chǎn)兩段腳本:當(dāng)期分區(qū)副本分配和建議副本分配配置):
kafka管理工具,kafka,kafka,命令行工具

步驟二:這個時候,分區(qū)操作還沒有開始,它只是告訴你當(dāng)前分區(qū)副本配置和建議的分區(qū)副本配置。應(yīng)該保存當(dāng)前分區(qū)副本配置,以防您想要回滾到它。建議的分區(qū)副本配置應(yīng)該保存在一個json文件(例如topic-reassignment.json)

如對任務(wù)編寫熟悉:可以直接跳過第一步;手動編寫建議的分區(qū)副本配置,執(zhí)行execute:

kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file topic-reassignment.json --execute

該命令會將指定分區(qū)的副本重新分配到新的 broker 上。集群控制器通過為每個分區(qū)添加 新副本實現(xiàn)重新分配(增加復(fù)制系數(shù))。新的副本將從分區(qū)的首領(lǐng)那里復(fù)制所有數(shù)據(jù)。根 據(jù)分區(qū)大小的不同,復(fù)制過程可能需要花一些時間,因為數(shù)據(jù)是通過網(wǎng)絡(luò)復(fù)制到新副本上 的。在復(fù)制完成之后,控制器將舊副本從副本清單里移除(恢復(fù)到原先的復(fù)制系數(shù))。

第三步:在重分配進行過程中或者完成之后,可以使用 kafka-reassign-partitions.sh 工具驗證重分配 的狀態(tài)。它可以顯示重分配的進度、已經(jīng)完成重分配的分區(qū)以及錯誤信息(如果有的話)。 為了做到這一點,需要在執(zhí)行過程中使用 JSON 對象文件。

kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file topic-reassignment.json --verify

5.2、修改復(fù)制系數(shù)

分區(qū)重分配工具提供了一些特性,用于改變分區(qū)的復(fù)制系數(shù),這些特性并沒有在文檔里 說明。如果在創(chuàng)建分區(qū)時指定了錯誤的復(fù)制系數(shù)(比如在創(chuàng)建主題時沒有足夠多可用的 broker),那么就有必要修改它們。這可以通過創(chuàng)建一個 JSON 對象來完成,該對象使用分 區(qū)重新分配的執(zhí)行步驟中使用的格式,顯式指定分區(qū)所需的副本數(shù)量。集群將完成重分配 過程,并使用新的復(fù)制系數(shù)。

例如,假設(shè)主題 my-topic 有一個分區(qū),該分區(qū)的復(fù)制系數(shù)為 1。

{
  "partitions": [
    {
      "topic": "my-topic",
      "partition": 0,
      "replicas": [1]
    }
  ],
  "version": 1
}

在分區(qū)重新分配的執(zhí)行步驟中使用以下 JSON 可以將復(fù)制系數(shù)改為 2。

{
  "partitions": [
    {
      "topic": "my-topic",
      "partition": 0,
      "replicas": [1,2]
    }
  ],
  "version": 1
}

也可以通過類似的方式減小分區(qū)的復(fù)制系數(shù)。

6、刪除消息(kafka-delete-records.sh)

示例:刪除testTopic的0分區(qū)的消息刪除至offset為1024

先配置json文件offset-json-file.json

{
  "partitions": [
    {
      "topic": "testTopic",
      "partition": 0,
      "offset": 1024
    }
  ],
  "version": 1
}

執(zhí)行命令:

kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file config/offset-json-file.json

從最開始的地方刪除消息到 1024的offset; 是從最前面開始刪除的。

7、查看Broker磁盤信息(kafka-log-dirs.sh)

  • --bootstrap-server:kafka地址
  • --broker-list:要查詢的broker地址列表,broker之間逗號隔開,不配置該命令則查詢所有broker
  • --topic-list:指定查詢的topic列表,逗號隔開
  • --command-config:配置Admin Client
  • --describe:顯示詳情

示例:查詢指定topic磁盤信息 --topic-list testTopic

kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --topic-list testTopic

結(jié)果:

{
  "version": 1,
  "brokers": [
    {
      "broker": 0,
      "logDirs": [
        {
          "logDir": "/tmp/kafka-logs",
          "error": null,
          "partitions": [
            {
              "partition": "testTopic-0",
              "size": 27090690,
              "offsetLag": 0,
              "isFuture": false
            }
          ]
        }
      ]
    }
  ]
}

示例:查詢指定Broker磁盤信息

kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --topic-list testTopic --broker-list 0

8、查看日志文件(kafka-dump-log.sh)

參數(shù) 描述 例子
--deep-iteration
--files <String: file1, file2, ...> 必需; 讀取的日志文件 --files 0000009000.log
--key-decoder-class 如果設(shè)置,則用于反序列化鍵。這類應(yīng)實現(xiàn)kafka.serializer。解碼器特性。自定義jar應(yīng)該是在kafka/libs目錄中提供
--max-message-size 最大的數(shù)據(jù)量,默認:5242880
--offsets-decoder 如果設(shè)置了,日志數(shù)據(jù)將被解析為來自__consumer_offsets主題的偏移量數(shù)據(jù)。
--print-data-log 打印內(nèi)容
--transaction-log-decoder 如果設(shè)置,日志數(shù)據(jù)將被解析為來自__transaction_state主題的事務(wù)元數(shù)據(jù)
--value-decoder-class [String] 如果已設(shè)置,則用于反序列化消息。這個類應(yīng)該實現(xiàn)kafka。序列化程序。解碼器特性。自定義jar應(yīng)該在kafka/libs目錄中可用。(默認值:kafka.serializer.StringDecoder)
--verify-index-only 如果設(shè)置了,只需驗證索引日志,而不打印其內(nèi)容。

8.1、查詢Log文件

kafka-dump-log.sh --files /tmp/kafka-logs/testTopic-0/00000000000000000000.log

下面每條消息都表示的是batchRecord
baseOffset為起始位置,lastOffset為終止位置,count為本次消息數(shù)量。

baseOffset: 1044628 lastOffset: 1044676 count: 49 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 27083143 CreateTime: 1691300649226 size: 1139 magic: 2 compresscodec: none crc: 2048338167 isvalid: true

baseOffset: 1044677 lastOffset: 1044773 count: 97 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 27084282 CreateTime: 1691300649228 size: 2228 magic: 2 compresscodec: none crc: 1293136921 isvalid: true

8.2、查詢Log文件具體信息(–print-data-log)

kafka-dump-log.sh --files /tmp/kafka-logs/testTopic-0/00000000000000000000.log --print-data-log

下面為每條消息的具體信息:

| offset: 399407 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 0 payload: Message_399408
| offset: 399408 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 1 payload: Message_399409
| offset: 399409 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 2 payload: Message_399410
| offset: 399410 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 3 payload: Message_399411
| offset: 399411 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 4 payload: Message_399412
| offset: 399412 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 5 payload: Message_399413
| offset: 399413 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 6 payload: Message_399414
| offset: 399414 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 7 payload: Message_399415
| offset: 399415 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 8 payload: Message_399416
| offset: 399416 CreateTime: 1691292274339 keySize: 4 valueSize: 14 sequence: -1 headerKeys: [] key: 9 payload: Message_399417

8.3、查詢index文件

kafka-dump-log.sh --files /tmp/kafka-logs/testTopic-0/00000000000000000000.index

offset為索引值,position為具體位置,可以看到大概每隔600條消息,就建立一個索引。配置項為log.index.size.max.bytes,來控制創(chuàng)建索引的大小;

offset: 972865 position: 25163202
offset: 973495 position: 25179579
offset: 974125 position: 25195956
offset: 974755 position: 25212333
offset: 975385 position: 25228710
offset: 976015 position: 25245087
offset: 976645 position: 25261464
offset: 977275 position: 25277841

8.4、查詢timeindex文件

kafka-dump-log.sh --files /tmp/kafka-logs/testTopic-0/00000000000000000000.timeindex
timestamp: 1691292274425 offset: 475709
timestamp: 1691292274426 offset: 476947
timestamp: 1691292274427 offset: 478255
timestamp: 1691292274428 offset: 479543
timestamp: 1691292274429 offset: 480848
timestamp: 1691292274430 offset: 481767
timestamp: 1691292274431 offset: 483209
timestamp: 1691292274432 offset: 484869
timestamp: 1691292274433 offset: 486408

9、副本一致性驗證(kafka-replica-verification.sh)

可以使用 kafka-replica-verification.sh 工具來驗證集群分區(qū)副本的一致性。它會從指定分區(qū) 的副本上獲取消息,并檢查所有副本是否具有相同的消息。我們必須使用正則表達式將待 驗證主題的名字傳給它。如果不提供這個參數(shù),它會驗證所有的主題。除此之外,還需要 顯式地提供 broker 的地址清單。

示例:對 broker 1 和 broker 2 上以 my- 開頭的主題副本進行驗證。

kafka-replica-verification.sh --broker-list kafka1.example.com:9092,kafka2.example.com:9092 --topic-white-list 'my-.*'

10、控制臺消費者(kafka-console-consumer.sh)

kafka-console-consumer.sh 工具提供了一種從一個或多個主題上讀取消息的方式。消息被打 印在標準輸出上,消息之間以空行分隔。默認情況下,它會打印沒有經(jīng)過格式化的原始消 息字節(jié)(使用 DefaultFormatter)。它有很多可選參數(shù),其中有一些基本的參數(shù)是必選的。

參數(shù) 描述 例子
--group 指定消費者所屬組的ID
--topic 被消費的topic
--partition 指定分區(qū) ;除非指定–offset,否則從分區(qū)結(jié)束(latest)開始消費 --partition 0
--offset 執(zhí)行消費的起始offset位置 ;默認值: latest; /latest /earliest /偏移量 --offset 10
--whitelist 正則表達式匹配topic;--topic就不用指定了; 匹配到的所有topic都會消費; 當(dāng)然用了這個參數(shù),--partition --offset等就不能使用了
--consumer-property 將用戶定義的屬性以key=value的形式傳遞給使用者 --consumer-property group.id=test-consumer-group
--consumer.config 消費者配置屬性文件請注意,[consumer-property]優(yōu)先于此配置 --consumer.config config/consumer.properties
--property 初始化消息格式化程序的屬性 print.timestamp=true,false 、print.key=true,false 、print.value=true,false 、key.separator=<key.separator> 、line.separator=<line.separator>、key.deserializer=<key.deserializer>、value.deserializer=<value.deserializer>
--from-beginning 從存在的最早消息開始,而不是從最新消息開始,注意如果配置了客戶端名稱并且之前消費過,那就不會從頭消費了
--max-messages 消費的最大數(shù)據(jù)量,若不指定,則持續(xù)消費下去 --max-messages 100
--skip-message-on-error 如果處理消息時出錯,請?zhí)^它而不是暫停
--isolation-level 設(shè)置為read_committed以過濾掉未提交的事務(wù)性消息,設(shè)置為read_uncommitted以讀取所有消息,默認值:read_uncommitted
--formatter 格式化器:
kafka.tools.DefaultMessageFormatter有一些非常有用的配置選項,這些選項可以通過--property 命令行參數(shù)傳給它(print.timestamp:如果被設(shè)為 true,就會打印每個消息的時間戳。
print.key:如果被設(shè)為 true,除了打印消息的值之外,還會打印消息的鍵。
key.separator: 指定打印消息的鍵和消息的值所使用的分隔符。
line.separator: 指定消息之間的分隔符。
key.deserializer: 指定打印消息的鍵所使用的反序列化器類名。
value.deserializer: 指定打印消息的值所使用的反序列化器類名。
反序列化類必須實現(xiàn) org.apache.kafka.common.serialization.Deserializer 接口,控制 臺消費者會調(diào)用它們的 toString() 方法獲取輸出結(jié)果。一般來說,在使用 kafka_console_ consumer.sh 工具之前,需要通過環(huán)境變量 CLASSPATH 將這些實現(xiàn)類添加到類路徑里。)
kafka.tools.LoggingMessageFormatter將消息輸出到日志,而不是輸出到標準的輸出設(shè)備。日志級別為 INFO,并且包含了時 間戳、鍵和值。
kafka.tools.NoOpMessageFormatter讀取消息但不打印消息。
kafka.tools.ChecksumMessageFormatter只打印消息的校驗和。

10.1、新客戶端從頭消費(–from-beginning)

注意這里是新客戶端,如果之前已經(jīng)消費過了是不會從頭消費的) 下面沒有指定客戶端名稱,所以每次執(zhí)行都是新客戶端都會從頭消費

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning

10.2、正則表達式匹配topic進行消費(–whitelist)

示例:消費所有的test開頭的topic,監(jiān)聽新的消費:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist 'test.*'

示例:費所有的test開頭的topic,并且從頭消費:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist 'test.*' --from-beginning

10.3、顯示key進行消費(–property print.key=true)

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --property print.key=true

10.4、指定分區(qū)消費(–partition)

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --partition 0

10.5、定起始偏移量消費(–offset)

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --partition 0 --offset	100

10.6、給客戶端命名(–group)

注意給客戶端命名之后,如果之前有過消費,那么–from-beginning 就不會再從頭消費了

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --group test-group

10.7、添加客戶端屬性(–consumer-property)

這個參數(shù)也可以給客戶端添加屬性,但是注意 不能多個地方配置同一個屬性,他們是互斥的;比如在下面的基礎(chǔ)上還加上屬性–group test-group 那肯定不行

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --consumer-property group.id=test-consumer-group

10.8、添加客戶端屬性(–consumer.config)

跟–consumer-property 一樣的性質(zhì),都是添加客戶端的屬性,不過這里是指定一個文件,把屬性寫在文件里面, --consumer-property 的優(yōu)先級大于 --consumer.config

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --consumer.config config/consumer.properties

10.9、讀取偏移量主題(舊版本)

有時候,我們需要知道提交的消費者群組偏移量是多少,比如某個特定的群組是否在提交 偏移量,或者偏移量提交的頻度。這個可以通過讓控制臺消費者讀取一個特殊的內(nèi)部主題 __consumer_offsets 來實現(xiàn)。所有消費者的偏移量都以消息的形式寫到這個主題上。為了 解碼這個主題的消息,需要使用 kafka.coordinator.GroupMetadataManager$OffsetsMessage Formatter 這個格式化器。

示例:從偏移量主題讀取一個消息。

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter' --max-messages 10

11、控制臺生產(chǎn)者(kafka-console-producer.sh)

與控制臺消費者類似,kafka-console-producer.sh 工具可以用于向 Kafka 主題寫入消息。默認情況下,該工具將命令行輸入的每一行視為一個消息,消息的鍵和值以 Tab 字符分隔 (如果沒有出現(xiàn) Tab 字符,那么鍵就是 null)。

參數(shù) 值類型 說明 有效值
--bootstrap-server String 要連接的服務(wù)器必需(除非指定--broker-list) 如:host1:prot1,host2:prot2
--topic String (必需)接收消息的主題名稱
--batch-size Integer 單個批處理中發(fā)送的消息數(shù) 200(默認值)
--compression-codec String 壓縮編解碼器 none、gzip(默認值)snappy、lz4、zstd
--max-block-ms Long 在發(fā)送請求期間,生產(chǎn)者將阻止的最長時間 60000(默認值)
--max-memory-bytes Long 生產(chǎn)者用來緩沖等待發(fā)送到服務(wù)器的總內(nèi)存 33554432(默認值)
--max-partition-memory-bytes Long 為分區(qū)分配的緩沖區(qū)大小 16384
--message-send-max-retries Integer 最大的重試發(fā)送次數(shù) 3
--metadata-expiry-ms Long 強制更新元數(shù)據(jù)的時間閾值(ms) 300000
--producer-property String 將自定義屬性傳遞給生成器的機制 如:key=value
--producer.config String 生產(chǎn)者配置屬性文件[--producer-property]優(yōu)先于此配置 配置文件完整路徑
--property String 自定義消息讀取器 parse.key=true/false key.separator=<key.separator>ignore.error=true/false
--request-required-acks String 生產(chǎn)者請求的確認方式 0、1(默認值)、all
--request-timeout-ms Integer 生產(chǎn)者請求的確認超時時間 1500(默認值)
--retry-backoff-ms Integer 生產(chǎn)者重試前,刷新元數(shù)據(jù)的等待時間閾值 100(默認值)
--socket-buffer-size Integer TCP接收緩沖大小 102400(默認值)
--timeout Integer 消息排隊異步等待處理的時間閾值 1000(默認值)
--sync 同步發(fā)送消息
--version 顯示 Kafka 版本 不配合其他參數(shù)時,顯示為本地Kafka版本
--help 打印幫助信息

11.1、生產(chǎn)無key消息

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic testTopic --producer.config ../config/producer.properties

11.2、生產(chǎn)有key消息(–property parse.key=true)

默認消息key與消息value間使用“Tab鍵”進行分隔,所以消息key以及value中切勿使用轉(zhuǎn)義字符(\t):

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic testTopic --producer.config ../config/producer.properties  --property parse.key=true

12、持續(xù)批量推送消息(kafka-verifiable-producer.sh)

該腳本可以生產(chǎn)測試數(shù)據(jù)發(fā)送到指定topic,并將數(shù)據(jù)已json格式打印到控制臺。

  • –topic:主題名稱
  • –broker-list:broker列表, HOST1:PORT1,HOST2:PORT2,…
  • –max-messages:最大消息數(shù)量,默認-1,一直生產(chǎn)消息
  • –throughput:設(shè)置吞吐量,默認-1
  • –acks:指定分區(qū)中必須有多少個副本收到這條消息,才算消息發(fā)送成功,默認-1
  • –producer.config:配置文件
  • –message-create-time:設(shè)置消息創(chuàng)建的時間,時間戳
  • –value-prefix:設(shè)置消息前綴
  • –repeating-keys:key從0開始,每次遞增1,直到指定的值,然后再從0開始

12.1、單次發(fā)送100條消息(–max-messages 100)

kafka-verifiable-producer.sh --bootstrap-server localhost:9092 --topic testTopic --max-messages 100

12.2、每秒發(fā)送最大吞吐量不超過10 (–throughput 10)

推送消息時的吞吐量,單位messages/sec。默認為-1,表示沒有限制

kafka-verifiable-producer.sh --bootstrap-server localhost:9092 --topic testTopic --throughput 10

12.3、發(fā)送的消息體帶前綴(–value-prefix)

kafka-verifiable-producer.sh --bootstrap-server localhost:9092 --topic testTopic  --value-prefix 666

注意 --value-prefix 666必須是整數(shù),發(fā)送的消息體的格式是加上一個 點號. 例如: 666.

其他參數(shù): --producer.config CONFIG_FILE 指定producer的配置文件 --acks ACKS 每次推送消息的ack值,默認是-1

13、持續(xù)批量拉取消息(kafka-verifiable-consumer.sh)

  • –bootstrap-server:指定kafka服務(wù) 指定連接到的kafka服務(wù);
  • –topic:指定消費的topic
  • –group-id:消費者id;不指定的話每次都是新的組id
  • –group-instance-id:消費組實例ID,唯一值
  • –max-messages:單次最大消費的消息數(shù)量
  • –enable-autocommit:是否開啟offset自動提交;默認為false
  • –reset-policy:當(dāng)以前沒有消費記錄時,選擇要拉取offset的策略,可以是earliest, latest,none。默認是earliest
  • –assignment-strategy:consumer分配分區(qū)策略,默認是org.apache.kafka.clients.consumer.RangeAssignor
  • –consumer.config:指定consumer的配置文件

13.1、持續(xù)消費

新的groupId,默認從頭消費:

kafka-verifiable-consumer.sh --bootstrap-server localhost:9092 --group-id test_consumer  --topic testTopic

13.2、單次最大消費(–max-messages)

kafka-verifiable-consumer.sh --bootstrap-server localhost:9092 --group-id test_consumer  --topic testTopic --max-messages 10 

14、生產(chǎn)者壓力測試(kafka-producer-perf-test.sh)

參數(shù) 描述 例子
--topic 指定消費的topic
--num-records 發(fā)送多少條消息
--throughput 每秒消息最大吞吐量
--producer-props 生產(chǎn)者配置, k1=v1,k2=v2 --producer-props bootstrap.servers= localhost:9092,client.id=test_client
--producer.config 生產(chǎn)者配置文件 --producer.config config/producer.propeties
--print-metrics 在test結(jié)束的時候打印監(jiān)控信息,默認false --print-metrics true
--transactional-id 指定事務(wù) ID,測試并發(fā)事務(wù)的性能時需要,只有在 --transaction-duration-ms > 0 時生效,默認值為 performance-producer-default-transactional-id
--transaction-duration-ms 指定事務(wù)持續(xù)的最長時間,超過這段時間后就會調(diào)用 commitTransaction 來提交事務(wù),只有指定了 > 0 的值才會開啟事務(wù),默認值為 0
--record-size 一條消息的大小byte; 和 --payload-file 兩個中必須指定一個,但不能同時指定
--payload-file 指定消息的來源文件,只支持 UTF-8 編碼的文本文件,文件的消息分隔符通過 --payload-delimeter 指定,默認是用換行\(zhòng)nl來分割的,和 --record-size 兩個中必須指定一個,但不能同時指定 ; 如果提供的消息
--payload-delimeter 如果通過 --payload-file 指定了從文件中獲取消息內(nèi)容,那么這個參數(shù)的意義是指定文件的消息分隔符,默認值為 \n,即文件的每一行視為一條消息;如果未指定--payload-file則此參數(shù)不生效;發(fā)送消息的時候是隨機送文件里面選擇消息發(fā)送的;
  1. 發(fā)送1024條消息 --num-records 100并且每條消息大小為1KB–record-size 1024 最大吞吐量每秒10000條–throughput 100
kafka-producer-perf-test.sh --topic testTopic --num-records 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9092 --record-size 1024

2.用指定消息文件–payload-file 發(fā)送100條消息最大吞吐量每秒100條–throughput 100

先配置好消息文件batchmessage.txt:

1
2
3
4
5
6
7
8
9
0

然后執(zhí)行命令 發(fā)送的消息會從batchmessage.txt里面隨機選擇; 注意這里我們沒有用參數(shù)–payload-delimeter指定分隔符,默認分隔符是\n換行;

kafka-producer-perf-test.sh --topic testTopic --num-records 1024 --throughput 100 --producer-props bootstrap.servers=localhost:9092 --payload-file config/batchmessage.txt

15、消費者壓力測試(kafka-consumer-perf-test.sh)

參數(shù) 描述 例子
--bootstrap-server
--consumer.config 消費者配置文件
--date-format 結(jié)果打印出來的時間格式化 默認:yyyy-MM-dd HH:mm:ss:SSS
--fetch-size 單次請求獲取數(shù)據(jù)的大小 默認1048576
--topic 指定消費的topic
--from-latest
--group 消費組ID
--hide-header 如果設(shè)置了,則不打印header信息
--messages 需要消費的數(shù)量
--num-fetch-threads feth 數(shù)據(jù)的線程數(shù)(廢棄無效) 默認:1
--print-metrics 結(jié)束的時候打印監(jiān)控數(shù)據(jù)
--show-detailed-stats 如果設(shè)置,則按照--report_interval配置的方式報告每個報告間隔的統(tǒng)計信息
--threads 消費線程數(shù);(廢棄無效) 默認 10
--reporting-interval 打印進度信息的時間間隔(以毫秒為單位)

消費100條消息 --messages 100

kafka-consumer-perf-test.sh -topic testTopic --bootstrap-server localhost:9092 --messages 100

16、常用操作

16.1、查看 topic 指定分區(qū) offset 的最大值或最小值

time 為 -1 時表示最大值,為 -2 時表示最小值

kafka-run-class.sh kafka.tools.GetOffsetShell --topic testTopic --time -1 --broker-list 127.0.0.1:9092 --partitions 0

16.2、查詢topic的offset的范圍

查詢offset最小值:

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic testTopic --time -2

查詢offset最大值:

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic testTopic --time -1

16.3、重置消費者offset

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGroup  --reset-offsets --execute --to-offset NEW_OFFSET --topic testTopic

kafka-consumer-groups.sh --bootstrap-server localhost:9092  --group testGroup  --reset-offsets --execute --to-earliest/--to-latest --topic testTopic

16.4、刪除topic下的數(shù)據(jù)

kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic testTopic --config cleanup.policy=delete

16.5、給指定TOPIC設(shè)置消息存儲時間 – 針對數(shù)據(jù)量大,磁盤小的情況

查看某一個topic設(shè)置過期時間:

kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-name testTopic --entity-type topics

單獨對某一個topic設(shè)置過期時間文章來源地址http://www.zghlxwxcb.cn/news/detail-720469.html

kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-name testTopic --entity-type topics --add-config retention.ms=86400000

到了這里,關(guān)于Kafka——管理Kafka(命令行工具)詳解的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Kafka可視化管理工具kafka-manager部署安裝和使用

    Kafka可視化管理工具kafka-manager部署安裝和使用

    為了簡化開發(fā)者和服務(wù)工程師維護Kafka集群的工作,yahoo構(gòu)建了一個叫做Kafka管理器的基于Web工具,叫做 Kafka Manager。 這個管理工具可以很容易地發(fā)現(xiàn)分布在集群中的哪些topic分布不均勻,或者是分區(qū)在整個集群分布不均勻的的情況。 它支持管理多個集群、選擇副本、副本重新

    2024年02月16日
    瀏覽(1191)
  • 五分鐘,Docker安裝kafka 3.5,kafka-map圖形化管理工具

    五分鐘,Docker安裝kafka 3.5,kafka-map圖形化管理工具

    首先確保已經(jīng)安裝docker,如果是windows安裝docker,可參考 wsl2安裝docker 注意:將192.168.xx.xx替換為你的主機IP即可 進入kafka 查看kafka版本 創(chuàng)建一個新的主題: 在開啟一個新的終端,一個作為生產(chǎn)者,一個作為消費者 消費者 生產(chǎn)者 在生產(chǎn)者頁面輸入測試內(nèi)容: 在消費者頁面查看

    2024年02月03日
    瀏覽(18)
  • kafka可視化web管理工具-KafkaMmap

    kafka可視化web管理工具-KafkaMmap

    ? 使用過kafka的小伙伴應(yīng)該都知道kafka本身是沒有管理界面的,所有操作都需要手動執(zhí)行命令來完成。但有些命令又多又長,如果沒有做筆記,別說是新手,就連老手也不一定能記得住,每次想要使用的時候都要上網(wǎng)搜索一下。有些崇尚g(shù)eek精神的人或許覺得命令行才是真愛,

    2024年01月24日
    瀏覽(106)
  • 【kafka-ui】支持kafka with kraft的可視化集群管理工具

    【kafka-ui】支持kafka with kraft的可視化集群管理工具

    本文在kafka3.3.1版本基礎(chǔ)上進行測試 在早期使用kafka的時候一般使用Kafka Tool或者kafka eagle,前者為桌面軟件,后者為瀏覽器軟件??傮w來說體驗一般,但是還比較夠用。 但是從kafka3.3.1開始,已經(jīng)正式拋棄zookeeper使用自己的仲裁器了,但是上述兩種kafka可視化工具的更新好像并

    2024年02月02日
    瀏覽(295)
  • 探索Conduktor的Kafka Stack Docker Compose:一款強大的Apache Kafka管理工具

    項目地址:https://gitcode.com/conduktor/kafka-stack-docker-compose Conduktor的Kafka Stack Docker Compose 是一個便捷的開源解決方案,它提供了一種快速部署和管理Apache Kafka集群的方法,通過Docker容器化技術(shù)實現(xiàn)了輕量級的本地環(huán)境搭建。該項目旨在簡化開發(fā)者和數(shù)據(jù)工程師的工作流程,使他們能

    2024年04月13日
    瀏覽(22)
  • Kafka消息監(jiān)控管理工具Offset Explorer的使用教程

    Kafka消息監(jiān)控管理工具Offset Explorer的使用教程

    Offset Explorer是一款用于監(jiān)控和管理Apache Kafka集群中消費者組偏移量的開源工具。它提供了一個簡單直觀的用戶界面,用于查看和管理Kafka消費者組偏移量的詳細信息。 Offset Explorer具有以下主要功能和特點: 實時監(jiān)控:Offset Explorer可以實時監(jiān)控Kafka集群中的消費者組偏移量。它

    2024年02月16日
    瀏覽(33)
  • Linux journalctl命令詳解(journalctl指令)(systemd服務(wù)默認日志管理工具)

    Linux journalctl命令詳解(journalctl指令)(systemd服務(wù)默認日志管理工具)

    Linux提供了一個強大的日志系統(tǒng),它可以跟蹤和記錄系統(tǒng)的各種活動。在這個系統(tǒng)中, journalctl 是一個非常重要的工具,用于查詢和操作由systemd進程管理的日志。 本文將深入探討 journalctl 命令,介紹其基本使用、高級選項及示例等內(nèi)容。 Systemd是Linux發(fā)行版的初始化系統(tǒng),負

    2024年02月08日
    瀏覽(23)
  • 【linux命令講解大全】022.網(wǎng)絡(luò)管理工具和命令概述

    用于查看文件的第二擴展文件系統(tǒng)屬性。 語法 lsattr(選項)(參數(shù)) 選項 -E :可顯示設(shè)備屬性的當(dāng)前值,但這個當(dāng)前值是從用戶設(shè)備數(shù)據(jù)庫中獲得的,而不是從設(shè)備直接獲得的。 -D :顯示屬性的名稱,屬性的默認值,描述和用戶是否可以修改屬性值的標志。 -R :遞歸的操作方式

    2024年02月11日
    瀏覽(20)
  • python源管理工具之pip命令摘要

    ? ? ? ? 日常python使用過程中,免不了要安裝各種包,可以使用Anaconda和Conda之類工具去維護,由于先入為主養(yǎng)成的習(xí)慣,博主一直使用pip輕量級工具(python默認安裝后自帶)。 ? ? ? ? 詳細的命令,此處就不贅述了,可以直接敲pip -h看子命令,或進一步看子命令的幫助(比

    2024年02月08日
    瀏覽(47)
  • Windows自帶的超強命令行磁盤管理工具

    Windows自帶的超強命令行磁盤管理工具

    不知道大家在裝Windows系統(tǒng)的時候,使用原版安裝的多不多。GHOST版系統(tǒng)是個好東西,能夠讓不懂計算機的操作者能夠一鍵裝好系統(tǒng),并且GHOST版的制作大神們還針對系統(tǒng)做了非常多的優(yōu)化操作。 雖然【優(yōu)化】這個詞最近才有了一些恐怖的意思,不過我確實因為那些我不太了解

    2024年02月09日
    瀏覽(30)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包