国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

這篇具有很好參考價(jià)值的文章主要介紹了Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

實(shí)驗(yàn)三: Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

1. 實(shí)驗(yàn)?zāi)康?/h3>
  • 了解Spark中的基本概念和主要思想,熟悉Spark與MapReduce的區(qū)別;
  • 掌握基本的Spark編程,實(shí)現(xiàn)基礎(chǔ)RDD編程;
  • 了解HBase的基本特性及其適用場景;
  • 熟悉HBase Shell常用命令;
  • 學(xué)習(xí)使用HBase的Java API,編程實(shí)現(xiàn)HBase常用功能;

2. 實(shí)驗(yàn)環(huán)境

實(shí)驗(yàn)平臺:基于實(shí)驗(yàn)一搭建的虛擬機(jī)Hadoop大數(shù)據(jù)實(shí)驗(yàn)平臺上的Spark集群,HBase;

編程語言:JAVA(推薦使用)、Python等;

3. 實(shí)驗(yàn)內(nèi)容

3.1 Spark

3.1.0 Spark簡介

首先啟動HDFS、YARN、Spark

  // 啟動HDFS(cluster1上) 
  $ start-dfs.sh 
  // 啟動YARN(cluster1上) 
  $ start-yarn.sh 
  // 運(yùn)行Spark(cluster1上) 運(yùn)行spark前需啟動hadoop的HDFS和YARN 
  $ start-master.sh 
  $ start-slaves.sh  

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

3.1.1 功能實(shí)現(xiàn)
3.1.1.1 創(chuàng)建RDD

創(chuàng)建RDD,并熟悉RDD中的轉(zhuǎn)換操作,行動操作,并給出相應(yīng)實(shí)例;

// 在cluster2 上啟動mysql
# /etc/init.d/mysql.server start -user=mysql 
// 進(jìn)入Sprk shell 進(jìn)行交互式編程
$ spark-shell --master local[4]

--master選項(xiàng)指定分布式集群的主URL, local[4]表示在本地運(yùn)行4個(gè)線程。(由于我們的數(shù)據(jù)比較小,因此先在本地進(jìn)行測試)

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

  • 轉(zhuǎn)化操作

對于RDD而言,每一次轉(zhuǎn)換操作都會產(chǎn)生不同的RDD,供給下一個(gè)操作使用。RDD的轉(zhuǎn)換過程是惰性求值的,也就是說,整個(gè)轉(zhuǎn)換過程只是記錄了轉(zhuǎn)換的軌跡,并不會發(fā)生真正的計(jì)算,只有遇到行動操作時(shí),才會觸發(fā)“從頭到尾”的真正的計(jì)算。

最常用的轉(zhuǎn)化操作是map() filter(),轉(zhuǎn)化操作map() 接收一個(gè)函數(shù),把這個(gè)函數(shù)用于RDD 中的每個(gè)元素,將函數(shù)的返回結(jié)果作為結(jié)果RDD 中對應(yīng)元素的值。而轉(zhuǎn)化操作filter() 則接收一個(gè)函數(shù),并將RDD 中滿足該函數(shù)的元素放入新的RDD 中返回。

常用的轉(zhuǎn)換操作如下表:

操作 含義
filter(func) 篩選出滿足函數(shù)func的元素,并返回一個(gè)新的數(shù)據(jù)集
map(func) 將每個(gè)元素傳遞到函數(shù)func中,并將結(jié)果返回為一個(gè)新的數(shù)據(jù)集
flatMap(func) 與map()相似,但每個(gè)輸入元素都可以映射到0或多個(gè)輸出結(jié)果
groupByKey() 應(yīng)用于(K,V)鍵值對的數(shù)據(jù)集時(shí),返回一個(gè)新的(K, Iterable)形式的數(shù)據(jù)集
reduceByKey(func) 應(yīng)用于(K,V)鍵值對的數(shù)據(jù)集時(shí),返回一個(gè)新的(K,V)形式的數(shù)據(jù)集,其中每個(gè)值是將每個(gè)key傳遞到func中進(jìn)行聚合后的結(jié)果

下面將采用兩個(gè)例子實(shí)現(xiàn)轉(zhuǎn)換操作,具體操作如下圖所示:

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

// 用map() 對RDD 中的所有數(shù)求平方
scala> val input = sc.parallelize(List(1, 2, 3, 4))
scala> val result = input.map(x => x * x)
scala> println(result.collect().mkString(","))
  • 上述代碼解釋
    • 可以調(diào)用SparkContext的parallelize方法,從一個(gè)已經(jīng)存在的集合(數(shù)組)上創(chuàng)建RDD。其中List數(shù)組中的每個(gè)元素都是一個(gè)RDD。
    • map(func)操作將每個(gè)元素傳遞到函數(shù)func中,并將結(jié)果返回為一個(gè)新的數(shù)據(jù)集。
    • input.map(x=>x*x)的含義是,依次取出input這個(gè)RDD中的每個(gè)元素,對于當(dāng)前取到的元素,把它賦值給λ表達(dá)式中的變量x,然后,執(zhí)行λ表達(dá)式的函數(shù)體部分“x*x”,也就是把變量x的值求平方后,作為函數(shù)的返回值,并作為一個(gè)元素放入到新的 RDD(即 result)中。最終,新生成的RDD(即result),包含了4個(gè)Int類型的元素,即1、4、9、16。

最終打印元素1,4,9,16, 說明平方求解成功。

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

// 用filter過濾其中為1的值
scala> val result2 = input.filter(x => x!=1)
scala> println(result2.collect().mkString(","))
  • 上述代碼解釋
    • filter(func)操作會篩選出滿足函數(shù)func的元素,并返回一個(gè)新的數(shù)據(jù)集
    • lines.filter()操作,filter()的輸入?yún)?shù)x=>x!=1是一個(gè)匿名函數(shù),或者被稱為“λ表達(dá)式”。該操作的含義是,依次取出input這個(gè)RDD中的每個(gè)元素,對于當(dāng)前取到的元素,把它賦值給λ表達(dá)式中的x變量,如果x不為0,就加入到新的RDD中。

最終打印元素2,3,4 ,成功過濾掉1
Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

  • 行動操作

    行動操作是真正觸發(fā)計(jì)算的地方。Spark程序只有執(zhí)行到行動操作時(shí),才會執(zhí)行真正的計(jì)算,從文件中加載數(shù)據(jù),完成一次又一次轉(zhuǎn)換操作,最終,完成行動操作得到結(jié)果。

    常用RDD行動操作API

    操作 含義
    count() 返回?cái)?shù)據(jù)集中的元素個(gè)數(shù)
    collect() 以數(shù)組的形式返回?cái)?shù)據(jù)集中的所有元素
    first() 返回?cái)?shù)據(jù)集中的第一個(gè)元素
    take(n) 以數(shù)組的形式返回?cái)?shù)據(jù)集中的前n個(gè)元素
    reduce(func) 通過函數(shù)func(輸入兩個(gè)參數(shù)并返回一個(gè)值)聚合數(shù)據(jù)集中的元素
    foreach(func) 將數(shù)據(jù)集中的每個(gè)元素傳遞到函數(shù)func中運(yùn)行
    • reduce()

      reduce()接收一個(gè)函數(shù)作為參數(shù),這個(gè)函數(shù)要操作兩個(gè)RDD 的元素類型的數(shù)據(jù)并返回一個(gè)同樣類型的新元素。一個(gè)簡單的例子就是函數(shù)+,可以用它來對我們的RDD 進(jìn)行累加。使用reduce(),可以很方便地計(jì)算出RDD中所有元素的總和、元素的個(gè)數(shù),以及其他類型的聚合操作。

      scala> var rdd = sc.parallelize(List(1,2,3,4,5,6,7))
      scala> var sum = rdd.reduce((x, y) => x + y)
      

      在執(zhí)行rdd.reduce((x,y)=>x+y)時(shí),系統(tǒng)會把rdd的第1個(gè)元素1傳入給參數(shù)x,把rdd的第2個(gè)元素2傳入給參數(shù)y,執(zhí)行x+y計(jì)算得到求和結(jié)果3;然后,把這個(gè)求和的結(jié)果3傳入給參數(shù)x,把rdd的第3個(gè)元素3傳入給參數(shù)y,執(zhí)行a+b計(jì)算得到求和結(jié)果6,以此類推,最終結(jié)果應(yīng)該是數(shù)組中所有數(shù)字的和。

      輸出結(jié)果為28,成功的計(jì)算出了rdd中所有元素的求和。Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

    • aggregate() 函數(shù)則把我們從返回值類型必須與所操作的RDD 類型相同的限制中解放出來。與fold() 類似,使用aggregate() 時(shí),需要提供我們期待返回的類型的初始值。然后通過一個(gè)函數(shù)把RDD 中的元素合并起來放入累加器。考慮到每個(gè)節(jié)點(diǎn)是在本地進(jìn)行累加的,最終,還需要提供第二個(gè)函數(shù)來將累加器兩兩合并。

      scala> var rdd = sc.parallelize(List(1,2,3,4,5,6,7)) 
      scala> var result = rdd.aggregate((0,0))((acc,value) => (acc._1 + value,acc._2 + 1),(acc1,acc2) => (acc1._1 + acc2._1 , acc1._2 + acc2._2))
      scala> var avg = result._1/result._2.toDouble
      

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

第二步驟輸出的是一個(gè)二元組,其中第一個(gè)元素是列表中所有數(shù)的累加和,第二個(gè)元素是列表中元素的個(gè)數(shù)。

第三步驟輸出的是列表的平均值。
3.1.1.2 持久化操作

了解如何將RDD中的計(jì)算過程進(jìn)行持久化操作,并給出具體代碼;

  • 持久化操作

    Spark RDD 是惰性求值的,而有時(shí)我們希望能多次使用同一個(gè)RDD。如果簡單地對RDD 調(diào)用行動操作,Spark 每次都會重算RDD 以及它的所有依賴。這在迭代算法中消耗格外大,因?yàn)榈惴ǔ3啻问褂猛唤M數(shù)據(jù)。為了避免多次計(jì)算同一個(gè)RDD,可以讓Spark 對數(shù)據(jù)進(jìn)行持久化。當(dāng)我們讓Spark 持久化存儲一個(gè)RDD 時(shí),計(jì)算出RDD 的節(jié)點(diǎn)會分別保存它們所求出的分區(qū)數(shù)據(jù)。如果一個(gè)有持久化數(shù)據(jù)的節(jié)點(diǎn)發(fā)生故障,Spark 會在需要用到緩存的數(shù)據(jù)時(shí)重算丟失的數(shù)據(jù)分區(qū)。如果希望節(jié)點(diǎn)故障的情況不會拖累我們的執(zhí)行速度,也可以把數(shù)據(jù)備份到多個(gè)節(jié)點(diǎn)上。出于不同的目的,我們可以為RDD 選擇不同的持久化級別,具體的級別如下表所示。
    Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

測試一: 練習(xí)使用persist函數(shù)

import org.apache.spark.storage.StorageLevel
var input = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7))
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY) // 使用persist函數(shù)將結(jié)果存放到磁盤中。
println(result.count()) 
println(result.collect().mkString(",")) 
  • 使用persist()方法對一個(gè)RDD標(biāo)記為持久化,之所以說“標(biāo)記為持久化”,是因?yàn)槌霈F(xiàn)persist()語句的地方,并不會馬上計(jì)算生成RDD并把它持久化,而是要等到遇到第一個(gè)行動操作觸發(fā)真正計(jì)算以后,才會把計(jì)算結(jié)果進(jìn)行持久化,持久化后的RDD將會被保留在計(jì)算節(jié)點(diǎn)的內(nèi)存中,被后面的行動操作重復(fù)使用。
  • persist()的圓括號中包含的是持久化級別參數(shù),可以有如下不同的級別:
    • persist(MEMORY_ONLY):表示將RDD作為反序列化的對象存儲于JVM中,如果內(nèi)存不足,就要按照LRU原則替換緩存中的內(nèi)容
    • persist(MEMORY_AND_DISK):表示將RDD作為反序列化的對象存儲在JVM中,如果內(nèi)存不足,超出的分區(qū)將會被存放在磁盤上。

結(jié)果如下圖所示:
Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

  • 測試2 : 對比持久化操作效果
# 創(chuàng)建test文本
touch test.txt
# 寫入相應(yīng)內(nèi)容
echo "i am a test file,wonderful hello hello wonderful hello hello" > test.txt
# 上傳到hdfs
./hdfs dfs -put test.txt  user/hadoop/lab3
./hdfs dfs -put /home/hadoop/data/data8.txt  /lab3
# 查看上傳后的文件內(nèi)容
./hdfs dfs -ls user/hadoop/lab3

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

// 非持久化操作
val testrdd =sc.textFile("lab3/test.txt")
testrdd.count();
val t1 =System.currentTimeMillis();
println("noCache()=testrdd.count()=" + testrdd.count());
val t2 = System.currentTimeMillis();
val t2_t1 =t2 - t1;
println("nocache()=" + t2_t1);

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

// 持久化操作
val testrdd =sc.textFile("lab3/test.txt"); .persist(StorageLevel.MEMORY_ONLY());
testrdd.count();
val t1 =System.currentTimeMillis();
println("noCache()=testrdd.count()=" + testrdd.count());
val t2 = System.currentTimeMillis();
val t2_t1 = t2 - t1;
println("cache()=" + t2_t1);

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)
可以看到?jīng)]有使用持久化操作的運(yùn)行時(shí)間是573ms, 而使用了持久化操作的運(yùn)行時(shí)間為292。因此持久化操作過程給性能帶來了比較大的提升。

3.1.1.3 數(shù)據(jù)讀取與保存

了解Spark的數(shù)據(jù)讀取與保存操作,嘗試完成csv文件的讀取和保存;

# 首先上傳實(shí)驗(yàn)數(shù)據(jù)五到hdfs文件系統(tǒng)中(改為英文名'test5.csv')
./hdfs dfs -put /home/hadoop/data/test5.csv /user/hadoop/lab3
# 查看上傳后的文件內(nèi)容
./hdfs dfs -ls /user/hadoop/lab3

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

import java.io.StringReader;
import au.com.bytecode.opencsv.CSVReader;
val input=sc.textFile("/user/hadoop/lab3/test5.csv");
input.foreach(println);
val result =input.map{line =>val reader =new CSVReader(new StringReader(line));reader.readNext();} 
  • 上述源碼的解釋
    • Spark采用textFile()方法來從文件系統(tǒng)中加載數(shù)據(jù)創(chuàng)建RDD,該方法把文件的URI作為參數(shù),這個(gè)URI可以是本地文件系統(tǒng)的地址、分布式文件系統(tǒng)HDFS的地址。
    • 首先使用textFile()方法從文件中加載數(shù)據(jù),然后,使用map()函數(shù)轉(zhuǎn)換得到相應(yīng)的鍵值對RDD。
    • 執(zhí)行sc.textFile()方法以后,Spark 從本地文件test5.txt 中加載數(shù)據(jù)到內(nèi)存,在內(nèi)存中生成一個(gè)RDD對象lines,lines是org.apache.spark.rdd.RDD這個(gè)類的一個(gè)實(shí)例,這個(gè)RDD里面包含了若干個(gè)元素,每個(gè)元素的類型是 String 類型,也就是說,從 test5.txt 文件中讀取出來的每一行文本內(nèi)容,都成為RDD中的一個(gè)元素

如下圖所示,成功的讀取test5.csv的數(shù)據(jù)

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

并將結(jié)果保存到了result中。

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

3.1.2 WordCount實(shí)驗(yàn)

編程熟悉Spark中的鍵值對操作,利用Spark的API完成wordcount實(shí)驗(yàn),即統(tǒng)計(jì)一段文本中每個(gè)單詞的出現(xiàn)總數(shù),如 a b c d a c, 結(jié)果為(a , 2),(b , 1),(c , 2),(d , 1)。

  • 代碼詳細(xì)說明
    • 首先使用textFile()方法從文件中加載數(shù)據(jù)
    • flatmap的輸入?yún)?shù)line => line.split(" “)是一個(gè)λ表達(dá)式。lines.map(line => line.split(” “))的含義是,依次取出lines這個(gè)RDD中的每個(gè)元素,對于當(dāng)前取到的元素,把它賦值給λ表達(dá)式中的line變量,然后,執(zhí)行λ表達(dá)式的函數(shù)體部分line.split(” “)。line.split(” ")的功能是,以空格作為分隔符把line拆分成一個(gè)個(gè)單詞,拆分后得到的單詞都封裝在一個(gè)數(shù)組對象中,成為新的RDD(即words)的一個(gè)元素。
    • 從input轉(zhuǎn)換得到一個(gè)新的RDD(即wordArray),wordArray中的每個(gè)元素都是一個(gè)數(shù)組對象。flatmap也就是把wordArray中的每個(gè)RDD元素都“拍扁”成多個(gè)元素,最終,所有這些被拍扁以后得到的元素,構(gòu)成一個(gè)新的RDD,即words。
    • map(word => (word,1))函數(shù)的作用是,取出RDD中的每個(gè)元素,也就是每個(gè)單詞,賦值給word,然后把word轉(zhuǎn)換成(word,1)的鍵值對形式。
    • reduceByKey(func)應(yīng)用于(K,V)鍵值對的數(shù)據(jù)集時(shí),返回一個(gè)新的(K, V)形式的數(shù)據(jù)集,其中的每個(gè)值是將每個(gè)key傳遞到函數(shù)func中進(jìn)行聚合后得到的結(jié)果。
    • 名稱為word的RDD中包含的每個(gè)元素都是<String,Int>類型,也就是(K,V)鍵值對類型。words.reduceByKey((a,b)=>a+b)操作執(zhí)行以后,所有key相同的鍵值對,它們的value首先被歸并成一個(gè)新的鍵值對,例如(“xxx”,(1,1,1))。然后,使用func函數(shù)把(1,1,1)聚合到一起,這里的func函數(shù)是一個(gè)λ表達(dá)式,即(a,b)=>a+b,它的功能是把(1,1,1)這個(gè)value-list中的每個(gè)元素進(jìn)行匯總求和,首先,把value-list中的第1個(gè)元素(即1)賦值給參數(shù)a,把value-list中的第2個(gè)元素(也是1)賦值給參數(shù)b,執(zhí)行a+b得到2,然后,繼續(xù)對value-list中的元素執(zhí)行下一次計(jì)算,把剛才求和得到的2賦值給a,把value-list中的第3個(gè)元素(即1)賦值給b,再次執(zhí)行a+b得到3。最終,就得到聚合后的結(jié)果(“xxx”,3)。
    • 最后,執(zhí)行了語句wordcount.foreach(elem=>println(elem)),該語句會依次遍歷 rdd 中的每個(gè)元素,把當(dāng)前遍歷到的元素賦值給變量 elem,并使用 println(elem)打印出 elem的值。實(shí)際上,rdd.foreach(elem=>println(elem))可以被簡化成rdd.foreach(println),效果是一樣的。
object WordCount { //定義wordcount類,用來統(tǒng)計(jì)一段文本中每個(gè)單詞的出現(xiàn)總數(shù)
    def main(args: Array[String]) {
        val inputFile =  "/home/hadoop/data/test6.txt"  // 原始數(shù)據(jù)文件
        val conf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
	conf.set("spark.testing.memory", "500000000") // 設(shè)置配置信息
        val sc = new SparkContext(conf)
                val textFile = sc.textFile(inputFile) // 讀取數(shù)據(jù)
                val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)  // 文本分割并計(jì)數(shù),格式為(word,number)
                wordCount.foreach(println) // 打印wordCount
		wordCount.saveAsTextFile("/home/hadoop/lab3/output")  // 將結(jié)果保存到output文件夾下     
    }
}

然后將WordCount.scala代碼上傳到/home/hadoop/lab3 上,并編譯和運(yùn)行。

cd /home/hadoop/lab3
//編譯 
$ scalac -cp /usr/local/spark-1.6.3-bin-hadoop2.6/lib/*: WordCount.scala 
//運(yùn)行
$ java -cp  /usr/local/spark-1.6.3-bin-hadoop2.6/lib/*: WordCount

終端輸出結(jié)果如下圖所示:
Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

/home/hadoop/lab3下會生成output文件夾,以及文件夾下的輸出文件。
Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

3.1.3 累加器和廣播變量

查閱資料,了解Spark中累加器和廣播變量的作用,并舉例實(shí)驗(yàn)累加器和廣播變量,嘗試體會兩者的區(qū)別。

在 Spark中,提供了兩種類型的共享變量:累加器 (accumulator) 與廣播變量 (broadcast variable)

  • 累加器:用來對信息進(jìn)行聚合,主要用于累計(jì)計(jì)數(shù)等場景
  • 廣播變量:主要用于在節(jié)點(diǎn)間高效分發(fā)大對象。

對于正常的累計(jì)求和,如果在集群模式中使用下面的代碼進(jìn)行計(jì)算,會發(fā)現(xiàn)執(zhí)行結(jié)果并非預(yù)期:

var counter = 0
val data = Array(1, 2, 3, 4, 5)
sc.parallelize(data).foreach(x => counter += x)
 println(counter)

counter 最后的結(jié)果是 0,導(dǎo)致這個(gè)問題的主要原因是閉包。在實(shí)際計(jì)算時(shí),Spark 會將對 RDD 操作分解為 Task,Task 運(yùn)行在 Worker Node 上。在執(zhí)行之前,Spark 會對任務(wù)進(jìn)行閉包,如果閉包內(nèi)涉及到自由變量,則程序會進(jìn)行拷貝,并將副本變量放在閉包中,之后閉包被序列化并發(fā)送給每個(gè)執(zhí)行者。因此,當(dāng)在 foreach 函數(shù)中引用 counter 時(shí),它將不再是 Driver 節(jié)點(diǎn)上的 counter,而是閉包中的副本 counter,默認(rèn)情況下,副本 counter 更新后的值不會回傳到 Driver,所以 counter 的最終值仍然為零。
Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

所以在遇到此類問題時(shí)應(yīng)優(yōu)先使用累加器。

3.1.3.1 累加器

累加器的原理:Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)
就是將每個(gè)副本變量的最終值傳回 Driver,由 Driver 聚合后得到最終值,并更新原始變量。

//創(chuàng)建一個(gè)accumulator變量
scala> val acc = sc.accumulator(0, "Accumulator")
//add方法可以相加
scala> sc.parallelize(Array(1,2,3,4,5)).foreach(x => acc.add(x))
scala> acc.value

求和結(jié)果為 1+2+3+4+5=15

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

//+=也可以相加
scala> sc.parallelize(Array(1,2,3,4,5)).foreach(x => acc += x)
scala> acc.value

求和的結(jié)果為15+15=30

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

3.1.3.2 廣播變量

廣播變量的做法很簡單:就是不把副本變量分發(fā)到每個(gè) Task 中,而是將其分發(fā)到每個(gè) Executor,Executor 中的所有 Task 共享一個(gè)副本變量。Spark提供的Broadcast Variable,是只讀的。并且在每個(gè)節(jié)點(diǎn)上只會有一份副本,而不會為每個(gè)task都拷貝一份副本。因此其最大作用,就是減少變量到各個(gè)節(jié)點(diǎn)的網(wǎng)絡(luò)傳輸消耗,以及在各個(gè)節(jié)點(diǎn)上的內(nèi)存消耗。此外,spark自己內(nèi)部也使用了高效的廣播算法來減少網(wǎng)絡(luò)消耗。
調(diào)用SparkContext的broadcast()方法,來針對某個(gè)變量創(chuàng)建廣播變量。然后在算子的函數(shù)內(nèi),使用到廣播變量時(shí),每個(gè)節(jié)點(diǎn)只會拷貝一份副本了。每個(gè)節(jié)點(diǎn)可以使用廣播變量的value()方法獲取值。Broadcast是只讀的。

使用Broadcast變量的步驟:

  1. 調(diào)用SparkContext.broadcast方法創(chuàng)建一個(gè)Broadcast[T]對象。
    任何序列化的類型都可以這么實(shí)現(xiàn)。
  2. 通過value屬性訪問改對象的值(Java之中為value()方法)
  3. 變量只會被發(fā)送到各個(gè)節(jié)點(diǎn)一次,應(yīng)作為只讀值處理(修改這個(gè)值不會影響到別的節(jié)點(diǎn))
// 把一個(gè)數(shù)組定義為一個(gè)廣播變量
val broadcastVar = sc.broadcast(Array(1, 2, 3, 4, 5))
// 之后用到該數(shù)組時(shí)應(yīng)優(yōu)先使用廣播變量,而不是原值
sc.parallelize(broadcastVar.value).map(_ * 10).collect()

輸出的結(jié)果是對應(yīng)Array的元素乘以10,結(jié)果為(10,20,30,40,50)
Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

3.2 HBase

3.2.1 創(chuàng)建表格

通過Hbase的shell命令創(chuàng)建HBase列式存儲數(shù)據(jù)表格,其中每一行的數(shù)據(jù)格式如下

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

// 在cluster1 上啟動hdfs
$ start-dfs.sh
// 在cluster1 上啟動habase
$ start-yarn.sh
// 在cluster1 上啟動habase
$ start-hbase.sh
$ hbase shell
// 創(chuàng)建表student, 列簇名分別為information,score,stat_score 
> create 'student','information','score','stat_score'

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

// describe查看student表的基本信息
describe 'student'

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

// 使用put函數(shù)進(jìn)行數(shù)據(jù)的插入
// student: 表名
// 1001 學(xué)號
// information:name 列簇名:列名
// Tom 值
put 'student','1001','information:name','Tom'
put 'student','1001','information:sex','male'
put 'student','1001','information:age','20'

put 'student','1001','score:123001','99'
put 'student','1001','score:123002','98'
put 'student','1001','score:123003','97'

put 'student','1001','stat_score:sum','294'
put 'student','1001','stat_score:avg','98'

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

// 查看student表
scan 'student'

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

3.2.2 插入數(shù)據(jù)

請使用HBASE提供的API編程,實(shí)現(xiàn)向1)創(chuàng)建的HBase表中插入類似于下表中的數(shù)據(jù)(完整數(shù)據(jù)在附錄中),列簇3部分先用”NULL”補(bǔ)充。
Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

主要思路:

  • 首先構(gòu)造ReadFile函數(shù)用于讀取每一行的字符串,并保存到lines數(shù)組中。
  • 然后allAll函數(shù)遍歷lines數(shù)組,并對每一行進(jìn)行分割,得到"學(xué)號|姓名|性別|年齡"
  • 最后調(diào)用addRecord函數(shù)進(jìn)行數(shù)據(jù)的插入

核心代碼如下:

    // 從文件中逐行讀取數(shù)據(jù)
    public static List<String> ReadFile(String filename) {
        List<String> lines = null;
        try {
            lines = Files.readAllLines(Paths.get(filename), StandardCharsets.UTF_8);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return lines;
    }

    // 一次性添加所有的學(xué)生信息
    public static void addALL(String tableName,String[] columnFamilys) throws Exception {
        //  讀取data7.txt
        //  文件中的數(shù)據(jù)格式:學(xué)號|姓名|性別|年齡
        List<String> list7 = ReadFile("/home/hadoop/data7.txt");
        //寫入student information
        for (String line : list7) {
            String[] temp = line.split("\\s+");
            String Id = temp[0];
            String Name = temp[1];
            String Sex = temp[2];
            String Age = temp[3];
            HBaseJavaAPI.addRecord(tableName, Id, columnFamilys[0],"name", Name);
            HBaseJavaAPI.addRecord(tableName, Id, columnFamilys[0],"age",Age);
            HBaseJavaAPI.addRecord(tableName, Id, columnFamilys[0],"sex",Sex);
            //提示
            System.out.println("學(xué)生學(xué)號:" + Id + ",姓名:" + Name + ",性別:" + Sex + ",年齡:" + Age);
        }
        //讀取data8.txt
        //文件中的數(shù)據(jù)格式:學(xué)號|課程號|成績
        List<String> list8 = ReadFile("/home/hadoop/data8.txt");
        //寫入student score
        for (String line : list8) {
            String[] temp = line.split("\\s+");
            String Id = temp[0];
            String Cno = temp[1];
            String Score = temp[2];
            HBaseJavaAPI.addRecord(tableName,Id, columnFamilys[1], Cno,Score);
            //提示
            System.out.println("學(xué)生學(xué)號:" + Id +",課程課號:" + Cno + ",成績:" + Score);
        }
    }

// 調(diào)用
    public static void main(String[] args) {
        try {
            String tableName = "student";
            // 第一步:創(chuàng)建數(shù)據(jù)庫表:“student”
            String[] columnFamilys = { "information", "score","stat_score" };
            HBaseJavaAPI.createTable(tableName, columnFamilys);
            // 第二步:向數(shù)據(jù)表的添加數(shù)據(jù)
            // 添加第一行數(shù)據(jù)
            if (isExist(tableName)) {
                //從文件中讀取信息,添加全部的數(shù)據(jù)
                addALL(tableName,columnFamilys);
            } else {
                System.out.println(tableName + "此數(shù)據(jù)庫表不存在!");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
![在這里插入圖片描述](https://img-blog.csdnimg.cn/fe2646f199ae4b198928cbf9852832c3.png)

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

3.2.3 成績統(tǒng)計(jì)

請使用Spark編程實(shí)現(xiàn)對每個(gè)學(xué)生所有課程總成績與平均成績的統(tǒng)計(jì)聚合,并將聚合結(jié)果存儲到1)中創(chuàng)建的HBase表。(可以考慮將聚合后的結(jié)果先存入HDFS文件,再從HDFS文件載入數(shù)據(jù)到HBase,也可以使用API將聚合結(jié)果直接插入到HBase表。)

實(shí)驗(yàn)思路:成績統(tǒng)計(jì)的實(shí)驗(yàn)是通過java編程使用Spark API實(shí)現(xiàn)的 , 首先采用mapTopair函數(shù)構(gòu)造學(xué)號和成績的鍵值對。然后再使用reduceByKey函數(shù)進(jìn)行歸約,實(shí)現(xiàn)相同學(xué)號成績的累加。

主要參考JavaRDDLike (Spark 1.6.3 JavaDoc) (apache.org)

// 官網(wǎng)給的函數(shù)原型
<K2,V2> JavaPairRDD<K2,V2> mapToPair(PairFunction<T,K2,V2> f)

總成績的代碼如下:

                //進(jìn)行成績總和的聚合
                //輸入數(shù)據(jù)類型:學(xué)號string|課程號string|成績integer
			  /*  
			  	其中PairFunction中
				第一個(gè)參數(shù)是call 函數(shù)中參數(shù)的輸入的類型,表示從文件中讀取的一行為一個(gè)RDD
				第二個(gè)參數(shù)和第三個(gè)參數(shù)分別表示生成的鍵值對的類型。分別對應(yīng)<學(xué)號,成績>
				call函數(shù)是代碼的邏輯部分,通過split函數(shù)以空格為分節(jié)符進(jìn)行分割,分割結(jié)果以數(shù)組的形式進(jìn)行存放。
				返回的結(jié)果是數(shù)組中的第一列(學(xué)號)、第三列(成績)。 
				其中成績要從字符串的類型轉(zhuǎn)換成整數(shù)類型,方便后續(xù)的計(jì)算	
				其中reduceByKey函數(shù)要對Function2函數(shù)進(jìn)行重寫。

				Function2中的第一個(gè)參數(shù)是返回值的類型,第二個(gè)和第三個(gè)參數(shù)是call方法的輸入類型。
				call方法就是實(shí)現(xiàn)相同key的tuple中value值的累加。
				返回值是<學(xué)號,總成績>
			  */
			
               JavaPairRDD<String,Integer> SumScore=File8.mapToPair(new PairFunction<String, String, Integer>(){
                    @Override
                    public  Tuple2<String,Integer> call(String f){
                        String[] str=f.split(" ");// 分割
                        return new Tuple2<>(str[0],Integer.parseInt(str[2])); // 輸出類型:key=學(xué)號|value=成績
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() { // key值相同的進(jìn)行規(guī)約
                    @Override
                    public Integer call(Integer a, Integer b) throws Exception { // 學(xué)號相同的成績求和
                        return a+b;
                    }
                });

由于每個(gè)學(xué)生都是只有3門課,因此平均成績的算法就是總成績除以3,只需要在上一步的代碼基礎(chǔ)上再重新添加一個(gè) mapTopair 函數(shù)實(shí)現(xiàn)映射,其中鍵值不變(學(xué)號),然后平均成績是之前總成績的1/3。

平均成績的代碼如下:

                //進(jìn)行平均成績的聚合
                //輸入數(shù)據(jù)類型:學(xué)號string|課程號string|成績integer
                /*
                最后一個(gè)mapTopair 參數(shù)解釋:
                其中第一個(gè)參數(shù)Tuple2<String,Integer>是返回值的類型<學(xué)號,平均成績>
                第二個(gè)參數(shù)String是學(xué)號
                第三個(gè)參數(shù)Integer是總成績

                最后一個(gè)call函數(shù)的解釋:
                其中stringIntegerTuple2是上一步規(guī)約后得到的鍵值對
                返回值類型也是一個(gè)Tuple2的鍵值對,stringIntegerTuple2._1代表的是學(xué)號,stringIntegerTuple2._2/3代表的是平均成績
                */	 
                JavaPairRDD<String,Integer> AvgScore=File8.mapToPair(new PairFunction<String, String, Integer>(){
                    @Override
                    public  Tuple2<String,Integer> call(String f){
                        String[] str=f.split(" "); // 分割
                        return new Tuple2<>(str[0],Integer.parseInt(str[2])); // 輸出類型:key=學(xué)號|value=成績
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer a, Integer b) throws Exception {
                        return a+b; // 學(xué)號相同的成績求和
                    }
                }).mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        return new Tuple2<>(stringIntegerTuple2._1,stringIntegerTuple2._2/3); // key=學(xué)號|value=平均成績
                    }
                });
# 編譯
$  javac -cp 
/usr/local/hbase-1.2.6/lib/*:/usr/local/spark-1.6.3-bin-hadoop2.6/lib/*: Hbase_java.java
# 運(yùn)行
$  java -cp /usr/local/hbase-1.2.6/lib/*:/usr/local/spark-1.6.3-bin-hadoop2.6/lib/*: Hbase_java

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

3.2.4 API編程

請使用Hbase提供的API編程,完成以下指定功能:

3.2.4.1 功能一

先添加一個(gè)學(xué)生用戶,再使用addRecord(String tableName, String row, String[] fields, String[] values);向表tableName、行row(用S_Name表示)和字符串?dāng)?shù)組files指定的單元格中添加對應(yīng)的數(shù)據(jù)values。其中fields中每個(gè)元素如果對應(yīng)的列族下還有相應(yīng)的列限定符的話,用“columnFamily:column”表示。例如,同時(shí)向“Math(123001)”、“Computer Science(123002)”、“English(123003)”三列添加成績時(shí),字符串?dāng)?shù)組fields為{“score: 123001”,”score;123002”,”score: 123003”},數(shù)組values存儲這三門課的成績。

核心代碼如下:

/*
主要采用了`put.add()` 函數(shù)實(shí)現(xiàn)單條數(shù)據(jù)的插入。
其中第一個(gè)參數(shù)表示列簇名
第二個(gè)參數(shù)表示列名
第三個(gè)參數(shù)表示值
*/

// 添加一條數(shù)據(jù)
    public static void addRecord(String tableName, String row,
                                 String columnFamily, String column, String value) throws Exception {
        HTable table = new HTable(conf, tableName);
        Put put = new Put(Bytes.toBytes(row));// 指定行
        // 參數(shù)分別:列族、列、值
        put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column),
                Bytes.toBytes(value));
        table.put(put);
    }
3.2.4.2 功能二

scanColumn(String tableName, String column);瀏覽表tableName某一列的數(shù)據(jù),如果某一行記錄中該列數(shù)據(jù)不存在,則返回null。要求當(dāng)參數(shù)column為某一列族名稱時(shí),如果底下有若干個(gè)列限定符,則要列出每個(gè)列限定符代表的列的數(shù)據(jù);當(dāng)參數(shù)column為某一列具體名稱(例如“Score:Math”)時(shí),只需要列出該列的數(shù)據(jù)。

  • 主要思路
    • 首先用split函數(shù)對數(shù)據(jù)進(jìn)行分割,并把分隔結(jié)果保存在字符串?dāng)?shù)組中。
    • 通過判斷數(shù)組的長度來執(zhí)行不同的操作,當(dāng)長度為2時(shí),說明參數(shù)是某一列族名稱時(shí), 只返回該列族的所有值。否則返回該列簇下的所有數(shù)據(jù)
   public static void scanColumn(String tableName,String column) throws Exception{
        Table table = connection.getTable(TableName.valueOf(tableName));
        String[] str= column.split(":"); //將數(shù)據(jù)分割
       if(str.length == 2) { // 當(dāng)column為某一列族名稱時(shí), 只返回該列族的所有值
           ResultScanner scan = table.getScanner(str[0].getBytes(), str[1].getBytes());
           for (Result result : scan) { //遍歷該列族所有數(shù)據(jù)
               System.out.println(new String(result.getValue(str[0].getBytes(), str[1].getBytes())));
           }
           scan.close(); // 關(guān)閉掃描器
       }
        else{  // 否則返回列簇下的所有數(shù)據(jù)
            ResultScanner scan = table.getScanner(str[0].getBytes());
           for(Result result :scan){ // 首先遍歷列簇下的不同行
               Map<byte[],byte[]> myMap = result.getFamilyMap(Bytes.toBytes(column));
               ArrayList<String> cols = new ArrayList<String>();
               for(Map.Entry<byte[],byte[]> entry:myMap.entrySet()){
                   cols.add(Bytes.toString(entry.getKey()));
               }
               for(String st :cols){ // 然后遍歷該列簇下的不同列族,并打印輸出結(jié)果
                   System.out.print(st+ " : "+ new String(result.getValue(column.getBytes(),st.getBytes()))+" ~~~ ");
               }
               System.out.println();
           }
           scan.close();
       }
   }
3.2.4.3 功能三

deleteRow(String tableName, String row);刪除表tableName中row指定的行的記錄。

主要思路:
采用Delete函數(shù)完成單條數(shù)據(jù)的刪除,其中參數(shù)row是學(xué)號值

    // 刪除一條(行)數(shù)據(jù)
    public static void deleteRow(String tableName, String row) throws Exception {
        HTable table = new HTable(conf, tableName);
        Delete del = new Delete(Bytes.toBytes(row));
        table.delete(del);
    }
3.2.5 測試結(jié)果

運(yùn)行如下命令,對代碼進(jìn)行編譯測試

cd ~/lab3
// 編譯運(yùn)行
javac -cp /usr/local/hbase-1.2.6/lib/*: HBaseJavaAPI.java
java -cp /usr/local/hbase-1.2.6/lib/*: HBaseJavaAPI
  • 創(chuàng)建表
    成功創(chuàng)建student表
    Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

  • 獲取表中的數(shù)據(jù)

    可以根據(jù)學(xué)號獲取單個(gè)學(xué)生的數(shù)據(jù),也可以獲取所有學(xué)生的數(shù)據(jù)。

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

獲取列簇和列族的值。

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

  • 刪除表中的數(shù)據(jù)

    如下圖所示,可以根據(jù)學(xué)號刪除一條記錄。最后也可以刪除整個(gè)數(shù)據(jù)表。
    Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

4. 踩坑記錄

  • Debug1

【問題描述】 啟動spark-shell的時(shí)候會報(bào)如下錯(cuò)誤not found: value sqlContext

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

【問題背景】在網(wǎng)上查了很多資料,翻遍了stackoverflow,能嘗試的方法都試過了,但都沒有奏效。因此我又仔細(xì)的查看了報(bào)錯(cuò)的信息,發(fā)現(xiàn)它原來是連接不到cluster2上安裝的數(shù)據(jù)庫。

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

【解決方案】在啟動spark前,現(xiàn)在cluster2上開啟mysql服務(wù),然后終于成功解決了這個(gè)報(bào)錯(cuò)!

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

  • Debug2

【問題描述】報(bào)錯(cuò)illegal character '\u00a0'

【問題背景】去網(wǎng)上查了下\u00a0 的含義,發(fā)現(xiàn)是空格的意思,因此推測應(yīng)該是有不合法的空格。

【解決方案】刪去等號后的第一個(gè)空格,成功解決。

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

  • Debug3

【問題描述】在編譯運(yùn)行WordCount程序時(shí),報(bào)錯(cuò)Permission denied

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

【問題背景】因?yàn)槲矣玫氖莤ftp傳輸文件,然后將本地?cái)?shù)據(jù)文件上傳到clster1后,默認(rèn)所有者是root,因此導(dǎo)致權(quán)限出錯(cuò)。

【解決方案】運(yùn)行如下命令,修改所有者為hadoop用戶,然后再次編譯成功。

su root
chown -R hadoop:hadoop /home/hadoop/
  • Debug4

【問題描述】在編譯HBaseJavaAPI時(shí)報(bào)錯(cuò)如下:
Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

【問題背景】原先在編寫代碼,得到行號采用的方式是rowKV.scanColumn() 會報(bào)錯(cuò)

【解決方案】在網(wǎng)上查了一些資料,采用了另一種方式來獲取行號,修改代碼如下:

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
System.out.print("行名:" + new String(CellUtil.cloneRow(rowKV)) + " ");

然后重新編譯運(yùn)行,成功解決該問題。

  • Debug5

【問題描述】在用javac編譯代碼時(shí)報(bào)錯(cuò)如下no source files

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

【解決方案】在小組同學(xué)的熱心幫助下,我發(fā)現(xiàn)是我在HBaseJavaAPI前少寫了一個(gè)空格。長記性,下次不會這么粗心了。

javac -cp /usr/local/hbase-1.2.6/lib/*:/usr/local/spark-1.6.3-bin-hadoop2.6/lib/*: HBaseJavaAPI.java
java -cp /usr/local/hbase-1.2.6/lib/*:/usr/local/spark-1.6.3-bin-hadoop2.6/lib/*: HBaseJavaAPI
  • Debug6

【問題描述】由于一開始一開始在本地調(diào)試maven項(xiàng)目時(shí),導(dǎo)入jar包的時(shí)間非常慢。

【問題背景】在網(wǎng)上查資料應(yīng)該是網(wǎng)速的問題,而且我檢查過我已經(jīng)配置過了阿里云鏡像,按理說不會這么慢。

【解決方案】后來,我檢查一下發(fā)現(xiàn)原來是該項(xiàng)目的marven沒有配置,導(dǎo)入到本地配置的setting.xml文件后,1分鐘就下載完了所需的所有依賴。

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

  • Debug7

【問題描述】在本地編譯成功的代碼放到linux上運(yùn)行就會報(bào)錯(cuò)。

【問題背景】在同組同學(xué)的提示下,我發(fā)現(xiàn)是因?yàn)镴ava版本不一致造成的。本地maven默認(rèn)配置是java1.8 但是我們的實(shí)驗(yàn)環(huán)境是java1.7。

【解決方案】修改apache-maven-3.8.4\conf目錄下的setting.xml文件中的JDK的版本修改為1.7, 修改完成后,由于版本不一致,還需要對代碼進(jìn)行進(jìn)一步的修改。

  <profiles>
    <profile>     
      <id>JDK-1.7</id>       
      <activation>       
          <activeByDefault>true</activeByDefault>       
          <jdk>1.7</jdk>       
      </activation>       
      <properties>       
          <maven.compiler.source>1.7</maven.compiler.source>       
          <maven.compiler.target>1.7</maven.compiler.target>       
          <maven.compiler.compilerVersion>1.7</maven.compiler.compilerVersion>       
      </properties>
  </profile>
  • Debug8

【問題描述】由于java的版本不同因此在用spark api編程時(shí),寫法法不一樣。

【解決方案】在使用mapToPair函數(shù)時(shí),java7和java8 有不同的寫法。以下舉一個(gè)簡單的例子:把數(shù)字類型轉(zhuǎn)換成<奇數(shù)/偶數(shù),數(shù)字> 的類型。

Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)

//java8
JavaPairRDD <String,Integer> pairRDD=intRDD.mapToPair(i->(i%2==0)? new Tuple2<String,Integer>("even",i):new Tuple2<String,Integer>("odd",i));

如果用java8的話,一行代碼使用匿名函數(shù)就可以搞定,但是在java7中要復(fù)雜的多。需要重寫PairFunction 函數(shù), 代碼的邏輯放在call 函數(shù)中實(shí)現(xiàn)。

// Java7
JavaPairRDD <String,Integer> pairRDD=intRDD.mapToPair(new PairFunction<Integer,String, Integer>(){
    @Override
    public Tuple2<String,Integer> call (Integer i) throws Exception{
        return (i%2 ==0)? new Tuple2<String,Integer>("even",i):new Tuple2<String,Integer>("odd",i);
    }
});

除此之外,還有reduceBykey函數(shù)也有變化,不能使用匿名函數(shù),而是要重寫Function2中的call方法實(shí)現(xiàn)

  • Debug9

    【問題描述】當(dāng)采用集群模式執(zhí)行時(shí),rdd.foreach(println)語句無法打印元素

    【問題背景】當(dāng)采用 Local 模式在單機(jī)上執(zhí)行時(shí),rdd.foreach(println)語句會打印出一個(gè)RDD中的所有元素。但是,當(dāng)采用集群模式執(zhí)行時(shí),在Worker節(jié)點(diǎn)上執(zhí)行打印語句是輸出到Worker節(jié)點(diǎn)的stdout中,而不是輸出到任務(wù)控制節(jié)點(diǎn)Driver中,因此,任務(wù)控制節(jié)點(diǎn)Driver中的stdout是不會顯示打印語句的這些輸出內(nèi)容的
    【解決方案】為了能夠把所有 Worker 節(jié)點(diǎn)上的打印輸出信息也顯示到Driver中,就需要使用collect()方法。例如,rdd.collect().foreach(println)。

5. 心得體會

這次實(shí)驗(yàn)比上次實(shí)驗(yàn)難度要大一點(diǎn),前前后后花了很多的時(shí)間去做。這次實(shí)驗(yàn)感覺收獲最大的就是自己在編程的時(shí)候,一開始在csdn上找資料,但是講的都不是很全面。最后逼著自己讀了官方的英文文檔,以及閱讀了英文版《Apache Spark 2.x for java Developers》的部分章節(jié)。雖然一開始讀起來挺費(fèi)勁的,但是讀完之后確實(shí)對于整個(gè)原理和代碼的理解會清晰很多,覺得也不是特別的難。除此之外就是,在一開始走了很多彎路,比如說嘗試用sbt打包,但是后來發(fā)現(xiàn)虛擬機(jī)內(nèi)存太小無法安裝。后來受到小組同學(xué)的啟發(fā),覺得可以先在本地sbt打包再傳上去,接下來還會再嘗試下這種方法。最后還是十分感謝小組同學(xué)在本次實(shí)驗(yàn)過程中對我的幫助!

6. 附錄

6.1 實(shí)驗(yàn)數(shù)據(jù)

實(shí)驗(yàn)可以使用“附件2-實(shí)驗(yàn)數(shù)據(jù)”文件夾中的附錄實(shí)驗(yàn)數(shù)據(jù),也可使用自己生成的數(shù)據(jù)。具體數(shù)據(jù)如下:

附錄實(shí)驗(yàn)數(shù)據(jù)五:SPARK編程一(3.1.1)使用的CSV數(shù)據(jù)文件;

附錄實(shí)驗(yàn)數(shù)據(jù)六:SPARK編程二(3.1.2)使用的數(shù)據(jù)文本;

附錄實(shí)驗(yàn)數(shù)據(jù)七:HBase編程(3.2.2)使用的學(xué)生表數(shù)據(jù);

附錄實(shí)驗(yàn)數(shù)據(jù)八:HBase編程(3.2.2)及SPARK編程三(3.2.3)使用的選課表數(shù)據(jù);文章來源地址http://www.zghlxwxcb.cn/news/detail-451016.html

6.2 實(shí)驗(yàn)源碼

6.2.1 WordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.collection.Map
object WordCount {
    def main(args: Array[String]) {
        val inputFile =  "/home/hadoop/data/test6.txt"  // 原始數(shù)據(jù)文件
        val conf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
	conf.set("spark.testing.memory", "500000000") // 設(shè)置配置信息
        val sc = new SparkContext(conf)
                val textFile = sc.textFile(inputFile) // 讀取數(shù)據(jù)
                val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)  // 文本分割并計(jì)數(shù),格式為(word,number)
                wordCount.foreach(println) // 打印wordCount
		wordCount.saveAsTextFile("/home/hadoop/lab3/output")  // 將結(jié)果保存到output文件夾下     
    }
}
6.2.2 HBaseJavaAPI
/*
 * 創(chuàng)建一個(gè)students表,并進(jìn)行相關(guān)操作
 */
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
import org.apache.commons.math3.util.Pair;
public class HBaseJavaAPI {
    // 聲明靜態(tài)配置
    private static Configuration conf = null;

    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.56.121");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }

    //判斷表是否存在
    private static boolean isExist(String tableName) throws IOException {
        HBaseAdmin hAdmin = new HBaseAdmin(conf);
        return hAdmin.tableExists(tableName);
    }

    // 創(chuàng)建數(shù)據(jù)庫表
    public static void createTable(String tableName, String[] columnFamilys)
            throws Exception {
        // 新建一個(gè)數(shù)據(jù)庫管理員
        HBaseAdmin hAdmin = new HBaseAdmin(conf);
        if (hAdmin.tableExists(tableName)) {
            System.out.println("表 "+tableName+" 已存在!");
            //System.exit(0);
        } else {
            // 新建一個(gè)students表的描述
            HTableDescriptor tableDesc = new HTableDescriptor(tableName);
            // 在描述里添加列族
            for (String columnFamily : columnFamilys) {
                tableDesc.addFamily(new HColumnDescriptor(columnFamily));
            }
            // 根據(jù)配置好的描述建表
            hAdmin.createTable(tableDesc);
            System.out.println("創(chuàng)建表 "+tableName+" 成功!");
        }
    }

    // 刪除數(shù)據(jù)庫表
    public static void deleteTable(String tableName) throws Exception {
        // 新建一個(gè)數(shù)據(jù)庫管理員
        HBaseAdmin hAdmin = new HBaseAdmin(conf);
        if (hAdmin.tableExists(tableName)) {
            // 關(guān)閉一個(gè)表
            hAdmin.disableTable(tableName);
            hAdmin.deleteTable(tableName);
            System.out.println("刪除表 "+tableName+" 成功!");
        } else {
            System.out.println("刪除的表 "+tableName+" 不存在!");
            System.exit(0);
        }
    }

    // 添加一條數(shù)據(jù)
    public static void addRecord(String tableName, String row,
                                 String columnFamily, String column, String value) throws Exception {
        HTable table = new HTable(conf, tableName);
        Put put = new Put(Bytes.toBytes(row));// 指定行
        // 參數(shù)分別:列族、列、值
        put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column),
                Bytes.toBytes(value));
        table.put(put);
    }

    // 從文件中逐行讀取數(shù)據(jù)
    public static List<String> ReadFile(String filename) {
        List<String> lines = null;
        try {
            lines = Files.readAllLines(Paths.get(filename), StandardCharsets.UTF_8);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return lines;
    }

    // 一次性添加所有的學(xué)生信息
    public static void addALL(String tableName,String[] columnFamilys) throws Exception {
        //  讀取data7.txt
        //  文件中的數(shù)據(jù)格式:學(xué)號|姓名|性別|年齡
        List<String> list7 = ReadFile("/home/hadoop/data/data7.txt");
        //寫入student information
        for (String line : list7) {
            String[] temp = line.split("\\s+");
            String Id = temp[0];
            String Name = temp[1];
            String Sex = temp[2];
            String Age = temp[3];
            HBaseJavaAPI.addRecord(tableName, Id, columnFamilys[0],"name", Name);
            HBaseJavaAPI.addRecord(tableName, Id, columnFamilys[0],"age",Age);
            HBaseJavaAPI.addRecord(tableName, Id, columnFamilys[0],"sex",Sex);
            //提示
            System.out.println("學(xué)生學(xué)號:" + Id + ",姓名:" + Name + ",性別:" + Sex + ",年齡:" + Age);
        }
        //讀取data8.txt
        //文件中的數(shù)據(jù)格式:學(xué)號|課程號|成績
        List<String> list8 = ReadFile("/home/hadoop/data/data8.txt");
        //寫入student score
        for (String line : list8) {
            String[] temp = line.split("\\s+");
            String Id = temp[0];
            String Cno = temp[1];
            String Score = temp[2];
            HBaseJavaAPI.addRecord(tableName,Id, columnFamilys[1], Cno,Score);
            //提示
            System.out.println("學(xué)生學(xué)號:" + Id +",課程課號:" + Cno + ",成績:" + Score);
        }
    }

    // 刪除一條(行)數(shù)據(jù)
    public static void deleteRow(String tableName, String row) throws Exception {
        HTable table = new HTable(conf, tableName);
        Delete del = new Delete(Bytes.toBytes(row));
        table.delete(del);
    }

    // 刪除多條數(shù)據(jù)
    public static void delMultiRows(String tableName, String[] rows)
            throws Exception {
        HTable table = new HTable(conf, tableName);
        List<Delete> delList = new ArrayList<Delete>();
        for (String row : rows) {
            Delete del = new Delete(Bytes.toBytes(row));
            delList.add(del);
        }
        table.delete(delList);
    }

    // 獲取一行的數(shù)據(jù)
    public static void scanColumn(String tableName, String row) throws Exception {
        HTable table = new HTable(conf, tableName);
        Get get = new Get(Bytes.toBytes(row));
        Result result = table.get(get);
        // 輸出結(jié)果,raw方法返回所有keyvalue數(shù)組
        for (KeyValue rowKV : result.raw()) {
            System.out.print("行名:" + new String(CellUtil.cloneRow(rowKV)) + " ");
            System.out.print("時(shí)間戳:" + rowKV.getTimestamp() + " ");
            System.out.print("列族名:" + new String(rowKV.getFamily()) + " ");
            System.out.print("列名:" + new String(rowKV.getQualifier()) + " ");
            System.out.println("值:" + new String(rowKV.getValue()));
        }
    }
    
    // 獲取一列的數(shù)據(jù)
   public static void scanColumn(String tableName,String column) throws Exception{
        Table table = connection.getTable(TableName.valueOf(tableName));
        String[] str= column.split(":"); //將數(shù)據(jù)分割
       if(str.length == 2) { // 當(dāng)column為某一列族名稱時(shí), 只返回該列族的所有值
           ResultScanner scan = table.getScanner(str[0].getBytes(), str[1].getBytes());
           for (Result result : scan) {
               System.out.println(new String(result.getValue(str[0].getBytes(), str[1].getBytes())));
           }
           scan.close();
       }
        else{  // 否則返回列族下的所有數(shù)據(jù)
            ResultScanner scan = table.getScanner(str[0].getBytes());
           for(Result result :scan){
               Map<byte[],byte[]> myMap = result.getFamilyMap(Bytes.toBytes(column));
               ArrayList<String> cols = new ArrayList<String>();
               for(Map.Entry<byte[],byte[]> entry:myMap.entrySet()){
                   cols.add(Bytes.toString(entry.getKey()));
               }
               for(String st :cols){
                   System.out.print(st+ " : "+ new String(result.getValue(column.getBytes(),st.getBytes()))+" ~~~ ");
               }
               System.out.println();
           }
           scan.close();
       }
   }
    // 獲取所有數(shù)據(jù)
    public static void getAllRows(String tableName) throws Exception {
        HTable table = new HTable(conf, tableName);
        Scan scan = new Scan();
        ResultScanner results = table.getScanner(scan);
        // 輸出結(jié)果
        for (Result result : results) {
            for (KeyValue rowKV : result.raw()) {
                System.out.print("行名:" + new String(CellUtil.cloneRow(rowKV)) + " ");
                System.out.print("時(shí)間戳:" + rowKV.getTimestamp() + " ");
                System.out.print("列族名:" + new String(rowKV.getFamily()) + " ");
                System.out
                        .print("列名:" + new String(rowKV.getQualifier()) + " ");
                System.out.println("值:" + new String(rowKV.getValue()));
            }
        }
    }

    // 主函數(shù)
    public static void main(String[] args) {
        try {
            String tableName = "student";
            // 第一步:創(chuàng)建數(shù)據(jù)庫表:“student”
            String[] columnFamilys = { "information", "score","stat_score" };
            HBaseJavaAPI.createTable(tableName, columnFamilys);
            // 第二步:向數(shù)據(jù)表的添加數(shù)據(jù)
            // 添加第一行數(shù)據(jù)
            if (isExist(tableName)) {
                //添加所有的數(shù)據(jù)
                addALL(tableName,columnFamilys);

                //獲取一條數(shù)據(jù)
                System.out.println("-------------獲取學(xué)號為2015002學(xué)生的數(shù)據(jù)--------------");
                HBaseJavaAPI.scanColumn(tableName, "2015002");
                //獲取所有數(shù)據(jù)
                System.out.println("----------------獲取所有數(shù)據(jù)------------------------");
                HBaseJavaAPI.getAllRows(tableName);
                // 載入配置文件
                SparkConf conf=new SparkConf().setAppName("sparkclient").setMaster("local[2]");
                conf.set("spark.testing.memory","2147480000");
                JavaSparkContext sc=new JavaSparkContext(conf);

                //讀入數(shù)據(jù)
                JavaRDD<String> File8=sc.textFile("/home/hadoop/data/data8.txt");
                File8.persist(StorageLevel.MEMORY_AND_DISK());

                //進(jìn)行成績總和的聚合
                //輸入數(shù)據(jù)類型:學(xué)號string|課程號string|成績integer
                JavaPairRDD<String,Integer> SumScore=File8.mapToPair(new PairFunction<String, String, Integer>(){
                    @Override
                    public  Tuple2<String,Integer> call(String f){
                        String[] str=f.split(" ");// 分割
                        return new Tuple2<>(str[0],Integer.parseInt(str[2])); // 輸出類型:key=學(xué)號|value=成績
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() { // key值相同的進(jìn)行規(guī)約
                    @Override
                    public Integer call(Integer a, Integer b) throws Exception { // 學(xué)號相同的成績求和
                        return a+b;
                    }
                });

                //進(jìn)行平均成績的聚合
                //輸入數(shù)據(jù)類型:學(xué)號string|課程號string|成績integer
                JavaPairRDD<String,Integer> AvgScore=File8.mapToPair(new PairFunction<String, String, Integer>(){
                    @Override
                    public  Tuple2<String,Integer> call(String f){
                        String[] str=f.split(" "); // 分割
                        return new Tuple2<>(str[0],Integer.parseInt(str[2])); // 輸出類型:key=學(xué)號|value=成績
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer a, Integer b) throws Exception {
                        return a+b; // 學(xué)號相同的成績求和
                    }
                }).mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        return new Tuple2<>(stringIntegerTuple2._1,stringIntegerTuple2._2/3); // key=學(xué)號|value=平均成績
                    }
                });

                //迭代輸入
                for (Tuple2<String, Integer> stringIntegerTuple2 : SumScore.collect()) {
               
                    HBaseJavaAPI.addRecord(tableName,stringIntegerTuple2._1, columnFamilys[2],"sum", stringIntegerTuple2._2.toString());
                    System.out.println("添加id: "+stringIntegerTuple2._1+",sum "+stringIntegerTuple2._2.toString());
                }
                for (Tuple2<String, Integer> stringIntegerTuple2 : AvgScore.collect()) {
                  
                    HBaseJavaAPI.addRecord(tableName,stringIntegerTuple2._1, columnFamilys[2],"avg", stringIntegerTuple2._2.toString());
                    System.out.println("添加id: "+stringIntegerTuple2._1+",sum "+stringIntegerTuple2._2.toString());
                }

                System.out.println("----------------獲取所有數(shù)據(jù)------------------------");
                HBaseJavaAPI.getAllRows(tableName);
         	   System.out.println("----------------獲取列簇------------------------");
               scanColumn("student","information");//只包含列簇名的測試
               System.out.println("----------------獲取列族------------------------");
               scanColumn("student","information:name");//進(jìn)行查看所有學(xué)生名字的測試
                //刪除一條數(shù)據(jù)
                //System.out.println("-------------刪除學(xué)號為2015003學(xué)生的數(shù)據(jù)--------------");
                //HBaseJavaAPI.deleteRow(tableName, "2015003");
                //HBaseJavaAPI.getAllRows(tableName);
                //刪除多條數(shù)據(jù)
                //System.out.println("--------------刪除多條數(shù)據(jù)------------------");
                //String rows[] = new String[] { "qingqing","xiaoxue" };
                //HBaseJavaAPI.delMultiRows(tableName, rows);
                //HBaseJavaAPI.getAllRows(tableName);
                //刪除數(shù)據(jù)庫
                //System.out.println("-----------------刪除數(shù)據(jù)庫表-----------------");
                //HBaseJavaAPI.deleteTable(tableName);
                //System.out.println("表"+tableName+"存在嗎?"+isExist(tableName));
            } else {
                System.out.println(tableName + "此數(shù)據(jù)庫表不存在!");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

到了這里,關(guān)于Spark + HBase 數(shù)據(jù)處理和存儲實(shí)驗(yàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • HDFS+ MapReduce 數(shù)據(jù)處理與存儲實(shí)驗(yàn)

    HDFS+ MapReduce 數(shù)據(jù)處理與存儲實(shí)驗(yàn)

    了解HDFS的基本特性及其適用場景; 熟悉HDFS Shell常用命令; 學(xué)習(xí)使用HDFS的Java API,編程實(shí)現(xiàn)HDFS常用功能; 了解MapReduce中“Map”和“Reduce”基本概念和主要思想; 掌握基本的MapReduce API編程,并實(shí)現(xiàn)合并、去重、排序等基本功能; 實(shí)驗(yàn)平臺:基于實(shí)驗(yàn)一搭建的虛擬機(jī)Hadoop大數(shù)

    2023年04月23日
    瀏覽(98)
  • 數(shù)據(jù)存儲和分布式計(jì)算的實(shí)際應(yīng)用:如何使用Spark和Flink進(jìn)行數(shù)據(jù)處理和分析

    作為一名人工智能專家,程序員和軟件架構(gòu)師,我經(jīng)常涉及到數(shù)據(jù)處理和分析。在當(dāng)前大數(shù)據(jù)和云計(jì)算的時(shí)代,分布式計(jì)算已經(jīng)成為了一個(gè)重要的技術(shù)方向。Spark和Flink是當(dāng)前比較流行的分布式計(jì)算框架,它們提供了強(qiáng)大的分布式計(jì)算和數(shù)據(jù)分析功能,為數(shù)據(jù)處理和分析提供了

    2024年02月16日
    瀏覽(92)
  • HBase的數(shù)據(jù)批量操作與事務(wù)處理

    HBase是一個(gè)分布式、可擴(kuò)展、高性能的列式存儲系統(tǒng),基于Google的Bigtable設(shè)計(jì)。它是Hadoop生態(tài)系統(tǒng)的一部分,可以與HDFS、MapReduce、ZooKeeper等組件集成。HBase具有高可用性、高可擴(kuò)展性和高性能等特點(diǎn),適用于大規(guī)模數(shù)據(jù)存儲和實(shí)時(shí)數(shù)據(jù)處理。 在大數(shù)據(jù)時(shí)代,數(shù)據(jù)的批量操作和

    2024年02月22日
    瀏覽(33)
  • 大數(shù)據(jù)處理:利用Spark進(jìn)行大規(guī)模數(shù)據(jù)處理

    大數(shù)據(jù)處理是指對大規(guī)模、高速、多源、多樣化的數(shù)據(jù)進(jìn)行處理、分析和挖掘的過程。隨著互聯(lián)網(wǎng)、人工智能、物聯(lián)網(wǎng)等領(lǐng)域的發(fā)展,大數(shù)據(jù)處理技術(shù)已經(jīng)成為當(dāng)今科技的核心技術(shù)之一。Apache Spark是一個(gè)開源的大數(shù)據(jù)處理框架,它可以處理批量數(shù)據(jù)和流式數(shù)據(jù),并提供了一系

    2024年03月22日
    瀏覽(22)
  • 【spark大數(shù)據(jù)】spark大數(shù)據(jù)處理技術(shù)入門項(xiàng)目--購物信息分析

    【spark大數(shù)據(jù)】spark大數(shù)據(jù)處理技術(shù)入門項(xiàng)目--購物信息分析

    購物信息分析基于spark 目錄 本案例中三個(gè)文案例中需要處理的文件為 order_goods.txt、products.txt 以及 orders.txt 三個(gè)文件,三個(gè)文件的說明如下 一、本實(shí)訓(xùn)項(xiàng)目針對實(shí)驗(yàn)數(shù)據(jù)主要完成了哪些處理? 二、Hadoop+Spark集群環(huán)境的搭建步驟有哪些?(只介紹完全分布式集群環(huán)境的搭建)

    2023年04月08日
    瀏覽(30)
  • spark 數(shù)據(jù)傾斜處理

    spark 數(shù)據(jù)傾斜處理

    1.?對多次使用的RDD進(jìn)行持久化 同常內(nèi)存夠的時(shí)候建議使用:MEMORY_ONLY 如果內(nèi)存不夠的時(shí)候使用 通常建議使用:MEMORY_AND_DISK_SER策略,而不是 MEMORY_AND_DISK策略。 2. 使用高性能的算子 3. 廣播大變量 4. 使用Kryo優(yōu)化序列化性能 Kryo序列化器介紹: Spark支持使用Kryo序列化機(jī)制。Kryo序列化

    2024年02月11日
    瀏覽(25)
  • Spark大數(shù)據(jù)處理講課筆記4.1 Spark SQL概述、數(shù)據(jù)幀與數(shù)據(jù)集

    Spark大數(shù)據(jù)處理講課筆記4.1 Spark SQL概述、數(shù)據(jù)幀與數(shù)據(jù)集

    ? 目錄 零、本講學(xué)習(xí)目標(biāo) 一、Spark SQL (一)Spark SQL概述 (二)Spark SQL功能 (三)Spark SQL結(jié)構(gòu) 1、Spark SQL架構(gòu)圖 2、Spark SQL三大過程 3、Spark SQL內(nèi)部五大組件 (四)Spark SQL工作流程 (五)Spark SQL主要特點(diǎn) 1、將SQL查詢與Spark應(yīng)用程序無縫組合 2、Spark SQL以相同方式連接多種數(shù)據(jù)

    2024年02月09日
    瀏覽(25)
  • Spark Streaming實(shí)時(shí)數(shù)據(jù)處理

    作者:禪與計(jì)算機(jī)程序設(shè)計(jì)藝術(shù) Apache Spark?Streaming是一個(gè)構(gòu)建在Apache Spark?之上的快速、微批次、容錯(cuò)的流式數(shù)據(jù)處理系統(tǒng),它可以對實(shí)時(shí)數(shù)據(jù)進(jìn)行高吞吐量、低延遲地處理。Spark Streaming既可用于流計(jì)算場景也可用于離線批處理場景,而且可以將結(jié)構(gòu)化或無結(jié)構(gòu)化數(shù)據(jù)源(如

    2024年02月06日
    瀏覽(27)
  • 大數(shù)據(jù)處理與分析-Spark

    大數(shù)據(jù)處理與分析-Spark

    (基于Hadoop的MapReduce的優(yōu)缺點(diǎn)) MapReduce是一個(gè)分布式運(yùn)算程序的編程框架,是用戶開發(fā)“基于Hadoop的數(shù)據(jù)分析應(yīng)用”的核心框架 MapReduce是一種用于處理大規(guī)模數(shù)據(jù)集的編程模型和計(jì)算框架。它將數(shù)據(jù)處理過程分為兩個(gè)主要階段:Map階段和Reduce階段。在Map階段,數(shù)據(jù)被分割為多

    2024年02月04日
    瀏覽(30)
  • Spark大數(shù)據(jù)處理講課筆記3.7 Spark任務(wù)調(diào)度

    Spark大數(shù)據(jù)處理講課筆記3.7 Spark任務(wù)調(diào)度

    理解DAG概念 了解Stage劃分 了解RDD在Spark中的運(yùn)行流程 DAG(Directed Acyclic Graph) 叫做 有向無環(huán)圖 ,Spark中的RDD通過一系列的轉(zhuǎn)換算子操作和行動算子操作形成了一個(gè)DAG。DAG是一種非常重要的圖論數(shù)據(jù)結(jié)構(gòu)。如果一個(gè)有向圖無法從任意頂點(diǎn)出發(fā)經(jīng)過若干條邊回到該點(diǎn),則這個(gè)圖就

    2024年02月09日
    瀏覽(32)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包