
Flink Windows Explained


Windows

Windows are at the heart of stream processing. A window splits the stream into finite-size "buckets" over which we can apply aggregate computations (ProcessWindowFunction, ReduceFunction, AggregateFunction, or FoldFunction). The basic structure of a windowed computation in Flink looks like this:

Keyed Windows

stream
    .keyBy(...)
    .window(...)                    <- required: the window assigner (window type)
    [.trigger(...)]                 <- optional: "trigger" (every assigner has a default) decides when the window fires
    [.evictor(...)]                 <- optional: "evictor" (none by default) removes elements from the window
    [.allowedLateness(...)]         <- optional: "lateness" (0 by default, i.e. no late data admitted)
    [.sideOutputLateData(...)]      <- optional: "output tag" routes late data to a designated side-output stream
    .reduce/aggregate/fold/apply()  <- required: the "function" that aggregates the window contents
    [.getSideOutput(...)]           <- optional: "output tag" retrieves the side output, typically the late records

Non-Keyed Windows

stream
    .windowAll(...)                 <- required: the window assigner (window type)
    [.trigger(...)]                 <- optional: "trigger" (every assigner has a default) decides when the window fires
    [.evictor(...)]                 <- optional: "evictor" (none by default) removes elements from the window
    [.allowedLateness(...)]         <- optional: "lateness" (0 by default, i.e. no late data admitted)
    [.sideOutputLateData(...)]      <- optional: "output tag" routes late data to a designated side-output stream
    .reduce/aggregate/fold/apply()  <- required: the "function" that aggregates the window contents
    [.getSideOutput(...)]           <- optional: "output tag" retrieves the side output, typically the late records

Window Lifecycle

In a nutshell, a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness (see Allowed Lateness). Flink guarantees removal only for time-based windows and not for other types, e.g. global windows (see Window Assigners).
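
For example, a minimal sketch (assuming event time, a 5-second tumbling window, 1 second of allowed lateness, and illustrative timestamps):

// lifecycle of the event-time window [12:00:00, 12:00:05), as a sketch
stream
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .allowedLateness(Time.seconds(1))
  // created: when the first element with a timestamp in [12:00:00, 12:00:05) arrives
  // fires:   when the watermark passes 12:00:05 (the default EventTimeTrigger)
  // removed: when the watermark passes 12:00:06 (end timestamp + allowed lateness)
  .reduce((v1, v2) => (v1._1, v1._2 + v2._2))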

In addition, each window will have a Trigger (see Triggers) and a function (ProcessWindowFunction, ReduceFunction, AggregateFunction, or FoldFunction) (see Window Functions) attached to it. The function will contain the computation to be applied to the contents of the window, while the Trigger specifies the conditions under which the window is considered ready for the function to be applied.

Apart from the above, you can specify an Evictor (see Evictors) which will be able to remove elements from the window after the trigger fires and before and/or after the function is applied.

Window Assigners

The window assigner defines how elements are assigned to windows. This is done by specifying the WindowAssigner of your choice in the window(...) (for keyed streams) or the windowAll(...) (for non-keyed streams) call.

A WindowAssigner is responsible for assigning each incoming element to one or more windows. Flink comes with pre-defined window assigners for the most common use cases, namely tumbling windows, sliding windows, session windows and global windows. You can also implement a custom window assigner by extending the WindowAssigner class. All built-in window assigners (except the global windows) assign elements to windows based on time, which can either be processing time or event time.
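
As a sketch of the custom route (assuming Flink 1.8's WindowAssigner API; TenSecondAssigner is a hypothetical name), a minimal processing-time assigner might look like this:

import java.util.{Collection => JCollection, Collections}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, Trigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// hypothetical assigner: every element goes into one fixed 10-second processing-time window
class TenSecondAssigner extends WindowAssigner[Object, TimeWindow] {
  override def assignWindows(element: Object, timestamp: Long,
                             context: WindowAssigner.WindowAssignerContext): JCollection[TimeWindow] = {
    val now = context.getCurrentProcessingTime
    val start = now - (now % 10000) // align the window start to a 10s boundary
    Collections.singletonList(new TimeWindow(start, start + 10000))
  }
  override def getDefaultTrigger(env: StreamExecutionEnvironment): Trigger[Object, TimeWindow] =
    ProcessingTimeTrigger.create() // fire when processing time passes the window end
  override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] =
    new TimeWindow.Serializer()
  override def isEventTime: Boolean = false
}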

Tumbling Windows

A tumbling window has a fixed length, and its slide interval equals its length, so the windows never overlap.


import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// word count over 5-second tumbling processing-time windows
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos", 9999)
   .flatMap(_.split("\\s+"))
   .map((_, 1))
   .keyBy(0)
   .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
   .reduce((v1, v2) => (v1._1, v1._2 + v2._2))
   .print()
env.execute("window")

Sliding Windows

A sliding window also has a fixed length, but the length exceeds the slide interval, so elements can belong to more than one window.


// additionally requires: SlidingProcessingTimeWindows, ProcessWindowFunction, TimeWindow, Collector, java.text.SimpleDateFormat
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos", 9999)
   .flatMap(_.split("\\s+"))
   .map((_, 1))
   .keyBy(_._1)
   .window(SlidingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(2)))
   .process(new ProcessWindowFunction[(String, Int), String, String, TimeWindow] {
     override def process(key: String, context: Context,
                          elements: Iterable[(String, Int)],
                          out: Collector[String]): Unit = {
       val sdf = new SimpleDateFormat("HH:mm:ss")
       val window = context.window
       // print the window bounds, then every element it contains
       println(sdf.format(window.getStart) + "\t" + sdf.format(window.getEnd))
       for (e <- elements) {
         print(e + "\t")
       }
       println()
     }
   })
env.execute("window")

Session Windows (merging windows)

Session windows are based on the time gap between elements: if the gap between consecutive elements is smaller than the session gap, they are merged into the same window; once the gap exceeds it, the current window closes and subsequent elements start a new window. Unlike tumbling and sliding windows, session windows have no fixed size; under the hood they are implemented by merging windows.


val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos", 9999)
   .flatMap(_.split("\\s+"))
   .map((_, 1))
   .keyBy(_._1)
   .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) // close a window after 5s of inactivity
   .apply(new WindowFunction[(String, Int), String, String, TimeWindow] {
     override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[String]): Unit = {
       val sdf = new SimpleDateFormat("HH:mm:ss")
       println(sdf.format(window.getStart) + "\t" + sdf.format(window.getEnd))
       for (e <- input) {
         print(e + "\t")
       }
       println()
     }
   })
env.execute("window")

Global Windows

A global window places all elements with the same key into a single window. By default this window never fires (and is never closed), because it has no usable default trigger, so you must supply a custom Trigger.

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos", 9999)
   .flatMap(_.split("\\s+"))
   .map((_, 1))
   .keyBy(_._1)
   .window(GlobalWindows.create())
   .trigger(CountTrigger.of[GlobalWindow](3)) // fire once a key has accumulated 3 elements
   .apply(new WindowFunction[(String, Int), String, String, GlobalWindow] {
     override def apply(key: String, window: GlobalWindow, input: Iterable[(String, Int)], out: Collector[String]): Unit = {
       println("=======window========")
       for (e <- input) {
         print(e + "\t")
       }
       println()
     }
   })
env.execute("window")

Window Functions

Once the system decides a window is ready, it invokes a window function to aggregate the window's contents. Common window functions come in the following forms: ReduceFunction, AggregateFunction, FoldFunction, or ProcessWindowFunction/WindowFunction (the last of which is legacy).

ReduceFunction

class SumReduceFunction extends ReduceFunction[(String, Int)] {
  override def reduce(v1: (String, Int), v2: (String, Int)): (String, Int) = {
    (v1._1, v1._2 + v2._2)
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos", 9999)
   .flatMap(_.split("\\s+"))
   .map((_, 1))
   .keyBy(0)
   .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
   .reduce(new SumReduceFunction) // equivalent lambda: .reduce((v1,v2)=>(v1._1,v1._2+v2._2))
   .print()
env.execute("window")

AggregateFunction

class SumAggregateFunction extends AggregateFunction[(String, Int), (String, Int), (String, Int)] {
  // the accumulator holds (key, running sum)
  override def createAccumulator(): (String, Int) = ("", 0)

  override def add(value: (String, Int), accumulator: (String, Int)): (String, Int) =
    (value._1, accumulator._2 + value._2)

  // combines two partial accumulators, e.g. when session windows merge
  override def merge(a: (String, Int), b: (String, Int)): (String, Int) =
    (a._1, a._2 + b._2)

  override def getResult(accumulator: (String, Int)): (String, Int) = accumulator
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("CentOS", 9999)
   .flatMap(_.split("\\s+"))
   .map((_, 1))
   .keyBy(0)
   .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
   .aggregate(new SumAggregateFunction)
   .print()
env.execute("window")

FoldFunction

class SumFoldFunction extends FoldFunction[(String, Int), (String, Long)] {
  override def fold(accumulator: (String, Long), value: (String, Int)): (String, Long) = {
    (value._1, accumulator._2 + value._2)
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos", 8877)
   .flatMap(_.split("\\s+"))
   .map((_, 1))
   .keyBy(0)
   .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
   // equivalent: .fold(("", 0L), new SumFoldFunction)
   .fold(("", 0L))((acc, v) => (v._1, acc._2 + v._2))
   .print()
env.execute("window")

ProcessWindowFunction

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos", 7788)
   .flatMap(_.split("\\s+"))
   .map((_, 1))
   .keyBy(_._1)
   .window(SlidingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(2)))
   .process(new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
     override def process(key: String, context: Context,
                          elements: Iterable[(String, Int)],
                          out: Collector[(String, Int)]): Unit = {
       // the full window contents are buffered and handed over at once
       val results = elements.reduce((v1, v2) => (v1._1, v1._2 + v2._2))
       out.collect(results)
     }
   }).print()
env.execute("window")

globalState() | windowState()

  • globalState(), which allows access to keyed state that is not scoped to a window
  • windowState(), which allows access to keyed state that is also scoped to the window

val env = StreamExecutionEnvironment.getExecutionEnvironment

val globalTag = new OutputTag[(String, Int)]("globalTag")

val countsStream = env.socketTextStream("centos", 7788)
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .keyBy(_._1)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(2))) // (window size, window offset)
  .process(new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
    var wvds: ValueStateDescriptor[Int] = _
    var gvds: ValueStateDescriptor[Int] = _

    override def open(parameters: Configuration): Unit = {
      wvds = new ValueStateDescriptor[Int]("window-value", createTypeInformation[Int])
      gvds = new ValueStateDescriptor[Int]("global-value", createTypeInformation[Int])
    }

    override def process(key: String, context: Context,
                         elements: Iterable[(String, Int)],
                         out: Collector[(String, Int)]): Unit = {
      val total = elements.map(_._2).sum
      val ws = context.windowState.getState(wvds) // scoped to this window
      val gs = context.globalState.getState(gvds) // scoped to the key, across all windows
      val historyWindowValue = ws.value()
      val historyGlobalValue = gs.value()
      out.collect((key, historyWindowValue + total))
      context.output(globalTag, (key, historyGlobalValue + total))
      ws.update(historyWindowValue + total)
      gs.update(historyGlobalValue + total)
    }
  })
countsStream.print("window counts")
countsStream.getSideOutput(globalTag).print("global totals")
env.execute("window")

ReduceFunction+ProcessWindowFunction

val env = StreamExecutionEnvironment.getExecutionEnvironment

val countsStream = env.socketTextStream("centos", 7788)
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .keyBy(_._1)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(2)))
  // the ReduceFunction pre-aggregates incrementally; the ProcessWindowFunction
  // then receives the single reduced element plus the window metadata
  .reduce(new SumReduceFunction, new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
    override def process(key: String, context: Context,
                         elements: Iterable[(String, Int)],
                         out: Collector[(String, Int)]): Unit = {
      val total = elements.map(_._2).sum // elements holds exactly one pre-reduced value
      out.collect((key, total))
    }
  })
countsStream.print("window counts")
env.execute("window")

FoldFunction+ProcessWindowFunction

val env = StreamExecutionEnvironment.getExecutionEnvironment
val countsStream = env.socketTextStream("centos", 7788)
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .keyBy(_._1)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(2)))
  // the FoldFunction pre-aggregates incrementally; the ProcessWindowFunction
  // then receives the single folded element plus the window metadata
  .fold(("", 0L), new SumFoldFunction, new ProcessWindowFunction[(String, Long), (String, Long), String, TimeWindow] {
    override def process(key: String, context: Context,
                         elements: Iterable[(String, Long)],
                         out: Collector[(String, Long)]): Unit = {
      val total = elements.map(_._2).sum
      out.collect((key, total))
    }
  }).print()
env.execute("window")

WindowFunction (legacy)

A legacy interface; in general, use ProcessWindowFunction instead.

In some places where a ProcessWindowFunction can be used you can also use a WindowFunction. This is an older version of ProcessWindowFunction that provides less contextual information and does not have some advanced features, such as per-window keyed state. This interface will be deprecated at some point.

env.socketTextStream("centos", 7788)
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .keyBy(_._1) // keyBy by position is not usable here: WindowFunction needs the actual key type, not a Tuple
  .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .apply(new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
    override def apply(key: String,
                       window: TimeWindow,
                       input: Iterable[(String, Int)],
                       out: Collector[(String, Int)]): Unit = {
      out.collect((key, input.map(_._2).sum))
    }
  }).print()
env.execute("window")

Triggers

A Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger. If the default trigger does not fit your needs, you can specify a custom trigger using trigger(...).

WindowAssigner                 Default Trigger
global window                  NeverTrigger
event-time window              EventTimeTrigger
processing-time window         ProcessingTimeTrigger

The trigger interface has five methods that allow a Trigger to react to different events:

  • The onElement() method is called for each element that is added to a window.
  • The onEventTime() method is called when a registered event-time timer fires.
  • The onProcessingTime() method is called when a registered processing-time timer fires.
  • The onMerge() method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
  • Finally, the clear() method performs any action needed upon removal of the corresponding window.
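
As a sketch of these hooks (assuming Flink 1.8's Trigger API; EveryN is a hypothetical count-based trigger in the spirit of the built-in CountTrigger):

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// hypothetical trigger: FIRE once a window has received maxCount elements
class EveryN[T](maxCount: Long) extends Trigger[T, TimeWindow] {
  private val countDesc = new ReducingStateDescriptor[java.lang.Long]("count",
    new ReduceFunction[java.lang.Long] {
      override def reduce(a: java.lang.Long, b: java.lang.Long): java.lang.Long = a + b
    }, classOf[java.lang.Long])

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    val count = ctx.getPartitionedState(countDesc)
    count.add(1L)
    if (count.get() >= maxCount) { count.clear(); TriggerResult.FIRE } else TriggerResult.CONTINUE
  }
  // this trigger registers no timers, so both timer callbacks just CONTINUE
  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE
  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE
  // called when the window is removed
  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
    ctx.getPartitionedState(countDesc).clear()
  // onMerge would only be needed for merging assigners such as session windows
}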

DeltaTrigger

val env = StreamExecutionEnvironment.getExecutionEnvironment

// fires whenever the delta between the last trigger point and the new element exceeds 2.0
val deltaTrigger = DeltaTrigger.of[(String, Double), GlobalWindow](2.0, new DeltaFunction[(String, Double)] {
    override def getDelta(oldDataPoint: (String, Double), newDataPoint: (String, Double)): Double = {
        newDataPoint._2 - oldDataPoint._2
    }
}, createTypeInformation[(String, Double)].createSerializer(env.getConfig))

env.socketTextStream("centos", 7788)
   .map(_.split("\\s+"))
   .map(ts => (ts(0), ts(1).toDouble))
   .keyBy(0)
   .window(GlobalWindows.create())
   .trigger(deltaTrigger)
   .reduce((v1: (String, Double), v2: (String, Double)) => (v1._1, v1._2 + v2._2))
   .print()
env.execute("window")

Evictors

The evictor has the ability to remove elements from a window after the trigger fires and before and/or after the window function is applied. To do so, the Evictor interface has two methods:

public interface Evictor<T, W extends Window> extends Serializable {
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
}

ErrorEvictor

class ErrorEvictor(isBefore: Boolean) extends Evictor[String, TimeWindow] {
    override def evictBefore(elements: lang.Iterable[TimestampedValue[String]], size: Int, window: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
        if (isBefore) {
            evict(elements, size, window, evictorContext)
        }
    }

    override def evictAfter(elements: lang.Iterable[TimestampedValue[String]], size: Int, window: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
        if (!isBefore) {
            evict(elements, size, window, evictorContext)
        }
    }

    // removes every element whose value contains "error"
    private def evict(elements: lang.Iterable[TimestampedValue[String]], size: Int, window: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
        val iterator = elements.iterator()
        while (iterator.hasNext) {
            val it = iterator.next()
            if (it.getValue.contains("error")) {
                iterator.remove()
            }
        }
    }
}
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.socketTextStream("CentOS", 7788)
     .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
     .evictor(new ErrorEvictor(true))
     .apply(new AllWindowFunction[String, String, TimeWindow] {
         override def apply(window: TimeWindow, input: Iterable[String], out: Collector[String]): Unit = {
             for (e <- input) {
                 out.collect(e)
             }
         }
     })
     .print()

fsEnv.execute("window")

Event Time

For window computation Flink supports the following time semantics: processing time, event time, and ingestion time.

Processing time: windows are computed using the wall-clock time of the processing node.

Event time: windows are computed using the time at which each event was produced; this is the precise option.

Ingestion time: the time at which a record enters Flink, generally assigned by the SourceFunction.


Flink defaults to ProcessingTime, so to use event time or ingestion time you generally have to set the time characteristic:

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// window operations go here
fsEnv.execute("event time")

Once event-time processing is enabled, you must declare how watermarks are computed. The system derives a watermark time T for each stream, and a window fires only when its end time T' <= watermark(T). In Flink the user has to implement the watermark computation; the system does not provide one. Watermarks can be generated in two ways: ① periodically, on a fixed interval (recommended); ② punctuated, per record, where every arriving record immediately updates the watermark.

Periodic

// assumes: case class AccessLog(channel: String, timestamp: Long)
class AccessLogAssignerWithPeriodicWatermarks extends AssignerWithPeriodicWatermarks[AccessLog] {
  private var maxSeenTime: Long = 0L
  private val maxOutOfOrderness: Long = 2000L

  // called on the configured auto-watermark interval
  override def getCurrentWatermark: Watermark = new Watermark(maxSeenTime - maxOutOfOrderness)

  override def extractTimestamp(element: AccessLog, previousElementTimestamp: Long): Long = {
    maxSeenTime = Math.max(maxSeenTime, element.timestamp)
    element.timestamp
  }
}

Punctuated (per record)

class AccessLogAssignerWithPunctuatedWatermarks extends AssignerWithPunctuatedWatermarks[AccessLog] {
  private var maxSeenTime: Long = 0L
  private val maxOutOfOrderness: Long = 2000L

  // called for every record, right after extractTimestamp
  override def checkAndGetNextWatermark(lastElement: AccessLog, extractedTimestamp: Long): Watermark = {
    new Watermark(maxSeenTime - maxOutOfOrderness)
  }

  override def extractTimestamp(element: AccessLog, previousElementTimestamp: Long): Long = {
    maxSeenTime = Math.max(maxSeenTime, element.timestamp)
    element.timestamp
  }
}

Watermarks

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000) // compute the watermark once per second
fsEnv.setParallelism(1)
// input format: channel timestamp
fsEnv.socketTextStream("CentOS", 8888)
     .map(line => line.split("\\s+"))
     .map(ts => AccessLog(ts(0), ts(1).toLong))
     .assignTimestampsAndWatermarks(new AccessLogAssignerWithPeriodicWatermarks)
     .keyBy(accessLog => accessLog.channel)
     .window(TumblingEventTimeWindows.of(Time.seconds(4)))
     .process(new ProcessWindowFunction[AccessLog, String, String, TimeWindow] {
         override def process(key: String, context: Context, elements: Iterable[AccessLog], out: Collector[String]): Unit = {
             val sdf = new SimpleDateFormat("HH:mm:ss")
             val window = context.window
             val currentWatermark = context.currentWatermark
             println("window:" + sdf.format(window.getStart) + "\t" + sdf.format(window.getEnd) + " \t watermark:" + sdf.format(currentWatermark))
             for (e <- elements) {
                 val AccessLog(channel: String, timestamp: Long) = e
                 out.collect(channel + "\t" + sdf.format(timestamp))
             }
         }
     })
     .print()
fsEnv.execute("event time")

Handling Late Data

Flink can process late data: if watermark - window end time < allowed lateness, a late record still participates in the window computation; otherwise Flink drops the too-late record.

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000) // compute the watermark once per second
fsEnv.setParallelism(1)
// input format: channel timestamp
fsEnv.socketTextStream("CentOS", 8888)
     .map(line => line.split("\\s+"))
     .map(ts => AccessLog(ts(0), ts(1).toLong))
     .assignTimestampsAndWatermarks(new AccessLogAssignerWithPeriodicWatermarks)
     .keyBy(accessLog => accessLog.channel)
     .window(TumblingEventTimeWindows.of(Time.seconds(4)))
     .allowedLateness(Time.seconds(2)) // keep windows open 2s past the watermark for late records
     .process(new ProcessWindowFunction[AccessLog, String, String, TimeWindow] {
         override def process(key: String, context: Context, elements: Iterable[AccessLog], out: Collector[String]): Unit = {
             val sdf = new SimpleDateFormat("HH:mm:ss")
             val window = context.window
             val currentWatermark = context.currentWatermark
             println("window:" + sdf.format(window.getStart) + "\t" + sdf.format(window.getEnd) + " \t watermark:" + sdf.format(currentWatermark))
             for (e <- elements) {
                 val AccessLog(channel: String, timestamp: Long) = e
                 out.collect(channel + "\t" + sdf.format(timestamp))
             }
         }
     })
     .print()

fsEnv.execute("event time")

By default Flink discards too-late data. If you want to capture the expired records, use a side output:

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000) // compute the watermark once per second
fsEnv.setParallelism(1)

val lateTag = new OutputTag[AccessLog]("latetag")
// input format: channel timestamp
val keyedWindowStream = fsEnv.socketTextStream("CentOS", 8888)
  .map(line => line.split("\\s+"))
  .map(ts => AccessLog(ts(0), ts(1).toLong))
  .assignTimestampsAndWatermarks(new AccessLogAssignerWithPeriodicWatermarks)
  .keyBy(accessLog => accessLog.channel)
  .window(TumblingEventTimeWindows.of(Time.seconds(4)))
  .allowedLateness(Time.seconds(2))
  .sideOutputLateData(lateTag) // too-late records go to the side output instead of being dropped
  .process(new ProcessWindowFunction[AccessLog, String, String, TimeWindow] {
      override def process(key: String, context: Context, elements: Iterable[AccessLog], out: Collector[String]): Unit = {
          val sdf = new SimpleDateFormat("HH:mm:ss")
          val window = context.window
          val currentWatermark = context.currentWatermark
          println("window:" + sdf.format(window.getStart) + "\t" + sdf.format(window.getEnd) + " \t watermark:" + sdf.format(currentWatermark))
          for (e <- elements) {
              val AccessLog(channel: String, timestamp: Long) = e
              out.collect(channel + "\t" + sdf.format(timestamp))
          }
      }
  })

keyedWindowStream.print("on time:")
keyedWindowStream.getSideOutput(lateTag).print("too late:")

fsEnv.execute("event time")

When a stream carries multiple watermarks (e.g. from parallel inputs), the system takes the lowest one: for example, if one input's watermark is at 12:00:05 and another's at 12:00:02, downstream operators use 12:00:02.

Joining

Window Join

Basic syntax

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)
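
The join examples below rely on User and OrderItem case classes and on periodic watermark assigners that this article does not show; a minimal sketch consistent with how they are used (field names inferred from the examples):

case class User(id: String, name: String, ts: Long)
case class OrderItem(uid: String, name: String, price: Double, ts: Long)

class UserAssignerWithPeriodicWatermarks extends AssignerWithPeriodicWatermarks[User] {
  private var maxSeenTime: Long = 0L
  private val maxOutOfOrderness: Long = 2000L
  override def getCurrentWatermark: Watermark = new Watermark(maxSeenTime - maxOutOfOrderness)
  override def extractTimestamp(element: User, previousElementTimestamp: Long): Long = {
    maxSeenTime = Math.max(maxSeenTime, element.ts)
    element.ts
  }
}
// OrderItemWithPeriodicWatermarks is the analogous assigner for OrderItem, using its ts field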

Tumbling Window Join


val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)
fsEnv.setParallelism(1)
// sample input: 001 zhangsan 1571627570000
val userStream = fsEnv.socketTextStream("CentOS", 7788)
    .map(line => line.split("\\s+"))
    .map(ts => User(ts(0), ts(1), ts(2).toLong))
    .assignTimestampsAndWatermarks(new UserAssignerWithPeriodicWatermarks)
    .setParallelism(1)
// sample input: 001 apple 4.5 1571627570000
val orderStream = fsEnv.socketTextStream("CentOS", 8899)
    .map(line => line.split("\\s+"))
    .map(ts => OrderItem(ts(0), ts(1), ts(2).toDouble, ts(3).toLong))
    .assignTimestampsAndWatermarks(new OrderItemWithPeriodicWatermarks)
    .setParallelism(1)

userStream.join(orderStream)
    .where(user => user.id)
    .equalTo(orderItem => orderItem.uid)
    .window(TumblingEventTimeWindows.of(Time.seconds(4)))
    .apply((u, o) => {
        (u.id, u.name, o.name, o.price, o.ts)
    })
    .print()

fsEnv.execute("FlinkStreamTumblingWindowJoin")

Sliding Window Join


val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)
fsEnv.setParallelism(1)
// sample input: 001 zhangsan 1571627570000
val userStream = fsEnv.socketTextStream("CentOS", 7788)
    .map(line => line.split("\\s+"))
    .map(ts => User(ts(0), ts(1), ts(2).toLong))
    .assignTimestampsAndWatermarks(new UserAssignerWithPeriodicWatermarks)
    .setParallelism(1)
// sample input: 001 apple 4.5 1571627570000
val orderStream = fsEnv.socketTextStream("CentOS", 8899)
    .map(line => line.split("\\s+"))
    .map(ts => OrderItem(ts(0), ts(1), ts(2).toDouble, ts(3).toLong))
    .assignTimestampsAndWatermarks(new OrderItemWithPeriodicWatermarks)
    .setParallelism(1)

userStream.join(orderStream)
    .where(user => user.id)
    .equalTo(orderItem => orderItem.uid)
    .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(2)))
    .apply((u, o) => {
        (u.id, u.name, o.name, o.price, o.ts)
    })
    .print()

fsEnv.execute("FlinkStreamSlidingWindowJoin")

Session Window Join


val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)
fsEnv.setParallelism(1)
// sample input: 001 zhangsan 1571627570000
val userStream = fsEnv.socketTextStream("CentOS", 7788)
    .map(line => line.split("\\s+"))
    .map(ts => User(ts(0), ts(1), ts(2).toLong))
    .assignTimestampsAndWatermarks(new UserAssignerWithPeriodicWatermarks)
    .setParallelism(1)
// sample input: 001 apple 4.5 1571627570000
val orderStream = fsEnv.socketTextStream("CentOS", 8899)
    .map(line => line.split("\\s+"))
    .map(ts => OrderItem(ts(0), ts(1), ts(2).toDouble, ts(3).toLong))
    .assignTimestampsAndWatermarks(new OrderItemWithPeriodicWatermarks)
    .setParallelism(1)

userStream.join(orderStream)
    .where(user => user.id)
    .equalTo(orderItem => orderItem.uid)
    .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    .apply((u, o) => {
        (u.id, u.name, o.name, o.price, o.ts)
    })
    .print()

fsEnv.execute("FlinkStreamSessionWindowJoin")

Interval Join

The interval join joins elements of two streams (we’ll call them A & B for now) with a common key and where elements of stream B have timestamps that lie in a relative time interval to timestamps of elements in stream A.

This can also be expressed more formally as b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound], or a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound.

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)
fsEnv.setParallelism(1)
// sample input: 001 zhangsan 1571627570000
val userStream = fsEnv.socketTextStream("CentOS", 7788)
    .map(line => line.split("\\s+"))
    .map(ts => User(ts(0), ts(1), ts(2).toLong))
    .assignTimestampsAndWatermarks(new UserAssignerWithPeriodicWatermarks)
    .setParallelism(1)
    .keyBy(_.id)

// sample input: 001 apple 4.5 1571627570000
val orderStream = fsEnv.socketTextStream("CentOS", 8899)
    .map(line => line.split("\\s+"))
    .map(ts => OrderItem(ts(0), ts(1), ts(2).toDouble, ts(3).toLong))
    .assignTimestampsAndWatermarks(new OrderItemWithPeriodicWatermarks)
    .setParallelism(1)
    .keyBy(_.uid)

userStream.intervalJoin(orderStream)
    .between(Time.seconds(-1), Time.seconds(1)) // b.timestamp within [a.timestamp - 1s, a.timestamp + 1s]
    .process(new ProcessJoinFunction[User, OrderItem, String] {
        override def processElement(left: User, right: OrderItem, ctx: ProcessJoinFunction[User, OrderItem, String]#Context, out: Collector[String]): Unit = {
            println(left + " \t" + right)
            out.collect(left.id + " " + left.name + " " + right.name + " " + right.price + " " + right.ts)
        }
    })
    .print()

fsEnv.execute("FlinkStreamIntervalJoin")

Flink HA

The JobManager coordinates every Flink deployment. It is responsible for both scheduling and resource management.

By default, there is a single JobManager instance per Flink cluster. This creates a single point of failure (SPOF): if the JobManager crashes, no new programs can be submitted and running programs fail.

With JobManager High Availability, you can recover from JobManager failures and thereby eliminate the SPOF. You can configure high availability for both standalone and YARN clusters.

Standalone Cluster High Availability

The general idea of JobManager high availability for standalone clusters is that there is a single leading JobManager at any time and multiple standby JobManagers to take over leadership in case the leader fails. This guarantees that there is no single point of failure and programs can make progress as soon as a standby JobManager has taken leadership. There is no explicit distinction between standby and master JobManager instances. Each JobManager can take the role of master or standby.


Setup

Prerequisites (omitted)

  • Install the JDK
  • Install Hadoop (HDFS HA)
  • Install ZooKeeper

Setting up Flink

  • Configure HADOOP_CLASSPATH
[root@CentOSX ~]# vi .bashrc
HADOOP_HOME=/usr/hadoop-2.9.2
JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH
export HADOOP_HOME
HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CLASSPATH
[root@CentOSX ~]# source .bashrc
[root@CentOSX ~]# echo $HADOOP_CLASSPATH
/usr/hadoop-2.9.2/etc/hadoop:/usr/hadoop-2.9.2/share/hadoop/common/lib/*:/usr/hadoop-2.9.2/share/hadoop/common/*:/usr/hadoop-2.9.2/share/hadoop/hdfs:/usr/hadoop-2.9.2/share/hadoop/hdfs/lib/*:/usr/hadoop-2.9.2/share/hadoop/hdfs/*:/usr/hadoop-2.9.2/share/hadoop/yarn/lib/*:/usr/hadoop-2.9.2/share/hadoop/yarn/*:/usr/hadoop-2.9.2/share/hadoop/mapreduce/lib/*:/usr/hadoop-2.9.2/share/hadoop/mapreduce/*:/usr/hadoop-2.9.2/contrib/capacity-scheduler/*.jar

  • Upload and configure Flink
[root@CentOSX ~]# tar -zxf flink-1.8.1-bin-scala_2.11.tgz -C /usr/
[root@CentOSA ~]# cd /usr/flink-1.8.1
[root@CentOSA flink-1.8.1]# vi conf/flink-conf.yaml

#==============================================================================
# Common
#==============================================================================
taskmanager.numberOfTaskSlots: 4
parallelism.default: 3
#==============================================================================
# High Availability
#==============================================================================
high-availability: zookeeper
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: CentOSA:2181,CentOSB:2181,CentOSC:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /default_ns
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink-checkpoints
state.savepoints.dir: hdfs:///flink-savepoints
state.backend.incremental: true

[root@CentOSX flink-1.8.1]# vi conf/masters
CentOSA:8081
CentOSB:8081
CentOSC:8081
[root@CentOSA flink-1.8.1]# vi conf/slaves
CentOSA
CentOSB
CentOSC

Start the Flink cluster

[root@CentOSA flink-1.8.1]# ./bin/start-cluster.sh
Starting HA cluster with 3 masters.
Starting standalonesession daemon on host CentOSA.
Starting standalonesession daemon on host CentOSB.
Starting standalonesession daemon on host CentOSC.
Starting taskexecutor daemon on host CentOSA.
Starting taskexecutor daemon on host CentOSB.
Starting taskexecutor daemon on host CentOSC.

Once the cluster is up, check the JobManager logs; on the leader host you will see something like:

 http://xxx:8081 was granted leadership with leaderSessionID=f5338c3f-c3e5-4600-a07c-566e38bc0ff4

Testing HA

Log in to the node that currently holds leadership, then run:

[root@CentOSB flink-1.8.1]# ./bin/jobmanager.sh stop

Then check the other nodes: using the same approach as above, find the node whose log reports that it was granted leadership; that node is the new master.
