結(jié)構(gòu)化流介紹
有界和無(wú)界數(shù)據(jù)
- 有界數(shù)據(jù):
指的數(shù)據(jù)有固定的開(kāi)始和固定的結(jié)束,數(shù)據(jù)大小是固定。我們稱(chēng)之為有界數(shù)據(jù)。對(duì)于有界數(shù)據(jù),一般采用批處理方案(離線計(jì)算)
特點(diǎn):
1-數(shù)據(jù)大小是固定
2-程序處理有界數(shù)據(jù),程序最終一定會(huì)停止
- 無(wú)界數(shù)據(jù):
指的數(shù)據(jù)有固定的開(kāi)始,但是沒(méi)有固定的結(jié)束。我們稱(chēng)之為無(wú)界數(shù)據(jù)
對(duì)于無(wú)界數(shù)據(jù),我們一般采用流式處理方案(實(shí)時(shí)計(jì)算)
特點(diǎn):
1-數(shù)據(jù)沒(méi)有明確的結(jié)束,也就是數(shù)據(jù)大小不固定
2-數(shù)據(jù)是源源不斷的過(guò)來(lái)
3-程序處理無(wú)界數(shù)據(jù),程序會(huì)一直運(yùn)行不會(huì)結(jié)束
基本介紹
結(jié)構(gòu)化流是構(gòu)建在Spark SQL處理引擎之上的一個(gè)流式的處理引擎,主要是針對(duì)無(wú)界數(shù)據(jù)的處理操作。對(duì)于結(jié)構(gòu)化流同樣也支持多種語(yǔ)言操作的API:比如 Python Java Scala SQL …
Spark的核心是RDD。RDD出現(xiàn)主要的目的就是提供更加高效的離線的迭代計(jì)算操作,RDD是針對(duì)的有界的數(shù)據(jù)集,但是為了能夠兼容實(shí)時(shí)計(jì)算的處理場(chǎng)景,提供微批處理模型,本質(zhì)上還是批處理,只不過(guò)批與批之間的處理間隔時(shí)間變短了,讓我們感覺(jué)是在進(jìn)行流式的計(jì)算操作,目前默認(rèn)的微批可以達(dá)到100毫秒一次
? 真正的流處理引擎: Flink、Storm(早期流式處理引擎)、Flume(流式數(shù)據(jù)采集)
實(shí)時(shí)數(shù)據(jù)案例–詞頻統(tǒng)計(jì)
需求:
代碼實(shí)現(xiàn):
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 綁定指定的Python解釋器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 創(chuàng)建SparkSession對(duì)象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('structured_streaming_wordcount')\
.master('local[*]')\
.getOrCreate()
# 2- 數(shù)據(jù)輸入
init_df = spark.readStream\
.format("socket")\
.option("host","192.168.88.161")\
.option("port","55555")\
.load()
# 3- 數(shù)據(jù)處理
result_df = init_df.select(
F.explode(F.split('value',' ')).alias('word')
).groupBy('word').agg(
F.count('word').alias('cnt')
)
# init_df.show()
# 4- 數(shù)據(jù)輸出
# 5- 啟動(dòng)流式任務(wù)
result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()
程序運(yùn)行結(jié)果:
代碼測(cè)試操作步驟:
首先: 先下載一個(gè) nc(netcat) 命令. 通過(guò)此命令打開(kāi)一個(gè)端口號(hào), 并且可以向這個(gè)端口寫(xiě)入數(shù)據(jù)
yum -y install nc
執(zhí)行nc命令, 開(kāi)啟端口號(hào), 寫(xiě)入數(shù)據(jù):
nc -lk 55555
注意: 要先啟動(dòng)nc,再啟動(dòng)我們的程序
查看端口號(hào)是否被使用命令:
netstat -nlp | grep 要查詢的端口
可能遇到的錯(cuò)誤:
結(jié)構(gòu)化流的編程模型
數(shù)據(jù)結(jié)構(gòu)
在結(jié)構(gòu)化流中,我們可以將DataFrame稱(chēng)為無(wú)界的DataFrame或者無(wú)界的二維表
數(shù)據(jù)源部分
結(jié)構(gòu)化流默認(rèn)提供了多種數(shù)據(jù)源,從而可以支持不同的數(shù)據(jù)源的處理工作。目前提供了如下數(shù)據(jù)源:
- Socket Source:網(wǎng)絡(luò)套接字?jǐn)?shù)據(jù)源,一般用于測(cè)試。也就是從網(wǎng)絡(luò)上消費(fèi)/讀取數(shù)據(jù)
- File Source:文件數(shù)據(jù)源。讀取文件系統(tǒng),一般用于測(cè)試。如果文件夾下發(fā)生變化,有新文件產(chǎn)生,那么就會(huì)觸發(fā)程序的運(yùn)行
- Kafka Source:Kafka數(shù)據(jù)源。也就是作為消費(fèi)者來(lái)讀取Kafka中的數(shù)據(jù)。一般用于生產(chǎn)環(huán)境。
- Rate Source:速率數(shù)據(jù)源。一般用于測(cè)試。通過(guò)配置參數(shù),由結(jié)構(gòu)化流自動(dòng)生成測(cè)試數(shù)據(jù)。## Operation操作
對(duì)應(yīng)官網(wǎng)文檔內(nèi)容:https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html#input-sources
File Source
將目錄中寫(xiě)入的文件作為數(shù)據(jù)流讀取,支持的文件格式為:text、csv、json、orc、parquet…
相關(guān)的參數(shù):
option參數(shù) | 描述說(shuō)明 |
---|---|
maxFilesPerTrigger | 每次觸發(fā)時(shí)要考慮的最大新文件數(shù) (默認(rèn): no max) |
latestFirst | 是否先處理最新的新文件, 當(dāng)有大量文件積壓時(shí)有用 (默認(rèn): false) |
fileNameOnly | 是否檢查新文件只有文件名而不是完整路徑(默認(rèn)值:false)將此設(shè)置為 true 時(shí),以下文件將被視為同一個(gè)文件,因?yàn)樗鼈兊奈募癲ataset.txt”相同: “file:///dataset.txt” “s3://a/dataset.txt " “s3n://a/b/dataset.txt” “s3a://a/b/c/dataset.txt” |
讀取代碼通用格式:
sparksession.readStream
.format('CSV|JSON|Text|Parquet|ORC...')
.option('參數(shù)名1','參數(shù)值1')
.option('參數(shù)名2','參數(shù)值2')
.option('參數(shù)名N','參數(shù)值N')
.schema(元數(shù)據(jù)信息)
.load('需要監(jiān)聽(tīng)的目錄地址')
針對(duì)具體數(shù)據(jù)格式,還有對(duì)應(yīng)的簡(jiǎn)寫(xiě)API格式,例如:
sparksession.readStream.csv(path='需要監(jiān)聽(tīng)的目錄地址',schema=元數(shù)據(jù)信息。。。)
代碼操作
import os
from pyspark.sql import SparkSession
# 綁定指定的Python解釋器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 創(chuàng)建SparkSession對(duì)象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions","1")\
.appName('file_source')\
.master('local[*]')\
.getOrCreate()
# 2- 數(shù)據(jù)輸入:File Source文件數(shù)據(jù)源
"""
File Source總結(jié)
1- 只能監(jiān)聽(tīng)目錄,不能監(jiān)聽(tīng)具體的文件
2- 可以通過(guò)*通配符的形式監(jiān)聽(tīng)目錄中滿足條件的文件
3- 如果監(jiān)聽(tīng)目錄中有子目錄,那么無(wú)法監(jiān)聽(tīng)到子目錄的變化情況
"""
init_df = spark.readStream.csv(
path="file:///export/data/",
sep=",",
encoding="UTF-8",
schema="id int,name string"
)
# 3- 數(shù)據(jù)處理
# 4- 數(shù)據(jù)輸出
# 5- 啟動(dòng)流式任務(wù)
init_df.writeStream.format("console").outputMode("append").start().awaitTermination()
可能遇到的錯(cuò)誤一:
原因: 如果是文件數(shù)據(jù)源,需要手動(dòng)指定schema信息
可能遇到的錯(cuò)誤二:
原因: File source只能監(jiān)聽(tīng)目錄,不能監(jiān)聽(tīng)具體文件
文件數(shù)據(jù)源特點(diǎn):
1- 不能夠監(jiān)聽(tīng)具體的文件,否則會(huì)報(bào)錯(cuò)誤java.lang.IllegalArgumentException: Option 'basePath' must be a directory
2- 可以通過(guò)通配符的形式,來(lái)監(jiān)聽(tīng)目錄下的文件,符合要求的才會(huì)被讀取
3- 如果監(jiān)聽(tīng)目錄中有子目錄,那么無(wú)法監(jiān)聽(tīng)到子目錄的變化情況
Operations操作
指的是數(shù)據(jù)處理部分,該操作和Spark SQL中是完全一致??梢允褂肧QL方式進(jìn)行處理,也可以使用DSL方式進(jìn)行處理。
Sink輸出操作
在結(jié)構(gòu)化流中定義好DataFrame或者處理好DataFrame之后,調(diào)用writeStream()方法完成數(shù)據(jù)的輸出操作。在輸出的過(guò)程中,我們可以設(shè)置一些相關(guān)的屬性,然后啟動(dòng)結(jié)構(gòu)化流程序運(yùn)行。
輸出模式
在進(jìn)行數(shù)據(jù)輸出的時(shí)候,必須通過(guò)outputMode來(lái)設(shè)置輸出模式。輸出模式提供了3種不同的模式:
-
1- append模式:增量模式
特點(diǎn):當(dāng)結(jié)構(gòu)化程序處理數(shù)據(jù)的時(shí)候,如果有了新數(shù)據(jù),才會(huì)觸發(fā)執(zhí)行。而且該模式只支持追加。不支持?jǐn)?shù)據(jù)處理階段有聚合的操作。如果有了聚合操作,直接報(bào)錯(cuò)。而且也不支持排序操作。如果有了排序,直接報(bào)錯(cuò)。
-
2- complete模式:完全(全量)模式
特點(diǎn):當(dāng)結(jié)構(gòu)化程序處理數(shù)據(jù)的時(shí)候,每一次都是針對(duì)全量的數(shù)據(jù)進(jìn)行處理。由于數(shù)據(jù)越來(lái)越多,所以在數(shù)據(jù)處理階段,必須要有聚合操作。如果沒(méi)有聚合操作,直接報(bào)錯(cuò)。另外還支持排序,但是不是強(qiáng)制要求。
-
3- update模式:更新模式
特點(diǎn):支持聚合操作。當(dāng)結(jié)構(gòu)化程序處理數(shù)據(jù)的時(shí)候,如果處理階段沒(méi)有聚合操作,該模式效果和append模式是一致。如果有了聚合操作,只會(huì)輸出有變化和新增的內(nèi)容。但是不支持排序操作,如果有了排序,直接報(bào)錯(cuò)。
append模式:
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 綁定指定的Python解釋器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 創(chuàng)建SparkSession對(duì)象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('structured_streaming_wordcount')\
.master('local[*]')\
.getOrCreate()
# 2- 數(shù)據(jù)輸入
init_df = spark.readStream\
.format("socket")\
.option("host","192.168.88.161")\
.option("port","55555")\
.load()
init_df.createTempView("tmp_table")
# 3- 數(shù)據(jù)處理
# 正常:沒(méi)有聚合操作,也沒(méi)有排序
result_df = spark.sql("""
select
explode(split(value,' ')) as word
from tmp_table
""")
# 異常:有聚合操作,沒(méi)有排序
# result_df = spark.sql("""
# select
# word,count(1) as cnt
# from (
# select
# explode(split(value,' ')) as word
# from tmp_table
# )
# group by word
# """)
# 異常:沒(méi)有聚合操作,有排序
# result_df = spark.sql("""
# select
# word
# from (
# select
# explode(split(value,' ')) as word
# from tmp_table
# )
# order by word
# """)
# 4- 數(shù)據(jù)輸出
# 5- 啟動(dòng)流式任務(wù)
result_df.writeStream.format('console').outputMode('append').start().awaitTermination()
如果有了聚合操作,會(huì)報(bào)如下錯(cuò)誤:
如果有了排序操作,會(huì)報(bào)如下錯(cuò)誤:
complete模式:
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 綁定指定的Python解釋器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 創(chuàng)建SparkSession對(duì)象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('structured_streaming_wordcount')\
.master('local[*]')\
.getOrCreate()
# 2- 數(shù)據(jù)輸入
init_df = spark.readStream\
.format("socket")\
.option("host","192.168.88.161")\
.option("port","55555")\
.load()
init_df.createTempView("tmp_table")
# 3- 數(shù)據(jù)處理
# 異常:沒(méi)有聚合操作
# result_df = spark.sql("""
# select
# explode(split(value,' ')) as word
# from tmp_table
# """)
# 正常:有聚合操作,沒(méi)有排序
result_df = spark.sql("""
select
word,count(1) as cnt
from (
select
explode(split(value,' ')) as word
from tmp_table
)
group by word
order by cnt
""")
# 4- 數(shù)據(jù)輸出
# 5- 啟動(dòng)流式任務(wù)
result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()
如果沒(méi)有聚合操作,會(huì)報(bào)如下錯(cuò)誤:
update模式:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-798844.html
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 綁定指定的Python解釋器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 創(chuàng)建SparkSession對(duì)象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('structured_streaming_wordcount')\
.master('local[*]')\
.getOrCreate()
# 2- 數(shù)據(jù)輸入
init_df = spark.readStream\
.format("socket")\
.option("host","192.168.88.161")\
.option("port","55555")\
.load()
init_df.createTempView("tmp_table")
# 3- 數(shù)據(jù)處理
# 正常:沒(méi)有聚合操作
result_df = spark.sql("""
select
explode(split(value,' ')) as word
from tmp_table
""")
# 正常:有聚合操作,沒(méi)有排序
# result_df = spark.sql("""
# select
# word,count(1) as cnt
# from (
# select
# explode(split(value,' ')) as word
# from tmp_table
# )
# group by word
# """)
# 異常:有排序
result_df = spark.sql("""
select
word
from (
select
explode(split(value,' ')) as word
from tmp_table
)
order by word
""")
# 4- 數(shù)據(jù)輸出
# 5- 啟動(dòng)流式任務(wù)
result_df.writeStream.format('console').outputMode('update').start().awaitTermination()
如果有了排序操作,會(huì)報(bào)如下錯(cuò)誤:文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-798844.html
到了這里,關(guān)于結(jié)構(gòu)化流(Structured Streaming)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!