Flink Learning 6: Flink Window Computation API
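1. Window concept and classification
A window is the core mechanism for processing unbounded streams. It splits an unbounded data stream into a series of bounded chunks (buckets) according to some rule, and the computation then runs on each bounded chunk.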
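2. Classification
2.1 Tumbling windows
Adjacent windows do not overlap, so every record belongs to exactly one window.
- The window size can be a time span or a number of records (count).
- Windows are further divided by whether the stream is keyed. A non-keyed window is also called a global window and runs with parallelism 1 (not to be confused with the GlobalWindows assigner).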
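The following plain-Java sketch (no Flink dependency) shows how an event-time tumbling window is chosen for a record. As far as I know, the start formula matches the arithmetic of Flink's `TimeWindow.getWindowStartWithOffset` in recent releases. The class name `TumblingAssign` is hypothetical.

```java
// Plain-Java model of tumbling time-window assignment (no Flink dependency).
// windowStart follows the arithmetic of Flink's TimeWindow.getWindowStartWithOffset (as I recall it).
final class TumblingAssign {

    /** Start of the window containing timestamp; windows are half-open [start, start + size). */
    static long windowStart(long timestamp, long offset, long size) {
        long remainder = (timestamp - offset) % size;
        // Java's % keeps the sign of the dividend, so shift negative remainders into [0, size)
        return remainder < 0 ? timestamp - (remainder + size) : timestamp - remainder;
    }

    /** A tumbling assigner yields exactly one window per record: {start, end}. */
    static long[] assign(long timestamp, long size) {
        long start = windowStart(timestamp, 0L, size);
        return new long[] {start, start + size};
    }

    public static void main(String[] args) {
        long[] w = assign(12_345L, 5_000L); // 5 s windows
        System.out.println("[" + w[0] + ", " + w[1] + ")"); // prints [10000, 15000)
    }
}
```

Because the start depends only on the timestamp, all windows are aligned to multiples of the size (plus the offset). They are not aligned to the moment the job started.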
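2.2 Sliding windows
The window advances by a slide step. When the slide is smaller than the window size (the normal case), adjacent windows overlap and one record can belong to several windows.
- The window size can be a time span or a number of records (count).
- Windows are further divided by whether the stream is keyed; a non-keyed window is also called a global window (parallelism 1).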
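A similar plain-Java sketch works for sliding windows. It starts from the latest window that contains the timestamp and walks back one slide at a time, which is how I understand Flink's `SlidingEventTimeWindows` enumerates windows. `SlidingAssign` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of sliding time-window assignment (no Flink dependency).
final class SlidingAssign {

    static long windowStart(long timestamp, long offset, long slide) {
        long remainder = (timestamp - offset) % slide;
        return remainder < 0 ? timestamp - (remainder + slide) : timestamp - remainder;
    }

    /** All windows {start, end} (end exclusive) that contain the timestamp. */
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStart(timestamp, 0L, slide);
        // walk back one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // size 30 s, slide 10 s: a record at 25 s lands in 3 overlapping windows
        for (long[] w : assign(25_000L, 30_000L, 10_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With size 30 s and slide 10 s, each record falls into size / slide = 3 windows. If the slide is larger than the size, some timestamps fall into no window at all; `assign(7_000L, 5_000L, 10_000L)` returns an empty list.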
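2.3 Session windows
A time-based window type: when no data arrives for longer than a configured gap, the current window closes and the next record opens a new one.
- The window is defined by time only (the session gap).
- Windows are further divided by whether the stream is keyed; a non-keyed window is also called a global window (parallelism 1).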
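The session rule can be modelled in plain Java as: each record opens `[ts, ts + gap)`, and overlapping windows are merged. This is a simplified sketch, not Flink's merging-window machinery. The class name `SessionMerge` is hypothetical. Treating windows that merely touch as mergeable is an assumption, modelled on `TimeWindow.intersects`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of session windows (no Flink dependency).
final class SessionMerge {

    /** Returns merged sessions {start, end} (end exclusive) for the given record timestamps. */
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);                            // merging works on windows ordered by start
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long ts : sorted) {
            long end = ts + gap;
            if (current != null && ts <= current[1]) {
                current[1] = Math.max(current[1], end); // still within the gap: extend the session
            } else {
                current = new long[] {ts, end};         // gap exceeded: start a new session
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // gap 5 s: records at 1 s and 3 s form one session, 20 s and 21 s another
        for (long[] s : sessions(new long[] {3_000L, 1_000L, 21_000L, 20_000L}, 5_000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```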
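3. Window computation API
Which API you use depends mainly on whether the windowed stream is keyed.
3.1 KeyedWindow
Upstream data is hash-partitioned by key before the window computation, so the parallelism is configurable.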
stream
.keyBy(...) <- keyed versus non-keyed windows // group by key
.window(...) <- required: "assigner" // window type: tumbling / sliding / session
[.trigger(...)] <- optional: "trigger" (else default trigger) // decides when the window fires
[.evictor(...)] <- optional: "evictor" (else no evictor) // removes records from the window before/after evaluation
[.allowedLateness(...)] <- optional: "lateness" (else zero) // how late a record may be and still be accepted
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data) // route records that are too late to a side output as a fallback
.reduce/aggregate/apply() <- required: "function" // window function, usually an aggregation
[.getSideOutput(...)] <- optional: "output tag" // retrieve the side output stream
// tumbling window, processing time
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
// sliding window, processing time
keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)));
// tumbling window, event time
keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));
// sliding window, event time
keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)));
// tumbling count window
keyedStream.countWindow(100);
// sliding count window
keyedStream.countWindow(100, 20);
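The sliding count window is easy to misread, so here is a plain-Java simulation of one key of `countWindow(size, slide)`. It assumes the composition I recall from Flink's `KeyedStream` source: `GlobalWindows` plus `CountTrigger.of(slide)` plus `CountEvictor.of(size)`. In other words, the window fires every `slide` records and evaluates at most the last `size`. `CountWindowSim` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java simulation of one key of countWindow(size, slide); no Flink dependency.
final class CountWindowSim {

    /** Returns the records the window function would see at each firing. */
    static <T> List<List<T>> run(List<T> input, int size, int slide) {
        Deque<T> buffer = new ArrayDeque<>();
        List<List<T>> firings = new ArrayList<>();
        int sinceLastFire = 0;
        for (T record : input) {
            buffer.addLast(record);
            if (++sinceLastFire >= slide) {     // CountTrigger.of(slide): fire every `slide` records
                sinceLastFire = 0;
                while (buffer.size() > size) {  // CountEvictor.of(size): keep only the newest `size`
                    buffer.removeFirst();
                }
                firings.add(new ArrayList<>(buffer));
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        // countWindow(4, 2) over records 1..7
        System.out.println(run(List.of(1, 2, 3, 4, 5, 6, 7), 4, 2));
    }
}
```

The first firings see fewer than `size` records: the window does not wait until it is full before it fires.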
3.2 NonKeyedWindow
All upstream data is gathered into a single task for the computation, so the parallelism is 1.
stream
.windowAll(...) <- required: "assigner" // windowAll instead of window: the window is non-keyed (global); the type is again tumbling / sliding / session
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
// tumbling window, processing time
dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));
// sliding window, processing time
dataStream.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)));
// tumbling window, event time
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
// sliding window, event time
dataStream.windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)));
// tumbling count window
dataStream.countWindowAll(100);
// sliding count window
dataStream.countWindowAll(100, 20);
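3.3 Window aggregation operators
3.3.1 Incremental aggregation operators
Each record is folded into the running result as soon as it arrives (for example, a running sum), so the window does not have to buffer all records. These operators are min, max, minBy, maxBy, sum, reduce and aggregate.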
min,max,minBy,maxBy,sum
KeyedStream<EventLog, Long> keyedStream = streamOperator.keyBy(EventLog::getGuid);
// sliding count window: size 10, fires every 2 records; apply ONE of the aggregations below
SingleOutputStreamOperator<EventLog> serviceTime = keyedStream
        .countWindow(10, 2)
        .min("serviceTime"); // serviceTime is the minimum; the other fields come from the window's first record
// .max("serviceTime")   // serviceTime is the maximum; the other fields come from the window's first record
// .minBy("serviceTime") // the whole record that has the minimum serviceTime
// .maxBy("serviceTime") // the whole record that has the maximum serviceTime
// .sum("serviceTime")   // serviceTime is the sum over the window; the other fields come from the window's first record
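The difference between `min` and `minBy` described in the comments above can be reproduced without Flink. This plain-Java sketch uses hypothetical `MinVsMinBy` and `Log` types in place of `EventLog`. It assumes the behaviour those comments describe:

- `min` replaces only the aggregated field in the first record.
- `minBy` returns the whole record that holds the minimum.

Where the non-aggregated fields of `min` come from is observed behaviour. Check it against your Flink version before relying on it.

```java
import java.util.List;

// Plain-Java model of min vs minBy over one window (no Flink dependency).
final class MinVsMinBy {

    static final class Log {
        final long guid;
        final String event;
        final int serviceTime;

        Log(long guid, String event, int serviceTime) {
            this.guid = guid;
            this.event = event;
            this.serviceTime = serviceTime;
        }

        @Override
        public String toString() {
            return "Log(guid=" + guid + ", event=" + event + ", serviceTime=" + serviceTime + ")";
        }
    }

    /** min("serviceTime"): the minimum value, other fields taken from the first record. */
    static Log min(List<Log> window) {
        Log first = window.get(0);
        int minTime = first.serviceTime;
        for (Log l : window) {
            minTime = Math.min(minTime, l.serviceTime);
        }
        return new Log(first.guid, first.event, minTime);
    }

    /** minBy("serviceTime"): the whole record holding the minimum (the first one on ties). */
    static Log minBy(List<Log> window) {
        Log best = window.get(0);
        for (Log l : window) {
            if (l.serviceTime < best.serviceTime) {
                best = l;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Log> window = List.of(new Log(1, "a", 30), new Log(2, "b", 10), new Log(3, "c", 20));
        System.out.println(min(window));   // Log(guid=1, event=a, serviceTime=10)
        System.out.println(minBy(window)); // Log(guid=2, event=b, serviceTime=10)
    }
}
```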
reduce
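The return type of reduce is fixed: it must be the same type as the elements of the stream.
keyedStream.countWindow(10, 2).reduce(new ReduceFunction<EventLog>() {
    @Override
    public EventLog reduce(EventLog value1, EventLog value2) throws Exception {
        // illustrative body: accumulate serviceTime into value1; input and output are both EventLog
        value1.setServiceTime(value1.getServiceTime() + value2.getServiceTime());
        return value1;
    }
});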
aggregate
The return type of aggregate can be customized; see the full example in 3.3.3 below.
keyedStream.countWindow(10,2).aggregate(new AggregateFunction<EventLog, Object, Object>() {
......
})
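The next plain-Java sketch shows why `aggregate` is more flexible than `reduce`. It averages `serviceTime`, and its input type (`Integer`), accumulator (`long[]{sum, count}`) and output (`Double`) all differ, which a `ReduceFunction` cannot express. `AggLike` is a hypothetical interface that copies the four method names of Flink's `AggregateFunction<IN, ACC, OUT>`, so the example runs without Flink.

```java
import java.util.List;

// Hypothetical stand-in with the same method names as Flink's AggregateFunction<IN, ACC, OUT>.
interface AggLike<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Average serviceTime: IN = Integer, ACC = long[]{sum, count}, OUT = Double.
final class AvgServiceTime implements AggLike<Integer, long[], Double> {
    @Override public long[] createAccumulator() { return new long[] {0L, 0L}; }
    @Override public long[] add(Integer value, long[] acc) { return new long[] {acc[0] + value, acc[1] + 1}; }
    @Override public Double getResult(long[] acc) { return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1]; }
    @Override public long[] merge(long[] a, long[] b) { return new long[] {a[0] + b[0], a[1] + b[1]}; }

    /** Drives the calls the way a window does: one accumulator, add() per record, getResult() on firing. */
    static <IN, ACC, OUT> OUT evaluate(AggLike<IN, ACC, OUT> fn, List<IN> window) {
        ACC acc = fn.createAccumulator();
        for (IN value : window) {
            acc = fn.add(value, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(evaluate(new AvgServiceTime(), List.of(10, 20, 30))); // 20.0
    }
}
```

Only the small accumulator is kept per window, not the records themselves. That is what makes this operator incremental.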
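3.3.2 Full-window aggregation operators
process
See the example in 3.3.3 below.
More powerful than apply: besides the window's records, it receives a Context (current processing time and watermark, per-window and global state, side outputs).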
apply
keyedStream.countWindow(10, 2).apply(new WindowFunction<EventLog, Object, Long, GlobalWindow>() {
    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key    the key of this window
     * @param window the window; a TimeWindow exposes getStart()/getEnd(), but the GlobalWindow used by count windows has no bounds
     * @param input  the elements in the window being evaluated (all records of the window)
     * @param out    the collector used to emit results
     */
    @Override
    public void apply(Long key, GlobalWindow window, Iterable<EventLog> input, Collector<Object> out) throws Exception {
    }
});
3.3.3 Examples
The examples use KeyedWindow, which is the more common case; NonKeyedWindow differs little.
Original requirement:
Count the EventLog records of each guid within a time window.
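import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class EventLog {
    private Long guid;           // user id
    private String event;        // user event
    private String timeStamp;    // time the event occurred
    private Integer serviceTime; // API call duration

    // 3-argument constructor used by the examples below; serviceTime defaults to 0
    public EventLog(Long guid, String event, String timeStamp) {
        this(guid, event, timeStamp, 0);
    }
}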
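A custom source that generates test data:
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

public class CustomEventLogSourceFunction implements SourceFunction<EventLog> {
volatile boolean runFlag = true;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // HH: 24-hour clock, so afternoon times are not confused with morning times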
@Override
public void run(SourceContext<EventLog> sourceContext) throws Exception {
Random random = new Random();
while (runFlag) {
EventLog eventLog = new EventLog(Long.valueOf(random.nextInt(5)),"xw"+random.nextInt(3),simpleDateFormat.format(new Date()));
Thread.sleep(random.nextInt(5)*1000);
sourceContext.collect(eventLog);
}
}
@Override
public void cancel() {
runFlag = false;
}
}
aggregate
aggregate: each record of the window is passed to the aggregation as it arrives (incremental computation).
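import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.text.SimpleDateFormat;
import java.time.Duration;

public class _03_Window {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // must match the source's format
        // get the execution environment (local Web UI on port 8822)
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8822);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration).setParallelism(1); // parallelism 1 makes the output easier to follow
        //DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000);
        DataStreamSource<EventLog> dataStream = env.addSource(new CustomEventLogSourceFunction());
        // watermark strategy: how event time advances (no out-of-orderness allowed)
        WatermarkStrategy<EventLog> watermarkStrategy = WatermarkStrategy
                .<EventLog>forBoundedOutOfOrderness(Duration.ofMillis(0))
                .withTimestampAssigner(new SerializableTimestampAssigner<EventLog>() {
                    @Override
                    public long extractTimestamp(EventLog element, long recordTimestamp) {
                        try {
                            return sdf.parse(element.getTimeStamp()).getTime();
                        } catch (Exception e) {
                            return 0; // unparsable timestamp: fall back to epoch 0
                        }
                    }
                });
        // assign timestamps and watermarks, i.e. use event time
        DataStream<EventLog> streamOperator = dataStream.assignTimestampsAndWatermarks(watermarkStrategy).disableChaining();
        // requirement: every 10 s, count each user's events over the last 30 s
        streamOperator.keyBy(EventLog::getGuid)
                .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))) // sliding window: size 30 s, slide 10 s
                // .apply()   // full window: all records of the window are handed over; custom logic, more flexible
                // .process() // like apply, plus access to the context, so more information is available
                .aggregate(new AggregateFunction<EventLog, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() { // incremental aggregation
                    // create the initial accumulator (guid, count)
                    @Override
                    public Tuple2<Long, Integer> createAccumulator() {
                        return Tuple2.of(null, 0);
                    }
                    // per record: remember the guid and increment the count
                    @Override
                    public Tuple2<Long, Integer> add(EventLog value, Tuple2<Long, Integer> accumulator) {
                        return Tuple2.of(value.getGuid(), accumulator.f1 + 1);
                    }
                    // produce the result from the accumulator
                    @Override
                    public Tuple2<Long, Integer> getResult(Tuple2<Long, Integer> accumulator) {
                        return accumulator;
                    }
                    // merge two accumulators (called for merging windows such as session windows)
                    @Override
                    public Tuple2<Long, Integer> merge(Tuple2<Long, Integer> a, Tuple2<Long, Integer> b) {
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                }).print();
        env.execute();
    }
}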
process
process: after the window ends, you receive every record of the window plus the context, and can apply any custom logic; this is more flexible than aggregate.
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.process(new ProcessWindowFunction<EventLog, Tuple2<Long, Integer>, Long, TimeWindow>() {
@Override
public void process(Long aLong,
ProcessWindowFunction<EventLog, Tuple2<Long, Integer>, Long, TimeWindow>.Context context,
Iterable<EventLog> elements, Collector<Tuple2<Long, Integer>> out) throws Exception {
TimeWindow window = context.window();
String start = sdf.format(new Date(window.getStart()));
String end = sdf.format(new Date(window.getEnd()));
Iterator<EventLog> iterator = elements.iterator();
int count = 0;
while (iterator.hasNext()) {
iterator.next();
count++;
}
System.out.println("start:" + start + ";end:" + end + ";count:" + count);
out.collect(Tuple2.of(aLong, count));
}
}).print();
3.4 Handling late data
3.4.1 allowedLateness
Allows records to arrive late; the figure in 3.5 can serve as a reference.
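import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;

public class _05_Window_allowedLateness {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // get the execution environment
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8822);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration).setParallelism(1);
        // each socket line is "guid,event,epochMillis"
        DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000);
        // string stream ==> object stream
        SingleOutputStreamOperator<EventLog> outputStreamOperator = dataStreamSource.map(new MapFunction<String, EventLog>() {
            @Override
            public EventLog map(String value) throws Exception {
                String[] split = value.split(",");
                return new EventLog(Long.valueOf(split[0]), split[1], split[2]);
            }
        });
        // watermark strategy: how event time advances (no out-of-orderness allowed)
        WatermarkStrategy<EventLog> watermarkStrategy = WatermarkStrategy
                .<EventLog>forBoundedOutOfOrderness(Duration.ofMillis(0))
                .withTimestampAssigner(new SerializableTimestampAssigner<EventLog>() {
                    @Override
                    public long extractTimestamp(EventLog element, long recordTimestamp) {
                        return Long.parseLong(element.getTimeStamp()); // the timestamp field is already epoch ms
                    }
                });
        // assign timestamps and watermarks, i.e. use event time
        DataStream<EventLog> streamOperator = outputStreamOperator.assignTimestampsAndWatermarks(watermarkStrategy)
                .disableChaining();
        OutputTag<EventLog> latedata = new OutputTag<EventLog>("latedata", TypeInformation.of(EventLog.class));
        KeyedStream<EventLog, Long> keyedStream = streamOperator.keyBy(EventLog::getGuid);
        SingleOutputStreamOperator<EventLog> guid = keyedStream
                .window(TumblingEventTimeWindows.of(Time.seconds(30)))
                .allowedLateness(Time.seconds(3)) // accept records up to 3 s late; the window fires again for each one
                .sideOutputLateData(latedata)     // records later than that go to the side output
                .apply(new WindowFunction<EventLog, EventLog, Long, TimeWindow>() {
                    @Override
                    public void apply(Long aLong, TimeWindow window, Iterable<EventLog> input, Collector<EventLog> out) throws Exception {
                        // local counter: as a field it would be shared by all keys and windows and never reset
                        long count = 0;
                        for (EventLog ignored : input) {
                            count++;
                        }
                        EventLog eventLog = new EventLog();
                        eventLog.setGuid(count); // the count is carried in the guid field for printing
                        out.collect(eventLog);
                        String start = sdf.format(new Date(window.getStart()));
                        String end = sdf.format(new Date(window.getEnd()));
                        System.out.println("==> start:" + start + ";end:" + end + ";count:" + count);
                    }
                });
        guid.print("main output");
        DataStream<EventLog> sideOutput = guid.getSideOutput(latedata);
        sideOutput.print("side output");
        env.execute();
    }
}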
3.4.2 Window time & watermark
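The event-time rules can be sketched in plain Java using the settings of the 3.4.1 example: a 30 s tumbling window, 3 s allowed lateness, and zero out-of-orderness. The sketch assumes the following, based on my understanding of Flink:

- `forBoundedOutOfOrderness(d)` emits `maxTimestamp - d - 1`.
- A window `[start, end)` has `maxTimestamp() = end - 1`, and `EventTimeTrigger` fires once the watermark reaches it.
- A window counts as late once `maxTimestamp + allowedLateness <= watermark`. New records for it are then dropped, or sent to the side output if one is configured.

`WatermarkLateness` and its enum are hypothetical names. Real watermarks are emitted periodically, not after every record.

```java
// Plain-Java model of the event-time rules (no Flink dependency); names are hypothetical.
final class WatermarkLateness {

    enum Outcome { PENDING, LATE_REFIRES, DROPPED }

    /** Watermark emitted by forBoundedOutOfOrderness(d): largest timestamp seen - d - 1. */
    static long watermark(long maxSeenTimestamp, long outOfOrdernessMs) {
        return maxSeenTimestamp - outOfOrdernessMs - 1;
    }

    /** Window [start, end) has maxTimestamp end - 1; the event-time trigger fires once the watermark reaches it. */
    static boolean fired(long windowEnd, long watermark) {
        return windowEnd - 1 <= watermark;
    }

    /** Fate of a record that belongs to the window ending at windowEnd, given the current watermark. */
    static Outcome classify(long windowEnd, long allowedLatenessMs, long watermark) {
        long maxTimestamp = windowEnd - 1;
        if (maxTimestamp + allowedLatenessMs <= watermark) {
            return Outcome.DROPPED;       // window cleaned up: discarded, or sent to sideOutputLateData if set
        }
        if (maxTimestamp <= watermark) {
            return Outcome.LATE_REFIRES;  // window already fired: record is added and the window fires again
        }
        return Outcome.PENDING;           // window not fired yet: record is simply buffered
    }

    public static void main(String[] args) {
        long windowEnd = 30_000L;  // tumbling window [0, 30000)
        long lateness = 3_000L;    // allowedLateness(Time.seconds(3))
        for (long maxSeen : new long[] {20_000L, 30_000L, 33_000L}) {
            long wm = watermark(maxSeen, 0L); // out-of-orderness 0, as in 3.4.1
            System.out.println("max seen " + maxSeen + ", watermark " + wm + ": " + classify(windowEnd, lateness, wm));
        }
    }
}
```

Under these assumptions, a record timestamped 30 000 ms already fires the window `[0, 30000)`. A straggler for that window is still accepted until a record at 33 000 ms pushes the watermark to 32 999.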
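3.5 Window firing mechanism
3.5.1 trigger: when a window computation fires
When a window is evaluated is decided by a Trigger. Each WindowAssigner has its own default trigger: event-time assigners use EventTimeTrigger, processing-time assigners use ProcessingTimeTrigger, and GlobalWindows uses NeverTrigger.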
Trigger (org.apache.flink.streaming.api.windowing.triggers)
ProcessingTimeoutTrigger (org.apache.flink.streaming.api.windowing.triggers)
EventTimeTrigger (org.apache.flink.streaming.api.windowing.triggers)
CountTrigger (org.apache.flink.streaming.api.windowing.triggers)
DeltaTrigger (org.apache.flink.streaming.api.windowing.triggers)
NeverTrigger in GlobalWindows (org.apache.flink.streaming.api.windowing.assigners)
ContinuousEventTimeTrigger (org.apache.flink.streaming.api.windowing.triggers)
PurgingTrigger (org.apache.flink.streaming.api.windowing.triggers)
ContinuousProcessingTimeTrigger (org.apache.flink.streaming.api.windowing.triggers)
ProcessingTimeTrigger (org.apache.flink.streaming.api.windowing.triggers)
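Every trigger method returns a `TriggerResult`. The plain-Java sketch below mirrors its four constants and their fire/purge flags; the enum here is a stand-in, not Flink's class. It applies each result to a window buffer. This shows why `EventTimeTrigger` and `CountTrigger`, which return `FIRE`, keep the window contents, while `PurgingTrigger` turns their firings into fire-and-purge.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for Flink's TriggerResult: four constants with fire/purge flags.
final class TriggerResultModel {

    enum Result {
        CONTINUE(false, false),
        FIRE_AND_PURGE(true, true),
        FIRE(true, false),
        PURGE(false, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    /** Applies a result to the window buffer; returns what the window function sees, or null if it does not fire. */
    static List<Integer> apply(Result result, List<Integer> buffer) {
        List<Integer> evaluated = result.fire ? new ArrayList<Integer>(buffer) : null; // evaluate a snapshot
        if (result.purge) {
            buffer.clear();                                                           // drop the window contents
        }
        return evaluated;
    }

    public static void main(String[] args) {
        List<Integer> buffer = new ArrayList<>(List.of(1, 2, 3));
        System.out.println(apply(Result.FIRE, buffer) + " remaining " + buffer);
        System.out.println(apply(Result.FIRE_AND_PURGE, buffer) + " remaining " + buffer);
    }
}
```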
EventTimeTrigger
Event-time trigger
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private EventTimeTrigger() {}
/**
* Called for every record added to the window.
*/
@Override
public TriggerResult onElement(
Object element, long timestamp, TimeWindow window, TriggerContext ctx)
throws Exception {
// the watermark has already passed the window end (a late record within the allowed lateness): fire right away
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else { // otherwise register a timer for the window end and keep waiting
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
// called when an event-time timer fires: fire only for this window's end-of-window timer
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
// this trigger only reacts to event time; processing-time timers are ignored
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) {
// only register a timer if the watermark is not yet past the end of the merged window
// this is in line with the logic in onElement(). If the watermark is past the end of
// the window onElement() will fire and setting a timer here would fire the window twice.
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
ctx.registerEventTimeTimer(windowMaxTimestamp);
}
}
@Override
public String toString() {
return "EventTimeTrigger()";
}
/**
* Creates an event-time trigger that fires once the watermark passes the end of the window.
*
* <p>Once the trigger fires all elements are discarded. Elements that arrive late immediately
* trigger window evaluation with just this one element.
*/
public static EventTimeTrigger create() {
return new EventTimeTrigger();
}
}
CountTrigger
Count trigger (fires based on the number of records in the window)
public class CountTrigger<W extends Window> extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;
private final long maxCount;
private final ReducingStateDescriptor<Long> stateDesc =
new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
private CountTrigger(long maxCount) {
this.maxCount = maxCount;
}
// fire once the record count reaches maxCount, then reset the count
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
throws Exception {
ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
count.add(1L);
if (count.get() >= maxCount) {
count.clear();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
// no event-time logic
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
// no processing-time logic
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
ctx.getPartitionedState(stateDesc).clear();
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(W window, OnMergeContext ctx) throws Exception {
ctx.mergePartitionedState(stateDesc);
}
@Override
public String toString() {
return "CountTrigger(" + maxCount + ")";
}
/**
* Creates a trigger that fires once the number of elements in a pane reaches the given count.
*
* @param maxCount The count of elements at which to fire.
* @param <W> The type of {@link Window Windows} on which this trigger can operate.
*/
public static <W extends Window> CountTrigger<W> of(long maxCount) {
return new CountTrigger<>(maxCount);
}
private static class Sum implements ReduceFunction<Long> {
private static final long serialVersionUID = 1L;
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
}
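Building on the examples above, here is a custom trigger that fires the window computation as soon as a specific value appears in the stream, without waiting for the window to end:
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

class IEventTimeTrigger extends Trigger<EventLog, TimeWindow> {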
private static final long serialVersionUID = 1L;
/**
* Fires immediately if the watermark has passed the window end, fires early for guid 1111111, and otherwise waits for the end-of-window timer.
*/
@Override
public TriggerResult onElement(
EventLog element, long timestamp, TimeWindow window, TriggerContext ctx)
throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
if (Long.valueOf(1111111L).equals(element.getGuid())) { // guid 1111111: fire the window early (equals(1111111) would compare with an Integer and never match)
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) {
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
ctx.registerEventTimeTimer(windowMaxTimestamp);
}
}
@Override
public String toString() {
return "IEventTimeTrigger()";
}
}
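One Java detail matters in this trigger. `element.getGuid()` returns a `Long`, and a literal such as `1111111` autoboxes to an `Integer`. Because `Long.equals` returns false for any argument that is not a `Long`, a check written as `getGuid().equals(1111111)` never matches. The same applies to the `222222` check in the evictor below. The hypothetical `GuidCheck` class demonstrates this in plain Java; comparing against a `long` literal avoids the problem.

```java
// Plain-Java demonstration of the Long.equals(Integer) pitfall; no Flink dependency.
final class GuidCheck {

    /** Never true: 1111111 autoboxes to Integer, and Long.equals(Integer) is false. */
    static boolean buggyIsMarker(Long guid) {
        return guid.equals(1111111);
    }

    /** Null-safe comparison against a long literal. */
    static boolean isMarker(Long guid) {
        return guid != null && guid == 1111111L;
    }

    public static void main(String[] args) {
        Long guid = 1111111L;
        System.out.println(buggyIsMarker(guid) + " " + isMarker(guid)); // false true
    }
}
```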
3.5.2 evictor: removing records from a window
The mechanism for removing records from a window before and/or after the window function is evaluated.
Evictor (org.apache.flink.streaming.api.windowing.evictors)
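CountEvictor (org.apache.flink.streaming.api.windowing.evictors) // keeps at most a given number of records
DeltaEvictor (org.apache.flink.streaming.api.windowing.evictors) // removes records whose delta to the last record reaches a threshold
TimeEvictor (org.apache.flink.streaming.api.windowing.evictors) // keeps only records within a time span of the newest record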
public interface Evictor<T, W extends Window> extends Serializable {
// called before the window function
void evictBefore(
Iterable<TimestampedValue<T>> elements,
int size,
W window,
EvictorContext evictorContext);
// called after the window function
void evictAfter(
Iterable<TimestampedValue<T>> elements,
int size,
W window,
EvictorContext evictorContext);
...............
}
A rewritten ITimeEvictor; compared with TimeEvictor, the change is in evict():
class ITimeEvictor<W extends Window> implements Evictor<EventLog, W> {
......................
private void evict(Iterable<TimestampedValue<EventLog>> elements, int size, EvictorContext ctx) {
if (!hasTimestamp(elements)) {
return;
}
long currentTime = getMaxTimestamp(elements);
long evictCutoff = currentTime - windowSize;
for (Iterator<TimestampedValue<EventLog>> iterator = elements.iterator(); iterator.hasNext();) {
TimestampedValue<EventLog> record = iterator.next();
// if (record.getTimestamp() <= evictCutoff)
// added condition: also remove records whose guid is 222222
if (record.getTimestamp() <= evictCutoff || Long.valueOf(222222L).equals(record.getValue().getGuid())) {
iterator.remove();
}
}
}
.................................
}
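The eviction rule of `ITimeEvictor` can be tried out in plain Java. This sketch follows the logic shown above:

1. Take the largest timestamp in the window.
2. Compute `cutoff = max - windowSize`.
3. Drop records at or before the cutoff, and records with the blocked guid.

`EvictSim`, `Rec` and the `blockedGuid` parameter are hypothetical. The elided parts of `ITimeEvictor` (such as `hasTimestamp` and `of`) are not reproduced.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java version of the ITimeEvictor rule; no Flink dependency.
final class EvictSim {

    static final class Rec {
        final long ts;    // event timestamp (ms)
        final long guid;

        Rec(long ts, long guid) {
            this.ts = ts;
            this.guid = guid;
        }
    }

    /** Returns the records that survive eviction; the input list is not modified. */
    static List<Rec> evict(List<Rec> window, long windowSize, long blockedGuid) {
        List<Rec> kept = new ArrayList<>(window);
        if (kept.isEmpty()) {
            return kept;
        }
        long maxTs = Long.MIN_VALUE;
        for (Rec r : kept) {
            maxTs = Math.max(maxTs, r.ts);      // newest timestamp in the window
        }
        long cutoff = maxTs - windowSize;
        for (Iterator<Rec> it = kept.iterator(); it.hasNext(); ) {
            Rec r = it.next();
            if (r.ts <= cutoff || r.guid == blockedGuid) {
                it.remove();                     // too old, or the blocked guid
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Rec> window = List.of(new Rec(1_000L, 1L), new Rec(5_000L, 222_222L),
                new Rec(12_000L, 2L), new Rec(20_000L, 3L));
        for (Rec r : evict(window, 10_000L, 222_222L)) {
            System.out.println(r.ts + " guid=" + r.guid);
        }
    }
}
```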
3.5.3 Usage example
keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(30)))
.trigger(new IEventTimeTrigger()) // fire the window early when a record's guid is 1111111
.evictor(ITimeEvictor.of(Time.seconds(30), false)) // before evaluation, remove records whose guid is 222222
.apply(.....)
3.5.4 Invocation order
- When a record reaches the window, the Trigger's onElement() method is called.
- The return value of onElement() determines whether the window computation fires.
- If it fires, the Evictor's evictBefore() is called before the window function and evictAfter() after it, to remove selected records.
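The order above can be simulated in plain Java for a single window. This is a simplified sketch, not Flink's evicting window operator: the trigger is reduced to a predicate on the incoming record, and the evictors modify the buffered list in place. `WindowPipelineSim` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-Java simulation of trigger -> evictBefore -> window function -> evictAfter; no Flink dependency.
final class WindowPipelineSim {

    static <T, R> List<R> run(List<T> input,
                              Predicate<T> triggerFires,            // stands in for onElement(...) returning FIRE
                              Consumer<List<T>> evictBefore,
                              Function<List<T>, R> windowFunction,
                              Consumer<List<T>> evictAfter) {
        List<T> buffer = new ArrayList<>();
        List<R> results = new ArrayList<>();
        for (T record : input) {
            buffer.add(record);                                      // record enters the window
            if (triggerFires.test(record)) {
                evictBefore.accept(buffer);                          // 1. evict before evaluation
                results.add(windowFunction.apply(new ArrayList<>(buffer))); // 2. evaluate
                evictAfter.accept(buffer);                           // 3. evict after evaluation
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Integer> results = run(List.of(1, 2, 3, 4, 5),
                e -> e % 2 == 0,                                     // "trigger": fire on even values
                buffer -> buffer.removeIf(x -> x == 3),              // evictBefore: drop 3
                buffer -> buffer.stream().mapToInt(Integer::intValue).sum(), // window function: sum
                buffer -> buffer.removeIf(x -> x == 1));             // evictAfter: drop 1
        System.out.println(results); // [3, 6]
    }
}
```

In the example, the record removed by `evictBefore` (3) never reaches the window function. The record removed by `evictAfter` (1) is still part of the first result and only disappears from later firings.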