Out of memory error when writing out spark dataframes to parquet format

Posted 2016-07-11 12:43:13

Question:

I am trying to query data from a database, run some transformations on it, and save the new data on HDFS in Parquet format.

Since the database query returns a large number of rows, I fetch the data in batches and run the above process on each incoming batch.

UPDATE 2: The batching logic is:

import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

class Batch(rows: List[String],
            sqlContext: SQLContext) {

  // The actual schema has around 60 fields
  val schema = StructType(Array("name", "age", "address").map(field =>
                   StructField(field, StringType, true)
               ))

  val transformedRows = rows.map(row =>

          // transformation logic (returns Array[Array[String]] type)

      ).map(row => Row.fromSeq(row.toSeq))

  // createDataFrame(java.util.List[Row], schema) builds the DataFrame from
  // rows that already live in driver memory
  val dataframe = sqlContext.createDataFrame(transformedRows.asJava, schema)
}


val sparkConf = new SparkConf().setAppName("Spark App")
val sparkContext = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sparkContext)

// Code to query database
// queryResponse is essentially an iterator that fetches the next batch on calling queryResponse.next

var batch_num = 0

while (queryResponse.hasNext) {
    val batch = queryResponse.next

    val batchToSave = new Batch(
                          batch.toList.map(_.getDocument.toString),
                          sqlContext)

    // each batch is written out to its own Parquet output directory
    batchToSave.dataframe.write.parquet(batch_num + "_Parquet")

    batch_num += 1
}

My Spark version is 1.6.1, and the spark-submit command is:

spark-submit target/scala-2.10/Spark\ Application-assembly-1.0.jar

The problem is that after a certain number of batches I get a java.lang.OutOfMemoryError.

The full stack trace is:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.lang.StringBuilder.toString(StringBuilder.java:405)
    at scala.StringContext.standardInterpolator(StringContext.scala:125)
    at scala.StringContext.s(StringContext.scala:90)
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:70)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
    at app.Application$.main(App.scala:156)
    at app.Application.main(App.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I tried coalescing the data into a single partition, but that made no difference.

dataframe.coalesce(1).write.parquet(batch_num + "_Parquet")

Any help would be appreciated.

UPDATE 1

Without the coalesce transformation on the RDD I still get the error, but the stack trace is now the following. It seems to be an issue in Parquet.

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1855)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1945)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
    at app.Application$.main(App.scala:156)
    at app.Application.main(App.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.apache.parquet.column.values.dictionary.IntList.initSlab(IntList.java:90)
    at org.apache.parquet.column.values.dictionary.IntList.<init>(IntList.java:86)
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:93)
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:229)
    at org.apache.parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:131)
    at org.apache.parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:178)
    at org.apache.parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203)
    at org.apache.parquet.column.impl.ColumnWriterV1.<init>(ColumnWriterV1.java:83)
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68)
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56)
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:183)
    at org.apache.parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:375)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:109)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:99)
    at org.apache.parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:100)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:303)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetRelation.scala:94)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:286)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:255)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Comments:

No, no, no, do not coalesce; that is the opposite of what you want to do. Possible duplicate of How to deal with "java.lang.OutOfMemoryError: Java heap space" error (64MB heap size).

If your data (across all partitions, since you are coalescing it) is larger than the available memory, you will get an OOM. In general, coalescing down to a single partition is bad practice, because it throws away most of your scalability benefits.

Tried writing out to parquet format without coalescing first. It still does not work, but the new stack trace points to a problem in Parquet.

I am working with 2 executors. The problem was solved by increasing the memory allocated to the driver and the executors to 20 GB by passing --executor-memory 20g --driver-memory 20g (see the example submit command after the answer below).

Answer 1:

Ran into the same issue today. It turned out that my execution plan was quite complex, and its toString generated 150 MB of information which, combined with Scala's string interpolation, caused the driver to run out of memory.

You can try increasing the driver memory (I had to bump it from 8 GB to 16 GB).
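For reference, a submit command with explicit memory settings, as reported in the question comments, would look roughly like this (the 20g values are what the asker used; pick values that fit your cluster):

    spark-submit \
        --driver-memory 20g \
        --executor-memory 20g \
        target/scala-2.10/Spark\ Application-assembly-1.0.jar

The same settings can also be placed in conf/spark-defaults.conf as spark.driver.memory and spark.executor.memory. Note that setting spark.driver.memory through SparkConf inside the application has no effect in client mode, because the driver JVM has already started by the time that code runs.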
