MapReduce Join案例ETL压缩简介

Posted weixin_46628668

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce Join案例ETL压缩简介相关的知识,希望对你有一定的参考价值。

Join案例

MapReduce里的join跟SQL很类似,本质上还是联表查询,即我们最终需要的信息存储在两张表里,两张表有一个相同的字段,从而可以关联起来。MapReduce里的join动作可以再Reduce端做也可以在Map端做,数据量大时则最好在Map阶段做。

先看需求:有一个订单表和一个商品表,订单表里只有商品id,没有商品名称,商品名称在商品表里,现在需要将订单表里的商品id替换为商品名称显示出来。

 如果是在Reduce端做join操作,最好定义一个TableBean(名称随意,随便取,只要实现Writable接口即可),Map阶段的输出的key就是商品id(通过商品id将两张表关联起来),输出的value就是这个TableBean。在map阶段也要区分好每次读取的内容来自哪个文件,可以通过setup的方法来完成,setup方法会针对每个分片执行一次,因此可以获取源文件路径:

FileSplit fileSplit = (FileSplit) context.getInputSplit();
String fileName = fileSplit.getPath().getName();

然后在map方法中,根据fileName(可定义为类的私有变量)来判断读的是订单表还是商品表,如果是订单表,设置这个TableBean的订单id、商品id、数量信息,并且把标志位记为订单,如果是商品表,设置这个TableBean的商品id、商品名称,并且把标志位记为商品。然后相同key(即相同商品id)的TableBean会进入到同一个reduce,此时只要将商品TableBean的商品名称设给订单TableBean即可,直接输出。

这种在reduce阶段进行join的操作可能会给reduce带来很大的压力,因此推荐在map阶段就进行join。因为商品表数据不会很多,所以考虑把商品表缓存起来,在job提交阶段,设置缓存文件为商品表:

job.addCacheFile(new URI("xxx"));

同时也要设置ReduceTask数量为0:

job.setNumReduceTasks(0);

在setup方法中获取缓存文件并读取:

URI[] cacheFiles = context.getCacheFiles();

FsDataInputStream fis = FileSystem.get(context.getConfiguration()).open(new Path(cacheFiles[0]))

BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));

String line;
while(StringUtils.isNotEmpty(line = reader.readLine())) 
    String[] fields = line.split("\\t");
    map.put(fields[0], fields[1]);


IOUtils.closeStream(reader);

ETL

ETL,即Extract Transform Load,描述数据从抽取(Extract)、转换(Transform)、加载(Load)到目的端的过程,这期间往往需要数据清洗,即清理掉不需要的数据。数据清洗一般只需要允许map程序,不需要允许reduce程序,一般配合正则表达式使用。

数据压缩

在map阶段的输入、输出(即reduce阶段的输入)、reduce阶段的输出,文件格式通常为压缩文件,这样可以节省磁盘空间、减少IO,但是也增加了CPU的开销,这种利弊需要在具体项目中进行权衡。常见的压缩算法对比:

 这些算法的压缩性能比较:

可以看出,snappy压缩/解压速度最快,但是压缩率不高,bzip2压缩率最高,但是速度最慢。压缩算法的选择可以参考下图:

要想在hadoop中开启压缩,需要配置如下参数:

 

例如,在map端输出开启压缩:

conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

这个设置之后可能没效果,因为这只是map的输出压缩格式,reduce端会自动解压,修改reduce输出端的压缩格式才会看到输出的是压缩文件:

FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

Snappy的压缩格式需要结合Linux8使用,如果在windows环境会报错。

以上是关于MapReduce Join案例ETL压缩简介的主要内容,如果未能解决你的问题,请参考以下文章

Etl之HiveSql调优(left join where的位置)

大数据之Hadoop(MapReduce):数据清洗(ETL)

大数据之Hadoop(MapReduce):压缩实操案例

Hadoop核心之MapReduce案例总结Ⅱ

hadoop mapreduce开发实践文件合并(join)

大数据之Hadoop(MapReduce):Join之Reduce Join应用