Apache Flink 是一個(gè)分布式流處理框架,可以通過(guò)多種方式處理延遲數(shù)據(jù)。這里有幾個(gè)選項(xiàng):
??水位線WaterMarker:Flink 使用水位線來(lái)跟蹤流中的時(shí)間進(jìn)度。水位線是由源發(fā)出的周期性時(shí)間戳,用于確定一條數(shù)據(jù)的延遲時(shí)間。您可以根據(jù)水位線指定數(shù)據(jù)的最大延遲。例如,您可以指定延遲超過(guò) 10 個(gè)水位線的數(shù)據(jù)應(yīng)該被刪除。
??窗口延遲數(shù)據(jù):Flink 允許為預(yù)計(jì)在窗口內(nèi)無(wú)序到達(dá)的數(shù)據(jù)指定最大延遲(延遲)。allowedLateness可以在定義窗口時(shí)使用參數(shù)設(shè)置此最大延遲。任何在最大延遲時(shí)間之后到達(dá)的數(shù)據(jù)都將被丟棄。
??側(cè)輸入流:Flink 還允許您使用側(cè)輸入來(lái)處理延遲數(shù)據(jù)。輔助輸入是除了主輸入流之外還使用的數(shù)據(jù)集。您可以使用輔助輸入來(lái)存儲(chǔ)鍵的最新已知值,然后在遲到的數(shù)據(jù)到達(dá)時(shí)使用該值更新主輸入流。Output Mode:Flink 還允許您指定在使用帶參數(shù)的窗口時(shí)應(yīng)處理多晚的數(shù)據(jù)OutputMode。您可以選擇僅輸出延遲數(shù)據(jù)、僅輸出更新數(shù)據(jù)或同時(shí)輸出延遲數(shù)據(jù)和更新數(shù)據(jù)。
1、設(shè)置水位線延遲時(shí)間
???????水位線是事件時(shí)間的進(jìn)展,它是我們整個(gè)應(yīng)用的全局邏輯時(shí)鐘。水位線生成之后,會(huì)隨著數(shù)據(jù)在任務(wù)間流動(dòng),從而給每個(gè)任務(wù)指明當(dāng)前的事件時(shí)間。所以從這個(gè)意義上講,水位線是一個(gè)覆蓋萬(wàn)物的存在,它并不只針對(duì)事件時(shí)間窗口有效。之前我們講到觸發(fā)器時(shí)曾提到過(guò)“定時(shí)器”,時(shí)間窗口的操作底層就是靠定時(shí)器來(lái)控制觸發(fā)的。既然是底層機(jī)制,定時(shí)器自然就不可能是窗口的專利了;事實(shí)上它是 Flink 底層 API— —處理函數(shù)(process function)的重要部分。
???????所以水位線其實(shí)是所有事件時(shí)間定時(shí)器觸發(fā)的判斷標(biāo)準(zhǔn)。那么水位線的延遲,當(dāng)然也就是全局時(shí)鐘的滯后,相當(dāng)于是上帝撥動(dòng)了琴弦,所有人的表都變慢了。既然水位線這么重要,那一般情況就不應(yīng)該把它的延遲設(shè)置得太大,否則流處理的實(shí)時(shí)性就會(huì)大大降低。因?yàn)樗痪€的延遲主要是用來(lái)對(duì)付分布式網(wǎng)絡(luò)傳輸導(dǎo)致的數(shù)據(jù)亂序,而網(wǎng)絡(luò)傳輸?shù)膩y序程度一般并不會(huì)很大,大多集中在幾毫秒至幾百毫秒。所以實(shí)際應(yīng)用中,我們往往會(huì)給水位線設(shè)置一個(gè)“能夠處理大多數(shù)亂序數(shù)據(jù)的小延遲”,視需求一般設(shè)在毫秒~秒級(jí)。當(dāng)我們?cè)O(shè)置了水位線延遲時(shí)間后,所有定時(shí)器就都會(huì)按照延遲后的水位線來(lái)觸發(fā)。如果一個(gè)數(shù)據(jù)所包含的時(shí)間戳,小于當(dāng)前的水位線,那么它就是所謂的“遲到數(shù)據(jù)”。
~代碼示例
// Define a window with a maximum lateness of 5 seconds
Window<Tuple2<String, Integer>> window = Window.into(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(5));
2、允許窗口處理遲到數(shù)據(jù)
???????水位線延遲設(shè)置的比較小,那之后如果仍有數(shù)據(jù)遲到該怎么辦?對(duì)于窗口計(jì)算而言,如果水位線已經(jīng)到了窗口結(jié)束時(shí)間,默認(rèn)窗口就會(huì)關(guān)閉,那么之后再來(lái)的數(shù)據(jù)就要被丟棄了。自然想到,F(xiàn)link 的窗口也是可以設(shè)置延遲時(shí)間,允許繼續(xù)處理遲到數(shù)據(jù)的。這種情況下,由于大部分亂序數(shù)據(jù)已經(jīng)被水位線的延遲等到了,所以往往遲到的數(shù)據(jù)不會(huì)太多。這樣,我們會(huì)在水位線到達(dá)窗口結(jié)束時(shí)間時(shí),先快速地輸出一個(gè)近似正確的計(jì)算結(jié)果;然后保持窗口繼續(xù)等到延遲數(shù)據(jù),每來(lái)一條數(shù)據(jù),窗口就會(huì)再次計(jì)算,并將更新后的結(jié)果輸出。這樣就可以逐步修正計(jì)算結(jié)果,最終得到準(zhǔn)確的統(tǒng)計(jì)值了。
???????類比班車的例子,我們可以這樣理解:大多數(shù)人是在發(fā)車時(shí)刻前后到達(dá)的,所以我們只要把表調(diào)慢,稍微等一會(huì)兒,絕大部分人就都上車了,這個(gè)把表調(diào)慢的時(shí)間就是水位線的延遲;到點(diǎn)之后,班車就準(zhǔn)時(shí)出發(fā)了,不過(guò)可能還有該來(lái)的人沒(méi)趕上。于是我們就先慢慢往前開(kāi),這段時(shí)間內(nèi),如果遲到的人抓點(diǎn)緊還是可以追上的;如果有人追上來(lái)了,就停車開(kāi)門讓他上來(lái),然后車?yán)^續(xù)向前開(kāi)。當(dāng)然我們的車不能一直慢慢開(kāi),需要有一個(gè)時(shí)間限制,這就是窗口的允許延遲時(shí)間。一旦超過(guò)了這個(gè)時(shí)間,班車就不再停留,開(kāi)上高速疾馳而去了。所以我們將水位線的延遲和窗口的允許延遲數(shù)據(jù)結(jié)合起來(lái),最后的效果就是先快速實(shí)時(shí)地輸出一個(gè)近似的結(jié)果,而后再不斷調(diào)整,最終得到正確的計(jì)算結(jié)果?;叵肓魈幚淼陌l(fā)展過(guò)程,這不就是著名的 Lambda 架構(gòu)嗎?原先需要兩套獨(dú)立的系統(tǒng)來(lái)同時(shí)保證實(shí)時(shí)性和結(jié)果的最終正確性,如今 Flink 一套系統(tǒng)就全部搞定了。
~代碼示例
// Define a watermark strategy that drops data more than 10 watermarks late
BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Integer>> watermarkStrategy =
//有界無(wú)序時(shí)間戳提取器,設(shè)置 watermark 延遲時(shí)間,10 秒鐘
new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Integer>>(Time.seconds(10)) {
@Override
public long extractTimestamp(Tuple2<String, Integer> element) {
// extract the timestamp from the element
return element.f0;
}
};
// Assign the watermark strategy to the input stream
DataStream<Tuple2<String, Integer>> watermarkedStream = inputStream.assignTimestampsAndWatermarks(watermarkStrategy);
// Process the watermarked stream
watermarkedStream.process(new ProcessFunction<Tuple2<String, Integer>, String>() {
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) {
// data with a timestamp more than 10 watermarks in the past will not be processed
}
});
3、將遲到數(shù)據(jù)放入窗口側(cè)輸出流
???????即使我們有了前面的雙重保證,可窗口不能一直等下去,最后總要真正關(guān)閉。窗口一旦關(guān)閉,后續(xù)的數(shù)據(jù)就都要被丟棄了。那如果真的還有漏網(wǎng)之魚又該怎么辦呢?
???????用窗口的側(cè)輸出流來(lái)收集關(guān)窗以后的遲到數(shù)據(jù)。這種方式是最后“兜底”的方法,只能保證數(shù)據(jù)不丟失;因?yàn)榇翱谝呀?jīng)真正關(guān)閉,所以是無(wú)法基于之前窗口的結(jié)果直接做更新的。我們只能將之前的窗口計(jì)算結(jié)果保存下來(lái),然后獲取側(cè)輸出流中的遲到數(shù)據(jù),判斷數(shù)據(jù)所屬的窗口,手動(dòng)對(duì)結(jié)果進(jìn)行合并更新。盡管有些煩瑣,實(shí)時(shí)性也不夠強(qiáng),但能夠保證最終結(jié)果一定是正確的。如果還用趕班車來(lái)類比,那就是車已經(jīng)上高速開(kāi)走了,這班車是肯定趕不上了。不過(guò)我們還留下了行進(jìn)路線和聯(lián)系方式,遲到的人如果想辦法輾轉(zhuǎn)到了目的地,還是可以和大部隊(duì)會(huì)合的。最終,所有該到的人都會(huì)在目的地出現(xiàn)。所以總結(jié)起來(lái),F(xiàn)link 處理遲到數(shù)據(jù),對(duì)于結(jié)果的正確性有三重保障:水位線的延遲,窗口允許遲到數(shù)據(jù),以及將遲到數(shù)據(jù)放入窗口側(cè)輸出流。
~代碼示例
// Define an OutputTag for late data
OutputTag<Tuple2<String, Integer>> lateDataOutputTag = new OutputTag<Tuple2<String, Integer>>("late-data"){};
// Apply a window to the stream with an allowed lateness of 5 seconds
DataStream<Tuple2<String, Integer>> windowedStream = inputStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(5))
.sideOutputLateData(lateDataOutputTag); //Late data enters the measurement output stream
// Process the main output stream
windowedStream.apply((window, values, out) -> {
// values in this stream will include only those that arrived within 5 seconds of the end of the window
});
// Process the late data output stream
DataStream<Tuple2<String, Integer>> lateDataStream = windowedStream.getSideOutput(lateDataOutputTag);
lateDataStream.apply((window, values, out) -> {
// values in this stream will include late data that arrived more than 5 seconds after the end of the window
});
在上面簡(jiǎn)單示例中,我們將一個(gè)窗口應(yīng)用于輸入流,最大延遲為 5 秒。該sideOutputLateData()方法用于將任何遲到的數(shù)據(jù)輸出到側(cè)輸出流,該側(cè)輸出流使用該getSideOutput()方法和lateDataOutputTagOutputTag 進(jìn)行訪問(wèn)。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-463323.html
主輸出流將僅包含在窗口結(jié)束后 5 秒內(nèi)到達(dá)的那些值,而延遲數(shù)據(jù)輸出流將包含在窗口結(jié)束后超過(guò) 5 秒到達(dá)的任何值。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-463323.html
到了這里,關(guān)于Flink對(duì)遲到數(shù)據(jù)的處理的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!