RDD的操作
函數分類
- *Transformation操作只是建立計算關系,而Action 操作才是實際的執(zhí)行者*。
![]()
- Transformation算子
- 轉換算子
- 操作之間不算的轉換,如果想看到結果通過action算子觸發(fā)
- Action算子
- 行動算子
- 觸發(fā)Job的執(zhí)行,能夠看到結果信息
Transformation函數
值類型valueType
map
flatMap
filter
mapValue
雙值類型DoubleValueType
- intersection
- union
- difference
- distinct
Key-Value值類型
- reduceByKey
- groupByKey
- sortByKey
- combineByKey是底層API
- foldBykey
- aggreateBykey
Action函數
- collect
- saveAsTextFile
- first
- take
- takeSample
- top
基礎練習[Wordcount快速演示]
Transformer算子
- 單value類型代碼
# -*- coding: utf-8 -*- # Program function:完成單Value類型RDD的轉換算子的演示 from pyspark import SparkConf,SparkContext import re ''' 分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行 分區(qū)間:有一些操作分區(qū)間做一些累加 ''' if __name__ == '__main__': # 1-創(chuàng)建SparkContext申請資源 conf = SparkConf().setAppName("mini").setMaster("local[*]") sc = SparkContext.getOrCreate(conf=conf) sc.setLogLevel("WARN")#一般在工作中不這么寫,直接復制log4j文件 # 2-map操作 rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6]) rdd__map = rdd1.map(lambda x: x * 2) print(rdd__map.glom().collect())#[2, 4, 6, 8, 10, 12],#[[2, 4, 6], [8, 10, 12]] # 3-filter操作 print(rdd1.glom().collect()) print(rdd1.filter(lambda x: x > 3).glom().collect()) # 4-flatMap rdd2 = sc.parallelize([" hello you", "hello me "]) print(rdd2.flatMap(lambda word: re.split("\s+", word.strip())).collect()) # 5-groupBY x = sc.parallelize([1, 2, 3]) y = x.groupBy(lambda x: 'A' if (x % 2 == 1) else 'B') print(y.mapValues(list).collect())#[('A', [1, 3]), ('B', [2])] # 6-mapValue x1 = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])]) def f(x): return len(x) print(x1.mapValues(f).collect())
- 雙value類型的代碼
# -*- coding: utf-8 -*- # Program function:完成單Value類型RDD的轉換算子的演示 from pyspark import SparkConf, SparkContext import re ''' 分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行 分區(qū)間:有一些操作分區(qū)間做一些累加 ''' if __name__ == '__main__': # 1-創(chuàng)建SparkContext申請資源 conf = SparkConf().setAppName("mini2").setMaster("local[*]") sc = SparkContext.getOrCreate(conf=conf) sc.setLogLevel("WARN") # 一般在工作中不這么寫,直接復制log4j文件 # 2-對兩個RDD求并集 rdd1 = sc.parallelize([1, 2, 3, 4, 5]) rdd2 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8]) Union_RDD = rdd1.union(rdd2) print(Union_RDD.collect()) print(rdd1.intersection(rdd2).collect()) print(rdd2.subtract(rdd1).collect()) # Return a new RDD containing the distinct elements in this RDD. print(Union_RDD.distinct().collect()) print(Union_RDD.distinct().glom().collect())
- key-Value算子
# -*- coding: utf-8 -*-
# Program function:完成單Value類型RDD的轉換算子的演示
from pyspark import SparkConf, SparkContext
import re
'''
分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行
分區(qū)間:有一些操作分區(qū)間做一些累加
'''
if __name__ == '__main__':
# 1-創(chuàng)建SparkContext申請資源
conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN") # 一般在工作中不這么寫,直接復制log4j文件
# 2-key和value類型算子
# groupByKey
rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("c", 1), ("b", 3)])
rdd3 = rdd1.union(rdd2)
key1 = rdd3.groupByKey()
print("groupByKey:",key1.collect())
#groupByKey:
# [('b', <pyspark.resultiterable.ResultIterable object at 0x7f001c469c40),
# ('c', <pyspark.resultiterable.ResultIterable object at 0x7f001c469310),
# ('a', <pyspark.resultiterable.ResultIterable object at 0x7f001c469a00)]
print(key1.mapValues(list).collect())#需要通過mapValue獲取groupByKey的值
print(key1.mapValues(tuple).collect())
# reduceByKey
key2 = rdd3.reduceByKey(lambda x, y: x + y)
print(key2.collect())
# sortByKey
print(key2.map(lambda x: (x[1], x[0])).sortByKey(False).collect())#[(5, 'b'), (1, 'c'), (1, 'a')]
# countByKey
print(rdd3.countByValue())#defaultdict(<class 'int', {('a', 1): 1, ('b', 2): 1, ('c', 1): 1, ('b', 3): 1})
Action算子
- 部分操作
# -*- coding: utf-8 -*- # Program function:完成單Value類型RDD的轉換算子的演示 from pyspark import SparkConf, SparkContext import re ''' 分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行 分區(qū)間:有一些操作分區(qū)間做一些累加 ''' if __name__ == '__main__': # 1-創(chuàng)建SparkContext申請資源 conf = SparkConf().setAppName("mini2").setMaster("local[*]") sc = SparkContext.getOrCreate(conf=conf) sc.setLogLevel("WARN") # 一般在工作中不這么寫,直接復制log4j文件 # 2-key和value類型算子 # groupByKey rdd1 = sc.parallelize([("a", 1), ("b", 2)]) rdd2 = sc.parallelize([("c", 1), ("b", 3)]) print(rdd1.first()) print(rdd1.take(2)) print(rdd1.top(2)) print(rdd1.collect()) rdd3 = sc.parallelize([1, 2, 3, 4, 5]) from operator import add from operator import mul print(rdd3.reduce(add)) print(rdd3.reduce(mul)) rdd4 = sc.parallelize(range(0, 10)) # 能否保證每次抽樣結果是一致的,使用seed隨機數種子 print(rdd4.takeSample(True, 3, 123)) print(rdd4.takeSample(True, 3, 123)) print(rdd4.takeSample(True, 3, 123)) print(rdd4.takeSample(True, 3, 34))
- 其他補充算子
# -*- coding: utf-8 -*- # Program function:完成單Value類型RDD的轉換算子的演示 from pyspark import SparkConf, SparkContext import re ''' 分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行 分區(qū)間:有一些操作分區(qū)間做一些累加 ''' def f(iterator): # 【1,2,3】 【4,5】 for x in iterator: # for x in 【1,2,3】 x=1,2,3 print 1.2.3 print(x) def f1(iterator): # 【1,2,3】 【4,5】 sum(1+2+3) sum(4+5) yield sum(iterator) if __name__ == '__main__': # 1-創(chuàng)建SparkContext申請資源 conf = SparkConf().setAppName("mini2").setMaster("local[*]") sc = SparkContext.getOrCreate(conf=conf) sc.setLogLevel("WARN") # 一般在工作中不這么寫,直接復制log4j文件 # 2-foreach-Applies a function to all elements of this RDD. rdd1 = sc.parallelize([("a", 1), ("b", 2)]) print(rdd1.glom().collect()) # def f(x):print(x) rdd1.foreach(lambda x: print(x)) # 3-foreachPartition--Applies a function to each partition of this RDD. # 從性能角度分析,按照分區(qū)并行比元素更加高效 rdd1.foreachPartition(f) # 4-map---按照元素進行轉換 rdd2 = sc.parallelize([1, 2, 3, 4]) print(rdd2.map(lambda x: x * 2).collect()) # 5-mapPartiton-----按照分區(qū)進行轉換 # Return a new RDD by applying a function to each partition of this RDD. print(rdd2.mapPartitions(f1).collect()) # [3, 7]
重要函數
基本函數
- 基礎的transformation
- 和action操作
分區(qū)操作函數
- mapPartition
- foreachPartition
重分區(qū)函數
# -*- coding: utf-8 -*- # Program function:完成單Value類型RDD的轉換算子的演示 from pyspark import SparkConf, SparkContext import re ''' 分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行 分區(qū)間:有一些操作分區(qū)間做一些累加 alt+6 可以調出來所有TODO, TODO是Python提供了預留功能的地方 ''' if __name__ == '__main__': #TODO: 1-創(chuàng)建SparkContext申請資源 conf = SparkConf().setAppName("mini2").setMaster("local[*]") sc = SparkContext.getOrCreate(conf=conf) sc.setLogLevel("WARN") # 一般在工作中不這么寫,直接復制log4j文件 #TODO: 2-執(zhí)行重分區(qū)函數--repartition rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6], 3) print("partitions num:",rdd1.getNumPartitions()) print(rdd1.glom().collect())#[[1, 2], [3, 4], [5, 6]] print("repartition result:") #TODO: repartition可以增加分區(qū)也可以減少分區(qū),但是都會產生shuflle,如果減少分區(qū)的化建議使用coalesc避免發(fā)生shuffle rdd__repartition1 = rdd1.repartition(5) print("increase partition",rdd__repartition1.glom().collect())#[[], [1, 2], [5, 6], [3, 4], []] rdd__repartition2 = rdd1.repartition(2) print("decrease partition",rdd__repartition2.glom().collect())#decrease partition [[1, 2, 5, 6], [3, 4]] #TODO: 3-減少分區(qū)--coalese print(rdd1.coalesce(2).glom().collect())#[[1, 2], [3, 4, 5, 6]] print(rdd1.coalesce(5).glom().collect())#[[1, 2], [3, 4], [5, 6]] print(rdd1.coalesce(5,True).glom().collect())#[[], [1, 2], [5, 6], [3, 4], []] # 結論:repartition默認調用的是coalese的shuffle為True的方法 # TODO: 4-PartitonBy,可以調整分區(qū),還可以調整分區(qū)器(一種hash分區(qū)器(一般打散數據),一種range分區(qū)器(排序拍好的)) # 此類專門針對RDD中數據類型為KeyValue對提供函數 # rdd五大特性中有第四個特點key-value分區(qū)器,默認是hashpartitioner分區(qū)器 rdd__map = rdd1.map(lambda x: (x, x)) print("partitions length:",rdd__map.getNumPartitions())#partitions length: 3 print(rdd__map.partitionBy(2).glom().collect())
聚合函數
- 代碼:
# -*- coding: utf-8 -*-
# Program function:完成單Value類型RDD的轉換算子的演示
from pyspark import SparkConf, SparkContext
import re
'''
分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行
分區(qū)間:有一些操作分區(qū)間做一些累加
alt+6 可以調出來所有TODO,
TODO是Python提供了預留功能的地方
'''
def addNum(x,y):
return x+y
if __name__ == '__main__':
# TODO: 1-創(chuàng)建SparkContext申請資源
conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN") # 一般在工作中不這么寫,直接復制log4j文件
# TODO: 2-使用reduce進行聚合計算
rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
from operator import add
# 直接得到返回值-21
print(rdd1.reduce(add))
# TODO: 3-使用fold進行聚合計算
# 第一個參數zeroValue是初始值,會參與分區(qū)的計算
#第二個參數是執(zhí)行運算的operation
print(rdd1.fold(0, add)) # 21
print(rdd1.getNumPartitions()) # 3
print(rdd1.glom().collect())
print("fold result:", rdd1.fold(10, add))
# TODO: 3-使用aggreate進行聚合計算
# seqOp分區(qū)內的操作, combOp分區(qū)間的操作
print(rdd1.aggregate(0, add, add)) # 21
print(rdd1.glom().collect())
print("aggregate result:", rdd1.aggregate(1, add, add)) # aggregate result: 25
# 結論:fold是aggregate的簡化版本,fold分區(qū)內和分區(qū)間的函數是一致的
print("aggregate result:", rdd1.aggregate(1, addNum, addNum)) # aggregate result: 25
* byKey類的聚合函數 * **groupByKey----如何獲取value的數據?------答案:result.mapValue(list).collect** * **reduceByKey** * foldBykey
aggregateByKey
CombineByKey:這是一個更為底層實現的bykey 聚合算子,可以實現更多復雜功能
案例1:
# -*- coding: utf-8 -*-
# Program function:完成單Value類型RDD的轉換算子的演示
from pyspark import SparkConf, SparkContext
import re
'''
分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行
分區(qū)間:有一些操作分區(qū)間做一些累加
alt+6 可以調出來所有TODO,
TODO是Python提供了預留功能的地方
'''
'''
對初始值進行操作
'''
def createCombiner(value): #('a',[1])
return [value]
# 這里的x=createCombiner得到的[value]結果
def mergeValue(x,y): #這里相同a的value=y=1
x.append(y)#('a', [1, 1]),('b', [1])
return x
def mergeCombiners(a,b):
a.extend(b)
return a
if __name__ == '__main__':
# TODO: 1-創(chuàng)建SparkContext申請資源
conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN") # 一般在工作中不這么寫,直接復制log4j文件
# TODO: 2-基礎數據處理
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
# [(a:[1,1]),(b,[1,1])]
print(sorted(rdd.groupByKey().mapValues(list).collect()))
# 使用自定義集聚合函數組合每個鍵的元素的通用功能。
# - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
# 對初始值進行操作
# - `mergeValue`, to merge a V into a C (e.g., adds it to the end ofa list)
# 對分區(qū)內的元素進行合并
# - `mergeCombiners`, to combine two C's into a single one (e.g., merges the lists)
# 對分區(qū)間的元素進行合并
by_key_result = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
print(sorted(by_key_result.collect()))#[('a', [1, 1]), ('b', [1])]
- 案例2
# -*- coding: utf-8 -*-
# Program function:完成單Value類型RDD的轉換算子的演示
from pyspark import SparkConf, SparkContext
import re
'''
分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行
分區(qū)間:有一些操作分區(qū)間做一些累加
alt+6 可以調出來所有TODO,
TODO是Python提供了預留功能的地方
'''
'''
對初始值進行操作
[value,1],value指的是當前學生成績,1代表的是未來算一下一個學生考了幾次考試
("Fred", 88)----------[88,1]
'''
def createCombiner(value): #
return [value, 1]
'''
x代表的是 [value,1]值,x=[88,1]
y代表的相同key的value,比如("Fred", 95)的95,執(zhí)行分區(qū)內的累加
'''
def mergeValue(x, y):
return [x[0] + y, x[1] + 1]
'''
a = a[0] value,a[1] 幾次考試
'''
def mergeCombiners(a, b):
return [a[0] + b[0], a[1] + b[1]]
if __name__ == '__main__':
# TODO: 1-創(chuàng)建SparkContext申請資源
conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN") # 一般在工作中不這么寫,直接復制log4j文件
# TODO: 2-基礎數據處理
from operator import add
# 這里需要實現需求:求解一個學生的平均成績
x = sc.parallelize([("Fred", 88), ("Fred", 95), ("Fred", 91), ("Wilma", 93), ("Wilma", 95), ("Wilma", 98)], 3)
print(x.glom().collect())
# 第一個分區(qū)("Fred", 88), ("Fred", 95)
# 第二個分區(qū)("Fred", 91), ("Wilma", 93),
# 第三個分區(qū)("Wilma", 95), ("Wilma", 98)
# reduceByKey
reduce_by_key_rdd = x.reduceByKey(lambda x, y: x + y)
print("reduceBykey:", reduce_by_key_rdd.collect()) # [('Fred', 274), ('Wilma', 286)]
# 如何求解平均成績?
# 使用自定義集聚合函數組合每個鍵的元素的通用功能。
# - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
# 對初始值進行操作
# - `mergeValue`, to merge a V into a C (e.g., adds it to the end ofa list)
# 對分區(qū)內的元素進行合并
# - `mergeCombiners`, to combine two C's into a single one (e.g., merges the lists)
# 對分區(qū)間的元素進行合并
combine_by_key_rdd = x.combineByKey(createCombiner, mergeValue, mergeCombiners)
print(combine_by_key_rdd.collect()) # [('Fred', [274, 3]), ('Wilma', [286, 3])]
# 接下來平均值如何實現--('Fred', [274, 3])---x[0]=Fred x[1]= [274, 3],x[1][0]=274,x[1][1]=3
print(combine_by_key_rdd.map(lambda x: (x[0], int(x[1][0] / x[1][1]))).collect())
- 面試題:
- 關聯函數
AI副業(yè)實戰(zhàn)手冊:http://www.yibencezi.com/notes/253200?affiliate_id=1317(目前40+工具及實戰(zhàn)案例,持續(xù)更新,實戰(zhàn)類小冊排名第一,做三個月掙不到錢找我退款,交個朋友的產品)
后記
??博客主頁:https://manor.blog.csdn.net文章來源:http://www.zghlxwxcb.cn/news/detail-767406.html
??歡迎點贊 ?? 收藏 ?留言 ?? 如有錯誤敬請指正!
??本文由 Maynor 原創(chuàng),首發(fā)于 CSDN博客??
??感覺這輩子,最深情綿長的注視,都給了手機?
??專欄持續(xù)更新,歡迎訂閱:https://blog.csdn.net/xianyu120/category_12453356.html文章來源地址http://www.zghlxwxcb.cn/news/detail-767406.html
到了這里,關于Python大數據之PySpark(六)RDD的操作的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!