MapReduce分析流量汇总

Posted areyouready

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce分析流量汇总相关的知识,希望对你有一定的参考价值。

一、MapReduce编程规范

一、MapReduce编程规范
    用户编写mr程序主要分为三个部分:Mapper,Reducer,Driver
    1.Mapper阶段
        (1)用户自定义Mapper类 要继承父类Mapper
        (2)Mapper的输入数据的kv对形式(kv类型可以自定义)
        (3)Mapper的map方法的重写(加入业务逻辑)
        (4)Mapper的数据输出kv对的形式(kv类型可以自定义)
        (5)map()方法(maptask进程)对每个<k,v>调用一次
        
    2.Reducer阶段
        (1)用户自定义Reducer类 要继承父类Reducer
        (2)Reducer的数据输入类型对应的是Mapper阶段的输出数据类型,也是kv对
        (3)Reducer的reduce方法的重写(加入业务逻辑)
        (4)ReduceTask进程对每组的k的<k,v>组调用一次reduce方法
        
    3.Driver阶段
        MR程序需要一个Driver来进行任务的提交,提交的任务是一个描述了各种重要信息的job对象
    
    4.修改mapred-site.xml文件<configuration>中加入
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>

二、常用数据序列化类型
    1. JAVA 类型                    HADOOP 类型
        int                            IntWritable
        float                        FloatWritable
        long                        LongWritable
        double                        DoubleWritable
        string                        Text
        boolean                        BooleanWritable
        byte                        ByteWritable
        map                            MapWritable
        array                        ArrayWritable

    2.为什么要序列化?
    存储“活的对象”

    3.什么是序列化?
    序列化就是把内存当中的对象,转换成字节序列以便于存储和网络传输。
    反序列化就是将受到的字节序列或者硬盘的持久化数据,转换成内存中的对象。

    java的序列化-->Serializable

    4.为什么不使用java提供的序列化接口?
    java的序列化是一个重量级的序列化框架,一个对象被序列化后会附带很多额外的信息(效验信息,header,继承体系等)。
    不便于在网络中高效传输,所以hadoop开发了一套序列化机制(Writable),精简/高效。

    5.为什么序列化在hadoop中很重要?
    hadoop通信是通过远程调用(rpc)实现的,需要进行序列化

    6.特点:
        1)紧凑
        2)快速
        3)可拓展
        4)互操作

二、MapReduce分析流量汇总

1.Mapper类

package com.css.flow;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * 3631279850362    13726130503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    www.itstaredu.com    教育网站    24    27    299    681    200
 * 13726130503  299    681 980
 */
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1.获取数据
        String line = value.toString();        
        // 2.切割
        String[] fields = line.split("	");      
        // 3.封装对象 拿到关键字段 数据清洗
        String phoneN = fields[1];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long dfFlow = Long.parseLong(fields[fields.length - 2]);        
        // 4.输出到reduce端
        context.write(new Text(phoneN), new FlowBean(upFlow, dfFlow));
    }
}

2.Reducer类

package com.css.flow;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // 1.相同手机号 的流量使用再次汇总
        long upFlow_sum = 0;
        long dfFlow_sum = 0;        
        // 2.累加
        for (FlowBean f : values) {
            upFlow_sum += f.getUpFlow();
            dfFlow_sum += f.getDfFlow();
        }        
        FlowBean rs = new FlowBean(upFlow_sum, dfFlow_sum);        
        // 3.输出
        context.write(key, rs);
    }
}

3.Driver类

package com.css.flow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1.获取job信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        // 2.获取jar包
        job.setJarByClass(FlowCountDriver.class);
        
        // 3.获取自定义的mapper与reducer类
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        
        // 4.设置map输出的数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        
        // 5.设置reduce输出的数据类型(最终的数据类型)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        
        // 6.设置输入存在的路径与处理后的结果路径
        FileInputFormat.setInputPaths(job, new Path("c:/flow1020/in"));
        FileOutputFormat.setOutputPath(job, new Path("c:/flow1020/out"));
        
        // 7.提交任务
        boolean rs = job.waitForCompletion(true);
        System.out.println(rs ? 0 : 1);
    }
}

4.封装类,数据的传输

package com.css.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * 封装类 数据的传输
 */
public class FlowBean implements Writable{
    // 定义属性
    private long upFlow;
    private long dfFlow;
    private long flowSum;    
public FlowBean() { }
// 流量累加 public FlowBean(long upFlow, long dfFlow) { this.upFlow = upFlow; this.dfFlow = dfFlow; this.flowSum = upFlow + dfFlow; }
// 反序列化 @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); dfFlow = in.readLong(); flowSum = in.readLong(); }
// 序列化 @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(dfFlow); out.writeLong(flowSum); }
@Override public String toString() { return upFlow + " " + dfFlow + " " + flowSum; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDfFlow() { return dfFlow; } public void setDfFlow(long dfFlow) { this.dfFlow = dfFlow; } public long getFlowSum() { return flowSum; } public void setFlowSum(long flowSum) { this.flowSum = flowSum; } }

5.输入的文件HTTP_20180313143750.dat

3631279850362    13726130503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    www.itstaredu.com    教育网站    24    27    299    681    200
3631279950322    13822544101    5C-0E-8B-C7-F1-E0:CMCC    120.197.40.4    www.taobao.com    淘宝网    4    0    264    0    200
3631279910362    13926435656    20-10-7A-28-CC-0A:CMCC    120.196.100.99            2    4    132    1512    200
3631244000322    13926251106    5C-0E-8B-8B-B1-50:CMCC    120.197.40.4            4    0    240    0    200
3631279930342    18212575961    94-71-AC-CD-E6-18:CMCC-EASY    120.196.100.99    iface.qiyi.com    视频网站    15    12    1527    2106    200
3631279950342    13884138413    5C-0E-8B-8C-E8-20:7DaysInn    120.197.40.4    122.72.52.12        20    16    4116    1432    200
3631279930352    13510439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200
3631279950332    15920133257    5C-0E-8B-C7-BA-20:CMCC    120.197.40.4    sug.so.360.cn    信息安全    20    20    316    296    200
3631279830392    13719199419    68-A1-B7-03-07-B1:CMCC-EASY    120.196.100.82            4    0    240    0    200
3631279840312    13660577991    5C-0E-8B-92-5C-20:CMCC-EASY    120.197.40.4    s19.cnzz.com    站点统计    24    9    660    690    200
3631279730382    15013685858    5C-0E-8B-C7-F7-90:CMCC    120.197.40.4    rank.ie.sogou.com    搜索引擎    28    27    369    338    200
3631279860392    15889002119    E8-99-C4-4E-93-E0:CMCC-EASY    120.196.100.99    www.umeng.com    站点统计    3    3    938    380    200
3631279920332    13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
3631279860312    13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    120    1320    200
3631279840302    13602846565    5C-0E-8B-8B-B6-00:CMCC    120.197.40.4    2052.flash2-http.qq.com    综合门户    15    12    198    910    200
3631279950332    13922314466    00-FD-07-A2-EC-BA:CMCC    120.196.100.82    img.qfc.cn        12    12    3008    3720    200
3631279820302    13502468823    5C-0A-5B-6A-0B-D4:CMCC-EASY    120.196.100.99    y0.ifengimg.com    综合门户    57    102    735    11349    400
3631279860322    18320173382    84-25-DB-4F-10-1A:CMCC-EASY    120.196.100.99    input.shouji.sogou.com    搜索引擎    21    18    9531    212    200
3631279900332    13925057413    00-1F-64-E1-E6-9A:CMCC    120.196.100.55    t3.baidu.com    搜索引擎    69    63    11058    4243    200
3631279880322    13760778710    00-FD-07-A4-7B-08:CMCC    120.196.100.82            2    2    120    120    200
3631279850362    13726238888    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
3631279930352    13560436666    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1136    94    200
3631279930353    13560436326    C4-17-FE-BA-DE-D9:CMCC    120.196.100.77    lol.qq.com/    英雄联盟    18    15    1136    94    200

6.输出的文件part-r-00000

13480253104    120    1320    1440
13502468823    735    11349    12084
13510439658    1116    954    2070
13560436326    1136    94    1230
13560436666    1136    94    1230
13560439658    918    4938    5856
13602846565    198    910    1108
13660577991    660    690    1350
13719199419    240    0    240
13726130503    299    681    980
13726238888    2481    24681    27162
13760778710    120    120    240
13822544101    264    0    264
13884138413    4116    1432    5548
13922314466    3008    3720    6728
13925057413    11058    4243    15301
13926251106    240    0    240
13926435656    132    1512    1644
15013685858    369    338    707
15889002119    938    380    1318
15920133257    316    296    612
18212575961    1527    2106    3633
18320173382    9531    212    9743

 






以上是关于MapReduce分析流量汇总的主要内容,如果未能解决你的问题,请参考以下文章

基于MapReduce的手机流量统计分析

MapReduce统计上行流量下行流量及流量之和,并且到集群上运行

MapReduce实现手机上网流量分析

基于MapReduce的手机上网流量统计分析

Hadoop学习之路(十九)MapReduce框架排序

MapReduce分区和排序