将 UDF 应用于 Spark Dataframe 中的多个列
Posted
技术标签:
【中文标题】将 UDF 应用于 Spark Dataframe 中的多个列【英文标题】:Apply UDF to multiple columns in Spark Dataframe 【发布时间】:2017-07-19 11:23:01 【问题描述】:我有一个如下所示的数据框
| id| age| rbc| bgr| dm|cad|appet| pe|ane|classification|
+---+----+------+-----+---+---+-----+---+---+--------------+
| 3|48.0|normal|117.0| no| no| poor|yes|yes| ckd|
....
....
....
我编写了一个 UDF 来将分类 yes, no, poor, normal
转换为二进制 0s
和 1s
def stringToBinary(stringValue: String): Int =
stringValue match
case "yes" => return 1
case "no" => return 0
case "present" => return 1
case "notpresent" => return 0
case "normal" => return 1
case "abnormal" => return 0
val stringToBinaryUDF = udf(stringToBinary _)
我将其应用于数据框如下
val newCol = stringToBinaryUDF.apply(col("pc")) //creates the new column with formatted value
val refined1 = noZeroDF.withColumn("dm", newCol) //adds the new column to original
如何将多个列传递到 UDF,这样我就不必为其他分类列重复自己?
【问题讨论】:
【参考方案1】:如果您有 spark
函数来完成与 udf
函数序列化和反序列化列数据相同的工作,则不应选择udf
函数。
给定一个dataframe
+---+----+------+-----+---+---+-----+---+---+--------------+
|id |age |rbc |bgr |dm |cad|appet|pe |ane|classification|
+---+----+------+-----+---+---+-----+---+---+--------------+
|3 |48.0|normal|117.0|no |no |poor |yes|yes|ckd |
+---+----+------+-----+---+---+-----+---+---+--------------+
你可以用when
函数来实现你的要求
import org.apache.spark.sql.functions._
def applyFunction(column : Column) = when(column === "yes" || column === "present" || column === "normal", lit(1))
.otherwise(when(column === "no" || column === "notpresent" || column === "abnormal", lit(0)).otherwise(column))
df.withColumn("dm", applyFunction(col("dm")))
.withColumn("cad", applyFunction(col("cad")))
.withColumn("rbc", applyFunction(col("rbc")))
.withColumn("pe", applyFunction(col("pe")))
.withColumn("ane", applyFunction(col("ane")))
.show(false)
结果是
+---+----+---+-----+---+---+-----+---+---+--------------+
|id |age |rbc|bgr |dm |cad|appet|pe |ane|classification|
+---+----+---+-----+---+---+-----+---+---+--------------+
|3 |48.0|1 |117.0|0 |0 |poor |1 |1 |ckd |
+---+----+---+-----+---+---+-----+---+---+--------------+
现在问题清楚地表明您不想为所有列重复该过程,您可以执行以下操作
val columnsTomap = df.select("rbc", "cad", "rbc", "pe", "ane").columns
var tempdf = df
columnsTomap.map(column =>
tempdf = tempdf.withColumn(column, applyFunction(col(column)))
)
tempdf.show(false)
【讨论】:
【参考方案2】:您也可以使用foldLeft
函数。将您的 UDF 称为 stringToBinaryUDF
:
import org.apache.spark.sql.functions._
val categoricalColumns = Seq("rbc", "cad", "rbc", "pe", "ane")
val refinedDF = categoricalColumns
.foldLeft(noZeroDF) (accumulatorDF: DataFrame, columnName: String) =>
accumulatorDF
.withColumn(columnName, stringToBinaryUDF(col(columnName)))
这将尊重不变性和函数式编程。
【讨论】:
【参考方案3】:UDF 可以采用许多参数,即许多列,但它应该返回一个结果,即一列。
为此,只需将参数添加到您的stringToBinary
函数即可。
如果您希望它占据两列,它将如下所示:
def stringToBinary(stringValue: String, secondValue: String): Int =
stringValue match
case "yes" => return 1
case "no" => return 0
case "present" => return 1
case "notpresent" => return 0
case "normal" => return 1
case "abnormal" => return 0
val stringToBinaryUDF = udf(stringToBinary _)
希望对你有帮助
【讨论】:
如果它接受一个数组def stringToBinary(stringValues: Array[String])
stringValues[0]
代表什么?
它肯定会代表Array
中的第一个String
,它代表UDF 中传递的第一个Column
。另一种选择是使用*
来引用String
类型的许多参数,并像往常一样使用逗号分隔的符号传递参数。定义看起来像def stringToBinary(stringValues: String*)
以上是关于将 UDF 应用于 Spark Dataframe 中的多个列的主要内容,如果未能解决你的问题,请参考以下文章
将 UDF 应用于 spark 2.0 中的 SparseVector 列
使用 pandas_udf 将 Spark Structured DataFrame 转换为 Pandas
如何将 Spark Dataframe 列的每个值作为字符串传递给 python UDF?
将 Python UDF 应用于 Spark 数据帧时出现 java.lang.IllegalArgumentException