Writing Code to Create a UDTF Function

1) Create the UDTF function: write the code

(1) Create a Maven project named hivefunction

(2) Create the package com.atguigu.hive.udtf

(3) Add the following dependencies

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.qc</groupId>
    <artifactId>hivefunction</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
        <!-- Hive dependency; marked provided because the Hive cluster
             already ships it at runtime -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
            <scope>provided</scope>
        </dependency>

        <!-- org.json is needed for JSONArray; declared explicitly in case
             the local hive-exec jar does not bundle it -->
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20211205</version>
        </dependency>
    </dependencies>

</project>
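
Since the function is later registered with a bare USING JAR clause, any third-party classes (here org.json) must either already be on Hive's classpath or be bundled into the jar. If bundling turns out to be necessary, a minimal maven-shade-plugin sketch (the plugin version is illustrative) can be added inside the <project> element; dependencies marked provided, such as hive-exec above, are not included in the shaded jar:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>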

(4) Write the code

  • Reference implementation
package com.atguigu.hive.udtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;

import java.util.ArrayList;
import java.util.List;

public class ExplodeJSONArray extends GenericUDTF {

    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {

        // 1. Argument sanity check: exactly one argument is expected
        if (argOIs.length != 1) {
            throw new UDFArgumentException("explode_json_array takes exactly one argument");
        }

        // 2. The first argument must be a string
        // Check that the argument is a primitive type
        if (argOIs[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("explode_json_array only accepts primitive-type arguments");
        }

        // Cast the object inspector to a primitive object inspector
        PrimitiveObjectInspector argumentOI = (PrimitiveObjectInspector) argOIs[0];

        // Check that the argument is of type string
        if (argumentOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("explode_json_array only accepts string arguments");
        }

        // 3. Define the names and types of the output columns
        List<String> fieldNames = new ArrayList<String>();
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

        fieldNames.add("items");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] objects) throws HiveException {

        // 1. Get the input value
        String jsonArray = objects[0].toString();

        // 2. Parse the string into a JSON array
        JSONArray actions = new JSONArray(jsonArray);

        // 3. Emit one row per element of the JSON array
        for (int i = 0; i < actions.length(); i++) {
            String[] result = new String[1];
            result[0] = actions.getString(i);
            forward(result);
        }
    }

    @Override
    public void close() throws HiveException {
    }
}

  • My implementation

ExplodeJSONArray.java

package com.qc.gmall.hive.udtf;

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.json.JSONArray;

// UDTF implementation
public class ExplodeJSONArray extends GenericUDTF {

    private PrimitiveObjectInspector inputOI;

    @Override
    public void close() throws HiveException {
        // nothing to clean up
    }

    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {

        if (argOIs.length != 1) {
            throw new UDFArgumentException("explode_json_array takes exactly 1 argument");
        }

        ObjectInspector argOI = argOIs[0];

        if (argOI.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("explode_json_array only accepts primitive-type arguments");
        }

        // Cast to a primitive object inspector and keep it for process()
        PrimitiveObjectInspector primitiveOI = (PrimitiveObjectInspector) argOI;
        inputOI = primitiveOI;

        if (primitiveOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("explode_json_array only accepts STRING arguments");
        }

        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("item");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
                fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {

        Object arg = args[0];
        String jsonArrayStr = PrimitiveObjectInspectorUtils.getString(arg, inputOI);

        // Parse the input string as a JSON array
        JSONArray jsonArray = new JSONArray(jsonArrayStr);

        // Emit one row per array element
        for (int i = 0; i < jsonArray.length(); i++) {
            String json = jsonArray.getString(i);
            // Even a single output column must be wrapped in an array
            String[] result = {json};
            // Emit the row via forward()
            forward(result);
        }
    }
}

2) Create the function

(1) Package the project
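
With a standard Maven setup, packaging is a single command run from the project root on the development machine; the jar is produced under target/:

mvn clean package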

(2) Upload hivefunction-1.0-SNAPSHOT.jar to /opt/module on hadoop102, then put it into the /user/hive/jars directory on HDFS:

[atguigu@hadoop102 module]$ hadoop fs -mkdir -p /user/hive/jars
[atguigu@hadoop102 module]$ hadoop fs -put hivefunction-1.0-SNAPSHOT.jar /user/hive/jars

(3) Create a permanent function and associate it with the compiled Java class:

create function explode_json_array as 'com.atguigu.hive.udtf.ExplodeJSONArray' using jar 'hdfs://hadoop102:8020/user/hive/jars/hivefunction-1.0-SNAPSHOT.jar';
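
The function can then be used like any built-in UDTF. A quick smoke test on a literal, followed by a typical lateral view query (the second statement assumes a hypothetical table action_log with a STRING column actions holding a JSON array):

select explode_json_array('["a","b","c"]') as item;

select id, item
from action_log
lateral view explode_json_array(actions) tmp as item;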

(4) Note: what if the custom function is modified and a new jar is built?

Just replace the old jar at the HDFS path with the new one, then restart the Hive client.
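
For example, hadoop fs -put with the overwrite flag replaces the jar in place:

[atguigu@hadoop102 module]$ hadoop fs -put -f hivefunction-1.0-SNAPSHOT.jar /user/hive/jars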


Official reference

DeveloperGuide UDTF

GenericUDTF Interface
A custom UDTF can be created by extending the GenericUDTF abstract class and then implementing the initialize, process, and possibly close methods. The initialize method is called by Hive to notify the UDTF of the argument types to expect. The UDTF must then return an object inspector corresponding to the row objects that the UDTF will generate. Once initialize() has been called, Hive will give rows to the UDTF using the process() method. While in process(), the UDTF can produce and forward rows to other operators by calling forward(). Lastly, Hive will call the close() method when all the rows have been passed to the UDTF.
