国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Spark讀取kafka(流式和批數(shù)據(jù))

這篇具有很好參考價(jià)值的文章主要介紹了Spark讀取kafka(流式和批數(shù)據(jù))。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

spark讀取kafka(批數(shù)據(jù)處理)

Spark讀取kafka(流式和批數(shù)據(jù)),Spark階段,spark,kafka,大數(shù)據(jù)

Spark讀取kafka(流式和批數(shù)據(jù)),Spark階段,spark,kafka,大數(shù)據(jù)

# 按照偏移量讀取kafka數(shù)據(jù)
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

# spark讀取kafka
options = {
    # 寫kafka配置信息
    # 指定kafka的連接的broker服務(wù)節(jié)點(diǎn)信息
    'kafka.bootstrap.servers': 'node1:9092',
    # 指定主題
    'subscribe': 'itcast',# 讀取的主題不存在會(huì)自動(dòng)創(chuàng)建
    # todo 注意一:連接的配置
    #       主題名稱 ,分區(qū)編號(hào),偏移量
    # 指定起始偏移量   {主題名稱:{分區(qū)編號(hào)0:偏移量,分區(qū)編號(hào)1:偏移量....}}
    'startingOffsets':""" {"itcast":{"0":0,"1":1}} """,
    # 指定結(jié)束偏移量  {主題名稱:{分區(qū)編號(hào)0:偏移量,分區(qū)編號(hào)1:偏移量....}}
    'endingOffsets':""" {"itcast":{"0":3,"1":2}}  """
    # 注意點(diǎn)  : 偏移量的區(qū)間是左閉右開 ,結(jié)束偏移的指定按照最大偏移量加一 ,所有分區(qū)都要指定
}
# 讀取
# format 指定讀取kafka
df = ss.read.load(format='kafka',**options)
# todo 注意二:這一步的數(shù)據(jù)處理(將value轉(zhuǎn)化為字符串類型)是必須做的,不然你看不懂?dāng)?shù)據(jù)。
#       可以用df.的方式,那我后來怎么都沒怎么見過了0
df_select = df.select('key',df.value.cast('string'),'topic','partition','offset','timestamp','timestampType')
# 查看df數(shù)據(jù)
# todo 注意三:這里使用.show()的方式的,是因?yàn)樗怯薪绫?df_select.show()

Spark讀取kafka(流式和批數(shù)據(jù)),Spark階段,spark,kafka,大數(shù)據(jù)

spark讀取kafka(流數(shù)據(jù)處理)

Spark讀取kafka(流式和批數(shù)據(jù)),Spark階段,spark,kafka,大數(shù)據(jù)文章來源地址http://www.zghlxwxcb.cn/news/detail-811711.html

# 流式讀取kafka數(shù)據(jù)
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()
# todo 注意一:定義kafka的連接配置
options={
    # 寫kafka配置信息
    # 指定kafka的連接的broker服務(wù)節(jié)點(diǎn)信息
    'kafka.bootstrap.servers': 'node1:9092',
    # 指定主題
    'subscribe': 'itheima'  # 讀取的主題不存在會(huì)自動(dòng)創(chuàng)建
}
df = ss.readStream.load(format='kafka',**options)
# todo 注意二:必須將value轉(zhuǎn)化為string類型

# 計(jì)算
df_res = df.select('key',df.value.cast('string'),'topic','partition','offset','timestamp')

# 輸出
# todo 注意三:輸出不是df_res.show,
df_res.writeStream.start(format='console',outputMode='append').awaitTermination()

到了這里,關(guān)于Spark讀取kafka(流式和批數(shù)據(jù))的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Spark 數(shù)據(jù)讀取保存

    Spark 的數(shù)據(jù)讀取及數(shù)據(jù)保存可以從兩個(gè)維度來作區(qū)分:文件格式以及文件系統(tǒng): 文件格式: Text 文件、 Json 文件、 csv 文件、 Sequence 文件以及 Object 文件 文件系統(tǒng):本地文件系統(tǒng)、 HDFS、Hbase 以及數(shù)據(jù)庫(kù) text/hdfs 類型的文件讀都可以用 textFile(path) ,保存使用 saveAsTextFile(path)

    2024年02月09日
    瀏覽(20)
  • Spark連接Hive讀取數(shù)據(jù)

    Spark連接Hive讀取數(shù)據(jù)

    ????????Ubuntu 16.04 LTS ????????ubuntu-16.04.6-desktop-i386.iso? ????????spark-3.0.0-bin-without-hadoop.tgz?? ????????hadoop-3.1.3.tar.gz ????????apache-hive-3.1.2-bin.tar.gz ????????spark-hive_2.12-3.2.2.jar ????????openjdk 1.8.0_292 ????????mysql-connector-java-5.1.40.tar.gz? ???????

    2024年02月01日
    瀏覽(22)
  • spark讀取數(shù)據(jù)寫入hive數(shù)據(jù)表

    目錄 spark 讀取數(shù)據(jù) spark從某hive表選取數(shù)據(jù)寫入另一個(gè)表的一個(gè)模板 概述: create_tabel建表函數(shù),定義日期分區(qū) 刪除原有分區(qū)drop_partition函數(shù) generate_data 數(shù)據(jù)處理函數(shù),將相關(guān)數(shù)據(jù)寫入定義的表中? 注: 關(guān)于 insert overwrite/into 中partition時(shí)容易出的分區(qū)報(bào)錯(cuò)問題:? 添加分區(qū)函數(shù)

    2024年01月19日
    瀏覽(22)
  • 【Spark大數(shù)據(jù)習(xí)題】習(xí)題_Spark SQL&&&Kafka&& HBase&&Hive

    PDF資源路徑-Spark1 PDF資源路徑-Spark2 一、填空題 1、Scala語言的特性包含面向?qū)ο缶幊?、函?shù)式編程的、靜態(tài)類型的、可擴(kuò)展的、可以交互操作的。 2、在Scala數(shù)據(jù)類型層級(jí)結(jié)構(gòu)的底部有兩個(gè)數(shù)據(jù)類型,分別是 Nothing和Null。 3、在Scala中,聲明變量的有var聲明變量和val聲明常

    2024年02月06日
    瀏覽(21)
  • Spark Doris Connector 可以支持通過 Spark 讀取 Doris 數(shù)據(jù)類型不兼容報(bào)錯(cuò)解決

    Spark Doris Connector 可以支持通過 Spark 讀取 Doris 數(shù)據(jù)類型不兼容報(bào)錯(cuò)解決

    doris版本: 1.2.8 Spark Connector for Apache Doris 版本: spark-doris-connector-3.3_2.12-1.3.0.jar:1.3.0-SNAPSHOT spark版本:spark-3.3.1 Spark Doris Connector - Apache Doris 目前最新發(fā)布版本:?Release Apache Doris Spark Connector 1.3.0 Release · apache/doris-spark-connector · GitHub 2.1、Spark Doris Connector概述 Spark Doris Connector 可

    2024年01月23日
    瀏覽(20)
  • 使用Spark SQL讀取阿里云OSS的數(shù)據(jù)

    創(chuàng)建一個(gè)table,并關(guān)聯(lián)OSS目錄路徑 如果數(shù)據(jù)文件是 Parquet 格式的,可以自動(dòng)推斷出表的schema,很方便。 這樣就可以使用sql語句讀取數(shù)據(jù)了。 首先創(chuàng)建一個(gè)關(guān)聯(lián)OSS目錄的 database : 現(xiàn)在就可以通過sql寫入數(shù)據(jù)到OSS了,如下:

    2024年02月02日
    瀏覽(29)
  • HDFS常用操作以及使用Spark讀取文件系統(tǒng)數(shù)據(jù)

    HDFS常用操作以及使用Spark讀取文件系統(tǒng)數(shù)據(jù)

    掌握在Linux虛擬機(jī)中安裝Hadoop和Spark的方法; 熟悉HDFS的基本使用方法; 掌握使用Spark訪問本地文件和HDFS文件的方法。 啟動(dòng)Hadoop,在HDFS中創(chuàng)建用戶目錄“/user/hadoop” 在Linux系統(tǒng)的本地文件系統(tǒng)的“/home/hadoop”目錄下新建一個(gè)文本文件test.txt,并在該文件中隨便輸入一些內(nèi)容,

    2024年04月22日
    瀏覽(25)
  • 大數(shù)據(jù)編程實(shí)驗(yàn)一:HDFS常用操作和Spark讀取文件系統(tǒng)數(shù)據(jù)

    大數(shù)據(jù)編程實(shí)驗(yàn)一:HDFS常用操作和Spark讀取文件系統(tǒng)數(shù)據(jù)

    這是我們大數(shù)據(jù)專業(yè)開設(shè)的第二門課程——大數(shù)據(jù)編程,使用的參考書是《Spark編程基礎(chǔ)》,這門課跟大數(shù)據(jù)技術(shù)基礎(chǔ)是分開學(xué)習(xí)的,但這門課是用的我們自己在電腦上搭建的虛擬環(huán)境進(jìn)行實(shí)驗(yàn)的,不是在那個(gè)平臺(tái)上,而且搭建的還是偽分布式,這門課主要偏向于有關(guān)大數(shù)據(jù)

    2024年04月10日
    瀏覽(26)
  • Spark Streaming + Kafka構(gòu)建實(shí)時(shí)數(shù)據(jù)流

    Spark Streaming + Kafka構(gòu)建實(shí)時(shí)數(shù)據(jù)流

    1. 使用Apache Kafka構(gòu)建實(shí)時(shí)數(shù)據(jù)流 參考文檔鏈接:https://cloud.tencent.com/developer/article/1814030 2. 數(shù)據(jù)見UserBehavior.csv 數(shù)據(jù)解釋:本次實(shí)戰(zhàn)用到的數(shù)據(jù)集是CSV文件,里面是一百零四萬條淘寶用戶行為數(shù)據(jù),該數(shù)據(jù)來源是阿里云天池公開數(shù)據(jù)集 根據(jù)這一csv文檔運(yùn)用Kafka模擬實(shí)時(shí)數(shù)據(jù)流,

    2024年02月12日
    瀏覽(33)
  • 【大數(shù)據(jù)技術(shù)】Spark+Flume+Kafka實(shí)現(xiàn)商品實(shí)時(shí)交易數(shù)據(jù)統(tǒng)計(jì)分析實(shí)戰(zhàn)(附源碼)

    【大數(shù)據(jù)技術(shù)】Spark+Flume+Kafka實(shí)現(xiàn)商品實(shí)時(shí)交易數(shù)據(jù)統(tǒng)計(jì)分析實(shí)戰(zhàn)(附源碼)

    需要源碼請(qǐng)點(diǎn)贊關(guān)注收藏后評(píng)論區(qū)留言私信~~~ 1)Kafka 是一個(gè)非常通用的系統(tǒng),你可以有許多生產(chǎn)者和消費(fèi)者共享多個(gè)主題Topics。相比之下,F(xiàn)lume是一個(gè)專用工具被設(shè)計(jì)為旨在往HDFS,HBase等發(fā)送數(shù)據(jù)。它對(duì)HDFS有特殊的優(yōu)化,并且集成了Hadoop的安全特性。如果數(shù)據(jù)被多個(gè)系統(tǒng)消

    2024年02月03日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包