Java:合并输入流

Posted

技术标签:

【中文标题】Java:合并输入流【英文标题】:Java: merging InputStreams 【发布时间】:2011-04-25 06:38:43 【问题描述】:

我的目标是创建(或使用现有的)一个 InputStream 实现(例如 MergeInputStream),它将尝试从多个 InputStream 中读取并返回第一个结果。之后,它将释放锁定并停止从所有 InputStreams 读取,直到下一次 mergeInputStream.read() 调用。我很惊讶我没有找到任何这样的工具。问题是:所有源 InputStreams 都不是很有限(不是文件,例如,而是 System.in、socket 等),所以我不能使用 SequenceInputReader。我知道这可能需要一些多线程机制,但我完全不知道该怎么做。我尝试谷歌搜索但没有结果。

【问题讨论】:

您将遇到的问题是没有标准方法可以知道读取应该在哪里结束。即,您不能保证获得与写入相同的读取大小(0(这是一种常见的误解)因此,您必须非常小心,以免您的数据变成一团糟。 【参考方案1】:

最好使用SelectableChannelSelector解决从多个源读取输入并将它们序列化为一个流的问题。然而,这要求所有源都能够提供可选择的频道。这可能是也可能不是。

如果可选通道不可用,您可以选择使用单个线程解决它,让读取实现执行以下操作:对于每个输入流is,检查is.available() > 0 ,如果是,则返回is.read()。重复此过程,直到某些输入流有可用数据。

但是,这种方法有两个主要缺点:

    NotallimplementationsofInputStream 以某种方式实现available(),当且仅当read() 将阻塞时它才返回0。结果自然是无法从此流中读取数据,即使is.read() 会返回一个值。这是否被认为是一个错误是值得怀疑的,因为文档只是声明它应该返回一个可用字节数的“估计”。

    它使用所谓的“忙循环”,这基本上意味着您需要在循环中设置睡眠(这会导致读取延迟)或不必要地占用 CPU。

您的第三种选择是通过为每个输入流生成一个线程来处理阻塞读取。但是,如果要读取的输入流数量非常多,这将需要仔细同步,并且可能需要一些开销。下面的代码是第一次尝试解决它。我不确定它是否充分同步,或者它是否以最佳方式管理线程。

import java.io.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class MergedInputStream extends InputStream 

    AtomicInteger openStreamCount;
    BlockingQueue<Integer> buf = new ArrayBlockingQueue<Integer>(1);
    InputStream[] sources;

    public MergedInputStream(InputStream... sources) 
        this.sources = sources;
        openStreamCount = new AtomicInteger(sources.length);
        for (int i = 0; i < sources.length; i++)
            new ReadThread(i).start();
    


    public void close() throws IOException 
        String ex = "";
        for (InputStream is : sources) 
            try 
                is.close();
             catch (IOException e) 
                ex += e.getMessage() + " ";
            
        
        if (ex.length() > 0)
            throw new IOException(ex.substring(0, ex.length() - 1));
    


    public int read() throws IOException 
        if (openStreamCount.get() == 0)
            return -1;

        try 
            return buf.take();
         catch (InterruptedException e) 
            throw new IOException(e);
        
    


    private class ReadThread extends Thread 

        private final int src;
        public ReadThread(int src) 
            this.src = src;
        

        public void run() 
            try 
                int data;
                while ((data = sources[src].read()) != -1)
                    buf.put(data);
             catch (IOException ioex) 
             catch (InterruptedException e) 
            
            openStreamCount.decrementAndGet();
        
    

【讨论】:

您所链接的论坛中的 cmets 绝不是权威参考。【参考方案2】:

我可以想到三种方法来做到这一点:

使用非阻塞 I/O (API documentation)。这是最干净的解决方案。 多个线程,一个用于每个合并的输入流。线程将阻塞相关输入流的read() 方法,然后在数据可用时通知MergeInputStream 对象。 MergedInputStream 中的 read() 方法将等待此通知,然后从相应的流中读取数据。 具有繁忙循环的单线程。您的 MergeInputStream.read() 方法需要循环检查每个合并输入流的 available() 方法。如果没有可用数据,请休眠几毫秒。重复,直到数据在合并的输入流之一中可用。

【讨论】:

多线程解决方案会因为同步而导致很多麻烦(至少对我而言)。 available 不能保证返回非零值。因此,使用 SelectableChannels 是唯一明智的选择恕我直言。 available() 不保证返回可以在不阻塞的情况下读取的总字节数,但如果有任何可用数据(即如果调用 read() 将不阻塞)。如果有可用数据,许多InputStream 实现将返回1,否则返回0 单线程方案可以正常工作。我将研究 NIO,并将尝试在未来的任务中实现它。谢谢你们。 @Frozen Spider...当然,在某些情况下它会正常工作。在其他情况下,它将惨遭失败。参见例如SSLSocket.available() == 0 always。我的意思是,没有任何保证,而且每当您的程序 I/O 操作看起来不稳定时,您将不得不怀疑并调查错误是否出在此代码中。 所以,我别无选择,只能重写整个模块......谢谢你的链接。

以上是关于Java:合并输入流的主要内容,如果未能解决你的问题,请参考以下文章

Java:合并输入流

IO流 合并流

HOW2J Java 文件输入输出流,合并与拆分

Java核心类库-IO-合并流

java 合并流(SequenceInputStream)

IO流:System.inSequenceInputStream合并流内存输入输出流数据流