用于日期操作的 SparkSQL (Spark 1.3) UDF

Posted

技术标签:

【中文标题】用于日期操作的 SparkSQL (Spark 1.3) UDF【英文标题】:SparkSQL (Spark 1.3) UDF for operations on Date 【发布时间】:2016-01-12 16:07:56 【问题描述】:

我有一个数据框,其中包含两个字符串列,其中包含有关日期的信息(即“2014-01-01”)。我想对此类列进行操作,例如转换为日期格式和减去日期。我尝试使用我在互联网上找到的内容定义 UDF,例如:

import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.functions._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

val d = DateTimeFormat.forPattern("yyyy-mm-dd'T'kk:mm:ssZ")
val dtFunc: (String => Date) = (arg1: String) => DateTime.parse(arg1, d).toDate
val dtFunc2 = udf(dtFunc)
val x = df.withColumn("dt", dtFunc2(col("dt_string")))

但是当我使用它时,我得到了以下错误:

scala.MatchError: java.util.Date (of class scala.reflect.internal.Types$TypeRef$$anon$6)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:112)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:107)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
    at org.apache.spark.sql.functions$.udf(functions.scala:402)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
    at $iwC$$iwC$$iwC.<init>(<console>:53)
    at $iwC$$iwC.<init>(<console>:55)
    at $iwC.<init>(<console>:57)
    at <init>(<console>:59)
    at .<init>(<console>:63)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

你能帮我解决这个问题吗?谢谢!

【问题讨论】:

【参考方案1】:

SparkSQL 分别使用java.sql.Timestampjava.sql.Date 表示时间戳和日期。 java.util.Date 在这里不起作用。您可以简单地提取毫秒并传递给java.sql.Date 构造函数。

在实践中,我会考虑使用 HiveContext 和 Hive UDF。例如,您可以使用带有指定模式的unix_timestamp(使用Simple Date Format)将字符串转换为

df.selectExpr("*", 
  """unix_timestamp(dt_string, "yyyy-MM-dd'T'kk:mm:ss")""")timestamp)""")

并使用标准类型转换来获取日期或时间戳。

【讨论】:

以上是关于用于日期操作的 SparkSQL (Spark 1.3) UDF的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark:SparkSQL的使用

大数据之Spark:SparkSQL开窗函数实战

大数据Spark SQL 快速入门(第二集)

减去日期并连接字符串 - spark SQL

Spark SQL 快速入门(第二集)

Spark 和 SparkSQL:如何模仿窗口功能?