将管道/连接作为上下文参数传递给多处理 Pool.apply_async()

Posted

技术标签:

【中文标题】将管道/连接作为上下文参数传递给多处理 Pool.apply_async()【英文标题】:Passing a Pipe/Connection as context arg to multiprocessing Pool.apply_async() 【发布时间】:2013-11-19 19:40:37 【问题描述】:

我想使用管道与池中的流程实例通信,但出现错误:

让 __p 成为 Pool() 的一个实例:

    (master_pipe, worker_pipe) = Pipe()

    self.__p.apply_async(_worker_task, 
                         (handler_info, 
                          context_info,
                          worker_pipe))

当我执行此操作时,我收到以下错误 [对于每个实例,显然]:

  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 376, in get
    task = get()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 376, in get
TypeError: Required argument 'handle' (pos 1) not found
    self.run()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 114, in run
    return recv()
    return recv()
    self._target(*self._args, **self._kwargs)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 102, in worker
TypeError: Required argument 'handle' (pos 1) not found
TypeError: Required argument 'handle' (pos 1) not found
    task = get()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
TypeError: Required argument 'handle' (pos 1) not found

该错误专门指的是我试图传递的 Connection 实例。如果我将其设为“无”,则工人分叉不会出错。

我不明白这一点,因为正如文档通过示例所强调的那样,我可以轻松地将相同的参数传递给 Process(),并让它完美地工作:

from multiprocessing import Pipe, Process
def call_me(p):
  print("Here: %s" % (p))

(master, worker) = Pipe()
p = Process(target=call_me, args=(worker,))
p.start()

Here: <read-write Connection, handle 6>

p.join()

【问题讨论】:

【参考方案1】:

看起来像这个讨论中提到的这个错误 (http://bugs.python.org/issue4892):Python 2.6 send connection object over Queue / Pipe / etc

池最初使用管道将子进程分叉,用于将任务/结果与子进程通信。它是在它爆炸的现有管道上传达您的 Pipe 对象 - 而不是在分叉上。 (失败是当子进程在队列抽象上尝试 get() 时)。

看起来问题的出现是因为 Pipe 对象是如何被腌制/取消腌制以进行通信的。

在您提到的第二种情况下,管道被传递给流程实例,然后分叉 - 因此行为有所不同。

我无法想象在纯任务分配之外主动与池进程通信是多处理池的预期用例。状态/协议方面,这意味着您希望对流程进行更多控制。这将需要比一般 Pool 对象所能知道的更多的上下文。

【讨论】:

如果它是 Process() 的一个允许和展示的用例,那么对于 Pools 也应该这样说。 正如我上面提到的,当实例化和启动(分叉)一个 Process 对象时,路径是不同的。这不需要通过现有管道传递 Pipe 对象。由于分叉,它在子进程的上下文中可用。 用例肯定也不同——这意味着子进程存在一定程度的持久性,需要一个协议来与父进程进行主动通信以实现任意目的。该池是专门为处理独立的、上下文无关的、原子任务而构建的——这就是“apply_async”被命名的全部原因。所以我的意思是,看起来你会让自己的事情变得比需要的更困难。至少在表面上——但你会比我更清楚。 Multiprocessing 有一个未记录的“reduction”模块,在官方文档的示例中简要提及(对于与您的非常相似的用例:)查找“一个工作池如何处理的示例。 ..”)。另请查看***.com/questions/8545307/…。该错误的官方修复最终利用了这个模块,但现在看起来你可以手动使用它来通信管道对象(使用 apply_async() 时在主进程上下文中减少并在移交给孩子的任务中重建过程)【参考方案2】:

这可以通过在创建池及其进程时使用初始化程序和 initargs 参数来解决。 诚然,还必须涉及一个全局变量。但是,如果您将工作代码放在单独的模块中,它看起来并没有那么糟糕。它只是该过程的全局性。 :-)

一个典型的情况是您希望您的工作进程将内容添加到多处理队列中。由于这与必须驻留在内存中某个位置的东西有关,因此酸洗将不起作用。即使它会起作用,它也只会复制有关某些进程有队列这一事实的数据。这与我们在这里想要的相反。我们想共享同一个队列。

所以这里是一个元代码示例:

包含工作代码的模块,我们称之为“worker_module”:

def worker_init(_the_queue):
    global the_queue
    the_queue = _the_queue

def do_work(_a_string):
    # Add something to the queue
    the_queue.put("the string " + _a_string)

然后创建池,然后让它做某事

# Import our functions
from worker_module import worker_init, do_work

# Good idea: Call it MPQueue to not confuse it with the other Queue
from multiprocessing import Queue as MPQueue
from multiprocessing import Pool

the_queue = MPQueue() 
# Initialize workers, it is only during initialization we can pass the_queue
the_pool = Pool(processes= 3, initializer=worker_init, initargs=[the_queue,])
# Do the work
the_pool.apply(do_work, ["my string",])
# The string is now on the queue
my_string = the_queue.get(True))

【讨论】:

【参考方案3】:

这是一个bug,已在 Python 3 中修复。

最简单的解决方案是按照其他答案中的建议通过池的初始化程序传递队列。

【讨论】:

以上是关于将管道/连接作为上下文参数传递给多处理 Pool.apply_async()的主要内容,如果未能解决你的问题,请参考以下文章

是否可以将多个参数传递给 multiprocessing.pool? [复制]

使用on()jQuery方法将'this'作为参数传递给事件处理程序

如何将 dict 参数传递给 Python Pool.apply_async 方法?

extjs 上下文菜单单击,将父网格作为参数传递给控制器​​方法

将路径参数传递给 React 上下文提供程序

如何将函数与表连接起来,并将表列作为参数传递给雪花中的函数