目錄
1. RDD隊列
2 textFileStream
3 DIY采集器
4 kafka數(shù)據(jù)源【重點】
1. RDD隊列
? ? ? ?a、使用場景:測試
? ? ? ?b、實現(xiàn)方式: 通過ssc.queueStream(queueOfRDDs)創(chuàng)建DStream,每一個推送這個隊列的RDD,都會作為一個DStream處理
val sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")
val ssc = new StreamingContext(sparkconf,Seconds(3))
// 創(chuàng)建一個隊列對象,隊列中存放的是RDD
val queue = new mutable.Queue[RDD[String]]()
// 通過隊列創(chuàng)建DStream
val queueDS: InputDStream[String] = ssc.queueStream(queue)
queueDS.print()
// 啟動采集器
ssc.start()
//這個操作之所以放在這個位置,是為了模擬流式的感覺,數(shù)據(jù)源源不斷的生產(chǎn)
for(i <- 1 to 5 ){
// 循環(huán)創(chuàng)建rdd
val rdd: RDD[String] = ssc.sparkContext.makeRDD(List(i.toString))
// 將RDD存放到隊列中
queue.enqueue(rdd)
// 當(dāng)前線程休眠1秒
Thread.sleep(6000)
}
// 等待采集器的結(jié)束
ssc.awaitTermination()
}
2 textFileStream
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("textFileStream")
val ssc = new StreamingContext(sparkConf,Seconds(3))
//從文件中讀取數(shù)據(jù)
val textDS: DStream[String] = ssc.textFileStream("in")
textDS.print()
// 啟動采集器
ssc.start()
// 等待采集器的結(jié)束
ssc.awaitTermination()
3 DIY采集器
? ? 1. 自定義采集器
? ? 2. 什么情況下需要自定采集器呢?
? ? ? ? ?比如從mysql、hbase中讀取數(shù)據(jù)。
? ? ? ? ?采集器的作用是從指定的地方,按照采集周期對數(shù)據(jù)進行采集。
? ? ? ? ?目前有:采集kafka、采集netcat工具的指定端口的數(shù)據(jù)、采集文件目錄中的數(shù)據(jù)等
? ? 3. 自定義采集器的步驟,模仿socketTextStream
? ? ? ? ?a、自定采集器類,繼承extends,并指定數(shù)據(jù)泛型,同時對父類的屬性賦值,指定數(shù)據(jù)存儲的級別
? ? ? ? ?b、重寫onStart和onStop方法
? ? ? ? ? ? onStart:采集器的如何啟動
? ? ? ? ? ? onStop:采集的如何停止
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DIY")
val ssc = new StreamingContext(sparkConf, Seconds(3))
// 獲取采集的流
val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReciver("localhost",9999))
ds.print()
ssc.start()
ssc.awaitTermination()
}
// 繼承extends Reciver,并指定數(shù)據(jù)泛型,同時對父類的屬性賦值,指定數(shù)據(jù)存儲的級別
class MyReciver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
private var socket: Socket = _
def receive = {
// 獲取輸入流
val reader = new BufferedReader(
new InputStreamReader(
socket.getInputStream,
"UTF-8"
)
)
// 設(shè)定一個間接變量
var s: String = null
while (true) {
// 按行讀取數(shù)據(jù)
s = reader.readLine()
if (s != null) {
// 將數(shù)據(jù)進行封裝
store(s)
}
}
}
// 1. 啟動采集器
override def onStart(): Unit = {
socket = new Socket(host, port)
new Thread("Socket Receiver") {
setDaemon(true)
override def run() {
receive
}
}.start()
}
// 2. 停止采集器
override def onStop(): Unit = {
socket.close()
socket = null
}
}
4 kafka數(shù)據(jù)源【重點】
-- DirectAPI:是由計算的Executor來主動消費Kafka的數(shù)據(jù),速度由自身控制。
-- 配置信息基本上是固定寫法文章來源:http://www.zghlxwxcb.cn/news/detail-723305.html
// TODO Spark環(huán)境
// SparkStreaming使用核數(shù)最少是2個
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
// TODO 使用SparkStreaming讀取Kafka的數(shù)據(jù)
// Kafka的配置信息
val kafkaPara: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop105:9092,hadoop106:9092,hadoop107:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
)
// 獲取數(shù)據(jù),key是null,value是真實的數(shù)據(jù)
val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
valueDStream.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.print()
ssc.start()
// 等待采集器的結(jié)束
ssc.awaitTermination()
?文章來源地址http://www.zghlxwxcb.cn/news/detail-723305.html
到了這里,關(guān)于spark DStream從不同數(shù)據(jù)源采集數(shù)據(jù)(RDD 隊列、文件、diy 采集器、kafka)(scala 編程)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!