任务不可序列化错误:Spark
Posted
技术标签:
【中文标题】任务不可序列化错误:Spark【英文标题】:Task not Serializable error:Spark 【发布时间】:2016-10-21 15:32:36 【问题描述】:我有一个 (String,(Int,Iterable[String]))
形式的 RDD。对于 RDD 中的每个条目,整数值(我称之为距离)最初设置为 10。
Iterable[String]
中的每个元素在此 RDD 中都有自己的条目,它用作键(因此我们在单独的 rdd 条目中拥有 Iterable[String]
中每个元素的距离)。我的意图是执行以下操作:
1. 如果列表 (Iterable[String]
) 包含元素“Bethan”,我将其距离指定为 1。
2.在此之后,我通过过滤创建了距离为 1 的所有键的列表。
3.在此之后,我将 RDD 转换为一个新的 RDD,如果它自己的列表中的任何元素的距离为 1,则将其距离值更新为 2。
我有以下代码:
val disOneRdd = disRdd.map(x=> if(x._2._2.toList.contains("Bethan")) (x._1,(1,x._2._2)) else x)
var lst = disRdd.filter(x=> x._2._1 == 1).keys.collect
val disTwoRdd = disRdd.map(x=>
var b:Boolean = false
loop.breakable
for (str <- x._2._2)
if (lst.contains(str)) //checks if it contains element with distance 1
b = true
loop.break
if (b)
(x._1,(2,x._2._2))
else
(x._1,(10,x._2._2))
)
但是当我运行它时,我收到错误“任务不可序列化”。我该怎么做,还有更好的方法吗?
编辑
输入表单的RDD:
("abc",(10,List("efg","hij","klm")))
("efg",(10,List("jhg","Beethan","abc","ert")))
("Beethan",(0,List("efg","vcx","zse")))
("vcx",(10,List("czx","Beethan","abc")))
("zse",(10,List("efg","Beethan","nbh")))
("gvf",(10,List("vcsd","fdgd")))
...
列表中包含 Beethan 的每个元素的距离都应为 1。每个具有“距离为 1 的元素”(而不是 Beethan)的元素的距离应为 2。out 具有以下形式:
("abc",(2,List("efg","hij","klm")))
("efg",(1,List("jhg","Beethan","abc","ert")))
("Beethan",(0,List("efg","vcx","zse")))
("vcx",(1,List("czx","Beethan","abc")))
("zse",(1,List("efg","Beethan","nbh"))
("gvf",(10,List("vcsd","fdgd")))
...
错误信息:
[error] (run-main-0) org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.map(RDD.scala:365)
at Bacon$.main(Bacon.scala:86)
at Bacon.main(Bacon.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Caused by: java.io.NotSerializableException: scala.util.control.Breaks
Serialization stack:
- object not serializable (class: scala.util.control.Breaks, value: scala.util.control.Breaks@78426203)
- field (class: Bacon$$anonfun$15, name: loop$1, type: class scala.util.control.Breaks)
- object (class Bacon$$anonfun$15, <function1>)
【问题讨论】:
一个小例子(示例输入和预期输出)将有助于了解您在此处尝试实现的目标 @cheseaux 请查看编辑 @sarthak 也请添加 stacktrace - 它非常有用,通常会有哪些类导致错误的信息 @T.Gawęda 你的意思是错误信息吗?我已更新问题中的错误消息。 【参考方案1】:val disOneRdd = disRdd.map(x=> if(x._2._2.toList.contains("Bethan")) (x._1,(1,x._2._2)) else x)
var lst = disRdd.filter(x=> x._2._1 == 1).keys.collect
val disTwoRdd = disRdd.map(x=>
var b:Boolean = x._._2.filter(y => lst.contains(y)).size() > 0
if (b)
(x._1,(2,x._2._2))
else
(x._1,(10,x._2._2))
)
或
import scala.util.control.Breaks._
val disOneRdd = disRdd.map(x=> if(x._2._2.toList.contains("Bethan")) (x._1,(1,x._2._2)) else x)
var lst = disRdd.filter(x=> x._2._1 == 1).keys.collect
val disTwoRdd = disRdd.map(x=>
var b:Boolean = false
breakable
for (str <- x._2._2)
if (lst.contains(str)) //checks if it contains element with distance 1
b = true
break
if (b)
(x._1,(2,x._2._2))
else
(x._1,(10,x._2._2))
)
两个版本都适合我。问题在于不可序列化的 loop.breakable。老实说,我不知道这种构造的行为是否发生了变化,但是在将 loop.breakable
替换为 breakable
之后它可以工作 - 也许有一些 API 更改。带过滤器的版本可能会更慢,但可以避免breakable
的问题
尽管有主要问题,lst 应该是广播变量 - 但是我没有将广播变量放在这里以提供尽可能简单的答案
【讨论】:
spark序列化详解:***.com/questions/40818001/…以上是关于任务不可序列化错误:Spark的主要内容,如果未能解决你的问题,请参考以下文章
Scala 错误:线程“主”org.apache.spark.SparkException 中的异常:任务不可序列化
org.apache.spark.SparkException:任务不可序列化 java