Spark 中的二次排序Java实现
Posted ywendeng
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 中的二次排序Java实现相关的知识,希望对你有一定的参考价值。
需求描述:对文本中数据记录进行排序,如果第一行数据记录相同,则根据第二行数据进行排序
编程思想描述:首先,将需要将待排序的字段封装成一个类,该实现了Ordered和Serializable 接口,实现接口中的方法。同时为待排序的属性字段提供getter 、setter 、hashcode 以及 equals 方法。在application 应用程序中 将key 封装为之前我们定义好的对象,之后调用sortedByKey() 方法进行排序。 下面是一个完整的实现示例。
1、需要处理的原文本数据sort.txt 示例:
1 5
4 3
5 7
1 3
4 2
3 8
9 1
2 8
2、处理后结果示例:
1 3
1 5
2 8
3 8
4 2
4 3
5 7
9 1
3、将待排序字段封装为自定义类
package ywendeng.Spark_study_java.core;
import java.io.Serializable;
import scala.math.Ordered;
public class SortedKey implements Ordered<SortedKey>,Serializable
private static final long serialVersionUID = 1L;
//自定义需要排序的属性,并设置属性对应的getter、setter以及hashcode方法
int first;
int second;
public SortedKey(int first,int second)
this.first=first;
this.second=second;
public boolean $greater(SortedKey other)
if(this.first>other.getFirst())
return true;
else if(this.first==other.getFirst() && this.second>other.getSecond())
return true;
return false;
public boolean $greater$eq(SortedKey other)
if(this.$greater(other))
return true;
else if(this.first==other.getFirst() && this.second==other.getSecond())
return true;
return false;
public boolean $less(SortedKey other)
if(this.first<other.getFirst())
return true;
else if(this.first==other.getFirst() && this.second<other.getSecond())
return true;
return false;
public boolean $less$eq(SortedKey other)
if(this.$less(other))
return true;
else if(this.first==other.getFirst() && this.second==other.getSecond())
return true;
return false;
public int compare(SortedKey other)
if((this.first-other.getFirst())!=0)
return this.first-other.getFirst();
else
return this.second-other.getSecond();
public int compareTo(SortedKey other)
if((this.first-other.getFirst())!=0)
return this.first-other.getFirst();
else
return this.second-other.getSecond();
public int getFirst()
return first;
public void setFirst(int first)
this.first = first;
public int getSecond()
return second;
public void setSecond(int second)
this.second = second;
@Override
public int hashCode()
final int prime = 31;
int result = 1;
result = prime * result + first;
result = prime * result + second;
return result;
@Override
public boolean equals(Object obj)
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
SortedKey other = (SortedKey) obj;
if (first != other.first)
return false;
if (second != other.second)
return false;
return true;
4、实现application 程序
package ywendeng.Spark_study_java.core;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class SeconedSort
public static void main(String[] args)
//二次排序需要将需要排序的字段 封装为自定义排序的key类
SparkConf conf= new SparkConf()
.setAppName("SecondSort")
.setMaster("local");
JavaSparkContext sc=new JavaSparkContext(conf);
JavaRDD<String>str=sc.textFile("C:\\\\Users\\\\JimG\\\\Desktop\\\\sort.txt");
//将str RDD 进行封装为自定的key方式
JavaPairRDD<SortedKey, String> strKey=str.mapToPair(new PairFunction<String, SortedKey,String>()
private static final long serialVersionUID = 1L;
public Tuple2<SortedKey, String> call(String line) throws Exception
String[]key=line.split("\\t");
SortedKey sortedKey=new SortedKey(Integer.parseInt(key[0]), Integer.parseInt(key[1]));
return new Tuple2<SortedKey, String>(sortedKey, line);
);
//对新生成了key-value的键值对调用sortedByKey的方法,按照key进行排序
JavaPairRDD<SortedKey, String> sortedStrKey=strKey.sortByKey();
sortedStrKey.foreach(new VoidFunction<Tuple2<SortedKey,String>>()
private static final long serialVersionUID = 1L;
public void call(Tuple2<SortedKey, String> t) throws Exception
System.out.println(t._2);
);
sc.close();
以上是关于Spark 中的二次排序Java实现的主要内容,如果未能解决你的问题,请参考以下文章