spark 能执行udf 不能执行udaf,啥原因

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark 能执行udf 不能执行udaf,啥原因相关的知识,希望对你有一定的参考价值。

参考技术A 科普SparkSpark何使用Spark 1.Spark基于算布式计算(简单) 2.Spark与MapReduce同 3.Spark比Hadoop灵 4.Spark局限 5.情况适合使用Spark 图" class="ikqb_img_alink"> Spark SparkUC Berkeley AMP lab所源类Hadoop MapReduce通用并行计算框架Spark基于map reduce算实现布式计算拥Hadoop MapReduce所具优点;同于MapReduceJob间输结保存内存再需要读写HDFSSpark能更适用于数据挖掘与机器习等需要迭代map reduce算其架构图所示: 图" class="ikqb_img_alink"> Spark与Hadoop比 Spark间数据放内存于迭代运算效率更高 Spark更适合于迭代运算比较MLDM运算Spark面RDD抽象概念 Spark比Hadoop更通用 Spark提供数据集操作类型种像Hadoop提供MapReduce两种操作比map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等种操作类型Spark些操作称Transformations同提供Count, collect, reduce, lookup, save等种actions操作 些种数据集操作类型给给发层应用用户提供便各处理节点间通信模型再像Hadoop唯Data Shuffle种模式用户命名物化控制间结存储、区等说编程模型比Hadoop更灵 由于RDD特性Spark适用种异步细粒度更新状态应用例web服务存储或者增量web爬虫索引于种增量修改应用模型适合 容错性 布式数据集计算通checkpoint实现容错checkpoint两种式checkpoint datalogging the updates用户控制采用哪种式实现容错 用性 Spark通提供丰富Scala, JavaPython API及交互式Shell提高用性 Spark与Hadoop结合 Spark直接HDFS进行数据读写同支持Spark on YARNSpark与MapReduce运行于同集群共享存储资源与计算数据仓库Shark实现借用Hive几乎与Hive完全兼容 Spark适用场景 Spark基于内存迭代计算框架适用于需要操作特定数据集应用场合需要反复操作数越所需读取数据量越受益越数据量计算密集度较场合受益相较(数据库架构否考虑使用Spark重要素) 由于RDD特性Spark适用种异步细粒度更新状态应用例web服务存储或者增量web爬虫索引于种增量修改应用模型适合总说Spark适用面比较广泛且比较通用 运行模式 本模式 Standalone模式 Mesoes模式 yarn模式 Spark态系统 Shark ( Hive on Spark): Shark基本Spark框架基础提供HiveH iveQL命令接口程度保持Hive兼容性Shark使用HiveAPI实现query Parsing Logic Plan generationPhysicalPlan execution阶段用Spark代替Hadoop MapReduce通配置Shark参数Shark自内存缓存特定RDD实现数据重用进加快特定数据集检索同Shark通UDF用户自定义函数实现特定数据析习算使SQL数据查询运算析能结合起化RDD重复使用 Spark streaming: 构建Spark处理Stream数据框架基本原理Stream数据间片断(几秒)类似batch批量处理式处理部数据Spark Streaming构建Spark面Spark低延迟执行引擎(100ms+)用于实计算另面相比基于Record其处理框架(Storm)RDD数据集更容易做高效容错处理外批量处理式使同兼容批量实数据处理逻辑算便些需要历史数据实数据联合析特定应用场合 Bagel: Pregel on Spark用Spark进行图计算非用项目Bagel自带例实现GooglePageRank算 End

UDF/UDAF开发总结

参考文章:

1.UDF,UDAF,UDTF区别

UDF:最简单的自定义,实现一对一,输入一行数据输出一行数据  
UDAF:自定义聚合函数,实现多对一,输入多行数据输出一行数  
UDTF:用来实现一行输入多行输出,这次先不讲 

2.UDF开发

要点:1.UDF类需要继承org.apache.hadoop.hive.ql.exec.UDF.

2.UDF类需要实现evaluate类.

UDF开发实例:

开发一个udf getdate以返回当前系统时间

package udf.test;
import org.apache.hadoop.hive.ql.exec.UDF;

import java.text.SimpleDateFormat;
import java.util.Date;

public class Getdate extends UDF {
    public String evaluate(){
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
    }
}

然后maven打包:mvn clean compile.package
接着把包放到服务器上,比如放到/home/azkaban/UDF/udf-jar.1.1.0
进入hive shell,执行add jar /home/azkaban/UDF/udf-jar.1.1.0
接着执行create tempopary function getdate as ‘udf.test.Getdate‘;
这里的getdate就是function名称。在hive shell中执行select getdate()就会返回当前的系统时间。

待解决:hive中类似于bigint的类型,在udf的evaluate方法中如何返回,改成long?

3.UDAF开发

Hive的UDAF分为两种:

  • Simple。即继承org.apache.hadoop.hive.ql.exec.UDAF类,并在派生类中以静态内部类的方式实现 org.apache.hadoop.hive.ql.exec.UDAFEvaluator接口。这种方式简单直接,但是在使用过程中需要依赖JAVA反射机制,因此性能相对较低。在Hive源码包org.apache.hadoop.hive.contrib.udaf.example中包含几个示例。可以直接参阅。但是这些接口已经被注解为Deprecated,建议不要使用这种方式开发新的UDAF函数。
  • Generic。这是Hive社区推荐的新的写法,以抽象类代替原有的接口。新的抽象类 org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver替代老的UDAF接口,新的抽象类 org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator替代老的UDAFEvaluator接口。

UDAF的运行流程简介

其实hive就是对MapReduce的一层包装,所以我们写UDAF的时候可以通过对应到Map Reduce进行理解。

UDAF的四个阶段

  • PARTIAL1:原始数据到部分聚合,调用iterate和terminatePartial --> map阶段
  • PARTIAL2: 部分聚合到部分聚合,调用merge和terminatePartial --> combine阶段
  • FINAL: 部分聚合到完全聚合,调用merge和terminate --> reduce阶段
  • COMPLETE: 从原始数据直接到完全聚合 --> map阶段,并且没有reduce
    除了上面提到的iterate,merge,terminatePartial以外,还有init(初始化并返回,返回值的类型) ,getNewAggregationBuffer(获取新的buffer,也就是方法间传递参数的对象),reset(重置buffer对象)

UDAF需要实现的方法

在四个阶段中,我们可以得知,需要实现7个方法

  • init:这个方法不写会报错:fatal: nullpointexception null
  • getNewAggregationBuffer:我们定义一个对象,在这个方法里面实现该对象以用于参数传递
  • reset:重置buffer对象
  • iterate:类似于map()
  • merge:类似于Reduce()
  • terminatePartial:返回部分聚合数据的持久化对象。因为调用这个方法时,说明已经是map或者combine的结束了,必须将数据持久化以后交给reduce(也就是调用merge)进行处理。
  • terminate:结束,生成最终结果。

对象实例

  • 现要求实现某个字段以","进行提取的函数wm_concat,比如.

table:customers

name gender age
张三 23
李氏 26
王婆 54
尼古拉斯-赵六 43

select wm_concat(name) from customers;
返回的是 "张三,李氏,王婆,尼古拉斯-赵六"

  • 代码如下:
package com.maihaoche.baiyan.UDF;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

public class Wm_concat extends AbstractGenericUDAFResolver{
    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        return new GenericUDAFWmconcatEvaluator();
    }



    public static class GenericUDAFWmconcatEvaluator extends GenericUDAFEvaluator{

        static class stringagg implements AggregationBuffer{
            StringBuffer stringBuffer=new StringBuffer();
            String flag=null;
            boolean empty;
        }

        @Override
        /*
        init方法不写的话会报nullpointexception null 的错误
         */
            public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            if(parameters.length!=1){
                throw new UDFArgumentException("Argument Exception");
            }
            return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
        }

        /*
        获取存放中间结果的对象
         */
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
                stringagg sa=new stringagg();
                String str=null;
                return sa;
        }
        public void reset(AggregationBuffer aggregationBuffer) throws HiveException {
                stringagg sa=(stringagg)aggregationBuffer;
                sa.empty=true;
                sa.stringBuffer.delete(0,sa.stringBuffer.length());
        }

        public void iterate(AggregationBuffer aggregationBuffer, Object[] objects) throws HiveException {
            if(objects.length !=1 ){
                throw new UDFArgumentException("Argument Exception");
            }
            this.merge(aggregationBuffer,objects[0]);
        }

        public Object terminatePartial(AggregationBuffer aggregationBuffer) throws HiveException {
            return this.terminate(aggregationBuffer);
        }

        public void merge(AggregationBuffer aggregationBuffer, Object o) throws HiveException {
                stringagg sa=(stringagg)aggregationBuffer;
                if(o!=null){
                    sa.stringBuffer.append(o.toString());
                    sa.empty=false;
                }
        }

        public Object terminate(AggregationBuffer aggregationBuffer) throws HiveException {
            stringagg sa=(stringagg)aggregationBuffer;
            if(sa.empty==true) return null;
            int length=sa.stringBuffer.toString().length();
            return new Text(sa.stringBuffer.toString().substring(0,length-1));//通过substring解决最后一个字段跟着的分隔符
        }
    }
}

很明显,我们可以看出来,AbstractGenericUDAFResolver就是一层皮,我们可以在里面加一写验证条件,比如:
检测下面就进行检测是否有2个参数以及判断数据类型

 public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo parameters) throws SemanticException {
    if (parameters.length != 2) {
      throw new UDFArgumentTypeException(parameters.length - 1,
          "Please specify exactly two arguments.");
    }

    // validate the first parameter, which is the expression to compute over
    if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentTypeException(0,
          "Only primitive type arguments are accepted but "
          + parameters[0].getTypeName() + " was passed as parameter 1.");
    }
    switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
    case BYTE:
    case SHORT:
    case INT:
    case LONG:
    case FLOAT:
    case DOUBLE:
    case TIMESTAMP:
    case DECIMAL:
      break;
    case STRING:
    case BOOLEAN:
    case DATE:
    default:
      throw new UDFArgumentTypeException(0,
          "Only numeric type arguments are accepted but "
          + parameters[0].getTypeName() + " was passed as parameter 1.");
    }

待解决:如何写希望输入的是两个参数的,比如现在希望自己指定wm_concat的分割符。









以上是关于spark 能执行udf 不能执行udaf,啥原因的主要内容,如果未能解决你的问题,请参考以下文章

spark 能执行udf 不能执行udaf,啥原因

spark的udf和udaf的注册

spark编写UDF和UDAF

Spark篇---SparkSQL中自定义UDF和UDAF,开窗函数的应用

Spark 自定义函数(udf,udaf)

spark-sql自定义函数UDF和UDAF