在 Spark Scala 中使用自定义数据框类时任务不可序列化

Posted

技术标签:

【中文标题】在 Spark Scala 中使用自定义数据框类时任务不可序列化【英文标题】:Task not serializable while using custom dataframe class in Spark Scala 【发布时间】:2016-11-22 00:21:56 【问题描述】:

我在使用 Scala/Spark (1.5) 和 Zeppelin 时遇到了一个奇怪的问题:

如果我运行以下 Scala/Spark 代码,它将正常运行:

// TEST NO PROBLEM SERIALIZATION
val rdd = sc.parallelize(Seq(1, 2, 3))
val testList = List[String]("a", "b")

rdd.mapa => 
    val aa = testList(0)
    None

但是,在按照建议 here 声明自定义数据框类型之后

//DATAFRAME EXTENSION
import org.apache.spark.sql.DataFrame

object ExtraDataFrameOperations 
  implicit class DFWithExtraOperations(df : DataFrame) 

    //drop several columns
    def drop(colToDrop:Seq[String]):DataFrame = 
        var df_temp = df
        colToDrop.foreach case (f: String) =>
            df_temp = df_temp.drop(f)//can be improved with Spark 2.0
        
        df_temp
       
  

并使用它,例如如下:

//READ ALL THE FILES INTO different DF and save into map
import ExtraDataFrameOperations._
val filename = "myInput.csv"

val delimiter =  ","

val colToIgnore = Seq("c_9", "c_10")

val inputICFfolder = "hdfs:///group/project/TestSpark/"

val df = sqlContext.read
            .format("com.databricks.spark.csv")
            .option("header", "true") // Use first line of all files as header
            .option("inferSchema", "false") // Automatically infer data types? => no cause we need to merge all df, with potential null values => keep string only
            .option("delimiter", delimiter)
            .option("charset", "UTF-8")
            .load(inputICFfolder + filename)
            .drop(colToIgnore)//call the customize dataframe

本次运行成功。

现在如果我再次运行以下代码(与上面相同)

// TEST NO PROBLEM SERIALIZATION
val rdd = sc.parallelize(Seq(1, 2, 3))
val testList = List[String]("a", "b")
rdd.mapa => 
    val aa = testList(0)
    None

我收到错误消息:

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at 在 32 处并行化 testList: List[String] = List(a, b) org.apache.spark.SparkException:任务不可序列化 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 在 org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 在 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 在 org.apache.spark.SparkContext.clean(SparkContext.scala:2032) 在 org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:314) ... 引起:java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$ExtraDataFrameOperations$ 序列化堆栈: - 对象不可序列化(类: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$ExtraDataFrameOperations$, 价值: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$ExtraDataFrameOperations$@6c7e70e) - 字段(类:$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC,名称:ExtraDataFrameOperations$module,类型:类 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$ExtraDataFrameOperations$) - 对象(类$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC,$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$ iwC$$iwC$$iwC@4c6d0802) - 字段(类: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC,名称:$iw,类型:类 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC) ...

我不明白:

为什么在未对数据帧执行任何操作时出现此错误? 为什么“ExtraDataFrameOperations”之前成功使用过却不能序列化??

更新:

尝试

@inline val testList = List[String]("a", "b")

没有帮助。

【问题讨论】:

【参考方案1】:

只需添加“扩展可序列化” 这对我有用

/**
   * A wrapper around ProducerRecord RDD that allows to save RDD to Kafka.
   *
   * KafkaProducer is shared within all threads in one executor.
   * Error handling strategy - remember "last" seen exception and rethrow it to allow task fail.
   */
 implicit class DatasetKafkaSink(ds: Dataset[ProducerRecord[String, GenericRecord]]) extends Serializable 

   class ExceptionRegisteringCallback extends Callback 
     private[this] val lastRegisteredException = new AtomicReference[Option[Exception]](None)

     override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = 
       Option(exception) match 
         case a @ Some(_) => lastRegisteredException.set(a) // (re)-register exception if send failed
         case _ => // do nothing if encountered successful send
       
     

     def rethrowException(): Unit = lastRegisteredException.getAndSet(None).foreach(e => throw e)
   

   /**
     * Save to Kafka reusing KafkaProducer from singleton holder.
     * Returns back control only once all records were actually sent to Kafka, in case of error rethrows "last" seen
     * exception in the same thread to allow Spark task to fail
     */
   def saveToKafka(kafkaProducerConfigs: Map[String, AnyRef]): Unit = 
     ds.foreachPartition  records =>
       val callback = new ExceptionRegisteringCallback
       val producer = KafkaProducerHolder.getInstance(kafkaProducerConfigs)

       records.foreach(record => producer.send(record, callback))

       producer.flush()
       callback.rethrowException()
     
   
 '

【讨论】:

【参考方案2】:

看起来 spark 试图序列化 testList 周围的所有范围。 尝试内联数据@inline val testList = List[String]("a", "b") 或使用不同的对象来存储传递给驱动程序的函数/数据。

【讨论】:

不幸的是 @inline 没有帮助 并且将函数/数据存储在其他对象中并不真正适合自定义数据框对象的策略

以上是关于在 Spark Scala 中使用自定义数据框类时任务不可序列化的主要内容,如果未能解决你的问题,请参考以下文章

无法使用 Scala 在 Apache Spark 中执行用户定义的函数

eclipse编写scala应用运行在spark集群上

spark自定义UDF为啥参数最多21个

自定义spark GraphX中的collectNeighborIds方法

如何在Spark Scala中以CSV格式编写不同的布局

关于在 Spark Scala 中创建用户定义函数 (UDF)