? ? ? ? 起因:車聯(lián)網(wǎng)項(xiàng)目開發(fā),車輛發(fā)生故障需要給三個(gè)系統(tǒng)推送消息,故障上報(bào)較為頻繁,所以為了不阻塞主流程,采用了使用kafka。消費(fèi)方負(fù)責(zé)推送并保存推送記錄,但在一次壓測(cè)中發(fā)現(xiàn),實(shí)際只發(fā)生了10次故障,但是推送記錄卻有30多條。
????????問題排查,發(fā)現(xiàn)是因?yàn)槠渲幸粋€(gè)系統(tǒng)宕機(jī),導(dǎo)致往這個(gè)系統(tǒng)推送消息時(shí),一直連接超時(shí),導(dǎo)致每條消息的推送時(shí)長(zhǎng)被拉長(zhǎng)。而且kafka消息拉取參數(shù)max-poll-records設(shè)置了500,意味著一次會(huì)批量拉取500條消息到本地處理,而max.poll.interval.ms參數(shù)默認(rèn)是5分鐘,當(dāng)500條消息處理時(shí)長(zhǎng)超過5分鐘后,就會(huì)認(rèn)為消費(fèi)者死掉了,觸發(fā)再均衡,導(dǎo)致同一個(gè)消息被重復(fù)消費(fèi)。
解決:
? ? ? ? 主要是提高消費(fèi)者的處理速度,避免不必要的Rebalance。主要采用2種措施:
- 減少每次拉去消息數(shù)max-poll-records,從500,降到20
- 拉取到消息之后異步處理(創(chuàng)建線程池,對(duì)推送消息的部分利用多線程處理)
常見配置
fetch.min.byte:配置Consumer一次拉取請(qǐng)求中能從Kafka中拉取的最小數(shù)據(jù)量,默認(rèn)為1B,如果小于這個(gè)參數(shù)配置的值,就需要進(jìn)行等待,直到數(shù)據(jù)量滿足這個(gè)參數(shù)的配置大小。調(diào)大可以提交吞吐量,但也會(huì)造成延遲
fetch.max.bytes,一次拉取數(shù)據(jù)的最大數(shù)據(jù)量,默認(rèn)為52428800B,也就是50M,但是如果設(shè)置的值過小,甚至小于每條消息的值,實(shí)際上也是能消費(fèi)成功的
fetch.wait.max.ms,若是不滿足fetch.min.bytes時(shí),等待消費(fèi)端請(qǐng)求的最長(zhǎng)等待時(shí)間,默認(rèn)是500ms
max.poll.records,單次poll調(diào)用返回的最大消息記錄數(shù),如果處理邏輯很輕量,可以適當(dāng)提高該值。一次從kafka中poll出來的數(shù)據(jù)條數(shù),max.poll.records條數(shù)據(jù)需要在在session.timeout.ms這個(gè)時(shí)間內(nèi)處理完,默認(rèn)值為500
consumer.poll(100)?,100 毫秒是一個(gè)超時(shí)時(shí)間,一旦拿到足夠多的數(shù)據(jù)(fetch.min.bytes 參數(shù)設(shè)置),consumer.poll(100)會(huì)立即返回 ConsumerRecords<String, String> records。如果沒有拿到足夠多的數(shù)據(jù),會(huì)阻塞100ms,但不會(huì)超過100ms就會(huì)返回
max.poll.interval.ms,兩次拉取消息的間隔,默認(rèn)5分鐘;通過消費(fèi)組管理消費(fèi)者時(shí),該配置指定拉取消息線程最長(zhǎng)空閑時(shí)間,若超過這個(gè)時(shí)間間隔沒有發(fā)起poll操作,則消費(fèi)組認(rèn)為該消費(fèi)者已離開了消費(fèi)組,將進(jìn)行再均衡操作(將分區(qū)分配給組內(nèi)其他消費(fèi)者成員)
若超過這個(gè)時(shí)間則報(bào)如下異常:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
即:無法完成提交,因?yàn)榻M已經(jīng)重新平衡并將分區(qū)分配給另一個(gè)成員。這意味著對(duì)poll()的后續(xù)調(diào)用之間的時(shí)間比配置的max.poll.interval.ms長(zhǎng),這通常意味著poll循環(huán)花費(fèi)了太多的時(shí)間來處理消息。
可以通過增加max.poll.interval.ms來解決這個(gè)問題,也可以通過減少在poll()中使用max.poll.records返回的批的最大大小來解決這個(gè)問題。
max.partition.fetch.bytes:該屬性指定了服務(wù)器從每個(gè)分區(qū)返回給消費(fèi)者的最大字節(jié)數(shù),默認(rèn)為 1MB。
session.timeout.ms:消費(fèi)者在被認(rèn)為死亡之前可以與服務(wù)器斷開連接的時(shí)間,默認(rèn)是 3s,將觸發(fā)再均衡操作。
對(duì)于每一個(gè)Consumer Group,Kafka集群為其從Broker集群中選擇一個(gè)Broker作為其Coordinator。Coordinator主要做兩件事:
-
維持Group成員的組成。這包括加入新的成員,檢測(cè)成員的存活性,清除不再存活的成員。
-
協(xié)調(diào)Group成員的行為。文章來源:http://www.zghlxwxcb.cn/news/detail-648604.html
poll機(jī)制文章來源地址http://www.zghlxwxcb.cn/news/detail-648604.html
- 每次poll的消息處理完成之后再進(jìn)行下一次poll,是同步操作
- 每次poll之前檢查是否可以進(jìn)行位移提交,如果可以,那么就會(huì)提交上一次輪詢的位移
- 每次poll時(shí),consumer都將嘗試使用上次消費(fèi)的offset作為起始o(jì)ffset,然后依次拉取消息
- poll(long timeout),timeout指等待輪詢緩沖區(qū)的數(shù)據(jù)所花費(fèi)的時(shí)間,單位是毫秒
到了這里,關(guān)于記一次Kafka重復(fù)消費(fèi)解決過程的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!