Scala Spark - 任务不可序列化

Posted

技术标签:

【中文标题】Scala Spark - 任务不可序列化【英文标题】:Scala Spark - task not serializable 【发布时间】:2015-12-16 03:13:20 【问题描述】:

我有以下代码,故障出在 sc.parallelize()

val pairs = ret.cartesian(ret)
    .map 
        case ((k1, v1), (k2, v2)) => ((k1, k2), (v1.toList, v2.toList))
    
for (pair <- pairs) 
    val test = sc.parallelize(pair._2._1.map(_._1 ))

在哪里

k1、k2 是字符串 v1、v2 是双精度列表

每当我尝试访问 sc 时都会收到以下错误。我在这里做错了什么?

线程“主”org.apache.spark.SparkException 中的异常:任务不可序列化 在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315) 在 org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305) 在 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) 在 org.apache.spark.SparkContext.clean(SparkContext.scala:1893) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:286) 在 org.apache.spark.rdd.RDD.foreach(RDD.scala:868) 在 CorrelationCalc$.main(CorrelationCalc.scala:33) 在 CorrelationCalc.main(CorrelationCalc.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:606) 在 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) 在 org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 引起:java.io.NotSerializableException:org.apache.spark.SparkContext 序列化栈: - 对象不可序列化(类:org.apache.spark.SparkContext,值:org.apache.spark.SparkContext@40bee8c5) - 字段(类:CorrelationCalc$$anonfun$main$1,名称:sc$1,类型:类 org.apache.spark.SparkContext) - 对象(类 CorrelationCalc$$anonfun$main$1, ) 在 org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 在 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 在 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) 在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312) ... 20 更多

【问题讨论】:

你的第一张地图似乎是个小问题。你打算用第二张地图做什么? 第一张地图只是重新格式化我的数据,第二张地图用于从列表中制作 RDD。它总是在第二张地图上失败,因为我无法序列化 sc。 什么是对的类型? 【参考方案1】:

理解只是做一个pairs.map()

RDD 操作由工作人员执行,为了让他们完成这项工作,您发送给他们的任何内容都必须是可序列化的。 SparkContext 附加到主服务器:它负责管理整个集群。

如果要创建 RDD,则必须了解整个集群(即第二个“D” --- 分布式),因此您无法在工作人员上创建新的 RDD。无论如何,您可能不想将每一行成对地变成一个 RDD(并且每个都具有相同的名称!)。

很难从你的代码中看出你想做什么,但它可能看起来像

val test = pairs.map( r => r._2._1) 

这将是一个 RDD,其中每一行都是 v1.toList 中的任何内容

【讨论】:

以上是关于Scala Spark - 任务不可序列化的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark Scala 中使用自定义数据框类时任务不可序列化

Scala 错误:线程“主”org.apache.spark.SparkException 中的异常:任务不可序列化

用于不可序列化的对象和函数的 Spark Scala 编程

Spark - 不可序列化的任务:如何使用调用外部类/对象的复杂地图闭包?

任务在 Databricks 上的 Scala 中不可序列化

Spark 应用程序收到“任务不可序列化”的错误?