Hive中自定义UDAF函数生产小案例

Posted 若泽大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hive中自定义UDAF函数生产小案例相关的知识,希望对你有一定的参考价值。

一、UDAF 回顾

  1. 定义:UDAF(User Defined Aggregation Funcation ) 用户自定义聚类方法,和group by联合使用,接受多个输入数据行,并产生一个输出数据行。

  2. Hive有两种UDAF:简单和通用 
    简单:利用抽象类UDAF和UDAFEvaluator,使用Java反射导致性能损失,且有些特性不能使用,如可变长度参数列表 。
    通用:
    利用接口GenericUDAFResolver2(或抽象类AbstractGenericUDAFResolver)和抽象类GenericUDAFEvaluator,可以使用所有功能,但比较复杂,不直观。

  3. 一个计算函数必须实现的5个方法的具体含义如下:

    init():主要是负责初始化计算函数并且重设其内部状态,一般就是重设其内部字段。一般在静态类中定义一个内部字段来存放最终的结果。
    iterate():每一次对一个新值进行聚集计算时候都会调用该方法,计算函数会根据聚集计算结果更新内部状态。当输 入值合法或者正确计算了,则就返回true。
    terminatePartial():Hive需要部分聚集结果的时候会调用该方法,必须要返回一个封装了聚集计算当前状态的对象。
    merge():Hive进行合并一个部分聚集和另一个部分聚集的时候会调用该方法。
    terminate():Hive最终聚集
    结果的时候就会调用该方法。计算函数需要把状态作为一个值返回给用户。


二、需求

       使用UDAF简单方式实现统计区域产品用户访问排名


、自定义UDAF函数代码实现

  1. package hive.org.ruozedata;

  2. import java.util.*;

  3. import org.apache.hadoop.hive.ql.exec.UDAF;

  4. import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

  5. import org.apache.log4j.Logger;

  6. public class UserClickUDAF extends UDAF {

  7.     // 日志对象初始化

  8.     public static Logger logger = Logger.getLogger(UserClickUDAF.class);

  9.     // 静态类实现UDAFEvaluator

  10.     public static class Evaluator implements UDAFEvaluator {

  11.         // 设置成员变量,存储每个统计范围内的总记录数

  12.         private static Map<String, String> courseScoreMap;

  13.         private static Map<String, String> city_info;

  14.         private static Map<String, String> product_info;

  15.         private static Map<String, String> user_click;

  16.         //初始化函数,map和reduce均会执行该函数,起到初始化所需要的变量的作用

  17.         public Evaluator() {

  18.             init();

  19.         }

  20.         // 初始化函数间传递的中间变量

  21.         public void init() {

  22.             courseScoreMap = new HashMap<String, String>();

  23.             city_info = new HashMap<String, String>();

  24.             product_info = new HashMap<String, String>();

  25.        }


  26.         //map阶段,返回值为boolean类型,当为true则程序继续执行,当为false则程序退出

  27.         public boolean iterate(String pcid, String pcname, String pccount) {

  28.             if (pcid == null || pcname == null || pccount == null) {

  29.                 return true;

  30.             }

  31.             if (pccount.equals("-1")) {

  32.                 // 城市表

  33.                 city_info.put(pcid, pcname);

  34.             }

  35.             else if (pccount.equals("-2")) {

  36.                 // 产品表

  37.                 product_info.put(pcid, pcname);

  38.             }

  39.             else {

  40.                 // 处理用户点击关联

  41.                 unionCity_Prod_UserClic1(pcid, pcname, pccount);

  42.            }

  43.             return true;

  44.         }


  45.         // 处理用户点击关联

  46.         private void unionCity_Prod_UserClic1(String pcid, String pcname, String pccount) {

  47.             if (product_info.containsKey(pcid)) {

  48.                 if (city_info.containsKey(pcname)) {

  49.                     String city_name = city_info.get(pcname);

  50.                     String prod_name = product_info.get(pcid);

  51.                     String cp_name = city_name + prod_name;

  52.                     // 如果之前已经Put过Key值为区域信息,则把记录相加处理

  53.                     if (courseScoreMap.containsKey(cp_name)) {

  54.                         int pcrn = 0;

  55.                         String strTemp = courseScoreMap.get(cp_name);

  56.                         String courseScoreMap_pn 

  57.                          = strTemp.substring(strTemp.lastIndexOf("\t".toString())).trim();

  58.                         pcrn = Integer.parseInt(pccount) + Integer.parseInt(courseScoreMap_pn);

  59.                         courseScoreMap.put(cp_name, city_name + "\t" + prod_name + "\t"+ Integer.toString(pcrn));

  60.                     }

  61.                     else {

  62.                         courseScoreMap.put(cp_name, city_name + "\t" + prod_name + "\t"+ pccount);

  63.                     }

  64.                 }

  65.             }

  66.         }


  67.         /**

  68.          * 类似于combiner,在map范围内做部分聚合,将结果传给merge函数中的形参mapOutput

  69.          * 如果需要聚合,则对iterator返回的结果处理,否则直接返回iterator的结果即可

  70.          */

  71.         public Map<String, String> terminatePartial() {

  72.             return courseScoreMap;

  73.         }


  74.         // reduce 阶段,用于逐个迭代处理map当中每个不同key对应的 terminatePartial的结果

  75.         public boolean merge(Map<String, String> mapOutput) {

  76.             this.courseScoreMap.putAll(mapOutput);

  77.             return true;

  78.         }

  79.         // 处理merge计算完成后的结果,即对merge完成后的结果做最后的业务处理

  80.         public String terminate() {

  81.             return courseScoreMap.toString();

  82.         }

  83.     }

  84. }



三、创建hive中的临时函数

  1. DROP TEMPORARY FUNCTION user_click;

  2. add jar /data/hive_udf-1.0.jar;

  3. CREATE TEMPORARY FUNCTION user_click AS 'hive.org.ruozedata.UserClickUDAF';


四、调用自定义UDAF函数处理数据

  1. insert overwrite directory '/works/tmp1' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'

  2. select regexp_replace(substring(rs, instr(rs, '=')+1), '}', '') from (

  3.   select explode(split(user_click(pcid, pcname, type),',')) as rs from (

  4.     select * from (

  5.       select '-2' as type, product_id as pcid, product_name as pcname from product_info

  6.       union all

  7.       select '-1' as type, city_id as pcid,area as pcname from city_info

  8.       union all

  9.       select count(1) as type,

  10.              product_id as pcid,

  11.              city_id as pcname

  12.         from user_click

  13.        where action_time='2016-05-05'

  14.       group by product_id,city_id

  15.     ) a

  16.   order by type) b

  17. ) c ;


五、创建Hive临时外部表

  1. create external table tmp1(

  2. city_name string,

  3. product_name string,

  4. rn string

  5. )

  6. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'

  7. location '/works/tmp1';



六、统计最终区域前3产品排名

  1. select * from (

  2. select city_name,

  3.        product_name,

  4.        floor(sum(rn)) visit_num,

  5.        row_number()over(partition by city_name order by sum(rn) desc) rn,

  6.        '2016-05-05' action_time

  7.   from tmp1 

  8.  group by city_name,product_name

  9. ) a where rn <=3 ;


七、最终结果



打个小小的广告哟Hive中自定义UDAF函数生产小案例Hive中自定义UDAF函数生产小案例Hive中自定义UDAF函数生产小案例

1.若泽数据 官网:  

www.ruozedata.com     

微信不支持链接跳转,单击下方[阅读全文]

2.面试题/博客汇总: 

 https://github.com/ruozedata/BigData

 微信不支持内链接跳转,浏览器拼写一下


每周3篇大数据相关原创文章,联系客服领取,

若泽2017+2018年所有腾讯课堂公开课视频,尚未外泄,独此1家

Hive中自定义UDAF函数生产小案例

4.


5.若泽大数据--星星: ruoze_star ,加我邀请进群


单击下方【阅读全文】,进入官网!



以上是关于Hive中自定义UDAF函数生产小案例的主要内容,如果未能解决你的问题,请参考以下文章

Spark篇---SparkSQL中自定义UDF和UDAF,开窗函数的应用

hive自定义函数UDF UDTF UDAF

Hive 自定义函数 UDF UDAF UDTF

(五)Hive的UDF、UDAF和UDTF自定义函数

学习笔记Hive—— 自定义函数

hive-UDF/UDTF/UDAF