org.apache.spark.SparkException:任务不可序列化,wh

Posted

技术标签:

【中文标题】org.apache.spark.SparkException:任务不可序列化,wh【英文标题】:org.apache.spark.SparkException: Task not serializable, wh 【发布时间】:2016-07-27 16:32:27 【问题描述】:

当我实现自己的partioner并尝试对原始rdd进行洗牌时,我遇到了一个问题。我知道这是由不可序列化的引用函数引起的,但是在添加

扩展可序列化

对于每个相关的类,这个问题仍然存在。我该怎么办?

线程“main”org.apache.spark.SparkException 中的异常:任务不可序列化 在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 在 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 在 org.apache.spark.SparkContext.clean(SparkContext.scala:1622)

object STRPartitioner extends Serializable
  def apply(expectedParNum: Int,
        sampleRate: Double,
        originRdd: RDD[Vertex]): Unit= 
    val bound = computeBound(originRdd)
    val rdd = originRdd.mapPartitions(
      iter => iter.map(row => 
        val cp = row
        (cp.coordinate, cp.copy())
      
      )
    )
    val partitioner = new STRPartitioner(expectedParNum, sampleRate, bound, rdd)
    val shuffled = new ShuffledRDD[Coordinate, Vertex, Vertex](rdd,  partitioner)
    shuffled.setSerializer(new KryoSerializer(new SparkConf(false)))
    val result = shuffled.collect()
  

class STRPartitioner(expectedParNum: Int,
                     sampleRate: Double,
                     bound: MBR,
                     rdd: RDD[_ <: Product2[Coordinate, Vertex]])
  extends Partitioner with  Serializable 
    ... 

【问题讨论】:

【参考方案1】:

我只是解决问题!将 -Dsun.io.serialization.extendedDebugInfo=true 添加到您的 VM 配置中,您将针对不可序列化的类!

【讨论】:

以上是关于org.apache.spark.SparkException:任务不可序列化,wh的主要内容,如果未能解决你的问题,请参考以下文章