https://blog.csdn.net/BHSZZY/article/details/126757295文章來源地址http://www.zghlxwxcb.cn/news/detail-780232.html
//心跳超時(shí)時(shí)間(session超時(shí)時(shí)間)增加成25秒(之前項(xiàng)目設(shè)置了15秒)
spring.kafka.properties.session.timeout.ms = 25000
//每次拉取的消息減少為20(之前是默認(rèn)值500)
spring.kafka.consumer.max-poll-records=20
//消息消費(fèi)超時(shí)時(shí)間增加為10分鐘
spring.kafka.properties.max.poll.interval.ms=600000
spring設(shè)置kafka超時(shí)時(shí)間沒有生效的解決方法(解決rebalancing問題)
追逐夢(mèng)想永不停
于 2022-09-08 17:58:41 發(fā)布
2799
收藏 7
文章標(biāo)簽: kafka spring java
版權(quán)
華為云開發(fā)者聯(lián)盟
該內(nèi)容已被華為云開發(fā)者聯(lián)盟社區(qū)收錄,社區(qū)免費(fèi)抽大獎(jiǎng)??,贏華為平板、Switch等好禮!
加入社區(qū)
一、前言
最近生產(chǎn)kafka遇到一個(gè)問題,總是隔幾分鐘就rebalancing,導(dǎo)致沒有消費(fèi)者、消息堆積;
平衡好后,正常消費(fèi)消息幾分鐘后,就又開始rebalancing,消息再次堆積,一直循環(huán)。
登錄kafka服務(wù)器,用命令查看kafka組:
//組名是commonGroup,java里設(shè)置的
./kafka-consumer-groups.sh --bootstrap-server 10.123.123.123:9092 --group commonGroup --describe
就會(huì)發(fā)現(xiàn)報(bào)錯(cuò):
warning: Consumer group 'commonGroup' is rebalancing.
此時(shí)組里的所有topic都會(huì)沒有消費(fèi)者。
再查看消費(fèi)者(java后臺(tái))的日志,會(huì)發(fā)現(xiàn)大量的rebalancing語句,與重新加入分組的語句:
//這個(gè)是心跳發(fā)送失敗報(bào)錯(cuò)的日志,因?yàn)榇藭r(shí)在rebalancing
2022-08-25 17:55:41.801 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO o.a.k.c.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=commonGroup] Attempt to heartbeat failed since group is rebalancing
//這個(gè)是重新加入分組的日志,重新加入了commonGroup組里的topic為examTake的第13個(gè)分區(qū)(生產(chǎn)topic分了14個(gè)區(qū))
2022-08-30 16:29:27.434 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO o.s.kafka.listener.KafkaMessageListenerContainer - partitions assigned: [examTake-13]
這個(gè)現(xiàn)象會(huì)導(dǎo)致消息堆積2-3分鐘,然后消息會(huì)統(tǒng)一被消費(fèi)一波,然后繼續(xù)堆積2-3分鐘消息;
因?yàn)閗afka不知道為什么總是rebalancing,每次平衡需要2-3分鐘時(shí)間,此時(shí)沒有消費(fèi)者;
平衡好后,消息被消費(fèi)者消費(fèi)一波,就又開始rebalancing。
用戶明顯感覺到系統(tǒng)變慢,需要想辦法解決這個(gè)問題。
二、可能的原因
百度發(fā)現(xiàn),kafka rebalancing發(fā)生的情況,主要有這幾種:
1.有消費(fèi)者新增/減少
如果啟動(dòng)了新的java程序,增加了消費(fèi)者、或者有消費(fèi)者掛了,kafka就會(huì)重新平衡;
但是排查后發(fā)現(xiàn),所有消費(fèi)者日志打印正常,沒有掛掉的,也沒有新增消費(fèi)者,所以不是這個(gè)問題。
2.有消費(fèi)者在規(guī)定時(shí)間內(nèi)未發(fā)送心跳包
spring里可以配置kafka的session超時(shí)時(shí)間(默認(rèn)10秒):
spring.kafka.properties.session.timeout.ms = 10000
以及心跳包發(fā)送時(shí)間間隔(默認(rèn)隔3秒發(fā)送一次):
spring.kafka.properties.heartbeat.interval.ms = 3000
如果有消費(fèi)者在session規(guī)定時(shí)間內(nèi)沒有發(fā)送心跳包,kafka就會(huì)認(rèn)為該消費(fèi)者不可用,開始rebalancing。
但是排查后發(fā)現(xiàn),項(xiàng)目里配置的超時(shí)時(shí)間是15秒,心跳包間隔時(shí)間沒有配置(默認(rèn)3秒),感覺不應(yīng)該有消費(fèi)者15秒內(nèi)一次心跳包也發(fā)不出去(消費(fèi)者日志打印正常,沒有掛掉的),所以不確定是不是這個(gè)問題。
3.有消費(fèi)者在規(guī)定時(shí)間內(nèi)沒有處理完消息
spring里可以配置消費(fèi)者一次拉取的消息數(shù)(默認(rèn)500,低版本kafka好像不支持修改):
spring.kafka.consumer.max-poll-records=500
以及消費(fèi)消息的超時(shí)時(shí)間(默認(rèn)5分鐘):
spring.kafka.properties.max.poll.interval.ms=300000
如果有消費(fèi)者在規(guī)定時(shí)間內(nèi)沒有處理完消息,那么也會(huì)引起kafka的rebalancing。
但是排查后發(fā)現(xiàn),kafka里的待消費(fèi)消息數(shù)很低時(shí)(幾條-幾十條),仍然會(huì)隔幾分鐘就rebalancing一次,然后消費(fèi)者會(huì)很快把消息全部消費(fèi)完,就算是這樣kafka后續(xù)還是會(huì)rebalancing。這樣看來也不是這個(gè)問題。
三、設(shè)置kafka超時(shí)時(shí)間沒有生效的解決方法
1.問題描述
雖然感覺不像是這幾個(gè)原因?qū)е耴afka反復(fù)重新平衡的,但是還是得嘗試解決。
因此,按照網(wǎng)上的方法,在spring項(xiàng)目里的application.properties中進(jìn)行了配置,增加了超時(shí)時(shí)間:
//心跳超時(shí)時(shí)間(session超時(shí)時(shí)間)增加成25秒(之前項(xiàng)目設(shè)置了15秒)
spring.kafka.properties.session.timeout.ms = 25000
//每次拉取的消息減少為20(之前是默認(rèn)值500)
spring.kafka.consumer.max-poll-records=20
//消息消費(fèi)超時(shí)時(shí)間增加為10分鐘
spring.kafka.properties.max.poll.interval.ms=600000
但是配置了之后,啟動(dòng)項(xiàng)目,發(fā)現(xiàn)這些配置都沒有生效,kafka打印的參數(shù)還是之前的:
max.poll.interval.ms = 300000
max.poll.records = 500
session.timeout.ms = 15000
尤其是max.poll.records參數(shù),這個(gè)都可以點(diǎn)進(jìn)jar包里了,不應(yīng)該不生效的:
2.解決方法
(1)百度發(fā)現(xiàn),低版本kafka好像不支持修改max.poll.records;不過目前項(xiàng)目中不是低版本kafka,應(yīng)該是可以設(shè)置的;而且其它參數(shù)總是可以設(shè)置的,問題是不知道為什么沒有生效。
(2)找了半天,發(fā)現(xiàn)項(xiàng)目中有一個(gè)KafkaConfig.java,其中部分配置為:
@Value("${kafka.session.timeout.ms:15000}")
private String sessionTimeout;
@Value("${kafka.consumer.max.poll.records:500}")
private String maxPollRecords;
@Value("${kafka.max.poll.interval.ms:300000}")
private String maxPollIntervalMs;
@Value("${kafka.group.id:commonGroup}")
private String groupId;
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//這個(gè)是組id
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
//這個(gè)是心跳(session)超時(shí)時(shí)間
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
//這個(gè)是每次拉取的消息數(shù)量
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
//這個(gè)是消費(fèi)消息的超時(shí)時(shí)間
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaIntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
(3)這下application.properties中配置了kafka參數(shù)沒有生效的原因找到了,看來是java與application.properties中同時(shí)配置了kafka參數(shù)的話,會(huì)以java中配置的為準(zhǔn)。
3.結(jié)果
修改java中的kafka配置后,啟動(dòng)日志顯示配置生效了:
max.poll.interval.ms = 600000
max.poll.records = 20
session.timeout.ms = 25000
然而,項(xiàng)目用這個(gè)配置啟動(dòng)后,kafka反復(fù)rebalancing的狀況還是沒有好,并且rebalancing需要的時(shí)間更長(zhǎng)了,從2-3分鐘延長(zhǎng)到了5-10分鐘,消息積壓時(shí)間延長(zhǎng)、用戶體驗(yàn)更差了。
四、kafka反復(fù)rebalancing最終解決方法
1.排查過程
反復(fù)排查了整個(gè)項(xiàng)目,情況如下:
(1)生產(chǎn)環(huán)境最近只發(fā)版了一個(gè)很小的功能,這個(gè)功能不會(huì)造成kafka反復(fù)rebalancing。
(2)生產(chǎn)環(huán)境發(fā)版后,有2天時(shí)間日志是正常的,kafka沒有反復(fù)rebalancing,說明之前的kafka配置基本沒有問題。
(3)第3天下午開始kafka出現(xiàn)了反復(fù)rebalancing問題,但是期間并沒有發(fā)版,也不是用戶訪問量突然增多導(dǎo)致的。
(4)嘗試調(diào)大kafka超時(shí)時(shí)間,但是沒有作用。
(5)重啟了kafka,也重啟了所有消費(fèi)者,但是反復(fù)rebalancing問題并沒有好轉(zhuǎn)。
2.最終解決方法
1.kafka重新平衡是按group的,具體來說就是commonGroup不知道哪里除了問題:
warning: Consumer group 'commonGroup' is rebalancing.
2.因此,決定把這個(gè)組里比較重要的幾個(gè)topic移動(dòng)出去,換到其它組(java里只需要改一行):
//這里沒有顯式配置組,用的是上方KafkaConfig.java里的commonGroup組
//@KafkaListener(topics = "${kafka.topic.commit}")
//改為了顯式配置組,把這個(gè)topic移動(dòng)到新組 commitGroup
@KafkaListener(topics = "${kafka.topic.commit}", groupId = "commitGroup")
3.把重要的topic移動(dòng)出去、分到新組后,發(fā)現(xiàn),新組里的topic工作正常,沒有反復(fù)重新平衡;
舊組commonGroup依然有問題,隔一段時(shí)間就會(huì)rebalancing。
4.由于舊組里的topic不太重要,因此消費(fèi)堆積2-3分鐘的問題勉強(qiáng)可以接受;
由于舊組里的topic還有很多,因此暫時(shí)還沒有排查出是哪個(gè)topic及其消費(fèi)者有問題。
5.最后,這個(gè)問題就勉強(qiáng)算解決了,后續(xù)有時(shí)間后再繼續(xù)研究為什么kafka會(huì)反復(fù)rebalancing。
五、備注
1.spring設(shè)置kafka參數(shù)session超時(shí)時(shí)間時(shí),要小于請(qǐng)求超時(shí)時(shí)間與處理超時(shí)時(shí)間,例如:
request.timeout.ms = 30000 session.timeout.ms = 15000 max.poll.interval.ms = 300000
session.timeout.ms < request.timeout.ms
session.timeout.ms < max.poll.interval.ms
2.kafka的topic的分區(qū),最好是有幾個(gè)消費(fèi)者、就創(chuàng)建幾個(gè)分區(qū),這樣可以一一對(duì)應(yīng),一個(gè)消費(fèi)者對(duì)應(yīng)一個(gè)分區(qū)。
3.kafka的rebalancing是按group的,遇到rebalancing問題,可以把重要的topic移動(dòng)到其它group里,試試能不能行;最好是一個(gè)topic一個(gè)group,這樣可以快速定位是哪個(gè)topic出了問題。
————————————————
版權(quán)聲明:本文為CSDN博主「追逐夢(mèng)想永不?!沟脑瓌?chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/BHSZZY/article/details/126757295
參考文檔
https://blog.csdn.net/sgc1310715652/article/details/117621939文章來源:http://www.zghlxwxcb.cn/news/detail-780232.html
https://blog.csdn.net/BHSZZY/article/details/126757295
到了這里,關(guān)于Kafka消費(fèi)者常用超時(shí)時(shí)間配置的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!