国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

spark介紹之spark streaming

這篇具有很好參考價值的文章主要介紹了spark介紹之spark streaming。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

  1. Spark Streaming概述

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔
  1. 什么是Spark Streaming

Spark Streaming類似于Apache Storm,用于流式數(shù)據(jù)的處理。根據(jù)其官方文檔介紹,Spark Streaming有高吞吐量和容錯能力強等特點。Spark Streaming支持的數(shù)據(jù)輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。數(shù)據(jù)輸入后可以用Spark的高度抽象原語如:map、reduce、join、window等進行運算。而結(jié)果也能保存在很多地方,如HDFS,數(shù)據(jù)庫等。另外Spark Streaming也能和MLlib(機器學(xué)習(xí))以及Graphx完美融合。

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

和Spark基于RDD的概念很相似,Spark Streaming使用離散化流(discretized stream)作為抽象表示,叫作DStream。DStream 是隨時間推移而收到的數(shù)據(jù)的序列。在內(nèi)部,每個時間區(qū)間收到的數(shù)據(jù)都作為 RDD 存在,而 DStream 是由這些 RDD 所組成的序列(因此 得名“離散化”)。

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

DStream 可以從各種輸入源創(chuàng)建,比如 Flume、Kafka 或者 HDFS。創(chuàng)建出來的DStream 支持兩種操作,一種是轉(zhuǎn)化操作(transformation),會生成一個新的DStream,另一種是輸出操作(output operation),可以把數(shù)據(jù)寫入外部系統(tǒng)中。DStream 提供了許多與 RDD 所支持的操作相類似的操作支持,還增加了與時間相關(guān)的新操作,比如滑動窗口。

  1. 為什么要學(xué)習(xí)Spark Streaming

  1. 易用

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔
  1. 容錯

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔
  1. 易整合到Spark體系

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔
  1. Spark與Storm的對比

Spark

Storm

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔
spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

開發(fā)語言:Scala

開發(fā)語言:Clojure

編程模型:DStream

編程模型:Spout/Bolt

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔
spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔
  1. 運行Spark Streaming

  1. IDEA編寫程序

Pom.xml 加入以下依賴:

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming_2.11</artifactId>

<version>${spark.version}</version>

<scope>provided</scope>

</dependency>

package com.atguigu.streaming

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

object WorldCount {

def main(args: Array[String]) {

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

val ssc = new StreamingContext(conf, Seconds(1))

// Create a DStream that will connect to hostname:port, like localhost:9999

val lines = ssc.socketTextStream("master01", 9999)

// Split each line into words

val words = lines.flatMap(_.split(" "))

//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Count each word in each batch

val pairs = words.map(word => (word, 1))

val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console

wordCounts.print()

ssc.start() // Start the computation

ssc.awaitTermination() // Wait for the computation to terminate

}

}

按照Spark Core中的方式進行打包,并將程序上傳到Spark機器上。并運行:

bin/spark-submit --class com.atguigu.streaming.WorldCount ~/wordcount-jar-with-dependencies.jar

通過Netcat發(fā)送數(shù)據(jù):

# TERMINAL 1:

# Running Netcat

$ nc -lk 9999

hello world

如果程序運行時,log日志太多,可以將spark conf目錄下的log4j文件里面的日志級別改成WARN。

  1. 架構(gòu)與抽象

Spark Streaming使用“微批次”的架構(gòu),把流式計算當(dāng)作一系列連續(xù)的小規(guī)模批處理來對待。Spark Streaming從各種輸入源中讀取數(shù)據(jù),并把數(shù)據(jù)分組為小的批次。新的批次按均勻的時間間隔創(chuàng)建出來。在每個時間區(qū)間開始的時候,一個新的批次就創(chuàng)建出來,在該區(qū)間內(nèi)收到的數(shù)據(jù)都會被添加到這個批次中。在時間區(qū)間結(jié)束時,批次停止增長。時間區(qū)間的大小是由批次間隔這個參數(shù)決定的。批次間隔一般設(shè)在500毫秒到幾秒之間,由應(yīng)用開發(fā)者配置。每個輸入批次都形成一個RDD,以 Spark 作業(yè)的方式處理并生成其他的 RDD。 處理的結(jié)果可以以批處理的方式傳給外部系統(tǒng)。高層次的架構(gòu)如圖

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

Spark Streaming的編程抽象是離散化流,也就是DStream。它是一個 RDD 序列,每個RDD代表數(shù)據(jù)流中一個時間片內(nèi)的數(shù)據(jù)。

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

Spark Streaming在Spark的驅(qū)動器程序—工作節(jié)點的結(jié)構(gòu)的執(zhí)行過程如下圖所示。Spark Streaming為每個輸入源啟動對 應(yīng)的接收器。接收器以任務(wù)的形式運行在應(yīng)用的執(zhí)行器進程中,從輸入源收集數(shù)據(jù)并保存為 RDD。它們收集到輸入數(shù)據(jù)后會把數(shù)據(jù)復(fù)制到另一個執(zhí)行器進程來保障容錯性(默 認(rèn)行為)。數(shù)據(jù)保存在執(zhí)行器進程的內(nèi)存中,和緩存 RDD 的方式一樣。驅(qū)動器程序中的 StreamingContext 會周期性地運行 Spark 作業(yè)來處理這些數(shù)據(jù),把數(shù)據(jù)與之前時間區(qū)間中的 RDD 進行整合。

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔
  1. Spark Streaming解析

  1. 初始化StreamingContext

import org.apache.spark._

import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)

val ssc = new StreamingContext(conf, Seconds(1))

// 可以通過ssc.sparkContext 來訪問SparkContext

// 或者通過已經(jīng)存在的SparkContext來創(chuàng)建StreamingContext

import org.apache.spark.streaming._

val sc = ... // existing SparkContext

val ssc = new StreamingContext(sc, Seconds(1))

初始化完Context之后:

  1. 定義消息輸入源來創(chuàng)建DStreams.

  1. 定義DStreams的轉(zhuǎn)化操作和輸出操作。

  1. 通過 streamingContext.start()來啟動消息采集和處理.

  1. 等待程序終止,可以通過streamingContext.awaitTermination()來設(shè)置

  1. 通過streamingContext.stop()來手動終止處理程序。

StreamingContext和SparkContext什么關(guān)系?

import org.apache.spark.streaming._

val sc = ... // existing SparkContext

val ssc = new StreamingContext(sc, Seconds(1))

注意:

StreamingContext一旦啟動,對DStreams的操作就不能修改了。

在同一時間一個JVM中只有一個StreamingContext可以啟動

stop() 方法將同時停止SparkContext,可以傳入?yún)?shù)stopSparkContext用于只停止StreamingContext

在Spark1.4版本后,如何優(yōu)雅的停止SparkStreaming而不丟失數(shù)據(jù),通過設(shè)置sparkConf.set("spark.streaming.stopGracefullyOnShutdown","true") 即可。在StreamingContext的start方法中已經(jīng)注冊了Hook方法。

  1. 什么是DStreams

Discretized Stream是Spark Streaming的基礎(chǔ)抽象,代表持續(xù)性的數(shù)據(jù)流和經(jīng)過各種Spark原語操作后的結(jié)果數(shù)據(jù)流。在內(nèi)部實現(xiàn)上,DStream是一系列連續(xù)的RDD來表示。每個RDD含有一段時間間隔內(nèi)的數(shù)據(jù),如下圖:

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

對數(shù)據(jù)的操作也是按照RDD為單位來進行的

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

計算過程由Spark engine來完成

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔
  1. DStreams輸入

Spark Streaming原生支持一些不同的數(shù)據(jù)源。一些“核心”數(shù)據(jù)源已經(jīng)被打包到Spark Streaming 的 Maven 工件中,而其他的一些則可以通過 spark-streaming-kafka 等附加工件獲取。每個接收器都以 Spark 執(zhí)行器程序中一個長期運行的任務(wù)的形式運行,因此會占據(jù)分配給應(yīng)用的 CPU 核心。此外,我們還需要有可用的 CPU 核心來處理數(shù)據(jù)。這意味著如果要運行多個接收器,就必須至少有和接收器數(shù)目相同的核心數(shù),還要加上用來完成計算所需要的核心數(shù)。例如,如果我們想要在流計算應(yīng)用中運行 10 個接收器,那么至少需要為應(yīng)用分配 11 個 CPU 核心。所以如果在本地模式運行,不要使用local或者local[1]。

  1. 基本數(shù)據(jù)源

  1. 文件數(shù)據(jù)源

Socket數(shù)據(jù)流前面的例子已經(jīng)看到過。

文件數(shù)據(jù)流:能夠讀取所有HDFS API兼容的文件系統(tǒng)文件,通過fileStream方法進行讀取

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming 將會監(jiān)控 dataDirectory 目錄并不斷處理移動進來的文件,記住目前不支持嵌套目錄。

文件需要有相同的數(shù)據(jù)格式

文件進入 dataDirectory的方式需要通過移動或者重命名來實現(xiàn)。

一旦文件移動進目錄,則不能再修改,即便修改了也不會讀取新數(shù)據(jù)。

如果文件比較簡單,則可以使用 streamingContext.textFileStream(dataDirectory)方法來讀取文件。文件流不需要接收器,不需要單獨分配CPU核。

Hdfs讀取實例:

提前需要在HDFS上建好目錄。

scala> import org.apache.spark.streaming._

import org.apache.spark.streaming._

scala> val ssc = new StreamingContext(sc, Seconds(1))

ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@4027edeb

scala> val lines = ssc.textFileStream("hdfs://master01:9000/data/")

lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@61d9dd15

scala> val words = lines.flatMap(_.split(" "))

words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@1e084a26

scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@8947a4b

scala> wordCounts.print()

scala> ssc.start()

上傳文件上去:

[bigdata@master01 hadoop-2.7.3]$ ls

bin data etc include lib libexec LICENSE.txt logs NOTICE.txt README.txt sbin sdata share

[bigdata@master01 hadoop-2.7.3]$ bin/hdfs dfs -put ./LICENSE.txt /data/

[bigdata@master01 hadoop-2.7.3]$ bin/hdfs dfs -put ./README.txt /data/

獲取計算結(jié)果:

-------------------------------------------

Time: 1504665716000 ms

-------------------------------------------

-------------------------------------------

Time: 1504665717000 ms

-------------------------------------------

-------------------------------------------

Time: 1504665718000 ms

-------------------------------------------

(227.7202-1,2)

(created,2)

(offer,8)

(BUSINESS,11)

(agree,10)

(hereunder,,1)

(“control”,1)

(Grant,2)

(2.2.,2)

(include,11)

...

-------------------------------------------

Time: 1504665719000 ms


-------------------------------------------


Time: 1504665739000 ms


-------------------------------------------


-------------------------------------------


Time: 1504665740000 ms


-------------------------------------------


(under,1)


(Technology,1)


(distribution,2)


(http://hadoop.apache.org/core/,1)


(Unrestricted,1)


(740.13),1)


(check,1)


(have,1)


(policies,1)


(uses,1)


...


-------------------------------------------


Time: 1504665741000 ms


-------------------------------------------


  1. 自定義數(shù)據(jù)源

通過繼承Receiver,并實現(xiàn)onStart、onStop方法來自定義數(shù)據(jù)源采集。

class CustomReceiver(host: String, port: Int)

extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

def onStart() {

// Start the thread that receives data over a connection

new Thread("Socket Receiver") {

override def run() { receive() }

}.start()

}

def onStop() {

// There is nothing much to do as the thread calling receive()

// is designed to stop by itself if isStopped() returns false

}

/** Create a socket connection and receive data until receiver is stopped */

private def receive() {

var socket: Socket = null

var userInput: String = null

try {

// Connect to host:port

socket = new Socket(host, port)

// Until stopped or connection broken continue reading

val reader = new BufferedReader(

new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))

userInput = reader.readLine()

while(!isStopped && userInput != null) {

store(userInput)

userInput = reader.readLine()

}

reader.close()

socket.close()

// Restart in an attempt to connect again when server is active again

restart("Trying to connect again")

} catch {

case e: java.net.ConnectException =>

// restart if could not connect to server

restart("Error connecting to " + host + ":" + port, e)

case t: Throwable =>

// restart if there is any other error

restart("Error receiving data", t)

}

}

}

可以通過streamingContext.receiverStream(<instance of custom receiver>)

來使用自定義的數(shù)據(jù)采集源

// Assuming ssc is the StreamingContext

val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))

val words = lines.flatMap(_.split(" "))

...

模擬Spark內(nèi)置的Socket鏈接:

package com.atguigu.streaming

import java.io.{BufferedReader, InputStreamReader}

import java.net.Socket

import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf

import org.apache.spark.storage.StorageLevel

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver (host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

override def onStart(): Unit = {

// Start the thread that receives data over a connection

new Thread("Socket Receiver") {

override def run() { receive() }

}.start()

}

override def onStop(): Unit = {

// There is nothing much to do as the thread calling receive()

// is designed to stop by itself if isStopped() returns false

}

/** Create a socket connection and receive data until receiver is stopped */

private def receive() {

var socket: Socket = null

var userInput: String = null

try {

// Connect to host:port

socket = new Socket(host, port)

// Until stopped or connection broken continue reading

val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))

userInput = reader.readLine()

while(!isStopped && userInput != null) {

// 傳送出來

store(userInput)

userInput = reader.readLine()

}

reader.close()

socket.close()

// Restart in an attempt to connect again when server is active again

restart("Trying to connect again")

} catch {

case e: java.net.ConnectException =>

// restart if could not connect to server

restart("Error connecting to " + host + ":" + port, e)

case t: Throwable =>

// restart if there is any other error

restart("Error receiving data", t)

}

}

}

object CustomReceiver {

def main(args: Array[String]) {

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

val ssc = new StreamingContext(conf, Seconds(1))

// Create a DStream that will connect to hostname:port, like localhost:9999

val lines = ssc.receiverStream(new CustomReceiver("master01", 9999))

// Split each line into words

val words = lines.flatMap(_.split(" "))

//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Count each word in each batch

val pairs = words.map(word => (word, 1))

val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console

wordCounts.print()

ssc.start() // Start the computation

ssc.awaitTermination() // Wait for the computation to terminate

//ssc.stop()

}

}

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔
spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔
  1. RDD隊列

測試過程中,可以通過使用streamingContext.queueStream(queueOfRDDs)來創(chuàng)建DStream,每一個推送到這個隊列中的RDD,都會作為一個DStream處理。

package com.atguigu.streaming

import org.apache.spark.SparkConf

import org.apache.spark.rdd.RDD

import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object QueueRdd {

def main(args: Array[String]) {

val conf = new SparkConf().setMaster("local[2]").setAppName("QueueRdd")

val ssc = new StreamingContext(conf, Seconds(1))

// Create the queue through which RDDs can be pushed to

// a QueueInputDStream

//創(chuàng)建RDD隊列

val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()

// Create the QueueInputDStream and use it do some processing

// 創(chuàng)建QueueInputDStream

val inputStream = ssc.queueStream(rddQueue)

//處理隊列中的RDD數(shù)據(jù)

val mappedStream = inputStream.map(x => (x % 10, 1))

val reducedStream = mappedStream.reduceByKey(_ + _)

//打印結(jié)果

reducedStream.print()

//啟動計算

ssc.start()

// Create and push some RDDs into

for (i <- 1 to 30) {

rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)

Thread.sleep(2000)

//通過程序停止StreamingContext的運行

//ssc.stop()

}

}

}

[bigdata@master01 spark-2.1.1-bin-hadoop2.7]$ bin/spark-submit --class com.atguigu.streaming.QueueRdd ~/queueRdd-jar-with-dependencies.jar

17/09/05 23:28:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

-------------------------------------------

Time: 1504668485000 ms

-------------------------------------------

(4,30)

(0,30)

(6,30)

(8,30)

(2,30)

(1,30)

(3,30)

(7,30)

(9,30)

(5,30)

-------------------------------------------

Time: 1504668486000 ms

-------------------------------------------

-------------------------------------------

Time: 1504668487000 ms

-------------------------------------------

(4,30)

(0,30)

(6,30)

(8,30)

(2,30)

(1,30)

(3,30)

(7,30)

(9,30)

(5,30)

  1. 高級數(shù)據(jù)源

除核心數(shù)據(jù)源外,還可以用附加數(shù)據(jù)源接收器來從一些知名數(shù)據(jù)獲取系統(tǒng)中接收的數(shù)據(jù),這些接收器都作為Spark Streaming的組件進行獨立打包了。它們?nèi)匀皇荢park的一部分,不過你需要在構(gòu)建文件中添加額外的包才能使用它們?,F(xiàn)有的接收器包括 Twitter、Apache Kafka、Amazon Kinesis、Apache Flume,以及ZeroMQ??梢酝ㄟ^添加與Spark版本匹配 的 Maven 工件 spark-streaming-[projectname]_2.10 來引入這些附加接收器。

  1. Apache Kafka

在工程中需要引入 Maven 工件 spark- streaming-kafka_2.10 來使用它。包內(nèi)提供的 KafkaUtils 對象可以在 StreamingContext 和 JavaStreamingContext 中以你的 Kafka 消息創(chuàng)建出 DStream。由于 KafkaUtils 可以訂閱多個主題,因此它創(chuàng)建出的 DStream 由成對的主題和消息組成。要創(chuàng)建出一個流數(shù)據(jù),需 要使用 StreamingContext 實例、一個由逗號隔開的 ZooKeeper 主機列表字符串、消費者組的名字(唯一名字),以及一個從主題到針對這個主題的接收器線程數(shù)的映射表來調(diào)用 createStream() 方法

import org.apache.spark.streaming.kafka._...// 創(chuàng)建一個從主題到接收器線程數(shù)的映射表

val topics = List(("pandas", 1), ("logs", 1)).toMap

val topicLines = KafkaUtils.createStream(ssc, zkQuorum, group, topics)

topicLines.map(_._2)

下面我們進行一個實例,演示SparkStreaming如何從Kafka讀取消息,如果通過連接池方法把消息處理完成后再寫會Kafka:

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

kafka Connection Pool程序:

package com.atguigu.streaming

import java.util.Properties

import org.apache.commons.pool2.impl.DefaultPooledObject

import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

case class KafkaProducerProxy(brokerList: String,

producerConfig: Properties = new Properties,

defaultTopic: Option[String] = None,

producer: Option[KafkaProducer[String, String]] = None) {

type Key = String

type Val = String

require(brokerList == null || !brokerList.isEmpty, "Must set broker list")

private val p = producer getOrElse {

var props:Properties= new Properties();

props.put("bootstrap.servers", brokerList);

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

new KafkaProducer[String,String](props)

}

private def toMessage(value: Val, key: Option[Key] = None, topic: Option[String] = None): ProducerRecord[Key, Val] = {

val t = topic.getOrElse(defaultTopic.getOrElse(throw new IllegalArgumentException("Must provide topic or default topic")))

require(!t.isEmpty, "Topic must not be empty")

key match {

case Some(k) => new ProducerRecord(t, k, value)

case _ => new ProducerRecord(t, value)

}

}

def send(key: Key, value: Val, topic: Option[String] = None) {

p.send(toMessage(value, Option(key), topic))

}

def send(value: Val, topic: Option[String]) {

send(null, value, topic)

}

def send(value: Val, topic: String) {

send(null, value, Option(topic))

}

def send(value: Val) {

send(null, value, None)

}

def shutdown(): Unit = p.close()

}

abstract class KafkaProducerFactory(brokerList: String, config: Properties, topic: Option[String] = None) extends Serializable {

def newInstance(): KafkaProducerProxy

}

class BaseKafkaProducerFactory(brokerList: String,

config: Properties = new Properties,

defaultTopic: Option[String] = None)

extends KafkaProducerFactory(brokerList, config, defaultTopic) {

override def newInstance() = new KafkaProducerProxy(brokerList, config, defaultTopic)

}

class PooledKafkaProducerAppFactory(val factory: KafkaProducerFactory)

extends BasePooledObjectFactory[KafkaProducerProxy] with Serializable {

override def create(): KafkaProducerProxy = factory.newInstance()

override def wrap(obj: KafkaProducerProxy): PooledObject[KafkaProducerProxy] = new DefaultPooledObject(obj)

override def destroyObject(p: PooledObject[KafkaProducerProxy]): Unit = {

p.getObject.shutdown()

super.destroyObject(p)

}

}

KafkaStreaming main:

package com.atguigu.streaming

import org.apache.commons.pool2.impl.{GenericObjectPool, GenericObjectPoolConfig}

import org.apache.kafka.clients.consumer.ConsumerRecord

import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf

import org.apache.spark.api.java.function.VoidFunction

import org.apache.spark.rdd.RDD

import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

import org.apache.spark.streaming.{Seconds, StreamingContext}

object createKafkaProducerPool{

def apply(brokerList: String, topic: String): GenericObjectPool[KafkaProducerProxy] = {

val producerFactory = new BaseKafkaProducerFactory(brokerList, defaultTopic = Option(topic))

val pooledProducerFactory = new PooledKafkaProducerAppFactory(producerFactory)

val poolConfig = {

val c = new GenericObjectPoolConfig

val maxNumProducers = 10

c.setMaxTotal(maxNumProducers)

c.setMaxIdle(maxNumProducers)

c

}

new GenericObjectPool[KafkaProducerProxy](pooledProducerFactory, poolConfig)

}

}

object KafkaStreaming{

def main(args: Array[String]) {

val conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")

val ssc = new StreamingContext(conf, Seconds(1))

//創(chuàng)建topic

val brobrokers = "172.16.148.150:9092,172.16.148.151:9092,172.16.148.152:9092"

val sourcetopic="source";

val targettopic="target";

//創(chuàng)建消費者組

var group="con-consumer-group"

//消費者配置

val kafkaParam = Map(

"bootstrap.servers" -> brobrokers,//用于初始化鏈接到集群的地址

"key.deserializer" -> classOf[StringDeserializer],

"value.deserializer" -> classOf[StringDeserializer],

//用于標(biāo)識這個消費者屬于哪個消費團體

"group.id" -> group,

//如果沒有初始化偏移量或者當(dāng)前的偏移量不存在任何服務(wù)器上,可以使用這個配置屬性

//可以使用這個配置,latest自動重置偏移量為最新的偏移量

"auto.offset.reset" -> "latest",

//如果是true,則這個消費者的偏移量會在后臺自動提交

"enable.auto.commit" -> (false: java.lang.Boolean)

);

//ssc.sparkContext.broadcast(pool)

//創(chuàng)建DStream,返回接收到的輸入數(shù)據(jù)

var stream=KafkaUtils.createDirectStream[String,String](ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(sourcetopic),kafkaParam))

//每一個stream都是一個ConsumerRecord

stream.map(s =>("id:" + s.key(),">>>>:"+s.value())).foreachRDD(rdd => {

rdd.foreachPartition(partitionOfRecords => {

// Get a producer from the shared pool

val pool = createKafkaProducerPool(brobrokers, targettopic)

val p = pool.borrowObject()

partitionOfRecords.foreach {message => System.out.println(message._2);p.send(message._2,Option(targettopic))}

// Returning the producer to the pool also shuts it down

pool.returnObject(p)

})

})

ssc.start()

ssc.awaitTermination()

}

}

程序部署:

1、啟動zookeeper和kafka。

bin/kafka-server-start.sh -deamon ./config/server.properties

2、創(chuàng)建兩個topic,一個為source,一個為target

bin/kafka-topics.sh --create --zookeeper 192.168.56.150:2181,192.168.56.151:2181,192.168.56.152:2181 --replication-factor 2 --partitions 2 --topic source

bin/kafka-topics.sh --create --zookeeper 172.16.148.150:2181,172.16.148.151:2181,172.16.148.152:2181 --replication-factor 2 --partitions 2 --topic target

3、啟動kafka console producer 寫入source topic

bin/kafka-console-producer.sh --broker-list 192.168.56.150:9092, 192.168.56.151:9092, 192.168.56.152:9092 --topic source

4、啟動kafka console consumer 監(jiān)聽target topic

bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.150:9092, 192.168.56.151:9092, 192.168.56.152:9092 --topic source

5、啟動kafkaStreaming程序:

[bigdata@master01 ~]$ ./hadoop/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class com.atguigu.streaming.KafkaStreaming ./kafkastreaming-jar-with-dependencies.jar

6、程序運行截圖:

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔
  1. Spark對Kafka兩種連接方式的對比

Spark對于Kafka的連接主要有兩種方式,一種是DirectKafkaInputDStream,另外一種是KafkaInputDStream。DirectKafkaInputDStream 只在 driver 端接收數(shù)據(jù),所以繼承了 InputDStream,是沒有 receivers 的。

主要通過KafkaUtils#createDirectStream以及KafkaUtils#createStream這兩個 API 來創(chuàng)建,除了要傳入的參數(shù)不同外,接收 kafka 數(shù)據(jù)的節(jié)點、拉取數(shù)據(jù)的時機也完全不同。

KafkaUtils#createStream【Receiver-based】

這種方法使用一個 Receiver 來接收數(shù)據(jù)。在該 Receiver 的實現(xiàn)中使用了 Kafka high-level consumer API。Receiver 從 kafka 接收的數(shù)據(jù)將被存儲到 Spark executor 中,隨后啟動的 job 將處理這些數(shù)據(jù)。

在默認(rèn)配置下,該方法失敗后會丟失數(shù)據(jù)(保存在 executor 內(nèi)存里的數(shù)據(jù)在 application 失敗后就沒了),若要保證數(shù)據(jù)不丟失,需要啟用 WAL(即預(yù)寫日志至 HDFS、S3等),這樣再失敗后可以從日志文件中恢復(fù)數(shù)據(jù)。

在該函數(shù)中,會新建一個 KafkaInputDStream對象,KafkaInputDStream繼承于 ReceiverInputDStream。KafkaInputDStream實現(xiàn)了getReceiver方法,返回接收器的實例:

def getReceiver(): Receiver[(K, V)] = {

if (!useReliableReceiver) {

//< 不啟用 WAL

new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)

} else {

//< 啟用 WAL

new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)

}

}

根據(jù)是否啟用 WAL,receiver 分為 KafkaReceiver 和 ReliableKafkaReceiver。下圖描述了 KafkaReceiver 接收數(shù)據(jù)的具體流程:

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

需要注意的點:

  • · Kafka Topic 的 partitions 與RDD 的 partitions 沒有直接關(guān)系,不能一一對應(yīng)。如果增加 topic 的 partition 個數(shù)的話僅僅會增加單個 Receiver 接收數(shù)據(jù)的線程數(shù)。事實上,使用這種方法只會在一個 executor 上啟用一個 Receiver,該 Receiver 包含一個線程池,線程池的線程個數(shù)與所有 topics 的 partitions 個數(shù)總和一致,每條線程接收一個 topic 的一個 partition 的數(shù)據(jù)。而并不會增加處理數(shù)據(jù)時的并行度。

  • · 對于一個 topic,可以使用多個 groupid 相同的 input DStream 來使用多個 Receivers 來增加并行度,然后 union 他們;對于多個 topics,除了可以用上個辦法增加并行度外,還可以對不同的 topic 使用不同的 input DStream 然后 union 他們來增加并行度

  • · 如果你啟用了 WAL,為能將接收到的數(shù)據(jù)將以 log 的方式在指定的存儲系統(tǒng)備份一份,需要指定輸入數(shù)據(jù)的存儲等級為 StorageLevel.MEMORY_AND_DISK_SER 或 StorageLevel.MEMORY_AND_DISK_SER_2

KafkaUtils#createDirectStream【W(wǎng)ithOut Receiver】

自 Spark-1.3.0 起,提供了不需要 Receiver 的方法。替代了使用 receivers 來接收數(shù)據(jù),該方法定期查詢每個 topic+partition 的 lastest offset,并據(jù)此決定每個 batch 要接收的 offsets 范圍。

KafkaUtils#createDirectStream調(diào)用中,會新建DirectKafkaInputDStream,DirectKafkaInputDStream#compute(validTime: Time)會從 kafka 拉取數(shù)據(jù)并生成 RDD,流程如下:

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

如上圖所示,該函數(shù)主要做了以下三個事情:

確定要接收的 partitions 的 offsetRange,以作為第2步創(chuàng)建的 RDD 的數(shù)據(jù)來源

創(chuàng)建 RDD 并執(zhí)行 count 操作,使 RDD 真實具有數(shù)據(jù)

以 streamId、數(shù)據(jù)條數(shù),offsetRanges 信息初始化 inputInfo 并添加到 JobScheduler 中

進一步看 KafkaRDD 的 getPartitions 實現(xiàn):

override def getPartitions: Array[Partition] = {

offsetRanges.zipWithIndex.map { case (o, i) =>

val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))

new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)

}.toArray

}

從上面的代碼可以很明顯看到,KafkaRDD 的 partition 數(shù)據(jù)與 Kafka topic 的某個 partition 的 o.fromOffset 至 o.untilOffset 數(shù)據(jù)是相對應(yīng)的,也就是說 KafkaRDD 的 partition 與 Kafka partition 是一一對應(yīng)的

該方式相比使用 Receiver 的方式有以下好處:

簡化并行:不再需要創(chuàng)建多個 kafka input DStream 然后再 union 這些 input DStream。使用 directStream,Spark Streaming會創(chuàng)建與 Kafka partitions 相同數(shù)量的 paritions 的 RDD,RDD 的 partition與 Kafka 的 partition 一一對應(yīng),這樣更易于理解及調(diào)優(yōu)

高效:在方式一中要保證數(shù)據(jù)零丟失需要啟用 WAL(預(yù)寫日志),這會占用更多空間。而在方式二中,可以直接從 Kafka 指定的 topic 的指定 offsets 處恢復(fù)數(shù)據(jù),不需要使用 WAL

恰好一次語義保證:基于Receiver方式使用了 Kafka 的 high level API 來在 Zookeeper 中存儲已消費的 offsets。這在某些情況下會導(dǎo)致一些數(shù)據(jù)被消費兩次,比如 streaming app 在處理某個 batch 內(nèi)已接受到的數(shù)據(jù)的過程中掛掉,但是數(shù)據(jù)已經(jīng)處理了一部分,但這種情況下無法將已處理數(shù)據(jù)的 offsets 更新到 Zookeeper 中,下次重啟時,這批數(shù)據(jù)將再次被消費且處理?;赿irect的方式,使用kafka的簡單api,Spark Streaming自己就負(fù)責(zé)追蹤消費的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保證數(shù)據(jù)是消費一次且僅消費一次。這種方式中,只要將 output 操作和保存 offsets 操作封裝成一個原子操作就能避免失敗后的重復(fù)消費和處理,從而達(dá)到恰好一次的語義(Exactly-once)

通過以上分析,我們可以對這兩種方式的區(qū)別做一個總結(jié):

createStream會使用 Receiver;而createDirectStream不會

createStream使用的 Receiver 會分發(fā)到某個 executor 上去啟動并接受數(shù)據(jù);而createDirectStream直接在 driver 上接收數(shù)據(jù)

createStream使用 Receiver 源源不斷的接收數(shù)據(jù)并把數(shù)據(jù)交給 ReceiverSupervisor 處理最終存儲為 blocks 作為 RDD 的輸入,從 kafka 拉取數(shù)據(jù)與計算消費數(shù)據(jù)相互獨立;而createDirectStream會在每個 batch 拉取數(shù)據(jù)并就地消費,到下個 batch 再次拉取消費,周而復(fù)始,從 kafka 拉取數(shù)據(jù)與計算消費數(shù)據(jù)是連續(xù)的,沒有獨立開

createStream中創(chuàng)建的KafkaInputDStream 每個 batch 所對應(yīng)的 RDD 的 partition 不與 Kafka partition 一一對應(yīng);而createDirectStream中創(chuàng)建的 DirectKafkaInputDStream 每個 batch 所對應(yīng)的 RDD 的 partition 與 Kafka partition 一一對應(yīng)

  1. Flume-ng

Spark提供兩個不同的接收器來使用Apache Flume(http://flume.apache.org/,見圖10-8)。 兩個接收器簡介如下。

? 推式接收器該接收器以 Avro 數(shù)據(jù)池的方式工作,由 Flume 向其中推數(shù)據(jù)。

? 拉式接收器該接收器可以從自定義的中間數(shù)據(jù)池中拉數(shù)據(jù),而其他進程可以使用 Flume 把數(shù)據(jù)推進 該中間數(shù)據(jù)池。

兩種方式都需要重新配置 Flume,并在某個節(jié)點配置的端口上運行接收器(不是已有的 Spark 或者 Flume 使用的端口)。要使用其中任何一種方法,都需要在工程中引入 Maven 工件 spark-streaming-flume_2.10。

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

推式接收器的方法設(shè)置起來很容易,但是它不使用事務(wù)來接收數(shù)據(jù)。在這種方式中,接收 器以 Avro 數(shù)據(jù)池的方式工作,我們需要配置 Flume 來把數(shù)據(jù)發(fā)到 Avro 數(shù)據(jù)池。我們提供的 FlumeUtils 對象會把接收器配置在一個特定的工作節(jié)點的主機名及端口號上。這些設(shè)置必須和 Flume 配置相匹配。

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

雖然這種方式很簡潔,但缺點是沒有事務(wù)支持。這會增加運行接收器的工作節(jié)點發(fā)生錯誤 時丟失少量數(shù)據(jù)的幾率。不僅如此,如果運行接收器的工作節(jié)點發(fā)生故障,系統(tǒng)會嘗試從 另一個位置啟動接收器,這時需要重新配置 Flume 才能將數(shù)據(jù)發(fā)給新的工作節(jié)點。這樣配 置會比較麻煩。

較新的方式是拉式接收器(在Spark 1.1中引入),它設(shè)置了一個專用的Flume數(shù)據(jù)池供 Spark Streaming讀取,并讓接收器主動從數(shù)據(jù)池中拉取數(shù)據(jù)。這種方式的優(yōu)點在于彈性較 好,Spark Streaming通過事務(wù)從數(shù)據(jù)池中讀取并復(fù)制數(shù)據(jù)。在收到事務(wù)完成的通知前,這 些數(shù)據(jù)還保留在數(shù)據(jù)池中。

我們需要先把自定義數(shù)據(jù)池配置為 Flume 的第三方插件。安裝插件的最新方法請參考 Flume 文檔的相關(guān)部分(https://flume.apache.org/FlumeUserGuide.html#installing-third-party- plugins)。由于插件是用 Scala 寫的,因此需要把插件本身以及 Scala 庫都添加到 Flume 插件 中。Spark 1.1 中對應(yīng)的 Maven 索引如例 10-37 所示。

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming-flume-sink_2.11</artifactId>

<version>1.2.0</version>

</dependency>

<dependency>

<groupId>org.scala-lang</groupId>

<artifactId>scala-library</artifactId>

<version>2.11.11</version>

</dependency>

當(dāng)你把自定義 Flume 數(shù)據(jù)池添加到一個節(jié)點上之后,就需要配置 Flume 來把數(shù)據(jù)推送到這個數(shù)據(jù)池中,

a1.sinks = spark

a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink

a1.sinks.spark.hostname = receiver-hostname

a1.sinks.spark.port = port-used-for-sync-not-spark-port

a1.sinks.spark.channel = memoryChannel

等到數(shù)據(jù)已經(jīng)在數(shù)據(jù)池中緩存起來,就可以調(diào)用 FlumeUtils 來讀取數(shù)據(jù)了

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔
  1. DStreams轉(zhuǎn)換

DStream上的原語與RDD的類似,分為Transformations(轉(zhuǎn)換)和Output Operations(輸出)兩種,此外轉(zhuǎn)換操作中還有一些比較特殊的原語,如:updateStateByKey()、transform()以及各種Window相關(guān)的原語。

Transformation

Meaning

map(func)

將源DStream中的每個元素通過一個函數(shù)func從而得到新的DStreams。

flatMap(func)

和map類似,但是每個輸入的項可以被映射為0或更多項。

filter(func)

選擇源DStream中函數(shù)func判為true的記錄作為新DStreams

repartition(numPartitions)

通過創(chuàng)建更多或者更少的partition來改變此DStream的并行級別。

union(otherStream)

聯(lián)合源DStreams和其他DStreams來得到新DStream

count()

統(tǒng)計源DStreams中每個RDD所含元素的個數(shù)得到單元素RDD的新DStreams。

reduce(func)

通過函數(shù)func(兩個參數(shù)一個輸出)來整合源DStreams中每個RDD元素得到單元素RDD的DStreams。這個函數(shù)需要關(guān)聯(lián)從而可以被并行計算。

countByValue()

對于DStreams中元素類型為K調(diào)用此函數(shù),得到包含(K,Long)對的新DStream,其中Long值表明相應(yīng)的K在源DStream中每個RDD出現(xiàn)的頻率。

reduceByKey(func, [numTasks])

對(K,V)對的DStream調(diào)用此函數(shù),返回同樣(K,V)對的新DStream,但是新DStream中的對應(yīng)V為使用reduce函數(shù)整合而來。Note:默認(rèn)情況下,這個操作使用Spark默認(rèn)數(shù)量的并行任務(wù)(本地模式為2,集群模式中的數(shù)量取決于配置參數(shù)spark.default.parallelism)。你也可以傳入可選的參數(shù)numTaska來設(shè)置不同數(shù)量的任務(wù)。

join(otherStream, [numTasks])

兩DStream分別為(K,V)和(K,W)對,返回(K,(V,W))對的新DStream。

cogroup(otherStream, [numTasks])

兩DStream分別為(K,V)和(K,W)對,返回(K,(Seq[V],Seq[W])對新DStreams

transform(func)

將RDD到RDD映射的函數(shù)func作用于源DStream中每個RDD上得到新DStream。這個可用于在DStream的RDD上做任意操作。

updateStateByKey(func)

得到”狀態(tài)”DStream,其中每個key狀態(tài)的更新是通過將給定函數(shù)用于此key的上一個狀態(tài)和新值而得到。這個可用于保存每個key值的任意狀態(tài)數(shù)據(jù)。

DStream 的轉(zhuǎn)化操作可以分為無狀態(tài)(stateless)和有狀態(tài)(stateful)兩種。

? 在無狀態(tài)轉(zhuǎn)化操作中,每個批次的處理不依賴于之前批次的數(shù)據(jù)。常見的 RDD 轉(zhuǎn)化操作,例如 map()、filter()、reduceByKey() 等,都是無狀態(tài)轉(zhuǎn)化操作。

? 相對地,有狀態(tài)轉(zhuǎn)化操作需要使用之前批次的數(shù)據(jù)或者是中間結(jié)果來計算當(dāng)前批次的數(shù)據(jù)。有狀態(tài)轉(zhuǎn)化操作包括基于滑動窗口的轉(zhuǎn)化操作和追蹤狀態(tài)變化的轉(zhuǎn)化操作。

  1. 無狀態(tài)轉(zhuǎn)化操作

無狀態(tài)轉(zhuǎn)化操作就是把簡單的 RDD 轉(zhuǎn)化操作應(yīng)用到每個批次上,也就是轉(zhuǎn)化 DStream 中的每一個 RDD。部分無狀態(tài)轉(zhuǎn)化操作列在了下表中。 注意,針對鍵值對的 DStream 轉(zhuǎn)化操作(比如 reduceByKey())要添加import StreamingContext._ 才能在 Scala中使用。

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

需要記住的是,盡管這些函數(shù)看起來像作用在整個流上一樣,但事實上每個 DStream 在內(nèi)部是由許多 RDD(批次)組成,且無狀態(tài)轉(zhuǎn)化操作是分別應(yīng)用到每個 RDD 上的。例如, reduceByKey() 會歸約每個時間區(qū)間中的數(shù)據(jù),但不會歸約不同區(qū)間之間的數(shù)據(jù)。

舉個例子,在之前的wordcount程序中,我們只會統(tǒng)計1秒內(nèi)接收到的數(shù)據(jù)的單詞個數(shù),而不會累加。

無狀態(tài)轉(zhuǎn)化操作也能在多個 DStream 間整合數(shù)據(jù),不過也是在各個時間區(qū)間內(nèi)。例如,鍵 值對 DStream 擁有和 RDD 一樣的與連接相關(guān)的轉(zhuǎn)化操作,也就是 cogroup()、join()、 leftOuterJoin() 等。我們可以在 DStream 上使用這些操作,這樣就對每個批次分別執(zhí)行了對應(yīng)的 RDD 操作。

我們還可以像在常規(guī)的 Spark 中一樣使用 DStream 的 union() 操作將它和另一個 DStream 的內(nèi)容合并起來,也可以使用 StreamingContext.union() 來合并多個流。

  1. 有狀態(tài)轉(zhuǎn)化操作

特殊的Transformations

  1. 追蹤狀態(tài)變化UpdateStateByKey

UpdateStateByKey原語用于記錄歷史記錄,有時,我們需要在 DStream 中跨批次維護狀態(tài)(例如流計算中累加wordcount)。針對這種情況,updateStateByKey() 為我們提供了對一個狀態(tài)變量的訪問,用于鍵值對形式的 DStream。給定一個由(鍵,事件)對構(gòu)成的 DStream,并傳遞一個指定如何根據(jù)新的事件 更新每個鍵對應(yīng)狀態(tài)的函數(shù),它可以構(gòu)建出一個新的 DStream,其內(nèi)部數(shù)據(jù)為(鍵,狀態(tài)) 對。

updateStateByKey() 的結(jié)果會是一個新的 DStream,其內(nèi)部的 RDD 序列是由每個時間區(qū)間對應(yīng)的(鍵,狀態(tài))對組成的。

updateStateByKey操作使得我們可以在用新信息進行更新時保持任意的狀態(tài)。為使用這個功能,你需要做下面兩步:

1. 定義狀態(tài),狀態(tài)可以是一個任意的數(shù)據(jù)類型。

2. 定義狀態(tài)更新函數(shù),用此函數(shù)闡明如何使用之前的狀態(tài)和來自輸入流的新值對狀態(tài)進行更新。

使用updateStateByKey需要對檢查點目錄進行配置,會使用檢查點來保存狀態(tài)。

更新版的wordcount:

package com.atguigu.streaming

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

object WorldCount {

def main(args: Array[String]) {

// 定義更新狀態(tài)方法,參數(shù)values為當(dāng)前批次單詞頻度,state為以往批次單詞頻度

val updateFunc = (values: Seq[Int], state: Option[Int]) => {

val currentCount = values.foldLeft(0)(_ + _)

val previousCount = state.getOrElse(0)

Some(currentCount + previousCount)

}

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

val ssc = new StreamingContext(conf, Seconds(3))

ssc.checkpoint(".")

// Create a DStream that will connect to hostname:port, like localhost:9999

val lines = ssc.socketTextStream("master01", 9999)

// Split each line into words

val words = lines.flatMap(_.split(" "))

//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Count each word in each batch

val pairs = words.map(word => (word, 1))

// 使用updateStateByKey來更新狀態(tài),統(tǒng)計從運行開始以來單詞總的次數(shù)

val stateDstream = pairs.updateStateByKey[Int](updateFunc)

stateDstream.print()

//val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console

//wordCounts.print()

ssc.start() // Start the computation

ssc.awaitTermination() // Wait for the computation to terminate

//ssc.stop()

}

}

啟動nc –lk 9999

[bigdata@master01 ~]$ nc -lk 9999

ni shi shui

ni hao ma

啟動統(tǒng)計程序:

[bigdata@master01 ~]$ ./hadoop/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class com.atguigu.streaming.WorldCount ./statefulwordcount-jar-with-dependencies.jar

17/09/06 04:06:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

-------------------------------------------

Time: 1504685175000 ms

-------------------------------------------

-------------------------------------------

Time: 1504685181000 ms

-------------------------------------------

(shi,1)

(shui,1)

(ni,1)

-------------------------------------------

Time: 1504685187000 ms

-------------------------------------------

(shi,1)

(ma,1)

(hao,1)

(shui,1)

(ni,2)

[bigdata@master01 ~]$ ls

2df8e0c3-174d-401a-b3a7-f7776c3987db checkpoint-1504685205000 data

backup checkpoint-1504685205000.bk debug.log

checkpoint-1504685199000 checkpoint-1504685208000 hadoop

checkpoint-1504685199000.bk checkpoint-1504685208000.bk receivedBlockMetadata

checkpoint-1504685202000 checkpoint-1504685211000 software

checkpoint-1504685202000.bk checkpoint-1504685211000.bk statefulwordcount-jar-with-dependencies.jar

  1. Window Operations

Window Operations有點類似于Storm中的State,可以設(shè)置窗口的大小和滑動窗口的間隔來動態(tài)的獲取當(dāng)前Steaming的允許狀態(tài)。

基于窗口的操作會在一個比 StreamingContext 的批次間隔更長的時間范圍內(nèi),通過整合多個批次的結(jié)果,計算出整個窗口的結(jié)果。

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

所有基于窗口的操作都需要兩個參數(shù),分別為窗口時長以及滑動步長,兩者都必須是 StreamContext 的批次間隔的整數(shù)倍。窗口時長控制每次計算最近的多少個批次的數(shù)據(jù),其實就是最近的 windowDuration/batchInterval 個批次。如果有一個以 10 秒為批次間隔的源 DStream,要創(chuàng)建一個最近 30 秒的時間窗口(即最近 3 個批次),就應(yīng)當(dāng)把 windowDuration 設(shè)為 30 秒。而滑動步長的默認(rèn)值與批次間隔相等,用來控制對新的 DStream 進行計算的間隔。如果源 DStream 批次間隔為 10 秒,并且我們只希望每兩個批次計算一次窗口結(jié)果, 就應(yīng)該把滑動步長設(shè)置為 20 秒。

假設(shè),你想拓展前例從而每隔十秒對持續(xù)30秒的數(shù)據(jù)生成word count。為做到這個,我們需要在持續(xù)30秒數(shù)據(jù)的(word,1)對DStream上應(yīng)用reduceByKey。使用操作reduceByKeyAndWindow.

# reduce last 30 seconds of data, every 10 second


windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x -y, 30, 20)


spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

Transformation

Meaning

window(windowLength, slideInterval)

基于對源DStream窗化的批次進行計算返回一個新的DStream

countByWindow(windowLength, slideInterval)

返回一個滑動窗口計數(shù)流中的元素。

reduceByWindow(func, windowLength, slideInterval)

通過使用自定義函數(shù)整合滑動區(qū)間流元素來創(chuàng)建一個新的單元素流。

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

當(dāng)在一個(K,V)對的DStream上調(diào)用此函數(shù),會返回一個新(K,V)對的DStream,此處通過對滑動窗口中批次數(shù)據(jù)使用reduce函數(shù)來整合每個key的value值。Note:默認(rèn)情況下,這個操作使用Spark的默認(rèn)數(shù)量并行任務(wù)(本地是2),在集群模式中依據(jù)配置屬性(spark.default.parallelism)來做grouping。你可以通過設(shè)置可選參數(shù)numTasks來設(shè)置不同數(shù)量的tasks。

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

這個函數(shù)是上述函數(shù)的更高效版本,每個窗口的reduce值都是通過用前一個窗的reduce值來遞增計算。通過reduce進入到滑動窗口數(shù)據(jù)并”反向reduce”離開窗口的舊數(shù)據(jù)來實現(xiàn)這個操作。一個例子是隨著窗口滑動對keys的“加”“減”計數(shù)。通過前邊介紹可以想到,這個函數(shù)只適用于”可逆的reduce函數(shù)”,也就是這些reduce函數(shù)有相應(yīng)的”反reduce”函數(shù)(以參數(shù)invFunc形式傳入)。如前述函數(shù),reduce任務(wù)的數(shù)量通過可選參數(shù)來配置。注意:為了使用這個操作,檢查點必須可用。

countByValueAndWindow(windowLength,slideInterval, [numTasks])

對(K,V)對的DStream調(diào)用,返回(K,Long)對的新DStream,其中每個key的值是其在滑動窗口中頻率。如上,可配置reduce任務(wù)數(shù)量。

reduceByWindow() 和 reduceByKeyAndWindow() 讓我們可以對每個窗口更高效地進行歸約操作。它們接收一個歸約函數(shù),在整個窗口上執(zhí)行,比如 +。除此以外,它們還有一種特殊形式,通過只考慮新進入窗口的數(shù)據(jù)和離開窗 口的數(shù)據(jù),讓 Spark 增量計算歸約結(jié)果。這種特殊形式需要提供歸約函數(shù)的一個逆函數(shù),比 如 + 對應(yīng)的逆函數(shù)為 -。對于較大的窗口,提供逆函數(shù)可以大大提高執(zhí)行效率

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))

val ipCountDStream = ipDStream.reduceByKeyAndWindow(

{(x, y) => x + y},

{(x, y) => x - y},

Seconds(30),

Seconds(10))

// 加上新進入窗口的批次中的元素 // 移除離開窗口的老批次中的元素 // 窗口時長// 滑動步長

countByWindow() 和 countByValueAndWindow() 作為對數(shù)據(jù)進行 計數(shù)操作的簡寫。countByWindow() 返回一個表示每個窗口中元素個數(shù)的 DStream,而 countByValueAndWindow() 返回的 DStream 則包含窗口中每個值的個數(shù),

val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}

val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10))

val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))

WordCount第三版:3秒一個批次,窗口12秒,滑步6秒。

package com.atguigu.streaming

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

object WorldCount {

def main(args: Array[String]) {

// 定義更新狀態(tài)方法,參數(shù)values為當(dāng)前批次單詞頻度,state為以往批次單詞頻度

val updateFunc = (values: Seq[Int], state: Option[Int]) => {

val currentCount = values.foldLeft(0)(_ + _)

val previousCount = state.getOrElse(0)

Some(currentCount + previousCount)

}

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

val ssc = new StreamingContext(conf, Seconds(3))

ssc.checkpoint(".")

// Create a DStream that will connect to hostname:port, like localhost:9999

val lines = ssc.socketTextStream("master01", 9999)

// Split each line into words

val words = lines.flatMap(_.split(" "))

//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Count each word in each batch

val pairs = words.map(word => (word, 1))

val wordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(12), Seconds(6))

// Print the first ten elements of each RDD generated in this DStream to the console

wordCounts.print()

ssc.start() // Start the computation

ssc.awaitTermination() // Wait for the computation to terminate

//ssc.stop()

}

}

  1. 重要操作

  1. Transform Operation

Transform原語允許DStream上執(zhí)行任意的RDD-to-RDD函數(shù)。即使這些函數(shù)并沒有在DStream的API中暴露出來,通過該函數(shù)可以方便的擴展Spark API。

該函數(shù)每一批次調(diào)度一次。

比如下面的例子,在進行單詞統(tǒng)計的時候,想要過濾掉spam的信息。

其實也就是對DStream中的RDD應(yīng)用轉(zhuǎn)換。

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>

rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning

...

}

  1. Join 操作

連接操作(leftOuterJoin, rightOuterJoin, fullOuterJoin也可以),可以連接Stream-Stream,windows-stream to windows-stream、stream-dataset

Stream-Stream Joins

val stream1: DStream[String, String] = ...

val stream2: DStream[String, String] = ...

val joinedStream = stream1.join(stream2)

val windowedStream1 = stream1.window(Seconds(20))

val windowedStream2 = stream2.window(Minutes(1))

val joinedStream = windowedStream1.join(windowedStream2)

Stream-dataset joins

val dataset: RDD[String, String] = ...

val windowedStream = stream.window(Seconds(20))...

val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

  1. DStreams輸出

輸出操作指定了對流數(shù)據(jù)經(jīng)轉(zhuǎn)化操作得到的數(shù)據(jù)所要執(zhí)行的操作(例如把結(jié)果推入外部數(shù)據(jù)庫或輸出到屏幕上)。與 RDD 中的惰性求值類似,如果一個 DStream 及其派生出的 DStream 都沒有被執(zhí)行輸出操作,那么這些 DStream 就都不會被求值。如果 StreamingContext 中沒有設(shè)定輸出操作,整個 context 就都不會啟動。

Output Operation

Meaning

print()

在運行流程序的驅(qū)動結(jié)點上打印DStream中每一批次數(shù)據(jù)的最開始10個元素。這用于開發(fā)和調(diào)試。在Python API中,同樣的操作叫pprint()。

saveAsTextFiles(prefix, [suffix])

以text文件形式存儲這個DStream的內(nèi)容。每一批次的存儲文件名基于參數(shù)中的prefix和suffix?!眕refix-Time_IN_MS[.suffix]”.

saveAsObjectFiles(prefix, [suffix])

以Java對象序列化的方式將Stream中的數(shù)據(jù)保存為 SequenceFiles . 每一批次的存儲文件名基于參數(shù)中的為"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。

saveAsHadoopFiles(prefix, [suffix])

將Stream中的數(shù)據(jù)保存為 Hadoop files. 每一批次的存儲文件名基于參數(shù)中的為"prefix-TIME_IN_MS[.suffix]".

Python API Python中目前不可用。

foreachRDD(func)

這是最通用的輸出操作,即將函數(shù)func用于產(chǎn)生于stream的每一個RDD。其中參數(shù)傳入的函數(shù)func應(yīng)該實現(xiàn)將每一個RDD中數(shù)據(jù)推送到外部系統(tǒng),如將RDD存入文件或者通過網(wǎng)絡(luò)將其寫入數(shù)據(jù)庫。注意:函數(shù)func在運行流應(yīng)用的驅(qū)動中被執(zhí)行,同時其中一般函數(shù)RDD操作從而強制其對于流RDD的運算。

通用的輸出操作 foreachRDD(),它用來對 DStream 中的 RDD 運行任意計算。這和transform() 有些類似,都可以讓我們訪問任意 RDD。在 foreachRDD() 中,可以重用我們在 Spark 中實現(xiàn)的所有行動操作。比如,常見的用例之一是把數(shù)據(jù)寫到諸如 MySQL 的外部數(shù)據(jù)庫中。

需要注意的:

連接不能寫在driver層面

如果寫在foreach則每個RDD都創(chuàng)建,得不償失

增加foreachPartition,在分區(qū)創(chuàng)建

可以考慮使用連接池優(yōu)化

dstream.foreachRDD { rdd =>

// error val connection = createNewConnection() // executed at the driver 序列化錯誤

rdd.foreachPartition { partitionOfRecords =>

// ConnectionPool is a static, lazily initialized pool of connections

val connection = ConnectionPool.getConnection()

partitionOfRecords.foreach(record => connection.send(record) // executed at the worker

)

ConnectionPool.returnConnection(connection) // return to the pool for future reuse

}

}

  1. 累加器和廣播變量

累加器(Accumulators)和廣播變量(Broadcast variables)不能從Spark Streaming的檢查點中恢復(fù)。如果你啟用檢查并也使用了累加器和廣播變量,那么你必須創(chuàng)建累加器和廣播變量的延遲單實例從而在驅(qū)動因失效重啟后他們可以被重新實例化。如下例述:

object WordBlacklist {

@volatile private var instance: Broadcast[Seq[String]] = null

def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {

if (instance == null) {

synchronized {

if (instance == null) {

val wordBlacklist = Seq("a", "b", "c")

instance = sc.broadcast(wordBlacklist)

}

}

}

instance

}

}

object DroppedWordsCounter {

@volatile private var instance: LongAccumulator = null

def getInstance(sc: SparkContext): LongAccumulator = {

if (instance == null) {

synchronized {

if (instance == null) {

instance = sc.longAccumulator("WordsInBlacklistCounter")

}

}

}

instance

}

}

wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>

// Get or register the blacklist Broadcast

val blacklist = WordBlacklist.getInstance(rdd.sparkContext)

// Get or register the droppedWordsCounter Accumulator

val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)

// Use blacklist to drop words and use droppedWordsCounter to count them

val counts = rdd.filter { case (word, count) =>

if (blacklist.value.contains(word)) {

droppedWordsCounter.add(count)

false

} else {

true

}

}.collect().mkString("[", ", ", "]")

val output = "Counts at time " + time + " " + counts

})

  1. DataFrame ans SQL Operations

你可以很容易地在流數(shù)據(jù)上使用DataFrames和SQL。你必須使用SparkContext來創(chuàng)建StreamingContext要用的SQLContext。此外,這一過程可以在驅(qū)動失效后重啟。我們通過創(chuàng)建一個實例化的SQLContext單實例來實現(xiàn)這個工作。如下例所示。我們對前例word count進行修改從而使用DataFrames和SQL來產(chǎn)生word counts。每個RDD被轉(zhuǎn)換為DataFrame,以臨時表格配置并用SQL進行查詢。

val words: DStream[String] = ...

words.foreachRDD { rdd =>

// Get the singleton instance of SparkSession

val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()

import spark.implicits._

// Convert RDD[String] to DataFrame

val wordsDataFrame = rdd.toDF("word")

// Create a temporary view

wordsDataFrame.createOrReplaceTempView("words")

// Do word count on DataFrame using SQL and print it

val wordCountsDataFrame =

spark.sql("select word, count(*) as total from words group by word")

wordCountsDataFrame.show()

}

你也可以從不同的線程在定義于流數(shù)據(jù)的表上運行SQL查詢(也就是說,異步運行StreamingContext)。僅確定你設(shè)置StreamingContext記住了足夠數(shù)量的流數(shù)據(jù)以使得查詢操作可以運行。否則,StreamingContext不會意識到任何異步的SQL查詢操作,那么其就會在查詢完成之后刪除舊的數(shù)據(jù)。例如,如果你要查詢最后一批次,但是你的查詢會運行5分鐘,那么你需要調(diào)用streamingContext.remember(Minutes(5))(in Scala, 或者其他語言的等價操作)。

  1. Caching / Persistence

和RDDs類似,DStreams同樣允許開發(fā)者將流數(shù)據(jù)保存在內(nèi)存中。也就是說,在DStream上使用persist()方法將會自動把DStreams中的每個RDD保存在內(nèi)存中。當(dāng)DStream中的數(shù)據(jù)要被多次計算時,這個非常有用(如在同樣數(shù)據(jù)上的多次操作)。對于像reduceByWindow和reduceByKeyAndWindow以及基于狀態(tài)的(updateStateByKey)這種操作,保存是隱含默認(rèn)的。因此,即使開發(fā)者沒有調(diào)用persist(),由基于窗操作產(chǎn)生的DStreams會自動保存在內(nèi)存中。

  1. 7x24 不間斷運行

  1. 檢查點機制

檢查點機制是我們在Spark Streaming中用來保障容錯性的主要機制。與應(yīng)用程序邏輯無關(guān)的錯誤(即系統(tǒng)錯位,JVM崩潰等)有迅速恢復(fù)的能力.

它可以使Spark Streaming階段性地把應(yīng)用數(shù)據(jù)存儲到諸如HDFS或Amazon S3這樣的可靠存儲系統(tǒng)中, 以供恢復(fù)時使用。具體來說,檢查點機制主要為以下兩個目的服務(wù)。

控制發(fā)生失敗時需要重算的狀態(tài)數(shù)。SparkStreaming可以通 過轉(zhuǎn)化圖的譜系圖來重算狀態(tài),檢查點機制則可以控制需要在轉(zhuǎn)化圖中回溯多遠(yuǎn)。

提供驅(qū)動器程序容錯。如果流計算應(yīng)用中的驅(qū)動器程序崩潰了,你可以重啟驅(qū)動器程序 并讓驅(qū)動器程序從檢查點恢復(fù),這樣Spark Streaming就可以讀取之前運行的程序處理 數(shù)據(jù)的進度,并從那里繼續(xù)。

了實現(xiàn)這個,Spark Streaming需要為容錯存儲系統(tǒng)checkpoint足夠的信息從而使得其可以從失敗中恢復(fù)過來。有兩種類型的數(shù)據(jù)設(shè)置檢查點。

Metadata checkpointing:將定義流計算的信息存入容錯的系統(tǒng)如HDFS。元數(shù)據(jù)包括:

配置 – 用于創(chuàng)建流應(yīng)用的配置。

DStreams操作 – 定義流應(yīng)用的DStreams操作集合。

不完整批次 – 批次的工作已進行排隊但是并未完成。

Data checkpointing: 將產(chǎn)生的RDDs存入可靠的存儲空間。對于在多批次間合并數(shù)據(jù)的狀態(tài)轉(zhuǎn)換,這個很有必要。在這樣的轉(zhuǎn)換中,RDDs的產(chǎn)生基于之前批次的RDDs,這樣依賴鏈長度隨著時間遞增。為了避免在恢復(fù)期這種無限的時間增長(和鏈長度成比例),狀態(tài)轉(zhuǎn)換中間的RDDs周期性寫入可靠地存儲空間(如HDFS)從而切短依賴鏈。

總而言之,元數(shù)據(jù)檢查點在由驅(qū)動失效中恢復(fù)是首要需要的。而數(shù)據(jù)或者RDD檢查點甚至在使用了狀態(tài)轉(zhuǎn)換的基礎(chǔ)函數(shù)中也是必要的。

出于這些原因,檢查點機制對于任何生產(chǎn)環(huán)境中的流計算應(yīng)用都至關(guān)重要。你可以通過向 ssc.checkpoint() 方法傳遞一個路徑參數(shù)(HDFS、S3 或者本地路徑均可)來配置檢查點機制,同時你的應(yīng)用應(yīng)該能夠使用檢查點的數(shù)據(jù)

1. 當(dāng)程序首次啟動,其將創(chuàng)建一個新的StreamingContext,設(shè)置所有的流并調(diào)用start()。

2. 當(dāng)程序在失效后重啟,其將依據(jù)檢查點目錄的檢查點數(shù)據(jù)重新創(chuàng)建一個StreamingContext。 通過使用StraemingContext.getOrCreate很容易獲得這個性能。

ssc.checkpoint("hdfs://...")

# 創(chuàng)建和設(shè)置一個新的StreamingContext

def functionToCreateContext():

sc = SparkContext(...) # new context

ssc = new StreamingContext(...)

lines = ssc.socketTextStream(...) # create DStreams

...

ssc.checkpoint(checkpointDirectory) # 設(shè)置檢查點目錄

return ssc

# 從檢查點數(shù)據(jù)中獲取StreamingContext或者重新創(chuàng)建一個

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# 在需要完成的context上做額外的配置

# 無論其有沒有啟動

context ...

# 啟動context

context.start()

contaxt.awaitTermination()

如果檢查點目錄(checkpointDirectory)存在,那么context將會由檢查點數(shù)據(jù)重新創(chuàng)建。如果目錄不存在(首次運行),那么函數(shù)functionToCreateContext將會被調(diào)用來創(chuàng)建一個新的context并設(shè)置DStreams。

注意RDDs的檢查點引起存入可靠內(nèi)存的開銷。在RDDs需要檢查點的批次里,處理的時間會因此而延長。所以,檢查點的間隔需要很仔細(xì)地設(shè)置。在小尺寸批次(1秒鐘)。每一批次檢查點會顯著減少操作吞吐量。反之,檢查點設(shè)置的過于頻繁導(dǎo)致“血統(tǒng)”和任務(wù)尺寸增長,這會有很不好的影響對于需要RDD檢查點設(shè)置的狀態(tài)轉(zhuǎn)換,默認(rèn)間隔是批次間隔的乘數(shù)一般至少為10秒鐘。可以通過dstream.checkpoint(checkpointInterval)。通常,檢查點設(shè)置間隔是5-10個DStream的滑動間隔。

  1. WAL預(yù)寫日志

WAL 即 write ahead log(預(yù)寫日志),是在 1.2 版本中就添加的特性。作用就是,將數(shù)據(jù)通過日志的方式寫到可靠的存儲,比如 HDFS、s3,在 driver 或 worker failure 時可以從在可靠存儲上的日志文件恢復(fù)數(shù)據(jù)。WAL 在 driver 端和 executor 端都有應(yīng)用。

WAL在 driver 端的應(yīng)用

用于寫日志的對象 writeAheadLogOption: WriteAheadLog。在 StreamingContext 中的 JobScheduler 中的 ReceiverTracker 的 ReceivedBlockTracker 構(gòu)造函數(shù)中被創(chuàng)建,ReceivedBlockTracker 用于管理已接收到的 blocks 信息。需要注意的是,這里只需要啟用 checkpoint 就可以創(chuàng)建該 driver 端的 WAL 管理實例,而不需要將 spark.streaming.receiver.writeAheadLog.enable 設(shè)置為 true。

寫什么、何時寫、寫什么

首選需要明確的是,ReceivedBlockTracker 通過 WAL 寫入 log 文件的內(nèi)容是3種事件(當(dāng)然,會進行序列化):

  • · case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo);即新增了一個 block 及該 block 的具體信息,包括 streamId、blockId、數(shù)據(jù)條數(shù)等

  • · case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks);即為某個 batchTime 分配了哪些 blocks 作為該 batch RDD 的數(shù)據(jù)源

  • · case class BatchCleanupEvent(times: Seq[Time]);即清理了哪些 batchTime 對應(yīng)的 block

  • · 知道了寫了什么內(nèi)容,結(jié)合源碼,也不難找出是什么時候?qū)懥诉@些內(nèi)容。需要再次注意的是,寫上面這三種事件,也不需要將 spark.streaming.receiver.writeAheadLog.enable 設(shè)置為 true。

WAL 在 executor 端的應(yīng)用

  • · Receiver 接收到的數(shù)據(jù)會源源不斷的傳遞給 ReceiverSupervisor,是否啟用 WAL 機制(即是否將 spark.streaming.receiver.writeAheadLog.enable 設(shè)置為 true)會影響 ReceiverSupervisor 在存儲 block 時的行為:

  • · 不啟用 WAL:你設(shè)置的StorageLevel是什么,就怎么存儲。比如MEMORY_ONLY只會在內(nèi)存中存一份,MEMORY_AND_DISK會在內(nèi)存和磁盤上各存一份等

  • · 啟用 WAL:在StorageLevel指定的存儲的基礎(chǔ)上,寫一份到 WAL 中。存儲一份在 WAL 上,更不容易丟數(shù)據(jù)但性能損失也比較大

  • · 關(guān)于是否要啟用 WAL,要視具體的業(yè)務(wù)而定:

  • · 若可以接受一定的數(shù)據(jù)丟失,則不需要啟用 WAL,因為對性能影響較大

  • · 若完全不能接受數(shù)據(jù)丟失,那就需要同時啟用 checkpoint 和 WAL,checkpoint 保存著執(zhí)行進度(比如已生成但未完成的 jobs),WAL 中保存著 blocks 及 blocks 元數(shù)據(jù)(比如保存著未完成的 jobs 對應(yīng)的 blocks 信息及 block 文件)。同時,這種情況可能要在數(shù)據(jù)源和 Streaming Application 中聯(lián)合來保證 exactly once 語義

  • · 預(yù)寫日志功能的流程是:

  • · 1)一個SparkStreaming應(yīng)用開始時(也就是driver開始時),相關(guān)的StreamingContext使用SparkContext啟動接收器成為長駐運行任務(wù)。這些接收器接收并保存流數(shù)據(jù)到Spark內(nèi)存中以供處理。

  • · 2)接收器通知driver。

  • · 3)接收塊中的元數(shù)據(jù)(metadata)被發(fā)送到driver的StreamingContext。

  • · 這個元數(shù)據(jù)包括:

  • · (a)定位其在executor內(nèi)存中數(shù)據(jù)的塊referenceid,

  • · (b)塊數(shù)據(jù)在日志中的偏移信息(如果啟用了)。

  • · 用戶傳送數(shù)據(jù)的生命周期如下圖所示。

  • ·

  • ·類似Kafka這樣的系統(tǒng)可以通過復(fù)制數(shù)據(jù)保持可靠性。

  1. 背壓機制

默認(rèn)情況下,Spark Streaming通過Receiver以生產(chǎn)者生產(chǎn)數(shù)據(jù)的速率接收數(shù)據(jù),計算過程中會出現(xiàn)batch processing time > batch interval的情況,其中batch processing time 為實際計算一個批次花費時間, batch interval為Streaming應(yīng)用設(shè)置的批處理間隔。這意味著Spark Streaming的數(shù)據(jù)接收速率高于Spark從隊列中移除數(shù)據(jù)的速率,也就是數(shù)據(jù)處理能力低,在設(shè)置間隔內(nèi)不能完全處理當(dāng)前接收速率接收的數(shù)據(jù)。如果這種情況持續(xù)過長的時間,會造成數(shù)據(jù)在內(nèi)存中堆積,導(dǎo)致Receiver所在Executor內(nèi)存溢出等問題(如果設(shè)置StorageLevel包含disk, 則內(nèi)存存放不下的數(shù)據(jù)會溢寫至disk, 加大延遲)。Spark 1.5以前版本,用戶如果要限制Receiver的數(shù)據(jù)接收速率,可以通過設(shè)置靜態(tài)配制參數(shù)“spark.streaming.receiver.maxRate”的值來實現(xiàn),此舉雖然可以通過限制接收速率,來適配當(dāng)前的處理能力,防止內(nèi)存溢出,但也會引入其它問題。比如:producer數(shù)據(jù)生產(chǎn)高于maxRate,當(dāng)前集群處理能力也高于maxRate,這就會造成資源利用率下降等問題。為了更好的協(xié)調(diào)數(shù)據(jù)接收速率與資源處理能力,Spark Streaming 從v1.5開始引入反壓機制(back-pressure),通過動態(tài)控制數(shù)據(jù)接收速率來適配集群數(shù)據(jù)處理能力。

Spark Streaming Backpressure: 根據(jù)JobScheduler反饋作業(yè)的執(zhí)行信息來動態(tài)調(diào)整Receiver數(shù)據(jù)接收率。通過屬性“spark.streaming.backpressure.enabled”來控制是否啟用backpressure機制,默認(rèn)值false,即不啟用。

Streaming架構(gòu)如下圖所示

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

在原架構(gòu)的基礎(chǔ)上加上一個新的組件RateController,這個組件負(fù)責(zé)監(jiān)聽“OnBatchCompleted”事件,然后從中抽取processingDelay 及schedulingDelay信息. Estimator依據(jù)這些信息估算出最大處理速度(rate),最后由基于Receiver的Input Stream將rate通過ReceiverTracker與ReceiverSupervisorImpl轉(zhuǎn)發(fā)給BlockGenerator(繼承自RateLimiter).

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

流量控制點

當(dāng)Receiver開始接收數(shù)據(jù)時,會通過supervisor.pushSingle()方法將接收的數(shù)據(jù)存入currentBuffer等待BlockGenerator定時將數(shù)據(jù)取走,包裝成block. 在將數(shù)據(jù)存放入currentBuffer之時,要獲取許可(令牌)。如果獲取到許可就可以將數(shù)據(jù)存入buffer, 否則將被阻塞,進而阻塞Receiver從數(shù)據(jù)源拉取數(shù)據(jù)。

其令牌投放采用令牌桶機制進行, 原理如下圖所示:

spark streaming,spark,大數(shù)據(jù),kafka,Powered by 金山文檔

令牌桶機制: 大小固定的令牌桶可自行以恒定的速率源源不斷地產(chǎn)生令牌。如果令牌不被消耗,或者被消耗的速度小于產(chǎn)生的速度,令牌就會不斷地增多,直到把桶填滿。后面再產(chǎn)生的令牌就會從桶中溢出。最后桶中可以保存的最大令牌數(shù)永遠(yuǎn)不會超過桶的大小。當(dāng)進行某操作時需要令牌時會從令牌桶中取出相應(yīng)的令牌數(shù),如果獲取到則繼續(xù)操作,否則阻塞。用完之后不用放回。

  1. 驅(qū)動器程序容錯

驅(qū)動器程序的容錯要求我們以特殊的方式創(chuàng)建 StreamingContext。我們需要把檢查點目錄提供給 StreamingContext。與直接調(diào)用 new StreamingContext 不同,應(yīng)該使用 StreamingContext.getOrCreate() 函數(shù)。

配置過程如下:

  1. 啟動Driver自動重啟功能

·           standalone: 提交任務(wù)時添加 --supervise 參數(shù)
·           yarn:設(shè)置yarn.resourcemanager.am.max-attempts 或者spark.yarn.maxAppAttempts
  • · mesos: 提交任務(wù)時添加 --supervise 參數(shù)

  1. 設(shè)置checkpoint

StreamingContext.setCheckpoint(hdfsDirectory)

  1. 支持從checkpoint中重啟配置

def createContext(checkpointDirectory: String): StreamingContext = {

val ssc = new StreamingContext

ssc.checkpoint(checkpointDirectory)

ssc

}

val ssc = StreamingContext.getOrCreate(checkpointDirectory, createContext(checkpointDirectory))

  1. 工作節(jié)點容錯

為了應(yīng)對工作節(jié)點失敗的問題,Spark Streaming使用與Spark的容錯機制相同的方法。所 有從外部數(shù)據(jù)源中收到的數(shù)據(jù)都在多個工作節(jié)點上備份。所有從備份數(shù)據(jù)轉(zhuǎn)化操作的過程 中創(chuàng)建出來的 RDD 都能容忍一個工作節(jié)點的失敗,因為根據(jù) RDD 譜系圖,系統(tǒng)可以把丟 失的數(shù)據(jù)從幸存的輸入數(shù)據(jù)備份中重算出來。對于reduceByKey等Stateful操作重做的lineage較長的,強制啟動checkpoint,減少重做幾率

  1. 接收器容錯

運行接收器的工作節(jié)點的容錯也是很重要的。如果這樣的節(jié)點發(fā)生錯誤,Spark Streaming 會在集群中別的節(jié)點上重啟失敗的接收器。然而,這種情況會不會導(dǎo)致數(shù)據(jù)的丟失取決于 數(shù)據(jù)源的行為(數(shù)據(jù)源是否會重發(fā)數(shù)據(jù))以及接收器的實現(xiàn)(接收器是否會向數(shù)據(jù)源確認(rèn) 收到數(shù)據(jù))。舉個例子,使用 Flume 作為數(shù)據(jù)源時,兩種接收器的主要區(qū)別在于數(shù)據(jù)丟失 時的保障。在“接收器從數(shù)據(jù)池中拉取數(shù)據(jù)”的模型中,Spark 只會在數(shù)據(jù)已經(jīng)在集群中 備份時才會從數(shù)據(jù)池中移除元素。而在“向接收器推數(shù)據(jù)”的模型中,如果接收器在數(shù)據(jù) 備份之前失敗,一些數(shù)據(jù)可能就會丟失??偟膩碚f,對于任意一個接收器,你必須同時考 慮上游數(shù)據(jù)源的容錯性(是否支持事務(wù))來確保零數(shù)據(jù)丟失。

一般主要是通過將接收到數(shù)據(jù)后先寫日志(WAL)到可靠文件系統(tǒng)中,后才寫入實際的RDD。如果后續(xù)處理失敗則成功寫入WAL的數(shù)據(jù)通過WAL進行恢復(fù),未成功寫入WAL的數(shù)據(jù)通過可回溯的Source進行重放

總的來說,接收器提供以下保證。

? 所有從可靠文件系統(tǒng)中讀取的數(shù)據(jù)(比如通過StreamingContext.hadoopFiles讀取的) 都是可靠的,因為底層的文件系統(tǒng)是有備份的。Spark Streaming會記住哪些數(shù)據(jù)存放到 了檢查點中,并在應(yīng)用崩潰后從檢查點處繼續(xù)執(zhí)行。

? 對于像Kafka、推式Flume、Twitter這樣的不可靠數(shù)據(jù)源,Spark會把輸入數(shù)據(jù)復(fù)制到其 他節(jié)點上,但是如果接收器任務(wù)崩潰,Spark 還是會丟失數(shù)據(jù)。在 Spark 1.1 以及更早的版 本中,收到的數(shù)據(jù)只被備份到執(zhí)行器進程的內(nèi)存中,所以一旦驅(qū)動器程序崩潰(此時所 有的執(zhí)行器進程都會丟失連接),數(shù)據(jù)也會丟失。在 Spark 1.2 中,收到的數(shù)據(jù)被記錄到諸 如 HDFS 這樣的可靠的文件系統(tǒng)中,這樣即使驅(qū)動器程序重啟也不會導(dǎo)致數(shù)據(jù)丟失。

綜上所述,確保所有數(shù)據(jù)都被處理的最佳方式是使用可靠的數(shù)據(jù)源(例如 HDFS、拉式 Flume 等)。如果你還要在批處理作業(yè)中處理這些數(shù)據(jù),使用可靠數(shù)據(jù)源是最佳方式,因為 這種方式確保了你的批處理作業(yè)和流計算作業(yè)能讀取到相同的數(shù)據(jù),因而可以得到相同的結(jié)果。

操作過程如下:

  • 啟用checkpoint

ssc.setCheckpoint(checkpointDir)

  • 啟用WAL

sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")

  • 對Receiver使用可靠性存儲StoreageLevel.MEMORY_AND_DISK_SER or StoreageLevel.MEMORY_AND_DISK_SER2

  1. 處理保證

由于Spark Streaming工作節(jié)點的容錯保障,Spark Streaming可以為所有的轉(zhuǎn)化操作提供 “精確一次”執(zhí)行的語義,即使一個工作節(jié)點在處理部分?jǐn)?shù)據(jù)時發(fā)生失敗,最終的轉(zhuǎn)化結(jié)

果(即轉(zhuǎn)化操作得到的 RDD)仍然與數(shù)據(jù)只被處理一次得到的結(jié)果一樣。

然而,當(dāng)把轉(zhuǎn)化操作得到的結(jié)果使用輸出操作推入外部系統(tǒng)中時,寫結(jié)果的任務(wù)可能因故 障而執(zhí)行多次,一些數(shù)據(jù)可能也就被寫了多次。由于這引入了外部系統(tǒng),因此我們需要專 門針對各系統(tǒng)的代碼來處理這樣的情況。我們可以使用事務(wù)操作來寫入外部系統(tǒng)(即原子 化地將一個 RDD 分區(qū)一次寫入),或者設(shè)計冪等的更新操作(即多次運行同一個更新操作 仍生成相同的結(jié)果)。比如 Spark Streaming 的 saveAs...File 操作會在一個文件寫完時自動 將其原子化地移動到最終位置上,以此確保每個輸出文件只存在一份。

  1. 性能考量

最常見的問題是Spark Streaming可以使用的最小批次間隔是多少。總的來說,500毫秒已經(jīng)被證實為對許多應(yīng)用而言是比較好的最小批次大小。尋找最小批次大小的最佳實踐是從一個比較大的批次大小(10 秒左右)開始,不斷使用更小的批次大小。如果 Streaming 用 戶界面中顯示的處理時間保持不變,你就可以進一步減小批次大小。如果處理時間開始增 加,你可能已經(jīng)達(dá)到了應(yīng)用的極限。

相似地,對于窗口操作,計算結(jié)果的間隔(也就是滑動步長)對于性能也有巨大的影響。 當(dāng)計算代價巨大并成為系統(tǒng)瓶頸時,就應(yīng)該考慮提高滑動步長了。

減少批處理所消耗時間的常見方式還有提高并行度。有以下三種方式可以提高并行度:

? 增加接收器數(shù)目 有時如果記錄太多導(dǎo)致單臺機器來不及讀入并分發(fā)的話,接收器會成為系統(tǒng)瓶頸。這時 你就需要通過創(chuàng)建多個輸入 DStream(這樣會創(chuàng)建多個接收器)來增加接收器數(shù)目,然 后使用 union 來把數(shù)據(jù)合并為一個數(shù)據(jù)源。

? 將收到的數(shù)據(jù)顯式地重新分區(qū)如果接收器數(shù)目無法再增加,你可以通過使用 DStream.repartition 來顯式重新分區(qū)輸 入流(或者合并多個流得到的數(shù)據(jù)流)來重新分配收到的數(shù)據(jù)。

? 提高聚合計算的并行度對于像 reduceByKey() 這樣的操作,你可以在第二個參數(shù)中指定并行度,我們在介紹 RDD 時提到過類似的手段。

  1. 高級解析

  1. DStreamGraph對象解析

在 Spark Streaming 中,DStreamGraph 是一個非常重要的組件,主要用來:

  1. 1. 通過成員 inputStreams 持有 Spark Streaming 輸入源及接收數(shù)據(jù)的方式

  1. 2. 通過成員 outputStreams 持有 Streaming app 的 output 操作,并記錄 DStream 依賴關(guān)系

  1. 3. 生成每個 batch 對應(yīng)的 jobs

下面,通過分析一個簡單的例子,結(jié)合源碼分析來說明 DStreamGraph 是如何發(fā)揮作用的。例子如下:

val sparkConf = new SparkConf().setAppName("HdfsWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
 
val lines = ssc.textFileStream(args(0))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

創(chuàng)建 DStreamGraph 實例

代碼val ssc = new StreamingContext(sparkConf, Seconds(2))創(chuàng)建了 StreamingContext 實例,StreamingContext 包含了 DStreamGraph 類型的成員graph,graph 在 StreamingContext主構(gòu)造函數(shù)中被創(chuàng)建,如下

  private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      cp_.graph.setContext(this)
      cp_.graph.restoreCheckpointData()
      cp_.graph
    } else {
      require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(batchDur_)
      newGraph
    }
  }

可以看到,若當(dāng)前 checkpoint 可用,會優(yōu)先從 checkpoint 恢復(fù) graph,否則新建一個。還可以從這里知道的一點是:graph 是運行在 driver 上的

DStreamGraph記錄輸入源及如何接收數(shù)據(jù)

DStreamGraph有和application 輸入數(shù)據(jù)相關(guān)的成員和方法,如下:

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  def addInputStream(inputStream: InputDStream[_]) {
    this.synchronized {
      inputStream.setGraph(this)
      inputStreams += inputStream
    }
  }

成員inputStreams為 InputDStream 類型的數(shù)組,InputDStream是所有 input streams(數(shù)據(jù)輸入流) 的虛基類。該類提供了 start() 和 stop()方法供 streaming 系統(tǒng)來開始和停止接收數(shù)據(jù)。那些只需要在 driver 端接收數(shù)據(jù)并轉(zhuǎn)成 RDD 的 input streams 可以直接繼承 InputDStream,例如 FileInputDStream是 InputDStream 的子類,它監(jiān)控一個 HDFS 目錄并將新文件轉(zhuǎn)成RDDs。而那些需要在 workers 上運行receiver 來接收數(shù)據(jù)的 Input DStream,需要繼承 ReceiverInputDStream,比如 KafkaReceiver。

我們來看看val lines = ssc.textFileStream(args(0))調(diào)用。

為了更容易理解,我畫出了val lines = ssc.textFileStream(args(0))的調(diào)用流程

從上面的調(diào)用流程圖我們可以知道:

ssc.textFileStream會觸發(fā)新建一個FileInputDStream。FileInputDStream繼承于InputDStream,其start()方法定義了數(shù)據(jù)源及如何接收數(shù)據(jù)

在FileInputDStream構(gòu)造函數(shù)中,會調(diào)用ssc.graph.addInputStream(this),將自身添加到 DStreamGraph 的 inputStreams: ArrayBuffer[InputDStream[_]] 中,這樣 DStreamGraph 就知道了這個 Streaming App 的輸入源及如何接收數(shù)據(jù)??赡苣銜婀譃槭裁磇nputStreams 是數(shù)組類型,舉個例子,這里再來一個 val lines1 = ssc.textFileStream(args(0)),那么又將生成一個 FileInputStream 實例添加到inputStreams,所以這里需要集合類型

生成FileInputDStream調(diào)用其 map 方法,將以 FileInputDStream 本身作為 partent 來構(gòu)造新的 MappedDStream。對于 DStream 的 transform 操作,都將生成一個新的 DStream,和 RDD transform 生成新的 RDD 類似

與MappedDStream 不同,所有繼承了 InputDStream 的定義了輸入源及接收數(shù)據(jù)方式的 sreams 都沒有 parent,因為它們就是最初的 streams。

DStream 的依賴鏈

每個 DStream 的子類都會繼承 def dependencies: List[DStream[_]] = List()方法,該方法用來返回自己的依賴的父 DStream 列表。比如,沒有父DStream 的 InputDStream 的 dependencies方法返回List()。

MappedDStream 的實現(xiàn)如下:

class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {
 
  override def dependencies: List[DStream[_]] = List(parent)
 
  ...
}

在上例中,構(gòu)造函數(shù)參數(shù)列表中的 parent 即在 ssc.textFileStream 中new 的定義了輸入源及數(shù)據(jù)接收方式的最初的 FileInputDStream實例,這里的 dependencies方法將返回該FileInputDStream實例,這就構(gòu)成了第一條依賴??捎萌缦聢D表示,這里特地將 input streams 用藍(lán)色表示,以強調(diào)其與普通由 transform 產(chǎn)生的 DStream 的不同:

繼續(xù)來看val words = lines.flatMap(_.split(" ")),flatMap如下:

  def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope {
    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
  }

每一個 transform 操作都將創(chuàng)建一個新的 DStream,flatMap 操作也不例外,它會創(chuàng)建一個FlatMappedDStream,F(xiàn)latMappedDStream的實現(xiàn)如下:

class FlatMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    flatMapFunc: T => Traversable[U]
  ) extends DStream[U](parent.ssc) {
 
  override def dependencies: List[DStream[_]] = List(parent)
 
  ...
}

與 MappedDStream 相同,F(xiàn)latMappedDStream#dependencies也返回其依賴的父 DStream,及 lines,到這里,依賴鏈就變成了下圖:

之后的幾步操作不再這樣具體分析,到生成wordCounts時,依賴圖將變成下面這樣:

在 DStream 中,與 transofrm 相對應(yīng)的是 output 操作,包括 print, saveAsTextFiles, saveAsObjectFiles, saveAsHadoopFiles, foreachRDD。output 操作中,會創(chuàng)建ForEachDStream實例并調(diào)用register方法將自身添加到DStreamGraph.outputStreams成員中,該ForEachDStream實例也會持有是調(diào)用的哪個 output 操作。本例的代碼調(diào)用如下,只需看箭頭所指幾行代碼

與 DStream transform 操作返回一個新的 DStream 不同,output 操作不會返回任何東西,只會創(chuàng)建一個ForEachDStream作為依賴鏈的終結(jié)。

至此,生成了完成的依賴鏈,也就是 DAG,如下圖(這里將 ForEachDStream 標(biāo)為黃色以顯示其與眾不同):

  1. ReceiverTracker 與數(shù)據(jù)導(dǎo)入

Spark Streaming 在數(shù)據(jù)接收與導(dǎo)入方面需要滿足有以下三個特點:

兼容眾多輸入源,包括HDFS, Flume, Kafka, Twitter and ZeroMQ。還可以自定義數(shù)據(jù)源

要能為每個 batch 的 RDD 提供相應(yīng)的輸入數(shù)據(jù)

為適應(yīng) 7*24h 不間斷運行,要有接收數(shù)據(jù)掛掉的容錯機制

有容乃大,兼容眾多數(shù)據(jù)源

InputDStream是所有 input streams(數(shù)據(jù)輸入流) 的虛基類。該類提供了 start() 和 stop()方法供 streaming 系統(tǒng)來開始和停止接收數(shù)據(jù)。那些只需要在 driver 端接收數(shù)據(jù)并轉(zhuǎn)成 RDD 的 input streams 可以直接繼承 InputDStream,例如 FileInputDStream是 InputDStream 的子類,它監(jiān)控一個 HDFS 目錄并將新文件轉(zhuǎn)成RDDs。而那些需要在 workers 上運行receiver 來接收數(shù)據(jù)的 Input DStream,需要繼承 ReceiverInputDStream,比如 KafkaReceiver

只需在 driver 端接收數(shù)據(jù)的 input stream 一般比較簡單且在生產(chǎn)環(huán)境中使用的比較少,本文不作分析,只分析繼承了 ReceiverInputDStream 的 input stream 是如何導(dǎo)入數(shù)據(jù)的。

ReceiverInputDStream有一個def getReceiver(): Receiver[T]方法,每個繼承了ReceiverInputDStream的 input stream 都必須實現(xiàn)這個方法。該方法用來獲取將要分發(fā)到各個 worker 節(jié)點上用來接收數(shù)據(jù)的 receiver(接收器)。不同的 ReceiverInputDStream 子類都有它們對應(yīng)的不同的 receiver,如KafkaInputDStream對應(yīng)KafkaReceiver,F(xiàn)lumeInputDStream對應(yīng)FlumeReceiver,TwitterInputDStream對應(yīng)TwitterReceiver,如果你要實現(xiàn)自己的數(shù)據(jù)源,也需要定義相應(yīng)的 receiver。

繼承 ReceiverInputDStream 并定義相應(yīng)的 receiver,就是 Spark Streaming 能兼容眾多數(shù)據(jù)源的原因。

為每個 batch 的 RDD 提供輸入數(shù)據(jù)

在 StreamingContext 中,有一個重要的組件叫做 ReceiverTracker,它是 Spark Streaming 作業(yè)調(diào)度器 JobScheduler 的成員,負(fù)責(zé)啟動、管理各個 receiver 及管理各個 receiver 接收到的數(shù)據(jù)。

確定 receiver 要分發(fā)到哪些 executors 上執(zhí)行

創(chuàng)建 ReceiverTracker 實例

我們來看 StreamingContext#start() 方法部分調(diào)用實現(xiàn),如下:

可以看到,StreamingContext#start() 會調(diào)用 JobScheduler#start() 方法,在 JobScheduler#start() 中,會創(chuàng)建一個新的 ReceiverTracker 實例 receiverTracker,并調(diào)用其 start() 方法。

ReceiverTracker#start()

繼續(xù)跟進 ReceiverTracker#start(),如下圖,它主要做了兩件事:

初始化一個 endpoint: ReceiverTrackerEndpoint,用來接收和處理來自 ReceiverTracker 和 receivers 發(fā)送的消息

調(diào)用 launchReceivers 來自將各個 receivers 分發(fā)到 executors 上

ReceiverTracker#launchReceivers()

繼續(xù)跟進 launchReceivers,它也主要干了兩件事:

  1. 1. 獲取 DStreamGraph.inputStreams 中繼承了 ReceiverInputDStream 的 input streams 的 receivers。也就是數(shù)據(jù)接收器

  1. 2. 給消息接收處理器 endpoint 發(fā)送 StartAllReceivers(receivers)消息。直接返回,不等待消息被處理

處理StartAllReceivers消息

endpoint 在接收到消息后,會先判斷消息類型,對不同的消息做不同處理。對于StartAllReceivers消息,處理流程如下:

計算每個 receiver 要分發(fā)的目的 executors。遵循兩條原則:

將 receiver 分布的盡量均勻

如果 receiver 的preferredLocation本身不均勻,以preferredLocation為準(zhǔn)

遍歷每個 receiver,根據(jù)第1步中得到的目的 executors 調(diào)用 startReceiver 方法

到這里,已經(jīng)確定了每個 receiver 要分發(fā)到哪些 executors 上

啟動 receivers

接上,通過 ReceiverTracker#startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]) 來啟動 receivers,我們來看具體流程:

如上流程圖所述,分發(fā)和啟動 receiver 的方式不可謂不精彩。其中,startReceiverFunc 函數(shù)主要實現(xiàn)如下:

val supervisor = new ReceiverSupervisorImpl(receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()

supervisor.start() 中會調(diào)用 receiver#onStart 后立即返回。receiver#onStart 一般自行新建線程或線程池來接收數(shù)據(jù),比如在 KafkaReceiver 中,就新建了線程池,在線程池中接收 topics 的數(shù)據(jù)。

supervisor.start() 返回后,由 supervisor.awaitTermination() 阻塞住線程,以讓這個 task 一直不退出,從而可以源源不斷接收數(shù)據(jù)。

數(shù)據(jù)流轉(zhuǎn)

上圖為 receiver 接收到的數(shù)據(jù)的流轉(zhuǎn)過程,讓我們來逐一分析

Step1: Receiver -> ReceiverSupervisor

這一步中,Receiver 將接收到的數(shù)據(jù)源源不斷地傳給 ReceiverSupervisor。Receiver 調(diào)用其 store(...) 方法,store 方法中繼續(xù)調(diào)用 supervisor.pushSingle 或 supervisor.pushArrayBuffer 等方法來傳遞數(shù)據(jù)。Receiver#store 有多重形式, ReceiverSupervisor 也有 pushSingle、pushArrayBuffer、pushIterator、pushBytes 方法與不同的 store 對應(yīng)。

pushSingle: 對應(yīng)單條小數(shù)據(jù)

pushArrayBuffer: 對應(yīng)數(shù)組形式的數(shù)據(jù)

pushIterator: 對應(yīng) iterator 形式數(shù)據(jù)

pushBytes: 對應(yīng) ByteBuffer 形式的塊數(shù)據(jù)

對于細(xì)小的數(shù)據(jù),存儲時需要 BlockGenerator 聚集多條數(shù)據(jù)成一塊,然后再成塊存儲;反之就不用聚集,直接成塊存儲。當(dāng)然,存儲操作并不在 Step1 中執(zhí)行,只為說明之后不同的操作邏輯。

Step2.1: ReceiverSupervisor -> BlockManager -> disk/memory

在這一步中,主要將從 receiver 收到的數(shù)據(jù)以 block(數(shù)據(jù)塊)的形式存儲

存儲 block 的是receivedBlockHandler: ReceivedBlockHandler,根據(jù)參數(shù)spark.streaming.receiver.writeAheadLog.enable配置的不同,默認(rèn)為 false,receivedBlockHandler對象對應(yīng)的類也不同,如下:

private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    //< 先寫 WAL,再存儲到 executor 的內(nèi)存或硬盤
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    //< 直接存到 executor 的內(nèi)存或硬盤
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}

啟動 WAL 的好處就是在application 掛掉之后,可以恢復(fù)數(shù)據(jù)。

//< 調(diào)用 receivedBlockHandler.storeBlock 方法存儲 block,并得到一個 blockStoreResult
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
//< 使用blockStoreResult初始化一個ReceivedBlockInfo實例
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
//< 發(fā)送消息通知 ReceiverTracker 新增并存儲了 block
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))

不管是 WriteAheadLogBasedBlockHandler 還是 BlockManagerBasedBlockHandler 最終都是通過 BlockManager 將 block 數(shù)據(jù)存儲 execuor 內(nèi)存或磁盤或還有 WAL 方式存入。

這里需要說明的是 streamId,每個 InputDStream 都有它自己唯一的 id,即 streamId,blockInfo包含 streamId 是為了區(qū)分block 是哪個 InputDStream 的數(shù)據(jù)。之后為 batch 分配 blocks 時,需要知道每個 InputDStream 都有哪些未分配的 blocks。

Step2.2: ReceiverSupervisor -> ReceiverTracker

將 block 存儲之后,獲得 block 描述信息 blockInfo: ReceivedBlockInfo,這里面包含:streamId、數(shù)據(jù)位置、數(shù)據(jù)條數(shù)、數(shù)據(jù) size 等信息。

之后,封裝以 block 作為參數(shù)的 AddBlock(blockInfo) 消息并發(fā)送給 ReceiverTracker 以通知其有新增 block 數(shù)據(jù)塊。

Step3: ReceiverTracker -> ReceivedBlockTracker

ReceiverTracker 收到 ReceiverSupervisor 發(fā)來的 AddBlock(blockInfo) 消息后,直接調(diào)用以下代碼將 block 信息傳給 ReceivedBlockTracker:

private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {

receivedBlockTracker.addBlock(receivedBlockInfo)

}

receivedBlockTracker.addBlock中,如果啟用了 WAL,會將新增的 block 信息以 WAL 方式保存。

無論 WAL 是否啟用,都會將新增的 block 信息保存到 streamIdToUnallocatedBlockQueues: mutable.HashMap[Int, ReceivedBlockQueue]中,該變量 key 為 InputDStream 的唯一 id,value 為已存儲未分配的 block 信息。之后為 batch 分配blocks,會訪問該結(jié)構(gòu)來獲取每個 InputDStream 對應(yīng)的未消費的 blocks。

  1. 動態(tài)生成JOB

JobScheduler有兩個重要成員,一是ReceiverTracker,負(fù)責(zé)分發(fā) receivers 及源源不斷地接收數(shù)據(jù);二是JobGenerator,負(fù)責(zé)定時的生成 jobs 并 checkpoint。

定時邏輯

在 JobScheduler 的主構(gòu)造函數(shù)中,會創(chuàng)建 JobGenerator 對象。在 JobGenerator 的主構(gòu)造函數(shù)中,會創(chuàng)建一個定時器:

  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

該定時器每隔 ssc.graph.batchDuration.milliseconds 會執(zhí)行一次 eventLoop.post(GenerateJobs(new Time(longTime))) 向 eventLoop 發(fā)送 GenerateJobs(new Time(longTime))消息,eventLoop收到消息后會進行這個 batch 對應(yīng)的 jobs 的生成及提交執(zhí)行,eventLoop 是一個消息接收處理器。

需要注意的是,timer 在創(chuàng)建之后并不會馬上啟動,將在 StreamingContext#start() 啟動 Streaming Application 時間接調(diào)用到 timer.start(restartTime.milliseconds)才啟動。

為 batch 生成 jobs

eventLoop 在接收到 GenerateJobs(new Time(longTime))消息后的主要處理流程有以上圖中三步:

將已接收到的 blocks 分配給 batch

生成該 batch 對應(yīng)的 jobs

將 jobs 封裝成 JobSet 并提交執(zhí)行

接下來我們就將逐一展開這三步進行分析

將已接受到的 blocks 分配給 batch

上圖是根據(jù)源碼畫出的為 batch 分配 blocks 的流程圖,這里對 『獲得 batchTime 各個 InputDStream 未分配的 blocks』作進一步說明:

我們知道了各個 ReceiverInputDStream 對應(yīng)的 receivers 接收并保存的 blocks 信息會保存在 ReceivedBlockTracker#streamIdToUnallocatedBlockQueues,該成員 key 為 streamId,value 為該 streamId 對應(yīng)的 InputDStream 已接收保存但尚未分配的 blocks 信息。

所以獲取某 InputDStream 未分配的 blocks 只要以該 InputDStream 的 streamId 來從 streamIdToUnallocatedBlockQueues 來 get 就好。獲取之后,會清楚該 streamId 對應(yīng)的value,以保證 block 不會被重復(fù)分配。

在實際調(diào)用中,為 batchTime 分配 blocks 時,會從streamIdToUnallocatedBlockQueues取出未分配的 blocks 塞進 timeToAllocatedBlocks: mutable.HashMap[Time, AllocatedBlocks] 中,以在之后作為該 batchTime 對應(yīng)的 RDD 的輸入數(shù)據(jù)。

通過以上步驟,就可以為 batch 的所有 InputDStream 分配 blocks。也就是為 batch 分配了 blocks。

生成該 batch 對應(yīng)的 jobs

為指定 batchTime 生成 jobs 的邏輯如上圖所示。你可能會疑惑,為什么 DStreamGraph#generateJobs(time: Time)為什么返回 Seq[Job],而不是單個 job。這是因為,在一個 batch 內(nèi),可能會有多個 OutputStream 執(zhí)行了多次 output 操作,每次 output 操作都將產(chǎn)生一個 Job,最終就會產(chǎn)生多個 Jobs。

我們結(jié)合上圖對執(zhí)行流程進一步分析。

在DStreamGraph#generateJobs(time: Time)中,對于DStreamGraph成員ArrayBuffer[DStream[_]]的每一項,調(diào)用DStream#generateJob(time: Time)來生成這個 outputStream 在該 batchTime 的 job。該生成過程主要有三步:

Step1: 獲取該 outputStream 在該 batchTime 對應(yīng)的 RDD

每個 DStream 實例都有一個 generatedRDDs: HashMap[Time, RDD[T]] 成員,用來保存該 DStream 在每個 batchTime 生成的 RDD,當(dāng) DStream#getOrCompute(time: Time)調(diào)用時

  • 首先會查看generatedRDDs中是否已經(jīng)有該 time 對應(yīng)的 RDD,若有則直接返回

  • 若無,則調(diào)用compute(validTime: Time)來生成 RDD,這一步根據(jù)每個 InputDStream繼承 compute 的實現(xiàn)不同而不同。例如,對于 FileInputDStream,其 compute 實現(xiàn)邏輯如下:

  1. 1. 先通過一個 findNewFiles() 方法,找到多個新 file

  1. 2. 對每個新 file,都將其作為參數(shù)調(diào)用 sc.newAPIHadoopFile(file),生成一個 RDD 實例

  1. 3. 將 2 中的多個新 file 對應(yīng)的多個 RDD 實例進行 union,返回一個 union 后的 UnionRDD

Step2: 根據(jù) Step1中得到的 RDD 生成最終 job 要執(zhí)行的函數(shù) jobFunc

jobFunc定義如下:

val jobFunc = () => {

val emptyFunc = { (iterator: Iterator[T]) => {} }

context.sparkContext.runJob(rdd, emptyFunc)

}

可以看到,每個 outputStream 的 output 操作生成的 Job 其實與 RDD action 一樣,最終調(diào)用 SparkContext#runJob 來提交 RDD DAG 定義的任務(wù)

Step3: 根據(jù) Step2中得到的 jobFunc 生成最終要執(zhí)行的 Job 并返回

Step2中得到了定義 Job 要干嘛的函數(shù)-jobFunc,這里便以 jobFunc及 batchTime 生成 Job 實例:

Some(new Job(time, jobFunc))

該Job實例將最終封裝在 JobHandler 中被執(zhí)行

至此,我們搞明白了 JobScheduler 是如何通過一步步調(diào)用來動態(tài)生成每個 batchTime 的 jobs。下文我們將分析這些動態(tài)生成的 jobs 如何被分發(fā)及如何執(zhí)行。

  1. job 的提交與執(zhí)行

我們分析了 JobScheduler 是如何動態(tài)為每個 batch生成 jobs,那么生成的 jobs 是如何被提交的。

在 JobScheduler 生成某個 batch 對應(yīng)的 Seq[Job] 之后,會將 batch 及 Seq[Job] 封裝成一個 JobSet 對象,JobSet 持有某個 batch 內(nèi)所有的 jobs,并記錄各個 job 的運行狀態(tài)。

之后,調(diào)用JobScheduler#submitJobSet(jobSet: JobSet)來提交 jobs,在該函數(shù)中,除了一些狀態(tài)更新,主要任務(wù)就是執(zhí)行

jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))

即,對于 jobSet 中的每一個 job,執(zhí)行jobExecutor.execute(new JobHandler(job)),要搞懂這行代碼干了什么,就必須了解 JobHandler 及 jobExecutor。

JobHandler

JobHandler 繼承了 Runnable,為了說明與 job 的關(guān)系,其精簡后的實現(xiàn)如下:

private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._
  def run() {
    _eventLoop.post(JobStarted(job))
    PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
      job.run()
    _eventLoop = eventLoop
    }
    if (_eventLoop != null) {
      _eventLoop.post(JobCompleted(job))
    }
  }
}

JobHandler#run 方法主要執(zhí)行了 job.run(),該方法最終將調(diào)用到

『生成該 batch 對應(yīng)的 jobs的Step2 定義的 jobFunc』,jonFunc 將提交對應(yīng) RDD DAG 定義的 job。

JobExecutor

知道了 JobHandler 是用來執(zhí)行 job 的,那么 JobHandler 將在哪里執(zhí)行 job 呢?答案是

jobExecutor,jobExecutor為 JobScheduler 成員,是一個線程池,在JobScheduler 主構(gòu)造函數(shù)中創(chuàng)建,如下:

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor = ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

JobHandler 將最終在 線程池jobExecutor 的線程中被調(diào)用,jobExecutor的線程數(shù)可通過spark.streaming.concurrentJobs配置,默認(rèn)為1。若配置多個線程,就能讓多個 job 同時運行,若只有一個線程,那么同一時刻只能有一個 job 運行。

以上,即 jobs 被執(zhí)行的邏輯。

  1. Block 的生成與存儲

ReceiverSupervisorImpl共提供了4個將從 receiver 傳遞過來的數(shù)據(jù)轉(zhuǎn)換成 block 并存儲的方法,分別是:

pushSingle: 處理單條數(shù)據(jù)

pushArrayBuffer: 處理數(shù)組形式數(shù)據(jù)

pushIterator: 處理 iterator 形式處理

pushBytes: 處理 ByteBuffer 形式數(shù)據(jù)

其中,pushArrayBuffer、pushIterator、pushBytes最終調(diào)用pushAndReportBlock;而pushSingle將調(diào)用defaultBlockGenerator.addData(data),我們分別就這兩種形式做說明

pushAndReportBlock

我們針對存儲 block 簡化 pushAndReportBlock 后的代碼如下:

def pushAndReportBlock(
  receivedBlock: ReceivedBlock,
  metadataOption: Option[Any],
  blockIdOption: Option[StreamBlockId]
) {
  ...
  val blockId = blockIdOption.getOrElse(nextBlockId)
  receivedBlockHandler.storeBlock(blockId, receivedBlock)
  ...
}

首先獲取一個新的 blockId,之后調(diào)用 receivedBlockHandler.storeBlock, receivedBlockHandler 在 ReceiverSupervisorImpl 構(gòu)造函數(shù)中初始化。當(dāng)啟用了 checkpoint 且 spark.streaming.receiver.writeAheadLog.enable 為 true 時,receivedBlockHandler 被初始化為 WriteAheadLogBasedBlockHandler 類型;否則將初始化為 BlockManagerBasedBlockHandler類型。

WriteAheadLogBasedBlockHandler#storeBlock 將 ArrayBuffer, iterator, bytes 類型的數(shù)據(jù)序列化后得到的 serializedBlock

交由 BlockManager 根據(jù)設(shè)置的 StorageLevel 存入 executor 的內(nèi)存或磁盤中

通過 WAL 再存儲一份

而BlockManagerBasedBlockHandler#storeBlock將 ArrayBuffer, iterator, bytes 類型的數(shù)據(jù)交由 BlockManager 根據(jù)設(shè)置的 StorageLevel 存入 executor 的內(nèi)存或磁盤中,并不再通過 WAL 存儲一份

pushSingle

pushSingle將調(diào)用 BlockGenerator#addData(data: Any) 通過積攢的方式來存儲數(shù)據(jù)。接下來對 BlockGenerator 是如何積攢一條一條數(shù)據(jù)最后寫入 block 的邏輯

上圖為 BlockGenerator 的各個成員,首選對各個成員做介紹:

currentBuffer

變長數(shù)組,當(dāng) receiver 接收的一條一條的數(shù)據(jù)將會添加到該變長數(shù)組的尾部

可能會有一個 receiver 的多個線程同時進行添加數(shù)據(jù),這里是同步操作

添加前,會由 rateLimiter 檢查一下速率,是否加入的速度過快。如果過快的話就需要 block 住,等到下一秒再開始添加。最高頻率由 spark.streaming.receiver.maxRate 控制,默認(rèn)值為 Long.MaxValue,具體含義是單個 Receiver 每秒鐘允許添加的條數(shù)。

blockIntervalTimer & blockIntervalMs

分別是定時器和時間間隔。blockIntervalTimer中有一個線程,每隔blockIntervalMs會執(zhí)行以下操作:

將 currentBuffer 賦值給 newBlockBuffer

將 currentBuffer 指向新的空的 ArrayBuffer 對象

將 newBlockBuffer 封裝成 newBlock

將 newBlock 添加到 blocksForPushing 隊列中blockIntervalMs 由 spark.streaming.blockInterval 控制,默認(rèn)是 200ms。

blockPushingThread & blocksForPushing & blockQueueSize

blocksForPushing 是一個定長數(shù)組,長度由 blockQueueSize 決定,默認(rèn)為10,可通過 spark.streaming.blockQueueSize 改變。上面分析到,blockIntervalTimer中的線程會定時將 block 塞入該隊列。

還有另一條線程不斷送該隊列中取出 block,然后調(diào)用 ReceiverSupervisorImpl.pushArrayBuffer(...) 來將 block 存儲,這條線程就是blockPushingThread。

PS: blocksForPushing為ArrayBlockingQueue類型。ArrayBlockingQueue是一個阻塞隊列,能夠自定義隊列大小,當(dāng)插入時,如果隊列已經(jīng)沒有空閑位置,那么新的插入線程將阻塞到該隊列,一旦該隊列有空閑位置,那么阻塞的線程將執(zhí)行插入

以上,通過分析各個成員,也說明了 BlockGenerator 是如何存儲單條數(shù)據(jù)的。文章來源地址http://www.zghlxwxcb.cn/news/detail-782662.html

到了這里,關(guān)于spark介紹之spark streaming的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Flink與Spark Streaming在與kafka結(jié)合的區(qū)別!

    Flink與Spark Streaming在與kafka結(jié)合的區(qū)別!

    首先,我們先看下圖,這是一張生產(chǎn)消息到kafka,從kafka消費消息的結(jié)構(gòu)圖。 當(dāng)然, 這張圖很簡單,拿這張圖的目的是從中可以得到的跟本節(jié)文章有關(guān)的消息,有以下兩個: 1,kafka中的消息不是kafka主動去拉去的,而必須有生產(chǎn)者往kafka寫消息。 2,kafka是不會主動往消費者發(fā)

    2024年04月17日
    瀏覽(36)
  • 推薦系統(tǒng)架構(gòu)設(shè)計實踐:Spark Streaming+Kafka構(gòu)建實時推薦系統(tǒng)架構(gòu)

    作者:禪與計算機程序設(shè)計藝術(shù) 推薦系統(tǒng)(Recommendation System)一直都是互聯(lián)網(wǎng)領(lǐng)域一個非?;馃岬脑掝}。其主要目標(biāo)是在用戶多樣化的信息環(huán)境中,通過分析用戶的偏好、消費習(xí)慣等數(shù)據(jù),提供個性化的信息推送、商品推薦、購物指導(dǎo)等服務(wù)。如何設(shè)計一個推薦系統(tǒng)的架構(gòu)及

    2024年02月08日
    瀏覽(27)
  • Discuz論壇網(wǎng)站標(biāo)題欄Powered by Discuz!版權(quán)信息如何去除或是修改?

    Discuz論壇網(wǎng)站標(biāo)題欄Powered by Discuz!版權(quán)信息如何去除或是修改?

    當(dāng)我們搭建好DZ論壇網(wǎng)站后,為了美化網(wǎng)站,想把標(biāo)題欄的Powered?by?Discuz!去除或是修改,應(yīng)該如何操作呢?今天飛飛和你分享,在操作前務(wù)必把網(wǎng)站源碼和數(shù)據(jù)庫都備份到本地或是網(wǎng)盤。 ? Discuz的版權(quán)信息存在兩處地方,一個是標(biāo)題欄,一個是底部。一般為了美化修改個標(biāo)

    2024年02月08日
    瀏覽(96)
  • (C#) IIS 響應(yīng)標(biāo)頭過濾敏感信息(如:Server/X-Powered-By等) 運維知識

    (C#) IIS 響應(yīng)標(biāo)頭過濾敏感信息(如:Server/X-Powered-By等) 運維知識

    再一次凈網(wǎng)行動中,客戶要求安全改造發(fā)現(xiàn)了接口請求的header標(biāo)頭中出現(xiàn)如圖中的敏感信息。 ? 其意義在于告知瀏網(wǎng)站是用什么語言或者框架編寫的。解決辦法就是修改該響應(yīng)頭為一個錯誤的值,將攻擊者導(dǎo)向一個錯誤的方向。 這里只說windows 的iis環(huán)境,不考慮其他服務(wù)器的

    2024年02月11日
    瀏覽(115)
  • 大數(shù)據(jù)——Spark Streaming

    大數(shù)據(jù)——Spark Streaming

    Spark Streaming是一個可擴展、高吞吐、具有容錯性的流式計算框架。 之前我們接觸的spark-core和spark-sql都是離線批處理任務(wù),每天定時處理數(shù)據(jù),對于數(shù)據(jù)的實時性要求不高,一般都是T+1的。但在企業(yè)任務(wù)中存在很多的實時性的任務(wù)需求,列如雙十一的京東阿里都會要求做一個

    2024年02月07日
    瀏覽(24)
  • Spark Streaming實時數(shù)據(jù)處理

    作者:禪與計算機程序設(shè)計藝術(shù) Apache Spark?Streaming是一個構(gòu)建在Apache Spark?之上的快速、微批次、容錯的流式數(shù)據(jù)處理系統(tǒng),它可以對實時數(shù)據(jù)進行高吞吐量、低延遲地處理。Spark Streaming既可用于流計算場景也可用于離線批處理場景,而且可以將結(jié)構(gòu)化或無結(jié)構(gòu)化數(shù)據(jù)源(如

    2024年02月06日
    瀏覽(27)
  • Spark Streaming實時流式數(shù)據(jù)處理

    作者:禪與計算機程序設(shè)計藝術(shù) Apache Spark Streaming 是 Apache Spark 提供的一個用于高吞吐量、容錯的流式數(shù)據(jù)處理引擎。它可以實時的接收數(shù)據(jù)并在系統(tǒng)內(nèi)部以微批次的方式進行處理,并將結(jié)果輸出到文件、數(shù)據(jù)庫或?qū)崟r消息系統(tǒng)中。Spark Streaming 支持 Java、Scala 和 Python 編程語言

    2024年02月08日
    瀏覽(24)
  • 大數(shù)據(jù)編程實驗四:Spark Streaming

    大數(shù)據(jù)編程實驗四:Spark Streaming

    一、目的與要求 1、通過實驗掌握Spark Streaming的基本編程方法; 2、熟悉利用Spark Streaming處理來自不同數(shù)據(jù)源的數(shù)據(jù)。 3、熟悉DStream的各種轉(zhuǎn)換操作。 4、熟悉把DStream的數(shù)據(jù)輸出保存到文本文件或MySQL數(shù)據(jù)庫中。 二、實驗內(nèi)容 1.參照教材示例,利用Spark Streaming對三種類型的基

    2024年02月03日
    瀏覽(24)
  • 大數(shù)據(jù)流處理與實時分析:Spark Streaming和Flink Stream SQL的對比與選擇

    作者:禪與計算機程序設(shè)計藝術(shù)

    2024年02月07日
    瀏覽(26)
  • spark 的group by ,join數(shù)據(jù)傾斜調(diào)優(yōu)

    spark任務(wù)中最常見的耗時原因就是數(shù)據(jù)分布不均勻,從而導(dǎo)致有些task運行時間很長,長尾效應(yīng)導(dǎo)致的整個job運行耗時很長 首先我們要定位數(shù)據(jù)傾斜,我們可以通過在spark ui界面中查看某個stage下的task的耗時,如果發(fā)現(xiàn)某些task耗時很長,對應(yīng)要處理的數(shù)據(jù)很多,證明有數(shù)據(jù)傾斜

    2024年02月21日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包