Java中的命名管道和多线程
Posted
技术标签:
【中文标题】Java中的命名管道和多线程【英文标题】:Named pipes in Java and multithreading 【发布时间】:2013-05-16 16:10:49 【问题描述】:我是否正确我假设在同一个进程的范围内,有 2 个线程读取/写入命名管道根本不会阻塞读取器/写入器?那么如果时间错误,可能会丢失一些数据?
如果有多个进程 - 读取器将等待某些数据可用,而写入器将被阻塞,直到读取器读取读取器提供的所有数据?
我计划使用命名管道从外部进程传递几个(数十、数百个)文件,并在我的 Java 应用程序中使用这些文件。编写简单的单元测试以使用一个线程写入管道,而另一个线程用于从管道读取,由于缺少数据块而导致零星的测试失败。
我认为这是因为线程和相同的过程,所以我的测试通常不正确。这个假设正确吗?
这里有一些例子来说明这种情况:
import java.io.FileOutputStream, FileInputStream, File
import java.util.concurrent.Executors
import org.apache.commons.io.IOUtils
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
@RunWith(classOf[JUnitRunner])
class PipeTest extends FlatSpec
def md5sum(data: Array[Byte]) =
import java.security.MessageDigest
MessageDigest.getInstance("MD5").digest(data).map("%02x".format(_)).mkString
"Pipe" should "block here" in
val pipe = new File("/tmp/mypipe")
val srcData = new File("/tmp/random.10m")
val md5 = "8e0a24d1d47264919f9d47f5223c913e"
val executor = Executors.newSingleThreadExecutor()
executor.execute(new Runnable
def run()
(1 to 10).foreach
id =>
val fis = new FileInputStream(pipe)
assert(md5 === md5sum(IOUtils.toByteArray(fis)))
fis.close()
)
(1 to 10).foreach
id =>
val is = new FileInputStream(srcData)
val os = new FileOutputStream(pipe)
IOUtils.copyLarge(is, os)
os.flush()
os.close()
is.close()
Thread.sleep(200)
没有 Thread.sleep(200) 测试失败的原因
断管异常 MD5 和不正确使用此延迟设置 - 效果非常好。我正在使用包含 10 MB 随机数据的文件。
【问题讨论】:
您必须提供一些上下文。你从哪里读到命名管道永远不会阻塞?这听起来违反直觉,它们是……嗯……管道,而不是溢出的水桶。 就个人而言,丢失数据块听起来像是您的测试是错误的,您可能在某处破坏了流缓冲区。尝试使用多个进程,而不是线程进行测试。或者,在您的测试设置中发布一个问题,询问它为什么会丢失数据块。 (这会使这成为一个“XY 问题”问题。您应该询问您遇到的问题,而不是您对原因或解决方法的推测。) @millimoose 我并不是说命名管道永远不会阻塞,但这就是我在测试结果中看到的。 “Writer”线程可以将3个文件写入管道,“Reader”只能读取其中一个。这令人困惑。 命名管道是一个有限大小的 FIFO。如果它已满并且您没有进行非阻塞写入(此时您的写入可能交错),它将阻塞。你……做错了什么。更多信息:unix.stackexchange.com/questions/68146/… @jdevelop 不,您的意思是命名管道在其缓冲区已满时默默地丢弃数据,这听起来有点疯狂。这是一种有效的同步方法:一个无锁的有界队列拒绝接受数据而不是阻塞,但这通常会明确反映在 API 中。 (即 API 将包括能够判断队列是否已满,以及追加是否成功。) 【参考方案1】:这是您代码中的一个非常简单的竞争条件:您正在向管道写入固定大小的消息,并假设您可以读回相同的消息。但是,对于任何给定的读取,您不知道管道中有多少可用数据。
如果您在写入前加上写入的字节数,并确保每个读取只读取该字节数,您会发现管道完全按照宣传的方式工作。
如果您遇到多个写入者和/或多个读取者的情况,我建议使用实际的消息队列。实际上,我建议在任何情况下都使用消息队列,因为它解决了消息边界划分的问题;重新发明那个特定的***没有什么意义。
【讨论】:
我认为不是这样,因为我在读/写操作之前打开管道并在之后关闭它。所以根本不应该是比赛条件的问题。 我认为它应该防止在完全读取之前第二次尝试打开管道。 @jdevelop - 打开和关闭管道绝对不会改变管道的内容。它当然不能作为您的读取和写入之间的同步点。但是,即使这样做,您也会在阅读者和作家之间进行比赛。如果您认为文档中的内容有所不同,请适当更新您的问题。 我正在努力理解。如果进程 A 打开管道 P,并向其中写入一些数据,然后关闭管道。进程 B 正在读取管道 P。在进程 B 完成管道读取之前 - 进程 C 打开同一管道 P 并开始对其进行写入。这是否意味着进程 B 也会从进程 A 和进程 C 获取数据?如果是这样 - 那么我需要为我的 IPC 找到其他东西。 @jdevelop - 正是它的工作原理。命名管道只是字节的 FIFO 缓冲区。它不提供同步,除了在系统调用级别(即,单个写入将是原子的)。【参考方案2】:我是否正确我假设在同一个进程的范围内,有 2 个线程读取/写入命名管道不会阻塞读取器/写入器?
除非您使用非阻塞 I/O,否则不会。
所以如果时间错误,可能会丢失一些数据?
除非您使用非阻塞 I/O,否则不会。
【讨论】:
我认为这有点不准确。 “用尽”的命名管道不会阻止等待更多数据,而是返回 EOF,不是吗? @millimoose 没有错误,只是您的误解。我没有说读取总是阻塞。我说 OP 说它永远不会阻塞是错误的。以上是关于Java中的命名管道和多线程的主要内容,如果未能解决你的问题,请参考以下文章