Spark 应用程序收到“任务不可序列化”的错误?

Posted

技术标签:

【中文标题】Spark 应用程序收到“任务不可序列化”的错误?【英文标题】:Spark application got the error of "Task not serializable"? 【发布时间】:2017-10-22 14:49:42 【问题描述】:

以下代码出现“Task not serializable”的错误?

错误

线程“主”org.apache.spark.SparkException 中的异常:任务不可序列化 在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 在 org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 在 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 在 org.apache.spark.SparkContext.clean(SparkContext.scala:2101) 在 org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370) 在 org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 在 org.apache.spark.rdd.RDD.map(RDD.scala:369) 在 ConnTest$.main(main.scala:41) 在 ConnTest.main(main.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(未知来源) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(未知来源) 在 java.lang.reflect.Method.invoke(未知来源) 在 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) 在 org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 引起:java.io.NotSerializableException:DoWork 序列化栈: - 对象不可序列化(类:DoWork,值:DoWork@655621fd) - 字段(类:ConnTest$$anonfun$2,名称:doWork$1,类型:DoWork 类) - 对象(ConnTest$$anonfun$2 类,) 在 org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 在 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 在 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) ... 20 更多

代码:

object ConnTest extends App 
  override def main(args: scala.Array[String]): Unit = 
    super.main(args)
    val date = args(0)
    val conf = new SparkConf()
    val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val jdbcSqlConn = "jdbc:sqlserver://......;"

    val listJob = new ItemListJob(sqlContext, jdbcSqlConn)
    val list = listJob.run(date).select("id").rdd.map(r => r(0).asInstanceOf[Int]).collect() 
    // It returns about 3000 rows

    val doWork = new DoWork(sqlContext, jdbcSqlConn)
    val processed = sc.parallelize(list).map(d => 
      doWork.run(d, date)
    )
  


class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) 
  def run(date: LocalDate) = 
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
  


class DoWork(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) 
  def run(id: Int, date: LocalDate) = 
    // ...... read the data from database for id, and create a text file
    val data = sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"someFunction('$id', $date)"
    )).load()
    // .... create a text file with content of data
    (id, date) 
  

更新:

我将.map() 调用更改为以下内容,

val processed = sc.parallelize(dealList).toDF.map(d => 
  doWork.run(d(0).asInstanceOf[Int], rc)
)

现在我得到了

的错误 线程“主”java.lang.UnsupportedOperationException 中的异常:找不到 java.time.LocalDate 的编码器 - 字段(类:“java.time.LocalDate”,名称:“_2”) - 根类:“scala.Tuple2” 在 org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602) 在 org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596) 在 org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587) 在 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 在 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 在 scala.collection.immutable.List.foreach(List.scala:381)

【问题讨论】:

【参考方案1】:

问题在于以下关闭:

val processed = sc.parallelize(list).map(d => 
  doWork.run(d, date)
)

map 中的闭包会在 executors 中运行,因此 Spark 需要序列化 ​​doWork 并将其发送给 executors。 DoWork 必须是可序列化的。然而。我看到DoWork 包含scsqlContext,所以你不能只让DoWork 实现Serializable,因为你不能在执行程序中使用它们。

我猜你可能想将数据存储到DoWork 的数据库中。如果有,可以将RDD转为DataFrame,通过jdbc方法保存,如:

sc.parallelize(list).toDF.write.jdbc(...)

由于您没有提供DoWork中的代码,我无法提供更多建议。

【讨论】:

谢谢。函数DoWork.run() 将从数据库中读取数据并生成文本文件。我还没开始写函数体的代码。 刚刚意识到这两个课程我不需要sc: SparkContext。我已经更新了这个问题。但是,我仍然需要 sqlContext 来读取数据库? 数据框对我的案例有帮助吗?还是我必须预先创建 parquet 文件并使用 Spark-SQL 来获取数据并创建文本文件? 您不能在执行程序中使用SQLContextSparkContext。您有两个选择:使用 JDBC API 来写入数据,或者只使用 DataFrame.write.jdbc。此外,我在您的代码中没有得到一件事。为什么需要将DataFrame转为RDD? 我将 DataFrame 转换为 RDD 的原因是为了获取一个 int 列表,以便可以将其传递给sc.parallelize()。也许有更好的方法来做到这一点?我想我将不得不使用 JDBC(我只需要从数据库中读取数据,然后将其写入文本文件)?

以上是关于Spark 应用程序收到“任务不可序列化”的错误?的主要内容,如果未能解决你的问题,请参考以下文章

任务不可序列化错误:Spark

Spark Scala:任务不可序列化错误

Scala 错误:线程“主”org.apache.spark.SparkException 中的异常:任务不可序列化

Spark 任务不可序列化

Spark:DataFrame 上 UDF 的任务不可序列化

org.apache.spark.SparkException:任务不可序列化,wh