文章作者郵箱:yugongshiye@sina.cn? ? ? ? ? ? ? 地址:廣東惠州
?▲ 本章節(jié)目的
? 了解Spark的RDD結構;
??掌握Spark的RDD操作方法;
??掌握Spark的RDD常用變換方法、常用執(zhí)行方法;
一、Spark最核心的數據結構——RDD彈性分布式數據集
1. 概述
初學Spark時,把RDD看做是一個集合類型(類似于Array或List),用于存儲數據和操作數據,但RDD和普通集合的區(qū)別:
1. RDD有分區(qū)機制,可以分布式,并行的處理同一個RDD數據集,從而極大提高處理效率。分區(qū)數量由程序員自己定。
2. RDD由容錯機制。即數據丟失后,可以進行恢復。
2. 創(chuàng)建RDD方法
RDD就是帶有分區(qū)的集合類型
彈性分布式數據集(RDD),特點是可以并行操作,并且是容錯的。有兩種方法可以創(chuàng)建RDD:
1. 執(zhí)行Transform操作(變換操作)。即將一個普通集合(Array或List)轉變?yōu)橐粋€RDD。
例如:val r1 = sc.parallelize(a1,2)
或 val r1 = sc.makeRDD(List(1,2,3,4),2)
查看分區(qū)數量:r1.partitions.size。
查看分區(qū)數據:r1.glom.collect。
查看RDD整體數據:r1.collect。
2. 讀取外部存儲系統(tǒng)的數據集,如HDFS,HBase,或任何與Hadoop有關的數據源。
讀取Linux本地文件:val r4 = sc.textFile("file:home/1.txt",2)
讀取hds文件:val r5 = sc.textFile("hdfs://hadoop01:9000/1.txt",2)
3.?RDD入門示例
案例一:
并行化集合可以通過調用 Spark Context 的并行化方法被創(chuàng)建,這個方法是在驅動程序(Scala-Seq)中的現有集合上的。集合里的參數會被拷貝到可以并行執(zhí)行的分布式數據集里。如下例子就是如何創(chuàng)建一個包含了 1 到 5 的并行化集合。例如:
val data = Array(1, 2, 3, 4, 5)???????????????
val r1 = sc.parallelize(data)?????????
val r2 = sc.parallelize(data,2)??????
你可以這樣理解RDD:它是spark提供的一個特殊集合類。諸如普通的集合類型,如傳統(tǒng)的Array:(1,2,3,4,5)是一個整體,但轉換成RDD后,我們可以對數據進行Partition(分區(qū))處理,這樣做的目的就是為了分布式。
你可以讓這個RDD有兩個分區(qū),那么有可能是這個形式:RDD(1,2) (3,4)。
這樣設計的目的在于:可以進行分布式運算。
注:創(chuàng)建RDD的方式有多種,比如案例一中是基于一個基本的集合類型(Array)轉換而來,像parallelize這樣的方法還有很多,之后就會學到。此外,我們也可以在讀取數據集時就創(chuàng)建RDD。
案例二:
Spark能夠從任何基于Hadoop的存儲資源,創(chuàng)建分布式數據集。包括本地文件系統(tǒng)、HDFS、Cassandra、HBase、Amazon S3等等。Spark支持TEXT文件格式、SequenceFiles文件格式和其他Hadoop的輸入文件格式。
RDD的TEXT文件能夠通過SparkContext的方法創(chuàng)建。這個方法獲取一個文件的URI路徑(可以是本地路徑、或者是hdfs://,?s3n://等),然后當作一條數據集讀取其中內容。例如:
val distFile = sc.textFile("data.txt")
4. 查看RDD
scala>rdd.collect
收集rdd中的數據組成Array返回,此方法將會把分布式存儲的rdd中的數據集中到一臺機器中組建Array。
在生產環(huán)境下一定要慎用這個方法,容易內存溢出。
查看RDD的分區(qū)數量:
scala>rdd.partitions.size
查看RDD每個分區(qū)的元素:
scala>rdd.glom.collect
此方法會將每個分區(qū)的元素以Array形式返回。
5.?分區(qū)概念
在下圖所示, 一個RDD有item1~item25個數據,共5個分區(qū),分別在3臺機器上進行處理。
此外,spark并沒有原生的提供rdd的分區(qū)查看工具我們可以自己來寫一個。
案例三:
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
object su {
def debug[T: ClassTag](rdd: RDD[T]) = {
rdd.mapPartitionsWithIndex((i: Int, iter: Iterator[T]) => {
val m = scala.collection.mutable.Map[Int, List[T]]()
var list = List[T]()
while (iter.hasNext) {
list = list :+ iter.next
}
m(i) = list
m.iterator
}).collect().foreach((x: Tuple2[Int, List[T]]) => {
val i = x._1
println(s"partition:[$i]")
x._2.foreach { println }
})
}
}
?二、RDD的操作
1. 概述
對于RDD的操作,總的來分有三種:
1. Transformation變化操作,特點是都是懶操作,調用后并不是馬上執(zhí)行,比如典型的textFile方法。此外,每當調用一次變化操作(懶操作),就會產生一個新的RDD。
2. Action執(zhí)行操作,特點是會觸發(fā)執(zhí)行。
3. Controller控制操作。
?
?
?
?
?2. 常用的變化方法(懶方法):
Transformation | Meaning |
map(func) |
Return a new distributed dataset formed by passing each element of the source through a function?func. 返回一個新的分布式數據集,通過函數應用于RDD每一個元素,該方法的參數是一個函數。 案例: map 將函數應用到rdd的每個元素中文章來源:http://www.zghlxwxcb.cn/news/detail-659333.html val rdd = sc.makeRDD(List(1,3,5,7,9))文章來源地址http://www.zghlxwxcb.cn/news/detail-659333.html |
到了這里,關于大數據課程K2——Spark的RDD彈性分布式數據集的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!