實驗SparkSQL編程初級實踐
實踐環(huán)境:
-
Oracle VM VirtualBox 6.1.12
-
Ubuntu 16.04
-
Hadoop3.1.3
-
JDK1.8.0_162
-
spark2.4.0
-
python3.5
-
Windows11系統(tǒng)下pycharm2019.1專業(yè)版
實驗?zāi)康模?/strong>
-
通過實驗掌握Spark SQL的基本編程方法;
-
熟悉RDD到DataFrame的轉(zhuǎn)化方法;
-
熟悉利用Spark SQL管理來自不同數(shù)據(jù)源的數(shù)據(jù)。
實驗內(nèi)容,步驟與實驗結(jié)果:
- Spark SQL 基本操作
將下列JSON格式數(shù)據(jù)復(fù)制到Linux系統(tǒng)中,并保存命名為employee.json。
{ “id”:1 , “name”:" Ella" , “age”:36 } { “id”:2, “name”:“Bob”,“age”:29 } { “id”:3 , “name”:“Jack”,“age”:29 } { “id”:4 , “name”:“Jim”,“age”:28 } { “id”:4 , “name”:“Jim”,“age”:28 } { “id”:5 , “name”:“Damon” } { “id”:5 , “name”:“Damon” } |
---|
為employee.json創(chuàng)建DataFrame,并寫出Python語句完成下列操作:
import os
os.environ[“JAVA_HOME”]=“/usr/lib/jvm/jdk1.8.0_162”
os.environ[“PYSPARK_PYTHON”]=‘/usr/bin/python3.5’
#import SparkSession
from pyspark.sql import SparkSession
#create spar session object
spark=SparkSession.builder.appName(‘data_processing’).getOrCreate()
# Load csv Dataset
df=spark.read.json(“employee.json”)
1 .查詢所有數(shù)據(jù);
df.show()
2.查詢所有數(shù)據(jù),并去除重復(fù)的數(shù)據(jù);
df.distinct().show()
3.查詢所有數(shù)據(jù),打印時去除id字段;
df.drop(“id”).show()
4.篩選出age>30的記錄;
df.filter(“age”>30).show()
5.將數(shù)據(jù)按age分組;
df.groupBy(“age”).count().show()
6.將數(shù)據(jù)按name升序排列;
df.sort(df.name.asc()).show()
7.取出前3行數(shù)據(jù);
df.show(3)
8. 查詢所有記錄的name列,并為其取別名為username;
df.distinct().show()
9.查詢年齡age的平均值;
df.agg({“age”:“mean”}).show()
10. 查詢年齡age的最小值。
df.agg({“age”:“min”}).show()
2.編程實現(xiàn)將RDD轉(zhuǎn)換為DataFrame
源文件內(nèi)容如下(包含id,name,age):
1,Ella,36 2,Bob,29 3,Jack,29 |
---|
請將數(shù)據(jù)復(fù)制保存到Linux系統(tǒng)中,命名為employee.txt,實現(xiàn)從RDD轉(zhuǎn)換得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有數(shù)據(jù)。請寫出程序代碼。
#反射機制 – 針對數(shù)據(jù)項已知
import os
os.environ[“JAVA_HOME”]=“/usr/lib/jvm/jdk1.8.0_162”
os.environ[“PYSPARK_PYTHON”]=‘/usr/bin/python3.5’
# 導(dǎo)入Spark相關(guān)包
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
# 構(gòu)建 spark 單元
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
# 構(gòu)建表頭
schemaString = “id name age”
fields = [StructField(field_name ,StringType(),True) for field_name in schemaString.split(" ")]
schema = StructType(fields)
# 加載數(shù)據(jù)
filename = “employee.txt”
people= spark.sparkContext.textFile(filename)
# print(people.collect())
# 數(shù)據(jù)預(yù)處理
people_data = people.map(lambda x : x.split(“,”))
# print(people_data.collect())
# 處理為 ROW 對象模式
people_rows = people_data.map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2])))
# 構(gòu)建 DataFrame
schemapeople = spark.createDataFrame(people_rows,schema)
# 構(gòu)建臨時表
schemapeople.createOrReplaceTempView(“employee”)
# SQL 查詢
DF_people = spark.sql(“select * from employee”)
# DF – RDD
people_rdd = DF_people.rdd.map(lambda p : “id:” + p.id + “,” + “name:” + p.name + “,” + “Age:” + str(p.age))
# print(people_rdd.collect())
# print(people_rdd.collect())
for i in people_rdd.collect():
print(i)
3. 編程實現(xiàn)利用DataFrame讀寫MySQL的數(shù)據(jù)
(1)在MySQL數(shù)據(jù)庫中新建數(shù)據(jù)庫sparktest,再創(chuàng)建表employee,包含如表5-2所示的兩行數(shù)據(jù)。
表5-2 employee表原有數(shù)據(jù)
id | name | gender | Age |
---|---|---|---|
1 | Alice | F | 22 |
2 | John | M | 25 |
mysql> create database sparktest;
mysql> show databases;
mysql> use sparktest;
mysql> create table employee(id int(4),name char(20),gender char(4),age int(4));
mysql> insert into employee values(1,“Alice”,“F”,22);
mysql> insert into employee values(2,“John”,“M”,25);
mysql> select * from employee;
(2)配置Spark通過JDBC連接數(shù)據(jù)庫MySQL,編程實現(xiàn)利用DataFrame插入如表5-3所示的兩行數(shù)據(jù)到MySQL中,最后打印出age的最大值和age的總和。
表5-3 employee表新增數(shù)據(jù)
id | name | gender | age |
---|---|---|---|
3 | Mary | F | 26 |
4 | Tom | M | 23 |
關(guān)于數(shù)據(jù)庫的相關(guān)參數(shù) driver – com.mysql.jdbc.Driver 數(shù)據(jù)庫的JDBC驅(qū)動 url – 數(shù)據(jù)庫的連接地址 jdbc:mysql://localhost:3306/spark dbtable – 訪問的數(shù)據(jù)表 student user – 數(shù)據(jù)庫的用戶名 test password – 123456 數(shù)據(jù)庫的用戶密碼
查看數(shù)據(jù)庫內(nèi)容并插入數(shù)據(jù)代碼如下:
# -*- coding:utf-8 -*-
#反射機制 – 針對數(shù)據(jù)項已知
import os
os.environ[“JAVA_HOME”]=“/usr/lib/jvm/jdk1.8.0_162”
os.environ[“PYSPARK_PYTHON”]=‘/usr/bin/python3.5’
SUBMIT_ARGS = “–packages mysql:mysql-connector-java:5.1.40 pyspark-shell”
os.environ[“PYSPARK_SUBMIT_ARGS”] = SUBMIT_ARGS
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
jdbcStr = “jdbc:mysql://localhost:3306/sparktest”
# 構(gòu)建 spark 單元
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
# 讀取數(shù)據(jù)庫中的數(shù)據(jù)
jdbcDF=spark.read.format(“jdbc”).option(“url”,jdbcStr).option(“driver”,“com.mysql.jdbc.Driver”).option(“dbtable”,“employee”).option(“user”, “root”).option(“password”, “123456”).load()
# 數(shù)據(jù)查看
jdbcDF.show()
# 下面設(shè)置模式信息
schema = StructType([StructField(“id”, IntegerType(), True),StructField(“name”, StringType(), True),StructField(“gender”, StringType(), True),StructField(“age”, IntegerType(), True)])
# 下面設(shè)置兩條數(shù)據(jù),表示兩個學(xué)生的信息
studentRDD = spark.sparkContext.parallelize([“3 Mary F 26”,“4 Tom M 23”]).map(lambda x:x.split(“\t”))
# 下面創(chuàng)建Row對象,每個Row對象都是rowRDD中的一行
rowRDD = studentRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
# 建立起Row對象和模式之間的對應(yīng)關(guān)系,也就是把數(shù)據(jù)和模式對應(yīng)起來
studentDF = spark.createDataFrame(rowRDD, schema)
studentDF.show()
# 寫入數(shù)據(jù)庫
prop = {}
prop[‘user’] = ‘root’
prop[‘password’] = ‘123456’
prop[‘driver’] = “com.mysql.jdbc.Driver”
studentDF.write.jdbc(jdbcStr,‘employee’,‘a(chǎn)ppend’, prop)
# 讀取數(shù)據(jù)庫中的數(shù)據(jù)
jdbcDF=spark.read.format(“jdbc”).option(“url”,jdbcStr).option(“driver”,“com.mysql.jdbc.Driver”).option(“dbtable”,“employee”).option(“user”, “root”).option(“password”, “123456”).load()
# 數(shù)據(jù)查看
jdbcDF.show()
print(type(jdbcDF))
#查詢年齡age的最大值
jdbcDF.agg({“age”:“max”}).show()
#查詢年齡age的總和值
jdbcDF.agg({“age”:“sum”}).show()
出現(xiàn)的問題與解決方案:
問題一:spark連接mysql時報錯,找不到JDBC。
問題原因:可能是實驗前沒有將jdbc放入spark的jars里面也可能是代碼差點什么。文章來源:http://www.zghlxwxcb.cn/news/detail-439741.html
解決方法:
將適合的jdbc放入spark的jars里面,并將jdbc路徑添加在spark-env.sh中。
以上步驟操作后重啟后并沒有解決。
復(fù)制spark-defaults.conf.template文件,修改spark-defaults.conf文件內(nèi)容。
再次重啟嘗試依舊報錯。在代碼中加入如下兩行:
運行成功?。。。ㄎ乙膊恢朗遣皇侵皇且驗檫@兩行而執(zhí)行成功的,但總歸解決了)文章來源地址http://www.zghlxwxcb.cn/news/detail-439741.html
到了這里,關(guān)于實驗SparkSQL編程初級實踐的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!