Pandas to PySpark给出OOM错误而不是溢出到磁盘[重复]

Posted

技术标签:

【中文标题】Pandas to PySpark给出OOM错误而不是溢出到磁盘[重复]【英文标题】:Pandas to PySpark giving OOM error instead of spilling to disk [duplicate] 【发布时间】:2017-09-18 17:58:48 【问题描述】:

我有一个用例,我想迭代地将数据加载到 Pandas 数据帧中,使用外部函数(即 xgboost,示例代码中未显示)进行一些处理,然后将结果推送到单个 PySpark 对象(RDD 或DF)。

在将数据存储为 RDD 或 Dataframe 时,我尝试让 PySpark 溢出到磁盘,同样,源是 Pandas DataFrame。似乎没有任何工作,我一直在崩溃 Java 驱动程序,我无法加载我的数据。或者,我尝试加载我的数据而不使用基本的 textFile RDD 进行处理,它就像一个魅力。我想知道这是否是 PySpark 错误,或者是否有解决方法。

示例代码:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
import pyspark

try:
    SparkContext.stop(sc)
except NameError:
    1

SparkContext.setSystemProperty('spark.executor.memory', '200g')
SparkContext.setSystemProperty('spark.driver.memory', '200g')
sc = SparkContext("local", "App Name")
sql_sc = SQLContext(sc)

chunk_100k = pd.read_csv("CData.csv", chunksize=100000)
empty_df = pd.read_csv("CData.csv", nrows=0)
infer_df = pd.read_csv("CData.csv", nrows=10).fillna('')
my_schema = SQLContext.createDataFrame(sql_sc, infer_df).schema

SparkDF = SQLContext.createDataFrame(sql_sc, empty_df, schema=my_schema)

for chunk in chunk_100k:
    SparkDF = SparkDF.union(SQLContext.createDataFrame(sql_sc, chunk, schema=my_schema))

几次迭代后崩溃:

Py4JJavaError: 调用时出错 z:org.apache.spark.api.python.PythonRDD.readRDDFromFile。 : java.lang.OutOfMemoryError: Java 堆空间

工作直接加载到 RDD 代码:

my_rdd = sc.textFile("CData.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)>1) \
.map(lambda line: (line[0],line[1]))

更新:

我已更改代码以演示加载到 Spark DataFrames 而不是 RDDs 时失败,请注意问题仍然存在并且错误消息仍然引用 RDDs。 在更改示例代码之前,发现在使用 'parallelize' 时保存到 RDD 至少存在问题,原因如下:

Why does SparkContext.parallelize use memory of the driver?

【问题讨论】:

@zero323 我已更改为上下文以消除使用并行化时的问题。 仍然是完全相同的问题 (SparkSession.createDataFrame -> SparkSession._createFromLocal -> SparkContext.parallelize) 和同样的失败原因。从本地对象创建分布式数据结构并不是要走的路。如果您想以可扩展的方式加载数据,请使用 Spark csv 阅读器。 除了使用 Spark csv 阅读器之外没有其他方法吗?我想在溢出到磁盘而不是 csv 时从 pandas 中读取。将文件从 pandas 写入磁盘然后重新加载到 Spark 中是一个额外的步骤。 【参考方案1】:

在 apache-spark/1.5.1/libexec/conf/ 的文件中创建一个 spark-defaults.conf 文件,并在其中添加以下行: spark.driver.memory 45G spark.driver.maxResultSize 10G

【讨论】:

以上是关于Pandas to PySpark给出OOM错误而不是溢出到磁盘[重复]的主要内容,如果未能解决你的问题,请参考以下文章

PySpark数据框显示错误的值

pickle.loads 给出“模块”对象在 Pyspark Pandas Udf 中没有属性“<ClassName>”

Pandas UDF (PySpark) - 不正确的类型错误

有没有办法像 Pyspark 显示数据框一样打印 Pandas 数据框?

PySpark pandas_udfs java.lang.IllegalArgumentException错误

Pandas对Pyspark函数的迭代。