UDF scala 返回 [max,index]

Posted

技术标签:

【中文标题】UDF scala 返回 [max,index]【英文标题】:UDF scala return [max,index] 【发布时间】:2016-03-16 08:02:44 【问题描述】:

我想为 Spark SQL 实现以下功能。给定一个数组,返回带有索引的最大值。我试过了:

/*
 * This function finds the maximum value and corresponding index in the array. NULLs are ignored. 
 * Return type is array in format [max, index], and its element type is the same as the input type.
 * Parameters: x Array[Int]
 * Returns: Array as [max, index].
 */
def array_max_index(x: WrappedArray[Int]): WrappedArray[Int] = 
    val arr = collection.mutable.WrappedArray.empty
    arr.:+(x.max).:+(x.indexOf(x.max))

这很好用,但仅适用于Integers - 我希望 UDF 也适用于其他数值(例如 Doubles)。我尝试了以下方法,但无法返回具有类型的结构:

def array_max_index[T](item:Traversable[T])(implicit n:Numeric[T]): Traversable[T] = 
    val arr = collection.mutable.WrappedArray.empty
    val max = item.max
    val index = n.toInt(item.toSeq.indexOf(max))
    arr.:+(max).:+(index)
  

有什么想法吗?

【问题讨论】:

你试过了吗?它不工作吗?它应该工作...... 是的,此代码有效,但仅适用于整数数组。我希望它适用于 Double 类型。 那么在问题中说明...? 【参考方案1】:

返回Array 没那么有用——因为索引类型始终为Int,最大值类型取决于具体调用(如果我理解正确,您希望它适用于整数和双精度数) - 所以无法正确输入数组。

这是 UDF 的一种可能实现,返回一个元组

def array_max_index[T](x: Traversable[T])(implicit n: Numeric[T]): (T, Int) = 
  (x.max, x.toSeq.indexOf(x.max))

然后,可以为Doubles 和Ints 调用:

sqlContext.udf.register("array_max_index", array_max_index(_: Traversable[Double]))

sqlContext.sql(
  """SELECT array_max_index(array(
    |  CAST(5.0 AS DOUBLE),
    |  CAST(7.0 AS DOUBLE),
    |  CAST(3.0 AS DOUBLE)
    |)) as max_and_index""".stripMargin).show

哪些打印:

+-------------+
|max_and_index|
+-------------+
|      [7.0,1]|
+-------------+

【讨论】:

以上是关于UDF scala 返回 [max,index]的主要内容,如果未能解决你的问题,请参考以下文章

参数为空时,Spark Scala UDF 不返回预期值

rdd.mapPartitions 从 Spark Scala 中的 udf 返回布尔值

Spark SQL UDF 使用 df.WithColumn() 返回 scala 不可变映射

Scala UDF 返回“不支持单元类型的架构”

Scala UDF 函数对数组列进行操作并返回自定义值

如果其他,Spark scala udf 错误