# Source code for mcmodels.regressors.nonparametric.nadaraya_watson

"""
Nadaraya-Watson Regression.
"""
# Authors: Joseph Knox <josephk@alleninstitute.org>
# License: Allen Institute Software License

# TODO : evaluate overwrite of K (kernel)
from __future__ import division

import numpy as np
from scipy.sparse import issparse

from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.metrics.scorer import check_scoring
from sklearn.metrics.pairwise import pairwise_kernels
from sklearn.model_selection import GridSearchCV, ParameterGrid, check_cv
from sklearn.model_selection._search import _check_param_grid
from sklearn.utils.validation import check_is_fitted
from sklearn.utils import check_array
from sklearn.utils import check_X_y

from ...utils import squared_norm


class NadarayaWatson(BaseEstimator, RegressorMixin):
    """NadarayaWatson Estimator.

    Kernel-weighted (locally constant) regression: predictions are
    row-normalized kernel weightings of the training targets.

    Parameters
    ----------
    kernel : string or callable, default="linear"
        Kernel mapping used to compute weights.

    gamma : float, default=None
        Gamma parameter for the RBF, laplacian, polynomial, exponential chi2
        and sigmoid kernels. Ignored by other kernels.

    degree : float, default=3
        Degree of the polynomial kernel. Ignored by other kernels.

    coef0 : float, default=1
        Zero coefficient for polynomial and sigmoid kernels. Ignored by
        other kernels.

    kernel_params : mapping of string to any, optional
        Additional parameters for kernel function passed as callable object.

    Notes
    -----
    See `sklearn.kernel_ridge <http://scikit-learn.org/stable/modules/
    generated/sklearn.kernel_ridge.KernelRidge.html#sklearn.kernel_ridge.
    KernelRidge>`_, for more info: Kernel Ridge Regression estimator from
    which the structure of this estimator is based.

    Examples
    --------
    >>> import numpy as np
    >>> from mcmodels.regressors import NadarayaWatson
    >>> # generate some fake data
    >>> n_samples, n_features = 10, 5
    >>> np.random.seed(0)
    >>> y = np.random.randn(n_samples)
    >>> X = np.random.randn(n_samples, n_features)
    >>> # fit regressor
    >>> reg = NadarayaWatson()
    >>> reg.fit(X, y)
    NadarayaWatson(coef0=1, degree=3, gamma=None, kernel='linear',
            kernel_params=None)
    """
[docs] def __init__(self, kernel="linear", degree=3, coef0=1, gamma=None, kernel_params=None): self.kernel = kernel self.gamma = gamma self.degree = degree self.coef0 = coef0 self.kernel_params = kernel_params
def _get_kernel(self, X, y=None): """Gets kernel matrix.""" if callable(self.kernel): params = self.kernel_params or {} else: params = {"gamma": self.gamma, "degree": self.degree, "coef0": self.coef0} return pairwise_kernels(X, y, metric=self.kernel, filter_params=True, **params) @property def _pairwise(self): return self.kernel == "precomputed" def _check_fit_arrays(self, X, y, sample_weight=None): """Checks fit arrays and scales y if sample_weight is not None.""" # Convert data X, y = check_X_y(X, y, accept_sparse=("csr", "csc"), multi_output=True, y_numeric=True) if sample_weight is not None and not isinstance(sample_weight, float): # TODO: break up? sample_weight = check_array(sample_weight, ensure_2d=False) # do not want to rescale X!!!! y = np.multiply(sample_weight[:, np.newaxis], y) if len(y.shape) == 1: y = y.reshape(-1, 1) return X, y
[docs] def fit(self, X, y, sample_weight=None): """Fit Nadaraya Watson estimator. Parameters ---------- X : array, shape (n_samples, n_features) Training data. y : array, shape (n_samples, n_features) Target values. Returns ------- self : returns an instance of self """ X, y = self._check_fit_arrays(X, y, sample_weight) self.X_ = X self.y_ = y return self
@staticmethod def _normalize_kernel(K, overwrite=False): """Normalizes kernel to have row sum == 1 if sum != 0""" factor = K.sum(axis=1) # if kernel has finite support, do not divide by zero factor[factor == 0] = 1 # divide in place if overwrite: return np.divide(K, factor[:, np.newaxis], K) return K/factor[:, np.newaxis]
[docs] def get_weights(self, X): """Return model weights.""" check_is_fitted(self, ["X_", "y_"]) K = self._get_kernel(X, self.X_) return self._normalize_kernel(K, overwrite=True)
[docs] def predict(self, X): """Predict using the Nadaraya Watson model. Parameters ---------- X : array, shape (n_samples, n_features) Training data. Returns ------- C : array, shape (n_samples,) or (n_samples, n_targets) Returns predicted values. """ check_is_fitted(self, ["X_", "y_"]) if len(X.shape) == 1: X = X.reshape(-1, 1) w = self.get_weights(X) # TODO: evaluate sklearn.utils.extmath.safe_sparse_dot() if issparse(self.y_): # has to be of form sparse.dot(dense) # more efficient than w.dot( y_.toarray() ) return self.y_.T.dot(w.T).T return w.dot(self.y_)
    @property
    def nodes(self):
        """Nodes (data): the stored training targets ``y_``."""
        check_is_fitted(self, ["X_", "y_"])
        return self.y_
class _NadarayaWatsonLOOCV(NadarayaWatson):
    """Nadaraya watson with built-in Cross-Validation

    It allows efficient Leave-One-Out cross validation

    This class is not intended to be used directly. Use NadarayaWatsonCV
    instead.
    """

    def __init__(self, param_grid, scoring=None, store_cv_scores=False):
        # TODO: check _check_param_grid in proper spot
        self.param_grid = param_grid
        self.scoring = scoring
        self.store_cv_scores = store_cv_scores
        _check_param_grid(param_grid)

    @property
    def _param_iterator(self):
        # expands self.param_grid into the full list of candidate settings
        return ParameterGrid(self.param_grid)

    def _errors_and_values_helper(self, K):
        """Helper function to avoid duplication between self._errors and
        self._values.

        fill diagonal with 0, renormalize
        """
        # zeroing the diagonal removes each sample's own kernel weight, so
        # renormalized rows give leave-one-out predictions in one pass
        np.fill_diagonal(K, 0)
        S = self._normalize_kernel(K, overwrite=True)
        return S

    def _errors(self, K, y):
        """ mean((y - Sy)**2) = mean( ((I-S)y)**2 )"""
        S = self._errors_and_values_helper(K)

        # I - S (S has 0 on diagonal), computed in place
        S *= -1
        np.fill_diagonal(S, 1.0)

        mse = lambda x: squared_norm(x) / x.size
        return mse(S.dot(y))

    def _values(self, K, y):
        """ prediction """
        S = self._errors_and_values_helper(K)
        return S.dot(y)

    def fit(self, X, y, sample_weight=None):
        """Fit the model using efficient leave-one-out cross validation"""
        X, y = self._check_fit_arrays(X, y, sample_weight)

        candidate_params = list(self._param_iterator)

        scorer = check_scoring(self, scoring=self.scoring, allow_none=True)
        # error = scorer is None
        error = self.scoring is None

        if not error:
            # scorer wants an object to make predictions
            # but are already computed efficiently by _NadarayaWatsonCV.
            # This identity_estimator will just return them
            def identity_estimator():
                pass
            identity_estimator.predict = lambda y_pred: y_pred

        cv_scores = []
        for candidate in candidate_params:
            # NOTE: a bit hacky, find better way
            # build the kernel for this candidate's hyperparameters
            K = NadarayaWatson(**candidate)._get_kernel(X)

            if error:
                # NOTE: score not error!
                # negate so that np.argmax below selects the lowest error
                score = -self._errors(K, y)
            else:
                y_pred = self._values(K, y)
                score = scorer(identity_estimator, y, y_pred)

            cv_scores.append(score)

        # leave-one-out: one split per sample
        self.n_splits_ = X.shape[0]
        self.best_index_ = np.argmax(cv_scores)
        self.best_score_ = cv_scores[self.best_index_]
        self.best_params_ = candidate_params[self.best_index_]
        if self.store_cv_scores:
            self.cv_scores_ = cv_scores

        return self
class NadarayaWatsonCV(NadarayaWatson):
    """NadarayaWatson Estimator with built in Leave-one-out cross validation.

    By default, it performs Leave-one-out cross validation efficiently, but
    can accept cv argument to perform arbitrary cross validation splits.

    Parameters
    ----------
    param_grid : dict or list of dictionaries
        Dictionary with parameters names (string) as keys and lists of
        parameter settings to try as values or a list of such dictionaries,
        in which case the grids spanned by each dictionary in the list are
        explored. This enables searching over any sequence of parameter
        settings.

    scoring : string, callable or None, optional, default: None
        A string (see sklearn.model_evaluation documentation) or a scorer
        callable object / function with signature
        ``scorer(estimator, X, y)``

    cv : int, cross-validation generator or an iterable, optional, default: None
        Determines the cross-validation splitting strategy. If None, perform
        efficient leave-one-out cross validation, else use
        sklearn.model_selection.GridSearchCV.

    store_cv_scores : boolean, optional, default=False
        Flag indicating if the cross-validation values should be stored in
        `cv_scores_` attribute. This flag is only compatible with `cv=None`.

    Attributes
    ----------
    cv_scores_ : array, shape = (n_samples, ~len(param_grid))
        Cross-validation scores for each candidate parameter (if
        `store_cv_scores=True` and `cv=None`)

    best_score_ : float
        Mean cross-validated score of the best performing estimator.

    n_splits_ : int
        Number of cross-validation splits (folds/iterations)

    Examples
    --------
    >>> import numpy as np
    >>> from mcmodels.regressors import NadarayaWatson
    >>> # generate some fake data
    >>> n_samples, n_features = 10, 5
    >>> np.random.seed(0)
    >>> y = np.random.randn(n_samples)
    >>> X = np.random.randn(n_samples, n_features)
    >>> # fit regressor
    >>> param_grid = [dict(kernel=['linear'], degree=np.arange(1, 4)),
    ...               dict(kernel=['rbf'], gamma=np.logspace(-1, 1, 3))]
    >>> reg = NadarayaWatsonCV(param_grid)
    >>> reg.fit(X, y)
    NadarayaWatsonCV(coef0=1, cv=None, degree=3, gamma=1.0, kernel='rbf',
             kernel_params=None,
             param_grid=[{'kernel': ['linear'], 'degree': array([1, 2, 3])},
                         {'kernel': ['rbf'], 'gamma': array([  0.1,   1. ,  10. ])}],
             scoring=None, store_cv_scores=False)
    """
[docs] def __init__(self, param_grid, scoring=None, cv=None, store_cv_scores=False, kernel="linear", degree=3, coef0=1, gamma=None, kernel_params=None): self.param_grid = param_grid self.scoring = scoring self.cv = cv self.store_cv_scores = store_cv_scores # NadarayaWatson kwargs :: for compatibility self.kernel = kernel self.gamma = gamma self.degree = degree self.coef0 = coef0 self.kernel_params = kernel_params
def _update_params(self, param_dict): for k, v in param_dict.items(): setattr(self, k, v)
[docs] def fit(self, X, y, sample_weight=None): """Fit Nadaraya Watson estimator. Parameters ---------- X : array, shape (n_samples, n_features) Training data. y : array, shape (n_samples, n_features) Target values. Returns ------- self : returns an instance of self """ if self.cv is None: estimator = _NadarayaWatsonLOOCV(param_grid=self.param_grid, scoring=self.scoring, store_cv_scores=self.store_cv_scores) estimator.fit(X, y, sample_weight=sample_weight) self.best_score_ = estimator.best_score_ self.n_splits_ = estimator.n_splits_ best_params_ = estimator.best_params_ if self.store_cv_scores: self.best_index_ = estimator.best_index_ self.cv_scores_ = estimator.cv_scores_ else: if self.store_cv_scores: raise ValueError("cv!=None and store_cv_score=True " "are incompatible") gs = GridSearchCV(NadarayaWatson(), self.param_grid, cv=self.cv, scoring=self.scoring, refit=True) gs.fit(X, y, sample_weight=sample_weight) estimator = gs.best_estimator_ self.n_splits_ = gs.n_splits_ self.best_score_ = gs.best_score_ best_params_ = gs.best_params_ # set params for predict self._update_params(best_params_) # store data for predict self.X_ = X self.y_ = y return self