實(shí)驗(yàn)原理
Spark的核心就是RDD,所有在RDD上的操作會(huì)被運(yùn)行在Cluster上,Driver程序啟動(dòng)很多Workers,Workers在(分布式)文件系統(tǒng)中讀取數(shù)據(jù)后轉(zhuǎn)化為RDD(彈性分布式數(shù)據(jù)集),然后對RDD在內(nèi)存中進(jìn)行緩存和計(jì)算。
而RDD有兩種類型的操作 ,分別是Action(返回values)和Transformations(返回一個(gè)新的RDD)。
一、數(shù)據(jù)展示與前置準(zhǔn)備
某電商網(wǎng)站記錄了大量用戶對商品的收藏?cái)?shù)據(jù),并將數(shù)據(jù)存儲(chǔ)在名為buyer_favorite1的文件中,數(shù)據(jù)格式以及數(shù)據(jù)內(nèi)容如下
在進(jìn)行后續(xù)操作前,請先開啟hadoop和spark服務(wù)??梢酝ㄟ^jps命令查看進(jìn)程是否開啟完整。
二、創(chuàng)建scala工程項(xiàng)目
1、開發(fā)環(huán)境:eclipse
打開已安裝完Scala插件的Eclipse,新建一個(gè)Scala項(xiàng)目,命名為spark4。
在spark4項(xiàng)目下新建包名,命名為my.scala。將scala object命名為ScalaWordCount。
2、導(dǎo)入運(yùn)行所需要的jar包。
右鍵項(xiàng)目,創(chuàng)建一個(gè)文件夾,名為lib。
將jar包導(dǎo)入進(jìn)來,再右鍵jar包,點(diǎn)擊Build Path=>Add to Build Path。(可以去我的資源里面下載spark1.x hadoop2.x)
3、編寫Scala語句,并統(tǒng)計(jì)用戶收藏?cái)?shù)據(jù)中,每個(gè)用戶收藏商品數(shù)量。
package my.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object ScalaWordCount {
def main(args: Array[String]) {
//創(chuàng)建Spark的配置對象sparkConf,設(shè)置Spark程序運(yùn)行時(shí)的配置信息;
val conf = new SparkConf()
conf.setMaster("local") .setAppName("scalawordcount")
//創(chuàng)建SparkContext對象,SparkContext是Spark程序所有功能的唯一入口,無論采用Scala、Java還是Python都必須有一個(gè)SparkContext;
val sc = new SparkContext(conf)
val rdd = sc.textFile("hdfs://localhost:9000/myspark/buyer_favorite1") //根據(jù)具體的數(shù)據(jù)來源,通過SparkContext來創(chuàng)建RDD;
//對初始的RDD進(jìn)行Transformation級別的處理。(首先將每一行的字符串拆分成單個(gè)的單詞,然后在單詞拆分的基礎(chǔ)上對每個(gè)單詞實(shí)例計(jì)數(shù)為1;
//最后,在每個(gè)單詞實(shí)例計(jì)數(shù)為1的基礎(chǔ)上統(tǒng)計(jì)每個(gè)單詞在文件出現(xiàn)的總次數(shù))。
rdd.map(line => (line.split("\t")(0), 1))
.reduceByKey(_ + _)
.collect()
.foreach(println)
sc.stop()
}
}
在控制界面console中查看的輸出結(jié)果。
三、創(chuàng)建Java工程項(xiàng)目
再次右鍵點(diǎn)擊項(xiàng)目名,新建package,將包命名為my.java 。
右鍵點(diǎn)擊包my.java,新建Class,命名為JavaWordCount。
1、編寫Java代碼,統(tǒng)計(jì)用戶收藏?cái)?shù)據(jù)中,每個(gè)用戶收藏商品數(shù)量。
package my.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
public final class JavaWordCount {
private static final Pattern SPACE = Pattern.compile("\t");
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile("hdfs://localhost:9000/myspark/buyer_favorite1");
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String s) {
String word[]=s.split("\t",2);
return Arrays.asList(word[0]);
}
});
JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
List<Tuple2<String, Integer>> output = counts.collect();
System.out.println(counts.collect());
counts.saveAsTextFile("hdfs://localhost:9000/myspark/out");
ctx.stop();
}
}
2、在linux終端查看輸出結(jié)果
執(zhí)行如下命令查看結(jié)果,前提是已啟動(dòng)集群文章來源:http://www.zghlxwxcb.cn/news/detail-553286.html
hadoop fs -cat /myspark/out/part-00000
寫在最后
由此可以看出,scala語言在編寫spark程序時(shí)的優(yōu)越性,簡短精煉。文章來源地址http://www.zghlxwxcb.cn/news/detail-553286.html
到了這里,關(guān)于Spark—通過Java、Scala API實(shí)現(xiàn)WordCount案例的基本操作的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!