大数据MapReduce(单词计数;二次排序;计数器;join;分布式缓存)

Posted Android Graphics

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据MapReduce(单词计数;二次排序;计数器;join;分布式缓存)相关的知识,希望对你有一定的参考价值。

    前言:

    根据前面的几篇博客学习,现在可以进行MapReduce学习了。本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分布式缓存)。

 

一 概述

定义

MapReduce是一种计算模型,简单的说就是将大批量的工作(数据)分解(MAP)执行,然后再将结果合并成最终结果(REDUCE)。这样做的好处是可以在任务被分解后,可以通过大量机器进行并行计算,减少整个操作的时间。

适用范围:数据量大,但是数据种类小可以放入内存。

基本原理及要点:将数据交给不同的机器去处理,数据划分,结果归约。

理解MapReduce和Yarn:在新版Hadoop中,Yarn作为一个资源管理调度框架,是Hadoop下MapReduce程序运行的生存环境。其实MapRuduce除了可以运行Yarn框架下,也可以运行在诸如Mesos,Corona之类的调度框架上,使用不同的调度框架,需要针对Hadoop做不同的适配。(了解YARN见上一篇博客>>  http://www.cnblogs.com/1996swg/p/7286490.html  )

MapReduce编程

编写在Hadoop中依赖Yarn框架执行的MapReduce程序,并不需要自己开发MRAppMaster和YARNRunner,因为Hadoop已经默认提供通用的YARNRunner和MRAppMaster程序, 大部分情况下只需要编写相应的Map处理和Reduce处理过程的业务程序即可。

编写一个MapReduce程序并不复杂,关键点在于掌握分布式的编程思想和方法,主要将计算过程分为以下五个步骤:

(1)迭代。遍历输入数据,并将之解析成key/value对。

(2)将输入key/value对映射(map)成另外一些key/value对。

(3)依据key对中间数据进行分组(grouping)。

(4)以组为单位对数据进行归约(reduce)。

(5)迭代。将最终产生的key/value对保存到输出文件中。

 Java API解析

(1)InputFormat:用于描述输入数据的格式,常用的为TextInputFormat提供如下两个功能:

数据切分: 按照某个策略将输入数据切分成若干个split,以便确定Map Task个数以及对应的split。

为Mapper提供数据:给定某个split,能将其解析成一个个key/value对。

(2)OutputFormat:用于描述输出数据的格式,它能够将用户提供的key/value对写入特定格式的文件中。

(3)Mapper/Reducer: Mapper/Reducer中封装了应用程序的数据处理逻辑。

(4)Writable:Hadoop自定义的序列化接口。实现该类的接口可以用作MapReduce过程中的value数据使用。

(5)WritableComparable:在Writable基础上继承了Comparable接口,实现该类的接口可以用作MapReduce过程中的key数据使用。(因为key包含了比较排序的操作)。

二 单词计数实验

  !单词计数文件word

    

  1‘  启动Hadoop  执行命令启动(前面博客)部署好的Hadoop系统。

        命令:  

              cd /usr/cstor/hadoop/

               sbin/start-all.sh

        

  2’  验证HDFS上没有wordcount的文件夹  此时HDFS上应该是没有wordcount文件夹。

                 cd /usr/cstor/hadoop/

                 bin/hadoop fs -ls /                    #查看HDFS上根目录文件 /

        

  3‘  上传数据文件到HDFS

               cd /usr/cstor/hadoop/

               bin/hadoop fs -put /root/data/5/word  /

        

  4’  编写MapReduce程序

        在eclipse新建mapreduce项目(方法见博客>> http://www.cnblogs.com/1996swg/p/7286136.html ),新建class类WordCount

        主要编写Map和Reduce类,其中Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法;Reduce过程需要继承org.apache.hadoop.mapreduce包中Reduce类,并重写其reduce方法。   

 1 import org.apache.hadoop.conf.Configuration;
 2 import org.apache.hadoop.fs.Path;
 3 import org.apache.hadoop.io.IntWritable;
 4 import org.apache.hadoop.io.Text;
 5 import org.apache.hadoop.mapreduce.Job;
 6 import org.apache.hadoop.mapreduce.Mapper;
 7 import org.apache.hadoop.mapreduce.Reducer;
 8 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 9 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
10 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
11  
12 import java.io.IOException;
13 import java.util.StringTokenizer;
14  
15  
16 public class WordCount {
17     public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
18         private final static IntWritable one = new IntWritable(1);
19         private Text word = new Text();
20         //map方法,划分一行文本,读一个单词写出一个<单词,1>
21         public void map(Object key, Text value, Context context)throws IOException, InterruptedException {
22             StringTokenizer itr = new StringTokenizer(value.toString());
23             while (itr.hasMoreTokens()) {
24                 word.set(itr.nextToken());
25                 context.write(word, one);//写出<单词,1>
26             }}}
27     //定义reduce类,对相同的单词,把它们<K,VList>中的VList值全部相加
28     public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
29         private IntWritable result = new IntWritable();
30         public void reduce(Text key, Iterable<IntWritable> values,Context context)
31                 throws IOException, InterruptedException {
32             int sum = 0;
33             for (IntWritable val : values) {
34                 sum += val.get();//相当于<Hello,1><Hello,1>,将两个1相加
35             }
36             result.set(sum);
37             context.write(key, result);//写出这个单词,和这个单词出现次数<单词,单词出现次数>
38         }}
39     public static void main(String[] args) throws Exception {//主方法,函数入口
40         Configuration conf = new Configuration();           //实例化配置文件类
41         Job job = new Job(conf, "WordCount");             //实例化Job类
42         job.setInputFormatClass(TextInputFormat.class);     //指定使用默认输入格式类
43         TextInputFormat.setInputPaths(job, args[0]);      //设置待处理文件的位置
44         job.setJarByClass(WordCount.class);               //设置主类名
45         job.setMapperClass(TokenizerMapper.class);        //指定使用上述自定义Map类
46         job.setCombinerClass(IntSumReducer.class);        //指定开启Combiner函数
47         job.setMapOutputKeyClass(Text.class);            //指定Map类输出的<K,V>,K类型
48         job.setMapOutputValueClass(IntWritable.class);     //指定Map类输出的<K,V>,V类型
49         job.setPartitionerClass(HashPartitioner.class);       //指定使用默认的HashPartitioner类
50         job.setReducerClass(IntSumReducer.class);         //指定使用上述自定义Reduce类
51         job.setNumReduceTasks(Integer.parseInt(args[2]));  //指定Reduce个数
52         job.setOutputKeyClass(Text.class);                //指定Reduce类输出的<K,V>,K类型
53         job.setOutputValueClass(Text.class);               //指定Reduce类输出的<K,V>,V类型
54         job.setOutputFormatClass(TextOutputFormat.class);  //指定使用默认输出格式类
55         TextOutputFormat.setOutputPath(job, new Path(args[1]));    //设置输出结果文件位置
56         System.exit(job.waitForCompletion(true) ? 0 : 1);    //提交任务并监控任务状态
57     }
58 }

 

 5\'  打包成jar文件上传

      假定打包后的文件名为hdpAction.jar,主类WordCount位于包njupt下,则可使用如下命令向YARN集群提交本应用。

             ./yarn  jar  hdpAction.jar  mapreduce1.WordCount  /word  /wordcount 1

      其中“yarn”为命令,“jar”为命令参数,后面紧跟打包后的代码地址,“mapreduce1”为包名,“WordCount”为主类名,“/word”为输入文件在HDFS中的位置,/wordcount为输出文件在HDFS中的位置。

       注意:如果打包时明确了主类,那么在输入命令时,就无需输入mapreduce1.WordCount来确定主类!

             

      结果显示:  

 1 17/08/05 03:37:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 2 17/08/05 03:37:06 INFO client.RMProxy: Connecting to ResourceManager at master/10.1.21.27:8032
 3 17/08/05 03:37:06 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
 4 17/08/05 03:37:07 INFO input.FileInputFormat: Total input paths to process : 1
 5 17/08/05 03:37:07 INFO mapreduce.JobSubmitter: number of splits:1
 6 17/08/05 03:37:07 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1501872322130_0004
 7 17/08/05 03:37:07 INFO impl.YarnClientImpl: Submitted application application_1501872322130_0004
 8 17/08/05 03:37:07 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1501872322130_0004/
 9 17/08/05 03:37:07 INFO mapreduce.Job: Running job: job_1501872322130_0004
10 17/08/05 03:37:12 INFO mapreduce.Job: Job job_1501872322130_0004 running in uber mode : false
11 17/08/05 03:37:12 INFO mapreduce.Job:  map 0% reduce 0%
12 17/08/05 03:37:16 INFO mapreduce.Job:  map 100% reduce 0%
13 17/08/05 03:37:22 INFO mapreduce.Job:  map 100% reduce 100%
14 17/08/05 03:37:22 INFO mapreduce.Job: Job job_1501872322130_0004 completed successfully
15 17/08/05 03:37:22 INFO mapreduce.Job: Counters: 49
16     File System Counters
17         FILE: Number of bytes read=54
18         FILE: Number of bytes written=232239
19         FILE: Number of read operations=0
20         FILE: Number of large read operations=0
21         FILE: Number of write operations=0
22         HDFS: Number of bytes read=166
23         HDFS: Number of bytes written=28
24         HDFS: Number of read operations=6
25         HDFS: Number of large read operations=0
26         HDFS: Number of write operations=2
27     Job Counters 
28         Launched map tasks=1
29         Launched reduce tasks=1
30         Data-local map tasks=1
31         Total time spent by all maps in occupied slots (ms)=2275
32         Total time spent by all reduces in occupied slots (ms)=2598
33         Total time spent by all map tasks (ms)=2275
34         Total time spent by all reduce tasks (ms)=2598
35         Total vcore-seconds taken by all map tasks=2275
36         Total vcore-seconds taken by all reduce tasks=2598
37         Total megabyte-seconds taken by all map tasks=2329600
38         Total megabyte-seconds taken by all reduce tasks=2660352
39     Map-Reduce Framework
40         Map input records=8
41         Map output records=20
42         Map output bytes=154
43         Map output materialized bytes=54
44         Input split bytes=88
45         Combine input records=20
46         Combine output records=5
47         Reduce input groups=5
48         Reduce shuffle bytes=54
49         Reduce input records=5
50         Reduce output records=5
51         Spilled Records=10
52         Shuffled Maps =1
53         Failed Shuffles=0
54         Merged Map outputs=1
55         GC time elapsed (ms)=47
56         CPU time spent (ms)=1260
57         Physical memory (bytes) snapshot=421257216
58         Virtual memory (bytes) snapshot=1647611904
59         Total committed heap usage (bytes)=402653184
60     Shuffle Errors
61         BAD_ID=0
62         CONNECTION=0
63         IO_ERROR=0
64         WRONG_LENGTH=0
65         WRONG_MAP=0
66         WRONG_REDUCE=0
67     File Input Format Counters 
68         Bytes Read=78
69     File Output Format Counters 
70         Bytes Written=28

     >生成结果文件wordcount目录下的part-r-00000,用hadoop命令查看生成文件

    

三  二次排序

      MR默认会对键进行排序,然而有的时候我们也有对值进行排序的需求。满足这种需求一是可以在reduce阶段排序收集过来的values,但是,如果有数量巨大的values可能就会导致内存溢出等问题,这就是二次排序应用的场景——将对值的排序也安排到MR计算过程之中,而不是单独来做。

      二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果。

      !需排序文件secsortdata.txt

      

1\'  编写程序IntPair 类和主类 SecondarySort

    同第一个实验在eclipse编程的创建方法!

    程序主要难点在于排序和聚合。

      对于排序我们需要定义一个IntPair类用于数据的存储,并在IntPair类内部自定义Comparator类以实现第一字段和第二字段的比较。

      对于聚合我们需要定义一个FirstPartitioner类,在FirstPartitioner类内部指定聚合规则为第一字段。

      此外,我们还需要开启MapReduce框架自定义Partitioner 功能和GroupingComparator功能。

    Inpair.java

 1 package mr;
 2  
 3 import java.io.DataInput;
 4 import java.io.DataOutput;
 5 import java.io.IOException;
 6  
 7 import org.apache.hadoop.io.IntWritable;
 8 import org.apache.hadoop.io.WritableComparable;
 9  
10 public class IntPair implements WritableComparable<IntPair> {
11     private IntWritable first;
12     private IntWritable second;
13     public void set(IntWritable first, IntWritable second) {
14         this.first = first;
15         this.second = second;
16     }
17     //注意:需要添加无参的构造方法,否则反射时会报错。
18     public IntPair() {
19         set(new IntWritable(), new IntWritable());
20     }
21     public IntPair(int first, int second) {
22         set(new IntWritable(first), new IntWritable(second));
23     }
24     public IntPair(IntWritable first, IntWritable second) {
25         set(first, second);
26     }
27     public IntWritable getFirst() {
28         return first;
29     }
30     public void setFirst(IntWritable first) {
31         this.first = first;
32     }
33     public IntWritable getSecond() {
34         return second;
35     }
36     public void setSecond(IntWritable second) {
37         this.second = second;
38     }
39     public void write(DataOutput out) throws IOException {
40         first.write(out);
41         second.write(out);
42     }
43     public void readFields(DataInput in) throws IOException {
44         first.readFields(in);
45         second.readFields(in);
46     }
47     public int hashCode() {
48         return first.hashCode() * 163 + second.hashCode();
49     }
50     public boolean equals(Object o) {
51         if (o instanceof IntPair) {
52             IntPair tp = (IntPair) o;
53             return first.equals(tp.first) && second.equals(tp.second);
54         }
55         return false;
56     }
57     public String toString() {
58         return first + "\\t" + second;
59     }
60     public int compareTo(IntPair tp) {
61         int cmp = first.compareTo(tp.first);
62         if (cmp != 0) {
63             return cmp;
64         }
65         return second.compareTo(tp.second);
66     }
67 }

 

    secsortdata.java

 1 package mr;
 2  
 3 import java.io.IOException;
 4  
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.fs.Path;
 7 import org.apache.hadoop.io.LongWritable;
 8 import org.apache.hadoop.io.NullWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.io.WritableComparable;
11 import org.apache.hadoop.io.WritableComparator;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.Mapper;
14 import org.apache.hadoop.mapreduce.Partitioner;
15 import org.apache.hadoop.mapreduce.Reducer;
16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
18  
19 public class SecondarySort {
20     static class TheMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> {
21         @Override
22         protected void map(LongWritable key, Text value, Context context)
23                 throws IOException, InterruptedException {
24             String[] fields = value.toString().split("\\t");
25             int field1 = Integer.parseInt(fields[0]);
26             int field2 = Integer.parseInt(fields[1]);
27             context.write(new IntPair(field1,field2), NullWritable.get());
28         }
29     }
30     static class TheReducer extends Reducer<IntPair, NullWritable,IntPair, NullWritable> {
31         //private static final Text SEPARATOR = new Text("------------------------------------------------");
32         @Override
33         protected void reduce(IntPair key, Iterable<NullWritable> values, Context context)
34                 throws IOException, InterruptedException {
35             context.write(key, NullWritable.get());
36         }
37     }
38     public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
39         public int getPartition(IntPair key, NullWritable value,
40                 int numPartitions) {
41             return Math.abs(key.getFirst().get()) % numPartitions;
42         }
43     }
44     //如果不添加这个类,默认第一列和第二列都是升序排序的。
45 //这个类的作用是使第一列升序排序,第二列降序排序
46     public static class KeyComparator extends WritableComparator {
47         //无参构造器必须加上,否则报错。
48         protected KeyComparator() {
49             super(IntPair.class, true);
50         }
51         public int compare(WritableComparable a, WritableComparable b) {
52             IntPair ip1 = (IntPair) a;
53             IntPair ip2 = (IntPair) b;
54             //第一列按升序排序
55             int cmp = ip1.getFirst().compareTo(ip2.getFirst());
56             if (cmp != 0) {
57                 return cmp;
58             }
59             //在第一列相等的情况下,第二列按倒序排序
60             return -ip1.getSecond().compareTo(ip2.getSecond());
61         }
62     }
63     //入口程序
64     public static void main(String[] args) throws Exception {
65         Configuration conf = new Configuration();
66         Job job = Job.getInstance(conf);
67         job.setJarByClass(SecondarySort.class);
68         //设置Mapper的相关属性
69         job.setMapperClass(TheMapper.class);
70         //当Mapper中的输出的key和value的类型和Reduce输出
71 //的key和value的类型相同时,以下两句可以省略。
72         //job.setMapOutputKeyClass(IntPair.class);
73         //job.setMapOutputValueClass(NullWritable.class);
74         FileInputFormat.setInputPaths(job, new Path(args[0]));
75         //设置分区的相关属性
76         job.setPartitionerClass(FirstPartitioner.class);
77         //在map中对key进行排序
78         job.setSortComparatorClass(KeyComparator.class);
79         //job.setGroupingComparatorClass(GroupComparator.class);
80         //设置Reducer的相关属性
81         job.setReducerClass(TheReducer.class);
82         job.setOutputKeyClass(IntPair.class);
83         job.setOutputValueClass(NullWritable.class);
84         FileOutputFormat.setOutputPath(job, new Path(args[1]));
85         //设置Reducer数量
86         int reduceNum = 1;
87         if(args.length >= 3 && args[2] != null){
88             reduceNum = Integer.parseInt(args[2]);
89         }
90         job.setNumReduceTasks(reduceNum);
91         job.waitForCompletion(true);
92     }
93 }

 

 2’  打包提交

      使用Eclipse开发工具将该代码打包,选择主类为mr.Secondary。如果没有指定主类,那么在执行时就要指定须执行的类。假定打包后的文件名为Secondary.jar,主类SecondarySort位于包mr下,则可使用如下命令向Hadoop集群提交本应用。

       bin/hadoop jar hdpAction6.jar mr.Secondary /user/mapreduce/secsort/in/secsortdata.txt  /user/mapreduce/secsort/out  1

      其中“hadoop”为命令,“jar”为命令参数,后面紧跟打的包,/user/mapreduce/secsort/in/secsortdata.txt”为输入文件在HDFS中的位置,如果HDFS中没有这个文件,则自己自行上传。“/user/mapreduce/secsort/out/”为输出文件在HDFS中的位置,“1”为Reduce个数。

      如果打包时已经设定了主类,此时命令中无需再次输入定义主类!

      (上传secsortdata.txt到HDFS  命令: ” hadoop fs -put  目标文件包括路径  hdfs路径  “)

      

     显示结果:

 1 [root@master hadoop]# bin/hadoop jar SecondarySort.jar  /secsortdata.txt  /user/mapreduce/secsort/out  1
 2 Not a valid JAR: /usr/cstor/hadoop/SecondarySort.jar
 3 [root@master hadoop]# bin/hadoop jar hdpAction6.jar  /secsortdata.txt  /user/mapreduce/secsort/out  1
 4 17/08/05 04:05:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 5 17/08/05 04:05:49 INFO client.RMProxy: Connecting to ResourceManager at master/10.1.21.27:8032
 6 17/08/05 04:05:49 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
 7 17/08/05 04:05:50 INFO input.FileInputFormat: Total input paths to process : 1
 8 17/08/05 04:05:50 INFO mapreduce.JobSubmitter: number of splits:1
 9 17/08/05 04:05:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1501872322130_0007
10 17/08/05 04:05:50 INFO impl.YarnClientImpl: Submitted application application_1501872322130_0007
11 17/08/05 04:05:50 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1501872322130_0007/
12 17/08/05 04:05:50 INFO mapreduce.Job: Running job: job_1501872322130_0007
13 17/08/05 04:05:56 INFO mapreduce.Job: Job job_1501872322130_0007 running in uber mode : false
14 17/08/05 04:05:56 INFO mapreduce.Job:  map 0% reduce 0%
15 17/08/05 04:06:00 INFO mapreduce.Job:  map 100% reduce 0%
16 17/08/05 04:06:05 INFO mapreduce.Job:  map 100% reduce 100%
17 17/08/05 04:06:06 INFO mapreduce.Job: Job job_1501872322130_0007 completed successfully
18 17/08/05 04:06:07 INFO mapreduce.Job: Counters: 49
19     File System Counters
20         FILE: Number of bytes read=106
21         FILE: Number of bytes written=230897
22         FILE: Number of read operations=0
23         FILE: Number of large read operations=0
24         FILE: Number of write operations=0
25         HDFS: Number of bytes read=159
26         HDFS: Number of bytes written=60
27         HDFS: Number of read operations=6
28         HDFS: Number of large read operations=0
29         HDFS: Number of write operations=2
30     Job Counters 
31         Launched map tasks=1
32         Launched reduce tasks=1
33         Data-lo

以上是关于大数据MapReduce(单词计数;二次排序;计数器;join;分布式缓存)的主要内容,如果未能解决你的问题,请参考以下文章

跟A君学大数据-用MapReduce实现表关联

MapReduce之单词计数

HDFS 手写mapreduce单词计数框架

学习笔记Hadoop—— MapReduce开发入门

基于 MapReduce 的单词计数(Word Count)的实现

大数据系列之分布式计算批处理引擎MapReduce实践