Pyspark 无法保存包含大量列的数据框
Posted
技术标签:
【中文标题】Pyspark 无法保存包含大量列的数据框【英文标题】:Pyspark trouble saving dataframe with a lot of columns 【发布时间】:2018-08-06 10:12:54 【问题描述】:在 Hortonworks 集群上通过 Jupyter notebook 使用 Pyspark 1.6.2 处理以下步骤时,我们遇到了一个奇怪的情况:
-
从 pyspark 数据帧中的 ORC 表中读取数据
通过
pivot_column
(pivoted_df
) 透视此表
在pivoted_df
的特定选择上添加一些计算列:
calced_df = pivoted_df.select(dependency_list).run_calculations()
在大表 pivoted_df
(列 > 1.600)和“小”表 calced_df
(仅 ~270 列)上进行内连接以合并所有列
保存到 Parquet 表
(在第 3 步中选择是必要的,因为否则使用 withColumn
语句添加一些计算字段将需要很长时间。在有很多列的表上,选择 + 连接比 withColumn
更快)
但是,对于 pivot_column
的变化少于 2.500 个的数据集,这项工作运行良好。例如,我们成功地处理了从 75.430.000 行和 1.600 列开始的作业。当我们使用另一个数据集处理作业时,包含更少的行 (50.000) 和更多的列 (2.433),它也可以工作。
但最终作业在最后一步崩溃,因为pivot_column
有超过 2500 个变体(并且只有大约 70.000 行),并带有 *** 错误。我们使用一些show()
操作调试了单个步骤,以检查作业失败的确切位置。我们发现,直到第 4 步中的加入,一切正常。所以加入导致了问题,从这一步开始,我们收到以下消息:
Py4JJavaError: An error occurred while calling o4445.showString.
: java.lang.***Error
at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.SetBuilder.$plus$plus$eq(SetBuilder.scala:22)
at scala.collection.TraversableLike$class.to(TraversableLike.scala:629)
at scala.collection.AbstractTraversable.to(Traversable.scala:105)
at scala.collection.TraversableOnce$class.toSet(TraversableOnce.scala:267)
at scala.collection.AbstractTraversable.toSet(Traversable.scala:105)
at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild$lzycompute(TreeNode.scala:86)
at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild(TreeNode.scala:86)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:280)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
但是究竟是什么导致了这个错误,我们该如何避免呢?
我们当前的 Spark 配置:
.setMaster("yarn-client") \
.set("spark.executor.instances","10") \
.set("spark.executor.cores","4") \
.set("spark.executor.memory","10g") \
.set("spark.driver.memory","8g") \
.set("spark.yarn.executor.memoryOverhead","1200") \
.set("spark.sql.pivotMaxValues", "6000") \
.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")
非常感谢!
【问题讨论】:
Spark 非常适合高表(很多行),但它不能很好地扩展到宽表(很多列)。您可能会遇到系统可以处理的限制 - 请参阅this comment。 更多讨论在this question的cmets中。在实践中,我在这些情况下看到的做法是将宽表分解为多个较薄的表。然后当你需要使用它时,使用主键进行连接。 【参考方案1】:执行器在压缩和保存之前将保存为 parquet 的数据保存为行格式(在 spark 1.6 中 - 在 2.x 中更改,请参阅https://spoddutur.github.io/spark-notes/deep_dive_into_storage_formats.html) - 您可以增加分区数(spark.sql. shuffle.partitions) 或执行者的内存
请注意,python 会为 spark 执行器占用一些内存(由 spark.python.worker.memory 设置控制)
您可以帮助的另一件事是在保存到镶木地板之前对数据进行排序
【讨论】:
我们已经尝试在将数据保存到 Parquet 之前对其进行排序,不幸的是,这并没有解决问题(而且我们已经尝试将数据保存为 ORC 格式)...现在我们首先尝试将 spark.sql.shuffle.paritions 的数量增加到 400,然后增加到 600,但这没有任何区别。然后我们尝试了更多 exectuor.memory (20g)(和 400 个分区) - 再次出现同样的错误。我们还在作业中实现了一些 cache() 步骤(例如在 pivot 步骤之后),并发现当我们在 cache() 之后执行 show() 时,这会导致相同的错误... 我们使用较少的列数(~500)进行旋转,但在我们的数据大小上,我们必须使用 2000-4000 个分区来避免 OOM以上是关于Pyspark 无法保存包含大量列的数据框的主要内容,如果未能解决你的问题,请参考以下文章