如何在 Apache Spark 中向 Kryo Serializer 注册类?

Posted

技术标签:

【中文标题】如何在 Apache Spark 中向 Kryo Serializer 注册类?【英文标题】:How can I register classes to Kryo Serializer in Apache Spark? 【发布时间】:2016-06-04 19:52:34 【问题描述】:

我正在使用 Spark 1.6.1 和 Python。使用 PySpark 时如何启用 Kryo 序列化?

我在 spark-default.conf 文件中有以下设置:

spark.eventLog.enabled             true
spark.eventLog.dir                 //local_drive/sparkLogs
spark.default.parallelism          8
spark.locality.wait.node           5s
spark.executor.extraJavaOptions    -XX:+UseCompressedOops
spark.serializer                   org.apache.spark.serializer.KryoSerializer
spark.kryo.classesToRegister      Timing, Join, Select, Predicate, Timeliness, Project, Query2, ScanSelect
spark.shuffle.compress             true

还有以下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o35.load.
: org.apache.spark.SparkException: Failed to register classes with Kryo
at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:128)
at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:273)
at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:258)
at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:174)

Caused by: java.lang.ClassNotFoundException: Timing
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:120)
at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:120)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:120)

主类包含(Query2.py):

from Timing import Timing
from Predicate import Predicate
from Join import Join 
from ScanSelect import ScanSelect 
from Select import Select
from Timeliness import Timeliness
from Project import Project

conf = SparkConf().setMaster(master).setAppName(sys.argv[1]).setSparkHome("$SPARK_HOME")
sc = SparkContext(conf=conf)
conf.set("spark.kryo.registrationRequired", "true")
sqlContext = SQLContext(sc)

我知道“Kryo 不会对 PySpark 产生重大影响,因为它只是将数据存储为 byte[] 对象,即使使用 Java 也可以快速序列化。但可能值得尝试设置 spark.serializer 和不要尝试注册任何课程”(Matei Zaharia, 2014)。但是,我需要注册课程。

提前致谢。

【问题讨论】:

好的,那么这些类是在哪里定义的呢?您不会尝试向 Kryo 注册 Python 类,是吗? 我做了,那些Timing、Predicate等,是我的Python类的名字。我明白,通过阅读我必须注册我的课程。我怀疑你的评论我错了,但是我应该在那里写什么呢?我试图理解它并在文档中找到解决方案或过程,但我仍然不明白。 什么都没有。 Kryo 是一个 Java (JVM) 序列化框架,而不是 Python 框架。 可以和PySpark一起使用的是JVM,我就是不知道怎么用。参考:***.com/questions/36278574/… 不,它不能。使用 PySpark 时可以使用 Kryo 序列化 Java 对象,这与序列化 Python 对象不同。 【参考方案1】:

这是不可能的。 Kryo 是一个 Java (JVM) 序列化框架。它不能与 Python 类一起使用。为了序列化 Python 对象,PySpark 使用 Python 序列化工具,包括标准的pickle 模块和coludpickle 的improved version。您可以在Tips for properly using large broadcast variables? 中找到有关 PySpark 序列化的更多信息。

虽然您可以在使用 PySpark 时启用 Kryo 序列化,但这不会影响 Python 对象的序列化方式。它将仅用于 Java 或 Scala 对象的序列化。

【讨论】:

pyspark.ml,mllib.linalg.Vectors java/scala 对象吗? @ΘεόφιλοςΜουρατίδης 不,但可以在某些上下文中透明地映射到 JVM 对象。 所以当我提交我的 pyspark 作业时设置 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer 并没有真正使用 Kryo 序列化器?

以上是关于如何在 Apache Spark 中向 Kryo Serializer 注册类?的主要内容,如果未能解决你的问题,请参考以下文章

在Spark中使用Kryo序列化

为什么kryo注册不能在SparkSession中工作?

高性能的序列化与反序列化:kryo的简单使用

Kryo序列化器(快速上手)

Spark-Kryo序列化框架

Spark---序列化(Kryo)