?一.容錯(cuò)機(jī)制
在Flink中,有一套完整的容錯(cuò)機(jī)制來(lái)保證故障后的恢復(fù),其中最重要的就是檢查點(diǎn)。
1.1?檢查點(diǎn)(Checkpoint)
在流處理中,我們可以用存檔讀檔的思路,將之前某個(gè)時(shí)間點(diǎn)的所有狀態(tài)保存下來(lái),這份存檔就被稱為“檢查點(diǎn)(CkeckPoint)”。
當(dāng)Flink程序異常重啟時(shí),我們就可以在檢查點(diǎn)中“讀檔”,恢復(fù)出異常之前的狀態(tài)。
?1.1.1?檢查點(diǎn)的保存
(1) 周期性的觸發(fā)保存
在Flink中,檢查點(diǎn)的保存是周期性觸發(fā)的,間隔時(shí)間可以進(jìn)行設(shè)置。但是不建議保存太頻繁,會(huì)消耗很多資源來(lái)做檢查點(diǎn)。
(2)?保存的時(shí)間點(diǎn)
我們應(yīng)該在所有任務(wù)(算子)都恰好處理完一個(gè)相同的輸入數(shù)據(jù)的時(shí)候,將它們的狀態(tài)保存下來(lái)。
這樣做可以實(shí)現(xiàn)一個(gè)數(shù)據(jù)被所有任務(wù)(算子)完整地處理完,狀態(tài)得到了保存。
如果出現(xiàn)故障,我們恢復(fù)到之前保存的狀態(tài),故障時(shí)正在處理的所有數(shù)據(jù)都需要重新處理;我們只需要讓源(source)任務(wù)向數(shù)據(jù)源重新提交偏移量、請(qǐng)求重放數(shù)據(jù)就可以了(即重新將故障時(shí)的數(shù)據(jù)讀入Flink)。當(dāng)然這需要源任務(wù)可以把偏移量作為算子狀態(tài)保存下來(lái),而且外部數(shù)據(jù)源能夠重置偏移量;kafka就是滿足這些要求的一個(gè)最好的例子。
(3)?保存的具體流程
檢查點(diǎn)的保存,最關(guān)鍵的就是要等所有任務(wù)將“同一個(gè)數(shù)據(jù)”處理完畢。
例如詞頻統(tǒng)計(jì),依次輸入“hello”,“world”,“hello”,“flink”,“hello”,“world”,“hello”,“flink”…
例如每個(gè)任務(wù)算子都處理完“hello”之后,可以保存自己的狀態(tài)。
1.1.2 從檢查點(diǎn)恢復(fù)狀態(tài)
(1)檢查點(diǎn)的保存具體流程
接著上面的例子,當(dāng)我們需要保存檢查點(diǎn)時(shí),就是在所有任務(wù)算子將“同一個(gè)數(shù)據(jù)”處理完畢后,對(duì)所有狀態(tài)進(jìn)行快照并保存。例如輸入“hello”,“world”,“hello”,“flink”,“hello”,“world”,“hello”,“flink”…在第三個(gè)數(shù)據(jù)“hello”被所有任務(wù)處理完時(shí),做了檢查點(diǎn),保存了當(dāng)前所有狀態(tài)。
(2) 處理數(shù)據(jù)過(guò)程發(fā)生故障
當(dāng)發(fā)生故障時(shí),就需要找到最近一次成功保存的檢查點(diǎn)來(lái)恢復(fù)狀態(tài)。
例如在第三條數(shù)據(jù)“hello”處理完后保存了一次檢查點(diǎn),然后繼續(xù)運(yùn)行,正常處理了第四條數(shù)據(jù)“flink”,隨即在處理第五條數(shù)據(jù)“hello”時(shí)發(fā)生故障。
此時(shí),source任務(wù)處理完畢,偏移量為5,map任務(wù)也處理完畢,處理到KeyBy時(shí)發(fā)生故障,此時(shí)狀態(tài)未保存。
(3) 重啟應(yīng)用 -> 讀取檢查點(diǎn),重置狀態(tài)?
1.重啟應(yīng)用
遇到故障后,需要重啟Flink程序,屆時(shí),重啟后的所有任務(wù)的狀態(tài)會(huì)被清空。
2.讀取檢查點(diǎn),重置狀態(tài)?
找到最近一次保存的檢查點(diǎn),從中讀出每個(gè)算子的快照,并分別填充到對(duì)于的算子狀態(tài)中,這樣Flink內(nèi)部算子的狀態(tài)就恢復(fù)到了保存檢查點(diǎn)的那一刻,就是處理完第三條數(shù)據(jù)時(shí)。
3.重置偏移量
?此時(shí)從檢查點(diǎn)恢復(fù)狀態(tài)后還存在一個(gè)問(wèn)題,如果接著處理故障后的數(shù)據(jù)也就是第6、7條數(shù)據(jù),那么從最后一次檢查點(diǎn)到故障前的數(shù)據(jù)(第4、5條的“flink”,“hello”)則被丟棄了,就造成了計(jì)算結(jié)果錯(cuò)誤。
為了不丟數(shù)據(jù),我們應(yīng)該從最后一次保存的檢查點(diǎn)后重新讀取數(shù)據(jù)(重放),這可以通過(guò)Source任務(wù)向外部系統(tǒng)提交偏移量(offset)來(lái)解決。
這樣,整個(gè)系統(tǒng)的狀態(tài)已經(jīng)完全回退到了檢查點(diǎn)保存完成的那一時(shí)刻。
4.繼續(xù)處理數(shù)據(jù)
接下來(lái)繼續(xù)處理重放的第4、5條數(shù)據(jù),接著處理后續(xù)的數(shù)據(jù)。
在處理完上次發(fā)生故障的數(shù)據(jù)時(shí),就已經(jīng)完全恢復(fù)了正常,似乎沒(méi)有發(fā)生過(guò)故障,也沒(méi)有造成重復(fù)計(jì)算導(dǎo)致計(jì)算錯(cuò)誤,這就保證了計(jì)算結(jié)果的準(zhǔn)確性。在分布式系統(tǒng)中,這叫做實(shí)現(xiàn)了“精準(zhǔn)一次”(exactly-once)的狀態(tài)一致性保證。?
1.1.3?檢查點(diǎn)算法
在Flink中,采用了基于Chandy-Lamport算法的分布式快照,可以在不暫停整體流處理的前提下,將狀態(tài)備份保存到檢查點(diǎn)。
1.1.3.1 檢查點(diǎn)分界線(Barrier)
借鑒水位線的設(shè)計(jì),在數(shù)據(jù)流中插入一個(gè)特殊的數(shù)據(jù)結(jié)構(gòu),專門用來(lái)表示觸發(fā)檢查點(diǎn)保存的時(shí)間點(diǎn)。收到保存檢查點(diǎn)的指令后,Source任務(wù)可以在當(dāng)前數(shù)據(jù)流中插入這個(gè)結(jié)構(gòu);之后的所有任務(wù)只要遇到它就開(kāi)始對(duì)狀態(tài)做持久化快照保存。由于數(shù)據(jù)流是保持順序依次處理的,因此遇到這個(gè)標(biāo)識(shí)就代表之前的數(shù)據(jù)都處理完了,可以保存一個(gè)檢查點(diǎn);而在它之后的數(shù)據(jù),引起的狀態(tài)改變就不會(huì)體現(xiàn)在這個(gè)檢查點(diǎn)中,而需要保存到下一個(gè)檢查點(diǎn)。
這種特殊的數(shù)據(jù)形式,把一條流上的數(shù)據(jù)按照不同的檢查點(diǎn)分隔開(kāi),所以就叫做檢查點(diǎn)的“分界線”(Checkpoint Barrier)。
具體實(shí)現(xiàn):
在JobManager中有一個(gè)“檢查點(diǎn)協(xié)調(diào)器”,專門用來(lái)協(xié)調(diào)處理檢查點(diǎn)相關(guān)的工作。檢查點(diǎn)協(xié)調(diào)器會(huì)定期向TaskManager發(fā)出指令,要求保存檢查點(diǎn)(攜帶檢查點(diǎn)ID)。TaskManager會(huì)讓所有的Source任務(wù)把自己的偏移量(Source任務(wù)狀態(tài))保存起來(lái),并將帶有檢查的ID的分界線插入當(dāng)前數(shù)據(jù)流中,然后該分界線會(huì)像正常數(shù)據(jù)一樣向下游傳遞,當(dāng)下游算子任務(wù)遇到分界線則保存自己的狀態(tài)。
簡(jiǎn)單來(lái)說(shuō),就是在該需要保存檢查點(diǎn)時(shí),JobManager中的“檢查點(diǎn)協(xié)調(diào)器”會(huì)向TaskManager發(fā)出指令要求保存檢查點(diǎn),這時(shí),TaskManager會(huì)在會(huì)讓所有的Source任務(wù)保存自己的狀態(tài),并在當(dāng)前流插入一個(gè)特殊的數(shù)據(jù)(分界線),分界線會(huì)依次向下游傳遞,當(dāng)下游的算子遇到分界線就保存自己的狀態(tài),這個(gè)分界線后面到達(dá)的數(shù)據(jù)則屬于下一個(gè)檢查點(diǎn)的數(shù)據(jù)了。這也是很符合“流”的概念。
1.1.3.2 分布式快照算法(Barrier對(duì)齊的精準(zhǔn)一次)?
barrier指示的是“之前所有數(shù)據(jù)的狀態(tài)更改保存入當(dāng)前檢查點(diǎn)”:其實(shí)是一個(gè)“截止時(shí)間”的標(biāo)志。所以在處理多個(gè)分區(qū)的傳遞時(shí),也要以是否還會(huì)有數(shù)據(jù)到來(lái)作為一個(gè)判斷標(biāo)準(zhǔn)。
具體實(shí)現(xiàn)上,F(xiàn)link使用了Chandy-Lamport算法的一種變體,被稱為“異步分界線快照”算法。算法的核心就是兩個(gè)原則:
- 當(dāng)上游任務(wù)向多個(gè)并行下游任務(wù)發(fā)送barrier時(shí),需要廣播出去;
- 而當(dāng)多個(gè)上游任務(wù)向同一個(gè)下游任務(wù)傳遞分界線時(shí),需要在下游任務(wù)執(zhí)行“分界線對(duì)齊”操作,也就是需要等到所有并行分區(qū)的barrier都到齊,才可以開(kāi)始狀態(tài)的保存。
檢查點(diǎn)算法的并行場(chǎng)景?
當(dāng)前應(yīng)用全局并行度為2,Source也有兩個(gè)并行任務(wù),分別讀取兩條數(shù)據(jù)流,流中數(shù)據(jù)都是一個(gè)一個(gè)的單詞。此時(shí)第一條流讀了三條數(shù)據(jù),Source偏移量為3.;第二條流讀了一條數(shù)據(jù),Source偏移量為1。
檢查點(diǎn)保存算法具體過(guò)程為:
(1)觸發(fā)檢查點(diǎn):JobManager向Source發(fā)送Barrier;
????????JobManager發(fā)送指令,觸發(fā)檢查點(diǎn)保存;所有的Source任務(wù)中插入一個(gè)Barrier(分界線),并保存Source的偏移量(狀態(tài))。
? ? ? ? 說(shuō)明:檢查點(diǎn)保存時(shí),只會(huì)保存分界線到來(lái)前的所有狀態(tài)。并且該操作并不會(huì)影響其上下游算子任務(wù)的正常運(yùn)行。?
(2)Barrier發(fā)送:向下游廣播發(fā)送;
? ? ? ? Source狀態(tài)保存完成后,會(huì)返回通知給Source任務(wù),隨后Source任務(wù)會(huì)像JobManager發(fā)送ACK來(lái)確認(rèn)檢查點(diǎn)完成,然后繼續(xù)將Barrier(分界線)向下游傳遞
? ? ? ? 此時(shí),由于Source算子和Map算子是一對(duì)一的關(guān)系,可以直接將Barrier傳遞給Map算子。
(3)向下游多個(gè)并行算子廣播分界線,執(zhí)行分界線對(duì)齊;
? ? ? ? Map算子沒(méi)有狀態(tài),則直接將Barrier繼續(xù)向下游傳遞。這時(shí)由于進(jìn)行到了KeyBy分區(qū)操作,會(huì)將Barrier廣播到下游并行的兩個(gè)Sum任務(wù),這時(shí),Sum算子可能會(huì)收到來(lái)自上游兩個(gè)并行Map任務(wù)的Barrier,所以需要執(zhí)行“分界線對(duì)齊”操作。
????????此時(shí),Sum2接收到了上游Map傳來(lái)的兩個(gè)Barrier,說(shuō)明第一條流的三條數(shù)據(jù)和第二條流的一條數(shù)據(jù)都已經(jīng)處理完畢,則可以進(jìn)行狀態(tài)保存。而Sum1只收到了一個(gè)Barrier,則必須等待Barrier到齊才可以保存狀態(tài),此時(shí)Sum1分界線后的數(shù)據(jù)則會(huì)被緩存起來(lái),等到當(dāng)前檢查點(diǎn)保存后再處理。分界線前的所有狀態(tài)才會(huì)被保存。
(4)狀態(tài)保存:有狀態(tài)的算子將狀態(tài)保存至持久化。
? ? ? ? 各個(gè)分區(qū)的分界線到齊后,就可以對(duì)當(dāng)前狀態(tài)做快照,保存到持久化存儲(chǔ)。存儲(chǔ)完成后,同樣繼續(xù)將Barrier向下游繼續(xù)傳遞,并通知JobManager檢查點(diǎn)保存完畢。
在這個(gè)過(guò)程中,每個(gè)任務(wù)保存自己的狀態(tài)都是相對(duì)獨(dú)立的,互不影響,并且不影響流中其他算子的正常運(yùn)行。?
說(shuō)明:
????????由于分界線對(duì)齊要求先到達(dá)的分區(qū)做緩存等待,一定程度上會(huì)影響處理的速度;當(dāng)出現(xiàn)背壓時(shí),下游任務(wù)會(huì)堆積大量的緩沖數(shù)據(jù),檢查點(diǎn)可能需要很久才可以保存完畢。
????????為了應(yīng)對(duì)這種場(chǎng)景,Barrier對(duì)齊中提供了至少一次語(yǔ)義以及Flink 1.11之后提供了不對(duì)齊的檢查點(diǎn)保存方式,可以將未處理的緩沖數(shù)據(jù)也保存進(jìn)檢查點(diǎn)。這樣,當(dāng)我們遇到一個(gè)分區(qū)barrier時(shí)就不需等待對(duì)齊,而是可以直接啟動(dòng)狀態(tài)的保存了。
1.1.3.3 分布式快照算法(Barrier對(duì)齊的至少一次)?
之前的精準(zhǔn)一次中,在進(jìn)行“分界線對(duì)齊”時(shí),下游算子必須等待上游算子發(fā)來(lái)的所有的Barrier到齊才可以進(jìn)行狀態(tài)保存,并且Barrier后到達(dá)的數(shù)據(jù)都會(huì)被緩存起來(lái),不會(huì)被當(dāng)前檢查點(diǎn)所計(jì)算和保存。
而B(niǎo)arrier對(duì)齊的至少一次指的是,但在等待所有的Barrier到齊之前,到達(dá)該分區(qū)的數(shù)據(jù)會(huì)被直接計(jì)算,并被保存至此檢查點(diǎn)。這樣的話,如果程序重啟,數(shù)據(jù)重放時(shí),介于兩個(gè)Barrier之間到達(dá)的數(shù)據(jù)會(huì)被再次計(jì)算。(至少一次)
優(yōu)點(diǎn):數(shù)據(jù)無(wú)需阻塞,也就不需要額外的空間對(duì)其存儲(chǔ)。
缺點(diǎn):程序重啟可能會(huì)造成數(shù)據(jù)重復(fù)計(jì)算。
1.1.3.4 分布式快照算法(非Barrier對(duì)齊的精準(zhǔn)一次)
知識(shí):數(shù)據(jù)會(huì)先進(jìn)入算子的輸入緩沖區(qū),處理完畢后進(jìn)入該算子的輸出緩存區(qū),再發(fā)往下游算子的輸入緩沖區(qū)。
非Barrier對(duì)齊的精準(zhǔn)一次指的就是,當(dāng)下游算子接收到一個(gè)Barrier時(shí)(到達(dá)輸入緩沖區(qū)時(shí)),會(huì)直接將第一個(gè)Barrier放到輸入緩沖區(qū)末端,繼續(xù)向下游傳遞。被第一個(gè)Barrier越過(guò)的輸入緩沖區(qū)和輸出緩沖區(qū)的數(shù)據(jù)以及在其他Barrier之前的數(shù)據(jù)會(huì)被標(biāo)記,在進(jìn)行狀態(tài)保存時(shí),這些被標(biāo)記的數(shù)據(jù)和狀態(tài)都會(huì)被保存進(jìn)檢查點(diǎn),在進(jìn)行恢復(fù)時(shí),則直接恢復(fù)這些數(shù)據(jù)和狀態(tài)。
優(yōu)點(diǎn):數(shù)據(jù)無(wú)需阻塞
缺點(diǎn):增大IO壓力
1.1.3.5?檢查點(diǎn)算法總結(jié)
1.Barrier對(duì)齊:一個(gè)Task等待所有上游發(fā)送同編號(hào)Barrier到齊后,才對(duì)自己的本地狀態(tài)做備份。
? ? ? ? 精準(zhǔn)一次:在Barrier對(duì)齊過(guò)程中,Barrier后面的數(shù)據(jù)阻塞等待(被緩存),不會(huì)越過(guò)Barrier。
? ? ? ? 至少一次:在Barrier對(duì)齊過(guò)程中,第一個(gè)Barrier后的數(shù)據(jù)不阻塞,接著計(jì)算。
2.非Barrier對(duì)齊:一個(gè)Task收到第一個(gè)Barrier時(shí),開(kāi)始執(zhí)行備份,最后一個(gè)Barrier到達(dá)時(shí)結(jié)束備份。
? ? ? ? 先到的Barrier,將本地狀態(tài)備份,后面的數(shù)據(jù)接著計(jì)算
? ? ? ? 未到的Barrier,之前的數(shù)據(jù)接著計(jì)算,同時(shí)將這些數(shù)據(jù)保存到備份中
? ? ? ? 最后一個(gè)Barrier到達(dá)時(shí),該Task備份結(jié)束
1.1.4 檢查點(diǎn)配置
檢查點(diǎn)的作用是為了故障恢復(fù),我們不能因?yàn)楸4鏅z查點(diǎn)占據(jù)了大量時(shí)間、導(dǎo)致數(shù)據(jù)處理性能明顯降低。為了兼顧容錯(cuò)性和處理性能,我們可以在代碼中對(duì)檢查點(diǎn)進(jìn)行各種配置。
1.1.4.1 檢查點(diǎn)常用配置
啟用檢查點(diǎn)
// 啟用檢查點(diǎn),周期性保存(5s),默認(rèn)Barrier對(duì)齊,精準(zhǔn)一次
env.enableCheckpointing(1000);
獲取檢查點(diǎn)配置,后續(xù)配置都需要基于checkPointConfig
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
指定檢查點(diǎn)存儲(chǔ)位置
// 指定檢查點(diǎn)存儲(chǔ)位置,可以是HDFS,也可以是本地路徑
checkpointConfig.setCheckpointStorage("hdfs://hadoop:8001/checkpoint");
檢查點(diǎn)執(zhí)行超時(shí)時(shí)間
// checkPoint執(zhí)行超時(shí)時(shí)間,超時(shí)則認(rèn)為失敗(默認(rèn)十分鐘)
checkpointConfig.setCheckpointTimeout(60000);
checkPoint最大并行數(shù)量
// 最大同時(shí)運(yùn)行的checkPoint數(shù)量,推薦為1,減少程序壓力
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkPoint最小等待間隔?
// 最小等待間隔,指的是 上一輪checkPoint結(jié)束 到 下次checkPoint開(kāi)始之間的間隔,大于0,則checkPoint最大數(shù)量為1
checkpointConfig.setMinPauseBetweenCheckpoints(1000);
取消作業(yè)時(shí)(Cancel),checkPoint的數(shù)據(jù)是否保存在外部存儲(chǔ)系統(tǒng)中
checkpointConfig.setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
);
// DELETE_ON_CANCELLATION:任務(wù)主動(dòng)取消,不保留checkPoint;程序異常退出,則不會(huì)刪除
// RETAIN_ON_CANCELLATION:任務(wù)主動(dòng)取消,保留checkPoint
允許checkPint連續(xù)失敗次數(shù)
// 允許checkPint連續(xù)失敗次數(shù),默認(rèn)為0 ,超過(guò)時(shí)任務(wù)會(huì)掛掉
checkpointConfig.setTolerableCheckpointFailureNumber(10);
開(kāi)啟非對(duì)齊檢查點(diǎn)(barrier非對(duì)齊)
// 開(kāi)啟非對(duì)齊檢查點(diǎn)(barrier非對(duì)齊)
// 開(kāi)啟要求:checkPoint最大并發(fā)為1,并且checkPoint模式為精準(zhǔn)一次
checkpointConfig.enableUnalignedCheckpoints();
// 設(shè)置對(duì)齊超時(shí)時(shí)間
// 默認(rèn)為0,默認(rèn)直接使用非Barrier對(duì)齊
// 當(dāng)對(duì)齊超時(shí)時(shí)間>1時(shí),會(huì)先使用Barrier對(duì)齊,對(duì)齊時(shí)間超過(guò)這個(gè)參時(shí),則切換為非Barrier對(duì)齊
checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1000));
1.1.4.2 最終檢查點(diǎn)
如果數(shù)據(jù)源是有界的,就可能出現(xiàn)部分Task已經(jīng)處理完所有數(shù)據(jù),變成finished狀態(tài),不繼續(xù)工作。從 Flink 1.14 開(kāi)始,這些finished狀態(tài)的Task,也可以繼續(xù)執(zhí)行檢查點(diǎn)。自 1.15 起默認(rèn)啟用此功能,并且可以通過(guò)功能標(biāo)志禁用它(不推薦禁用):
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
1.1.5 保存點(diǎn)(Savepoint)
除了檢查點(diǎn)外,F(xiàn)link還提供了另一個(gè)非常獨(dú)特的鏡像保存功能——保存點(diǎn)(savepoint)。
從名稱就可以看出,這也是一個(gè)存盤的備份,它的原理和算法與檢查點(diǎn)完全相同,只是多了一些額外的元數(shù)據(jù)。
1.1.5.1 保存點(diǎn)的用途
保存點(diǎn)與檢查點(diǎn)最大的區(qū)別,就是觸發(fā)的時(shí)機(jī)。檢查點(diǎn)是由Flink自動(dòng)管理的,定期創(chuàng)建,發(fā)生故障之后自動(dòng)讀取進(jìn)行恢復(fù),這是一個(gè)“自動(dòng)存盤”的功能;而保存點(diǎn)不會(huì)自動(dòng)創(chuàng)建,必須由用戶明確地手動(dòng)觸發(fā)保存操作,所以就是“手動(dòng)存盤”。
保存點(diǎn)可以當(dāng)作一個(gè)強(qiáng)大的運(yùn)維工具來(lái)使用。我們可以在需要的時(shí)候創(chuàng)建一個(gè)保存點(diǎn),然后停止應(yīng)用,做一些處理調(diào)整之后再?gòu)谋4纥c(diǎn)重啟。它適用的具體場(chǎng)景有:
- 版本管理和歸檔存儲(chǔ)
- 更新Flink版本
- 更新應(yīng)用程序
- 調(diào)整并行度
- 暫停應(yīng)用程序
需要注意的是,保存點(diǎn)能夠在程序更改的時(shí)候依然兼容,前提是狀態(tài)的拓?fù)浣Y(jié)構(gòu)和數(shù)據(jù)類型不變。我們知道保存點(diǎn)中狀態(tài)都是以算子ID-狀態(tài)名稱這樣的key-value組織起來(lái)的,算子ID可以在代碼中直接調(diào)用SingleOutputStreamOperator的.uid()方法來(lái)進(jìn)行指定:
DataStream<String> stream = env
.addSource(new StatefulSource()).uid("source-id") // 指定算子Uid
.map(new StatefulMapper()).uid("mapper-id")
.print();
對(duì)于沒(méi)有設(shè)置ID的算子,F(xiàn)link默認(rèn)會(huì)自動(dòng)進(jìn)行設(shè)置,所以在重新啟動(dòng)應(yīng)用后可能會(huì)導(dǎo)致ID不同而無(wú)法兼容以前的狀態(tài)。所以為了方便后續(xù)的維護(hù),強(qiáng)烈建議在程序中為每一個(gè)算子手動(dòng)指定ID。
?1.1.5.2?使用保存點(diǎn)
保存點(diǎn)的使用非常簡(jiǎn)單,我們可以使用命令行工具來(lái)創(chuàng)建保存點(diǎn),也可以從保存點(diǎn)恢復(fù)作業(yè)。
1.創(chuàng)建保存點(diǎn)
要在命令行中為運(yùn)行的作業(yè)創(chuàng)建一個(gè)保存點(diǎn)鏡像,只需要執(zhí)行:
bin/flink savepoint :jobId [:targetDirectory]
這里jobId需要填充要做鏡像保存的作業(yè)ID,目標(biāo)路徑targetDirectory可選,表示保存點(diǎn)存儲(chǔ)的路徑。
對(duì)于保存點(diǎn)的默認(rèn)路徑,可以通過(guò)配置文件flink-conf.yaml中的state.savepoints.dir項(xiàng)來(lái)設(shè)定:
state.savepoints.dir: hdfs:///flink/savepoints
?當(dāng)然對(duì)于單獨(dú)的作業(yè),我們也可以在程序代碼中通過(guò)執(zhí)行環(huán)境來(lái)設(shè)置:
env.setDefaultSavepointDir("hdfs:///flink/savepoints");
?由于創(chuàng)建保存點(diǎn)一般都是希望更改環(huán)境之后重啟,所以創(chuàng)建之后往往緊接著就是停掉作業(yè)的操作。除了對(duì)運(yùn)行的作業(yè)創(chuàng)建保存點(diǎn),我們也可以在停掉一個(gè)作業(yè)時(shí)直接創(chuàng)建保存點(diǎn):
bin/flink stop --savepointPath [:targetDirectory] :jobId
2.從保存點(diǎn)重啟應(yīng)用
我們已經(jīng)知道,提交啟動(dòng)一個(gè)Flink作業(yè),使用的命令是flink run;現(xiàn)在要從保存點(diǎn)重啟一個(gè)應(yīng)用,其實(shí)本質(zhì)是一樣的:
bin/flink run -s :savepointPath [:runArgs]
這里只要增加一個(gè)-s參數(shù),指定保存點(diǎn)的路徑就可以了,其它啟動(dòng)時(shí)的參數(shù)還是完全一樣的,如果是基于yarn的運(yùn)行模式還需要加上 -yid application-id。當(dāng)使用web UI進(jìn)行作業(yè)提交時(shí),可以填入的參數(shù)除了入口類、并行度和運(yùn)行參數(shù),還有一個(gè)“Savepoint Path”,這就是從保存點(diǎn)啟動(dòng)應(yīng)用的配置。
1.1.5.3 使用保存點(diǎn)切換狀態(tài)后端
在命令行重新恢復(fù)作業(yè)時(shí),在命令行中添加以下命令以切換狀態(tài)后端
-D state.backend=rocksdb
1.2? 狀態(tài)一致性
1.2.1 一致性的概念和級(jí)別
一致性其實(shí)就是結(jié)果的正確性,一般從數(shù)據(jù)丟失、數(shù)據(jù)重復(fù)來(lái)評(píng)估。
流式計(jì)算本身就是一個(gè)一個(gè)來(lái)的,所以正常處理的過(guò)程中結(jié)果肯定是正確的;但在發(fā)生故障、需要恢復(fù)狀態(tài)進(jìn)行回滾時(shí)就需要更多的保障機(jī)制了。我們通過(guò)檢查點(diǎn)的保存來(lái)保證狀態(tài)恢復(fù)后結(jié)果的正確,所以主要討論的就是“狀態(tài)的一致性”。
一般說(shuō)來(lái),狀態(tài)一致性有三種級(jí)別:
- 最多一次(At-Most-Once)
- 至少一次(At-Least-Once)
- 精確一次(Exactly-Once)
1.2.2 端到端的狀態(tài)一致性
在Flink中可以通過(guò)檢查點(diǎn)機(jī)制來(lái)保障內(nèi)部狀態(tài)的一致性,但往往在實(shí)際應(yīng)用中,F(xiàn)link是從外部系統(tǒng)(Source)中讀取數(shù)據(jù),最終輸出到外部系統(tǒng)(Sink)中,并不是Flink可以做到精確一次,整個(gè)程序在異常時(shí)就不會(huì)出現(xiàn)如何問(wèn)題。例如外部數(shù)據(jù)源并不支持?jǐn)?shù)據(jù)重放。這就要求我們不僅要考慮Flink內(nèi)部數(shù)據(jù)的處理轉(zhuǎn)換,還涉及到從外部數(shù)據(jù)源讀取,以及寫入外部持久化系統(tǒng),整個(gè)應(yīng)用處理流程從頭到尾都應(yīng)該是正確的。
所以完整的流處理應(yīng)用,應(yīng)該包括了數(shù)據(jù)源、流處理器和外部存儲(chǔ)系統(tǒng)三個(gè)部分。這個(gè)完整應(yīng)用的一致性,就叫做“端到端(end-to-end)的狀態(tài)一致性”,它取決于三個(gè)組件中最弱的那一環(huán)。一般來(lái)說(shuō),能否達(dá)到at-least-once一致性級(jí)別,主要看數(shù)據(jù)源能夠重放數(shù)據(jù);而能否達(dá)到exactly-once級(jí)別,流處理器內(nèi)部、數(shù)據(jù)源、外部存儲(chǔ)都要有相應(yīng)的保證機(jī)制。
1.3 端到端精確一次(End-To-End Exactly-Once)?
實(shí)際應(yīng)用中,最難做到、也最希望做到的一致性語(yǔ)義,無(wú)疑就是端到端(end-to-end)的“精確一次”。我們知道,對(duì)于Flink內(nèi)部來(lái)說(shuō),檢查點(diǎn)機(jī)制可以保證故障恢復(fù)后數(shù)據(jù)不丟(在能夠重放的前提下),并且只處理一次,所以已經(jīng)可以做到exactly-once的一致性語(yǔ)義了。
所以端到端一致性的關(guān)鍵點(diǎn),就在于輸入的數(shù)據(jù)源端和輸出的外部存儲(chǔ)端。
1.3.1 輸入端保證?
輸入端主要指的就是Flink讀取的外部數(shù)據(jù)源。對(duì)于一些數(shù)據(jù)源來(lái)說(shuō),并不提供數(shù)據(jù)的緩沖或是持久化保存,數(shù)據(jù)被消費(fèi)之后就徹底不存在了,例如socket文本流。對(duì)于這樣的數(shù)據(jù)源,故障后我們即使通過(guò)檢查點(diǎn)恢復(fù)之前的狀態(tài),可保存檢查點(diǎn)之后到發(fā)生故障期間的數(shù)據(jù)已經(jīng)不能重發(fā)了,這就會(huì)導(dǎo)致數(shù)據(jù)丟失。所以就只能保證at-most-once的一致性語(yǔ)義,相當(dāng)于沒(méi)有保證。
想要在故障恢復(fù)后不丟數(shù)據(jù),外部數(shù)據(jù)源就必須擁有重放數(shù)據(jù)的能力。例如Fafka可以重置偏移量來(lái)達(dá)到數(shù)據(jù)重放,這也是實(shí)現(xiàn)端到端exactly-once的基本要求。
1.3.2 輸出端保證
有了Flink的檢查點(diǎn)機(jī)制,以及可重放數(shù)據(jù)的外部數(shù)據(jù)源,我們已經(jīng)能做到at-least-once了。但是想要實(shí)現(xiàn)exactly-once卻有更大的困難:數(shù)據(jù)有可能重復(fù)寫入外部系統(tǒng)。
為了防止數(shù)據(jù)重復(fù)寫入外部系統(tǒng),保證exactly-once一致性的寫入方式有兩種:
- 冪等寫入
- 事務(wù)寫入
1.冪等(Idempotent)寫入
所謂“冪等”操作,就是說(shuō)一個(gè)操作可以重復(fù)執(zhí)行很多次,但只導(dǎo)致一次結(jié)果更改。也就是說(shuō),后面再重復(fù)執(zhí)行就不會(huì)對(duì)結(jié)果起作用了。
例如使用Redis中的鍵值存儲(chǔ)、MySQL中的唯一約束等。
2.事務(wù)(Transactional)寫入
事務(wù)有兩種實(shí)現(xiàn)方式:預(yù)寫日志(WAL)和兩階段提交(2PC)
(1)預(yù)寫日志(write-ahead-log,WAL)
我們發(fā)現(xiàn),事務(wù)提交是需要外部存儲(chǔ)系統(tǒng)支持事務(wù)的,否則沒(méi)有辦法真正實(shí)現(xiàn)寫入的回撤。那對(duì)于一般不支持事務(wù)的存儲(chǔ)系統(tǒng),能夠?qū)崿F(xiàn)事務(wù)寫入呢?
預(yù)寫日志(WAL)就是一種非常簡(jiǎn)單的方式。具體步驟是:
①先把結(jié)果數(shù)據(jù)作為日志(log)狀態(tài)保存起來(lái)
②進(jìn)行檢查點(diǎn)保存時(shí),也會(huì)將這些結(jié)果數(shù)據(jù)一并做持久化存儲(chǔ)
③在收到檢查點(diǎn)完成的通知時(shí),將所有結(jié)果一次性寫入外部系統(tǒng)。
④在成功寫入所有數(shù)據(jù)后,在內(nèi)部再次確認(rèn)(ack)相應(yīng)的檢查點(diǎn),將確認(rèn)信息也進(jìn)行持久化保存。這才代表著檢查點(diǎn)的真正完成。
我們會(huì)發(fā)現(xiàn),這種方式類似于檢查點(diǎn)完成時(shí)做一個(gè)批處理,一次性的寫入會(huì)帶來(lái)一些性能上的問(wèn)題;而優(yōu)點(diǎn)就是比較簡(jiǎn)單,由于數(shù)據(jù)提前在狀態(tài)后端中做了緩存,所以無(wú)論什么外部存儲(chǔ)系統(tǒng),理論上都能用這種方式一批搞定。在Flink中DataStream API提供了一個(gè)模板類GenericWriteAheadSink,用來(lái)實(shí)現(xiàn)這種事務(wù)型的寫入方式。
需要注意的是,預(yù)寫日志這種一批寫入的方式,有可能會(huì)寫入失敗;所以在執(zhí)行寫入動(dòng)作之后,必須等待發(fā)送成功的返回確認(rèn)消息。在成功寫入所有數(shù)據(jù)后,在內(nèi)部再次確認(rèn)相應(yīng)的檢查點(diǎn),這才代表著檢查點(diǎn)的真正完成。這里需要將確認(rèn)信息也進(jìn)行持久化保存,在故障恢復(fù)時(shí),只有存在對(duì)應(yīng)的確認(rèn)信息,才能保證這批數(shù)據(jù)已經(jīng)寫入,可以恢復(fù)到對(duì)應(yīng)的檢查點(diǎn)位置。
但這種“再次確認(rèn)”的方式,也會(huì)有一些缺陷。如果我們的檢查點(diǎn)已經(jīng)成功保存、數(shù)據(jù)也成功地一批寫入到了外部系統(tǒng),但是最終保存確認(rèn)信息時(shí)出現(xiàn)了故障,F(xiàn)link最終還是會(huì)認(rèn)為沒(méi)有成功寫入。于是發(fā)生故障時(shí),不會(huì)使用這個(gè)檢查點(diǎn),而是需要回退到上一個(gè);這樣就會(huì)導(dǎo)致這批數(shù)據(jù)的重復(fù)寫入。
(2)兩階段提交(two-phase-commit,2PC)
前面提到的各種實(shí)現(xiàn)exactly-once的方式,多少都有點(diǎn)缺陷;而更好的方法就是兩階段提交(2PC)。
顧名思義,它的想法是分成兩個(gè)階段:先做“預(yù)提交”,等檢查點(diǎn)完成之后再正式提交。這種提交方式是真正基于事務(wù)的,它需要外部系統(tǒng)提供事務(wù)支持。
具體的實(shí)現(xiàn)步驟為:
①當(dāng)?shù)谝粭l數(shù)據(jù)到來(lái)時(shí),或者收到檢查點(diǎn)的分界線時(shí),Sink任務(wù)都會(huì)啟動(dòng)一個(gè)事務(wù)。
②接下來(lái)接收到的所有數(shù)據(jù),都通過(guò)這個(gè)事務(wù)寫入外部系統(tǒng);這時(shí)由于事務(wù)沒(méi)有提交,所以數(shù)據(jù)盡管寫入了外部系統(tǒng),但是不可用,是“預(yù)提交”的狀態(tài)。
③當(dāng)Sink任務(wù)收到JobManager發(fā)來(lái)檢查點(diǎn)完成的通知時(shí),正式提交事務(wù),寫入的結(jié)果就真正可用了。
簡(jiǎn)單來(lái)說(shuō),就是第一條數(shù)據(jù)達(dá)到,或者分界線到達(dá)的時(shí)候開(kāi)啟事務(wù),數(shù)據(jù)被寫入外部系統(tǒng)(預(yù)提交)。檢查點(diǎn)保存成功,則提交事務(wù),此時(shí)數(shù)據(jù)真正可用;否則事務(wù)回滾,外部系統(tǒng)的數(shù)據(jù)也被回滾。
當(dāng)事務(wù)中發(fā)生故障時(shí),事務(wù)將會(huì)回滾,被寫入外部系統(tǒng)的數(shù)據(jù)也應(yīng)該被撤回。兩階段提交充分的利用了Flink的檢查點(diǎn)機(jī)制,當(dāng)分界線到來(lái)時(shí),則開(kāi)啟一個(gè)事務(wù);當(dāng)檢查點(diǎn)成功時(shí),則提交該事務(wù),并且該方法不用預(yù)寫日志的批處理,減少了很多開(kāi)銷。
在我們使用Flink官方提供的連接器時(shí),無(wú)需自己實(shí)現(xiàn)兩階段提交(P2P)。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-817228.html
不過(guò)兩階段提交雖然精巧,卻對(duì)外部系統(tǒng)有很高的要求。這里將2PC對(duì)外部系統(tǒng)的要求列舉如下:文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-817228.html
- 外部系統(tǒng)必須提供事務(wù)支持,或者Sink任務(wù)必須能夠模擬外部系統(tǒng)上的事務(wù)。
- 在檢查點(diǎn)的間隔期間里,必須能夠開(kāi)啟一個(gè)事務(wù)并接受數(shù)據(jù)寫入。
- 在收到檢查點(diǎn)完成的通知之前,事務(wù)必須是“等待提交”的狀態(tài)。在故障恢復(fù)的情況下,這可能需要一些時(shí)間。如果這個(gè)時(shí)候外部系統(tǒng)關(guān)閉事務(wù)(例如超時(shí)了),那么未提交的數(shù)據(jù)就會(huì)丟失。
- Sink任務(wù)必須能夠在進(jìn)程失敗后恢復(fù)事務(wù)(持久化事務(wù)至檢查點(diǎn))。
- 提交事務(wù)必須是冪等操作。也就是說(shuō),事務(wù)的重復(fù)提交應(yīng)該是無(wú)效的。
到了這里,關(guān)于Flink中的容錯(cuò)機(jī)制的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!