spark hiveUDF 不要定义static成员变量
Posted 鸿乃江边鸟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark hiveUDF 不要定义static成员变量相关的知识,希望对你有一定的参考价值。
背景
最近在帮同事排查hive UDF的时候,发现了在udf中定义了静态成员变量引发的NullPointerException,具体报错如下:
java.lang.NullPointerException
at java.lang.String.contains(String.java:2133)
at org.apache.spark.sql.hive.HiveGenericUDF.eval(hiveUDFs.scala:181)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.subExpr_6$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
代码很简单:
public class GenericUDF extends GenericUDF
...
private transient static final List<String> list = new ArrayList();
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException
super.init(stdFactory);
list.add("1");
list.add("2");
...
...
for (String i : list)
if (s.contains(i))
result = i;
分析
其实从正常的角度来说,把共有的成员变量定义为静态的是合理的,这样能够减少GC的频率,但是这次为什么不行了呢?
这个还得从spark 对hiveUDF的封装类HiveGenericUDF说起,如下:
@transient
lazy val function = funcWrapper.createFunction[GenericUDF]()
@transient
private lazy val returnInspector =
function.initializeAndFoldConstants(argumentInspectors.toArray)
@transient
private lazy val unwrapper = unwrapperFor(returnInspector)
override def eval(input: InternalRow): Any =
returnInspector // Make sure initialized.
var i = 0
val length = children.length
while (i < length)
val idx = i
deferredObjects(i).asInstanceOf[DeferredObjectAdapter]
.set(() => children(idx).eval(input))
i += 1
unwrapper(function.evaluate(deferredObjects))
挑选了两个重点的方法,其中funcWrapper.createFunction的实现很简单,就是通过反射new出一个对象,这没什么说的,
但是对于initializeAndFoldConstants第二个方法,
public ObjectInspector initializeAndFoldConstants(ObjectInspector[] arguments)
throws UDFArgumentException
ObjectInspector oi = initialize(arguments);
...
这里会调用用户自己实现的GenericUDF的 initialize,而initializeAndFoldConstants的初次调用会只有到了真正计算的时候才会触发,
我们知道在spark executor 的task是以线程的模式运行的,所以在executor有多个task并发的情况下,会存在多线程的问题,回到我们的例子上来,
我们在init方法中调用了add方法,而这个变量是整个JVM共享的,也就是说有可能一个task在运行的时候,另一个线程改变了list的内容,这样就会导致不一致的情况。
解决
解决的方法很多:
- 把静态的成员变量改成非静态的,如:
private transient final List<String> list = new ArrayList();
这样list变量就是对象级别的可见性了,就能避免多线程问题了
以上是关于spark hiveUDF 不要定义static成员变量的主要内容,如果未能解决你的问题,请参考以下文章
Spark - Hive UDF 与 Spark-SQL 一起使用,但不与 DataFrame 一起使用