spark hiveUDF 不要定义static成员变量

Posted 鸿乃江边鸟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark hiveUDF 不要定义static成员变量相关的知识,希望对你有一定的参考价值。

背景

最近在帮同事排查hive UDF的时候,发现了在udf中定义了静态成员变量引发的NullPointerException,具体报错如下:

java.lang.NullPointerException
	at java.lang.String.contains(String.java:2133)
	at org.apache.spark.sql.hive.HiveGenericUDF.eval(hiveUDFs.scala:181)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.subExpr_6$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
	at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)

代码很简单:

public class GenericUDF extends GenericUDF 
...
private transient static final List<String> list = new ArrayList();

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException 
        super.init(stdFactory);
        list.add("1");
        list.add("2");
        ...
    
    ...
    for (String i : list) 
            if (s.contains(i)) 
                result = i;
            
        
    

分析

其实从正常的角度来说,把共有的成员变量定义为静态的是合理的,这样能够减少GC的频率,但是这次为什么不行了呢?
这个还得从spark 对hiveUDF的封装类HiveGenericUDF说起,如下:

    @transient
    lazy val function = funcWrapper.createFunction[GenericUDF]()

    @transient
    private lazy val returnInspector = 
      function.initializeAndFoldConstants(argumentInspectors.toArray)
    
    @transient
    private lazy val unwrapper = unwrapperFor(returnInspector)

    override def eval(input: InternalRow): Any = 
    returnInspector // Make sure initialized.

    var i = 0
    val length = children.length
    while (i < length) 
      val idx = i
      deferredObjects(i).asInstanceOf[DeferredObjectAdapter]
        .set(() => children(idx).eval(input))
      i += 1
    
    unwrapper(function.evaluate(deferredObjects))
  
  

挑选了两个重点的方法,其中funcWrapper.createFunction的实现很简单,就是通过反射new出一个对象,这没什么说的,
但是对于initializeAndFoldConstants第二个方法,

public ObjectInspector initializeAndFoldConstants(ObjectInspector[] arguments)
      throws UDFArgumentException 

    ObjectInspector oi = initialize(arguments);
    ...

这里会调用用户自己实现的GenericUDF的 initialize,而initializeAndFoldConstants的初次调用会只有到了真正计算的时候才会触发,
我们知道在spark executor 的task是以线程的模式运行的,所以在executor有多个task并发的情况下,会存在多线程的问题,回到我们的例子上来,
我们在init方法中调用了add方法,而这个变量是整个JVM共享的,也就是说有可能一个task在运行的时候,另一个线程改变了list的内容,这样就会导致不一致的情况。

解决

解决的方法很多:

  • 把静态的成员变量改成非静态的,如:
private transient final List<String> list = new ArrayList();

这样list变量就是对象级别的可见性了,就能避免多线程问题了

以上是关于spark hiveUDF 不要定义static成员变量的主要内容,如果未能解决你的问题,请参考以下文章

Spark - Hive UDF 与 Spark-SQL 一起使用,但不与 DataFrame 一起使用

详解 HiveUDF 函数

PHP的高效编程

大数据必知必会系列——萌新提问怎么定义HiveUDF函数?能否给个示例[新星计划]

hiveUDF的使用

[Spark经验一]Spark RDD计算使用的函数里尽量不要使用全局变量