如何检测多处理。管道已满?

Posted

技术标签:

【中文标题】如何检测多处理。管道已满?【英文标题】:How to detect multiprocessing.Pipe is full? 【发布时间】:2017-07-26 06:13:18 【问题描述】:

问题描述:我在 Python 中进行多处理,并使用 multiprocessing.Pipe() 来在进程之间进行通信。我一直在搜索,但仍然找不到检测管道是否已满的方法。例如下面的例子,writePipe 进程不断地将数字放入 2 个不同的管道(奇数和偶数)中,而 readPipe 进程不断地从这 2 个管道中读取。但是,从奇数管道读取的速度要快得多,因此偶数管道会被填满。此时,writePipe 进程将被阻塞,而 readPipe 进程仍在等待从 Odd Pipe 读取,从而导致死锁。

我的问题:有什么方法可以检测到管道已满,这样我们就可以在仍在运行的同时停止将数字放入完整的管道并将数字放入仍有空格的管道中?

from multiprocessing import Process, Pipe


def writePipe(sendNumberOdd, sendNumberEven):
    i = 0
    while True:
        if i % 2 == 0:
            sendNumberEven.send(i)
        else:
            sendNumberOdd.send(i)
        i += 1

def readPipe(recvNumberOdd, recvNumberEven):
    countEven = 0
    while True:
        countEven += 1
        print(countEven, recvNumberEven.recv())

        countOdd = 0
        while countOdd < 50:
            countOdd += 1
            print (countOdd, recvNumberOdd.recv())



if __name__ == '__main__':
    recvNumberOdd, sendNumberOdd = Pipe(duplex=False)
    recvNumberEven, sendNumberEven = Pipe(duplex=False)

    write = Process(target=writePipe, args=(sendNumberOdd, sendNumberEven))
    read = Process(target=readPipe, args=(recvNumberOdd, recvNumberEven))
    write.start()
    read.start()

    sendNumberOdd.close()
    sendNumberEven.close()

【问题讨论】:

【参考方案1】:

您可以使用select 模块中的select 函数来实现输出管道是否已满的测试。

import select
import multiprocessing.connection as mpc


def pipe_full(conn):
    r, w, x = select.select([], [conn], [], 0.0)
    return 0 == len(w)


i, o = mpc.Pipe(duplex=False)

n = 0
while not pipe_full(o):
    o.send(n)
    n += 1

print(' items fit.'.format(n))

【讨论】:

我认为pipe_full 测试select.PIPE_BUF 字节是否可写入管道(?)。 @user66081:是的,根据文档,这是管道情况下“完整”的定义:docs.python.org/3.9/library/select.html#select.PIPE_BUF【参考方案2】:

提案未经测试

class Connection(multiprocessing.Connection):
    def __init__(self, maxsize=0):
        self.__maxsize = maxsize
        self.size = 0
        self.__lock = multiprocessing.Lock

    def send(self, obj):
        with self.__lock:
            self.size += sizeof(obj)
        super().send(obj)

    def recv(self):
        _recv = super().recv()
        with self.__lock:
            self.size -= sizeof(_recv)
        return _recv

    def full(self):
        if self.__maxsize > 0:
            return self.size >= self.__maxsize
        return None

def Pipe(maxsize=0, duplex=True):
    return Connection(maxsize), Connection(maxsize)

实现poll() 以检查是否有任何数据准备就绪。

Python » 文档:poll([timeout])

Return whether there is any data available to be read.  

例如:

if recvNumberEven.poll():
    countEven += 1
    print(countEven, recvNumberEven.recv())

两者都可以替代使用wait(...)

multiprocessing.connection.wait(object_list, timeout=None)

Wait till an object in object_list is ready.  
Returns the list of those objects in object_list which are ready.

【讨论】:

感谢您的回答。但是,我认为您误解了我的问题。我需要一种方法来检测管道是否已满,以停止从 writePipe 进程向管道发送更多数据以防止死锁。同时, poll() 只能用于知道 Pipe 中是否有任何数据要读取,因此在这种情况下它无济于事。我需要类似于 Queue.put(block=False) 或 Queue.put_nowait 的东西,它会引发 queue.Full 异常,以便在管道已满时进行处理。 @Le Quoc Khanh:不存在 Pipe is full 的情况,你唯一的解决方案是阻止 Consumer 进程到 永远流浪 . 再次感谢您,但据我了解,multiprocessing.Queue 建立在 Pipe 之上,它有一些机制来检测和引发 queue.Full 队列满时的异常。我试图阅读多处理库中 queues.py 的源代码,并注意到它使用了某种 Boundedsemaphore 来检测,但我仍然不明白其中的逻辑。你对此有什么线索吗? Queue使用大小计数器处理此问题,如果大小计数器等于缓冲区中的数据,则条件 FULLTrue。但是为什么要重新发明***,为什么Queue 不符合您的需求? 因为 Queue 的性能真的很慢(比 Pipe 慢 3 倍),因为 Queue 是通过一些我不需要的多生产者和多消费者安全机制实现的(我只在半双工中使用单生产者、单消费者 IPC)。由于我必须实时处理大量数据,因此我必须选择使用 Pipe 而不是 Queue。您能否更详细地解释 Queue 如何检测它是否已满,以及我如何对 Pipe 应用类似的方法。我非常感谢。非常感谢。

以上是关于如何检测多处理。管道已满?的主要内容,如果未能解决你的问题,请参考以下文章

如何结合python多处理和管道技术?

如果管道已满,写入管道的进程是不是会阻塞?

PHP多进程处理并行处理任务实例(转,备用)

PHP多进程处理并行处理任务实例

检测命名管道的关闭

多处理是不是支持命名管道(FIFO)?