頭歌的大數(shù)據(jù)作業(yè),答案沒找著,遂自己整了一份
第1關(guān):SparkSql 數(shù)據(jù)清洗
任務(wù)描述
本關(guān)任務(wù):將出租車軌跡數(shù)據(jù)規(guī)整化,清洗掉多余的字符串。
相關(guān)知識(shí)
為了完成本關(guān)任務(wù),你需要掌握:1. 如何使用 SparkSQL 讀取 CSV 文件,2. 如何使用正則表達(dá)式清洗掉多余字符串。
編程要求
在右側(cè)編輯器補(bǔ)充代碼,將出租車軌跡數(shù)據(jù)規(guī)整化,清洗掉多余的字符串,并使用 DataFrame.show() 打印輸出。
# -*- coding: UTF-8 -*-
from pyspark.sql import SparkSession
if __name__ =='__main__':
spark = SparkSession.builder.master("local").appName("demo").getOrCreate()
#**********begin**********#
df = spark.read.option("header",True).option("delimiter","\t").csv("/root/data.csv")
df.createTempView("data")
spark.sql("""
select regexp_replace(TRIP_ID,'\\\W+','') as TRIP_ID ,
regexp_replace(CALL_TYPE,'\\\W+','') as CALL_TYPE ,
regexp_replace(ORIGIN_CALL,'\\\W+','') as ORIGIN_CALL ,
regexp_replace(TAXI_ID,'\\\W+','') as TAXI_ID ,
regexp_replace(ORIGIN_STAND,'\\\W+','') as ORIGIN_STAND ,
regexp_replace(TIMESTAMP,'\\\W+','') as TIMESTAMP ,
regexp_replace(POLYLINE,'\\\W+','') as POLYLINE
from data
""").show()
#**********end**********#
spark.stop()
第2關(guān):SparkSql數(shù)據(jù)分析
任務(wù)描述
本關(guān)任務(wù):使用 SparkSQL 完成數(shù)據(jù)分析。文章來源:http://www.zghlxwxcb.cn/news/detail-770451.html
相關(guān)知識(shí)
為了完成本關(guān)任務(wù),你需要掌握:如何使用 SparkSQL 進(jìn)行數(shù)據(jù)分析文章來源地址http://www.zghlxwxcb.cn/news/detail-770451.html
# -*- coding: UTF-8 -*-
from pyspark.sql import SparkSession
import json
if __name__ == '__main__' :
spark = SparkSession.builder.master("local").appName("demo").getOrCreate()
#**********begin**********#
df = spark.read.option("header",True).option("delimiter","\t").csv("/root/data2.csv")
df.createTempView("data")
spark.sql("select TRIP_ID,CALL_TYPE,ORIGIN_CALL, TAXI_ID, ORIGIN_STAND, from_unixtime(TIMESTAMP,'yyyy-MM-dd') as TIME ,POLYLINE from data").show()
spark.udf.register("timeLen", lambda x: {
(len(json.loads(x)) - 1) * 15 if len(json.loads(x)) > 0 else 8
})
spark.udf.register("startLocation", lambda x: {
str(json.loads(x)[0]) if len(json.loads(x)) > 0 else ""
})
spark.udf.register( "endLocation", lambda x: {
str(json.loads(x)[len(json.loads(x)) - 1]) if len(json.loads(x)) > 0 else ""
})
df.createTempView("data2")
res=spark.sql("select TRIP_ID,CALL_TYPE,ORIGIN_CALL,TAXI_ID,ORIGIN_STAND,from_unixtime(TIMESTAMP,'yyyy-MM-dd') as TIME, POLYLINE, timeLen(POLYLINE) as TIMELEN, startLocation(POLYLINE) as STARTLOCATION, endLocation(POLYLINE) as ENDLOCATION from data2")
res.createTempView("data3")
res.show()
spark.sql("select CALL_TYPE,TIME,count(1) as NUM from data3 group by TIME,CALL_TYPE order by CALL_TYPE,TIME").show()
#**********end**********#
到了這里,關(guān)于企業(yè)spark案例 —— 出租車軌跡分析(Python)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!