Spark: Secondary Sort with sortByKey
Posted by 大冰的小屋
Spark's sortByKey API accepts a custom ordering, which makes it possible to implement secondary sorts, tertiary sorts, and so on. Let's start with the sortByKey source:
def sortByKey(): JavaPairRDD[K, V] = sortByKey(true)

def sortByKey(ascending: Boolean): JavaPairRDD[K, V] = {
  val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
  sortByKey(comp, ascending)
}

def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = {
  implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
  fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending))
}

class OrderedRDDFunctions[K : Ordering : ClassTag,
                          V: ClassTag,
                          P <: Product2[K, V] : ClassTag] @DeveloperApi() (
    self: RDD[P])
  extends Logging with Serializable
From this code we can see that to implement a custom secondary sort, the key class must extend Scala's Ordered trait (scala.math.Ordered) and implement Java's Serializable interface, since keys are shipped across the cluster during the shuffle.
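Before building the full key class, the two-level comparison it will implement can be sketched with plain JDK collections, no Spark required. The class name and the sample pairs below are made up for illustration; the keys are int[2] arrays standing in for a (first, second) composite key.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// A minimal, Spark-free sketch of what sortByKey does conceptually:
// sort (key, value) pairs by a comparator over the key.
public class SortByKeyIdea {
    static List<String> sortedValues(List<Map.Entry<int[], String>> pairs) {
        // Order by the first component, break ties with the second --
        // secondary sort in miniature.
        pairs.sort(Comparator
                .comparingInt((Map.Entry<int[], String> e) -> e.getKey()[0])
                .thenComparingInt(e -> e.getKey()[1]));
        List<String> values = new ArrayList<>();
        for (Map.Entry<int[], String> e : pairs) values.add(e.getValue());
        return values;
    }

    public static void main(String[] args) {
        List<Map.Entry<int[], String>> pairs = new ArrayList<>();
        pairs.add(new AbstractMap.SimpleEntry<>(new int[]{3, 1}, "c"));
        pairs.add(new AbstractMap.SimpleEntry<>(new int[]{1, 2}, "a"));
        pairs.add(new AbstractMap.SimpleEntry<>(new int[]{1, 1}, "b"));
        System.out.println(sortedValues(pairs)); // keys sort as (1,1), (1,2), (3,1)
    }
}
```

The Spark version below does the same thing, except the comparison logic lives in the key class itself rather than in an external Comparator.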
Java implementation:

First, the custom key class:
import scala.math.Ordered;

import java.io.Serializable;

/**
 * Custom key for secondary sorting.
 * Created by Administrator on 2016/8/14 0014.
 */
public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {
    // The fields to sort by
    private int first;
    private int second;

    // Public constructor for the secondary-sort key
    public SecondarySortKey(int first, int second) {
        this.first = first;
        this.second = second;
    }

    public int getFirst() {
        return first;
    }

    public int getSecond() {
        return second;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SecondarySortKey that = (SecondarySortKey) o;
        if (first != that.first) return false;
        return second == that.second;
    }

    @Override
    public int hashCode() {
        int result = first;
        result = 31 * result + second;
        return result;
    }

    @Override
    public int compare(SecondarySortKey other) {
        if (this.$greater(other))
            return 1;
        else if (this.$less(other))
            return -1;
        return 0;
    }

    @Override
    public boolean $less(SecondarySortKey other) {
        if (this.first < other.first)
            return true;
        else if (this.first == other.first && this.second < other.second)
            return true;
        return false;
    }

    @Override
    public boolean $greater(SecondarySortKey other) {
        if (this.first > other.first)
            return true;
        else if (this.first == other.first && this.second > other.second)
            return true;
        return false;
    }

    @Override
    public boolean $less$eq(SecondarySortKey other) {
        if (this.$less(other))
            return true;
        else if (this.first == other.first && this.second == other.second)
            return true;
        return false;
    }

    @Override
    public boolean $greater$eq(SecondarySortKey other) {
        if (this.$greater(other))
            return true;
        else if (this.first == other.first && this.second == other.second)
            return true;
        return false;
    }

    @Override
    public int compareTo(SecondarySortKey other) {
        if (this.$greater(other))
            return 1;
        else if (this.$less(other))
            return -1;
        return 0;
    }
}
The secondary sort driver:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

/**
 * Created by Administrator on 2016/8/14 0014.
 */
public class SecondarySortApp {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("/home/resources/helloSpark.txt");
        // Map each line to (SecondarySortKey, line) so sortByKey can use the custom ordering
        JavaPairRDD<SecondarySortKey, String> pairs = lines.mapToPair(new PairFunction<String, SecondarySortKey, String>() {
            @Override
            public Tuple2<SecondarySortKey, String> call(String line) throws Exception {
                String[] splited = line.split(" ");
                SecondarySortKey key = new SecondarySortKey(Integer.valueOf(splited[0]), Integer.valueOf(splited[1]));
                return new Tuple2<SecondarySortKey, String>(key, line);
            }
        });

        JavaPairRDD<SecondarySortKey, String> sorted = pairs.sortByKey(); // performs the secondary sort

        // Drop the key and keep only the original line
        JavaRDD<String> result = sorted.map(new Function<Tuple2<SecondarySortKey, String>, String>() {
            @Override
            public String call(Tuple2<SecondarySortKey, String> v1) throws Exception {
                return v1._2;
            }
        });

        for (String s : result.collect()) {
            System.out.println(s);
        }

        sc.stop();
    }
}
Scala implementation
Key:
package com.spark.App

/**
 * Created by Administrator on 2016/8/14 0014.
 */
class SecondarySortKey(val first: Int, val second: Int)
  extends Ordered[SecondarySortKey] with Serializable {

  override def compare(other: SecondarySortKey): Int = {
    if (this.first > other.first || (this.first == other.first && this.second > other.second))
      1
    else if (this.first < other.first || (this.first == other.first && this.second < other.second))
      -1
    else
      0
  }
}
The secondary sort driver:
package com.spark.App

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Administrator on 2016/8/14 0014.
 */
object SecondarySortApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
    val sc = new SparkContext(conf)

    val lines = sc.textFile("/home/resources/helloSpark.txt")
    val pairRDD = lines.map { line =>
      val splited = line.split(" ")
      val key = new SecondarySortKey(splited(0).toInt, splited(1).toInt)
      (key, line)
    }

    // Pass ascending = false to sort in descending key order
    val sorted = pairRDD.sortByKey(false)
    val result = sorted.map(item => item._2)
    result.collect().foreach(println)

    sc.stop()
  }
}
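Note that this Scala driver passes ascending = false, so rows come out in descending key order (unlike the Java driver, which sorts ascending). Conceptually, sorting descending is just the ascending ordering reversed; a plain-JDK sketch with made-up data:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Sketch: sortByKey(false) corresponds to reversing the key ordering,
// shown here with Comparator.reversed() on plain integers.
public class DescendingIdea {
    static List<Integer> sortDescending(List<Integer> keys) {
        List<Integer> copy = new ArrayList<>(keys);
        copy.sort(Comparator.<Integer>naturalOrder().reversed());
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(sortDescending(Arrays.asList(2, 9, 4))); // [9, 4, 2]
    }
}
```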