Spark 中的 NotSerializableException

Posted

技术标签:

【中文标题】Spark 中的 NotSerializableException【英文标题】:NotSerializableException in Spark 【发布时间】:2018-08-27 23:17:53 【问题描述】:

大多数不可序列化的问题在线获取非常基本的数据作为他们的sc.parallelize() 的输入,在地图部分他们遇到不可序列化的问题,但我的是一种类型。我有一个特定的数据类型,它来自第三方库并且不可序列化。 所以写这个显示 NotSerializableException :

val data: RDD[ThirdPartyLib.models.XData] = sc.parallelize(ThirdPartyLib.getX)

data.foreachPartition(rows => 
  rows.foreach(row => 
    println("value: " + row.getValue)    
  )  
)

作为解决方案,我在内部创建了相同的模型类(XData),但使其可序列化并执行此操作:

val data: RDD[XData] = (sc.parallelize(ThirdPartyLib.getX)).asInstanceOf[RDD[XData]]

data.foreachPartition(rows => 
  rows.foreach(row => 
    println("value: " + row.getValue)    
  )  
)

我期待问题得到解决,但我仍然收到与[ERROR] org.apache.spark.util.Utils logError - Exception encountered java.io.NotSerializableException: ThirdPartyLib.models.XData 相同的错误。当我创建内部可序列化类型时,问题不应该重新解决吗?我该如何解决这个问题?

【问题讨论】:

这里的XData 是什么?可以分享一下内容吗? 我不知道我是否可以分享内容。但它只是一个简单的 pojo 类,具有属性和简单的 getter/setter。 ThirdPartyLib.getX 是否返回XData 对象列表? 是的。 ThirdPartyLib.getX 返回XData 的列表。 【参考方案1】:

所以

(sc.parallelize(ThirdPartyLib.getX)).asInstanceOf[RDD[XData]]

您先并行化,然后再进行转换。所以 spark 仍然需要 ThirdPartyLib.models.XData 可序列化。此外,由于类型不同,该演员阵容可能会爆炸。

我认为这应该可以解决问题

def convertThirdPartyXDataToMyXData( xd: ThirdPartyLib.models.XData): XData = ???

val data: RDD[ThirdPartyLib.models.XData] = sc.parallelize(ThirdPartyLib.getX.map(convertThirdPartyXDataToMyXData)) //if you have a map on the collection that getX returns

【讨论】:

我最终要求第三方库所有者将类更改为 Serializable 但这个答案工作得很好。谢谢

以上是关于Spark 中的 NotSerializableException的主要内容,如果未能解决你的问题,请参考以下文章

python [Spark中的简单时间戳聚合] #spark

使用 Spark 中的共享变量

Spark RDD在Spark中的地位和作用如何?

Spark RDD在Spark中的地位和作用如何?

Spark 中的任务是啥? Spark worker 是如何执行 jar 文件的?

Spark改进|Apache Spark 3.0中的SQL性能改进概览