Spark自定义排序与分区
Posted lsbigdata
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark自定义排序与分区相关的知识,希望对你有一定的参考价值。
Spark自定义排序与分区
前言:
随着信息时代的不断发展,数据成了时代主题,今天的我们徜徉在数据的海洋中;由于数据的爆炸式增长,各种数据计算引擎如雨后春笋般冲击着这个时代。作为时下最主流的计算引擎之一 Spark也是从各方面向时代展示自己的强大能力。Spark无论是在数据处理还是数据分析、挖掘方面都展现出了强大的主导能力。其分布式计算能力受到越来越多的青睐。本文将介绍spark的排序以及分区。
一、Spark自定义排序
在spark中定义了封装了很多高级的api,在我们的日常开发中使用这些api能获得不少的便利。但是有的时候这些默认的规则并不足以实现我们的目的,这时候需要我们了解其底层原理,编写一套适合我们需求的处理逻辑。下面通过代码简单介绍一下spark如何自定义排序。
import org.apache.spark.rdd.RDD import org.apache.spark.SparkConf, SparkContext object CustomSort1 def main(args: Array[String]): Unit = val conf = new SparkConf().setAppName("CustomSort1").setMaster("local[*]") val sc = new SparkContext(conf) //排序规则:首先按照颜值的降序,如果颜值相等,再按照年龄的升序 val users= Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99") //将Driver端的数据并行化变成RDD val lines: RDD[String] = sc.parallelize(users) //切分整理数据 val userRDD: RDD[User] = lines.map(line => val fields = line.split(" ") val name = fields(0) val age = fields(1).toInt val fv = fields(2).toInt //(name, age, fv) new User(name, age, fv) ) //不满足要求 //tpRDD.sortBy(tp => tp._3, false) //将RDD里面装的User类型的数据进行排序 val sorted: RDD[User] = userRDD.sortBy(u => u) val r = sorted.collect() println(r.toBuffer) sc.stop() class User(val name: String, val age: Int, val fv: Int) extends Ordered[User] with Serializable override def compare(that: User): Int = if(this.fv == that.fv) this.age - that.age else -(this.fv - that.fv) override def toString: String = s"name: $name, age: $age, fv: $fv"
对于自定义排序有多种方式实现:
1、User类继承Ordered使User类变成可排序的类。在spark中由于我们虽然测试是在本地测试,但是他会模拟集群模式,所以我们自定义的object在运行时会shuffle有网络传输会涉及序列化的问题。所以需要同时继承Serializable。
2、使用case class样例类:
case class Man(age: Int, fv: Int) extends Ordered[Man]
不需要继承序列化类,case class默认已经实现序列化。
3、定义样例类隐式排序规则
object SortRules implicit object OrderingUser extends Ordering[User] override def compare(x: User, y: User): Int = if(x.fv == y.fv) x.age - y.age else y.fv - x.fv
主程序代码:
//切分整理数据 val tpRDD: RDD[(String, Int, Int)] = lines.map(line => val fields = line.split(" ") val name = fields(0) val age = fields(1).toInt val fv = fields(2).toInt (name, age, fv) ) //排序(传入了一个排序规则,不会改变数据的格式,只会改变顺序) import SortRules.OrderingUser val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => User(tp._2, tp._3))
4、某些特殊数据类型不需要自定义,使用原生api更方便。
//充分利用元组的比较规则,元组的比较规则:先比第一,相等再比第二个 val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => (-tp._3, tp._2))
5、将排序规则添加到隐士转换中
//Ordering[(Int, Int)]最终比较的规则格式 //on[(String, Int, Int)]未比较之前的数据格式 //(t =>(-t._3, t._2))怎样将规则转换成想要比较的格式 implicit val rules = Ordering[(Int, Int)].on[(String, Int, Int)](t =>(-t._3, t._2)) val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => tp)
二、Spark自定义分区器
1、combineByKey
在reduceByKey、groupByKey等算子都基于combineByKey算子实现。这是一个底层的算子,可以自定义一些规则,比较灵活。
Rdd.combineByKey(x=>x,(m:Int,n:Int)=>m+n,(a:Int,B:Int)=>a+b,new HashPartition(2),true,null)
参数解释:
(1)、相同key的value放入一个分区
(2)、局部聚合
(3)、全局聚合
(4)、分区数(可以设置分区数)
(5)、是否进行map端局部聚合
(6)、序列化参数
conbineByKey是一个较为底层的api,一般情况下可能不会用到它,但是当一些高级api满足不了我们的需求的时候它给我们提供了解决便利。
2、自定义分区器
在spark计算中不可避免的会涉及到shuffle,数据会根据不同的规则有分区器分发到不同的分区中。所以分区器决定了上游的数据发送到哪个下游。以不同专业学生数据计算不同专业的学生成绩。分组取topN :
(1)、自定义分区器
//自定义分区器:majors:专业集合 class MajorParitioner(majors: Array[String]) extends Partitioner //相当于主构造器(new的时候回执行一次) //用于存放规则的一个map val rules = new mutable.HashMap[String, Int]() var i = 0 for(major<- majors) //rules(major) = i rules.put(major, i) i += 1 //返回分区的数量(下一个RDD有多少分区) override def numPartitions: Int = majors.length //根据传入的key计算分区标号 //key是一个元组(String, String) override def getPartition(key: Any): Int = //获取key val major= key.asInstanceOf[(String, String)]._1 //根据规则计算分区编号 rules(major)
(2)、使用自定义分区器
//调用自定义的分区器,并且按照指定的分区器进行分区 val majorPatitioner = new MajorParitioner(subjects); //partitionBy按照指定的分区规则进行分区 //调用partitionBy时RDD的Key是(String, String) val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(majorPatitioner ) //如果一次拿出一个分区(可以操作一个分区中的数据了) val sorted: RDD[((String, String), Int)] = partitioned.mapPartitions(it => //将迭代器转换成list,然后排序,在转换成迭代器返回 it.toList.sortBy(_._2).reverse.take(topN).iterator ) // val r: Array[((String, String), Int)] = sorted.collect()
通过这样自定义分区器后,数据通过shuffle之后每个分区的数据就是一个专业的学生数据,对这个分区的数据排序后取出前N个就是所需结果了。但是这个程序中还是会出现一个问题,当数据量太大的时候可能会导致内存溢出的情况,因为我们是将数据放到了list中进行排序,而list是存放于内存中。所以会导致内存溢出。那么怎么才能避免这个情况呢。我们可以在mapPartitions内部定义一个集合,不加载所有数据。,每次将这个集合排序后最小的值移除,通过多次循环后最终集合中剩下的就是需要的结果。
三、总结
无论是排序还是分区,在spark中都封装了高级的api共我们使用,但是他不会适用于所有情况,只会适用与部分情况,而通过对这些api的底层实现了解,通过自定义规则可以编辑一套适合于我们需求的程序。这样一来可以大大提高效率。没有什么能适配万物,随机应变才是取胜之道。
以上是关于Spark自定义排序与分区的主要内容,如果未能解决你的问题,请参考以下文章