国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

spark DStream從不同數(shù)據(jù)源采集數(shù)據(jù)(RDD 隊列、文件、diy 采集器、kafka)(scala 編程)

這篇具有很好參考價值的文章主要介紹了spark DStream從不同數(shù)據(jù)源采集數(shù)據(jù)(RDD 隊列、文件、diy 采集器、kafka)(scala 編程)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

目錄

1. RDD隊列

2 textFileStream

3 DIY采集器

4 kafka數(shù)據(jù)源【重點】


1. RDD隊列

? ? ? ?a、使用場景:測試
? ? ? ?b、實現(xiàn)方式: 通過ssc.queueStream(queueOfRDDs)創(chuàng)建DStream,每一個推送這個隊列的RDD,都會作為一個DStream處理

    val  sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")
    val ssc = new StreamingContext(sparkconf,Seconds(3))

    // 創(chuàng)建一個隊列對象,隊列中存放的是RDD
    val queue = new mutable.Queue[RDD[String]]()
    // 通過隊列創(chuàng)建DStream
    val queueDS: InputDStream[String] = ssc.queueStream(queue)

    queueDS.print()

    // 啟動采集器
    ssc.start()
       //這個操作之所以放在這個位置,是為了模擬流式的感覺,數(shù)據(jù)源源不斷的生產(chǎn)
       for(i <- 1 to 5 ){
          // 循環(huán)創(chuàng)建rdd
          val rdd: RDD[String] = ssc.sparkContext.makeRDD(List(i.toString))
          // 將RDD存放到隊列中
          queue.enqueue(rdd)
          // 當(dāng)前線程休眠1秒
          Thread.sleep(6000)         
       }
        // 等待采集器的結(jié)束
    ssc.awaitTermination()
    }

2 textFileStream

   val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("textFileStream")
    val ssc = new StreamingContext(sparkConf,Seconds(3))

    //從文件中讀取數(shù)據(jù)
    val textDS: DStream[String] = ssc.textFileStream("in")
    textDS.print()
  
    // 啟動采集器
    ssc.start()

    // 等待采集器的結(jié)束
    ssc.awaitTermination()

3 DIY采集器

? ? 1. 自定義采集器
? ? 2. 什么情況下需要自定采集器呢?
? ? ? ? ?比如從mysql、hbase中讀取數(shù)據(jù)。
? ? ? ? ?采集器的作用是從指定的地方,按照采集周期對數(shù)據(jù)進行采集。
? ? ? ? ?目前有:采集kafka、采集netcat工具的指定端口的數(shù)據(jù)、采集文件目錄中的數(shù)據(jù)等
? ? 3. 自定義采集器的步驟,模仿socketTextStream
? ? ? ? ?a、自定采集器類,繼承extends,并指定數(shù)據(jù)泛型,同時對父類的屬性賦值,指定數(shù)據(jù)存儲的級別
? ? ? ? ?b、重寫onStart和onStop方法
? ? ? ? ? ? onStart:采集器的如何啟動
? ? ? ? ? ? onStop:采集的如何停止

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DIY")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // 獲取采集的流
    val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReciver("localhost",9999))
    ds.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // 繼承extends Reciver,并指定數(shù)據(jù)泛型,同時對父類的屬性賦值,指定數(shù)據(jù)存儲的級別
  class MyReciver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

    private var socket: Socket = _
    def receive = {
     // 獲取輸入流
      val reader = new BufferedReader(
        new InputStreamReader(
          socket.getInputStream,
          "UTF-8"
        )
      )
      // 設(shè)定一個間接變量
      var s: String = null
      while (true) {
        // 按行讀取數(shù)據(jù)
        s = reader.readLine()
        if (s != null) {
      // 將數(shù)據(jù)進行封裝
          store(s)
        }
      }

    }

    // 1. 啟動采集器
    override def onStart(): Unit = {
      socket = new Socket(host, port)
      new Thread("Socket Receiver") {
        setDaemon(true)
        override def run() {
          receive
        }
      }.start()


    }

    // 2. 停止采集器
    override def onStop(): Unit = {
      socket.close()
      socket = null


    }
  }

4 kafka數(shù)據(jù)源【重點】

-- DirectAPI:是由計算的Executor來主動消費Kafka的數(shù)據(jù),速度由自身控制。
-- 配置信息基本上是固定寫法

 // TODO Spark環(huán)境
    // SparkStreaming使用核數(shù)最少是2個
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // TODO 使用SparkStreaming讀取Kafka的數(shù)據(jù)

    // Kafka的配置信息
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop105:9092,hadoop106:9092,hadoop107:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
      )
    // 獲取數(shù)據(jù),key是null,value是真實的數(shù)據(jù)
    val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
   
    valueDStream.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    // 等待采集器的結(jié)束
    ssc.awaitTermination()

?文章來源地址http://www.zghlxwxcb.cn/news/detail-723305.html

到了這里,關(guān)于spark DStream從不同數(shù)據(jù)源采集數(shù)據(jù)(RDD 隊列、文件、diy 采集器、kafka)(scala 編程)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Spark大數(shù)據(jù)處理講課筆記4.2 Spark SQL數(shù)據(jù)源 - 基本操作

    Spark大數(shù)據(jù)處理講課筆記4.2 Spark SQL數(shù)據(jù)源 - 基本操作

    ? 目錄 零、本講學(xué)習(xí)目標(biāo) 一、基本操作 二、默認(rèn)數(shù)據(jù)源 (一)默認(rèn)數(shù)據(jù)源Parquet (二)案例演示讀取Parquet文件 1、在Spark Shell中演示 2、通過Scala程序演示 三、手動指定數(shù)據(jù)源 (一)format()與option()方法概述 (二)案例演示讀取不同數(shù)據(jù)源 1、讀取房源csv文件 2、讀取json,保

    2024年02月09日
    瀏覽(26)
  • Spark SQL數(shù)據(jù)源的基本操作

    Spark SQL數(shù)據(jù)源的基本操作

    Spark SQL提供了兩個常用的加載數(shù)據(jù)和寫入數(shù)據(jù)的方法:load()方法和save()方法。load()方法可以加載外部數(shù)據(jù)源為一個DataFrame,save()方法可以將一個DataFrame寫入指定的數(shù)據(jù)源。 默認(rèn)情況下,load()方法和save()方法只支持Parquet格式的文件,Parquet文件是以二進制方式存儲數(shù)據(jù)的,因此

    2024年02月09日
    瀏覽(26)
  • 4.2 Spark SQL數(shù)據(jù)源 - 基本操作

    4.2 Spark SQL數(shù)據(jù)源 - 基本操作

    案例演示讀取Parquet文件 查看Spark的樣例數(shù)據(jù)文件users.parquet 1、在Spark Shell中演示 啟動Spark Shell 查看數(shù)據(jù)幀內(nèi)容 查看數(shù)據(jù)幀模式 對數(shù)據(jù)幀指定列進行查詢,查詢結(jié)果依然是數(shù)據(jù)幀,然后通過write成員的save()方法寫入HDFS指定目錄 查看HDFS上的輸出結(jié)果 執(zhí)行SQL查詢 查看HDFS上的輸

    2024年02月08日
    瀏覽(28)
  • spring boot下基于spring data jpa配置mysql+達夢多數(shù)據(jù)源(以不同包路徑方式,mysql為主數(shù)據(jù)源)

    :mysql 達夢/dameng jpa 多數(shù)據(jù)源 spring boot:2.1.17.RELEASE mysql驅(qū)動:8.0.21(跟隨boot版本) 達夢驅(qū)動:8.1.2.192 lombok:1.18.12(跟隨boot版本) 以mysql為主數(shù)據(jù)源,達夢為第二數(shù)據(jù)源方式配置 適用于舊項目二次開發(fā)接入達夢數(shù)據(jù)庫或基于通用二方/三方包做業(yè)務(wù)擴展等場景 將以不

    2024年02月05日
    瀏覽(59)
  • 一百八十二、大數(shù)據(jù)離線數(shù)倉完整流程——步驟一、用Kettle從Kafka、MySQL等數(shù)據(jù)源采集數(shù)據(jù)然后寫入HDFS

    一百八十二、大數(shù)據(jù)離線數(shù)倉完整流程——步驟一、用Kettle從Kafka、MySQL等數(shù)據(jù)源采集數(shù)據(jù)然后寫入HDFS

    經(jīng)過6個月的奮斗,項目的離線數(shù)倉部分終于可以上線了,因此整理一下離線數(shù)倉的整個流程,既是大家提供一個案例經(jīng)驗,也是對自己近半年的工作進行一個總結(jié)。 項目行業(yè)屬于交通行業(yè),因此數(shù)據(jù)具有很多交通行業(yè)的特征,比如轉(zhuǎn)向比數(shù)據(jù)就是統(tǒng)計車輛左轉(zhuǎn)、右轉(zhuǎn)、直行

    2024年02月07日
    瀏覽(20)
  • springboot整合多數(shù)據(jù)源的配置以及動態(tài)切換數(shù)據(jù)源,注解切換數(shù)據(jù)源

    springboot整合多數(shù)據(jù)源的配置以及動態(tài)切換數(shù)據(jù)源,注解切換數(shù)據(jù)源

    在許多應(yīng)用程序中,可能需要使用多個數(shù)據(jù)庫或數(shù)據(jù)源來處理不同的業(yè)務(wù)需求。Spring Boot提供了簡便的方式來配置和使用多數(shù)據(jù)源,使開發(fā)人員能夠輕松處理多個數(shù)據(jù)庫連接。如果你的項目中可能需要隨時切換數(shù)據(jù)源的話,那我這篇文章可能能幫助到你 ??:這里對于pom文件

    2024年02月10日
    瀏覽(33)
  • NamedParameterJdbcTemplate多數(shù)據(jù)源指定數(shù)據(jù)源

    實戰(zhàn)例子記錄 pom config NamedParameterJdbcTemplate(動態(tài)sql調(diào)用)

    2024年02月08日
    瀏覽(29)
  • 數(shù)據(jù)源作用以及spring配置數(shù)據(jù)源

    數(shù)據(jù)源,簡單理解為數(shù)據(jù)源頭,提供了應(yīng)用程序所需要數(shù)據(jù)的位置。數(shù)據(jù)源保證了應(yīng)用程序與目標(biāo)數(shù)據(jù)之間交互的規(guī)范和協(xié)議,它可以是數(shù)據(jù)庫,文件系統(tǒng)等等。其中數(shù)據(jù)源定義了位置信息,用戶驗證信息和交互時所需的一些特性的配置,同時它封裝了如何建立與數(shù)據(jù)源的連

    2024年02月07日
    瀏覽(27)
  • SpringBoot——動態(tài)數(shù)據(jù)源(多數(shù)據(jù)源自動切換)

    SpringBoot——動態(tài)數(shù)據(jù)源(多數(shù)據(jù)源自動切換)

    日常的業(yè)務(wù)開發(fā)項目中只會配置一套數(shù)據(jù)源,如果需要獲取其他系統(tǒng)的數(shù)據(jù)往往是通過調(diào)用接口, 或者是通過第三方工具比如kettle將數(shù)據(jù)同步到自己的數(shù)據(jù)庫中進行訪問。 但是也會有需要在項目中引用多數(shù)據(jù)源的場景。比如如下場景: 自研數(shù)據(jù)遷移系統(tǒng),至少需要新、老兩

    2024年02月16日
    瀏覽(18)
  • SpringBoot從數(shù)據(jù)庫讀取數(shù)據(jù)數(shù)據(jù)源配置信息,動態(tài)切換數(shù)據(jù)源

    SpringBoot從數(shù)據(jù)庫讀取數(shù)據(jù)數(shù)據(jù)源配置信息,動態(tài)切換數(shù)據(jù)源

    ? ? ? ? 首先準(zhǔn)備多個數(shù)據(jù)庫,主庫smiling-datasource,其它庫test1、test2、test3 ? ? ? ? 接下來,我們在主庫smiling-datasource中,創(chuàng)建表databasesource,用于存儲多數(shù)據(jù)源相關(guān)信息。表結(jié)構(gòu)設(shè)計如下 ? ? ? ? 創(chuàng)建好表之后,向表databasesource中存儲test1、test2、test3三個數(shù)據(jù)庫的相關(guān)配置

    2024年01月16日
    瀏覽(37)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包