Flink中使用Kryo序列化器的注意事项

Posted 境悟初

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink中使用Kryo序列化器的注意事项相关的知识,希望对你有一定的参考价值。

你以为我要说的是在Flink中使用Kryo序列化吗?不是的,还记得上一篇关于Kryo序列化的问题的文章:Kryo序列化:Class Not Found的可能原因.

里面介绍了因为在Spark环境下由于类加载器原因导致Kryo反序列化时找不到类的问题。

没错,还有续集。这次是在Flink下,也出现了同样的问题。

问题复现

见如下代码,是Flink提交给YARN的主函数类,里面反序列化一个 StreamParam的参数类。这个类就在提交的jar包里。
(KryoSerializer是我们自己封装了下Kryo,里面还是Kryo实例。)

object MyFlinkDriver 

  def main(args: Array[String]): Unit = 
    Assert.paramMiss(args.length > 0, "StreamParam JsonString")

    val param = KryoSerializer.deserialize(
      EncodeUtil.base64DecodeBytes(args(0))
    ).asInstanceOf[StreamParam]

    val res = param.streamResource
    val env = getStreamExecutionEnv(res)
    new StreamGraphExecutor(param.streamGraph, param.config, env).execute()

    env.execute(res.getTaskId)
  

在本地单测运行是没问题的,在服务器会发现类找不到。

Caused by: java.lang.ClassNotFoundException: com.jimo.sdk.core.analyze.stream.StreamParam
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_66]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_66]
at org.springframework.boot.loader.LaunchedURLClassLoader.loadClass(LaunchedURLClassLoader.java:151) ~[just-cmc.jar:2.2.3-SNAPSHOT]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_66]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_66]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_66]
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[kryo-2.24.0.jar!/:?]
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[kryo-2.24.0.jar!/:?]
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[kryo-2.24.0.jar!/:?]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[kryo-2.24.0.jar!/:?]
at com.jimo.sdk.core.serialize.KryoSerializer.deserialize(KryoSerializer.java:59) ~[?:?]
at com.jimo.executor.stream.MyFlinkDriver$.main(MyFlinkDriver.scala:19) ~[?:?]

同一个地方,同一个报错,应该是同一个原因吧。

设置类加载器

所以我们就设置类加载器吧。

	KryoSerializer.setClassLoader(MyFlinkDriver.getClass.getClassLoader)
	
	val param = KryoSerializer.deserialize(
	  EncodeUtil.base64DecodeBytes(args(0))
	).asInstanceOf[StreamParam]

当然,问题解决了,就这么简单。不过事情还没结束。

用的什么类加载器?

用的是Flink的 FlinkUserCodeClassLoader。Flink的自定义类加载器分为 parent-firstchild-first,可以通过配置文件配置。

在我们这个场景下,需要配成 parent-first,不然jar包里的类是Flink的类加载器加载的,而序列化时用的是 AppClassLoader,会导致反序列化的实例不是同一个,因为是不同的类加载器加载的。

总结

  • 将flink的flink-conf.yamlclassloader.resolve-order改为parent-first,不然类加载器导致反序列化不是同一个类
  • 同时要将Kryo的ClassLoader设置成Flink的类加载器,否则找不到类

以上是关于Flink中使用Kryo序列化器的注意事项的主要内容,如果未能解决你的问题,请参考以下文章

在Spark中使用Kryo序列化

Kryo 使用指南

kryo的速度测试

kryo的速度测试

使用 Kryo 完成 序列化和反序列化,并使用ThreadLocal解决线程不安全问题

高性能的序列化与反序列化:kryo的简单使用