Hadoop3 - MapReduce 数据压缩

Posted 小毕超

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop3 - MapReduce 数据压缩相关的知识,希望对你有一定的参考价值。

一、数据压缩

在数据规模大和工作负载密集的情况下,运行 MapReduce 程序时,磁盘I/O 操作、网络数据传输需要花费大量的时间,对于大数据处理本身就是一个花费时间的问题,在磁盘I/O 或 网络传输上再占用大量的时间无疑是额外的消耗,那需要如何解决呢 !

在我们平常给别人传文件时,都会习惯性的将文件压缩下再进行发送,因为这样不仅可以加快传输的速度,而且还节省空间,这种方式同样可以适用于 MapReduce 处理中。

数据压缩本身就是是 MapReduce 的一种优化策略:通过压缩编码对 Mapper 或者Reduce 的输出进行压缩,以减少磁盘IO,提高MR程序运行速度,还可以减少文件存储所占空间,加快文件传输效率,并降低降低IO读写的次数。但是压缩也有缺点,使用数据时需要先对文件解压,无疑加重了CPU 的负荷,压缩算法越复杂,解压时间越长。

Hadoop 中常见的压缩算法有 Gzip、Lzo、Bzip2、Snappy ,下面是各个算法之间的优缺点:

压缩算法优点缺点
Gzip压缩比在四种压缩方式中较高;hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;有hadoop native库;大部分linux系统都自带gzip命令,使用方便不支持split
Lzo压缩/解压速度也比较快,合理的压缩率;支持split,是hadoop中最流行的压缩格式;支持hadoop native库;需要在linux系统下自行安装lzop命令,使用方便压缩率比gzip要低;hadoop本身不支持,需要安装;lzo虽然支持split,但需要对lzo文件建索引,否则hadoop也是会把lzo文件看成一个普通文件(为了支持split需要建索引,需要指定inputformat为lzo格式)
Bzip2支持split;具有很高的压缩率,比gzip压缩率都高;hadoop本身支持,但不支持native;在linux系统下自带bzip2命令,使用方便压缩/解压速度慢;不支持native
Snappy压缩速度快;支持hadoop native库不支持split;压缩比低;hadoop本身不支持,需要安装;linux系统下没有对应的命令

Hadoop 中的压缩位置可以是 Input 源头数据,Mapper的中间输出,Reduce 的最终输出:

Input 源头数据压缩

Hadoop会自动检查压缩文件的扩展名,使用对应的解码器进行解码无需单独指定。

Mapper的中间输出压缩

需要开启压缩,并指定压缩算法类型:

conf.set("mapreduce.map.output.compress","true");
conf.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

Reduce的最终输出压缩

同样需要开启压缩,并指定压缩算法:

conf.set("mapreduce.output.fileoutputformat.compress","true");
conf.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.GzipCodec");

下面基于下面数据集进行处理:

date(日期),county(县),state(州),fips(县编码code),cases(累计确诊病例),deaths(累计死亡病例)
2021-01-28,Pike,Alabama,01109,2704,35
2021-01-28,Randolph,Alabama,01111,1505,37
2021-01-28,Russell,Alabama,01113,3675,16
2021-01-28,Shelby,Alabama,01117,19878,141
2021-01-28,St. Clair,Alabama,01115,8047,147
2021-01-28,Sumter,Alabama,01119,925,28
2021-01-28,Talladega,Alabama,01121,6711,114
2021-01-28,Tallapoosa,Alabama,01123,3258,112
2021-01-28,Tuscaloosa,Alabama,01125,22083,283
2021-01-28,Walker,Alabama,01127,6105,185
2021-01-28,Washington,Alabama,01129,1454,27

通过进入压缩的方式,计算每个州的累计 deaths 总和。

这里我们使用 Bzip2 作为 Mapper 阶段压缩,使用 Gzip 作为 Reduces 阶段压缩,已演示整体流程:

先将文本数据集压缩成 Gzip 格式:


然后在驱动类中,开启 Mapper 阶段和 Reducer 阶段的压缩,并指定算法,完整的代码如下:

public class GzipDriver extends Configured implements Tool 

    public static void main(String[] args) throws Exception 
        Configuration conf = new Configuration();
        //配置 Mapper 输出为 Gzip 格式
        conf.set("mapreduce.map.output.compress", "true");
        conf.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.BZip2Codec");
        //配置输出结果压缩为Gzip格式
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
        int status = ToolRunner.run(conf, new GzipDriver(), args);
        System.exit(status);
    

    @Override
    public int run(String[] args) throws Exception 
        Job job = Job.getInstance(getConf(), GzipDriver.class.getSimpleName());
        job.setJarByClass(GzipDriver.class);

        job.setMapperClass(SumMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setNumReduceTasks(1);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.addInputPath(job, new Path("D:/test/input2"));
        Path output = new Path("D:/test/output2");
        FileSystem fs = FileSystem.get(getConf());
        if (fs.exists(output)) 
            fs.delete(output, true);
        
        FileOutputFormat.setOutputPath(job, output);
        return job.waitForCompletion(true) ? 0 : 1;
    

    public static class SumMapper extends Mapper<LongWritable, Text, Text, LongWritable> 

        private Text outKey = new Text();
        private LongWritable outValue = new LongWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
            String[] fields = value.toString().split(",");
            outKey.set(fields[2]);
            outValue.set(Long.parseLong(fields[fields.length - 1]));
            context.write(outKey, outValue);
        
    

    public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> 

        private LongWritable outValue = new LongWritable();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException 
            long totalDeaths = 0;
            //累加统计
            for (LongWritable value : values) 
                totalDeaths += value.get();
            
            outValue.set(totalDeaths);
            context.write(key, outValue);
        
    

解压后查看内容:

以上是关于Hadoop3 - MapReduce 数据压缩的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop3 - MapReduce 介绍于基本使用

Hadoop3 - MapReduce 介绍于基本使用

Hadoop3 - MapReduce 介绍于基本使用

Hadoop3 - MapReduce DB 操作

Hadoop3 - MapReduce DB 操作

Hadoop3 - MapReduce 分组介绍及案例实践