NullPointerException when writing parquet from AVRO in Spark 2.0

Posted: 2016-11-07 21:58:15

I'm loading AVRO files from AWS S3 and writing them back out as parquet.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AvroParquet").enableHiveSupport().getOrCreate()

in_path = "s3://my-bucket/avro-path/*.avro"
out_path = "s3://my-bucket/parquet-path/output.parquet"

# Read the AVRO files via the databricks spark-avro package
df = spark.read.format("com.databricks.spark.avro").load(in_path)

# Write the same data back out as parquet
df.write.save(out_path, format="parquet")

For some reason I'm getting this NullPointerException when writing the parquet. Judging from the java.io.ObjectInputStream.readObject0 frames, it may actually be failing while reading the avro.

App > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: org.apache.spark.SparkException: Task failed while writing rows
App >   at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:270)
App >   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
App >   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
App >   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
App >   at org.apache.spark.scheduler.Task.run(Task.scala:85)
App >   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
App >   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
App >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
App >   at java.lang.Thread.run(Thread.java:745)
App > Caused by: java.io.IOException: java.lang.NullPointerException
App >   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1284)
App >   at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
App >   at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
App >   at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
App >   at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
App >   at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
App >   at com.databricks.spark.avro.DefaultSource$$anonfun$buildReader$1.apply(DefaultSource.scala:145)
App >   at com.databricks.spark.avro.DefaultSource$$anonfun$buildReader$1.apply(DefaultSource.scala:143)
App >   at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(fileSourceInterfaces.scala:279)
App >   at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(fileSourceInterfaces.scala:263)
App >   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:134)
App >   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
App >   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
App >   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
App >   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
App >   at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:262)
App >   at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:261)
App >   at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:261)
App >   at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1359)
App >   at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:267)
App >   ... 8 more
App > Caused by: java.lang.NullPointerException
App >   at com.databricks.spark.avro.DefaultSource$SerializableConfiguration.tryOrIOException(DefaultSource.scala:217)
App >   at com.databricks.spark.avro.DefaultSource$SerializableConfiguration.readObject(DefaultSource.scala:207)
App >   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
App >   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
App >   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
App >   at java.lang.reflect.Method.invoke(Method.java:606)
App >   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
App >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
App >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
App >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
App >   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
App >   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
App >   at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$10.apply(TorrentBroadcast.scala:254)
App >   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1321)
App >   at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:255)
App >   at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:189)
App >   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1277)
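
A quick way to narrow down whether the failure is on the read or the write side (a hypothetical sketch, not my actual job; it reuses spark, in_path, and out_path from above) is to force the avro scan on its own before any parquet write:

# Hypothetical isolation test: force the avro scan with an action before
# attempting the parquet write. If count() alone throws the same NPE, the
# problem is in the avro read (broadcast deserialization), not the write.
df = spark.read.format("com.databricks.spark.avro").load(in_path)
print(df.count())  # triggers the read path, including the broadcast

# Only if the count succeeds, try the write in isolation:
df.write.save(out_path, format="parquet")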

I just upgraded from 1.6 to 2.0 and am not sure where the problem is. I'm using the package com.databricks:spark-avro_2.11:3.0.1.
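
For reference, this is roughly how I pull the package in when building the session from code instead of passing --packages to spark-submit (a sketch; adjust to however you launch your job, and note spark.jars.packages only takes effect if no context is already running):

from pyspark.sql import SparkSession

# Sketch: declare the spark-avro package on the session itself. The
# spark.jars.packages config takes the same Maven coordinates as --packages.
spark = (SparkSession.builder
         .appName("AvroParquet")
         .config("spark.jars.packages", "com.databricks:spark-avro_2.11:3.0.1")
         .enableHiveSupport()
         .getOrCreate())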

This looks related to https://github.com/databricks/spark-avro/issues/147, but that was supposed to be fixed in 3.0.1.


Answer 1:

The issue seems to be related to my Spark configuration settings, since I was able to write parquet both locally and via an EMR Spark setup. I've opened an issue on GitHub: https://github.com/databricks/spark-avro/issues/188.
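
A minimal local smoke test along these lines ran cleanly for me, which is what points at the cluster configuration rather than the code. The master and paths here are hypothetical; the read/write logic is the same as the failing job:

from pyspark.sql import SparkSession

# Sketch of the local run that succeeded: same read/write logic, but with
# a local master and local file paths instead of the cluster deployment
# that throws the NPE.
spark = (SparkSession.builder
         .master("local[*]")
         .appName("AvroParquetLocal")
         .config("spark.jars.packages", "com.databricks:spark-avro_2.11:3.0.1")
         .getOrCreate())

df = spark.read.format("com.databricks.spark.avro").load("/tmp/avro-sample/*.avro")
df.write.save("/tmp/parquet-out", format="parquet")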

