spark算子

Posted 上官沐雪

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark算子相关的知识,希望对你有一定的参考价值。

spark算子介绍

一、sparkCore相关算子

1. source读取数据相关算子:

##从内存中创建的集合,默认按照分区的数据平均分配,最后多的数据落到最后一个分区
1. parallelize:从集合中获取rdd
val sparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName.stripSuffix("$"))
    val sparkContext = new SparkContext(sparkConf)
    val sourceRdd: RDD[Int] = sparkContext.parallelize(List(12, 21))


2. makeRdd:此方法是调用parallelize方法
    sparkContext.makeRDD(List(121))

2. transform转换算子:

## coalesce:数据进行重新分区,默认不经过shuffle
## 缩减分区:coalesce,如果使用数据均衡,可以采用shuffle
## 扩大分区使用repartition,底层调用coalesce 一定会shuffle  coalesce(numPartitions, shuffle = true)

1.coalesce 减少分区,不经过shuffle
	val sourceRdd = sparkContext.parallelize(List(1, 2, 4, 5), 4) //指定4个分区
	sourceRdd.coalesce(2)  //减小到两个分区,不经过shuffle

2.扩大分区使用repartition,底层调用coalesce 一定会shuffle 
	//coalesce(numPartitions, shuffle = true)
	sourceRdd.repartition(2)

3.cogroup:两个相同的key join到一起会shuffle 如:(a,(CompactBuffer(1, 2),CompactBuffer(1)))
    val dataRDD1 = sparkContext.makeRDD(List(("a", 1), ("a", 2), ("c", 3)))
    val dataRDD2 = sparkContext.makeRDD(List(("a", 1), ("c", 2), ("c", 3)))
	dataRDD1.cogroup(dataRDD2)

4.combineByKey:
    /**
     * 第一个参数:将value转换成什么形式
     * 第二个参数:将分区内的操作
     * 第三个参数:分区之间的计算规则
     */
  	val sourceRdd = sparkContext.parallelize(List(("1", 1), ("1", 2), ("2", 1), ("2", 3)))
	val combineRdd: RDD[(String, (Int, Int))] = sourceRdd.combineByKey(
      v => (v, 1),
      (t: (Int, Int), v) => 
        (t._1 + v, t._2 + 1)
      ,
      (t1: (Int, Int), t2: (Int, Int)) => 
        (t1._1 + t2._1, t1._2 + t2._2)
      
    )

5.spark distinct算子:分布式算子进行去重 使用reduceBykey 只要key 再map一下取第一个。scala distinct方法:去重是hashset去重
	sourceRdd.distinct()

6.	intersection:交集,
	union:并集,
	subtract:差集,
	zip:拉链 ,不需要相同的数据类型,拉链的数据源的分区数量要相同,且分区里面的元素相同

    val rdd1 = sparkContext.makeRDD(List(12, 1, 3, 2, 4))
    val rdd2 = sparkContext.makeRDD(List(12, 6, 3, 2, 8))
	rdd1.intersection(rdd2)	// 交集 第一个rdd与第二个rdd的相同元素
	rdd1.union(rdd2)		// 并集  第一个rdd与第二个元素合并
	rdd1.subtract(rdd2)		// 差集  第一个rdd比第二个rdd多的元素
	rdd1.zip(rdd2)			// 拉链:合并相同元素的rdd 返回值为tuple 第一个值为集合一中的第一个元素,第二个值为第二个集合相应位置的元素. 注意:拉链的数据源的分区数量要相同,且分区里面的元素相同


7. flatMap:对数组,集合进行压平操作


8.glom把同分区的数据转换成数组,逆行flatMap
    val sourceRdd: RDD[Int] = sparkContext.parallelize(List(4, 65, 76, 66, 76))
    val transRdd: RDD[Array[Int]] = sourceRdd.glom()


9.groupBy:根据指定的规则进行分组,分区个数不变,数据被打乱重新分就是shuffle。也可能数据被分到同一个分区。
	sourceRdd.groupBy(_ % 3)

10.join:相同key的value 会连接成元组,相当于内连接,如果两个源中有相同的key 会出现笛卡尔积
	//如: (a,(1,Some(8)))
    val rdd1 = sparkContext.parallelize(List(("a", 1), ("b", 1), ("c", 1), ("d", 1), ("e", 1)))
    val rdd2 = sparkContext.parallelize(List(("a", 8), ("b", 6), ("c", 4), ("d", 2)))
	val resultRdd: RDD[(String, Int)] = joinRdd.mapValues(
      case (a, b) => a + b
    )

11.	groupBy:分组是自己指定的key,所以后面的元素为相应的原来RDD的元素
	groupByKey:按照key进行分组,key已经确定,后面的值为原来元素的值
	
	groupByKey:只有分组没有聚合的概念
	reduceByKey:会先在分局内进行combine(预聚合操作),减少shuffle落盘的数量
	(都会shuffle)

    val sourceRdd = sparkContext.makeRDD(List(("a", 1), ("b", 1), ("a", 3), ("d", 1), ("c", 3)))
    /**
     * 只有分组没有聚合的概念
     * 聚合后的结果如:(a,CompactBuffer(1, 1))
     */
	sourceRdd.groupByKey().foreach(println(_))

    /**
     * 按照key进行聚合,分区内和分区之间的聚合方式一样
     */
    sourceRdd.reduceByKey(_ + _)

    /**
     * 第一个参数列表:
     * 第一个值:初始值 可以是数值,字符串,tuple等
     * 第二个值:分区的个数
     * 第二个参数列表:
     * 第一个值:分区间计算规则
     * 第二个值:分区内的计算规则
     */
	sourceRdd.aggregateByKey(0)(
          (a,b) => a+b
          ,(a,b) => a+b
        ).foreach(println(_))

    /**
     * 第一个参数:对value进行处理
     * 第二个参数:分区内进行处理
     * 第三个参数:分区间 的处理
     */
    sourceRdd.combineByKey(v=>v,(a:Int,b)=>a+b,(a:Int,b:Int)=>a+b)

    /**
     * 使用aggregateByKey求平均值
     */
    val aggreRdd: RDD[(String, (Int, Int))] = sourceRdd.aggregateByKey((0, 0))(
      (t, v) => 
        (t._1 + v, t._2 + 1)
      ,
      (t1, t2) => 
        (t1._1 + t2._1, t1._2 + t2._2)
      
    )


12.	mapPartitions:参数和返回值都是迭代器,分区之间的迭代器返回之后再合成rdd
	mapPartitions 会把每个分区数据发给 executor 执行,一个算子作用在只作用分区一次
	map:有多少条数据就会执行多少次,串行操作
	
    val sourceRdd: RDD[Int] = sparkContext.parallelize(List(1, 5, 3, 6, 72, 9))
	//    分区内最大
    val resultRdd: RDD[Int] = sourceRdd.mapPartitions(iter => 
      List(iter.max).iterator
    )

13. mapPartitionsWithIndex:对每个分区数据进行处理,两个参数,第一个是分区编号,第二个是分区数据。

	val sourceIndex: RDD[Int] = sparkContext.parallelize(List(64, 75, 4, 443, 65, 22), 2)
    val resultRdd: RDD[Int] = sourceIndex.mapPartitionsWithIndex((index, iter) => 
      index match 
        case a:Int => iter
        case _ => List().iterator
      
    )

14. partitionBy:按照一定的规则,对数据重分区 (会shuffle)

	val sourceRdd: RDD[(Int, Int)] = sparkContext.makeRDD(List((1, 2), (2, 2), (3, 2), (4, 2)))
    val hashPartition = sourceRdd.partitionBy(new HashPartitioner(2))


15. reduceByKey:相同的key放到同一个组里,对value聚合。当key只有一个的时候,是不会参加计算的。(会shuffle)

    val sourceRdd = sparkContext.makeRDD(List(("a", 1), ("b", 1), ("a", 1), ("d", 1), ("c", 1)))
    //reduceByKey 相同的key放到同一个组里,对value聚合
    val resultRdd = sourceRdd.reduceByKey((x, y) => 
      println(s"x= $x, y=$y")
      x + y
    )

16. repartition:底层调用coalesce 一定会shuffle
	coalesce:减少分区,不经过shuffle=false。如果使用数据均衡,可以采用shuffle。
	
	sourceRdd.repartition(2)

17. sample:抽样

    /**
     * 第一个参数:是否被放回
     * 第二个参数:
     * 数据被抽中的概率 基准值的概念
     *
     * 第三个参数:随机数种子
     *          如果不传,那么会用当前系统时间
     */
    sourceRdd.sample(true,0.4).foreach(println(_))

18:sortBy:存在shuffle过程,是整体进行排序

    val rdd01 = sparkContext.parallelize(List(("21", 1), ("12", 1), ("13", 1), ("11", 1), ("18", 1)))
    //第一个参数,排序规则
    //第二个参数升序降序
    //第三个参数 指定分区数量
    rdd01.sortBy(ele => ele._1, false, 1).foreach(println(_))

3. action动作算子:


1. aggregate:分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合,
    /**
     * 初始值:不仅参与分局间计算,还会参与分区内计算
     * 触发算子直接转换成结果,返回int类型值
     */
  	val sourceRdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 432, 34, 56))
    val res: Int = sourceRdd.aggregate(0)((_ + _), (_ + _))


2. collect:将数据按照分区的顺序进行采集到driver内存,形成数组
 
    val res: Array[Int] = sourceRdd.collect()
    print(res.mkString(","))

3. count:返回 RDD 中元素的个数int值
   /**
     * 统计rdd中的个数
     */
    val count: Long = sourceRdd.count()

4. countByKey:
    /**
     * 计算相应的值出现的次数
     * Map(34 -> 1, 2 -> 2, 1 -> 2)
     * 前面一个数是值,后面是出现次数
     */
    val sourceRdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 2, 34, 1))
    val res1: collection.Map[Int, Long] = sourceRdd.countByValue()
    println(res1)
    /**
     * 相应的key出现的次数
     */
    val sourceRdd1: RDD[(String, Int)] = sparkContext.parallelize(List(("a", 2), ("a", 1), ("b", 3), ("b", 5)))
    val res02: collection.Map[String, Long] = sourceRdd1.countByKey()
    sourceRdd1.cache()

5. first:返回 RDD 中的第一个元素
    val i: Int = sourceRdd.first()

6. fold:折叠操作,aggregate 的简化版操作,分区间操作和分区内的操作相同

    /**
     * 直接返回相应的值
     * 分区内和分区间计算规则相同
     */
    val res: Int = sourceRdd.fold(0)(_ + _)
    println(res)


7.reduce:先聚合分区内数据,再聚合分区间数据
    /**
     * 对数据进行聚合返回值类型就是int
     */
    val res: Int = sourceRdd.reduce(_ + _)
    println(res)

8.saveAsTextFile:text格式
    sourceRdd.saveAsTextFile("./output01")
    sourceRdd.saveAsObjectFile("./output01")


9.take:返回一个由 RDD 的前 n 个元素组成的数组
  	/**
     * 返回一个由 RDD 的前 n 个元素组成的数组
     */
    val takeRdd: Array[Int] = sourceRdd.take(3)
    println(takeRdd.mkString(","))

4. broadcast广播算子:向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用


/**
 *   广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个
 *    或多个 Spark 操作使用
 */
  	val rdd1 = sparkContext.parallelize(List(("a", "1"), ("b", "1"), ("c", "1")))
    val rdd2 = sparkContext.parallelize(List(("a", "2"), ("b", "2"), ("c", "2")))
    val map = Map(("a", "2"), ("b", "2"), ("c", "2"))
    
    val mapBroadcast = sparkContext.broadcast(map)
    rdd1.join(rdd2).foreach(println(_))

    val res = rdd1.map(ele => 
      val value2 = mapBroadcast.value.getOrElse(ele._1, "")
      (ele._1, (ele._2, value2))
    )

5. Accumulator:分布式累写变量,excutor都安数据不共享, Driver 程序中定义的变量,Executor 端的每个 Task 都会得到这个变量的一份新的副本。每个 task 更新这些副本的值后传回 Driver 端进行 merge

 	val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName.stripSuffix("$"))
    val sparkContext = new SparkContext(conf)
    val sourceRdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3))
    //获取系统的累加器
    //spark 默认提供类简单聚合类的累加器 longAccumulator doubleAccumulator collectionAccumulator
    val sumAcc: LongAccumulator = sparkContext.longAccumulator("sum")
    sourceRdd.foreach(element => 
      sumAcc.add(element)
    
    )
    /**
     * 出现多加:多次调用行动算子
     * 出现少加:没有触发行动算子
     * 一般累加器放在行动算子里
     */
    //获取累加器的值
    println(sumAcc.value)

6. Partitioner:自定义分区器


  def main(args: Array[String]): Unit = 

    val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName.stripSuffix("$"))
    val sparkContext = new SparkContext(conf)
    val sourceRdd = sparkContext.makeRDD(Spark算子篇 --Spark算子之combineByKey详解

Spark算子之aggregateByKey详解

Spark-core:Spark RDD的高级算子

spark总结4 算子问题总结

Spark性能调优-RDD算子调优篇

Spark性能调优-RDD算子调优篇