spark算子
Posted 上官沐雪
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark算子相关的知识,希望对你有一定的参考价值。
spark算子介绍
一、sparkCore相关算子
1. source读取数据相关算子:
##从内存中创建的集合,默认按照分区的数据平均分配,最后多的数据落到最后一个分区
1. parallelize:从集合中获取rdd
val sparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName.stripSuffix("$"))
val sparkContext = new SparkContext(sparkConf)
val sourceRdd: RDD[Int] = sparkContext.parallelize(List(12, 21))
2. makeRdd:此方法是调用parallelize方法
sparkContext.makeRDD(List(121))
2. transform转换算子:
## coalesce:数据进行重新分区,默认不经过shuffle
## 缩减分区:coalesce,如果使用数据均衡,可以采用shuffle
## 扩大分区使用repartition,底层调用coalesce 一定会shuffle coalesce(numPartitions, shuffle = true)
1.coalesce 减少分区,不经过shuffle
val sourceRdd = sparkContext.parallelize(List(1, 2, 4, 5), 4) //指定4个分区
sourceRdd.coalesce(2) //减小到两个分区,不经过shuffle
2.扩大分区使用repartition,底层调用coalesce 一定会shuffle
//coalesce(numPartitions, shuffle = true)
sourceRdd.repartition(2)
3.cogroup:两个相同的key join到一起会shuffle 如:(a,(CompactBuffer(1, 2),CompactBuffer(1)))
val dataRDD1 = sparkContext.makeRDD(List(("a", 1), ("a", 2), ("c", 3)))
val dataRDD2 = sparkContext.makeRDD(List(("a", 1), ("c", 2), ("c", 3)))
dataRDD1.cogroup(dataRDD2)
4.combineByKey:
/**
* 第一个参数:将value转换成什么形式
* 第二个参数:将分区内的操作
* 第三个参数:分区之间的计算规则
*/
val sourceRdd = sparkContext.parallelize(List(("1", 1), ("1", 2), ("2", 1), ("2", 3)))
val combineRdd: RDD[(String, (Int, Int))] = sourceRdd.combineByKey(
v => (v, 1),
(t: (Int, Int), v) =>
(t._1 + v, t._2 + 1)
,
(t1: (Int, Int), t2: (Int, Int)) =>
(t1._1 + t2._1, t1._2 + t2._2)
)
5.spark distinct算子:分布式算子进行去重 使用reduceBykey 只要key 再map一下取第一个。scala distinct方法:去重是hashset去重
sourceRdd.distinct()
6. intersection:交集,
union:并集,
subtract:差集,
zip:拉链 ,不需要相同的数据类型,拉链的数据源的分区数量要相同,且分区里面的元素相同
val rdd1 = sparkContext.makeRDD(List(12, 1, 3, 2, 4))
val rdd2 = sparkContext.makeRDD(List(12, 6, 3, 2, 8))
rdd1.intersection(rdd2) // 交集 第一个rdd与第二个rdd的相同元素
rdd1.union(rdd2) // 并集 第一个rdd与第二个元素合并
rdd1.subtract(rdd2) // 差集 第一个rdd比第二个rdd多的元素
rdd1.zip(rdd2) // 拉链:合并相同元素的rdd 返回值为tuple 第一个值为集合一中的第一个元素,第二个值为第二个集合相应位置的元素. 注意:拉链的数据源的分区数量要相同,且分区里面的元素相同
7. flatMap:对数组,集合进行压平操作
8.glom把同分区的数据转换成数组,逆行flatMap
val sourceRdd: RDD[Int] = sparkContext.parallelize(List(4, 65, 76, 66, 76))
val transRdd: RDD[Array[Int]] = sourceRdd.glom()
9.groupBy:根据指定的规则进行分组,分区个数不变,数据被打乱重新分就是shuffle。也可能数据被分到同一个分区。
sourceRdd.groupBy(_ % 3)
10.join:相同key的value 会连接成元组,相当于内连接,如果两个源中有相同的key 会出现笛卡尔积
//如: (a,(1,Some(8)))
val rdd1 = sparkContext.parallelize(List(("a", 1), ("b", 1), ("c", 1), ("d", 1), ("e", 1)))
val rdd2 = sparkContext.parallelize(List(("a", 8), ("b", 6), ("c", 4), ("d", 2)))
val resultRdd: RDD[(String, Int)] = joinRdd.mapValues(
case (a, b) => a + b
)
11. groupBy:分组是自己指定的key,所以后面的元素为相应的原来RDD的元素
groupByKey:按照key进行分组,key已经确定,后面的值为原来元素的值
groupByKey:只有分组没有聚合的概念
reduceByKey:会先在分局内进行combine(预聚合操作),减少shuffle落盘的数量
(都会shuffle)
val sourceRdd = sparkContext.makeRDD(List(("a", 1), ("b", 1), ("a", 3), ("d", 1), ("c", 3)))
/**
* 只有分组没有聚合的概念
* 聚合后的结果如:(a,CompactBuffer(1, 1))
*/
sourceRdd.groupByKey().foreach(println(_))
/**
* 按照key进行聚合,分区内和分区之间的聚合方式一样
*/
sourceRdd.reduceByKey(_ + _)
/**
* 第一个参数列表:
* 第一个值:初始值 可以是数值,字符串,tuple等
* 第二个值:分区的个数
* 第二个参数列表:
* 第一个值:分区间计算规则
* 第二个值:分区内的计算规则
*/
sourceRdd.aggregateByKey(0)(
(a,b) => a+b
,(a,b) => a+b
).foreach(println(_))
/**
* 第一个参数:对value进行处理
* 第二个参数:分区内进行处理
* 第三个参数:分区间 的处理
*/
sourceRdd.combineByKey(v=>v,(a:Int,b)=>a+b,(a:Int,b:Int)=>a+b)
/**
* 使用aggregateByKey求平均值
*/
val aggreRdd: RDD[(String, (Int, Int))] = sourceRdd.aggregateByKey((0, 0))(
(t, v) =>
(t._1 + v, t._2 + 1)
,
(t1, t2) =>
(t1._1 + t2._1, t1._2 + t2._2)
)
12. mapPartitions:参数和返回值都是迭代器,分区之间的迭代器返回之后再合成rdd
mapPartitions 会把每个分区数据发给 executor 执行,一个算子作用在只作用分区一次
map:有多少条数据就会执行多少次,串行操作
val sourceRdd: RDD[Int] = sparkContext.parallelize(List(1, 5, 3, 6, 72, 9))
// 分区内最大
val resultRdd: RDD[Int] = sourceRdd.mapPartitions(iter =>
List(iter.max).iterator
)
13. mapPartitionsWithIndex:对每个分区数据进行处理,两个参数,第一个是分区编号,第二个是分区数据。
val sourceIndex: RDD[Int] = sparkContext.parallelize(List(64, 75, 4, 443, 65, 22), 2)
val resultRdd: RDD[Int] = sourceIndex.mapPartitionsWithIndex((index, iter) =>
index match
case a:Int => iter
case _ => List().iterator
)
14. partitionBy:按照一定的规则,对数据重分区 (会shuffle)
val sourceRdd: RDD[(Int, Int)] = sparkContext.makeRDD(List((1, 2), (2, 2), (3, 2), (4, 2)))
val hashPartition = sourceRdd.partitionBy(new HashPartitioner(2))
15. reduceByKey:相同的key放到同一个组里,对value聚合。当key只有一个的时候,是不会参加计算的。(会shuffle)
val sourceRdd = sparkContext.makeRDD(List(("a", 1), ("b", 1), ("a", 1), ("d", 1), ("c", 1)))
//reduceByKey 相同的key放到同一个组里,对value聚合
val resultRdd = sourceRdd.reduceByKey((x, y) =>
println(s"x= $x, y=$y")
x + y
)
16. repartition:底层调用coalesce 一定会shuffle
coalesce:减少分区,不经过shuffle=false。如果使用数据均衡,可以采用shuffle。
sourceRdd.repartition(2)
17. sample:抽样
/**
* 第一个参数:是否被放回
* 第二个参数:
* 数据被抽中的概率 基准值的概念
*
* 第三个参数:随机数种子
* 如果不传,那么会用当前系统时间
*/
sourceRdd.sample(true,0.4).foreach(println(_))
18:sortBy:存在shuffle过程,是整体进行排序
val rdd01 = sparkContext.parallelize(List(("21", 1), ("12", 1), ("13", 1), ("11", 1), ("18", 1)))
//第一个参数,排序规则
//第二个参数升序降序
//第三个参数 指定分区数量
rdd01.sortBy(ele => ele._1, false, 1).foreach(println(_))
3. action动作算子:
1. aggregate:分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合,
/**
* 初始值:不仅参与分局间计算,还会参与分区内计算
* 触发算子直接转换成结果,返回int类型值
*/
val sourceRdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 432, 34, 56))
val res: Int = sourceRdd.aggregate(0)((_ + _), (_ + _))
2. collect:将数据按照分区的顺序进行采集到driver内存,形成数组
val res: Array[Int] = sourceRdd.collect()
print(res.mkString(","))
3. count:返回 RDD 中元素的个数int值
/**
* 统计rdd中的个数
*/
val count: Long = sourceRdd.count()
4. countByKey:
/**
* 计算相应的值出现的次数
* Map(34 -> 1, 2 -> 2, 1 -> 2)
* 前面一个数是值,后面是出现次数
*/
val sourceRdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 2, 34, 1))
val res1: collection.Map[Int, Long] = sourceRdd.countByValue()
println(res1)
/**
* 相应的key出现的次数
*/
val sourceRdd1: RDD[(String, Int)] = sparkContext.parallelize(List(("a", 2), ("a", 1), ("b", 3), ("b", 5)))
val res02: collection.Map[String, Long] = sourceRdd1.countByKey()
sourceRdd1.cache()
5. first:返回 RDD 中的第一个元素
val i: Int = sourceRdd.first()
6. fold:折叠操作,aggregate 的简化版操作,分区间操作和分区内的操作相同
/**
* 直接返回相应的值
* 分区内和分区间计算规则相同
*/
val res: Int = sourceRdd.fold(0)(_ + _)
println(res)
7.reduce:先聚合分区内数据,再聚合分区间数据
/**
* 对数据进行聚合返回值类型就是int
*/
val res: Int = sourceRdd.reduce(_ + _)
println(res)
8.saveAsTextFile:text格式
sourceRdd.saveAsTextFile("./output01")
sourceRdd.saveAsObjectFile("./output01")
9.take:返回一个由 RDD 的前 n 个元素组成的数组
/**
* 返回一个由 RDD 的前 n 个元素组成的数组
*/
val takeRdd: Array[Int] = sourceRdd.take(3)
println(takeRdd.mkString(","))
4. broadcast广播算子:向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用
/**
* 广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个
* 或多个 Spark 操作使用
*/
val rdd1 = sparkContext.parallelize(List(("a", "1"), ("b", "1"), ("c", "1")))
val rdd2 = sparkContext.parallelize(List(("a", "2"), ("b", "2"), ("c", "2")))
val map = Map(("a", "2"), ("b", "2"), ("c", "2"))
val mapBroadcast = sparkContext.broadcast(map)
rdd1.join(rdd2).foreach(println(_))
val res = rdd1.map(ele =>
val value2 = mapBroadcast.value.getOrElse(ele._1, "")
(ele._1, (ele._2, value2))
)
5. Accumulator:分布式累写变量,excutor都安数据不共享, Driver 程序中定义的变量,Executor 端的每个 Task 都会得到这个变量的一份新的副本。每个 task 更新这些副本的值后传回 Driver 端进行 merge
val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName.stripSuffix("$"))
val sparkContext = new SparkContext(conf)
val sourceRdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3))
//获取系统的累加器
//spark 默认提供类简单聚合类的累加器 longAccumulator doubleAccumulator collectionAccumulator
val sumAcc: LongAccumulator = sparkContext.longAccumulator("sum")
sourceRdd.foreach(element =>
sumAcc.add(element)
)
/**
* 出现多加:多次调用行动算子
* 出现少加:没有触发行动算子
* 一般累加器放在行动算子里
*/
//获取累加器的值
println(sumAcc.value)
6. Partitioner:自定义分区器
def main(args: Array[String]): Unit =
val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName.stripSuffix("$"))
val sparkContext = new SparkContext(conf)
val sourceRdd = sparkContext.makeRDD(Spark算子篇 --Spark算子之combineByKey详解