Spark ML Transformer - 使用 rangeBetween 在窗口上聚合

Posted

技术标签:

【中文标题】Spark ML Transformer - 使用 rangeBetween 在窗口上聚合【英文标题】:Spark ML Transformer - aggregate over a window using rangeBetween 【发布时间】:2017-11-03 15:20:00 【问题描述】:

我想创建自定义 Spark ML 转换器,该转换器在滚动窗口内应用聚合函数,构造为 over window。我希望能够在 Spark ML Pipeline 中使用这个转换器。

我想实现一些可以通过withColumn 很容易完成的事情,正如这个答案中给出的那样

Spark Window Functions - rangeBetween dates

例如:

val w = Window.orderBy(col("unixTimeMS")).rangeBetween(0, 700)
val df_new = df.withColumn("cts", sum("someColumnName").over(w))

在哪里

df 是我的数据框 unixTimeMS 是以毫秒为单位的 unix 时间 someColumnName 是我想要执行聚合的一些列。 在此示例中,我对窗口中的行进行求和。 窗口w 包括当前事务和当前事务700 毫秒内的所有事务。

是否可以将这样的窗口聚合放入 Spark ML Transformer 中?

我能够使用 Spark ML SQLTransformer 实现类似的目标,其中

val query = """SELECT *,
              sum(someColumnName) over (order by unixTimeMS) as cts
              FROM __THIS__"""

new SQLTransformer().setStatement(query)

但我不知道如何在 SQL 中使用rangeBetween 来选择时间段。不仅仅是行数。我需要针对当前行的 unixTimeMS 的特定时间段。

我知道一元变换不是这样做的方法,因为我需要进行聚合。我是否需要定义一个 UDAF(用户定义的聚合函数)并在 SQLTransformer 中使用它? 我找不到任何包含窗口函数的 UDAF 示例。

【问题讨论】:

【参考方案1】:

我正在回答我自己的问题以供将来参考。我最终使用了 SQLTransformer。就像示例中的窗口函数一样,我使用了 range between:

val query = SELECT *,
sum(dollars) over (
      partition by Numerocarte
      order by UnixTime
      range between 1000 preceding and 200 following) as cts
      FROM __THIS__"

其中 1000 和 200 之间的范围与按列排序的单位有关。

【讨论】:

以上是关于Spark ML Transformer - 使用 rangeBetween 在窗口上聚合的主要内容,如果未能解决你的问题,请参考以下文章

如果不使用spark-ml中的管道,交叉验证会更快吗?

Spark ml pipeline - transforming feature - StringIndexer

Spark 3.0 - 5.ML Pipeline 实战之电影影评情感分析

在 ML.NET 中使用Hugginface Transformer

使用 Spark ML 时出现 VectorUDT 问题

Spark|ML|随机森林|从 RandomForestClassificationModel 的 .txt 加载训练模型。 toDebugString