在 pyspark.ml 中运行多个功能的变压器

Posted

技术标签:

【中文标题】在 pyspark.ml 中运行多个功能的变压器【英文标题】:Transformer operating on multiple features in pyspark.ml 【发布时间】:2017-01-24 12:36:13 【问题描述】:

我想在DataFrame 中制作自己的特征转换器,以便添加一个列,例如,其他两列之间的差异。我关注了this question,但那里的变压器只在一根柱子上运行。 pyspark.ml.Transformer 将字符串作为inputCol 的参数,所以我当然不能指定多个列。

所以基本上,我想要实现的是一个类似于这个的_transform() 方法:

def _transform(self, dataset):
    out_col = self.getOutputCol()
    in_col = dataset.select([self.getInputCol()])

    # Define transformer logic
    def f(col1, col2):
        return col1 - col2
    t = IntegerType()

    return dataset.withColumn(out_col, udf(f, t)(in_col))

这怎么可能?

【问题讨论】:

难道HasInputCols(复数)就是你要找的东西? 【参考方案1】:

我设法解决了这个问题,首先从我要操作的一组特征中创建一个Vector,然后对新生成的矢量特征应用变换。下面是一个示例代码,说明如何制作一个不同于其他两个功能的新功能:

class MeasurementDifferenceTransformer(Transformer, HasInputCol, HasOutputCol):  

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(MeasurementDifferenceTransformer, self).__init__()
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]

        # Define transformer logic
        def f(vector):
            return float(vector[0] - vector[1])
        t = FloatType()

        return dataset.withColumn(out_col, udf(lambda x: f(x), t)(in_col))

要使用它,我们首先实例化一个VectorAssembler 来创建一个向量特征:

pair_assembler = VectorAssembler(inputCols=["col1", "col2"], outputCol="cols_vector")

然后我们实例化转换器:

pair_transformer = MeasurementDifferenceTransformer(inputCol="cols_vector", outputCol="col1_minus_col2")

最后我们转换数据:

pairfeats = pair_assembler.transform(df)
difffeats = pait_transformer.transform(pairfeats)

【讨论】:

【参考方案2】:

您无需经历所有这些麻烦即可对多个列进行操作。这是使用 HasInputCols(而不是 HasInputCol)的更好方法

class MeasurementDifferenceTransformer(Transformer, HasInputCols, HasOutputCol):  
    @keyword_only
    def __init__(self, inputCols=None, outputCol=None):
        super(MeasurementDifferenceTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCols=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        out_col = self.getOutputCol()
        in_col = self.getInputCols()

        # Define transformer logic
        def f(col1, col2):
            return float(col1-col2)
        t = FloatType()

        return dataset.withColumn(out_col, udf(lambda f, t)(*in_col))

【讨论】:

以上是关于在 pyspark.ml 中运行多个功能的变压器的主要内容,如果未能解决你的问题,请参考以下文章

在pyspark中聚合One-Hot编码功能

主变调档最大次数限制

在非 Spark 环境中加载 pyspark ML 模型

如何使 pyspark 和 ML(无 RDD)与大型 csv 一起工作?

PySpark ML——分布式机器学习库

在 python 中使用 pandas,numpy 是不是有 pyspark.ml.feature StringIndexer 的替代方法?