任务不可序列化:java.io.NotSerializableException 仅在类而非对象上调用闭包外的函数时
Posted
技术标签:
【中文标题】任务不可序列化:java.io.NotSerializableException 仅在类而非对象上调用闭包外的函数时【英文标题】:Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects 【发布时间】:2014-04-30 19:55:44 【问题描述】:在闭包外调用函数时出现奇怪的行为:
当函数在对象中时,一切正常 当函数在类中时:任务不可序列化:java.io.NotSerializableException:测试
问题是我需要我的代码在一个类而不是一个对象中。知道为什么会这样吗? Scala 对象是否序列化(默认?)?
这是一个工作代码示例:
object working extends App
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
这是无效的示例:
object NOTworking extends App
new testing().doIT
//adding extends Serializable wont help
class testing
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT =
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
def someFunc(a:Int) = a+1
【问题讨论】:
什么是 Spark.ctx?方法 ctx AFAICT 没有 Spark 对象 【参考方案1】:RDDs extend the Serialisable interface,所以这不是导致您的任务失败的原因。现在这并不意味着您可以使用 Spark 序列化 RDD
并避免使用 NotSerializableException
Spark 是一个分布式计算引擎,它的主要抽象是一个弹性分布式数据集(RDD),可以看作是一个分布式集合。基本上,RDD 的元素在集群的节点上进行分区,但 Spark 将其从用户那里抽象出来,让用户与 RDD(集合)进行交互,就好像它是本地的一样。
不想涉及太多细节,但是当您在 RDD(map
、flatMap
、filter
等)上运行不同的转换时,您的转换代码(闭包)是:
-
在驱动节点上序列化,
运送到集群中的适当节点,
反序列化,
最终在节点上执行
您当然可以在本地运行它(如您的示例中所示),但所有这些阶段(除了通过网络发送)仍然会发生。 [这让您甚至可以在部署到生产环境之前发现任何错误]
在第二种情况下,您正在调用一个方法,该方法在 map 函数内部的类 testing
中定义。 Spark 看到了这一点,并且由于方法无法自行序列化,Spark 尝试序列化 整个 testing
类,以便代码在另一个 JVM 中执行时仍然可以工作。你有两种可能:
要么让类测试可序列化,所以整个类都可以被 Spark 序列化:
import org.apache.spark.SparkContext,SparkConf
object Spark
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
object NOTworking extends App
new Test().doIT
class Test extends java.io.Serializable
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() =
val after = rddList.map(someFunc)
after.collect().foreach(println)
def someFunc(a: Int) = a + 1
或者你创建someFunc
函数而不是方法(函数是Scala中的对象),这样Spark就可以序列化它:
import org.apache.spark.SparkContext,SparkConf
object Spark
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
object NOTworking extends App
new Test().doIT
class Test
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() =
val after = rddList.map(someFunc)
after.collect().foreach(println)
val someFunc = (a: Int) => a + 1
您可能对类序列化的类似但不一样的问题感兴趣,您可以阅读它in this Spark Summit 2013 presentation。
附带说明,您可以将rddList.map(someFunc(_))
重写为rddList.map(someFunc)
,它们完全相同。通常,第二个是首选,因为它不那么冗长且易于阅读。
编辑(2015-03-15):SPARK-5307 引入了 SerializationDebugger,Spark 1.3.0 是第一个使用它的版本。它将序列化路径添加到 NotSerializableException。当遇到 NotSerializableException 时,调试器会访问对象图以查找无法序列化的对象的路径,并构造信息以帮助用户找到对象。
在 OP 的情况下,这是打印到标准输出的内容:
Serialization stack:
- object not serializable (class: testing, value: testing@2dfe2f00)
- field (class: testing$$anonfun$1, name: $outer, type: class testing)
- object (class testing$$anonfun$1, <function1>)
【讨论】:
嗯,你所解释的当然是有道理的,并解释了为什么整个类都被序列化(我没有完全理解)。尽管如此,我仍然认为 rdd 是不可序列化的(它们扩展了 Serializable,但这并不意味着它们不会导致 NotSerializableException,试试吧)。这就是为什么如果你把它们放在类之外它会修复错误。我将稍微修改一下我的答案,以更准确地表达我的意思——即它们导致异常,而不是它们扩展了界面。 如果您无法控制需要可序列化的类...如果您使用的是 Scala,则可以使用 Serializable 对其进行实例化:val test = new Test with Serializable
"rddList.map(someFunc(_)) 到 rddList.map(someFunc),它们完全一样" 不,它们不完全一样,实际上使用后者会导致序列化异常是不是前者不会。
@samthebest 你能解释一下为什么 map(someFunc(_)) 不会导致序列化异常而 map(someFunc) 会?
第二个选项对我不起作用,直到 someFuc
移出测试类。是版本更改的影响(这里是 Spark 2.3 和 Scala 2.11)还是我遗漏了什么?【参考方案2】:
Grega's answer 很好地解释了为什么原始代码不起作用以及解决问题的两种方法。但是,这种解决方案不是很灵活;考虑您的闭包包含对您无法控制的非Serializable
类的方法调用的情况。您既不能将Serializable
标记添加到此类,也不能更改底层实现以将方法更改为函数。
Nilesh 对此提供了一个很好的解决方法,但该解决方案可以更加简洁和通用:
def genMapper[A, B](f: A => B): A => B =
val locker = com.twitter.chill.MeatLocker(f)
x => locker.get.apply(x)
这个函数序列化器可以用来自动包装闭包和方法调用:
rdd map genMapper(someFunc)
这种技术还有一个好处是不需要额外的 Shark 依赖项来访问 KryoSerializationWrapper
,因为 Twitter 的 Chill 已经被核心 Spark 引入
【讨论】:
嗨,我想知道如果我使用你的代码,我需要注册一些东西吗?我尝试从 kryo 获得 Unable find 类异常。 THX【参考方案3】:完整的谈话充分解释了这个问题,它提出了一个很好的范式转换方法来避免这些序列化问题:https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md
投票最多的答案基本上是建议放弃整个语言特性——不再使用方法而只使用函数。实际上,应该避免在类中使用函数式编程方法,但是将它们转换为函数并不能解决这里的设计问题(参见上面的链接)。
作为在这种特殊情况下的快速修复,您可以使用@transient
注释告诉它不要尝试序列化违规值(这里,Spark.ctx
是一个自定义类,而不是 Spark 的一个遵循 OP 命名的类):
@transient
val rddList = Spark.ctx.parallelize(list)
你也可以重构代码,让 rddList 存在于其他地方,但这也很讨厌。
未来可能是孢子
未来 Scala 将包含这些称为“孢子”的东西,这应该允许我们细粒度地控制哪些内容会被闭包拉入,哪些不会被拉入。此外,这应该会将所有意外拉入不可序列化类型(或任何不需要的值)的错误转变为编译错误,而不是现在可怕的运行时异常/内存泄漏。
http://docs.scala-lang.org/sips/pending/spores.html
关于 Kryo 序列化的提示
使用 kyro 时,请务必进行注册,这意味着您会得到错误而不是内存泄漏:
“最后,我知道 kryo 有 kryo.setRegistrationOptional(true),但我很难弄清楚如何使用它。当这个选项打开时,如果我没有,kryo 似乎仍然会抛出异常'没有注册课程。”
Strategy for registering classes with kryo
当然,这只给你类型级控制而不是值级控制。
...更多想法即将到来。
【讨论】:
【参考方案4】:我遇到了类似的问题,我从Grega's answer 了解到的是
object NOTworking extends App
new testing().doIT
//adding extends Serializable wont help
class testing
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT =
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
def someFunc(a:Int) = a+1
您的 doIT 方法正在尝试序列化 someFunc(_) 方法,但由于方法不可序列化,它会尝试序列化类 testing这又是不可序列化的。
所以要让你的代码工作,你应该在 doIT 方法中定义 someFunc。例如:
def doIT =
def someFunc(a:Int) = a+1
//function definition
val after = rddList.map(someFunc(_))
after.collect().map(println(_))
如果有多个函数出现,那么所有这些函数都应该对父上下文可用。
【讨论】:
【参考方案5】:我使用不同的方法解决了这个问题。您只需要在通过闭包之前序列化对象,然后再进行反序列化。即使您的类不可序列化,这种方法也很有效,因为它在幕后使用了 Kryo。你只需要一些咖喱。 ;)
这是我如何做到的一个例子:
def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
(foo: Foo) : Bar =
kryoWrapper.value.apply(foo)
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()
object Blah(abc: ABC) extends (Foo => Bar)
def apply(foo: Foo) : Bar = //This is the real function
随意让 Blah 变得尽可能复杂,类、伴生对象、嵌套类、对多个 3rd 方库的引用。
KryoSerializationWrapper 指的是:https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
【讨论】:
这实际上是序列化实例还是创建静态实例并序列化引用(请参阅我的答案)。 @samthebest 你能详细说明一下吗?如果你调查KryoSerializationWrapper
,你会发现它让Spark 认为它确实是java.io.Serializable
——它只是使用Kryo 在内部序列化对象——更快、更简单。而且我不认为它处理静态实例 - 它只是在调用 value.apply() 时反序列化值。【参考方案6】:
我不完全确定这是否适用于 Scala,但在 Java 中,我通过重构代码解决了 NotSerializableException
问题,以便闭包不会访问不可序列化的 final
字段。
【讨论】:
我在 Java 中面临同样的问题,我试图在 RDD foreach 方法中使用 Java IO 包中的 FileWriter 类。请告诉我我们如何解决这个问题。 好吧@Shankar,如果FileWriter
是外部类的final
字段,则不能这样做。但是FileWriter
可以由String
或File
构造,两者都是Serializable
。所以重构你的代码,根据外部类的文件名构造一个本地的FileWriter
。【参考方案7】:
仅供参考,在 Spark 2.4 中,很多人可能会遇到这个问题。 Kryo 序列化已经变得更好,但在许多情况下,您不能使用 spark.kryo.unsafe=true 或天真的 kryo 序列化程序。
为了快速修复,请尝试在 Spark 配置中更改以下内容
spark.kryo.unsafe="false"
或
spark.serializer="org.apache.spark.serializer.JavaSerializer"
我通过使用显式广播变量并利用新的内置 twitter-chill api 修改我遇到或亲自编写的自定义 RDD 转换,将它们从 rdd.map(row =>
转换为 rdd.mapPartitions(partition =>
函数。
示例
旧的(不是很好的)方式
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val outputRDD = rdd.map(row =>
val value = sampleMap.get(row._1)
value
)
替代(更好)方式
import com.twitter.chill.MeatLocker
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val brdSerSampleMap = spark.sparkContext.broadcast(MeatLocker(sampleMap))
rdd.mapPartitions(partition =>
val deSerSampleMap = brdSerSampleMap.value.get
partition.map(row =>
val value = sampleMap.get(row._1)
value
).toIterator
)
这种新方法每个分区只会调用一次广播变量,这样更好。如果您不注册类,您仍然需要使用 Java 序列化。
【讨论】:
【参考方案8】:我也有类似的经历。
当我在驱动程序(主)上初始化一个变量时触发了该错误,但随后尝试在其中一名工作人员上使用它。 发生这种情况时,Spark Streaming 将尝试序列化对象以将其发送给工作人员,如果对象不可序列化,则会失败。
我通过将变量设为静态解决了错误。
以前的非工作代码
private final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();
工作代码
private static final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();
学分:
-
https://docs.microsoft.com/en-us/answers/questions/35812/sparkexception-job-aborted-due-to-stage-failure-ta.html(pradeepcheekatla-msft 的答案)
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
【讨论】:
【参考方案9】:def upper(name: String) : String =
var uppper : String = name.toUpperCase()
uppper
val toUpperName = udf (EmpName: String) => upper(EmpName)
val emp_details = """["id": "1","name": "James Butt","country": "USA",
"id": "2", "name": "Josephine Darakjy","country": "USA",
"id": "3", "name": "Art Venere","country": "USA",
"id": "4", "name": "Lenna Paprocki","country": "USA",
"id": "5", "name": "Donette Foller","country": "USA",
"id": "6", "name": "Leota Dilliard","country": "USA"]"""
val df_emp = spark.read.json(Seq(emp_details).toDS())
val df_name=df_emp.select($"id",$"name")
val df_upperName= df_name.withColumn("name",toUpperName($"name")).filter("id='5'")
display(df_upperName)
这会报错 org.apache.spark.SparkException:任务不可序列化 在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
解决方案-
import java.io.Serializable;
object obj_upper extends Serializable
def upper(name: String) : String =
var uppper : String = name.toUpperCase()
uppper
val toUpperName = udf (EmpName: String) => upper(EmpName)
val df_upperName=
df_name.withColumn("name",obj_upper.toUpperName($"name")).filter("id='5'")
display(df_upperName)
【讨论】:
【参考方案10】:类中定义的Scala方法是不可序列化的,可以将方法转换为函数来解决序列化问题。
方法语法
def func_name (x String) : String =
...
return x
函数语法
val func_name = (x String) =>
...
x
【讨论】:
【参考方案11】:我的解决方案是添加一个 compagnion 类来处理类中不可序列化的所有方法。
【讨论】:
以上是关于任务不可序列化:java.io.NotSerializableException 仅在类而非对象上调用闭包外的函数时的主要内容,如果未能解决你的问题,请参考以下文章