我有一个与以下类似的堆叠工作流程
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import StackingClassifier
from sklearn.pipeline import make_pipeline
import xgboost as xgb

# Toy data: 1000 samples, 5 features, binary labels and one weight per sample.
X = np.random.random(size=(1000, 5))
y = np.random.choice([0, 1], 1000)
w = np.random.random(size=(1000,))

scaler = StandardScaler()
log_reg = LogisticRegression()

# Hyper-parameters forwarded to the XGBoost final estimator.
params = {
    'n_estimators': 10,
    'max_depth': 3,
    'learning_rate': 0.1,
}

# Base learner: scaling followed by logistic regression.
log_reg_pipe = make_pipeline(
    scaler,
    log_reg,
)

# Stack the base learner under an XGBoost meta-classifier. The estimator list
# must reference `log_reg_pipe` (the pipeline defined above); the previous
# name `lr_stack_pipe` was never defined and raised a NameError.
stack_pipe = make_pipeline(
    StackingClassifier(
        estimators=[('lr', log_reg_pipe)],
        final_estimator=xgb.XGBClassifier(**params),
        passthrough=True,
        cv=2,
    )
)
我想能够将样本权重传递给xgboost。我的问题是如何在最终估计器中设置样本权重?
我尝试过
# Attempt that fails: Pipeline.fit rejects the bare `sample_weights` keyword
# and raises the ValueError quoted right after this call.
stack_pipe.fit(X, y, sample_weights=w)
这会抛出
ValueError: Pipeline.fit does not accept the sample_weights parameter. You can pass parameters to specific steps of your pipeline using the stepname__parameter format, e.g. `Pipeline.fit(X, y, logisticregression__sample_weight=sample_weight)`
回答:
我最近也发现,堆叠估计器无法处理带样本权重的Pipeline。我通过子类化scikit-learn的`StackingRegressor`和`StackingClassifier`类,并重写其`fit()`方法,使其能更好地处理Pipeline,从而解决了这个问题。请看以下内容:
"""Implement StackingClassifier that can handle sample-weighted Pipelines."""from sklearn.ensemble import StackingRegressor, StackingClassifierfrom copy import deepcopyimport numpy as npfrom joblib import Parallelfrom sklearn.base import clonefrom sklearn.base import is_classifier, is_regressorfrom sklearn.model_selection import cross_val_predictfrom sklearn.model_selection import check_cvfrom sklearn.utils import Bunchfrom sklearn.utils.fixes import delayedfrom sklearn.pipeline import PipelineESTIMATOR_NAME_IN_PIPELINE = 'estimator'def new_fit_single_estimator(estimator, X, y, sample_weight=None, message_clsname=None, message=None): """Private function used to fit an estimator within a job.""" if sample_weight is not None: try: if isinstance(estimator, Pipeline): # determine name of final estimator estimator_name = estimator.steps[-1][0] kwargs = {estimator_name + '__sample_weight': sample_weight} estimator.fit(X, y, **kwargs) else: estimator.fit(X, y, sample_weight=sample_weight) except TypeError as exc: if "unexpected keyword argument 'sample_weight'" in str(exc): raise TypeError( "Underlying estimator {} does not support sample weights." .format(estimator.__class__.__name__) ) from exc raise else: estimator.fit(X, y) return estimatorclass FlexibleStackingClassifier(StackingClassifier): def __init__(self, estimators, final_estimator=None, *, cv=None, n_jobs=None, passthrough=False, verbose=0): super().__init__( estimators=estimators, final_estimator=final_estimator, cv=cv, n_jobs=n_jobs, passthrough=passthrough, verbose=verbose ) def fit(self, X, y, sample_weight=None): """Fit the estimators. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) Training vectors, where `n_samples` is the number of samples and `n_features` is the number of features. y : array-like of shape (n_samples,) Target values. sample_weight : array-like of shape (n_samples,) or default=None Sample weights. If None, then samples are equally weighted. 
Note that this is supported only if all underlying estimators support sample weights. .. versionchanged:: 0.23 when not None, `sample_weight` is passed to all underlying estimators Returns ------- self : object """ # all_estimators contains all estimators, the one to be fitted and the # 'drop' string. names, all_estimators = self._validate_estimators() self._validate_final_estimator() stack_method = [self.stack_method] * len(all_estimators) # Fit the base estimators on the whole training data. Those # base estimators will be used in transform, predict, and # predict_proba. They are exposed publicly. self.estimators_ = Parallel(n_jobs=self.n_jobs)( delayed(new_fit_single_estimator)(clone(est), X, y, sample_weight) for est in all_estimators if est != 'drop' ) self.named_estimators_ = Bunch() est_fitted_idx = 0 for name_est, org_est in zip(names, all_estimators): if org_est != 'drop': self.named_estimators_[name_est] = self.estimators_[ est_fitted_idx] est_fitted_idx += 1 else: self.named_estimators_[name_est] = 'drop' # To train the meta-classifier using the most data as possible, we use # a cross-validation to obtain the output of the stacked estimators. # To ensure that the data provided to each estimator are the same, we # need to set the random state of the cv if there is one and we need to # take a copy. 
cv = check_cv(self.cv, y=y, classifier=is_classifier(self)) if hasattr(cv, 'random_state') and cv.random_state is None: cv.random_state = np.random.RandomState() self.stack_method_ = [ self._method_name(name, est, meth) for name, est, meth in zip(names, all_estimators, stack_method) ] fit_params = ({f"{ESTIMATOR_NAME_IN_PIPELINE}__sample_weight": sample_weight} if sample_weight is not None else None) predictions = Parallel(n_jobs=self.n_jobs)( delayed(cross_val_predict)(clone(est), X, y, cv=deepcopy(cv), method=meth, n_jobs=self.n_jobs, fit_params=fit_params, verbose=self.verbose) for est, meth in zip(all_estimators, self.stack_method_) if est != 'drop' ) # Only not None or not 'drop' estimators will be used in transform. # Remove the None from the method as well. self.stack_method_ = [ meth for (meth, est) in zip(self.stack_method_, all_estimators) if est != 'drop' ] X_meta = self._concatenate_predictions(X, predictions) new_fit_single_estimator(self.final_estimator_, X_meta, y, sample_weight=sample_weight) return selfclass FlexibleStackingRegressor(StackingRegressor): def __init__(self, estimators, final_estimator=None, *, cv=None, n_jobs=None, passthrough=False, verbose=0): super().__init__( estimators=estimators, final_estimator=final_estimator, cv=cv, n_jobs=n_jobs, passthrough=passthrough, verbose=verbose ) def fit(self, X, y, sample_weight=None): """Fit the estimators. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) Training vectors, where `n_samples` is the number of samples and `n_features` is the number of features. y : array-like of shape (n_samples,) Target values. sample_weight : array-like of shape (n_samples,) or default=None Sample weights. If None, then samples are equally weighted. Note that this is supported only if all underlying estimators support sample weights. .. 
versionchanged:: 0.23 when not None, `sample_weight` is passed to all underlying estimators Returns ------- self : object """ # all_estimators contains all estimators, the one to be fitted and the # 'drop' string. names, all_estimators = self._validate_estimators() self._validate_final_estimator() stack_method = [self.stack_method] * len(all_estimators) # Fit the base estimators on the whole training data. Those # base estimators will be used in transform, predict, and # predict_proba. They are exposed publicly. self.estimators_ = Parallel(n_jobs=self.n_jobs)( delayed(new_fit_single_estimator)(clone(est), X, y, sample_weight) for est in all_estimators if est != 'drop' ) self.named_estimators_ = Bunch() est_fitted_idx = 0 for name_est, org_est in zip(names, all_estimators): if org_est != 'drop': self.named_estimators_[name_est] = self.estimators_[ est_fitted_idx] est_fitted_idx += 1 else: self.named_estimators_[name_est] = 'drop' # To train the meta-classifier using the most data as possible, we use # a cross-validation to obtain the output of the stacked estimators. # To ensure that the data provided to each estimator are the same, we # need to set the random state of the cv if there is one and we need to # take a copy. cv = check_cv(self.cv, y=y, classifier=is_classifier(self)) if hasattr(cv, 'random_state') and cv.random_state is None: cv.random_state = np.random.RandomState() self.stack_method_ = [ self._method_name(name, est, meth) for name, est, meth in zip(names, all_estimators, stack_method) ] fit_params = ({f"{ESTIMATOR_NAME_IN_PIPELINE}__sample_weight": sample_weight} if sample_weight is not None else None) predictions = Parallel(n_jobs=self.n_jobs)( delayed(cross_val_predict)(clone(est), X, y, cv=deepcopy(cv), method=meth, n_jobs=self.n_jobs, fit_params=fit_params, verbose=self.verbose) for est, meth in zip(all_estimators, self.stack_method_) if est != 'drop' ) # Only not None or not 'drop' estimators will be used in transform. 
# Remove the None from the method as well. self.stack_method_ = [ meth for (meth, est) in zip(self.stack_method_, all_estimators) if est != 'drop' ] X_meta = self._concatenate_predictions(X, predictions) new_fit_single_estimator(self.final_estimator_, X_meta, y, sample_weight=sample_weight) return self
我同时包含了回归器和分类器版本,尽管您似乎只需要使用分类器子类。
但请注意:您必须在Pipeline中为估计器设置相同的名称,并且该名称必须与上方脚本中定义的`ESTIMATOR_NAME_IN_PIPELINE`常量一致。否则代码将无法工作。例如,以下是一个正确定义的`Pipeline`实例,它使用了与上方类定义脚本中相同的名称:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import TweedieRegressor
from sklearn.feature_selection import VarianceThreshold

# `Pipeline` is imported here so the snippet also runs on its own.
# The final step is named 'estimator', the step name the stacking classes
# look for when routing sample weights.
validly_named_pipeline = Pipeline([
    ('variance_threshold', VarianceThreshold()),
    ('scaler', StandardScaler()),
    ('estimator', TweedieRegressor()),
])
这不是理想的解决方案,但这是我目前的解决方法,应该可以正常工作。
编辑:为了澄清,当我重写`fit()`方法时,我只是从scikit-learn仓库复制粘贴了代码,并进行了必要的更改,这些更改只涉及几行代码。所以大部分粘贴的代码不是我的原创,而是scikit-learn开发者的工作。