目錄
一、數據準備
1)Department
2)School
3)Student
4)Teacher
5)實例化對象
結構如下:
二、加載數據
數據展示
三、日志數據加載
輸出結果
一、數據準備
1)Department
package org.example.jsonre;
/**
 * Simple mutable bean describing a department by its name and a free-text
 * description.
 */
public class Department {
    private String name;
    private String describe;

    /** Creates a department whose fields are both {@code null}. */
    public Department() {
    }

    /**
     * Creates a fully populated department.
     *
     * @param name     the department name
     * @param describe the department description
     */
    public Department(String name, String describe) {
        this.name = name;
        this.describe = describe;
    }

    /** @return the department name */
    public String getName() {
        return name;
    }

    /** @param name the department name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the department description */
    public String getDescribe() {
        return describe;
    }

    /** @param describe the department description to store */
    public void setDescribe(String describe) {
        this.describe = describe;
    }

    /** Renders both fields, each wrapped in single quotes. */
    @Override
    public String toString() {
        return String.format("Department{name='%s', describe='%s'}", name, describe);
    }
}
2)School
package org.example.jsonre;
/**
 * Simple mutable bean describing a school by its name, leader and address.
 */
public class School {
    private String name;
    private String leader;
    private String address;

    /** Creates a school whose fields are all {@code null}. */
    public School() {
    }

    /**
     * Creates a fully populated school.
     *
     * @param name    the school name
     * @param leader  the school leader
     * @param address the school address
     */
    public School(String name, String leader, String address) {
        this.name = name;
        this.leader = leader;
        this.address = address;
    }

    /** @return the school name */
    public String getName() {
        return name;
    }

    /** @param name the school name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the school leader */
    public String getLeader() {
        return leader;
    }

    /** @param leader the school leader to store */
    public void setLeader(String leader) {
        this.leader = leader;
    }

    /** @return the school address */
    public String getAddress() {
        return address;
    }

    /** @param address the school address to store */
    public void setAddress(String address) {
        this.address = address;
    }

    /** Renders all three fields, each wrapped in single quotes. */
    @Override
    public String toString() {
        return String.format("School{name='%s', leader='%s', address='%s'}", name, leader, address);
    }
}
3)Student
package org.example.jsonre;
import com.alibaba.fastjson.JSON;
import java.io.*;
import java.util.ArrayList;
/**
 * Simple mutable bean describing a student: id, name, grade and the
 * {@code School} the student belongs to.
 */
public class Student {
    private Integer id;
    private String name;
    private String grade;
    private School school;

    /** Creates a student whose fields are all {@code null}. */
    public Student() {
    }

    /**
     * Creates a fully populated student.
     *
     * @param id     the student id
     * @param name   the student name
     * @param grade  the student grade
     * @param school the school the student belongs to
     */
    public Student(Integer id, String name, String grade, School school) {
        this.id = id;
        this.name = name;
        this.grade = grade;
        this.school = school;
    }

    /** @return the student id */
    public Integer getId() {
        return id;
    }

    /** @param id the student id to store */
    public void setId(Integer id) {
        this.id = id;
    }

    /** @return the student name */
    public String getName() {
        return name;
    }

    /** @param name the student name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the student grade */
    public String getGrade() {
        return grade;
    }

    /** @param grade the student grade to store */
    public void setGrade(String grade) {
        this.grade = grade;
    }

    /** @return the school the student belongs to */
    public School getSchool() {
        return school;
    }

    /** @param school the school to store */
    public void setSchool(School school) {
        this.school = school;
    }

    /** Renders all fields; string fields are quoted, id and school are not. */
    @Override
    public String toString() {
        return String.format("Student{id=%s, name='%s', grade='%s', school=%s}", id, name, grade, school);
    }
}
4)Teacher
package org.example.jsonre;
import java.util.ArrayList;
/**
 * Simple mutable bean describing a teacher: id, contact details, the
 * {@code Department} the teacher belongs to and the list of students taught.
 */
public class Teacher {
    private Integer id;
    private String name;
    private String tel;
    private String email;
    private Department dept;
    private ArrayList<Student> stu;

    /** Creates a teacher whose fields are all {@code null}. */
    public Teacher() {
    }

    /**
     * Creates a fully populated teacher.
     *
     * @param id    the teacher id
     * @param name  the teacher name
     * @param tel   the telephone number
     * @param email the e-mail address
     * @param dept  the department the teacher belongs to
     * @param stu   the teacher's students
     */
    public Teacher(Integer id, String name, String tel, String email, Department dept, ArrayList<Student> stu) {
        this.id = id;
        this.name = name;
        this.tel = tel;
        this.email = email;
        this.dept = dept;
        this.stu = stu;
    }

    /** @return the teacher id */
    public Integer getId() {
        return id;
    }

    /** @param id the teacher id to store */
    public void setId(Integer id) {
        this.id = id;
    }

    /** @return the teacher name */
    public String getName() {
        return name;
    }

    /** @param name the teacher name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the telephone number */
    public String getTel() {
        return tel;
    }

    /** @param tel the telephone number to store */
    public void setTel(String tel) {
        this.tel = tel;
    }

    /** @return the e-mail address */
    public String getEmail() {
        return email;
    }

    /** @param email the e-mail address to store */
    public void setEmail(String email) {
        this.email = email;
    }

    /** @return the department the teacher belongs to */
    public Department getDept() {
        return dept;
    }

    /** @param dept the department to store */
    public void setDept(Department dept) {
        this.dept = dept;
    }

    /** @return the teacher's students (the stored list itself, not a copy) */
    public ArrayList<Student> getStu() {
        return stu;
    }

    /** @param stu the student list to store (kept by reference) */
    public void setStu(ArrayList<Student> stu) {
        this.stu = stu;
    }

    /** Renders all fields; string fields are quoted, id, dept and stu are not. */
    @Override
    public String toString() {
        return String.format(
                "Teacher{id=%s, name='%s', tel='%s', email='%s', dept=%s, stu=%s}",
                id, name, tel, email, dept, stu);
    }
}
5)實例化對象
/**
 * Builds a sample {@code Teacher} object graph (one department, three
 * students, one school per student), serializes it to JSON with fastjson,
 * prints the JSON and writes it to {@code out/jsonTest.txt}.
 */
public class Test {
    public static void main(String[] args) {
        // Assemble the sample data.
        Department department = new Department();
        department.setName("學(xué)術(shù)部");
        department.setDescribe("主要負(fù)責(zé)教學(xué)研究");

        ArrayList<Student> students = new ArrayList<>();
        students.add(newStudent(1, "張三", "一年級(jí)"));
        students.add(newStudent(2, "李四", "一年級(jí)"));
        students.add(newStudent(3, "趙六", "二年級(jí)"));

        // The teacher id is intentionally left unset, as in the sample output.
        Teacher teacher = new Teacher();
        teacher.setDept(department);
        teacher.setStu(students);
        teacher.setEmail("fivedessert@gmail.cn");
        teacher.setName("five小點(diǎn)心");
        teacher.setTel("1231231234");

        String jsonString = JSON.toJSONString(teacher);
        System.out.println(jsonString);

        // try-with-resources closes (and thereby flushes) the writer on every
        // path and never touches a writer that failed to open. UTF-8 is set
        // explicitly so the file does not depend on the platform default
        // charset of the machine that runs this class.
        try (java.io.Writer writer = java.nio.file.Files.newBufferedWriter(
                java.nio.file.Paths.get("out/jsonTest.txt"),
                java.nio.charset.StandardCharsets.UTF_8)) {
            writer.write(jsonString);
        } catch (IOException e) {
            // Best-effort demo: report the failure and carry on.
            e.printStackTrace();
        }
    }

    /**
     * Creates a student that is attached to its own freshly created school.
     *
     * @param id    the student id
     * @param name  the student name
     * @param grade the student grade
     * @return the populated student
     */
    private static Student newStudent(int id, String name, String grade) {
        Student student = new Student();
        student.setId(id);
        student.setName(name);
        student.setGrade(grade);
        student.setSchool(newSchool());
        return student;
    }

    /**
     * Creates a new school instance carrying the sample school data.
     *
     * NOTE(review): every student gets a distinct instance, as in the original
     * code; presumably this keeps fastjson from serializing repeated
     * references as "$ref" back-references - confirm before sharing one
     * instance between students.
     *
     * @return a newly allocated, populated school
     */
    private static School newSchool() {
        School school = new School();
        school.setAddress("安德門");
        school.setLeader("王德發(fā)");
        school.setName("南京某大學(xué)");
        return school;
    }
}
{"dept":{"describe":"主要負(fù)責(zé)教學(xué)研究","name":"學(xué)術(shù)部"},"email":"fivedessert@gmail.cn","name":"five小點(diǎn)心","stu":[{"grade":"一年級(jí)","id":1,"name":"張三","school":{"address":"安德門","leader":"王德發(fā)","name":"南京某大學(xué)"}},{"grade":"一年級(jí)","id":2,"name":"李四","school":{"address":"安德門","leader":"王德發(fā)","name":"南京某大學(xué)"}},{"grade":"二年級(jí)","id":3,"name":"趙六","school":{"address":"安德門","leader":"王德發(fā)","name":"南京某大學(xué)"}}],"tel":"1231231234"}
結構如下:
二、加載數據
sc.textFile讀取數據源,并對結構化數據進行拆分
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Demo: reads the teacher JSON document from `out/jsonTest.txt` and flattens
 * it step by step into one row per student, printing the schema and content
 * of every intermediate DataFrame.
 */
object JsonTest {
  def main(args: Array[String]): Unit = {
    // Spark configuration and session (local master).
    val conf = new SparkConf().setMaster("local").setAppName("JsonTest")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val sc = spark.sparkContext

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import spark.implicits._

    // The finally block releases the session even when a step fails.
    try {
      // jsonobj1: read the file as an RDD of lines and print them.
      val jsonobj1 = sc.textFile("out/jsonTest.txt")
      jsonobj1.collect().foreach(println)

      // jsonobj2: wrap each line in a single-column DataFrame.
      val jsonobj2 = jsonobj1.toDF("teacher")
      jsonobj2.printSchema()
      jsonobj2.show(false)

      // jsonobj3: pull the top-level JSON fields out into their own columns.
      val jsonobj3 = jsonobj2.select(
        get_json_object($"teacher", "$.dept").as("dept"),
        get_json_object($"teacher", "$.email").as("email"),
        get_json_object($"teacher", "$.name").as("name"),
        get_json_object($"teacher", "$.stu").as("stu"),
        get_json_object($"teacher", "$.tel").as("tel")
      )
      jsonobj3.printSchema()
      jsonobj3.show(false)

      // jsonobj4: split the dept object the same way. The teacher name is
      // carried along as teacher_name so it is not lost from the result.
      val jsonobj4 = jsonobj3.select(
        get_json_object($"dept", "$.name").as("dept_name"),
        get_json_object($"dept", "$.describe").as("describe"),
        $"name".as("teacher_name"),
        $"email", $"stu", $"tel"
      )
      jsonobj4.printSchema()
      jsonobj4.show(false)

      // jsonobj5: parse the stu JSON array into an array of structs; this
      // needs an explicit schema. school is declared as a string because it
      // is split with get_json_object in the last step.
      val studentSchema = ArrayType(StructType(Seq(
        StructField("grade", StringType),
        StructField("id", LongType),
        StructField("name", StringType),
        StructField("school", StringType)
      )))
      val jsonobj5 = jsonobj4.select(
        $"dept_name", $"describe", $"teacher_name", $"email", $"tel",
        from_json($"stu", studentSchema).as("stu")
      )
      jsonobj5.printSchema()
      jsonobj5.show(false)

      // jsonobj6: one output row per element of the stu array.
      val jsonobj6 = jsonobj5.withColumn("stu", explode($"stu"))
      jsonobj6.printSchema()
      jsonobj6.show(false)

      // jsonobj7: stu is a struct now, so its fields are addressed with dot
      // notation instead of a JSON path.
      val jsonobj7 = jsonobj6.select(
        $"dept_name", $"describe", $"teacher_name", $"email", $"tel",
        $"stu.grade".as("stu_grade"),
        $"stu.id".as("stu_id"),
        $"stu.name".as("stu_name"),
        $"stu.school".as("stu_school")
      )
      jsonobj7.printSchema()
      jsonobj7.show(false)

      // jsonobj8: split the remaining school JSON string.
      val jsonobj8 = jsonobj7.select(
        $"dept_name", $"describe", $"teacher_name", $"email", $"tel",
        $"stu_grade", $"stu_id", $"stu_name",
        get_json_object($"stu_school", "$.address").as("sch_address"),
        get_json_object($"stu_school", "$.leader").as("sch_leader"),
        get_json_object($"stu_school", "$.name").as("sch_name")
      )
      jsonobj8.printSchema()
      jsonobj8.show()
    } finally {
      // Closing the session also stops its SparkContext, so no separate
      // sc.stop() is needed.
      spark.close()
    }
  }
}
數據展示
?
?
?
?
?
三、日志數據加載
同樣的,除了json格式字符串,我們也可以用類似的方法來加載日志數據。
log日志數據文件(測試用-test)-其它文檔類資源-CSDN文庫 文章來源:http://www.zghlxwxcb.cn/news/detail-756975.html
package org.example.etl.util.test
import com.mysql.cj.util.SaslPrep
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions, types}
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
/**
 * Demo ETL job: loads the tab-separated log file `in/test.log`, keeps the
 * de-duplicated records whose status is 200, and expands the query-string
 * parameters of each url into dedicated columns.
 */
object etldemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("etl").setMaster("local")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._

    // The finally block releases the session even when a step fails.
    try {
      val rdd1 = sc.textFile("in/test.log")

      // Split every line on tabs, keep only records with exactly 8 fields
      // and wrap them in Row objects so a DataFrame can be built from them.
      val rdd2 = rdd1
        .map(_.split("\t"))
        .filter(_.length == 8)
        .map(fields => Row.fromSeq(fields.toSeq))

      val schema = StructType(
        Array(
          StructField("event_time", StringType),
          StructField("url", StringType),
          StructField("method", StringType),
          StructField("status", StringType),
          StructField("sip", StringType),
          StructField("user_uip", StringType),
          StructField("action_prepend", StringType),
          StructField("action_client", StringType)
        )
      )
      val logdf1 = spark.createDataFrame(rdd2, schema)
      logdf1.printSchema()
      logdf1.show(5)

      // Filter: drop duplicates, keep status = 200 and non-null event_time.
      val logdf2 = logdf1.dropDuplicates("event_time", "url")
        .filter($"status" === 200)
        .filter($"event_time".isNotNull)
      logdf2.printSchema()
      logdf2.show(5, false)

      // Expand the url query parameters. map yields a Dataset of tuples, so
      // the column names are supplied through toDF.
      val logdf3 = logdf2.map { line =>
        val params = parseQueryParams(line.getAs[String]("url"))
        // getAs needs the explicit type parameter here.
        (
          line.getAs[String]("event_time"),
          params.getOrElse("userUID", ""),
          params.getOrElse("userSID", ""),
          params.getOrElse("actionBegin", ""),
          params.getOrElse("actionEnd", ""),
          params.getOrElse("actionType", ""),
          params.getOrElse("actionName", ""),
          params.getOrElse("actionValue", ""),
          params.getOrElse("actionTest", ""),
          params.getOrElse("ifEquipment", ""),
          line.getAs[String]("method"),
          line.getAs[String]("status"),
          line.getAs[String]("sip"),
          line.getAs[String]("user_uip"),
          line.getAs[String]("action_prepend"),
          line.getAs[String]("action_client")
        )
      }.toDF(
        "event_time", "userUID", "userSID", "actionBegin", "actionEnd",
        "actionType", "actionName", "actionValue", "actionTest", "ifEquipment",
        "method", "status", "sip", "user_uip", "action_prepend", "action_client"
      )
      logdf3.printSchema()
      logdf3.show(3, false)
    } finally {
      // Closing the session also stops its SparkContext, so no separate
      // sc.stop() is needed.
      spark.close()
    }
  }

  /**
   * Extracts the `key=value` pairs from the query part of a url.
   *
   * Only urls that split into exactly two parts on `?` are parsed, and only
   * pairs that split into exactly two parts on `=` are kept. Every other
   * input, including `null`, yields an empty map, so callers can always use
   * `getOrElse` on the result.
   *
   * @param url the raw url, possibly without a query part
   * @return the parsed parameters, or an empty map
   */
  private def parseQueryParams(url: String): Map[String, String] =
    Option(url).map(_.split("\\?")) match {
      case Some(Array(_, query)) =>
        query.split("&")
          .map(_.split("="))
          .collect { case Array(key, value) => key -> value }
          .toMap
      case _ => Map.empty[String, String]
    }
}
輸出結果
文章來源地址http://www.zghlxwxcb.cn/news/detail-756975.html
到了這里,關于大數據技術之Spark SQL——解析JSON字符串的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!