Pyspark - DataFrame persist() 出错 java.lang.OutOfMemoryError:超出 GC 开销限制

Posted

技术标签:

【中文标题】Pyspark - DataFrame persist() 出错 java.lang.OutOfMemoryError:超出 GC 开销限制【英文标题】:Pyspark - DataFrame persist() errors out java.lang.OutOfMemoryError: GC overhead limit exceeded 【发布时间】:2019-02-14 05:43:19 【问题描述】:

当我尝试持久化在大小约为 270GB 的表上创建的 DataFrame 时,Pyspark 作业失败

线程“yarn-scheduler-ask-am-thread-pool-9”中的异常 java.lang.OutOfMemoryError: 超出 GC 开销限制

仅当我尝试坚持时才会出现此问题。以下是配置,我尝试使用执行器/驱动程序内存、随机分区、执行器的动态分配和持久存储级别(DISK_ONLY、MEMORY_AND_DISK)。我的目的是在一个键上对数据进行分区并持久化,这样我的连续连接会更快。任何建议都会有很大帮助。

Spark 版本: 1.6.1(MapR 分布)数据大小: ~270GB配置: spark.executor.instances - 300 spark.executor.memory - 10g spark.executor.cores - 3 spark.driver.memory - 10g spark.yarn.executor.memoryOverhead - 2048 spark.io.compression.codec - lz4

普通查询

query = "select * from tableA"
df = sqlctx.sql(query)
df.count()

没有persist()的成功运行

重新分区和持久化

记住 shuffle 块,选择 2001 作为分区,因此每个分区将包含大约 128M 的数据。

test = df.repartition(2001, "key")
test.persist(StorageLevel.DISK_ONLY)
test.count()

GC 错误 - 在 Persist() 上

【问题讨论】:

【参考方案1】:

您是否尝试将 spark.executor.memory 增加到其最大值? (如果集群有足够的内存)

您是否尝试增加分区数以增加并行度?从而减小分区本身的大小。

还要分析垃圾收集发生的频率和花费 GC 的时间量的统计信息。 This 文章对分析和配置 GC 的工作非常有用。

【讨论】:

以上是关于Pyspark - DataFrame persist() 出错 java.lang.OutOfMemoryError:超出 GC 开销限制的主要内容,如果未能解决你的问题,请参考以下文章

PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解

PySpark:转换DataFrame中给定列的值

PySpark|比RDD更快的DataFrame

Pyspark:将 pyspark.sql.row 转换为 Dataframe

是否可以在 Pyspark 中对 DataFrame 进行子类化?

在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe