Spark Scala:任务不可序列化错误

Posted

技术标签:

【中文标题】Spark Scala:任务不可序列化错误【英文标题】:Spark Scala: Task Not serializable error 【发布时间】:2017-09-21 09:49:52 【问题描述】:

我正在使用带有 Scala 插件和 spark 库的 IntelliJ 社区版。我仍在学习 Spark,并且正在使用 Scala Worksheet。

我已经编写了以下代码,用于删除字符串中的标点符号:

def removePunctuation(text: String): String = 
  val punctPattern = "[^a-zA-Z0-9\\s]".r
  punctPattern.replaceAllIn(text, "").toLowerCase

然后我阅读了一个文本文件并尝试删除标点符号:

val myfile = sc.textFile("/home/ubuntu/data.txt",4).map(removePunctuation)

这会产生如下错误,任何帮助将不胜感激:

org.apache.spark.SparkException:任务不可序列化 在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(/home/ubuntu/src/main/scala/Test.sc:294) 在 org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(/home/ubuntu/src/main/scala/Test.sc:284) 在 org.apache.spark.util.ClosureCleaner$.clean(/home/ubuntu/src/main/scala/Test.sc:104) 在 org.apache.spark.SparkContext.clean(/home/ubuntu/src/main/scala/Test.sc:2090) 在 org.apache.spark.rdd.RDD$$anonfun$map$1.apply(/home/ubuntu/src/main/scala/Test.sc:366) 在 org.apache.spark.rdd.RDD$$anonfun$map$1.apply(/home/ubuntu/src/main/scala/Test.sc:365) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(/home/ubuntu/src/main/scala/Test.sc:147) 在#worksheet#.#worksheet#(/home/ubuntu/src/main/scala/Test.sc:108) 引起:java.io.NotSerializableException: A$A21$A$A21 序列化栈: - 对象不可序列化(类:A$A21$A$A21,值:A$A21$A$A21@62db3891) - 字段(类:A$A21$A$A21$$anonfun$words$1,名称:$outer,类型:类 A$A21$A$A21) - 对象(A$A21$A$A21$$anonfun$words$1 类,) 在 org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 在 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 在 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) 在 org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 在 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 在 org.apache.spark.SparkContext.clean(SparkContext.scala:2094) 在 org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370) 在 org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 在 org.apache.spark.rdd.RDD.map(RDD.scala:369) 在 A$A21$A$A21.words$lzycompute(Test.sc:27) 在 A$A21$A$A21.words(Test.sc:27) 在 A$A21$A$A21.get$$instance$$words(Test.sc:27) 在 A$A21$.main(Test.sc:73) 在 A$A21.main(Test.sc) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 org.jetbrains.plugins.scala.worksheet.MyWorksheetRunner.main(MyWorksheetRunner.java:22)

【问题讨论】:

【参考方案1】:

正如 T. Gaweda 已经指出的那样,您很可能在一个不可序列化的类中定义您的函数。因为它是一个纯函数,即它不依赖于封闭类的任何上下文,我建议你将它放入一个应该扩展Serializable 的伴生对象中。这相当于 Scala 的 Java 静态方法:

object Helper extends Serializable 
  def removePunctuation(text: String): String = 
    val punctPattern = "[^a-zA-Z0-9\\s]".r
    punctPattern.replaceAllIn(text, "").toLowerCase
  

【讨论】:

【参考方案2】:

正如@TGaweda 所建议的,Spark 的SerializationDebugger 对于识别“从给定对象到有问题对象的序列化路径”非常有帮助。堆栈跟踪中“序列化堆栈”之前的所有美元符号都表明您的方法的容器对象是问题所在。

虽然在你的容器类上加上 Serializable 是最简单的,但我更喜欢利用 Scala 是一种函数式语言这一事实​​,并将你的函数用作一等公民:

sc.textFile("/home/ubuntu/data.txt",4).map  text =>
  val punctPattern = "[^a-zA-Z0-9\\s]".r
  punctPattern.replaceAllIn(text, "").toLowerCase

或者如果你真的想把事情分开:

val removePunctuation: String => String = (text: String) => 
  val punctPattern = "[^a-zA-Z0-9\\s]".r
  punctPattern.replaceAllIn(text, "").toLowerCase

sc.textFile("/home/ubuntu/data.txt",4).map(removePunctuation)

这些选项自 Regex is serializable 起当然有效,您应该确认。

其次但非常重要的一点是,构造 Regex 的成本很高,因此为了提高性能,将其从转换中排除——可能与 broadcast 一起使用。

【讨论】:

你能详细说明你的最后评论吗? 嗨,请提供见解:作为复习的一部分,我在 Databricks 社区版的问题中运行了这个问题。现在,这是一个非集群设置,但通常会根据我所看到的情况发生序列化错误。但它没有。这是 2.4.4 版本 - 怎么想?文件太小?驱动方法?为什么会显示其他错误?【参考方案3】:

阅读stacktrace,有:

$outer,类型:A$A21$A$A21 类

这是一个很好的提示。您的 lambda 是可序列化的,但您的类不可序列化。

当你制作 lambda 表达式时,这个表达式会引用外部类。在您的情况下,外部类不可序列化,即未实现可序列化或字段之一不是可序列化的实例

【讨论】:

顺便说一句。它可能是重复的副本的副本.. ;) 但是我没有时间搜索将问题标记为重复的最佳答案。如果您找到一些好的解释,请联系我并将问题标记为重复 @TGaweda:感谢您的回答,但我处于学习阶段,完全不明白。我在发帖之前搜索了这个问题。然而,他们都没有深入解释这一切意味着什么以及如何解决它。如果您能提出一个可能的解决方案,那么将来遇到此错误的任何人都只会感谢您 @sumitb 这就是我发布这个答案的原因:) 我想我没有清楚地发布我的问题。让我再次澄清一下,我在“Scala Worksheet”中编写所有内容。没有关于外部对象是否可序列化的问题。如果我使用 main 方法创建一个 scala 对象/类,写出完全相同的代码,那么它工作正常。我要求找出如何在工作表中对此进行测试。我很乐意听到一些建议。 @T.Gawęda 您能否解释一下如何读取序列化堆栈,特别是您如何从序列化堆栈的“$outer, type: class A$A21$A$A21”部分推断出一些东西?相关问题:***.com/questions/54912790/…

以上是关于Spark Scala:任务不可序列化错误的主要内容,如果未能解决你的问题,请参考以下文章

Scala Spark - 任务不可序列化

在 Spark Scala 中使用自定义数据框类时任务不可序列化

用于不可序列化的对象和函数的 Spark Scala 编程

Spark - 不可序列化的任务:如何使用调用外部类/对象的复杂地图闭包?

Spark 应用程序收到“任务不可序列化”的错误?

任务不可序列化错误:Spark