目錄
0. 相關(guān)文章鏈接
1.?閉包檢查
2. 序列化方法和屬性
3. Kryo 序列化框架?
4. 核心點(diǎn)總結(jié)
0. 相關(guān)文章鏈接
?Spark文章匯總?
1.?閉包檢查
????????從計(jì)算的角度, 算子以外的代碼都是在 Driver 端執(zhí)行, 算子里面的代碼都是在 Executor 端執(zhí)行。那么在 scala 的函數(shù)式編程中,就會(huì)導(dǎo)致算子內(nèi)經(jīng)常會(huì)用到算子外的數(shù)據(jù),這樣就形成了閉包的效果,如果使用的算子外的數(shù)據(jù)無(wú)法序列化,就意味著無(wú)法傳值給 Executor 端執(zhí)行,就會(huì)發(fā)生錯(cuò)誤,所以需要在執(zhí)行任務(wù)計(jì)算前,檢測(cè)閉包內(nèi)的對(duì)象是否可以進(jìn)行序列化,這個(gè)操作我們稱(chēng)之為閉包檢測(cè)。Scala2.12 版本后閉包編譯方式發(fā)生了改變。
2. 序列化方法和屬性
從計(jì)算的角度, 算子以外的代碼都是在 Driver 端執(zhí)行, 算子里面的代碼都是在 Executor端執(zhí)行:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
* @ date: 2023/7/4
* @ author: yangshibiao
* @ desc: 項(xiàng)目描述
*/
object ModelTest {
def main(args: Array[String]): Unit = {
//1.創(chuàng)建SparkConf并設(shè)置App名稱(chēng)
val conf: SparkConf = new SparkConf().setAppName("ModelTest").setMaster("local[*]")
//2.創(chuàng)建SparkContext,該對(duì)象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.創(chuàng)建一個(gè)RDD
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "12345"))
//3.1創(chuàng)建一個(gè)Search對(duì)象
val search: Search = new Search("hello")
//3.2 函數(shù)傳遞,打?。篍RROR Task not serializable
search.getMatch1(rdd).collect().foreach(println)
//3.3 屬性傳遞,運(yùn)行正常
// search.getMatch2(rdd).collect().foreach(println)
//4.關(guān)閉連接
sc.stop()
}
}
class Search(query: String) {
def isMatch(s: String): Boolean = {
s.contains(query)
}
// 函數(shù)序列化案例
def getMatch1(rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
// 屬性序列化案例
def getMatch2(rdd: RDD[String]): RDD[String] = {
val q: String = query
rdd.filter((x: String) => x.contains(q))
}
}
運(yùn)行第一個(gè)方法進(jìn)行函數(shù)傳遞時(shí),拋出?ERROR Task not serializable,如下圖所示:
運(yùn)行第二個(gè)方法進(jìn)行屬性傳遞時(shí),運(yùn)行正常,打印出正常結(jié)果,如下圖所示:
3. Kryo 序列化框架?
參考地址: https://github.com/EsotericSoftware/kryo?
????????Java 的序列化能夠序列化任何的類(lèi)。但是比較重(字節(jié)多),序列化后,對(duì)象的提交也比較大。Spark 出于性能的考慮,Spark2.0 開(kāi)始支持另外一種 Kryo 序列化機(jī)制。Kryo 速度是 Serializable 的 10 倍。當(dāng) RDD 在 Shuffle 數(shù)據(jù)的時(shí)候,簡(jiǎn)單數(shù)據(jù)類(lèi)型、數(shù)組和字符串類(lèi)型已經(jīng)在 Spark 內(nèi)部使用 Kryo 來(lái)序列化。
注意:即使使用 Kryo 序列化,也要繼承 Serializable 接口。?
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
* @ date: 2023/7/4
* @ author: yangshibiao
* @ desc: 項(xiàng)目描述
*/
object ModelTest {
def main(args: Array[String]): Unit = {
//1.創(chuàng)建SparkConf并設(shè)置App名稱(chēng)
val conf: SparkConf = new SparkConf()
.setAppName("ModelTest")
.setMaster("local[*]")
// 替換默認(rèn)的序列化機(jī)制
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注冊(cè)需要使用 kryo 序列化的自定義類(lèi)
.registerKryoClasses(Array(classOf[Search]))
//2.創(chuàng)建SparkContext,該對(duì)象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.創(chuàng)建一個(gè)RDD
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "12345"))
//3.1創(chuàng)建一個(gè)Search對(duì)象
val search: Search = new Search("hello")
//3.2 函數(shù)傳遞
search.getMatch1(rdd).collect().foreach(println)
//4.關(guān)閉連接
sc.stop()
}
}
class Search(query: String) extends Serializable {
def isMatch(s: String): Boolean = {
s.contains(query)
}
// 函數(shù)序列化案例
def getMatch1(rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
}
運(yùn)行成功,如下圖所示:
4. 核心點(diǎn)總結(jié)
- 在Spark中,如果有類(lèi)進(jìn)行序列化,該類(lèi)需要繼承Serializable,如下圖所示:
- 在Spark中,Serializable比較重,所以可以使用更優(yōu)的Kryo框架,但是注意的是即使使用 Kryo 序列化,也要繼承 Serializable 接口(如果Spark包中的類(lèi)均已注冊(cè),但如果是自定義的類(lèi),還需要手動(dòng)注冊(cè)),如下圖示所示:
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-521226.html
注:其他Spark相關(guān)系列文章鏈接由此進(jìn) ->??Spark文章匯總?文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-521226.html
到了這里,關(guān)于Spark(9):RDD的序列化的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!