Getting NullPointerException in Scala/Spark code
I am reading a CSV file in Spark using Scala. The CSV file contains a null at, say, row 10, so my code throws a NullPointerException at that row and never prints the remaining records. Here is my code:
import org.apache.spark.sql.SparkSession
import java.lang.Long

object HighestGDP {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("GDP").master("local").getOrCreate()
    val data = spark.read.csv("D:\\BGH\\Spark\\World_Bank_Indicators.csv").rdd
    val result = data.filter(line => line.getString(1).substring(4, 8).equals("2009") ||
      line.getString(1).substring(4, 8).equals("2010"))
    result.foreach(println)
    var gdp2009 = result.filter(rec => rec.getString(1).substring(4, 8).equals("2009"))
      .map { line =>
        var GDP = 0L
        if (line.getString(19).equals(null))
          GDP = 0L
        else
          GDP = line.getString(19).replaceAll(",", "").toLong
        (line.getString(0), GDP)
      }
    gdp2009.foreach(println)
    result.foreach(println)
  }
}
So, is there any way to set the value to 0 wherever it is null? I tried an if/else but it still does not work. The error:
18/03/06 22:56:01 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1208 bytes result sent to driver
18/03/06 22:56:01 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 297 ms on localhost (1/1)
18/03/06 22:56:01 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
18/03/06 22:56:01 INFO DAGScheduler: ResultStage 1 (foreach at HighestGDP.scala:12) finished in 0.297 s
18/03/06 22:56:01 INFO DAGScheduler: Job 1 finished: foreach at HighestGDP.scala:12, took 0.346954 s
18/03/06 22:56:01 INFO SparkContext: Starting job: foreach at HighestGDP.scala:21
18/03/06 22:56:01 INFO DAGScheduler: Got job 2 (foreach at HighestGDP.scala:21) with 1 output partitions
18/03/06 22:56:01 INFO DAGScheduler: Final stage: ResultStage 2 (foreach at HighestGDP.scala:21)
18/03/06 22:56:01 INFO DAGScheduler: Parents of final stage: List()
18/03/06 22:56:01 INFO DAGScheduler: Missing parents: List()
18/03/06 22:56:01 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[12] at map at HighestGDP.scala:14), which has no missing parents
18/03/06 22:56:01 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 15.3 KB, free 355.2 MB)
18/03/06 22:56:01 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 7.1 KB, free 355.2 MB)
18/03/06 22:56:01 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on 13.133.209.137:57085 (size: 7.1 KB, free: 355.5 MB)
18/03/06 22:56:01 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1012
18/03/06 22:56:01 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[12] at map at HighestGDP.scala:14)
18/03/06 22:56:01 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
18/03/06 22:56:01 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, partition 0, PROCESS_LOCAL, 5918 bytes)
18/03/06 22:56:01 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
18/03/06 22:56:01 INFO FileScanRDD: Reading File path: file:///D:/BGH/Spark/World_Bank_Indicators.csv, range: 0-260587, partition values: [empty row]
(Afghanistan,425)
(Albania,3796)
(Algeria,3952)
18/03/06 22:56:01 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.NullPointerException
at HighestGDP$$anonfun$3.apply(HighestGDP.scala:15)
at HighestGDP$$anonfun$3.apply(HighestGDP.scala:14)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
18/03/06 22:56:01 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.NullPointerException
at HighestGDP$$anonfun$3.apply(HighestGDP.scala:15)
at HighestGDP$$anonfun$3.apply(HighestGDP.scala:14)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
18/03/06 22:56:01 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
18/03/06 22:56:01 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
18/03/06 22:56:01 INFO TaskSchedulerImpl: Cancelling stage 2
18/03/06 22:56:01 INFO DAGScheduler: ResultStage 2 (foreach at HighestGDP.scala:21) failed in 0.046 s
18/03/06 22:56:01 INFO DAGScheduler: Job 2 failed: foreach at HighestGDP.scala:21, took 0.046961 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.NullPointerException
at HighestGDP$$anonfun$3.apply(HighestGDP.scala:15)
at HighestGDP$$anonfun$3.apply(HighestGDP.scala:14)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:894)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:892)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:892)
at HighestGDP$.main(HighestGDP.scala:21)
at HighestGDP.main(HighestGDP.scala)
Caused by: java.lang.NullPointerException
at HighestGDP$$anonfun$3.apply(HighestGDP.scala:15)
at HighestGDP$$anonfun$3.apply(HighestGDP.scala:14)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
18/03/06 22:56:01 INFO SparkContext: Invoking stop() from shutdown hook
18/03/06 22:56:01 INFO SparkUI: Stopped Spark web UI at http://13.133.209.137:4040
18/03/06 22:56:01 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/03/06 22:56:01 INFO MemoryStore: MemoryStore cleared
18/03/06 22:56:01 INFO BlockManager: BlockManager stopped
18/03/06 22:56:01 INFO BlockManagerMaster: BlockManagerMaster stopped
18/03/06 22:56:01 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/03/06 22:56:01 INFO SparkContext: Successfully stopped SparkContext
18/03/06 22:56:01 INFO ShutdownHookManager: Shutdown hook called
18/03/06 22:56:01 INFO ShutdownHookManager: Deleting directory C:\Users\kumar.harsh\AppData\Local\Temp\spark-65330823-f67a-4a9d-acaf-42478e3b7109
Answer
I guess the problem is line.getString(19).equals(null). If line.getString(19) returns null, you cannot call the equals method on it (that call itself is what throws the NullPointerException). Instead of that check, you should use line.getString(19) == null.
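For example, a minimal sketch of the fixed mapping (keeping the original code's assumption that column 19 holds the GDP figure as a comma-formatted string):

var gdp2009 = result.filter(rec => rec.getString(1).substring(4, 8).equals("2009"))
  .map { line =>
    // Read the cell once and guard against null before calling any method on it.
    val raw = line.getString(19)
    val gdp = if (raw == null) 0L else raw.replaceAll(",", "").toLong
    (line.getString(0), gdp)
  }

Row also provides isNullAt, so line.isNullAt(19) is an equivalent null check.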
One more hint: try to avoid hard-coding the Spark master in your code; it will cause problems later (see the sketch below). See this discussion: Spark job with explicit setMaster("local"), passed to spark-submit with YARN.
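For illustration, a sketch of that setup (the --master value and jar name shown are just examples; use whatever matches your deployment):

// Build the session without .master(...) so spark-submit can supply it.
val spark = SparkSession.builder()
  .appName("GDP")
  .getOrCreate()

Then pass the master at submit time, e.g. spark-submit --master yarn --class HighestGDP highest-gdp.jar (the jar name here is hypothetical).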