Apache Spark:地图与地图分区?
Posted
技术标签:
【中文标题】Apache Spark:地图与地图分区?【英文标题】:Apache Spark: map vs mapPartitions? 【发布时间】:2014-01-17 11:41:12 【问题描述】:RDD's map
和 mapPartitions
方法有什么区别? flatMap
的行为是像 map
还是像 mapPartitions
?谢谢。
(编辑) 即有什么区别(在语义上或在执行方面)
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] =
rdd.mapPartitions( iter: Iterator[A] => for (i <- iter) yield fn(i) ,
preservesPartitioning = true)
还有:
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] =
rdd.map(fn)
【问题讨论】:
看完下面的回答,可以看看实际使用过的人分享的【这个经验】。(bzhangusc.wordpress.com/2014/06/19/…)bzhangusc.wordpress.com/2014/06/19/… 【参考方案1】:展示。提示:
只要你有重量级的初始化,就应该做一次 对于许多
RDD
元素,而不是每个RDD
元素一次,如果这样 初始化,例如从第三方创建对象 库,无法序列化(以便 Spark 可以将其传输 集群到工作节点),使用mapPartitions()
而不是map()
。mapPartitions()
提供要完成的初始化 每个工作任务/线程/分区一次,而不是每个RDD
数据一次 example : 的元素见下文。
val newRd = myRdd.mapPartitions(partition =>
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(record =>
readMatchingFromDB(record, connection)
).toList // consumes the iterator, thus calls readMatchingFromDB
connection.close() // close dbconnection here
newPartition.iterator // create a new iterator
)
第二季度。
flatMap
的行为是像 map 还是像mapPartitions
?
是的。请参阅flatmap
.. 的示例 2。它不言自明。
第一季度。 RDD 的
map
和mapPartitions
有什么区别
map
在每个元素级别运行正在使用的函数,而mapPartitions
在分区级别执行函数。
示例场景:如果我们在特定的 RDD
分区中有 100K 元素,那么我们将在使用时触发映射转换使用的函数 100K 次map
。
相反,如果我们使用mapPartitions
,那么我们只会调用特定函数一次,但我们会传入所有 100K 记录并在一次函数调用中取回所有响应。
由于map
在特定函数上工作了很多次,所以性能会有所提升,特别是如果函数每次都在做一些昂贵的事情,如果我们传入所有元素就不需要做一次(如果是mappartitions
)。
地图
对 RDD 的每一项应用一个转换函数并返回 结果作为一个新的 RDD。
列出变体
def map[U: ClassTag](f: T => U): RDD[U]
例子:
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
地图分区
这是一个专门的映射,每个分区只调用一次。 各个分区的全部内容可作为 通过输入参数 (Iterarator[T]) 的顺序值流。 自定义函数必须返回另一个 Iterator[U]。结合的 结果迭代器会自动转换为新的 RDD。请 请注意,以下元组 (3,4) 和 (6,7) 缺失 结果是由于我们选择的分区。
preservesPartitioning
表示输入函数是否保留 partitioner,应该是false
,除非这是一对 RDD 和输入 函数不会修改键。列出变体
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
示例 1
val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] =
var res = List[(T, T)]()
var pre = iter.next
while (iter.hasNext)
val cur = iter.next;
res .::= (pre, cur)
pre = cur;
res.iterator
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
示例 2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
def myfunc(iter: Iterator[Int]) : Iterator[Int] =
var res = List[Int]()
while (iter.hasNext)
val cur = iter.next;
res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
res.iterator
x.mapPartitions(myfunc).collect
// some of the number are not outputted at all. This is because the random number generated for it is zero.
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
上面的程序也可以用flatMap写成如下。
使用平面图的示例 2
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
结论:
mapPartitions
转换比map
更快,因为它调用你的函数一次/分区,而不是一次/元素..
延伸阅读:foreach Vs foreachPartitions When to use What?
【讨论】:
我知道你可以使用map
或mapPartitions
来达到同样的效果(见问题中的两个例子);这个问题是关于为什么你会选择一种方式而不是另一种方式。另一个答案中的 cmets 真的很有用!另外,您没有提到map
和flatMap
将false
传递给preservesPartitioning
,这意味着什么。
每次执行的函数与分区的函数执行一次是我缺少的链接。使用 mapPartition 一次访问多个数据记录是一件非常宝贵的事情。感谢回答
有没有map
优于mapPartitions
的场景? mapPartitions
这么好,为什么不是默认的map实现呢?
@oneleggedmule:两者都是针对不同的要求,如果您要实例化诸如数据库连接之类的资源(如上例所示),那么我们必须明智地使用这些资源,这些资源成本很高,那么 mappartitions 是正确的方法,因为每个分区一个连接.还 saveAsTextFile 内部使用了 mappartitions see
@oneleggedmule 在我看来,map() 更容易理解和学习,也是很多不同语言的通用方法。如果有人一开始不熟悉这个 Spark 特定方法,它可能比 mapPartitions() 更容易使用。如果没有性能差异,那么我更喜欢使用 map()。【参考方案2】:
RDD 的 map 和 mapPartitions 方法有什么区别?
map 方法通过应用函数将源 RDD 的每个 元素 转换为结果 RDD 的单个元素。 mapPartitions 将源 RDD 的每个 partition 转换为结果的多个元素(可能没有)。
flatMap 的行为是像 map 还是像 mapPartitions?
flatMap 也不适用于单个元素(如 map
)并生成结果的多个元素(如 mapPartitions
)。
【讨论】:
谢谢 - map 是否会导致随机播放(或以其他方式更改分区数量)?它是否在节点之间移动数据?我一直在使用 mapPartitions 来避免在节点之间移动数据,但不确定 flapMap 是否会这样做。 如果您查看源代码 -- github.com/apache/incubator-spark/blob/… 和 github.com/apache/incubator-spark/blob/… --map
和 flatMap
的分区与父级完全相同。
请注意,2013 年旧金山 Spark 峰会 (goo.gl/JZXDCR) 上的一位演讲者提供的演示文稿强调,使用 mapPartition 的每条记录开销较高的任务比使用地图的性能更好转型。根据演示文稿,这是由于设置新任务的成本很高。
我看到了相反的情况——即使是非常小的操作,调用 mapPartitions 和迭代比调用 map 更快。我假设这只是启动将处理地图任务的语言引擎的开销。 (我在 R 中,这可能有更多的启动开销。)如果您要执行多个操作,那么 mapPartitions 似乎要快得多——我假设这是因为它只读取 RDD 一次。即使 RDD 缓存在 RAM 中,也可以节省大量类型转换的开销。
map
基本上将你的函数f
传递给iter.map(f)
。所以基本上它是一种包装mapPartitions
的便捷方法。如果纯地图样式转换作业(即功能相同)具有性能优势,我会感到惊讶,如果您需要创建一些对象进行处理,如果这些对象可以共享,那么mapPartitions
会有优势。【参考方案3】:
地图:
一次处理一行,非常类似于 MapReduce 的 map() 方法。 您在每一行之后从转换中返回。
地图分区
它一次性处理完整的分区。 处理整个分区后,您只能从函数返回一次。 在处理整个分区之前,所有中间结果都需要保存在内存中。 提供你喜欢的 MapReduce 的 setup() map() 和 cleanup() 函数
Map Vs mapPartitions
http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/
Spark Map
http://bytepadding.com/big-data/spark/spark-map/
Spark mapPartitions
http://bytepadding.com/big-data/spark/spark-mappartitions/
【讨论】:
关于 2 - 如果您正在执行迭代器到迭代器的转换,而不是将迭代器具体化为某种集合,则实际上您不必将整个分区保存在内存中, 这样 spark 将能够将部分分区溢出到磁盘。 您不必将整个分区保存在内存中,而是保存结果。处理完整个分区后才能返回结果【参考方案4】:地图:
地图变换。
地图一次只在一行上工作。
Map 在每个输入行之后返回。
映射不将输出结果保存在内存中。
地图没办法弄清楚然后就结束服务了。
// map example
val dfList = (1 to 100) toList
val df = dfList.toDF()
val dfInt = df.map(x => x.getInt(0)+2)
display(dfInt)
地图分区:
MapPartition 转换。
MapPartition 一次在一个分区上工作。
MapPartition 在处理完分区中的所有行后返回。
MapPartition 输出保留在内存中,因为它可以在处理特定分区中的所有行后返回。
MapPartition 服务可以在返回前关闭。
// MapPartition example
Val dfList = (1 to 100) toList
Val df = dfList.toDF()
Val df1 = df.repartition(4).rdd.mapPartition((int) => Iterator(itr.length))
Df1.collec()
//display(df1.collect())
更多详情请参考Spark map vs mapPartitions transformation文章。
希望对您有所帮助!
【讨论】:
以上是关于Apache Spark:地图与地图分区?的主要内容,如果未能解决你的问题,请参考以下文章
Apache Spark 动态分区 OverWrite 问题