Writing Hive User-Defined Functions in Scala
Posted md_2014
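Background
While working with our data warehouse recently, I ran into data that needed special processing that Hive's built-in functions could not provide. The way around this is to extend the built-in functions with user-defined functions, which makes such processing convenient. Hive supports user-defined functions written in Java and in Scala; this article covers the Scala version.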
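Types of user-defined functions
Type | Characteristic |
---|---|
UDF | Input to output is one-to-one |
UDTF | Input to output is one-to-many |
UDAF | Input to output is many-to-one |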
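The three shapes in the table can be illustrated with plain Scala collections. This is only an analogy, not Hive API: the object name FunctionShapes and its three methods are hypothetical and exist purely to show how rows go in and come out.

```scala
// Analogy only: plain Scala collections standing in for Hive rows.
object FunctionShapes {
  // UDF-like: each input row yields exactly one output value.
  def upper(rows: Seq[String]): Seq[String] =
    rows.map(_.toUpperCase)

  // UDTF-like: each input row may yield several output rows.
  def explodeCsv(rows: Seq[String]): Seq[String] =
    rows.flatMap(_.split(",").toSeq)

  // UDAF-like: many input rows collapse into a single value.
  def totalLength(rows: Seq[String]): Int =
    rows.map(_.length).sum
}
```

Hive's built-in upper, explode and sum follow the same three shapes, respectively.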
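Writing a UDF
Writing a UDF starts with extending a specific Hive class. Two approaches are currently supported. Approach 1 is simple to write but only supports primitive data types. Approach 2 is slightly more complex but supports all data types, including complex types such as List and Map. This article uses approach 2, with examples in which the input or the output is a List.
Approach 1: extend org.apache.hadoop.hive.ql.exec.UDF
Approach 2: extend org.apache.hadoop.hive.ql.udf.generic.GenericUDF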
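For comparison, approach 1 might look like the minimal sketch below. The class name ToUpper and its upper-casing logic are assumptions made for illustration and do not come from the original article. The sketch relies on Hive locating a public evaluate method on the class by reflection, and it needs hive-exec and hadoop-common on the compile classpath.

```scala
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.io.Text

// Hypothetical example of approach 1: primitive type in, primitive type out.
class ToUpper extends UDF {
  // Hive calls this method once per row; a NULL column value arrives as null.
  def evaluate(input: Text): Text =
    if (input == null) null else new Text(input.toString.toUpperCase)
}
```

Because the argument and return types are fixed in the method signature, there is no initialize step and no ObjectInspector handling, which is why this approach is shorter but limited to primitive types.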
- Add the following dependencies to the pom file
<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.1</version>
    <scope>provided</scope>
  </dependency>
</dependencies>
- Write the UDF in Scala (List input, primitive output)
import java.util

import org.apache.hadoop.hive.ql.exec.{Description, UDFArgumentException, UDFArgumentLengthException, UDFArgumentTypeException}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.serde2.objectinspector.{ListObjectInspector, ObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.{BinaryObjectInspector, PrimitiveObjectInspectorFactory}
import org.apache.hadoop.io.BytesWritable

@Description(name = "udf_name", value = "_FUNC_(arg) - description of function.")
class XXX extends GenericUDF {
  @transient private var listOI: ListObjectInspector = _
  @transient private var elementOI: BinaryObjectInspector = _

  // Validate the input types and declare the output type
  @throws(classOf[UDFArgumentException])
  override def initialize(arguments: Array[ObjectInspector]): ObjectInspector = {
    if (arguments.length != 1) {
      throw new UDFArgumentLengthException("The operator 'udf_name' accepts 1 argument, " +
        s"but got ${arguments.length}")
    }
    val arg01 = arguments(0)
    if (!arg01.isInstanceOf[ListObjectInspector]) {
      throw new UDFArgumentTypeException(0, "The argument type of function 'udf_name' should be list, " +
        s"but got ${arg01.getTypeName}")
    }
    listOI = arg01.asInstanceOf[ListObjectInspector]
    val elementType = listOI.getListElementObjectInspector
    if (!elementType.isInstanceOf[BinaryObjectInspector]) {
      throw new UDFArgumentTypeException(0,
        s"The element type of list should be binary, but got ${elementType.getTypeName}")
    }
    elementOI = elementType.asInstanceOf[BinaryObjectInspector]
    PrimitiveObjectInspectorFactory.writableBinaryObjectInspector
  }

  // The actual execution logic
  override def evaluate(arguments: Array[GenericUDF.DeferredObject]): AnyRef = {
    val arg0 = arguments(0).get()
    val arg0_length = listOI.getListLength(arg0)
    // getListLength returns -1 when the list itself is NULL
    if (arg0_length == -1) return null
    val wkbs = new util.ArrayList[Array[Byte]](arg0_length)
    var i = 0
    while (i < arg0_length) {
      wkbs.add(i, elementOI.getPrimitiveJavaObject(listOI.getListElement(arg0, i)))
      i += 1
    }
    // todo(...) is a placeholder for your own logic; it must return Array[Byte]
    new BytesWritable(todo(wkbs))
  }

  override def getDisplayString(children: Array[String]): String =
    getStandardDisplayString("udf_name", children)
}
- Write the UDF in Scala (primitive input, List output)
import java.util

import org.apache.hadoop.hive.ql.exec.{Description, UDFArgumentException, UDFArgumentLengthException, UDFArgumentTypeException}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, PrimitiveObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.{IntObjectInspector, PrimitiveObjectInspectorFactory, StringObjectInspector}
import org.apache.hadoop.io.Text

@Description(name = "udf_name", value = "_FUNC_(arg1, arg2) - description of function.")
class GridInternalGrids extends GenericUDF {
  @transient private var stringOI: StringObjectInspector = _
  @transient private var intOI: IntObjectInspector = _

  // Validate the input types and declare the output type (list of strings)
  @throws(classOf[UDFArgumentException])
  override def initialize(arguments: Array[ObjectInspector]): ObjectInspector = {
    if (arguments.length != 2) {
      throw new UDFArgumentLengthException("The operator 'udf_name' accepts 2 arguments, " +
        s"but got ${arguments.length}")
    }
    val arg01 = arguments(0).asInstanceOf[PrimitiveObjectInspector]
    val arg02 = arguments(1).asInstanceOf[PrimitiveObjectInspector]
    if (arg01.getPrimitiveCategory != PrimitiveCategory.STRING) {
      throw new UDFArgumentTypeException(0, "The 1st arg of function 'udf_name' should be string, " +
        s"but got ${arg01.getTypeName}")
    }
    if (arg02.getPrimitiveCategory != PrimitiveCategory.INT) {
      throw new UDFArgumentTypeException(1, "The 2nd arg of function 'udf_name' should be int, " +
        s"but got ${arg02.getTypeName}")
    }
    stringOI = arg01.asInstanceOf[StringObjectInspector]
    intOI = arg02.asInstanceOf[IntObjectInspector]
    ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector)
  }

  // The actual execution logic
  @throws(classOf[UDFArgumentException])
  override def evaluate(arguments: Array[GenericUDF.DeferredObject]): AnyRef = {
    val arg01 = stringOI.getPrimitiveJavaObject(arguments(0).get())
    val arg02 = intOI.get(arguments(1).get())
    val result = new util.ArrayList[Text]()
    // todo(...) is a placeholder for your own logic; it must return Array[String]
    val temp_array = todo(arg01, arg02)
    var index = 0
    while (index < temp_array.length) {
      result.add(new Text(temp_array(index)))
      index += 1
    }
    result
  }

  override def getDisplayString(children: Array[String]): String =
    getStandardDisplayString("udf_name", children)
}
- UDTF and UDAF: to be written
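Troubleshooting during UDF development
- After extending Hive's UDF class, define the subclass as a Scala class, not an object.
- When split is used inside a method, its result is implicitly converted by default, which makes the UDF fail at run time with the error running query: java.lang.NoSuchMethodError: scala.Predef. The fix is to add import scala.Predef.{refArrayOps => _}, which disables the implicit conversion.
- The first argument of UDFArgumentTypeException is the argument index and starts at 0; otherwise the exception is not thrown properly.
- running query: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction1$mcVI$sp (state=,code=0)
  To avoid this error, iterate over arrays with while loops only.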
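The last two array-related pitfalls can be sidestepped with index-based while loops, as in the sketch below. The object WhileLoopHelpers and both method names are hypothetical; it is written as a standalone object only so the sketch runs by itself, and in a real UDF the methods would live inside the class. In Scala 2.12, a for loop over an index range or a foreach call compiles to a lambda backed by scala.runtime.java8 classes such as the one in the error above, whereas a while loop over array indices does not need them.

```scala
import java.util

// Hypothetical helpers that touch arrays only through indices and while loops.
object WhileLoopHelpers {
  // Copies a Scala array into a java.util.ArrayList without foreach or for.
  def toJavaList(values: Array[String]): util.ArrayList[String] = {
    val result = new util.ArrayList[String](values.length)
    var i = 0
    while (i < values.length) {
      result.add(values(i))
      i += 1
    }
    result
  }

  // Uses java.lang.String.split and reads the result by index only,
  // so no collection methods are called on the returned array.
  def countNonEmpty(line: String, sep: String): Int = {
    val parts: Array[String] = line.split(sep)
    var count = 0
    var i = 0
    while (i < parts.length) {
      if (!parts(i).isEmpty) count += 1
      i += 1
    }
    count
  }
}
```

Whether this is sufficient depends on the Scala version and on what is available on Hive's classpath, so treat it as a starting point rather than a guarantee.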
Deployment
- Upload the jar to HDFS
hdfs dfs -put -f /location_path/xx-xxx-xx.jar /hdfs_path/hive
- Log in to the Hive client beeline
- Register the function permanently
create function udf_name as 'com.xxx.hive.xxx.Encode' using jar 'hdfs://xxx/hdfs_path/hive/xx-xxx-xx.jar';
- Use the function
select id, udf_name(arg01,arg02,arg03) as col_name from db_name.table_name limit 10;