Exception when calling the collect() function in Scala


Posted: 2018-01-04 10:44:40

Problem description:

I am trying to write custom code to compare the data types of a source schema (SAS) and a target schema (Hive). SAS has its own set of data types; for example, a datetime column has its Type defined as Num and its Format defined as DATETIME20. In Hive the equivalent data type is Timestamp.

So my source schema file looks like the following (source.csv):

S_No,Variable,Type,Len,Format,Informat
6,EMP_HOURS,Num,8,15.2,15.1
4,EMP_NAME,Char,50,,
1,DATETIME,Num,8,DATETIME20.,DATETIME20.
5,HEADER_ROW_COUNT,Num,8,,
2,LOAD_DATETIME,Num,8,DATETIME20.,DATETIME20.
3,SOURCE_BANK,Char,1,,

The SASToHiveMappings.csv file:

Num,Double,Double
Num,DateTime,Timestamp
Num, ,Integer
Char, ,String

I defined the following custom function:

def _getHiveTypeMapping(dataType: String, dataFormat: String): String = {
    val sasToHiveMappingLocation = "s3a://abc/SASToHiveMappings.csv"
    val mappings = sc.textFile(sasToHiveMappingLocation)
    var definedType = ""
    try {
        if (dataFormat.toUpperCase.contains("DATETIME")) { definedType = "datetime" }
        else if (dataFormat.toDouble.getClass.getName == "double") { definedType = "Double" }
        else { definedType = "Unknown" }
    }
    catch {
        case _: Throwable => definedType = "Unknown"
    }

    if (definedType == "" || definedType == "Unknown") definedType = dataFormat

    try {
        val atype = mappings.map(x => x.split(","))
            .filter(x => x(0).toUpperCase.contains(dataType.toUpperCase))
            .filter(x => x(1).toUpperCase.contains(definedType.toUpperCase))
            .take(1)
            .map(_(2))
        if (atype.size > 0) atype(0) else ""
    }
    catch {
        case e: Exception => e.getMessage.toString
    }
}

Now, when I run the following command, it gives me the correct result:

scala> rows.map(x => x.split(",")).map(y => (y(1),y(2),y(4))).take(6).map { case (a,b,c) => (a, _getHiveTypeMapping(b,c)) }
res196: Array[(String, String)] = Array((EMP_HOURS,Double), (EMP_NAME,String), (DATETIME,Timestamp), (HEADER_ROW_COUNT,Integer), (LOAD_DATETIME,Timestamp), (SOURCE_BANK,String))

But when I remove the intermediate take(6) and try to run collect() instead, I get a NullPointerException, and I do not understand why. That is:

scala> rows.map(x => x.split(",")).map(y => (y(1),y(2),y(4))).map { case (a,b,c) => (a, _getHiveTypeMapping(b,c)) }.collect()

The exception is:

    18/01/04 10:42:13 WARN TaskSetManager: Lost task 1.0 in stage 267.0 (TID 313, localhost, executor driver): TaskKilled (stage cancelled)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 267.0 failed 1 times, most recent failure: Lost task 0.0 in stage 267.0 (TID 312, localhost, executor driver): java.lang.NullPointerException
        at _getHiveTypeMapping(<console>:33)
        at $anonfun$3.apply(<console>:42)
        at $anonfun$3.apply(<console>:42)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
  ... 48 elided
Caused by: java.lang.NullPointerException
  at _getHiveTypeMapping(<console>:33)
  at $anonfun$3.apply(<console>:42)
  at $anonfun$3.apply(<console>:42)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
  at scala.collection.AbstractIterator.to(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Could you please help? I am a bit lost as to why this is happening.

[Question comments]:

[Answer 1]:

You are using the SparkContext inside the method _getHiveTypeMapping. In the code that crashes, you apply _getHiveTypeMapping inside a map over an RDD. That code runs on the executors, not in the driver. The SparkContext is part of the driver, so you cannot use it in code that runs on the executors.
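One common way around this is to read the mapping file once on the driver, collect it into a plain Scala array, and make the lookup a pure function over that array, so nothing executed inside the RDD transformation touches the SparkContext. The following is only a minimal sketch: it assumes sc, rows, and the s3a path from the question are available in the spark-shell, the names sasToHiveMappings and getHiveTypeMapping are illustrative, and the matching rules only roughly follow the question's _getHiveTypeMapping.

// Sketch only: load the mapping file ONCE, on the driver, into a local Scala array.
val sasToHiveMappings: Array[(String, String, String)] =
  sc.textFile("s3a://abc/SASToHiveMappings.csv")
    .map(_.split(",").map(_.trim))
    .map(cols => (cols(0).toUpperCase, cols(1).toUpperCase, cols(2)))
    .collect()

// Pure lookup over the local array -- no SparkContext, so it is safe inside an RDD map.
// The lookup logic is only an approximation of the question's original function.
def getHiveTypeMapping(mappings: Array[(String, String, String)],
                       dataType: String, dataFormat: String): String = {
  val definedType =
    if (dataFormat.toUpperCase.contains("DATETIME")) "DATETIME"
    else if (scala.util.Try(dataFormat.toDouble).isSuccess) "DOUBLE"
    else dataFormat.toUpperCase
  mappings
    .find { case (srcType, srcFormat, _) =>
      srcType.contains(dataType.toUpperCase) && srcFormat.contains(definedType) }
    .map(_._3)
    .getOrElse("")
}

rows.map(_.split(","))
    .map(y => (y(1), y(2), y(4)))
    .map { case (name, sasType, sasFormat) =>
      (name, getHiveTypeMapping(sasToHiveMappings, sasType, sasFormat)) }
    .collect()

Because the mapping array lives on the driver and is captured by the closure, it is serialized to the executors with the task; that is fine for a table of a few rows like this one.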

[Comments]:

I tried using the SparkContext outside of the function _getHiveTypeMapping. Now when I run the collect() function I get the following: scala> rows.map(x => x.split(",")).map(y => (y(1),y(2),y(4))).map { case (a,b,c) => (a, _getHiveTypeMapping(b,c)) }.collect() res213: Array[(String, String)] = Array((EMP_HOURS,This RDD lacks a SparkContext. ...

Let me put it another way: you should not use the SparkContext, or any other RDD, inside a transformation of an RDD.
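For a larger lookup table, the same pattern is usually written with a broadcast variable, so each executor receives one read-only copy of the collected mappings instead of shipping them with every task closure. A rough sketch, reusing the illustrative sasToHiveMappings array and getHiveTypeMapping helper from the sketch above (neither name comes from the original post):

// Ship the locally collected mapping table to the executors once.
val mappingsBc = sc.broadcast(sasToHiveMappings)

rows.map(_.split(","))
    .map(y => (y(1), y(2), y(4)))
    .map { case (name, sasType, sasFormat) =>
      // Only the broadcast value is read inside the transformation;
      // no SparkContext and no RDD appear in the closure.
      (name, getHiveTypeMapping(mappingsBc.value, sasType, sasFormat)) }
    .collect()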
