前言
? ? ? ? 最近已經(jīng)放假了,但是一直在忙一個很重要的自己的一個項目,用 JavaFX 和一個大數(shù)據(jù)組件聯(lián)合開發(fā)一個功能,也算不枉我學了一次 JavaFX,收獲很大,JavaFX 它作為一個 GUI 開發(fā)語言,本質(zhì)還是 Java,所以很好的鍛煉了我的 Java 水平、抽象能力 ... 平??此坪唵蔚囊恍└拍钣玫綄嶋H應用當中才發(fā)現(xiàn)了其中的坑點,比如怎么封裝、什么時候用 static 關鍵字、靜態(tài)資源怎么放、哪些要反復利用的東西需要抽象成一個 pojo、什么情況下需要定義接口 ... 總之收獲很大。
? ? ? ? 今天趕緊繼續(xù)開始大數(shù)據(jù)組件的學習,F(xiàn)link 已經(jīng)停了好長一段時間了,開干開干。
容錯機制
????????流式數(shù)據(jù)連續(xù)不斷地到來,無休無止;所以流處理程序也是持續(xù)運行的,并沒有一個明確的結束退出時間。機器運行程序,996 起來當然比人要容易得多,不過希望“永遠運行”也是不切實際的。因為各種硬件軟件的原因,運行一段時間后程序可能異常退出、機器可能宕機,如果我們只依賴一臺機器來運行,就會使得任務的處理被迫中斷。
????????一個解決方案就是多臺機器組成集群,以“分布式架構”來運行程序。這樣不僅擴展了系統(tǒng)的并行處理能力,而且可以解決單點故障的問題,從而大大提高系統(tǒng)的穩(wěn)定性和可用性。在分布式架構中,當某個節(jié)點出現(xiàn)故障,其他節(jié)點基本不受影響。這時只需要重啟應用,恢復之前某個時間點的狀態(tài)繼續(xù)處理就可以了。這一切看似簡單,可是在實時流處理中,我們不僅需要保證故障后能夠重啟繼續(xù)運行,還要保證結果的正確性、故障恢復的速度、對處理性能的影響,這就需要在架構上做出更加精巧的設計。
????????在 Flink 中,有一套完整的容錯機制(fault tolerance)來保證故障后的恢復,其中最重要的就是檢查點(checkpoint),類似與我們之前學習的 Spark ,它也有檢查點來提供容錯,學完我們對比一下它們究竟有啥不同。
1、檢查點(checkpoint)
????????在流處理中,我們可以用存檔的思路,將之前某個時間點的狀態(tài)保存下來,這份存檔就是我們所謂的“檢查點”。就像我們學大數(shù)據(jù)專業(yè)時候安裝虛擬機的過程,虛擬機的快照功能可以幫助我們恢復到我們機器之前的狀態(tài)。
????????我們知道在有狀態(tài)的流處理中,任務繼續(xù)處理新數(shù)據(jù),并不需要“之前的計算結果”,而是需要任務“之前的狀態(tài)”。比如假設我們有一個長度為 10 的滑動窗口,它的滑動步長是 5 ,任務是求sum。當機器故障時,我們重啟應用,這時候我們的任務會創(chuàng)建新的窗口處理新的數(shù)據(jù),我們知道滑動窗口會在每個步長處觸發(fā)一次計算,所以當我們的窗口到達一個步長時,它的窗口范圍是 [-5,5),而[-5,0) 的數(shù)據(jù)在歷史狀態(tài)(檢查點)中保存著,而且上一個窗口的計算結果我們無法利用,因為它計算的是 [-10,0) 內(nèi)的數(shù)據(jù),所以說我們并不需要“之前的計算結果”,而是需要任務“之前的狀態(tài)”。
????????遇到故障重啟的時候,我們可以從檢查點中“讀檔”,恢復出之前的狀態(tài),這樣就可以回到當時保存的一刻接著處理數(shù)據(jù)了。
????????檢查點是 Flink 容錯機制的核心。這里所謂的“檢查”,其實是針對故障恢復的結果而言的:故障恢復之后繼續(xù)處理的結果,應該與發(fā)生故障前完全一致,我們需要“檢查”結果的正確性。所以,有時又會把 checkpoint 叫作“一致性檢查點”。
1.1、檢查點的保存
????????什么時候進行檢查點的保存呢?最理想的情況下,我們應該“隨時”保存,也就是每處理完一個數(shù)據(jù)就保存一下當前的狀態(tài);這樣如果在處理某條數(shù)據(jù)時出現(xiàn)故障,我們只要回到上一個數(shù)據(jù)處理完之后的狀態(tài),然后重新處理一遍這條數(shù)據(jù)就可以。這樣重復處理的數(shù)據(jù)最少,完全沒有多余操作,可以做到最低的延遲。然而實際情況不會這么完美。
(1)?周期性的觸發(fā)保存
????????“隨時存檔”確實恢復起來方便,但是需要我們不停地做存檔操作,那不是閑得蛋疼嘛。如果每處理一條數(shù)據(jù)就進行檢查點的保存,當大量數(shù)據(jù)同時到來時,就會耗費很多資源來頻繁做檢查點,影響應用處理數(shù)據(jù)的性能,數(shù)據(jù)處理的速度就會受到影響。所以更好的方式是,每隔一段時間去做一次存檔,這樣既不會影響數(shù)據(jù)的正常處理,也不會有太大的延遲——畢竟故障恢復的情況不是隨時發(fā)生的。在 Flink 中,檢查點的保存是周期性觸發(fā)的,間隔時間可以進行設置。
????????所以檢查點作為應用狀態(tài)的一份“存檔”,其實就是所有任務狀態(tài)在同一時間點的一個“快照”(snapshot),它的觸發(fā)是周期性的。具體來說,當每隔一段時間檢查點保存操作被觸發(fā)時,就把每個任務當前的狀態(tài)復制一份,按照一定的邏輯結構放在一起持久化保存起來,就構成了檢查點。
(2)?保存的時間點
????????這里有一個關鍵問題:當檢查點的保存被觸發(fā)時,任務有可能正在處理某個數(shù)據(jù),這時該怎么辦呢?最簡單的想法是,可以在某個時刻“按下暫停鍵”,讓所有任務停止處理數(shù)據(jù)。這樣狀態(tài)就不再更改,大家可以一起復制保存;保存完畢之后,再同時恢復數(shù)據(jù)處理就可以了。然而仔細思考就會發(fā)現(xiàn)這有很多問題。這種想法其實是粗暴地“停止一切來進行快照”,在保存檢查點的過程中,任務完全中斷了,這會造成很大的延遲;我們之前為了實時性做出的所有設計就毀在了做快照上。
????????另一方面,我們做快照的目的是為了故障恢復;現(xiàn)在的快照中,有些任務正在處理數(shù)據(jù),那它保存的到底是處理到什么程度的狀態(tài)呢?舉個例子,我們在程序中某一步操作中自定義了一個 ValueState,處理的邏輯是:當遇到一個數(shù)據(jù)時,狀態(tài)先加 1;而后經(jīng)過一些其他步驟后再加 1。現(xiàn)在停止處理數(shù)據(jù),狀態(tài)到底是被加了 1 還是加了 2 呢?這很重要,因為狀態(tài)恢復之后,我們需要知道當前數(shù)據(jù)從哪里開始繼續(xù)處理。要滿足這個要求,就必須將暫停時的所有環(huán)境信息都保存下來——而這顯然是很麻煩的。為了解決這個問題,我們不應該“一刀切”把所有任務同時停掉,而是至少得先把手頭正在處理的數(shù)據(jù)弄完。這樣的話,我們在檢查點中就不需要保存所有上下文信息,只要知道當前處理到哪個數(shù)據(jù)就可以了。
????????但這樣依然會有問題:分布式系統(tǒng)的節(jié)點之間需要通過網(wǎng)絡通信來傳遞數(shù)據(jù),如果我們保存檢查點的時候剛好有數(shù)據(jù)在網(wǎng)絡傳輸?shù)穆飞?,那么下游任務是沒法將數(shù)據(jù)保存起來的;故障重啟之后,我們只能期待上游任務重新發(fā)送這個數(shù)據(jù)。然而上游任務是無法知道下游任務是否收到數(shù)據(jù)的,只能盲目地重發(fā),這可能導致下游將數(shù)據(jù)處理兩次,結果就會出現(xiàn)錯誤。
????????所以我們最終的選擇是:當所有任務都恰好處理完一個相同的輸入數(shù)據(jù)的時候(這里指的是上游算子和下游算子處理完一個相同的數(shù)據(jù)),將它們的狀態(tài)保存下來。首先,這樣避免了除狀態(tài)之外其他額外信息的存儲,提高了檢查點保存的效率。其次,一個數(shù)據(jù)要么就是被所有任務完整地處理完,狀態(tài)得到了保存;要么就是沒處理完,狀態(tài)全部沒保存:這就相當于構建了一個“事務”(transaction)。
????????如果出現(xiàn)故障,我們恢復到之前保存的狀態(tài),故障時正在處理的所有數(shù)據(jù)都需要重新處理;所以我們只需要讓源(source)任務向數(shù)據(jù)源重新提交偏移量、請求重放數(shù)據(jù)就可以了。這需要源任務可以把偏移量作為算子狀態(tài)保存下來,而且外部數(shù)據(jù)源能夠重置偏移量,Kafka 就能滿足這個要求,我們只需要重置Kafka輸出的偏移量就行,畢竟Kafka是持久保存我們的數(shù)據(jù)的,并不是發(fā)送完數(shù)據(jù)就立馬刪除。
(3)保存的具體流程
????????檢查點的保存,最關鍵的就是要等所有任務將“同一個數(shù)據(jù)”處理完畢。這里舉一個統(tǒng)計詞頻的例子—WordCount。這里為了方便,我們直接從數(shù)據(jù)源讀入已經(jīng)分開的一個個單詞,例如這里輸入的就是:
“hello”,“world”,“hello”,“flink”,“hello”,“world”,“hello”,“flink”……
????????這里的比如我們的所有的有狀態(tài)算子(source、sum、sink)都處理完第三個單詞 “hello” 后就更新自己的狀態(tài)。
????????當我們的所有任務處理完同一條數(shù)據(jù)后,對狀態(tài)做個快照保存下來。例如上圖中,已經(jīng)處理了 3 條數(shù)據(jù):“hello”“world”“hello”,所以我們會看到 Source 算子的偏移量為 3;后面的 Sum 算子處理完第三條數(shù)據(jù)“hello”之后,此時已經(jīng)有 2 個“hello”和 1 個“world”,所以對應的狀態(tài)為“hello”-> 2,“world”-> 1(這里 KeyedState底層會以 key-value 形式存儲)。此時所有任務都已經(jīng)處理完了前三個數(shù)據(jù),所以我們可以把當前的狀態(tài)保存成一個檢查點,寫入外部存儲中。至于具體保存到哪里,這是由狀態(tài)后端的配置項 “ 檢 查 點 存 儲 ”( CheckpointStorage )來決定的,可以有作業(yè)管理器的堆內(nèi)存(JobManagerCheckpointStorage)和文件系統(tǒng)(FileSystemCheckpointStorage)兩種選擇。一般情況下,我們會將檢查點寫入持久化的分布式文件系統(tǒng)。
1.2、從檢查點恢復狀態(tài)
????????在運行流處理程序時,F(xiàn)link 會周期性地保存檢查點。當發(fā)生故障時,就需要找到最近一次成功保存的檢查點來恢復狀態(tài)。
????????比如我們上面處理完第三個數(shù)據(jù)(“hello”)后保存了一個檢查點。之后繼續(xù)運行,又正常處理了一個數(shù)據(jù)“flink”,在處理第五個數(shù)據(jù)“hello”時發(fā)生了故障:
????????這時?Source 任務已經(jīng)處理完畢,所以偏移量為 5;Map 任務也處理完成了。而其中一個 Sum 任務在處理中發(fā)生了故障,此時狀態(tài)并未保存(“flink”?和 “hello” 的狀態(tài)都未保存)。接下來就需要從檢查點來恢復狀態(tài)了。具體的步驟為:
(1)重啟應用
遇到故障之后,第一步當然就是重啟。我們將應用重新啟動后,所有任務的狀態(tài)會清空:
(2)讀取檢查點,重置狀態(tài)
????????找到最近一次保存的檢查點,從中讀出每個算子任務狀態(tài)的快照,分別填充到對應的狀態(tài)中。這樣,F(xiàn)link 內(nèi)部所有任務的狀態(tài),就恢復到了保存檢查點的那一時刻,也就是剛好處理完第三個數(shù)據(jù)的時候,這里第四條數(shù)據(jù) “flink” 并沒有數(shù)據(jù)到來,所以初始為 0。
(3)重放數(shù)據(jù)
????????從檢查點恢復狀態(tài)后還有一個問題:如果直接繼續(xù)處理數(shù)據(jù),那么保存檢查點之后、到發(fā)生故障這段時間內(nèi)的數(shù)據(jù),也就是第 4、5 個數(shù)據(jù)(“flink”“hello”)就相當于丟掉了;這會造成計算結果的錯誤。為了不丟數(shù)據(jù),我們應該從保存檢查點后開始重新讀取數(shù)據(jù),這可以通過 Source 任務向外部數(shù)據(jù)源重新提交偏移量(offset)來實現(xiàn):
這樣,整個系統(tǒng)的狀態(tài)已經(jīng)完全回退到了檢查點保存完成的那一時刻。
(4)繼續(xù)處理數(shù)據(jù)
接下來,我們就可以正常處理數(shù)據(jù)了。首先是重放第 4、5 個數(shù)據(jù),然后繼續(xù)讀取后面的數(shù)據(jù):
????????當處理到第 5 個數(shù)據(jù)時,就已經(jīng)追上了發(fā)生故障時的系統(tǒng)狀態(tài)。之后繼續(xù)處理,就好像沒有發(fā)生過故障一樣;我們既沒有丟掉數(shù)據(jù)也沒有重復計算數(shù)據(jù),這就保證了計算結果的正確性。在分布式系統(tǒng)中,這叫作實現(xiàn)了“精確一次”(exactly-once)的狀態(tài)一致性保證。
????????這里我們也可以發(fā)現(xiàn),想要正確地從檢查點中讀取并恢復狀態(tài),必須知道每個算子任務狀態(tài)的類型和它們的先后順序(拓撲結構);因此為了可以從之前的檢查點中恢復狀態(tài),我們在改動程序、修復 bug 時要保證狀態(tài)的拓撲順序和類型不變。狀態(tài)的拓撲結構在 JobManager 上可以由 JobGraph 分析得到,而檢查點保存的定期觸發(fā)也是由 JobManager 控制的;所以故障恢復的過程需要 JobManager 的參與。
1.3、檢查點算法
????????我們已經(jīng)知道,F(xiàn)link 保存檢查點的時間點,是所有任務都處理完同一個輸入數(shù)據(jù)的時候。但是不同的任務處理數(shù)據(jù)的速度不同,當?shù)谝粋€ Source 任務處理到某個數(shù)據(jù)時,后面的 Sum任務可能還在處理之前的數(shù)據(jù);而且數(shù)據(jù)經(jīng)過任務處理之后類型和值都會發(fā)生變化,面對著“面目全非”的數(shù)據(jù),不同的任務怎么知道處理的是“同一個”呢?
????????一個簡單的想法是,當接到 JobManager 發(fā)出的保存檢查點的指令后,Source 算子任務處理完當前數(shù)據(jù)就暫停等待,不再讀取新的數(shù)據(jù)了。也就是留一個空檔期,這樣我們就可以保證在流中只有需要保存到檢查點的數(shù)據(jù),只要把它們?nèi)刻幚硗?,就可以保證所有任務剛好處理完最后一個數(shù)據(jù);這時把所有狀態(tài)保存起來,合并之后就是一個檢查點了。就相當于當要進行檢查點保存時,Source任務先停下來,這樣就只需要等待最后一個數(shù)據(jù)被所有任務處理之后再進行保存 ,而且這樣可以保證所有任務保存的都是統(tǒng)一個數(shù)據(jù)。
????????但這樣做最大的問題,就是每個任務的進度可能不同;為了保證狀態(tài)一致前面的任務不能進行其他工作,只能等待后面的任務處理到相同的數(shù)據(jù)再進行檢查點的保存。當先保存完狀態(tài)的任務需要等待其他任務時,就導致了資源的閑置和性能的降低。所以更好的做法是,在不暫停整體流處理的前提下,將狀態(tài)備份保存到檢查點。在 Flink中,采用了基于 Chandy-Lamport 算法的分布式快照。
(1)檢查點分界線(Barrier)
????????我們現(xiàn)在的目標是,在不暫停流處理的前提下,讓每個任務“認出”觸發(fā)檢查點保存的那個數(shù)據(jù)。
????????自然想到,如果給數(shù)據(jù)添加一個特殊標識,任務就可以準確識別并開始保存狀態(tài)了。這需要在 Source 任務收到觸發(fā)檢查點保存的指令后,立即在當前處理的數(shù)據(jù)中插入一個標識字段,然后再向下游任務發(fā)出。但是假如 Source 任務此時并沒有正在處理的數(shù)據(jù),這個操作就無法實現(xiàn)了。所以我們可以借鑒水位線(watermark)的設計,在數(shù)據(jù)流中插入一個特殊的數(shù)據(jù)結構,專門用來表示觸發(fā)檢查點保存的時間點。收到保存檢查點的指令后,Source 任務可以在當前數(shù)據(jù)流中插入這個結構;之后的所有任務只要遇到它就開始對狀態(tài)做持久化快照保存。由于數(shù)據(jù)流是保持順序依次處理的,因此遇到這個標識就代表之前的數(shù)據(jù)都處理完了,可以保存一個檢查點;而在它之后的數(shù)據(jù),引起的狀態(tài)改變就不會體現(xiàn)在這個檢查點中,而需要保存到下一個檢查點。
????????這種特殊的數(shù)據(jù)形式,把一條流上的數(shù)據(jù)按照不同的檢查點分隔開,所以就叫作檢查點的“分界線”(Checkpoint Barrier)。與水位線很類似,檢查點分界線也是一條特殊的數(shù)據(jù),由 Source 算子注入到常規(guī)的數(shù)據(jù)流中,它的位置是限定好的,不能超過其他數(shù)據(jù),也不能被后面的數(shù)據(jù)超過。檢查點分界線中帶有一個檢查點 ID,這是當前要保存的檢查點的唯一標識。
????????這樣,分界線就將一條流邏輯上分成了兩部分:分界線之前到來的數(shù)據(jù)導致的狀態(tài)更改,都會被包含在當前分界線所表示的檢查點中;而基于分界線之后的數(shù)據(jù)導致的狀態(tài)更改,則會被包含在之后的檢查點中。
????????在 JobManager 中有一個“檢查點協(xié)調(diào)器”(checkpoint coordinator),專門用來協(xié)調(diào)處理檢查點的相關工作。檢查點協(xié)調(diào)器會定期向 TaskManager 發(fā)出指令,要求保存檢查點(帶著檢查點 ID);TaskManager 會讓所有的 Source 任務把自己的偏移量(算子狀態(tài))保存起來,并將帶有檢查點 ID 的分界線(barrier)插入到當前的數(shù)據(jù)流中,然后像正常的數(shù)據(jù)一樣像下游傳遞;之后 Source 任務就可以繼續(xù)讀入新的數(shù)據(jù)了。
????????每個算子任務只要處理到這個 barrier,就把當前的狀態(tài)進行快照;在收到 barrier 之前,還是正常地處理之前的數(shù)據(jù),完全不受影響。比如上圖中,Source 任務收到 1 號檢查點保存指令時,讀取完了三個數(shù)據(jù),所以將偏移量 3 保存到外部存儲中;而后將 ID 為 1 的 barrier 注入數(shù)據(jù)流;與此同時,Map 任務剛剛收到上一條數(shù)據(jù)“hello”,而 Sum 任務則還在處理之前的第二條數(shù)據(jù)(world, 1)。下游任務不會在這時就立刻保存狀態(tài),而是等收到 barrier 時才去做快照,這時可以保證前三個數(shù)據(jù)都已經(jīng)處理完了。同樣地,下游任務做狀態(tài)快照時,也不會影響上游任務的處理,每個任務的快照保存并行不悖,不會有暫停等待的時間。
(2)?分布式快照算法(Barrier 對齊的精準一次)
????????通過在流中插入分界線(barrier),我們可以明確地指示觸發(fā)檢查點保存的時間。在一條單一的流上,數(shù)據(jù)依次進行處理,順序保持不變;不過對于分布式流處理來說,想要一直保持數(shù)據(jù)的順序就不是那么容易了。我們先回憶一下水位線(watermark)的處理:上游任務向多個并行下游任務傳遞時,需要廣播出去;而多個上游任務向同一個下游任務傳遞時,則需要下游任務為每個上游并行任務維護一個“分區(qū)水位線”,取其中最小的那個作為當前任務的事件時鐘。那 barier 在并行數(shù)據(jù)流中的傳遞,是不是也有類似的規(guī)則呢?watermark 指示的是“之前的數(shù)據(jù)全部到齊了”,而 barrier 指示的是“之前所有數(shù)據(jù)的狀態(tài)更改保存入當前檢查點”:它們都是一個“截止時間”的標志。所以在處理多個分區(qū)的傳遞時,也要以是否還會有數(shù)據(jù)到來作為一個判斷標準。
????????具體實現(xiàn)上,F(xiàn)link 使用了 Chandy-Lamport 算法的一種變體,被稱為“異步分界線快照”(asynchronous barrier snapshotting)算法。算法的核心就是兩個原則:
- 當上游任務向多個并行下游任務發(fā)送 barrier 時,需要廣播出去;
- 而當多個上游任務向同一個下游任務傳遞 barrier 時,需要在下游任務執(zhí)行“分界線對齊”(barrier alignment)操作,也就是需要等到所有并行分區(qū)的 barrier 都到齊,才可以開始狀態(tài)的保存。
為了詳細解釋檢查點算法的原理,我們對之前的 word count 程序進行擴展,考慮所有算子并行度為 2 的場景:
我們有兩個并行的 Source 任務,會分別讀取兩個數(shù)據(jù)流(或者是一個源的不同分區(qū))。這里每條流中的數(shù)據(jù)都是一個個的單詞:“hello”“world”“hello”“flink”交替出現(xiàn)。此時第一條流的 Source 任務(我們叫它“Source 1”)讀取了 3個數(shù)據(jù),偏移量為 3;而第二條流的 Source 任務(Source 2)只讀取了一個“hello”數(shù)據(jù),偏移量為 1。第一條流中的第一個數(shù)據(jù)“hello”已經(jīng)完全處理完畢,所以 Sum 任務的狀態(tài)中 key為 hello 對應著值 1,而且已經(jīng)發(fā)出了結果(hello, 1);第二個數(shù)據(jù)“world”經(jīng)過了 Map 任務的轉換,還在被 Sum 任務處理;第三個數(shù)據(jù)“hello”還在被 Map 任務處理。而第二條流的第一個數(shù)據(jù)“hello”同樣已經(jīng)經(jīng)過了 Map 轉換,正在被 Sum 任務處理。
接下來就是檢查點保存的算法。具體過程如下:
1.JobManager 發(fā)送指令,觸發(fā)檢查點的保存;
JobManager 會周期性地向每個 TaskManager 發(fā)送一條帶有新檢查點 ID 的消息,通過這種方式來啟動檢查點。收到指令后,TaskManger 會在所有 Source 任務中插入一個分界線(barrier),并將偏移量保存到遠程的持久化存儲中。
并行的 Source 任務保存的狀態(tài)為 3 和 1,表示當前的 1 號檢查點應該包含:第一條流中截至第三個數(shù)據(jù)、第二條流中截至第一個數(shù)據(jù)的所有狀態(tài)更改??梢园l(fā)現(xiàn) Source 任務做這些的時候并不影響下游任務的處理,Sum 任務已經(jīng)處理完了第一條流中傳來的(world, 1),對應的狀態(tài)也有了更改。
2.狀態(tài)快照保存完成,分界線向下游傳遞
狀態(tài)存入持久化存儲之后,會返回通知給 Source 任務;Source 任務就會向 JobManager 確認檢查點完成,然后像數(shù)據(jù)一樣把 barrier 向下游任務傳遞。
由于 Source 和 Map 之間是一對一(forward)的傳輸關系(這里沒有考慮算子鏈 operator chain),所以 barrier 可以直接傳遞給對應的 Map 任務。之后 Source 任務就可以繼續(xù)讀取新的數(shù)據(jù)了。與此同時,Sum 1 已經(jīng)將第二條流傳來的(hello,1)處理完畢,更新了狀態(tài)。
3.向下游多個并行子任務廣播分界線,執(zhí)行分界線對齊
Map 任務沒有狀態(tài),所以直接將 barrier 繼續(xù)向下游傳遞。這時由于進行了 keyBy 分區(qū),所以需要將 barrier 廣播到下游并行的兩個 Sum 任務。同時,Sum 任務可能收到來自上游兩個并行 Map 任務的 barrier,所以需要執(zhí)行“分界線對齊”操作。
所謂分界線對齊,意思就是當前任務要保存狀態(tài)前,需要等待上游任務(多個上游任務才需要對齊)的 barrier 都到齊以后才能保存。
此時的 Sum 2 收到了來自上游兩個 Map 任務的 barrier,說明第一條流第三個數(shù)據(jù)、第二條流第一個數(shù)據(jù)都已經(jīng)處理完,可以進行狀態(tài)的保存了;而 Sum 1 只收到了來自 Map 2 的barrier,所以這時需要等待分界線對齊。在等待的過程中,如果分界線尚未到達的分區(qū)任務Map 1 又傳來了數(shù)據(jù)(hello, 1),說明這是需要保存到檢查點的,Sum 任務應該正常繼續(xù)處理數(shù)據(jù),狀態(tài)更新為 3;而如果分界線已經(jīng)到達的分區(qū)任務 Map 2 又傳來數(shù)據(jù),這已經(jīng)是下一個檢查點要保存的內(nèi)容了,就不應立即處理,而是要緩存起來、等到狀態(tài)保存之后再做處理。
4.分界線對齊后,保存狀態(tài)到持久化存儲
各個分區(qū)的分界線都對齊后,就可以對當前狀態(tài)做快照,保存到持久化存儲了。存儲完成之后,同樣將 barrier 向下游繼續(xù)傳遞,并通知 JobManager 保存完畢。
這個過程中,每個任務保存自己的狀態(tài)都是相對獨立的,互不影響。我們可以看到,當Sum 將當前狀態(tài)保存完畢時,Source 1 任務已經(jīng)讀取到第一條流的第五個數(shù)據(jù)了。
5. 先處理緩存數(shù)據(jù),然后正常繼續(xù)處理
完成檢查點保存之后,任務就可以繼續(xù)正常處理數(shù)據(jù)了。這時如果有等待分界線對齊時緩存的數(shù)據(jù),需要先做處理;然后再按照順序依次處理新到的數(shù)據(jù)。當 JobManager 收到所有任務成功保存狀態(tài)的信息,就可以確認當前檢查點成功保存。之后遇到故障就可以從這里恢復了。由于分界線對齊要求先到達的分區(qū)做緩存等待,一定程度上會影響處理的速度;當出現(xiàn)背壓(backpressure)時,下游任務會堆積大量的緩沖數(shù)據(jù),檢查點可能需要很久才可以保存完畢。為了應對這種場景,F(xiàn)link 1.11 之后提供了不對齊的檢查點保存方式,可以將未處理的緩沖數(shù)據(jù)(in-flight data)也保存進檢查點。這樣,當我們遇到一個分區(qū) barrier 時就不需等待對齊,而是可以直接啟動狀態(tài)的保存了。
背壓機制:背壓機制是一種在異步編程中處理數(shù)據(jù)流的機制,特別是在響應式編程中。當生產(chǎn)者產(chǎn)生的數(shù)據(jù)流速度超過消費者處理的速度時,背壓機制可以用來調(diào)整生產(chǎn)者的生產(chǎn)速率,以適應消費者的處理能力,從而避免數(shù)據(jù)積壓和資源耗盡的問題。
(3)分布式快照算法(Barrier 對齊的至少一次)
在 Barrier 對齊精準一次的方式下,對于 Barrier 之后的數(shù)據(jù),不能進行計算,只能等到 Barrier 對齊并持久化保存之后才能進入下游算子進行計算。
????????而在 Barrier 對齊至少一次的語義下,如果在?Barrier 對齊的過程中,Barrier 后面的數(shù)據(jù)越過了 Barrier 并進行了計算持久化保存到狀態(tài)當中。所以缺點就是如果應用出現(xiàn)了故障需要重啟,那么這部分在 Barrier 之后但是被持久化保存到狀態(tài)中的數(shù)據(jù)就會被重復恢復計算,就會造成結果的不準確。但是優(yōu)點也很明顯,至少一次情況下,它不需要等待,就是說不用等到Barrier 對齊才進行計算,Barrier 后的數(shù)據(jù)就不需要緩存起來,也就不用擔心出現(xiàn)背壓時可能出現(xiàn)的一些其他問題,對我們的程序的壓力不會那么大。
(4)分布式快照算法(非 Barrier 對齊的精準一次)
????????非 Barrier 對齊的精準一次語義是在 Flink1.11 之后提出來的,由于分界線對齊要求先叨叨的分區(qū)做緩存等待,一定程度上會影響處理的速度;當出現(xiàn)背壓時下游任務會堆積大量的數(shù)據(jù),檢查點也可能需要很久才能保存完畢。所以我們的解決方案就是要么使用 Barrier 對齊的至少一次語義,要么就使用非 Barrier 對齊的精準一次語義。
? ? ? ? 非Barrier對齊并不是說不用Barrier,它的意思只是說不需要對齊了,僅此而已。
????????在非Barrier對齊算法中,一個任務在收到第一個Barrier時就開始執(zhí)行備份,可以保證精準一次。
- 收到第一個 Barrier 后,直接把它放到輸出緩沖區(qū)末端,向下游傳遞
- 標記數(shù)據(jù):把被第一個 Barrier 越過的數(shù)據(jù)和其它 Barrier 之前的所有數(shù)據(jù)標記
- 把標記數(shù)據(jù)和當前任務的狀態(tài)保存到當前任務的狀態(tài)當中,當進行檢查點恢復時這些數(shù)據(jù)都會恢復到對應位置
優(yōu)點:
????????非Barrier對齊算法可以避免數(shù)據(jù)阻塞等待的問題,并且可以更精確地控制數(shù)據(jù)處理的語義。同時,非Barrier對齊算法可以更好地利用系統(tǒng)資源,提高數(shù)據(jù)處理的效率和吞吐量。
缺點:
? ? ? ? 需要占用更多的備份磁盤開銷。
這種算法更加符合Chandy-Lamport 算法的思想。
總結
1. Barrier 對齊:一個 Task 收到所有上游的 barrier 之后,才會對自己的本地狀態(tài)進行備份。
? ? ? ? 1.1 精準一次:在對齊過程中,barrier 后面的數(shù)據(jù) 阻塞等待(不會越過 barrier)
? ? ? ? 1.2 至少一次:在對齊的過程中,先到的 barrier 其后面的數(shù)據(jù)不阻塞,將會被計算并備份到狀態(tài)當中?
2. 非 Barrier 對齊:一個 Task 收到第一個 barrier 就開始執(zhí)行備份。
????????能保證精準一次,先到的 barrier 會將本地狀態(tài)備份,后面的數(shù)據(jù)接著計算輸出
? ? ? ? 未到的 barrier,其前面的數(shù)據(jù)接著計算輸出,同時也保存到備份當中
? ? ? ? 最后一個 barrier 到達該 task 時,這個task的備份結束
1.4、檢查點配置
檢查點的作用是為了故障恢復,我們不能因為保存檢查點占據(jù)了大量時間、導致數(shù)據(jù)處理性能明顯降低。為了兼顧容錯性和處理性能,我們可以在代碼中對檢查點進行各種配置。
1. 啟用檢查點
默認情況下,F(xiàn)link 程序是禁用檢查點的。如果想要為 Flink 應用開啟自動保存快照的功能,需要在代碼中顯式地調(diào)用執(zhí)行環(huán)境的.enableCheckpointing()方法:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔 1 秒啟動一次檢查點保存
env.enableCheckpointing(1000);
這里需要傳入一個長整型的毫秒數(shù),表示周期性保存檢查點的間隔時間。如果不傳參數(shù)直接啟用檢查點,默認的間隔周期為 500 毫秒,這種方式已經(jīng)被棄用。檢查點的間隔時間是對處理性能和故障恢復速度的一個權衡。如果我們希望對性能的影響更小,可以調(diào)大間隔時間;而如果希望故障重啟后迅速趕上實時的數(shù)據(jù)處理,就需要將間隔時間設小一些。
2. 檢查點存儲
(1)檢查點模式(CheckpointingMode)
????????設置檢查點一致性的保證級別,有“精確一次”(exactly-once)和“至少一次”(at-least-once)兩個選項。默認級別為 exactly-once,而對于大多數(shù)低延遲的流處理程序,at-least-once 就夠用了,而且處理效率會更高。關于一致性級別,我們會在 10.2 節(jié)繼續(xù)展開。
(2)超時時間(checkpointTimeout)
????????用于指定檢查點保存的超時時間,超時沒完成(比如等待其他barrier的時間過長)就視為失敗。傳入一個長整型毫秒數(shù)作為參數(shù),表示超時時間。
(3)最小間隔時間(minPauseBetweenCheckpoints)
????????用于指定在上一個檢查點完成之后,檢查點協(xié)調(diào)器(checkpoint coordinator)最快等多久可以出發(fā)保存下一個檢查點的指令。這就意味著即使已經(jīng)達到了周期觸發(fā)的時間點,只要距離上一個檢查點完成的間隔不夠,就依然不能開啟下一次檢查點的保存。這就為正常處理數(shù)據(jù)留下了充足的間隙。當指定這個參數(shù)時,maxConcurrentCheckpoints 的值強制為 1。(控制一個流作用當中最多存在幾次不同的檢查點,barrier為1的檢查點一直從source到sink,最后JobManager上傳元數(shù)據(jù)到hdfs算一輪完整的checkpoint)
(4)最大并發(fā)檢查點數(shù)量(maxConcurrentCheckpoints)
????????用于指定運行中的檢查點最多可以有多少個。由于每個任務的處理進度不同,完全可能出現(xiàn)后面的任務還沒完成前一個檢查點的保存、前面任務已經(jīng)開始保存下一個檢查點了。這個參數(shù)就是限制同時進行的最大數(shù)量。如果前面設置了 minPauseBetweenCheckpoints,則maxConcurrentCheckpoints 這個參數(shù)就不起作用了。(一個流作業(yè)當中同時最多可以存在的檢查點個數(shù),比如一個流計算當中同時存在 barrier1、barrier2、barrier3...)
(5)開啟外部持久化存儲(enableExternalizedCheckpoints)
????????用于開啟檢查點的外部持久化,而且默認在作業(yè)失敗的時候不會自動清理,如果想釋放空間需要自己手工清理。里面?zhèn)魅氲膮?shù) ExternalizedCheckpointCleanup 指定了當作業(yè)取消的時候外部的檢查點該如何清理。
- DELETE_ON_CANCELLATION:在作業(yè)取消的時候會自動刪除外部檢查點,但是如果是作業(yè)失敗退出,則會保留檢查點。
- RETAIN_ON_CANCELLATION:作業(yè)取消的時候也會保留外部檢查點。
(6)檢查點異常時是否讓整個任務失?。╢ailOnCheckpointingErrors)
????????用于指定在檢查點發(fā)生異常的時候,是否應該讓任務直接失敗退出。默認為 true,如果設置為 false,則任務會丟棄掉檢查點然后繼續(xù)運行。
(7)不對齊檢查點(enableUnalignedCheckpoints)
????????不再執(zhí)行檢查點的分界線對齊操作,啟用之后可以大大減少產(chǎn)生背壓時的檢查點保存時間。這個設置要求檢查點模式(CheckpointingMode)必須為 exctly-once,并且并發(fā)的檢查點個數(shù)為 1。
// 創(chuàng)建一個本地執(zhí)行環(huán)境,并啟用 Web 用戶界面。本地執(zhí)行環(huán)境意味著 Flink 任務將在本地機器上運行,而不是在集群上。Web 用戶界面允許你監(jiān)視和調(diào)試正在運行的 Flink 任務。
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
導入hadoop依賴:
<!-- 引入hadoop依賴 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>3.3.0</version>
<!--防止把hadoop的依賴打進項目造成和 flink 依賴沖突-->
<scope>provided</scope>
</dependency>
// 代碼中指定管理檢查點路徑為hdfs,就存儲到hdfs 導入hadoop依賴,指定訪問hdfs的用戶名
System.setProperty("HADOOP_USER_NAME","lyh");
// TODO 檢查點配置
// 1. 周期為 5s 默認就是barrier對齊的精準一次
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// 2. 指定檢查點的存儲位置
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8080/chk");// 一般我們會存到云端
// 3. 超時時間 默認10分鐘
checkpointConfig.setCheckpointTimeout(60000);
// 4. 同時運行中的checkpoint的最大數(shù)量
checkpointConfig.setMaxConcurrentCheckpoints(2);
// 5. 最小等待間隔 上一輪checkpoint結束 到 下一輪checkpoint開始 之間的間隔
checkpointConfig.setMinPauseBetweenCheckpoints(1000);
// 6. 取消作業(yè)時,checkpoint的數(shù)據(jù)是否保留在外部系統(tǒng) 這里設置成如果作業(yè)結束就把檢查點內(nèi)容刪除
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
// 7. 允許 checkpoint 連續(xù)失敗的次數(shù) 默認為0
checkpointConfig.setTolerableCheckpointFailureNumber(10);
IDEA 調(diào)試查看Flink?UI(不需要啟動虛擬機里的Flink集群):
導入依賴(?scope 作用于不可以是 provide,否則打不開 localhost:8081)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
訪問 localhost:8081?
?
完整代碼:
public class CheckpointConfigDemo {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建一個流式的執(zhí)行環(huán)境
// 注意:用 getExecutionEnvironment 而不是 createLocalEnvironment 否則提交到flink無法完成作業(yè)
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 創(chuàng)建一個本地執(zhí)行環(huán)境,并啟用 Web 用戶界面。本地執(zhí)行環(huán)境意味著 Flink 任務將在本地機器上運行,而不是在集群上。Web 用戶界面允許你監(jiān)視和調(diào)試正在運行的 Flink 任務。
Configuration conf = new Configuration();
conf.setInteger(RestOptions.PORT,8081);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setParallelism(1);
// 代碼中指定管理檢查點路徑為hdfs,就存儲到hdfs 導入hadoop依賴,指定訪問hdfs的用戶名
System.setProperty("HADOOP_USER_NAME","lyh");
// TODO 檢查點配置
// 1. 周期為 5s 默認就是barrier對齊的精準一次
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// 2. 指定檢查點的存儲位置
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage("file:///D://Desktop/FlinkStudy/chk");// 一般我們會存到云端
// 3. 超時時間 默認10分鐘
checkpointConfig.setCheckpointTimeout(60000);
// 4. 同時運行中的checkpoint的最大數(shù)量
checkpointConfig.setMaxConcurrentCheckpoints(2);
// 5. 最小等待間隔 上一輪checkpoint結束 到 下一輪checkpoint開始 之間的間隔
checkpointConfig.setMinPauseBetweenCheckpoints(1000);
// 6. 取消作業(yè)時,checkpoint的數(shù)據(jù)是否保留在外部系統(tǒng) 這里設置成如果作業(yè)結束就把檢查點內(nèi)容刪除
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
// 7. 允許 checkpoint 連續(xù)失敗的次數(shù) 默認為0
checkpointConfig.setTolerableCheckpointFailureNumber(10);
// 2. 流式數(shù)據(jù)處理環(huán)境得到的 DataSource 繼承自 DataStream
env
.socketTextStream("hadoop102",9999)
.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG))
.keyBy(t -> t.f0)
.sum(1)
.print();
// 7. 執(zhí)行
env.execute(); // 這里我們的數(shù)據(jù)是有界的,但是真正開發(fā)環(huán)境是無界的,這里需要用execute方法等待新數(shù)據(jù)的到來
}
}
說明:
// 取消作業(yè)時,checkpoint的數(shù)據(jù)是否保留在外部系統(tǒng) 這里設置成如果作業(yè)正常結束就把檢查點內(nèi)容刪除(如果是突然掛掉 還會保存檢查點)
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
// 取消作業(yè)時 會將檢查點保留下來
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
非Barrier對齊的精準一次配置
除此之外還可以設置非Barrier?對齊的精準一次,同樣必須在啟用檢查點的時候設置精準一次且設置最大并發(fā)為1(如果是至少一次語義的話雖然不報錯但是非對齊不生效,如果最大并發(fā)不是1將報錯),然后設置:
// 設置精確一次模式
checkpointConfig.setCheckpointingMode(5000,CheckpointingMode.EXACTLY_ONCE);
// 同時運行中的checkpoint的最大數(shù)量
checkpointConfig.setMaxConcurrentCheckpoints(1);
// 啟用不對齊的檢查點保存方式
checkpointConfig.enableUnalignedCheckpoints();
我們可以查看源碼中的說明:
啟用未對齊的檢查點,將大大減少背壓下的檢查點設置時間。
未對齊的檢查點包含作為檢查點狀態(tài)的一部分存儲在緩沖區(qū)中的數(shù)據(jù),這允許檢查點屏障超越這些緩沖區(qū)。因此,檢查點持續(xù)時間變得與當前吞吐量無關,因為檢查點屏障不再有效地嵌入到數(shù)據(jù)流中。
只有在ExecutionCheckpointingOptions的情況下才能啟用未對齊的檢查點。
?新特性-設置對齊超時時間
Flink 16/17+才有的:
????????開啟檢查點才能生效:默認為0 表示一開始就用非對齊的檢查點 如果>0 程序一開始先使用對齊的檢查點(也就是Barrier對齊) 對齊時間超過這個參數(shù)自動切換成非對齊(非Barrier對齊)
// 開啟檢查點才生效:默認為0 表示一開始就用非對齊的檢查點 如果>0 程序一開始先使用對齊的檢查點(也就是Barrier對齊) 對齊時間超過這個參數(shù)自動切換成非對齊(非Barrier對齊)
checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1L));
3. 通用增量 checkpoint(changelog)
? ? ? ? 在 Flink 1.15 之前,只有 RocksDB 支持增量快照。 不同于產(chǎn)生一個包含所有數(shù)據(jù)的全量備份,增量快照只包含自上一次快照完成后被修改的記錄,因此可以顯著減少快照完成的耗時。
RocksDB 狀態(tài)后端啟用增量 checkpoint:
? ? ? ? 從 Flink 1.15 開始,不管是 hashmap 還是 rocksdb 狀態(tài)后端都可以通過開啟 changelog 實現(xiàn)通用的增量 checkpoint。
我們可以在 Flink 官網(wǎng)看到對 增量快照的解釋:
?執(zhí)行過程:
1. 有狀態(tài)的算子任務將狀態(tài)更改寫入變更日志:
這里的 Stateful Changelog 就是變更日志,它記錄了一些操作,比如原本的檢查點數(shù)據(jù)為 1,2,3 現(xiàn)在變?yōu)榱?1,2,3,4 它就會記錄 +4 ,代表增加了一個數(shù)據(jù) 4。
?State Table 就是操作后的狀態(tài)(但它不是 checkpoint)。
Stateful Changelog 會實時同步到檢查點存儲當中。
2. 狀態(tài)物化:狀態(tài)表定期保存,獨立于檢查點
狀態(tài)表默認 10 分鐘保存一次,可以在配置文件中指定。狀態(tài)表不是存在檢查點的,而是獨立于檢查點之外的其他地方。
3. 狀態(tài)物化后,狀態(tài)變更日志就可以被截斷到相應的點
所謂截斷就是清理歷史的狀態(tài)操作日志,用新的操作日志替換。
state.backend.changelog.enabled: true
state.backend.changelog.storage: filesystem
# 存儲 changelog 數(shù)據(jù)
dstl.dfs.base-path: hdfs://hadoop102:8020/changelog
execution.checkpointing.max-concurrent-checkpoints: 1
execution.savepoint.restore-mode: CLAIM
注意事項
目前為實驗性功能,開啟后可能會造成資源消耗巨大:
- HDFS 上保存的文件數(shù)過多
- 消耗更多的 IO 帶寬用于上傳變更日志
- 更多的 CPU 用于序列化狀態(tài)更改
- TaskManager 使用更多內(nèi)存來緩存狀態(tài)更改
使用限制:
- checkpoint 的最大并發(fā)數(shù)必須為1
- 從 Flink 1.15 開始,只有文件系統(tǒng)的存儲類型實現(xiàn)可用(memory 內(nèi)存存儲還在測試階段)
- 不支持 NO_CLAIM 模式
使用方式:
1)配置文件指定:
state.backend.changelog.enabled: true
state.backend.changelog.storage: filesystem
# 存儲 changelog 數(shù)據(jù)
dstl.dfs.base-path: hdfs://hadoop102:8080/changelog
execution.checkpointing.max-concurrent-checkpoints: 1
executopn.savepoint-restore-mode: CLAIM
2)代碼中設置
引入依賴(打包的時候是不需要打包進去的):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-changelog</artifactId>
<version>${flink.version}</version>
<scope>runtime</scope>
</dependency>
開啟 changelog:
// 開啟 cheangelog 需要設置檢查點的最大并發(fā)為 1
checkpointConfig.setMaxConcurrentCheckpoints(1);
env.enableChangelogStateBackend(true);
4. 最終檢查點
如果數(shù)據(jù)源是有界的,就可能出現(xiàn)部分 task 已經(jīng)處理完所有數(shù)據(jù)變成finished的狀態(tài),不繼續(xù)工作。從 Flink 1.14開始這些 finished 狀態(tài)的 task,也可以繼續(xù)執(zhí)行檢查點。自 1.15 起默認啟用此功能,并且可以通過功能標志禁用它(一般我們肯定是不希望關掉的):
Configuration conf = new Configuration();
// 從 Flink1.15 開始默認啟用(true)
conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
1.5、保存點(Savepoint)
????????除了檢查點(checkpoint)外,F(xiàn)link 還提供了另一個非常獨特的鏡像保存功能——保存點(Savepoint)。
????????從名稱就可以看出,這也是一個存盤的備份,它的原理和算法與檢查點完全相同,只是多了一些額外的元數(shù)據(jù)。
????????事實上,保存點就是通過檢查點的機制來創(chuàng)建流式作業(yè)狀態(tài)的一致性鏡像(consistent image)的。保存點中的狀態(tài)快照,是以算子 ID 和狀態(tài)名稱組織起來的,相當于一個鍵值對。從保存點啟動應用程序時,F(xiàn)link 會將保存點的狀態(tài)數(shù)據(jù)重新分配給相應的算子任務。
1. 保存點的用途
????????保存點與檢查點最大的區(qū)別,就是觸發(fā)的時機(檢查點就像CSDN草稿的自動保存,而保存點就像我們手動的保存草稿)。檢查點是由 Flink 自動管理的,定期創(chuàng)建,發(fā)生故障之后自動讀取進行恢復,這是一個“自動存盤”的功能;而保存點不會自動創(chuàng)建,必須由用戶明確地手動觸發(fā)保存操作,所以就是“手動存盤”。因此兩者盡管原理一致,但用途就有所差別了:檢查點主要用來做故障恢復,是容錯機制的核心;保存點則更加靈活,可以用來做有計劃的手動備份和恢復。保存點可以當作一個強大的運維工具來使用。我們可以在需要的時候創(chuàng)建一個保存點,然后停止應用,做一些處理調(diào)整之后再從保存點重啟。它適用的具體場景有:
- 版本管理和歸檔存儲:對重要的節(jié)點進行手動備份,設置為某一版本,歸檔(archive)存儲應用程序的狀態(tài)。
- ?更新 Flink 版本:目前 Flink 的底層架構已經(jīng)非常穩(wěn)定,所以當 Flink 版本升級時,程序本身一般是兼容的。這時不需要重新執(zhí)行所有的計算,只要創(chuàng)建一個保存點,停掉應用、升級 Flink 后,從保存點重啟就可以繼續(xù)處理了。
- 更新應用程序:我們不僅可以在應用程序不變的時候,更新 Flink 版本;還可以直接更新應用程序。前提是程序必須是兼容的,也就是說更改之后的程序,狀態(tài)的拓撲結構和數(shù)據(jù)類型都是不變的,這樣才能正常從之前的保存點去加載。這個功能非常有用。我們可以及時修復應用程序中的邏輯 bug,更新之后接著繼續(xù)處理;也可以用于有不同業(yè)務邏輯的場景,比如 A/B 測試等等。
- 調(diào)整并行度:如果應用運行的過程中,發(fā)現(xiàn)需要的資源不足或已經(jīng)有了大量剩余,也可以通過從保存點重啟的方式,將應用程序的并行度增大或減小。
- 暫停應用程序:有時候我們不需要調(diào)整集群或者更新程序,只是單純地希望把應用暫停、釋放一些資源來處理更重要的應用程序。使用保存點就可以靈活實現(xiàn)應用的暫停和重啟,可以對有限的集群資源做最好的優(yōu)化配置。
需要注意的是,保存點能夠在程序更改的時候依然兼容,前提是狀態(tài)的拓撲結構(比如原先是 source —> map ——> sum——>sink 之后變成了 source —> map ——> process——>sink 那么愚笨sum中的狀態(tài)肯定不在了,因為這條算子鏈的結構已經(jīng)變了)和數(shù)據(jù)類型(比如原本sum中存的是 ValueState 類型 之后變成了 MapState,這種也恢復不了)不變。我們知道保存點中狀態(tài)都是以算子 ID-狀態(tài)名稱這樣的 key-value 組織起來的,算子ID 可以在代碼中直接調(diào)用 SingleOutputStreamOperator 的.uid()方法來進行指定:
DataStream<String> stream = env
.addSource(new StatefulSource())
.uid("source-id")
.map(new StatefulMapper())
.uid("mapper-id")
.print();
對于沒有設置 ID 的算子,F(xiàn)link 默認會自動進行設置,所以在重新啟動應用后可能會導致ID 不同而無法兼容以前的狀態(tài)。所以為了方便后續(xù)的維護,強烈建議在程序中為每一個算子手動指定 ID。
2. 使用保存點
保存點的使用非常簡單,我們可以使用命令行工具來創(chuàng)建保存點,也可以從保存點恢復作業(yè)。
(1)創(chuàng)建保存點
要在命令行中為運行的作業(yè)創(chuàng)建一個保存點鏡像,只需要執(zhí)行:
bin/flink savepoint :jobId [:targetDirectory]
這里 jobId 需要填充要做鏡像保存的作業(yè) ID,目標路徑 targetDirectory 可選,表示保存點存儲的路徑。
對于保存點的默認路徑,可以通過配置文件 flink-conf.yaml 中的 state.savepoints.dir 項來設定:
state.savepoints.dir: hdfs:///flink/savepoints
當然對于單獨的作業(yè),我們也可以在程序代碼中通過執(zhí)行環(huán)境來設置:
env.setDefaultSavepointDir("hdfs:///flink/savepoints");
由于創(chuàng)建保存點一般都是希望更改環(huán)境之后重啟,所以創(chuàng)建之后往往緊接著就是停掉作業(yè)的操作。除了對運行的作業(yè)創(chuàng)建保存點,我們也可以在停掉一個作業(yè)時直接創(chuàng)建保存點:
bin/flink stop --savepointPath [:targetDirectory] :jobId
(2)從保存點重啟應用
我們已經(jīng)知道,提交啟動一個 Flink 作業(yè),使用的命令是 flink run;現(xiàn)在要從保存點重啟一個應用,其實本質(zhì)是一樣的:
bin/flink run -s :savepointPath [:runArgs]
這里只要增加一個-s 參數(shù),指定保存點的路徑就可以了,其他啟動時的參數(shù)還是完全一樣的,如果是基于 yarn 的運行模式還需要加上 -yid application-id。在使用 web UI 進行作業(yè)提交時,可以填入的參數(shù)除了入口類、并行度和運行參數(shù),還有一個“Savepoint Path”,這就是從保存點啟動應用的配置。
3. 使用保存點恢復狀態(tài)以及切換狀態(tài)后端
Flink1.17 版本還提供了使用保存點切換狀態(tài)后端,比如我們原本是 rocksdb 狀態(tài)后端,想改成 hashmap 狀態(tài)后端。也就是使用 savepoint 恢復狀態(tài)的時候,去更換狀態(tài)后端。需要注意的是,不要在代碼中指定狀態(tài)后端了,通過配置文件來配置或者 -D 參數(shù)配置。
// yarn 模式
bin/flink run-application -d -t yarn-application -Dstate.backend=hashmap -c com.lyh.test.wc xxx.jar
// 不使用 yarn 模式
bin/flink run-application -d -Dstate.backend=hashmap -c com.lyh.test.wc xxx.jar
關閉程序并指定狀態(tài)保存點:
我們這里用的是 cancel ,但其實我們更加推薦用 stop。
這次我們再次重啟應用,并指定保存點:
為了再次驗證是否是從保存點恢復,我們在 netcat 中輸入 a?(之前已經(jīng)統(tǒng)計過一次了,如果保存點成功導入,結果將會輸出 (a,2))
修改狀態(tài)后端(這里我沒有用 yarn 模式):
bin/flink run-application -d -Dstate.backend=rocksdb -s hdfs://hadoop102:8020/sp/savepoint-7ca51f-5e1c4815e549 -C com.lyh.checkpoint.SavepointDemo.class ./jobs/FlinkStudy-1.0-SNAPSHOT.jar
如果是yarn模式直接加一個 -t yarn-application 就好了。
4. 使用 checkpoint 恢復狀態(tài)
同樣,也可以從 checkpoint 來進行狀態(tài)的恢復,但是注意,使用 checkpoint恢復的話不能切換狀態(tài)后端,但是恢復命令還是一樣的,都是指定恢復路徑來重啟應用:
// 路徑必須指定到 checkpoint 的 chk-id 目錄
bin/flink run-application -d -Dstate.backend=rocksdb -s hdfs://hadoop102:8020/chk/22516b874d99d1478983a9a5b248c6bf/chk-175 -C com.lyh.checkpoint.SavepointDemo.class ./jobs/FlinkStudy-1.0-SNAPSHOT.jar
我們說不能切換狀態(tài)后端,但是這里指定 -Dstate.backend=rocksdb 并不影響,因為作業(yè)本來現(xiàn)在狀態(tài)本來就是rocksdb 的。
2、狀態(tài)一致性
2.1、一致性的概念和級別
????????在分布式系統(tǒng)中,一致性(consistency)是一個非常重要的概念;在事務(transaction)中,一致性也是重要的一個特性。Flink 中一致性的概念,主要用在故障恢復的描述中,所以更加類似于事務中的表述。那到底什么是一致性呢?
????????簡單來講,一致性其實就是結果的正確性,一般從數(shù)據(jù)丟失、數(shù)據(jù)重復角度來評估。對于分布式系統(tǒng)而言,強調(diào)的是不同節(jié)點中相同數(shù)據(jù)的副本應該總是“一致的”,也就是從不同節(jié)點讀取時總能得到相同的值;而對于事務而言,是要求提交更新操作后,能夠讀取到新的數(shù)據(jù)。對于 Flink 來說,多個節(jié)點并行處理不同的任務,我們要保證計算結果是正確的,就必須不漏掉任何一個數(shù)據(jù),而且也不會重復處理同一個數(shù)據(jù)。流式計算本身就是一個一個來的,所以正常處理的過程中結果肯定是正確的;在發(fā)生故障、需要恢復狀態(tài)進行回滾時就需要更多的保障機制了。我們通過檢查點的保存來保證狀態(tài)恢復后結果的正確,所以主要討論的就是“狀態(tài)的一致性”。
一般說來,狀態(tài)一致性有三種級別:
? 最多一次(At-Most-Once)
? ? ? ? 就是說數(shù)據(jù)最多只處理一次,不管之后故沒故障,丟沒丟掉,數(shù)據(jù)只來一遍。對于 Flink 而言,不開啟 checkpoint 就是最多一次。
????????當任務發(fā)生故障時,最簡單的做法就是直接重啟,別的什么都不干;既不恢復丟失的狀態(tài),也不重放丟失的數(shù)據(jù)。每個數(shù)據(jù)在正常情況下會被處理一次,遇到故障時就會丟掉,所以就是“最多處理一次”。我們發(fā)現(xiàn),如果數(shù)據(jù)可以直接被丟掉,那其實就是沒有任何操作來保證結果的準確性;所以這種類型的保證也叫“沒有保證”。盡管看起來比較糟糕,不過如果我們的主要訴求是“快”,而對近似正確的結果也能接受,那這也不失為一種很好的解決方案。
? 至少一次(AT-LEAST-ONCE)
? ? ? ? 數(shù)據(jù)至少處理一次,甚至是多次,所以數(shù)據(jù)很可能重復處理。對于 Flink 而言,當?shù)谝粋€ Barrier 到達,而其他 Barrier 沒有到達時,第一個 Barrier 后面的數(shù)據(jù)不會等待,而是直接越過 Barrier ,當出現(xiàn)故障需要恢復檢查點的時候,會把一些 Barrier 之外的數(shù)據(jù)(也就是不該恢復的數(shù)據(jù))重復處理,這就是至少一次。
????????在實際應用中,我們一般會希望至少不要丟掉數(shù)據(jù)。這種一致性級別就叫作“至少一次”(at-least-once),就是說是所有數(shù)據(jù)都不會丟,肯定被處理了;不過不能保證只處理一次,有些數(shù)據(jù)會被重復處理。
????????在有些場景下,重復處理數(shù)據(jù)是不影響結果的正確性的,這種操作具有“冪等性”。比如,如果我們統(tǒng)計電商網(wǎng)站的 UV,需要對每個用戶的訪問數(shù)據(jù)進行去重處理,所以即使同一個數(shù)據(jù)被處理多次,也不會影響最終的結果,這時使用 at-least-once 語義是完全沒問題的。當然,如果重復數(shù)據(jù)對結果有影響,比如統(tǒng)計的是 PV,或者之前的統(tǒng)計詞頻 word count,使用at-least-once 語義就可能會導致結果的不一致了。為了保證達到 at-least-once 的狀態(tài)一致性,我們需要在發(fā)生故障時能夠重放數(shù)據(jù)。最常見的做法是,可以用持久化的事件日志系統(tǒng),把所有的事件寫入到持久化存儲中。這時只要記錄一個偏移量,當任務發(fā)生故障重啟后,重置偏移量就可以重放檢查點之后的數(shù)據(jù)了。Kafka 就是這種架構的一個典型實現(xiàn)。
? 精確一次(EXACTLY-ONCE)
? ? ? ? 第一個 Barrier 到達后,Barrier 后面的數(shù)據(jù)必須老老實實等著,等到所有 Barrier 都對齊之后才進行持久化,持久化完其他數(shù)據(jù)才能繼續(xù)處理?;蛘叻?Barrier 對齊情況下,第一個 Barrier 到達后直接跳到輸出緩沖區(qū)繼續(xù)往下游傳遞,把第一個Barrier 和其他Barrier 之間的數(shù)據(jù)都進行標記。這兩種都是精確一次。
????????最嚴格的一致性保證,就是所謂的“精確一次”(exactly-once,有時也譯作“恰好一次”)。這也是最難實現(xiàn)的狀態(tài)一致性語義。exactly-once 意味著所有數(shù)據(jù)不僅不會丟失,而且只被處理一次,不會重復處理。也就是說對于每一個數(shù)據(jù),最終體現(xiàn)在狀態(tài)和輸出結果上,只能有一次統(tǒng)計。exactly-once 可以真正意義上保證結果的絕對正確,在發(fā)生故障恢復后,就好像從未發(fā)生過故障一樣。很明顯,要做的 exactly-once,首先必須能達到 at-least-once 的要求,就是數(shù)據(jù)不丟。所以同樣需要有數(shù)據(jù)重放機制來保證這一點。另外,還需要有專門的設計保證每個數(shù)據(jù)只被處理一次。Flink 中使用的是一種輕量級快照機制——檢查點(checkpoint)來保證 exactly-once 語義。
2.2 端到端的狀態(tài)一致性
????????我們已經(jīng)知道檢查點可以保證 Flink 內(nèi)部狀態(tài)的一致性,而且可以做到精確一次(exactly-once)。那是不是說,只要開啟了檢查點,發(fā)生故障進行恢復,結果就不會有任何問題呢?
????????沒那么簡單。在實際應用中,一般要保證從用戶的角度看來,最終消費的數(shù)據(jù)是正確的。而用戶或者外部應用不會直接從 Flink 內(nèi)部的狀態(tài)讀取數(shù)據(jù),往往需要我們將處理結果寫入外部存儲中。這就要求我們不僅要考慮 Flink 內(nèi)部數(shù)據(jù)的處理轉換,還涉及從外部數(shù)據(jù)源讀取,以及寫入外部持久化系統(tǒng),整個應用處理流程從頭到尾都應該是正確的。
????????所以完整的流處理應用,應該包括了數(shù)據(jù)源、流處理器和外部存儲系統(tǒng)三個部分。這個完整應用的一致性,就叫作“端到端(end-to-end)的狀態(tài)一致性”,它取決于三個組件中最弱的那一環(huán)。一般來說,能否達到 at-least-once 一致性級別,主要看數(shù)據(jù)源能夠重放數(shù)據(jù);而能否達到 exactly-once 級別,流處理器內(nèi)部、數(shù)據(jù)源、外部存儲都要有相應的保證機制。
? ? ? ? 狀態(tài)一致性實現(xiàn)難度:At-most-once < At-least-once < Exactly-once 。
3、 端到端精確一次(end-to-end exactly-once)
????????實際應用中,最難做到、也最希望做到的一致性語義,無疑就是端到端(end-to-end)的“精確一次”(exactly-once)。我們知道,對于 Flink 內(nèi)部來說,檢查點機制可以保證故障恢復后數(shù)據(jù)不丟(在能夠重放的前提下),并且只處理一次,所以已經(jīng)可以做到 exactly-once 的一致性語義了。需要注意的是,我們說檢查點能夠保證故障恢復后數(shù)據(jù)只處理一次,并不是說之前統(tǒng)計過某個數(shù)據(jù),現(xiàn)在就不能再次統(tǒng)計了;而是要看狀態(tài)的改變和輸出的結果,是否只包含了一次這個數(shù)據(jù)的處理。由于檢查點保存的是之前所有任務處理完某個數(shù)據(jù)后的狀態(tài)快照,所以重放的數(shù)據(jù)引起的狀態(tài)改變一定不會包含在里面,最終結果中只處理了一次。所以,端到端一致性的關鍵點,就在于輸入的數(shù)據(jù)源端和輸出的外部存儲端。
3.1、 輸入端保證
????????輸入端主要指的就是 Flink 讀取的外部數(shù)據(jù)源。對于一些數(shù)據(jù)源來說,并不提供數(shù)據(jù)的緩沖或是持久化保存,數(shù)據(jù)被消費之后就徹底不存在了。例如 socket 文本流就是這樣, socket服務器是不負責存儲數(shù)據(jù)的,發(fā)送一條數(shù)據(jù)之后,我們只能消費一次,是“一錘子買賣”。對于這樣的數(shù)據(jù)源,故障后我們即使通過檢查點恢復之前的狀態(tài),可保存檢查點之后到發(fā)生故障期間的數(shù)據(jù)已經(jīng)不能重發(fā)了,這就會導致數(shù)據(jù)丟失。所以就只能保證 at-most-once 的一致性語義,相當于沒有保證。
????????想要在故障恢復后不丟數(shù)據(jù),外部數(shù)據(jù)源就必須擁有重放數(shù)據(jù)的能力。常見的做法就是對數(shù)據(jù)進行持久化保存,并且可以重設數(shù)據(jù)的讀取位置。一個最經(jīng)典的應用就是 Kafka。在 Flink的 Source 任務中將數(shù)據(jù)讀取的偏移量保存為狀態(tài),這樣就可以在故障恢復時從檢查點中讀取出來,對數(shù)據(jù)源重置偏移量,重新獲取數(shù)據(jù)。數(shù)據(jù)源可重放數(shù)據(jù),或者說可重置讀取數(shù)據(jù)偏移量,加上 Flink 的 Source 算子將偏移量作為狀態(tài)保存進檢查點,就可以保證數(shù)據(jù)不丟。這是達到 at-least-once 一致性語義的基本要求,當然也是實現(xiàn)端到端 exactly-once 的基本要求。
3.2、 輸出端保證
????????有了 Flink 的檢查點機制,以及可重放數(shù)據(jù)的外部數(shù)據(jù)源,我們已經(jīng)能做到 at-least-once了。但是想要實現(xiàn) exactly-once 卻有更大的困難:數(shù)據(jù)有可能重復寫入外部系統(tǒng)。因為檢查點保存之后,繼續(xù)到來的數(shù)據(jù)也會一一處理,任務的狀態(tài)也會更新,最終通過Sink 任務將計算結果輸出到外部系統(tǒng);只是狀態(tài)改變還沒有存到下一個檢查點中。這時如果出現(xiàn)故障,這些數(shù)據(jù)都會重新來一遍,就計算了兩次。我們知道對 Flink 內(nèi)部狀態(tài)來說,重復計算的動作是沒有影響的,因為狀態(tài)已經(jīng)回滾,最終改變只會發(fā)生一次(檢查點在持久化時某一時刻會有兩份檢查點:舊的檢查點和正在保存的,只有正在保存的檢查點保存成功了才會替換掉舊的檢查點);但對于外部系統(tǒng)來說,已經(jīng)寫入的結果就是潑出去的水,已經(jīng)無法收回了,再次執(zhí)行寫入就會把同一個數(shù)據(jù)寫入兩次。所以這時,我們只保證了端到端的 at-least-once 語義。為了實現(xiàn)端到端 exactly-once,我們還需要對外部存儲系統(tǒng)、以及 Sink 連接器有額外的要求。能夠保證 exactly-once 一致性的寫入方式有兩種:
- 冪等寫入
- 事務寫入
????????我們需要外部存儲系統(tǒng)對這兩種寫入方式的支持,而 Flink 也為提供了一些 Sink 連接器接口。接下來我們進行展開講解。
1. 冪等(idempotent)寫入
????????所謂“冪等”操作,就是說一個操作可以重復執(zhí)行很多次,但只導致一次結果更改。也就是說,后面再重復執(zhí)行就不會對結果起作用了。
????????數(shù)學中一個典型的例子是,ex 的求導下操作,無論做多少次,得到的都是自身。而在數(shù)據(jù)處理領域,最典型的就是對 HashMap 的插入操作:如果是相同的鍵值對,后面的重復插入就都沒什么作用了。這相當于說,我們并沒有真正解決數(shù)據(jù)重復計算、寫入的問題;而是說,重復寫入也沒關系,結果不會改變。所以這種方式主要的限制在于外部存儲系統(tǒng)必須支持這樣的冪等寫入:比如 Redis 中鍵值存儲,或者關系型數(shù)據(jù)庫(如 MySQL)中滿足查詢條件的更新操作。需要注意,對于冪等寫入,遇到故障進行恢復時,有可能會出現(xiàn)短暫的不一致。因為保存點完成之后到發(fā)生故障之間的數(shù)據(jù),其實已經(jīng)寫入了一遍,回滾的時候并不能消除它們。如果有一個外部應用讀取寫入的數(shù)據(jù),可能會看到奇怪的現(xiàn)象:短時間內(nèi),結果會突然“跳回”到之前的某個值,然后“重播”一段之前的數(shù)據(jù)。不過當數(shù)據(jù)的重放逐漸超過發(fā)生故障的點的時候,最終的結果還是一致的。
2. 事務(transactional)寫入
????????如果說冪等寫入對應用場景限制太多,那么事務寫入可以說是更一般化的保證一致性的方式。之前我們提到,輸出端最大的問題就是“覆水難收”,寫入到外部系統(tǒng)的數(shù)據(jù)難以撤回。自然想到,那怎樣可以收回一條已寫入的數(shù)據(jù)呢?利用事務就可以做到。我們都知道,事務(transaction)是應用程序中一系列嚴密的操作,所有操作必須成功完成,否則在每個操作中所做的所有更改都會被撤消。事務有四個基本特性:原子性(Atomicity)、一致性(Correspondence)、隔離性(Isolation)和持久性(Durability),這就是著名的 ACID。在 Flink 流處理的結果寫入外部系統(tǒng)時,如果能夠構建一個事務,讓寫入操作可以隨著檢查點來提交和回滾,那么自然就可以解決重復寫入的問題了。所以事務寫入的基本思想就是:用一個事務來進行數(shù)據(jù)向外部系統(tǒng)的寫入,這個事務是與檢查點綁定在一起的。當 Sink 任務遇到 barrier 時,開始保存狀態(tài)的同時就開啟一個事務,接下來所有數(shù)據(jù)的寫入都在這個事務中;待到當前檢查點保存完畢時,將事務提交,所有寫入的數(shù)據(jù)就真正可用了。如果中間過程出現(xiàn)故障,狀態(tài)會回退到上一個檢查點,而當前事務沒有正常關閉(因為當前檢查點沒有保存完),所以也會回滾,寫入到外部的數(shù)據(jù)就被撤銷了。具體來說,又有兩種實現(xiàn)方式:預寫日志(WAL)和兩階段提交(2PC)
(1)預寫日志(write-ahead-log,WAL)
我們發(fā)現(xiàn),事務提交是需要外部存儲系統(tǒng)支持事務的,否則沒有辦法真正實現(xiàn)寫入的回撤。那對于一般不支持事務的存儲系統(tǒng),能夠?qū)崿F(xiàn)事務寫入呢?預寫日志(WAL)就是一種非常簡單的方式。具體步驟是:
????????①先把結果數(shù)據(jù)作為日志(log)狀態(tài)保存起來
????????②進行檢查點保存時,也會將這些結果數(shù)據(jù)一并做持久化存儲
????????③在收到檢查點完成的通知時,將所有結果一次性寫入外部系統(tǒng)。
????????我們會發(fā)現(xiàn),這種方式類似于檢查點完成時做一個批處理,一次性的寫入會帶來一些性能上的問題;而優(yōu)點就是比較簡單,由于數(shù)據(jù)提前在狀態(tài)后端中做了緩存,所以無論什么外部存儲系統(tǒng),理論上都能用這種方式一批搞定。在 Flink 中 DataStream API 提供了一個模板類GenericWriteAheadSink,用來實現(xiàn)這種事務型的寫入方式。
????????需要注意的是,預寫日志這種一批寫入的方式,有可能會寫入失?。凰栽趫?zhí)行寫入動作之后,必須等待發(fā)送成功的返回確認消息。在成功寫入所有數(shù)據(jù)后,在內(nèi)部再次確認相應的檢查點,這才代表著檢查點的真正完成。這里需要將確認信息也進行持久化保存,在故障恢復時,只有存在對應的確認信息,才能保證這批數(shù)據(jù)已經(jīng)寫入,可以恢復到對應的檢查點位置。但這種“再次確認”的方式,也會有一些缺陷。如果我們的檢查點已經(jīng)成功保存、數(shù)據(jù)也成功地一批寫入到了外部系統(tǒng),但是最終保存確認信息時出現(xiàn)了故障,F(xiàn)link 最終還是會認為沒有成功寫入。于是發(fā)生故障時,不會使用這個檢查點,而是需要回退到上一個;這樣就會導致這批數(shù)據(jù)的重復寫入。
(2)兩階段提交(two-phase-commit,2PC)
????????前面提到的各種實現(xiàn) exactly-once 的方式,多少都有點缺陷,有沒有更好的方法呢?自然是有的,這就是傳說中的兩階段提交(2PC)。顧名思義,它的想法是分成兩個階段:先做“預提交”,等檢查點完成之后再正式提交。這種提交方式是真正基于事務的,它需要外部系統(tǒng)提供事務支持。
具體的實現(xiàn)步驟為:
????????①當?shù)谝粭l數(shù)據(jù)到來時,或者收到檢查點的分界線時,Sink 任務都會啟動一個事務。
????????②接下來接收到的所有數(shù)據(jù),都通過這個事務寫入外部系統(tǒng);這時由于事務沒有提交,所以數(shù)據(jù)盡管寫入了外部系統(tǒng),但是不可用,是“預提交”的狀態(tài)。
????????③當 Sink 任務收到 JobManager 發(fā)來檢查點完成的通知時,正式提交事務,寫入的結果就真正可用了。
????????當中間發(fā)生故障時,當前未提交的事務就會回滾,于是所有寫入外部系統(tǒng)的數(shù)據(jù)也就實現(xiàn)了撤回。這種兩階段提交(2PC)的方式充分利用了 Flink 現(xiàn)有的檢查點機制:分界線的到來,就標志著開始一個新事務;而收到來自 JobManager 的 checkpoint 成功的消息,就是提交事務的指令。每個結果數(shù)據(jù)的寫入,依然是流式的,不再有預寫日志時批處理的性能問題;最終提交時,也只需要額外發(fā)送一個確認信息。所以 2PC 協(xié)議不僅真正意義上實現(xiàn)了 exactly-once,而且通過搭載 Flink 的檢查點機制來實現(xiàn)事務,只給系統(tǒng)增加了很少的開銷。Flink 提供了 TwoPhaseCommitSinkFunction 接口,方便我們自定義實現(xiàn)兩階段提交的SinkFunction 的實現(xiàn),提供了真正端到端的 exactly-once 保證。
不過兩階段提交雖然精巧,卻對外部系統(tǒng)有很高的要求。這里將 2PC 對外部系統(tǒng)的要求,列舉如下:
- 外部系統(tǒng)必須提供事務支持,或者 Sink 任務必須能夠模擬外部系統(tǒng)上的事務。
- 在檢查點的間隔期間里,必須能夠開啟一個事務并接受數(shù)據(jù)寫入。
- 在收到檢查點完成的通知之前,事務必須是“等待提交”的狀態(tài)。在故障恢復的情況下,這可能需要一些時間。如果這個時候外部系統(tǒng)關閉事務(例如超時了),那么未提交的數(shù)據(jù)就會丟失。
- Sink 任務必須能夠在進程失敗后恢復事務。
- 提交事務必須是冪等操作。也就是說,事務的重復提交應該是無效的。
可見,2PC 在實際應用同樣會受到比較大的限制。具體在項目中的選型,最終還應該是一致性級別和處理性能的權衡考量。
3.3、?Flink 和 Kafka 連接時的精確一次保證
????????在流處理的應用中,最佳的數(shù)據(jù)源當然就是可重置偏移量的消息隊列了;它不僅可以提供數(shù)據(jù)重放的功能,而且天生就是以流的方式存儲和處理數(shù)據(jù)的。所以作為大數(shù)據(jù)工具中消息隊列的代表,Kafka 可以說與 Flink 是天作之合,實際項目中也經(jīng)常會看到以 Kafka 作為數(shù)據(jù)源和寫入的外部系統(tǒng)的應用。這里,我們就來具體討論一下 Flink 和 Kafka 連接時,怎樣保證端到端的 exactly-once 狀態(tài)一致性。
這里我們的并行度為2,輸入端是 Kafka ,數(shù)據(jù)從 Kafka產(chǎn)生,經(jīng)過 Flink 處理之后再次輸出到 Kafka。
1. 我們看到,兩個并行度下,Kafka 的兩個分區(qū)分別把單詞 ‘c’?和 ‘a(chǎn)’ 發(fā)送到下游的Source1 和 Source2,此時正好 JobManager 發(fā)出 Barrier ,于是Source1 和 Source2將各自的偏移量持久化到檢查點當中。
2. 這里我們不考慮中間Flink內(nèi)部算子怎么操作持以及久化檢查點的,我們主要關心輸出端是如何實現(xiàn)精確一次的。我們看到第一條數(shù)據(jù)到了 Sink 算子后(注意是整個程序的第一條數(shù)據(jù)并不是每個sink節(jié)點接收到的第一條數(shù)據(jù),這里的圖例有誤),?Sink 節(jié)點開啟第一次事務(也就是第一個數(shù)據(jù)到下一個 Barrier 之間的數(shù)據(jù)將被保存為第一個版本的檢查點狀態(tài)),預提交開始。同時會將事務的狀態(tài)保存到狀態(tài)。
3. 預提交階段:到達Sink的數(shù)據(jù)會調(diào)用 Kafka producer 的 send() 方法,數(shù)據(jù)寫入緩沖區(qū),再 flush() 。此時數(shù)據(jù)寫入到 Kafka,標記為“未提交”狀態(tài),如果任意一個 Sink 節(jié)點預提交過程中出現(xiàn)失敗,整個預提交會放棄(雖然放棄,但是畢竟數(shù)據(jù)已經(jīng)寫入到了 Kafka,我們Flink不可能進去Kafka去刪除數(shù)據(jù),只能在讀取數(shù)據(jù)的時候?qū)τ跇擞洖椤邦A提交”的數(shù)據(jù)選擇視而不見)。
4. id=1的barrier到達sink節(jié)點,觸發(fā)barrier節(jié)點的本地狀態(tài)保存到hdfs本地狀態(tài),包含自身的狀態(tài)和事務快照。同時第一輪檢查點保存結束,再次開啟一個新的Kafka事務,用于該barrier后面的數(shù)據(jù)的預提交。只有第一個事務是由第一個數(shù)據(jù)開啟,之后的事務都是由barrier開啟。
5. 當全部的 節(jié)點做完本地的checkpoint,jobmanager向所有節(jié)點發(fā)送一個本輪成功的回調(diào)消息(JobManager就知道了本輪id的barrier持久化狀態(tài)任務已經(jīng)完成),預提交結束。
6. sink 收到chekpoint 完成的通知,進行事務的正式提交,將寫入Kafka的數(shù)據(jù)標記修改為“已提交”,如果發(fā)生障礙回滾到上次完成快照的時間點。
7. 成功正式提交后,Kafka 會返回一個消息給sink節(jié)點,sink節(jié)點會將存在狀態(tài)里的事務狀態(tài)修改為finished狀態(tài)。
1. 整體介紹
既然是端到端的 exactly-once,我們依然可以從三個組件的角度來進行分析:
(1)Flink 內(nèi)部
????????Flink 內(nèi)部可以通過檢查點機制保證狀態(tài)和處理結果的 exactly-once 語義(也就是開啟檢查點并設置狀態(tài)一致性語義為精準一次)。
(2)輸入端
????????輸入數(shù)據(jù)源端的 Kafka 可以對數(shù)據(jù)進行持久化保存,并可以重置偏移量(offset)。所以我們可以在 Source 任務(FlinkKafkaConsumer)中將當前讀取的偏移量保存為算子狀態(tài),寫入到檢查點中;當發(fā)生故障時,從檢查點中讀取恢復狀態(tài),并由連接器 FlinkKafkaConsumer 向 Kafka重新提交偏移量,就可以重新消費數(shù)據(jù)、保證結果的一致性了。
(3)輸出端
????????輸出端保證 exactly-once 的最佳實現(xiàn),當然就是兩階段提交(2PC)。作為與 Flink 天生一對的 Kafka,自然需要用最強有力的一致性保證來證明自己。Flink 官方實現(xiàn)的 Kafka 連接器中,提供了寫入到 Kafka 的 FlinkKafkaProducer,它就實現(xiàn)了 TwoPhaseCommitSinkFunction 接口。
也就是說我們寫入 Kafka 的過程其實是一個兩段式的提交處理完畢,得到結果寫入 Kafka 是基于事物的“預提交”,等到檢查點保存完畢才會提交事務,進行正式提交,如果中間出現(xiàn)故障,事故進行回滾,預提交就會被放棄,恢復狀態(tài)之后也只能恢復所有已確認提交的操作。
2. 需要的配置
? ? ? ? 在具體應用中,實現(xiàn)真正的端到端 exactly-once ,還需要有一些額外的配置:
- 必須啟用檢查點
- 指定 Kafka Sink 的發(fā)送級別為 DeliveryGuarantee.EXACTLY_ONCE
- 配置 Kafka 讀取數(shù)據(jù)的消費者隔離級別
? ? ? ? 這里所說的 Kafka ,是寫入的外部系統(tǒng)。預提交階段數(shù)據(jù)已經(jīng)寫入,只是被標記為“未提交”(uncommitted),而 Kafka 中默認的隔離級別 isolation.level 是 read_uncommited ,也就是可以讀取未提交的數(shù)據(jù)。這樣一來,外部應用就可以直接消費未提交的數(shù)據(jù),對于事務性的保證就失效了 。所以應該將隔離級別進行配置。
? ? ? ? 為 read_commited ,表示消費者遇到未提交的消息時,會停止從分區(qū)中消費數(shù)據(jù),直到消息被標記為已提交才會再次恢復消費。當然,這樣做的話,外部應用消費就會有顯著的延遲。
? ? ? ? 4. 事務超時配置
? ? ? ? 如果 checkpoint 周期 大于 事務時間,很可能我們要提交的時候事務已經(jīng)關閉,所以我們要保證事務的超時大于checkpoint周期。
? ? ? ? Flink 的 Kafka 連接器中配置的事務超時時間 transaction.timeout.ms?默認是一小時,而 Kafka 集群配置的事務超時時間?transaction.timeout.ms?默認是十五分鐘。所以在檢查點保存時間很長時,有可能出現(xiàn) Kafka 已經(jīng)認為事務超時了,丟棄了提交的數(shù)據(jù);而 Sink 任務認為還可以繼續(xù)等待。如果接下來檢查點保存成功,發(fā)送故障后回滾到這個檢查點的狀態(tài),這部分數(shù)據(jù)就被真正丟掉了。所以這兩個超時時間前者應該小于等于后者。
編碼演示
public class KafkaEOSDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// TODO 1. 檢查點配置
// 1. 周期為 5s 默認就是barrier對齊的精準一次
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// 2. 指定檢查點的存儲位置
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage("file:///D://Desktop//FlinkStudy/chk");// 一般我們會存到云端
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// TODO 2. 讀取 Kafka
// 從 Kafka 讀取
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") //指定kafka地址和端口
.setGroupId("lyh") // 指定消費者組id
.setTopics("like") // 指定消費的topic,可以是多個用List<String>
.setValueOnlyDeserializer(new SimpleStringSchema()) // 指定反序列化器 因為kafka是生產(chǎn)者 flink作為消費者要反序列化
.setStartingOffsets(OffsetsInitializer.latest()) // 指定flink消費kafka的策略
.build();
DataStreamSource<String> kafka_source = env
.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2000L)), "kafkaSource");
// TODO 3. 寫出到 Kafka
/* 寫到 kafka 的一致性級別: 精準一次 / 至少一次
如果是精準一次
1.必須開啟檢查點 env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)
2.必須設置事務的前綴
3.必須設置事務的超時時間: 大于 checkpoint間隔 小于 max 15分鐘
*/
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
// 指定 kafka 的地址和端口
.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
// 指定序列化器 我們是發(fā)送方 所以我們是生產(chǎn)者
.setRecordSerializer(
KafkaRecordSerializationSchema.<String>builder()
.setTopic("wc")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 開啟兩階段提交
.setTransactionalIdPrefix("lyh-") // 事務前綴
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,10*60*1000+"")
.build();
kafka_source.sinkTo(kafkaSink);
env.execute();
}
}
我們在命令行開啟一個Kafka消費者來消費Flink寫入到Kafka的"wc"主題的數(shù)據(jù),運行程序可以發(fā)現(xiàn),當我們的生產(chǎn)者剛發(fā)送數(shù)據(jù),還沒到檢查點周期結束呢就被保存了(現(xiàn)象就是生產(chǎn)者剛發(fā)送一條數(shù)據(jù),消費者已經(jīng)讀取到了)。
我們可以在源碼中看到,默認 Kafka 的消費者的隔離級別是讀未提交,這種情況下,預提交的數(shù)據(jù)也會被讀取到(這是不滿足端到端精準一次的,因為如果我們的中間出故障了,預提交的數(shù)據(jù)應該被丟棄,但是顯然現(xiàn)在預提交的數(shù)據(jù)已經(jīng)被讀取到了,事實上我們應該等到預處理的數(shù)據(jù)被標記為已提交的時候才能被讀?。晕覀冃枰渲?Kafka 的消費者隔離級別。
?我們開啟一個消費者來讀取我們Flink 寫入到 Kafka 中的數(shù)據(jù):
public class KafkaEOSSourceDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 從 Kafka 讀取
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") //指定kafka地址和端口
.setGroupId("lyh") // 指定消費者組id
.setTopics("ws") // 指定消費的topic,可以是多個用List<String>
.setValueOnlyDeserializer(new SimpleStringSchema()) // 指定反序列化器 因為kafka是生產(chǎn)者 flink作為消費者要反序列化
.setStartingOffsets(OffsetsInitializer.latest()) // 指定flink消費kafka的策略
.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
.build();
env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2000L)),"kafkaSource")
.print();
env.execute();
}
/*
* kafka 消費者的參數(shù):
* auto.reset.offsets:
* earliest: 如果有offset,從offset繼續(xù)消費;如果沒有 就從 最早 消費
* latest : 如果有offset,從offset繼續(xù)消費;如果沒有 就從 最新 消費
* flink 的 kafkaSource offset消費者策略: offsetsInitializer,默認是 earliest
* earliest: 一定從 最早 消費 (不管有沒有offset)
* latest : 一定從 最新 消費 (不管有沒有offset)
*/
}
我們需要觀察的是,當我們生產(chǎn)者生產(chǎn)一條數(shù)據(jù)后,多久才會寫入到Kafka,是不是在一個checkpoint周期(我們這里設置的是5s)之后,如果是5s之后,說明是按照2pc來提交的。
所以,端到端精準一次對輸出端(一般都是Kafka)是有要求的,比如這里就要求必須設置消費者隔離級別為 read_committed 。文章來源:http://www.zghlxwxcb.cn/news/detail-795897.html
總結
Flink 的容錯機制終于是過完了,用時3天左右,收獲滿滿,期待下次復習以及背面試題的時候再來了解,這種底層的原理是真的有意思。希望 Flink 以后可以是我工作的主要工具,太愛了。文章來源地址http://www.zghlxwxcb.cn/news/detail-795897.html
到了這里,關于Flink(十二)【容錯機制】的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!