Spark- 自定义排序
Posted rzcong
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark- 自定义排序相关的知识,希望对你有一定的参考价值。
考察spark自定义排序
方式一:自定义一个类继承Ordered和序列化,Driver端将数据变成RDD,整理数据转成自定义类类型的RDD,使用本身排序即可。
package com.rz.spark.base import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} // 自定义排序 object CustomSort1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") val sc = new SparkContext(conf) // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序 val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 78 100", "xiaolong 66 66") // 将Driver端的数据并行化变成RDD val lines:RDD[String] = sc.parallelize(users) // 切分整理数据 val userRDD: RDD[User] = lines.map(line => { val fields = line.split(" ") val name = fields(0) val age = fields(1).toInt val fv = fields(2).toInt //(name, age, fv) new User(name, age, fv) }) // 不满足要求 // tpRDD.sortBy(tp=> tp._3, false) // 将RDD里面封装在User类型的数据进行排序 val sorted: RDD[User] = userRDD.sortBy(u=>u) val result = sorted.collect() println(result.toBuffer) sc.stop() } } // shuffle时数据要通过网络传输,需要对数据进行序列化 class User(val name:String, val age:Int, val fv:Int) extends Ordered[User] with Serializable { override def compare(that: User): Int = { if (this.fv == that.fv){ this.age - that.age }else{ -(this.fv - that.fv) } } override def toString: String = s"name: $name, age: $age, fv: $fv" }
方式2:自定义一个类继承Ordered和序列化,Driver端将数据变成RDD,整理数据转成元组类型的RDD,使用就自定义类做排序规则。
package com.rz.spark.base import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object CustomSort2 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") val sc = new SparkContext(conf) // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序 val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66") // 将Driver端的数据并行化变成RDD val lines:RDD[String] = sc.parallelize(users) // 切分整理数据 val userRDD: RDD[(String, Int, Int)] = lines.map(line => { val fields = line.split(" ") val name = fields(0) val age = fields(1).toInt val fv = fields(2).toInt (name, age, fv) }) // 排序(传入了一个排序规则, 不会改变数据的格式,只会以改变顺序) class Boy不是多例 val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> new Boy(tp._2,tp._3)) val result = sorted.collect() println(result.toBuffer) sc.stop() } } // shuffle时数据要通过网络传输,需要对数据进行序列化 class Boy(val age:Int, val fv:Int) extends Ordered[Boy] with Serializable { override def compare(that: Boy): Int = { if (this.fv == that.fv){ this.age - that.age }else{ -(this.fv - that.fv) } } }
方式3:作用多例的case class来做排序规则
package com.rz.spark.base import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object CustomSort3 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") val sc = new SparkContext(conf) // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序 val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66") // 将Driver端的数据并行化变成RDD val lines:RDD[String] = sc.parallelize(users) // 切分整理数据 val userRDD: RDD[(String, Int, Int)] = lines.map(line => { val fields = line.split(" ") val name = fields(0) val age = fields(1).toInt val fv = fields(2).toInt (name, age, fv) }) // 排序(传入了一个排序规则, 不会改变数据的格式,只会以改变顺序) val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> Man(tp._2,tp._3)) val result = sorted.collect() println(result.toBuffer) sc.stop() } } // shuffle时数据要通过网络传输,需要对数据进行序列化 // case class 本身已经实现序列化且多例 (缺点是规则写死,无法用新的规则排序,可用隐式转换实现) case class Man(age:Int, fv:Int) extends Ordered[Man]{ override def compare(that: Man): Int = { if (this.fv == that.fv){ this.age - that.age }else{ -(this.fv - that.fv) } } }
方式4,通过隐式参数指定灵活的排序规则
package com.rz.spark.base import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object CustomSort4 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") val sc = new SparkContext(conf) // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序 val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66") // 将Driver端的数据并行化变成RDD val lines:RDD[String] = sc.parallelize(users) // 切分整理数据 val userRDD: RDD[(String, Int, Int)] = lines.map(line => { val fields = line.split(" ") val name = fields(0) val age = fields(1).toInt val fv = fields(2).toInt (name, age, fv) }) // 排序(传入了一个排序规则, 不会改变数据的格式,只会以改变顺序) // 传入一个Ordering类型的隐式参数 import SortRules.OrderingHero val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> Hero(tp._2,tp._3)) val result = sorted.collect() println(result.toBuffer) sc.stop() } } // shuffle时数据要通过网络传输,需要对数据进行序列化 // case class 本身已经实现序列化,不指定固定的排序规则,由隐式参数指定 case class Hero(age:Int, fv:Int)
方式5:元组有自己的compareTo方法,充分利用元组的比较规则,元组的比较规则:先比第一,相等再比第二个。如果还满足不了再自定义排序的类来排序
package com.rz.spark.base import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object CustomSort5 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") val sc = new SparkContext(conf) // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序 val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66") // 将Driver端的数据并行化变成RDD val lines:RDD[String] = sc.parallelize(users) // 切分整理数据 val userRDD: RDD[(String, Int, Int)] = lines.map(line => { val fields = line.split(" ") val name = fields(0) val age = fields(1).toInt val fv = fields(2).toInt (name, age, fv) }) // 排序(传入了一个排序规则, 不会改变数据的格式,只会以改变顺序) // 充分利用元组的比较规则,元组的比较规则:先比第一,相等再比第二个 val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> (-tp._3,tp._2)) val result = sorted.collect() println(result.toBuffer) sc.stop() } }
方式6:和方式5相似,但是用到自定义的隐式参数作排序规则
package com.rz.spark.base import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object CustomSort6 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") val sc = new SparkContext(conf) // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序 val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66") // 将Driver端的数据并行化变成RDD val lines:RDD[String] = sc.parallelize(users) // 切分整理数据 val userRDD: RDD[(String, Int, Int)] = lines.map(line => { val fields = line.split(" ") val name = fields(0) val age = fields(1).toInt val fv = fields(2).toInt (name, age, fv) }) // 排序(传入了一个排序规则, 不会改变数据的格式,只会以改变顺序) // 充分利用元组的比较规则,元组的比较规则:先比第一,相等再比第二个 val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> (-tp._3,tp._2)) val result = sorted.collect() println(result.toBuffer) sc.stop() } }
以上是关于Spark- 自定义排序的主要内容,如果未能解决你的问题,请参考以下文章