用于日期操作的 SparkSQL (Spark 1.3) UDF
Posted
技术标签:
【中文标题】用于日期操作的 SparkSQL (Spark 1.3) UDF【英文标题】:SparkSQL (Spark 1.3) UDF for operations on Date 【发布时间】:2016-01-12 16:07:56 【问题描述】:我有一个数据框,其中包含两个字符串列,其中包含有关日期的信息(即“2014-01-01”)。我想对此类列进行操作,例如转换为日期格式和减去日期。我尝试使用我在互联网上找到的内容定义 UDF,例如:
import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.functions._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
val d = DateTimeFormat.forPattern("yyyy-mm-dd'T'kk:mm:ssZ")
val dtFunc: (String => Date) = (arg1: String) => DateTime.parse(arg1, d).toDate
val dtFunc2 = udf(dtFunc)
val x = df.withColumn("dt", dtFunc2(col("dt_string")))
但是当我使用它时,我得到了以下错误:
scala.MatchError: java.util.Date (of class scala.reflect.internal.Types$TypeRef$$anon$6)
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:112)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:107)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
at org.apache.spark.sql.functions$.udf(functions.scala:402)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
at $iwC$$iwC$$iwC.<init>(<console>:53)
at $iwC$$iwC.<init>(<console>:55)
at $iwC.<init>(<console>:57)
at <init>(<console>:59)
at .<init>(<console>:63)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
你能帮我解决这个问题吗?谢谢!
【问题讨论】:
【参考方案1】:SparkSQL 分别使用java.sql.Timestamp
和java.sql.Date
表示时间戳和日期。 java.util.Date
在这里不起作用。您可以简单地提取毫秒并传递给java.sql.Date
构造函数。
在实践中,我会考虑使用 HiveContext
和 Hive UDF。例如,您可以使用带有指定模式的unix_timestamp
(使用Simple Date Format)将字符串转换为秒
df.selectExpr("*",
"""unix_timestamp(dt_string, "yyyy-MM-dd'T'kk:mm:ss")""")timestamp)""")
并使用标准类型转换来获取日期或时间戳。
【讨论】:
以上是关于用于日期操作的 SparkSQL (Spark 1.3) UDF的主要内容,如果未能解决你的问题,请参考以下文章