国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

記一次Kafka重復(fù)消費(fèi)解決過程

這篇具有很好參考價(jià)值的文章主要介紹了記一次Kafka重復(fù)消費(fèi)解決過程。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

? ? ? ? 起因:車聯(lián)網(wǎng)項(xiàng)目開發(fā),車輛發(fā)生故障需要給三個(gè)系統(tǒng)推送消息,故障上報(bào)較為頻繁,所以為了不阻塞主流程,采用了使用kafka。消費(fèi)方負(fù)責(zé)推送并保存推送記錄,但在一次壓測(cè)中發(fā)現(xiàn),實(shí)際只發(fā)生了10次故障,但是推送記錄卻有30多條。

????????問題排查,發(fā)現(xiàn)是因?yàn)槠渲幸粋€(gè)系統(tǒng)宕機(jī),導(dǎo)致往這個(gè)系統(tǒng)推送消息時(shí),一直連接超時(shí),導(dǎo)致每條消息的推送時(shí)長(zhǎng)被拉長(zhǎng)。而且kafka消息拉取參數(shù)max-poll-records設(shè)置了500,意味著一次會(huì)批量拉取500條消息到本地處理,而max.poll.interval.ms參數(shù)默認(rèn)是5分鐘,當(dāng)500條消息處理時(shí)長(zhǎng)超過5分鐘后,就會(huì)認(rèn)為消費(fèi)者死掉了,觸發(fā)再均衡,導(dǎo)致同一個(gè)消息被重復(fù)消費(fèi)。

解決:

? ? ? ? 主要是提高消費(fèi)者的處理速度,避免不必要的Rebalance。主要采用2種措施:

  1. 減少每次拉去消息數(shù)max-poll-records,從500,降到20
  2. 拉取到消息之后異步處理(創(chuàng)建線程池,對(duì)推送消息的部分利用多線程處理)

常見配置

fetch.min.byte:配置Consumer一次拉取請(qǐng)求中能從Kafka中拉取的最小數(shù)據(jù)量,默認(rèn)為1B,如果小于這個(gè)參數(shù)配置的值,就需要進(jìn)行等待,直到數(shù)據(jù)量滿足這個(gè)參數(shù)的配置大小。調(diào)大可以提交吞吐量,但也會(huì)造成延遲

fetch.max.bytes,一次拉取數(shù)據(jù)的最大數(shù)據(jù)量,默認(rèn)為52428800B,也就是50M,但是如果設(shè)置的值過小,甚至小于每條消息的值,實(shí)際上也是能消費(fèi)成功的

fetch.wait.max.ms,若是不滿足fetch.min.bytes時(shí),等待消費(fèi)端請(qǐng)求的最長(zhǎng)等待時(shí)間,默認(rèn)是500ms

max.poll.records,單次poll調(diào)用返回的最大消息記錄數(shù),如果處理邏輯很輕量,可以適當(dāng)提高該值。一次從kafka中poll出來的數(shù)據(jù)條數(shù),max.poll.records條數(shù)據(jù)需要在在session.timeout.ms這個(gè)時(shí)間內(nèi)處理完,默認(rèn)值為500

consumer.poll(100)?,100 毫秒是一個(gè)超時(shí)時(shí)間,一旦拿到足夠多的數(shù)據(jù)(fetch.min.bytes 參數(shù)設(shè)置),consumer.poll(100)會(huì)立即返回 ConsumerRecords<String, String> records。如果沒有拿到足夠多的數(shù)據(jù),會(huì)阻塞100ms,但不會(huì)超過100ms就會(huì)返回

max.poll.interval.ms,兩次拉取消息的間隔,默認(rèn)5分鐘;通過消費(fèi)組管理消費(fèi)者時(shí),該配置指定拉取消息線程最長(zhǎng)空閑時(shí)間,若超過這個(gè)時(shí)間間隔沒有發(fā)起poll操作,則消費(fèi)組認(rèn)為該消費(fèi)者已離開了消費(fèi)組,將進(jìn)行再均衡操作(將分區(qū)分配給組內(nèi)其他消費(fèi)者成員)

若超過這個(gè)時(shí)間則報(bào)如下異常:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already
 rebalanced and assigned the partitions to another member. This means that the time between subsequent calls 
to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is 
spending too much time message processing. You can address this either by increasing the session timeout or by
 reducing the maximum size of batches returned in poll() with max.poll.records. 

  即:無法完成提交,因?yàn)榻M已經(jīng)重新平衡并將分區(qū)分配給另一個(gè)成員。這意味著對(duì)poll()的后續(xù)調(diào)用之間的時(shí)間比配置的max.poll.interval.ms長(zhǎng),這通常意味著poll循環(huán)花費(fèi)了太多的時(shí)間來處理消息。

可以通過增加max.poll.interval.ms來解決這個(gè)問題,也可以通過減少在poll()中使用max.poll.records返回的批的最大大小來解決這個(gè)問題。

max.partition.fetch.bytes:該屬性指定了服務(wù)器從每個(gè)分區(qū)返回給消費(fèi)者的最大字節(jié)數(shù),默認(rèn)為 1MB。

session.timeout.ms:消費(fèi)者在被認(rèn)為死亡之前可以與服務(wù)器斷開連接的時(shí)間,默認(rèn)是 3s,將觸發(fā)再均衡操作。

對(duì)于每一個(gè)Consumer Group,Kafka集群為其從Broker集群中選擇一個(gè)Broker作為其Coordinator。Coordinator主要做兩件事:

  1. 維持Group成員的組成。這包括加入新的成員,檢測(cè)成員的存活性,清除不再存活的成員。

  2. 協(xié)調(diào)Group成員的行為。

poll機(jī)制文章來源地址http://www.zghlxwxcb.cn/news/detail-648604.html

  •    每次poll的消息處理完成之后再進(jìn)行下一次poll,是同步操作
  •    每次poll之前檢查是否可以進(jìn)行位移提交,如果可以,那么就會(huì)提交上一次輪詢的位移
  •    每次poll時(shí),consumer都將嘗試使用上次消費(fèi)的offset作為起始o(jì)ffset,然后依次拉取消息
  •    poll(long timeout),timeout指等待輪詢緩沖區(qū)的數(shù)據(jù)所花費(fèi)的時(shí)間,單位是毫秒

到了這里,關(guān)于記一次Kafka重復(fù)消費(fèi)解決過程的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【JAVA】生產(chǎn)環(huán)境kafka重復(fù)消費(fèi)問題記錄

    業(yè)務(wù)系統(tǒng)每周都有定時(shí)任務(wù)在跑,由于是大任務(wù)因此采用分而治之思想將其拆分為多個(gè)分片小任務(wù)采用 kafka異步隊(duì)列消費(fèi) 的形式來減少服務(wù)器壓力,每個(gè)小任務(wù)都會(huì)調(diào)用后臺(tái)的c++算法,調(diào)用完成之后便會(huì)回寫數(shù)據(jù)庫的成功次數(shù)。今天觀測(cè)到定時(shí)任務(wù)的分片小任務(wù)存在被重復(fù)消

    2024年04月12日
    瀏覽(19)
  • JAVA面試題分享一百六十二:Kafka消息重復(fù)消費(fèi)問題?

    消息重復(fù)消費(fèi)的根本原因都在于:已經(jīng)消費(fèi)了數(shù)據(jù),但是offset沒有成功提交。 其中很大一部分原因在于發(fā)生了再均衡。 1)消費(fèi)者宕機(jī)、重啟等。導(dǎo)致消息已經(jīng)消費(fèi)但是沒有提交offset。 2)消費(fèi)者使用自動(dòng)提交offset,但當(dāng)還沒有提交的時(shí)候,有新的消費(fèi)者加入或者移除,發(fā)生

    2024年02月03日
    瀏覽(19)
  • Kafka系列之:記錄一次Kafka Topic分區(qū)擴(kuò)容,但是下游flink消費(fèi)者沒有自動(dòng)消費(fèi)新的分區(qū)的解決方法

    Kafka系列之:記錄一次Kafka Topic分區(qū)擴(kuò)容,但是下游flink消費(fèi)者沒有自動(dòng)消費(fèi)新的分區(qū)的解決方法

    生產(chǎn)環(huán)境Kafka集群壓力大,Topic讀寫壓力大,消費(fèi)的lag比較大,因此通過擴(kuò)容Topic的分區(qū),增大Topic的讀寫性能 理論上下游消費(fèi)者應(yīng)該能夠自動(dòng)消費(fèi)到新的分區(qū),例如flume消費(fèi)到了新的分區(qū),但是實(shí)際情況是存在flink消費(fèi)者沒有消費(fèi)到新的分區(qū) 出現(xiàn)無法消費(fèi)topic新的分區(qū)這種情況

    2024年02月14日
    瀏覽(63)
  • 【kafka】記一次kafka磁盤空間爆滿問題處理

    問題如下: 1、今天忽然出現(xiàn)告警,kafka某節(jié)點(diǎn)出現(xiàn)磁盤使用率超過80%告警,回顧最近操作,沒有什么大業(yè)務(wù)變動(dòng),此kafka集群已經(jīng)平穩(wěn)運(yùn)行1300多天; 2、檢查集群發(fā)現(xiàn),只有此節(jié)點(diǎn)有異常,磁盤空間消耗過度; 百度了下,然后如下方案成功解決 查看現(xiàn)有的__consumer_offsets清理

    2024年02月13日
    瀏覽(18)
  • 【kafka】記一次kafka基于linux的原生命令的使用

    【kafka】記一次kafka基于linux的原生命令的使用

    環(huán)境是linux,4臺(tái)機(jī)器,版本3.6,kafka安裝在node 1 2 3 上,zookeeper安裝在node2 3 4上。 安裝好kafka,進(jìn)入bin目錄,可以看到有很多sh文件,是我們執(zhí)行命令的基礎(chǔ)。 啟動(dòng)kafka,下面的命令的后面帶的配置文件的相對(duì)路徑 遇到不熟悉的sh文件,直接輸入名字并回車,就會(huì)提示你可用的

    2024年02月05日
    瀏覽(27)
  • kafka怎么避免重復(fù)消費(fèi)

    kafka怎么避免重復(fù)消費(fèi)

    首先,Kafka Broker上存儲(chǔ)的消息都有一個(gè)Offset的標(biāo)記,然后Kafka的消費(fèi)者是通過Offset這個(gè)標(biāo)記來維護(hù)當(dāng)前已經(jīng)消費(fèi)的一個(gè)數(shù)據(jù)的。消費(fèi)者每消費(fèi)一批數(shù)據(jù),Kafka Broker就會(huì)更新OffSet的一個(gè)值,避免重復(fù)消費(fèi)的一個(gè)問題。 默認(rèn)情況下,消息消費(fèi)完成以后,會(huì)自動(dòng)提交Offset這樣一個(gè)值

    2024年04月15日
    瀏覽(29)
  • 記一次kafka消息積壓的排查

    kafka消息積壓報(bào)警,首先進(jìn)行了自查,這個(gè)現(xiàn)象頻頻出現(xiàn),之前每次都是先重新分配分區(qū)或者回溯(消息可丟棄防止大量積壓消費(fèi)跟不上)。 根據(jù)手冊(cè)首先排查下消息拉取是否正常,看到了消息拉取線程是waiting狀態(tài),然后看到kafka這塊邏輯是消費(fèi)線程阻塞了拉取線程。 對(duì)比了

    2024年03月24日
    瀏覽(21)
  • Kafka重復(fù)消費(fèi)、Dubbo重復(fù)調(diào)用問題排查

    Kafka重復(fù)消費(fèi)、Dubbo重復(fù)調(diào)用問題排查

    ????????本業(yè)務(wù)為車機(jī)流量充值業(yè)務(wù),大致流程為:收到微信、支付寶端用戶支付成功回調(diào)后,將用戶訂單信息發(fā)送至kafka中;消費(fèi)者接收到kafka中信息后進(jìn)行解析,處理用戶訂單信息,為用戶訂購相關(guān)流量包(調(diào)用電信相關(guān)接口),訂購成功/失敗后會(huì)通過MQTT發(fā)送訂購成功

    2024年03月24日
    瀏覽(18)
  • kafka如何避免消息重復(fù)消費(fèi)

    kafka如何避免消息重復(fù)消費(fèi)

    Kafka 避免消息重復(fù)消費(fèi)通常依賴于以下策略和機(jī)制: Kafka使用Consumer Group ID來跟蹤每個(gè)消費(fèi)者所讀取的消息。確保每個(gè)消費(fèi)者都具有唯一的Group ID。如果多個(gè)消費(fèi)者屬于同一個(gè)Group ID,那么它們將共享消息,但每個(gè)分區(qū)的消息只能由一個(gè)消費(fèi)者處理。 Kafka會(huì)記錄每個(gè)消費(fèi)者組消

    2024年01月15日
    瀏覽(23)
  • 記一次線上kafka造成的事故

    背景:所有的原始數(shù)據(jù)均存儲(chǔ)在mysql,mysql會(huì)通過binlog將數(shù)據(jù)同步至kafka消息隊(duì)列,但是有人將mysql中的數(shù)據(jù)進(jìn)行刪除(大概有2、3年的數(shù)據(jù)),被刪除的數(shù)據(jù)也通過binlog被同步至消息隊(duì)列里導(dǎo)致大量消息積壓,且該消息隊(duì)列只有3個(gè)分區(qū),最多3個(gè)線程消費(fèi),消費(fèi)方即使過濾也遠(yuǎn)

    2024年02月13日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包