
Flink Study Notes 6: Flink Window Computation API


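1. Window concept and classification

A window is the core mechanism for processing an unbounded stream: it divides the unbounded data stream, according to some rule, into a series of bounded segments (buckets), and the computation is then performed within each bounded segment.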


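2. Classification

2.1 Tumbling windows

Adjacent windows do not overlap, so each element belongs to exactly one window.

  • The window size can be a time span or an element count.
  • Windows are further classified by whether the stream is keyed. A non-keyed window is also called a global window here (it runs with parallelism 1).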
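
To make "no overlap" concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a timestamp can be mapped to the one time-based tumbling window that owns it. The class name, the method name and the 5 s window size are assumptions made for this illustration; it shows the arithmetic only, not Flink's implementation.

```java
// Plain-Java sketch: which tumbling window owns a given timestamp?
// TumblingWindowSketch and windowStart are hypothetical names for this illustration.
final class TumblingWindowSketch {

    /** Returns the inclusive start of the size-aligned window [start, start + size) containing ts. */
    static long windowStart(long ts, long size) {
        // floorMod keeps the result correct for negative timestamps as well
        return ts - Math.floorMod(ts, size);
    }

    public static void main(String[] args) {
        long size = 5_000L; // assumed window size: 5 seconds, in milliseconds
        for (long ts : new long[] {0L, 4_999L, 5_000L, 12_345L}) {
            long start = windowStart(ts, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Every timestamp maps to exactly one start value, which is why tumbling windows never share elements.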


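2.2 Sliding windows

When the slide step is smaller than the window size, adjacent windows overlap and an element can belong to several windows. This is the usual configuration.

  • The window size can be a time span or an element count.
  • Windows are further classified by whether the stream is keyed. A non-keyed window is also called a global window here (it runs with parallelism 1).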
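
The overlap can be seen by listing every window that contains a given timestamp. The following is a plain-Java sketch under assumed names (SlidingWindowSketch, windowStarts) and assumed sizes; it illustrates the idea and is not Flink's code.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch: all sliding windows [start, start + size) that contain a timestamp.
// The class and method names are hypothetical.
final class SlidingWindowSketch {

    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // The latest window that begins at or before ts
        long lastStart = ts - Math.floorMod(ts, slide);
        // Walk backwards one slide at a time while the window still covers ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Assumed parameters: 5 s windows that slide every 1 s
        System.out.println(windowStarts(7_000L, 5_000L, 1_000L));
    }
}
```

With a 5 s size and a 1 s slide, each timestamp falls into size / slide = 5 windows. When the slide equals the size, the list has a single entry and the window behaves like a tumbling window.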


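2.3 Session windows

Session windows are defined by time only: when no data arrives for a given period (the session gap), the current window closes and the next element opens a new one.

  • The window extent can only be defined by time.
  • Windows are further classified by whether the stream is keyed. A non-keyed window is also called a global window here (it runs with parallelism 1).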
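
As a rough illustration of the gap rule, the plain-Java sketch below groups ascending timestamps into sessions. The names, the sample timestamps and the 3 s gap are assumptions, and so is the boundary choice (a new session starts only when the pause is strictly longer than the gap); Flink's own merging logic may treat the boundary differently.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch: split ascending timestamps into sessions separated by inactivity.
// SessionWindowSketch and sessions are hypothetical names.
final class SessionWindowSketch {

    /** Each result entry is {start, end}, where end = last timestamp in the session + gap. */
    static List<long[]> sessions(long[] sortedTs, long gap) {
        List<long[]> result = new ArrayList<>();
        if (sortedTs.length == 0) {
            return result;
        }
        long start = sortedTs[0];
        long last = sortedTs[0];
        for (int i = 1; i < sortedTs.length; i++) {
            // Assumption of this sketch: a pause strictly longer than the gap closes the session
            if (sortedTs[i] - last > gap) {
                result.add(new long[] {start, last + gap});
                start = sortedTs[i];
            }
            last = sortedTs[i];
        }
        result.add(new long[] {start, last + gap});
        return result;
    }

    public static void main(String[] args) {
        long[] ts = {1_000L, 2_000L, 2_500L, 9_000L, 9_500L}; // sample event times in milliseconds
        for (long[] s : sessions(ts, 3_000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

Unlike tumbling and sliding windows, the session boundaries depend on the data itself, which is why a session window cannot be defined by an element count.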


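3. Window computation API

Which API to use depends mainly on whether the window is keyed.

3.1 KeyedWindow

Upstream data is partitioned by the hash of the key before the computation, so the parallelism is configurable.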

stream
       .keyBy(...)               <-  keyed versus non-keyed windows      // group by key
       .window(...)              <-  required: "assigner" // window type: tumbling / sliding / session
      [.trigger(...)]            <-  optional: "trigger" (else default trigger) // decides when the bucket is evaluated
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)  // policy for removing elements from the bucket
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)  // how long late data is still accepted
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data) // fallback: send data that is too late to a side output
       .reduce/aggregate/apply()      <-  required: "function" // processing function, usually an aggregation
      [.getSideOutput(...)]      <-  optional: "output tag" // read the side output

        // Keyed windows are created with window(...) / countWindow(...) on a KeyedStream
        // tumbling window, processing time
        keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        // sliding window, processing time
        keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)));

        // tumbling window, event time
        keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        // sliding window, event time
        keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)));

        // tumbling count window
        keyedStream.countWindow(100);
        // sliding count window
        keyedStream.countWindow(100, 20);

3.2 NonKeyedWindow

All upstream data is gathered into a single task for the computation, so the parallelism is 1.

stream
       .windowAll(...)           <-  required: "assigner"  // the difference: windowAll covers the whole stream; window type: tumbling / sliding / session
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

        // Non-keyed windows are created with windowAll(...) / countWindowAll(...) on a DataStream
        // tumbling window, processing time
        dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        // sliding window, processing time
        dataStream.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)));

        // tumbling window, event time
        dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
        // sliding window, event time
        dataStream.windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)));

        // tumbling count window
        dataStream.countWindowAll(100);
        // sliding count window
        dataStream.countWindowAll(100, 20);

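3.3 Window aggregation operators

3.3.1 Incremental window aggregation operators

Each element is folded into the running result as soon as it arrives (a running sum, for example), so the window does not need to keep every element. The operators are min, max, minBy, maxBy, sum, reduce and aggregate.

min, max, minBy, maxBy, sum

        KeyedStream<EventLog, Long> keyedStream = streamOperator.keyBy(EventLog::getGuid);
        SingleOutputStreamOperator<EventLog> serviceTime = keyedStream
                .countWindow(10, 2) // sliding count window: size 10, slide 2
                .min("serviceTime"); // serviceTime is the minimum; the other fields come from the first element in the window
        // Alternatives; use exactly one of these in place of min:
        // .max("serviceTime")   // serviceTime is the maximum; the other fields come from the first element in the window
        // .minBy("serviceTime") // the whole record that holds the minimum serviceTime
        // .maxBy("serviceTime") // the whole record that holds the maximum serviceTime
        // .sum("serviceTime")   // serviceTime is the sum over the window; the other fields come from the first element in the window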

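reduce

The return type of reduce is fixed: it is the same as the element type of the stream.

        keyedStream.countWindow(10, 2).reduce(new ReduceFunction<EventLog>() {
            @Override
            public EventLog reduce(EventLog value1, EventLog value2) throws Exception {
                // Combine two elements into one. As an illustration, keep the element with the
                // larger serviceTime (this assumes serviceTime is never null).
                return value1.getServiceTime() >= value2.getServiceTime() ? value1 : value2;
            }
        });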

aggregate

The return type of aggregate can be customized; see the full example in 3.3.3.

    keyedStream.countWindow(10,2).aggregate(new AggregateFunction<EventLog, Object, Object>() {
            ......
        })
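
The accumulator idea behind aggregate can be shown without Flink. The sketch below is plain Java with a hypothetical class name; it mirrors the create / add / merge / result lifecycle for an average, which is a case where the result type (double) differs from the input type (long). It is an illustration of the pattern, not an AggregateFunction implementation.

```java
// Plain-Java sketch of an incremental accumulator: only sum and count are kept,
// never the individual elements. AverageAccumulatorSketch is a hypothetical name.
final class AverageAccumulatorSketch {
    long sum;
    long count;

    /** Folds one element into the accumulator as soon as it arrives. */
    void add(long value) {
        sum += value;
        count++;
    }

    /** Combines two partial accumulators into a new one. */
    AverageAccumulatorSketch merge(AverageAccumulatorSketch other) {
        AverageAccumulatorSketch merged = new AverageAccumulatorSketch();
        merged.sum = this.sum + other.sum;
        merged.count = this.count + other.count;
        return merged;
    }

    /** Produces the final result; its type differs from the element type. */
    double result() {
        return count == 0 ? 0.0 : (double) sum / count;
    }

    public static void main(String[] args) {
        AverageAccumulatorSketch a = new AverageAccumulatorSketch();
        for (long v : new long[] {10, 20, 30}) {
            a.add(v);
        }
        AverageAccumulatorSketch b = new AverageAccumulatorSketch();
        b.add(40);
        System.out.println(a.merge(b).result()); // prints 25.0
    }
}
```

Because the state is just two numbers, memory use does not grow with the number of elements in the window, which is the main advantage over the full-window operators.
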

3.3.2 Full-window aggregation operators

process

See the example in 3.3.3.

Compared with apply, process is more powerful because it also gives access to the context.

apply

        keyedStream.countWindow(10, 2).apply(new WindowFunction<EventLog, Object, Long, GlobalWindow>() {
            /**
             * Evaluates the window and outputs none or several elements.
             *
             * @param key    the key of the window
             * @param window the window being evaluated (a time window exposes its start and end)
             * @param input  the elements in the window being evaluated, i.e. all data of the window
             * @param out    the collector used to emit results
             */
            @Override
            public void apply(Long key, GlobalWindow window, Iterable<EventLog> input, Collector<Object> out) throws Exception {
            }
        });

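3.3.3 Example

The examples use keyed windows because they are more common; non-keyed windows work almost the same way.

Requirement:

Count the EventLog records per guid within a time window.

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class EventLog {

    private Long guid; // user id
    private String event; // user event
    private String timeStamp; // time at which the event occurred
    private Integer serviceTime; // service (API call) time

}

A custom source that generates the test data: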

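import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CustomEventLogSourceFunction implements SourceFunction<EventLog> {

    volatile boolean runFlag = true;
    // HH is the 24-hour clock; hh (12-hour) would make afternoon timestamps ambiguous when parsed back
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Override
    public void run(SourceContext<EventLog> sourceContext) throws Exception {
        Random random = new Random();
        while (runFlag) {
            // guid in [0, 5), event in {xw0, xw1, xw2}, timestamp = now.
            // EventLog has four fields, so a serviceTime is required; the random value is illustrative.
            EventLog eventLog = new EventLog(Long.valueOf(random.nextInt(5)), "xw" + random.nextInt(3),
                    simpleDateFormat.format(new Date()), random.nextInt(1000));
            Thread.sleep(random.nextInt(5) * 1000);
            sourceContext.collect(eventLog);
        }
    }

    @Override
    public void cancel() {
        runFlag = false;
    }
}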

aggregate

Aggregation: within the window, each element is handed to the function as it arrives and is folded into the aggregate.


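import java.text.SimpleDateFormat;
import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class _03_Window {

    public static void main(String[] args) throws Exception {
        // 24-hour pattern (HH), matching the format used by the source
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Create the environment
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8822);
        // Parallelism 1 makes the output easier to follow
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration).setParallelism(1);
        //DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000);
        DataStreamSource<EventLog> dataStream = env.addSource(new CustomEventLogSourceFunction());

        // Watermark strategy: how event time advances
        WatermarkStrategy<EventLog> watermarkStrategy = WatermarkStrategy
                .<EventLog>forBoundedOutOfOrderness(Duration.ofMillis(0))
                .withTimestampAssigner(new SerializableTimestampAssigner<EventLog>() {
                    @Override
                    public long extractTimestamp(EventLog element, long recordTimestamp) {
                        try {
                            return sdf.parse(element.getTimeStamp()).getTime();
                        } catch (Exception e) {
                            // An unparseable timestamp falls back to 0
                            return 0;
                        }
                    }
                });
        // Assign timestamps and watermarks; use event time
        DataStream<EventLog> streamOperator = dataStream.assignTimestampsAndWatermarks(watermarkStrategy).disableChaining();

        // Requirement: every 10 s, count the user events of the last 30 s
        streamOperator.keyBy(EventLog::getGuid)
                .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))) // sliding window
                // .apply()   // receives all window data; custom full-window logic, more flexible
                // .process() // same as apply, and can also access the context, so more information is available
                .aggregate(new AggregateFunction<EventLog, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() { // incremental aggregation
                    // Initialize the accumulator
                    @Override
                    public Tuple2<Long, Integer> createAccumulator() {
                        // -1L is a placeholder guid that add() overwrites; Flink's tuple serializer does not support null fields
                        return Tuple2.of(-1L, 0);
                    }

                    // Accumulation logic
                    @Override
                    public Tuple2<Long, Integer> add(EventLog value, Tuple2<Long, Integer> accumulator) {
                        return Tuple2.of(value.getGuid(), accumulator.f1 + 1);
                    }

                    // Produce the result
                    @Override
                    public Tuple2<Long, Integer> getResult(Tuple2<Long, Integer> accumulator) {
                        return accumulator;
                    }

                    // Merge two partial accumulators
                    @Override
                    public Tuple2<Long, Integer> merge(Tuple2<Long, Integer> a, Tuple2<Long, Integer> b) {
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                }).print();

        env.execute();
    }
}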

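process

Full-window processing: when the window fires, the function receives every element in the window and can also access the context, so it can apply custom logic. This is more flexible than aggregate.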

keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
                .process(new ProcessWindowFunction<EventLog, Tuple2<Long, Integer>, Long, TimeWindow>() {
                    @Override
                    public void process(Long aLong,
                                        ProcessWindowFunction<EventLog, Tuple2<Long, Integer>, Long, TimeWindow>.Context context,
                                        Iterable<EventLog> elements, Collector<Tuple2<Long, Integer>> out) throws Exception {
                        TimeWindow window = context.window();
                        String start = sdf.format(new Date(window.getStart()));
                        String end = sdf.format(new Date(window.getEnd()));
                        Iterator<EventLog> iterator = elements.iterator();
                        int count = 0;
                        while (iterator.hasNext()) {
                            iterator.next();
                            count++;
                        }
                        System.out.println("start:" + start + ";end:" + end + ";count:" + count);
                        out.collect(Tuple2.of(aLong, count));
                    }
                }).print();

3.4 Handling late data

3.4.1 allowedLateness

allowedLateness lets a window keep accepting elements that arrive late; the figure in 3.5 is a useful reference.


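import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class _05_Window_allowedLateness {

    public static void main(String[] args) throws Exception {
        // 24-hour pattern (HH) so that printed window bounds are unambiguous
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Create the environment
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8822);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration).setParallelism(1);
        DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000);

        // String stream ==> object stream; each input line is "guid,event,timestampMillis"
        SingleOutputStreamOperator<EventLog> outputStreamOperator = dataStreamSource.map(new MapFunction<String, EventLog>() {
            @Override
            public EventLog map(String value) throws Exception {
                String[] split = value.split(",");
                // The input carries no serviceTime, so the fourth field is left null
                return new EventLog(Long.valueOf(split[0]), split[1], split[2], null);
            }
        });

        // Watermark strategy: how event time advances
        WatermarkStrategy<EventLog> watermarkStrategy = WatermarkStrategy
                .<EventLog>forBoundedOutOfOrderness(Duration.ofMillis(0))
                .withTimestampAssigner(new SerializableTimestampAssigner<EventLog>() {
                    @Override
                    public long extractTimestamp(EventLog element, long recordTimestamp) {
                        return Long.parseLong(element.getTimeStamp());
                    }
                });
        // Assign timestamps and watermarks; use event time
        DataStream<EventLog> streamOperator = outputStreamOperator.assignTimestampsAndWatermarks(watermarkStrategy)
                .disableChaining();
        OutputTag<EventLog> latedata = new OutputTag<EventLog>("latedata", TypeInformation.of(EventLog.class));
        KeyedStream<EventLog, Long> keyedStream = streamOperator.keyBy(EventLog::getGuid);

        SingleOutputStreamOperator<EventLog> guid = keyedStream
                .window(TumblingEventTimeWindows.of(Time.seconds(30)))
                .allowedLateness(Time.seconds(3))  // accept data that is up to 3 s late
                .sideOutputLateData(latedata)  // data later than the allowed lateness goes to the side output
                .apply(new WindowFunction<EventLog, EventLog, Long, TimeWindow>() {
                    @Override
                    public void apply(Long key, TimeWindow window, Iterable<EventLog> input, Collector<EventLog> out) throws Exception {
                        // Local variable: a field here would keep accumulating across windows, keys and re-firings
                        long count = 0;
                        for (EventLog eventLog : input) {
                            count += eventLog.getGuid();
                        }
                        EventLog eventLog = new EventLog();
                        eventLog.setGuid(count);
                        out.collect(eventLog);
                        String start = sdf.format(new Date(window.getStart()));
                        String end = sdf.format(new Date(window.getEnd()));
                        System.out.println("==> start:" + start + ";end:" + end + ";count:" + count);
                    }
                });

        guid.print("main output");
        DataStream<EventLog> sideOutput = guid.getSideOutput(latedata);
        sideOutput.print("side output");
        env.execute();

    }
}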
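
The decision that allowedLateness and sideOutputLateData control can be sketched in plain Java. The sketch reflects my understanding of the rule for event-time windows (a window fires once the watermark reaches its last timestamp, and it is dropped once the watermark reaches that timestamp plus the allowed lateness). The class, enum and method names are hypothetical, and the exact boundary handling should be checked against the Flink version in use.

```java
// Plain-Java sketch: what happens to an element, given the state of its window?
// LatenessSketch, Outcome and classify are hypothetical names for this illustration.
final class LatenessSketch {

    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, SIDE_OUTPUT }

    static Outcome classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1; // last timestamp that still belongs to [start, end)
        if (watermark < maxTimestamp) {
            return Outcome.ON_TIME; // the window has not fired yet
        }
        if (watermark < maxTimestamp + allowedLateness) {
            return Outcome.LATE_BUT_ACCEPTED; // the window already fired and is evaluated again
        }
        return Outcome.SIDE_OUTPUT; // the window is gone; the element goes to the side output
    }

    public static void main(String[] args) {
        long windowEnd = 30_000L;      // a 30 s tumbling window [0, 30000)
        long allowedLateness = 3_000L; // the 3 s used in the example above
        for (long watermark : new long[] {20_000L, 31_000L, 33_000L}) {
            System.out.println("watermark " + watermark + " -> "
                    + classify(windowEnd, allowedLateness, watermark));
        }
    }
}
```

In the job above, an element of the first 30 s window that arrives while the watermark is at 31 s would still update the window result, while one that arrives at 33 s would show up on the side output.
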
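
3.4.2 Window time and watermark

(Figure omitted: the relationship between window time and the watermark.)

3.5 Window triggering mechanism

3.5.1 Trigger: firing the window computation

When a window computation fires is decided by its Trigger class; each WindowAssigner comes with its own default trigger.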

Trigger (org.apache.flink.streaming.api.windowing.triggers)
    ProcessingTimeoutTrigger (org.apache.flink.streaming.api.windowing.triggers)
    EventTimeTrigger (org.apache.flink.streaming.api.windowing.triggers)
    CountTrigger (org.apache.flink.streaming.api.windowing.triggers)
    DeltaTrigger (org.apache.flink.streaming.api.windowing.triggers)
    NeverTrigger in GlobalWindows (org.apache.flink.streaming.api.windowing.assigners)
    ContinuousEventTimeTrigger (org.apache.flink.streaming.api.windowing.triggers)
    PurgingTrigger (org.apache.flink.streaming.api.windowing.triggers)
    ContinuousProcessingTimeTrigger (org.apache.flink.streaming.api.windowing.triggers)
    ProcessingTimeTrigger (org.apache.flink.streaming.api.windowing.triggers)

EventTimeTrigger

The event-time trigger.

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}

    /**
     * Called for every element that is added to the window.
     */
    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        // The watermark has already reached the window's max timestamp: fire the window now
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else { // otherwise register a timer for the end of the window and keep waiting
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    // Called when a registered event-time timer fires as event time advances;
    // fires the window if the timer is the one for the window's max timestamp
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    // This trigger works on event time only and ignores processing time
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // only register a timer if the watermark is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the watermark is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    /**
     * Creates an event-time trigger that fires once the watermark passes the end of the window.
     *
     * <p>Once the trigger fires all elements are discarded. Elements that arrive late immediately
     * trigger window evaluation with just this one element.
     */
    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}

CountTrigger

The count-based trigger.

public class CountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;

    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private CountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    // Fire once the element count reaches maxCount, then reset the counter
    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
            throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
    // No event-time logic
    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }
    // No processing-time logic
    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }

    @Override
    public String toString() {
        return "CountTrigger(" + maxCount + ")";
    }

    /**
     * Creates a trigger that fires once the number of elements in a pane reaches the given count.
     *
     * @param maxCount The count of elements at which to fire.
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     */
    public static <W extends Window> CountTrigger<W> of(long maxCount) {
        return new CountTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}
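
The counting logic of onElement in the CountTrigger listing above can be imitated in a few lines of plain Java. This is only a sketch with a hypothetical class name: it keeps the counter in a field, whereas the real trigger keeps it in partitioned state per key and window.

```java
// Plain-Java imitation of the counting rule: fire on every maxCount-th element.
// CountTriggerSketch is a hypothetical name; a boolean stands in for TriggerResult.
final class CountTriggerSketch {
    private final long maxCount;
    private long count;

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Returns true (fire) when the number of elements since the last firing reaches maxCount. */
    boolean onElement() {
        count++;
        if (count >= maxCount) {
            count = 0; // same reset as count.clear() in the listing above
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CountTriggerSketch trigger = new CountTriggerSketch(3);
        for (int i = 1; i <= 7; i++) {
            System.out.println("element " + i + " -> " + (trigger.onElement() ? "FIRE" : "CONTINUE"));
        }
    }
}
```

With maxCount set to 3, the third and sixth elements fire and the seventh starts a new count.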

Building on the examples above, here is a custom trigger that fires the window computation as soon as a specific value appears in the stream, without waiting for the window to end.

  class IEventTimeTrigger extends Trigger<EventLog, TimeWindow> {
    private static final long serialVersionUID = 1L;


      /**
       * Fires if the watermark has already passed the end of the window;
       * otherwise registers the end-of-window timer and fires early for a specific guid.
       */
    @Override
    public TriggerResult onElement(
            EventLog element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            // guid is a Long, so compare with a Long literal; equals(1111111) with an Integer is always false
            if (element.getGuid().equals(1111111L)) { // fire the window computation early when the guid is 1111111
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "IEventTimeTrigger()";
    }

}

3.5.2 Evictor: removing elements from the window

An evictor removes elements from the window before and/or after the window function runs.

Evictor (org.apache.flink.streaming.api.windowing.evictors)
	CountEvictor (org.apache.flink.streaming.api.windowing.evictors)  // keeps at most a given number of elements
	DeltaEvictor (org.apache.flink.streaming.api.windowing.evictors)  // evicts elements whose delta to the last element reaches a threshold
	TimeEvictor (org.apache.flink.streaming.api.windowing.evictors)   // evicts elements older than a given interval

public interface Evictor<T, W extends Window> extends Serializable {

    // called before the window function
    void evictBefore(
            Iterable<TimestampedValue<T>> elements,
            int size,
            W window,
            EvictorContext evictorContext);

    // called after the window function
    void evictAfter(
            Iterable<TimestampedValue<T>> elements,
            int size,
            W window,
            EvictorContext evictorContext);
...............
}

A custom ITimeEvictor; only the part that differs from TimeEvictor is shown:

class ITimeEvictor<W extends Window> implements Evictor<EventLog, W> {
......................

	private void evict(Iterable<TimestampedValue<EventLog>> elements, int size, EvictorContext ctx) {
		if (!hasTimestamp(elements)) {
			return;
		}

		long currentTime = getMaxTimestamp(elements);
		long evictCutoff = currentTime - windowSize;

		for (Iterator<TimestampedValue<EventLog>> iterator = elements.iterator(); iterator.hasNext();) {
			TimestampedValue<EventLog> record = iterator.next();
			// Original condition in TimeEvictor: if (record.getTimestamp() <= evictCutoff)
			// Added condition: also evict elements whose guid is 222222 (guid is a Long, hence the L suffix)
			if (record.getTimestamp() <= evictCutoff || record.getValue().getGuid().equals(222222L)) {
				iterator.remove();
			}
		}
	}
 .................................
}

3.5.3 Usage example

   keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(30)))
				.trigger(new IEventTimeTrigger()) // fire early when the guid is 1111111
				.evictor(ITimeEvictor.of(Time.seconds(30), false)) // before the computation, remove elements whose guid is 222222
				.apply(.....)

3.5.4 Invocation order


  • When an element arrives in a window, the Trigger's onElement() method is called.

  • The return value of onElement() determines whether the window computation fires.

  • If the computation fires, the Evictor is invoked to remove elements: evictBefore() before the window function and evictAfter() after it.
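
The order of these calls can be traced with a small plain-Java simulation. Everything in it is hypothetical (the class name, the use of predicates in place of Trigger and Evictor, and the sum used as the window function); it records only the sequence of steps and does not use Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java simulation of one window: trigger check, evictBefore, window function, evictAfter.
// WindowPipelineSketch and run are hypothetical names.
final class WindowPipelineSketch {

    static List<String> run(List<Integer> elements, Predicate<Integer> fireOn, Predicate<Integer> evict) {
        List<String> log = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        for (Integer element : elements) {
            buffer.add(element);
            log.add("onElement(" + element + ")");  // step 1: the trigger sees every element
            if (fireOn.test(element)) {              // step 2: its return value decides whether to fire
                buffer.removeIf(evict);              // step 3a: evictBefore removes unwanted elements
                log.add("evictBefore -> " + buffer);
                int sum = 0;
                for (int value : buffer) {
                    sum += value;
                }
                log.add("function -> sum=" + sum);   // the window function sees the remaining elements
                log.add("evictAfter");               // step 3b: evictAfter runs last (a no-op here)
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // Fire when 99 arrives; evict the value 22 before the computation
        List<Integer> input = Arrays.asList(1, 22, 3, 99);
        for (String line : run(input, e -> e == 99, e -> e == 22)) {
            System.out.println(line);
        }
    }
}
```

The trace shows four onElement calls, then evictBefore, the window function and evictAfter, in that order, which matches the three steps listed above.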
