
Big Data Practical Training


I. Preface

1. Hadoop cluster setup (studied before, but nothing ready to reuse)

2. Python (machine learning)

3. Spark (not covered yet)

4. Flume (not covered yet)

5. Sqoop (never touched)

6. Programming stack: Spring Boot (familiar) + ECharts (a data-visualization library)

II. Big Data Overview

1.1 Baidu Baike: big data is data that cannot be captured and processed in a timely manner with conventional tools; new kinds of technology are needed to handle it.

1.2 Big data covers:

        a. storing massive amounts of data

        b. analyzing massive amounts of data (computation and processing)

1.3 Big data exists to reduce uncertainty and give decisions a more accurate basis.

2.1 The big data development workflow in industry


2.2 The Big Data Development Toolkit

1. Link: https://pan.baidu.com/s/1eg8poPxAN-sLBd53S9hJ1g

2. Extraction code: k54t


This project uses a number of development tools and installation packages:

a. Operating system: Linux (CentOS 7, the closest to what enterprises run)

b. Remote connection tool: FinalShell

c. JDK 1.8

d. Data collection tools: Flume 1.7, Sqoop

e. Hadoop cluster framework: Hadoop 2.7.3

f. Kafka 2.11

g. VMware virtual machines

h. Spark 2.1.1

i. Flink 1.10.2

j. IDEA as the development tool, Maven for jar management and project creation

2.3 Setting Up the Big Data Environment

2.3.1 Installing and Configuring the Virtual Machine

1. A virtual machine (Virtual Machine) is a complete computer system with full hardware functionality, simulated by software and running in a fully isolated environment.

2. Anything that can be done on a physical computer can also be done in a virtual machine.

3. When a virtual machine is created, part of the physical machine's disk and memory capacity is allocated to it. Each virtual machine has its own CMOS, disk, and operating system, and can be operated just like a physical machine.

4. Summary: a virtual machine has its own memory, disk capacity, and CPU, and is a complete computer system. Its advantage is that if it gets corrupted or fails, you simply restore or rebuild it, which releases its resources; reconfiguring a virtual machine is far more convenient than repairing a physical one.


2.3.2 Creating a Virtual Machine


2.3.3 Installing the Linux Operating System


2.3.4 Remote Connection

a. Port 22 must be open ----- SSH

b. A fixed IP address is required

Configure a static IP address:

In Linux every piece of software is configured through files, which means configuring a static IP address comes down to editing the network interface file:

/etc/sysconfig/network-scripts/ifcfg-ens33

vi /etc/sysconfig/network-scripts/ifcfg-ens33
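
For reference, a static configuration in ifcfg-ens33 typically looks like the sketch below. The address matches the 192.168.91.100 used for hadoop1 later in this guide; the gateway and DNS values are assumptions that depend on your VMware NAT settings and must be adapted.

TYPE=Ethernet
BOOTPROTO=static            # static address instead of DHCP
NAME=ens33
DEVICE=ens33
ONBOOT=yes                  # bring the interface up at boot
IPADDR=192.168.91.100       # this node's IP (hadoop1 in this guide)
NETMASK=255.255.255.0
GATEWAY=192.168.91.2        # assumption: typical VMware NAT gateway, check yours
DNS1=114.114.114.114        # assumption: any reachable DNS server works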


Restart the network:

service network restart

systemctl restart network.service

ip addr        ---- check the IP address

ping www.baidu.com

Check whether port 22 is listening, using netstat:

netstat -tln | grep 22


If netstat reports "command not found", the system is a minimal installation and the tool has to be installed manually:

yum install net-tools -y


2.3.5 Time Synchronization

Time synchronization: align this machine's clock with a reference time server.

Alibaba's public time servers:

ntp.aliyun.com

ntp1.aliyun.com

ntp2.aliyun.com

[root@hadoop1 ~]# ntpdate ntp.aliyun.com
-bash: ntpdate: command not found

Solution:

[root@hadoop1 ~]# yum install ntp -y

[root@hadoop1 ~]# ntpdate ntp.aliyun.com

5 Dec 15:24:49 ntpdate[36743]: step time server 203.107.6.88 offset -28800.771257 sec

[root@hadoop1 ~]# date
Mon Dec  5 15:25:10 CST 2022

2.3.6 Mapping IP Addresses to Hostnames

[root@hadoop1 ~]# vi /etc/hosts

192.168.91.100 hadoop1                       ------ hadoop1 is the hostname

Test:

[root@hadoop1 ~]# ping hadoop1

PING hadoop1 (192.168.91.100) 56(84) bytes of data.

64 bytes from hadoop1 (192.168.91.100): icmp_seq=1 ttl=64 time=0.138 ms

2.3.7 Installing the JDK and Configuring Environment Variables

Both the Hadoop ecosystem and the Spark ecosystem need a JVM, so the JDK environment variables have to be configured on the Linux system.

a. Create a folder to hold the software packages

[root@hadoop1 ~]# mkdir /soft

b. Upload the JDK package to the /soft folder, then extract it

[root@hadoop1 ~]# tar -zxvf /soft/jdk-8u162-linux-x64.tar.gz -C /opt

tar: used to create and extract archives

-z: the archive is gzip-compressed (.gz)

-x: extract

-v: verbose, show the progress

-f: the archive file to operate on

-C: the directory to extract into

c. Configure the JDK environment variables. Linux has two levels of environment variables:

system-wide: /etc/profile

per-user: ~/.bash_profile

[root@hadoop1 ~]# vi /etc/profile

# JDK environment variables

export JAVA_HOME=/opt/jdk1.8.0_162

export PATH=$JAVA_HOME/bin:$PATH:$HOME/bin

d. Reload the environment variables

[root@hadoop1 ~]# source /etc/profile

e. Verify that the configuration works

[root@hadoop1 ~]# java -version

java version "1.8.0_162"

Java(TM) SE Runtime Environment (build 1.8.0_162-b12)

Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)

2.3.8 Cloning and Configuring Virtual Machines

The project runs in fully distributed mode and needs several nodes, so clone additional virtual machines.

Preparation: shut down the virtual machine that is going to be cloned.


Repeat the steps above for the second clone.

Then perform the following on hadoop2 and hadoop3.

Power on hadoop2:

Change the hostname:

Reboot the virtual machine:

Configure the remote connection:

Synchronize the time:

[root@hadoop2 ~]# ntpdate ntp.aliyun.com

5 Dec 16:17:34 ntpdate[2136]: step time server 203.107.6.88 offset -28800.935828 sec

Repeat the same steps on hadoop3:


Hostname mappings across the three nodes:

[root@hadoop1 ~]# vi /etc/hosts

192.168.91.100 hadoop1

192.168.91.101 hadoop2

192.168.91.102 hadoop3

[root@hadoop2 ~]# vi /etc/hosts

192.168.91.100 hadoop1

192.168.91.101 hadoop2

192.168.91.102 hadoop3

[root@hadoop3 ~]# vi /etc/hosts

192.168.91.100 hadoop1

192.168.91.101 hadoop2

192.168.91.102 hadoop3

Test:

[root@hadoop1 ~]# ping hadoop1

PING hadoop1 (192.168.91.100) 56(84) bytes of data.

64 bytes from hadoop1 (192.168.91.100): icmp_seq=1 ttl=64 time=0.050 ms

64 bytes from hadoop1 (192.168.91.100): icmp_seq=2 ttl=64 time=0.041 ms

^Z

[1]+  Stopped                 ping hadoop1

[root@hadoop1 ~]# ping hadoop2

PING hadoop2 (192.168.91.101) 56(84) bytes of data.

64 bytes from hadoop2 (192.168.91.101): icmp_seq=1 ttl=64 time=0.704 ms

64 bytes from hadoop2 (192.168.91.101): icmp_seq=2 ttl=64 time=0.440 ms

^Z

[2]+  Stopped                 ping hadoop2

[root@hadoop1 ~]# ping hadoop3

PING hadoop3 (192.168.91.102) 56(84) bytes of data.

64 bytes from hadoop3 (192.168.91.102): icmp_seq=1 ttl=64 time=0.507 ms

64 bytes from hadoop3 (192.168.91.102): icmp_seq=2 ttl=64 time=0.474 ms

^Z

[3]+  Stopped                 ping hadoop3


Check the IP address:

ip addr

and confirm it matches the address configured earlier.

If the virtual network adapter does not show up, clean up the old installation (a tool such as https://www.ccleaner.com/zh-cn can help remove leftovers), uninstall VMware, and reinstall it.

2.3.9 Setting Up a Fully Distributed Hadoop Cluster

Daemon placement plan (this matches the jps output shown further down):

NameNode: hadoop1

SecondaryNameNode: hadoop2

DataNode: hadoop1, hadoop2, hadoop3

ResourceManager: hadoop1

NodeManager: hadoop1, hadoop2, hadoop3

a. Upload the Hadoop package to the /soft folder

b. Extract the Hadoop package

[root@hadoop1 ~]# tar -zxvf /soft/hadoop-2.7.3.tar.gz -C /opt

c. Configure the Hadoop environment variables

[root@hadoop1 ~]# vi /etc/profile

export HADOOP_HOME=/opt/hadoop-2.7.3

export PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH:$HOME/bin

d. Reload the environment variables

[root@hadoop1 ~]# source /etc/profile

e. Test

[root@hadoop1 ~]# hadoop version

Hadoop 2.7.3

Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r baa91f7c6bc9cb92be5982de4719c1c8af91ccff

Compiled by root on 2016-08-18T01:41Z

Compiled with protoc 2.5.0

From source with checksum 2e4ce5f957ea4db193bce3734ff29ff4

This command was run using /opt/hadoop-2.7.3/share/hadoop/common/hadoop-common-2.7.3.jar

The Hadoop configuration files explained:

Reference documentation: https://hadoop.apache.org/

The main Hadoop modules:

a. Hadoop Common: the common module that supports the others

b. HDFS: the Hadoop Distributed File System

c. Hadoop YARN: the resource scheduling platform

d. Hadoop MapReduce: the distributed computation framework

https://hadoop.apache.org/docs/r2.7.3/

Edit the Hadoop configuration files:

[root@hadoop1 ~]# cd /opt/hadoop-2.7.3/etc/hadoop/

[root@hadoop1 hadoop]# vi hadoop-env.sh

export JAVA_HOME=/opt/jdk1.8.0_162

[root@hadoop1 hadoop]# vi core-site.xml

<configuration>
    <!-- default NameNode RPC address -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop1:9000</value>
    </property>
    <!-- base directory for the whole cluster -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/tmp/hadoop</value>
    </property>
</configuration>
[root@hadoop1 hadoop]# vi hdfs-site.xml 
<configuration>
    <!-- HDFS block replication factor -->
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <!-- SecondaryNameNode HTTP address -->
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>hadoop2:50090</value>
    </property>
</configuration>
[root@hadoop1 hadoop]# cp mapred-site.xml.template mapred-site.xml
[root@hadoop1 hadoop]# vi mapred-site.xml
<configuration>
    <!-- run the cluster's jobs on YARN -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>
[root@hadoop1 hadoop]# vi yarn-site.xml 
<configuration>
    <!-- auxiliary shuffle service -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
</configuration>

[root@hadoop1 hadoop]# vi slaves

hadoop1

hadoop2

hadoop3

Distribute the configured files to the hadoop2 and hadoop3 nodes:

[root@hadoop1 ~]# scp /etc/profile hadoop2:/etc/profile

The authenticity of host 'hadoop2 (192.168.91.101)' can't be established.

ECDSA key fingerprint is SHA256:ETL5Iad3RarttSkJLbFPlEn/KKUBAnHyMcttoUZxhHM.

ECDSA key fingerprint is MD5:5f:31:bc:fa:0f:74:a7:55:9c:ec:59:94:bd:14:ca:5b.

Are you sure you want to continue connecting (yes/no)? yes

Warning: Permanently added 'hadoop2,192.168.91.101' (ECDSA) to the list of known hosts.
root@hadoop2's password:

profile

[root@hadoop1 ~]# scp /etc/profile hadoop3:/etc/profile

The authenticity of host 'hadoop3 (192.168.91.102)' can't be established.

ECDSA key fingerprint is SHA256:ETL5Iad3RarttSkJLbFPlEn/KKUBAnHyMcttoUZxhHM.

ECDSA key fingerprint is MD5:5f:31:bc:fa:0f:74:a7:55:9c:ec:59:94:bd:14:ca:5b.

Are you sure you want to continue connecting (yes/no)? yes

Warning: Permanently added 'hadoop3,192.168.91.102' (ECDSA) to the list of known hosts.
root@hadoop3's password:
profile

[root@hadoop1 ~]# scp -r /opt/hadoop-2.7.3 hadoop2:/opt/

[root@hadoop1 ~]# scp -r /opt/hadoop-2.7.3 hadoop3:/opt/

Turn off the firewall on hadoop1, hadoop2, and hadoop3:

[root@hadoop1 ~]# systemctl stop firewalld

[root@hadoop2 ~]# systemctl stop firewalld

[root@hadoop3 ~]# systemctl stop firewalld

Format the NameNode on hadoop1:

[root@hadoop1 ~]# hadoop namenode -format
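
The NameNode should only be formatted once; formatting it again generates a new cluster ID, after which the existing DataNodes refuse to register. If a re-format ever becomes unavoidable, a rough recovery sketch (assuming the hadoop.tmp.dir of /opt/tmp/hadoop configured above) is:

stop-all.sh                    # stop all Hadoop daemons first
rm -rf /opt/tmp/hadoop         # clear the data directory on every node
hadoop namenode -format        # then format once more on hadoop1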

2.3.10 Passwordless SSH Between the Three Nodes

Run the following on hadoop1, hadoop2, and hadoop3 in turn:

[root@hadoop1 ~]# ssh-keygen -t rsa

Generating public/private rsa key pair.

Enter file in which to save the key (/root/.ssh/id_rsa):

Enter passphrase (empty for no passphrase):

Enter same passphrase again:

Your identification has been saved in /root/.ssh/id_rsa.

Your public key has been saved in /root/.ssh/id_rsa.pub.

The key fingerprint is:

SHA256:7rTCj+LqM95Gk1nUniyf1Yy0NM1W7FAPR6slCTvDYGo root@hadoop1

The key's randomart image is:

+---[RSA 2048]----+

| . o + ++.o|

| . + * B o+.|

| . E + % = o.|

| + + + = = |

| + oSo . |

| = .o |

| . o o |

| o.o o+ . |

| o+*o..o+ |

+----[SHA256]-----+

[root@hadoop2 ~]# ssh-keygen -t rsa

Generating public/private rsa key pair.

Enter file in which to save the key (/root/.ssh/id_rsa):

Created directory '/root/.ssh'.

Enter passphrase (empty for no passphrase):

Enter same passphrase again:

Your identification has been saved in /root/.ssh/id_rsa.

Your public key has been saved in /root/.ssh/id_rsa.pub.

The key fingerprint is:

SHA256:C3azvXal3IjmRmD/FClkEmxzS17X8TMOCWRV/0OgvSM root@hadoop2

The key's randomart image is:

+---[RSA 2048]----+

| ....+.o.+.|

| = * = + +|

| . O + * +o|

| o + o = +|

| o.So E + o.|

| . o =o o o .|

| o..= = |

| =.* . |

| =o. |

+----[SHA256]-----+

[root@hadoop3 ~]# ssh-keygen -t rsa

Generating public/private rsa key pair.

Enter file in which to save the key (/root/.ssh/id_rsa):

Created directory '/root/.ssh'.
Enter passphrase (empty for no passphrase):

Enter same passphrase again:

Your identification has been saved in /root/.ssh/id_rsa.

Your public key has been saved in /root/.ssh/id_rsa.pub.

The key fingerprint is:

SHA256:iswVFWqHXhN4BaIT2mMD0EwfAc8KC+hw41lVbK5sGFg root@hadoop3

The key's randomart image is:

+---[RSA 2048]----+

|.=+.+.o+*+. |

|. oBE=.=+. |

|= +o% =++ |

|+=.*.* +.. |

|..+ +o.S |

| o.o+. |

| +.. |

| |

| |

+----[SHA256]-----+

Then on hadoop2 and hadoop3:

[root@hadoop2 ~]# cp .ssh/id_rsa.pub hadoop2_id_rsa.pub

[root@hadoop2 ~]# scp hadoop2_id_rsa.pub hadoop1:.ssh/

[root@hadoop3 ~]# cp .ssh/id_rsa.pub hadoop3_id_rsa.pub

[root@hadoop3 ~]# scp hadoop3_id_rsa.pub hadoop1:.ssh/

On hadoop1:

[root@hadoop1 ~]# cd .ssh

[root@hadoop1 .ssh]# cat hadoop2_id_rsa.pub hadoop3_id_rsa.pub id_rsa.pub >> authorized_keys

Send authorized_keys to hadoop2 and hadoop3:

[root@hadoop1 .ssh]# scp authorized_keys hadoop2:.ssh/

root@hadoop2's password:

authorized_keys

[root@hadoop1 .ssh]# scp authorized_keys hadoop3:.ssh/

root@hadoop3's password:

authorized_keys

Set the permissions on hadoop1, hadoop2, and hadoop3:

[root@hadoop1 ~]# chmod 700 .ssh

[root@hadoop1 ~]# chmod 600 .ssh/authorized_keys

[root@hadoop2 ~]# chmod 700 .ssh

[root@hadoop2 ~]# chmod 600 .ssh/authorized_keys

[root@hadoop3 ~]# chmod 700 .ssh/

[root@hadoop3 ~]# chmod 600 .ssh/authorized_keys

Test:

[root@hadoop1 ~]# ssh hadoop2
Last login: Tue Dec 6 17:09:08 2022 from 192.168.91.1
[root@hadoop2 ~]# exit

logout

Connection to hadoop2 closed.

[root@hadoop1 ~]# ssh hadoop3

Last login: Tue Dec 6 17:09:12 2022 from 192.168.91.1

[root@hadoop3 ~]# exit4
-bash: exit4: command not found

[root@hadoop3 ~]# exit

logout
Connection to hadoop3 closed.

2.3.11 Starting Hadoop

[root@hadoop1 ~]# start-all.sh                                     --- starts all the Hadoop daemons

hadoop1:

[root@hadoop1 ~]# jps

54578 DataNode

56274 Jps

55315 ResourceManager

54314 NameNode

55471 NodeManager

hadoop2:

[root@hadoop2 ~]# jps

29076 SecondaryNameNode

29284 NodeManager

28842 DataNode

30090 Jps

hadoop3:

[root@hadoop3 ~]# jps

28786 DataNode

29154 NodeManager

30197 Jps

Access the web UIs in a browser:

http://192.168.91.100:50070

http://192.168.91.100:8088                                         ---- YARN

2.3.12 Installing and Configuring Flume

In production, data often has to be collected from several nodes at once; Flume is the recommended tool for that. This section covers basic Flume configuration and simple applications.

a. Upload the Flume package

b. Extract the Flume package

[root@hadoop1 ~]# tar -zxvf /soft/apache-flume-1.7.0-bin.tar.gz -C /opt

c. Configure the Flume environment variables

[root@hadoop1 ~]# vi /etc/profile

# Flume environment variables

export FLUME_HOME=/opt/apache-flume-1.7.0-bin

export PATH=$JAVA_HOME/bin:$FLUME_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH:$HOME/bin

d. Reload the environment variables

[root@hadoop1 ~]# source /etc/profile

e. Test

[root@hadoop1 ~]# flume-ng version

Flume 1.7.0

Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git

Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707

Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016

From source with checksum 0d21b3ffdc55a07e1d08875872c00523

2.4 Setting Up the Windows Development Environment

2.4.1 Installing the JDK and Configuring Environment Variables

JDK version: 1.8


Configure the JDK environment variables:

This PC ---> right click ---> Properties ---> Advanced system settings ---> Environment Variables

Append the JDK bin directory to the Path variable.

Test:

Press Win+R and type cmd to open a command prompt:

C:\Users\error>java -version

java version "1.8.0_151"

Java(TM) SE Runtime Environment (build 1.8.0_151-b12)

Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)

2.4.2 Installing Maven and Configuring Environment Variables

Extract the Maven package.

This PC ---> right click ---> Properties ---> Advanced system settings ---> Environment Variables

Append the Maven bin directory to the Path variable.

Test:

Press Win+R and type cmd:

C:\Users\error>mvn -v

Apache Maven 3.0.5 (r01de14724cdef164cd33c7c8c2fe155faf9602da; 2013-02-19 21:51:28+0800)

Maven home: D:\apache-maven-3.0.5\bin\..

Java version: 1.8.0_151, vendor: Oracle Corporation

Java home: C:\Program Files\Java\jdk1.8.0_151\jre

Default locale: zh_CN, platform encoding: GBK

OS name: "windows 10", version: "10.0", arch: "amd64", family: "dos"

2.4.3 Installing and Configuring the IDEA IDE


2.4.4 Integrating Maven into IDEA


2.5 Setting Up the Spark Environment

a. Upload the Spark package to the Linux system

b. Extract the Spark package

[root@hadoop1 ~]# tar -zxvf /soft/spark-2.1.1-bin-hadoop2.7.tgz -C /opt

c. Configure the Spark environment variables

[root@hadoop1 ~]# vi /etc/profile

# Spark environment variables

export SPARK_HOME=/opt/spark-2.1.1-bin-hadoop2.7

export PATH=$JAVA_HOME/bin:$SPARK_HOME/bin:$FLUME_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH:$HOME/bin

d. Edit the Spark configuration files

[root@hadoop1 conf]# pwd

/opt/spark-2.1.1-bin-hadoop2.7/conf

Edit the Spark startup environment file:

[root@hadoop1 conf]# cp spark-env.sh.template spark-env.sh

[root@hadoop1 conf]# vi spark-env.sh

export JAVA_HOME=/opt/jdk1.8.0_162

Edit the slaves file:

[root@hadoop1 conf]# cp slaves.template slaves

[root@hadoop1 conf]# vi slaves

hadoop1

hadoop2

hadoop3

Copy the configured Spark directory to hadoop2 and hadoop3:

[root@hadoop1 ~]# scp -r /opt/spark-2.1.1-bin-hadoop2.7 hadoop2:/opt/

[root@hadoop1 ~]# scp -r /opt/spark-2.1.1-bin-hadoop2.7 hadoop3:/opt/

Copy the system profile to hadoop2 and hadoop3 as well:

[root@hadoop1 ~]# scp /etc/profile hadoop2:/etc/profile

profile                                                    100% 2183   1.7MB/s   00:00

[root@hadoop1 ~]# scp /etc/profile hadoop3:/etc/profile

profile

Run source /etc/profile on hadoop2 and hadoop3. Then start Spark from its sbin directory:

[root@hadoop1 ~]# cd /opt/spark-2.1.1-bin-hadoop2.7/sbin/

[root@hadoop1 sbin]# ./start-all.sh

starting org.apache.spark.deploy.master.Master, logging to /opt/spark-2.1.1-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop1.out

hadoop2: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-2.1.1-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop2.out

hadoop3: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-2.1.1-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop3.out

hadoop1: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-2.1.1-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop1.out


2.6 Setting Up the Spark Development Environment on Windows

Install the Scala toolkit:

scala-2.11.0.msi ----- install it yourself

Configure the Scala environment variables:

This PC ---> right click ---> Properties ---> Advanced system settings ---> Environment Variables

Append the Scala bin directory to the Path variable.

Press Win+R and type cmd:

C:\Users\error>scala

Welcome to Scala version 2.11.0 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_151).

Type in expressions to have them evaluated.

Type :help for more information.

scala> 16*16

res0: Int = 256

Install the Scala plugin in IDEA.

Create a Scala project.

III. Big Data Technology Applications

3.1 Working with Flume

3.1.1 A First Look at Flume

https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html       ----- the official Flume documentation


At its core Flume is an agent, a standalone process (essentially one JVM).
An agent is made up of several components; the common ones are source, channel, and sink.
source: the data-source component ------ collects the data
channel: the pipe component ----- a buffer
sink: the destination component --------- where the data ends up
A real-world analogy: the waterworks (source) -------- the pipes (channel) --------- the tap at home (sink).
In the classic example from the user guide, a web server producing log data is the source, memory (or a file) serves as the channel, and the data is sunk into HDFS, which plays the role of the sink.
Requirement:
Collect the data produced over a socket and sink it into the log.
source: socket
channel: memory
sink: logger (the log)
Preparation:
[root@hadoop1 ~]# yum install nc -y
Quick test:
Start a server:
[root@hadoop1 ~]# nc -l 55555
Connect a client to the server:
[root@hadoop1 ~]# nc hadoop1 55555
Flume will do the collecting ---- it acts as the server:
[root@hadoop1 ~]# mkdir /flume-run
Write the Flume job file:
[root@hadoop1 ~]# vi /flume-run/nc-memory-logger.conf
# a1 is the agent name; declare its source, channel, and sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# configure the source in detail
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop1
a1.sources.r1.port = 44444

# configure the sink in detail
a1.sinks.k1.type = logger

# configure the channel in detail
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Run the Flume collection job:
[root@hadoop1 ~]# flume-ng agent -n a1 -c /opt/apache-flume-1.7.0-bin/conf/ -f /flume-run/nc-memory-logger.conf -Dflume.root.logger=INFO,console
-n: the agent name
-c: the Flume configuration directory
-f: the Flume job file to run
-D: passes a Java system property (here the log level and appender)
2022-12-06 19:15:53,140 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65                hello flume }
2022-12-06 19:15:58,032 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 6E 63                         hello nc }
2022-12-06 19:16:00,688 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 68 61 64 6F 6F 70             hello hadoop }

Client side:
[root@hadoop1 ~]# nc hadoop1 44444
hello flume
OK
hello nc
OK
hello hadoop
OK

3.1.2 Offline Data Collection in Practice

Log data does not have to yield conclusions immediately, so it can be collected on a schedule via a timer.
Simulating a scheduled-collection scenario from enterprise development:
collect data into HDFS on a schedule.
Source: watch an entire directory ---- spooldir
Channel: memory
Sink: HDFS, the distributed file system
a. Create a folder to hold the files to be collected
[root@hadoop1 ~]# kill -9 101330     ---- kill the running Flume agent
[root@hadoop1 ~]# mkdir /web_log
[root@hadoop1 ~]# vi /web_log/a.log
hello,java,hello,hadoop
hello,linux,java,c
hello,linux,c,hadoop
hadoop,spark,linux
[root@hadoop1 ~]# vi /web_log/b.log
hello,c
java,linux,hadoop
spark,c,python
java,c,python
The goal is to collect the two log files above into HDFS.
[root@hadoop1 ~]# vi /flume-run/dir-memory-hdfs.conf
# a1 is the agent name; declare its source, channel, and sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# configure the source in detail
a1.sources.r1.type = spooldir
# the directory to watch for files to collect
a1.sources.r1.spoolDir = /web_log/

# configure the sink in detail
a1.sinks.k1.type = hdfs
# destination path in HDFS
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d
# avoid producing lots of small files while rolling
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
# file format written to HDFS
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# use the local timestamp for the %Y-%m-%d escape
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# configure the channel in detail
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Run the job:
[root@hadoop1 ~]# flume-ng agent -n a1 -c /opt/apache-flume-1.7.0-bin/conf/ -f /flume-run/dir-memory-hdfs.conf -Dflume.root.logger=INFO,console
Test:
[root@hadoop1 ~]# hadoop fs -cat /flume/events/2022-12-06/FlumeData.1670337504112
hello,java,hello,hadoop
hello,linux,java,c
hello,linux,c,hadoop
hadoop,spark,linux
hello,c
java,linux,hadoop
spark,c,python
java,c,python

3.1.3 Real-Time Data Collection in Practice

How is real-time data monitored?
Simulation:
Create the file that will receive real-time data:
[root@hadoop1 ~]# vi /tmp/gp.log      ---- empty file
Data is written into gp.log continuously, and it has to be collected as it arrives.
Edit the Flume job file:
[root@hadoop1 ~]# vi /flume-run/file-memory-logger.conf
# a1 is the agent name; declare its source, channel, and sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# configure the source in detail
a1.sources.r1.type = exec
# follow the last line of the file in real time
a1.sources.r1.command = tail -F /tmp/gp.log

# configure the sink in detail
a1.sinks.k1.type = logger

# configure the channel in detail
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Run the Flume collection job:
[root@hadoop1 ~]# flume-ng agent -n a1 -c /opt/apache-flume-1.7.0-bin/conf/ -f /flume-run/file-memory-logger.conf -Dflume.root.logger=INFO,console
Test:
[root@hadoop1 ~]# echo hello java >> /tmp/gp.log
[root@hadoop1 ~]# echo hello hadoop >> /tmp/gp.log
The test output:
ava:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 6A 61 76 61 ? ? ? ? ? ? ? ? ? hello java }
2022-12-06 23:35:46,393 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 68 61 64 6F 6F 70 ? ? ? ? ? ? hello hadoop }

3.2 The First Maven Project


3.3 Review

a. Fully distributed Hadoop setup
   hadoop namenode -format      ----- never format the NameNode more than once
b. Flume installation and configuration
   Extract the Flume package, configure the Flume environment variables, write Flume job files
c. Typical uses of Flume in industry
   Offline data collection and real-time data collection
d. Setting up the development environment on Windows
   a. JDK environment variables
   b. Maven environment variables
   c. IDEA installation and activation
   d. Integrating Maven into IDEA
   e. Creating a Maven project

3.4 Operating HDFS from Java Code

Hadoop itself is written in Java, so Java and Hadoop integrate seamlessly; Java ships an API for operating HDFS.
HDFS also offers many file system shell commands:
Create a directory:
[root@hadoop1 ~]# hadoop fs -mkdir /linuxMkdir
Delete a directory:
[root@hadoop1 ~]# hadoop fs -rmdir /linuxMkdir
Upload a file to HDFS:
[root@hadoop1 ~]# hadoop fs -put anaconda-ks.cfg /linuxMkdir
[root@hadoop1 ~]# hadoop fs -lsr /linuxMkdir
lsr: DEPRECATED: Please use 'ls -R' instead.
-rw-r--r--   3 root supergroup       1257 2022-12-07 07:02 /linuxMkdir/anaconda-ks.cfg
Download a file from HDFS to Linux:
[root@hadoop1 ~]# hadoop fs -get /linuxMkdir/anaconda-ks.cfg /tmp
Based on these commands, we now write the equivalent Java code.
Preparation:
Add the Hadoop dependencies.
Maven:
   a. builds the project
   b. manages jar dependencies
   c. packages and runs the project

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.7.3</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.7.3</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>2.7.3</version>
</dependency>


If the Java client is denied permission when writing to HDFS, a quick (if heavy-handed) workaround is to open up the permissions: [root@hadoop1 ~]# hadoop fs -chmod -R 777 /

package org.tjcj.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

/**
 * Utility class for working with HDFS
 */
public class HDFSUtil {
    // the HDFS file system handle
    private static FileSystem fileSystem;
    static { // static block: instantiate fileSystem once
        // load the current Hadoop configuration
        Configuration conf = new Configuration();
        // point the client at the HDFS NameNode
        conf.set("fs.defaultFS","hdfs://192.168.91.100:9000");
        if(fileSystem == null){
            try {
                fileSystem = FileSystem.get(conf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * Create a directory
     * @param path
     */
    public static void createDir(String path) throws IOException {
        boolean flag = false;
        if (!fileSystem.exists(new Path(path))){ // only create it if it does not already exist
            flag = fileSystem.mkdirs(new Path(path));
        }
        if(flag){
            System.out.println("Directory created successfully...");
        }else{
            System.out.println("Directory already exists...");
        }
    }

    /**
     * Delete a file or directory
     * @param path
     */
    public static void delete(String path) throws IOException {
        boolean flag = false;
        if(fileSystem.exists(new Path(path))){
            flag = fileSystem.delete(new Path(path),true);
        }
        if(flag){
            System.out.println("File or directory deleted");
        }else{
            System.out.println("File or directory does not exist");
        }
    }

    /**
     * Upload a local file to HDFS
     * @param srcPath
     * @param destPath
     * @throws IOException
     */
    public static void uploadFile(String srcPath,String destPath) throws IOException {
        fileSystem.copyFromLocalFile(false,true,new Path(srcPath),new Path(destPath));
        System.out.println("File uploaded successfully!");
    }
    public static void downloadFile(String srcPath,String destPath) throws IOException {
        fileSystem.copyToLocalFile(false,new Path(srcPath),new Path(destPath),true);
        System.out.println("File downloaded successfully");
    }
    public static void main(String[] args) throws IOException {
//        createDir("/javaMkdir");
//        delete("/javaMkdir");
//        uploadFile("D:\\data\\input\\lol.txt","/javaMkdir");
        downloadFile("/javaMkdir/lol.txt","D:\\");
    }
}

3.5 MapReduce ---- WordCount

A simple programming model:
map: exploits parallelism, processing the input in parallel
reduce: aggregates, summarizing what the map side emitted
Add the dependencies:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-common</artifactId>
  <version>2.7.3</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <version>2.7.3</version>
</dependency>

Code:

package org.tjcj.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Word count
 */
public class WordCount {
    public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1. take one line of input
            String line = value.toString();
            // 2. split the line on commas
            String [] splits = line.split(",");
            // 3. emit to the shuffle phase
            for (String split : splits) {
                context.write(new Text(split),new LongWritable(1)); // [hadoop,1]
            }
        }
    }
    public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long count=0;
            for (LongWritable value : values) {
                count+=value.get();
            }
            context.write(key,new LongWritable(count));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // load the current Hadoop configuration
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://192.168.91.100:9000");
        // create the MapReduce job
        Job job = Job.getInstance(configuration);
        // set the main class
        job.setJarByClass(WordCount.class);
        // configure the mapper
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // configure the reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // input path
        FileInputFormat.addInputPath(job,new Path("/flume/events/2022-12-06/FlumeData.1670337504112"));
        // output path (must not exist yet)
        FileOutputFormat.setOutputPath(job,new Path("/mr/out/out1"));
        // submit the job
        System.out.println(job.waitForCompletion(true)?"success!!!":"failed!!!");
    }
}

[root@hadoop1 ~]# hadoop fs -cat /mr/out/out1/part-r-00000
c       5
hadoop  4
hello   5
java    4
linux   4
python  2
spark   2
WordCount counts how many times each word occurs, relying on properties of the shuffle phase:
a. values with the same key are merged together
b. keys are sorted in their natural order
c. each key is assigned to a reducer by taking its hash modulo the number of reducers
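
As a minimal illustration of point c, Hadoop's default HashPartitioner can be called directly to see which reducer a given key would be sent to; the two-reducer count below is only an example value, not taken from the job above:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class PartitionDemo {
    public static void main(String[] args) {
        HashPartitioner<Text, LongWritable> partitioner = new HashPartitioner<>();
        int numReducers = 2; // example value
        for (String word : new String[]{"hello", "hadoop", "java"}) {
            // default rule: (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
            int reducer = partitioner.getPartition(new Text(word), new LongWritable(1), numReducers);
            System.out.println(word + " -> reducer " + reducer);
        }
    }
}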

3.6 TopN Problems

Best-seller rankings and similar TopN problems can be handled with MapReduce.

Continuing from the morning's example: count how many times each university subject is taken and report the top-ranked subjects (the top three).

package org.tjcj.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.TreeMap;

/**
 * Rank the subjects by how often they appear
 */
public class TopN {
    public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
        private TreeMap<Long,String> treeMap = new TreeMap<>();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1. take one line of input, e.g. "c\t5"
            String line = value.toString();
            // 2. split on the tab
            String [] splits = line.split("\t");
            treeMap.put(Long.parseLong(splits[1]),splits[0]);
        }

        /**
         * Called once at the end of the map task
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            while(treeMap.size()>3){
                treeMap.remove(treeMap.firstKey());
            }
            for (Long aLong : treeMap.keySet()) {
                context.write(new Text(treeMap.get(aLong)),new LongWritable(aLong));
            }
        }
    }
    public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            for (LongWritable value : values) {
                context.write(key,value);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // load the current Hadoop configuration
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://192.168.91.100:9000");
        // create the MapReduce job
        Job job = Job.getInstance(configuration);
        // set the main class
        job.setJarByClass(TopN.class);
        // configure the mapper
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // configure the reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // input path
        FileInputFormat.addInputPath(job,new Path("/mr/out/out1/part-r-00000"));
        // output path
        FileOutputFormat.setOutputPath(job,new Path("/mr/out/out2"));
        // submit the job
        System.out.println(job.waitForCompletion(true)?"success!!!":"failed!!!");
    }
}

[root@hadoop1 ~]# hadoop fs -cat /mr/out/out2/part-r-00000
hello   5
linux   4
spark   2
This result is flawed: the TreeMap is keyed on the count, so subjects with the same count overwrite each other (c and hello both appear 5 times, and hadoop, java and linux all appear 4 times). How can that be avoided?

package org.tjcj.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.TreeMap;

/**
 * Rank the subjects by how often they appear (fixed version)
 */
public class TopN {
    public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
        private TreeMap<String,String> treeMap = new TreeMap<>();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1. take one line of input, e.g. "c\t5"
            String line = value.toString();
            // 2. split on the tab
            String [] splits = line.split("\t");
            // build a new key so that equal counts no longer overwrite each other
            String newKey=splits[1]+"-"+Math.random();
            System.out.println(newKey);
            treeMap.put(newKey,splits[0]);
        }

        /**
         * Called once at the end of the map task
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            while(treeMap.size()>3){
                treeMap.remove(treeMap.firstKey());
            }
            for (String s : treeMap.keySet()) {
                String [] str = s.split("-");
                context.write(new Text(treeMap.get(s)),new LongWritable(Long.parseLong(str[0])));
            }
        }
    }
    public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            for (LongWritable value : values) {
                context.write(key,value);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // load the current Hadoop configuration
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://192.168.91.100:9000");
        // create the MapReduce job
        Job job = Job.getInstance(configuration);
        // set the main class
        job.setJarByClass(TopN.class);
        // configure the mapper
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // configure the reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // input path
        FileInputFormat.addInputPath(job,new Path("/mr/out/out1/part-r-00000"));
        // output path
        FileOutputFormat.setOutputPath(job,new Path("/mr/out/out3"));
        // submit the job
        System.out.println(job.waitForCompletion(true)?"success!!!":"failed!!!");
    }
}

Result:
[root@hadoop1 ~]# hadoop fs -cat /mr/out/out3/part-r-00000
c       5
hadoop  4
hello   5

3.7 Data Cleaning

The first step of data cleaning is deduplication.

[root@hadoop1 web_log]# cat a.log.COMPLETED b.log.COMPLETED >>c.log
[root@hadoop1 web_log]# cat a.log.COMPLETED b.log.COMPLETED >>c.log
[root@hadoop1 web_log]# cat a.log.COMPLETED b.log.COMPLETED >>c.log
[root@hadoop1 web_log]# cat a.log.COMPLETED b.log.COMPLETED >>c.log
Original file:
[root@hadoop1 web_log]# cat c.log
hello,java,hello,hadoop
hello,linux,java,c
hello,linux,c,hadoop
hadoop,spark,linux
hello,c
java,linux,hadoop
spark,c,python
java,c,python
hello,java,hello,hadoop
hello,linux,java,c
hello,linux,c,hadoop
hadoop,spark,linux
hello,c
java,linux,hadoop
spark,c,python
java,c,python
hello,java,hello,hadoop
hello,linux,java,c
hello,linux,c,hadoop
hadoop,spark,linux
hello,c
java,linux,hadoop
spark,c,python
java,c,python
hello,java,hello,hadoop
hello,linux,java,c
hello,linux,c,hadoop
hadoop,spark,linux
hello,c
java,linux,hadoop
spark,c,python
java,c,python
hello,java,hello,hadoop
hello,linux,java,c
hello,linux,c,hadoop
hadoop,spark,linux
hello,c
java,linux,hadoop
spark,c,python
java,c,python
The file above contains many duplicate lines; the task is to deduplicate them:
[root@hadoop1 ~]# hadoop fs -put /web_log/c.log /linuxMkdir/
[root@hadoop1 ~]# hadoop fs -chmod -R 777 /linuxMkdir/

package org.tjcj.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Data deduplication
 */
public class DupMR {
    public static class MyMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value,NullWritable.get());
        }
    }
    public static class MyReducer extends Reducer<Text,NullWritable,Text,NullWritable>{
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key,NullWritable.get());
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // load the current Hadoop configuration
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://192.168.91.100:9000");
        // create the MapReduce job
        Job job = Job.getInstance(configuration);
        // set the main class
        job.setJarByClass(DupMR.class);
        // configure the mapper
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // input path
        FileInputFormat.addInputPath(job,new Path("/linuxMkdir/c.log"));
        // output path
        FileOutputFormat.setOutputPath(job,new Path("/mr/out/out5"));
        // submit the job
        System.out.println(job.waitForCompletion(true)?"success!!!":"failed!!!");
    }
}

[root@hadoop1 ~]# hadoop fs -cat /mr/out/out5/part-r-00000
hadoop,spark,linux
hello,c
hello,java,hello,hadoop
hello,linux,c,hadoop
hello,linux,java,c
java,c,python
java,linux,hadoop
spark,c,python
Homework:
Find the five most-banned champions of 2020.
Approach:
   a. count how many times each champion was banned
   b. take the top five


3.8 Processing the League of Legends Dataset

Code:
Step one: count how many times each champion was banned

package org.tjcj.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Count how many times each champion was banned
 */
public class BanMR1 {
    public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1. take one line of input
            String line = value.toString();
            if(!line.contains("agt")){ // skip the header row
                // 2. split on tabs
                String [] splits = line.split("\t");
                // 3. emit to the shuffle phase
                for (int i=10;i<15;i++) { // columns 10-14 hold the banned champions
                    context.write(new Text(splits[i]),new LongWritable(1)); // [champion,1]
                }
            }
        }
    }
    public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long count=0;
            for (LongWritable value : values) {
                count+=value.get();
            }
            context.write(key,new LongWritable(count));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // load the current Hadoop configuration
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://192.168.91.100:9000");
        // create the MapReduce job
        Job job = Job.getInstance(configuration);
        // set the main class
        job.setJarByClass(BanMR1.class);
        // configure the mapper
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // configure the reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // input path
        FileInputFormat.addInputPath(job,new Path("/javaMkdir/lol.txt"));
        // output path; it must not already exist
        FileOutputFormat.setOutputPath(job,new Path("/mr/out/out6"));
        // submit the job
        System.out.println(job.waitForCompletion(true)?"success!!!":"failed!!!");
    }
}


[root@hadoop1 ~]# hadoop fs -cat /mr/out/out6/part-r-00000
Aatrox  93
Akali   123
Alistar 7
Aphelios        545
Ashe    71
Aurelion Sol    10
Azir    268
Bard    178
Blitzcrank      42
Braum   92
Caitlyn 81
...
Step two: pick the five most-banned champions

package org.tjcj.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.TreeMap;

/**
 * Rank the banned champions (top five)
 */
public class BanMR2 {
    public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
        private TreeMap<Long,String> treeMap = new TreeMap<>();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1. take one line of input, e.g. "c\t5"
            String line = value.toString();
            // 2. split on the tab
            String [] splits = line.split("\t");
            treeMap.put(Long.parseLong(splits[1]),splits[0]);
        }

        /**
         * Called once at the end of the map task
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            while(treeMap.size()>5){
                treeMap.remove(treeMap.firstKey());
            }
            for (Long aLong : treeMap.keySet()) {
                context.write(new Text(treeMap.get(aLong)),new LongWritable(aLong));
            }
        }
    }
    public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            for (LongWritable value : values) {
                context.write(key,value);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // load the current Hadoop configuration
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://192.168.91.100:9000");
        // create the MapReduce job
        Job job = Job.getInstance(configuration);
        // set the main class
        job.setJarByClass(BanMR2.class);
        // configure the mapper
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // configure the reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // input path
        FileInputFormat.addInputPath(job,new Path("/mr/out/out6/part-r-00000"));
        // output path
        FileOutputFormat.setOutputPath(job,new Path("/mr/out/out8"));
        // submit the job
        System.out.println(job.waitForCompletion(true)?"success!!!":"failed!!!");
    }
}


Result:
[root@hadoop1 ~]# hadoop fs -cat /mr/out/out8/part-r-00000
Aphelios        545
Kalista 460
LeBlanc 474
Sett    602
Varus   469

3.9 Win Rate of Teams That Take First Blood (2020 Spring Split, LPL)

Analysis conditions: Spring split, LPL region.

package org.tjcj.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Compute the win rate of teams that take first blood
 */
public class WinFirstBlod {
    public static class MyMapper extends Mapper<LongWritable,Text,Text,Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1. take one line of input
            String line = value.toString();
            if(!line.contains("agt") && line.contains("Spring") && line.contains("LPL")){ // filter: skip the header, keep Spring split and LPL rows only
                // 2. split on tabs
                String [] splits = line.split("\t");
                int firstBlood = Integer.parseInt(splits[24]);
                if(firstBlood==1){ // this team took first blood
                    context.write(new Text("FirstBlood"),new Text(splits[16]+","+firstBlood));
                }
            }
        }
    }
    public static class MyReducer extends Reducer<Text,Text,Text,LongWritable>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            long sum=0; // total number of first-blood rows
            long win=0; // rows where the first-blood team also won
            for (Text value : values) {
                String line =value.toString();
                String [] str = line.split(",");
                int result=Integer.parseInt(str[0]);
                if(result==1){ // the team won
                    win++;
                }
                sum++;
            }
            context.write(new Text("First-blood win rate"),new LongWritable(win*100/sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // load the current Hadoop configuration
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://192.168.91.100:9000");
        // create the MapReduce job
        Job job = Job.getInstance(configuration);
        // set the main class
        job.setJarByClass(WinFirstBlod.class);
        // configure the mapper
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // configure the reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // input path
        FileInputFormat.addInputPath(job,new Path("/javaMkdir/lol.txt"));
        // output path
        FileOutputFormat.setOutputPath(job,new Path("/mr/out/out9"));
        // submit the job
        System.out.println(job.waitForCompletion(true)?"success!!!":"failed!!!");
    }
}

[root@hadoop1 ~]# hadoop fs -cat /mr/out/out9/part-r-00000
拿1血隊伍的勝率 63
為了在本地運行時能看到MapReduce的日志輸出,可以在pom.xml中加入日志依賴:

<!--添加日志-->
<dependency>
? <groupId>log4j</groupId>
? <artifactId>log4j</artifactId>
? <version>1.2.17</version>
</dependency>

log4j屬性文件(放在src/main/resources目錄下,命名為log4j.properties):

log4j.rootLogger=INFO,console
log4j.additivity.org.apache=true
# 控制臺(console)
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=INFO
log4j.appender.console.ImmediateFlush=true
log4j.appender.console.Target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n

3.10 MapReduce的小測試

案例:
1、統(tǒng)計2020年全年度,擊殺榜排名前五的隊伍
2、統(tǒng)計2020年全年度,RED的勝率
3、分別統(tǒng)計lck、lpl賽區(qū)拿五殺的隊伍

3.11 Spark中Wordcount案例

[root@hadoop1 ~]# spark-shell                        -----進入scala編程界面
scala> sc.textFile("/web_log/c.log")                 -----獲取一個文件,每行形如 hello,java
scala> res0.flatMap(_.split(","))                    -------切分后得到 hello、java
scala> res1.map((_,1))                               -------<hello,1>
scala> res2.reduceByKey(_+_)                         -------根據(jù)key值進行合并,value值進行相加

scala> res3.sortBy(_._2,false)                       -------根據(jù)值進行排序
scala> res4.collect                                  -------把數(shù)據(jù)變成數(shù)組
res6: Array[(String, Int)] = Array((hello,25), (c,25), (linux,20), (java,20), (hadoop,20), (python,10), (spark,10))
scala> res6.foreach(println)                         -------打印數(shù)據(jù)
(hello,25)
(c,25)
(linux,20)
(java,20)
(hadoop,20)
(python,10)
(spark,10)
上述案例可以寫在一起:
scala> sc.textFile("/web_log/c.log").flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).collect.foreach(println)
(python,10)
(hello,25)
(linux,20)
(java,20)
(spark,10)
(hadoop,20)
(c,25)

3.12 Windows下Spark的開發(fā)環(huán)境

spark依賴:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

wordcount案例:

package org.tjcj

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark中經(jīng)典案例
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    //1、獲取SparkContext對象
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)
    //2、WordCount核心代碼
    sc.textFile("D:\\a.txt").flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).
      sortBy(_._2,false).collect().foreach(println)
  }
}

3.13 Spark中RDD

RDD:彈性分布式數(shù)據(jù)集-------封裝了一系列操作數(shù)據(jù)的功能
Spark提供了很多便于用戶直接操作的方法------RDD(彈性分布式數(shù)據(jù)集)
RDD特性:
1.RDD可以看做是由一系列partition所組成的
2.RDD之間存在依賴關(guān)系
3.算子是作用在partition之上的
4.分區(qū)器是作用在kv形式的RDD上
5.partition提供最佳計算位置,利于數(shù)據(jù)處理的本地化,即計算向數(shù)據(jù)移動而不是移動數(shù)據(jù)(就近原則)
RDD中的彈性:
存儲的彈性:內(nèi)存與磁盤的自動切換
容錯的彈性:數(shù)據(jù)丟失可以自動恢復(fù)
計算的彈性:計算出錯有重試機制
分片的彈性:可以根據(jù)需要重新分片
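
分區(qū)相關(guān)的特性可以在spark-shell中直接驗證,下面是一個簡單示意(分區(qū)數(shù)4、2只是演示用的假設(shè)值):

scala> val rdd = sc.makeRDD(1 to 8, 4)         //創(chuàng)建RDD時指定4個分區(qū)
scala> rdd.partitions.length                   //查看分區(qū)數(shù),結(jié)果為4
scala> rdd.repartition(2).partitions.length    //重新分片后分區(qū)數(shù)變?yōu)?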

3.13.1 RDD創(chuàng)建

集群中創(chuàng)建:
利用集合方式進行創(chuàng)建------->Array、List
scala> val a = Array(1,2,3,4)
a: Array[Int] = Array(1, 2, 3, 4)

scala> val b = List(1,2,3,4)
b: List[Int] = List(1, 2, 3, 4)
方式一:
scala> sc.parallelize(a)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
方式二:
scala> sc.makeRDD(b)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:27
需求:
a、res0整體值變?yōu)樵瓉韮杀?scala> res0.map(_*2).collect().foreach(println)
2
4
6
8
b、res1整體+1
scala> res1.map(_+1).collect().foreach(println)
2
3
4
5
scala> res1.map(_=>{"a"}).collect().foreach(println)
a
a
a
a
通過外部文件方式:
scala> sc.textFile("/web_log/c.log")
res11: org.apache.spark.rdd.RDD[String] = /web_log/c.log MapPartitionsRDD[9] at textFile at <console>:25

3.13.2 常用算子(RDD)

參考網(wǎng)站:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
按照字母自然順序講解常用算子:
collect------可以把RDD轉(zhuǎn)換為數(shù)組
scala> val rdd = sc.makeRDD(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at makeRDD at <console>:24

scala> rdd.collect()
res13: Array[Int] = Array(1, 2, 3, 4)
count------計數(shù)
scala> sc.textFile("/web_log/c.log")
res14: org.apache.spark.rdd.RDD[String] = /web_log/c.log MapPartitionsRDD[12] at textFile at <console>:25

scala> res14.count()
res15: Long = 40
distinct ----去重
scala> sc.textFile("/web_log/c.log").distinct()
res16: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at distinct at <console>:25

scala> res16.count()
res17: Long = 8
first--------拿到第一行數(shù)據(jù)
scala> sc.textFile("hdfs://192.168.91.100:9000//javaMkdir/lol.txt")
res18: org.apache.spark.rdd.RDD[String] = hdfs://192.168.91.100:9000//javaMkdir/lol.txt MapPartitionsRDD[19] at textFile at <console>:25

scala> res18.first()
res19: String = " ? ? ? agt ? ? league ?split ? playoffs ? ? ? ?date ? ?game ? ?patch ? side ? ?team ? ?ban1 ? ?ban2 ? ?ban3 ? ?ban4ban5 ? ? gamelength ? ? ?result ?kills ? deaths ?assists doublekills ? ? triplekills ? ? quadrakills ? ? pentakills ? ? ?firstblood ?team kpm firstdragon ? ? dragons opp_dragons ? ? elementaldrakes opp_elementaldrakes ? ? infernals ? ? ? mountains ? ? ? clouds ?oceans ? ? ? dragons (type unknown) ?elders ?opp_elders ? ? ?firstherald ? ? heralds opp_heralds ? ? firstbaron ? ? ?barons ?opp_barons ?firsttower ? ? ? towers ?opp_towers ? ? ?firstmidtower ? firsttothreetowers ? ? ?inhibitors ? ? ?opp_inhibitors ?damagetochampions ? dpm ? ? ?damageshare ? ? damagetakenperminute ? ?damagemitigatedperminute ? ? ? ?wardsplaced ? ? wpm ? ? wardskilled ? ? wcpm ? ?controlwardsbought ? visionscore ? ? vspm ? ?totalgold ? ? ? earnedgold ? ? ?earned gpm ? ? ?earnedgoldshare goldspent ? ? ? gspd ? ?total cs ? ? minionkills ? ? monsterkills ? ?monsterkillsownjungle ? monsterkillsenemyjungle cspm ? ?goldat1...
filter  -------按條件過濾數(shù)據(jù)
scala> sc.textFile("hdfs://192.168.91.100:9000//javaMkdir/lol.txt")
res20: org.apache.spark.rdd.RDD[String] = hdfs://192.168.91.100:9000//javaMkdir/lol.txt MapPartitionsRDD[21] at textFile at <console>:25
scala> res20.filter(_.contains("agt")).collect().foreach(println)
? ? ? ? agt ? ? league ?split ? playoffs ? ? ? ?date ? ?game ? ?patch ? side ? ?team ? ?ban1 ? ?ban2 ? ?ban3 ? ?ban4 ? ?ban5 ? ?gamelength ? result ?kills ? deaths ?assists doublekills ? ? triplekills ? ? quadrakills ? ? pentakills ? ? ?firstblood ? ? ?team kpm ? ?firstdragon ? ? ?dragons opp_dragons ? ? elementaldrakes opp_elementaldrakes ? ? infernals ? ? ? mountains ? ? ? clouds ?oceans ?dragons (type unknown) ? ? ? elders ?opp_elders ? ? ?firstherald ? ? heralds opp_heralds ? ? firstbaron ? ? ?barons ?opp_barons ? ? ?firsttower ? towers ?opp_towers ? ? ?firstmidtower ? firsttothreetowers ? ? ?inhibitors ? ? ?opp_inhibitors ?damagetochampions ? ? ? dpm damageshare ? ? ?damagetakenperminute ? ?damagemitigatedperminute ? ? ? ?wardsplaced ? ? wpm ? ? wardskilled ? ? wcpm ? ?controlwardsbought ? visionscore ? ? vspm ? ?totalgold ? ? ? earnedgold ? ? ?earned gpm ? ? ?earnedgoldshare goldspent ? ? ? gspd ? ?total cs ? ?minionkills ? ? ?monsterkills ? ?monsterkillsownjungle ? monsterkillsenemyjungle cspm ? ?goldat10 ? ? ? ?xpat10 ?csat10 ?opp_goldat10opp_xpat10 ? ? ? opp_csat10 ? ? ?golddiffat10 ? ?xpdiffat10 ? ? ?csdiffat10 ? ? ?goldat15 ? ? ? ?xpat15 ?csat15 ?opp_goldat15 ? ?opp_xpat15 ? opp_csat15 ? ? ?golddiffat15 ? ?xpdiffat15 ? ? ?csdiffat15
flatMap  -----一行輸入,n行輸出
scala> sc.makeRDD(List("a","b","c")).flatMap(_*3).map((_,1)).collect().foreach(println)
(a,1)
(a,1)
(a,1)
(b,1)
(b,1)
(b,1)
(c,1)
(c,1)
(c,1)
foreach  ------遍歷
scala> sc.makeRDD(List("a","b","c")).foreach(x=>println(x+"--spark"))
a--spark
b--spark
c--spark

scala> sc.makeRDD(List("a","b","c")).foreach(println)
b
c
a
map --------變形
scala> sc.makeRDD(List('a','b','c')).map(_*2).collect().foreach(println)
194
196
198
max最大值:
scala> sc.makeRDD(List(1,2,3,4,5)).max()
res37: Int = 5
min最小值:
scala> sc.makeRDD(List(1,2,3,4,5)).min()
res38: Int = 1
reduceByKey-----根據(jù)key進行合并
scala> sc.makeRDD(List("java","linux","c","java","hadoop"))
res39: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[47] at makeRDD at <console>:25

scala> res39.map((_,1)).reduceByKey(_+_).collect().foreach(println)
(linux,1)
(java,2)
(hadoop,1)
(c,1)
saveAsTextFile-----把數(shù)據(jù)保存到某個位置
scala> res39.map((_,1)).reduceByKey(_+_).saveAsTextFile("/tmp/out1")
測試:
[root@hadoop1 tmp]# cd out1
[root@hadoop1 out1]# ls
part-00000 ?part-00001 ?_SUCCESS
[root@hadoop1 out1]# cat part-00000
(linux,1)
(java,2)
[root@hadoop1 out1]# cat part-00001
(hadoop,1)
(c,1)
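
可以看到輸出被拆成了part-00000、part-00001兩個文件,這是因為該RDD有兩個分區(qū);如果希望結(jié)果只落在一個文件中,可以先把分區(qū)數(shù)調(diào)成1再保存,下面是一個示意(輸出路徑/tmp/out2只是演示用的假設(shè)路徑):
scala> res39.map((_,1)).reduceByKey(_+_).repartition(1).saveAsTextFile("/tmp/out2")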
sortBy------常用于排序工作
scala> res39.map((_,1)).reduceByKey(_+_).sortBy(_._1,false).collect().foreach(println)
(linux,1)
(java,2)
(hadoop,1)
(c,1)

scala> res39.map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect().foreach(println)
(java,2)
(linux,1)
(hadoop,1)
(c,1)
注意:false表示降序,true表示自然排序
take-----選取前幾
scala> res39.map((_,1)).reduceByKey(_+_).sortBy(_._2,false).take(3)
res46: Array[(String, Int)] = Array((java,2), (linux,1), (hadoop,1))
top-----取"最大"的前幾個,用的是元組的默認排序(先按key的字典序比較),所以結(jié)果順序和take不同
scala> res39.map((_,1)).reduceByKey(_+_).sortBy(_._2,false).top(3)
res47: Array[(String, Int)] = Array((linux,1), (java,2), (hadoop,1))
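如果希望top按出現(xiàn)次數(shù)取前幾名,可以顯式傳入一個Ordering,下面是一個補充示意(沿用上面的res39):
scala> res39.map((_,1)).reduceByKey(_+_).top(3)(Ordering.by[(String, Int), Int](_._2))   //按value取前3,(java,2)會排在最前面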

3.13.3 統(tǒng)計ban選英雄需求:

統(tǒng)計LOL春季賽中,被ban次數(shù)排名前五的英雄

package org.tjcj

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 統(tǒng)計英雄聯(lián)盟中ban選英雄排名前五的英雄
 */
object BanTop5 {
  def main(args: Array[String]): Unit = {
    //1、獲取SparkContext對象
    val conf:SparkConf = new SparkConf().setAppName("ban5").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    //2、獲取一個RDD對象,通過外部文件方式獲取RDD
    val rdd:RDD[String] = sc.textFile("hdfs://192.168.91.100:9000/javaMkdir/lol.txt")
    //3、過濾文件,找到需要處理的信息
    val filterRDD:RDD[String] = rdd.filter(!_.contains("agt")).filter(_.contains("Spring"))
    //4、對原始數(shù)據(jù)進行變形
    val mapRDD = filterRDD.map(x=>{//x表示傳入一行數(shù)據(jù)
      var splits=x.split("\t")
      var str=""
      var i=10
      while(i<15){
        str+=splits(i)+","
        i+=1
      }
      str.substring(0,str.length-1) //新的字符串
    })
    //5、壓平處理
    val flatMapRDD = mapRDD.flatMap(_.split(","))
    //6、變形
    val mapRDD1 = flatMapRDD.map((_,1))
    //7、統(tǒng)計
    val reduceRDD = mapRDD1.reduceByKey(_+_)
    //8、排序
    val sortRDD = reduceRDD.sortBy(_._2,false)
    //val修飾常量,var修飾變量
    //9、取值
    val array = sortRDD.take(5)
    //10、轉(zhuǎn)換保存
    sc.makeRDD(array).repartition(1).saveAsTextFile("hdfs://192.168.91.100:9000/spark/out/out2")
  }
}

測試結(jié)果:
[root@hadoop3 ~]# hadoop fs -cat /spark/out/out2/part-00000
(Aphelios,385)
(Sett,331)
(LeBlanc,294)
(Senna,262)
(Zoe,251)
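
上面第4、5步"先拼接ban1~ban5再切分"的寫法,也可以用數(shù)組切片一步壓平,效果相同,下面是一個補充示意(字段下標沿用上文):

    val banTop5 = rdd.filter(!_.contains("agt")).filter(_.contains("Spring"))
      .flatMap(_.split("\t").slice(10, 15))   //直接取ban1~ban5五列并壓平
      .map((_,1)).reduceByKey(_+_).sortBy(_._2,false).take(5)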

3.13.4 統(tǒng)計整個賽年中LCK賽區(qū)擊殺榜排名前5的隊伍(最終數(shù)據(jù)格式要求形如:IG,456)

package org.tjcj

import org.apache.spark.{SparkConf, SparkContext}

object KillTop5 {
  def main(args: Array[String]): Unit = {
    //1、獲取SparkContext對象
    val conf:SparkConf = new SparkConf().setAppName("ban5").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    val array = sc.textFile("hdfs://192.168.91.100:9000/javaMkdir/lol.txt").filter(!_.contains("agt")).
      filter(_.contains("LCK")).map(x=>{
      ( x.split("\t")(9),Integer.parseInt(x.split("\t")(17)))  //(team,kills)
    }).reduceByKey(_+_).sortBy(_._2,false).take(5)
    sc.makeRDD(array).map(x=>{
      var str=""
      str=x._1+","+x._2
      str
    }).repartition(1).saveAsTextFile("hdfs://192.168.91.100:9000/spark/out/out5")
  }
}

測試結(jié)果:
[root@hadoop3 ~]# hadoop fs -cat /spark/out/out5/part-00000
DRX,1367
Gen.G,1356
DAMWON Gaming,1268
T1,1231
Afreeca Freecs,1038

3.13.5 統(tǒng)計整個賽年中LPL賽區(qū)死亡榜排名前五的隊伍

package org.tjcj

import org.apache.spark.{SparkConf, SparkContext}

object DeathTop5 {
  def main(args: Array[String]): Unit = {
    //1、獲取SparkContext對象
    val conf:SparkConf = new SparkConf().setAppName("ban5").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    val array = sc.textFile("hdfs://192.168.91.100:9000/javaMkdir/lol.txt").filter(!_.contains("agt")).
      filter(_.contains("LPL")).map(x=>{
      ( x.split("\t")(9),Integer.parseInt(x.split("\t")(18)))  //(team,deaths)
    }).reduceByKey(_+_).sortBy(_._2,false).take(5)
    sc.makeRDD(array).map(x=>{
      var str=""
      str=x._1+","+x._2
      str
    }).repartition(1).saveAsTextFile("hdfs://192.168.91.100:9000/spark/out/out6")
  }
}

測試:
[root@hadoop3 ~]# hadoop fs -cat /spark/out/out6/part-00000
Invictus Gaming,1317
Team WE,1271
Suning,1261
LGD Gaming,1221
Rogue Warriors,1219

3.13.6 總結(jié)

RDD:彈性分布式數(shù)據(jù)集。要學(xué)好RDD,需要:
熟悉常用的算子
多做練習(xí),針對不同的需求選擇不同的解決方法
測試:
1、統(tǒng)計整個賽年中,lpl賽區(qū),助攻榜排名前五
案例:

package org.tjcj

import org.apache.spark.{SparkConf, SparkContext}

/**
 * lol助攻榜
 */
object AssistsTop5 {
  def main(args: Array[String]): Unit = {
    //1、獲取SparkContext對象
    val conf:SparkConf = new SparkConf().setAppName("ban5").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    val array = sc.textFile("hdfs://192.168.91.100:9000/javaMkdir/lol.txt").filter(!_.contains("agt")).
      filter(_.contains("LPL")).map(x=>{
      ( x.split("\t")(9),Integer.parseInt(x.split("\t")(19)))  //(team,Assists)
    }).reduceByKey(_+_).sortBy(_._2,false).take(5)
    sc.makeRDD(array).map(x=>{
      var str=""
      str=x._1+","+x._2
      str
    }).repartition(1).saveAsTextFile("hdfs://192.168.91.100:9000/spark/out/out7")
  }
}

運行結(jié)果輸出到HDFS目錄 /spark/out/out7 中,可以用 hadoop fs -cat /spark/out/out7/part-00000 查看。


2、統(tǒng)計整個賽年中,拿1血勝率
案例:

package org.tjcj

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 拿一血的勝率
 */
object WinFirstBlood {
  def main(args: Array[String]): Unit = {
    //創(chuàng)建SparkContext對象
    val conf = new SparkConf().setAppName("winFirstBlood").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.textFile("hdfs://192.168.91.100:9000/javaMkdir/lol.txt").filter(!_.contains("agt"))
      .map(x=>{
        var splits=x.split("\t")
        (Integer.parseInt(splits(16)),Integer.parseInt(splits(24)))
      }).filter(x=>{
        x._2==1
    }).reduceByKey(_+_).map(x=>{
      var d:Double=x._2
      ("win",d)
    }).reduceByKey(_/_).map(x=>{
      (x._1,x._2.formatted("%.2f"))
    }).repartition(1).saveAsTextFile("hdfs://192.168.91.100:9000/spark/out/out9")
  }
}

運行結(jié)果:
[root@hadoop1 ~]# hadoop fs -cat /spark/out/out9/part-00000
(win,0.58)
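
注意:上面的reduceByKey(_/_)得到的是兩個計數(shù)相除的比值,并且依賴兩個值合并時的順序,并不嚴格等于"勝場數(shù)/拿一血總場次"。下面是一個更直觀的寫法,僅作補充示意(字段下標和sc沿用上文):

    val fb = sc.textFile("hdfs://192.168.91.100:9000/javaMkdir/lol.txt")
      .filter(!_.contains("agt")).map(_.split("\t"))
      .filter(x => Integer.parseInt(x(24)) == 1)     //只保留拿到一血的記錄
      .map(x => Integer.parseInt(x(16)))             //result:1表示獲勝,0表示失敗
      .cache()
    val rate = fb.sum() / fb.count()                 //勝場數(shù) / 拿一血總場次
    println("拿一血隊伍的勝率:" + rate.formatted("%.2f"))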
3、統(tǒng)計整個賽年中,勝率超過60%隊伍,以及勝率

package org.tjcj

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 整個賽年中勝率超過60%
 */
object WinSpark {
  def main(args: Array[String]): Unit = {
    //創(chuàng)建SparkContext對象
    val conf = new SparkConf().setAppName("winFirstBlood").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd= sc.textFile("hdfs://192.168.91.100:9000/javaMkdir/lol.txt").filter(!_.contains("agt"))
    val rdd1= rdd.map(x=>{
        var splits =x.split("\t")
        (splits(9),Integer.parseInt(splits(16)))
      }).reduceByKey(_+_)
    val rdd2 = rdd.map(x=>{
      var splits =x.split("\t")
      (splits(9),1)
    }).reduceByKey(_+_)
    rdd1.union(rdd2).reduceByKey((x1,x2)=>{
      var win:Int=0;
      if(x1<x2){
        win = x1*100/x2
      }
      win
    }).filter(_._2>60).map(x=>(x._1,x._2+"%"))
      .repartition(1).saveAsTextFile("hdfs://192.168.91.100:9000/spark/out/out11")
  }
}

運行:
[root@hadoop1 ~]# hadoop fs -cat /spark/out/out11/part-00000
(Invictus Gaming,61%)
(DRX,64%)
(JD Gaming,70%)
(DAMWON Gaming,67%)
(T1,67%)
(Gen.G,69%)
(Top Esports,67%)
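
上面union之后用reduceByKey比較大小的寫法依賴兩個值合并時的順序(只有勝場數(shù)小于總場次時才會進入計算分支)。也可以用join把"勝場數(shù)"和"總場次"按隊伍對齊之后再算勝率,下面是一個補充示意(沿用上文的rdd1、rdd2):

    rdd1.join(rdd2).map{ case (team,(win,total)) => (team, win*100/total) }
      .filter(_._2>60).map(x=>(x._1,x._2+"%"))
      .collect().foreach(println)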

3.14 Spark中Spark-sql應(yīng)用

3.14.1 Spark-sql的基礎(chǔ)用法

搭建Spark-sql開發(fā)環(huán)境

<!--引入spark-sql依賴-->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

案例:

package org.tjcj.sql

import org.apache.spark.sql.SparkSession

/**
 *
 */
object MyFirstSparkSql {
  def main(args: Array[String]): Unit = {
    //創(chuàng)建SparkSession對象
    val spark = SparkSession.builder().appName("myFirstSparkSession").master("local[*]").getOrCreate()
    val df = spark.read.json("D:\\spark-sql\\person.json")
    df.show()
  }
}
可以讀取JSON格式的文件,進行展示
Spark-sql的基礎(chǔ)用法:
package org.tjcj.sql

import org.apache.spark.sql.SparkSession

/**
 *
 */
object MyFirstSparkSql {
  def main(args: Array[String]): Unit = {
    //創(chuàng)建SparkSession對象
    val spark = SparkSession.builder().appName("myFirstSparkSession").master("local[*]").getOrCreate()
    val df = spark.read.json("D:\\spark-sql\\people.json")
    df.show()
    //打印二維表的格式
    df.printSchema()
    //類似于sql語句格式進行查詢數(shù)據(jù)
    df.select("name","age").show()
    //引入scala,隱式轉(zhuǎn)換
    import spark.implicits._
    //讓字段進行運算
    df.select($"name",$"age"+1).show()
    //加上條件
    df.select("name").where("age>20").show()
    //進行分組統(tǒng)計
    df.groupBy("age").count().show()
    //創(chuàng)建一個臨時表
    df.createOrReplaceTempView("people")
    spark.sql("select age,count(*) as count1 from people group by age").show()
    //關(guān)閉spark
    spark.stop()
  }
}

3.14.2 Spark-sql中read

package org.tjcj.sql

import org.apache.spark.sql.SparkSession

/**
 * 讀取不同類型的文件
 */
object SparkRead {
  def main(args: Array[String]): Unit = {
    //構(gòu)建SparkSession對象
    val spark = SparkSession.builder().appName("sparkRead").master("local[*]").getOrCreate()
    //讀取json文件
    val df = spark.read.json("D:\\spark-sql\\employees.json")
    df.show()
    df.printSchema()
    //讀取csv文件
    val df1= spark.read.csv("D:\\spark-sql\\lol.csv")
    df1.show()
    df1.printSchema()
    //讀取普通文本文件
    val df2=spark.read.text("D:\\spark-sql\\student.txt")
    df2.show()
    df2.printSchema()
    //讀取orc文件(這里沒有準備對應(yīng)的orc示例文件,暫時從略)

    //讀取parquet文件
    val df4 =spark.read.parquet("D:\\spark-sql\\users.parquet")
    df4.show()
    df4.printSchema()
  }
}
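
需要注意,csv默認會把第一行當成普通數(shù)據(jù),而且所有列都按字符串處理;如果文件帶表頭,可以加上option再讀,下面是一個補充示意(文件路徑沿用上面的lol.csv):

    val df5 = spark.read.option("header", "true").option("inferSchema", "true").csv("D:\\spark-sql\\lol.csv")
    df5.printSchema()   //表頭變成列名,數(shù)值列會被推斷成相應(yīng)類型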

3.14.3 Spark-sql中的寫

package org.tjcj.sql


import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}


object SparkWrite {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession.builder().appName("write").master("local[*]").getOrCreate()
    spark.sql("set spark.sql.adaptive.enabled=true")
    val rdd:RDD[Row]= spark.sparkContext.parallelize(List(
      Row(1,"張三","大四",18),
      Row(1,"李四","大四",19),
      Row(1,"王五","大四",20),
      Row(1,"趙六","大四",21),
      Row(1,"劉七","大四",22)
    ))
    val schema=StructType(List(
      StructField("id",DataTypes.IntegerType,false),
      StructField("name",DataTypes.StringType,false),
      StructField("grade",DataTypes.StringType,false),
      StructField("age",DataTypes.IntegerType,false)
    ))
    val df = spark.createDataFrame(rdd,schema)
    df.show()
    df.printSchema()
    df.repartition(1).write.json("D:\\spark-sql\\out\\out3")
    import spark.implicits._
    val ds:Dataset[Student]= spark.createDataset(List(
      Student(1,"張三","大四",18),
      Student(1,"李四","大四",19),
      Student(1,"王五","大四",20),
      Student(1,"趙六","大四",21),
      Student(1,"劉七","大四",22)
    ))
    ds.show()
    ds.printSchema()
    ds.repartition(1).write.csv("D:\\spark-sql\\out\\out4")

  }
}
case class Student(id:Int,name:String,grade:String,age:Int)

3.15 圖表可視化

基于大數(shù)據(jù)處理得到的結(jié)果基本上都是文件格式-----需要使用圖表化的工具進行展示

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>電影</title>
<style>
    #main{
        width: 1500px;
        /*自適應(yīng)的高度*/
        height: 100vh;
        margin: 0px auto;
        padding: 0px;
    }
    #one{
        width: 700px;
        height: 50vh;
        float: left;
    }
    #two{
        width: 700px;
        height: 50vh;
        float: right;
    }
    #three{
        width: 700px;
        height: 50vh;
        float: left;
    }
    #four{
        width: 700px;
        height: 50vh;
        float: right;
    }
</style>
</head>
<body>
    <div id="main">
        <div id="one"></div>
        <div id="two"></div>
        <div id="three"></div>
        <div id="four"></div>
    </div>
</body>
<!--引入外部的js文件-->
<script src="../js/jquery.min.js"></script>
<script src="../js/echarts-all.js"></script>
<!--編寫javascript代碼引用json文件-->
<script>
   $(function(){
       $.ajax({ //統(tǒng)計各個國家拍電影的總數(shù)
           url:"../data/part-00000-6aee901d-e3d1-4e45-8ced-26e24edadb88.json",
           type:"get",
           dataType:"json",
           success:function(res){
               let x=[]
               let y=[]
               for (let i=0;i<res.length;i++){
                   x.push(res[i].address)
                   y.push(res[i].count)
               }
                //1、初始化echarts
               let myEcharts = echarts.init(document.getElementById("one"))
               // 2、設(shè)置option選項值
               let option = {
                   title : {
                       text: '統(tǒng)計各個國家拍電影總數(shù)排行榜',
                       subtext: '豆瓣'
                   },
                   tooltip : {
                       trigger: 'axis'
                   },
                   legend: {
                       data:['電影數(shù)據(jù)']
                   },
                   calculable : true,
                   xAxis : [
                       {
                           type : 'category',
                           boundaryGap : false,
                           data : x
                       }
                   ],
                   yAxis : [
                       {
                           type : 'value',
                           axisLabel : {
                               formatter: '{value} 部'
                           },
                           splitArea : {show : true}
                       }
                   ],
                   series : [
                       {
                           name:'電影數(shù)據(jù)',
                           type:'line',
                           itemStyle: {
                               normal: {
                                   lineStyle: {
                                       shadowColor : 'rgba(0,0,0,0.4)'
                                   }
                               }
                           },
                           data:y
                       }
                   ]
               };

               // 3、加載Option選項
               myEcharts.setOption(option)
           }
       })
       $.ajax({ //統(tǒng)計各個導(dǎo)演拍電影的總數(shù)
           url:"../data/part-00000-256eb3a7-952f-49ee-befe-9cc3952d7d17.json",
           type:"get",
           dataType:"json",
           success:function(res){
               let x=[]
               let y=[]
               for (let i=0;i<res.length;i++){
                   x.push(res[i].direct)
                   y.push(res[i].count)
               }
               //1、初始化echarts
               let myEcharts = echarts.init(document.getElementById("two"))
               // 2、設(shè)置option選項值
               let option = {
                   title : {
                       text: '導(dǎo)演的拍電影排行榜',
                       subtext: '豆瓣'
                   },
                   tooltip : {
                       trigger: 'axis'
                   },
                   legend: {
                       data:['電影數(shù)據(jù)']
                   },
                   toolbox: {
                       show : true,
                       feature : {
                           mark : true,
                           dataView : {readOnly: false},
                           magicType:['line', 'bar'],
                           restore : true,
                           saveAsImage : true
                       }
                   },
                   calculable : true,
                   xAxis : [
                       {
                           type : 'category',
                           data : x
                       }
                   ],
                   yAxis : [
                       {
                           type : 'value',
                           splitArea : {show : true}
                       }
                   ],
                   series : [
                       {
                           name:'電影數(shù)據(jù)',
                           type:'bar',
                           data:y
                       }
                   ]
               };


               // 3、加載Option選項
               myEcharts.setOption(option)
           }
       })
       $.ajax({ //電影評分排行榜
           url:"../data/part-00000-419034d2-3312-487c-a467-d426debeb2ef.json",
           type:"get",
           dataType:"json",
           success:function(res){
               let x=[]
               let y=[]
               for (let i=0;i<res.length;i++){
                   x.push(res[i].movieName)
                   y.push(res[i].grade)
               }
               //1、初始化echarts
               let myEcharts = echarts.init(document.getElementById("three"))
               // 2、設(shè)置option選項值
               let option = {
                   title : {
                       text: '評分排行榜',
                       subtext: '豆瓣'
                   },
                   tooltip : {
                       trigger: 'axis'
                   },
                   legend: {
                       data:['電影數(shù)據(jù)']
                   },
                   calculable : true,
                   xAxis : [
                       {
                           type : 'category',
                           boundaryGap : false,
                           data : x
                       }
                   ],
                   yAxis : [
                       {
                           type : 'value',
                           axisLabel : {
                               formatter: '{value} 分'
                           },
                           splitArea : {show : true}
                       }
                   ],
                   series : [
                       {
                           name:'電影數(shù)據(jù)',
                           type:'line',
                           itemStyle: {
                               normal: {
                                   lineStyle: {
                                       shadowColor : 'rgba(0,0,0,0.4)'
                                   }
                               }
                           },
                           data:y
                       }
                   ]
               };

               // 3、加載Option選項
               myEcharts.setOption(option)
           }
       })
       $.ajax({ //各部電影的觀影次數(shù)占比(上映率)
           url:"../data/part-00000-d06a0328-f4be-408c-b050-231c1f17a2b9.json",
           type:"get",
           dataType:"json",
           success:function(res){
               let x=[]
               let y=[]
               for (let i=0;i<res.length;i++){
                   x.push(res[i].movieName)
                   y.push({"value":res[i].count,"name":res[i].movieName})
               }
               //1、初始化echarts
               let myEcharts = echarts.init(document.getElementById("four"))
               // 2、設(shè)置option選項值
               let option = {
                   title : {
                       text: '上映率',
                       subtext: '豆瓣',
                       x:'center'
                   },
                   tooltip : {
                       trigger: 'item',
                       formatter: "{a} <br/>{b} : {c} ({d}%)"
                   },
                   legend: {
                       orient : 'vertical',
                       x : 'left',
                       data:x
                   },
                   toolbox: {
                       show : true,
                       feature : {
                           mark : true,
                           dataView : {readOnly: false},
                           restore : true,
                           saveAsImage : true
                       }
                   },
                   calculable : true,
                   series : [
                       {
                           name:'觀影次數(shù)',
                           type:'pie',
                           radius : '55%',
                           center: ['50%', 225],
                           data:y
                       }
                   ]
               };

               // 3、加載Option選項
               myEcharts.setOption(option)
           }
       })
   })
</script>
</html> 

四、項目實戰(zhàn)

4.1 項目文件分析

需求:
讀取兩個csv文件,并輸出其內(nèi)容以及schema(二維表結(jié)構(gòu))

package org.tjcj.sql

import org.apache.spark.sql.SparkSession

/**
 * 統(tǒng)計電影的文件情況
 */
object MoviesSpark {
  def main(args: Array[String]): Unit = {
    //獲取SparkSession對象
    val spark =SparkSession.builder().appName("movie").master("local[*]").getOrCreate()
    val df1=spark.read.option("header",true).csv("D:\\data\\input\\movie.csv")
    df1.show(10)
    df1.printSchema()
    val df2=spark.read.option("header",true).csv("D:\\data\\input\\user.csv")
    df2.show(10)
    df2.printSchema()
  }
}

4.2 統(tǒng)計一下觀影次數(shù)排名前十的電影名稱和觀影次數(shù)

通過分析可以得出結(jié)論:只通過user一個表就可以統(tǒng)計出電影名稱及其觀影次數(shù)

package org.tjcj.sql

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.desc

/**
 * 統(tǒng)計觀影次數(shù)最多的5部電影
 */
object MoviesTop5 {
  def main(args: Array[String]): Unit = {
    //獲取SparkSession對象
    val spark =SparkSession.builder().appName("movie").master("local[*]").getOrCreate()
    val df=spark.read.option("header",true).csv("D:\\data\\input\\user.csv")
    //方式一:采用spark-sql提供df方法操作
    import spark.implicits._
    df.select($"movieName").groupBy($"movieName").count().orderBy(desc("count")).show(5)
    df.select($"movieName").groupBy($"movieName").count().orderBy($"count".desc).show(5)
    //方式二:利用hive的sql方式
    df.createOrReplaceTempView("user")
    spark.sql("select movieName,count(*) as count from user group by movieName order by count desc").show(5)
  }
}

寫出到文件中:

package org.tjcj.sql

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, desc}

/**
 * 統(tǒng)計觀影次數(shù)最多的10部電影
 */
object MoviesTop5 {
  def main(args: Array[String]): Unit = {
    //獲取SparkSession對象
    val spark =SparkSession.builder().appName("movie").master("local[*]").getOrCreate()
    val df=spark.read.option("header",true).csv("D:\\data\\input\\user.csv")
    //方式一:采用spark-sql提供df方法操作
    import spark.implicits._
    df.select($"movieName").groupBy($"movieName").count().orderBy(desc("count")).show(5)
    df.select($"movieName").groupBy($"movieName").count().orderBy($"count".desc).limit(10).repartition(1)
      .write.csv("D:\\spark-sql\\out\\out5")


    //方式二:利用hive的sql方式
    df.createOrReplaceTempView("user")
    spark.sql("select movieName,count(*) as count from user group by movieName order by count desc").limit(10).repartition(1)
      .write.json("D:\\spark-sql\\out\\out6")
  }
}

4.3 統(tǒng)計一下評分排名前10的電影名稱和評分

分析:只需要movie.csv文件

package org.tjcj.sql

import org.apache.spark.sql.SparkSession

/**
 * 統(tǒng)計評分排名前10的電影名稱和評分
 */
object GradeTop10 {
  def main(args: Array[String]): Unit = {
    //獲取SparkSession對象
    val spark =SparkSession.builder().appName("movie").master("local[*]").getOrCreate()
    val df=spark.read.option("header",true).csv("D:\\data\\input\\movie.csv").distinct()
   //采用sql方式實現(xiàn)
    //創(chuàng)建臨時表
    df.createOrReplaceTempView("movie")
    spark.sql("select distinct(movieName),grade from movie where grade >8 order by grade desc").limit(10)
      .write.json("D:\\spark-sql\\out\\out8")
    //關(guān)閉spark session對象
    spark.stop()
  }
}

4.4 驗證單表效果

統(tǒng)計劇情類型電影中評分最高10部電影

package org.tjcj.sql

import org.apache.spark.sql.SparkSession

/**
 * 統(tǒng)計劇情類型的電影排行榜
 */
object JqTop10 {
  def main(args: Array[String]): Unit = {
    //獲取SparkSession對象
    val spark =SparkSession.builder().appName("movie").master("local[*]").getOrCreate()
    val df=spark.read.option("header",true).csv("D:\\data\\input\\movie.csv").distinct()
   //采用sql方式實現(xiàn)
    //創(chuàng)建臨時表
    df.createOrReplaceTempView("movie")
    spark.sql("select distinct(movieName),grade from movie where grade >8 and type='劇情' order by grade desc").limit(10)
      .write.json("D:\\spark-sql\\out\\out9")
    //關(guān)閉spark session對象
    spark.stop()
  }
}
統(tǒng)計各個國家的電影數(shù)量,取排名前十的國家
package org.tjcj.sql

import org.apache.spark.sql.SparkSession

/**
 * 各個國家數(shù)據(jù)量
 */
object CountTop10 {
  def main(args: Array[String]): Unit = {
    //獲取SparkSession對象
    val spark =SparkSession.builder().appName("movie").master("local[*]").getOrCreate()
    val df=spark.read.option("header",true).csv("D:\\data\\input\\movie.csv")
    df.createOrReplaceTempView("movie")
    spark.sql("select address,count(*) as count from movie group by address order by count desc").limit(10)
      .write.json("D:\\spark-sql\\out\\out10")
    spark.stop()

  }
}

4.5 統(tǒng)計近年來哪些導(dǎo)演拍的電影多(TOPN榜)

package org.tjcj.sql

import org.apache.spark.sql.SparkSession

/**
 * 近年來哪些導(dǎo)演拍的電影較多
 */
object DirectTop10 {
  def main(args: Array[String]): Unit = {
    //1、獲取SparkSession對象
    val spark = SparkSession.builder().appName("directTop10").master("local[*]").getOrCreate()
    //2、獲取要處理的文件
    val df = spark.read.option("header",true).csv("D:\\data\\input\\movie.csv").distinct()
    //方式一:采用spark-sql中方言方式
    import spark.implicits._
    df.repartition(1).select("direct").groupBy("direct").count().orderBy($"count".desc).limit(10)
      .write.json("D:\\spark-sql\\out\\out11")
    //方式二:采用hive-sql的方式
    df.createOrReplaceTempView("movie")
    spark.sql("select direct,count(*) as count from movie group by direct order by count desc").limit(10).repartition(1)
      .write.json("D:\\spark-sql\\out\\out12")
  }
}

4.6 統(tǒng)計評分高于9.0且受歡迎的電影及其觀影次數(shù)排行榜

package org.tjcj.sql

import org.apache.spark.sql.SparkSession

/**
 * 統(tǒng)計評分高于9.0的電影的觀影次數(shù)排行榜
 */
object MoviesGrade {
  def main(args: Array[String]): Unit = {
    //獲取SparkSession對象
    val spark =SparkSession.builder().appName("movie").master("local[*]").getOrCreate()
    val df1=spark.read.option("header",true).csv("D:\\spark-sql\\movie.csv")
    val df2=spark.read.option("header",true).csv("D:\\spark-sql\\user.csv")
    //創(chuàng)建臨時表
    df1.createOrReplaceTempView("movie")
    df2.createOrReplaceTempView("user")
    spark.sql("select m.movieName,count(*) as count from movie m left join user u on m.movieName = u.movieName " +
      "where m.grade >9.0 group by m.movieName order by count desc").limit(10).repartition(1)
      .write.json("D:\\spark-sql\\out\\out13")
  }
}

運行結(jié)果:
{"movieName":"瘋狂動物城","count":208}
{"movieName":"阿甘正傳","count":190}
{"movieName":"美麗人生","count":126}
{"movieName":"三傻大鬧寶萊塢","count":126}
{"movieName":"竊聽風(fēng)暴","count":120}
{"movieName":"指環(huán)王3:王者無敵","count":120}
{"movieName":"教父","count":120}
{"movieName":"亂世佳人","count":120}
{"movieName":"辛德勒的名單","count":117}
{"movieName":"這個殺手不太冷","count":112}

4.7 統(tǒng)計評分高于9.0且觀影時間在2018年1月的電影名稱和評分排行榜

package org.tjcj.sql

import org.apache.spark.sql.SparkSession

/**
 * 統(tǒng)計評分高于9.0且觀影時間在2018年1月的電影排行榜
 */
object GradeMovie {
  def main(args: Array[String]): Unit = {
    //獲取SparkSession對象
    val spark =SparkSession.builder().appName("movie").master("local[*]").getOrCreate()
    val df1=spark.read.option("header",true).csv("D:\\spark-sql\\movie.csv").distinct()
    val df2=spark.read.option("header",true).csv("D:\\spark-sql\\user.csv").distinct()
    //創(chuàng)建臨時表
    df1.createOrReplaceTempView("movie")
    df2.createOrReplaceTempView("user")
    spark.sql("select m.movieName,m.grade,count(*) as count from movie m left join user u on m.movieName = u.movieName " +
      "where m.grade >9.0 and substr(u.time,0,7) = '2018-01' group by m.movieName,m.grade order by count desc").limit(10)
        .repartition(1)
      .write.json("D:\\spark-sql\\out\\out14")
  }
}

4.8 總結(jié)和回顧

在Spark中,spark-sql和spark-core之間可以相互配合使用
RDD和DataFrame、RDD和DataSet之間是可以相互轉(zhuǎn)換的

package org.tjcj.sql


import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}


object SparkWrite {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession.builder().appName("write").master("local[*]").getOrCreate()
    spark.sql("set spark.sql.adaptive.enabled=true")
    val rdd:RDD[Row]= spark.sparkContext.parallelize(List(
      Row(1,"張三","大四",18),
      Row(1,"李四","大四",19),
      Row(1,"王五","大四",20),
      Row(1,"趙六","大四",21),
      Row(1,"劉七","大四",22)
    ))
    val schema=StructType(List(
      StructField("id",DataTypes.IntegerType,false),
      StructField("name",DataTypes.StringType,false),
      StructField("grade",DataTypes.StringType,false),
      StructField("age",DataTypes.IntegerType,false)
    ))
    val df = spark.createDataFrame(rdd,schema)
    df.show()
    df.printSchema()
    df.repartition(1).write.json("D:\\spark-sql\\out\\out3")
    import spark.implicits._
    val ds:Dataset[Student]= spark.createDataset(List(
      Student(1,"張三","大四",18),
      Student(1,"李四","大四",19),
      Student(1,"王五","大四",20),
      Student(1,"趙六","大四",21),
      Student(1,"劉七","大四",22)
    ))
    ds.show()
    ds.printSchema()
    ds.repartition(1).write.csv("D:\\spark-sql\\out\\out4")

  }
}

case class Student(id:Int,name:String,grade:String,age:Int)
樣例類----自動生成構(gòu)造方法、字段的getter以及equals/hashCode/toString等方法
DataFrame和DataSet區(qū)別不大:DataFrame其實就是Dataset[Row],而DataSet帶有強類型信息,編譯期就能做類型檢查
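
上面提到RDD、DataFrame、DataSet可以相互轉(zhuǎn)換,下面給出一個補充示意(假設(shè)接在前面SparkWrite的代碼后面,并已經(jīng)import spark.implicits._):

    val ds2 = df.as[Student]      //DataFrame -> Dataset[Student],按字段名對應(yīng)
    val df2 = ds.toDF()           //Dataset -> DataFrame
    val rdd2 = df.rdd             //DataFrame -> RDD[Row]
    val df3 = spark.sparkContext.makeRDD(Seq(Student(6,"小明","大四",23))).toDF()  //RDD[樣例類] -> DataFrame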
執(zhí)行SQL的兩種方式:
方式一:利用Spark-sql獨有的DSL(方言)方式進行計算

package org.tjcj.sql

import org.apache.spark.sql.SparkSession

/**
 * 近年來哪些導(dǎo)演拍的電影較多
 */
object DirectTop10 {
  def main(args: Array[String]): Unit = {
    //1、獲取SparkSession對象
    val spark = SparkSession.builder().appName("directTop10").master("local[*]").getOrCreate()
    //2、獲取要處理的文件
    val df = spark.read.option("header",true).csv("D:\\data\\input\\movie.csv").distinct()
    //方式一:采用spark-sql中方言方式
    import spark.implicits._
    df.repartition(1).select("direct").groupBy("direct").count().orderBy($"count".desc).limit(10)
      .write.json("D:\\spark-sql\\out\\out11")
    //方式二:采用hive-sql的方式
    df.createOrReplaceTempView("movie")
    spark.sql("select direct,count(*) as count from movie group by direct order by count desc").limit(10).repartition(1)
      .write.json("D:\\spark-sql\\out\\out12")
  }
}

方式二:直接編寫SQL語句來查詢,是比較常用的一種方式??傮w來說,Spark-sql更加便于開發(fā)。
