用于不可序列化的对象和函数的 Spark Scala 编程

Posted

技术标签:

【中文标题】用于不可序列化的对象和函数的 Spark Scala 编程【英文标题】:Spark Scala Programming for not serializable objects and functions 【发布时间】:2017-09-03 15:58:16 【问题描述】:

运行 Spark Scala 程序时出现“任务不可序列化”异常

Spark RDD 属于不可序列化类型(java 类) 调用的函数来自不可序列化的类(再次是 java 类)

我的代码是这样的

object Main
    def main(args : Array(String)
        ...
        var rdd = sc.textFile(filename)
                  .map(line => new NotSerializableJClass(line)).cache() 
        //rdd is RDD[NotSerializableJClass]
        ...
        var test = new NotSerializableJPredicate()
        rdd = rdd.filter(elem => test.test(elem))
        //throws TaskNotSerializable on test Predicate class
    

我注意到我可以解决第二部分

rdd = rdd.filter(elem => (new NotSerializableJPredicate()).test(elem))

但我仍然得到 RDD 中对象类的异常。而且我会以另一种方式也以另一种方式的第二部分,只是因为我不想创建大量 PredicateClass 的对象。

你能帮帮我吗?我怎样才能继续使用不可序列化的类?

【问题讨论】:

NotSerializableJClass 是第三方类还是您的应用程序中定义的类? 【参考方案1】:

RDD 必须是可序列化的,因此您不能创建非可序列化类的 RDD。

对于您的谓词,您可以使用 mapPartitions 编写它。

rdd.mapPartitions
  part => 
    val test = new NotSerializableJPredicate()
    part.filterelem => test.test(elem)
   

mapPartitons 将在每个分区运行一次,因此它允许您在执行程序上实例化不可序列化的类,但它只需要在每个分区而不是每条记录上执行一次。

【讨论】:

【参考方案2】:

帮助我避免任务序列化问题的一些一般规则:

如果您从代码中调用任何类的方法;Spark 将需要序列化包含该方法的整个类。绕过的方法可以是以下任何一种: a> 在 NotSerializableClass 中将方法声明为函数变量;所以不要写: def foo(x:Int)=blah blah 尝试使用 val foo = (x:Int)=>blah blah 所以; spark 现在不再需要序列化整个类。 b> 在某些情况下,重构代码以在单独的类中提取相关部分可能是可行的方法。 c>将类中实际上对于作业不需要的对象标记为@transient,并标记类Serializable

【讨论】:

以上是关于用于不可序列化的对象和函数的 Spark Scala 编程的主要内容,如果未能解决你的问题,请参考以下文章

Spark - 不可序列化的任务:如何使用调用外部类/对象的复杂地图闭包?

Spark Scala:任务不可序列化错误

Spark 任务不可序列化

org.apache.spark.SparkException:任务不可序列化,wh

Spark 和不可序列化的 DateTimeFormatter

Java Spark Dataset MapFunction - 任务在没有任何类引用的情况下不可序列化