Pyspark 中的广播加入得到 OnOutOfMemoryError

Posted

技术标签:

【中文标题】Pyspark 中的广播加入得到 OnOutOfMemoryError【英文标题】:Broadcast join in Pyspark got OnOutOfMemoryError 【发布时间】:2017-10-28 06:42:08 【问题描述】:

我在 pyspark 中对一个不是很大的数据帧使用广播连接,但出现 oom 错误。代码很简单如下:

from pyspark.sql.functions import broadcast
df1 = sc.read.parquet("xxx")
df2 = sc.read.parquet("xxx")
df = df1.join(broadcast(df2), on=["key"])

几秒钟后我得到 OOM 错误

# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 26024"...

df2 不是很大,1868895 行。 hdfs文件大约142M。我可以轻松地在驱动程序代码中加载所有数据并将其用作 UDF,这没有问题。但是当使用广播连接时,出现OOM。

我的配置如下:

spark.python.worker.memory=2048m
spark.executor.memory=8g
spark.driver.memory=7g

【问题讨论】:

【参考方案1】:

尝试增加 spark.sql.broadcastTimeout 参数的值,

spark 是 SparkSession 对象

【讨论】:

【参考方案2】:

Parquet 文件在处理时会爆炸。由于高压缩比,它将达到超过您配置的内存。在 8GB 内存中,您将获得 60%,即大约 4GB 用于处理。尝试增加内存或增加广播连接内存。

【讨论】:

以上是关于Pyspark 中的广播加入得到 OnOutOfMemoryError的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark - 使用广播字典中的日期过滤 RDD

基于 Pyspark 中的键加入 RDD

检查大小广播变量 PySpark

在 pyspark UDF 中使用广播数据帧

加入后替换pyspark数据框中的列

使用 pyspark 数据框中的复制名称加入后使用左表中的所有列进行分组