国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

這篇具有很好參考價值的文章主要介紹了【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、選題背景

新型冠狀病毒疫情是由嚴重急性呼吸系統(tǒng)綜合征冠狀病毒2(SARS-CoV-2)導(dǎo)致的2019冠狀病毒病(COVID-19)所引發(fā)的全球大流行疫情。該疾病在2019年末于中華人民共和國湖北省武漢市首次爆發(fā),隨后在2020年初迅速擴散至全球多國,逐漸變成一場全球性的大瘟疫。截至到2022年12月7日,全球已累計報告超過6.43億例確診病例,其中超過663.7萬人死亡,是人類歷史上最大規(guī)模的流行病之一。

這次疫情導(dǎo)致嚴重的全球性的社會和經(jīng)濟混亂,被視為人類自第二次世界大戰(zhàn)以來面臨的最嚴峻危機,并使全球經(jīng)濟陷入自從1930年代的大蕭條以來最大的衰退。危機爆發(fā)的初期,亦遇上全球醫(yī)療與民生用品因為恐慌性消費導(dǎo)致供應(yīng)不足、傳播假新聞與針對不同族裔產(chǎn)生種族或地域等歧視的問題。許多教育機構(gòu)和公共區(qū)域被部分或完全關(guān)閉,很多活動被取消或推遲。而疫情擴散對全球航空、旅游、娛樂、體育、石油市場、金融市場等方面造成巨大影響并在經(jīng)濟重啟后仍持續(xù)多年。

鑒于新冠肺炎疫情的巨大影響,我們只有先做到客觀而全面地認識,才能謀求科學(xué)有效的應(yīng)對方法。本項目使用Spark等大數(shù)據(jù)處理工具,對美國逾兩年的疫情狀況進行分析,以期得到數(shù)據(jù)背后的疫情發(fā)展規(guī)律,更加客觀全面地看待疫情現(xiàn)狀,為中國的疫情防控舉措提供科學(xué)合理的參考意見。

?文章來源地址http://www.zghlxwxcb.cn/news/detail-418245.html

二、數(shù)據(jù)集介紹

鏈接:https://pan.baidu.com/s/1ke36AeJ0ThB_6zpNZwW4fQ?pwd=0294?
提取碼:0294

本項目的數(shù)據(jù)集來自數(shù)據(jù)競賽網(wǎng)站Kaggle的美國各縣疫情數(shù)據(jù)集US counties COVID 19 dataset,包含了從2020年1月21日美國新增第一例確診病例到2022年5月13日停止更新期間的250萬多條記錄。該數(shù)據(jù)集以csv格式組織,各屬性的含義如下:

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

date表示日期,county表示區(qū)縣名稱,state表示州名,fips表示各縣位置編碼,cases表示截止當前日期累計確診病例數(shù),deaths表示截止當前日期累計死亡病例數(shù)。FIPS(Federal Information Processing System)編碼是美國國內(nèi)各地區(qū)各自的唯一編碼,用以區(qū)分不同地理實體。每個州各自均有一個唯一的二位FIPS編碼,每個州下的所有郡縣級地區(qū)同樣有各自唯一的五位FIPS編碼(其中前兩位是一樣的,都是該州所屬FIPS編碼)。

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

?

三、Spark簡介

Spark最初由美國加州伯克利大學(xué)的AMP 實驗室于2009年開發(fā),是基于內(nèi)存計算的大數(shù)據(jù)并行計算框架,可用于構(gòu)建大型的、低延遲的數(shù)據(jù)分析應(yīng)用程序。 Spark于2013年加入Apache孵化器項目后發(fā)展迅猛,如今已成為Apache軟件基金會最重要的三大分布式計算系統(tǒng)開源項目之一(Hadoop、Spark、Storm)。值得一提的是,Spark在2014年打破了Hadoop保持的基準排序記錄。Spark使用206個節(jié)點,用時23分鐘完成了100TB數(shù)據(jù)的排序,而Hadoop使用了2000個節(jié)點,耗時72分鐘,完成了100TB數(shù)據(jù)的排序。也就是說Spark用十分之一的計算資源,獲得了比Hadoop快3倍的速度。

Spark具有如下幾個主要特點:運行速度快:使用DAG執(zhí)行引擎以支持循環(huán)數(shù)據(jù)流與內(nèi)存計算;容易使用:支持使用Scala、Java、Python和R語言進行編程,可以通過 Spark Shell進行交互式編程;通用性:Spark提供了完整而強大的技術(shù)棧,包括SQL查詢、流式計算、機器學(xué)習(xí)和圖算法組件;運行模式多樣:可運行于獨立的集群模式中,可運行于Hadoop中,也可運行于Amazon EC2等云環(huán)境中,并且可以訪問HDFS、Cassandra、HBase和Hive等多種數(shù)據(jù)源。

傳統(tǒng)大數(shù)據(jù)處理框架Hadoop存在如下一些缺點:表達能力有限;磁盤IO開銷大;延遲高;任務(wù)之間的銜接涉及IO開銷;在前一個任務(wù)執(zhí)行完成之前,其他任務(wù)就無法開始,難以勝任復(fù)雜、多階段的計算任務(wù)。Spark在借鑒Hadoop MapReduce優(yōu)點的同時,很好地解決了MapReduce所面臨的問題。相比于Hadoop MapReduce,Spark主要具有如下優(yōu)點:Spark的計算模式也屬于MapReduce,但不局限于Map和Reduce操作,還提供了多種數(shù)據(jù)集操作類型,編程模型比Hadoop MapReduce更靈活;Spark提供了內(nèi)存計算,可將中間結(jié)果放到內(nèi)存中,對于迭代運算效率更高;Spark基于DAG的任務(wù)調(diào)度執(zhí)行機制,要優(yōu)于Hadoop MapReduce的迭代執(zhí)行機制。

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測??

?

Spark的運行架構(gòu)包括集群管理器(Cluster Manager)、運行作業(yè)任務(wù)的工作節(jié)點(Worker Node)、每個應(yīng)用的任務(wù)控制節(jié)點(Driver)和每個工作節(jié)點上負責(zé)具體任務(wù)的執(zhí)行進程(Executor)。其中,集群管理器可以是Spark自帶的資源管理器,也可以是YARN或Mesos等資源管理框架。與Hadoop MapReduce計算框架相比,Spark所采用的Executor有兩個優(yōu)點。一是利用多線程來執(zhí)行具體的任務(wù)(HadoopMapReduce采用的是進程模型),減少任務(wù)的啟動開銷。二是 Executor中有一個BlockManager存儲模塊,會將內(nèi)存和磁盤共同作為存儲設(shè)備,當需要多輪迭代計算時,可以將中間結(jié)果存儲到這個存儲模塊里,下次需要時就可以直接讀該存儲模塊里的數(shù)據(jù),而不需要讀寫到 HDFS 等文件系統(tǒng)里,因而有效減少了 IO 開銷;或者在交互式查詢場景下,Executor預(yù)先將表緩存到該存儲系統(tǒng)上,從而可以提高讀寫IO的性能。

Spark運行基本流程如下圖所示,流程如下。

(1)當一個Spark應(yīng)用被提交時,首先需要為這個應(yīng)用構(gòu)建起基本的運行環(huán)境,即由任務(wù)控制節(jié)點(Driver)創(chuàng)建一個SparkContext,由SparkContext負責(zé)和資源管理器—Cluster Manager的通信,以及進行資源的申請、任務(wù)的分配和監(jiān)控等。SparkContext 會向資源管理器注冊并申請運行Executor的資源。

(2)資源管理器為Executor分配資源,并啟動Executor進程,Executor運行情況將隨著“心跳”發(fā)送到資源管理器上。

(3)SparkContext 根據(jù) RDD 的依賴關(guān)系構(gòu)建 DAG,并將 DAG 提交給 DAG 調(diào)度器(DAGScheduler)進行解析,將DAG分解成多個“階段”(每個階段都是一個任務(wù)集),并且計算出各個階段之間的依賴關(guān)系,然后把一個個“任務(wù)集”提交給底層的任務(wù)調(diào)度器(TaskScheduler)進行處理;Executor 向 SparkContext 申請任務(wù),任務(wù)調(diào)度器將任務(wù)分發(fā)給 Executor 運行,同時SparkContext將應(yīng)用程序代碼發(fā)放給Executor。

(4)任務(wù)在Executor上運行,把執(zhí)行結(jié)果反饋給任務(wù)調(diào)度器,然后反饋給DAG調(diào)度器,運行完畢后寫入數(shù)據(jù)并釋放所有資源。

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

Spark的核心建立在統(tǒng)一的抽象RDD之上,這使得Spark的各個組件可以無縫地進行集成,在同一個應(yīng)用程序中完成大數(shù)據(jù)計算任務(wù)。一個RDD就是一個分布式對象集合,本質(zhì)上是一個只讀的分區(qū)記錄集合。每個RDD可以分成多個分區(qū),每個分區(qū)就是一個數(shù)據(jù)集片段,并且一個 RDD 的不同分區(qū)可以被保存到集群中不同的節(jié)點上,從而可以在集群中的不同節(jié)點上進行并行計算。RDD提供了一組豐富的操作以支持常見的數(shù)據(jù)運算,分為“行動”(Action)和“轉(zhuǎn)換”(Transformation)兩種類型,前者用于執(zhí)行計算并指定輸出的形式,后者指定RDD之間的相互依賴關(guān)系。兩類操作的主要區(qū)別是,轉(zhuǎn)換操作(如map、filter、groupBy、join等)接受RDD并返回RDD,而行動操作(如count、collect等)接受RDD但是返回非RDD(即輸出一個值或結(jié)果)。RDD采用了惰性調(diào)用,即在RDD的執(zhí)行過程中,真正的計算發(fā)生在RDD的“行動”操作,對于“行動”之前的所有“轉(zhuǎn)換”操作,Spark只是記錄下“轉(zhuǎn)換”操作應(yīng)用的一些基礎(chǔ)數(shù)據(jù)集以及RDD生成的軌跡,即相互之間的依賴關(guān)系,而不會觸發(fā)真正的計算。

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

?

四、實驗環(huán)境搭建

由于虛擬機內(nèi)存大小的限制,在虛擬機中運行程序會出現(xiàn)運行速度過慢甚至運行失敗等問題,所以直接在Windows操作系統(tǒng)中重新搭建實驗環(huán)境。

1、安裝JDK1.8。

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

2、配置環(huán)境變量。點擊我的電腦-屬性-高級系統(tǒng)設(shè)置-環(huán)境變量,點擊新建,創(chuàng)建JAVA_HOME。

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

3、安裝Hadoop2.7.1。

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

不要忘記配置環(huán)境變量:

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

4、安裝winutils。Hadoop主要基于Linux編寫,winutil.exe主要用于模擬Linux下的目錄環(huán)境,因此Hadoop若想要在Windows下運行,需要winutil.exe的幫助。下載完成后找到相應(yīng)的Hadoop版本,這里我們安裝的是2.7.1,進入該目錄,將bin目錄下的所有內(nèi)容復(fù)制,粘貼到Hadoop2.7.1安裝目錄的bin目錄下。

5、修改hadoop-env.cmd。打開Hadoop安裝目錄,找到hadoop-env.cmd文件,指定JDK的路徑。

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

6、安裝Python3.6。由于版本問題,使用Hadoop2.7和Spark2.4版本只能使用Python3.6,所以需要安裝Python3.6。建議使用Anaconda來下載Python。

7、安裝Spark并配置環(huán)境變量。

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

8、進入Spark的安裝目錄,將pyspark復(fù)制到Anaconda下的python36目錄。

9、安裝Py4J。在Python3.6環(huán)境的Scripts文件夾打開cmd窗口使用pip安裝。Py4J?是Python和Java的互調(diào)接口,使得Python程序可以利用Python解釋器直接調(diào)用Java虛擬機中的Java對象,也可以讓Java調(diào)用Python對象。

這樣,環(huán)境搭建已經(jīng)完成。我們打開cmd窗口輸入spark-shell:

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

說明安裝成功。

?

五、基于Spark的數(shù)據(jù)分析及可視化

首先導(dǎo)入所需要的包:

import pandas as pd
from pyspark import SparkConf,SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as func

?原始數(shù)據(jù)集是csv格式的,為方便Spark讀取,需要轉(zhuǎn)化為txt格式:

data = pd.read_csv('C:\\Users\\26909\\Desktop\\us-counties.csv')
with open('us-counties.txt', 'a+', encoding='utf-8') as f:
    for line in data.values:
        f.write((str(line[0]) + '\t' + str(line[1]) + '\t'
                 + str(line[2]) + '\t' + str(line[3]) + '\t' + str(line[4]) + '\n'))

?下面創(chuàng)建一個spark對象作為編程入口:

spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

因為其構(gòu)造函數(shù)私有,所以需要用builder方法創(chuàng)建SparkSession對象。SparkSession內(nèi)部封裝了SparkContext,所以計算實際上是由SparkContext完成的。

在Spark2.0版本之前,使用Spark必須先創(chuàng)建SparkConf和SparkContext;在Spark2.0中只要創(chuàng)建一個SparkSession就夠了,SparkConf、SparkContext和SQLContext都已經(jīng)被封裝在SparkSession當中。

下圖說明了SparkContext在Spark中的主要功能:

?【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

從圖中可以看到SparkContext起到中介的作用,通過它來使用Spark其他功能。每一個JVM都有一個對應(yīng)的SparkContext,Driver Program通過SparkContext連接到集群管理器來實現(xiàn)對集群中任務(wù)的控制。Spark配置參數(shù)的設(shè)置以及對SQLContext、HiveContext和StreamingContext的控制也要通過SparkContext進行。然而在Spark2.0中上述的所有功能都是通過SparkSession來完成的,同時SparkSession也簡化了DataFrame/Dataset API的使用和對數(shù)據(jù)的操作。

?

fields = [StructField("date", DateType(), False), StructField("county", StringType(), False),
          StructField("state", StringType(), False),
          StructField("cases", IntegerType(), False), StructField("deaths", IntegerType(), False), ]
schema = StructType(fields)

一個StructField記錄了列名、列類型、列是否運行為空;多個StructField組成一個StructType對象,一個StructType對象描述一個DataFrame。

?

rdd0 = spark.sparkContext.textFile("us-counties.txt")
rdd1 = rdd0.map(lambda x: x.split("\t")).map(lambda p: Row(toDate(p[0]), p[1], p[2], int(p[3]), int(p[4])))

SparkContext.textFile()從HDFS、本地文件系統(tǒng)或任何Hadoop支持的文件系統(tǒng)讀取文本文件,并將其作為字符串的RDD返回。同時,一行數(shù)據(jù)被描述為Row對象。rdd1是rdd0進行map transformation的結(jié)果,由于惰性機制,并不是立即執(zhí)行計算。

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

如上圖所示,Spark對RDD的操作可以整體分為兩類:Transformation和Action。這里的Transformation可以翻譯為轉(zhuǎn)換,表示是針對RDD中數(shù)據(jù)的轉(zhuǎn)換操作,主要會針對已有的RDD創(chuàng)建一個新的RDD,常見的有map、flatMap、filter等。Action可以翻譯為執(zhí)行,表示是觸發(fā)任務(wù)執(zhí)行的操作,主要對RDD進行最后的操作,比如遍歷、 reduce、保存到文件等,并且還可以把結(jié)果返回給Driver程序。

不管是Transformation里面的操作還是Action里面的操作,我們一般會把它們稱之為算子,例如:map 算子、reduce算子,其中Transformation算子有一個特性:lazy,lazy特性在這里指的是如果一個Spark任務(wù)中只定義了transformation算子,那么即使你執(zhí)行這個任務(wù),任務(wù)中的算子也不會執(zhí)行,也就是說,transformation是不會觸發(fā)Spark任務(wù)的執(zhí)行,它們只是記錄了對RDD所做的操作。只有當transformation之后,接著執(zhí)行了一個action操作,那么所有的transformation才會執(zhí)行。Spark通過惰性機制來進行底層的Spark任務(wù)執(zhí)行的優(yōu)化,避免產(chǎn)生過多中間結(jié)果。

?

shemaUsInfo = spark.createDataFrame(rdd1, schema)

在SQLContext中使用createDataFrame可以創(chuàng)建DataFrame。這里通過row+schema來創(chuàng)建Dataframe。

?

shemaUsInfo.createOrReplaceTempView("usInfo")
df = shemaUsInfo.groupBy("date").agg(func.sum("cases"), func.sum("deaths")).sort(shemaUsInfo["date"].asc())
# 列重命名
df1 = df.withColumnRenamed("sum(cases)", "cases").withColumnRenamed("sum(deaths)", "deaths")
df1.repartition(1).write.json("result1.json")  # 寫入hdfs
# 注冊為臨時表供下一步使用
df1.createOrReplaceTempView("ustotal")

將原始數(shù)據(jù)注冊為一個名為usInfo的表,再將同一日期的病例數(shù)量和死亡數(shù)量進行聚合,注冊為ustotal表。

這樣,準備工作已全部完成,接下來就可以使用SQL語句對usInfo、ustotal等表進行篩選,從而進行分析。

下面是可視化分析:

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

1 美國確診病例增長曲線

可以看出美國疫情總體經(jīng)歷了兩波疫情增長高峰,第一波出現(xiàn)在2020年與2021年之交,第二波開始于2021年秋天,一直持續(xù)到2022年春天。截止2022年5月13日,全美已累計確診8236.71萬例。

?

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

2 美國死亡病例增長曲線

截止2022年5月13日,全美已累計死亡998279例。與確診病例增長曲線對比可以看出,死亡病例增長最快的時期與新增病例增速最快的時期基本重合。

?

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

3 美國每日新增確診曲線圖

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

4 新華社有關(guān)美國日增新冠確診病例超百萬例的報道

每年10月至次年3月為疫情高發(fā)期,在2022年1月尤為明顯,該月確診病例暴增,甚至某些天日增超過百萬例,與新聞報道的時間吻合。

?

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

圖5 美國每日新增死亡病例數(shù)曲線圖

冬季和春季是死亡病例的高發(fā)期,且死亡病例增長最快的時期與新增確診病例增速最快的時期基本重合。

?

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

6 美國分州疫情地圖

?

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

7 美國分縣疫情地圖

美國東西部沿海地區(qū)經(jīng)濟發(fā)達,人口稠密,確診人數(shù)較多,如加利福尼亞、佛羅里達、德克薩斯、紐約等州。

?

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

8 確診數(shù)量詞云圖

全美確診人數(shù)位居前列的州有加利福尼亞州、德克薩斯州、紐約州、伊利諾伊州、賓夕法尼亞州、新澤西州、北卡羅萊納州、馬薩諸塞州和亞利桑那州等。

?

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

9 各州死亡病例Top10

死亡病例最多的前10個州依次為加利福尼亞州、紐約州、德克薩斯州、佛羅里達州、賓夕法尼亞州、新澤西州、伊利諾伊州、密歇根州和佐治亞州。

?

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

10 美國確診病例各月份占比餅狀圖

從該餅狀圖可以看出,美國確診病例3和4月占比最大,其次是1和2月。究其原因,可能是春季氣候變化無常,早晚溫差較大,是傳染病的高發(fā)季節(jié);而冬季溫度較低,更利于新冠肺炎病毒存活。

我們還可以對美國新冠肺炎疫情數(shù)據(jù)進行聚類分析。DBSCAN(Density-Based Spatial Clustering of Applications with Noise,具有噪聲的基于密度的聚類方法)是一種典型的基于密度的空間聚類算法。和K-Means這樣的一般只適用于凸樣本集的聚類相比,DBSCAN既可以適用于凸樣本集,也可以適用于非凸樣本集。該算法將具有足夠密度的區(qū)域劃分為簇,并在具有噪聲的空間數(shù)據(jù)庫中發(fā)現(xiàn)任意形狀的簇,它將簇定義為密度相連的點的最大集合。

該算法利用基于密度的聚類的概念,即要求聚類空間中的一定區(qū)域內(nèi)所包含對象(點或其他空間對象)的數(shù)目不小于某一給定閾值。DBSCAN算法的顯著優(yōu)點是聚類速度快且能夠有效處理噪聲點和發(fā)現(xiàn)任意形狀的空間聚類。但是當空間聚類的密度不均勻、聚類間距差相差很大時,聚類質(zhì)量較差。

DBSCAN的優(yōu)點在于,首先它可以對任意形狀的稠密數(shù)據(jù)集進行聚類,而K-Means之類的聚類算法一般只適用于凸數(shù)據(jù)集;其次它可以在聚類的同時發(fā)現(xiàn)異常點,對數(shù)據(jù)集中的異常點不敏感;且DBSCAN聚類結(jié)果沒有偏倚,而K-Means之類的聚類算法初始值對聚類結(jié)果有很大影響。

DBSCAN是基于一組鄰域來描述樣本集的緊密程度的,參數(shù)(?, MinPts)用來描述鄰域的樣本分布緊密程度。其中,?描述了某一樣本的鄰域距離閾值,MinPts描述了某一樣本的距離為?的鄰域中樣本個數(shù)的閾值。下面是DBSCAN聚類算法的流程圖:

?【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

下圖為可視化的聚類結(jié)果:

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

?其中x、y軸分別為病例人數(shù)和死亡人數(shù)。為方便顯示,將坐標軸比例設(shè)置為log,坐標取值以指數(shù)量級增長。被歸為一類的樣本具有一定程度的相似性,可以針對這些時間節(jié)點的確診病例進行進一步的深入研究。

?

六、疫情數(shù)據(jù)建模預(yù)測

從前面的分析可以看出,疫情數(shù)據(jù)表現(xiàn)出了一定的季節(jié)特性,可以認為是一種時間序列數(shù)據(jù)。在對時間序列數(shù)據(jù)進行預(yù)測時,一個簡單的思路是認為離預(yù)測點越近的點所起的作用越大。例如對下個月的體重進行預(yù)測,這個月的數(shù)據(jù)影響力更大些。假設(shè)隨著時間變化權(quán)重以指數(shù)方式下降,最終年代久遠的數(shù)據(jù)權(quán)重將接近于0。將權(quán)重按照指數(shù)級進行衰減,這就是指數(shù)平滑法的基本思想。

指數(shù)平滑法有幾種不同形式:一次指數(shù)平滑法針對沒有趨勢和季節(jié)性的序列,二次指數(shù)平滑法針對有趨勢但沒有季節(jié)性的序列,三次指數(shù)平滑法針對有趨勢也有季節(jié)性的序列。Holt-Winters特指三次指數(shù)平滑法。

所有的指數(shù)平滑法都要更新上一時間步長的計算結(jié)果,并使用當前時間步長的數(shù)據(jù)中包含的新信息。它們通過混合新信息和舊信息來實現(xiàn),而相關(guān)的新舊信息的權(quán)重由一個可調(diào)整的參數(shù)來控制。

Statsmodels是Python進行擬合多種統(tǒng)計模型、進行統(tǒng)計試驗和數(shù)據(jù)探索可視化的庫,它提供了實現(xiàn)指數(shù)平滑法的ExponentialSmoothing模型,需要指定5個參數(shù),第一個endog就是時間序列數(shù)據(jù);第二個trend是趨勢,有三種可選項,就是加法趨勢、乘法趨勢還有None;第三個damped是衰減,Boolean決定是否對趨勢進行衰減;第四個seasonal是季節(jié)性(周期),也是三種選項,加法、乘法還有None;第五個seasonal_periods,季節(jié)性周期,int型,holt-winter要考慮的季節(jié)的數(shù)量。經(jīng)過訓(xùn)練集劃分和參數(shù)調(diào)整,最終的預(yù)測結(jié)果與真實數(shù)據(jù)可視化如下:

【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測

其中紅色曲線為預(yù)測值,藍色曲線為真實值??梢钥闯龆咦兓厔莼疽恢?,但預(yù)測精度有待改善。

?

八、源代碼

?Spark01.py

import pandas as pd
from pyspark import SparkConf,SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as func
# .csv->.txt
data = pd.read_csv('C:\\Users\\26909\\Desktop\\us-counties.csv')
with open('us-counties.txt', 'a+', encoding='utf-8') as f:
    for line in data.values:
        f.write((str(line[0]) + '\t' + str(line[1]) + '\t'
                 + str(line[2]) + '\t' + str(line[3]) + '\t' + str(line[4]) + '\n'))

def toDate(inputStr):
    newStr = ""
    if len(inputStr) == 8:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7]
        newStr = s1+"-"+"0"+s2+"-"+"0"+s3
    else:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7:]
        newStr = s1+"-"+"0"+s2+"-"+s3
    date = datetime.strptime(newStr, "%Y-%m-%d")
    return date


spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

fields = [StructField("date", DateType(), False), StructField("county", StringType(), False),
          StructField("state", StringType(), False),
          StructField("cases", IntegerType(), False), StructField("deaths", IntegerType(), False), ]
schema = StructType(fields)

rdd0 = spark.sparkContext.textFile("us-counties.txt")
rdd1 = rdd0.map(lambda x: x.split("\t")).map(lambda p: Row(toDate(p[0]), p[1], p[2], int(p[3]), int(p[4])))

shemaUsInfo = spark.createDataFrame(rdd1, schema)

shemaUsInfo.createOrReplaceTempView("usInfo")

# 1.計算每日的累計確診病例數(shù)和死亡數(shù)
df = shemaUsInfo.groupBy("date").agg(func.sum("cases"), func.sum("deaths")).sort(shemaUsInfo["date"].asc())

# 列重命名
df1 = df.withColumnRenamed("sum(cases)", "cases").withColumnRenamed("sum(deaths)", "deaths")
df1.repartition(1).write.json("result1.json")  # 寫入hdfs

# 注冊為臨時表供下一步使用
df1.createOrReplaceTempView("ustotal")

# 2.計算每日較昨日的新增確診病例數(shù)和死亡病例數(shù)
df2 = spark.sql(
    "select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")

df2.sort(df2["date"].asc()).repartition(1).write.json("result2.json")  # 寫入hdfs

# 3.統(tǒng)計截止5.19日 美國各州的累計確診人數(shù)和死亡人數(shù)
df3 = spark.sql(
    "select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo  where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")

df3.sort(df3["totalCases"].desc()).repartition(1).write.json("result3.json")  # 寫入hdfs

df3.createOrReplaceTempView("eachStateInfo")

# 4.找出美國確診最多的10個州
df4 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases desc limit 56")
df4.repartition(1).write.json("result4.json")

# 5.找出美國死亡最多的10個州
df5 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths desc limit 10")
df5.repartition(1).write.json("result5.json")

# 6.找出美國確診最少的10個州
df6 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases asc limit 10")
df6.repartition(1).write.json("result6.json")

# 7.找出美國死亡最少的10個州
df7 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths asc limit 10")
df7.repartition(1).write.json("result7.json")

# 8.統(tǒng)計截止5.19全美和各州的病死率
df8 = spark.sql(
    "select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()
df8.sort(df8["sign"].asc(), df8["deathRate"].desc()).repartition(1).write.json("result8.json")

?

?Spark02.py

from pyecharts import options as opts
from pyecharts.charts import Bar
from pyecharts.charts import Line
from pyecharts.components import Table
from pyecharts.charts import WordCloud
from pyecharts.charts import Pie
from pyecharts.charts import Funnel
from pyecharts.charts import Scatter
from pyecharts.charts import PictorialBar
from pyecharts.options import ComponentTitleOpts
from pyecharts.globals import SymbolType
import json

# 1.畫出每日的累計確診病例數(shù)和死亡數(shù)——>雙柱狀圖
def drawChart_1(index):
    root = "D:\PyProject\Pyspark\keshe\\result" + str(index) + "/part-00000.json"
    date = []
    cases = []
    deaths = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:  # 到 EOF,返回空字符串,則終止循環(huán)
                break
            js = json.loads(line)
            date.append(str(js['date']))
            cases.append(int(js['cases']))
            deaths.append(int(js['deaths']))
    d = (
        Bar()
            .add_xaxis(date)
            .add_yaxis("累計確診人數(shù)", cases, stack="stack1")
            .add_yaxis("累計死亡人數(shù)", deaths, stack="stack1")
            .set_series_opts(label_opts=opts.LabelOpts(is_show=False))
            .set_global_opts(title_opts=opts.TitleOpts(title="美國每日累計確診和死亡人數(shù)"))
            .render("result1.html")
    )

# 2.畫出每日的新增確診病例數(shù)和死亡數(shù)——>折線圖
def drawChart_2(index):
    root = "D:\PyProject\Pyspark\keshe\\result" + str(index) + "/part-00000.json"
    date = []
    cases = []
    deaths = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:  # 到 EOF,返回空字符串,則終止循環(huán)
                break
            js = json.loads(line)
            date.append(str(js['date']))
            cases.append(int(js['caseIncrease']))
            deaths.append(int(js['deathIncrease']))
    (
        Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
            .add_xaxis(xaxis_data=date)
            .add_yaxis(
            series_name="新增確診",
            y_axis=cases,
            markpoint_opts=opts.MarkPointOpts(
                data=[
                    opts.MarkPointItem(type_="max", name="最大值")
                ]
            ),
            markline_opts=opts.MarkLineOpts(
                data=[opts.MarkLineItem(type_="average", name="平均值")]
            ),
        )
            .set_global_opts(
            title_opts=opts.TitleOpts(title="美國每日新增確診折線圖", subtitle=""),
            tooltip_opts=opts.TooltipOpts(trigger="axis"),
            toolbox_opts=opts.ToolboxOpts(is_show=True),
            xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
        )
            .render("result2.html")
    )
    (
        Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
            .add_xaxis(xaxis_data=date)
            .add_yaxis(
            series_name="新增死亡",
            y_axis=deaths,
            markpoint_opts=opts.MarkPointOpts(
                data=[opts.MarkPointItem(type_="max", name="最大值")]
            ),
            markline_opts=opts.MarkLineOpts(
                data=[
                    opts.MarkLineItem(type_="average", name="平均值"),
                    opts.MarkLineItem(symbol="none", x="90%", y="max"),
                    opts.MarkLineItem(symbol="circle", type_="max", name="最高點"),
                ]
            ),
        )
            .set_global_opts(
            title_opts=opts.TitleOpts(title="美國每日新增死亡折線圖", subtitle=""),
            tooltip_opts=opts.TooltipOpts(trigger="axis"),
            toolbox_opts=opts.ToolboxOpts(is_show=True),
            xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
        )
            .render("result2.html")
    )

# 3.畫出截止5.19,美國各州累計確診、死亡人數(shù)和病死率--->表格
def drawChart_3(index):
    root = "D:\PyProject\Pyspark\keshe\\result" + str(index) + "/part-00000.json"
    allState = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:  # 到 EOF,返回空字符串,則終止循環(huán)
                break
            js = json.loads(line)
            row = []
            row.append(str(js['state']))
            row.append(int(js['totalCases']))
            row.append(int(js['totalDeaths']))
            row.append(float(js['deathRate']))
            allState.append(row)
    table = Table()
    headers = ["State name", "Total cases", "Total deaths", "Death rate"]
    rows = allState
    table.add(headers, rows)
    table.set_global_opts(
        title_opts=ComponentTitleOpts(title="美國各州疫情一覽", subtitle="")
    )
    table.render("result3.html")

# 4.畫出美國確診最多的10個州——>詞云圖
def drawChart_4(index):
    root = "D:\PyProject\Pyspark\keshe\\result" + str(index) + "/part-00000.json"
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:  # 到 EOF,返回空字符串,則終止循環(huán)
                break
            js = json.loads(line)
            row = (str(js['state']), int(js['totalCases']))
            data.append(row)
    c = (
        WordCloud()
            .add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND)
            .set_global_opts(title_opts=opts.TitleOpts(title="美國各州確診Top10"))
            .render("result4.html")
    )

# 5.畫出美國死亡最多的10個州——>象柱狀圖
def drawChart_5(index):
    root = "D:\PyProject\Pyspark\keshe\\result" + str(index) + "/part-00000.json"
    state = []
    totalDeath = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:  # 到 EOF,返回空字符串,則終止循環(huán)
                break
            js = json.loads(line)
            state.insert(0, str(js['state']))
            totalDeath.insert( 0, int(js['totalDeaths']))

    c = (
        PictorialBar()
            .add_xaxis(state)
            .add_yaxis(
            "",
            totalDeath,
            label_opts=opts.LabelOpts(is_show=False),
            symbol_size=18,
            symbol_repeat="fixed",
            symbol_offset=[0, 0],
            is_symbol_clip=True,
            symbol=SymbolType.ROUND_RECT,
        )
            .reversal_axis()
            .set_global_opts(
            title_opts=opts.TitleOpts(title="PictorialBar-美國各州死亡人數(shù)Top10"),
            xaxis_opts=opts.AxisOpts(is_show=False),
            yaxis_opts=opts.AxisOpts(
                axistick_opts=opts.AxisTickOpts(is_show=False),
                axisline_opts=opts.AxisLineOpts(
                    linestyle_opts=opts.LineStyleOpts(opacity=0)
                ),
            ),
        )
            .render("result5.html")
    )

# 6.找出美國確診最少的10個州——>詞云圖
def drawChart_6(index):
    root = "D:\PyProject\Pyspark\keshe\\result" + str(index) + "/part-00000.json"
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:  # 到 EOF,返回空字符串,則終止循環(huán)
                break
            js = json.loads(line)
            row = (str(js['state']), int(js['totalCases']))
            data.append(row)

    c = (
        WordCloud()
            .add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND)
            .set_global_opts(title_opts=opts.TitleOpts(title="美國各州確診最少的10個州"))
            .render("result6.html")
    )

# 7.找出美國死亡最少的10個州——>漏斗圖
def drawChart_7(index):
    root = "D:\PyProject\Pyspark\keshe\\result" + str(index) + "/part-00000.json"
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:  # 到 EOF,返回空字符串,則終止循環(huán)
                break
            js = json.loads(line)
            data.insert(0, [str(js['state']), int(js['totalDeaths'])])
    c = (
        Funnel()
            .add(
            "State",
            data,
            sort_="ascending",
            label_opts=opts.LabelOpts(position="inside"),
        )
            .set_global_opts(title_opts=opts.TitleOpts(title=""))
            .render("result7.html")
    )

# 8.美國的病死率--->餅狀圖
def drawChart_8(index):
    root = "D:\PyProject\Pyspark\keshe\\result" + str(index) + "/part-00000.json"
    values = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:  # 到 EOF,返回空字符串,則終止循環(huán)
                break
            js = json.loads(line)
            if str(js['state']) == "USA":
                values.append(["Death(%)", round(float(js['deathRate']) * 100, 2)])
                values.append(["No-Death(%)", 100 - round(float(js['deathRate']) * 100, 2)])
    c = (
         Pie()
        .add("", values)
        .set_colors(["blcak", "orange"])
        .set_global_opts(title_opts=opts.TitleOpts(title="全美的病死率"))
        .set_series_opts(label_opts=opts.LabelOpts(formatter=": {c}"))
        .render("result8.html")
        )
# 可視化主程序:
index = 1
while index < 9:
    funcStr = "drawChart_" + str(index)
    eval(funcStr)(index)
    index += 1

?plot01.py

"""
  本py文件:
  繪制美國分縣疫情地圖
  繪制美國新增病例增長曲線
  繪制美國疫情死亡病例增長曲線
  使用三次指數(shù)平滑法進行預(yù)測
"""

import matplotlib.pyplot as plt
import numpy as np
import plotly.graph_objs as go
import pandas as pd
import plotly.express as px
import datetime
from statsmodels.tsa.api import ExponentialSmoothing, SimpleExpSmoothing, Holt
from plotly.offline import init_notebook_mode, iplot
from urllib.request import urlopen
import json
import plotly.io as pio
import plotly.offline as py
pio.renderers.default = "notebook_connected"
init_notebook_mode(connected=True)

ds=pd.read_csv("C:\\Users\\26909\\Desktop\\us-counties-latest.csv")
# 繪制美國疫情分縣地圖
with urlopen('https://raw.githubusercontent.com/plotly/datasets/master/geojson-counties-fips.json') as response:
    counties = json.load(response)

fig = px.choropleth_mapbox(ds, geojson=counties, locations='fips', color='cases',
                           color_continuous_scale="Turbo",
                           range_color=(0, 20000),
                           mapbox_style="carto-positron",
                           hover_name ="county",
                           zoom=3, center = {"lat": 37.0902, "lon": -95.7129},
                           opacity=0.5
                          )
fig.update_layout(margin={"r":0,"t":0,"l":0,"b":0})
fig.show()
py.plot(fig, filename='1.html')

# 繪制美國新增病例數(shù)增長曲線
ds['date']= pd.to_datetime(ds['date'])
ds.Timestamp=ds["date"]
ds.index = ds.Timestamp
df = ds.resample('D').sum()
fig = go.Figure()
fig.add_trace(go.Scatter(
    x=df.index,
    y=df.cases,
    name="Cases in USA"
))

fig.update_layout(
    font=dict(
        family="Courier New, monospace",
        size=18,
    )
)
fig.show()
py.plot(fig, filename='2.html')

# 繪制美國疫情死亡病例增長曲線
fig = go.Figure()
fig.add_trace(go.Scatter(
    x=df.index,
    y=df.deaths,
    name="Cases in USA"
))
fig.update_layout(
    font=dict(
        family="Courier New, monospace",
        size=18,
    )
)
fig.show()
py.plot(fig, filename='3.html')

# 三次指數(shù)平滑法預(yù)測模型
start = datetime.datetime.strptime("2021-02-14", "%Y-%m-%d")
end = datetime.datetime.strptime("2021-07-13", "%Y-%m-%d")
date_generated = [start + datetime.timedelta(days=x) for x in range(0, (end-start).days)]
dt=[]
for date in date_generated:
    dt.append(date.strftime("%Y-%m-%d"))
dtd=pd.DataFrame()
dtd["Date"]=dt
df = pd.read_csv("C:\\Users\\26909\\Desktop\\us-counties-latest.csv")

fig, axes = plt.subplots(2, 1, figsize = (10, 10))
data=pd.DataFrame(df.groupby("date").agg({"cases": "sum","deaths": "sum"}))
ddf=pd.DataFrame(df.groupby("date").agg({"cases": "sum","deaths": "sum"}))

data.insert(2,'increasecases','0')
data.insert(3,'increasedeaths','0')
data=np.asarray(data)

for i in range(1,844):
    data[i][2]=data[i][0]-data[i-1][0]
    data[i][3] = data[i][1] - data[i - 1][1]
data[0][2]=1
data[0][3]=0

fit1 = ExponentialSmoothing(data[:640,2] ,seasonal_periods=130 ,trend='add', seasonal='add',).fit()
pred = fit1.forecast(149)

pred=pred.astype(int)
dtd["Holt"]=pred
dtd.Timestamp=dtd["Date"]
dtd.index = dtd.Timestamp

fig = go.Figure()
fig.add_trace(go.Scatter(
    x=ddf.index,
    y=data[:,2],
    name="Confirmed Cases"
))

fig.add_trace(go.Scatter(
    x=dtd.index,
    y=dtd.Holt,
    name="Future Prediction"
))
fig.update_layout(
    font=dict(
        family="Courier New, monospace",
        size=18,
    )
)
fig.show()
py.plot(fig, filename='4.html')

?

plot02.py

"""
 本py文件:
 繪制美國疫情分州地圖
 繪制美國確診病例各月份占比餅狀圖
"""
import pandas as pd
from plotly.offline import init_notebook_mode, iplot
init_notebook_mode(connected=True)
import plotly.graph_objs as go
import plotly.offline as py
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings("ignore")
plt.style.use('ggplot')

dataset=pd.read_csv("C:\\Users\\26909\\Desktop\\us-counties-latest.csv")
data = dataset.groupby("state").sum().reset_index()

# 繪制美國疫情分州地圖
dat = [dict(
    type="choropleth",
    locations=['AL', 'AK', 'AS','AZ', 'AR', 'CA', 'CO', 'CT', 'DE', "DC", 'FL', 'GA', "GU", 'HI', 'ID', 'IL', 'IN', 'IA',
                'KS', 'KY', 'LA', 'ME', 'MD', 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ', 'NM', 'NY',
                'NC', 'ND', "NM", 'OH', 'OK', 'OR', 'PA', "PR", 'RI', 'SC', 'SD', 'TN', 'TX', 'UT', 'VT', "VI", 'VA',
                'WA', 'WV', 'WI', 'WY'],
    locationmode='USA-states',
    z=data["cases"],
    text=data["state"],
    colorscale=[[0, "#641b0c"], [0.85, "#f85466"], [0.9, "#fa7483"],
                [0.94, "#f88f9a"], [0.97, "#f7bbc1"], [1, "#f1c6cb"]],
    autocolorscale=False,
    reversescale=True,
    marker=dict(
        line=dict(
            width=0.5,
            color='rgba(100,100,100)',
        ),
    ),
    colorbar=dict(
        title="Total Cases",
    )
)]

layout = dict(
    title={
        'text': "Total Cases of States",
        'y': 0.9,
        'x': 0.5,
        'xanchor': 'center',
        'yanchor': 'top'},
    geo=dict(
        showframe=False,
        showcoastlines=True,
        projection=dict(
            type="albers usa"
        ),
        scope="usa"
    )
)
fig = go.Figure(data=dat, layout=layout)
iplot(fig)
fig.show()
py.plot(fig, filename='5.html')

# 繪制美國確診病例各月份占比餅狀圖
a = []
for x in dataset["date"]:
    a.append(x.split("-")[1])
dataset["Month"] = a
data = dataset.groupby("Month").sum().reset_index()
data.replace("01", "January", inplace=True)
data.replace("02", "February", inplace=True)
data.replace("03", "March", inplace=True)
data.replace("04", "April", inplace=True)
data.replace("05", "May", inplace=True)
data.replace("06", "June", inplace=True)
data.replace("07", "July", inplace=True)
data.replace("08", "August", inplace=True)
data.replace("09", "Semptember", inplace=True)
data.replace("10", "October", inplace=True)
data.replace("11", "November", inplace=True)
data.replace("12", "December", inplace=True)

cases = [each for each in data.cases]
deaths = [each for each in data.deaths]

pie_list = [each for each in data.cases]
labels = data.Month
fig = {
    "data": [
        {
            "values": pie_list,
            "labels": labels,
            "domain": {"x": [0, 0.5]},
            "name": "Cases per Month",
            "hoverinfo": "label+percent+name",
            "hole": .3,
            "type": "pie"}, ],

    "layout": {
        "title": "Cases per Month",
        "annotations": [
            {"font": {"size": 20},
             "showarrow": False,
             "text": "Number of Cases",
             "x": 0.20,
             "y": 1.19
             }, ]
    }
}
iplot(fig)
py.plot(fig, filename='6.html')

DBSCAN.py

import pandas as pd
import scipy

def fit_dbscan_iterable(X, minPts=3, radius=1, verbose=True, anim=True, animFrames=100, animFramesAfter=40,
                        animFramesBefore=10):
    """Find clusters using dbscan algorithm

    Parameters
    ----------
    X : list of points
        array of points to find the clusters in (accept: list, numpy or pandas)
    minPts : int (default=3)
        minimum points to be considered a core
    radius : float (default=1)
        radius to find neighbors
    verbose : bool (default=True)
        print some messages during processing
    anim : bool (default=True)
        return intermediary frames to make a animation

    Returns
    -------
    generator object -> (y, cores, cursor)
        returns a generator object (iterable) that will
        return a tuple of (y, cores and cursor) for
        each frame

    """

    radius = float(radius)

    if isinstance(X, pd.DataFrame):
        X = X.to_numpy()

    def locate_neigbors(x1):
        nonlocal X, radius
        result = []

        for x2i, x2 in enumerate(X):
            d = scipy.spatial.distance.euclidean(x1, x2)
            if d < radius:
                result.append([x2i, x2])

        return result;

    animEach = len(X) / animFrames
    animFrame = 0;

    # dados gerais
    cluster_id = 0
    y = [0 for _ in X]
    cores = [0 for _ in X]

    if anim:
        for i in range(animFramesBefore):
            yield (y, cores, None)

    for x1i, x1 in enumerate(X):

        if y[x1i] == 0:

            N = locate_neigbors(x1)

            if len(N) >= minPts:

                cluster_id = cluster_id + 1

                visited = []
                while len(N) > 0:
                    (x2i, x2) = N.pop()

                    if x2i in visited: continue
                    visited.append(x2i)

                    animFrame = animFrame + 1
                    if (animFrame % animEach) == 0:
                        if anim: yield (y, cores, x2)


                    if cores[x2i] == 0:


                        y[x2i] = cluster_id

                        N2 = locate_neigbors(x2)
                        if (len(N2) >= minPts):
                            cores[x2i] = 1
                            for x3i, x3 in N2:

                                if y[x3i] != cluster_id and cores[x3i] == 0:
                                    N.insert(0, [x3i, x3])

    if anim:
        for i in range(animFramesAfter):
            yield (y, cores, None)
            # finalizado, retorna última frame
    yield (y, cores, None)


def fit_dbscan(X, minPts=3, radius=1, verbose=True, anim=False):
    """Find clusters using dbscan algorithm

    Parameters
    ----------
    X : list of points
        array of points to find the clusters in (accept: list, numpy or pandas)
    minPts : int (default=3)
        minimum points to be considered a core
    radius : float (default=1)
        radius to find neighbors
    verbose : bool (default=True)
        print some messages during processing
    anim : bool (default=True)
        return intermediary frames to make a animation

    Returns
    -------
    if anim=False:
        tuple (y, cores, cursor):
            y: list of scalars
                the clusterids which each point belongs to
            cores: list of scalars
                1 if the corresponding X is a core
            cursor: tensor (point)
                None in the last frame
    if anim=True:
        list of tuples (y, cores, cursor)

    """
    if anim:
        return list(fit_dbscan_iterable(X, minPts, radius, verbose, anim=True))
    else:
        return next(fit_dbscan_iterable(X, minPts, radius, verbose, anim=False))

def bloco1():
    global df

    from datetime import datetime
    import pandas as pd
    import os
    pd.options.display.max_columns = None

    df = pd.read_csv("C:\\Users\\26909\\Desktop\\us-counties-latest.csv"
                     )

    df.insert(1, "wday", df['date'].apply(lambda x: int(datetime.strptime(x, '%Y-%m-%d').strftime('%w'))))
    df.insert(1, "mday", df['date'].apply(lambda x: int(datetime.strptime(x, '%Y-%m-%d').strftime('%d'))))
    df = df.dropna()
    df = df.sample(1500)
    return df

def bloco():

    import matplotlib.pyplot as plt

    X = df[['cases', 'deaths']].to_numpy()

    y, cores, cursor = fit_dbscan(X, radius=20, minPts=5, anim=False)

    fig = plt.figure(figsize=(12,5), dpi=300)

    hsv = plt.cm.get_cmap('hsv', 10)

    plt.clf()
    plt.title(f"Covid-19 casos x mortes",fontsize=7)
    plt.rcParams.update({'font.size': 6})
    plt.xticks(())
    plt.yticks(())

    ax = fig.subplots()
    ax.set_xlabel('casos (log)')
    ax.set_ylabel('mortes (log)')

    fig.patch.set_visible(False)
    ax.patch.set_visible(False)

    plt.yscale('log')
    plt.xscale('log')
    res = []

    ax.scatter(
        [X[j][0] for j in range(len(X)) if y[j] == 0],
        [X[j][1] for j in range(len(X)) if y[j] == 0],
        color='#c0c0c0',
        label='Outlier',
        marker='o',
        s=1
    )

    for i in range(1, 10):
        ax.scatter(
            [X[j][0] for j in range(len(X)) if y[j] == i and cores[j] == 0],
            [X[j][1] for j in range(len(X)) if y[j] == i and cores[j] == 0],
            color=hsv(i),
            marker='o',
            s=1
        )
        ax.scatter(
            [X[j][0] for j in range(len(X)) if y[j] == i and cores[j] == 1],
            [X[j][1] for j in range(len(X)) if y[j] == i and cores[j] == 1],
            color=hsv(i),
            label=f'Cluster {i}',
            marker='o',
            s=1
        )

    if not (cursor is None):
        ax.scatter(
            [cursor[0]],
            [cursor[1]],
            color='#000000',
            label='Cursor',
            marker='o',
            s=1
        )

    ax.legend()
    plt.show()

bloco1()
bloco()

?

?

?

?

到了這里,關(guān)于【大數(shù)據(jù)平臺】基于Spark的美國新冠肺炎疫情數(shù)據(jù)分析及預(yù)測的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包