如何在 Java 中合并两个输入流?
Posted
技术标签:
【中文标题】如何在 Java 中合并两个输入流?【英文标题】:How do you merge two input streams in Java? 【发布时间】:2010-10-20 02:26:11 【问题描述】:在 Java 中有两个 InputStream,有没有一种方法可以合并它们,从而以一个 InputStream 结束,从而为您提供两个流的输出?怎么样?
【问题讨论】:
究竟以什么方式合并?在读取第一个流后无缝继续读取第二个流?我对 Java 不是很熟悉,但在 C# 中,您可以通过实现一个继承自 Stream 的类(包含对两个基本流的引用)然后重写 Read 方法来轻松做到这一点。 【参考方案1】:正如评论,不清楚你所说的合并是什么意思。
InputStream.available
并不一定会给您一个有用的答案和阻止流的行为,从而使“随机”从任何一个中获取可用输入变得复杂。您需要两个线程从流中读取数据,然后通过 java.io.Piped(In|Out)putStream
传回数据(尽管这些类有问题)。或者,对于某些类型的流,可以使用不同的接口,例如 java.nio
非阻塞通道。
如果您想要第一个输入流的完整内容,然后是第二个:new java.io.SequenceInputStream(s1, s2)
。
【讨论】:
哦,太好了,我刚刚学到了一些新东西。 SequenceInputStream 本质上与我的 CatInputStream 相同,但使用传统的枚举而不是 LinkedList。 :-) 作为对答案第一部分的破解,在一般情况下很难解决,但对于 FileInputStream 的特定情况(也许还有套接字?),您可以使用 instanceof/cast 并创建一个通道其中。 (其他流可以使用 Channels.newChannel 创建一致的接口,但不具备所需的非阻塞特性。) Collections.enumeration 是你的朋友。我忘记了第一部分的一部分 - 将编辑。 实际上 FileChannel 不是 SelectableChannel,尽管 SocketChannel 是。所以我相信你有点被文件困住了。直到 JDK7 中的“更多 NIO 功能”(可能)。 NIO2 英尺;它还将具有异步 I/O,这将使实现这个合并通道业务变得更加有趣(任何输入通道上的回调都会导致对合并通道的回调)。 :-)【参考方案2】:java.io.SequenceInputStream
可能是您需要的。它接受流的枚举,并会输出第一个流的内容,然后是第二个,依此类推,直到所有流都为空。
【讨论】:
【参考方案3】:您可以编写一个自定义的InputStream
实现来执行此操作。示例:
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
public class CatInputStream extends InputStream
private final Deque<InputStream> streams;
public CatInputStream(InputStream... streams)
this.streams = new LinkedList<InputStream>();
Collections.addAll(this.streams, streams);
private void nextStream() throws IOException
streams.removeFirst().close();
@Override
public int read() throws IOException
int result = -1;
while (!streams.isEmpty()
&& (result = streams.getFirst().read()) == -1)
nextStream();
return result;
@Override
public int read(byte b[], int off, int len) throws IOException
int result = -1;
while (!streams.isEmpty()
&& (result = streams.getFirst().read(b, off, len)) == -1)
nextStream();
return result;
@Override
public long skip(long n) throws IOException
long skipped = 0L;
while (skipped < n && !streams.isEmpty())
int thisSkip = streams.getFirst().skip(n - skipped);
if (thisSkip > 0)
skipped += thisSkip;
else
nextStream();
return skipped;
@Override
public int available() throws IOException
return streams.isEmpty() ? 0 : streams.getFirst().available();
@Override
public void close() throws IOException
while (!streams.isEmpty())
nextStream();
此代码未经测试,因此您的里程可能会有所不同。
【讨论】:
这不是和 Merzbow 建议的 SequenceInputStream 做同样的事情吗? 抱歉,tackline 首先建议使用 SequenceInputStream(为此我为他 +1 了)。在 SO 中,最早的好答案获胜;你永远不知道后来的答案是不是抄袭。另外,请阅读我对 tackline 的评论,以比较 SequenceInputStream 和 CatInputStream (我确实同意他关于使用 Collections.enumeration 的观点)。 如果因为我上次的评论而否决了我的答案的人,我很抱歉;我应该更好地解释一下:如果我写的答案结果是之前发布的答案的重复(或子集),我通常会删除它,因为我知道我永远不会得到任何分数。在 SO 上,它确实是“西方最快的枪”(搜索那个问题标题)。【参考方案4】:我想不到。您可能必须将两个流的内容读入一个 byte[],然后从中创建一个 ByteArrayInputStream。
【讨论】:
赞成一个简单、易于理解、可行的解决方案。如果阻塞很重要(或者它们很大),可能没有所需的行为。【参考方案5】:这是一个特定于字节数组的 MVar 实现(确保添加您自己的包定义)。从这里开始,在合并流上编写输入流是微不足道的。如果需要,我也可以发布。
import java.nio.ByteBuffer;
public final class MVar
private static enum State
EMPTY, ONE, MANY
private final Object lock;
private State state;
private byte b;
private ByteBuffer bytes;
private int length;
public MVar()
lock = new Object();
state = State.EMPTY;
public final void put(byte b)
synchronized (lock)
while (state != State.EMPTY)
try
lock.wait();
catch (InterruptedException e)
this.b = b;
state = State.ONE;
lock.notifyAll();
public final void put(byte[] bytes, int offset, int length)
if (length == 0)
return;
synchronized (lock)
while (state != State.EMPTY)
try
lock.wait();
catch (InterruptedException e)
this.bytes = ByteBuffer.allocateDirect(length);
this.bytes.put(bytes, offset, length);
this.bytes.position(0);
this.length = length;
state = State.MANY;
lock.notifyAll();
public final byte take()
synchronized (lock)
while (state == State.EMPTY)
try
lock.wait();
catch (InterruptedException e)
switch (state)
case ONE:
state = State.EMPTY;
byte b = this.b;
lock.notifyAll();
return b;
case MANY:
byte b = bytes.get();
state = --length <= 0 ? State.EMPTY : State.MANY;
lock.notifyAll();
return b;
default:
throw new AssertionError();
public final int take(byte[] bytes, int offset, int length)
if (length == 0)
return 0;
synchronized (lock)
while (state == State.EMPTY)
try
lock.wait();
catch (InterruptedException e)
switch (state)
case ONE:
bytes[offset] = b;
state = State.EMPTY;
lock.notifyAll();
return 1;
case MANY:
if (this.length > length)
this.bytes.get(bytes, offset, length);
this.length = this.length - length;
synchronized (lock)
lock.notifyAll();
return length;
this.bytes.get(bytes, offset, this.length);
this.bytes = null;
state = State.EMPTY;
length = this.length;
lock.notifyAll();
return length;
default:
throw new AssertionError();
【讨论】:
以上是关于如何在 Java 中合并两个输入流?的主要内容,如果未能解决你的问题,请参考以下文章