hive-UDF/UDTF/UDAF

Posted xiaopihaierletian

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hive-UDF/UDTF/UDAF相关的知识,希望对你有一定的参考价值。

1.Hive三种自定义函数

1.1 UDF

UDF,即用户定义函数(user-defined function),作用于单行数据,并且产生一个数据行作为输出。Hive中大多数函数都属于这一类,比如数学函数和字符串函数。UDF函数的输入与输出值是1:1关系

1.2 UDTF

UDTF,即用户定义表生成函数(user-defined table-generating function),作用于单行数据,并且产生多个数据行。UDTF函数的输入与输出值是1:n的关系

1.3 UDAF

UDAF,用户定义聚集函数(user-defined aggregate function),作用于多行数据,并且产生一个输出数据行。Hive中像COUNT、MAX、MIN和SUM这样的函数就是聚集函数。UDAF函数的输入与输出值是n:1的关系

2.UDF自定义函数实现

2.1 使用说明

UDF函数可以直接应用于select语句,对查询结构做格式化处理后,再输出内容。

2.2 注意事项

编写UDF函数的时候需要注意一下几点:

1)自定义UDF需要继承org.apache.hadoop.hive.ql.UDF。

2)需要实现evaluate函数,evaluate函数支持重载。

3)  UDF必须要有返回类型,可以返回null,但是返回类型不能为void;

4)UDF中常用Text/LongWritable等类型,不推荐使用java类型;

2.3 添加hive依赖

<dependency>

     <groupId>org.apache.hive</groupId>

     <artifactId>hive-exec</artifactId>

     <version>2.3.8</version>

</dependency>

2.4 自定义udf

功能:输入一条用户行为json数据,提取用户uid

package com.bigdata.hive;

import org.apache.hadoop.hive.ql.exec.UDF;

import org.json.JSONObject;

public class ParseJsonUDF  extends UDF

    public String evaluate(String line, String key)

        JSONObject baseJson = new JSONObject(line.trim());

        String result = "";

        if(baseJson.has(key))

            return baseJson.getString(key);

       

        return result;

   

    public static void main(String[] args)

        String line = "\\"userId\\":9527,\\"day\\":\\"2021-06-12\\",\\"begintime\\":1595058161312,\\"endtime\\":1595058469010,\\"data\\":[\\"package\\":\\"com.browser\\",\\"activetime\\":120000]";

        String userId  = new ParseJsonUDF().evaluate(line,"userId");

        System.out.println(userId);

   

2.5 项目打包

通过maven对自定义udf项目打包(比如learninghive-1.0-SNAPSHOT.jar)

mvn clean package 

2.6 项目包上传至HDFS

通过hdfs 命令,将项目打好的包上传至hdfs指定目录下。

bin/hdfs dfs -put home/hadoop/shell/lib/learninghive-1.0-SNAPSHOT.jar user/hive/jars

2.7 hive中添加项目包

通过bin/hive进入hive控制台,将项目包添加到hive中。

hive> add jar hdfs://mycluster/user/hive/jars/learninghive-1.0-SNAPSHOT.jar;

2.8 创建udf 函数模板

根据自定义udf类,创建一个自定义函数名称,供后面hive SQL 使用。

hive> create temporary function json_udf as 'com.bigdata.hive.ParseJsonUDF';

2.9 创建测试表

CREATE TABLE behavior(line STRING)

ROW FORMAT DELIMITED

FIELDS TERMINATED BY '\\n'

STORED AS TEXTFILE;

2.10 准备测试数据集

准备测试数据文件behavior.log,每行数据代表用户行为数据。

"userId":9527,"day":"2021-05-12","begintime":1620785058833,"endtime":1620785217511,"data":["package":"com.browser","activetime":120000,"package":"com.qq","activetime":80000]

"userId":3854,"day":"2021-05-12","begintime":1620785290612,"endtime":1595058469010,"data":["package":"com.browser","activetime":60000,"package":"com.qq","activetime":150000]

2.11 数据加载至hive表

将本地文件加载到刚刚创建的behavior表中。

LOAD DATA LOCAL INPATH '/home/hadoop/shell/data/behavior.log' INTO TABLE behavior;

2.12 应用自定义udf函数

hive> select json_udf(line,'userId')  from behavior;

可以通过自定义json_udf函数提取用户行为json数据中的用户uid。

3.UDTF自定义函数实现

3.1 使用说明

自定义UDTF需要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF类,重新initialize, process, close三个方法:

1)UDTF首先会调用initialize()方法,此方法返回UDTF的返回行的信息(返回个数,类型)。

2)初始化完成后,会调用process()方法,真正的处理过程在process函数中,在process中,每一次forward()调用产生一行;

3)最后close()方法调用,对需要清理的方法进行清理。

3.2自定义udtf

功能:输入一条用户行为json数据,输出多条用户行为日志数据。

package com.bigdata.hive;

import groovy.json.JsonException;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;

import org.apache.hadoop.hive.ql.metadata.HiveException;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import org.json.JSONArray;

import org.json.JSONException;

import java.util.ArrayList;

public class ParseJsonUDTF extends GenericUDTF

    **

     * 该方法中,我们将指定输出参数的名称和参数类型

     * @param argOIs

     * @return

     * @throws UDFArgumentException

     */

    @Override

    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException

        ArrayList<String> fieldNames = new ArrayList<String>();

        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

        fieldNames.add("package");

        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        fieldNames.add("activetime");

        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);

   

    **

     * 输入 1 条记录,输出 n 条记录

     * @param objects

     * @throws HiveException

     */

    @Override

    public void process(Object[] objects) throws HiveException

        获取传入的data

        String data = objects[0].toString();

        如果传进来的数据为空,直接返回过滤掉该数据

        if(StringUtils.isBlank(data))

            return ;

        else

          try

              获取一共有几条操作记录

              JSONArray dataArray = new JSONArray(data);

              if (dataArray == null) return;

              循环遍历每个事件

              for (int i=0;i<dataArray.length();i++)

                  String[]  output = new String[2];

                  try

                      output[0]=dataArray.getJSONObject(i).getString("package");

                      output[1]=dataArray.getJSONObject(i).getString("activetime");

                  catch (JSONException e)

                      continue;

                 

                  将结果返回

                  forward(output);

             

          catch (JSONException e)

              e.printStackTrace();

         

       

   

    **

     * 当没有记录处理的时候该方法会被调用,用来清理代码或者产生额外的输出

     * @throws HiveException

     */

    @Override

    public void close() throws HiveException

   

3.3 项目打包

通过maven对自定义udf项目打包(比如learninghive-1.0-SNAPSHOT.jar)

mvn clean package 

3.4 项目包上传至HDFS

通过hdfs 命令,将项目打好的包上传至hdfs指定目录下。

bin/hdfs dfs -put home/hadoop/shell/lib/learninghive-1.0-SNAPSHOT.jar user/hive/jars

3.5 hive中添加项目包

通过bin/hive进入hive控制台,将项目包添加到hive中。

hive> add jar hdfs://mycluster/user/hive/jars/learninghive-1.0-SNAPSHOT.jar;

3.6 创建udf 函数模板

根据自定义udf类,创建一个自定义函数名称,供后面hive SQL 使用。

hive> create temporary function json_udtf as 'com.bigdata.hive.ParseJsonUDTF';

hive> create temporary function json_udf as 'com.bigdata.hive.ParseJsonUDF';

3.7 创建测试表

CREATE TABLE behavior(line STRING)

ROW FORMAT DELIMITED

FIELDS TERMINATED BY '\\n'

STORED AS TEXTFILE;

3.8 准备测试数据集

准备测试数据文件behavior.log,每行数据代表用户行为数据。

"userId":9527,"day":"2021-05-12","begintime":1620785058833,"endtime":1620785217511,"data":["package":"com.browser","activetime":120000,"package":"com.qq","activetime":80000]

"userId":3854,"day":"2021-05-12","begintime":1620785290612,"endtime":1595058469010,"data":["package":"com.browser","activetime":60000,"package":"com.qq","activetime":150000]

3.9 数据加载至hive表

将本地文件加载到刚刚创建的behavior表中。

LOAD DATA LOCAL INPATH '/home/hadoop/shell/data/behavior.log' INTO TABLE behavior;

3.10应用自定义udtf函数

hive> select 

json_udf(line,'userId'),

json_udf(line,'day'),

json_udf(line,'begintime'),

json_udf(line,'endtime'),

package,

activetime 

from behavior lateral view json_udtf(json_udf

(line,'data')) tmpdata as package,activetime;

备注:与lateral view一起使用,执行过程相当于单独执行了两次抽取,然后union到一个表里。

可以通过自定义 json_udtf函数解析用户行为json数据,然后输出多条用户行为具体操作日志。

4.UDAF自定义函数实现

4.1 使用说明

1)必须继承

org.apache.hadoop.hive.ql.exec.UDAF(函数类继承)

org.apache.hadoop.hive.ql.exec.UDAFEvaluator(内部类Evaluator实现UDAFEvaluator接口)

2)Evaluator需要实现以下几个函数

init():初始化

iterate():迭代处理每一行数据

terminatePartial():输出map/reduce的阶段结果

merge():combiner/reduce对数据进行聚合

terminate():返回最终的聚集函数结果

4.2 udaf运行过程详解

UDAF 就是一个多行导成一行的聚合函数,它的过程与MR过程紧密结合

4.3 自定义udaf

功能:统计表中某一列的最大值

package com.bigdata.hive;

import org.apache.hadoop.hive.ql.exec.UDAF;

import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

import org.apache.hadoop.io.IntWritable;

public class MyMaxUDAF extends UDAF

    static public class MaxIntEvaluator implements UDAFEvaluator

        private int mMax;

        private boolean mEmpty;

        构造方法

        public MaxIntEvaluator()

            super();

            init();

       

        类似于构造函数,用于UDAF的初始化

        public void init()

            mMax = 0;

            mEmpty = true;

       

        迭代处理每一行数据

        public boolean iterate(IntWritable o)

            if (o != null)

                if (mEmpty)

                    mMax = o.get();

                    mEmpty = false;

                else

                    mMax = Math.max(mMax, o.get());

               

           

            return true;

       

        //输出map/reduce的阶段结果

        public IntWritable terminatePartial()

            return mEmpty ? null : new IntWritable(mMax);

       

        //combiner/reduce对数据进行聚合

        public boolean merge(IntWritable o)

            return iterate(o);

       

       //返回最终的聚集函数结果

        public IntWritable terminate()

            return mEmpty ? null : new IntWritable(mMax);

       

   

4.4 项目打包

通过maven对自定义udf项目打包(比如learninghive-1.0-SNAPSHOT.jar)

mvn clean package 

4.5 项目包上传至HDFS

通过hdfs 命令,将项目打好的包上传至hdfs指定目录下。

bin/hdfs dfs -put /home/hadoop/shell/lib/learninghive-1.0-SNAPSHOT.jar /user/hive/jars

4.6 hive中添加项目包

通过bin/hive进入hive控制台,将项目包添加到hive中。

hive> add jar hdfs://mycluster/user/hive/jars/learninghive-1.0-SNAPSHOT.jar;

4.7 创建udf 函数模板

根据自定义udf类,创建一个自定义函数名称,供后面hive SQL 使用。

create temporary function max_udaf as 'com.bigdata.hive.MyMaxUDAF';

4.8 创建测试表

create  table  if not exists temperature 

(id string comment '气象站id',year string comment '年',temperature int comment '气温') 

comment '天气表' 

ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 

STORED AS  TEXTFILE ;

4.9 准备测试数据集

准备测试数据文件temperature,每行数据代表气象站天气气温数据。

03103,1980,41

03103,1981,98

03103,1982,70

03103,1983,74

03103,1984,77

03103,1985,78

03103,1986,82

03103,1987,75

03103,1988,81

03103,1989,89

4.10 数据加载至hive表

将本地文件加载到刚刚创建的temperature表中。

load data local inpath '/weather/temperature' overwrite into table temperature ;

4.11 应用自定义udaf函数

select max_udaf(temperature) from temperature;

可以通过自定义max_udaf函数统计出tempera-

ture表中气温最大值。

以上是关于hive-UDF/UDTF/UDAF的主要内容,如果未能解决你的问题,请参考以下文章

GlobalMapper精品教程023:Excel数据通过相同字段连接到属性表中(气温降水连接到气象台站)

剖析美国平均气温项目

如何用hadoop统计美国气象局的最高气温

剖析美国平均气温项目,掌握MapReduce编程

怎么样查各市的历史最高气温、最低气温以及年平均气温?

用mapreduce 处理气象数据集