在火花提交作业中读取镶木地板文件时出现内存不足错误

Posted

技术标签:

【中文标题】在火花提交作业中读取镶木地板文件时出现内存不足错误【英文标题】:Getting out of memory error while reading parquet file in spark submit job 【发布时间】:2017-11-16 11:09:47 【问题描述】:
[Stage 0:>                                                          (0 + 0) / 8]SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[Stage 1:=====================================================>   (43 + 3) / 46]17/11/16 13:11:18 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 54)
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/11/16 13:11:18 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-4,5,main]
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/11/16 13:11:18 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 54, localhost): java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

这是我的代码-

val sqlContext = new SQLContext(sc)
    //sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
    log.setLevel(Level.INFO)
    val config = HBaseConfiguration.create()
    val newDataDF = sqlContext.read.parquet(file)
    newDataDF.registerTempTable("newDataDF")
    //sqlContext.cacheTable("newDataDF")
    val result = sqlContext.sql("SELECT rec FROM newDataDF")
    val rows = result.map(t => t(0)).collect()
    //val rows = result.map(t => t.getAs[String]("rec"))

它在下面一行抛出内存不足 //val rows = result.map(t => t(0)).collect()

已尝试所有内存调整和增加执行程序/驱动程序内存的选项,但似乎没有任何效果。 任何建议将不胜感激。

【问题讨论】:

【参考方案1】:

好吧,通过在您的 DataFrame 上调用 collect,您告诉 Spark 将所有数据收集到驱动程序上。对于较大的数据集,这确实会淹没驱动程序并导致 OOM。

Spark 是一个分布式计算框架,旨在用于不适合单台机器的大型数据集。只有在极少数情况下,您才想在 DataFrame 上调用 collect,即当您进行调试(在小型数据集上)或者您知道您的数据集的大小已大大减小时由于一些过滤或聚合转换。

【讨论】:

感谢您的回复。我同意在 DataFrame 上调用 collect 会将所有数据带到驱动程序上。由于我正在读取一个很大的镶木地板文件,有没有其他方法可以读取大量数据而不是收集驱动程序上的所有数据?我可以调用任何其他方法或函数来收集大量数据而不会使驱动程序过载吗?或者任何其他方式我可以做到这一点?如果您有任何解决方案,请提前致谢。 那么,您想对驱动程序上的数据做什么?我假设您需要进行一些查询? 是的,我想从 parquet 文件中查询和获取所有数据,然后将其存储在 hbase 表中。如果您对此有任何想法,请告诉我。再次感谢。 在这种情况下,您需要调用write 而不是collect write 用于写入数据。我需要阅读所有数据并对其进行处理。这就是我使用收集的原因。也可以使用过滤器,但我的用例是读取/收集所有数据。【参考方案2】:

你必须增加spark.driver.memory,默认值为1gb。您可以使用 --verbose 命令检查驱动程序和执行程序的内存。有关更多信息,请查看此链接并根据您的要求设置内存。 https://spark.apache.org/docs/latest/configuration.html

【讨论】:

感谢您的回复。有没有一种方法可以在从命令行运行作业时获取 spark conf 文件,并且无需在命令行中指定驱动程序内存?提前感谢您的回复。 是的,您可以在运行时设置内存。上面链接中显示的示例。

以上是关于在火花提交作业中读取镶木地板文件时出现内存不足错误的主要内容,如果未能解决你的问题,请参考以下文章

在 Python Pandas 中使用 read_parquet 从 AWS S3 读取镶木地板文件时出现分段错误

将小 PySpark DataFrame 写入镶木地板时出现内存错误

如何在读取前根据定义的模式读取 pyspark 中的镶木地板文件?

如何使用镶木地板在火花中读取和写入同一个文件?

Spark 读取镶木地板文件时出现问题

从具有时间戳的镶木地板蜂巢表中读取火花