前言
今天分享一下kafka的消息丟失問題,kafka的消息丟失是一個(gè)很值得關(guān)注的問題,根據(jù)消息的重要性,消息丟失的嚴(yán)重性也會(huì)進(jìn)行放大,如何從最大程度上保證消息不丟失,要從生產(chǎn)者,消費(fèi)者,broker幾個(gè)端來說。
消息發(fā)送和接收流程
kafka生產(chǎn)者生產(chǎn)好消息后,會(huì)將消息發(fā)送到broker節(jié)點(diǎn),broker對數(shù)據(jù)進(jìn)行存儲(chǔ),kafka的消息是順序存儲(chǔ)在磁盤上,以主題(topic),分區(qū)(partition)的邏輯進(jìn)行劃分,消息最終存儲(chǔ)在日志文件中,消費(fèi)者會(huì)循環(huán)從broker拉取消息。
那么從上圖的圖中可以看出kafka丟消息可能存在的三個(gè)地方分別為:
- 生產(chǎn)者到broker
- broker到磁盤
- 消費(fèi)者
生產(chǎn)者到broker消息丟失
生產(chǎn)者發(fā)送消息到broker是會(huì)存在消息丟失的,大多可能是由于網(wǎng)絡(luò)原因引起的,消息中間件中一般都是通過ack來解決這個(gè)問題的,kafka中可以通過設(shè)置ack來解決這個(gè)問題。
acks有三種類型:
- 0
- 1
- -1(all)
acks為0
acks設(shè)置為0,代表生產(chǎn)者發(fā)送消息后就不管不顧了,不用等待broker的任何響應(yīng),那么可能網(wǎng)絡(luò)異常或者其他原因?qū)е耣roker沒有處理到到這條消息,那么消息就丟失了。
acks為1
acks設(shè)置為1,代表生產(chǎn)者發(fā)送消息到broker后,只需要broker的leader副本確認(rèn)收到后就成功響應(yīng),不需要follower副本響應(yīng),就算follower副本崩潰了,也會(huì)成功響應(yīng)。
acks為-1(all)
acks設(shè)置為-1,或者為all,那么生產(chǎn)者發(fā)送消息需要leader和follower都收到并寫入消息才成功響應(yīng)生產(chǎn)者,也就是ISR
集合要全部寫入,當(dāng)ISR集合中只要有一個(gè)沒有寫入成功,那么就收到失敗響應(yīng),所以acks=-1能夠在最大程度上保證消息的不丟失,但是也是有條件的,需要ISR集合中有兩個(gè)以上副本才能保證,如果只有一個(gè)副本,那么就是就只有一個(gè)leader,沒有follower,如果leader掛掉,就不能選舉出一個(gè)eader,消息自然也就丟失,這和acks=1是一樣的
。
解決消息丟失
從上面三種類型的acks中我們可以看出,acks=-1是保證消息從生產(chǎn)者到broker不丟失的最佳設(shè)置方式,不過我們也能想到,它需要ISR每個(gè)副本都成功應(yīng)答,所以它的效率自然沒有前面兩個(gè)高,不過此篇我們討論的是保證消息不丟失問題,所以一切從不丟失層面區(qū)說。
如果消息發(fā)送失敗,那么生產(chǎn)者可以重試發(fā)送消息,可以手動(dòng)在代碼中編寫消息重發(fā)邏輯,也可以配置重試參數(shù)。
- retries
- retry.backoff.ms
retries表示重試次數(shù),retry.backoff.ms表示重試時(shí)間間隔,比如第一次重試依舊沒成功,那么隔多久再進(jìn)行重試,kafka重試的底層邏輯是將沒發(fā)送成功的消息重新入隊(duì),因?yàn)閗afka的生產(chǎn)者生產(chǎn)消息后,消息并非就直接發(fā)送到broker,而是保存在生產(chǎn)者端的收集器(RecordAccumulator)
,然后由Sender線程去獲取RecordAccumulator中的消息,然后再發(fā)送給broker,當(dāng)消息發(fā)送失敗后,會(huì)將消息重新放入RecordAccumulator中,具體邏輯可以看kafka的生產(chǎn)者端Sender的源碼。
消息重發(fā)引起的消息順序性問題
要注意,消息發(fā)送失敗進(jìn)行重發(fā)不能保證消息發(fā)送的順序性,這里的順序性是單分區(qū)順序性
,如果服務(wù)對于消息的順序性有嚴(yán)格的要求,那么我們可以通過設(shè)置屬性max.in.flight.requests.per.connection=1
來保證消息的順序性,這個(gè)配置對應(yīng)的是kafka中InFlightRequests
,max.in.flight.requests.per.connection
代表請求的個(gè)數(shù),kafka在創(chuàng)建Sender的時(shí)候會(huì)判斷,如果maxInflightRequests為1,那么guaranteeMessageOrder就為true,就能保證消息的順序性。
broker到磁盤丟消息
broker收到消息后,需要將消息寫入磁盤的log文件中,但是并不是馬上寫,因?yàn)槲覀冎?,生產(chǎn)者發(fā)送消息后,消費(fèi)者那邊需要馬上獲取,如果broker要寫入磁盤,那么消費(fèi)者拉取消息,broker還要從log文件中獲取消息,這顯然是不合理的,所以kafka引入了(page cache)頁緩存。
page cache是磁盤和broker之間的消息映射關(guān)系,它是基于內(nèi)存的,當(dāng)broker收到消息后,會(huì)將消息寫入page cache,然后由操作系統(tǒng)進(jìn)行刷盤,將page cache中的數(shù)據(jù)寫入磁盤。
如果broker發(fā)生故障,那么此時(shí)page cache的數(shù)據(jù)就會(huì)丟失,broker端可以設(shè)置刷盤的參數(shù),比如多久刷盤一次,不過這個(gè)參數(shù)不建議去修改,最好的方案還是設(shè)置多副本,一個(gè)分區(qū)設(shè)置幾個(gè)副本,當(dāng)broker故障的時(shí)候,如果還有其他副本,那么數(shù)據(jù)就不會(huì)丟失。
消費(fèi)者丟消息
kafka的消費(fèi)模式是拉模式,需要不斷地向broker拉取消息,拉取的消息消費(fèi)了以后需要提交offset,也就是提交offset這里可能會(huì)出現(xiàn)丟消息,kafka中提供了和offset相關(guān)的幾個(gè)配置項(xiàng)。
- enable.auto.commit
- auto.commit.interval.ms
- auto.offset.reset
下面我們先了解一下kafka offset的提交和參數(shù)詳解。
enable.auto.commit代表是否自動(dòng)提交offset,默認(rèn)為true,auto.commit.interval.ms代表多久提交一次offset,默認(rèn)為5秒。
如下圖,當(dāng)前消費(fèi)者消費(fèi)到了分區(qū)中為3的消息。
那么下次當(dāng)消費(fèi)者讀取消息的時(shí)候是從哪里讀取呢,當(dāng)然從4
開始讀取,因?yàn)槭菑纳洗巫x取的offset的下一位開始讀取,所以我們就說當(dāng)前消費(fèi)組的offset為4
,,因?yàn)橄麓问菑?開始消費(fèi),如果5秒之內(nèi)又消費(fèi)了兩條消息然后自動(dòng)提交了offset,那么此時(shí)的offset如下:
enable.auto.commit如果為false,就代表不會(huì)自動(dòng)提交offset。
auto.offset.reset=latest代表從分區(qū)中最新的offset
處開始讀取消息,比如某個(gè)消費(fèi)者組上次提交的偏移量為5,然后后面又生產(chǎn)了2條消息,再次讀取消息時(shí),讀取到的是6,7,8
這個(gè)三個(gè)消息,如果enable.auto.commit
設(shè)置為false,那么不管往分區(qū)中寫入多少消息,都是從6
開始讀取消息。
此時(shí)如果一個(gè)新的的消費(fèi)組
訂閱了這個(gè)分區(qū),因?yàn)檫@個(gè)消費(fèi)者組沒有在這個(gè)分區(qū)提交過offset,所以它獲取消息并不是從6
開始獲取,而是從1
開始獲取。
所以可知每個(gè)消費(fèi)者組在分區(qū)中的offset是獨(dú)立的。
auto.offset.reset還可以設(shè)置為earliest
和none
,使用earliest,如果此消費(fèi)組從來沒有提交過offset,那么就從頭開始消費(fèi),如果提交過offset,那么就從最新的offset處消費(fèi),就和latest一樣了
,使用none,如果消費(fèi)組沒有提交過offset,在分區(qū)中找不到任何offset,那么就會(huì)拋出異常。
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [stock1-0]
復(fù)制代碼
上面我們初步了解了offset的一些知識,對offset的提交和和讀取有一些了解,因?yàn)樯厦嫖覀冎惶峒皁ffset的自動(dòng)提交,而自動(dòng)提交的主動(dòng)權(quán)在kafka,而不在我們,所以可能因?yàn)橐恍┰蚨鴮?dǎo)致消息丟失。
消息處理異常
當(dāng)我們收到消息后對消息進(jìn)行處理,如果在處理的過程中發(fā)生異常,而又設(shè)置為自動(dòng)提交offset,那么消息沒有處理成功,offset已經(jīng)提交了,當(dāng)下次獲取消息的時(shí)候,由于已經(jīng)提交過ofset,所以之前的消息就獲取不到了,所以應(yīng)該改為手動(dòng)提交offset,當(dāng)消息處理成功后,再進(jìn)行手動(dòng)提交offset。文章來源:http://www.zghlxwxcb.cn/news/detail-474714.html
總結(jié)
關(guān)于kafka的消息丟失問題和解決方案就說到這里,我們分別從生產(chǎn)者到broker,broker到磁盤以及消費(fèi)者端進(jìn)行說明,也引申出一些知識點(diǎn),可能平時(shí)沒有遇到消息丟失的情況,那是因?yàn)榫W(wǎng)絡(luò)比較可靠,數(shù)據(jù)量可能不大,但是如果要真的實(shí)現(xiàn)高可用,高可靠,那么就需要對其進(jìn)行設(shè)計(jì)。文章來源地址http://www.zghlxwxcb.cn/news/detail-474714.html
到了這里,關(guān)于一文讀懂kafka消息丟失問題和解決方案的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!