如何在 Java 中合并两个输入流?

Posted

技术标签:

【中文标题】如何在 Java 中合并两个输入流?【英文标题】:How do you merge two input streams in Java? 【发布时间】:2010-10-20 02:26:11 【问题描述】:

在 Java 中有两个 InputStream,有没有一种方法可以合并它们,从而以一个 InputStream 结束,从而为您提供两个流的输出?怎么样?

【问题讨论】:

究竟以什么方式合并?在读取第一个流后无缝继续读取第二个流?我对 Java 不是很熟悉,但在 C# 中,您可以通过实现一个继承自 Stream 的类(包含对两个基本流的引用)然后重写 Read 方法来轻松做到这一点。 【参考方案1】:

正如评论,不清楚你所说的合并是什么意思。

InputStream.available 并不一定会给您一个有用的答案和阻止流的行为,从而使“随机”从任何一个中获取可用输入变得复杂。您需要两个线程从流中读取数据,然后通过 java.io.Piped(In|Out)putStream 传回数据(尽管这些类有问题)。或者,对于某些类型的流,可以使用不同的接口,例如 java.nio 非阻塞通道。

如果您想要第一个输入流的完整内容,然后是第二个:new java.io.SequenceInputStream(s1, s2)

【讨论】:

哦,太好了,我刚刚学到了一些新东西。 SequenceInputStream 本质上与我的 CatInputStream 相同,但使用传统的枚举而不是 LinkedList。 :-) 作为对答案第一部分的破解,在一般情况下很难解决,但对于 FileInputStream 的特定情况(也许还有套接字?),您可以使用 instanceof/cast 并创建一个通道其中。 (其他流可以使用 Channels.newChannel 创建一致的接口,但不具备所需的非阻塞特性。) Collections.enumeration 是你的朋友。我忘记了第一部分的一部分 - 将编辑。 实际上 FileChannel 不是 SelectableChannel,尽管 SocketChannel 是。所以我相信你有点被文件困住了。直到 JDK7 中的“更多 NIO 功能”(可能)。 NIO2 英尺;它还将具有异步 I/O,这将使实现这个合并通道业务变得更加有趣(任何输入通道上的回调都会导致对合并通道的回调)。 :-)【参考方案2】:

java.io.SequenceInputStream 可能是您需要的。它接受流的枚举,并会输出第一个流的内容,然后是第二个,依此类推,直到所有流都为空。

【讨论】:

【参考方案3】:

您可以编写一个自定义的InputStream 实现来执行此操作。示例:

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;

public class CatInputStream extends InputStream 
    private final Deque<InputStream> streams;

    public CatInputStream(InputStream... streams) 
        this.streams = new LinkedList<InputStream>();
        Collections.addAll(this.streams, streams);
    

    private void nextStream() throws IOException 
        streams.removeFirst().close();
    

    @Override
    public int read() throws IOException 
        int result = -1;
        while (!streams.isEmpty()
                && (result = streams.getFirst().read()) == -1) 
            nextStream();
        
        return result;
    

    @Override
    public int read(byte b[], int off, int len) throws IOException 
        int result = -1;
        while (!streams.isEmpty()
                && (result = streams.getFirst().read(b, off, len)) == -1) 
            nextStream();
        
        return result;
    

    @Override
    public long skip(long n) throws IOException 
        long skipped = 0L;
        while (skipped < n && !streams.isEmpty()) 
            int thisSkip = streams.getFirst().skip(n - skipped);
            if (thisSkip > 0)
                skipped += thisSkip;
            else
                nextStream();
        
        return skipped;
    

    @Override
    public int available() throws IOException 
        return streams.isEmpty() ? 0 : streams.getFirst().available();
    

    @Override
    public void close() throws IOException 
        while (!streams.isEmpty())
            nextStream();
    

此代码未经测试,因此您的里程可能会有所不同。

【讨论】:

这不是和 Merzbow 建议的 SequenceInputStream 做同样的事情吗? 抱歉,tackline 首先建议使用 SequenceInputStream(为此我为他 +1 了)。在 SO 中,最早的好答案获胜;你永远不知道后来的答案是不是抄袭。另外,请阅读我对 tackline 的评论,以比较 SequenceInputStream 和 CatInputStream (我确实同意他关于使用 Collections.enumeration 的观点)。 如果因为我上次的评论而否决了我的答案的人,我很抱歉;我应该更好地解释一下:如果我写的答案结果是之前发布的答案的重复(或子集),我通常会删除它,因为我知道我永远不会得到任何分数。在 SO 上,它确实是“西方最快的枪”(搜索那个问题标题)。【参考方案4】:

我想不到。您可能必须将两个流的内容读入一个 byte[],然后从中创建一个 ByteArrayInputStream。

【讨论】:

赞成一个简单、易于理解、可行的解决方案。如果阻塞很重要(或者它们很大),可能没有所需的行为。【参考方案5】:

这是一个特定于字节数组的 MVar 实现(确保添加您自己的包定义)。从这里开始,在合并流上编写输入流是微不足道的。如果需要,我也可以发布。

import java.nio.ByteBuffer;

public final class MVar 

  private static enum State 
    EMPTY, ONE, MANY
  

  private final Object lock;

  private State state;

  private byte b;

  private ByteBuffer bytes;
  private int length;

  public MVar() 
    lock = new Object();
    state = State.EMPTY;
  

  public final void put(byte b) 
    synchronized (lock) 
      while (state != State.EMPTY) 
        try 
          lock.wait();
         catch (InterruptedException e) 
      
      this.b = b;
      state = State.ONE;
      lock.notifyAll();
    
  

  public final void put(byte[] bytes, int offset, int length) 
    if (length == 0) 
      return;
    
    synchronized (lock) 
      while (state != State.EMPTY) 
        try 
          lock.wait();
         catch (InterruptedException e) 
      
      this.bytes = ByteBuffer.allocateDirect(length);
      this.bytes.put(bytes, offset, length);
      this.bytes.position(0);
      this.length = length;
      state = State.MANY;
      lock.notifyAll();
    
  

  public final byte take() 
    synchronized (lock) 
      while (state == State.EMPTY) 
        try 
          lock.wait();
         catch (InterruptedException e) 
      
      switch (state) 
      case ONE: 
        state = State.EMPTY;
        byte b = this.b;
        lock.notifyAll();
        return b;
      
      case MANY: 
        byte b = bytes.get();
        state = --length <= 0 ? State.EMPTY : State.MANY;
        lock.notifyAll();
        return b;
      
      default:
        throw new AssertionError();
      
    
  

  public final int take(byte[] bytes, int offset, int length) 
    if (length == 0) 
      return 0;
    
    synchronized (lock) 
      while (state == State.EMPTY) 
        try 
          lock.wait();
         catch (InterruptedException e) 
      
      switch (state) 
      case ONE:
        bytes[offset] = b;
        state = State.EMPTY;
        lock.notifyAll();
        return 1;
      case MANY:
        if (this.length > length) 
          this.bytes.get(bytes, offset, length);
          this.length = this.length - length;
          synchronized (lock) 
            lock.notifyAll();
          
          return length;
        
        this.bytes.get(bytes, offset, this.length);
        this.bytes = null;
        state = State.EMPTY;
        length = this.length;
        lock.notifyAll();
        return length;
      default:
        throw new AssertionError();
      
    
  

【讨论】:

以上是关于如何在 Java 中合并两个输入流?的主要内容,如果未能解决你的问题,请参考以下文章

如何合并输入和输出音频以发送另一个会议者

java 合并流(SequenceInputStream)

如何使用向量合并另一个或文件中的两个或多个流?

如何在流分析组中将多条记录与字符串和空值合并

Java IO操作:合并流

Java:合并输入流