如何在 PySpark 中创建自定义 Estimator

Posted

技术标签:

【中文标题】如何在 PySpark 中创建自定义 Estimator【英文标题】:How to create a custom Estimator in PySpark 【发布时间】:2016-09-13 05:14:42 【问题描述】:

我正在尝试在 PySpark MLlib 中构建一个简单的自定义 Estimator。我有 here 可以编写自定义 Transformer,但我不确定如何在 Estimator 上执行此操作。我也不明白@keyword_only 做了什么,为什么我需要这么多setter 和getter。 Scikit-learn 似乎有适合自定义模型的文档 (see here),但 PySpark 没有。

示例模型的伪代码:

class NormalDeviation():
    def __init__(self, threshold = 3):
    def fit(x, y=None):
       self.model = 'mean': x.mean(), 'std': x.std()]
    def predict(x):
       return ((x-self.model['mean']) > self.threshold * self.model['std'])
    def decision_function(x): # does ml-lib support this?

【问题讨论】:

【参考方案1】:

我不同意@Shteingarts 解决方案,因为他在类级别创建成员,甚至将它们与实例成员混合。如果您创建多个 HasMean 实例,将导致问题。为什么不对实例变量使用恕我直言的正确方法?其他代码示例也是如此。

from pyspark.ml.pipeline import Estimator, Model, Pipeline
from pyspark.ml.param.shared import *
from pyspark.sql.functions import avg, stddev_samp


class HasMean(Params):
    def __init__(self):
        super(HasMean, self).__init__()
        self.mean = Param(self, "mean", "mean", typeConverter=TypeConverters.toFloat)

    def setMean(self, value):
        return self.set(self.mean, value)

    def getMean(self):
        return self.getOrDefault(self.mean)

【讨论】:

【参考方案2】:

一般来说没有文档,因为对于 Spark 1.6 / 2.0,大多数相关 API 并不打算公开。它应该在 Spark 2.1.0 中发生变化(请参阅 SPARK-7146)。

API 相对复杂,因为它必须遵循特定的约定才能使给定的TransformerEstimatorPipeline API 兼容。其中一些方法可能需要用于读写或网格搜索等功能。其他的,比如keyword_only 只是一个简单的助手,并不是严格要求的。

假设您为均值参数定义了以下混合:

from pyspark.ml.pipeline import Estimator, Model, Pipeline
from pyspark.ml.param.shared import *
from pyspark.sql.functions import avg, stddev_samp


class HasMean(Params):

    mean = Param(Params._dummy(), "mean", "mean", 
        typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasMean, self).__init__()

    def setMean(self, value):
        return self._set(mean=value)

    def getMean(self):
        return self.getOrDefault(self.mean)

标准差参数:

class HasStandardDeviation(Params):

    standardDeviation = Param(Params._dummy(),
        "standardDeviation", "standardDeviation", 
        typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasStandardDeviation, self).__init__()

    def setStddev(self, value):
        return self._set(standardDeviation=value)

    def getStddev(self):
        return self.getOrDefault(self.standardDeviation)

和阈值:

class HasCenteredThreshold(Params):

    centeredThreshold = Param(Params._dummy(),
            "centeredThreshold", "centeredThreshold",
            typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasCenteredThreshold, self).__init__()

    def setCenteredThreshold(self, value):
        return self._set(centeredThreshold=value)

    def getCenteredThreshold(self):
        return self.getOrDefault(self.centeredThreshold)

您可以按如下方式创建基本的Estimator

from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable 
from pyspark import keyword_only  

class NormalDeviation(Estimator, HasInputCol, 
        HasPredictionCol, HasCenteredThreshold,
        DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
        super(NormalDeviation, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setPredictionCol(self, value):
        """
        Sets the value of :py:attr:`predictionCol`.
        """
        return self._set(predictionCol=value)

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
        kwargs = self._input_kwargs
        return self._set(**kwargs)        
        
    def _fit(self, dataset):
        c = self.getInputCol()
        mu, sigma = dataset.agg(avg(c), stddev_samp(c)).first()
        return NormalDeviationModel(
            inputCol=c, mean=mu, standardDeviation=sigma, 
            centeredThreshold=self.getCenteredThreshold(),
            predictionCol=self.getPredictionCol())


class NormalDeviationModel(Model, HasInputCol, HasPredictionCol,
        HasMean, HasStandardDeviation, HasCenteredThreshold,
        DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None,
                mean=None, standardDeviation=None,
                centeredThreshold=None):
        super(NormalDeviationModel, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)  

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None,
                mean=None, standardDeviation=None,
                centeredThreshold=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)           

    def _transform(self, dataset):
        x = self.getInputCol()
        y = self.getPredictionCol()
        threshold = self.getCenteredThreshold()
        mu = self.getMean()
        sigma = self.getStddev()

        return dataset.withColumn(y, (dataset[x] - mu) > threshold * sigma)    

感谢 Benjamin-Manns 在 PySpark 中提供的 use of DefaultParamsReadable, DefaultParamsWritable >= 2.3.0

最后可以这样使用:

df = sc.parallelize([(1, 2.0), (2, 3.0), (3, 0.0), (4, 99.0)]).toDF(["id", "x"])

normal_deviation = NormalDeviation().setInputCol("x").setCenteredThreshold(1.0)
model  = Pipeline(stages=[normal_deviation]).fit(df)

model.transform(df).show()
## +---+----+----------+
## | id|   x|prediction|
## +---+----+----------+
## |  1| 2.0|     false|
## |  2| 3.0|     false|
## |  3| 0.0|     false|
## |  4|99.0|      true|
## +---+----+----------+

【讨论】:

谢谢!所以估计器的状态也被认为是一个参数? 您的意思是将估计器的调整参数作为模型的参数吗?如果是这样,这种方式设计很方便,但对于基本实现来说并不是硬性要求。 这是一个非常有用的例子。但是,如果您的变压器/模型具有特定于它而不是估计器的参数怎么办?一旦模型成为管道中的一个阶段,您如何将这些参数传递给模型?当这些参数与估计器无关时,我不想先将它们传递给估计器。我问过这个问题here... 谢谢@zero323 - 这有什么更新吗?我讨厌这种需要从每个参数继承的语法(估计器不是参数,所以它不应该从它继承......) @HananShteingart EstimatorTransform 以及其他构造(如评估器)都是参数(issubclass(Estimator, Params) - 注意复数不是单数,因为参数是相关但不同的实体),以及任何内置-in Estimator 是 Params 的子类。这是遵循相同结构的 Scala API 的直接翻译(参见known subclasses)。这与 MLRead[er|able] / MLWriter[er|able] 接口和 Pipeline API 的设计(主要是 setter 和 getter)直接相关。

以上是关于如何在 PySpark 中创建自定义 Estimator的主要内容,如果未能解决你的问题,请参考以下文章

如何在 PySpark ML 中创建自定义 SQLTransformer 以透视数据

如何在 laravel 中创建自定义关系?

如何在 QML 中创建自定义对象?

如何在 Facebook SDK 中创建自定义分享按钮

如何在 WordPress 中创建自定义表单?

如何在 React Native 中创建自定义日历?