如何在 Pig 中动态获取组内的前 N% 记录

Posted

技术标签:

【中文标题】如何在 Pig 中动态获取组内的前 N% 记录【英文标题】:How to dynamically get Top N percent records within group in Pig 【发布时间】:2013-10-03 21:21:46 【问题描述】:

我有一个问题,我不确定如何在 Pig 中解决。我在 Hadoop 上有一个数据集(大约 400 万条记录),其中包含按产品类别划分的产品标题。每个标题都有编号。它出现在网页上的次数,没有。点击它进入产品详细信息页面的次数。没有。产品类别中的标题可能会有所不同。

样本数据 -

电子游戏|光环 4|5400|25 电子游戏|极限竞速 4 限量珍藏版|6000|10 电子游戏|漫威终极联盟|2000|55 相机和照片|适用于 GoPro HD 的专业斯坦尼康|12000|250 相机和照片|Hero GoPro Motorsports 1080P 宽高清 5MP 头盔相机|10000|125

我想根据第 3 列(网页上的外观)获取每个产品类别中前 N% 的记录。但是,N % 必须根据类别的权重/重要性而有所不同。例如。对于电子游戏,我想获得前 15% 的记录;对于相机和照片,我想获得前 5% 等。有没有办法在 Pig 中嵌套的 FOREACH 代码块内的 LIMIT 子句中动态设置 % 或整数值?

PRODUCT_DATA = LOAD '<PRODUCT FILE PATH>' USING PigStorage('|') AS (categ_name:chararray, product_titl:chararray, impression_cnt:long, click_through_cnt:long);

GRP_PROD_DATA = GROUP PRODUCT_DATA BY categ_name;

TOP_PROD_LIST = FOREACH GRP_PROD_DATA

                  SORTED_TOP_PROD = ORDER PRODUCT_DATA BY impression_cnt DESC;
                  SAMPLED_DATA = LIMIT SORTED_TOP_PROD <CATEGORY % OR INTEGER VALUE>;
                  GENERATE flatten(SAMPLED_DATA);
                

STORE TOP_PROD_TITLE_LIST INTO '&lt;SOME PATH&gt;' USING PigStorage('|');

如何动态(按类别)设置给定组的百分比或整数值?我想过使用宏,但不能从嵌套的 FOREACH 块中调用宏。我可以编写一个将类别名称作为参数的 UDF,并输出 % OR INTEGER 值,并从 LIMIT 操作中调用此 UDF 吗?

SAMPLED_DATA = LIMIT SORTED_TOP_PROD categLimitVal(categ_name);

有什么建议吗?我使用的是 0.10 版的 Pig。

【问题讨论】:

您使用 Hadoop 流式传输并将 LIMIT 行替换为 STREAM SORTED_TOP_PROD THROUGH `awk ...`;。但是,我建议您重新考虑您的用例,因为要能够获得前 % 的记录,您需要先对它们进行计数(或将它们全部保存在 awk 脚本中的内存中),然后根据类别映射获取前 %百分比。这是一种反 MapReduce。前 X 值处理速度很快;前 Y% 不是。 【参考方案1】:

这样的事情可能会奏效。但是,我从来不需要在 Pig 映射中查找变量键,而 this other SO question 没有答案,因此您需要进行一些试验和错误才能使其工作:

--Load your dynamic percentages as a map
A = LOAD 'percentages' AS (categ_name:chararray, perc:float);
PERCENTAGES = FOREACH A GENERATE TOMAP(categ_name, perc);

PRODUCT_DATA = LOAD ...;
GRP_PROD_DATA = GROUP PRODUCT_DATA BY categ_name;

--Count the elements per group; needed to calculate pecentages
C = FOREACH GRP_PROD_DATA generate FLATTEN(group) AS categ_name, COUNT(*) as count;
c_MAP = FOREACH C GENERATE TOMAP(categ_name, count);

TOP_PROD_LIST = FOREACH GRP_PROD_DATA 
    SORTED_TOP_PROD = ORDER PRODUCT_DATA BY impression_cnt DESC;
    SAMPLED_DATA = LIMIT SORTED_TOP_PROD (C_MAP#group * PERCENTAGES#group);
    GENERATE flatten(SAMPLED_DATA);

您也可以尝试使用Pig's TOP 函数而不是ORDER + LIMIT

【讨论】:

感谢您回答我的问题 Cabad。不幸的是,我尝试了上述方法,但没有奏效。除了整数或标量表达式之外,LIMIT 不允许其他任何内容。它不允许引用关系中的另一列。在这种情况下,Limit 和 TOP 都不起作用。不知道我还能尝试什么。 Pig 允许casting relations to scalars,这样做可能会解决您的问题。或者,我在对您的问题的评论中建议的awk 解决方案肯定会起作用。【参考方案2】:

我想我用一种稍微不同的方法解决了这个问题。我不确定它的优化程度,也许有更好的方法来组织/优化脚本。基本上,如果我按照ASC 展示次数的顺序对每个类别中的产品标题进行排名,并在类别的RANK SAMPLE LIMIT 时进行过滤,那么我可以模拟动态采样。 SAMPLE LIMIT 只不过是每个类别的标题的COUNT * 每个类别定义的PERCENT WEIGHT。对于 RANK 元组,我正在利用 LinkedIn 的 DataFu 开源 jar,它提供了一个 ENUMERATE UDF。

再一次,如果有人对改进/更好地组织代码有任何建议,我会全力以赴 :) 感谢您的输入 Cabad,它真的很有帮助!

脚本:

REGISTER '/tmp/udf/datafu-1.0.0.jar';
define Enumerate datafu.pig.bags.Enumerate('1');
set default_parallel 10;

LKP_DATA = LOAD '/tmp/lkp.dat' USING PigStorage('|') AS (categ_name:chararray, perc:float);
PRODUCT_DATA = LOAD '/tmp/meta.dat' USING PigStorage('|') AS (categ_name:chararray, product_titl:chararray, impression_cnt:long, click_through_cnt:long);

GRP_PROD_DATA = GROUP PRODUCT_DATA BY categ_name;

CATEG_COUNT = FOREACH GRP_PROD_DATA generate FLATTEN(group) AS categ_name, COUNT(PRODUCT_DATA) as cnt;

CATEG_JOINED = JOIN CATEG_COUNT BY categ_name, LKP_DATA BY categ_name USING 'replicated';

CATEG_PERCENT = FOREACH CATEG_JOINED GENERATE CATEG_COUNT::categ_name AS categ_name, CATEG_COUNT::cnt AS record_cnt, LKP_DATA::perc AS  percentage;

PRCNT_PROD_DATA = JOIN PRODUCT_DATA BY categ_name, CATEG_PERCENT BY categ_name;

PRCNT_PROD_DATA = FOREACH PRCNT_PROD_DATA GENERATE PRODUCT_DATA::categ_name AS categ_name, PRODUCT_DATA::product_titl AS product_titl, PRODUCT_DATA::impression_cnt AS impression_cnt, PRODUCT_DATA::click_through_cnt AS click_through_cnt,  CATEG_PERCENT::record_cnt*CATEG_PERCENT::percentage AS sample_size;

GRP_PRCNT_PROD_DATA = GROUP PRCNT_PROD_DATA BY categ_name;

ORDRD_PROD_LIST = FOREACH GRP_PRCNT_PROD_DATA 
                             SORTED_TOP_PROD = ORDER PRCNT_PROD_DATA BY impression_cnt DESC;
                             GENERATE flatten(SORTED_TOP_PROD);
                          

GRP_PROD_LIST = GROUP ORDRD_PROD_LIST BY categ_name;

GRP_PRCNT_PROD_DATA = FOREACH GRP_PROD_LIST GENERATE flatten(Enumerate(ORDRD_PROD_LIST)) AS (categ_name, product_titl, impression_cnt, click_through_cnt,  sample_size, rnk);

SAMPLED_DATA = FILTER GRP_PRCNT_PROD_DATA BY rnk <= sample_size;

SAMPLED_DATA = FOREACH SAMPLED_DATA GENERATE categ_name, product_titl, impression_cnt, click_through_cnt, rnk;

DUMP SAMPLED_DATA;

【讨论】:

快速评论:如果切换到 Pig 0.11,可以使用 RANK 运算符代替 Enumerate UDF。

以上是关于如何在 Pig 中动态获取组内的前 N% 记录的主要内容,如果未能解决你的问题,请参考以下文章

获取每组分组结果的前 n 条记录

选择组内的最高记录[重复]

如何在 Keycloak 中搜索组内的用户?

Apache PIG - 如何获取 Flop 10 数据记录?

如何在 SQL 中测试每个组内的列值序列(基于时间戳)?

如果 PIG 中的第二个字段具有不同的值,如何过滤/删除记录