Hadoop3 - MapReduce 数据压缩
Posted 小毕超
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop3 - MapReduce 数据压缩相关的知识,希望对你有一定的参考价值。
一、数据压缩
在数据规模大和工作负载密集的情况下,运行 MapReduce
程序时,磁盘I/O
操作、网络数据传输需要花费大量的时间,对于大数据处理本身就是一个花费时间的问题,在磁盘I/O
或 网络传输上再占用大量的时间无疑是额外的消耗,那需要如何解决呢 !
在我们平常给别人传文件时,都会习惯性的将文件压缩下再进行发送,因为这样不仅可以加快传输的速度,而且还节省空间,这种方式同样可以适用于 MapReduce
处理中。
数据压缩本身就是是 MapReduce
的一种优化策略:通过压缩编码对 Mapper 或者Reduce
的输出进行压缩,以减少磁盘IO
,提高MR
程序运行速度,还可以减少文件存储所占空间,加快文件传输效率,并降低降低IO读写的次数。但是压缩也有缺点,使用数据时需要先对文件解压,无疑加重了CPU
的负荷,压缩算法越复杂,解压时间越长。
在 Hadoop
中常见的压缩算法有 Gzip、Lzo、Bzip2、Snappy
,下面是各个算法之间的优缺点:
压缩算法 | 优点 | 缺点 |
---|---|---|
Gzip | 压缩比在四种压缩方式中较高;hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;有hadoop native库;大部分linux系统都自带gzip命令,使用方便 | 不支持split |
Lzo | 压缩/解压速度也比较快,合理的压缩率;支持split,是hadoop中最流行的压缩格式;支持hadoop native库;需要在linux系统下自行安装lzop命令,使用方便 | 压缩率比gzip要低;hadoop本身不支持,需要安装;lzo虽然支持split,但需要对lzo文件建索引,否则hadoop也是会把lzo文件看成一个普通文件(为了支持split需要建索引,需要指定inputformat为lzo格式) |
Bzip2 | 支持split;具有很高的压缩率,比gzip压缩率都高;hadoop本身支持,但不支持native;在linux系统下自带bzip2命令,使用方便 | 压缩/解压速度慢;不支持native |
Snappy | 压缩速度快;支持hadoop native库 | 不支持split;压缩比低;hadoop本身不支持,需要安装;linux系统下没有对应的命令 |
在 Hadoop
中的压缩位置可以是 Input
源头数据,Mapper
的中间输出,Reduce
的最终输出:
Input 源头数据压缩
Hadoop会自动检查压缩文件的扩展名,使用对应的解码器进行解码无需单独指定。
Mapper的中间输出压缩
需要开启压缩,并指定压缩算法类型:
conf.set("mapreduce.map.output.compress","true");
conf.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
Reduce的最终输出压缩
同样需要开启压缩,并指定压缩算法:
conf.set("mapreduce.output.fileoutputformat.compress","true");
conf.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.GzipCodec");
下面基于下面数据集进行处理:
date(日期),county(县),state(州),fips(县编码code),cases(累计确诊病例),deaths(累计死亡病例)
2021-01-28,Pike,Alabama,01109,2704,35
2021-01-28,Randolph,Alabama,01111,1505,37
2021-01-28,Russell,Alabama,01113,3675,16
2021-01-28,Shelby,Alabama,01117,19878,141
2021-01-28,St. Clair,Alabama,01115,8047,147
2021-01-28,Sumter,Alabama,01119,925,28
2021-01-28,Talladega,Alabama,01121,6711,114
2021-01-28,Tallapoosa,Alabama,01123,3258,112
2021-01-28,Tuscaloosa,Alabama,01125,22083,283
2021-01-28,Walker,Alabama,01127,6105,185
2021-01-28,Washington,Alabama,01129,1454,27
通过进入压缩的方式,计算每个州的累计 deaths 总和。
这里我们使用 Bzip2
作为 Mapper
阶段压缩,使用 Gzip
作为 Reduces
阶段压缩,已演示整体流程:
先将文本数据集压缩成 Gzip 格式:
然后在驱动类中,开启 Mapper
阶段和 Reducer
阶段的压缩,并指定算法,完整的代码如下:
public class GzipDriver extends Configured implements Tool
public static void main(String[] args) throws Exception
Configuration conf = new Configuration();
//配置 Mapper 输出为 Gzip 格式
conf.set("mapreduce.map.output.compress", "true");
conf.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.BZip2Codec");
//配置输出结果压缩为Gzip格式
conf.set("mapreduce.output.fileoutputformat.compress", "true");
conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
int status = ToolRunner.run(conf, new GzipDriver(), args);
System.exit(status);
@Override
public int run(String[] args) throws Exception
Job job = Job.getInstance(getConf(), GzipDriver.class.getSimpleName());
job.setJarByClass(GzipDriver.class);
job.setMapperClass(SumMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setNumReduceTasks(1);
job.setReducerClass(SumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job, new Path("D:/test/input2"));
Path output = new Path("D:/test/output2");
FileSystem fs = FileSystem.get(getConf());
if (fs.exists(output))
fs.delete(output, true);
FileOutputFormat.setOutputPath(job, output);
return job.waitForCompletion(true) ? 0 : 1;
public static class SumMapper extends Mapper<LongWritable, Text, Text, LongWritable>
private Text outKey = new Text();
private LongWritable outValue = new LongWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
String[] fields = value.toString().split(",");
outKey.set(fields[2]);
outValue.set(Long.parseLong(fields[fields.length - 1]));
context.write(outKey, outValue);
public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable>
private LongWritable outValue = new LongWritable();
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException
long totalDeaths = 0;
//累加统计
for (LongWritable value : values)
totalDeaths += value.get();
outValue.set(totalDeaths);
context.write(key, outValue);
解压后查看内容:
以上是关于Hadoop3 - MapReduce 数据压缩的主要内容,如果未能解决你的问题,请参考以下文章