Fugue 是一個(gè)低代碼的統(tǒng)一接口,用于不同的計(jì)算框架,如 Spark、Dask。PyCaret 使用 Fugue 來(lái)支持分布式計(jì)算場(chǎng)景。
1、分布式計(jì)算場(chǎng)景
(1)分類
讓我們從最標(biāo)準(zhǔn)的例子開(kāi)始,代碼與本地版本完全相同,沒(méi)有任何魔法。
# 導(dǎo)入所需的庫(kù)
from pycaret.datasets import get_data # 導(dǎo)入獲取數(shù)據(jù)的函數(shù)
from pycaret.classification import * # 導(dǎo)入分類模型
# 使用get_data函數(shù)獲取名為"juice"的數(shù)據(jù)集,并設(shè)置verbose參數(shù)為False,表示不顯示詳細(xì)信息
data = get_data("juice", verbose=False)
# 設(shè)置目標(biāo)變量為'Purchase',n_jobs參數(shù)為1表示使用單個(gè)進(jìn)程
setup(data=data, target='Purchase', n_jobs=1)
# 獲取前5個(gè)模型的名稱,并存儲(chǔ)在test_models變量中
test_models = models().index.tolist()[:5]
compare_model
如果您不想使用分布式系統(tǒng),也完全相同。
# 比較模型函數(shù)
compare_models(include=test_models, n_select=2)
Processing: 0%| | 0/26 [00:00<?, ?it/s]
[LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
intercept_scaling=1, l1_ratio=None, max_iter=1000,
multi_class='auto', n_jobs=None, penalty='l2',
random_state=4292, solver='lbfgs', tol=0.0001, verbose=0,
warm_start=False),
DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini',
max_depth=None, max_features=None, max_leaf_nodes=None,
min_impurity_decrease=0.0, min_samples_leaf=1,
min_samples_split=2, min_weight_fraction_leaf=0.0,
random_state=4292, splitter='best')]
現(xiàn)在讓我們將其分布式,作為一個(gè)玩具案例,在dask上。唯一改變的是一個(gè)額外的參數(shù)parallel_backend
。
# 導(dǎo)入所需的庫(kù)
from pycaret.parallel import FugueBackend
# 使用FugueBackend作為并行計(jì)算的后端
# compare_models函數(shù)用于比較多個(gè)模型的性能
# include參數(shù)指定要比較的模型列表
# n_select參數(shù)指定要選擇的最佳模型數(shù)量
# parallel參數(shù)指定使用的并行計(jì)算后端,這里使用FugueBackend("dask")表示使用Dask作為并行計(jì)算后端
compare_models(include=test_models, n_select=2, parallel=FugueBackend("dask"))
[LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
intercept_scaling=1, l1_ratio=None, max_iter=1000,
multi_class='auto', n_jobs=None, penalty='l2',
random_state=4292, solver='lbfgs', tol=0.0001, verbose=0,
warm_start=False),
DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini',
max_depth=None, max_features=None, max_leaf_nodes=None,
min_impurity_decrease=0.0, min_samples_leaf=1,
min_samples_split=2, min_weight_fraction_leaf=0.0,
random_state=4292, splitter='best')]
為了使用Spark作為執(zhí)行引擎,您必須能夠訪問(wèn)一個(gè)Spark集群,并且必須擁有一個(gè)SparkSession
,讓我們初始化一個(gè)本地的Spark會(huì)話。
# 導(dǎo)入SparkSession模塊
from pyspark.sql import SparkSession
# 創(chuàng)建或獲取SparkSession對(duì)象
spark = SparkSession.builder.getOrCreate()
現(xiàn)在只需將parallel_backend
更改為此會(huì)話對(duì)象,即可在Spark上運(yùn)行。您必須明白這只是一個(gè)玩具案例。在實(shí)際情況中,您需要擁有一個(gè)指向真實(shí)Spark集群的SparkSession,才能享受Spark的強(qiáng)大功能。
# 調(diào)用 compare_models 函數(shù),傳入?yún)?shù) include=test_models、n_select=2 和 parallel=FugueBackend(spark)
compare_models(include=test_models, n_select=2, parallel=FugueBackend(spark))
[LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
intercept_scaling=1, l1_ratio=None, max_iter=1000,
multi_class='auto', n_jobs=None, penalty='l2',
random_state=4292, solver='lbfgs', tol=0.0001, verbose=0,
warm_start=False),
DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini',
max_depth=None, max_features=None, max_leaf_nodes=None,
min_impurity_decrease=0.0, min_samples_leaf=1,
min_samples_split=2, min_weight_fraction_leaf=0.0,
random_state=4292, splitter='best')]
最后,你可以使用pull
命令來(lái)獲取指標(biāo)表格。
pull()
(2)回歸
回歸問(wèn)題與分類問(wèn)題遵循相同的模式。
# 導(dǎo)入所需的庫(kù)
from pycaret.datasets import get_data # 導(dǎo)入獲取數(shù)據(jù)的函數(shù)
from pycaret.regression import * # 導(dǎo)入回歸模型
# 設(shè)置數(shù)據(jù)和目標(biāo)變量
setup(data=get_data("insurance", verbose=False), target='charges', n_jobs=1)
# 獲取前5個(gè)模型
test_models = models().index.tolist()[:5]
compare_model
如果您不想使用分布式系統(tǒng),也完全相同。
# 比較模型性能的函數(shù)
# 參數(shù):
# include: 需要比較的模型列表
# n_select: 需要選擇的模型數(shù)量
# sort: 按照哪個(gè)指標(biāo)進(jìn)行排序,默認(rèn)為平均絕對(duì)誤差(MAE)
compare_models(include=test_models, n_select=2, sort="MAE")
Processing: 0%| | 0/26 [00:00<?, ?it/s]
[Lars(copy_X=True, eps=2.220446049250313e-16, fit_intercept=True, fit_path=True,
jitter=None, n_nonzero_coefs=500, normalize='deprecated',
precompute='auto', random_state=3514, verbose=False),
LinearRegression(copy_X=True, fit_intercept=True, n_jobs=1,
normalize='deprecated', positive=False)]
現(xiàn)在讓我們將其分布式,作為一個(gè)玩具案例,在dask上。唯一改變的是一個(gè)額外的參數(shù)parallel_backend
。
# 導(dǎo)入所需的庫(kù)
from pycaret.parallel import FugueBackend
# 使用FugueBackend作為并行計(jì)算的后端
# compare_models函數(shù)用于比較多個(gè)模型的性能,并選擇性能最好的幾個(gè)模型
# include參數(shù)指定要比較的模型列表
# n_select參數(shù)指定要選擇的模型數(shù)量
# sort參數(shù)指定按照哪個(gè)指標(biāo)進(jìn)行排序,這里選擇按照平均絕對(duì)誤差(MAE)進(jìn)行排序
# parallel參數(shù)指定使用的并行計(jì)算后端,這里選擇使用FugueBackend("dask")作為并行計(jì)算的后端
compare_models(include=test_models, n_select=2, sort="MAE", parallel=FugueBackend("dask"))
[Lars(copy_X=True, eps=2.220446049250313e-16, fit_intercept=True, fit_path=True,
jitter=None, n_nonzero_coefs=500, normalize='deprecated',
precompute='auto', random_state=3514, verbose=False),
LinearRegression(copy_X=True, fit_intercept=True, n_jobs=1,
normalize='deprecated', positive=False)]
為了使用Spark作為執(zhí)行引擎,您必須能夠訪問(wèn)一個(gè)Spark集群,并且必須擁有一個(gè)SparkSession
,讓我們初始化一個(gè)本地的Spark會(huì)話。
# 導(dǎo)入SparkSession模塊
from pyspark.sql import SparkSession
# 創(chuàng)建或獲取一個(gè)SparkSession對(duì)象
spark = SparkSession.builder.getOrCreate()
現(xiàn)在只需將parallel_backend
更改為此會(huì)話對(duì)象,即可在Spark上運(yùn)行。您必須明白這只是一個(gè)玩具案例。在真實(shí)情況下,您需要擁有一個(gè)指向真實(shí)Spark集群的SparkSession,才能享受Spark的強(qiáng)大功能。
# 調(diào)用compare_models函數(shù),傳入?yún)?shù)include=test_models、n_select=2、sort="MAE"和parallel=FugueBackend(spark)
compare_models(include=test_models, n_select=2, sort="MAE", parallel=FugueBackend(spark))
[Lars(copy_X=True, eps=2.220446049250313e-16, fit_intercept=True, fit_path=True,
jitter=None, n_nonzero_coefs=500, normalize='deprecated',
precompute='auto', random_state=3514, verbose=False),
LinearRegression(copy_X=True, fit_intercept=True, n_jobs=1,
normalize='deprecated', positive=False)]
最后,你可以使用pull
命令來(lái)獲取指標(biāo)表格。
pull()
(3)時(shí)間序列
它遵循與分類相同的模式。
# 導(dǎo)入所需的庫(kù)和模塊
from pycaret.datasets import get_data # 導(dǎo)入獲取數(shù)據(jù)的函數(shù)
from pycaret.time_series import * # 導(dǎo)入時(shí)間序列模塊
# 創(chuàng)建時(shí)間序列預(yù)測(cè)實(shí)驗(yàn)對(duì)象
exp = TSForecastingExperiment()
# 設(shè)置實(shí)驗(yàn)參數(shù)
exp.setup(
data=get_data('airline', verbose=False), # 獲取數(shù)據(jù)集,此處使用航空數(shù)據(jù)集
fh=12, # 設(shè)置預(yù)測(cè)的未來(lái)時(shí)間步數(shù)為12
fold=3, # 設(shè)置交叉驗(yàn)證的折數(shù)為3
fig_kwargs={'renderer': 'notebook'}, # 設(shè)置繪圖參數(shù),此處使用notebook作為渲染器
session_id=42 # 設(shè)置隨機(jī)種子為42,保證實(shí)驗(yàn)的可重復(fù)性
)
# 獲取前5個(gè)模型的名稱
test_models = exp.models().index.tolist()[:5]
# 比較模型性能并選擇最佳模型
# 使用exp.compare_models函數(shù)比較模型性能,并選擇最佳的3個(gè)模型作為基準(zhǔn)模型
# 參數(shù)include=test_models表示只比較test_models中的模型
# 參數(shù)n_select=3表示選擇性能最好的3個(gè)模型作為最佳基準(zhǔn)模型
best_baseline_models = exp.compare_models(include=test_models, n_select=3)
best_baseline_models
Processing: 0%| | 0/27 [00:00<?, ?it/s]
[ARIMA(maxiter=50, method='lbfgs', order=(1, 0, 0), out_of_sample_size=0,
scoring='mse', scoring_args=None, seasonal_order=(0, 1, 0, 12),
start_params=None, suppress_warnings=False, trend=None,
with_intercept=True),
NaiveForecaster(sp=12, strategy='last', window_length=None),
PolynomialTrendForecaster(degree=1, regressor=None, with_intercept=True)]
# 導(dǎo)入所需的模塊
from pycaret.parallel import FugueBackend
# 使用FugueBackend作為并行計(jì)算的后端
# FugueBackend是一個(gè)用于分布式計(jì)算的后端,可以使用Dask或Ray來(lái)實(shí)現(xiàn)并行計(jì)算
# 這里使用了"Dask"作為FugueBackend的參數(shù),表示使用Dask來(lái)進(jìn)行并行計(jì)算
# 使用exp.compare_models函數(shù)比較模型性能,并選擇最佳的3個(gè)模型
# include參數(shù)指定要比較的模型列表,test_models是一個(gè)包含待比較模型的列表
# n_select參數(shù)指定要選擇的最佳模型的數(shù)量,這里選擇了3個(gè)最佳模型
# parallel參數(shù)指定并行計(jì)算的后端,這里使用了之前創(chuàng)建的FugueBackend對(duì)象
# 將比較結(jié)果保存在best_baseline_models變量中,該變量將包含最佳的3個(gè)模型
best_baseline_models = exp.compare_models(include=test_models, n_select=3, parallel=FugueBackend("dask"))
best_baseline_models
[ARIMA(maxiter=50, method='lbfgs', order=(1, 0, 0), out_of_sample_size=0,
scoring='mse', scoring_args=None, seasonal_order=(0, 1, 0, 12),
start_params=None, suppress_warnings=False, trend=None,
with_intercept=True),
NaiveForecaster(sp=12, strategy='last', window_length=None),
PolynomialTrendForecaster(degree=1, regressor=None, with_intercept=True)]
# 導(dǎo)入SparkSession模塊
from pyspark.sql import SparkSession
# 創(chuàng)建或獲取SparkSession對(duì)象
spark = SparkSession.builder.getOrCreate()
# 導(dǎo)入所需的模塊
from pycaret.parallel import FugueBackend
# 使用FugueBackend作為并行計(jì)算的后端
# 使用exp.compare_models函數(shù)來(lái)比較模型性能并選擇最佳模型
# include參數(shù)指定要比較的模型列表,這里選擇了test_models列表的前兩個(gè)模型
# n_select參數(shù)指定要選擇的最佳模型數(shù)量,這里選擇了3個(gè)最佳模型
# parallel參數(shù)指定并行計(jì)算的后端,這里使用了FugueBackend(spark)
# 將比較結(jié)果保存在best_baseline_models變量中
best_baseline_models = exp.compare_models(include=test_models[:2], n_select=3, parallel=FugueBackend(spark))
best_baseline_models
[NaiveForecaster(sp=1, strategy='last', window_length=None),
NaiveForecaster(sp=1, strategy='mean', window_length=None)]
# 從exp對(duì)象中調(diào)用pull()方法
exp.pull()
2、分布式應(yīng)用技巧
(1)一個(gè)更實(shí)際的案例
上面的例子都是純粹的玩具,為了在分布式系統(tǒng)中使事情完美運(yùn)行,你必須注意一些事情
(2) 在設(shè)置中使用lambda而不是dataframe
如果你直接在setup
中提供一個(gè)dataframe,這個(gè)數(shù)據(jù)集將需要發(fā)送到所有的工作節(jié)點(diǎn)。如果dataframe是1G,你有100個(gè)工作節(jié)點(diǎn),那么你的驅(qū)動(dòng)機(jī)器可能需要發(fā)送高達(dá)100G的數(shù)據(jù)(取決于具體框架的實(shí)現(xiàn)),這個(gè)數(shù)據(jù)傳輸本身就成為了一個(gè)瓶頸。相反,如果你提供一個(gè)lambda函數(shù),它不會(huì)改變本地計(jì)算的情況,但驅(qū)動(dòng)程序只會(huì)將函數(shù)引用發(fā)送給工作節(jié)點(diǎn),每個(gè)工作節(jié)點(diǎn)將負(fù)責(zé)自己加載數(shù)據(jù),因此驅(qū)動(dòng)程序端沒(méi)有大量的流量。
(3) 保持確定性
你應(yīng)該始終使用session_id
來(lái)使分布式計(jì)算具有確定性。
(4) 設(shè)置n_jobs
在想要分布式運(yùn)行某些任務(wù)時(shí),明確設(shè)置n_jobs非常重要,這樣它就不會(huì)過(guò)度使用本地/遠(yuǎn)程資源。這也可以避免資源爭(zhēng)用,并加快計(jì)算速度。
# 導(dǎo)入所需的庫(kù)
from pycaret.datasets import get_data # 導(dǎo)入獲取數(shù)據(jù)的函數(shù)
from pycaret.classification import * # 導(dǎo)入分類模塊
# 設(shè)置函數(shù),用于獲取數(shù)據(jù)
# 使用get_data函數(shù)獲取名為"juice"的數(shù)據(jù)集,關(guān)閉冗長(zhǎng)輸出(verbose=False),關(guān)閉數(shù)據(jù)集的概要信息(profile=False)
# 設(shè)置目標(biāo)變量為'Purchase'
# 設(shè)置會(huì)話ID為0,以確保結(jié)果的可重復(fù)性
# 設(shè)置使用的CPU核心數(shù)為1
setup(data_func=lambda: get_data("juice", verbose=False, profile=False), target='Purchase', session_id=0, n_jobs=1);
(4)設(shè)置適當(dāng)?shù)呐看笮?/h3>
batch_size
參數(shù)有助于在負(fù)載均衡和開(kāi)銷之間進(jìn)行調(diào)整。對(duì)于每個(gè)批次,設(shè)置將只調(diào)用一次。所以
選擇 | 負(fù)載均衡 | 開(kāi)銷 | 最佳情況 |
---|---|---|---|
較小的批量大小 | 更好 | 更差 |
訓(xùn)練時(shí)間 >> 數(shù)據(jù)加載時(shí)間 或者 模型數(shù)量 ~= 工作進(jìn)程數(shù)量
|
較大的批量大小 | 更差 | 更好 |
訓(xùn)練時(shí)間 << 數(shù)據(jù)加載時(shí)間 或者 模型數(shù)量 >> 工作進(jìn)程數(shù)量
|
默認(rèn)值設(shè)置為1
,表示我們希望獲得最佳的負(fù)載均衡。
(5) 顯示進(jìn)度
在開(kāi)發(fā)中,您可以通過(guò)display_remote=True
啟用可視效果,但同時(shí)您還必須啟用Fugue回調(diào),以便驅(qū)動(dòng)程序可以監(jiān)視工作進(jìn)度。但建議在生產(chǎn)環(huán)境中關(guān)閉顯示。
# 導(dǎo)入所需的模塊
from pycaret.parallel import FugueBackend
# 定義配置參數(shù)
fconf = {
"fugue.rpc.server": "fugue.rpc.flask.FlaskRPCServer", # 保持該值不變
"fugue.rpc.flask_server.host": "0.0.0.0", # 驅(qū)動(dòng)程序的 IP 地址,工作節(jié)點(diǎn)可以訪問(wèn)
"fugue.rpc.flask_server.port": "3333", # 驅(qū)動(dòng)程序上的開(kāi)放端口
"fugue.rpc.flask_server.timeout": "2 sec", # 工作節(jié)點(diǎn)與驅(qū)動(dòng)程序通信的超時(shí)時(shí)間
}
# 創(chuàng)建 FugueBackend 對(duì)象
be = FugueBackend("dask", fconf, display_remote=True, batch_size=3, top_only=False)
# 使用 FugueBackend 對(duì)象進(jìn)行模型比較
compare_models(n_select=2, parallel=be)
Processing: 0%| | 0/14 [00:00<?, ?it/s]
[RidgeClassifier(alpha=1.0, class_weight=None, copy_X=True, fit_intercept=True,
max_iter=None, normalize='deprecated', positive=False,
random_state=0, solver='auto', tol=0.001),
LinearDiscriminantAnalysis(covariance_estimator=None, n_components=None,
priors=None, shrinkage=None, solver='svd',
store_covariance=False, tol=0.0001)]
(6)自定義指標(biāo)
您可以像以前一樣添加自定義指標(biāo)。但是為了使評(píng)分器可分發(fā),它必須是可序列化的。一個(gè)常見(jiàn)的函數(shù)應(yīng)該沒(méi)問(wèn)題,但是如果在函數(shù)內(nèi)部使用了一些不可序列化的全局變量(例如一個(gè)RLock
對(duì)象),可能會(huì)引發(fā)問(wèn)題。因此,請(qǐng)盡量使自定義函數(shù)獨(dú)立于全局變量。
# 定義一個(gè)名為score_dummy的函數(shù),用于計(jì)算模型的得分
# 參數(shù)y_true表示真實(shí)值,y_pred表示預(yù)測(cè)值,axis表示計(jì)算得分的軸
def score_dummy(y_true, y_pred, axis=0):
return 0.0
# 添加一個(gè)名為'mydummy'的指標(biāo)
# 參數(shù)id表示指標(biāo)的唯一標(biāo)識(shí)符
# 參數(shù)name表示指標(biāo)的名稱
# 參數(shù)score_func表示計(jì)算指標(biāo)得分的函數(shù),這里使用之前定義的score_dummy函數(shù)
# 參數(shù)target表示指標(biāo)的計(jì)算目標(biāo),這里是預(yù)測(cè)值
# 參數(shù)greater_is_better表示得分是否越大越好,這里設(shè)置為False,表示得分越小越好
add_metric(id='mydummy',
name='DUMMY',
score_func=score_dummy,
target='pred',
greater_is_better=False)
Name DUMMY
Display Name DUMMY
Score Function <function score_dummy at 0x7f8aa0dc0ca0>
Scorer make_scorer(score_dummy, greater_is_better=False)
Target pred
Args {}
Greater is Better False
Multiclass True
Custom True
Name: mydummy, dtype: object
在類實(shí)例中添加一個(gè)函數(shù)也是可以的,但是請(qǐng)確保類中的所有成員變量都是可序列化的。
# 獲取模型列表的前5個(gè)模型
test_models = models().index.tolist()[:5]
# 比較模型
# include參數(shù)指定要比較的模型列表
# n_select參數(shù)指定要選擇的模型數(shù)量
# sort參數(shù)指定排序方式,這里使用"DUMMY"表示不進(jìn)行排序
# parallel參數(shù)指定使用的并行計(jì)算后端,這里使用Dask作為后端
compare_models(include=test_models, n_select=2, sort="DUMMY", parallel=FugueBackend("dask"))
[DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini',
max_depth=None, max_features=None, max_leaf_nodes=None,
min_impurity_decrease=0.0, min_samples_leaf=1,
min_samples_split=2, min_weight_fraction_leaf=0.0,
random_state=0, splitter='best'),
LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
intercept_scaling=1, l1_ratio=None, max_iter=1000,
multi_class='auto', n_jobs=None, penalty='l2',
random_state=0, solver='lbfgs', tol=0.0001, verbose=0,
warm_start=False)]
pull()
# 定義一個(gè)Scores類
class Scores:
# 定義一個(gè)名為score_dummy2的方法,用于計(jì)算得分
# 參數(shù)y_true表示真實(shí)標(biāo)簽,y_prob表示預(yù)測(cè)概率,axis表示軸
def score_dummy2(self, y_true, y_prob, axis=0):
return 1.0
# 創(chuàng)建一個(gè)Scores對(duì)象
scores = Scores()
# 添加一個(gè)指標(biāo)
add_metric(
id='mydummy2', # 指標(biāo)的唯一標(biāo)識(shí)符
name='DUMMY2', # 指標(biāo)的名稱
score_func=scores.score_dummy2, # 指標(biāo)的計(jì)算函數(shù)
target='pred_proba', # 指標(biāo)的目標(biāo)值,這里是預(yù)測(cè)概率
greater_is_better=True, # 指標(biāo)的得分越大越好
)
Name DUMMY2
Display Name DUMMY2
Score Function <bound method Scores.score_dummy2 of <__main__...
Scorer make_scorer(score_dummy2, needs_proba=True, er...
Target pred_proba
Args {}
Greater is Better True
Multiclass True
Custom True
Name: mydummy2, dtype: object
# 調(diào)用compare_models函數(shù),傳入?yún)?shù)include=test_models、n_select=2、sort="DUMMY2"和parallel=FugueBackend("dask")
compare_models(include=test_models, n_select=2, sort="DUMMY2", parallel=FugueBackend("dask"))
[DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini',
max_depth=None, max_features=None, max_leaf_nodes=None,
min_impurity_decrease=0.0, min_samples_leaf=1,
min_samples_split=2, min_weight_fraction_leaf=0.0,
random_state=0, splitter='best'),
LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
intercept_scaling=1, l1_ratio=None, max_iter=1000,
multi_class='auto', n_jobs=None, penalty='l2',
random_state=0, solver='lbfgs', tol=0.0001, verbose=0,
warm_start=False)]
# 這是一個(gè)函數(shù)定義,函數(shù)名為pull
def pull():
# 這是一個(gè)空函數(shù),沒(méi)有任何代碼
pass
(7) Spark設(shè)置
強(qiáng)烈建議每個(gè)Spark執(zhí)行器上只有一個(gè)worker,這樣worker可以充分利用所有的CPU(設(shè)置spark.task.cpus
)。當(dāng)你這樣做時(shí),你應(yīng)該明確地在setup
中設(shè)置n_jobs
為每個(gè)執(zhí)行器的CPU數(shù)量。
executor_cores = 4
spark = SparkSession.builder.config("spark.task.cpus", executor_cores).config("spark.executor.cores", executor_cores).getOrCreate()
setup(data=get_data("juice", verbose=False, profile=False), target = 'Purchase', session_id=0, n_jobs=executor_cores)
compare_models(n_select=2, parallel=FugueBackend(spark))
(8) Dask
Dask有假分布式模式,例如默認(rèn)的(多線程)和多進(jìn)程模式。默認(rèn)模式可以正常工作(但實(shí)際上是按順序運(yùn)行的),而多進(jìn)程模式目前對(duì)PyCaret不起作用,因?yàn)樗鼤?huì)干擾PyCaret的全局變量。另一方面,任何Spark執(zhí)行模式都可以正常工作。
(9) 本地并行化
對(duì)于嘗試非平凡數(shù)據(jù)和模型的實(shí)際用途,本地并行化(最簡(jiǎn)單的方法是使用上面顯示的本地Dask作為后端)通常沒(méi)有性能優(yōu)勢(shì)。因?yàn)樵谟?xùn)練過(guò)程中很容易超載CPU,增加資源爭(zhēng)用。本地并行化的價(jià)值在于驗(yàn)證代碼,并讓你相信分布式環(huán)境將在更短的時(shí)間內(nèi)提供預(yù)期的結(jié)果。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-760410.html
(10) 如何開(kāi)發(fā)
分布式系統(tǒng)很強(qiáng)大,但你必須遵循一些良好的實(shí)踐來(lái)使用它們:文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-760410.html
-
從小到大: 最初,你必須從一小組數(shù)據(jù)開(kāi)始,例如在
compare_model
中將你想嘗試的模型限制為一小組廉價(jià)模型,當(dāng)你驗(yàn)證它們工作正常后,可以切換到更大的模型集合。 -
從本地到分布式: 你應(yīng)該按照這個(gè)順序進(jìn)行:先在本地驗(yàn)證小數(shù)據(jù),然后在分布式環(huán)境下驗(yàn)證小數(shù)據(jù),最后在分布式環(huán)境下驗(yàn)證大數(shù)據(jù)。當(dāng)前的設(shè)計(jì)使過(guò)渡無(wú)縫。你可以按順序進(jìn)行這些操作:
parallel=None
->parallel=FugueBackend()
->parallel=FugueBackend(spark)
。在第二步中,你可以替換為本地的SparkSession或本地的dask。
到了這里,關(guān)于工具系列:PyCaret介紹_Fugue 集成_Spark、Dask分布式訓(xùn)練的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!