spark hiveUDF transient的重要性

Posted 鸿乃江边鸟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark hiveUDF transient的重要性相关的知识,希望对你有一定的参考价值。

背景

最近在写hiveUDF的时候,遇到了一些反序列的问题,具体的报错如下:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in stage 6.0 failed 4 times, most recent failure: Lost task 11.3 in stage 6.0 (TID 105) (dw-csprd-bigdata-athena-dn-096.shizhuang-inc.com executor 3): com.esotericsoftware.kryo.KryoException: Unable to find class: scala.collection.immutable.HashMap$$$Lambda$9/282828951
Serialization trace:
mergef$1 (scala.collection.immutable.HashMap$$anon$1)
defaultMerger (scala.collection.immutable.HashMap$)
_2 (scala.Tuple2)
head (scala.collection.immutable.$colon$colon)
factories (com.fasterxml.jackson.module.scala.deser.UnsortedMapDeserializerModule$$anon$1)
_additionalDeserializers (com.fasterxml.jackson.databind.cfg.DeserializerFactoryConfig)
_factoryConfig (com.fasterxml.jackson.databind.deser.BeanDeserializerFactory)
_factory (com.fasterxml.jackson.databind.deser.DefaultDeserializationContext$Impl)
_deserializationContext (com.fasterxml.jackson.databind.ObjectMapper)
objectMapper (xxx.xxxUDF)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
	at org.apache.hadoop.hive.ql.exec.SerializationUtilities$KryoWithHooks.readClass(SerializationUtilities.java:181)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)

分析

我们的代码类似如下:

class xxxUDF extends GenericUDF 
  @transient
  var argumentOIs: Array[ObjectInspector] = _

  val objectMapper = new ObjectMapper()
  objectMapper.registerModule(DefaultScalaModule)
  objectMapper.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
  objectMapper.configure(Feature.ALLOW_SINGLE_QUOTES, true)

  val result: Text = new Text()

其中spark的配置是使用kryo序列化,spark.serializer=org.apache.spark.serializer.KryoSerializer

可以看到是objectMapper类在driver端在把UDF传给executor的时候,需要做UDF的序列化,而序列化的时候,就会把objectMapper字段进行序列化,
这样在executor端进行task.run的时候会把 objectMapper反序列化出来,这个时候如果对应的类的成员方法如果没有进行kryo的注册,就会直接报序列化的错误,
而spark目前的默认注册的kryo类在KryoSerializer.scala中,如下:

...
 kryo.register(None.getClass)
 kryo.register(Nil.getClass)
 kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
 kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
 kryo.register(classOf[ArrayBuffer[Any]])
...

解决

在objectMapper加上@transient注解,使该对象不被序列化,这样在反序列化的时候,就不会反序列化该对象

以上是关于spark hiveUDF transient的重要性的主要内容,如果未能解决你的问题,请参考以下文章

Spark - Hive UDF 与 Spark-SQL 一起使用,但不与 DataFrame 一起使用

使用或不使用 @transient 序列化惰性 val 时的差异

Spark读取和使用Hive Permanent Function 原理

详解 HiveUDF 函数

大数据技术之_19_Spark学习_02_Spark Core 应用解析小结

hiveUDF的使用