Spark Core学习之常用算子(含经典面试题)

Posted 烟雨蒋楠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Core学习之常用算子(含经典面试题)相关的知识,希望对你有一定的参考价值。

目录

前言

第一部分:Transformation算子

Value类型

map()映射

mapPartitions()以分区单位执行Map

mapPartitionsWithIndex()带分区号

flatMap()压平

glom()分区转换数组

groupBy()分组

扩展:复杂版的wordcount

filter()过滤

sample()采样

distinct()去重

coalesce()重新分区

reparation()重新分区(执行Shuffle)

sortBy()排序

pipe()调用脚本

双Value类型交互

union()并集、subtract()差集、intersection()交集

 zip()拉链

Key-Value类型

partitionBy()按照k重新分区

reduceByKey()按照k进行聚合v

备注: 正则表达式过滤字符串的方法

groupByKey()按照k重新分组

aggregateByKey()按照k进行分区内和分区间逻辑

foldByKey()分区内核分区间具有相同逻辑的aggregateByKey()

combineByKey()转换结构后分区内和分区间操作

sortByKey()按照k进行排序

mapValues()只对v进行操作

join()将相同k对应的多个v关联在一起

扩展:左外连接、右外连接和全外连接

cogroup()类似全连接,但是在同一个RDD中对k聚合

第二部分:Action行动算子

reduce()聚合

collect()以数组的形式返回数据集

count()返回RDD中元素个数

first()返回RDD中的第一个元素

take(n)返回由RDD前n个元素组成的数组

takeOrdered(n)返回RDD排序后前n个元素组成的数组

aggregate()案例

fold()案例

countByKey()统计每种key的个数

save相关算子

foreach()&foreachPartition遍历RDD中每一个元素

总结


前言

        在 Spark Core中,RDD(Resilient Distributed Dataset,弹性分布式数据集) 支持 2 种操作:

1、transformation
        从一个已知的 RDD 中创建出来一个新的 RDD 。例如: map就是一个transformation。
2、action
        在数据集上计算结束之后, 给驱动程序返回一个值.。例如: reduce就是一个action。

        注意:

                本文只简单讲述Transformation算子和Action算子,目的是了解其相应的算子如何使用;并不打算深入理解每一个算子的执行过程和逻辑思想。


        在 Spark 中几乎所有的transformation操作都是懒执行的(lazy),也就是说transformation操作并不会立即计算他们的结果,而是记住了这个操作。只有当通过一个action来获取结果返回给驱动程序的时候这些转换操作才开始计算。这种设计可以使 Spark 运行起来更加的高效


        默认情况下,你每次在一个 RDD 上运行一个action的时候,前面的每个transformed RDD 都会被重新计算。但是我们可以通过persist (or cache)方法来持久化一个 RDD 在内存中,也可以持久化到磁盘上, 来加快访问速度。

 根据 RDD 中数据类型的不同,整体分为 2 种 RDD:

  • Value类型
  • Key-Value类型(其实就是存了一个二维的元组)

第一部分:Transformation算子

Value类型

map()映射

需求:

创建一个1~4数组的RDD,两个分区,将所有元素*2形成新的RDD。

        代码实现:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具体业务逻辑
    mapTest(sc)

    //4.关闭连接
    sc.stop()
  }

  def mapTest(sc: SparkContext): Unit = {
    // 3.1 创建一个RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 4,2)

    // 3.2 调用map方法,每个元素乘以2
    val mapRdd: RDD[Int] = rdd.map(_ * 2)

    // 3.3 打印修改后的RDD中数据
    mapRdd.collect().foreach(println)
  }
}

         map()操作如图所示:

map()函数结构 
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

功能描述

        参数f是一个函数,它可以接收一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并依次应用f函数,从而产生一个新的RDD。即,这个新RDD中的每一个元素都是原来RDD中每一个元素依次应用f函数而得到的。

        在本例中,f为:_*2

        备注:rdd.map(_ * 2)是rdd.map((f: Int) => f * 2)的简写。

mapPartitions()以分区单位执行Map

需求:

创建一个RDD,4个元素,2个分区,使每个元素*2组成新的RDD

代码实现:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具体业务逻辑
    mapPartitionsTest(sc)

    //4.关闭连接
    sc.stop()
  }

  /**
   * 创建一个RDD,4个元素,2个分区,使每个元素*2组成新的RDD
   * */
  def mapPartitionsTest(sc: SparkContext): Unit ={
    // 3.1 创建第一个RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 4,2)

    // 3.2 需求实现
    val value: RDD[Int] = rdd.mapPartitions((data: Iterator[Int]) => data.map((x: Int) => x * 2))

    // 3.3 打印
    value.foreach(println)
  }
}

mapPartitions()操作如图所示:

 mapPartitions()函数结构:

def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false
): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
  }

功能介绍:

  • f函数把每一个分区的数据分别放入到迭代器中,批处理。
  • preservesPartitioning:是否保留上游RDD的分区信息,默认false
  • Map是一次处理一个元素,而mapPartitions一次处理一个分区数据。

备注:map()和mapPartitions()的区别

  1. map():每次处理一条数据。
  2. mapPartitions():每次处理一个分区的数据,这个分区的数据处理完后,原 RDD 中该分区的数据才能释放,可能导致 OOM。
  3. 开发指导:当内存空间较大的时候建议使用mapPartitions(),以提高处理效率。

mapPartitionsWithIndex()带分区号

需求:

创建一个RDD,使每个元素跟所在分区号形成一个元组,组成一个新的RDD。

代码实现:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具体业务逻辑
    mapPartitionsWithIndexTest(sc)
    //4.关闭连接
    sc.stop()
  }

  /**
   * 创建一个RDD,使每个元素跟所在分区号形成一个元组,组成一个新的RDD
   * */
  def mapPartitionsWithIndexTest(sc: SparkContext): Unit ={
    // 3.1 创建第一个RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 4,2)

    // 3.2 需求实现
    val value: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index: Int, item: Iterator[Int]) => {
      item.map((f: Int) => (index,f))
    })
    // 3.3 打印
    value.foreach(println)
  }
}

mapPartitionsWithIndex()操作如图所示:

 mapPartitionsWithIndex()函数结构:

def mapPartitionsWithIndex[U: ClassTag](
      f: (Int, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
      preservesPartitioning)
  }

功能介绍:

  1.  f: (Int, Iterator[T]) => Iterator[U]中Int表示分区编号
  2. 类似于mapPartitions,比mapPartitions多一个整数参数表示分区号

flatMap()压平

需求:

创建一个集合,集合里面存储的还是子集合,把所有子集合中数据取出放入到一个大的集合中。

代码实现:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具体业务逻辑
    flatMapTest(sc)

    //4.关闭连接
    sc.stop()
  }
  /**
   * 创建一个集合,集合里面存储的还是子集合,把所有子集合中数据取出放入到一个大的集合中
   * */
  def flatMapTest(sc: SparkContext): Unit ={
    // 3.1 创建第一个RDD
    val rdd: RDD[List[Int]] = sc.makeRDD(Array(List(1,2),List(3,4),List(5,6),List(7)),2)

    // 3.2 需求实现
    val value: RDD[Int] = rdd.flatMap((list: List[Int]) => list)

    // 3.3 打印
    value.foreach(println)
  }
}

flatMap操作如图所示:

扩展: 可以通过任务上下文获取分区号。

 rdd.foreach((f: List[Int]) => {
      // 通过任务上下文获取partitionID
      println(
TaskContext.getPartitionId() + "---"+ f.mkString(","))

    })

flatMap()函数结构:

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }

功能介绍:

  1. 与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。
  2. 区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。并且新的RDD中继承了原RDD中的分区数。 

glom()分区转换数组

需求:

创建一个2个分区的RDD,并将每个分区的数据放到一个数组,求出每个分区的最大值。

代码实现:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具体业务逻辑
    glomTest(sc)

    //4.关闭连接
    sc.stop()
  }
  /**
   * 创建一个2个分区的RDD,并将每个分区的数据放到一个数组,求出每个分区的最大值
   * */
  def glomTest(sc: SparkContext): Unit ={
    // 3.1 创建第一个RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 4,2)
    // 3.2 需求实现
    val value: RDD[Int] = rdd.glom().mapPartitions((x: Iterator[Array[Int]]) => x.map((f: Array[Int]) => f.max))

    // 3.3 打印
    value.foreach((f: Int) => {
      println(TaskContext.getPartitionId() + ":" + f)
    })
  }
}

glom()操作如图所示:

glom()函数结构:

 def glom(): RDD[Array[T]] = withScope {
    new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
  }

功能介绍:

        该操作将RDD中每一个分区变成一个数组,并放置在新的RDD中,数组中元素的类型与原分区中元素类型一致。

groupBy()分组

需求:

创建一个RDD,按照元素模以2的值进行分组。

代码实现:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具体业务逻辑
    groupByTest(sc)
    //4.关闭连接
    sc.stop()
  }

  /**
   * 创建一个RDD,按照元素模以2的值进行分组
   * */
  def groupByTest(sc: SparkContext): Unit ={
    // 3.1 创建第一个RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 10,3)
    // 3.2 需求实现
    val value: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
    // 3.3 打印
        value.foreach(println)

  }
}

groupBy()操作如图所示:

groupBy()函数结构:

  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
    groupBy[K](f, defaultPartitioner(this))
  }

功能介绍:

分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。

备注:

1、groupBy会存在shuffle过程
2、shuffle:将不同的分区数据进行打乱重组的过程
3、shuffle一定会落盘。

扩展:复杂版的wordcount

需求:

        有如下的数据,("Hello Scala", 2), ("Hello Spark", 3), ("Hello World", 2),("I Love You",5),("I Miss You",2),("Best wish",9)。其中数字代表出现的个数。求每个单词出现的次数。

代码实现:

def worldCountTest(sc: SparkContext): Unit ={
    val rdd: RDD[(String, Int)] = sc.makeRDD(Array(("Hello Scala", 2), ("Hello Spark", 3), ("Hello World", 2), ("I Love You", 5), ("I Miss You", 2), ("Best wish", 9)))

    // 方式一:适合scala
    /*val value: String = rdd.map {
          // 模式匹配
      case (str, count) => {
        // scala对字符串操作,("Hello Scala" + " ") * 2 = Hello Scala Hello Scala
        (str + " ") * count
      }
    }
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_+_)
      .collect()
      .mkString(",")
*/

    // 方式二:比较通用
    val value: String = rdd.flatMap {
      case (str, i) => {
        str.split(" ").map((word: String) => (word, i))
      }
    }.reduceByKey(_ + _)
      .collect()
      .mkString(",")

    println(value)

  }

filter()过滤

需求:

创建一个1到10的RDD,过滤出偶数。

代码实现:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具体业务逻辑
    filterTest(sc)

    //4.关闭连接
    sc.stop()
  }
  /**
   * 过滤偶数
   * */
  def filterTest(sc: SparkContext): Unit = {
    // 3.1 创建第一个RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 10, 3)
    // 3.2 需求实现
    val value: RDD[Int] = rdd.filter(_ % 2 == 0)
    // 3.3 打印
    value.foreach(println)
  }
}

filter()操作过程如下:

filter()函数结构:

  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }

功能介绍:

  • 接收一个返回值为布尔类型的函数作为参数。
  • 当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中。

sample()采样

需求:

创建一个RDD(1-10),从中选择放回和不放回抽样。

代码实现:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具体业务逻辑
    sampleTest(sc)

    //4.关闭连接
    sc.stop()
  }
  /**
   * 随机采样
   * */
  def sampleTest(sc: SparkContext): Unit ={
    // 3.1 创建第一个RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 20, 3)
    // 3.2 需求实现
    val value: String = rdd.sample(true,0.3,3).collect().mkString(",")
    val value2: String = rdd.sample(false,0.3,3).collect().mkString(",")
    // 3.3 打印
    
    println("不放回采样结果:" + value2)
    println("放回采样结果:" + value)
  }
}

sample()操作如图所示:

sample()函数结构:

def sample(
      withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong
): RDD[T] = {
    ......
  }

功能介绍:

  • 从大量的数据中采样
  • withReplacement: Boolean: 抽出的数据是否放回
  • fraction: Double:
    • 当withReplacement=false时:选择每个元素的概率;取值一定是[0,1] ;底层使用泊松分布。
    • 当withReplacement=true时:选择每个元素的期望次数; 取值必须大于等于0,底层使用伯努利采样。
  • seed: Long: 指定随机数生成器种子

备注:

       1、 该函数随机采样是伪随机,因为传入的随机种子是一样的,计算结果当然一样。

distinct()去重

需求:

对如下的数据去重:3,2,9,1,2,1,5,2,9,6,1

代码实现:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具体业务逻辑
    distinctTest(sc)

    //4.关闭连接
    sc.stop()
  }
  /**
   * 数据去重
   * */
  def distinctTest(sc: SparkContext): Unit = {
    // 3.1 创建第一个RDD
    val rdd: RDD[Int] = sc.makeRDD(List(3,2,9,1,2,1,5,2,9,6,1))
    // 3.2 需求实现
    val value: RDD[Int] = rdd.distinct()
    // 3.3 打印
    value.foreach(println)
  }
}

distinct()操作如图所示:

distinct()函数结构:

  def distinct(): RDD[T] = withScope {
    distinct(partitions.length)
  }

  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
  }

功能介绍:

对内部的元素去重,并将去重后的元素放到新的RDD中。

备注:

        1、distinct()用分布式的方式去重比HashSet集合方式不容易OOM.

        2、默认情况下,distinct会生成与原RDD分区个数一致的分区数,当然也可以指定分区数。

coalesce()重新分区

需求:

1、将4个分区的RDD合并成两个分区的RDD

2、将三个分区的RDD合并成两个分区的RDD

代码实现:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具体业务逻辑
    coalesceTest(sc)

    //4.关闭连接
    sc.stop()
  }
  /**
   * coalesce重分区
   * */
  def coalesceTest(sc: SparkContext): Unit = {
    // 3.1 创建第一个RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 4,4)
    val rdd2: RDD[Int] = sc.makeRDD(1 to 4,3)
    // 3.2 需求实现
    val value1: Array[(Int, Int)] = rdd.coalesce(2).map((TaskContext.getPartitionId(),_)).collect()
    val value2: Array[(Int, Int)] = rdd2.coalesce(2).map((TaskContext.getPartitionId(),_)).collect()
    // 3.3 打印
    println("value1:" + value1.mkString(","))
    println("value2:" + value2.mkString(","))
  }
}

coalesce()操作如图所示:

coalesce()函数结构:

def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)

      : RDD[T] = withScope {

......

}

功能介绍:

  • 缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
  • shuffle默认为false,则该操作会将分区数较多的原始RDD向分区数比较少的目标RDD,进行转换。
  • shuffle:
    • true: 进行shuffle,此时目标分区数可以大于可以小于原分区数。即:可以缩小和扩大原分区。
    • false:不进行shuffle,此时目标分区数只能小于原分区数;当大于时,分区数不生效。即:只能缩小或等于原分区。

备注:

        1、shuffle原理: 将数据打散,然后重新组合。

        2、具体shuffle过程,读者可以参考“彻底搞懂spark的shuffle过程”。

reparation()重新分区(执行Shuffle)

需求:

创建一个4个分区的RDD,对其重新分区

代码实现:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具体业务逻辑
    repartitionTest(sc)

    //4.关闭连接
    sc.stop()
  }
  /**
   * repartition重分区
   * */
  def repartitionTest(sc: SparkContext): Unit = {
    // 3.1 创建第一个RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 10,4)

    // 3.2 需求实现
    val value1: Array[(Int, Int)] = rdd.repartition(2).map((TaskContext.getPartitionId(),_)).collect()
    val value2: Array[(Int, Int)] = rdd.repartition(5).map((TaskContext.getPartitionId(),_)).collect()

    // 3.3 打印
    println("value1:" + value1.mkString(","))
    println("value2:" + value2.mkString(","))
  }
}

reparation()操作如图所示:

reparation()函数结构:

  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

功能介绍:

  • 该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。
  • 无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。

备注: coalesce与reparation的区别

  • coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
  • repartition实际上是调用的coalesce,进行shuffle。
  • 如果是减少分区, 尽量避免 shuffle,使用coalesce完成。
  • 绝大多数情况下,减少分区使用coalesce,增加分区使用reparation。

sortBy()排序

需求:

创建一个RDD,按照不同的规则进行排序。

  1. 按照数字大小升序排序
  2. 按照数字大小降序排序
  3. 按照模以5的余数降序排序
  4. 二元组,则先按照第一个元素升序,如果第一个元素相同,在按照第二个元素降序

代码实现:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具体业务逻辑
    sortByTest(sc)

    //4.关闭连接
    sc.stop()
  }
  /**
   * sortBy排序
   * */
  def sortByTest(sc: SparkContext): Unit = {
    // 3.1 创建第一个RDD
    val rdd: RDD[Int] = sc.makeRDD(List(3,16,5,8,2,10,9,1,7,6))

    // 3.2 需求实现
    // 升序
    val value1: String = rdd.sortBy((num: Int) => num).collect().mkString(",")
    // 降序
    val value2: String = rdd.sortBy((num: Int) => num,false).collect().mkString(",")
    // 按照%5排序
    val value3: String = rdd.sortBy((num: Int) => num % 5,false).collect().mkString(",")

    // 先按照第一个元素升序,如果第一个元素相同,在按照第二个元素降序
    val value4: String = rdd2.sortBy((x: (Int, Int)) => (x._1,-x._2)).collect().mkString(",")

    // 3.3 打印
    println("value1:" + value1)
    println("value2:" + value2)
    println("value3:" + value3)
    println("value4:" + value4)
  }
}

sortBy()操作如图所示:

sortBy()函数结构:

  def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length
)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
    this.keyBy[K](f)
        .sortByKey(ascending, numPartitions)
        .values
  }

功能介绍:

  • 该操作用于排序数据。
  • 在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。
  • 默认情况下,排序后新产生的RDD的分区数与原RDD的分区数一致。
  • 可以通过numPartitions参数调整目标分区数。

pipe()调用脚本

需求:

编写一个脚本,使用管道将脚本作用于RDD上。

代码实现:

# 编写一个脚本,并增加执行权限
[root@node001 spark]$ vim pipe.sh
#!/bin/sh
echo "Start"
while read LINE; do
   echo ">>>"${LINE}
done

[root@node001 spark]$ chmod 777 pipe.sh

# 创建一个只有一个分区的RDD
scala> val rdd = sc.makeRDD (List("hi","Hello","how","are","you"),1)

# 将脚本作用该RDD并打印
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()
res18: Array[String] = Array(Start, >>>hi, >>>Hello, >>>how, >>>are, >>>you)

#创建一个有两个分区的RDD
scala> val rdd = sc.makeRDD(List("hi","Hello","how","are","you"),2)

# 将脚本作用该RDD并打印
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()
res19: Array[String] = Array(Start, >>>hi, >>>Hello, Start, >>>how, >>>are, >>>you)

pipe()操作如图所示:

pipe()函数结构:

def pipe(command: String): RDD[String] = withScope {
    // Similar to Runtime.exec(), if we are given a single string, split it into words
    // using a standard StringTokenizer (i.e. by spaces)
    pipe(PipedRDD.tokenize(command))
  }

功能介绍:

  • 管道,针对每个分区,都调用一次shell脚本,返回输出的RDD
  • 脚本要放在 worker 节点可以访问到的位置

  • 每个分区执行一次脚本, 但是每个元素算是标准输入中的一行

双Value类型交互

        双Value类型的交互常用的算子是数学中的并集、交集合差集。

union()并集、subtract()差集、intersection()交集

需求:

创建两个RDD,求并集、交集、差集

代码实现:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具体业务逻辑
    aggTest(sc)

    //4.关闭连接
    sc.stop()
  }
  /**
   * 求并集、交集、差集
   * */
  def aggTest(sc: SparkContext): Unit ={
    // 3.1 创建第一个RDD
    val rdd1: RDD[Int] = sc.makeRDD (1 to 6)
    val rdd2: RDD[Int] = sc.makeRDD (4 to 10)

    // 3.2 需求实现
    // 并集
    val value1: Array[Int] = rdd1.union(rdd2).collect()
    // 差集
    val value2: Array[Int] = rdd1.subtract(rdd2).collect()
    // 交集
    val value3: Array[Int] = rdd1.intersection(rdd2).collect()

    // 3.3 打印
    println("并集:" + value1.mkString(","))
    println("差集:" + value2.mkString(","))
    println("交集:" + value3.mkString(","))
  }
}

union()并集操作如图所示:

subtract()差集操作如图所示:

intersection()交集操作如图所示:

union()函数结构:

  def union(other: RDD[T]): RDD[T] = withScope {
    sc.union(this, other)
  }

subtract()函数结构:

 def subtract(other: RDD[T]): RDD[T] = withScope {
    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
  }

intersection()函数结构:

  def intersection(other: RDD[T]): RDD[T] = withScope {
    this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
        .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
        .keys
  }

功能介绍:

  • 并集:对源RDD和参数RDD求并集后返回一个新的RDD
  • 差集:计算差的一种函数,去除两个RDD中相同元素,不同的RDD将保留下来
  • 交集:对源RDD和参数RDD求交集后返回一个新的RDD

 zip()拉链

需求:

创建两个RDD,并将两个RDD组合到一起形成一个(k,v)RDD

代码实现:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具体业务逻辑
    zipTest(sc)
    
    //4.关闭连接
    sc.stop()
  }
  /**
   * zip拉链
   * */
  def zipTest(sc: SparkContext): Unit ={
    // 3.1 创建第一个RDD
    val rdd1: RDD[Int] = sc.makeRDD (1 to 3,2)
    val rdd2: RDD[String] = sc.makeRDD (List("a","b","c"),2)

    // 3.2 需求实现
    val value: Array[(String, Int)] = rdd2.zip(rdd1).collect()

    // 3.3 打印
    println(value.mkString(","))
  }
}

zip()操作如图所示:

zip()函数结构:

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {

......

}

功能介绍:

  • 该操作可以将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的元素。
  • 将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

Key-Value类型

partitionBy()按照k重新分区

需求:

创建一个5个分区的RDD,对其重新分区。

代码实现:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具体业务逻辑
    partitionByKeyTest(sc)

    //4.关闭连接
    sc.stop()
  }
  /**
   * partitionByKey:根据key重新分区
   * */
  def partitionByKeyTest(sc: SparkContext): Unit ={
    val tuples: List[(Int, String)] = List((1, "a"), (2, "b"), (3, "c"), (4, "d"), (1, "aa"), (1, "bb"), (3, "cc"), (4, "dd")
      ,(1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd"))
    // 3.1 创建第一个RDD
    val rdd: RDD[(Int, String)] = sc.makeRDD (tuples,5)
    rdd.cache()

    println("-------分区前-------")
    val value1: String = rdd.mapPartitionsWithIndex((index: Int, data: Iterator[(Int, String)]) => {
      data.map((index,_))
    }).collect().mkString("\\t")

    println(value1)
    // 3.2 需求实现
    val value: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(3))

    // 3.3 打印
    println("-------分区后-------")
    val value2: String = value.mapPartitionsWithIndex((index: Int, data: Iterator[(Int, String)]) => {
      data.map((index,_))
    }).collect().mkString("\\t")
    println(value2)
  }
}

partitionBy()操作如图所示:

partitionBy()函数结构:

def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, V](self, partitioner)
    }
  }

功能介绍:

  • 将RDD[K,V]中的K按照指定Partitioner重新进行分区;会产生Shuffle过程。
  • 如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区,否则进行分区。
  • 默认的分区器是HashPartitioner。

备注:

        1、HashPartitioner的原理是:

  • 依据RDD中key值的hashCode的值将数据取模后得到该key值对应的下一个RDD的分区id值,
  • 支持key值为null的情况,当key为null的时候,返回0;
  • 该分区器基本上适合所有RDD数据类型的数据进行分区操作。

        2、详细的分区器原理,请参考:[Spark] - HashPartitioner & RangePartitioner 区别

注意:

        1、由于JAVA中数组的hashCode是基于数组对象本身的,不是基于数组内容的,所以如果RDD的key是数组类型,那么可能导致数据内容一致的数据key没法分配到同一个RDD分区中。(未测)

        2、在scala中,如果RDD的key是Array数组类型,编译不通过。Exception in thread "main" org.apache.spark.SparkException: HashPartitioner cannot partition array keys.

     这个时候最好自定义数据分区器,采用数组内容进行分区或者将数组的内容转换为集合。

自定义分区

class MyPartitioner(num: Int) extends Partitioner {
    // 设置的分区数
    override def numPartitions: Int = num

    // 具体分区逻辑
    override def getPartition(key: Any): Int = {

        if (key.isInstanceOf[Int]) {

            val keyInt: Int = key.asInstanceOf[Int]
            if (keyInt % 2 == 0)
                0
            else
                1
        }else{
            0
        }
    }
}

reduceByKey()按照k进行聚合v

需求:

统计单词出现次数(wordCount)。

代码实现:

为了增加难度,本次实验给字符串加了一些中英文的字符。

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

以上是关于Spark Core学习之常用算子(含经典面试题)的主要内容,如果未能解决你的问题,请参考以下文章

spark学习之并行度并发core数和分区的关系

Spark-core学习之二 Spark-core

Spark-core学习之八 SparkShuffle & Spark内存管理

Spark-core算子

Spark-core学习之三 Spark集群搭建

机器学习之Spark详解