spark-sql自定义函数UDF和UDAF

Posted soft.push("zzq")

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark-sql自定义函数UDF和UDAF相关的知识,希望对你有一定的参考价值。

    SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("mysqlTest");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        SQLContext sqlContext = new SQLContext(javaSparkContext);

        List<String> list = new ArrayList<String>();
        list.add("2018-9-9,1");
        list.add("2018-5-9,1124");
        list.add("2018-9-9,1125");
        list.add("2018-5-9,1126");
        list.add("2016-10-9,1127");

        JavaRDD<String> rdd_list = javaSparkContext.parallelize(list, 5);

        JavaRDD<Row> rdd_row_list = rdd_list.map(new Function<String, Row>() {
            @Override
            public Row call(String s) throws Exception {
                return RowFactory.create(s.split(",")[0], Long.parseLong(s.split(",")[1]));//转换成一个row对象
            }
        });

        List<StructField> structFieldList = new ArrayList<StructField>();
        structFieldList.add(DataTypes.createStructField("date", DataTypes.StringType, true));
        structFieldList.add(DataTypes.createStructField("s", DataTypes.LongType, true));
        StructType dyType = DataTypes.createStructType(structFieldList);

        DataFrame df_dyType = sqlContext.createDataFrame(rdd_row_list, dyType);

        df_dyType.registerTempTable("tmp_req");
        df_dyType.show();

        //1,注册一个简单用户自定义函数
        sqlContext.udf().register("zzq123", new UDF1<String, Integer>() {
            @Override
            public Integer call(String str) throws Exception {
                return str.length();
            }
        }, DataTypes.IntegerType);

        DataFrame df_group = sqlContext.sql("select date,s,zzq123(date) as zzq123 from tmp_req ");//UDF如果没有指定名称,则随机名称
        df_group.show();

        //1,注册一个复杂的用户自定义聚合函数
        sqlContext.udf().register("zzq_agg", new StringLen());//zzq_agg,模拟类似count聚合函数
        DataFrame df_group_agg = sqlContext.sql("select date,zzq_agg(s) as zzq_agg from tmp_req group by date ");//UDAF为聚合情况下使用
        df_group_agg.show();

聚合函数:

public class StringCount extends UserDefinedAggregateFunction {
    @Override
    public StructType inputSchema() {//inputSchema指的是输入的数据类型
        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("str", DataTypes.StringType, true));
        return DataTypes.createStructType(fields);
    }

    @Override
    public StructType bufferSchema() {//bufferSchema指的是  中间进行聚合时  所处理的数据类型
        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("count", DataTypes.IntegerType, true));
        return DataTypes.createStructType(fields);
    }

    @Override
    public DataType dataType() {//dataType指的是函数返回值的类型
        return DataTypes.IntegerType;
    }

    @Override
    public boolean deterministic() {//一致性检验,如果为true,那么输入不变的情况下计算的结果也是不变的
        return true;
    }

    /**
     * 用输入数据input更新buffer值,类似于combineByKey
     *
     * @param buffer
     * @param input
     */
    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        buffer.update(0, Integer.valueOf(buffer.getAs(0).toString()) + 1);
    }

    /**
     * 合并两个buffer,将buffer2合并到buffer1.在合并两个分区聚合结果的时候会被用到,类似于reduceByKey
     * 这里要注意该方法没有返回值,在实现的时候是把buffer2合并到buffer1中去,你需要实现这个合并细节
     *
     * @param buffer1
     * @param buffer2
     */
    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        buffer1.update(0, Integer.valueOf(buffer1.getAs(0).toString()) + Integer.valueOf(buffer2.getAs(0).toString()));
    }

    /**
     * 设置聚合中间buffer的初始值,但需要保证这个语义:两个初始buffer调用下面实现的merge方法后也应该为初始buffer
     * 即如果你初始值是1,然后你merge是执行一个相加的动作,两个初始buffer合并之后等于2,
     * 不会等于初始buffer了。这样的初始值就是有问题的,所以初始值也叫"zero value"
     *
     * @param buffer
     */
    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        buffer.update(0, 0);
    }

    /**
     * 计算并返回最终的聚合结果
     *
     * @param buffer
     * @return
     */
    @Override
    public Object evaluate(Row buffer) {
        return buffer.getInt(0);
    }

 

以上是关于spark-sql自定义函数UDF和UDAF的主要内容,如果未能解决你的问题,请参考以下文章

hive自定义函数UDF UDTF UDAF

(五)Hive的UDF、UDAF和UDTF自定义函数

Hive 自定义函数 UDF UDAF UDTF

Hive--10---函数----自定义函数 (UDF-UDAF-UDTF)

详解Spark sql用户自定义函数:UDF与UDAF

Spark 自定义函数(udf,udaf)