from sklearn.tree._tree import TREE_LEAF, TREE_UNDEFINED
from sklearn.tree import DecisionTreeRegressor
from sklearn import __version__ as skv
import numpy as np
from scipy.stats import f
from scipy.sparse import spmatrix
from typing import Optional, Union
from . import logger
[docs]
class PredictiveClusteringTree(DecisionTreeRegressor):
    """
    Class that uses predictive clustering trees (PCT) for multi-output prediction.

    Note this is a specialized PCT with the prototype as the mean vector and the distance measure as the (sum of) intra-cluster variance.
    Such a PCT is equivalent to a CART regression tree (with specialized parameters and post-pruning).

    Parameters
    ----------
    max_depth
        Maximum possible depth of the tree, starting from the root node which has a depth of 0.
        Default to no limit.
    min_samples_split
        The minimum sample size (in absolute number or fraction) of a possible node.
        (Default: `20`)
    min_samples_leaf
        The minimum sample size (in absolute number or fraction) of a possible leaf.
        (Default: `10`)
    min_weight_fraction_leaf
        The minimum fraction out of total sample weights for a possible leaf.
        (Default: `0.0`)
    random_state
        Random seed for column (feature) shuffling before selecting the best feature and threshold.
    max_leaf_nodes
        The maximum number of leaves, achieved by keeping high-quality (i.e., high impurity reduction) nodes.
        Default to no limit.
    F_test_prune
        Whether to use an F-test to prune the tree by removing unnecessary splits.
        (Default: `True`)
    p_thres
        p-value threshold for pruning nodes after F-test.
        (Default: `0.05`)

    Attributes
    ----------
    n_features_in_
        Number of features.
    n_outputs_
        Number of outputs.
    tree_
        A :class:`~sklearn.tree._tree.Tree` object structured by parallel arrays.
    p_value
        F-test-based p-value for each node or leaf in the tree.
        Set by :meth:`F_test` (which :meth:`fit` calls when `F_test_prune` is `True`).
    """

    def __init__(self,
                 *,
                 max_depth: Optional[int] = None,
                 min_samples_split: Union[int, float] = 20,
                 min_samples_leaf: Union[int, float] = 10,
                 min_weight_fraction_leaf: float = 0.0,
                 random_state: Optional[int] = None,
                 max_leaf_nodes: Optional[int] = None,
                 F_test_prune: bool = True,
                 p_thres: float = 0.05):
        # The squared-error criterion is named 'mse' when the scikit-learn major version is 0
        # and 'squared_error' otherwise.
        criterion = 'mse' if skv.split('.')[0] == '0' else 'squared_error'
        super().__init__(
            # L2-norm multi-output decision tree = PCT with mean as prototype and SSE as distance measure
            criterion=criterion,
            splitter='best',
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,  # minimum cell cluster size
            min_weight_fraction_leaf=min_weight_fraction_leaf,
            max_features=None,  # use all features
            random_state=random_state,
            max_leaf_nodes=max_leaf_nodes,
            # split as much as possible -> no good way to select a proper value
            min_impurity_decrease=0.0,
            # the F-test is used for pruning instead of cost-complexity pruning
            ccp_alpha=0.0,
        )
        self.F_test_prune = F_test_prune
        self.p_thres = p_thres

    def fit(self, X, y, sample_weight=None) -> None:
        """
        Fit a PCT with the training dataset.

        Parameters
        ----------
        X
            Sample-by-feature array-like matrix.
        y
            Sample-by-output array-like matrix.
        sample_weight
            Sample weights. Default to equal sample weights.

        Returns
        -------
        None
            Fitted and (possibly) pruned tree.
        """
        if isinstance(X, spmatrix):
            # Only some sparse formats expose `indices`/`indptr`; look them up defensively so that
            # the other formats are passed through instead of failing with an AttributeError.
            index_arrays = (getattr(X, name, None) for name in ('indices', 'indptr'))
            if any(arr is not None and arr.dtype == 'int64' for arr in index_arrays):
                # Densify matrices indexed with 64-bit integers
                # (presumably because the underlying tree builder cannot consume them -- confirm).
                X = X.toarray()
        super().fit(X, y, sample_weight=sample_weight, check_input=True)
        if self.F_test_prune:
            self.F_test()
            self.prune_tree(p_thres=self.p_thres)

    def is_node(self, index: int) -> bool:
        """
        Check whether a given index is a node.

        Parameters
        ----------
        index
            Index of the node/leaf in the arrays of the tree structure.

        Returns
        -------
        bool
            `True` or `False` indicating whether the given index is an internal node.
        """
        # An index is treated as a leaf when its two child pointers are equal.
        return self.tree_.children_left[index] != self.tree_.children_right[index]

    def F_test(self) -> None:
        """
        F test for each internal node.

        For each node, the corresponding F distribution has the degrees of freedom `n_output * (n_sample - 1)` and `n_output * (n_sample - 2)`, and the value (q) of `node_impurity * n_sample / (n_sample - 1)` divided by `(left_child_impurity * left_n_sample + right_child_impurity * right_n_sample) / (n_sample - 2)`.

        Returns
        -------
        None
            Modified tree with F-test p-values. Leaves are assigned 1 constantly.
            Internal nodes that cannot be tested (fewer than 3 samples, or an undefined F statistic) are also assigned 1.
        """
        tree = self.tree_
        left = tree.children_left
        right = tree.children_right
        impurity = tree.impurity
        n_samples = tree.n_node_samples
        p_values = np.ones(tree.node_count)
        # The second degree of freedom is proportional to (n_sample - 2), so at least 3 samples are needed.
        testable = np.flatnonzero((left != right) & (n_samples > 2))
        if testable.size > 0:
            n = n_samples[testable].astype(float)
            left_child = left[testable]
            right_child = right[testable]
            numerator = impurity[testable] * n / (n - 1)
            denominator = (impurity[left_child] * n_samples[left_child] +
                           impurity[right_child] * n_samples[right_child]) / (n - 2)
            # Perfectly pure children give a zero denominator: the ratio is then inf (p-value of 0),
            # or nan when the numerator is zero as well.
            with np.errstate(divide='ignore', invalid='ignore'):
                f_score = numerator / denominator
                # The survival function keeps precision for tiny p-values, unlike `1 - cdf`.
                p = f.sf(f_score, self.n_outputs_ * (n - 1), self.n_outputs_ * (n - 2))
            # An undefined statistic provides no evidence for the split.
            p_values[testable] = np.where(np.isnan(p), 1.0, p)
        self.p_value = p_values

    def prune_node(self, index: int) -> None:
        """
        Prune all descendents of a given node. This node will become a leaf.

        Parameters
        ----------
        index
            Index of the node/leaf in the tree structure.

        Returns
        -------
        None
            Modified tree with all descendents of a given node pruned.
        """
        if not self.is_node(index):
            return
        tree = self.tree_
        left = tree.children_left
        right = tree.children_right
        feature = tree.feature
        threshold = tree.threshold
        # Depth-first walk over the subtree; every visited entry is rewritten as a leaf in place.
        stack = [index]
        while stack:
            current = stack.pop()
            if left[current] != right[current]:
                stack.append(left[current])
                stack.append(right[current])
            left[current] = TREE_LEAF
            right[current] = TREE_LEAF
            feature[current] = TREE_UNDEFINED
            threshold[current] = TREE_UNDEFINED

    def prune_tree(self, p_thres: float = 0.05) -> None:
        """
        Prune a tree based on F-test p values.

        Parameters
        ----------
        p_thres
            p-value threshold to prune nodes.
            (Default: `0.05`)

        Returns
        -------
        None
            Modified tree with unnecessary splits removed.

        Raises
        ------
        ValueError
            If `p_thres` is outside the range of 0 to 1.
        """
        if p_thres < 0 or p_thres > 1:
            raise ValueError(
                "🛑 Please provide the `p_thres` between 0 and 1")
        # Compute the p-values on demand when they are missing or do not match the current tree.
        p_value = getattr(self, 'p_value', None)
        if p_value is None or len(p_value) != self.tree_.node_count:
            self.F_test()
        # Walk down from the root: a significant split is kept and its children are examined in turn;
        # a non-significant split is collapsed together with its whole subtree.
        stack = [0]
        while stack:
            current = stack.pop()
            if not self.is_node(current):
                continue
            if self.p_value[current] < p_thres:
                stack.append(self.tree_.children_left[current])
                stack.append(self.tree_.children_right[current])
            else:
                self.prune_node(current)

    def score(self, X, y, sample_weight=None) -> float:
        """
        Calculate the coefficient of determination between the prediction and truth.

        Different from multi-output problem where each output is calculated separately and the final R2 score is averaged across outputs, the score here is defined by considering each sample vector as a 'real' sample.

        Parameters
        ----------
        X
            Sample-by-feature query matrix.
        y
            Sample-by-output truth matrix. A 1D array is treated as a single output column.
        sample_weight
            Sample weights applied to squared distance of each sample.

        Returns
        -------
        float
            Coefficient of determination.

        Raises
        ------
        ValueError
            If the shape of `X` or `y` does not match the fitted tree.
        TypeError
            If `y` is not a numpy array.
        """
        if X.shape[1] != self.n_features_in_:
            raise ValueError(
                f"🛑 Please provide `X` with {self.n_features_in_} columns")
        if not isinstance(y, np.ndarray):
            raise TypeError(
                "🛑 Please provide `y` as a numpy 2D array")
        if y.ndim == 1:
            y = y.reshape(-1, 1)
        if (y.ndim != 2) or (X.shape[0] != y.shape[0]) or (self.n_outputs_ != y.shape[1]):
            raise ValueError(
                f"🛑 Please provide `y` with {X.shape[0]} rows and {self.n_outputs_} columns")
        # A single-output tree predicts a 1D vector; align it with `y` so that the subtraction
        # below is element-wise rather than broadcasting to a sample-by-sample matrix.
        y_pred = np.asarray(self.predict(X)).reshape(y.shape)
        if sample_weight is not None:
            sample_weight = np.array(sample_weight)
            weight = sample_weight[:, np.newaxis]
        else:
            weight = 1.0
        numerator = (weight * (y - y_pred) ** 2).sum()
        denominator = (weight * (y - np.average(y, axis=0, weights=sample_weight)) ** 2).sum()
        if numerator == 0:
            # perfect prediction
            return 1.0
        if denominator == 0:
            # constant truth that is not perfectly predicted
            return 0.0
        return 1 - numerator / denominator