浅析 hive udaf 的正确编写方式- 论姿势的重要性-系列四-如何直接访问metastore service(附源码)
Posted michaelli916
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅析 hive udaf 的正确编写方式- 论姿势的重要性-系列四-如何直接访问metastore service(附源码)相关的知识,希望对你有一定的参考价值。
前言
大家好,我是明哥。
HIVE 作为大数据生态的数仓解决方案,因为历史的原因在很多行业很多公司都有着广泛的应用。对于比较复杂的业务逻辑,HIVE SQL 往往比较难以表达,此时大家在开发中往往会辅以 HIVE UDF。所以充分理解和掌握 HIVE UDF正确的表写和使用方式,是大数据从业人员必不可少的一项技能。
对于 HIVE UDF 编写使用过程中常见的问题,明哥编写了一个系列 - “浅析 hive udf 的正确编写和使用方式 - 论姿势的重要性“,并陆续发布了三篇博文:
-
“如何在 hive udf 中访问配置数据-方案汇总与对比”
-
“浅析 hive udaf 的正确编写方式- 论姿势的重要性"
-
“浅析 hive udf 的正确编写和使用方式- 论姿势的重要性 - 系列三 - hdfs 相对路径与静态代码块引起的问题”
浅析 hive udf 的正确编写和使用方式- 论姿势的重要性 - 系列三 - hdfs 相对路径与静态代码块引起的问题
以下是该系列的第四篇博文:“浅析 hive udaf 的正确编写方式- 论姿势的重要性-系列四-如何直接访问metastore service”
问题概述
在编写UDF时,有时我们需要直接访问 hive metastore service 以获取某些库表或分区的元数据信息,比如最近遇到某客户有个需求,需要直接访问 metastore service 以获取给定分区表的最大分区。该客户已经实现了基本的代码,且在没有开启 kerberos 认证的 hive 中也通过了测试,但在开启了 kerberos 认证的 hive 中执行该 UDF 时却会报错。该有 BUG 的 UDF 示例代码如下:
该有 BUG 的 UDF 在开启了 kerberos 认证的 hive 中执行时报错如下:
问题原因分析
在本系列文章中,笔者不断提到,“Hive SQL 和 UDF 的解析编译和优化是在 hiveserver2 中进行的,解析编译和优化的结果一般是生成 mr/tez/spark 任务,这些 mr/tez/spark 任务是在向 yarn 申请获得的 container 容器对应的 jvm 中执行的;但并不是所有的 sql 和 udf 都会生成 mr/tez/spark 任务,此时其真正的执行就是直接在 hiveserver2 这个已经存在的 jvm 中执行的,该 hiveserver2 这个 jvm 的生命周期跟 udf 的执行无关,如果涉及到配置环境变量,系统参数,或加载类及执行静态代码块,要尤其小心“.
其实这里原来的代码有问题的原因,就跟上面不断强调的这句话有关: hive 的 udf 是在 hiveserver2 这个 Jvm 进程中编译执行的(有时会生成mr作业有时不会,在我们这个场景下不会生成 mr 作业),而 hiveserver2 这个 jvm 进程已经通过 kerberos 认证了,已经能够正常访问 hive metastore service 了,所以 udf 代码中不需要再次执行 UserGroupInformation.loginUserFromKeytab(principal,keytab)进行 kerberos 认证!事实上,由于再次尝试认证时涉及到配置环境变量和系统参数(如示例代码中 java.security.krb5.conf, HiveConf 等),稍有不慎就会污染已经启动的 hiveserver2 这个 jvm s 实例,而该实例是全局的给多个客户端使用的 hive 服务端,一旦被污染会影响其它客户端的执行,危害比较大,往往需要重启 hiveserver2 实例才能修复。
问题解决方案
知道了问题发生的根本原因,问题解决思路就有了:
-
不要再次执行 kerberos 认证代码 UserGroupInformation.loginUserFromKeytab(principal,keytab);
-
不要重新创建全新的 org.apache.hadoop.hive.conf.HiveConf 实例,而是复用已有的 HiveConf 实例;
-
已有的 HiveConf 实例,可以通过 HiveConf hiveConf = SessionState.get().getConf() 获得;
最后还有必要指出,hive udf 和 udaf 有两种接口,一种是旧的 simple Udf/udaf,一种是新的GenericUDF/GenericUDAF:
-
org.apache.hadoop.hive.ql.exec.UDF/org.apache.hadoop.hive.ql.exec.UDAF
-
org.apache.hadoop.hive.ql.udf.generic.GenericUDF/org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
因为支持更多特性和执行时的性能问题,hive 社区推荐我们使用后者。
示例代码 (已经通过none/ldap/kerberos认证环境下的测试,可以直接使用)
simple Udf 示例代码:
package com.xxx;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.service.server.HiveServer2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
@Description(name = "max_pt", value = "_FUNC_(db,table) - return the max partition for a hive partitioned table")
//note that hive udf are executed in the same jvm as hiveserver2
public class UdfMaxPt extends UDF {
// this is the same logger as the one created in org.apache.hive.service.server.HiveServer2
private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class);
public String evaluate(String db, String table) {
try {
// note that you can get the HiveConf from org.apache.hadoop.hive.ql.session.SessionState
HiveConf hiveConf = SessionState.get().getConf();
HiveMetaStoreClient hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
List<Partition> partitions = hiveMetaStoreClient.listPartitions(db, table, Short.MAX_VALUE);
// there is no need to call UserGroupInformation.loginUserFromKeytab(principal,keytab) to log into kdc,
// as the udf is executed in hiveserver2 and hiveserver2 has already logged into kdc;
// this can be verified by check below outputs
// String principal = "hive/_HOST@CDH.COM";
// String keytab= "hive.keytab";
// UserGroupInformation.loginUserFromKeytab(principal,keytab);
LOG.info("UserGroupInformation.getCurrentUser(): " + UserGroupInformation.getCurrentUser().toString());
LOG.info("UserGroupInformation.getLoginUser(): " + UserGroupInformation.getLoginUser().toString());
LOG.info("principal:" + hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL));
LOG.info("keytab:" + hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB));
LOG.info("java.security.krb5.conf" + System.getProperty("java.security.krb5.conf"));
LOG.info("java.security.krb5.realm" + System.getProperty("java.security.krb5.realm"));
LOG.info("java.security.krb5.kdc" + System.getProperty("java.security.krb5.kdc"));
LOG.info("HADOOP_USER_NAME" + System.getenv("HADOOP_USER_NAME"));
LOG.info("HADOOP_OPTS" + System.getenv("HADOOP_OPTS"));
// return the max partition value
return String.valueOf(Collections.max(partitions).getValues().get(0));
} catch (Exception e) {
LOG.warn(e.getMessage());
LOG.warn(e.toString());
return e.getMessage();
}
}
}
GenericUDF 示例代码:
package com.xxxx;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hive.service.server.HiveServer2;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
@Description(name = "max_pt", value = "_FUNC_(db,table) - return the max partition for a hive partitioned table")
//note that hive udf are executed in the same jvm as hiveserver2
public class UdfMaxPtNew extends GenericUDF {
// this is the same logger as the one created in org.apache.hive.service.server.HiveServer2
private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class);
PrimitiveObjectInspector inputDbOI;
PrimitiveObjectInspector inputTableOI;
PrimitiveObjectInspector outputOI;
@Override
public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
// This UDF accepts one argument
assert (args.length == 2);
// The first argument is a primitive type
assert (args[0].getCategory() == ObjectInspector.Category.PRIMITIVE);
assert (args[1].getCategory() == ObjectInspector.Category.PRIMITIVE);
inputDbOI = (PrimitiveObjectInspector) args[0];
inputTableOI = (PrimitiveObjectInspector) args[1];
/* We only support String type */
assert (inputDbOI.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING);
assert (inputTableOI.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING);
/* And we'll return a type string, so let's return the corresponding object inspector */
outputOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
return outputOI;
}
@Override
public Object evaluate(DeferredObject[] args) throws HiveException {
if (args.length != 2) {
return "";
}
// Access the deferred value. Hive passes the arguments as "deferred" objects
// to avoid some computations if we don't actually need some of the values
Object oin1 = args[0].get();
Object oin2 = args[1].get();
if (oin1 == null || oin2 == null) {
return "";
}
String inputDb = (String) inputDbOI.getPrimitiveJavaObject(oin1);
String inputTable = (String) inputTableOI.getPrimitiveJavaObject(oin2);
if (StringUtils.isEmpty(inputDb) || StringUtils.isEmpty(inputTable)) {
return "";
}
HiveConf hiveConf = SessionState.get().getConf();
HiveMetaStoreClient hiveMetaStoreClient = null;
try {
hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
List<Partition> partitions = hiveMetaStoreClient.listPartitions(inputDb, inputTable, Short.MAX_VALUE);
return String.valueOf(Collections.max(partitions).getValues().get(0));
} catch (TException e) {
LOG.warn(e.getMessage());
LOG.warn(e.toString());
return e.getMessage();
}
}
@Override
public String getDisplayString(String[] children) {
return "return the max partition for a hive partitioned table";
}
}
来自客户的认可才是最大的认可,以上示例代码交付给客户后,明哥收到了客户的赞赏,允许我小小的得瑟一下,哈哈。
欢迎大家关注,微信同名公众号!
以上是关于浅析 hive udaf 的正确编写方式- 论姿势的重要性-系列四-如何直接访问metastore service(附源码)的主要内容,如果未能解决你的问题,请参考以下文章
浅析 hive udf 的正确编写和使用方式- 论姿势的重要性 - 系列三 - hdfs 相对路径与静态代码块引起的问题