Writing Hive 3.1.2 UDFs in Java by Extending the UDF or GenericUDF Class: Encoding, Decoding, Encryption, and Decryption on a USDP Big Data Cluster

Posted by 虎鲸不是鱼

Writing Hive 3.1.2 UDFs in Java for Encoding, Decoding, Encryption, and Decryption on a USDP Big Data Cluster

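Background

After the cluster was upgraded from CDH 5.16 to CDP 7.1, the Alibaba Cloud DataPhin platform I use was upgraded as well, and the old UDFs no longer worked well. Some of those UDFs exist mainly to keep SQL-only users from seeing the real values of confidential fields in Hive tables, which prevents leaks of sensitive information. I wrote UDFs that implement encoding, decoding, encryption, and decryption, and tests show they work in Apache Hive on a USDP cluster. USDP even seems a bit more stable than the Aliyun platform...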

How it works

UDF

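Hive's exec package contains the UDF class. Extend it, implement the logic in Java, and put the compiled JAR on a path Hive can access. Once the JAR is loaded and the function is registered, it can be called like any ordinary function.

A simple UDF example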

import org.apache.hadoop.hive.ql.exec.UDF;

import java.nio.charset.StandardCharsets;

/**
 * @program: HiveUDF
 * @description: Re-encode a string with Base64
 * @author: zhiyong
 * @create: 2022-08-04 22:48
 **/
public class base64code1 extends UDF {
    public String evaluate(String input) {
        return java.util.Base64.getEncoder().encodeToString(input.getBytes(StandardCharsets.UTF_8));
    }
}

That is all it takes to implement the simplest possible UDF.
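The evaluate method above calls input.getBytes(...) directly, so a NULL column value would raise a NullPointerException. As a minimal sketch, the same Base64 logic can be tried in plain Java without Hive on the classpath. The class name Base64Codec and its encode/decode methods are hypothetical helpers of mine, not part of the original project, and the null-in, null-out behaviour is my assumption about what a masking UDF would want.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper (not from the original project): the Base64 logic of
// evaluate() in plain Java, so it can be run without Hive on the classpath.
final class Base64Codec {
    private Base64Codec() {
    }

    // Encode a string as Base64. Assumption: null in, null out, to mirror SQL NULL.
    static String encode(String input) {
        if (input == null) {
            return null;
        }
        return Base64.getEncoder().encodeToString(input.getBytes(StandardCharsets.UTF_8));
    }

    // Decode a Base64 string back to the original UTF-8 text.
    static String decode(String encoded) {
        if (encoded == null) {
            return null;
        }
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String encoded = encode("hive");
        System.out.println(encoded);         // aGl2ZQ==
        System.out.println(decode(encoded)); // hive
    }
}
```

Inside a real UDF, the body of evaluate could simply delegate to a helper like this one.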

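Clearly, though, this approach is already deprecated in Hive 3.1.2. Going by the comments, avoiding the deprecation warning means extending one of the classes that themselves extend UDF.

Classes that extend UDF:

Opening one at random: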

package org.apache.hadoop.hive.ql.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
 * UDFAscii.
 *
 */
@Description(name = "ascii",
    value = "_FUNC_(str) - returns the numeric value of the first character"
    + " of str",
    extended = "Returns 0 if str is empty or NULL if str is NULL\n"
    + "Example:\n"
    + "  > SELECT _FUNC_('222') FROM src LIMIT 1;"
    + "  50\n"
    + "  > SELECT _FUNC_(2) FROM src LIMIT 1;\n" + "  50")
public class UDFAscii extends UDF {
  private final IntWritable result = new IntWritable();

  public IntWritable evaluate(Text s) {
    if (s == null) {
      return null;
    }

    if (s.getLength() > 0) {
      result.set(s.getBytes()[0]);
    } else {
      result.set(0);
    }

    return result;
  }
}

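There is nothing special here: these built-in classes that extend UDF simply ship with an evaluate method already written, and extending them again only requires overriding that method. Many of the classes that extend UDF define evaluate, so the method is clearly important.

UDF source code

First, look at the deprecated class itself: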

package org.apache.hadoop.hive.ql.exec;

import org.apache.hadoop.hive.ql.udf.UDFType;

/**
 * A User-defined function (UDF) for use with Hive.
 * <p>
 * New UDF classes need to inherit from this UDF class (or from {@link
 * org.apache.hadoop.hive.ql.udf.generic.GenericUDF GenericUDF} which provides more flexibility at
 * the cost of more complexity).
 * <p>
 * Requirements for all classes extending this UDF are:
 * <ul>
 * <li>Implement one or more methods named {@code evaluate} which will be called by Hive (the exact
 * way in which Hive resolves the method to call can be configured by setting a custom {@link
 * UDFMethodResolver}). The following are some examples:
 * <ul>
 * <li>{@code public int evaluate();}</li>
 * <li>{@code public int evaluate(int a);}</li>
 * <li>{@code public double evaluate(int a, double b);}</li>
 * <li>{@code public String evaluate(String a, int b, Text c);}</li>
 * <li>{@code public Text evaluate(String a);}</li>
 * <li>{@code public String evaluate(List<Integer> a);} (Note that Hive Arrays are represented as
 * {@link java.util.List Lists} in Hive.
 * So an {@code ARRAY<int>} column would be passed in as a {@code List<Integer>}.)</li>
 * </ul>
 * </li>
 * <li>{@code evaluate} should never be a void method. However it can return {@code null} if
 * needed.
 * <li>Return types as well as method arguments can be either Java primitives or the corresponding
 * {@link org.apache.hadoop.io.Writable Writable} class.</li>
 * </ul>
 * One instance of this class will be instantiated per JVM and it will not be called concurrently.
 *
 * @see Description
 * @see UDFType
 *
 * @deprecated use {@link org.apache.hadoop.hive.ql.udf.generic.GenericUDF}
 */
@Deprecated
@UDFType(deterministic = true)
public class UDF {

  /**
   * The resolver to use for method resolution.
   */
  private UDFMethodResolver rslv;

  /**
   * The constructor.
   */
  public UDF() {
    rslv = new DefaultUDFMethodResolver(this.getClass());
  }

  /**
   * The constructor with user-provided {@link UDFMethodResolver}.
   */
  protected UDF(UDFMethodResolver rslv) {
    this.rslv = rslv;
  }

  /**
   * Sets the resolver.
   *
   * @param rslv The method resolver to use for method resolution.
   */
  public void setResolver(UDFMethodResolver rslv) {
    this.rslv = rslv;
  }

  /**
   * Get the method resolver.
   */
  public UDFMethodResolver getResolver() {
    return rslv;
  }

  /**
   * This can be overridden to include JARs required by this UDF.
   *
   * @see org.apache.hadoop.hive.ql.udf.generic.GenericUDF#getRequiredJars()
   *      GenericUDF.getRequiredJars()
   *
   * @return an array of paths to files to include, {@code null} by default.
   */
  public String[] getRequiredJars() {
    return null;
  }

  /**
   * This can be overridden to include files required by this UDF.
   *
   * @see org.apache.hadoop.hive.ql.udf.generic.GenericUDF#getRequiredFiles()
   *      GenericUDF.getRequiredFiles()
   *
   * @return an array of paths to files to include, {@code null} by default.
   */
  public String[] getRequiredFiles() {
    return null;
  }
}

One frequently used type stands out:

package org.apache.hadoop.hive.ql.exec;

import java.lang.reflect.Method;
import java.util.List;

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

/**
 * The UDF Method resolver interface. A user can plugin a resolver to their UDF
 * by implementing the functions in this interface. Note that the resolver is
 * stored in the UDF class as an instance variable. We did not use a static
 * variable because many resolvers maintain the class of the enclosing UDF as
 * state and are called from a base class e.g. UDFBaseCompare. This makes it
 * very easy to write UDFs that want to do resolution similar to the comparison
 * operators. Such UDFs just need to extend UDFBaseCompare and do not have to
 * care about the UDFMethodResolver interface. Same is true for UDFs that want
 * to do resolution similar to that done by the numeric operators. Such UDFs
 * simply have to extend UDFBaseNumericOp class. For the default resolution the
 * UDF implementation simply needs to extend the UDF class.
 */
@Deprecated
public interface UDFMethodResolver {

  /**
   * Gets the evaluate method for the UDF given the parameter types.
   * 
   * @param argClasses
   *          The list of the argument types that need to matched with the
   *          evaluate function signature.
   */
  Method getEvalMethod(List<TypeInfo> argClasses) throws UDFArgumentException;
}

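This is clearly an interface, so the next step is to look at its concrete implementations:

There are three main ones:

In the normal case, the one used is clearly DefaultUDFMethodResolver: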

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.ql.exec;

import java.lang.reflect.Method;
import java.util.List;

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

/**
 * The default UDF Method resolver. This resolver is used for resolving the UDF
 * method that is to be used for evaluation given the list of the argument
 * types. The getEvalMethod goes through all the evaluate methods and returns
 * the one that matches the argument signature or is the closest match. Closest
 * match is defined as the one that requires the least number of arguments to be
 * converted. In case more than one matches are found, the method throws an
 * ambiguous method exception.
 */
public class DefaultUDFMethodResolver implements UDFMethodResolver {

  /**
   * The class of the UDF.
   */
  private final Class<? extends UDF> udfClass;

  /**
   * Constructor. This constructor sets the resolver to be used for comparison
   * operators. See {@link UDFMethodResolver}
   */
  public DefaultUDFMethodResolver(Class<? extends UDF> udfClass) {
    this.udfClass = udfClass;
  }

  /**
   * Gets the evaluate method for the UDF given the parameter types.
   * 
   * @param argClasses
   *          The list of the argument types that need to matched with the
   *          evaluate function signature.
   */
  @Override
  public Method getEvalMethod(List<TypeInfo> argClasses) throws UDFArgumentException {
    return FunctionRegistry.getMethodInternal(udfClass, "evaluate", false,
        argClasses);
  }
}

It delegates to this method of the FunctionRegistry utility class:

package org.apache.hadoop.hive.ql.exec;

public final class FunctionRegistry {

  /**
   * This method is shared between UDFRegistry and UDAFRegistry. methodName will
   * be "evaluate" for UDFRegistry, and "aggregate"/"evaluate"/"evaluatePartial"
   * for UDAFRegistry.
   * @throws UDFArgumentException
   */
  public static <T> Method getMethodInternal(Class<? extends T> udfClass,
      String methodName, boolean exact, List<TypeInfo> argumentClasses)
      throws UDFArgumentException {

    List<Method> mlist = new ArrayList<Method>();

    for (Method m : udfClass.getMethods()) {
      if (m.getName().equals(methodName)) {
        mlist.add(m);
      }
    }

    return getMethodInternal(udfClass, mlist, exact, argumentClasses);
  }
}

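Under the hood, then, the static getMethodInternal method of the FunctionRegistry utility class in the org.apache.hadoop.hive.ql.exec package uses reflection to collect every method named evaluate from the class that extends org.apache.hadoop.hive.ql.exec.UDF. So evaluate can in fact be overloaded, but the stability of UDFs has always been shaky, and I prefer not to do that. This also explains why the method must be named evaluate once you extend the UDF class.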
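The name-based lookup can be reproduced with nothing but the JDK. The sketch below is a toy version of the loop in getMethodInternal: EvaluateLookupDemo, SampleUdf, and findByName are hypothetical names of mine, and SampleUdf deliberately does not extend Hive's UDF class so that the example runs without Hive. It also shows a side effect of using Class.getMethods(): only public methods are returned, so a non-public evaluate would never be found this way.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Toy reproduction of the name-based lookup, JDK only. All names are hypothetical.
final class EvaluateLookupDemo {

    // Stand-in for a user UDF; it does not extend Hive's UDF class.
    public static class SampleUdf {
        public String evaluate(String s) {
            return s;
        }

        // An overload: same name, different parameter list.
        public String evaluate(String s, int n) {
            return s + n;
        }

        // Not public, so Class.getMethods() does not return it.
        private String evaluate(int n) {
            return String.valueOf(n);
        }

        // Public, but the name does not match.
        public String process(String s) {
            return s;
        }
    }

    // Collect every public method of clazz whose name equals methodName.
    static List<Method> findByName(Class<?> clazz, String methodName) {
        List<Method> matches = new ArrayList<>();
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals(methodName)) {
                matches.add(m);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Method> found = findByName(SampleUdf.class, "evaluate");
        System.out.println(found.size()); // 2: the two public overloads
    }
}
```

Choosing among the collected overloads is a separate step in Hive (the second getMethodInternal call), which this sketch does not attempt to model.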

As for the udfClass.getMethods() method:

package java.lang;

public final class Class<T> implements java.io.Serializable,
                              GenericDeclaration,
                              Type,
                              AnnotatedElement {

    /**
     * Returns an array containing {@code Method} objects reflecting all the
     * public methods of the class or interface represented by this {@code
     * Class} object, including those declared by the class or interface and
     * those inherited from superclasses and superinterfaces.
     *
     * <p> If this {@code Class} object represents a type that has multiple
     * public methods with the same name and parameter types, but different
     * return types, then the returned array has a {@code Method} object for
     * each such method.
     *
     * <p> If this {@code Class} object represents a type with a class
     * initialization method {@code <clinit>}, then the returned array does
     * <em>not</em> have a corresponding {@code Method} object.
     *
     * <p> If this {@code Class} object represents an array type, then the
     * returned array has a {@code Method} object for each of the public
     * methods inherited by the array type from {@code Object}. It does not
     * contain a {@code Method} object for {@code clone()}.
     *
     * <p> If this {@code Class} object represents an interface then the
     * returned array does not contain any implicitly declared methods from
     * {@code Object}. Therefore, if no methods are explicitly declared in
     * this interface or any of its superinterfaces then the returned array
     * has length 0. (Note that a {@code Class} object which represents a class
     * always has public methods, inherited from {@code Object}.)
     *
     * <p> If this {@code Class} object represents a primitive type or void,
     * then the returned array has length 0.
     *
     * <p> Static methods declared in superinterfaces of the class or interface
     * represented by this {@code Class} object are not considered members of
     * the class or interface.
     *
     * <p> The elements in the returned array are not sorted and are not in any
     * particular order.
     *
     * @return the array of {@code Method} objects representing the
     *         public methods of this class
     * @throws SecurityException
     *         If a security manager, <i>s</i>, is present and
     *         the caller's class loader is not the same as or an
     *         ancestor of the class loader for the current class and
     *         invocation of {@link SecurityManager#checkPackageAccess
     *         s.checkPackageAccess()} denies access to the package
     *         of this class.
     *
     * @jls 8.2 Class Members
     * @jls 8.4 Method Declarations
     * @since JDK1.1
     */
    @CallerSensitive
    public Method[] getMethods() throws SecurityException {
        checkMemberAccess(Member.PUBLIC, Reflection.getCallerClass(), true);
        return copyMethods(privateGetPublicMethods());
    }
}

This reflection method has been around since the JDK 1.1 era (see the @since tag above). It can also throw an exception:

package java.lang;

/**
 * Thrown by the security manager to indicate a security violation.
 *
 * @author  unascribed
 * @see     java.lang.SecurityManager
 * @since   JDK1.0
 */
public class SecurityException extends RuntimeException {
}

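So under the hood it can throw a runtime (unchecked) exception.

A first look at GenericUDF

Since extending the UDF class directly is deprecated, the Javadoc points to extending GenericUDF instead. It is more complex, and I would not recommend it lightly, but this is the approach in common use today.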

package com.zhiyong.hiveUDF;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

/**
 * @program: HiveUDF
 * @description: Test UDF
 * @author: zhiyong
 * @create: 2022-08-05 00:10
 **/
public class base64code2 extends GenericUDF {
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        return null;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        return null;
    }

    @Override
    public String getDisplayString(String[] children) {
        return null;
    }
}

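Extending GenericUDF clearly requires overriding three methods and importing four classes. Judging by the method names, initialize performs initialization, evaluate holds the actual algorithm, and getDisplayString presumably produces display text for the function, something like what ends up in logs.

The ObjectInspector type in the org.apache.hadoop.hive.serde2.objectinspector package is clearly worth a look.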

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.hadoop.hive.serde2.objectinspector;

import org.apache.hadoop.hive.common.classification.InterfaceAudience.Public;
import org.apache.hadoop.hive.common.classification.InterfaceStability.Stable;

@Public
@Stable
public interface ObjectInspector extends Cloneable {
    String getTypeName();

    ObjectInspector.Category getCategory();

    public static enum Category {
        PRIMITIVE,
        LIST,
        MAP,
        STRUCT,
        UNION;

        private Category() {
        }
    }
}

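This is clearly an interface, and it also defines an enum named Category.

In fact, the package contains many more classes:

Classes with similar names:

For example, ObjectInspectorConverters contains type-conversion methods, while ObjectInspectorFactory and ObjectInspectorUtils are utility classes with many static methods.

GenericUDF source code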

package org.apache.hadoop.hive.ql.udf.generic;

/**
 * A Generic User-defined function (GenericUDF) for the use with Hive.
 *
 * New GenericUDF classes need to inherit from this GenericUDF class.
 *
 * The GenericUDF are superior to normal UDFs in the following ways: 1. It can
 * accept arguments of complex types, and return complex types. 2. It can accept
 * variable length of arguments. 3. It can accept an infinite number of function
 * signature - for example, it's easy to write a GenericUDF that accepts
 * array<int>, array<array<int>> and so on (arbitrary levels of nesting). 4. It
 * can do short-circuit evaluations using DeferedObject.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
@UDFType(deterministic = true)
public abstract class GenericUDF implements Closeable {
  // ...
}

This is clearly an abstract class: although it is public, it cannot be instantiated and used directly.
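Points 2 and 4 of the Javadoc above (variable-length arguments and short-circuit evaluation) are easier to picture with a small analogy. In Hive, evaluate receives DeferredObject arguments whose values are only computed when get() is called. The sketch below is not Hive code: it uses java.util.function.Supplier as a stand-in for that deferred wrapper, and DeferredArgumentDemo, deferred, and firstNonNull are hypothetical names of mine. It implements a COALESCE-like function that stops evaluating arguments as soon as it finds a non-null one.

```java
import java.util.function.Supplier;

// Analogy only: Supplier stands in for Hive's deferred argument wrapper.
// All names here are hypothetical and not part of Hive.
final class DeferredArgumentDemo {
    // Counts how many arguments were actually evaluated.
    static int evaluations = 0;

    // Wrap a value so that it is only "computed" when get() is called.
    static Supplier<String> deferred(String value) {
        return () -> {
            evaluations++;
            return value;
        };
    }

    // Accepts any number of arguments and returns the first non-null value.
    // Arguments after the first non-null one are never evaluated.
    @SafeVarargs
    static String firstNonNull(Supplier<String>... args) {
        for (Supplier<String> arg : args) {
            String value = arg.get();
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String result = firstNonNull(deferred(null), deferred("b"), deferred("c"));
        System.out.println(result);      // b
        System.out.println(evaluations); // 2: the third argument was skipped
    }
}
```

A plain UDF, by contrast, receives already-computed Java values in evaluate, so it has no opportunity to skip work on arguments it does not need.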

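This abstract class has many subclasses:

As for why I landed on these two classes: I actually found them from the stack trace in Hive's error log...

The difference between these two classes of the same name is:

They come from different Hive versions. CDH 5.16 is about to be retired, so the old 2.3.7 version is no longer worth reading.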

package org.apache.hadoop.hive.ql.udf.generic;

/**
 * GenericUDFBridge encapsulates UDF to provide the same interface as
 * GenericUDF.
 *
 * Note that GenericUDFBridge implements Serializable because the name of the
 * UDF class needs to be serialized with the plan.
 *
 */
public class GenericUDFBridge extends GenericUDF implements Serializable {
  private static final long serialVersionUID = 4994861742809511113L;

  /**
   * The name of the UDF.
   */
  private String udfName;

  /**
   * Whether the UDF is an operator or not. This controls how the display string
   * is generated.
   */
  private boolean isOperator;

  /**
   * The underlying UDF class Name.
   */
  private String udfClassName;

  /**
   * The underlying method of the UDF class.
   */
  private transient Method udfMethod;

  /**
   * Helper to convert the parameters before passing to udfMethod.
   */
  private transient ConversionHelper conversionHelper;
  /**
   * The actual udf object.
   */
  private transient UDF udf;
  /**
   * The non-deferred real arguments for method invocation.
   */
  private transient Object[] realArguments;

  private transient UdfWhitelistChecker udfChecker;

  /**
   * Create a new GenericUDFBridge object.
   *
   * @param udfName
   *          The name of the corresponding udf.
   * @param isOperator true for operators
   * @param udfClassName java class name of UDF
   */
  public GenericUDFBridge(String udfName, boolean isOperator,
      String udfClassName) {
    this.udfName = udfName;
    this.isOperator = isOperator;
    this.udfClassName = udfClassName;
  }

  // For Java serialization only
  public GenericUDFBridge() {
  }

  public void setUdfName(String udfName) {
    this.udfName = udfName;
  }

  @Override
  public String getUdfName() {
    return udfName;
  }

  public String getUdfClassName() {
    return udfClassName;
  }

  public void setUdfClassName(String udfClassName) {
    this.udfClassName = udfClassName;
  }

  public boolean isOperator() {
    return isOperator;
  }

  public void setOperator(boolean isOperator) {
    this.isOperator = isOperator;
  }

  public Class<? extends UDF> getUdfClass() {
    try {
      return getUdfClassInternal();
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }

  /** Gets the UDF class and checks it against the whitelist, if any. */
  private Class<? extends UDF> getUdfClassInternal()
      throws ClassNotFoundException {
    @SuppressWarnings("unchecked")
    Class<? extends UDF> clazz = (Class<? extends UDF>) Class.forName(
        udfClassName, true, Utilities.getSessionSpecifiedClassLoader());
    if (udfChecker != null && !udfChecker.isUdfAllowed(clazz)) {
      throw new SecurityException("UDF " + clazz.getCanonicalName() + " is not allowed");
    }
    return clazz;
  }

  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {

    try {
      udf = (UDF)getUdfClassInternal().newInstance(
