MapReduce编程实践
Posted jing-wang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce编程实践相关的知识,希望对你有一定的参考价值。
一、MapReduce编程思想
学些MapRedcue主要是学习它的编程思想,在MR的编程模型中,主要思想是把对数据的运算流程分成map和reduce两个阶段:
Map阶段:读取原始数据,形成key-value数据(map方法)
Reduce阶段:把map阶段的key-value数据按照相同的key进行分组聚合(reduce方法)
它其实是一种数据逻辑运算模型,对于这样的运算模型,有一些成熟的具体软件实现,比如hadoop中的mapreduce框架、spark等,例如在hadoop的mr框架中,对map阶段的具体实现是map task,对reduce阶段的实现是reduce task。这些框架已经为我们提供了一些通用功能的实现,让我们专注于数据处理的逻辑,而不考虑分布式的具体实现,比如读取文件、写文件、数据分发等。我们要做的工作就是在这些编程框架下,来实现我们的具体需求。
下面我们先介绍一些map task和reduce task中的一些具体实现:
二、MapTask和ReduceTask
2.1 Map Task
读数据:利用InputFormat组件完成数据的读取。
InputFormat-->TextInputFormat 读取文本文件的具体实现
-->SequenceFileInputFormat 读取Sequence文件
-->DBInputFormat 读数据库
处理数据:这一阶段将读取到的数据按照规则进行处理,生成key-value形式的结果。maptask通过调用用Mapper类的map方法实现对数据的处理。
分区:这一阶段主要是把map阶段产生的key-value数据进行分区,以分发给不同的reduce task来处理,使用的是Partitioner类。maptask通过调用Partitioner类的getPartition()方法来决定如何划分数据给不同的reduce task。
排序:这一阶段,对key-value数据做排序。maptask会按照key对数据进行排序,排序时调用key.compareTo()方法来实现对key-value数据排序。
2.2 Reduce Task
读数据:这一阶段通过http方式从maptask产生的数据文件中下载属于自己的“区”的数据。由于一个区的数据可能来自多个maptask,所以reduce还要把这些分散的数据进行合并(归并排序)
处理数据:一个reduce task中,处理刚才下载到自己本地的数据。通过调用GroupingComparator的compare()方法来判断文件中的哪些key-value属于同一组。然后将这一组数传给Reducer类的reduce()方法聚合一次。
输出结果:调用OutputFormat组件将结果key-value数据写出去。
Outputformat --> TextOutputFormat 写文本文件(会把一个key-value对写一行,分隔符为制表符\t
--> SequenceFileOutputFormat 写Sequence文件(直接将key-value对象序列化到文件中)
--> DBOutputFormat
下面介绍下利用MapReduce框架下的一般编程过程。我们要做的 工作就是把我们对数据的处理逻辑加入到框架的业务逻辑中。我们编写的MapReduce的job客户端主要包括三个部分,Mapper 、 Reducer和JobSubmitter,三个部分分别完成MR程序的map逻辑、reduce逻辑以及将我们编写的job程序提交给集群。下面分别介绍这三个部分如何实现。
三、Hadoop中MapReduce框架下的一般编程步骤
Mapper:创建类,该类要实现Mapper父类,复写read()方法,在方法内实现当前工程中的map逻辑。
Reducer:创建类,继承Reducer父类,复写reduce()方法,方法内实现当前工程中的reduce逻辑。
jobSubmitter:这是job在集群上实际运行的类,主要是通过main方法,封装job相关参数,并把job提交。jobsubmitter内一般包括以下操作
step1:创建Configuration对象,并通过创建的对象对集群进行配置,同时支持用户自定义一些变量并配置。这一步有些像我们集群搭建的时候对$haoop_home/etc/hadoop/*下的一些文件进行的配置。
step2:获得job对象,并通过job对象对我们job运行进行一些配置。例如,设置集群运行的jar文件、设置实际执行map和reduce的类等,下面列出一些必要设置和可选设置。
Configuration conf = new Configuration(); //创建集群配置对象。 Job job = Job.getInstance(conf);//根据配置对象获取一个job客户端实例。 job.setJarByClass(JobSubmitter.class);//设置集群上job执行的类 job.setMapperClass(FlowCountMapper.class);//设置job执行时使用的Mapper类 job.setReducerClass(FlowCountReducer.class);//设置job执行时使用的Reducer类 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); FileInputFormat.setInputPaths(job, new Path("I:\\hadooptest\\input")); FileOutputFormat.setOutputPath(job, new Path("I:\\hadooptest\\output_pri")); //设置maptask做数据分发时使用的分发逻辑类,如果不指定,默认使用hashpar job.setPartitionerClass(ProvincePartitioner.class); job.setNumReduceTasks(4);//自定义的分发逻辑下,可能产生n个分区,所以reducetask的数量需要是n boolean res = job.waitForCompletion(true); System.exit(res ? 0:-1);
一般实践中,可以定义一个类,其中添加main方法对job进行提交,并在其中定义静态内部类maper和reduce类。
四、MapReduce框架中的可自定义项
遇到一些复杂的需求,需要我们自定义实现一些组件
2.1 自定义排序规则
2.2 自定义序列化数据类型
五、MR程序的调试、执行方式
5.1 提交到linux运行
5.2 Win本地执行
以上是关于MapReduce编程实践的主要内容,如果未能解决你的问题,请参考以下文章
实验5 MapReduce初级编程实践——编程实现文件合并和去重操作
实验5 MapReduce初级编程实践——编程实现文件合并和去重操作