Spark SQL: parse timestamp without seconds
【Published】: 2016-10-24 01:09:46
【Question】: Some data that I don't own comes with a field that is supposed to be a timestamp, but it sometimes does not seem to conform to ISO 8601.
In my code I define a schema, and when Spark SQL parses my JSON data I get the following error:
java.lang.IllegalArgumentException: 2016-10-07T11:15Z
The source data contains the following:
"transaction_date_time": "2016-10-07T11:15Z"
My schema is defined like this:
val schema = (new StructType)
.add("transaction_date_time", TimestampType)
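I believe this is because the seconds are missing. How can I parse the timestamp correctly?
Edit: for example, reading the data with
spark.read.schema(schema).json(rdd).show()
triggers the following error: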
16/10/24 13:06:27 ERROR Executor: Exception in task 6.0 in stage 5.0 (TID 23)
java.lang.IllegalArgumentException: 2016-10-07T11:15Z
at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.skip(Unknown Source)
at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.parse(Unknown Source)
at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl.<init>(Unknown Source)
at org.apache.xerces.jaxp.datatype.DatatypeFactoryImpl.newXMLGregorianCalendar(Unknown Source)
at javax.xml.bind.DatatypeConverterImpl._parseDateTime(DatatypeConverterImpl.java:422)
at javax.xml.bind.DatatypeConverterImpl.parseDateTime(DatatypeConverterImpl.java:417)
at javax.xml.bind.DatatypeConverter.parseDateTime(DatatypeConverter.java:327)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:140)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:114)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertObject(JacksonParser.scala:215)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:182)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertRootField(JacksonParser.scala:73)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:288)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:285)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2366)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:285)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:280)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/10/24 13:06:27 WARN TaskSetManager: Lost task 6.0 in stage 5.0 (TID 23, localhost): java.lang.IllegalArgumentException: 2016-10-07T11:15Z
at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.skip(Unknown Source)
at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.parse(Unknown Source)
at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl.<init>(Unknown Source)
at org.apache.xerces.jaxp.datatype.DatatypeFactoryImpl.newXMLGregorianCalendar(Unknown Source)
at javax.xml.bind.DatatypeConverterImpl._parseDateTime(DatatypeConverterImpl.java:422)
at javax.xml.bind.DatatypeConverterImpl.parseDateTime(DatatypeConverterImpl.java:417)
at javax.xml.bind.DatatypeConverter.parseDateTime(DatatypeConverter.java:327)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:140)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:114)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertObject(JacksonParser.scala:215)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:182)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertRootField(JacksonParser.scala:73)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:288)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:285)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2366)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:285)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:280)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/10/24 13:06:27 ERROR TaskSetManager: Task 6 in stage 5.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 5.0 failed 1 times, most recent failure: Lost task 6.0 in stage 5.0 (TID 23, localhost): java.lang.IllegalArgumentException: 2016-10-07T11:15Z
at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.skip(Unknown Source)
at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.parse(Unknown Source)
at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl.<init>(Unknown Source)
at org.apache.xerces.jaxp.datatype.DatatypeFactoryImpl.newXMLGregorianCalendar(Unknown Source)
at javax.xml.bind.DatatypeConverterImpl._parseDateTime(DatatypeConverterImpl.java:422)
at javax.xml.bind.DatatypeConverterImpl.parseDateTime(DatatypeConverterImpl.java:417)
at javax.xml.bind.DatatypeConverter.parseDateTime(DatatypeConverter.java:327)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:140)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:114)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertObject(JacksonParser.scala:215)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:182)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertRootField(JacksonParser.scala:73)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:288)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:285)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2366)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:285)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:280)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562)
at org.apache.spark.sql.Dataset.head(Dataset.scala:1924)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2139)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
... 54 elided
Caused by: java.lang.IllegalArgumentException: 2016-10-07T11:15Z
at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.skip(Unknown Source)
at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.parse(Unknown Source)
at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl.<init>(Unknown Source)
at org.apache.xerces.jaxp.datatype.DatatypeFactoryImpl.newXMLGregorianCalendar(Unknown Source)
at javax.xml.bind.DatatypeConverterImpl._parseDateTime(DatatypeConverterImpl.java:422)
at javax.xml.bind.DatatypeConverterImpl.parseDateTime(DatatypeConverterImpl.java:417)
at javax.xml.bind.DatatypeConverter.parseDateTime(DatatypeConverter.java:327)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:140)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:114)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertObject(JacksonParser.scala:215)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:182)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertRootField(JacksonParser.scala:73)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:288)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:285)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2366)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:285)
at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:280)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
【Answer 1】: You can change
val schema = (new StructType)
.add("transaction_date_time", TimestampType)
to
val schema = (new StructType)
.add("transaction_date_time", StringType)
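and then parse the string column explicitly:
df.withColumn("columnTimeWithOutSec", unix_timestamp($"transaction_date_time", format))
where format is a pattern without seconds, for example "yyyy-MM-dd'T'HH:mmXXX" for the value in the question. unix_timestamp returns seconds since the epoch as a long, so cast the result to TimestampType if you need a timestamp column.
Also, have a look at DateTimeUtils.scala for the Spark-style inline conversions of dates and timestamps.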
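【Answer 2】: It looks like Spark's timestamp type is just a wrapper around java.sql.Timestamp. At least that is what the linked source led me to believe.
So we can parse the date with SimpleDateFormat, extract the milliseconds, and pass them to the Timestamp constructor.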
You could preprocess the data with something like this example:
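import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class Test {
    public static void main(String[] args) throws ParseException {
        String timestamp = "2016-10-07T11:15Z";
        // XXX accepts an ISO 8601 zone designator such as "Z" or "-07:00"
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mmXXX");
        // Let a parse failure propagate; swallowing it would leave parsedDate null
        // and cause a NullPointerException on the next line
        Date parsedDate = df.parse(timestamp);
        Timestamp ts = new Timestamp(parsedDate.getTime());
        System.out.println(parsedDate);
        System.out.println(ts);
    }
}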
which outputs the following (both values are printed in the JVM's default time zone, here US Pacific):
Fri Oct 07 04:15:00 PDT 2016
2016-10-07 04:15:00.0
I searched a bit for "optional parts of a date format" and found this SO question, which says you should simply create two SimpleDateFormats.
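The "two SimpleDateFormats" suggestion could look roughly like the sketch below. It is only an illustration under assumptions: the class name LenientTimestampParser, the method parse, and the exact pair of patterns are hypothetical and not taken from the answer or from Spark. It tries a pattern with seconds first and falls back to one without.

```java
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;

// Hypothetical helper (not part of Spark): tries several patterns in order.
public class LenientTimestampParser {
    // Assumed patterns: with seconds first, then without seconds.
    private static final String[] PATTERNS = {
        "yyyy-MM-dd'T'HH:mm:ssXXX",
        "yyyy-MM-dd'T'HH:mmXXX"
    };

    public static Timestamp parse(String text) {
        for (String pattern : PATTERNS) {
            // SimpleDateFormat is not thread-safe, so a new instance is created per attempt.
            SimpleDateFormat format = new SimpleDateFormat(pattern);
            format.setLenient(false);
            try {
                return new Timestamp(format.parse(text).getTime());
            } catch (ParseException e) {
                // This pattern did not match; fall through to the next one.
            }
        }
        throw new IllegalArgumentException("Unparseable timestamp: " + text);
    }

    public static void main(String[] args) {
        // Epoch milliseconds do not depend on the JVM's default time zone.
        System.out.println(parse("2016-10-07T11:15Z").getTime());    // 1475838900000
        System.out.println(parse("2016-10-07T11:15:30Z").getTime()); // 1475838930000
    }
}
```

The order of the patterns matters here: the pattern with seconds rejects "11:15Z" because it expects a ":" after the minutes, so the value falls through to the second pattern.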
【Comments】:
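I don't think parsing that date is hard; even the Java 8 Time API handles it out of the box. I think the problem lies with Spark, and the challenge is getting your code to work together with the code above (I just edited the question to explain this better).
I looked at the documentation and found the encoder trait. I think you need to define a custom encoder and use it instead of Timestamp. Here's another link that looks more useful. I'm still looking around, but I thought I'd post what I have found so far.
It looks like if you just extend DataType, you can pass it to the same add function you are already using on StructType.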
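As a rough illustration of the comment about the Java 8 Time API: OffsetDateTime.parse uses the ISO_OFFSET_DATE_TIME format, in which the seconds field is optional, so the value from the question parses as-is. The class name IsoTimestampParser and its parse method are hypothetical names chosen for this sketch. It shows only the parsing step, not how to plug it into Spark's JSON reader.

```java
import java.sql.Timestamp;
import java.time.OffsetDateTime;

// Hypothetical helper illustrating the comment; not part of Spark.
public class IsoTimestampParser {
    public static Timestamp parse(String text) {
        // ISO_OFFSET_DATE_TIME accepts both "11:15Z" and "11:15:30Z".
        // A malformed value raises the unchecked DateTimeParseException.
        OffsetDateTime parsed = OffsetDateTime.parse(text);
        return Timestamp.from(parsed.toInstant());
    }

    public static void main(String[] args) {
        Timestamp ts = parse("2016-10-07T11:15Z");
        System.out.println(ts.getTime()); // 1475838900000
    }
}
```

Instant.parse is stricter: it relies on ISO_INSTANT, which requires the time down to at least the seconds field when parsing. That is why this sketch goes through OffsetDateTime instead.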