在地图减少中计算中位数
Posted
技术标签:
【中文标题】在地图减少中计算中位数【英文标题】:Computing median in map reduce 【发布时间】:2012-04-23 23:15:29 【问题描述】:有人可以举例说明 map reduce 中中位数/分位数的计算吗?
我对 Datafu 的中位数的理解是,'n' 个映射器对 数据并将数据发送到负责排序的“1”reducer 来自 n 个映射器的所有数据并找到中值(中间值) 我的理解正确吗?,
如果是这样,这种方法是否适用于 大量数据,我可以清楚地看到一个减速器 努力完成最后的任务。 谢谢
【问题讨论】:
【参考方案1】:试图找到一个系列中的中位数(中间数字)将需要 1 个 reducer 传递整个数字范围以确定哪个是“中间”值。
根据输入集中值的范围和唯一性,您可以引入组合器来输出每个值的频率 - 减少发送到单个减速器的地图输出数量。然后,您的 reducer 可以使用排序值/频率对来识别中位数。
另一种可以扩展的方法(如果您知道值的范围和粗略分布)是使用自定义分区器,该分区器按范围桶分配键(0-99 转到减速器 0,100-199 到减速器 2 , 等等)。然而,这将需要一些辅助工作来检查 reducer 输出并执行最终的中值计算(例如,知道每个 reducer 中的键数,您可以计算哪个 reducer 输出将包含中值以及偏移量)
【讨论】:
【参考方案2】:O((n log n)/p) 排序,然后 O(1) 得到中位数。
是的...您可以获得 O(n/p),但您不能在 Hadoop 中使用开箱即用的排序功能。除非你能证明 2-20 小时的开发时间来编写并行的第 k 大算法,否则我只会排序并获得中心项目。
【讨论】:
【参考方案3】:您真的需要精确中位数和分位数吗?
很多时候,您最好只获得近似值并使用它们,特别是如果您将其用于例如数据分区。
其实你可以使用近似分位数来加快寻找准确分位数的速度(其实是在O(n/p)
时间),下面是大致的策略大纲:
-
每个分区都有一个映射器计算所需的分位数,并将它们输出到新的数据集。该数据集应该小几个数量级(除非您要求的分位数太多!)
在此数据集中,再次计算分位数,类似于“中位数的中位数”。这些是您的初步估计。
根据这些分位数对数据进行重新分区(甚至以这种方式获得的附加分区)。目标是最终保证真正的分位数在一个分区中,并且每个分区最多应该有一个期望的分位数
在每个分区中,执行 QuickSelect(在
O(n)
中)以找到真正的分位数。
每个步骤都是线性时间。成本最高的步骤是第 3 部分,因为它需要重新分配整个数据集,因此会生成O(n)
网络流量。
您可以通过为第一次迭代选择“备用”分位数来优化该过程。说,你想找到全球中位数。您无法在线性过程中轻松找到它,但是当将其拆分为 k 个分区时,您可能将其缩小到数据集的 1/kth。因此,不是让每个节点报告其中值,而是让每个节点另外报告 (k-1)/(2k) 和 (k+1)/(2k) 处的对象。这应该允许您缩小真正的中位数必须显着存在的值的范围。因此在下一步中,您可以将每个节点在所需范围内的那些对象发送到单个主节点,并仅选择该范围内的中位数。
【讨论】:
在这种方法中找到精确的分位数可能会非常昂贵,但比天真的方法更好。步骤 1 到 4 实际上有助于将集合分成两半并在更小的空间内解决相同的问题。但在这种方法中,可能需要第 1 步到第 4 步的 logn 次迭代才能真正获得分位数。【参考方案4】:在许多实际场景中,数据集中值的基数相对较小。在这种情况下,可以通过两个 MapReduce 作业有效地解决问题:
-
计算数据集中值的频率(基本上是字数统计工作)
身份映射器 + 一个基于 对计算中值的化简器
Job 1. 将大大减少数据量,并且可以完全并行执行。工作 2. 的 Reducer 只需要处理 n
(n
= cardinality of your value set
) 项目,而不是所有值,就像天真的方法一样。
下面是作业2的reducer示例。它是可以直接在Hadoop流中使用的python脚本。假设您的数据集中的值为ints
,但可以轻松地为double
s 采用
import sys
item_to_index_range = []
total_count = 0
# Store in memory a mapping of a value to the range of indexes it has in a sorted list of all values
for line in sys.stdin:
item, count = line.strip().split("\t", 1)
new_total_count = total_count + int(count)
item_to_index_range.append((item, (total_count + 1, new_total_count + 1)))
total_count = new_total_count
# Calculate index(es) of middle items
middle_items_indexes = [(total_count / 2) + 1]
if total_count % 2 == 0:
middle_items_indexes += [total_count / 2]
# Retrieve middle item(s)
middle_items = []
for i in middle_items_indexes:
for item, index_range in item_to_index_range:
if i in range(*index_range):
middle_items.append(item)
continue
print sum(middle_items) / float(len(middle_items))
这个答案建立在最初来自answer 的Chris White 的建议之上。答案建议使用组合器作为计算值频率的平均值。但是,在 MapReduce 中,并不能保证始终执行组合器。这有一些副作用:
reducer 首先必须计算最终的【讨论】:
以上是关于在地图减少中计算中位数的主要内容,如果未能解决你的问题,请参考以下文章