Scala Spark 中的 NullPointerException,似乎是由集合类型引起的?

Posted

技术标签:

【中文标题】Scala Spark 中的 NullPointerException,似乎是由集合类型引起的?【英文标题】:NullPointerException in Scala Spark, appears to be caused be collection type? 【发布时间】:2014-05-21 20:26:25 【问题描述】:

sessionIdList 的类型为:

scala> sessionIdList
res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at <console>:30

当我尝试运行以下代码时:

val x = sc.parallelize(List(1,2,3)) 
val cartesianComp = x.cartesian(x).map(x => (x))

val kDistanceNeighbourhood = sessionIdList.map(s => 
    cartesianComp.filter(v => v != null)
)

kDistanceNeighbourhood.take(1)

我收到异常:

14/05/21 16:20:46 ERROR Executor: Exception in task ID 80
java.lang.NullPointerException
        at org.apache.spark.rdd.RDD.filter(RDD.scala:261)
        at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:38)
        at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)

但是,如果我使用:

val l = sc.parallelize(List("1","2")) 
val kDistanceNeighbourhood = l.map(s =>     
    cartesianComp.filter(v => v != null)
)

kDistanceNeighbourhood.take(1)

那么就不显示异常了

两个代码sn-ps的区别在于第一个sn-p sessionIdList的类型是:

res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at <console>:30

在第二个 sn-p "l" 是类型

scala> l
res13: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[32] at parallelize at <console>:12

为什么会出现这个错误?

我是否需要将 sessionIdList 转换为 ParallelCollectionRDD 才能解决此问题?

【问题讨论】:

你能让你的代码独立吗? @IvanVergiliev 除了填充的 ParallelCollectionRDD 之外的所有代码都包括在内以重新创建异常。我不知道如何创建一个填充的 ParallelCollectionRDD 【参考方案1】:

Spark 不支持 RDD 的嵌套(请参阅 https://***.com/a/14130534/590203 了解另一个相同问题的出现),因此您无法在其他 RDD 操作中对 RDD 执行转换或操作。

在第一种情况下,当工作人员尝试访问仅存在于驱动程序而非工作人员上的 SparkContext 对象时,您会看到工作人员抛出 NullPointerException。

在第二种情况下,我的直觉是这项工作是在本地驱动程序上运行的,纯粹是偶然的。

【讨论】:

如果使用RDD嵌套,是否保证会抛出NPE?问的原因是我正在使用笛卡尔方法,并且在 RDD 上运行的函数正在调用嵌套的其他函数,但它似乎可以正常工作。似乎这个问题只发生在嵌套 RDD 操作而不是嵌套函数调用? 不能保证,但我认为这可能是为了防止用户编写在local模式下运行良好但在集群上失败的程序,或者根据突发奇想成功或失败的程序计划任务的位置。 如果 Spark 能够检测到这种情况并提供更有用的错误消息,那就太好了。 我也有同样的问题,但是我没有使用任何 RDD 或任何映射函数。我正在使用火花数据框。如果 spark executor 死亡或被列入黑名单,因此 worker 无法访问 sparkcontext 对象,是否会发生此错误?【参考方案2】:

这是一个合理的问题,我已经听到它问了足够多次了。我将尝试解释为什么这是真的,因为它可能会有所帮助。

嵌套的 RDD总是在生产环境中抛出异常。我认为您在这里描述的嵌套函数调用,如果这意味着在 RDD 操作中调用 RDD 操作,也会导致失败,因为它实际上是同一件事。 (RDD 是不可变的,因此执行诸如“map”之类的 RDD 操作相当于创建一个新的 RDD。)创建嵌套 RDD 的能力是定义 RDD 的方式和 Spark 应用程序的方式的必然结果设置。

RDD 是分布在 Spark 执行器上的对象(称为分区)的集合。 Spark 执行器不能相互通信,只能与 Spark 驱动程序通信。 RDD 操作都是在这些分区上分段计算的。因为 RDD 的执行程序环境不是递归的(即您可以将 Spark 驱动程序配置为带有子执行程序的 spark 执行程序),RDD 也不能。

在您的程序中,您创建了整数分区的分布式集合。然后,您正在执行映射操作。当 Spark 驱动程序看到映射操作时,它将执行映射的指令发送给执行程序,执行程序在每个分区上并行执行转换。但是您的映射无法完成,因为在每个分区上您都试图调用“整个 RDD”来执行另一个分布式操作。这是不可能的,因为每个分区都无法访问其他分区上的信息,如果这样做,计算就无法并行运行。

你可以做的,因为你在地图中需要的数据可能很小(因为你正在做一个过滤器,并且过滤器不需要任何关于 sessionIdList 的信息)是首先过滤会话 ID 列表。然后将该列表收集给驱动程序。然后将其广播给执行者,您可以在地图中使用它。如果 sessionID 列表太大,您可能需要加入。

【讨论】:

以上是关于Scala Spark 中的 NullPointerException,似乎是由集合类型引起的?的主要内容,如果未能解决你的问题,请参考以下文章

spark-submit 中的 scala.ScalaReflectionException

如何在 Spark 的 github 中查看 Functions.Scala 中的代码

Scala(Spark)连接数据框中的列[重复]

Spark 中的数据框比较:Scala

解释 Spark 中的聚合功能(使用 Python 和 Scala)

Spark 与 Scala 中的 ETL 过程