Multiple readers for InputStream in Java

【Posted】: 2011-06-29 09:23:14

【Question】:

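I have an InputStream from which I am reading characters. I would like multiple readers to access this InputStream. A reasonable way to achieve this seems to be to write the incoming data to a StringBuffer or StringBuilder and have the multiple readers read from that. Unfortunately, StringBufferInputStream is deprecated, and StringReader reads a String, not a mutable object that is continuously being updated. What are my options? Write my own?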

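【Comments】:

I think you should ask yourself why you want to do this. I have a model like this, but a single reader reads the data and passes it on to wherever it needs to go.

I could do that, sure. The InputStream comes from a process. I want to log the process's output to a file and also display it on a console in the GUI. I could read the output and send the text to both the log and the GUI, but it would be easier if there were a cached input stream or something similar.

@PeterLawrey: I can see a scenario for this, where the different readers are independent. Example: wrapping a console/terminal. One reader always passes what is to be displayed on to the user. Another reader reacts to commands by parsing the responses, and eventually gives the user different feedback based on the same input. The two readers are completely independent.

【Answer 1】: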

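Note: My other answer is more general (and, I think, better).

As @dimo414 pointed out, the answer below requires the first reader to always be ahead of the second reader. If that really is your situation, this answer may still be preferable, since it builds on standard classes.


To create two readers that read independently from the same source, you have to make sure they don't consume data from the same stream.

This can be achieved by combining TeeInputStream from Apache Commons with PipedInputStream and PipedOutputStream, as follows: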

import java.io.*;
import org.apache.commons.io.input.TeeInputStream;

class Test {
    public static void main(String[] args) throws IOException {

        // Create the source input stream.
        InputStream is = new FileInputStream("filename.txt");

        // Create a piped input stream for one of the readers.
        PipedInputStream in = new PipedInputStream();

        // Create a tee-splitter for the other reader.
        TeeInputStream tee = new TeeInputStream(is, new PipedOutputStream(in));

        // Create the two buffered readers.
        BufferedReader br1 = new BufferedReader(new InputStreamReader(tee));
        BufferedReader br2 = new BufferedReader(new InputStreamReader(in));

        // Do some interleaved reads from them.
        System.out.println("One line from br1:");
        System.out.println(br1.readLine());
        System.out.println();

        System.out.println("Two lines from br2:");
        System.out.println(br2.readLine());
        System.out.println(br2.readLine());
        System.out.println();

        System.out.println("One line from br1:");
        System.out.println(br1.readLine());
        System.out.println();
    }
}

Output:

One line from br1:
Line1: Lorem ipsum dolor sit amet,      <-- reading from start

Two lines from br2:
Line1: Lorem ipsum dolor sit amet,      <-- reading from start
Line2: consectetur adipisicing elit,

One line from br1:
Line2: consectetur adipisicing elit,    <-- resumes on line 2

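【Comments】:

Be careful about copy-pasting this code. It does not work in every case. For example, if the InputStream comes from ClassLoader.getSystemResourceAsStream("myResource"), it will get stuck.

This pattern may cause your program to block unexpectedly. br2 can only read past br1 because br1's buffer is large enough to hold both lines. If you comment out the first br1.readLine(), the program will hang.

@dimo414, thanks for catching this. I added a big disclaimer at the top and wrote a proper answer here. Feedback on my new answer is welcome.

【Answer 2】: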

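As you have probably noticed, once you have read a byte from an input stream, it is gone forever (unless you saved it somewhere yourself).

The solution below saves the bytes until all subscribing input streams have read them.

It works like this: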

// Create a SplittableInputStream from the originalStream
SplittableInputStream is  = new SplittableInputStream(originalStream);

// Fork this to get more input streams reading independently from originalStream
SplittableInputStream is2 = is.split();
SplittableInputStream is3 = is.split();

Each call to is.split() produces a new InputStream that reads bytes from the position is had reached when it was split.

SplittableInputStream looks as follows (copy-paste away!):

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SplittableInputStream extends InputStream {

    // Almost an input stream: The read-method takes an id.
    static class MultiplexedSource {

        static int MIN_BUF = 4096;

        // Underlying source
        private InputStream source;

        // Read positions of each SplittableInputStream
        private List<Integer> readPositions = new ArrayList<>();

        // Data to be read by the SplittableInputStreams
        // (ints rather than bytes, so that the -1 end-of-stream marker fits)
        int[] buffer = new int[MIN_BUF];

        // Last valid position in buffer
        int writePosition = 0;

        public MultiplexedSource(InputStream source) {
            this.source = source;
        }

        // Add a multiplexed reader. Return new reader id.
        int addSource(int splitId) {
            readPositions.add(splitId == -1 ? 0 : readPositions.get(splitId));
            return readPositions.size() - 1;
        }

        // Make room for more data (and drop data that has been read by
        // all readers)
        private void readjustBuffer() {
            int from = Collections.min(readPositions);
            int to = Collections.max(readPositions);
            int newLength = Math.max((to - from) * 2, MIN_BUF);
            int[] newBuf = new int[newLength];
            System.arraycopy(buffer, from, newBuf, 0, to - from);
            for (int i = 0; i < readPositions.size(); i++)
                readPositions.set(i, readPositions.get(i) - from);
            writePosition -= from;
            buffer = newBuf;
        }

        // Read and advance position for given reader
        public int read(int readerId) throws IOException {

            // Enough data in buffer?
            if (readPositions.get(readerId) >= writePosition) {
                readjustBuffer();
                buffer[writePosition++] = source.read();
            }

            int pos = readPositions.get(readerId);
            int b = buffer[pos];
            if (b != -1)
                readPositions.set(readerId, pos + 1);
            return b;
        }
    }

    // Non-root fields
    MultiplexedSource multiSource;
    int myId;

    // Public constructor: Used for first SplittableInputStream
    public SplittableInputStream(InputStream source) {
        multiSource = new MultiplexedSource(source);
        myId = multiSource.addSource(-1);
    }

    // Private constructor: Used in split()
    private SplittableInputStream(MultiplexedSource multiSource, int splitId) {
        this.multiSource = multiSource;
        myId = multiSource.addSource(splitId);
    }

    // Returns a new InputStream that will read bytes from this position
    // onwards.
    public SplittableInputStream split() {
        return new SplittableInputStream(multiSource, myId);
    }

    @Override
    public int read() throws IOException {
        return multiSource.read(myId);
    }
}

Finally, a demo:

String str = "Lorem ipsum\ndolor sit\namet\n";
InputStream is = new ByteArrayInputStream(str.getBytes("UTF-8"));

// Create the two buffered readers.
SplittableInputStream is1 = new SplittableInputStream(is);
SplittableInputStream is2 = is1.split();

BufferedReader br1 = new BufferedReader(new InputStreamReader(is1));
BufferedReader br2 = new BufferedReader(new InputStreamReader(is2));

// Do some interleaved reads from them.
System.out.println("One line from br1:");
System.out.println(br1.readLine());
System.out.println();

System.out.println("Two lines from br2:");
System.out.println(br2.readLine());
System.out.println(br2.readLine());
System.out.println();

System.out.println("One line from br1:");
System.out.println(br1.readLine());
System.out.println();

Output:

One line from br1:
Lorem ipsum

Two lines from br2:
Lorem ipsum
dolor sit

One line from br1:
dolor sit

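【Comments】:

Cool, looks like it would work! It could use 4 times less buffer space and override read(byte[], int, int) for efficiency, but that is an "exercise for the reader". It would be clearer to just say int to = writePosition; in readjustBuffer.

If you have a single reader, readjustBuffer is called on every byte read, which hurts performance because you have to copy 4K bytes for each byte read.

【Answer 3】: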

Use TeeInputStream to copy all bytes read from the InputStream to a secondary OutputStream, such as a ByteArrayOutputStream.
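
To illustrate the idea without the Commons IO dependency, here is a minimal sketch using only the JDK. The class TeeingInputStream and its helper drainAndCopy are hypothetical names made up for this example; the class is a simplified stand-in for TeeInputStream, not its actual implementation, and it ignores skip() and mark()/reset().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Commons IO's TeeInputStream: every byte read
// from the wrapped stream is also written to a secondary OutputStream.
class TeeingInputStream extends FilterInputStream {

    private final OutputStream branch;

    TeeingInputStream(InputStream in, OutputStream branch) {
        super(in);
        this.branch = branch;
    }

    @Override
    public int read() throws IOException {
        int b = super.read();
        if (b != -1) {
            branch.write(b);
        }
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        int n = super.read(buf, off, len);
        if (n > 0) {
            branch.write(buf, off, n);
        }
        return n;
    }

    // Demo helper (an assumption of this sketch): consumes the source through
    // the tee and returns the copy that accumulated in the secondary stream.
    static byte[] drainAndCopy(InputStream source) {
        ByteArrayOutputStream copy = new ByteArrayOutputStream();
        try (InputStream tee = new TeeingInputStream(source, copy)) {
            byte[] buf = new byte[256];
            while (tee.read(buf, 0, buf.length) != -1) {
                // The first consumer would process buf here.
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return copy.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = "line1\nline2\n".getBytes(StandardCharsets.UTF_8);
        byte[] copy = drainAndCopy(new ByteArrayInputStream(data));
        // A second consumer can now read the saved bytes.
        System.out.print(new String(copy, StandardCharsets.UTF_8));
    }
}
```

Note that the copy only contains what the first consumer has already read, so the second consumer is always behind the first.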

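【Comments】:

【Answer 4】:

Input streams work like this: once you have read a portion from one, it is gone forever. You cannot go back and re-read it. What you can do is something like this: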

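import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

class InputStreamSplitter {
  private final BufferedReader reader;
  private final List<Listener> listeners = new ArrayList<>();

  InputStreamSplitter(InputStream toReadFrom) {
    // BufferedReader is needed because InputStreamReader has no readLine().
    this.reader = new BufferedReader(new InputStreamReader(toReadFrom));
  }

  void addListener(Listener l) {
    this.listeners.add(l);
  }

  void work() throws IOException {
    String line;
    // Fetch a new line on every iteration and stop at end of stream.
    while ((line = this.reader.readLine()) != null) {
      for (Listener l : this.listeners) {
        l.processLine(line);
      }
    }
  }
}

interface Listener {
  void processLine(String line);
}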

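Have all interested parties implement Listener and add them to the InputStreamSplitter.

【Comments】:

"Input streams work like this: once you have read a portion from one, it is gone forever."

【Answer 5】: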

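Instead of using StringWriter/StringBufferInputStream, write the original InputStream to a ByteArrayOutputStream. Once you have finished reading from the original InputStream, pass the byte array returned by ByteArrayOutputStream.toByteArray to a ByteArrayInputStream. Use this as the InputStream of choice to hand to anything else that needs to read from it.

Essentially, all you are doing here is storing the contents of the original InputStream in an in-memory byte[] cache, as you originally tried to do with StringWriter/StringBufferInputStream.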
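
The caching approach described above could look roughly like the following sketch. The class name CachedStreamDemo and the helpers readAll and newReader are hypothetical names chosen for this example, and the sketch assumes UTF-8 text and a source that actually ends, which is not the case for the output of a process that is still running.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class CachedStreamDemo {

    // Drains the source completely into an in-memory byte[] cache.
    static byte[] readAll(InputStream source) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        try {
            int n;
            while ((n = source.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Each call returns a fresh, independent reader over the same cached bytes.
    static BufferedReader newReader(byte[] cached) {
        return new BufferedReader(new InputStreamReader(
                new ByteArrayInputStream(cached), StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        InputStream original = new ByteArrayInputStream(
                "line1\nline2\n".getBytes(StandardCharsets.UTF_8));
        byte[] cached = readAll(original);

        BufferedReader br1 = newReader(cached);
        BufferedReader br2 = newReader(cached);

        // Interleaved reads: neither reader affects the other's position.
        System.out.println(br1.readLine());
        System.out.println(br2.readLine());
        System.out.println(br2.readLine());
        System.out.println(br1.readLine());
    }
}
```

The trade-off is that the whole stream is held in memory and no reader can start before the original stream has been read to the end.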

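【Comments】:

【Answer 6】:

Here is another way to read from two streams independently, without assuming that one is ahead of the other, while still using standard classes. It does, however, eagerly read from the underlying input stream in the background, which may be undesirable depending on your application.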

import java.io.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.TeeOutputStream;

class Test {
  public static void main(String[] args) throws IOException {
    // Create the source input stream.
    InputStream is = new ByteArrayInputStream("line1\nline2\nline3".getBytes());

    // Create a piped input stream for each reader;
    PipedInputStream in1 = new PipedInputStream();
    PipedInputStream in2 = new PipedInputStream();

    // Start copying the input stream to both piped input streams.
    startCopy(is, new TeeOutputStream(
        new PipedOutputStream(in1), new PipedOutputStream(in2)));

    // Create the two buffered readers.
    BufferedReader br1 = new BufferedReader(new InputStreamReader(in1));
    BufferedReader br2 = new BufferedReader(new InputStreamReader(in2));

    // Do some interleaved reads from them.
    // ...
  }

  private static void startCopy(InputStream in, OutputStream out) {
    (new Thread() {
      public void run() {
        // Closing out once the copy is done (not in the original snippet)
        // lets both readers see end-of-stream instead of a broken pipe.
        try (OutputStream sink = out) {
          IOUtils.copy(in, sink);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }).start();
  }
}

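
For readers without Commons IO on the classpath, the same background-copy idea can be sketched with JDK classes only. This is an illustration under assumptions, not the answer's code: PipedFanOut, its startCopy variant with two sinks, and the firstLines helper are hypothetical names introduced here.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class PipedFanOut {

    // Starts a background thread that copies every byte of source to both
    // sinks, then closes them so that the readers see end-of-stream.
    static Thread startCopy(InputStream source, OutputStream a, OutputStream b) {
        Thread copier = new Thread(() -> {
            byte[] chunk = new byte[512];
            try (OutputStream first = a; OutputStream second = b) {
                int n;
                while ((n = source.read(chunk)) != -1) {
                    first.write(chunk, 0, n);
                    second.write(chunk, 0, n);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        copier.start();
        return copier;
    }

    // Demo helper: fans a small text out to two pipes and returns the first
    // line seen by each of the two independent readers.
    static String[] firstLines(String text) {
        try {
            PipedInputStream in1 = new PipedInputStream();
            PipedInputStream in2 = new PipedInputStream();
            InputStream source = new ByteArrayInputStream(
                    text.getBytes(StandardCharsets.UTF_8));
            startCopy(source, new PipedOutputStream(in1), new PipedOutputStream(in2));

            BufferedReader br1 = new BufferedReader(
                    new InputStreamReader(in1, StandardCharsets.UTF_8));
            BufferedReader br2 = new BufferedReader(
                    new InputStreamReader(in2, StandardCharsets.UTF_8));
            return new String[] { br1.readLine(), br2.readLine() };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (String line : firstLines("line1\nline2\nline3\n")) {
            System.out.println(line);
        }
    }
}
```

One caveat of this design: a PipedInputStream buffers only 1024 bytes by default, so if one reader stops consuming, the copier thread blocks on that pipe and the other reader stalls as well.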
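【Comments】:

The parameters of startCopy need to be final.

As of Java 8 they do not need to be final, but before Java 8, yes.

【Answer 7】:

Looking for a possible way to have an output stream send bytes to two or more different input streams, I found this forum. Unfortunately, the exact solution pointed to PipedOutputStream and PipedInputStream. So I decided to write an extension of PipedOutputStream. Here it is. The example is written in the "main" method of PipedOutputStreamEx.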

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedOutputStream;

/**
 * Extension of {@link PipedOutputStream} to which more than one {@link PipedInputStream} can be connected.
 */
public class PipedOutputStreamEx extends PipedOutputStream {

    public PipedOutputStreamEx() {
        // Nothing to initialize; sinks are attached via connect().
    }

    /* REMIND: identification of the read and write sides needs to be
       more sophisticated.  Either using thread groups (but what about
       pipes within a thread?) or using finalization (but it may be a
       long time until the next GC). */
    private PipedInputStreamEx[] sinks = null;

    public synchronized void connect(PipedInputStreamEx... pIns) throws IOException {
        for (PipedInputStreamEx snk : pIns) {
            if (snk == null) {
                throw new NullPointerException();
            } else if (sinks != null || snk.connected) {
                throw new IOException("Already connected");
            }
            snk.in = -1;
            snk.out = 0;
            snk.connected = true;
        }

        this.sinks = pIns;
    }


    /**
     * Writes the specified <code>byte</code> to the piped output stream.
     * <p>
     * Implements the <code>write</code> method of <code>OutputStream</code>.
     *
     * @param      b   the <code>byte</code> to be written.
     * @exception IOException if the pipe is <a href=#BROKEN> broken</a>,
     *      {@link #connect(java.io.PipedInputStream) unconnected},
     *      closed, or if an I/O error occurs.
     */
    public void write(int b) throws IOException {
        if (this.sinks == null) {
            throw new IOException("Pipe(s) not connected");
        }
        for (PipedInputStreamEx sink : this.sinks) {
            sink.receive(b);
        }
    }

    /**
     * Writes <code>len</code> bytes from the specified byte array
     * starting at offset <code>off</code> to this piped output stream.
     * This method blocks until all the bytes are written to the output
     * stream.
     *
     * @param      b     the data.
     * @param      off   the start offset in the data.
     * @param      len   the number of bytes to write.
     * @exception IOException if the pipe is <a href=#BROKEN> broken</a>,
     *      {@link #connect(java.io.PipedInputStream) unconnected},
     *      closed, or if an I/O error occurs.
     */
    public void write(byte b[], int off, int len) throws IOException {
        if (sinks == null) {
            throw new IOException("Pipe not connected");
        } else if (b == null) {
            throw new NullPointerException();
        } else if ((off < 0) || (off > b.length) || (len < 0) ||
                ((off + len) > b.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return;
        }
        for (PipedInputStreamEx sink : this.sinks) {
            sink.receive(b, off, len);
        }
    }

    /**
     * Flushes this output stream and forces any buffered output bytes
     * to be written out.
     * This will notify any readers that bytes are waiting in the pipe.
     *
     * @exception IOException if an I/O error occurs.
     */
    public synchronized void flush() throws IOException {
        if (sinks != null) {
            for (PipedInputStreamEx sink : this.sinks) {
                synchronized (sink) {
                    sink.notifyAll();
                }
            }
        }
    }

    /**
     * Closes this piped output stream and releases any system resources
     * associated with this stream. This stream may no longer be used for
     * writing bytes.
     *
     * @exception  IOException  if an I/O error occurs.
     */
    public void close() throws IOException {
        if (sinks != null) {
            for (PipedInputStreamEx sink : this.sinks) {
                sink.receivedLast();
            }
        }
    }


    /**
     * Test for this extension of {@link PipedOutputStream}.
     * @param args
     * @throws InterruptedException
     * @throws IOException
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        final PipedOutputStreamEx pOut = new PipedOutputStreamEx();
        final PipedInputStreamEx pInHash = new PipedInputStreamEx();
        final PipedInputStreamEx pInConsole = new PipedInputStreamEx();

        pOut.connect(pInHash, pInConsole);

        // Writer thread
        Thread escreve = new Thread("Escrevendo") {
            @Override
            public void run() {
                String[] paraGravar = new String[] {
                        "linha1 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha2 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha3 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha4 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha5 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha6 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha7 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha8 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha9 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha10 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha11 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha12 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha13 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha14 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha15 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha16 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha17 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha18 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha19 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha20 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                };
                for (String s : paraGravar) {
                    try {
                        pOut.write(s.getBytes("ISO-8859-1"));
                        Thread.sleep(100);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                try {
                    pOut.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        };

        // Reader 1: computes a hash (HashUtil is the author's own helper, not shown here)
        Thread le1 = new Thread("Le1 - hash") {
            @Override
            public void run() {
                try {
                    System.out.println("HASH: " + HashUtil.getHashCRC(pInHash, true));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        // Reader 2: writes to the console
        Thread le2 = new Thread("Le2 - escreve no console") {
            @Override
            public void run() {
                BufferedReader bIn = new BufferedReader(new InputStreamReader(pInConsole));
                String s;
                try {
                    while ((s = bIn.readLine()) != null) {
                        Thread.sleep(700); // test simulating a slow reader...
                        System.out.println(s);
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };


        escreve.start();
        le1.start();
        le2.start();

        escreve.join();
        le1.join();
        le2.join();

        pInHash.close();
        pInConsole.close();
    }
}

Here is the PipedInputStreamEx code. Unfortunately, I had to copy all of the JDK code in order to get access to the "connected", "in" and "out" attributes.

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

/**
 * Extension of {@link PipedInputStream} that allows more than one of these to be connected to a {@link PipedOutputStream}.
 * Because the ancestor class has package-private fields, the inherited code had to be copied :/
 */
public class PipedInputStreamEx extends PipedInputStream {

    @Override
    public void connect(PipedOutputStream src) throws IOException {
        // Message reads: "connect using PipedOutputStream.connect()"
        throw new IOException("conecte usando PipedOutputStream.connect()");
    }

//----------------------------------------------------------------------------------------------------------
//--------- START of code from the inherited class (some methods commented out...) -------------------------
//----------------------------------------------------------------------------------------------------------

    boolean closedByWriter = false;
    volatile boolean closedByReader = false;
    boolean connected = false;

    /* REMIND: identification of the read and write sides needs to be
       more sophisticated.  Either using thread groups (but what about
       pipes within a thread?) or using finalization (but it may be a
       long time until the next GC). */
    Thread readSide;
    Thread writeSide;

    private static final int DEFAULT_PIPE_SIZE = 1024;

    /**
     * The default size of the pipe's circular input buffer.
     * @since   JDK1.1
     */
    // This used to be a constant before the pipe size was allowed
    // to change. This field will continue to be maintained
    // for backward compatibility.
    protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;

    /**
     * The circular buffer into which incoming data is placed.
     * @since   JDK1.1
     */
    protected byte buffer[];

    /**
     * The index of the position in the circular buffer at which the
     * next byte of data will be stored when received from the connected
     * piped output stream. <code>in&lt;0</code> implies the buffer is empty,
     * <code>in==out</code> implies the buffer is full
     * @since   JDK1.1
     */
    protected int in = -1;

    /**
     * The index of the position in the circular buffer at which the next
     * byte of data will be read by this piped input stream.
     * @since   JDK1.1
     */
    protected int out = 0;

//    /**
//     * Creates a <code>PipedInputStream</code> so
//     * that it is connected to the piped output
//     * stream <code>src</code>. Data bytes written
//     * to <code>src</code> will then be  available
//     * as input from this stream.
//     *
//     * @param      src   the stream to connect to.
//     * @exception  IOException  if an I/O error occurs.
//     */
//    public PipedInputStream(PipedOutputStream src) throws IOException {
//        this(src, DEFAULT_PIPE_SIZE);
//    }
//
//    /**
//     * Creates a <code>PipedInputStream</code> so that it is
//     * connected to the piped output stream
//     * <code>src</code> and uses the specified pipe size for
//     * the pipe's buffer.
//     * Data bytes written to <code>src</code> will then
//     * be available as input from this stream.
//     *
//     * @param      src   the stream to connect to.
//     * @param      pipeSize the size of the pipe's buffer.
//     * @exception  IOException  if an I/O error occurs.
//     * @exception  IllegalArgumentException if <code>pipeSize <= 0</code>.
//     * @since    1.6
//     */
//    public PipedInputStream(PipedOutputStream src, int pipeSize)
//            throws IOException {
//        initPipe(pipeSize);
//        connect(src);
//    }

    /**
     * Creates a <code>PipedInputStream</code> so
     * that it is not yet {@linkplain #connect(java.io.PipedOutputStream)
     * connected}.
     * It must be {@linkplain java.io.PipedOutputStream#connect(
     * java.io.PipedInputStream) connected} to a
     * <code>PipedOutputStream</code> before being used.
     */
    public PipedInputStreamEx() {
        initPipe(DEFAULT_PIPE_SIZE);
    }

    /**
     * Creates a <code>PipedInputStream</code> so that it is not yet
     * {@linkplain #connect(java.io.PipedOutputStream) connected} and
     * uses the specified pipe size for the pipe's buffer.
     * It must be {@linkplain java.io.PipedOutputStream#connect(
     * java.io.PipedInputStream)
     * connected} to a <code>PipedOutputStream</code> before being used.
     *
     * @param      pipeSize the size of the pipe's buffer.
     * @exception  IllegalArgumentException if <code>pipeSize <= 0</code>.
     * @since      1.6
     */
    public PipedInputStreamEx(int pipeSize) {
        initPipe(pipeSize);
    }

    private void initPipe(int pipeSize) {
        if (pipeSize <= 0) {
            throw new IllegalArgumentException("Pipe Size <= 0");
        }
        buffer = new byte[pipeSize];
    }

//    /**
//     * Causes this piped input stream to be connected
//     * to the piped  output stream <code>src</code>.
//     * If this object is already connected to some
//     * other piped output  stream, an <code>IOException</code>
//     * is thrown.
//     * <p>
//     * If <code>src</code> is an
//     * unconnected piped output stream and <code>snk</code>
//     * is an unconnected piped input stream, they
//     * may be connected by either the call:
//     * <p>
//     * <pre><code>snk.connect(src)</code> </pre>
//     * <p>
//     * or the call:
//     * <p>
//     * <pre><code>src.connect(snk)</code> </pre>
//     * <p>
//     * The two
//     * calls have the same effect.
//     *
//     * @param      src   The piped output stream to connect to.
//     * @exception  IOException  if an I/O error occurs.
//     */
//    public void connect(PipedOutputStream src) throws IOException {
//        src.connect(this);
//    }

    /**
     * Receives a byte of data.  This method will block if no input is
     * available.
     * @param b the byte being received
     * @exception IOException If the pipe is <a href=#BROKEN> <code>broken</code></a>,
     *      {@link #connect(java.io.PipedOutputStream) unconnected},
     *      closed, or if an I/O error occurs.
     * @since     JDK1.1
     */
    protected synchronized void receive(int b) throws IOException {
        checkStateForReceive();
        writeSide = Thread.currentThread();
        if (in == out)
            awaitSpace();
        if (in < 0) {
            in = 0;
            out = 0;
        }
        buffer[in++] = (byte)(b & 0xFF);
        if (in >= buffer.length) {
            in = 0;
        }
    }

    /**
     * Receives data into an array of bytes.  This method will
     * block until some input is available.
     * @param b the buffer into which the data is received
     * @param off the start offset of the data
     * @param len the maximum number of bytes received
     * @exception IOException If the pipe is <a href=#BROKEN> broken</a>,
     *       {@link #connect(java.io.PipedOutputStream) unconnected},
     *       closed,or if an I/O error occurs.
     */
    synchronized void receive(byte b[], int off, int len) throws IOException {
        checkStateForReceive();
        writeSide = Thread.currentThread();
        int bytesToTransfer = len;
        while (bytesToTransfer > 0) {
            if (in == out)
                awaitSpace();
            int nextTransferAmount = 0;
            if (out < in) {
                nextTransferAmount = buffer.length - in;
            } else if (in < out) {
                if (in == -1) {
                    in = out = 0;
                    nextTransferAmount = buffer.length - in;
                } else {
                    nextTransferAmount = out - in;
                }
            }
            if (nextTransferAmount > bytesToTransfer)
                nextTransferAmount = bytesToTransfer;
            assert(nextTransferAmount > 0);
            System.arraycopy(b, off, buffer, in, nextTransferAmount);
            bytesToTransfer -= nextTransferAmount;
            off += nextTransferAmount;
            in += nextTransferAmount;
            if (in >= buffer.length) {
                in = 0;
            }
        }
    }

    private void checkStateForReceive() throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
            throw new IOException("Pipe closed");
        } else if (readSide != null && !readSide.isAlive()) {
            throw new IOException("Read end dead");
        }
    }

    private void awaitSpace() throws IOException {
        while (in == out) {
            checkStateForReceive();

            /* full: kick any waiting readers */
            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
    }

    /**
     * Notifies all waiting threads that the last byte of data has been
     * received.
     */
    synchronized void receivedLast() {
        closedByWriter = true;
        notifyAll();
    }

    /**
     * Reads the next byte of data from this piped input stream. The
     * value byte is returned as an <code>int</code> in the range
     * <code>0</code> to <code>255</code>.
     * This method blocks until input data is available, the end of the
     * stream is detected, or an exception is thrown.
     *
     * @return     the next byte of data, or <code>-1</code> if the end of the
     *             stream is reached.
     * @exception  IOException  if the pipe is
     *       {@link #connect(java.io.PipedOutputStream) unconnected},
     *       <a href=#BROKEN> <code>broken</code></a>, closed,
     *       or if an I/O error occurs.
     */
    public synchronized int read() throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
            throw new IOException("Pipe closed");
        } else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        readSide = Thread.currentThread();
        int trials = 2;
        while (in < 0) {
            if (closedByWriter) {
                /* closed by writer, return EOF */
                return -1;
            }
            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
                throw new IOException("Pipe broken");
            }
            /* might be a writer waiting */
            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
        int ret = buffer[out++] & 0xFF;
        if (out >= buffer.length) {
            out = 0;
        }
        if (in == out) {
            /* now empty */
            in = -1;
        }

        return ret;
    }

    /**
     * Reads up to <code>len</code> bytes of data from this piped input
     * stream into an array of bytes. Less than <code>len</code> bytes
     * will be read if the end of the data stream is reached or if
     * <code>len</code> exceeds the pipe's buffer size.
     * If <code>len </code> is zero, then no bytes are read and 0 is returned;
     * otherwise, the method blocks until at least 1 byte of input is
     * available, end of the stream has been detected, or an exception is
     * thrown.
     *
     * @param      b     the buffer into which the data is read.
     * @param      off   the start offset in the destination array <code>b</code>
     * @param      len   the maximum number of bytes read.
     * @return     the total number of bytes read into the buffer, or
     *             <code>-1</code> if there is no more data because the end of
     *             the stream has been reached.
     * @exception  NullPointerException If <code>b</code> is <code>null</code>.
     * @exception  IndexOutOfBoundsException If <code>off</code> is negative,
     * <code>len</code> is negative, or <code>len</code> is greater than
     * <code>b.length - off</code>
     * @exception  IOException if the pipe is <a href=#BROKEN> <code>broken</code></a>,
     *       {@link #connect(java.io.PipedOutputStream) unconnected},
     *       closed, or if an I/O error occurs.
     */
    public synchronized int read(byte b[], int off, int len) throws IOException {
        if (b == null) {
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        /* possibly wait on the first character */
        int c = read();
        if (c < 0) {
            return -1;
        }
        b[off] = (byte) c;
        int rlen = 1;
        while ((in >= 0) && (len > 1)) {

            int available;

            if (in > out) {
                available = Math.min((buffer.length - out), (in - out));
            } else {
                available = buffer.length - out;
            }

            // A byte is read beforehand outside the loop
            if (available > (len - 1)) {
                available = len - 1;
            }
            System.arraycopy(buffer, out, b, off + rlen, available);
            out += available;
            rlen += available;
            len -= available;

            if (out >= buffer.length) {
                out = 0;
            }
            if (in == out) {
                /* now empty */
                in = -1;
            }
        }
        return rlen;
    }

    /**
     * Returns the number of bytes that can be read from this input
     * stream without blocking.
     *
     * @return the number of bytes that can be read from this input stream
     *         without blocking, or {@code 0} if this input stream has been
     *         closed by invoking its {@link #close()} method, or if the pipe
     *         is {@link #connect(java.io.PipedOutputStream) unconnected}, or
     *      <a href=#BROKEN> <code>broken</code></a>.
     *
     * @exception  IOException  if an I/O error occurs.
     * @since   JDK1.0.2
     */
    public synchronized int available() throws IOException {
        if (in < 0)
            return 0;
        else if (in == out)
            return buffer.length;
        else if (in > out)
            return in - out;
        else
            return in + buffer.length - out;
    }

    /**
     * Closes this piped input stream and releases any system resources
     * associated with the stream.
     *
     * @exception  IOException  if an I/O error occurs.
     */
    public void close() throws IOException {
        closedByReader = true;
        synchronized (this) {
            in = -1;
        }
    }

//----------------------------------------------------------------------------------------------------------
//--------- END of code from the inherited class -----------------------------------------------------------
//----------------------------------------------------------------------------------------------------------
}



【Comments】:
