国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Python大數據之PySpark(六)RDD的操作

這篇具有很好參考價值的文章主要介紹了Python大數據之PySpark(六)RDD的操作。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

RDD的操作

函數分類

  • *Transformation操作只是建立計算關系,而Action 操作才是實際的執(zhí)行者*
  • pyspark/sql/dataframe.py
  • Transformation算子
  • 轉換算子
  • 操作之間不算的轉換,如果想看到結果通過action算子觸發(fā)
  • pyspark/sql/dataframe.py
  • Action算子
  • 行動算子
  • 觸發(fā)Job的執(zhí)行,能夠看到結果信息
  • pyspark/sql/dataframe.py

Transformation函數

  • 值類型valueType

  • map

  • flatMap

  • filter

  • mapValue

雙值類型DoubleValueType

  • intersection
  • union
  • difference
  • distinct

Key-Value值類型

  • reduceByKey
  • groupByKey
  • sortByKey
  • combineByKey是底層API
  • foldBykey
  • aggreateBykey

Action函數

  • collect
  • saveAsTextFile
  • first
  • take
  • takeSample
  • top

基礎練習[Wordcount快速演示]

Transformer算子

  • 單value類型代碼

# -*- coding: utf-8 -*-

# Program function:完成單Value類型RDD的轉換算子的演示

from pyspark import SparkConf,SparkContext
import re
'''
分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行
分區(qū)間:有一些操作分區(qū)間做一些累加
'''
if __name__ == '__main__':

# 1-創(chuàng)建SparkContext申請資源

conf = SparkConf().setAppName("mini").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")#一般在工作中不這么寫,直接復制log4j文件

# 2-map操作

rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6])
rdd__map = rdd1.map(lambda x: x * 2)
print(rdd__map.glom().collect())#[2, 4, 6, 8, 10, 12],#[[2, 4, 6], [8, 10, 12]]

# 3-filter操作

print(rdd1.glom().collect())
print(rdd1.filter(lambda x: x > 3).glom().collect())

# 4-flatMap

rdd2 = sc.parallelize(["  hello      you", "hello me  "])
print(rdd2.flatMap(lambda word: re.split("\s+", word.strip())).collect())

# 5-groupBY

x = sc.parallelize([1, 2, 3])
y = x.groupBy(lambda x: 'A' if (x % 2 == 1) else 'B')
print(y.mapValues(list).collect())#[('A', [1, 3]), ('B', [2])]

# 6-mapValue

x1 = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
def f(x): return len(x)
print(x1.mapValues(f).collect())

  • 雙value類型的代碼

# -*- coding: utf-8 -*-

# Program function:完成單Value類型RDD的轉換算子的演示

from pyspark import SparkConf, SparkContext
import re

'''
分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行
分區(qū)間:有一些操作分區(qū)間做一些累加
'''
if __name__ == '__main__':

# 1-創(chuàng)建SparkContext申請資源

conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")  # 一般在工作中不這么寫,直接復制log4j文件

# 2-對兩個RDD求并集

rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])
Union_RDD = rdd1.union(rdd2)
print(Union_RDD.collect())
print(rdd1.intersection(rdd2).collect())
print(rdd2.subtract(rdd1).collect())

# Return a new RDD containing the distinct elements in this RDD.

print(Union_RDD.distinct().collect())
print(Union_RDD.distinct().glom().collect())

  • key-Value算子
# -*- coding: utf-8 -*-

# Program function:完成單Value類型RDD的轉換算子的演示

from pyspark import SparkConf, SparkContext
import re

'''
分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行
分區(qū)間:有一些操作分區(qū)間做一些累加
'''
if __name__ == '__main__':

# 1-創(chuàng)建SparkContext申請資源

conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")  # 一般在工作中不這么寫,直接復制log4j文件

# 2-key和value類型算子

# groupByKey

rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("c", 1), ("b", 3)])
rdd3 = rdd1.union(rdd2)
key1 = rdd3.groupByKey()
print("groupByKey:",key1.collect())
#groupByKey:

# [('b', <pyspark.resultiterable.ResultIterable object at 0x7f001c469c40),

# ('c', <pyspark.resultiterable.ResultIterable object at 0x7f001c469310),

# ('a', <pyspark.resultiterable.ResultIterable object at 0x7f001c469a00)]

print(key1.mapValues(list).collect())#需要通過mapValue獲取groupByKey的值
print(key1.mapValues(tuple).collect())

# reduceByKey

key2 = rdd3.reduceByKey(lambda x, y: x + y)
print(key2.collect())

# sortByKey

print(key2.map(lambda x: (x[1], x[0])).sortByKey(False).collect())#[(5, 'b'), (1, 'c'), (1, 'a')]

# countByKey

print(rdd3.countByValue())#defaultdict(<class 'int', {('a', 1): 1, ('b', 2): 1, ('c', 1): 1, ('b', 3): 1})

Action算子

  • 部分操作

# -*- coding: utf-8 -*-

# Program function:完成單Value類型RDD的轉換算子的演示

from pyspark import SparkConf, SparkContext
import re

'''
分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行
分區(qū)間:有一些操作分區(qū)間做一些累加
'''
if __name__ == '__main__':

# 1-創(chuàng)建SparkContext申請資源

conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")  # 一般在工作中不這么寫,直接復制log4j文件

# 2-key和value類型算子

# groupByKey

rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("c", 1), ("b", 3)])

print(rdd1.first())
print(rdd1.take(2))
print(rdd1.top(2))
print(rdd1.collect())

rdd3 = sc.parallelize([1, 2, 3, 4, 5])
from operator import add
from operator import mul

print(rdd3.reduce(add))
print(rdd3.reduce(mul))

rdd4 = sc.parallelize(range(0, 10))

# 能否保證每次抽樣結果是一致的,使用seed隨機數種子

print(rdd4.takeSample(True, 3, 123))
print(rdd4.takeSample(True, 3, 123))
print(rdd4.takeSample(True, 3, 123))
print(rdd4.takeSample(True, 3, 34))

  • 其他補充算子

# -*- coding: utf-8 -*-

# Program function:完成單Value類型RDD的轉換算子的演示

from pyspark import SparkConf, SparkContext
import re

'''
分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行
分區(qū)間:有一些操作分區(qū)間做一些累加
'''


def f(iterator):  # 【1,2,3】 【4,5】
for x in iterator:  # for x in 【1,2,3】  x=1,2,3 print 1.2.3
print(x)


def f1(iterator):  # 【1,2,3】 【4,5】  sum(1+2+3) sum(4+5)
yield sum(iterator)


if __name__ == '__main__':

# 1-創(chuàng)建SparkContext申請資源

conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")  # 一般在工作中不這么寫,直接復制log4j文件

# 2-foreach-Applies a function to all elements of this RDD.

rdd1 = sc.parallelize([("a", 1), ("b", 2)])
print(rdd1.glom().collect())

# def f(x):print(x)

rdd1.foreach(lambda x: print(x))

# 3-foreachPartition--Applies a function to each partition of this RDD.

# 從性能角度分析,按照分區(qū)并行比元素更加高效

rdd1.foreachPartition(f)

# 4-map---按照元素進行轉換

rdd2 = sc.parallelize([1, 2, 3, 4])
print(rdd2.map(lambda x: x * 2).collect())

# 5-mapPartiton-----按照分區(qū)進行轉換

# Return a new RDD by applying a function to each partition of this RDD.

print(rdd2.mapPartitions(f1).collect())  # [3, 7]

重要函數

  • pyspark/sql/dataframe.py

基本函數

  • 基礎的transformation
  • 和action操作

分區(qū)操作函數

  • mapPartition
  • foreachPartition

重分區(qū)函數


# -*- coding: utf-8 -*-

# Program function:完成單Value類型RDD的轉換算子的演示

from pyspark import SparkConf, SparkContext
import re
'''
分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行
分區(qū)間:有一些操作分區(qū)間做一些累加
alt+6 可以調出來所有TODO,
TODO是Python提供了預留功能的地方
'''
if __name__ == '__main__':
#TODO:  1-創(chuàng)建SparkContext申請資源
conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")  # 一般在工作中不這么寫,直接復制log4j文件
#TODO:   2-執(zhí)行重分區(qū)函數--repartition
rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
print("partitions num:",rdd1.getNumPartitions())
print(rdd1.glom().collect())#[[1, 2], [3, 4], [5, 6]]
print("repartition result:")
#TODO:   repartition可以增加分區(qū)也可以減少分區(qū),但是都會產生shuflle,如果減少分區(qū)的化建議使用coalesc避免發(fā)生shuffle
rdd__repartition1 = rdd1.repartition(5)
print("increase partition",rdd__repartition1.glom().collect())#[[], [1, 2], [5, 6], [3, 4], []]
rdd__repartition2 = rdd1.repartition(2)
print("decrease partition",rdd__repartition2.glom().collect())#decrease partition [[1, 2, 5, 6], [3, 4]]
#TODO:   3-減少分區(qū)--coalese
print(rdd1.coalesce(2).glom().collect())#[[1, 2], [3, 4, 5, 6]]
print(rdd1.coalesce(5).glom().collect())#[[1, 2], [3, 4], [5, 6]]
print(rdd1.coalesce(5,True).glom().collect())#[[], [1, 2], [5, 6], [3, 4], []]

# 結論:repartition默認調用的是coalese的shuffle為True的方法

# TODO:  4-PartitonBy,可以調整分區(qū),還可以調整分區(qū)器(一種hash分區(qū)器(一般打散數據),一種range分區(qū)器(排序拍好的))

# 此類專門針對RDD中數據類型為KeyValue對提供函數

# rdd五大特性中有第四個特點key-value分區(qū)器,默認是hashpartitioner分區(qū)器

rdd__map = rdd1.map(lambda x: (x, x))
print("partitions length:",rdd__map.getNumPartitions())#partitions length: 3
print(rdd__map.partitionBy(2).glom().collect())

聚合函數

  • pyspark/sql/dataframe.py
  • 代碼:
# -*- coding: utf-8 -*-

# Program function:完成單Value類型RDD的轉換算子的演示

from pyspark import SparkConf, SparkContext
import re

'''
分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行
分區(qū)間:有一些操作分區(qū)間做一些累加
alt+6 可以調出來所有TODO,
TODO是Python提供了預留功能的地方
'''
def addNum(x,y):
return x+y
if __name__ == '__main__':

# TODO:  1-創(chuàng)建SparkContext申請資源

conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")  # 一般在工作中不這么寫,直接復制log4j文件

# TODO:   2-使用reduce進行聚合計算

rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
from operator import add

# 直接得到返回值-21

print(rdd1.reduce(add))

# TODO: 3-使用fold進行聚合計算

# 第一個參數zeroValue是初始值,會參與分區(qū)的計算

 #第二個參數是執(zhí)行運算的operation

print(rdd1.fold(0, add))  # 21
print(rdd1.getNumPartitions())  # 3
print(rdd1.glom().collect())
print("fold result:", rdd1.fold(10, add))

# TODO: 3-使用aggreate進行聚合計算

# seqOp分區(qū)內的操作, combOp分區(qū)間的操作

print(rdd1.aggregate(0, add, add))  # 21
print(rdd1.glom().collect())
print("aggregate result:", rdd1.aggregate(1, add, add))  # aggregate result: 25

# 結論:fold是aggregate的簡化版本,fold分區(qū)內和分區(qū)間的函數是一致的

print("aggregate result:", rdd1.aggregate(1, addNum, addNum))  # aggregate result: 25

* byKey類的聚合函數

* **groupByKey----如何獲取value的數據?------答案:result.mapValue(list).collect**

* **reduceByKey**

* foldBykey

pyspark/sql/dataframe.py

  • aggregateByKey

  • CombineByKey:這是一個更為底層實現的bykey 聚合算子,可以實現更多復雜功能

  • pyspark/sql/dataframe.py

  • 案例1:

# -*- coding: utf-8 -*-
# Program function:完成單Value類型RDD的轉換算子的演示
from pyspark import SparkConf, SparkContext
import re

'''
分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行
分區(qū)間:有一些操作分區(qū)間做一些累加
alt+6 可以調出來所有TODO,
TODO是Python提供了預留功能的地方
'''

'''
對初始值進行操作
'''
def createCombiner(value): #('a',[1])
return [value]

# 這里的x=createCombiner得到的[value]結果
def mergeValue(x,y): #這里相同a的value=y=1
x.append(y)#('a', [1, 1]),('b', [1])
return x

def mergeCombiners(a,b):
a.extend(b)
return a
if __name__ == '__main__':

# TODO:  1-創(chuàng)建SparkContext申請資源

conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")  # 一般在工作中不這么寫,直接復制log4j文件

# TODO:  2-基礎數據處理

from operator import add

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

# [(a:[1,1]),(b,[1,1])]

print(sorted(rdd.groupByKey().mapValues(list).collect()))

# 使用自定義集聚合函數組合每個鍵的元素的通用功能。

# - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)

# 對初始值進行操作

# - `mergeValue`, to merge a V into a C (e.g., adds it to the end ofa list)

# 對分區(qū)內的元素進行合并

# - `mergeCombiners`, to combine two C's into a single one (e.g., merges the lists)

# 對分區(qū)間的元素進行合并

by_key_result = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
print(sorted(by_key_result.collect()))#[('a', [1, 1]), ('b', [1])]


  • 案例2
# -*- coding: utf-8 -*-

# Program function:完成單Value類型RDD的轉換算子的演示

from pyspark import SparkConf, SparkContext
import re

'''
分區(qū)內:一個rdd可以分為很多分區(qū),每個分區(qū)里面都是有大量元素,每個分區(qū)都需要線程執(zhí)行
分區(qū)間:有一些操作分區(qū)間做一些累加
alt+6 可以調出來所有TODO,
TODO是Python提供了預留功能的地方
'''

'''
對初始值進行操作
[value,1],value指的是當前學生成績,1代表的是未來算一下一個學生考了幾次考試
("Fred", 88)----------[88,1]
'''


def createCombiner(value):  #
return [value, 1]


'''
x代表的是 [value,1]值,x=[88,1]
y代表的相同key的value,比如("Fred", 95)的95,執(zhí)行分區(qū)內的累加
'''


def mergeValue(x, y):
return [x[0] + y, x[1] + 1]


'''
a = a[0] value,a[1] 幾次考試
'''


def mergeCombiners(a, b):
return [a[0] + b[0], a[1] + b[1]]


if __name__ == '__main__':

# TODO:  1-創(chuàng)建SparkContext申請資源

conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")  # 一般在工作中不這么寫,直接復制log4j文件

# TODO:  2-基礎數據處理

from operator import add

# 這里需要實現需求:求解一個學生的平均成績

x = sc.parallelize([("Fred", 88), ("Fred", 95), ("Fred", 91), ("Wilma", 93), ("Wilma", 95), ("Wilma", 98)], 3)
print(x.glom().collect())

# 第一個分區(qū)("Fred", 88), ("Fred", 95)

# 第二個分區(qū)("Fred", 91), ("Wilma", 93),

# 第三個分區(qū)("Wilma", 95), ("Wilma", 98)

# reduceByKey

reduce_by_key_rdd = x.reduceByKey(lambda x, y: x + y)
print("reduceBykey:", reduce_by_key_rdd.collect())  # [('Fred', 274), ('Wilma', 286)]

# 如何求解平均成績?

# 使用自定義集聚合函數組合每個鍵的元素的通用功能。

# - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)

# 對初始值進行操作

# - `mergeValue`, to merge a V into a C (e.g., adds it to the end ofa list)

# 對分區(qū)內的元素進行合并

# - `mergeCombiners`, to combine two C's into a single one (e.g., merges the lists)

# 對分區(qū)間的元素進行合并

combine_by_key_rdd = x.combineByKey(createCombiner, mergeValue, mergeCombiners)
print(combine_by_key_rdd.collect())  # [('Fred', [274, 3]), ('Wilma', [286, 3])]

# 接下來平均值如何實現--('Fred', [274, 3])---x[0]=Fred x[1]= [274, 3],x[1][0]=274,x[1][1]=3

print(combine_by_key_rdd.map(lambda x: (x[0], int(x[1][0] / x[1][1]))).collect())
  • 面試題:

pyspark/sql/dataframe.py

  • 關聯函數

AI副業(yè)實戰(zhàn)手冊:http://www.yibencezi.com/notes/253200?affiliate_id=1317(目前40+工具及實戰(zhàn)案例,持續(xù)更新,實戰(zhàn)類小冊排名第一,做三個月掙不到錢找我退款,交個朋友的產品)

后記

??博客主頁:https://manor.blog.csdn.net

??歡迎點贊 ?? 收藏 ?留言 ?? 如有錯誤敬請指正!
??本文由 Maynor 原創(chuàng),首發(fā)于 CSDN博客??
??感覺這輩子,最深情綿長的注視,都給了手機?
??專欄持續(xù)更新,歡迎訂閱:https://blog.csdn.net/xianyu120/category_12453356.html文章來源地址http://www.zghlxwxcb.cn/news/detail-767406.html

到了這里,關于Python大數據之PySpark(六)RDD的操作的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • pyspark.sql.dataframe.DataFrame 怎么轉pandas DataFrame

    pyspark.sql.dataframe.DataFrame 怎么轉pandas DataFrame

    pyspark.sql.dataframe.DataFrame 怎么轉pandas DataFrame 要將 PySpark 的? pyspark.sql.dataframe.DataFrame ?轉換為 Pandas DataFrame,可以使用? toPandas() ?方法。以下是一個示例: 上面的代碼輸出 在上述示例中,我們首先使用 PySpark 創(chuàng)建了一個示例 DataFrame? df_spark 。然后,我們使用? toPandas() ?方法

    2024年03月20日
    瀏覽(32)
  • 【Python】PySpark 數據輸入 ① ( RDD 簡介 | RDD 中的數據存儲與計算 | Python 容器數據轉 RDD 對象 | 文件文件轉 RDD 對象 )

    【Python】PySpark 數據輸入 ① ( RDD 簡介 | RDD 中的數據存儲與計算 | Python 容器數據轉 RDD 對象 | 文件文件轉 RDD 對象 )

    RDD 英文全稱為 \\\" Resilient Distributed Datasets \\\" , 對應中文名稱 是 \\\" 彈性分布式數據集 \\\" ; Spark 是用于 處理大規(guī)模數據 的 分布式計算引擎 ; RDD 是 Spark 的基本數據單元 , 該 數據結構 是 只讀的 , 不可寫入更改 ; RDD 對象 是 通過 SparkContext 執(zhí)行環(huán)境入口對象 創(chuàng)建的 ; SparkContext 讀取數

    2024年02月14日
    瀏覽(18)
  • 【Python】PySpark 數據計算 ③ ( RDD#reduceByKey 函數概念 | RDD#reduceByKey 方法工作流程 | RDD#reduceByKey 語法 | 代碼示例 )

    【Python】PySpark 數據計算 ③ ( RDD#reduceByKey 函數概念 | RDD#reduceByKey 方法工作流程 | RDD#reduceByKey 語法 | 代碼示例 )

    RDD#reduceByKey 方法 是 PySpark 中 提供的計算方法 , 首先 , 對 鍵值對 KV 類型 RDD 對象 數據 中 相同 鍵 key 對應的 值 value 進行分組 , 然后 , 按照 開發(fā)者 提供的 算子 ( 邏輯 / 函數 ) 進行 聚合操作 ; 上面提到的 鍵值對 KV 型 的數據 , 指的是 二元元組 , 也就是 RDD 對象中存儲的數據是

    2024年02月14日
    瀏覽(26)
  • 【Python】PySpark 數據計算 ① ( RDD#map 方法 | RDD#map 語法 | 傳入普通函數 | 傳入 lambda 匿名函數 | 鏈式調用 )

    【Python】PySpark 數據計算 ① ( RDD#map 方法 | RDD#map 語法 | 傳入普通函數 | 傳入 lambda 匿名函數 | 鏈式調用 )

    在 PySpark 中 RDD 對象 提供了一種 數據計算方法 RDD#map 方法 ; 該 RDD#map 函數 可以對 RDD 數據中的每個元素應用一個函數 , 該 被應用的函數 , 可以將每個元素轉換為另一種類型 , 也可以針對 RDD 數據的 原始元素進行 指定操作 ; 計算完畢后 , 會返回一個新的 RDD 對象 ; map 方法 , 又

    2024年02月14日
    瀏覽(33)
  • pyspark學習_dataframe常用操作_01

    1. 創(chuàng)建DataFrame ? ?本文使用DataFrame通過讀取json文件獲取數據,代碼如下: ? ?? ?2.? DataFrame常見操作 ? ? ? 2.1 printSchema 2.2 show? 2.3 select 2.4 groupBy? 2.5 filter 2.6 sort 2.7 replace 2.8 alias 2.9 withColumn 2.10 foreach

    2024年01月25日
    瀏覽(40)
  • 大數據之PySpark的RDD介紹

    大數據之PySpark的RDD介紹

    之前的文章主要介紹Spark基礎知識,例如集群角色、Spark集群運行流程等,接下來會進一步討論Spark相對核心的知識,讓我們拭目以待,同時也期待各位的精彩留言! RDD稱為彈性分布式數據集,是Spark中最基本的數據抽象,其為一個不可變、可分區(qū)、元素可并行計算的集合;

    2024年02月03日
    瀏覽(19)
  • PySpark大數據教程:深入學習SparkCore的RDD持久化和Checkpoint

    PySpark大數據教程:深入學習SparkCore的RDD持久化和Checkpoint

    本教程詳細介紹了PySpark中SparkCore的RDD持久化和Checkpoint功能,重點講解了緩存和檢查點的作用、如何進行緩存、如何設置檢查點目錄以及它們之間的區(qū)別。還提供了join操作的示例和Spark算子補充知識。

    2024年02月08日
    瀏覽(58)
  • pyspark筆記:讀取 & 處理csv文件 (pyspark DataFrame)

    pyspark筆記:讀取 & 處理csv文件 (pyspark DataFrame)

    pyspark cmd上的命令 pyspark中是惰性操作,所有變換類操作都是延遲計算的,pyspark只是記錄了將要對數據集進行的操作 只有需要數據集將數據返回到 Driver 程序時(比如collect,count,show之類),所有已經記錄的變換操作才會執(zhí)行 注意讀取出來的格式是Pyspark DataFrame,不是DataFr

    2024年02月08日
    瀏覽(18)
  • PySpark基礎 —— RDD

    PySpark基礎 —— RDD

    1.查看Spark環(huán)境信息 2.創(chuàng)建RDD 創(chuàng)建RDD主要有兩種方式 第一種:textFile方法 第二種:parallelize方法 ?2.1.textFile方法 本地文件系統加載數據 ?2.2.parallelize方法 ?2.3.wholeTextFiles方法 Action動作算子/行動操作 1.collect 2.take ?3.first 4.top 5.takeOrdered 6.takeSample 7.count 8.sum 9.histogram 10.fold 11.re

    2024年02月07日
    瀏覽(23)
  • PySpark之RDD的持久化

    PySpark之RDD的持久化

    當RDD被重復使用,或者計算該RDD比較容易出錯,而且需要消耗比較多的資源和時間的時候,我們就可以將該RDD緩存起來。 主要作用: 提升Spark程序的計算效率 注意事項: RDD的緩存可以存儲在內存或者是磁盤上,甚至可以存儲在Executor進程的堆外內存中。主要是放在內存中,因此

    2024年01月23日
    瀏覽(18)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包