生產(chǎn)環(huán)境偶爾會遇到kafka消費者程序日志報錯的問題
截取主要日志如下:
2023-10-02 19:35:28.554 {trace: d7f97f70dd693e3d} ERROR[Thread-49:137] ConsumerCoordinator$OffsetCommitResponseHandler.handle(812) - [Consumer clientId=consumer-1, groupId=cid_yingzi_fpf_group_device] Offset commit failed on partition topic_dvc_telemetery_bh_bh100-1 at offset 4313614: The request timed out.
2023-10-02 19:35:28.554 {trace: d7f97f70dd693e3d} INFO [Thread-49:137] AbstractCoordinator.markCoordinatorUnknown(727) - [Consumer clientId=consumer-1, groupId=cid_yingzi_fpf_group_device] Group coordinator kafka02.yingzi.com:19292 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
kafka客戶端版本為2.2.0
結(jié)合日志去閱讀代碼,只能大概定位到,是客戶端程序向server發(fā)送commit offset請求的時候,server返回的錯誤信息是:The request timed out
看到 request timed out,第一時間很可能會誤以為是客戶端向server發(fā)送請求超時了。但是查看OffsetCommitResponseHandler.handle()的代碼,發(fā)現(xiàn)server是成功返回信息了的。
server返回的數(shù)據(jù)是一個Map<TopicPartition, Errors>結(jié)構(gòu),每個partition都對應(yīng)一個Errors結(jié)果,如果Errors為Errors.NONE,則表示offset提交成功。
如果Errors不為Errors.NONE,則打印錯誤日志,也就是上面的 Offset commit failed … The request timed out的日志,每個partition打印1條日志。
也就是說,問題發(fā)生在server內(nèi)部處理的時候,可以排除掉是客戶端和server的網(wǎng)絡(luò)問題導(dǎo)致的超時
要繼續(xù)深挖,需要了解下server的處理邏輯,server的入口代碼在KafkaApis.handleOffsetCommitRequest()
查看代碼邏輯,可以發(fā)現(xiàn)早期的offset是保存在zk中,新版本中改為存在kafka的topic中(往__consumer_offsets這個topic發(fā)消息,每個partition一條offset消息)
那么分析下來,大概率就是往__consumer_offsets topic發(fā)消息的時候,產(chǎn)生了超時
繼續(xù)閱讀client的代碼,了解Offset commit的機制
在KafkaConsumer.poll()的代碼中,每次拉取消息時,都會調(diào)用updateAssignmentMetadataIfNeeded()這個方法,這個方法最終會調(diào)用maybeAutoCommitOffsetsAsync()方法
maybeAutoCommitOffsetsAsync()方法根據(jù)autoCommitIntervalMs來判斷,是否要提交offset
這里默認是5秒執(zhí)行一次commit offset請求,每次會把訂閱的所有topic和partition的信息都進行提交
每個topic每個partition對應(yīng)1條消息,如果topic非常多的話,那往__consumer_offsets發(fā)送消息量也會很大
查看生產(chǎn)kafka的監(jiān)控,__consumer_offsets每秒消息量大概為四五千
顯然是不太合理的
于是通過命令去消費__consumer_offsets的數(shù)據(jù)進行查看,注意由于這里消息是序列化的,直接消費的話會顯示亂碼,要通過-formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"去進行消息解碼
測試環(huán)境消費命令如下:
/opt/software/kafka/kafka_2.11-2.0.0_test_yewu/bin/kafka-console-consumer.sh --topic __consumer_offsets --group yz.bigdata.tzy --bootstrap-server kafka-test01.yingzi.com:32295 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties
生產(chǎn)環(huán)境統(tǒng)計的消息量最多的group如下(消費25秒):
37485 cid_opf_scene_edgengine_offline_alarm
27216 idp_share_telemetery_scene
25776 idp_scene_lifecycle_offline_alarm
4892 cid_scene_dvc_tele_attr
1440 yingzi-scene-space-group
546 clickhouse-iotcenter-latest-rep
533 re-Main-consumer-b
533 clickhouse-iotcenter-rep
437 yingzi-bizcenter-dvc-telemetery-group
318 cid_yingzi_hwm_group
288 cid_yingzi_fpf_group_device
270 transport-b-node-tb-mqtt-transport-ca-b-8
258 transport-b-node-tb-mqtt-transport-ca-b-6
222 transport-b-node-tb-mqtt-transport-ca-b-5
208 tb-core-b-consumer
200 yz.bigdata.wns
排名前幾的都是跟設(shè)備有關(guān)的,消費幾百個topic
按照上面的分析,每個group每秒發(fā)送的消息量應(yīng)該為:1 / auto-commit-interval * Sum(topic partirionNum)
但是實際計算下來,感覺不應(yīng)該這么高才對
先針對cid_opf_scene_edgengine_offline_alarm這個group進行查看,這個group訂閱的topic有250個,每個topic 6個partition
1/52506=300
但是實際37485/25 = 1500
差了5倍之多
于是找到對應(yīng)的開發(fā)人員,查看kafka的配置,發(fā)現(xiàn)配置:spring.kafka.consumer.auto-commit-interval=1000
提交offset的間隔默認5秒,被人工修改為1秒,正好相差5倍
其他兩個消息量很高的group,經(jīng)分析也是一樣的問題
溝通后建議還是把spring.kafka.consumer.auto-commit-interval配置改回默認值,后續(xù)再繼續(xù)進行觀察文章來源:http://www.zghlxwxcb.cn/news/detail-725501.html
當(dāng)然問題的根本原因其實還是設(shè)計不合理,kafka的性能本身就是會隨著topic的增多而降低的,設(shè)計上應(yīng)該盡量避免產(chǎn)生很多個topic才對,這里就不再展開討論了文章來源地址http://www.zghlxwxcb.cn/news/detail-725501.html
到了這里,關(guān)于kafka消費者程序日志報錯Offset commit failed問題研究的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!