Pyspark在使用大量列保存数据框时遇到问题
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Pyspark在使用大量列保存数据框时遇到问题相关的知识,希望对你有一定的参考价值。
在Hortonworks集群上通过Jupyter笔记本使用Pyspark 1.6.2处理以下步骤时,我们遇到了一个奇怪的情况:
- 从pyspark数据帧中的ORC表读取数据
- 由
pivot_column
(pivoted_df
)转动此表 - 在特定选择的
pivoted_df
上添加一些计算列:calced_df = pivoted_df.select(dependency_list).run_calculations()
- 内部加入大表
pivoted_df
(列> 1.600)和“小”表calced_df
(仅约270列)联合所有列 - 保存到木地板桌子
(在步骤3中,选择是必要的,否则使用withColumn
语句添加一些计算字段需要很长时间。在具有大量列的表上,选择+ Join比withColumn
更快)
但是,对于pivot_column
变化少于2.500的数据集,工作正常。例如,我们从75.430.000行和1.600列开始成功处理作业。当我们使用另一个包含较少行(50.000)和更多列(2.433)的数据集处理作业时,它也正常工作。
但最后这项工作在最后一步崩溃,因为pivot_column
有超过2500种变化(并且只有大约70,000行),并且存在Stackoverflow错误。我们使用一些show()
操作调试了单个步骤,以检查作业失败的确切位置。我们发现,一切正常,直到第4步中的Join。因此,连接导致了问题,因为这一步我们得到以下消息:
Py4JJavaError: An error occurred while calling o4445.showString.
: java.lang.StackOverflowError
at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.SetBuilder.$plus$plus$eq(SetBuilder.scala:22)
at scala.collection.TraversableLike$class.to(TraversableLike.scala:629)
at scala.collection.AbstractTraversable.to(Traversable.scala:105)
at scala.collection.TraversableOnce$class.toSet(TraversableOnce.scala:267)
at scala.collection.AbstractTraversable.toSet(Traversable.scala:105)
at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild$lzycompute(TreeNode.scala:86)
at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild(TreeNode.scala:86)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:280)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
但究竟是什么导致了这个错误,我们怎么能避免它呢?
我们当前的Spark配置:
.setMaster("yarn-client")
.set("spark.executor.instances","10")
.set("spark.executor.cores","4")
.set("spark.executor.memory","10g")
.set("spark.driver.memory","8g")
.set("spark.yarn.executor.memoryOverhead","1200")
.set("spark.sql.pivotMaxValues", "6000")
.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")
提前谢谢了!
在压缩和保存之前,执行程序将保存到镶木地板的数据保存为镶嵌格式(在spark 1.6中 - 在2.x中更改,请参见https://spoddutur.github.io/spark-notes/deep_dive_into_storage_formats.html) - 您可以增加分区数(spark.sql.shuffle.partitions)或执行者的记忆
请注意,python占用了spark执行器的一些内存(spark.python.worker.memory设置控制它)
您可以帮助的另一件事是在保存到镶木地板之前对数据进行排序
以上是关于Pyspark在使用大量列保存数据框时遇到问题的主要内容,如果未能解决你的问题,请参考以下文章
使用架构详细信息创建数据框时 Dataproc 上的 Pyspark 错误
在 Pyspark 中从 Rest Api 创建数据框时出错