當(dāng)特征數(shù)量或者模型數(shù)量很多的時(shí)候,使用PySpark
去計(jì)算相關(guān)風(fēng)控指標(biāo)會(huì)節(jié)省很多的時(shí)間。網(wǎng)上關(guān)于使用PySpark
計(jì)算相關(guān)風(fēng)控指標(biāo)的資料較少,尤其是PSI計(jì)算不管是國(guó)內(nèi)還是國(guó)外相關(guān)的代碼都沒(méi)有正確的,這里拋磚引玉,寫了三個(gè)風(fēng)控常用的指標(biāo)AUC,KS和PSI相關(guān)的計(jì)算方法,供參考。
AUC
AUC的相關(guān)概念網(wǎng)上已經(jīng)有很多的很好的文章,這里不在贅述,AUC使用的到的計(jì)算公式如下:
其中M
為負(fù)類樣本的數(shù)目,N
為正類樣本的數(shù)目
使用PySpark
計(jì)算代碼如下:文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-659816.html
from pyspark.sql import functions as F
from pyspark.sql.window import Window
true_y_col = 'y'
pred_y_col = 'pred_y'
date_col = 'day'
auc_df = df.filter(F.col(true_y_col)>=0).filter(F.col(pred_y_col)>=0)\
.select(true_y_col, pred_y_col, date_col, 'model_name')\
.withColumn('totalbad', F.sum(F.col(true_y_col)).over(Window.patitonBy(date_col, 'model_name').orderBy(F.lit(1))))\
.withColumn('totalgood', F.sum(1-F.col(true_y_col)).over(Window.patitonBy(date_col, 'model_name').orderBy(F.lit(1))))\
.withColumn('rnk2', F.row_number().over(Window.partitionBy(date_col, 'model_name').orderBy(F.col(pred_y_col).asc())))\
.filter(F.col(true_y_col)==1)\
.groupBy(date_col, 'model_name')\
.agg(((F.sum(F.col('rnk2'))-0.5*(F.max(F.col('totalbad')))*(1+F.max(F.col('totalbad'))))/(F.max(F.col('totalbad'))*F.max(F.col('totalgood')))).alias('AUC'))\
.orderBy('model_name', date_col)
KS
KS統(tǒng)計(jì)量是基于經(jīng)驗(yàn)累積分布函數(shù)(Empirical Cumulative Distribution Function,ECDF)
建立的,一般定義為:
即為TPR
與FPR
差值絕對(duì)值的最大值。
KS計(jì)算方法有很多種,這里使用的是分箱法分別計(jì)算TPR
與FPR
,然后得到KS。
使用PySpark
計(jì)算代碼如下:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
true_y_col = 'y'
pred_y_col = 'pred_y'
date_col = 'day'
nBins = 10
ks_df = df.filter(F.col(true_y_col)>=0).filter(F.col(pred_y_col)>=0)\
.select(true_y_col, pred_y_col, date_col, 'model_name')\
.withColumn('Bin', F.ntile(nBins).over(Window.partitionBy(date_col, 'model_name').orderBy(pred_y_col)))\
.groupBy(date_col, 'model_name', 'Bin').agg(F.sum(true_y_col).alias('N_1'), F.sum(1-F.col(true_y_col)).alias('N-0'))\
.withColumn('ALL_1', F.sum('N_1').over(Window.partitionBy(date_col, 'model_name')))\
.withColumn('ALL_0', F.sum('N_0').over(Window.partitionBy(date_col, 'model_name')))\
.withColumn('SUM_1', F.sum('N_1').over(Window.partitionBy(date_col, 'model_name').orderBy('Bin')))\
.withColumn('ALL_0', F.sum('N_0').over(Window.partitionBy(date_col, 'model_name').orderBy('Bin')))\
.withColumn('KSn', F.expr('round(abs(SUM_1/ALL_1-SUM_0/ALL_0),6)'))\
.withColumn('KS', F.round(F.max('KSn').over(Window.partitionBy(date_col, 'model_name')),6))
ks_df = ks_df.select(date_col, 'model_name', 'KS').filter(col('KS').isNotNull()).dropDuplicates()
PSI
群體穩(wěn)定性指標(biāo)(Population Stability Index,PSI)是風(fēng)控場(chǎng)景常用的驗(yàn)證樣本在各分?jǐn)?shù)段的分布與建模樣本分布的穩(wěn)定性。在建模中,常用來(lái)篩選特征變量、評(píng)估模型穩(wěn)定性。
計(jì)算公式如下:
其中\(A_i\)代表的是第i個(gè)分箱中實(shí)際分布(actual)樣本占比,同理\(E_i\)代表的是第i個(gè)分箱中預(yù)期分布(excepted)樣本占比文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-659816.html
使用PySpark
計(jì)算代碼如下:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import when
date_col = 'day'
nBins = 10
feature_list = ['fea_1', 'fea_2', 'fea_3']
df = df.withColumn('flag', when(F.col(date_col) == 'actual_date', 0).when(F.col(date_col) == 'excepted_date', 1).otherwise(None)
quantitles = df.filter(F.col('flag') == 0)\
.approxQuantile(feature_list, [i/nBins for i in range(1, nBins)], 0.001) # 基準(zhǔn)樣本分箱
quantitles_dict = {col: quantitles[idx] for idx, col in enumerate(feature_list)}
f_quantitles_dict = F.create_map([F.lit(x) if isinstance(x, str) else F.array(*[F.lit(xx) for xx in x]) for i in quantitles_dict.items() for x in i])
unpivotExpr = "stack(3, 'fea_1', fea_1, 'fea_2', fea_2, 'fea_3', fea_3)"
psi_df = df.filter(F.col('flag').isNotNull()).select('flag', F.expr(unpivotExpr))\
.withColumn('Bin', when(F.col('value').isNull(), 'Missing').otherwise(
when(F.col('value') < f_quantitles_dict[F.col('varname')][0], 'bin_0')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][1], 'bin_1')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][2], 'bin_2')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][3], 'bin_3')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][4], 'bin_4')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][5], 'bin_5')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][6], 'bin_6')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][7], 'bin_7')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][8], 'bin_8')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][8], 'bin_9')))\
.groupBy('varname', 'Bin').agg(F.sum('flag').alias('N_1'), F.sum(1-F.col('flag')).alias('N_0'))\
.withColumn('ALL_1', F.sum('N_1').over(Window.partitionBy('varname')))\
.withColumn('ALL_0', F.sum('N_0').over(Window.partitionBy('varname')))\
.withColumn('actual', F.expr('round(N_0/ALL_0, 6)'))\
.withColumn('excepted', F.expr('round(N_1/ALL_1, 6)'))\
.withColumn('PSIn', F.expr('round((actual-excepted)*ln(actual/excepted), 6'))\
.withColumn('PSI', F.round(F.sum('PSIn').over(Window.partitionBy('varname')), 6))
Reference
- 【風(fēng)控算法】二、SQL->Python->PySpark計(jì)算KS,AUC及PSI
- 風(fēng)控模型—區(qū)分度評(píng)估指標(biāo)(KS)深入理解應(yīng)用
- 風(fēng)控模型—群體穩(wěn)定性指標(biāo)(PSI)深入理解應(yīng)用
到了這里,關(guān)于使用PySpark計(jì)算AUC,KS與PSI的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!