1. 時(shí)間語義
這里先介紹一下什么是時(shí)間語義
,時(shí)間語義
在Flink中是一種很重要的概念,下面介紹的水位線
就是基于時(shí)間語義
來講的。
在Flink中我們提到的時(shí)間語義一般指的是事件時(shí)間
和處理時(shí)間
:
-
處理時(shí)間(Processing Time)
,一般指執(zhí)行處理操作的系統(tǒng)時(shí)間,也就是Flink的窗口算子對(duì)該數(shù)據(jù)的操作時(shí)間。 -
事件時(shí)間(Event Time)
, 一般指每個(gè)事件在對(duì)應(yīng)設(shè)備上發(fā)生的時(shí)間,也就是數(shù)據(jù)的生成的時(shí)間。
Flink中之所以會(huì)出現(xiàn)這兩種時(shí)間語義,是因?yàn)镕link的分布式系統(tǒng)會(huì)有網(wǎng)絡(luò)傳輸延遲以及時(shí)鐘飄逸,處理時(shí)間
相對(duì)于事件時(shí)間
會(huì)有所滯后,并且數(shù)據(jù)在網(wǎng)絡(luò)以及Flink
中的傳輸是是亂序的。Flink的1.12版本之前默認(rèn)使用的是處理時(shí)間
,之后已經(jīng)將事件時(shí)間
作為默認(rèn)
的時(shí)間語義。
對(duì)于 先產(chǎn)生的數(shù)據(jù)先被處理
,這就要求需要保證數(shù)據(jù)的到達(dá)順序,但是因?yàn)榫W(wǎng)絡(luò)傳輸延遲不確定性,是無法保證數(shù)據(jù)的到達(dá)順序,在這種情況下就不能簡單使用數(shù)據(jù)自帶的時(shí)間戳當(dāng)做時(shí)鐘,而是需要另外的標(biāo)志來表達(dá)事件時(shí)間
的進(jìn)展,在Flink中這就是事件時(shí)間
的水位線
。
2. 水位線
Flink中的水位線(Watermark)是一種用來表示事件時(shí)間進(jìn)展的機(jī)制,它可以幫助Flink系統(tǒng)及時(shí)處理延遲數(shù)據(jù),并保證處理結(jié)果的正確性。具體來說,水位線是一個(gè)時(shí)間戳,表示對(duì)于所有時(shí)間戳小于該水位線的事件已經(jīng)全部到達(dá),可以進(jìn)行相應(yīng)的計(jì)算處理。
2.1 事件時(shí)間和窗口
在實(shí)際應(yīng)用中,一般都會(huì)采用事件時(shí)間語義,而水位線就是基于事件時(shí)間提出的概念。和水位線相關(guān)的還有一個(gè)窗口概念,事件時(shí)間和窗口也是有很大的關(guān)系。
這里舉一個(gè)例子,我們有一個(gè)班車系統(tǒng),班車上裝的都是帶有生產(chǎn)時(shí)間的商品,如果有一輛車的開車時(shí)間是8點(diǎn),我們需要這輛車到九點(diǎn)就開始發(fā)車。商品一個(gè)個(gè)到達(dá)車上,車上的商品時(shí)間從8點(diǎn)開始依次增長。當(dāng)?shù)竭_(dá)車上的商品生產(chǎn)時(shí)間是九點(diǎn)的時(shí)候,這時(shí)候車門關(guān)閉開始發(fā)車。
在這個(gè)過程中,我們說的班車就相當(dāng)于窗口,定義了一個(gè)8點(diǎn)到九點(diǎn)的窗口。并且我們會(huì)定義一個(gè)邏輯時(shí)鐘,這個(gè)邏輯時(shí)鐘是基于事件時(shí)間來的,不會(huì)自動(dòng)流逝。時(shí)鐘的進(jìn)展就是靠著新的數(shù)據(jù)上的事件時(shí)間時(shí)間戳來推動(dòng)的。這樣就不需要依賴系統(tǒng)的時(shí)間,無論什么時(shí)候進(jìn)行統(tǒng)計(jì)處理,得到的結(jié)果都是正確的。
2.2 什么是水位線
在事件時(shí)間語義下,我們可以不依賴系統(tǒng)時(shí)間,而是基于數(shù)據(jù)自帶的時(shí)間戳去定義了一個(gè)時(shí)鐘,用來表示當(dāng)前時(shí)間的進(jìn)展。這樣每個(gè)并行子任務(wù)都會(huì)有一個(gè)自己的邏輯時(shí)鐘,它的前進(jìn)是靠數(shù)據(jù)的時(shí)間戳來驅(qū)動(dòng)的。
但是在分布式系統(tǒng)中,會(huì)存在一些問題,因?yàn)閿?shù)據(jù)本身在處理轉(zhuǎn)換過程中會(huì)發(fā)生變化,如果遇到窗口聚合的操作,呢么下游的數(shù)據(jù)就會(huì)變少,時(shí)間進(jìn)度的控制就不精細(xì)了。另外,數(shù)據(jù)向下游任務(wù)傳遞時(shí),一般只能傳輸給一個(gè)子任務(wù)(除廣播外),這樣其他的并行子任務(wù)的時(shí)鐘就無法推進(jìn)了。
所以時(shí)鐘也需要以數(shù)據(jù)的形式傳遞出去,告訴下游任務(wù)當(dāng)前時(shí)間的進(jìn)展;而且時(shí)鐘傳遞過程也不會(huì)因?yàn)檫\(yùn)算而停滯。解決這個(gè)問題的想法就是在數(shù)據(jù)流種加入一個(gè)時(shí)間標(biāo)記,記錄當(dāng)前的事件時(shí)間并且廣播到下游,當(dāng)下游收到這個(gè)標(biāo)記就可以更新自己的時(shí)鐘了。這種用來衡量事件時(shí)間進(jìn)展的標(biāo)記在Flink中就被稱作水位線。
如圖 6-5 所示,每個(gè)事件產(chǎn)生的數(shù)據(jù),都包含了一個(gè)時(shí)間戳,我們直接用一個(gè)整數(shù)表示。這里沒有指定單位,可以理解為秒或者毫秒(方便起見,下面講述統(tǒng)一認(rèn)為是秒)。當(dāng)產(chǎn)生于2 秒的數(shù)據(jù)到來之后,當(dāng)前的事件時(shí)間就是 2 秒;在后面插入一個(gè)時(shí)間戳也為 2 秒的水位線,隨著數(shù)據(jù)一起向下游流動(dòng)。而當(dāng) 5 秒產(chǎn)生的數(shù)據(jù)到來之后,同樣在后面插入一個(gè)水位線,時(shí)間戳也為 5,當(dāng)前的時(shí)鐘就推進(jìn)到了 5 秒。這樣,如果出現(xiàn)下游有多個(gè)并行子任務(wù)的情形,我們只要將水位線廣播出去,就可以通知到所有下游任務(wù)當(dāng)前的時(shí)間進(jìn)度了。
水位線就像它的名字一樣,是數(shù)據(jù)流中的一部分,隨著數(shù)據(jù)一起流動(dòng),在不同任務(wù)之間傳輸。不過水位線也存在有序和亂序區(qū)分:
2.2.1 有序流中的水位線
在理想狀態(tài)下,數(shù)據(jù)按照順序排好隊(duì)進(jìn)入數(shù)據(jù)流,并且處理的過程遵循先來后到的原則,這樣每個(gè)數(shù)據(jù)中提取的時(shí)間戳就會(huì)保持從小到大的增長,而水位線也會(huì)不斷增長、事件時(shí)鐘也不斷向前推進(jìn)。
但是在實(shí)際應(yīng)用中,數(shù)據(jù)量非常大,還有可能好多數(shù)據(jù)的事件時(shí)間戳都是相同的,每一個(gè)都插入水位線是做了很多無用功的。所以為了提高效率一般每隔一段時(shí)間生成一個(gè)水位線,這個(gè)水位線的時(shí)間戳就是當(dāng)前最新數(shù)據(jù)的時(shí)間戳,就如下圖所示:
這里注意水位線插入的周期本身也是時(shí)間概念,并且這個(gè)周期時(shí)間是指處理時(shí)間也就是系統(tǒng)時(shí)間,而不是事件時(shí)間,如果使用事件時(shí)間就陷入了死循環(huán)了。
2.2.2 亂序流中的水位線
有序流的處理非常簡單,但是在分布式系統(tǒng)中,會(huì)因?yàn)閿?shù)據(jù)在節(jié)點(diǎn)的傳輸以及網(wǎng)絡(luò)傳輸?shù)难舆t不確定性導(dǎo)致順序發(fā)生改變,這就是亂序數(shù)據(jù)。
這里所說的亂序是指數(shù)據(jù)的先后順序不一致,主要就是基于數(shù)據(jù)的產(chǎn)生時(shí)間而言的。如圖 6-7 所示,一個(gè) 7 秒時(shí)產(chǎn)生的數(shù)據(jù),生成時(shí)間自然要比 9 秒的數(shù)據(jù)早;但是經(jīng)過數(shù)據(jù)緩存和傳輸之后,處理任務(wù)可能先收到了 9 秒的數(shù)據(jù),之后 7 秒的數(shù)據(jù)才姍姍來遲。這時(shí)如果我們希望插入水位線,來指示當(dāng)前的事件時(shí)間進(jìn)展,又該怎么做呢?
解決思路也很簡單:我們插入新的水位線時(shí),要先判斷一下時(shí)間戳是否比之前的大,否則就不再生成新的水位線,如圖 6-8 所示。也就是說,只有數(shù)據(jù)的時(shí)間戳比當(dāng)前時(shí)鐘大,才能推動(dòng)時(shí)鐘前進(jìn),這時(shí)才插入水位線。
考慮到大量數(shù)據(jù)同時(shí)到來的處理效率,我們同樣可以周期性的生成水位線。這時(shí)只需要保存一下之前所有數(shù)據(jù)中的最大時(shí)間戳,需要插入水位線時(shí),就直接以它作為時(shí)間戳生成新的水位線,如圖 6-9 所示。但是又如何處理遲到的數(shù)據(jù)?其實(shí)也很簡單,那就是等待幾秒,比如上圖當(dāng)收到9秒的數(shù)據(jù),等待2秒也就是7秒,這時(shí)候7秒的數(shù)據(jù)到來,數(shù)據(jù)就到齊了。
下圖是我們使用周期性的方式生成等待2秒的水位線
另外需要注意的是,這里一個(gè)窗口所收集的數(shù)據(jù),并不是之前所有已經(jīng)到達(dá)的數(shù)據(jù)。因?yàn)閿?shù)據(jù)屬于哪個(gè)窗口,是由數(shù)據(jù)本身的時(shí)間戳決定的,一個(gè)窗口只會(huì)收集真正屬于它的那些數(shù)據(jù)。也就是說,上圖中盡管水位線 W(20)之前有時(shí)間戳為 22 的數(shù)據(jù)到來,10~20 秒的窗口中也不會(huì)收集這個(gè)數(shù)據(jù),進(jìn)行計(jì)算依然可以得到正確的結(jié)果。
2.2.3 水位線的特性
- 水位線是插入到數(shù)據(jù)流中的一種標(biāo)記,可以認(rèn)為是一種特殊的數(shù)據(jù)
- 水位線主要內(nèi)容是一個(gè)時(shí)間戳,用來表示當(dāng)前事件時(shí)間的進(jìn)展
- 水位線是基于數(shù)據(jù)的時(shí)間戳生成的
- 水位線的時(shí)間戳必須單調(diào)遞增,以確保任務(wù)的事件時(shí)間時(shí)鐘一直向前推進(jìn)
- 水位線可以通過設(shè)置延遲,來保證正確處理亂序數(shù)據(jù)
- 一個(gè)水位線Watermark(t) 表示在當(dāng)前流中事件時(shí)間已經(jīng)達(dá)到了時(shí)間戳t,這代表t之前的所有數(shù)據(jù)都到齊了,之后流中不會(huì)出現(xiàn)時(shí)間戳t’<= t的數(shù)據(jù)
水位線是Flink流處理中保證結(jié)果正確性的核心機(jī)制,它往往會(huì)跟窗口一起配合,完成對(duì)亂序數(shù)據(jù)的正確處理。
2.3 如何生成水位線
水位線是保證窗口處理結(jié)果的正確性的,如果不能正確處理所有亂序數(shù)據(jù),可以嘗試調(diào)大延遲時(shí)間,但是在實(shí)際應(yīng)用中不是隨便調(diào)整的。
2.3.1 生成水位線的總體原則
只能盡量保證水位線的正確,如果想對(duì)結(jié)果正確性要求很高,那就需要設(shè)置等待時(shí)間長一點(diǎn),但是等待的時(shí)間越長,漏掉數(shù)據(jù)的概率越低。但是這樣做的代價(jià)就是處理的實(shí)時(shí)性降低了,所以我們希望能處理的更快,更實(shí)時(shí),就必須將水位線設(shè)置的低一些。而對(duì)于漏掉的數(shù)據(jù),F(xiàn)link提供了窗口處理遲到數(shù)據(jù)。
Flink中的水位線其實(shí)就是流處理中對(duì)延遲和結(jié)果正確性的一個(gè)權(quán)衡機(jī)制,而且把控制權(quán)交給了程序員,我們可以在代碼中定義水位線的生成策略。
2.3.2 水位線的生成策略(Watermark Strategies)
Flink
的DataStream API
中,提供了用于生成水位線的方法.assignTimestampsAndWatermarks()
,主要是用來為流中的數(shù)據(jù)分配時(shí)間戳,并生成水位線來指示事件時(shí)間,
// 為數(shù)據(jù)流中的元素分配時(shí)間戳并生成水位線以表示事件時(shí)間進(jìn)度。給定的 WatermarkStrategy 用于創(chuàng)建 TimestampAssigner 和 WatermarkGenerator。
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)
具體使用時(shí)直接用DataSream
調(diào)用該方法即可,與普通transform
方法完全一樣
DataStream<String> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.STRING_TYPE_INFO);
DataStream<String> withTimestampsAndWatermarks = stringDataStreamSource.assignTimestampsAndWatermarks(<watermark strategy>);
.assignTimestampsAndWatermarks()
方法需要傳入一個(gè)WatermarkStrategy
作為參數(shù),這個(gè)參數(shù)就是水位線生成策略
。WatermarkStrategy
中包含了一個(gè)時(shí)間戳分配器(TimestampAssigner)
和一個(gè)水位線生成器(WatermarkGenerator)
。
@Public
public interface WatermarkStrategy<T>
extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
@Override
default TimestampAssigner<T> createTimestampAssigner(
TimestampAssignerSupplier.Context context) {
return new RecordTimestampAssigner<>();
}
}
-
TimestampAssigner
:主要負(fù)責(zé)從流中數(shù)據(jù)元素的某個(gè)字段中提取時(shí)間戳,并分配給元素。時(shí)間戳的分配是生成水位線的基礎(chǔ)。 -
WatermarkGenerator
:主要負(fù)責(zé)按照既定的方式,基于時(shí)間戳生成水位線。在WatermarkGenerator
接口中,主要又有兩個(gè)方法:onEvent()
和onPeriodicEmit()
。 -
onEvent
:每個(gè)事件(數(shù)據(jù))到來都會(huì)調(diào)用的方法,它的參數(shù)有當(dāng)前事件,時(shí)間戳以及允許發(fā)出水位線的一個(gè)WatermarkOutput,可以基于事件做各種操作。 -
onPeriodicEmit
:周期性調(diào)用的方法,可以由WatermarkOutput
發(fā)出水位線。周期時(shí)間為處理時(shí)間,可以調(diào)用環(huán)境配置的.setAutoWatermarkInterval()
方法來設(shè)置,默認(rèn)為200ms
。
@Public
public interface WatermarkGenerator<T> {
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
void onPeriodicEmit(WatermarkOutput output);
}
也可以通過設(shè)置周期時(shí)間env.getConfig().setAutoWatermarkInterval(60 * 1000L);
2.3.3 Flink內(nèi)置水位線生成器
Flink提供了兩種內(nèi)置的水位線生成器(WatermarkGenerator),分別對(duì)應(yīng)著有序流和亂序流的場景。
2.3.3.1 有序流
對(duì)于有序流主要特點(diǎn)就是時(shí)間戳單調(diào)增長,所以永遠(yuǎn)不會(huì)出現(xiàn)遲到的數(shù)據(jù)問題。這是周期性生成水位線的最簡單的場景,直接調(diào)用WatermarkStategy.forMonotonousTimestamps()
方法就可以實(shí)現(xiàn),簡單說就是拿當(dāng)前最大時(shí)間戳作為水位線就可以了。
static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
return (ctx) -> new AscendingTimestampsWatermarks<>();
}
源碼可以看到有序流返回的水位線生成器策略是AscendingTimestampsWatermarks
對(duì)象:示例代碼
:
ArrayList<Event> list = new ArrayList<>();
list.add(new Event("ming","www.baidu1.com",1200L));
list.add(new Event("xiaohu","www.baidu5.com",1267L));
list.add(new Event("ming","www.baidu7.com",4200L));
list.add(new Event("xiaohu","www.baidu8.com",5500L));
DataStream<Event> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
// 使用forMonotonousTimestamps
DataStream<Event> watermarks = stringDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.getTimestamp();
}
})
);
上面的代碼中我們調(diào)用的.withTimestampAssigner()
方法,將數(shù)據(jù)中的timestamp
字段提取出來作為時(shí)間戳
分配給數(shù)據(jù)元素;然后使用內(nèi)置
的有序流水位線生成器
構(gòu)造出了生成策略,這樣提取出來的數(shù)據(jù)時(shí)間戳
就是我們處理計(jì)算時(shí)間的事件時(shí)間
。
要注意的是這里的時(shí)間戳和水位線的單位必須都是毫秒。
2.3.3.2 亂序流
由于亂序流中需要等待遲到的數(shù)據(jù)到齊,所以必須設(shè)置一個(gè)固定量的延遲時(shí)間(Fixed Amount of Lateness)。這時(shí)生成的水位線的時(shí)間戳就是當(dāng)前數(shù)據(jù)流中最大的時(shí)間戳減去延遲的結(jié)果,相當(dāng)于把表調(diào)慢,當(dāng)前時(shí)鐘會(huì)滯后于數(shù)據(jù)的最大時(shí)間戳。調(diào)用WatermarkStrategy. forBoundedOutOfOrderness()
方法就可以實(shí)現(xiàn)。
static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
}
不過這個(gè)方法需要傳一個(gè)maxOutOfOrderness
參數(shù),表示最大亂序程度
, 它表示數(shù)據(jù)流中亂序數(shù)據(jù)時(shí)間戳的最大差值;如果我們能確定亂序程度,呢么設(shè)置對(duì)應(yīng)時(shí)間長度的延遲就可以等到所有的亂序數(shù)據(jù)了。示例代碼
:
DataStream<Event> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
DataStream<Event> watermarks = stringDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMillis(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.getTimestamp();
}
})
);
上面的代碼提取了timestamp字段作為時(shí)間戳,并且以5毫秒的延遲時(shí)間創(chuàng)建了處理亂序流的水位線生成器。
查看有序流的水位線生成器和亂序流的水位線生成器的源碼可以發(fā)現(xiàn),其實(shí)有序流的水位線生成器本質(zhì)上和亂序流是一樣的,相當(dāng)于延遲為0秒的亂序流水位線生成器。
@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
public AscendingTimestampsWatermarks() {
super(Duration.ofMillis(0));
}
}
@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
private long maxTimestamp;
private final long outOfOrdernessMillis;
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}
這里還要注意一下,亂序流中生成的水位線真正的時(shí)間戳,其實(shí)是當(dāng)前最大時(shí)間戳 - 延遲時(shí)間 - 1
,這里的單位是毫秒。為什么要減去1毫秒是以為Flink中的窗口都是左開右閉的(8,7]
表示7點(diǎn)到8點(diǎn)的數(shù)據(jù)。
2.3.4 自定義水位線策略
在WatermarkStrategy
中,時(shí)間戳分配器TimestampAssigner
都是大同小異的,指定字段提取時(shí)間戳就可以了;不同的點(diǎn)在于WatermarkGenerator
的實(shí)現(xiàn),整體來說,有兩種生成水位線的方式:一種是周期性的(Periodic)
,另一種是斷點(diǎn)式的(Punctuated)
。
創(chuàng)建自定義水位線離不開WatermarkGenerator
的onEvent()
和 onPeriodicEmit()
,前者是每個(gè)事件到來時(shí)調(diào)用,后者由框架周期性的調(diào)用。周期性調(diào)用的方法中onPeriodicEmit()
發(fā)出水位線就是周期性生成水位線,由事件觸發(fā)的 方法中onEvent()
發(fā)出的水位線就是斷點(diǎn)式生成的水位線。兩種方式的不同就集中體現(xiàn)在這兩個(gè)方法上。
2.3.4.1 周期性水位線生成器(Periodic Generator)
周期性生成器一般是通過onEvent()
觀察判斷輸入的事件,而在onPeriodicEmit()
里發(fā)出水位線。
示例代碼:
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 從集合中讀取數(shù)據(jù)
ArrayList<Event> list = new ArrayList<>();
list.add(new Event("ming","www.baidu1.com",1200L));
list.add(new Event("xiaohu","www.baidu5.com",1267L));
list.add(new Event("ming","www.baidu7.com",4200L));
list.add(new Event("xiaohu","www.baidu8.com",5500L));
DataStream<Event> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
DataStream<Event> watermarks = stringDataStreamSource.assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {
@Override
public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.getTimestamp();
}
};
}
@Override
public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<Event>() {
private Long delayTime = 5000L; // 延遲時(shí)間
private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 觀察到的最大時(shí)間戳
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
// 每來一條數(shù)據(jù)就調(diào)用一次
maxTs = Math.max(event.timestamp, maxTs); // 更新最大時(shí)間戳
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 發(fā)射水位線,默認(rèn) 200ms 調(diào)用一次 可以使用 env.getConfig().setAutoWatermarkInterval(60 * 1000L); 調(diào)整周期時(shí)間
output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
}
};
}
});
}
2.3.4.2 斷點(diǎn)式水位線生成器(Punctuated Generator)
斷點(diǎn)式生成器會(huì)不停的檢測onEvent()
事件,當(dāng)發(fā)現(xiàn)帶有水位線信息的特殊事件時(shí),就立即發(fā)出水位線。一般來說,斷點(diǎn)式生成器不會(huì)通過onPeriodicEmit()
發(fā)出水位線。示例代碼
:
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 從集合中讀取數(shù)據(jù)
ArrayList<Event> list = new ArrayList<>();
list.add(new Event("ming","www.baidu1.com",1200L));
list.add(new Event("xiaohu","www.baidu5.com",1267L));
list.add(new Event("ming","www.baidu7.com",4200L));
list.add(new Event("xiaohu","www.baidu8.com",5500L));
DataStream<Event> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
DataStream<Event> watermarks = stringDataStreamSource.assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {
@Override
public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.getTimestamp();
}
};
}
@Override
public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<Event>() {
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
// 只有在遇到特定的 itemId 時(shí),才發(fā)出水位線
if (event.getUser().equals("Mary")) {
output.emitWatermark(new Watermark(event.getTimestamp() - 1));
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 不需要做任何事情,因?yàn)槲覀冊?onEvent 方法中發(fā)射了水位線
}
};
}
});
}
我們在 onEvent()
中判斷當(dāng)前事件的 user 字段,只有遇到“Mary”這個(gè)特殊的值時(shí),才調(diào)用output.emitWatermark()
發(fā)出水位線。這個(gè)過程是完全依靠事件來觸發(fā)的,所以水位線的生成一定在某個(gè)數(shù)據(jù)到來之后。
2.3.5 在自定義數(shù)據(jù)源中發(fā)送水位線
在自定義的數(shù)據(jù)源中抽取事件時(shí)間,然后發(fā)送水位線。這里要注意的是,在自定義數(shù)據(jù)源中發(fā)送了水位線以后,就不能再在程序中使用assignTimestampsAndWatermarks
方法來生成水位線了。在自定義數(shù)據(jù)源中生成水位線和在程序中使用assignTimestampsAndWatermarks
方法生成水位線二者只能取其一。示例代碼
:
env.addSource(new SourceFunction<Event>() {
private boolean running = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
Random random = new Random();
String[] userArr = {"Mary", "Bob", "Alice"};
String[] urlArr = {"./home", "./cart", "./prod?id=1"};
while (running) {
long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒時(shí)間戳
String username = userArr[random.nextInt(userArr.length)];
String url = urlArr[random.nextInt(urlArr.length)];
Event event = new Event(username, url, currTs);
// 使用 collectWithTimestamp 方法將數(shù)據(jù)發(fā)送出去,并指明數(shù)據(jù)中的時(shí)間戳的字段
ctx.collectWithTimestamp(event, event.timestamp);
// 發(fā)送水位線
ctx.emitWatermark(new Watermark(event.timestamp - 1L));
Thread.sleep(1000L);
}
}
@Override
public void cancel() {
this.running = false;
}
});
ctx.collectWithTimestamp(event, event.timestamp);
這里發(fā)出的時(shí)間戳,其實(shí)就是我們經(jīng)常使用的內(nèi)置水位線生成器的方法中的recordTimestamp
return new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.getTimestamp();
}
};
在自定義水位線中生成水位線相比 assignTimestampsAndWatermarks 方法更加靈活,可以任意的產(chǎn)生周期性的、非周期性的水位線,以及水位線的大小也完全由我們自定義。所以非常適合用來編寫 Flink 的測試程序,測試 Flink 的各種各樣的特性。
2.4 水位線的傳遞
在每一個(gè)分區(qū)中都有一個(gè)分區(qū)水位線。
如圖 6-12 所示,當(dāng)前任務(wù)的上游,有四個(gè)并行子任務(wù),所以會(huì)接收到來自四個(gè)分區(qū)的水位線;而下游有三個(gè)并行子任務(wù),所以會(huì)向三個(gè)分區(qū)發(fā)出水位線。具體過程如下:
- 上游并行子任務(wù)發(fā)來不同的水位線,當(dāng)前任務(wù)會(huì)為每一個(gè)分區(qū)設(shè)置一個(gè)“分區(qū)水位線”(Partition Watermark),這是一個(gè)分區(qū)時(shí)鐘;而當(dāng)前任務(wù)自己的時(shí)鐘,就是所有分區(qū)時(shí)鐘里最小的那個(gè)。
- 當(dāng)有一個(gè)新的水位線(第一分區(qū)的 4)從上游傳來時(shí),當(dāng)前任務(wù)會(huì)首先更新對(duì)應(yīng)的分區(qū)時(shí)鐘;然后再次判斷所有分區(qū)時(shí)鐘中的最小值,如果比之前大,說明事件時(shí)間有了進(jìn)展,當(dāng)前任務(wù)的時(shí)鐘也就可以更新了。這里要注意,更新后的任務(wù)時(shí)鐘,并不一定是新來的那個(gè)分區(qū)水位線,比如這里改變的是第一分區(qū)的時(shí)鐘,但最小的分區(qū)時(shí)鐘是第三分區(qū)的 3,于是當(dāng)前任務(wù)時(shí)鐘就推進(jìn)到了 3。當(dāng)時(shí)鐘有進(jìn)展時(shí),當(dāng)前任務(wù)就會(huì)將自己的時(shí)鐘以水位線的形式,廣播給下游所有子任務(wù)。
- 再次收到新的水位線(第二分區(qū)的 7)后,執(zhí)行同樣的處理流程。首先將第二個(gè)分區(qū)時(shí)鐘更新為 7,然后比較所有分區(qū)時(shí)鐘;發(fā)現(xiàn)最小值沒有變化,那么當(dāng)前任務(wù)的時(shí)鐘也不變,也不會(huì)向下游任務(wù)發(fā)出水位線。
- 同樣道理,當(dāng)又一次收到新的水位線(第三分區(qū)的 6)之后,第三個(gè)分區(qū)時(shí)鐘更新為6,同時(shí)所有分區(qū)時(shí)鐘最小值變成了第一分區(qū)的 4,所以當(dāng)前任務(wù)的時(shí)鐘推進(jìn)到 4,并發(fā)出時(shí)間戳為 4 的水位線,廣播到下游各個(gè)分區(qū)任務(wù)。
水位線在上下游任務(wù)之間的傳遞,非常巧妙地避免了分布式系統(tǒng)中沒有統(tǒng)一時(shí)鐘的問題,每個(gè)任務(wù)都以“處理完之前所有數(shù)據(jù)”為標(biāo)準(zhǔn)來確定自己的時(shí)鐘,就可以保證窗口處理的結(jié)果總是正確的。
2.5 水位線的總結(jié)
水位線在事件時(shí)間的世界里面,承擔(dān)了時(shí)鐘的角色。也就是說在事件時(shí)間的流中,水位線是唯一的時(shí)間尺度。如果想要知道現(xiàn)在幾點(diǎn),就要看水位線的大小。后面講到的窗口的閉合,以及定時(shí)器的觸發(fā)都要通過判斷水位線的大小來決定是否觸發(fā)。
水位線是一種特殊的事件,由程序員通過編程插入的數(shù)據(jù)流里面,然后跟隨數(shù)據(jù)流向下游流動(dòng)。水位線的默認(rèn)計(jì)算公式:水位線 = 觀察到的最大事件時(shí)間 – 最大延遲時(shí)間 – 1 毫秒。
所以這里涉及到一個(gè)問題,就是不同的算子看到的水位線的大小可能是不一樣的。因?yàn)橄掠蔚乃阕涌赡懿⑽唇邮盏絹碜陨嫌嗡阕拥乃痪€,導(dǎo)致下游算子的時(shí)鐘要落后于上游算子的時(shí)鐘。比如 map->reduce 這樣的操作,如果在 map 中編寫了非常耗時(shí)間的代碼,將會(huì)阻塞水位線的向下傳播,因?yàn)樗痪€也是數(shù)據(jù)流中的一個(gè)事件,位于水位線前面的數(shù)據(jù)如果沒有處理完畢,那么水位線不可能彎道超車?yán)@過前面的數(shù)據(jù)向下游傳播,也就是說會(huì)被前面的數(shù)據(jù)阻塞。這樣就會(huì)影響到下游算子的聚合計(jì)算,因?yàn)橄掠嗡阕又袩o論由窗口聚合還是定時(shí)器的操作,都需要水位線才能觸發(fā)執(zhí)行。這也就告訴了我們,在編寫 Flink 程序時(shí),一定要謹(jǐn)慎的編寫每一個(gè)算子的計(jì)算邏輯,盡量避免大量計(jì)算或者是大量的 IO 操作,這樣才不會(huì)阻塞水位線的向下傳遞。
在數(shù)據(jù)流開始之前,F(xiàn)link 會(huì)插入一個(gè)大小是負(fù)無窮大(在 Java 中是-Long.MAX_VALUE)的水位線,而在數(shù)據(jù)流結(jié)束時(shí),F(xiàn)link 會(huì)插入一個(gè)正無窮大(Long.MAX_VALUE)的水位線,保證所有的窗口閉合以及所有的定時(shí)器都被觸發(fā)。
對(duì)于離線數(shù)據(jù)集,F(xiàn)link 也會(huì)將其作為流讀入,也就是一條數(shù)據(jù)一條數(shù)據(jù)的讀取。在這種情況下,F(xiàn)link 對(duì)于離線數(shù)據(jù)集,只會(huì)插入兩次水位線,也就是在最開始處插入負(fù)無窮大的水位線,在結(jié)束位置插入一個(gè)正無窮大的水位線。因?yàn)橹恍枰迦雰纱嗡痪€,就可以保證計(jì)算的正確,無需在數(shù)據(jù)流的中間插入水位線了。文章來源:http://www.zghlxwxcb.cn/news/detail-722049.html
水位線的重要性在于它的邏輯時(shí)鐘特性,而邏輯時(shí)鐘這個(gè)概念可以說是分布式系統(tǒng)里面最為重要的概念之一了,理解透徹了對(duì)理解各種分布式系統(tǒng)非常有幫助。文章來源地址http://www.zghlxwxcb.cn/news/detail-722049.html
到了這里,關(guān)于【Flink】Flink 中的時(shí)間和窗口之水位線(Watermark)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!