MapReduce: Small File Optimization and Partitioning

Posted by areyouready

I. Small File Optimization

1. Mapper class

package com.css.combine;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Approach:
 * wordcount - count the occurrences of each word
 * <word, 1>
 * 
 * Type parameters for the data flowing through the job:
 * 
 * KEYIN: byte offset of the line within the split, e.g. 0~10, 11~20, 21~30
 * VALUEIN: the line of data itself
 * 
 * KEYOUT: type of the key the mapper emits to the reduce phase
 * VALUEOUT: type of the value the mapper emits to the reduce phase
 * e.g. <China,1> <Beijing,1> <love,1>
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    // key: starting byte offset; value: one line of input; context: task context
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read the line
        String line = value.toString();
        // 2. Split on spaces, e.g. "love Beijing"
        String[] words = line.split(" ");
        // 3. Emit each word to the next phase as <love,1>, <Beijing,1>, ...
        for (String w : words) {
            // 4. Write to the reduce phase
            context.write(new Text(w), new IntWritable(1));
        }
    }
}
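
A side note on efficiency: the mapper above allocates a new Text and a new IntWritable for every word. Because the framework serializes the key/value as soon as write() returns, the same Writable instances can safely be reused across calls. A minimal sketch of that variant (the class name ReusingWordCountMapper and the field names are ours, not part of the original code):

package com.css.combine;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReusingWordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused output buffers; write() copies the serialized bytes immediately
    private final Text word = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String w : value.toString().split(" ")) {
            word.set(w);              // overwrite the buffer instead of allocating
            context.write(word, one);
        }
    }
}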

2. Reducer class

package com.css.combine;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        // Count how many times the word occurred
        int sum = 0;
        // Accumulate the counts
        for (IntWritable count : values) {
            // Unwrap each value and add it
            sum += count.get();
        }
        // Emit the result
        context.write(key, new IntWritable(sum));
    }
}
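
Because this reduce function is just an associative, commutative sum, the same class can also run as a combiner on the map side, shrinking the data shuffled to the reducers; its input and output types already match the map output types, which is the requirement for a combiner. One extra line in the driver below would enable it (not present in the original driver):

job.setCombinerClass(WordCountReducer.class);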

3. Driver class

package com.css.combine;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Get the job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Locate the jar via the driver class
        job.setJarByClass(WordCountDriver.class);
        // Register the custom mapper and reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Set the map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Set the reduce output types (the final output types)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Specify the InputFormat; the default is TextInputFormat.
        // CombineTextInputFormat packs many small files into each split (the small-file optimization).
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4 MB max per split
        CombineTextInputFormat.setMinInputSplitSize(job, 3145728); // 3 MB min per split
        // Set the input path and the output path for the results
        FileInputFormat.setInputPaths(job, new Path("c:/in1024/"));
        FileOutputFormat.setOutputPath(job, new Path("c:/out1024/"));
        // Submit the job and wait for completion
        boolean rs = job.waitForCompletion(true);
        System.out.println(rs ? 0 : 1);
    }
}
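
For reference, the two static setters above just write standard split-size properties, so the same limits can also be supplied through the Configuration (or as -D options). A sketch with the usual Hadoop 2.x/3.x property names; worth noting that CombineTextInputFormat takes its minimums from the per-node/per-rack keys, so the setMinInputSplitSize call above likely has no effect on it:

Configuration conf = new Configuration();
// same limit as CombineTextInputFormat.setMaxInputSplitSize(job, 4194304)
conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 4194304L);
// CombineFileInputFormat-specific minimums (per node / per rack)
conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 3145728L);
conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 3145728L);
Job job = Job.getInstance(conf);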

II. Partitioning

1. Mapper class

package com.css.flow.partition;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Sample input line (tab-separated):
 * 3631279850362    13726130503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    www.itstaredu.com    教育网站    24    27    299    681    200
 * 
 * Expected map output: 13726130503    299    681    980
 */
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Get the line
        String line = value.toString();
        
        // 2. Split on tabs
        String[] fields = line.split("\t");
        
        // 3. Extract the key fields (basic data cleaning)
        String phoneN = fields[1];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long dfFlow = Long.parseLong(fields[fields.length - 2]);
        
        // 4. Emit to the reduce phase
        context.write(new Text(phoneN), new FlowBean(upFlow, dfFlow));
    }
}
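
Real logs are rarely clean: a short or non-numeric line would make fields[1] or Long.parseLong throw and fail the whole task. A hedged sketch of a guard that could replace steps 2-4 above (the counter group/name "FlowCount"/"MALFORMED_LINES" are our own invention):

        String[] fields = line.split("\t");
        if (fields.length < 4) {
            context.getCounter("FlowCount", "MALFORMED_LINES").increment(1);
            return; // skip bad records instead of killing the task
        }
        try {
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long dfFlow = Long.parseLong(fields[fields.length - 2]);
            context.write(new Text(fields[1]), new FlowBean(upFlow, dfFlow));
        } catch (NumberFormatException e) {
            context.getCounter("FlowCount", "MALFORMED_LINES").increment(1);
        }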

2. Reducer class

package com.css.flow.partition;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // 1. Re-aggregate the traffic for the same phone number
        long upFlow_sum = 0;
        long dfFlow_sum = 0;
        
        // 2. Accumulate
        for (FlowBean f : values) {
            upFlow_sum += f.getUpFlow();
            dfFlow_sum += f.getDfFlow();
        }        
        FlowBean rs = new FlowBean(upFlow_sum, dfFlow_sum);        
        // 3. Emit
        context.write(key, rs);
    }
}

3. FlowBean class

package com.css.flow.partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Bean class used to carry the flow data between phases
 */
public class FlowBean implements Writable{
    // Fields
    private long upFlow;
    private long dfFlow;
    private long flowSum;
    
    public FlowBean() {        
    }
    
    // Constructor; the total is derived from upstream + downstream flow
    public FlowBean(long upFlow, long dfFlow) {
        this.upFlow = upFlow;
        this.dfFlow = dfFlow;
        this.flowSum = upFlow + dfFlow;
    }

    // Deserialization (fields must be read in the same order they were written)
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dfFlow = in.readLong();
        flowSum = in.readLong();        
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dfFlow);
        out.writeLong(flowSum);
    }
        
    @Override
    public String toString() {
        return upFlow + "\t" + dfFlow + "\t" + flowSum;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDfFlow() {
        return dfFlow;
    }

    public void setDfFlow(long dfFlow) {
        this.dfFlow = dfFlow;
    }

    public long getFlowSum() {
        return flowSum;
    }

    public void setFlowSum(long flowSum) {
        this.flowSum = flowSum;
    }
}
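
One contract worth spelling out: readFields must read the fields in exactly the order write wrote them (upFlow, dfFlow, flowSum here); any mismatch silently corrupts every record. A minimal round-trip check, runnable outside Hadoop (our own test scaffold, not part of the job):

package com.css.flow.partition;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean in = new FlowBean(299, 681);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bos));                   // serialize

        FlowBean out = new FlowBean();
        out.readFields(new DataInputStream(
                new ByteArrayInputStream(bos.toByteArray()))); // deserialize
        System.out.println(out);                               // expected: 299  681  980
    }
}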

4. Partitioner class

package com.css.flow.partition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PhoneNumPartitioner extends Partitioner<Text, FlowBean>{

    // Partition by the first three digits of the phone number
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // Get the first three digits of the phone number
        String phoneNum = key.toString().substring(0, 3);
        // Partition 4 is the catch-all for every other prefix
        int defaultPartition = 4;
        
        if ("135".equals(phoneNum)) {
            return 0;
        } else if ("137".equals(phoneNum)) {
            return 1;
        } else if ("138".equals(phoneNum)) {
            return 2;
        } else if ("139".equals(phoneNum)) {
            return 3;
        }
        return defaultPartition;
    }
}
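
For comparison, when no partitioner is set Hadoop falls back to HashPartitioner, which spreads keys across reducers by hash code; its logic is essentially the one method below (the sign bit is masked off so the modulo result is never negative):

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}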

5. Driver class

package com.css.flow.partition;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        // 2. Locate the jar via the driver class
        job.setJarByClass(FlowCountDriver.class);
        
        // 3. Register the custom mapper and reducer classes
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        
        // 4. Set the map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        
        // 5. Set the reduce output types (the final output types)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        
        // Set the custom partitioner.
        // The number of reduce tasks must be at least the number of partitions
        // the partitioner can return (here 0..4, so 5 reduce tasks).
        job.setPartitionerClass(PhoneNumPartitioner.class);
        job.setNumReduceTasks(5);
        
        // 6. Set the input path and the output path for the results
        FileInputFormat.setInputPaths(job, new Path("c:/flow1020/in"));
        FileOutputFormat.setOutputPath(job, new Path("c:/flow1020/out"));
        
        // 7. Submit the job and wait for completion
        boolean rs = job.waitForCompletion(true);
        System.out.println(rs ? 0 : 1);
    }
}
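
How the reduce-task count interacts with the partitioner is worth pinning down. With more reduce tasks than partitions, the job runs and the extra part-r-* files come out empty; with fewer (but more than one), map tasks fail at runtime with an "Illegal partition" error; with exactly one, the partitioner is never consulted and everything lands in part-r-00000. Sketched against the driver above:

job.setNumReduceTasks(6); // > 5 partitions: runs fine, part-r-00005 is empty
job.setNumReduceTasks(3); // < 5 partitions: fails with "Illegal partition for ..." at runtime
job.setNumReduceTasks(1); // single reducer: the custom partitioner is skipped entirely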

6. Input file HTTP_20180313143750.dat

3631279850362    13726130503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    www.itstaredu.com    教育网站    24    27    299    681    200
3631279950322    13822544101    5C-0E-8B-C7-F1-E0:CMCC    120.197.40.4    www.taobao.com    淘宝网    4    0    264    0    200
3631279910362    13926435656    20-10-7A-28-CC-0A:CMCC    120.196.100.99            2    4    132    1512    200
3631244000322    13926251106    5C-0E-8B-8B-B1-50:CMCC    120.197.40.4            4    0    240    0    200
3631279930342    18212575961    94-71-AC-CD-E6-18:CMCC-EASY    120.196.100.99    iface.qiyi.com    视频网站    15    12    1527    2106    200
3631279950342    13884138413    5C-0E-8B-8C-E8-20:7DaysInn    120.197.40.4    122.72.52.12        20    16    4116    1432    200
3631279930352    13510439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200
3631279950332    15920133257    5C-0E-8B-C7-BA-20:CMCC    120.197.40.4    sug.so.360.cn    信息安全    20    20    316    296    200
3631279830392    13719199419    68-A1-B7-03-07-B1:CMCC-EASY    120.196.100.82            4    0    240    0    200
3631279840312    13660577991    5C-0E-8B-92-5C-20:CMCC-EASY    120.197.40.4    s19.cnzz.com    站点统计    24    9    660    690    200
3631279730382    15013685858    5C-0E-8B-C7-F7-90:CMCC    120.197.40.4    rank.ie.sogou.com    搜索引擎    28    27    369    338    200
3631279860392    15889002119    E8-99-C4-4E-93-E0:CMCC-EASY    120.196.100.99    www.umeng.com    站点统计    3    3    938    380    200
3631279920332    13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
3631279860312    13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    120    1320    200
3631279840302    13602846565    5C-0E-8B-8B-B6-00:CMCC    120.197.40.4    2052.flash2-http.qq.com    综合门户    15    12    198    910    200
3631279950332    13922314466    00-FD-07-A2-EC-BA:CMCC    120.196.100.82    img.qfc.cn        12    12    3008    3720    200
3631279820302    13502468823    5C-0A-5B-6A-0B-D4:CMCC-EASY    120.196.100.99    y0.ifengimg.com    综合门户    57    102    735    11349    400
3631279860322    18320173382    84-25-DB-4F-10-1A:CMCC-EASY    120.196.100.99    input.shouji.sogou.com    搜索引擎    21    18    9531    212    200
3631279900332    13925057413    00-1F-64-E1-E6-9A:CMCC    120.196.100.55    t3.baidu.com    搜索引擎    69    63    11058    4243    200
3631279880322    13760778710    00-FD-07-A4-7B-08:CMCC    120.196.100.82            2    2    120    120    200
3631279850362    13726238888    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
3631279930352    13560436666    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1136    94    200
3631279930353    13560436326    C4-17-FE-BA-DE-D9:CMCC    120.196.100.77    lol.qq.com/    英雄联盟    18    15    1136    94    200

7. Output files

(1)part-r-00000
13502468823    735    11349    12084
13510439658    1116    954    2070
13560436326    1136    94    1230
13560436666    1136    94    1230
13560439658    918    4938    5856

(2)part-r-00001
13719199419    240    0    240
13726130503    299    681    980
13726238888    2481    24681    27162
13760778710    120    120    240

(3)part-r-00002
13822544101    264    0    264
13884138413    4116    1432    5548

(4)part-r-00003
13922314466    3008    3720    6728
13925057413    11058    4243    15301
13926251106    240    0    240
13926435656    132    1512    1644

(5)part-r-00004
13480253104    120    1320    1440
13602846565    198    910    1108
13660577991    660    690    1350
15013685858    369    338    707
15889002119    938    380    1318
15920133257    316    296    612
18212575961    1527    2106    3633
18320173382    9531    212    9743
