在编写 Map/Reduce 作业以找到平均值时需要帮助

Posted

技术标签:

【中文标题】在编写 Map/Reduce 作业以找到平均值时需要帮助【英文标题】:Need help in writing Map/Reduce job to find average 【发布时间】:2013-08-05 15:55:07 【问题描述】:

我对 Hadoop Map/Reduce 还很陌生。我正在尝试编写一个 Map/Reduce 作业来查找 n 个进程所花费的平均时间,给定如下输入文本文件:

ProcessName Time
process1    10
process2    20
processn    30

我浏览了一些教程,但仍然无法彻底理解。我的 mapper 和 reducer 类应该如何解决这个问题?我的输出将始终是文本文件,还是可以将平均值直接存储在某种变量中?

谢谢。

【问题讨论】:

Find the average of numbers using map-reduce的可能重复 【参考方案1】:

您的映射器读取文本文件并在每一行应用以下映射函数

map: (key, value)
  time = value[2]
  emit("1", time)

所有 map 调用都会发出键“1”,该键将由单个 reduce 函数处理

reduce: (key, values)
  result = sum(values) / n
  emit("1", result)

由于您使用的是 Hadoop,您可能已经在 map 函数中看到了 StringTokenizer 的使用,您可以使用它来仅获取一行中的时间。您还可以想出一些方法来计算 n(进程数),例如,您可以在另一个只计算行数的作业中使用 Counter。

更新 如果您要执行此作业,则必须将每一行的元组发送到减速器,如果您在多台机器上运行 Hadoop 集群,则可能会阻塞网络。 更聪明的方法可以计算更接近输入的时间总和,例如通过指定一个组合器:

combine: (key, values)
  emit(key, sum(values))

然后在同一台机器的所有地图功能的结果上执行此组合器,即,两者之间没有网络。 然后,reducer 将只获得与集群中的机器一样多的元组,而不是与日志文件中的行一样多。

【讨论】:

谢谢。还有一件事。有什么方法可以查明作业是否均匀分布到所有从节点上?我有一个包含 1 个主节点和 3 个从节点的集群。 你放入 HDFS 的文件被分成几个块,这些块被复制到你的集群上(参见:hadoop.apache.org/docs/stable/hdfs_design.html)。然后在每个块的每个集群上实例化映射器类。偶然地,两个映射器可以在不同集群上的同一块上工作,然后偶然地一个“获胜”,另一个被中止,中间结果被丢弃。如果你想分析你的工作,你必须查看你的工作日志。 jobtracker 的网络界面提供了一些统计数据。 这是一个例子,说明糟糕设计的 Hadoop 算法是多么无用。 @horcrux 请详细说明。 @contradictioned 您正在对化简器中的所有值求和,而不是在映射器中进行预求和,或者更好的是在组合器中求和。所以整个工作由一个工人完成。因此,您需要与可以由单台机器执行的愚蠢的 avg() 方法相同的计算能力。相反,您只是增加了网络开销。【参考方案2】:

您的映射器将您的输入映射到您想要取平均值的值。因此,假设您的输入是一个文本文件,其格式类似于

ProcessName Time
process1    10
process2    20
.
.
.

然后您需要获取文件中的每一行,拆分它,获取第二列,然后将该列的值输出为IntWritable(或其他一些Writable 数字类型)。由于您想取所有时间的平均值,而不是按进程名称或任何内容分组,因此您将拥有一个固定键。因此,您的映射器看起来像

private IntWritable one = new IntWritable(1);
private IntWritable output = new IntWritable();
proctected void map(LongWritable key, Text value, Context context) 
    String[] fields = value.split("\t");
    output.set(Integer.parseInt(fields[1]));
    context.write(one, output);

您的减速器采用这些值,并简单地计算平均值。这看起来像

IntWritable one = new IntWritable(1);
DoubleWritable average = new DoubleWritable();
protected void reduce(IntWritable key, Iterable<IntWrtiable> values, Context context) 
    int sum = 0;
    int count = 0;
    for(IntWritable value : values) 
        sum += value.get();
        count++;
    
    average.set(sum / (double) count);
    context.Write(key, average);

我在这里做了很多假设,关于您的输入格式和其他什么,但它们是合理的假设,您应该能够根据您的确切需求进行调整。

我的输出会一直是文本文件,还是可以直接将平均值存储在某种变量中?

这里有几个选项。您可以对作业的输出进行后处理(写入单个文件),或者,由于您正在计算单个值,因此可以将结果存储在计数器中。

【讨论】:

谢谢杰森。还有一件事。我已经建立了一个带有 1 个作业跟踪器和 3 个其他任务跟踪器的 hadoop 集群。那么我是否需要在所有任务跟踪器中都有输入文件,或者如果我只有在作业跟踪器中就足够了?有没有什么办法可以查出作业是否均匀分布到所有从节点上? HDFS 和 Hadoop 将为您处理。您可以通过在 Hadoop 作业跟踪器节点上打开端口 50030(至少是 默认 端口)来监控作业,并从那里访问您的任务跟踪器。请注意,由于您的 map 作业只有一个输出键,因此您的 reduce 任务将仅在一个节点上运行。

以上是关于在编写 Map/Reduce 作业以找到平均值时需要帮助的主要内容,如果未能解决你的问题,请参考以下文章

Apache Crunch 管道如何生成 map reduce 作业?

简单的 Java Map/Reduce 框架 [关闭]

Hadoop Map/Reduce

在Hadoop中链接多个MapReduce作业

您如何在 map/reduce 中实现排名和排序?

第九篇:Map/Reduce 工作机制分析 - 作业的执行流程