最近布置了個課堂作業(yè),用python實現(xiàn)決策樹算法?。整了幾天勉勉強(qiáng)強(qiáng)畫出了棵歪脖子樹,記錄一下。
大體思路:
1.創(chuàng)建決策樹My_Decision_Tree類,類函數(shù)__init__()初始化參數(shù)、fit()進(jìn)行決策樹模型訓(xùn)練、predict()進(jìn)行預(yù)測、evaluate()進(jìn)行模型評估、save_model()保存模型(csv格式)、load_model()加載模型、show_tree()使用Pillow庫繪制決策樹以及其他一些封裝的功能函數(shù);
2.最佳劃分點的度量通常有Gini值、Entropy、Classification error等,本程序采用Gini值,計算方法如下:?
構(gòu)建決策數(shù)模型所用到的功能函數(shù)(gini值計算相關(guān))說明:?
①get_col_gini(self,threshold_point, value_series, label_series):計算一列數(shù)據(jù)在某一閾值劃分點下的gini_split值;
②get_best_point(self,value_series, label_series):對連續(xù)型數(shù)據(jù)進(jìn)行大小排序,分別計算切分點及其對應(yīng)的gini_split值,找到使得gini_split值最小的最佳切分點;
③get_best_column(self,data,label_series):遍歷數(shù)據(jù)表中的屬性列,計算每列的最佳劃分點和gini_split值,并對比找到最適合劃分的一列及其對應(yīng)的劃分點;
3.決策樹構(gòu)建(訓(xùn)練)的邏輯:?
①程序開始以root為父節(jié)點,先找到6列屬性(剩余屬性)中最適合劃分的列和劃分點,并將其劃分為兩個子節(jié)點,記錄判斷條件和此節(jié)點位置。刪除子節(jié)點中的該列屬性以避免最終決策樹模型傾向于某個屬性,利用剩余的屬性繼續(xù)構(gòu)造決策樹;
②判斷各個子節(jié)點中的標(biāo)簽是否相同,如果為同一類則移到葉節(jié)點中,否則將此節(jié)點更新到下個流程中的父節(jié)點中;
③一直循環(huán)以上過程,當(dāng)父節(jié)點為空時結(jié)束訓(xùn)練,如果所有子節(jié)點都為葉節(jié)點時則決策樹完美將數(shù)據(jù)分類;當(dāng)然也會出現(xiàn)所有屬性都用完但子節(jié)點中標(biāo)簽依舊不唯一的情況,這時以該節(jié)點中個數(shù)較多的標(biāo)簽類作為分類結(jié)果,終止模型構(gòu)建。
# -*- coding: utf-8 -*-
# @Version: Python 3.8.2
# @Author: 707
# @Use: 決策樹算法編寫
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split #數(shù)據(jù)集劃分
#采用Pillow可視化決策樹
from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont
###決策樹框架###
class My_Decision_Tree(object):
'''決策樹框架'''
def __init__(self, arg=None):
##初始化類參數(shù)
self.arg = arg
#存放決策樹中的各層判斷條件
self.decision_df=pd.DataFrame(columns=['parent_depth','parent_pos','this_depth','this_pos','column','point','label'])
self.Parent_node_list=[] #存放父節(jié)點和子節(jié)點的DataFrame
self.Child_node_list=[]
self.leaf_list=[] #存放劃分好的葉節(jié)點
self.branch_list=[] #存放未能劃分出來的節(jié)點
def fit(self,x_train,y_train):
'''傳入訓(xùn)練集和標(biāo)簽構(gòu)造決策樹'''
'''
程序邏輯:
(1)程序開始以root為父節(jié)點,先找到6列屬性(剩余屬性)中最適合劃分的列和劃分點,并將其劃分為兩個子節(jié)點,
記錄判斷條件和此節(jié)點位置。刪除子節(jié)點中的該列屬性以避免最終決策樹模型傾向于某個屬性,利用剩余的屬性繼續(xù)構(gòu)造決策樹
(2)判斷各個子節(jié)點中的標(biāo)簽是否相同,如果為同一類則移到葉節(jié)點中,否則將此節(jié)點更新到下個流程中的父節(jié)點中
(3)一直循環(huán)以上過程,當(dāng)父節(jié)點為空時結(jié)束訓(xùn)練,如果所有子節(jié)點都為葉節(jié)點時則決策樹完美將數(shù)據(jù)分類;當(dāng)然也會出現(xiàn)所有屬性都用完但子節(jié)點
中標(biāo)簽依舊不唯一的情況,這時以該節(jié)點中個數(shù)較多的標(biāo)簽類作為分類結(jié)果,終止模型構(gòu)建。
'''
x_data=x_train.copy()
if(y_train.name not in x_data.columns):
x_data[y_train.name]=y_train
#把第一層(原數(shù)據(jù))放入父節(jié)點列表Parent_node_list
self.Parent_node_list.append(x_data)
#寫入第一層的決策(跟節(jié)點)
decision={'this_depth':1,'this_pos':0,'column':'root','label':'#'}
self.decision_df=self.decision_df.append(decision,ignore_index=True)
#開始循環(huán)計算分類節(jié)點
parent_count=0 #循環(huán)的父節(jié)點數(shù)
child_pos=0 #子節(jié)點的位置
depth=2 #第幾層節(jié)點
while True:
parent_node=self.Parent_node_list[parent_count]
#找到第一個適合劃分的列和劃分點
col_1,point_1=self.get_best_column(parent_node,parent_node[y_train.name])
print('decision condition:',col_1,point_1)
#根據(jù)條件把父節(jié)點劃分為兩部分
Child_node1=parent_node.loc[parent_node[col_1]<=point_1]
Child_node2=parent_node.loc[parent_node[col_1]>point_1]
#每一部分的分類結(jié)果
result=[]
for child_node in [Child_node1,Child_node2]:
#刪除已使用過的屬性
del(child_node[col_1])
#判斷子節(jié)點標(biāo)簽是否為一類,是則將其放入葉節(jié)點列表中
if(len(child_node[y_train.name].unique())==1):
self.leaf_list.append(child_node)
print('添加一個葉節(jié)點,標(biāo)簽類型:',child_node[y_train.name].unique()[0],'數(shù)據(jù)大小:',child_node.shape)
result.append(child_node[y_train.name].unique()[0])
# 判斷子節(jié)點標(biāo)簽是否還有剩余屬性可以作為分類依據(jù),如果有則添加到子節(jié)點列表中用于后續(xù)分類
elif(child_node.shape[1]!=1):
self.Child_node_list.append(child_node)
print('添加一個子節(jié)點,數(shù)據(jù)大小:',child_node.shape)
result.append('#')
#都不滿足說明該節(jié)點沒有分完但是分不下去了,提示錯誤
else:
self.branch_list.append(child_node)
print('child_node節(jié)點已用完所有屬性,仍然沒分出來,剩余數(shù)據(jù)大小:')
print(child_node[y_train.name].value_counts())
values=child_node[y_train.name].value_counts()
if(len(values)==0):
replace=list(parent_node[y_train.name].value_counts().index)[0]
else:
replace=list(values.index)[0]
print('用%s作為該條件下的預(yù)測結(jié)果' %replace)
result.append(replace)
#找到該父節(jié)點在該層中所對應(yīng)的位置
p_pos_list=self.decision_df.loc[(self.decision_df['this_depth']==depth-1)&(self.decision_df['label']=='#'),'this_pos']
p_pos_list=list(p_pos_list)
# print('p_pos_list:',p_pos_list)
#判斷完一個父節(jié)點之后,把判斷條件加入decision_df中
decision1={'parent_depth':depth-1,'parent_pos':p_pos_list[parent_count],'this_depth':depth,'this_pos':child_pos,
'column':col_1,'point':point_1,'label':result[0]}
decision2={'parent_depth':depth-1,'parent_pos':p_pos_list[parent_count],'this_depth':depth,'this_pos':child_pos+1,
'column':col_1,'point':point_1,'label':result[1]}
self.decision_df=self.decision_df.append([decision1,decision2],ignore_index=True)
#當(dāng)遍歷完父節(jié)點列表所有值后,將子節(jié)點更新為父節(jié)點
child_pos+=2
parent_count+=1
if(parent_count==len(self.Parent_node_list)):
parent_count=0
child_pos=0
depth+=1
print('該層決策結(jié)束,進(jìn)行下一層決策\n')
self.Parent_node_list=self.Child_node_list.copy()
self.Child_node_list.clear()
print('更新parent_node_list,大?。?d' %len(self.Parent_node_list))
#判斷父節(jié)點列表中是否還有未分類的節(jié)點,如果沒有則表示已經(jīng)全部分好,結(jié)束訓(xùn)練
if(len(self.Parent_node_list)==0):
print('決策樹構(gòu)建完成')
#顯示構(gòu)建好的決策樹:判斷條件及結(jié)果(葉節(jié)點)
print(self.decision_df)
break
def predict(self,x_test):
'''輸入測試數(shù)據(jù)進(jìn)行決策判斷'''
y_predict=list()
if(type(x_test)==pd.core.series.Series):
pred=self.get_ylabel(x_test)
y_predict.append(pred)
else:
for index,row in x_test.iterrows():
pred=self.get_ylabel(row)
y_predict.append(pred)
y_predict=np.array(y_predict,dtype=str)
return y_predict
def evaluate(self,x_test,y_test):
'''輸入測試集和標(biāo)簽評估決策樹準(zhǔn)確性,返回acc'''
y_true=np.array(y_test,dtype=str)
y_pred=self.predict(x_test)
# print(y_pred)
# print(y_true)
label_list=list(self.decision_df['label'].unique())
label_list.remove('#')
label_list=np.array(label_list,dtype=str) #類型轉(zhuǎn)換
#創(chuàng)建混淆矩陣(index為true,columns為pred)
confusion_matrix=pd.DataFrame(data=0,columns=label_list,index=label_list)
for i in range(len(y_true)):
confusion_matrix.loc[y_true[i],y_pred[i]]+=1
print('混淆矩陣:')
print(confusion_matrix)
#計算準(zhǔn)確率
acc=0
for i in range(len(label_list)):
acc+=confusion_matrix.iloc[i,i]
acc/=len(y_true)
print('acc:%.5f' %acc)
return acc
def save_model(self,path):
'''以csv格式保存模型'''
self.decision_df.to_csv(path,index=False)
def load_model(self,path):
'''以csv格式讀取模型'''
self.decision_df=pd.read_csv(path)
def get_col_gini(self,threshold_point, value_series, label_series):
'''Gini值計算函數(shù)'''
# 將輸入進(jìn)行重組
df_input = pd.DataFrame()
df_input['value'] = value_series
df_input['label'] = label_series
# print(df_input)
# 設(shè)計Gini值的計算表格
label_cols = label_series.value_counts()
df_gini = pd.DataFrame(columns=['node1', 'node2'], index=label_cols.index)
for c in label_cols.index:
df_c = df_input.loc[df_input['label'] == c]
df_gini.loc[c, 'node1'] = df_c.loc[df_c['value']<= threshold_point].shape[0]
df_gini.loc[c, 'node2'] = df_c.loc[df_c['value']> threshold_point].shape[0]
#計算node1、node2節(jié)點gini值中和的部分
sum_n1=df_gini['node1'].sum()
sum_n2=df_gini['node2'].sum()
# print(df_gini)
# 計算node節(jié)點gini值
gini_n1=gini_n2=0
if(sum_n1==0):
for c in label_cols.index:
gini_n2+=(df_gini.loc[c,'node2']/sum_n2)**2
elif(sum_n2==0):
for c in label_cols.index:
gini_n1+=(df_gini.loc[c,'node1']/sum_n1)**2
else:
for c in label_cols.index:
gini_n1+=(df_gini.loc[c,'node1']/sum_n1)**2
gini_n2+=(df_gini.loc[c,'node2']/sum_n2)**2
gini_n1 = 1-gini_n1
gini_n2 = 1-gini_n2
#計算gini_split
gini_split=sum_n1/(sum_n1+sum_n2)*gini_n1 +sum_n2/(sum_n1+sum_n2)*gini_n2
# print("point:%f,gini_split:%f" %(threshold_point,gini_split))
return gini_split
def get_best_point(self,value_series, label_series):
'''找到一列屬性中最適合劃分(gini值最小)的點'''
value_array=np.array(value_series)
value_array=np.sort(value_array)
df_point = pd.DataFrame(columns=['point', 'gini_value'])
# 循環(huán)屬性值列,計算劃分點及其gini值并添加至df_point數(shù)據(jù)表中
for i in range(len(value_array) + 1):
if(i == 0):
point = value_array[i] - 1
elif(i == len(value_array)):
point = value_array[i - 1]
else:
point = 0.5 * (value_array[i] + value_array[i - 1])
gini = self.get_col_gini(point, value_series, label_series)
s = pd.Series(data={'point': point, 'gini_value': gini})
df_point.loc[i] = s
df_point.sort_values(by='gini_value', inplace=True)
best_point = df_point.iloc[0, 0]
best_gini = df_point.iloc[0,1]
# print("best point for column '%s':%f" %(value_series.name,best_point))
# print(df_point)
return best_point,best_gini
def get_best_column(self,data,label_series):
'''遍歷data中的屬性列,計算其最佳劃分點及gini值,找出最適合劃分的一列和劃分點'''
x_data=data.copy()
if(label_series.name in x_data.columns):
del(x_data[label_series.name])
gini_columns=pd.DataFrame(columns=['point','gini'],index=x_data.columns)
for col_name in x_data.columns:
point,gini=self.get_best_point(x_data[col_name],label_series)
s=pd.Series({'point':point,'gini':gini})
gini_columns.loc[col_name]=[point,gini]
# gini_columns=gini_columns.append(s,ignore_index=True) #append會更改索引
gini_columns.sort_values(by='gini',inplace=True)
# print(gini_columns)
best_col=gini_columns.index[0]
best_point=gini_columns.iloc[0,0]
return best_col,best_point
def get_ylabel(self,x_series):
'''計算一行x數(shù)據(jù)(Series)對應(yīng)的標(biāo)簽'''
model=self.decision_df
y_pred='#'
x_index=1
parent_index=[]
child_df=pd.DataFrame()
# for i in range(1):
while (y_pred=='#'):
#判斷條件
condition=[model.loc[x_index,'column'],model.loc[x_index,'point']]
if(x_series[condition[0]]>condition[1]):
x_index+=1
# print('%s>%f' %(condition[0],condition[1]))
# else:
# print('%s<=%f' %(condition[0],condition[1]))
y_pred=model.loc[x_index,'label']
#更新父節(jié)點索引并找到其子節(jié)點
parent_index=[model.loc[x_index,'this_depth'],model.loc[x_index,'this_pos']]
child_df=model.loc[(model['parent_depth']==parent_index[0])&(model['parent_pos']==parent_index[1])]
#找到標(biāo)簽時結(jié)束
if(child_df.shape[0]!=0):
x_index=list(child_df.index)[0]
# print('跳到第%d行繼續(xù)判斷' %x_index)
# else:
# print('預(yù)測結(jié)束')
# print('pred:',y_pred)
return y_pred
def show_tree(self):
'''將決策樹進(jìn)行可視化'''
def add_text(im_draw,text_str,xy,multiline=1):
'''在繪圖對象的某個位置添加文字'''
#設(shè)置大小
font_h,font_w=25,14
font_h*=multiline
text_len=round(len(text_str)/multiline)
font=ImageFont.truetype(font='simsun.ttc',size=20)
im_draw.text(xy=(xy[0]-font_w*3,xy[1]),text=text_str,font=font,fill='black',align='center')
#繪制矩形
# im_draw.rectangle(xy=(xy[0],xy[1],xy[0]+font_w*text_len,xy[1]+font_h),outline='black',width=2)
interval_x,interval_y=60,80
model=self.decision_df.copy()
model['x_pos']=model['this_pos']
model['y_pos']=(model['this_depth']-1)*interval_y
model['text']='text'
max_depth=model.iloc[-1,2]
#創(chuàng)建圖像
img_w,img_h=1500,600
tree_img=Image.new(mode='RGB',size=(img_w,img_h),color='white')
draw=ImageDraw.Draw(tree_img) #創(chuàng)建繪圖對象
parent_pos=[]
parent_x_pos=0
x_pos=0
for x_index in model.index:
text=model.loc[x_index,'column']
if (str(model.loc[x_index,'point']) == 'nan'):
x_pos=img_w/4
else:
#跟新text內(nèi)容和x位置
model.loc[x_index,'x_pos']=x_pos
parent_pos=[model.loc[x_index,'parent_depth'],model.loc[x_index,'parent_pos']]
parent_x_pos=model.loc[(model['this_depth']==parent_pos[0])&(model['this_pos']==parent_pos[1]),'x_pos']
depth=model.loc[x_index,'this_depth']-1
if(model.loc[x_index,'this_pos']%2==0):
text+='\n<='+('%.3f' %model.loc[x_index,'point'])
x_pos=parent_x_pos-interval_x*np.sqrt(max_depth-depth)
else:
text+='\n>'+('%.3f' %model.loc[x_index,'point'])
x_pos=parent_x_pos+interval_x*np.sqrt(max_depth-depth)
x_pos=x_pos.iloc[0]
if(model.loc[x_index,'label'] !='#'):
text+='\nClass:'+str(model.loc[x_index,'label'])
#將文字和位置添加到
model.loc[x_index,'text']=text
model.loc[x_index,'x_pos']=x_pos
# 調(diào)整節(jié)點橫坐標(biāo)位置
gap=140
for depth in model['this_depth'].unique():
if(depth!=1):
same_depth=model.loc[model['this_depth']==depth]
for x_index in same_depth.index[:-1]:
if(x_index==same_depth.index[0]):
if((model.loc[same_depth.index[0],'x_pos']-model.loc[same_depth.index[-1],'x_pos'])<gap*len(same_depth.index)):
#如果整體太擠,整層先往左移一段
for i in same_depth.index:
model.loc[i,'x_pos']-=gap*len(same_depth.index)/8
#如果相鄰兩個靠太近,右邊的往右移一點
if((model.loc[x_index+1,'x_pos']-model.loc[x_index,'x_pos'])<gap):
model.loc[x_index+1,'x_pos']=model.loc[x_index,'x_pos']+gap
# model.loc[x_index,'x_pos']-=gap/2
#繪制文字和線
this_img_pos=[]
parent_img_pos=[]
for x_index in model.index:
#繪制直線
if(x_index !=0):
this_img_pos=[model.loc[x_index,'x_pos'],model.loc[x_index,'y_pos']]
parent_pos=[model.loc[x_index,'parent_depth'],model.loc[x_index,'parent_pos']]
parent_img_pos=model.loc[(model['this_depth']==parent_pos[0])&(model['this_pos']==parent_pos[1]),['x_pos','y_pos']]
parent_img_pos=[parent_img_pos.iloc[0,0],parent_img_pos.iloc[0,1]]
draw.line(xy=(parent_img_pos[0],parent_img_pos[1],this_img_pos[0],this_img_pos[1]),fill='gray',width=1)
#添加文字
this_pos=(model.loc[x_index,'x_pos'],model.loc[x_index,'y_pos'])
text=model.loc[x_index,'text']
add_text(im_draw=draw,text_str=text,xy=this_pos)
#顯示圖片
tree_img.show()
if __name__ == '__main__':
# 讀取文件
data = pd.read_csv('./paras_labels.csv')
#數(shù)據(jù)按8:2進(jìn)行訓(xùn)練集、測試集切分
x_train,x_test,y_train,y_test=train_test_split(data,data['label'],test_size=0.2,random_state=7)
ds_tree=My_Decision_Tree()
ds_tree.fit(x_train,y_train)
# ds_tree.save_model('my_decision_tree%d.csv' %e)
# ds_tree.load_model('my_decision_tree%d.csv' %e)
# print(ds_tree.decision_df)
print('訓(xùn)練集評估模型:')
ds_tree.evaluate(x_train,y_train)
print('測試集評估模型:')
ds_tree.evaluate(x_test,y_test)
ds_tree.show_tree()
結(jié)果:
用我的數(shù)據(jù)跑了一下,成功長出一棵歪脖子樹,nice!?
?代碼能力有限,有錯誤的地方歡迎大佬們交流,批評指正[狗頭抱拳]
?文章來源地址http://www.zghlxwxcb.cn/news/detail-451274.html
?文章來源:http://www.zghlxwxcb.cn/news/detail-451274.html
?
到了這里,關(guān)于python編程實現(xiàn)決策樹算法的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!