打怪升级之小白的大数据之旅(四十八)<初识MapReduce>

Posted GaryLea

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了打怪升级之小白的大数据之旅(四十八)<初识MapReduce>相关的知识,希望对你有一定的参考价值。

打怪升级之小白的大数据之旅(四十八)

初识MapReduce

上次回顾

在Hadoop开篇的时候,说过,大数据的主要用途就是对海量数据的存储与计算分析,通过学习Hadoop的HDFS,我们已经知道了海量数据可以通过集群的方式来进行分块存储,今天将为大家带来Hadoop的第二个核心模块–MapReduce,我们通常称它为MR

MapReduce概述

MapReduce定义

  • 我们再次回顾一下Hadoop开篇的那张图
    在这里插入图片描述
  • 我在前面说过,Hadoop2.0之后,MapReduce就对它自己内部的计算和资源调度进行了拆分,所以我们现在学习的MapReduce就是管理数据的计算这一块
  • HDFS存储数据是利用分布式进行存储,那么MapReduce对数据的计算同样也是,MapReduce是一个分布式运算程序的编程框架
  • 它的核心功能是将用户编写的逻辑代码和自带的默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上
  • 业务逻辑代码就是我们需要对数据进行怎样的运算得到我们想要的结果,自带默认模块就是它内部定义的一些辅助我们进行运算的模块

MapReduce的优缺点

每个程序的设计都有它的应用场景,所以不可能事事完美,下面就为大家介绍一下MR的优缺点

优点

  • 易于编程
    • 它简单的实现一些接口,可以完成一个分布式程序,简单来说,我们写一个分布式程序就和前面学习的程序逻辑是一样的,通过简单的串行程序通过MR就可以完成分布式运行
  • 扩展性强
    • 当我们计算的资源不够用时,可以通过增加节点的方法扩展它的计算能力而无需修改我们的编程逻辑
  • 高容错性
    • 就和HDFS存储时一样,当其中一个节点中挂掉了,我们可以将它的计算任务转移到另一个节点上运行,这样就不至于这个任务运行失败,这个过程是Hadoop内部完成的
  • 适合PB级以上海量数据的离线处理
    • 可以实现上千服务器集群并发工作,提供数据处理能力

缺点

  • 不擅长实时计算
    • MR无法像mysql一样,在毫秒内返回结果
    • 实时计算我们后面会学习fink框架,它就是专门用来进行实时的数据计算
  • 不擅长流式计算
    • 流式计算的意思就是输入的数据是动态的,它会动态变化
    • MR的输入数据是静态的,不能动态变化,这是因为它自身的设计特点
  • 不擅长有向图(DAG)计算
    • 多个程序存在依赖关系,后一个程序的输入为前一个程序的输出
    • 因为MR在运算后会将输出结果写入到磁盘中,这么做会造成大量的磁盘IO,所以也不建议这么做

好了,总结一下,MR是一个用于离线的静态数据的计算框架,它可以通过简单的串行编程来实现分布式计算的功能,举个栗子,我们可以用它来计算分析我们历史的数据,比如去年一年的用户消费情况、历史中某某股票的交易走势等等

MapReduce核心思想

  • 核心思想就是它整个的运行逻辑,我先为大家通过一个示例来说明它整个的运行流程,然后针对里面的每个知识点进行讲解,这样通过整体性的认知来学习MapReduce可以有一个清晰地思路
  • MapReduce的工作分为Map和Reduce两个阶段,Map阶段的任务就是分,将同一个任务进行分布式的运行,reduce阶段的任务就是合,将分类运算后的结果进行统计,然后对完成的数据进行存储
    在这里插入图片描述
  • 我有三斤芝麻,里面是白芝麻和黑芝麻混装,我需要将它们区分开,并使用两个容器进行存储起来,并且统计一下不同芝麻粒的总数
  • 如果我一个人来做这个事情,那么我可能需要很多的时间,MR的核心思想就是下面这样:
    • 我找了西游记的几人来帮忙,我将我的混装芝麻分成了三分,分别给了悟空、八戒、唐僧三人,让他们来帮我进行分类和统计,他们做的这个工作就是MR中的map阶段
    • 白龙马和沙僧的任务就是对分好的芝麻进行存储和整个数量的统计

MapReduce进程

前面是MapReduce的运行原理,下面就是MapReduce在运行时具体工作的模块
一个完整的MapReduce程序在分布式运行时有三类实例进程:
(1)MrAppMaster:负责整个程序的过程调度及状态协调。
(2)MapTask:负责Map阶段的整个数据处理流程。
(3)ReduceTask:负责Reduce阶段的整个数据处理流程

Hadoop序列化

序列化这个知识点是在Java中的IO流时介绍过,我们再来回顾一下

  • 序列化就是把内存中的对象转换成字节序列,这样就可以在磁盘中进行持久化的保存,我们也可以通过网络来进行对象的传输
  • 通俗一点来说,我们程序中所创建、使用的对象都是运行在内存中的,该对象只能在我们当前的电脑中生存,如果需要持久化保存或者将该对象传输到别的电脑中,就需要利用序列化来进行
  • 因为Java序列化是重量级的,它里面还有封装一些我们Hadoop用不到的接口、API,所以Hadoop为自己量身定制了一个序列化的机制

常用数据序列化类型

我们在IO的学习时说到过,我们要对程序中的对象进行保存就需要使用序列化,所以在Hadoop中,它定义了自己的序列化数据类型

Java类型Hadoop Writable类型
BooleanBooleanWritable
ByteByteWritable
IntegerIntWritable
FloatFloatWritable
LongLongWritable
DoubleDoubleWritable
StringText
MapMapWritable
ArrayArrayWritable

自定义序列化

在企业开发中,我们只使用基本的序列化类型当然满足不了我们的需求,因此我们就要学习如何自定义序列化,我们以如何序列化一个Java Bean来举例,实现对象序列化的步骤如下:

  • 必须实现Writable接口

  • 反序列化时,需要通过反射调用空参构造函数,因此,我们必须实现空参构造器(下面的例子是我们下面的实例中的)

  • 重写序列化的方法

    @Override
    public void write(DataOutput out) throws IOException {
    	//需要序列化的属性
    	out.writeLong(upFlow);
    	out.writeLong(downFlow);
    	out.writeLong(sumFlow);
    }
    
  • 重写反序列化的方法

    @Override
    public void readFields(DataInput in) throws IOException {
    	upFlow = in.readLong();
    	downFlow = in.readLong();
    	sumFlow = in.readLong();
    }
    
  • 反序列化和序列化的顺序必须完全一致

  • 要想把结果显示在文件中,需要重写toString(),可用”\\t”分开,方便后续用

  • 如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序,Shuffle我后面会详细讲解

    @Override
    public int compareTo(FlowBean o) {
    	// 倒序排列,从大到小
    	return this.sumFlow > o.getSumFlow() ? -1 : 1;
    }
    

案例实操

学习完上面的知识点,为了便于后面对MapReduce框架原理的讲解,我们首先要进行下面两个案例的实操

MapReduce程序的核心流程

前面核心思想是按照一个现实的例子来举例的,下面我按照MapReduce程序真正的流程来为大家介绍一下
在这里插入图片描述
(1)分布式的运算程序往往需要分成至少2个阶段。
(2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。
(3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
(4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行
(5)map阶段中真正实现任务的模块就是MapTask,Reduce阶段真正实现任务的模块是ReduceTask

实现官方的WordCount

  • 还记得在本地模式、集群模式我们一直使用的那个统计一个文件中单词出现次数的案例么?今天我们就来自己实现一个WordCount,下面的编码步骤是框架规定的,大家多写几遍,等我后面讲解MapReduce框架原理的时候就懂了

测试数据

// 文件名称:
hello.txt
// 内容如下
hello hello
bigdata hello
I like bigdata
I like java
I like Hadoop

编码思路

  • 按照前面我们学习的MapReduce核心思想,我们可以将这个需求分成三步
    • 一个主程序,我们定义为WcDriver类
    • 一个用于实现map的类 WcMapper
    • 一个用于实现reduce的类 WcReducer
  • WcMapper
    • 将MapTask传给我们的文本内容转换成java的数据类型
    • 根据空格来将每一行的单词进行切分
    • 将单词输出为k,v(hello,1)(hello,1)
  • WcReducer
    • 汇总各个key的个数
    • 输出key的总次数
  • WcDriver
    • 获取配置信息,获取job实例
    • 指定jar包所在路径
    • 关联WcMapper/WcReducer类
    • 指定WcMapper输出的k,v类型
    • 指定最终输出的k,v类型
    • 指定输入文件路径
    • 指定输出文件路径
    • 提交作业

环境准备

  • 创建mavern工程(这个我写了不少了,就省略了)
  • pom.xml添加依赖
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
    </dependencies>
    
  • 在项目的src/main/resources目录下,新建一个文件,命名为“log4j2.xml”,在文件中填入(HDFS案例中说过)
    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration status="error" strict="true" name="XMLConfig">
        <Appenders>
            <!-- 类型名为Console,名称为必须属性 -->
            <Appender type="Console" name="STDOUT">
                <!-- 布局为PatternLayout的方式,
                输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
                <Layout type="PatternLayout"
                        pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
            </Appender>
    
        </Appenders>
    
        <Loggers>
            <!-- 可加性为false -->
            <Logger name="test" level="info" additivity="false">
                <AppenderRef ref="STDOUT" />
            </Logger>
    
            <!-- root loggerConfig设置 -->
            <Root level="info">
                <AppenderRef ref="STDOUT" />
            </Root>
        </Loggers>
    
    </Configuration>
    

编写程序

  • WcMapper类
    package com.company.wordcount;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /*
        作用 :用来实现需要在MapTask中实现的功能
    
        Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>:
            两组:
                第一组:
                    KEYIN :读取数据的偏移量的类型
                    VALUEIN : 读取的一行一行的内容的类型
                第二组:
                     KEYOUT : 写出的Key的类型(在这指的是单词)
                     VALUEOUT :写出的Value的类型(在这指的是单词的数量)
     */
    //自定义的类想要实现MapTask中实现的功能必须继承Mapper
    public class WCMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
        //key中存放的就是单词
        private Text outKey = new Text();
        //value中存放的是单词的数量
        private LongWritable outValue = new LongWritable();
        /**
         * 在map方法中去实现需要在MapTask中实现的功能
         * 注意:map方法在被循环调用(MR框架-MapTask程序)
         *      每调用一次就会传入一行内容
         * @param key :读取数据的偏移量
         * @param value :读取的一行一行的内容
         * @param context :上下文(在这用来通过上下文将K,V写出)
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            //1.将内容进行分割
            //1.1将Text转成字符串为了方便操作
            String line = value.toString();
            String[] words = line.split(" ");
            //2.封装K,V
            for (String word : words) {
                //设置key的值
                outKey.set(word);
                //设置value的值
                outValue.set(1);
                //3.将K,V写出去
                context.write(outKey,outValue);
            }
    
        }
    }
    
  • WcReducer类
    package com.company.wordcount;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /*
        作用 :用来实现需要在RedcueTask中实现的功能
    
        Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
            两组:
                第一组:
                    KEYIN :读取的Key的类型(Mapper输出的Key的类型)
                    VALUEIN :读取的Value的类型(Mapper输出的Value的类型)
                第二组:
                    KEYOUT :输出的Key的类型(在这指的是单词)
                    VALUEOUT :输出的Value的类型(在这指的是单词的数量)
    
     */
    //自定义的类想要实现ReduceTask中实现的功能必须继承Reducer
    public class WCReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
        //输出的value在这用来设置单词的数量
        private LongWritable outValue = new LongWritable();
        /**
         * 在reduce方法中去实现需要在ReduceTask中实现的功能
         * 注意::reduce方法在被循环调用(MR框架-ReduceTask程序)
         *      每调用一次就会传入一组数据(key值相同为一组)
         * @param key :读取的key值
         * @param values :一组数据中所有的value
         * @param context : 上下文(在这用来将K,V写出去)
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            //1.累加所有的value
            for (LongWritable value : values) {
                //将LongWritable转换成long
                long v = value.get();
                sum += v;
            }
            //2.封装K,V
            //设置单词的数量
            outValue.set(sum);
            //3.写出K,V
            context.write(key,outValue);
        }
    }
    
  • WcDriver类
    package com.company.wordcount;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /*
        程序的入口(主程序)
     */
    public class WCDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            //1.创建Job对象(提交的任务)
            Job job = Job.getInstance(new Configuration());
    
            //2.设置参数
            //2.1设置Jar加载路径 --在本地运行不需要设置,在集群上运行需要设置
            //作用 :用来告诉集群主类是谁。
            job.setJarByClass(WCDriver.class);
            //2.2设置Mapper和Reducer类
            job.setMapperClass(WCMapper.class);
            job.setReducerClass(WCReducer.class);
            //2.3设置Mapper输出的K,V类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            //2.4设置最终输出的K,V类型(在这是Redcuer的K,V类型)
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            //2.5设置输入(数据)和输出(结果)的路径
            FileInputFormat.setInputPaths(job,new Path("E:\\\\io\\\\input"));
            //注意:输出的路径必须不存在-否则报错
            FileOutputFormat.setOutputPath(job,new Path("E:\\\\io\\\\output2"));
    
            //3.提交Job
            /*
                参数 :是否打印进度
                返回值 :如果成功返回true
             */
            boolean result = job.waitForCompletion(true);
            //退出JVM的方式 :0正常退出,1非正常退出
            System.exit(result ? 0 : 1);
        }
    }
    

输出结果:

Hadoop	1
I	3
bigdata	2
hello	3
java	1
like	3
在集群中运行wordcount
  • 还是前面的案例,我们现在实现一下在集群中运行,我们需要将打成Jar包在集群上运行,使用Maven中学习到的知识,将它进行package,然放到我们的服务器上
  • 运行前,还需要修改一下我们的WcDriver类,作用是让它识别我们jar包的位置
  • 集群运行wordcount的命令
        命令:hadoop jar xxx.jar 主类的全类名  参数1(输入路径)  参数2(输出路径)
    
  • WcDriver类
    package com.company.wordcount;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /*
        打成Jar包在集群上运行
    
        命令:hadoop jar xxx.jar 主类的全类名  参数1(输入路径)  参数2(输出路径)
     */
    public class WCDriver2 {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            //1.创建Job对象(提交的任务)
            Job job = Job.getInstance(new Configuration());
    
            //2.设置参数
            //2.1设置Jar加载路径 --在本地运行不需要设置,在集群上运行需要设置
            //作用 :用来告诉集群主类是谁。
            job.setJarByClass(WCDriver2.class);
            //2.2设置Mapper和Reducer类
            job.setMapperClass(WCMapper.class);
            job.setReducerClass(WCReducer.class);
            //2.3设置Mapper输出的K,V类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            //2.4设置最终输出的K,V类型(在这是Redcuer的K,V类型)
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            //2.5设置输入(数据)和输出(结果)的路径
            FileInputFormat.setInputPaths(job,new Path(args[0]));
            //注意:输出的路径必须不存在-否则报错
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
    
            //3.提交Job
            /*
                参数 :是否打印进度
                返回值 :如果成功返回true
             */
            boolean result = job.waitForCompletion(true);
            //退出JVM的方式 :0正常退出,1非正常退出
            System.exit(result ? 0 : 1);
        }
    }
    

从Windows向集群提交Job

前面我们是将jar包放到了集群上,我们也可以在windows本地来向集群提交job,来进行wordcount的任务

  • WcDriver类
    package com.company.wordcont;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    
    /*
        从Windows向集群提交Job
    
        1.配置:
             //设置在集群运行的相关参数-设置HDFS,NAMENODE的地址
            conf.set("fs.defaultFS", "hdfs://hadoop102:9820");
            //指定MR运行在Yarn上
            conf.set("mapreduce.framework.name","yarn");
            //指定MR可以在远程集群运行
            conf.set("mapreduce.app-submission.cross-platform","true");
            //指定yarn resourcemanager的位置
            conf.set("yarn.resourcemanager.hostname", "hadoop103");
        2.打包
        3.将job.setJarByClass(WCDriver3.class);注释掉
          添加job.setJar(jar包路径)
        4.在EditConfigurations中
            VM Options : -DHADOOP_USER_NAME=atguigu
            Program Arguments : hdfs://hadoop102:9820/input hdfs://hadoop102:9820/output3
     */
    public class WCDriver3 {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            //设置在集群运行的相关参数-设置HDFS,NAMENODE的地址
            conf.set("fs.defaultFS", "hdfs://hadoop102:9820");
            //指定MR运行在Yarn上
            conf.set("mapreduce.framework.name","yarn");
            //指定MR可以在远程集群运行
            conf.set("mapreduce.app-submission.cross-platform","true");
            //指定yarn resourcemanager的位置
            conf.set("yarn.resourcemanager.hostname", "hadoop103");
    
            Job job = Job.getInstance(conf);
            //job.setJarByClass(WCDriver3.class);
            //指定向集群提交的Job包路径
            job.setJar("D:\\\\class_video\\\\0323\\\\06-hadoop\\\\3.代码\\\\MRDemo\\\\target\\\\MRDemo-1.0-SNAPSHOT.jar");
    
            job.setMapperClass(WCMapper.class);
            job.setReducerClass(WCReducer.class);
            job.setMapOutputKeyClass(Text.class);
      

    以上是关于打怪升级之小白的大数据之旅(四十八)<初识MapReduce>的主要内容,如果未能解决你的问题,请参考以下文章

    打怪升级之小白的大数据之旅(五十八)<HadoopHA>

    打怪升级之小白的大数据之旅(四十一)<大数据与Hadoop概述>

    打怪升级之小白的大数据之旅(四十三)<Hadoop运行模式(集群搭建)>

    打怪升级之小白的大数据之旅(四十二)<Hadoop运行环境搭建>

    打怪升级之小白的大数据之旅(四十七)<HDFS扩展知识点>

    打怪升级之小白的大数据之旅(四十六)<HDFS各模块的原理>