将输入流连接到输出流

Posted

技术标签:

【中文标题】将输入流连接到输出流【英文标题】:Connecting an input stream to an outputstream 【发布时间】:2010-12-07 04:27:57 【问题描述】:

在 java9 中更新:https://docs.oracle.com/javase/9/docs/api/java/io/InputStream.html#transferTo-java.io.OutputStream-

我看到了一些类似的,但不是我需要的线程。

我有一个服务器,它基本上会从客户端客户端 A 接收输入,并将其逐字节转发到另一个客户端客户端 B。

我想将客户端 A 的输入流与客户端 B 的输出流连接起来。这可能吗?有什么方法可以做到这一点?

此外,这些客户端正在相互发送消息,这些消息对时间有些敏感,因此无法进行缓冲。我不想要一个 500 的缓冲区,而客户端发送 499 个字节,然后我的服务器推迟转发 500 个字节,因为它没有收到最后一个字节来填充缓冲区。

现在,我正在解析每条消息以查找其长度,然后读取长度字节,然后转发它们。我认为(并测试)这比读取一个字节并一遍又一遍地转发一个字节要好,因为那会非常慢。出于我在上一段中所述的原因,我也不想使用缓冲区或计时器 - 我不希望仅仅因为缓冲区未满而等待很长时间才能通过的消息。

有什么好的方法可以做到这一点?

【问题讨论】:

【参考方案1】:

仅仅因为您使用缓冲区并不意味着流必须填充该缓冲区。换句话说,这应该没问题:

public static void copyStream(InputStream input, OutputStream output)
    throws IOException

    byte[] buffer = new byte[1024]; // Adjust if you want
    int bytesRead;
    while ((bytesRead = input.read(buffer)) != -1)
    
        output.write(buffer, 0, bytesRead);
    

这应该可以正常工作 - 基本上 read 调用将阻塞,直到有 一些 数据可用,但它不会等到 all 可用以填充缓冲区. (我想它可以,而且我相信FileInputStream 通常填充缓冲区,但附加到套接字的流更有可能立即为您提供数据。)

我认为至少值得先尝试这个简单的解决方案。

【讨论】:

是的,我认为这可以解决问题。我想我对确实需要填充缓冲区的 readFully() 感到困惑。 我已经尝试过您的代码,我还尝试通过读取消息的长度然后执行 byte[] buf = length; 来逐条读取消息inputstream.read(buf)....后一种方法更快,我不知道为什么。它似乎执行了更多的代码行,但速度更快。几乎快 2 倍。 @Zibbobz:任何数组大小都可以工作——它越大,需要的读取越少,但它在工作时占用的内存就越多。它不一定是流的实际长度。 @sgibly:好吧,鉴于close() 无论如何都会刷新它,我个人认为这不值得。当然,如果您采用这样的代码,您应该可以随意添加它:) @sgibly:我会说它的文档记录很差,而不是 intent 是每个人都必须调用 flush...【参考方案2】:

使用怎么样

void feedInputToOutput(InputStream in, OutputStream out) 
   IOUtils.copy(in, out);

并完成它?

来自 jakarta apache commons i/o 库,该库已被大量项目使用,因此您可能已经在类路径中拥有该 jar。

【讨论】:

或者只使用函数本身,因为不需要调用具有完全相同参数的另一个函数.... 是的,这就是我个人所做的。我想我只是输入了额外的方法名称作为文档,但它不是必需的。 据我所知,此方法一直阻塞,直到通过 while 输入。因此,这应该在提问者的异步线程中完成。【参考方案3】:

JDK 9 已为此功能添加了InputStream#transferTo(OutputStream out)

【讨论】:

【参考方案4】:

为了完整起见,guava 也有一个 handy utility 用于此

ByteStreams.copy(input, output);

【讨论】:

【参考方案5】:

您可以使用循环缓冲区:

代码

// buffer all data in a circular buffer of infinite size
CircularByteBuffer cbb = new CircularByteBuffer(CircularByteBuffer.INFINITE_SIZE);
class1.putDataOnOutputStream(cbb.getOutputStream());
class2.processDataFromInputStream(cbb.getInputStream());

Maven 依赖项

<dependency>
    <groupId>org.ostermiller</groupId>
    <artifactId>utils</artifactId>
    <version>1.07.00</version>
</dependency>

模式详情

http://ostermiller.org/utils/CircularBuffer.html

【讨论】:

【参考方案6】:

异步方式来实现它。

void inputStreamToOutputStream(final InputStream inputStream, final OutputStream out) 
    Thread t = new Thread(new Runnable() 

        public void run() 
            try 
                int d;
                while ((d = inputStream.read()) != -1) 
                    out.write(d);
                
             catch (IOException ex) 
                //TODO make a callback on exception.
            
        
    );
    t.setDaemon(true);
    t.start();

【讨论】:

这是在不阻塞当前线程的情况下将数据从一个流传输到另一个流。【参考方案7】:

BUFFER_SIZE 是要读取的卡盘大小。应该 > 1kb 且

private static final int BUFFER_SIZE = 2 * 1024 * 1024;
private void copy(InputStream input, OutputStream output) throws IOException 
    try 
        byte[] buffer = new byte[BUFFER_SIZE];
        int bytesRead = input.read(buffer);
        while (bytesRead != -1) 
            output.write(buffer, 0, bytesRead);
            bytesRead = input.read(buffer);
        
    //If needed, close streams.
     finally 
        input.close();
        output.close();
    

【讨论】:

应该远小于 10MB。这就是我们所说的 TCP。任何大于套接字接收缓冲区的大小都是完全没有意义的,它们以千字节而不是兆字节为单位。【参考方案8】:

使用 org.apache.commons.io.IOUtils

InputStream inStream = new ...
OutputStream outStream = new ...
IOUtils.copy(inStream, outStream);

copyLarge 大小 >2GB

【讨论】:

【参考方案9】:

这是一个干净且快速的 Scala 版本(没有 ***):

  import scala.annotation.tailrec
  import java.io._

  implicit class InputStreamOps(in: InputStream) 
    def >(out: OutputStream): Unit = pipeTo(out)

    def pipeTo(out: OutputStream, bufferSize: Int = 1<<10): Unit = pipeTo(out, Array.ofDim[Byte](bufferSize))

    @tailrec final def pipeTo(out: OutputStream, buffer: Array[Byte]): Unit = in.read(buffer) match 
      case n if n > 0 =>
        out.write(buffer, 0, n)
        pipeTo(out, buffer)
      case _ =>
        in.close()
        out.close()
    
  

这可以使用&gt; 符号,例如inputstream &gt; outputstream 并传入自定义缓冲区/大小。

【讨论】:

你能提供一些类似的Java实现吗? @Luchostein:我正在回复下面 George Pligor 的错误 Scala 回答【参考方案10】:

如果您喜欢函数式,这是一个用 Scala 编写的函数,展示了如何仅使用 val(而不是 var)将输入流复制到输出流。

def copyInputToOutputFunctional(inputStream: InputStream, outputStream: OutputStream,bufferSize: Int) 
  val buffer = new Array[Byte](bufferSize);
  def recurse() 
    val len = inputStream.read(buffer);
    if (len > 0) 
      outputStream.write(buffer.take(len));
      recurse();
    
  
  recurse();

请注意,不建议在可用内存很少的 java 应用程序中使用此方法,因为使用递归函数很容易出现堆栈溢出异常错误

【讨论】:

-1:递归 Scala 解决方案如何与 Java 问题相关? 方法recurse是尾递归的。如果您使用 @tailrec 对其进行注释,则不会出现堆栈溢出问题。 这个答案验证了所有的纯java程序员都在承受着老板的压力,需要严肃的愤怒管理!

以上是关于将输入流连接到输出流的主要内容,如果未能解决你的问题,请参考以下文章

9.0.网络编程_IO 通信模型

我们如何将 Spark 结构化流连接到 Redis?

如何检查 Java 程序的输入/输出流是不是连接到终端?

Java-输入输出流

java.io.PipedInputStream

输入流与输出流