Pig UDF 似乎总是在单个减速器中运行 - PARALLEL 不起作用
Posted
技术标签:
【中文标题】Pig UDF 似乎总是在单个减速器中运行 - PARALLEL 不起作用【英文标题】:Pig UDF seems to always run in a single reducer - PARALLEL not working 【发布时间】:2015-05-27 22:05:27 【问题描述】:我有一个带有 Python UDF 的 Pig 脚本,它应该生成用户级功能。我的数据由 Pig 预处理,然后作为元组列表发送到 UDF。 UDF 将处理数据元组并为每个用户返回一个带有我的功能计算机的字符数组。发生这种情况的代码如下所示:
-- ... loading data above
data = FOREACH data_raw GENERATE user_id, ...; -- some other metrics as well
-- Group by ids
grouped_ids = GROUP data BY user_id PARALLEL 20;
-- Limit the ids to process
userids = LIMIT grouped_ids (long)'$limit';
-- Generate features
user_features = FOREACH userids
GENERATE group as user_id:chararray,
udfs.extract_features(data) as features:chararray;
UDF 代码显然是在 reducer 中运行的,并且由于某种原因,它总是跑到一个 reducer 中,这需要相当长的时间。我正在寻找一种方法来并行执行它,因为现在我的工作总共需要 22 分钟,其中 18 分钟在这个单一的减速器中。
Pig 通常会尝试将 1GB 的数据分配给 reducer,而我的数据确实不到 1GB,大约 300-700MB,但在 UDF 端相当耗时,所以这显然不是最优的,而我的其余部分集群是空的。
我尝试过的事情:
设置default parallel
会影响整个脚本脚本,但仍无法使具有 UDF 的 reducer 并行化
在GROUP data BY user_id
上手动设置parallel
会并行化组的输出并调用多个reducer,但在UDF 启动时,它又是一个reducer
设置pig.exec.reducers.bytes.per.reducer
允许您为每个reducer 设置例如最多10MB 的数据,它显然适用于我脚本的其他部分(并且破坏了并行性,因为这也会影响我管道开始时的数据准备- 正如预期的那样)但同样不允许多个减速器与此 UDF 一起运行。
据我了解发生了什么,我不明白为什么 - 如果 shuffle 阶段可以将 user_id 散列到一个或多个减速器 - 为什么这个脚本不能产生多个减速器,在那里实例化 UDF并根据 user_id 将对应的数据散列到正确的 reducer。我的数据或任何东西都没有明显的偏差。
我显然在这里遗漏了一些东西,但看不到什么。有没有人有任何解释和/或建议?
编辑:我更新了代码,因为缺少一些重要的东西:我在GROUP BY
和FOREACH
之间运行LIMIT
。我还清理了不相关的信息。为了便于阅读,我还将内联代码扩展为单独的行。
【问题讨论】:
【参考方案1】:您的问题是您将整个 data
关系作为输入参数传递给您的 UDF,因此您的 UDF 只使用整个数据调用一次,因此它仅在一个减速器中运行。我猜你想为每组 user_id
调用一次,所以尝试使用嵌套的 foreach:
data_grouped = GROUP data BY user_id;
user_features = FOREACH data_grouped
GENERATE group AS user_id: chararray,
udfs.extract_features(data) AS features: chararray;
通过这种方式,您可以强制 UDF 在与 group by
中使用的减速器一样多的减速器中运行。
【讨论】:
嗨 Balduz,感谢您的建议。这与我最初的帖子有多少相似之处,它似乎有效。我的代码为每个和组都有一个简写嵌套:user_features = FOREACH (GROUP data BY user_id) GENERATE ...
。我在原始帖子中可能遗漏了一件事:我在 GROUP BY
和 FOREACH
之间有一个 LIMIT
,这可能消除了在多个减速器中运行 UDF 的可能性。
Balduz,我编辑了帖子以反映我拥有的实际代码。你的帖子让我找到了解决方案,但你的答案实际上只是扩展了我拥有的代码的内联版本,所以我不确定我是否应该投票作为问题的答案呢?这是我在 *** 上的第一次问答,我想把它做对;-) 我想我应该自己发布一个答案,说明限制运算符是失去并行性的原因?
@Oliver 好吧,这完全取决于你。你用一些代码问了一个问题,这回答了这个问题。然后你编辑了代码并完全改变了它(一件事是普通的foreach,完全不同的是嵌套的foreach),所以它没有回答你的问题。在编辑之前,这是否回答了您的原始问题?如果你这么认为并阅读了规则,你应该接受它(而不是赞成它),如果不是,那就不要:)【参考方案2】:
在group by
和foreach
之间的代码中使用LIMIT
运算符消除了在多个reducer 中运行我的代码的可能性,即使我明确设置了并行度。
-- ... loading data above
data = FOREACH data_raw GENERATE user_id, ...; -- some other metrics as well
-- Group by ids
grouped_ids = GROUP data BY user_id PARALLEL 20;
-- Limit the ids to process
>>> userids = LIMIT grouped_ids (long)'$limit'; <<<
-- Generate features
user_features = FOREACH userids
GENERATE group as user_id:chararray,
udfs.extract_features(data) as features:chararray;
一旦LIMIT
被进一步放置在代码中,我设法获得预定义数量的reducer 来运行我的UDF:
-- ... loading data above
data = FOREACH data_raw GENERATE user_id, ...; -- some other metrics as well
-- Group by ids
grouped_ids = GROUP data BY user_id PARALLEL 20;
-- Generate features
user_features = FOREACH grouped_ids
GENERATE group as user_id:chararray,
udfs.extract_features(data) as features:chararray;
-- Limit the features
user_features_limited = LIMIT user_features (long)'$limit';
-- ... process further and persist
所以我试图优化/减少 user_ids 流入的努力对于增加并行性适得其反。
【讨论】:
以上是关于Pig UDF 似乎总是在单个减速器中运行 - PARALLEL 不起作用的主要内容,如果未能解决你的问题,请参考以下文章