在开始使用pyspark.ml
和管道API时,我发现自己为了在管道中使用它们而为典型的预处理任务编写自定义转换器。例如:
from pyspark.ml import Pipeline, Transformerclass CustomTransformer(Transformer): # 懒惰的解决方法 - 转换器需要有这些属性 _defaultParamMap = dict() _paramMap = dict() _params = dict()class ColumnSelector(CustomTransformer): """选择列子集的转换器 - 用作管道阶段""" def __init__(self, columns): self.columns = columns def _transform(self, data): return data.select(self.columns)class ColumnRenamer(CustomTransformer): """重命名一列的转换器""" def __init__(self, rename): self.rename = rename def _transform(self, data): (colNameBefore, colNameAfter) = self.rename return data.withColumnRenamed(colNameBefore, colNameAfter)class NaDropper(CustomTransformer): """ 删除至少包含一个非数字元素的行 """ def __init__(self, cols=None): self.cols = cols def _transform(self, data): dataAfterDrop = data.dropna(subset=self.cols) return dataAfterDropclass ColumnCaster(CustomTransformer): def __init__(self, col, toType): self.col = col self.toType = toType def _transform(self, data): return data.withColumn(self.col, data[self.col].cast(self.toType))
它们确实能工作,但我很好奇这是否是一种模式或反模式 – 这样的转换器是否是与管道API一起工作的好方法?是否有必要实现它们,还是在其他地方提供了等效的功能?
回答:
我认为这主要是基于意见的,尽管看起来不必要地冗长,而且Python的Transformers
与Pipeline
API的其余部分整合得不好。
还值得指出的是,你在这里所做的一切都可以通过SQLTransformer
轻松实现。例如:
from pyspark.ml.feature import SQLTransformerdef column_selector(columns): return SQLTransformer( statement="SELECT {} FROM __THIS__".format(", ".join(columns)) )
或者
def na_dropper(columns): return SQLTransformer( statement="SELECT * FROM __THIS__ WHERE {}".format( " AND ".join(["{} IS NOT NULL".format(x) for x in columns]) ) )
通过稍加努力,你可以使用带有Hive方言的SQLAlchemy来避免手写SQL。