HIVE UDAF开发上手,你一看就懂!
Posted jeason1991
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HIVE UDAF开发上手,你一看就懂!相关的知识,希望对你有一定的参考价值。
单机跑一个脚本做数据处理,但是由于输入数据实在太大,处理过程中占用大量内存经常被系统杀死,所以考虑放在hive中做数据聚合。借此机会研究下UDAF怎么写,把踏坑的经验写出来,希望可以帮助大家少走弯路!嗯。。。就酱紫。
经常听UDF,那么UDAF是什么鬼? 就是聚合功能的UDF啦~ 比如hive内置的 count、sum、max、min、avg等。 但是内置的函数其实并不能满足我们复杂的统计需求,就需要自己去实现一个方法。
有两种实现方法,一种简单的,一种通用的,简单的方法据说有性能问题,我们就直接看通用的实现方法吧~
实现一个Generic UDAF有两部分:
- resolver
- evaluator
这俩货分别对应以下两个抽象类:
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
resolver 主要用来做参数检查和操作符重载,我们可以根据输入参数的不同选择相应的evaluator
evaluator 则是实现主要逻辑的地方,以静态内部类的形式存在
#!Java public class GenericUDAFHistogramNumeric extends AbstractGenericUDAFResolver static final Log LOG = LogFactory.getLog(GenericUDAFHistogramNumeric.class.getName()); @Override public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException // 参数检查 return new GenericUDAFHistogramNumericEvaluator(); /** *这个静态内部类就是写我们自己逻辑的地方,这个类名根据需要改,这个是官方文档写的一个条形图的例子 */ public static class GenericUDAFHistogramNumericEvaluator extends GenericUDAFEvaluator // UDAF 逻辑
这里需要介绍下这个例子的功能:hIve中的histogram_numeric函数,用来做直方图的,比如我们要把年龄分30个桶构建直方图就是SELECT histogram_numeric(age, 30) FROM employees;
下面我们继续看例子
#!Java /** * 这个方法的参数新版的已经发生变化,直接就是TypeInfo [] parameters */ public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException TypeInfo [] parameters = info.getParameters(); if (parameters.length != 2) throw new UDFArgumentTypeException(parameters.length - 1, "Please specify exactly two arguments."); // 检查第一个参数类型,如果不是原始类型(基本类型)抛异常 if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " + parameters[0].getTypeName() + " was passed as parameter 1."); switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) case BYTE: case SHORT: case INT: case LONG: case FLOAT: case DOUBLE: break; case STRING: case BOOLEAN: default: throw new UDFArgumentTypeException(0, "Only numeric type arguments are accepted but " + parameters[0].getTypeName() + " was passed as parameter 1."); // 检查第二个参数类型,条形图桶编号,假设这里要求是整型数 if (parameters[1].getCategory() != ObjectInspector.Category.PRIMITIVE) throw new UDFArgumentTypeException(1, "Only primitive type arguments are accepted but " + parameters[1].getTypeName() + " was passed as parameter 2."); // 如果不是整型,抛异常 if( ((PrimitiveTypeInfo) parameters[1]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.INT) throw new UDFArgumentTypeException(1, "Only an integer argument is accepted as parameter 2, but " + parameters[1].getTypeName() + " was passed instead."); //返回对应的处理类 return new GenericUDAFHistogramNumericEvaluator();
然后我们看看evaluator
#!Java public static class GenericUDAFHistogramNumericEvaluator extends GenericUDAFEvaluator // For PARTIAL1 and COMPLETE: ObjectInspectors for original data,这俩货是用来做类型转换的 private PrimitiveObjectInspector inputOI; private PrimitiveObjectInspector nbinsOI; // For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations (list of doubles) private StandardListObjectInspector loi; @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException super.init(m, parameters); // return type goes here @Override public Object terminatePartial(AggregationBuffer agg) throws HiveException // return value goes here @Override public Object terminate(AggregationBuffer agg) throws HiveException // final return value goes here @Override public void merge(AggregationBuffer agg, Object partial) throws HiveException @Override public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException // Aggregation buffer definition and manipulation methods static class StdAgg implements AggregationBuffer ; @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException @Override public void reset(AggregationBuffer agg) throws HiveException
理解这个类我们首先需要了解一些事情,写过Hadoop MapReduce 的同学应该知道,一个MapReduce job 分为 map、combine、reduce三个阶段,map阶段是把函数应用于输入数据的每一条,构建key-value供后续聚合;combine阶段是在mapper端局部进行聚合,聚合后的中间结果传给reduce函数,输入和reduce函数是一致的,被称为mapper端的reduce。了解了这个过程后,我们来看evaluator的几个方法,基本上是对应这几个阶段。
方法 |
作用 |
init |
初始化函数 |
getNewAggregationBuffer |
用来生成一个缓存对象,记录临时聚合结果 |
iterate |
一条一条处理数据,将结果存入缓存 |
terminatePartial |
这个方法意味着map阶段结束,将缓存中的数据持久化存储。这里返回的数据类型仅支持java基本类型、基本类型包装类、数组以及Hadoop的Writables, Lists和Map,不要使用自定义类型 |
merge |
接收terminatePartial返回的结果,合并局部聚合结果 |
terminate |
返回最终结果,可以在这里实现最后的求值,比如计算平均值 |
在hive中,用一个枚举类Mode来表示不同阶段
/** * Mode. *官方的注释写的挺详细了^_^ */ public static enum Mode /** * PARTIAL1: from original data to partial aggregation data: iterate() and * terminatePartial() will be called. */ PARTIAL1, /** * PARTIAL2: from partial aggregation data to partial aggregation data: * merge() and terminatePartial() will be called. */ PARTIAL2, /** * FINAL: from partial aggregation to full aggregation: merge() and * terminate() will be called. */ FINAL, /** * COMPLETE: from original data directly to full aggregation: iterate() and * terminate() will be called. */ COMPLETE ;
嗯。。。 写完后打个jar包出来,创建个临时函数来使用既可以了
add jar hiveUDF.jar; create temporary function test_udf as ‘com.test.xxxx‘;
select test_udf(a,b) from table2 groupy by xxx.
好啦,先写这么多,我写的时候数据类型用的大部分是java的,所以产生了各种类型转换错误,后面打算看看Hadoop的内置类型~ 希望能帮到大家~
以上是关于HIVE UDAF开发上手,你一看就懂!的主要内容,如果未能解决你的问题,请参考以下文章