I. Preface
1. Hadoop cluster setup (covered before, but no ready-made cluster available)
2. Python (machine learning)
3. Spark (not used yet)
4. Flume (not used yet)
5. Sqoop (never touched)
6. Programming stack: Spring Boot (already familiar) + ECharts (data visualization framework)
II. Big Data Overview
1.1 Baidu Baike: big data refers to data sets so large that they cannot be processed in time with conventional means; new technologies are needed to handle them.
1.2 Big data covers two things:
        a. storing massive amounts of data
        b. analyzing massive amounts of data (computation and processing)
1.3 The goal of big data is to reduce uncertainty and give decisions a more accurate basis.
2.1 The big data development workflow in an enterprise
2.2 Big data development toolkit
1. Link: https://pan.baidu.com/s/1eg8poPxAN-sLBd53S9hJ1g
2. Extraction code: k54t
This project uses a number of development tools and installation packages:
a. Operating system: Linux ---- CentOS 7, the distribution closest to what enterprises run
b. Remote connection tool ----- FinalShell
c. JDK 1.8
d. Data collection tools: Flume 1.7, Sqoop
e. Hadoop cluster framework ---- Hadoop 2.7.3
f. Kafka 2.11
g. VMware virtual machine
h. Spark 2.1.1
i. Flink 1.10.2
j. IDEA as the development tool; Maven for JAR/dependency management and project creation
2.3 Setting up the big data environment
2.3.1 Installing and configuring a virtual machine
1. A virtual machine (VM) is a complete computer system with full hardware functionality, simulated by software and running in a completely isolated environment.
2. Anything that can be done on a physical machine can also be done inside a virtual machine.
3. When a virtual machine is created, part of the physical machine's disk and memory is allocated to it. Each VM has its own CMOS, disk and operating system, and can be operated just like a physical machine.
4. Summary: a virtual machine has its own memory, disk and CPU and behaves as a complete computer system. A major advantage is that if the VM gets corrupted or misbehaves, it can simply be restored or rebuilt, which frees its resources; reconfiguring a VM is quick and convenient.
2.3.2 Creating the virtual machine
2.3.3 Installing the Linux operating system
2.3.4 Remote connection
a. Port 22 must be open ----- SSH access
b. A fixed IP address is needed
Configuring a static IP address:
In Linux, every piece of software is configured through files, so configuring a static IP address means editing the network interface file:
/etc/sysconfig/network-scripts/ifcfg-ens33
vi /etc/sysconfig/network-scripts/ifcfg-ens33
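For reference, a static configuration of that file might look roughly like this (IPADDR matches the address used throughout these notes; GATEWAY and DNS1 are assumptions that depend on your VMware NAT settings):
TYPE=Ethernet
BOOTPROTO=static
NAME=ens33
DEVICE=ens33
ONBOOT=yes
IPADDR=192.168.91.100
NETMASK=255.255.255.0
GATEWAY=192.168.91.2
DNS1=114.114.114.114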
Restart the network:
service network restart
systemctl restart network.service
ip addr          ---- check the IP address
ping www.baidu.com
Check with netstat whether port 22 is listening:
netstat -tln | grep 22
If the netstat command is missing, it is because this is a minimal installation; install the tools manually:
yum install net-tools -y
2.3.5 Time synchronization
Time synchronization means aligning the local clock with a given time server.
Alibaba NTP servers:
ntp.aliyun.com
ntp1.aliyun.com
ntp2.aliyun.com
[root@hadoop1 ~]# ntpdate ntp.aliyun.com
-bash: ntpdate: command not found
Fix:
[root@hadoop1 ~]# yum install ntp -y
[root@hadoop1 ~]# ntpdate ntp.aliyun.com
5 Dec 15:24:49 ntpdate[36743]: step time server 203.107.6.88 offset -28800.771257 sec
[root@hadoop1 ~]# date
Mon Dec  5 15:25:10 CST 2022
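If the clock drifts again later, the synchronization can also be scheduled; this crontab entry is only a suggested extra, not part of the original setup:
[root@hadoop1 ~]# crontab -e
*/30 * * * * /usr/sbin/ntpdate ntp.aliyun.com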
2.3.6 Mapping IP addresses to hostnames
[root@hadoop1 ~]# vi /etc/hosts
192.168.91.100 hadoop1                       ------ hadoop1 is the hostname
Test:
[root@hadoop1 ~]# ping hadoop1
PING hadoop1 (192.168.91.100) 56(84) bytes of data.
64 bytes from hadoop1 (192.168.91.100): icmp_seq=1 ttl=64 time=0.138 ms
2.3.7 Installing the JDK and configuring environment variables
Both the Hadoop ecosystem and the Spark ecosystem need a JVM, so the JDK environment variables must be configured on Linux. a. Create a directory to hold the software packages:
[root@hadoop1 ~]# mkdir /soft
b. Upload the JDK package to the /soft directory
[root@hadoop1 ~]# tar -zxvf /soft/jdk-8u162-linux-x64.tar.gz -C /opt
tar: used to compress and extract archives
-z: the archive is gzip-compressed (.gz)
-x: extract
-v: verbose, show the progress
-f: the archive file to operate on
-C: the directory to extract into
c. Configure the JDK environment variables. Linux has two kinds of environment variable files:
system-wide: /etc/profile
current user: ~/.bash_profile
[root@hadoop1 ~]# vi /etc/profile
# JDK environment variables
export JAVA_HOME=/opt/jdk1.8.0_162
export PATH=$JAVA_HOME/bin:$PATH:$HOME/bin
d. Reload the environment variables
[root@hadoop1 ~]# source /etc/profile
e. Check that the configuration works
[root@hadoop1 ~]# java -version
java version "1.8.0_162"
Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
2.3.8 Cloning and configuring the virtual machines
Since the project runs in cluster mode, several nodes are needed; clone additional virtual machines for this.
Preparation: shut down the VM that will be cloned.
Then repeat the earlier configuration steps.
Do the following on hadoop2 and on hadoop3:
On hadoop2:
Change the hostname:
Reboot the virtual machine:
Set up the remote connection:
Synchronize the time:
[root@hadoop2 ~]# ntpdate ntp.aliyun.com
5 Dec 16:17:34 ntpdate[2136]: step time server 203.107.6.88 offset -28800.935828 sec
Repeat the same steps on hadoop3.
Host mappings between the three nodes:
[root@hadoop1 ~]# vi /etc/hosts
192.168.91.100 hadoop1
192.168.91.101 hadoop2
192.168.91.102 hadoop3
[root@hadoop2 ~]# vi /etc/hosts
192.168.91.100 hadoop1
192.168.91.101 hadoop2
192.168.91.102 hadoop3
[root@hadoop3 ~]# vi /etc/hosts
192.168.91.100 hadoop1
192.168.91.101 hadoop2
192.168.91.102 hadoop3
Test:
[root@hadoop1 ~]# ping hadoop1
PING hadoop1 (192.168.91.100) 56(84) bytes of data.
64 bytes from hadoop1 (192.168.91.100): icmp_seq=1 ttl=64 time=0.050 ms
64 bytes from hadoop1 (192.168.91.100): icmp_seq=2 ttl=64 time=0.041 ms
^Z
[1]+  Stopped                 ping hadoop1
[root@hadoop1 ~]# ping hadoop2
PING hadoop2 (192.168.91.101) 56(84) bytes of data.
64 bytes from hadoop2 (192.168.91.101): icmp_seq=1 ttl=64 time=0.704 ms
64 bytes from hadoop2 (192.168.91.101): icmp_seq=2 ttl=64 time=0.440 ms
^Z
[2]+  Stopped                 ping hadoop2
[root@hadoop1 ~]# ping hadoop3
PING hadoop3 (192.168.91.102) 56(84) bytes of data.
64 bytes from hadoop3 (192.168.91.102): icmp_seq=1 ttl=64 time=0.507 ms
64 bytes from hadoop3 (192.168.91.102): icmp_seq=2 ttl=64 time=0.474 ms
^Z
[3]+  Stopped                 ping hadoop3
Check the IP address:
ip addr
and verify that it matches the expected address.
If the VMware virtual network adapter is missing:
clean the system with a tool such as https://www.ccleaner.com/zh-cn ,
then uninstall VMware and reinstall it.
2.3.9 Setting up a fully distributed Hadoop cluster
Daemon layout across the three nodes:

          | namenode | secondarynamenode | datanode | resourcemanager | nodemanager
hadoop1   |    √     |                   |    √     |        √        |      √
hadoop2   |          |         √         |    √     |                 |      √
hadoop3   |          |                   |    √     |                 |      √
a. Upload the Hadoop package to the /soft directory
b. Extract the Hadoop package
[root@hadoop1 ~]# tar -zxvf /soft/hadoop-2.7.3.tar.gz -C /opt
c. Configure the Hadoop environment variables
[root@hadoop1 ~]# vi /etc/profile
export HADOOP_HOME=/opt/hadoop-2.7.3
export PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH:$HOME/bin
d. Reload the environment variables
[root@hadoop1 ~]# source /etc/profile
e. Test
[root@hadoop1 ~]# hadoop version
Hadoop 2.7.3
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r?baa91f7c6bc9cb92be5982de4719c1c8af91ccff
Compiled by root on 2016-08-18T01:41Z
Compiled with protoc 2.5.0
From source with checksum 2e4ce5f957ea4db193bce3734ff29ff4
This command was run using /opt/hadoop-2.7.3/share/hadoop/common/hadoop-common-2.7.3.jar
Hadoop configuration files explained:
Reference: https://hadoop.apache.org/
The main Hadoop modules:
a. Hadoop Common: shared utilities that support the other modules
b. HDFS: the Hadoop distributed file system
c. Hadoop YARN: the resource scheduling platform
d. Hadoop MapReduce: the distributed computing framework
https://hadoop.apache.org/docs/r2.7.3/
Edit the Hadoop configuration files:
[root@hadoop1 ~]# cd /opt/hadoop-2.7.3/etc/hadoop/
[root@hadoop1 hadoop]# vi hadoop-env.sh
export JAVA_HOME=/opt/jdk1.8.0_162
[root@hadoop1 hadoop]# vi core-site.xml
<configuration>
    <!-- NameNode default RPC address -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop1:9000</value>
    </property>
    <!-- base directory for the whole cluster -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/tmp/hadoop</value>
    </property>
</configuration>
[root@hadoop1 hadoop]# vi hdfs-site.xml
<configuration>
    <!-- number of HDFS block replicas -->
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <!-- SecondaryNameNode HTTP address -->
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>hadoop2:50090</value>
    </property>
</configuration>
[root@hadoop1 hadoop]# cp mapred-site.xml.template mapred-site.xml
[root@hadoop1 hadoop]# vi mapred-site.xml
<configuration>
    <!-- the whole cluster runs on the YARN framework -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>
[root@hadoop1 hadoop]# vi yarn-site.xml
<configuration>
    <!-- auxiliary shuffle service -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
</configuration>
[root@hadoop1 hadoop]# vi slaves
hadoop1
hadoop2
hadoop3
Distribute the configured files to the hadoop2 and hadoop3 nodes:
[root@hadoop1 ~]# scp /etc/profile hadoop2:/etc/profile
The authenticity of host 'hadoop2 (192.168.91.101)' can't be established.
ECDSA key fingerprint is SHA256:ETL5Iad3RarttSkJLbFPlEn/KKUBAnHyMcttoUZxhHM.
ECDSA key fingerprint is MD5:5f:31:bc:fa:0f:74:a7:55:9c:ec:59:94:bd:14:ca:5b.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'hadoop2,192.168.91.101' (ECDSA) to the list of known hosts. root@hadoop2's password:
profile
[root@hadoop1 ~]# scp /etc/profile hadoop3:/etc/profile
The authenticity of host 'hadoop3 (192.168.91.102)' can't be established.
ECDSA key fingerprint is SHA256:ETL5Iad3RarttSkJLbFPlEn/KKUBAnHyMcttoUZxhHM.
ECDSA key fingerprint is MD5:5f:31:bc:fa:0f:74:a7:55:9c:ec:59:94:bd:14:ca:5b.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'hadoop3,192.168.91.102' (ECDSA) to the list of known hosts. root@hadoop3's password: profile
[root@hadoop1 ~]# scp -r /opt/hadoop-2.7.3 hadoop2:/opt/
[root@hadoop1 ~]# scp -r /opt/hadoop-2.7.3 hadoop3:/opt/
Stop the firewall on hadoop1, hadoop2 and hadoop3:
[root@hadoop1 ~]# systemctl stop firewalld
[root@hadoop2 ~]# systemctl stop firewalld
[root@hadoop3 ~]# systemctl stop firewalld
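Note that stop only lasts until the next reboot; to keep the firewall off permanently you can additionally disable it (an optional extra step, not in the original notes):
[root@hadoop1 ~]# systemctl disable firewalld
[root@hadoop2 ~]# systemctl disable firewalld
[root@hadoop3 ~]# systemctl disable firewalld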
Format the NameNode on hadoop1:
[root@hadoop1 ~]# hadoop namenode -format
2.3.10 Setting up passwordless SSH between the three nodes
Run the following on hadoop1, hadoop2 and hadoop3 in turn:
[root@hadoop1 ~]# ssh-keygen -t rsa
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:7rTCj+LqM95Gk1nUniyf1Yy0NM1W7FAPR6slCTvDYGo root@hadoop1
The key's randomart image is:
+---[RSA 2048]----+
| . o + ++.o|
| . + * B o+.|
| . E + % = o.|
| + + + = = |
| + oSo . |
| = .o |
| . o o |
| o.o o+ . |
| o+*o..o+ |
+----[SHA256]-----+
[root@hadoop2 ~]# ssh-keygen -t rsa
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Created directory '/root/.ssh'.
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:C3azvXal3IjmRmD/FClkEmxzS17X8TMOCWRV/0OgvSM root@hadoop2
The key's randomart image is:
+---[RSA 2048]----+
| ....+.o.+.|
| = * = + +|
| . O + * +o|
| o + o = +|
| o.So E + o.|
| . o =o o o .|
| o..= = |
| =.* . |
| =o. |
+----[SHA256]-----+
[root@hadoop3 ~]# ssh-keygen -t rsa
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Created directory '/root/.ssh'.
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:iswVFWqHXhN4BaIT2mMD0EwfAc8KC+hw41lVbK5sGFg root@hadoop3
The key's randomart image is:
+---[RSA 2048]----+
|.=+.+.o+*+. |
|. oBE=.=+. |
|= +o% =++ |
|+=.*.* +.. |
|..+ +o.S |
| o.o+. |
| +.. |
| |
| |
+----[SHA256]-----+
Then on hadoop2 and hadoop3:
[root@hadoop2 ~]# cp .ssh/id_rsa.pub hadoop2_id_rsa.pub
[root@hadoop2 ~]# scp hadoop2_id_rsa.pub hadoop1:.ssh/
[root@hadoop3 ~]# cp .ssh/id_rsa.pub hadoop3_id_rsa.pub
[root@hadoop3 ~]# scp hadoop3_id_rsa.pub hadoop1:.ssh/
On hadoop1:
[root@hadoop1 ~]# cd .ssh
[root@hadoop1 .ssh]# cat hadoop2_id_rsa.pub hadoop3_id_rsa.pub id_rsa.pub >> authorized_keys
Send authorized_keys to hadoop2 and hadoop3:
[root@hadoop1 .ssh]# scp authorized_keys hadoop2:.ssh/
root@hadoop2's password:
authorized_keys
[root@hadoop1 .ssh]# scp authorized_keys hadoop3:.ssh/
root@hadoop3's password:
authorized_keys
Set the permissions on hadoop1, hadoop2 and hadoop3:
[root@hadoop1 ~]# chmod 700 .ssh
[root@hadoop1 ~]# chmod 600 .ssh/authorized_keys
[root@hadoop2 ~]# chmod 700 .ssh
[root@hadoop2 ~]# chmod 600 .ssh/authorized_keys
[root@hadoop3 ~]# chmod 700 .ssh/
[root@hadoop3 ~]# chmod 600 .ssh/authorized_keys
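As an alternative sketch (not the procedure used above), ssh-copy-id can copy a public key and set these permissions in one step:
[root@hadoop1 ~]# ssh-copy-id root@hadoop2
[root@hadoop1 ~]# ssh-copy-id root@hadoop3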
Test:
[root@hadoop1 ~]# ssh hadoop2
Last login: Tue Dec 6 17:09:08 2022 from 192.168.91.1
[root@hadoop2 ~]# exit
logout
Connection to hadoop2 closed.
[root@hadoop1 ~]# ssh hadoop3
Last login: Tue Dec 6 17:09:12 2022 from 192.168.91.1
[root@hadoop3 ~]# exit4
-bash: exit4: command not found
[root@hadoop3 ~]# exit
logout
Connection to hadoop3 closed.
2.3.11 Starting Hadoop
[root@hadoop1 ~]# start-all.sh                                      --- start all Hadoop daemons
On hadoop1:
[root@hadoop1 ~]# jps
54578 DataNode
56274 Jps
55315 ResourceManager
54314 NameNode
55471 NodeManager
hadoop2:
[root@hadoop2 ~]# jps
29076 SecondaryNameNode
29284 NodeManager
28842 DataNode
30090 Jps
hadoop3:
[root@hadoop3 ~]# jps
28786 DataNode
29154 NodeManager
30197 Jps
Access the web UIs:
http://192.168.91.100:50070                 ---- HDFS (NameNode)
http://192.168.91.100:8088                  ---- YARN
2.3.12 Installing and configuring Flume
In enterprises, data often has to be collected from many nodes, and Flume is the recommended tool for that; this section covers basic Flume configuration and simple use cases.
a. Upload the Flume package
b. Extract the Flume package
[root@hadoop1 ~]# tar -zxvf /soft/apache-flume-1.7.0-bin.tar.gz -C /opt
c. Configure the Flume environment variables
[root@hadoop1 ~]# vi /etc/profile
# Flume environment variables
export FLUME_HOME=/opt/apache-flume-1.7.0-bin
export PATH=$JAVA_HOME/bin:$FLUME_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH:$HOME/bin
d. Reload the environment variables
[root@hadoop1 ~]# source /etc/profile
e. Test
[root@hadoop1 ~]# flume-ng version
Flume 1.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
From source with checksum 0d21b3ffdc55a07e1d08875872c00523
2.4 Configuring the Windows development environment
2.4.1 Installing the JDK and configuring environment variables
JDK version: 1.8
Configure the JDK environment variables:
This PC ----> right-click -----> Properties ------> Advanced system settings -----> Environment Variables
Append the JDK bin directory to the Path variable, for example:
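For reference, the values would look roughly like this (the install path is taken from the java -version and mvn -v output shown later; adjust it to your machine):
JAVA_HOME = C:\Program Files\Java\jdk1.8.0_151
Path      = ...;%JAVA_HOME%\bin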
Test:
Press Win+R and type cmd to open a command prompt:
C:\Users\error>java -version
java version "1.8.0_151"
Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)
2.4.2 Installing Maven and configuring environment variables
Extract the Maven archive:
This PC ----> right-click -----> Properties ------> Advanced system settings -----> Environment Variables
Append the Maven bin directory to the Path variable, for example:
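For reference, roughly (the path matches the mvn -v output below; adjust it to your machine):
MAVEN_HOME = D:\apache-maven-3.0.5
Path       = ...;%MAVEN_HOME%\bin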
Test:
Press Win+R and type cmd:
C:\Users\error>mvn -v
Apache Maven 3.0.5 (r01de14724cdef164cd33c7c8c2fe155faf9602da; 2013-02-19 21:51:28+0800)
Maven home: D:\apache-maven-3.0.5\bin\..
Java version: 1.8.0_151, vendor: Oracle Corporation
Java home: C:\Program Files\Java\jdk1.8.0_151\jre
Default locale: zh_CN, platform encoding: GBK
OS name: "windows 10", version: "10.0", arch: "amd64", family: "dos"
2.4.3 Installing and configuring the IDEA development tool
2.4.4 Integrating Maven into IDEA
2.5 Setting up Spark
a. Upload the Spark package to the Linux system
b. Extract the Spark package
[root@hadoop1 ~]# tar -zxvf /soft/spark-2.1.1-bin-hadoop2.7.tgz -C /opt
c. Configure the Spark environment variables
[root@hadoop1 ~]# vi /etc/profile
# Spark environment variables
export SPARK_HOME=/opt/spark-2.1.1-bin-hadoop2.7
export PATH=$JAVA_HOME/bin:$SPARK_HOME/bin:$FLUME_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH:$HOME/bin
d. Edit the Spark configuration files
[root@hadoop1 conf]# pwd
/opt/spark-2.1.1-bin-hadoop2.7/conf
Edit the Spark startup environment file:
[root@hadoop1 conf]# cp spark-env.sh.template spark-env.sh
[root@hadoop1 conf]# vi spark-env.sh
export JAVA_HOME=/opt/jdk1.8.0_162
Edit the slaves file:
[root@hadoop1 conf]# cp slaves.template slaves
[root@hadoop1 conf]# vi slaves
hadoop1
hadoop2
hadoop3
Copy the configured Spark directory to the hadoop2 and hadoop3 nodes:
[root@hadoop1 ~]# scp -r /opt/spark-2.1.1-bin-hadoop2.7 hadoop2:/opt/
[root@hadoop1 ~]# scp -r /opt/spark-2.1.1-bin-hadoop2.7 hadoop3:/opt/
Also copy the system profile to the hadoop2 and hadoop3 nodes:
[root@hadoop1 ~]# scp /etc/profile hadoop2:/etc/profile
profile? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 100% 2183 1.7MB/s 00:00
[root@hadoop1 ~]# scp /etc/profile hadoop3:/etc/profile
profile
Run source /etc/profile on hadoop2 and hadoop3, then start Spark from hadoop1:
[root@hadoop1 ~]# cd /opt/spark-2.1.1-bin-hadoop2.7/sbin/
[root@hadoop1 sbin]# ./start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/spark-2.1.1-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop1.out
hadoop2: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-2.1.1-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop2.out
hadoop3: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-2.1.1-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop3.out
hadoop1: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-2.1.1-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop1.out
2.6 Setting up the Spark development environment on Windows
Install the Scala tools:
scala-2.11.0.msi ----- install it yourself
Configure the Scala environment variables:
This PC --- right-click ----- Properties ----- Advanced system settings ------ Environment Variables
Append the Scala bin directory to the Path variable.
Press Win+R and type cmd:
C:\Users\error>scala
Welcome to Scala version 2.11.0 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_151).
Type in expressions to have them evaluated.
Type :help for more information.
scala> 16*16
res0: Int = 256
Install the Scala plugin in IDEA:
Create a Scala project:
III. Big Data Technology in Practice
3.1 Working with Flume
3.1.1 First steps with Flume
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html       ----- the official Flume documentation
Flume is essentially an agent, i.e. a single JVM process acting as a collection proxy.
An agent is built from several components; the common ones are source, channel and sink:
source: the data-source component ------ collects the data
channel: the pipe component ----- a buffer
sink: the sink component --------- the destination
A real-life analogy: the waterworks (source) -------- the pipes (channel) --------- the tap at home (sink).
In the classic example, a web server produces log data as the source, memory (or a file) serves as the channel, and the data is written into HDFS, which acts as the sink.
Requirement:
collect the data produced over a socket and sink it into the log
source: socket
channel: memory
sink: logger (the log)
Preparation:
[root@hadoop1 ~]# yum install nc -y
A quick test:
Start a server:
[root@hadoop1 ~]# nc -l 55555
Connect a client to the server:
[root@hadoop1 ~]# nc hadoop1 55555
Flume does the collecting, so it plays the server role:
[root@hadoop1 ~]# mkdir /flume-run
Write the Flume job file:
[root@hadoop1 ~]# vi /flume-run/nc-memory-logger.conf
# a1 is the agent name; declare its source, channel and sinks
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop1
a1.sources.r1.port = 44444

# configure the sink
a1.sinks.k1.type = logger

# configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Run the Flume collection job:
[root@hadoop1 ~]# flume-ng agent -n a1 -c /opt/apache-flume-1.7.0-bin/conf/ -f /flume-run/nc-memory-logger.conf -Dflume.root.logger=INFO,console
-n: the agent name
-c: the Flume configuration directory
-f: the job file to run
-D: a Java system property (here the log level and appender)
2022-12-06 19:15:53,140 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65 ? ? ? ? ? ? ? ?hello flume }
2022-12-06 19:15:58,032 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 6E 63 ? ? ? ? ? ? ? ? ? ? ? ? hello nc }
2022-12-06 19:16:00,688 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 68 61 64 6F 6F 70             hello hadoop }
On the client side:
[root@hadoop1 ~]# nc hadoop1 44444
hello flume
OK
hello nc
OK
hello hadoop
OK
3.1.2 Offline data collection in practice
Log-type data does not have to produce results immediately, so it can be collected on a schedule, e.g. driven by a timer such as the cron entry sketched below.
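For instance, a cron entry could drop the previous day's log file into the directory Flume watches; this is only an illustrative sketch, and the source path is a placeholder:
[root@hadoop1 ~]# crontab -e
0 1 * * * mv /data/logs/web.log.1 /web_log/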
To simulate scheduled collection as done in enterprises:
collect data into HDFS on a schedule
source: watch an entire directory ---- spooldir
channel: memory ---- memory
sink: HDFS, the distributed file system
a. Create a directory to hold the files that will be collected
[root@hadoop1 ~]# kill -9 101330    ---- kill the previously running Flume agent process
[root@hadoop1 ~]# mkdir /web_log
[root@hadoop1 ~]# vi /web_log/a.log
hello,java,hello,hadoop
hello,linux,java,c
hello,linux,c,hadoop
hadoop,spark,linux
[root@hadoop1 ~]# vi /web_log/b.log
hello,c
java,linux,hadoop
spark,c,python
java,c,python
The goal is to collect the two log files above into HDFS.
[root@hadoop1 ~]# vi /flume-run/dir-memory-hdfs.conf
# a1 is the agent name; declare its source, channel and sinks
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# configure the source
a1.sources.r1.type = spooldir
# the directory to collect from
a1.sources.r1.spoolDir = /web_log/

# configure the sink
a1.sinks.k1.type = hdfs
# where the collected files go in HDFS
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d
# avoid producing lots of small files
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
# file type written to HDFS
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# use the local timestamp
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Run the job file:
[root@hadoop1 ~]# flume-ng agent -n a1 -c /opt/apache-flume-1.7.0-bin/conf/ -f /flume-run/dir-memory-hdfs.conf -Dflume.root.logger=INFO,console
Test:
[root@hadoop1 ~]# hadoop fs -cat /flume/events/2022-12-06/FlumeData.1670337504112
hello,java,hello,hadoop
hello,linux,java,c
hello,linux,c,hadoop
hadoop,spark,linux
hello,c
java,linux,hadoop
spark,c,python
java,c,python
3.1.3 Real-time data collection in practice
How is real-time data monitored?
Simulation:
Create the file that the real-time data will be written to:
[root@hadoop1 ~]# vi /tmp/gp.log      ---- an empty file
Data is appended to gp.log continuously, and the new lines must be collected as they arrive.
Write the Flume job file:
[root@hadoop1 ~]# vi /flume-run/file-memory-logger.conf
# a1 is the agent name; declare its source, channel and sinks
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# configure the source
a1.sources.r1.type = exec
# keep watching the last lines of the file
a1.sources.r1.command = tail -F /tmp/gp.log

# configure the sink
a1.sinks.k1.type = logger

# configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Run the Flume collection job:
[root@hadoop1 ~]# flume-ng agent -n a1 -c /opt/apache-flume-1.7.0-bin/conf/ -f /flume-run/file-memory-logger.conf -Dflume.root.logger=INFO,console
Test:
[root@hadoop1 ~]# echo hello java >> /tmp/gp.log?
[root@hadoop1 ~]# echo hello hadoop >> /tmp/gp.log
The resulting output:
ava:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 6A 61 76 61 ? ? ? ? ? ? ? ? ? hello java }
2022-12-06 23:35:46,393 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 68 61 64 6F 6F 70 ? ? ? ? ? ? hello hadoop }
3.2 The first Maven project
3.3 Review
a. Fully distributed Hadoop setup
hadoop namenode -format      ----- do not format the NameNode more than once
b. Flume installation and configuration
extract the Flume package, configure the Flume environment variables, write Flume job files
c. Typical uses of Flume in enterprises
offline data collection and real-time data collection
d. Windows development environment
   a. JDK environment variables
   b. Maven environment variables
   c. IDEA installation and activation
   d. integrating Maven into IDEA
   e. creating a Maven project
3.4 Operating HDFS from Java code
Hadoop itself is written in Java, so Hadoop and Java integrate seamlessly; Java ships a full API for operating HDFS.
HDFS also provides many file-system commands:
Create a directory:
[root@hadoop1 ~]# hadoop fs -mkdir /linuxMkdir
Delete a directory:
[root@hadoop1 ~]# hadoop fs -rmdir /linuxMkdir
Upload a file to HDFS:
[root@hadoop1 ~]# hadoop fs -put anaconda-ks.cfg /linuxMkdir
[root@hadoop1 ~]# hadoop fs -lsr /linuxMkdir
lsr: DEPRECATED: Please use 'ls -R' instead.
-rw-r--r-- ? 3 root supergroup ? ? ? 1257 2022-12-07 07:02 /linuxMkdir/anaconda-ks.cfg
Download a file from HDFS to Linux:
[root@hadoop1 ~]# hadoop fs -get /linuxMkdir/anaconda-ks.cfg /tmp
Now write Java code for the same operations.
Preparation:
add the Hadoop dependencies
Maven:
   a. builds the project
   b. manages the JAR dependencies
   c. packages and runs the project

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.7.3</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.7.3</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>2.7.3</version>
</dependency>
If a permission error occurs when writing to HDFS from the Windows client, a quick workaround is to relax the HDFS permissions: [root@hadoop1 ~]# hadoop fs -chmod -R 777 /
package org.tjcj.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

/**
 * Utility class for operating HDFS
 */
public class HDFSUtil {
    // the file system handle
    private static FileSystem fileSystem;

    static { // the static block instantiates fileSystem once
        // pick up the current Hadoop configuration
        Configuration conf = new Configuration();
        // point the client at the HDFS NameNode
        conf.set("fs.defaultFS", "hdfs://192.168.91.100:9000");
        if (fileSystem == null) {
            try {
                fileSystem = FileSystem.get(conf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Create a directory
     * @param path
     */
    public static void createDir(String path) throws IOException {
        boolean flag = false;
        if (!fileSystem.exists(new Path(path))) { // only if it does not exist yet
            flag = fileSystem.mkdirs(new Path(path));
        }
        if (flag) {
            System.out.println("Directory created successfully.");
        } else {
            System.out.println("Directory already exists.");
        }
    }

    /**
     * Delete a file or directory
     * @param path
     */
    public static void delete(String path) throws IOException {
        boolean flag = false;
        if (fileSystem.exists(new Path(path))) {
            flag = fileSystem.delete(new Path(path), true);
        }
        if (flag) {
            System.out.println("File or directory deleted.");
        } else {
            System.out.println("File or directory does not exist.");
        }
    }

    /**
     * Upload a local file to HDFS
     * @param srcPath
     * @param destPath
     * @throws IOException
     */
    public static void uploadFile(String srcPath, String destPath) throws IOException {
        fileSystem.copyFromLocalFile(false, true, new Path(srcPath), new Path(destPath));
        System.out.println("File uploaded successfully!");
    }

    public static void downloadFile(String srcPath, String destPath) throws IOException {
        fileSystem.copyToLocalFile(false, new Path(srcPath), new Path(destPath), true);
        System.out.println("File downloaded successfully.");
    }

    public static void main(String[] args) throws IOException {
//        createDir("/javaMkdir");
//        delete("/javaMkdir");
//        uploadFile("D:\\data\\input\\lol.txt", "/javaMkdir");
        downloadFile("/javaMkdir/lol.txt", "D:\\");
    }
}
3.5 MapReduce ---- WordCount
A simple programming model:
map: exploits parallelism and processes the input in parallel
reduce: aggregation, summarizing what the map side produced
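As a concrete illustration (using the sample data from the Flume exercise above), a single input line flows through the model like this:
input line : hello,java,hello
map output : (hello,1) (java,1) (hello,1)
shuffle    : (hello,[1,1]) (java,[1])
reduce     : (hello,2) (java,1)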
Add the dependencies:
<dependency>
? <groupId>org.apache.hadoop</groupId>
? <artifactId>hadoop-mapreduce-client-common</artifactId>
? <version>2.7.3</version>
</dependency>
<dependency>
? <groupId>org.apache.hadoop</groupId>
? <artifactId>hadoop-mapreduce-client-core</artifactId>
? <version>2.7.3</version>
</dependency>
Code:
package org.tjcj.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
?* 單詞計數(shù)
?*/
public class WordCount {
? ? public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
? ? ? ? @Override
? ? ? ? protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
? ? ? ? ? ? //1、拿到一行數(shù)據(jù)
? ? ? ? ? ? String line = value.toString();
? ? ? ? ? ? //2、根據(jù),進行切割
? ? ? ? ? ? String [] splits = line.split(",");
? ? ? ? ? ? //3、寫出到shuffle階段
? ? ? ? ? ? for (String split : splits) {
? ? ? ? ? ? ? ?context.write(new Text(split),new LongWritable(1)); //[hadoop,1]
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
? ? ? ? @Override
? ? ? ? protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
? ? ? ? ? ?long count=0;
? ? ? ? ? ? for (LongWritable value : values) {
? ? ? ? ? ? ? ? count+=value.get();
? ? ? ? ? ? }
? ? ? ? ? ? context.write(key,new LongWritable(count));
? ? ? ? }
? ? }
? ? public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
? ? ? ? //獲取當前hadoop開發(fā)環(huán)境
? ? ? ? Configuration configuration = new Configuration();
? ? ? ? configuration.set("fs.defaultFS","hdfs://192.168.91.100:9000");
? ? ? ? //創(chuàng)建MapReduce任務(wù)
? ? ? ? Job job = Job.getInstance(configuration);
? ? ? ? //設(shè)置執(zhí)行的主類
? ? ? ? job.setJarByClass(WordCount.class);
? ? ? ? //設(shè)置Mapper
? ? ? ? job.setMapperClass(MyMapper.class);
? ? ? ? job.setMapOutputKeyClass(Text.class);
? ? ? ? job.setMapOutputValueClass(LongWritable.class);
? ? ? ? //設(shè)置Reducer
? ? ? ? job.setReducerClass(MyReducer.class);
? ? ? ? job.setOutputKeyClass(Text.class);
? ? ? ? job.setOutputValueClass(LongWritable.class);
? ? ? ? //設(shè)置輸入路徑
? ? ? ? FileInputFormat.addInputPath(job,new Path("/flume/events/2022-12-06/FlumeData.1670337504112"));
? ? ? ? //設(shè)置輸出路徑
? ? ? ? FileOutputFormat.setOutputPath(job,new Path("/mr/out/out1"));
? ? ? ? //提交任務(wù)
? ? ? ? System.out.println(job.waitForCompletion(true)?"success!!!":"failed!!!");
? ? }
}
[root@hadoop1 ~]# hadoop fs -cat /mr/out/out1/part-r-00000
c ? ? ? 5
hadoop ?4
hello ? 5
java ? ?4
linux ? 4
python ?2
spark ? 2
WordCount counts how often each word occurs, relying on what the shuffle phase does:
a. values with the same key are merged
b. keys are sorted in their natural order
c. each key is assigned to a reducer by hashing the key modulo the number of reducers (see the sketch below)
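The default rule behind step c is Hadoop's HashPartitioner; reduced to a single line it is essentially:
// simplified form of org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.getPartition()
int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;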
3.6 The TopN problem: rankings such as best-seller lists are classic TopN problems and can be handled with MapReduce.
Continuing from the morning's example: count how many times each subject was taken and report the top-ranked subjects (the top three).
package org.tjcj.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.TreeMap;
/**
?* 統(tǒng)計學(xué)科排名
?*/
public class TopN {
? ? public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
? ? ? ? private TreeMap<Long,String> treeMap = new TreeMap<>();
? ? ? ? @Override
? ? ? ? protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
? ? ? ? ? ? //1、獲取一行數(shù)據(jù)
? ? ? ? ? ? String line = value.toString();//c ? ? ? 5
? ? ? ? ? ? //2、切分字符串
? ? ? ? ? ? String [] splits = line.split("\t");
? ? ? ? ? ? treeMap.put(Long.parseLong(splits[1]),splits[0]);
? ? ? ? }
? ? ? ? /**
? ? ? ? ?* 整個MapReduce中只調(diào)用一次
? ? ? ? ?* @param context
? ? ? ? ?* @throws IOException
? ? ? ? ?* @throws InterruptedException
? ? ? ? ?*/
? ? ? ? @Override
? ? ? ? protected void cleanup(Context context) throws IOException, InterruptedException {
? ? ? ? ? ?while(treeMap.size()>3){
? ? ? ? ? ? ? ?treeMap.remove(treeMap.firstKey());
? ? ? ? ? ?}
? ? ? ? ? ? for (Long aLong : treeMap.keySet()) {
? ? ? ? ? ? ? ? context.write(new Text(treeMap.get(aLong)),new LongWritable(aLong));
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
? ? ? ? @Override
? ? ? ? protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
? ? ? ? ? ? for (LongWritable value : values) {
? ? ? ? ? ? ? ? context.write(key,value);
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
? ? ? ? //獲取當前hadoop開發(fā)環(huán)境
? ? ? ? Configuration configuration = new Configuration();
? ? ? ? configuration.set("fs.defaultFS","hdfs://192.168.91.100:9000");
? ? ? ? //創(chuàng)建MapReduce任務(wù)
? ? ? ? Job job = Job.getInstance(configuration);
? ? ? ? //設(shè)置執(zhí)行的主類
? ? ? ? job.setJarByClass(TopN.class);
? ? ? ? //設(shè)置Mapper
? ? ? ? job.setMapperClass(MyMapper.class);
? ? ? ? job.setMapOutputKeyClass(Text.class);
? ? ? ? job.setMapOutputValueClass(LongWritable.class);
? ? ? ? //設(shè)置Reducer
? ? ? ? job.setReducerClass(MyReducer.class);
? ? ? ? job.setOutputKeyClass(Text.class);
? ? ? ? job.setOutputValueClass(LongWritable.class);
? ? ? ? //設(shè)置輸入路徑
? ? ? ? FileInputFormat.addInputPath(job,new Path("/mr/out/out1/part-r-00000"));
? ? ? ? //設(shè)置輸出路徑
? ? ? ? FileOutputFormat.setOutputPath(job,new Path("/mr/out/out2"));
? ? ? ? //提交任務(wù)
? ? ? ? System.out.println(job.waitForCompletion(true)?"success!!!":"failed!!!");
? ? }
}
[root@hadoop1 ~]# hadoop fs -cat /mr/out/out2/part-r-00000
hello ?5
linux ? 4
spark ? 2
This result is flawed: the mapper's TreeMap is keyed by the count, so subjects that share the same count overwrite one another and get lost. The version below avoids the collision by making the key unique:
package org.tjcj.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.TreeMap;
/**
?* 統(tǒng)計學(xué)科排名
?*/
public class TopN {
? ? public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
? ? ? ? private TreeMap<String,String> treeMap = new TreeMap<>();
? ? ? ? @Override
? ? ? ? protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
? ? ? ? ? ? //1、獲取一行數(shù)據(jù)
? ? ? ? ? ? String line = value.toString();//c ? ? ? 5
? ? ? ? ? ? //2、切分字符串
? ? ? ? ? ? String [] splits = line.split("\t");
? ? ? ? ? ? //生成新的key
? ? ? ? ? ? String newKey=splits[1]+"-"+Math.random();
? ? ? ? ? ? System.out.println(newKey);
? ? ? ? ? ? treeMap.put(newKey,splits[0]);
? ? ? ? }
? ? ? ? /**
? ? ? ? ?* 整個MapReduce中只調(diào)用一次
? ? ? ? ?* @param context
? ? ? ? ?* @throws IOException
? ? ? ? ?* @throws InterruptedException
? ? ? ? ?*/
? ? ? ? @Override
? ? ? ? protected void cleanup(Context context) throws IOException, InterruptedException {
? ? ? ? ? ?while(treeMap.size()>3){
? ? ? ? ? ? ? ?treeMap.remove(treeMap.firstKey());
? ? ? ? ? ?}
? ? ? ? ? ? for (String s : treeMap.keySet()) {
? ? ? ? ? ? ? ? String [] str = s.split("-");
? ? ? ? ? ? ? ? context.write(new Text(treeMap.get(s)),new LongWritable(Long.parseLong(str[0])));
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
? ? ? ? @Override
? ? ? ? protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
? ? ? ? ? ? for (LongWritable value : values) {
? ? ? ? ? ? ? ? context.write(key,value);
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
? ? ? ? //獲取當前hadoop開發(fā)環(huán)境
? ? ? ? Configuration configuration = new Configuration();
? ? ? ? configuration.set("fs.defaultFS","hdfs://192.168.91.100:9000");
? ? ? ? //創(chuàng)建MapReduce任務(wù)
? ? ? ? Job job = Job.getInstance(configuration);
? ? ? ? //設(shè)置執(zhí)行的主類
? ? ? ? job.setJarByClass(TopN.class);
? ? ? ? //設(shè)置Mapper
? ? ? ? job.setMapperClass(MyMapper.class);
? ? ? ? job.setMapOutputKeyClass(Text.class);
? ? ? ? job.setMapOutputValueClass(LongWritable.class);
? ? ? ? //設(shè)置Reducer
? ? ? ? job.setReducerClass(MyReducer.class);
? ? ? ? job.setOutputKeyClass(Text.class);
? ? ? ? job.setOutputValueClass(LongWritable.class);
? ? ? ? //設(shè)置輸入路徑
? ? ? ? FileInputFormat.addInputPath(job,new Path("/mr/out/out1/part-r-00000"));
? ? ? ? //設(shè)置輸出路徑
? ? ? ? FileOutputFormat.setOutputPath(job,new Path("/mr/out/out3"));
? ? ? ? //提交任務(wù)
? ? ? ? System.out.println(job.waitForCompletion(true)?"success!!!":"failed!!!");
? ? }
}
Run result:
[root@hadoop1 ~]# hadoop fs -cat /mr/out/out3/part-r-00000
c ? ? ? 5
hadoop ?4
hello ? 5
3.7 Data cleaning. The first step of data cleaning: deduplication.
[root@hadoop1 web_log]# cat a.log.COMPLETED b.log.COMPLETED >>c.log
[root@hadoop1 web_log]# cat a.log.COMPLETED b.log.COMPLETED >>c.log
[root@hadoop1 web_log]# cat a.log.COMPLETED b.log.COMPLETED >>c.log
[root@hadoop1 web_log]# cat a.log.COMPLETED b.log.COMPLETED >>c.log
The raw file:
[root@hadoop1 web_log]# cat c.log?
hello,java,hello,hadoop
hello,linux,java,c
hello,linux,c,hadoop
hadoop,spark,linux
hello,c
java,linux,hadoop
spark,c,python
java,c,python
hello,java,hello,hadoop
hello,linux,java,c
hello,linux,c,hadoop
hadoop,spark,linux
hello,c
java,linux,hadoop
spark,c,python
java,c,python
hello,java,hello,hadoop
hello,linux,java,c
hello,linux,c,hadoop
hadoop,spark,linux
hello,c
java,linux,hadoop
spark,c,python
java,c,python
hello,java,hello,hadoop
hello,linux,java,c
hello,linux,c,hadoop
hadoop,spark,linux
hello,c
java,linux,hadoop
spark,c,python
java,c,python
hello,java,hello,hadoop
hello,linux,java,c
hello,linux,c,hadoop
hadoop,spark,linux
hello,c
java,linux,hadoop
spark,c,python
java,c,python
The file above contains many duplicate lines; the task is to deduplicate them:
[root@hadoop1 ~]# hadoop fs -put /web_log/c.log /linuxMkdir/
[root@hadoop1 ~]# hadoop fs -chmod -R 777 /linuxMkdir/
package org.tjcj.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
?* 數(shù)據(jù)去重
?*/
public class DupMR {
? ? public static class MyMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
? ? ? ? @Override
? ? ? ? protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
? ? ? ? ? ? context.write(value,NullWritable.get());
? ? ? ? }
? ? }
? ? public static class MyReducer extends Reducer<Text,NullWritable,Text,NullWritable>{
? ? ? ? @Override
? ? ? ? protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
? ? ? ? ? ? context.write(key,NullWritable.get());
? ? ? ? }
? ? }
? ? public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
? ? ? ? //獲取當前hadoop開發(fā)環(huán)境
? ? ? ? Configuration configuration = new Configuration();
? ? ? ? configuration.set("fs.defaultFS","hdfs://192.168.91.100:9000");
? ? ? ? //創(chuàng)建MapReduce任務(wù)
? ? ? ? Job job = Job.getInstance(configuration);
? ? ? ? //設(shè)置執(zhí)行的主類
? ? ? ? job.setJarByClass(DupMR.class);
? ? ? ? //設(shè)置Mapper
? ? ? ? job.setMapperClass(MyMapper.class);
? ? ? ? job.setMapOutputKeyClass(Text.class);
? ? ? ? job.setMapOutputValueClass(NullWritable.class);
? ? ? ? job.setReducerClass(MyReducer.class);
? ? ? ? job.setOutputKeyClass(Text.class);
? ? ? ? job.setOutputValueClass(NullWritable.class);
? ? ? ? //設(shè)置輸入路徑
? ? ? ? FileInputFormat.addInputPath(job,new Path("/linuxMkdir/c.log"));
? ? ? ? //設(shè)置輸出路徑
? ? ? ? FileOutputFormat.setOutputPath(job,new Path("/mr/out/out5"));
? ? ? ? //提交任務(wù)
? ? ? ? System.out.println(job.waitForCompletion(true)?"success!!!":"failed!!!");
? ? }
}
[root@hadoop1 ~]# hadoop fs -cat /mr/out/out5/part-r-00000
hadoop,spark,linux
hello,c
hello,java,hello,hadoop
hello,linux,c,hadoop
hello,linux,java,c
java,c,python
java,linux,hadoop
spark,c,python
Homework:
find the five most frequently banned champions in 2020
Approach:
   a. count how many times each champion was banned
   b. take the top five
3.8 Processing the League of Legends data set
Code:
Step one: count how often each champion was banned
package org.tjcj.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
?* 統(tǒng)計英雄ban選次數(shù)
?*/
public class BanMR1 {
? ? public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
? ? ? ? @Override
? ? ? ? protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
? ? ? ? ? ? //1、拿到一行數(shù)據(jù)
? ? ? ? ? ? String line = value.toString();
? ? ? ? ? ? if(!line.contains("agt")){//去除第一行數(shù)據(jù)
? ? ? ? ? ? ? ? //2、根據(jù)\t,進行切割
? ? ? ? ? ? ? ? String [] splits = line.split("\t");
? ? ? ? ? ? ? ? //3、寫出到shuffle階段
? ? ? ? ? ? ? ? for (int i=10;i<15;i++) {//找到ban選英雄
? ? ? ? ? ? ? ? ? ? context.write(new Text(splits[i]),new LongWritable(1)); //[hadoop,1]
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
? ? ? ? @Override
? ? ? ? protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
? ? ? ? ? ?long count=0;
? ? ? ? ? ? for (LongWritable value : values) {
? ? ? ? ? ? ? ? count+=value.get();
? ? ? ? ? ? }
? ? ? ? ? ? context.write(key,new LongWritable(count));
? ? ? ? }
? ? }
? ? public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
? ? ? ? //獲取當前hadoop開發(fā)環(huán)境
? ? ? ? Configuration configuration = new Configuration();
? ? ? ? configuration.set("fs.defaultFS","hdfs://192.168.91.100:9000");
? ? ? ? //創(chuàng)建MapReduce任務(wù)
? ? ? ? Job job = Job.getInstance(configuration);
? ? ? ? //設(shè)置執(zhí)行的主類
? ? ? ? job.setJarByClass(BanMR1.class);
? ? ? ? //設(shè)置Mapper
? ? ? ? job.setMapperClass(MyMapper.class);
? ? ? ? job.setMapOutputKeyClass(Text.class);
? ? ? ? job.setMapOutputValueClass(LongWritable.class);
? ? ? ? //設(shè)置Reducer
? ? ? ? job.setReducerClass(MyReducer.class);
? ? ? ? job.setOutputKeyClass(Text.class);
? ? ? ? job.setOutputValueClass(LongWritable.class);
? ? ? ? //設(shè)置輸入路徑
? ? ? ? FileInputFormat.addInputPath(job,new Path("/javaMkdir/lol.txt"));
? ? ? ? //設(shè)置輸出路徑,輸出路徑一定要確保不存在
? ? ? ? FileOutputFormat.setOutputPath(job,new Path("/mr/out/out6"));
? ? ? ? //提交任務(wù)
? ? ? ? System.out.println(job.waitForCompletion(true)?"success!!!":"failed!!!");
? ? }
}
[root@hadoop1 ~]# hadoop fs -cat /mr/out/out6/part-r-00000
Aatrox ?93
Akali ? 123
Alistar 7
Aphelios ? ? ? ?545
Ashe ? ?71
Aurelion Sol ? ?10
Azir ? ?268
Bard ? ?178
Blitzcrank ? ? ?42
Braum ? 92
Caitlyn 81
...
Step two: select the five most frequently banned champions
package org.tjcj.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.TreeMap;
/**
?* 統(tǒng)計英雄ban選的排名
?*/
public class BanMR2 {
? ? public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
? ? ? ? private TreeMap<Long,String> treeMap = new TreeMap<>();
? ? ? ? @Override
? ? ? ? protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
? ? ? ? ? ? //1、獲取一行數(shù)據(jù)
? ? ? ? ? ? String line = value.toString();//c ? ? ? 5
? ? ? ? ? ? //2、切分字符串
? ? ? ? ? ? String [] splits = line.split("\t");
? ? ? ? ? ? treeMap.put(Long.parseLong(splits[1]),splits[0]);
? ? ? ? }
? ? ? ? /**
? ? ? ? ?* 整個MapReduce中只調(diào)用一次
? ? ? ? ?* @param context
? ? ? ? ?* @throws IOException
? ? ? ? ?* @throws InterruptedException
? ? ? ? ?*/
? ? ? ? @Override
? ? ? ? protected void cleanup(Context context) throws IOException, InterruptedException {
? ? ? ? ? ?while(treeMap.size()>5){
? ? ? ? ? ? ? ?treeMap.remove(treeMap.firstKey());
? ? ? ? ? ?}
? ? ? ? ? ? for (Long aLong : treeMap.keySet()) {
? ? ? ? ? ? ? ? context.write(new Text(treeMap.get(aLong)),new LongWritable(aLong));
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
? ? ? ? @Override
? ? ? ? protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
? ? ? ? ? ? for (LongWritable value : values) {
? ? ? ? ? ? ? ? context.write(key,value);
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
? ? ? ? //獲取當前hadoop開發(fā)環(huán)境
? ? ? ? Configuration configuration = new Configuration();
? ? ? ? configuration.set("fs.defaultFS","hdfs://192.168.91.100:9000");
? ? ? ? //創(chuàng)建MapReduce任務(wù)
? ? ? ? Job job = Job.getInstance(configuration);
? ? ? ? //設(shè)置執(zhí)行的主類
? ? ? ? job.setJarByClass(BanMR2.class);
? ? ? ? //設(shè)置Mapper
? ? ? ? job.setMapperClass(MyMapper.class);
? ? ? ? job.setMapOutputKeyClass(Text.class);
? ? ? ? job.setMapOutputValueClass(LongWritable.class);
? ? ? ? //設(shè)置Reducer
? ? ? ? job.setReducerClass(MyReducer.class);
? ? ? ? job.setOutputKeyClass(Text.class);
? ? ? ? job.setOutputValueClass(LongWritable.class);
? ? ? ? //設(shè)置輸入路徑
? ? ? ? FileInputFormat.addInputPath(job,new Path("/mr/out/out6/part-r-00000"));
? ? ? ? //設(shè)置輸出路徑
? ? ? ? FileOutputFormat.setOutputPath(job,new Path("/mr/out/out8" ));
? ? ? ? //提交任務(wù)
? ? ? ? System.out.println(job.waitForCompletion(true)?"success!!!":"failed!!!");
? ? }
}
Run result:
[root@hadoop1 ~]# hadoop fs -cat /mr/out/out8/part-r-00000
Aphelios ? ? ? ?545
Kalista 460
LeBlanc 474
Sett ? ?602
Varus ? 469
3.9 Win rate of teams that take first blood in the 2020 LPL spring split. Analysis:
conditions: spring split, LPL region
package org.tjcj.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
?* 統(tǒng)計拿到1血隊伍的勝率
?*/
public class WinFirstBlod {
? ? public static class MyMapper extends Mapper<LongWritable,Text,Text,Text>{
? ? ? ? @Override
? ? ? ? protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
? ? ? ? ? ? //1、拿到一行數(shù)據(jù)
? ? ? ? ? ? String line = value.toString();
? ? ? ? ? ? if(!line.contains("agt") && line.contains("Spring") && line.contains("LPL")){//根據(jù)要求進行數(shù)據(jù)的篩選
? ? ? ? ? ? ? ? //2、切分數(shù)據(jù)
? ? ? ? ? ? ? ? String [] splits = line.split("\t");
? ? ? ? ? ? ? ? int firstBlood = Integer.parseInt(splits[24]);
? ? ? ? ? ? ? ? if(firstBlood==1){//拿過1血隊伍
? ? ? ? ? ? ? ? ? ? context.write(new Text("FirstBlood"),new Text(splits[16]+","+firstBlood));
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? public static class MyReducer extends Reducer<Text,Text,Text,LongWritable>{
? ? ? ? @Override
? ? ? ? protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
? ? ? ? ? ? long sum=0;//統(tǒng)計拿一血隊伍總次數(shù)
? ? ? ? ? ? long win=0;//拿到1血且勝利情況
? ? ? ? ? ? for (Text value : values) {
? ? ? ? ? ? ? ? String line =value.toString();
? ? ? ? ? ? ? ? String [] str = line.split(",");
? ? ? ? ? ? ? ? int result=Integer.parseInt(str[0]);
? ? ? ? ? ? ? ? if(result==1){//隊伍獲勝了
? ? ? ? ? ? ? ? ? ? win++;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? sum++;
? ? ? ? ? ? }
? ? ? ? ? ? context.write(new Text("拿1血隊伍的勝率"),new LongWritable(win*100/sum));
? ? ? ? }
? ? }
? ? public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
? ? ? ? //獲取當前hadoop開發(fā)環(huán)境
? ? ? ? Configuration configuration = new Configuration();
? ? ? ? configuration.set("fs.defaultFS","hdfs://192.168.91.100:9000");
? ? ? ? //創(chuàng)建MapReduce任務(wù)
? ? ? ? Job job = Job.getInstance(configuration);
? ? ? ? //設(shè)置執(zhí)行的主類
? ? ? ? job.setJarByClass(WinFirstBlod.class);
? ? ? ? //設(shè)置Mapper
? ? ? ? job.setMapperClass(MyMapper.class);
? ? ? ? job.setMapOutputKeyClass(Text.class);
? ? ? ? job.setMapOutputValueClass(Text.class);
? ? ? ? //設(shè)置Reducer
? ? ? ? job.setReducerClass(MyReducer.class);
? ? ? ? job.setOutputKeyClass(Text.class);
? ? ? ? job.setOutputValueClass(LongWritable.class);
? ? ? ? //設(shè)置輸入路徑
? ? ? ? FileInputFormat.addInputPath(job,new Path("/javaMkdir/lol.txt"));
? ? ? ? //設(shè)置輸出路徑
? ? ? ? FileOutputFormat.setOutputPath(job,new Path("/mr/out/out9"));
? ? ? ? //提交任務(wù)
? ? ? ? System.out.println(job.waitForCompletion(true)?"success!!!":"failed!!!");
? ? }
}
[root@hadoop1 ~]# hadoop fs -cat /mr/out/out9/part-r-00000
拿1血隊伍的勝率 63
Add the logging dependency:
<!-- logging dependency -->
<dependency>
? <groupId>log4j</groupId>
? <artifactId>log4j</artifactId>
? <version>1.2.17</version>
</dependency>
The log4j properties file:
log4j.rootLogger=INFO,console
log4j.additivity.org.apache=true
# console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=INFO
log4j.appender.console.ImmediateFlush=true
log4j.appender.console.Target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n
3.10 MapReduce exercises
Exercises:
1. Find the five teams with the most kills over the whole 2020 season.
2. Compute the win rate of the red side (RED) over the whole 2020 season.
3. For the LCK and the LPL regions separately, find the teams that scored a pentakill.
3.11 The WordCount example in Spark
[root@hadoop1 ~]# spark-shell        ----- open the Scala shell
scala> sc.textFile("/web_log/c.log")            ----- read a file              hello,java
scala> res0.flatMap(_.split(","))               ----- split into words         hello   java
scala> res1.map((_,1))                          ----- pair each word with 1    <hello,1>
scala> res2.reduceByKey(_+_)                    ----- merge by key, summing the values
scala> res3.sortBy(_._2,false)                  ----- sort by the count
scala> res4.collect                             ----- materialize the data as an array
res6: Array[(String, Int)] = Array((hello,25), (c,25), (linux,20), (java,20), (hadoop,20), (python,10), (spark,10))
scala> res6.foreach(println)                    ----- print the data
(hello,25)
(c,25)
(linux,20)
(java,20)
(hadoop,20)
(python,10)
(spark,10)
The steps above can also be written as a single chain:
?scala> sc.textFile("/web_log/c.log").flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).collect.foreach(println)
(python,10)
(hello,25)
(linux,20)
(java,20)
(spark,10)
(hadoop,20)
(c,25)
3.12 The Spark development environment on Windows
Spark dependency:
<dependency>
? <groupId>org.apache.spark</groupId>
? <artifactId>spark-core_2.11</artifactId>
? <version>2.1.1</version>
</dependency>
The WordCount example:
package org.tjcj
import org.apache.spark.{SparkConf, SparkContext}
/**
?* Spark中經(jīng)典案例
?*/
object WordCount {
? def main(args: Array[String]): Unit = {
? ? //1、獲取SparkContext對象
? ? val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
? ? val sc = new SparkContext(conf)
? ? //2、WordCount核心代碼
? ? sc.textFile("D:\\a.txt").flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).
? ? ? sortBy(_._2,false).collect().foreach(println)
? }
}
3.13 RDDs in Spark
RDD: resilient distributed dataset ------- an abstraction that packages the core operations
Spark exposes most of the operations users work with directly through the RDD (resilient distributed dataset) API.
RDD properties:
1. An RDD can be viewed as a series of partitions (see the quick spark-shell check below).
2. An RDD records its dependencies on other RDDs.
3. Operators are applied partition by partition.
4. A partitioner only applies to key-value RDDs.
5. Each partition exposes preferred locations, enabling data-local processing: move the computation to the data rather than the data to the computation (the locality principle).
What "resilient" means for an RDD:
storage resilience: automatic switching between memory and disk
fault-tolerance resilience: lost data can be recovered automatically
computation resilience: failed computations are retried
partitioning resilience: data can be repartitioned as needed
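A quick way to look at the partition side of this from spark-shell (an illustrative check added here, not part of the original notes; the res number will differ):
scala> sc.makeRDD(List(1,2,3,4,5,6), 3).getNumPartitions
res0: Int = 3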
3.13.1 Creating RDDs
Creating RDDs in the cluster:
from a collection -------> Array, List
scala> val a = Array(1,2,3,4)
a: Array[Int] = Array(1, 2, 3, 4)
scala> val b = List(1,2,3,4)
b: List[Int] = List(1, 2, 3, 4)
Method 1:
scala> sc.parallelize(a)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
Method 2:
scala> sc.makeRDD(b)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:27
Requirements:
a. double every value in res0
scala> res0.map(_*2).collect().foreach(println)
2 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??
4
6
8
b. add 1 to every value in res1
scala> res1.map(_+1).collect().foreach(println)
2
3
4
5
scala> res1.map(_=>{"a"}).collect().foreach(println)
a
a
a
a
From an external file:
scala> sc.textFile("/web_log/c.log")
res11: org.apache.spark.rdd.RDD[String] = /web_log/c.log MapPartitionsRDD[9] at textFile at <console>:25
3.13.2 Common RDD operators
Reference: http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
The common operators, in alphabetical order:
collect ------ turns the RDD into an array
scala> val rdd = sc.makeRDD(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at makeRDD at <console>:24
scala> rdd.collect()
res13: Array[Int] = Array(1, 2, 3, 4)?
count ------ counts the elements
scala> sc.textFile("/web_log/c.log")
res14: org.apache.spark.rdd.RDD[String] = /web_log/c.log MapPartitionsRDD[12] at textFile at <console>:25
scala> res14.count()
res15: Long = 40?
distinct ---- removes duplicates
scala> sc.textFile("/web_log/c.log").distinct()
res16: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at distinct at <console>:25
scala> res16.count()
res17: Long = 8
first -------- returns the first line
scala> sc.textFile("hdfs://192.168.91.100:9000//javaMkdir/lol.txt")
res18: org.apache.spark.rdd.RDD[String] = hdfs://192.168.91.100:9000//javaMkdir/lol.txt MapPartitionsRDD[19] at textFile at <console>:25
scala> res18.first()
res19: String = " ? ? ? agt ? ? league ?split ? playoffs ? ? ? ?date ? ?game ? ?patch ? side ? ?team ? ?ban1 ? ?ban2 ? ?ban3 ? ?ban4ban5 ? ? gamelength ? ? ?result ?kills ? deaths ?assists doublekills ? ? triplekills ? ? quadrakills ? ? pentakills ? ? ?firstblood ?team kpm firstdragon ? ? dragons opp_dragons ? ? elementaldrakes opp_elementaldrakes ? ? infernals ? ? ? mountains ? ? ? clouds ?oceans ? ? ? dragons (type unknown) ?elders ?opp_elders ? ? ?firstherald ? ? heralds opp_heralds ? ? firstbaron ? ? ?barons ?opp_barons ?firsttower ? ? ? towers ?opp_towers ? ? ?firstmidtower ? firsttothreetowers ? ? ?inhibitors ? ? ?opp_inhibitors ?damagetochampions ? dpm ? ? ?damageshare ? ? damagetakenperminute ? ?damagemitigatedperminute ? ? ? ?wardsplaced ? ? wpm ? ? wardskilled ? ? wcpm ? ?controlwardsbought ? visionscore ? ? vspm ? ?totalgold ? ? ? earnedgold ? ? ?earned gpm ? ? ?earnedgoldshare goldspent ? ? ? gspd ? ?total cs ? ? minionkills ? ? monsterkills ? ?monsterkillsownjungle ? monsterkillsenemyjungle cspm ? ?goldat1...
filter ------- keeps only the elements that match a predicate
scala> sc.textFile("hdfs://192.168.91.100:9000//javaMkdir/lol.txt")
res20: org.apache.spark.rdd.RDD[String] = hdfs://192.168.91.100:9000//javaMkdir/lol.txt MapPartitionsRDD[21] at textFile at <console>:25
scala> res20.filter(_.contains("agt")).collect().foreach(println)
? ? ? ? agt ? ? league ?split ? playoffs ? ? ? ?date ? ?game ? ?patch ? side ? ?team ? ?ban1 ? ?ban2 ? ?ban3 ? ?ban4 ? ?ban5 ? ?gamelength ? result ?kills ? deaths ?assists doublekills ? ? triplekills ? ? quadrakills ? ? pentakills ? ? ?firstblood ? ? ?team kpm ? ?firstdragon ? ? ?dragons opp_dragons ? ? elementaldrakes opp_elementaldrakes ? ? infernals ? ? ? mountains ? ? ? clouds ?oceans ?dragons (type unknown) ? ? ? elders ?opp_elders ? ? ?firstherald ? ? heralds opp_heralds ? ? firstbaron ? ? ?barons ?opp_barons ? ? ?firsttower ? towers ?opp_towers ? ? ?firstmidtower ? firsttothreetowers ? ? ?inhibitors ? ? ?opp_inhibitors ?damagetochampions ? ? ? dpm damageshare ? ? ?damagetakenperminute ? ?damagemitigatedperminute ? ? ? ?wardsplaced ? ? wpm ? ? wardskilled ? ? wcpm ? ?controlwardsbought ? visionscore ? ? vspm ? ?totalgold ? ? ? earnedgold ? ? ?earned gpm ? ? ?earnedgoldshare goldspent ? ? ? gspd ? ?total cs ? ?minionkills ? ? ?monsterkills ? ?monsterkillsownjungle ? monsterkillsenemyjungle cspm ? ?goldat10 ? ? ? ?xpat10 ?csat10 ?opp_goldat10opp_xpat10 ? ? ? opp_csat10 ? ? ?golddiffat10 ? ?xpdiffat10 ? ? ?csdiffat10 ? ? ?goldat15 ? ? ? ?xpat15 ?csat15 ?opp_goldat15 ? ?opp_xpat15 ? opp_csat15 ? ? ?golddiffat15 ? ?xpdiffat15 ? ? ?csdiffat15
flatMap ----- one input element produces n output elements
scala> sc.makeRDD(List("a","b","c")).flatMap(_*3).map((_,1)).collect().foreach(println)
(a,1)
(a,1)
(a,1)
(b,1)
(b,1)
(b,1)
(c,1)
(c,1)
(c,1)
foreach ------ iterates over the elements
scala> sc.makeRDD(List("a","b","c")).foreach(x=>println(x+"--spark"))
a--spark
b--spark
c--spark
scala> sc.makeRDD(List("a","b","c")).foreach(println)
b
c
a
map -------- transforms each element
scala> sc.makeRDD(List('a','b','c')).map(_*2).collect().foreach(println)
194
196
198
max ------ the maximum:
scala> sc.makeRDD(List(1,2,3,4,5)).max()
res37: Int = 5
min ------ the minimum:
scala> sc.makeRDD(List(1,2,3,4,5)).min()
res38: Int = 1
reduceByKey ----- merges values that share a key
scala> sc.makeRDD(List("java","linux","c","java","hadoop"))
res39: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[47] at makeRDD at <console>:25
scala> res39.map((_,1)).reduceByKey(_+_).collect().foreach(println)
(linux,1)
(java,2)
(hadoop,1)
(c,1)
saveAsTextFile ----- saves the data to a given location
scala> res39.map((_,1)).reduceByKey(_+_).saveAsTextFile("/tmp/out1")
Check the result:
[root@hadoop1 tmp]# cd out1
[root@hadoop1 out1]# ls
part-00000 ?part-00001 ?_SUCCESS
[root@hadoop1 out1]# cat part-00000
(linux,1)
(java,2)
[root@hadoop1 out1]# cat part-00001
(hadoop,1)
(c,1)
sortBy ------ commonly used for sorting
scala> res39.map((_,1)).reduceByKey(_+_).sortBy(_._1,false).collect().foreach(println)
(linux,1)
(java,2)
(hadoop,1)
(c,1)
scala> res39.map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect().foreach(println)
(java,2) ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
(linux,1)
(hadoop,1)
(c,1)
Note: false means descending order, true means ascending (natural) order.
take ----- returns the first n elements
scala> res39.map((_,1)).reduceByKey(_+_).sortBy(_._2,false).take(3)
res46: Array[(String, Int)] = Array((java,2), (linux,1), (hadoop,1))
top ----- returns the top n elements by the natural ordering (here the tuple ordering, i.e. by word rather than by count)
scala> res39.map((_,1)).reduceByKey(_+_).sortBy(_._2,false).top(3)
res47: Array[(String, Int)] = Array((linux,1), (java,2), (hadoop,1))?
3.13.3 Counting banned champions. Requirement:
find the five most frequently banned champions in the LoL spring split
package org.tjcj
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Count the five most frequently banned champions in League of Legends.
 */
object BanTop5 {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkContext
    val conf: SparkConf = new SparkConf().setAppName("ban5").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    // 2. Build an RDD from the external file on HDFS
    val rdd: RDD[String] = sc.textFile("hdfs://192.168.91.100:9000/javaMkdir/lol.txt")
    // 3. Filter: drop the header row and keep only Spring-split rows
    val filterRDD: RDD[String] = rdd.filter(!_.contains("agt")).filter(_.contains("Spring"))
    // 4. Reshape each row into a comma-separated string of its five ban fields (indices 10-14)
    val mapRDD = filterRDD.map(x => { // x is one input row
      var splits = x.split("\t")
      var str = ""
      var i = 10
      while (i < 15) {
        str += splits(i) + ","
        i += 1
      }
      str.substring(0, str.length - 1) // drop the trailing comma
    })
    // 5. Flatten: one champion name per element
    val flatMapRDD = mapRDD.flatMap(_.split(","))
    // 6. Reshape into (champion, 1) pairs
    val mapRDD1 = flatMapRDD.map((_, 1))
    // 7. Count per champion
    val reduceRDD = mapRDD1.reduceByKey(_ + _)
    // 8. Sort by count, descending
    val sortRDD = reduceRDD.sortBy(_._2, false)
    // val declares a constant, var a variable
    // 9. Take the first five
    val array = sortRDD.take(5)
    // 10. Convert back to an RDD and save (see the note after the test output below)
    sc.makeRDD(array).repartition(1).saveAsTextFile("hdfs://192.168.91.100:9000/spark/out/out2")
  }
}
Test result:
[root@hadoop3 ~]# hadoop fs -cat ?/spark/out/out2/part-00000
(Aphelios,385)
(Sett,331)
(LeBlanc,294)
(Senna,262)
(Zoe,251)
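A small aside on step 10 above: repartition(1) shuffles the data just to produce a single output file. For a result this small, coalesce(1) gives the same single file without the extra shuffle; a sketch reusing array from BanTop5 (the output path is only a placeholder):

// Sketch: write the five (champion, count) pairs into one file without a full shuffle.
sc.makeRDD(array)
  .coalesce(1)
  .saveAsTextFile("hdfs://192.168.91.100:9000/spark/out/out2_coalesce")  // placeholder path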
3.13.4 Count the five LCK teams with the most kills across the whole season (output format: team,kills — e.g. IG,456)
package org.tjcj
import org.apache.spark.{SparkConf, SparkContext}
object KillTop5 {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkContext
    val conf: SparkConf = new SparkConf().setAppName("ban5").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val array = sc.textFile("hdfs://192.168.91.100:9000/javaMkdir/lol.txt")
      .filter(!_.contains("agt"))   // drop the header row
      .filter(_.contains("LCK"))    // keep LCK rows only
      .map(x => {
        (x.split("\t")(9), Integer.parseInt(x.split("\t")(17))) // (team, kills)
      }).reduceByKey(_ + _).sortBy(_._2, false).take(5)
    sc.makeRDD(array).map(x => {
      var str = ""
      str = x._1 + "," + x._2
      str
    }).repartition(1).saveAsTextFile("hdfs://192.168.91.100:9000/spark/out/out5")
  }
}
Test result:
[root@hadoop3 ~]# hadoop fs -cat /spark/out/out5/part-00000
DRX,1367
Gen.G,1356
DAMWON Gaming,1268
T1,1231
Afreeca Freecs,1038
3.13.5 Count the five LPL teams with the most deaths across the whole season
package org.tjcj
import org.apache.spark.{SparkConf, SparkContext}
object DeathTop5 {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkContext
    val conf: SparkConf = new SparkConf().setAppName("ban5").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val array = sc.textFile("hdfs://192.168.91.100:9000/javaMkdir/lol.txt")
      .filter(!_.contains("agt"))   // drop the header row
      .filter(_.contains("LPL"))    // keep LPL rows only
      .map(x => {
        (x.split("\t")(9), Integer.parseInt(x.split("\t")(18))) // (team, deaths)
      }).reduceByKey(_ + _).sortBy(_._2, false).take(5)
    sc.makeRDD(array).map(x => {
      var str = ""
      str = x._1 + "," + x._2
      str
    }).repartition(1).saveAsTextFile("hdfs://192.168.91.100:9000/spark/out/out6")
  }
}
Test:
[root@hadoop3 ~]# hadoop fs -cat /spark/out/out6/part-00000
Invictus Gaming,1317
Team WE,1271
Suning,1261
LGD Gaming,1221
Rogue Warriors,1219
3.13.6 Summary
RDD (resilient distributed dataset): how to get a good grip on RDDs
Get familiar with the commonly used operators.
Practice a lot: different requirements call for different combinations of operators.
Exercises:
1. Across the whole season, find the top five LPL teams on the assist leaderboard.
Solution:
package org.tjcj
import org.apache.spark.{SparkConf, SparkContext}
/**
 * LoL assist leaderboard.
 */
object AssistsTop5 {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkContext
    val conf: SparkConf = new SparkConf().setAppName("ban5").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val array = sc.textFile("hdfs://192.168.91.100:9000/javaMkdir/lol.txt")
      .filter(!_.contains("agt"))   // drop the header row
      .filter(_.contains("LPL"))    // keep LPL rows only
      .map(x => {
        (x.split("\t")(9), Integer.parseInt(x.split("\t")(19))) // (team, assists)
      }).reduceByKey(_ + _).sortBy(_._2, false).take(5)
    sc.makeRDD(array).map(x => {
      var str = ""
      str = x._1 + "," + x._2
      str
    }).repartition(1).saveAsTextFile("hdfs://192.168.91.100:9000/spark/out/out7")
  }
}
2. Across the whole season, compute the win rate of teams that take first blood.
Solution:
package org.tjcj
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Win rate of teams that take first blood.
 */
object WinFirstBlood {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val conf = new SparkConf().setAppName("winFirstBlood").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.textFile("hdfs://192.168.91.100:9000/javaMkdir/lol.txt").filter(!_.contains("agt"))
      .map(x => {
        var splits = x.split("\t")
        (Integer.parseInt(splits(16)), Integer.parseInt(splits(24))) // (result, firstblood)
      }).filter(x => {
        x._2 == 1                  // keep only rows where the team took first blood
      }).reduceByKey(_ + _)        // (1, first-blood wins) and (0, first-blood losses)
      .map(x => {
        var d: Double = x._2
        ("win", d)
      }).reduceByKey(_ / _)        // note: the division order is not guaranteed; see the sketch after the run result
      .map(x => {
        (x._1, x._2.formatted("%.2f"))
      }).repartition(1).saveAsTextFile("hdfs://192.168.91.100:9000/spark/out/out9")
  }
}
Run result:
[root@hadoop1 ~]# hadoop fs -cat /spark/out/out9/part-00000
(win,0.58)
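The reduceByKey(_/_) above divides the two per-key sums in whatever order they arrive, so the result depends on partitioning. A minimal sketch that computes the rate explicitly, using the same columns (index 16 = result, index 24 = firstblood) and written in spark-shell style with sc available:

// Sketch: among rows where the team took first blood, the fraction it won.
val fb = sc.textFile("hdfs://192.168.91.100:9000/javaMkdir/lol.txt")
  .filter(!_.contains("agt"))
  .map(_.split("\t"))
  .filter(_(24).toInt == 1)           // keep only first-blood rows
  .map(_(16).toInt)                   // 1 = win, 0 = loss
  .cache()

val rate = fb.sum() / fb.count()      // wins / total first-blood rows
println(f"win rate with first blood: $rate%.2f")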
3. Across the whole season, find the teams whose win rate exceeds 60%, together with their win rates.
package org.tjcj
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Teams with a win rate above 60% across the whole season.
 */
object WinSpark {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val conf = new SparkConf().setAppName("winFirstBlood").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("hdfs://192.168.91.100:9000/javaMkdir/lol.txt").filter(!_.contains("agt"))
    // wins per team (column 16 is the result: 1 = win, 0 = loss)
    val rdd1 = rdd.map(x => {
      var splits = x.split("\t")
      (splits(9), Integer.parseInt(splits(16)))
    }).reduceByKey(_ + _)
    // games played per team
    val rdd2 = rdd.map(x => {
      var splits = x.split("\t")
      (splits(9), 1)
    }).reduceByKey(_ + _)
    // merge the two values per team; this relies on the wins value (the smaller one) arriving first,
    // otherwise the team ends up with 0 and is filtered out (a join-based alternative is sketched after the results below)
    rdd1.union(rdd2).reduceByKey((x1, x2) => {
      var win: Int = 0
      if (x1 < x2) {
        win = x1 * 100 / x2   // integer percentage
      }
      win
    }).filter(_._2 > 60).map(x => (x._1, x._2 + "%"))
      .repartition(1).saveAsTextFile("hdfs://192.168.91.100:9000/spark/out/out11")
  }
}
Run:
[root@hadoop1 ~]# hadoop fs -cat /spark/out/out11/part-00000
(Invictus Gaming,61%)
(DRX,64%)
(JD Gaming,70%)
(DAMWON Gaming,67%)
(T1,67%)
(Gen.G,69%)
(Top Esports,67%)
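The union + reduceByKey trick above depends on the order in which the two values meet. A sketch that joins the two RDDs instead, which does not rely on ordering; it uses the same input, filters and column positions as WinSpark and is shown in spark-shell style:

// Sketch: win rate per team via join instead of union + reduceByKey.
val rows = sc.textFile("hdfs://192.168.91.100:9000/javaMkdir/lol.txt")
  .filter(!_.contains("agt"))
  .map(_.split("\t"))

val wins  = rows.map(f => (f(9), f(16).toInt)).reduceByKey(_ + _)   // (team, wins)
val games = rows.map(f => (f(9), 1)).reduceByKey(_ + _)             // (team, games)

wins.join(games)                                                    // (team, (wins, games))
  .mapValues { case (w, g) => w * 100.0 / g }
  .filter(_._2 > 60)
  .map { case (team, rate) => (team, f"$rate%.0f%%") }
  .collect()
  .foreach(println)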
3.14 Using Spark SQL in Spark
3.14.1 Basic usage of Spark SQL
Setting up the Spark SQL development environment (Maven dependencies):
<!-- Spark SQL dependency -->
<dependency>
? <groupId>org.apache.spark</groupId>
? <artifactId>spark-sql_2.11</artifactId>
? <version>2.1.1</version>
</dependency>
<dependency>
? <groupId>org.apache.spark</groupId>
? <artifactId>spark-hive_2.11</artifactId>
? <version>2.1.1</version>
</dependency>
Example:
package org.tjcj.sql
import org.apache.spark.sql.SparkSession
/**
 * First Spark SQL example: read a JSON file into a DataFrame and show it.
 */
object MyFirstSparkSql {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().appName("myFirstSparkSession").master("local[*]").getOrCreate()
    val df = spark.read.json("D:\\spark-sql\\person.json")
    df.show()
  }
}
spark.read.json reads JSON files and df.show() displays them. Note that Spark expects JSON Lines input, i.e. one JSON object per line (for example {"name":"Tom","age":20}).
Basic Spark SQL operations:
package org.tjcj.sql
import org.apache.spark.sql.SparkSession
/**
 * Basic DataFrame operations and SQL queries.
 */
object MyFirstSparkSql {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().appName("myFirstSparkSession").master("local[*]").getOrCreate()
    val df = spark.read.json("D:\\spark-sql\\people.json")
    df.show()
    // Print the schema of the table
    df.printSchema()
    // SQL-like projection
    df.select("name", "age").show()
    // Import the implicits so columns can be referenced with $"..."
    import spark.implicits._
    // Arithmetic on a column
    df.select($"name", $"age" + 1).show()
    // Add a condition
    df.select("name").where("age>20").show()
    // Group and count
    df.groupBy("age").count().show()
    // Register a temporary view
    df.createOrReplaceTempView("people")
    spark.sql("select age,count(*) as count1 from people group by age").show()
    // Stop the SparkSession
    spark.stop()
  }
}
3.14.2 Reading data with Spark SQL
package org.tjcj.sql
import org.apache.spark.sql.SparkSession
/**
 * Read files of different formats.
 */
object SparkRead {
  def main(args: Array[String]): Unit = {
    // Build the SparkSession
    val spark = SparkSession.builder().appName("sparkRead").master("local[*]").getOrCreate()
    // Read a JSON file
    val df = spark.read.json("D:\\spark-sql\\employees.json")
    df.show()
    df.printSchema()
    // Read a CSV file (reader options such as header/inferSchema are sketched after this listing)
    val df1 = spark.read.csv("D:\\spark-sql\\lol.csv")
    df1.show()
    df1.printSchema()
    // Read a plain text file
    val df2 = spark.read.text("D:\\spark-sql\\student.txt")
    df2.show()
    df2.printSchema()
    // ORC files can be read the same way with spark.read.orc(...)
    // Read a Parquet file
    val df4 = spark.read.parquet("D:\\spark-sql\\users.parquet")
    df4.show()
    df4.printSchema()
  }
}
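The CSV read above parses every column as a string and treats the first line as data. A minimal sketch of commonly used DataFrameReader options, reusing the SparkSession and the lol.csv path from SparkRead (the ORC path is only a placeholder):

// Sketch: common reader options for delimited files.
val csvDF = spark.read
  .option("header", "true")        // first line holds the column names
  .option("inferSchema", "true")   // sample the file to guess column types
  .option("sep", ",")              // field delimiter
  .csv("D:\\spark-sql\\lol.csv")
csvDF.printSchema()

// ORC files (left as a comment in the code above) are read the same way:
// val orcDF = spark.read.orc("D:\\spark-sql\\some.orc")   // placeholder path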
3.14.3 Writing data with Spark SQL
package org.tjcj.sql
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
object SparkWrite {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("write").master("local[*]").getOrCreate()
    spark.sql("set spark.sql.adaptive.enabled=true")
    // Build a DataFrame from an RDD[Row] plus an explicit schema
    val rdd: RDD[Row] = spark.sparkContext.parallelize(List(
      Row(1, "張三", "大四", 18),
      Row(1, "李四", "大四", 19),
      Row(1, "王五", "大四", 20),
      Row(1, "趙六", "大四", 21),
      Row(1, "劉七", "大四", 22)
    ))
    val schema = StructType(List(
      StructField("id", DataTypes.IntegerType, false),
      StructField("name", DataTypes.StringType, false),
      StructField("grade", DataTypes.StringType, false),
      StructField("age", DataTypes.IntegerType, false)
    ))
    val df = spark.createDataFrame(rdd, schema)
    df.show()
    df.printSchema()
    // Re-running fails if the directory already exists; see the save-mode sketch after this listing
    df.repartition(1).write.json("D:\\spark-sql\\out\\out3")
    // Build a typed Dataset from the Student case class
    import spark.implicits._
    val ds: Dataset[Student] = spark.createDataset(List(
      Student(1, "張三", "大四", 18),
      Student(1, "李四", "大四", 19),
      Student(1, "王五", "大四", 20),
      Student(1, "趙六", "大四", 21),
      Student(1, "劉七", "大四", 22)
    ))
    ds.show()
    ds.printSchema()
    ds.repartition(1).write.csv("D:\\spark-sql\\out\\out4")
  }
}
case class Student(id:Int,name:String,grade:String,age:Int)
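By default a write fails when the output directory already exists. A minimal sketch of setting the save mode explicitly, reusing df and the out3 path from SparkWrite above:

// Sketch: choose what happens when the target directory already exists.
import org.apache.spark.sql.SaveMode

df.repartition(1)
  .write
  .mode(SaveMode.Overwrite)   // or Append / Ignore / ErrorIfExists (the default)
  .json("D:\\spark-sql\\out\\out3")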
3.15 Chart visualization
The results produced by the big-data jobs above are plain files, so a charting tool is needed to present them. The page below loads the JSON output with jQuery and renders four charts with ECharts:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>電影</title>
<style>
#main{
width: 1500px;
/*自適應(yīng)的高度*/
height: 100vh;
margin: 0px auto;
padding: 0px;
}
#one{
width: 700px;
height: 50vh;
float: left;
}
#two{
width: 700px;
height: 50vh;
float: right;
}
#three{
width: 700px;
height: 50vh;
float: left;
}
#four{
width: 700px;
height: 50vh;
float: right;
}
</style>
</head>
<body>
<div id="main">
<div id="one"></div>
<div id="two"></div>
<div id="three"></div>
<div id="four"></div>
</div>
</body>
<!--引入外部的js文件-->
<script src="../js/jquery.min.js"></script>
<script src="../js/echarts-all.js"></script>
<!--編寫javascript代碼引用json文件-->
<script>
$(function(){
$.ajax({ // number of films produced per country
url:"../data/part-00000-6aee901d-e3d1-4e45-8ced-26e24edadb88.json",
type:"get",
dataType:"json",
success:function(res){
let x=[]
let y=[]
for (let i=0;i<res.length;i++){
x.push(res[i].address)
y.push(res[i].count)
}
// 1. initialize ECharts
let myEcharts = echarts.init(document.getElementById("one"))
// 2、設(shè)置option選項值
let option = {
title : {
text: '統(tǒng)計各個國家拍電影總數(shù)排行榜',
subtext: '豆瓣'
},
tooltip : {
trigger: 'axis'
},
legend: {
data:['電影數(shù)據(jù)']
},
calculable : true,
xAxis : [
{
type : 'category',
boundaryGap : false,
data : x
}
],
yAxis : [
{
type : 'value',
axisLabel : {
formatter: '{value} 部'
},
splitArea : {show : true}
}
],
series : [
{
name:'電影數(shù)據(jù)',
type:'line',
itemStyle: {
normal: {
lineStyle: {
shadowColor : 'rgba(0,0,0,0.4)'
}
}
},
data:y
}
]
};
// 3、加載Option選項
myEcharts.setOption(option)
}
})
$.ajax({ //導(dǎo)演的拍電影的總數(shù)
url:"../data/part-00000-256eb3a7-952f-49ee-befe-9cc3952d7d17.json",
type:"get",
dataType:"json",
success:function(res){
let x=[]
let y=[]
for (let i=0;i<res.length;i++){
x.push(res[i].direct)
y.push(res[i].count)
}
// 1. initialize ECharts
let myEcharts = echarts.init(document.getElementById("two"))
// 2、設(shè)置option選項值
let option = {
title : {
text: '導(dǎo)演的拍電影排行榜',
subtext: '豆瓣'
},
tooltip : {
trigger: 'axis'
},
legend: {
data:['電影數(shù)據(jù)']
},
toolbox: {
show : true,
feature : {
mark : true,
dataView : {readOnly: false},
magicType:['line', 'bar'],
restore : true,
saveAsImage : true
}
},
calculable : true,
xAxis : [
{
type : 'category',
data : x
}
],
yAxis : [
{
type : 'value',
splitArea : {show : true}
}
],
series : [
{
name:'電影數(shù)據(jù)',
type:'bar',
data:y
}
]
};
// 3、加載Option選項
myEcharts.setOption(option)
}
})
$.ajax({ // film rating leaderboard
url:"../data/part-00000-419034d2-3312-487c-a467-d426debeb2ef.json",
type:"get",
dataType:"json",
success:function(res){
let x=[]
let y=[]
for (let i=0;i<res.length;i++){
x.push(res[i].movieName)
y.push(res[i].grade)
}
// 1. initialize ECharts
let myEcharts = echarts.init(document.getElementById("three"))
// 2、設(shè)置option選項值
let option = {
title : {
text: '評分排行榜',
subtext: '豆瓣'
},
tooltip : {
trigger: 'axis'
},
legend: {
data:['電影數(shù)據(jù)']
},
calculable : true,
xAxis : [
{
type : 'category',
boundaryGap : false,
data : x
}
],
yAxis : [
{
type : 'value',
axisLabel : {
formatter: '{value} 分'
},
splitArea : {show : true}
}
],
series : [
{
name:'電影數(shù)據(jù)',
type:'line',
itemStyle: {
normal: {
lineStyle: {
shadowColor : 'rgba(0,0,0,0.4)'
}
}
},
data:y
}
]
};
// 3、加載Option選項
myEcharts.setOption(option)
}
})
$.ajax({ // share of viewings per film (pie chart)
url:"../data/part-00000-d06a0328-f4be-408c-b050-231c1f17a2b9.json",
type:"get",
dataType:"json",
success:function(res){
let x=[]
let y=[]
for (let i=0;i<res.length;i++){
x.push(res[i].movieName)
y.push({"value":res[i].count,"name":res[i].movieName})
}
// 1. initialize ECharts
let myEcharts = echarts.init(document.getElementById("four"))
// 2、設(shè)置option選項值
let option = {
title : {
text: '上映率',
subtext: '豆瓣',
x:'center'
},
tooltip : {
trigger: 'item',
formatter: "{a} <br/>{b} : {c} ({d}%)"
},
legend: {
orient : 'vertical',
x : 'left',
data:x
},
toolbox: {
show : true,
feature : {
mark : true,
dataView : {readOnly: false},
restore : true,
saveAsImage : true
}
},
calculable : true,
series : [
{
name:'觀影次數(shù)',
type:'pie',
radius : '55%',
center: ['50%', 225],
data:y
}
]
};
// 3、加載Option選項
myEcharts.setOption(option)
}
})
})
</script>
</html>
IV. Project practice
4.1 Inspecting the project files
Requirement:
Read the two CSV files (movie.csv and user.csv) and print their contents and schemas (the two-dimensional table structure).
package org.tjcj.sql
import org.apache.spark.sql.SparkSession
/**
 * Inspect the movie data files.
 */
object MoviesSpark {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().appName("movie").master("local[*]").getOrCreate()
    val df1 = spark.read.option("header", true).csv("D:\\data\\input\\movie.csv")
    df1.show(10)
    df1.printSchema()
    val df2 = spark.read.option("header", true).csv("D:\\data\\input\\user.csv")
    df2.show(10)
    df2.printSchema()
  }
}
4.2 The ten most-watched films and their viewing counts
Analysis:
The user table alone is enough to produce each film's name and viewing count.
package org.tjcj.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.desc
/**
 * The most-watched films (showing the top 5 here).
 */
object MoviesTop5 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().appName("movie").master("local[*]").getOrCreate()
    val df = spark.read.option("header", true).csv("D:\\data\\input\\user.csv")
    // Approach 1: the DataFrame DSL provided by Spark SQL
    import spark.implicits._
    df.select($"movieName").groupBy($"movieName").count().orderBy(desc("count")).show(5)
    df.select($"movieName").groupBy($"movieName").count().orderBy($"count".desc).show(5)
    // Approach 2: Hive-style SQL
    df.createOrReplaceTempView("user")
    spark.sql("select movieName,count(*) as count from user group by movieName order by count desc").show(5)
  }
}
Write the results to files:
package org.tjcj.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.desc
/**
 * The ten most-watched films, written out to files.
 */
object MoviesTop5 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().appName("movie").master("local[*]").getOrCreate()
    val df = spark.read.option("header", true).csv("D:\\data\\input\\user.csv")
    // Approach 1: the DataFrame DSL provided by Spark SQL
    import spark.implicits._
    df.select($"movieName").groupBy($"movieName").count().orderBy(desc("count")).show(5)
    df.select($"movieName").groupBy($"movieName").count().orderBy($"count".desc).limit(10).repartition(1)
      .write.csv("D:\\spark-sql\\out\\out5")
    // Approach 2: Hive-style SQL
    df.createOrReplaceTempView("user")
    spark.sql("select movieName,count(*) as count from user group by movieName order by count desc").limit(10).repartition(1)
      .write.json("D:\\spark-sql\\out\\out6")
  }
}
4.3 The ten highest-rated films and their ratings
Analysis: only movie.csv is needed.
package org.tjcj.sql
import org.apache.spark.sql.SparkSession
/**
 * The highest-rated films.
 */
object GradeTop10 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().appName("movie").master("local[*]").getOrCreate()
    val df = spark.read.option("header", true).csv("D:\\data\\input\\movie.csv").distinct()
    // SQL approach
    // Register a temporary view
    df.createOrReplaceTempView("movie")
    // note: distinct(movieName),grade deduplicates on the whole pair; see the sketch after this listing
    spark.sql("select distinct(movieName),grade from movie where grade >8 order by grade desc").limit(10)
      .write.json("D:\\spark-sql\\out\\out8")
    // Stop the SparkSession
    spark.stop()
  }
}
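In the SQL above, DISTINCT applies to the whole (movieName, grade) pair, not just the name. If the intent is one row per film, a hedged alternative using the DataFrame API, reusing the same spark session and movie.csv columns:

// Sketch: keep a single row per movieName, then take the ten highest ratings.
import org.apache.spark.sql.functions.col

val top10 = spark.read.option("header", true).csv("D:\\data\\input\\movie.csv")
  .dropDuplicates("movieName")      // one row per film
  .where(col("grade") > 8)          // grade is read as a string; Spark casts it for the comparison
  .orderBy(col("grade").desc)
  .limit(10)
top10.show()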
4.4 More single-table queries
Find the ten highest-rated films of the 劇情 (drama) genre:
package org.tjcj.sql
import org.apache.spark.sql.SparkSession
/**
 * The highest-rated films of the 劇情 (drama) genre.
 */
object JqTop10 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().appName("movie").master("local[*]").getOrCreate()
    val df = spark.read.option("header", true).csv("D:\\data\\input\\movie.csv").distinct()
    // SQL approach
    // Register a temporary view
    df.createOrReplaceTempView("movie")
    spark.sql("select distinct(movieName),grade from movie where grade >8 and type='劇情' order by grade desc").limit(10)
      .write.json("D:\\spark-sql\\out\\out9")
    // Stop the SparkSession
    spark.stop()
  }
}
Find the ten countries/regions that produced the most films:
package org.tjcj.sql
import org.apache.spark.sql.SparkSession
/**
 * Number of films per country/region.
 */
object CountTop10 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().appName("movie").master("local[*]").getOrCreate()
    val df = spark.read.option("header", true).csv("D:\\data\\input\\movie.csv")
    df.createOrReplaceTempView("movie")
    spark.sql("select address,count(*) as count from movie group by address order by count desc").limit(10)
      .write.json("D:\\spark-sql\\out\\out10")
    spark.stop()
  }
}
4.5 Which directors have made the most films in recent years (top-N ranking)
package org.tjcj.sql
import org.apache.spark.sql.SparkSession
/**
 * Which directors have made the most films in recent years.
 */
object DirectTop10 {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark = SparkSession.builder().appName("directTop10").master("local[*]").getOrCreate()
    // 2. Read the file to process
    val df = spark.read.option("header", true).csv("D:\\data\\input\\movie.csv").distinct()
    // Approach 1: the DataFrame DSL provided by Spark SQL
    import spark.implicits._
    df.repartition(1).select("direct").groupBy("direct").count().orderBy($"count".desc).limit(10)
      .write.json("D:\\spark-sql\\out\\out11")
    // Approach 2: Hive-style SQL
    df.createOrReplaceTempView("movie")
    spark.sql("select direct,count(*) as count from movie group by direct order by count desc").limit(10).repartition(1)
      .write.json("D:\\spark-sql\\out\\out12")
  }
}
4.6 Ranking of popular films rated above 9.0, with their viewing counts
package org.tjcj.sql
import org.apache.spark.sql.SparkSession
/**
 * Popular films rated above 9.0, with their viewing counts.
 */
object MoviesGrade {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().appName("movie").master("local[*]").getOrCreate()
    val df1 = spark.read.option("header", true).csv("D:\\spark-sql\\movie.csv")
    val df2 = spark.read.option("header", true).csv("D:\\spark-sql\\user.csv")
    // Register temporary views
    df1.createOrReplaceTempView("movie")
    df2.createOrReplaceTempView("user")
    spark.sql("select m.movieName,count(*) as count from movie m left join user u on m.movieName = u.movieName " +
      "where m.grade >9.0 group by m.movieName order by count desc").limit(10).repartition(1)
      .write.json("D:\\spark-sql\\out\\out13")
  }
}
Run result:
{"movieName":"瘋狂動物城","count":208}
{"movieName":"阿甘正傳","count":190}
{"movieName":"美麗人生","count":126}
{"movieName":"三傻大鬧寶萊塢","count":126}
{"movieName":"竊聽風(fēng)暴","count":120}
{"movieName":"指環(huán)王3:王者無敵","count":120}
{"movieName":"教父","count":120}
{"movieName":"亂世佳人","count":120}
{"movieName":"辛德勒的名單","count":117}
{"movieName":"這個殺手不太冷","count":112}
4.7 Ranking of films rated above 9.0 that were watched in January 2018 (film name and rating)
package org.tjcj.sql
import org.apache.spark.sql.SparkSession
/**
 * Films rated above 9.0 that were watched in January 2018.
 */
object GradeMovie {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().appName("movie").master("local[*]").getOrCreate()
    val df1 = spark.read.option("header", true).csv("D:\\spark-sql\\movie.csv").distinct()
    val df2 = spark.read.option("header", true).csv("D:\\spark-sql\\user.csv").distinct()
    // Register temporary views
    df1.createOrReplaceTempView("movie")
    df2.createOrReplaceTempView("user")
    // substr(u.time,0,7) keeps the year-month prefix of the viewing time; see the sketch after this listing
    spark.sql("select m.movieName,m.grade,count(*) as count from movie m left join user u on m.movieName = u.movieName " +
      "where m.grade >9.0 and substr(u.time,0,7) = '2018-01' group by m.movieName,m.grade order by count desc").limit(10)
      .repartition(1)
      .write.json("D:\\spark-sql\\out\\out14")
  }
}
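substr compares the leading characters of the time string. Spark SQL's date functions can express the same filter more directly; a sketch reusing the movie and user views, assuming u.time is yyyy-MM-dd style text that Spark can cast to a date:

// Sketch: filter January 2018 viewings with date functions instead of substr.
spark.sql(
  """select m.movieName, m.grade, count(*) as count
    |from movie m left join user u on m.movieName = u.movieName
    |where m.grade > 9.0
    |  and year(u.time) = 2018 and month(u.time) = 1
    |group by m.movieName, m.grade
    |order by count desc""".stripMargin).show(10)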
4.8 Summary and review
In Spark, Spark SQL and Spark Core interoperate:
RDDs can be converted to DataFrames and Datasets and back; the SparkWrite example in 3.14.3 builds both a DataFrame (from an RDD[Row] plus a schema) and a Dataset (from the Student case class).
A case class automatically generates field accessors, a constructor, equals/hashCode/toString, copy and a companion apply method, so it is a convenient way to describe a row type.
DataFrame and Dataset are closely related: a DataFrame is simply Dataset[Row] (untyped rows), while a Dataset[T] carries a concrete element type such as Student; the sketch below shows how to move between RDD, DataFrame and Dataset.
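A minimal sketch of the conversions, assuming the same SparkSession named spark and the Student case class from SparkWrite are in scope:

// Sketch: moving between RDD, DataFrame and Dataset (Spark 2.x).
import spark.implicits._

val ds  = Seq(Student(1, "張三", "大四", 18)).toDS()   // Seq -> Dataset[Student]
val df  = ds.toDF()                                     // Dataset -> DataFrame
val ds2 = df.as[Student]                                // DataFrame -> Dataset[Student]
val rdd = ds2.rdd                                       // Dataset -> RDD[Student]
val df2 = rdd.toDF("id", "name", "grade", "age")        // RDD[Student] -> DataFrame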
There are two ways to execute queries:
1. The DataFrame DSL, the operator dialect specific to Spark SQL.
2. Writing SQL statements directly with spark.sql, as the DirectTop10 example in 4.5 shows side by side with the DSL. The SQL form is the more commonly used one in practice; overall, Spark SQL makes this kind of development easier.