如何在 MLlib 中编写自定义 Transformer?

Posted

技术标签:

【中文标题】如何在 MLlib 中编写自定义 Transformer?【英文标题】:How to write a custom Transformer in MLlib? 【发布时间】:2016-11-15 17:07:46 【问题描述】:

我想为 scala 中的 spark 2.0 中的管道编写自定义 Transformer。到目前为止,我还不清楚copytransformSchema 方法应该返回什么。他们返回null 是否正确? https://github.com/SupunS/play-ground/blob/master/test.spark.client_2/src/main/java/CustomTransformer.java 复制?

由于Transformer 扩展PipelineStage 我得出结论,fit 调用transformSchema 方法。我是否正确理解 transformSchema 类似于 sk-learns fit?

由于我的Transformer 应该将数据集与(非常小的)第二个数据集连接起来,我也想将那个数据集存储在序列化管道中。我应该如何将它存储在转换器中以正确使用管道序列化机制?

一个简单的转换器会是什么样子,它计算单个列的平均值并填充 nan 值 + 保持这个值?

@SerialVersionUID(serialVersionUID) // TODO store ibanList in copy + persist
    class Preprocessor2(someValue: Dataset[SomeOtherValues]) extends Transformer 

      def transform(df: Dataset[MyClass]): DataFrame = 

      

      override def copy(extra: ParamMap): Transformer = 
      

      override def transformSchema(schema: StructType): StructType = 
        schema
      
    

【问题讨论】:

【参考方案1】:

transformSchema 应该返回应用Transformer 后预期的架构。示例:

如果变压器增加了IntegerType的列,输出的列名是foo

import org.apache.spark.sql.types._

override def transformSchema(schema: StructType): StructType = 
   schema.add(StructField("foo", IntegerType))

因此,如果数据集的架构没有更改,因为仅填充了名称值用于平均插补,我应该将原始案例类作为架构返回?

这在 Spark SQL(和 MLlib 中也是如此)中是不可能的,因为 Dataset 一旦创建就不可变。您只能添加或“替换”(添加后跟drop 操作)列。

【讨论】:

您的意思是删除/添加新列的转换器应该是估算器?这听起来很奇怪。所以我是否理解正确的带有 fit & transform 的 sklearn 变压器是火花估计器,火花变压器只能执行对于任何输入数据都是恒定的“固定”变换。因此,平均估算器需要是估算器吗? 正如@LostInOverflow 所说,你需要一个估计器,然后是一个转换器——估计器从原始列计算平均值,然后转换器用计算的平均值估算缺失值。此外,缺失值插补是目前正在酝酿中的一项功能——JIRA 谢谢。应该存储/持久化的 Estimator 拟合中的任何变量实际上都将被持久化是否正确? 参数应该是。其余的,我不确定。【参考方案2】:

首先,我不确定您是否想要 Transformer 本身(或 UnaryTransformer 为 @LostInOverflow suggested in the answer),如您所说:

一个简单的转换器会是什么样子,它计算单个列的平均值并填充 nan 值 + 保持这个值?

对我来说,就好像您想应用一个聚合函数(又名聚合)并将其与所有列“连接”以产生最终值或 NaN。

看起来您希望 groupBymean 进行聚合,然后对 join 进行聚合,这也可能是一个窗口聚合。

无论如何,我会从UnaryTransformer 开始,这将解决您问题中的第一个问题:

到目前为止,我还不清楚copytransformSchema 方法应该返回什么。他们返回 null 是否正确?

请参阅the complete project spark-mllib-custom-transformer at GitHub,其中我实现了UnaryTransformertoUpperCase 的字符串列,对于 UnaryTransformer,该列如下所示:

import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.DataType, StringType

class UpperTransformer(override val uid: String)
  extends UnaryTransformer[String, String, UpperTransformer] 

  def this() = this(Identifiable.randomUID("upp"))

  override protected def createTransformFunc: String => String = 
    _.toUpperCase
  

  override protected def outputDataType: DataType = StringType

【讨论】:

以上是关于如何在 MLlib 中编写自定义 Transformer?的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark MLlib 中的自定义算法:“函数”对象没有属性“_input_kwargs”

代码调试篇:如何编写 gdb 自定义命令

代码调试篇:如何编写 gdb 自定义命令

elementui loading自定义图片

elementui loading自定义图片

如何在 asp.net core 中编写自定义 actionResult