RDD 算子分类

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RDD 算子分类相关的知识,希望对你有一定的参考价值。

参考技术A 摘要: 本文主要介绍Spark算子的作用,以及算子的分类。

转换:Transformation , 行动: Action

RDD算子分类,大致可以分为两类,即:

1.  Transformation:转换算子,这类转换并不触发提交作业,完成作业中间过程处理。

2.  Action:行动算子,这类算子会触发SparkContext提交Job作业。

下面分别对两类算子进行详细介绍:

一:Transformation:转换算子

1.  map:

将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素。源码中map算子相当于初始化一个RDD,新RDD叫做MappedRDD(this,sc.clean(f) )。即:

map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

上述例子中把原RDD中每个元素都乘以3来产生一个新的RDD。

2.  mapPartitions:

mapPartitions函数获取到每个分区的迭代器,在函数中通过这个分区整体的迭代器对整个分区的元素进行操作。内部实现是生成MapPartitionsRDD。

上述例子是通过 函数filter 对分区中所有数据进行过滤。

3.  mapValues

针对(key,value)型数据中的Value进行操作,而不对Key进行处理。即:

mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。

4.  mapWith:

mapWith是map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数。

eg: 把partition index 乘以10,然后加上2作为新的RDD的元素.(3 是将十个数分为三个区)

5.  flatMap:

将原来RDD中的每个元素通过函数f转换为新的元素,并将生成的RDD的每个集合中的元素合并为一个集合,内部创建FlatMappedRDD(this,sc.clean() )。即:

与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。

eg:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)。

6.  flatMapWith:

flatMapWith与mapWith很类似,都是接收两个函数,一个函数把partitionIndex作为输入,输出是一个新类型A;另外一个函数是以二元组(T,A)作为输入,输出为一个序列,这些序列里面的元素组成了新的RDD。

7.  flatMapWithValues:

flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。

上述例子中原RDD中每个元素的值被转换为一个序列(从其当前值到5),比如第一个KV对(1,2), 其值2被转换为2,3,4,5。然后其再与原KV对中Key组成一系列新的KV对(1,2),(1,3),(1,4),(1,5)。

8.  reduce:

reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。

eg:对元素求和。

9.  reduceByKey

顾名思义,reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

eg:对Key相同的元素的值求和,因此Key为3的两个元素被转为了(3,10)。

10.  cartesian:

对两个RDD内的所有元素进行笛卡尔积操作(耗内存),内部实现返回CartesianRDD。

11.  Sample:

sample将RDD这个集合内的元素进行采样,获取所有元素的子集。用户可以设定是否有有放回的抽样,百分比,随机种子,进而决定采样方式。

内部实现: SampledRDD(withReplacement,fraction,seed)。

函数参数设置:

�             withReplacement=true,表示有放回的抽样。

�             withReplacement=false,表示无放回的抽样。

根据fraction指定的比例,对数据进行采样,可以选择是否用随机数进行替换,seed用于指定随机数生成器种子。

12.  union:

使用union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同。并不进行去重操作,保存所有的元素,如果想去重,可以使用distinct()。同时,spark还提供更为简洁的使用union的API,即通过++符号相当于union函数操作。

eg: a 与 b 的联合

去重复:

13.  groupBy:

将元素通过函数生成相应的Key,数据就转化为Key-Value格式,之后将Key相同的元素分为一组。

eg:根据数据集中的每个元素的K值对数据分组

14.  join:

join对两个需要连接的RDD进行cogroup函数操作,将相同key的数据能偶放到一个分区,在cgroup操作之后形成新RDD对每个key下的元素进行笛卡尔积的操作,返回的结果在展平,对应key下的所有元组形成一个集合。最后返回 RDD[(K, (V, W))]。

eg:a与b两个数据连接,相当于表的关联

15.  cache:

cache将RDD元素从磁盘缓存到内存。相当于 persist(MEMORY_ONLY) 函数的

功能。

16.  persist:

persist函数对RDD进行缓存操作,数据缓存在哪里,由StorageLevel这个枚举类型进行确定。DISK 代表磁盘,MEMORY 代表内存, SER 代表数据是否进行序列化存储。

函数定义: persist(newLevel:StorageLevel)

StorageLevel 是枚举类型,代表存储模式。

MEMORY_AND_DISK_SER 代表数据可以存储在内存和磁盘,并且以序列化的方式存储,其他同理。

二:Action:行动算子

1.  foreach:

foreach对RDD中的每个元素都应用f函数操作,不返回 RDD 和 Array, 而是返回Uint。

2.  saveAsTextFile:

函数将数据输出,存储到 HDFS 的指定目录。

函数的内 部实现,其内部通过调用 saveAsHadoopFile 进行实现:

this.map(x => (NullWritable.get(), new Text(x.toString)))

.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)

将 RDD 中的每个元素映射转变为 (null, x.toString),然后再将其写入 HDFS。

3.  collect:

collect相当于toArray,不过已经过时不推荐使用,collect将分布式的RDD返回为一个单机的scala Array数据,在这个数组上运用 scala 的函数式操作。

4.  count:

count返回整个RDD的元素个数。

Spark RDD算子实战

[TOC]


Spark算子概述

RDD:弹性分布式数据集,是一种特殊集合、支持多种来源、有容错机制、可以被缓存、支持并行操作,一个RDD代表多个分区里的数据集。

RDD有两种操作算子:

  • Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住了数据集的逻辑操作
  • Action(执行):触发Spark作业的运行,真正触发转换算子的计算

需要说明的是,下面写的scala代码,其实都是可以简写的,但是为了方便理解,我都没有简写,因为要简写的话对于scala来说真的就是一句话的事情了。

另外如果是在本地环境进行开发,那么需要添加相关依赖:

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.5</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.2</version>
</dependency>

Transformation算子

概述

需要操作的Transformation算子说明如下:

  • map(func)

    返回一个新的分布式数据集,由每个原元素经过func函数转换后组成

  • filter(func)

    返回一个新的数据集,由经过func函数后返回值为true的原元素组成

  • flatMap(func)

    类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)

  • sample(withReplacement, frac, seed)

    根据给定的随机种子seed,随机抽样出数量为frac的数据

  • union(otherDataset)

    返回一个新的数据集,由原数据集和参数联合而成

  • groupByKey([numTasks])

    在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task

  • reduceByKey(func, [numTasks])

    在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。

  • join(otherDataset, [numTasks])

    在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集

map

测试代码如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps1(sc)

        sc.stop()
    }
    /**
      * 1、map:将集合中每个元素乘以7
      * map(func):返回一个新的分布式数据集,由每个原元素经过func函数转换后组成
      */
    def transformationOps1(sc:SparkContext): Unit = {
        val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val listRDD = sc.parallelize(list)
        val retRDD = listRDD.map(num => num * 7)
        retRDD.foreach(num => println(num))
    }
}

执行结果如下:

42
7
49
14
56
21
63
28
70
35

filter

测试代码如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps2(sc)

        sc.stop()
    }
    /**
      * 2、filter:过滤出集合中的奇数
      * filter(func): 返回一个新的数据集,由经过func函数后返回值为true的原元素组成
      *
      * 一般在filter操作之后都要做重新分区(因为可能数据量减少了很多)
      */
    def transformationOps2(sc:SparkContext): Unit = {
        val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val listRDD = sc.parallelize(list)
        val retRDD = listRDD.filter(num => num % 2 == 0)
        retRDD.foreach(println)
    }
}

输出结果如下:

6
2
8
4
10

flatMap

测试代码如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps3(sc)

        sc.stop()
    }
    /**
      * 3、flatMap:将行拆分为单词
      * flatMap(func):类似于map,但是每一个输入元素,
      * 会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)
      */
    def transformationOps3(sc:SparkContext): Unit = {
        val list = List("hello you", "hello he", "hello me")
        val listRDD = sc.parallelize(list)
        val wordsRDD = listRDD.flatMap(line => line.split(" "))
        wordsRDD.foreach(println)
    }
}

输出结果如下:

hello
hello
he
you
hello
me

sample

测试代码如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps4(sc)

        sc.stop()
    }
    /**
      * 4、sample:根据给定的随机种子seed,随机抽样出数量为frac的数据
      * sample(withReplacement, frac, seed): 根据给定的随机种子seed,随机抽样出数量为frac的数据
      * 抽样的目的:就是以样本评估整体
      * withReplacement:
      *     true:有放回的抽样
      *     false:无放回的抽样
      * frac:就是样本空间的大小,以百分比小数的形式出现,比如20%,就是0.2
      *
      * 使用sample算子计算出来的结果可能不是很准确,1000个数,20%,样本数量在200个左右,不一定为200
      *
      * 一般情况下,使用sample算子在做spark优化(数据倾斜)的方面应用最广泛
      */
    def transformationOps4(sc:SparkContext): Unit = {
        val list = 1 to 1000
        val listRDD = sc.parallelize(list)
        val sampleRDD = listRDD.sample(false, 0.2)

        sampleRDD.foreach(num => print(num + " "))
        println
        println("sampleRDD count: " + sampleRDD.count())
        println("Another sampleRDD count: " + sc.parallelize(list).sample(false, 0.2).count())
    }
}

输出结果如下:

sampleRDD count: 219
Another sampleRDD count: 203

union

测试代码如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps5(sc)

        sc.stop()
    }
    /**
      * 5、union:返回一个新的数据集,由原数据集和参数联合而成
      * union(otherDataset): 返回一个新的数据集,由原数据集和参数联合而成
      * 类似数学中的并集,就是sql中的union操作,将两个集合的所有元素整合在一块,包括重复元素
      */
    def transformationOps5(sc:SparkContext): Unit = {
        val list1 = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val list2 = List(7, 8, 9, 10, 11, 12)
        val listRDD1 = sc.parallelize(list1)
        val listRDD2 = sc.parallelize(list2)
        val unionRDD = listRDD1.union(listRDD2)

        unionRDD.foreach(println)
    }
}

输出结果如下:

1
6
2
7
3
8
4
9
5
10
7
8
9
10
11
12

groupByKey

测试代码如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps6(sc)

        sc.stop()
    }
    /**
      * 6、groupByKey:对数组进行 group by key操作
      * groupByKey([numTasks]): 在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。
      * 注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task
      * mr中:
      * <k1, v1>--->map操作---><k2, v2>--->shuffle---><k2, [v21, v22, v23...]>---><k3, v3>
      * groupByKey类似于shuffle操作
      *
      * 和reduceByKey有点类似,但是有区别,reduceByKey有本地的规约,而groupByKey没有本地规约,所以一般情况下,
      * 尽量慎用groupByKey,如果一定要用的话,可以自定义一个groupByKey,在自定义的gbk中添加本地预聚合操作
      */
    def transformationOps6(sc:SparkContext): Unit = {
        val list = List("hello you", "hello he", "hello me")
        val listRDD = sc.parallelize(list)
        val wordsRDD = listRDD.flatMap(line => line.split(" "))
        val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
        pairsRDD.foreach(println)
        val gbkRDD:RDD[(String, Iterable[Int])] = pairsRDD.groupByKey()
        println("=============================================")
        gbkRDD.foreach(t => println(t._1 + "..." + t._2))
    }
}

输出结果如下:

(hello,1)
(hello,1)
(you,1)
(he,1)
(hello,1)
(me,1)
=============================================
you...CompactBuffer(1)
hello...CompactBuffer(1, 1, 1)
he...CompactBuffer(1)
me...CompactBuffer(1)

reduceByKey

测试代码如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps7(sc)

        sc.stop()
    }
    /**
      * 7、reduceByKey:统计每个班级的人数
      * reduceByKey(func, [numTasks]): 在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,
      * key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。
      *
      * 需要注意的是还有一个reduce的操作,其为action算子,并且其返回的结果只有一个,而不是一个数据集
      * 而reduceByKey是一个transformation算子,其返回的结果是一个数据集
      */
    def transformationOps7(sc:SparkContext): Unit = {
        val list = List("hello you", "hello he", "hello me")
        val listRDD = sc.parallelize(list)
        val wordsRDD = listRDD.flatMap(line => line.split(" "))
        val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
        val retRDD:RDD[(String, Int)] = pairsRDD.reduceByKey((v1, v2) => v1 + v2)

        retRDD.foreach(t => println(t._1 + "..." + t._2))
    }
}

输出结果如下:

you...1
hello...3
he...1
me...1

join

测试代码如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps8(sc)

        sc.stop()
    }
    /**
      * 8、join:打印关联的组合信息
      * join(otherDataset, [numTasks]): 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集
      * 学生基础信息表和学生考试成绩表
      * stu_info(sid ,name, birthday, class)
      * stu_score(sid, chinese, english, math)
      *
      * *  Serialization stack:
    - object not serializable
        这种分布式计算的过程,一个非常重要的点,传递的数据必须要序列化

        通过代码测试,该join是等值连接(inner join)
        A.leftOuterJoin(B)
            A表所有的数据都包涵,B表中在A表没有关联的数据,显示为null
        之后执行一次filter就是join的结果
      */
    def transformationOps8(sc: SparkContext): Unit = {
        val infoList = List(
            "1,钟  潇,1988-02-04,bigdata",
            "2,刘向前,1989-03-24,linux",
            "3,包维宁,1984-06-16,oracle")
        val scoreList = List(
            "1,50,21,61",
            "2,60,60,61",
            "3,62,90,81",
            "4,72,80,81"
        )

        val infoRDD:RDD[String] = sc.parallelize(infoList)
        val scoreRDD:RDD[String] = sc.parallelize(scoreList)

        val infoPairRDD:RDD[(String, Student)] = infoRDD.map(line => {
            val fields = line.split(",")
            val student = new Student(fields(0), fields(1), fields(2), fields(3))
            (fields(0), student)
        })
        val scorePairRDD:RDD[(String, Score)] = scoreRDD.map(line => {
            val fields = line.split(",")
            val score = new Score(fields(0), fields(1).toFloat, fields(2).toFloat, fields(3).toFloat)
            (fields(0), score)
        })

        val joinedRDD:RDD[(String, (Student, Score))] = infoPairRDD.join(scorePairRDD)
        joinedRDD.foreach(t => {
            val sid = t._1
            val student = t._2._1
            val score = t._2._2
            println(sid + "\t" + student + "\t" + score)
        })

        println("=========================================")

        val leftOuterRDD:RDD[(String, (Score, Option[Student]))] = scorePairRDD.leftOuterJoin(infoPairRDD)
        leftOuterRDD.foreach(println)

    }
}

输出结果如下:

3   3 包维宁 1984-06-16 oracle 3 62.0 90.0 81.0
2   2 刘向前 1989-03-24 linux  2 60.0 60.0 61.0
1   1 钟  潇 1988-02-04 bigdata   1 50.0 21.0 61.0
=========================================
(4,(4 72.0 80.0 81.0,None))
(3,(3 62.0 90.0 81.0,Some(3 包维宁 1984-06-16 oracle)))
(2,(2 60.0 60.0 61.0,Some(2 刘向前 1989-03-24 linux)))
(1,(1 50.0 21.0 61.0,Some(1 钟  潇 1988-02-04 bigdata)))

sortByKey

测试代码如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps7(sc)

        sc.stop()
    }
    /**
      * sortByKey:将学生身高进行(降序)排序
      *     身高相等,按照年龄排(升序)
      */
    def transformationOps9(sc: SparkContext): Unit = {
        val list = List(
            "1,李  磊,22,175",
            "2,刘银鹏,23,175",
            "3,齐彦鹏,22,180",
            "4,杨  柳,22,168",
            "5,敦  鹏,20,175"
        )
        val listRDD:RDD[String] = sc.parallelize(list)

        /*  // 使用sortBy操作完成排序
        val retRDD:RDD[String] = listRDD.sortBy(line => line, numPartitions = 1)(new Ordering[String] {
            override def compare(x: String, y: String): Int = {
                val xFields = x.split(",")
                val yFields = y.split(",")
                val xHgiht = xFields(3).toFloat
                val yHgiht = yFields(3).toFloat
                val xAge = xFields(2).toFloat
                val yAge = yFields(2).toFloat
                var ret = yHgiht.compareTo(xHgiht)
                if (ret == 0) {
                    ret = xAge.compareTo(yAge)
                }
                ret
            }
        } ,ClassTag.Object.asInstanceOf[ClassTag[String]])
        */
        // 使用sortByKey完成操作,只做身高降序排序
        val heightRDD:RDD[(String, String)] = listRDD.map(line => {
            val fields = line.split(",")
            (fields(3), line)
        })
        val retRDD:RDD[(String, String)] = heightRDD.sortByKey(ascending = false, numPartitions = 1)   // 需要设置1个分区,否则只是各分区内有序
        retRDD.foreach(println)

        // 使用sortByKey如何实现sortBy的二次排序?将上面的信息写成一个java对象,然后重写compareTo方法,在做map时,key就为该对象本身,而value可以为null

    }
}

输出结果如下:

(180,3,齐彦鹏,22,180)
(175,1,李  磊,22,175)
(175,2,刘银鹏,23,175)
(175,5,敦  鹏,20,175)
(168,4,杨  柳,22,168)

combineByKey与aggregateByKey

下面的代码分别使用combineByKey和aggregateByKey来模拟groupByKey和reduceBykey,所以是有4个操作,只要把combineByKey模拟groupByKey的例子掌握了,其它三个相对就容易许多了。

整体来说理解不太容易,但是非常重要,所以一定是要掌握的!


/**
  * spark的transformation操作:
  * aggregateByKey
  * combineByKey
  *
  * 使用combineByKey和aggregateByKey模拟groupByKey和reduceByKey
  *
  * 通过查看源码,我们发现aggregateByKey底层,还是combineByKey
  *
  * 问题:combineByKey和aggregateByKey的区别?
  * aggregateByKey是柯里化形式的,目前底层源码还没时间去分析,所知道的区别是这个
  */
object _03SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_03SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

//        combineByKey2GroupByKey(sc)
//        combineByKey2ReduceByKey(sc)
//        aggregateByKey2ReduceByKey(sc)
        aggregateByKey2GroupByKey(sc)

        sc.stop()
    }

    /**
      * 使用aggregateByKey模拟groupByKey
      */
    def aggregateByKey2GroupByKey(sc: SparkContext): Unit = {
        val list = List("hello bo bo", "zhou xin xin", "hello song bo")
        val lineRDD = sc.parallelize(list)
        val wordsRDD = lineRDD.flatMap(line => line.split(" "))
        val pairsRDD = wordsRDD.map(word => (word, 1))

        val retRDD:RDD[(String, ArrayBuffer[Int])] = pairsRDD.aggregateByKey(ArrayBuffer[Int]()) (  // 这里需要指定value的类型为ArrayBuffer[Int]()
            (part, num) => {
                part.append(num)
                part
            },
            (part1, part2) => {
                part1.++=(part2)
                part1
            }
        )

        retRDD.foreach(println)
    }

    /**
      * 使用aggregateByKey模拟reduceByKey
      *   def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)]
      (zeroValue: U)就对应的是combineByKey中的第一个函数的返回值
      seqOp 就对应的是combineByKey中的第二个函数,也就是mergeValue
      combOp 就对应的是combineByKey中的第三个函数,也就是mergeCombiners
      */
    def aggregateByKey2ReduceByKey(sc:SparkContext): Unit = {
        val list = List("hello bo bo", "zhou xin xin", "hello song bo")
        val lineRDD = sc.parallelize(list)
        val wordsRDD = lineRDD.flatMap(line => line.split(" "))
        val pairsRDD = wordsRDD.map(word => (word, 1))

        val retRDD:RDD[(String, Int)] = pairsRDD.aggregateByKey(0) (
            (partNum, num) => partNum + num,    // 也就是mergeValue
            (partNum1, partNum2) => partNum1 + partNum2 // 也就是mergeCombiners
        )

        retRDD.foreach(println)
    }

    /**
      * 使用reduceByKey模拟groupByKey
      */
    def combineByKey2ReduceByKey(sc:SparkContext): Unit = {
        val list = List("hello bo bo", "zhou xin xin", "hello song bo")
        val lineRDD = sc.parallelize(list)
        val wordsRDD = lineRDD.flatMap(line => line.split(" "))
        val pairsRDD = wordsRDD.map(word => (word, 1))

        /**
          * 对于createCombiner1   mergeValue1     mergeCombiners1
          * 代码的参数已经体现得很清楚了,其实只要理解了combineByKey模拟groupByKey的例子,这个就非常容易了
          */
        var retRDD:RDD[(String, Int)] = pairsRDD.combineByKey(createCombiner1, mergeValue1, mergeCombiners1)

        retRDD.foreach(println)
    }

    /**
      * reduceByKey操作,value就是该数值本身,则上面的数据会产生:
      * (hello, 1) (bo, 1)   (bo, 1)
      * (zhou, 1)  (xin, 1)  (xin, 1)
      * (hello, 1) (song, 1) (bo, 1)
      * 注意有别于groupByKey的操作,它是创建一个容器
      */
    def createCombiner1(num:Int):Int = {
        num
    }

    /**
      * 同一partition内,对于有相同key的,这里的mergeValue直接将其value相加
      * 注意有别于groupByKey的操作,它是添加到value到一个容器中
      */
    def mergeValue1(localNum1:Int, localNum2:Int): Int = {
        localNum1 + localNum2
    }

    /**
      * 将两个不同partition中的key相同的value值相加起来
      * 注意有别于groupByKey的操作,它是合并两个容器
      */
    def mergeCombiners1(thisPartitionNum1:Int, anotherPartitionNum2:Int):Int = {
        thisPartitionNum1 + anotherPartitionNum2
    }

    /**
      * 使用combineByKey模拟groupByKey
      */
    def combineByKey2GroupByKey(sc:SparkContext): Unit = {
        val list = List("hello bo bo", "zhou xin xin", "hello song bo")
        val lineRDD = sc.parallelize(list)
        val wordsRDD = lineRDD.flatMap(line => line.split(" "))
        val pairsRDD = wordsRDD.map(word => (word, 1))

        // 输出每个partition中的map对
        pairsRDD.foreachPartition( partition => {
            println("<=========partition-start=========>")
            partition.foreach(println)
            println("<=========partition-end=========>")
        })

        val gbkRDD:RDD[(String, ArrayBuffer[Int])] = pairsRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)

        gbkRDD.foreach(println)

        // 如果要测试最后groupByKey的结果是在几个分区,可以使用下面的代码进行测试
        /*gbkRDD.foreachPartition(partition => {
            println("~~~~~~~~~~~~~~~~~~~~~~~~~~~")
            partition.foreach(println)
        })*/

    }

    /**
      * 初始化,将value转变成为标准的格式数据
      * 是在每个分区中进行的操作,去重后的key有几个,就调用次,
      * 因为对于每个key,其容器创建一次就ok了,之后有key相同的,只需要执行mergeValue到已经创建的容器中即可
      */
    def createCombiner(num:Int):ArrayBuffer[Int] = {
        println("----------createCombiner----------")
        ArrayBuffer[Int](num)
    }

    /**
      * 将key相同的value,添加到createCombiner函数创建的ArrayBuffer容器中
      * 一个分区内的聚合操作,将一个分区内key相同的数据,合并
      */
    def mergeValue(ab:ArrayBuffer[Int], num:Int):ArrayBuffer[Int] = {
        println("----------mergeValue----------")
        ab.append(num)
        ab
    }

    /**
      * 将key相同的多个value数组,进行整合
      * 分区间的合并操作
      */
    def mergeCombiners(ab1:ArrayBuffer[Int], ab2:ArrayBuffer[Int]):ArrayBuffer[Int] = {
        println("----------mergeCombiners----------")
        ab1 ++= ab2
        ab1
    }

}

/*
combineByKey模拟groupByKey的一个输出效果,可以很好地说明createCombiner、mergeValue和mergeCombiners各个阶段的执行时机:
<=========partition-start=========>
<=========partition-start=========>
(hello,1)
(zhou,1)
(bo,1)
(xin,1)
(bo,1)
(xin,1)
<=========partition-end=========>
(hello,1)
(song,1)
(bo,1)
<=========partition-end=========>
----------createCombiner----------
----------createCombiner----------
----------createCombiner----------
----------createCombiner----------
----------mergeValue----------
----------mergeValue----------
----------createCombiner----------
----------createCombiner----------
----------createCombiner----------
----------mergeCombiners----------
----------mergeCombiners----------
(song,ArrayBuffer(1))
(hello,ArrayBuffer(1, 1))
(bo,ArrayBuffer(1, 1, 1))
(zhou,ArrayBuffer(1))
(xin,ArrayBuffer(1, 1))
 */

Actions算子

概述

前面Transformationt算子的测试都是在本地开发环境中直接跑代码,这里Actions算子的测试主要在spark-shell中进行操作,因为会方便很多。

需要说明的Actions算子如下:

  • reduce(func)

    通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行

  • collect()

    在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM

  • count()

    返回数据集的元素个数

  • take(n)

    返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是Driver程序所在机器,单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用)

  • first()

    返回数据集的第一个元素(类似于take(1))

  • saveAsTextFile(path)

    将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString方法,并将它转换为文件中的一行文本

  • saveAsSequenceFile(path)

    将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为Writable(Spark包括了基本类型的转换,例如Int,Double,String等等)

  • foreach(func)

    在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互

reduce

通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行。

关于reduce的执行过程,可以对比scala中类似的reduce函数,相关说明可以参考我的scala整理的知识点。

scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:29

scala> val ret = listRDD.reduce((v1, v2) => v1 + v2)
...
ret: Int = 21

需要注意的是,不同于Transformation算子,其结果仍然是RDD,但是执行Actions算子之后,其结果不再是RDD,而是一个标量。

collect

在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM,这点尤其需要注意。

scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:29

scala> val ret = listRDD.collect()
...
ret: Array[Int] = Array(1, 2, 3, 4, 5, 6)

count

返回数据集的元素个数。

scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:29

scala> val ret = listRDD.count()
...
ret: Long = 6

take

返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是Driver程序所在机器,单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用)。

scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:29

scala> listRDD.take(3)
...
res7: Array[Int] = Array(1, 2, 3)

first

返回数据集的第一个元素(类似于take(1))。

scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:29

scala> listRDD.first()
...
res8: Int = 1

saveAsTextFile

将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString方法,并将它转换为文件中的一行文本。

scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:29

scala> listRDD.saveAsTextFile("file:///home/uplooking/data/spark/action")
...

可以在文件系统中查看到保存的文件:

[[email protected] action]$ pwd
/home/uplooking/data/spark/action
[[email protected] action]$ ls
part-00000  part-00001  part-00002  part-00003  _SUCCESS

其实可以看到,保存的跟Hadoop的格式是一样的。

当然因为我的spark集群中已经做了跟hadoop相关的配置,所以也可以把文件保存到hdfs中:

scala> listRDD.saveAsTextFile("hdfs://ns1/output/spark/action")
...

然后就可以在hdfs中查看到保存的文件:

[[email protected] action]$ hdfs dfs -ls /output/spark/action
18/04/27 10:27:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 5 items
-rw-r--r--   3 uplooking supergroup          0 2018-04-27 10:25 /output/spark/action/_SUCCESS
-rw-r--r--   3 uplooking supergroup          2 2018-04-27 10:25 /output/spark/action/part-00000
-rw-r--r--   3 uplooking supergroup          4 2018-04-27 10:25 /output/spark/action/part-00001
-rw-r--r--   3 uplooking supergroup          2 2018-04-27 10:25 /output/spark/action/part-00002
-rw-r--r--   3 uplooking supergroup          4 2018-04-27 10:25 /output/spark/action/part-00003

可以看到,保存的格式跟保存到本地文件系统是一样的。

foreach

在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互。

scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:29

scala> listRDD.foreach(println)
...

saveAsNewAPIHadoopFile

也就是将数据保存到Hadoop HDFS中,但是需要注意的是,前面使用saveAsTextFile也可以进行相关操作,其使用的就是saveAsNewAPIHadoopFile或者saveAsHadoopFile这两个API,而其两者的区别是:

  • saveAsHadoopFile的OutputFormat使用的:org.apache.hadoop.mapred中的早期的类
  • saveAsNewAPIHadoopFile的OutputFormat使用的:org.apache.hadoop.mapreduce中的新的类。但不管使用哪一个,都是可以完成工作的。

测试代码如下:

package cn.xpleaf.bigdata.spark.scala.core.p2

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark算子操作之Action
  *     saveAsNewAPIHAdoopFile
  *     * saveAsHadoopFile
  * 和saveAsNewAPIHadoopFile的唯一区别就在于OutputFormat的不同
  * saveAsHadoopFile的OutputFormat使用的:org.apache.hadoop.mapred中的早期的类
  * saveAsNewAPIHadoopFile的OutputFormat使用的:org.apache.hadoop.mapreduce中的新的类
  * 使用哪一个都可以完成工作
  *
  * 前面在使用saveAsTextFile时也可以保存到hadoop文件系统中,注意其源代码也是使用上面的操作的
  *
  *   Caused by: java.net.UnknownHostException: ns1
    ... 35 more
  找不到ns1,因为我们在本地没有配置,无法正常解析,就需要将hadoop的配置文件信息给我们加载进来
    hdfs-site.xml.heihei,core-site.xml.heihei
  */
object _05SparkActionOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_05SparkActionOps.getClass.getSimpleName)
        val sc = new SparkContext(conf)

        val list = List("hello you", "hello he", "hello me")
        val listRDD = sc.parallelize(list)
        val pairsRDD = listRDD.map(word => (word, 1))
        val retRDD = pairsRDD.reduceByKey((v1, v2) => v1 + v2)

        retRDD.saveAsNewAPIHadoopFile(
            "hdfs://ns1/spark/action",      // 保存的路径
            classOf[Text],                      // 相当于mr中的k3
            classOf[IntWritable],               // 相当于mr中的v3
            classOf[TextOutputFormat[Text, IntWritable]]    // 设置(k3, v3)的outputFormatClass
        )

    }
}

之后我们可以在hdfs中查看到相应的文件输出:

[[email protected] ~]$ hdfs dfs -ls /spark/action               
18/04/27 12:07:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r--   3 Administrator supergroup          0 2018-04-27 12:07 /spark/action/_SUCCESS
-rw-r--r--   3 Administrator supergroup         13 2018-04-27 12:07 /spark/action/part-r-00000
-rw-r--r--   3 Administrator supergroup         11 2018-04-27 12:07 /spark/action/part-r-00001
[[email protected] ~]$ hdfs dfs -text /spark/action/part-r-00000
18/04/27 12:08:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
hello   3
me      1
[[email protected] ~]$ hdfs dfs -text /spark/action/part-r-00001
18/04/27 12:08:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
you     1
he      1

宽依赖和窄依赖

窄依赖(narrow dependencies)

子RDD的每个分区依赖于常数个父分区(与数据规模无关)
输入输出一对一的算子,且结果RDD的分区结构不变。主要是map/flatmap
输入输出一对一的算子,但结果RDD的分区结构发生了变化,如union/coalesce
从输入中选择部分元素的算子,如filter、distinct、substract、sample

宽依赖(wide dependencies)

子RDD的每个分区依赖于所有的父RDD分区
对单个RDD基于key进行重组和reduce,如groupByKey,reduceByKey
对两个RDD基于key进行join和重组,如join
经过大量shuffle生成的RDD,建议进行缓存。这样避免失败后重新计算带来的开销。

注意:reduce是一个action,和reduceByKey完全不同。

关于宽依赖和窄依赖,《Hadoop与大数据挖掘》书本上的说明非常精简,但是理解起来也是不错的,可以参考一下,当然,这本书的Spark内容就写得非常少了。

以上是关于RDD 算子分类的主要内容,如果未能解决你的问题,请参考以下文章

Spark 算子

RDD之六:Action算子

大数据之Spark:Spark Core

SparkRDD操作具体解释4——Action算子

Spark算子系列第0篇:spark常用算子详解

Spark算子系列第0篇:spark常用算子详解