13.108. Spark Optimization
1.1.25. How Spark optimization differs from Hive
1.1.26. SparkSQL startup parameter tuning
1.1.27. Sichuan task optimization practice: execution efficiency improved by more than 50%
13.108. Spark Optimization:
1.1.25. How Spark optimization differs from Hive
First understand the essential difference between Spark and MapReduce: between operators, Spark adds a dependency analysis (distinguishing wide from narrow dependencies) that the plain map/reduce model lacks.
The optimization approach is basically the same as for Hive. The big difference is that MapReduce must write to disk between every pair of operators, whereas Spark only writes to disk across wide dependencies (shuffles); narrow dependencies never touch disk.
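A minimal SparkSQL illustration of the two dependency types (a sketch; the table name reuses dc_d_organization from the case study below, and the behavior described is generic Spark, not specific to this job):
-- Narrow dependency: a pure filter/projection runs inside one stage; no shuffle, nothing is written to disk.
select id, orgcode from dc_d_organization where orglevel = 3;
-- Wide dependency: GROUP BY forces a shuffle, so intermediate data is spilled to local disk between
-- the two stages, which is the same cost MapReduce pays between every map and reduce.
select orgcode, count(*) from dc_d_organization group by orgcode;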
1.1.26. SparkSQL startup parameter tuning
1) Results first: executor tuning
A SQL query that took Hive 30 minutes (1,800 seconds) was run through SparkSQL with three executor layouts:
with the fewest (fat) executors it took 640 seconds (higher parallelism within each executor at the cost of HDFS throughput; about 5 cores per executor is the sweet spot),
with the most (tiny) executors it took 281.634 seconds (maximum HDFS throughput at the cost of per-executor parallelism),
and with the balanced middle ground it took 253.189 seconds.
Scheme 1: fewest executors (fat executors)
--------------------------------- Fat executors --------------------------------------------------------------------------------
./spark-sql --master yarn \ # Fat executors (one executor per node); HDFS throughput suffers when one executor owns this many cores
--num-executors 3 \ # number of nodes in the cluster = 3
--executor-memory 30G \ # memory per node / executors per node = 30 GB / 1 = 30 GB
--executor-cores 16 \ # each executor monopolizes all cores of its node = cores per node = 16
--driver-memory 1G # the AM needs roughly 1024 MB of memory plus one executor
Elapsed: Time taken: 640 seconds
Scheme 2: most executors (tiny executors)
--------------------------------- Tiny executors --------------------------------------------------------------------------------
./spark-sql --master yarn \ # Tiny executors (one core per executor); advantage: parallelism
--num-executors 48 \ # total cores in the cluster = cores per node * number of nodes = 16 * 3 = 48
--executor-memory 1.6G \ # memory per node / executors per node = 30 GB / 16 = 1.875 GB
--executor-cores 1 \ # one core per executor
--driver-memory 1G # the AM needs roughly 1024 MB of memory plus one executor
Elapsed: Time taken: 281.634 seconds
Observed executor concurrency was only 45; task concurrency was about 50 per executor, with 18,382 tasks in total.
Scheme 3: a balance between fat and tiny
--------------------------------- Balance between Fat (vs) Tiny --------------------------------------------------------------------------------
./spark-sql --master yarn \ # Balance between fat and tiny
--num-executors 8 \ # (16-1)*3/5 = 9; leave one executor for the ApplicationManager => --num-executors = 9-1 = 8
# executors per node = 9 / 3 = 3
--executor-memory 10G \ # memory per executor = 30 GB / 3 = 10 GB (the default cap is 8 GB; the config must be raised to allow 10 GB)
# heap overhead = 7% * 10 GB = 0.7 GB, so the effective --executor-memory = 10 - 0.7 = 9.3 GB (see the overhead note after this block)
--executor-cores 5 \ # 5 cores per executor preserves good HDFS throughput
# leave one core per node for the Hadoop/YARN daemons => usable cores per node = 16 - 1 = 15
--driver-memory 1G
Elapsed: Time taken: 253.189 seconds
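A sketch of the overhead arithmetic used above. The 7% figure matches the Spark 1.x default, spark.yarn.executor.memoryOverhead = max(384 MB, 0.07 * executor memory); Spark 2.x raised the default factor to 0.10, so the usable heap there would be slightly smaller:
  overhead    = max(384 MB, 0.07 * 10 GB) = 0.7 GB
  usable heap = 10 GB - 0.7 GB = 9.3 GB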
Task parallelism tuning
1. Adjust the default number of tasks per stage for your executors, i.e. set the task parallelism:
[This matters most when the cluster is large.]
A common mistake is to leave this parameter unset, in which case Spark derives the task count from the number of underlying HDFS blocks,
by default one task per HDFS block (if it is not set, the third scheme can still be used to compensate!).
Spark's default is usually too low (often just a few dozen tasks),
and with too few tasks all the executor tuning above goes to waste.
Think about it: no matter how many executor processes you have, or how much memory and CPU,
if there are only 1 or 10 tasks, 90% of the executors may have no task to run at all, squandering the resources!
The Spark documentation therefore recommends setting this parameter to 2~3 times num-executors * executor-cores;
for example, with 300 total executor cores, setting about 1,000 tasks makes full use of the cluster's resources.
Example cluster: nodes with 30 GB memory and 16 cores (a worked check of the 2~3x rule follows the command):
/home/admin/bigdata/spark-2.2.0-bin-hadoop2.6/bin/spark-sql \
--master yarn \
--num-executors 16 \
--executor-memory 1G \
--executor-cores 10 \
--driver-memory 1G \
--conf spark.default.parallelism=450 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3
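A quick check of the 2~3x rule against the command above (a sketch using the flags as given):
  total task slots = num-executors * executor-cores = 16 * 10 = 160
  recommended parallelism = 2~3 * 160 = 320 ~ 480, so spark.default.parallelism=450 falls inside the band
Note that spark.default.parallelism governs RDD operations; for SparkSQL shuffles the analogous knob is spark.sql.shuffle.partitions, which appears in the case study below.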
1.1.27. Sichuan task optimization practice: execution efficiency improved by more than 50%
Sichuan environment info
Account: xxxxxx Password: xxxxxxxx
I. Fact table optimization
1. Optimization result: about 20 minutes before, about 5 minutes after optimization. Data volume: 580 million rows.
2. Original SQL (Spark is not automatically faster):
drop table if exists dc_f_organization;
create table if not exists dc_f_organization (
orgid int,
orgcode string,
yearmonth string ,
zzdate string,
orgname string,
orglevel int,
id int,
orgtagging int,
createdate timestamp
);
insert into dc_f_organization
select
a.orgid, a.orgcode, a.yearmonth, a.zzdate, n.orgname, n.orglevel, n.id, n.orgtagging, n.createdate
from
( select o.id orgid, o.orgcode, d.yearmonth, d.zzdate from dc_d_organization o, dc_d_wddate d ) a
left join dc_d_organization n on to_date(n.createdate)=a.zzdate and n.orgcode = a.orgcode;
3. Optimization scheme:
-- ############################## HIVE execution: increase the number of blocks to raise Spark's parallelism (this job's files are small, so 26 was used; a typical reference value is around 300) #################################
-- (1) Run the cartesian product on its own,
-- splitting the files first (switch to Hive to split the files and increase parallelism).
-- [Elapsed: 101.586 seconds; 26 result files]
-- Check the number of file blocks: hadoop fs -ls /user/hive/warehouse/test.db/dc_d_org_date => 26 blocks
set mapreduce.map.memory.mb=1024;
set mapred.max.split.size=524288;
set mapred.min.split.size.per.node=524288;
set mapred.min.split.size.per.rack=524288;
drop table if exists dc_d_org_date;
create table dc_d_org_date as select o.id orgid,o.orgcode,d.yearmonth,d.zzdate from dc_d_organization o CROSS JOIN dc_d_wddate d;
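-- Why this raises parallelism (a sketch; the split-size settings are in bytes): 524288 B = 512 KB per split,
-- so even a small table is cut into many splits. Assuming the small date dimension is map-joined, the job is
-- map-only and each map task writes its own output file, which is how the 26 result files above were produced;
-- those 26 files are then 26 blocks the downstream Spark job can read in parallel.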
-- ############################## SPARK execution; parameters: spark-sql --master yarn --num-executors 100 --executor-memory 5G --executor-cores 3 --driver-memory 3G #################################
-- (2) [Spark: Time taken: 115.78 seconds]
set spark.shuffle.consolidateFiles=true;
drop table if exists dc_f_organization;
create table if not exists dc_f_organization
(orgid int,orgcode string,YEARMONTH string ,ZZDATE string,ORGNAME string,orglevel int,id int,ORGTAGGING int, createdate timestamp);
insert into dc_f_organization
select a.orgid,a.orgcode,a.YEARMONTH,a.ZZDATE,n.ORGNAME,n.orglevel,n.id,n.ORGTAGGING,n.createdate
from dc_d_org_date a
left join DC_D_ORGANIZATION n on to_date(n.CREATEDATE)=a.ZZDATE and n.orgcode = a.orgcode;
-- ############################## Further optimization direction: merge the two steps above and run both in Spark ##############################
Problem: possibly because the files are too small, the Spark partitioning setting did not take effect: set spark.sql.shuffle.partitions=300;
Note: in Spark a cartesian product must be written as CROSS JOIN, otherwise it is a syntax error.
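If rewriting old comma-separated-join SQL is impractical, Spark 2.x can also be told to accept implicit cartesian products with a standard flag (shown as an alternative sketch; it was not used in this job):
set spark.sql.crossJoin.enabled=true;
-- after this, the original "from dc_d_organization o, dc_d_wddate d" form runs without the CROSS JOIN keyword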
II. Optimizing the CUBE table
1. Optimization result: originally about 1 hour, 26 minutes after optimization.
Summary: shuffle time 16 minutes; data volume 3.52 billion rows.
The job contains a wide dependency (the GROUP BY), so it is split into 2 stages.
Adopted scheme 1: switch to Spark execution and raise the parallelism.
Execution parameters: spark-sql --master yarn --num-executors 100 --executor-memory 5G --executor-cores 3 --driver-memory 3G
Stage 1 run time: 11 minutes (partitions=300)
Stage 2 run time: 15 minutes (partitions=200)
Setting the partition count (default 200): set spark.sql.shuffle.partitions=300; (in theory this can speed up stage 2 by about 30%, but in practice executors were sometimes lost and runs were unstable, so it is not recommended.)
(The cause may be the virtual core count that was configured.)
Scheme 2: move the CASE WHEN logic out into a separate table to remove some duplicated scans and computation and shrink the work in the CUBE stage.
The extraction adds about 2 minutes, and the shuffle time saved is also about 2 minutes, so there is no net gain.
Preprocessing time: 2-3 minutes
Stage 1 run time: 11 minutes
Stage 2 run time: 13 minutes (the saving is likewise 2-3 minutes)
Scheme 3: raise the share of memory used by shuffle to 50% (spark.shuffle.memoryFraction=0.5)
Execution parameters: spark-sql --master yarn --num-executors 100 --executor-memory 5G --executor-cores 3 --driver-memory 3G --conf spark.storage.memoryFraction=0.3 --conf spark.shuffle.memoryFraction=0.5
Result: no obvious effect, and run times varied between executions.
Scheme 4: reduce the number of CUBE dimensions; orgid and orgcode are one-to-one, so one dimension can be dropped and joined back after the aggregation (see the grouping-set sketch after the CUBE query below).
Result: the extra join took even longer.
2. Adopted scheme 1: run on Spark
-- Execution parameters: spark-sql --master yarn --num-executors 100 --executor-memory 5G --executor-cores 3 --driver-memory 3G
-- set spark.sql.shuffle.partitions=300;
drop table if exists dc_c_organization;
create table if not exists dc_c_organization(
YEARMONTH string,ZZDATE string,orgid int ,orgcode string,total int,
provinceNum int,cityNum int,districtNum int, newDistrictNum int,townNum int,streetNum int,otherNum int,communityNum int,villageNum int,gridNum int);
-- If running on Hive, the combiner can be enabled: pre-aggregating on the map side reduces the data volume and computation on the reduce side, cutting disk I/O and network transfer time.
-- set hive.map.aggr = true;
-- set hive.groupby.mapaggr.checkinterval = 10000;
-- ############################## SPARK #################################
-- set spark.sql.shuffle.partitions=300;
insert into dc_c_organization
select
n.YEARMONTH,
n.ZZDATE,
n.orgid,
n.orgcode,
count(n.id) total,
nvl(SUM(case when pt.displayname='省' then 1 else 0 end),0) AS provinceNum,
nvl(SUM(case when pt.displayname='市' then 1 else 0 end),0) as cityNum,
nvl(SUM(case when pt.displayname='縣(區(qū))' then 1 else 0 end),0) AS districtNum,
(nvl(SUM(case when pt.displayname='縣(區(qū))' then 1 else 0 end),0) -nvl(SUM(case when pt.displayname='縣(區(qū))' AND n.ORGTAGGING= 31 then 1 else 0 end),0)) as newDistrictNum,
nvl(SUM(case when ((n.ORGNAME LIKE '%鄉(xiāng)%' OR n.ORGNAME LIKE '%鎮(zhèn)%' OR n.ORGNAME LIKE '%鄉(xiāng)鎮(zhèn)%')) AND pt.displayname='鄉(xiāng)鎮(zhèn)(街道)' then 1 else 0 end),0) townNum,
nvl(SUM(case when (n.ORGNAME LIKE '%街道%') AND pt.displayname='鄉(xiāng)鎮(zhèn)(街道)' then 1 else 0 end),0) streetNum,
(nvl(SUM(case when pt.displayname='鄉(xiāng)鎮(zhèn)(街道)'then 1 else 0 end),0)-nvl(SUM(case when ((n.ORGNAME LIKE '%鄉(xiāng)%' OR n.ORGNAME LIKE '%鎮(zhèn)%' OR n.ORGNAME LIKE '%鄉(xiāng)鎮(zhèn)%') ) AND pt.displayname='鄉(xiāng)鎮(zhèn)(街道)' then 1 else 0 end),0)-nvl(SUM(case when (n.ORGNAME LIKE '%街道%' ) AND pt.displayname='鄉(xiāng)鎮(zhèn)(街道)' then 1 else 0 end),0)) otherNum,
(nvl(SUM(case when pt.displayname='村(社區(qū))' then 1 else 0 end),0)-nvl(SUM(case when ((n.ORGNAME LIKE '%村' OR n.ORGNAME LIKE '%村民委員會(huì)' OR n.ORGNAME LIKE '%農(nóng)村工作中心站' OR n.ORGNAME LIKE '%村委會(huì)')) AND pt.displayname='村(社區(qū))' then 1 else 0 end),0)) communityNum,
nvl(SUM(case when ((n.ORGNAME LIKE '%村' OR n.ORGNAME LIKE '%村民委員會(huì)' OR n.ORGNAME LIKE '%農(nóng)村工作中心站' OR n.ORGNAME LIKE '%村委會(huì)')) AND pt.displayname='村(社區(qū))' then 1 else 0 end),0) villageNum,
nvl(SUM(case when pt.displayname='片組片格'then 1 else 0 end),0) gridNum
from dc_f_organization n
left join dc_d_property pt on n.orglevel = pt.id
GROUP BY n.YEARMONTH,n.ZZDATE,n.orgid,n.orgcode
WITH CUBE;
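Context for scheme 4 above: WITH CUBE over n grouping columns aggregates every one of the 2^n subsets of those columns. The query cubes 4 columns, so it is equivalent to GROUPING SETS with 2^4 = 16 combinations (a sketch of the expansion, not a rewrite used in this job):
-- GROUP BY n.YEARMONTH, n.ZZDATE, n.orgid, n.orgcode GROUPING SETS (
--   (YEARMONTH, ZZDATE, orgid, orgcode), (YEARMONTH, ZZDATE, orgid), ..., ()  -- 16 sets in total
-- )
Each input row is expanded once per grouping set before the shuffle, which is why the shuffled data volume (3.52 billion rows) far exceeds the 580-million-row fact table; dropping one dimension, as scheme 4 tried, halves the sets to 2^3 = 8.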
3. Optimization scheme 2: optimize the business logic (the SQL contains duplicated computations).
-- ############################ Preprocessing: remove duplicated computation and reduce the CUBE workload ############################
drop table if exists temp_dc_c_organization;
create table temp_dc_c_organization
as select
n.yearmonth,
n.zzdate,
n.orgid,
n.orgcode,
n.id as id,
case when pt.displayname='省' then 1 else 0 end as provincenum,
case when pt.displayname='市' then 1 else 0 end as citynum,
case when pt.displayname='縣(區(qū))' then 1 else 0 end as districtnum,
case when pt.displayname='縣(區(qū))' and n.orgtagging= 31 then 1 else 0 end as old_districtnum,
case when ((n.orgname like '%鄉(xiāng)%' or n.orgname like '%鎮(zhèn)%' or n.orgname like '%鄉(xiāng)鎮(zhèn)%')) and pt.displayname='鄉(xiāng)鎮(zhèn)(街道)' then 1 else 0 end townnum, -- [duplicate 1]
case when (n.orgname like '%街道%') and pt.displayname='鄉(xiāng)鎮(zhèn)(街道)' then 1 else 0 end streetnum, -- [duplicate 2]
case when pt.displayname='鄉(xiāng)鎮(zhèn)(街道)'then 1 else 0 end as total_streetnum_01,
case when ((n.orgname like '%鄉(xiāng)%' or n.orgname like '%鎮(zhèn)%' or n.orgname like '%鄉(xiāng)鎮(zhèn)%')) and pt.displayname='鄉(xiāng)鎮(zhèn)(街道)' then 1 else 0 end as total_streetnum_02, -- [duplicate 1]
case when (n.orgname like '%街道%') and pt.displayname='鄉(xiāng)鎮(zhèn)(街道)' then 1 else 0 end as total_streetnum_03, -- [duplicate 2]
case when pt.displayname='村(社區(qū))' then 1 else 0 end as communitynum_01,
case when ((n.orgname like '%村' or n.orgname like '%村民委員會(huì)' or n.orgname like '%農(nóng)村工作中心站' or n.orgname like '%村委會(huì)')) and pt.displayname='村(社區(qū))' then 1 else 0 end as communitynum_02, -- [duplicate 3]
case when ((n.orgname like '%村' or n.orgname like '%村民委員會(huì)' or n.orgname like '%農(nóng)村工作中心站' or n.orgname like '%村委會(huì)')) and pt.displayname='村(社區(qū))' then 1 else 0 end villagenum, -- [duplicate 3]
case when pt.displayname='片組片格'then 1 else 0 end gridnum
from
dc_f_organization n
left join dc_d_property pt on n.orglevel = pt.id;
-- ############################ CUBE: the time saved roughly equals the preprocessing time. ############################
create table dc_c_organization_02
as select
yearmonth,
zzdate,
orgid,
count(id) total,
sum(provincenum) as provincenum,
sum(citynum) as citynum,
sum(districtnum) as districtnum,
sum(districtnum)-sum(old_districtnum) as newdistrictnum,
sum(townnum) townnum,
sum(streetnum) streetnum,
sum(total_streetnum_01)-sum(townnum)-sum(streetnum) othernum,
sum(communitynum_01)-sum(villagenum) communitynum,
sum(villagenum) villagenum,
sum(gridnum) gridnum
from temp_dc_c_organization as n
group by yearmonth, zzdate, orgid with cube;