系列文章目錄
Flink第一章:環(huán)境搭建
Flink第二章:基本操作
Flink第三章:基本操作(二)
Flink第四章:水位線和窗口
Flink第五章:處理函數(shù)
Flink第六章:多流操作
Flink第七章:狀態(tài)編程
前言
這次我們來學(xué)習(xí)Flink中的狀態(tài)編程部分,創(chuàng)建以下scala文件
一、Keyed State(按鍵分區(qū))
1.KeyedStateTest.scala
這個文件里有幾個常用的狀態(tài)創(chuàng)建
package com.atguigu.chapter06
import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.api.common.functions.{AggregateFunction, ReduceFunction, RichFlatMapFunction}
import org.apache.flink.api.common.state._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object KeyedStateTest {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // MyFlatMap prints its observations itself, so no sink is attached.
    env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)
      .keyBy(_.user)
      .flatMap(new MyFlatMap)

    env.execute()
  }

  /**
   * Demonstrates the five common kinds of keyed state:
   * ValueState, ListState, MapState, ReducingState and AggregatingState.
   * Handles are created in open() (the runtime context is not available
   * at construction time) and exercised once per element in flatMap().
   */
  class MyFlatMap extends RichFlatMapFunction[Event, String] {
    // State handles, assigned in open().
    var valueState: ValueState[Event] = _
    var listState: ListState[Event] = _
    var mapState: MapState[String, Long] = _
    var reduceState: ReducingState[Event] = _
    var aggState: AggregatingState[Event, String] = _

    override def open(parameters: Configuration): Unit = {
      valueState = getRuntimeContext.getState(new ValueStateDescriptor[Event]("my-value", classOf[Event]))
      listState = getRuntimeContext.getListState(new ListStateDescriptor[Event]("my-list", classOf[Event]))
      mapState = getRuntimeContext.getMapState(new MapStateDescriptor[String, Long]("my-map", classOf[String], classOf[Long]))
      // Keeps the first seen user/url together with the latest timestamp.
      reduceState = getRuntimeContext.getReducingState(new ReducingStateDescriptor[Event]("my-reduce",
        new ReduceFunction[Event] {
          override def reduce(t: Event, t1: Event): Event = Event(t.user, t.url, t1.timestamp)
        }, classOf[Event]
      ))
      // Counts elements and renders the count as a string.
      aggState = getRuntimeContext.getAggregatingState(new AggregatingStateDescriptor[Event, Long, String]("my-agg",
        new AggregateFunction[Event, Long, String] {
          override def createAccumulator(): Long = 0L
          override def add(in: Event, acc: Long): Long = acc + 1
          override def getResult(acc: Long): String = "當(dāng)前聚合狀態(tài)為:" + acc.toString
          // FIX: was left as ??? and would throw NotImplementedError if the
          // runtime ever merged state (e.g. merging session windows).
          // Merging two counters is their sum.
          override def merge(acc: Long, acc1: Long): Long = acc + acc1
        }, classOf[Long]
      ))
    }

    override def flatMap(in: Event, collector: Collector[String]): Unit = {
      // Exercise each state primitive and print the effect.
      println("值狀態(tài)為:" + valueState.value())
      valueState.update(in)
      println("值狀態(tài)為:" + valueState.value())
      listState.add(in)
      println("------------")
      val count: Long = if (mapState.contains(in.user)) mapState.get(in.user) else 0
      mapState.put(in.user, count + 1)
      println(s"用戶 ${in.user} 的訪問頻次為: ${mapState.get(in.user)}")
      println("-------------")
      reduceState.add(in)
      println(reduceState.get())
      println("-------------")
      aggState.add(in)
      println(aggState.get())
      println("=================")
    }
  }
}
2.PeriodicPVExample.scala
按鍵分區(qū)中值狀態(tài)編程案例
我們這里會使用用戶 id 來進(jìn)行分流,然后分別統(tǒng)計(jì)每個用戶的 pv 數(shù)據(jù),由于我們并不想每次 pv 加一,就將統(tǒng)計(jì)結(jié)果發(fā)送到下游去,所以這里我們注冊了一個定時器,用來隔一段時間發(fā)送 pv 的統(tǒng)計(jì)結(jié)果,這樣對下游算子的壓力不至于太大。具體實(shí)現(xiàn)方式是定義一個用來保存定時器時間戳的值狀態(tài)變量。當(dāng)定時器觸發(fā)并向下游發(fā)送數(shù)據(jù)以后,便清空儲存定時器時間戳的狀態(tài)變量,這樣當(dāng)新的數(shù)據(jù)到來時,發(fā)現(xiàn)并沒有定時器存在,就可以注冊新的定時器了,注冊完定時器之后將定時器的時間戳繼續(xù)保存在狀態(tài)變量中。
package com.atguigu.chapter06
import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object PeriodicPVExample {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)
      .keyBy(_.user)
      .process(new PeriodicPv)
      .print()

    env.execute()
  }

  /**
   * Emits each user's running pv count at most once per 10 seconds of
   * event time instead of once per click, to reduce downstream pressure.
   * A value state remembers whether a timer is pending; the timer is
   * cleared on firing so the next element registers a fresh one.
   */
  class PeriodicPv extends KeyedProcessFunction[String, Event, String] {
    // Running pv count for the current user (never reset — cumulative).
    lazy val countState: ValueState[Long] =
      getRuntimeContext.getState(new ValueStateDescriptor[Long]("count", classOf[Long]))
    // Timestamp of the pending timer; 0 means no timer is registered.
    lazy val timerTsState: ValueState[Long] =
      getRuntimeContext.getState(new ValueStateDescriptor[Long]("time-Ts", classOf[Long]))

    override def processElement(value: Event, ctx: KeyedProcessFunction[String, Event, String]#Context, out: Collector[String]): Unit = {
      // Bump the pv count for every incoming click.
      countState.update(countState.value() + 1)
      // Register a 10-second timer only when none is pending.
      if (timerTsState.value() == 0L) {
        val triggerTs = value.timestamp + 10 * 1000L
        ctx.timerService().registerEventTimeTimer(triggerTs)
        timerTsState.update(triggerTs)
      }
    }

    // Timer fired: emit the current count and allow a new timer to be set.
    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, Event, String]#OnTimerContext, out: Collector[String]): Unit = {
      out.collect(s"用戶 ${ctx.getCurrentKey}的pv值為:${countState.value()}")
      // Only the timer bookkeeping is cleared; the pv count keeps growing.
      timerTsState.clear()
    }
  }
}
10s統(tǒng)計(jì)一次,并且不斷累加,有點(diǎn)像全局窗口.
3.TwoStreamJoinExample.scala
列表狀態(tài)編程
SELECT * FROM A INNER JOIN B WHERE A.id = B.id;
這樣一條 SQL 語句要慎用,因?yàn)?Flink 會將 A 流和 B 流的所有數(shù)據(jù)都保存下來,然后進(jìn)行 join。不過在這里我們可以用列表狀態(tài)變量來實(shí)現(xiàn)一下這個 SQL 語句的功能。
package com.atguigu.chapter06
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object TwoStreamJoinExample {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream1: DataStream[(String, String, Long)] = env.fromElements(
      ("a", "stream-1", 1000L),
      ("b", "stream-1", 2000L)
    ).assignAscendingTimestamps(_._3)

    val stream2: DataStream[(String, String, Long)] = env.fromElements(
      ("a", "stream-2", 3000L),
      ("b", "stream-2", 4000L)
    ).assignAscendingTimestamps(_._3)

    // Connect the two keyed streams and emit every matching pair (inner join).
    stream1.keyBy(_._1)
      .connect(stream2.keyBy(_._1))
      .process(new TwoStreamJoin)
      .print()

    env.execute()
  }

  /**
   * Emulates SELECT * FROM A INNER JOIN B WHERE A.id = B.id with two
   * ListStates: each arriving element is buffered and paired with every
   * element already buffered on the other side.
   * NOTE: both buffers grow without bound — fine for a demo, dangerous
   * in production without TTL or cleanup.
   */
  class TwoStreamJoin extends CoProcessFunction[(String, String, Long), (String, String, Long), String] {
    // Buffers of all elements seen so far on each input.
    lazy val stream1ListState: ListState[(String, String, Long)] = getRuntimeContext.getListState(new ListStateDescriptor[(String, String, Long)]("stream1-list", classOf[(String, String, Long)]))
    lazy val stream2ListState: ListState[(String, String, Long)] = getRuntimeContext.getListState(new ListStateDescriptor[(String, String, Long)]("stream2-list", classOf[(String, String, Long)]))

    override def processElement1(value1: (String, String, Long), ctx: CoProcessFunction[(String, String, Long), (String, String, Long), String]#Context, out: Collector[String]): Unit = {
      // Buffer the element, then pair it with everything seen on stream 2.
      stream1ListState.add(value1)
      // FIX: scala.collection.convert.ImplicitConversions is deprecated;
      // use the explicit scala.jdk.CollectionConverters asScala instead.
      import scala.jdk.CollectionConverters._
      for (value2 <- stream2ListState.get().asScala) {
        out.collect(value1 + "=>" + value2)
      }
    }

    override def processElement2(value2: (String, String, Long), ctx: CoProcessFunction[(String, String, Long), (String, String, Long), String]#Context, out: Collector[String]): Unit = {
      // Symmetric to processElement1: buffer, then pair with stream 1.
      stream2ListState.add(value2)
      import scala.jdk.CollectionConverters._
      for (value1 <- stream1ListState.get().asScala) {
        out.collect(value1 + "=>" + value2)
      }
    }
  }
}
4.FakeWindowExample.scala
映射狀態(tài)編程
映射狀態(tài)的用法和 Java 中的 HashMap 很相似。在這里我們可以通過 MapState 的使用來探
索一下窗口的底層實(shí)現(xiàn),也就是我們要用映射狀態(tài)來完整模擬窗口的功能。這里我們模擬一個
滾動窗口。我們要計(jì)算的是每一個 url 在每一個窗口中的 pv 數(shù)據(jù)。我們之前使用增量聚合和
全窗口聚合結(jié)合的方式實(shí)現(xiàn)過這個需求。這里我們用 MapState 再來實(shí)現(xiàn)一下。
package com.atguigu.chapter06
import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object FakeWindowExample {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)
      .keyBy(_.url)
      .process(new FakeWindow(10000L)) // simulate a 10-second tumbling window
      .print()

    env.execute()
  }

  /**
   * Simulates a tumbling event-time window of `size` milliseconds using a
   * MapState keyed by window start, counting pv per url per window.
   */
  class FakeWindow(size: Long) extends KeyedProcessFunction[String, Event, String] {
    // window start timestamp -> pv count accumulated for that window
    lazy val windowPvState: MapState[Long, Long] =
      getRuntimeContext.getMapState(new MapStateDescriptor[Long, Long]("window-pv", classOf[Long], classOf[Long]))

    override def processElement(value: Event, ctx: KeyedProcessFunction[String, Event, String]#Context, out: Collector[String]): Unit = {
      // Window this event falls into: [start, end).
      val start: Long = value.timestamp / size * size
      val end: Long = start + size
      // Fire once the watermark passes the window end.
      ctx.timerService().registerEventTimeTimer(end - 1)
      // Increment this window's pv count (initialize to 1 on first hit).
      val updated: Long = if (windowPvState.contains(start)) windowPvState.get(start) + 1 else 1L
      windowPvState.put(start, updated)
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, Event, String]#OnTimerContext, out: Collector[String]): Unit = {
      // Recover the window start from the timer timestamp (= end - 1).
      val start: Long = timestamp + 1 - size
      val pv: Long = windowPvState.get(start)
      out.collect(s"url: ${ctx.getCurrentKey} 瀏覽量為: ${pv} 窗口為:${start}-${start + size}")
      // "Destroy" the window by dropping its entry from the map state.
      windowPvState.remove(start)
    }
  }
}
5.AverageTimestampExample.scala
聚合狀態(tài)編程
我們舉一個簡單的例子,對用戶點(diǎn)擊事件流每 5 個數(shù)據(jù)統(tǒng)計(jì)一次平均時間戳。這是一個類似計(jì)數(shù)窗口(CountWindow)求平均值的計(jì)算,這里我們可以使用一個有聚合狀態(tài)的
RichFlatMapFunction 來實(shí)現(xiàn)。
package com.atguigu.chapter06
import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.api.common.functions.{AggregateFunction, RichFlatMapFunction}
import org.apache.flink.api.common.state.{AggregatingState, AggregatingStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object AverageTimestampExample {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)

    stream
      // FIX: was keyBy(_.url) while the emitted message and the stated goal
      // are per-user averages; keying by url would mix different users.
      .keyBy(_.user)
      .flatMap(new AvgTimestamp)
      .print("output") // FIX: was "input", indistinguishable from the raw stream below
    stream.print("input")

    env.execute()
  }

  /**
   * Emits the average timestamp of every 5 events per key — a count-window
   * of size 5 built from an AggregatingState plus an element counter.
   */
  class AvgTimestamp extends RichFlatMapFunction[Event, String] {
    // Accumulator is (sum of timestamps, element count); result is the mean.
    lazy val avgTsAggState: AggregatingState[Event, Long] = getRuntimeContext.getAggregatingState(new AggregatingStateDescriptor[Event, (Long, Long), Long](
      "avg-ts",
      new AggregateFunction[Event, (Long, Long), Long] {
        override def createAccumulator(): (Long, Long) = (0L, 0L)
        override def add(in: Event, acc: (Long, Long)): (Long, Long) = (acc._1 + in.timestamp, acc._2 + 1)
        override def getResult(acc: (Long, Long)): Long = acc._1 / acc._2
        // FIX: was ??? (NotImplementedError if ever merged); merging two
        // (sum, count) accumulators is the componentwise sum.
        override def merge(acc: (Long, Long), acc1: (Long, Long)): (Long, Long) = (acc._1 + acc1._1, acc._2 + acc1._2)
      },
      classOf[(Long, Long)]
    ))
    // Number of events seen in the current group of 5.
    lazy val countState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count", classOf[Long]))

    override def flatMap(in: Event, collector: Collector[String]): Unit = {
      avgTsAggState.add(in)
      // Advance the per-group counter.
      val count: Long = countState.value()
      countState.update(count + 1)
      if (countState.value() == 5) {
        collector.collect(s"${in.user}的平均時間戳為: ${avgTsAggState.get()}")
        // FIX: also clear the aggregate. Previously only the counter was
        // reset, so the "average of 5" silently became a running average
        // over the entire history of the key.
        avgTsAggState.clear()
        countState.clear()
      }
    }
  }
}
二、Operator State(算子狀態(tài))
1.BufferingSinkExample.scala
在下面的例子中,自定義的 SinkFunction 會在CheckpointedFunction 中進(jìn)行數(shù)據(jù)緩存,然后統(tǒng)一發(fā)送到下游。這個例子演示了列表狀態(tài)的平均分割重組(even-split redistribution)。
package com.atguigu.chapter06
import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import scala.collection.mutable.ListBuffer
object BufferingSinkExample {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)
      .addSink(new BufferingSink(10))

    env.execute()
  }

  /**
   * Sink that buffers elements and flushes them in batches of `threshold`.
   * Implements CheckpointedFunction with operator ListState (even-split
   * redistribution): snapshotState copies the in-memory buffer into state,
   * and initializeState restores it after a failure.
   */
  class BufferingSink(threshold: Int) extends SinkFunction[Event] with CheckpointedFunction {
    // Operator state backing the buffer across checkpoints.
    var bufferedState: ListState[Event] = _
    // In-memory buffer actually used on the hot path in invoke().
    val bufferedList: ListBuffer[Event] = ListBuffer[Event]()

    override def invoke(value: Event, context: SinkFunction.Context): Unit = {
      bufferedList += value
      // FIX: use >= instead of ==. After restore/rescale the buffer can
      // already hold more than `threshold` elements, and == would then
      // never be true again, so the sink would stop flushing forever.
      if (bufferedList.size >= threshold) {
        // "Write" to the external system — here simply print.
        bufferedList.foreach(data => println(data))
        println("=======輸出完畢==============")
        bufferedList.clear()
      }
    }

    override def snapshotState(context: FunctionSnapshotContext): Unit = {
      // Replace the previous snapshot with the current buffer contents.
      bufferedState.clear()
      for (data <- bufferedList) {
        bufferedState.add(data)
      }
    }

    override def initializeState(context: FunctionInitializationContext): Unit = {
      bufferedState = context.getOperatorStateStore.getListState(new ListStateDescriptor[Event]("buffered-list", classOf[Event]))
      // On recovery, reload the checkpointed elements into the local buffer.
      if (context.isRestored) {
        // FIX: scala.collection.convert.ImplicitConversions is deprecated;
        // use the explicit scala.jdk.CollectionConverters asScala instead.
        import scala.jdk.CollectionConverters._
        for (data <- bufferedState.get().asScala) {
          bufferedList += data
        }
      }
    }
  }
}
三、Broadcast State(廣播狀態(tài))
1.BroadcastStateExample.scala
行為匹配案例
package com.atguigu.chapter06
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
// Domain model: a single user action, and a two-step action sequence
// (action1 followed by action2) that we want to detect.
// Marked final per convention: case classes should not be extended.
final case class Action(userid: String, action: String)
final case class Pattern(action1: String, action2: String)
object BroadcastStateExample {

  // Single shared descriptor so the keyed side, the broadcast side and the
  // broadcast() call are guaranteed to address the same broadcast state.
  // The state is keyed by Unit, i.e. it holds exactly one Pattern at a time:
  // every newly broadcast pattern overwrites the previous one.
  val patternDescriptor = new MapStateDescriptor[Unit, Pattern]("patterns", classOf[Unit], classOf[Pattern])

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Stream of user actions to be matched against the pattern.
    val actionStream: DataStream[Action] = env.fromElements(
      Action("Alice", "login"),
      Action("Alice", "pay"),
      Action("Bob", "login"),
      Action("Bob", "buy")
    )

    // Stream of patterns; broadcast to every parallel task.
    val patternStream: DataStream[Pattern] = env.fromElements(
      Pattern("login", "pay"),
      Pattern("login", "buy")
    )

    val broadcastStream: BroadcastStream[Pattern] = patternStream.broadcast(patternDescriptor)

    // Connect the keyed action stream with the broadcast pattern stream.
    actionStream.keyBy(_.userid)
      .connect(broadcastStream)
      .process(new PatternEvaluation)
      .print()

    env.execute()
  }

  /**
   * Emits (userId, pattern) whenever a user's previous action followed by
   * the current one matches the currently broadcast Pattern.
   */
  class PatternEvaluation extends KeyedBroadcastProcessFunction[String, Action, Pattern, (String, Pattern)] {
    // Keyed value state: the user's previous action.
    lazy val prevActionState: ValueState[String] = getRuntimeContext.getState(new ValueStateDescriptor[String]("prev-action", classOf[String]))

    override def processElement(value: Action, ctx: KeyedBroadcastProcessFunction[String, Action, Pattern, (String, Pattern)]#ReadOnlyContext, out: Collector[(String, Pattern)]): Unit = {
      // Read the (single) pattern from broadcast state.
      // FIX: the key is the unit value (); the original passed the `Unit`
      // companion object, which only compiled via deprecated value discarding.
      val pattern = ctx.getBroadcastState(patternDescriptor).get(())
      // Previous action of this user, null if this is the first one.
      val prevAction: String = prevActionState.value()
      if (pattern != null && prevAction != null) {
        if (pattern.action1 == prevAction && pattern.action2 == value.action) {
          out.collect((ctx.getCurrentKey, pattern))
        }
      }
      // Remember the current action for the next element.
      prevActionState.update(value.action)
    }

    override def processBroadcastElement(value: Pattern, ctx: KeyedBroadcastProcessFunction[String, Action, Pattern, (String, Pattern)]#Context, out: Collector[(String, Pattern)]): Unit = {
      // Overwrite the single stored pattern with the new one.
      val bcState: BroadcastState[Unit, Pattern] = ctx.getBroadcastState(patternDescriptor)
      bcState.put((), value)
    }
  }
}
文章來源:http://www.zghlxwxcb.cn/news/detail-461620.html
總結(jié)
這次記錄就到這里.文章來源地址http://www.zghlxwxcb.cn/news/detail-461620.html
到了這里,關(guān)于Flink第七章:狀態(tài)編程的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!