如何使用 Scala 调用 UDF

Posted

技术标签:

【中文标题】如何使用 Scala 调用 UDF【英文标题】:How to call an UDF using Scala 【发布时间】:2015-09-09 10:27:40 【问题描述】:

如何通过 DataFrame API 在 Scala 中表达以下代码?

sqlContext.read.parquet("/input").registerTempTable("data")
sqlContext.udf.register("median", new Median)
sqlContext.sql(
  """
    |SELECT
    |  param,
    |  median(value) as median
    |FROM data
    |GROUP BY param
""".stripMargin).registerTempTable("medians")

我已经通过

val data = sqlContext.read.parquet("/input")
sqlContext.udf.register("median", new Median)
data.groupBy("param")

但是他们我不确定如何调用median 函数。

【问题讨论】:

【参考方案1】:

您可以使用callUDF

data.groupBy("param").agg(callUDF("median", $"value"))

或者直接调用:

val median = new Median
data.groupBy("param").agg(median($"value"))

// Equivalent to
data.groupBy("param").agg(new Median()($"value"))

不过,我认为使用object 而不是class 会更有意义。

【讨论】:

关于对象而不是类的好点,我只使用了类,因为我从 java 测试示例中复制了实现并且没有考虑对象。 我很好奇,您是否尝试使用 UDAF 计算精确的中位数? 是的。是一个实验,可能只使用纯 scala,因为无论如何您都必须存储所有值。 有趣。你使用数组作为缓冲区吗? 一个 sparksql 数组。死慢。切换到 Scala 增益。

以上是关于如何使用 Scala 调用 UDF的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Java 调用 Groovy 或 Scala UDF 来更新 Oracle?

如何将复杂的 Java 类对象作为参数传递给 Spark 中的 Scala UDF?

如何使用scala将特定函数转换为apache spark中的udf函数? [复制]

使用 Scala 从 Spark 的 withColumn 中调用 udf 时出错

如何在 Scala Spark 项目中使用 PySpark UDF?

如何使用替代方法解决重载方法值寄存器,UDF Spark scala