国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

基于Spark的氣象數(shù)據(jù)分析

這篇具有很好參考價(jià)值的文章主要介紹了基于Spark的氣象數(shù)據(jù)分析。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

  1. 研究背景與方案

1.1.研究背景

在大數(shù)據(jù)時(shí)代背景下,各行業(yè)數(shù)據(jù)的規(guī)模大幅度增加,數(shù)據(jù)類別日益復(fù)雜,給數(shù)據(jù)分析工作帶來極大挑戰(zhàn)。氣象行業(yè)和人們的生活息息相關(guān),隨著信息時(shí)代的發(fā)展,大數(shù)據(jù)技術(shù)的出現(xiàn)為氣象數(shù)據(jù)的發(fā)展帶來機(jī)遇?;诖?,本項(xiàng)目使用Spark等大數(shù)據(jù)處理工具,采用機(jī)器學(xué)習(xí)、深度學(xué)習(xí)等多種數(shù)據(jù)分析方法,并借助可視化手段將多種類型數(shù)據(jù)與復(fù)雜數(shù)據(jù)進(jìn)行解讀與概括,探究大數(shù)據(jù)技術(shù)在氣象數(shù)據(jù)中的應(yīng)用,給受眾傳遞更有價(jià)值的信息,進(jìn)而有助于提升社會整體生產(chǎn)效率,推動市場經(jīng)濟(jì)的有效發(fā)展。

1.2.研究方案

一、選用合適的數(shù)據(jù)集。歷史天氣數(shù)據(jù)可以通過爬取氣象網(wǎng)站獲得,這種方法的優(yōu)點(diǎn)在于可以靈活選擇自己想要的數(shù)據(jù),缺點(diǎn)在于耗時(shí)較長、可能遇到反爬機(jī)制,此外氣象網(wǎng)站的天氣數(shù)據(jù)往往特征維數(shù)不高,不適于機(jī)器學(xué)習(xí)、深度學(xué)習(xí)等任務(wù)。因此決定使用爬取的數(shù)據(jù)進(jìn)行基Spark的數(shù)據(jù)分析及可視化,選用數(shù)據(jù)競賽網(wǎng)站Kaggle的更大更高維的數(shù)據(jù)進(jìn)行基于學(xué)習(xí)的任務(wù)。

二、工具與環(huán)境配置。由于虛擬機(jī)受限于硬件,難以完成較大規(guī)模數(shù)據(jù)的分析任務(wù),于是決定在本地Windows操作系統(tǒng)下部署相關(guān)大數(shù)據(jù)環(huán)境(HadoopSpark、深度學(xué)習(xí)框架等)

三、選用合適的學(xué)習(xí)算法。不同類型的數(shù)據(jù)適合于不同的學(xué)習(xí)算法,天氣數(shù)據(jù)的特點(diǎn)是存在趨勢、周期性、節(jié)令性等規(guī)律,而由于循環(huán)神經(jīng)網(wǎng)絡(luò)RNN具有記憶能力,因此它對于處理時(shí)間序列數(shù)據(jù)非常有效。在訓(xùn)練過程中,網(wǎng)絡(luò)會根據(jù)歷史數(shù)據(jù)動態(tài)地更新權(quán)重,從而使得網(wǎng)絡(luò)能夠適應(yīng)不同的數(shù)據(jù)分布和趨勢。同時(shí),RNN通過考慮前后時(shí)刻之間的依賴性,使得更容易捕獲上述數(shù)據(jù)的特征,并可以自適應(yīng)地更新模型以處理噪聲和異常值的影響。因此選用RNN及其改進(jìn)(LSTM等)完成深度學(xué)習(xí)任務(wù)。

2.數(shù)據(jù)集介紹

2.1爬取中央氣象臺網(wǎng)站天氣數(shù)據(jù)

2.1.1.實(shí)驗(yàn)數(shù)據(jù)介紹

本次實(shí)驗(yàn)所采用的數(shù)據(jù)從中央氣象臺官方網(wǎng)站(關(guān)注陰晴冷暖,氣象一直為你)爬取,主要是最近24小時(shí)各個(gè)城市的天氣數(shù)據(jù),包括時(shí)間點(diǎn)(整點(diǎn))、整點(diǎn)氣溫、整點(diǎn)降水量、風(fēng)力、整點(diǎn)氣壓、相對濕度等,總數(shù)據(jù)量達(dá)到58368條。正常情況下,每個(gè)城市會對應(yīng)24條數(shù)據(jù)(每個(gè)整點(diǎn)一條)。有部分城市部分時(shí)間點(diǎn)數(shù)據(jù)存在缺失或異常。

2.1.2.數(shù)據(jù)獲取

2.1.2.1.觀察數(shù)據(jù)獲取方式

打開中央氣象臺官方網(wǎng)站,任意點(diǎn)擊“熱點(diǎn)城市”中的一個(gè)城市。打開瀏覽器的Web控制臺。通過切換“省份”和“城市”,我們可以發(fā)現(xiàn)網(wǎng)頁中的數(shù)據(jù)是以json字符串格式異步地從服務(wù)器傳送。可以發(fā)現(xiàn)以下數(shù)據(jù)和請求URL的關(guān)系:

1 數(shù)據(jù)和請求URL的關(guān)系

http://www.nmc.cn/f/rest/province

省份數(shù)據(jù)

http://www.nmc.cn/f/rest/province/+省份三位編碼

某個(gè)省份的城市數(shù)據(jù)

http://www.nmc.cn/f/rest/passed/+城市編號

某個(gè)城市最近24小時(shí)整點(diǎn)天氣數(shù)據(jù)

由于省份三位編碼(如福建省編碼為ABJ)需要從省份數(shù)據(jù)獲得中獲得,城市編號需要從城市數(shù)據(jù)獲得(如福州市編號為58847),所以為了獲得各個(gè)城市最近24小時(shí)整點(diǎn)天氣數(shù)據(jù),依次爬取省份數(shù)據(jù)、城市數(shù)據(jù)、最近24小時(shí)整點(diǎn)數(shù)據(jù)

2.1.2.2.數(shù)據(jù)爬取

由于可以直接通過訪問請求URL,傳回的響應(yīng)的數(shù)據(jù)部分即是json格式的數(shù)據(jù),所以只需要調(diào)用python的urllib2庫中相關(guān)函數(shù),對上述URL進(jìn)行請求即可。不需要像平常爬取HTML網(wǎng)頁時(shí)還需要對網(wǎng)頁源碼進(jìn)行解析,查找相關(guān)數(shù)據(jù)。唯一需要注意的是,有些城市可能不存在或者全部缺失最近24小時(shí)整點(diǎn)數(shù)據(jù),需要進(jìn)行過濾,以免出錯。

2.1.2.3數(shù)據(jù)存儲

雖然上一步獲取的json數(shù)據(jù)可以直接存儲并可使用SparkSession直接讀取,但是為了方便觀察數(shù)據(jù)結(jié)構(gòu)、辨識異常數(shù)據(jù)、對數(shù)據(jù)增加部分提示信息,爬取后的數(shù)據(jù)進(jìn)行了一些處理之后,保存成了csv格式,包括省份數(shù)據(jù)(province.csv)、城市數(shù)據(jù)(city.csv)、各個(gè)城市最近24小時(shí)整點(diǎn)天氣數(shù)據(jù)(passed_weather_ALL.csv)。由于所有城市過去24小時(shí)整點(diǎn)天氣數(shù)據(jù)數(shù)量太多,為了避免內(nèi)存不足,每爬取50個(gè)城市的數(shù)據(jù)后,就會進(jìn)行一次保存。

3.實(shí)驗(yàn)環(huán)境搭建

3.1.Windows安裝Hadoop

3.1.1.Hadoop簡介

Apache Hadoop是一款支持?jǐn)?shù)據(jù)密集型分布式應(yīng)用程序并以Apache 2.0許可協(xié)議發(fā)布的開源軟件框架,有助于使用許多計(jì)算機(jī)組成的網(wǎng)絡(luò)來解決數(shù)據(jù)、計(jì)算密集型的問題?;?/span>MapReduce計(jì)算模型,它為大數(shù)據(jù)分布式存儲與處理提供了一個(gè)軟件框架。所有的Hadoop模塊都有一個(gè)基本假設(shè),即硬件故障是常見情況,應(yīng)該由框架自動處理。

Apache Hadoop的核心模塊分為存儲和計(jì)算模塊,前者被稱為Hadoop分布式文件系統(tǒng)(HDFS),后者即MapReduce計(jì)算模型。Hadoop框架先將文件分成數(shù)據(jù)塊并分布式地存儲在集群的計(jì)算節(jié)點(diǎn)中,接著將負(fù)責(zé)計(jì)算任務(wù)的代碼傳送給各節(jié)點(diǎn),讓其能夠并行地處理數(shù)據(jù)。這種方法有效利用了數(shù)據(jù)局部性,令各節(jié)點(diǎn)分別處理其能夠訪問的數(shù)據(jù)。與傳統(tǒng)的超級計(jì)算機(jī)架構(gòu)相比,這使得數(shù)據(jù)集的處理速度更快、效率更高。

3.1.2.安裝Java開發(fā)環(huán)境

打開cmd窗口并輸入java -version,若能成功顯示Java JDK的版本號則代表java環(huán)境已安裝成功,否則需要安裝Java JDK。

3.1.3.下載安裝Hadoop所需要的文件

Hadoop3.1.1版本的安裝包:https://archive.apache.org/dist/hadoop/common/hadoop-3.1.1/hadoop-3.1.0.tar.gz。Windows環(huán)境安裝所需的bin:https://github.com/s911415/apache-hadoop-3.1.1-winutils。Hadoop是在Linux下編寫的,winutil主要用于模擬Linux下的目錄環(huán)境。所以Hadoop放在Windows下運(yùn)行的時(shí)候,需要這個(gè)輔助程序才能運(yùn)行。

3.1.4.替換原安裝包的bin目錄

hadoop-3.1.1解壓后文件夾的路徑為D:\hadoop-3.1.1。apache-hadoop-3.1.1-winutils-master這個(gè)文件夾解壓后里面只有bin這一個(gè)文件夾,我們將這個(gè)bin文件夾復(fù)制到hadoop-3.1.1文件夾中替換原有的bin文件夾。

3.1.5.配置Hadoop環(huán)境變量

新建系統(tǒng)變量,變量名填HADOOP_HOME,變量值填hadoop-3.1.1對應(yīng)的路徑(比如我的是D:\hadoop-3.1.1。順便可以檢查JAVA_HOME有沒有配置好,后面會用到)。然后點(diǎn)擊Path變量進(jìn)行編輯,在最前面加上%HADOOP_HOME%\bin。

3.1.6.檢查環(huán)境變量是否配置成功

配置好環(huán)境變量后,win+R 輸入cmd打開命令提示符,然后輸入hadoop version,按回車,如果出現(xiàn)如圖所示版本號,則說明安裝成功。

基于Spark的氣象數(shù)據(jù)分析

3.1.7.配置Hadoop的配置文件

3.1.7.1.配置core-site.xml文件

代碼 2 配置core-site.xml文件

<configuration>

?????????????? <property>

?????????????????????????????? <name>fs.defaultFS</name>

?????????????????????????????? <value>hdfs://localhost:900</value>

?????????????? </property>

</configuration>

3.1.7.2.配置mapred-site.xml文件

代碼 3 配置mapred-site.xml文件

<configuration>??

?????????????? <property>??????

?????????????? <name>mapreduce.framework.name</name>??????

?????????????? <value>yarn</value>??

?????????????? </property>

</configuration>

3.1.7.3.配置yarn-site.xml文件

代碼 4 配置yarn-site.xml文件

<configuration>

?????????????? <property>

?????????????????????????????? <name>yarn.nodemanager.aux-services</name>

?????????????????????????????? <value>mapreduce_shuffle</value>

?????????????? </property>

?????????????? <property>

?????????????????????????????? <name>yarn.nodemanager.auxservices.mapreduce.shuffle.class</name>

?????????????????????????????? <value>org.apache.hadoop.mapred.ShuffleHandler</value>

?????????????? </property>

</configuration>

3.1.7.4.配置hdfs-site.xml文件

在D:\hadoop-3.1.1創(chuàng)建data文件夾(也可以是別的名字,但后面配置要對應(yīng)修改),在data2020文件夾中(D:\hadoop-3.1.1\data)創(chuàng)建datanode和namenode文件夾。之后打開hdfs-site.xml文件:

代碼 5 配置hdfs-site.xml文件

<configuration>

?????????????? <property>??????

?????????????? <name>dfs.replication</name>??????

?????????????? <value>1</value>??

?????????????? </property>??

?????????????? <property>??????

?????????????? <name>dfs.namenode.name.dir</name>??????

?????????????? <value>D:\hadoop-3.1.1\data\namenode</value>

?????????????? </property>??

?????????????? <property>??????

?????????????? <name>dfs.datanode.data.dir</name>????

?????????????? <value>D:\hadoop-3.1.1\data\datanode</value>

?????????????? </property>

</configuration>

3.1.7.5.配置hadoop-env.sh文件

打開hadoop-env.sh,使用查找功能(Ctrl+F)查找export JAVA_HOME,找到相應(yīng)的位置,在#export JAVA_HOME=下面一行配置自己電腦上對應(yīng)的JAVA_HOME/bin路徑(JAVA_HOME的具體路徑在環(huán)境變量中查找到)。

3.1.7.6.配置hadoop-env.cmd文件

打開hadoop-env.cmd文件后使用查找功能(Ctrl+F),輸入@rem The java implementation to use查找到對應(yīng)行,在set JAVA_HOME那一行將自己的JAVA_HOME路徑配置上去。

3.1.8.啟動Hadoop服務(wù)

使用管理員模式進(jìn)入sbin目錄,輸入start-all.cmd啟動Hadoop服務(wù):

基于Spark的氣象數(shù)據(jù)分析

接著在瀏覽器中訪問http://localhost:9870,如果成功出現(xiàn)以下界面則代表Hadoop安裝和配置完成:

基于Spark的氣象數(shù)據(jù)分析

3.2.Windows安裝Spark

3.2.1.Spark簡介

Spark最初由美國加州伯克利大學(xué)的AMP 實(shí)驗(yàn)室于2009年開發(fā),是基于內(nèi)存計(jì)算的大數(shù)據(jù)并行計(jì)算框架,可用于構(gòu)建大型的、低延遲的數(shù)據(jù)分析應(yīng)用程序。 Spark于2013年加入Apache孵化器項(xiàng)目后發(fā)展迅猛,如今已成為Apache軟件基金會最重要的三大分布式計(jì)算系統(tǒng)開源項(xiàng)目之一(Hadoop、Spark、Storm)。值得一提的是,Spark在2014年打破了Hadoop保持的基準(zhǔn)排序記錄。Spark使用206個(gè)節(jié)點(diǎn),用時(shí)23分鐘完成了100TB數(shù)據(jù)的排序,而Hadoop使用了2000個(gè)節(jié)點(diǎn),耗時(shí)72分鐘,完成了100TB數(shù)據(jù)的排序。也就是說Spark用十分之一的計(jì)算資源,獲得了比Hadoop快3倍的速度。

Spark具有如下幾個(gè)主要特點(diǎn):運(yùn)行速度快:使用DAG執(zhí)行引擎以支持循環(huán)數(shù)據(jù)流與內(nèi)存計(jì)算;容易使用:支持使用Scala、Java、Python和R語言進(jìn)行編程,可以通過 Spark Shell進(jìn)行交互式編程;通用性:Spark提供了完整而強(qiáng)大的技術(shù)棧,包括SQL查詢、流式計(jì)算、機(jī)器學(xué)習(xí)和圖算法組件;運(yùn)行模式多樣:可運(yùn)行于獨(dú)立的集群模式中,可運(yùn)行于Hadoop中,也可運(yùn)行于Amazon EC2等云環(huán)境中,并且可以訪問HDFS、Cassandra、HBase和Hive等多種數(shù)據(jù)源。

傳統(tǒng)大數(shù)據(jù)處理框架Hadoop存在如下一些缺點(diǎn):表達(dá)能力有限;磁盤IO開銷大;延遲高;任務(wù)之間的銜接涉及IO開銷;在前一個(gè)任務(wù)執(zhí)行完成之前,其他任務(wù)就無法開始,難以勝任復(fù)雜、多階段的計(jì)算任務(wù)。Spark在借鑒Hadoop MapReduce優(yōu)點(diǎn)的同時(shí),很好地解決了MapReduce所面臨的問題。相比于Hadoop MapReduce,Spark主要具有如下優(yōu)點(diǎn):Spark的計(jì)算模式也屬于MapReduce,但不局限于Map和Reduce操作,還提供了多種數(shù)據(jù)集操作類型,編程模型比Hadoop MapReduce更靈活;Spark提供了內(nèi)存計(jì)算,可將中間結(jié)果放到內(nèi)存中,對于迭代運(yùn)算效率更高;Spark基于DAG的任務(wù)調(diào)度執(zhí)行機(jī)制,要優(yōu)于Hadoop MapReduce的迭代執(zhí)行機(jī)制。

基于Spark的氣象數(shù)據(jù)分析

Spark的運(yùn)行架構(gòu)包括集群管理器(Cluster Manager)、運(yùn)行作業(yè)任務(wù)的工作節(jié)點(diǎn)(Worker Node)、每個(gè)應(yīng)用的任務(wù)控制節(jié)點(diǎn)(Driver)和每個(gè)工作節(jié)點(diǎn)上負(fù)責(zé)具體任務(wù)的執(zhí)行進(jìn)程(Executor)。其中,集群管理器可以是Spark自帶的資源管理器,也可以是YARN或Mesos等資源管理框架。與Hadoop MapReduce計(jì)算框架相比,Spark所采用的Executor有兩個(gè)優(yōu)點(diǎn)。一是利用多線程來執(zhí)行具體的任務(wù)(HadoopMapReduce采用的是進(jìn)程模型),減少任務(wù)的啟動開銷。二是 Executor中有一個(gè)BlockManager存儲模塊,會將內(nèi)存和磁盤共同作為存儲設(shè)備,當(dāng)需要多輪迭代計(jì)算時(shí),可以將中間結(jié)果存儲到這個(gè)存儲模塊里,下次需要時(shí)就可以直接讀該存儲模塊里的數(shù)據(jù),而不需要讀寫到 HDFS 等文件系統(tǒng)里,因而有效減少了 IO 開銷;或者在交互式查詢場景下,Executor預(yù)先將表緩存到該存儲系統(tǒng)上,從而可以提高讀寫IO的性能。

Spark運(yùn)行基本流程如下圖所示,流程如下。

(1)當(dāng)一個(gè)Spark應(yīng)用被提交時(shí),首先需要為這個(gè)應(yīng)用構(gòu)建起基本的運(yùn)行環(huán)境,即由任務(wù)控制節(jié)點(diǎn)(Driver)創(chuàng)建一個(gè)SparkContext,由SparkContext負(fù)責(zé)和資源管理器—Cluster Manager的通信,以及進(jìn)行資源的申請、任務(wù)的分配和監(jiān)控等。SparkContext 會向資源管理器注冊并申請運(yùn)行Executor的資源。

(2)資源管理器為Executor分配資源,并啟動Executor進(jìn)程,Executor運(yùn)行情況將隨著“心跳”發(fā)送到資源管理器上。

(3)SparkContext 根據(jù) RDD 的依賴關(guān)系構(gòu)建 DAG,并將 DAG 提交給 DAG 調(diào)度器(DAGScheduler)進(jìn)行解析,將DAG分解成多個(gè)“階段”(每個(gè)階段都是一個(gè)任務(wù)集),并且計(jì)算出各個(gè)階段之間的依賴關(guān)系,然后把一個(gè)個(gè)“任務(wù)集”提交給底層的任務(wù)調(diào)度器(TaskScheduler)進(jìn)行處理;Executor 向 SparkContext 申請任務(wù),任務(wù)調(diào)度器將任務(wù)分發(fā)給 Executor 運(yùn)行,同時(shí)SparkContext將應(yīng)用程序代碼發(fā)放給Executor。

(4)任務(wù)在Executor上運(yùn)行,把執(zhí)行結(jié)果反饋給任務(wù)調(diào)度器,然后反饋給DAG調(diào)度器,運(yùn)行完畢后寫入數(shù)據(jù)并釋放所有資源。

基于Spark的氣象數(shù)據(jù)分析

Spark的核心建立在統(tǒng)一的抽象RDD之上,這使得Spark的各個(gè)組件可以無縫地進(jìn)行集成,在同一個(gè)應(yīng)用程序中完成大數(shù)據(jù)計(jì)算任務(wù)。一個(gè)RDD就是一個(gè)分布式對象集合,本質(zhì)上是一個(gè)只讀的分區(qū)記錄集合。每個(gè)RDD可以分成多個(gè)分區(qū),每個(gè)分區(qū)就是一個(gè)數(shù)據(jù)集片段,并且一個(gè) RDD 的不同分區(qū)可以被保存到集群中不同的節(jié)點(diǎn)上,從而可以在集群中的不同節(jié)點(diǎn)上進(jìn)行并行計(jì)算。RDD提供了一組豐富的操作以支持常見的數(shù)據(jù)運(yùn)算,分為“行動”(Action)和“轉(zhuǎn)換”(Transformation)兩種類型,前者用于執(zhí)行計(jì)算并指定輸出的形式,后者指定RDD之間的相互依賴關(guān)系。兩類操作的主要區(qū)別是,轉(zhuǎn)換操作(如map、filter、groupBy、join等)接受RDD并返回RDD,而行動操作(如count、collect等)接受RDD但是返回非RDD(即輸出一個(gè)值或結(jié)果)。RDD采用了惰性調(diào)用,即在RDD的執(zhí)行過程中,真正的計(jì)算發(fā)生在RDD的“行動”操作,對于“行動”之前的所有“轉(zhuǎn)換”操作,Spark只是記錄下“轉(zhuǎn)換”操作應(yīng)用的一些基礎(chǔ)數(shù)據(jù)集以及RDD生成的軌跡,即相互之間的依賴關(guān)系,而不會觸發(fā)真正的計(jì)算。

基于Spark的氣象數(shù)據(jù)分析

3.2.2.下載Spark

官網(wǎng)下載Spark2.4.3版本:Spark Release 2.4.3 | Apache Spark。下載完解壓,和Hadoop3.1.1和winutils放在同一個(gè)盤或者目錄下,方便以后的管理。

3.2.3.配置Spark環(huán)境變量

同之前配置Hadoop一樣,需要配置SPARK_HOME和Path。

3.2.4.拷貝pyspark

進(jìn)入spark安裝目錄,將pyspark復(fù)制到Python環(huán)境的Lib\site-packages目錄下。

3.2.5.安裝py4j

Py4J 是一個(gè)用?Python?Java編寫的庫?,通過 Py4JPython程序能夠動態(tài)訪問Java虛擬機(jī)中的Java對象,Java程序也能夠回調(diào) Python對象。使用pip install py4j安裝即可。

3.2.6.查看Spark是否安裝成功

打開cmd窗口,輸入spark-shell,出現(xiàn)以下內(nèi)容說明配置成功:

基于Spark的氣象數(shù)據(jù)分析

4.基于Spark的數(shù)據(jù)分析與可視化

4.1.分析降水?dāng)?shù)據(jù)

我們首先計(jì)算各個(gè)城市過去24個(gè)小時(shí)的累積降水量。思路是按照城市對數(shù)據(jù)進(jìn)行分組,對每個(gè)城市的rain1h字段進(jìn)行分組求和。相關(guān)步驟如下:

(1)創(chuàng)建SparkSession對象spark;

(2)使用spark.read.csv(filename)讀取passed_weather_ALL.csv數(shù)據(jù)生成Dateframe df;

(3)對df進(jìn)行操作:使用Dateframe的select方法選擇province、city_name、city_code、rain1h字段,并使用Column對象的cast(dateType)方法將rain1h轉(zhuǎn)成數(shù)值型,再使用Dateframe的filter方法篩選出rain1h小于1000的記錄(大于1000是異常數(shù)據(jù)),得到新的Dateframe df_rain;

(4)對df_rain進(jìn)行操作:使用Dateframe的groupBy操作按照province、city_name、city_code的字段分組,使用agg方法對rain1h字段進(jìn)行分組求和得到新的字段rain24h(過去24小時(shí)累積雨量),使用sort方法按照rain24h降序排列,經(jīng)過上述操作得到新的Dateframe df_rain_sum;

(5)對df_rain_sum調(diào)用cache()方法將此前的轉(zhuǎn)換關(guān)系進(jìn)行緩存,提高性能;

(6)對df_rain_sum調(diào)用coalesce()將數(shù)據(jù)分區(qū)數(shù)目減為1,并使用write.csv(filename)方法將得到的數(shù)據(jù)持久化到本地文件;

(7)對df_rain_sum調(diào)用head()方法取前若干條數(shù)據(jù)(即24小時(shí)累積降水量Top-N的列表)供數(shù)據(jù)可視化使用。

代碼 6 計(jì)算各城市過去24小時(shí)累積雨量

def passed_rain_analyse(filename):

??? print("開始分析累積降雨量")

??? spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

??? df = spark.read.csv(filename, header=True)

??? df_rain = df.select(df['province'], df['city_name'], df['city_code'],

??????????????????????? df['rain1h'].cast(DecimalType(scale=1))).filter(df['rain1h'] < 1000)

??? # 篩選數(shù)據(jù),去除無效數(shù)據(jù),一小時(shí)降水大于1000視為無效

??? df_rain_sum = df_rain.groupBy("province", "city_name", "city_code").agg(F.sum("rain1h").alias("rain24h")).sort(

??????? F.desc("rain24h"))? # 分組、求和、排序

??? df_rain_sum.coalesce(1).write.csv("passed_rain_analyse.csv")

??? print("累積降雨量分析完畢!")

??? return df_rain_sum.head(20)? # 20個(gè)

4.2.分析氣溫?cái)?shù)據(jù)

根據(jù)國家標(biāo)準(zhǔn)(《地面氣象服務(wù)觀測規(guī)范》),日平均氣溫取四時(shí)次數(shù)據(jù)的平均值,四時(shí)次數(shù)據(jù)為:02時(shí)、08時(shí)、14時(shí)、20時(shí)。據(jù)此,應(yīng)該先篩選出各個(gè)時(shí)次的氣溫?cái)?shù)據(jù),再按照城市對數(shù)據(jù)進(jìn)行分組,對每個(gè)城市的tempeature字段進(jìn)行分組求平均。相關(guān)步驟如下:

(1)創(chuàng)建SparkSession對象spark;

(2)使用spark.read.csv(filename)讀取passed_weather_ALL.csv數(shù)據(jù)生成Dateframe df;

(3)對df進(jìn)行操作:使用Dateframe的select方法選擇province,city_name,city_code,temperature字段,并使用庫pyspark.sql.functions中的date_format(col,pattern)方法和hour(col)將time字段轉(zhuǎn)換成date(日期)字段和hour(小時(shí))字段,(time字段的分秒信息無用),,得到新的Dateframe df_temperature;

(4)對df_temperature進(jìn)行操作:使用Dateframe的filter操作過濾出hour字段在[2,8,14,20]中的記錄,經(jīng)過上述操作得到新的Dateframe df_4point_temperature;

(5)對df_4point_temperature進(jìn)行操作:使用Dateframe的groupBy操作按照province、city_name、city_code、date字段分組,使用agg方法對temperature字段進(jìn)行分組計(jì)數(shù)和求和(求和字段命名為avg_temperature),使用filter方法過濾出分組計(jì)數(shù)為4的記錄(確保有4個(gè)時(shí)次才能計(jì)算日平均溫),使用sort方法按照avg_temperature進(jìn)行排列,desc是降序,asc是升序;再篩選出需要保存的字段province、city_name、city_code、date,avg_temperature(順便使用庫pyspark.sql.functions中的format_number(col, precision)方法保留一位小數(shù)),經(jīng)過上述操作得到新的Dateframe df_avg_temperature;

(6)對df_avg_temperature調(diào)用cache()方法將此前的轉(zhuǎn)換關(guān)系進(jìn)行緩存,提高性能;

(7)對df_avg_temperature調(diào)用coalesce()將數(shù)據(jù)分區(qū)數(shù)目減為1,并使用write.csv(filename)方法將得到的數(shù)據(jù)持久化到本地文件;

(8)對df_rain_sum調(diào)用collect()方法取將Dateframe轉(zhuǎn)換成list,方便后續(xù)進(jìn)行數(shù)據(jù)可視化。

代碼 7 計(jì)算各城市過去24小時(shí)平均氣溫

def passed_temperature_analyse(filename):

??? print("開始分析氣溫")

??? spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

??? df = spark.read.csv(filename, header=True)

??? df_temperature = df.select(? # 選擇需要的列

??????? df['province'],

??????? df['city_name'],

??????? df['city_code'],

??????? df['temperature'].cast(DecimalType(scale=1)),? # 轉(zhuǎn)換為十進(jìn)制類型,并將小數(shù)點(diǎn)后的位數(shù)保留1

??????? F.date_format(df['time'], "yyyy-MM-dd").alias("date"),? # 得到日期數(shù)據(jù)

??????? F.hour(df['time']).alias("hour")? # 得到小時(shí)數(shù)據(jù),命名為hour

??? ).filter(df['temperature'] < 1000)

??? # 篩選四點(diǎn)時(shí)次

??? df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([2, 8, 14, 20]))

??? df_avg_temperature = df_4point_temperature.groupBy("province", "city_name", "city_code", "date").agg(

??????? F.count("temperature"), F.avg("temperature").alias("avg_temperature")).sort(

??????? F.asc("avg_temperature")).select("province", "city_name", "city_code", "date",

???????????????????????????????????????? F.format_number('avg_temperature', 1).alias("avg_temperature"))

??? df_avg_temperature.show()

??? avg_temperature_list = df_avg_temperature.collect()

??? df_avg_temperature.coalesce(1).write.csv("passed_temperature.csv")

??? print("氣溫分析完畢")

??? return avg_temperature_list[0:20]? # 20個(gè)

我們可以使用同樣的方法得到過去24小時(shí)各省級行政區(qū)的平均氣溫,只需修改groupby語句即可(修改為groupBy("province"))。

4.3.分析氣壓數(shù)據(jù)

先篩選出各個(gè)時(shí)次的氣壓數(shù)據(jù),再按照城市對數(shù)據(jù)進(jìn)行分組,對每個(gè)城市的pressure字段進(jìn)行分組求平均。相關(guān)步驟如下:

(1)創(chuàng)建SparkSession對象spark;

(2)使用spark.read.csv(filename)讀取passed_weather_ALL.csv數(shù)據(jù)生成Dateframe df;

(3)對df進(jìn)行操作:使用Dateframe的select方法選擇province、city_name、city_code、pressure字段,并使用庫pyspark.sql.functions中的date_format(col,pattern)方法和hour(col)將time字段轉(zhuǎn)換成date(日期)字段和hour(小時(shí))字段,(time字段的分秒信息無用),,得到新的Dateframe df_pressure;

(4)對df_pressure進(jìn)行操作:使用Dateframe的filter操作過濾出hour字段在[2,8,14,20]中的記錄,經(jīng)過上述操作得到新的Dateframe df_4point_pressure;

(5)對df_4point_pressure進(jìn)行操作:使用Dateframe的groupBy操作按照province,city_name,city_code,date字段分組,使用agg方法對pressure字段進(jìn)行分組計(jì)數(shù)和求和(求和字段命名為avg_pressure),使用filter方法過濾出分組計(jì)數(shù)為4的記錄(確保有4個(gè)時(shí)次才能計(jì)算日平均溫),使用sort方法按照avg_pressure進(jìn)行排列,desc是降序,asc是升序;再篩選出需要保存的字段province、city_name、city_code、date,avg_pressure(順便使用庫pyspark.sql.functions中的format_number(col, precision)方法保留一位小數(shù)),經(jīng)過上述操作得到新的Dateframe df_avg_pressure;

(6)對df_avg_pressure調(diào)用cache()方法將此前的轉(zhuǎn)換關(guān)系進(jìn)行緩存以提高性能;

(7)對df_avg_pressure調(diào)用coalesce()將數(shù)據(jù)分區(qū)數(shù)目減為1,并使用write.csv(filename)方法將得到的數(shù)據(jù)持久化到本地文件。

代碼 8 計(jì)算各城市過去24小時(shí)平均氣壓

def passed_pressure_analyse(filename):

??? print("開始分析氣壓")

??? spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

??? df = spark.read.csv(filename, header=True)

??? df_pressure = df.select(? # 選擇需要的列

??????? df['province'],

??????? df['city_name'],

??????? df['city_code'],

??????? df['pressure'].cast(DecimalType(scale=1)),

??????? F.date_format(df['time'], "yyyy-MM-dd").alias("date"),? # 得到日期數(shù)據(jù)

??????? F.hour(df['time']).alias("hour")? # 得到小時(shí)數(shù)據(jù)

??? )

??? df_4point_pressure = df_pressure.filter(df_pressure['hour'].isin([2, 8, 14, 20]))

??? df_avg_pressure = df_4point_pressure.groupBy("province", "city_name", "city_code", "date").agg(

??????? F.count("pressure"), F.avg("pressure").alias("avg_pressure")).sort(

??????? F.asc("avg_pressure")).select("province", "city_name", "city_code", "date",

????????????????????????????????????? F.format_number('avg_pressure', 1).alias("avg_pressure"))

??? avg_pressure_list = df_avg_pressure.collect()

??? df_avg_pressure.coalesce(1).write.csv("passed_pressure.csv")

??? print("氣壓分析完畢")

??? return avg_pressure_list[0:20]? # 最低的20個(gè)

4.4.分析風(fēng)力數(shù)據(jù)

我們首先篩選出各個(gè)時(shí)次的風(fēng)向數(shù)據(jù),再按照城市對數(shù)據(jù)進(jìn)行分組,對每個(gè)城市的windDirection字段進(jìn)行分組求平均。相關(guān)步驟與求平均溫度與平均氣壓相似。注意到風(fēng)向字段是一個(gè)0到360的數(shù),我們可以將它劃分為八個(gè)風(fēng)向(北風(fēng)、東北風(fēng)、東風(fēng)、東南風(fēng)、南風(fēng)、西南風(fēng)、西風(fēng)和西北風(fēng)),每個(gè)風(fēng)向區(qū)間大小為45°,以便進(jìn)行后續(xù)的分析與可視化。

4.5.繪制累積降水量柱狀圖

首先draw_rain(rain_list)函數(shù)遍歷傳入的降雨列表,將每個(gè)城市的名稱和24小時(shí)降雨量分別添加到name_list和num_list中。接著,設(shè)置一個(gè)索引列表,用于設(shè)置每個(gè)城市的柱狀圖位置。之后創(chuàng)建圖形,并使用plt.bar()函數(shù)繪制柱狀圖。在繪制柱狀圖時(shí),color參數(shù)設(shè)置每個(gè)柱子的顏色, width參數(shù)設(shè)置柱子的寬度。plt.xticks()函數(shù)設(shè)置x軸刻度標(biāo)簽的位置和字體大小,plt.ylim()函數(shù)設(shè)置y軸刻度范圍,plt.xlabel()和plt.ylabel()函數(shù)設(shè)置x軸和y軸標(biāo)簽的名稱和字體大小,plt.title()函數(shù)設(shè)置圖形的標(biāo)題和字體大小。在繪制完柱狀圖后,使用for循環(huán)遍歷每個(gè)柱子,并使用plt.text()函數(shù)在柱子上方添加降雨量的數(shù)值。?

代碼 9 繪制各城市過去24小時(shí)累積降雨量圖

def draw_rain(rain_list):

??? print("開始繪制累積降雨量圖")

??? name_list = []

??? num_list = []

??? for item in rain_list:

??????? name_list.append(item.province[0:2] + '\n' + item.city_name)

??????? num_list.append(item.rain24h)

??? index = [i + 0.25 for i in range(0, len(num_list))]

??? plt.figure(figsize=(15, 15))? # 設(shè)置圖的大小

??? rects = plt.bar(index, num_list, color='ckrmgby', width=0.5)

??? plt.xticks([i + 0.25 for i in index], name_list, fontsize=15, color='r')? # fontsize設(shè)置x刻度字體大小

??? plt.ylim(ymax=(int(max(num_list) + 100) / 100) * 60, ymin=0)? # 設(shè)置刻度間隔

??? plt.yticks(fontsize=20, color='r')? # fontsize設(shè)置y刻度字體大小

??? plt.xlabel("城市", fontsize=25, color='darkblue')? # fontsize設(shè)置x坐標(biāo)標(biāo)簽字體大小

??? plt.ylabel("雨量", fontsize=25, color='darkblue')? # fontsize設(shè)置y坐標(biāo)標(biāo)簽字體大小

??? plt.title("202351124小時(shí)累計(jì)降雨量全國前20", fontsize=30, color='b')? # fontsize設(shè)置標(biāo)題字體大小

??? for rect in rects:

??????? height = rect.get_height()

??????? # fontsize設(shè)置直方圖上字體大小

??????? plt.text(rect.get_x() + rect.get_width() / 2, height, str(height), ha="center", va="bottom", fontsize=18)

??? plt.show()

??? print("累積降雨量圖繪制完畢!")

基于Spark的氣象數(shù)據(jù)分析

4.6.繪制平均氣溫柱狀圖

draw_temperature函數(shù)用于繪制氣溫圖。函數(shù)首先創(chuàng)建兩個(gè)空列表name_list和num_list,并將temperature_list中每個(gè)元素的省份、城市名和平均氣溫分別添加到name_list和num_list中。接下來,創(chuàng)建一個(gè)索引列表index,其中每個(gè)元素都是當(dāng)前索引值加上0.25。然后通過用plt.figure創(chuàng)建指定大小的圖形,并使用plt.bar創(chuàng)建一個(gè)條形圖,其中每個(gè)條形的高度由num_list中的相應(yīng)元素確定。函數(shù)使用plt.xticks設(shè)置x軸刻度標(biāo)簽,其中每個(gè)標(biāo)簽都是name_list中的相應(yīng)元素。函數(shù)使用plt.ylim設(shè)置y軸刻度范圍,其中最大值是num_list中的最大值乘以3.3并向上取整,最小值為0。plt.yticks設(shè)置y軸刻度標(biāo)簽的字體大小和顏色,plt.xlabel和plt.ylabel設(shè)置x軸和y軸標(biāo)簽的字體大小和顏色,plt.title設(shè)置圖形標(biāo)題的字體大小和顏色。最后,使用plt.text在每個(gè)條形上添加高度標(biāo)簽,并使用plt.show顯示圖形。

代碼 10 繪制各城市過去24小時(shí)平均氣溫圖

def draw_temperature(temperature_list):

??? print("開始繪制氣溫圖")

??? name_list = []

??? num_list = []

??? date = temperature_list[1].date

??? for item in temperature_list:

??????? name_list.append(item.province[0:2] + '\n' + item.city_name[0:2] + '\n' + item.city_name[2:])

??????? num_list.append(float(item.avg_temperature))

??? index = [i + 0.25 for i in range(0, len(num_list))]

??? plt.figure(figsize=(20, 12))? # 設(shè)置圖的大小

??? rects = plt.bar(index, num_list, color='ckrmgby', width=0.5)

??? plt.xticks([i + 0.25 for i in index], name_list, fontsize=20, color='r')? # fontsize設(shè)置x刻度字體大小

??? plt.ylim(ymax=math.ceil(float(max(num_list))) * 3.3, ymin=0)? # 設(shè)置刻度間隔

??? plt.yticks(fontsize=20, color='r')? # fontsize設(shè)置y刻度字體大小

??? plt.xlabel("城市", fontsize=25, color='darkblue')? # fontsize設(shè)置坐標(biāo)標(biāo)簽字體大小

??? plt.ylabel("日平均氣溫", fontsize=25, color='darkblue')? # fontsize設(shè)置坐標(biāo)標(biāo)簽字體大小

??? plt.title("2023-5-11全國日平均氣溫最低前20", fontsize=30, color='b')? # fontsize設(shè)置標(biāo)題字體大小

??? for rect in rects:

??????? height = rect.get_height()

??????? plt.text(rect.get_x() + rect.get_width() / 2, height, str(height), ha="center", va="bottom", fontsize=18)

??? plt.show()

??? print("氣溫圖繪制完畢!")

?基于Spark的氣象數(shù)據(jù)分析

基于Spark的氣象數(shù)據(jù)分析

4.7.繪制平均氣壓折線圖

draw_pressure函數(shù)用來繪制全國日平均氣壓最低前20名的城市的氣壓圖。首先循環(huán)遍歷數(shù)組中的每一行,將每個(gè)城市的名稱和對應(yīng)的日平均氣壓值分別添加到兩個(gè)列表中。接下來使用matplotlib庫中的plot函數(shù)繪制了氣壓圖,并設(shè)置了圖例、坐標(biāo)軸標(biāo)簽、刻度字體大小等屬性。最后,循環(huán)遍歷列表中的每個(gè)城市,將其對應(yīng)的氣壓值添加到圖中,并設(shè)置了數(shù)字標(biāo)簽。

代碼 11 繪制各城市過去24小時(shí)平均氣壓圖

def draw_pressure(file_address):

??? print("開始繪制氣壓圖")

??? name_list = []

??? num_list = []

??? df = pd.read_csv(file_address, header=None)

??? data = df.to_numpy()

??? print(data)

??? for i in data:

??????? name_list.append(i[0][0:2] + '\n' + i[1][0:2] + '\n' + i[1][2:])

??????? num_list.append(float(i[4]))

  

??? plt.figure(figsize=(15, 12))

??? plt.plot(name_list, num_list, label='日平均氣壓', alpha=0.8, mfc='r', ms=30, mec='b', marker='.', linewidth=5,

???????????? linestyle="--")

??? plt.xticks(fontsize=20, color='r')? # fontsize設(shè)置x刻度字體大小

??? plt.yticks(fontsize=20, color='r')? # fontsize設(shè)置y刻度字體大小

??? plt.xlabel("城市", fontsize=25, color='b')? # fontsize設(shè)置坐標(biāo)標(biāo)簽字體大小

??? plt.ylabel("日平均氣壓", fontsize=25, color='b')? # fontsize設(shè)置坐標(biāo)標(biāo)簽字體大小

??? # 設(shè)置數(shù)字標(biāo)簽

??? for a, b in zip(name_list, num_list):

??????? plt.text(a, b + 0.5, int(b), ha='center', va='bottom', fontsize=20)

??? plt.title("2023511日全國日平均氣壓最低前20", fontsize=30, color='b')? # fontsize設(shè)置標(biāo)題字體大小

??? plt.rcParams.update({'font.size': 20})

??? plt.legend(bbox_to_anchor=(1, 0.1))? # 圖例移到右下角

??? plt.show()

??? print("氣壓圖繪制完畢!")

?基于Spark的氣象數(shù)據(jù)分析

4.8.繪制全國風(fēng)向統(tǒng)計(jì)餅狀圖

draw_windDirection用于繪制全國風(fēng)向統(tǒng)計(jì)餅狀圖。首先定義了一個(gè)字典,用于存儲各個(gè)方向的風(fēng)的數(shù)量。接著提取出風(fēng)向數(shù)據(jù),并將其轉(zhuǎn)換為浮點(diǎn)數(shù)類型。然后計(jì)算了數(shù)據(jù)的總數(shù),并使用一個(gè)for循環(huán)遍歷數(shù)據(jù)列表,將每個(gè)數(shù)據(jù)點(diǎn)分配到相應(yīng)的風(fēng)向類別中。接下來,將每個(gè)風(fēng)向類別的數(shù)量存儲在一個(gè)列表中,并定義了每個(gè)類別的標(biāo)簽和顏色。然后計(jì)算了每個(gè)類別的百分比,并定義了一個(gè)突出模塊的偏移值。最后使用matplotlib庫中的pie函數(shù)繪制了一個(gè)餅圖,并設(shè)置標(biāo)題和字體大小。

代碼 12 繪制全國風(fēng)向統(tǒng)計(jì)餅狀圖

def draw_windDirection(file_address):

??? dict = {'bei': 0, 'dongbei': 0, 'dong': 0, 'dongnan': 0, 'nan': 0, 'xinan': 0, 'xi': 0, 'xibei': 0}

??? df = pd.read_csv(file_address)

??? data = df.to_numpy()

??? list = data[:, -1]

??? list = [float(x) for x in list]

??? Num = df.shape[0]? # 4546

??? for i in list:

??????? if 0 <= i <= 22.5 or 337.5 <= i <= 360:

??????????? dict['bei'] += 1

??????? if 22.5 < i <= 67.5:

??????????? dict['dongbei'] += 1

??????? if 67.5 < i <= 112.5:

??????????? dict['dong'] += 1

??????? if 112.5 < i <= 157.5:

??????????? dict['dongnan'] += 1

??????? if 157.5 < i <= 202.5:

??????????? dict['nan'] += 1

??????? if 202.5 < i <= 247.5:

??????????? dict['xinan'] += 1

??????? if 247.5 < i <= 292.5:

??????????? dict['xi'] += 1

??????? if 292.5 < i <= 337.5:

??????????? dict['xibei'] += 1

??? # {'bei': 286, 'dongbei': 283, 'dong': 405, 'dongnan': 644, 'nan': 1017, 'xinan': 931, 'xi': 650, 'xibei': 330}

??? data = [dict['bei'], dict['dongbei'], dict['dong'], dict['dongnan'], dict['nan'], dict['xinan'], dict['xi'],

??????????? dict['xibei']]

??? # 數(shù)據(jù)標(biāo)簽

??? labels = ['北風(fēng)', '東北風(fēng)', '東風(fēng)', '東南風(fēng)', '南風(fēng)', '西南風(fēng)', '西風(fēng)', '西北風(fēng)']

??? # 各區(qū)域顏色

??? colors = ['lightcoral', 'orange', 'yellow', 'yellowgreen', 'plum', 'cyan', 'hotpink', 'silver']

??? # 數(shù)據(jù)計(jì)算處理

??? sizes = [data[0] / Num, data[1] / Num, data[2] / Num, data[3] / Num, data[4] / Num,

???????????? data[5] / Num, data[6] / Num, data[7] / Num]

??? # 設(shè)置突出模塊偏移值

??? expodes = (0.1, 0, 0.1, 0, 0.1, 0, 0.1, 0)

??? plt.figure(figsize=(15, 15))

??? # 設(shè)置繪圖屬性并繪圖

??? plt.pie(sizes, explode=expodes, labels=labels, shadow=True, colors=colors, autopct='%.1f%%',

??????????? textprops={'fontsize': 22})

??? plt.axis('equal')

??? plt.title("2023511日全國風(fēng)向統(tǒng)計(jì)圖", fontsize=30, color='b')? # fontsize設(shè)置標(biāo)題字體大小

??? plt.show()

?基于Spark的氣象數(shù)據(jù)分析

4.9.繪制全國各省平均氣溫地圖

draw_province_temperature函數(shù)用來繪制全國各省平均氣溫地圖。首先定義了一個(gè)包含各省名稱和平均氣溫的列表,然后使用pyecharts庫中的Map函數(shù)創(chuàng)建地圖,設(shè)置了地圖的初始高度和寬度,并添加了一個(gè)名為“氣溫”的系列,數(shù)據(jù)對為前面定義的列表,地圖類型為china,啟用了鼠標(biāo)滾輪縮放和拖動平移,不顯示圖形標(biāo)記。接下來設(shè)置了全局選項(xiàng),包括標(biāo)題、數(shù)據(jù)標(biāo)準(zhǔn)顯示和可視化映射選項(xiàng),其中標(biāo)題包括主標(biāo)題、副標(biāo)題和位置,數(shù)據(jù)標(biāo)準(zhǔn)顯示設(shè)置了最大值和最小值,可視化映射選項(xiàng)可以設(shè)置顏色范圍。最后設(shè)置了系列選項(xiàng),包括標(biāo)簽名稱顯示和顏色。最后使用render函數(shù)將地圖渲染為html文件?。

代碼 13 繪制全國各省平均氣溫地圖

Map(init_opts=opts.InitOpts(height="1000px", width="1500px")).add(

??? series_name="氣溫",

??? data_pair=tempreture,

??? maptype="china",

??? is_roam=True,

??? is_map_symbol_show=False,

).set_global_opts(

??? title_opts=opts.TitleOpts(title="2023-5-11全國各省平均氣溫",

????????????????????????????? subtitle="數(shù)據(jù)來源:中央氣象臺網(wǎng)站",

??????????? ??????????????????pos_right="center",

????????????????????????????? pos_top="5%"),

??? visualmap_opts=opts.VisualMapOpts(max_=30,

????????????????????????????????????? min_=0),

  ).set_series_opts(

??? label_opts=opts.LabelOpts(is_show=True, color="blue")

).render("2023-5-11全國各省平均氣溫.html")

?基于Spark的氣象數(shù)據(jù)分析

4.10.繪制武漢市主城區(qū)一天風(fēng)級雷達(dá)圖

draw_windSpeed函數(shù)用于繪制武漢市主城區(qū)一天內(nèi)不同風(fēng)向下的風(fēng)速平均值的極區(qū)圖。degs = np.arange(45, 361, 45)定義了一個(gè)包含45度到360度之間每隔45度的角度的numpy數(shù)組,定義空列表temp用于存儲不同風(fēng)向下的風(fēng)速平均值。之后對于每個(gè)角度deg,循環(huán)遍歷24小時(shí)內(nèi)的風(fēng)向數(shù)據(jù)windd,如果windd[i]等于deg,則將對應(yīng)的風(fēng)速數(shù)據(jù)wind_speed[i]添加到speed列表中。如果speed列表為空,則將0添加到temp列表中,否則將speed列表中的所有元素求和并除以元素個(gè)數(shù),將結(jié)果添加到temp列表中。theta = np.arange(0. + np.pi / 8, 2 * np.pi + np.pi / 8, 2 * np.pi / 8) 定義了一個(gè)整數(shù)N和一個(gè)包含N個(gè)元素的numpy數(shù)組theta,用于繪制極區(qū)圖。radii = np.array(temp) 將temp列表轉(zhuǎn)換為numpy數(shù)組radii,用于繪制極區(qū)圖。plt.axes(polar=True)定義了一個(gè)極坐標(biāo)系,colors = plt.cm.viridis(np.random.rand(N)) 定義了一個(gè)包含N個(gè)元素的numpy數(shù)組colors,用于設(shè)置每個(gè)扇區(qū)的顏色。plt.bar(theta, radii, width=(2 * np.pi / N), bottom=0.0, color=colors) 繪制了極區(qū)圖,其中theta和radii分別表示角度和半徑,width表示每個(gè)扇區(qū)的寬度,bottom表示每個(gè)扇區(qū)的起始位置,color表示每個(gè)扇區(qū)的顏色。plt.title設(shè)置了極區(qū)圖的標(biāo)題,其中x表示標(biāo)題的水平位置,fontsize表示標(biāo)題的字體大小,loc表示標(biāo)題的位置?。

代碼 14 繪制武漢市主城區(qū)一天風(fēng)級雷達(dá)圖

def draw_windSpeed(file):

??? data = pd.read_csv(file)

??? windd = []

??? wind = list(data['windDirection'])

??? wind_speed = list(data['windSpeed'])

??? for i in wind:

??????? if i > 360:

??????????? windd.append(0)

??????? if 0 <= i <= 22.5 or 337.5 <= i <= 360:

??????????? windd.append(90)

??????? if 22.5 < i <= 67.5:

??????????? windd.append(45)

??????? if 67.5 < i <= 112.5:

??????????? windd.append(360)

??????? if 112.5 < i <= 157.5:

??????????? windd.append(315)

??????? if 157.5 < i <= 202.5:

??????????? windd.append(270)

??????? if 202.5 < i <= 247.5:

??????????? windd.append(225)

??????? if 247.5 < i <= 292.5:

??????????? windd.append(180)

??????? if 292.5 < i <= 337.5:

??????????? windd.append(135)

??? degs = np.arange(45, 361, 45)

??? temp = []

??? for deg in degs:

??????? speed = []

??????? for i in range(0, 24):

??????????? if windd[i] == deg:

??????????????? speed.append(wind_speed[i])

??????? if len(speed) == 0:

??????????? temp.append(0)

??????? else:

??????????? temp.append(sum(speed) / len(speed))

??? N = 8

??? theta = np.arange(0. + np.pi / 8, 2 * np.pi + np.pi / 8, 2 * np.pi / 8)

??? radii = np.array(temp)

??? plt.axes(polar=True)

??? colors = plt.cm.viridis(np.random.rand(N))

??? plt.bar(theta, radii, width=(2 * np.pi / N), bottom=0.0, color=colors)

??? plt.title('2023-5-12武漢市主城區(qū)一天風(fēng)級圖', x=0.2, fontsize=15, loc='left')

??? plt.show()

基于Spark的氣象數(shù)據(jù)分析

源代碼

spider.py

import urllib.request, urllib.error
import json
import csv
import os
import importlib, sys
importlib.reload(sys)

class Crawler:
    def get_html(self, url):
        headers = {
            'User-Agent': "Mozilla/5.0 (Windows; U; Windows NT 5.2) Gecko/2008070208 Firefox/3.0.1"
        }
        request = urllib.request.Request(url,headers=headers)
        response = urllib.request.urlopen(request)
        return response.read().decode()

    def parse_json(self, url):
        obj = self.get_html(url)
        if obj:
            json_obj = json.loads(obj)
        else:
            json_obj = list()
        return json_obj

    def write_csv(self, file, data):
        if data:
            print("開始寫入 " + file)
            with open(file, 'a+', encoding='utf-8-sig') as f:  # utf-8-sig  帶BOM的utf-8
                f_csv = csv.DictWriter(f, data[0].keys())
                # if not os.path.exists(file):
                f_csv.writeheader()
                f_csv.writerows(data)
            print("結(jié)束寫入 " + file)

    def write_header(self, file, data):
        if data:
            print("開始寫入 " + file)
            with open(file, 'a+', encoding='utf-8-sig') as f:
                f_csv = csv.DictWriter(f, data[0].keys())
                f_csv.writeheader()
                f_csv.writerows(data)
            print("結(jié)束寫入 " + file)

    def write_row(self, file, data):
        if data:
            print("開始寫入 " + file)
            with open(file, 'a+', encoding='utf-8-sig') as f:
                f_csv = csv.DictWriter(f, data[0].keys())
                if not os.path.exists(file):
                    f_csv.writeheader()
                f_csv.writerows(data)
            print("結(jié)束寫入 " + file)

    def read_csv(self, file):
        print("開始讀取 " + file)
        with open(file, 'r+', encoding='utf-8-sig') as f:
            data = csv.DictReader(f)
            print("結(jié)束讀取 " + file)
            return list(data)

    def get_provinces(self):
        province_file = 'C:/Users/26909/Desktop/province.csv'
        if not os.path.exists(province_file):
            print("開始爬取省份")
            provinces = self.parse_json('http://www.nmc.cn/f/rest/province')
            print("省份爬取完畢!")
            self.write_csv(province_file, provinces)
        else:
            provinces = self.read_csv(province_file)
        return provinces

    def get_cities(self):
        city_file = 'input/city.csv'
        if not os.path.exists(city_file):
            cities = list()
            print("開始爬取城市")
            for province in self.get_provinces():
                url = province['url'].split('/')[-1].split('.')[0]
                cities.extend(self.parse_json('http://www.nmc.cn/f/rest/province/' + url))
            self.write_csv(city_file, cities)
            print("爬取城市完畢!")
        else:
            cities = self.read_csv(city_file)
        return cities

    def get_passed_weather(self, province):
        weather_passed_file = 'C:/Users/26909/Desktop/passed_weather_' + province + '.csv'
        if os.path.exists(weather_passed_file):
            return
        passed_weather = list()
        count = 0
        if province == 'ALL':
            print("開始爬取過去的天氣狀況")
            for city in self.get_cities():
                data = self.parse_json('http://www.nmc.cn/f/rest/passed/' + city['code'])
                if data:
                    count = count + 1
                    for item in data:
                        item['city_code'] = city['code']
                        item['province'] = city['province']
                        item['city_name'] = city['city']
                        item['city_index'] = str(count)
                    passed_weather.extend(data)
                if count % 50 == 0:
                    if count == 50:
                        self.write_header(weather_passed_file, passed_weather)
                    else:
                        self.write_row(weather_passed_file, passed_weather)
                    passed_weather = list()
            if passed_weather:
                if count <= 50:
                    self.write_header(weather_passed_file, passed_weather)
                else:
                    self.write_row(weather_passed_file, passed_weather)
            print("爬取過去的天氣狀況完畢!")
        else:
            print("開始爬取過去的天氣狀況")
            select_city = filter(lambda x: x['province'] == province, self.get_cities())
            for city in select_city:
                data = self.parse_json('http://www.nmc.cn/f/rest/passed/' + city['code'])
                if data:
                    count = count + 1
                    for item in data:
                        item['city_index'] = str(count)
                        item['city_code'] = city['code']
                        item['province'] = city['province']
                        item['city_name'] = city['city']
                    passed_weather.extend(data)
            self.write_csv(weather_passed_file, passed_weather)
            print("爬取過去的天氣狀況完畢!")

    def run(self, range='ALL'):
        self.get_passed_weather(range)

cr = Crawler()
cr.run('ALL')

?analysis.py文章來源地址http://www.zghlxwxcb.cn/news/detail-488567.html

import numpy as np
import pandas as pd
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, TimestampType
import matplotlib.pyplot as plt
import math
from pyecharts import options as opts
from pyecharts.charts import Map

# 解決中文顯示問題
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False


# 計(jì)算各個(gè)城市過去24小時(shí)累積雨量
def passed_rain_analyse(filename):
    print("開始分析累積降雨量")
    spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_rain = df.select(df['province'], df['city_name'], df['city_code'],
                        df['rain1h'].cast(DecimalType(scale=1))).filter(df['rain1h'] < 1000)
    # 篩選數(shù)據(jù),去除無效數(shù)據(jù),一小時(shí)降水大于1000視為無效
    df_rain_sum = df_rain.groupBy("province", "city_name", "city_code").agg(F.sum("rain1h").alias("rain24h")).sort(
        F.desc("rain24h"))  # 分組、求和、排序
    df_rain_sum.coalesce(1).write.csv("passed_rain_analyse.csv")
    print("累積降雨量分析完畢!")
    return df_rain_sum.head(20)  # 前20個(gè)


# 武漢過去24小時(shí)的風(fēng)速和風(fēng)向
def passed_windSpeed_analyse(filename):
    print("開始分析風(fēng)速")
    df = pd.read_csv(filename)
    df_windSpeed = df[df['city_name'] == '武漢']
    df_windSpeed.to_csv("wuhan_wind.csv")
    return df_windSpeed  # 前20個(gè)


# 計(jì)算各個(gè)城市過去24小時(shí)平均氣溫
def passed_temperature_analyse(filename):
    print("開始分析氣溫")
    spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_temperature = df.select(  # 選擇需要的列
        df['province'],
        df['city_name'],
        df['city_code'],
        df['temperature'].cast(DecimalType(scale=1)),  # 轉(zhuǎn)換為十進(jìn)制類型,并將小數(shù)點(diǎn)后的位數(shù)保留1位
        F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # 得到日期數(shù)據(jù)
        F.hour(df['time']).alias("hour")  # 得到小時(shí)數(shù)據(jù),命名為hour
    ).filter(df['temperature'] < 1000)
    # 篩選四點(diǎn)時(shí)次
    df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([2, 8, 14, 20]))
    df_avg_temperature = df_4point_temperature.groupBy("province", "city_name", "city_code", "date").agg(
        F.count("temperature"), F.avg("temperature").alias("avg_temperature")).sort(
        F.asc("avg_temperature")).select("province", "city_name", "city_code", "date",
                                         F.format_number('avg_temperature', 1).alias("avg_temperature"))
    df_avg_temperature.show()
    avg_temperature_list = df_avg_temperature.collect()
    df_avg_temperature.coalesce(1).write.csv("passed_temperature.csv")
    print("氣溫分析完畢")
    return avg_temperature_list[0:20]  # 前20個(gè)


# 計(jì)算各省過去24小時(shí)平均氣溫
def province_temperature_analyse(filename):
    print("開始分析各省氣溫")
    spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_temperature = df.select(  # 選擇需要的列
        df['province'],
        df['temperature'].cast(DecimalType(scale=1)),
        F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # 得到日期數(shù)據(jù)
        F.hour(df['time']).alias("hour")  # 得到小時(shí)數(shù)據(jù)
    ).filter(df['temperature'] < 1000)
    # 篩選四點(diǎn)時(shí)次
    df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([2, 8, 14, 20]))
    df_avg_temperature = df_4point_temperature.groupBy("province").agg(
        F.count("temperature"), F.avg("temperature").alias("avg_temperature")).sort(
        F.asc("avg_temperature")).select("province", F.format_number('avg_temperature', 1).alias("avg_temperature"))
    df_avg_temperature.show()
    avg_temperature_list = df_avg_temperature.collect()
    df_avg_temperature.coalesce(1).write.csv("province_temperature.csv")
    print("氣溫分析完畢")
    return avg_temperature_list[0:20]  # 最低的20個(gè)


# 計(jì)算各個(gè)城市過去24小時(shí)平均氣壓
def passed_pressure_analyse(filename):
    print("開始分析氣壓")
    spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_pressure = df.select(  # 選擇需要的列
        df['province'],
        df['city_name'],
        df['city_code'],
        df['pressure'].cast(DecimalType(scale=1)),
        F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # 得到日期數(shù)據(jù)
        F.hour(df['time']).alias("hour")  # 得到小時(shí)數(shù)據(jù)
    )
    # 篩選四點(diǎn)時(shí)次
    df_4point_pressure = df_pressure.filter(df_pressure['hour'].isin([2, 8, 14, 20]))
    df_avg_pressure = df_4point_pressure.groupBy("province", "city_name", "city_code", "date").agg(
        F.count("pressure"), F.avg("pressure").alias("avg_pressure")).sort(
        F.asc("avg_pressure")).select("province", "city_name", "city_code", "date",
                                      F.format_number('avg_pressure', 1).alias("avg_pressure"))
    avg_pressure_list = df_avg_pressure.collect()
    df_avg_pressure.coalesce(1).write.csv("passed_pressure.csv")
    print("氣壓分析完畢")
    return avg_pressure_list[0:20]  # 最低的20個(gè)


# 計(jì)算各個(gè)城市過去24小時(shí)平均風(fēng)向
def passed_windDirection_analyse(filename):
    print("開始分析風(fēng)向")
    spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_windDirection = df.select(  # 選擇需要的列
        df['province'],
        df['city_name'],
        df['city_code'],
        df['windDirection'].cast(DecimalType(scale=1)),
        F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # 得到日期數(shù)據(jù)
        F.hour(df['time']).alias("hour")  # 得到小時(shí)數(shù)據(jù)
    ).filter(df['windDirection'] < 400)
    # 篩選四點(diǎn)時(shí)次
    df_4point_windDirection = df_windDirection.filter(df_windDirection['hour'].isin([2, 8, 14, 20]))
    df_avg_windDirection = df_4point_windDirection.groupBy("province", "city_name", "city_code", "date").agg(
        F.count("windDirection"), F.avg("windDirection").alias("avg_windDirection")).sort(
        F.asc("avg_windDirection")).select("province", "city_name", "city_code", "date",
                                           F.format_number('avg_windDirection', 1).alias("avg_windDirection"))
    df_avg_windDirection.coalesce(1).write.csv("passed_windDirection.csv")
    print("風(fēng)向分析完畢")


# 繪制累積降雨量圖
def draw_rain(rain_list):
    print("開始繪制累積降雨量圖")
    name_list = []
    num_list = []
    for item in rain_list:
        name_list.append(item.province[0:2] + '\n' + item.city_name)
        num_list.append(item.rain24h)
    index = [i + 0.25 for i in range(0, len(num_list))]
    plt.figure(figsize=(15, 15))  # 設(shè)置圖的大小
    rects = plt.bar(index, num_list, color='ckrmgby', width=0.5)
    plt.xticks([i + 0.25 for i in index], name_list, fontsize=15, color='r')  # fontsize設(shè)置x刻度字體大小
    plt.ylim(ymax=(int(max(num_list) + 100) / 100) * 60, ymin=0)  # 設(shè)置刻度間隔
    plt.yticks(fontsize=20, color='r')  # fontsize設(shè)置y刻度字體大小
    plt.xlabel("城市", fontsize=25, color='darkblue')  # fontsize設(shè)置x坐標(biāo)標(biāo)簽字體大小
    plt.ylabel("雨量", fontsize=25, color='darkblue')  # fontsize設(shè)置y坐標(biāo)標(biāo)簽字體大小
    plt.title("2023年5月11日24小時(shí)累計(jì)降雨量全國前20名", fontsize=30, color='b')  # fontsize設(shè)置標(biāo)題字體大小
    for rect in rects:
        height = rect.get_height()
        # fontsize設(shè)置直方圖上字體大小
        plt.text(rect.get_x() + rect.get_width() / 2, height, str(height), ha="center", va="bottom", fontsize=18)
    plt.show()
    print("累積降雨量圖繪制完畢!")


def draw_temperature(temperature_list):
    print("開始繪制氣溫圖")
    name_list = []
    num_list = []
    date = temperature_list[1].date
    for item in temperature_list:
        name_list.append(item.province[0:2] + '\n' + item.city_name[0:2] + '\n' + item.city_name[2:])
        num_list.append(float(item.avg_temperature))
    index = [i + 0.25 for i in range(0, len(num_list))]
    plt.figure(figsize=(20, 12))  # 設(shè)置圖的大小
    rects = plt.bar(index, num_list, color='ckrmgby', width=0.5)
    plt.xticks([i + 0.25 for i in index], name_list, fontsize=20, color='r')  # fontsize設(shè)置x刻度字體大小
    plt.ylim(ymax=math.ceil(float(max(num_list))) * 3.3, ymin=0)  # 設(shè)置刻度間隔
    plt.yticks(fontsize=20, color='r')  # fontsize設(shè)置y刻度字體大小
    plt.xlabel("城市", fontsize=25, color='darkblue')  # fontsize設(shè)置坐標(biāo)標(biāo)簽字體大小
    plt.ylabel("日平均氣溫", fontsize=25, color='darkblue')  # fontsize設(shè)置坐標(biāo)標(biāo)簽字體大小
    plt.title("2023-5-11全國日平均氣溫最低前20名", fontsize=30, color='b')  # fontsize設(shè)置標(biāo)題字體大小
    for rect in rects:
        height = rect.get_height()
        # fontsize設(shè)置直方圖上字體大小
        plt.text(rect.get_x() + rect.get_width() / 2, height, str(height), ha="center", va="bottom", fontsize=18)
    plt.show()
    print("氣溫圖繪制完畢!")


def draw_pressure(file_address):
    print("開始繪制氣壓圖")
    name_list = []
    num_list = []
    df = pd.read_csv(file_address, header=None)
    data = df.to_numpy()
    print(data)
    for i in data:
        name_list.append(i[0][0:2] + '\n' + i[1][0:2] + '\n' + i[1][2:])
        num_list.append(float(i[4]))

    plt.figure(figsize=(15, 12))
    plt.plot(name_list, num_list, label='日平均氣壓', alpha=0.8, mfc='r', ms=30, mec='b', marker='.', linewidth=5,
             linestyle="--")
    plt.xticks(fontsize=20, color='r')  # fontsize設(shè)置x刻度字體大小
    plt.yticks(fontsize=20, color='r')  # fontsize設(shè)置y刻度字體大小
    plt.xlabel("城市", fontsize=25, color='b')  # fontsize設(shè)置坐標(biāo)標(biāo)簽字體大小
    plt.ylabel("日平均氣壓", fontsize=25, color='b')  # fontsize設(shè)置坐標(biāo)標(biāo)簽字體大小
    # 設(shè)置數(shù)字標(biāo)簽
    for a, b in zip(name_list, num_list):
        plt.text(a, b + 0.5, int(b), ha='center', va='bottom', fontsize=20)
    plt.title("2023年5月11日全國日平均氣壓最低前20名", fontsize=30, color='b')  # fontsize設(shè)置標(biāo)題字體大小
    plt.rcParams.update({'font.size': 20})
    plt.legend(bbox_to_anchor=(1, 0.1))  # 圖例移到右下角
    plt.show()
    print("氣壓圖繪制完畢!")


def draw_windDirection(file_address):
    dict = {'bei': 0, 'dongbei': 0, 'dong': 0, 'dongnan': 0, 'nan': 0, 'xinan': 0, 'xi': 0, 'xibei': 0}
    df = pd.read_csv(file_address)
    data = df.to_numpy()
    list = data[:, -1]
    list = [float(x) for x in list]
    Num = df.shape[0]  # 4546
    for i in list:
        if 0 <= i <= 22.5 or 337.5 <= i <= 360:
            dict['bei'] += 1
        if 22.5 < i <= 67.5:
            dict['dongbei'] += 1
        if 67.5 < i <= 112.5:
            dict['dong'] += 1
        if 112.5 < i <= 157.5:
            dict['dongnan'] += 1
        if 157.5 < i <= 202.5:
            dict['nan'] += 1
        if 202.5 < i <= 247.5:
            dict['xinan'] += 1
        if 247.5 < i <= 292.5:
            dict['xi'] += 1
        if 292.5 < i <= 337.5:
            dict['xibei'] += 1
    # {'bei': 286, 'dongbei': 283, 'dong': 405, 'dongnan': 644, 'nan': 1017, 'xinan': 931, 'xi': 650, 'xibei': 330}
    data = [dict['bei'], dict['dongbei'], dict['dong'], dict['dongnan'], dict['nan'], dict['xinan'], dict['xi'],
            dict['xibei']]
    # 數(shù)據(jù)標(biāo)簽
    labels = ['北風(fēng)', '東北風(fēng)', '東風(fēng)', '東南風(fēng)', '南風(fēng)', '西南風(fēng)', '西風(fēng)', '西北風(fēng)']
    # 各區(qū)域顏色
    colors = ['lightcoral', 'orange', 'yellow', 'yellowgreen', 'plum', 'cyan', 'hotpink', 'silver']
    # 數(shù)據(jù)計(jì)算處理
    sizes = [data[0] / Num, data[1] / Num, data[2] / Num, data[3] / Num, data[4] / Num,
             data[5] / Num, data[6] / Num, data[7] / Num]
    # 設(shè)置突出模塊偏移值
    expodes = (0.1, 0, 0.1, 0, 0.1, 0, 0.1, 0)
    plt.figure(figsize=(15, 15))
    # 設(shè)置繪圖屬性并繪圖
    plt.pie(sizes, explode=expodes, labels=labels, shadow=True, colors=colors, autopct='%.1f%%',
            textprops={'fontsize': 22})
    plt.axis('equal')
    plt.title("2023年5月11日全國風(fēng)向統(tǒng)計(jì)圖", fontsize=30, color='b')  # fontsize設(shè)置標(biāo)題字體大小
    plt.show()


def draw_province_temperature():
    tempreture = [
        ['甘肅省', 13.5],
        ['吉林省', 15.7],
        ['貴州省', 16.6],
        ['山西省', 16.9],
        ['陜西省', 17.7],
        ['四川省', 17.9],
        ['浙江省', 18.2],
        ['湖南省', 18.3],
        ['安徽省', 18.9],
        ['江西省', 18.9],
        ['上海市', 19.0],
        ['遼寧省', 19.0],
        ['北京市', 19.3],
        ['云南省', 19.4],
        ['湖北省', 19.6],
        ['江蘇省', 19.6],
        ['河北省', 19.9],
        ['河南省', 20.0],
        ['福建省', 20.1],
        ['重慶市', 20.9],
        ['山東省', 21.1],
        ['廣東省', 21.1],
        ['天津市', 22.1],
        ['臺灣省', 25.0],
        ['海南省', 26.1],
        ['廣西壯族自治區(qū)', 20.3],
        ['新疆維吾爾自治區(qū)', 20.4],
        ['澳門特別行政區(qū)', 22.4],
        ['香港特別行政區(qū)', 24.4],
        ['西藏自治區(qū)', 3.5],
        ['青海省', 3.6],
        ['黑龍江省', 16.5],
        ['內(nèi)蒙古自治區(qū)', 15.0],
        ['寧夏回族自治區(qū)', 16.2]
    ]

    Map(init_opts=opts.InitOpts(height="1000px", width="1500px")).add(
        series_name="氣溫",
        data_pair=tempreture,
        maptype="china",
        # 是否啟用鼠標(biāo)滾輪縮放和拖動平移,默認(rèn)為True
        is_roam=True,
        # 是否顯示圖形標(biāo)記,默認(rèn)為True
        is_map_symbol_show=False,
    ).set_global_opts(
        # 設(shè)置標(biāo)題
        title_opts=opts.TitleOpts(title="2023-5-11全國各省平均氣溫",
                                  subtitle="數(shù)據(jù)來源:中央氣象臺網(wǎng)站",
                                  pos_right="center",
                                  pos_top="5%"),
        # 設(shè)置標(biāo)準(zhǔn)顯示
        visualmap_opts=opts.VisualMapOpts(max_=30,
                                          min_=0),
        # range_color=["#E0ECF8", "#045FB4"]),
    ).set_series_opts(
        # 標(biāo)簽名稱顯示,默認(rèn)為True
        label_opts=opts.LabelOpts(is_show=True, color="blue")
    ).render("2023-5-11全國各省平均氣溫.html")


def draw_windSpeed(file):
    data = pd.read_csv(file)
    windd = []
    wind = list(data['windDirection'])
    wind_speed = list(data['windSpeed'])
    for i in wind:
        if i > 360:
            windd.append(0)
        if 0 <= i <= 22.5 or 337.5 <= i <= 360:
            windd.append(90)
        if 22.5 < i <= 67.5:
            windd.append(45)
        if 67.5 < i <= 112.5:
            windd.append(360)
        if 112.5 < i <= 157.5:
            windd.append(315)
        if 157.5 < i <= 202.5:
            windd.append(270)
        if 202.5 < i <= 247.5:
            windd.append(225)
        if 247.5 < i <= 292.5:
            windd.append(180)
        if 292.5 < i <= 337.5:
            windd.append(135)
    degs = np.arange(45, 361, 45)
    temp = []
    for deg in degs:
        speed = []
        # 獲取 wind_deg 在指定范圍的風(fēng)速平均值數(shù)據(jù)
        for i in range(0, 24):
            if windd[i] == deg:
                speed.append(wind_speed[i])
        if len(speed) == 0:
            temp.append(0)
        else:
            temp.append(sum(speed) / len(speed))
    N = 8
    theta = np.arange(0. + np.pi / 8, 2 * np.pi + np.pi / 8, 2 * np.pi / 8)
    # 數(shù)據(jù)極徑
    radii = np.array(temp)
    # 繪制極區(qū)圖坐標(biāo)系
    plt.axes(polar=True)
    # 定義每個(gè)扇區(qū)的RGB值(R,G,B),x越大,對應(yīng)的顏色越接近藍(lán)色
    colors = plt.cm.viridis(np.random.rand(N))
    plt.bar(theta, radii, width=(2 * np.pi / N), bottom=0.0, color=colors)
    plt.title('2023-5-12武漢市主城區(qū)一天風(fēng)級圖', x=0.2, fontsize=15, loc='left')
    plt.show()
    return temp


sourcefile = "passed_weather_ALL.csv"

# 降雨
# rain_list = passed_rain_analyse(sourcefile)
# draw_rain(rain_list)

# 氣溫
# temperature_list = passed_temperature_analyse(sourcefile)
# draw_temperature(temperature_list)

# 氣壓
# pressure_list = passed_pressure_analyse(sourcefile)
# draw_pressure('low_pressure.csv')

# 風(fēng)向
# windDirection_list = passed_windDirection_analyse(sourcefile)
# draw_windDirection('windDirection.csv')

# 各省氣溫
# province_temperature_analyse(sourcefile)
# draw_province_temperature()

# 風(fēng)向雷達(dá)圖
#passed_windSpeed_analyse(sourcefile)
draw_windSpeed('wuhan_wind.csv')

到了這里,關(guān)于基于Spark的氣象數(shù)據(jù)分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 基于hadoop的氣象數(shù)據(jù)可視化分析

    基于hadoop的氣象數(shù)據(jù)可視化分析

    目 錄 摘 要 I Abstract III 1緒論 1 1.1選題背景及意義 1 1.2研究現(xiàn)狀及趨勢 1 1.3研究主要內(nèi)容 2 2相關(guān)技術(shù)簡介 3 2.1開發(fā)工具 3 2.1.1 JDK1.7 3 2.1.2 eclipse luna 3 2.1.3 Hadoop 2.7.2 3 2.1.4 hbase 1.1.3 3 2.1.5 hive 1.2.1 3 2.1.6 zookeeper 3.4.8 4 2.1.7 mysql 5.5 4 2.1.8 swing 4 2.1.9 VMware Workstation 12 Pro 4 2.1.10其他輔助

    2024年02月02日
    瀏覽(55)
  • 基于Spark的音樂專輯數(shù)據(jù)分析

    基于Spark的音樂專輯數(shù)據(jù)分析

    每天天都在努力學(xué)習(xí)的我們 ????????本篇博客講解的內(nèi)容依舊是使用Spark進(jìn)行相關(guān)的數(shù)據(jù)分析,按理來說數(shù)據(jù)分析完之后應(yīng)該搞一搞可視化的,由于目前時(shí)間緊張,顧不得學(xué)習(xí)可視化了,先來看一下此次的內(nèi)容把。 ????????在Kaggle數(shù)據(jù)平臺下載了數(shù)據(jù)集albunms.csv,里面

    2024年02月08日
    瀏覽(26)
  • 基于Spark技術(shù)的銀行客戶數(shù)據(jù)分析

    基于Spark技術(shù)的銀行客戶數(shù)據(jù)分析

    申明: 未經(jīng)許可,禁止以任何形式轉(zhuǎn)載,若要引用,請標(biāo)注鏈接地址 全文共計(jì)4672字,閱讀大概需要3分鐘 大數(shù)據(jù)實(shí)驗(yàn)教學(xué)系統(tǒng) 案例:銀行客戶數(shù)據(jù)分析 某銀行積累有大量客戶數(shù)據(jù),現(xiàn)希望大數(shù)據(jù)分析團(tuán)隊(duì)使用Spark技術(shù)對這些數(shù)據(jù)進(jìn)行分析,以期獲得有價(jià)值的信息。 本案例用

    2024年02月09日
    瀏覽(43)
  • 基于spark對美國新冠肺炎疫情數(shù)據(jù)分析

    基于spark對美國新冠肺炎疫情數(shù)據(jù)分析

    GCC的同學(xué)不要抄襲呀?。?!嚴(yán)禁抄襲 有任何學(xué)習(xí)問題可以加我微信交流哦!bmt1014 前言 2020年美國新冠肺炎疫情是全球范圍內(nèi)的一場重大公共衛(wèi)生事件,對全球政治、經(jīng)濟(jì)、社會等各個(gè)領(lǐng)域都產(chǎn)生了深遠(yuǎn)影響。在這場疫情中,科學(xué)家們發(fā)揮了重要作用,積極探索病毒特性、傳

    2024年02月04日
    瀏覽(18)
  • 任務(wù)15:使用Hive進(jìn)行全國氣象數(shù)據(jù)分析

    任務(wù)15:使用Hive進(jìn)行全國氣象數(shù)據(jù)分析

    任務(wù)描述 知識點(diǎn) : 使用Hive進(jìn)行數(shù)據(jù)分析 重? 點(diǎn) : 掌握Hive基本語句 熟練使用Hive對天氣數(shù)據(jù)進(jìn)行分析 內(nèi)? 容 : 使用Hive創(chuàng)建外部表 使用Hive對數(shù)據(jù)進(jìn)行統(tǒng)計(jì)分析 任務(wù)指導(dǎo) 1. 使用Hive創(chuàng)建基礎(chǔ)表 將China_stn_city.csv文件上傳到HDFS的/china_stn目錄中 啟動metastore(后臺運(yùn)行) 進(jìn)入

    2024年01月16日
    瀏覽(26)
  • 基于Hadoop的京東商城數(shù)據(jù)分析的研究與實(shí)現(xiàn)

    題目 基于 Hadoop 的京東商城數(shù)據(jù)分析的研究與實(shí)現(xiàn) 1. 課題研究立項(xiàng)依據(jù) (1)課題來源 隨著互聯(lián)網(wǎng)信息技術(shù)的發(fā)展,企業(yè)商務(wù)模式也發(fā)生了翻天覆地的變化,很多傳統(tǒng)企業(yè)都把目光投向了互聯(lián)網(wǎng)電子商務(wù)。近年來,越來越多的電子商務(wù)平臺的誕生,引起了電子商務(wù)業(yè)內(nèi)的廣泛

    2024年02月06日
    瀏覽(28)
  • Spark內(nèi)容分享(二十七):阿里云基于 Spark 的云原生數(shù)據(jù)湖分析實(shí)踐

    Spark內(nèi)容分享(二十七):阿里云基于 Spark 的云原生數(shù)據(jù)湖分析實(shí)踐

    目錄 Spark 與云原生的結(jié)合 1. 傳統(tǒng) Spark 集群的痛點(diǎn) 2. Spark 與云原生結(jié)合的優(yōu)勢 Spark on K8s 原理介紹 1. Spark 的集群部署模式 2. Spark on K8s 的部署架構(gòu) 3. Spark on K8s 部署架構(gòu)——對比 4. Spark on K8s 社區(qū)進(jìn)展 5. Spark 3.3 新特性介紹 Spark on K8s 在阿里云 EMR 上的實(shí)踐 1. EMR Spark on ACK 2. 充分

    2024年01月15日
    瀏覽(19)
  • 【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

    【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

    一、選題背景 新型冠狀病毒疫情是由嚴(yán)重急性呼吸系統(tǒng)綜合征冠狀病毒2(SARS-CoV-2)導(dǎo)致的2019冠狀病毒?。–OVID-19)所引發(fā)的全球大流行疫情。該疾病在2019年末于中華人民共和國湖北省武漢市首次爆發(fā),隨后在2020年初迅速擴(kuò)散至全球多國,逐漸變成一場全球性的大瘟疫。截

    2023年04月19日
    瀏覽(24)
  • 【畢業(yè)設(shè)計(jì)_課程設(shè)計(jì)】基于Spark網(wǎng)易云音樂數(shù)據(jù)分析

    【畢業(yè)設(shè)計(jì)_課程設(shè)計(jì)】基于Spark網(wǎng)易云音樂數(shù)據(jù)分析

    基于Spark網(wǎng)易云音樂數(shù)據(jù)分析 提示:適合用于課程設(shè)計(jì)或畢業(yè)設(shè)計(jì),工作量達(dá)標(biāo),源碼開放 包含爬蟲,Scala代碼,Spark,Hadoop,ElasticSearch,logstash,Flume,echarts,log4j emotional_analysis_spider 爬蟲模塊 emotional_analysis_web 數(shù)據(jù)處理模塊(Scala代碼) emotional_analysis_recommend 推薦模塊目前還未開發(fā) emot

    2024年02月06日
    瀏覽(32)
  • Spark 大數(shù)據(jù)實(shí)戰(zhàn):基于 RDD 的大數(shù)據(jù)處理分析

    Spark 大數(shù)據(jù)實(shí)戰(zhàn):基于 RDD 的大數(shù)據(jù)處理分析

    之前筆者參加了公司內(nèi)部舉辦的一個(gè) Big Data Workshop,接觸了一些 Spark 的皮毛,后來在工作中陸陸續(xù)續(xù)又學(xué)習(xí)了一些 Spark 的實(shí)戰(zhàn)知識。 本文筆者從小白的視角出發(fā),給大家普及 Spark 的應(yīng)用知識。 Spark 集群是基于 Apache Spark 的分布式計(jì)算環(huán)境,用于處理 大規(guī)模數(shù)據(jù)集 的計(jì)算任

    2024年01月25日
    瀏覽(41)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包