在 mapreduce 中操作迭代器

Posted

技术标签:

【中文标题】在 mapreduce 中操作迭代器【英文标题】:manipulating iterator in mapreduce 【发布时间】:2010-08-14 03:44:42 【问题描述】:

我正在尝试使用 hadoop 查找任何给定点的总和,我遇到的问题是从单个减速器中的给定键获取所有值。看起来像这样。

减速机:

 public static class Reduce extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, DoubleWritable> 

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, DoubleWritable> output, Reporter reporter)
            throws IOException 
        Text word = new Text();

        Iterator<IntWritable> tr = values;
        IntWritable v;
        while (tr.hasNext()) 
             v = tr.next();

            Iterator<IntWritable> td = values;
            while (td.hasNext()) 

                IntWritable u = td.next();
                double sum = u+v;
                word.set( u + " + " + v);
                output.collect(word, new DoubleWritable(sum));
            
        
    

我正在尝试创建 Iterator 变量的两个副本,以便我可以遍历第二个迭代器的所有值,同时从前一个 Iterator 获得单个值(上面的两个 while 循环)但是两个迭代器持有始终相同的值。

我不确定这是否是正确的做法。

【问题讨论】:

我也在试图找出类似的问题。我需要在 reduce 函数中检查记录两次。我正在使用带有 python 的 hadoop 流,并且不知道如何为 reducer 中的记录倒带迭代器。 根据定义,迭代器只向一个方向移动。所以一旦你做了 .next() 你推进了它,你持有的迭代器的任何其他实例现在也将指向下一个值。这不是 Hadoop 特有的。当您说“尝试创建 Iterator 变量的两个副本”时,您实际上并没有创建任何副本,正如@casper 所说,它们都是相同的实例。也就是说,map-reduce 可能不适合解决这个问题,一种方法是在某处编写 reducer 输出后,在 M/R 之外运行嵌套的 while 循环。 【参考方案1】:

reducer 中的迭代器并不像你想象的那么简单。

问题是您正在迭代的项目总数可能不适合内存。这意味着迭代器可能正在从磁盘读取。如果您有两个独立的迭代器副本,那么您可以让其中一个远远领先于另一个,这意味着不能删除两个迭代器指向的位置之间的数据。

为简化实施,Hadoop 不支持为 reduce 值使用多个迭代器。

这样做的实际影响是您不能两次遍历同一个迭代器。这不好,但事实就是如此。如果您绝对知道项目的数量将适合内存,那么您可以按照 MrGomez 的建议将所有项目复制到一个列表中。如果您不知道这一点,您可能必须使用辅助存储。

更好的方法是重新设计您的程序,这样您就不需要在 reducer 中进行无限存储。这可能有点棘手,但有一些标准方法可以解决这个问题。

对于您的特定问题,您的输出大小相对于最大的减少输入集呈二次增长。这通常是一个非常糟糕的主意。在大多数情况下,您不需要所有对,只需要最重要的对。如果你能以某种方式修剪这组对,那么你就准备好了,你也许可以移除所有对的约束。

例如,如果您尝试为每个归约集找到总和最大的 100 对,则可以保留一个包含迄今为止看到的 100 个最大输入的优先级队列和一个包含迄今为止看到的 100 个最大总和的优先级队列.对于每个新输入,您可以用迄今为止看到的最大 100 个数字形成总和,并尝试将这些总和放入第二个队列。最后,您应该将新输入粘贴到第一个队列中,并通过删除最小值(如有必要)将两个队列修剪为 100 个元素。在 reduce 的 close 方法中,您应该转储优先级队列。这种方法保证您只需要 min(n^2, 200) 个存储元素,从而避免了 n^2 问题并通过保持可见的 100 个最大的项目而不是所有可见的项目来避免双重通过输入。

【讨论】:

【参考方案2】:

我不确定您到底想要完成什么,但我知道很多:Hadoop 迭代器的行为有点奇怪。调用 Iterator.next() 将始终返回 IntWritable 的 SAME EXACT 实例,并将该实例的内容替换为下一个值。因此,在对 Iterator.next() 的调用中保持对 IntWritable 的引用几乎总是一个错误。我相信这种行为是为了减少对象创建量和 GC 开销而设计的。

解决此问题的一种方法是使用 WritableUtils.clone() 克隆您尝试在调用 Iterator.next() 时保留的实例。

【讨论】:

是的,这只是今天发生在我身上。 Hadoop 迭代器万岁!这种行为是否正式记录在任何地方(博客和此处除外)?【参考方案3】:

要复制迭代器,您不能将迭代器分配给新变量。您应该将迭代器“克隆”到迭代器类的新变量。 当迭代器A分配另一个迭代器变量B时,迭代器的两个变量指向相同的数据。

【讨论】:

【参考方案4】:

经过your previous question,,您似乎被困在the iterator problem piccolbo described. 您的reducer 公式还表明您已经放弃了他为幼稚方法提出的算法......尽管不是最理想的,但它会起作用。

请允许我用我的回答稍微清理一下你的代码:

// Making use of Hadoop's Iterable reduce, assuming it's available to you
//
//  The method signature is:
//
//  protected void reduce(KEYIN key, java.lang.Iterable<VALUEIN> values, 
//   org.apache.hadoop.mapreduce.Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>.Context 
//   context) throws java.io.IOException, java.lang.InterruptedException
//
public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException 

    // I assume you declare this here to save on GC
    Text outKey = new Text();
    IntWritable outVal = new IntWritable();

    // Since you've forgone piccolbo's approach, you'll need to maintain the
    // data structure yourself. Since we always walk the list forward and
    // wish to optimize the insertion speed, we use LinkedList. Calls to
    // IntWritable.get() will give us an int, which we then copy into our list.
    LinkedList<Integer> valueList = new LinkedList<Integer>();

    // Here's why we changed the method signature: use of Java's for-each
    for (IntWritable iw: values) 
        valueList.add(iw.get());
    

    // And from here, we construct each value pair as an O(n^2) operation
    for (Integer i: valueList) 
        for (Integer j: valueList) 
            outKey.set(i + " + " + j);
            outVal.set(i + j);
            context.write(outKey, outVal);
        
    

    // Do note: I've also changed your return value from DoubleWritable to
    // IntWritable, since you should always be performing integer operations
    // as defined. If your points are Double, supply DoubleWritable instead.

这行得通,但它做了几个假设,在构建距离矩阵时会限制性能,包括要求在单个 reduce 操作中执行组合。

如果您事先知道输入数据集的大小和维度,请考虑 piccolbo's approach。在最坏的情况下,这应该可以通过在线性时间内遍历输入行来实现。

(请参阅this thread,了解为什么我们不能将其实现为前向迭代器。)

【讨论】:

以上是关于在 mapreduce 中操作迭代器的主要内容,如果未能解决你的问题,请参考以下文章

解决 hive maPredue转换hivesql出错Error: Could not find or load main class org.apache.hadoop.mapreduce.v2.a

解决 hive maPredue转换hivesql出错Error: Could not find or load main class org.apache.hadoop.mapreduce.v2.a

为什么spark比mapreduce处理数据快

为什么要用MapReduce

MapReduce之单词计数

深入理解MapReduce的架构及原理