将小 PySpark DataFrame 写入镶木地板时出现内存错误

Posted

技术标签:

【中文标题】将小 PySpark DataFrame 写入镶木地板时出现内存错误【英文标题】:Memory error writing small PySpark DataFrame to parquet 【发布时间】:2021-04-01 17:38:59 【问题描述】:

我在使用 PySpark 保存转换后的 Spark DataFrame 时遇到问题。

我的原始数据被拆分为 90 个 csv.gz 文件,生成了一个 500MM 行的 DataFrame。如果我尝试 write.parquet() 它而不进行转换,我会成功。但是在应用了一些实际上将 DF 大小减少到 300k 行的过滤器后,我没有这样做。

更有趣的是,如果我添加一个最终过滤器以将其进一步减少到 10k 行,那么 write.parquet() 可以工作。

我想了解当 500MM 没有遇到任何问题时,为什么我会在 300k 行 DF 中出现 OOM 错误,以及可以做些什么来解决这个问题。

代码:

spark = SparkSession \
  .builder \
  .master("local[*]")\
  .appName("Python Spark SQL basic example") \
  .config("spark.driver.memory", "4G") \
  .getOrCreate()

path = r'files'
file_list = g.glob(os.path.join(path, "*.gz")) 

df = spark.read.option("header", "true").csv(file_list)
df.write.parquet("out/originalDF.parquet")                  #500MM rows SUCCESS!

#some transformations here: 5 new columns, filter rows

df.write.parquet("out/intermediateDF.parquet")              #300k rows FAILED!

df = df.filter(col('symbol')=='foo')
df.write.parquet("out/finalDF.parquet")                     #10k rows SUCCESS!

错误:

Py4JJavaError: An error occurred while calling o162.parquet.
: org.apache.spark.SparkException: Job aborted.
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 48 in stage 4.0 failed 1 times, most recent failure: 
Lost task 48.0 in stage 4.0 (TID 228) (DESKTOP-DASDF23.home executor driver):
java.lang.OutOfMemoryError: Java heap space

【问题讨论】:

【参考方案1】:

我很确定 .config("spark.driver.memory", "4G") 不能在 Spark 应用程序本身中设置。为时已晚,因为应用程序已开始设置内存分配。

试试这样,例如:$ ./bin/spark-shell --driver-memory 4g。或者更高的价值。

【讨论】:

以上是关于将小 PySpark DataFrame 写入镶木地板时出现内存错误的主要内容,如果未能解决你的问题,请参考以下文章

如何将 pyspark-dataframe 写入红移?

如何将大型 Pyspark DataFrame 写入 DynamoDB

将大型 DataFrame 从 PySpark 写入 Kafka 遇到超时

将 Pyspark DataFrame 写入 Parquet 时出现 Py4JJavaError

Pyspark - 将数据帧写入 2 个不同的 csv 文件

用修改后的 PySpark DataFrame 覆盖现有 Parquet 数据集