Apache Spark:地图与地图分区?

Posted

技术标签:

【中文标题】Apache Spark:地图与地图分区?【英文标题】:Apache Spark: map vs mapPartitions? 【发布时间】:2014-01-17 11:41:12 【问题描述】:

RDD's mapmapPartitions 方法有什么区别? flatMap 的行为是像 map 还是像 mapPartitions?谢谢。

(编辑) 即有什么区别(在语义上或在执行方面)

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = 
    rdd.mapPartitions( iter: Iterator[A] => for (i <- iter) yield fn(i) ,
      preservesPartitioning = true)
  

还有:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = 
    rdd.map(fn)
  

【问题讨论】:

看完下面的回答,可以看看实际使用过的人分享的【这个经验】。(bzhangusc.wordpress.com/2014/06/19/…)bzhangusc.wordpress.com/2014/06/19/… 【参考方案1】:

展示。提示:

只要你有重量级的初始化,就应该做一次 对于许多 RDD 元素,而不是每个 RDD 元素一次,如果这样 初始化,例如从第三方创建对象 库,无法序列化(以便 Spark 可以将其传输 集群到工作节点),使用mapPartitions() 而不是 map()mapPartitions() 提供要完成的初始化 每个工作任务/线程/分区一次,而不是每个RDD 数据一次 example : 的元素见下文。

val newRd = myRdd.mapPartitions(partition => 
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => 
    readMatchingFromDB(record, connection)
  ).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close() // close dbconnection here
  newPartition.iterator // create a new iterator
)

第二季度。 flatMap 的行为是像 map 还是像 mapPartitions

是的。请参阅flatmap.. 的示例 2。它不言自明。

第一季度。 RDD 的 mapmapPartitions 有什么区别

map 在每个元素级别运行正在使用的函数,而 mapPartitions 在分区级别执行函数。

示例场景如果我们在特定的 RDD 分区中有 100K 元素,那么我们将在使用时触发映射转换使用的函数 100K 次map

相反,如果我们使用mapPartitions,那么我们只会调用特定函数一次,但我们会传入所有 100K 记录并在一次函数调用中取回所有响应。

由于map 在特定函数上工作了很多次,所以性能会有所提升,特别是如果函数每次都在做一些昂贵的事情,如果我们传入所有元素就不需要做一次(如果是mappartitions)。

地图

对 RDD 的每一项应用一个转换函数并返回 结果作为一个新的 RDD。

列出变体

def map[U: ClassTag](f: T => U): RDD[U]

例子:

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
 val b = a.map(_.length)
 val c = a.zip(b)
 c.collect
 res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

地图分区

这是一个专门的映射,每个分区只调用一次。 各个分区的全部内容可作为 通过输入参数 (Iterarator[T]) 的顺序值流。 自定义函数必须返回另一个 Iterator[U]。结合的 结果迭代器会自动转换为新的 RDD。请 请注意,以下元组 (3,4) 和 (6,7) 缺失 结果是由于我们选择的分区。

preservesPartitioning 表示输入函数是否保留 partitioner,应该是false,除非这是一对 RDD 和输入 函数不会修改键。

列出变体

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

示例 1

val a = sc.parallelize(1 to 9, 3)
 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = 
   var res = List[(T, T)]()
   var pre = iter.next
   while (iter.hasNext)
   
     val cur = iter.next;
     res .::= (pre, cur)
     pre = cur;
   
   res.iterator
 
 a.mapPartitions(myfunc).collect
 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

示例 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
 def myfunc(iter: Iterator[Int]) : Iterator[Int] = 
   var res = List[Int]()
   while (iter.hasNext) 
     val cur = iter.next;
     res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
   
   res.iterator
 
 x.mapPartitions(myfunc).collect
 // some of the number are not outputted at all. This is because the random number generated for it is zero.
 res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

上面的程序也可以用flatMap写成如下。

使用平面图的示例 2

val x  = sc.parallelize(1 to 10, 3)
 x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

 res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

结论:

mapPartitions 转换比map 更快,因为它调用你的函数一次/分区,而不是一次/元素..

延伸阅读:foreach Vs foreachPartitions When to use What?

【讨论】:

我知道你可以使用mapmapPartitions来达到同样的效果(见问题中的两个例子);这个问题是关于为什么你会选择一种方式而不是另一种方式。另一个答案中的 cmets 真的很有用!另外,您没有提到mapflatMapfalse 传递给preservesPartitioning,这意味着什么。 每次执行的函数与分区的函数执行一次是我缺少的链接。使用 mapPartition 一次访问多个数据记录是一件非常宝贵的事情。感谢回答 有没有map优于mapPartitions的场景? mapPartitions这么好,为什么不是默认的map实现呢? @oneleggedmule:两者都是针对不同的要求,如果您要实例化诸如数据库连接之类的资源(如上例所示),那么我们必须明智地使用这些资源,这些资源成本很高,那么 mappartitions 是正确的方法,因为每个分区一个连接.还 saveAsTextFile 内部使用了 mappartitions see @oneleggedmule 在我看来,map() 更容易理解和学习,也是很多不同语言的通用方法。如果有人一开始不熟悉这个 Spark 特定方法,它可能比 mapPartitions() 更容易使用。如果没有性能差异,那么我更喜欢使用 map()。【参考方案2】:

RDD 的 map 和 mapPartitions 方法有什么区别?

map 方法通过应用函数将源 RDD 的每个 元素 转换为结果 RDD 的单个元素。 mapPartitions 将源 RDD 的每个 partition 转换为结果的多个元素(可能没有)。

flatMap 的行为是像 map 还是像 mapPartitions?

flatMap 也不适用于单个元素(如 map)并生成结果的多个元素(如 mapPartitions)。

【讨论】:

谢谢 - map 是否会导致随机播放(或以其他方式更改分区数量)?它是否在节点之间移动数据?我一直在使用 mapPartitions 来避免在节点之间移动数据,但不确定 flapMap 是否会这样做。 如果您查看源代码 -- github.com/apache/incubator-spark/blob/… 和 github.com/apache/incubator-spark/blob/… -- mapflatMap 的分区与父级完全相同。 请注意,2013 年旧金山 Spark 峰会 (goo.gl/JZXDCR) 上的一位演讲者提供的演示文稿强调,使用 mapPartition 的每条记录开销较高的任务比使用地图的性能更好转型。根据演示文稿,这是由于设置新任务的成本很高。 我看到了相反的情况——即使是非常小的操作,调用 mapPartitions 和迭代比调用 map 更快。我假设这只是启动将处理地图任务的语言引擎的开销。 (我在 R 中,这可能有更多的启动开销。)如果您要执行多个操作,那么 mapPartitions 似乎要快得多——我假设这是因为它只读取 RDD 一次。即使 RDD 缓存在 RAM 中,也可以节省大量类型转换的开销。 map 基本上将你的函数f 传递给iter.map(f)。所以基本上它是一种包装mapPartitions的便捷方法。如果纯地图样式转换作业(即功能相同)具有性能优势,我会感到惊讶,如果您需要创建一些对象进行处理,如果这些对象可以共享,那么mapPartitions 会有优势。【参考方案3】:

地图

    一次处理一行,非常类似于 MapReduce 的 map() 方法。 您在每一行之后从转换中返回。

地图分区

    它一次性处理完整的分区。 处理整个分区后,您只能从函数返回一次。 在处理整个分区之前,所有中间结果都需要保存在内存中。 提供你喜欢的 MapReduce 的 setup() map() 和 cleanup() 函数

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Maphttp://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/

【讨论】:

关于 2 - 如果您正在执行迭代器到迭代器的转换,而不是将迭代器具体化为某种集合,则实际上您不必将整个分区保存在内存中, 这样 spark 将能够将部分分区溢出到磁盘。 您不必将整个分区保存在内存中,而是保存结果。处理完整个分区后才能返回结果【参考方案4】:

地图:

地图变换。

地图一次只在一行上工作。

Map 在每个输入行之后返回。

映射不将输出结果保存在内存中。

地图没办法弄清楚然后就结束服务了。

// map example

val dfList = (1 to 100) toList

val df = dfList.toDF()

val dfInt = df.map(x => x.getInt(0)+2)

display(dfInt)

地图分区:

MapPartition 转换。

MapPartition 一次在一个分区上工作。

MapPartition 在处理完分区中的所有行后返回。

MapPartition 输出保留在内存中,因为它可以在处理特定分区中的所有行后返回。

MapPartition 服务可以在返回前关闭。

// MapPartition example

Val dfList = (1 to 100) toList

Val df = dfList.toDF()

Val df1 = df.repartition(4).rdd.mapPartition((int) => Iterator(itr.length))

Df1.collec()

//display(df1.collect())

更多详情请参考Spark map vs mapPartitions transformation文章。

希望对您有所帮助!

【讨论】:

以上是关于Apache Spark:地图与地图分区?的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark 动态分区 OverWrite 问题

地图中的 dynamoDB 唯一键值不是分区键

scala-spark实现重分区和打印各个分区的data

Apache spark如何计算分区以及在executor中如何处理分区

大数据入门第二十二天——spark自定义分区排序与查找

如何在 apache spark 中读取最新的分区