Flink 系列文章
一、Flink 專欄
Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。
-
1、Flink 部署系列
本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 -
2、Flink基礎(chǔ)系列
本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 -
3、Flik Table API和SQL基礎(chǔ)系列
本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫、表用法、查詢、窗口函數(shù)、catalog等等內(nèi)容。 -
4、Flik Table API和SQL提高與應(yīng)用系列
本部分是table api 和sql的應(yīng)用部分,和實(shí)際的生產(chǎn)應(yīng)用聯(lián)系更為密切,以及有一定開發(fā)難度的內(nèi)容。 -
5、Flink 監(jiān)控系列
本部分和實(shí)際的運(yùn)維、監(jiān)控工作相關(guān)。
二、Flink 示例專欄
Flink 示例專欄是 Flink 專欄的輔助說明,一般不會(huì)介紹知識(shí)點(diǎn)的信息,更多的是提供一個(gè)一個(gè)可以具體使用的示例。本專欄不再分目錄,通過鏈接即可看出介紹的內(nèi)容。
兩專欄的所有文章入口點(diǎn)擊:Flink 系列文章匯總索引
本文詳細(xì)介紹了eventtime和watermark,包括watermark的Flink自帶的api實(shí)現(xiàn)與自定義的實(shí)現(xiàn),同時(shí)以三個(gè)示例來展示watermark的實(shí)際可能的應(yīng)用場景。
本文依賴kafka環(huán)境可用。
本文分為四個(gè)部分,即EventTime與watermark介紹、watermark基本使用、kafka作為數(shù)據(jù)源的watermark使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn)。
一、事件時(shí)間與watermark
1、EventTime介紹
處理時(shí)間是指執(zhí)行相應(yīng)操作的機(jī)器的系統(tǒng)時(shí)間。
事件時(shí)間是每個(gè)事件在其生產(chǎn)設(shè)備上發(fā)生的時(shí)間。
下圖形象的展示了event time 和 processing time的所處階段。一般將Flink data source下的箭頭表示為到達(dá)Flink的時(shí)間,即injestion time攝入時(shí)間。
2、Eventtime與watermark
支持事件時(shí)間的流處理器需要一種方法來測量事件時(shí)間的進(jìn)度。例如,當(dāng)事件時(shí)間超過一小時(shí)結(jié)束時(shí),需要通知構(gòu)建每小時(shí)窗口的窗口運(yùn)算符,以便操作員可以關(guān)閉正在進(jìn)行的窗口。
事件時(shí)間可以獨(dú)立于處理時(shí)間進(jìn)行。例如,在一個(gè)程序中,操作員的當(dāng)前事件時(shí)間可能略微落后于處理時(shí)間(考慮接收事件的延遲),而兩者以相同的速度進(jìn)行。另一方面,另一個(gè)流程序可能會(huì)通過快進(jìn)已經(jīng)在 Kafka 主題(或其他消息隊(duì)列)中緩沖的一些歷史數(shù)據(jù),在幾秒鐘的處理中完成數(shù)周的事件時(shí)間。
Flink 中衡量事件時(shí)間進(jìn)度的機(jī)制是水印。水印作為數(shù)據(jù)流的一部分流動(dòng),并帶有時(shí)間戳 t。水?。╰) 聲明事件時(shí)間已到達(dá)該流中的時(shí)間 t,這意味著流中不應(yīng)再有時(shí)間戳為 t’ <= t 的元素(即時(shí)間戳早于或等于水印的事件)。
下圖顯示了具有(邏輯)時(shí)間戳和內(nèi)聯(lián)流動(dòng)水印的事件流。在此示例中,事件是按順序排列的(相對于其時(shí)間戳),這意味著水印只是流中的周期性標(biāo)記。
水印對于無序流至關(guān)重要,如下圖所示,事件不按其時(shí)間戳排序。通常,水印是一種聲明,即到流中的該點(diǎn),直到某個(gè)時(shí)間戳的所有事件都應(yīng)該已到達(dá)。水印到達(dá)運(yùn)算符后,運(yùn)算符可以將其內(nèi)部事件時(shí)鐘提前到水印的值。
3、Watermarks in Parallel Streams
水印在source Function處或緊隨source Function之后生成。source Function的每個(gè)并行子任務(wù)通常獨(dú)立生成其水印。這些水印定義該特定并行源的事件時(shí)間。
當(dāng)水印流經(jīng)流媒體程序時(shí),當(dāng)水印到達(dá)的時(shí)候會(huì)觸發(fā)事件時(shí)間的計(jì)算(they advance the event time at the operators where they arrive)。每當(dāng)operator觸發(fā)(advance)其事件時(shí)間時(shí),它都會(huì)為其后續(xù)運(yùn)算符在下游生成新的水印。
一些運(yùn)算符使用多個(gè)輸入流;例如,union,或 keyBy(…) 或 partition(…) 函數(shù)的運(yùn)算符。此類運(yùn)算符的當(dāng)前事件時(shí)間是其輸入流的事件時(shí)間的最小值。當(dāng)其輸入流更新其事件時(shí)間時(shí),運(yùn)算符也會(huì)更新。
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
// start so that our lowest watermark would be Long.MIN_VALUE.
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
下圖顯示了流經(jīng)并行流的事件和水印以及跟蹤事件時(shí)間的運(yùn)算符的示例。
4、Lateness延遲
某些元素可能會(huì)違反水印條件,這意味著即使在 Watermark(t) 發(fā)生之后,也會(huì)出現(xiàn)更多時(shí)間戳為 t’ <= t 的元素。事實(shí)上,在許多現(xiàn)實(shí)世界的設(shè)置中,某些元素可以任意延遲,因此無法指定某個(gè)事件時(shí)間戳的所有元素發(fā)生的時(shí)間。此外,即使延遲是有限制的,通常也不希望將水印延遲太多,因?yàn)檫@會(huì)導(dǎo)致事件時(shí)間窗口的評估延遲太多。
出于這個(gè)原因,流應(yīng)用可能會(huì)明確地期望一些后期元素。延遲元素是在系統(tǒng)的事件時(shí)鐘(由水印指示)已經(jīng)過了延遲元素時(shí)間戳的時(shí)間之后到達(dá)的元素。有關(guān)如何在事件時(shí)間窗口中使用延遲元素的搜集(一般用OutputTag)與使用視情況而定。代碼示例如下:
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>);
5、watermark介紹
watermark就是給數(shù)據(jù)再額外的加的一個(gè)時(shí)間列,watermark是個(gè)時(shí)間戳。
watermark= 數(shù)據(jù)的事件時(shí)間 - 最大允許的延遲時(shí)間或亂序時(shí)間
watermark= 當(dāng)前窗口的最大的事件時(shí)間 - 最大允許的延遲時(shí)間或亂序時(shí)間
這樣可以保證watermark水位線會(huì)一直上升(變大),不會(huì)下降
- 窗口計(jì)算的觸發(fā)條件為
1、窗口中有數(shù)據(jù)
2、watermark>= 窗口的結(jié)束時(shí)間
6、Watermark 策略簡介
使用 Flink API 時(shí)需要設(shè)置一個(gè)同時(shí)包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。WatermarkStrategy 工具類中也提供了許多常用的 watermark 策略,并且用戶也可以在某些必要場景下構(gòu)建自己的 watermark 策略。WatermarkStrategy 接口如下:
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
/**
* 根據(jù)策略實(shí)例化一個(gè)可分配時(shí)間戳的 {@link TimestampAssigner}。
*/
@Override
TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
/**
* 根據(jù)策略實(shí)例化一個(gè) watermark 生成器。
*/
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
通常情況下,不用實(shí)現(xiàn)此接口,而是可以使用 WatermarkStrategy 工具類中通用的 watermark 策略,或者可以使用這個(gè)工具類將自定義的 TimestampAssigner 與 WatermarkGenerator 進(jìn)行綁定。例如,想要使用有界無序(bounded-out-of-orderness)watermark 生成器和一個(gè) lambda 表達(dá)式作為時(shí)間戳分配器,那么可以按照如下方式實(shí)現(xiàn):
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.f0);
//其中 TimestampAssigner 的設(shè)置與否是可選的,大多數(shù)情況下,可以不用去特別指定。例如,當(dāng)使用 Kafka 或 Kinesis 數(shù)據(jù)源時(shí),你可以直接從 Kafka/Kinesis 數(shù)據(jù)源記錄中獲取到時(shí)間戳。
7、使用 Watermark 策略
WatermarkStrategy 可以在 Flink 應(yīng)用程序中的兩處使用
- 第一種是直接在數(shù)據(jù)源上使用,相比第二種會(huì)更好。因?yàn)閿?shù)據(jù)源可以利用 watermark 生成邏輯中有關(guān)分片/分區(qū)(shards/partitions/splits)的信息。使用這種方式,數(shù)據(jù)源通??梢愿珳?zhǔn)地跟蹤 watermark,整體 watermark 生成將更精確。直接在源上指定 WatermarkStrategy 意味著必須使用特定數(shù)據(jù)源接口,參考下文的kafka部分,以及有關(guān)每個(gè)分區(qū)的 watermark 是如何生成以及工作的。
- 第二種是直接在非數(shù)據(jù)源的操作之后使用,僅當(dāng)無法直接在數(shù)據(jù)源上設(shè)置策略時(shí),才應(yīng)該使用第二種方式(在任意轉(zhuǎn)換操作之后設(shè)置 WatermarkStrategy)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
.assignTimestampsAndWatermarks(<watermark strategy>);
withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
使用 WatermarkStrategy 去獲取流并生成帶有時(shí)間戳的元素和 watermark 的新流時(shí),如果原始流已經(jīng)具有時(shí)間戳或 watermark,則新指定的時(shí)間戳分配器將覆蓋原有的時(shí)間戳和 watermark。
8、處理空閑數(shù)據(jù)源
如果數(shù)據(jù)源中的某一個(gè)分區(qū)/分片在一段時(shí)間內(nèi)未發(fā)送事件數(shù)據(jù),則意味著 WatermarkGenerator 也不會(huì)獲得任何新數(shù)據(jù)去生成 watermark。
稱這類數(shù)據(jù)源為空閑輸入或空閑源。在這種情況下,當(dāng)某些其他分區(qū)仍然發(fā)送事件數(shù)據(jù)的時(shí)候就會(huì)出現(xiàn)問題。由于下游算子 watermark 的計(jì)算方式是取所有不同的上游并行數(shù)據(jù)源 watermark 的最小值,則其 watermark 將不會(huì)發(fā)生變化。
為了解決這個(gè)問題,可以使用 WatermarkStrategy 來檢測空閑輸入并將其標(biāo)記為空閑狀態(tài)。WatermarkStrategy 為此提供了一個(gè)工具接口:
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1));
9、自定義 WatermarkGenerator
可以針對每個(gè)事件去生成 watermark。但是由于每個(gè) watermark 都會(huì)在下游做一些計(jì)算,因此過多的 watermark 會(huì)降低程序性能。
TimestampAssigner 是一個(gè)可以從事件數(shù)據(jù)中提取時(shí)間戳字段的簡單函數(shù),但是 WatermarkGenerator 的編寫相對就要復(fù)雜一些了,將在接下來的兩小節(jié)中介紹如何實(shí)現(xiàn)此接口。WatermarkGenerator 接口代碼如下:
/**
* {@code WatermarkGenerator} 可以基于事件或者周期性的生成 watermark。
*
* <p><b>注意:</b> WatermarkGenerator 將以前互相獨(dú)立的 {@code AssignerWithPunctuatedWatermarks}
* 和 {@code AssignerWithPeriodicWatermarks} 一同包含了進(jìn)來。
*/
@Public
public interface WatermarkGenerator<T> {
/**
* 每來一條事件數(shù)據(jù)調(diào)用一次,可以檢查或者記錄事件的時(shí)間戳,或者也可以基于事件數(shù)據(jù)本身去生成 watermark。
*/
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
/**
* 周期性的調(diào)用,也許會(huì)生成新的 watermark,也許不會(huì)。
*
* <p>調(diào)用此方法生成 watermark 的間隔時(shí)間由 {@link ExecutionConfig#getAutoWatermarkInterval()} 決定。
*/
void onPeriodicEmit(WatermarkOutput output);
}
watermark 的生成方式本質(zhì)上是有兩種:周期性生成和標(biāo)記生成。
- 周期性生成器通常通過 onEvent() 觀察傳入的事件數(shù)據(jù),然后在框架調(diào)用 onPeriodicEmit() 時(shí)發(fā)出 watermark。
- 標(biāo)記生成器將查看 onEvent() 中的事件數(shù)據(jù),并等待檢查在流中攜帶 watermark 的特殊標(biāo)記事件或打點(diǎn)數(shù)據(jù)。當(dāng)獲取到這些事件數(shù)據(jù)時(shí),它將立即發(fā)出 watermark。通常情況下,標(biāo)記生成器不會(huì)通過 onPeriodicEmit() 發(fā)出 watermark。
1)、自定義周期性 Watermark 生成器
周期性生成器會(huì)觀察流事件數(shù)據(jù)并定期生成 watermark(其生成可能取決于流數(shù)據(jù),或者完全基于處理時(shí)間)。
生成 watermark 的時(shí)間間隔(每 n 毫秒)可以通過 ExecutionConfig.setAutoWatermarkInterval(…) 指定。每次都會(huì)調(diào)用生成器的 onPeriodicEmit() 方法,如果返回的 watermark 非空且值大于前一個(gè) watermark,則將發(fā)出新的 watermark。
如下是兩個(gè)使用周期性 watermark 生成器的簡單示例。
Flink 已經(jīng)附帶了 BoundedOutOfOrdernessWatermarks,它實(shí)現(xiàn)了 WatermarkGenerator,其工作原理與下面的 BoundedOutOfOrdernessGenerator 相似。可以在這里參閱如何使用它的內(nèi)容。
/**
* 該 watermark 生成器可以覆蓋的場景是:數(shù)據(jù)源在一定程度上亂序。
* 即某個(gè)最新到達(dá)的時(shí)間戳為 t 的元素將在最早到達(dá)的時(shí)間戳為 t 的元素之后最多 n 毫秒到達(dá)。
*/
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {
private final long maxOutOfOrderness = 3500; // 3.5 秒
private long currentMaxTimestamp;
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 發(fā)出的 watermark = 當(dāng)前最大時(shí)間戳 - 最大亂序時(shí)間
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
}
}
/**
* 該生成器生成的 watermark 滯后于處理時(shí)間固定量。它假定元素會(huì)在有限延遲后到達(dá) Flink。
*/
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {
private final long maxTimeLag = 5000; // 5 秒
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
// 處理時(shí)間場景下不需要實(shí)現(xiàn)
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
}
}
2)、自定義標(biāo)記 Watermark 生成器
標(biāo)記 watermark 生成器觀察流事件數(shù)據(jù)并在獲取到帶有 watermark 信息的特殊事件元素時(shí)發(fā)出 watermark。
如下是實(shí)現(xiàn)標(biāo)記生成器的方法,當(dāng)事件帶有某個(gè)指定標(biāo)記時(shí),該生成器就會(huì)發(fā)出 watermark:
public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
if (event.hasWatermarkMarker()) {
output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// onEvent 中已經(jīng)實(shí)現(xiàn)
}
}
10、Watermark 策略與 Kafka 連接器
當(dāng)使用 Apache Kafka 連接器作為數(shù)據(jù)源時(shí),每個(gè) Kafka 分區(qū)可能有一個(gè)簡單的事件時(shí)間模式(遞增的時(shí)間戳或有界無序)。然而,當(dāng)使用 Kafka 數(shù)據(jù)源時(shí),多個(gè)分區(qū)常常并行使用,因此交錯(cuò)來自各個(gè)分區(qū)的事件數(shù)據(jù)就會(huì)破壞每個(gè)分區(qū)的事件時(shí)間模式(這是 Kafka 消費(fèi)客戶端所固有的)。
在這種情況下,你可以使用 Flink 中可識(shí)別 Kafka 分區(qū)的 watermark 生成機(jī)制。使用此特性,將在 Kafka 消費(fèi)端內(nèi)部針對每個(gè) Kafka 分區(qū)生成 watermark,并且不同分區(qū) watermark 的合并方式與在數(shù)據(jù)流 shuffle 時(shí)的合并方式相同。
例如,如果每個(gè) Kafka 分區(qū)中的事件時(shí)間戳嚴(yán)格遞增,則使用時(shí)間戳單調(diào)遞增按分區(qū)生成的 watermark 將生成完美的全局 watermark。注意,在示例中未使用 TimestampAssigner,而是使用了 Kafka 記錄自身的時(shí)間戳。
下圖展示了如何使用單 kafka 分區(qū) watermark 生成機(jī)制,以及在這種情況下 watermark 如何通過 dataflow 傳播。
//kafka數(shù)據(jù)源示例,沒有使用withTimestampAssigner
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
DataStream<MyType> stream = env.addSource(kafkaSource);
//非kafka數(shù)據(jù)源示例,使用了withTimestampAssigner
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.f0);
11、算子處理 Watermark 的方式
一般情況下,在將 watermark 轉(zhuǎn)發(fā)到下游之前,需要算子對其進(jìn)行觸發(fā)的事件完全進(jìn)行處理。例如,WindowOperator 將首先計(jì)算該 watermark 觸發(fā)的所有窗口數(shù)據(jù),當(dāng)且僅當(dāng)由此 watermark 觸發(fā)計(jì)算進(jìn)而生成的所有數(shù)據(jù)被轉(zhuǎn)發(fā)到下游之后,其才會(huì)被發(fā)送到下游。換句話說,由于此 watermark 的出現(xiàn)而產(chǎn)生的所有數(shù)據(jù)元素都將在此 watermark 之前發(fā)出。
相同的規(guī)則也適用于 TwoInputStreamOperator。但是,在這種情況下,算子當(dāng)前的 watermark 會(huì)取其兩個(gè)輸入的最小值。
二、示例1:每5s統(tǒng)計(jì)一次地鐵進(jìn)站每個(gè)入口人數(shù)
每5s統(tǒng)計(jì)一次地鐵進(jìn)站每個(gè)入口人數(shù),最多接受延遲3s的數(shù)據(jù)
數(shù)據(jù)結(jié)構(gòu):進(jìn)站口、人數(shù)和進(jìn)入時(shí)間
1、實(shí)現(xiàn)
- bean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
private String sNo;
private Integer userCount;
private Long enterTime;
}
- 實(shí)現(xiàn)
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* @author alanchan
* 每10s統(tǒng)計(jì)一次地鐵進(jìn)站每個(gè)入口人數(shù)
*/
public class WatermarkDemo {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// source
DataStreamSource<Subway> subwayDS = env.addSource(new SourceFunction<Subway>() {
private boolean flag = true;
@Override
public void run(SourceContext<Subway> ctx) throws Exception {
Random random = new Random();
while (flag) {
String sNo = "No"+random.nextInt(3);
int userCount = random.nextInt(100);
// 模擬延遲數(shù)據(jù)
long eventTime = System.currentTimeMillis() - random.nextInt(10) * 1000;
Subway subway = new Subway(sNo, userCount, eventTime);
System.err.println(subway + " ,格式化后時(shí)間 " + df.format(subway.getEnterTime()));
ctx.collect(subway);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
flag = false;
}
});
// transformation
// 設(shè)置watermark = 當(dāng)前最大的事件時(shí)間 - 最大允許的延遲時(shí)間或亂序時(shí)間
SingleOutputStreamOperator<Subway> subwayWithWatermark = subwayDS
.assignTimestampsAndWatermarks(WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定最大允許的延遲時(shí)間
.withTimestampAssigner((subway, timestamp) -> subway.getEnterTime()));// 指定eventtime事件時(shí)間列
// 計(jì)算窗口
SingleOutputStreamOperator<Subway> result = subwayWithWatermark
.keyBy(Subway::getSNo)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("userCount");
// sink
result.print();
// execute
env.execute();
}
}
2、驗(yàn)證
輸出結(jié)果,按照預(yù)期計(jì)算出了結(jié)果
Subway(sNo=No0, userCount=7, enterTime=1689148937760) ,格式化后時(shí)間 16:02:17
Subway(sNo=No1, userCount=93, enterTime=1689148933782) ,格式化后時(shí)間 16:02:13
Subway(sNo=No2, userCount=21, enterTime=1689148940783) ,格式化后時(shí)間 16:02:20
10> Subway(sNo=No1, userCount=93, enterTime=1689148933782)
Subway(sNo=No0, userCount=7, enterTime=1689148936784) ,格式化后時(shí)間 16:02:16
Subway(sNo=No0, userCount=53, enterTime=1689148944788) ,格式化后時(shí)間 16:02:24
10> Subway(sNo=No0, userCount=14, enterTime=1689148937760)
Subway(sNo=No2, userCount=66, enterTime=1689148944790) ,格式化后時(shí)間 16:02:24
Subway(sNo=No2, userCount=97, enterTime=1689148944803) ,格式化后時(shí)間 16:02:24
Subway(sNo=No2, userCount=79, enterTime=1689148946807) ,格式化后時(shí)間 16:02:26
Subway(sNo=No2, userCount=83, enterTime=1689148945821) ,格式化后時(shí)間 16:02:25
Subway(sNo=No0, userCount=84, enterTime=1689148941836) ,格式化后時(shí)間 16:02:21
Subway(sNo=No2, userCount=50, enterTime=1689148947852) ,格式化后時(shí)間 16:02:27
Subway(sNo=No1, userCount=10, enterTime=1689148942864) ,格式化后時(shí)間 16:02:22
Subway(sNo=No0, userCount=20, enterTime=1689148944866) ,格式化后時(shí)間 16:02:24
Subway(sNo=No2, userCount=1, enterTime=1689148945877) ,格式化后時(shí)間 16:02:25
Subway(sNo=No0, userCount=62, enterTime=1689148953888) ,格式化后時(shí)間 16:02:33
3> Subway(sNo=No2, userCount=184, enterTime=1689148940783)
10> Subway(sNo=No0, userCount=157, enterTime=1689148944788)
3> Subway(sNo=No2, userCount=213, enterTime=1689148946807)
10> Subway(sNo=No1, userCount=10, enterTime=1689148942864)
。。。。。。
三、示例2:kafka數(shù)據(jù)源,每10s統(tǒng)計(jì)一次地鐵進(jìn)站每個(gè)入口人數(shù)
注意該示例中,發(fā)送數(shù)據(jù)時(shí)不需要eventtime,flink會(huì)以kafka發(fā)送數(shù)據(jù)的時(shí)間戳為eventtime,也不需要withTimestampAssigner指定eventtime,詳見實(shí)現(xiàn)源碼。
每10s統(tǒng)計(jì)一次地鐵進(jìn)站每個(gè)入口人數(shù),最多接受延遲3s的數(shù)據(jù)
數(shù)據(jù)結(jié)構(gòu):進(jìn)站口、人數(shù)
1、maven依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
2、實(shí)現(xiàn)
- bean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
private String sNo;
private Integer userCount;
}
- 實(shí)現(xiàn)
import java.time.Duration;
import java.util.Properties;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
/**
* @author alanchan
*
*/
public class KafkaWatermarkDemo {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// source
// 準(zhǔn)備kafka連接參數(shù)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "server1:9092");
props.setProperty("group.id", "flink");
props.setProperty("auto.offset.reset", "latest");
props.setProperty("flink.partition-discovery.interval-millis", "5000");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "2000");
// 使用連接參數(shù)創(chuàng)建FlinkKafkaConsumer/kafkaSource
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("t_kafkasource", new SimpleStringSchema(), props);
// 使用kafkaSource
DataStream<Subway> subwayDS = env.addSource(kafkaSource).map(new MapFunction<String, Subway>() {
@Override
public Subway map(String value) throws Exception {
String[] arr = value.split(",");
return new Subway(arr[0], Integer.parseInt(arr[1]));
}
});
// transformation
// 設(shè)置watermark = 當(dāng)前最大的事件時(shí)間 - 最大允許的延遲時(shí)間或亂序時(shí)間
SingleOutputStreamOperator<Subway> subwayWithWatermark = subwayDS
.assignTimestampsAndWatermarks(WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定最大允許的延遲時(shí)間
);
// 計(jì)算窗口
SingleOutputStreamOperator<Subway> result = subwayWithWatermark.keyBy(Subway::getSNo).window(TumblingEventTimeWindows.of(Time.seconds(10))).sum("userCount");
// sink
result.print();
// execute
env.execute();
}
}
3、驗(yàn)證
1)、驗(yàn)證步驟
1、啟動(dòng)kafka,創(chuàng)建topic
2、啟動(dòng)應(yīng)用程序
3、在kafka命令行中輸入符合格式要求的數(shù)據(jù)
4、觀察應(yīng)用程序的控制臺(tái)輸出
2)、驗(yàn)證
啟動(dòng)kafka、創(chuàng)建topic、啟動(dòng)應(yīng)用程序不再贅述,如果不清楚的參考本人kafka專欄。
1、在kafka命令控制臺(tái)輸入數(shù)據(jù)
kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource
[alanchan@server1 onekeystart]$ kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource
>1,2
>1,3
>1,4
>1,5
>1,6
>1,7
>2,4
>2,6
>2,8
>3,6
2、觀察應(yīng)用程序中的控制臺(tái)輸出
7> Subway(sNo=1, userCount=20)
4> Subway(sNo=2, userCount=18)
7> Subway(sNo=1, userCount=7)
四、示例3:處理延遲數(shù)據(jù)超過能接受的時(shí)間,每10s統(tǒng)計(jì)一次地鐵進(jìn)站每個(gè)入口人數(shù)
每10s統(tǒng)計(jì)一次地鐵進(jìn)站每個(gè)入口人數(shù),最多接受延遲3s的數(shù)據(jù),超過可接受范圍則另外接收。
一般而言,針對延遲超過計(jì)算窗口的數(shù)據(jù)處理方式不同,視具體的情況而定。
本示例僅僅是打印出來。
數(shù)據(jù)結(jié)構(gòu):進(jìn)站口、人數(shù)和進(jìn)入時(shí)間
1、實(shí)現(xiàn)
- bean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
private String sNo;
private Integer userCount;
private Long enterTime;
}
- 實(shí)現(xiàn)
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.scala.OutputTag;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* @author alanchan
*
*/
public class WatermarkLatenessDemo {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// source
DataStreamSource<Subway> subwayDS = env.addSource(new SourceFunction<Subway>() {
private boolean flag = true;
@Override
public void run(SourceContext<Subway> ctx) throws Exception {
Random random = new Random();
while (flag) {
String sNo = "No" + random.nextInt(3);
int userCount = random.nextInt(100);
// 模擬延遲數(shù)據(jù)
long eventTime = System.currentTimeMillis() - random.nextInt(20) * 1000;
Subway subway = new Subway(sNo, userCount, eventTime);
System.err.println(subway + " ,格式化后時(shí)間 " + df.format(subway.getEnterTime()));
ctx.collect(subway);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
flag = false;
}
});
// transformation
// 設(shè)置最大允許延遲時(shí)間3s
SingleOutputStreamOperator<Subway> orderDSWithWatermark = subwayDS.assignTimestampsAndWatermarks(
WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((subway, timestamp) -> subway.getEnterTime())// 指定事件時(shí)間列
);
// 接收延遲超過允許范圍內(nèi)的數(shù)據(jù),供其他方式處理
OutputTag<Subway> latenessData = new OutputTag<Subway>("seriousLateData", TypeInformation.of(Subway.class));
SingleOutputStreamOperator<Subway> result1 = orderDSWithWatermark.keyBy(Subway::getSNo).window(TumblingEventTimeWindows.of(Time.seconds(10)))
.allowedLateness(Time.seconds(3)).sideOutputLateData(latenessData).sum("userCount");
DataStream<Subway> result2 = result1.getSideOutput(latenessData);
// sink
result1.print("延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù)");
result2.print("延遲不在計(jì)算窗口內(nèi)數(shù)據(jù)");
// execute
env.execute();
}
}
2、驗(yàn)證
驗(yàn)證比較簡單,就是由系統(tǒng)自己生成數(shù)據(jù),然后觀察應(yīng)用程序的控制臺(tái)的輸出是否與預(yù)期一致,經(jīng)驗(yàn)證,本示例與預(yù)期一致。文章來源:http://www.zghlxwxcb.cn/news/detail-614106.html
Subway(sNo=No1, userCount=44, enterTime=1689152768384) ,格式化后時(shí)間 17:06:08
Subway(sNo=No1, userCount=24, enterTime=1689152764407) ,格式化后時(shí)間 17:06:04
Subway(sNo=No1, userCount=29, enterTime=1689152774418) ,格式化后時(shí)間 17:06:14
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=68, enterTime=1689152768384)
Subway(sNo=No1, userCount=83, enterTime=1689152767430) ,格式化后時(shí)間 17:06:07
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=151, enterTime=1689152768384)
Subway(sNo=No0, userCount=19, enterTime=1689152759435) ,格式化后時(shí)間 17:05:59
延遲不在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No0, userCount=19, enterTime=1689152759435)
Subway(sNo=No1, userCount=32, enterTime=1689152760443) ,格式化后時(shí)間 17:06:00
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=183, enterTime=1689152768384)
Subway(sNo=No0, userCount=56, enterTime=1689152775459) ,格式化后時(shí)間 17:06:15
Subway(sNo=No1, userCount=12, enterTime=1689152778472) ,格式化后時(shí)間 17:06:18
Subway(sNo=No1, userCount=26, enterTime=1689152776475) ,格式化后時(shí)間 17:06:16
Subway(sNo=No0, userCount=49, enterTime=1689152772488) ,格式化后時(shí)間 17:06:12
Subway(sNo=No2, userCount=93, enterTime=1689152767500) ,格式化后時(shí)間 17:06:07
延遲不在計(jì)算窗口內(nèi)數(shù)據(jù):3> Subway(sNo=No2, userCount=93, enterTime=1689152767500)
Subway(sNo=No1, userCount=26, enterTime=1689152779502) ,格式化后時(shí)間 17:06:19
Subway(sNo=No2, userCount=63, enterTime=1689152782512) ,格式化后時(shí)間 17:06:22
Subway(sNo=No1, userCount=73, enterTime=1689152775526) ,格式化后時(shí)間 17:06:15
Subway(sNo=No0, userCount=46, enterTime=1689152783539) ,格式化后時(shí)間 17:06:23
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No0, userCount=105, enterTime=1689152775459)
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=166, enterTime=1689152774418)
Subway(sNo=No2, userCount=72, enterTime=1689152779552) ,格式化后時(shí)間 17:06:19
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):3> Subway(sNo=No2, userCount=72, enterTime=1689152779552)
Subway(sNo=No0, userCount=43, enterTime=1689152774553) ,格式化后時(shí)間 17:06:14
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No0, userCount=148, enterTime=1689152775459)
Subway(sNo=No2, userCount=1, enterTime=1689152781567) ,格式化后時(shí)間 17:06:21
Subway(sNo=No2, userCount=49, enterTime=1689152775582) ,格式化后時(shí)間 17:06:15
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):3> Subway(sNo=No2, userCount=121, enterTime=1689152779552)
Subway(sNo=No1, userCount=4, enterTime=1689152782591) ,格式化后時(shí)間 17:06:22
Subway(sNo=No2, userCount=79, enterTime=1689152778604) ,格式化后時(shí)間 17:06:18
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):3> Subway(sNo=No2, userCount=200, enterTime=1689152779552)
Subway(sNo=No2, userCount=11, enterTime=1689152794608) ,格式化后時(shí)間 17:06:34
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):3> Subway(sNo=No2, userCount=64, enterTime=1689152782512)
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=4, enterTime=1689152782591)
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No0, userCount=46, enterTime=1689152783539)
Subway(sNo=No1, userCount=47, enterTime=1689152784620) ,格式化后時(shí)間 17:06:24
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=51, enterTime=1689152782591)
Subway(sNo=No0, userCount=12, enterTime=1689152788634) ,格式化后時(shí)間 17:06:28
延遲(含正常)在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No0, userCount=58, enterTime=1689152783539)
Subway(sNo=No0, userCount=71, enterTime=1689152790634) ,格式化后時(shí)間 17:06:30
Subway(sNo=No0, userCount=83, enterTime=1689152799635) ,格式化后時(shí)間 17:06:39
Subway(sNo=No0, userCount=11, enterTime=1689152799649) ,格式化后時(shí)間 17:06:39
Subway(sNo=No1, userCount=77, enterTime=1689152786650) ,格式化后時(shí)間 17:06:26
延遲不在計(jì)算窗口內(nèi)數(shù)據(jù):10> Subway(sNo=No1, userCount=77, enterTime=1689152786650)
Subway(sNo=No0, userCount=6, enterTime=1689152802662) ,格式化后時(shí)間 17:06:42
Subway(sNo=No1, userCount=87, enterTime=1689152791668) ,格式化后時(shí)間 17:06:31
以上,詳細(xì)介紹了eventtime和watermark,包括watermark的Flink自帶的api實(shí)現(xiàn)與自定義的實(shí)現(xiàn),同時(shí)以三個(gè)示例來展示watermark的實(shí)際可能的應(yīng)用場景。文章來源地址http://www.zghlxwxcb.cn/news/detail-614106.html
到了這里,關(guān)于Flink(七)Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn))的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!