在 takeSample 上 Spark 作业的堆内存不足

Posted

技术标签:

【中文标题】在 takeSample 上 Spark 作业的堆内存不足【英文标题】:Spark job running out of heap memory on takeSample 【发布时间】:2015-08-25 14:28:55 【问题描述】:

我有一个 Apache spark 集群,其中包含一个主节点和三个工作节点。工作节点有 32 个核心和 124G 内存。我还在 HDFS 中有一个数据集,其中包含大约 6.5 亿条文本记录。这个数据集是一些像这样读入的序列化 RDD:

import org.apache.spark.mllib.linalg.Vector, Vectors, SparseVector
val vectors = sc.objectFile[(String, SparseVector)]("hdfs://mn:8020/data/*")

我想从这些记录中提取一百万个样本来做一些分析,所以我想试试val sample = vectors.takeSample(false, 10000, 0)。但是,这最终会失败并显示此错误消息:

 15/08/25 09:48:27 ERROR Utils: Uncaught exception in thread task-result-getter-3
java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:64)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:61)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:79)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:64)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:61)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$r

我知道我的堆空间用完了(我认为是在驱动程序上?),这是有道理的。执行hadoop fs -du -s /path/to/data,数据集在磁盘上占用了 2575 GB(但大小仅为约 850 GB)。

所以,我的问题是,我能做些什么来提取这个包含 1000000 条记录的样本(我稍后计划将其序列化到磁盘)?我知道我可以用较小的样本量做takeSample() 并在以后聚合它们,但我认为我只是没有设置正确的配置或做错了什么,这阻止了我以我想要的方式这样做。

【问题讨论】:

我最终不得不增加 spark.driver.memoryspark.driver.maxResultSize 才能让事情正常进行。此外,根据接受的响应调整我的集群可能也有帮助。 【参考方案1】:

在处理大数据时,在驱动程序节点收集中间结果很少是一个好主意。相反,将数据分布在集群中几乎总是更好。您要采集的样本也是如此。

如果您想对数据集的 1000000 个元素进行采样,然后将其写入磁盘,那么为什么不采集样本并将其写入磁盘而不在驱动程序中收集呢?以下代码 sn-p 应该正是这样做的

val sample = vectors.zipWithIndex().filter(_._1 < 1000000).map(_._2)

sample.saveAsObjectFile("path to file")

【讨论】:

这会只取前 1000000 条记录吗?我希望有一个随机分布的样本 @NathanielPawelczyk,根据前面的 Spark 操作,结果可能是随机的。 zipWithIndex 的 JavaDocs 说:“请注意,某些 RDD,例如由 groupBy() 返回的那些,不保证分区中元素的顺序。因此不保证分配给每个元素的索引,如果RDD 被重新评估。如果需要固定的排序来保证相同的索引分配,您应该使用 sortByKey() 对 RDD 进行排序或将其保存到文件中。" 但是如果您的样本不必是固定大小的,那么您也可以使用RDD.sample(false, p, 0) 操作创建一个样本,该样本是原始RDD 的p-th 部分。 【参考方案2】:

您可以通过增加分区的数量来做到这一点,使每个分区更小。检查您正在设置的执行程序的数量以及为每个执行程序保留多少内存也很重要(您没有将此信息放在问题上)。

我发现this guide 对调整 Spark 非常有用。

【讨论】:

我过去一般不设置--num-executors,而是设置--executor-memory 124G--executor-cores 32。如果我将--num-executors 设置为大于 3 的数字,这是否意味着每个节点只运行多个执行程序?这是否需要我按比例减少我在--executor-cores--executor-memory 中的设置? 正确!根据您的集群配置,您可能还需要一些空间用于主程序。它是高度可变的,但我通常将容量划分为三分之一或四分之一。比如说,你可以使用 9 个执行器,10 个内核和 40 GB,为操作系统等留下一些东西。 另外,请注意您的内存问题很可能出在驱动程序上。还有spark.driver.memoryspark.driver.coresspark.driver.maxResultSize等配置。 澄清一下——Spark Web UI 没有反映这些每个执行者的统计信息,对吗?对我来说,它似乎只显示每个物理工作节点的统计信息(例如使用的内核和内存)。 确实如此,请务必查看所有选项卡!有一个执行者列表,里面有很多信息。

以上是关于在 takeSample 上 Spark 作业的堆内存不足的主要内容,如果未能解决你的问题,请参考以下文章

如何在特定节点上运行 Spark 作业

关于在集群 (AWS) 上运行 Spark 作业的说明

作业未显示在 Spark WebUI 上

从外部在 Hortonworks Sandbox 上执行 Spark 作业

python作业上的Spark提交过程泄漏

在 Yarn 集群上运行 Spark 作业的问题