什么是 Map/Reduce?

Posted

技术标签:

【中文标题】什么是 Map/Reduce?【英文标题】:What is Map/Reduce? 【发布时间】:2010-09-28 03:34:05 【问题描述】:

我听到了很多关于 map/reduce 的信息,尤其是在 Google 的大规模并行计算系统的背景下。究竟是什么?

【问题讨论】:

MapReduce Explained。它比我能解释得更好。有帮助吗? 当然,我可以并且做到了谷歌这个;但是 (a) SO 旨在发展为对所有重要问题都有答案(我们甚至鼓励发布我们已经有答案的问题)并且 (b) 我希望这个社区能够接受它。 【参考方案1】:

来自 Google 的 MapReduce 研究发布页面的摘要:

MapReduce 是一种编程模型,并且 相关的实现 处理和生成大数据 套。用户指定地图功能 处理键/值对以 生成一组中间 键/值对和 reduce 函数 合并所有中间值 与同一中间体相关联 键。

MapReduce 的优点是可以在多个处理节点(多个服务器)上并行执行处理,因此它是一个可以很好地扩展的系统。

由于它基于functional programming 模型,因此mapreduce 步骤都没有任何副作用(map 过程的每个子部分的状态和结果不依赖于另一个) ,因此被映射和减少的数据集可以分别在多个处理节点上。

Joel 的 Can Your Programming Language Do This? 文章讨论了理解函数式编程对于 Google 提出 MapReduce(为其搜索引擎提供支持)的重要性。如果您不熟悉函数式编程以及它如何允许可扩展代码,那么这是一本非常好的读物。

另请参阅:Wikipedia: MapReduce

相关问题:Please explain mapreduce simply

【讨论】:

解释得很好。对于 Software Monkey,一旦您理解了 M/R,几乎可以在任何东西上轻松实现 M/R,而且不仅限于此处给出的示例。有几种方法可以让您了解它,一种是将其视为收集器和漏斗。【参考方案2】:

Map 是一个函数,它将另一个函数应用于列表中的所有项目,以生成另一个列表,其中包含所有返回值。 (“将 f 应用于 x”的另一种说法是“调用 f,将其传递给 x”。所以有时说“应用”而不是“调用”听起来更好。)

这可能是用 C# 编写 map 的方式(它被称为 Select 并且在标准库中):

public static IEnumerable<R> Select<T, R>(this IEnumerable<T> list, Func<T, R> func)

    foreach (T item in list)
        yield return func(item);

由于您是 Java 小伙子,而 Joel Spolsky 喜欢向 GROSSLY UNFAIR LIES 讲述 Java 是多么糟糕(实际上,他没有撒谎,这很糟糕,但我想赢得您的支持),这是我的Java版本的粗略尝试(我没有Java编译器,我隐约记得Java版本1.1!):

// represents a function that takes one arg and returns a result
public interface IFunctor

    object invoke(object arg);


public static object[] map(object[] list, IFunctor func)

    object[] returnValues = new object[list.length];

    for (int n = 0; n < list.length; n++)
        returnValues[n] = func.invoke(list[n]);

    return returnValues;

我相信这可以通过一百万种方式进行改进。但这是基本思想。

Reduce 是一个将列表中的所有项目转换为单个值的函数。为此,需要为其提供另一个函数func,它将两个项目转换为一个值。它可以通过将前两项提供给func 来工作。然后是第三项的结果。然后是第四个项目的结果,依此类推,直到所有项目都消失了,我们只剩下一个值。

在 C# 中,reduce 被称为 Aggregate,并且再次出现在标准库中。我将直接跳到 Java 版本:

// represents a function that takes two args and returns a result
public interface IBinaryFunctor

    object invoke(object arg1, object arg2);


public static object reduce(object[] list, IBinaryFunctor func)

    if (list.length == 0)
        return null; // or throw something?

    if (list.length == 1)
        return list[0]; // just return the only item

    object returnValue = func.invoke(list[0], list[1]);

    for (int n = 1; n < list.length; n++)
        returnValue = func.invoke(returnValue, list[n]);

    return returnValue;

这些 Java 版本需要添加泛型,但我不知道如何在 Java 中做到这一点。但是您应该能够将匿名内部类传递给它们以提供函子:

string[] names = getLotsOfNames();

string commaSeparatedNames = (string)reduce(names, 
   new IBinaryFunctor 
       public object invoke(object arg1, object arg2)
            return ((string)arg1) + ", " + ((string)arg2); 
   

希望泛型能够摆脱强制转换。 C# 中的类型安全等效项是:

string commaSeparatedNames = names.Aggregate((a, b) => a + ", " + b);

为什么这很“酷”?将较大的计算分解为较小的部分的简单方法,以便它们可以以不同的方式重新组合在一起,总是很酷。 Google 将这一想法应用于并行化,因为 map 和 reduce 都可以在多台计算机上共享。

但关键要求不是您的语言可以将函数视为值。任何 OO 语言都可以做到这一点。并行化的实际要求是传递给 map 和 reduce 的小 func 函数不得使用或更新任何状态。它们必须返回一个仅依赖于传递给它们的参数的值。否则,当您尝试并行运行整个事情时,结果将完全搞砸。

【讨论】:

总体不错,值得+1;虽然不喜欢 Java 的刺拳——但自从从 C 迁移到 Java 后,我就错过了函数值,并且同意它们在 Java 中的可用性早就应该使用了。 对 Java 来说并不是一个严重的挑战——它有三个左右的缺陷足以让我现在更喜欢 C#,但 C# 也有一系列缺陷可能会让我更喜欢另一种语言总有一天。 顺便说一句,如果有人可以编辑示例以便他们使用 Java 泛型,我会很高兴,如果这实际上可能的话。或者,如果您无法编辑,请在此处发布 sn-ps,我将进行编辑。 我开始编辑,但是map()方法创建了一个返回类型的数组; Java 不允许创建泛型类型的数组。我本可以将其更改为使用列表(并可能将其转换为数组),但我当时就没有野心了。 类似于 (a, b) => a + ", " + b 的闭包语法是我在 Java 7 中真正期待的东西,尤其是一些看起来像这样的新 API它会进去的。这种语法会让这样的东西更干净;太糟糕了,它看起来不会发生。【参考方案3】:

在对很长的华夫饼或很短的模糊博客文章感到最沮丧之后,我最终发现了这个very good rigorous concise paper。

然后我继续并通过翻译成 Scala 使其更简洁,我在其中提供了最简单的情况,用户只需指定应用程序的 mapreduce 部分。在 Hadoop/Spark 中,严格来说,采用了更复杂的编程模型,需要用户明确指定此处列出的另外 4 个函数:http://en.wikipedia.org/wiki/MapReduce#Dataflow

import scalaz.syntax.id._

trait MapReduceModel 
  type MultiSet[T] = Iterable[T]

  // `map` must be a pure function
  def mapPhase[K1, K2, V1, V2](map: ((K1, V1)) => MultiSet[(K2, V2)])
                              (data: MultiSet[(K1, V1)]): MultiSet[(K2, V2)] = 
    data.flatMap(map)

  def shufflePhase[K2, V2](mappedData: MultiSet[(K2, V2)]): Map[K2, MultiSet[V2]] =
    mappedData.groupBy(_._1).mapValues(_.map(_._2))

  // `reduce` must be a monoid
  def reducePhase[K2, V2, V3](reduce: ((K2, MultiSet[V2])) => MultiSet[(K2, V3)])
                             (shuffledData: Map[K2, MultiSet[V2]]): MultiSet[V3] =
    shuffledData.flatMap(reduce).map(_._2)

  def mapReduce[K1, K2, V1, V2, V3](data: MultiSet[(K1, V1)])
                                   (map: ((K1, V1)) => MultiSet[(K2, V2)])
                                   (reduce: ((K2, MultiSet[V2])) => MultiSet[(K2, V3)]): MultiSet[V3] =
    mapPhase(map)(data) |> shufflePhase |> reducePhase(reduce)


// Kinda how MapReduce works in Hadoop and Spark except `.par` would ensure 1 element gets a process/thread on a cluster
// Furthermore, the splitting here won't enforce any kind of balance and is quite unnecessary anyway as one would expect
// it to already be splitted on HDFS - i.e. the filename would constitute K1
// The shuffle phase will also be parallelized, and use the same partition as the map phase.  
abstract class ParMapReduce(mapParNum: Int, reduceParNum: Int) extends MapReduceModel 
  def split[T](splitNum: Int)(data: MultiSet[T]): Set[MultiSet[T]]

  override def mapPhase[K1, K2, V1, V2](map: ((K1, V1)) => MultiSet[(K2, V2)])
                                       (data: MultiSet[(K1, V1)]): MultiSet[(K2, V2)] = 
    val groupedByKey = data.groupBy(_._1).map(_._2)
    groupedByKey.flatMap(split(mapParNum / groupedByKey.size + 1))
    .par.flatMap(_.map(map)).flatten.toList
  

  override def reducePhase[K2, V2, V3](reduce: ((K2, MultiSet[V2])) => MultiSet[(K2, V3)])
                             (shuffledData: Map[K2, MultiSet[V2]]): MultiSet[V3] =
    shuffledData.map(g => split(reduceParNum / shuffledData.size + 1)(g._2).map((g._1, _)))
    .par.flatMap(_.map(reduce))
    .flatten.map(_._2).toList

【讨论】:

【参考方案4】:

Map 是一种可以应用于数组的原生 JS 方法。由于某些函数映射到原始数组中的每个元素,它会创建一个新数组。因此,如果您映射了一个函数(元素) return element * 2;,它将返回一个新数组,其中每个元素都加倍。原始数组不会被修改。

https://developer.mozilla.org/en-US/docs/Web/javascript/Reference/Global_Objects/Array/map

Reduce 是一种原生 JS 方法,也可以应用于数组。它将函数应用于数组并具有称为累加器的初始输出值。它遍历数组中的每个元素,应用一个函数,并将它们减少为一个值(从累加器开始)。它很有用,因为你可以有任何你想要的输出,你只需要从那种类型的累加器开始。因此,如果我想将某些东西简化为一个对象,我会从一个累加器 开始。

https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/Reduce?v=a

【讨论】:

【参考方案5】:

如果是大数据:

1.MapReduce is a Hadoop framework used for writing applications
2.MapReduce provides analytical capabilities for analyzing huge volumes of 
   complex data
3.facilitates concurrent processing by splitting petabytes of data into smaller 
   chunks, and processing them in parallel on Hadoop commodity servers
4.used for querying and selecting data in the Hadoop Distributed File System
5.MapReduce programming enables companies to access new sources of data

【讨论】:

这个问题有很多更好的答案。这个没有添加相关信息。

以上是关于什么是 Map/Reduce?的主要内容,如果未能解决你的问题,请参考以下文章

Mongodb聚合框架比map/reduce更快吗?

Fork/Join 和 Map/Reduce 的区别

用通俗易懂的大白话讲解Map/Reduce原理

第九篇:Map/Reduce 工作机制分析 - 作业的执行流程

python 之 map/reduce

Python---map/reduce