如何让 PySpark 在内存不足之前将中间结果写入磁盘?

Posted

技术标签:

【中文标题】如何让 PySpark 在内存不足之前将中间结果写入磁盘?【英文标题】:How do I get PySpark to write intermediate results to disk before running out of memory? 【发布时间】:2017-03-17 05:06:26 【问题描述】:

背景:在 Hadoop Streaming 中,每个 reduce 作业在完成时都会写入 hdfs,从而为 Hadoop 集群执行下一个 reduce 扫清了道路。

我无法将此范例映射到 (Py)Spark。

举个例子,

df = spark.read.load('path')
df.rdd.reduceByKey(my_func).toDF().write.save('output_path')

当我运行它时,集群会在将任何内容写入磁盘之前收集数据帧中的所有数据。至少当我观察工作进展时,情况看起来是这样的。

我的问题是我的数据比我的集群内存大得多,所以在写入任何数据之前我的内存就用完了。在 Hadoop Streaming 中,我们没有这个问题,因为输出数据被流式传输到磁盘,为后续批次的数据腾出空间。

我考虑过这样的事情:

for i in range(100):
    (df.filter(df.loop_index==i)
        .rdd
        .reduceByKey(my_func)
        .toDF()
        .write.mode('append')
        .save('output_path'))

我在每次迭代中只处理我的数据子集。但这似乎很笨拙,主要是因为我必须坚持df,由于内存限制这是不可能的,或者我必须在每次迭代中从输入的hdfs源重新读取。

使循环工作的一种方法是按天或其他数据子集对源文件夹进行分区。但是为了这个问题,让我们假设这是不可能的。

问题:如何在 PySpark 中运行这样的作业?我只需要一个更大的集群吗?如果是这样,在处理数据之前调整集群大小的常见做法是什么?

【问题讨论】:

你尝试设置 spark.rdd.compress=true 吗? 不,这有帮助吗?除非那使 spark 写入 hdfs,否则这并不能解决我的问题。压缩 rdd 可能会使更多数据适合内存,但这并不能解决我要问的根本问题。 由于没有提示my_func 做了什么,所以很难确定问题出在哪里。此外,如果需要良好的性能,您应该在转换为 DataFrame 时始终提供 schema 【参考方案1】:

在大量分区中重新分区数据可能会有所帮助。下面的示例类似于您的 for 循环,尽管您可能想先尝试使用更少的分区

df = spark.read.load('path').repartition(100)

您还应该查看您当前使用的执行器数量 (--num-executors)。减少这个数字也应该减少你的内存占用。

【讨论】:

以上是关于如何让 PySpark 在内存不足之前将中间结果写入磁盘?的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 最近使用的一些有趣姿势的梳理

在 PySpark 中使用 collect_list 时 Java 内存不足

我如何让 pandas 使用 spark 集群

让 CUDA 内存不足

在 Windows 上分配开始失败之前检测内存运行不足

在PySpark中尽力重试