Compress an InputStream with gzip
【Posted】2012-06-17 16:10:36
【Question】I want to compress an input stream in Java with gzip compression.
Suppose we have an uncompressed input stream (1GB of data...). I want a compressed input stream from that source:
public InputStream getCompressedStream(InputStream unCompressedStream)
{
    // Not working because it's uncompressing the stream, I want the opposite.
    return new GZIPInputStream(unCompressedStream);
}
【Comments】:
【Answer 1】:

DeflaterInputStream is not what you want, because it lacks the gzip header/trailer and uses a slightly different compression.

If you change from OutputStream (push) to InputStream (pull), you have to do things differently.

What GZIPOutputStream does is:

- write a static gzip header
- write the compressed stream using a DeflaterOutputStream; while the stream is written, a CRC32 checksum is built from the uncompressed data and the bytes are counted
- write a trailer containing the CRC32 checksum and the byte count

If you want to do the same with InputStreams, you need a stream that consists of:

- the header
- the compressed content
- the trailer

The best way to do that is to provide 3 different streams and merge them into one. Luckily there is SequenceInputStream, which combines streams for you.

Here is my implementation plus a simple unit test:
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Enumeration;
import java.util.zip.CRC32;
import java.util.zip.Deflater;
import java.util.zip.DeflaterInputStream;
import java.util.zip.DeflaterOutputStream;

/**
 * @author mwyraz
 * Wraps an input stream and compresses its contents. Similar to DeflaterInputStream but adds GZIP header and trailer.
 * See GZIPOutputStream for details.
 * LICENSE: Free to use. Contains some lines from GZIPOutputStream, so Oracle's license might apply as well!
 */
public class GzipCompressingInputStream extends SequenceInputStream
{
    public GzipCompressingInputStream(InputStream in) throws IOException
    {
        this(in, 512);
    }

    public GzipCompressingInputStream(InputStream in, int bufferSize) throws IOException
    {
        super(new StatefullGzipStreamEnumerator(in, bufferSize));
    }

    static enum StreamState
    {
        HEADER,
        CONTENT,
        TRAILER
    }

    protected static class StatefullGzipStreamEnumerator implements Enumeration<InputStream>
    {
        protected final InputStream in;
        protected final int bufferSize;
        protected StreamState state;

        public StatefullGzipStreamEnumerator(InputStream in, int bufferSize)
        {
            this.in = in;
            this.bufferSize = bufferSize;
            state = StreamState.HEADER;
        }

        public boolean hasMoreElements()
        {
            return state != null;
        }

        public InputStream nextElement()
        {
            switch (state)
            {
                case HEADER:
                    state = StreamState.CONTENT;
                    return createHeaderStream();
                case CONTENT:
                    state = StreamState.TRAILER;
                    return createContentStream();
                case TRAILER:
                    state = null;
                    return createTrailerStream();
            }
            return null;
        }

        static final int GZIP_MAGIC = 0x8b1f;
        static final byte[] GZIP_HEADER = new byte[]
        {
            (byte) GZIP_MAGIC,        // Magic number (short)
            (byte) (GZIP_MAGIC >> 8), // Magic number (short)
            Deflater.DEFLATED,        // Compression method (CM)
            0,                        // Flags (FLG)
            0,                        // Modification time MTIME (int)
            0,                        // Modification time MTIME (int)
            0,                        // Modification time MTIME (int)
            0,                        // Modification time MTIME (int)
            0,                        // Extra flags (XFLG)
            0                         // Operating system (OS)
        };

        protected InputStream createHeaderStream()
        {
            return new ByteArrayInputStream(GZIP_HEADER);
        }

        protected InternalGzipCompressingInputStream contentStream;

        protected InputStream createContentStream()
        {
            contentStream = new InternalGzipCompressingInputStream(new CRC32InputStream(in), bufferSize);
            return contentStream;
        }

        protected InputStream createTrailerStream()
        {
            return new ByteArrayInputStream(contentStream.createTrailer());
        }
    }

    /**
     * Internal stream that computes the CRC32 checksum and byte count of the data read through it
     */
    protected static class CRC32InputStream extends FilterInputStream
    {
        protected CRC32 crc = new CRC32();
        protected long byteCount;

        public CRC32InputStream(InputStream in)
        {
            super(in);
        }

        @Override
        public int read() throws IOException
        {
            int val = super.read();
            if (val >= 0)
            {
                crc.update(val);
                byteCount++;
            }
            return val;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException
        {
            len = super.read(b, off, len);
            if (len >= 0)
            {
                crc.update(b, off, len);
                byteCount += len;
            }
            return len;
        }

        public long getCrcValue()
        {
            return crc.getValue();
        }

        public long getByteCount()
        {
            return byteCount;
        }
    }

    /**
     * Internal stream without header/trailer
     */
    protected static class InternalGzipCompressingInputStream extends DeflaterInputStream
    {
        protected final CRC32InputStream crcIn;

        public InternalGzipCompressingInputStream(CRC32InputStream in, int bufferSize)
        {
            super(in, new Deflater(Deflater.DEFAULT_COMPRESSION, true), bufferSize);
            crcIn = in;
        }

        public void close() throws IOException
        {
            if (in != null)
            {
                try
                {
                    def.end();
                    in.close();
                }
                finally
                {
                    in = null;
                }
            }
        }

        protected final static int TRAILER_SIZE = 8;

        public byte[] createTrailer()
        {
            byte[] trailer = new byte[TRAILER_SIZE];
            writeTrailer(trailer, 0);
            return trailer;
        }

        /*
         * Writes GZIP member trailer to a byte array, starting at a given
         * offset.
         */
        private void writeTrailer(byte[] buf, int offset)
        {
            writeInt((int) crcIn.getCrcValue(), buf, offset);      // CRC-32 of uncompr. data
            writeInt((int) crcIn.getByteCount(), buf, offset + 4); // Number of uncompr. bytes
        }

        /*
         * Writes integer in Intel byte order to a byte array, starting at a
         * given offset.
         */
        private void writeInt(int i, byte[] buf, int offset)
        {
            writeShort(i & 0xffff, buf, offset);
            writeShort((i >> 16) & 0xffff, buf, offset + 2);
        }

        /*
         * Writes short integer in Intel byte order to a byte array, starting
         * at a given offset
         */
        private void writeShort(int s, byte[] buf, int offset)
        {
            buf[offset] = (byte) (s & 0xff);
            buf[offset + 1] = (byte) ((s >> 8) & 0xff);
        }
    }
}
import static org.junit.Assert.*;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.zip.CRC32;
import java.util.zip.GZIPInputStream;

import org.junit.Test;

public class TestGzipCompressingInputStream
{
    @Test
    public void test() throws Exception
    {
        testCompressor("test1 test2 test3");
        testCompressor("1MB binary data", createTestPattern(1024 * 1024));
        for (int i = 0; i < 4096; i++)
        {
            testCompressor(i + " bytes of binary data", createTestPattern(i));
        }
    }

    protected byte[] createTestPattern(int size)
    {
        byte[] data = new byte[size];
        byte pattern = 0;
        for (int i = 0; i < size; i++)
        {
            data[i] = pattern++;
        }
        return data;
    }

    protected void testCompressor(String data) throws IOException
    {
        testCompressor("String: " + data, data.getBytes());
    }

    protected void testCompressor(String dataInfo, byte[] data) throws IOException
    {
        InputStream uncompressedIn = new ByteArrayInputStream(data);
        InputStream compressedIn = new GzipCompressingInputStream(uncompressedIn);
        InputStream uncompressedOut = new GZIPInputStream(compressedIn);
        byte[] result = StreamHelper.readBinaryStream(uncompressedOut);
        assertTrue("Test failed for: " + dataInfo, Arrays.equals(data, result));
    }
}
【Comments】:
【Answer 2】:

A working example of a compressing input stream can be found in the popular open-source ESB Mule: GZIPCompressorInputStream.

It uses the DeflaterInputStream provided by the JRE for the compression, prepends the gzip header and appends the gzip trailer (aka footer).

Unfortunately it is under the CPAL license, which does not seem to be very common. Also, there appear to be no unit tests.
【Comments】:

Apache Commons Compress now has GzipCompressorInputStream.

@user1585916: the Apache Commons implementation has the same name, but it does not compress - from its JavaDoc: "Input stream that decompresses .gz files."
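To keep the naming straight: on the compression side, what Commons Compress offers is GzipCompressorOutputStream, which wraps an OutputStream. A minimal sketch of the round trip, assuming commons-compress is on the classpath (the in-memory byte arrays are only for illustration and do not solve the InputStream-pull problem from the question):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.commons.compress.utils.IOUtils;

public class CommonsCompressGzipSketch
{
    public static void main(String[] args) throws Exception
    {
        byte[] uncompressed = "test1 test2 test3".getBytes("UTF-8");

        // compress: GzipCompressorOutputStream wraps an OutputStream
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GzipCompressorOutputStream gz = new GzipCompressorOutputStream(bos))
        {
            gz.write(uncompressed);
        }
        byte[] compressed = bos.toByteArray();

        // decompress: GzipCompressorInputStream wraps an InputStream (this is what its JavaDoc means)
        try (GzipCompressorInputStream in = new GzipCompressorInputStream(new ByteArrayInputStream(compressed)))
        {
            byte[] roundTripped = IOUtils.toByteArray(in);
            System.out.println("round trip ok? " + Arrays.equals(uncompressed, roundTripped));
        }
    }
}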
【Answer 3】:

If you don't want to load the content into a large byte array and need a true streaming solution:
package x.y.z;

import org.apache.commons.io.IOUtils;

import java.io.*;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipOutputStream;

/**
 * Stream Compression Utility
 *
 * @author Thamme Gowda N
 */
public enum CompressionUtil
{
    INSTANCE;

    public static final int NUM_THREADS = 5;
    private final ExecutorService pool;

    CompressionUtil()
    {
        this.pool = Executors.newFixedThreadPool(NUM_THREADS);
    }

    public static CompressionUtil getInstance()
    {
        return INSTANCE;
    }

    /**
     * Supported compression type names
     */
    public static enum CompressionType
    {
        GZIP,
        ZIP
    }

    /**
     * Wraps the given stream in a compressor stream based on the given type
     * @param sourceStream : stream to be wrapped
     * @param type : compression type
     * @return source stream wrapped in a compressor stream
     * @throws IOException when something bad happens
     */
    public static OutputStream getCompressionWrapper(OutputStream sourceStream,
            CompressionType type) throws IOException
    {
        switch (type)
        {
            case GZIP:
                return new GZIPOutputStream(sourceStream);
            case ZIP:
                return new ZipOutputStream(sourceStream);
            default:
                throw new IllegalArgumentException("Possible values :"
                        + Arrays.toString(CompressionType.values()));
        }
    }

    /**
     * Gets a compressed stream for the given input stream
     * @param sourceStream : input stream to be compressed
     * @param type : compression type such as GZIP
     * @return compressed stream
     * @throws IOException when something bad happens
     */
    public static InputStream getCompressedStream(final InputStream sourceStream,
            CompressionType type) throws IOException
    {
        if (sourceStream == null)
        {
            throw new IllegalArgumentException("Source Stream cannot be NULL");
        }

        /*
         * sourceStream --> zipperOutStream(->intermediateStream -)--> resultStream
         */
        final PipedInputStream resultStream = new PipedInputStream();
        final PipedOutputStream intermediateStream = new PipedOutputStream(resultStream);
        final OutputStream zipperOutStream = getCompressionWrapper(intermediateStream, type);

        Runnable copyTask = new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    int c;
                    while ((c = sourceStream.read()) >= 0)
                    {
                        zipperOutStream.write(c);
                    }
                    zipperOutStream.flush();
                }
                catch (IOException e)
                {
                    IOUtils.closeQuietly(resultStream); // close it on error case only
                    throw new RuntimeException(e);
                }
                finally
                {
                    // close source stream and intermediate streams
                    IOUtils.closeQuietly(sourceStream);
                    IOUtils.closeQuietly(zipperOutStream);
                    IOUtils.closeQuietly(intermediateStream);
                }
            }
        };
        getInstance().pool.submit(copyTask);
        return resultStream;
    }

    public static void main(String[] args) throws IOException
    {
        String input = "abcdefghij";
        InputStream sourceStream = new ByteArrayInputStream(input.getBytes());
        InputStream compressedStream =
                getCompressedStream(sourceStream, CompressionType.GZIP);
        GZIPInputStream decompressedStream = new GZIPInputStream(compressedStream);
        List<String> lines = IOUtils.readLines(decompressedStream);
        String output = lines.get(0);
        System.out.println("test passed ? " + input.equals(output));
    }
}
【Comments】:
【Answer 4】:

Here is a version I wrote that has no CRC/GZIP magic cookies in it, because it delegates that work to GZIPOutputStream. It is also memory efficient, since it only uses enough memory to buffer the compression (a 42MB file used a 45k buffer). Performance is the same as compressing into memory.
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

/**
 * Compresses an InputStream in a memory-optimal, on-demand way, only compressing enough to fill a buffer.
 *
 * @author Ben La Monica
 */
public class GZIPCompressingInputStream extends InputStream
{
    private InputStream in;
    private GZIPOutputStream gz;
    private OutputStream delegate;
    private byte[] buf = new byte[8192];
    private byte[] readBuf = new byte[8192];
    int read = 0;
    int write = 0;

    public GZIPCompressingInputStream(InputStream in) throws IOException
    {
        this.in = in;
        this.delegate = new OutputStream()
        {
            private void growBufferIfNeeded(int len)
            {
                if ((write + len) >= buf.length)
                {
                    // grow the array if we don't have enough space to fulfill the incoming data
                    byte[] newbuf = new byte[(buf.length + len) * 2];
                    System.arraycopy(buf, 0, newbuf, 0, buf.length);
                    buf = newbuf;
                }
            }

            @Override
            public void write(byte[] b, int off, int len) throws IOException
            {
                growBufferIfNeeded(len);
                System.arraycopy(b, off, buf, write, len);
                write += len;
            }

            @Override
            public void write(int b) throws IOException
            {
                growBufferIfNeeded(1);
                buf[write++] = (byte) b;
            }
        };
        this.gz = new GZIPOutputStream(delegate);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException
    {
        compressStream();
        int numBytes = Math.min(len, write - read);
        if (numBytes > 0)
        {
            System.arraycopy(buf, read, b, off, numBytes);
            read += numBytes;
        }
        else if (len > 0)
        {
            // if bytes were requested, but we have none, then we're at the end of the stream
            return -1;
        }
        return numBytes;
    }

    private void compressStream() throws IOException
    {
        // if the reader has caught up with the writer, then zero the positions out
        if (read == write)
        {
            read = 0;
            write = 0;
        }

        while (write == 0)
        {
            // feed the gzip stream data until it spits out a block
            int val = in.read(readBuf);
            if (val == -1)
            {
                // nothing left to do, we've hit the end of the stream. finalize and break out
                gz.close();
                break;
            }
            else if (val > 0)
            {
                gz.write(readBuf, 0, val);
            }
        }
    }

    @Override
    public int read() throws IOException
    {
        compressStream();
        if (write == 0)
        {
            // write should not be 0 if we were able to get data from compressStream, must mean we're at the end
            return -1;
        }
        else
        {
            // reading a single byte
            return buf[read++] & 0xFF;
        }
    }
}
【Comments】:

Seems to work, thanks! I found it useful to forward the close method to the delegate stream: @Override public void close() throws IOException { in.close(); }
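Spelled out, the override the commenter suggests would look roughly like this inside the GZIPCompressingInputStream class above (closing only the wrapped source stream is an assumption about what "delegate stream" means here):

@Override
public void close() throws IOException
{
    // forward close to the wrapped source stream so callers can use try-with-resources
    in.close();
}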
【Answer 5】:

It seems I'm 3 years late, but maybe it will be useful for someone.

My solution is similar to @Michael Wyraz's; the only difference is that mine is based on FilterInputStream.
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.CRC32;
import java.util.zip.Deflater;

public class GZipInputStreamDeflater extends FilterInputStream
{
    private static enum Stage
    {
        HEADER,
        DATA,
        FINALIZATION,
        TRAILER,
        FINISH
    }

    private GZipInputStreamDeflater.Stage stage = Stage.HEADER;

    private final Deflater deflater = new Deflater( Deflater.DEFLATED, true );
    private final CRC32 crc = new CRC32();

    /* GZIP header magic number */
    private final static int GZIP_MAGIC = 0x8b1f;

    private ByteArrayInputStream trailer = null;
    private ByteArrayInputStream header = new ByteArrayInputStream( new byte[] {
        (byte) GZIP_MAGIC,          // Magic number (short)
        (byte) ( GZIP_MAGIC >> 8 ), // Magic number (short)
        Deflater.DEFLATED,          // Compression method (CM)
        0,                          // Flags (FLG)
        0,                          // Modification time MTIME (int)
        0,                          // Modification time MTIME (int)
        0,                          // Modification time MTIME (int)
        0,                          // Modification time MTIME (int)
        0,                          // Extra flags (XFLG)
        0,                          // Operating system (OS)
    } );

    public GZipInputStreamDeflater(InputStream in)
    {
        super( in );
        crc.reset();
    }

    @Override
    public int read( byte[] b, int off, int len ) throws IOException
    {
        int read = -1;
        switch( stage )
        {
            case FINISH:
                return -1;
            case HEADER:
                read = header.read( b, off, len );
                if( header.available() == 0 )
                {
                    stage = Stage.DATA;
                }
                return read;
            case DATA:
                byte[] b2 = new byte[len];
                read = super.read( b2, 0, len );
                if( read <= 0 )
                {
                    stage = Stage.FINALIZATION;
                    deflater.finish();
                    return 0;
                }
                else
                {
                    deflater.setInput( b2, 0, read );
                    crc.update( b2, 0, read );
                    read = 0;
                    while( !deflater.needsInput() && len - read > 0 )
                    {
                        read += deflater.deflate( b, off + read, len - read, Deflater.NO_FLUSH );
                    }
                    return read;
                }
            case FINALIZATION:
                if( deflater.finished() )
                {
                    stage = Stage.TRAILER;
                    int crcValue = (int) crc.getValue();
                    int totalIn = deflater.getTotalIn();
                    trailer = new ByteArrayInputStream( new byte[] {
                        (byte) ( crcValue >> 0 ),
                        (byte) ( crcValue >> 8 ),
                        (byte) ( crcValue >> 16 ),
                        (byte) ( crcValue >> 24 ),
                        (byte) ( totalIn >> 0 ),
                        (byte) ( totalIn >> 8 ),
                        (byte) ( totalIn >> 16 ),
                        (byte) ( totalIn >> 24 ),
                    } );
                    return 0;
                }
                else
                {
                    read = deflater.deflate( b, off, len, Deflater.FULL_FLUSH );
                    return read;
                }
            case TRAILER:
                read = trailer.read( b, off, len );
                if( trailer.available() == 0 )
                {
                    stage = Stage.FINISH;
                }
                return read;
        }
        return -1;
    }

    @Override
    public void close( ) throws IOException
    {
        super.close();
        deflater.end();
        if( trailer != null )
        {
            trailer.close();
        }
        header.close();
    }
}
Usage:

AmazonS3Client s3client = new AmazonS3Client( ... );
try ( InputStream in = new GZipInputStreamDeflater( new URL( "http://....../very-big-file.csv" ).openStream() ); )
{
    PutObjectRequest putRequest = new PutObjectRequest( "BUCKET-NAME", "/object/key", in, new ObjectMetadata() );
    s3client.putObject( putRequest );
}
【Comments】:

Nicolai, have you considered publishing this as a library?

Thank you, @JoshLemer. I had not thought about publishing it as a library. Do you think it would be a useful library? It contains only a single class, which is very small for a library, isn't it?

Keep in mind that not supplying Content-Length metadata as part of the PutObjectRequest will cause the S3 SDK to buffer the entire stream internally in order to discover the length, which for large uploads will end in an OutOfMemoryError because you are likely to exceed the memory limit. It is safer to start a multipart upload with InitiateMultipartUploadRequest, then UploadPartRequest, and finally CompleteMultipartUploadRequest.
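A rough sketch of the multipart route that comment describes, using the AWS SDK for Java v1 classes it names plus commons-io; the bucket, key, and fixed 5 MB part size are placeholders, and abort/error handling is omitted:

AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
InputStream in = new GZipInputStreamDeflater( new URL( "http://....../very-big-file.csv" ).openStream() );

InitiateMultipartUploadResult init =
        s3.initiateMultipartUpload( new InitiateMultipartUploadRequest( "BUCKET-NAME", "/object/key" ) );
List<PartETag> etags = new ArrayList<>();
byte[] part = new byte[5 * 1024 * 1024]; // every part except the last must be at least 5 MB
int partNumber = 1;
int filled;
while( ( filled = IOUtils.read( in, part ) ) > 0 ) // commons-io: fills the buffer unless EOF is reached
{
    UploadPartRequest req = new UploadPartRequest()
            .withBucketName( "BUCKET-NAME" ).withKey( "/object/key" )
            .withUploadId( init.getUploadId() )
            .withPartNumber( partNumber++ )
            .withInputStream( new ByteArrayInputStream( part, 0, filled ) )
            .withPartSize( filled );
    etags.add( s3.uploadPart( req ).getPartETag() );
}
s3.completeMultipartUpload(
        new CompleteMultipartUploadRequest( "BUCKET-NAME", "/object/key", init.getUploadId(), etags ) );
in.close();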
Fittingly three years late as well, but I'm afraid there is a bug in here. In the DATA stage the deflater may have more data to return than the read buffer can hold. The next read, however, will overwrite the data set by the first deflater.setInput. The fix is to make sure we only set more input data once the deflater is exhausted. If anyone is interested, a Kotlin variant with this fix can be found in http4k.
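A sketch of the guard that comment describes, applied to the DATA branch of the read method above (an assumption drawn from the comment, not code taken from http4k):

case DATA:
    // only pull new input once the deflater has consumed the previous buffer
    if( deflater.needsInput() )
    {
        byte[] b2 = new byte[len];
        int n = super.read( b2, 0, len );
        if( n <= 0 )
        {
            stage = Stage.FINALIZATION;
            deflater.finish();
            return 0;
        }
        deflater.setInput( b2, 0, n );
        crc.update( b2, 0, n );
    }
    read = 0;
    while( !deflater.needsInput() && len - read > 0 )
    {
        read += deflater.deflate( b, off + read, len - read, Deflater.NO_FLUSH );
    }
    return read;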
【Answer 6】:

PipedOutputStream lets you write to a GZIPOutputStream and expose that data through an InputStream. It has a fixed memory cost, unlike the other solutions that buffer the entire stream of data into an array or a file. The only problem is that you cannot read and write from the same thread; you have to use a separate one.
private InputStream gzipInputStream(InputStream in) throws IOException
{
    PipedInputStream zipped = new PipedInputStream();
    PipedOutputStream pipe = new PipedOutputStream(zipped);
    new Thread(
        () -> {
            try (OutputStream zipper = new GZIPOutputStream(pipe))
            {
                IOUtils.copy(in, zipper);
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }
    ).start();
    return zipped;
}
【Comments】:

A nice solution. The only thing giving me a headache is how to get a potential IOException out of the thread so that it does not go unnoticed.

I guess I would then reach for EasyStream's OutputStreamToInputStream, as @smartwjw suggests in his answer. It essentially does this under the hood, but the documentation says it exposes any exception via getResult().
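One way to surface that IOException is to hand the copy job to an ExecutorService and keep the Future, so the caller can check it after the stream has been consumed. A sketch building on the method above (the shared pool, the AtomicReference out-parameter, and the java.util.concurrent imports are assumptions, not part of the original answer):

private static final ExecutorService POOL = Executors.newCachedThreadPool();

private InputStream gzipInputStream(InputStream in, AtomicReference<Future<?>> copyResult) throws IOException
{
    PipedInputStream zipped = new PipedInputStream();
    PipedOutputStream pipe = new PipedOutputStream(zipped);
    Future<?> future = POOL.submit(() -> {
        try (OutputStream zipper = new GZIPOutputStream(pipe))
        {
            IOUtils.copy(in, zipper);
        }
        return null; // submitted as a Callable, so any IOException is captured by the Future
    });
    copyResult.set(future);
    return zipped;
}

// caller, after fully reading the returned stream:
// copyResult.get().get(); // rethrows a compression/copy IOException wrapped in an ExecutionException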
As a comment: I already tried it and it simply does not work, so do not try to put the PipedInputStream and PipedOutputStream declarations into the try-with-resources statement, e.g. try (InputStream is = ...; // whatever opens the input stream PipedInputStream zipped = new PipedInputStream(); PipedOutputStream pipe = new PipedOutputStream(zipped)) { new Thread(....) // as above } - it fails! If you want to do it that way, put the opening of the input stream (in above) into the try-with-resources statement and the rest into the try block.
【Answer 7】:

There is no DeflatingGZIPInputStream in the JRE. To deflate with the "deflate" compression format, use java.util.zip.DeflaterInputStream or java.util.zip.DeflaterOutputStream:
public InputStream getCompressedStream(InputStream unCompressedStream)
{
    return new DeflaterInputStream(unCompressedStream);
}
You can derive a class that deflates in GZIP format from java.util.zip.DeflaterInputStream by looking at the source of java.util.zip.GZIPOutputStream.

【Comments】:
【Answer 8】:

To compress data you need GZIPOutputStream. But since you need to read the data back as if from an InputStream, you have to convert the OutputStream into an InputStream. You can do that by writing into a ByteArrayOutputStream and using toByteArray():

ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gout = new GZIPOutputStream(out);
//... Code to read from your original uncompressed data and write to gout; close gout when done.
//Convert to InputStream.
new ByteArrayInputStream(out.toByteArray());
But this approach has the limitation that you need to read in all the data first - which means you must have enough memory to hold that buffer.

An alternative approach using pipes is mentioned in this thread - How to convert OutputStream to InputStream?

【Comments】:
@Fabien - read the comments below - if you really do have 1TB of data to read from the input stream, do not use the approach above! Unless, of course, you have 1TB of memory available. Use the pipe approach.

There is no "good" standard way to turn an OutputStream into an InputStream. There are only two ways to do it: either cache the entire OutputStream somewhere, or use threads. Both have their drawbacks. If you can, use an InputStream. See the post below.

@kjp - GZIPOutputStream has no default constructor.

Please update the answer; not sure whether it worked in 2012, but I know it does not work in 2016.

【Answer 9】:

Shouldn't you be looking at GZIPOutputStream in this case?
public OutputStream getCompressedStream(InputStream input) throws IOException
{
    OutputStream output = new GZIPOutputStream(new ByteArrayOutputStream());
    IOUtils.copy(input, output);
    return output;
}
【Comments】:

GZIPOutputStream gives you an OutputStream. But I want an InputStream.

You cannot create an input from an input. You can only create an output.

【Answer 10】:

You can use EasyStream.
try (final InputStreamFromOutputStream<Void> isOs = new InputStreamFromOutputStream<Void>()
{
    @Override
    protected void produce(final OutputStream dataSink) throws Exception
    {
        InputStream in = new GZIPInputStream(unCompressedStream);
        IOUtils.copy(in, dataSink);
    }
})
{
    // You can use the compressed input stream here
}
catch (final IOException e)
{
    // Handle exceptions here
}
【Comments】:
【Answer 11】:

public InputStream getCompressed( InputStream is ) throws IOException
{
    byte data[] = new byte[2048];
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    GZIPOutputStream zos = new GZIPOutputStream( bos );
    BufferedInputStream entryStream = new BufferedInputStream( is, 2048 );
    int count;
    while ( ( count = entryStream.read( data, 0, 2048 ) ) != -1 )
    {
        zos.write( data, 0, count );
    }
    entryStream.close();
    zos.close();
    return new ByteArrayInputStream( bos.toByteArray() );
}
Reference: zip compression

【Comments】:
【Answer 12】:

I suggest the GzipCompressorInputStream from Apache Commons Compress.
【Comments】:
GzipCompressorInputStream decompresses, just like the JRE's GZIPInputStream.