Scala实现Mapreduce程序4-----数据去重

Posted 流浪在伯纳乌

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Scala实现Mapreduce程序4-----数据去重相关的知识,希望对你有一定的参考价值。

数据去重,key只输出一次

scala实现:先groupByKey(),然后SortByKey(),然后输出keys

 

object Reduplicate {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("local").setAppName("remove duplication");
val sc=new SparkContext(conf);
val line=sc.textFile("");
line.filter(_.trim.length>0).map(line=>(line.trim,"")).groupByKey().sortByKey().keys.collect()

}

}

MapReduce实现:以整个数据作为key,reduce过程中输出key就可以了

package HadoopvsSpark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;

//对数据进行去重,以整个数据作为key,reduce过程中输出key就可以了

/**
* Created by Administrator on 2017/5/25.
*/
public class Duplicate {
public static class Map extends Mapper<Object,Text,Text,Text>{

private static Text text=new Text( );
public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
text=value;
context.write( new Text(text),new Text(" ") );
}
}

public static class Reduce extends Reducer<Text,Text,Text,Text>{
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
context.write( key,new Text( "" ) );
}
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job=new Job();
Configuration conf=new Configuration();
conf.set("mapred.job.tracker","192.169.1.101:8200");

String[] input=new String[]{"a","s"};
String[] otherArgs=new GenericOptionsParser(conf,input).getRemainingArgs();
if(otherArgs.length!=2){
System.err.println("Usage: Data Deduplication <in> <out>");
System.exit(2);
}

job.setJarByClass(Duplicate.class);
FileInputFormat.addInputPath( job,new Path( otherArgs[0] ) );
FileOutputFormat.setOutputPath( job,new Path(otherArgs[1]) );

job.setOutputKeyClass( Text.class);
job.setOutputValueClass( Text.class );
job.setMapperClass( Map.class );
job.setCombinerClass( Reduce.class );
job.setReducerClass( Reduce.class);
System.out.println(job.waitForCompletion( true )? 1:0 );
}

}

以上是关于Scala实现Mapreduce程序4-----数据去重的主要内容,如果未能解决你的问题,请参考以下文章

Scala实现Mapreduce程序2-----Top5

Shell脚本实现MapReduce统计单词数程序

Hadoop中的MapReduce——分布式离线计算框架

mapreduce 实现数子排序

javaApi,mapreduce,awk,scala??????????????????????????????

Mapreduce+Hive+Spark+Scala平台搭建