使用 saveAsTextFile 引发 NullPointerException
Posted
技术标签:
【中文标题】使用 saveAsTextFile 引发 NullPointerException【英文标题】:Spark NullPointerException with saveAsTextFile 【发布时间】:2015-10-03 13:40:22 【问题描述】:我在尝试合并和保存 RDD 时遇到 NPE。
代码在本地工作,并且在 scala shell 中的集群上工作,但在将其作为作业提交到集群时抛出错误。
我尝试使用 take() 打印出来以查看 rdd 是否包含一些空数据,但这会引发相同的错误 - 痛苦,因为它在 shell 中工作正常。
我正在保存到 HDFS 并在变量中保存完整的 url 路径 - 模型在 MLLib 训练阶段使用这种方法保存得很好。
任何想法都非常感谢!
Scala 代码(整体预测函数):
//Load the Random Forest
val rfModel = RandomForestModel.load(sc, modelPath)
//Make the predictions - Here the label is the unique ID of the point
val rfPreds = labDistVect.map(p => (p.label, rfModel.predict(p.features)))
//Collect and save
println("Done Modelling, now saving preds")
val outP = rfPreds.coalesce(1,true).saveAsTextFile(outPreds)
println("Done Modelling, now saving coords")
val outC = coords.coalesce(1,true).saveAsTextFile(outCoords)
堆栈跟踪:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 40, XX.XX.XX.XX): java.lang.NullPointerException
at GeoDistPredict1$$anonfun$38.apply(GeoDist1.scala:340)
at GeoDistPredict1$$anonfun$38.apply(GeoDist1.scala:340)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
【问题讨论】:
GeoDist1.scala:340
有什么内容?
val rfPreds = labDistVect.map(p => (p.label, rfModel.predict(p.features)))
检查一个点是否没有特征。
会检查,但这能解释为什么它在集群 shell 中有效,但在集群提交时无效?
那是很多代码。在发布到 Stack Overflow 之前,请尝试将问题简化为最小示例。它有几个好处:1)它更容易回答,2)将来有人更有可能遇到同样的问题,3)90% 的时间你在尝试减少问题时找出问题所在问题。
【参考方案1】:
Spark 操作分为惰性转换和动作。
当在 RDD 上调用 action 时,会对 RDD 执行 惰性转换。 因此,当您执行转换时,它只是存储为要执行的操作。
saveAsTextFile
方法是一个action,map操作是transformation。
如果转换步骤有任何问题,它将在调用转换的操作级别步骤中显示为问题。
因此,您在映射操作期间可能会遇到问题,其中某些字段中存在空值,这很可能导致您的 NPE 问题。
【讨论】:
在这个答案中添加以下内容:coalesce with 1 partition 不是一个好主意或好习惯,因为您试图将所有数据移动到一个分区中,这将使驱动程序不堪重负并可能导致 OOME问题。 好的,谢谢,我会调查这些并回来 - 想我仍然有点不确定为什么集群 shell 工作 - 认为这些操作会具有相同的惰性和动作拆分? @Dusted 给出的信息,我们仍然无法回答为什么它在 shell 模式下而不是在集群模式下工作。您需要提供有关如何提交应用、集群配置等的更多信息。 我已经使用 ec2 脚本部署了集群 - 一切标准,spark 1.5.0,hadoop 1x,linux ami's。数据复制到 hdfs(模型也保存在上一步中)。应用程序是用 scala 编写的 - 在本地打包并 scp 到服务器,然后使用以下命令运行:` ./spark/bin/spark-submit --class "GeoDistPredict1" --master spark://ec2-XX-XX-XX- XX.eu-west-1.compute.amazonaws.com:7077 Geo-Pred_2.10-1.0.jar `。当我从 /bin 运行 scala shell 时,我只是将这些行直接粘贴进去。我几乎是 Spark 的菜鸟(正如你所知道的那样),所以不确定我还能提供哪些其他信息 - 但请告诉我。 @Dusted :您是否在本地和生产环境的相同数据上运行?还是在本地小数据和集群大数据上?以上是关于使用 saveAsTextFile 引发 NullPointerException的主要内容,如果未能解决你的问题,请参考以下文章
Spark&Scala:saveAsTextFile()异常
Spark saveAsTextFile写入空文件 - _ $ folder $到S3