Spark Streaming概述

什么是Spark Streaming
Spark Streaming類似于Apache Storm,用于流式數(shù)據(jù)的處理。根據(jù)其官方文檔介紹,Spark Streaming有高吞吐量和容錯能力強等特點。Spark Streaming支持的數(shù)據(jù)輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。數(shù)據(jù)輸入后可以用Spark的高度抽象原語如:map、reduce、join、window等進行運算。而結(jié)果也能保存在很多地方,如HDFS,數(shù)據(jù)庫等。另外Spark Streaming也能和MLlib(機器學(xué)習(xí))以及Graphx完美融合。

和Spark基于RDD的概念很相似,Spark Streaming使用離散化流(discretized stream)作為抽象表示,叫作DStream。DStream 是隨時間推移而收到的數(shù)據(jù)的序列。在內(nèi)部,每個時間區(qū)間收到的數(shù)據(jù)都作為 RDD 存在,而 DStream 是由這些 RDD 所組成的序列(因此 得名“離散化”)。

DStream 可以從各種輸入源創(chuàng)建,比如 Flume、Kafka 或者 HDFS。創(chuàng)建出來的DStream 支持兩種操作,一種是轉(zhuǎn)化操作(transformation),會生成一個新的DStream,另一種是輸出操作(output operation),可以把數(shù)據(jù)寫入外部系統(tǒng)中。DStream 提供了許多與 RDD 所支持的操作相類似的操作支持,還增加了與時間相關(guān)的新操作,比如滑動窗口。
為什么要學(xué)習(xí)Spark Streaming
易用

容錯

易整合到Spark體系

Spark與Storm的對比
Spark |
Storm |
![]() |
![]() |
開發(fā)語言:Scala |
開發(fā)語言:Clojure |
編程模型:DStream |
編程模型:Spout/Bolt |
![]() |
![]() |
運行Spark Streaming
IDEA編寫程序
Pom.xml 加入以下依賴:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
package com.atguigu.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object WorldCount { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) // Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("master01", 9999) // Split each line into words val words = lines.flatMap(_.split(" ")) //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate } } |
按照Spark Core中的方式進行打包,并將程序上傳到Spark機器上。并運行:
bin/spark-submit --class com.atguigu.streaming.WorldCount ~/wordcount-jar-with-dependencies.jar |
通過Netcat發(fā)送數(shù)據(jù):
# TERMINAL 1: # Running Netcat $ nc -lk 9999 hello world |
如果程序運行時,log日志太多,可以將spark conf目錄下的log4j文件里面的日志級別改成WARN。
架構(gòu)與抽象
Spark Streaming使用“微批次”的架構(gòu),把流式計算當(dāng)作一系列連續(xù)的小規(guī)模批處理來對待。Spark Streaming從各種輸入源中讀取數(shù)據(jù),并把數(shù)據(jù)分組為小的批次。新的批次按均勻的時間間隔創(chuàng)建出來。在每個時間區(qū)間開始的時候,一個新的批次就創(chuàng)建出來,在該區(qū)間內(nèi)收到的數(shù)據(jù)都會被添加到這個批次中。在時間區(qū)間結(jié)束時,批次停止增長。時間區(qū)間的大小是由批次間隔這個參數(shù)決定的。批次間隔一般設(shè)在500毫秒到幾秒之間,由應(yīng)用開發(fā)者配置。每個輸入批次都形成一個RDD,以 Spark 作業(yè)的方式處理并生成其他的 RDD。 處理的結(jié)果可以以批處理的方式傳給外部系統(tǒng)。高層次的架構(gòu)如圖

Spark Streaming的編程抽象是離散化流,也就是DStream。它是一個 RDD 序列,每個RDD代表數(shù)據(jù)流中一個時間片內(nèi)的數(shù)據(jù)。

Spark Streaming在Spark的驅(qū)動器程序—工作節(jié)點的結(jié)構(gòu)的執(zhí)行過程如下圖所示。Spark Streaming為每個輸入源啟動對 應(yīng)的接收器。接收器以任務(wù)的形式運行在應(yīng)用的執(zhí)行器進程中,從輸入源收集數(shù)據(jù)并保存為 RDD。它們收集到輸入數(shù)據(jù)后會把數(shù)據(jù)復(fù)制到另一個執(zhí)行器進程來保障容錯性(默 認(rèn)行為)。數(shù)據(jù)保存在執(zhí)行器進程的內(nèi)存中,和緩存 RDD 的方式一樣。驅(qū)動器程序中的 StreamingContext 會周期性地運行 Spark 作業(yè)來處理這些數(shù)據(jù),把數(shù)據(jù)與之前時間區(qū)間中的 RDD 進行整合。

Spark Streaming解析
初始化StreamingContext
import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1)) // 可以通過ssc.sparkContext 來訪問SparkContext // 或者通過已經(jīng)存在的SparkContext來創(chuàng)建StreamingContext import org.apache.spark.streaming._ val sc = ... // existing SparkContext val ssc = new StreamingContext(sc, Seconds(1)) |
初始化完Context之后:
定義消息輸入源來創(chuàng)建DStreams.
定義DStreams的轉(zhuǎn)化操作和輸出操作。
通過 streamingContext.start()來啟動消息采集和處理.
等待程序終止,可以通過streamingContext.awaitTermination()來設(shè)置
通過streamingContext.stop()來手動終止處理程序。
StreamingContext和SparkContext什么關(guān)系?
import org.apache.spark.streaming._ val sc = ... // existing SparkContext val ssc = new StreamingContext(sc, Seconds(1)) |
注意:
StreamingContext一旦啟動,對DStreams的操作就不能修改了。
在同一時間一個JVM中只有一個StreamingContext可以啟動
stop() 方法將同時停止SparkContext,可以傳入?yún)?shù)stopSparkContext用于只停止StreamingContext
在Spark1.4版本后,如何優(yōu)雅的停止SparkStreaming而不丟失數(shù)據(jù),通過設(shè)置sparkConf.set("spark.streaming.stopGracefullyOnShutdown","true") 即可。在StreamingContext的start方法中已經(jīng)注冊了Hook方法。
什么是DStreams
Discretized Stream是Spark Streaming的基礎(chǔ)抽象,代表持續(xù)性的數(shù)據(jù)流和經(jīng)過各種Spark原語操作后的結(jié)果數(shù)據(jù)流。在內(nèi)部實現(xiàn)上,DStream是一系列連續(xù)的RDD來表示。每個RDD含有一段時間間隔內(nèi)的數(shù)據(jù),如下圖:

對數(shù)據(jù)的操作也是按照RDD為單位來進行的

計算過程由Spark engine來完成

DStreams輸入
Spark Streaming原生支持一些不同的數(shù)據(jù)源。一些“核心”數(shù)據(jù)源已經(jīng)被打包到Spark Streaming 的 Maven 工件中,而其他的一些則可以通過 spark-streaming-kafka 等附加工件獲取。每個接收器都以 Spark 執(zhí)行器程序中一個長期運行的任務(wù)的形式運行,因此會占據(jù)分配給應(yīng)用的 CPU 核心。此外,我們還需要有可用的 CPU 核心來處理數(shù)據(jù)。這意味著如果要運行多個接收器,就必須至少有和接收器數(shù)目相同的核心數(shù),還要加上用來完成計算所需要的核心數(shù)。例如,如果我們想要在流計算應(yīng)用中運行 10 個接收器,那么至少需要為應(yīng)用分配 11 個 CPU 核心。所以如果在本地模式運行,不要使用local或者local[1]。
基本數(shù)據(jù)源
文件數(shù)據(jù)源
Socket數(shù)據(jù)流前面的例子已經(jīng)看到過。
文件數(shù)據(jù)流:能夠讀取所有HDFS API兼容的文件系統(tǒng)文件,通過fileStream方法進行讀取
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory) |
Spark Streaming 將會監(jiān)控 dataDirectory 目錄并不斷處理移動進來的文件,記住目前不支持嵌套目錄。
文件需要有相同的數(shù)據(jù)格式
文件進入 dataDirectory的方式需要通過移動或者重命名來實現(xiàn)。
一旦文件移動進目錄,則不能再修改,即便修改了也不會讀取新數(shù)據(jù)。
如果文件比較簡單,則可以使用 streamingContext.textFileStream(dataDirectory)方法來讀取文件。文件流不需要接收器,不需要單獨分配CPU核。
Hdfs讀取實例:
提前需要在HDFS上建好目錄。
scala> import org.apache.spark.streaming._ import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(1)) ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@4027edeb
scala> val lines = ssc.textFileStream("hdfs://master01:9000/data/") lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@61d9dd15
scala> val words = lines.flatMap(_.split(" ")) words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@1e084a26
scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@8947a4b
scala> wordCounts.print()
scala> ssc.start() |
上傳文件上去:
[bigdata@master01 hadoop-2.7.3]$ ls bin data etc include lib libexec LICENSE.txt logs NOTICE.txt README.txt sbin sdata share [bigdata@master01 hadoop-2.7.3]$ bin/hdfs dfs -put ./LICENSE.txt /data/ [bigdata@master01 hadoop-2.7.3]$ bin/hdfs dfs -put ./README.txt /data/ |
獲取計算結(jié)果:
------------------------------------------- Time: 1504665716000 ms ------------------------------------------- ------------------------------------------- Time: 1504665717000 ms ------------------------------------------- ------------------------------------------- Time: 1504665718000 ms ------------------------------------------- (227.7202-1,2) (created,2) (offer,8) (BUSINESS,11) (agree,10) (hereunder,,1) (“control”,1) (Grant,2) (2.2.,2) (include,11) ... ------------------------------------------- Time: 1504665719000 ms ------------------------------------------- Time: 1504665739000 ms ------------------------------------------- ------------------------------------------- Time: 1504665740000 ms ------------------------------------------- (under,1) (Technology,1) (distribution,2) (http://hadoop.apache.org/core/,1) (Unrestricted,1) (740.13),1) (check,1) (have,1) (policies,1) (uses,1) ... ------------------------------------------- Time: 1504665741000 ms ------------------------------------------- |
自定義數(shù)據(jù)源
通過繼承Receiver,并實現(xiàn)onStart、onStop方法來自定義數(shù)據(jù)源采集。
class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { def onStart() { // Start the thread that receives data over a connection new Thread("Socket Receiver") { override def run() { receive() } }.start() } def onStop() { // There is nothing much to do as the thread calling receive() // is designed to stop by itself if isStopped() returns false } /** Create a socket connection and receive data until receiver is stopped */ private def receive() { var socket: Socket = null var userInput: String = null try { // Connect to host:port socket = new Socket(host, port) // Until stopped or connection broken continue reading val reader = new BufferedReader( new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) userInput = reader.readLine() while(!isStopped && userInput != null) { store(userInput) userInput = reader.readLine() } reader.close() socket.close() // Restart in an attempt to connect again when server is active again restart("Trying to connect again") } catch { case e: java.net.ConnectException => // restart if could not connect to server restart("Error connecting to " + host + ":" + port, e) case t: Throwable => // restart if there is any other error restart("Error receiving data", t) } } } |
可以通過streamingContext.receiverStream(<instance of custom receiver>)
來使用自定義的數(shù)據(jù)采集源
// Assuming ssc is the StreamingContext val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port)) val words = lines.flatMap(_.split(" ")) ... |
模擬Spark內(nèi)置的Socket鏈接:
package com.atguigu.streaming import java.io.{BufferedReader, InputStreamReader} import java.net.Socket import java.nio.charset.StandardCharsets import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.receiver.Receiver class CustomReceiver (host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) { override def onStart(): Unit = { // Start the thread that receives data over a connection new Thread("Socket Receiver") { override def run() { receive() } }.start() } override def onStop(): Unit = { // There is nothing much to do as the thread calling receive() // is designed to stop by itself if isStopped() returns false } /** Create a socket connection and receive data until receiver is stopped */ private def receive() { var socket: Socket = null var userInput: String = null try { // Connect to host:port socket = new Socket(host, port) // Until stopped or connection broken continue reading val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) userInput = reader.readLine() while(!isStopped && userInput != null) { // 傳送出來 store(userInput) userInput = reader.readLine() } reader.close() socket.close() // Restart in an attempt to connect again when server is active again restart("Trying to connect again") } catch { case e: java.net.ConnectException => // restart if could not connect to server restart("Error connecting to " + host + ":" + port, e) case t: Throwable => // restart if there is any other error restart("Error receiving data", t) } } } object CustomReceiver { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) // Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.receiverStream(new CustomReceiver("master01", 9999)) // Split each line into words val words = lines.flatMap(_.split(" ")) //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate //ssc.stop() } } |


RDD隊列
測試過程中,可以通過使用streamingContext.queueStream(queueOfRDDs)來創(chuàng)建DStream,每一個推送到這個隊列中的RDD,都會作為一個DStream處理。
package com.atguigu.streaming import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable object QueueRdd { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local[2]").setAppName("QueueRdd") val ssc = new StreamingContext(conf, Seconds(1)) // Create the queue through which RDDs can be pushed to // a QueueInputDStream //創(chuàng)建RDD隊列 val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]() // Create the QueueInputDStream and use it do some processing // 創(chuàng)建QueueInputDStream val inputStream = ssc.queueStream(rddQueue) //處理隊列中的RDD數(shù)據(jù) val mappedStream = inputStream.map(x => (x % 10, 1)) val reducedStream = mappedStream.reduceByKey(_ + _) //打印結(jié)果 reducedStream.print() //啟動計算 ssc.start() // Create and push some RDDs into for (i <- 1 to 30) { rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10) Thread.sleep(2000) //通過程序停止StreamingContext的運行 //ssc.stop() } } } |
[bigdata@master01 spark-2.1.1-bin-hadoop2.7]$ bin/spark-submit --class com.atguigu.streaming.QueueRdd ~/queueRdd-jar-with-dependencies.jar 17/09/05 23:28:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable ------------------------------------------- Time: 1504668485000 ms ------------------------------------------- (4,30) (0,30) (6,30) (8,30) (2,30) (1,30) (3,30) (7,30) (9,30) (5,30)
------------------------------------------- Time: 1504668486000 ms -------------------------------------------
------------------------------------------- Time: 1504668487000 ms ------------------------------------------- (4,30) (0,30) (6,30) (8,30) (2,30) (1,30) (3,30) (7,30) (9,30) (5,30) |
高級數(shù)據(jù)源
除核心數(shù)據(jù)源外,還可以用附加數(shù)據(jù)源接收器來從一些知名數(shù)據(jù)獲取系統(tǒng)中接收的數(shù)據(jù),這些接收器都作為Spark Streaming的組件進行獨立打包了。它們?nèi)匀皇荢park的一部分,不過你需要在構(gòu)建文件中添加額外的包才能使用它們?,F(xiàn)有的接收器包括 Twitter、Apache Kafka、Amazon Kinesis、Apache Flume,以及ZeroMQ??梢酝ㄟ^添加與Spark版本匹配 的 Maven 工件 spark-streaming-[projectname]_2.10 來引入這些附加接收器。
Apache Kafka
在工程中需要引入 Maven 工件 spark- streaming-kafka_2.10 來使用它。包內(nèi)提供的 KafkaUtils 對象可以在 StreamingContext 和 JavaStreamingContext 中以你的 Kafka 消息創(chuàng)建出 DStream。由于 KafkaUtils 可以訂閱多個主題,因此它創(chuàng)建出的 DStream 由成對的主題和消息組成。要創(chuàng)建出一個流數(shù)據(jù),需 要使用 StreamingContext 實例、一個由逗號隔開的 ZooKeeper 主機列表字符串、消費者組的名字(唯一名字),以及一個從主題到針對這個主題的接收器線程數(shù)的映射表來調(diào)用 createStream() 方法
import org.apache.spark.streaming.kafka._...// 創(chuàng)建一個從主題到接收器線程數(shù)的映射表 val topics = List(("pandas", 1), ("logs", 1)).toMap val topicLines = KafkaUtils.createStream(ssc, zkQuorum, group, topics) topicLines.map(_._2) |
下面我們進行一個實例,演示SparkStreaming如何從Kafka讀取消息,如果通過連接池方法把消息處理完成后再寫會Kafka:

kafka Connection Pool程序:
package com.atguigu.streaming import java.util.Properties import org.apache.commons.pool2.impl.DefaultPooledObject import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} case class KafkaProducerProxy(brokerList: String, producerConfig: Properties = new Properties, defaultTopic: Option[String] = None, producer: Option[KafkaProducer[String, String]] = None) { type Key = String type Val = String require(brokerList == null || !brokerList.isEmpty, "Must set broker list") private val p = producer getOrElse { var props:Properties= new Properties(); props.put("bootstrap.servers", brokerList); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); new KafkaProducer[String,String](props) } private def toMessage(value: Val, key: Option[Key] = None, topic: Option[String] = None): ProducerRecord[Key, Val] = { val t = topic.getOrElse(defaultTopic.getOrElse(throw new IllegalArgumentException("Must provide topic or default topic"))) require(!t.isEmpty, "Topic must not be empty") key match { case Some(k) => new ProducerRecord(t, k, value) case _ => new ProducerRecord(t, value) } } def send(key: Key, value: Val, topic: Option[String] = None) { p.send(toMessage(value, Option(key), topic)) } def send(value: Val, topic: Option[String]) { send(null, value, topic) } def send(value: Val, topic: String) { send(null, value, Option(topic)) } def send(value: Val) { send(null, value, None) } def shutdown(): Unit = p.close() } abstract class KafkaProducerFactory(brokerList: String, config: Properties, topic: Option[String] = None) extends Serializable { def newInstance(): KafkaProducerProxy } class BaseKafkaProducerFactory(brokerList: String, config: Properties = new Properties, defaultTopic: Option[String] = None) extends KafkaProducerFactory(brokerList, config, defaultTopic) { override def newInstance() = new KafkaProducerProxy(brokerList, config, defaultTopic) } class PooledKafkaProducerAppFactory(val factory: KafkaProducerFactory) extends BasePooledObjectFactory[KafkaProducerProxy] with Serializable { override def create(): KafkaProducerProxy = factory.newInstance() override def wrap(obj: KafkaProducerProxy): PooledObject[KafkaProducerProxy] = new DefaultPooledObject(obj) override def destroyObject(p: PooledObject[KafkaProducerProxy]): Unit = { p.getObject.shutdown() super.destroyObject(p) } } |
KafkaStreaming main:
package com.atguigu.streaming import org.apache.commons.pool2.impl.{GenericObjectPool, GenericObjectPoolConfig} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.api.java.function.VoidFunction import org.apache.spark.rdd.RDD import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} object createKafkaProducerPool{ def apply(brokerList: String, topic: String): GenericObjectPool[KafkaProducerProxy] = { val producerFactory = new BaseKafkaProducerFactory(brokerList, defaultTopic = Option(topic)) val pooledProducerFactory = new PooledKafkaProducerAppFactory(producerFactory) val poolConfig = { val c = new GenericObjectPoolConfig val maxNumProducers = 10 c.setMaxTotal(maxNumProducers) c.setMaxIdle(maxNumProducers) c } new GenericObjectPool[KafkaProducerProxy](pooledProducerFactory, poolConfig) } } object KafkaStreaming{ def main(args: Array[String]) { val conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) //創(chuàng)建topic val brobrokers = "172.16.148.150:9092,172.16.148.151:9092,172.16.148.152:9092" val sourcetopic="source"; val targettopic="target"; //創(chuàng)建消費者組 var group="con-consumer-group" //消費者配置 val kafkaParam = Map( "bootstrap.servers" -> brobrokers,//用于初始化鏈接到集群的地址 "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], //用于標(biāo)識這個消費者屬于哪個消費團體 "group.id" -> group, //如果沒有初始化偏移量或者當(dāng)前的偏移量不存在任何服務(wù)器上,可以使用這個配置屬性 //可以使用這個配置,latest自動重置偏移量為最新的偏移量 "auto.offset.reset" -> "latest", //如果是true,則這個消費者的偏移量會在后臺自動提交 "enable.auto.commit" -> (false: java.lang.Boolean) ); //ssc.sparkContext.broadcast(pool) //創(chuàng)建DStream,返回接收到的輸入數(shù)據(jù) var stream=KafkaUtils.createDirectStream[String,String](ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(sourcetopic),kafkaParam)) //每一個stream都是一個ConsumerRecord stream.map(s =>("id:" + s.key(),">>>>:"+s.value())).foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { // Get a producer from the shared pool val pool = createKafkaProducerPool(brobrokers, targettopic) val p = pool.borrowObject() partitionOfRecords.foreach {message => System.out.println(message._2);p.send(message._2,Option(targettopic))} // Returning the producer to the pool also shuts it down pool.returnObject(p) }) }) ssc.start() ssc.awaitTermination() } } |
程序部署:
1、啟動zookeeper和kafka。
bin/kafka-server-start.sh -deamon ./config/server.properties |
2、創(chuàng)建兩個topic,一個為source,一個為target
bin/kafka-topics.sh --create --zookeeper 192.168.56.150:2181,192.168.56.151:2181,192.168.56.152:2181 --replication-factor 2 --partitions 2 --topic source |
bin/kafka-topics.sh --create --zookeeper 172.16.148.150:2181,172.16.148.151:2181,172.16.148.152:2181 --replication-factor 2 --partitions 2 --topic target |
3、啟動kafka console producer 寫入source topic
bin/kafka-console-producer.sh --broker-list 192.168.56.150:9092, 192.168.56.151:9092, 192.168.56.152:9092 --topic source |
4、啟動kafka console consumer 監(jiān)聽target topic
bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.150:9092, 192.168.56.151:9092, 192.168.56.152:9092 --topic source |
5、啟動kafkaStreaming程序:
[bigdata@master01 ~]$ ./hadoop/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class com.atguigu.streaming.KafkaStreaming ./kafkastreaming-jar-with-dependencies.jar |
6、程序運行截圖:

Spark對Kafka兩種連接方式的對比
Spark對于Kafka的連接主要有兩種方式,一種是DirectKafkaInputDStream,另外一種是KafkaInputDStream。DirectKafkaInputDStream 只在 driver 端接收數(shù)據(jù),所以繼承了 InputDStream,是沒有 receivers 的。
主要通過KafkaUtils#createDirectStream以及KafkaUtils#createStream這兩個 API 來創(chuàng)建,除了要傳入的參數(shù)不同外,接收 kafka 數(shù)據(jù)的節(jié)點、拉取數(shù)據(jù)的時機也完全不同。
KafkaUtils#createStream【Receiver-based】
這種方法使用一個 Receiver 來接收數(shù)據(jù)。在該 Receiver 的實現(xiàn)中使用了 Kafka high-level consumer API。Receiver 從 kafka 接收的數(shù)據(jù)將被存儲到 Spark executor 中,隨后啟動的 job 將處理這些數(shù)據(jù)。
在默認(rèn)配置下,該方法失敗后會丟失數(shù)據(jù)(保存在 executor 內(nèi)存里的數(shù)據(jù)在 application 失敗后就沒了),若要保證數(shù)據(jù)不丟失,需要啟用 WAL(即預(yù)寫日志至 HDFS、S3等),這樣再失敗后可以從日志文件中恢復(fù)數(shù)據(jù)。
在該函數(shù)中,會新建一個 KafkaInputDStream對象,KafkaInputDStream繼承于 ReceiverInputDStream。KafkaInputDStream實現(xiàn)了getReceiver方法,返回接收器的實例:
def getReceiver(): Receiver[(K, V)] = {
if (!useReliableReceiver) {
//< 不啟用 WAL
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
} else {
//< 啟用 WAL
new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
}
}
根據(jù)是否啟用 WAL,receiver 分為 KafkaReceiver 和 ReliableKafkaReceiver。下圖描述了 KafkaReceiver 接收數(shù)據(jù)的具體流程:

需要注意的點:
· Kafka Topic 的 partitions 與RDD 的 partitions 沒有直接關(guān)系,不能一一對應(yīng)。如果增加 topic 的 partition 個數(shù)的話僅僅會增加單個 Receiver 接收數(shù)據(jù)的線程數(shù)。事實上,使用這種方法只會在一個 executor 上啟用一個 Receiver,該 Receiver 包含一個線程池,線程池的線程個數(shù)與所有 topics 的 partitions 個數(shù)總和一致,每條線程接收一個 topic 的一個 partition 的數(shù)據(jù)。而并不會增加處理數(shù)據(jù)時的并行度。
· 對于一個 topic,可以使用多個 groupid 相同的 input DStream 來使用多個 Receivers 來增加并行度,然后 union 他們;對于多個 topics,除了可以用上個辦法增加并行度外,還可以對不同的 topic 使用不同的 input DStream 然后 union 他們來增加并行度
· 如果你啟用了 WAL,為能將接收到的數(shù)據(jù)將以 log 的方式在指定的存儲系統(tǒng)備份一份,需要指定輸入數(shù)據(jù)的存儲等級為 StorageLevel.MEMORY_AND_DISK_SER 或 StorageLevel.MEMORY_AND_DISK_SER_2
KafkaUtils#createDirectStream【W(wǎng)ithOut Receiver】
自 Spark-1.3.0 起,提供了不需要 Receiver 的方法。替代了使用 receivers 來接收數(shù)據(jù),該方法定期查詢每個 topic+partition 的 lastest offset,并據(jù)此決定每個 batch 要接收的 offsets 范圍。
KafkaUtils#createDirectStream調(diào)用中,會新建DirectKafkaInputDStream,DirectKafkaInputDStream#compute(validTime: Time)會從 kafka 拉取數(shù)據(jù)并生成 RDD,流程如下:

如上圖所示,該函數(shù)主要做了以下三個事情:
確定要接收的 partitions 的 offsetRange,以作為第2步創(chuàng)建的 RDD 的數(shù)據(jù)來源
創(chuàng)建 RDD 并執(zhí)行 count 操作,使 RDD 真實具有數(shù)據(jù)
以 streamId、數(shù)據(jù)條數(shù),offsetRanges 信息初始化 inputInfo 并添加到 JobScheduler 中
進一步看 KafkaRDD 的 getPartitions 實現(xiàn):
override def getPartitions: Array[Partition] = {
offsetRanges.zipWithIndex.map { case (o, i) =>
val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
}.toArray
}
從上面的代碼可以很明顯看到,KafkaRDD 的 partition 數(shù)據(jù)與 Kafka topic 的某個 partition 的 o.fromOffset 至 o.untilOffset 數(shù)據(jù)是相對應(yīng)的,也就是說 KafkaRDD 的 partition 與 Kafka partition 是一一對應(yīng)的
該方式相比使用 Receiver 的方式有以下好處:
簡化并行:不再需要創(chuàng)建多個 kafka input DStream 然后再 union 這些 input DStream。使用 directStream,Spark Streaming會創(chuàng)建與 Kafka partitions 相同數(shù)量的 paritions 的 RDD,RDD 的 partition與 Kafka 的 partition 一一對應(yīng),這樣更易于理解及調(diào)優(yōu)
高效:在方式一中要保證數(shù)據(jù)零丟失需要啟用 WAL(預(yù)寫日志),這會占用更多空間。而在方式二中,可以直接從 Kafka 指定的 topic 的指定 offsets 處恢復(fù)數(shù)據(jù),不需要使用 WAL
恰好一次語義保證:基于Receiver方式使用了 Kafka 的 high level API 來在 Zookeeper 中存儲已消費的 offsets。這在某些情況下會導(dǎo)致一些數(shù)據(jù)被消費兩次,比如 streaming app 在處理某個 batch 內(nèi)已接受到的數(shù)據(jù)的過程中掛掉,但是數(shù)據(jù)已經(jīng)處理了一部分,但這種情況下無法將已處理數(shù)據(jù)的 offsets 更新到 Zookeeper 中,下次重啟時,這批數(shù)據(jù)將再次被消費且處理?;赿irect的方式,使用kafka的簡單api,Spark Streaming自己就負(fù)責(zé)追蹤消費的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保證數(shù)據(jù)是消費一次且僅消費一次。這種方式中,只要將 output 操作和保存 offsets 操作封裝成一個原子操作就能避免失敗后的重復(fù)消費和處理,從而達(dá)到恰好一次的語義(Exactly-once)
通過以上分析,我們可以對這兩種方式的區(qū)別做一個總結(jié):
createStream會使用 Receiver;而createDirectStream不會
createStream使用的 Receiver 會分發(fā)到某個 executor 上去啟動并接受數(shù)據(jù);而createDirectStream直接在 driver 上接收數(shù)據(jù)
createStream使用 Receiver 源源不斷的接收數(shù)據(jù)并把數(shù)據(jù)交給 ReceiverSupervisor 處理最終存儲為 blocks 作為 RDD 的輸入,從 kafka 拉取數(shù)據(jù)與計算消費數(shù)據(jù)相互獨立;而createDirectStream會在每個 batch 拉取數(shù)據(jù)并就地消費,到下個 batch 再次拉取消費,周而復(fù)始,從 kafka 拉取數(shù)據(jù)與計算消費數(shù)據(jù)是連續(xù)的,沒有獨立開
createStream中創(chuàng)建的KafkaInputDStream 每個 batch 所對應(yīng)的 RDD 的 partition 不與 Kafka partition 一一對應(yīng);而createDirectStream中創(chuàng)建的 DirectKafkaInputDStream 每個 batch 所對應(yīng)的 RDD 的 partition 與 Kafka partition 一一對應(yīng)
Flume-ng
Spark提供兩個不同的接收器來使用Apache Flume(http://flume.apache.org/,見圖10-8)。 兩個接收器簡介如下。
? 推式接收器該接收器以 Avro 數(shù)據(jù)池的方式工作,由 Flume 向其中推數(shù)據(jù)。
? 拉式接收器該接收器可以從自定義的中間數(shù)據(jù)池中拉數(shù)據(jù),而其他進程可以使用 Flume 把數(shù)據(jù)推進 該中間數(shù)據(jù)池。
兩種方式都需要重新配置 Flume,并在某個節(jié)點配置的端口上運行接收器(不是已有的 Spark 或者 Flume 使用的端口)。要使用其中任何一種方法,都需要在工程中引入 Maven 工件 spark-streaming-flume_2.10。

推式接收器的方法設(shè)置起來很容易,但是它不使用事務(wù)來接收數(shù)據(jù)。在這種方式中,接收 器以 Avro 數(shù)據(jù)池的方式工作,我們需要配置 Flume 來把數(shù)據(jù)發(fā)到 Avro 數(shù)據(jù)池。我們提供的 FlumeUtils 對象會把接收器配置在一個特定的工作節(jié)點的主機名及端口號上。這些設(shè)置必須和 Flume 配置相匹配。

雖然這種方式很簡潔,但缺點是沒有事務(wù)支持。這會增加運行接收器的工作節(jié)點發(fā)生錯誤 時丟失少量數(shù)據(jù)的幾率。不僅如此,如果運行接收器的工作節(jié)點發(fā)生故障,系統(tǒng)會嘗試從 另一個位置啟動接收器,這時需要重新配置 Flume 才能將數(shù)據(jù)發(fā)給新的工作節(jié)點。這樣配 置會比較麻煩。
較新的方式是拉式接收器(在Spark 1.1中引入),它設(shè)置了一個專用的Flume數(shù)據(jù)池供 Spark Streaming讀取,并讓接收器主動從數(shù)據(jù)池中拉取數(shù)據(jù)。這種方式的優(yōu)點在于彈性較 好,Spark Streaming通過事務(wù)從數(shù)據(jù)池中讀取并復(fù)制數(shù)據(jù)。在收到事務(wù)完成的通知前,這 些數(shù)據(jù)還保留在數(shù)據(jù)池中。
我們需要先把自定義數(shù)據(jù)池配置為 Flume 的第三方插件。安裝插件的最新方法請參考 Flume 文檔的相關(guān)部分(https://flume.apache.org/FlumeUserGuide.html#installing-third-party- plugins)。由于插件是用 Scala 寫的,因此需要把插件本身以及 Scala 庫都添加到 Flume 插件 中。Spark 1.1 中對應(yīng)的 Maven 索引如例 10-37 所示。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.11</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.11</version>
</dependency>
當(dāng)你把自定義 Flume 數(shù)據(jù)池添加到一個節(jié)點上之后,就需要配置 Flume 來把數(shù)據(jù)推送到這個數(shù)據(jù)池中,
a1.sinks = spark a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink a1.sinks.spark.hostname = receiver-hostname a1.sinks.spark.port = port-used-for-sync-not-spark-port a1.sinks.spark.channel = memoryChannel |
等到數(shù)據(jù)已經(jīng)在數(shù)據(jù)池中緩存起來,就可以調(diào)用 FlumeUtils 來讀取數(shù)據(jù)了

DStreams轉(zhuǎn)換
DStream上的原語與RDD的類似,分為Transformations(轉(zhuǎn)換)和Output Operations(輸出)兩種,此外轉(zhuǎn)換操作中還有一些比較特殊的原語,如:updateStateByKey()、transform()以及各種Window相關(guān)的原語。
Transformation |
Meaning |
map(func) |
將源DStream中的每個元素通過一個函數(shù)func從而得到新的DStreams。 |
flatMap(func) |
和map類似,但是每個輸入的項可以被映射為0或更多項。 |
filter(func) |
選擇源DStream中函數(shù)func判為true的記錄作為新DStreams |
repartition(numPartitions) |
通過創(chuàng)建更多或者更少的partition來改變此DStream的并行級別。 |
union(otherStream) |
聯(lián)合源DStreams和其他DStreams來得到新DStream |
count() |
統(tǒng)計源DStreams中每個RDD所含元素的個數(shù)得到單元素RDD的新DStreams。 |
reduce(func) |
通過函數(shù)func(兩個參數(shù)一個輸出)來整合源DStreams中每個RDD元素得到單元素RDD的DStreams。這個函數(shù)需要關(guān)聯(lián)從而可以被并行計算。 |
countByValue() |
對于DStreams中元素類型為K調(diào)用此函數(shù),得到包含(K,Long)對的新DStream,其中Long值表明相應(yīng)的K在源DStream中每個RDD出現(xiàn)的頻率。 |
reduceByKey(func, [numTasks]) |
對(K,V)對的DStream調(diào)用此函數(shù),返回同樣(K,V)對的新DStream,但是新DStream中的對應(yīng)V為使用reduce函數(shù)整合而來。Note:默認(rèn)情況下,這個操作使用Spark默認(rèn)數(shù)量的并行任務(wù)(本地模式為2,集群模式中的數(shù)量取決于配置參數(shù)spark.default.parallelism)。你也可以傳入可選的參數(shù)numTaska來設(shè)置不同數(shù)量的任務(wù)。 |
join(otherStream, [numTasks]) |
兩DStream分別為(K,V)和(K,W)對,返回(K,(V,W))對的新DStream。 |
cogroup(otherStream, [numTasks]) |
兩DStream分別為(K,V)和(K,W)對,返回(K,(Seq[V],Seq[W])對新DStreams |
transform(func) |
將RDD到RDD映射的函數(shù)func作用于源DStream中每個RDD上得到新DStream。這個可用于在DStream的RDD上做任意操作。 |
updateStateByKey(func) |
得到”狀態(tài)”DStream,其中每個key狀態(tài)的更新是通過將給定函數(shù)用于此key的上一個狀態(tài)和新值而得到。這個可用于保存每個key值的任意狀態(tài)數(shù)據(jù)。 |
DStream 的轉(zhuǎn)化操作可以分為無狀態(tài)(stateless)和有狀態(tài)(stateful)兩種。
? 在無狀態(tài)轉(zhuǎn)化操作中,每個批次的處理不依賴于之前批次的數(shù)據(jù)。常見的 RDD 轉(zhuǎn)化操作,例如 map()、filter()、reduceByKey() 等,都是無狀態(tài)轉(zhuǎn)化操作。
? 相對地,有狀態(tài)轉(zhuǎn)化操作需要使用之前批次的數(shù)據(jù)或者是中間結(jié)果來計算當(dāng)前批次的數(shù)據(jù)。有狀態(tài)轉(zhuǎn)化操作包括基于滑動窗口的轉(zhuǎn)化操作和追蹤狀態(tài)變化的轉(zhuǎn)化操作。
無狀態(tài)轉(zhuǎn)化操作
無狀態(tài)轉(zhuǎn)化操作就是把簡單的 RDD 轉(zhuǎn)化操作應(yīng)用到每個批次上,也就是轉(zhuǎn)化 DStream 中的每一個 RDD。部分無狀態(tài)轉(zhuǎn)化操作列在了下表中。 注意,針對鍵值對的 DStream 轉(zhuǎn)化操作(比如 reduceByKey())要添加import StreamingContext._ 才能在 Scala中使用。

需要記住的是,盡管這些函數(shù)看起來像作用在整個流上一樣,但事實上每個 DStream 在內(nèi)部是由許多 RDD(批次)組成,且無狀態(tài)轉(zhuǎn)化操作是分別應(yīng)用到每個 RDD 上的。例如, reduceByKey() 會歸約每個時間區(qū)間中的數(shù)據(jù),但不會歸約不同區(qū)間之間的數(shù)據(jù)。
舉個例子,在之前的wordcount程序中,我們只會統(tǒng)計1秒內(nèi)接收到的數(shù)據(jù)的單詞個數(shù),而不會累加。
無狀態(tài)轉(zhuǎn)化操作也能在多個 DStream 間整合數(shù)據(jù),不過也是在各個時間區(qū)間內(nèi)。例如,鍵 值對 DStream 擁有和 RDD 一樣的與連接相關(guān)的轉(zhuǎn)化操作,也就是 cogroup()、join()、 leftOuterJoin() 等。我們可以在 DStream 上使用這些操作,這樣就對每個批次分別執(zhí)行了對應(yīng)的 RDD 操作。
我們還可以像在常規(guī)的 Spark 中一樣使用 DStream 的 union() 操作將它和另一個 DStream 的內(nèi)容合并起來,也可以使用 StreamingContext.union() 來合并多個流。
有狀態(tài)轉(zhuǎn)化操作
特殊的Transformations
追蹤狀態(tài)變化UpdateStateByKey
UpdateStateByKey原語用于記錄歷史記錄,有時,我們需要在 DStream 中跨批次維護狀態(tài)(例如流計算中累加wordcount)。針對這種情況,updateStateByKey() 為我們提供了對一個狀態(tài)變量的訪問,用于鍵值對形式的 DStream。給定一個由(鍵,事件)對構(gòu)成的 DStream,并傳遞一個指定如何根據(jù)新的事件 更新每個鍵對應(yīng)狀態(tài)的函數(shù),它可以構(gòu)建出一個新的 DStream,其內(nèi)部數(shù)據(jù)為(鍵,狀態(tài)) 對。
updateStateByKey() 的結(jié)果會是一個新的 DStream,其內(nèi)部的 RDD 序列是由每個時間區(qū)間對應(yīng)的(鍵,狀態(tài))對組成的。
updateStateByKey操作使得我們可以在用新信息進行更新時保持任意的狀態(tài)。為使用這個功能,你需要做下面兩步:
1. 定義狀態(tài),狀態(tài)可以是一個任意的數(shù)據(jù)類型。
2. 定義狀態(tài)更新函數(shù),用此函數(shù)闡明如何使用之前的狀態(tài)和來自輸入流的新值對狀態(tài)進行更新。
使用updateStateByKey需要對檢查點目錄進行配置,會使用檢查點來保存狀態(tài)。
更新版的wordcount:
package com.atguigu.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object WorldCount { def main(args: Array[String]) { // 定義更新狀態(tài)方法,參數(shù)values為當(dāng)前批次單詞頻度,state為以往批次單詞頻度 val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint(".") // Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("master01", 9999) // Split each line into words val words = lines.flatMap(_.split(" ")) //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) // 使用updateStateByKey來更新狀態(tài),統(tǒng)計從運行開始以來單詞總的次數(shù) val stateDstream = pairs.updateStateByKey[Int](updateFunc) stateDstream.print() //val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console //wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate //ssc.stop() } } |
啟動nc –lk 9999
[bigdata@master01 ~]$ nc -lk 9999 ni shi shui ni hao ma |
啟動統(tǒng)計程序:
[bigdata@master01 ~]$ ./hadoop/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class com.atguigu.streaming.WorldCount ./statefulwordcount-jar-with-dependencies.jar 17/09/06 04:06:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable ------------------------------------------- Time: 1504685175000 ms ------------------------------------------- ------------------------------------------- Time: 1504685181000 ms ------------------------------------------- (shi,1) (shui,1) (ni,1) ------------------------------------------- Time: 1504685187000 ms ------------------------------------------- (shi,1) (ma,1) (hao,1) (shui,1) (ni,2) [bigdata@master01 ~]$ ls 2df8e0c3-174d-401a-b3a7-f7776c3987db checkpoint-1504685205000 data backup checkpoint-1504685205000.bk debug.log checkpoint-1504685199000 checkpoint-1504685208000 hadoop checkpoint-1504685199000.bk checkpoint-1504685208000.bk receivedBlockMetadata checkpoint-1504685202000 checkpoint-1504685211000 software checkpoint-1504685202000.bk checkpoint-1504685211000.bk statefulwordcount-jar-with-dependencies.jar |
Window Operations
Window Operations有點類似于Storm中的State,可以設(shè)置窗口的大小和滑動窗口的間隔來動態(tài)的獲取當(dāng)前Steaming的允許狀態(tài)。
基于窗口的操作會在一個比 StreamingContext 的批次間隔更長的時間范圍內(nèi),通過整合多個批次的結(jié)果,計算出整個窗口的結(jié)果。

所有基于窗口的操作都需要兩個參數(shù),分別為窗口時長以及滑動步長,兩者都必須是 StreamContext 的批次間隔的整數(shù)倍。窗口時長控制每次計算最近的多少個批次的數(shù)據(jù),其實就是最近的 windowDuration/batchInterval 個批次。如果有一個以 10 秒為批次間隔的源 DStream,要創(chuàng)建一個最近 30 秒的時間窗口(即最近 3 個批次),就應(yīng)當(dāng)把 windowDuration 設(shè)為 30 秒。而滑動步長的默認(rèn)值與批次間隔相等,用來控制對新的 DStream 進行計算的間隔。如果源 DStream 批次間隔為 10 秒,并且我們只希望每兩個批次計算一次窗口結(jié)果, 就應(yīng)該把滑動步長設(shè)置為 20 秒。
假設(shè),你想拓展前例從而每隔十秒對持續(xù)30秒的數(shù)據(jù)生成word count。為做到這個,我們需要在持續(xù)30秒數(shù)據(jù)的(word,1)對DStream上應(yīng)用reduceByKey。使用操作reduceByKeyAndWindow.
# reduce last 30 seconds of data, every 10 second
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x -y, 30, 20)

Transformation |
Meaning |
window(windowLength, slideInterval) |
基于對源DStream窗化的批次進行計算返回一個新的DStream |
countByWindow(windowLength, slideInterval) |
返回一個滑動窗口計數(shù)流中的元素。 |
reduceByWindow(func, windowLength, slideInterval) |
通過使用自定義函數(shù)整合滑動區(qū)間流元素來創(chuàng)建一個新的單元素流。 |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) |
當(dāng)在一個(K,V)對的DStream上調(diào)用此函數(shù),會返回一個新(K,V)對的DStream,此處通過對滑動窗口中批次數(shù)據(jù)使用reduce函數(shù)來整合每個key的value值。Note:默認(rèn)情況下,這個操作使用Spark的默認(rèn)數(shù)量并行任務(wù)(本地是2),在集群模式中依據(jù)配置屬性(spark.default.parallelism)來做grouping。你可以通過設(shè)置可選參數(shù)numTasks來設(shè)置不同數(shù)量的tasks。 |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) |
這個函數(shù)是上述函數(shù)的更高效版本,每個窗口的reduce值都是通過用前一個窗的reduce值來遞增計算。通過reduce進入到滑動窗口數(shù)據(jù)并”反向reduce”離開窗口的舊數(shù)據(jù)來實現(xiàn)這個操作。一個例子是隨著窗口滑動對keys的“加”“減”計數(shù)。通過前邊介紹可以想到,這個函數(shù)只適用于”可逆的reduce函數(shù)”,也就是這些reduce函數(shù)有相應(yīng)的”反reduce”函數(shù)(以參數(shù)invFunc形式傳入)。如前述函數(shù),reduce任務(wù)的數(shù)量通過可選參數(shù)來配置。注意:為了使用這個操作,檢查點必須可用。 |
countByValueAndWindow(windowLength,slideInterval, [numTasks]) |
對(K,V)對的DStream調(diào)用,返回(K,Long)對的新DStream,其中每個key的值是其在滑動窗口中頻率。如上,可配置reduce任務(wù)數(shù)量。 |
reduceByWindow() 和 reduceByKeyAndWindow() 讓我們可以對每個窗口更高效地進行歸約操作。它們接收一個歸約函數(shù),在整個窗口上執(zhí)行,比如 +。除此以外,它們還有一種特殊形式,通過只考慮新進入窗口的數(shù)據(jù)和離開窗 口的數(shù)據(jù),讓 Spark 增量計算歸約結(jié)果。這種特殊形式需要提供歸約函數(shù)的一個逆函數(shù),比 如 + 對應(yīng)的逆函數(shù)為 -。對于較大的窗口,提供逆函數(shù)可以大大提高執(zhí)行效率

val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1)) val ipCountDStream = ipDStream.reduceByKeyAndWindow( {(x, y) => x + y}, {(x, y) => x - y}, Seconds(30), Seconds(10)) // 加上新進入窗口的批次中的元素 // 移除離開窗口的老批次中的元素 // 窗口時長// 滑動步長 |
countByWindow() 和 countByValueAndWindow() 作為對數(shù)據(jù)進行 計數(shù)操作的簡寫。countByWindow() 返回一個表示每個窗口中元素個數(shù)的 DStream,而 countByValueAndWindow() 返回的 DStream 則包含窗口中每個值的個數(shù),
val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()} val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10)) val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10)) |
WordCount第三版:3秒一個批次,窗口12秒,滑步6秒。
package com.atguigu.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object WorldCount { def main(args: Array[String]) { // 定義更新狀態(tài)方法,參數(shù)values為當(dāng)前批次單詞頻度,state為以往批次單詞頻度 val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint(".") // Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("master01", 9999) // Split each line into words val words = lines.flatMap(_.split(" ")) //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(12), Seconds(6)) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate //ssc.stop() } } |
重要操作
Transform Operation
Transform原語允許DStream上執(zhí)行任意的RDD-to-RDD函數(shù)。即使這些函數(shù)并沒有在DStream的API中暴露出來,通過該函數(shù)可以方便的擴展Spark API。
該函數(shù)每一批次調(diào)度一次。
比如下面的例子,在進行單詞統(tǒng)計的時候,想要過濾掉spam的信息。
其實也就是對DStream中的RDD應(yīng)用轉(zhuǎn)換。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform { rdd => rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... } |
Join 操作
連接操作(leftOuterJoin, rightOuterJoin, fullOuterJoin也可以),可以連接Stream-Stream,windows-stream to windows-stream、stream-dataset
Stream-Stream Joins
val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2) val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2) |
Stream-dataset joins
val dataset: RDD[String, String] = ... val windowedStream = stream.window(Seconds(20))... val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) } |
DStreams輸出
輸出操作指定了對流數(shù)據(jù)經(jīng)轉(zhuǎn)化操作得到的數(shù)據(jù)所要執(zhí)行的操作(例如把結(jié)果推入外部數(shù)據(jù)庫或輸出到屏幕上)。與 RDD 中的惰性求值類似,如果一個 DStream 及其派生出的 DStream 都沒有被執(zhí)行輸出操作,那么這些 DStream 就都不會被求值。如果 StreamingContext 中沒有設(shè)定輸出操作,整個 context 就都不會啟動。
Output Operation |
Meaning |
print() |
在運行流程序的驅(qū)動結(jié)點上打印DStream中每一批次數(shù)據(jù)的最開始10個元素。這用于開發(fā)和調(diào)試。在Python API中,同樣的操作叫pprint()。 |
saveAsTextFiles(prefix, [suffix]) |
以text文件形式存儲這個DStream的內(nèi)容。每一批次的存儲文件名基于參數(shù)中的prefix和suffix?!眕refix-Time_IN_MS[.suffix]”. |
saveAsObjectFiles(prefix, [suffix]) |
以Java對象序列化的方式將Stream中的數(shù)據(jù)保存為 SequenceFiles . 每一批次的存儲文件名基于參數(shù)中的為"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。 |
saveAsHadoopFiles(prefix, [suffix]) |
將Stream中的數(shù)據(jù)保存為 Hadoop files. 每一批次的存儲文件名基于參數(shù)中的為"prefix-TIME_IN_MS[.suffix]". Python API Python中目前不可用。 |
foreachRDD(func) |
這是最通用的輸出操作,即將函數(shù)func用于產(chǎn)生于stream的每一個RDD。其中參數(shù)傳入的函數(shù)func應(yīng)該實現(xiàn)將每一個RDD中數(shù)據(jù)推送到外部系統(tǒng),如將RDD存入文件或者通過網(wǎng)絡(luò)將其寫入數(shù)據(jù)庫。注意:函數(shù)func在運行流應(yīng)用的驅(qū)動中被執(zhí)行,同時其中一般函數(shù)RDD操作從而強制其對于流RDD的運算。 |
通用的輸出操作 foreachRDD(),它用來對 DStream 中的 RDD 運行任意計算。這和transform() 有些類似,都可以讓我們訪問任意 RDD。在 foreachRDD() 中,可以重用我們在 Spark 中實現(xiàn)的所有行動操作。比如,常見的用例之一是把數(shù)據(jù)寫到諸如 MySQL 的外部數(shù)據(jù)庫中。
需要注意的:
連接不能寫在driver層面
如果寫在foreach則每個RDD都創(chuàng)建,得不償失
增加foreachPartition,在分區(qū)創(chuàng)建
可以考慮使用連接池優(yōu)化
dstream.foreachRDD { rdd => // error val connection = createNewConnection() // executed at the driver 序列化錯誤 rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record) // executed at the worker ) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } } |
累加器和廣播變量
累加器(Accumulators)和廣播變量(Broadcast variables)不能從Spark Streaming的檢查點中恢復(fù)。如果你啟用檢查并也使用了累加器和廣播變量,那么你必須創(chuàng)建累加器和廣播變量的延遲單實例從而在驅(qū)動因失效重啟后他們可以被重新實例化。如下例述:
object WordBlacklist { @volatile private var instance: Broadcast[Seq[String]] = null def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { if (instance == null) { synchronized { if (instance == null) { val wordBlacklist = Seq("a", "b", "c") instance = sc.broadcast(wordBlacklist) } } } instance } } object DroppedWordsCounter { @volatile private var instance: LongAccumulator = null def getInstance(sc: SparkContext): LongAccumulator = { if (instance == null) { synchronized { if (instance == null) { instance = sc.longAccumulator("WordsInBlacklistCounter") } } } instance } } wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) => // Get or register the blacklist Broadcast val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // Get or register the droppedWordsCounter Accumulator val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) // Use blacklist to drop words and use droppedWordsCounter to count them val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { droppedWordsCounter.add(count) false } else { true } }.collect().mkString("[", ", ", "]") val output = "Counts at time " + time + " " + counts }) |
DataFrame ans SQL Operations
你可以很容易地在流數(shù)據(jù)上使用DataFrames和SQL。你必須使用SparkContext來創(chuàng)建StreamingContext要用的SQLContext。此外,這一過程可以在驅(qū)動失效后重啟。我們通過創(chuàng)建一個實例化的SQLContext單實例來實現(xiàn)這個工作。如下例所示。我們對前例word count進行修改從而使用DataFrames和SQL來產(chǎn)生word counts。每個RDD被轉(zhuǎn)換為DataFrame,以臨時表格配置并用SQL進行查詢。
val words: DStream[String] = ... words.foreachRDD { rdd => // Get the singleton instance of SparkSession val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate() import spark.implicits._ // Convert RDD[String] to DataFrame val wordsDataFrame = rdd.toDF("word") // Create a temporary view wordsDataFrame.createOrReplaceTempView("words") // Do word count on DataFrame using SQL and print it val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() } |
你也可以從不同的線程在定義于流數(shù)據(jù)的表上運行SQL查詢(也就是說,異步運行StreamingContext)。僅確定你設(shè)置StreamingContext記住了足夠數(shù)量的流數(shù)據(jù)以使得查詢操作可以運行。否則,StreamingContext不會意識到任何異步的SQL查詢操作,那么其就會在查詢完成之后刪除舊的數(shù)據(jù)。例如,如果你要查詢最后一批次,但是你的查詢會運行5分鐘,那么你需要調(diào)用streamingContext.remember(Minutes(5))(in Scala, 或者其他語言的等價操作)。
Caching / Persistence
和RDDs類似,DStreams同樣允許開發(fā)者將流數(shù)據(jù)保存在內(nèi)存中。也就是說,在DStream上使用persist()方法將會自動把DStreams中的每個RDD保存在內(nèi)存中。當(dāng)DStream中的數(shù)據(jù)要被多次計算時,這個非常有用(如在同樣數(shù)據(jù)上的多次操作)。對于像reduceByWindow和reduceByKeyAndWindow以及基于狀態(tài)的(updateStateByKey)這種操作,保存是隱含默認(rèn)的。因此,即使開發(fā)者沒有調(diào)用persist(),由基于窗操作產(chǎn)生的DStreams會自動保存在內(nèi)存中。
7x24 不間斷運行
檢查點機制
檢查點機制是我們在Spark Streaming中用來保障容錯性的主要機制。與應(yīng)用程序邏輯無關(guān)的錯誤(即系統(tǒng)錯位,JVM崩潰等)有迅速恢復(fù)的能力.
它可以使Spark Streaming階段性地把應(yīng)用數(shù)據(jù)存儲到諸如HDFS或Amazon S3這樣的可靠存儲系統(tǒng)中, 以供恢復(fù)時使用。具體來說,檢查點機制主要為以下兩個目的服務(wù)。
控制發(fā)生失敗時需要重算的狀態(tài)數(shù)。SparkStreaming可以通 過轉(zhuǎn)化圖的譜系圖來重算狀態(tài),檢查點機制則可以控制需要在轉(zhuǎn)化圖中回溯多遠(yuǎn)。
提供驅(qū)動器程序容錯。如果流計算應(yīng)用中的驅(qū)動器程序崩潰了,你可以重啟驅(qū)動器程序 并讓驅(qū)動器程序從檢查點恢復(fù),這樣Spark Streaming就可以讀取之前運行的程序處理 數(shù)據(jù)的進度,并從那里繼續(xù)。
了實現(xiàn)這個,Spark Streaming需要為容錯存儲系統(tǒng)checkpoint足夠的信息從而使得其可以從失敗中恢復(fù)過來。有兩種類型的數(shù)據(jù)設(shè)置檢查點。
Metadata checkpointing:將定義流計算的信息存入容錯的系統(tǒng)如HDFS。元數(shù)據(jù)包括:
配置 – 用于創(chuàng)建流應(yīng)用的配置。
DStreams操作 – 定義流應(yīng)用的DStreams操作集合。
不完整批次 – 批次的工作已進行排隊但是并未完成。
Data checkpointing: 將產(chǎn)生的RDDs存入可靠的存儲空間。對于在多批次間合并數(shù)據(jù)的狀態(tài)轉(zhuǎn)換,這個很有必要。在這樣的轉(zhuǎn)換中,RDDs的產(chǎn)生基于之前批次的RDDs,這樣依賴鏈長度隨著時間遞增。為了避免在恢復(fù)期這種無限的時間增長(和鏈長度成比例),狀態(tài)轉(zhuǎn)換中間的RDDs周期性寫入可靠地存儲空間(如HDFS)從而切短依賴鏈。
總而言之,元數(shù)據(jù)檢查點在由驅(qū)動失效中恢復(fù)是首要需要的。而數(shù)據(jù)或者RDD檢查點甚至在使用了狀態(tài)轉(zhuǎn)換的基礎(chǔ)函數(shù)中也是必要的。
出于這些原因,檢查點機制對于任何生產(chǎn)環(huán)境中的流計算應(yīng)用都至關(guān)重要。你可以通過向 ssc.checkpoint() 方法傳遞一個路徑參數(shù)(HDFS、S3 或者本地路徑均可)來配置檢查點機制,同時你的應(yīng)用應(yīng)該能夠使用檢查點的數(shù)據(jù)
1. 當(dāng)程序首次啟動,其將創(chuàng)建一個新的StreamingContext,設(shè)置所有的流并調(diào)用start()。
2. 當(dāng)程序在失效后重啟,其將依據(jù)檢查點目錄的檢查點數(shù)據(jù)重新創(chuàng)建一個StreamingContext。 通過使用StraemingContext.getOrCreate很容易獲得這個性能。
ssc.checkpoint("hdfs://...") # 創(chuàng)建和設(shè)置一個新的StreamingContext def functionToCreateContext(): sc = SparkContext(...) # new context ssc = new StreamingContext(...) lines = ssc.socketTextStream(...) # create DStreams ... ssc.checkpoint(checkpointDirectory) # 設(shè)置檢查點目錄 return ssc # 從檢查點數(shù)據(jù)中獲取StreamingContext或者重新創(chuàng)建一個 context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext) # 在需要完成的context上做額外的配置 # 無論其有沒有啟動 context ... # 啟動context context.start() contaxt.awaitTermination()
|
如果檢查點目錄(checkpointDirectory)存在,那么context將會由檢查點數(shù)據(jù)重新創(chuàng)建。如果目錄不存在(首次運行),那么函數(shù)functionToCreateContext將會被調(diào)用來創(chuàng)建一個新的context并設(shè)置DStreams。
注意RDDs的檢查點引起存入可靠內(nèi)存的開銷。在RDDs需要檢查點的批次里,處理的時間會因此而延長。所以,檢查點的間隔需要很仔細(xì)地設(shè)置。在小尺寸批次(1秒鐘)。每一批次檢查點會顯著減少操作吞吐量。反之,檢查點設(shè)置的過于頻繁導(dǎo)致“血統(tǒng)”和任務(wù)尺寸增長,這會有很不好的影響對于需要RDD檢查點設(shè)置的狀態(tài)轉(zhuǎn)換,默認(rèn)間隔是批次間隔的乘數(shù)一般至少為10秒鐘。可以通過dstream.checkpoint(checkpointInterval)。通常,檢查點設(shè)置間隔是5-10個DStream的滑動間隔。
WAL預(yù)寫日志
WAL 即 write ahead log(預(yù)寫日志),是在 1.2 版本中就添加的特性。作用就是,將數(shù)據(jù)通過日志的方式寫到可靠的存儲,比如 HDFS、s3,在 driver 或 worker failure 時可以從在可靠存儲上的日志文件恢復(fù)數(shù)據(jù)。WAL 在 driver 端和 executor 端都有應(yīng)用。
WAL在 driver 端的應(yīng)用
用于寫日志的對象 writeAheadLogOption: WriteAheadLog。在 StreamingContext 中的 JobScheduler 中的 ReceiverTracker 的 ReceivedBlockTracker 構(gòu)造函數(shù)中被創(chuàng)建,ReceivedBlockTracker 用于管理已接收到的 blocks 信息。需要注意的是,這里只需要啟用 checkpoint 就可以創(chuàng)建該 driver 端的 WAL 管理實例,而不需要將 spark.streaming.receiver.writeAheadLog.enable 設(shè)置為 true。
寫什么、何時寫、寫什么
首選需要明確的是,ReceivedBlockTracker 通過 WAL 寫入 log 文件的內(nèi)容是3種事件(當(dāng)然,會進行序列化):
· case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo);即新增了一個 block 及該 block 的具體信息,包括 streamId、blockId、數(shù)據(jù)條數(shù)等
· case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks);即為某個 batchTime 分配了哪些 blocks 作為該 batch RDD 的數(shù)據(jù)源
· case class BatchCleanupEvent(times: Seq[Time]);即清理了哪些 batchTime 對應(yīng)的 block
· 知道了寫了什么內(nèi)容,結(jié)合源碼,也不難找出是什么時候?qū)懥诉@些內(nèi)容。需要再次注意的是,寫上面這三種事件,也不需要將 spark.streaming.receiver.writeAheadLog.enable 設(shè)置為 true。
WAL 在 executor 端的應(yīng)用
· Receiver 接收到的數(shù)據(jù)會源源不斷的傳遞給 ReceiverSupervisor,是否啟用 WAL 機制(即是否將 spark.streaming.receiver.writeAheadLog.enable 設(shè)置為 true)會影響 ReceiverSupervisor 在存儲 block 時的行為:
· 不啟用 WAL:你設(shè)置的StorageLevel是什么,就怎么存儲。比如MEMORY_ONLY只會在內(nèi)存中存一份,MEMORY_AND_DISK會在內(nèi)存和磁盤上各存一份等
· 啟用 WAL:在StorageLevel指定的存儲的基礎(chǔ)上,寫一份到 WAL 中。存儲一份在 WAL 上,更不容易丟數(shù)據(jù)但性能損失也比較大
· 關(guān)于是否要啟用 WAL,要視具體的業(yè)務(wù)而定:
· 若可以接受一定的數(shù)據(jù)丟失,則不需要啟用 WAL,因為對性能影響較大
· 若完全不能接受數(shù)據(jù)丟失,那就需要同時啟用 checkpoint 和 WAL,checkpoint 保存著執(zhí)行進度(比如已生成但未完成的 jobs),WAL 中保存著 blocks 及 blocks 元數(shù)據(jù)(比如保存著未完成的 jobs 對應(yīng)的 blocks 信息及 block 文件)。同時,這種情況可能要在數(shù)據(jù)源和 Streaming Application 中聯(lián)合來保證 exactly once 語義
· 預(yù)寫日志功能的流程是:
· 1)一個SparkStreaming應(yīng)用開始時(也就是driver開始時),相關(guān)的StreamingContext使用SparkContext啟動接收器成為長駐運行任務(wù)。這些接收器接收并保存流數(shù)據(jù)到Spark內(nèi)存中以供處理。
· 2)接收器通知driver。
· 3)接收塊中的元數(shù)據(jù)(metadata)被發(fā)送到driver的StreamingContext。
· 這個元數(shù)據(jù)包括:
· (a)定位其在executor內(nèi)存中數(shù)據(jù)的塊referenceid,
· (b)塊數(shù)據(jù)在日志中的偏移信息(如果啟用了)。
· 用戶傳送數(shù)據(jù)的生命周期如下圖所示。
·
·類似Kafka這樣的系統(tǒng)可以通過復(fù)制數(shù)據(jù)保持可靠性。
背壓機制
默認(rèn)情況下,Spark Streaming通過Receiver以生產(chǎn)者生產(chǎn)數(shù)據(jù)的速率接收數(shù)據(jù),計算過程中會出現(xiàn)batch processing time > batch interval的情況,其中batch processing time 為實際計算一個批次花費時間, batch interval為Streaming應(yīng)用設(shè)置的批處理間隔。這意味著Spark Streaming的數(shù)據(jù)接收速率高于Spark從隊列中移除數(shù)據(jù)的速率,也就是數(shù)據(jù)處理能力低,在設(shè)置間隔內(nèi)不能完全處理當(dāng)前接收速率接收的數(shù)據(jù)。如果這種情況持續(xù)過長的時間,會造成數(shù)據(jù)在內(nèi)存中堆積,導(dǎo)致Receiver所在Executor內(nèi)存溢出等問題(如果設(shè)置StorageLevel包含disk, 則內(nèi)存存放不下的數(shù)據(jù)會溢寫至disk, 加大延遲)。Spark 1.5以前版本,用戶如果要限制Receiver的數(shù)據(jù)接收速率,可以通過設(shè)置靜態(tài)配制參數(shù)“spark.streaming.receiver.maxRate”的值來實現(xiàn),此舉雖然可以通過限制接收速率,來適配當(dāng)前的處理能力,防止內(nèi)存溢出,但也會引入其它問題。比如:producer數(shù)據(jù)生產(chǎn)高于maxRate,當(dāng)前集群處理能力也高于maxRate,這就會造成資源利用率下降等問題。為了更好的協(xié)調(diào)數(shù)據(jù)接收速率與資源處理能力,Spark Streaming 從v1.5開始引入反壓機制(back-pressure),通過動態(tài)控制數(shù)據(jù)接收速率來適配集群數(shù)據(jù)處理能力。
Spark Streaming Backpressure: 根據(jù)JobScheduler反饋作業(yè)的執(zhí)行信息來動態(tài)調(diào)整Receiver數(shù)據(jù)接收率。通過屬性“spark.streaming.backpressure.enabled”來控制是否啟用backpressure機制,默認(rèn)值false,即不啟用。
Streaming架構(gòu)如下圖所示

在原架構(gòu)的基礎(chǔ)上加上一個新的組件RateController,這個組件負(fù)責(zé)監(jiān)聽“OnBatchCompleted”事件,然后從中抽取processingDelay 及schedulingDelay信息. Estimator依據(jù)這些信息估算出最大處理速度(rate),最后由基于Receiver的Input Stream將rate通過ReceiverTracker與ReceiverSupervisorImpl轉(zhuǎn)發(fā)給BlockGenerator(繼承自RateLimiter).

流量控制點
當(dāng)Receiver開始接收數(shù)據(jù)時,會通過supervisor.pushSingle()方法將接收的數(shù)據(jù)存入currentBuffer等待BlockGenerator定時將數(shù)據(jù)取走,包裝成block. 在將數(shù)據(jù)存放入currentBuffer之時,要獲取許可(令牌)。如果獲取到許可就可以將數(shù)據(jù)存入buffer, 否則將被阻塞,進而阻塞Receiver從數(shù)據(jù)源拉取數(shù)據(jù)。
其令牌投放采用令牌桶機制進行, 原理如下圖所示:

令牌桶機制: 大小固定的令牌桶可自行以恒定的速率源源不斷地產(chǎn)生令牌。如果令牌不被消耗,或者被消耗的速度小于產(chǎn)生的速度,令牌就會不斷地增多,直到把桶填滿。后面再產(chǎn)生的令牌就會從桶中溢出。最后桶中可以保存的最大令牌數(shù)永遠(yuǎn)不會超過桶的大小。當(dāng)進行某操作時需要令牌時會從令牌桶中取出相應(yīng)的令牌數(shù),如果獲取到則繼續(xù)操作,否則阻塞。用完之后不用放回。
驅(qū)動器程序容錯
驅(qū)動器程序的容錯要求我們以特殊的方式創(chuàng)建 StreamingContext。我們需要把檢查點目錄提供給 StreamingContext。與直接調(diào)用 new StreamingContext 不同,應(yīng)該使用 StreamingContext.getOrCreate() 函數(shù)。
配置過程如下:
啟動Driver自動重啟功能
· standalone: 提交任務(wù)時添加 --supervise 參數(shù)
· yarn:設(shè)置yarn.resourcemanager.am.max-attempts 或者spark.yarn.maxAppAttempts
· mesos: 提交任務(wù)時添加 --supervise 參數(shù)
設(shè)置checkpoint
StreamingContext.setCheckpoint(hdfsDirectory)
支持從checkpoint中重啟配置
def createContext(checkpointDirectory: String): StreamingContext = {
val ssc = new StreamingContext
ssc.checkpoint(checkpointDirectory)
ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDirectory, createContext(checkpointDirectory))
工作節(jié)點容錯
為了應(yīng)對工作節(jié)點失敗的問題,Spark Streaming使用與Spark的容錯機制相同的方法。所 有從外部數(shù)據(jù)源中收到的數(shù)據(jù)都在多個工作節(jié)點上備份。所有從備份數(shù)據(jù)轉(zhuǎn)化操作的過程 中創(chuàng)建出來的 RDD 都能容忍一個工作節(jié)點的失敗,因為根據(jù) RDD 譜系圖,系統(tǒng)可以把丟 失的數(shù)據(jù)從幸存的輸入數(shù)據(jù)備份中重算出來。對于reduceByKey等Stateful操作重做的lineage較長的,強制啟動checkpoint,減少重做幾率
接收器容錯
運行接收器的工作節(jié)點的容錯也是很重要的。如果這樣的節(jié)點發(fā)生錯誤,Spark Streaming 會在集群中別的節(jié)點上重啟失敗的接收器。然而,這種情況會不會導(dǎo)致數(shù)據(jù)的丟失取決于 數(shù)據(jù)源的行為(數(shù)據(jù)源是否會重發(fā)數(shù)據(jù))以及接收器的實現(xiàn)(接收器是否會向數(shù)據(jù)源確認(rèn) 收到數(shù)據(jù))。舉個例子,使用 Flume 作為數(shù)據(jù)源時,兩種接收器的主要區(qū)別在于數(shù)據(jù)丟失 時的保障。在“接收器從數(shù)據(jù)池中拉取數(shù)據(jù)”的模型中,Spark 只會在數(shù)據(jù)已經(jīng)在集群中 備份時才會從數(shù)據(jù)池中移除元素。而在“向接收器推數(shù)據(jù)”的模型中,如果接收器在數(shù)據(jù) 備份之前失敗,一些數(shù)據(jù)可能就會丟失??偟膩碚f,對于任意一個接收器,你必須同時考 慮上游數(shù)據(jù)源的容錯性(是否支持事務(wù))來確保零數(shù)據(jù)丟失。
一般主要是通過將接收到數(shù)據(jù)后先寫日志(WAL)到可靠文件系統(tǒng)中,后才寫入實際的RDD。如果后續(xù)處理失敗則成功寫入WAL的數(shù)據(jù)通過WAL進行恢復(fù),未成功寫入WAL的數(shù)據(jù)通過可回溯的Source進行重放
總的來說,接收器提供以下保證。
? 所有從可靠文件系統(tǒng)中讀取的數(shù)據(jù)(比如通過StreamingContext.hadoopFiles讀取的) 都是可靠的,因為底層的文件系統(tǒng)是有備份的。Spark Streaming會記住哪些數(shù)據(jù)存放到 了檢查點中,并在應(yīng)用崩潰后從檢查點處繼續(xù)執(zhí)行。
? 對于像Kafka、推式Flume、Twitter這樣的不可靠數(shù)據(jù)源,Spark會把輸入數(shù)據(jù)復(fù)制到其 他節(jié)點上,但是如果接收器任務(wù)崩潰,Spark 還是會丟失數(shù)據(jù)。在 Spark 1.1 以及更早的版 本中,收到的數(shù)據(jù)只被備份到執(zhí)行器進程的內(nèi)存中,所以一旦驅(qū)動器程序崩潰(此時所 有的執(zhí)行器進程都會丟失連接),數(shù)據(jù)也會丟失。在 Spark 1.2 中,收到的數(shù)據(jù)被記錄到諸 如 HDFS 這樣的可靠的文件系統(tǒng)中,這樣即使驅(qū)動器程序重啟也不會導(dǎo)致數(shù)據(jù)丟失。
綜上所述,確保所有數(shù)據(jù)都被處理的最佳方式是使用可靠的數(shù)據(jù)源(例如 HDFS、拉式 Flume 等)。如果你還要在批處理作業(yè)中處理這些數(shù)據(jù),使用可靠數(shù)據(jù)源是最佳方式,因為 這種方式確保了你的批處理作業(yè)和流計算作業(yè)能讀取到相同的數(shù)據(jù),因而可以得到相同的結(jié)果。
操作過程如下:
啟用checkpoint
ssc.setCheckpoint(checkpointDir)
啟用WAL
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
對Receiver使用可靠性存儲StoreageLevel.MEMORY_AND_DISK_SER or StoreageLevel.MEMORY_AND_DISK_SER2
處理保證
由于Spark Streaming工作節(jié)點的容錯保障,Spark Streaming可以為所有的轉(zhuǎn)化操作提供 “精確一次”執(zhí)行的語義,即使一個工作節(jié)點在處理部分?jǐn)?shù)據(jù)時發(fā)生失敗,最終的轉(zhuǎn)化結(jié)
果(即轉(zhuǎn)化操作得到的 RDD)仍然與數(shù)據(jù)只被處理一次得到的結(jié)果一樣。
然而,當(dāng)把轉(zhuǎn)化操作得到的結(jié)果使用輸出操作推入外部系統(tǒng)中時,寫結(jié)果的任務(wù)可能因故 障而執(zhí)行多次,一些數(shù)據(jù)可能也就被寫了多次。由于這引入了外部系統(tǒng),因此我們需要專 門針對各系統(tǒng)的代碼來處理這樣的情況。我們可以使用事務(wù)操作來寫入外部系統(tǒng)(即原子 化地將一個 RDD 分區(qū)一次寫入),或者設(shè)計冪等的更新操作(即多次運行同一個更新操作 仍生成相同的結(jié)果)。比如 Spark Streaming 的 saveAs...File 操作會在一個文件寫完時自動 將其原子化地移動到最終位置上,以此確保每個輸出文件只存在一份。
性能考量
最常見的問題是Spark Streaming可以使用的最小批次間隔是多少。總的來說,500毫秒已經(jīng)被證實為對許多應(yīng)用而言是比較好的最小批次大小。尋找最小批次大小的最佳實踐是從一個比較大的批次大小(10 秒左右)開始,不斷使用更小的批次大小。如果 Streaming 用 戶界面中顯示的處理時間保持不變,你就可以進一步減小批次大小。如果處理時間開始增 加,你可能已經(jīng)達(dá)到了應(yīng)用的極限。
相似地,對于窗口操作,計算結(jié)果的間隔(也就是滑動步長)對于性能也有巨大的影響。 當(dāng)計算代價巨大并成為系統(tǒng)瓶頸時,就應(yīng)該考慮提高滑動步長了。
減少批處理所消耗時間的常見方式還有提高并行度。有以下三種方式可以提高并行度:
? 增加接收器數(shù)目 有時如果記錄太多導(dǎo)致單臺機器來不及讀入并分發(fā)的話,接收器會成為系統(tǒng)瓶頸。這時 你就需要通過創(chuàng)建多個輸入 DStream(這樣會創(chuàng)建多個接收器)來增加接收器數(shù)目,然 后使用 union 來把數(shù)據(jù)合并為一個數(shù)據(jù)源。
? 將收到的數(shù)據(jù)顯式地重新分區(qū)如果接收器數(shù)目無法再增加,你可以通過使用 DStream.repartition 來顯式重新分區(qū)輸 入流(或者合并多個流得到的數(shù)據(jù)流)來重新分配收到的數(shù)據(jù)。
? 提高聚合計算的并行度對于像 reduceByKey() 這樣的操作,你可以在第二個參數(shù)中指定并行度,我們在介紹 RDD 時提到過類似的手段。
高級解析
DStreamGraph對象解析
在 Spark Streaming 中,DStreamGraph 是一個非常重要的組件,主要用來:
1. 通過成員 inputStreams 持有 Spark Streaming 輸入源及接收數(shù)據(jù)的方式
2. 通過成員 outputStreams 持有 Streaming app 的 output 操作,并記錄 DStream 依賴關(guān)系
3. 生成每個 batch 對應(yīng)的 jobs
下面,通過分析一個簡單的例子,結(jié)合源碼分析來說明 DStreamGraph 是如何發(fā)揮作用的。例子如下:
val sparkConf = new SparkConf().setAppName("HdfsWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val lines = ssc.textFileStream(args(0))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
創(chuàng)建 DStreamGraph 實例
代碼val ssc = new StreamingContext(sparkConf, Seconds(2))創(chuàng)建了 StreamingContext 實例,StreamingContext 包含了 DStreamGraph 類型的成員graph,graph 在 StreamingContext主構(gòu)造函數(shù)中被創(chuàng)建,如下
private[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
cp_.graph.setContext(this)
cp_.graph.restoreCheckpointData()
cp_.graph
} else {
require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
val newGraph = new DStreamGraph()
newGraph.setBatchDuration(batchDur_)
newGraph
}
}
可以看到,若當(dāng)前 checkpoint 可用,會優(yōu)先從 checkpoint 恢復(fù) graph,否則新建一個。還可以從這里知道的一點是:graph 是運行在 driver 上的
DStreamGraph記錄輸入源及如何接收數(shù)據(jù)
DStreamGraph有和application 輸入數(shù)據(jù)相關(guān)的成員和方法,如下:
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
def addInputStream(inputStream: InputDStream[_]) {
this.synchronized {
inputStream.setGraph(this)
inputStreams += inputStream
}
}
成員inputStreams為 InputDStream 類型的數(shù)組,InputDStream是所有 input streams(數(shù)據(jù)輸入流) 的虛基類。該類提供了 start() 和 stop()方法供 streaming 系統(tǒng)來開始和停止接收數(shù)據(jù)。那些只需要在 driver 端接收數(shù)據(jù)并轉(zhuǎn)成 RDD 的 input streams 可以直接繼承 InputDStream,例如 FileInputDStream是 InputDStream 的子類,它監(jiān)控一個 HDFS 目錄并將新文件轉(zhuǎn)成RDDs。而那些需要在 workers 上運行receiver 來接收數(shù)據(jù)的 Input DStream,需要繼承 ReceiverInputDStream,比如 KafkaReceiver。
我們來看看val lines = ssc.textFileStream(args(0))調(diào)用。
為了更容易理解,我畫出了val lines = ssc.textFileStream(args(0))的調(diào)用流程
從上面的調(diào)用流程圖我們可以知道:
ssc.textFileStream會觸發(fā)新建一個FileInputDStream。FileInputDStream繼承于InputDStream,其start()方法定義了數(shù)據(jù)源及如何接收數(shù)據(jù)
在FileInputDStream構(gòu)造函數(shù)中,會調(diào)用ssc.graph.addInputStream(this),將自身添加到 DStreamGraph 的 inputStreams: ArrayBuffer[InputDStream[_]] 中,這樣 DStreamGraph 就知道了這個 Streaming App 的輸入源及如何接收數(shù)據(jù)??赡苣銜婀譃槭裁磇nputStreams 是數(shù)組類型,舉個例子,這里再來一個 val lines1 = ssc.textFileStream(args(0)),那么又將生成一個 FileInputStream 實例添加到inputStreams,所以這里需要集合類型
生成FileInputDStream調(diào)用其 map 方法,將以 FileInputDStream 本身作為 partent 來構(gòu)造新的 MappedDStream。對于 DStream 的 transform 操作,都將生成一個新的 DStream,和 RDD transform 生成新的 RDD 類似
與MappedDStream 不同,所有繼承了 InputDStream 的定義了輸入源及接收數(shù)據(jù)方式的 sreams 都沒有 parent,因為它們就是最初的 streams。
DStream 的依賴鏈
每個 DStream 的子類都會繼承 def dependencies: List[DStream[_]] = List()方法,該方法用來返回自己的依賴的父 DStream 列表。比如,沒有父DStream 的 InputDStream 的 dependencies方法返回List()。
MappedDStream 的實現(xiàn)如下:
class MappedDStream[T: ClassTag, U: ClassTag] (
parent: DStream[T],
mapFunc: T => U
) extends DStream[U](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
...
}
在上例中,構(gòu)造函數(shù)參數(shù)列表中的 parent 即在 ssc.textFileStream 中new 的定義了輸入源及數(shù)據(jù)接收方式的最初的 FileInputDStream實例,這里的 dependencies方法將返回該FileInputDStream實例,這就構(gòu)成了第一條依賴??捎萌缦聢D表示,這里特地將 input streams 用藍(lán)色表示,以強調(diào)其與普通由 transform 產(chǎn)生的 DStream 的不同:
繼續(xù)來看val words = lines.flatMap(_.split(" ")),flatMap如下:
def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope {
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
每一個 transform 操作都將創(chuàng)建一個新的 DStream,flatMap 操作也不例外,它會創(chuàng)建一個FlatMappedDStream,F(xiàn)latMappedDStream的實現(xiàn)如下:
class FlatMappedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
flatMapFunc: T => Traversable[U]
) extends DStream[U](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
...
}
與 MappedDStream 相同,F(xiàn)latMappedDStream#dependencies也返回其依賴的父 DStream,及 lines,到這里,依賴鏈就變成了下圖:
之后的幾步操作不再這樣具體分析,到生成wordCounts時,依賴圖將變成下面這樣:
在 DStream 中,與 transofrm 相對應(yīng)的是 output 操作,包括 print, saveAsTextFiles, saveAsObjectFiles, saveAsHadoopFiles, foreachRDD。output 操作中,會創(chuàng)建ForEachDStream實例并調(diào)用register方法將自身添加到DStreamGraph.outputStreams成員中,該ForEachDStream實例也會持有是調(diào)用的哪個 output 操作。本例的代碼調(diào)用如下,只需看箭頭所指幾行代碼
與 DStream transform 操作返回一個新的 DStream 不同,output 操作不會返回任何東西,只會創(chuàng)建一個ForEachDStream作為依賴鏈的終結(jié)。
至此,生成了完成的依賴鏈,也就是 DAG,如下圖(這里將 ForEachDStream 標(biāo)為黃色以顯示其與眾不同):
ReceiverTracker 與數(shù)據(jù)導(dǎo)入
Spark Streaming 在數(shù)據(jù)接收與導(dǎo)入方面需要滿足有以下三個特點:
兼容眾多輸入源,包括HDFS, Flume, Kafka, Twitter and ZeroMQ。還可以自定義數(shù)據(jù)源
要能為每個 batch 的 RDD 提供相應(yīng)的輸入數(shù)據(jù)
為適應(yīng) 7*24h 不間斷運行,要有接收數(shù)據(jù)掛掉的容錯機制
有容乃大,兼容眾多數(shù)據(jù)源
InputDStream是所有 input streams(數(shù)據(jù)輸入流) 的虛基類。該類提供了 start() 和 stop()方法供 streaming 系統(tǒng)來開始和停止接收數(shù)據(jù)。那些只需要在 driver 端接收數(shù)據(jù)并轉(zhuǎn)成 RDD 的 input streams 可以直接繼承 InputDStream,例如 FileInputDStream是 InputDStream 的子類,它監(jiān)控一個 HDFS 目錄并將新文件轉(zhuǎn)成RDDs。而那些需要在 workers 上運行receiver 來接收數(shù)據(jù)的 Input DStream,需要繼承 ReceiverInputDStream,比如 KafkaReceiver
只需在 driver 端接收數(shù)據(jù)的 input stream 一般比較簡單且在生產(chǎn)環(huán)境中使用的比較少,本文不作分析,只分析繼承了 ReceiverInputDStream 的 input stream 是如何導(dǎo)入數(shù)據(jù)的。
ReceiverInputDStream有一個def getReceiver(): Receiver[T]方法,每個繼承了ReceiverInputDStream的 input stream 都必須實現(xiàn)這個方法。該方法用來獲取將要分發(fā)到各個 worker 節(jié)點上用來接收數(shù)據(jù)的 receiver(接收器)。不同的 ReceiverInputDStream 子類都有它們對應(yīng)的不同的 receiver,如KafkaInputDStream對應(yīng)KafkaReceiver,F(xiàn)lumeInputDStream對應(yīng)FlumeReceiver,TwitterInputDStream對應(yīng)TwitterReceiver,如果你要實現(xiàn)自己的數(shù)據(jù)源,也需要定義相應(yīng)的 receiver。
繼承 ReceiverInputDStream 并定義相應(yīng)的 receiver,就是 Spark Streaming 能兼容眾多數(shù)據(jù)源的原因。
為每個 batch 的 RDD 提供輸入數(shù)據(jù)
在 StreamingContext 中,有一個重要的組件叫做 ReceiverTracker,它是 Spark Streaming 作業(yè)調(diào)度器 JobScheduler 的成員,負(fù)責(zé)啟動、管理各個 receiver 及管理各個 receiver 接收到的數(shù)據(jù)。
確定 receiver 要分發(fā)到哪些 executors 上執(zhí)行
創(chuàng)建 ReceiverTracker 實例
我們來看 StreamingContext#start() 方法部分調(diào)用實現(xiàn),如下:
可以看到,StreamingContext#start() 會調(diào)用 JobScheduler#start() 方法,在 JobScheduler#start() 中,會創(chuàng)建一個新的 ReceiverTracker 實例 receiverTracker,并調(diào)用其 start() 方法。
ReceiverTracker#start()
繼續(xù)跟進 ReceiverTracker#start(),如下圖,它主要做了兩件事:
初始化一個 endpoint: ReceiverTrackerEndpoint,用來接收和處理來自 ReceiverTracker 和 receivers 發(fā)送的消息
調(diào)用 launchReceivers 來自將各個 receivers 分發(fā)到 executors 上
ReceiverTracker#launchReceivers()
繼續(xù)跟進 launchReceivers,它也主要干了兩件事:
1. 獲取 DStreamGraph.inputStreams 中繼承了 ReceiverInputDStream 的 input streams 的 receivers。也就是數(shù)據(jù)接收器
2. 給消息接收處理器 endpoint 發(fā)送 StartAllReceivers(receivers)消息。直接返回,不等待消息被處理
處理StartAllReceivers消息
endpoint 在接收到消息后,會先判斷消息類型,對不同的消息做不同處理。對于StartAllReceivers消息,處理流程如下:
計算每個 receiver 要分發(fā)的目的 executors。遵循兩條原則:
將 receiver 分布的盡量均勻
如果 receiver 的preferredLocation本身不均勻,以preferredLocation為準(zhǔn)
遍歷每個 receiver,根據(jù)第1步中得到的目的 executors 調(diào)用 startReceiver 方法
到這里,已經(jīng)確定了每個 receiver 要分發(fā)到哪些 executors 上
啟動 receivers
接上,通過 ReceiverTracker#startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]) 來啟動 receivers,我們來看具體流程:
如上流程圖所述,分發(fā)和啟動 receiver 的方式不可謂不精彩。其中,startReceiverFunc 函數(shù)主要實現(xiàn)如下:
val supervisor = new ReceiverSupervisorImpl(receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
supervisor.start() 中會調(diào)用 receiver#onStart 后立即返回。receiver#onStart 一般自行新建線程或線程池來接收數(shù)據(jù),比如在 KafkaReceiver 中,就新建了線程池,在線程池中接收 topics 的數(shù)據(jù)。
supervisor.start() 返回后,由 supervisor.awaitTermination() 阻塞住線程,以讓這個 task 一直不退出,從而可以源源不斷接收數(shù)據(jù)。
數(shù)據(jù)流轉(zhuǎn)
上圖為 receiver 接收到的數(shù)據(jù)的流轉(zhuǎn)過程,讓我們來逐一分析
Step1: Receiver -> ReceiverSupervisor
這一步中,Receiver 將接收到的數(shù)據(jù)源源不斷地傳給 ReceiverSupervisor。Receiver 調(diào)用其 store(...) 方法,store 方法中繼續(xù)調(diào)用 supervisor.pushSingle 或 supervisor.pushArrayBuffer 等方法來傳遞數(shù)據(jù)。Receiver#store 有多重形式, ReceiverSupervisor 也有 pushSingle、pushArrayBuffer、pushIterator、pushBytes 方法與不同的 store 對應(yīng)。
pushSingle: 對應(yīng)單條小數(shù)據(jù)
pushArrayBuffer: 對應(yīng)數(shù)組形式的數(shù)據(jù)
pushIterator: 對應(yīng) iterator 形式數(shù)據(jù)
pushBytes: 對應(yīng) ByteBuffer 形式的塊數(shù)據(jù)
對于細(xì)小的數(shù)據(jù),存儲時需要 BlockGenerator 聚集多條數(shù)據(jù)成一塊,然后再成塊存儲;反之就不用聚集,直接成塊存儲。當(dāng)然,存儲操作并不在 Step1 中執(zhí)行,只為說明之后不同的操作邏輯。
Step2.1: ReceiverSupervisor -> BlockManager -> disk/memory
在這一步中,主要將從 receiver 收到的數(shù)據(jù)以 block(數(shù)據(jù)塊)的形式存儲
存儲 block 的是receivedBlockHandler: ReceivedBlockHandler,根據(jù)參數(shù)spark.streaming.receiver.writeAheadLog.enable配置的不同,默認(rèn)為 false,receivedBlockHandler對象對應(yīng)的類也不同,如下:
private val receivedBlockHandler: ReceivedBlockHandler = {
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
//< 先寫 WAL,再存儲到 executor 的內(nèi)存或硬盤
new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
} else {
//< 直接存到 executor 的內(nèi)存或硬盤
new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
}
}
啟動 WAL 的好處就是在application 掛掉之后,可以恢復(fù)數(shù)據(jù)。
//< 調(diào)用 receivedBlockHandler.storeBlock 方法存儲 block,并得到一個 blockStoreResult
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
//< 使用blockStoreResult初始化一個ReceivedBlockInfo實例
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
//< 發(fā)送消息通知 ReceiverTracker 新增并存儲了 block
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
不管是 WriteAheadLogBasedBlockHandler 還是 BlockManagerBasedBlockHandler 最終都是通過 BlockManager 將 block 數(shù)據(jù)存儲 execuor 內(nèi)存或磁盤或還有 WAL 方式存入。
這里需要說明的是 streamId,每個 InputDStream 都有它自己唯一的 id,即 streamId,blockInfo包含 streamId 是為了區(qū)分block 是哪個 InputDStream 的數(shù)據(jù)。之后為 batch 分配 blocks 時,需要知道每個 InputDStream 都有哪些未分配的 blocks。
Step2.2: ReceiverSupervisor -> ReceiverTracker
將 block 存儲之后,獲得 block 描述信息 blockInfo: ReceivedBlockInfo,這里面包含:streamId、數(shù)據(jù)位置、數(shù)據(jù)條數(shù)、數(shù)據(jù) size 等信息。
之后,封裝以 block 作為參數(shù)的 AddBlock(blockInfo) 消息并發(fā)送給 ReceiverTracker 以通知其有新增 block 數(shù)據(jù)塊。
Step3: ReceiverTracker -> ReceivedBlockTracker
ReceiverTracker 收到 ReceiverSupervisor 發(fā)來的 AddBlock(blockInfo) 消息后,直接調(diào)用以下代碼將 block 信息傳給 ReceivedBlockTracker:
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
receivedBlockTracker.addBlock(receivedBlockInfo)
}
receivedBlockTracker.addBlock中,如果啟用了 WAL,會將新增的 block 信息以 WAL 方式保存。
無論 WAL 是否啟用,都會將新增的 block 信息保存到 streamIdToUnallocatedBlockQueues: mutable.HashMap[Int, ReceivedBlockQueue]中,該變量 key 為 InputDStream 的唯一 id,value 為已存儲未分配的 block 信息。之后為 batch 分配blocks,會訪問該結(jié)構(gòu)來獲取每個 InputDStream 對應(yīng)的未消費的 blocks。
動態(tài)生成JOB
JobScheduler有兩個重要成員,一是ReceiverTracker,負(fù)責(zé)分發(fā) receivers 及源源不斷地接收數(shù)據(jù);二是JobGenerator,負(fù)責(zé)定時的生成 jobs 并 checkpoint。
定時邏輯
在 JobScheduler 的主構(gòu)造函數(shù)中,會創(chuàng)建 JobGenerator 對象。在 JobGenerator 的主構(gòu)造函數(shù)中,會創(chuàng)建一個定時器:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
該定時器每隔 ssc.graph.batchDuration.milliseconds 會執(zhí)行一次 eventLoop.post(GenerateJobs(new Time(longTime))) 向 eventLoop 發(fā)送 GenerateJobs(new Time(longTime))消息,eventLoop收到消息后會進行這個 batch 對應(yīng)的 jobs 的生成及提交執(zhí)行,eventLoop 是一個消息接收處理器。
需要注意的是,timer 在創(chuàng)建之后并不會馬上啟動,將在 StreamingContext#start() 啟動 Streaming Application 時間接調(diào)用到 timer.start(restartTime.milliseconds)才啟動。
為 batch 生成 jobs
eventLoop 在接收到 GenerateJobs(new Time(longTime))消息后的主要處理流程有以上圖中三步:
將已接收到的 blocks 分配給 batch
生成該 batch 對應(yīng)的 jobs
將 jobs 封裝成 JobSet 并提交執(zhí)行
接下來我們就將逐一展開這三步進行分析
將已接受到的 blocks 分配給 batch
上圖是根據(jù)源碼畫出的為 batch 分配 blocks 的流程圖,這里對 『獲得 batchTime 各個 InputDStream 未分配的 blocks』作進一步說明:
我們知道了各個 ReceiverInputDStream 對應(yīng)的 receivers 接收并保存的 blocks 信息會保存在 ReceivedBlockTracker#streamIdToUnallocatedBlockQueues,該成員 key 為 streamId,value 為該 streamId 對應(yīng)的 InputDStream 已接收保存但尚未分配的 blocks 信息。
所以獲取某 InputDStream 未分配的 blocks 只要以該 InputDStream 的 streamId 來從 streamIdToUnallocatedBlockQueues 來 get 就好。獲取之后,會清楚該 streamId 對應(yīng)的value,以保證 block 不會被重復(fù)分配。
在實際調(diào)用中,為 batchTime 分配 blocks 時,會從streamIdToUnallocatedBlockQueues取出未分配的 blocks 塞進 timeToAllocatedBlocks: mutable.HashMap[Time, AllocatedBlocks] 中,以在之后作為該 batchTime 對應(yīng)的 RDD 的輸入數(shù)據(jù)。
通過以上步驟,就可以為 batch 的所有 InputDStream 分配 blocks。也就是為 batch 分配了 blocks。
生成該 batch 對應(yīng)的 jobs
為指定 batchTime 生成 jobs 的邏輯如上圖所示。你可能會疑惑,為什么 DStreamGraph#generateJobs(time: Time)為什么返回 Seq[Job],而不是單個 job。這是因為,在一個 batch 內(nèi),可能會有多個 OutputStream 執(zhí)行了多次 output 操作,每次 output 操作都將產(chǎn)生一個 Job,最終就會產(chǎn)生多個 Jobs。
我們結(jié)合上圖對執(zhí)行流程進一步分析。
在DStreamGraph#generateJobs(time: Time)中,對于DStreamGraph成員ArrayBuffer[DStream[_]]的每一項,調(diào)用DStream#generateJob(time: Time)來生成這個 outputStream 在該 batchTime 的 job。該生成過程主要有三步:
Step1: 獲取該 outputStream 在該 batchTime 對應(yīng)的 RDD
每個 DStream 實例都有一個 generatedRDDs: HashMap[Time, RDD[T]] 成員,用來保存該 DStream 在每個 batchTime 生成的 RDD,當(dāng) DStream#getOrCompute(time: Time)調(diào)用時
首先會查看generatedRDDs中是否已經(jīng)有該 time 對應(yīng)的 RDD,若有則直接返回
若無,則調(diào)用compute(validTime: Time)來生成 RDD,這一步根據(jù)每個 InputDStream繼承 compute 的實現(xiàn)不同而不同。例如,對于 FileInputDStream,其 compute 實現(xiàn)邏輯如下:
1. 先通過一個 findNewFiles() 方法,找到多個新 file
2. 對每個新 file,都將其作為參數(shù)調(diào)用 sc.newAPIHadoopFile(file),生成一個 RDD 實例
3. 將 2 中的多個新 file 對應(yīng)的多個 RDD 實例進行 union,返回一個 union 后的 UnionRDD
Step2: 根據(jù) Step1中得到的 RDD 生成最終 job 要執(zhí)行的函數(shù) jobFunc
jobFunc定義如下:
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
}
可以看到,每個 outputStream 的 output 操作生成的 Job 其實與 RDD action 一樣,最終調(diào)用 SparkContext#runJob 來提交 RDD DAG 定義的任務(wù)
Step3: 根據(jù) Step2中得到的 jobFunc 生成最終要執(zhí)行的 Job 并返回
Step2中得到了定義 Job 要干嘛的函數(shù)-jobFunc,這里便以 jobFunc及 batchTime 生成 Job 實例:
Some(new Job(time, jobFunc))
該Job實例將最終封裝在 JobHandler 中被執(zhí)行
至此,我們搞明白了 JobScheduler 是如何通過一步步調(diào)用來動態(tài)生成每個 batchTime 的 jobs。下文我們將分析這些動態(tài)生成的 jobs 如何被分發(fā)及如何執(zhí)行。
job 的提交與執(zhí)行
我們分析了 JobScheduler 是如何動態(tài)為每個 batch生成 jobs,那么生成的 jobs 是如何被提交的。
在 JobScheduler 生成某個 batch 對應(yīng)的 Seq[Job] 之后,會將 batch 及 Seq[Job] 封裝成一個 JobSet 對象,JobSet 持有某個 batch 內(nèi)所有的 jobs,并記錄各個 job 的運行狀態(tài)。
之后,調(diào)用JobScheduler#submitJobSet(jobSet: JobSet)來提交 jobs,在該函數(shù)中,除了一些狀態(tài)更新,主要任務(wù)就是執(zhí)行
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
即,對于 jobSet 中的每一個 job,執(zhí)行jobExecutor.execute(new JobHandler(job)),要搞懂這行代碼干了什么,就必須了解 JobHandler 及 jobExecutor。
JobHandler
JobHandler 繼承了 Runnable,為了說明與 job 的關(guān)系,其精簡后的實現(xiàn)如下:
private class JobHandler(job: Job) extends Runnable with Logging {
import JobScheduler._
def run() {
_eventLoop.post(JobStarted(job))
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
job.run()
_eventLoop = eventLoop
}
if (_eventLoop != null) {
_eventLoop.post(JobCompleted(job))
}
}
}
JobHandler#run 方法主要執(zhí)行了 job.run(),該方法最終將調(diào)用到
『生成該 batch 對應(yīng)的 jobs的Step2 定義的 jobFunc』,jonFunc 將提交對應(yīng) RDD DAG 定義的 job。
JobExecutor
知道了 JobHandler 是用來執(zhí)行 job 的,那么 JobHandler 將在哪里執(zhí)行 job 呢?答案是
jobExecutor,jobExecutor為 JobScheduler 成員,是一個線程池,在JobScheduler 主構(gòu)造函數(shù)中創(chuàng)建,如下:
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor = ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
JobHandler 將最終在 線程池jobExecutor 的線程中被調(diào)用,jobExecutor的線程數(shù)可通過spark.streaming.concurrentJobs配置,默認(rèn)為1。若配置多個線程,就能讓多個 job 同時運行,若只有一個線程,那么同一時刻只能有一個 job 運行。
以上,即 jobs 被執(zhí)行的邏輯。
Block 的生成與存儲
ReceiverSupervisorImpl共提供了4個將從 receiver 傳遞過來的數(shù)據(jù)轉(zhuǎn)換成 block 并存儲的方法,分別是:
pushSingle: 處理單條數(shù)據(jù)
pushArrayBuffer: 處理數(shù)組形式數(shù)據(jù)
pushIterator: 處理 iterator 形式處理
pushBytes: 處理 ByteBuffer 形式數(shù)據(jù)
其中,pushArrayBuffer、pushIterator、pushBytes最終調(diào)用pushAndReportBlock;而pushSingle將調(diào)用defaultBlockGenerator.addData(data),我們分別就這兩種形式做說明
pushAndReportBlock
我們針對存儲 block 簡化 pushAndReportBlock 后的代碼如下:
def pushAndReportBlock(
receivedBlock: ReceivedBlock,
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
) {
...
val blockId = blockIdOption.getOrElse(nextBlockId)
receivedBlockHandler.storeBlock(blockId, receivedBlock)
...
}
首先獲取一個新的 blockId,之后調(diào)用 receivedBlockHandler.storeBlock, receivedBlockHandler 在 ReceiverSupervisorImpl 構(gòu)造函數(shù)中初始化。當(dāng)啟用了 checkpoint 且 spark.streaming.receiver.writeAheadLog.enable 為 true 時,receivedBlockHandler 被初始化為 WriteAheadLogBasedBlockHandler 類型;否則將初始化為 BlockManagerBasedBlockHandler類型。
WriteAheadLogBasedBlockHandler#storeBlock 將 ArrayBuffer, iterator, bytes 類型的數(shù)據(jù)序列化后得到的 serializedBlock
交由 BlockManager 根據(jù)設(shè)置的 StorageLevel 存入 executor 的內(nèi)存或磁盤中
通過 WAL 再存儲一份
而BlockManagerBasedBlockHandler#storeBlock將 ArrayBuffer, iterator, bytes 類型的數(shù)據(jù)交由 BlockManager 根據(jù)設(shè)置的 StorageLevel 存入 executor 的內(nèi)存或磁盤中,并不再通過 WAL 存儲一份
pushSingle
pushSingle將調(diào)用 BlockGenerator#addData(data: Any) 通過積攢的方式來存儲數(shù)據(jù)。接下來對 BlockGenerator 是如何積攢一條一條數(shù)據(jù)最后寫入 block 的邏輯
上圖為 BlockGenerator 的各個成員,首選對各個成員做介紹:
currentBuffer
變長數(shù)組,當(dāng) receiver 接收的一條一條的數(shù)據(jù)將會添加到該變長數(shù)組的尾部
可能會有一個 receiver 的多個線程同時進行添加數(shù)據(jù),這里是同步操作
添加前,會由 rateLimiter 檢查一下速率,是否加入的速度過快。如果過快的話就需要 block 住,等到下一秒再開始添加。最高頻率由 spark.streaming.receiver.maxRate 控制,默認(rèn)值為 Long.MaxValue,具體含義是單個 Receiver 每秒鐘允許添加的條數(shù)。
blockIntervalTimer & blockIntervalMs
分別是定時器和時間間隔。blockIntervalTimer中有一個線程,每隔blockIntervalMs會執(zhí)行以下操作:
將 currentBuffer 賦值給 newBlockBuffer
將 currentBuffer 指向新的空的 ArrayBuffer 對象
將 newBlockBuffer 封裝成 newBlock
將 newBlock 添加到 blocksForPushing 隊列中blockIntervalMs 由 spark.streaming.blockInterval 控制,默認(rèn)是 200ms。
blockPushingThread & blocksForPushing & blockQueueSize
blocksForPushing 是一個定長數(shù)組,長度由 blockQueueSize 決定,默認(rèn)為10,可通過 spark.streaming.blockQueueSize 改變。上面分析到,blockIntervalTimer中的線程會定時將 block 塞入該隊列。
還有另一條線程不斷送該隊列中取出 block,然后調(diào)用 ReceiverSupervisorImpl.pushArrayBuffer(...) 來將 block 存儲,這條線程就是blockPushingThread。
PS: blocksForPushing為ArrayBlockingQueue類型。ArrayBlockingQueue是一個阻塞隊列,能夠自定義隊列大小,當(dāng)插入時,如果隊列已經(jīng)沒有空閑位置,那么新的插入線程將阻塞到該隊列,一旦該隊列有空閑位置,那么阻塞的線程將執(zhí)行插入文章來源:http://www.zghlxwxcb.cn/news/detail-782662.html
以上,通過分析各個成員,也說明了 BlockGenerator 是如何存儲單條數(shù)據(jù)的。文章來源地址http://www.zghlxwxcb.cn/news/detail-782662.html
到了這里,關(guān)于spark介紹之spark streaming的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!