Spark broadcasted variable returns NullPointerException when run in Amazon EMR cluster

Posted: 2015-09-27 00:32:06

【Problem description】:

The variable I share via a broadcast comes back null in the cluster.

My application is fairly complex, but I put together this small example, which runs perfectly when I run it locally yet fails in the cluster:

package com.gonzalopezzi.bigdata.bicing

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object PruebaBroadcast2 extends App {
  val conf = new SparkConf().setAppName("PruebaBroadcast2")
  val sc = new SparkContext(conf)

  val arr : Array[Int] = (6 to 9).toArray
  val broadcasted = sc.broadcast(arr)

  val rdd : RDD[Int] = sc.parallelize((1 to 4).toSeq, 2) // the small sequence [1, 2, 3, 4] is parallelized into two partitions
  rdd.flatMap((a : Int) => List((a, broadcasted.value(0)))).reduceByKey(_+_).collect().foreach(println) // NullPointerException in the flatMap: broadcasted is null
}

I don't know whether the problem is a coding error or a configuration issue.

This is the stack trace I get:

15/07/07 20:55:13 INFO scheduler.DAGScheduler: Job 0 failed: collect at PruebaBroadcast2.scala:24, took 0.992297 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, ip-172-31-36-49.ec2.internal): java.lang.NullPointerException
    at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24)
    at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Command exiting with ret '1'

Can anyone help me solve this? At the very least, can you tell me whether you see anything strange in the code? If you think the code is fine, please tell me, because that would mean the problem lies in the cluster's configuration.

Thanks in advance.

【Comments】:

【Answer 1】:

I finally got it working.

Declaring the object this way does not work:

object MyObject extends App {
    /* ... */
}

However, it does work if you declare an object with a main function:

object MyObject {
    def main (args : Array[String]) {
        /* ... */
    }
}

So the short example from the question works if I rewrite it this way:

object PruebaBroadcast2 {

  def main (args: Array[String]) {
    val conf = new SparkConf().setAppName("PruebaBroadcast2")
    val sc = new SparkContext(conf)

    val arr : Array[Int] = (6 to 9).toArray
    val broadcasted = sc.broadcast(arr)

    val rdd : RDD[Int] = sc.parallelize((1 to 4).toSeq, 2)

    rdd.flatMap((a : Int) => List((a, broadcasted.value(0)))).reduceByKey(_+_).collect().foreach(println)
  }
}

The problem seems to be related to this bug: https://issues.apache.org/jira/browse/SPARK-4170
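The root cause can be reproduced without Spark at all: `scala.App` is based on `DelayedInit`, so the object's body (including every `val` initializer, such as the broadcast handle above) only runs when `main()` is invoked. On an executor, `main()` is never called on the module, so its fields keep their JVM default values (null for references). A minimal Scala 2 sketch with hypothetical names:

```scala
// Fields of an object that extends App are initialized through DelayedInit,
// i.e. only when main() runs -- not when the object is first loaded.
object Demo extends App {
  val answer = 42
}

object Inspect {
  def main(args: Array[String]): Unit = {
    println(Demo.answer)   // 0: Demo's deferred body has not run yet
    Demo.main(Array.empty) // executes the deferred body
    println(Demo.answer)   // 42
  }
}
```

Putting the code in an explicit `main()` (or any ordinary method) sidesteps `DelayedInit` entirely, which is why the rewrite above works.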

【Comments】:

The bug's status is "Fixed", but I still seem to hit the same problem (CDH 5.5.2).
The bug is marked "Fixed", but the fix only adds a warning: "Subclasses of scala.App may not work correctly. Use a main() method instead."
It's a hack, but aesthetically I like it a bit better: you can do object PruebaBroadcast2 extends App { /* your code */ }

【Answer 2】:

I had a similar problem. I had a variable that I used inside an RDD map function, and I got null for it. This was my original code:

object MyClass extends App {
    ...
    val prefix = "prefix"
    val newRDD = inputRDD.map(s => prefix + s) // got null for prefix
    ...
}

I found that it works inside any function, not just main():

object MyClass extends App {
    ...
    val prefix = "prefix"
    val newRDD = addPrefix(inputRDD, prefix)
    def addPrefix(input: RDD[String], prefix: String): RDD[String] = {
        input.map(s => prefix + s)
    }
    ...
}

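Why does wrapping the work in a function help? A method parameter is a local, and a closure built inside the method captures that value directly; a bare `val` in an `App` body, by contrast, is a field of the `DelayedInit` object, which is still null anywhere `main()` has not run (such as on an executor). A plain-Scala sketch of the difference, with hypothetical names:

```scala
object AppFields extends App {
  val prefix = "pre" // field: assigned only when main() runs (DelayedInit)

  // p is a parameter, i.e. a local: closures built here carry its value
  // with them instead of reading a field of the enclosing object.
  def addPrefix(items: Seq[String], p: String): Seq[String] =
    items.map(s => p + s)
}

object Check {
  def main(args: Array[String]): Unit = {
    println(AppFields.prefix)                     // null: deferred body never ran
    println(AppFields.addPrefix(Seq("a"), "pre")) // List(prea)
  }
}
```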
【Comments】:
