Source code for gojo.util.splitter

# Module that contains all the necessary functionalities to separate the data into different sets.
#
# Author: Fernando García Gutiérrez
# Email: ga.gu.fernando.concat@gmail.com
#
# STATUS: incomplete and not functional; still under development
#
import numpy as np
import pandas as pd
from typing import Tuple
from sklearn.model_selection import (
    train_test_split,
    RepeatedKFold,
    RepeatedStratifiedKFold,
    LeaveOneOut)
from .io import _createObjectRepresentation
from .validation import (
    checkInputType,
    checkMultiInputTypes)


class SimpleSplitter(object):
    """ Wrapper around the `sklearn.model_selection.train_test_split` function used to perform a simple
    partitioning of the data into a training and a test set (optionally with stratification).

    Parameters
    ----------
    test_size : float or int
        If float, should be between 0.0 and 1.0 and represent the proportion of the dataset to include
        in the test split. If int, represents the absolute number of test samples.

    stratify : bool, default=False
        If True, data is split in a stratified fashion using the class labels passed to :meth:`split`
        as `y`.

    random_state : int, default=None
        Controls the shuffling applied to the data before applying the split.

    shuffle : bool, default=True
        Whether to shuffle the data before splitting. If shuffle=False then stratify must be False.

    Examples
    --------
    >>> import numpy as np
    >>> from gojo import util
    >>>
    >>> np.random.seed(1997)
    >>>
    >>> n_samples = 20
    >>> n_feats = 10
    >>> X = np.random.uniform(size=(n_samples, n_feats))
    >>> y = np.random.randint(0, 2, size=n_samples)
    >>>
    >>> splitter = util.splitter.SimpleSplitter(
    >>>     test_size=0.2,
    >>>     stratify=True,
    >>>     random_state=1997
    >>> )
    >>>
    >>> for train_idx, test_idx in splitter.split(X, y):
    >>>     print(len(train_idx), y[train_idx].mean())
    >>>     print(len(test_idx), y[test_idx].mean())
    """
    def __init__(
            self,
            test_size: float,
            stratify: bool = False,
            random_state: int = None,
            shuffle: bool = True):

        self.test_size = test_size
        self.stratify = stratify
        self.random_state = random_state
        self.shuffle = shuffle

    def __repr__(self):
        return _createObjectRepresentation(
            'SimpleSplitter',
            test_size=self.test_size,
            stratify=self.stratify,
            random_state=self.random_state,
            shuffle=self.shuffle
        )

    def __str__(self):
        return self.__repr__()

    def split(
            self,
            X: np.ndarray or pd.DataFrame,
            y: np.ndarray or pd.Series = None) -> Tuple[np.ndarray, np.ndarray]:
        """ Generates indices to split data into a training and a test set.

        Parameters
        ----------
        X : np.ndarray or pd.DataFrame
            Input data.

        y : np.ndarray or pd.Series, default=None
            If `stratify` was specified as `True`, this variable will be used for performing a
            stratified split.
        """
        indices = np.arange(len(X))
        train_idx, test_idx = train_test_split(
            indices,
            test_size=self.test_size,
            stratify=y if self.stratify else None,
            random_state=self.random_state,
            shuffle=self.shuffle)

        yield train_idx, test_idx


class InstanceLevelKFoldSplitter(object):
    """ Splitter that makes the splits at the instance level, so that all observations associated with
    the same instance are assigned to the same fold.

    .. important::
        The observations of the input data of the :meth:`split` method will be associated with the
        identifiers provided in `instance_id`.

    Parameters
    ----------
    n_splits : int
        Number of folds. Must be at least 2.

    instance_id : np.ndarray
        Array identifying the instances used to perform the splits.

    n_repeats : int, default=1
        Number of times the cross-validator needs to be repeated.

    shuffle : bool, default=True
        Indicates whether to shuffle the data before performing the splits.

    random_state : int, default=None
        Controls the randomness of each repeated cross-validation instance.
    """
    def __init__(
            self,
            n_splits: int,
            instance_id: np.ndarray,
            n_repeats: int = 1,
            shuffle: bool = True,
            random_state: int = None):

        # check input types
        checkMultiInputTypes(
            ('n_splits', n_splits, [int]),
            ('instance_id', instance_id, [np.ndarray]),
            ('n_repeats', n_repeats, [int]),
            ('shuffle', shuffle, [bool]),
            ('random_state', random_state, [int, type(None)]),
        )

        # check input values
        if n_splits <= 1:
            raise TypeError('"n_splits" must be > 1')
        if n_repeats <= 0:
            raise TypeError('"n_repeats" must be > 0')
        if len(instance_id) <= 2:
            raise TypeError('"instance_id" must contain more than 2 elements')

        self.n_splits = n_splits
        self.instance_id = instance_id
        self.n_repeats = n_repeats
        self.shuffle = shuffle
        self.random_state = random_state
        self._indices = np.arange(len(instance_id))

        # get the unique ids and create an id -> position(s) hash
        self._unique_instance_id = np.unique(instance_id)
        self._instance_id_hash = {
            _id: np.where(instance_id == _id)[0]
            for _id in self._unique_instance_id
        }

        # generate partitions
        self._train_indices, self._test_indices = self._generateSplits()

        # iterator-level state
        self._current_iteration = 0

    def __repr__(self):
        return _createObjectRepresentation(
            'InstanceLevelKFoldSplitter',
            n_splits=self.n_splits,
            n_repeats=self.n_repeats,
            shuffle=self.shuffle,
            random_state=self.random_state,
            observations=len(self.instance_id),
            unique_instances=len(self._unique_instance_id),
        )

    def __str__(self):
        return self.__repr__()

    def _generateSplits(self):
        """ Internal method used to generate the train/test splits over the unique instance identifiers. """
        # calculate the size of each split
        split_size = int(np.ceil(len(self._unique_instance_id) / self.n_splits))

        # set the random state for reproducibility
        if self.random_state is not None:
            np.random.seed(self.random_state)

        split_indices = []
        for n_repeat in range(self.n_repeats):
            repeat_split_indices = []

            # transform unique ids to indices
            indices = np.arange(len(self._unique_instance_id))

            # random permutation of the indices
            if self.shuffle:
                indices = np.random.permutation(indices)

            # add split indices
            for n_split in range(self.n_splits):
                repeat_split_indices.append(indices[n_split * split_size:n_split * split_size + split_size])

            # inner checking
            assert len(np.unique(np.concatenate(repeat_split_indices))) == len(
                self._unique_instance_id), 'Inner checking fails (0)'

            # save all folds
            split_indices = split_indices + repeat_split_indices

        # expand the unique-instance indices to the observation positions
        unfolded_split_indices = []
        for indices in split_indices:
            unfolded_split_indices.append(
                np.concatenate([
                    self._instance_id_hash[_id]
                    for _id in self._unique_instance_id[indices]])
            )

        # create the final train/test folds
        train_indices = []
        test_indices = []
        for rep in range(self.n_repeats):
            for split_i in range(self.n_splits):
                train_indices_ = []
                for split_j in range(self.n_splits):
                    curr_idx = self.n_splits * rep + split_j
                    if split_i == split_j:
                        # select test data
                        test_indices.append(unfolded_split_indices[curr_idx])
                    else:
                        # select train data
                        train_indices_.append(unfolded_split_indices[curr_idx])
                train_indices.append(np.concatenate(train_indices_))

        return train_indices, test_indices

    def split(
            self,
            X: np.ndarray or pd.DataFrame,
            y=None) -> Tuple[np.ndarray, np.ndarray]:
        """ Generate the splits. This function will return a tuple where the first element corresponds
        to the training indices and the second element to the test indices.

        .. important::
            `X` must match `instance_id` along dimension 0.

        Parameters
        ----------
        X : np.ndarray or pd.DataFrame
            Input data.

        y : object, default=None
            Ignored parameter. Implemented for `sklearn` compatibility.
        """
        indices = np.arange(len(X))

        if len(self.instance_id) != len(indices):
            raise TypeError(
                'Input parameter "instance_id" must be of the same size as the input data. '
                'Provided number of samples "%d", expected "%d"' % (len(indices), len(self.instance_id))
            )

        while self._current_iteration < len(self._train_indices):
            train_indices = self._train_indices[self._current_iteration]
            test_indices = self._test_indices[self._current_iteration]
            self._current_iteration += 1
            yield train_indices, test_indices

        self._current_iteration = 0
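
# A minimal usage sketch for InstanceLevelKFoldSplitter (illustrative only; this class defines no
# Examples section of its own, and the arrays below are hypothetical). The key property is that all
# observations sharing an instance id always fall on the same side of a split:
#
#   >>> import numpy as np
#   >>> from gojo import util
#   >>>
#   >>> # 6 observations belonging to 3 instances (two observations per instance)
#   >>> instance_id = np.array([0, 0, 1, 1, 2, 2])
#   >>> X = np.random.uniform(size=(6, 4))
#   >>>
#   >>> splitter = util.splitter.InstanceLevelKFoldSplitter(
#   >>>     n_splits=2, instance_id=instance_id, random_state=1997)
#   >>>
#   >>> for train_idx, test_idx in splitter.split(X):
#   >>>     # no instance id appears in both the training and the test fold
#   >>>     assert set(instance_id[train_idx]).isdisjoint(instance_id[test_idx])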


class PredefinedSplitter(object):
    """ Wrapper that allows a predefined split to be incorporated within the model evaluation
    subroutines. This wrapper expects from the user two lists with the indices (positions along
    dimension 0 of the input data) that will be used as the training and test sets respectively.

    Parameters
    ----------
    train_index : list or np.ndarray
        Indices used for training.

    test_index : list or np.ndarray
        Indices used for testing.

    Examples
    --------
    >>> import numpy as np
    >>> from gojo import util
    >>>
    >>> np.random.seed(1997)
    >>>
    >>> n_samples = 20
    >>> n_feats = 10
    >>> X = np.random.uniform(size=(n_samples, n_feats))
    >>> y = np.random.randint(0, 2, size=n_samples)
    >>>
    >>> splitter = util.splitter.PredefinedSplitter(
    >>>     train_index=np.arange(0, 15),
    >>>     test_index=np.arange(15, 20),
    >>> )
    >>>
    >>> for train_idx, test_idx in splitter.split(X, y):
    >>>     print(len(train_idx), y[train_idx].mean())
    >>>     print(len(test_idx), y[test_idx].mean())
    """
    def __init__(
            self,
            train_index: list or np.ndarray,
            test_index: list or np.ndarray):

        if isinstance(train_index, list):
            train_index = np.array(train_index)
        if isinstance(test_index, list):
            test_index = np.array(test_index)

        if len(train_index.shape) > 1:
            raise ValueError(
                '"train_index" must be a one-dimensional vector. Provided shape: %r' % list(train_index.shape))
        if len(test_index.shape) > 1:
            raise ValueError(
                '"test_index" must be a one-dimensional vector. Provided shape: %r' % list(test_index.shape))

        self.train_index = train_index
        self.test_index = test_index

    def __repr__(self):
        return _createObjectRepresentation(
            'PredefinedSplitter',
            train_index_length=len(self.train_index),
            test_index_length=len(self.test_index),
        )

    def __str__(self):
        return self.__repr__()

    def split(
            self,
            X: np.ndarray or pd.DataFrame,
            y: np.ndarray or pd.Series = None) -> Tuple[np.ndarray, np.ndarray]:
        """ Generates indices to split data into a training and a test set.

        Parameters
        ----------
        X : np.ndarray or pd.DataFrame
            Input data.

        y : np.ndarray or pd.Series, default=None
            Target variable.
        """
        indices = np.arange(len(X))

        # check shape consistency
        if len(indices) != (len(self.train_index) + len(self.test_index)):
            raise ValueError(
                'Inconsistency between the predefined indices for separating training (length %d) and '
                'test (length %d) data, and the size of the data received (length %d).' % (
                    len(self.train_index), len(self.test_index), len(indices)
                ))

        # check for data leakage
        common_indices = set(list(self.train_index)).intersection(set(list(self.test_index)))
        if len(common_indices) > 0:
            raise ValueError(
                'Data leakage between training and test instances has been detected. Number '
                'of common indices: %d (%r)' % (len(common_indices), list(common_indices)))

        # check index consistency
        max_index_val = max(list(set(list(self.train_index)).union(set(list(self.test_index)))))
        if max_index_val != (indices.shape[0] - 1):
            raise ValueError('Indices outside the possible range of values used to index the data.')

        yield self.train_index, self.test_index


def getCrossValObj(cv: int = None, repeats: int = 1, stratified: bool = False, loocv: bool = False,
                   random_state: int = None) -> RepeatedKFold or RepeatedStratifiedKFold or LeaveOneOut:
    """ Function used to obtain the sklearn object needed to evaluate models according to a
    cross-validation or leave-one-out cross-validation (LOOCV) scheme.

    Parameters
    ----------
    cv : int, default=None
        (cross-validation) This parameter is used to specify the number of folds. Ignored when loocv
        is set to True.

    repeats : int, default=1
        (cross-validation) This parameter is used to specify the number of repetitions of an N-repeats
        cross-validation. Ignored when loocv is set to True.

    stratified : bool, default=False
        (cross-validation) This parameter specifies whether to perform the cross-validation with class
        stratification. Ignored when loocv is set to True.

    loocv : bool, default=False
        (leave-one-out cross-validation) Indicates whether to perform a LOOCV. If this parameter is set
        to True, the rest of the parameters will be ignored.

    random_state : int, default=None
        (cross-validation) Random state for study replication.

    Returns
    -------
    cv_obj : RepeatedKFold or RepeatedStratifiedKFold or LeaveOneOut
        Cross-validation instance from the
        `sklearn <https://scikit-learn.org/stable/modules/cross_validation.html>`_ library.
    """
    checkMultiInputTypes(
        ('cv', cv, [int, type(None)]),
        ('repeats', repeats, [int, type(None)]),
        ('stratified', stratified, [bool]),
        ('loocv', loocv, [bool]),
        ('random_state', random_state, [int, type(None)]))

    if loocv:
        return LeaveOneOut()
    else:
        if cv is None:
            raise TypeError(
                'Parameter "cv" in "gojo.util.splitter.getCrossValObj()" must be set to an integer if '
                '"loocv" is set to False.')
        if stratified:
            return RepeatedStratifiedKFold(n_repeats=repeats, n_splits=cv, random_state=random_state)
        else:
            return RepeatedKFold(n_repeats=repeats, n_splits=cv, random_state=random_state)
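
# Hedged usage sketch for getCrossValObj (not part of the original module; shown to illustrate how the
# flags map to the returned sklearn objects):
#
#   >>> from gojo import util
#   >>>
#   >>> # 5-fold cross-validation repeated twice, with class stratification
#   >>> cv_obj = util.splitter.getCrossValObj(cv=5, repeats=2, stratified=True, random_state=1997)
#   >>> # -> RepeatedStratifiedKFold(n_splits=5, n_repeats=2, random_state=1997)
#   >>>
#   >>> # leave-one-out cross-validation; all other parameters are ignored
#   >>> loo = util.splitter.getCrossValObj(loocv=True)
#   >>> # -> LeaveOneOut()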


def _splitOpArgsDicts(op_args: dict, indices: list) -> Tuple[dict] or dict:
    """ This function splits the values of each of the variables defined in the input dictionary based
    on the different indices provided. The splits will be returned in the same order in which the
    indices were provided.
    """
    assert isinstance(op_args, (type(None), dict))
    assert isinstance(indices, list)
    assert len(indices) >= 1

    # return an empty dictionary (or a tuple of empty dictionaries of the same length as indices)
    if op_args is None or len(op_args) == 0:
        if len(indices) == 1:
            return {}
        return tuple([{} for _ in range(len(indices))])

    # select the indices
    split_info = []
    for index_vals in indices:
        index_level_dict = {}   # stores all the values associated with the current split
        for var_name, var_values in op_args.items():
            checkInputType('op_args["%s"]' % var_name, var_values, [list, np.ndarray])
            index_level_dict[var_name] = [var_values[idx] for idx in index_vals]
        split_info.append(index_level_dict)

    # return a single dictionary for a single index list, consistently with the empty-op_args branch
    if len(split_info) == 1:
        return split_info[0]

    return tuple(split_info)
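
# Hedged sketch of the expected _splitOpArgsDicts behaviour (illustrative values, not from the original
# module). Each variable in op_args is subset by every index list, preserving the order of the indices:
#
#   >>> op_args = {'weights': [0.1, 0.2, 0.3, 0.4]}
#   >>> train_args, test_args = _splitOpArgsDicts(op_args, [[0, 1, 2], [3]])
#   >>> # train_args -> {'weights': [0.1, 0.2, 0.3]}
#   >>> # test_args  -> {'weights': [0.4]}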