Pyspark在使用大量列保存数据框时遇到问题

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Pyspark在使用大量列保存数据框时遇到问题相关的知识,希望对你有一定的参考价值。

在Hortonworks集群上通过Jupyter笔记本使用Pyspark 1.6.2处理以下步骤时,我们遇到了一个奇怪的情况:

  1. 从pyspark数据帧中的ORC表读取数据
  2. pivot_columnpivoted_df)转动此表
  3. 在特定选择的pivoted_df上添加一些计算列:calced_df = pivoted_df.select(dependency_list).run_calculations()
  4. 内部加入大表pivoted_df(列> 1.600)和“小”表calced_df(仅约270列)联合所有列
  5. 保存到木地板桌子

(在步骤3中,选择是必要的,否则使用withColumn语句添加一些计算字段需要很长时间。在具有大量列的表上,选择+ Join比withColumn更快)

但是,对于pivot_column变化少于2.500的数据集,工作正常。例如,我们从75.430.000行和1.600列开始成功处理作业。当我们使用另一个包含较少行(50.000)和更多列(2.433)的数据集处理作业时,它也正常工作。

但最后这项工作在最后一步崩溃,因为pivot_column有超过2500种变化(并且只有大约70,000行),并且存在Stackoverflow错误。我们使用一些show()操作调试了单个步骤,以检查作业失败的确切位置。我们发现,一切正常,直到第4步中的Join。因此,连接导致了问题,因为这一步我们得到以下消息:

Py4JJavaError: An error occurred while calling o4445.showString.
: java.lang.StackOverflowError
    at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
    at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.SetBuilder.$plus$plus$eq(SetBuilder.scala:22)
    at scala.collection.TraversableLike$class.to(TraversableLike.scala:629)
    at scala.collection.AbstractTraversable.to(Traversable.scala:105)
    at scala.collection.TraversableOnce$class.toSet(TraversableOnce.scala:267)
    at scala.collection.AbstractTraversable.toSet(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild$lzycompute(TreeNode.scala:86)
    at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild(TreeNode.scala:86)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:280)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)

但究竟是什么导致了这个错误,我们怎么能避免它呢?

我们当前的Spark配置:

.setMaster("yarn-client") 
.set("spark.executor.instances","10") 
.set("spark.executor.cores","4") 
.set("spark.executor.memory","10g") 
.set("spark.driver.memory","8g") 
.set("spark.yarn.executor.memoryOverhead","1200") 
.set("spark.sql.pivotMaxValues", "6000") 
.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")

提前谢谢了!

答案

在压缩和保存之前,执行程序将保存到镶木地板的数据保存为镶嵌格式(在spark 1.6中 - 在2.x中更改,请参见https://spoddutur.github.io/spark-notes/deep_dive_into_storage_formats.html) - 您可以增加分区数(spark.sql.shuffle.partitions)或执行者的记忆

请注意,python占用了spark执行器的一些内存(spark.python.worker.memory设置控制它)

您可以帮助的另一件事是在保存到镶木地板之前对数据进行排序

以上是关于Pyspark在使用大量列保存数据框时遇到问题的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中连接两个数据框时避免列重复列名

读入火花数据框时如何从csv文件中删除列

使用架构详细信息创建数据框时 Dataproc 上的 Pyspark 错误

在 Pyspark 中从 Rest Api 创建数据框时出错

Pyspark - 将rdd转换为数据框时数据设置为null

在 PySpark 中加入 270 列