国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息

這篇具有很好參考價(jià)值的文章主要介紹了SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

目錄

一、Spark Streaming概述

二、添加依賴

三、配置log4j

1.依賴下載好后打開IDEA最左側(cè)的外部庫(kù)

2.找到spark-core

3.找到apache.spark目錄

4.找到log4j-defaults.properties文件

5.將該文件放在資源目錄下,并修改文件名

6.修改log4j.properties第19行的內(nèi)容

四、Spark Streaming讀取Socket數(shù)據(jù)流

1.代碼編寫

2.開啟nc -lk

3.啟動(dòng)Scala程序

五、Spark Streaming讀取kafka消息

1.代碼編寫

2.開啟生產(chǎn)者sparkkafkastu并生產(chǎn)消息

3. 運(yùn)行scala代碼


一、Spark Streaming概述

????????Spark Streaming 用于流式數(shù)據(jù)的處理。Spark Streaming 支持的數(shù)據(jù)輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡(jiǎn)單的 TCP 套接字等等。數(shù)據(jù)輸入后可以用 Spark 的RDD如:map、reduce、join、window 等進(jìn)行運(yùn)算。而結(jié)果也能保存在很多地方,如 HDFS,數(shù)據(jù)庫(kù)等。

SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息

?????????Spark Streaming與Flink的區(qū)別:Spark Streaming是基于秒級(jí)別,而Flink是基于毫秒級(jí)別,是真正的實(shí)時(shí)流,Spark Streaming屬于偽實(shí)時(shí)。因此,在選擇實(shí)時(shí)流計(jì)算框架時(shí),如果對(duì)實(shí)時(shí)速度要求不高的話,選擇Spark Streaming基本足夠。

????????Spark Streaming的編程抽象是離散化流,也就是DStream。它是一個(gè) RDD 序列,每個(gè)RDD代表數(shù)據(jù)流中一個(gè)時(shí)間片內(nèi)的數(shù)據(jù)。

SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息

????????應(yīng)用于 DStream 上的轉(zhuǎn)換操作都會(huì)轉(zhuǎn)換為底層RDD上的操作。如對(duì)行 DStream中的每個(gè)RDD應(yīng)用flatMap操作以生成單詞 DStream 的RDD。 SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息

二、添加依賴

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>3.1.2</spark.version>
    <mysql.version>8.0.29</mysql.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!--  https://mvnrepository.com/artifact/org.apache.spark/spark-core  -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>

三、配置log4j

1.依賴下載好后打開IDEA最左側(cè)的外部庫(kù)

SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息

2.找到spark-core

SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息

3.找到apache.spark目錄

SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息

4.找到log4j-defaults.properties文件

SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息

5.將該文件放在資源目錄下,并修改文件名

SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息

6.修改log4j.properties第19行的內(nèi)容

log4j.rootCategory=ERROR, console

四、Spark Streaming讀取Socket數(shù)據(jù)流

1.代碼編寫

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamDemo1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkstream1")
    // 定義流,采集周期3秒
    val streamingContext = new StreamingContext(conf, Seconds(3))
    // TODO 配置數(shù)據(jù)源為指定機(jī)器和端口
    val socketLineStream: ReceiverInputDStream[String] = streamingContext.socketTextStream("lxm147", 8888)
    // TODO 業(yè)務(wù)處理
    val wordStream: DStream[String] = socketLineStream.flatMap(_.split("\\s+"))
    val mapStream: DStream[(String, Int)] = wordStream.map((_, 1))
    val wordCountStream: DStream[(String, Int)] = mapStream.reduceByKey(_ + _)

    // TODO 輸出結(jié)果
    wordCountStream.print()
    // TODO 啟動(dòng)采集器
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

2.開啟nc -lk

SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息

3.啟動(dòng)Scala程序

SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息

五、Spark Streaming讀取kafka消息

1.代碼編寫

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingKafkaSource {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("sparkKafkaStream").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "sparkstreamgroup1")
    )
    
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      // 如果沒有topic需要?jiǎng)?chuàng)建
      // kafka-topics.sh --create --zookeeper lxm147:2181 --topic sparkkafkastu --partitions 1 --replication-factor 1
      ConsumerStrategies.Subscribe(Set("sparkkafkastu"), kafkaParams)
    )

    // KeyValue(key,value)
    val wordCountStream: DStream[(String, Int)] = kafkaStream.flatMap(_.value().toString.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)

    wordCountStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

2.開啟生產(chǎn)者sparkkafkastu并生產(chǎn)消息

SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息

3. 運(yùn)行scala代碼

?SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-426475.html

到了這里,關(guān)于SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Kafka - 獲取 Topic 生產(chǎn)者發(fā)布數(shù)據(jù)命令

    從頭開始獲取 20 條數(shù)據(jù)(等價(jià)于時(shí)間升序) ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic your-topic --from-beginning --max-messages 20 獲取最新 20 條數(shù)據(jù)(等價(jià)于時(shí)間降序)去掉 --from-beginning 即可(默認(rèn)) ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic your-topic?--max-me

    2024年02月14日
    瀏覽(23)
  • 大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者)

    大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者)

    Kafka是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列,主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域。 發(fā)布/訂閱:消息的發(fā)布者不會(huì)將消息直接發(fā)送給特定的訂閱者,而是將發(fā)布的消息分為不同的類別,訂閱者只接收感興趣的消息。 目前企業(yè)中比較常見的消息隊(duì)列產(chǎn)品主要有Kafka、ActiveM

    2024年01月19日
    瀏覽(22)
  • Kafka3.0.0版本——生產(chǎn)者 數(shù)據(jù)去重

    Kafka3.0.0版本——生產(chǎn)者 數(shù)據(jù)去重

    1.1、至少一次 至少一次(At Least Once )的含義 生產(chǎn)者發(fā)送數(shù)據(jù)到kafka集群,kafka集群至少接收到一次數(shù)據(jù)。 至少一次的條件: ACK級(jí)別設(shè)置為-1 + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)量大于等于2 1.2、最多一次 最多一次(At Most Once )的含義 生產(chǎn)者發(fā)送數(shù)據(jù)到kafka集群,

    2024年02月01日
    瀏覽(19)
  • Kafka3.0.0版本——生產(chǎn)者數(shù)據(jù)有序與亂序

    Kafka3.0.0版本——生產(chǎn)者數(shù)據(jù)有序與亂序

    單分區(qū)內(nèi),數(shù)據(jù)有序。如下圖partion0、partion1、partion2分區(qū)內(nèi),各自分區(qū)內(nèi)的數(shù)據(jù)有序。 2.1、kafka1.x版本之前保證數(shù)據(jù)單分區(qū)有序的條件 kafka在1.x版本之前保證數(shù)據(jù)單分區(qū)有序,條件如下: 2.2、kafka1.x版本及以后保證數(shù)據(jù)單分區(qū)有序的條件 未開啟冪等性 開啟冪等性 2.3、kafka1

    2023年04月27日
    瀏覽(29)
  • java:Kafka生產(chǎn)者推送數(shù)據(jù)與消費(fèi)者接收數(shù)據(jù)(參數(shù)配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式為host1:port1,host2:port2,…。生產(chǎn)者會(huì)從這些Broker中選擇一個(gè)可用的Broker作為消息發(fā)送的目標(biāo)Broker。 acks :Broker對(duì)消息的確認(rèn)模式??蛇x值為0、1、all。0表示生產(chǎn)者不會(huì)等待Broker的任何確認(rèn)消息;1表示生產(chǎn)者會(huì)等待Broker的Leader副本確認(rèn)

    2024年02月16日
    瀏覽(36)
  • kafka-保證數(shù)據(jù)不重復(fù)-生產(chǎn)者開啟冪等性和事務(wù)的作用?

    kafka-保證數(shù)據(jù)不重復(fù)-生產(chǎn)者開啟冪等性和事務(wù)的作用?

    適用于消息在寫入到服務(wù)器日志后,由于網(wǎng)絡(luò)故障,生產(chǎn)者沒有及時(shí)收到服務(wù)端的 ACK 消息,生產(chǎn)者誤以為消息沒有持久化到服務(wù)端,導(dǎo)致生產(chǎn)者重復(fù)發(fā)送該消息,造成了消息的重復(fù)現(xiàn)象,而冪等性就是為了解決該問題。 通過3個(gè)值的唯一性去重: PID:生產(chǎn)者ID 分區(qū)號(hào) seq:?jiǎn)?/p>

    2024年02月14日
    瀏覽(17)
  • Kafka生產(chǎn)者原理 kafka生產(chǎn)者發(fā)送流程 kafka消息發(fā)送到集群步驟 kafka如何發(fā)送消息 kafka詳解

    Kafka生產(chǎn)者原理 kafka生產(chǎn)者發(fā)送流程 kafka消息發(fā)送到集群步驟 kafka如何發(fā)送消息 kafka詳解

    kafka尚硅谷視頻: 10_尚硅谷_Kafka_生產(chǎn)者_(dá)原理_嗶哩嗶哩_bilibili ? ???? 1. producer初始化:加載默認(rèn)配置,以及配置的參數(shù),開啟網(wǎng)絡(luò)線程 ???? 2. 攔截器攔截 ???? 3. 序列化器進(jìn)行消息key, value序列化 ???? 4. 進(jìn)行分區(qū) ???? 5. kafka broker集群 獲取metaData ???? 6. 消息緩存到

    2024年02月11日
    瀏覽(21)
  • [kafka消息生產(chǎn)被阻塞] - 如何解決Kafka生產(chǎn)者阻塞的問題

    [kafka消息生產(chǎn)被阻塞] - 如何解決Kafka生產(chǎn)者阻塞的問題 Kafka是一個(gè)高度可擴(kuò)展的分布式流平臺(tái),用于構(gòu)建實(shí)時(shí)數(shù)據(jù)管道和流處理應(yīng)用程序。作為一個(gè)廣泛使用的消息代理系統(tǒng),Kafka在數(shù)據(jù)傳輸方面表現(xiàn)出色,但是在極端情況下,它可能會(huì)出現(xiàn)生產(chǎn)者阻塞的問題。這可能會(huì)導(dǎo)致

    2024年02月11日
    瀏覽(22)
  • 三、Kafka生產(chǎn)者1---Kafka生產(chǎn)者初始化-new KafkaProducer

    概述 本文主要是分享Kafka初始化生產(chǎn)者的 大體過程 初始化過程中會(huì)新建很多對(duì)象,本文暫先分享部分對(duì)象 1.分區(qū)器---Partitioner partitioner 2.重試時(shí)間---long retryBackoffMs 3.序列化器---SerializerK keySerializer,SerializerV valueSerializer 4.攔截器--- ListProducerInterceptorK, V interceptorList 5.累加器-

    2024年03月14日
    瀏覽(37)
  • Apache Kafka - 重識(shí)Kafka生產(chǎn)者

    Apache Kafka - 重識(shí)Kafka生產(chǎn)者

    Kafka 生產(chǎn)者是 Apache Kafka 中的一個(gè)重要組件,它負(fù)責(zé)將數(shù)據(jù)發(fā)送到 Kafka 集群中。在實(shí)時(shí)數(shù)據(jù)處理和流式處理應(yīng)用程序中,Kafka 生產(chǎn)者扮演著非常重要的角色。 這里我們將介紹 Kafka 生產(chǎn)者的概念、工作原理以及如何使用 Kafka 生產(chǎn)者。 Kafka 生產(chǎn)者是一種用于將數(shù)據(jù)發(fā)送到 Kafk

    2024年02月05日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包