任务不可序列化:java.io.NotSerializableException 仅在类而非对象上调用闭包外的函数时

Posted

技术标签:

【中文标题】任务不可序列化:java.io.NotSerializableException 仅在类而非对象上调用闭包外的函数时【英文标题】:Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects 【发布时间】:2014-04-30 19:55:44 【问题描述】:

在闭包外调用函数时出现奇怪的行为:

当函数在对象中时,一切正常 当函数在类中时:

任务不可序列化:java.io.NotSerializableException:测试

问题是我需要我的代码在一个类而不是一个对象中。知道为什么会这样吗? Scala 对象是否序列化(默认?)?

这是一个工作代码示例:

object working extends App 
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))

这是无效的示例:

object NOTworking extends App 
  new testing().doIT


//adding extends Serializable wont help
class testing   
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  

  def someFunc(a:Int) = a+1

【问题讨论】:

什么是 Spark.ctx?方法 ctx AFAICT 没有 Spark 对象 【参考方案1】:

RDDs extend the Serialisable interface,所以这不是导致您的任务失败的原因。现在这并不意味着您可以使用 Spark 序列化 RDD 并避免使用 NotSerializableException

Spark 是一个分布式计算引擎,它的主要抽象是一个弹性分布式数据集(RDD),可以看作是一个分布式集合。基本上,RDD 的元素在集群的节点上进行分区,但 Spark 将其从用户那里抽象出来,让用户与 RDD(集合)进行交互,就好像它是本地的一样。

不想涉及太多细节,但是当您在 RDD(mapflatMapfilter 等)上运行不同的转换时,您的转换代码(闭包)是:

    在驱动节点上序列化, 运送到集群中的适当节点, 反序列化, 最终在节点上执行

您当然可以在本地运行它(如您的示例中所示),但所有这些阶段(除了通过网络发送)仍然会发生。 [这让您甚至可以在部署到生产环境之前发现任何错误]

在第二种情况下,您正在调用一个方法,该方法在 map 函数内部的类 testing 中定义。 Spark 看到了这一点,并且由于方法无法自行序列化,Spark 尝试序列化 整个 testing 类,以便代码在另一个 JVM 中执行时仍然可以工作。你有两种可能:

要么让类测试可序列化,所以整个类都可以被 Spark 序列化:

import org.apache.spark.SparkContext,SparkConf

object Spark 
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))


object NOTworking extends App 
  new Test().doIT


class Test extends java.io.Serializable 
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  

  def someFunc(a: Int) = a + 1

或者你创建someFunc函数而不是方法(函数是Scala中的对象),这样Spark就可以序列化它:

import org.apache.spark.SparkContext,SparkConf

object Spark 
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))


object NOTworking extends App 
  new Test().doIT


class Test 
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  

  val someFunc = (a: Int) => a + 1

您可能对类序列化的类似但不一样的问题感兴趣,您可以阅读它in this Spark Summit 2013 presentation。

附带说明,您可以将rddList.map(someFunc(_)) 重写为rddList.map(someFunc),它们完全相同。通常,第二个是首选,因为它不那么冗长且易于阅读。

编辑(2015-03-15):SPARK-5307 引入了 SerializationDebugger,Spark 1.3.0 是第一个使用它的版本。它将序列化路径添加到 NotSerializableException。当遇到 NotSerializableException 时,调试器会访问对象图以查找无法序列化的对象的路径,并构造信息以帮助用户找到对象。

在 OP 的情况下,这是打印到标准输出的内容:

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)

【讨论】:

嗯,你所解释的当然是有道理的,并解释了为什么整个类都被序列化(我没有完全理解)。尽管如此,我仍然认为 rdd 是不可序列化的(它们扩展了 Serializable,但这并不意味着它们不会导致 NotSerializableException,试试吧)。这就是为什么如果你把它们放在类之外它会修复错误。我将稍微修改一下我的答案,以更准确地表达我的意思——即它们导致异常,而不是它们扩展了界面。 如果您无法控制需要可序列化的类...如果您使用的是 Scala,则可以使用 Serializable 对其进行实例化:val test = new Test with Serializable "rddList.map(someFunc(_)) 到 rddList.map(someFunc),它们完全一样" 不,它们不完全一样,实际上使用后者会导致序列化异常是不是前者不会。 @samthebest 你能解释一下为什么 map(someFunc(_)) 不会导致序列化异常而 map(someFunc) 会? 第二个选项对我不起作用,直到 someFuc 移出测试类。是版本更改的影响(这里是 Spark 2.3 和 Scala 2.11)还是我遗漏了什么?【参考方案2】:

Grega's answer 很好地解释了为什么原始代码不起作用以及解决问题的两种方法。但是,这种解决方案不是很灵活;考虑您的闭包包含对您无法控制的非Serializable 类的方法调用的情况。您既不能将Serializable 标记添加到此类,也不能更改底层实现以将方法更改为函数。

Nilesh 对此提供了一个很好的解决方法,但该解决方案可以更加简洁和通用:

def genMapper[A, B](f: A => B): A => B = 
  val locker = com.twitter.chill.MeatLocker(f)
  x => locker.get.apply(x)

这个函数序列化器可以用来自动包装闭包和方法调用:

rdd map genMapper(someFunc)

这种技术还有一个好处是不需要额外的 Shark 依赖项来访问 KryoSerializationWrapper,因为 Twitter 的 Chill 已经被核心 Spark 引入

【讨论】:

嗨,我想知道如果我使用你的代码,我需要注册一些东西吗?我尝试从 kryo 获得 Unable find 类异常。 THX【参考方案3】:

完整的谈话充分解释了这个问题,它提出了一个很好的范式转换方法来避免这些序列化问题:https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md

投票最多的答案基本上是建议放弃整个语言特性——不再使用方法而只使用函数。实际上,应该避免在类中使用函数式编程方法,但是将它们转换为函数并不能解决这里的设计问题(参见上面的链接)。

作为在这种特殊情况下的快速修复,您可以使用@transient 注释告诉它不要尝试序列化违规值(这里,Spark.ctx 是一个自定义类,而不是 Spark 的一个遵循 OP 命名的类):

@transient
val rddList = Spark.ctx.parallelize(list)

你也可以重构代码,让 rddList 存在于其他地方,但这也很讨厌。

未来可能是孢子

未来 Scala 将包含这些称为“孢子”的东西,这应该允许我们细粒度地控制哪些内容会被闭包拉入,哪些不会被拉入。此外,这应该会将所有意外拉入不可序列化类型(或任何不需要的值)的错误转变为编译错误,而不是现在可怕的运行时异常/内存泄漏。

http://docs.scala-lang.org/sips/pending/spores.html

关于 Kryo 序列化的提示

使用 kyro 时,请务必进行注册,这意味着您会得到错误而不是内存泄漏:

“最后,我知道 kryo 有 kryo.setRegistrationOptional(true),但我很难弄清楚如何使用它。当这个选项打开时,如果我没有,kryo 似乎仍然会抛出异常'没有注册课程。”

Strategy for registering classes with kryo

当然,这只给你类型级控制而不是值级控制。

...更多想法即将到来。

【讨论】:

【参考方案4】:

我遇到了类似的问题,我从Grega's answer 了解到的是

object NOTworking extends App 
 new testing().doIT

//adding extends Serializable wont help
class testing 

val list = List(1,2,3)

val rddList = Spark.ctx.parallelize(list)

def doIT =  
  //again calling the fucntion someFunc 
  val after = rddList.map(someFunc(_))
  //this will crash (spark lazy)
  after.collect().map(println(_))


def someFunc(a:Int) = a+1


您的 doIT 方法正在尝试序列化 someFunc(_) 方法,但由于方法不可序列化,它会尝试序列化类 testing这又是不可序列化的。

所以要让你的代码工作,你应该在 doIT 方法中定义 someFunc。例如:

def doIT =  
 def someFunc(a:Int) = a+1
  //function definition
 
 val after = rddList.map(someFunc(_))
 after.collect().map(println(_))

如果有多个函数出现,那么所有这些函数都应该对父上下文可用。

【讨论】:

【参考方案5】:

我使用不同的方法解决了这个问题。您只需要在通过闭包之前序列化对象,然后再进行反序列化。即使您的类不可序列化,这种方法也很有效,因为它在幕后使用了 Kryo。你只需要一些咖喱。 ;)

这是我如何做到的一个例子:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = 
    kryoWrapper.value.apply(foo)

val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) 
    def apply(foo: Foo) : Bar =  //This is the real function 

随意让 Blah 变得尽可能复杂,类、伴生对象、嵌套类、对多个 3rd 方库的引用。

KryoSerializationWrapper 指的是:https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

【讨论】:

这实际上是序列化实例还是创建静态实例并序列化引用(请参阅我的答案)。 @samthebest 你能详细说明一下吗?如果你调查KryoSerializationWrapper,你会发现它让Spark 认为它确实是java.io.Serializable——它只是使用Kryo 在内部序列化对象——更快、更简单。而且我不认为它处理静态实例 - 它只是在调用 value.apply() 时反序列化值。【参考方案6】:

我不完全确定这是否适用于 Scala,但在 Java 中,我通过重构代码解决了 NotSerializableException 问题,以便闭包不会访问不可序列化的 final 字段。

【讨论】:

我在 Java 中面临同样的问题,我试图在 RDD foreach 方法中使用 Java IO 包中的 FileWriter 类。请告诉我我们如何解决这个问题。 好吧@Shankar,如果FileWriter 是外部类的final 字段,则不能这样做。但是FileWriter 可以由StringFile 构造,两者都是Serializable。所以重构你的代码,根据外部类的文件名构造一个本地的FileWriter【参考方案7】:

仅供参考,在 Spark 2.4 中,很多人可能会遇到这个问题。 Kryo 序列化已经变得更好,但在许多情况下,您不能使用 spark.kryo.unsafe=true 或天真的 kryo 序列化程序。

为了快速修复,请尝试在 Spark 配置中更改以下内容

spark.kryo.unsafe="false"

spark.serializer="org.apache.spark.serializer.JavaSerializer"

我通过使用显式广播变量并利用新的内置 twitter-chill api 修改我遇到或亲自编写的自定义 RDD 转换,将它们从 rdd.map(row =&gt; 转换为 rdd.mapPartitions(partition =&gt; 函数。

示例

旧的(不是很好的)方式

val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val outputRDD = rdd.map(row => 
    val value = sampleMap.get(row._1)
    value
)

替代(更好)方式

import com.twitter.chill.MeatLocker
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val brdSerSampleMap = spark.sparkContext.broadcast(MeatLocker(sampleMap))

rdd.mapPartitions(partition => 
    val deSerSampleMap = brdSerSampleMap.value.get
    partition.map(row => 
        val value = sampleMap.get(row._1)
        value
    ).toIterator
)

这种新方法每个分区只会调用一次广播变量,这样更好。如果您不注册类,您仍然需要使用 Java 序列化。

【讨论】:

【参考方案8】:

我也有类似的经历。

当我在驱动程序(主)上初始化一个变量时触发了该错误,但随后尝试在其中一名工作人员上使用它。 发生这种情况时,Spark Streaming 将尝试序列化对象以将其发送给工作人员,如果对象不可序列化,则会失败。

我通过将变量设为静态解决了错误

以前的非工作代码

  private final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();

工作代码

  private static final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();

学分:

    https://docs.microsoft.com/en-us/answers/questions/35812/sparkexception-job-aborted-due-to-stage-failure-ta.html(pradeepcheekatla-msft 的答案) https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html

【讨论】:

【参考方案9】:
def upper(name: String) : String =  
var uppper : String  =  name.toUpperCase()
uppper


val toUpperName = udf (EmpName: String) => upper(EmpName)
val emp_details = """["id": "1","name": "James Butt","country": "USA",
"id": "2", "name": "Josephine Darakjy","country": "USA",
"id": "3", "name": "Art Venere","country": "USA",
"id": "4", "name": "Lenna Paprocki","country": "USA",
"id": "5", "name": "Donette Foller","country": "USA",
"id": "6", "name": "Leota Dilliard","country": "USA"]"""

val df_emp = spark.read.json(Seq(emp_details).toDS())
val df_name=df_emp.select($"id",$"name")
val df_upperName= df_name.withColumn("name",toUpperName($"name")).filter("id='5'")
display(df_upperName)

这会报错 org.apache.spark.SparkException:任务不可序列化 在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)

解决方案-

import java.io.Serializable;

object obj_upper extends Serializable  
  def upper(name: String) : String = 
  
    var uppper : String  =  name.toUpperCase()
    uppper
  
val toUpperName = udf (EmpName: String) => upper(EmpName)


val df_upperName= 
df_name.withColumn("name",obj_upper.toUpperName($"name")).filter("id='5'")
display(df_upperName)

【讨论】:

【参考方案10】:

类中定义的Scala方法是不可序列化的,可以将方法转换为函数来解决序列化问题。

方法语法

def func_name (x String) : String = 
...
return x

函数语法

val func_name =  (x String) => 
...
x

【讨论】:

【参考方案11】:

我的解决方案是添加一个 compagnion 类来处理类中不可序列化的所有方法。

【讨论】:

以上是关于任务不可序列化:java.io.NotSerializableException 仅在类而非对象上调用闭包外的函数时的主要内容,如果未能解决你的问题,请参考以下文章

Spark 任务不可序列化

为啥 MicroBatchReader 必须是可序列化的?任务不可序列化错误

Scala Spark - 任务不可序列化

Spark Scala:任务不可序列化错误

任务不可序列化错误:Spark

任务在 Databricks 上的 Scala 中不可序列化