Java InputStream 阻塞读取
Posted
技术标签:
【中文标题】Java InputStream 阻塞读取【英文标题】:Java InputStream blocking read 【发布时间】:2010-10-11 08:06:24 【问题描述】:根据java api,InputStream.read()
被描述为:
如果没有可用的字节,因为 已到达流的尽头, 返回值 -1。这种方法 阻塞直到输入数据可用, 检测到流的结尾,或 抛出异常。
我有一个while(true)
循环进行读取,当没有任何内容通过流发送时,我总是得到-1。这是意料之中的。
我的问题是 read() 什么时候会阻塞?因为如果它没有得到任何数据,它会返回-1。我希望阻塞读取要等到收到数据。如果您已到达输入流的末尾,难道 read() 不应该只是等待数据而不是返回 -1 吗?
或者 read() 只有在有另一个线程访问流并且您的 read() 无法访问流时才会阻塞?
这引出了我的下一个问题。我曾经有事件监听器(由我的库提供),它会在数据可用时通知我。当我收到通知时,我会调用while((aByte = read()) > -1)
存储字节。当我在非常接近的时间内获得两个事件并且并未显示我的所有数据时,我感到很困惑。似乎只有第二个事件的数据的尾部会显示,其余的都不见了。
我最终更改了我的代码,以便当我收到一个名为if(inputStream.available() > 0) while((aByte = read()) > -1)
的事件时存储字节。现在它工作正常,我的所有数据都显示出来了。
有人可以解释这种行为吗?据说InputStream.available()
会返回在阻塞下一个调用者(流的?)之前可以读取的字节数。即使我不使用 .available() 我希望读取第一个事件只会阻止读取第二个事件,但不会擦除或消耗太多流数据。为什么这样做会导致无法显示我的所有数据?
【问题讨论】:
您混淆了流的结尾,目前没有可用的数据。当当前没有数据可用时,它会阻塞。流结束,它返回-1。 【参考方案1】:InputStream
的某些实现的底层数据源可以发出已到达流末尾的信号,不再发送数据。在接收到此信号之前,对此类流的读取操作可能会阻塞。
例如,来自Socket
套接字的InputStream
将阻塞,而不是返回 EOF,直到收到设置了 FIN 标志的 TCP 数据包。当从这样的流中接收到 EOF 时,您可以确保在该套接字上发送的所有数据都已被可靠地接收,并且您将无法再读取任何数据。 (另一方面,如果阻塞读取导致异常,则可能会丢失一些数据。)
其他流,例如来自原始文件或串行端口的流,可能缺少类似的格式或协议,以表明没有更多数据可用。当当前没有数据可用时,此类流可以立即返回 EOF (-1) 而不是阻塞。但是,在没有这种格式或协议的情况下,您无法确定对方何时完成发送数据。
关于你的第二个问题,听起来你可能有竞争条件。在没有看到有问题的代码的情况下,我猜测问题实际上在于您的“显示”方法。也许第二个通知显示的尝试以某种方式破坏了第一个通知期间完成的工作。
【讨论】:
我将输入流用于 COM 端口。同样,如果我执行一次读取的 while 循环,它将始终返回 -1,直到设备(COM 端口的另一端)输出数据。我永远不必重新打开流。所以你的解释对我来说没有意义。设备是否每次都重新打开流? rxtx ...一个用于连接串行和并行端口的java库 嗯,我只使用了 javax.comm,它按预期实现了 InputStream。 rxtx 的实现可能违反了 InputStream.read() 的约定。 也许是设备。该设备正在使用带有 USB 接口的 COM/串行端口。这是一个奇怪的界面。 不,我认为显示器不是问题。我只是通过 system.out 流输出流的数据。【参考方案2】:如果它是流的结尾,则返回 -1。如果流仍然打开(即套接字连接)但没有数据到达读取端(服务器很慢,网络很慢,...) read() 阻塞。
你不需要调用 available()。我很难理解您的通知设计,但是除了 read() 本身之外,您不需要任何调用。方法 available() 只是为了方便。
【讨论】:
奇怪的是我需要 available() 在那里,否则我不会得到我的所有数据。你能解释一下为什么我需要 available() 吗? (available 甚至不能保证返回 0。)【参考方案3】:好的,这有点乱,所以首先让我们澄清一下:InputStream.read()
阻塞与多线程无关。如果您有多个线程从同一个输入流中读取,并且您触发了两个彼此非常接近的事件 - 每个线程都试图消费一个事件,那么您会得到损坏:第一个读取的线程将获得一些字节(可能所有字节),当第二个线程被调度时,它将读取其余的字节。如果您计划在多个线程中使用单个 IO 流,请始终使用synchronized()
进行一些外部约束。
其次,如果您可以从InputStream
读取直到您得到-1,然后等待并可以稍后再次读取,那么您正在使用的 InputStream 实现已损坏! InputStream
的合同明确指出,InputStream.read()
应该只在没有更多数据要读取时返回 -1,因为已经到达整个流的末尾并且不再有数据可用 - 就像当你从一个文件,你就走到了尽头。
“现在没有更多数据可用,请稍候,你会得到更多”的行为是让read()
阻塞,直到有一些数据可用(或抛出异常)才返回。
【讨论】:
很好的解释。我开始相信我正在使用的输入流实现已损坏。【参考方案4】:默认情况下,提供的 RXTX InputStream 的行为是不兼容的。
您必须将接收阈值设置为 1 并禁用接收超时:
serialPort.enableReceiveThreshold(1);
serialPort.disableReceiveTimeout();
来源:RXTX serial connection - issue with blocking read()
【讨论】:
【参考方案5】:是的!不要放弃你的直播 Jbu。 我们在这里谈论串行通信。对于串行的东西,绝对可以/将在读取时返回-1,但仍然期待稍后的数据。 问题是大多数人习惯于处理 TCP/IP,除非 TCP/IP 断开连接,否则它应该始终返回 0……然后是的,-1 是有道理的。 但是,使用 Serial 时,长时间没有数据流,也没有“HTTP Keep Alive”或 TCP/IP 心跳,或者(在大多数情况下)没有硬件流控制。但链接是物理的,仍然通过“铜”连接,并且仍然完美运行。
现在,如果他们所说的是正确的,即:串行应该在 -1 上关闭,那么为什么我们必须注意诸如 OnCTS、pmCarroerDetect、onDSR、onRingIndicator 等内容... 哎呀,如果 0 表示它存在,而 -1 表示它不存在,那么就搞砸所有这些检测功能! :-)
您可能面临的问题可能在其他地方。
现在,具体来说:
问:“好像只显示第二个事件数据的尾部,其余的都不见了。”
A:我猜你是在一个循环中,重复使用相同的 byte[] 缓冲区。第一条消息进来,还没有显示在屏幕/日志/标准输出上(因为你在循环中),然后你阅读第二条消息,替换缓冲区中的第一条消息数据。再说一次,因为我猜你不会存储你读了多少,然后确保将你的存储缓冲区偏移上一个读取量。
问:“我最终更改了我的代码,这样当我收到一个事件时,我调用了 if(inputStream.available() > 0) while((aByte = read()) > -1) 来存储字节。”
A:太棒了……这就是好东西。现在,您的数据缓冲区位于 IF 语句中,您的第二条消息不会破坏您的第一条消息......好吧,实际上,它可能只是第一个位置的一个大(er)消息。但现在,您将一口气读完所有内容,保持数据完整。
C:“...竞争条件...”
A:啊,好家伙抓住所有的替罪羊!竞态条件... :-) 是的,这可能是竞态条件,事实上它很可能是。但是,它也可能只是 RXTX 清除标志的方式。 “数据可用标志”的清除可能不会像人们预期的那样快。例如,任何人都知道 read VS readLine 与清除先前存储数据的缓冲区和重新设置事件标志之间的区别吗?我也没有。:-) 我也找不到答案……但是……让我再啰嗦几句。事件驱动编程仍然存在一些缺陷。让我给你一个我最近不得不处理的真实世界的例子。
我得到了一些 TCP/IP 数据,比如说 20 个字节。 所以我收到了接收数据的 OnEvent。 我什至从 20 个字节开始“读取”。 在我读完我的 20 个字节之前……我又得到了 10 个字节。 然而,TCP/IP 看起来通知我,哦,看到标志仍然是 SET,并且不会再次通知我。 但是,我读完了我的 20 个字节(available() 说有 20 个)... ... 最后 10 个字节保留在 TCP/IP Q... 因为我没有收到通知。看,通知被错过了,因为标志仍然设置......即使我已经开始读取字节。如果我完成了字节,那么标志就会被清除,并且我会收到接下来 10 个字节的通知。
与现在发生的事情完全相反。
所以,是的,使用 IF available() ... 读取返回的数据长度。 然后,如果您偏执,请设置一个计时器并再次调用 available(),如果那里仍有数据,则不读取新数据。如果 available() 返回 0(或 -1),则放松...坐等...等待下一个 OnEvent 通知。
【讨论】:
错了。您违反了 InputStream 合同。根据定义,到达流的end 意味着没有更多的数据到来。到达终点后,您将无法返回更多数据。这只是简单的英语。【参考方案6】:InputStream
只是一个抽象类,不幸的是实现决定了会发生什么。
如果什么也没找到会发生什么:
Sockets(即SocketInputStream
)将阻塞直到接收到数据(默认情况下)。但是可以设置超时(参见:setSoTimeout
),然后read
将阻塞 x 毫秒。如果仍然没有收到任何内容,则会抛出 SocketTimeoutException
。
但是无论是否超时,从SocketInputStream
读取有时会导致-1
。(例如,当多个客户端同时连接到同一个host:port
时,甚至尽管设备似乎已连接,但 read
的结果可能会立即返回 -1
(从不返回数据)。)
Serialio 通信将始终返回-1
;您还可以设置超时(使用setTimeoutRx
),read
将首先阻塞 x 毫秒,但如果没有找到,结果仍将是-1
。 (备注:但有多个串行 io 类可用,行为可能取决于供应商。)
文件(读取器或流)将生成 EOFException
。
制定通用解决方案:
如果您将上述任何流包装在DataInputStream
中,则可以使用 readByte
、readChar
等方法。 所有-1
值都转换为EOFException
。 (PS:如果您执行大量小读取,那么最好先将其包装在BufferedInputStream
中)
SocketTimeoutException
和 EOFException
都扩展了 IOException
,还有其他几种可能的IOException
。 只需检查IOException
就可以方便地检测通信问题。
另一个敏感话题是潮红。 flush
在套接字方面意味着“现在发送”,但在 Serialio 方面意味着“丢弃缓冲区”。
【讨论】:
我发现SocketInputStream.read(byte[] b) 设置一个简单的客户端服务器聊天程序时不会阻塞...我必须使用DataInputStream来装饰SocketInputStream,然后使用DataInputStream。 readUTF() 方法...如果没有收到消息,此方法将阻塞。 @JingHe ,过去我注意到read(byte[])
方法存在类似问题。也许它没有得到很好的实施。巨大的InputStream
接口的怪癖之一是,并非所有流都以应有的方式实现所有方法。 - 无论如何,我经常使用int read()
方法最终读取每个字节,如果您将流包装在BufferedInputStream
中,该方法仍然有效。在我使用它的应用程序中,我经常不得不评估每个字节,因为我需要寻找一个开始和停止字节。
@JingHe - 如果你最终会像我通常那样读取每个字节的字节 - 你可以将这些单独的字节写入固定大小的缓冲区(例如byte[] buffer = new byte[1024]
),但我个人喜欢以下构造更好:buffer = new ByteArrayOutputStream(); buffer.write(byte);
。 ByteArrayOutputStream
就像一个灵活的缓冲区(它会自动调整大小)。
非常感谢!我会试试 ByteArrayOutputStream。
除非对等方已关闭连接,否则从套接字读取不会导致 -1。多个客户与它无关。【参考方案7】:
我认为如果你使用thread.sleep()你可以接收到整个数据流
【讨论】:
这只是休眠调用 thread.sleep() 的线程。不是通过调用 thread.sleep() 读取/接收/处理数据。唯一发生的情况是,调用休眠以希望更多数据到来。以上是关于Java InputStream 阻塞读取的主要内容,如果未能解决你的问题,请参考以下文章
InputStream.available() 在 Java 中做了啥?