Getting a NullPointerException in Scala/Spark code


I am reading a CSV file in Spark using Scala. The CSV file contains a null at (say) row 10, so my code throws a NullPointerException at that row and never prints the subsequent records. Here is my code:

import org.apache.spark.sql.SparkSession
import java.lang.Long


object HighestGDP {
   def main(args:Array[String]){
     val spark = SparkSession.builder().appName("GDP").master("local").getOrCreate()
     val data  = spark.read.csv("D:\\BGH\\Spark\\World_Bank_Indicators.csv").rdd

     val result = data.filter(line=>line.getString(1).substring(4,8).equals("2009")||line.getString(1).substring(4,8).equals("2010"))

     result.foreach(println)
     var gdp2009 = result.filter(rec=>rec.getString(1).substring(4,8).equals("2009"))
     .map{line=>{
       var GDP= 0L
       if(line.getString(19).equals(null))
         GDP=0L
       else
         GDP= line.getString(19).replaceAll(",", "").toLong
       (line.getString(0),GDP)
     }}

     gdp2009.foreach(println)
     result.foreach(println)
   }
}

So is there a way to set the value to 0 wherever it is null? I tried an if/else but it still doesn't work. The error:

18/03/06 22:56:01 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1208 bytes result sent to driver
18/03/06 22:56:01 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 297 ms on localhost (1/1)
18/03/06 22:56:01 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
18/03/06 22:56:01 INFO DAGScheduler: ResultStage 1 (foreach at HighestGDP.scala:12) finished in 0.297 s
18/03/06 22:56:01 INFO DAGScheduler: Job 1 finished: foreach at HighestGDP.scala:12, took 0.346954 s
18/03/06 22:56:01 INFO SparkContext: Starting job: foreach at HighestGDP.scala:21
18/03/06 22:56:01 INFO DAGScheduler: Got job 2 (foreach at HighestGDP.scala:21) with 1 output partitions
18/03/06 22:56:01 INFO DAGScheduler: Final stage: ResultStage 2 (foreach at HighestGDP.scala:21)
18/03/06 22:56:01 INFO DAGScheduler: Parents of final stage: List()
18/03/06 22:56:01 INFO DAGScheduler: Missing parents: List()
18/03/06 22:56:01 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[12] at map at HighestGDP.scala:14), which has no missing parents
18/03/06 22:56:01 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 15.3 KB, free 355.2 MB)
18/03/06 22:56:01 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 7.1 KB, free 355.2 MB)
18/03/06 22:56:01 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on 13.133.209.137:57085 (size: 7.1 KB, free: 355.5 MB)
18/03/06 22:56:01 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1012
18/03/06 22:56:01 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[12] at map at HighestGDP.scala:14)
18/03/06 22:56:01 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
18/03/06 22:56:01 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, partition 0, PROCESS_LOCAL, 5918 bytes)
18/03/06 22:56:01 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
18/03/06 22:56:01 INFO FileScanRDD: Reading File path: file:///D:/BGH/Spark/World_Bank_Indicators.csv, range: 0-260587, partition values: [empty row]
(Afghanistan,425)
(Albania,3796)
(Algeria,3952)
18/03/06 22:56:01 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.NullPointerException
    at HighestGDP$$anonfun$3.apply(HighestGDP.scala:15)
    at HighestGDP$$anonfun$3.apply(HighestGDP.scala:14)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
18/03/06 22:56:01 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.NullPointerException
    at HighestGDP$$anonfun$3.apply(HighestGDP.scala:15)
    at HighestGDP$$anonfun$3.apply(HighestGDP.scala:14)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

18/03/06 22:56:01 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
18/03/06 22:56:01 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
18/03/06 22:56:01 INFO TaskSchedulerImpl: Cancelling stage 2
18/03/06 22:56:01 INFO DAGScheduler: ResultStage 2 (foreach at HighestGDP.scala:21) failed in 0.046 s
18/03/06 22:56:01 INFO DAGScheduler: Job 2 failed: foreach at HighestGDP.scala:21, took 0.046961 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.NullPointerException
    at HighestGDP$$anonfun$3.apply(HighestGDP.scala:15)
    at HighestGDP$$anonfun$3.apply(HighestGDP.scala:14)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:894)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:892)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:892)
    at HighestGDP$.main(HighestGDP.scala:21)
    at HighestGDP.main(HighestGDP.scala)
Caused by: java.lang.NullPointerException
    at HighestGDP$$anonfun$3.apply(HighestGDP.scala:15)
    at HighestGDP$$anonfun$3.apply(HighestGDP.scala:14)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
18/03/06 22:56:01 INFO SparkContext: Invoking stop() from shutdown hook
18/03/06 22:56:01 INFO SparkUI: Stopped Spark web UI at http://13.133.209.137:4040
18/03/06 22:56:01 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/03/06 22:56:01 INFO MemoryStore: MemoryStore cleared
18/03/06 22:56:01 INFO BlockManager: BlockManager stopped
18/03/06 22:56:01 INFO BlockManagerMaster: BlockManagerMaster stopped
18/03/06 22:56:01 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/03/06 22:56:01 INFO SparkContext: Successfully stopped SparkContext
18/03/06 22:56:01 INFO ShutdownHookManager: Shutdown hook called
18/03/06 22:56:01 INFO ShutdownHookManager: Deleting directory C:\Users\kumar.harsh\AppData\Local\Temp\spark-65330823-f67a-4a9d-acaf-42478e3b7109
Answer

I think the problem is `line.getString(19).equals(null)`. If `line.getString(19)` returns null, you cannot call the `equals` method on it (that call itself throws the NullPointerException). Instead of that check, use `line.getString(19) == null`.
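A more idiomatic alternative is to wrap the possibly-null cell in `Option`, so the null check happens before any method is called on the string. Here is a minimal sketch in plain Scala (no Spark needed to illustrate the point); `parseGdp` is a hypothetical helper name, and inside the Spark `map` you would call it as `(line.getString(0), parseGdp(line.getString(19)))`:

```scala
object NullSafeGdp {
  // Option(x) yields None when x is null, so every subsequent step
  // is skipped for missing cells and getOrElse supplies the default.
  def parseGdp(raw: String): Long =
    Option(raw)                       // null becomes None
      .map(_.replaceAll(",", ""))     // strip thousands separators
      .filter(_.nonEmpty)             // treat empty cells as missing too
      .map(_.toLong)
      .getOrElse(0L)                  // default for missing values

  def main(args: Array[String]): Unit = {
    println(parseGdp("1,234"))  // 1234
    println(parseGdp(null))     // 0
  }
}
```

With `Row` specifically, you could also guard with `line.isNullAt(19)` before calling `getString`, which avoids the null entirely.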

One more hint: try to avoid hard-coding the Spark master in your code; it will cause problems later. See this discussion: Spark job with explicit setMaster("local"), passed to spark-submit with YARN.
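Concretely, that means dropping `.master("local")` from the builder and choosing the master at submit time instead. A sketch (the jar path and `--master` value here are illustrative):

```scala
// Build the session without pinning the master; spark-submit supplies it.
val spark = org.apache.spark.sql.SparkSession.builder()
  .appName("GDP")
  .getOrCreate()

// Then at submit time, e.g.:
//   spark-submit --class HighestGDP --master yarn highest-gdp.jar
// or --master local[*] for local testing.
```

This keeps the same code deployable locally and on a cluster without recompiling.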
