Hadoop压缩codec

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop压缩codec相关的知识,希望对你有一定的参考价值。

简介

codec其实就是coder和decoder两个单词的词头组成的缩略词。CompressionCodec定义了压缩和解压缩接口,我们这里讲的codec就是实现了CompressionCodec接口的一些压缩格式的类,下面就是这些类的列表:

技术分享

使用CompressionCodecs解压缩

CompressionCodec有两个方法可以方便的压缩和解压:

压缩:通过createOutputStream(OutputStream out)方法获得CompressionOutputStream对象。

解压:通过createInputStream(InputStream in)方法获得CompressionInputStream对象。

 

压缩的示例代码

 

[java] view plain copy
 
  1. package com.sweetop.styhadoop;  
  2.   
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.io.IOUtils;  
  5. import org.apache.hadoop.io.compress.CompressionCodec;  
  6. import org.apache.hadoop.io.compress.CompressionOutputStream;  
  7. import org.apache.hadoop.util.ReflectionUtils;  
  8.   
  9. /** 
  10.  * Created with IntelliJ IDEA. 
  11.  * User: lastsweetop 
  12.  * Date: 13-6-25 
  13.  * Time: 下午10:09 
  14.  * To change this template use File | Settings | File Templates. 
  15.  */  
  16. public class StreamCompressor {  
  17.     public static void main(String[] args) throws Exception {  
  18.         String codecClassName = args[0];  
  19.         Class<?> codecClass = Class.forName(codecClassName);  
  20.         Configuration conf = new Configuration();  
  21.         CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);  
  22.   
  23.         CompressionOutputStream out = codec.createOutputStream(System.out);  
  24.         IOUtils.copyBytes(System.in, out, 4096, false);  
  25.         out.finish();  
  26.     }  
  27. }  

从命令行接收一个CompressionCodec实现类的参数,然后通过ReflectionUtils实例化该类,调用CompressionCodec的接口方法对标准输出流进行封装,封装成一个压缩流,通过IOUtils方法把标准输入流拷贝到压缩流中,最后调用CompressionCodec的finish方法,完成压缩。

 

再来看下命令行:

 

[html] view plain copy
 
  1. echo "Hello lastsweetop" | ~/hadoop/bin/hadoop com.sweetop.styhadoop.StreamCompressor  org.apache.hadoop.io.compress.GzipCodec | gunzip -  

使用GzipCode类来压缩"Hello lastsweetop",然后再通过gunzip工具解压。

 

我们来看一下输出:

 

[html] view plain copy
 
  1. [exec] 13/06/26 20:01:53 INFO util.NativeCodeLoader: Loaded the native-hadooplibrary  
  2.     [exec] 13/06/26 20:01:53 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library  
  3.     [exec] Hello lastsweetop  


使用CompressionCodecFactory解压缩

 

如果你想读取一个被压缩的文件的话,首先你的先通过扩展名判断该用哪一种codec,当然这里有更简便的方法,CompressionCodecFactory已经帮你把这件事做了,通过传入一个Path调用它的getCodec()方法即可获得相应的codec。我们来看源码:

 

[java] view plain copy
 
  1. public class FileDecompressor {  
  2.   
  3.     public static void main(String[] args) throws Exception {  
  4.           
  5.         String uri = args[0];  
  6.         Configuration conf = new Configuration();  
  7.         //获取文件系统  
  8.         FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);  
  9.           
  10.         //构建输入路径  
  11.         Path inputPath = new Path(uri);  
  12.         //创建CompressionCodecFactory对象  
  13.         CompressionCodecFactory factory = new CompressionCodecFactory(conf);  
  14.         //获取文件的压缩格式  
  15.         CompressionCodec codec = factory.getCodec(inputPath);  
  16.           
  17.         //如果压缩格式不存在就退出  
  18.         if (codec == null){  
  19.             System.out.println("No codec found for " + uri);  
  20.             System.exit(1);  
  21.         }  
  22.         //去掉文件的后缀,键outputUri作为解压的输出路径  
  23.         String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());  
  24.           
  25.         //定义输入输出流  
  26.         InputStream in = null;  
  27.         OutputStream out = null;  
  28.           
  29.         try {  
  30.             //创建输入输出流  
  31.             in = codec.createInputStream(fileSystem.open(inputPath));  
  32.             out = fileSystem.create(new Path(outputUri));  
  33.             //解压  
  34.             IOUtils.copyBytes(in, out, conf);  
  35.         } catch (Exception e) {  
  36.             e.printStackTrace();  
  37.         }finally{  
  38.             IOUtils.closeStream(in);  
  39.             IOUtils.closeStream(out);  
  40.         }  
  41.     }  
  42. }  

注意看下removeSuffix方法,这是一个静态方法,它可以将文件的后缀去掉,然后我们将这个路径作为解压的输出路径。CompressionCodeFactory能找到的codec也是有限的,默认只有三种org.apache.hadoop.io.compress.GzipCodec;org.apache.hadoop.io.compress.BZip2Codec;org.apache.hadoop.io.compress.DefaultCodec。如果想要添加其他的codec你需要更改io.compression.codecs属性,并注册codec。

 

 

原生库

现在越来越多原生库的概念,HDFS的codec也不例外,原生库可以极大的提升性能比如gzip的原生库解压提高50%,压缩提高10%,但不是所有codec都有原生库的,而一些codec只有原生库。我们来看下列表:

技术分享

Linux下,hadoop以前提前编译好了32位的原生库和64位的原生库,我们看下:

 

[html] view plain copy
 
  1. [[email protected] native]$pwd  
  2. /home/hadoop/hadoop/lib/native  
  3. [[email protected] native]$ls -ls  
  4. total 8  
  5. 4 drwxrwxrwx 2 root root 4096 Nov 14  2012 Linux-amd64-64  
  6. 4 drwxrwxrwx 2 root root 4096 Nov 14  2012 Linux-i386-32  

如果是其他平台的话,你就需要自己编译了,详细步骤请看这里:http://wiki.apache.org/hadoop/NativeHadoop
java原生库的路径可以通过java.library.path指定,在bin目录下,hadoop的启动脚本已经指定,如果你不用这个脚本,那么你就需要在你的程序中指定了。

 

 

[html] view plain copy
 
  1. if [ -d "${HADOOP_HOME}/build/native" -o -d "${HADOOP_HOME}/lib/native" -o -e "${HADOOP_PREFIX}/lib/libhadoop.a" ]; then  
  2.   
  3.   if [ -d "$HADOOP_HOME/build/native" ]; then  
  4.     JAVA_LIBRARY_PATH=${HADOOP_HOME}/build/native/${JAVA_PLATFORM}/lib  
  5.   fi  
  6.   
  7.   if [ -d "${HADOOP_HOME}/lib/native" ]; then  
  8.     if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then  
  9.       JAVA_LIBRARY_PATH=${JAVA_LIBRARY_PATH}:${HADOOP_HOME}/lib/native/${JAVA_PLATFORM}  
  10.     else  
  11.       JAVA_LIBRARY_PATH=${HADOOP_HOME}/lib/native/${JAVA_PLATFORM}  
  12.     fi  
  13.   fi  
  14.   
  15.   if [ -e "${HADOOP_PREFIX}/lib/libhadoop.a" ]; then  
  16.     JAVA_LIBRARY_PATH=${HADOOP_PREFIX}/lib  
  17.   fi  
  18. fi  

hadoop会去查找对应的原生库,并且自动加载,你不需要关心这些设置。但某些时候你不想使用原生库,比如调试一些bug的时候,那么可以通过hadoop.native.lib设置为false来实现。

 

如果你用原生库做大量的压缩和解压的话可以考虑用CodecPool,有点像连接池,这样你就无需频繁的去创建codec对象。

 

[java] view plain copy
 
  1. public class PooledStreamCompressor {  
  2.   
  3.     public static void main(String[] args) throws ClassNotFoundException {  
  4.           
  5.         String codecClassName = args[0];  
  6.         //获取压缩类的字节码用于反射  
  7.         Class<?> codecClass = Class.forName(codecClassName);  
  8.         //创建配置信息  
  9.         Configuration conf = new Configuration();  
  10.         //通过反射机制创建压缩类  
  11.         CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);  
  12.         //定义压缩类  
  13.         Compressor compressor = null;  
  14.           
  15.         try {  
  16.             //通过CodecPool创建compressor  
  17.             compressor = CodecPool.getCompressor(codec);  
  18.             //创建压缩类对象  
  19.             CompressionOutputStream out = codec.createOutputStream(System.out, compressor);  
  20.             //压缩  
  21.             IOUtils.copyBytes(System.in, out,4096, false);  
  22.             //完成  
  23.             out.finish();  
  24.         } catch (Exception e) {  
  25.             e.printStackTrace();  
  26.         } finally {  
  27.             CodecPool.returnCompressor(compressor);  
  28.         }  
  29.           
  30.     }  
  31. }  

代码比较容易理解,通过CodecPool的getCompressor()方法获得Compressor对象,该方法需要传入一个codec,然后Compressor对象在createOutputStream中使用,使用完毕后再通过returnCompressor()放回去。
输出结果如下:

 

 

[html] view plain copy
 
  1. [exec] 13/06/27 12:00:06 INFO util.NativeCodeLoader: Loaded the native-hadoop library  
  2.     [exec] 13/06/27 12:00:06 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library  
  3.     [exec] 13/06/27 12:00:06 INFO compress.CodecPool: Got brand-new compressor  
  4.     [exec] Hello lastsweetop  



原文来自:http://blog.csdn.net/lastsweetop/article/details/9173061

 

代码来自:https://github.com/lastsweetop/styhadoop

以上是关于Hadoop压缩codec的主要内容,如果未能解决你的问题,请参考以下文章

HadoopHadoop IO之Compression和Codecs

大数据之Hadoop(MapReduce):压缩位置选择和压缩参数配置

如何在 hadoop mapreduce 中进行 lzo 压缩?

教程:给hadoop建立一个快速自定义压缩编解码器

大数据之Hadoop(MapReduce):Map输出端采用压缩

hive调优