将小 PySpark DataFrame 写入镶木地板时出现内存错误
Posted
技术标签:
【中文标题】将小 PySpark DataFrame 写入镶木地板时出现内存错误【英文标题】:Memory error writing small PySpark DataFrame to parquet 【发布时间】:2021-04-01 17:38:59 【问题描述】:我在使用 PySpark 保存转换后的 Spark DataFrame 时遇到问题。
我的原始数据被拆分为 90 个 csv.gz 文件,生成了一个 500MM 行的 DataFrame。如果我尝试 write.parquet() 它而不进行转换,我会成功。但是在应用了一些实际上将 DF 大小减少到 300k 行的过滤器后,我没有这样做。
更有趣的是,如果我添加一个最终过滤器以将其进一步减少到 10k 行,那么 write.parquet() 可以工作。
我想了解当 500MM 没有遇到任何问题时,为什么我会在 300k 行 DF 中出现 OOM 错误,以及可以做些什么来解决这个问题。
代码:
spark = SparkSession \
.builder \
.master("local[*]")\
.appName("Python Spark SQL basic example") \
.config("spark.driver.memory", "4G") \
.getOrCreate()
path = r'files'
file_list = g.glob(os.path.join(path, "*.gz"))
df = spark.read.option("header", "true").csv(file_list)
df.write.parquet("out/originalDF.parquet") #500MM rows SUCCESS!
#some transformations here: 5 new columns, filter rows
df.write.parquet("out/intermediateDF.parquet") #300k rows FAILED!
df = df.filter(col('symbol')=='foo')
df.write.parquet("out/finalDF.parquet") #10k rows SUCCESS!
错误:
Py4JJavaError: An error occurred while calling o162.parquet.
: org.apache.spark.SparkException: Job aborted.
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 48 in stage 4.0 failed 1 times, most recent failure:
Lost task 48.0 in stage 4.0 (TID 228) (DESKTOP-DASDF23.home executor driver):
java.lang.OutOfMemoryError: Java heap space
【问题讨论】:
【参考方案1】:我很确定 .config("spark.driver.memory", "4G")
不能在 Spark 应用程序本身中设置。为时已晚,因为应用程序已开始设置内存分配。
试试这样,例如:$ ./bin/spark-shell --driver-memory 4g
。或者更高的价值。
【讨论】:
以上是关于将小 PySpark DataFrame 写入镶木地板时出现内存错误的主要内容,如果未能解决你的问题,请参考以下文章
如何将大型 Pyspark DataFrame 写入 DynamoDB
将大型 DataFrame 从 PySpark 写入 Kafka 遇到超时
将 Pyspark DataFrame 写入 Parquet 时出现 Py4JJavaError