RDD转换算子与操作算子

Posted yangp

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RDD转换算子与操作算子相关的知识,希望对你有一定的参考价值。

一、RDD算子分类

1. RDD算子分类及概述

  RDD的算子分为Transformation和Action两类,Transformation是延迟执行,Action是立即执行。Transformation和Action本质上的区别是,Transformation是从一个RDD到一个RDD,Action是从一个RDD到一个值。由下图可知,Spark的的转换算子与操作算子的执行流程。首先可以从HDFS中使用textFile方法将数据加载到内存,然后经过转换算子对RDD进行转换,最后再通过操作算子Actions,如saveAsTextFile,将结果存回HDFS中去。

技术分享图片

2. 源码解读Transformation算子与Action算子的本质区别

  • Transformation本质
//例如flatMap,应用在RDD上,结果是new MapPartitionsRDD,这个方法就结束了。
/**
 *  Return a new RDD by first applying a function to all elements of this
 *  RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
  •  Action本质
//例如reduce,应用在RDD上,结果是使用sc执行了runJob方法。
/**
 * Reduces the elements of this RDD using the specified commutative and
 * associative binary operator.
 */
def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

二、常用Transformations算子及释义

1. textFile

  将文件加载到内存,抽象成一个RDD,是一个Transformation操作。

2. map

  map(f:T=>U) : RDD[T]=>RDD[U]

  map函数需要一个参数,此参数是一个匿名函数,在参数进来时使用f来接收。这个匿名函数需要的参数进来时是T类型,进过匿名函数转化之后变成U类型。这些算子都是操作在RDD上的,那么函数map(f:T=>U)的功能是把一个RDD[T],这RDD里面每个元素是类型的转成U类型。map函数实质上就是对RDD集合中的每个元素依次进行操作。

3. filter

  filter(f:T=>Bool) : RDD[T]=>RDD[T]

  filter函数,传一个匿名函数进来,参数为T,匿名函数转换后变成Boolean类型,如果参数是True就被保留下来,如果是False就被过滤掉。RDD[T]=>RDD[T],filter作用前后RDD中的元素都是T类型,只是把不符合的元素去除了。

4. flatmap

  flatmap(f:T=>Seq[U]) : RDD[T]=>RDD[U]

  一个flat+一个map,map是对集合的每个元素进行操作,flat英文直译是压扁,flatmap(f:T=>Seq[U])中的Seq是集合序列之意。例如,一个英文句子,进行分词可以按空格隔开,句子进来就是String类型,对每个元素进行分词就是一个个单词组成的String数组,这就是一个Seq集合;如果对于一个句子使用map的话出去的还是一个句子。因为map之后是一个数组,并没有打散成为一个个单词,所以flat的作用就在于此。

5. sample

  sample(fraction:Float) : RDD[T]=>RDD[T](Deterministic sampling)

  传入浮点数,采样。

6. groupByKey

  groupByKey() : RDD[(K,V)]=>RDD[(K,Seq[V])]

  按Key进行分组,必须运用于一个元素为键值对形式的RDD,可以按照Key进行分组,把相同Key的值放到一个集合中去。

7. reduceByKey

  reduceByKey(f:(V,V)=>V) : RDD[(K,V)]=>RDD[(K,V)]

  reduceByKey=一个groupByKey()+一个reduce。先groupByKey(),把相同的Key的value放到一个序列中;然后进行reduce操作,例如:_+_就是把每一个元素依次进行操作。RDD进来时的每一个元素是(K,V),这里的Key可能重复;出去的RDD的元素也是(K,V),但是这里的Key无重复,因为有groupByKey()操作。

8. union

  union() : (RDD[T], RDD[T])=>RDD[T]

  union意为联合,把两个相同元素类型的RDD,转化到一起去。

9. join

  join() : (RDD[(K,V)], RDD[(K,W)]) => RDD[(K,(V,W))]

  join是把两个以键值对形式的为元素的RDD,按K进行分组,转化成一个RDD,这个RDD的元素依旧是键值对形式,键为K,值为由原来相同键的值组成的元组。

10. cogroup

  cogroup() : (RDD[(K,V)], RDD[(K,W)])=> RDD[(K,(Seq[V],Seq[W]))]

  cogroup把两个RDD变成一个RDD,最后的RDD是键值对形式,按K分组,把值V和W变成,数组序列。

11. crossProduct

  crossProduct() : (RDD[T],RDD[U])=>RDD[(T,U)]

  把两个RDD中的元素放到一个RDD中,并且分别作为最终RDD元素的键和值。

12. mapValues

  mapValues(f:V=>W) : RDD[(K,V)]=>RDD[(K,W)](Preserves partitioning)

  mapValues是对RDD中的每个元素(K,V)中的V进行map操作,K保持不变。

13. sort

  sort(c:Comparator[K]) : RDD[(K,V)]=>RDD[(K,V)]

  sort进行排序,至于排序方法是和传进来的Comparator有关。RDD[(K,V)] => RDD[(K,V)],元素本身没有变,只是进行了排序。

14. sortBykey

   和sort类似,sortByKey是根据键进行排序。

15. sortBy

   sortBy与sortByKey不同,sortBy可以指定根据集合中的第几个元素进行排序。例如根据value排序,rdd.sortBy(_._2)。

16. partitionBy

  partitionBy(p:Partitioner[K]) : RDD[(K,V)]=>RDD[(K,V)]

  可以传入自定义的Partitioner,传入Key,然后根据Key重新进行Partitioner。这时再看RDD里的内容是完全没有变化的,只是内部有一个shuffle,重新分组,把数据多机的内存传到另外多机的内存中去。 

三、常用Actions算子及释义

1. count

  count() : RDD[T]=>Long

  count是统计RDD里面元素的数量。因为count不知道RDD元素的个数,所以返回值类型为Long类型。 

2. collect

  collect() : RDD[T]=>Seq[T]

  collect,把一个RDD返回到一个集合序列中去。RDD[T]=>Seq[T],RDD[T]是弹性分布式数据集,Seq[T]是单机的集合。collect是很恐怖的操作,相当于把分布式的数据,拿到当前的机器或当前客户端的程序中来,如果数据量大而且当前机器的内存小的话机器就容易挂掉。

3. reduce

  reduce(f:(T,T)=>T) : RDD[T]=>T

  reduce和reduceByKey不同,reduceByKey是一个Transformation,而reduce是一个action。reduce可以将以T为元素类型的RDD转成T类型。例如一个集合是(1,4,6,4,1),对它进行reduce就会返回相同的类型的值。 

4. lookup

  lookup(k:K) : RDD[(K,V)=>Seq[V](On hash/range partitioned RDDs)

  根据Key,把Key的value都找出来,然后放到本地的集合序列中去。 

5. save

  save(path:String) : Outputs RDD to a storage system, e.g., HDFS

  save需要传一个路径,把RDD的结果给输出到文件中去,可以是本地的也可以是hdfs的。 

6. saveAsTextFile

  saveAsTextFlie(path:String) : Outputs RDD to a storage system, e.g., HDFS

  这个saveAsTextFlie不能使用本地的路径。 

7. foreach

  对RDD中的每个元素进行遍历操作。例如,打印RDD的每个元素:

  rdd.foreach(println)

  更多资源可以参看Zhen He RDD API

以上是关于RDD转换算子与操作算子的主要内容,如果未能解决你的问题,请参考以下文章

Spark算子

RDD转换操作算子 --- 集合操作(unionintersectionsubtract)

Spark之RDD算子-转换算子

Spark RDD算子实战

spark算子介绍

spark算子 分为3大类