Spark CSV file writing error when date in name - InvalidClassException: org.apache.commons.lang3.time.FastDateParser

Posted: 2020-11-24 14:49:46

Problem description:

We have the following Spark code that writes a CSV file with a DATE in its name.

Output file name, for example: Example-FIle-2020-11-24

For the date conversion we are using Java's SimpleDateFormat:

DateFormat dfOut = new SimpleDateFormat("yyyy-MM-dd");
outputPath = "ABC" + dfOut.format(date);
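For completeness, a minimal runnable sketch of the path construction (the names runDate, prefix and OutputPathExample are illustrative, not taken from the original job):

    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class OutputPathExample {
        public static void main(String[] args) {
            Date runDate = new Date();                             // date used in the file name
            DateFormat dfOut = new SimpleDateFormat("yyyy-MM-dd"); // same pattern as above
            String prefix = "Example-FIle-";                       // illustrative prefix
            String outputPath = prefix + dfOut.format(runDate);
            System.out.println(outputPath);                        // e.g. Example-FIle-2020-11-24
        }
    }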

Dataset.coalesce(1)
                .write()
                .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
                .option("header", "true")
                .option("nullValue", null)
                .option("emptyValue", null)
                .mode(SaveMode.Overwrite)
                .csv(outputPath);

The Spark application fails randomly on the cluster. Spark tries to run the task several times on the same executor, and after that we get blacklist*-node related errors.

Error:

20/11/24 05:21:55 ERROR datasources.FileFormatWriter: Aborting job b1252410-7869-48ad-8d69-10dd5fa54c12.
org.apache.spark.SparkException: Job aborted due to stage failure:
Aborting TaskSet 1.0 because task 0 (partition 0)
cannot run anywhere due to node and executor blacklist.
Most recent failure:
Lost task 0.1 in stage 1.0 (TID 2, ap-cdpdn8p.oneadr.net, executor 4): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateParser; local class incompatible: stream classdesc serialVersionUID = 3, local class serialVersionUID = 2
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1363)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Blacklisting behavior can be configured via spark.blacklist.*.
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
        at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:656)
        at com.nordea.nrc.idv.IDVIntentsExportingEngine.run(IDVIntentsExportingEngine.java:59)
        at com.nordea.nrc.idv.IDVIntentsExportingMain.main(IDVIntentsExportingMain.java:21)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:667)
20/11/24 05:21:55 INFO yarn.YarnAllocator: Driver requested a total number of 0 executor(s).
20/11/24 05:21:55 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted.
org.apache.spark.SparkException: Job aborted.
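As a side note, the node blacklisting mentioned at the end of the log ("Blacklisting behavior can be configured via spark.blacklist.*") is controlled by the spark.blacklist.* settings in Spark 2.x. A rough sketch of relaxing them (values are illustrative; this only changes how many retries are allowed before a node is blacklisted and does not address the InvalidClassException itself):

    import org.apache.spark.SparkConf;

    // Illustrative only: these settings tune the blacklist/retry behaviour
    // referenced in the log; they do not fix the underlying serialization error.
    SparkConf conf = new SparkConf()
            .set("spark.blacklist.enabled", "true")
            .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "2")
            .set("spark.blacklist.task.maxTaskAttemptsPerNode", "2")
            .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "2");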

Comments:

Answer 1:

While saving the dataframe as CSV you are calling coalesce(1) on it before the DataFrameWriter. The coalesce function reduces the number of partitions of the DataFrame to the number passed to it, in your case 1. This moves all of the data onto a single executor when saving, which crashes the application. Try saving without the coalesce call, as in the code below.

            Dataset
                .write()
                .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
                .option("header", "true")
                .option("nullValue", null)
                .option("emptyValue", null)
                .mode(SaveMode.Overwrite)
                .csv(outputPath);
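Note that without coalesce(1) Spark writes one part-* file per partition under outputPath instead of a single CSV. If fewer output files are still wanted, a middle ground (purely illustrative, not part of the original answer; dataset stands for the DataFrame variable) is to coalesce to a small number of partitions rather than exactly one:

            dataset.coalesce(8)   // illustrative partition count
                .write()
                .option("header", "true")
                .mode(SaveMode.Overwrite)
                .csv(outputPath);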

Comments:
