UDF:在 Dataframe scala 上应用函数

Posted

技术标签:

【中文标题】UDF:在 Dataframe scala 上应用函数【英文标题】:UDF: apply a function on Dataframe scala 【发布时间】:2018-04-04 13:28:25 【问题描述】:

我有一个 scala 函数,计算两个日期之间的差异,将两个 LocalDateTime 作为参数:

我有一个包含两个字段 start_date 和 finish_date 的数据框。

我想构建一个 UDF “可能是”来在我的数据帧上应用函数 toEquals,尤其是在 é 字段 start_datefinish_date 上计算它们之间的差异。但是start_date和finish_date的类型是String。

【问题讨论】:

@RameshMaharjan 我编辑了我的问题。 您还可以分享来自input_table 的一些示例行吗? adjusttoEnd 函数是什么 我改了。谢谢 在下面试试我的答案,让我知道 :) 【参考方案1】:

我还没有测试代码,但是udf 函数中使用你的toEquals 逻辑 应该足够了

import org.apache.spark.sql.functions.udf
def toEquals = udf((rd1: String, rd2: String) => 
  val d1 = adjust(LocalDateTime.parse(rd1, DATE_TIME_FORMATTER))
  val d2 = adjust(LocalDateTime.parse(rd2, DATE_TIME_FORMATTER), asc = false)     
  if (d1.isAfter(d2)) 0.hours.toString
  else if (d1.toLocalDate.isEqual(d2.toLocalDate)) 
    (toEnd(d1.toLocalTime) - toEnd(d2.toLocalTime)).toString
  
  else 
    (toEnd(d1.toLocalTime) + jourOuvree(d1.toLocalDate.plusDays(1), d2.toLocalDate.minusDays(1)) * 8.hours + toStart(d2.toLocalTime)).toString
  
)

你可以调用udf函数

input_table.withColumn("toEquals", toEquals($"start_date",$"finish_date"))

【讨论】:

感谢您的回答。 scala> input_os_historystep.withColumn("toEquals", toEquals($"start_date",$"finish_date")) java.lang.UnsupportedOperationException: org.apache.spark.sql 不支持 scala.concurrent.duration.FiniteDuration 类型的模式.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:756) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:694) at org.apache.spark.sql.functions$.udf(函数.scala:3200) 错误消息很清楚,您必须将 FiniteDuration 对象转换为支持的类型。 supported types您必须通过将 FiniteDuration 转换为任何类型来返回 你的意思是我应该把它转换成 LocalDateTime ?因为我有一个带有参数 FiniteDuration 的类格式化程序,所以我编辑了我的问题。非常感谢 只需将对象更改为 String 。我已经使用 .toString 更新了我的答案。请尝试一下 很抱歉打扰您,您的解决方案很棒,但是当我执行 input_table.show() 时,它不会将新列添加到 Equals,另一方面它会增加按 +1 列,但它不显示它,也不在 printSchema

以上是关于UDF:在 Dataframe scala 上应用函数的主要内容,如果未能解决你的问题,请参考以下文章

是否可以将字符串注册为 UDF?

Scala Spark 中的 udf 运行时错误

在不使用 UDF 的情况下从数据帧访问 scala 映射

在不使用 UDF 的情况下基于映射转换 Spark DataFrame 中的列

Spark:DataFrame 上 UDF 的任务不可序列化

pyspark中未定义的函数UDF?