文章目錄
前言
一、讀題分析
二、使用步驟
1.導(dǎo)入配置文件到pom.xml
2.代碼部分
三、重難點(diǎn)分析
總結(jié)
前言
本題來源于全國職業(yè)技能大賽之大數(shù)據(jù)技術(shù)賽項(xiàng)賽題-離線數(shù)據(jù)處理-數(shù)據(jù)抽取(其他暫不透露)
題目:編寫Scala代碼,使用Spark將MySQL的shtd_industry庫中表EnvironmentData,ChangeRecord,BaseMachine,MachineData,ProduceRecord全量抽取到Hive的ods庫(需自建)中對(duì)應(yīng)表environmentdata,changerecord,basemachine, machinedata, producerecord中。
以下面題目為例:
抽取MySQL的shtd_industry庫中EnvironmentData表的全量數(shù)據(jù)進(jìn)入Hive的ods庫中表environmentdata,字段排序、類型不變,同時(shí)添加靜態(tài)分區(qū),分區(qū)字段類型為String,且值為當(dāng)前日期的前一天日期(分區(qū)字段格式為yyyyMMdd)。并在hive cli執(zhí)行show partitions ods.environmentdata命令,將結(jié)果截圖粘貼至對(duì)應(yīng)報(bào)告中;
提示:以下是本篇文章正文內(nèi)容,下面案例可供參考(使用Scala語言編寫)
一、讀題分析
涉及組件:Spark,Mysql,Hive
涉及知識(shí)點(diǎn):
- Spark讀取數(shù)據(jù)庫數(shù)據(jù)
- DataFrameAPI的使用(重點(diǎn))
- Spark寫入數(shù)據(jù)庫數(shù)據(jù)
- Hive數(shù)據(jù)庫的基本操作
二、使用步驟
1.導(dǎo)入配置文件到pom.xml
<!--SparkSQL配置-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--spark連接hive-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--mysql配置-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.37</version>
</dependency>
2.代碼部分
由于不是很難,直接上代碼,代碼如下(示例):
package A.offlineDataProcessing.shtd_industry.task1_dataExtraction
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, SparkSession}
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}
object SparkToMysqlToHive {
def main(args: Array[String]): Unit = {
// 創(chuàng)建Spark對(duì)象會(huì)話
val spark = SparkSession.builder()
.appName("MySQL to Hive")
.master("spark://bigdata1:7077")
.enableHiveSupport().getOrCreate()
// 連接MySQL數(shù)據(jù)庫并設(shè)置屬性
val jdbcUrl = "jdbc:mysql://bigdata1:3306/shtd_industry"
val table = "EnvironmentData"
val properties = new Properties
properties.put("user", "root")
properties.put("password", "123456")
// Read data from MySQL
val df: DataFrame = spark.read.jdbc(jdbcUrl, table, properties)
println("-------------------自定義操作-------------------------")
// Add partition column
val dateFormat = new SimpleDateFormat("yyyyMMdd")
// 第一個(gè)getTime返回的是一個(gè) Date 對(duì)象
// 第二個(gè) getTime 方法返回的是一個(gè)整數(shù)值,表示此 Date 對(duì)象表示的時(shí)間距離標(biāo)準(zhǔn)基準(zhǔn)時(shí)間(1970年1月1日00:00:00 GMT)的毫秒數(shù)。
val yesterday = dateFormat.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)
//對(duì)MySQL來的數(shù)據(jù)進(jìn)行withCoulum操作,有就修改,沒有就添加
val dfWithPartition: DataFrame = df.withColumn("etldate", lit(yesterday))
println("-------------------寫入數(shù)據(jù)-------------------------")
// Write data to Hive
// mode模式為覆蓋,還有append為追加
// partitionBy 根據(jù)指定列進(jìn)行分區(qū)
// saveAsTable保存表
dfWithPartition.write.mode("overwrite")
.partitionBy("etldate")
.saveAsTable("ods.environmentdata")
}
}
hive數(shù)據(jù)庫相關(guān)的操作在這不做演示
三、重難點(diǎn)分析
沒有難點(diǎn),主要涉及能否自定義函數(shù)完成任務(wù)需求
val dateFormat = new SimpleDateFormat("yyyyMMdd")
// 第一個(gè)getTime返回的是一個(gè) Date 對(duì)象
// 第二個(gè) getTime 方法返回的是一個(gè)整數(shù)值,表示此 Date 對(duì)象表示的時(shí)間距離標(biāo)準(zhǔn)基準(zhǔn)時(shí)間(1970年1月1日00:00:00 GMT)的毫秒數(shù)。
val yesterday = dateFormat.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)
//對(duì)MySQL來的數(shù)據(jù)進(jìn)行withCoulum操作,有就修改,沒有就添加
val dfWithPartition: DataFrame = df.withColumn("etldate", lit(yesterday))
總結(jié)
本文僅僅介紹了Spark讀取MySQL的數(shù)據(jù)到hive數(shù)據(jù)庫的操作,spark提供了許多方法,我們不必寫SQL語法就可以直接對(duì)數(shù)據(jù)進(jìn)行操作,還是很方便的,并且難度也不高(比flink簡單)。文章來源:http://www.zghlxwxcb.cn/news/detail-672558.html
如轉(zhuǎn)載請標(biāo)明出處文章來源地址http://www.zghlxwxcb.cn/news/detail-672558.html
到了這里,關(guān)于大數(shù)據(jù)之使用Spark全量抽取MySQL的數(shù)據(jù)到Hive數(shù)據(jù)庫的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!