Source code for palladio.model_assessment

"""Nested Cross-Validation for scikit-learn using MPI.

This package provides nested cross-validation similar to scikit-learn's
GridSearchCV but uses the Message Passing Interface (MPI)
for parallel computing.
"""
from __future__ import print_function

import gzip
import logging
import joblib as jl
import numbers
import os
import warnings

from collections import deque
try:
    from collections.abc import Iterable
except ImportError:  # Python 2
    from collections import Iterable
from six.moves import cPickle as pkl
from sklearn.base import BaseEstimator, clone
# from sklearn.model_selection import check_cv as _check_cv
from sklearn.metrics.scorer import check_scoring
from sklearn.base import is_classifier
from sklearn.model_selection._split import _CVIterableWrapper
from sklearn.model_selection._validation import _shuffle
from sklearn.model_selection import StratifiedShuffleSplit, ShuffleSplit
from sklearn.utils import check_X_y, check_random_state
from sklearn.utils.metaestimators import _safe_split
from sklearn.utils.multiclass import type_of_target

from palladio.utils import build_cv_results as _build_cv_results

__all__ = ('ModelAssessment',)

try:
    from mpi4py import MPI
    COMM = MPI.COMM_WORLD
    RANK = COMM.Get_rank()
    NAME = MPI.Get_processor_name()
    IS_MPI_JOB = COMM.Get_size() > 1

except ImportError:
    # warnings.warn("mpi4py module not found. "
    #               "PALLADIO cannot run on multiple machines.")
    COMM = None
    RANK = 0
    NAME = 'localhost'
    IS_MPI_JOB = False

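# Note (illustrative, not from the original source): when mpi4py is available
# the same script is typically launched through MPI, e.g.
# ``mpiexec -n 4 python my_analysis.py`` (the script name is hypothetical).
# Rank 0 then acts as the master inside ModelAssessment.fit, while the other
# ranks run _fit_slave and exchange jobs via the DO_WORK / EXIT tags below.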
MAX_RESUBMISSIONS = 0  # resubmissions disabled
DO_WORK = 100
EXIT = 200


def _worker(estimator_, i, X, y, train, test):
    """Implement the worker resubmission in case of errors."""
    # custom_name = "{}_p_{}_i_{}".format(
    #     ("permutation" if is_permutation_test else "regular"), RANK, i)
    # tmp_name_base = 'tmp_' + custom_name

    logger = logging.getLogger('main')

    experiment_resubmissions = 0
    experiment_completed = False

    logger.info("{}{} executing job {}".format(NAME, RANK, i))

    while not experiment_completed and \
            experiment_resubmissions <= MAX_RESUBMISSIONS:
        try:

            if experiment_resubmissions > 0:
                logger.warning("{}{} resubmitting experiment {}".format(NAME, RANK, i))

            # tmp_name = tmp_name_base + '_submission_{}'.format(
            #     experiment_resubmissions + 1)
            # run_experiment(data, labels, None, config,
            #                is_permutation_test, experiments_folder_path,
            #                tmp_name)
            # TODO necessary?
            estimator = clone(estimator_.estimator)

            # need to get the deepest estimator to use _safe_split
            estimator__ = clone(estimator)
            while hasattr(estimator__, 'estimator'):
                estimator__ = clone(estimator__.estimator)

            X_train, y_train = _safe_split(estimator__, X, y, train)
            X_test, y_test = _safe_split(estimator__, X, y, test, train)

            if estimator_.shuffle_y:
                random_state = check_random_state(estimator_.random_state)
                y_train = _shuffle(y_train, estimator_.groups, random_state)

            logger.info("{}{} fitting experiment {} - starting".format(NAME, RANK, i))
            estimator.fit(X_train, y_train)
            logger.info("{}{} fitting experiment {} - completed".format(NAME, RANK, i))

            logger.debug("{}{} scoring experiment {} - starting".format(NAME, RANK, i))
            yts_pred = estimator.predict(X_test)
            ytr_pred = estimator.predict(X_train)
            lr_score = estimator_.scorer_(estimator, X_train, y_train)
            ts_score = estimator_.scorer_(estimator, X_test, y_test)
            logger.debug("{}{} scoring experiment {} - complete".format(NAME, RANK, i))

            if hasattr(estimator, 'cv_results_'):
                # In case in which the estimator is a CV object
                cv_results = estimator.cv_results_
            else:
                cv_results = None

            cv_results_ = {
                'split_i': i,
                'learn_score': lr_score,
                'test_score': ts_score,
                'cv_results_': cv_results,
                'ytr_pred': ytr_pred,
                'yts_pred': yts_pred,
                'test_index': test,
                'train_index': train,
                'estimator': estimator
            }

            experiment_completed = True

            # ### Dump partial results
            if estimator_.experiments_folder is not None:
                logger.debug("{}{} saving results for experiment {}".format(NAME, RANK, i))
                pkl_name = (
                    'permutation' if estimator_.shuffle_y else 'regular') + \
                    '_%d.pkl' % i

                pkl.dump(cv_results_, gzip.open(os.path.join(
                    estimator_.experiments_folder, pkl_name), 'wb'))

        except Exception as error:
            # If something out of the ordinary happens,
            # resubmit the job
            experiment_resubmissions += 1
            warnings.warn(
                "[{}_{}] failed experiment {}, resubmission #{}\n"
                "Exception raised: {}".format(
                    NAME, RANK, i, experiment_resubmissions, error))

    if not experiment_completed:
        warnings.warn(
            "[{}_{}] failed to complete experiment {}, "
            "max resubmissions limit reached".format(NAME, RANK, i))
        return {}
    else:
        if not IS_MPI_JOB and estimator_.verbose:

            logger.info("{}{} job {} completed".format(NAME, RANK, i))

            print("Experiment {} completed [{}]".format(
                i, ('permutation' if estimator_.shuffle_y else 'regular')))

        return cv_results_


def _check_cv(cv=3, y=None, classifier=False, **kwargs):
    """Input checker utility for building a cross-validator.

    Parameters
    ----------
    cv : int, cross-validation generator or an iterable, optional
        Determines the cross-validation splitting strategy.
        Possible inputs for cv are:
          - None, to fall back to the default number of splits (the
            ``n_splits`` keyword argument if given, otherwise 10),
          - integer, to specify the number of splits,
          - an object to be used as a cross-validation generator,
          - an iterable yielding train/test splits.

        For integer/None inputs, if classifier is True and ``y`` is either
        binary or multiclass, :class:`StratifiedShuffleSplit` is used. In all
        other cases, :class:`ShuffleSplit` is used.

        Refer to the :ref:`User Guide <cross_validation>` for the various
        cross-validation strategies that can be used here.

    y : array-like, optional
        The target variable for supervised learning problems.

    classifier : boolean, optional, default False
        Whether the task is a classification task, in which case
        stratified splitting will be used.

    kwargs : dict
        Other parameters for StratifiedShuffleSplit or ShuffleSplit.

    Returns
    -------
    checked_cv : a cross-validator instance.
        The return value is a cross-validator which generates the train/test
        splits via the ``split`` method.
    """
    if cv is None:
        cv = kwargs.pop('n_splits', 0) or 10

    if isinstance(cv, numbers.Integral):
        if (classifier and (y is not None) and
                (type_of_target(y) in ('binary', 'multiclass'))):
            return StratifiedShuffleSplit(cv, **kwargs)
        else:
            return ShuffleSplit(cv, **kwargs)

    if not hasattr(cv, 'split') or isinstance(cv, str):
        if not isinstance(cv, Iterable) or isinstance(cv, str):
            raise ValueError("Expected cv as an integer, cross-validation "
                             "object (from sklearn.model_selection) "
                             "or an iterable. Got %s." % cv)
        return _CVIterableWrapper(cv)

    return cv  # New style cv objects are passed without any modification

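# A small illustration of the dispatch above (added for clarity, not part of
# the original module); ``y_bin`` and ``y_cont`` stand for a binary and a
# continuous target, respectively:
#
#   _check_cv(None, y_bin, classifier=True, n_splits=5, test_size=0.2)
#       -> StratifiedShuffleSplit with 5 splits
#   _check_cv(20, y_cont, classifier=False, test_size=0.25)
#       -> ShuffleSplit with 20 splits
#   _check_cv([(train_idx, test_idx), ...])
#       -> _CVIterableWrapper around the given splits
#   _check_cv(StratifiedShuffleSplit(n_splits=10), y_bin)
#       -> returned unchanged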

class ModelAssessment(BaseEstimator):
    """Cross-validation with nested parameter search for each training fold.

    The data is first split into ``cv`` train and test sets. For each training
    set a grid search over the specified set of parameters is performed
    (inner cross-validation). The set of parameters that achieved the highest
    average score across all inner folds is used to re-fit a model on the
    entire training set of the outer cross-validation loop. Finally, results
    on the test set of the outer loop are reported.

    Parameters
    ----------
    estimator : object type that implements the "fit" and "predict" methods
        An object of that type is instantiated for each grid point.

    cv : integer or cross-validation generator, optional, default: None
        If an integer is passed, it is the number of folds.
        Specific cross-validation objects can be passed, see the
        sklearn.model_selection module for the list of possible objects.

    scoring : string, callable or None, optional, default: None
        A string (see model evaluation documentation) or a scorer callable
        object / function with signature ``scorer(estimator, X, y)``.
        See sklearn.metrics.get_scorer for details.

    fit_params : dict, optional, default: None
        Parameters to pass to the fit method.

    multi_output : boolean, default: False
        Allow multi-output y, as for multivariate regression.

    shuffle_y : bool, optional, default=False
        When True, the object is used to perform a permutation test.

    n_jobs : int, optional, default: 1
        The number of jobs to use for the computation. This works by
        computing each of the Monte Carlo runs in parallel. If -1 all CPUs
        are used. If 1 is given, no parallel computing code is used at all,
        which is useful for debugging. Ignored when using MPI.

    n_splits : int, optional, default: 10
        The number of cross-validation splits (folds/iterations).

    test_size : float (default 0.1), int, or None
        If float, should be between 0.0 and 1.0 and represent the proportion
        of the dataset to include in the test split. If int, represents the
        absolute number of test samples. If None, the value is automatically
        set to the complement of the train size.

    train_size : float, int, or None (default is None)
        If float, should be between 0.0 and 1.0 and represent the proportion
        of the dataset to include in the train split. If int, represents the
        absolute number of train samples. If None, the value is automatically
        set to the complement of the test size.

    random_state : int or RandomState, optional, default: None
        Pseudo-random number generator state used for random sampling.

    groups : array-like, with shape (n_samples,), optional, default: None
        Group labels for the samples used while splitting the dataset into
        train/test set.

    experiments_folder : string, optional, default: None
        The path to the folder used to save the results.

    verbose : bool, optional, default: False
        Print debug messages.

    Attributes
    ----------
    scorer_ : function
        Scorer function used on the held out data to choose the best
        parameters for the model.

    cv_results_ : dictionary
        Result of the fit. The dictionary is pandas.DataFrame-able. Each row
        is the result of an external split. Columns are:
        'split_i', 'learn_score', 'test_score', 'cv_results_', 'ytr_pred',
        'yts_pred', 'test_index', 'train_index', 'estimator'

        Example:

        >>> pd.DataFrame(cv_results_)
        split_i | learn_score | test_score | cv_results_         | ...
              0 |       0.987 |      0.876 | {<internal splits>} | ...
              1 |       0.846 |      0.739 | {<internal splits>} | ...
              2 |       0.956 |      0.630 | {<internal splits>} | ...
              3 |       0.964 |      0.835 | {<internal splits>} | ...
""" def __init__(self, estimator, cv=None, scoring=None, fit_params=None, multi_output=False, shuffle_y=False, n_jobs=1, n_splits=10, test_size=0.1, train_size=None, random_state=None, groups=None, experiments_folder=None, verbose=False): self.estimator = estimator self.scoring = scoring self.fit_params = fit_params self.cv = cv self.multi_output = multi_output self.n_splits = n_splits self.test_size = test_size self.train_size = train_size self.random_state = random_state self.groups = groups self.experiments_folder = experiments_folder self.n_jobs = n_jobs self.verbose = verbose # Shuffle training labels self.shuffle_y = shuffle_y def _fit_single_job(self, job_list, X, y): cv_results_ = {} # for i, (train_index, test_index) in job_list: # LOG.info("Training fold %d", i + 1) # # slave_result_ = self._worker( # i, X, y, train_index, test_index) # # _build_cv_results(cv_results_, **slave_result_) slave_results = jl.Parallel(n_jobs=self.n_jobs) \ (jl.delayed(_worker)( self, i, X, y, train_index, test_index) for i, ( train_index, test_index) in job_list) for slave_result_ in slave_results: _build_cv_results(cv_results_, **slave_result_) self.cv_results_ = cv_results_ def _fit_master(self, X, y): cv = _check_cv( self.cv, y, classifier=is_classifier(self.estimator), n_splits=self.n_splits, test_size=self.test_size, train_size=self.train_size, random_state=self.random_state) job_list = list(enumerate(cv.split(X, y, self.groups))) if not IS_MPI_JOB: self._fit_single_job( job_list, X, y) return count = 0 nprocs = COMM.Get_size() queue = deque(job_list) n_pipes = len(queue) cv_results_ = {} # updated by _build_cv_results # seed the slaves by sending work to each processor for rankk in range(1, min(nprocs, n_pipes)): pipe_tuple = queue.popleft() COMM.send(pipe_tuple, dest=rankk, tag=DO_WORK) while queue: pipe_tuple = queue.popleft() # receive result from slave status = MPI.Status() slave_result_ = COMM.recv( source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status) # send to the same slave new work COMM.send(pipe_tuple, dest=status.source, tag=DO_WORK) _build_cv_results(cv_results_, **slave_result_) count += 1 # No more work to do, so receive all the results from slaves for rankk in range(1, min(nprocs, n_pipes)): status = MPI.Status() slave_result_ = COMM.recv( source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status) _build_cv_results(cv_results_, **slave_result_) count += 1 # tell all slaves to exit by sending an empty message with the EXIT_TAG for rankk in range(1, nprocs): COMM.send(0, dest=rankk, tag=EXIT) # max_performance = _get_best_parameters(grid_results, param_names) # LOG.info("Best performance for fold %d:\n%s", i + 1, # max_performance) # max_performance['fold'] = i + 1 # best_parameters.append(max_performance) # best_parameters = pandas.DataFrame(best_parameters) # best_parameters.set_index('fold', inplace=True) # best_parameters['score (Test)'] = 0.0 # best_parameters.rename(columns={'score': 'score (Validation)'}, # inplace=True) # scores = _fit_and_score_with_parameters( # X, y, cv, best_parameters.loc[:, param_names]) # best_parameters['score (Test)'] = scores # self.best_params_ = best_parameters self.cv_results_ = cv_results_ def _fit_slave(self, X, y): """Pipeline evaluation. Parameters ---------- X : array of float, shape : n_samples x n_features, default : () The input data matrix. 
""" try: while True: status_ = MPI.Status() received = COMM.recv(source=0, tag=MPI.ANY_TAG, status=status_) # check the tag of the received message if status_.tag == EXIT: return # do the work i, (train_index, test_index) = received if self.verbose: print("[{} {}]: Performing experiment {}".format( NAME, RANK, i)) cv_results_ = _worker(self, i, X, y, train_index, test_index) if self.verbose: print("[{} {}]: Experiment {} completed".format( NAME, RANK, i)) COMM.send(cv_results_, dest=0, tag=0) except StandardError as exc: warnings.warn("Quitting ... TB:", str(exc))

    def fit(self, X, y):
        """Fit the model to the training data."""
        X, y = check_X_y(X, y, force_all_finite=False,
                         multi_output=self.multi_output)

        self.scorer_ = check_scoring(self.estimator, scoring=self.scoring)

        if RANK == 0:
            self._fit_master(X, y)
        else:
            self._fit_slave(X, y)

        return self
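

# ----------------------------------------------------------------------------
# Minimal usage sketch (added for illustration, not part of the original
# module). The estimator, parameter grid and random data below are
# hypothetical; the point is only to show ModelAssessment wrapping an inner
# GridSearchCV and exposing one row per outer split in cv_results_.
if __name__ == '__main__':
    import numpy as np
    from sklearn.linear_model import Ridge
    from sklearn.model_selection import GridSearchCV

    rng = np.random.RandomState(0)
    X_demo = rng.randn(60, 20)
    y_demo = rng.randn(60)

    # inner (nested) model selection loop
    inner_cv = GridSearchCV(Ridge(), {'alpha': [0.1, 1.0, 10.0]}, cv=3)

    # outer Monte Carlo loop: 5 train/test splits, 25% of samples held out
    ma = ModelAssessment(inner_cv, n_splits=5, test_size=0.25,
                         random_state=0, n_jobs=1)
    ma.fit(X_demo, y_demo)

    # cv_results_ maps column names to lists with one entry per outer split;
    # it can be loaded into a pandas DataFrame as in the class docstring.
    print(sorted(ma.cv_results_.keys()))
    print(ma.cv_results_['test_score'])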