【Kafka-3.x-教程】專欄:
【Kafka-3.x-教程】-【一】Kafka 概述、Kafka 快速入門
【Kafka-3.x-教程】-【二】Kafka-生產(chǎn)者-Producer
【Kafka-3.x-教程】-【三】Kafka-Broker、Kafka-Kraft
【Kafka-3.x-教程】-【四】Kafka-消費(fèi)者-Consumer
【Kafka-3.x-教程】-【五】Kafka-監(jiān)控-Eagle
【Kafka-3.x-教程】-【六】Kafka 外部系統(tǒng)集成 【Flume、Flink、SpringBoot、Spark】
【Kafka-3.x-教程】-【七】Kafka 生產(chǎn)調(diào)優(yōu)、Kafka 壓力測試
1)Kafka 硬件配置選擇
1.1.場景說明
100 萬日活,每人每天 100 條日志,每天總共的日志條數(shù)是 100 萬 * 100 條 = 1 億條。
1 億/24 小時(shí)/60 分/60 秒 = 1150 條/每秒鐘。
每條日志大?。?.5k - 2k(取 1k)。
1150 條/每秒鐘 * 1k ≈ 1m/s 。
高峰期每秒鐘:1150 條 * 20 倍 = 23000 條。
每秒多少數(shù)據(jù)量:20MB/s。
1.2.服務(wù)器臺數(shù)選擇
服務(wù)器臺數(shù) = 2 * (生產(chǎn)者峰值生產(chǎn)速率 * 副本 / 100) + 1 = 2 * (20m/s * 2 / 100) + 1 = 3 臺
建議 3 臺服務(wù)器。
1.3.磁盤選擇
kafka 底層主要是順序?qū)?,固態(tài)硬盤和機(jī)械硬盤的順序?qū)懰俣炔畈欢唷?/p>
建議選擇普通的機(jī)械硬盤。
每天總數(shù)據(jù)量:1 億條 * 1k ≈ 100g
100g * 副本 2 * 保存時(shí)間 3 天 / 0.7 ≈ 1T。
建議三臺服務(wù)器硬盤總大小,大于等于 1T。
1.4.內(nèi)存選擇
Kafka 內(nèi)存組成:堆內(nèi)存 + 頁緩存
1、Kafka 堆內(nèi)存建議每個(gè)節(jié)點(diǎn):10g ~ 15g
在 kafka-server-start.sh 中修改
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"
fi
(1)查看 Kafka 進(jìn)程號
jps
2321 Kafka
5255 Jps
1931 QuorumPeerMain
(2)根據(jù) Kafka 進(jìn)程號,查看 Kafka 的 GC 情況
jstat -gc 2321 1s 10
S0C S1C S0U S1U EC EU OC OU MC MU CCSC CCSU YGC YGCT FGC FGCT GCT
0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
參數(shù)說明:
- S0C:第一個(gè)幸存區(qū)的大??;
- S1C:第二個(gè)幸存區(qū)的大小
- S0U:第一個(gè)幸存區(qū)的使用大小;
- S1U:第二個(gè)幸存區(qū)的使用大小
- EC:伊甸園區(qū)的大?。?/li>
- EU:伊甸園區(qū)的使用大小
- OC:老年代大??;
- OU:老年代使用大小
- MC:方法區(qū)大??;
- MU:方法區(qū)使用大小
- CCSC:壓縮類空間大??;
- CCSU:壓縮類空間使用大小
- YGC:年輕代垃圾回收次數(shù);
- YGCT:年輕代垃圾回收消耗時(shí)間
- FGC:老年代垃圾回收次數(shù);
- FGCT:老年代垃圾回收消耗時(shí)間
- GCT:垃圾回收消耗總時(shí)間;
(3)根據(jù) Kafka 進(jìn)程號,查看 Kafka 的堆內(nèi)存
jmap -heap 2321
Attaching to process ID 2321, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.212-b10
using thread-local object allocation.
Garbage-First (G1) GC with 8 thread(s)
Heap Configuration:
MinHeapFreeRatio = 40
MaxHeapFreeRatio = 70
MaxHeapSize = 2147483648 (2048.0MB)
NewSize = 1363144 (1.2999954223632812MB)
MaxNewSize = 1287651328 (1228.0MB)
OldSize = 5452592 (5.1999969482421875MB)
NewRatio = 2
SurvivorRatio = 8
MetaspaceSize = 21807104 (20.796875MB)
CompressedClassSpaceSize = 1073741824 (1024.0MB)
MaxMetaspaceSize = 17592186044415 MB
G1HeapRegionSize = 1048576 (1.0MB)
Heap Usage:
G1 Heap:
regions = 2048
capacity = 2147483648 (2048.0MB)
used = 246367744 (234.95458984375MB)
free = 1901115904 (1813.04541015625MB)
11.472392082214355% used
G1 Young Generation:
Eden Space:
regions = 83
capacity = 105906176 (101.0MB)
used = 87031808 (83.0MB)
free = 18874368 (18.0MB)
82.17821782178218% used
Survivor Space:
regions = 7
capacity = 7340032 (7.0MB)
used = 7340032 (7.0MB)
free = 0 (0.0MB)
100.0% used
G1 Old Generation:
regions = 147
capacity = 2034237440 (1940.0MB)
used = 151995904 (144.95458984375MB)
free = 1882241536 (1795.04541015625MB)
7.471886074420103% used
13364 interned Strings occupying 1449608 bytes.
2、頁緩存:頁緩存是 Linux 系統(tǒng)服務(wù)器的內(nèi)存。我們只需要保證 1 個(gè) segment(1g)中 25%的數(shù)據(jù)在內(nèi)存中就好。
每個(gè)節(jié)點(diǎn)頁緩存大小 =(分區(qū)數(shù) * 1g * 25%)/ 節(jié)點(diǎn)數(shù)。例如 10 個(gè)分區(qū),頁緩存大小 =(10 * 1g * 25%)/ 3 ≈ 1g
建議服務(wù)器內(nèi)存大于等于 11G。
1.5.CPU 選擇
num.io.threads = 8 負(fù)責(zé)寫磁盤的線程數(shù),整個(gè)參數(shù)值要占總核數(shù)的 50%。
num.replica.fetchers = 1 副本拉取線程數(shù),這個(gè)參數(shù)占總核數(shù)的 50%的 1/3。
num.network.threads = 3 數(shù)據(jù)傳輸線程數(shù),這個(gè)參數(shù)占總核數(shù)的 50%的 2/3。
建議 32 個(gè) cpu core。
1.6.網(wǎng)絡(luò)選擇
網(wǎng)絡(luò)帶寬 = 峰值吞吐量 ≈ 20MB/s 選擇千兆網(wǎng)卡即可。
100Mbps 單位是 bit;10M/s 單位是 byte ; 1byte = 8bit,100Mbps/8 = 12.5M/s。
一般百兆的網(wǎng)卡(100Mbps )、千兆的網(wǎng)卡(1000Mbps)、萬兆的網(wǎng)卡(10000Mbps)。
2)Kafka 生產(chǎn)者
詳見:【Kafka-3.x-教程】-【二】Kafka-生產(chǎn)者-Producer
3.1.1 Updating Broker Configs
From Kafka version 1.1 onwards, some of the broker configs can be
updated without restarting the broker. See the Dynamic Update Mode
column in Broker Configs for the update mode of each broker config.
read-only: Requires a broker restart for update
per-broker: May be updated dynamically for each broker
cluster-wide: May be updated dynamically as a cluster-wide default.
May also be updated as a per-broker value for testing.
2.1.Kafka 生產(chǎn)者核心參數(shù)配置
2.2.生產(chǎn)者如何提高吞吐量
2.3.數(shù)據(jù)可靠性
2.4.數(shù)據(jù)去重
1、參數(shù)配置
2、Kafka 的事務(wù)一共有如下 5 個(gè) API
// 1 初始化事務(wù)
void initTransactions();
// 2 開啟事務(wù)
void beginTransaction() throws ProducerFencedException;
// 3 在事務(wù)內(nèi)提交已經(jīng)消費(fèi)的偏移量(主要用于消費(fèi)者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws
ProducerFencedException;
// 4 提交事務(wù)
void commitTransaction() throws ProducerFencedException;
// 5 放棄事務(wù)(類似于回滾事務(wù)的操作)
void abortTransaction() throws ProducerFencedException;
2.5.數(shù)據(jù)有序
單分區(qū)內(nèi),有序(有條件的,不能亂序);多分區(qū),分區(qū)與分區(qū)間無序;
2.6.數(shù)據(jù)亂序
3)Kafka Broker
詳見:【Kafka-3.x-教程】-【三】Kafka-Broker、Kafka-Kraft
3.1.Broker 核心參數(shù)配置
3.2.服役新節(jié)點(diǎn)/退役舊節(jié)點(diǎn)
1、創(chuàng)建一個(gè)要均衡的主題。
vim topics-to-move.json
{
"topics": [
{"topic": "first"}
],
"version": 1
}
2、生成一個(gè)負(fù)載均衡的計(jì)劃。
bin/kafka-reassign-partitions.sh --
bootstrap-server hadoop102:9092 --topics-to-move-json-file
topics-to-move.json --broker-list "0,1,2,3" --generate
3、創(chuàng)建副本存儲計(jì)劃(所有副本存儲在 broker0、broker1、broker2、broker3 中)。
vim increase-replication-factor.json
4、執(zhí)行副本存儲計(jì)劃。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increasereplication-factor.json --execute
5、驗(yàn)證副本存儲計(jì)劃。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increasereplication-factor.json --verify
3.3.增加分區(qū)
修改分區(qū)數(shù)(注意:分區(qū)數(shù)只能增加,不能減少)
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
3.4.增加副本因子
1、創(chuàng)建 topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 3 --replication-factor 1 --topic four
2、手動增加副本存儲
(1)創(chuàng)建副本存儲計(jì)劃(所有副本都指定存儲在 broker0、broker1、broker2 中)。
vim increase-replication-factor.json
#輸入如下內(nèi)容:
{"version":1,"partitions":[{"topic":"four","partition":0,"replica
s":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"t
opic":"four","partition":2,"replicas":[0,1,2]}]}
(2)執(zhí)行副本存儲計(jì)劃。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increasereplication-factor.json --execute
3.5.手動調(diào)整分區(qū)副本存儲
1、創(chuàng)建副本存儲計(jì)劃(所有副本都指定存儲在 broker0、broker1 中)。
vim increase-replication-factor.json
#輸入如下內(nèi)容:
{
"version":1,
"partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}]
}
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increasereplication-factor.json --execute
3、驗(yàn)證副本存儲計(jì)劃。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increasereplication-factor.json --verify
3.6.Leader Partition 負(fù)載平衡
3.7.自動創(chuàng)建主題
如果 broker 端配置參數(shù) auto.create.topics.enable 設(shè)置為 true(默認(rèn)值是 true)
,那么當(dāng)生產(chǎn)者向一個(gè)未創(chuàng)建的主題發(fā)送消息時(shí),會自動創(chuàng)建一個(gè)分區(qū)數(shù)為 num.partitions(默認(rèn)值為1)、副本因子為 default.replication.factor(默認(rèn)值為 1)的主題。除此之外,當(dāng)一個(gè)消費(fèi)者開始從未知主題中讀取消息時(shí),或者當(dāng)任意一個(gè)客戶端向未知主題發(fā)送元數(shù)據(jù)請求時(shí),都會自動創(chuàng)建一個(gè)相應(yīng)主題。這種創(chuàng)建主題的方式是非預(yù)期的,增加了主題管理和維護(hù)的難度。
生產(chǎn)環(huán)境建議將該參數(shù)設(shè)置為 false
。
1、向一個(gè)沒有提前創(chuàng)建 five 主題發(fā)送數(shù)據(jù)
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic five
>hello world
2、查看 five 主題的詳情
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic five
4)Kafka 消費(fèi)者
詳見:【Kafka-3.x-教程】-【四】Kafka-消費(fèi)者-Consumer
4.1.Kafka 消費(fèi)者核心參數(shù)配置
4.2.消費(fèi)者再平衡
4.3.指定 Offset 消費(fèi)
kafkaConsumer.seek(topic, 1000);
4.4.指定時(shí)間消費(fèi)
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
timestampToSearch.put(topicPartition, System.currentTimeMillis() -1 * 24 * 3600 * 1000);
kafkaConsumer.offsetsForTimes(timestampToSearch);
4.5.消費(fèi)者事務(wù)
4.6.消費(fèi)者如何提高吞吐量
增加分區(qū)數(shù);
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
5)Kafka 總體
5.1.如何提升吞吐量
1、提升生產(chǎn)吞吐量
(1)buffer.memory
:發(fā)送消息的緩沖區(qū)大小,默認(rèn)值是 32m,可以增加到 64m。
(2)batch.size
:默認(rèn)是 16k。如果 batch 設(shè)置太小,會導(dǎo)致頻繁網(wǎng)絡(luò)請求,吞吐量下降;如果 batch 太大,會導(dǎo)致一條消息需要等待很久才能被發(fā)送出去,增加網(wǎng)絡(luò)延時(shí)。
(3)linger.ms
:這個(gè)值默認(rèn)是 0,意思就是消息必須立即被發(fā)送。一般設(shè)置一個(gè) 5-100 毫秒。如果 linger.ms 設(shè)置的太小,會導(dǎo)致頻繁網(wǎng)絡(luò)請求,吞吐量下降;如果 linger.ms 太長,會導(dǎo)致一條消息需要等待很久才能被發(fā)送出去,增加網(wǎng)絡(luò)延時(shí)。
(4)compression.type
:默認(rèn)是 none,不壓縮,但是也可以使用 lz4 壓縮,效率還是不錯(cuò)的,壓縮之后可以減小數(shù)據(jù)量,提升吞吐量,但是會加大 producer 端的 CPU 開銷。
2、增加分區(qū)
3、消費(fèi)者提高吞吐量
(1)調(diào)整 fetch.max.bytes
大小,默認(rèn)是 50m。
(2)調(diào)整 max.poll.records
大小,默認(rèn)是 500 條。
4、增加下游消費(fèi)者處理能力
5.2.數(shù)據(jù)精準(zhǔn)一次
1、生產(chǎn)者角度
- acks 設(shè)置為 -1 (acks=-1)。
- 冪等性(
enable.idempotence = true
) + 事務(wù) 。
2、broker 服務(wù)端角度
- 分區(qū)副本大于等于 2 (–replication-factor 2)。
- ISR 里應(yīng)答的最小副本數(shù)量大于等于 2 (
min.insync.replicas = 2
)。
3、消費(fèi)者
- 事務(wù) + 手動提交 offset (
enable.auto.commit = false
)。 - 消費(fèi)者輸出的目的地必須支持事務(wù)(MySQL、Kafka)。
5.3.合理設(shè)置分區(qū)數(shù)
1、創(chuàng)建一個(gè)只有 1 個(gè)分區(qū)的 topic。
2、測試這個(gè) topic 的 producer 吞吐量和 consumer 吞吐量。
3、假設(shè)他們的值分別是 Tp 和 Tc,單位可以是 MB/s。
4、然后假設(shè)總的目標(biāo)吞吐量是 Tt,那么分區(qū)數(shù) = Tt / min(Tp,Tc)。
例如:producer 吞吐量 = 20m/s;consumer 吞吐量 = 50m/s,期望吞吐量 100m/s;
分區(qū)數(shù) = 100 / 20 = 5 分區(qū)
分區(qū)數(shù)一般設(shè)置為:3-10 個(gè)
分區(qū)數(shù)不是越多越好,也不是越少越好,需要搭建完集群,進(jìn)行壓測,再靈活調(diào)整分區(qū)
個(gè)數(shù)。
5.4.單條日志大于1m
5.5.服務(wù)器掛了
在生產(chǎn)環(huán)境中,如果某個(gè) Kafka 節(jié)點(diǎn)掛掉。正常處理辦法:
1、先嘗試重新啟動一下,如果能啟動正常,那直接解決。
2、如果重啟不行,考慮增加內(nèi)存、增加 CPU、網(wǎng)絡(luò)帶寬。
3、如果將 kafka 整個(gè)節(jié)點(diǎn)誤刪除,如果副本數(shù)大于等于 2,可以按照服役新節(jié)點(diǎn)的方式重新服役一個(gè)新節(jié)點(diǎn),并執(zhí)行負(fù)載均衡。
6)Kafka 壓測
用 Kafka 官方自帶的腳本,對 Kafka 進(jìn)行壓測。
-
生產(chǎn)者壓測:kafka-producer-perf-test.sh
-
消費(fèi)者壓測:kafka-consumer-perf-test.sh
6.1.Kafka Producer 壓力測試
1、創(chuàng)建一個(gè) test topic,設(shè)置為 3 個(gè)分區(qū) 3 個(gè)副本
bin/kafka-topics.sh --bootstrapserver hadoop102:9092 --create --replication-factor 3 --partitions 3 --topic test
2、在 /opt/module/kafka/bin 目錄下面有這兩個(gè)文件。我們來測試一下
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092batch.size=16384 linger.ms=0
參數(shù)說明:
- record-size 是一條信息有多大,單位是字節(jié),本次測試設(shè)置為 1k。
- num-records 是總共發(fā)送多少條信息,本次測試設(shè)置為 100 萬條。
- throughput 是每秒多少條信息,設(shè)成-1,表示不限流,盡可能快的生產(chǎn)數(shù)據(jù),可測
出生產(chǎn)者最大吞吐量。本次實(shí)驗(yàn)設(shè)置為每秒鐘 1 萬條。 - producer-props 后面可以配置生產(chǎn)者相關(guān)參數(shù),
batch.size 配置為 16k
。
輸出結(jié)果:
ap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=16384
linger.ms=0
37021 records sent, 7401.2 records/sec (7.23 MB/sec), 1136.0 ms avg latency,
1453.0 ms max latency.
50535 records sent, 10107.0 records/sec (9.87 MB/sec), 1199.5 ms avg
latency, 1404.0 ms max latency.
47835 records sent, 9567.0 records/sec (9.34 MB/sec), 1350.8 ms avg latency,
1570.0 ms max latency.
。。。 。。。
42390 records sent, 8444.2 records/sec (8.25 MB/sec), 3372.6 ms avg latency,
4008.0 ms max latency.
37800 records sent, 7558.5 records/sec (7.38 MB/sec), 4079.7 ms avg latency,
4758.0 ms max latency.
33570 records sent, 6714.0 records/sec (6.56 MB/sec), 4549.0 ms avg latency,
5049.0 ms max latency.
1000000 records sent, 9180.713158 records/sec (8.97 MB/sec), 1894.78 ms
avg latency, 5049.00 ms max latency, 1335 ms 50th, 4128 ms 95th, 4719 ms
99th, 5030 ms 99.9th.
3、調(diào)整 batch.size 大小
(1)batch.size 默認(rèn)值是 16k。本次實(shí)驗(yàn) batch.size 設(shè)置為 32k。
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=32768 linger.ms=0
輸出結(jié)果:
49922 records sent, 9978.4 records/sec (9.74 MB/sec), 64.2 ms avg latency,
340.0 ms max latency.
49940 records sent, 9988.0 records/sec (9.75 MB/sec), 15.3 ms avg latency,
31.0 ms max latency.
50018 records sent, 10003.6 records/sec (9.77 MB/sec), 16.4 ms avg latency,
52.0 ms max latency.
。。。 。。。
49960 records sent, 9992.0 records/sec (9.76 MB/sec), 17.2 ms avg latency,
40.0 ms max latency.
50090 records sent, 10016.0 records/sec (9.78 MB/sec), 16.9 ms avg latency,
47.0 ms max latency.
1000000 records sent, 9997.600576 records/sec (9.76 MB/sec), 20.20 ms avg
latency, 340.00 ms max latency, 16 ms 50th, 30 ms 95th, 168 ms 99th, 249
ms 99.9th.
(2)batch.size 默認(rèn)值是 16k。本次實(shí)驗(yàn) batch.size 設(shè)置為 4k。
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=0
輸出結(jié)果:
15598 records sent, 3117.1 records/sec (3.04 MB/sec), 1878.3 ms avg latency,
3458.0 ms max latency.
17748 records sent, 3549.6 records/sec (3.47 MB/sec), 5072.5 ms avg latency,
6705.0 ms max latency.
18675 records sent, 3733.5 records/sec (3.65 MB/sec), 6800.9 ms avg latency,
7052.0 ms max latency.
。。。 。。。
19125 records sent, 3825.0 records/sec (3.74 MB/sec), 6416.5 ms avg latency,
7023.0 ms max latency.
1000000 records sent, 3660.201531 records/sec (3.57 MB/sec), 6576.68 ms
avg latency, 7677.00 ms max latency, 6745 ms 50th, 7298 ms 95th, 7507 ms
99th, 7633 ms 99.9th.
4、調(diào)整 linger.ms 時(shí)間:linger.ms 默認(rèn)是 0ms。本次實(shí)驗(yàn) linger.ms 設(shè)置為 50ms。
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50
輸出結(jié)果:
16804 records sent, 3360.1 records/sec (3.28 MB/sec), 1841.6 ms avg latency,
3338.0 ms max latency.
18972 records sent, 3793.6 records/sec (3.70 MB/sec), 4877.7 ms avg latency,
6453.0 ms max latency.
19269 records sent, 3852.3 records/sec (3.76 MB/sec), 6477.9 ms avg latency,
6686.0 ms max latency.
。。。 。。。
17073 records sent, 3414.6 records/sec (3.33 MB/sec), 6987.7 ms avg latency,
7353.0 ms max latency.
19326 records sent, 3865.2 records/sec (3.77 MB/sec), 6756.5 ms avg latency,
7357.0 ms max latency.
1000000 records sent, 3842.754486 records/sec (3.75 MB/sec), 6272.49 ms
avg latency, 7437.00 ms max latency, 6308 ms 50th, 6880 ms 95th, 7289 ms
99th, 7387 ms 99.9th.
5、調(diào)整壓縮方式
(1)默認(rèn)的壓縮方式是 none。本次實(shí)驗(yàn) compression.type 設(shè)置為 snappy。
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50 compression.type=snappy
輸出結(jié)果:
17244 records sent, 3446.0 records/sec (3.37 MB/sec), 5207.0 ms avg latency,
6861.0 ms max latency.
18873 records sent, 3774.6 records/sec (3.69 MB/sec), 6865.0 ms avg latency,
7094.0 ms max latency.
18378 records sent, 3674.1 records/sec (3.59 MB/sec), 6579.2 ms avg latency,
6738.0 ms max latency.
。。。 。。。
17631 records sent, 3526.2 records/sec (3.44 MB/sec), 6671.3 ms avg latency,
7566.0 ms max latency.
19116 records sent, 3823.2 records/sec (3.73 MB/sec), 6739.4 ms avg latency,
7630.0 ms max latency.
1000000 records sent, 3722.925028 records/sec (3.64 MB/sec), 6467.75 ms
avg latency, 7727.00 ms max latency, 6440 ms 50th, 7308 ms 95th, 7553 ms
99th, 7665 ms 99.9th.
(2)默認(rèn)的壓縮方式是 none。本次實(shí)驗(yàn) compression.type 設(shè)置為 zstd。
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50 compression.type=zstd
輸出結(jié)果:
23820 records sent, 4763.0 records/sec (4.65 MB/sec), 1580.2 ms avg latency,
2651.0 ms max latency.
29340 records sent, 5868.0 records/sec (5.73 MB/sec), 3666.0 ms avg latency,
4752.0 ms max latency.
28950 records sent, 5788.8 records/sec (5.65 MB/sec), 5785.2 ms avg latency,
6865.0 ms max latency.
。。。 。。。
29580 records sent, 5916.0 records/sec (5.78 MB/sec), 6907.6 ms avg latency,
7432.0 ms max latency.
29925 records sent, 5981.4 records/sec (5.84 MB/sec), 6948.9 ms avg latency,
7541.0 ms max latency.
1000000 records sent, 5733.583318 records/sec (5.60 MB/sec), 6824.75 ms
avg latency, 7595.00 ms max latency, 7067 ms 50th, 7400 ms 95th, 7500 ms
99th, 7552 ms 99.9th.
(3)默認(rèn)的壓縮方式是 none。本次實(shí)驗(yàn) compression.type 設(shè)置為 gzip。
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50 compression.type=gzip
輸出結(jié)果:
27170 records sent, 5428.6 records/sec (5.30 MB/sec), 1374.0 ms avg latency,
2311.0 ms max latency.
31050 records sent, 6210.0 records/sec (6.06 MB/sec), 3183.8 ms avg latency,
4228.0 ms max latency.
32145 records sent, 6427.7 records/sec (6.28 MB/sec), 5028.1 ms avg latency,
6042.0 ms max latency.
。。。 。。。
31710 records sent, 6342.0 records/sec (6.19 MB/sec), 6457.1 ms avg latency,
6777.0 ms max latency.
31755 records sent, 6348.5 records/sec (6.20 MB/sec), 6498.7 ms avg latency,
6780.0 ms max latency.
32760 records sent, 6548.1 records/sec (6.39 MB/sec), 6375.7 ms avg latency,
6822.0 ms max latency.
1000000 records sent, 6320.153706 records/sec (6.17 MB/sec), 6155.42 ms
avg latency, 6943.00 ms max latency, 6437 ms 50th, 6774 ms 95th, 6863 ms
99th, 6912 ms 99.9th.
(4)默認(rèn)的壓縮方式是 none。本次實(shí)驗(yàn) compression.type 設(shè)置為 lz4。
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50 compression.type=lz4
輸出結(jié)果:
16696 records sent, 3339.2 records/sec (3.26 MB/sec), 1924.5 ms avg latency,
3355.0 ms max latency.
19647 records sent, 3928.6 records/sec (3.84 MB/sec), 4841.5 ms avg latency,
6320.0 ms max latency.
20142 records sent, 4028.4 records/sec (3.93 MB/sec), 6203.2 ms avg latency,
6378.0 ms max latency.
。。。 。。。
20130 records sent, 4024.4 records/sec (3.93 MB/sec), 6073.6 ms avg latency,
6396.0 ms max latency.
19449 records sent, 3889.8 records/sec (3.80 MB/sec), 6195.6 ms avg latency,
6500.0 ms max latency.
19872 records sent, 3972.8 records/sec (3.88 MB/sec), 6274.5 ms avg latency,
6565.0 ms max latency.
1000000 records sent, 3956.087430 records/sec (3.86 MB/sec), 6085.62 ms
avg latency, 6745.00 ms max latency, 6212 ms 50th, 6524 ms 95th, 6610 ms
99th, 6695 ms 99.9th.
6、調(diào)整緩存大小:默認(rèn)生產(chǎn)者端緩存大小 32m。本次實(shí)驗(yàn) buffer.memory 設(shè)置為 64m。
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50 buffer.memory=67108864
輸出結(jié)果:
20170 records sent, 4034.0 records/sec (3.94 MB/sec), 1669.5 ms avg latency,
3040.0 ms max latency.
21996 records sent, 4399.2 records/sec (4.30 MB/sec), 4407.9 ms avg latency,
5806.0 ms max latency.
22113 records sent, 4422.6 records/sec (4.32 MB/sec), 7189.0 ms avg latency,
8623.0 ms max latency.
。。。 。。。
19818 records sent, 3963.6 records/sec (3.87 MB/sec), 12416.0 ms avg
latency, 12847.0 ms max latency.
20331 records sent, 4062.9 records/sec (3.97 MB/sec), 12400.4 ms avg
latency, 12874.0 ms max latency.
19665 records sent, 3933.0 records/sec (3.84 MB/sec), 12303.9 ms avg
latency, 12838.0 ms max latency.
1000000 records sent, 4020.100503 records/sec (3.93 MB/sec), 11692.17 ms
avg latency, 13796.00 ms max latency, 12238 ms 50th, 12949 ms 95th, 13691
ms 99th, 13766 ms 99.9th.
6.2.Kafka Consumer 壓力測試
1、修改 /opt/module/kafka/config/consumer.properties 文件中的一次拉取條數(shù)為 500:
max.poll.records=500
2、消費(fèi) 100 萬條日志進(jìn)行壓測
bin/kafka-consumer-perf-test.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --messages 1000000 --consumer.config config/consumer.properties
參數(shù)說明:
- –bootstrap-server 指定 Kafka 集群地址
- –topic 指定 topic 的名稱
- –messages 總共要消費(fèi)的消息個(gè)數(shù)。本次實(shí)驗(yàn) 100 萬條。
輸出結(jié)果:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg,
nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-01-20 09:58:26:171, 2022-01-20 09:58:33:321, 977.0166, 136.6457,
1000465, 139925.1748, 415, 6735, 145.0656, 148547.1418
3、一次拉取條數(shù)為 2000
(1)修改/opt/module/kafka/config/consumer.properties 文件中的一次拉取條數(shù)為 2000:
max.poll.records=2000
(2)再次執(zhí)行文章來源:http://www.zghlxwxcb.cn/news/detail-823913.html
bin/kafka-consumer-perf-test.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --messages 1000000 --consumer.config config/consumer.properties
輸出結(jié)果:文章來源地址http://www.zghlxwxcb.cn/news/detail-823913.html
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg,
nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-01-20 10:18:06:268, 2022-01-20 10:18:12:863, 977.5146, 148.2206,
1000975, 151777.8620, 358, 6237, 156.7283, 160489.8188
4、調(diào)整 fetch.max.bytes 大小為 100m
(1)修改/opt/module/kafka/config/consumer.properties 文件中的拉取一批數(shù)據(jù)大小 100m:
fetch.max.bytes=104857600
(2)再次執(zhí)行
bin/kafka-consumer-perf-test.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --messages 1000000 --consumer.config config/consumer.properties
輸出結(jié)果:
start.time, end.time, data.consumed.in.MB, MB.sec,
data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms,
fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-01-20 10:26:13:203, 2022-01-20 10:26:19:662, 977.5146,
151.3415, 1000975, 154973.6801, 362, 6097, 160.3272, 164175.0041
到了這里,關(guān)于【Kafka-3.x-教程】-【七】Kafka 生產(chǎn)調(diào)優(yōu)、Kafka 壓力測試的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!