跨衍生进程复制“multiprocessing.Queue”

Posted

技术标签:

【中文标题】跨衍生进程复制“multiprocessing.Queue”【英文标题】:Duplicating a `multiprocessing.Queue` across spawned processes 【发布时间】:2022-01-03 13:13:21 【问题描述】:

这似乎是 Google 和 *** 上很少讨论的话题,因为大多数问题都误解了如何使用 multiprocessing.Queue(我很可能正在这样做),或者只是解释了 Queue non-picklability 的问题。

Linux 5.10.15-zen2-1-zen:

import multiprocessing


class Example:
  def __init__(self):
    self.rx_queue = multiprocessing.Queue()

  def poll(self):
    print("received queue?", self.rx_queue.get())


if __name__ == "__main__":
  queue_to_duplicate = multiprocessing.Queue()

  ex = Example()
  ex_proc = multiprocessing.Process(target=ex.poll)
  ex_proc.start()

  ex.rx_queue.put(queue_to_duplicate)
  ex_proc.join()
  # RuntimeError: Queue objects should only be shared between processes through inheritance

我有以下示例代码,我的意图是能够传递一个multiprocessing.Queue 对象,或者复制文件描述符,以便我可以在接收过程中重新创建它。我这样做的本质是让一个主进程可以在给定另一个进程的队列的情况下启动一个通信通道,然后另一个进程可以建立自己的队列来完成双工。

我很可能错过了一个明显的替代方案,但我确实了解 Linux 支持 dup* 系统调用,并且 multiprocessing.QueueConnection 对象组成,这些对象薄薄地位于文件描述符之上,我可以理论上是重复的,但至于如何,我不确定。

【问题讨论】:

【参考方案1】:
import multiprocessing


class Example:
  def __init__(self):
    self.rx_queue = multiprocessing.Queue()

  def poll(self):
    queue = self.rx_queue.get()
    queue.put("echo")


if __name__ == "__main__":
  manager = multiprocessing.Manager()
  queue_to_duplicate = manager.Queue()

  ex = Example()
  ex_proc = multiprocessing.Process(target=ex.poll)
  ex_proc.start()

  ex.rx_queue.put(queue_to_duplicate)
  ex_proc.join()

  print(queue_to_duplicate.get())
  # ... "echo"

我对文档的监督,使用 multiprocessing.Manager 完全符合此目的。

【讨论】:

以上是关于跨衍生进程复制“multiprocessing.Queue”的主要内容,如果未能解决你的问题,请参考以下文章

跨不同存储帐户复制 Blob 存储中的目录

C# 检测衍生进程

jenkins不杀死衍生进程修改方法

抑制从衍生进程到终端的消息[重复]

如何从 Erlang 的衍生进程中获取返回值?

PyQt4 QProcess.startDetached() - 无法获得衍生进程的返回值和 PID