Spark 任务不可序列化

Posted

技术标签:

【中文标题】Spark 任务不可序列化【英文标题】:Spark task not serializable 【发布时间】:2018-04-06 15:10:51 【问题描述】:

我已经尝试了在 *** 上找到的所有解决此问题的方法,但尽管如此,我还是无法解决。 我有一个实例化“推荐”对象的“MainObj”对象。当我调用“recommendationProducts”方法时,我总是得到一个错误。 下面是方法的代码:

def recommendationProducts(item: Int): Unit = 

val aMatrix = new DoubleMatrix(Array(1.0, 2.0, 3.0))

def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = 
  vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())


val itemFactor = model.productFeatures.lookup(item).head
val itemVector = new DoubleMatrix(itemFactor)

//Here is where I get the error:
val sims = model.productFeatures.map  case (id, factor) =>
  val factorVector = new DoubleMatrix(factor)
  val sim = cosineSimilarity(factorVector, itemVector)
  (id, sim)


val sortedSims = sims.top(10)(Ordering.by[(Int, Double), Double] 
  case (id, similarity) => similarity
)

println("\nTop 10 products:")
sortedSims.map(x => (x._1, x._2)).foreach(println)

这是错误:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.map(RDD.scala:369)
at RecommendationObj.recommendationProducts(RecommendationObj.scala:269)
at MainObj$.analisiIUNGO(MainObj.scala:257)
at MainObj$.menu(MainObj.scala:54)
at MainObj$.main(MainObj.scala:37)
at MainObj.main(MainObj.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@7c2312fa)
- field (class: RecommendationObj, name: sc, type: class org.apache.spark.SparkContext)
- object (class MainObj$$anon$1, MainObj$$anon$1@615bad16)
- field (class: RecommendationObj$$anonfun$37, name: $outer, type: class RecommendationObj)
- object (class RecommendationObj$$anonfun$37, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 14 more

我尝试添加: 1)“扩展可序列化”(Scala)到我的班级 2)“扩展扩展 java.io.Serializable”到我的类 3)“@transient”到某些部分 4)在这个类中获取模型(和其他特征)(现在我从另一个对象中获取它们,并将它们像参数一样传递给我的类)

我该如何解决?我要疯了! 提前谢谢!

【问题讨论】:

【参考方案1】:

钥匙在这里:

 field (class: RecommendationObj, name: sc, type: class org.apache.spark.SparkContext)

所以你有名为 sc 的 SparkContext 类型的字段。 Spark 想要序列化类,所以他也尝试序列化所有字段。

你应该:

使用@transient 注解并检查是否为空,然后重新创建 不使用字段中的 SparkContext,而是将其放入方法的参数中。但是请记住,您永远不应该在 map、flatMap 等的闭包中使用 SparkContext。

【讨论】:

谢谢!有用!但是,我传递了类似于类的参数的 SparkContext (sc),并使用它在构造函数中加载模型。错了吗? @S.SP 如果对您有帮助,请投票并接受答案。没错,但是接下来你必须使用@transient 注解说序列化器不序列化它 好的!谢谢你。我想给你投票,但我仍然没有 15 个声望点来做这件事。很抱歉,您的回答对我帮助很大! @S.SP 谢谢 :)

以上是关于Spark 任务不可序列化的主要内容,如果未能解决你的问题,请参考以下文章

Spark:DataFrame 上 UDF 的任务不可序列化

Spark Scala:任务不可序列化错误

任务不可序列化错误:Spark

org.apache.spark.SparkException:任务不可序列化 java

org.apache.spark.SparkException:任务不可序列化,wh

Scala Spark - 任务不可序列化