在编写 Map/Reduce 作业以找到平均值时需要帮助
Posted
技术标签:
【中文标题】在编写 Map/Reduce 作业以找到平均值时需要帮助【英文标题】:Need help in writing Map/Reduce job to find average 【发布时间】:2013-08-05 15:55:07 【问题描述】:我对 Hadoop Map/Reduce 还很陌生。我正在尝试编写一个 Map/Reduce 作业来查找 n 个进程所花费的平均时间,给定如下输入文本文件:
ProcessName Time
process1 10
process2 20
processn 30
我浏览了一些教程,但仍然无法彻底理解。我的 mapper 和 reducer 类应该如何解决这个问题?我的输出将始终是文本文件,还是可以将平均值直接存储在某种变量中?
谢谢。
【问题讨论】:
Find the average of numbers using map-reduce的可能重复 【参考方案1】:您的映射器读取文本文件并在每一行应用以下映射函数
map: (key, value)
time = value[2]
emit("1", time)
所有 map 调用都会发出键“1”,该键将由单个 reduce 函数处理
reduce: (key, values)
result = sum(values) / n
emit("1", result)
由于您使用的是 Hadoop,您可能已经在 map 函数中看到了 StringTokenizer 的使用,您可以使用它来仅获取一行中的时间。您还可以想出一些方法来计算 n(进程数),例如,您可以在另一个只计算行数的作业中使用 Counter。
更新 如果您要执行此作业,则必须将每一行的元组发送到减速器,如果您在多台机器上运行 Hadoop 集群,则可能会阻塞网络。 更聪明的方法可以计算更接近输入的时间总和,例如通过指定一个组合器:
combine: (key, values)
emit(key, sum(values))
然后在同一台机器的所有地图功能的结果上执行此组合器,即,两者之间没有网络。 然后,reducer 将只获得与集群中的机器一样多的元组,而不是与日志文件中的行一样多。
【讨论】:
谢谢。还有一件事。有什么方法可以查明作业是否均匀分布到所有从节点上?我有一个包含 1 个主节点和 3 个从节点的集群。 你放入 HDFS 的文件被分成几个块,这些块被复制到你的集群上(参见:hadoop.apache.org/docs/stable/hdfs_design.html)。然后在每个块的每个集群上实例化映射器类。偶然地,两个映射器可以在不同集群上的同一块上工作,然后偶然地一个“获胜”,另一个被中止,中间结果被丢弃。如果你想分析你的工作,你必须查看你的工作日志。 jobtracker 的网络界面提供了一些统计数据。 这是一个例子,说明糟糕设计的 Hadoop 算法是多么无用。 @horcrux 请详细说明。 @contradictioned 您正在对化简器中的所有值求和,而不是在映射器中进行预求和,或者更好的是在组合器中求和。所以整个工作由一个工人完成。因此,您需要与可以由单台机器执行的愚蠢的 avg() 方法相同的计算能力。相反,您只是增加了网络开销。【参考方案2】:您的映射器将您的输入映射到您想要取平均值的值。因此,假设您的输入是一个文本文件,其格式类似于
ProcessName Time
process1 10
process2 20
.
.
.
然后您需要获取文件中的每一行,拆分它,获取第二列,然后将该列的值输出为IntWritable
(或其他一些Writable
数字类型)。由于您想取所有时间的平均值,而不是按进程名称或任何内容分组,因此您将拥有一个固定键。因此,您的映射器看起来像
private IntWritable one = new IntWritable(1);
private IntWritable output = new IntWritable();
proctected void map(LongWritable key, Text value, Context context)
String[] fields = value.split("\t");
output.set(Integer.parseInt(fields[1]));
context.write(one, output);
您的减速器采用这些值,并简单地计算平均值。这看起来像
IntWritable one = new IntWritable(1);
DoubleWritable average = new DoubleWritable();
protected void reduce(IntWritable key, Iterable<IntWrtiable> values, Context context)
int sum = 0;
int count = 0;
for(IntWritable value : values)
sum += value.get();
count++;
average.set(sum / (double) count);
context.Write(key, average);
我在这里做了很多假设,关于您的输入格式和其他什么,但它们是合理的假设,您应该能够根据您的确切需求进行调整。
我的输出会一直是文本文件,还是可以直接将平均值存储在某种变量中?
这里有几个选项。您可以对作业的输出进行后处理(写入单个文件),或者,由于您正在计算单个值,因此可以将结果存储在计数器中。
【讨论】:
谢谢杰森。还有一件事。我已经建立了一个带有 1 个作业跟踪器和 3 个其他任务跟踪器的 hadoop 集群。那么我是否需要在所有任务跟踪器中都有输入文件,或者如果我只有在作业跟踪器中就足够了?有没有什么办法可以查出作业是否均匀分布到所有从节点上? HDFS 和 Hadoop 将为您处理。您可以通过在 Hadoop 作业跟踪器节点上打开端口 50030(至少是 默认 端口)来监控作业,并从那里访问您的任务跟踪器。请注意,由于您的 map 作业只有一个输出键,因此您的 reduce 任务将仅在一个节点上运行。以上是关于在编写 Map/Reduce 作业以找到平均值时需要帮助的主要内容,如果未能解决你的问题,请参考以下文章