用于不可序列化的对象和函数的 Spark Scala 编程
Posted
技术标签:
【中文标题】用于不可序列化的对象和函数的 Spark Scala 编程【英文标题】:Spark Scala Programming for not serializable objects and functions 【发布时间】:2017-09-03 15:58:16 【问题描述】:运行 Spark Scala 程序时出现“任务不可序列化”异常
Spark RDD 属于不可序列化类型(java 类) 调用的函数来自不可序列化的类(再次是 java 类)我的代码是这样的
object Main
def main(args : Array(String)
...
var rdd = sc.textFile(filename)
.map(line => new NotSerializableJClass(line)).cache()
//rdd is RDD[NotSerializableJClass]
...
var test = new NotSerializableJPredicate()
rdd = rdd.filter(elem => test.test(elem))
//throws TaskNotSerializable on test Predicate class
我注意到我可以解决第二部分
rdd = rdd.filter(elem => (new NotSerializableJPredicate()).test(elem))
但我仍然得到 RDD 中对象类的异常。而且我会以另一种方式也以另一种方式的第二部分,只是因为我不想创建大量 PredicateClass 的对象。
你能帮帮我吗?我怎样才能继续使用不可序列化的类?
【问题讨论】:
NotSerializableJClass
是第三方类还是您的应用程序中定义的类?
【参考方案1】:
RDD 必须是可序列化的,因此您不能创建非可序列化类的 RDD。
对于您的谓词,您可以使用 mapPartitions 编写它。
rdd.mapPartitions
part =>
val test = new NotSerializableJPredicate()
part.filterelem => test.test(elem)
mapPartitons 将在每个分区运行一次,因此它允许您在执行程序上实例化不可序列化的类,但它只需要在每个分区而不是每条记录上执行一次。
【讨论】:
【参考方案2】:帮助我避免任务序列化问题的一些一般规则:
如果您从代码中调用任何类的方法;Spark 将需要序列化包含该方法的整个类。绕过的方法可以是以下任何一种: a> 在 NotSerializableClass 中将方法声明为函数变量;所以不要写: def foo(x:Int)=blah blah 尝试使用 val foo = (x:Int)=>blah blah 所以; spark 现在不再需要序列化整个类。 b> 在某些情况下,重构代码以在单独的类中提取相关部分可能是可行的方法。 c>将类中实际上对于作业不需要的对象标记为@transient,并标记类Serializable
【讨论】:
以上是关于用于不可序列化的对象和函数的 Spark Scala 编程的主要内容,如果未能解决你的问题,请参考以下文章
Spark - 不可序列化的任务:如何使用调用外部类/对象的复杂地图闭包?
org.apache.spark.SparkException:任务不可序列化,wh