Scala中的Spark分组映射UDF

Posted

技术标签:

【中文标题】Scala中的Spark分组映射UDF【英文标题】:Spark grouped map UDF in Scala 【发布时间】:2020-04-08 15:40:02 【问题描述】:

我正在尝试编写一些代码,以允许我对数据帧的一组行计算一些操作。在 PySpark 中,这可以通过定义 GROUPED_MAP 类型的 Pandas UDF 来实现。但是,在 Scala 中,我只找到了一种创建自定义聚合器 (UDAF) 或经典 UDF 的方法。

我的临时解决方案是生成一个对我的组进行编码的键列表,这将允许我过滤数据帧并对数据帧的每个子集执行我的操作。然而,这种方法不是最优的并且非常慢。 执行的动作是按顺序进行的,因此需要很多时间。我可以并行化循环,但我确信这会显示出任何改进,因为 Spark 已经分发。

有没有更好的方法来做我想做的事?

编辑:尝试使用 Futures 进行并行化,但没有像预期的那样提高速度

【问题讨论】:

【参考方案1】:

据我所知,这在 Scala 中是不可能的。根据您的需要,我认为可能还有其他方法可以将转换应用于 Spark / Scala 中的一组行:

    执行groupBy(...).agg(collect_list(<column_names>)),并使用对值数组进行操作的UDF。如果需要,您可以使用带有 explode(<array_column>) 的 select 语句恢复到原始格式 尝试使用窗口函数重写您想要实现的目标。您可以使用聚合表达式添加新列,如下所示:
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy('group)

val result = spark.range(100)
    .withColumn("group", pmod('id, lit(3)))
    .withColumn("group_sum", sum('id).over(w))

【讨论】:

以上是关于Scala中的Spark分组映射UDF的主要内容,如果未能解决你的问题,请参考以下文章

模式匹配范围在Scala与Spark udf

Spark SQL UDF 使用 df.WithColumn() 返回 scala 不可变映射

Scala Spark 中的 udf 运行时错误

rdd.mapPartitions 从 Spark Scala 中的 udf 返回布尔值

如何使用scala将特定函数转换为apache spark中的udf函数? [复制]

如何将复杂的 Java 类对象作为参数传递给 Spark 中的 Scala UDF?