scala映射函数中的“NotSerializableException”

Posted

技术标签:

【中文标题】scala映射函数中的“NotSerializableException”【英文标题】:"NotSerializableException " in scala map function 【发布时间】:2020-05-27 23:52:19 【问题描述】:

我正在读取文件并尝试使用函数映射值。但它给出了一个错误 NotSerializableException 下面是我正在运行的代码:

package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
import scala.math.min

/** Find the minimum temperature by weather station */
object MinTemperatures 

  def parseLine(line: String) = 
    val fields = line.split(",")
    val stationID = fields(0)
    val entryType = fields(2)
    val temperature = fields(3).toFloat * 0.1f * (9.0f / 5.0f) + 32.0f
    (stationID, entryType, temperature)
  

  /** Our main function where the action happens */
  def main(args: Array[String]) 

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "MinTemperatures")

    // Read each line of input data
    val lines = sc.textFile("../DataSet/1800.csv")

    // Convert to (stationID, entryType, temperature) tuples
    val parsedLines = lines.map(parseLine)


当我运行上面的代码时,它给了我以下错误:

使用 Spark 的默认 log4j 配置文件: org/apache/spark/log4j-defaults.properties 线程“主”中的异常 org.apache.spark.SparkException:任务不可序列化 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403) 在 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393) 在 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162) 在 org.apache.spark.SparkContext.clean(SparkContext.scala:2326) 在 org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:371) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:363) 在 org.apache.spark.rdd.RDD.map(RDD.scala:370) 在 com.sundogsoftware.spark.MinTemperatures$.main(MinTemperatures.scala:32) 在 com.sundogsoftware.spark.MinTemperatures.main(MinTemperatures.scala)

引起:java.io.NotSerializableException:

com.sundogsoftware.spark.MinTemperatures$ 序列化堆栈: - 对象不可序列化(类:com.sundogsoftware.spark.MinTemperatures$,值: com.sundogsoftware.spark.MinTemperatures$@41fed14f) - 数组元素(索引:0) - 数组(类 [Ljava.lang.Object;,大小 1) - 字段(类:java.lang.invoke.SerializedLambda,名称:capturedArgs,类型:类 [Ljava.lang.Object;) - 对象(类 java.lang.invoke.SerializedLambda,SerializedLambda[capturingClass=class com.sundogsoftware.spark.MinTemperatures$, functionInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, 实现=调用静态 com/sundogsoftware/spark/MinTemperatures$.$anonfun$main$1:(Lcom/sundogsoftware/spark/MinTemperatures$;Ljava/lang/String;)Lscala/Tuple3;, instantiatedMethodType=(Ljava/lang/String;)Lscala/Tuple3;, numCaptured=1]) - writeReplace 数据(类:java.lang.invoke.SerializedLambda)

但是当我使用匿名函数运行相同的代码时,它运行成功:

package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
import scala.math.min

/** Find the minimum temperature by weather station */
object MinTemperatures 

  /** Our main function where the action happens */
  def main(args: Array[String]) 

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "MinTemperatures")

    // Read each line of input data
    val lines = sc.textFile("../DataSet/1800.csv")

    // Convert to (stationID, entryType, temperature) tuples
    val parsedLines = lines.map(x => 
      val fields = x.split(",");
      val stationID = fields(0);
      val entryType = fields(2);
      val temperature = fields(3).toFloat * 0.1f * (9.0f / 5.0f) + 32.0f;
      (stationID, entryType, temperature)
    )

    // Filter out all but TMIN entries
    val minTemps = parsedLines.filter(x => x._2 == "TMIN")

    // Convert to (stationID, temperature)
    val stationTemps = minTemps.map(x => (x._1, x._3.toFloat))

    // Reduce by stationID retaining the minimum temperature found
    val minTempsByStation = stationTemps.reduceByKey((x, y) => min(x, y))

    // Collect, format, and print the results
    val results = minTempsByStation.collect()

    for (result <- results.sorted) 
      val station = result._1
      val temp = result._2
      val formattedTemp = f"$temp%.2f F"
      println(s"$station minimum temperature: $formattedTemp")
    

  

输出:

EZE00100082 minimum temperature: 7.70 F
ITE00100554 minimum temperature: 5.36 F

正如你在上面看到的,当我在 map 中使用 named function (parseLine) 时,它会给出错误,但是当我在 map 中使用 anonymous 函数时,是同一个程序而不是命名函数正在成功运行

我查看了几个博客,但没有找到错误的原因。 谁能帮我理解这一点?

【问题讨论】:

将函数移动到另一个对象,它应该可以工作。顺便说一句,建议明确返回类型。 嗨@LuisMiguelMejíaSuárez。你的建议奏效了,谢谢。我是 scala 的新手,你能解释一下我的代码中的问题以及你建议的解决方案或任何理解它的参考吗? 这是 spark 工作原理的一部分。它需要序列化一些闭包(带有上下文的匿名函数)。这里整个MinTemperaturesparseLine 的上下文(它是它的父级),它被捕获以进行序列化。由于某种原因,它不能被序列化并且抛出这个异常。我没有使用火花,不能再说什么:( @tejas 正如 Scalway 所说,整个对象必须被序列化,并且它可能与记录器或主要或 Spark 内部的某些东西无关。因此,拥有一个具有所有这些功能的 utils 对象是很常见的。 @tejas 看看这个帖子***.com/questions/43169409/… 【参考方案1】:

这个问题似乎与 sbt 或依赖无关,正如我所检查的,当脚本未定义为对象时会发生这种情况(Scala 对象默认可序列化),因此此错误意味着脚本不可序列化。 我创建了一个新对象并粘贴了相同的代码。它奏效了。

【讨论】:

我在地图函数中尝试传递 JsonWriterSettings 时遇到了同样的问题。 我创建了一个新对象并粘贴了相同的代码。成功了

以上是关于scala映射函数中的“NotSerializableException”的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Scala 中的映射键中获取值的常见元素?

大数据Scala学习—列表 集与映射

大数据Scala学习—列表 集与映射

如何将折叠左侧运算符“:/”转换为scala中的foldLeft函数?

Scala:RDD映射中的任务不可序列化由json4s“隐式val格式= DefaultFormats”引起

Scala 中的嵌套默认映射