国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka消費(fèi)者常用超時(shí)時(shí)間配置

這篇具有很好參考價(jià)值的文章主要介紹了Kafka消費(fèi)者常用超時(shí)時(shí)間配置。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

https://blog.csdn.net/BHSZZY/article/details/126757295文章來源地址http://www.zghlxwxcb.cn/news/detail-780232.html

//心跳超時(shí)時(shí)間(session超時(shí)時(shí)間)增加成25秒(之前項(xiàng)目設(shè)置了15秒)

spring.kafka.properties.session.timeout.ms = 25000

//每次拉取的消息減少為20(之前是默認(rèn)值500)

spring.kafka.consumer.max-poll-records=20

//消息消費(fèi)超時(shí)時(shí)間增加為10分鐘

spring.kafka.properties.max.poll.interval.ms=600000

spring設(shè)置kafka超時(shí)時(shí)間沒有生效的解決方法(解決rebalancing問題)

追逐夢(mèng)想永不停

于 2022-09-08 17:58:41 發(fā)布

2799

收藏 7

文章標(biāo)簽: kafka spring java

版權(quán)

華為云開發(fā)者聯(lián)盟

該內(nèi)容已被華為云開發(fā)者聯(lián)盟社區(qū)收錄,社區(qū)免費(fèi)抽大獎(jiǎng)??,贏華為平板、Switch等好禮!

加入社區(qū)

一、前言

最近生產(chǎn)kafka遇到一個(gè)問題,總是隔幾分鐘就rebalancing,導(dǎo)致沒有消費(fèi)者、消息堆積;

平衡好后,正常消費(fèi)消息幾分鐘后,就又開始rebalancing,消息再次堆積,一直循環(huán)。

登錄kafka服務(wù)器,用命令查看kafka組:

//組名是commonGroup,java里設(shè)置的

./kafka-consumer-groups.sh --bootstrap-server 10.123.123.123:9092 --group commonGroup --describe

就會(huì)發(fā)現(xiàn)報(bào)錯(cuò):

warning: Consumer group 'commonGroup' is rebalancing.

此時(shí)組里的所有topic都會(huì)沒有消費(fèi)者。

再查看消費(fèi)者(java后臺(tái))的日志,會(huì)發(fā)現(xiàn)大量的rebalancing語句,與重新加入分組的語句:

//這個(gè)是心跳發(fā)送失敗報(bào)錯(cuò)的日志,因?yàn)榇藭r(shí)在rebalancing

2022-08-25 17:55:41.801 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO o.a.k.c.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=commonGroup] Attempt to heartbeat failed since group is rebalancing

//這個(gè)是重新加入分組的日志,重新加入了commonGroup組里的topic為examTake的第13個(gè)分區(qū)(生產(chǎn)topic分了14個(gè)區(qū))

2022-08-30 16:29:27.434 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO o.s.kafka.listener.KafkaMessageListenerContainer - partitions assigned: [examTake-13]

這個(gè)現(xiàn)象會(huì)導(dǎo)致消息堆積2-3分鐘,然后消息會(huì)統(tǒng)一被消費(fèi)一波,然后繼續(xù)堆積2-3分鐘消息;

因?yàn)閗afka不知道為什么總是rebalancing,每次平衡需要2-3分鐘時(shí)間,此時(shí)沒有消費(fèi)者;

平衡好后,消息被消費(fèi)者消費(fèi)一波,就又開始rebalancing。

用戶明顯感覺到系統(tǒng)變慢,需要想辦法解決這個(gè)問題。

二、可能的原因

百度發(fā)現(xiàn),kafka rebalancing發(fā)生的情況,主要有這幾種:

1.有消費(fèi)者新增/減少

如果啟動(dòng)了新的java程序,增加了消費(fèi)者、或者有消費(fèi)者掛了,kafka就會(huì)重新平衡;

但是排查后發(fā)現(xiàn),所有消費(fèi)者日志打印正常,沒有掛掉的,也沒有新增消費(fèi)者,所以不是這個(gè)問題。

2.有消費(fèi)者在規(guī)定時(shí)間內(nèi)未發(fā)送心跳包

spring里可以配置kafka的session超時(shí)時(shí)間(默認(rèn)10秒):

spring.kafka.properties.session.timeout.ms = 10000

以及心跳包發(fā)送時(shí)間間隔(默認(rèn)隔3秒發(fā)送一次):

spring.kafka.properties.heartbeat.interval.ms = 3000

如果有消費(fèi)者在session規(guī)定時(shí)間內(nèi)沒有發(fā)送心跳包,kafka就會(huì)認(rèn)為該消費(fèi)者不可用,開始rebalancing。

但是排查后發(fā)現(xiàn),項(xiàng)目里配置的超時(shí)時(shí)間是15秒,心跳包間隔時(shí)間沒有配置(默認(rèn)3秒),感覺不應(yīng)該有消費(fèi)者15秒內(nèi)一次心跳包也發(fā)不出去(消費(fèi)者日志打印正常,沒有掛掉的),所以不確定是不是這個(gè)問題。

3.有消費(fèi)者在規(guī)定時(shí)間內(nèi)沒有處理完消息

spring里可以配置消費(fèi)者一次拉取的消息數(shù)(默認(rèn)500,低版本kafka好像不支持修改):

spring.kafka.consumer.max-poll-records=500

以及消費(fèi)消息的超時(shí)時(shí)間(默認(rèn)5分鐘):

spring.kafka.properties.max.poll.interval.ms=300000

如果有消費(fèi)者在規(guī)定時(shí)間內(nèi)沒有處理完消息,那么也會(huì)引起kafka的rebalancing。

但是排查后發(fā)現(xiàn),kafka里的待消費(fèi)消息數(shù)很低時(shí)(幾條-幾十條),仍然會(huì)隔幾分鐘就rebalancing一次,然后消費(fèi)者會(huì)很快把消息全部消費(fèi)完,就算是這樣kafka后續(xù)還是會(huì)rebalancing。這樣看來也不是這個(gè)問題。

三、設(shè)置kafka超時(shí)時(shí)間沒有生效的解決方法

1.問題描述

雖然感覺不像是這幾個(gè)原因?qū)е耴afka反復(fù)重新平衡的,但是還是得嘗試解決。

因此,按照網(wǎng)上的方法,在spring項(xiàng)目里的application.properties中進(jìn)行了配置,增加了超時(shí)時(shí)間:

//心跳超時(shí)時(shí)間(session超時(shí)時(shí)間)增加成25秒(之前項(xiàng)目設(shè)置了15秒)

spring.kafka.properties.session.timeout.ms = 25000

//每次拉取的消息減少為20(之前是默認(rèn)值500)

spring.kafka.consumer.max-poll-records=20

//消息消費(fèi)超時(shí)時(shí)間增加為10分鐘

spring.kafka.properties.max.poll.interval.ms=600000

但是配置了之后,啟動(dòng)項(xiàng)目,發(fā)現(xiàn)這些配置都沒有生效,kafka打印的參數(shù)還是之前的:

max.poll.interval.ms = 300000

max.poll.records = 500

session.timeout.ms = 15000

尤其是max.poll.records參數(shù),這個(gè)都可以點(diǎn)進(jìn)jar包里了,不應(yīng)該不生效的:

2.解決方法

(1)百度發(fā)現(xiàn),低版本kafka好像不支持修改max.poll.records;不過目前項(xiàng)目中不是低版本kafka,應(yīng)該是可以設(shè)置的;而且其它參數(shù)總是可以設(shè)置的,問題是不知道為什么沒有生效。

(2)找了半天,發(fā)現(xiàn)項(xiàng)目中有一個(gè)KafkaConfig.java,其中部分配置為:

@Value("${kafka.session.timeout.ms:15000}")

private String sessionTimeout;

@Value("${kafka.consumer.max.poll.records:500}")

private String maxPollRecords;

@Value("${kafka.max.poll.interval.ms:300000}")

private String maxPollIntervalMs;

@Value("${kafka.group.id:commonGroup}")

private String groupId;

private Map<String, Object> consumerConfigs() {

Map<String, Object> props = new HashMap<>();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

//這個(gè)是組id

props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);

//這個(gè)是心跳(session)超時(shí)時(shí)間

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);

//這個(gè)是每次拉取的消息數(shù)量

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);

//這個(gè)是消費(fèi)消息的超時(shí)時(shí)間

props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaIntegerDeserializer.class);

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

return props;

}

(3)這下application.properties中配置了kafka參數(shù)沒有生效的原因找到了,看來是java與application.properties中同時(shí)配置了kafka參數(shù)的話,會(huì)以java中配置的為準(zhǔn)。

3.結(jié)果

修改java中的kafka配置后,啟動(dòng)日志顯示配置生效了:

max.poll.interval.ms = 600000

max.poll.records = 20

session.timeout.ms = 25000

然而,項(xiàng)目用這個(gè)配置啟動(dòng)后,kafka反復(fù)rebalancing的狀況還是沒有好,并且rebalancing需要的時(shí)間更長(zhǎng)了,從2-3分鐘延長(zhǎng)到了5-10分鐘,消息積壓時(shí)間延長(zhǎng)、用戶體驗(yàn)更差了。

四、kafka反復(fù)rebalancing最終解決方法

1.排查過程

反復(fù)排查了整個(gè)項(xiàng)目,情況如下:

(1)生產(chǎn)環(huán)境最近只發(fā)版了一個(gè)很小的功能,這個(gè)功能不會(huì)造成kafka反復(fù)rebalancing。

(2)生產(chǎn)環(huán)境發(fā)版后,有2天時(shí)間日志是正常的,kafka沒有反復(fù)rebalancing,說明之前的kafka配置基本沒有問題。

(3)第3天下午開始kafka出現(xiàn)了反復(fù)rebalancing問題,但是期間并沒有發(fā)版,也不是用戶訪問量突然增多導(dǎo)致的。

(4)嘗試調(diào)大kafka超時(shí)時(shí)間,但是沒有作用。

(5)重啟了kafka,也重啟了所有消費(fèi)者,但是反復(fù)rebalancing問題并沒有好轉(zhuǎn)。

2.最終解決方法

1.kafka重新平衡是按group的,具體來說就是commonGroup不知道哪里除了問題:

warning: Consumer group 'commonGroup' is rebalancing.

2.因此,決定把這個(gè)組里比較重要的幾個(gè)topic移動(dòng)出去,換到其它組(java里只需要改一行):

//這里沒有顯式配置組,用的是上方KafkaConfig.java里的commonGroup組

//@KafkaListener(topics = "${kafka.topic.commit}")

//改為了顯式配置組,把這個(gè)topic移動(dòng)到新組 commitGroup

@KafkaListener(topics = "${kafka.topic.commit}", groupId = "commitGroup")

3.把重要的topic移動(dòng)出去、分到新組后,發(fā)現(xiàn),新組里的topic工作正常,沒有反復(fù)重新平衡;

舊組commonGroup依然有問題,隔一段時(shí)間就會(huì)rebalancing。

4.由于舊組里的topic不太重要,因此消費(fèi)堆積2-3分鐘的問題勉強(qiáng)可以接受;

由于舊組里的topic還有很多,因此暫時(shí)還沒有排查出是哪個(gè)topic及其消費(fèi)者有問題。

5.最后,這個(gè)問題就勉強(qiáng)算解決了,后續(xù)有時(shí)間后再繼續(xù)研究為什么kafka會(huì)反復(fù)rebalancing。

五、備注

1.spring設(shè)置kafka參數(shù)session超時(shí)時(shí)間時(shí),要小于請(qǐng)求超時(shí)時(shí)間與處理超時(shí)時(shí)間,例如:

request.timeout.ms = 30000 session.timeout.ms = 15000 max.poll.interval.ms = 300000

session.timeout.ms < request.timeout.ms

session.timeout.ms < max.poll.interval.ms

2.kafka的topic的分區(qū),最好是有幾個(gè)消費(fèi)者、就創(chuàng)建幾個(gè)分區(qū),這樣可以一一對(duì)應(yīng),一個(gè)消費(fèi)者對(duì)應(yīng)一個(gè)分區(qū)。

3.kafka的rebalancing是按group的,遇到rebalancing問題,可以把重要的topic移動(dòng)到其它group里,試試能不能行;最好是一個(gè)topic一個(gè)group,這樣可以快速定位是哪個(gè)topic出了問題。

————————————————

版權(quán)聲明:本文為CSDN博主「追逐夢(mèng)想永不?!沟脑瓌?chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明。

原文鏈接:https://blog.csdn.net/BHSZZY/article/details/126757295

參考文檔

https://blog.csdn.net/sgc1310715652/article/details/117621939

https://blog.csdn.net/BHSZZY/article/details/126757295

到了這里,關(guān)于Kafka消費(fèi)者常用超時(shí)時(shí)間配置的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Kafka:消費(fèi)者參數(shù)配置

    maven配置 springboot配置類 配置文件 參數(shù)配置列表 屬性 說明 bootstrap.servers 向Kafka集群建立初始連接用到的host/port列表。 客戶端會(huì)使用這里列出的所有服務(wù)器進(jìn)行集群其他服務(wù)器的發(fā)現(xiàn),而不管 是否指定了哪個(gè)服務(wù)器用作引導(dǎo)。 這個(gè)列表僅影響用來發(fā)現(xiàn)集群所有服務(wù)器的初始

    2024年02月09日
    瀏覽(28)
  • 分布式 - 消息隊(duì)列Kafka:Kafka 消費(fèi)者消息消費(fèi)與參數(shù)配置

    分布式 - 消息隊(duì)列Kafka:Kafka 消費(fèi)者消息消費(fèi)與參數(shù)配置

    01. 創(chuàng)建消費(fèi)者 在讀取消息之前,需要先創(chuàng)建一個(gè)KafkaConsumer對(duì)象。創(chuàng)建KafkaConsumer對(duì)象與創(chuàng)建KafkaProducer對(duì)象非常相似——把想要傳給消費(fèi)者的屬性放在Properties對(duì)象里。 為簡(jiǎn)單起見,這里只提供4個(gè)必要的屬性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    瀏覽(27)
  • Kafka系列——詳解創(chuàng)建Kafka消費(fèi)者及相關(guān)配置

    參考自kafka系列文章——消費(fèi)者創(chuàng)建與配置 在讀取消息之前,需要先創(chuàng)建一個(gè) KafkaConsumer 對(duì)象。 創(chuàng)建 KafkaConsumer 對(duì)象與創(chuàng)建 KafkaProducer 對(duì)象非常相似——把想要傳給消費(fèi)者的屬性放在 Properties 對(duì)象里,后面深入討論所有屬性。這里我們只需要使用 3 個(gè)必要的屬性: bootstrap.

    2024年02月09日
    瀏覽(21)
  • kafka生產(chǎn)者和消費(fèi)者配置介紹

    每個(gè)kafka broker中配置文件 server.properties 默認(rèn)必須配置的屬性如下: **bootstrap.servers** - 指定生產(chǎn)者客戶端連接kafka集群所需的broker地址列表,格式為host1:port1,host2:port2,可以設(shè)置一個(gè)或多個(gè)。這里并非需要所有的broker地址,因?yàn)樯a(chǎn)者會(huì)從給定的broker里尋找其它的broker。 **key

    2024年02月12日
    瀏覽(23)
  • 筆記:配置多個(gè)kafka生產(chǎn)者和消費(fèi)者

    如果只有一個(gè)kafka,那么使用自帶的KafkaAutoConfiguration配置類即可,對(duì)應(yīng)已有屬性類KafkaProperties,屬性前綴為spring.kafka.xxx; 本文記錄配置多個(gè)kafka的情況,即在KafkaAutoConfiguration的基礎(chǔ)上,自定義額外的kafka生產(chǎn)者和消費(fèi)者。 適用場(chǎng)景:需要消費(fèi)來源于不同kafka的消息、需要在不

    2024年02月15日
    瀏覽(32)
  • kafka配置大全broker、topic、生產(chǎn)者和消費(fèi)者等配置介紹

    每個(gè)kafka broker中配置文件 server.properties 默認(rèn)必須配置的屬性如下: **bootstrap.servers** - 指定生產(chǎn)者客戶端連接kafka集群所需的broker地址列表,格式為host1:port1,host2:port2,可以設(shè)置一個(gè)或多個(gè)。這里并非需要所有的broker地址,因?yàn)樯a(chǎn)者會(huì)從給定的broker里尋找其它的broker。 **key

    2024年02月05日
    瀏覽(40)
  • Kafka-Java四:Spring配置Kafka消費(fèi)者提交Offset的策略

    Kafka消費(fèi)者提交Offset的策略有 自動(dòng)提交Offset: 消費(fèi)者將消息拉取下來以后未被消費(fèi)者消費(fèi)前,直接自動(dòng)提交offset。 自動(dòng)提交可能丟失數(shù)據(jù),比如消息在被消費(fèi)者消費(fèi)前已經(jīng)提交了offset,有可能消息拉取下來以后,消費(fèi)者掛了 手動(dòng)提交Offset 消費(fèi)者在消費(fèi)消息時(shí)/后,再提交o

    2024年02月08日
    瀏覽(22)
  • java:Kafka生產(chǎn)者推送數(shù)據(jù)與消費(fèi)者接收數(shù)據(jù)(參數(shù)配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式為host1:port1,host2:port2,…。生產(chǎn)者會(huì)從這些Broker中選擇一個(gè)可用的Broker作為消息發(fā)送的目標(biāo)Broker。 acks :Broker對(duì)消息的確認(rèn)模式??蛇x值為0、1、all。0表示生產(chǎn)者不會(huì)等待Broker的任何確認(rèn)消息;1表示生產(chǎn)者會(huì)等待Broker的Leader副本確認(rèn)

    2024年02月16日
    瀏覽(36)
  • RocketMQ消費(fèi)者可以手動(dòng)消費(fèi)但無法主動(dòng)消費(fèi)問題,或生成者發(fā)送超時(shí)

    RocketMQ消費(fèi)者可以手動(dòng)消費(fèi)但無法主動(dòng)消費(fèi)問題,或生成者發(fā)送超時(shí)

    修改rocketmq文件夾broker.conf 在RocketMQ獨(dú)享實(shí)例中支持IPv4和IPv6雙棧,主要是通過在網(wǎng)絡(luò)層面上同時(shí)支持IPv4和IPv6協(xié)議棧來實(shí)現(xiàn)的。RocketMQ的Broker端、Namesrv端和客戶端都需要支持IPv4和IPv6協(xié)議,以便能夠同時(shí)監(jiān)聽I(yíng)Pv4和IPv6地址,并使用相應(yīng)的協(xié)議棧進(jìn)行通信。在Broker端,我們需要在

    2024年02月13日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包