spark讀取kafka(批數(shù)據(jù)處理)
# 按照偏移量讀取kafka數(shù)據(jù)
from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()
# spark讀取kafka
options = {
# 寫kafka配置信息
# 指定kafka的連接的broker服務(wù)節(jié)點(diǎn)信息
'kafka.bootstrap.servers': 'node1:9092',
# 指定主題
'subscribe': 'itcast',# 讀取的主題不存在會(huì)自動(dòng)創(chuàng)建
# todo 注意一:連接的配置
# 主題名稱 ,分區(qū)編號(hào),偏移量
# 指定起始偏移量 {主題名稱:{分區(qū)編號(hào)0:偏移量,分區(qū)編號(hào)1:偏移量....}}
'startingOffsets':""" {"itcast":{"0":0,"1":1}} """,
# 指定結(jié)束偏移量 {主題名稱:{分區(qū)編號(hào)0:偏移量,分區(qū)編號(hào)1:偏移量....}}
'endingOffsets':""" {"itcast":{"0":3,"1":2}} """
# 注意點(diǎn) : 偏移量的區(qū)間是左閉右開 ,結(jié)束偏移的指定按照最大偏移量加一 ,所有分區(qū)都要指定
}
# 讀取
# format 指定讀取kafka
df = ss.read.load(format='kafka',**options)
# todo 注意二:這一步的數(shù)據(jù)處理(將value轉(zhuǎn)化為字符串類型)是必須做的,不然你看不懂?dāng)?shù)據(jù)。
# 可以用df.的方式,那我后來怎么都沒怎么見過了0
df_select = df.select('key',df.value.cast('string'),'topic','partition','offset','timestamp','timestampType')
# 查看df數(shù)據(jù)
# todo 注意三:這里使用.show()的方式的,是因?yàn)樗怯薪绫?df_select.show()
文章來源:http://www.zghlxwxcb.cn/news/detail-811711.html
spark讀取kafka(流數(shù)據(jù)處理)
文章來源地址http://www.zghlxwxcb.cn/news/detail-811711.html
# 流式讀取kafka數(shù)據(jù)
from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()
# todo 注意一:定義kafka的連接配置
options={
# 寫kafka配置信息
# 指定kafka的連接的broker服務(wù)節(jié)點(diǎn)信息
'kafka.bootstrap.servers': 'node1:9092',
# 指定主題
'subscribe': 'itheima' # 讀取的主題不存在會(huì)自動(dòng)創(chuàng)建
}
df = ss.readStream.load(format='kafka',**options)
# todo 注意二:必須將value轉(zhuǎn)化為string類型
# 計(jì)算
df_res = df.select('key',df.value.cast('string'),'topic','partition','offset','timestamp')
# 輸出
# todo 注意三:輸出不是df_res.show,
df_res.writeStream.start(format='console',outputMode='append').awaitTermination()
到了這里,關(guān)于Spark讀取kafka(流式和批數(shù)據(jù))的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!