系列文章目錄
Flink項目實戰(zhàn)篇 基于Flink的城市交通監(jiān)控平臺(上)
Flink項目實戰(zhàn)篇 基于Flink的城市交通監(jiān)控平臺(下)
4. 智能實時報警
本模塊主要負(fù)責(zé)城市交通管理中,可能存在違章或者違法非常嚴(yán)重的行為,系統(tǒng)可以自動實時報警??梢詫崿F(xiàn)億級數(shù)據(jù)在線分布式計算秒級反饋。滿足實戰(zhàn)的“實時”需要,爭分奪秒、聚力辦案。做的真正“零”延遲的報警和出警。主要功能包括:實時套牌分析,實時危險駕駛分析等。
4.1 實時套牌分析
當(dāng)某個卡口中出現(xiàn)一輛行駛的汽車,我們可以通過攝像頭識別車牌號,然后在10秒內(nèi),另外一個卡口(或者當(dāng)前卡口)也識別到了同樣車牌的車輛,那么很有可能這兩輛車之中有很大幾率存在套牌車,因為一般情況下不可能有車輛在10秒內(nèi)經(jīng)過兩個卡口。如果發(fā)現(xiàn)涉嫌套牌車,系統(tǒng)實時發(fā)出報警信息,同時這些存在套牌車嫌疑的車輛,寫入Mysql數(shù)據(jù)庫的結(jié)果表中,在后面的模塊中,可以對這些違法車輛進(jìn)行實時軌跡跟蹤。
本需求可以使用CEP編程,也可以使用狀態(tài)編程。我們采用狀態(tài)編程。
完整的代碼:
object RepatitionCarWarning {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//導(dǎo)入scala包
import org.apache.flink.streaming.api.scala._
//設(shè)置并行度
env.setParallelism(1)
//設(shè)置事件時間
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val props = new Properties()
props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
props.setProperty("group.id","test4")
props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
val trafficDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
// val trafficDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999)
.map(line => {
val arr: Array[String] = line.split(",")
TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {
override def extractTimestamp(element: TrafficLog): Long = element.actionTime
})
trafficDStream.keyBy(_.car).process(new KeyedProcessFunction[String,TrafficLog,RepatitionCarInfo] {
lazy private val valueState: ValueState[TrafficLog] = getRuntimeContext.getState(new ValueStateDescriptor[TrafficLog]("valueState",classOf[TrafficLog]))
override def processElement(value: TrafficLog, ctx: KeyedProcessFunction[String, TrafficLog, RepatitionCarInfo]#Context, out: Collector[RepatitionCarInfo]): Unit = {
if(valueState.value() != null){//如果狀態(tài)中包含當(dāng)前車輛
val log: TrafficLog = valueState.value()
//同一車輛數(shù)據(jù),判斷兩次通過卡扣間隔時長
var dur = (log.actionTime - value.actionTime).abs
if(dur < 10*1000){
out.collect(new RepatitionCarInfo(value.car,"涉嫌套牌",System.currentTimeMillis(),
s"該車輛連續(xù)兩次經(jīng)過的卡扣及對應(yīng)時間為:${log.monitorId} - ${log.actionTime} , ${value.monitorId} - ${value.actionTime} "))
}
//更新狀態(tài)數(shù)據(jù)
if(log.actionTime < value.actionTime){
valueState.update(value)
}
}else{ //狀態(tài)中不包含當(dāng)前車輛
valueState.update(value)
}
}
})
.addSink(new JdbcWriteSink[RepatitionCarInfo]("RepatitionCarInfo"))
env.execute()
}
}
4.2 實時危險駕駛分析
在本項目中,危險駕駛是指在道路上駕駛機動車:追逐超速競駛。我們規(guī)定:如果一輛機動車在2分鐘內(nèi),超速通過卡口超過3次以上;而且每次超速的超過了規(guī)定速度的20%以上;這樣的機動車涉嫌危險駕駛。系統(tǒng)需要實時找出這些機動車,并報警,追蹤這些車輛的軌跡。注意:如果有些卡口沒有設(shè)置限速值,可以設(shè)置一個城市默認(rèn)限速。
這樣的需求在Flink也是有兩種解決思路,第一:狀態(tài)編程。第二:CEP編程。但是當(dāng)前的需求使用狀態(tài)編程過于復(fù)雜了。所以我們采用第二種。同時還要注意:Flume在采集數(shù)據(jù)的過程中出現(xiàn)了數(shù)據(jù)亂序問題,一般最長延遲5秒。
涉嫌危險駕駛的車輛信息保存到Mysql數(shù)據(jù)庫表(t_violation_list)中,以便后面的功能中統(tǒng)一追蹤這些車輛的軌跡。
注意:如果要設(shè)置水位線需要設(shè)置在兩個連接流連接之后。
完整的代碼:
case class newTrafficLog(actionTime:Long,monitorId:String,cameraId:String,car:String,speed:Double,roadId:String,areaId:String,monitorLimitSpeed:Int)
object DangerDriveCarWarning {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//設(shè)置并行度
env.setParallelism(1)
//導(dǎo)入隱式轉(zhuǎn)換
import org.apache.flink.streaming.api.scala._
val props = new Properties()
props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
props.setProperty("group.id","test5")
props.setProperty("key.serializer",classOf[StringDeserializer].getName)
props.setProperty("value.serializer",classOf[StringDeserializer].getName)
//設(shè)置事件時間
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//主流
// val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999)
.map(line => {
val arr: Array[String] = line.split(",")
TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))
})
//廣播流,讀取mysql中的數(shù)據(jù),這里主要是讀取卡扣限速的數(shù)據(jù)
val bcDStream: BroadcastStream[MonitorLimitSpeedInfo] = env.addSource(new JdbcReadSource("MonitorLimitSpeedInfo"))
.map(
one => {
one.asInstanceOf[MonitorLimitSpeedInfo]
})
.broadcast(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR)
//將日志流與廣播流進(jìn)行整合,將道路卡扣限速信息與每條車輛運行日志數(shù)據(jù)結(jié)合
val trafficAllInfoDStream: DataStream[newTrafficLog] = mainDStream.connect(bcDStream).process(new BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, newTrafficLog] {
//處理每個日志元素
override def processElement(value: TrafficLog, ctx: BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, newTrafficLog]#ReadOnlyContext, out: Collector[newTrafficLog]): Unit = {
//獲取狀態(tài)
val mapState: ReadOnlyBroadcastState[String, MonitorLimitSpeedInfo] = ctx.getBroadcastState(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR)
//獲取當(dāng)前道路當(dāng)前卡扣 對應(yīng)的限速 ,如果沒有就設(shè)置限速為80
var limitSpeed = 80
if (mapState.contains(value.roadId + "_" + value.monitorId)) {
limitSpeed = mapState.get(value.roadId + "_" + value.monitorId).speedLimit
}
out.collect(new newTrafficLog(value.actionTime, value.monitorId, value.cameraId, value.car, value.speed, value.roadId, value.areaId, limitSpeed))
}
//處理廣播元素
override def processBroadcastElement(value: MonitorLimitSpeedInfo, ctx: BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, newTrafficLog]#Context, out: Collector[newTrafficLog]): Unit = {
//獲取狀態(tài)
val mapState: BroadcastState[String, MonitorLimitSpeedInfo] = ctx.getBroadcastState(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR)
//更新當(dāng)前道路當(dāng)前卡扣的限速數(shù)據(jù)
mapState.put(value.roadId + "_" + value.monitorId, value)
println("廣播狀態(tài)準(zhǔn)備就緒")
}
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[newTrafficLog](Time.seconds(0)) {
override def extractTimestamp(element: newTrafficLog): Long = element.actionTime
})
val keyDs: KeyedStream[newTrafficLog, String] = trafficAllInfoDStream.keyBy(_.car)//按照車輛分組
//使用CEP編程定義模式, 在1分鐘內(nèi)連續(xù)3次超速20%通過道路卡扣的車輛
val pattern:Pattern[newTrafficLog, newTrafficLog] =
Pattern.begin[newTrafficLog]("start").where(nt=>{nt.speed > nt.monitorLimitSpeed*1.2})
.followedBy("second").where(nt=>{nt.speed > nt.monitorLimitSpeed*1.2})
.followedBy("third").where(nt=>{nt.speed > nt.monitorLimitSpeed*1.2})
.within(Time.minutes(1))//注意:這里的時間指的是 各個時間之間的相差時間不超過1分鐘。時間采用的是事件時間
val patternStream: PatternStream[newTrafficLog] = CEP.pattern(keyDs,pattern)
val result: DataStream[DangerDriveCarInfo] = patternStream.select((map: Map[String, Iterable[newTrafficLog]]) => {
val begin: newTrafficLog = map.get("start").get.last
val second: newTrafficLog = map.get("second").get.last
val third: newTrafficLog = map.get("third").get.last
val builder = s"第一次通過卡扣${begin.monitorId},當(dāng)前限速:${begin.monitorLimitSpeed},通過的速度為:${begin.speed} |" +
s"第二次通過卡扣${second.monitorId},當(dāng)前限速:${second.monitorLimitSpeed},通過的速度為:${second.speed}|" +
s"第三次通過卡扣${third.monitorId},當(dāng)前限速:${third.monitorLimitSpeed},通過的速度為:${third.speed}"
DangerDriveCarInfo(begin.car, "危險駕駛", System.currentTimeMillis(), builder.toString)
})
// result.print()
result.addSink(new JdbcWriteSink[DangerDriveCarInfo]("DangerDriveCarInfo"))
env.execute()
}
}
4.3 出警分析
當(dāng)監(jiān)控到道路中有一起違法交通事故時,例如:車輛危險駕駛、車輛套牌、發(fā)生交通事故等,會有對應(yīng)的交警出警處理案情。違法事故實時數(shù)據(jù)會被實時監(jiān)控放入topicA,交通警察出警記錄會實時上報數(shù)據(jù)被放入topicB中,這里需要對違法交通事故的出警情況進(jìn)行分析并對超時未處理的警情作出對應(yīng)的預(yù)警。
出警分析如下:如果在topicA中出現(xiàn)一條違法車輛信息,如果在5分鐘內(nèi)已經(jīng)出警,將出警信息輸出到結(jié)果庫中。如果5分鐘內(nèi)沒有出警則發(fā)出出警提示。(發(fā)出出警的提示,在側(cè)流中發(fā)出)。
這里為了方便演示,將從socket中讀取數(shù)據(jù)。
(1)使用IntervalJoin實現(xiàn),這是只能輸出出警信息
object PoliceAnalysis1 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//導(dǎo)入隱式轉(zhuǎn)換
import org.apache.flink.streaming.api.scala._
//設(shè)置事件時間
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val props = new Properties()
props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
props.setProperty("group.id","test6")
props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
//獲取監(jiān)控違法車輛信息
// val illegalDStream: DataStream[IllegalCarInfo] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic1",new SimpleStringSchema(),props))
val illegalDStream: DataStream[IllegalCarInfo] = env.socketTextStream("mynode5", 8888).map(line => {
val arr: Array[String] = line.split(",")
IllegalCarInfo(arr(0), arr(1), arr(2).toLong)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[IllegalCarInfo](Time.seconds(3)) {
override def extractTimestamp(element: IllegalCarInfo): Long = element.eventTime
})
//獲取出警信息
// val policeDStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic2",new SimpleStringSchema(),props))
val policeDStream: DataStream[PoliceInfo] = env.socketTextStream("mynode5", 9999).map(line => {
val arr: Array[String] = line.split(",")
PoliceInfo(arr(0), arr(1), arr(2), arr(3).toLong)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[PoliceInfo](Time.seconds(2)) {
override def extractTimestamp(element: PoliceInfo): Long = element.reporTime
})
//兩個流進(jìn)行 intervalJoin ,相對于join ,這里不需要設(shè)置窗口,必須后面跟上between 來以時間范圍大小進(jìn)行join
illegalDStream.keyBy(_.car).intervalJoin(policeDStream.keyBy(_.car))
//這里假設(shè) 違法信息 illegalDStream 先出現(xiàn),policeDStream數(shù)據(jù)流后出現(xiàn)
//between(Time.seconds(10),Time.seconds(10))相當(dāng)于 illegalDStream.eventTime - 10s <= policeDStream.reporTime <= illegalDStream.eventTime + 10s
//例如 illegalDStream.eventTime 為 20:05:30 可以與 policeDStream.reporTime 為 20:05:20 - 20:05:40 范圍內(nèi)的數(shù)據(jù)進(jìn)行匹配
.between(Time.seconds(-10),Time.seconds(10))
.process(new ProcessJoinFunction[IllegalCarInfo,PoliceInfo,String] {
override def processElement(left: IllegalCarInfo, right: PoliceInfo, ctx: ProcessJoinFunction[IllegalCarInfo, PoliceInfo, String]#Context, out: Collector[String]): Unit = {
out.collect(s"違法車輛:${left.car} 已經(jīng)出警,警號:${right.policeId},事故時間:${left.eventTime},出警時間:${right.reporTime}")
}
}).print()
env.execute()
}
}
(2)使用兩個流的connect,可以監(jiān)測事故超時出警信息
object PoliceAnalysis2 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//導(dǎo)入隱式轉(zhuǎn)換
import org.apache.flink.streaming.api.scala._
//設(shè)置事件時間
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//設(shè)置并行度為1
env.setParallelism(1)
val props = new Properties()
props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
props.setProperty("group.id","test6")
props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
//獲取監(jiān)控違法車輛信息
// val illegalDStream: DataStream[IllegalCarInfo] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic1",new SimpleStringSchema(),props))
val illegalDStream: DataStream[IllegalCarInfo] = env.socketTextStream("mynode5", 8888).map(line => {
val arr: Array[String] = line.split(",")
IllegalCarInfo(arr(0), arr(1), arr(2).toLong)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[IllegalCarInfo](Time.seconds(3)) {
override def extractTimestamp(element: IllegalCarInfo): Long = element.eventTime
})
//獲取出警信息
// val policeDStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic2",new SimpleStringSchema(),props))
val policeDStream: DataStream[PoliceInfo] = env.socketTextStream("mynode5", 9999).map(line => {
val arr: Array[String] = line.split(",")
PoliceInfo(arr(0), arr(1), arr(2), arr(3).toLong)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[PoliceInfo](Time.seconds(2)) {
override def extractTimestamp(element: PoliceInfo): Long = element.reporTime
})
//定義側(cè)流
val ic = new OutputTag[IllegalCarInfo]("IllegalCarInfo")
val pi = new OutputTag[PoliceInfo]("PoliceInfo")
//以上違法記錄信息 與 交警出警信息 進(jìn)行關(guān)聯(lián)
val result: DataStream[String] = illegalDStream.keyBy(_.car).connect(policeDStream.keyBy(_.car))
.process(new KeyedCoProcessFunction[String, IllegalCarInfo, PoliceInfo, String] {
//這里每個 key 都會對應(yīng)一個狀態(tài)
lazy private val illegalCarInfoState: ValueState[IllegalCarInfo] = getRuntimeContext.getState(new ValueStateDescriptor[IllegalCarInfo]("illegalCarInfoState", classOf[IllegalCarInfo]))
lazy private val policeInfoState: ValueState[PoliceInfo] = getRuntimeContext.getState(new ValueStateDescriptor[PoliceInfo]("policeInfoState", classOf[PoliceInfo]))
//先有違法信息
override def processElement1(value: IllegalCarInfo, ctx: KeyedCoProcessFunction[String, IllegalCarInfo, PoliceInfo, String]#Context, out: Collector[String]): Unit = {
//獲取當(dāng)前車輛的出警信息
val policeInfo: PoliceInfo = policeInfoState.value()
if (policeInfo != null) { //說明有對應(yīng)的出警記錄,說明 當(dāng)前違法數(shù)據(jù)遲到了
//輸出結(jié)果
out.collect(s"違法車輛:${value.car} 已經(jīng)出警,警號:${policeInfo.policeId},事故時間:${value.eventTime},出警時間:${policeInfo.reporTime}")
//刪除出警狀態(tài)
policeInfoState.clear()
//刪除出警記錄定時器
ctx.timerService().deleteEventTimeTimer(policeInfo.reporTime + 10000)
} else { //沒有對應(yīng)的出警記錄
//進(jìn)來當(dāng)前車輛的違法信息后,放入狀態(tài)中
illegalCarInfoState.update(value)
//當(dāng)前車輛有了違法記錄就構(gòu)建定時器,定時器設(shè)置當(dāng)前時間時間后10s觸發(fā),除非10s內(nèi)刪除對應(yīng)的定時器就不會觸發(fā)
ctx.timerService().registerEventTimeTimer(value.eventTime + 10000) //這里方便演示設(shè)置定時器時長為10s
}
}
//后有出警狀態(tài),也有可能出警狀態(tài)先到
override def processElement2(value: PoliceInfo, ctx: KeyedCoProcessFunction[String, IllegalCarInfo, PoliceInfo, String]#Context, out: Collector[String]): Unit = {
val illegalCarInfo: IllegalCarInfo = illegalCarInfoState.value()
if (illegalCarInfo != null) {
//對應(yīng)當(dāng)前車輛的違法記錄中有數(shù)據(jù),說明這個車輛有了對應(yīng)的出警記錄
println(s"這里打印就是測試是不是一個key有一個狀態(tài): 違法車輛中的狀態(tài)car 是 ${illegalCarInfo.car} ,出警記錄中的車輛是${value.car}")
//有對應(yīng)的出警記錄就正常輸出數(shù)據(jù)即可:
out.collect(s"違法車輛:${illegalCarInfo.car} 已經(jīng)出警,警號:${value.policeId},事故時間:${illegalCarInfo.eventTime},出警時間:${value.reporTime}")
//清空當(dāng)前車輛違法狀態(tài)
illegalCarInfoState.clear()
//刪除違法記錄定時器
ctx.timerService().deleteEventTimeTimer(illegalCarInfo.eventTime + 10000) //刪除定時器
} else { //有了出警記錄,但是沒有違法記錄
//這里有了出警狀態(tài),但是沒有發(fā)現(xiàn)當(dāng)前車輛違法記錄,說明 出警狀態(tài)數(shù)據(jù)早到了,違法記錄 遲到了
//針對這種情況,將出警記錄數(shù)據(jù)放入出警狀態(tài)中
policeInfoState.update(value)
//當(dāng)前車輛有了出警就構(gòu)建定時器,定時器設(shè)置當(dāng)前時間時間后10s觸發(fā),除非10s內(nèi)刪除對應(yīng)的定時器就不會觸發(fā)
ctx.timerService().registerEventTimeTimer(value.reporTime + 10000) //這里方便演示設(shè)置定時器時長為10s
}
}
//觸發(fā)定時器 定時器觸發(fā)后會調(diào)用onTimer 方法 ,timestamp : 觸發(fā)器觸發(fā)時間
override def onTimer(timestamp: Long, ctx: KeyedCoProcessFunction[String, IllegalCarInfo, PoliceInfo, String]#OnTimerContext, out: Collector[String]): Unit = {
//獲取 違法記錄信息狀態(tài)
val illegalCarInfo: IllegalCarInfo = illegalCarInfoState.value()
//獲取 出警記錄信息狀態(tài)
val policeInfo: PoliceInfo = policeInfoState.value()
if (illegalCarInfo != null) {
//沒有出警記錄 ,輸出到側(cè)流
ctx.output(ic, illegalCarInfo)
}
if (policeInfo != null) { //沒有違法信息 ,輸出到側(cè)流
ctx.output(pi, policeInfo)
}
//清空以上兩種狀態(tài)
illegalCarInfoState.clear()
policeInfoState.clear()
}
})
result.print("正常流")
val illegalCarInfoDStream: DataStream[IllegalCarInfo] = result.getSideOutput(ic)
val policeInfoDStream: DataStream[PoliceInfo] = result.getSideOutput(pi)
illegalCarInfoDStream.print("沒有出警記錄,有違法記錄的信息:")
policeInfoDStream.print("有出警記錄,沒有違法記錄車輛信息:")
env.execute()
}
}
4.4 違法車輛軌跡跟蹤
城市交通中,有些車輛需要實時軌跡跟蹤,這些需要跟蹤軌跡的車輛,保存在城市違法表中:t_violation_list。系統(tǒng)需要實時打印這些車輛經(jīng)過的卡口,并且把軌跡數(shù)據(jù)插入數(shù)據(jù)表t_track_info(Hbase數(shù)據(jù)庫)中。根據(jù)前面所學(xué)的知識,我們應(yīng)該使用Flink中的廣播狀態(tài)完成該功能。
需要在hbase中創(chuàng)建表 t_track_info:create ‘t_track_info’,‘cf1’
清空hbase表命令:truncate ‘t_track_info’;
完整的代碼:
object RtCarTracker {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//導(dǎo)入隱式轉(zhuǎn)換
import org.apache.flink.streaming.api.scala._
//設(shè)置事件時間
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val props = new Properties()
props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
props.setProperty("group.id","test7")
// val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props))
val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999)
.map(line => {
val arr: Array[String] = line.split(",")
TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {
override def extractTimestamp(element: TrafficLog): Long = element.actionTime
})
val mapStateDescriptor = new MapStateDescriptor[String, IllegalCarInfo]("MapStateDescriptor", classOf[String], classOf[IllegalCarInfo])
//獲取廣播流
val bcDstream: BroadcastStream[IllegalCarInfo] = env.addSource(new JdbcReadSource("IllegalCarInfo")).map(pojo=>{
pojo.asInstanceOf[IllegalCarInfo]
}).broadcast(mapStateDescriptor)
//連接兩個流
val result: DataStream[CarThroughMonitorInfo] = mainDStream.connect(bcDstream).process(new BroadcastProcessFunction[TrafficLog, IllegalCarInfo, CarThroughMonitorInfo] {
override def processElement(value: TrafficLog, ctx: BroadcastProcessFunction[TrafficLog, IllegalCarInfo, CarThroughMonitorInfo]#ReadOnlyContext, out: Collector[CarThroughMonitorInfo]): Unit = {
val bcState: ReadOnlyBroadcastState[String, IllegalCarInfo] = ctx.getBroadcastState(mapStateDescriptor)
if (bcState.get(value.car) != null) {
out.collect(new CarThroughMonitorInfo(value.car, value.actionTime, value.monitorId, value.roadId, value.areaId))
}
}
override def processBroadcastElement(value: IllegalCarInfo, ctx: BroadcastProcessFunction[TrafficLog, IllegalCarInfo, CarThroughMonitorInfo]#Context, out: Collector[CarThroughMonitorInfo]): Unit = {
ctx.getBroadcastState(mapStateDescriptor).put(value.car, value)
}
})
result.countWindowAll(20).process(new ProcessAllWindowFunction[CarThroughMonitorInfo,util.List[Put],GlobalWindow] {
override def process(context: Context, elements: Iterable[CarThroughMonitorInfo], out: Collector[util.List[Put]]): Unit = {
val list = new util.ArrayList[Put]()
for(elem<-elements){
val put = new Put(Bytes.toBytes(elem.car + "_" + elem.date))
put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("area_id"),Bytes.toBytes(elem.areaID))
put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("road_id"),Bytes.toBytes(elem.roadID))
put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("monitor_id"),Bytes.toBytes(elem.monitorId))
list.add(put)
}
out.collect(list)
}
}).addSink(new HBaseWriteSink())
env.execute()
}
}
HBaseSink:
class HBaseWriteSink extends RichSinkFunction[java.util.List[Put]]{
//打開HBase連接
var config :conf.Configuration = _
var conn :Connection = _
override def open(parameters: Configuration): Unit = {
config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum","mynode3:2181,mynode4:2181,mynode5:2181")
conn = ConnectionFactory.createConnection(config)
}
override def close(): Unit = {
conn.close()
}
override def invoke(value: java.util.List[Put], context: SinkFunction.Context[_]): Unit = {
//獲取HBase表,在HBase中執(zhí)行 : create 't_track_info','cf1'
val table: Table = conn.getTable(TableName.valueOf("t_track_info"))
table.put(value)
}
}
從HBase中讀取車輛軌跡api:
/**
* 從Hbase中掃描 rowkey 范圍 查詢數(shù)據(jù)
*/
object GetDataFromHBase {
def main(args: Array[String]): Unit = {
//獲取連接
val conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "mynode3:2181,mynode4:2181,mynode5:2181");
val conn = ConnectionFactory.createConnection(conf);
//獲取表
val table = conn.getTable(TableName.valueOf("t_track_info"));
//設(shè)置掃描 rowkey 范圍
val scan = new Scan("魯A65552_1602381959000".getBytes(),"魯A65552_1602382000000".getBytes())
//查詢獲取結(jié)果
val scanner: ResultScanner = table.getScanner(scan)
//獲取結(jié)果一條數(shù)據(jù)
var result :Result = scanner.next()
while(result != null){
val row: Array[Byte] = result.getRow
val cells: util.List[Cell] = result.listCells()
import scala.collection.JavaConverters._
for (cell <- cells.asScala) {
val rowKey: Array[Byte] = CellUtil.cloneRow(cell)
val family: Array[Byte] = CellUtil.cloneFamily(cell)
val qualifier: Array[Byte] = CellUtil.cloneQualifier(cell)
val value: Array[Byte] = CellUtil.cloneValue(cell)
println(s"rowKey:${Bytes.toString(row)},列族名稱為:${Bytes.toString(family)},列名稱為:${Bytes.toString(qualifier)},列值為:${Bytes.toString(value)}")
}
result = scanner.next()
}
}
}
5. 實時車輛布控
在交警部門的指揮中心應(yīng)該實時的知道整個城市的上路車輛情況,需要知道每個區(qū)一共有多少輛車?現(xiàn)在是否有大量的外地車進(jìn)入城市等等。本模塊主要是針對整個城市整體的實時車輛情況統(tǒng)計。
5.1 實時車輛分布情況
實時車輛分布情況,是指在一段時間內(nèi)(比如:10分鐘)整個城市中每個區(qū)分布多少量車。這里要注意車輛的去重,因為在10分鐘內(nèi)一定會有很多的車,經(jīng)過不同的卡口。這些車牌相同的車,我們只統(tǒng)計一次。其實就是根據(jù)車牌號去重。
代碼如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-769342.html
object RTCarAnalysis1 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//設(shè)置并行度
env.setParallelism(1)
//設(shè)置事件時間為當(dāng)前時間
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val props = new Properties()
props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
props.setProperty("group.id","test_group8")
//讀取Kafka中的數(shù)據(jù)
val mainDStream: KeyedStream[TrafficLog, String] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
.map(line => {
val arr: Array[String] = line.split(",")
TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))
}).keyBy(_.areaId)
mainDStream.timeWindow(Time.minutes(1))
.process(new ProcessWindowFunction[TrafficLog,String,String,TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[TrafficLog], out: Collector[String]): Unit = {
val set = scala.collection.mutable.Set[String]()
for(elem <- elements){
set.add(elem.car)
}
out.collect(s"開始時間:${context.window.getStart} - 結(jié)束時間:${context.window.getEnd},區(qū)域ID:${key},車輛總數(shù) = ${set.size}")
}
}).print()
env.execute()
}
}
5.2 布隆過濾器(Bloom Filter)
在上節(jié)的例子中,我們把所有數(shù)據(jù)的車牌號car都存在了窗口計算的狀態(tài)里,在窗口收集數(shù)據(jù)的過程中,狀態(tài)會不斷增大。一般情況下,只要不超出內(nèi)存的承受范圍,這種做法也沒什么問題;但如果我們遇到的數(shù)據(jù)量很大呢?
把所有數(shù)據(jù)暫存放到內(nèi)存里,顯然不是一個好注意。我們會想到,可以利用redis這種內(nèi)存級k-v數(shù)據(jù)庫,為我們做一個緩存。但如果我們遇到的情況非常極端,數(shù)據(jù)大到驚人呢?比如上千萬級,億級的卡口車輛數(shù)據(jù)呢?(假設(shè))要去重計算。
如果放到redis中,假設(shè)有6千萬車牌號(每個10-20字節(jié)左右的話)可能需要幾G的空間來存儲。當(dāng)然放到redis中,用集群進(jìn)行擴展也不是不可以,但明顯代價太大了。
一個更好的想法是,其實我們不需要完整地存車輛的信息,只要知道他在不在就行了。所以其實我們可以進(jìn)行壓縮處理,用一位(bit)就可以表示一個車輛的狀態(tài)。這個思想的具體實現(xiàn)就是布隆過濾器(Bloom Filter)。
布隆過濾器的原理:
本質(zhì)上布隆過濾器是一種數(shù)據(jù)結(jié)構(gòu),比較巧妙的概率型數(shù)據(jù)結(jié)構(gòu)(probabilistic data structure),特點是高效地插入和查詢,可以用來告訴你 “某樣?xùn)|西一定不存在或者可能存在”。
它本身是一個很長的二進(jìn)制向量,既然是二進(jìn)制的向量,那么顯而易見的,存放的不是0,就是1。相比于傳統(tǒng)的 List、Set、Map 等數(shù)據(jù)結(jié)構(gòu),它更高效、占用空間更少。我們的目標(biāo)就是,利用某種方法(一般是Hash函數(shù))把每個數(shù)據(jù),對應(yīng)到一個位圖的某一位上去;如果數(shù)據(jù)存在,那一位就是1,不存在則為0。
Bloom Filter是一種空間效率很高的隨機數(shù)據(jù)結(jié)構(gòu),它利用位數(shù)組很簡潔地表示一個集合,并能判斷一個元素是否屬于這個集合。Bloom Filter的這種高效是有一定代價的:在判斷一個元素是否屬于某個集合時,有可能會把不屬于這個集合的元素誤認(rèn)為屬于這個集合(false positive)。因此,Bloom Filter不適合那些“零錯誤”的應(yīng)用場合。而在能容忍低錯誤率的應(yīng)用場合下,Bloom Filter通過極少的錯誤換取了存儲空間的極大節(jié)省。
簡單的例子:
下面是一個簡單的 Bloom filter 結(jié)構(gòu),開始時集合內(nèi)沒有元素:
當(dāng)來了一個元素 a,進(jìn)行判斷,這里需要一個(或者多個)哈希函數(shù)然后二進(jìn)制運算(模運算),計算出對應(yīng)的比特位上為 0 ,即是 a 不在集合內(nèi),將 a 添加進(jìn)去:
之后的元素,要判斷是不是在集合內(nèi),也是同 a 一樣的方法,只有對元素哈希后對應(yīng)位置上都是 1 才認(rèn)為這個元素在集合內(nèi)(雖然這樣可能會誤判):
隨著元素的插入,Bloom filter 中修改的值變多,出現(xiàn)誤判的幾率也隨之變大,當(dāng)新來一個元素時,滿足其在集合內(nèi)的條件,即所有對應(yīng)位都是 1 ,這樣就可能有兩種情況,一是這個元素就在集合內(nèi),沒有發(fā)生誤判;還有一種情況就是發(fā)生誤判,出現(xiàn)了哈希碰撞,這個元素本不在集合內(nèi)。
本項目中可以采用google 提供的BoolmFilter進(jìn)行位圖計算和判斷:
BloomFilter.create(Funnels.stringFunnel(),100000),F(xiàn)unnels.stringFunnel()指的是將對什么類型的數(shù)據(jù)使用布隆過濾器。這里我們使每個區(qū)域都對應(yīng)一個布隆過濾器,位長度為100000,經(jīng)過測試,可以對100萬左右的數(shù)量進(jìn)行去重判斷,每個布隆過濾器可以認(rèn)為相當(dāng)于一個數(shù)組,大概占用空間為100K。
代碼如下:
object RTCarAnalysis2 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//設(shè)置并行度
// env.setParallelism(1)
//設(shè)置事件時間為當(dāng)前時間
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val props = new Properties()
props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
props.setProperty("group.id","test_group9")
//讀取Kafka中的數(shù)據(jù)
val mainDStream: KeyedStream[TrafficLog, String] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
.map(line => {
val arr: Array[String] = line.split(",")
TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))
}).keyBy(_.areaId)
//存儲 區(qū)域 - 車輛數(shù) map
val map = scala.collection.mutable.Map[String,BloomFilter[CharSequence]]()
mainDStream.timeWindow(Time.minutes(1))
.aggregate(
new AggregateFunction[TrafficLog,Long,Long] {
override def createAccumulator(): Long = 0L
override def add(value: TrafficLog, accumulator: Long): Long = {
//判斷前Map中是否包含 area_id
if(map.contains(value.areaId)){
//如果包含當(dāng)前區(qū)域,獲取當(dāng)前key對應(yīng)的數(shù)值,并判斷
// 車輛是否重復(fù),
val bool: Boolean = map.get(value.areaId).get.mightContain(value.car)
if(!bool){//如果不包含,就加1
//將當(dāng)前車輛設(shè)置到布隆過濾器中
map.get(value.areaId).get.put(value.car)
accumulator + 1L
}else{
accumulator
}
}else{
//如果不包含當(dāng)前 area_id,就設(shè)置map
map.put(value.areaId,BloomFilter.create(Funnels.stringFunnel(),100000))
//將當(dāng)前車輛設(shè)置到布隆過濾器中
map.get(value.areaId).get.put(value.car)
//返回1
accumulator+ 1L
}
}
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a+b
},
new WindowFunction[Long,String,String,TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[String]): Unit = {
out.collect(s"窗口起始時間: ${window.getStart} -${window.getEnd} ,區(qū)域:${key},車輛總數(shù):${input.last}")
}
}
).print()
env.execute()
}
}
5.3 實時外地車分布情況
這個功能和前面的一樣,實時統(tǒng)計外地車在一段時間內(nèi),整個城市的分布情況,整個城市中每個區(qū)多少分布多少量外地車,即統(tǒng)計每個區(qū)域?qū)崟r外地車分布(每分鐘統(tǒng)計一次)文章來源:http://www.zghlxwxcb.cn/news/detail-769342.html
代碼如下:
object NonLocalCarAnalysis {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//導(dǎo)入隱式轉(zhuǎn)換
import org.apache.flink.streaming.api.scala._
val props = new Properties()
props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
props.setProperty("group.id","grouptest10")
//設(shè)置 BloomFilter Map
val map = scala.collection.mutable.Map[String,BloomFilter[CharSequence]]()
env.addSource(new FlinkKafkaConsumer[String]("traffic-topic",new SimpleStringSchema(),props).setStartFromEarliest())
.map(line=>{
val arr: Array[String] = line.split(",")
TrafficLog(arr(0).toLong,arr(1),arr(2),arr(3),arr(4).toDouble,arr(5),arr(6))
}).filter(!_.car.startsWith("京"))
.keyBy(_.areaId)
.timeWindow(Time.minutes(1))
//apply 全量函數(shù) ,process:全量函數(shù),reduce 既有增量也有全量 ,aggregate 既有增量,也有全量
.aggregate(new AggregateFunction[TrafficLog,Long,Long] {
override def createAccumulator(): Long = 0L
override def add(value: TrafficLog, accumulator: Long): Long = {
//判斷當(dāng)前區(qū)域是否在map中
if(map.contains(value.areaId)){//包含當(dāng)前areaID
val bool: Boolean = map.get(value.areaId).get.mightContain(value.car)
if(bool){//布隆過濾器中包含當(dāng)前車輛數(shù)據(jù)
accumulator
}else{//布隆過濾器中不包含當(dāng)前車輛數(shù)據(jù)
map.get(value.areaId).get.put(value.car)
accumulator +1L
}
}else{
map.put(value.areaId,BloomFilter.create(Funnels.stringFunnel(),100000))
map.get(value.areaId).get.put(value.car)
accumulator +1
}
}
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a+b
},new WindowFunction[Long,String,String,TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[String]): Unit = {
out.collect(s"起始時間段:${window.getStart} - ${window.getEnd},區(qū)域:${key},車輛數(shù):${input.last}")
}
}).print()
env.execute()
}
}
到了這里,關(guān)于Flink項目實戰(zhàn)篇 基于Flink的城市交通監(jiān)控平臺(下)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!