如何优化 PIG latin 中的 group by 语句?
Posted
技术标签:
【中文标题】如何优化 PIG latin 中的 group by 语句?【英文标题】:How to optimize a group by statement in PIG latin? 【发布时间】:2012-05-24 06:45:48 【问题描述】:我有一个倾斜的数据集,我需要按操作进行分组,然后对其进行嵌套 foreach。由于数据倾斜,很少有 reducer 需要很长时间,而其他的则没有时间。我知道存在倾斜连接,但是 group by 和 foreach 有什么用?这是我的猪代码(重命名变量):
foo_grouped = GROUP foo_grouped by FOO;
FOO_stats = FOREACH foo_grouped
a_FOO_total = foo_grouped.ATTR;
a_FOO_total = DISTINCT a_FOO_total;
bar_count = foo_grouped.BAR;
bar_count = DISTINCT bar_count;
a_FOO_type1 = FILTER foo_grouped by COND1=='Y';
a_FOO_type1 = a_FOO_type1.ATTR;
a_FOO_type1 = DISTINCT a_FOO_type1;
a_FOO_type2 = FILTER foo_grouped by COND2=='Y' OR COND3=='HIGH';
a_FOO_type2 = a_FOO_type2.ATTR;
a_FOO_type2 = DISTINCT a_FOO_type2;
generate group as FOO,
COUNT(a_FOO_total) as a_FOO_total, COUNT(a_FOO_type1) as a_FOO_type1, COUNT(a_FOO_type2) as a_FOO_type2, COUNT(bar_count) as bar_count;
【问题讨论】:
【参考方案1】:在您的示例中,FOREACH 中有许多嵌套的 DISTINCT 运算符,它们在 reducer 中执行,它依赖 RAM 来计算唯一值,这个查询只产生一个作业。如果组中的唯一元素太多,您也可以获得与内存相关的异常。
幸运的是,PIG Latin 是一种数据流语言,您可以编写某种执行计划。为了利用更多的 CPU,您可以更改代码以强制执行更多可以并行执行的 MapReduce 作业。为此,我们应该在不使用嵌套 DISTINCT 的情况下重写查询,诀窍是执行不同的操作,而不是像只有一列一样分组,然后合并结果。它非常类似于 SQL,但它可以工作。这里是:
records = LOAD '....' USING PigStorage(',') AS (g, a, b, c, d, fd, s, w);
selected = FOREACH records GENERATE g, a, b, c, d;
grouped_a = FOREACH selected GENERATE g, a;
grouped_a = DISTINCT grouped_a;
grouped_a_count = GROUP grouped_a BY g;
grouped_a_count = FOREACH grouped_a_count GENERATE FLATTEN(group) as g, COUNT(grouped_a) as a_count;
grouped_b = FOREACH selected GENERATE g, b;
grouped_b = DISTINCT grouped_b;
grouped_b_count = GROUP grouped_b BY g;
grouped_b_count = FOREACH grouped_b_count GENERATE FLATTEN(group) as g, COUNT(grouped_b) as b_count;
grouped_c = FOREACH selected GENERATE g, c;
grouped_c = DISTINCT grouped_c;
grouped_c_count = GROUP grouped_c BY g;
grouped_c_count = FOREACH grouped_c_count GENERATE FLATTEN(group) as g, COUNT(grouped_c) as c_count;
grouped_d = FOREACH selected GENERATE g, d;
grouped_d = DISTINCT grouped_d;
grouped_d_count = GROUP grouped_d BY g;
grouped_d_count = FOREACH grouped_d_count GENERATE FLATTEN(group) as g, COUNT(grouped_d) as d_count;
mrg = JOIN grouped_a_count BY g, grouped_b_count BY g, grouped_c_count BY g, grouped_d_count BY g;
out = FOREACH mrg GENERATE grouped_a_count::g, grouped_a_count::a_count, grouped_b_count::b_count, grouped_c_count::c_count, grouped_d_count::d_count;
STORE out into '....' USING PigStorage(',');
执行后我得到以下摘要,表明不同的操作没有受到第一个作业处理的数据倾斜的影响:
Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MaxReduceTime MinReduceTime AvgReduceTime Alias Feature Outputs
job_201206061712_0244 669 45 75 8 13 376 18 202 grouped_a,grouped_b,grouped_c,grouped_d,records,selected DISTINCT,MULTI_QUERY
job_201206061712_0245 1 1 3 3 3 12 12 12 grouped_c_count GROUP_BY,COMBINER
job_201206061712_0246 1 1 3 3 3 12 12 12 grouped_b_count GROUP_BY,COMBINER
job_201206061712_0247 5 1 48 27 33 30 30 30 grouped_a_count GROUP_BY,COMBINER
job_201206061712_0248 1 1 3 3 3 12 12 12 grouped_d_count GROUP_BY,COMBINER
job_201206061712_0249 4 1 3 3 3 12 12 12 mrg,out HASH_JOIN ...,
Input(s):
Successfully read 52215768 records (44863559501 bytes) from: "...."
Output(s):
Successfully stored 9 records (181 bytes) in: "..."
从 Job DAG 我们可以看到 groupby 操作是并行执行的:
Job DAG:
job_201206061712_0244 -> job_201206061712_0248,job_201206061712_0246,job_201206061712_0247,job_201206061712_0245,
job_201206061712_0248 -> job_201206061712_0249,
job_201206061712_0246 -> job_201206061712_0249,
job_201206061712_0247 -> job_201206061712_0249,
job_201206061712_0245 -> job_201206061712_0249,
job_201206061712_0249
它在我的数据集上运行良好,其中一个组键值(在 g 列中)占数据的 95%。它还消除了与内存相关的异常。
【讨论】:
多么美妙的答案啊!!你现在在 nosql 做哪些项目?【参考方案2】:我最近遇到了这个加入的错误。如果组中有任何空值,那么整个关系将被删除。.
【讨论】:
以上是关于如何优化 PIG latin 中的 group by 语句?的主要内容,如果未能解决你的问题,请参考以下文章