Hadoop中的MapReduce——分布式离线计算框架

Posted 阳哥赚钱很牛

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop中的MapReduce——分布式离线计算框架相关的知识,希望对你有一定的参考价值。

上次的文章中,我们使用Java代码实现了MapReduce,这次我们正式的来做MapReduce编程

另外,MapReduce不是只能由Java写,用python、scala也可以

运行三个实例进程

分布式计算程序的时候 先分map后合reduce
 *       MapTask----将计算程序分布的运行在多台电脑上  并且是并行运行  互相之前不干扰
 *       ReduceTask----将不同电脑上map计算出来的结果汇总起来  也可以有个reduce 并且
 *                    多个reduce之前是并行运行  互不干扰的
 *                    但是reduce的执行需要依赖于Map阶段的数据
 *       MRAppMaster----YARN  管理MapTask和ReduceTask进行调度


做Mapreduce编程的话:三步编程法
 *       1、编写Mapper类  也就是我们的MapTask任务
 *       2、编写Reducer类 也就是我们的reduceTask任务
 *       3、编写Driver驱动类  关联MapTask和ReduceTask任务并且提交运行

【注意】:
 *       1、MapReduce程序处理的数据大部分都是HDFS上的文件数据
 *       2、Map阶段和reduce阶段数据都需要通过key-value键值对的形式进行输入和输出
 *       3、一般情况下map阶段的输出的key-value键值对就是reduce阶段输入的key-value键值对
 *       4、一般情况下一个MapReduce程序只能有一个map和一个reduce,如果程序逻辑复杂,那么需要编写多个MR程序去串行运行

        上次我们写的java文件流来去HDFS上的文件,对于结果输出,输出路径不能提前存在,否则会报错。那么我们如果想多次执行,每次都需要删那个输出文件。我们现在可以改进一下:

//通过Job指定文件输入
        FileInputFormat.setInputPaths(job,new Path("hdfs://192.168.10.3:9000/wordcount.txt"));
        //通过job指定文件输出到什么地方
        Path outPath = new Path("hdfs://192.168.100.3:9000/output");
        //输出路径不能提前存在 否则报错
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.3:9000"), conf, "root");
        if (fs.exists(outPath)){
            fs.delete(outPath,true);
        }
        FileOutputFormat.setOutputPath(job,outPath);

        //job提交运行
        boolean result = job.waitForCompletion(true);
        System.exit(result?0:1);

使用if判断是否存在这个输出文件,若存在则提前删除。

一、MapReduce中的序列化

1.什么是序列化、反序列化?

        序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。 

        反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。

2.序列化的原因以及JAVA序列化和大数据序列化的区别

        Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,hadoop自己开发了一套序列化机制(Writable),精简、高效。

翻译一下  : 一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储"活的”"对象,可以将“活的”对象发送到远程计算机。.

Hadoop序列化特点:
(1)紧凑:高效使用存储空间。

(2)快速:读写数据的额外开销小。

(3)可扩展:随着通信协议的升级而可升级。

(4)互操作:支持多语言的交互。

3.常用的数据序列化类型

常用的数据类型对应的hadoop数据序列化类型

Java类型

Hadoop Writable类型

boolean

BooleanWritable

byte

ByteWritable

int

IntWritable

float

FloatWritable

long

LongWritable

double

DoubleWritable

string

Text

map

MapWritable

array

ArrayWritable

4.自定义bean对象实现序列化接口

数据序列化类型远不止上面那些已有的类型,还可以是用户自定义的对象。

自定义bean对象要想序列化传输,必须实现序列化接口,需要注意以下7项。

1)必须实现Writable接口

2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

3)重写序列化方法

4)重写反序列化方法

5)注意反序列化的顺序和序列化的顺序完全一致

6)要想把结果显示在文件中,需要重写toString(),且用”\\t”分开,方便后续用

7)如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序

给大家看一眼我们要操作的文本数据:

 

我们现在来自定义一个bean对象:

**
 * JavaBean对象主要的目的是为了封装手机号的上行流量和下行流量
 * 然后在Map阶段当作我们key-value键值对的value输出到reduce阶段
 *
 *FlowBean是无法直接当作MR程序的key-value键值对的  除非Javabean对象是Hadoop的一个序列化对象
 *   1、让自定义Javabean对象实现一个接口Writable(hadoop的序列化接口 实现这个接口那么Javabean对象就可实现序列化)
 *   2、重写这个接口中的两个方法:序列化的方法--怎么将对象序列化成二进制    反序列化的方法:怎么将二进制转成Javabean对象
 */
public class FlowBean implements Writable{
    private String phone;
    private int upFlow;
    private int downFlow;
    private int sumFlow;

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public int getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(int upFlow) {
        this.upFlow = upFlow;
    }

    public int getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(int downFlow) {
        this.downFlow = downFlow;
    }

    public int getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(int sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return "FlowBean{" +
                "phone='" + phone + '\\'' +
                ", upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
    }

    /**
     * 序列化的方法:将Java对象的属性值怎么序列化写出
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.phone);//将一个String类型的属性序列化写出二进制数据
        dataOutput.writeInt(upFlow);
        dataOutput.writeInt(downFlow);
        dataOutput.writeInt(sumFlow);
    }

    /**
     * 反序列化的方法:将二进制代码转成Javabean对象属性的值
     *    反序列化的时候  读取二进制数据值的时候不能随便读
     *    你序列化写出的时候先写出的哪个属性的值 那么你就先读哪个属性的值
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.phone = dataInput.readUTF();
        this.upFlow = dataInput.readInt();
        this.downFlow = dataInput.readInt();
        this.sumFlow = dataInput.readInt();
    }
}

二、编写Mapper程序

map阶段:
 *       读取到每一行数据   将每一行中的数据手机号  上行流量 和下行流量获取出来
 *       map阶段再去输出中间键值对数据的时候 以手机号为key  以上行流量和下行流量为value发送给reduce

/**
 * 计算每一个手机号总的上行流量  总的下行流量  总流量
 * 两个阶段:
 *    map阶段:
 *       读取到每一行数据   将每一行中的数据手机号  上行流量 和下行流量获取出来‘
 *       map阶段再去输出中间键值对数据的时候 以手机号为key  以上行流量和下行流量为value发送给reduce
 *    reduce阶段
 *       reduce根据手机号(key)将这个手机对应的所有的上行流量和下行流量获取到  然后累加即可
 *
 */
public class MapperTest extends Mapper<LongWritable, Text,Text,FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\\t");
        FlowBean flowBean = new FlowBean();
        String phone = fields[1];
        int upFlow = Integer.parseInt(fields[fields.length - 3]);//倒数第三个数据
        int downFlow = Integer.parseInt(fields[fields.length - 2]);
        flowBean.setPhone(phone);
        flowBean.setUpFlow(upFlow);
        flowBean.setDownFlow(downFlow);
        /**
         * 将数据以手机号为key  flowbean对象为value写出到reduce
         */
        context.write(new Text(phone),flowBean);
    }
}

三、Reduce程序编写

public class ReduceTest extends Reducer<Text,FlowBean, NullWritable,FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        Iterator<FlowBean> iterator = values.iterator();
        int upSum=0;
        int downSum = 0;
        while (iterator.hasNext()){
            FlowBean bean = iterator.next();
            upSum+=bean.getUpFlow();
            downSum+=bean.getDownFlow();
        }
        int sum = upSum+downSum;
        FlowBean flowBean = new FlowBean();
        flowBean.setPhone(key.toString());
        flowBean.setUpFlow(upSum);
        flowBean.setDownFlow(downSum);
        flowBean.setSumFlow(sum);
        context.write(NullWritable.get(),flowBean);
    }
}

四、Driver驱动类编写

public class Driver {
    public static void main(String[] args) throws Exception {
        //获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //设置jar包
        job.setJarByClass(Driver.class);
        //关联mapper和reducer
        job.setMapperClass(MapperTest.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        job.setReducerClass(ReduceTest.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(FlowBean.class);
        //关联文件的输入和文件的输出
        FileInputFormat.setInputPaths(job,new Path("hdfs://192.168.10.3:9000/phone_data.txt"));
        Path outPath = new Path("hdfs://192.168.100.3:9000/output1");
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.3:9000"), conf, "root");
        if (fs.exists(outPath)){
            fs.delete(outPath,true);
        }
        FileOutputFormat.setOutputPath(job,outPath);
        //提交运行
        boolean result = job.waitForCompletion(true);
        System.exit(result?0:1);
    }
}

五、总结

 Hadoop中序列化机制
  主要的作用是将MR程序中产生的数据以序列化类型在网络中 不同的电脑中数据传递
 *    常用序列化数据类型
 *      LongWritable---long
 *      IntWritable----int
 *      NullWritable---null值

  如果我们想用一个自定义的Javabean对象充当mr程序的key-value键值对的输入和输出 那么Javabean对象必须实现hadoop的序列化机制
 *    1、实现接口Writable
 *    2、重写write方法--序列化写出的方法----将Java对象的属性值序列化写出
 *    3、重写readFields方法---反序列化的方法 ----将二进制数据反序列化成Javabean对象的属性值
 *         要求:反序列化的时候 反序列化的顺序必须是write方法写出数据的顺序
 *    4、如果我们reduce阶段也是输出的JavaBean对象 那么在文件当中数据的格式就是Javabean对象的toString()方法
 

以上是关于Hadoop中的MapReduce——分布式离线计算框架的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop之分布式存储HDFS和离线计算MapReduce 网盘分享 百度云

入门Hadoop---Mapreduce,Yarn是什么?

3. Hadoop MapReduce

Hbase原理深入解析及集成Hadoop

Spark与Hadoop的比较

hadoop离线day06--Hadoop MapReduceHDFS高阶