hive-UDF/UDTF/UDAF
Posted xiaopihaierletian
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hive-UDF/UDTF/UDAF相关的知识,希望对你有一定的参考价值。
1.Hive三种自定义函数
1.1 UDF
UDF,即用户定义函数(user-defined function),作用于单行数据,并且产生一个数据行作为输出。Hive中大多数函数都属于这一类,比如数学函数和字符串函数。UDF函数的输入与输出值是1:1关系。
1.2 UDTF
UDTF,即用户定义表生成函数(user-defined table-generating function),作用于单行数据,并且产生多个数据行。UDTF函数的输入与输出值是1:n的关系。
1.3 UDAF
UDAF,用户定义聚集函数(user-defined aggregate function),作用于多行数据,并且产生一个输出数据行。Hive中像COUNT、MAX、MIN和SUM这样的函数就是聚集函数。UDAF函数的输入与输出值是n:1的关系。
2.UDF自定义函数实现
2.1 使用说明
UDF函数可以直接应用于select语句,对查询结构做格式化处理后,再输出内容。
2.2 注意事项
编写UDF函数的时候需要注意一下几点:
1)自定义UDF需要继承org.apache.hadoop.hive.ql.UDF。
2)需要实现evaluate函数,evaluate函数支持重载。
3) UDF必须要有返回类型,可以返回null,但是返回类型不能为void;
4)UDF中常用Text/LongWritable等类型,不推荐使用java类型;
2.3 添加hive依赖
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.3.8</version>
</dependency>
2.4 自定义udf
功能:输入一条用户行为json数据,提取用户uid
package com.bigdata.hive;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONObject;
public class ParseJsonUDF extends UDF
public String evaluate(String line, String key)
JSONObject baseJson = new JSONObject(line.trim());
String result = "";
if(baseJson.has(key))
return baseJson.getString(key);
return result;
public static void main(String[] args)
String line = "\\"userId\\":9527,\\"day\\":\\"2021-06-12\\",\\"begintime\\":1595058161312,\\"endtime\\":1595058469010,\\"data\\":[\\"package\\":\\"com.browser\\",\\"activetime\\":120000]";
String userId = new ParseJsonUDF().evaluate(line,"userId");
System.out.println(userId);
2.5 项目打包
通过maven对自定义udf项目打包(比如learninghive-1.0-SNAPSHOT.jar)
mvn clean package
2.6 项目包上传至HDFS
通过hdfs 命令,将项目打好的包上传至hdfs指定目录下。
bin/hdfs dfs -put home/hadoop/shell/lib/learninghive-1.0-SNAPSHOT.jar user/hive/jars
2.7 hive中添加项目包
通过bin/hive进入hive控制台,将项目包添加到hive中。
hive> add jar hdfs://mycluster/user/hive/jars/learninghive-1.0-SNAPSHOT.jar;
2.8 创建udf 函数模板
根据自定义udf类,创建一个自定义函数名称,供后面hive SQL 使用。
hive> create temporary function json_udf as 'com.bigdata.hive.ParseJsonUDF';
2.9 创建测试表
CREATE TABLE behavior(line STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\\n'
STORED AS TEXTFILE;
2.10 准备测试数据集
准备测试数据文件behavior.log,每行数据代表用户行为数据。
"userId":9527,"day":"2021-05-12","begintime":1620785058833,"endtime":1620785217511,"data":["package":"com.browser","activetime":120000,"package":"com.qq","activetime":80000]
"userId":3854,"day":"2021-05-12","begintime":1620785290612,"endtime":1595058469010,"data":["package":"com.browser","activetime":60000,"package":"com.qq","activetime":150000]
2.11 数据加载至hive表
将本地文件加载到刚刚创建的behavior表中。
LOAD DATA LOCAL INPATH '/home/hadoop/shell/data/behavior.log' INTO TABLE behavior;
2.12 应用自定义udf函数
hive> select json_udf(line,'userId') from behavior;
可以通过自定义json_udf函数提取用户行为json数据中的用户uid。
3.UDTF自定义函数实现
3.1 使用说明
自定义UDTF需要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF类,重新initialize, process, close三个方法:
1)UDTF首先会调用initialize()方法,此方法返回UDTF的返回行的信息(返回个数,类型)。
2)初始化完成后,会调用process()方法,真正的处理过程在process函数中,在process中,每一次forward()调用产生一行;
3)最后close()方法调用,对需要清理的方法进行清理。
3.2自定义udtf
功能:输入一条用户行为json数据,输出多条用户行为日志数据。
package com.bigdata.hive;
import groovy.json.JsonException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;
import java.util.ArrayList;
public class ParseJsonUDTF extends GenericUDTF
**
* 该方法中,我们将指定输出参数的名称和参数类型
* @param argOIs
* @return
* @throws UDFArgumentException
*/
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("package");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("activetime");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
**
* 输入 1 条记录,输出 n 条记录
* @param objects
* @throws HiveException
*/
@Override
public void process(Object[] objects) throws HiveException
获取传入的data
String data = objects[0].toString();
如果传进来的数据为空,直接返回过滤掉该数据
if(StringUtils.isBlank(data))
return ;
else
try
获取一共有几条操作记录
JSONArray dataArray = new JSONArray(data);
if (dataArray == null) return;
循环遍历每个事件
for (int i=0;i<dataArray.length();i++)
String[] output = new String[2];
try
output[0]=dataArray.getJSONObject(i).getString("package");
output[1]=dataArray.getJSONObject(i).getString("activetime");
catch (JSONException e)
continue;
将结果返回
forward(output);
catch (JSONException e)
e.printStackTrace();
**
* 当没有记录处理的时候该方法会被调用,用来清理代码或者产生额外的输出
* @throws HiveException
*/
@Override
public void close() throws HiveException
3.3 项目打包
通过maven对自定义udf项目打包(比如learninghive-1.0-SNAPSHOT.jar)
mvn clean package
3.4 项目包上传至HDFS
通过hdfs 命令,将项目打好的包上传至hdfs指定目录下。
bin/hdfs dfs -put home/hadoop/shell/lib/learninghive-1.0-SNAPSHOT.jar user/hive/jars
3.5 hive中添加项目包
通过bin/hive进入hive控制台,将项目包添加到hive中。
hive> add jar hdfs://mycluster/user/hive/jars/learninghive-1.0-SNAPSHOT.jar;
3.6 创建udf 函数模板
根据自定义udf类,创建一个自定义函数名称,供后面hive SQL 使用。
hive> create temporary function json_udtf as 'com.bigdata.hive.ParseJsonUDTF';
hive> create temporary function json_udf as 'com.bigdata.hive.ParseJsonUDF';
3.7 创建测试表
CREATE TABLE behavior(line STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\\n'
STORED AS TEXTFILE;
3.8 准备测试数据集
准备测试数据文件behavior.log,每行数据代表用户行为数据。
"userId":9527,"day":"2021-05-12","begintime":1620785058833,"endtime":1620785217511,"data":["package":"com.browser","activetime":120000,"package":"com.qq","activetime":80000]
"userId":3854,"day":"2021-05-12","begintime":1620785290612,"endtime":1595058469010,"data":["package":"com.browser","activetime":60000,"package":"com.qq","activetime":150000]
3.9 数据加载至hive表
将本地文件加载到刚刚创建的behavior表中。
LOAD DATA LOCAL INPATH '/home/hadoop/shell/data/behavior.log' INTO TABLE behavior;
3.10应用自定义udtf函数
hive> select
json_udf(line,'userId'),
json_udf(line,'day'),
json_udf(line,'begintime'),
json_udf(line,'endtime'),
package,
activetime
from behavior lateral view json_udtf(json_udf
(line,'data')) tmpdata as package,activetime;
备注:与lateral view一起使用,执行过程相当于单独执行了两次抽取,然后union到一个表里。
可以通过自定义 json_udtf函数解析用户行为json数据,然后输出多条用户行为具体操作日志。
4.UDAF自定义函数实现
4.1 使用说明
1)必须继承
org.apache.hadoop.hive.ql.exec.UDAF(函数类继承)
org.apache.hadoop.hive.ql.exec.UDAFEvaluator(内部类Evaluator实现UDAFEvaluator接口)
2)Evaluator需要实现以下几个函数
init():初始化
iterate():迭代处理每一行数据
terminatePartial():输出map/reduce的阶段结果
merge():combiner/reduce对数据进行聚合
terminate():返回最终的聚集函数结果
4.2 udaf运行过程详解
UDAF 就是一个多行导成一行的聚合函数,它的过程与MR过程紧密结合
4.3 自定义udaf
功能:统计表中某一列的最大值
package com.bigdata.hive;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;
public class MyMaxUDAF extends UDAF
static public class MaxIntEvaluator implements UDAFEvaluator
private int mMax;
private boolean mEmpty;
构造方法
public MaxIntEvaluator()
super();
init();
类似于构造函数,用于UDAF的初始化
public void init()
mMax = 0;
mEmpty = true;
迭代处理每一行数据
public boolean iterate(IntWritable o)
if (o != null)
if (mEmpty)
mMax = o.get();
mEmpty = false;
else
mMax = Math.max(mMax, o.get());
return true;
//输出map/reduce的阶段结果
public IntWritable terminatePartial()
return mEmpty ? null : new IntWritable(mMax);
//combiner/reduce对数据进行聚合
public boolean merge(IntWritable o)
return iterate(o);
//返回最终的聚集函数结果
public IntWritable terminate()
return mEmpty ? null : new IntWritable(mMax);
4.4 项目打包
通过maven对自定义udf项目打包(比如learninghive-1.0-SNAPSHOT.jar)
mvn clean package
4.5 项目包上传至HDFS
通过hdfs 命令,将项目打好的包上传至hdfs指定目录下。
bin/hdfs dfs -put /home/hadoop/shell/lib/learninghive-1.0-SNAPSHOT.jar /user/hive/jars
4.6 hive中添加项目包
通过bin/hive进入hive控制台,将项目包添加到hive中。
hive> add jar hdfs://mycluster/user/hive/jars/learninghive-1.0-SNAPSHOT.jar;
4.7 创建udf 函数模板
根据自定义udf类,创建一个自定义函数名称,供后面hive SQL 使用。
create temporary function max_udaf as 'com.bigdata.hive.MyMaxUDAF';
4.8 创建测试表
create table if not exists temperature
(id string comment '气象站id',year string comment '年',temperature int comment '气温')
comment '天气表'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE ;
4.9 准备测试数据集
准备测试数据文件temperature,每行数据代表气象站天气气温数据。
03103,1980,41
03103,1981,98
03103,1982,70
03103,1983,74
03103,1984,77
03103,1985,78
03103,1986,82
03103,1987,75
03103,1988,81
03103,1989,89
4.10 数据加载至hive表
将本地文件加载到刚刚创建的temperature表中。
load data local inpath '/weather/temperature' overwrite into table temperature ;
4.11 应用自定义udaf函数
select max_udaf(temperature) from temperature;
可以通过自定义max_udaf函数统计出tempera-
ture表中气温最大值。
以上是关于hive-UDF/UDTF/UDAF的主要内容,如果未能解决你的问题,请参考以下文章