MapReduce程序之数据去重
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce程序之数据去重相关的知识,希望对你有一定的参考价值。
[toc]MapReduce程序之数据去重
需求
有下面两个文件:
[email protected]:~/data/input/duplication$ cat file1.txt
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
[email protected]:~/data/input/duplication$ cat file2.txt
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c
要求去除两个文件中的重复行并输出到文件中。
MapReduce程序
关于如何去重,思路已经在代码注释中有说明,不过需要注意的是,这里使用了前面开发的Job工具类来开发驱动程序,程序代码如下:
package com.uplooking.bigdata.mr.duplication;
import com.uplooking.bigdata.common.utils.MapReduceJobUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
/**
* 数据去重
*/
public class DuplicationJob {
/**
* 驱动程序,使用工具类来生成job
*
* @param args
*/
public static void main(String[] args) throws Exception {
if (args == null || args.length < 2) {
System.err.println("Parameter Errors! Usages:<inputpath> <outputpath>");
System.exit(-1);
}
Job job = MapReduceJobUtil.buildJob(new Configuration(),
DuplicationJob.class,
args[0],
TextInputFormat.class,
DuplicationMapper.class,
Text.class,
NullWritable.class,
new Path(args[1]),
TextOutputFormat.class,
DuplicationReducer.class,
Text.class,
NullWritable.class);
job.setNumReduceTasks(1);
job.waitForCompletion(true);
}
/**
* 单词去重Mapper操作,主要是将每一行的内容作为key输出到reduce中
* 即map输出的key为某一行的内容,value为NullWritable
*/
public static class DuplicationMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 直接将行作为key写到context中
context.write(value, NullWritable.get());
}
}
/**
* 因为shuffle会将map输出中key相同的key-value对都拉取到同一个reducer中
* 所以数据到达reducer后,key就是唯一的key,而values则为空的集合
* 所以在reducer中也是直接将数据写入到context中,让reducer写出数据即可
*/
public static class DuplicationReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
}
测试
这里使用本地环境来运行MapReduce程序,输入的参数如下:
/Users/yeyonghao/data/input/duplication /Users/yeyonghao/data/output/mr/duplication
也可以将其打包成jar包,然后上传到Hadoop环境中运行。
运行程序后,查看输出结果如下:
[email protected]:~/data/output/mr/duplication$ cat part-r-00000
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d
可以看到,使用我们的MapReduce程序,已经完成了数据去重的目的。
数据去重的另一种思路
除了前面的数据去重思路外,还有另外一种思路,虽然相对会复杂一些,不过这里也提及一下。
Map处理时,可以将每一行数据处理成<2012-3-3, c>
的形式,然后输出,作为reduce的输入,比如经过shuffle之后,到达reducer的输入为<2012-3-3, [c, c]>,那么就可以对迭代列表中的数据通过Java中的set集合来进行去重,这也可以作为解决这个上面的数据去重案例的一种思路,但显然这个方法没有前面的方法好,不具有通用性,所以还是建议使用第一种方法。
以上是关于MapReduce程序之数据去重的主要内容,如果未能解决你的问题,请参考以下文章
Hadoop之MapReduce学习之ip去重MaxScore示例TotalScoreMapper示例