Spark - 不可序列化的任务:如何使用调用外部类/对象的复杂地图闭包?

Posted

技术标签:

【中文标题】Spark - 不可序列化的任务:如何使用调用外部类/对象的复杂地图闭包?【英文标题】:Spark - Task not serializable: How to work with complex map closures that call outside classes/objects? 【发布时间】:2014-05-27 19:42:30 【问题描述】:

看看这个问题:Scala + Spark - Task not serializable: java.io.NotSerializableExceptionon. When calling function outside closure only on classes not objects。

问题:

假设我的映射器可以是内部调用其他类并创建对象并在内部执行不同操作的函数(def)。 (或者它们甚至可以是扩展 (Foo) => Bar 的类并在它们的 apply 方法中进行处理 - 但现在让我们忽略这种情况)

Spark 仅支持闭包的 Java 序列化。有没有办法解决这个问题?我们可以使用一些东西而不是闭包来做我想做的事吗?我们可以使用 Hadoop 轻松完成此类工作。这件事让我几乎无法使用 Spark。不能指望所有 3rd 方库的所有类都扩展 Serializable!

可能的解决方案:

这样的东西似乎有用吗:https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

答案似乎肯定是包装器,但我不知道具体是怎么做的。

【问题讨论】:

相关:另外,避免将 SparkContext 传递到 RDD map/filter/flatMap 等转换中,这可能会产生类似的错误 【参考方案1】:

我自己想出了如何做到这一点!

您只需要在通过闭包之前序列化对象,然后再进行反序列化。即使您的类不可序列化,这种方法也很有效,因为它在幕后使用了 Kryo。你只需要一些咖喱。 ;)

这是我如何做到的一个例子:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = 
    kryoWrapper.value.apply(foo)

val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) 
    def apply(foo: Foo) : Bar =  //This is the real function 

随意让 Blah 变得尽可能复杂,类、伴生对象、嵌套类、对多个 3rd 方库的引用。

KryoSerializationWrapper 指的是:https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

【讨论】:

另一种选择是在 Blah 类本身中实现 ​​Serializable 接口。 再想一想,wrapper 让它变得更加灵活,你可以根据需要切换到不同类型的序列化。 @SKP 这就是问题的重点。事实证明,不仅 Blah,而且 Blah 的实例字段都需要扩展 Serializable - 这很明显,因为所有内容都将被递归存储。如果您的班级使用 3rd 方库并且修改他们的代码会导致您做噩梦怎么办?这就是像这样的东西派上用场的地方。此外,Java 序列化很慢。【参考方案2】:

在使用 Java API 的情况下,在传递给映射函数闭包时应避免使用匿名类。您需要一个扩展函数并将其传递给 map(..) 的类,而不是执行 map(new Function) 看: https://yanago.wordpress.com/2015/03/21/apache-spark/

【讨论】:

当你说类扩展你的函数时,我想扩展作为接口的 VoidFunction,我在这里有点困惑.. 我需要实现还是扩展,如果我需要扩展我需要实际创建界面对吗? 在该特定示例中,您需要扩展 PairFunction。无需实现隔行扫描。地图采用 (PairFunction f) 或 (Function f) 能否在这里举个例子? 网站被暂停,有什么例子可以给我们吗? 如果我理解你的答案正确,不,这不起作用。

以上是关于Spark - 不可序列化的任务:如何使用调用外部类/对象的复杂地图闭包?的主要内容,如果未能解决你的问题,请参考以下文章

Spark Scala:任务不可序列化错误

在 Spark Scala 中使用自定义数据框类时任务不可序列化

Spark 应用程序收到“任务不可序列化”的错误?

Spark:DataFrame 上 UDF 的任务不可序列化

任务不可序列化错误:Spark

用于不可序列化的对象和函数的 Spark Scala 编程