国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

大數(shù)據(jù)職業(yè)技能大賽樣題(數(shù)據(jù)采集與實(shí)時計(jì)算:使用Flink處理Kafka中的數(shù)據(jù))

這篇具有很好參考價值的文章主要介紹了大數(shù)據(jù)職業(yè)技能大賽樣題(數(shù)據(jù)采集與實(shí)時計(jì)算:使用Flink處理Kafka中的數(shù)據(jù))。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

? ? ? ?編寫Scala代碼,使用Flink消費(fèi)Kafka中Topic為order的數(shù)據(jù)并進(jìn)行相應(yīng)的數(shù)據(jù)統(tǒng)計(jì)計(jì)算(訂單信息對應(yīng)表結(jié)構(gòu)order_info,訂單詳細(xì)信息對應(yīng)表結(jié)構(gòu)order_detail(來源類型和來源編號這兩個字段不考慮,所以在實(shí)時數(shù)據(jù)中不會出現(xiàn)),同時計(jì)算中使用order_info或order_detail表中create_time或operate_time取兩者中值較大者作為EventTime,若operate_time為空值或無此列,則使用create_time填充,允許數(shù)據(jù)延遲5s,訂單狀態(tài)分別為1001:創(chuàng)建訂單、1002:支付訂單、1003:取消訂單、1004:完成訂單、1005:申請退回、1006:退回完成。另外對于數(shù)據(jù)結(jié)果展示時,不要采用例如:1.9786518E7的科學(xué)計(jì)數(shù)法)。文章來源地址http://www.zghlxwxcb.cn/news/detail-842946.html

  1. 使用Flink消費(fèi)Kafka中的數(shù)據(jù),統(tǒng)計(jì)商城實(shí)時訂單實(shí)收金額(需要考慮訂單狀態(tài),若有取消訂單、申請退回、退回完成則不計(jì)入訂單實(shí)收金額,其他狀態(tài)的則累加),將key設(shè)置成totalprice存入Redis中。使用redis cli以get key方式獲取totalprice值,將結(jié)果截圖粘貼至客戶端桌面【Release\任務(wù)D提交結(jié)果.docx】中對應(yīng)的任務(wù)序號下,需兩次截圖,第一次截圖和第二次截圖間隔1分鐘以上,第一次截圖放前面,第二次截圖放后面;
  2. 在任務(wù)1進(jìn)行的同時,使用側(cè)邊流,監(jiān)控若發(fā)現(xiàn)order_status字段為退回完成, 將key設(shè)置成totalrefundordercount存入Redis中,value存放用戶退款消費(fèi)額。使用redis cli以get key方式獲取totalrefundordercount值,將結(jié)果截圖粘貼至客戶端桌面【Release\任務(wù)D提交結(jié)果.docx】中對應(yīng)的任務(wù)序號下,需兩次截圖,第一次截圖和第二次截圖間隔1分鐘以上,第一次截圖放前面,第二次截圖放后面;
  3. 在任務(wù)1進(jìn)行的同時,使用側(cè)邊流,監(jiān)控若發(fā)現(xiàn)order_status字段為取消訂單,將數(shù)據(jù)存入MySQL數(shù)據(jù)庫shtd_result的order_info表中,然后在Linux的MySQL命令行中根據(jù)id降序排序,查詢列id、consignee、consignee_tel、final_total_amount、feight_fee,查詢出前5條,將SQL語句復(fù)制粘貼至客戶端桌面【Release\任務(wù)D提交結(jié)果.docx】中對應(yīng)的任務(wù)序號下,將執(zhí)行結(jié)果截圖粘貼至客戶端桌面【Release\任務(wù)D提交結(jié)果.docx】中對應(yīng)的任務(wù)序號下。

使用Flink處理Kafka中的數(shù)據(jù)

package module_d

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.{DecimalFormat, SimpleDateFormat}
import java.time.Duration
import java.util.Properties

/**
 * 編寫Scala代碼,使用Flink消費(fèi)Kafka中Topic為order的數(shù)據(jù)并進(jìn)行相應(yīng)的數(shù)據(jù)統(tǒng)計(jì)計(jì)算(訂單信息對應(yīng)表結(jié)構(gòu)order_info,訂單詳細(xì)信息對應(yīng)表結(jié)構(gòu)order_detail(來源類型和來源編號這兩個字段不考慮,所以在實(shí)時數(shù)據(jù)中不會出現(xiàn)),同時計(jì)算中使用order_info或order_detail表中create_time或operate_time取兩者中值較大者作為EventTime,若operate_time為空值或無此屬性,則使用create_time填充,允許數(shù)據(jù)延遲5S,訂單狀態(tài)分別為1001:創(chuàng)建訂單、1002:支付訂單、1003:取消訂單、1004:完成訂單、1005:申請退回、1006:退回完成。另外對于數(shù)據(jù)結(jié)果展示時,不要采用例如:1.9786518E7的科學(xué)計(jì)數(shù)法)。
 */
object task1 {
  /**
   * 一個流分成四個流
   */
  lazy val statusother: OutputTag[String] = new OutputTag[String]("other")
  lazy val status1003: OutputTag[String] = new OutputTag[String]("s1003")
  lazy val status1005: OutputTag[String] = new OutputTag[String]("s1005")
  lazy val status1006: OutputTag[String] = new OutputTag[String]("s1006")

  def main(args: Array[String]): Unit = {
    /**
     * 1、使用Flink消費(fèi)Kafka中的數(shù)據(jù),統(tǒng)計(jì)商城實(shí)時訂單實(shí)收金額(需要考慮訂單狀態(tài),若有取消訂單、申請退回、退回完成則不計(jì)入訂單實(shí)收金額,其他狀態(tài)的則累加),將key設(shè)置成totalprice存入Redis中。使用redis cli以get key方式獲取totalprice值,將結(jié)果截圖粘貼至對應(yīng)報告中,需兩次截圖,第一次截圖和第二次截圖間隔1分鐘以上,第一次截圖放前面,第二次截圖放后面;
     */
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) //并行度


    //Kafka配置
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "ngc:9092") //集群地址
    properties.setProperty("group.id", "g1") //消費(fèi)者組

    //原始流
    val stream = env.addSource(new FlinkKafkaConsumer[String]("order1", new SimpleStringSchema(), properties).setStartFromLatest())
      .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5))//允許數(shù)據(jù)延遲5S
        .withTimestampAssigner(
          new SerializableTimestampAssigner[String] {
            override def extractTimestamp(t: String, l: Long): Long = {
              val sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss")
              if (t.split(",")(11).equals("")) { //如果operate_time為空
                sdf.parse(t.split(",")(10)).getTime
              } else {
                val create_time = sdf.parse(t.split(",")(10)).getTime
                val operate_time = sdf.parse(t.split(",")(11)).getTime
                math.max(create_time, operate_time)
              }
            }
          }
        ))
    //設(shè)置自定義側(cè)邊流
    val streamProcess = stream.process(new MdSplitProcessFunction)
    /**
     * 1、使用Flink消費(fèi)Kafka中的數(shù)據(jù),統(tǒng)計(jì)商城實(shí)時訂單實(shí)收金額(需要考慮訂單狀態(tài),若有取消訂單、申請退回、
     * 退回完成則不計(jì)入訂單實(shí)收金額,其他狀態(tài)的則累加),將key設(shè)置成totalprice存入Redis中。使用redis
     * cli以get key方式獲取totalprice值,將結(jié)果截圖粘貼至對應(yīng)報告中,需兩次截圖,第一次截圖和第二次截圖
     * 間隔1分鐘以上,第一次截圖放前面,第二次截圖放后面;
     */
    val ds1 = streamProcess
      .getSideOutput(statusother)
      .map(line => line.split(",")(3).toDouble)
      .keyBy(_ => true) //聚合到一起
      .sum(0)
      .map(n=>new DecimalFormat("#.#").format(n))
    //redis配置
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("ngc")
      .setPort(6378)
      .setPassword("123456")
      .build()
    ds1.addSink(new RedisSink[String](conf, new MyRedisMapper("totalcount")))
    /**
     * 2、在任務(wù)1進(jìn)行的同時,使用側(cè)邊流,監(jiān)控若發(fā)現(xiàn)order_status字段為退回完成, 將key設(shè)置成totalrefundordercount存入Redis中,value存放用戶退款消費(fèi)額。使用redis cli以get key方式獲取totalrefundordercount值,將結(jié)果截圖粘貼至對應(yīng)報告中,需兩次截圖,第一次截圖和第二次截圖間隔1分鐘以上,第一次截圖放前面,第二次截圖放后面;
     */
    val ds2 = streamProcess
      .getSideOutput(status1006)
      .map(line => line.split(",")(3).toDouble)
      .keyBy(_ => true) //聚合到一起
      .sum(0)
      .map(n=>new DecimalFormat("#.#").format(n))
    ds2.addSink(new RedisSink[String](conf, new MyRedisMapper("totalrefundordercount")))

    /**
     * 3、在任務(wù)1進(jìn)行的同時,使用側(cè)邊流,監(jiān)控若發(fā)現(xiàn)order_status字段為取消訂單,將數(shù)據(jù)存入MySQL數(shù)據(jù)庫shtd_result的order_info表中,然后在Linux的MySQL命令行中根據(jù)id降序排序,查詢列id、consignee、consignee_tel、final_total_amount、feight_fee,查詢出前5條,將SQL語句與執(zhí)行結(jié)果截圖粘貼至對應(yīng)報告中。
     */
    val ds3 = streamProcess
      .getSideOutput(status1003)
 
    ds3.addSink(new RichSinkFunction[String] {
      var conn: Connection = _
      var insertStmt: PreparedStatement = _

      override def open(parameters: Configuration): Unit =  {
        conn = DriverManager.getConnection("jdbc:mysql://ngc:3307/shtd_result?useSSL=false", "root", "123456")
        insertStmt = conn.prepareStatement("insert into order_info (id,consignee,consignee_tel,final_total_amount,feight_fee) values (?,?,?,?,?)")
      }

      override def close(): Unit = {
        insertStmt.close()
        conn.close()
      }

      override def invoke(value: String, context: SinkFunction.Context): Unit = {
        val arr = value.split(",")
        insertStmt.setString(1, arr(0))
        insertStmt.setString(2, arr(1))
        insertStmt.setString(3, arr(2))
        insertStmt.setString(4, arr(3))
        insertStmt.setString(5, arr(19))
        insertStmt.execute()
      }
    })

    ds1.print()
    ds2.print()
    env.execute("kafka sink test")
  }


  /**
   * 自定義側(cè)邊流配置
   */
  class MdSplitProcessFunction extends ProcessFunction[String, String] {
    override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
      val line = value.split(",")

      /**
       * 訂單狀態(tài)order_status分別為1001:創(chuàng)建訂單、1002:支付訂單、1003:取消訂單、1004:完成訂單、1005:申請退回、1006:退回完成。
       */
      if (line(4).equals("1003")) {
        ctx.output(status1003, value)
      } else if (line(4).equals("1005")) {
        ctx.output(status1005, value)
      } else if (line(4).equals("1006")) {
        ctx.output(status1006, value)
      } else {
        ctx.output(statusother, value)
      }

    }


  }

  /**
   * Redis key——value存儲 也可用RichSinkFunction建立Redis
   */
  class MyRedisMapper(key: String) extends RedisMapper[String] {

    override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.SET)

    override def getValueFromData(data: String): String = data

    override def getKeyFromData(data: String): String = key
  }


}

到了這里,關(guān)于大數(shù)據(jù)職業(yè)技能大賽樣題(數(shù)據(jù)采集與實(shí)時計(jì)算:使用Flink處理Kafka中的數(shù)據(jù))的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 河北省2023年職業(yè)院校(中職組)技能大賽“網(wǎng)絡(luò)搭建與應(yīng)用”賽項(xiàng)競賽樣題

    河北省2023年職業(yè)院校(中職組)技能大賽“網(wǎng)絡(luò)搭建與應(yīng)用”賽項(xiàng)競賽樣題

    2023 年河北省職業(yè)院校技能大賽 “網(wǎng)絡(luò)搭建與應(yīng)用”賽項(xiàng) 競賽(樣題) “網(wǎng)絡(luò)搭建與應(yīng)用”賽項(xiàng)競賽共分三個部分,其中: 第一部分:網(wǎng)絡(luò)搭建及安全部署項(xiàng)目(500 分) 第二部分:服務(wù)器配置及應(yīng)用項(xiàng)目(480 分)第三部分:職業(yè)規(guī)范與素養(yǎng)(20 分) 1. 禁止攜帶和使用移動存

    2024年02月07日
    瀏覽(19)
  • 全國職業(yè)院校技能大賽-大數(shù)據(jù) 離線數(shù)據(jù)處理模塊-數(shù)據(jù)清洗

    子任務(wù)2:數(shù)據(jù)清洗 ????????編寫Hive SQL代碼,將ods庫中相應(yīng)表數(shù)據(jù)全量抽取到Hive的dwd庫中對應(yīng)表中。表中有涉及到timestamp類型的,均要求按照yyyy-MM-dd HH:mm:ss,不記錄毫秒數(shù),若原數(shù)據(jù)中只有年月日,則在時分秒的位置添加00:00:00,添加之后使其符合yyyy-MM-dd HH:mm:ss。 抽取

    2024年02月02日
    瀏覽(28)
  • 全國職業(yè)院校技能大賽-大數(shù)據(jù) 離線數(shù)據(jù)處理模塊-指標(biāo)計(jì)算

    賽題來源2023年全國職業(yè)院校技能大賽賽題第1套任務(wù)B中指標(biāo)計(jì)算模塊 編寫Scala代碼,使用Spark計(jì)算相關(guān)指標(biāo)。 注:在指標(biāo)計(jì)算中,不考慮訂單信息表中order_status字段的值,將所有訂單視為有效訂單。計(jì)算訂單金額或訂單總金額時只使用final_total_amount字段。需注意dwd所有的維表

    2024年02月01日
    瀏覽(22)
  • 2023年江西省職業(yè)院校技能競賽“網(wǎng)絡(luò)安全”賽項(xiàng)樣題

    2023年江西省職業(yè)院校技能競賽“網(wǎng)絡(luò)安全”賽項(xiàng)樣題

    二、競賽注意事項(xiàng) 1.競賽期間禁止攜帶和使用移動存儲設(shè)備、計(jì)算器、通信工具及 參考資料。 2.請根據(jù)大賽所提供的競賽環(huán)境,檢查所列的硬件設(shè)備、軟件清 單、材料清單是否齊全,計(jì)算機(jī)設(shè)備是否能正常使用。 3.在進(jìn)行任何操作之前,請閱讀每個部分的所有任務(wù)。各任務(wù)

    2024年02月05日
    瀏覽(19)
  • 2023年全國職業(yè)院校技能大賽-大數(shù)據(jù)應(yīng)用開發(fā)-數(shù)據(jù)可視化

    2023年全國職業(yè)院校技能大賽-大數(shù)據(jù)應(yīng)用開發(fā)-數(shù)據(jù)可視化

    ? ? ? ? 可視化題目與以往相同,做法類似,我這里展示得到語句后處理優(yōu)化以后的代碼,以函數(shù)式來寫可視化,比以前400-500多行代碼簡潔到100多行。其他題目見本欄目,那里面的代碼都是沒有優(yōu)化后的,這次主要以效率和精簡給大家提供更多的思路。 ????????我們得到

    2024年02月04日
    瀏覽(30)
  • 云計(jì)算職業(yè)技能大賽組件介紹(一)

    云計(jì)算職業(yè)技能大賽組件介紹(一)

    上文我們準(zhǔn)備好了一個實(shí)驗(yàn)平臺,我們了解了該如何搭建開源平臺open stack,在此基礎(chǔ)上,我們該理論的,系統(tǒng)的,詳細(xì)的了解一下open stack的各個組件的作用和原理。 官方的解釋是:OpenStack是一個云操作系統(tǒng),通過數(shù)據(jù)中心可控制大型的計(jì)算、存儲、網(wǎng)絡(luò)等資源池。所有的管

    2024年02月21日
    瀏覽(19)
  • 云計(jì)算職業(yè)技能大賽私有云搭建部分

    需要的可聯(lián)系,可提供相關(guān)的軟件包和平臺供測試 數(shù)據(jù)庫安裝與調(diào)優(yōu) 在 controller 節(jié)點(diǎn)上使用 iaas-install-mysql.sh 腳本安裝 Mariadb 、 Memcached 、 RabbitMQ 等服務(wù)。安裝服務(wù)完畢后, 修改 /etc/my.cnf 文件,完成下列要求: 1.設(shè)置數(shù)據(jù)庫支持大小寫; 2.設(shè)置數(shù)據(jù)庫緩存 innodb 表的索引,數(shù)

    2024年04月27日
    瀏覽(20)
  • 23年云計(jì)算全國職業(yè)技能大賽-私有云

    1.1.1 基礎(chǔ)環(huán)境配置[0.2 分] 1.控制節(jié)點(diǎn)主機(jī)名為 controller,設(shè)置計(jì)算節(jié)點(diǎn)主機(jī)名為 compute; 2.hosts 文件將 IP 地址映射為主機(jī)名。 使用提供的用戶名密碼,登錄提供的 OpenStack 私有云平臺,在當(dāng)前租戶下, 使用 CentOS7.9鏡像,創(chuàng)建兩臺云主機(jī),云主機(jī)類型使用 4vCPU/12G/100G_50G 類型。

    2024年02月08日
    瀏覽(22)
  • 【全國職業(yè)院校技能大賽云計(jì)算賽項(xiàng)】

    【全國職業(yè)院校技能大賽云計(jì)算賽項(xiàng)】

    題目: skywalking 服務(wù)部署與應(yīng)用:?使用提供的 OpenStack 私有云平臺,申請一臺 centos7.9 系統(tǒng)的云主機(jī),使用提供的軟 件包安裝 Elasticsearch 服務(wù)和 skywalking 服務(wù),將 skywalking 的 UI 訪問端口修改為 8888。 接下來再申請一臺CentOS7.9的云主機(jī),用于搭建gpmall商城應(yīng)用,并配置SkyWalk

    2024年01月20日
    瀏覽(29)
  • 23云計(jì)算全國職業(yè)技能大賽容器云-容器編排

    編寫 Dockerfile 文件構(gòu)建 mysql 鏡像,要求基于 centos 完成 MariaDB 數(shù)據(jù)庫的安裝和配置,并設(shè)置服務(wù)開機(jī)自啟。編寫 Dockerfile 構(gòu)建鏡像 erp-mysql:v1.0,要求使用 centos7.9.2009 鏡像作為基鏡像,完成 MariaDB 數(shù)據(jù)庫的安裝,設(shè)置 root 用戶的密碼為 tshoperp,新建數(shù)據(jù)庫 jsh_erp 并導(dǎo)入數(shù)據(jù)庫文

    2024年02月08日
    瀏覽(46)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包