Spark mapPartitions 及mapPartitionsWithIndex算子

Posted abcdwxc

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark mapPartitions 及mapPartitionsWithIndex算子相关的知识,希望对你有一定的参考价值。

mapPartitions

 与map类似,map函数是应用到每个元素,而mapPartitions的输入函数是每个分区的数据,把每个分区中的内容作为整体来处理的。 当map里面有比较耗时的初始化操作时,比如连接db,可以采用mapPartitions,它对每个partition操作一次,其函数的输入与输出都是iterator类型。

 实例如下:

scala> val rdd1=sc.parallelize(1 to 9,3)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> def myfunc[T](iter:Iterator[T]):Iterator[(T,T)]={
     | var res=List[(T,T)]()
     | var pre=iter.next
     | while (iter.hasNext) {
     | val cur=iter.next
     | res.::=(pre,cur)
     | pre=cur
     | }
     | res.iterator
     | }
myfunc: [T](iter: Iterator[T])Iterator[(T, T)]
scala> rdd1.mapPartitions(myfunc)
res2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at mapPartitions at <console>:28
scala> res2.collect()
res3: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

 

mapPartitionsWithIndex

与 mapPartitions 类似,参数需多传一个分区的index.

实例如下: 

scala> val mapReslut=rdd1.mapPartitionsWithIndex{
     | (index,iterator)=>{
     | val list=iterator.toList
     | list.map(x=>x +"->"+index).iterator
     | }
     | }
mapReslut: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at mapPartitionsWithIndex at <console>:25
scala> mapReslut.collect
res6: Array[String] = Array(1->0, 2->0, 3->0, 4->1, 5->1, 6->1, 7->2, 8->2, 9->2) 

以上是关于Spark mapPartitions 及mapPartitionsWithIndex算子的主要内容,如果未能解决你的问题,请参考以下文章

spark中map与mapPartitions区别

SPARK之map()和mapPartition()的区别

Spark foreachpartiton和mappartition的异同

spark中map和mapPartitions算子的区别

[Spark精进]必须掌握的4个RDD算子之mapPartitions算子

[Spark精进]必须掌握的4个RDD算子之mapPartitions算子