国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink項目實戰(zhàn)篇 基于Flink的城市交通監(jiān)控平臺(下)

這篇具有很好參考價值的文章主要介紹了Flink項目實戰(zhàn)篇 基于Flink的城市交通監(jiān)控平臺(下)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

系列文章目錄

Flink項目實戰(zhàn)篇 基于Flink的城市交通監(jiān)控平臺(上)
Flink項目實戰(zhàn)篇 基于Flink的城市交通監(jiān)控平臺(下)



4. 智能實時報警

本模塊主要負(fù)責(zé)城市交通管理中,可能存在違章或者違法非常嚴(yán)重的行為,系統(tǒng)可以自動實時報警??梢詫崿F(xiàn)億級數(shù)據(jù)在線分布式計算秒級反饋。滿足實戰(zhàn)的“實時”需要,爭分奪秒、聚力辦案。做的真正“零”延遲的報警和出警。主要功能包括:實時套牌分析,實時危險駕駛分析等。

4.1 實時套牌分析

當(dāng)某個卡口中出現(xiàn)一輛行駛的汽車,我們可以通過攝像頭識別車牌號,然后在10秒內(nèi),另外一個卡口(或者當(dāng)前卡口)也識別到了同樣車牌的車輛,那么很有可能這兩輛車之中有很大幾率存在套牌車,因為一般情況下不可能有車輛在10秒內(nèi)經(jīng)過兩個卡口。如果發(fā)現(xiàn)涉嫌套牌車,系統(tǒng)實時發(fā)出報警信息,同時這些存在套牌車嫌疑的車輛,寫入Mysql數(shù)據(jù)庫的結(jié)果表中,在后面的模塊中,可以對這些違法車輛進(jìn)行實時軌跡跟蹤。

本需求可以使用CEP編程,也可以使用狀態(tài)編程。我們采用狀態(tài)編程。

完整的代碼:

object RepatitionCarWarning {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //導(dǎo)入scala包
    import org.apache.flink.streaming.api.scala._

    //設(shè)置并行度
    env.setParallelism(1)

    //設(shè)置事件時間
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("group.id","test4")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)

    val trafficDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
//    val trafficDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999)
      .map(line => {
        val arr: Array[String] = line.split(",")
        TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))
      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {
      override def extractTimestamp(element: TrafficLog): Long = element.actionTime
    })

    trafficDStream.keyBy(_.car).process(new KeyedProcessFunction[String,TrafficLog,RepatitionCarInfo] {
      lazy private val valueState: ValueState[TrafficLog] = getRuntimeContext.getState(new ValueStateDescriptor[TrafficLog]("valueState",classOf[TrafficLog]))

      override def processElement(value: TrafficLog, ctx: KeyedProcessFunction[String, TrafficLog, RepatitionCarInfo]#Context, out: Collector[RepatitionCarInfo]): Unit = {
        if(valueState.value() != null){//如果狀態(tài)中包含當(dāng)前車輛
          val log: TrafficLog = valueState.value()
          //同一車輛數(shù)據(jù),判斷兩次通過卡扣間隔時長
          var dur = (log.actionTime  - value.actionTime).abs
          if(dur < 10*1000){
            out.collect(new RepatitionCarInfo(value.car,"涉嫌套牌",System.currentTimeMillis(),
              s"該車輛連續(xù)兩次經(jīng)過的卡扣及對應(yīng)時間為:${log.monitorId} - ${log.actionTime} , ${value.monitorId} - ${value.actionTime} "))
          }
          //更新狀態(tài)數(shù)據(jù)
          if(log.actionTime < value.actionTime){
            valueState.update(value)
          }
        }else{ //狀態(tài)中不包含當(dāng)前車輛
          valueState.update(value)
        }
      }
    })
        .addSink(new JdbcWriteSink[RepatitionCarInfo]("RepatitionCarInfo"))

    env.execute()
  }
}

4.2 實時危險駕駛分析

在本項目中,危險駕駛是指在道路上駕駛機動車:追逐超速競駛。我們規(guī)定:如果一輛機動車在2分鐘內(nèi),超速通過卡口超過3次以上;而且每次超速的超過了規(guī)定速度的20%以上;這樣的機動車涉嫌危險駕駛。系統(tǒng)需要實時找出這些機動車,并報警,追蹤這些車輛的軌跡。注意:如果有些卡口沒有設(shè)置限速值,可以設(shè)置一個城市默認(rèn)限速。

這樣的需求在Flink也是有兩種解決思路,第一:狀態(tài)編程。第二:CEP編程。但是當(dāng)前的需求使用狀態(tài)編程過于復(fù)雜了。所以我們采用第二種。同時還要注意:Flume在采集數(shù)據(jù)的過程中出現(xiàn)了數(shù)據(jù)亂序問題,一般最長延遲5秒。

涉嫌危險駕駛的車輛信息保存到Mysql數(shù)據(jù)庫表(t_violation_list)中,以便后面的功能中統(tǒng)一追蹤這些車輛的軌跡。

注意:如果要設(shè)置水位線需要設(shè)置在兩個連接流連接之后。

完整的代碼:

case class newTrafficLog(actionTime:Long,monitorId:String,cameraId:String,car:String,speed:Double,roadId:String,areaId:String,monitorLimitSpeed:Int)

object DangerDriveCarWarning {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //設(shè)置并行度
    env.setParallelism(1)
    //導(dǎo)入隱式轉(zhuǎn)換
    import org.apache.flink.streaming.api.scala._
    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("group.id","test5")
    props.setProperty("key.serializer",classOf[StringDeserializer].getName)
    props.setProperty("value.serializer",classOf[StringDeserializer].getName)

    //設(shè)置事件時間
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //主流
//    val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
    val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999)
      .map(line => {
      val arr: Array[String] = line.split(",")
      TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))
    })

    //廣播流,讀取mysql中的數(shù)據(jù),這里主要是讀取卡扣限速的數(shù)據(jù)
    val bcDStream: BroadcastStream[MonitorLimitSpeedInfo] = env.addSource(new JdbcReadSource("MonitorLimitSpeedInfo"))
     .map(
      one => {
        one.asInstanceOf[MonitorLimitSpeedInfo]
      })
      .broadcast(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR)

    //將日志流與廣播流進(jìn)行整合,將道路卡扣限速信息與每條車輛運行日志數(shù)據(jù)結(jié)合
    val trafficAllInfoDStream: DataStream[newTrafficLog] = mainDStream.connect(bcDStream).process(new BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, newTrafficLog] {
      //處理每個日志元素
      override def processElement(value: TrafficLog, ctx: BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, newTrafficLog]#ReadOnlyContext, out: Collector[newTrafficLog]): Unit = {
        //獲取狀態(tài)
        val mapState: ReadOnlyBroadcastState[String, MonitorLimitSpeedInfo] = ctx.getBroadcastState(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR)
        //獲取當(dāng)前道路當(dāng)前卡扣 對應(yīng)的限速 ,如果沒有就設(shè)置限速為80
        var limitSpeed = 80
        if (mapState.contains(value.roadId + "_" + value.monitorId)) {
          limitSpeed = mapState.get(value.roadId + "_" + value.monitorId).speedLimit
        }
        out.collect(new newTrafficLog(value.actionTime, value.monitorId, value.cameraId, value.car, value.speed, value.roadId, value.areaId, limitSpeed))

      }

      //處理廣播元素
      override def processBroadcastElement(value: MonitorLimitSpeedInfo, ctx: BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, newTrafficLog]#Context, out: Collector[newTrafficLog]): Unit = {
        //獲取狀態(tài)
        val mapState: BroadcastState[String, MonitorLimitSpeedInfo] = ctx.getBroadcastState(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR)
        //更新當(dāng)前道路當(dāng)前卡扣的限速數(shù)據(jù)
        mapState.put(value.roadId + "_" + value.monitorId, value)
        println("廣播狀態(tài)準(zhǔn)備就緒")
      }
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[newTrafficLog](Time.seconds(0)) {
      override def extractTimestamp(element: newTrafficLog): Long = element.actionTime
    })


    val keyDs: KeyedStream[newTrafficLog, String] = trafficAllInfoDStream.keyBy(_.car)//按照車輛分組


    //使用CEP編程定義模式, 在1分鐘內(nèi)連續(xù)3次超速20%通過道路卡扣的車輛
    val pattern:Pattern[newTrafficLog, newTrafficLog] =
        Pattern.begin[newTrafficLog]("start").where(nt=>{nt.speed > nt.monitorLimitSpeed*1.2})
          .followedBy("second").where(nt=>{nt.speed > nt.monitorLimitSpeed*1.2})
          .followedBy("third").where(nt=>{nt.speed > nt.monitorLimitSpeed*1.2})
          .within(Time.minutes(1))//注意:這里的時間指的是 各個時間之間的相差時間不超過1分鐘。時間采用的是事件時間

    val patternStream: PatternStream[newTrafficLog] = CEP.pattern(keyDs,pattern)
    val result: DataStream[DangerDriveCarInfo] = patternStream.select((map: Map[String, Iterable[newTrafficLog]]) => {
      val begin: newTrafficLog = map.get("start").get.last
      val second: newTrafficLog = map.get("second").get.last
      val third: newTrafficLog = map.get("third").get.last
      val builder = s"第一次通過卡扣${begin.monitorId},當(dāng)前限速:${begin.monitorLimitSpeed},通過的速度為:${begin.speed} |" +
        s"第二次通過卡扣${second.monitorId},當(dāng)前限速:${second.monitorLimitSpeed},通過的速度為:${second.speed}|" +
        s"第三次通過卡扣${third.monitorId},當(dāng)前限速:${third.monitorLimitSpeed},通過的速度為:${third.speed}"
      DangerDriveCarInfo(begin.car, "危險駕駛", System.currentTimeMillis(), builder.toString)
    })
//    result.print()
    result.addSink(new JdbcWriteSink[DangerDriveCarInfo]("DangerDriveCarInfo"))
    env.execute()
  }

}

4.3 出警分析

當(dāng)監(jiān)控到道路中有一起違法交通事故時,例如:車輛危險駕駛、車輛套牌、發(fā)生交通事故等,會有對應(yīng)的交警出警處理案情。違法事故實時數(shù)據(jù)會被實時監(jiān)控放入topicA,交通警察出警記錄會實時上報數(shù)據(jù)被放入topicB中,這里需要對違法交通事故的出警情況進(jìn)行分析并對超時未處理的警情作出對應(yīng)的預(yù)警。

出警分析如下:如果在topicA中出現(xiàn)一條違法車輛信息,如果在5分鐘內(nèi)已經(jīng)出警,將出警信息輸出到結(jié)果庫中。如果5分鐘內(nèi)沒有出警則發(fā)出出警提示。(發(fā)出出警的提示,在側(cè)流中發(fā)出)。
這里為了方便演示,將從socket中讀取數(shù)據(jù)。

(1)使用IntervalJoin實現(xiàn),這是只能輸出出警信息

object PoliceAnalysis1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //導(dǎo)入隱式轉(zhuǎn)換
    import org.apache.flink.streaming.api.scala._

    //設(shè)置事件時間
//    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val props  = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("group.id","test6")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)

    //獲取監(jiān)控違法車輛信息
//    val illegalDStream: DataStream[IllegalCarInfo] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic1",new SimpleStringSchema(),props))
    val illegalDStream: DataStream[IllegalCarInfo] = env.socketTextStream("mynode5", 8888).map(line => {
      val arr: Array[String] = line.split(",")
      IllegalCarInfo(arr(0), arr(1), arr(2).toLong)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[IllegalCarInfo](Time.seconds(3)) {
      override def extractTimestamp(element: IllegalCarInfo): Long = element.eventTime
    })

    //獲取出警信息
//    val policeDStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic2",new SimpleStringSchema(),props))
    val policeDStream: DataStream[PoliceInfo] = env.socketTextStream("mynode5", 9999).map(line => {
      val arr: Array[String] = line.split(",")
      PoliceInfo(arr(0), arr(1), arr(2), arr(3).toLong)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[PoliceInfo](Time.seconds(2)) {
      override def extractTimestamp(element: PoliceInfo): Long = element.reporTime
    })

    //兩個流進(jìn)行 intervalJoin ,相對于join ,這里不需要設(shè)置窗口,必須后面跟上between 來以時間范圍大小進(jìn)行join
    illegalDStream.keyBy(_.car).intervalJoin(policeDStream.keyBy(_.car))
      //這里假設(shè) 違法信息 illegalDStream 先出現(xiàn),policeDStream數(shù)據(jù)流后出現(xiàn)
      //between(Time.seconds(10),Time.seconds(10))相當(dāng)于 illegalDStream.eventTime - 10s <= policeDStream.reporTime <= illegalDStream.eventTime + 10s
      //例如 illegalDStream.eventTime 為 20:05:30  可以與 policeDStream.reporTime 為 20:05:20 - 20:05:40 范圍內(nèi)的數(shù)據(jù)進(jìn)行匹配
      .between(Time.seconds(-10),Time.seconds(10))
      .process(new ProcessJoinFunction[IllegalCarInfo,PoliceInfo,String] {
        override def processElement(left: IllegalCarInfo, right: PoliceInfo, ctx: ProcessJoinFunction[IllegalCarInfo, PoliceInfo, String]#Context, out: Collector[String]): Unit = {
            out.collect(s"違法車輛:${left.car} 已經(jīng)出警,警號:${right.policeId},事故時間:${left.eventTime},出警時間:${right.reporTime}")
        }
      }).print()

    env.execute()
  }
}

(2)使用兩個流的connect,可以監(jiān)測事故超時出警信息

object PoliceAnalysis2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //導(dǎo)入隱式轉(zhuǎn)換
    import org.apache.flink.streaming.api.scala._

    //設(shè)置事件時間
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //設(shè)置并行度為1
    env.setParallelism(1)

    val props  = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("group.id","test6")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)

    //獲取監(jiān)控違法車輛信息
    //    val illegalDStream: DataStream[IllegalCarInfo] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic1",new SimpleStringSchema(),props))
    val illegalDStream: DataStream[IllegalCarInfo] = env.socketTextStream("mynode5", 8888).map(line => {
      val arr: Array[String] = line.split(",")
      IllegalCarInfo(arr(0), arr(1), arr(2).toLong)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[IllegalCarInfo](Time.seconds(3)) {
      override def extractTimestamp(element: IllegalCarInfo): Long = element.eventTime
    })

    //獲取出警信息
    //    val policeDStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic2",new SimpleStringSchema(),props))
    val policeDStream: DataStream[PoliceInfo] = env.socketTextStream("mynode5", 9999).map(line => {
      val arr: Array[String] = line.split(",")
      PoliceInfo(arr(0), arr(1), arr(2), arr(3).toLong)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[PoliceInfo](Time.seconds(2)) {
      override def extractTimestamp(element: PoliceInfo): Long = element.reporTime
    })

    //定義側(cè)流
    val ic = new OutputTag[IllegalCarInfo]("IllegalCarInfo")
    val pi = new OutputTag[PoliceInfo]("PoliceInfo")

    //以上違法記錄信息 與 交警出警信息 進(jìn)行關(guān)聯(lián)
    val result: DataStream[String] = illegalDStream.keyBy(_.car).connect(policeDStream.keyBy(_.car))
      .process(new KeyedCoProcessFunction[String, IllegalCarInfo, PoliceInfo, String] {

        //這里每個 key 都會對應(yīng)一個狀態(tài)
        lazy private val illegalCarInfoState: ValueState[IllegalCarInfo] = getRuntimeContext.getState(new ValueStateDescriptor[IllegalCarInfo]("illegalCarInfoState", classOf[IllegalCarInfo]))
        lazy private val policeInfoState: ValueState[PoliceInfo] = getRuntimeContext.getState(new ValueStateDescriptor[PoliceInfo]("policeInfoState", classOf[PoliceInfo]))

        //先有違法信息
        override def processElement1(value: IllegalCarInfo, ctx: KeyedCoProcessFunction[String, IllegalCarInfo, PoliceInfo, String]#Context, out: Collector[String]): Unit = {
          //獲取當(dāng)前車輛的出警信息
          val policeInfo: PoliceInfo = policeInfoState.value()
          if (policeInfo != null) { //說明有對應(yīng)的出警記錄,說明 當(dāng)前違法數(shù)據(jù)遲到了
            //輸出結(jié)果
            out.collect(s"違法車輛:${value.car} 已經(jīng)出警,警號:${policeInfo.policeId},事故時間:${value.eventTime},出警時間:${policeInfo.reporTime}")
            //刪除出警狀態(tài)
            policeInfoState.clear()
            //刪除出警記錄定時器
            ctx.timerService().deleteEventTimeTimer(policeInfo.reporTime + 10000)

          } else { //沒有對應(yīng)的出警記錄
            //進(jìn)來當(dāng)前車輛的違法信息后,放入狀態(tài)中
            illegalCarInfoState.update(value)
            //當(dāng)前車輛有了違法記錄就構(gòu)建定時器,定時器設(shè)置當(dāng)前時間時間后10s觸發(fā),除非10s內(nèi)刪除對應(yīng)的定時器就不會觸發(fā)
            ctx.timerService().registerEventTimeTimer(value.eventTime + 10000) //這里方便演示設(shè)置定時器時長為10s
          }
        }

        //后有出警狀態(tài),也有可能出警狀態(tài)先到
        override def processElement2(value: PoliceInfo, ctx: KeyedCoProcessFunction[String, IllegalCarInfo, PoliceInfo, String]#Context, out: Collector[String]): Unit = {
          val illegalCarInfo: IllegalCarInfo = illegalCarInfoState.value()
          if (illegalCarInfo != null) {
            //對應(yīng)當(dāng)前車輛的違法記錄中有數(shù)據(jù),說明這個車輛有了對應(yīng)的出警記錄
            println(s"這里打印就是測試是不是一個key有一個狀態(tài): 違法車輛中的狀態(tài)car 是 ${illegalCarInfo.car} ,出警記錄中的車輛是${value.car}")
            //有對應(yīng)的出警記錄就正常輸出數(shù)據(jù)即可:
            out.collect(s"違法車輛:${illegalCarInfo.car} 已經(jīng)出警,警號:${value.policeId},事故時間:${illegalCarInfo.eventTime},出警時間:${value.reporTime}")
            //清空當(dāng)前車輛違法狀態(tài)
            illegalCarInfoState.clear()
            //刪除違法記錄定時器
            ctx.timerService().deleteEventTimeTimer(illegalCarInfo.eventTime + 10000) //刪除定時器
          } else { //有了出警記錄,但是沒有違法記錄
            //這里有了出警狀態(tài),但是沒有發(fā)現(xiàn)當(dāng)前車輛違法記錄,說明 出警狀態(tài)數(shù)據(jù)早到了,違法記錄 遲到了
            //針對這種情況,將出警記錄數(shù)據(jù)放入出警狀態(tài)中
            policeInfoState.update(value)

            //當(dāng)前車輛有了出警就構(gòu)建定時器,定時器設(shè)置當(dāng)前時間時間后10s觸發(fā),除非10s內(nèi)刪除對應(yīng)的定時器就不會觸發(fā)
            ctx.timerService().registerEventTimeTimer(value.reporTime + 10000) //這里方便演示設(shè)置定時器時長為10s
          }
        }

        //觸發(fā)定時器 定時器觸發(fā)后會調(diào)用onTimer 方法 ,timestamp : 觸發(fā)器觸發(fā)時間
        override def onTimer(timestamp: Long, ctx: KeyedCoProcessFunction[String, IllegalCarInfo, PoliceInfo, String]#OnTimerContext, out: Collector[String]): Unit = {
          //獲取 違法記錄信息狀態(tài)
          val illegalCarInfo: IllegalCarInfo = illegalCarInfoState.value()

          //獲取 出警記錄信息狀態(tài)
          val policeInfo: PoliceInfo = policeInfoState.value()

          if (illegalCarInfo != null) {
            //沒有出警記錄 ,輸出到側(cè)流
            ctx.output(ic, illegalCarInfo)
          }

          if (policeInfo != null) { //沒有違法信息  ,輸出到側(cè)流
            ctx.output(pi, policeInfo)
          }
          //清空以上兩種狀態(tài)
          illegalCarInfoState.clear()
          policeInfoState.clear()
        }
      })

    result.print("正常流")

    val illegalCarInfoDStream: DataStream[IllegalCarInfo] = result.getSideOutput(ic)
    val policeInfoDStream: DataStream[PoliceInfo] = result.getSideOutput(pi)

    illegalCarInfoDStream.print("沒有出警記錄,有違法記錄的信息:")
    policeInfoDStream.print("有出警記錄,沒有違法記錄車輛信息:")


    env.execute()

  }

}

4.4 違法車輛軌跡跟蹤

城市交通中,有些車輛需要實時軌跡跟蹤,這些需要跟蹤軌跡的車輛,保存在城市違法表中:t_violation_list。系統(tǒng)需要實時打印這些車輛經(jīng)過的卡口,并且把軌跡數(shù)據(jù)插入數(shù)據(jù)表t_track_info(Hbase數(shù)據(jù)庫)中。根據(jù)前面所學(xué)的知識,我們應(yīng)該使用Flink中的廣播狀態(tài)完成該功能。

需要在hbase中創(chuàng)建表 t_track_info:create ‘t_track_info’,‘cf1’
清空hbase表命令:truncate ‘t_track_info’;

完整的代碼:

object RtCarTracker {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  
    //導(dǎo)入隱式轉(zhuǎn)換
    import org.apache.flink.streaming.api.scala._

    //設(shè)置事件時間
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("group.id","test7")

//    val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props))
    val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999)
      .map(line => {
        val arr: Array[String] = line.split(",")
        TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))
      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {
      override def extractTimestamp(element: TrafficLog): Long = element.actionTime
    })

    val mapStateDescriptor = new MapStateDescriptor[String, IllegalCarInfo]("MapStateDescriptor", classOf[String], classOf[IllegalCarInfo])
    //獲取廣播流
    val bcDstream: BroadcastStream[IllegalCarInfo] = env.addSource(new JdbcReadSource("IllegalCarInfo")).map(pojo=>{
      pojo.asInstanceOf[IllegalCarInfo]
    }).broadcast(mapStateDescriptor)

    //連接兩個流
    val result: DataStream[CarThroughMonitorInfo] = mainDStream.connect(bcDstream).process(new BroadcastProcessFunction[TrafficLog, IllegalCarInfo, CarThroughMonitorInfo] {
      override def processElement(value: TrafficLog, ctx: BroadcastProcessFunction[TrafficLog, IllegalCarInfo, CarThroughMonitorInfo]#ReadOnlyContext, out: Collector[CarThroughMonitorInfo]): Unit = {
        val bcState: ReadOnlyBroadcastState[String, IllegalCarInfo] = ctx.getBroadcastState(mapStateDescriptor)
        if (bcState.get(value.car) != null) {
          out.collect(new CarThroughMonitorInfo(value.car, value.actionTime, value.monitorId, value.roadId, value.areaId))
        }
      }

      override def processBroadcastElement(value: IllegalCarInfo, ctx: BroadcastProcessFunction[TrafficLog, IllegalCarInfo, CarThroughMonitorInfo]#Context, out: Collector[CarThroughMonitorInfo]): Unit = {
        ctx.getBroadcastState(mapStateDescriptor).put(value.car, value)
      }
    })
    result.countWindowAll(20).process(new ProcessAllWindowFunction[CarThroughMonitorInfo,util.List[Put],GlobalWindow] {
      override def process(context: Context, elements: Iterable[CarThroughMonitorInfo], out: Collector[util.List[Put]]): Unit = {
        val list = new util.ArrayList[Put]()
        for(elem<-elements){
          val put = new Put(Bytes.toBytes(elem.car + "_" + elem.date))
          put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("area_id"),Bytes.toBytes(elem.areaID))
          put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("road_id"),Bytes.toBytes(elem.roadID))
          put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("monitor_id"),Bytes.toBytes(elem.monitorId))
          list.add(put)
        }
        out.collect(list)
      }
    }).addSink(new HBaseWriteSink())
    env.execute()
  }

}

HBaseSink:

class HBaseWriteSink extends RichSinkFunction[java.util.List[Put]]{
  //打開HBase連接
  var config :conf.Configuration = _
  var conn :Connection = _
  override def open(parameters: Configuration): Unit = {
    config = HBaseConfiguration.create();
    config.set("hbase.zookeeper.quorum","mynode3:2181,mynode4:2181,mynode5:2181")
    conn = ConnectionFactory.createConnection(config)
  }

  override def close(): Unit = {
    conn.close()
  }

  override def invoke(value: java.util.List[Put], context: SinkFunction.Context[_]): Unit = {
    //獲取HBase表,在HBase中執(zhí)行 : create 't_track_info','cf1'
    val table: Table = conn.getTable(TableName.valueOf("t_track_info"))
    table.put(value)
  }
}

從HBase中讀取車輛軌跡api:

/**
  * 從Hbase中掃描 rowkey 范圍 查詢數(shù)據(jù)
  */
object GetDataFromHBase {
  def main(args: Array[String]): Unit = {
    //獲取連接
    val conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "mynode3:2181,mynode4:2181,mynode5:2181");
    val conn = ConnectionFactory.createConnection(conf);
    //獲取表
    val table = conn.getTable(TableName.valueOf("t_track_info"));


    //設(shè)置掃描 rowkey 范圍
    val scan = new Scan("魯A65552_1602381959000".getBytes(),"魯A65552_1602382000000".getBytes())

    //查詢獲取結(jié)果
    val scanner: ResultScanner = table.getScanner(scan)

    //獲取結(jié)果一條數(shù)據(jù)
    var result :Result = scanner.next()
    while(result != null){
      val row: Array[Byte] = result.getRow
      val cells: util.List[Cell] = result.listCells()
      import scala.collection.JavaConverters._
      for (cell <- cells.asScala) {
        val rowKey: Array[Byte] = CellUtil.cloneRow(cell)
        val family: Array[Byte] = CellUtil.cloneFamily(cell)
        val qualifier: Array[Byte] = CellUtil.cloneQualifier(cell)
        val value: Array[Byte] = CellUtil.cloneValue(cell)
        println(s"rowKey:${Bytes.toString(row)},列族名稱為:${Bytes.toString(family)},列名稱為:${Bytes.toString(qualifier)},列值為:${Bytes.toString(value)}")
      }
      result  = scanner.next()
    }
  }

}

5. 實時車輛布控

在交警部門的指揮中心應(yīng)該實時的知道整個城市的上路車輛情況,需要知道每個區(qū)一共有多少輛車?現(xiàn)在是否有大量的外地車進(jìn)入城市等等。本模塊主要是針對整個城市整體的實時車輛情況統(tǒng)計。

5.1 實時車輛分布情況

實時車輛分布情況,是指在一段時間內(nèi)(比如:10分鐘)整個城市中每個區(qū)分布多少量車。這里要注意車輛的去重,因為在10分鐘內(nèi)一定會有很多的車,經(jīng)過不同的卡口。這些車牌相同的車,我們只統(tǒng)計一次。其實就是根據(jù)車牌號去重。

代碼如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-769342.html

object RTCarAnalysis1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.streaming.api.scala._

    //設(shè)置并行度
    env.setParallelism(1)

    //設(shè)置事件時間為當(dāng)前時間
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("group.id","test_group8")

    //讀取Kafka中的數(shù)據(jù)
    val mainDStream: KeyedStream[TrafficLog, String] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
      .map(line => {
        val arr: Array[String] = line.split(",")
        TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))
      }).keyBy(_.areaId)
    mainDStream.timeWindow(Time.minutes(1))
      .process(new ProcessWindowFunction[TrafficLog,String,String,TimeWindow] {
        override def process(key: String, context: Context, elements: Iterable[TrafficLog], out: Collector[String]): Unit = {
          val set = scala.collection.mutable.Set[String]()
          for(elem <- elements){
            set.add(elem.car)
          }
          out.collect(s"開始時間:${context.window.getStart} - 結(jié)束時間:${context.window.getEnd},區(qū)域ID:${key},車輛總數(shù) = ${set.size}")
        }
      }).print()

    env.execute()

  }

}

5.2 布隆過濾器(Bloom Filter)

在上節(jié)的例子中,我們把所有數(shù)據(jù)的車牌號car都存在了窗口計算的狀態(tài)里,在窗口收集數(shù)據(jù)的過程中,狀態(tài)會不斷增大。一般情況下,只要不超出內(nèi)存的承受范圍,這種做法也沒什么問題;但如果我們遇到的數(shù)據(jù)量很大呢?

把所有數(shù)據(jù)暫存放到內(nèi)存里,顯然不是一個好注意。我們會想到,可以利用redis這種內(nèi)存級k-v數(shù)據(jù)庫,為我們做一個緩存。但如果我們遇到的情況非常極端,數(shù)據(jù)大到驚人呢?比如上千萬級,億級的卡口車輛數(shù)據(jù)呢?(假設(shè))要去重計算。

如果放到redis中,假設(shè)有6千萬車牌號(每個10-20字節(jié)左右的話)可能需要幾G的空間來存儲。當(dāng)然放到redis中,用集群進(jìn)行擴展也不是不可以,但明顯代價太大了。

一個更好的想法是,其實我們不需要完整地存車輛的信息,只要知道他在不在就行了。所以其實我們可以進(jìn)行壓縮處理,用一位(bit)就可以表示一個車輛的狀態(tài)。這個思想的具體實現(xiàn)就是布隆過濾器(Bloom Filter)。

布隆過濾器的原理:
本質(zhì)上布隆過濾器是一種數(shù)據(jù)結(jié)構(gòu),比較巧妙的概率型數(shù)據(jù)結(jié)構(gòu)(probabilistic data structure),特點是高效地插入和查詢,可以用來告訴你 “某樣?xùn)|西一定不存在或者可能存在”。
它本身是一個很長的二進(jìn)制向量,既然是二進(jìn)制的向量,那么顯而易見的,存放的不是0,就是1。相比于傳統(tǒng)的 List、Set、Map 等數(shù)據(jù)結(jié)構(gòu),它更高效、占用空間更少。我們的目標(biāo)就是,利用某種方法(一般是Hash函數(shù))把每個數(shù)據(jù),對應(yīng)到一個位圖的某一位上去;如果數(shù)據(jù)存在,那一位就是1,不存在則為0。
Bloom Filter是一種空間效率很高的隨機數(shù)據(jù)結(jié)構(gòu),它利用位數(shù)組很簡潔地表示一個集合,并能判斷一個元素是否屬于這個集合。Bloom Filter的這種高效是有一定代價的:在判斷一個元素是否屬于某個集合時,有可能會把不屬于這個集合的元素誤認(rèn)為屬于這個集合(false positive)。因此,Bloom Filter不適合那些“零錯誤”的應(yīng)用場合。而在能容忍低錯誤率的應(yīng)用場合下,Bloom Filter通過極少的錯誤換取了存儲空間的極大節(jié)省。

簡單的例子:
下面是一個簡單的 Bloom filter 結(jié)構(gòu),開始時集合內(nèi)沒有元素:
Flink項目實戰(zhàn)篇 基于Flink的城市交通監(jiān)控平臺(下),Flink,flink,大數(shù)據(jù),scala
當(dāng)來了一個元素 a,進(jìn)行判斷,這里需要一個(或者多個)哈希函數(shù)然后二進(jìn)制運算(模運算),計算出對應(yīng)的比特位上為 0 ,即是 a 不在集合內(nèi),將 a 添加進(jìn)去:
Flink項目實戰(zhàn)篇 基于Flink的城市交通監(jiān)控平臺(下),Flink,flink,大數(shù)據(jù),scala
之后的元素,要判斷是不是在集合內(nèi),也是同 a 一樣的方法,只有對元素哈希后對應(yīng)位置上都是 1 才認(rèn)為這個元素在集合內(nèi)(雖然這樣可能會誤判):
Flink項目實戰(zhàn)篇 基于Flink的城市交通監(jiān)控平臺(下),Flink,flink,大數(shù)據(jù),scala
隨著元素的插入,Bloom filter 中修改的值變多,出現(xiàn)誤判的幾率也隨之變大,當(dāng)新來一個元素時,滿足其在集合內(nèi)的條件,即所有對應(yīng)位都是 1 ,這樣就可能有兩種情況,一是這個元素就在集合內(nèi),沒有發(fā)生誤判;還有一種情況就是發(fā)生誤判,出現(xiàn)了哈希碰撞,這個元素本不在集合內(nèi)。
Flink項目實戰(zhàn)篇 基于Flink的城市交通監(jiān)控平臺(下),Flink,flink,大數(shù)據(jù),scala
本項目中可以采用google 提供的BoolmFilter進(jìn)行位圖計算和判斷:
BloomFilter.create(Funnels.stringFunnel(),100000),F(xiàn)unnels.stringFunnel()指的是將對什么類型的數(shù)據(jù)使用布隆過濾器。這里我們使每個區(qū)域都對應(yīng)一個布隆過濾器,位長度為100000,經(jīng)過測試,可以對100萬左右的數(shù)量進(jìn)行去重判斷,每個布隆過濾器可以認(rèn)為相當(dāng)于一個數(shù)組,大概占用空間為100K。

代碼如下:

object RTCarAnalysis2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.streaming.api.scala._

    //設(shè)置并行度
//    env.setParallelism(1)

    //設(shè)置事件時間為當(dāng)前時間
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("group.id","test_group9")

    //讀取Kafka中的數(shù)據(jù)
    val mainDStream: KeyedStream[TrafficLog, String] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
      .map(line => {
        val arr: Array[String] = line.split(",")
        TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))
      }).keyBy(_.areaId)

    //存儲 區(qū)域 - 車輛數(shù) map
    val map = scala.collection.mutable.Map[String,BloomFilter[CharSequence]]()

    mainDStream.timeWindow(Time.minutes(1))
      .aggregate(
        new AggregateFunction[TrafficLog,Long,Long] {
        override def createAccumulator(): Long = 0L

        override def add(value: TrafficLog, accumulator: Long): Long = {
          //判斷前Map中是否包含 area_id
          if(map.contains(value.areaId)){
            //如果包含當(dāng)前區(qū)域,獲取當(dāng)前key對應(yīng)的數(shù)值,并判斷
            // 車輛是否重復(fù),
            val bool: Boolean = map.get(value.areaId).get.mightContain(value.car)
            if(!bool){//如果不包含,就加1
              //將當(dāng)前車輛設(shè)置到布隆過濾器中
              map.get(value.areaId).get.put(value.car)
              accumulator + 1L
            }else{
              accumulator
            }
          }else{
            //如果不包含當(dāng)前 area_id,就設(shè)置map
            map.put(value.areaId,BloomFilter.create(Funnels.stringFunnel(),100000))
            //將當(dāng)前車輛設(shè)置到布隆過濾器中
            map.get(value.areaId).get.put(value.car)
            //返回1
            accumulator+ 1L
          }
        }

        override def getResult(accumulator: Long): Long = accumulator

        override def merge(a: Long, b: Long): Long = a+b
      },
      new WindowFunction[Long,String,String,TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[String]): Unit = {
          out.collect(s"窗口起始時間: ${window.getStart} -${window.getEnd} ,區(qū)域:${key},車輛總數(shù):${input.last}")

        }
      }
      ).print()

    env.execute()


  }

}

5.3 實時外地車分布情況

這個功能和前面的一樣,實時統(tǒng)計外地車在一段時間內(nèi),整個城市的分布情況,整個城市中每個區(qū)多少分布多少量外地車,即統(tǒng)計每個區(qū)域?qū)崟r外地車分布(每分鐘統(tǒng)計一次)

代碼如下:

object NonLocalCarAnalysis {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //導(dǎo)入隱式轉(zhuǎn)換
    import org.apache.flink.streaming.api.scala._

    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("group.id","grouptest10")


    //設(shè)置 BloomFilter Map
    val map = scala.collection.mutable.Map[String,BloomFilter[CharSequence]]()

    env.addSource(new FlinkKafkaConsumer[String]("traffic-topic",new SimpleStringSchema(),props).setStartFromEarliest())
      .map(line=>{
        val arr: Array[String] = line.split(",")
        TrafficLog(arr(0).toLong,arr(1),arr(2),arr(3),arr(4).toDouble,arr(5),arr(6))

      }).filter(!_.car.startsWith("京"))
      .keyBy(_.areaId)
      .timeWindow(Time.minutes(1))
      //apply 全量函數(shù) ,process:全量函數(shù),reduce 既有增量也有全量 ,aggregate 既有增量,也有全量
      .aggregate(new AggregateFunction[TrafficLog,Long,Long] {
        override def createAccumulator(): Long = 0L

        override def add(value: TrafficLog, accumulator: Long): Long = {
          //判斷當(dāng)前區(qū)域是否在map中
          if(map.contains(value.areaId)){//包含當(dāng)前areaID
            val bool: Boolean = map.get(value.areaId).get.mightContain(value.car)
            if(bool){//布隆過濾器中包含當(dāng)前車輛數(shù)據(jù)
              accumulator
            }else{//布隆過濾器中不包含當(dāng)前車輛數(shù)據(jù)
              map.get(value.areaId).get.put(value.car)
              accumulator +1L
            }
          }else{
            map.put(value.areaId,BloomFilter.create(Funnels.stringFunnel(),100000))
            map.get(value.areaId).get.put(value.car)
            accumulator +1
          }
        }

        override def getResult(accumulator: Long): Long = accumulator

        override def merge(a: Long, b: Long): Long = a+b
      },new WindowFunction[Long,String,String,TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[String]): Unit = {
          out.collect(s"起始時間段:${window.getStart} - ${window.getEnd},區(qū)域:${key},車輛數(shù):${input.last}")
        }
      }).print()
    env.execute()
  }

}

到了這里,關(guān)于Flink項目實戰(zhàn)篇 基于Flink的城市交通監(jiān)控平臺(下)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 基于Anylogic仿真系統(tǒng)的節(jié)假日城市軌道交通大客流優(yōu)化—以春熙路地鐵站為例

    基于Anylogic仿真系統(tǒng)的節(jié)假日城市軌道交通大客流優(yōu)化—以春熙路地鐵站為例

    目 錄 1緒論 1 1.1研究背景與意義 1 1.2國內(nèi)外研究現(xiàn)狀 1 1.2.1國外研究現(xiàn)狀 1 1.2.2國內(nèi)研究現(xiàn)狀 2 1.3研究內(nèi)容 2 2春熙路地鐵站交通特性分析 4 2.1車站布局分析 4 2.1.1春熙路站概況 4 2.1.2站廳層設(shè)施布局分析 4 2.1.3站臺層設(shè)施布局分析 5 2.2車站運營流程分析 6 2.2.1乘客進(jìn)出站流程 6

    2024年02月02日
    瀏覽(37)
  • 增強現(xiàn)實與交通:智能交通與未來城市

    隨著人口增長和城市發(fā)展,交通擁堵和環(huán)境污染已經(jīng)成為城市發(fā)展的主要問題之一。智能交通系統(tǒng)(ITS)是一種利用信息與通信技術(shù)來改善交通系統(tǒng)效率、安全和環(huán)境的方法。智能交通系統(tǒng)涉及到多個領(lǐng)域,包括交通信號燈控制、車輛定位、車輛通信、車輛安全系統(tǒng)等。增強

    2024年02月03日
    瀏覽(23)
  • 【人工智能】AI賦能城市交通 未來城市的驅(qū)動力

    【人工智能】AI賦能城市交通 未來城市的驅(qū)動力

    隨著城市化進(jìn)程的不斷加速,交通擁堵、環(huán)境污染等問題日益凸顯,人們對交通系統(tǒng)的效率和可持續(xù)性提出了更高的要求。在這樣的背景下,智能交通技術(shù)正成為改善城市交通的重要驅(qū)動力。本文將探討智能交通技術(shù)在解決城市交通挑戰(zhàn)方面的應(yīng)用和未來發(fā)展趨勢。 隨著城市

    2024年04月12日
    瀏覽(28)
  • “優(yōu)化”城市出行體驗——山海鯨智慧交通解決方案

    “優(yōu)化”城市出行體驗——山海鯨智慧交通解決方案

    隨著城市化進(jìn)程的不斷加速,城市交通問題也變得日益嚴(yán)重。為了改善城市交通體驗、提高出行效率以及減少交通擁堵和環(huán)境污染。 山海鯨可視化 打造城市智慧交通系列解決方案模板,解決方案以“ 數(shù)字 孿生 技術(shù) ”為核心,通過數(shù)據(jù)分析、人工智能和物聯(lián)網(wǎng)技術(shù)來優(yōu)化城

    2024年02月07日
    瀏覽(19)
  • 城市軌道交通供電系統(tǒng)研究(Matlab代碼實現(xiàn))

    城市軌道交通供電系統(tǒng)研究(Matlab代碼實現(xiàn))

    ???????? 歡迎來到本博客 ???????? ??博主優(yōu)勢: ?????? 博客內(nèi)容盡量做到思維縝密,邏輯清晰,為了方便讀者。 ?? 座右銘: 行百里者,半于九十。 ?????? 本文目錄如下: ?????? 目錄 ??1 概述 ??2 運行結(jié)果 ??3?參考文獻(xiàn) ??4 Matlab代碼實現(xiàn) 城市軌道

    2023年04月23日
    瀏覽(29)
  • 從車聯(lián)網(wǎng)到智慧城市:智慧交通的革新之路

    從車聯(lián)網(wǎng)到智慧城市:智慧交通的革新之路

    1、智慧城市的概念和發(fā)展背景 智慧城市(Smart City)是指以信息技術(shù)為基礎(chǔ),運用信息與通信等手段,對城市各個核心系統(tǒng)各項關(guān)鍵數(shù)據(jù)進(jìn)行感測、分析、整合和利用,實現(xiàn)對城市生活環(huán)境的感知、資源的調(diào)控,使城市內(nèi)部的各部分和諧搭配、運作便捷,從而實現(xiàn)物與物、物

    2024年01月17日
    瀏覽(17)
  • 城市安全守護(hù)者:分析無人機在交通領(lǐng)域的應(yīng)用

    城市安全守護(hù)者:分析無人機在交通領(lǐng)域的應(yīng)用

    隨著科技的進(jìn)步,無人機在交通領(lǐng)域的應(yīng)用不斷增加,為智慧交通管理提供了新便利。無人機憑借其靈活性,在違章取證、交通事故偵查、交通疏導(dǎo)等方面展現(xiàn)出巨大的應(yīng)用潛力。無人機在交通領(lǐng)域的應(yīng)用有哪些?跟著我們一探究竟。 1、違章取證與實時監(jiān)控 在傳統(tǒng)監(jiān)控?zé)o法

    2024年02月03日
    瀏覽(25)
  • AI智能助力EasyCVR城市綜合交通管理系統(tǒng)一體化

    AI智能助力EasyCVR城市綜合交通管理系統(tǒng)一體化

    隨著春節(jié)的臨近,春運工作也將進(jìn)入忙碌期。高速公路因為大批車輛的流動、惡劣天氣和自然災(zāi)害,極易發(fā)生交通事故導(dǎo)致道路癱瘓,影響春運安全和暢通。為最大限度地保障春運期間的道路安全和暢通,避免或減少道路事故發(fā)生,推動高速公路春運工作的平穩(wěn)運行,有效地

    2024年02月19日
    瀏覽(23)
  • S281 LoRa網(wǎng)關(guān)助力智慧城市建設(shè)的智能交通管理

    S281 LoRa網(wǎng)關(guān)助力智慧城市建設(shè)的智能交通管理

    S281 LoRa網(wǎng)關(guān)作為智慧城市建設(shè)中的重要組成部分,發(fā)揮著關(guān)鍵的作用,特別是在智能交通管理方面。通過連接各類傳感器設(shè)備和物聯(lián)網(wǎng)終端,S281 LoRa網(wǎng)關(guān)實現(xiàn)了對城市交通系統(tǒng)的遠(yuǎn)程監(jiān)控、智能調(diào)度和信息化管理,為城市交通管理部門提供了全新的解決方案,提高了交通運行

    2024年02月22日
    瀏覽(25)
  • 參展第六屆中國城市軌道交通智慧運維大會 | 圖撲軟件

    參展第六屆中國城市軌道交通智慧運維大會 | 圖撲軟件

    2022(第六屆)中國城市軌道交通智慧運維大會在西安順利舉行。此次大會由現(xiàn)代軌道交通網(wǎng)聯(lián)合中國機械工程學(xué)會設(shè)備智能運維分會主辦,西安市軌道交通集團(tuán)有限公司運營分公司、軌道交通工程信息化國家重點實驗室(中鐵一院)協(xié)辦。來自行業(yè)學(xué)會、地鐵運營單位、設(shè)計院

    2023年04月23日
    瀏覽(96)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包