UDF:在 Dataframe scala 上应用函数
Posted
技术标签:
【中文标题】UDF:在 Dataframe scala 上应用函数【英文标题】:UDF: apply a function on Dataframe scala 【发布时间】:2018-04-04 13:28:25 【问题描述】:我有一个 scala 函数,计算两个日期之间的差异,将两个 LocalDateTime
作为参数:
我有一个包含两个字段 start_date 和 finish_date 的数据框。
我想构建一个 UDF “可能是”来在我的数据帧上应用函数 toEquals,尤其是在 é 字段 start_date
和 finish_date
上计算它们之间的差异。但是start_date和finish_date的类型是String。
【问题讨论】:
@RameshMaharjan 我编辑了我的问题。 您还可以分享来自input_table
的一些示例行吗? adjust
和 toEnd
函数是什么
我改了。谢谢
在下面试试我的答案,让我知道 :)
【参考方案1】:
我还没有测试代码,但是在udf
函数中使用你的toEquals
逻辑 应该足够了
import org.apache.spark.sql.functions.udf
def toEquals = udf((rd1: String, rd2: String) =>
val d1 = adjust(LocalDateTime.parse(rd1, DATE_TIME_FORMATTER))
val d2 = adjust(LocalDateTime.parse(rd2, DATE_TIME_FORMATTER), asc = false)
if (d1.isAfter(d2)) 0.hours.toString
else if (d1.toLocalDate.isEqual(d2.toLocalDate))
(toEnd(d1.toLocalTime) - toEnd(d2.toLocalTime)).toString
else
(toEnd(d1.toLocalTime) + jourOuvree(d1.toLocalDate.plusDays(1), d2.toLocalDate.minusDays(1)) * 8.hours + toStart(d2.toLocalTime)).toString
)
你可以调用udf
函数
input_table.withColumn("toEquals", toEquals($"start_date",$"finish_date"))
【讨论】:
感谢您的回答。 scala> input_os_historystep.withColumn("toEquals", toEquals($"start_date",$"finish_date")) java.lang.UnsupportedOperationException: org.apache.spark.sql 不支持 scala.concurrent.duration.FiniteDuration 类型的模式.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:756) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:694) at org.apache.spark.sql.functions$.udf(函数.scala:3200) 错误消息很清楚,您必须将 FiniteDuration 对象转换为支持的类型。 supported types您必须通过将 FiniteDuration 转换为任何类型来返回 你的意思是我应该把它转换成 LocalDateTime ?因为我有一个带有参数 FiniteDuration 的类格式化程序,所以我编辑了我的问题。非常感谢 只需将对象更改为 String 。我已经使用 .toString 更新了我的答案。请尝试一下 很抱歉打扰您,您的解决方案很棒,但是当我执行 input_table.show() 时,它不会将新列添加到 Equals,另一方面它会增加按 +1 列,但它不显示它,也不在 printSchema以上是关于UDF:在 Dataframe scala 上应用函数的主要内容,如果未能解决你的问题,请参考以下文章
在不使用 UDF 的情况下基于映射转换 Spark DataFrame 中的列