"""
This module includes the cost-sensitive decision tree method.
"""
# Authors: Alejandro Correa Bahnsen <al.bahnsen@gmail.com>
# License: BSD 3 clause
import numpy as np
import copy
from ..metrics import cost_loss
from sklearn.base import BaseEstimator
from sklearn.externals import six
import numbers
class CostSensitiveDecisionTreeClassifier(BaseEstimator):
"""A example-dependent cost-sensitive binary decision tree classifier.
Parameters
----------
criterion : string, optional (default="direct_cost")
The function to measure the quality of a split. Supported criteria are
"direct_cost" for the Direct Cost impurity measure, "pi_cost", "gini_cost",
and "entropy_cost".
criterion_weight : bool, optional (default=False)
Whether or not to weight the gain according to the population distribution.
num_pct : int, optional (default=100)
Number of percentiles to evaluate the splits for each feature.
max_features : int, float, string or None, optional (default=None)
The number of features to consider when looking for the best split:
- If int, then consider `max_features` features at each split.
- If float, then `max_features` is a percentage and
`int(max_features * n_features)` features are considered at each
split.
- If "auto", then `max_features=sqrt(n_features)`.
- If "sqrt", then `max_features=sqrt(n_features)`.
- If "log2", then `max_features=log2(n_features)`.
- If None, then `max_features=n_features`.
Note: the search for a split does not stop until at least one
valid partition of the node samples is found, even if it requires to
effectively inspect more than ``max_features`` features.
max_depth : int or None, optional (default=None)
The maximum depth of the tree. If None, then nodes are expanded until
all leaves are pure or until all leaves contain less than
min_samples_split samples.
min_samples_split : int, optional (default=2)
The minimum number of samples required to split an internal node.
min_samples_leaf : int, optional (default=1)
The minimum number of samples required to be at a leaf node.
min_gain : float, optional (default=0.001)
The minimum gain that a split must produce in order to be taken into account.
pruned : bool, optional (default=True)
Whether or not to prune the decision tree using cost-based pruning.
Attributes
----------
`tree_` : Tree object
The underlying Tree object.
See also
--------
sklearn.tree.DecisionTreeClassifier
References
----------
.. [1] Correa Bahnsen, A., Aouada, D., & Ottersten, B.
`"Example-Dependent Cost-Sensitive Decision Trees. Expert Systems with Applications" <http://albahnsen.com/files/Example-Dependent%20Cost-Sensitive%20Decision%20Trees.pdf>`__,
Expert Systems with Applications, 42(19), 6609–6619, 2015,
http://doi.org/10.1016/j.eswa.2015.04.042
Examples
--------
>>> from sklearn.ensemble import RandomForestClassifier
>>> from sklearn.cross_validation import train_test_split
>>> from costcla.datasets import load_creditscoring1
>>> from costcla.models import CostSensitiveDecisionTreeClassifier
>>> from costcla.metrics import savings_score
>>> data = load_creditscoring1()
>>> sets = train_test_split(data.data, data.target, data.cost_mat, test_size=0.33, random_state=0)
>>> X_train, X_test, y_train, y_test, cost_mat_train, cost_mat_test = sets
>>> y_pred_test_rf = RandomForestClassifier(random_state=0).fit(X_train, y_train).predict(X_test)
>>> f = CostSensitiveDecisionTreeClassifier()
>>> y_pred_test_csdt = f.fit(X_train, y_train, cost_mat_train).predict(X_test)
>>> # Savings using only RandomForest
>>> print(savings_score(y_test, y_pred_test_rf, cost_mat_test))
0.12454256594
>>> # Savings using CSDecisionTree
>>> print(savings_score(y_test, y_pred_test_csdt, cost_mat_test))
0.481916135529
"""
def __init__(self,
criterion='direct_cost',
criterion_weight=False,
num_pct=100,
max_features=None,
max_depth=None,
min_samples_split=2,
min_samples_leaf=1,
min_gain=0.001,
pruned=True,
):
self.criterion = criterion
self.criterion_weight = criterion_weight
self.num_pct = num_pct
self.max_features = max_features
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.min_samples_leaf = min_samples_leaf
self.min_gain = min_gain
self.pruned = pruned
self.n_features_ = None
self.max_features_ = None
self.tree_ = []
def set_param(self, attribute, value):
""" Set a single parameter of the estimator. """
setattr(self, attribute, value)
def _node_cost(self, y_true, cost_mat):
""" Private function to calculate the cost of a node.
Parameters
----------
y_true : array-like of shape = [n_samples]
Ground truth (correct) labels.
cost_mat : array-like of shape = [n_samples, 4]
Cost matrix of the classification problem, where the columns represent the costs of:
false positives, false negatives, true positives and true negatives, for each example.
Returns
-------
tuple(cost_loss : float, node prediction : int, node predicted probability : float)
"""
n_samples = len(y_true)
# Evaluate the cost of predicting the whole node as negative (0) and as positive (1)
costs = np.zeros(2)
costs[0] = cost_loss(y_true, np.zeros(y_true.shape), cost_mat)
costs[1] = cost_loss(y_true, np.ones(y_true.shape), cost_mat)
pi = np.array([1 - y_true.mean(), y_true.mean()])
if self.criterion == 'direct_cost':
costs = costs
elif self.criterion == 'pi_cost':
costs *= pi
elif self.criterion == 'gini_cost':
costs *= pi ** 2
elif self.criterion == 'entropy_cost':
if pi[0] == 0 or pi[1] == 0:
costs *= 0
else:
costs *= -np.log(pi)
y_pred = np.argmin(costs)
# Calculate the predicted probability of a node using laplace correction.
n_positives = y_true.sum()
y_prob = (n_positives + 1.0) / (n_samples + 2.0)
return costs[y_pred], y_pred, y_prob
def _calculate_gain(self, cost_base, y_true, X, cost_mat, split):
""" Private function to calculate the gain in cost of using split in the
current node.
Parameters
----------
cost_base : float
Cost of the naive prediction
y_true : array-like of shape = [n_samples]
Ground truth (correct) labels.
X : array-like of shape = [n_samples, n_features]
The input samples.
cost_mat : array-like of shape = [n_samples, 4]
Cost matrix of the classification problem, where the columns represent the costs of:
false positives, false negatives, true positives and true negatives, for each example.
split : tuple of len = 2
split[0] = feature to split = j
split[1] = where to split = l
Returns
-------
tuple(gain : float, left node prediction : int)
"""
# If cost_base == 0, no gain is possible
#TODO: This must be checked in _best_split
if cost_base == 0.0:
return 0.0, int(np.sign(y_true.mean() - 0.5) == 1) # In case cost_b==0 and pi_1!=(0,1)
j, l = split
filter_Xl = (X[:, j] <= l)
filter_Xr = ~filter_Xl
n_samples, n_features = X.shape
# Check if one of the leaves is empty
#TODO: This must be checked in _best_split
if np.nonzero(filter_Xl)[0].shape[0] in [0, n_samples]:  # One leaf is empty
return 0.0, 0.0
# Cost of the left (Xl) and right (Xr) children induced by the split rule
Xl_cost, Xl_pred, _ = self._node_cost(y_true[filter_Xl], cost_mat[filter_Xl, :])
Xr_cost, _, _ = self._node_cost(y_true[filter_Xr], cost_mat[filter_Xr, :])
if self.criterion_weight:
n_samples_Xl = np.nonzero(filter_Xl)[0].shape[0]
Xl_w = n_samples_Xl * 1.0 / n_samples
Xr_w = 1 - Xl_w
gain = round((cost_base - (Xl_w * Xl_cost + Xr_w * Xr_cost)) / cost_base, 6)
else:
gain = round((cost_base - (Xl_cost + Xr_cost)) / cost_base, 6)
return gain, Xl_pred
def _best_split(self, y_true, X, cost_mat):
""" Private function to calculate the split that gives the best gain.
Parameters
----------
y_true : array-like of shape = [n_samples]
Ground truth (correct) labels.
X : array-like of shape = [n_samples, n_features]
The input samples.
cost_mat : array-like of shape = [n_samples, 4]
Cost matrix of the classification problem, where the columns represent the costs of:
false positives, false negatives, true positives and true negatives, for each example.
Returns
-------
tuple(split : tuple(j, l), gain : float, left node prediction : int,
y_pred : int, y_prob : float)
"""
n_samples, n_features = X.shape
num_pct = self.num_pct
cost_base, y_pred, y_prob = self._node_cost(y_true, cost_mat)
# Calculate the gain of each of the num_pct candidate splits for every feature
gains = np.zeros((n_features, num_pct))
pred = np.zeros((n_features, num_pct))
splits = np.zeros((n_features, num_pct))
# Selected features
selected_features = np.arange(0, self.n_features_)
# TODO: add a random_state parameter; the shuffle below uses numpy's global RNG
np.random.shuffle(selected_features)
selected_features = selected_features[:self.max_features_]
selected_features.sort()
#TODO: # Skip the CPU intensive evaluation of the impurity criterion for
# features that were already detected as constant (hence not suitable
# for good splitting) by ancestor nodes and save the information on
# newly discovered constant features to spare computation on descendant
# nodes.
# For each feature test all possible splits
for j in selected_features:
splits[j, :] = np.percentile(X[:, j], np.arange(0, 100, 100.0 / num_pct).tolist())
for l in range(num_pct):
# Avoid repeated values, since np.percentile may return repeated values
if l == 0 or (l > 0 and splits[j, l] != splits[j, l - 1]):
split = (j, splits[j, l])
gains[j, l], pred[j, l] = self._calculate_gain(cost_base, y_true, X, cost_mat, split)
best_split = np.unravel_index(gains.argmax(), gains.shape)
return (best_split[0], splits[best_split]), gains.max(), pred[best_split], y_pred, y_prob
def _tree_grow(self, y_true, X, cost_mat, level=0):
""" Private recursive function to grow the decision tree.
Parameters
----------
y_true : array-like of shape = [n_samples]
Ground truth (correct) labels.
X : array-like of shape = [n_samples, n_features]
The input samples.
cost_mat : array-like of shape = [n_samples, 4]
Cost matrix of the classification problem, where the columns represent the costs of:
false positives, false negatives, true positives and true negatives, for each example.
Returns
-------
Tree : Object
Container of the decision tree
NOTE: it is not the same structure as the sklearn.tree.tree object
"""
#TODO: Find error, add min_samples_split
if len(X.shape) == 1:
tree = dict(y_pred=y_true, y_prob=0.5, level=level, split=-1, n_samples=1, gain=0)
return tree
# Calculate the best split of the current node
split, gain, Xl_pred, y_pred, y_prob = self._best_split(y_true, X, cost_mat)
n_samples, n_features = X.shape
# Construct the tree object as a dictionary
#TODO: Convert tree to be equal to sklearn.tree.tree object
tree = dict(y_pred=y_pred, y_prob=y_prob, level=level, split=-1, n_samples=n_samples, gain=gain)
# Check the stopping criteria
if gain < self.min_gain:
return tree
if self.max_depth is not None:
if level >= self.max_depth:
return tree
if n_samples <= self.min_samples_split:
return tree
j, l = split
filter_Xl = (X[:, j] <= l)
filter_Xr = ~filter_Xl
n_samples_Xl = np.nonzero(filter_Xl)[0].shape[0]
n_samples_Xr = np.nonzero(filter_Xr)[0].shape[0]
if min(n_samples_Xl, n_samples_Xr) <= self.min_samples_leaf:
return tree
# No stopping criterion is met
tree['split'] = split
tree['node'] = self.tree_.n_nodes
self.tree_.n_nodes += 1
tree['sl'] = self._tree_grow(y_true[filter_Xl], X[filter_Xl], cost_mat[filter_Xl], level + 1)
tree['sr'] = self._tree_grow(y_true[filter_Xr], X[filter_Xr], cost_mat[filter_Xr], level + 1)
return tree
class _tree_class():
def __init__(self):
self.n_nodes = 0
self.tree = dict()
self.tree_pruned = dict()
self.nodes = []
self.n_nodes_pruned = 0
def fit(self, X, y, cost_mat, check_input=False):
""" Build a example-dependent cost-sensitive decision tree from the training set (X, y, cost_mat)
Parameters
----------
X : array-like of shape = [n_samples, n_features]
The input samples.
y : array-like of shape = [n_samples]
Ground truth (correct) labels.
cost_mat : array-like of shape = [n_samples, 4]
Cost matrix of the classification problem, where the columns represent the costs of:
false positives, false negatives, true positives and true negatives, for each example.
check_input : boolean, (default=False)
Allows bypassing several input checks.
Don't use this parameter unless you know what you are doing.
Returns
-------
self : object
Returns self.
"""
#TODO: Check input
#TODO: Add random state
n_samples, self.n_features_ = X.shape
self.tree_ = self._tree_class()
# Maximum number of features to be taken into account per split
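# For example, with 100 features: "auto" or "sqrt" -> 10, "log2" -> 6, a float such
# as 0.25 -> 25, None -> 100, and an integer value is used as given.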
if isinstance(self.max_features, six.string_types):
if self.max_features == "auto":
max_features = max(1, int(np.sqrt(self.n_features_)))
elif self.max_features == "sqrt":
max_features = max(1, int(np.sqrt(self.n_features_)))
elif self.max_features == "log2":
max_features = max(1, int(np.log2(self.n_features_)))
else:
raise ValueError(
'Invalid value for max_features. Allowed string '
'values are "auto", "sqrt" or "log2".')
elif self.max_features is None:
max_features = self.n_features_
elif isinstance(self.max_features, (numbers.Integral, np.integer)):
max_features = self.max_features
else: # float
if self.max_features > 0.0:
max_features = max(1, int(self.max_features * self.n_features_))
else:
max_features = 1  # In scikit-learn this would be 0.
self.max_features_ = max_features
self.tree_.tree = self._tree_grow(y, X, cost_mat)
if self.pruned:
self.pruning(X, y, cost_mat)
self.classes_ = np.array([0, 1])
return self
def _nodes(self, tree):
""" Private function that find the number of nodes in a tree.
Parameters
----------
tree : object
Returns
-------
nodes : array like of shape [n_nodes]
"""
def recourse(temp_tree_, nodes):
if isinstance(temp_tree_, dict):
if temp_tree_['split'] != -1:
nodes.append(temp_tree_['node'])
if temp_tree_['split'] != -1:
for k in ['sl', 'sr']:
recourse(temp_tree_[k], nodes)
return None
nodes_ = []
recourse(tree, nodes_)
return nodes_
def _classify(self, X, tree, proba=False):
""" Private function that classify a dataset using tree.
Parameters
----------
X : array-like of shape = [n_samples, n_features]
The input samples.
tree : object
proba : bool, optional (default=False)
If True then return probabilities else return class
Returns
-------
prediction : array of shape = [n_samples]
If proba then return the predicted positive probabilities, else return
the predicted class for each example in X
"""
n_samples, n_features = X.shape
predicted = np.ones(n_samples)
# Check if final node
if tree['split'] == -1:
if not proba:
predicted = predicted * tree['y_pred']
else:
predicted = predicted * tree['y_prob']
else:
j, l = tree['split']
filter_Xl = (X[:, j] <= l)
filter_Xr = ~filter_Xl
n_samples_Xl = np.nonzero(filter_Xl)[0].shape[0]
n_samples_Xr = np.nonzero(filter_Xr)[0].shape[0]
if n_samples_Xl == 0: # If left node is empty only continue with right
predicted[filter_Xr] = self._classify(X[filter_Xr, :], tree['sr'], proba)
elif n_samples_Xr == 0: # If right node is empty only continue with left
predicted[filter_Xl] = self._classify(X[filter_Xl, :], tree['sl'], proba)
else:
predicted[filter_Xl] = self._classify(X[filter_Xl, :], tree['sl'], proba)
predicted[filter_Xr] = self._classify(X[filter_Xr, :], tree['sr'], proba)
return predicted
def predict(self, X):
""" Predict class of X.
The predicted class for each sample in X is returned.
Parameters
----------
X : array-like of shape = [n_samples, n_features]
The input samples.
Returns
-------
y : array of shape = [n_samples]
The predicted classes.
"""
#TODO: Check consistency of X
if self.pruned:
tree_ = self.tree_.tree_pruned
else:
tree_ = self.tree_.tree
return self._classify(X, tree_, proba=False)
def predict_proba(self, X):
"""Predict class probabilities of the input samples X.
Parameters
----------
X : array-like of shape = [n_samples, n_features]
The input samples.
Returns
-------
prob : array of shape = [n_samples, 2]
The class probabilities of the input samples.
"""
#TODO: Check consistency of X
n_samples, n_features = X.shape
prob = np.zeros((n_samples, 2))
if self.pruned:
tree_ = self.tree_.tree_pruned
else:
tree_ = self.tree_.tree
prob[:, 1] = self._classify(X, tree_, proba=True)
prob[:, 0] = 1 - prob[:, 1]
return prob
def _delete_node(self, tree, node):
""" Private function that eliminate node from tree.
Parameters
----------
tree : object
node : int
node to be eliminated from tree
Returns
-------
pruned_tree : object
"""
# Work on a deep copy so the original tree is left untouched
temp_tree = copy.deepcopy(tree)
def recourse(temp_tree_, del_node):
if isinstance(temp_tree_, dict):
if temp_tree_['split'] != -1:
if temp_tree_['node'] == del_node:
del temp_tree_['sr']
del temp_tree_['sl']
del temp_tree_['node']
temp_tree_['split'] = -1
else:
for k in ['sl', 'sr']:
recourse(temp_tree_[k], del_node)
return None
recourse(temp_tree, node)
return temp_tree
def _pruning(self, X, y_true, cost_mat):
""" Private function that prune the decision tree.
Parameters
----------
X : array-like of shape = [n_samples, n_features]
The input samples.
y_true : array-like of shape = [n_samples]
Ground truth (correct) labels.
cost_mat : array-like of shape = [n_samples, 4]
Cost matrix of the classification problem, where the columns represent the costs of:
false positives, false negatives, true positives and true negatives, for each example.
"""
# Calculate gains
nodes = self._nodes(self.tree_.tree_pruned)
n_nodes = len(nodes)
gains = np.zeros(n_nodes)
y_pred = self._classify(X, self.tree_.tree_pruned)
cost_base = cost_loss(y_true, y_pred, cost_mat)
for m, node in enumerate(nodes):
# Create temporal tree by eliminating node from tree_pruned
temp_tree = self._delete_node(self.tree_.tree_pruned, node)
y_pred = self._classify(X, temp_tree)
nodes_pruned = self._nodes(temp_tree)
# Calculate %gain
gain = (cost_base - cost_loss(y_true, y_pred, cost_mat)) / cost_base
# Calculate %gain_size
gain_size = (len(nodes) - len(nodes_pruned)) * 1.0 / len(nodes)
# Calculate weighted gain
gains[m] = gain * gain_size
best_gain = np.max(gains)
best_node = nodes[int(np.argmax(gains))]
if best_gain > self.min_gain:
self.tree_.tree_pruned = self._delete_node(self.tree_.tree_pruned, best_node)
# If the removed node is not the root, keep pruning the tree recursively
if best_node != 0:
self._pruning(X, y_true, cost_mat)
def pruning(self, X, y, cost_mat):
""" Function that prune the decision tree.
Parameters
----------
X : array-like of shape = [n_samples, n_features]
The input samples.
y : array-like of shape = [n_samples]
Ground truth (correct) labels.
cost_mat : array-like of shape = [n_samples, 4]
Cost matrix of the classification problem, where the columns represent the costs of:
false positives, false negatives, true positives and true negatives, for each example.
"""
self.tree_.tree_pruned = copy.deepcopy(self.tree_.tree)
if self.tree_.n_nodes > 0:
self._pruning(X, y, cost_mat)
nodes_pruned = self._nodes(self.tree_.tree_pruned)
self.tree_.n_nodes_pruned = len(nodes_pruned)