org.apache.spark.SparkException: Task not serializable (Caused by org.apache.hadoop.conf.Configuration)


【Title】org.apache.spark.SparkException: Task not serializable (Caused by org.apache.hadoop.conf.Configuration) 【Posted】2016-10-31 03:52:39 【Question】:

I want to write a transformed stream to an Elasticsearch index, as shown below:

transformed.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    val messages = rdd.map(prepare)
    messages.saveAsNewAPIHadoopFile("-", classOf[NullWritable], classOf[MapWritable], classOf[EsOutputFormat], ec)
  }
})

The line val messages = rdd.map(prepare) throws the error shown below. I have been trying different ways to solve this (e.g. adding @transient next to val conf), but nothing seems to work.
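(The @transient attempt looked roughly like this — an assumed sketch, since the exact placement of the annotation is not shown in the code below:)

// Assumed form of the attempted workaround: marking the captured Hadoop
// Configuration field as transient so that Java serialization skips it.
class EsStream(name: String, @transient val conf: HConf) extends SparkBase with Serializable {
  // ... body unchanged; this did not make the error go away.
}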

16/06/28 19:23:00 ERROR JobScheduler: Error running job streaming job 1467134580000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.map(RDD.scala:323)
    at de.kp.spark.elastic.stream.EsStream$$anonfun$run$1.apply(EsStream.scala:77)
    at de.kp.spark.elastic.stream.EsStream$$anonfun$run$1.apply(EsStream.scala:75)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
Serialization stack:
    - object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml)
    - field (class: de.kp.spark.elastic.stream.EsStream, name: de$kp$spark$elastic$stream$EsStream$$conf, type: class org.apache.hadoop.conf.Configuration)
    - object (class de.kp.spark.elastic.stream.EsStream, de.kp.spark.elastic.stream.EsStream@6b156e9a)
    - field (class: de.kp.spark.elastic.stream.EsStream$$anonfun$run$1, name: $outer, type: class de.kp.spark.elastic.stream.EsStream)
    - object (class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1, )
    - field (class: de.kp.spark.elastic.stream.EsStream$$anonfun$run$1$$anonfun$2, name: $outer, type: class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1)
    - object (class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1$$anonfun$2, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.map(RDD.scala:323)
    at de.kp.spark.elastic.stream.EsStream$$anonfun$run$1.apply(EsStream.scala:77)
    at de.kp.spark.elastic.stream.EsStream$$anonfun$run$1.apply(EsStream.scala:75)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
Serialization stack:
    - object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml)
    - field (class: de.kp.spark.elastic.stream.EsStream, name: de$kp$spark$elastic$stream$EsStream$$conf, type: class org.apache.hadoop.conf.Configuration)
    - object (class de.kp.spark.elastic.stream.EsStream, de.kp.spark.elastic.stream.EsStream@6b156e9a)
    - field (class: de.kp.spark.elastic.stream.EsStream$$anonfun$run$1, name: $outer, type: class de.kp.spark.elastic.stream.EsStream)
    - object (class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1, )
    - field (class: de.kp.spark.elastic.stream.EsStream$$anonfun$run$1$$anonfun$2, name: $outer, type: class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1)
    - object (class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1$$anonfun$2, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more

Does it have something to do with Hadoop's Configuration? (I am referring to this part of the message: class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml)
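(My reading of the serialization stack, which may be wrong — the desugaring below is only an illustration, not code from the project:)

// rdd.map(prepare) passes the instance method prepare as a function, which
// roughly expands to a closure over `this`:
val messages = rdd.map(m => this.prepare(m))
// Serializing that closure therefore serializes the whole EsStream instance,
// and with it the field de$kp$spark$elastic$stream$EsStream$$conf of type
// org.apache.hadoop.conf.Configuration, which is not java.io.Serializable.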

UPDATE:

class EsStream(name:String,conf:HConf) extends SparkBase with Serializable {

  /* Elasticsearch configuration */ 
  val ec = getEsConf(conf)               

  /* Kafka configuration */
  val (kc,topics) = getKafkaConf(conf)

  def run() {

    val ssc = createSSCLocal(name,conf)

    /*
     * The KafkaInputDStream returns a Tuple where only the second component
     * holds the respective message; we therefore reduce to a DStream[String]
     */
    val stream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kc,topics,StorageLevel.MEMORY_AND_DISK).map(_._2)
    /*
     * Inline transformation of the incoming stream by any function that maps 
     * a DStream[String] onto a DStream[String]
     */
    val transformed = transform(stream)
    /*
     * Write transformed stream to Elasticsearch index
     */
    transformed.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        val messages = rdd.map(prepare)
        messages.saveAsNewAPIHadoopFile("-", classOf[NullWritable], classOf[MapWritable], classOf[EsOutputFormat], ec)
      }
    })

    ssc.start()
    ssc.awaitTermination()

  }

  def transform(stream:DStream[String]) = stream

  private def getEsConf(config:HConf):HConf = {

    val _conf = new HConf()

    _conf.set("es.nodes", conf.get("es.nodes"))
    _conf.set("es.port", conf.get("es.port"))

    _conf.set("es.resource", conf.get("es.resource"))

    _conf

  }

  private def getKafkaConf(config:HConf):(Map[String,String],Map[String,Int]) = {

    val cfg = Map(
      "group.id" -> conf.get("kafka.group"),

      "zookeeper.connect" -> conf.get("kafka.zklist"),
      "zookeeper.connection.timeout.ms" -> conf.get("kafka.timeout")

    )

    val topics = conf.get("kafka.topics").split(",").map((_,conf.get("kafka.threads").toInt)).toMap   

    (cfg,topics)

  }

  private def prepare(message:String):(Object,Object) = {

    val m = JSON.parseFull(message) match {
      case Some(map) => map.asInstanceOf[Map[String,String]]
      case None => Map.empty[String,String]
    }

    val kw = NullWritable.get

    val vw = new MapWritable
    for ((k, v) <- m) vw.put(new Text(k), new Text(v))

    (kw, vw)

  }

}

【Question comments】:

An object of class org.apache.hadoop.conf.Configuration seems to be declared globally as a field. Try making it local.
@amit_kumar: Thanks. I have just posted the complete class. Could you point out where I should make org.apache.hadoop.conf.Configuration local?
What is HConf? That is what is causing the problem. Check whether it is serializable.
@amit_kumar: It is from org.apache.hadoop.conf.Configuration => HBase, where public class Configuration implements Iterable<Map.Entry<String,String>>, Writable ...
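(A quick way to see what the last two comments are pointing at — a hypothetical check, based only on the class declaration quoted above: Configuration implements Hadoop's Writable, but not java.io.Serializable, which is what Spark's Java serializer requires.)

// Hypothetical check (not from the thread):
val hadoopConf = new org.apache.hadoop.conf.Configuration()
println(hadoopConf.isInstanceOf[java.io.Serializable])          // false
println(hadoopConf.isInstanceOf[org.apache.hadoop.io.Writable]) // true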

【Answer 1】:

Remove conf:HConf from EsStream's class constructor and write it as class EsStream(name:String).

Next, create a method with the signature public def init(conf:HConf):Map(String,String).

In this method you read the required configurations and update ec and (kc,topics) there.

After this, you should call your run method.
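(A minimal sketch of one way to read this suggestion — the field types, the body of init, and building the Elasticsearch Configuration locally inside run are my assumptions; the answer only gives the signatures:)

import org.apache.hadoop.conf.{Configuration => HConf} // assumed alias, as in the question

class EsStream(name: String) extends SparkBase with Serializable {

  // Only plain, serializable values are kept as fields; no Hadoop
  // Configuration is stored on the class any more.
  private var esSettings: Map[String, String] = Map.empty
  private var kc: Map[String, String] = Map.empty
  private var topics: Map[String, Int] = Map.empty

  def init(conf: HConf): Map[String, String] = {
    esSettings = Map(
      "es.nodes"    -> conf.get("es.nodes"),
      "es.port"     -> conf.get("es.port"),
      "es.resource" -> conf.get("es.resource"))
    kc = Map(
      "group.id"                        -> conf.get("kafka.group"),
      "zookeeper.connect"               -> conf.get("kafka.zklist"),
      "zookeeper.connection.timeout.ms" -> conf.get("kafka.timeout"))
    topics = conf.get("kafka.topics").split(",")
      .map((_, conf.get("kafka.threads").toInt)).toMap
    esSettings
  }

  def run() {
    // ... build the StreamingContext, Kafka stream and transformed DStream as before ...
    // The Elasticsearch Configuration is created locally, right where it is
    // used, so no Configuration ever lives on the EsStream instance that the
    // rdd.map(prepare) closure captures.
    val ec = new HConf()
    esSettings.foreach { case (k, v) => ec.set(k, v) }
    // transformed.foreachRDD { rdd => ... messages.saveAsNewAPIHadoopFile("-", ..., ec) }
  }
}

// Assumed usage: configure first, then start the streaming job.
// val es = new EsStream("EsStream")
// es.init(hadoopConf)
// es.run()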

【Comments】:
