将 UDF 应用于 Spark Dataframe 中的多个列

Posted

技术标签:

【中文标题】将 UDF 应用于 Spark Dataframe 中的多个列【英文标题】:Apply UDF to multiple columns in Spark Dataframe 【发布时间】:2017-07-19 11:23:01 【问题描述】:

我有一个如下所示的数据框

| id| age|   rbc|  bgr| dm|cad|appet| pe|ane|classification|
+---+----+------+-----+---+---+-----+---+---+--------------+
|  3|48.0|normal|117.0| no| no| poor|yes|yes|           ckd|
....
....
....

我编写了一个 UDF 来将分类 yes, no, poor, normal 转换为二进制 0s1s

def stringToBinary(stringValue: String): Int = 
    stringValue match 
        case "yes" => return 1
        case "no" => return 0
        case "present" => return 1
        case "notpresent" => return 0
        case "normal" => return 1
        case "abnormal" => return 0
    


val stringToBinaryUDF = udf(stringToBinary _)

我将其应用于数据框如下

val newCol = stringToBinaryUDF.apply(col("pc")) //creates the new column with formatted value
val refined1 = noZeroDF.withColumn("dm", newCol) //adds the new column to original

如何将多个列传递到 UDF,这样我就不必为其他分类列重复自己?

【问题讨论】:

【参考方案1】:

如果您有 spark 函数来完成与 udf 函数序列化和反序列化列数据相同的工作,则不应选择udf 函数。

给定一个dataframe

+---+----+------+-----+---+---+-----+---+---+--------------+
|id |age |rbc   |bgr  |dm |cad|appet|pe |ane|classification|
+---+----+------+-----+---+---+-----+---+---+--------------+
|3  |48.0|normal|117.0|no |no |poor |yes|yes|ckd           |
+---+----+------+-----+---+---+-----+---+---+--------------+

你可以用when函数来实现你的要求

import org.apache.spark.sql.functions._
def applyFunction(column : Column) = when(column === "yes" || column === "present" || column === "normal", lit(1))
  .otherwise(when(column === "no" || column === "notpresent" || column === "abnormal", lit(0)).otherwise(column))

df.withColumn("dm", applyFunction(col("dm")))
  .withColumn("cad", applyFunction(col("cad")))
  .withColumn("rbc", applyFunction(col("rbc")))
  .withColumn("pe", applyFunction(col("pe")))
  .withColumn("ane", applyFunction(col("ane")))
  .show(false)

结果是

+---+----+---+-----+---+---+-----+---+---+--------------+
|id |age |rbc|bgr  |dm |cad|appet|pe |ane|classification|
+---+----+---+-----+---+---+-----+---+---+--------------+
|3  |48.0|1  |117.0|0  |0  |poor |1  |1  |ckd           |
+---+----+---+-----+---+---+-----+---+---+--------------+

现在问题清楚地表明您不想为所有列重复该过程,您可以执行以下操作

val columnsTomap = df.select("rbc", "cad", "rbc", "pe", "ane").columns

var tempdf = df
columnsTomap.map(column => 
  tempdf = tempdf.withColumn(column, applyFunction(col(column)))
)

tempdf.show(false)

【讨论】:

【参考方案2】:

您也可以使用foldLeft 函数。将您的 UDF 称为 stringToBinaryUDF:

import org.apache.spark.sql.functions._

val categoricalColumns = Seq("rbc", "cad", "rbc", "pe", "ane")
val refinedDF = categoricalColumns
    .foldLeft(noZeroDF)  (accumulatorDF: DataFrame, columnName: String) =>
         accumulatorDF
            .withColumn(columnName, stringToBinaryUDF(col(columnName)))
     

这将尊重不变性和函数式编程。

【讨论】:

【参考方案3】:

UDF 可以采用许多参数,即许多列,但它应该返回一个结果,即一列。

为此,只需将参数添加到您的stringToBinary 函数即可。

如果您希望它占据两列,它将如下所示:

def stringToBinary(stringValue: String, secondValue: String): Int = 
stringValue match 
    case "yes" => return 1
    case "no" => return 0
    case "present" => return 1
    case "notpresent" => return 0
    case "normal" => return 1
    case "abnormal" => return 0


val stringToBinaryUDF = udf(stringToBinary _)

希望对你有帮助

【讨论】:

如果它接受一个数组def stringToBinary(stringValues: Array[String])stringValues[0]代表什么? 它肯定会代表Array 中的第一个String,它代表UDF 中传递的第一个Column。另一种选择是使用* 来引用String 类型的许多参数,并像往常一样使用逗号分隔的符号传递参数。定义看起来像def stringToBinary(stringValues: String*)

以上是关于将 UDF 应用于 Spark Dataframe 中的多个列的主要内容,如果未能解决你的问题,请参考以下文章

将 UDF 应用于 spark 2.0 中的 SparseVector 列

使用 pandas_udf 将 Spark Structured DataFrame 转换为 Pandas

如何将 Spark Dataframe 列的每个值作为字符串传递给 python UDF?

是否可以将字符串注册为 UDF?

将 Python UDF 应用于 Spark 数据帧时出现 java.lang.IllegalArgumentException

Spark:DataFrame 上 UDF 的任务不可序列化