Pandas to PySpark给出OOM错误而不是溢出到磁盘[重复]
Posted
技术标签:
【中文标题】Pandas to PySpark给出OOM错误而不是溢出到磁盘[重复]【英文标题】:Pandas to PySpark giving OOM error instead of spilling to disk [duplicate] 【发布时间】:2017-09-18 17:58:48 【问题描述】:我有一个用例,我想迭代地将数据加载到 Pandas 数据帧中,使用外部函数(即 xgboost,示例代码中未显示)进行一些处理,然后将结果推送到单个 PySpark 对象(RDD 或DF)。
在将数据存储为 RDD 或 Dataframe 时,我尝试让 PySpark 溢出到磁盘,同样,源是 Pandas DataFrame。似乎没有任何工作,我一直在崩溃 Java 驱动程序,我无法加载我的数据。或者,我尝试加载我的数据而不使用基本的 textFile RDD 进行处理,它就像一个魅力。我想知道这是否是 PySpark 错误,或者是否有解决方法。
示例代码:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
import pyspark
try:
SparkContext.stop(sc)
except NameError:
1
SparkContext.setSystemProperty('spark.executor.memory', '200g')
SparkContext.setSystemProperty('spark.driver.memory', '200g')
sc = SparkContext("local", "App Name")
sql_sc = SQLContext(sc)
chunk_100k = pd.read_csv("CData.csv", chunksize=100000)
empty_df = pd.read_csv("CData.csv", nrows=0)
infer_df = pd.read_csv("CData.csv", nrows=10).fillna('')
my_schema = SQLContext.createDataFrame(sql_sc, infer_df).schema
SparkDF = SQLContext.createDataFrame(sql_sc, empty_df, schema=my_schema)
for chunk in chunk_100k:
SparkDF = SparkDF.union(SQLContext.createDataFrame(sql_sc, chunk, schema=my_schema))
几次迭代后崩溃:
Py4JJavaError: 调用时出错 z:org.apache.spark.api.python.PythonRDD.readRDDFromFile。 : java.lang.OutOfMemoryError: Java 堆空间
工作直接加载到 RDD 代码:
my_rdd = sc.textFile("CData.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)>1) \
.map(lambda line: (line[0],line[1]))
更新:
我已更改代码以演示加载到 Spark DataFrames 而不是 RDDs 时失败,请注意问题仍然存在并且错误消息仍然引用 RDDs。 在更改示例代码之前,发现在使用 'parallelize' 时保存到 RDD 至少存在问题,原因如下:
Why does SparkContext.parallelize use memory of the driver?
【问题讨论】:
@zero323 我已更改为上下文以消除使用并行化时的问题。 仍然是完全相同的问题 (SparkSession.createDataFrame
-> SparkSession._createFromLocal
-> SparkContext.parallelize
) 和同样的失败原因。从本地对象创建分布式数据结构并不是要走的路。如果您想以可扩展的方式加载数据,请使用 Spark csv 阅读器。
除了使用 Spark csv 阅读器之外没有其他方法吗?我想在溢出到磁盘而不是 csv 时从 pandas 中读取。将文件从 pandas 写入磁盘然后重新加载到 Spark 中是一个额外的步骤。
【参考方案1】:
在 apache-spark/1.5.1/libexec/conf/ 的文件中创建一个 spark-defaults.conf 文件,并在其中添加以下行: spark.driver.memory 45G spark.driver.maxResultSize 10G
【讨论】:
以上是关于Pandas to PySpark给出OOM错误而不是溢出到磁盘[重复]的主要内容,如果未能解决你的问题,请参考以下文章
pickle.loads 给出“模块”对象在 Pyspark Pandas Udf 中没有属性“<ClassName>”
Pandas UDF (PySpark) - 不正确的类型错误
有没有办法像 Pyspark 显示数据框一样打印 Pandas 数据框?