Pyspark 无法保存包含大量列的数据框

Posted

技术标签:

【中文标题】Pyspark 无法保存包含大量列的数据框【英文标题】:Pyspark trouble saving dataframe with a lot of columns 【发布时间】:2018-08-06 10:12:54 【问题描述】:

在 Hortonworks 集群上通过 Jupyter notebook 使用 Pyspark 1.6.2 处理以下步骤时,我们遇到了一个奇怪的情况:

    从 pyspark 数据帧中的 ORC 表中读取数据 通过pivot_column (pivoted_df) 透视此表 在pivoted_df 的特定选择上添加一些计算列: calced_df = pivoted_df.select(dependency_list).run_calculations() 在大表 pivoted_df(列 > 1.600)和“小”表 calced_df(仅 ~270 列)上进行内连接以合并所有列 保存到 Parquet 表

(在第 3 步中选择是必要的,因为否则使用 withColumn 语句添加一些计算字段将需要很长时间。在有很多列的表上,选择 + 连接比 withColumn 更快)

但是,对于 pivot_column 的变化少于 2.500 个的数据集,这项工作运行良好。例如,我们成功地处理了从 75.430.000 行和 1.600 列开始的作业。当我们使用另一个数据集处理作业时,包含更少的行 (50.000) 和更多的列 (2.433),它也可以工作。

但最终作业在最后一步崩溃,因为pivot_column 有超过 2500 个变体(并且只有大约 70.000 行),并带有 *** 错误。我们使用一些show() 操作调试了单个步骤,以检查作业失败的确切位置。我们发现,直到第 4 步中的加入,一切正常。所以加入导致了问题,从这一步开始,我们收到以下消息:

Py4JJavaError: An error occurred while calling o4445.showString.
: java.lang.***Error
    at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
    at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.SetBuilder.$plus$plus$eq(SetBuilder.scala:22)
    at scala.collection.TraversableLike$class.to(TraversableLike.scala:629)
    at scala.collection.AbstractTraversable.to(Traversable.scala:105)
    at scala.collection.TraversableOnce$class.toSet(TraversableOnce.scala:267)
    at scala.collection.AbstractTraversable.toSet(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild$lzycompute(TreeNode.scala:86)
    at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild(TreeNode.scala:86)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:280)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)

但是究竟是什么导致了这个错误,我们该如何避免呢?

我们当前的 Spark 配置:

.setMaster("yarn-client") \
.set("spark.executor.instances","10") \
.set("spark.executor.cores","4") \
.set("spark.executor.memory","10g") \
.set("spark.driver.memory","8g") \
.set("spark.yarn.executor.memoryOverhead","1200") \
.set("spark.sql.pivotMaxValues", "6000") \
.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")

非常感谢!

【问题讨论】:

Spark 非常适合高表(很多行),但它不能很好地扩展到宽表(很多列)。您可能会遇到系统可以处理的限制 - 请参阅this comment。 更多讨论在this question的cmets中。在实践中,我在这些情况下看到的做法是将宽表分解为多个较薄的表。然后当你需要使用它时,使用主键进行连接。 【参考方案1】:

执行器在压缩和保存之前将保存为 parquet 的数据保存为行格式(在 spark 1.6 中 - 在 2.x 中更改,请参阅https://spoddutur.github.io/spark-notes/deep_dive_into_storage_formats.html) - 您可以增加分区数(spark.sql. shuffle.partitions) 或执行者的内存

请注意,python 会为 spark 执行器占用一些内存(由 spark.python.worker.memory 设置控制)

您可以帮助的另一件事是在保存到镶木地板之前对数据进行排序

【讨论】:

我们已经尝试在将数据保存到 Parquet 之前对其进行排序,不幸的是,这并没有解决问题(而且我们已经尝试将数据保存为 ORC 格式)...现在我们首先尝试将 spark.sql.shuffle.paritions 的数量增加到 400,然后增加到 600,但这没有任何区别。然后我们尝试了更多 exectuor.memory (20g)(和 400 个分区) - 再次出现同样的错误。我们还在作业中实现了一些 cache() 步骤(例如在 pivot 步骤之后),并发现当我们在 cache() 之后执行 show() 时,这会导致相同的错误... 我们使用较少的列数(~500)进行旋转,但在我们的数据大小上,我们必须使用 2000-4000 个分区来避免 OOM

以上是关于Pyspark 无法保存包含大量列的数据框的主要内容,如果未能解决你的问题,请参考以下文章

PySpark数据框显示错误的值

Pyspark在使用大量列保存数据框时遇到问题

如何更改pyspark数据框中列的顺序?

在pyspark中创建带有arraytype列的数据框

如何从 pyspark 数据框中更快地保存 csv 文件?

无法将数据框保存到镶木地板 pyspark