使用Python語言開發(fā)Spark程序代碼
- Spark Standalone的PySpark的搭建----bin/pyspark --master spark://node1:7077
- Spark StandaloneHA的搭建—Master的單點(diǎn)故障(node1,node2),zk的leader選舉機(jī)制,1-2min還原
- 【scala版本的交互式界面】bin/spark-shell --master xxx
- 【python版本交互式界面】bin/pyspark --master xxx
- 【提交任務(wù)】bin/spark-submit --master xxxx
【學(xué)會(huì)配置】Windows的PySpark環(huán)境配置
- 1-安裝Andaconda
- 2-在Anaconda Prompt中安裝PySpark
- 3-執(zhí)行安裝
- 4-使用Pycharm構(gòu)建Project(準(zhǔn)備工作)
- 需要配置anaconda的環(huán)境變量–參考課件
- 需要配置hadoop3.3.0的安裝包,里面有winutils,防止pycharm寫代碼的過程中報(bào)錯(cuò)
補(bǔ)充:
PyCharm構(gòu)建Python project
- 項(xiàng)目規(guī)劃
- 項(xiàng)目名稱:Bigdata25-pyspark_3.1.2
![]()
- 模塊名稱:PySpark-SparkBase_3.1.2,PySpark-SparkCore_3.1.2,PySpark-SparkSQL_3.1.2
![]()
- 文件夾:
- main pyspark的代碼
- data 數(shù)據(jù)文件
- config 配置文件
- test 常見python測(cè)試代碼放在test中
應(yīng)用入口:SparkContext
- http://spark.apache.org/docs/latest/rdd-programming-guide.html
WordCount代碼實(shí)戰(zhàn)
需求:給你一個(gè)文本文件,統(tǒng)計(jì)出單詞的數(shù)量
算子:rdd的api的操作,就是算子,flatMap扁平化算子,map轉(zhuǎn)換算子
Transformation算子
Action算子
步驟:
1-首先創(chuàng)建SparkContext上下文環(huán)境
2-從外部文件數(shù)據(jù)源讀取數(shù)據(jù)
3-執(zhí)行flatmap執(zhí)行扁平化操作
4-執(zhí)行map轉(zhuǎn)化操作,得到(word,1)
5-reduceByKey將相同Key的Value數(shù)據(jù)累加操作
6-將結(jié)果輸出到文件系統(tǒng)或打印代碼:
# -*- coding: utf-8 -*- # Program function: Spark的第一個(gè)程序 # 1-思考:sparkconf和sparkcontext從哪里導(dǎo)保 # 2-如何理解算子?Spark中算子有2種, # 一種稱之為Transformation算子(flatMapRDD-mapRDD-reduceBykeyRDD), # 一種稱之為Action算子(輸出到控制臺(tái),或文件系統(tǒng)或hdfs),比如collect或saveAsTextFile都是Action算子 from pyspark import SparkConf,SparkContext if __name__ == '__main__': # 1 - 首先創(chuàng)建SparkContext上下文環(huán)境 conf = SparkConf().setAppName("FirstSpark").setMaster("local[*]") sc = SparkContext(conf=conf) sc.setLogLevel("WARN")#日志輸出級(jí)別 # 2 - 從外部文件數(shù)據(jù)源讀取數(shù)據(jù) fileRDD = sc.textFile("D:\BigData\PyWorkspace\Bigdata25-pyspark_3.1.2\PySpark-SparkBase_3.1.2\data\words.txt") # print(type(fileRDD))#<class 'pyspark.rdd.RDD'> # all the data is loaded into the driver's memory. # print(fileRDD.collect()) # ['hello you Spark Flink', 'hello me hello she Spark'] # 3 - 執(zhí)行flatmap執(zhí)行扁平化操作 flat_mapRDD = fileRDD.flatMap(lambda words: words.split(" ")) # print(type(flat_mapRDD)) # print(flat_mapRDD.collect()) #['hello', 'you', 'Spark', 'Flink', 'hello', 'me', 'hello', 'she', 'Spark'] # # 4 - 執(zhí)行map轉(zhuǎn)化操作,得到(word, 1) rdd_mapRDD = flat_mapRDD.map(lambda word: (word, 1)) # print(type(rdd_mapRDD))#<class 'pyspark.rdd.PipelinedRDD'> # print(rdd_mapRDD.collect()) # [('hello', 1), ('you', 1), ('Spark', 1), ('Flink', 1), ('hello', 1), ('me', 1), ('hello', 1), ('she', 1), ('Spark', 1)] # 5 - reduceByKey將相同Key的Value數(shù)據(jù)累加操作 resultRDD = rdd_mapRDD.reduceByKey(lambda x, y: x + y) # print(type(resultRDD)) # print(resultRDD.collect()) # [('Spark', 2), ('Flink', 1), ('hello', 3), ('you', 1), ('me', 1), ('she', 1)] # 6 - 將結(jié)果輸出到文件系統(tǒng)或打印 resultRDD.saveAsTextFile("D:\BigData\PyWorkspace\Bigdata25-pyspark_3.1.2\PySpark-SparkBase_3.1.2\data\output\wordsAdd") # 7-停止SparkContext sc.stop()#Shut down the SparkContext.
總結(jié):
TopK需求
需求:[(‘Spark’, 2), (‘Flink’, 1), (‘hello’, 3), (‘you’, 1), (‘me’, 1), (‘she’, 1)]
排序:[ (‘hello’, 3),(‘Spark’, 2),]
共識(shí):Spark核心或靈魂是rdd,spark的所有操作都是基于rdd的操作
代碼:
# -*- coding: utf-8 -*- # Program function: 針對(duì)于value單詞統(tǒng)計(jì)計(jì)數(shù)的排序 # 1-思考:sparkconf和sparkcontext從哪里導(dǎo)保 # 2-如何理解算子?Spark中算子有2種, # 一種稱之為Transformation算子(flatMapRDD-mapRDD-reduceBykeyRDD), # 一種稱之為Action算子(輸出到控制臺(tái),或文件系統(tǒng)或hdfs),比如collect或saveAsTextFile都是Action算子 from pyspark import SparkConf, SparkContext if __name__ == '__main__': # 1 - 首先創(chuàng)建SparkContext上下文環(huán)境 conf = SparkConf().setAppName("FirstSpark").setMaster("local[*]") sc = SparkContext(conf=conf) sc.setLogLevel("WARN") # 日志輸出級(jí)別 # 2 - 從外部文件數(shù)據(jù)源讀取數(shù)據(jù) fileRDD = sc.textFile("D:\BigData\PyWorkspace\Bigdata25-pyspark_3.1.2\PySpark-SparkBase_3.1.2\data\words.txt") # print(type(fileRDD))#<class 'pyspark.rdd.RDD'> # all the data is loaded into the driver's memory. # print(fileRDD.collect()) # ['hello you Spark Flink', 'hello me hello she Spark'] # 3 - 執(zhí)行flatmap執(zhí)行扁平化操作 flat_mapRDD = fileRDD.flatMap(lambda words: words.split(" ")) # print(type(flat_mapRDD)) # print(flat_mapRDD.collect()) # ['hello', 'you', 'Spark', 'Flink', 'hello', 'me', 'hello', 'she', 'Spark'] # # 4 - 執(zhí)行map轉(zhuǎn)化操作,得到(word, 1) rdd_mapRDD = flat_mapRDD.map(lambda word: (word, 1)) # print(type(rdd_mapRDD))#<class 'pyspark.rdd.PipelinedRDD'> # print(rdd_mapRDD.collect()) # [('hello', 1), ('you', 1), ('Spark', 1), ('Flink', 1), ('hello', 1), ('me', 1), ('hello', 1), ('she', 1), ('Spark', 1)] # 5 - reduceByKey將相同Key的Value數(shù)據(jù)累加操作 resultRDD = rdd_mapRDD.reduceByKey(lambda x, y: x + y) # print(type(resultRDD)) print(resultRDD.collect()) # [('Spark', 2), ('Flink', 1), ('hello', 3), ('you', 1), ('me', 1), ('she', 1)] # 6 針對(duì)于value單詞統(tǒng)計(jì)計(jì)數(shù)的排序 print("==============================sortBY=============================") print(resultRDD.sortBy(lambda x: x[1], ascending=False).take(3)) # [('hello', 3), ('Spark', 2), ('Flink', 1)] print(resultRDD.sortBy(lambda x: x[1], ascending=False).top(3, lambda x: x[1])) print("==============================sortBykey=============================") print(resultRDD.map(lambda x: (x[1], x[0])).collect()) # [(2, 'Spark'), (1, 'Flink'), (3, 'hello'), (1, 'you'), (1, 'me'), (1, 'she')] print(resultRDD.map(lambda x: (x[1], x[0])).sortByKey(False).take(3)) #[(3, 'hello'), (2, 'Spark'), (1, 'Flink')] # 7-停止SparkContext sc.stop() # Shut down the SparkContext.
- sortBy
- sortByKey操作
從HDFS讀取數(shù)據(jù)
# -*- coding: utf-8 -*- # Program function: 從HDFS讀取文件 from pyspark import SparkConf, SparkContext import time if __name__ == '__main__': # 1 - 首先創(chuàng)建SparkContext上下文環(huán)境 conf = SparkConf().setAppName("FromHDFS").setMaster("local[*]") sc = SparkContext(conf=conf) sc.setLogLevel("WARN") # 日志輸出級(jí)別 # 2 - 從外部文件數(shù)據(jù)源讀取數(shù)據(jù) fileRDD = sc.textFile("hdfs://node1:9820/pydata/input/hello.txt") # ['hello you Spark Flink', 'hello me hello she Spark'] # 3 - 執(zhí)行flatmap執(zhí)行扁平化操作 flat_mapRDD = fileRDD.flatMap(lambda words: words.split(" ")) # ['hello', 'you', 'Spark', 'Flink', 'hello', 'me', 'hello', 'she', 'Spark'] # # 4 - 執(zhí)行map轉(zhuǎn)化操作,得到(word, 1) rdd_mapRDD = flat_mapRDD.map(lambda word: (word, 1)) # [('hello', 1), ('you', 1), ('Spark', 1), ('Flink', 1), ('hello', 1), ('me', 1), ('hello', 1), ('she', 1), ('Spark', 1)] # 5 - reduceByKey將相同Key的Value數(shù)據(jù)累加操作 resultRDD = rdd_mapRDD.reduceByKey(lambda x, y: x + y) # print(type(resultRDD)) print(resultRDD.collect()) # 休息幾分鐘 time.sleep(600) # 7-停止SparkContext sc.stop() # Shut down the SparkContext.
提交代碼到集群執(zhí)行
關(guān)鍵:sys.argv[1],
代碼:
# -*- coding: utf-8 -*- # Program function: 提交任務(wù)執(zhí)行 import sys from pyspark import SparkConf, SparkContext if __name__ == '__main__': # 1 - 首先創(chuàng)建SparkContext上下文環(huán)境 conf = SparkConf().setAppName("FromHDFS").setMaster("local[*]") sc = SparkContext(conf=conf) sc.setLogLevel("WARN") # 日志輸出級(jí)別 # 2 - 從外部文件數(shù)據(jù)源讀取數(shù)據(jù) # hdfs://node1:9820/pydata/input/hello.txt fileRDD = sc.textFile(sys.argv[1]) # ['hello you Spark Flink', 'hello me hello she Spark'] # 3 - 執(zhí)行flatmap執(zhí)行扁平化操作 flat_mapRDD = fileRDD.flatMap(lambda words: words.split(" ")) # ['hello', 'you', 'Spark', 'Flink', 'hello', 'me', 'hello', 'she', 'Spark'] # # 4 - 執(zhí)行map轉(zhuǎn)化操作,得到(word, 1) rdd_mapRDD = flat_mapRDD.map(lambda word: (word, 1)) # [('hello', 1), ('you', 1), ('Spark', 1), ('Flink', 1), ('hello', 1), ('me', 1), ('hello', 1), ('she', 1), ('Spark', 1)] # 5 - reduceByKey將相同Key的Value數(shù)據(jù)累加操作 resultRDD = rdd_mapRDD.reduceByKey(lambda x, y: x + y) # print(type(resultRDD)) resultRDD.saveAsTextFile(sys.argv[2]) # 7-停止SparkContext sc.stop() # Shut down the SparkContext.
結(jié)果:
[掌握-擴(kuò)展閱讀]遠(yuǎn)程PySpark環(huán)境配置
需求:需要將PyCharm連接服務(wù)器,同步本地寫的代碼到服務(wù)器上,使用服務(wù)器上的Python解析器執(zhí)行
步驟:
1-準(zhǔn)備PyCharm的連接
2-需要了解服務(wù)器的地址,端口號(hào),用戶名,密碼
![]()
設(shè)置自動(dòng)的上傳,如果不太好使,重啟pycharm
3-pycharm讀取的文件都需要上傳到linux中,復(fù)制相對(duì)路徑
4-執(zhí)行代碼在遠(yuǎn)程服務(wù)器上
![]()
5-執(zhí)行代碼
# -*- coding: utf-8 -*- # Program function: Spark的第一個(gè)程序 # 1-思考:sparkconf和sparkcontext從哪里導(dǎo)保 # 2-如何理解算子?Spark中算子有2種, # 一種稱之為Transformation算子(flatMapRDD-mapRDD-reduceBykeyRDD), # 一種稱之為Action算子(輸出到控制臺(tái),或文件系統(tǒng)或hdfs),比如collect或saveAsTextFile都是Action算子 from pyspark import SparkConf, SparkContext if __name__ == '__main__': # 1 - 首先創(chuàng)建SparkContext上下文環(huán)境 conf = SparkConf().setAppName("FirstSpark").setMaster("local[*]") sc = SparkContext(conf=conf) sc.setLogLevel("WARN") # 日志輸出級(jí)別 # 2 - 從外部文件數(shù)據(jù)源讀取數(shù)據(jù) fileRDD = sc.textFile("/export/data/pyspark_workspace/PySpark-SparkBase_3.1.2/data/words.txt") # fileRDD = sc.parallelize(["hello you", "hello me", "hello spark"]) # 3 - 執(zhí)行flatmap執(zhí)行扁平化操作 flat_mapRDD = fileRDD.flatMap(lambda words: words.split(" ")) # print(type(flat_mapRDD)) # print(flat_mapRDD.collect()) # ['hello', 'you', 'Spark', 'Flink', 'hello', 'me', 'hello', 'she', 'Spark'] # # 4 - 執(zhí)行map轉(zhuǎn)化操作,得到(word, 1) rdd_mapRDD = flat_mapRDD.map(lambda word: (word, 1)) # print(type(rdd_mapRDD))#<class 'pyspark.rdd.PipelinedRDD'> # print(rdd_mapRDD.collect()) # [('hello', 1), ('you', 1), ('Spark', 1), ('Flink', 1), ('hello', 1), ('me', 1), ('hello', 1), ('she', 1), ('Spark', 1)] # 5 - reduceByKey將相同Key的Value數(shù)據(jù)累加操作 resultRDD = rdd_mapRDD.reduceByKey(lambda x, y: x + y) # print(type(resultRDD)) print(resultRDD.collect()) # [('Spark', 2), ('Flink', 1), ('hello', 3), ('you', 1), ('me', 1), ('she', 1)] # 6 - 將結(jié)果輸出到文件系統(tǒng)或打印 # resultRDD.saveAsTextFile("D:\BigData\PyWorkspace\Bigdata25-pyspark_3.1.2\PySpark-SparkBase_3.1.2\data\output\wordsAdd") # 7-停止SparkContext sc.stop() # Shut down the SparkContext.
切記忘記上傳python的文件,直接執(zhí)行
注意1:自動(dòng)上傳設(shè)置
注意2:增加如何使用standalone和HA的方式提交代碼執(zhí)行
但是需要注意,盡可能使用hdfs的文件,不要使用單機(jī)版本的文件,因?yàn)閟tandalone是集群模式
# -*- coding: utf-8 -*- # Program function: Spark的第一個(gè)程序 # 1-思考:sparkconf和sparkcontext從哪里導(dǎo)保 # 2-如何理解算子?Spark中算子有2種, # 一種稱之為Transformation算子(flatMapRDD-mapRDD-reduceBykeyRDD), # 一種稱之為Action算子(輸出到控制臺(tái),或文件系統(tǒng)或hdfs),比如collect或saveAsTextFile都是Action算子 >from pyspark import SparkConf, SparkContext > >if __name__ == '__main__': > ># 1 - 首先創(chuàng)建SparkContext上下文環(huán)境 > >conf = SparkConf().setAppName("FirstSpark").setMaster("spark://node1:7077,node2:7077") >sc = SparkContext(conf=conf) >sc.setLogLevel("WARN") # 日志輸出級(jí)別 > ># 2 - 從外部文件數(shù)據(jù)源讀取數(shù)據(jù) > >fileRDD = sc.textFile("hdfs://node1:9820/pydata/input/hello.txt") > ># fileRDD = sc.parallelize(["hello you", "hello me", "hello spark"]) > ># 3 - 執(zhí)行flatmap執(zhí)行扁平化操作 > >flat_mapRDD = fileRDD.flatMap(lambda words: words.split(" ")) > ># print(type(flat_mapRDD)) > ># print(flat_mapRDD.collect()) > ># ['hello', 'you', 'Spark', 'Flink', 'hello', 'me', 'hello', 'she', 'Spark'] > ># # 4 - 執(zhí)行map轉(zhuǎn)化操作,得到(word, 1) > >rdd_mapRDD = flat_mapRDD.map(lambda word: (word, 1)) > ># print(type(rdd_mapRDD))#<class 'pyspark.rdd.PipelinedRDD'> > ># print(rdd_mapRDD.collect()) > ># [('hello', 1), ('you', 1), ('Spark', 1), ('Flink', 1), ('hello', 1), ('me', 1), ('hello', 1), ('she', 1), ('Spark', 1)] > ># 5 - reduceByKey將相同Key的Value數(shù)據(jù)累加操作 > >resultRDD = rdd_mapRDD.reduceByKey(lambda x, y: x + y) > ># print(type(resultRDD)) > >print(resultRDD.collect()) > ># [('Spark', 2), ('Flink', 1), ('hello', 3), ('you', 1), ('me', 1), ('she', 1)] > ># 6 - 將結(jié)果輸出到文件系統(tǒng)或打印 > ># resultRDD.saveAsTextFile("D:\BigData\PyWorkspace\Bigdata25-pyspark_3.1.2\PySpark-SparkBase_3.1.2\data\output\wordsAdd") > ># 7-停止SparkContext > >sc.stop() # Shut down the SparkContext.
總結(jié)
-
函數(shù)式編程
-
#Python中的函數(shù)式編程 #1-map(func, *iterables) --> map object def fun(x): return x*x #x=[1,2,3,4,5] y=map(fun,[1,2,3,4,5]) #[1, 4, 9, 16, 25] print(list(map(fun, [1, 2, 3, 4, 5]))) #2-lambda 匿名函數(shù) java: x=>x*x 表達(dá)式 Scala:x->x*x g=lambda x:x*x print(g(10)) print(list(map(lambda x:x*x, [1, 2, 3, 4, 5]))) def add(x,y): return x+y print(list(map(add, range(5), range(5, 10)))) print(list(map(lambda x,y:x+y,range(5),range(5,10)))) #3- [add(x,y) for x,y in zip(range(5),range(5,10))] # print(list(zip([1, 2, 3], [4, 5, 6])))#[1,4],[2,5] # print(list(zip([1, 2, 3], [4, 5, 6,7])))#[1,4],[2,5] # print(list(zip([1, 2, 3,6], [4, 5, 6])))#[1,4],[2,5] # 語法 lambda表達(dá)式語言:【lambda 變量:表達(dá)式】 # 列表表達(dá)式 [表達(dá)式 for 變量 in 可迭代的序列中 if 條件] print([add(x, y) for x, y in zip(range(5), range(5))]) #[0, 2, 4, 6, 8] #3-reduce from functools import reduce # ((((1+2)+3)+4)+5) print(reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])) # 4-filter seq1=['foo','x41','?1','***'] def func(x): #Return True if the string is an alpha-numeric string return x.isalnum() print(list(filter(func,seq1))) #返回 filter 對(duì)象 # sorted() # 最后我們可以看到,函數(shù)式編程有如下好處: # 1)代碼更簡(jiǎn)單了。 # 2)數(shù)據(jù)集,操作,返回值都放到了一起。 # 3)你在讀代碼的時(shí)候,沒有了循環(huán)體,于是就可以少了些臨時(shí)變量,以及變量倒來倒去邏輯。 # 4)你的代碼變成了在描述你要干什么,而不是怎么去干。
后記
??博客主頁:https://manor.blog.csdn.net文章來源:http://www.zghlxwxcb.cn/news/detail-795726.html
??歡迎點(diǎn)贊 ?? 收藏 ?留言 ?? 如有錯(cuò)誤敬請(qǐng)指正!
??本文由 Maynor 原創(chuàng),首發(fā)于 CSDN博客??
??感覺這輩子,最深情綿長的注視,都給了手機(jī)?
??專欄持續(xù)更新,歡迎訂閱:https://blog.csdn.net/xianyu120/category_12453356.html文章來源地址http://www.zghlxwxcb.cn/news/detail-795726.html
到了這里,關(guān)于Python大數(shù)據(jù)之PySpark(三)使用Python語言開發(fā)Spark程序代碼的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!