在 UDF 正文中将列作为参数传递

Posted

技术标签:

【中文标题】在 UDF 正文中将列作为参数传递【英文标题】:Pass a column as an argument inside UDF Body 【发布时间】:2019-11-15 17:14:14 【问题描述】:

我有以下数据框:

+---------+----------+---------------------+------------+-----------------+
|CAR_OWNER|MOTOR_TYPE|ELECTRIFICATION_RATIO|ENERGY_IN_US|ENERGY_OUTSIDE_US|
+---------+----------+---------------------+------------+-----------------+
|     Alex|Electrical|                  1.0|          15|                0|
|      Bob| Thermical|                  0.0|           0|                5|
|   Claire|    Hybrid|                  0.5|           0|               10|
+---------+----------+---------------------+------------+-----------------+

使用以下函数:

def associateEnergy(motorType: String, consumedEnergy: Float, consumptionType: String, elecRatio: Float): Float =
motorType match 
  case "Electrical" => if (consumptionType == "ELEC") consumedEnergy else 0f
  case "Thermical"  => if (consumptionType == "THERM") consumedEnergy else 0f
  case "Hybrid"     => if (consumptionType == "ELEC") consumedEnergy * elecRatio else consumedEnergy * (1 - elecRatio)

我想计算以下字段:

ELEC_ENERGY_IN_US ELEC_ENERGY_OUTSIDE_US THERM_ENERGY_IN_US THERM_ENERGY_IN_US

我可以使用以下 Udf 来做到这一点:

def associateEnergyUdf(consumptionType: String) = udf(
    (motorType: String, consumedEnergy: Float, elecRatio: Float) =>
      associateEnergy(motorType, consumedEnergy, consumptionType, elecRatio)
  )

还有这段代码:

inputDf
    .withColumn("ELEC_ENERGY_IN_US", associateEnergyUdf("ELEC")(col("MOTOR_TYPE"), col("ENERGY_IN_US"), col("ELECTRIFICATION_RATIO")))
    .withColumn("ELEC_ENERGY_OUTSIDE_US", associateEnergyUdf("ELEC")(col("MOTOR_TYPE"), col("ENERGY_OUTSIDE_US"), col("ELECTRIFICATION_RATIO")))
    .withColumn("THERM_ENERGY_IN_US", associateEnergyUdf("THERM")(col("MOTOR_TYPE"), col("ENERGY_IN_US"), col("ELECTRIFICATION_RATIO")))
    .withColumn("THERM_ENERGY_OUTSIDE_US", associateEnergyUdf("THERM")(col("MOTOR_TYPE"), col("ENERGY_OUTSIDE_US"), col("ELECTRIFICATION_RATIO")))

但我不想重复四次 col("MOTOR_TYPE")col("ELECTRIFICATION_RATIO") 参数。所以我创建了以下 udf :

def associateEnergyReducedUdf(consumptionType: String)(consumedEnergyCol: Column) = udf(
    () => associateEnergyUdf(consumptionType)(col("MOTOR_TYPE"), consumedEnergyCol, col("ELECTRIFICATION_RATIO"))
  )

所以我只需要打电话:

inputDf
    .withColumn("ELEC_ENERGY_IN_US", associateEnergyReducedUdf("ELEC")(col("ENERGY_IN_US"))())
    .withColumn("ELEC_ENERGY_OUTSIDE_US", associateEnergyReducedUdf("ELEC")(col("ENERGY_OUTSIDE_US"))())
    .withColumn("THERM_ENERGY_IN_US", associateEnergyReducedUdf("THERM")(col("ENERGY_IN_US"))())
    .withColumn("THERM_ENERGY_OUTSIDE_US", associateEnergyReducedUdf("THERM")(col("ENERGY_OUTSIDE_US"))())

但这会导致以下错误:

java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Column is not supported

如何在不重复不必要的参数的情况下实现这一点?

【问题讨论】:

【参考方案1】:

在您的简化 UDF 中,您将一个已经存在的 UDF 包装到另一个 UDF 中。试试吧:

def associateEnergyReducedUdf(consumptionType: String)(consumedEnergyCol: Column) =
      () => associateEnergyUdf(consumptionType)(col("MOTOR_TYPE"), consumedEnergyCol, col("ELECTRIFICATION_RATIO"))

这行得通

【讨论】:

以上是关于在 UDF 正文中将列作为参数传递的主要内容,如果未能解决你的问题,请参考以下文章

使用 udf 传递列作为参数将自定义列添加到 pyspark 数据帧

如何在 Spark Scala 的 UDF 中将列作为值传递以检查条件

阿帕奇火花。 UDF 列基于另一列而不将其名称作为参数传递。

如何在用户定义的函数中将数据库列作为参数传递?

在存储过程中将输入参数作为值传递

如何使用 Springboot 在 REST 请求 URL 中将 json 对象作为参数传递