問題場景
1. 應(yīng)用程序異常關(guān)閉導(dǎo)致Offset未提交
首先,Kafka Broker上存儲的消息都有一個Offset的標(biāo)記,然后Kafka的消費者是通過Offset這個標(biāo)記來維護(hù)當(dāng)前已經(jīng)消費的一個數(shù)據(jù)的。消費者每消費一批數(shù)據(jù),Kafka Broker就會更新OffSet的一個值,避免重復(fù)消費的一個問題。
默認(rèn)情況下,消息消費完成以后,會自動提交Offset這樣一個值,避免重復(fù)消費。
Kafka自動提交的邏輯里面,有一個默認(rèn)5秒的一個間隔,也就是說在5秒之后的下一次向Broker去獲取消息的時候來實現(xiàn)Offset的一個提交。所以在Consumer的消費過程中,應(yīng)用程序強(qiáng)制被kill掉或者宕機(jī)的時候,可能會導(dǎo)致Offset沒有提交,從而會產(chǎn)生重復(fù)消費的問題。
2. 消息量大導(dǎo)致Offset自動提交失敗
除此之外,還有另外一種情況也會出現(xiàn)重復(fù)消費,在Kafka里面有一個叫Partition Balance的一個機(jī)制,就是把多個Partition均衡地分配給多個消費者。那么Comsumer會從分配的Partition里面去消費消息。如果Consumer在默認(rèn)的5分鐘以內(nèi)沒辦法處理完這一批消息的時候,就會觸發(fā)Kafka的Rebalance的一個機(jī)制,從而導(dǎo)致Offset自動提交失敗,而在重新Rebalance以后,Consumer端還是會從之前沒有提交的Offset的位置開始去消費,從而去導(dǎo)致重復(fù)消費的一個問題。
解決方法
1. 提高消費端處理性能
提高消費端處理性能,避免觸發(fā)Balance。
比如說我們可以用異步的方式來處理消息,縮短單個消息消費的時長?;蛘哌€可以調(diào)整消息處理的超時時間,我們可以將其調(diào)整得更長一些。同時還可以減少一次性從Broker上拉取數(shù)據(jù)的條數(shù)。
2. 生成md5用來判斷是否消費過
可以針對每條消息生成md5然后保存到mysql或者redis里面,在處理消息之前先去mysql或者redis里面判斷是否已經(jīng)消費過。
判斷是否已經(jīng)存在相同的消息的md5值,如果存在那么就不需要去再消費了。
其實這個方法就是利用冪等性的這樣一個思想來實現(xiàn)。文章來源:http://www.zghlxwxcb.cn/news/detail-851801.html
參考資料:kafka怎么避免重復(fù)消費文章來源地址http://www.zghlxwxcb.cn/news/detail-851801.html
到了這里,關(guān)于kafka怎么避免重復(fù)消費的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!