項(xiàng)目聲明和依賴(lài)
ECommerceRecommendSystem [pom.xml]
- 公用的聲明、依賴(lài)、插件
properties 聲明
- log4g:處理日志的框架(日志的具體實(shí)現(xiàn))
- sel4g:簡(jiǎn)單日志門(mén)面(簡(jiǎn)單日志的接口)
- mongodb-spark:MongoDB和Spark的接口
- casbah:MongoDB在scala上的Driver(最新的有MongoScalaDriver)
- redis、kafka、spark、scala
- jblas:java線性代數(shù)庫(kù)(矩陣運(yùn)算)
dependences 依賴(lài)
- dependencies:聲明+引入
- dependencyManagement:聲明,不引入
<dependencies>
<!-- 引入共同的日志管理工具 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>
plugin 插件
-
plugin:聲明+引入
-
pluginManagement:聲明,不引入
-
scala引用可能有問(wèn)題(recommender)
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-539559.html
-
注意版本號(hào)需要文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-539559.html
數(shù)據(jù)加載模塊
- Object:?jiǎn)卫?xiàng)目
package com.aguigu
import com.mongodb.MongoClientURI
import com.mongodb.casbah.Imports.MongoClientURI
import com.mongodb.casbah.MongoClient
import com.mongodb.casbah.commons.MongoDBObject
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
/** 樣例類(lèi)
* Product數(shù)據(jù)集
* 3982 商品ID
* Fuhlen 富勒 M8眩光舞者時(shí)尚節(jié)能 商品名稱(chēng)
* 1057,439,736 商品分類(lèi)ID,不需要
* B009EJN4T2 亞馬遜ID,不需要
* https://images-cn-4.ssl-image 商品的圖片URL
* 外設(shè)產(chǎn)品|鼠標(biāo)|電腦/辦公 商品分類(lèi)
* 富勒|鼠標(biāo)|電子產(chǎn)品|好用|外觀漂亮 商品UGC標(biāo)簽
*/
case class Product( productId:Int, name:String, URL:String, categories:String, tags:String )
/**
* Rating數(shù)據(jù)集
* 4867 用戶ID
* 457976 商品ID
* 5.0 評(píng)分
* 1395676800 時(shí)間戳
*/
case class Rating( userId: Int, productId: Int, score: Double, timestamp: Int )
/**
* MongoDB連接配置
* @param uri MongoDB的連接uri
* @param db 要操作的db
*/
case class MongoConfig( uri: String, db: String )
object DataLoader {
// 常量
val PRODUCT_DATA_PATH = "/Users/liuhao/MyProject/ECommerceRecommendSystem/ECommerceRecommendSystem/recommender/DataLoader/src/main/resources/products.csv"
val RATING_DATA_PATH = "/Users/liuhao/MyProject/ECommerceRecommendSystem/ECommerceRecommendSystem/recommender/DataLoader/src/main/resources/ratings.csv"
// 定義mongodb中存儲(chǔ)的表名
val MONGODB_PRODUCT_COLLECTION = "Product"
val MONGODB_RATING_COLLECTION = "Rating"
def main(args: Array[String]): Unit = {
/**
* 配置項(xiàng)
*/
val config = Map(
"spark.cores" -> "local[*]", // 所有邏輯核占用
"mongo.uri" -> "mongodb://localhost:27017/recommender", // MongoDB數(shù)據(jù)庫(kù)連接
"mongo.db" -> "recommender"
)
/**
* 創(chuàng)建Spark相關(guān):sparkConf, sparkSession
*/
// 創(chuàng)建 spark config
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("DataLoader")
// 創(chuàng)建 spark session
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
/**
* 加載數(shù)據(jù)
*/
import spark.implicits._
val productRDD = spark.sparkContext.textFile(PRODUCT_DATA_PATH)
// RDD => DataFrame
val productDF = productRDD.map( item => {
// product數(shù)據(jù)通過(guò)'^'分割
val attr = item.split("\\^")
// 轉(zhuǎn)換成product類(lèi)
Product( attr(0).toInt, attr(1).trim, attr(4).trim, attr(5).trim, attr(6).trim )
}).toDF()
val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)
val ratingDF = ratingRDD.map(item => {
val attr = item.split(",")
Rating(attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt)
}).toDF()
implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db")) // 隱式配置
storeDataInMongoDB( productDF, ratingDF )
spark.stop()
}
def storeDataInMongoDB( productDF:DataFrame, ratingDF:DataFrame)(implicit mongoConfig: MongoConfig): Unit ={
// 新建mongodb連接(casbah),客戶端
val mongoClinet = MongoClient(MongoClientURI(mongoConfig.uri) )
// 定義要操作的mongodb表,理解:db.product
val productCollection = mongoClinet( mongoConfig.db )( MONGODB_PRODUCT_COLLECTION )
val ratingCollection = mongoClinet( mongoConfig.db )( MONGODB_RATING_COLLECTION )
// 方式1:如果表已存在,則刪除
productCollection.dropCollection()
ratingCollection.dropCollection()
// 方式2:將當(dāng)前數(shù)據(jù)存入對(duì)應(yīng)表
productDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_PRODUCT_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
ratingDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
// 對(duì)表創(chuàng)建索引
productCollection.createIndex( MongoDBObject( "productId" -> 1 ) )
ratingCollection.createIndex( MongoDBObject( "productId" -> 1 ) )
ratingCollection.createIndex( MongoDBObject( "userId" -> 1 ) )
mongoClinet.close()
}
}
到了這里,關(guān)于第3章 創(chuàng)建項(xiàng)目并初始化業(yè)務(wù)數(shù)據(jù)(過(guò)程記錄)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!