實(shí)驗(yàn)三: Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)
1. 實(shí)驗(yàn)?zāi)康?/h3>
- 了解Spark中的基本概念和主要思想,熟悉Spark與MapReduce的區(qū)別;
- 掌握基本的Spark編程,實(shí)現(xiàn)基礎(chǔ)RDD編程;
- 了解HBase的基本特性及其適用場景;
- 熟悉HBase Shell常用命令;
- 學(xué)習(xí)使用HBase的Java API,編程實(shí)現(xiàn)HBase常用功能;
2. 實(shí)驗(yàn)環(huán)境
實(shí)驗(yàn)平臺:基于實(shí)驗(yàn)一搭建的虛擬機(jī)Hadoop大數(shù)據(jù)實(shí)驗(yàn)平臺上的Spark集群,HBase;
編程語言:JAVA(推薦使用)、Python等;
3. 實(shí)驗(yàn)內(nèi)容
3.1 Spark
3.1.0 Spark簡介
首先啟動HDFS、YARN、Spark
// 啟動HDFS(cluster1上)
$ start-dfs.sh
// 啟動YARN(cluster1上)
$ start-yarn.sh
// 運(yùn)行Spark(cluster1上) 運(yùn)行spark前需啟動hadoop的HDFS和YARN
$ start-master.sh
$ start-slaves.sh
3.1.1 功能實(shí)現(xiàn)
3.1.1.1 創(chuàng)建RDD
創(chuàng)建RDD,并熟悉RDD中的轉(zhuǎn)換操作,行動操作,并給出相應(yīng)實(shí)例;
// 在cluster2 上啟動mysql
# /etc/init.d/mysql.server start -user=mysql
// 進(jìn)入Sprk shell 進(jìn)行交互式編程
$ spark-shell --master local[4]
--master
選項(xiàng)指定分布式集群的主URL, local[4]
表示在本地運(yùn)行4個(gè)線程。(由于我們的數(shù)據(jù)比較小,因此先在本地進(jìn)行測試)
- 轉(zhuǎn)化操作
對于RDD而言,每一次轉(zhuǎn)換操作都會產(chǎn)生不同的RDD,供給下一個(gè)操作使用。RDD的轉(zhuǎn)換過程是惰性求值的,也就是說,整個(gè)轉(zhuǎn)換過程只是記錄了轉(zhuǎn)換的軌跡,并不會發(fā)生真正的計(jì)算,只有遇到行動操作時(shí),才會觸發(fā)“從頭到尾”的真正的計(jì)算。
最常用的轉(zhuǎn)化操作是map()
和filter()
,轉(zhuǎn)化操作map()
接收一個(gè)函數(shù),把這個(gè)函數(shù)用于RDD 中的每個(gè)元素,將函數(shù)的返回結(jié)果作為結(jié)果RDD 中對應(yīng)元素的值。而轉(zhuǎn)化操作filter()
則接收一個(gè)函數(shù),并將RDD 中滿足該函數(shù)的元素放入新的RDD 中返回。
常用的轉(zhuǎn)換操作如下表:
操作 | 含義 |
---|---|
filter(func) | 篩選出滿足函數(shù)func的元素,并返回一個(gè)新的數(shù)據(jù)集 |
map(func) | 將每個(gè)元素傳遞到函數(shù)func中,并將結(jié)果返回為一個(gè)新的數(shù)據(jù)集 |
flatMap(func) | 與map()相似,但每個(gè)輸入元素都可以映射到0或多個(gè)輸出結(jié)果 |
groupByKey() | 應(yīng)用于(K,V)鍵值對的數(shù)據(jù)集時(shí),返回一個(gè)新的(K, Iterable)形式的數(shù)據(jù)集 |
reduceByKey(func) | 應(yīng)用于(K,V)鍵值對的數(shù)據(jù)集時(shí),返回一個(gè)新的(K,V)形式的數(shù)據(jù)集,其中每個(gè)值是將每個(gè)key傳遞到func中進(jìn)行聚合后的結(jié)果 |
下面將采用兩個(gè)例子實(shí)現(xiàn)轉(zhuǎn)換操作,具體操作如下圖所示:
// 用map() 對RDD 中的所有數(shù)求平方
scala> val input = sc.parallelize(List(1, 2, 3, 4))
scala> val result = input.map(x => x * x)
scala> println(result.collect().mkString(","))
- 上述代碼解釋
- 可以調(diào)用SparkContext的
parallelize
方法,從一個(gè)已經(jīng)存在的集合(數(shù)組)上創(chuàng)建RDD。其中List數(shù)組中的每個(gè)元素都是一個(gè)RDD。 - map(func)操作將每個(gè)元素傳遞到函數(shù)func中,并將結(jié)果返回為一個(gè)新的數(shù)據(jù)集。
- input.map(x=>x*x)的含義是,依次取出input這個(gè)RDD中的每個(gè)元素,對于當(dāng)前取到的元素,把它賦值給λ表達(dá)式中的變量x,然后,執(zhí)行λ表達(dá)式的函數(shù)體部分“x*x”,也就是把變量x的值求平方后,作為函數(shù)的返回值,并作為一個(gè)元素放入到新的 RDD(即 result)中。最終,新生成的RDD(即result),包含了4個(gè)Int類型的元素,即1、4、9、16。
- 可以調(diào)用SparkContext的
最終打印元素1,4,9,16
, 說明平方求解成功。
// 用filter過濾其中為1的值
scala> val result2 = input.filter(x => x!=1)
scala> println(result2.collect().mkString(","))
- 上述代碼解釋
- filter(func)操作會篩選出滿足函數(shù)func的元素,并返回一個(gè)新的數(shù)據(jù)集
- lines.filter()操作,filter()的輸入?yún)?shù)x=>x!=1是一個(gè)匿名函數(shù),或者被稱為“λ表達(dá)式”。該操作的含義是,依次取出input這個(gè)RDD中的每個(gè)元素,對于當(dāng)前取到的元素,把它賦值給λ表達(dá)式中的x變量,如果x不為0,就加入到新的RDD中。
最終打印元素2,3,4
,成功過濾掉1
-
行動操作
行動操作是真正觸發(fā)計(jì)算的地方。Spark程序只有執(zhí)行到行動操作時(shí),才會執(zhí)行真正的計(jì)算,從文件中加載數(shù)據(jù),完成一次又一次轉(zhuǎn)換操作,最終,完成行動操作得到結(jié)果。
常用RDD行動操作API
操作 含義 count() 返回?cái)?shù)據(jù)集中的元素個(gè)數(shù) collect() 以數(shù)組的形式返回?cái)?shù)據(jù)集中的所有元素 first() 返回?cái)?shù)據(jù)集中的第一個(gè)元素 take(n) 以數(shù)組的形式返回?cái)?shù)據(jù)集中的前n個(gè)元素 reduce(func) 通過函數(shù)func(輸入兩個(gè)參數(shù)并返回一個(gè)值)聚合數(shù)據(jù)集中的元素 foreach(func) 將數(shù)據(jù)集中的每個(gè)元素傳遞到函數(shù)func中運(yùn)行 -
reduce()
reduce()
接收一個(gè)函數(shù)作為參數(shù),這個(gè)函數(shù)要操作兩個(gè)RDD 的元素類型的數(shù)據(jù)并返回一個(gè)同樣類型的新元素。一個(gè)簡單的例子就是函數(shù)+,可以用它來對我們的RDD 進(jìn)行累加。使用reduce(),可以很方便地計(jì)算出RDD中所有元素的總和、元素的個(gè)數(shù),以及其他類型的聚合操作。scala> var rdd = sc.parallelize(List(1,2,3,4,5,6,7)) scala> var sum = rdd.reduce((x, y) => x + y)
在執(zhí)行rdd.reduce((x,y)=>x+y)時(shí),系統(tǒng)會把rdd的第1個(gè)元素1傳入給參數(shù)x,把rdd的第2個(gè)元素2傳入給參數(shù)y,執(zhí)行x+y計(jì)算得到求和結(jié)果3;然后,把這個(gè)求和的結(jié)果3傳入給參數(shù)x,把rdd的第3個(gè)元素3傳入給參數(shù)y,執(zhí)行a+b計(jì)算得到求和結(jié)果6,以此類推,最終結(jié)果應(yīng)該是數(shù)組中所有數(shù)字的和。
輸出結(jié)果為28,成功的計(jì)算出了rdd中所有元素的求和。
-
aggregate() 函數(shù)則把我們從返回值類型必須與所操作的RDD 類型相同的限制中解放出來。與fold() 類似,使用aggregate() 時(shí),需要提供我們期待返回的類型的初始值。然后通過一個(gè)函數(shù)把RDD 中的元素合并起來放入累加器。考慮到每個(gè)節(jié)點(diǎn)是在本地進(jìn)行累加的,最終,還需要提供第二個(gè)函數(shù)來將累加器兩兩合并。
scala> var rdd = sc.parallelize(List(1,2,3,4,5,6,7)) scala> var result = rdd.aggregate((0,0))((acc,value) => (acc._1 + value,acc._2 + 1),(acc1,acc2) => (acc1._1 + acc2._1 , acc1._2 + acc2._2)) scala> var avg = result._1/result._2.toDouble
-
第二步驟輸出的是一個(gè)二元組,其中第一個(gè)元素是列表中所有數(shù)的累加和,第二個(gè)元素是列表中元素的個(gè)數(shù)。
第三步驟輸出的是列表的平均值。
3.1.1.2 持久化操作
了解如何將RDD中的計(jì)算過程進(jìn)行持久化操作,并給出具體代碼;
-
持久化操作
Spark RDD 是惰性求值的,而有時(shí)我們希望能多次使用同一個(gè)RDD。如果簡單地對RDD 調(diào)用行動操作,Spark 每次都會重算RDD 以及它的所有依賴。這在迭代算法中消耗格外大,因?yàn)榈惴ǔ3啻问褂猛唤M數(shù)據(jù)。為了避免多次計(jì)算同一個(gè)RDD,可以讓Spark 對數(shù)據(jù)進(jìn)行持久化。當(dāng)我們讓Spark 持久化存儲一個(gè)RDD 時(shí),計(jì)算出RDD 的節(jié)點(diǎn)會分別保存它們所求出的分區(qū)數(shù)據(jù)。如果一個(gè)有持久化數(shù)據(jù)的節(jié)點(diǎn)發(fā)生故障,Spark 會在需要用到緩存的數(shù)據(jù)時(shí)重算丟失的數(shù)據(jù)分區(qū)。如果希望節(jié)點(diǎn)故障的情況不會拖累我們的執(zhí)行速度,也可以把數(shù)據(jù)備份到多個(gè)節(jié)點(diǎn)上。出于不同的目的,我們可以為RDD 選擇不同的持久化級別,具體的級別如下表所示。
測試一: 練習(xí)使用persist函數(shù)
import org.apache.spark.storage.StorageLevel
var input = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7))
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY) // 使用persist函數(shù)將結(jié)果存放到磁盤中。
println(result.count())
println(result.collect().mkString(","))
- 使用persist()方法對一個(gè)RDD標(biāo)記為持久化,之所以說“標(biāo)記為持久化”,是因?yàn)槌霈F(xiàn)persist()語句的地方,并不會馬上計(jì)算生成RDD并把它持久化,而是要等到遇到第一個(gè)行動操作觸發(fā)真正計(jì)算以后,才會把計(jì)算結(jié)果進(jìn)行持久化,持久化后的RDD將會被保留在計(jì)算節(jié)點(diǎn)的內(nèi)存中,被后面的行動操作重復(fù)使用。
- persist()的圓括號中包含的是持久化級別參數(shù),可以有如下不同的級別:
- persist(MEMORY_ONLY):表示將RDD作為反序列化的對象存儲于JVM中,如果內(nèi)存不足,就要按照LRU原則替換緩存中的內(nèi)容
- persist(MEMORY_AND_DISK):表示將RDD作為反序列化的對象存儲在JVM中,如果內(nèi)存不足,超出的分區(qū)將會被存放在磁盤上。
結(jié)果如下圖所示:
- 測試2 : 對比持久化操作效果
# 創(chuàng)建test文本
touch test.txt
# 寫入相應(yīng)內(nèi)容
echo "i am a test file,wonderful hello hello wonderful hello hello" > test.txt
# 上傳到hdfs
./hdfs dfs -put test.txt user/hadoop/lab3
./hdfs dfs -put /home/hadoop/data/data8.txt /lab3
# 查看上傳后的文件內(nèi)容
./hdfs dfs -ls user/hadoop/lab3
// 非持久化操作
val testrdd =sc.textFile("lab3/test.txt")
testrdd.count();
val t1 =System.currentTimeMillis();
println("noCache()=testrdd.count()=" + testrdd.count());
val t2 = System.currentTimeMillis();
val t2_t1 =t2 - t1;
println("nocache()=" + t2_t1);
// 持久化操作
val testrdd =sc.textFile("lab3/test.txt"); .persist(StorageLevel.MEMORY_ONLY());
testrdd.count();
val t1 =System.currentTimeMillis();
println("noCache()=testrdd.count()=" + testrdd.count());
val t2 = System.currentTimeMillis();
val t2_t1 = t2 - t1;
println("cache()=" + t2_t1);
可以看到?jīng)]有使用持久化操作的運(yùn)行時(shí)間是573ms, 而使用了持久化操作的運(yùn)行時(shí)間為292。因此持久化操作過程給性能帶來了比較大的提升。
3.1.1.3 數(shù)據(jù)讀取與保存
了解Spark的數(shù)據(jù)讀取與保存操作,嘗試完成csv文件的讀取和保存;
# 首先上傳實(shí)驗(yàn)數(shù)據(jù)五到hdfs文件系統(tǒng)中(改為英文名'test5.csv')
./hdfs dfs -put /home/hadoop/data/test5.csv /user/hadoop/lab3
# 查看上傳后的文件內(nèi)容
./hdfs dfs -ls /user/hadoop/lab3
import java.io.StringReader;
import au.com.bytecode.opencsv.CSVReader;
val input=sc.textFile("/user/hadoop/lab3/test5.csv");
input.foreach(println);
val result =input.map{line =>val reader =new CSVReader(new StringReader(line));reader.readNext();}
- 上述源碼的解釋
- Spark采用textFile()方法來從文件系統(tǒng)中加載數(shù)據(jù)創(chuàng)建RDD,該方法把文件的URI作為參數(shù),這個(gè)URI可以是本地文件系統(tǒng)的地址、分布式文件系統(tǒng)HDFS的地址。
- 首先使用textFile()方法從文件中加載數(shù)據(jù),然后,使用map()函數(shù)轉(zhuǎn)換得到相應(yīng)的鍵值對RDD。
- 執(zhí)行
sc.textFile()
方法以后,Spark 從本地文件test5.txt 中加載數(shù)據(jù)到內(nèi)存,在內(nèi)存中生成一個(gè)RDD對象lines,lines是org.apache.spark.rdd.RDD這個(gè)類的一個(gè)實(shí)例,這個(gè)RDD里面包含了若干個(gè)元素,每個(gè)元素的類型是 String 類型,也就是說,從 test5.txt 文件中讀取出來的每一行文本內(nèi)容,都成為RDD中的一個(gè)元素
如下圖所示,成功的讀取test5.csv的數(shù)據(jù)
并將結(jié)果保存到了result中。
3.1.2 WordCount實(shí)驗(yàn)
編程熟悉Spark中的鍵值對操作,利用Spark的API完成wordcount實(shí)驗(yàn),即統(tǒng)計(jì)一段文本中每個(gè)單詞的出現(xiàn)總數(shù),如 a b c d a c, 結(jié)果為(a , 2),(b , 1),(c , 2),(d , 1)。
- 代碼詳細(xì)說明
- 首先使用textFile()方法從文件中加載數(shù)據(jù)
- flatmap的輸入?yún)?shù)line => line.split(" “)是一個(gè)λ表達(dá)式。lines.map(line => line.split(” “))的含義是,依次取出lines這個(gè)RDD中的每個(gè)元素,對于當(dāng)前取到的元素,把它賦值給λ表達(dá)式中的line變量,然后,執(zhí)行λ表達(dá)式的函數(shù)體部分line.split(” “)。line.split(” ")的功能是,以空格作為分隔符把line拆分成一個(gè)個(gè)單詞,拆分后得到的單詞都封裝在一個(gè)數(shù)組對象中,成為新的RDD(即words)的一個(gè)元素。
- 從input轉(zhuǎn)換得到一個(gè)新的RDD(即wordArray),wordArray中的每個(gè)元素都是一個(gè)數(shù)組對象。flatmap也就是把wordArray中的每個(gè)RDD元素都“拍扁”成多個(gè)元素,最終,所有這些被拍扁以后得到的元素,構(gòu)成一個(gè)新的RDD,即words。
- map(word => (word,1))函數(shù)的作用是,取出RDD中的每個(gè)元素,也就是每個(gè)單詞,賦值給word,然后把word轉(zhuǎn)換成(word,1)的鍵值對形式。
- reduceByKey(func)應(yīng)用于(K,V)鍵值對的數(shù)據(jù)集時(shí),返回一個(gè)新的(K, V)形式的數(shù)據(jù)集,其中的每個(gè)值是將每個(gè)key傳遞到函數(shù)func中進(jìn)行聚合后得到的結(jié)果。
- 名稱為word的RDD中包含的每個(gè)元素都是<String,Int>類型,也就是(K,V)鍵值對類型。words.reduceByKey((a,b)=>a+b)操作執(zhí)行以后,所有key相同的鍵值對,它們的value首先被歸并成一個(gè)新的鍵值對,例如(“xxx”,(1,1,1))。然后,使用func函數(shù)把(1,1,1)聚合到一起,這里的func函數(shù)是一個(gè)λ表達(dá)式,即(a,b)=>a+b,它的功能是把(1,1,1)這個(gè)value-list中的每個(gè)元素進(jìn)行匯總求和,首先,把value-list中的第1個(gè)元素(即1)賦值給參數(shù)a,把value-list中的第2個(gè)元素(也是1)賦值給參數(shù)b,執(zhí)行a+b得到2,然后,繼續(xù)對value-list中的元素執(zhí)行下一次計(jì)算,把剛才求和得到的2賦值給a,把value-list中的第3個(gè)元素(即1)賦值給b,再次執(zhí)行a+b得到3。最終,就得到聚合后的結(jié)果(“xxx”,3)。
- 最后,執(zhí)行了語句wordcount.foreach(elem=>println(elem)),該語句會依次遍歷 rdd 中的每個(gè)元素,把當(dāng)前遍歷到的元素賦值給變量 elem,并使用 println(elem)打印出 elem的值。實(shí)際上,rdd.foreach(elem=>println(elem))可以被簡化成rdd.foreach(println),效果是一樣的。
object WordCount { //定義wordcount類,用來統(tǒng)計(jì)一段文本中每個(gè)單詞的出現(xiàn)總數(shù)
def main(args: Array[String]) {
val inputFile = "/home/hadoop/data/test6.txt" // 原始數(shù)據(jù)文件
val conf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
conf.set("spark.testing.memory", "500000000") // 設(shè)置配置信息
val sc = new SparkContext(conf)
val textFile = sc.textFile(inputFile) // 讀取數(shù)據(jù)
val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) // 文本分割并計(jì)數(shù),格式為(word,number)
wordCount.foreach(println) // 打印wordCount
wordCount.saveAsTextFile("/home/hadoop/lab3/output") // 將結(jié)果保存到output文件夾下
}
}
然后將WordCount.scala代碼上傳到/home/hadoop/lab3
上,并編譯和運(yùn)行。
cd /home/hadoop/lab3
//編譯
$ scalac -cp /usr/local/spark-1.6.3-bin-hadoop2.6/lib/*: WordCount.scala
//運(yùn)行
$ java -cp /usr/local/spark-1.6.3-bin-hadoop2.6/lib/*: WordCount
終端輸出結(jié)果如下圖所示:
在/home/hadoop/lab3
下會生成output文件夾,以及文件夾下的輸出文件。
3.1.3 累加器和廣播變量
查閱資料,了解Spark中累加器和廣播變量的作用,并舉例實(shí)驗(yàn)累加器和廣播變量,嘗試體會兩者的區(qū)別。
在 Spark中,提供了兩種類型的共享變量:累加器 (accumulator) 與廣播變量 (broadcast variable)
- 累加器:用來對信息進(jìn)行聚合,主要用于累計(jì)計(jì)數(shù)等場景
- 廣播變量:主要用于在節(jié)點(diǎn)間高效分發(fā)大對象。
對于正常的累計(jì)求和,如果在集群模式中使用下面的代碼進(jìn)行計(jì)算,會發(fā)現(xiàn)執(zhí)行結(jié)果并非預(yù)期:
var counter = 0
val data = Array(1, 2, 3, 4, 5)
sc.parallelize(data).foreach(x => counter += x)
println(counter)
counter 最后的結(jié)果是 0,導(dǎo)致這個(gè)問題的主要原因是閉包。在實(shí)際計(jì)算時(shí),Spark 會將對 RDD 操作分解為 Task,Task 運(yùn)行在 Worker Node 上。在執(zhí)行之前,Spark 會對任務(wù)進(jìn)行閉包,如果閉包內(nèi)涉及到自由變量,則程序會進(jìn)行拷貝,并將副本變量放在閉包中,之后閉包被序列化并發(fā)送給每個(gè)執(zhí)行者。因此,當(dāng)在 foreach 函數(shù)中引用 counter 時(shí),它將不再是 Driver 節(jié)點(diǎn)上的 counter,而是閉包中的副本 counter,默認(rèn)情況下,副本 counter 更新后的值不會回傳到 Driver,所以 counter 的最終值仍然為零。
所以在遇到此類問題時(shí)應(yīng)優(yōu)先使用累加器。
3.1.3.1 累加器
累加器的原理:
就是將每個(gè)副本變量的最終值傳回 Driver,由 Driver 聚合后得到最終值,并更新原始變量。
//創(chuàng)建一個(gè)accumulator變量
scala> val acc = sc.accumulator(0, "Accumulator")
//add方法可以相加
scala> sc.parallelize(Array(1,2,3,4,5)).foreach(x => acc.add(x))
scala> acc.value
求和結(jié)果為 1+2+3+4+5=15
//+=也可以相加
scala> sc.parallelize(Array(1,2,3,4,5)).foreach(x => acc += x)
scala> acc.value
求和的結(jié)果為15+15=30
3.1.3.2 廣播變量
廣播變量的做法很簡單:就是不把副本變量分發(fā)到每個(gè) Task 中,而是將其分發(fā)到每個(gè) Executor,Executor 中的所有 Task 共享一個(gè)副本變量。Spark提供的Broadcast Variable,是只讀的。并且在每個(gè)節(jié)點(diǎn)上只會有一份副本,而不會為每個(gè)task都拷貝一份副本。因此其最大作用,就是減少變量到各個(gè)節(jié)點(diǎn)的網(wǎng)絡(luò)傳輸消耗,以及在各個(gè)節(jié)點(diǎn)上的內(nèi)存消耗。此外,spark自己內(nèi)部也使用了高效的廣播算法來減少網(wǎng)絡(luò)消耗。
調(diào)用SparkContext的broadcast()方法,來針對某個(gè)變量創(chuàng)建廣播變量。然后在算子的函數(shù)內(nèi),使用到廣播變量時(shí),每個(gè)節(jié)點(diǎn)只會拷貝一份副本了。每個(gè)節(jié)點(diǎn)可以使用廣播變量的value()方法獲取值。Broadcast是只讀的。
使用Broadcast變量的步驟:
- 調(diào)用SparkContext.broadcast方法創(chuàng)建一個(gè)Broadcast[T]對象。
任何序列化的類型都可以這么實(shí)現(xiàn)。 - 通過value屬性訪問改對象的值(Java之中為value()方法)
- 變量只會被發(fā)送到各個(gè)節(jié)點(diǎn)一次,應(yīng)作為只讀值處理(修改這個(gè)值不會影響到別的節(jié)點(diǎn))
// 把一個(gè)數(shù)組定義為一個(gè)廣播變量
val broadcastVar = sc.broadcast(Array(1, 2, 3, 4, 5))
// 之后用到該數(shù)組時(shí)應(yīng)優(yōu)先使用廣播變量,而不是原值
sc.parallelize(broadcastVar.value).map(_ * 10).collect()
輸出的結(jié)果是對應(yīng)Array的元素乘以10,結(jié)果為(10,20,30,40,50)
3.2 HBase
3.2.1 創(chuàng)建表格
通過Hbase的shell命令創(chuàng)建HBase列式存儲數(shù)據(jù)表格,其中每一行的數(shù)據(jù)格式如下
// 在cluster1 上啟動hdfs
$ start-dfs.sh
// 在cluster1 上啟動habase
$ start-yarn.sh
// 在cluster1 上啟動habase
$ start-hbase.sh
$ hbase shell
// 創(chuàng)建表student, 列簇名分別為information,score,stat_score
> create 'student','information','score','stat_score'
// describe查看student表的基本信息
describe 'student'
// 使用put函數(shù)進(jìn)行數(shù)據(jù)的插入
// student: 表名
// 1001 學(xué)號
// information:name 列簇名:列名
// Tom 值
put 'student','1001','information:name','Tom'
put 'student','1001','information:sex','male'
put 'student','1001','information:age','20'
put 'student','1001','score:123001','99'
put 'student','1001','score:123002','98'
put 'student','1001','score:123003','97'
put 'student','1001','stat_score:sum','294'
put 'student','1001','stat_score:avg','98'
// 查看student表
scan 'student'
3.2.2 插入數(shù)據(jù)
請使用HBASE提供的API編程,實(shí)現(xiàn)向1)創(chuàng)建的HBase表中插入類似于下表中的數(shù)據(jù)(完整數(shù)據(jù)在附錄中),列簇3部分先用”NULL”補(bǔ)充。
主要思路:
- 首先構(gòu)造ReadFile函數(shù)用于讀取每一行的字符串,并保存到lines數(shù)組中。
- 然后allAll函數(shù)遍歷lines數(shù)組,并對每一行進(jìn)行分割,得到"學(xué)號|姓名|性別|年齡"
- 最后調(diào)用addRecord函數(shù)進(jìn)行數(shù)據(jù)的插入
核心代碼如下:
// 從文件中逐行讀取數(shù)據(jù)
public static List<String> ReadFile(String filename) {
List<String> lines = null;
try {
lines = Files.readAllLines(Paths.get(filename), StandardCharsets.UTF_8);
} catch (IOException e) {
e.printStackTrace();
}
return lines;
}
// 一次性添加所有的學(xué)生信息
public static void addALL(String tableName,String[] columnFamilys) throws Exception {
// 讀取data7.txt
// 文件中的數(shù)據(jù)格式:學(xué)號|姓名|性別|年齡
List<String> list7 = ReadFile("/home/hadoop/data7.txt");
//寫入student information
for (String line : list7) {
String[] temp = line.split("\\s+");
String Id = temp[0];
String Name = temp[1];
String Sex = temp[2];
String Age = temp[3];
HBaseJavaAPI.addRecord(tableName, Id, columnFamilys[0],"name", Name);
HBaseJavaAPI.addRecord(tableName, Id, columnFamilys[0],"age",Age);
HBaseJavaAPI.addRecord(tableName, Id, columnFamilys[0],"sex",Sex);
//提示
System.out.println("學(xué)生學(xué)號:" + Id + ",姓名:" + Name + ",性別:" + Sex + ",年齡:" + Age);
}
//讀取data8.txt
//文件中的數(shù)據(jù)格式:學(xué)號|課程號|成績
List<String> list8 = ReadFile("/home/hadoop/data8.txt");
//寫入student score
for (String line : list8) {
String[] temp = line.split("\\s+");
String Id = temp[0];
String Cno = temp[1];
String Score = temp[2];
HBaseJavaAPI.addRecord(tableName,Id, columnFamilys[1], Cno,Score);
//提示
System.out.println("學(xué)生學(xué)號:" + Id +",課程課號:" + Cno + ",成績:" + Score);
}
}
// 調(diào)用
public static void main(String[] args) {
try {
String tableName = "student";
// 第一步:創(chuàng)建數(shù)據(jù)庫表:“student”
String[] columnFamilys = { "information", "score","stat_score" };
HBaseJavaAPI.createTable(tableName, columnFamilys);
// 第二步:向數(shù)據(jù)表的添加數(shù)據(jù)
// 添加第一行數(shù)據(jù)
if (isExist(tableName)) {
//從文件中讀取信息,添加全部的數(shù)據(jù)
addALL(tableName,columnFamilys);
} else {
System.out.println(tableName + "此數(shù)據(jù)庫表不存在!");
}
} catch (Exception e) {
e.printStackTrace();
}
}

3.2.3 成績統(tǒng)計(jì)
請使用Spark編程實(shí)現(xiàn)對每個(gè)學(xué)生所有課程總成績與平均成績的統(tǒng)計(jì)聚合,并將聚合結(jié)果存儲到1)中創(chuàng)建的HBase表。(可以考慮將聚合后的結(jié)果先存入HDFS文件,再從HDFS文件載入數(shù)據(jù)到HBase,也可以使用API將聚合結(jié)果直接插入到HBase表。)
實(shí)驗(yàn)思路:成績統(tǒng)計(jì)的實(shí)驗(yàn)是通過java編程使用Spark API實(shí)現(xiàn)的 , 首先采用mapTopair
函數(shù)構(gòu)造學(xué)號和成績的鍵值對。然后再使用reduceByKey
函數(shù)進(jìn)行歸約,實(shí)現(xiàn)相同學(xué)號成績的累加。
主要參考JavaRDDLike (Spark 1.6.3 JavaDoc) (apache.org)
// 官網(wǎng)給的函數(shù)原型
<K2,V2> JavaPairRDD<K2,V2> mapToPair(PairFunction<T,K2,V2> f)
總成績的代碼如下:
//進(jìn)行成績總和的聚合
//輸入數(shù)據(jù)類型:學(xué)號string|課程號string|成績integer
/*
其中PairFunction中
第一個(gè)參數(shù)是call 函數(shù)中參數(shù)的輸入的類型,表示從文件中讀取的一行為一個(gè)RDD
第二個(gè)參數(shù)和第三個(gè)參數(shù)分別表示生成的鍵值對的類型。分別對應(yīng)<學(xué)號,成績>
call函數(shù)是代碼的邏輯部分,通過split函數(shù)以空格為分節(jié)符進(jìn)行分割,分割結(jié)果以數(shù)組的形式進(jìn)行存放。
返回的結(jié)果是數(shù)組中的第一列(學(xué)號)、第三列(成績)。
其中成績要從字符串的類型轉(zhuǎn)換成整數(shù)類型,方便后續(xù)的計(jì)算
其中reduceByKey函數(shù)要對Function2函數(shù)進(jìn)行重寫。
Function2中的第一個(gè)參數(shù)是返回值的類型,第二個(gè)和第三個(gè)參數(shù)是call方法的輸入類型。
call方法就是實(shí)現(xiàn)相同key的tuple中value值的累加。
返回值是<學(xué)號,總成績>
*/
JavaPairRDD<String,Integer> SumScore=File8.mapToPair(new PairFunction<String, String, Integer>(){
@Override
public Tuple2<String,Integer> call(String f){
String[] str=f.split(" ");// 分割
return new Tuple2<>(str[0],Integer.parseInt(str[2])); // 輸出類型:key=學(xué)號|value=成績
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() { // key值相同的進(jìn)行規(guī)約
@Override
public Integer call(Integer a, Integer b) throws Exception { // 學(xué)號相同的成績求和
return a+b;
}
});
由于每個(gè)學(xué)生都是只有3門課,因此平均成績的算法就是總成績除以3,只需要在上一步的代碼基礎(chǔ)上再重新添加一個(gè) mapTopair
函數(shù)實(shí)現(xiàn)映射,其中鍵值不變(學(xué)號),然后平均成績是之前總成績的1/3。
平均成績的代碼如下:
//進(jìn)行平均成績的聚合
//輸入數(shù)據(jù)類型:學(xué)號string|課程號string|成績integer
/*
最后一個(gè)mapTopair 參數(shù)解釋:
其中第一個(gè)參數(shù)Tuple2<String,Integer>是返回值的類型<學(xué)號,平均成績>
第二個(gè)參數(shù)String是學(xué)號
第三個(gè)參數(shù)Integer是總成績
最后一個(gè)call函數(shù)的解釋:
其中stringIntegerTuple2是上一步規(guī)約后得到的鍵值對
返回值類型也是一個(gè)Tuple2的鍵值對,stringIntegerTuple2._1代表的是學(xué)號,stringIntegerTuple2._2/3代表的是平均成績
*/
JavaPairRDD<String,Integer> AvgScore=File8.mapToPair(new PairFunction<String, String, Integer>(){
@Override
public Tuple2<String,Integer> call(String f){
String[] str=f.split(" "); // 分割
return new Tuple2<>(str[0],Integer.parseInt(str[2])); // 輸出類型:key=學(xué)號|value=成績
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer a, Integer b) throws Exception {
return a+b; // 學(xué)號相同的成績求和
}
}).mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return new Tuple2<>(stringIntegerTuple2._1,stringIntegerTuple2._2/3); // key=學(xué)號|value=平均成績
}
});
# 編譯
$ javac -cp
/usr/local/hbase-1.2.6/lib/*:/usr/local/spark-1.6.3-bin-hadoop2.6/lib/*: Hbase_java.java
# 運(yùn)行
$ java -cp /usr/local/hbase-1.2.6/lib/*:/usr/local/spark-1.6.3-bin-hadoop2.6/lib/*: Hbase_java
3.2.4 API編程
請使用Hbase提供的API編程,完成以下指定功能:
3.2.4.1 功能一
先添加一個(gè)學(xué)生用戶,再使用addRecord(String tableName, String row, String[] fields, String[] values);向表tableName、行row(用S_Name表示)和字符串?dāng)?shù)組files指定的單元格中添加對應(yīng)的數(shù)據(jù)values。其中fields中每個(gè)元素如果對應(yīng)的列族下還有相應(yīng)的列限定符的話,用“columnFamily:column”表示。例如,同時(shí)向“Math(123001)”、“Computer Science(123002)”、“English(123003)”三列添加成績時(shí),字符串?dāng)?shù)組fields為{“score: 123001”,”score;123002”,”score: 123003”},數(shù)組values存儲這三門課的成績。
核心代碼如下:
/*
主要采用了`put.add()` 函數(shù)實(shí)現(xiàn)單條數(shù)據(jù)的插入。
其中第一個(gè)參數(shù)表示列簇名
第二個(gè)參數(shù)表示列名
第三個(gè)參數(shù)表示值
*/
// 添加一條數(shù)據(jù)
public static void addRecord(String tableName, String row,
String columnFamily, String column, String value) throws Exception {
HTable table = new HTable(conf, tableName);
Put put = new Put(Bytes.toBytes(row));// 指定行
// 參數(shù)分別:列族、列、值
put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column),
Bytes.toBytes(value));
table.put(put);
}
3.2.4.2 功能二
scanColumn(String tableName, String column);瀏覽表tableName某一列的數(shù)據(jù),如果某一行記錄中該列數(shù)據(jù)不存在,則返回null。要求當(dāng)參數(shù)column為某一列族名稱時(shí),如果底下有若干個(gè)列限定符,則要列出每個(gè)列限定符代表的列的數(shù)據(jù);當(dāng)參數(shù)column為某一列具體名稱(例如“Score:Math”)時(shí),只需要列出該列的數(shù)據(jù)。
- 主要思路
- 首先用split函數(shù)對數(shù)據(jù)進(jìn)行分割,并把分隔結(jié)果保存在字符串?dāng)?shù)組中。
- 通過判斷數(shù)組的長度來執(zhí)行不同的操作,當(dāng)長度為2時(shí),說明參數(shù)是某一列族名稱時(shí), 只返回該列族的所有值。否則返回該列簇下的所有數(shù)據(jù)
public static void scanColumn(String tableName,String column) throws Exception{
Table table = connection.getTable(TableName.valueOf(tableName));
String[] str= column.split(":"); //將數(shù)據(jù)分割
if(str.length == 2) { // 當(dāng)column為某一列族名稱時(shí), 只返回該列族的所有值
ResultScanner scan = table.getScanner(str[0].getBytes(), str[1].getBytes());
for (Result result : scan) { //遍歷該列族所有數(shù)據(jù)
System.out.println(new String(result.getValue(str[0].getBytes(), str[1].getBytes())));
}
scan.close(); // 關(guān)閉掃描器
}
else{ // 否則返回列簇下的所有數(shù)據(jù)
ResultScanner scan = table.getScanner(str[0].getBytes());
for(Result result :scan){ // 首先遍歷列簇下的不同行
Map<byte[],byte[]> myMap = result.getFamilyMap(Bytes.toBytes(column));
ArrayList<String> cols = new ArrayList<String>();
for(Map.Entry<byte[],byte[]> entry:myMap.entrySet()){
cols.add(Bytes.toString(entry.getKey()));
}
for(String st :cols){ // 然后遍歷該列簇下的不同列族,并打印輸出結(jié)果
System.out.print(st+ " : "+ new String(result.getValue(column.getBytes(),st.getBytes()))+" ~~~ ");
}
System.out.println();
}
scan.close();
}
}
3.2.4.3 功能三
deleteRow(String tableName, String row);刪除表tableName中row指定的行的記錄。
主要思路:
采用Delete
函數(shù)完成單條數(shù)據(jù)的刪除,其中參數(shù)row是學(xué)號值
// 刪除一條(行)數(shù)據(jù)
public static void deleteRow(String tableName, String row) throws Exception {
HTable table = new HTable(conf, tableName);
Delete del = new Delete(Bytes.toBytes(row));
table.delete(del);
}
3.2.5 測試結(jié)果
運(yùn)行如下命令,對代碼進(jìn)行編譯測試
cd ~/lab3
// 編譯運(yùn)行
javac -cp /usr/local/hbase-1.2.6/lib/*: HBaseJavaAPI.java
java -cp /usr/local/hbase-1.2.6/lib/*: HBaseJavaAPI
-
創(chuàng)建表
成功創(chuàng)建student表 -
獲取表中的數(shù)據(jù)
可以根據(jù)學(xué)號獲取單個(gè)學(xué)生的數(shù)據(jù),也可以獲取所有學(xué)生的數(shù)據(jù)。
獲取列簇和列族的值。
-
刪除表中的數(shù)據(jù)
如下圖所示,可以根據(jù)學(xué)號刪除一條記錄。最后也可以刪除整個(gè)數(shù)據(jù)表。
4. 踩坑記錄
- Debug1
【問題描述】 啟動spark-shell的時(shí)候會報(bào)如下錯(cuò)誤not found: value sqlContext
【問題背景】在網(wǎng)上查了很多資料,翻遍了stackoverflow,能嘗試的方法都試過了,但都沒有奏效。因此我又仔細(xì)的查看了報(bào)錯(cuò)的信息,發(fā)現(xiàn)它原來是連接不到cluster2上安裝的數(shù)據(jù)庫。
【解決方案】在啟動spark前,現(xiàn)在cluster2上開啟mysql服務(wù),然后終于成功解決了這個(gè)報(bào)錯(cuò)!
- Debug2
【問題描述】報(bào)錯(cuò)illegal character '\u00a0'
【問題背景】去網(wǎng)上查了下\u00a0
的含義,發(fā)現(xiàn)是空格的意思,因此推測應(yīng)該是有不合法的空格。
【解決方案】刪去等號后的第一個(gè)空格,成功解決。
- Debug3
【問題描述】在編譯運(yùn)行WordCount程序時(shí),報(bào)錯(cuò)Permission denied
。
【問題背景】因?yàn)槲矣玫氖莤ftp傳輸文件,然后將本地?cái)?shù)據(jù)文件上傳到clster1后,默認(rèn)所有者是root,因此導(dǎo)致權(quán)限出錯(cuò)。
【解決方案】運(yùn)行如下命令,修改所有者為hadoop用戶,然后再次編譯成功。
su root
chown -R hadoop:hadoop /home/hadoop/
- Debug4
【問題描述】在編譯HBaseJavaAPI時(shí)報(bào)錯(cuò)如下:
【問題背景】原先在編寫代碼,得到行號采用的方式是rowKV.scanColumn()
會報(bào)錯(cuò)
【解決方案】在網(wǎng)上查了一些資料,采用了另一種方式來獲取行號,修改代碼如下:
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
System.out.print("行名:" + new String(CellUtil.cloneRow(rowKV)) + " ");
然后重新編譯運(yùn)行,成功解決該問題。
- Debug5
【問題描述】在用javac編譯代碼時(shí)報(bào)錯(cuò)如下no source files
【解決方案】在小組同學(xué)的熱心幫助下,我發(fā)現(xiàn)是我在HBaseJavaAPI
前少寫了一個(gè)空格。長記性,下次不會這么粗心了。
javac -cp /usr/local/hbase-1.2.6/lib/*:/usr/local/spark-1.6.3-bin-hadoop2.6/lib/*: HBaseJavaAPI.java
java -cp /usr/local/hbase-1.2.6/lib/*:/usr/local/spark-1.6.3-bin-hadoop2.6/lib/*: HBaseJavaAPI
- Debug6
【問題描述】由于一開始一開始在本地調(diào)試maven項(xiàng)目時(shí),導(dǎo)入jar包的時(shí)間非常慢。
【問題背景】在網(wǎng)上查資料應(yīng)該是網(wǎng)速的問題,而且我檢查過我已經(jīng)配置過了阿里云鏡像,按理說不會這么慢。
【解決方案】后來,我檢查一下發(fā)現(xiàn)原來是該項(xiàng)目的marven沒有配置,導(dǎo)入到本地配置的setting.xml文件后,1分鐘就下載完了所需的所有依賴。
- Debug7
【問題描述】在本地編譯成功的代碼放到linux上運(yùn)行就會報(bào)錯(cuò)。
【問題背景】在同組同學(xué)的提示下,我發(fā)現(xiàn)是因?yàn)镴ava版本不一致造成的。本地maven默認(rèn)配置是java1.8 但是我們的實(shí)驗(yàn)環(huán)境是java1.7。
【解決方案】修改apache-maven-3.8.4\conf
目錄下的setting.xml文件中的JDK的版本修改為1.7, 修改完成后,由于版本不一致,還需要對代碼進(jìn)行進(jìn)一步的修改。
<profiles>
<profile>
<id>JDK-1.7</id>
<activation>
<activeByDefault>true</activeByDefault>
<jdk>1.7</jdk>
</activation>
<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<maven.compiler.compilerVersion>1.7</maven.compiler.compilerVersion>
</properties>
</profile>
- Debug8
【問題描述】由于java的版本不同因此在用spark api編程時(shí),寫法法不一樣。
【解決方案】在使用mapToPair
函數(shù)時(shí),java7和java8 有不同的寫法。以下舉一個(gè)簡單的例子:把數(shù)字類型轉(zhuǎn)換成<奇數(shù)/偶數(shù),數(shù)字> 的類型。
//java8
JavaPairRDD <String,Integer> pairRDD=intRDD.mapToPair(i->(i%2==0)? new Tuple2<String,Integer>("even",i):new Tuple2<String,Integer>("odd",i));
如果用java8的話,一行代碼使用匿名函數(shù)就可以搞定,但是在java7中要復(fù)雜的多。需要重寫PairFunction
函數(shù), 代碼的邏輯放在call
函數(shù)中實(shí)現(xiàn)。
// Java7
JavaPairRDD <String,Integer> pairRDD=intRDD.mapToPair(new PairFunction<Integer,String, Integer>(){
@Override
public Tuple2<String,Integer> call (Integer i) throws Exception{
return (i%2 ==0)? new Tuple2<String,Integer>("even",i):new Tuple2<String,Integer>("odd",i);
}
});
除此之外,還有reduceBykey
函數(shù)也有變化,不能使用匿名函數(shù),而是要重寫Function2中的call方法實(shí)現(xiàn)
-
Debug9
【問題描述】當(dāng)采用集群模式執(zhí)行時(shí),rdd.foreach(println)語句無法打印元素
【問題背景】當(dāng)采用 Local 模式在單機(jī)上執(zhí)行時(shí),rdd.foreach(println)語句會打印出一個(gè)RDD中的所有元素。但是,當(dāng)采用集群模式執(zhí)行時(shí),在Worker節(jié)點(diǎn)上執(zhí)行打印語句是輸出到Worker節(jié)點(diǎn)的stdout中,而不是輸出到任務(wù)控制節(jié)點(diǎn)Driver中,因此,任務(wù)控制節(jié)點(diǎn)Driver中的stdout是不會顯示打印語句的這些輸出內(nèi)容的
【解決方案】為了能夠把所有 Worker 節(jié)點(diǎn)上的打印輸出信息也顯示到Driver中,就需要使用collect()方法。例如,rdd.collect().foreach(println)。
5. 心得體會
這次實(shí)驗(yàn)比上次實(shí)驗(yàn)難度要大一點(diǎn),前前后后花了很多的時(shí)間去做。這次實(shí)驗(yàn)感覺收獲最大的就是自己在編程的時(shí)候,一開始在csdn上找資料,但是講的都不是很全面。最后逼著自己讀了官方的英文文檔,以及閱讀了英文版《Apache Spark 2.x for java Developers》的部分章節(jié)。雖然一開始讀起來挺費(fèi)勁的,但是讀完之后確實(shí)對于整個(gè)原理和代碼的理解會清晰很多,覺得也不是特別的難。除此之外就是,在一開始走了很多彎路,比如說嘗試用sbt打包,但是后來發(fā)現(xiàn)虛擬機(jī)內(nèi)存太小無法安裝。后來受到小組同學(xué)的啟發(fā),覺得可以先在本地sbt打包再傳上去,接下來還會再嘗試下這種方法。最后還是十分感謝小組同學(xué)在本次實(shí)驗(yàn)過程中對我的幫助!
6. 附錄
6.1 實(shí)驗(yàn)數(shù)據(jù)
實(shí)驗(yàn)可以使用“附件2-實(shí)驗(yàn)數(shù)據(jù)”文件夾中的附錄實(shí)驗(yàn)數(shù)據(jù),也可使用自己生成的數(shù)據(jù)。具體數(shù)據(jù)如下:
附錄實(shí)驗(yàn)數(shù)據(jù)五:SPARK編程一(3.1.1)使用的CSV數(shù)據(jù)文件;
附錄實(shí)驗(yàn)數(shù)據(jù)六:SPARK編程二(3.1.2)使用的數(shù)據(jù)文本;
附錄實(shí)驗(yàn)數(shù)據(jù)七:HBase編程(3.2.2)使用的學(xué)生表數(shù)據(jù);文章來源:http://www.zghlxwxcb.cn/news/detail-451016.html
附錄實(shí)驗(yàn)數(shù)據(jù)八:HBase編程(3.2.2)及SPARK編程三(3.2.3)使用的選課表數(shù)據(jù);文章來源地址http://www.zghlxwxcb.cn/news/detail-451016.html
6.2 實(shí)驗(yàn)源碼
6.2.1 WordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.collection.Map
object WordCount {
def main(args: Array[String]) {
val inputFile = "/home/hadoop/data/test6.txt" // 原始數(shù)據(jù)文件
val conf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
conf.set("spark.testing.memory", "500000000") // 設(shè)置配置信息
val sc = new SparkContext(conf)
val textFile = sc.textFile(inputFile) // 讀取數(shù)據(jù)
val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) // 文本分割并計(jì)數(shù),格式為(word,number)
wordCount.foreach(println) // 打印wordCount
wordCount.saveAsTextFile("/home/hadoop/lab3/output") // 將結(jié)果保存到output文件夾下
}
}
6.2.2 HBaseJavaAPI
/*
* 創(chuàng)建一個(gè)students表,并進(jìn)行相關(guān)操作
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
import org.apache.commons.math3.util.Pair;
public class HBaseJavaAPI {
// 聲明靜態(tài)配置
private static Configuration conf = null;
static {
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "192.168.56.121");
conf.set("hbase.zookeeper.property.clientPort", "2181");
}
//判斷表是否存在
private static boolean isExist(String tableName) throws IOException {
HBaseAdmin hAdmin = new HBaseAdmin(conf);
return hAdmin.tableExists(tableName);
}
// 創(chuàng)建數(shù)據(jù)庫表
public static void createTable(String tableName, String[] columnFamilys)
throws Exception {
// 新建一個(gè)數(shù)據(jù)庫管理員
HBaseAdmin hAdmin = new HBaseAdmin(conf);
if (hAdmin.tableExists(tableName)) {
System.out.println("表 "+tableName+" 已存在!");
//System.exit(0);
} else {
// 新建一個(gè)students表的描述
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
// 在描述里添加列族
for (String columnFamily : columnFamilys) {
tableDesc.addFamily(new HColumnDescriptor(columnFamily));
}
// 根據(jù)配置好的描述建表
hAdmin.createTable(tableDesc);
System.out.println("創(chuàng)建表 "+tableName+" 成功!");
}
}
// 刪除數(shù)據(jù)庫表
public static void deleteTable(String tableName) throws Exception {
// 新建一個(gè)數(shù)據(jù)庫管理員
HBaseAdmin hAdmin = new HBaseAdmin(conf);
if (hAdmin.tableExists(tableName)) {
// 關(guān)閉一個(gè)表
hAdmin.disableTable(tableName);
hAdmin.deleteTable(tableName);
System.out.println("刪除表 "+tableName+" 成功!");
} else {
System.out.println("刪除的表 "+tableName+" 不存在!");
System.exit(0);
}
}
// 添加一條數(shù)據(jù)
public static void addRecord(String tableName, String row,
String columnFamily, String column, String value) throws Exception {
HTable table = new HTable(conf, tableName);
Put put = new Put(Bytes.toBytes(row));// 指定行
// 參數(shù)分別:列族、列、值
put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column),
Bytes.toBytes(value));
table.put(put);
}
// 從文件中逐行讀取數(shù)據(jù)
public static List<String> ReadFile(String filename) {
List<String> lines = null;
try {
lines = Files.readAllLines(Paths.get(filename), StandardCharsets.UTF_8);
} catch (IOException e) {
e.printStackTrace();
}
return lines;
}
// 一次性添加所有的學(xué)生信息
public static void addALL(String tableName,String[] columnFamilys) throws Exception {
// 讀取data7.txt
// 文件中的數(shù)據(jù)格式:學(xué)號|姓名|性別|年齡
List<String> list7 = ReadFile("/home/hadoop/data/data7.txt");
//寫入student information
for (String line : list7) {
String[] temp = line.split("\\s+");
String Id = temp[0];
String Name = temp[1];
String Sex = temp[2];
String Age = temp[3];
HBaseJavaAPI.addRecord(tableName, Id, columnFamilys[0],"name", Name);
HBaseJavaAPI.addRecord(tableName, Id, columnFamilys[0],"age",Age);
HBaseJavaAPI.addRecord(tableName, Id, columnFamilys[0],"sex",Sex);
//提示
System.out.println("學(xué)生學(xué)號:" + Id + ",姓名:" + Name + ",性別:" + Sex + ",年齡:" + Age);
}
//讀取data8.txt
//文件中的數(shù)據(jù)格式:學(xué)號|課程號|成績
List<String> list8 = ReadFile("/home/hadoop/data/data8.txt");
//寫入student score
for (String line : list8) {
String[] temp = line.split("\\s+");
String Id = temp[0];
String Cno = temp[1];
String Score = temp[2];
HBaseJavaAPI.addRecord(tableName,Id, columnFamilys[1], Cno,Score);
//提示
System.out.println("學(xué)生學(xué)號:" + Id +",課程課號:" + Cno + ",成績:" + Score);
}
}
// 刪除一條(行)數(shù)據(jù)
public static void deleteRow(String tableName, String row) throws Exception {
HTable table = new HTable(conf, tableName);
Delete del = new Delete(Bytes.toBytes(row));
table.delete(del);
}
// 刪除多條數(shù)據(jù)
public static void delMultiRows(String tableName, String[] rows)
throws Exception {
HTable table = new HTable(conf, tableName);
List<Delete> delList = new ArrayList<Delete>();
for (String row : rows) {
Delete del = new Delete(Bytes.toBytes(row));
delList.add(del);
}
table.delete(delList);
}
// 獲取一行的數(shù)據(jù)
public static void scanColumn(String tableName, String row) throws Exception {
HTable table = new HTable(conf, tableName);
Get get = new Get(Bytes.toBytes(row));
Result result = table.get(get);
// 輸出結(jié)果,raw方法返回所有keyvalue數(shù)組
for (KeyValue rowKV : result.raw()) {
System.out.print("行名:" + new String(CellUtil.cloneRow(rowKV)) + " ");
System.out.print("時(shí)間戳:" + rowKV.getTimestamp() + " ");
System.out.print("列族名:" + new String(rowKV.getFamily()) + " ");
System.out.print("列名:" + new String(rowKV.getQualifier()) + " ");
System.out.println("值:" + new String(rowKV.getValue()));
}
}
// 獲取一列的數(shù)據(jù)
public static void scanColumn(String tableName,String column) throws Exception{
Table table = connection.getTable(TableName.valueOf(tableName));
String[] str= column.split(":"); //將數(shù)據(jù)分割
if(str.length == 2) { // 當(dāng)column為某一列族名稱時(shí), 只返回該列族的所有值
ResultScanner scan = table.getScanner(str[0].getBytes(), str[1].getBytes());
for (Result result : scan) {
System.out.println(new String(result.getValue(str[0].getBytes(), str[1].getBytes())));
}
scan.close();
}
else{ // 否則返回列族下的所有數(shù)據(jù)
ResultScanner scan = table.getScanner(str[0].getBytes());
for(Result result :scan){
Map<byte[],byte[]> myMap = result.getFamilyMap(Bytes.toBytes(column));
ArrayList<String> cols = new ArrayList<String>();
for(Map.Entry<byte[],byte[]> entry:myMap.entrySet()){
cols.add(Bytes.toString(entry.getKey()));
}
for(String st :cols){
System.out.print(st+ " : "+ new String(result.getValue(column.getBytes(),st.getBytes()))+" ~~~ ");
}
System.out.println();
}
scan.close();
}
}
// 獲取所有數(shù)據(jù)
public static void getAllRows(String tableName) throws Exception {
HTable table = new HTable(conf, tableName);
Scan scan = new Scan();
ResultScanner results = table.getScanner(scan);
// 輸出結(jié)果
for (Result result : results) {
for (KeyValue rowKV : result.raw()) {
System.out.print("行名:" + new String(CellUtil.cloneRow(rowKV)) + " ");
System.out.print("時(shí)間戳:" + rowKV.getTimestamp() + " ");
System.out.print("列族名:" + new String(rowKV.getFamily()) + " ");
System.out
.print("列名:" + new String(rowKV.getQualifier()) + " ");
System.out.println("值:" + new String(rowKV.getValue()));
}
}
}
// 主函數(shù)
public static void main(String[] args) {
try {
String tableName = "student";
// 第一步:創(chuàng)建數(shù)據(jù)庫表:“student”
String[] columnFamilys = { "information", "score","stat_score" };
HBaseJavaAPI.createTable(tableName, columnFamilys);
// 第二步:向數(shù)據(jù)表的添加數(shù)據(jù)
// 添加第一行數(shù)據(jù)
if (isExist(tableName)) {
//添加所有的數(shù)據(jù)
addALL(tableName,columnFamilys);
//獲取一條數(shù)據(jù)
System.out.println("-------------獲取學(xué)號為2015002學(xué)生的數(shù)據(jù)--------------");
HBaseJavaAPI.scanColumn(tableName, "2015002");
//獲取所有數(shù)據(jù)
System.out.println("----------------獲取所有數(shù)據(jù)------------------------");
HBaseJavaAPI.getAllRows(tableName);
// 載入配置文件
SparkConf conf=new SparkConf().setAppName("sparkclient").setMaster("local[2]");
conf.set("spark.testing.memory","2147480000");
JavaSparkContext sc=new JavaSparkContext(conf);
//讀入數(shù)據(jù)
JavaRDD<String> File8=sc.textFile("/home/hadoop/data/data8.txt");
File8.persist(StorageLevel.MEMORY_AND_DISK());
//進(jìn)行成績總和的聚合
//輸入數(shù)據(jù)類型:學(xué)號string|課程號string|成績integer
JavaPairRDD<String,Integer> SumScore=File8.mapToPair(new PairFunction<String, String, Integer>(){
@Override
public Tuple2<String,Integer> call(String f){
String[] str=f.split(" ");// 分割
return new Tuple2<>(str[0],Integer.parseInt(str[2])); // 輸出類型:key=學(xué)號|value=成績
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() { // key值相同的進(jìn)行規(guī)約
@Override
public Integer call(Integer a, Integer b) throws Exception { // 學(xué)號相同的成績求和
return a+b;
}
});
//進(jìn)行平均成績的聚合
//輸入數(shù)據(jù)類型:學(xué)號string|課程號string|成績integer
JavaPairRDD<String,Integer> AvgScore=File8.mapToPair(new PairFunction<String, String, Integer>(){
@Override
public Tuple2<String,Integer> call(String f){
String[] str=f.split(" "); // 分割
return new Tuple2<>(str[0],Integer.parseInt(str[2])); // 輸出類型:key=學(xué)號|value=成績
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer a, Integer b) throws Exception {
return a+b; // 學(xué)號相同的成績求和
}
}).mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return new Tuple2<>(stringIntegerTuple2._1,stringIntegerTuple2._2/3); // key=學(xué)號|value=平均成績
}
});
//迭代輸入
for (Tuple2<String, Integer> stringIntegerTuple2 : SumScore.collect()) {
HBaseJavaAPI.addRecord(tableName,stringIntegerTuple2._1, columnFamilys[2],"sum", stringIntegerTuple2._2.toString());
System.out.println("添加id: "+stringIntegerTuple2._1+",sum "+stringIntegerTuple2._2.toString());
}
for (Tuple2<String, Integer> stringIntegerTuple2 : AvgScore.collect()) {
HBaseJavaAPI.addRecord(tableName,stringIntegerTuple2._1, columnFamilys[2],"avg", stringIntegerTuple2._2.toString());
System.out.println("添加id: "+stringIntegerTuple2._1+",sum "+stringIntegerTuple2._2.toString());
}
System.out.println("----------------獲取所有數(shù)據(jù)------------------------");
HBaseJavaAPI.getAllRows(tableName);
System.out.println("----------------獲取列簇------------------------");
scanColumn("student","information");//只包含列簇名的測試
System.out.println("----------------獲取列族------------------------");
scanColumn("student","information:name");//進(jìn)行查看所有學(xué)生名字的測試
//刪除一條數(shù)據(jù)
//System.out.println("-------------刪除學(xué)號為2015003學(xué)生的數(shù)據(jù)--------------");
//HBaseJavaAPI.deleteRow(tableName, "2015003");
//HBaseJavaAPI.getAllRows(tableName);
//刪除多條數(shù)據(jù)
//System.out.println("--------------刪除多條數(shù)據(jù)------------------");
//String rows[] = new String[] { "qingqing","xiaoxue" };
//HBaseJavaAPI.delMultiRows(tableName, rows);
//HBaseJavaAPI.getAllRows(tableName);
//刪除數(shù)據(jù)庫
//System.out.println("-----------------刪除數(shù)據(jù)庫表-----------------");
//HBaseJavaAPI.deleteTable(tableName);
//System.out.println("表"+tableName+"存在嗎?"+isExist(tableName));
} else {
System.out.println(tableName + "此數(shù)據(jù)庫表不存在!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
到了這里,關(guān)于Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!