编写scala版hive的自定义函数

Posted md_2014

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了编写scala版hive的自定义函数相关的知识,希望对你有一定的参考价值。

背景

最近,在数仓使用过程中,遇到一些数据处理过程中需要特殊处理,然而hive内置的函数确无法满足需求,因此需要通过自定义函数来扩充其内置函数,达到便捷处理数据的最终目的。hive支持Java版和Scala版自定义函数,本文要介绍的是编写scala版的自定义函数。

自定义函数类型

类型特点
UDF输入输出是一对一
UDTF输入输出是一对多
UDAF输入输出是多对一

编写UDF

UDF的编写需要从继承hive的特定类开始,目前支持两种方式,方式一编写简单,但仅支持操作基本数据类型,而方法二编写略微复杂,但支持所有数据类型,包括复杂类型,比如List,Map等,本文采用方式二,以输入输出数据类型均为List为例。
方式一:继承org.apache.hadoop.hive.ql.exec.UDF
方式二:继承org.apache.hadoop.hive.ql.udf.generic.GenericUDF

  1. pom文件中引入以下依赖
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>$scala.version</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
  1. 用scala语言编写UDF(List输入,基本类型输出)
@Description(name = "udf_name", value = "_FUNC_(arg) - description of function.")
class XXX extends GenericUDF 
  @transient private var listOI: ListObjectInspector = _
  @transient private var elementOI: BinaryObjectInspector = _

  //输入数据类型校验,并指定输出数据类型
  @throws(classOf[UDFArgumentException])
  override def initialize(arguments: Array[ObjectInspector]): ObjectInspector = 
    if (arguments.length != 1)
      throw new UDFArgumentLengthException("The operator 'udf_name' accepts 1 arguments, " +
        s"but get $arguments.length")

    val arg01 = arguments(0)

    if (!arg01.isInstanceOf[ListObjectInspector])
      throw new UDFArgumentTypeException(0, s"The argument type of function 'udf_name' should be list, " +
        s"but get $arg01.getTypeName")

    listOI = arg01.asInstanceOf[ListObjectInspector]
    val elementType = listOI.getListElementObjectInspector

    if (!elementType.isInstanceOf[BinaryObjectInspector])
      throw new UDFArgumentTypeException(0, s"The element type of list should be binary,but get $elementType.getTypeName")

    elementOI = elementType.asInstanceOf[BinaryObjectInspector]

    PrimitiveObjectInspectorFactory.writableBinaryObjectInspector
  

  //具体执行逻辑
  override def evaluate(arguments: Array[GenericUDF.DeferredObject]): AnyRef = 
    val arg0 = arguments(0).get()
    val arg0_length = listOI.getListLength(arg0)

    if (arg0_length == -1) return null

    val wkbs = new util.ArrayList[Array[Byte]](arg0_length)
    var i = 0
    while (i < arg0_length) 
      wkbs.add(i, elementOI.getPrimitiveJavaObject(listOI.getListElement(arg0, i)))
      i += 1
    

    new BytesWritable(todo(wkbs))
  
  
  override def getDisplayString(children: Array[String]): String = 
    getStandardDisplayString("udf_name", children)
  

  1. 用scala语言编写UDF(基本类型输入,List输出)
@Description(name = "udf_name", value = "_FUNC_(arg1, arg2) - description of function.")
class GridInternalGrids extends GenericUDF
  @transient private var stringOI: StringObjectInspector = _
  @transient private var intOI: IntObjectInspector = _

  @throws(classOf[UDFArgumentException])
  override def initialize(arguments: Array[ObjectInspector]): ObjectInspector = 
    if (arguments.length != 2)
      throw new UDFArgumentLengthException(s"The operator 'udf_name' accept 2 arguments, " +
        s"but get $arguments.length")
    val arg01 = arguments(0).asInstanceOf[PrimitiveObjectInspector]
    val arg02 = arguments(1).asInstanceOf[PrimitiveObjectInspector]
    if (arg01.getPrimitiveCategory != PrimitiveCategory.STRING)
      throw new UDFArgumentTypeException(0, s"The 1st arg of function 'udf_name' should be string, " +
        s"but get $arg01.getTypeName")
    if (arg02.getPrimitiveCategory != PrimitiveCategory.INT)
      throw new UDFArgumentTypeException(1, s"The 2nd arg of function 'udf_name' should be int, " +
        s"but get $arg02.getTypeName")
    stringOI = arg01.asInstanceOf[StringObjectInspector]
    intOI = arg02.asInstanceOf[IntObjectInspector]
    ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector)
  

  @throws(classOf[UDFArgumentException])
  override def evaluate(arguments: Array[GenericUDF.DeferredObject]): AnyRef = 
    val arg01 = stringOI.getPrimitiveJavaObject(arguments(0).get())
    val arg02 = intOI.get(arguments(1).get())

    val result = new util.ArrayList[Text]()
    val temp_array = todo(arg01, arg02)
    var index = 0
    while (index < temp_array.length) 
      result.add(new Text(temp_array(index)))
      index += 1
    
    result
  

  override def getDisplayString(children: Array[String]): String = 
    getStandardDisplayString("udf_name", children)
  

  • UDTF,UDTF待写

自定义函数开发中异常处理

  1. 继承hive的UDF类后,必须用scala的class定义子类,而非object类
  2. 方法内用到split方法时,默认会对结果做隐式转换,这会导致udf运行时,
    触发报错running query: java.lang.NoSuchMethodError: scala.Predef,
    解决方案是import scala.Predef.refArrayOps => _,禁止做隐式转换
  3. UDFArgumentTypeException第一个参数注意从0开始,否则无法正常抛异常
  4. running query: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction1 m c V I mcVI mcVIsp (state=,code=0)
    循环遍历数组只能用while

部署

  1. 上传jar包至hdfs
hdfs dfs -put -f /location_path/xx-xxx-xx.jar /hdfs_path/hive
  1. 登录hive客户端beeline
  2. 永久注册方法
create function udf_name as 'com.xxx.hive.xxx.Encode' using jar 'hdfs://xxx/hdfs_path/hive/xx-xxx-xx.jar';
  1. 使用
select id, udf_name(arg01,arg02,arg03) as col_name from db_name.table_name limit 10;

以上是关于编写scala版hive的自定义函数的主要内容,如果未能解决你的问题,请参考以下文章

0011-如何在Hive & Impala中使用UDF

如何在 Hive 中重新加载更新的自定义 UDF 函数?

在scala中调用collect()函数时出现异常

Hive UDF 在 Scala 中处理整数数组

Hive 编程专题之 - 自定义函数 Java 篇

Hive的自定义函数