将输入流连接到输出流
Posted
技术标签:
【中文标题】将输入流连接到输出流【英文标题】:Connecting an input stream to an outputstream 【发布时间】:2010-12-07 04:27:57 【问题描述】:在 java9 中更新:https://docs.oracle.com/javase/9/docs/api/java/io/InputStream.html#transferTo-java.io.OutputStream-
我看到了一些类似的,但不是我需要的线程。
我有一个服务器,它基本上会从客户端客户端 A 接收输入,并将其逐字节转发到另一个客户端客户端 B。
我想将客户端 A 的输入流与客户端 B 的输出流连接起来。这可能吗?有什么方法可以做到这一点?
此外,这些客户端正在相互发送消息,这些消息对时间有些敏感,因此无法进行缓冲。我不想要一个 500 的缓冲区,而客户端发送 499 个字节,然后我的服务器推迟转发 500 个字节,因为它没有收到最后一个字节来填充缓冲区。
现在,我正在解析每条消息以查找其长度,然后读取长度字节,然后转发它们。我认为(并测试)这比读取一个字节并一遍又一遍地转发一个字节要好,因为那会非常慢。出于我在上一段中所述的原因,我也不想使用缓冲区或计时器 - 我不希望仅仅因为缓冲区未满而等待很长时间才能通过的消息。
有什么好的方法可以做到这一点?
【问题讨论】:
【参考方案1】:仅仅因为您使用缓冲区并不意味着流必须填充该缓冲区。换句话说,这应该没问题:
public static void copyStream(InputStream input, OutputStream output)
throws IOException
byte[] buffer = new byte[1024]; // Adjust if you want
int bytesRead;
while ((bytesRead = input.read(buffer)) != -1)
output.write(buffer, 0, bytesRead);
这应该可以正常工作 - 基本上 read
调用将阻塞,直到有 一些 数据可用,但它不会等到 all 可用以填充缓冲区. (我想它可以,而且我相信FileInputStream
通常会填充缓冲区,但附加到套接字的流更有可能立即为您提供数据。)
我认为至少值得先尝试这个简单的解决方案。
【讨论】:
是的,我认为这可以解决问题。我想我对确实需要填充缓冲区的 readFully() 感到困惑。 我已经尝试过您的代码,我还尝试通过读取消息的长度然后执行 byte[] buf = length; 来逐条读取消息inputstream.read(buf)....后一种方法更快,我不知道为什么。它似乎执行了更多的代码行,但速度更快。几乎快 2 倍。 @Zibbobz:任何数组大小都可以工作——它越大,需要的读取越少,但它在工作时占用的内存就越多。它不一定是流的实际长度。 @sgibly:好吧,鉴于close()
无论如何都会刷新它,我个人认为这不值得。当然,如果您采用这样的代码,您应该可以随意添加它:)
@sgibly:我会说它的文档记录很差,而不是 intent 是每个人都必须调用 flush...【参考方案2】:
使用怎么样
void feedInputToOutput(InputStream in, OutputStream out)
IOUtils.copy(in, out);
并完成它?
来自 jakarta apache commons i/o 库,该库已被大量项目使用,因此您可能已经在类路径中拥有该 jar。
【讨论】:
或者只使用函数本身,因为不需要调用具有完全相同参数的另一个函数.... 是的,这就是我个人所做的。我想我只是输入了额外的方法名称作为文档,但它不是必需的。 据我所知,此方法一直阻塞,直到通过 while 输入。因此,这应该在提问者的异步线程中完成。【参考方案3】:JDK 9 已为此功能添加了InputStream#transferTo(OutputStream out)
。
【讨论】:
【参考方案4】:为了完整起见,guava 也有一个 handy utility 用于此
ByteStreams.copy(input, output);
【讨论】:
【参考方案5】:您可以使用循环缓冲区:
代码
// buffer all data in a circular buffer of infinite size
CircularByteBuffer cbb = new CircularByteBuffer(CircularByteBuffer.INFINITE_SIZE);
class1.putDataOnOutputStream(cbb.getOutputStream());
class2.processDataFromInputStream(cbb.getInputStream());
Maven 依赖项
<dependency>
<groupId>org.ostermiller</groupId>
<artifactId>utils</artifactId>
<version>1.07.00</version>
</dependency>
模式详情
http://ostermiller.org/utils/CircularBuffer.html
【讨论】:
【参考方案6】:异步方式来实现它。
void inputStreamToOutputStream(final InputStream inputStream, final OutputStream out)
Thread t = new Thread(new Runnable()
public void run()
try
int d;
while ((d = inputStream.read()) != -1)
out.write(d);
catch (IOException ex)
//TODO make a callback on exception.
);
t.setDaemon(true);
t.start();
【讨论】:
这是在不阻塞当前线程的情况下将数据从一个流传输到另一个流。【参考方案7】:BUFFER_SIZE 是要读取的卡盘大小。应该 > 1kb 且
private static final int BUFFER_SIZE = 2 * 1024 * 1024;
private void copy(InputStream input, OutputStream output) throws IOException
try
byte[] buffer = new byte[BUFFER_SIZE];
int bytesRead = input.read(buffer);
while (bytesRead != -1)
output.write(buffer, 0, bytesRead);
bytesRead = input.read(buffer);
//If needed, close streams.
finally
input.close();
output.close();
【讨论】:
应该远小于 10MB。这就是我们所说的 TCP。任何大于套接字接收缓冲区的大小都是完全没有意义的,它们以千字节而不是兆字节为单位。【参考方案8】:使用 org.apache.commons.io.IOUtils
InputStream inStream = new ...
OutputStream outStream = new ...
IOUtils.copy(inStream, outStream);
或 copyLarge 大小 >2GB
【讨论】:
【参考方案9】:这是一个干净且快速的 Scala 版本(没有 ***):
import scala.annotation.tailrec
import java.io._
implicit class InputStreamOps(in: InputStream)
def >(out: OutputStream): Unit = pipeTo(out)
def pipeTo(out: OutputStream, bufferSize: Int = 1<<10): Unit = pipeTo(out, Array.ofDim[Byte](bufferSize))
@tailrec final def pipeTo(out: OutputStream, buffer: Array[Byte]): Unit = in.read(buffer) match
case n if n > 0 =>
out.write(buffer, 0, n)
pipeTo(out, buffer)
case _ =>
in.close()
out.close()
这可以使用>
符号,例如inputstream > outputstream
并传入自定义缓冲区/大小。
【讨论】:
你能提供一些类似的Java实现吗? @Luchostein:我正在回复下面 George Pligor 的错误 Scala 回答【参考方案10】:如果您喜欢函数式,这是一个用 Scala 编写的函数,展示了如何仅使用 val(而不是 var)将输入流复制到输出流。
def copyInputToOutputFunctional(inputStream: InputStream, outputStream: OutputStream,bufferSize: Int)
val buffer = new Array[Byte](bufferSize);
def recurse()
val len = inputStream.read(buffer);
if (len > 0)
outputStream.write(buffer.take(len));
recurse();
recurse();
请注意,不建议在可用内存很少的 java 应用程序中使用此方法,因为使用递归函数很容易出现堆栈溢出异常错误
【讨论】:
-1:递归 Scala 解决方案如何与 Java 问题相关? 方法recurse是尾递归的。如果您使用 @tailrec 对其进行注释,则不会出现堆栈溢出问题。 这个答案验证了所有的纯java程序员都在承受着老板的压力,需要严肃的愤怒管理!以上是关于将输入流连接到输出流的主要内容,如果未能解决你的问题,请参考以下文章