MapReduce实现数据去重

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce实现数据去重相关的知识,希望对你有一定的参考价值。

一、原理分析

  Mapreduce的处理过程,由于Mapreduce会在Map~reduce中,将重复的Key合并在一起,所以Mapreduce很容易就去除重复的行。Map无须做任何处理,设置Map中写入context的东西为不作任何处理的行,也就是Map中最初处理的value即可,而Reduce同样无须做任何处理,写入输出文件的东西就是,最初得到的Key。

  我原来以为是map阶段用了hashmap,根据hash值的唯一性。估计应该不是...

  Map是输入文件有几行,就运行几次。

二、代码

2.1 Mapper

package algorithm;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DuplicateRemoveMapper extends
		Mapper<LongWritable, Text, Text, Text> {
	//输入文件是数字 不过可能也有字符等 所以用Text,不用LongWritable
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		context.write(value, new Text());//后面不能是null,否则,空指针

	}

}

  

2.2 Reducer

package algorithm;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DuplicateRemoveReducer extends Reducer<Text, Text, Text, Text> {

	public void reduce(Text key, Iterable<Text> value, Context context)
			throws IOException, InterruptedException {
		// process values
		context.write(key, null); //可以出处null
	}

}

  

2.3 Main

package algorithm;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DuplicateMainMR  {

	public static void main(String[] args) throws Exception{
		// TODO Auto-generated method stub
		Configuration conf = new Configuration(); 
		Job job = new Job(conf,"DuplicateRemove");
		job.setJarByClass(DuplicateMainMR.class);
		job.setMapperClass(DuplicateRemoveMapper.class);
		job.setReducerClass(DuplicateRemoveReducer.class);
		job.setOutputKeyClass(Text.class);
		//输出是null,不过不能随意写  否则包类型不匹配
		job.setOutputValueClass(Text.class);
		
		job.setNumReduceTasks(1);
		//hdfs上写错了文件名 DupblicateRemove  多了个b
		//hdfs不支持修改操作
		FileInputFormat.addInputPath(job, new Path("hdfs://192.168.58.180:8020/ClassicalTest/DupblicateRemove/DuplicateRemove.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.58.180:8020/ClassicalTest/DuplicateRemove/DuplicateRemoveOut"));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}

  

三、输出分析

3.1 输入与输出

没啥要对比的....不贴了

3.2 控制台

 

doop.mapreduce.Job.updateStatus(Job.java:323)
  INFO - Job job_local4032991_0001 completed successfully
 DEBUG - PrivilegedAction as:hxsyl (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getCounters(Job.java:765)
  INFO - Counters: 38
	File System Counters
		FILE: Number of bytes read=560
		FILE: Number of bytes written=501592
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=48
		HDFS: Number of bytes written=14
		HDFS: Number of read operations=13
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=4
	Map-Reduce Framework
		Map input records=8
		Map output records=8
		Map output bytes=26
		Map output materialized bytes=48
		Input split bytes=142
		Combine input records=0
		Combine output records=0
		Reduce input groups=6
		Reduce shuffle bytes=48
		Reduce input records=8
		Reduce output records=6
		Spilled Records=16
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=4
		CPU time spent (ms)=0
		Physical memory (bytes) snapshot=0
		Virtual memory (bytes) snapshot=0
		Total committed heap usage (bytes)=457179136
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=24
	File Output Format Counters 
		Bytes Written=14
 DEBUG - PrivilegedAction as:hxsyl (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:323)
 DEBUG - stopping client from cache: [email protected]
 DEBUG - removing client from cache: [email protected]
 DEBUG - stopping actual client because no more references remain: [email protected]
 DEBUG - Stopping client
 DEBUG - IPC Client (521081105) connection to /192.168.58.180:8020 from hxsyl: closed
 DEBUG - IPC Client (521081105) connection to /192.168.58.180:8020 from hxsyl: stopped, remaining connections 0

 

以上是关于MapReduce实现数据去重的主要内容,如果未能解决你的问题,请参考以下文章

mapreduce实现数据去重

MapReduce去重

MapReduce实现数据去重

Scala实现Mapreduce程序4-----数据去重

实验5 MapReduce初级编程实践——编程实现文件合并和去重操作

实验5 MapReduce初级编程实践——编程实现文件合并和去重操作