提交jar时的Scala-Spark NullPointerError,而不是在shell中

Posted

技术标签:

【中文标题】提交jar时的Scala-Spark NullPointerError,而不是在shell中【英文标题】:Scala-Spark NullPointerError when submitting jar, not in shell 【发布时间】:2016-07-19 22:23:09 【问题描述】:

我的 spark 作业引发了一个我无法追踪的空指针异常。当我打印潜在的空变量时,它们都填充在每个工人身上。我的数据不包含空值,因为相同的作业在 spark shell 中工作。作业的执行函数如下,后面跟着错误信息。

所有未在函数中定义的辅助方法都定义在 Spark 作业对象的主体中,所以我相信闭包不是问题。

override def execute(sc:SparkContext) = 
  def construct_query(targetTypes:List[String]) = Map("query" ->
    Map("nested" ->
      Map("path"->"annotations.entities.items",
        "query"-> Map("terms"->
          Map("annotations.entities.items.type"-> targetTypes)))))

  val sourceConfig = HashMap(
    "es.nodes" -> params.targetClientHost
  )

  // Base elastic search RDD returning articles which match the above query on entity types
  val rdd = EsSpark.esJsonRDD(sc,
    params.targetIndex,
    toJson(construct_query(params.entityTypes)),
    sourceConfig
  ).sample(false,params.sampleRate)

  // Mapping ES json into news article object, then extracting the entities list of
  // well defined annotations
  val objectsRDD = rdd.map(tuple => 
    val maybeArticle =
      try 
        Some(JavaJsonUtils.fromJson(tuple._2, classOf[SearchableNewsArticle]))
      catch 
        case e: Exception => None
      
    (tuple._1,maybeArticle)
  
  ).filter(tuple => tuple._2.isDefined && tuple._2.get.annotations.isDefined &&
    tuple._2.get.annotations.get.entities.isDefined).map(tuple => (tuple._1, tuple._2.get.annotations.get.entities.get))

  // flat map the RDD of entities lists into a list of (entity text, (entity type, 1)) tuples
  (line 79) val entityDataMap: RDD[(String, (String, Int))] = objectsRDD.flatMap(tuple => tuple._2.items.collect(
    case item if (item.`type`.isDefined) && (item.text.isDefined) &&
   (line 81)(params.entityTypes.contains(item.`type`.get))  => (cleanUpText(item.text.get), (item.`type`.get, 1))
  ))

  // bucketize the tuples RDD into entity text, List(entity_type, entity_count) to make count aggregation and file writeouts
 // easier to follow
 val finalResults: Array[(String, (String, Int))] = entityDataMap.reduceByKey((x, y) => (x._1, x._2+y._2)).collect()

  val entityTypeMapping = Map(
    "HealthCondition" -> "HEALTH_CONDITION",
    "Drug" -> "DRUG",
    "FieldTerminology" -> "FIELD_TERMINOLOGY"
  )

  for (finalTuple <- finalResults) 
    val entityText = finalTuple._1
    val entityType = finalTuple._2._1
    if(entityTypeMapping.contains(entityType))
    
                if(!Files.exists(Paths.get(entityTypeMapping.get(entityType).get+".txt")))
        val myFile = new java.io.FileOutputStream(new   File(entityTypeMapping.get(entityType).get+".txt"),false)
        printToFile(myFile) p => p.println(entityTypeMapping.get(entityType))
      
    
    val myFile = new java.io.FileOutputStream(new   File(entityTypeMapping.get(entityType).get+".txt"),true)
    printToFile(myFile) p => p.println(entityText)
  


以及下面的错误信息:

java.lang.NullPointerException 在 com.quid.gazetteers.GazetteerGenerator$$anonfun$4$$anonfun$apply$1.isDefinedAt(GazetteerGenerator.scala:81) 在 com.quid.gazetteers.GazetteerGenerator$$anonfun$4$$anonfun$apply$1.isDefinedAt(GazetteerGenerator.scala:79) 在 scala.collection.TraversableLike$$anonfun$collect$1.apply(TraversableLike.scala:27​​8) 在 scala.collection.immutable.List.foreach(List.scala:318) 在 scala.collection.TraversableLike$class.collect(TraversableLike.scala:27​​8) 在 scala.collection.AbstractTraversable.collect(Traversable.scala:105) 在 com.quid.gazetteers.GazetteerGenerator$$anonfun$4.apply(GazetteerGenerator.scala:79) 在 com.quid.gazetteers.GazetteerGenerator$$anonfun$4.apply(GazetteerGenerator.scala:79) 在 scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 在 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189) 在 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64) 在 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 在 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 在 org.apache.spark.scheduler.Task.run(Task.scala:89) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745)

【问题讨论】:

【参考方案1】:

这个问题已经解决了。 params 属性未序列化,可用于 spark 工作人员。解决方案是在需要 params 属性的区域范围内形成一个 spark 广播变量。

【讨论】:

我们是否有代码 sn-p 说明它是如何解决的?

以上是关于提交jar时的Scala-Spark NullPointerError,而不是在shell中的主要内容,如果未能解决你的问题,请参考以下文章

scala-spark实现重分区和打印各个分区的data

服务器:[http-nio-8080-exec-7] org.apache.coyote.http11.Http11Processor.service 处理请求时出错 java.lang.NullPo

运行 .jar 时的 getResourceAsStream 文件路径

SSH框架整合jar包时的注意事项

Linux - 通过 shell 脚本运行 jar 文件时的 IO 异常

运行使用 maven 构建的 jar 时的 FlinkMLTools NoClassDef