Spark:对象不可序列化

Posted

技术标签:

【中文标题】Spark:对象不可序列化【英文标题】:Spark: object not serializable 【发布时间】:2020-03-31 18:28:54 【问题描述】:

我有一个批处理作业,我尝试将其转换为结构化流。我收到以下错误:

20/03/31 15:09:23 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: com.apple.ireporter.analytics.compute.AggregateKey
Serialization stack:
    - object not serializable (class: com.apple.ireporter.analytics.compute.AggregateKey, value: d_)

...其中“d_”是数据集中的最后一行

这是相关代码sn-p

    df.writeStream.foreachBatch  (batchDF: DataFrame, batchId: Long) =>
      import spark.implicits._
      val javaRdd = batchDF.toJavaRDD
      val dataframeToRowColFunction = new RowToColumn(table)
      println("Back to Main class")
      val combinedRdd =javaRdd.flatMapToPair(dataframeToRowColFunction.FlatMapData2).combineByKey(aggrCreateComb.createCombiner,aggrMerge.aggrMerge,aggrMergeCombiner.aggrMergeCombiner)
      // spark.createDataFrame( combinedRdd).show(1); // I commented this
      // combinedRdd.collect() // I added this as a test
    

这是 FlatMapData2 类

  val FlatMapData2: PairFlatMapFunction[Row, AggregateKey, AggregateValue] = new PairFlatMapFunction[Row, AggregateKey, AggregateValue]() 
    //val FlatMapData: PairFlatMapFunction[Row, String, AggregateValue] = new PairFlatMapFunction[Row, String, AggregateValue]() 
    override def call(x: Row) = 
      val tuples = new util.ArrayList[Tuple2[AggregateKey, AggregateValue]]
      val decomposedEvents = decomposer.decomposeDistributed(x)
      decomposedEvents.foreach 
        y =>  tuples.add(Tuple2(y._1,y._2))
      
      tuples.iterator()
    
  

这里是聚合的 Key 类

class AggregateKey(var partitionkeys: Map[Int,Any],var clusteringkeys : Map[Int,Any]) extends Comparable [AggregateKey]
...

我是新手,任何帮助将不胜感激。如果还有什么需要补充的请告诉我

【问题讨论】:

最初的错误是由于spark.createDataFrame( combinedRdd).show(1);。我用combinedRdd.collect() 替换了它,现在它正在抛出NotSerializableException。我已经相应地修改了问题 可能是因为Map[Int,Any] Any 总是问题所在,在任何地方都不惜一切代价避免它。 Scala 有一个强大的类型系统可以做到这一点。 【参考方案1】:

我可以通过让 AggregateKey 扩展 java.io.Serializable 来解决这个问题

class AggregateKey(var partitionkeys: Map[Int,Any],var clusteringkeys : Map[Int,Any]) extends java.io.Serializable

【讨论】:

以上是关于Spark:对象不可序列化的主要内容,如果未能解决你的问题,请参考以下文章

Spark - 不可序列化的任务:如何使用调用外部类/对象的复杂地图闭包?

Spark 任务不可序列化

Spark 和不可序列化的 DateTimeFormatter

Spark 应用程序收到“任务不可序列化”的错误?

Spark:DataFrame 上 UDF 的任务不可序列化

Spark Scala:任务不可序列化错误