RDD from Dataset results in a Serialization Error with Spark 2.x

Posted: 2016-10-29 14:32:32

I have an RDD created from a Dataset in a Databricks notebook. When I try to fetch concrete values from it, it fails with a serialization error message.
Here is where I fetch the data (PageCount is a case class):
val pcDf = spark.sql("SELECT * FROM pagecounts20160801")
val pcDs = pcDf.as[PageCount]
val pcRdd = pcDs.rdd
Then when I do:
pcRdd.take(10)
I get the following exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 82.0 (TID 2474) had a not serializable result: org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
even though the same call on the Dataset works:
pcDs.take(10)
Edit:
Here is the full stack trace:
Serialization stack:
- object not serializable (class: org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection, value: <function1>)
- field (class: org.apache.spark.sql.execution.datasources.FileFormat$$anon$1, name: appendPartitionColumns, type: class org.apache.spark.sql.catalyst.expressions.UnsafeProjection)
- object (class org.apache.spark.sql.execution.datasources.FileFormat$$anon$1, <function1>)
- field (class: org.apache.spark.sql.execution.datasources.FileScanRDD, name: readFunction, type: interface scala.Function1)
- object (class org.apache.spark.sql.execution.datasources.FileScanRDD, FileScanRDD[1095] at )
- field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
- object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@502bfe49)
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@51dc790)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@502bfe49))
- field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[1096] at )
- field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
- object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@52ce8951)
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@57850f0)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@52ce8951))
- field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[1097] at )
- field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
- object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@7e99329a)
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@792f3145)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@7e99329a))
- field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[1098] at )
- field (class: org.apache.spark.sql.Dataset, name: rdd, type: class org.apache.spark.rdd.RDD)
- object (class org.apache.spark.sql.Dataset, Invalid tree; null:
null)
- field (class: lineb9de310f01c84f49b76c6c6295a1393c121.$read$$iw$$iw$$iw$$iw, name: pcDs, type: class org.apache.spark.sql.Dataset)
- object (class lineb9de310f01c84f49b76c6c6295a1393c121.$read$$iw$$iw$$iw$$iw, lineb9de310f01c84f49b76c6c6295a1393c121.$read$$iw$$iw$$iw$$iw@3482035d)
- field (class: lineb9de310f01c84f49b76c6c6295a1393c121.$read$$iw$$iw$$iw$$iw$PageCount, name: $outer, type: class lineb9de310f01c84f49b76c6c6295a1393c121.$read$$iw$$iw$$iw$$iw)
- object (class lineb9de310f01c84f49b76c6c6295a1393c121.$read$$iw$$iw$$iw$$iw$PageCount, PageCount(de.b,Spezial:Linkliste/Datei:Playing_card_diamond_9.svg,1,6053))
- element of array (index: 0)
- array (class [Llineb9de310f01c84f49b76c6c6295a1393c121.$read$$iw$$iw$$iw$$iw$PageCount;, size 10)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1452)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1440)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1439)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1439)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1665)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1620)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1609)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1881)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1894)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1311)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.take(RDD.scala:1285)
at lineb9de310f01c84f49b76c6c6295a1393c137.$read$$iw$$iw$$iw$$iw.<init>(<console>:33)
at lineb9de310f01c84f49b76c6c6295a1393c137.$read$$iw$$iw$$iw.<init>(<console>:40)
at lineb9de310f01c84f49b76c6c6295a1393c137.$read$$iw$$iw.<init>(<console>:42)
at lineb9de310f01c84f49b76c6c6295a1393c137.$read$$iw.<init>(<console>:44)
at lineb9de310f01c84f49b76c6c6295a1393c137.$eval$.$print$lzycompute(<console>:7)
at lineb9de310f01c84f49b76c6c6295a1393c137.$eval$.$print(<console>:6)
Comments:
Does your PageCount class implement Serializable?
Is PageCount a case class?
@eliasah It is a case class, as you suggested, so it is serializable by default. In fact, since it works with the Dataset, I would think there should not be any serialization problem? Or is there a difference in how the two are serialized?
Could you print a few more lines of the error log?
@eliasah Just added it.
Answer 1:
The PageCount class must hold a non-serializable reference (a non-transient member that is not serializable, or perhaps a parent type with the same problem). Failing to serialize the target object makes Spark try to serialize its enclosing scope, pulling in more and more members, until somewhere along the way it reaches members of FileFormat, including the Janino-generated projection, which is not serializable by design.

This is just a side effect of the failed serialization of the actual target object (PageCount).
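As a minimal sketch of that side effect (all names below are hypothetical, not from the question): a case class whose field holds a non-serializable object fails Java serialization on that field, even though the case class itself is Serializable, because serialization walks the entire object graph.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a member that does not implement Serializable, playing the
// role of the generated UnsafeProjection in the stack trace above.
class NonSerializableHelper(val n: Int)

// Hypothetical analogue of a PageCount that drags in such a reference.
case class BadPageCount(page: String, helper: NonSerializableHelper)

// Attempt plain Java serialization, the same default mechanism Spark uses.
def javaSerialize(obj: AnyRef): Either[Throwable, Array[Byte]] =
  try {
    val bytes = new ByteArrayOutputStream()
    new ObjectOutputStream(bytes).writeObject(obj)
    Right(bytes.toByteArray)
  } catch {
    case e: NotSerializableException => Left(e)
  }

// javaSerialize(BadPageCount("Main_Page", new NonSerializableHelper(1)))
// fails with a NotSerializableException naming NonSerializableHelper,
// not BadPageCount -- mirroring how the exception here names
// SpecificUnsafeProjection rather than PageCount.
```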
The relevant code from Spark's FileFormat.scala (it should be marked @transient to truly avoid serialization, in case appendPartitionColumns is ever materialized):
// Using lazy val to avoid serialization
private lazy val appendPartitionColumns =
GenerateUnsafeProjection.generate(fullSchema, fullSchema)
In the normal scenario, though, this "unintended" serialization never happens, as long as serialization of the user-defined type succeeds.
Serializing a Spark RDD (a raw type: Spark does not know its global schema) involves full object serialization during materialization, covering both the object's data and the object's "schema", i.e. its type. The default mechanism is the Java serializer (so you can try serializing PageCount with the Java serializer yourself, which may expose this kind of problem). It can be replaced by the more efficient Kryo serializer, which, however, serializes objects into opaque blobs: the schema is lost, and SQL operations that need column access can no longer be applied. This is why accessing the RDD triggers the serialization problem.
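Following that suggestion, one way to check is to run PageCount through the plain Java serializer directly. The field names below are guesses reconstructed from the value visible in the stack trace, PageCount(de.b, Spezial:..., 1, 6053); they are illustrative, not the question's actual definition.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical reconstruction of PageCount, defined at the top level of a
// file (not inside a notebook cell), so it carries no $outer reference.
case class PageCount(project: String, page: String, count: Long, bytes: Long)

// Returns true if the object survives default Java serialization.
def isJavaSerializable(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  }

// A case class of plain fields passes this check; if it returned false,
// RDD materialization would fail in exactly the way described above.
```

Note that in the stack trace, PageCount carries a $outer field pointing at the notebook cell wrapper, which is what drags the Dataset (and its FileScanRDD with the non-serializable readFunction) into the serialization graph.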
DataFrames / Datasets are strongly typed and bound to a schema that Spark knows. Spark therefore does not need to ship the object structure between nodes, only the data. That is why materializing the Dataset / DataFrame of the underlying object type PageCount causes no problems.
Comments: