`map` 和 `reduce` 方法在 Spark RDD 中如何工作?
Posted
技术标签:
【中文标题】`map` 和 `reduce` 方法在 Spark RDD 中如何工作?【英文标题】:How do `map` and `reduce` methods work in Spark RDDs? 【发布时间】:2015-12-07 18:07:45 【问题描述】:以下代码来自 Apache Spark 的快速入门指南。 有人可以解释一下什么是“line”变量以及它来自哪里?
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
另外,如何将值传递给 a,b?
链接到 QSG http://spark.apache.org/docs/latest/quick-start.html
【问题讨论】:
正如@Tyth 所说“也许你应该先阅读一些 scala 集合介绍。”我并不是说这是不屑一顾,而是说如果您尝试使用 Spark 做任何不平凡的事情,而至少对您使用的语言有基本的了解,那么您将陷入痛苦的世界。 【参考方案1】:map 函数所做的是,它获取参数列表并将其映射到某个函数。类似于 python 中的 map 函数,如果你熟悉的话。
此外,File 就像一个字符串列表。 (不完全是,但这就是它的迭代方式)
让我们认为这是您的文件。
val list_a: List[String] = List("first line", "second line", "last line")
现在让我们看看 map 函数是如何工作的。
我们需要两个东西,我们已经拥有的list of values
和我们想要将此值映射到的function
。让我们考虑一个非常简单的函数来理解。
val myprint = (arg:String)=>println(arg)
这个函数只接受一个字符串参数并在控制台上打印。
myprint("hello world")
hello world
如果我们将此函数与您的列表相匹配,它将打印所有行
list_a.map(myprint)
我们也可以编写一个如下所述的匿名函数,它做同样的事情。
list_a.map(arg=>println(arg))
在您的情况下,line
是文件的第一行。您可以根据需要更改参数名称。例如,在上面的示例中,如果我将 arg
更改为 line
它将正常工作
list_a.map(line=>println(line))
【讨论】:
【参考方案2】:首先,根据您的链接,textfile
被创建为
val textFile = sc.textFile("README.md")
这样textfile
是RDD[String]
,这意味着它是String
类型的弹性分布式数据集。访问的 API 与常规 Scala 集合的 API 非常相似。
那么现在这个map
做了什么?
假设您有一个 String
s 列表,并希望将其转换为 Int 列表,表示每个字符串的长度。
val stringlist: List[String] = List("ab", "cde", "f")
val intlist: List[Int] = stringlist.map( x => x.length )
map
方法需要一个函数。一个函数,来自String => Int
。使用该函数,列表的每个元素都会被转换。所以intlist的值为List( 2, 3, 1 )
在这里,我们从String => Int
创建了一个匿名函数。那是x => x.length
。甚至可以将函数更明确地写为
stringlist.map( (x: String) => x.length )
如果你确实使用上面的显式写法,你可以
val stringLength : (String => Int) =
x => x.length
val intlist = stringlist.map( stringLength )
所以,这里绝对明显,stringLength 是一个从String
到Int
的函数。
备注:一般来说,map
构成了所谓的 Functor。当您提供来自 A => B 的函数时,仿函数的 map
(此处为 List)允许您使用该函数也来自 List[A] => List[B]
。这称为提升。
回答您的问题
什么是“线”变量?
如上所述,line
是函数line => line.split(" ").size
的输入参数
更明确
(line: String) => line.split(" ").size
例子:如果line
是“hello world”,函数返回2。
"hello world"
=> Array("hello", "world") // split
=> 2 // size of Array
如何将值传递给 a,b?
reduce
还需要来自(A, A) => A
的函数,其中A
是您的RDD
的类型。让我们调用这个函数op
。
reduce
是什么意思。示例:
List( 1, 2, 3, 4 ).reduce( (x,y) => x + y )
Step 1 : op( 1, 2 ) will be the first evaluation.
Start with 1, 2, that is
x is 1 and y is 2
Step 2: op( op( 1, 2 ), 3 ) - take the next element 3
Take the next element 3:
x is op(1,2) = 3 and y = 3
Step 3: op( op( op( 1, 2 ), 3 ), 4)
Take the next element 4:
x is op(op(1,2), 3 ) = op( 3,3 ) = 6 and y is 4
这里的结果是列表元素的总和,10。
备注:一般reduce
计算
op( op( ... op(x_1, x_2) ..., x_n-1), x_n)
完整示例
首先,textfile 是一个 RDD[String],比如说
TextFile
"hello Tyth"
"cool example, eh?"
"goodbye"
TextFile.map(line => line.split(" ").size)
2
3
1
TextFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
3
Steps here, recall `(a, b) => if (a > b) a else b)`
- op( op(2, 3), 1) evaluates to op(3, 1), since op(2, 3) = 3
- op( 3, 1 ) = 3
【讨论】:
很好的答案,但我应该这样说: - 每个分区按元素顺序处理 - 多个分区可以由单个 -worker(多个执行程序线程)或不同的工作人员同时处理你如果要每次处理一个元素,则需要进行 n 个分区 可能是我在 SO 上读到的最好的答案之一。谢谢你,先生。 所以您的查询返回一行中的最大字数。很好的答案,Tq【参考方案3】:Map
和reduce
是RDD类的方法,其接口类似于scala集合。
您传递给方法map
和reduce
的内容实际上是匿名函数(map 中有一个参数,reduce 中有两个参数)。 textFile
为它拥有的每个元素(在此上下文中的文本行)调用提供的函数。
也许你应该先阅读一些 scala 集合介绍。
您可以在此处阅读有关 RDD 类 API 的更多信息: https://spark.apache.org/docs/1.2.1/api/scala/#org.apache.spark.rdd.RDD
【讨论】:
以上是关于`map` 和 `reduce` 方法在 Spark RDD 中如何工作?的主要内容,如果未能解决你的问题,请参考以下文章
在JavaScript函数式编程里使用Map和Reduce方法
在 Python 中结合 reduce 和 map 的最简洁方法