Hadoop 综合揭秘——MapReduce 编程实例(详细介绍 CombinePartitionerWritableComparableWritableComparator 使用方式)

Posted tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop 综合揭秘——MapReduce 编程实例(详细介绍 CombinePartitionerWritableComparableWritableComparator 使用方式)相关的知识,希望对你有一定的参考价值。

前言

本文主要介绍 MapReduce 的原理及开发,讲解如何利用 Combine、Partitioner、WritableComparator等组件对数据进行排序筛选聚合分组的功能。
由于文章是针对开发人员所编写的,在阅读本文前,文章假设读者已经对Hadoop的工作原理、安装过程有一定的了解,因此对Hadoop的安装就不多作说明。请确保源代码运行在Hadoop 2.x以上版本,并以伪分布形式安装以方便进行调试(单机版会对 Partitioner 功能进行限制)。
文章主要利用例子介绍如何利用 MapReduce 模仿 SQL 关系数据库进行SELECT、WHERE、GROUP、JOIN 等操作,并对 GroupingComparator、SortComparator 等功能进行说明。
希望本篇文章能对各位的学习研究有所帮助,当中有所错漏的地方敬请点评。

 

目录

一、MapReduce 工作原理简介

二、MapReduce 开发实例

三、利用 Partitioner 控制键值分配

四、利用 Combiner 提高系统性能

五、WritableComparatable 自定义键值说明

六、实现数据排序与分组

七、数据集连接处理方式介绍

 

 

 

一、MapReduce 工作原理简介

对Hadoop有兴趣的朋友相信对Hadoop的主要工作原理已经有一定的认识,在讲解MapReduce的程序开发前,本文先针对Mapper、Reducer、Partitioner、Combiner、Suhffle、Sort的工作原理作简单的介绍,以帮助各位更好地了解后面的内容。

图 1.1

1.1 Mapper 阶段

当系统对数据进行分片后,每个输入分片会分配到一个Mapper任务来处理,默认情况下系统会以HDFS的一个块大小64M作为分片大小,当然也可以通过配置文件设置块的大小。随后Mapper节点输出的数据将保存到一个缓冲区中(缓冲区的大小默认为512M,可通过mapreduce.task.io.sort.mb属性进行修改),缓冲区越大排序效率越高。当该缓冲区快要溢出时(缓冲区默认大小为80%,可通过mapreduce.map.sort.spill.percent属性进行修改),系统会启动一个后台线程,将数据传输到会到本地的一个文件当中。

1.2 Partitioner 阶段

在Mapper完成 KEY/VALUE 格式的数据操作后,Partitioner 将会被调用,由于真实环境中 Hadoop 可能会包含几十个甚至上百个Reducer ,Partitioner 的主要作用就是根据自定义方式确定数据将被传输到哪一个Reducer进行处理。

1.3 Combiner 阶段

如果系统定义了Combiner,在经过 Partitioner 排序处理后将会进行 Combiner处理。我们可以把 Combiner 看作为一个小型的 Reducer ,由于数据从 Mapper 通过网络传送到 Reducer ,资源开销很大,Combiner 目的就是在数据传送到Reducer前作出初步聚集处理,减少服务器的压力。如果数据量太大,还可以把 mapred.compress.map.out 设置为 true,就可以将数据进行压缩。(关于数据压缩的内容已经超越本文的讨论范围,以后会有独立的篇章针对数据压缩进行专题讨论,敬请期待)

1.4 Shuffle 阶段

在 Shuffle 阶段,每个 Reducer 会启动 5 个线程(可通过 mapreduce.reduce.shuffle.parallelcopies 进行设置)通过HTTP协议获取Mapper传送过来的数据。每次数据发送到 Reducer 前,都会根据键先进行排序。开发人员也可通过自定义的 SortComparator 进行数据排序,也是根据 GroupComparator 按照数据的其他特性进行分组处理,下面章节将会详细举例介绍。对数据进行混洗、排序完成后,将传送到对应的Reducer进行处理。

1.5 Reducer 阶段

当 Mapper 实例完成输入的数据超过设定值后(可通过mapreduce.job.reduce.slowstart.completedmaps 进行设置), Reducer 就会开始执行。Reducer 会接收到不同 Mapper 任务传来已经过排序的数据,并通过Iterable 接口进行处理。在 Partitioner 阶段,系统已定义哪些数据将由个 Reducer 进行管理。当 Reducer 检测到 KEY 时发生变化时,系统就会按照已定的规则生成一个新的 Reducer 对数据进行处理。
如果 Reducer 端接受的数据量较小,数据则可直接存储在内存缓冲区中,方便后面的数据输出(缓冲区大小可通过mapred.job.shuffle.input.buffer.percent 进行设置)
如果数据量超过了该缓冲区大小的一定比例(可以通过 mapred.job.shuffle.merge.percent 进行设置),数据将会被合并后写到磁盘中。

回到目录

二、MapReduce 开发实例

上一章节讲解了 MapReduce 的主要流程,下面将以几个简单的例子模仿 SQL 关系数据库向大家介绍一下 MapReduce 的开发过程。

HDFS常用命令  (此处只介绍几个常用命令,详细内容可在网上查找)

  • 创建目录    hdfs dfs -mkdir -p 【Path】 
  • 复制文件    hdfs dfs -copyFromLocal 【InputPath】【OutputPath】
  • 查看目录    hdfs dfs -ls 【Path】
  • 运行JAR    hadoop jar 【Jar名称】 【Main类全名称】 【InputPath】 【OutputPath】 

2.1 使用 SELECT 获取数据

应用场景:假设在 hdfs 文件夹 / input / 20180509 路径的 *.dat 类型文件中存放在着大量不同型号的 iPhone 手机当天在不同地区的销售记录,系统想对这些记录进行统计,计算出不同型号手机的销售总数。

计算时,在Mapper中获取每一行的信息,并把iPhone名称作为Key插入,把数据作为Value插入到Context当中。
当Reducer接收到相同Key数据后,再作统一处理。

注意  :   当前例子当中  Mapper 的输入 Key 为  LongWritable 长类型

在此过程中要注意几点: 例子中 SaleManager 继承了 org.apache.hadoop.conf.Configured 类并实现了 org.apache.hadoop.util.Tool 接口的 public static int run(Configuration conf,Tool tool, String[] args) 方法,MapReduce的相关操作都在run里面实现。由于 Configured 已经实现了 getConf() 与setConfig() 方法,创建Job时相关的配置信息就可通过getConf()方法读入。

系统可以通过以下方法注册Mapper及Reducer处理类
Job.setMapperClass(MyMapper.class);
Job.setReducerClass(MyReducer.class);

在整个运算过程当中,数据会经过筛选与计算,所以Mapper的读入信息K1,V1与Reducer的输出信息K3,V3不一定是同一格式。
org.apache.hadoop.mapreduce.Mapper<K1,V1,K2,V2>
org.apache.hadoop.mapreduce.Reducer<K2,V2,K3,V3>

当Mapper的输出的键值类型与Reduces输出的键值类型相同时,系统可以通过下面方法设置转出数据的格式
Job.setOutputKeyClass(K);
Job.setOutputValueClass(V);

当Mapper的输出的键值类型与Reduces输出的键值类型不相同时,系统则需要通过下面方法设置Mapper转出格式
Job.setMapOutputKeyClass(K);
Job.setMapOutputValueClass(V);

 1 public class Phone {
 2     public String type;
 3     public Integer count;
 4     public String area;
 5     
 6     public Phone(String line){
 7        String[] data=line.split(",");
 8        this.type=data[0].toString();
 9        this.count=Integer.valueOf(data[1].toString());
10        this.area=data[2].toString();
11     }
12     
13     public String getType(){
14         return this.type;
15     }
16     
17     public Integer getCount(){
18         return this.count;
19     }
20     
21     public String getArea(){
22         return this.area;
23     }
24 }
25 
26 public class SaleManager extends Configured implements Tool{
27     public static class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
28         public void map(LongWritable key,Text value,Context context)
29             throws IOException,InterruptedException{
30             String data=value.toString();
31             Phone iPhone=new Phone(data);
32             //以iPhone型号作为Key,数量为作Value传入
33             context.write(new Text(iPhone.getType()), new IntWritable(iPhone.getCount()));
34         }
35     }
36     
37     public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
38         public void reduce(Text key,Iterable<IntWritable> values,Context context)
39             throws IOException,InterruptedException{
40             int sum=0;
41             //对同一型号的iPhone数量进行统计
42             for(IntWritable val : values){
43                 sum+=val.get();
44             }
45             context.write(key, new IntWritable(sum));
46         }
47     }
48 
49     public int run(String[] arg0) throws Exception {
50         // TODO 自动生成的方法存根
51         // TODO Auto-generated method stub
52         Job job=Job.getInstance(getConf());
53         job.setJarByClass(SaleManager.class);
54         //注册Key/Value类型为Text
55         job.setOutputKeyClass(Text.class);
56         job.setOutputValueClass(IntWritable.class);
57         //注册Mapper及Reducer处理类
58         job.setMapperClass(MyMapper.class);
59         job.setReducerClass(MyReducer.class);
60         //输入输出数据格式化类型为TextInputFormat
61         job.setInputFormatClass(TextInputFormat.class);
62         job.setOutputFormatClass(TextOutputFormat.class);
63         //默认情况下Reducer数量为1个(可忽略)
64         job.setNumReduceTasks(1);
65         //获取命令参数
66         String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs();
67         //设置读入文件路径
68         FileInputFormat.setInputPaths(job,new Path(args[0]));
69         //设置转出文件路径
70         FileOutputFormat.setOutputPath(job,new Path(args[1]));
71         boolean status=job.waitForCompletion(true);
72         if(status)
73             return 0;
74         else
75             return 1;
76     }
77     
78     public static void main(String[] args) throws Exception{
79         Configuration conf=new Configuration();
80         ToolRunner.run(new SaleManager(), args);
81     }
82 }

计算结果

 

2.2 使用 WHERE 对数据进行筛选

在计算过程中,并非所有的数据都适用于Reduce的计算,由于海量数据是通过网络传输的,所消耗的 I/O 资源巨大,所以可以尝试在Mapper过程中提前对数据进行筛选。以上面的数据为例,当前系统只需要计算输入参数地区的销售数据。此时只需要修改一下Mapper类,重写setup方法,通过Configuration类的 public String[] Configuration.getStrings(参数名,默认值) 方法获取命令输入的参数,再对数据进行筛选。

 1 public static class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
 2         private String area;
 3         
 4         @Override
 5         public void setup(Context context){
 6             this.area=context.getConfiguration().getStrings("area", "BeiJing")[0];
 7         }
 8         
 9         public void map(LongWritable key,Text value,Context context)
10             throws IOException,InterruptedException{
11             String data=value.toString();
12             Phone iPhone=new Phone(data);
13             if(this.area.equals(iPhone.area))
14                 context.write(new Text(iPhone.getType()), new IntWritable(iPhone.getCount()));
15         }
16     }

执行命令 hadoop jar 【Jar名称】 【Main类全名称】-D 【参数名=参数值】 【InputPath】 【OutputPath】
例如:hadoop jar hadoopTest-0.2.jar sun.hadoopTest.SaleManager -D area=BeiJing /tmp/input/050901 /tmp/output/050901 
此时系统将选择 area 等于BeiJing 的数据进行统计
计算结果

回到目录

三、利用 Partitioner 控制键值分配

3.1 深入分析 Partitioner

Partitioner 类在 org.apache.hadoop.mapreduce.Partitioner 中,通过 Job.setPartitionerClass(Class<? extends Partitioner> cls) 方法可绑定自定义的 Partitioner。若用户没有实现自定义Partitioner 时,系统将自动绑定 Hadoop 的默认类 org.apache.hadoop.mapreduce.lib.partiton.HashPartitioner 。Partitioner 包含一个主要方法是 int getPartition(K key,V value,int numReduceTasks) ,功能是控制将哪些键分配到哪个 Reducer。此方法的返回值是 Reducer 的索引值,若系统定义了4个Reducer,其返回值为0~3。numReduceTasks 侧是当前系统的 Reducer 数量,此数量可通过Job.setNumReduceTasks(int tasks) 进行设置,在伪分布环境下,其默认值为1。 

注意:

在单机环境下,系统只会使用一个 Reducer,这将导致 Partitioner 缺乏意义,这也是在本文引言中强调要使用伪分布环境进行调试的原因 。

通过反编译查看 HashPartitioner ,可见系统是通过(key.hashCode() & Interger.MAX_VALUE )%numReduceTasks 方法,根据 KEY 的 HashCode 对 Reducer 数量求余方式,确定数据分配到哪一个 Reducer 进行处理的。但如果想根据用户自定义的逻辑把数据分配到对应 Reducer,单依靠 HashPartitioner 是无法实现的,此时侧需要自定义 Partitioner 。

1 public class HashPartitioner<K, V> extends Partitioner<K, V> {
2 
3   public int getPartition(K key, V value, int numReduceTasks) {
4     return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
5   }
6 }

3.2 自定义 Partitioner

在例子当中,假设系统需要把北、上、广、深4个不同的地区的iPhone销售情况分别交付给不同 Reducer 进行统计处理。我们可以自定义一个 MyPartitioner, 通过 Job.setPartitionerClass( MyPartitioner.class ) 进行绑定。通过 Job.setNumReduceTasks(4) 设置4个Reducer 。以手机类型作为KEY,把销售数据与地区作为VALUE。在 int getPartition(K key,V value,int numReduceTasks) 方法中,根据 VALUE 值的不同返回不同的索引值。

  1 public class Phone {
  2     public String type;
  3     public Integer count;
  4     public String area;
  5     
  6     public Phone(String line){
  7        String[] data=line.split(",");
  8        this.type=data[0].toString();
  9        this.count=Integer.valueOf(data[1].toString());
 10        this.area=data[2].toString();
 11     }
 12     
 13     public String getType(){
 14         return this.type;
 15     }
 16     
 17     public Integer getCount(){
 18         return this.count;
 19     }
 20     
 21     public String getArea(){
 22         return this.area;
 23     }
 24 }
 25 
 26 public class MyPatitional extends Partitioner<Text,Text> {
 27 
 28     @Override
 29     public int getPartition(Text arg0, Text arg1, int arg2) {
 30         // TODO 自动生成的方法存根
 31         String area=arg1.toString().split(",")[0];
 32         // 根据不同的地区返回不同的索引值       
 33         if(area.contentEquals("BeiJing"))
 34             return 0;
 35         if(area.contentEquals("GuangZhou"))
 36             return 1;
 37         if(area.contentEquals("ShenZhen"))
 38             return 2;
 39         if(area.contentEquals("ShangHai"))
 40             return 3;
 41         return 0;
 42     }
 43 }    
 44 
 45 public class SaleManager extends Configured implements Tool{
 46     public static class MyMapper extends Mapper<LongWritable,Text,Text,Text>{
 47         
 48         public void map(LongWritable key,Text value,Context context)
 49             throws IOException,InterruptedException{
 50             String data=value.toString();
 51             Phone iPhone=new Phone(data);
 52             context.write(new Text(iPhone.getType()), new Text(iPhone.getArea()+","+iPhone.getCount().toString()));
 53         }
 54     }
 55     
 56     public static class MyReducer extends Reducer<Text,Text,Text,IntWritable>{
 57         
 58         public void reduce(Text key,Iterable<Text> values,Context context)
 59             throws IOException,InterruptedException{
 60             int sum=0;
 61             //对同一型号的iPhone数量进行统计
 62             for(Text value : values){
 63                 String count=value.toString().split(",")[1];
 64                 sum+=Integer.valueOf(count).intValue();
 65             }
 66             context.write(key, new IntWritable(sum));
 67         }
 68     }
 69 
 70     public int run(String[] arg0) throws Exception {
 71         // TODO 自动生成的方法存根
 72          // TODO Auto-generated method stub
 73         Job job=Job.getInstance(getConf());
 74         job.setJarByClass(SaleManager.class);
 75         //注册Key/Value类型为Text
 76         job.setOutputKeyClass(Text.class);
 77         job.setOutputValueClass(IntWritable.class);
 78         //若Map的转出Key/Value不相同是需要分别注册
 79         job.setMapOutputKeyClass(Text.class);
 80         job.setMapOutputValueClass(Text.class);
 81         //注册Mapper及Reducer处理类
 82         job.setMapperClass(MyMapper.class);
 83         job.setReducerClass(MyReducer.class);
 84         //输入输出数据格式化类型为TextInputFormat
 85         job.setInputFormatClass(TextInputFormat.class);
 86         job.setOutputFormatClass(TextOutputFormat.class);
 87         //设置Reduce数量为4个,伪分布式情况下不设置时默认为1
 88         job.setNumReduceTasks(4);
 89         //注册自定义Partitional类
 90         job.setPartitionerClass(MyPatitional.class);
 91         //获取命令参数
 92         String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs();
 93         //设置读入文件路径
 94         FileInputFormat.setInputPaths(job,new Path(args[0]));
 95         //设置转出文件路径
 96         FileOutputFormat.setOutputPath(job,new Path(args[1]));
 97         boolean status=job.waitForCompletion(true);
 98         if(status)
 99             return 0;
100         else
101             return 1;
102     }
103     
104     public static void main(String[] args) throws Exception{
105         Configuration conf=new Configuration();
106         ToolRunner.run(new SaleManager(), args);
107     }
108 }

计算结果


回到目录

四、利用 Combiner 提高系统性能

在前面几节所描述的例子当中,我们都是把所有的数据完整发送到 Reducer 中再作统计。试想一下,在真实环境当中,iPhone 的销售记录数以千万计,如此巨大的数据需要在 Mapper/Reducer 当中进行传输,将会耗费多少的网络资源。这么多年来 iPhone 出品的机型不过十多个,系统能否先针对同类的机型在Mapper端作出初步的聚合计算,再把计算结果发送到 Reducer。如此一来,传到 Reducer 端的数据量将会大大减少,只要在适当的情形下使用将有利于系统的性能提升。
针对此类问题,Combiner 应运而生,我们可以把 Combiner 看作为一个小型的 Reducer ,它的目的就是在数据传送到Reducer前在Mapper中作出初步聚集处理,减少服务器之间的 I/O 数据传输压力。Combiner 也继承于Reducer,通过Job.setCombinerClass(Class<? extends Reducer> cls) 方法进行注册。
下面继续以第3节的例子作为参考,系统想要在同一个Reducer中计算所有地区不同型号手机的销售情况。我们可以把地区名作为KEY,把销售数量和手机类型转换成 MapWritable 作为 VALUE。当数据输入后,不是直接把数据传输到 Reducer ,而是通过Combiner 把Mapper中不同的型号手机的销售数量进行聚合计算,把5种型号手机的销售总数算好后传输给Reducer。在Reducer中再把来源于不同 Combiner 的数据进行求和,得出最后结果。

注意  :   

MapWritable 是 系统自带的 Writable 集合类中的其中一个,它实现了  java.util.Map<Writable,Writable> 接口,以单字节充当类型数据的索引,常用于枚举集合的元素。

 

  1 public class SaleManager extends Configured implements Tool{
  2     private static IntWritable TYPE=new IntWritable(0);
  3     private static IntWritable VALUE=new IntWritable(1);
  4     private static IntWritable IPHONE7=new IntWritable(2);
  5     private static IntWritable IPHONE7_PLUS=new IntWritable(3);
  6     private static IntWritable IPHONE8=new IntWritable(4);
  7     private static IntWritable IPHONE8_PLUS=new IntWritable(5);
  8     private static IntWritable IPHONEX=new IntWritable(6);
  9     
 10     public static class MyMapper extends Mapper<LongWritable,Text,Text,MapWritable>{
 11         
 12         public void map(LongWritable key,Text value,Context context)
 13             throws IOException,InterruptedException{
 14             String data=value.toString();
 15             Phone iPhone=new Phone(data);
 16             context.write(new Text(iPhone.getArea()), getMapWritable(iPhone.getType(), iPhone.getCount()));
 17         }      
 18         
 19         private MapWritable getMapWritable(String type,Integer count){
 20             Text _type=new Text(type);
 21             Text _count=new Text(count.toString());
 22             MapWritable mapWritable=new MapWritable();
 23             mapWritable.put(TYPE,_type);
 24             mapWritable.put(VALUE,_count);
 25             return mapWritable;
 26         }
 27     }
 28     
 29     public static class MyCombiner extends Reducer<Text,MapWritable,Text,MapWritable> {
 30         public void reduce(Text key,Iterable<MapWritable> values,Context context) 
 31             throws IOException, InterruptedException{
 32                int iPhone7=0;
 33                int iPhone7_PLUS=0;
 34                int iPhone8=0;
 35                int iPhone8_PLUS=0;
 36                int iPhoneX=0;
 37                //对同一个Mapper所处理的不同型号的手机数据进行初步统计
 38                for(MapWritable value:values){
 39                     String type=value.get(TYPE).toString();
 40                     Integer count=Integer.valueOf(value.get(VALUE).toString());
 41                     if(type.contentEquals("iPhone7"))
 42                         iPhone7+=count;
 43                     if(type.contentEquals("iPhone7_PLUS"))
 44                         iPhone7_PLUS+=count;
 45                     if(type.contentEquals("iPhone8"))
 46                         iPhone8+=count;
 47                     if(type.contentEquals("iPhone8_PLUS"))
 48                         iPhone8_PLUS+=count;
 49                     if(type.contentEquals("iPhoneX"))
 50                         iPhoneX+=count;
 51                 }                
 52                 MapWritable mapWritable=new MapWritable();
 53                 mapWritable.put(IPHONE7, new IntWritable(iPhone7));
 54                 mapWritable.put(IPHONE7_PLUS, new IntWritable(iPhone7_PLUS));
 55                 mapWritable.put(IPHONE8, new IntWritable(iPhone8));
 56                 mapWritable.put(IPHONE8_PLUS, new IntWritable(iPhone8_PLUS));
 57                 mapWritable.put(IPHONEX, new IntWritable(iPhoneX));
 58                 context.write(key,mapWritable);
 59         }
 60    }
 61     
 62     public static class MyReducer extends Reducer<Text,MapWritable,Text,Text>{
 63         public void reduce(Text key,Iterable<MapWritable> values,Context context)
 64             throws IOException,Interrupted

以上是关于Hadoop 综合揭秘——MapReduce 编程实例(详细介绍 CombinePartitionerWritableComparableWritableComparator 使用方式)的主要内容,如果未能解决你的问题,请参考以下文章

分享知识-快乐自己:大数据(hadoop)环境搭建

分享知识-快乐自己:运行(wordcount)案例

分享知识-快乐自己:Liunx-大数据(Hadoop)初始化环境搭建

hadoop综合实验(对日志的处理mapreduce保存到csv)

实训任务4:Hadoop综合操作

实训任务4:Hadoop综合操作