如何使用值编写 UDF 作为对其他列的引用?
Posted
技术标签:
【中文标题】如何使用值编写 UDF 作为对其他列的引用?【英文标题】:How to write UDF with values as references to other columns? 【发布时间】:2017-10-01 07:47:44 【问题描述】:我想创建一个执行以下操作的 UDF:
A DataFrame
有 5 列,并且想要创建第 6 列,其中包含第一列和第二列名称的值的总和。
让我打印DataFrame
并解释一下:
case class salary(c1: String, c2: String, c3: Int, c4: Int, c5: Int)
val df = Seq(
salary("c3", "c4", 7, 5, 6),
salary("c5", "c4", 8, 10, 20),
salary("c5", "c3", 1, 4, 9))
.toDF()
DataFrame
结果
+---+---+---+---+---+
| c1| c2| c3| c4| c5|
+---+---+---+---+---+
| c3| c4| 7| 5| 6|
| c5| c4| 8| 10| 20|
| c5| c3| 1| 4| 9|
+---+---+---+---+---+
df.withColumn("c6",UDFName(c1,c2))
该列的结果应该是:
1º 行(C3,C4) 然后 7+5= 12
2º 行(C5,C4) 然后 20+10= 30
3º 行(C5,C3) 然后 9+1= 10
【问题讨论】:
【参考方案1】:这里真的不需要UDF。只需使用虚拟MapType
列:
import org.apache.spark.sql.functions.col, lit, map
// We use an interleaved list of column name and column value
val values = map(Seq("c3", "c4", "c5").flatMap(c => Seq(lit(c), col(c))): _*)
// Check the first row
df.select(values).limit(1).show(false)
+------------------------------+
|map(c3, c3, c4, c4, c5, c5) |
+------------------------------+
|Map(c3 -> 7, c4 -> 5, c5 -> 6)|
+------------------------------+
并在表达式中使用它:
df.withColumn("c6", values($"c1") + values($"c2"))
+---+---+---+---+---+---+
| c1| c2| c3| c4| c5| c6|
+---+---+---+---+---+---+
| c3| c4| 7| 5| 6| 12|
| c5| c4| 8| 10| 20| 30|
| c5| c3| 1| 4| 9| 10|
+---+---+---+---+---+---+
它比处理UDFs
和Rows
更干净、更快、更安全:
import org.apache.spark.sql.functions.struct, udf
import org.apache.spark.sql.Row
val f = udf((row: Row) => for
// Use Options to avoid problems with null columns
// Explicit null checks should be faster, but much more verbose
c1 <- Option(row.getAs[String]("c1"))
c2 <- Option(row.getAs[String]("c2"))
// In this case we could (probably) skip Options below
// but Ints in Spark SQL can get null
x <- Option(row.getAs[Int](c1))
y <- Option(row.getAs[Int](c2))
yield x + y)
df.withColumn("c6", f(struct(df.columns map col: _*)))
+---+---+---+---+---+---+
| c1| c2| c3| c4| c5| c6|
+---+---+---+---+---+---+
| c3| c4| 7| 5| 6| 12|
| c5| c4| 8| 10| 20| 30|
| c5| c3| 1| 4| 9| 10|
+---+---+---+---+---+---+
【讨论】:
【参考方案2】:用户定义函数 (UDF) 可以访问直接作为输入参数传递的值。
如果您想访问其他列,UDF 将只能访问它们如果您将它们作为输入参数传递。有了这个,你应该很容易实现你所追求的。
我强烈建议使用struct 函数来组合所有其他列。
struct(cols: Column*): Column 创建一个新的结构列。
您也可以使用Dataset.columns 方法访问struct
的列。
columns: Array[String] 将所有列名作为数组返回。
【讨论】:
以上是关于如何使用值编写 UDF 作为对其他列的引用?的主要内容,如果未能解决你的问题,请参考以下文章
使用 udf 传递列作为参数将自定义列添加到 pyspark 数据帧
如何在不指定每一列的情况下将整行作为参数传递给 Spark(Java)中的 UDF?