一、需求
在ODPS上我們有如下數(shù)據(jù):
id | category_id | attr_id | attr_name | attr_value |
205348 | 10000046 | 2 | 最優(yōu)粘度 | ["0W-40"] |
205348 | 10000046 | 1 | 基礎(chǔ)油類(lèi)型 | ["全合成"] |
205348 | 10000046 | 3 | 級(jí)別 | ["BMW Longlife 01"] |
我們希望得到的結(jié)果如下:
(205348,?10000046, "基礎(chǔ)油類(lèi)型:全合成\n最優(yōu)粘度:0W-40\n級(jí)別:BMW Longlife 01\n")
需求解讀:
需要將(id, category_id)作為key,然后將(attr_id, attr_name, attr_value)進(jìn)行reduce操作,在reduce之后的數(shù)據(jù)中對(duì)attr_id進(jìn)行排序,再將attr_name和attr_value合并在一起。
二、reduce操作之字符串方式
這個(gè)是最簡(jiǎn)單的方式,大致思路如下:
首先,將(id, category_id)作為key。
然后,將attr_id、attr_name、attr_value合并成一個(gè)字符串a(chǎn)ttr_info:attr_id + "#" + attr_name + "#" + attr_value,然后attr_info再通過(guò)"&"進(jìn)行合并。
示例代碼如下:
xx.map{case(id, category_id, attr_id, attr_name, attr_value) => ((id, category_id), attr_id + "#" + attr_name + "#" + attr_value)}
.reduceByKey(_ + "&" + _, 100)
然后在接下來(lái)的流程中首先split("#")得到不同的attr信息,再通過(guò)split("&")得到不同的attr的列信息。這就要求attr_id,attr_name,attr_value中不能包含"#"和"&"字符串。
所以這種方式有缺陷,就是當(dāng)attr_id,attr_name,attr_value包含了"#"和"&"字符串時(shí)需要先replace一下,這樣就改變了原數(shù)據(jù)的值。
三、reduce操作之列表方式
這種方式相對(duì)復(fù)雜一點(diǎn),需要對(duì)輸入數(shù)據(jù)進(jìn)行預(yù)處理,但是邏輯清晰。
輸入數(shù)據(jù)中(id, category_id)是key保持不變,(item_id, item_name, item_value)是一組tuple。
reduce操作會(huì)在同一個(gè)partition中,不同的partition之間進(jìn)行數(shù)據(jù)合并,這要求數(shù)據(jù)的輸入、輸出類(lèi)型保持不變。
我們的初步想法:將item_id, item_name, item_value分別放到3個(gè)列表中,合并時(shí)就是列表之間的合并,合并完畢后使用時(shí)只需要遍歷列表即可。
因?yàn)閞educe操作的輸入、輸出類(lèi)型不能變化,所以先放item_id, item_name, item_value初始化為一個(gè)列表,然后再進(jìn)行列表之間的合并。
示例代碼如下:
xx.map{case(id, category_id, attr_id, attr_name, attr_value) =>
val itemIdList = new ArrayList[Long]()
itemIdList.add(attr_id)
val itemNameList = new ArrayList[String]()
itemNameList.add(attr_name)
val itemValueList = new ArrayList[String]()
itemValueList.add(attr_value)
((id, category_id), (itemIdList, itemNameList, itemValueList))
}.reduceByKey((x, y) => {
val itemIdList = new ArrayList[Long]()
for(i <- 0 until x._1.size()){
itemIdList.add(x._1.get(i))
}
for(i <- 0 until y._1.size()){
itemIdList.add(y._1.get(i))
}
val itemNameList = new ArrayList[String]()
for(i <- 0 until x._2.size()){
itemNameList.add(x._2.get(i))
}
for(i <- 0 until y._2.size()){
itemNameList.add(y._2.get(i))
}
val itemValueList = new ArrayList[String]()
for(i <- 0 until x._3.size()){
itemValueList.add(x._3.get(i))
}
for(i <- 0 until y._3.size()){
itemValueList.add(y._3.get(i))
}
(itemIdList, itemNameList, itemValueList)
}, 100)
再簡(jiǎn)單一點(diǎn)如下示例:
?
carCaseRawInfo.map(x => {
val stepInfoList = new util.ArrayList[(Long, String, String, String)]()
stepInfoList.add((x._4, x._5, x._6, x._7))
((x._1, x._3, x._3), stepInfoList)
})
.reduceByKey((x, y) => {
val stepInfoList = new ArrayList[(Long, String, String, String)]()
for(i <- 0 until x.size()){
stepInfoList.add(x.get(i))
}
for(i <- 0 until y.size()){
stepInfoList.add(y.get(i))
}
stepInfoList
}, GlobalConfig.DEFAULT_PARTITIONS_NUM)
四、reduce之partition屬性
首先提一下Shuffle過(guò)程,它的本意是洗牌、混亂的意思,類(lèi)似于java中的Colletions.shuffle(List)方法,它會(huì)隨機(jī)地打亂參數(shù)list里地元素順序。MapReduce的Shuffle過(guò)程大致可以理解成:數(shù)據(jù)從map task輸出到reduce task輸入的這段過(guò)程。
而partition過(guò)程:分割map每個(gè)節(jié)點(diǎn)的結(jié)果,按照key分別映射給不同的reduce,這個(gè)是可以自定義的。
通過(guò)設(shè)置reduce中的numPartitions值,會(huì)在reduce操作之后進(jìn)行repartition,避免數(shù)據(jù)不均衡堆在一個(gè)partition中。
五、reduceByKey和groupByKey的區(qū)別
從 shuffle 的角度: reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前對(duì)分區(qū)內(nèi)相同 key 的數(shù)據(jù)進(jìn)行預(yù)聚合(combine)功能,這樣會(huì)減少落盤(pán)的數(shù)據(jù)量,而 groupByKey 只是進(jìn)行分組,不存在數(shù)據(jù)量減少的問(wèn)題,reduceByKey 性能比較高。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-852610.html
從功能的角度: reduceByKey 其實(shí)包含分組和聚合的功能。GroupByKey 只能分組,不能聚合,所以在分組聚合的場(chǎng)合下,推薦使用 reduceByKey,如果僅僅是分組而不需要聚合。那么還是只能使用 groupByKey 。reduceByKey的分區(qū)內(nèi)和分區(qū)間的計(jì)算規(guī)則是一樣的文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-852610.html
到了這里,關(guān)于Spark的reduceByKey方法使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!