为啥没有更多的 Java 代码使用 PipedInputStream / PipedOutputStream?

Posted

技术标签:

【中文标题】为啥没有更多的 Java 代码使用 PipedInputStream / PipedOutputStream?【英文标题】:Why doesn't more Java code use PipedInputStream / PipedOutputStream?为什么没有更多的 Java 代码使用 PipedInputStream / PipedOutputStream? 【发布时间】:2010-10-03 19:16:55 【问题描述】:

我最近发现了这个成语,我想知道我是否遗漏了什么。我从未见过它使用过。我在野外使用过的几乎所有 Java 代码都倾向于将数据放入字符串或缓冲区中,而不是像这个示例那样(例如使用 HttpClient 和 XML API):

    final LSOutput output; // XML stuff initialized elsewhere
    final LSSerializer serializer;
    final Document doc;
    // ...
    PostMethod post; // HttpClient post request
    final PipedOutputStream source = new PipedOutputStream();
    PipedInputStream sink = new PipedInputStream(source);
    // ...
    executor.execute(new Runnable() 
            public void run() 
                output.setByteStream(source);
                serializer.write(doc, output);
                try 
                    source.close();
                 catch (IOException e) 
                    throw new RuntimeException(e);
                
            );

    post.setRequestEntity(new InputStreamRequestEntity(sink));
    int status = httpClient.executeMethod(post);

该代码使用 Unix 管道样式技术来防止 XML 数据的多个副本保存在内存中。它使用 HTTP Post 输出流和 DOM 加载/保存 API 将 XML 文档序列化为 HTTP 请求的内容。据我所知,它通过很少的额外代码(仅RunnablePipedInputStreamPipedOutputStream 的几行代码)最大限度地减少了内存的使用。

那么,这个成语有什么问题?如果这个成语没有问题,为什么我没看到?

编辑:澄清一下,PipedInputStreamPipedOutputStream 替换随处可见的样板缓冲区逐个缓冲区副本,它们还允许您同时处理传入数据并写出处理后的数据。他们不使用操作系统管道。

【问题讨论】:

我看到了您的代码并为 JAXB xml (pastebin.com/zsWR8Dgx) 做了一个示例。代码对我来说似乎很脆弱。很高兴看到一个可靠的例子。 试试 Pipe4j:code.google.com/p/pipe4j 对于它的价值,我确实一直在使用它,它绝不是脆弱的。如果您想要更有意义的回溯,您必须将抛出的异常存储在 Runnable 对象中的某个地方并重新抛出它们。它仍然比编码到显式缓冲区要好得多。 如果 Java 可以将 Process 对象链接在一起,这样就可以作为非 Java 程序通过输入/输出流进行通信的胶水语言,这将是完美的。不幸的是,它不受支持。 ***.com/q/8243157/714112 @SridharSarnobat 在 JDK 11 中,您现在可以将单个 .java 文本文件作为顶部带有 shebang 行的脚本启动。因此,您可以通过 *NIX shell 对 java 程序进行传统的管道传输:openjdk.java.net/jeps/330#%22Shebang%22-files 不是超级有用,但至少有一些进步。 【参考方案1】:

PipedInputStream 和 PipeOutputStream 将使其线程休眠 1 秒,只要它们阻塞等待对方读取或写入已满或空的缓冲区。不要使用。

【讨论】:

评论不用于扩展讨论;这个对话是moved to chat。【参考方案2】:

这是一个管道有意义的用例:

假设您有一个第三方库,例如 xslt 映射器或加密库,其接口如下:doSomething(inputStream, outputStream)。而且您不想在通过线路发送之前缓冲结果。 Apache 和其他客户端不允许直接访问线路输出流。您可以获得的最接近的是在请求实体对象中获取输出流 - 在偏移量处,在写入标头之后。但由于这是在幕后,将输入流和输出流传递给第三方库仍然不够。管道是解决这个问题的好方法。

顺便说一句,我写了一个 Apache 的 HTTP 客户端 API [PipedApacheClientOutputStream] 的反转,它使用 Apache Commons HTTP Client 4.3.4 为 HTTP POST 提供了一个 OutputStream 接口。这是一个管道流可能有意义的示例。

【讨论】:

【参考方案3】:

来自Javadocs:

通常,数据由一个线程从 PipedInputStream 对象读取,数据由其他线程写入相应的 PipedOutputStream。不建议尝试在单个线程中同时使用这两个对象,因为这可能会使线程死锁。

这可以部分解释为什么它不被更常用。

我认为另一个原因是许多开发人员不了解它的目的/好处。

【讨论】:

遗憾的是,并发在不需要的地方被过度使用,而在需要的地方却未被充分利用......哎呀! :) @iny,我认为大多数开发人员并没有编写并发代码。也许它是在并发环境中运行的,但我认为每天都在处理多线程的开发人员是少数(这可能是一件好事) “许多开发人员不了解它的目的/好处”可能是那些以前没有使用过 Unix,因此没有接触过 pips-and-filters 设计模式的有用性的开发人员。跨度> @JohnGardner 同意了。所有 Java/JVM 开发人员都应该阅读 Java Concurrency In Practice(子弹头列车书籍)。这是关于这个关键主题的最佳书面书籍(Doug Lea 会同意),它解释了由于大多数 java 代码(包括 JDK)中未指定的并发属性而导致的许多问题。它可以帮助您解决这些问题,并了解在您自己的 API 中决定和声明的内容。 我写了一个行为正确的替换,因为我无法处理 JDK 版本的愚蠢。见github.com/archiecobbs/dellroad-stuff/blob/master/…【参考方案4】:

java.io 管道有太多的上下文切换(每字节读/写),它们的 java.nio 对应物要求你有一些 NIO 背景和正确使用通道和东西,这是我自己使用阻塞的管道实现对于单个生产者/消费者而言,队列将执行快速且可扩展:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream

  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[];

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  
    this(DEFAULT_BUFFER_SIZE);
  

  public QueueOutputStream(final int bufferSize)
  
    if(bufferSize<=0)
      throw new IllegalArgumentException("Buffer size <= 0");
    
    this.buffer=new byte[bufferSize];
  

  private synchronized void flushBuffer()
  
    if(count>0)
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    
  

  @Override
  public synchronized void write(final int b) throws IOException
  
    if(closed)
      throw new IllegalStateException("Stream is closed");
    
    if(count>=buffer.length)
      flushBuffer();
    
    buffer[count++]=(byte)b;
  

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  
    super.write(b,off,len);
  

  @Override
  public synchronized void close() throws IOException
  
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  
    return executor.submit(
            new Callable<Void>()
            
              @Override
              public Void call() throws Exception
              
                try
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL)
                    outputStream.write(buffer);
                    buffer=queue.take();
                  
                  outputStream.flush();
                 catch(Exception e)
                  close();
                  throw e;
                 finally
                  outputStream.close();
                
                return null;
              
            
    );
  

【讨论】:

【参考方案5】:

在您的示例中,您创建了两个线程来完成可以由一个线程完成的工作。并在混合中引入 I/O 延迟。

你有更好的例子吗?还是我刚刚回答了你的问题。


将一些 cmets(至少我对它们的看法)拉到主要响应中:

并发将复杂性引入应用程序。您现在必须关注独立数据流的排序,而不是处理单个线性数据流。在某些情况下,增加的复杂性可能是合理的,尤其是当您可以利用多个内核/CPU 来执行 CPU 密集型工作时。 如果您处于可以从并发操作中受益的情况,通常有更好的方法来协调线程之间的数据流。例如,使用并发队列在线程之间传递对象,而不是将管道流包装在对象流中。 当您有多个线程执行文本处理时,管道流可能是一个很好的解决方案,例如 Unix 管道(例如:grep | sort)。

在具体示例中,管道流允许使用 HttpClient 提供的现有 RequestEntity 实现类。我认为更好的解决方案是创建一个新的实现类,如下所示,因为该示例最终是一个顺序操作,无法从并发实现的复杂性和开销中受益。虽然我将 RequestEntity 显示为匿名类,但可重用性表明它应该是一流的类。

post.setRequestEntity(new RequestEntity()

    public long getContentLength()
    
        return 0-1;
    

    public String getContentType()
    
        return "text/xml";
    

    public boolean isRepeatable()
    
        return false;
    

    public void writeRequest(OutputStream out) throws IOException
    
        output.setByteStream(out);
        serializer.write(doc, output);
    
);

【讨论】:

这是我方便的一个例子。引入了哪些 IO 延迟? PipedInputStreams 和 PipedOutputStreams 是内存缓冲区。 它们可能是内存缓冲区,但它们使用底层管道实现,即内核 I/O 操作。 不是根据他们不知道的来源。 至于您的示例:我没有使用 HttpClient,但我希望有一种替代方法可以作为 OutputStream 访问请求正文。也许不是,尽管您确定 PostMethod 不会在内存中缓冲其内容(在这种情况下您不会获得任何东西) PostMethod 可以缓冲或不缓冲,这取决于方法是否已配置为对封闭实体进行分块。默认情况下,当未设置内容长度时,它会分块。如果您在回答时假设我已经阅读了相关 API 和源代码,那将会更有帮助。【参考方案6】:

另外,回到最初的例子:不,它也不能完全减少内存使用。构建 DOM 树,完成内存缓冲——虽然这比全字节数组副本要好,但并没有那么好。 但是在这种情况下缓冲会比较慢;并且还创建了一个额外的线程——您不能在单个线程中使用 PipedInput/OutputStream 对。

有时 PipedXxxStreams 很有用,但它们没有被更多使用的原因是因为它们通常不是正确的解决方案。它们适用于线程间通信,这就是我使用它们的价值所在。只是考虑到 SOA 如何将大多数此类边界推到服务之间而不是线程之间,因此没有那么多用例。

【讨论】:

【参考方案7】:

不久前我尝试使用这些类,但我忘记了细节。但我确实发现他们的实施存在致命缺陷。我不记得它是什么,但我有一个偷偷摸摸的记忆,它可能是一个竞争条件,这意味着它们偶尔会死锁(是的,当然我在单独的线程中使用它们:它们根本不能用于单线程,并非设计为)。

我可能会看看他们的源代码,看看我是否能看出问题所在。

【讨论】:

我发现两端都需要关闭。【参考方案8】:

那么,这个成语有什么问题?如果 这个成语没有错 为什么我没看到?

编辑:澄清一下,PipedInputStream 和 PipedOutputStream 替换 样板缓冲区逐缓冲区复制 随处可见,他们也 允许您处理传入的数据 同时写出 处理过的数据。他们不使用操作系统 管道。

你已经说明了它的作用,但没有说明你为什么这样做。

如果您认为这会减少使用的资源(cpu/内存)或提高性能,那么它也不会这样做。但是它会使您的代码更加复杂。

基本上你有一个没有问题的解决方案。

【讨论】:

在这种特殊情况下你是对的——还有另一种编码方式可以避免无限的内存消耗。然而,在一般情况下,与等效的缓冲区复制代码相比,它需要更少的代码来避免无限的内存消耗。 你认为什么是“无限的内存消耗”我已经为交易系统开发网络解决方案六年了,我从未遇到过这个问题。 您的交易系统是否可以在不耗尽空间的情况下处理带有大量有效负载的单条消息?如果是这样,那么它们的内存消耗是有限的;否则他们有无限的内存消耗。 (并不是说我希望交易系统除了拒绝超过一定大小的消息之外会做任何事情,但信不信由你,并非每个域都如此。) 确实,交易消息通常很小,因为延迟很重要。它们可以很快加起来,我们最终会在内存中获得 10 多个演出的数据。但是,我不确定这有什么关系。据我所知,发布的解决方案不会帮助您处理非常大的消息,事实上,您最终会得到两个副本,而不是传递一个消息副本(因为作者无法完成大消息的序列化和丢弃原件,直到读者几乎阅读/重建副本) 这是一个用例。假设您有一个第三方库,例如 xslt 映射器或加密库,其接口如下:doSomething(inputStream, outputStream)。而且您不想在通过线路发送之前缓冲结果。 Apache 和其他客户端不允许直接访问线路输出流。您可以获得的最接近的是在请求实体对象中获取输出流 - 在偏移量处,在写入标头之后。但是由于这是在幕后,将输入流和输出流传递给第三方库仍然不够。管道是解决这个问题的好方法。【参考方案9】:

我也是最近才发现 PipedInputStream/PipedOutputStream 类。

我正在开发一个需要通过 SSH 在远程服务器上执行命令的 Eclipse 插件。我正在使用JSch,Channel API 从输入流读取并写入输出流。但我需要通过输入流提供命令并从输出流中读取响应。这就是 PipedInput/OutputStream 的用武之地。

import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import com.jcraft.jsch.Channel;

Channel channel;
PipedInputStream channelInputStream = new PipedInputStream();
PipedOutputStream channelOutputStream = new PipedOutputStream();

channel.setInputStream(new PipedInputStream(this.channelOutputStream));
channel.setOutputStream(new PipedOutputStream(this.channelInputStream));
channel.connect();

// Write to channelInputStream
// Read from channelInputStream

channel.disconnect();

【讨论】:

javadoc 说管道流可能会在一个线程上死锁???? (这很糟糕,因为我想在没有额外线程的情况下使用完全一样的东西)......这真的有效还是你会陷入僵局?

以上是关于为啥没有更多的 Java 代码使用 PipedInputStream / PipedOutputStream?的主要内容,如果未能解决你的问题,请参考以下文章

为啥很多人觉得Java没有前途

为啥这个元组在 swift 3 中没有更多上下文就模棱两可?

为啥我使用回溯解决数独的 JAVA 代码没有给出任何解决方案? [关闭]

为啥没有更多的控制器由于未使用 skip_forgery_protection 而失败?

为啥这段 Java 代码没有利用所有 CPU 内核?

为啥使用超过 2 个线程会消耗更多时间?