MapReduce编程实践

Posted jing-wang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce编程实践相关的知识,希望对你有一定的参考价值。

一、MapReduce编程思想

学些MapRedcue主要是学习它的编程思想,在MR的编程模型中,主要思想是把对数据的运算流程分成map和reduce两个阶段:

Map阶段:读取原始数据,形成key-value数据(map方法)

Reduce阶段:把map阶段的key-value数据按照相同的key进行分组聚合(reduce方法)

它其实是一种数据逻辑运算模型,对于这样的运算模型,有一些成熟的具体软件实现,比如hadoop中的mapreduce框架、spark等,例如在hadoop的mr框架中,对map阶段的具体实现是map task,对reduce阶段的实现是reduce task。这些框架已经为我们提供了一些通用功能的实现,让我们专注于数据处理的逻辑,而不考虑分布式的具体实现,比如读取文件、写文件、数据分发等。我们要做的工作就是在这些编程框架下,来实现我们的具体需求。

下面我们先介绍一些map task和reduce task中的一些具体实现:

二、MapTask和ReduceTask

2.1 Map Task

读数据:利用InputFormat组件完成数据的读取。

    InputFormat-->TextInputFormat 读取文本文件的具体实现

            -->SequenceFileInputFormat 读取Sequence文件

            -->DBInputFormat 读数据库

处理数据:这一阶段将读取到的数据按照规则进行处理,生成key-value形式的结果。maptask通过调用用Mapper类的map方法实现对数据的处理。

分区:这一阶段主要是把map阶段产生的key-value数据进行分区,以分发给不同的reduce task来处理,使用的是Partitioner类。maptask通过调用Partitioner类的getPartition()方法来决定如何划分数据给不同的reduce task。

排序:这一阶段,对key-value数据做排序。maptask会按照key对数据进行排序,排序时调用key.compareTo()方法来实现对key-value数据排序。

2.2 Reduce Task

读数据:这一阶段通过http方式从maptask产生的数据文件中下载属于自己的“区”的数据。由于一个区的数据可能来自多个maptask,所以reduce还要把这些分散的数据进行合并(归并排序)

处理数据:一个reduce task中,处理刚才下载到自己本地的数据。通过调用GroupingComparator的compare()方法来判断文件中的哪些key-value属于同一组。然后将这一组数传给Reducer类的reduce()方法聚合一次。

输出结果:调用OutputFormat组件将结果key-value数据写出去。

    Outputformat --> TextOutputFormat 写文本文件(会把一个key-value对写一行,分隔符为制表符\t

          --> SequenceFileOutputFormat 写Sequence文件(直接将key-value对象序列化到文件中)

          --> DBOutputFormat 

下面介绍下利用MapReduce框架下的一般编程过程。我们要做的 工作就是把我们对数据的处理逻辑加入到框架的业务逻辑中。我们编写的MapReduce的job客户端主要包括三个部分,Mapper 、 Reducer和JobSubmitter,三个部分分别完成MR程序的map逻辑、reduce逻辑以及将我们编写的job程序提交给集群。下面分别介绍这三个部分如何实现。

三、Hadoop中MapReduce框架下的一般编程步骤

Mapper:创建类,该类要实现Mapper父类,复写read()方法,在方法内实现当前工程中的map逻辑。

Reducer:创建类,继承Reducer父类,复写reduce()方法,方法内实现当前工程中的reduce逻辑。

jobSubmitter:这是job在集群上实际运行的类,主要是通过main方法,封装job相关参数,并把job提交。jobsubmitter内一般包括以下操作

step1:创建Configuration对象,并通过创建的对象对集群进行配置,同时支持用户自定义一些变量并配置。这一步有些像我们集群搭建的时候对$haoop_home/etc/hadoop/*下的一些文件进行的配置。

step2:获得job对象,并通过job对象对我们job运行进行一些配置。例如,设置集群运行的jar文件、设置实际执行map和reduce的类等,下面列出一些必要设置和可选设置。

        Configuration conf = new Configuration(); //创建集群配置对象。
        Job job = Job.getInstance(conf);//根据配置对象获取一个job客户端实例。
        job.setJarByClass(JobSubmitter.class);//设置集群上job执行的类
        job.setMapperClass(FlowCountMapper.class);//设置job执行时使用的Mapper类
        job.setReducerClass(FlowCountReducer.class);//设置job执行时使用的Reducer类

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path("I:\\hadooptest\\input"));
        FileOutputFormat.setOutputPath(job, new Path("I:\\hadooptest\\output_pri"));


        //设置maptask做数据分发时使用的分发逻辑类,如果不指定,默认使用hashpar
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(4);//自定义的分发逻辑下,可能产生n个分区,所以reducetask的数量需要是n

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0:-1);

 一般实践中,可以定义一个类,其中添加main方法对job进行提交,并在其中定义静态内部类maper和reduce类。

四、MapReduce框架中的可自定义项

遇到一些复杂的需求,需要我们自定义实现一些组件

2.1 自定义排序规则

2.2 自定义序列化数据类型

五、MR程序的调试、执行方式

5.1 提交到linux运行

5.2 Win本地执行

 

以上是关于MapReduce编程实践的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce 2.0编程实践(涉及多语言编程)

实验5 MapReduce初级编程实践——编程实现文件合并和去重操作

实验5 MapReduce初级编程实践——编程实现文件合并和去重操作

小白视角大数据基础实践 MapReduce编程基础操作

MapReduce编程实践——WordCount运行实例(Python实现)

实验5 MapReduce初级编程实践——对给定的表格进行信息挖掘