如何使用值编写 UDF 作为对其他列的引用?

Posted

技术标签:

【中文标题】如何使用值编写 UDF 作为对其他列的引用?【英文标题】:How to write UDF with values as references to other columns? 【发布时间】:2017-10-01 07:47:44 【问题描述】:

我想创建一个执行以下操作的 UDF:

A DataFrame 有 5 列,并且想要创建第 6 列,其中包含第一列和第二列名称的值的总和。

让我打印DataFrame 并解释一下:

case class salary(c1: String, c2: String, c3: Int, c4: Int, c5: Int)

val df = Seq(
    salary("c3", "c4", 7, 5, 6),
    salary("c5", "c4", 8, 10, 20),
    salary("c5", "c3", 1, 4, 9))
    .toDF()

DataFrame结果

+---+---+---+---+---+
| c1| c2| c3| c4| c5|
+---+---+---+---+---+
| c3| c4|  7|  5|  6|
| c5| c4|  8| 10| 20|
| c5| c3|  1|  4|  9|
+---+---+---+---+---+

df.withColumn("c6",UDFName(c1,c2))

该列的结果应该是:

1º 行(C3,C4) 然后 7+5= 12

2º 行(C5,C4) 然后 20+10= 30

3º 行(C5,C3) 然后 9+1= 10

【问题讨论】:

【参考方案1】:

这里真的不需要UDF。只需使用虚拟MapType 列:

import org.apache.spark.sql.functions.col, lit, map

// We use an interleaved list of column name and column value
val values = map(Seq("c3", "c4", "c5").flatMap(c => Seq(lit(c), col(c))): _*)

// Check the first row
df.select(values).limit(1).show(false)
+------------------------------+
|map(c3, c3, c4, c4, c5, c5)   |
+------------------------------+
|Map(c3 -> 7, c4 -> 5, c5 -> 6)|
+------------------------------+

并在表达式中使用它:

df.withColumn("c6", values($"c1") + values($"c2"))
+---+---+---+---+---+---+
| c1| c2| c3| c4| c5| c6|
+---+---+---+---+---+---+
| c3| c4|  7|  5|  6| 12|
| c5| c4|  8| 10| 20| 30|
| c5| c3|  1|  4|  9| 10|
+---+---+---+---+---+---+ 

它比处理UDFsRows 更干净、更快、更安全:

import org.apache.spark.sql.functions.struct, udf
import org.apache.spark.sql.Row

val f = udf((row: Row) => for 
  // Use Options to avoid problems with null columns
  // Explicit null checks should be faster, but much more verbose
  c1 <- Option(row.getAs[String]("c1"))
  c2 <- Option(row.getAs[String]("c2"))

  // In this case we could (probably) skip Options below
  // but Ints in Spark SQL can get null
  x <- Option(row.getAs[Int](c1))
  y <- Option(row.getAs[Int](c2))
 yield x + y)

df.withColumn("c6", f(struct(df.columns map col: _*)))
+---+---+---+---+---+---+
| c1| c2| c3| c4| c5| c6|
+---+---+---+---+---+---+
| c3| c4|  7|  5|  6| 12|
| c5| c4|  8| 10| 20| 30|
| c5| c3|  1|  4|  9| 10|
+---+---+---+---+---+---+ 

【讨论】:

【参考方案2】:

用户定义函数 (UDF) 可以访问直接作为输入参数传递的值。

如果您想访问其他列,UDF 将只能访问它们如果您将它们作为输入参数传递。有了这个,你应该很容易实现你所追求的。

我强烈建议使用struct 函数来组合所有其他列。

struct(cols: Column*): Column 创建一个新的结构列。

您也可以使用Dataset.columns 方法访问struct 的列。

columns: Array[String] 将所有列名作为数组返回。

【讨论】:

以上是关于如何使用值编写 UDF 作为对其他列的引用?的主要内容,如果未能解决你的问题,请参考以下文章

使用 udf 传递列作为参数将自定义列添加到 pyspark 数据帧

VBA UDF 多单元格引用

如何在不指定每一列的情况下将整行作为参数传递给 Spark(Java)中的 UDF?

无论计算模式如何,您将如何编写手动计算的 excel udf?

将 UDF 方法作为参数传递给 KSQL 中的其他 UDF

如何将 Set/HashSet 作为参数传递给 Spark 中的 UDF?