# cording:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructType
import pyspark.sql.functions as F
if __name__ == '__main__':
# 0.構(gòu)建執(zhí)行環(huán)境入口對(duì)象SparkSession
spark = SparkSession.builder.\
appName('movie_demo').\
master('local[*]').\
getOrCreate()
sc = spark.sparkContext
# 1.讀取文件
schema = StructType().add('user_id', StringType(), nullable=True). \
add('movie_id', IntegerType(), nullable=True).\
add('rank', IntegerType(), nullable=True).\
add('ts', StringType(), nullable=True)
df = spark.read.format('csv').\
option('sep', '\t').\
option('header', False).\
option('encoding', 'utf-8').\
schema(schema=schema).\
load('../input/u.data')
# TODO 1:用戶平均分
df.groupBy('user_id').\
avg('rank').\
withColumnRenamed('avg(rank)', 'avg_rank').\
withColumn('avg_rank', F.round('avg_rank', 2)).\
orderBy('avg_rank', ascending=False).\
show()
# TODO 2:電影的平均分查詢
df.createTempView('movie')
spark.sql('''
SELECT movie_id, ROUND(AVG(rank),2) as avg_rank FROM movie GROUP BY movie_id ORDER BY avg_rank DESC
''').show()
# TODO 3:查詢大于平均分的電影數(shù)量
print('大于平均分電影數(shù)量為:', df.where(df['rank'] > df.select(F.avg('rank')).first()['avg(rank)']).count())
# TODO 4:查詢高分電影中(>3)打分次數(shù)最多的用戶,此人打分的平均分
# 找出打分次數(shù)最多的人
user_id = df.where('rank>3').\
groupBy('user_id').\
count(). \
withColumnRenamed('count', 'cnt').\
orderBy('cnt', ascennding=False).\
limit(1).\
first()['user_id']
# 算平均分
df.filter(df['user_id'] == user_id).\
select(F.round(F.avg('rank'), 2)).show()
# TODO 5: 查詢每個(gè)用戶的平均分打分,最低打分,最高打分
df.groupBy('user_id').\
agg(
F.round(F.avg('rank'), 2).alias('avg_rank'),
F.min('rank').alias('min_rank'),
F.max('rank').alias('max_rank')
).show()
# TODO 6:查詢?cè)u(píng)分超過100次的電影的平均分 排名TOP10
df.groupBy('movie_id').\
agg(
F.round(F.count('movie_id'),2).alias('cnt'),
F.round(F.avg('rank'),2).alias('avg_rank')
).\
where('cnt > 100').\
orderBy('avg_rank', ascending=False).\
limit(10).\
show()
'''
1.agg:它是GroupedData對(duì)象的API,作用是:在里面可以寫多個(gè)聚合
2.alias:它是Column對(duì)象的API,可以針對(duì)一個(gè)列進(jìn)行改名
3.withColumnRenamed:它是DataFrame的API,可以對(duì)DF中的列進(jìn)行改名,一次改一個(gè)列,改多個(gè)列可以鏈?zhǔn)秸{(diào)用
4.orderBy:DataFrame的API,進(jìn)行排序,參數(shù)1是被排序的列,參數(shù)2是 升序(True)或降序(False)
5.first:DataFrame的API,取出DF的第一行數(shù)據(jù),返回值結(jié)果是Row對(duì)象
## Row對(duì)象:就是一個(gè)數(shù)組,可以通過row['列名']來取出當(dāng)前行中,某一列具體數(shù)值,返回值不再是DF 或者GroupedData 或者Column 而是具體的值(字符串、數(shù)字等)
'''
1.
2.
3.
4.
5.
6.文章來源:http://www.zghlxwxcb.cn/news/detail-718914.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-718914.html
到了這里,關(guān)于電影評(píng)分?jǐn)?shù)據(jù)分析案例-Spark SQL的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!