缓冲的后台 InputStream 实现

Posted

技术标签:

【中文标题】缓冲的后台 InputStream 实现【英文标题】:Buffered Background InputStream Implementations 【发布时间】:2011-01-10 08:49:14 【问题描述】:

我编写了后台InputStream(和OutputStream)实现来包装其他流,并在后台线程上预读,主要允许解压缩/压缩在处理解压缩流的不同线程中发生。

这是一个相当标准的生产者/消费者模型。

这似乎是一种简单的方法,可以通过读取、处理和写入数据的简单进程充分利用多核 CPU,从而更有效地利用 CPU 和磁盘资源。也许“高效”不是最好的词,但与直接从ZipInputStream 读取和直接写入ZipOutputStream 相比,它提供了更高的利用率,而且我更感兴趣的是减少了运行时间。

我很高兴发布代码,但我的问题是,我是否正在重新发明现有(以及更频繁使用的)库中现成的东西?

编辑 - 发布代码...

BackgroundInputStream 的代码如下(BackgroundOutputStream 非常相似),但我想改进它的某些方面。

    看来我工作太辛苦了,无法来回传递缓冲区。 如果调用代码丢弃对BackgroundInputStream 的引用,backgroundReaderThread 将永远存在。 信令eof 需要改进。 应将异常传播到前台线程。 我想允许使用来自提供的Executor 的线程。 close() 方法应向后台线程发出信号,并且不应关闭包装流,因为包装流应归从其读取的后台线程所有。 关门后看书等傻事应该得到适当的照顾。

package nz.co.datacute.io;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

public class BackgroundInputStream extends InputStream 
    private static final int DEFAULT_QUEUE_SIZE = 1;
    private static final int DEFAULT_BUFFER_SIZE = 64*1024;
    private final int queueSize;
    private final int bufferSize;
    private volatile boolean eof = false;
    private LinkedBlockingQueue<byte[]> bufferQueue;
    private final InputStream wrappedInputStream;
    private byte[] currentBuffer;
    private volatile byte[] freeBuffer;
    private int pos;

    public BackgroundInputStream(InputStream wrappedInputStream) 
        this(wrappedInputStream, DEFAULT_QUEUE_SIZE, DEFAULT_BUFFER_SIZE);
    

    public BackgroundInputStream(InputStream wrappedInputStream,int queueSize,int bufferSize) 
        this.wrappedInputStream = wrappedInputStream;
        this.queueSize = queueSize;
        this.bufferSize = bufferSize;
    

    @Override
    public int read() throws IOException 
        if (bufferQueue == null) 
            bufferQueue = new LinkedBlockingQueue<byte[]>(queueSize);
            BackgroundReader backgroundReader = new BackgroundReader();
            Thread backgroundReaderThread = new Thread(backgroundReader, "Background InputStream");
            backgroundReaderThread.start();
        
        if (currentBuffer == null) 
            try 
                if ((!eof) || (bufferQueue.size() > 0)) 
                    currentBuffer = bufferQueue.take();
                    pos = 0;
                 else 
                    return -1;
                
             catch (InterruptedException e) 
                e.printStackTrace();
            
        
        int b = currentBuffer[pos++];
        if (pos == currentBuffer.length) 
            freeBuffer = currentBuffer;
            currentBuffer = null;
        
        return b;
    

    @Override
    public int available() throws IOException 
        if (currentBuffer == null) return 0;
        return currentBuffer.length;
    

    @Override
    public void close() throws IOException 
        wrappedInputStream.close();
        currentBuffer = null;
        freeBuffer = null;
    

    class BackgroundReader implements Runnable 

        @Override
        public void run() 
            try 
                while (!eof) 
                    byte[] newBuffer;
                    if (freeBuffer != null) 
                        newBuffer = freeBuffer;
                        freeBuffer = null;
                     else 
                        newBuffer = new byte[bufferSize];
                    
                    int bytesRead = 0;
                    int writtenToBuffer = 0;
                    while (((bytesRead = wrappedInputStream.read(newBuffer, writtenToBuffer, bufferSize - writtenToBuffer)) != -1) && (writtenToBuffer < bufferSize)) 
                        writtenToBuffer += bytesRead;
                    
                    if (writtenToBuffer > 0) 
                        if (writtenToBuffer < bufferSize) 
                            newBuffer = Arrays.copyOf(newBuffer, writtenToBuffer);
                        
                        bufferQueue.put(newBuffer);
                    
                    if (bytesRead == -1) 
                        eof = true;
                    
                
             catch (IOException e) 
                e.printStackTrace();
             catch (InterruptedException e) 
                e.printStackTrace();
            
        

    

【问题讨论】:

您找到答案了吗?这在任何现有的库中都可用吗? @adi 我认为单线程解决方案的简单性意味着通过在单个线程中一次处理多个文件来使用多个内核是最安全的,而不是使用多个线程来加速处理的单个文件。如果您只有一个文件要处理,您可能可以忍受等待在单个线程中处理它所需的额外时间。 【参考方案1】:

听起来很有趣。我从来没有遇到过开箱即用的任何东西,但如果可用的话,尝试使用空闲核心进行压缩是非常有意义的。

也许你可以使用Commons I/O - 这是一个经过良好测试的库,可以帮助处理一些更无聊的东西,让你专注于扩展很酷的并行部分。也许你甚至可以将你的代码贡献给 Commons 项目 ;-)

【讨论】:

【参考方案2】:

我会感兴趣的。我已经考虑过一个类似的项目,但无法弄清楚如何处理无序完成压缩的部分。

【讨论】:

我没有分解解压缩或压缩过程,我只是在额外的单线程上执行它们。

以上是关于缓冲的后台 InputStream 实现的主要内容,如果未能解决你的问题,请参考以下文章

前端实现input[type='file']上传图片预览效果

我应该缓冲 InputStream 还是 InputStreamReader?

廖雪峰Java6 IO编程-2input和output-4Filter模式

阅读 HttpURLConnection InputStream - 手动缓冲区还是 BufferedInputStream?

安卓 | Kotlin:应用程序从 InputStream 读取到缓冲区问题

bufferedinputstream FileInputStream inputstream的比较