在脚本中迭代/循环 Spark parquet 文件会导致内存错误/堆积(使用 Spark SQL 查询)

Posted

技术标签:

【中文标题】在脚本中迭代/循环 Spark parquet 文件会导致内存错误/堆积(使用 Spark SQL 查询)【英文标题】:Iterating/looping over Spark parquet files in a script results in memory error/build-up (using Spark SQL queries) 【发布时间】:2016-05-20 01:03:47 【问题描述】:

我一直在试图弄清楚当我循环遍历 parquet 文件和几个后处理函数时,如何防止 Spark 由于内存问题而崩溃。对大量文本感到抱歉,但这不完全是一个特定的错误(我正在使用 PySpark。)如果这破坏了正确的 Stack Overflow 表单,我们深表歉意!

基本伪代码为:

#fileNums are the file name partitions in the parquet file
#I read each one in as a separate file from its  "=" subdirectory
for counter in fileNums:
  sparkDataFrame = sqlContext.read.parquet(counter)
  summaryReportOne = sqlContext.sql.("SELECT.....")
  summaryReportOne.write.partition("id").parquet("/")
  summaryReportTwo = sqlContext.sql.("SELECT....")
  summaryReportTwo.write.partition("id").parquet("/")
  #several more queries, several involving joins, etc....

此代码使用 spark SQL 查询,因此我未能成功创建包含所有 SQL 查询/函数的包装函数并将其传递给 foreach(它不能将 sparkContext 或 sqlQuery 作为输入)而不是for 循环的标准。

从技术上讲,这是一个包含分区的大型 parquet 文件,但一次读取并查询它的规模太大了;我需要在每个分区上运行这些功能。所以我只是在 PySpark 中运行一个常规的 python 循环,在每个循环中,我处理一个 parquet 分区(子目录)并编写相关的输出报告。

由于整个 parquet 文件的大小,不确定将所有代码包裹在大 mapPartition() 周围是否可行?

但是在几个循环之后,脚本由于内存错误而崩溃 - 特别是 Java 堆错误。 (我已经确认循环崩溃的文件没有什么特别之处;在第二个或第三个循环中读取的任何随机文件都会发生这种情况。)

Caused by: com.google.protobuf.ServiceException:     
java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:244)
at com.sun.proxy.$Proxy9.delete(Unknown Source)
at    org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:526)
... 42 more
Caused by: java.lang.OutOfMemoryError: Java heap space

我意识到 Spark 并不意味着要在循环中运行,但是这些 SQL 查询对于标准 Spark SQL 打包函数来说有点过于复杂,我们为每个文件编写了多个关于不同聚合统计信息的汇总报告。

有没有办法在每个循环索引结束时基本上清除内存?使用 sqlContext.dropTempTable() 删除任何已注册的临时表并使用 sqlContext.clearCache() 清除缓存都没有帮助。如果我尝试停止 sparkContext 并在每个循环中重新启动它,我也会收到错误,因为某些进程还没有“结束”(看起来你曾经能够“优雅地”停止上下文,但我在当前的 PySpark 文档中找不到这个。)

我还应该注意,在完成处理后,我不会在循环中的数据帧上调用 unpersist(),但我也不会在它们上调用 persist();我只是重写了每个循环中的数据帧(这可能是问题的一部分)。

我正在与我们的工程团队一起调整内存设置,但我们知道我们已经分配了足够多的内存来完成此脚本的一个循环(并且一个循环确实运行没有任何错误)。

任何建议都会有所帮助 - 包括可能比 Spark 更适合此用例的工具。我使用的是 Spark 1.6.1 版。

【问题讨论】:

【参考方案1】:

更新:如果我在每个循环中完成后对从 sql 查询创建的每个表调用 unpersist(),则循环可以成功地继续到下一次迭代,而不会出现内存问题。如上所述,.clearCache() 和单独删除临时表并没有解决问题。我猜这很有效,因为虽然这些表来自 sparkSQL 查询,但它返回一个 RDD。

即使我没有对这些 RDD 调用persist(),我也必须告诉 Spark 在下一个循环开始之前清除它们,以便我可以将新的 SQL 查询分配给这些相同的变量名。

【讨论】:

另外,对于 Spark 新手来说,仅供参考:除非函数是 UDF(这通常意味着它有点简单或返回说只有一列,而不是整个表)并且您可以对 spark 数据采取行动框架使用传统的 UDF API 函数/语法,然后避免使用函数,只需将代码编写为一个长脚本。它看起来很难看,但这似乎将我的代码速度提高了近 50%。并避免仅在 Python 中循环 - 在循环中从 Bash 调用您的 spark 脚本,以便每次处理新文件/输入数据帧时都从新的 Spark 上下文开始。这将避免内存问题。【参考方案2】:

如果可以,请尝试升级到新发布的 spark 2.0。

我在 Java 堆空间方面遇到了与您非常相似的问题。通过简单地重复创建数据帧并使用 spark 1.6.2 反复调用的过程,我能够超过 4G 的堆空间。

在使用 SparkSession 的 spark 2.0 中,相同的程序仅获得 1.2 GB 的堆空间,并且内存使用情况与我正在运行的程序的预期非常一致。

【讨论】:

以上是关于在脚本中迭代/循环 Spark parquet 文件会导致内存错误/堆积(使用 Spark SQL 查询)的主要内容,如果未能解决你的问题,请参考以下文章

spark剖析:spark读取parquet文件会有多少个task

spark剖析:spark读取parquet文件会有多少个task

spark剖析:spark读取parquet文件会有多少个task

spark剖析:spark读取parquet文件会有多少个task

使用 spark 写入 parquet 文件时如何添加额外的元数据

通过Map Spark Scala循环