Java:合并输入流
Posted
技术标签:
【中文标题】Java:合并输入流【英文标题】:Java: merging InputStreams 【发布时间】:2011-04-25 06:38:43 【问题描述】:我的目标是创建(或使用现有的)一个 InputStream 实现(例如 MergeInputStream),它将尝试从多个 InputStream 中读取并返回第一个结果。之后,它将释放锁定并停止从所有 InputStreams 读取,直到下一次 mergeInputStream.read() 调用。我很惊讶我没有找到任何这样的工具。问题是:所有源 InputStreams 都不是很有限(不是文件,例如,而是 System.in、socket 等),所以我不能使用 SequenceInputReader。我知道这可能需要一些多线程机制,但我完全不知道该怎么做。我尝试谷歌搜索但没有结果。
【问题讨论】:
您将遇到的问题是没有标准方法可以知道读取应该在哪里结束。即,您不能保证获得与写入相同的读取大小(0(这是一种常见的误解)因此,您必须非常小心,以免您的数据变成一团糟。 【参考方案1】:最好使用SelectableChannel
和Selector
解决从多个源读取输入并将它们序列化为一个流的问题。然而,这要求所有源都能够提供可选择的频道。这可能是也可能不是。
如果可选通道不可用,您可以选择使用单个线程解决它,让读取实现执行以下操作:对于每个输入流is
,检查is.available() > 0
,如果是,则返回is.read()
。重复此过程,直到某些输入流有可用数据。
但是,这种方法有两个主要缺点:
NotallimplementationsofInputStream
以某种方式实现available()
,当且仅当read()
将阻塞时它才返回0。结果自然是无法从此流中读取数据,即使is.read()
会返回一个值。这是否被认为是一个错误是值得怀疑的,因为文档只是声明它应该返回一个可用字节数的“估计”。
它使用所谓的“忙循环”,这基本上意味着您需要在循环中设置睡眠(这会导致读取延迟)或不必要地占用 CPU。
您的第三种选择是通过为每个输入流生成一个线程来处理阻塞读取。但是,如果要读取的输入流数量非常多,这将需要仔细同步,并且可能需要一些开销。下面的代码是第一次尝试解决它。我不确定它是否充分同步,或者它是否以最佳方式管理线程。
import java.io.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class MergedInputStream extends InputStream
AtomicInteger openStreamCount;
BlockingQueue<Integer> buf = new ArrayBlockingQueue<Integer>(1);
InputStream[] sources;
public MergedInputStream(InputStream... sources)
this.sources = sources;
openStreamCount = new AtomicInteger(sources.length);
for (int i = 0; i < sources.length; i++)
new ReadThread(i).start();
public void close() throws IOException
String ex = "";
for (InputStream is : sources)
try
is.close();
catch (IOException e)
ex += e.getMessage() + " ";
if (ex.length() > 0)
throw new IOException(ex.substring(0, ex.length() - 1));
public int read() throws IOException
if (openStreamCount.get() == 0)
return -1;
try
return buf.take();
catch (InterruptedException e)
throw new IOException(e);
private class ReadThread extends Thread
private final int src;
public ReadThread(int src)
this.src = src;
public void run()
try
int data;
while ((data = sources[src].read()) != -1)
buf.put(data);
catch (IOException ioex)
catch (InterruptedException e)
openStreamCount.decrementAndGet();
【讨论】:
您所链接的论坛中的 cmets 绝不是权威参考。【参考方案2】:我可以想到三种方法来做到这一点:
使用非阻塞 I/O (API documentation)。这是最干净的解决方案。 多个线程,一个用于每个合并的输入流。线程将阻塞相关输入流的read()
方法,然后在数据可用时通知MergeInputStream
对象。 MergedInputStream
中的 read()
方法将等待此通知,然后从相应的流中读取数据。
具有繁忙循环的单线程。您的 MergeInputStream.read()
方法需要循环检查每个合并输入流的 available()
方法。如果没有可用数据,请休眠几毫秒。重复,直到数据在合并的输入流之一中可用。
【讨论】:
多线程解决方案会因为同步而导致很多麻烦(至少对我而言)。available
不能保证返回非零值。因此,使用 SelectableChannels 是唯一明智的选择恕我直言。
available()
不保证返回可以在不阻塞的情况下读取的总字节数,但如果有任何可用数据(即如果调用 read()
将不阻塞)。如果有可用数据,许多InputStream
实现将返回1
,否则返回0
。
单线程方案可以正常工作。我将研究 NIO,并将尝试在未来的任务中实现它。谢谢你们。
@Frozen Spider...当然,在某些情况下它会正常工作。在其他情况下,它将惨遭失败。参见例如SSLSocket.available() == 0 always。我的意思是,没有任何保证,而且每当您的程序 I/O 操作看起来不稳定时,您将不得不怀疑并调查错误是否出在此代码中。
所以,我别无选择,只能重写整个模块......谢谢你的链接。以上是关于Java:合并输入流的主要内容,如果未能解决你的问题,请参考以下文章