PipedInputStream - 如何避免“java.io.IOException:管道损坏”

Posted

技术标签:

【中文标题】PipedInputStream - 如何避免“java.io.IOException:管道损坏”【英文标题】:PipedInputStream - How to avoid "java.io.IOException: Pipe broken" 【发布时间】:2010-12-24 09:18:28 【问题描述】:

我有两个线程。其中一个写入 PipedOutputStream,另一个从相应的 PipedInputStream 读取。背景是一个线程正在从远程服务器下载一些数据,并通过管道流将其多路复用到其他几个线程。

问题是有时(尤其是在下载大型 (>50Mb) 文件时)我在尝试从 PipedInputStream 读取数据时收到 java.io.IOException: Pipe broken。 Javadoc 说 A pipe is said to be broken if a thread that was providing data bytes to the connected piped output stream is no longer alive. 确实,在将他的所有数据写入 PipedOutputStream 后,我的写作线程真的死了。

有什么解决办法吗?如何防止 PipedInputStream 抛出此异常?即使写入线程完成了他的工作,我也希望能够读取写入 PipedOutputStream 的所有数据。 (如果有人知道如何在所有数据都被读取之前保持写入线程处于活动状态,那么这个解决方案也是可以接受的)。

【问题讨论】:

【参考方案1】:

当您使用多个读取器或写入器线程时,您可能会遇到这些类的问题(请参阅JDK-4028322)。

但是,大多数用户可能只使用一个阅读器和一个编写器线程。既然你也是这种情况,那么你遇到管道破裂的原因很可能是你写完后没有closePipedOutputStream

PipedOutputStream out = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(out);

// You can of course also use an Executor or similar for this
new Thread(() -> 
    // Your method writing the data
    writeDataTo(out);
    // Close PipedOutputStream so that reader knows that the end of the data 
    // has been reached
    try 
        out.close();
    
    catch (IOException e) 
        // Handle exception or simply ignore it; current implementation will NEVER 
        // throw an IOException: https://github.com/openjdk/jdk/blob/0e14d5a98453407488057e6714f90f04d7dfa383/src/java.base/share/classes/java/io/PipedOutputStream.java#L174
    
).start();

// Your method reading the data
readDataFrom(in);
// Close PipedInputStream so that writer fails instead of blocking infinitely in case 
// it tries to write again (for whatever reason)
in.close();

也不需要手动调用PipedOutputStream.flush(),只会通知等待的读者,直接调用close()不会丢失数据。

遗憾的是,文档目前对所有这些都不是很清楚。一般来说,依赖实现并不是一个好主意,但在这种情况下,它可能是唯一明智的解决方案:

https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/io/PipedInputStream.java https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/io/PipedOutputStream.java

【讨论】:

【参考方案2】:

使用 java.util.concurrent.CountDownLatch,并且不要在第二个线程发出已完成从管道读取的信号之前结束第一个线程。

更新:快速而肮脏的代码来说明我在下面的评论

    final PipedInputStream pin = getInputStream();
    final PipedOutputStream pout = getOutputStream();

    final CountDownLatch latch = new CountDownLatch(1);

    InputStream in = new InputStream() 

        @Override
        public int read() throws IOException 
            return pin.read();
        

        @Override
        public void close() throws IOException 
            super.close();
            latch.countDown();
        
    ;


    OutputStream out = new OutputStream()

        @Override
        public void write(int b) throws IOException 
            pout.write(b);
        

        @Override
        public void close() throws IOException 
            while(latch.getCount()!=0) 
                try 
                    latch.await();
                 catch (InterruptedException e) 
                    //too bad
                
            
            super.close();
        
    ;

    //give the streams to your threads, they don't know a latch ever existed
    threadOne.feed(in);
    threadTwo.feed(out);

【讨论】:

不错的功能,绝对是+1,但它需要在不同线程之间共享一个CountDownLatch实例。这不是很好,因为写入和读取线程是在不同的地方创建的,我希望它们彼此不知道。我现在的架构是这样的,他们只知道应该写入/读取给定流。 那么,也许你可以扩展 Piped[In|Out]putStream 来处理 CountDownLatch 的操作。 或编写自己的 Input/OutputStream 来包装 Pipe 和 Latch(请参阅我在答案中添加的示例代码) 不需要所有这些额外的代码(在当前的 Java 版本中),请参阅 wds's answer。如果您关闭两个流,则不必担心会遇到损坏的管道。【参考方案3】:

当正在使用它的线程结束时,您是否正在关闭您的PipedOutputStream?您需要这样做,以便将其中的字节刷新到相应的PipedInputStream

【讨论】:

我真的认为这里出了点问题,无论如何,如果写入线程正常结束,您永远不应该收到损坏的管道。如果它的数据不适合PipedInputStream,它应该阻塞直到有空间。 这应该是公认的答案。当 OP 询问时,行为可能有所不同,或者他们没有正确关闭它,但是如果您在完成后关闭流,您将不会遇到损坏的管道。遗憾的是,文档对此不是很清楚,但如果有疑问,您可以查看源代码:github.com/openjdk/jdk/blob/…【参考方案4】:

PipedInputStreamPipedOutputStream 已损坏(关于线程)。他们假设每个实例都绑定到一个特定的线程。这很奇怪。我建议使用您自己的(或至少不同的)实现。

【讨论】:

此响应没有任何价值。这些类以什么方式假设这一点? 每个读/写存储对最后一个读/写线程的引用。当缓冲区为空/满并且最后一个读/写线程终止时,即使另一个线程现在是所有者但尚未对其执行任何操作,也会出现异常。

以上是关于PipedInputStream - 如何避免“java.io.IOException:管道损坏”的主要内容,如果未能解决你的问题,请参考以下文章

java 如何使用PipedInputStream和PipedOutputStream

管道流 pipedinputstream

Java IO PipedInputStream 和 PipedOutputStream

PipedInputStream/PipedOutputStream原理

字节输入流/输出流-----PipedInputStream/PipedOutputStream

Java IO: PipedInputStream