如何在 MLlib 中编写自定义 Transformer?
Posted
技术标签:
【中文标题】如何在 MLlib 中编写自定义 Transformer?【英文标题】:How to write a custom Transformer in MLlib? 【发布时间】:2016-11-15 17:07:46 【问题描述】:我想为 scala 中的 spark 2.0 中的管道编写自定义 Transformer
。到目前为止,我还不清楚copy
或transformSchema
方法应该返回什么。他们返回null
是否正确? https://github.com/SupunS/play-ground/blob/master/test.spark.client_2/src/main/java/CustomTransformer.java 复制?
由于Transformer
扩展PipelineStage
我得出结论,fit
调用transformSchema
方法。我是否正确理解 transformSchema
类似于 sk-learns fit?
由于我的Transformer
应该将数据集与(非常小的)第二个数据集连接起来,我也想将那个数据集存储在序列化管道中。我应该如何将它存储在转换器中以正确使用管道序列化机制?
一个简单的转换器会是什么样子,它计算单个列的平均值并填充 nan 值 + 保持这个值?
@SerialVersionUID(serialVersionUID) // TODO store ibanList in copy + persist
class Preprocessor2(someValue: Dataset[SomeOtherValues]) extends Transformer
def transform(df: Dataset[MyClass]): DataFrame =
override def copy(extra: ParamMap): Transformer =
override def transformSchema(schema: StructType): StructType =
schema
【问题讨论】:
【参考方案1】:transformSchema
应该返回应用Transformer
后预期的架构。示例:
如果变压器增加了IntegerType
的列,输出的列名是foo
:
import org.apache.spark.sql.types._
override def transformSchema(schema: StructType): StructType =
schema.add(StructField("foo", IntegerType))
因此,如果数据集的架构没有更改,因为仅填充了名称值用于平均插补,我应该将原始案例类作为架构返回?
这在 Spark SQL(和 MLlib 中也是如此)中是不可能的,因为 Dataset
一旦创建就不可变。您只能添加或“替换”(添加后跟drop
操作)列。
【讨论】:
您的意思是删除/添加新列的转换器应该是估算器?这听起来很奇怪。所以我是否理解正确的带有 fit & transform 的 sklearn 变压器是火花估计器,火花变压器只能执行对于任何输入数据都是恒定的“固定”变换。因此,平均估算器需要是估算器吗? 正如@LostInOverflow 所说,你需要一个估计器,然后是一个转换器——估计器从原始列计算平均值,然后转换器用计算的平均值估算缺失值。此外,缺失值插补是目前正在酝酿中的一项功能——JIRA 谢谢。应该存储/持久化的 Estimator 拟合中的任何变量实际上都将被持久化是否正确? 参数应该是。其余的,我不确定。【参考方案2】:首先,我不确定您是否想要 Transformer
本身(或 UnaryTransformer
为 @LostInOverflow suggested in the answer),如您所说:
一个简单的转换器会是什么样子,它计算单个列的平均值并填充 nan 值 + 保持这个值?
对我来说,就好像您想应用一个聚合函数(又名聚合)并将其与所有列“连接”以产生最终值或 NaN。
看起来您希望 groupBy
对 mean
进行聚合,然后对 join
进行聚合,这也可能是一个窗口聚合。
无论如何,我会从UnaryTransformer
开始,这将解决您问题中的第一个问题:
到目前为止,我还不清楚
copy
或transformSchema
方法应该返回什么。他们返回 null 是否正确?
请参阅the complete project spark-mllib-custom-transformer at GitHub,其中我实现了UnaryTransformer
到toUpperCase
的字符串列,对于 UnaryTransformer,该列如下所示:
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.DataType, StringType
class UpperTransformer(override val uid: String)
extends UnaryTransformer[String, String, UpperTransformer]
def this() = this(Identifiable.randomUID("upp"))
override protected def createTransformFunc: String => String =
_.toUpperCase
override protected def outputDataType: DataType = StringType
【讨论】:
以上是关于如何在 MLlib 中编写自定义 Transformer?的主要内容,如果未能解决你的问题,请参考以下文章