- 研究背景與方案
1.1.研究背景
在大數(shù)據(jù)時(shí)代背景下,各行業(yè)數(shù)據(jù)的規(guī)模大幅度增加,數(shù)據(jù)類別日益復(fù)雜,給數(shù)據(jù)分析工作帶來極大挑戰(zhàn)。氣象行業(yè)和人們的生活息息相關(guān),隨著信息時(shí)代的發(fā)展,大數(shù)據(jù)技術(shù)的出現(xiàn)為氣象數(shù)據(jù)的發(fā)展帶來機(jī)遇?;诖?,本項(xiàng)目使用Spark等大數(shù)據(jù)處理工具,采用機(jī)器學(xué)習(xí)、深度學(xué)習(xí)等多種數(shù)據(jù)分析方法,并借助可視化手段將多種類型數(shù)據(jù)與復(fù)雜數(shù)據(jù)進(jìn)行解讀與概括,探究大數(shù)據(jù)技術(shù)在氣象數(shù)據(jù)中的應(yīng)用,給受眾傳遞更有價(jià)值的信息,進(jìn)而有助于提升社會整體生產(chǎn)效率,推動市場經(jīng)濟(jì)的有效發(fā)展。
1.2.研究方案
一、選用合適的數(shù)據(jù)集。歷史天氣數(shù)據(jù)可以通過爬取氣象網(wǎng)站獲得,這種方法的優(yōu)點(diǎn)在于可以靈活選擇自己想要的數(shù)據(jù),缺點(diǎn)在于耗時(shí)較長、可能遇到反爬機(jī)制,此外氣象網(wǎng)站的天氣數(shù)據(jù)往往特征維數(shù)不高,不適于機(jī)器學(xué)習(xí)、深度學(xué)習(xí)等任務(wù)。因此決定使用爬取的數(shù)據(jù)進(jìn)行基于Spark的數(shù)據(jù)分析及可視化,選用數(shù)據(jù)競賽網(wǎng)站Kaggle的更大更高維的數(shù)據(jù)進(jìn)行基于學(xué)習(xí)的任務(wù)。
二、工具與環(huán)境配置。由于虛擬機(jī)受限于硬件,難以完成較大規(guī)模數(shù)據(jù)的分析任務(wù),于是決定在本地Windows操作系統(tǒng)下部署相關(guān)大數(shù)據(jù)環(huán)境(Hadoop、Spark、深度學(xué)習(xí)框架等)。
三、選用合適的學(xué)習(xí)算法。不同類型的數(shù)據(jù)適合于不同的學(xué)習(xí)算法,天氣數(shù)據(jù)的特點(diǎn)是存在趨勢、周期性、節(jié)令性等規(guī)律,而由于循環(huán)神經(jīng)網(wǎng)絡(luò)RNN具有記憶能力,因此它對于處理時(shí)間序列數(shù)據(jù)非常有效。在訓(xùn)練過程中,網(wǎng)絡(luò)會根據(jù)歷史數(shù)據(jù)動態(tài)地更新權(quán)重,從而使得網(wǎng)絡(luò)能夠適應(yīng)不同的數(shù)據(jù)分布和趨勢。同時(shí),RNN通過考慮前后時(shí)刻之間的依賴性,使得更容易捕獲上述數(shù)據(jù)的特征,并可以自適應(yīng)地更新模型以處理噪聲和異常值的影響。因此選用RNN及其改進(jìn)(LSTM等)完成深度學(xué)習(xí)任務(wù)。
2.數(shù)據(jù)集介紹
2.1爬取中央氣象臺網(wǎng)站天氣數(shù)據(jù)
2.1.1.實(shí)驗(yàn)數(shù)據(jù)介紹
本次實(shí)驗(yàn)所采用的數(shù)據(jù)從中央氣象臺官方網(wǎng)站(關(guān)注陰晴冷暖,氣象一直為你)爬取,主要是最近24小時(shí)各個(gè)城市的天氣數(shù)據(jù),包括時(shí)間點(diǎn)(整點(diǎn))、整點(diǎn)氣溫、整點(diǎn)降水量、風(fēng)力、整點(diǎn)氣壓、相對濕度等,總數(shù)據(jù)量達(dá)到58368條。正常情況下,每個(gè)城市會對應(yīng)24條數(shù)據(jù)(每個(gè)整點(diǎn)一條)。有部分城市部分時(shí)間點(diǎn)數(shù)據(jù)存在缺失或異常。
2.1.2.數(shù)據(jù)獲取
2.1.2.1.觀察數(shù)據(jù)獲取方式
打開中央氣象臺官方網(wǎng)站,任意點(diǎn)擊“熱點(diǎn)城市”中的一個(gè)城市。打開瀏覽器的Web控制臺。通過切換“省份”和“城市”,我們可以發(fā)現(xiàn)網(wǎng)頁中的數(shù)據(jù)是以json字符串格式異步地從服務(wù)器傳送。可以發(fā)現(xiàn)以下數(shù)據(jù)和請求URL的關(guān)系:
表1 數(shù)據(jù)和請求URL的關(guān)系
|
由于省份三位編碼(如福建省編碼為ABJ)需要從省份數(shù)據(jù)獲得中獲得,城市編號需要從城市數(shù)據(jù)獲得(如福州市編號為58847),所以為了獲得各個(gè)城市最近24小時(shí)整點(diǎn)天氣數(shù)據(jù),依次爬取省份數(shù)據(jù)、城市數(shù)據(jù)、最近24小時(shí)整點(diǎn)數(shù)據(jù)。
2.1.2.2.數(shù)據(jù)爬取
由于可以直接通過訪問請求URL,傳回的響應(yīng)的數(shù)據(jù)部分即是json格式的數(shù)據(jù),所以只需要調(diào)用python的urllib2庫中相關(guān)函數(shù),對上述URL進(jìn)行請求即可。不需要像平常爬取HTML網(wǎng)頁時(shí)還需要對網(wǎng)頁源碼進(jìn)行解析,查找相關(guān)數(shù)據(jù)。唯一需要注意的是,有些城市可能不存在或者全部缺失最近24小時(shí)整點(diǎn)數(shù)據(jù),需要進(jìn)行過濾,以免出錯。
2.1.2.3數(shù)據(jù)存儲
雖然上一步獲取的json數(shù)據(jù)可以直接存儲并可使用SparkSession直接讀取,但是為了方便觀察數(shù)據(jù)結(jié)構(gòu)、辨識異常數(shù)據(jù)、對數(shù)據(jù)增加部分提示信息,爬取后的數(shù)據(jù)進(jìn)行了一些處理之后,保存成了csv格式,包括省份數(shù)據(jù)(province.csv)、城市數(shù)據(jù)(city.csv)、各個(gè)城市最近24小時(shí)整點(diǎn)天氣數(shù)據(jù)(passed_weather_ALL.csv)。由于所有城市過去24小時(shí)整點(diǎn)天氣數(shù)據(jù)數(shù)量太多,為了避免內(nèi)存不足,每爬取50個(gè)城市的數(shù)據(jù)后,就會進(jìn)行一次保存。
3.實(shí)驗(yàn)環(huán)境搭建
3.1.Windows安裝Hadoop
3.1.1.Hadoop簡介
Apache Hadoop是一款支持?jǐn)?shù)據(jù)密集型分布式應(yīng)用程序并以Apache 2.0許可協(xié)議發(fā)布的開源軟件框架,有助于使用許多計(jì)算機(jī)組成的網(wǎng)絡(luò)來解決數(shù)據(jù)、計(jì)算密集型的問題?;?/span>MapReduce計(jì)算模型,它為大數(shù)據(jù)的分布式存儲與處理提供了一個(gè)軟件框架。所有的Hadoop模塊都有一個(gè)基本假設(shè),即硬件故障是常見情況,應(yīng)該由框架自動處理。
Apache Hadoop的核心模塊分為存儲和計(jì)算模塊,前者被稱為Hadoop分布式文件系統(tǒng)(HDFS),后者即MapReduce計(jì)算模型。Hadoop框架先將文件分成數(shù)據(jù)塊并分布式地存儲在集群的計(jì)算節(jié)點(diǎn)中,接著將負(fù)責(zé)計(jì)算任務(wù)的代碼傳送給各節(jié)點(diǎn),讓其能夠并行地處理數(shù)據(jù)。這種方法有效利用了數(shù)據(jù)局部性,令各節(jié)點(diǎn)分別處理其能夠訪問的數(shù)據(jù)。與傳統(tǒng)的超級計(jì)算機(jī)架構(gòu)相比,這使得數(shù)據(jù)集的處理速度更快、效率更高。
3.1.2.安裝Java開發(fā)環(huán)境
打開cmd窗口并輸入java -version,若能成功顯示Java JDK的版本號則代表java環(huán)境已安裝成功,否則需要安裝Java JDK。
3.1.3.下載安裝Hadoop所需要的文件
Hadoop3.1.1版本的安裝包:https://archive.apache.org/dist/hadoop/common/hadoop-3.1.1/hadoop-3.1.0.tar.gz。Windows環(huán)境安裝所需的bin:https://github.com/s911415/apache-hadoop-3.1.1-winutils。Hadoop是在Linux下編寫的,winutil主要用于模擬Linux下的目錄環(huán)境。所以Hadoop放在Windows下運(yùn)行的時(shí)候,需要這個(gè)輔助程序才能運(yùn)行。
3.1.4.替換原安裝包的bin目錄
hadoop-3.1.1解壓后文件夾的路徑為D:\hadoop-3.1.1。apache-hadoop-3.1.1-winutils-master這個(gè)文件夾解壓后里面只有bin這一個(gè)文件夾,我們將這個(gè)bin文件夾復(fù)制到hadoop-3.1.1文件夾中替換原有的bin文件夾。
3.1.5.配置Hadoop環(huán)境變量
新建系統(tǒng)變量,變量名填HADOOP_HOME,變量值填hadoop-3.1.1對應(yīng)的路徑(比如我的是D:\hadoop-3.1.1。順便可以檢查JAVA_HOME有沒有配置好,后面會用到)。然后點(diǎn)擊Path變量進(jìn)行編輯,在最前面加上%HADOOP_HOME%\bin。
3.1.6.檢查環(huán)境變量是否配置成功
配置好環(huán)境變量后,win+R 輸入cmd打開命令提示符,然后輸入hadoop version,按回車,如果出現(xiàn)如圖所示版本號,則說明安裝成功。
3.1.7.配置Hadoop的配置文件
3.1.7.1.配置core-site.xml文件
代碼 2 配置core-site.xml文件
<configuration> ?????????????? <property> ?????????????????????????????? <name>fs.defaultFS</name> ?????????????????????????????? <value>hdfs://localhost:900</value> ?????????????? </property> </configuration> |
3.1.7.2.配置mapred-site.xml文件
代碼 3 配置mapred-site.xml文件
<configuration>?? ?????????????? <property>?????? ?????????????? <name>mapreduce.framework.name</name>?????? ?????????????? <value>yarn</value>?? ?????????????? </property> </configuration> |
3.1.7.3.配置yarn-site.xml文件
代碼 4 配置yarn-site.xml文件
<configuration> ?????????????? <property> ?????????????????????????????? <name>yarn.nodemanager.aux-services</name> ?????????????????????????????? <value>mapreduce_shuffle</value> ?????????????? </property> ?????????????? <property> ?????????????????????????????? <name>yarn.nodemanager.auxservices.mapreduce.shuffle.class</name> ?????????????????????????????? <value>org.apache.hadoop.mapred.ShuffleHandler</value> ?????????????? </property> </configuration> |
3.1.7.4.配置hdfs-site.xml文件
在D:\hadoop-3.1.1創(chuàng)建data文件夾(也可以是別的名字,但后面配置要對應(yīng)修改),在data2020文件夾中(D:\hadoop-3.1.1\data)創(chuàng)建datanode和namenode文件夾。之后打開hdfs-site.xml文件:
代碼 5 配置hdfs-site.xml文件
<configuration> ?????????????? <property>?????? ?????????????? <name>dfs.replication</name>?????? ?????????????? <value>1</value>?? ?????????????? </property>?? ?????????????? <property>?????? ?????????????? <name>dfs.namenode.name.dir</name>?????? ?????????????? <value>D:\hadoop-3.1.1\data\namenode</value> ?????????????? </property>?? ?????????????? <property>?????? ?????????????? <name>dfs.datanode.data.dir</name>???? ?????????????? <value>D:\hadoop-3.1.1\data\datanode</value> ?????????????? </property> </configuration> |
3.1.7.5.配置hadoop-env.sh文件
打開hadoop-env.sh,使用查找功能(Ctrl+F)查找export JAVA_HOME,找到相應(yīng)的位置,在#export JAVA_HOME=下面一行配置自己電腦上對應(yīng)的JAVA_HOME/bin路徑(JAVA_HOME的具體路徑在環(huán)境變量中查找到)。
3.1.7.6.配置hadoop-env.cmd文件
打開hadoop-env.cmd文件后使用查找功能(Ctrl+F),輸入@rem The java implementation to use查找到對應(yīng)行,在set JAVA_HOME那一行將自己的JAVA_HOME路徑配置上去。
3.1.8.啟動Hadoop服務(wù)
使用管理員模式進(jìn)入sbin目錄,輸入start-all.cmd啟動Hadoop服務(wù):
接著在瀏覽器中訪問http://localhost:9870,如果成功出現(xiàn)以下界面則代表Hadoop安裝和配置完成:
3.2.Windows安裝Spark
3.2.1.Spark簡介
Spark最初由美國加州伯克利大學(xué)的AMP 實(shí)驗(yàn)室于2009年開發(fā),是基于內(nèi)存計(jì)算的大數(shù)據(jù)并行計(jì)算框架,可用于構(gòu)建大型的、低延遲的數(shù)據(jù)分析應(yīng)用程序。 Spark于2013年加入Apache孵化器項(xiàng)目后發(fā)展迅猛,如今已成為Apache軟件基金會最重要的三大分布式計(jì)算系統(tǒng)開源項(xiàng)目之一(Hadoop、Spark、Storm)。值得一提的是,Spark在2014年打破了Hadoop保持的基準(zhǔn)排序記錄。Spark使用206個(gè)節(jié)點(diǎn),用時(shí)23分鐘完成了100TB數(shù)據(jù)的排序,而Hadoop使用了2000個(gè)節(jié)點(diǎn),耗時(shí)72分鐘,完成了100TB數(shù)據(jù)的排序。也就是說Spark用十分之一的計(jì)算資源,獲得了比Hadoop快3倍的速度。
Spark具有如下幾個(gè)主要特點(diǎn):運(yùn)行速度快:使用DAG執(zhí)行引擎以支持循環(huán)數(shù)據(jù)流與內(nèi)存計(jì)算;容易使用:支持使用Scala、Java、Python和R語言進(jìn)行編程,可以通過 Spark Shell進(jìn)行交互式編程;通用性:Spark提供了完整而強(qiáng)大的技術(shù)棧,包括SQL查詢、流式計(jì)算、機(jī)器學(xué)習(xí)和圖算法組件;運(yùn)行模式多樣:可運(yùn)行于獨(dú)立的集群模式中,可運(yùn)行于Hadoop中,也可運(yùn)行于Amazon EC2等云環(huán)境中,并且可以訪問HDFS、Cassandra、HBase和Hive等多種數(shù)據(jù)源。
傳統(tǒng)大數(shù)據(jù)處理框架Hadoop存在如下一些缺點(diǎn):表達(dá)能力有限;磁盤IO開銷大;延遲高;任務(wù)之間的銜接涉及IO開銷;在前一個(gè)任務(wù)執(zhí)行完成之前,其他任務(wù)就無法開始,難以勝任復(fù)雜、多階段的計(jì)算任務(wù)。Spark在借鑒Hadoop MapReduce優(yōu)點(diǎn)的同時(shí),很好地解決了MapReduce所面臨的問題。相比于Hadoop MapReduce,Spark主要具有如下優(yōu)點(diǎn):Spark的計(jì)算模式也屬于MapReduce,但不局限于Map和Reduce操作,還提供了多種數(shù)據(jù)集操作類型,編程模型比Hadoop MapReduce更靈活;Spark提供了內(nèi)存計(jì)算,可將中間結(jié)果放到內(nèi)存中,對于迭代運(yùn)算效率更高;Spark基于DAG的任務(wù)調(diào)度執(zhí)行機(jī)制,要優(yōu)于Hadoop MapReduce的迭代執(zhí)行機(jī)制。
Spark的運(yùn)行架構(gòu)包括集群管理器(Cluster Manager)、運(yùn)行作業(yè)任務(wù)的工作節(jié)點(diǎn)(Worker Node)、每個(gè)應(yīng)用的任務(wù)控制節(jié)點(diǎn)(Driver)和每個(gè)工作節(jié)點(diǎn)上負(fù)責(zé)具體任務(wù)的執(zhí)行進(jìn)程(Executor)。其中,集群管理器可以是Spark自帶的資源管理器,也可以是YARN或Mesos等資源管理框架。與Hadoop MapReduce計(jì)算框架相比,Spark所采用的Executor有兩個(gè)優(yōu)點(diǎn)。一是利用多線程來執(zhí)行具體的任務(wù)(HadoopMapReduce采用的是進(jìn)程模型),減少任務(wù)的啟動開銷。二是 Executor中有一個(gè)BlockManager存儲模塊,會將內(nèi)存和磁盤共同作為存儲設(shè)備,當(dāng)需要多輪迭代計(jì)算時(shí),可以將中間結(jié)果存儲到這個(gè)存儲模塊里,下次需要時(shí)就可以直接讀該存儲模塊里的數(shù)據(jù),而不需要讀寫到 HDFS 等文件系統(tǒng)里,因而有效減少了 IO 開銷;或者在交互式查詢場景下,Executor預(yù)先將表緩存到該存儲系統(tǒng)上,從而可以提高讀寫IO的性能。
Spark運(yùn)行基本流程如下圖所示,流程如下。
(1)當(dāng)一個(gè)Spark應(yīng)用被提交時(shí),首先需要為這個(gè)應(yīng)用構(gòu)建起基本的運(yùn)行環(huán)境,即由任務(wù)控制節(jié)點(diǎn)(Driver)創(chuàng)建一個(gè)SparkContext,由SparkContext負(fù)責(zé)和資源管理器—Cluster Manager的通信,以及進(jìn)行資源的申請、任務(wù)的分配和監(jiān)控等。SparkContext 會向資源管理器注冊并申請運(yùn)行Executor的資源。
(2)資源管理器為Executor分配資源,并啟動Executor進(jìn)程,Executor運(yùn)行情況將隨著“心跳”發(fā)送到資源管理器上。
(3)SparkContext 根據(jù) RDD 的依賴關(guān)系構(gòu)建 DAG,并將 DAG 提交給 DAG 調(diào)度器(DAGScheduler)進(jìn)行解析,將DAG分解成多個(gè)“階段”(每個(gè)階段都是一個(gè)任務(wù)集),并且計(jì)算出各個(gè)階段之間的依賴關(guān)系,然后把一個(gè)個(gè)“任務(wù)集”提交給底層的任務(wù)調(diào)度器(TaskScheduler)進(jìn)行處理;Executor 向 SparkContext 申請任務(wù),任務(wù)調(diào)度器將任務(wù)分發(fā)給 Executor 運(yùn)行,同時(shí)SparkContext將應(yīng)用程序代碼發(fā)放給Executor。
(4)任務(wù)在Executor上運(yùn)行,把執(zhí)行結(jié)果反饋給任務(wù)調(diào)度器,然后反饋給DAG調(diào)度器,運(yùn)行完畢后寫入數(shù)據(jù)并釋放所有資源。
Spark的核心建立在統(tǒng)一的抽象RDD之上,這使得Spark的各個(gè)組件可以無縫地進(jìn)行集成,在同一個(gè)應(yīng)用程序中完成大數(shù)據(jù)計(jì)算任務(wù)。一個(gè)RDD就是一個(gè)分布式對象集合,本質(zhì)上是一個(gè)只讀的分區(qū)記錄集合。每個(gè)RDD可以分成多個(gè)分區(qū),每個(gè)分區(qū)就是一個(gè)數(shù)據(jù)集片段,并且一個(gè) RDD 的不同分區(qū)可以被保存到集群中不同的節(jié)點(diǎn)上,從而可以在集群中的不同節(jié)點(diǎn)上進(jìn)行并行計(jì)算。RDD提供了一組豐富的操作以支持常見的數(shù)據(jù)運(yùn)算,分為“行動”(Action)和“轉(zhuǎn)換”(Transformation)兩種類型,前者用于執(zhí)行計(jì)算并指定輸出的形式,后者指定RDD之間的相互依賴關(guān)系。兩類操作的主要區(qū)別是,轉(zhuǎn)換操作(如map、filter、groupBy、join等)接受RDD并返回RDD,而行動操作(如count、collect等)接受RDD但是返回非RDD(即輸出一個(gè)值或結(jié)果)。RDD采用了惰性調(diào)用,即在RDD的執(zhí)行過程中,真正的計(jì)算發(fā)生在RDD的“行動”操作,對于“行動”之前的所有“轉(zhuǎn)換”操作,Spark只是記錄下“轉(zhuǎn)換”操作應(yīng)用的一些基礎(chǔ)數(shù)據(jù)集以及RDD生成的軌跡,即相互之間的依賴關(guān)系,而不會觸發(fā)真正的計(jì)算。
3.2.2.下載Spark
官網(wǎng)下載Spark2.4.3版本:Spark Release 2.4.3 | Apache Spark。下載完解壓,和Hadoop3.1.1和winutils放在同一個(gè)盤或者目錄下,方便以后的管理。
3.2.3.配置Spark環(huán)境變量
同之前配置Hadoop一樣,需要配置SPARK_HOME和Path。
3.2.4.拷貝pyspark
進(jìn)入spark安裝目錄,將pyspark復(fù)制到Python環(huán)境的Lib\site-packages目錄下。
3.2.5.安裝py4j
Py4J 是一個(gè)用?Python和?Java編寫的庫?,通過 Py4J,Python程序能夠動態(tài)訪問Java虛擬機(jī)中的Java對象,Java程序也能夠回調(diào) Python對象。使用pip install py4j安裝即可。
3.2.6.查看Spark是否安裝成功
打開cmd窗口,輸入spark-shell,出現(xiàn)以下內(nèi)容說明配置成功:
4.基于Spark的數(shù)據(jù)分析與可視化
4.1.分析降水?dāng)?shù)據(jù)
我們首先計(jì)算各個(gè)城市過去24個(gè)小時(shí)的累積降水量。思路是按照城市對數(shù)據(jù)進(jìn)行分組,對每個(gè)城市的rain1h字段進(jìn)行分組求和。相關(guān)步驟如下:
(1)創(chuàng)建SparkSession對象spark;
(2)使用spark.read.csv(filename)讀取passed_weather_ALL.csv數(shù)據(jù)生成Dateframe df;
(3)對df進(jìn)行操作:使用Dateframe的select方法選擇province、city_name、city_code、rain1h字段,并使用Column對象的cast(dateType)方法將rain1h轉(zhuǎn)成數(shù)值型,再使用Dateframe的filter方法篩選出rain1h小于1000的記錄(大于1000是異常數(shù)據(jù)),得到新的Dateframe df_rain;
(4)對df_rain進(jìn)行操作:使用Dateframe的groupBy操作按照province、city_name、city_code的字段分組,使用agg方法對rain1h字段進(jìn)行分組求和得到新的字段rain24h(過去24小時(shí)累積雨量),使用sort方法按照rain24h降序排列,經(jīng)過上述操作得到新的Dateframe df_rain_sum;
(5)對df_rain_sum調(diào)用cache()方法將此前的轉(zhuǎn)換關(guān)系進(jìn)行緩存,提高性能;
(6)對df_rain_sum調(diào)用coalesce()將數(shù)據(jù)分區(qū)數(shù)目減為1,并使用write.csv(filename)方法將得到的數(shù)據(jù)持久化到本地文件;
(7)對df_rain_sum調(diào)用head()方法取前若干條數(shù)據(jù)(即24小時(shí)累積降水量Top-N的列表)供數(shù)據(jù)可視化使用。
代碼 6 計(jì)算各城市過去24小時(shí)累積雨量
def passed_rain_analyse(filename):
??? print("開始分析累積降雨量")
??? spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
??? df = spark.read.csv(filename, header=True)
??? df_rain = df.select(df['province'], df['city_name'], df['city_code'],
??????????????????????? df['rain1h'].cast(DecimalType(scale=1))).filter(df['rain1h'] < 1000)
??? # 篩選數(shù)據(jù),去除無效數(shù)據(jù),一小時(shí)降水大于1000視為無效
??? df_rain_sum = df_rain.groupBy("province", "city_name", "city_code").agg(F.sum("rain1h").alias("rain24h")).sort(
??????? F.desc("rain24h"))? # 分組、求和、排序
??? df_rain_sum.coalesce(1).write.csv("passed_rain_analyse.csv")
??? print("累積降雨量分析完畢!")
??? return df_rain_sum.head(20)? # 前20個(gè) |
4.2.分析氣溫?cái)?shù)據(jù)
根據(jù)國家標(biāo)準(zhǔn)(《地面氣象服務(wù)觀測規(guī)范》),日平均氣溫取四時(shí)次數(shù)據(jù)的平均值,四時(shí)次數(shù)據(jù)為:02時(shí)、08時(shí)、14時(shí)、20時(shí)。據(jù)此,應(yīng)該先篩選出各個(gè)時(shí)次的氣溫?cái)?shù)據(jù),再按照城市對數(shù)據(jù)進(jìn)行分組,對每個(gè)城市的tempeature字段進(jìn)行分組求平均。相關(guān)步驟如下:
(1)創(chuàng)建SparkSession對象spark;
(2)使用spark.read.csv(filename)讀取passed_weather_ALL.csv數(shù)據(jù)生成Dateframe df;
(3)對df進(jìn)行操作:使用Dateframe的select方法選擇province,city_name,city_code,temperature字段,并使用庫pyspark.sql.functions中的date_format(col,pattern)方法和hour(col)將time字段轉(zhuǎn)換成date(日期)字段和hour(小時(shí))字段,(time字段的分秒信息無用),,得到新的Dateframe df_temperature;
(4)對df_temperature進(jìn)行操作:使用Dateframe的filter操作過濾出hour字段在[2,8,14,20]中的記錄,經(jīng)過上述操作得到新的Dateframe df_4point_temperature;
(5)對df_4point_temperature進(jìn)行操作:使用Dateframe的groupBy操作按照province、city_name、city_code、date字段分組,使用agg方法對temperature字段進(jìn)行分組計(jì)數(shù)和求和(求和字段命名為avg_temperature),使用filter方法過濾出分組計(jì)數(shù)為4的記錄(確保有4個(gè)時(shí)次才能計(jì)算日平均溫),使用sort方法按照avg_temperature進(jìn)行排列,desc是降序,asc是升序;再篩選出需要保存的字段province、city_name、city_code、date,avg_temperature(順便使用庫pyspark.sql.functions中的format_number(col, precision)方法保留一位小數(shù)),經(jīng)過上述操作得到新的Dateframe df_avg_temperature;
(6)對df_avg_temperature調(diào)用cache()方法將此前的轉(zhuǎn)換關(guān)系進(jìn)行緩存,提高性能;
(7)對df_avg_temperature調(diào)用coalesce()將數(shù)據(jù)分區(qū)數(shù)目減為1,并使用write.csv(filename)方法將得到的數(shù)據(jù)持久化到本地文件;
(8)對df_rain_sum調(diào)用collect()方法取將Dateframe轉(zhuǎn)換成list,方便后續(xù)進(jìn)行數(shù)據(jù)可視化。
代碼 7 計(jì)算各城市過去24小時(shí)平均氣溫
def passed_temperature_analyse(filename):
??? print("開始分析氣溫")
??? spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
??? df = spark.read.csv(filename, header=True)
??? df_temperature = df.select(? # 選擇需要的列
??????? df['province'],
??????? df['city_name'],
??????? df['city_code'],
??????? df['temperature'].cast(DecimalType(scale=1)),? # 轉(zhuǎn)換為十進(jìn)制類型,并將小數(shù)點(diǎn)后的位數(shù)保留1位
??????? F.date_format(df['time'], "yyyy-MM-dd").alias("date"),? # 得到日期數(shù)據(jù)
??????? F.hour(df['time']).alias("hour")? # 得到小時(shí)數(shù)據(jù),命名為hour
??? ).filter(df['temperature'] < 1000)
??? # 篩選四點(diǎn)時(shí)次
??? df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([2, 8, 14, 20]))
??? df_avg_temperature = df_4point_temperature.groupBy("province", "city_name", "city_code", "date").agg(
??????? F.count("temperature"), F.avg("temperature").alias("avg_temperature")).sort(
??????? F.asc("avg_temperature")).select("province", "city_name", "city_code", "date",
???????????????????????????????????????? F.format_number('avg_temperature', 1).alias("avg_temperature"))
??? df_avg_temperature.show()
??? avg_temperature_list = df_avg_temperature.collect()
??? df_avg_temperature.coalesce(1).write.csv("passed_temperature.csv")
??? print("氣溫分析完畢")
??? return avg_temperature_list[0:20]? # 前20個(gè) |
我們可以使用同樣的方法得到過去24小時(shí)各省級行政區(qū)的平均氣溫,只需修改groupby語句即可(修改為groupBy("province"))。
4.3.分析氣壓數(shù)據(jù)
先篩選出各個(gè)時(shí)次的氣壓數(shù)據(jù),再按照城市對數(shù)據(jù)進(jìn)行分組,對每個(gè)城市的pressure字段進(jìn)行分組求平均。相關(guān)步驟如下:
(1)創(chuàng)建SparkSession對象spark;
(2)使用spark.read.csv(filename)讀取passed_weather_ALL.csv數(shù)據(jù)生成Dateframe df;
(3)對df進(jìn)行操作:使用Dateframe的select方法選擇province、city_name、city_code、pressure字段,并使用庫pyspark.sql.functions中的date_format(col,pattern)方法和hour(col)將time字段轉(zhuǎn)換成date(日期)字段和hour(小時(shí))字段,(time字段的分秒信息無用),,得到新的Dateframe df_pressure;
(4)對df_pressure進(jìn)行操作:使用Dateframe的filter操作過濾出hour字段在[2,8,14,20]中的記錄,經(jīng)過上述操作得到新的Dateframe df_4point_pressure;
(5)對df_4point_pressure進(jìn)行操作:使用Dateframe的groupBy操作按照province,city_name,city_code,date字段分組,使用agg方法對pressure字段進(jìn)行分組計(jì)數(shù)和求和(求和字段命名為avg_pressure),使用filter方法過濾出分組計(jì)數(shù)為4的記錄(確保有4個(gè)時(shí)次才能計(jì)算日平均溫),使用sort方法按照avg_pressure進(jìn)行排列,desc是降序,asc是升序;再篩選出需要保存的字段province、city_name、city_code、date,avg_pressure(順便使用庫pyspark.sql.functions中的format_number(col, precision)方法保留一位小數(shù)),經(jīng)過上述操作得到新的Dateframe df_avg_pressure;
(6)對df_avg_pressure調(diào)用cache()方法將此前的轉(zhuǎn)換關(guān)系進(jìn)行緩存以提高性能;
(7)對df_avg_pressure調(diào)用coalesce()將數(shù)據(jù)分區(qū)數(shù)目減為1,并使用write.csv(filename)方法將得到的數(shù)據(jù)持久化到本地文件。
代碼 8 計(jì)算各城市過去24小時(shí)平均氣壓
def passed_pressure_analyse(filename):
??? print("開始分析氣壓")
??? spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
??? df = spark.read.csv(filename, header=True)
??? df_pressure = df.select(? # 選擇需要的列
??????? df['province'],
??????? df['city_name'],
??????? df['city_code'],
??????? df['pressure'].cast(DecimalType(scale=1)),
??????? F.date_format(df['time'], "yyyy-MM-dd").alias("date"),? # 得到日期數(shù)據(jù)
??????? F.hour(df['time']).alias("hour")? # 得到小時(shí)數(shù)據(jù)
??? )
??? df_4point_pressure = df_pressure.filter(df_pressure['hour'].isin([2, 8, 14, 20]))
??? df_avg_pressure = df_4point_pressure.groupBy("province", "city_name", "city_code", "date").agg(
??????? F.count("pressure"), F.avg("pressure").alias("avg_pressure")).sort(
??????? F.asc("avg_pressure")).select("province", "city_name", "city_code", "date",
????????????????????????????????????? F.format_number('avg_pressure', 1).alias("avg_pressure"))
??? avg_pressure_list = df_avg_pressure.collect()
??? df_avg_pressure.coalesce(1).write.csv("passed_pressure.csv")
??? print("氣壓分析完畢")
??? return avg_pressure_list[0:20]? # 最低的20個(gè) |
4.4.分析風(fēng)力數(shù)據(jù)
我們首先篩選出各個(gè)時(shí)次的風(fēng)向數(shù)據(jù),再按照城市對數(shù)據(jù)進(jìn)行分組,對每個(gè)城市的windDirection字段進(jìn)行分組求平均。相關(guān)步驟與求平均溫度與平均氣壓相似。注意到風(fēng)向字段是一個(gè)0到360的數(shù),我們可以將它劃分為八個(gè)風(fēng)向(北風(fēng)、東北風(fēng)、東風(fēng)、東南風(fēng)、南風(fēng)、西南風(fēng)、西風(fēng)和西北風(fēng)),每個(gè)風(fēng)向區(qū)間大小為45°,以便進(jìn)行后續(xù)的分析與可視化。
4.5.繪制累積降水量柱狀圖
首先draw_rain(rain_list)函數(shù)遍歷傳入的降雨列表,將每個(gè)城市的名稱和24小時(shí)降雨量分別添加到name_list和num_list中。接著,設(shè)置一個(gè)索引列表,用于設(shè)置每個(gè)城市的柱狀圖位置。之后創(chuàng)建圖形,并使用plt.bar()函數(shù)繪制柱狀圖。在繪制柱狀圖時(shí),color參數(shù)設(shè)置每個(gè)柱子的顏色, width參數(shù)設(shè)置柱子的寬度。plt.xticks()函數(shù)設(shè)置x軸刻度標(biāo)簽的位置和字體大小,plt.ylim()函數(shù)設(shè)置y軸刻度范圍,plt.xlabel()和plt.ylabel()函數(shù)設(shè)置x軸和y軸標(biāo)簽的名稱和字體大小,plt.title()函數(shù)設(shè)置圖形的標(biāo)題和字體大小。在繪制完柱狀圖后,使用for循環(huán)遍歷每個(gè)柱子,并使用plt.text()函數(shù)在柱子上方添加降雨量的數(shù)值。?
代碼 9 繪制各城市過去24小時(shí)累積降雨量圖
def draw_rain(rain_list):
??? print("開始繪制累積降雨量圖")
??? name_list = []
??? num_list = []
??? for item in rain_list:
??????? name_list.append(item.province[0:2] + '\n' + item.city_name)
??????? num_list.append(item.rain24h)
??? index = [i + 0.25 for i in range(0, len(num_list))]
??? plt.figure(figsize=(15, 15))? # 設(shè)置圖的大小
??? rects = plt.bar(index, num_list, color='ckrmgby', width=0.5)
??? plt.xticks([i + 0.25 for i in index], name_list, fontsize=15, color='r')? # fontsize設(shè)置x刻度字體大小
??? plt.ylim(ymax=(int(max(num_list) + 100) / 100) * 60, ymin=0)? # 設(shè)置刻度間隔
??? plt.yticks(fontsize=20, color='r')? # fontsize設(shè)置y刻度字體大小
??? plt.xlabel("城市", fontsize=25, color='darkblue')? # fontsize設(shè)置x坐標(biāo)標(biāo)簽字體大小
??? plt.ylabel("雨量", fontsize=25, color='darkblue')? # fontsize設(shè)置y坐標(biāo)標(biāo)簽字體大小
??? plt.title("2023年5月11日24小時(shí)累計(jì)降雨量全國前20名", fontsize=30, color='b')? # fontsize設(shè)置標(biāo)題字體大小
??? for rect in rects:
??????? height = rect.get_height()
??????? # fontsize設(shè)置直方圖上字體大小
??????? plt.text(rect.get_x() + rect.get_width() / 2, height, str(height), ha="center", va="bottom", fontsize=18)
??? plt.show()
??? print("累積降雨量圖繪制完畢!") |
4.6.繪制平均氣溫柱狀圖
draw_temperature函數(shù)用于繪制氣溫圖。函數(shù)首先創(chuàng)建兩個(gè)空列表name_list和num_list,并將temperature_list中每個(gè)元素的省份、城市名和平均氣溫分別添加到name_list和num_list中。接下來,創(chuàng)建一個(gè)索引列表index,其中每個(gè)元素都是當(dāng)前索引值加上0.25。然后通過用plt.figure創(chuàng)建指定大小的圖形,并使用plt.bar創(chuàng)建一個(gè)條形圖,其中每個(gè)條形的高度由num_list中的相應(yīng)元素確定。函數(shù)使用plt.xticks設(shè)置x軸刻度標(biāo)簽,其中每個(gè)標(biāo)簽都是name_list中的相應(yīng)元素。函數(shù)使用plt.ylim設(shè)置y軸刻度范圍,其中最大值是num_list中的最大值乘以3.3并向上取整,最小值為0。plt.yticks設(shè)置y軸刻度標(biāo)簽的字體大小和顏色,plt.xlabel和plt.ylabel設(shè)置x軸和y軸標(biāo)簽的字體大小和顏色,plt.title設(shè)置圖形標(biāo)題的字體大小和顏色。最后,使用plt.text在每個(gè)條形上添加高度標(biāo)簽,并使用plt.show顯示圖形。
代碼 10 繪制各城市過去24小時(shí)平均氣溫圖
def draw_temperature(temperature_list):
??? print("開始繪制氣溫圖")
??? name_list = []
??? num_list = []
??? date = temperature_list[1].date
??? for item in temperature_list:
??????? name_list.append(item.province[0:2] + '\n' + item.city_name[0:2] + '\n' + item.city_name[2:])
??????? num_list.append(float(item.avg_temperature))
??? index = [i + 0.25 for i in range(0, len(num_list))]
??? plt.figure(figsize=(20, 12))? # 設(shè)置圖的大小
??? rects = plt.bar(index, num_list, color='ckrmgby', width=0.5)
??? plt.xticks([i + 0.25 for i in index], name_list, fontsize=20, color='r')? # fontsize設(shè)置x刻度字體大小
??? plt.ylim(ymax=math.ceil(float(max(num_list))) * 3.3, ymin=0)? # 設(shè)置刻度間隔
??? plt.yticks(fontsize=20, color='r')? # fontsize設(shè)置y刻度字體大小
??? plt.xlabel("城市", fontsize=25, color='darkblue')? # fontsize設(shè)置坐標(biāo)標(biāo)簽字體大小
??? plt.ylabel("日平均氣溫", fontsize=25, color='darkblue')? # fontsize設(shè)置坐標(biāo)標(biāo)簽字體大小
??? plt.title("2023-5-11全國日平均氣溫最低前20名", fontsize=30, color='b')? # fontsize設(shè)置標(biāo)題字體大小
??? for rect in rects:
??????? height = rect.get_height()
??????? plt.text(rect.get_x() + rect.get_width() / 2, height, str(height), ha="center", va="bottom", fontsize=18)
??? plt.show()
??? print("氣溫圖繪制完畢!") |
?
4.7.繪制平均氣壓折線圖
draw_pressure函數(shù)用來繪制全國日平均氣壓最低前20名的城市的氣壓圖。首先循環(huán)遍歷數(shù)組中的每一行,將每個(gè)城市的名稱和對應(yīng)的日平均氣壓值分別添加到兩個(gè)列表中。接下來使用matplotlib庫中的plot函數(shù)繪制了氣壓圖,并設(shè)置了圖例、坐標(biāo)軸標(biāo)簽、刻度字體大小等屬性。最后,循環(huán)遍歷列表中的每個(gè)城市,將其對應(yīng)的氣壓值添加到圖中,并設(shè)置了數(shù)字標(biāo)簽。
代碼 11 繪制各城市過去24小時(shí)平均氣壓圖
def draw_pressure(file_address):
??? print("開始繪制氣壓圖")
??? name_list = []
??? num_list = []
??? df = pd.read_csv(file_address, header=None)
??? data = df.to_numpy()
??? print(data)
??? for i in data:
??????? name_list.append(i[0][0:2] + '\n' + i[1][0:2] + '\n' + i[1][2:])
??????? num_list.append(float(i[4]))
??? plt.figure(figsize=(15, 12))
??? plt.plot(name_list, num_list, label='日平均氣壓', alpha=0.8, mfc='r', ms=30, mec='b', marker='.', linewidth=5,
???????????? linestyle="--")
??? plt.xticks(fontsize=20, color='r')? # fontsize設(shè)置x刻度字體大小
??? plt.yticks(fontsize=20, color='r')? # fontsize設(shè)置y刻度字體大小
??? plt.xlabel("城市", fontsize=25, color='b')? # fontsize設(shè)置坐標(biāo)標(biāo)簽字體大小
??? plt.ylabel("日平均氣壓", fontsize=25, color='b')? # fontsize設(shè)置坐標(biāo)標(biāo)簽字體大小
??? # 設(shè)置數(shù)字標(biāo)簽
??? for a, b in zip(name_list, num_list):
??????? plt.text(a, b + 0.5, int(b), ha='center', va='bottom', fontsize=20)
??? plt.title("2023年5月11日全國日平均氣壓最低前20名", fontsize=30, color='b')? # fontsize設(shè)置標(biāo)題字體大小
??? plt.rcParams.update({'font.size': 20})
??? plt.legend(bbox_to_anchor=(1, 0.1))? # 圖例移到右下角
??? plt.show()
??? print("氣壓圖繪制完畢!") |
?
4.8.繪制全國風(fēng)向統(tǒng)計(jì)餅狀圖
draw_windDirection用于繪制全國風(fēng)向統(tǒng)計(jì)餅狀圖。首先定義了一個(gè)字典,用于存儲各個(gè)方向的風(fēng)的數(shù)量。接著提取出風(fēng)向數(shù)據(jù),并將其轉(zhuǎn)換為浮點(diǎn)數(shù)類型。然后計(jì)算了數(shù)據(jù)的總數(shù),并使用一個(gè)for循環(huán)遍歷數(shù)據(jù)列表,將每個(gè)數(shù)據(jù)點(diǎn)分配到相應(yīng)的風(fēng)向類別中。接下來,將每個(gè)風(fēng)向類別的數(shù)量存儲在一個(gè)列表中,并定義了每個(gè)類別的標(biāo)簽和顏色。然后計(jì)算了每個(gè)類別的百分比,并定義了一個(gè)突出模塊的偏移值。最后使用matplotlib庫中的pie函數(shù)繪制了一個(gè)餅圖,并設(shè)置標(biāo)題和字體大小。
代碼 12 繪制全國風(fēng)向統(tǒng)計(jì)餅狀圖
def draw_windDirection(file_address):
??? dict = {'bei': 0, 'dongbei': 0, 'dong': 0, 'dongnan': 0, 'nan': 0, 'xinan': 0, 'xi': 0, 'xibei': 0}
??? df = pd.read_csv(file_address)
??? data = df.to_numpy()
??? list = data[:, -1]
??? list = [float(x) for x in list]
??? Num = df.shape[0]? # 4546
??? for i in list:
??????? if 0 <= i <= 22.5 or 337.5 <= i <= 360:
??????????? dict['bei'] += 1
??????? if 22.5 < i <= 67.5:
??????????? dict['dongbei'] += 1
??????? if 67.5 < i <= 112.5:
??????????? dict['dong'] += 1
??????? if 112.5 < i <= 157.5:
??????????? dict['dongnan'] += 1
??????? if 157.5 < i <= 202.5:
??????????? dict['nan'] += 1
??????? if 202.5 < i <= 247.5:
??????????? dict['xinan'] += 1
??????? if 247.5 < i <= 292.5:
??????????? dict['xi'] += 1
??????? if 292.5 < i <= 337.5:
??????????? dict['xibei'] += 1
??? # {'bei': 286, 'dongbei': 283, 'dong': 405, 'dongnan': 644, 'nan': 1017, 'xinan': 931, 'xi': 650, 'xibei': 330}
??? data = [dict['bei'], dict['dongbei'], dict['dong'], dict['dongnan'], dict['nan'], dict['xinan'], dict['xi'],
??????????? dict['xibei']]
??? # 數(shù)據(jù)標(biāo)簽
??? labels = ['北風(fēng)', '東北風(fēng)', '東風(fēng)', '東南風(fēng)', '南風(fēng)', '西南風(fēng)', '西風(fēng)', '西北風(fēng)']
??? # 各區(qū)域顏色
??? colors = ['lightcoral', 'orange', 'yellow', 'yellowgreen', 'plum', 'cyan', 'hotpink', 'silver']
??? # 數(shù)據(jù)計(jì)算處理
??? sizes = [data[0] / Num, data[1] / Num, data[2] / Num, data[3] / Num, data[4] / Num,
???????????? data[5] / Num, data[6] / Num, data[7] / Num]
??? # 設(shè)置突出模塊偏移值
??? expodes = (0.1, 0, 0.1, 0, 0.1, 0, 0.1, 0)
??? plt.figure(figsize=(15, 15))
??? # 設(shè)置繪圖屬性并繪圖
??? plt.pie(sizes, explode=expodes, labels=labels, shadow=True, colors=colors, autopct='%.1f%%',
??????????? textprops={'fontsize': 22})
??? plt.axis('equal')
??? plt.title("2023年5月11日全國風(fēng)向統(tǒng)計(jì)圖", fontsize=30, color='b')? # fontsize設(shè)置標(biāo)題字體大小
??? plt.show() |
?
4.9.繪制全國各省平均氣溫地圖
draw_province_temperature函數(shù)用來繪制全國各省平均氣溫地圖。首先定義了一個(gè)包含各省名稱和平均氣溫的列表,然后使用pyecharts庫中的Map函數(shù)創(chuàng)建地圖,設(shè)置了地圖的初始高度和寬度,并添加了一個(gè)名為“氣溫”的系列,數(shù)據(jù)對為前面定義的列表,地圖類型為china,啟用了鼠標(biāo)滾輪縮放和拖動平移,不顯示圖形標(biāo)記。接下來設(shè)置了全局選項(xiàng),包括標(biāo)題、數(shù)據(jù)標(biāo)準(zhǔn)顯示和可視化映射選項(xiàng),其中標(biāo)題包括主標(biāo)題、副標(biāo)題和位置,數(shù)據(jù)標(biāo)準(zhǔn)顯示設(shè)置了最大值和最小值,可視化映射選項(xiàng)可以設(shè)置顏色范圍。最后設(shè)置了系列選項(xiàng),包括標(biāo)簽名稱顯示和顏色。最后使用render函數(shù)將地圖渲染為html文件?。
代碼 13 繪制全國各省平均氣溫地圖
Map(init_opts=opts.InitOpts(height="1000px", width="1500px")).add(
??? series_name="氣溫",
??? data_pair=tempreture,
??? maptype="china",
??? is_roam=True,
??? is_map_symbol_show=False,
).set_global_opts(
??? title_opts=opts.TitleOpts(title="2023-5-11全國各省平均氣溫",
????????????????????????????? subtitle="數(shù)據(jù)來源:中央氣象臺網(wǎng)站",
??????????? ??????????????????pos_right="center",
????????????????????????????? pos_top="5%"),
??? visualmap_opts=opts.VisualMapOpts(max_=30,
????????????????????????????????????? min_=0),
).set_series_opts(
??? label_opts=opts.LabelOpts(is_show=True, color="blue")
).render("2023-5-11全國各省平均氣溫.html") |
?
4.10.繪制武漢市主城區(qū)一天風(fēng)級雷達(dá)圖
draw_windSpeed函數(shù)用于繪制武漢市主城區(qū)一天內(nèi)不同風(fēng)向下的風(fēng)速平均值的極區(qū)圖。degs = np.arange(45, 361, 45)定義了一個(gè)包含45度到360度之間每隔45度的角度的numpy數(shù)組,定義空列表temp用于存儲不同風(fēng)向下的風(fēng)速平均值。之后對于每個(gè)角度deg,循環(huán)遍歷24小時(shí)內(nèi)的風(fēng)向數(shù)據(jù)windd,如果windd[i]等于deg,則將對應(yīng)的風(fēng)速數(shù)據(jù)wind_speed[i]添加到speed列表中。如果speed列表為空,則將0添加到temp列表中,否則將speed列表中的所有元素求和并除以元素個(gè)數(shù),將結(jié)果添加到temp列表中。theta = np.arange(0. + np.pi / 8, 2 * np.pi + np.pi / 8, 2 * np.pi / 8) 定義了一個(gè)整數(shù)N和一個(gè)包含N個(gè)元素的numpy數(shù)組theta,用于繪制極區(qū)圖。radii = np.array(temp) 將temp列表轉(zhuǎn)換為numpy數(shù)組radii,用于繪制極區(qū)圖。plt.axes(polar=True)定義了一個(gè)極坐標(biāo)系,colors = plt.cm.viridis(np.random.rand(N)) 定義了一個(gè)包含N個(gè)元素的numpy數(shù)組colors,用于設(shè)置每個(gè)扇區(qū)的顏色。plt.bar(theta, radii, width=(2 * np.pi / N), bottom=0.0, color=colors) 繪制了極區(qū)圖,其中theta和radii分別表示角度和半徑,width表示每個(gè)扇區(qū)的寬度,bottom表示每個(gè)扇區(qū)的起始位置,color表示每個(gè)扇區(qū)的顏色。plt.title設(shè)置了極區(qū)圖的標(biāo)題,其中x表示標(biāo)題的水平位置,fontsize表示標(biāo)題的字體大小,loc表示標(biāo)題的位置?。
代碼 14 繪制武漢市主城區(qū)一天風(fēng)級雷達(dá)圖
def draw_windSpeed(file):
??? data = pd.read_csv(file)
??? windd = []
??? wind = list(data['windDirection'])
??? wind_speed = list(data['windSpeed'])
??? for i in wind:
??????? if i > 360:
??????????? windd.append(0)
??????? if 0 <= i <= 22.5 or 337.5 <= i <= 360:
??????????? windd.append(90)
??????? if 22.5 < i <= 67.5:
??????????? windd.append(45)
??????? if 67.5 < i <= 112.5:
??????????? windd.append(360)
??????? if 112.5 < i <= 157.5:
??????????? windd.append(315)
??????? if 157.5 < i <= 202.5:
??????????? windd.append(270)
??????? if 202.5 < i <= 247.5:
??????????? windd.append(225)
??????? if 247.5 < i <= 292.5:
??????????? windd.append(180)
??????? if 292.5 < i <= 337.5:
??????????? windd.append(135)
??? degs = np.arange(45, 361, 45)
??? temp = []
??? for deg in degs:
??????? speed = []
??????? for i in range(0, 24):
??????????? if windd[i] == deg:
??????????????? speed.append(wind_speed[i])
??????? if len(speed) == 0:
??????????? temp.append(0)
??????? else:
??????????? temp.append(sum(speed) / len(speed))
??? N = 8
??? theta = np.arange(0. + np.pi / 8, 2 * np.pi + np.pi / 8, 2 * np.pi / 8)
??? radii = np.array(temp)
??? plt.axes(polar=True)
??? colors = plt.cm.viridis(np.random.rand(N))
??? plt.bar(theta, radii, width=(2 * np.pi / N), bottom=0.0, color=colors)
??? plt.title('2023-5-12武漢市主城區(qū)一天風(fēng)級圖', x=0.2, fontsize=15, loc='left')
??? plt.show() |
源代碼
spider.py文章來源:http://www.zghlxwxcb.cn/news/detail-488567.html
import urllib.request, urllib.error
import json
import csv
import os
import importlib, sys
importlib.reload(sys)
class Crawler:
def get_html(self, url):
headers = {
'User-Agent': "Mozilla/5.0 (Windows; U; Windows NT 5.2) Gecko/2008070208 Firefox/3.0.1"
}
request = urllib.request.Request(url,headers=headers)
response = urllib.request.urlopen(request)
return response.read().decode()
def parse_json(self, url):
obj = self.get_html(url)
if obj:
json_obj = json.loads(obj)
else:
json_obj = list()
return json_obj
def write_csv(self, file, data):
if data:
print("開始寫入 " + file)
with open(file, 'a+', encoding='utf-8-sig') as f: # utf-8-sig 帶BOM的utf-8
f_csv = csv.DictWriter(f, data[0].keys())
# if not os.path.exists(file):
f_csv.writeheader()
f_csv.writerows(data)
print("結(jié)束寫入 " + file)
def write_header(self, file, data):
if data:
print("開始寫入 " + file)
with open(file, 'a+', encoding='utf-8-sig') as f:
f_csv = csv.DictWriter(f, data[0].keys())
f_csv.writeheader()
f_csv.writerows(data)
print("結(jié)束寫入 " + file)
def write_row(self, file, data):
if data:
print("開始寫入 " + file)
with open(file, 'a+', encoding='utf-8-sig') as f:
f_csv = csv.DictWriter(f, data[0].keys())
if not os.path.exists(file):
f_csv.writeheader()
f_csv.writerows(data)
print("結(jié)束寫入 " + file)
def read_csv(self, file):
print("開始讀取 " + file)
with open(file, 'r+', encoding='utf-8-sig') as f:
data = csv.DictReader(f)
print("結(jié)束讀取 " + file)
return list(data)
def get_provinces(self):
province_file = 'C:/Users/26909/Desktop/province.csv'
if not os.path.exists(province_file):
print("開始爬取省份")
provinces = self.parse_json('http://www.nmc.cn/f/rest/province')
print("省份爬取完畢!")
self.write_csv(province_file, provinces)
else:
provinces = self.read_csv(province_file)
return provinces
def get_cities(self):
city_file = 'input/city.csv'
if not os.path.exists(city_file):
cities = list()
print("開始爬取城市")
for province in self.get_provinces():
url = province['url'].split('/')[-1].split('.')[0]
cities.extend(self.parse_json('http://www.nmc.cn/f/rest/province/' + url))
self.write_csv(city_file, cities)
print("爬取城市完畢!")
else:
cities = self.read_csv(city_file)
return cities
def get_passed_weather(self, province):
weather_passed_file = 'C:/Users/26909/Desktop/passed_weather_' + province + '.csv'
if os.path.exists(weather_passed_file):
return
passed_weather = list()
count = 0
if province == 'ALL':
print("開始爬取過去的天氣狀況")
for city in self.get_cities():
data = self.parse_json('http://www.nmc.cn/f/rest/passed/' + city['code'])
if data:
count = count + 1
for item in data:
item['city_code'] = city['code']
item['province'] = city['province']
item['city_name'] = city['city']
item['city_index'] = str(count)
passed_weather.extend(data)
if count % 50 == 0:
if count == 50:
self.write_header(weather_passed_file, passed_weather)
else:
self.write_row(weather_passed_file, passed_weather)
passed_weather = list()
if passed_weather:
if count <= 50:
self.write_header(weather_passed_file, passed_weather)
else:
self.write_row(weather_passed_file, passed_weather)
print("爬取過去的天氣狀況完畢!")
else:
print("開始爬取過去的天氣狀況")
select_city = filter(lambda x: x['province'] == province, self.get_cities())
for city in select_city:
data = self.parse_json('http://www.nmc.cn/f/rest/passed/' + city['code'])
if data:
count = count + 1
for item in data:
item['city_index'] = str(count)
item['city_code'] = city['code']
item['province'] = city['province']
item['city_name'] = city['city']
passed_weather.extend(data)
self.write_csv(weather_passed_file, passed_weather)
print("爬取過去的天氣狀況完畢!")
def run(self, range='ALL'):
self.get_passed_weather(range)
cr = Crawler()
cr.run('ALL')
?analysis.py文章來源地址http://www.zghlxwxcb.cn/news/detail-488567.html
import numpy as np
import pandas as pd
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, TimestampType
import matplotlib.pyplot as plt
import math
from pyecharts import options as opts
from pyecharts.charts import Map
# 解決中文顯示問題
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 計(jì)算各個(gè)城市過去24小時(shí)累積雨量
def passed_rain_analyse(filename):
print("開始分析累積降雨量")
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
df = spark.read.csv(filename, header=True)
df_rain = df.select(df['province'], df['city_name'], df['city_code'],
df['rain1h'].cast(DecimalType(scale=1))).filter(df['rain1h'] < 1000)
# 篩選數(shù)據(jù),去除無效數(shù)據(jù),一小時(shí)降水大于1000視為無效
df_rain_sum = df_rain.groupBy("province", "city_name", "city_code").agg(F.sum("rain1h").alias("rain24h")).sort(
F.desc("rain24h")) # 分組、求和、排序
df_rain_sum.coalesce(1).write.csv("passed_rain_analyse.csv")
print("累積降雨量分析完畢!")
return df_rain_sum.head(20) # 前20個(gè)
# 武漢過去24小時(shí)的風(fēng)速和風(fēng)向
def passed_windSpeed_analyse(filename):
print("開始分析風(fēng)速")
df = pd.read_csv(filename)
df_windSpeed = df[df['city_name'] == '武漢']
df_windSpeed.to_csv("wuhan_wind.csv")
return df_windSpeed # 前20個(gè)
# 計(jì)算各個(gè)城市過去24小時(shí)平均氣溫
def passed_temperature_analyse(filename):
print("開始分析氣溫")
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
df = spark.read.csv(filename, header=True)
df_temperature = df.select( # 選擇需要的列
df['province'],
df['city_name'],
df['city_code'],
df['temperature'].cast(DecimalType(scale=1)), # 轉(zhuǎn)換為十進(jìn)制類型,并將小數(shù)點(diǎn)后的位數(shù)保留1位
F.date_format(df['time'], "yyyy-MM-dd").alias("date"), # 得到日期數(shù)據(jù)
F.hour(df['time']).alias("hour") # 得到小時(shí)數(shù)據(jù),命名為hour
).filter(df['temperature'] < 1000)
# 篩選四點(diǎn)時(shí)次
df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([2, 8, 14, 20]))
df_avg_temperature = df_4point_temperature.groupBy("province", "city_name", "city_code", "date").agg(
F.count("temperature"), F.avg("temperature").alias("avg_temperature")).sort(
F.asc("avg_temperature")).select("province", "city_name", "city_code", "date",
F.format_number('avg_temperature', 1).alias("avg_temperature"))
df_avg_temperature.show()
avg_temperature_list = df_avg_temperature.collect()
df_avg_temperature.coalesce(1).write.csv("passed_temperature.csv")
print("氣溫分析完畢")
return avg_temperature_list[0:20] # 前20個(gè)
# 計(jì)算各省過去24小時(shí)平均氣溫
def province_temperature_analyse(filename):
print("開始分析各省氣溫")
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
df = spark.read.csv(filename, header=True)
df_temperature = df.select( # 選擇需要的列
df['province'],
df['temperature'].cast(DecimalType(scale=1)),
F.date_format(df['time'], "yyyy-MM-dd").alias("date"), # 得到日期數(shù)據(jù)
F.hour(df['time']).alias("hour") # 得到小時(shí)數(shù)據(jù)
).filter(df['temperature'] < 1000)
# 篩選四點(diǎn)時(shí)次
df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([2, 8, 14, 20]))
df_avg_temperature = df_4point_temperature.groupBy("province").agg(
F.count("temperature"), F.avg("temperature").alias("avg_temperature")).sort(
F.asc("avg_temperature")).select("province", F.format_number('avg_temperature', 1).alias("avg_temperature"))
df_avg_temperature.show()
avg_temperature_list = df_avg_temperature.collect()
df_avg_temperature.coalesce(1).write.csv("province_temperature.csv")
print("氣溫分析完畢")
return avg_temperature_list[0:20] # 最低的20個(gè)
# 計(jì)算各個(gè)城市過去24小時(shí)平均氣壓
def passed_pressure_analyse(filename):
print("開始分析氣壓")
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
df = spark.read.csv(filename, header=True)
df_pressure = df.select( # 選擇需要的列
df['province'],
df['city_name'],
df['city_code'],
df['pressure'].cast(DecimalType(scale=1)),
F.date_format(df['time'], "yyyy-MM-dd").alias("date"), # 得到日期數(shù)據(jù)
F.hour(df['time']).alias("hour") # 得到小時(shí)數(shù)據(jù)
)
# 篩選四點(diǎn)時(shí)次
df_4point_pressure = df_pressure.filter(df_pressure['hour'].isin([2, 8, 14, 20]))
df_avg_pressure = df_4point_pressure.groupBy("province", "city_name", "city_code", "date").agg(
F.count("pressure"), F.avg("pressure").alias("avg_pressure")).sort(
F.asc("avg_pressure")).select("province", "city_name", "city_code", "date",
F.format_number('avg_pressure', 1).alias("avg_pressure"))
avg_pressure_list = df_avg_pressure.collect()
df_avg_pressure.coalesce(1).write.csv("passed_pressure.csv")
print("氣壓分析完畢")
return avg_pressure_list[0:20] # 最低的20個(gè)
# 計(jì)算各個(gè)城市過去24小時(shí)平均風(fēng)向
def passed_windDirection_analyse(filename):
print("開始分析風(fēng)向")
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
df = spark.read.csv(filename, header=True)
df_windDirection = df.select( # 選擇需要的列
df['province'],
df['city_name'],
df['city_code'],
df['windDirection'].cast(DecimalType(scale=1)),
F.date_format(df['time'], "yyyy-MM-dd").alias("date"), # 得到日期數(shù)據(jù)
F.hour(df['time']).alias("hour") # 得到小時(shí)數(shù)據(jù)
).filter(df['windDirection'] < 400)
# 篩選四點(diǎn)時(shí)次
df_4point_windDirection = df_windDirection.filter(df_windDirection['hour'].isin([2, 8, 14, 20]))
df_avg_windDirection = df_4point_windDirection.groupBy("province", "city_name", "city_code", "date").agg(
F.count("windDirection"), F.avg("windDirection").alias("avg_windDirection")).sort(
F.asc("avg_windDirection")).select("province", "city_name", "city_code", "date",
F.format_number('avg_windDirection', 1).alias("avg_windDirection"))
df_avg_windDirection.coalesce(1).write.csv("passed_windDirection.csv")
print("風(fēng)向分析完畢")
# 繪制累積降雨量圖
def draw_rain(rain_list):
print("開始繪制累積降雨量圖")
name_list = []
num_list = []
for item in rain_list:
name_list.append(item.province[0:2] + '\n' + item.city_name)
num_list.append(item.rain24h)
index = [i + 0.25 for i in range(0, len(num_list))]
plt.figure(figsize=(15, 15)) # 設(shè)置圖的大小
rects = plt.bar(index, num_list, color='ckrmgby', width=0.5)
plt.xticks([i + 0.25 for i in index], name_list, fontsize=15, color='r') # fontsize設(shè)置x刻度字體大小
plt.ylim(ymax=(int(max(num_list) + 100) / 100) * 60, ymin=0) # 設(shè)置刻度間隔
plt.yticks(fontsize=20, color='r') # fontsize設(shè)置y刻度字體大小
plt.xlabel("城市", fontsize=25, color='darkblue') # fontsize設(shè)置x坐標(biāo)標(biāo)簽字體大小
plt.ylabel("雨量", fontsize=25, color='darkblue') # fontsize設(shè)置y坐標(biāo)標(biāo)簽字體大小
plt.title("2023年5月11日24小時(shí)累計(jì)降雨量全國前20名", fontsize=30, color='b') # fontsize設(shè)置標(biāo)題字體大小
for rect in rects:
height = rect.get_height()
# fontsize設(shè)置直方圖上字體大小
plt.text(rect.get_x() + rect.get_width() / 2, height, str(height), ha="center", va="bottom", fontsize=18)
plt.show()
print("累積降雨量圖繪制完畢!")
def draw_temperature(temperature_list):
print("開始繪制氣溫圖")
name_list = []
num_list = []
date = temperature_list[1].date
for item in temperature_list:
name_list.append(item.province[0:2] + '\n' + item.city_name[0:2] + '\n' + item.city_name[2:])
num_list.append(float(item.avg_temperature))
index = [i + 0.25 for i in range(0, len(num_list))]
plt.figure(figsize=(20, 12)) # 設(shè)置圖的大小
rects = plt.bar(index, num_list, color='ckrmgby', width=0.5)
plt.xticks([i + 0.25 for i in index], name_list, fontsize=20, color='r') # fontsize設(shè)置x刻度字體大小
plt.ylim(ymax=math.ceil(float(max(num_list))) * 3.3, ymin=0) # 設(shè)置刻度間隔
plt.yticks(fontsize=20, color='r') # fontsize設(shè)置y刻度字體大小
plt.xlabel("城市", fontsize=25, color='darkblue') # fontsize設(shè)置坐標(biāo)標(biāo)簽字體大小
plt.ylabel("日平均氣溫", fontsize=25, color='darkblue') # fontsize設(shè)置坐標(biāo)標(biāo)簽字體大小
plt.title("2023-5-11全國日平均氣溫最低前20名", fontsize=30, color='b') # fontsize設(shè)置標(biāo)題字體大小
for rect in rects:
height = rect.get_height()
# fontsize設(shè)置直方圖上字體大小
plt.text(rect.get_x() + rect.get_width() / 2, height, str(height), ha="center", va="bottom", fontsize=18)
plt.show()
print("氣溫圖繪制完畢!")
def draw_pressure(file_address):
print("開始繪制氣壓圖")
name_list = []
num_list = []
df = pd.read_csv(file_address, header=None)
data = df.to_numpy()
print(data)
for i in data:
name_list.append(i[0][0:2] + '\n' + i[1][0:2] + '\n' + i[1][2:])
num_list.append(float(i[4]))
plt.figure(figsize=(15, 12))
plt.plot(name_list, num_list, label='日平均氣壓', alpha=0.8, mfc='r', ms=30, mec='b', marker='.', linewidth=5,
linestyle="--")
plt.xticks(fontsize=20, color='r') # fontsize設(shè)置x刻度字體大小
plt.yticks(fontsize=20, color='r') # fontsize設(shè)置y刻度字體大小
plt.xlabel("城市", fontsize=25, color='b') # fontsize設(shè)置坐標(biāo)標(biāo)簽字體大小
plt.ylabel("日平均氣壓", fontsize=25, color='b') # fontsize設(shè)置坐標(biāo)標(biāo)簽字體大小
# 設(shè)置數(shù)字標(biāo)簽
for a, b in zip(name_list, num_list):
plt.text(a, b + 0.5, int(b), ha='center', va='bottom', fontsize=20)
plt.title("2023年5月11日全國日平均氣壓最低前20名", fontsize=30, color='b') # fontsize設(shè)置標(biāo)題字體大小
plt.rcParams.update({'font.size': 20})
plt.legend(bbox_to_anchor=(1, 0.1)) # 圖例移到右下角
plt.show()
print("氣壓圖繪制完畢!")
def draw_windDirection(file_address):
dict = {'bei': 0, 'dongbei': 0, 'dong': 0, 'dongnan': 0, 'nan': 0, 'xinan': 0, 'xi': 0, 'xibei': 0}
df = pd.read_csv(file_address)
data = df.to_numpy()
list = data[:, -1]
list = [float(x) for x in list]
Num = df.shape[0] # 4546
for i in list:
if 0 <= i <= 22.5 or 337.5 <= i <= 360:
dict['bei'] += 1
if 22.5 < i <= 67.5:
dict['dongbei'] += 1
if 67.5 < i <= 112.5:
dict['dong'] += 1
if 112.5 < i <= 157.5:
dict['dongnan'] += 1
if 157.5 < i <= 202.5:
dict['nan'] += 1
if 202.5 < i <= 247.5:
dict['xinan'] += 1
if 247.5 < i <= 292.5:
dict['xi'] += 1
if 292.5 < i <= 337.5:
dict['xibei'] += 1
# {'bei': 286, 'dongbei': 283, 'dong': 405, 'dongnan': 644, 'nan': 1017, 'xinan': 931, 'xi': 650, 'xibei': 330}
data = [dict['bei'], dict['dongbei'], dict['dong'], dict['dongnan'], dict['nan'], dict['xinan'], dict['xi'],
dict['xibei']]
# 數(shù)據(jù)標(biāo)簽
labels = ['北風(fēng)', '東北風(fēng)', '東風(fēng)', '東南風(fēng)', '南風(fēng)', '西南風(fēng)', '西風(fēng)', '西北風(fēng)']
# 各區(qū)域顏色
colors = ['lightcoral', 'orange', 'yellow', 'yellowgreen', 'plum', 'cyan', 'hotpink', 'silver']
# 數(shù)據(jù)計(jì)算處理
sizes = [data[0] / Num, data[1] / Num, data[2] / Num, data[3] / Num, data[4] / Num,
data[5] / Num, data[6] / Num, data[7] / Num]
# 設(shè)置突出模塊偏移值
expodes = (0.1, 0, 0.1, 0, 0.1, 0, 0.1, 0)
plt.figure(figsize=(15, 15))
# 設(shè)置繪圖屬性并繪圖
plt.pie(sizes, explode=expodes, labels=labels, shadow=True, colors=colors, autopct='%.1f%%',
textprops={'fontsize': 22})
plt.axis('equal')
plt.title("2023年5月11日全國風(fēng)向統(tǒng)計(jì)圖", fontsize=30, color='b') # fontsize設(shè)置標(biāo)題字體大小
plt.show()
def draw_province_temperature():
tempreture = [
['甘肅省', 13.5],
['吉林省', 15.7],
['貴州省', 16.6],
['山西省', 16.9],
['陜西省', 17.7],
['四川省', 17.9],
['浙江省', 18.2],
['湖南省', 18.3],
['安徽省', 18.9],
['江西省', 18.9],
['上海市', 19.0],
['遼寧省', 19.0],
['北京市', 19.3],
['云南省', 19.4],
['湖北省', 19.6],
['江蘇省', 19.6],
['河北省', 19.9],
['河南省', 20.0],
['福建省', 20.1],
['重慶市', 20.9],
['山東省', 21.1],
['廣東省', 21.1],
['天津市', 22.1],
['臺灣省', 25.0],
['海南省', 26.1],
['廣西壯族自治區(qū)', 20.3],
['新疆維吾爾自治區(qū)', 20.4],
['澳門特別行政區(qū)', 22.4],
['香港特別行政區(qū)', 24.4],
['西藏自治區(qū)', 3.5],
['青海省', 3.6],
['黑龍江省', 16.5],
['內(nèi)蒙古自治區(qū)', 15.0],
['寧夏回族自治區(qū)', 16.2]
]
Map(init_opts=opts.InitOpts(height="1000px", width="1500px")).add(
series_name="氣溫",
data_pair=tempreture,
maptype="china",
# 是否啟用鼠標(biāo)滾輪縮放和拖動平移,默認(rèn)為True
is_roam=True,
# 是否顯示圖形標(biāo)記,默認(rèn)為True
is_map_symbol_show=False,
).set_global_opts(
# 設(shè)置標(biāo)題
title_opts=opts.TitleOpts(title="2023-5-11全國各省平均氣溫",
subtitle="數(shù)據(jù)來源:中央氣象臺網(wǎng)站",
pos_right="center",
pos_top="5%"),
# 設(shè)置標(biāo)準(zhǔn)顯示
visualmap_opts=opts.VisualMapOpts(max_=30,
min_=0),
# range_color=["#E0ECF8", "#045FB4"]),
).set_series_opts(
# 標(biāo)簽名稱顯示,默認(rèn)為True
label_opts=opts.LabelOpts(is_show=True, color="blue")
).render("2023-5-11全國各省平均氣溫.html")
def draw_windSpeed(file):
data = pd.read_csv(file)
windd = []
wind = list(data['windDirection'])
wind_speed = list(data['windSpeed'])
for i in wind:
if i > 360:
windd.append(0)
if 0 <= i <= 22.5 or 337.5 <= i <= 360:
windd.append(90)
if 22.5 < i <= 67.5:
windd.append(45)
if 67.5 < i <= 112.5:
windd.append(360)
if 112.5 < i <= 157.5:
windd.append(315)
if 157.5 < i <= 202.5:
windd.append(270)
if 202.5 < i <= 247.5:
windd.append(225)
if 247.5 < i <= 292.5:
windd.append(180)
if 292.5 < i <= 337.5:
windd.append(135)
degs = np.arange(45, 361, 45)
temp = []
for deg in degs:
speed = []
# 獲取 wind_deg 在指定范圍的風(fēng)速平均值數(shù)據(jù)
for i in range(0, 24):
if windd[i] == deg:
speed.append(wind_speed[i])
if len(speed) == 0:
temp.append(0)
else:
temp.append(sum(speed) / len(speed))
N = 8
theta = np.arange(0. + np.pi / 8, 2 * np.pi + np.pi / 8, 2 * np.pi / 8)
# 數(shù)據(jù)極徑
radii = np.array(temp)
# 繪制極區(qū)圖坐標(biāo)系
plt.axes(polar=True)
# 定義每個(gè)扇區(qū)的RGB值(R,G,B),x越大,對應(yīng)的顏色越接近藍(lán)色
colors = plt.cm.viridis(np.random.rand(N))
plt.bar(theta, radii, width=(2 * np.pi / N), bottom=0.0, color=colors)
plt.title('2023-5-12武漢市主城區(qū)一天風(fēng)級圖', x=0.2, fontsize=15, loc='left')
plt.show()
return temp
sourcefile = "passed_weather_ALL.csv"
# 降雨
# rain_list = passed_rain_analyse(sourcefile)
# draw_rain(rain_list)
# 氣溫
# temperature_list = passed_temperature_analyse(sourcefile)
# draw_temperature(temperature_list)
# 氣壓
# pressure_list = passed_pressure_analyse(sourcefile)
# draw_pressure('low_pressure.csv')
# 風(fēng)向
# windDirection_list = passed_windDirection_analyse(sourcefile)
# draw_windDirection('windDirection.csv')
# 各省氣溫
# province_temperature_analyse(sourcefile)
# draw_province_temperature()
# 風(fēng)向雷達(dá)圖
#passed_windSpeed_analyse(sourcefile)
draw_windSpeed('wuhan_wind.csv')
到了這里,關(guān)于基于Spark的氣象數(shù)據(jù)分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!