Spark RDD 抛出 NullPointerException

Posted

技术标签:

【中文标题】Spark RDD 抛出 NullPointerException【英文标题】:Spark RDD throwing NullPointerException 【发布时间】:2016-05-05 06:12:05 【问题描述】:

当我尝试从 hive 表中获取一些产品并在 spark 中处理/应用 rools 时,我遇到了问题。

//function which return products from Hive table
def getProductsList(hiveContext: org.apache.spark.sql.hive.HiveContext): scala.collection.mutable.MutableList[Product] = 
        val products = scala.collection.mutable.MutableList[Product]()      
                val results = hiveContext.sql("select item_id,value from  details where  type_id=12");
        val collection = results.collect();
        var i = 0;
        results.collect.foreach(t => 
          val product = new Product(collection(i)(0).asInstanceOf[Long], collection(i)(1).asInstanceOf[String]); 
          i = i+ 1;
          products += product
        )    
        products 
      

调用 getProductsList 函数并在产品上应用drools rools。

    val randomProducts = this.getProductsList(hiveContext)
        val rdd = ssc.sparkContext.parallelize(randomProducts)         
        val evaluatedProducts = rdd.mapPartitions(incomingProducts =>      
  print("Hello"); 
    rulesExecutor.evalRules(incomingProducts) )
        val productdf = hiveContext.applySchema(evaluatedProducts, classOf[Product])
    )

如上所示 rdd mapPartitions 迭代没有发生,它抛出以下错误。但我确信 rdd 不是空的。

Exception in thread "main" java.lang.NullPointerException
        at org.spark-project.guava.reflect.TypeToken.method(TypeToken.java:465)
        at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:103)
        at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102)
        at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:47)
        at org.apache.spark.sql.SQLContext.getSchema(SQLContext.scala:995)
        at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:488)
        at org.apache.spark.sql.SQLContext.applySchema(SQLContext.scala:1028)
        at com.cloudera.sprue.ValidateEan$.main(ValidateEan.scala:70)
        at com.cloudera.sprue.ValidateEan.main(ValidateEan.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
16/05/05 07:44:48 INFO SparkContext: Invoking stop() from shutdown hook

请帮我解决这个问题。

【问题讨论】:

您使用的是什么版本的 Spark? applySchema 目前已弃用,您应该使用 .toDF()sqlContext.createDataFrame 谢谢丹尼尔。我也尝试使用 createDataFrame 但我面临同样的错误。请参阅下面的代码我是如何使用的。 val productdf= hiveContext.createDataFrame(rdd, classOf[Product]); 也许您可以尝试hiveContext.createDataFrame(rdd) 或将产品映射到元组:rdd.map( p: Product => (p.getVal1, p.getVal2) ).toDF("col1", "col2") 【参考方案1】:

由于我们需要DataFrame的最终结果,所以我们使用从hiveContext.sql()返回的SchemaRDD。

//defining schema
case class Product(id: Long, value: String)

//loading data from Hive table
val results: DataSet[Row] = hiveContext.sql("select item_id,value from  details where  type_id=12")

//convert ROW type to Product type then pass it to rulesExecutor.evalRules()
val evaluatedProducts = results.map(productRow => rulesExecutor.evalRules(Product(productRow.getLong(0), productRow.getString(1)))).toDF()

我假设rulesExecutor.evalRules() 将接受Product 类型。如果不是,我们可以使用Row 类型(无需在map() 中显式转换)。

【讨论】:

以上是关于Spark RDD 抛出 NullPointerException的主要内容,如果未能解决你的问题,请参考以下文章

joda DateTime 格式导致 Spark RDD 函数中的空指针错误

PySpark 连接两个 RDD 导致一个空 RDD

ValueError: RDD 为空-- Pyspark (Windows Standalone)

如何查看spark中rdd的内容

Spark核心-RDD

Spark RDD整理