Flink中使用Kryo序列化器的注意事项
Posted 境悟初
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink中使用Kryo序列化器的注意事项相关的知识,希望对你有一定的参考价值。
你以为我要说的是在Flink中使用Kryo序列化吗?不是的,还记得上一篇关于Kryo序列化的问题的文章:Kryo序列化:Class Not Found的可能原因.
里面介绍了因为在Spark环境下由于类加载器原因导致Kryo反序列化时找不到类的问题。
没错,还有续集。这次是在Flink下,也出现了同样的问题。
问题复现
见如下代码,是Flink提交给YARN的主函数类,里面反序列化一个 StreamParam
的参数类。这个类就在提交的jar包里。
(KryoSerializer
是我们自己封装了下Kryo
,里面还是Kryo
实例。)
object MyFlinkDriver
def main(args: Array[String]): Unit =
Assert.paramMiss(args.length > 0, "StreamParam JsonString")
val param = KryoSerializer.deserialize(
EncodeUtil.base64DecodeBytes(args(0))
).asInstanceOf[StreamParam]
val res = param.streamResource
val env = getStreamExecutionEnv(res)
new StreamGraphExecutor(param.streamGraph, param.config, env).execute()
env.execute(res.getTaskId)
在本地单测运行是没问题的,在服务器会发现类找不到。
Caused by: java.lang.ClassNotFoundException: com.jimo.sdk.core.analyze.stream.StreamParam
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_66]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_66]
at org.springframework.boot.loader.LaunchedURLClassLoader.loadClass(LaunchedURLClassLoader.java:151) ~[just-cmc.jar:2.2.3-SNAPSHOT]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_66]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_66]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_66]
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[kryo-2.24.0.jar!/:?]
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[kryo-2.24.0.jar!/:?]
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[kryo-2.24.0.jar!/:?]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[kryo-2.24.0.jar!/:?]
at com.jimo.sdk.core.serialize.KryoSerializer.deserialize(KryoSerializer.java:59) ~[?:?]
at com.jimo.executor.stream.MyFlinkDriver$.main(MyFlinkDriver.scala:19) ~[?:?]
同一个地方,同一个报错,应该是同一个原因吧。
设置类加载器
所以我们就设置类加载器吧。
KryoSerializer.setClassLoader(MyFlinkDriver.getClass.getClassLoader)
val param = KryoSerializer.deserialize(
EncodeUtil.base64DecodeBytes(args(0))
).asInstanceOf[StreamParam]
当然,问题解决了,就这么简单。不过事情还没结束。
用的什么类加载器?
用的是Flink的 FlinkUserCodeClassLoader
。Flink的自定义类加载器分为 parent-first
和child-first
,可以通过配置文件配置。
在我们这个场景下,需要配成 parent-first
,不然jar包里的类是Flink的类加载器加载的,而序列化时用的是 AppClassLoader
,会导致反序列化的实例不是同一个,因为是不同的类加载器加载的。
总结
- 将flink的
flink-conf.yaml
的classloader.resolve-order
改为parent-first
,不然类加载器导致反序列化不是同一个类 - 同时要将Kryo的
ClassLoader
设置成Flink的类加载器,否则找不到类
以上是关于Flink中使用Kryo序列化器的注意事项的主要内容,如果未能解决你的问题,请参考以下文章