`map` 和 `reduce` 方法在 Spark RDD 中如何工作?

Posted

技术标签:

【中文标题】`map` 和 `reduce` 方法在 Spark RDD 中如何工作?【英文标题】:How do `map` and `reduce` methods work in Spark RDDs? 【发布时间】:2015-12-07 18:07:45 【问题描述】:

以下代码来自 Apache Spark 的快速入门指南。 有人可以解释一下什么是“line”变量以及它来自哪里?

textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

另外,如何将值传递给 a,b?

链接到 QSG http://spark.apache.org/docs/latest/quick-start.html

【问题讨论】:

正如@Tyth 所说“也许你应该先阅读一些 scala 集合介绍。”我并不是说这是不屑一顾,而是说如果您尝试使用 Spark 做任何不平凡的事情,而至少对您使用的语言有基本的了解,那么您将陷入痛苦的世界。 【参考方案1】:

ma​​p 函数所做的是,它获取参数列表并将其映射到某个函数。类似于 python 中的 map 函数,如果你熟悉的话。

此外,File 就像一个字符串列表。 (不完全是,但这就是它的迭代方式)

让我们认为这是您的文件。

val list_a: List[String] = List("first line", "second line", "last line")

现在让我们看看 ma​​p 函数是如何工作的。

我们需要两个东西,我们已经拥有的list of values 和我们想要将此值映射到的function。让我们考虑一个非常简单的函数来理解。

val myprint = (arg:String)=>println(arg)

这个函数只接受一个字符串参数并在控制台上打印。

myprint("hello world")
hello world

如果我们将此函数与您的列表相匹配,它将打印所有行

list_a.map(myprint)

我们也可以编写一个如下所述的匿名函数,它做同样的事情。

list_a.map(arg=>println(arg))

在您的情况下,line 是文件的第一行。您可以根据需要更改参数名称。例如,在上面的示例中,如果我将 arg 更改为 line 它将正常工作

list_a.map(line=>println(line))

【讨论】:

【参考方案2】:

首先,根据您的链接,textfile 被创建为

val textFile = sc.textFile("README.md")

这样textfileRDD[String],这意味着它是String 类型的弹性分布式数据集。访问的 API 与常规 Scala 集合的 API 非常相似。

那么现在这个map 做了什么?

假设您有一个 Strings 列表,并希望将其转换为 Int 列表,表示每个字符串的长度。

val stringlist: List[String] = List("ab", "cde", "f")
val intlist: List[Int] = stringlist.map( x => x.length )

map 方法需要一个函数。一个函数,来自String => Int。使用该函数,列表的每个元素都会被转换。所以intlist的值为List( 2, 3, 1 )

在这里,我们从String => Int 创建了一个匿名函数。那是x => x.length。甚至可以将函数更明确地写为

stringlist.map( (x: String) => x.length )  

如果你确实使用上面的显式写法,你可以

val stringLength : (String => Int) = 
  x => x.length

val intlist = stringlist.map( stringLength )

所以,这里绝对明显,stringLength 是一个从StringInt 的函数。

备注:一般来说,map 构成了所谓的 Functor。当您提供来自 A => B 的函数时,仿函数的 map(此处为 List)允许您使用该函数也来自 List[A] => List[B]。这称为提升。

回答您的问题

什么是“线”变量?

如上所述,line是函数line => line.split(" ").size的输入参数

更明确 (line: String) => line.split(" ").size

例子:如果line是“hello world”,函数返回2。

"hello world" 
=> Array("hello", "world")  // split 
=> 2                        // size of Array

如何将值传递给 a,b?

reduce 还需要来自(A, A) => A 的函数,其中A 是您的RDD 的类型。让我们调用这个函数op

reduce 是什么意思。示例:

List( 1, 2, 3, 4 ).reduce( (x,y) => x + y )
Step 1 : op( 1, 2 ) will be the first evaluation. 
  Start with 1, 2, that is 
    x is 1  and  y is 2
Step 2:  op( op( 1, 2 ), 3 ) - take the next element 3
  Take the next element 3: 
    x is op(1,2) = 3   and y = 3
Step 3:  op( op( op( 1, 2 ), 3 ), 4) 
  Take the next element 4: 
    x is op(op(1,2), 3 ) = op( 3,3 ) = 6    and y is 4

这里的结果是列表元素的总和,10。

备注:一般reduce计算

op( op( ... op(x_1, x_2) ..., x_n-1), x_n)

完整示例

首先,textfile 是一个 RDD[String],比如说

TextFile
 "hello Tyth"
 "cool example, eh?"
 "goodbye"

TextFile.map(line => line.split(" ").size)
 2
 3
 1
TextFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
 3
   Steps here, recall `(a, b) => if (a > b) a else b)`
   - op( op(2, 3), 1) evaluates to op(3, 1), since op(2, 3) = 3 
   - op( 3, 1 ) = 3

【讨论】:

很好的答案,但我应该这样说: - 每个分区按元素顺序处理 - 多个分区可以由单个 -worker(多个执行程序线程)或不同的工作人员同时处理你如果要每次处理一个元素,则需要进行 n 个分区 可能是我在 SO 上读到的最好的答案之一。谢谢你,先生。 所以您的查询返回一行中的最大字数。很好的答案,Tq【参考方案3】:

Mapreduce是RDD类的方法,其接口类似于scala集合。

您传递给方法mapreduce 的内容实际上是匿名函数(map 中有一个参数,reduce 中有两个参数)。 textFile 为它拥有的每个元素(在此上下文中的文本行)调用提供的函数。

也许你应该先阅读一些 scala 集合介绍。

您可以在此处阅读有关 RDD 类 API 的更多信息: https://spark.apache.org/docs/1.2.1/api/scala/#org.apache.spark.rdd.RDD

【讨论】:

以上是关于`map` 和 `reduce` 方法在 Spark RDD 中如何工作?的主要内容,如果未能解决你的问题,请参考以下文章

hive设置map和reduce数量

在JavaScript函数式编程里使用Map和Reduce方法

如何确定 Hadoop map和reduce的个数

在 Python 中结合 reduce 和 map 的最简洁方法

map、foreach、reduce、filters的用法及区别

hive如何调整map数和reduce数