内存不足错误的 Spark 配置

Posted

技术标签:

【中文标题】内存不足错误的 Spark 配置【英文标题】:Spark configuration for Out of memory error 【发布时间】:2018-06-19 19:10:42 【问题描述】:

集群设置 -

Driver has 28gb
Workers have 56gb each (8 workers)

配置-

spark.memory.offHeap.enabled true
spark.driver.memory 20g
spark.memory.offHeap.size 16gb
spark.executor.memory 40g

我的工作 -

//myFunc just takes a string s and does some transformations on it, they are very small strings, but there's about 10million to process.


//Out of memory failure
data.map(s => myFunc(s)).saveAsTextFile(outFile)

//works fine
data.map(s => myFunc(s))

此外,我从我的程序中去集群/删除了 spark,它在具有 56gb 内存的单个服务器上完成得很好(成功保存到文件中)。这表明它只是一个火花配置问题。我查看了https://spark.apache.org/docs/latest/configuration.html#memory-management,我目前的配置似乎是我的工作需要更改的所有内容。我还应该改变什么?

更新-

数据 -

val fis: FileInputStream = new FileInputStream(new File(inputFile))
val bis: BufferedInputStream = new BufferedInputStream(fis);
val input: CompressorInputStream = new CompressorStreamFactory().createCompressorInputStream(bis);
br = new BufferedReader(new InputStreamReader(input))
val stringArray = br.lines().toArray()
val data = sc.parallelize(stringArray)

注意 - 这不会导致任何内存问题,即使它非常低效。我无法使用 spark 读取它,因为它会引发一些 EOF 错误。

myFunc,我不能真正发布它的代码,因为它很复杂。但基本上,输入字符串是一个分隔字符串,它会进行一些分隔符替换、日期/时间规范化等。输出字符串与输入字符串的大小大致相同。

此外,它适用于较小的数据大小,并且输出正确且与输入数据文件的大小大致相同。

【问题讨论】:

什么是data,它是如何生成的? myFunc的定义是什么? myFunc 中可能存在导致内存问题的问题,但更可能的问题是用于创建 data 的其他转换之一。不看代码就无法判断。所以我们需要从源文件中查看完整的流程。工作正常的那个这样做是因为它什么都不做。请记住,在您运行操作之前,Spark 中不会发生任何事情。 @puhlen 用信息更新了主帖 【参考方案1】:

您当前的解决方案没有利用火花。您将整个文件加载到内存中的数组中,然后使用 sc.parallelize 将其分发到 RDD 中。这非常浪费内存(即使没有 spark),当然会导致大文件出现内存不足的问题。

改为使用sc.textFile(filePath) 创建您的RDD。然后 spark 能够以块的形式智能地读取和处理文件,因此一次只需要其中一小部分在内存中。您还可以通过这种方式利用并行性,因为 spark 将能够并行读取和处理文件,无论您拥有多少执行程序和代码,而不是需要在单个线程上读取整个文件机器。

假设 myFunc 一次只能查看一行,那么这个程序的内存占用应该非常小。

【讨论】:

是的,我试过了,我写了一个注释说这是不可能的,因为我遇到了一些 EOF 错误并且无论如何都无法修改输入文件。但是,如果我以某种方式让它工作,这会解决我的问题吗?因为读取整个文件本身不会导致内存错误。 也对,MyFunc 一次只取一行,有 1500 万~ 行 @jayjay93 是的,如果您使用textfile 让它工作,它应该可以解决您的问题。如果您觉得需要将大型数据集加载到数组中,也可以增加驱动程序内存,然后使用sc.parallelize。如果程序在没有火花的情况下运行良好,那么为什么不这样做呢。根据您的描述,您可能可以编写一个高效的程序,使用在单台机器上运行的 100MB 以下内存来完成任务。 我并不怀疑你,但在我尝试修复文本文件问题之前,我可以将整个文件存储在我的驱动程序上,同时只使用 20% 的内存。那么为什么“智能读取”文件可以帮助我避免内存不足?驱动程序上的 1500 万项数组根本没有引起任何问题,我已经测试过了。 .//////////////【参考方案2】:

如果您在 MAP 之前和之后在您的程序中提供更多详细信息,将会有所帮助。 除非触发动作,否则第二个命令(仅 Map)不会执行任何操作。您的文件可能未分区,驱动程序正在执行此工作。下面应该强制将数据均匀地分配给工作人员并保护单个节点上的 OOM。但它会导致数据混洗。

查看您的代码后更新解决方案,如果您这样做会更好

val data = sc.parallelize(stringArray).repartition(8)
data.map(s => myFunc(s)).saveAsTextFile(outFile)

【讨论】:

我在帖子中添加了更新,提供了更多信息。不过我会试试的,谢谢!

以上是关于内存不足错误的 Spark 配置的主要内容,如果未能解决你的问题,请参考以下文章

Spark:广播对象时内存不足

在 Spark SQL 中读取 40 万行时出现内存不足错误 [重复]

多次迭代内存不足

使用 jdbc 驱动程序读取大表时超时和内存不足错误

Apache Spark 内存不足,分区数量较少

由于内存不足,Spark Join 失败