Spark Operator Series, Part 0: Common Spark Operators Explained
Posted by Frank201608
1. Spark Operator Categories: Transformation Operators
Transformations are conversion operators and they are lazy: an operation that turns one RDD into another RDD is not executed immediately, but is only actually computed once an Action is invoked. Common transformations include map, filter, flatMap, union, sortByKey, reduceByKey and so on. Official documentation:
http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
Excerpt from the official site (feel free to skip this part; the content that follows is even better!):
Transformations
The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.
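Before turning to actions, here is a quick spark-shell illustration of the lazy behaviour described above (a minimal sketch; RDD ids and res numbers depend on the session): the map call only records the transformation, and no job runs until the collect action is invoked.
scala> val nums = sc.parallelize(1 to 4)
nums: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val doubled = nums.map(_ * 2) //transformation only: nothing is computed yet
doubled: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25
scala> doubled.collect() //the action triggers the actual computation
res0: Array[Int] = Array(2, 4, 6, 8)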
2. Spark Operator Categories: Action Operators
Actions are operators that trigger SparkContext to submit a job.
Official documentation:
http://spark.apache.org/docs/latest/rdd-programming-guide.html#actions
Excerpt:
Actions
The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.
Action | Meaning |
---|---|
reduce(func) | Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. |
collect() | Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data. |
count() | Return the number of elements in the dataset. |
first() | Return the first element of the dataset (similar to take(1)). |
take(n) | Return an array with the first n elements of the dataset. |
takeSample(withReplacement, num, [seed]) | Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed. |
takeOrdered(n, [ordering]) | Return the first n elements of the RDD using either their natural order or a custom comparator. |
saveAsTextFile(path) | Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. |
saveAsSequenceFile(path) (Java and Scala) | Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). |
saveAsObjectFile(path) (Java and Scala) | Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile() . |
countByKey() | Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. |
foreach(func) | Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. |
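A few of these actions in a spark-shell session (a minimal sketch; res numbers and RDD ids depend on the session):
scala> val nums = sc.parallelize(1 to 5)
nums: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> nums.reduce(_ + _) //aggregate all elements with a commutative and associative function
res0: Int = 15
scala> nums.count() //number of elements
res1: Long = 5
scala> nums.take(3) //first three elements, returned to the driver
res2: Array[Int] = Array(1, 2, 3)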
3. The Operators in Detail
(1) map
map transforms every data item of the source RDD into a new data item through a user-defined function f; each element of the source RDD corresponds to exactly one element of the new RDD. The source of the map operator is defined as follows:
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
In the figure above, each box represents an RDD partition; the partitions on the left are mapped by the user-defined function f: T -> U into the new RDD partitions on the right. However, f is only actually applied to the data, together with the other functions of the same stage, once an Action operator is triggered. In the first partition of the figure, the record V1 is passed into f and transformed into record V'1 of the resulting partition. A map example follows:
//create an RDD from a List using parallelize
scala> val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> a.foreach(println) //print every element
rat
salmon
dog
salmon
elephant
scala> a.map(_+"!!!").foreach(println) //append three exclamation marks to every element
salmon!!!
dog!!!
rat!!!
salmon!!!
elephant!!!
(2) flatMap
flatMap transforms every element of the source RDD into new elements through a user-defined function f and gathers all the resulting elements into one collection, which forms the new RDD.
The operator's source code is as follows:
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
The figure above shows one RDD partition undergoing a flatMap operation. The function passed to flatMap is f: T -> U, where T and U may be any data types; the data in the partition is transformed into new data by the user-defined function f. The outer box can be regarded as an RDD partition and each small box as a collection: V1, V2 and V3 sit together in one collection (stored as an array or some other container) that forms a single data item of the RDD. After they are transformed into V'1, V'2 and V'3, the original container is broken apart and the flattened elements become individual data items of the new RDD.
The difference between flatMap and map is that map is a plain "mapping", while flatMap "maps first and then flattens". With map, every data item of the source RDD is turned by f into exactly one data item of the new RDD. flatMap adds one more step: all the resulting collections are merged into a single flat one.
Example 1:
scala> a.map(_+"!!!").collect
res17: Array[String] = Array(dog!!!, salmon!!!, salmon!!!, rat!!!, elephant!!!)
scala> a.flatMap(_+"!!!").collect
res18: Array[Char] = Array(d, o, g, !, !, !, s, a, l, m, o, n, !, !, !, s, a, l, m, o, n, !, !, !, r, a, t, !, !, !, e, l, e, p, h, a, n, t, !, !, !)
Example 2:
The file d:\wordcount.txt contains:
word in text
hello spark
the third line
scala> var textFile =sc.textFile("file:///d://wordcount.txt")
textFile: org.apache.spark.rdd.RDD[String] = file:///d://wordcount.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> textFile.collect
res0: Array[String] = Array(word in text, hello spark, the third line)
scala> var mapResult = textFile.map(line => line.split("\\s+"))
mapResult: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:26
scala> mapResult.collect
res1: Array[Array[String]] = Array(Array(word, in, text), Array(hello, spark), Array(the, third, line))
scala> var flatMapResult = textFile.flatMap(line => line.split("\\s+"))
flatMapResult: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at <console>:26
scala> flatMapResult.collect
res2: Array[String] = Array(word, in, text, hello, spark, the, third, line)
The example above is illustrated in the figure below:
Example 3:
scala> val a = sc.parallelize(1 to 10, 5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24
scala> a.collect
res18: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> a.flatMap(1 to _).collect
res15: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> a.map(1 to _).collect
res17: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(1), Range(1, 2), Range(1, 2, 3), Range(1, 2, 3, 4), Range(1, 2, 3, 4, 5), Range(1, 2, 3, 4, 5, 6), Range(1, 2, 3, 4, 5, 6, 7), Range(1, 2, 3, 4, 5, 6, 7, 8), Range(1, 2, 3, 4, 5, 6, 7, 8, 9), Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
Example 4:
scala> sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect
res19: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)
scala> sc.parallelize(List(1, 2, 3), 2).map(x => List(x, x, x)).collect
res20: Array[List[Int]] = Array(List(1, 1, 1), List(2, 2, 2), List(3, 3, 3))
(3) filter
filter filters the elements: all elements for which the user-defined function f returns true form the new RDD. f is applied to every element; elements whose result is true are kept in the RDD and elements whose result is false are dropped. The operator's source code is as follows:
/**
* Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}
In the figure above, each box represents an RDD partition and T can be any type. The user-defined predicate f is applied to every data item, and only the items for which it returns true are kept; for example, V2 and V3 are filtered out while V1 is kept (renamed V'1 to tell them apart).
Example code:
Example 1:
scala> val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"))
res34: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> a.collect
res35: Array[String] = Array(dog, salmon, salmon, rat, elephant)
scala> a.filter(_.length==3).collect
res37: Array[String] = Array(dog, rat)
Example 2:
scala> val a = sc.parallelize(1 to 10, 5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24
scala> a.collect
res28: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> a.filter(_ % 2 == 0).collect
res29: Array[Int] = Array(2, 4, 6, 8, 10)
(4) mapPartitions
mapPartitions is a variant of map. Where map applies the user-defined function to every element of the RDD, mapPartitions applies it once per partition: the function receives an iterator over the whole partition, processes the partition's elements through that iterator, and the results form the new RDD.
In the figure below, each box represents an RDD partition. The function f: iter => iter.filter(_ >= 3) filters each partition, keeping only the values greater than or equal to 3; the partition containing 1, 2 and 3 keeps only 3, as shown in the figure:
The mapPartitions source code is as follows:
/**
* Return a new RDD by applying a function to each partition of this RDD.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}
Example usage:
scala> val x=sc.parallelize(1 to 10, 3)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> x.collect
res9: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
//mapPartitions operates on all the elements of a partition through its iterator
//Example 1: filter each partition, keeping only the values greater than or equal to 3
scala> x.mapPartitions(iter => iter.filter(_>=3)).collect
res12: Array[Int] = Array(3, 4, 5, 6, 7, 8, 9, 10)
//Example 2: double every element, partition by partition
scala> x.mapPartitions(iter => for(e<-iter) yield e*2).collect
res24: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
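As a further sketch, mapPartitions can also emit one result per partition, for example a per-partition sum. With x created above as sc.parallelize(1 to 10, 3), the partitions are expected to be (1,2,3), (4,5,6) and (7,8,9,10), so the output should be roughly:
//Example 3: one sum per partition (the iterator spans a whole partition)
scala> x.mapPartitions(iter => Iterator(iter.sum)).collect
res25: Array[Int] = Array(6, 15, 34)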
(5) glom
glom (build an Array): collects the elements of each partition into an array, leaving the partitioning unchanged.
Each box in the figure represents a partition; the figure shows a partition containing V1, V2 and V3 being turned by glom into the array Array((V1),(V2),(V3)).
/**
* Return an RDD created by coalescing all elements within each partition into an array.
*/
def glom(): RDD[Array[T]] = withScope {
  new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
Example code:
scala> val a = sc.parallelize(1 to 10, 5) //5 partitions
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24
scala> a.collect
res28: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> a.glom.collect //the elements of each partition form an array; still 5 partitions
res31: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4), Array(5, 6), Array(7, 8), Array(9, 10))
scala> val a = sc.parallelize(1 to 10, 3) //3 partitions
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> a.glom.collect //the elements of each partition form an array; still 3 partitions
res32: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
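glom is also handy for summarizing partitions, for instance taking the maximum of each partition (a sketch that reuses the 3-partition RDD a from above; the output assumes the same partitioning):
scala> a.glom.map(_.max).collect //maximum value within each partition
res33: Array[Int] = Array(3, 6, 10)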
(6) union
When using union, the two RDDs must have elements of the same type, and the resulting RDD has that same element type. No deduplication is performed and all elements are kept; use distinct() if you want deduplication. Spark also offers the more concise ++ operator, which is equivalent to union.
In the figure above, the large boxes on the left represent two RDDs and the small boxes inside them represent partitions; the large box on the right is the merged RDD. An RDD containing V1, V2, U1, U2, U3, U4 and an RDD containing V1, V8, U5, U6, U7, U8 are merged with all elements kept: V1, V1, V2 and V8 form one partition, and U1 through U8 form another. Example code:
scala> val data1=sc.parallelize(1 to 6, 2)
data1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> val data2=sc.parallelize(7 to 12, 2)
data2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> data1.collect
res8: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> data2.collect
res7: Array[Int] = Array(7, 8, 9, 10, 11, 12)
scala> res9.collect //res9 comes from an earlier data1.union(data2) call that is not shown
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
scala> (data1 ++ data2).collect
res12: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
scala> data1.union(data2).collect
res13: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
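Since union keeps duplicates, distinct() can be chained afterwards when deduplication is needed (a sketch; distinct involves a shuffle, so the output order is not guaranteed):
scala> (data1 ++ data1).collect //duplicates are kept
res14: Array[Int] = Array(1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6)
scala> (data1 ++ data1).distinct().collect //deduplicated; the order may vary
res15: Array[Int] = Array(4, 1, 5, 2, 6, 3)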
(7) groupBy
groupBy generates a key for every element through a user-supplied function, turning the data into key-value form, and then puts elements with the same key into the same group. The operator's source code is as follows:
/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
* mapping to that key. The ordering of elements within each group is not guaranteed, and
* may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
  groupBy[K](f, defaultPartitioner(this))
}
/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key. The ordering of elements within each group is not guaranteed, and
* may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
def groupBy[K](
    f: T => K,
    numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
  groupBy(f, new HashPartitioner(numPartitions))
}
/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
* mapping to that key. The ordering of elements within each group is not guaranteed, and
* may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
    : RDD[(K, Iterable[T])] = withScope {
  val cleanF = sc.clean(f)
  this.map(t => (cleanF(t), t)).groupByKey(p)
}
The implementation works as follows:
1) Preprocess the user function: val cleanF = sc.clean(f)
2) map each element to a (key, element) pair and then group with groupByKey: this.map(t => (cleanF(t), t)).groupByKey(p)
Here p determines the number of partitions and the partitioning function, and therefore the degree of parallelism; a sketch of this equivalence is shown below.
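A sketch of that equivalence in spark-shell (the grouping key here, x % 2 == 0, is just an illustrative choice; output ordering may vary between runs):
scala> val nums = sc.parallelize(1 to 6, 3)
nums: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at parallelize at <console>:24
scala> nums.groupBy(_ % 2 == 0).collect
res50: Array[(Boolean, Iterable[Int])] = Array((false,CompactBuffer(1, 3, 5)), (true,CompactBuffer(2, 4, 6)))
scala> nums.map(x => (x % 2 == 0, x)).groupByKey().collect //same grouping, written out as map + groupByKey
res51: Array[(Boolean, Iterable[Int])] = Array((false,CompactBuffer(1, 3, 5)), (true,CompactBuffer(2, 4, 6)))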
In the figure above, each box represents an RDD partition; elements sharing the same key are put into one group by the user-defined function. For example, V1 and V2 are merged under the key V, with V1 and V2 as the values, forming (V, Seq(V1, V2)). Example code:
scala> val data1=sc.parallelize(1 to 6, 3)
data1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[45] at parallelize at <console>:24
scala> data1.glom.collect
res53: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4), Array(5, 6))
scala> data1.groupBy(x => if (x % 2 == 0) "偶数" else "奇数" ).collect
res52: Array[(String, Iterable[Int])] = Array((偶数,CompactBuffer(2, 4, 6)), (奇数,CompactBuffer(1, 3, 5)))
scala> data1.groupBy(x => if (x % 2 == 0) "偶数" else "奇数" ).collect.foreach(x=>println(x))
(偶数,CompactBuffer(2, 4, 6))
(奇数,CompactBuffer(1, 3, 5))
scala> data1.groupBy(x => if (x % 2 == 0) "偶数" else "奇数" ).collect.foreach(x=>println(x+"|"+x._2.sum))
(偶数,CompactBuffer(2, 4, 6))|12
(奇数,CompactBuffer(1, 3, 5))|9
scala> data1.groupBy(x => if (x % 2 == 0) "偶数" else "奇数" ).collect.foreach(x=>println(x._1+"|"+x._2.sum))
偶数|12
奇数|9
scala> data1.groupBy(x => if (x % 2 == 0) "偶数" else "奇数" ).collect.foreach(x=>println(x._1+"|"+x._2))
偶数|CompactBuffer(2, 4, 6)
奇数|CompactBuffer(1, 3, 5)
scala> data1.groupBy(x => if (x % 2 == 0) "偶数" else "奇数" ).collect.foreach(x=>println(x._1+"|"+x._2.mkString(",")))
偶数|2,4,6
奇数|1,3,5
Note: the difference between groupBy and groupByKey is exactly the word "Key". groupBy() groups by whatever the user-defined function f returns, while groupByKey() groups by the key of the data itself; the data fed to groupByKey() must therefore already be of key-value type, and elements with equal keys end up in the same group. A minimal sketch contrasting the two follows.
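A contrast on the same pair data (a hypothetical pairs RDD used only for illustration; output ordering may vary):
scala> val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
pairs: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:24
scala> pairs.groupByKey().collect //groups the values by the existing key
res42: Array[(String, Iterable[Int])] = Array((a,CompactBuffer(1, 2)), (b,CompactBuffer(3)))
scala> pairs.groupBy(_._1).collect //groups the whole tuples by a user-supplied key function
res43: Array[(String, Iterable[(String, Int)])] = Array((a,CompactBuffer((a,1), (a,2))), (b,CompactBuffer((b,3))))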
(8) groupByKey
groupByKey groups the values that share the same key. Called on a pair RDD of (K, V) records, it returns an RDD of (K, Iterable<V>). Its main purpose is to gather all key-value pairs with the same key into one sequence, whose internal order is not deterministic. Because groupByKey loads all the key-value pairs into memory for the computation, a key with too many values can easily cause an out-of-memory error. Example:
scala> val wordsRDD = sc.parallelize(Array("one", "two", "two", "three", "three", "three"),2).map(word => (word, 1))
wordsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[33] at map at <console>:24
scala> wordsRDD.groupByKey()
res41: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[35] at groupByKey at <console>:27
//the groupByKey operator:
scala> wordsRDD.groupByKey().foreach(x=>println(x._1+"|"+x._2.sum))
two|2
one|1
three|3
On a very large dataset the difference between reduceByKey and groupByKey is magnified many times over. In addition, when the amount of data being moved exceeds the memory of a single executor, Spark has to spill it to disk, which hurts performance even further. For very large datasets, therefore, avoid groupByKey. For more on the difference between reduceByKey and groupByKey, see: https://blog.csdn.net/zhumr/article/details/104220525
(9) reduceByKey
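reduceByKey merges the values of each key with a user-supplied associative function and, unlike groupByKey, combines values on the map side before the shuffle, so far less data is moved across the network. A minimal sketch of the word count above rewritten with reduceByKey (reusing wordsRDD from the previous section; output ordering may vary):
scala> wordsRDD.reduceByKey(_ + _).collect
res44: Array[(String, Int)] = Array((two,2), (one,1), (three,3))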