Table of Contents
I. Spark Streaming Overview
II. Adding Dependencies
III. Configuring log4j
1. After the dependencies are downloaded, open External Libraries on the far left of IDEA
2. Find spark-core
3. Find the org.apache.spark directory
4. Find the log4j-defaults.properties file
5. Copy the file into the resources directory and rename it
6. Modify line 19 of log4j.properties
IV. Reading a Socket Data Stream with Spark Streaming
1. Writing the code
2. Start nc -lk
3. Run the Scala program
V. Reading Kafka Messages with Spark Streaming
1. Writing the code
2. Start a producer for sparkkafkastu and produce messages
3. Run the Scala code
I. Spark Streaming Overview
Spark Streaming is used for processing streaming data. It supports many input sources, such as Kafka, Flume, Twitter, ZeroMQ, and plain TCP sockets. Once data is ingested, it can be processed with Spark's RDD-style operations such as map, reduce, join, and window, and the results can be saved to many destinations, such as HDFS or a database.
The difference between Spark Streaming and Flink: Spark Streaming processes data in micro-batches at second-level granularity, while Flink operates at millisecond-level granularity and is a true streaming engine; Spark Streaming is therefore near real-time rather than truly real-time. When choosing a stream-processing framework, Spark Streaming is usually sufficient if the latency requirements are not strict.
Spark Streaming's programming abstraction is the discretized stream, or DStream. A DStream is a sequence of RDDs, where each RDD holds the data from one time slice of the stream.
Every transformation applied to a DStream is translated into operations on the underlying RDDs. For example, applying flatMap to each RDD of a DStream of lines produces a DStream of words.
II. Adding Dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>3.1.2</spark.version>
    <mysql.version>8.0.29</mysql.version>
</properties>

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
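Note that the _2.12 suffix ties these artifacts to Scala 2.12, so the project's Scala version must match. If the build uses sbt rather than Maven, a rough equivalent (a sketch assuming the same versions, not part of the original article) would be:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "3.1.2",
  "org.apache.spark" %% "spark-sql"                  % "3.1.2",
  "org.apache.spark" %% "spark-streaming"            % "3.1.2",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.1.2"
)

Here %% appends the Scala binary version to the artifact name automatically.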
III. Configuring log4j
1. After the dependencies are downloaded, open External Libraries on the far left of IDEA
2. Find spark-core
3. Find the org.apache.spark directory
4. Find the log4j-defaults.properties file
5. Copy the file into the project's resources directory and rename it to log4j.properties
6. Change line 19 of log4j.properties to the following, so that only ERROR-level logs are printed to the console:
log4j.rootCategory=ERROR, console
IV. Reading a Socket Data Stream with Spark Streaming
1. Writing the code
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamDemo1 {
  def main(args: Array[String]): Unit = {
    // local[2]: the socket receiver occupies one core, so at least two are needed
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkstream1")
    // Create the streaming context with a 3-second batch interval
    val streamingContext = new StreamingContext(conf, Seconds(3))
    // TODO Configure the data source: the target host and port
    val socketLineStream: ReceiverInputDStream[String] = streamingContext.socketTextStream("lxm147", 8888)
    // TODO Business logic: split each line into words and count them
    val wordStream: DStream[String] = socketLineStream.flatMap(_.split("\\s+"))
    val mapStream: DStream[(String, Int)] = wordStream.map((_, 1))
    val wordCountStream: DStream[(String, Int)] = mapStream.reduceByKey(_ + _)
    // TODO Print each batch's result
    wordCountStream.print()
    // TODO Start the receiver and block until termination
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
2. Start nc -lk
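The code above connects to port 8888 on host lxm147, so a listener must be running there before the program starts. Assuming netcat is installed on that machine:

nc -lk 8888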
3.啟動(dòng)Scala程序
V. Reading Kafka Messages with Spark Streaming
1. Writing the code
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingKafkaSource {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("sparkKafkaStream").setMaster("local[*]")
    // Create the streaming context with a 5-second batch interval
    val streamingContext = new StreamingContext(conf, Seconds(5))
    // Kafka consumer configuration
    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "sparkstreamgroup1"
    )
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      // Create the topic first if it does not exist:
      // kafka-topics.sh --create --zookeeper lxm147:2181 --topic sparkkafkastu --partitions 1 --replication-factor 1
      ConsumerStrategies.Subscribe(Set("sparkkafkastu"), kafkaParams)
    )
    // Each record is a (key, value) pair; count the words in the value
    val wordCountStream: DStream[(String, Int)] = kafkaStream.flatMap(_.value().toString.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)
    wordCountStream.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
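One optional tweak (not in the original code): by default a new consumer group only sees messages produced after it starts; to also read messages already sitting in the topic, add an offset-reset entry to kafkaParams:

// Start from the earliest available offset when the group has no committed offset
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"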
2.開啟生產(chǎn)者sparkkafkastu并生產(chǎn)消息
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-426475.html
3. 運(yùn)行scala代碼
?文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-426475.html
到了這里,關(guān)于SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!