在脚本中迭代/循环 Spark parquet 文件会导致内存错误/堆积(使用 Spark SQL 查询)
Posted
技术标签:
【中文标题】在脚本中迭代/循环 Spark parquet 文件会导致内存错误/堆积(使用 Spark SQL 查询)【英文标题】:Iterating/looping over Spark parquet files in a script results in memory error/build-up (using Spark SQL queries) 【发布时间】:2016-05-20 01:03:47 【问题描述】:我一直在试图弄清楚当我循环遍历 parquet 文件和几个后处理函数时,如何防止 Spark 由于内存问题而崩溃。对大量文本感到抱歉,但这不完全是一个特定的错误(我正在使用 PySpark。)如果这破坏了正确的 Stack Overflow 表单,我们深表歉意!
基本伪代码为:
#fileNums are the file name partitions in the parquet file
#I read each one in as a separate file from its "=" subdirectory
for counter in fileNums:
sparkDataFrame = sqlContext.read.parquet(counter)
summaryReportOne = sqlContext.sql.("SELECT.....")
summaryReportOne.write.partition("id").parquet("/")
summaryReportTwo = sqlContext.sql.("SELECT....")
summaryReportTwo.write.partition("id").parquet("/")
#several more queries, several involving joins, etc....
此代码使用 spark SQL 查询,因此我未能成功创建包含所有 SQL 查询/函数的包装函数并将其传递给 foreach(它不能将 sparkContext 或 sqlQuery 作为输入)而不是for 循环的标准。
从技术上讲,这是一个包含分区的大型 parquet 文件,但一次读取并查询它的规模太大了;我需要在每个分区上运行这些功能。所以我只是在 PySpark 中运行一个常规的 python 循环,在每个循环中,我处理一个 parquet 分区(子目录)并编写相关的输出报告。
由于整个 parquet 文件的大小,不确定将所有代码包裹在大 mapPartition() 周围是否可行?
但是在几个循环之后,脚本由于内存错误而崩溃 - 特别是 Java 堆错误。 (我已经确认循环崩溃的文件没有什么特别之处;在第二个或第三个循环中读取的任何随机文件都会发生这种情况。)
Caused by: com.google.protobuf.ServiceException:
java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:244)
at com.sun.proxy.$Proxy9.delete(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:526)
... 42 more
Caused by: java.lang.OutOfMemoryError: Java heap space
我意识到 Spark 并不意味着要在循环中运行,但是这些 SQL 查询对于标准 Spark SQL 打包函数来说有点过于复杂,我们为每个文件编写了多个关于不同聚合统计信息的汇总报告。
有没有办法在每个循环索引结束时基本上清除内存?使用 sqlContext.dropTempTable() 删除任何已注册的临时表并使用 sqlContext.clearCache() 清除缓存都没有帮助。如果我尝试停止 sparkContext 并在每个循环中重新启动它,我也会收到错误,因为某些进程还没有“结束”(看起来你曾经能够“优雅地”停止上下文,但我在当前的 PySpark 文档中找不到这个。)
我还应该注意,在完成处理后,我不会在循环中的数据帧上调用 unpersist(),但我也不会在它们上调用 persist();我只是重写了每个循环中的数据帧(这可能是问题的一部分)。
我正在与我们的工程团队一起调整内存设置,但我们知道我们已经分配了足够多的内存来完成此脚本的一个循环(并且一个循环确实运行没有任何错误)。
任何建议都会有所帮助 - 包括可能比 Spark 更适合此用例的工具。我使用的是 Spark 1.6.1 版。
【问题讨论】:
【参考方案1】:更新:如果我在每个循环中完成后对从 sql 查询创建的每个表调用 unpersist(),则循环可以成功地继续到下一次迭代,而不会出现内存问题。如上所述,.clearCache() 和单独删除临时表并没有解决问题。我猜这很有效,因为虽然这些表来自 sparkSQL 查询,但它返回一个 RDD。
即使我没有对这些 RDD 调用persist(),我也必须告诉 Spark 在下一个循环开始之前清除它们,以便我可以将新的 SQL 查询分配给这些相同的变量名。
【讨论】:
另外,对于 Spark 新手来说,仅供参考:除非函数是 UDF(这通常意味着它有点简单或返回说只有一列,而不是整个表)并且您可以对 spark 数据采取行动框架使用传统的 UDF API 函数/语法,然后避免使用函数,只需将代码编写为一个长脚本。它看起来很难看,但这似乎将我的代码速度提高了近 50%。并避免仅在 Python 中循环 - 在循环中从 Bash 调用您的 spark 脚本,以便每次处理新文件/输入数据帧时都从新的 Spark 上下文开始。这将避免内存问题。【参考方案2】:如果可以,请尝试升级到新发布的 spark 2.0。
我在 Java 堆空间方面遇到了与您非常相似的问题。通过简单地重复创建数据帧并使用 spark 1.6.2 反复调用的过程,我能够超过 4G 的堆空间。
在使用 SparkSession 的 spark 2.0 中,相同的程序仅获得 1.2 GB 的堆空间,并且内存使用情况与我正在运行的程序的预期非常一致。
【讨论】:
以上是关于在脚本中迭代/循环 Spark parquet 文件会导致内存错误/堆积(使用 Spark SQL 查询)的主要内容,如果未能解决你的问题,请参考以下文章
spark剖析:spark读取parquet文件会有多少个task
spark剖析:spark读取parquet文件会有多少个task
spark剖析:spark读取parquet文件会有多少个task
spark剖析:spark读取parquet文件会有多少个task