连接到 Spark 集群时的序列化问题
Posted
技术标签:
【中文标题】连接到 Spark 集群时的序列化问题【英文标题】:Serialization issues when connecting to Spark cluster 【发布时间】:2019-07-01 04:30:54 【问题描述】:我有一个用 Scala 编写的 Spark 应用程序,它可以读写 Parquet 文件。 该应用程序公开了一个 HTTP API,当它接收到请求时,通过一个长期存在的上下文将工作发送到 Spark 集群,该上下文在应用程序的生命周期中持续存在。 然后它将结果返回给 HTTP 客户端。
当我使用本地模式时,这一切都很好,local[*]
作为主模式。
但是,一旦我尝试连接到 Spark 集群,就会遇到序列化问题。
使用 Spark 的默认序列化程序,我得到以下信息:
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.execution.FilterExec.otherPreds of type scala.collection.Seq in instance of org.apache.spark.sql.execution.FilterExec
.
如果我启用 Kryo 序列化程序,我会得到 java.lang.IllegalStateException: unread block data
。
在尝试从 Parquet 文件中读取数据时会发生这种情况,但我认为这与 Parquet 文件本身没有任何关系,只是与发送到 Spark 集群的代码的序列化有关。
从大量互联网搜索中,我了解到这可能是由 Spark 版本甚至 Java 版本之间的不兼容造成的。 但是使用的版本是相同的。
该应用程序是用 Scala 2.12.8 编写的,并随 Spark 2.4.3 一起提供。 Spark 集群运行 Spark 2.4.3(使用 Scala 2.12 编译的版本)。 运行 Spark 集群和应用的机器使用的是 openJDK 1.8.0_212。
根据另一个互联网搜索,问题可能是由于spark.master
URL 不匹配。
所以我将spark-defaults.conf
中的spark.master
设置为我在应用程序中用来连接它的相同值。
但是,这并没有解决问题,我现在没有想法了。
【问题讨论】:
你能分享你正在运行的代码吗? 【参考方案1】:我不完全确定基本解释是什么,但我通过将应用程序的 jar 复制到 Spark 的 jars
目录来修复它。然后我仍然遇到一个错误,但另一个错误:关于 Cats/kernel/Eq
类丢失的东西。所以我将cats-kernel
的jar添加到Spark的jars
目录中。
现在一切正常。我在另一个 Stack Overflow 线程中读到的东西可能会解释它:
我认为,每当您使用引用项目方法/类的 lambda 进行任何类型的映射操作时,您都需要将它们作为附加 jar 提供。 Spark 确实序列化了 lambda 本身,但没有将其依赖关系整合在一起。不知道为什么错误消息根本没有信息。
【讨论】:
以上是关于连接到 Spark 集群时的序列化问题的主要内容,如果未能解决你的问题,请参考以下文章
并行使用 scala Spark 重命名 HDFS 文件时的序列化问题