1. Computing the Squared Error
square_error_utils.py
import numpy as np


class SquareErrorUtils:
    """
    Squared-error minimization criterion: choose the best candidate as the split point.
    Feature attributes are assumed to have been binned beforehand.
    """

    @staticmethod
    def _set_sample_weight(sample_weight, n_samples):
        """
        Sample-weight setup (kept so the class extends naturally to ensemble learning).
        :param sample_weight: weight of each sample
        :param n_samples: number of samples
        :return: sample weights as an ndarray
        """
        if sample_weight is None:
            sample_weight = np.asarray([1.0] * n_samples)
        return sample_weight

    @staticmethod
    def square_error(y, sample_weight):
        """
        Weighted squared error of a region.
        :param y: target values of the current region
        :param sample_weight: weights of the current samples
        :return: weighted sum of squared deviations from the mean
        """
        y = np.asarray(y)
        return np.sum((y - y.mean()) ** 2 * sample_weight)

    def cond_square_error(self, x, y, sample_weight):
        """
        Squared error of y conditional on the partition induced by feature x.
        :param x: feature values defining the regions
        :param y: target values corresponding to x
        :param sample_weight: weights of the current samples
        :return: summed squared error over the regions
        """
        x, y = np.asarray(x), np.asarray(y)
        error = 0.0
        for x_val in set(x):
            x_idx = np.where(x == x_val)  # compute the error region by region
            new_y = y[x_idx]  # target values of this region
            new_sample_weight = sample_weight[x_idx]
            error += self.square_error(new_y, new_sample_weight)
        return error

    def square_error_gain(self, x, y, sample_weight=None):
        """
        Gain in squared error obtained by splitting on feature x.
        :param x: a feature variable
        :param y: the corresponding target values
        :param sample_weight: sample weights
        :return: square_error(y) minus the conditional squared error
        """
        sample_weight = self._set_sample_weight(sample_weight, len(x))
        return self.square_error(y, sample_weight) - self.cond_square_error(x, y, sample_weight)
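
A quick usage sketch (toy data, not from the original article): the gain should be large when the region indicator separates the two clusters of targets cleanly.

# Hedged usage sketch with made-up toy data.
if __name__ == "__main__":
    utils = SquareErrorUtils()
    x = np.array([0, 0, 0, 1, 1])  # binary region indicator
    y = np.array([1.0, 1.2, 0.9, 3.0, 3.1])  # targets, clustered by region
    print(utils.square_error_gain(x, y))  # large gain: the split is informative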
2. Encapsulating the Tree-Node Information
tree_node_R.py
class TreeNode_R:
    """
    Regression decision tree: encapsulates the information stored in a tree node.
    A plain entity class (the usual setXXX()/getXXX() accessors are omitted).
    """

    def __init__(self, feature_idx: int = None, feature_val=None, y_hat=None, square_error: float = None,
                 criterion_val=None, n_samples: int = None, left_child_Node=None, right_child_Node=None):
        """
        Node information of the decision tree.
        :param feature_idx: feature index; if feature names are given, values can be looked up by index
        :param feature_val: split value of the feature
        :param square_error: split criterion: squared error of the current node
        :param n_samples: number of samples contained in the current node
        :param y_hat: predicted value of the current node (the region constant c_i)
        :param left_child_Node: left subtree
        :param right_child_Node: right subtree
        """
        self.feature_idx = feature_idx
        self.feature_val = feature_val
        self.criterion_val = criterion_val
        self.square_error = square_error
        self.n_samples = n_samples
        self.y_hat = y_hat
        self.left_child_Node = left_child_Node  # recursive structure
        self.right_child_Node = right_child_Node  # recursive structure

    def level_order(self):
        """
        Level-order (breadth-first) traversal of the subtree; a minimal implementation.
        :return: list of TreeNode_R in breadth-first order
        """
        nodes, queue = [], [self]
        while queue:
            node = queue.pop(0)
            nodes.append(node)
            queue.extend(c for c in (node.left_child_Node, node.right_child_Node) if c)
        return nodes

    # def get_feature_idx(self):
    #     return self.feature_idx
    #
    # def set_feature_idx(self, feature_idx):
    #     self.feature_idx = feature_idx
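
As a quick illustration (toy values, not from the article), a depth-1 stump can be assembled by hand and traversed:

# Hedged sketch: a hand-built stump with two leaf nodes.
root = TreeNode_R(feature_idx=0, feature_val=5, n_samples=10)
root.left_child_Node = TreeNode_R(y_hat=1.0, n_samples=6)  # region x[0] <= 5
root.right_child_Node = TreeNode_R(y_hat=3.0, n_samples=4)  # region x[0] > 5
print([node.n_samples for node in root.level_order()])  # [10, 6, 4]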
3. Implementing the CART Regression Tree
decision_tree_R.py
import numpy as np
from utils.square_error_utils import SquareErrorUtils
from utils.tree_node_R import TreeNode_R
from utils.data_bin_wrapper import DataBinsWrapper


class DecisionTreeRegression:
    """
    CART regression decision tree, built as a binary tree:
    1. Split criterion: squared-error minimization
    2. Tree construction in fit(): recursive, mind the exit conditions
    3. Prediction with predict(): a search down the tree
    4. Data preprocessing, in particular discretization (binning) of continuous features
    5. Pruning
    """

    def __init__(self, criterion="mse", max_depth=None, min_sample_split=2, min_sample_leaf=1,
                 min_target_std=1e-3, min_impurity_decrease=0, max_bins=10):
        self.utils = SquareErrorUtils()  # node-splitting utilities
        self.criterion = criterion  # node split criterion
        if criterion.lower() == "mse":
            self.criterion_func = self.utils.square_error_gain  # squared-error gain
        else:
            raise ValueError("criterion only supports mse...")
        self.min_target_std = min_target_std  # threshold on the node's squared error (target spread); below it, do not split
        self.max_depth = max_depth  # maximum tree depth; if None, keep splitting
        self.min_sample_split = min_sample_split  # minimum samples required to split a node
        self.min_sample_leaf = min_sample_leaf  # minimum samples a leaf must contain; with fewer, the node stays a leaf
        self.min_impurity_decrease = min_impurity_decrease  # minimum impurity decrease required to split
        self.max_bins = max_bins  # number of bins for continuous features; larger means finer splits
        self.root_node: TreeNode_R = None  # root node of the regression tree
        self.dbw = DataBinsWrapper(max_bins=max_bins)  # discretizer for continuous features
        self.dbw_XrangeMap = {}  # bin edges of the continuous training features

    def fit(self, x_train, y_train, sample_weight=None):
        """
        Build the regression tree: the necessary preprocessing (binning) before the recursion starts.
        :param x_train: training samples, ndarray of shape (n, k)
        :param y_train: targets, ndarray of shape (n, )
        :param sample_weight: per-sample weights, shape (n, )
        :return:
        """
        x_train, y_train = np.asarray(x_train), np.asarray(y_train)
        self.class_values = np.unique(y_train)  # distinct target values (kept for parity with the classifier)
        n_samples, n_features = x_train.shape  # sample count and number of features
        if sample_weight is None:
            sample_weight = np.asarray([1.0] * n_samples)
        self.root_node = TreeNode_R()  # create an empty root
        self.dbw.fit(x_train)
        x_train = self.dbw.transform(x_train)
        self._build_tree(1, self.root_node, x_train, y_train, sample_weight)

    def _build_tree(self, cur_depth, cur_node: TreeNode_R, x_train, y_train, sample_weight):
        """
        Recursively build the regression tree; the core algorithm (pre-order construction).
        :param cur_depth: depth of the tree after this split
        :param cur_node: root node of the current subtree
        :param x_train: training samples of the current subset
        :param y_train: targets of the current subset
        :param sample_weight: weights of the current subset
        :return:
        """
        n_samples, n_features = x_train.shape  # samples and features in the current subset
        # Prediction of the current node: the weighted mean of the targets
        cur_node.y_hat = np.dot(sample_weight / np.sum(sample_weight), y_train)
        cur_node.n_samples = n_samples
        # Recursion exits
        cur_node.square_error = ((y_train - y_train.mean()) ** 2).sum()
        # Targets are nearly constant (tiny spread): not worth splitting
        if cur_node.square_error <= self.min_target_std:
            return
        if n_samples < self.min_sample_split:  # too few samples to split
            return
        if self.max_depth is not None and cur_depth > self.max_depth:  # maximum depth reached
            return
        # Split criterion: pick the best feature and split value
        best_idx, best_val, best_criterion_val = None, None, 0.0
        for k in range(n_features):  # evaluate the criterion for every feature of the subset
            for f_val in sorted(np.unique(x_train[:, k])):  # every distinct value of the feature
                region_x = (x_train[:, k] <= f_val).astype(int)  # 1 if the value is <= f_val, else 0
                criterion_val = self.criterion_func(region_x, y_train, sample_weight)
                if criterion_val > best_criterion_val:
                    best_criterion_val = criterion_val  # best criterion value so far
                    best_idx, best_val = k, f_val  # best feature index and split value so far
        # Recursion exits after the search
        if best_idx is None:  # no feature splits the data (all samples identical on every feature)
            return
        if best_criterion_val <= self.min_impurity_decrease:  # gain below the impurity threshold
            return
        cur_node.criterion_val = best_criterion_val
        cur_node.feature_idx = best_idx
        cur_node.feature_val = best_val
        # Build the left subtree recursively, rooted at the current node
        left_idx = np.where(x_train[:, best_idx] <= best_val)[0]  # sample indices of the left subtree
        if len(left_idx) >= self.min_sample_leaf:  # with fewer samples, the current node stays a leaf
            left_child_node = TreeNode_R()  # empty left child
            cur_node.left_child_Node = left_child_node
            self._build_tree(cur_depth + 1, left_child_node, x_train[left_idx],
                             y_train[left_idx], sample_weight[left_idx])
        right_idx = np.where(x_train[:, best_idx] > best_val)[0]  # sample indices of the right subtree
        if len(right_idx) >= self.min_sample_leaf:  # with fewer samples, the current node stays a leaf
            right_child_node = TreeNode_R()  # empty right child
            cur_node.right_child_Node = right_child_node
            self._build_tree(cur_depth + 1, right_child_node, x_train[right_idx],
                             y_train[right_idx], sample_weight[right_idx])

    def _search_tree_predict(self, cur_node: TreeNode_R, x_test):
        """
        Route a test sample from the root down to a leaf and return the prediction
        of the region (i.e. of the leaf) it falls into.
        :param x_test: a single test sample
        :return:
        """
        if cur_node.left_child_Node and x_test[cur_node.feature_idx] <= cur_node.feature_val:
            return self._search_tree_predict(cur_node.left_child_Node, x_test)
        elif cur_node.right_child_Node and x_test[cur_node.feature_idx] > cur_node.feature_val:
            return self._search_tree_predict(cur_node.right_child_Node, x_test)
        else:
            # leaf node: return its constant prediction
            return cur_node.y_hat

    def predict(self, x_test):
        """
        Predict the target values of the test samples x_test.
        :param x_test: test samples, ndarray
        :return:
        """
        x_test = np.asarray(x_test)  # guard against DataFrame, list, ...
        if self.dbw.XrangeMap is None:
            raise ValueError("Build the regression tree with fit() before predicting...")
        x_test = self.dbw.transform(x_test)
        y_test_pred = []  # predictions of the test samples
        for i in range(x_test.shape[0]):
            y_test_pred.append(self._search_tree_predict(self.root_node, x_test[i]))
        return np.asarray(y_test_pred)

    @staticmethod
    def cal_mse_r2(y_test, y_pred):
        """
        Mean squared error (MSE) and coefficient of determination (R2) of the predictions.
        :param y_test: true values of the test samples
        :param y_pred: predicted values of the test samples
        :return:
        """
        y_test, y_pred = np.asarray(y_test).reshape(-1), np.asarray(y_pred).reshape(-1)
        mse = ((y_pred - y_test) ** 2).mean()  # mean squared error
        r2 = 1 - ((y_pred - y_test) ** 2).sum() / ((y_test - y_test.mean()) ** 2).sum()
        return mse, r2

    def _prune_node(self, cur_node: TreeNode_R, alpha):
        """
        Recursive pruning of the internal nodes, examined one by one from the
        bottom up (post-order traversal).
        :param cur_node: current internal node of the recursion
        :param alpha: pruning threshold
        :return:
        """
        # Recurse into the left subtree, if any
        if cur_node.left_child_Node:
            self._prune_node(cur_node.left_child_Node, alpha)
        # Recurse into the right subtree, if any
        if cur_node.right_child_Node:
            self._prune_node(cur_node.right_child_Node, alpha)
        # Prune only internal nodes (non-leaves)
        if cur_node.left_child_Node is not None or cur_node.right_child_Node is not None:
            for child_node in [cur_node.left_child_Node, cur_node.right_child_Node]:
                if child_node is None:
                    # one child may be missing when a split left fewer than min_sample_leaf samples
                    continue
                if child_node.left_child_Node is not None or child_node.right_child_Node is not None:
                    return  # a grandchild exists, so the children are not both leaves yet
            # Loss before pruning (squared error); the 2 counts the node's two leaves
            pre_prune_value = 2 * alpha
            if cur_node and cur_node.left_child_Node is not None:
                pre_prune_value += (0.0 if cur_node.left_child_Node.square_error is None
                                    else cur_node.left_child_Node.square_error)
            if cur_node and cur_node.right_child_Node is not None:
                pre_prune_value += (0.0 if cur_node.right_child_Node.square_error is None
                                    else cur_node.right_child_Node.square_error)
            # Loss after pruning, with the current node turned into a leaf
            after_prune_value = alpha + cur_node.square_error
            if after_prune_value <= pre_prune_value:  # prune
                cur_node.left_child_Node = None
                cur_node.right_child_Node = None
                cur_node.feature_idx, cur_node.feature_val = None, None
                cur_node.square_error = None

    def prune(self, alpha=0.01):
        """
        Post-pruning of the decision tree (Li Hang): minimize C(T) + alpha * |T|.
        :param alpha: pruning threshold, trading off training fit against model complexity
        :return:
        """
        self._prune_node(self.root_node, alpha)
        return self.root_node
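
The DataBinsWrapper imported above is not listed in this article. A minimal sketch compatible with the calls made here (a fit/transform pair plus an XrangeMap attribute) might look as follows; the equal-frequency binning via percentiles is an assumption, not necessarily the original implementation.

import numpy as np


class DataBinsWrapper:
    """Hedged sketch of a per-feature discretizer; the real utils/data_bin_wrapper may differ."""

    def __init__(self, max_bins=10):
        self.max_bins = max_bins
        self.XrangeMap = None  # per-feature bin edges, filled by fit()

    def fit(self, x_samples):
        x_samples = np.asarray(x_samples)
        self.XrangeMap = []
        for k in range(x_samples.shape[1]):
            # interior edges of (up to) max_bins equal-frequency bins
            edges = np.percentile(x_samples[:, k], np.linspace(0, 100, self.max_bins + 1)[1:-1])
            self.XrangeMap.append(np.unique(edges))

    def transform(self, x_samples):
        x_samples = np.asarray(x_samples)
        cols = [np.digitize(x_samples[:, k], self.XrangeMap[k]) for k in range(x_samples.shape[1])]
        return np.stack(cols, axis=1)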
4. Testing the Regression Tree
test_decision_tree_R.py
import numpy as np
import matplotlib.pyplot as plt
from decision_tree_R import DecisionTreeRegression
from sklearn.tree import DecisionTreeRegressor

obj_fun = lambda x: np.sin(x)
np.random.seed(0)
n = 100
x = np.linspace(0, 10, n)
target = obj_fun(x) + 0.3 * np.random.randn(n)
data = x[:, np.newaxis]  # 2-d array of shape (n, 1)

tree = DecisionTreeRegression(max_bins=50, max_depth=10)
tree.fit(data, target)
x_test = np.linspace(0, 10, 200)
y_test_pred = tree.predict(x_test[:, np.newaxis])
mse, r2 = tree.cal_mse_r2(obj_fun(x_test), y_test_pred)

plt.figure(figsize=(14, 5))
plt.subplot(121)
plt.scatter(data, target, s=15, c="k", label="Raw Data")
plt.plot(x_test, y_test_pred, "r-", lw=1.5, label="Fit Model")
plt.xlabel("x", fontdict={"fontsize": 12, "color": "b"})
plt.ylabel("y", fontdict={"fontsize": 12, "color": "b"})
plt.grid(ls=":")
plt.legend(frameon=False)
plt.title("Regression Decision Tree (Unpruned), MSE = %.5f, R2 = %.5f" % (mse, r2))

plt.subplot(122)
tree.prune(0.5)
y_test_pred = tree.predict(x_test[:, np.newaxis])
mse, r2 = tree.cal_mse_r2(obj_fun(x_test), y_test_pred)
plt.scatter(data, target, s=15, c="k", label="Raw Data")
plt.plot(x_test, y_test_pred, "r-", lw=1.5, label="Fit Model")
plt.xlabel("x", fontdict={"fontsize": 12, "color": "b"})
plt.ylabel("y", fontdict={"fontsize": 12, "color": "b"})
plt.grid(ls=":")
plt.legend(frameon=False)
plt.title("Regression Decision Tree (Pruned), MSE = %.5f, R2 = %.5f" % (mse, r2))
plt.show()
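
As a sanity check (an addition, not part of the original test), sklearn's DecisionTreeRegressor can be fitted on the same data and scored with the same helper:

# Hedged comparison against sklearn on the same data.
sk_tree = DecisionTreeRegressor(max_depth=10)
sk_tree.fit(data, target)
sk_pred = sk_tree.predict(x_test[:, np.newaxis])
sk_mse, sk_r2 = tree.cal_mse_r2(obj_fun(x_test), sk_pred)
print("sklearn DecisionTreeRegressor: MSE = %.5f, R2 = %.5f" % (sk_mse, sk_r2))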